1 /* SPDX-License-Identifier: BSD-3-Clause 2 * Copyright (c) Intel Corporation. 3 * All rights reserved. 4 */ 5 6 #include "spdk/stdinc.h" 7 #include "spdk/config.h" 8 9 #include <linux/errqueue.h> 10 #include <sys/epoll.h> 11 #include <liburing.h> 12 13 #include "spdk/barrier.h" 14 #include "spdk/env.h" 15 #include "spdk/log.h" 16 #include "spdk/pipe.h" 17 #include "spdk/sock.h" 18 #include "spdk/string.h" 19 #include "spdk/util.h" 20 21 #include "spdk_internal/sock.h" 22 #include "spdk_internal/assert.h" 23 #include "../sock_kernel.h" 24 25 #define MAX_TMPBUF 1024 26 #define PORTNUMLEN 32 27 #define SPDK_SOCK_GROUP_QUEUE_DEPTH 4096 28 #define SPDK_SOCK_CMG_INFO_SIZE (sizeof(struct cmsghdr) + sizeof(struct sock_extended_err)) 29 30 enum spdk_sock_task_type { 31 SPDK_SOCK_TASK_POLLIN = 0, 32 SPDK_SOCK_TASK_ERRQUEUE, 33 SPDK_SOCK_TASK_WRITE, 34 SPDK_SOCK_TASK_CANCEL, 35 }; 36 37 #if defined(SO_ZEROCOPY) && defined(MSG_ZEROCOPY) 38 #define SPDK_ZEROCOPY 39 #endif 40 41 enum spdk_uring_sock_task_status { 42 SPDK_URING_SOCK_TASK_NOT_IN_USE = 0, 43 SPDK_URING_SOCK_TASK_IN_PROCESS, 44 }; 45 46 struct spdk_uring_task { 47 enum spdk_uring_sock_task_status status; 48 enum spdk_sock_task_type type; 49 struct spdk_uring_sock *sock; 50 struct msghdr msg; 51 struct iovec iovs[IOV_BATCH_SIZE]; 52 int iov_cnt; 53 struct spdk_sock_request *last_req; 54 bool is_zcopy; 55 STAILQ_ENTRY(spdk_uring_task) link; 56 }; 57 58 struct spdk_uring_sock { 59 struct spdk_sock base; 60 int fd; 61 uint32_t sendmsg_idx; 62 struct spdk_uring_sock_group_impl *group; 63 struct spdk_uring_task write_task; 64 struct spdk_uring_task errqueue_task; 65 struct spdk_uring_task pollin_task; 66 struct spdk_uring_task cancel_task; 67 struct spdk_pipe *recv_pipe; 68 void *recv_buf; 69 int recv_buf_sz; 70 bool zcopy; 71 bool pending_recv; 72 int zcopy_send_flags; 73 int connection_status; 74 int placement_id; 75 uint8_t buf[SPDK_SOCK_CMG_INFO_SIZE]; 76 TAILQ_ENTRY(spdk_uring_sock) link; 77 }; 78 79 TAILQ_HEAD(pending_recv_list, spdk_uring_sock); 80 81 struct spdk_uring_sock_group_impl { 82 struct spdk_sock_group_impl base; 83 struct io_uring uring; 84 uint32_t io_inflight; 85 uint32_t io_queued; 86 uint32_t io_avail; 87 struct pending_recv_list pending_recv; 88 }; 89 90 static struct spdk_sock_impl_opts g_spdk_uring_sock_impl_opts = { 91 .recv_buf_size = MIN_SO_RCVBUF_SIZE, 92 .send_buf_size = MIN_SO_SNDBUF_SIZE, 93 .enable_recv_pipe = true, 94 .enable_quickack = false, 95 .enable_placement_id = PLACEMENT_NONE, 96 .enable_zerocopy_send_server = false, 97 .enable_zerocopy_send_client = false, 98 .zerocopy_threshold = 0, 99 .tls_version = 0, 100 .enable_ktls = false, 101 .psk_key = NULL, 102 .psk_identity = NULL 103 }; 104 105 static struct spdk_sock_map g_map = { 106 .entries = STAILQ_HEAD_INITIALIZER(g_map.entries), 107 .mtx = PTHREAD_MUTEX_INITIALIZER 108 }; 109 110 __attribute((destructor)) static void 111 uring_sock_map_cleanup(void) 112 { 113 spdk_sock_map_cleanup(&g_map); 114 } 115 116 #define SPDK_URING_SOCK_REQUEST_IOV(req) ((struct iovec *)((uint8_t *)req + sizeof(struct spdk_sock_request))) 117 118 #define __uring_sock(sock) (struct spdk_uring_sock *)sock 119 #define __uring_group_impl(group) (struct spdk_uring_sock_group_impl *)group 120 121 static void 122 uring_sock_copy_impl_opts(struct spdk_sock_impl_opts *dest, const struct spdk_sock_impl_opts *src, 123 size_t len) 124 { 125 #define FIELD_OK(field) \ 126 offsetof(struct spdk_sock_impl_opts, field) + sizeof(src->field) <= len 127 128 #define SET_FIELD(field) \ 129 if 
(FIELD_OK(field)) { \ 130 dest->field = src->field; \ 131 } 132 133 SET_FIELD(recv_buf_size); 134 SET_FIELD(send_buf_size); 135 SET_FIELD(enable_recv_pipe); 136 SET_FIELD(enable_quickack); 137 SET_FIELD(enable_placement_id); 138 SET_FIELD(enable_zerocopy_send_server); 139 SET_FIELD(enable_zerocopy_send_client); 140 SET_FIELD(zerocopy_threshold); 141 SET_FIELD(tls_version); 142 SET_FIELD(enable_ktls); 143 SET_FIELD(psk_key); 144 SET_FIELD(psk_identity); 145 146 #undef SET_FIELD 147 #undef FIELD_OK 148 } 149 150 static int 151 uring_sock_impl_get_opts(struct spdk_sock_impl_opts *opts, size_t *len) 152 { 153 if (!opts || !len) { 154 errno = EINVAL; 155 return -1; 156 } 157 158 assert(sizeof(*opts) >= *len); 159 memset(opts, 0, *len); 160 161 uring_sock_copy_impl_opts(opts, &g_spdk_uring_sock_impl_opts, *len); 162 *len = spdk_min(*len, sizeof(g_spdk_uring_sock_impl_opts)); 163 164 return 0; 165 } 166 167 static int 168 uring_sock_impl_set_opts(const struct spdk_sock_impl_opts *opts, size_t len) 169 { 170 if (!opts) { 171 errno = EINVAL; 172 return -1; 173 } 174 175 assert(sizeof(*opts) >= len); 176 uring_sock_copy_impl_opts(&g_spdk_uring_sock_impl_opts, opts, len); 177 178 return 0; 179 } 180 181 static void 182 uring_opts_get_impl_opts(const struct spdk_sock_opts *opts, struct spdk_sock_impl_opts *dest) 183 { 184 /* Copy the default impl_opts first to cover cases when user's impl_opts is smaller */ 185 memcpy(dest, &g_spdk_uring_sock_impl_opts, sizeof(*dest)); 186 187 if (opts->impl_opts != NULL) { 188 assert(sizeof(*dest) >= opts->impl_opts_size); 189 uring_sock_copy_impl_opts(dest, opts->impl_opts, opts->impl_opts_size); 190 } 191 } 192 193 static int 194 uring_sock_getaddr(struct spdk_sock *_sock, char *saddr, int slen, uint16_t *sport, 195 char *caddr, int clen, uint16_t *cport) 196 { 197 struct spdk_uring_sock *sock = __uring_sock(_sock); 198 struct sockaddr_storage sa; 199 socklen_t salen; 200 int rc; 201 202 assert(sock != NULL); 203 204 memset(&sa, 0, sizeof sa); 205 salen = sizeof sa; 206 rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen); 207 if (rc != 0) { 208 SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno); 209 return -1; 210 } 211 212 switch (sa.ss_family) { 213 case AF_UNIX: 214 /* Acceptable connection types that don't have IPs */ 215 return 0; 216 case AF_INET: 217 case AF_INET6: 218 /* Code below will get IP addresses */ 219 break; 220 default: 221 /* Unsupported socket family */ 222 return -1; 223 } 224 225 rc = get_addr_str((struct sockaddr *)&sa, saddr, slen); 226 if (rc != 0) { 227 SPDK_ERRLOG("getnameinfo() failed (errno=%d)\n", errno); 228 return -1; 229 } 230 231 if (sport) { 232 if (sa.ss_family == AF_INET) { 233 *sport = ntohs(((struct sockaddr_in *) &sa)->sin_port); 234 } else if (sa.ss_family == AF_INET6) { 235 *sport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port); 236 } 237 } 238 239 memset(&sa, 0, sizeof sa); 240 salen = sizeof sa; 241 rc = getpeername(sock->fd, (struct sockaddr *) &sa, &salen); 242 if (rc != 0) { 243 SPDK_ERRLOG("getpeername() failed (errno=%d)\n", errno); 244 return -1; 245 } 246 247 rc = get_addr_str((struct sockaddr *)&sa, caddr, clen); 248 if (rc != 0) { 249 SPDK_ERRLOG("getnameinfo() failed (errno=%d)\n", errno); 250 return -1; 251 } 252 253 if (cport) { 254 if (sa.ss_family == AF_INET) { 255 *cport = ntohs(((struct sockaddr_in *) &sa)->sin_port); 256 } else if (sa.ss_family == AF_INET6) { 257 *cport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port); 258 } 259 } 260 261 return 0; 262 } 263 264 enum uring_sock_create_type 
{ 265 SPDK_SOCK_CREATE_LISTEN, 266 SPDK_SOCK_CREATE_CONNECT, 267 }; 268 269 static int 270 uring_sock_alloc_pipe(struct spdk_uring_sock *sock, int sz) 271 { 272 uint8_t *new_buf; 273 struct spdk_pipe *new_pipe; 274 struct iovec siov[2]; 275 struct iovec diov[2]; 276 int sbytes; 277 ssize_t bytes; 278 279 if (sock->recv_buf_sz == sz) { 280 return 0; 281 } 282 283 /* If the new size is 0, just free the pipe */ 284 if (sz == 0) { 285 spdk_pipe_destroy(sock->recv_pipe); 286 free(sock->recv_buf); 287 sock->recv_pipe = NULL; 288 sock->recv_buf = NULL; 289 return 0; 290 } else if (sz < MIN_SOCK_PIPE_SIZE) { 291 SPDK_ERRLOG("The size of the pipe must be larger than %d\n", MIN_SOCK_PIPE_SIZE); 292 return -1; 293 } 294 295 /* Round up to next 64 byte multiple */ 296 new_buf = calloc(SPDK_ALIGN_CEIL(sz + 1, 64), sizeof(uint8_t)); 297 if (!new_buf) { 298 SPDK_ERRLOG("socket recv buf allocation failed\n"); 299 return -ENOMEM; 300 } 301 302 new_pipe = spdk_pipe_create(new_buf, sz + 1); 303 if (new_pipe == NULL) { 304 SPDK_ERRLOG("socket pipe allocation failed\n"); 305 free(new_buf); 306 return -ENOMEM; 307 } 308 309 if (sock->recv_pipe != NULL) { 310 /* Pull all of the data out of the old pipe */ 311 sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov); 312 if (sbytes > sz) { 313 /* Too much data to fit into the new pipe size */ 314 spdk_pipe_destroy(new_pipe); 315 free(new_buf); 316 return -EINVAL; 317 } 318 319 sbytes = spdk_pipe_writer_get_buffer(new_pipe, sz, diov); 320 assert(sbytes == sz); 321 322 bytes = spdk_iovcpy(siov, 2, diov, 2); 323 spdk_pipe_writer_advance(new_pipe, bytes); 324 325 spdk_pipe_destroy(sock->recv_pipe); 326 free(sock->recv_buf); 327 } 328 329 sock->recv_buf_sz = sz; 330 sock->recv_buf = new_buf; 331 sock->recv_pipe = new_pipe; 332 333 return 0; 334 } 335 336 static int 337 uring_sock_set_recvbuf(struct spdk_sock *_sock, int sz) 338 { 339 struct spdk_uring_sock *sock = __uring_sock(_sock); 340 int rc; 341 342 assert(sock != NULL); 343 344 if (_sock->impl_opts.enable_recv_pipe) { 345 rc = uring_sock_alloc_pipe(sock, sz); 346 if (rc) { 347 SPDK_ERRLOG("unable to allocate sufficient recvbuf with sz=%d on sock=%p\n", sz, _sock); 348 return rc; 349 } 350 } 351 352 if (sz < MIN_SO_RCVBUF_SIZE) { 353 sz = MIN_SO_RCVBUF_SIZE; 354 } 355 356 rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVBUF, &sz, sizeof(sz)); 357 if (rc < 0) { 358 return rc; 359 } 360 361 _sock->impl_opts.recv_buf_size = sz; 362 363 return 0; 364 } 365 366 static int 367 uring_sock_set_sendbuf(struct spdk_sock *_sock, int sz) 368 { 369 struct spdk_uring_sock *sock = __uring_sock(_sock); 370 int rc; 371 372 assert(sock != NULL); 373 374 if (sz < MIN_SO_SNDBUF_SIZE) { 375 sz = MIN_SO_SNDBUF_SIZE; 376 } 377 378 rc = setsockopt(sock->fd, SOL_SOCKET, SO_SNDBUF, &sz, sizeof(sz)); 379 if (rc < 0) { 380 return rc; 381 } 382 383 _sock->impl_opts.send_buf_size = sz; 384 385 return 0; 386 } 387 388 static struct spdk_uring_sock * 389 uring_sock_alloc(int fd, struct spdk_sock_impl_opts *impl_opts, bool enable_zero_copy) 390 { 391 struct spdk_uring_sock *sock; 392 #if defined(__linux__) 393 int flag; 394 int rc; 395 #endif 396 397 sock = calloc(1, sizeof(*sock)); 398 if (sock == NULL) { 399 SPDK_ERRLOG("sock allocation failed\n"); 400 return NULL; 401 } 402 403 sock->fd = fd; 404 memcpy(&sock->base.impl_opts, impl_opts, sizeof(*impl_opts)); 405 406 #if defined(__linux__) 407 flag = 1; 408 409 if (sock->base.impl_opts.enable_quickack) { 410 rc = setsockopt(sock->fd, IPPROTO_TCP, TCP_QUICKACK, &flag, 
sizeof(flag));
		if (rc != 0) {
			SPDK_ERRLOG("Failed to set TCP_QUICKACK (errno=%d)\n", errno);
		}
	}

	spdk_sock_get_placement_id(sock->fd, sock->base.impl_opts.enable_placement_id,
				   &sock->placement_id);
#ifdef SPDK_ZEROCOPY
	/* Try to turn on zero copy sends */
	flag = 1;

	if (enable_zero_copy) {
		rc = setsockopt(sock->fd, SOL_SOCKET, SO_ZEROCOPY, &flag, sizeof(flag));
		if (rc == 0) {
			sock->zcopy = true;
			sock->zcopy_send_flags = MSG_ZEROCOPY;
		}
	}
#endif
#endif

	return sock;
}

static struct spdk_sock *
uring_sock_create(const char *ip, int port,
		  enum uring_sock_create_type type,
		  struct spdk_sock_opts *opts)
{
	struct spdk_uring_sock *sock;
	struct spdk_sock_impl_opts impl_opts;
	char buf[MAX_TMPBUF];
	char portnum[PORTNUMLEN];
	char *p;
	struct addrinfo hints, *res, *res0;
	int fd, flag;
	int val = 1;
	int rc;
	bool enable_zcopy_impl_opts = false;
	bool enable_zcopy_user_opts = true;

	assert(opts != NULL);
	uring_opts_get_impl_opts(opts, &impl_opts);

	if (ip == NULL) {
		return NULL;
	}
	if (ip[0] == '[') {
		snprintf(buf, sizeof(buf), "%s", ip + 1);
		p = strchr(buf, ']');
		if (p != NULL) {
			*p = '\0';
		}
		ip = (const char *) &buf[0];
	}

	snprintf(portnum, sizeof portnum, "%d", port);
	memset(&hints, 0, sizeof hints);
	hints.ai_family = PF_UNSPEC;
	hints.ai_socktype = SOCK_STREAM;
	hints.ai_flags = AI_NUMERICSERV;
	hints.ai_flags |= AI_PASSIVE;
	hints.ai_flags |= AI_NUMERICHOST;
	rc = getaddrinfo(ip, portnum, &hints, &res0);
	if (rc != 0) {
		/* getaddrinfo() reports failures via its return value, not errno */
		SPDK_ERRLOG("getaddrinfo() failed (rc=%d): %s\n", rc, gai_strerror(rc));
		return NULL;
	}

	/* try listen */
	fd = -1;
	for (res = res0; res != NULL; res = res->ai_next) {
retry:
		fd = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
		if (fd < 0) {
			/* error */
			continue;
		}

		val = impl_opts.recv_buf_size;
		rc = setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &val, sizeof val);
		if (rc) {
			/* Not fatal */
		}

		val = impl_opts.send_buf_size;
		rc = setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &val, sizeof val);
		if (rc) {
			/* Not fatal */
		}

		/* Reset val to a boolean 1; it was overwritten by the buffer sizes above */
		val = 1;
		rc = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof val);
		if (rc != 0) {
			close(fd);
			fd = -1;
			/* error */
			continue;
		}
		rc = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof val);
		if (rc != 0) {
			close(fd);
			fd = -1;
			/* error */
			continue;
		}

		if (opts->ack_timeout) {
#if defined(__linux__)
			val = opts->ack_timeout;
			rc = setsockopt(fd, IPPROTO_TCP, TCP_USER_TIMEOUT, &val, sizeof val);
			if (rc != 0) {
				close(fd);
				fd = -1;
				/* error */
				continue;
			}
#else
			SPDK_WARNLOG("TCP_USER_TIMEOUT is not supported.\n");
#endif
		}

#if defined(SO_PRIORITY)
		if (opts != NULL && opts->priority) {
			rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &opts->priority, sizeof val);
			if (rc != 0) {
				close(fd);
				fd = -1;
				/* error */
				continue;
			}
		}
#endif
		if (res->ai_family == AF_INET6) {
			rc = setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &val, sizeof val);
			if (rc != 0) {
				close(fd);
				fd = -1;
				/* error */
				continue;
			}
		}

		if (type == SPDK_SOCK_CREATE_LISTEN) {
			rc = bind(fd, res->ai_addr, res->ai_addrlen);
			if (rc != 0) {
				SPDK_ERRLOG("bind() failed at port %d, errno = %d\n", port, errno);
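				/* Decide from errno whether to retry this address, move on to the
				 * next addrinfo entry, or give up on the whole create. */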
switch (errno) { 560 case EINTR: 561 /* interrupted? */ 562 close(fd); 563 goto retry; 564 case EADDRNOTAVAIL: 565 SPDK_ERRLOG("IP address %s not available. " 566 "Verify IP address in config file " 567 "and make sure setup script is " 568 "run before starting spdk app.\n", ip); 569 /* FALLTHROUGH */ 570 default: 571 /* try next family */ 572 close(fd); 573 fd = -1; 574 continue; 575 } 576 } 577 /* bind OK */ 578 rc = listen(fd, 512); 579 if (rc != 0) { 580 SPDK_ERRLOG("listen() failed, errno = %d\n", errno); 581 close(fd); 582 fd = -1; 583 break; 584 } 585 586 flag = fcntl(fd, F_GETFL); 587 if (fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0) { 588 SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno); 589 close(fd); 590 fd = -1; 591 break; 592 } 593 594 enable_zcopy_impl_opts = impl_opts.enable_zerocopy_send_server; 595 } else if (type == SPDK_SOCK_CREATE_CONNECT) { 596 rc = connect(fd, res->ai_addr, res->ai_addrlen); 597 if (rc != 0) { 598 SPDK_ERRLOG("connect() failed, errno = %d\n", errno); 599 /* try next family */ 600 close(fd); 601 fd = -1; 602 continue; 603 } 604 605 flag = fcntl(fd, F_GETFL); 606 if (fcntl(fd, F_SETFL, flag & ~O_NONBLOCK) < 0) { 607 SPDK_ERRLOG("fcntl can't set blocking mode for socket, fd: %d (%d)\n", fd, errno); 608 close(fd); 609 fd = -1; 610 break; 611 } 612 613 enable_zcopy_impl_opts = impl_opts.enable_zerocopy_send_client; 614 } 615 break; 616 } 617 freeaddrinfo(res0); 618 619 if (fd < 0) { 620 return NULL; 621 } 622 623 enable_zcopy_user_opts = opts->zcopy && !sock_is_loopback(fd); 624 sock = uring_sock_alloc(fd, &impl_opts, enable_zcopy_user_opts && enable_zcopy_impl_opts); 625 if (sock == NULL) { 626 SPDK_ERRLOG("sock allocation failed\n"); 627 close(fd); 628 return NULL; 629 } 630 631 return &sock->base; 632 } 633 634 static struct spdk_sock * 635 uring_sock_listen(const char *ip, int port, struct spdk_sock_opts *opts) 636 { 637 return uring_sock_create(ip, port, SPDK_SOCK_CREATE_LISTEN, opts); 638 } 639 640 static struct spdk_sock * 641 uring_sock_connect(const char *ip, int port, struct spdk_sock_opts *opts) 642 { 643 return uring_sock_create(ip, port, SPDK_SOCK_CREATE_CONNECT, opts); 644 } 645 646 static struct spdk_sock * 647 uring_sock_accept(struct spdk_sock *_sock) 648 { 649 struct spdk_uring_sock *sock = __uring_sock(_sock); 650 struct sockaddr_storage sa; 651 socklen_t salen; 652 int rc, fd; 653 struct spdk_uring_sock *new_sock; 654 int flag; 655 656 memset(&sa, 0, sizeof(sa)); 657 salen = sizeof(sa); 658 659 assert(sock != NULL); 660 661 rc = accept(sock->fd, (struct sockaddr *)&sa, &salen); 662 663 if (rc == -1) { 664 return NULL; 665 } 666 667 fd = rc; 668 669 flag = fcntl(fd, F_GETFL); 670 if ((flag & O_NONBLOCK) && (fcntl(fd, F_SETFL, flag & ~O_NONBLOCK) < 0)) { 671 SPDK_ERRLOG("fcntl can't set blocking mode for socket, fd: %d (%d)\n", fd, errno); 672 close(fd); 673 return NULL; 674 } 675 676 #if defined(SO_PRIORITY) 677 /* The priority is not inherited, so call this function again */ 678 if (sock->base.opts.priority) { 679 rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &sock->base.opts.priority, sizeof(int)); 680 if (rc != 0) { 681 close(fd); 682 return NULL; 683 } 684 } 685 #endif 686 687 new_sock = uring_sock_alloc(fd, &sock->base.impl_opts, sock->zcopy); 688 if (new_sock == NULL) { 689 close(fd); 690 return NULL; 691 } 692 693 return &new_sock->base; 694 } 695 696 static int 697 uring_sock_close(struct spdk_sock *_sock) 698 { 699 struct spdk_uring_sock *sock = __uring_sock(_sock); 700 701 
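	/* The socket must already have been removed from its group and have no
	 * outstanding requests before it is closed. */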
assert(TAILQ_EMPTY(&_sock->pending_reqs)); 702 assert(sock->group == NULL); 703 704 /* If the socket fails to close, the best choice is to 705 * leak the fd but continue to free the rest of the sock 706 * memory. */ 707 close(sock->fd); 708 709 spdk_pipe_destroy(sock->recv_pipe); 710 free(sock->recv_buf); 711 free(sock); 712 713 return 0; 714 } 715 716 static ssize_t 717 uring_sock_recv_from_pipe(struct spdk_uring_sock *sock, struct iovec *diov, int diovcnt) 718 { 719 struct iovec siov[2]; 720 int sbytes; 721 ssize_t bytes; 722 struct spdk_uring_sock_group_impl *group; 723 724 sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov); 725 if (sbytes < 0) { 726 errno = EINVAL; 727 return -1; 728 } else if (sbytes == 0) { 729 errno = EAGAIN; 730 return -1; 731 } 732 733 bytes = spdk_iovcpy(siov, 2, diov, diovcnt); 734 735 if (bytes == 0) { 736 /* The only way this happens is if diov is 0 length */ 737 errno = EINVAL; 738 return -1; 739 } 740 741 spdk_pipe_reader_advance(sock->recv_pipe, bytes); 742 743 /* If we drained the pipe, take it off the level-triggered list */ 744 if (sock->base.group_impl && spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) { 745 group = __uring_group_impl(sock->base.group_impl); 746 TAILQ_REMOVE(&group->pending_recv, sock, link); 747 sock->pending_recv = false; 748 } 749 750 return bytes; 751 } 752 753 static inline ssize_t 754 sock_readv(int fd, struct iovec *iov, int iovcnt) 755 { 756 struct msghdr msg = { 757 .msg_iov = iov, 758 .msg_iovlen = iovcnt, 759 }; 760 761 return recvmsg(fd, &msg, MSG_DONTWAIT); 762 } 763 764 static inline ssize_t 765 uring_sock_read(struct spdk_uring_sock *sock) 766 { 767 struct iovec iov[2]; 768 int bytes; 769 struct spdk_uring_sock_group_impl *group; 770 771 bytes = spdk_pipe_writer_get_buffer(sock->recv_pipe, sock->recv_buf_sz, iov); 772 773 if (bytes > 0) { 774 bytes = sock_readv(sock->fd, iov, 2); 775 if (bytes > 0) { 776 spdk_pipe_writer_advance(sock->recv_pipe, bytes); 777 if (sock->base.group_impl) { 778 group = __uring_group_impl(sock->base.group_impl); 779 TAILQ_INSERT_TAIL(&group->pending_recv, sock, link); 780 sock->pending_recv = true; 781 } 782 } 783 } 784 785 return bytes; 786 } 787 788 static ssize_t 789 uring_sock_readv(struct spdk_sock *_sock, struct iovec *iov, int iovcnt) 790 { 791 struct spdk_uring_sock *sock = __uring_sock(_sock); 792 int rc, i; 793 size_t len; 794 795 if (sock->recv_pipe == NULL) { 796 return sock_readv(sock->fd, iov, iovcnt); 797 } 798 799 len = 0; 800 for (i = 0; i < iovcnt; i++) { 801 len += iov[i].iov_len; 802 } 803 804 if (spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) { 805 /* If the user is receiving a sufficiently large amount of data, 806 * receive directly to their buffers. 
*/ 807 if (len >= MIN_SOCK_PIPE_SIZE) { 808 return sock_readv(sock->fd, iov, iovcnt); 809 } 810 811 /* Otherwise, do a big read into our pipe */ 812 rc = uring_sock_read(sock); 813 if (rc <= 0) { 814 return rc; 815 } 816 } 817 818 return uring_sock_recv_from_pipe(sock, iov, iovcnt); 819 } 820 821 static ssize_t 822 uring_sock_recv(struct spdk_sock *sock, void *buf, size_t len) 823 { 824 struct iovec iov[1]; 825 826 iov[0].iov_base = buf; 827 iov[0].iov_len = len; 828 829 return uring_sock_readv(sock, iov, 1); 830 } 831 832 static ssize_t 833 uring_sock_writev(struct spdk_sock *_sock, struct iovec *iov, int iovcnt) 834 { 835 struct spdk_uring_sock *sock = __uring_sock(_sock); 836 struct msghdr msg = { 837 .msg_iov = iov, 838 .msg_iovlen = iovcnt, 839 }; 840 841 if (sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) { 842 errno = EAGAIN; 843 return -1; 844 } 845 846 return sendmsg(sock->fd, &msg, MSG_DONTWAIT); 847 } 848 849 static ssize_t 850 sock_request_advance_offset(struct spdk_sock_request *req, ssize_t rc) 851 { 852 unsigned int offset; 853 size_t len; 854 int i; 855 856 offset = req->internal.offset; 857 for (i = 0; i < req->iovcnt; i++) { 858 /* Advance by the offset first */ 859 if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) { 860 offset -= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len; 861 continue; 862 } 863 864 /* Calculate the remaining length of this element */ 865 len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset; 866 867 if (len > (size_t)rc) { 868 req->internal.offset += rc; 869 return -1; 870 } 871 872 offset = 0; 873 req->internal.offset += len; 874 rc -= len; 875 } 876 877 return rc; 878 } 879 880 static int 881 sock_complete_write_reqs(struct spdk_sock *_sock, ssize_t rc, bool is_zcopy) 882 { 883 struct spdk_uring_sock *sock = __uring_sock(_sock); 884 struct spdk_sock_request *req; 885 int retval; 886 887 if (is_zcopy) { 888 /* Handling overflow case, because we use psock->sendmsg_idx - 1 for the 889 * req->internal.offset, so sendmsg_idx should not be zero */ 890 if (spdk_unlikely(sock->sendmsg_idx == UINT32_MAX)) { 891 sock->sendmsg_idx = 1; 892 } else { 893 sock->sendmsg_idx++; 894 } 895 } 896 897 /* Consume the requests that were actually written */ 898 req = TAILQ_FIRST(&_sock->queued_reqs); 899 while (req) { 900 /* req->internal.is_zcopy is true when the whole req or part of it is sent with zerocopy */ 901 req->internal.is_zcopy = is_zcopy; 902 903 rc = sock_request_advance_offset(req, rc); 904 if (rc < 0) { 905 /* This element was partially sent. */ 906 return 0; 907 } 908 909 /* Handled a full request. */ 910 spdk_sock_request_pend(_sock, req); 911 912 if (!req->internal.is_zcopy && req == TAILQ_FIRST(&_sock->pending_reqs)) { 913 retval = spdk_sock_request_put(_sock, req, 0); 914 if (retval) { 915 return retval; 916 } 917 } else { 918 /* Re-use the offset field to hold the sendmsg call index. The 919 * index is 0 based, so subtract one here because we've already 920 * incremented above. 
			 */
			req->internal.offset = sock->sendmsg_idx - 1;
		}

		if (rc == 0) {
			break;
		}

		req = TAILQ_FIRST(&_sock->queued_reqs);
	}

	return 0;
}

#ifdef SPDK_ZEROCOPY
static int
_sock_check_zcopy(struct spdk_sock *_sock, int status)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	ssize_t rc;
	struct sock_extended_err *serr;
	struct cmsghdr *cm;
	uint32_t idx;
	struct spdk_sock_request *req, *treq;
	bool found;

	assert(sock->zcopy == true);
	if (spdk_unlikely(status < 0)) {
		if (!TAILQ_EMPTY(&_sock->pending_reqs)) {
			SPDK_ERRLOG("Attempting to receive from ERRQUEUE yielded error, but pending list still has orphaned entries, status = %d\n",
				    status);
		} else {
			SPDK_WARNLOG("Recvmsg yielded an error!\n");
		}
		return 0;
	}

	cm = CMSG_FIRSTHDR(&sock->errqueue_task.msg);
	if (!((cm->cmsg_level == SOL_IP && cm->cmsg_type == IP_RECVERR) ||
	      (cm->cmsg_level == SOL_IPV6 && cm->cmsg_type == IPV6_RECVERR))) {
		SPDK_WARNLOG("Unexpected cmsg level or type!\n");
		return 0;
	}

	serr = (struct sock_extended_err *)CMSG_DATA(cm);
	if (serr->ee_errno != 0 || serr->ee_origin != SO_EE_ORIGIN_ZEROCOPY) {
		SPDK_WARNLOG("Unexpected extended error origin\n");
		return 0;
	}

	/* Most of the time, the pending_reqs list is in the exact
	 * order we need such that all of the requests to complete are
	 * in order, in the front. It is guaranteed that all requests
	 * belonging to the same sendmsg call are sequential, so once
	 * we encounter one match we can stop looping as soon as a
	 * non-match is found.
	 */
	for (idx = serr->ee_info; idx <= serr->ee_data; idx++) {
		found = false;
		TAILQ_FOREACH_SAFE(req, &_sock->pending_reqs, internal.link, treq) {
			if (!req->internal.is_zcopy) {
				/* This wasn't a zcopy request.
				 * It was just waiting in line to complete. */
				rc = spdk_sock_request_put(_sock, req, 0);
				if (rc < 0) {
					return rc;
				}
			} else if (req->internal.offset == idx) {
				found = true;
				rc = spdk_sock_request_put(_sock, req, 0);
				if (rc < 0) {
					return rc;
				}
			} else if (found) {
				break;
			}
		}
	}

	return 0;
}

static void
_sock_prep_errqueue(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct spdk_uring_task *task = &sock->errqueue_task;
	struct io_uring_sqe *sqe;

	if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS) {
		return;
	}

	assert(sock->group != NULL);
	sock->group->io_queued++;

	sqe = io_uring_get_sqe(&sock->group->uring);
	io_uring_prep_recvmsg(sqe, sock->fd, &task->msg, MSG_ERRQUEUE);
	io_uring_sqe_set_data(sqe, task);
	task->status = SPDK_URING_SOCK_TASK_IN_PROCESS;
}

#endif

static void
_sock_flush(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct spdk_uring_task *task = &sock->write_task;
	uint32_t iovcnt;
	struct io_uring_sqe *sqe;
	int flags;

	if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS) {
		return;
	}

#ifdef SPDK_ZEROCOPY
	if (sock->zcopy) {
		flags = MSG_DONTWAIT | sock->zcopy_send_flags;
	} else
#endif
	{
		flags = MSG_DONTWAIT;
	}

	iovcnt = spdk_sock_prep_reqs(&sock->base, task->iovs, task->iov_cnt, &task->last_req, &flags);
	if (!iovcnt) {
		return;
	}

	task->iov_cnt = iovcnt;
	assert(sock->group != NULL);
	task->msg.msg_iov = task->iovs;
	task->msg.msg_iovlen = task->iov_cnt;
#ifdef SPDK_ZEROCOPY
	task->is_zcopy = (flags & MSG_ZEROCOPY) ?
			 true : false;
#endif
	sock->group->io_queued++;

	sqe = io_uring_get_sqe(&sock->group->uring);
	io_uring_prep_sendmsg(sqe, sock->fd, &sock->write_task.msg, flags);
	io_uring_sqe_set_data(sqe, task);
	task->status = SPDK_URING_SOCK_TASK_IN_PROCESS;
}

static void
_sock_prep_pollin(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct spdk_uring_task *task = &sock->pollin_task;
	struct io_uring_sqe *sqe;

	/* Do not prepare a pollin event if one is already outstanding, or if data is
	 * already sitting in the receive pipe waiting to be consumed (non-zcopy case). */
	if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS || (sock->pending_recv && !sock->zcopy)) {
		return;
	}

	assert(sock->group != NULL);
	sock->group->io_queued++;

	sqe = io_uring_get_sqe(&sock->group->uring);
	io_uring_prep_poll_add(sqe, sock->fd, POLLIN | POLLERR);
	io_uring_sqe_set_data(sqe, task);
	task->status = SPDK_URING_SOCK_TASK_IN_PROCESS;
}

static void
_sock_prep_cancel_task(struct spdk_sock *_sock, void *user_data)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct spdk_uring_task *task = &sock->cancel_task;
	struct io_uring_sqe *sqe;

	if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS) {
		return;
	}

	assert(sock->group != NULL);
	sock->group->io_queued++;

	sqe = io_uring_get_sqe(&sock->group->uring);
	io_uring_prep_cancel(sqe, user_data, 0);
	io_uring_sqe_set_data(sqe, task);
	task->status = SPDK_URING_SOCK_TASK_IN_PROCESS;
}

static int
sock_uring_group_reap(struct spdk_uring_sock_group_impl *group, int max, int max_read_events,
		      struct spdk_sock **socks)
{
	int i, count, ret;
	struct io_uring_cqe *cqe;
	struct spdk_uring_sock *sock, *tmp;
	struct spdk_uring_task *task;
	int status;
	bool is_zcopy;

	for (i = 0; i < max; i++) {
		ret = io_uring_peek_cqe(&group->uring, &cqe);
		if (ret != 0) {
			break;
		}

		if (cqe == NULL) {
			break;
		}

		task = (struct spdk_uring_task *)cqe->user_data;
		assert(task != NULL);
		sock = task->sock;
		assert(sock != NULL);
		assert(sock->group != NULL);
		assert(sock->group == group);
		sock->group->io_inflight--;
		sock->group->io_avail++;
		status = cqe->res;
		io_uring_cqe_seen(&group->uring, cqe);

		task->status = SPDK_URING_SOCK_TASK_NOT_IN_USE;

		if (spdk_unlikely(status <= 0)) {
			if (status == -EAGAIN || status == -EWOULDBLOCK || (status == -ENOBUFS && sock->zcopy)) {
				continue;
			}
		}

		switch (task->type) {
		case SPDK_SOCK_TASK_POLLIN:
#ifdef SPDK_ZEROCOPY
			if ((status & POLLERR) == POLLERR) {
				_sock_prep_errqueue(&sock->base);
			}
#endif
			if ((status & POLLIN) == POLLIN) {
				if (sock->base.cb_fn != NULL &&
				    sock->pending_recv == false) {
					sock->pending_recv = true;
					TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
				}
			}
			break;
		case SPDK_SOCK_TASK_WRITE:
			/* Capture whether this send used zero-copy before resetting the task,
			 * so the completed requests are accounted for correctly. */
			is_zcopy = task->is_zcopy;
			task->last_req = NULL;
			task->iov_cnt = 0;
			task->is_zcopy = false;
			if (spdk_unlikely(status < 0)) {
				sock->connection_status = status;
				spdk_sock_abort_requests(&sock->base);
			} else {
				sock_complete_write_reqs(&sock->base, status, is_zcopy);
			}

			break;
#ifdef SPDK_ZEROCOPY
		case SPDK_SOCK_TASK_ERRQUEUE:
			if (spdk_unlikely(status == -ECANCELED)) {
				sock->connection_status = status;
				break;
			}
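			/* The MSG_ERRQUEUE completion carries the range of sendmsg calls whose
			 * zero-copy buffers the kernel has released; match and free those requests. */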
			_sock_check_zcopy(&sock->base, status);
			break;
#endif
		case SPDK_SOCK_TASK_CANCEL:
			/* Do nothing */
			break;
		default:
			SPDK_UNREACHABLE();
		}
	}

	if (!socks) {
		return 0;
	}
	count = 0;
	TAILQ_FOREACH_SAFE(sock, &group->pending_recv, link, tmp) {
		if (count == max_read_events) {
			break;
		}

		if (spdk_unlikely(sock->base.cb_fn == NULL) ||
		    (sock->recv_pipe == NULL || spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0)) {
			sock->pending_recv = false;
			TAILQ_REMOVE(&group->pending_recv, sock, link);
			if (spdk_unlikely(sock->base.cb_fn == NULL)) {
				/* If the socket's cb_fn is NULL, do not add it to the socks array */
				continue;
			}
		}

		socks[count++] = &sock->base;
	}

	/* Cycle the pending_recv list so that each time we poll things aren't
	 * in the same order. Say we have 6 sockets in the list, named as follows:
	 * A B C D E F
	 * And all 6 sockets had poll events, but max_read_events is only 3. That means
	 * the loop above stopped with sock pointing at D. We want to rearrange the list
	 * to the following:
	 * D E F A B C
	 *
	 * The variables below are named according to this example to make it easier to
	 * follow the swaps.
	 */
	if (sock != NULL) {
		struct spdk_uring_sock *ua, *uc, *ud, *uf;

		/* Capture pointers to the elements we need */
		ud = sock;

		ua = TAILQ_FIRST(&group->pending_recv);
		if (ua == ud) {
			goto end;
		}

		uf = TAILQ_LAST(&group->pending_recv, pending_recv_list);
		if (uf == ud) {
			TAILQ_REMOVE(&group->pending_recv, ud, link);
			TAILQ_INSERT_HEAD(&group->pending_recv, ud, link);
			goto end;
		}

		uc = TAILQ_PREV(ud, pending_recv_list, link);
		assert(uc != NULL);

		/* Break the link between C and D */
		uc->link.tqe_next = NULL;

		/* Connect F to A */
		uf->link.tqe_next = ua;
		ua->link.tqe_prev = &uf->link.tqe_next;

		/* Fix up the list first/last pointers */
		group->pending_recv.tqh_first = ud;
		group->pending_recv.tqh_last = &uc->link.tqe_next;

		/* D is now at the front of the list, so point its tqe_prev at the list head */
		ud->link.tqe_prev = &group->pending_recv.tqh_first;
	}

end:
	return count;
}

static int
_sock_flush_client(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct msghdr msg = {};
	struct iovec iovs[IOV_BATCH_SIZE];
	int iovcnt;
	ssize_t rc;
	int flags = sock->zcopy_send_flags;
	int retval;
	bool is_zcopy = false;

	/* Can't flush from within a callback or we end up with recursive calls */
	if (_sock->cb_cnt > 0) {
		return 0;
	}

	/* Gather an iov */
	iovcnt = spdk_sock_prep_reqs(_sock, iovs, 0, NULL, &flags);
	if (iovcnt == 0) {
		return 0;
	}

	/* Perform the vectored write */
	msg.msg_iov = iovs;
	msg.msg_iovlen = iovcnt;
	rc = sendmsg(sock->fd, &msg, flags | MSG_DONTWAIT);
	if (rc <= 0) {
		if (errno == EAGAIN || errno == EWOULDBLOCK) {
			return 0;
		}
		return rc;
	}

#ifdef SPDK_ZEROCOPY
	is_zcopy = flags & MSG_ZEROCOPY;
#endif
	retval = sock_complete_write_reqs(_sock, rc, is_zcopy);
	if (retval < 0) {
		/* if the socket is closed, return to avoid heap-use-after-free error */
		return retval;
	}

#ifdef SPDK_ZEROCOPY
	if
 (sock->zcopy && !TAILQ_EMPTY(&_sock->pending_reqs)) {
		_sock_check_zcopy(_sock, 0);
	}
#endif

	return 0;
}

static void
uring_sock_writev_async(struct spdk_sock *_sock, struct spdk_sock_request *req)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	int rc;

	if (spdk_unlikely(sock->connection_status)) {
		req->cb_fn(req->cb_arg, sock->connection_status);
		return;
	}

	spdk_sock_request_queue(_sock, req);

	if (!sock->group) {
		if (_sock->queued_iovcnt >= IOV_BATCH_SIZE) {
			rc = _sock_flush_client(_sock);
			if (rc) {
				spdk_sock_abort_requests(_sock);
			}
		}
	}
}

static void
uring_sock_readv_async(struct spdk_sock *sock, struct spdk_sock_request *req)
{
	req->cb_fn(req->cb_arg, -ENOTSUP);
}

static int
uring_sock_set_recvlowat(struct spdk_sock *_sock, int nbytes)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	int val;
	int rc;

	assert(sock != NULL);

	val = nbytes;
	rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVLOWAT, &val, sizeof val);
	if (rc != 0) {
		return -1;
	}
	return 0;
}

static bool
uring_sock_is_ipv6(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct sockaddr_storage sa;
	socklen_t salen;
	int rc;

	assert(sock != NULL);

	memset(&sa, 0, sizeof sa);
	salen = sizeof sa;
	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
	if (rc != 0) {
		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
		return false;
	}

	return (sa.ss_family == AF_INET6);
}

static bool
uring_sock_is_ipv4(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct sockaddr_storage sa;
	socklen_t salen;
	int rc;

	assert(sock != NULL);

	memset(&sa, 0, sizeof sa);
	salen = sizeof sa;
	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
	if (rc != 0) {
		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
		return false;
	}

	return (sa.ss_family == AF_INET);
}

static bool
uring_sock_is_connected(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	uint8_t byte;
	int rc;

	rc = recv(sock->fd, &byte, 1, MSG_PEEK | MSG_DONTWAIT);
	if (rc == 0) {
		return false;
	}

	if (rc < 0) {
		if (errno == EAGAIN || errno == EWOULDBLOCK) {
			return true;
		}

		return false;
	}

	return true;
}

static struct spdk_sock_group_impl *
uring_sock_group_impl_get_optimal(struct spdk_sock *_sock, struct spdk_sock_group_impl *hint)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct spdk_sock_group_impl *group;

	if (sock->placement_id != -1) {
		spdk_sock_map_lookup(&g_map, sock->placement_id, &group, hint);
		return group;
	}

	return NULL;
}

static struct spdk_sock_group_impl *
uring_sock_group_impl_create(void)
{
	struct spdk_uring_sock_group_impl *group_impl;

	group_impl = calloc(1, sizeof(*group_impl));
	if (group_impl == NULL) {
		SPDK_ERRLOG("group_impl allocation failed\n");
		return NULL;
	}

	group_impl->io_avail = SPDK_SOCK_GROUP_QUEUE_DEPTH;

	if
 (io_uring_queue_init(SPDK_SOCK_GROUP_QUEUE_DEPTH, &group_impl->uring, 0) < 0) {
		SPDK_ERRLOG("uring I/O context setup failure\n");
		free(group_impl);
		return NULL;
	}

	TAILQ_INIT(&group_impl->pending_recv);

	if (g_spdk_uring_sock_impl_opts.enable_placement_id == PLACEMENT_CPU) {
		spdk_sock_map_insert(&g_map, spdk_env_get_current_core(), &group_impl->base);
	}

	return &group_impl->base;
}

static int
uring_sock_group_impl_add_sock(struct spdk_sock_group_impl *_group,
			       struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group);
	int rc;

	sock->group = group;
	sock->write_task.sock = sock;
	sock->write_task.type = SPDK_SOCK_TASK_WRITE;

	sock->pollin_task.sock = sock;
	sock->pollin_task.type = SPDK_SOCK_TASK_POLLIN;

	sock->errqueue_task.sock = sock;
	sock->errqueue_task.type = SPDK_SOCK_TASK_ERRQUEUE;
	sock->errqueue_task.msg.msg_control = sock->buf;
	sock->errqueue_task.msg.msg_controllen = sizeof(sock->buf);

	sock->cancel_task.sock = sock;
	sock->cancel_task.type = SPDK_SOCK_TASK_CANCEL;

	/* switched from another polling group due to scheduling */
	if (spdk_unlikely(sock->recv_pipe != NULL &&
			  (spdk_pipe_reader_bytes_available(sock->recv_pipe) > 0))) {
		assert(sock->pending_recv == false);
		sock->pending_recv = true;
		TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
	}

	if (sock->placement_id != -1) {
		rc = spdk_sock_map_insert(&g_map, sock->placement_id, &group->base);
		if (rc != 0) {
			SPDK_ERRLOG("Failed to insert sock group into map: %d\n", rc);
			/* Do not treat this as an error. The system will continue running. */
		}
	}

	return 0;
}

static int
uring_sock_group_impl_poll(struct spdk_sock_group_impl *_group, int max_events,
			   struct spdk_sock **socks)
{
	struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group);
	int count, ret;
	int to_complete, to_submit;
	struct spdk_sock *_sock, *tmp;
	struct spdk_uring_sock *sock;

	if (spdk_likely(socks)) {
		TAILQ_FOREACH_SAFE(_sock, &group->base.socks, link, tmp) {
			sock = __uring_sock(_sock);
			if (spdk_unlikely(sock->connection_status)) {
				continue;
			}
			_sock_flush(_sock);
			_sock_prep_pollin(_sock);
		}
	}

	to_submit = group->io_queued;

	/* Network I/O cannot use O_DIRECT, so there is no need to call spdk_io_uring_enter here */
	if (to_submit > 0) {
		/* If there is I/O to submit, use io_uring_submit() here.
		 * It will automatically call io_uring_enter() as appropriate.
		 */
		ret = io_uring_submit(&group->uring);
		if (ret < 0) {
			return 1;
		}
		group->io_queued = 0;
		group->io_inflight += to_submit;
		group->io_avail -= to_submit;
	}

	count = 0;
	to_complete = group->io_inflight;
	if (to_complete > 0 || !TAILQ_EMPTY(&group->pending_recv)) {
		count = sock_uring_group_reap(group, to_complete, max_events, socks);
	}

	return count;
}

static int
uring_sock_group_impl_remove_sock(struct spdk_sock_group_impl *_group,
				  struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group);

	if (sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
		_sock_prep_cancel_task(_sock, &sock->write_task);
		/* spdk_sock_group_remove_sock() is not an asynchronous interface, so
		 * it is acceptable to poll in a loop here until the task completes. */
		while ((sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) ||
		       (sock->cancel_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE)) {
			uring_sock_group_impl_poll(_group, 32, NULL);
		}
	}

	if (sock->pollin_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
		_sock_prep_cancel_task(_sock, &sock->pollin_task);
		/* spdk_sock_group_remove_sock() is not an asynchronous interface, so
		 * it is acceptable to poll in a loop here until the task completes. */
		while ((sock->pollin_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) ||
		       (sock->cancel_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE)) {
			uring_sock_group_impl_poll(_group, 32, NULL);
		}
	}

	if (sock->errqueue_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
		_sock_prep_cancel_task(_sock, &sock->errqueue_task);
		/* spdk_sock_group_remove_sock() is not an asynchronous interface, so
		 * it is acceptable to poll in a loop here until the task completes.
		 */
		while ((sock->errqueue_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) ||
		       (sock->cancel_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE)) {
			uring_sock_group_impl_poll(_group, 32, NULL);
		}
	}

	/* Make sure that cancelling the tasks above did not queue any new requests */
	assert(sock->write_task.status == SPDK_URING_SOCK_TASK_NOT_IN_USE);
	assert(sock->pollin_task.status == SPDK_URING_SOCK_TASK_NOT_IN_USE);
	assert(sock->errqueue_task.status == SPDK_URING_SOCK_TASK_NOT_IN_USE);

	if (sock->pending_recv) {
		TAILQ_REMOVE(&group->pending_recv, sock, link);
		sock->pending_recv = false;
	}
	assert(sock->pending_recv == false);

	if (sock->placement_id != -1) {
		spdk_sock_map_release(&g_map, sock->placement_id);
	}

	sock->group = NULL;
	return 0;
}

static int
uring_sock_group_impl_close(struct spdk_sock_group_impl *_group)
{
	struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group);

	/* try to reap all the active I/O */
	while (group->io_inflight) {
		uring_sock_group_impl_poll(_group, 32, NULL);
	}
	assert(group->io_inflight == 0);
	assert(group->io_avail == SPDK_SOCK_GROUP_QUEUE_DEPTH);

	io_uring_queue_exit(&group->uring);

	if (g_spdk_uring_sock_impl_opts.enable_placement_id == PLACEMENT_CPU) {
		spdk_sock_map_release(&g_map, spdk_env_get_current_core());
	}

	free(group);
	return 0;
}

static int
uring_sock_flush(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);

	if (!sock->group) {
		return _sock_flush_client(_sock);
	}

	return 0;
}

static struct spdk_net_impl g_uring_net_impl = {
	.name		= "uring",
	.getaddr	= uring_sock_getaddr,
	.connect	= uring_sock_connect,
	.listen		= uring_sock_listen,
	.accept		= uring_sock_accept,
	.close		= uring_sock_close,
	.recv		= uring_sock_recv,
	.readv		= uring_sock_readv,
	.readv_async	= uring_sock_readv_async,
	.writev		= uring_sock_writev,
	.writev_async	= uring_sock_writev_async,
	.flush		= uring_sock_flush,
	.set_recvlowat	= uring_sock_set_recvlowat,
	.set_recvbuf	= uring_sock_set_recvbuf,
	.set_sendbuf	= uring_sock_set_sendbuf,
	.is_ipv6	= uring_sock_is_ipv6,
	.is_ipv4	= uring_sock_is_ipv4,
	.is_connected	= uring_sock_is_connected,
	.group_impl_get_optimal	= uring_sock_group_impl_get_optimal,
	.group_impl_create	= uring_sock_group_impl_create,
	.group_impl_add_sock	= uring_sock_group_impl_add_sock,
	.group_impl_remove_sock	= uring_sock_group_impl_remove_sock,
	.group_impl_poll	= uring_sock_group_impl_poll,
	.group_impl_close	= uring_sock_group_impl_close,
	.get_opts	= uring_sock_impl_get_opts,
	.set_opts	= uring_sock_impl_set_opts,
};

SPDK_NET_IMPL_REGISTER(uring, &g_uring_net_impl, DEFAULT_SOCK_PRIORITY + 1);
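/*
 * Illustrative sketch (not part of the implementation): an application would
 * normally opt into this transport through the generic spdk/sock.h API rather
 * than calling anything in this file directly. Error handling is omitted and
 * the option values below are examples only.
 *
 *	struct spdk_sock_impl_opts impl_opts = {};
 *	size_t sz = sizeof(impl_opts);
 *
 *	spdk_sock_impl_get_opts("uring", &impl_opts, &sz);
 *	impl_opts.enable_zerocopy_send_client = true;
 *	spdk_sock_impl_set_opts("uring", &impl_opts, sz);
 *	spdk_sock_set_default_impl("uring");
 */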