1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 #include "spdk/config.h" 36 37 #include <sys/epoll.h> 38 #include <liburing.h> 39 40 #include "spdk/barrier.h" 41 #include "spdk/env.h" 42 #include "spdk/log.h" 43 #include "spdk/pipe.h" 44 #include "spdk/sock.h" 45 #include "spdk/string.h" 46 #include "spdk/util.h" 47 48 #include "spdk_internal/sock.h" 49 #include "spdk_internal/assert.h" 50 51 #define MAX_TMPBUF 1024 52 #define PORTNUMLEN 32 53 #define SPDK_SOCK_GROUP_QUEUE_DEPTH 4096 54 55 enum spdk_sock_task_type { 56 SPDK_SOCK_TASK_POLLIN = 0, 57 SPDK_SOCK_TASK_WRITE, 58 SPDK_SOCK_TASK_CANCEL, 59 }; 60 61 enum spdk_uring_sock_task_status { 62 SPDK_URING_SOCK_TASK_NOT_IN_USE = 0, 63 SPDK_URING_SOCK_TASK_IN_PROCESS, 64 }; 65 66 struct spdk_uring_task { 67 enum spdk_uring_sock_task_status status; 68 enum spdk_sock_task_type type; 69 struct spdk_uring_sock *sock; 70 struct msghdr msg; 71 struct iovec iovs[IOV_BATCH_SIZE]; 72 int iov_cnt; 73 struct spdk_sock_request *last_req; 74 STAILQ_ENTRY(spdk_uring_task) link; 75 }; 76 77 struct spdk_uring_sock { 78 struct spdk_sock base; 79 int fd; 80 struct spdk_uring_sock_group_impl *group; 81 struct spdk_uring_task write_task; 82 struct spdk_uring_task pollin_task; 83 struct spdk_uring_task cancel_task; 84 struct spdk_pipe *recv_pipe; 85 void *recv_buf; 86 int recv_buf_sz; 87 bool pending_recv; 88 int connection_status; 89 int placement_id; 90 TAILQ_ENTRY(spdk_uring_sock) link; 91 }; 92 93 struct spdk_uring_sock_group_impl { 94 struct spdk_sock_group_impl base; 95 struct io_uring uring; 96 uint32_t io_inflight; 97 uint32_t io_queued; 98 uint32_t io_avail; 99 TAILQ_HEAD(, spdk_uring_sock) pending_recv; 100 }; 101 102 static struct spdk_sock_impl_opts g_spdk_uring_sock_impl_opts = { 103 .recv_buf_size = MIN_SO_RCVBUF_SIZE, 104 .send_buf_size = MIN_SO_SNDBUF_SIZE, 105 .enable_recv_pipe = true, 106 .enable_quickack = false, 107 .enable_placement_id = PLACEMENT_NONE, 108 }; 109 110 static struct spdk_sock_map g_map = { 111 .entries = STAILQ_HEAD_INITIALIZER(g_map.entries), 112 .mtx = PTHREAD_MUTEX_INITIALIZER 113 }; 114 115 __attribute((destructor)) static void 116 uring_sock_map_cleanup(void) 117 { 118 spdk_sock_map_cleanup(&g_map); 119 } 120 121 #define SPDK_URING_SOCK_REQUEST_IOV(req) ((struct iovec *)((uint8_t *)req + sizeof(struct spdk_sock_request))) 122 123 static int 124 get_addr_str(struct sockaddr *sa, char *host, size_t hlen) 125 { 126 const char *result = NULL; 127 128 if (sa == NULL || host == NULL) { 129 return -1; 130 } 131 132 switch (sa->sa_family) { 133 case AF_INET: 134 result = inet_ntop(AF_INET, &(((struct sockaddr_in *)sa)->sin_addr), 135 host, hlen); 136 break; 137 case AF_INET6: 138 result = inet_ntop(AF_INET6, &(((struct sockaddr_in6 *)sa)->sin6_addr), 139 host, hlen); 140 break; 141 default: 142 break; 143 } 144 145 if (result != NULL) { 146 return 0; 147 } else { 148 return -1; 149 } 150 } 151 152 #define __uring_sock(sock) (struct spdk_uring_sock *)sock 153 #define __uring_group_impl(group) (struct spdk_uring_sock_group_impl *)group 154 155 static int 156 uring_sock_getaddr(struct spdk_sock *_sock, char *saddr, int slen, uint16_t *sport, 157 char *caddr, int clen, uint16_t *cport) 158 { 159 struct spdk_uring_sock *sock = __uring_sock(_sock); 160 struct sockaddr_storage sa; 161 socklen_t salen; 162 int rc; 163 164 assert(sock != NULL); 165 166 memset(&sa, 0, sizeof sa); 167 salen = sizeof sa; 168 rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen); 169 if (rc != 0) { 170 SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno); 171 return -1; 172 } 173 174 switch (sa.ss_family) { 175 case AF_UNIX: 176 /* Acceptable connection types that don't have IPs */ 177 return 0; 178 case AF_INET: 179 case AF_INET6: 180 /* Code below will get IP addresses */ 181 break; 182 default: 183 /* Unsupported socket family */ 184 return -1; 185 } 186 187 rc = get_addr_str((struct sockaddr *)&sa, saddr, slen); 188 if (rc != 0) { 189 SPDK_ERRLOG("getnameinfo() failed (errno=%d)\n", errno); 190 return -1; 191 } 192 193 if (sport) { 194 if (sa.ss_family == AF_INET) { 195 *sport = ntohs(((struct sockaddr_in *) &sa)->sin_port); 196 } else if (sa.ss_family == AF_INET6) { 197 *sport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port); 198 } 199 } 200 201 memset(&sa, 0, sizeof sa); 202 salen = sizeof sa; 203 rc = getpeername(sock->fd, (struct sockaddr *) &sa, &salen); 204 if (rc != 0) { 205 SPDK_ERRLOG("getpeername() failed (errno=%d)\n", errno); 206 return -1; 207 } 208 209 rc = get_addr_str((struct sockaddr *)&sa, caddr, clen); 210 if (rc != 0) { 211 SPDK_ERRLOG("getnameinfo() failed (errno=%d)\n", errno); 212 return -1; 213 } 214 215 if (cport) { 216 if (sa.ss_family == AF_INET) { 217 *cport = ntohs(((struct sockaddr_in *) &sa)->sin_port); 218 } else if (sa.ss_family == AF_INET6) { 219 *cport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port); 220 } 221 } 222 223 return 0; 224 } 225 226 enum uring_sock_create_type { 227 SPDK_SOCK_CREATE_LISTEN, 228 SPDK_SOCK_CREATE_CONNECT, 229 }; 230 231 static int 232 uring_sock_alloc_pipe(struct spdk_uring_sock *sock, int sz) 233 { 234 uint8_t *new_buf; 235 struct spdk_pipe *new_pipe; 236 struct iovec siov[2]; 237 struct iovec diov[2]; 238 int sbytes; 239 ssize_t bytes; 240 241 if (sock->recv_buf_sz == sz) { 242 return 0; 243 } 244 245 /* If the new size is 0, just free the pipe */ 246 if (sz == 0) { 247 spdk_pipe_destroy(sock->recv_pipe); 248 free(sock->recv_buf); 249 sock->recv_pipe = NULL; 250 sock->recv_buf = NULL; 251 return 0; 252 } else if (sz < MIN_SOCK_PIPE_SIZE) { 253 SPDK_ERRLOG("The size of the pipe must be larger than %d\n", MIN_SOCK_PIPE_SIZE); 254 return -1; 255 } 256 257 /* Round up to next 64 byte multiple */ 258 new_buf = calloc(SPDK_ALIGN_CEIL(sz + 1, 64), sizeof(uint8_t)); 259 if (!new_buf) { 260 SPDK_ERRLOG("socket recv buf allocation failed\n"); 261 return -ENOMEM; 262 } 263 264 new_pipe = spdk_pipe_create(new_buf, sz + 1); 265 if (new_pipe == NULL) { 266 SPDK_ERRLOG("socket pipe allocation failed\n"); 267 free(new_buf); 268 return -ENOMEM; 269 } 270 271 if (sock->recv_pipe != NULL) { 272 /* Pull all of the data out of the old pipe */ 273 sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov); 274 if (sbytes > sz) { 275 /* Too much data to fit into the new pipe size */ 276 spdk_pipe_destroy(new_pipe); 277 free(new_buf); 278 return -EINVAL; 279 } 280 281 sbytes = spdk_pipe_writer_get_buffer(new_pipe, sz, diov); 282 assert(sbytes == sz); 283 284 bytes = spdk_iovcpy(siov, 2, diov, 2); 285 spdk_pipe_writer_advance(new_pipe, bytes); 286 287 spdk_pipe_destroy(sock->recv_pipe); 288 free(sock->recv_buf); 289 } 290 291 sock->recv_buf_sz = sz; 292 sock->recv_buf = new_buf; 293 sock->recv_pipe = new_pipe; 294 295 return 0; 296 } 297 298 static int 299 uring_sock_set_recvbuf(struct spdk_sock *_sock, int sz) 300 { 301 struct spdk_uring_sock *sock = __uring_sock(_sock); 302 int rc; 303 304 assert(sock != NULL); 305 306 if (g_spdk_uring_sock_impl_opts.enable_recv_pipe) { 307 rc = uring_sock_alloc_pipe(sock, sz); 308 if (rc) { 309 SPDK_ERRLOG("unable to allocate sufficient recvbuf with sz=%d on sock=%p\n", sz, _sock); 310 return rc; 311 } 312 } 313 314 if (sz < MIN_SO_RCVBUF_SIZE) { 315 sz = MIN_SO_RCVBUF_SIZE; 316 } 317 318 rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVBUF, &sz, sizeof(sz)); 319 if (rc < 0) { 320 return rc; 321 } 322 323 return 0; 324 } 325 326 static int 327 uring_sock_set_sendbuf(struct spdk_sock *_sock, int sz) 328 { 329 struct spdk_uring_sock *sock = __uring_sock(_sock); 330 int rc; 331 332 assert(sock != NULL); 333 334 if (sz < MIN_SO_SNDBUF_SIZE) { 335 sz = MIN_SO_SNDBUF_SIZE; 336 } 337 338 rc = setsockopt(sock->fd, SOL_SOCKET, SO_SNDBUF, &sz, sizeof(sz)); 339 if (rc < 0) { 340 return rc; 341 } 342 343 return 0; 344 } 345 346 static struct spdk_uring_sock * 347 uring_sock_alloc(int fd) 348 { 349 struct spdk_uring_sock *sock; 350 #if defined(__linux__) 351 int flag; 352 int rc; 353 #endif 354 355 sock = calloc(1, sizeof(*sock)); 356 if (sock == NULL) { 357 SPDK_ERRLOG("sock allocation failed\n"); 358 return NULL; 359 } 360 361 sock->fd = fd; 362 363 #if defined(__linux__) 364 flag = 1; 365 366 if (g_spdk_uring_sock_impl_opts.enable_quickack) { 367 rc = setsockopt(sock->fd, IPPROTO_TCP, TCP_QUICKACK, &flag, sizeof(flag)); 368 if (rc != 0) { 369 SPDK_ERRLOG("quickack was failed to set\n"); 370 } 371 } 372 373 spdk_sock_get_placement_id(sock->fd, g_spdk_uring_sock_impl_opts.enable_placement_id, 374 &sock->placement_id); 375 #endif 376 377 return sock; 378 } 379 380 static struct spdk_sock * 381 uring_sock_create(const char *ip, int port, 382 enum uring_sock_create_type type, 383 struct spdk_sock_opts *opts) 384 { 385 struct spdk_uring_sock *sock; 386 char buf[MAX_TMPBUF]; 387 char portnum[PORTNUMLEN]; 388 char *p; 389 struct addrinfo hints, *res, *res0; 390 int fd, flag; 391 int val = 1; 392 int rc; 393 394 if (ip == NULL) { 395 return NULL; 396 } 397 if (ip[0] == '[') { 398 snprintf(buf, sizeof(buf), "%s", ip + 1); 399 p = strchr(buf, ']'); 400 if (p != NULL) { 401 *p = '\0'; 402 } 403 ip = (const char *) &buf[0]; 404 } 405 406 snprintf(portnum, sizeof portnum, "%d", port); 407 memset(&hints, 0, sizeof hints); 408 hints.ai_family = PF_UNSPEC; 409 hints.ai_socktype = SOCK_STREAM; 410 hints.ai_flags = AI_NUMERICSERV; 411 hints.ai_flags |= AI_PASSIVE; 412 hints.ai_flags |= AI_NUMERICHOST; 413 rc = getaddrinfo(ip, portnum, &hints, &res0); 414 if (rc != 0) { 415 SPDK_ERRLOG("getaddrinfo() failed (errno=%d)\n", errno); 416 return NULL; 417 } 418 419 /* try listen */ 420 fd = -1; 421 for (res = res0; res != NULL; res = res->ai_next) { 422 retry: 423 fd = socket(res->ai_family, res->ai_socktype, res->ai_protocol); 424 if (fd < 0) { 425 /* error */ 426 continue; 427 } 428 429 val = g_spdk_uring_sock_impl_opts.recv_buf_size; 430 rc = setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &val, sizeof val); 431 if (rc) { 432 /* Not fatal */ 433 } 434 435 val = g_spdk_uring_sock_impl_opts.send_buf_size; 436 rc = setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &val, sizeof val); 437 if (rc) { 438 /* Not fatal */ 439 } 440 441 rc = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof val); 442 if (rc != 0) { 443 close(fd); 444 /* error */ 445 continue; 446 } 447 rc = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof val); 448 if (rc != 0) { 449 close(fd); 450 /* error */ 451 continue; 452 } 453 454 #if defined(SO_PRIORITY) 455 if (opts != NULL && opts->priority) { 456 rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &opts->priority, sizeof val); 457 if (rc != 0) { 458 close(fd); 459 /* error */ 460 continue; 461 } 462 } 463 #endif 464 if (res->ai_family == AF_INET6) { 465 rc = setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &val, sizeof val); 466 if (rc != 0) { 467 close(fd); 468 /* error */ 469 continue; 470 } 471 } 472 473 if (type == SPDK_SOCK_CREATE_LISTEN) { 474 rc = bind(fd, res->ai_addr, res->ai_addrlen); 475 if (rc != 0) { 476 SPDK_ERRLOG("bind() failed at port %d, errno = %d\n", port, errno); 477 switch (errno) { 478 case EINTR: 479 /* interrupted? */ 480 close(fd); 481 goto retry; 482 case EADDRNOTAVAIL: 483 SPDK_ERRLOG("IP address %s not available. " 484 "Verify IP address in config file " 485 "and make sure setup script is " 486 "run before starting spdk app.\n", ip); 487 /* FALLTHROUGH */ 488 default: 489 /* try next family */ 490 close(fd); 491 fd = -1; 492 continue; 493 } 494 } 495 /* bind OK */ 496 rc = listen(fd, 512); 497 if (rc != 0) { 498 SPDK_ERRLOG("listen() failed, errno = %d\n", errno); 499 close(fd); 500 fd = -1; 501 break; 502 } 503 } else if (type == SPDK_SOCK_CREATE_CONNECT) { 504 rc = connect(fd, res->ai_addr, res->ai_addrlen); 505 if (rc != 0) { 506 SPDK_ERRLOG("connect() failed, errno = %d\n", errno); 507 /* try next family */ 508 close(fd); 509 fd = -1; 510 continue; 511 } 512 } 513 514 flag = fcntl(fd, F_GETFL); 515 if (fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0) { 516 SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno); 517 close(fd); 518 fd = -1; 519 break; 520 } 521 break; 522 } 523 freeaddrinfo(res0); 524 525 if (fd < 0) { 526 return NULL; 527 } 528 529 sock = uring_sock_alloc(fd); 530 if (sock == NULL) { 531 SPDK_ERRLOG("sock allocation failed\n"); 532 close(fd); 533 return NULL; 534 } 535 536 return &sock->base; 537 } 538 539 static struct spdk_sock * 540 uring_sock_listen(const char *ip, int port, struct spdk_sock_opts *opts) 541 { 542 return uring_sock_create(ip, port, SPDK_SOCK_CREATE_LISTEN, opts); 543 } 544 545 static struct spdk_sock * 546 uring_sock_connect(const char *ip, int port, struct spdk_sock_opts *opts) 547 { 548 return uring_sock_create(ip, port, SPDK_SOCK_CREATE_CONNECT, opts); 549 } 550 551 static struct spdk_sock * 552 uring_sock_accept(struct spdk_sock *_sock) 553 { 554 struct spdk_uring_sock *sock = __uring_sock(_sock); 555 struct sockaddr_storage sa; 556 socklen_t salen; 557 int rc, fd; 558 struct spdk_uring_sock *new_sock; 559 int flag; 560 561 memset(&sa, 0, sizeof(sa)); 562 salen = sizeof(sa); 563 564 assert(sock != NULL); 565 566 rc = accept(sock->fd, (struct sockaddr *)&sa, &salen); 567 568 if (rc == -1) { 569 return NULL; 570 } 571 572 fd = rc; 573 574 flag = fcntl(fd, F_GETFL); 575 if ((!(flag & O_NONBLOCK)) && (fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0)) { 576 SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno); 577 close(fd); 578 return NULL; 579 } 580 581 #if defined(SO_PRIORITY) 582 /* The priority is not inherited, so call this function again */ 583 if (sock->base.opts.priority) { 584 rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &sock->base.opts.priority, sizeof(int)); 585 if (rc != 0) { 586 close(fd); 587 return NULL; 588 } 589 } 590 #endif 591 592 new_sock = uring_sock_alloc(fd); 593 if (new_sock == NULL) { 594 close(fd); 595 return NULL; 596 } 597 598 return &new_sock->base; 599 } 600 601 static int 602 uring_sock_close(struct spdk_sock *_sock) 603 { 604 struct spdk_uring_sock *sock = __uring_sock(_sock); 605 606 assert(TAILQ_EMPTY(&_sock->pending_reqs)); 607 assert(sock->group == NULL); 608 609 /* If the socket fails to close, the best choice is to 610 * leak the fd but continue to free the rest of the sock 611 * memory. */ 612 close(sock->fd); 613 614 spdk_pipe_destroy(sock->recv_pipe); 615 free(sock->recv_buf); 616 free(sock); 617 618 return 0; 619 } 620 621 static ssize_t 622 uring_sock_recv_from_pipe(struct spdk_uring_sock *sock, struct iovec *diov, int diovcnt) 623 { 624 struct iovec siov[2]; 625 int sbytes; 626 ssize_t bytes; 627 struct spdk_uring_sock_group_impl *group; 628 629 sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov); 630 if (sbytes < 0) { 631 errno = EINVAL; 632 return -1; 633 } else if (sbytes == 0) { 634 errno = EAGAIN; 635 return -1; 636 } 637 638 bytes = spdk_iovcpy(siov, 2, diov, diovcnt); 639 640 if (bytes == 0) { 641 /* The only way this happens is if diov is 0 length */ 642 errno = EINVAL; 643 return -1; 644 } 645 646 spdk_pipe_reader_advance(sock->recv_pipe, bytes); 647 648 /* If we drained the pipe, take it off the level-triggered list */ 649 if (sock->base.group_impl && spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) { 650 group = __uring_group_impl(sock->base.group_impl); 651 TAILQ_REMOVE(&group->pending_recv, sock, link); 652 sock->pending_recv = false; 653 } 654 655 return bytes; 656 } 657 658 static inline ssize_t 659 uring_sock_read(struct spdk_uring_sock *sock) 660 { 661 struct iovec iov[2]; 662 int bytes; 663 struct spdk_uring_sock_group_impl *group; 664 665 bytes = spdk_pipe_writer_get_buffer(sock->recv_pipe, sock->recv_buf_sz, iov); 666 667 if (bytes > 0) { 668 bytes = readv(sock->fd, iov, 2); 669 if (bytes > 0) { 670 spdk_pipe_writer_advance(sock->recv_pipe, bytes); 671 if (sock->base.group_impl) { 672 group = __uring_group_impl(sock->base.group_impl); 673 TAILQ_INSERT_TAIL(&group->pending_recv, sock, link); 674 sock->pending_recv = true; 675 } 676 } 677 } 678 679 return bytes; 680 } 681 682 static ssize_t 683 uring_sock_readv(struct spdk_sock *_sock, struct iovec *iov, int iovcnt) 684 { 685 struct spdk_uring_sock *sock = __uring_sock(_sock); 686 int rc, i; 687 size_t len; 688 689 if (sock->recv_pipe == NULL) { 690 return readv(sock->fd, iov, iovcnt); 691 } 692 693 len = 0; 694 for (i = 0; i < iovcnt; i++) { 695 len += iov[i].iov_len; 696 } 697 698 if (spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) { 699 /* If the user is receiving a sufficiently large amount of data, 700 * receive directly to their buffers. */ 701 if (len >= MIN_SOCK_PIPE_SIZE) { 702 return readv(sock->fd, iov, iovcnt); 703 } 704 705 /* Otherwise, do a big read into our pipe */ 706 rc = uring_sock_read(sock); 707 if (rc <= 0) { 708 return rc; 709 } 710 } 711 712 return uring_sock_recv_from_pipe(sock, iov, iovcnt); 713 } 714 715 static ssize_t 716 uring_sock_recv(struct spdk_sock *sock, void *buf, size_t len) 717 { 718 struct iovec iov[1]; 719 720 iov[0].iov_base = buf; 721 iov[0].iov_len = len; 722 723 return uring_sock_readv(sock, iov, 1); 724 } 725 726 static ssize_t 727 uring_sock_writev(struct spdk_sock *_sock, struct iovec *iov, int iovcnt) 728 { 729 struct spdk_uring_sock *sock = __uring_sock(_sock); 730 731 if (sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) { 732 errno = EAGAIN; 733 return -1; 734 } 735 736 return writev(sock->fd, iov, iovcnt); 737 } 738 739 static int 740 sock_complete_reqs(struct spdk_sock *_sock, ssize_t rc) 741 { 742 struct spdk_sock_request *req; 743 int i, retval; 744 unsigned int offset; 745 size_t len; 746 747 /* Consume the requests that were actually written */ 748 req = TAILQ_FIRST(&_sock->queued_reqs); 749 while (req) { 750 offset = req->internal.offset; 751 752 for (i = 0; i < req->iovcnt; i++) { 753 /* Advance by the offset first */ 754 if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) { 755 offset -= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len; 756 continue; 757 } 758 759 /* Calculate the remaining length of this element */ 760 len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset; 761 762 if (len > (size_t)rc) { 763 /* This element was partially sent. */ 764 req->internal.offset += rc; 765 return 0; 766 } 767 768 offset = 0; 769 req->internal.offset += len; 770 rc -= len; 771 } 772 773 /* Handled a full request. */ 774 spdk_sock_request_pend(_sock, req); 775 776 retval = spdk_sock_request_put(_sock, req, 0); 777 if (retval) { 778 return retval; 779 } 780 781 if (rc == 0) { 782 break; 783 } 784 785 req = TAILQ_FIRST(&_sock->queued_reqs); 786 } 787 788 return 0; 789 } 790 791 static void 792 _sock_flush(struct spdk_sock *_sock) 793 { 794 struct spdk_uring_sock *sock = __uring_sock(_sock); 795 struct spdk_uring_task *task = &sock->write_task; 796 uint32_t iovcnt; 797 struct io_uring_sqe *sqe; 798 799 if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS) { 800 return; 801 } 802 803 iovcnt = spdk_sock_prep_reqs(&sock->base, task->iovs, task->iov_cnt, &task->last_req); 804 if (!iovcnt) { 805 return; 806 } 807 808 task->iov_cnt = iovcnt; 809 assert(sock->group != NULL); 810 task->msg.msg_iov = task->iovs; 811 task->msg.msg_iovlen = task->iov_cnt; 812 813 sock->group->io_queued++; 814 815 sqe = io_uring_get_sqe(&sock->group->uring); 816 io_uring_prep_sendmsg(sqe, sock->fd, &sock->write_task.msg, 0); 817 io_uring_sqe_set_data(sqe, task); 818 task->status = SPDK_URING_SOCK_TASK_IN_PROCESS; 819 } 820 821 static void 822 _sock_prep_pollin(struct spdk_sock *_sock) 823 { 824 struct spdk_uring_sock *sock = __uring_sock(_sock); 825 struct spdk_uring_task *task = &sock->pollin_task; 826 struct io_uring_sqe *sqe; 827 828 /* Do not prepare pollin event */ 829 if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS || sock->pending_recv) { 830 return; 831 } 832 833 assert(sock->group != NULL); 834 sock->group->io_queued++; 835 836 sqe = io_uring_get_sqe(&sock->group->uring); 837 io_uring_prep_poll_add(sqe, sock->fd, POLLIN); 838 io_uring_sqe_set_data(sqe, task); 839 task->status = SPDK_URING_SOCK_TASK_IN_PROCESS; 840 } 841 842 static void 843 _sock_prep_cancel_task(struct spdk_sock *_sock, void *user_data) 844 { 845 struct spdk_uring_sock *sock = __uring_sock(_sock); 846 struct spdk_uring_task *task = &sock->cancel_task; 847 struct io_uring_sqe *sqe; 848 849 if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS) { 850 return; 851 } 852 853 assert(sock->group != NULL); 854 sock->group->io_queued++; 855 856 sqe = io_uring_get_sqe(&sock->group->uring); 857 io_uring_prep_cancel(sqe, user_data, 0); 858 io_uring_sqe_set_data(sqe, task); 859 task->status = SPDK_URING_SOCK_TASK_IN_PROCESS; 860 } 861 862 static int 863 sock_uring_group_reap(struct spdk_uring_sock_group_impl *group, int max, int max_read_events, 864 struct spdk_sock **socks) 865 { 866 int i, count, ret; 867 struct io_uring_cqe *cqe; 868 struct spdk_uring_sock *sock, *tmp; 869 struct spdk_uring_task *task; 870 int status; 871 872 for (i = 0; i < max; i++) { 873 ret = io_uring_peek_cqe(&group->uring, &cqe); 874 if (ret != 0) { 875 break; 876 } 877 878 if (cqe == NULL) { 879 break; 880 } 881 882 task = (struct spdk_uring_task *)cqe->user_data; 883 assert(task != NULL); 884 sock = task->sock; 885 assert(sock != NULL); 886 assert(sock->group != NULL); 887 assert(sock->group == group); 888 sock->group->io_inflight--; 889 sock->group->io_avail++; 890 status = cqe->res; 891 io_uring_cqe_seen(&group->uring, cqe); 892 893 task->status = SPDK_URING_SOCK_TASK_NOT_IN_USE; 894 895 if (spdk_unlikely(status <= 0)) { 896 if (status == -EAGAIN || status == -EWOULDBLOCK) { 897 continue; 898 } 899 } 900 901 switch (task->type) { 902 case SPDK_SOCK_TASK_POLLIN: 903 if ((status & POLLIN) == POLLIN) { 904 if (sock->base.cb_fn != NULL) { 905 assert(sock->pending_recv == false); 906 sock->pending_recv = true; 907 TAILQ_INSERT_TAIL(&group->pending_recv, sock, link); 908 } 909 } 910 break; 911 case SPDK_SOCK_TASK_WRITE: 912 assert(TAILQ_EMPTY(&sock->base.pending_reqs)); 913 task->last_req = NULL; 914 task->iov_cnt = 0; 915 if (spdk_unlikely(status) < 0) { 916 sock->connection_status = status; 917 spdk_sock_abort_requests(&sock->base); 918 } else { 919 sock_complete_reqs(&sock->base, status); 920 } 921 922 break; 923 case SPDK_SOCK_TASK_CANCEL: 924 /* Do nothing */ 925 break; 926 default: 927 SPDK_UNREACHABLE(); 928 } 929 } 930 931 if (!socks) { 932 return 0; 933 } 934 count = 0; 935 TAILQ_FOREACH_SAFE(sock, &group->pending_recv, link, tmp) { 936 if (count == max_read_events) { 937 break; 938 } 939 940 /* If the socket's cb_fn is NULL, just remove it from 941 * the list and do not add it to socks array */ 942 if (spdk_unlikely(sock->base.cb_fn == NULL)) { 943 sock->pending_recv = false; 944 TAILQ_REMOVE(&group->pending_recv, sock, link); 945 continue; 946 } 947 948 socks[count++] = &sock->base; 949 } 950 951 /* Cycle the pending_recv list so that each time we poll things aren't 952 * in the same order. */ 953 for (i = 0; i < count; i++) { 954 sock = __uring_sock(socks[i]); 955 956 TAILQ_REMOVE(&group->pending_recv, sock, link); 957 958 if (sock->recv_pipe == NULL || spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) { 959 sock->pending_recv = false; 960 } else { 961 TAILQ_INSERT_TAIL(&group->pending_recv, sock, link); 962 } 963 } 964 965 return count; 966 } 967 968 static int 969 _sock_flush_client(struct spdk_sock *_sock) 970 { 971 struct spdk_uring_sock *sock = __uring_sock(_sock); 972 struct msghdr msg = {}; 973 struct iovec iovs[IOV_BATCH_SIZE]; 974 int iovcnt; 975 ssize_t rc; 976 977 /* Can't flush from within a callback or we end up with recursive calls */ 978 if (_sock->cb_cnt > 0) { 979 return 0; 980 } 981 982 /* Gather an iov */ 983 iovcnt = spdk_sock_prep_reqs(_sock, iovs, 0, NULL); 984 if (iovcnt == 0) { 985 return 0; 986 } 987 988 /* Perform the vectored write */ 989 msg.msg_iov = iovs; 990 msg.msg_iovlen = iovcnt; 991 rc = sendmsg(sock->fd, &msg, 0); 992 if (rc <= 0) { 993 if (errno == EAGAIN || errno == EWOULDBLOCK) { 994 return 0; 995 } 996 return rc; 997 } 998 999 sock_complete_reqs(_sock, rc); 1000 1001 return 0; 1002 } 1003 1004 static void 1005 uring_sock_writev_async(struct spdk_sock *_sock, struct spdk_sock_request *req) 1006 { 1007 struct spdk_uring_sock *sock = __uring_sock(_sock); 1008 int rc; 1009 1010 if (spdk_unlikely(sock->connection_status)) { 1011 req->cb_fn(req->cb_arg, sock->connection_status); 1012 return; 1013 } 1014 1015 spdk_sock_request_queue(_sock, req); 1016 1017 if (!sock->group) { 1018 if (_sock->queued_iovcnt >= IOV_BATCH_SIZE) { 1019 rc = _sock_flush_client(_sock); 1020 if (rc) { 1021 spdk_sock_abort_requests(_sock); 1022 } 1023 } 1024 } 1025 } 1026 1027 static int 1028 uring_sock_set_recvlowat(struct spdk_sock *_sock, int nbytes) 1029 { 1030 struct spdk_uring_sock *sock = __uring_sock(_sock); 1031 int val; 1032 int rc; 1033 1034 assert(sock != NULL); 1035 1036 val = nbytes; 1037 rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVLOWAT, &val, sizeof val); 1038 if (rc != 0) { 1039 return -1; 1040 } 1041 return 0; 1042 } 1043 1044 static bool 1045 uring_sock_is_ipv6(struct spdk_sock *_sock) 1046 { 1047 struct spdk_uring_sock *sock = __uring_sock(_sock); 1048 struct sockaddr_storage sa; 1049 socklen_t salen; 1050 int rc; 1051 1052 assert(sock != NULL); 1053 1054 memset(&sa, 0, sizeof sa); 1055 salen = sizeof sa; 1056 rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen); 1057 if (rc != 0) { 1058 SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno); 1059 return false; 1060 } 1061 1062 return (sa.ss_family == AF_INET6); 1063 } 1064 1065 static bool 1066 uring_sock_is_ipv4(struct spdk_sock *_sock) 1067 { 1068 struct spdk_uring_sock *sock = __uring_sock(_sock); 1069 struct sockaddr_storage sa; 1070 socklen_t salen; 1071 int rc; 1072 1073 assert(sock != NULL); 1074 1075 memset(&sa, 0, sizeof sa); 1076 salen = sizeof sa; 1077 rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen); 1078 if (rc != 0) { 1079 SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno); 1080 return false; 1081 } 1082 1083 return (sa.ss_family == AF_INET); 1084 } 1085 1086 static bool 1087 uring_sock_is_connected(struct spdk_sock *_sock) 1088 { 1089 struct spdk_uring_sock *sock = __uring_sock(_sock); 1090 uint8_t byte; 1091 int rc; 1092 1093 rc = recv(sock->fd, &byte, 1, MSG_PEEK); 1094 if (rc == 0) { 1095 return false; 1096 } 1097 1098 if (rc < 0) { 1099 if (errno == EAGAIN || errno == EWOULDBLOCK) { 1100 return true; 1101 } 1102 1103 return false; 1104 } 1105 1106 return true; 1107 } 1108 1109 static struct spdk_sock_group_impl * 1110 uring_sock_group_impl_get_optimal(struct spdk_sock *_sock) 1111 { 1112 struct spdk_uring_sock *sock = __uring_sock(_sock); 1113 struct spdk_sock_group_impl *group; 1114 1115 if (sock->placement_id != -1) { 1116 spdk_sock_map_lookup(&g_map, sock->placement_id, &group); 1117 return group; 1118 } 1119 1120 return NULL; 1121 } 1122 1123 static struct spdk_sock_group_impl * 1124 uring_sock_group_impl_create(void) 1125 { 1126 struct spdk_uring_sock_group_impl *group_impl; 1127 1128 group_impl = calloc(1, sizeof(*group_impl)); 1129 if (group_impl == NULL) { 1130 SPDK_ERRLOG("group_impl allocation failed\n"); 1131 return NULL; 1132 } 1133 1134 group_impl->io_avail = SPDK_SOCK_GROUP_QUEUE_DEPTH; 1135 1136 if (io_uring_queue_init(SPDK_SOCK_GROUP_QUEUE_DEPTH, &group_impl->uring, 0) < 0) { 1137 SPDK_ERRLOG("uring I/O context setup failure\n"); 1138 free(group_impl); 1139 return NULL; 1140 } 1141 1142 TAILQ_INIT(&group_impl->pending_recv); 1143 1144 if (g_spdk_uring_sock_impl_opts.enable_placement_id == PLACEMENT_CPU) { 1145 spdk_sock_map_insert(&g_map, spdk_env_get_current_core(), &group_impl->base); 1146 } 1147 1148 return &group_impl->base; 1149 } 1150 1151 static int 1152 uring_sock_group_impl_add_sock(struct spdk_sock_group_impl *_group, 1153 struct spdk_sock *_sock) 1154 { 1155 struct spdk_uring_sock *sock = __uring_sock(_sock); 1156 struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group); 1157 int rc; 1158 1159 sock->group = group; 1160 sock->write_task.sock = sock; 1161 sock->write_task.type = SPDK_SOCK_TASK_WRITE; 1162 1163 sock->pollin_task.sock = sock; 1164 sock->pollin_task.type = SPDK_SOCK_TASK_POLLIN; 1165 1166 sock->cancel_task.sock = sock; 1167 sock->cancel_task.type = SPDK_SOCK_TASK_CANCEL; 1168 1169 /* switched from another polling group due to scheduling */ 1170 if (spdk_unlikely(sock->recv_pipe != NULL && 1171 (spdk_pipe_reader_bytes_available(sock->recv_pipe) > 0))) { 1172 assert(sock->pending_recv == false); 1173 sock->pending_recv = true; 1174 TAILQ_INSERT_TAIL(&group->pending_recv, sock, link); 1175 } 1176 1177 if (sock->placement_id != -1) { 1178 rc = spdk_sock_map_insert(&g_map, sock->placement_id, &group->base); 1179 if (rc != 0) { 1180 SPDK_ERRLOG("Failed to insert sock group into map: %d", rc); 1181 /* Do not treat this as an error. The system will continue running. */ 1182 } 1183 } 1184 1185 return 0; 1186 } 1187 1188 static int 1189 uring_sock_group_impl_poll(struct spdk_sock_group_impl *_group, int max_events, 1190 struct spdk_sock **socks) 1191 { 1192 struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group); 1193 int count, ret; 1194 int to_complete, to_submit; 1195 struct spdk_sock *_sock, *tmp; 1196 struct spdk_uring_sock *sock; 1197 1198 if (spdk_likely(socks)) { 1199 TAILQ_FOREACH_SAFE(_sock, &group->base.socks, link, tmp) { 1200 sock = __uring_sock(_sock); 1201 if (spdk_unlikely(sock->connection_status)) { 1202 continue; 1203 } 1204 _sock_flush(_sock); 1205 _sock_prep_pollin(_sock); 1206 } 1207 } 1208 1209 to_submit = group->io_queued; 1210 1211 /* For network I/O, it cannot be set with O_DIRECT, so we do not need to call spdk_io_uring_enter */ 1212 if (to_submit > 0) { 1213 /* If there are I/O to submit, use io_uring_submit here. 1214 * It will automatically call io_uring_enter appropriately. */ 1215 ret = io_uring_submit(&group->uring); 1216 if (ret < 0) { 1217 return 1; 1218 } 1219 group->io_queued = 0; 1220 group->io_inflight += to_submit; 1221 group->io_avail -= to_submit; 1222 } 1223 1224 count = 0; 1225 to_complete = group->io_inflight; 1226 if (to_complete > 0 || !TAILQ_EMPTY(&group->pending_recv)) { 1227 count = sock_uring_group_reap(group, to_complete, max_events, socks); 1228 } 1229 1230 return count; 1231 } 1232 1233 static int 1234 uring_sock_group_impl_remove_sock(struct spdk_sock_group_impl *_group, 1235 struct spdk_sock *_sock) 1236 { 1237 struct spdk_uring_sock *sock = __uring_sock(_sock); 1238 struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group); 1239 1240 if (sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) { 1241 _sock_prep_cancel_task(_sock, &sock->write_task); 1242 /* Since spdk_sock_group_remove_sock is not asynchronous interface, so 1243 * currently can use a while loop here. */ 1244 while ((sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) || 1245 (sock->cancel_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE)) { 1246 uring_sock_group_impl_poll(_group, 32, NULL); 1247 } 1248 } 1249 1250 if (sock->pollin_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) { 1251 _sock_prep_cancel_task(_sock, &sock->pollin_task); 1252 /* Since spdk_sock_group_remove_sock is not asynchronous interface, so 1253 * currently can use a while loop here. */ 1254 while ((sock->pollin_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) || 1255 (sock->cancel_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE)) { 1256 uring_sock_group_impl_poll(_group, 32, NULL); 1257 } 1258 } 1259 1260 if (sock->pending_recv) { 1261 TAILQ_REMOVE(&group->pending_recv, sock, link); 1262 sock->pending_recv = false; 1263 } 1264 assert(sock->pending_recv == false); 1265 1266 if (sock->placement_id != -1) { 1267 spdk_sock_map_release(&g_map, sock->placement_id); 1268 } 1269 1270 sock->group = NULL; 1271 return 0; 1272 } 1273 1274 static int 1275 uring_sock_group_impl_close(struct spdk_sock_group_impl *_group) 1276 { 1277 struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group); 1278 1279 /* try to reap all the active I/O */ 1280 while (group->io_inflight) { 1281 uring_sock_group_impl_poll(_group, 32, NULL); 1282 } 1283 assert(group->io_inflight == 0); 1284 assert(group->io_avail == SPDK_SOCK_GROUP_QUEUE_DEPTH); 1285 1286 io_uring_queue_exit(&group->uring); 1287 1288 if (g_spdk_uring_sock_impl_opts.enable_placement_id == PLACEMENT_CPU) { 1289 spdk_sock_map_release(&g_map, spdk_env_get_current_core()); 1290 } 1291 1292 free(group); 1293 return 0; 1294 } 1295 1296 static int 1297 uring_sock_impl_get_opts(struct spdk_sock_impl_opts *opts, size_t *len) 1298 { 1299 if (!opts || !len) { 1300 errno = EINVAL; 1301 return -1; 1302 } 1303 memset(opts, 0, *len); 1304 1305 #define FIELD_OK(field) \ 1306 offsetof(struct spdk_sock_impl_opts, field) + sizeof(opts->field) <= *len 1307 1308 #define GET_FIELD(field) \ 1309 if (FIELD_OK(field)) { \ 1310 opts->field = g_spdk_uring_sock_impl_opts.field; \ 1311 } 1312 1313 GET_FIELD(recv_buf_size); 1314 GET_FIELD(send_buf_size); 1315 GET_FIELD(enable_recv_pipe); 1316 GET_FIELD(enable_quickack); 1317 GET_FIELD(enable_placement_id); 1318 1319 #undef GET_FIELD 1320 #undef FIELD_OK 1321 1322 *len = spdk_min(*len, sizeof(g_spdk_uring_sock_impl_opts)); 1323 return 0; 1324 } 1325 1326 static int 1327 uring_sock_impl_set_opts(const struct spdk_sock_impl_opts *opts, size_t len) 1328 { 1329 if (!opts) { 1330 errno = EINVAL; 1331 return -1; 1332 } 1333 1334 #define FIELD_OK(field) \ 1335 offsetof(struct spdk_sock_impl_opts, field) + sizeof(opts->field) <= len 1336 1337 #define SET_FIELD(field) \ 1338 if (FIELD_OK(field)) { \ 1339 g_spdk_uring_sock_impl_opts.field = opts->field; \ 1340 } 1341 1342 SET_FIELD(recv_buf_size); 1343 SET_FIELD(send_buf_size); 1344 SET_FIELD(enable_recv_pipe); 1345 SET_FIELD(enable_quickack); 1346 SET_FIELD(enable_placement_id); 1347 1348 #undef SET_FIELD 1349 #undef FIELD_OK 1350 1351 return 0; 1352 } 1353 1354 static int 1355 uring_sock_flush(struct spdk_sock *_sock) 1356 { 1357 struct spdk_uring_sock *sock = __uring_sock(_sock); 1358 1359 if (!sock->group) { 1360 return _sock_flush_client(_sock); 1361 } 1362 1363 return 0; 1364 } 1365 1366 static struct spdk_net_impl g_uring_net_impl = { 1367 .name = "uring", 1368 .getaddr = uring_sock_getaddr, 1369 .connect = uring_sock_connect, 1370 .listen = uring_sock_listen, 1371 .accept = uring_sock_accept, 1372 .close = uring_sock_close, 1373 .recv = uring_sock_recv, 1374 .readv = uring_sock_readv, 1375 .writev = uring_sock_writev, 1376 .writev_async = uring_sock_writev_async, 1377 .flush = uring_sock_flush, 1378 .set_recvlowat = uring_sock_set_recvlowat, 1379 .set_recvbuf = uring_sock_set_recvbuf, 1380 .set_sendbuf = uring_sock_set_sendbuf, 1381 .is_ipv6 = uring_sock_is_ipv6, 1382 .is_ipv4 = uring_sock_is_ipv4, 1383 .is_connected = uring_sock_is_connected, 1384 .group_impl_get_optimal = uring_sock_group_impl_get_optimal, 1385 .group_impl_create = uring_sock_group_impl_create, 1386 .group_impl_add_sock = uring_sock_group_impl_add_sock, 1387 .group_impl_remove_sock = uring_sock_group_impl_remove_sock, 1388 .group_impl_poll = uring_sock_group_impl_poll, 1389 .group_impl_close = uring_sock_group_impl_close, 1390 .get_opts = uring_sock_impl_get_opts, 1391 .set_opts = uring_sock_impl_set_opts, 1392 }; 1393 1394 SPDK_NET_IMPL_REGISTER(uring, &g_uring_net_impl, DEFAULT_SOCK_PRIORITY + 1); 1395