1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 #include "spdk/config.h" 36 37 #include <linux/errqueue.h> 38 #include <sys/epoll.h> 39 #include <liburing.h> 40 41 #include "spdk/barrier.h" 42 #include "spdk/env.h" 43 #include "spdk/log.h" 44 #include "spdk/pipe.h" 45 #include "spdk/sock.h" 46 #include "spdk/string.h" 47 #include "spdk/util.h" 48 49 #include "spdk_internal/sock.h" 50 #include "spdk_internal/assert.h" 51 #include "../sock_kernel.h" 52 53 #define MAX_TMPBUF 1024 54 #define PORTNUMLEN 32 55 #define SPDK_SOCK_GROUP_QUEUE_DEPTH 4096 56 #define SPDK_SOCK_CMG_INFO_SIZE (sizeof(struct cmsghdr) + sizeof(struct sock_extended_err)) 57 58 enum spdk_sock_task_type { 59 SPDK_SOCK_TASK_POLLIN = 0, 60 SPDK_SOCK_TASK_RECV, 61 SPDK_SOCK_TASK_WRITE, 62 SPDK_SOCK_TASK_CANCEL, 63 }; 64 65 #if defined(SO_ZEROCOPY) && defined(MSG_ZEROCOPY) 66 #define SPDK_ZEROCOPY 67 #endif 68 69 enum spdk_uring_sock_task_status { 70 SPDK_URING_SOCK_TASK_NOT_IN_USE = 0, 71 SPDK_URING_SOCK_TASK_IN_PROCESS, 72 }; 73 74 struct spdk_uring_task { 75 enum spdk_uring_sock_task_status status; 76 enum spdk_sock_task_type type; 77 struct spdk_uring_sock *sock; 78 struct msghdr msg; 79 struct iovec iovs[IOV_BATCH_SIZE]; 80 int iov_cnt; 81 struct spdk_sock_request *last_req; 82 STAILQ_ENTRY(spdk_uring_task) link; 83 }; 84 85 struct spdk_uring_sock { 86 struct spdk_sock base; 87 int fd; 88 uint32_t sendmsg_idx; 89 struct spdk_uring_sock_group_impl *group; 90 struct spdk_uring_task write_task; 91 struct spdk_uring_task recv_task; 92 struct spdk_uring_task pollin_task; 93 struct spdk_uring_task cancel_task; 94 struct spdk_pipe *recv_pipe; 95 void *recv_buf; 96 int recv_buf_sz; 97 bool zcopy; 98 bool pending_recv; 99 int zcopy_send_flags; 100 int connection_status; 101 int placement_id; 102 uint8_t buf[SPDK_SOCK_CMG_INFO_SIZE]; 103 TAILQ_ENTRY(spdk_uring_sock) link; 104 }; 105 106 TAILQ_HEAD(pending_recv_list, spdk_uring_sock); 107 108 struct spdk_uring_sock_group_impl { 109 struct spdk_sock_group_impl base; 110 struct io_uring uring; 111 uint32_t io_inflight; 112 uint32_t io_queued; 113 uint32_t io_avail; 114 struct pending_recv_list pending_recv; 115 }; 116 117 static struct spdk_sock_impl_opts g_spdk_uring_sock_impl_opts = { 118 .recv_buf_size = MIN_SO_RCVBUF_SIZE, 119 .send_buf_size = MIN_SO_SNDBUF_SIZE, 120 .enable_recv_pipe = true, 121 .enable_quickack = false, 122 .enable_placement_id = PLACEMENT_NONE, 123 .enable_zerocopy_send_server = false, 124 .enable_zerocopy_send_client = false, 125 }; 126 127 static struct spdk_sock_map g_map = { 128 .entries = STAILQ_HEAD_INITIALIZER(g_map.entries), 129 .mtx = PTHREAD_MUTEX_INITIALIZER 130 }; 131 132 __attribute((destructor)) static void 133 uring_sock_map_cleanup(void) 134 { 135 spdk_sock_map_cleanup(&g_map); 136 } 137 138 #define SPDK_URING_SOCK_REQUEST_IOV(req) ((struct iovec *)((uint8_t *)req + sizeof(struct spdk_sock_request))) 139 140 #define __uring_sock(sock) (struct spdk_uring_sock *)sock 141 #define __uring_group_impl(group) (struct spdk_uring_sock_group_impl *)group 142 143 static int 144 uring_sock_getaddr(struct spdk_sock *_sock, char *saddr, int slen, uint16_t *sport, 145 char *caddr, int clen, uint16_t *cport) 146 { 147 struct spdk_uring_sock *sock = __uring_sock(_sock); 148 struct sockaddr_storage sa; 149 socklen_t salen; 150 int rc; 151 152 assert(sock != NULL); 153 154 memset(&sa, 0, sizeof sa); 155 salen = sizeof sa; 156 rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen); 157 if (rc != 0) { 158 SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno); 159 return -1; 160 } 161 162 switch (sa.ss_family) { 163 case AF_UNIX: 164 /* Acceptable connection types that don't have IPs */ 165 return 0; 166 case AF_INET: 167 case AF_INET6: 168 /* Code below will get IP addresses */ 169 break; 170 default: 171 /* Unsupported socket family */ 172 return -1; 173 } 174 175 rc = get_addr_str((struct sockaddr *)&sa, saddr, slen); 176 if (rc != 0) { 177 SPDK_ERRLOG("getnameinfo() failed (errno=%d)\n", errno); 178 return -1; 179 } 180 181 if (sport) { 182 if (sa.ss_family == AF_INET) { 183 *sport = ntohs(((struct sockaddr_in *) &sa)->sin_port); 184 } else if (sa.ss_family == AF_INET6) { 185 *sport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port); 186 } 187 } 188 189 memset(&sa, 0, sizeof sa); 190 salen = sizeof sa; 191 rc = getpeername(sock->fd, (struct sockaddr *) &sa, &salen); 192 if (rc != 0) { 193 SPDK_ERRLOG("getpeername() failed (errno=%d)\n", errno); 194 return -1; 195 } 196 197 rc = get_addr_str((struct sockaddr *)&sa, caddr, clen); 198 if (rc != 0) { 199 SPDK_ERRLOG("getnameinfo() failed (errno=%d)\n", errno); 200 return -1; 201 } 202 203 if (cport) { 204 if (sa.ss_family == AF_INET) { 205 *cport = ntohs(((struct sockaddr_in *) &sa)->sin_port); 206 } else if (sa.ss_family == AF_INET6) { 207 *cport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port); 208 } 209 } 210 211 return 0; 212 } 213 214 enum uring_sock_create_type { 215 SPDK_SOCK_CREATE_LISTEN, 216 SPDK_SOCK_CREATE_CONNECT, 217 }; 218 219 static int 220 uring_sock_alloc_pipe(struct spdk_uring_sock *sock, int sz) 221 { 222 uint8_t *new_buf; 223 struct spdk_pipe *new_pipe; 224 struct iovec siov[2]; 225 struct iovec diov[2]; 226 int sbytes; 227 ssize_t bytes; 228 229 if (sock->recv_buf_sz == sz) { 230 return 0; 231 } 232 233 /* If the new size is 0, just free the pipe */ 234 if (sz == 0) { 235 spdk_pipe_destroy(sock->recv_pipe); 236 free(sock->recv_buf); 237 sock->recv_pipe = NULL; 238 sock->recv_buf = NULL; 239 return 0; 240 } else if (sz < MIN_SOCK_PIPE_SIZE) { 241 SPDK_ERRLOG("The size of the pipe must be larger than %d\n", MIN_SOCK_PIPE_SIZE); 242 return -1; 243 } 244 245 /* Round up to next 64 byte multiple */ 246 new_buf = calloc(SPDK_ALIGN_CEIL(sz + 1, 64), sizeof(uint8_t)); 247 if (!new_buf) { 248 SPDK_ERRLOG("socket recv buf allocation failed\n"); 249 return -ENOMEM; 250 } 251 252 new_pipe = spdk_pipe_create(new_buf, sz + 1); 253 if (new_pipe == NULL) { 254 SPDK_ERRLOG("socket pipe allocation failed\n"); 255 free(new_buf); 256 return -ENOMEM; 257 } 258 259 if (sock->recv_pipe != NULL) { 260 /* Pull all of the data out of the old pipe */ 261 sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov); 262 if (sbytes > sz) { 263 /* Too much data to fit into the new pipe size */ 264 spdk_pipe_destroy(new_pipe); 265 free(new_buf); 266 return -EINVAL; 267 } 268 269 sbytes = spdk_pipe_writer_get_buffer(new_pipe, sz, diov); 270 assert(sbytes == sz); 271 272 bytes = spdk_iovcpy(siov, 2, diov, 2); 273 spdk_pipe_writer_advance(new_pipe, bytes); 274 275 spdk_pipe_destroy(sock->recv_pipe); 276 free(sock->recv_buf); 277 } 278 279 sock->recv_buf_sz = sz; 280 sock->recv_buf = new_buf; 281 sock->recv_pipe = new_pipe; 282 283 return 0; 284 } 285 286 static int 287 uring_sock_set_recvbuf(struct spdk_sock *_sock, int sz) 288 { 289 struct spdk_uring_sock *sock = __uring_sock(_sock); 290 int rc; 291 292 assert(sock != NULL); 293 294 if (g_spdk_uring_sock_impl_opts.enable_recv_pipe) { 295 rc = uring_sock_alloc_pipe(sock, sz); 296 if (rc) { 297 SPDK_ERRLOG("unable to allocate sufficient recvbuf with sz=%d on sock=%p\n", sz, _sock); 298 return rc; 299 } 300 } 301 302 if (sz < MIN_SO_RCVBUF_SIZE) { 303 sz = MIN_SO_RCVBUF_SIZE; 304 } 305 306 rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVBUF, &sz, sizeof(sz)); 307 if (rc < 0) { 308 return rc; 309 } 310 311 return 0; 312 } 313 314 static int 315 uring_sock_set_sendbuf(struct spdk_sock *_sock, int sz) 316 { 317 struct spdk_uring_sock *sock = __uring_sock(_sock); 318 int rc; 319 320 assert(sock != NULL); 321 322 if (sz < MIN_SO_SNDBUF_SIZE) { 323 sz = MIN_SO_SNDBUF_SIZE; 324 } 325 326 rc = setsockopt(sock->fd, SOL_SOCKET, SO_SNDBUF, &sz, sizeof(sz)); 327 if (rc < 0) { 328 return rc; 329 } 330 331 return 0; 332 } 333 334 static struct spdk_uring_sock * 335 uring_sock_alloc(int fd, bool enable_zero_copy) 336 { 337 struct spdk_uring_sock *sock; 338 #if defined(__linux__) 339 int flag; 340 int rc; 341 #endif 342 343 sock = calloc(1, sizeof(*sock)); 344 if (sock == NULL) { 345 SPDK_ERRLOG("sock allocation failed\n"); 346 return NULL; 347 } 348 349 sock->fd = fd; 350 351 #if defined(__linux__) 352 flag = 1; 353 354 if (g_spdk_uring_sock_impl_opts.enable_quickack) { 355 rc = setsockopt(sock->fd, IPPROTO_TCP, TCP_QUICKACK, &flag, sizeof(flag)); 356 if (rc != 0) { 357 SPDK_ERRLOG("quickack was failed to set\n"); 358 } 359 } 360 361 spdk_sock_get_placement_id(sock->fd, g_spdk_uring_sock_impl_opts.enable_placement_id, 362 &sock->placement_id); 363 #ifdef SPDK_ZEROCOPY 364 /* Try to turn on zero copy sends */ 365 flag = 1; 366 367 if (enable_zero_copy) { 368 rc = setsockopt(sock->fd, SOL_SOCKET, SO_ZEROCOPY, &flag, sizeof(flag)); 369 if (rc == 0) { 370 sock->zcopy = true; 371 sock->zcopy_send_flags = MSG_ZEROCOPY; 372 } 373 } 374 #endif 375 #endif 376 377 return sock; 378 } 379 380 static struct spdk_sock * 381 uring_sock_create(const char *ip, int port, 382 enum uring_sock_create_type type, 383 struct spdk_sock_opts *opts) 384 { 385 struct spdk_uring_sock *sock; 386 char buf[MAX_TMPBUF]; 387 char portnum[PORTNUMLEN]; 388 char *p; 389 struct addrinfo hints, *res, *res0; 390 int fd, flag; 391 int val = 1; 392 int rc; 393 bool enable_zcopy_impl_opts = false; 394 bool enable_zcopy_user_opts = true; 395 396 assert(opts != NULL); 397 398 if (ip == NULL) { 399 return NULL; 400 } 401 if (ip[0] == '[') { 402 snprintf(buf, sizeof(buf), "%s", ip + 1); 403 p = strchr(buf, ']'); 404 if (p != NULL) { 405 *p = '\0'; 406 } 407 ip = (const char *) &buf[0]; 408 } 409 410 snprintf(portnum, sizeof portnum, "%d", port); 411 memset(&hints, 0, sizeof hints); 412 hints.ai_family = PF_UNSPEC; 413 hints.ai_socktype = SOCK_STREAM; 414 hints.ai_flags = AI_NUMERICSERV; 415 hints.ai_flags |= AI_PASSIVE; 416 hints.ai_flags |= AI_NUMERICHOST; 417 rc = getaddrinfo(ip, portnum, &hints, &res0); 418 if (rc != 0) { 419 SPDK_ERRLOG("getaddrinfo() failed (errno=%d)\n", errno); 420 return NULL; 421 } 422 423 /* try listen */ 424 fd = -1; 425 for (res = res0; res != NULL; res = res->ai_next) { 426 retry: 427 fd = socket(res->ai_family, res->ai_socktype, res->ai_protocol); 428 if (fd < 0) { 429 /* error */ 430 continue; 431 } 432 433 val = g_spdk_uring_sock_impl_opts.recv_buf_size; 434 rc = setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &val, sizeof val); 435 if (rc) { 436 /* Not fatal */ 437 } 438 439 val = g_spdk_uring_sock_impl_opts.send_buf_size; 440 rc = setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &val, sizeof val); 441 if (rc) { 442 /* Not fatal */ 443 } 444 445 rc = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof val); 446 if (rc != 0) { 447 close(fd); 448 fd = -1; 449 /* error */ 450 continue; 451 } 452 rc = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof val); 453 if (rc != 0) { 454 close(fd); 455 fd = -1; 456 /* error */ 457 continue; 458 } 459 460 #if defined(SO_PRIORITY) 461 if (opts != NULL && opts->priority) { 462 rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &opts->priority, sizeof val); 463 if (rc != 0) { 464 close(fd); 465 fd = -1; 466 /* error */ 467 continue; 468 } 469 } 470 #endif 471 if (res->ai_family == AF_INET6) { 472 rc = setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &val, sizeof val); 473 if (rc != 0) { 474 close(fd); 475 fd = -1; 476 /* error */ 477 continue; 478 } 479 } 480 481 if (type == SPDK_SOCK_CREATE_LISTEN) { 482 rc = bind(fd, res->ai_addr, res->ai_addrlen); 483 if (rc != 0) { 484 SPDK_ERRLOG("bind() failed at port %d, errno = %d\n", port, errno); 485 switch (errno) { 486 case EINTR: 487 /* interrupted? */ 488 close(fd); 489 goto retry; 490 case EADDRNOTAVAIL: 491 SPDK_ERRLOG("IP address %s not available. " 492 "Verify IP address in config file " 493 "and make sure setup script is " 494 "run before starting spdk app.\n", ip); 495 /* FALLTHROUGH */ 496 default: 497 /* try next family */ 498 close(fd); 499 fd = -1; 500 continue; 501 } 502 } 503 /* bind OK */ 504 rc = listen(fd, 512); 505 if (rc != 0) { 506 SPDK_ERRLOG("listen() failed, errno = %d\n", errno); 507 close(fd); 508 fd = -1; 509 break; 510 } 511 enable_zcopy_impl_opts = g_spdk_uring_sock_impl_opts.enable_zerocopy_send_server; 512 } else if (type == SPDK_SOCK_CREATE_CONNECT) { 513 rc = connect(fd, res->ai_addr, res->ai_addrlen); 514 if (rc != 0) { 515 SPDK_ERRLOG("connect() failed, errno = %d\n", errno); 516 /* try next family */ 517 close(fd); 518 fd = -1; 519 continue; 520 } 521 522 enable_zcopy_impl_opts = g_spdk_uring_sock_impl_opts.enable_zerocopy_send_client; 523 } 524 525 flag = fcntl(fd, F_GETFL); 526 if (fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0) { 527 SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno); 528 close(fd); 529 fd = -1; 530 break; 531 } 532 break; 533 } 534 freeaddrinfo(res0); 535 536 if (fd < 0) { 537 return NULL; 538 } 539 540 enable_zcopy_user_opts = opts->zcopy && !sock_is_loopback(fd); 541 sock = uring_sock_alloc(fd, enable_zcopy_user_opts && enable_zcopy_impl_opts); 542 if (sock == NULL) { 543 SPDK_ERRLOG("sock allocation failed\n"); 544 close(fd); 545 return NULL; 546 } 547 548 return &sock->base; 549 } 550 551 static struct spdk_sock * 552 uring_sock_listen(const char *ip, int port, struct spdk_sock_opts *opts) 553 { 554 return uring_sock_create(ip, port, SPDK_SOCK_CREATE_LISTEN, opts); 555 } 556 557 static struct spdk_sock * 558 uring_sock_connect(const char *ip, int port, struct spdk_sock_opts *opts) 559 { 560 return uring_sock_create(ip, port, SPDK_SOCK_CREATE_CONNECT, opts); 561 } 562 563 static struct spdk_sock * 564 uring_sock_accept(struct spdk_sock *_sock) 565 { 566 struct spdk_uring_sock *sock = __uring_sock(_sock); 567 struct sockaddr_storage sa; 568 socklen_t salen; 569 int rc, fd; 570 struct spdk_uring_sock *new_sock; 571 int flag; 572 573 memset(&sa, 0, sizeof(sa)); 574 salen = sizeof(sa); 575 576 assert(sock != NULL); 577 578 rc = accept(sock->fd, (struct sockaddr *)&sa, &salen); 579 580 if (rc == -1) { 581 return NULL; 582 } 583 584 fd = rc; 585 586 flag = fcntl(fd, F_GETFL); 587 if ((!(flag & O_NONBLOCK)) && (fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0)) { 588 SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno); 589 close(fd); 590 return NULL; 591 } 592 593 #if defined(SO_PRIORITY) 594 /* The priority is not inherited, so call this function again */ 595 if (sock->base.opts.priority) { 596 rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &sock->base.opts.priority, sizeof(int)); 597 if (rc != 0) { 598 close(fd); 599 return NULL; 600 } 601 } 602 #endif 603 604 new_sock = uring_sock_alloc(fd, sock->zcopy); 605 if (new_sock == NULL) { 606 close(fd); 607 return NULL; 608 } 609 610 return &new_sock->base; 611 } 612 613 static int 614 uring_sock_close(struct spdk_sock *_sock) 615 { 616 struct spdk_uring_sock *sock = __uring_sock(_sock); 617 618 assert(TAILQ_EMPTY(&_sock->pending_reqs)); 619 assert(sock->group == NULL); 620 621 /* If the socket fails to close, the best choice is to 622 * leak the fd but continue to free the rest of the sock 623 * memory. */ 624 close(sock->fd); 625 626 spdk_pipe_destroy(sock->recv_pipe); 627 free(sock->recv_buf); 628 free(sock); 629 630 return 0; 631 } 632 633 static ssize_t 634 uring_sock_recv_from_pipe(struct spdk_uring_sock *sock, struct iovec *diov, int diovcnt) 635 { 636 struct iovec siov[2]; 637 int sbytes; 638 ssize_t bytes; 639 struct spdk_uring_sock_group_impl *group; 640 641 sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov); 642 if (sbytes < 0) { 643 errno = EINVAL; 644 return -1; 645 } else if (sbytes == 0) { 646 errno = EAGAIN; 647 return -1; 648 } 649 650 bytes = spdk_iovcpy(siov, 2, diov, diovcnt); 651 652 if (bytes == 0) { 653 /* The only way this happens is if diov is 0 length */ 654 errno = EINVAL; 655 return -1; 656 } 657 658 spdk_pipe_reader_advance(sock->recv_pipe, bytes); 659 660 /* If we drained the pipe, take it off the level-triggered list */ 661 if (sock->base.group_impl && spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) { 662 group = __uring_group_impl(sock->base.group_impl); 663 TAILQ_REMOVE(&group->pending_recv, sock, link); 664 sock->pending_recv = false; 665 } 666 667 return bytes; 668 } 669 670 static inline ssize_t 671 uring_sock_read(struct spdk_uring_sock *sock) 672 { 673 struct iovec iov[2]; 674 int bytes; 675 struct spdk_uring_sock_group_impl *group; 676 677 bytes = spdk_pipe_writer_get_buffer(sock->recv_pipe, sock->recv_buf_sz, iov); 678 679 if (bytes > 0) { 680 bytes = readv(sock->fd, iov, 2); 681 if (bytes > 0) { 682 spdk_pipe_writer_advance(sock->recv_pipe, bytes); 683 if (sock->base.group_impl) { 684 group = __uring_group_impl(sock->base.group_impl); 685 TAILQ_INSERT_TAIL(&group->pending_recv, sock, link); 686 sock->pending_recv = true; 687 } 688 } 689 } 690 691 return bytes; 692 } 693 694 static ssize_t 695 uring_sock_readv(struct spdk_sock *_sock, struct iovec *iov, int iovcnt) 696 { 697 struct spdk_uring_sock *sock = __uring_sock(_sock); 698 int rc, i; 699 size_t len; 700 701 if (sock->recv_pipe == NULL) { 702 return readv(sock->fd, iov, iovcnt); 703 } 704 705 len = 0; 706 for (i = 0; i < iovcnt; i++) { 707 len += iov[i].iov_len; 708 } 709 710 if (spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) { 711 /* If the user is receiving a sufficiently large amount of data, 712 * receive directly to their buffers. */ 713 if (len >= MIN_SOCK_PIPE_SIZE) { 714 return readv(sock->fd, iov, iovcnt); 715 } 716 717 /* Otherwise, do a big read into our pipe */ 718 rc = uring_sock_read(sock); 719 if (rc <= 0) { 720 return rc; 721 } 722 } 723 724 return uring_sock_recv_from_pipe(sock, iov, iovcnt); 725 } 726 727 static ssize_t 728 uring_sock_recv(struct spdk_sock *sock, void *buf, size_t len) 729 { 730 struct iovec iov[1]; 731 732 iov[0].iov_base = buf; 733 iov[0].iov_len = len; 734 735 return uring_sock_readv(sock, iov, 1); 736 } 737 738 static ssize_t 739 uring_sock_writev(struct spdk_sock *_sock, struct iovec *iov, int iovcnt) 740 { 741 struct spdk_uring_sock *sock = __uring_sock(_sock); 742 743 if (sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) { 744 errno = EAGAIN; 745 return -1; 746 } 747 748 return writev(sock->fd, iov, iovcnt); 749 } 750 751 static int 752 sock_complete_reqs(struct spdk_sock *_sock, ssize_t rc) 753 { 754 struct spdk_uring_sock *sock = __uring_sock(_sock); 755 struct spdk_sock_request *req; 756 int i, retval; 757 unsigned int offset; 758 size_t len; 759 760 if (sock->zcopy) { 761 /* Handling overflow case, because we use psock->sendmsg_idx - 1 for the 762 * req->internal.offset, so sendmsg_idx should not be zero */ 763 if (spdk_unlikely(sock->sendmsg_idx == UINT32_MAX)) { 764 sock->sendmsg_idx = 1; 765 } else { 766 sock->sendmsg_idx++; 767 } 768 } 769 770 /* Consume the requests that were actually written */ 771 req = TAILQ_FIRST(&_sock->queued_reqs); 772 while (req) { 773 offset = req->internal.offset; 774 775 for (i = 0; i < req->iovcnt; i++) { 776 /* Advance by the offset first */ 777 if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) { 778 offset -= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len; 779 continue; 780 } 781 782 /* Calculate the remaining length of this element */ 783 len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset; 784 785 if (len > (size_t)rc) { 786 /* This element was partially sent. */ 787 req->internal.offset += rc; 788 return 0; 789 } 790 791 offset = 0; 792 req->internal.offset += len; 793 rc -= len; 794 } 795 796 /* Handled a full request. */ 797 spdk_sock_request_pend(_sock, req); 798 799 if (!sock->zcopy) { 800 retval = spdk_sock_request_put(_sock, req, 0); 801 if (retval) { 802 return retval; 803 } 804 } else { 805 /* Re-use the offset field to hold the sendmsg call index. The 806 * index is 0 based, so subtract one here because we've already 807 * incremented above. */ 808 req->internal.offset = sock->sendmsg_idx - 1; 809 } 810 811 if (rc == 0) { 812 break; 813 } 814 815 req = TAILQ_FIRST(&_sock->queued_reqs); 816 } 817 818 return 0; 819 } 820 821 #ifdef SPDK_ZEROCOPY 822 static int 823 _sock_check_zcopy(struct spdk_sock *_sock, int status) 824 { 825 struct spdk_uring_sock *sock = __uring_sock(_sock); 826 ssize_t rc; 827 struct sock_extended_err *serr; 828 struct cmsghdr *cm; 829 uint32_t idx; 830 struct spdk_sock_request *req, *treq; 831 bool found; 832 833 assert(sock->zcopy == true); 834 if (spdk_unlikely(status) < 0) { 835 if (!TAILQ_EMPTY(&_sock->pending_reqs)) { 836 SPDK_ERRLOG("Attempting to receive from ERRQUEUE yielded error, but pending list still has orphaned entries, status =%d\n", 837 status); 838 } else { 839 SPDK_WARNLOG("Recvmsg yielded an error!\n"); 840 } 841 return 0; 842 } 843 844 cm = CMSG_FIRSTHDR(&sock->recv_task.msg); 845 if (cm->cmsg_level != SOL_IP || cm->cmsg_type != IP_RECVERR) { 846 SPDK_WARNLOG("Unexpected cmsg level or type!\n"); 847 return 0; 848 } 849 850 serr = (struct sock_extended_err *)CMSG_DATA(cm); 851 if (serr->ee_errno != 0 || serr->ee_origin != SO_EE_ORIGIN_ZEROCOPY) { 852 SPDK_WARNLOG("Unexpected extended error origin\n"); 853 return 0; 854 } 855 856 /* Most of the time, the pending_reqs array is in the exact 857 * order we need such that all of the requests to complete are 858 * in order, in the front. It is guaranteed that all requests 859 * belonging to the same sendmsg call are sequential, so once 860 * we encounter one match we can stop looping as soon as a 861 * non-match is found. 862 */ 863 for (idx = serr->ee_info; idx <= serr->ee_data; idx++) { 864 found = false; 865 TAILQ_FOREACH_SAFE(req, &_sock->pending_reqs, internal.link, treq) { 866 if (req->internal.offset == idx) { 867 found = true; 868 rc = spdk_sock_request_put(_sock, req, 0); 869 if (rc < 0) { 870 return rc; 871 } 872 873 } else if (found) { 874 break; 875 } 876 } 877 } 878 879 return 0; 880 } 881 882 static void 883 _sock_prep_recv(struct spdk_sock *_sock) 884 { 885 struct spdk_uring_sock *sock = __uring_sock(_sock); 886 struct spdk_uring_task *task = &sock->recv_task; 887 struct io_uring_sqe *sqe; 888 889 if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS) { 890 return; 891 } 892 893 assert(sock->group != NULL); 894 sock->group->io_queued++; 895 896 sqe = io_uring_get_sqe(&sock->group->uring); 897 io_uring_prep_recvmsg(sqe, sock->fd, &task->msg, MSG_ERRQUEUE); 898 io_uring_sqe_set_data(sqe, task); 899 task->status = SPDK_URING_SOCK_TASK_IN_PROCESS; 900 } 901 902 #endif 903 904 static void 905 _sock_flush(struct spdk_sock *_sock) 906 { 907 struct spdk_uring_sock *sock = __uring_sock(_sock); 908 struct spdk_uring_task *task = &sock->write_task; 909 uint32_t iovcnt; 910 struct io_uring_sqe *sqe; 911 int flags = MSG_DONTWAIT | sock->zcopy_send_flags; 912 913 if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS) { 914 return; 915 } 916 917 iovcnt = spdk_sock_prep_reqs(&sock->base, task->iovs, task->iov_cnt, &task->last_req); 918 if (!iovcnt) { 919 return; 920 } 921 922 task->iov_cnt = iovcnt; 923 assert(sock->group != NULL); 924 task->msg.msg_iov = task->iovs; 925 task->msg.msg_iovlen = task->iov_cnt; 926 927 sock->group->io_queued++; 928 929 sqe = io_uring_get_sqe(&sock->group->uring); 930 io_uring_prep_sendmsg(sqe, sock->fd, &sock->write_task.msg, flags); 931 io_uring_sqe_set_data(sqe, task); 932 task->status = SPDK_URING_SOCK_TASK_IN_PROCESS; 933 } 934 935 static void 936 _sock_prep_pollin(struct spdk_sock *_sock) 937 { 938 struct spdk_uring_sock *sock = __uring_sock(_sock); 939 struct spdk_uring_task *task = &sock->pollin_task; 940 struct io_uring_sqe *sqe; 941 942 /* Do not prepare pollin event */ 943 if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS || (sock->pending_recv && !sock->zcopy)) { 944 return; 945 } 946 947 assert(sock->group != NULL); 948 sock->group->io_queued++; 949 950 sqe = io_uring_get_sqe(&sock->group->uring); 951 io_uring_prep_poll_add(sqe, sock->fd, POLLIN | POLLERR); 952 io_uring_sqe_set_data(sqe, task); 953 task->status = SPDK_URING_SOCK_TASK_IN_PROCESS; 954 } 955 956 static void 957 _sock_prep_cancel_task(struct spdk_sock *_sock, void *user_data) 958 { 959 struct spdk_uring_sock *sock = __uring_sock(_sock); 960 struct spdk_uring_task *task = &sock->cancel_task; 961 struct io_uring_sqe *sqe; 962 963 if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS) { 964 return; 965 } 966 967 assert(sock->group != NULL); 968 sock->group->io_queued++; 969 970 sqe = io_uring_get_sqe(&sock->group->uring); 971 io_uring_prep_cancel(sqe, user_data, 0); 972 io_uring_sqe_set_data(sqe, task); 973 task->status = SPDK_URING_SOCK_TASK_IN_PROCESS; 974 } 975 976 static int 977 sock_uring_group_reap(struct spdk_uring_sock_group_impl *group, int max, int max_read_events, 978 struct spdk_sock **socks) 979 { 980 int i, count, ret; 981 struct io_uring_cqe *cqe; 982 struct spdk_uring_sock *sock, *tmp; 983 struct spdk_uring_task *task; 984 int status; 985 986 for (i = 0; i < max; i++) { 987 ret = io_uring_peek_cqe(&group->uring, &cqe); 988 if (ret != 0) { 989 break; 990 } 991 992 if (cqe == NULL) { 993 break; 994 } 995 996 task = (struct spdk_uring_task *)cqe->user_data; 997 assert(task != NULL); 998 sock = task->sock; 999 assert(sock != NULL); 1000 assert(sock->group != NULL); 1001 assert(sock->group == group); 1002 sock->group->io_inflight--; 1003 sock->group->io_avail++; 1004 status = cqe->res; 1005 io_uring_cqe_seen(&group->uring, cqe); 1006 1007 task->status = SPDK_URING_SOCK_TASK_NOT_IN_USE; 1008 1009 if (spdk_unlikely(status <= 0)) { 1010 if (status == -EAGAIN || status == -EWOULDBLOCK || (status == -ENOBUFS && sock->zcopy)) { 1011 continue; 1012 } 1013 } 1014 1015 switch (task->type) { 1016 case SPDK_SOCK_TASK_POLLIN: 1017 #ifdef SPDK_ZEROCOPY 1018 if ((status & POLLERR) == POLLERR) { 1019 _sock_prep_recv(&sock->base); 1020 } 1021 #endif 1022 if ((status & POLLIN) == POLLIN) { 1023 if (sock->base.cb_fn != NULL && 1024 sock->pending_recv == false) { 1025 sock->pending_recv = true; 1026 TAILQ_INSERT_HEAD(&group->pending_recv, sock, link); 1027 } 1028 } 1029 break; 1030 case SPDK_SOCK_TASK_WRITE: 1031 task->last_req = NULL; 1032 task->iov_cnt = 0; 1033 if (spdk_unlikely(status) < 0) { 1034 sock->connection_status = status; 1035 spdk_sock_abort_requests(&sock->base); 1036 } else { 1037 sock_complete_reqs(&sock->base, status); 1038 } 1039 1040 break; 1041 #ifdef SPDK_ZEROCOPY 1042 case SPDK_SOCK_TASK_RECV: 1043 if (spdk_unlikely(status == -ECANCELED)) { 1044 sock->connection_status = status; 1045 break; 1046 } 1047 _sock_check_zcopy(&sock->base, status); 1048 break; 1049 #endif 1050 case SPDK_SOCK_TASK_CANCEL: 1051 /* Do nothing */ 1052 break; 1053 default: 1054 SPDK_UNREACHABLE(); 1055 } 1056 } 1057 1058 if (!socks) { 1059 return 0; 1060 } 1061 count = 0; 1062 TAILQ_FOREACH_SAFE(sock, &group->pending_recv, link, tmp) { 1063 if (count == max_read_events) { 1064 break; 1065 } 1066 1067 if (spdk_unlikely(sock->base.cb_fn == NULL) || 1068 (sock->recv_pipe == NULL || spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0)) { 1069 sock->pending_recv = false; 1070 TAILQ_REMOVE(&group->pending_recv, sock, link); 1071 if (spdk_unlikely(sock->base.cb_fn == NULL)) { 1072 /* If the socket's cb_fn is NULL, do not add it to socks array */ 1073 continue; 1074 } 1075 } 1076 1077 socks[count++] = &sock->base; 1078 } 1079 1080 1081 /* Cycle the pending_recv list so that each time we poll things aren't 1082 * in the same order. Say we have 6 sockets in the list, named as follows: 1083 * A B C D E F 1084 * And all 6 sockets had the poll events, but max_events is only 3. That means 1085 * psock currently points at D. We want to rearrange the list to the following: 1086 * D E F A B C 1087 * 1088 * The variables below are named according to this example to make it easier to 1089 * follow the swaps. 1090 */ 1091 if (sock != NULL) { 1092 struct spdk_uring_sock *ua, *uc, *ud, *uf; 1093 1094 /* Capture pointers to the elements we need */ 1095 ud = sock; 1096 1097 ua = TAILQ_FIRST(&group->pending_recv); 1098 if (ua == ud) { 1099 goto end; 1100 } 1101 1102 uf = TAILQ_LAST(&group->pending_recv, pending_recv_list); 1103 if (uf == ud) { 1104 TAILQ_REMOVE(&group->pending_recv, ud, link); 1105 TAILQ_INSERT_HEAD(&group->pending_recv, ud, link); 1106 goto end; 1107 } 1108 1109 uc = TAILQ_PREV(ud, pending_recv_list, link); 1110 assert(uc != NULL); 1111 1112 /* Break the link between C and D */ 1113 uc->link.tqe_next = NULL; 1114 1115 /* Connect F to A */ 1116 uf->link.tqe_next = ua; 1117 ua->link.tqe_prev = &uf->link.tqe_next; 1118 1119 /* Fix up the list first/last pointers */ 1120 group->pending_recv.tqh_first = ud; 1121 group->pending_recv.tqh_last = &uc->link.tqe_next; 1122 1123 /* D is in front of the list, make tqe prev pointer point to the head of list */ 1124 ud->link.tqe_prev = &group->pending_recv.tqh_first; 1125 } 1126 1127 end: 1128 return count; 1129 } 1130 1131 static int 1132 _sock_flush_client(struct spdk_sock *_sock) 1133 { 1134 struct spdk_uring_sock *sock = __uring_sock(_sock); 1135 struct msghdr msg = {}; 1136 struct iovec iovs[IOV_BATCH_SIZE]; 1137 int iovcnt; 1138 ssize_t rc; 1139 int flags = sock->zcopy_send_flags; 1140 1141 /* Can't flush from within a callback or we end up with recursive calls */ 1142 if (_sock->cb_cnt > 0) { 1143 return 0; 1144 } 1145 1146 /* Gather an iov */ 1147 iovcnt = spdk_sock_prep_reqs(_sock, iovs, 0, NULL); 1148 if (iovcnt == 0) { 1149 return 0; 1150 } 1151 1152 /* Perform the vectored write */ 1153 msg.msg_iov = iovs; 1154 msg.msg_iovlen = iovcnt; 1155 rc = sendmsg(sock->fd, &msg, flags); 1156 if (rc <= 0) { 1157 if (errno == EAGAIN || errno == EWOULDBLOCK) { 1158 return 0; 1159 } 1160 return rc; 1161 } 1162 1163 sock_complete_reqs(_sock, rc); 1164 1165 #ifdef SPDK_ZEROCOPY 1166 if (sock->zcopy && !TAILQ_EMPTY(&_sock->pending_reqs)) { 1167 _sock_check_zcopy(_sock, 0); 1168 } 1169 #endif 1170 1171 return 0; 1172 } 1173 1174 static void 1175 uring_sock_writev_async(struct spdk_sock *_sock, struct spdk_sock_request *req) 1176 { 1177 struct spdk_uring_sock *sock = __uring_sock(_sock); 1178 int rc; 1179 1180 if (spdk_unlikely(sock->connection_status)) { 1181 req->cb_fn(req->cb_arg, sock->connection_status); 1182 return; 1183 } 1184 1185 spdk_sock_request_queue(_sock, req); 1186 1187 if (!sock->group) { 1188 if (_sock->queued_iovcnt >= IOV_BATCH_SIZE) { 1189 rc = _sock_flush_client(_sock); 1190 if (rc) { 1191 spdk_sock_abort_requests(_sock); 1192 } 1193 } 1194 } 1195 } 1196 1197 static int 1198 uring_sock_set_recvlowat(struct spdk_sock *_sock, int nbytes) 1199 { 1200 struct spdk_uring_sock *sock = __uring_sock(_sock); 1201 int val; 1202 int rc; 1203 1204 assert(sock != NULL); 1205 1206 val = nbytes; 1207 rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVLOWAT, &val, sizeof val); 1208 if (rc != 0) { 1209 return -1; 1210 } 1211 return 0; 1212 } 1213 1214 static bool 1215 uring_sock_is_ipv6(struct spdk_sock *_sock) 1216 { 1217 struct spdk_uring_sock *sock = __uring_sock(_sock); 1218 struct sockaddr_storage sa; 1219 socklen_t salen; 1220 int rc; 1221 1222 assert(sock != NULL); 1223 1224 memset(&sa, 0, sizeof sa); 1225 salen = sizeof sa; 1226 rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen); 1227 if (rc != 0) { 1228 SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno); 1229 return false; 1230 } 1231 1232 return (sa.ss_family == AF_INET6); 1233 } 1234 1235 static bool 1236 uring_sock_is_ipv4(struct spdk_sock *_sock) 1237 { 1238 struct spdk_uring_sock *sock = __uring_sock(_sock); 1239 struct sockaddr_storage sa; 1240 socklen_t salen; 1241 int rc; 1242 1243 assert(sock != NULL); 1244 1245 memset(&sa, 0, sizeof sa); 1246 salen = sizeof sa; 1247 rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen); 1248 if (rc != 0) { 1249 SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno); 1250 return false; 1251 } 1252 1253 return (sa.ss_family == AF_INET); 1254 } 1255 1256 static bool 1257 uring_sock_is_connected(struct spdk_sock *_sock) 1258 { 1259 struct spdk_uring_sock *sock = __uring_sock(_sock); 1260 uint8_t byte; 1261 int rc; 1262 1263 rc = recv(sock->fd, &byte, 1, MSG_PEEK); 1264 if (rc == 0) { 1265 return false; 1266 } 1267 1268 if (rc < 0) { 1269 if (errno == EAGAIN || errno == EWOULDBLOCK) { 1270 return true; 1271 } 1272 1273 return false; 1274 } 1275 1276 return true; 1277 } 1278 1279 static struct spdk_sock_group_impl * 1280 uring_sock_group_impl_get_optimal(struct spdk_sock *_sock) 1281 { 1282 struct spdk_uring_sock *sock = __uring_sock(_sock); 1283 struct spdk_sock_group_impl *group; 1284 1285 if (sock->placement_id != -1) { 1286 spdk_sock_map_lookup(&g_map, sock->placement_id, &group); 1287 return group; 1288 } 1289 1290 return NULL; 1291 } 1292 1293 static struct spdk_sock_group_impl * 1294 uring_sock_group_impl_create(void) 1295 { 1296 struct spdk_uring_sock_group_impl *group_impl; 1297 1298 group_impl = calloc(1, sizeof(*group_impl)); 1299 if (group_impl == NULL) { 1300 SPDK_ERRLOG("group_impl allocation failed\n"); 1301 return NULL; 1302 } 1303 1304 group_impl->io_avail = SPDK_SOCK_GROUP_QUEUE_DEPTH; 1305 1306 if (io_uring_queue_init(SPDK_SOCK_GROUP_QUEUE_DEPTH, &group_impl->uring, 0) < 0) { 1307 SPDK_ERRLOG("uring I/O context setup failure\n"); 1308 free(group_impl); 1309 return NULL; 1310 } 1311 1312 TAILQ_INIT(&group_impl->pending_recv); 1313 1314 if (g_spdk_uring_sock_impl_opts.enable_placement_id == PLACEMENT_CPU) { 1315 spdk_sock_map_insert(&g_map, spdk_env_get_current_core(), &group_impl->base); 1316 } 1317 1318 return &group_impl->base; 1319 } 1320 1321 static int 1322 uring_sock_group_impl_add_sock(struct spdk_sock_group_impl *_group, 1323 struct spdk_sock *_sock) 1324 { 1325 struct spdk_uring_sock *sock = __uring_sock(_sock); 1326 struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group); 1327 int rc; 1328 1329 sock->group = group; 1330 sock->write_task.sock = sock; 1331 sock->write_task.type = SPDK_SOCK_TASK_WRITE; 1332 1333 sock->pollin_task.sock = sock; 1334 sock->pollin_task.type = SPDK_SOCK_TASK_POLLIN; 1335 1336 sock->recv_task.sock = sock; 1337 sock->recv_task.type = SPDK_SOCK_TASK_RECV; 1338 sock->recv_task.msg.msg_control = sock->buf; 1339 sock->recv_task.msg.msg_controllen = sizeof(sock->buf); 1340 1341 sock->cancel_task.sock = sock; 1342 sock->cancel_task.type = SPDK_SOCK_TASK_CANCEL; 1343 1344 /* switched from another polling group due to scheduling */ 1345 if (spdk_unlikely(sock->recv_pipe != NULL && 1346 (spdk_pipe_reader_bytes_available(sock->recv_pipe) > 0))) { 1347 assert(sock->pending_recv == false); 1348 sock->pending_recv = true; 1349 TAILQ_INSERT_TAIL(&group->pending_recv, sock, link); 1350 } 1351 1352 if (sock->placement_id != -1) { 1353 rc = spdk_sock_map_insert(&g_map, sock->placement_id, &group->base); 1354 if (rc != 0) { 1355 SPDK_ERRLOG("Failed to insert sock group into map: %d", rc); 1356 /* Do not treat this as an error. The system will continue running. */ 1357 } 1358 } 1359 1360 return 0; 1361 } 1362 1363 static int 1364 uring_sock_group_impl_poll(struct spdk_sock_group_impl *_group, int max_events, 1365 struct spdk_sock **socks) 1366 { 1367 struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group); 1368 int count, ret; 1369 int to_complete, to_submit; 1370 struct spdk_sock *_sock, *tmp; 1371 struct spdk_uring_sock *sock; 1372 1373 if (spdk_likely(socks)) { 1374 TAILQ_FOREACH_SAFE(_sock, &group->base.socks, link, tmp) { 1375 sock = __uring_sock(_sock); 1376 if (spdk_unlikely(sock->connection_status)) { 1377 continue; 1378 } 1379 _sock_flush(_sock); 1380 _sock_prep_pollin(_sock); 1381 } 1382 } 1383 1384 to_submit = group->io_queued; 1385 1386 /* For network I/O, it cannot be set with O_DIRECT, so we do not need to call spdk_io_uring_enter */ 1387 if (to_submit > 0) { 1388 /* If there are I/O to submit, use io_uring_submit here. 1389 * It will automatically call io_uring_enter appropriately. */ 1390 ret = io_uring_submit(&group->uring); 1391 if (ret < 0) { 1392 return 1; 1393 } 1394 group->io_queued = 0; 1395 group->io_inflight += to_submit; 1396 group->io_avail -= to_submit; 1397 } 1398 1399 count = 0; 1400 to_complete = group->io_inflight; 1401 if (to_complete > 0 || !TAILQ_EMPTY(&group->pending_recv)) { 1402 count = sock_uring_group_reap(group, to_complete, max_events, socks); 1403 } 1404 1405 return count; 1406 } 1407 1408 static int 1409 uring_sock_group_impl_remove_sock(struct spdk_sock_group_impl *_group, 1410 struct spdk_sock *_sock) 1411 { 1412 struct spdk_uring_sock *sock = __uring_sock(_sock); 1413 struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group); 1414 1415 if (sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) { 1416 _sock_prep_cancel_task(_sock, &sock->write_task); 1417 /* Since spdk_sock_group_remove_sock is not asynchronous interface, so 1418 * currently can use a while loop here. */ 1419 while ((sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) || 1420 (sock->cancel_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE)) { 1421 uring_sock_group_impl_poll(_group, 32, NULL); 1422 } 1423 } 1424 1425 if (sock->pollin_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) { 1426 _sock_prep_cancel_task(_sock, &sock->pollin_task); 1427 /* Since spdk_sock_group_remove_sock is not asynchronous interface, so 1428 * currently can use a while loop here. */ 1429 while ((sock->pollin_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) || 1430 (sock->cancel_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE)) { 1431 uring_sock_group_impl_poll(_group, 32, NULL); 1432 } 1433 } 1434 1435 if (sock->recv_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) { 1436 _sock_prep_cancel_task(_sock, &sock->recv_task); 1437 /* Since spdk_sock_group_remove_sock is not asynchronous interface, so 1438 * currently can use a while loop here. */ 1439 while ((sock->recv_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) || 1440 (sock->cancel_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE)) { 1441 uring_sock_group_impl_poll(_group, 32, NULL); 1442 } 1443 } 1444 1445 /* Make sure the cancelling the tasks above didn't cause sending new requests */ 1446 assert(sock->write_task.status == SPDK_URING_SOCK_TASK_NOT_IN_USE); 1447 assert(sock->pollin_task.status == SPDK_URING_SOCK_TASK_NOT_IN_USE); 1448 assert(sock->recv_task.status == SPDK_URING_SOCK_TASK_NOT_IN_USE); 1449 1450 if (sock->pending_recv) { 1451 TAILQ_REMOVE(&group->pending_recv, sock, link); 1452 sock->pending_recv = false; 1453 } 1454 assert(sock->pending_recv == false); 1455 1456 if (sock->placement_id != -1) { 1457 spdk_sock_map_release(&g_map, sock->placement_id); 1458 } 1459 1460 sock->group = NULL; 1461 return 0; 1462 } 1463 1464 static int 1465 uring_sock_group_impl_close(struct spdk_sock_group_impl *_group) 1466 { 1467 struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group); 1468 1469 /* try to reap all the active I/O */ 1470 while (group->io_inflight) { 1471 uring_sock_group_impl_poll(_group, 32, NULL); 1472 } 1473 assert(group->io_inflight == 0); 1474 assert(group->io_avail == SPDK_SOCK_GROUP_QUEUE_DEPTH); 1475 1476 io_uring_queue_exit(&group->uring); 1477 1478 if (g_spdk_uring_sock_impl_opts.enable_placement_id == PLACEMENT_CPU) { 1479 spdk_sock_map_release(&g_map, spdk_env_get_current_core()); 1480 } 1481 1482 free(group); 1483 return 0; 1484 } 1485 1486 static int 1487 uring_sock_impl_get_opts(struct spdk_sock_impl_opts *opts, size_t *len) 1488 { 1489 if (!opts || !len) { 1490 errno = EINVAL; 1491 return -1; 1492 } 1493 memset(opts, 0, *len); 1494 1495 #define FIELD_OK(field) \ 1496 offsetof(struct spdk_sock_impl_opts, field) + sizeof(opts->field) <= *len 1497 1498 #define GET_FIELD(field) \ 1499 if (FIELD_OK(field)) { \ 1500 opts->field = g_spdk_uring_sock_impl_opts.field; \ 1501 } 1502 1503 GET_FIELD(recv_buf_size); 1504 GET_FIELD(send_buf_size); 1505 GET_FIELD(enable_recv_pipe); 1506 GET_FIELD(enable_quickack); 1507 GET_FIELD(enable_placement_id); 1508 GET_FIELD(enable_zerocopy_send_server); 1509 GET_FIELD(enable_zerocopy_send_client); 1510 1511 #undef GET_FIELD 1512 #undef FIELD_OK 1513 1514 *len = spdk_min(*len, sizeof(g_spdk_uring_sock_impl_opts)); 1515 return 0; 1516 } 1517 1518 static int 1519 uring_sock_impl_set_opts(const struct spdk_sock_impl_opts *opts, size_t len) 1520 { 1521 if (!opts) { 1522 errno = EINVAL; 1523 return -1; 1524 } 1525 1526 #define FIELD_OK(field) \ 1527 offsetof(struct spdk_sock_impl_opts, field) + sizeof(opts->field) <= len 1528 1529 #define SET_FIELD(field) \ 1530 if (FIELD_OK(field)) { \ 1531 g_spdk_uring_sock_impl_opts.field = opts->field; \ 1532 } 1533 1534 SET_FIELD(recv_buf_size); 1535 SET_FIELD(send_buf_size); 1536 SET_FIELD(enable_recv_pipe); 1537 SET_FIELD(enable_quickack); 1538 SET_FIELD(enable_placement_id); 1539 SET_FIELD(enable_zerocopy_send_server); 1540 SET_FIELD(enable_zerocopy_send_client); 1541 1542 #undef SET_FIELD 1543 #undef FIELD_OK 1544 1545 return 0; 1546 } 1547 1548 static int 1549 uring_sock_flush(struct spdk_sock *_sock) 1550 { 1551 struct spdk_uring_sock *sock = __uring_sock(_sock); 1552 1553 if (!sock->group) { 1554 return _sock_flush_client(_sock); 1555 } 1556 1557 return 0; 1558 } 1559 1560 static struct spdk_net_impl g_uring_net_impl = { 1561 .name = "uring", 1562 .getaddr = uring_sock_getaddr, 1563 .connect = uring_sock_connect, 1564 .listen = uring_sock_listen, 1565 .accept = uring_sock_accept, 1566 .close = uring_sock_close, 1567 .recv = uring_sock_recv, 1568 .readv = uring_sock_readv, 1569 .writev = uring_sock_writev, 1570 .writev_async = uring_sock_writev_async, 1571 .flush = uring_sock_flush, 1572 .set_recvlowat = uring_sock_set_recvlowat, 1573 .set_recvbuf = uring_sock_set_recvbuf, 1574 .set_sendbuf = uring_sock_set_sendbuf, 1575 .is_ipv6 = uring_sock_is_ipv6, 1576 .is_ipv4 = uring_sock_is_ipv4, 1577 .is_connected = uring_sock_is_connected, 1578 .group_impl_get_optimal = uring_sock_group_impl_get_optimal, 1579 .group_impl_create = uring_sock_group_impl_create, 1580 .group_impl_add_sock = uring_sock_group_impl_add_sock, 1581 .group_impl_remove_sock = uring_sock_group_impl_remove_sock, 1582 .group_impl_poll = uring_sock_group_impl_poll, 1583 .group_impl_close = uring_sock_group_impl_close, 1584 .get_opts = uring_sock_impl_get_opts, 1585 .set_opts = uring_sock_impl_set_opts, 1586 }; 1587 1588 SPDK_NET_IMPL_REGISTER(uring, &g_uring_net_impl, DEFAULT_SOCK_PRIORITY + 1); 1589