1 /* SPDX-License-Identifier: BSD-3-Clause 2 * Copyright (C) 2019 Intel Corporation. 3 * All rights reserved. 4 */ 5 6 #include "spdk/stdinc.h" 7 #include "spdk/config.h" 8 9 #include <linux/errqueue.h> 10 #include <sys/epoll.h> 11 #include <liburing.h> 12 13 #include "spdk/barrier.h" 14 #include "spdk/env.h" 15 #include "spdk/log.h" 16 #include "spdk/pipe.h" 17 #include "spdk/sock.h" 18 #include "spdk/string.h" 19 #include "spdk/util.h" 20 21 #include "spdk_internal/sock.h" 22 #include "spdk_internal/assert.h" 23 #include "../sock_kernel.h" 24 25 #define MAX_TMPBUF 1024 26 #define PORTNUMLEN 32 27 #define SPDK_SOCK_GROUP_QUEUE_DEPTH 4096 28 #define SPDK_SOCK_CMG_INFO_SIZE (sizeof(struct cmsghdr) + sizeof(struct sock_extended_err)) 29 30 enum spdk_sock_task_type { 31 SPDK_SOCK_TASK_POLLIN = 0, 32 SPDK_SOCK_TASK_ERRQUEUE, 33 SPDK_SOCK_TASK_WRITE, 34 SPDK_SOCK_TASK_CANCEL, 35 }; 36 37 #if defined(SO_ZEROCOPY) && defined(MSG_ZEROCOPY) 38 #define SPDK_ZEROCOPY 39 #endif 40 41 enum spdk_uring_sock_task_status { 42 SPDK_URING_SOCK_TASK_NOT_IN_USE = 0, 43 SPDK_URING_SOCK_TASK_IN_PROCESS, 44 }; 45 46 struct spdk_uring_task { 47 enum spdk_uring_sock_task_status status; 48 enum spdk_sock_task_type type; 49 struct spdk_uring_sock *sock; 50 struct msghdr msg; 51 struct iovec iovs[IOV_BATCH_SIZE]; 52 int iov_cnt; 53 struct spdk_sock_request *last_req; 54 bool is_zcopy; 55 STAILQ_ENTRY(spdk_uring_task) link; 56 }; 57 58 struct spdk_uring_sock { 59 struct spdk_sock base; 60 int fd; 61 uint32_t sendmsg_idx; 62 struct spdk_uring_sock_group_impl *group; 63 struct spdk_uring_task write_task; 64 struct spdk_uring_task errqueue_task; 65 struct spdk_uring_task pollin_task; 66 struct spdk_uring_task cancel_task; 67 struct spdk_pipe *recv_pipe; 68 void *recv_buf; 69 int recv_buf_sz; 70 bool zcopy; 71 bool pending_recv; 72 int zcopy_send_flags; 73 int connection_status; 74 int placement_id; 75 uint8_t buf[SPDK_SOCK_CMG_INFO_SIZE]; 76 TAILQ_ENTRY(spdk_uring_sock) link; 77 }; 78 79 TAILQ_HEAD(pending_recv_list, spdk_uring_sock); 80 81 struct spdk_uring_sock_group_impl { 82 struct spdk_sock_group_impl base; 83 struct io_uring uring; 84 uint32_t io_inflight; 85 uint32_t io_queued; 86 uint32_t io_avail; 87 struct pending_recv_list pending_recv; 88 }; 89 90 static struct spdk_sock_impl_opts g_spdk_uring_sock_impl_opts = { 91 .recv_buf_size = DEFAULT_SO_RCVBUF_SIZE, 92 .send_buf_size = DEFAULT_SO_SNDBUF_SIZE, 93 .enable_recv_pipe = true, 94 .enable_quickack = false, 95 .enable_placement_id = PLACEMENT_NONE, 96 .enable_zerocopy_send_server = false, 97 .enable_zerocopy_send_client = false, 98 .zerocopy_threshold = 0, 99 .tls_version = 0, 100 .enable_ktls = false, 101 .psk_key = NULL, 102 .psk_identity = NULL 103 }; 104 105 static struct spdk_sock_map g_map = { 106 .entries = STAILQ_HEAD_INITIALIZER(g_map.entries), 107 .mtx = PTHREAD_MUTEX_INITIALIZER 108 }; 109 110 __attribute((destructor)) static void 111 uring_sock_map_cleanup(void) 112 { 113 spdk_sock_map_cleanup(&g_map); 114 } 115 116 #define SPDK_URING_SOCK_REQUEST_IOV(req) ((struct iovec *)((uint8_t *)req + sizeof(struct spdk_sock_request))) 117 118 #define __uring_sock(sock) (struct spdk_uring_sock *)sock 119 #define __uring_group_impl(group) (struct spdk_uring_sock_group_impl *)group 120 121 static void 122 uring_sock_copy_impl_opts(struct spdk_sock_impl_opts *dest, const struct spdk_sock_impl_opts *src, 123 size_t len) 124 { 125 #define FIELD_OK(field) \ 126 offsetof(struct spdk_sock_impl_opts, field) + sizeof(src->field) <= len 127 128 #define SET_FIELD(field) \ 
129 if (FIELD_OK(field)) { \ 130 dest->field = src->field; \ 131 } 132 133 SET_FIELD(recv_buf_size); 134 SET_FIELD(send_buf_size); 135 SET_FIELD(enable_recv_pipe); 136 SET_FIELD(enable_quickack); 137 SET_FIELD(enable_placement_id); 138 SET_FIELD(enable_zerocopy_send_server); 139 SET_FIELD(enable_zerocopy_send_client); 140 SET_FIELD(zerocopy_threshold); 141 SET_FIELD(tls_version); 142 SET_FIELD(enable_ktls); 143 SET_FIELD(psk_key); 144 SET_FIELD(psk_identity); 145 146 #undef SET_FIELD 147 #undef FIELD_OK 148 } 149 150 static int 151 uring_sock_impl_get_opts(struct spdk_sock_impl_opts *opts, size_t *len) 152 { 153 if (!opts || !len) { 154 errno = EINVAL; 155 return -1; 156 } 157 158 assert(sizeof(*opts) >= *len); 159 memset(opts, 0, *len); 160 161 uring_sock_copy_impl_opts(opts, &g_spdk_uring_sock_impl_opts, *len); 162 *len = spdk_min(*len, sizeof(g_spdk_uring_sock_impl_opts)); 163 164 return 0; 165 } 166 167 static int 168 uring_sock_impl_set_opts(const struct spdk_sock_impl_opts *opts, size_t len) 169 { 170 if (!opts) { 171 errno = EINVAL; 172 return -1; 173 } 174 175 assert(sizeof(*opts) >= len); 176 uring_sock_copy_impl_opts(&g_spdk_uring_sock_impl_opts, opts, len); 177 178 return 0; 179 } 180 181 static void 182 uring_opts_get_impl_opts(const struct spdk_sock_opts *opts, struct spdk_sock_impl_opts *dest) 183 { 184 /* Copy the default impl_opts first to cover cases when user's impl_opts is smaller */ 185 memcpy(dest, &g_spdk_uring_sock_impl_opts, sizeof(*dest)); 186 187 if (opts->impl_opts != NULL) { 188 assert(sizeof(*dest) >= opts->impl_opts_size); 189 uring_sock_copy_impl_opts(dest, opts->impl_opts, opts->impl_opts_size); 190 } 191 } 192 193 static int 194 uring_sock_getaddr(struct spdk_sock *_sock, char *saddr, int slen, uint16_t *sport, 195 char *caddr, int clen, uint16_t *cport) 196 { 197 struct spdk_uring_sock *sock = __uring_sock(_sock); 198 struct sockaddr_storage sa; 199 socklen_t salen; 200 int rc; 201 202 assert(sock != NULL); 203 204 memset(&sa, 0, sizeof sa); 205 salen = sizeof sa; 206 rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen); 207 if (rc != 0) { 208 SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno); 209 return -1; 210 } 211 212 switch (sa.ss_family) { 213 case AF_UNIX: 214 /* Acceptable connection types that don't have IPs */ 215 return 0; 216 case AF_INET: 217 case AF_INET6: 218 /* Code below will get IP addresses */ 219 break; 220 default: 221 /* Unsupported socket family */ 222 return -1; 223 } 224 225 rc = get_addr_str((struct sockaddr *)&sa, saddr, slen); 226 if (rc != 0) { 227 SPDK_ERRLOG("getnameinfo() failed (errno=%d)\n", errno); 228 return -1; 229 } 230 231 if (sport) { 232 if (sa.ss_family == AF_INET) { 233 *sport = ntohs(((struct sockaddr_in *) &sa)->sin_port); 234 } else if (sa.ss_family == AF_INET6) { 235 *sport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port); 236 } 237 } 238 239 memset(&sa, 0, sizeof sa); 240 salen = sizeof sa; 241 rc = getpeername(sock->fd, (struct sockaddr *) &sa, &salen); 242 if (rc != 0) { 243 SPDK_ERRLOG("getpeername() failed (errno=%d)\n", errno); 244 return -1; 245 } 246 247 rc = get_addr_str((struct sockaddr *)&sa, caddr, clen); 248 if (rc != 0) { 249 SPDK_ERRLOG("getnameinfo() failed (errno=%d)\n", errno); 250 return -1; 251 } 252 253 if (cport) { 254 if (sa.ss_family == AF_INET) { 255 *cport = ntohs(((struct sockaddr_in *) &sa)->sin_port); 256 } else if (sa.ss_family == AF_INET6) { 257 *cport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port); 258 } 259 } 260 261 return 0; 262 } 263 264 enum 
uring_sock_create_type { 265 SPDK_SOCK_CREATE_LISTEN, 266 SPDK_SOCK_CREATE_CONNECT, 267 }; 268 269 static int 270 uring_sock_alloc_pipe(struct spdk_uring_sock *sock, int sz) 271 { 272 uint8_t *new_buf; 273 struct spdk_pipe *new_pipe; 274 struct iovec siov[2]; 275 struct iovec diov[2]; 276 int sbytes; 277 ssize_t bytes; 278 int rc; 279 280 if (sock->recv_buf_sz == sz) { 281 return 0; 282 } 283 284 /* If the new size is 0, just free the pipe */ 285 if (sz == 0) { 286 spdk_pipe_destroy(sock->recv_pipe); 287 free(sock->recv_buf); 288 sock->recv_pipe = NULL; 289 sock->recv_buf = NULL; 290 return 0; 291 } else if (sz < MIN_SOCK_PIPE_SIZE) { 292 SPDK_ERRLOG("The size of the pipe must be larger than %d\n", MIN_SOCK_PIPE_SIZE); 293 return -1; 294 } 295 296 /* Round up to next 64 byte multiple */ 297 rc = posix_memalign((void **)&new_buf, 64, sz); 298 if (rc != 0) { 299 SPDK_ERRLOG("socket recv buf allocation failed\n"); 300 return -ENOMEM; 301 } 302 memset(new_buf, 0, sz); 303 304 new_pipe = spdk_pipe_create(new_buf, sz); 305 if (new_pipe == NULL) { 306 SPDK_ERRLOG("socket pipe allocation failed\n"); 307 free(new_buf); 308 return -ENOMEM; 309 } 310 311 if (sock->recv_pipe != NULL) { 312 /* Pull all of the data out of the old pipe */ 313 sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov); 314 if (sbytes > sz) { 315 /* Too much data to fit into the new pipe size */ 316 spdk_pipe_destroy(new_pipe); 317 free(new_buf); 318 return -EINVAL; 319 } 320 321 sbytes = spdk_pipe_writer_get_buffer(new_pipe, sz, diov); 322 assert(sbytes == sz); 323 324 bytes = spdk_iovcpy(siov, 2, diov, 2); 325 spdk_pipe_writer_advance(new_pipe, bytes); 326 327 spdk_pipe_destroy(sock->recv_pipe); 328 free(sock->recv_buf); 329 } 330 331 sock->recv_buf_sz = sz; 332 sock->recv_buf = new_buf; 333 sock->recv_pipe = new_pipe; 334 335 return 0; 336 } 337 338 static int 339 uring_sock_set_recvbuf(struct spdk_sock *_sock, int sz) 340 { 341 struct spdk_uring_sock *sock = __uring_sock(_sock); 342 int min_size; 343 int rc; 344 345 assert(sock != NULL); 346 347 if (_sock->impl_opts.enable_recv_pipe) { 348 rc = uring_sock_alloc_pipe(sock, sz); 349 if (rc) { 350 SPDK_ERRLOG("unable to allocate sufficient recvbuf with sz=%d on sock=%p\n", sz, _sock); 351 return rc; 352 } 353 } 354 355 /* Set kernel buffer size to be at least MIN_SO_RCVBUF_SIZE and 356 * g_spdk_uring_sock_impl_opts.recv_buf_size. */ 357 min_size = spdk_max(MIN_SO_RCVBUF_SIZE, g_spdk_uring_sock_impl_opts.recv_buf_size); 358 359 if (sz < min_size) { 360 sz = min_size; 361 } 362 363 rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVBUF, &sz, sizeof(sz)); 364 if (rc < 0) { 365 return rc; 366 } 367 368 _sock->impl_opts.recv_buf_size = sz; 369 370 return 0; 371 } 372 373 static int 374 uring_sock_set_sendbuf(struct spdk_sock *_sock, int sz) 375 { 376 struct spdk_uring_sock *sock = __uring_sock(_sock); 377 int min_size; 378 int rc; 379 380 assert(sock != NULL); 381 382 /* Set kernel buffer size to be at least MIN_SO_SNDBUF_SIZE and 383 * g_spdk_uring_sock_impl_opts.seend_buf_size. 
*/ 384 min_size = spdk_max(MIN_SO_SNDBUF_SIZE, g_spdk_uring_sock_impl_opts.send_buf_size); 385 386 if (sz < min_size) { 387 sz = min_size; 388 } 389 390 rc = setsockopt(sock->fd, SOL_SOCKET, SO_SNDBUF, &sz, sizeof(sz)); 391 if (rc < 0) { 392 return rc; 393 } 394 395 _sock->impl_opts.send_buf_size = sz; 396 397 return 0; 398 } 399 400 static struct spdk_uring_sock * 401 uring_sock_alloc(int fd, struct spdk_sock_impl_opts *impl_opts, bool enable_zero_copy) 402 { 403 struct spdk_uring_sock *sock; 404 #if defined(__linux__) 405 int flag; 406 int rc; 407 #endif 408 409 sock = calloc(1, sizeof(*sock)); 410 if (sock == NULL) { 411 SPDK_ERRLOG("sock allocation failed\n"); 412 return NULL; 413 } 414 415 sock->fd = fd; 416 memcpy(&sock->base.impl_opts, impl_opts, sizeof(*impl_opts)); 417 418 #if defined(__linux__) 419 flag = 1; 420 421 if (sock->base.impl_opts.enable_quickack) { 422 rc = setsockopt(sock->fd, IPPROTO_TCP, TCP_QUICKACK, &flag, sizeof(flag)); 423 if (rc != 0) { 424 SPDK_ERRLOG("quickack was failed to set\n"); 425 } 426 } 427 428 spdk_sock_get_placement_id(sock->fd, sock->base.impl_opts.enable_placement_id, 429 &sock->placement_id); 430 #ifdef SPDK_ZEROCOPY 431 /* Try to turn on zero copy sends */ 432 flag = 1; 433 434 if (enable_zero_copy) { 435 rc = setsockopt(sock->fd, SOL_SOCKET, SO_ZEROCOPY, &flag, sizeof(flag)); 436 if (rc == 0) { 437 sock->zcopy = true; 438 sock->zcopy_send_flags = MSG_ZEROCOPY; 439 } 440 } 441 #endif 442 #endif 443 444 return sock; 445 } 446 447 static struct spdk_sock * 448 uring_sock_create(const char *ip, int port, 449 enum uring_sock_create_type type, 450 struct spdk_sock_opts *opts) 451 { 452 struct spdk_uring_sock *sock; 453 struct spdk_sock_impl_opts impl_opts; 454 char buf[MAX_TMPBUF]; 455 char portnum[PORTNUMLEN]; 456 char *p; 457 struct addrinfo hints, *res, *res0; 458 int fd, flag; 459 int val = 1; 460 int rc; 461 bool enable_zcopy_impl_opts = false; 462 bool enable_zcopy_user_opts = true; 463 464 assert(opts != NULL); 465 uring_opts_get_impl_opts(opts, &impl_opts); 466 467 if (ip == NULL) { 468 return NULL; 469 } 470 if (ip[0] == '[') { 471 snprintf(buf, sizeof(buf), "%s", ip + 1); 472 p = strchr(buf, ']'); 473 if (p != NULL) { 474 *p = '\0'; 475 } 476 ip = (const char *) &buf[0]; 477 } 478 479 snprintf(portnum, sizeof portnum, "%d", port); 480 memset(&hints, 0, sizeof hints); 481 hints.ai_family = PF_UNSPEC; 482 hints.ai_socktype = SOCK_STREAM; 483 hints.ai_flags = AI_NUMERICSERV; 484 hints.ai_flags |= AI_PASSIVE; 485 hints.ai_flags |= AI_NUMERICHOST; 486 rc = getaddrinfo(ip, portnum, &hints, &res0); 487 if (rc != 0) { 488 SPDK_ERRLOG("getaddrinfo() failed %s (%d)\n", gai_strerror(rc), rc); 489 return NULL; 490 } 491 492 /* try listen */ 493 fd = -1; 494 for (res = res0; res != NULL; res = res->ai_next) { 495 retry: 496 fd = socket(res->ai_family, res->ai_socktype, res->ai_protocol); 497 if (fd < 0) { 498 /* error */ 499 continue; 500 } 501 502 val = impl_opts.recv_buf_size; 503 rc = setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &val, sizeof val); 504 if (rc) { 505 /* Not fatal */ 506 } 507 508 val = impl_opts.send_buf_size; 509 rc = setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &val, sizeof val); 510 if (rc) { 511 /* Not fatal */ 512 } 513 514 rc = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof val); 515 if (rc != 0) { 516 close(fd); 517 fd = -1; 518 /* error */ 519 continue; 520 } 521 rc = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof val); 522 if (rc != 0) { 523 close(fd); 524 fd = -1; 525 /* error */ 526 continue; 527 } 528 529 if 
(opts->ack_timeout) { 530 #if defined(__linux__) 531 val = opts->ack_timeout; 532 rc = setsockopt(fd, IPPROTO_TCP, TCP_USER_TIMEOUT, &val, sizeof val); 533 if (rc != 0) { 534 close(fd); 535 fd = -1; 536 /* error */ 537 continue; 538 } 539 #else 540 SPDK_WARNLOG("TCP_USER_TIMEOUT is not supported.\n"); 541 #endif 542 } 543 544 545 546 #if defined(SO_PRIORITY) 547 if (opts != NULL && opts->priority) { 548 rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &opts->priority, sizeof val); 549 if (rc != 0) { 550 close(fd); 551 fd = -1; 552 /* error */ 553 continue; 554 } 555 } 556 #endif 557 if (res->ai_family == AF_INET6) { 558 rc = setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &val, sizeof val); 559 if (rc != 0) { 560 close(fd); 561 fd = -1; 562 /* error */ 563 continue; 564 } 565 } 566 567 if (type == SPDK_SOCK_CREATE_LISTEN) { 568 rc = bind(fd, res->ai_addr, res->ai_addrlen); 569 if (rc != 0) { 570 SPDK_ERRLOG("bind() failed at port %d, errno = %d\n", port, errno); 571 switch (errno) { 572 case EINTR: 573 /* interrupted? */ 574 close(fd); 575 goto retry; 576 case EADDRNOTAVAIL: 577 SPDK_ERRLOG("IP address %s not available. " 578 "Verify IP address in config file " 579 "and make sure setup script is " 580 "run before starting spdk app.\n", ip); 581 /* FALLTHROUGH */ 582 default: 583 /* try next family */ 584 close(fd); 585 fd = -1; 586 continue; 587 } 588 } 589 /* bind OK */ 590 rc = listen(fd, 512); 591 if (rc != 0) { 592 SPDK_ERRLOG("listen() failed, errno = %d\n", errno); 593 close(fd); 594 fd = -1; 595 break; 596 } 597 598 flag = fcntl(fd, F_GETFL); 599 if (fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0) { 600 SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno); 601 close(fd); 602 fd = -1; 603 break; 604 } 605 606 enable_zcopy_impl_opts = impl_opts.enable_zerocopy_send_server; 607 } else if (type == SPDK_SOCK_CREATE_CONNECT) { 608 rc = connect(fd, res->ai_addr, res->ai_addrlen); 609 if (rc != 0) { 610 SPDK_ERRLOG("connect() failed, errno = %d\n", errno); 611 /* try next family */ 612 close(fd); 613 fd = -1; 614 continue; 615 } 616 617 flag = fcntl(fd, F_GETFL); 618 if (fcntl(fd, F_SETFL, flag & ~O_NONBLOCK) < 0) { 619 SPDK_ERRLOG("fcntl can't set blocking mode for socket, fd: %d (%d)\n", fd, errno); 620 close(fd); 621 fd = -1; 622 break; 623 } 624 625 enable_zcopy_impl_opts = impl_opts.enable_zerocopy_send_client; 626 } 627 break; 628 } 629 freeaddrinfo(res0); 630 631 if (fd < 0) { 632 return NULL; 633 } 634 635 enable_zcopy_user_opts = opts->zcopy && !sock_is_loopback(fd); 636 sock = uring_sock_alloc(fd, &impl_opts, enable_zcopy_user_opts && enable_zcopy_impl_opts); 637 if (sock == NULL) { 638 SPDK_ERRLOG("sock allocation failed\n"); 639 close(fd); 640 return NULL; 641 } 642 643 return &sock->base; 644 } 645 646 static struct spdk_sock * 647 uring_sock_listen(const char *ip, int port, struct spdk_sock_opts *opts) 648 { 649 return uring_sock_create(ip, port, SPDK_SOCK_CREATE_LISTEN, opts); 650 } 651 652 static struct spdk_sock * 653 uring_sock_connect(const char *ip, int port, struct spdk_sock_opts *opts) 654 { 655 return uring_sock_create(ip, port, SPDK_SOCK_CREATE_CONNECT, opts); 656 } 657 658 static struct spdk_sock * 659 uring_sock_accept(struct spdk_sock *_sock) 660 { 661 struct spdk_uring_sock *sock = __uring_sock(_sock); 662 struct sockaddr_storage sa; 663 socklen_t salen; 664 int rc, fd; 665 struct spdk_uring_sock *new_sock; 666 int flag; 667 668 memset(&sa, 0, sizeof(sa)); 669 salen = sizeof(sa); 670 671 assert(sock != NULL); 672 673 rc = accept(sock->fd, (struct 
sockaddr *)&sa, &salen); 674 675 if (rc == -1) { 676 return NULL; 677 } 678 679 fd = rc; 680 681 flag = fcntl(fd, F_GETFL); 682 if ((flag & O_NONBLOCK) && (fcntl(fd, F_SETFL, flag & ~O_NONBLOCK) < 0)) { 683 SPDK_ERRLOG("fcntl can't set blocking mode for socket, fd: %d (%d)\n", fd, errno); 684 close(fd); 685 return NULL; 686 } 687 688 #if defined(SO_PRIORITY) 689 /* The priority is not inherited, so call this function again */ 690 if (sock->base.opts.priority) { 691 rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &sock->base.opts.priority, sizeof(int)); 692 if (rc != 0) { 693 close(fd); 694 return NULL; 695 } 696 } 697 #endif 698 699 new_sock = uring_sock_alloc(fd, &sock->base.impl_opts, sock->zcopy); 700 if (new_sock == NULL) { 701 close(fd); 702 return NULL; 703 } 704 705 return &new_sock->base; 706 } 707 708 static int 709 uring_sock_close(struct spdk_sock *_sock) 710 { 711 struct spdk_uring_sock *sock = __uring_sock(_sock); 712 713 assert(TAILQ_EMPTY(&_sock->pending_reqs)); 714 assert(sock->group == NULL); 715 716 /* If the socket fails to close, the best choice is to 717 * leak the fd but continue to free the rest of the sock 718 * memory. */ 719 close(sock->fd); 720 721 spdk_pipe_destroy(sock->recv_pipe); 722 free(sock->recv_buf); 723 free(sock); 724 725 return 0; 726 } 727 728 static ssize_t 729 uring_sock_recv_from_pipe(struct spdk_uring_sock *sock, struct iovec *diov, int diovcnt) 730 { 731 struct iovec siov[2]; 732 int sbytes; 733 ssize_t bytes; 734 struct spdk_uring_sock_group_impl *group; 735 736 sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov); 737 if (sbytes < 0) { 738 errno = EINVAL; 739 return -1; 740 } else if (sbytes == 0) { 741 errno = EAGAIN; 742 return -1; 743 } 744 745 bytes = spdk_iovcpy(siov, 2, diov, diovcnt); 746 747 if (bytes == 0) { 748 /* The only way this happens is if diov is 0 length */ 749 errno = EINVAL; 750 return -1; 751 } 752 753 spdk_pipe_reader_advance(sock->recv_pipe, bytes); 754 755 /* If we drained the pipe, take it off the level-triggered list */ 756 if (sock->base.group_impl && spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) { 757 group = __uring_group_impl(sock->base.group_impl); 758 TAILQ_REMOVE(&group->pending_recv, sock, link); 759 sock->pending_recv = false; 760 } 761 762 return bytes; 763 } 764 765 static inline ssize_t 766 sock_readv(int fd, struct iovec *iov, int iovcnt) 767 { 768 struct msghdr msg = { 769 .msg_iov = iov, 770 .msg_iovlen = iovcnt, 771 }; 772 773 return recvmsg(fd, &msg, MSG_DONTWAIT); 774 } 775 776 static inline ssize_t 777 uring_sock_read(struct spdk_uring_sock *sock) 778 { 779 struct iovec iov[2]; 780 int bytes; 781 struct spdk_uring_sock_group_impl *group; 782 783 bytes = spdk_pipe_writer_get_buffer(sock->recv_pipe, sock->recv_buf_sz, iov); 784 785 if (bytes > 0) { 786 bytes = sock_readv(sock->fd, iov, 2); 787 if (bytes > 0) { 788 spdk_pipe_writer_advance(sock->recv_pipe, bytes); 789 if (sock->base.group_impl) { 790 group = __uring_group_impl(sock->base.group_impl); 791 TAILQ_INSERT_TAIL(&group->pending_recv, sock, link); 792 sock->pending_recv = true; 793 } 794 } 795 } 796 797 return bytes; 798 } 799 800 static ssize_t 801 uring_sock_readv(struct spdk_sock *_sock, struct iovec *iov, int iovcnt) 802 { 803 struct spdk_uring_sock *sock = __uring_sock(_sock); 804 int rc, i; 805 size_t len; 806 807 if (sock->recv_pipe == NULL) { 808 return sock_readv(sock->fd, iov, iovcnt); 809 } 810 811 len = 0; 812 for (i = 0; i < iovcnt; i++) { 813 len += iov[i].iov_len; 814 } 815 816 if 
(spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) { 817 /* If the user is receiving a sufficiently large amount of data, 818 * receive directly to their buffers. */ 819 if (len >= MIN_SOCK_PIPE_SIZE) { 820 return sock_readv(sock->fd, iov, iovcnt); 821 } 822 823 /* Otherwise, do a big read into our pipe */ 824 rc = uring_sock_read(sock); 825 if (rc <= 0) { 826 return rc; 827 } 828 } 829 830 return uring_sock_recv_from_pipe(sock, iov, iovcnt); 831 } 832 833 static ssize_t 834 uring_sock_recv(struct spdk_sock *sock, void *buf, size_t len) 835 { 836 struct iovec iov[1]; 837 838 iov[0].iov_base = buf; 839 iov[0].iov_len = len; 840 841 return uring_sock_readv(sock, iov, 1); 842 } 843 844 static ssize_t 845 uring_sock_writev(struct spdk_sock *_sock, struct iovec *iov, int iovcnt) 846 { 847 struct spdk_uring_sock *sock = __uring_sock(_sock); 848 struct msghdr msg = { 849 .msg_iov = iov, 850 .msg_iovlen = iovcnt, 851 }; 852 853 if (sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) { 854 errno = EAGAIN; 855 return -1; 856 } 857 858 return sendmsg(sock->fd, &msg, MSG_DONTWAIT); 859 } 860 861 static ssize_t 862 sock_request_advance_offset(struct spdk_sock_request *req, ssize_t rc) 863 { 864 unsigned int offset; 865 size_t len; 866 int i; 867 868 offset = req->internal.offset; 869 for (i = 0; i < req->iovcnt; i++) { 870 /* Advance by the offset first */ 871 if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) { 872 offset -= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len; 873 continue; 874 } 875 876 /* Calculate the remaining length of this element */ 877 len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset; 878 879 if (len > (size_t)rc) { 880 req->internal.offset += rc; 881 return -1; 882 } 883 884 offset = 0; 885 req->internal.offset += len; 886 rc -= len; 887 } 888 889 return rc; 890 } 891 892 static int 893 sock_complete_write_reqs(struct spdk_sock *_sock, ssize_t rc, bool is_zcopy) 894 { 895 struct spdk_uring_sock *sock = __uring_sock(_sock); 896 struct spdk_sock_request *req; 897 int retval; 898 899 if (is_zcopy) { 900 /* Handling overflow case, because we use psock->sendmsg_idx - 1 for the 901 * req->internal.offset, so sendmsg_idx should not be zero */ 902 if (spdk_unlikely(sock->sendmsg_idx == UINT32_MAX)) { 903 sock->sendmsg_idx = 1; 904 } else { 905 sock->sendmsg_idx++; 906 } 907 } 908 909 /* Consume the requests that were actually written */ 910 req = TAILQ_FIRST(&_sock->queued_reqs); 911 while (req) { 912 /* req->internal.is_zcopy is true when the whole req or part of it is sent with zerocopy */ 913 req->internal.is_zcopy = is_zcopy; 914 915 rc = sock_request_advance_offset(req, rc); 916 if (rc < 0) { 917 /* This element was partially sent. */ 918 return 0; 919 } 920 921 /* Handled a full request. */ 922 spdk_sock_request_pend(_sock, req); 923 924 if (!req->internal.is_zcopy && req == TAILQ_FIRST(&_sock->pending_reqs)) { 925 retval = spdk_sock_request_put(_sock, req, 0); 926 if (retval) { 927 return retval; 928 } 929 } else { 930 /* Re-use the offset field to hold the sendmsg call index. The 931 * index is 0 based, so subtract one here because we've already 932 * incremented above. 
			 */
			req->internal.offset = sock->sendmsg_idx - 1;
		}

		if (rc == 0) {
			break;
		}

		req = TAILQ_FIRST(&_sock->queued_reqs);
	}

	return 0;
}

#ifdef SPDK_ZEROCOPY
static int
_sock_check_zcopy(struct spdk_sock *_sock, int status)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	ssize_t rc;
	struct sock_extended_err *serr;
	struct cmsghdr *cm;
	uint32_t idx;
	struct spdk_sock_request *req, *treq;
	bool found;

	assert(sock->zcopy == true);
	if (spdk_unlikely(status < 0)) {
		if (!TAILQ_EMPTY(&_sock->pending_reqs)) {
			SPDK_ERRLOG("Attempting to receive from ERRQUEUE yielded error, but pending list still has orphaned entries, status = %d\n",
				    status);
		} else {
			SPDK_WARNLOG("Recvmsg yielded an error!\n");
		}
		return 0;
	}

	cm = CMSG_FIRSTHDR(&sock->errqueue_task.msg);
	if (!((cm->cmsg_level == SOL_IP && cm->cmsg_type == IP_RECVERR) ||
	      (cm->cmsg_level == SOL_IPV6 && cm->cmsg_type == IPV6_RECVERR))) {
		SPDK_WARNLOG("Unexpected cmsg level or type!\n");
		return 0;
	}

	serr = (struct sock_extended_err *)CMSG_DATA(cm);
	if (serr->ee_errno != 0 || serr->ee_origin != SO_EE_ORIGIN_ZEROCOPY) {
		SPDK_WARNLOG("Unexpected extended error origin\n");
		return 0;
	}

	/* Most of the time, the pending_reqs array is in the exact
	 * order we need such that all of the requests to complete are
	 * in order, in the front. It is guaranteed that all requests
	 * belonging to the same sendmsg call are sequential, so once
	 * we encounter one match we can stop looping as soon as a
	 * non-match is found.
	 */
	for (idx = serr->ee_info; idx <= serr->ee_data; idx++) {
		found = false;
		TAILQ_FOREACH_SAFE(req, &_sock->pending_reqs, internal.link, treq) {
			if (!req->internal.is_zcopy) {
				/* This wasn't a zcopy request.
It was just waiting in line to complete */ 994 rc = spdk_sock_request_put(_sock, req, 0); 995 if (rc < 0) { 996 return rc; 997 } 998 } else if (req->internal.offset == idx) { 999 found = true; 1000 rc = spdk_sock_request_put(_sock, req, 0); 1001 if (rc < 0) { 1002 return rc; 1003 } 1004 } else if (found) { 1005 break; 1006 } 1007 } 1008 } 1009 1010 return 0; 1011 } 1012 1013 static void 1014 _sock_prep_errqueue(struct spdk_sock *_sock) 1015 { 1016 struct spdk_uring_sock *sock = __uring_sock(_sock); 1017 struct spdk_uring_task *task = &sock->errqueue_task; 1018 struct io_uring_sqe *sqe; 1019 1020 if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS) { 1021 return; 1022 } 1023 1024 assert(sock->group != NULL); 1025 sock->group->io_queued++; 1026 1027 sqe = io_uring_get_sqe(&sock->group->uring); 1028 io_uring_prep_recvmsg(sqe, sock->fd, &task->msg, MSG_ERRQUEUE); 1029 io_uring_sqe_set_data(sqe, task); 1030 task->status = SPDK_URING_SOCK_TASK_IN_PROCESS; 1031 } 1032 1033 #endif 1034 1035 static void 1036 _sock_flush(struct spdk_sock *_sock) 1037 { 1038 struct spdk_uring_sock *sock = __uring_sock(_sock); 1039 struct spdk_uring_task *task = &sock->write_task; 1040 uint32_t iovcnt; 1041 struct io_uring_sqe *sqe; 1042 int flags; 1043 1044 if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS) { 1045 return; 1046 } 1047 1048 #ifdef SPDK_ZEROCOPY 1049 if (sock->zcopy) { 1050 flags = MSG_DONTWAIT | sock->zcopy_send_flags; 1051 } else 1052 #endif 1053 { 1054 flags = MSG_DONTWAIT; 1055 } 1056 1057 iovcnt = spdk_sock_prep_reqs(&sock->base, task->iovs, task->iov_cnt, &task->last_req, &flags); 1058 if (!iovcnt) { 1059 return; 1060 } 1061 1062 task->iov_cnt = iovcnt; 1063 assert(sock->group != NULL); 1064 task->msg.msg_iov = task->iovs; 1065 task->msg.msg_iovlen = task->iov_cnt; 1066 #ifdef SPDK_ZEROCOPY 1067 task->is_zcopy = (flags & MSG_ZEROCOPY) ? 
true : false;
#endif
	sock->group->io_queued++;

	sqe = io_uring_get_sqe(&sock->group->uring);
	io_uring_prep_sendmsg(sqe, sock->fd, &sock->write_task.msg, flags);
	io_uring_sqe_set_data(sqe, task);
	task->status = SPDK_URING_SOCK_TASK_IN_PROCESS;
}

static void
_sock_prep_pollin(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct spdk_uring_task *task = &sock->pollin_task;
	struct io_uring_sqe *sqe;

	/* Do not prepare pollin event */
	if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS || (sock->pending_recv && !sock->zcopy)) {
		return;
	}

	assert(sock->group != NULL);
	sock->group->io_queued++;

	sqe = io_uring_get_sqe(&sock->group->uring);
	io_uring_prep_poll_add(sqe, sock->fd, POLLIN | POLLERR);
	io_uring_sqe_set_data(sqe, task);
	task->status = SPDK_URING_SOCK_TASK_IN_PROCESS;
}

static void
_sock_prep_cancel_task(struct spdk_sock *_sock, void *user_data)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct spdk_uring_task *task = &sock->cancel_task;
	struct io_uring_sqe *sqe;

	if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS) {
		return;
	}

	assert(sock->group != NULL);
	sock->group->io_queued++;

	sqe = io_uring_get_sqe(&sock->group->uring);
	io_uring_prep_cancel(sqe, user_data, 0);
	io_uring_sqe_set_data(sqe, task);
	task->status = SPDK_URING_SOCK_TASK_IN_PROCESS;
}

static int
sock_uring_group_reap(struct spdk_uring_sock_group_impl *group, int max, int max_read_events,
		      struct spdk_sock **socks)
{
	int i, count, ret;
	struct io_uring_cqe *cqe;
	struct spdk_uring_sock *sock, *tmp;
	struct spdk_uring_task *task;
	int status;
	bool is_zcopy;

	for (i = 0; i < max; i++) {
		ret = io_uring_peek_cqe(&group->uring, &cqe);
		if (ret != 0) {
			break;
		}

		if (cqe == NULL) {
			break;
		}

		task = (struct spdk_uring_task *)cqe->user_data;
		assert(task != NULL);
		sock = task->sock;
		assert(sock != NULL);
		assert(sock->group != NULL);
		assert(sock->group == group);
		sock->group->io_inflight--;
		sock->group->io_avail++;
		status = cqe->res;
		io_uring_cqe_seen(&group->uring, cqe);

		task->status = SPDK_URING_SOCK_TASK_NOT_IN_USE;

		if (spdk_unlikely(status <= 0)) {
			if (status == -EAGAIN || status == -EWOULDBLOCK || (status == -ENOBUFS && sock->zcopy)) {
				continue;
			}
		}

		switch (task->type) {
		case SPDK_SOCK_TASK_POLLIN:
#ifdef SPDK_ZEROCOPY
			if ((status & POLLERR) == POLLERR) {
				_sock_prep_errqueue(&sock->base);
			}
#endif
			if ((status & POLLIN) == POLLIN) {
				if (sock->base.cb_fn != NULL &&
				    sock->pending_recv == false) {
					sock->pending_recv = true;
					TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
				}
			}
			break;
		case SPDK_SOCK_TASK_WRITE:
			task->last_req = NULL;
			task->iov_cnt = 0;
			is_zcopy = task->is_zcopy;
			task->is_zcopy = false;
			if (spdk_unlikely(status < 0)) {
				sock->connection_status = status;
				spdk_sock_abort_requests(&sock->base);
			} else {
				sock_complete_write_reqs(&sock->base, status, is_zcopy);
			}

			break;
#ifdef SPDK_ZEROCOPY
		case SPDK_SOCK_TASK_ERRQUEUE:
			if (spdk_unlikely(status == -ECANCELED)) {
				sock->connection_status = status;
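				/* Explanatory note (inferred from this file, not from upstream docs):
				 * -ECANCELED here most likely means the MSG_ERRQUEUE recvmsg SQE was
				 * cancelled, e.g. by the cancel task posted when the socket is removed
				 * from the group. Recording it in connection_status makes subsequent
				 * uring_sock_writev_async() calls fail fast instead of queueing more
				 * requests on a socket that is being torn down.
				 */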
1190 break; 1191 } 1192 _sock_check_zcopy(&sock->base, status); 1193 break; 1194 #endif 1195 case SPDK_SOCK_TASK_CANCEL: 1196 /* Do nothing */ 1197 break; 1198 default: 1199 SPDK_UNREACHABLE(); 1200 } 1201 } 1202 1203 if (!socks) { 1204 return 0; 1205 } 1206 count = 0; 1207 TAILQ_FOREACH_SAFE(sock, &group->pending_recv, link, tmp) { 1208 if (count == max_read_events) { 1209 break; 1210 } 1211 1212 if (spdk_unlikely(sock->base.cb_fn == NULL) || 1213 (sock->recv_pipe == NULL || spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0)) { 1214 sock->pending_recv = false; 1215 TAILQ_REMOVE(&group->pending_recv, sock, link); 1216 if (spdk_unlikely(sock->base.cb_fn == NULL)) { 1217 /* If the socket's cb_fn is NULL, do not add it to socks array */ 1218 continue; 1219 } 1220 } 1221 1222 socks[count++] = &sock->base; 1223 } 1224 1225 1226 /* Cycle the pending_recv list so that each time we poll things aren't 1227 * in the same order. Say we have 6 sockets in the list, named as follows: 1228 * A B C D E F 1229 * And all 6 sockets had the poll events, but max_events is only 3. That means 1230 * psock currently points at D. We want to rearrange the list to the following: 1231 * D E F A B C 1232 * 1233 * The variables below are named according to this example to make it easier to 1234 * follow the swaps. 1235 */ 1236 if (sock != NULL) { 1237 struct spdk_uring_sock *ua, *uc, *ud, *uf; 1238 1239 /* Capture pointers to the elements we need */ 1240 ud = sock; 1241 1242 ua = TAILQ_FIRST(&group->pending_recv); 1243 if (ua == ud) { 1244 goto end; 1245 } 1246 1247 uf = TAILQ_LAST(&group->pending_recv, pending_recv_list); 1248 if (uf == ud) { 1249 TAILQ_REMOVE(&group->pending_recv, ud, link); 1250 TAILQ_INSERT_HEAD(&group->pending_recv, ud, link); 1251 goto end; 1252 } 1253 1254 uc = TAILQ_PREV(ud, pending_recv_list, link); 1255 assert(uc != NULL); 1256 1257 /* Break the link between C and D */ 1258 uc->link.tqe_next = NULL; 1259 1260 /* Connect F to A */ 1261 uf->link.tqe_next = ua; 1262 ua->link.tqe_prev = &uf->link.tqe_next; 1263 1264 /* Fix up the list first/last pointers */ 1265 group->pending_recv.tqh_first = ud; 1266 group->pending_recv.tqh_last = &uc->link.tqe_next; 1267 1268 /* D is in front of the list, make tqe prev pointer point to the head of list */ 1269 ud->link.tqe_prev = &group->pending_recv.tqh_first; 1270 } 1271 1272 end: 1273 return count; 1274 } 1275 1276 static int uring_sock_flush(struct spdk_sock *_sock); 1277 1278 static void 1279 uring_sock_writev_async(struct spdk_sock *_sock, struct spdk_sock_request *req) 1280 { 1281 struct spdk_uring_sock *sock = __uring_sock(_sock); 1282 int rc; 1283 1284 if (spdk_unlikely(sock->connection_status)) { 1285 req->cb_fn(req->cb_arg, sock->connection_status); 1286 return; 1287 } 1288 1289 spdk_sock_request_queue(_sock, req); 1290 1291 if (!sock->group) { 1292 if (_sock->queued_iovcnt >= IOV_BATCH_SIZE) { 1293 rc = uring_sock_flush(_sock); 1294 if (rc < 0 && errno != EAGAIN) { 1295 spdk_sock_abort_requests(_sock); 1296 } 1297 } 1298 } 1299 } 1300 1301 static void 1302 uring_sock_readv_async(struct spdk_sock *sock, struct spdk_sock_request *req) 1303 { 1304 req->cb_fn(req->cb_arg, -ENOTSUP); 1305 } 1306 1307 static int 1308 uring_sock_set_recvlowat(struct spdk_sock *_sock, int nbytes) 1309 { 1310 struct spdk_uring_sock *sock = __uring_sock(_sock); 1311 int val; 1312 int rc; 1313 1314 assert(sock != NULL); 1315 1316 val = nbytes; 1317 rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVLOWAT, &val, sizeof val); 1318 if (rc != 0) { 1319 return -1; 1320 } 1321 return 
0; 1322 } 1323 1324 static bool 1325 uring_sock_is_ipv6(struct spdk_sock *_sock) 1326 { 1327 struct spdk_uring_sock *sock = __uring_sock(_sock); 1328 struct sockaddr_storage sa; 1329 socklen_t salen; 1330 int rc; 1331 1332 assert(sock != NULL); 1333 1334 memset(&sa, 0, sizeof sa); 1335 salen = sizeof sa; 1336 rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen); 1337 if (rc != 0) { 1338 SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno); 1339 return false; 1340 } 1341 1342 return (sa.ss_family == AF_INET6); 1343 } 1344 1345 static bool 1346 uring_sock_is_ipv4(struct spdk_sock *_sock) 1347 { 1348 struct spdk_uring_sock *sock = __uring_sock(_sock); 1349 struct sockaddr_storage sa; 1350 socklen_t salen; 1351 int rc; 1352 1353 assert(sock != NULL); 1354 1355 memset(&sa, 0, sizeof sa); 1356 salen = sizeof sa; 1357 rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen); 1358 if (rc != 0) { 1359 SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno); 1360 return false; 1361 } 1362 1363 return (sa.ss_family == AF_INET); 1364 } 1365 1366 static bool 1367 uring_sock_is_connected(struct spdk_sock *_sock) 1368 { 1369 struct spdk_uring_sock *sock = __uring_sock(_sock); 1370 uint8_t byte; 1371 int rc; 1372 1373 rc = recv(sock->fd, &byte, 1, MSG_PEEK | MSG_DONTWAIT); 1374 if (rc == 0) { 1375 return false; 1376 } 1377 1378 if (rc < 0) { 1379 if (errno == EAGAIN || errno == EWOULDBLOCK) { 1380 return true; 1381 } 1382 1383 return false; 1384 } 1385 1386 return true; 1387 } 1388 1389 static struct spdk_sock_group_impl * 1390 uring_sock_group_impl_get_optimal(struct spdk_sock *_sock, struct spdk_sock_group_impl *hint) 1391 { 1392 struct spdk_uring_sock *sock = __uring_sock(_sock); 1393 struct spdk_sock_group_impl *group; 1394 1395 if (sock->placement_id != -1) { 1396 spdk_sock_map_lookup(&g_map, sock->placement_id, &group, hint); 1397 return group; 1398 } 1399 1400 return NULL; 1401 } 1402 1403 static struct spdk_sock_group_impl * 1404 uring_sock_group_impl_create(void) 1405 { 1406 struct spdk_uring_sock_group_impl *group_impl; 1407 1408 group_impl = calloc(1, sizeof(*group_impl)); 1409 if (group_impl == NULL) { 1410 SPDK_ERRLOG("group_impl allocation failed\n"); 1411 return NULL; 1412 } 1413 1414 group_impl->io_avail = SPDK_SOCK_GROUP_QUEUE_DEPTH; 1415 1416 if (io_uring_queue_init(SPDK_SOCK_GROUP_QUEUE_DEPTH, &group_impl->uring, 0) < 0) { 1417 SPDK_ERRLOG("uring I/O context setup failure\n"); 1418 free(group_impl); 1419 return NULL; 1420 } 1421 1422 TAILQ_INIT(&group_impl->pending_recv); 1423 1424 if (g_spdk_uring_sock_impl_opts.enable_placement_id == PLACEMENT_CPU) { 1425 spdk_sock_map_insert(&g_map, spdk_env_get_current_core(), &group_impl->base); 1426 } 1427 1428 return &group_impl->base; 1429 } 1430 1431 static int 1432 uring_sock_group_impl_add_sock(struct spdk_sock_group_impl *_group, 1433 struct spdk_sock *_sock) 1434 { 1435 struct spdk_uring_sock *sock = __uring_sock(_sock); 1436 struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group); 1437 int rc; 1438 1439 sock->group = group; 1440 sock->write_task.sock = sock; 1441 sock->write_task.type = SPDK_SOCK_TASK_WRITE; 1442 1443 sock->pollin_task.sock = sock; 1444 sock->pollin_task.type = SPDK_SOCK_TASK_POLLIN; 1445 1446 sock->errqueue_task.sock = sock; 1447 sock->errqueue_task.type = SPDK_SOCK_TASK_ERRQUEUE; 1448 sock->errqueue_task.msg.msg_control = sock->buf; 1449 sock->errqueue_task.msg.msg_controllen = sizeof(sock->buf); 1450 1451 sock->cancel_task.sock = sock; 1452 sock->cancel_task.type = SPDK_SOCK_TASK_CANCEL; 1453 
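	/* Note (descriptive comment, based on the definitions above): the errqueue
	 * task reuses sock->buf as its control-message buffer. That buffer is
	 * SPDK_SOCK_CMG_INFO_SIZE bytes, i.e. one struct cmsghdr plus one
	 * struct sock_extended_err, which is the record that MSG_ERRQUEUE reception
	 * of zero-copy send completions is expected to deliver.
	 */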
1454 /* switched from another polling group due to scheduling */ 1455 if (spdk_unlikely(sock->recv_pipe != NULL && 1456 (spdk_pipe_reader_bytes_available(sock->recv_pipe) > 0))) { 1457 assert(sock->pending_recv == false); 1458 sock->pending_recv = true; 1459 TAILQ_INSERT_TAIL(&group->pending_recv, sock, link); 1460 } 1461 1462 if (sock->placement_id != -1) { 1463 rc = spdk_sock_map_insert(&g_map, sock->placement_id, &group->base); 1464 if (rc != 0) { 1465 SPDK_ERRLOG("Failed to insert sock group into map: %d", rc); 1466 /* Do not treat this as an error. The system will continue running. */ 1467 } 1468 } 1469 1470 return 0; 1471 } 1472 1473 static int 1474 uring_sock_group_impl_poll(struct spdk_sock_group_impl *_group, int max_events, 1475 struct spdk_sock **socks) 1476 { 1477 struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group); 1478 int count, ret; 1479 int to_complete, to_submit; 1480 struct spdk_sock *_sock, *tmp; 1481 struct spdk_uring_sock *sock; 1482 1483 if (spdk_likely(socks)) { 1484 TAILQ_FOREACH_SAFE(_sock, &group->base.socks, link, tmp) { 1485 sock = __uring_sock(_sock); 1486 if (spdk_unlikely(sock->connection_status)) { 1487 continue; 1488 } 1489 _sock_flush(_sock); 1490 _sock_prep_pollin(_sock); 1491 } 1492 } 1493 1494 to_submit = group->io_queued; 1495 1496 /* For network I/O, it cannot be set with O_DIRECT, so we do not need to call spdk_io_uring_enter */ 1497 if (to_submit > 0) { 1498 /* If there are I/O to submit, use io_uring_submit here. 1499 * It will automatically call io_uring_enter appropriately. */ 1500 ret = io_uring_submit(&group->uring); 1501 if (ret < 0) { 1502 return 1; 1503 } 1504 group->io_queued = 0; 1505 group->io_inflight += to_submit; 1506 group->io_avail -= to_submit; 1507 } 1508 1509 count = 0; 1510 to_complete = group->io_inflight; 1511 if (to_complete > 0 || !TAILQ_EMPTY(&group->pending_recv)) { 1512 count = sock_uring_group_reap(group, to_complete, max_events, socks); 1513 } 1514 1515 return count; 1516 } 1517 1518 static int 1519 uring_sock_group_impl_remove_sock(struct spdk_sock_group_impl *_group, 1520 struct spdk_sock *_sock) 1521 { 1522 struct spdk_uring_sock *sock = __uring_sock(_sock); 1523 struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group); 1524 1525 if (sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) { 1526 _sock_prep_cancel_task(_sock, &sock->write_task); 1527 /* Since spdk_sock_group_remove_sock is not asynchronous interface, so 1528 * currently can use a while loop here. */ 1529 while ((sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) || 1530 (sock->cancel_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE)) { 1531 uring_sock_group_impl_poll(_group, 32, NULL); 1532 } 1533 } 1534 1535 if (sock->pollin_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) { 1536 _sock_prep_cancel_task(_sock, &sock->pollin_task); 1537 /* Since spdk_sock_group_remove_sock is not asynchronous interface, so 1538 * currently can use a while loop here. */ 1539 while ((sock->pollin_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) || 1540 (sock->cancel_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE)) { 1541 uring_sock_group_impl_poll(_group, 32, NULL); 1542 } 1543 } 1544 1545 if (sock->errqueue_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) { 1546 _sock_prep_cancel_task(_sock, &sock->errqueue_task); 1547 /* Since spdk_sock_group_remove_sock is not asynchronous interface, so 1548 * currently can use a while loop here. 
*/ 1549 while ((sock->errqueue_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) || 1550 (sock->cancel_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE)) { 1551 uring_sock_group_impl_poll(_group, 32, NULL); 1552 } 1553 } 1554 1555 /* Make sure the cancelling the tasks above didn't cause sending new requests */ 1556 assert(sock->write_task.status == SPDK_URING_SOCK_TASK_NOT_IN_USE); 1557 assert(sock->pollin_task.status == SPDK_URING_SOCK_TASK_NOT_IN_USE); 1558 assert(sock->errqueue_task.status == SPDK_URING_SOCK_TASK_NOT_IN_USE); 1559 1560 if (sock->pending_recv) { 1561 TAILQ_REMOVE(&group->pending_recv, sock, link); 1562 sock->pending_recv = false; 1563 } 1564 assert(sock->pending_recv == false); 1565 1566 if (sock->placement_id != -1) { 1567 spdk_sock_map_release(&g_map, sock->placement_id); 1568 } 1569 1570 sock->group = NULL; 1571 return 0; 1572 } 1573 1574 static int 1575 uring_sock_group_impl_close(struct spdk_sock_group_impl *_group) 1576 { 1577 struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group); 1578 1579 /* try to reap all the active I/O */ 1580 while (group->io_inflight) { 1581 uring_sock_group_impl_poll(_group, 32, NULL); 1582 } 1583 assert(group->io_inflight == 0); 1584 assert(group->io_avail == SPDK_SOCK_GROUP_QUEUE_DEPTH); 1585 1586 io_uring_queue_exit(&group->uring); 1587 1588 if (g_spdk_uring_sock_impl_opts.enable_placement_id == PLACEMENT_CPU) { 1589 spdk_sock_map_release(&g_map, spdk_env_get_current_core()); 1590 } 1591 1592 free(group); 1593 return 0; 1594 } 1595 1596 static int 1597 uring_sock_flush(struct spdk_sock *_sock) 1598 { 1599 struct spdk_uring_sock *sock = __uring_sock(_sock); 1600 struct msghdr msg = {}; 1601 struct iovec iovs[IOV_BATCH_SIZE]; 1602 int iovcnt; 1603 ssize_t rc; 1604 int flags = sock->zcopy_send_flags; 1605 int retval; 1606 bool is_zcopy = false; 1607 1608 /* Can't flush from within a callback or we end up with recursive calls */ 1609 if (_sock->cb_cnt > 0) { 1610 errno = EAGAIN; 1611 return -1; 1612 } 1613 1614 /* Can't flush while a write is already outstanding */ 1615 if (sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) { 1616 errno = EAGAIN; 1617 return -1; 1618 } 1619 1620 /* Gather an iov */ 1621 iovcnt = spdk_sock_prep_reqs(_sock, iovs, 0, NULL, &flags); 1622 if (iovcnt == 0) { 1623 /* Nothing to send */ 1624 return 0; 1625 } 1626 1627 /* Perform the vectored write */ 1628 msg.msg_iov = iovs; 1629 msg.msg_iovlen = iovcnt; 1630 rc = sendmsg(sock->fd, &msg, flags | MSG_DONTWAIT); 1631 if (rc <= 0) { 1632 if (rc == 0 || errno == EAGAIN || errno == EWOULDBLOCK || (errno == ENOBUFS && sock->zcopy)) { 1633 errno = EAGAIN; 1634 } 1635 return -1; 1636 } 1637 1638 #ifdef SPDK_ZEROCOPY 1639 is_zcopy = flags & MSG_ZEROCOPY; 1640 #endif 1641 retval = sock_complete_write_reqs(_sock, rc, is_zcopy); 1642 if (retval < 0) { 1643 /* if the socket is closed, return to avoid heap-use-after-free error */ 1644 errno = ENOTCONN; 1645 return -1; 1646 } 1647 1648 #ifdef SPDK_ZEROCOPY 1649 if (sock->zcopy && !TAILQ_EMPTY(&_sock->pending_reqs)) { 1650 _sock_check_zcopy(_sock, 0); 1651 } 1652 #endif 1653 1654 return rc; 1655 } 1656 1657 static struct spdk_net_impl g_uring_net_impl = { 1658 .name = "uring", 1659 .getaddr = uring_sock_getaddr, 1660 .connect = uring_sock_connect, 1661 .listen = uring_sock_listen, 1662 .accept = uring_sock_accept, 1663 .close = uring_sock_close, 1664 .recv = uring_sock_recv, 1665 .readv = uring_sock_readv, 1666 .readv_async = uring_sock_readv_async, 1667 .writev = uring_sock_writev, 1668 .writev_async = 
uring_sock_writev_async, 1669 .flush = uring_sock_flush, 1670 .set_recvlowat = uring_sock_set_recvlowat, 1671 .set_recvbuf = uring_sock_set_recvbuf, 1672 .set_sendbuf = uring_sock_set_sendbuf, 1673 .is_ipv6 = uring_sock_is_ipv6, 1674 .is_ipv4 = uring_sock_is_ipv4, 1675 .is_connected = uring_sock_is_connected, 1676 .group_impl_get_optimal = uring_sock_group_impl_get_optimal, 1677 .group_impl_create = uring_sock_group_impl_create, 1678 .group_impl_add_sock = uring_sock_group_impl_add_sock, 1679 .group_impl_remove_sock = uring_sock_group_impl_remove_sock, 1680 .group_impl_poll = uring_sock_group_impl_poll, 1681 .group_impl_close = uring_sock_group_impl_close, 1682 .get_opts = uring_sock_impl_get_opts, 1683 .set_opts = uring_sock_impl_set_opts, 1684 }; 1685 1686 SPDK_NET_IMPL_REGISTER(uring, &g_uring_net_impl, DEFAULT_SOCK_PRIORITY + 2); 1687
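
/*
 * Usage sketch (illustrative only, not part of this module): an application can
 * opt into this transport by name before creating sockets. The helpers below come
 * from spdk/sock.h; the address and port are placeholders.
 *
 *	spdk_sock_set_default_impl("uring");
 *
 *	struct spdk_sock_opts opts;
 *	spdk_sock_opts_init(&opts, sizeof(opts));
 *	struct spdk_sock *sock = spdk_sock_connect_ext("10.0.0.1", 4420, "uring", &opts);
 *
 * Treat this as a sketch of how the "uring" implementation registered above is
 * selected, not as a complete example of SPDK socket setup.
 */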