/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright (C) 2019 Intel Corporation.
 * All rights reserved.
 */

#include "spdk/stdinc.h"
#include "spdk/config.h"

#include <linux/errqueue.h>
#include <sys/epoll.h>
#include <liburing.h>

#include "spdk/barrier.h"
#include "spdk/env.h"
#include "spdk/log.h"
#include "spdk/pipe.h"
#include "spdk/sock.h"
#include "spdk/string.h"
#include "spdk/util.h"

#include "spdk_internal/sock.h"
#include "spdk_internal/assert.h"
#include "../sock_kernel.h"

#define MAX_TMPBUF 1024
#define PORTNUMLEN 32
#define SPDK_SOCK_GROUP_QUEUE_DEPTH 4096
#define SPDK_SOCK_CMG_INFO_SIZE (sizeof(struct cmsghdr) + sizeof(struct sock_extended_err))

enum spdk_sock_task_type {
	SPDK_SOCK_TASK_POLLIN = 0,
	SPDK_SOCK_TASK_ERRQUEUE,
	SPDK_SOCK_TASK_WRITE,
	SPDK_SOCK_TASK_CANCEL,
};

#if defined(SO_ZEROCOPY) && defined(MSG_ZEROCOPY)
#define SPDK_ZEROCOPY
#endif

enum spdk_uring_sock_task_status {
	SPDK_URING_SOCK_TASK_NOT_IN_USE = 0,
	SPDK_URING_SOCK_TASK_IN_PROCESS,
};

struct spdk_uring_task {
	enum spdk_uring_sock_task_status status;
	enum spdk_sock_task_type type;
	struct spdk_uring_sock *sock;
	struct msghdr msg;
	struct iovec iovs[IOV_BATCH_SIZE];
	int iov_cnt;
	struct spdk_sock_request *last_req;
	bool is_zcopy;
	STAILQ_ENTRY(spdk_uring_task) link;
};

struct spdk_uring_sock {
	struct spdk_sock base;
	int fd;
	uint32_t sendmsg_idx;
	struct spdk_uring_sock_group_impl *group;
	struct spdk_uring_task write_task;
	struct spdk_uring_task errqueue_task;
	struct spdk_uring_task pollin_task;
	struct spdk_uring_task cancel_task;
	struct spdk_pipe *recv_pipe;
	void *recv_buf;
	int recv_buf_sz;
	bool zcopy;
	bool pending_recv;
	int zcopy_send_flags;
	int connection_status;
	int placement_id;
	uint8_t buf[SPDK_SOCK_CMG_INFO_SIZE];
	TAILQ_ENTRY(spdk_uring_sock) link;
};

TAILQ_HEAD(pending_recv_list, spdk_uring_sock);

struct spdk_uring_sock_group_impl {
	struct spdk_sock_group_impl base;
	struct io_uring uring;
	uint32_t io_inflight;
	uint32_t io_queued;
	uint32_t io_avail;
	struct pending_recv_list pending_recv;
};

static struct spdk_sock_impl_opts g_spdk_uring_sock_impl_opts = {
	.recv_buf_size = MIN_SO_RCVBUF_SIZE,
	.send_buf_size = MIN_SO_SNDBUF_SIZE,
	.enable_recv_pipe = true,
	.enable_quickack = false,
	.enable_placement_id = PLACEMENT_NONE,
	.enable_zerocopy_send_server = false,
	.enable_zerocopy_send_client = false,
	.zerocopy_threshold = 0,
	.tls_version = 0,
	.enable_ktls = false,
	.psk_key = NULL,
	.psk_identity = NULL
};

static struct spdk_sock_map g_map = {
	.entries = STAILQ_HEAD_INITIALIZER(g_map.entries),
	.mtx = PTHREAD_MUTEX_INITIALIZER
};

__attribute((destructor)) static void
uring_sock_map_cleanup(void)
{
	spdk_sock_map_cleanup(&g_map);
}

#define SPDK_URING_SOCK_REQUEST_IOV(req) ((struct iovec *)((uint8_t *)req + sizeof(struct spdk_sock_request)))

#define __uring_sock(sock) (struct spdk_uring_sock *)sock
#define __uring_group_impl(group) (struct spdk_uring_sock_group_impl *)group

static void
uring_sock_copy_impl_opts(struct spdk_sock_impl_opts *dest, const struct spdk_sock_impl_opts *src,
			  size_t len)
{
#define FIELD_OK(field) \
	offsetof(struct spdk_sock_impl_opts, field) + sizeof(src->field) <= len

#define SET_FIELD(field) \
	if (FIELD_OK(field)) { \
		dest->field = src->field; \
	}

	SET_FIELD(recv_buf_size);
	SET_FIELD(send_buf_size);
	SET_FIELD(enable_recv_pipe);
	SET_FIELD(enable_quickack);
	SET_FIELD(enable_placement_id);
	SET_FIELD(enable_zerocopy_send_server);
	SET_FIELD(enable_zerocopy_send_client);
	SET_FIELD(zerocopy_threshold);
	SET_FIELD(tls_version);
	SET_FIELD(enable_ktls);
	SET_FIELD(psk_key);
	SET_FIELD(psk_identity);

#undef SET_FIELD
#undef FIELD_OK
}

static int
uring_sock_impl_get_opts(struct spdk_sock_impl_opts *opts, size_t *len)
{
	if (!opts || !len) {
		errno = EINVAL;
		return -1;
	}

	assert(sizeof(*opts) >= *len);
	memset(opts, 0, *len);

	uring_sock_copy_impl_opts(opts, &g_spdk_uring_sock_impl_opts, *len);
	*len = spdk_min(*len, sizeof(g_spdk_uring_sock_impl_opts));

	return 0;
}

static int
uring_sock_impl_set_opts(const struct spdk_sock_impl_opts *opts, size_t len)
{
	if (!opts) {
		errno = EINVAL;
		return -1;
	}

	assert(sizeof(*opts) >= len);
	uring_sock_copy_impl_opts(&g_spdk_uring_sock_impl_opts, opts, len);

	return 0;
}

static void
uring_opts_get_impl_opts(const struct spdk_sock_opts *opts, struct spdk_sock_impl_opts *dest)
{
	/* Copy the default impl_opts first to cover cases when user's impl_opts is smaller */
	memcpy(dest, &g_spdk_uring_sock_impl_opts, sizeof(*dest));

	if (opts->impl_opts != NULL) {
		assert(sizeof(*dest) >= opts->impl_opts_size);
		uring_sock_copy_impl_opts(dest, opts->impl_opts, opts->impl_opts_size);
	}
}

static int
uring_sock_getaddr(struct spdk_sock *_sock, char *saddr, int slen, uint16_t *sport,
		   char *caddr, int clen, uint16_t *cport)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct sockaddr_storage sa;
	socklen_t salen;
	int rc;

	assert(sock != NULL);

	memset(&sa, 0, sizeof sa);
	salen = sizeof sa;
	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
	if (rc != 0) {
		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
		return -1;
	}

	switch (sa.ss_family) {
	case AF_UNIX:
		/* Acceptable connection types that don't have IPs */
		return 0;
	case AF_INET:
	case AF_INET6:
		/* Code below will get IP addresses */
		break;
	default:
		/* Unsupported socket family */
		return -1;
	}

	rc = get_addr_str((struct sockaddr *)&sa, saddr, slen);
	if (rc != 0) {
		SPDK_ERRLOG("getnameinfo() failed (errno=%d)\n", errno);
		return -1;
	}

	if (sport) {
		if (sa.ss_family == AF_INET) {
			*sport = ntohs(((struct sockaddr_in *) &sa)->sin_port);
		} else if (sa.ss_family == AF_INET6) {
			*sport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port);
		}
	}

	memset(&sa, 0, sizeof sa);
	salen = sizeof sa;
	rc = getpeername(sock->fd, (struct sockaddr *) &sa, &salen);
	if (rc != 0) {
		SPDK_ERRLOG("getpeername() failed (errno=%d)\n", errno);
		return -1;
	}

	rc = get_addr_str((struct sockaddr *)&sa, caddr, clen);
	if (rc != 0) {
		SPDK_ERRLOG("getnameinfo() failed (errno=%d)\n", errno);
		return -1;
	}

	if (cport) {
		if (sa.ss_family == AF_INET) {
			*cport = ntohs(((struct sockaddr_in *) &sa)->sin_port);
		} else if (sa.ss_family == AF_INET6) {
			*cport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port);
		}
	}

	return 0;
}

enum uring_sock_create_type {
	SPDK_SOCK_CREATE_LISTEN,
	SPDK_SOCK_CREATE_CONNECT,
};

static int
uring_sock_alloc_pipe(struct spdk_uring_sock *sock, int sz)
{
	uint8_t *new_buf;
	struct spdk_pipe *new_pipe;
	struct iovec siov[2];
	struct iovec diov[2];
	int sbytes;
	ssize_t bytes;

	if (sock->recv_buf_sz == sz) {
		return 0;
	}

	/* If the new size is 0, just free the pipe */
	if (sz == 0) {
		spdk_pipe_destroy(sock->recv_pipe);
		free(sock->recv_buf);
		sock->recv_pipe = NULL;
		sock->recv_buf = NULL;
		return 0;
	} else if (sz < MIN_SOCK_PIPE_SIZE) {
		SPDK_ERRLOG("The size of the pipe must be larger than %d\n", MIN_SOCK_PIPE_SIZE);
		return -1;
	}

	/* Round up to next 64 byte multiple */
	new_buf = calloc(SPDK_ALIGN_CEIL(sz + 1, 64), sizeof(uint8_t));
	if (!new_buf) {
		SPDK_ERRLOG("socket recv buf allocation failed\n");
		return -ENOMEM;
	}

	new_pipe = spdk_pipe_create(new_buf, sz + 1);
	if (new_pipe == NULL) {
		SPDK_ERRLOG("socket pipe allocation failed\n");
		free(new_buf);
		return -ENOMEM;
	}

	if (sock->recv_pipe != NULL) {
		/* Pull all of the data out of the old pipe */
		sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov);
		if (sbytes > sz) {
			/* Too much data to fit into the new pipe size */
			spdk_pipe_destroy(new_pipe);
			free(new_buf);
			return -EINVAL;
		}

		sbytes = spdk_pipe_writer_get_buffer(new_pipe, sz, diov);
		assert(sbytes == sz);

		bytes = spdk_iovcpy(siov, 2, diov, 2);
		spdk_pipe_writer_advance(new_pipe, bytes);

		spdk_pipe_destroy(sock->recv_pipe);
		free(sock->recv_buf);
	}

	sock->recv_buf_sz = sz;
	sock->recv_buf = new_buf;
	sock->recv_pipe = new_pipe;

	return 0;
}

static int
uring_sock_set_recvbuf(struct spdk_sock *_sock, int sz)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	int min_size;
	int rc;

	assert(sock != NULL);

	if (_sock->impl_opts.enable_recv_pipe) {
		rc = uring_sock_alloc_pipe(sock, sz);
		if (rc) {
			SPDK_ERRLOG("unable to allocate sufficient recvbuf with sz=%d on sock=%p\n", sz, _sock);
			return rc;
		}
	}

	/* Set kernel buffer size to be at least MIN_SO_RCVBUF_SIZE and
	 * g_spdk_uring_sock_impl_opts.recv_buf_size. */
	min_size = spdk_max(MIN_SO_RCVBUF_SIZE, g_spdk_uring_sock_impl_opts.recv_buf_size);

	if (sz < min_size) {
		sz = min_size;
	}

	rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVBUF, &sz, sizeof(sz));
	if (rc < 0) {
		return rc;
	}

	_sock->impl_opts.recv_buf_size = sz;

	return 0;
}

static int
uring_sock_set_sendbuf(struct spdk_sock *_sock, int sz)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	int min_size;
	int rc;

	assert(sock != NULL);

	/* Set kernel buffer size to be at least MIN_SO_SNDBUF_SIZE and
	 * g_spdk_uring_sock_impl_opts.send_buf_size. */
	min_size = spdk_max(MIN_SO_SNDBUF_SIZE, g_spdk_uring_sock_impl_opts.send_buf_size);

	if (sz < min_size) {
		sz = min_size;
	}

	rc = setsockopt(sock->fd, SOL_SOCKET, SO_SNDBUF, &sz, sizeof(sz));
	if (rc < 0) {
		return rc;
	}

	_sock->impl_opts.send_buf_size = sz;

	return 0;
}

static struct spdk_uring_sock *
uring_sock_alloc(int fd, struct spdk_sock_impl_opts *impl_opts, bool enable_zero_copy)
{
	struct spdk_uring_sock *sock;
#if defined(__linux__)
	int flag;
	int rc;
#endif

	sock = calloc(1, sizeof(*sock));
	if (sock == NULL) {
		SPDK_ERRLOG("sock allocation failed\n");
		return NULL;
	}

	sock->fd = fd;
	memcpy(&sock->base.impl_opts, impl_opts, sizeof(*impl_opts));

#if defined(__linux__)
	flag = 1;

	if (sock->base.impl_opts.enable_quickack) {
		rc = setsockopt(sock->fd, IPPROTO_TCP, TCP_QUICKACK, &flag, sizeof(flag));
		if (rc != 0) {
			SPDK_ERRLOG("failed to set quickack\n");
		}
	}

	spdk_sock_get_placement_id(sock->fd, sock->base.impl_opts.enable_placement_id,
				   &sock->placement_id);
#ifdef SPDK_ZEROCOPY
	/* Try to turn on zero copy sends */
	flag = 1;

	if (enable_zero_copy) {
		rc = setsockopt(sock->fd, SOL_SOCKET, SO_ZEROCOPY, &flag, sizeof(flag));
		if (rc == 0) {
			sock->zcopy = true;
			sock->zcopy_send_flags = MSG_ZEROCOPY;
		}
	}
#endif
#endif

	return sock;
}

static struct spdk_sock *
uring_sock_create(const char *ip, int port,
		  enum uring_sock_create_type type,
		  struct spdk_sock_opts *opts)
{
	struct spdk_uring_sock *sock;
	struct spdk_sock_impl_opts impl_opts;
	char buf[MAX_TMPBUF];
	char portnum[PORTNUMLEN];
	char *p;
	struct addrinfo hints, *res, *res0;
	int fd, flag;
	int val = 1;
	int rc;
	bool enable_zcopy_impl_opts = false;
	bool enable_zcopy_user_opts = true;

	assert(opts != NULL);
	uring_opts_get_impl_opts(opts, &impl_opts);

	if (ip == NULL) {
		return NULL;
	}
	if (ip[0] == '[') {
		snprintf(buf, sizeof(buf), "%s", ip + 1);
		p = strchr(buf, ']');
		if (p != NULL) {
			*p = '\0';
		}
		ip = (const char *) &buf[0];
	}

	snprintf(portnum, sizeof portnum, "%d", port);
	memset(&hints, 0, sizeof hints);
	hints.ai_family = PF_UNSPEC;
	hints.ai_socktype = SOCK_STREAM;
	hints.ai_flags = AI_NUMERICSERV;
	hints.ai_flags |= AI_PASSIVE;
	hints.ai_flags |= AI_NUMERICHOST;
	rc = getaddrinfo(ip, portnum, &hints, &res0);
	if (rc != 0) {
		SPDK_ERRLOG("getaddrinfo() failed %s (%d)\n", gai_strerror(rc), rc);
		return NULL;
	}

	/* try listen */
	fd = -1;
	for (res = res0; res != NULL; res = res->ai_next) {
retry:
		fd = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
		if (fd < 0) {
			/* error */
			continue;
		}

		val = impl_opts.recv_buf_size;
		rc = setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &val, sizeof val);
		if (rc) {
			/* Not fatal */
		}

		val = impl_opts.send_buf_size;
		rc = setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &val, sizeof val);
		if (rc) {
			/* Not fatal */
		}

		rc = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof val);
		if (rc != 0) {
			close(fd);
			fd = -1;
			/* error */
			continue;
		}
		rc = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof val);
		if (rc != 0) {
			close(fd);
			fd = -1;
			/* error */
			continue;
		}

		if (opts->ack_timeout) {
#if defined(__linux__)
			val = opts->ack_timeout;
			rc = setsockopt(fd, IPPROTO_TCP, TCP_USER_TIMEOUT, &val, sizeof val);
			if (rc != 0) {
				close(fd);
				fd = -1;
				/* error */
				continue;
			}
#else
			SPDK_WARNLOG("TCP_USER_TIMEOUT is not supported.\n");
#endif
		}

#if defined(SO_PRIORITY)
		if (opts != NULL && opts->priority) {
			rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &opts->priority, sizeof val);
			if (rc != 0) {
				close(fd);
				fd = -1;
				/* error */
				continue;
			}
		}
#endif
		if (res->ai_family == AF_INET6) {
			rc = setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &val, sizeof val);
			if (rc != 0) {
				close(fd);
				fd = -1;
				/* error */
				continue;
			}
		}

		if (type == SPDK_SOCK_CREATE_LISTEN) {
			rc = bind(fd, res->ai_addr, res->ai_addrlen);
			if (rc != 0) {
				SPDK_ERRLOG("bind() failed at port %d, errno = %d\n", port, errno);
				switch (errno) {
				case EINTR:
					/* interrupted? */
					close(fd);
					goto retry;
				case EADDRNOTAVAIL:
					SPDK_ERRLOG("IP address %s not available. "
						    "Verify IP address in config file "
						    "and make sure setup script is "
						    "run before starting spdk app.\n", ip);
				/* FALLTHROUGH */
				default:
					/* try next family */
					close(fd);
					fd = -1;
					continue;
				}
			}
			/* bind OK */
			rc = listen(fd, 512);
			if (rc != 0) {
				SPDK_ERRLOG("listen() failed, errno = %d\n", errno);
				close(fd);
				fd = -1;
				break;
			}

			flag = fcntl(fd, F_GETFL);
			if (fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0) {
				SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno);
				close(fd);
				fd = -1;
				break;
			}

			enable_zcopy_impl_opts = impl_opts.enable_zerocopy_send_server;
		} else if (type == SPDK_SOCK_CREATE_CONNECT) {
			rc = connect(fd, res->ai_addr, res->ai_addrlen);
			if (rc != 0) {
				SPDK_ERRLOG("connect() failed, errno = %d\n", errno);
				/* try next family */
				close(fd);
				fd = -1;
				continue;
			}

			flag = fcntl(fd, F_GETFL);
			if (fcntl(fd, F_SETFL, flag & ~O_NONBLOCK) < 0) {
				SPDK_ERRLOG("fcntl can't set blocking mode for socket, fd: %d (%d)\n", fd, errno);
				close(fd);
				fd = -1;
				break;
			}

			enable_zcopy_impl_opts = impl_opts.enable_zerocopy_send_client;
		}
		break;
	}
	freeaddrinfo(res0);

	if (fd < 0) {
		return NULL;
	}

	enable_zcopy_user_opts = opts->zcopy && !sock_is_loopback(fd);
	sock = uring_sock_alloc(fd, &impl_opts, enable_zcopy_user_opts && enable_zcopy_impl_opts);
	if (sock == NULL) {
		SPDK_ERRLOG("sock allocation failed\n");
		close(fd);
		return NULL;
	}

	return &sock->base;
}

static struct spdk_sock *
uring_sock_listen(const char *ip, int port, struct spdk_sock_opts *opts)
{
	return uring_sock_create(ip, port, SPDK_SOCK_CREATE_LISTEN, opts);
}

static struct spdk_sock *
uring_sock_connect(const char *ip, int port, struct spdk_sock_opts *opts)
{
	return uring_sock_create(ip, port, SPDK_SOCK_CREATE_CONNECT, opts);
}

static struct spdk_sock *
uring_sock_accept(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct sockaddr_storage sa;
	socklen_t salen;
	int rc, fd;
	struct spdk_uring_sock *new_sock;
	int flag;

	memset(&sa, 0, sizeof(sa));
	salen = sizeof(sa);

	assert(sock != NULL);

	rc = accept(sock->fd, (struct sockaddr *)&sa, &salen);

	if (rc == -1) {
		return NULL;
	}

	fd = rc;

	flag = fcntl(fd, F_GETFL);
	if ((flag & O_NONBLOCK) && (fcntl(fd, F_SETFL, flag & ~O_NONBLOCK) < 0)) {
		SPDK_ERRLOG("fcntl can't set blocking mode for socket, fd: %d (%d)\n", fd, errno);
		close(fd);
		return NULL;
	}

#if defined(SO_PRIORITY)
	/* The priority is not inherited, so call this function again */
	if (sock->base.opts.priority) {
		rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &sock->base.opts.priority, sizeof(int));
		if (rc != 0) {
			close(fd);
			return NULL;
		}
	}
#endif

	new_sock = uring_sock_alloc(fd, &sock->base.impl_opts, sock->zcopy);
	if (new_sock == NULL) {
		close(fd);
		return NULL;
	}

	return &new_sock->base;
}

static int
uring_sock_close(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);

	assert(TAILQ_EMPTY(&_sock->pending_reqs));
	assert(sock->group == NULL);

	/* If the socket fails to close, the best choice is to
	 * leak the fd but continue to free the rest of the sock
	 * memory. */
	close(sock->fd);

	spdk_pipe_destroy(sock->recv_pipe);
	free(sock->recv_buf);
	free(sock);

	return 0;
}

static ssize_t
uring_sock_recv_from_pipe(struct spdk_uring_sock *sock, struct iovec *diov, int diovcnt)
{
	struct iovec siov[2];
	int sbytes;
	ssize_t bytes;
	struct spdk_uring_sock_group_impl *group;

	sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov);
	if (sbytes < 0) {
		errno = EINVAL;
		return -1;
	} else if (sbytes == 0) {
		errno = EAGAIN;
		return -1;
	}

	bytes = spdk_iovcpy(siov, 2, diov, diovcnt);

	if (bytes == 0) {
		/* The only way this happens is if diov is 0 length */
		errno = EINVAL;
		return -1;
	}

	spdk_pipe_reader_advance(sock->recv_pipe, bytes);

	/* If we drained the pipe, take it off the level-triggered list */
	if (sock->base.group_impl && spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) {
		group = __uring_group_impl(sock->base.group_impl);
		TAILQ_REMOVE(&group->pending_recv, sock, link);
		sock->pending_recv = false;
	}

	return bytes;
}

static inline ssize_t
sock_readv(int fd, struct iovec *iov, int iovcnt)
{
	struct msghdr msg = {
		.msg_iov = iov,
		.msg_iovlen = iovcnt,
	};

	return recvmsg(fd, &msg, MSG_DONTWAIT);
}

static inline ssize_t
uring_sock_read(struct spdk_uring_sock *sock)
{
	struct iovec iov[2];
	int bytes;
	struct spdk_uring_sock_group_impl *group;

	bytes = spdk_pipe_writer_get_buffer(sock->recv_pipe, sock->recv_buf_sz, iov);

	if (bytes > 0) {
		bytes = sock_readv(sock->fd, iov, 2);
		if (bytes > 0) {
			spdk_pipe_writer_advance(sock->recv_pipe, bytes);
			if (sock->base.group_impl) {
				group = __uring_group_impl(sock->base.group_impl);
				TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
				sock->pending_recv = true;
			}
		}
	}

	return bytes;
}

static ssize_t
uring_sock_readv(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	int rc, i;
	size_t len;

	if (sock->recv_pipe == NULL) {
		return sock_readv(sock->fd, iov, iovcnt);
	}

	len = 0;
	for (i = 0; i < iovcnt; i++) {
		len += iov[i].iov_len;
	}

	if (spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) {
		/* If the user is receiving a sufficiently large amount of data,
		 * receive directly to their buffers. */
		if (len >= MIN_SOCK_PIPE_SIZE) {
			return sock_readv(sock->fd, iov, iovcnt);
		}

		/* Otherwise, do a big read into our pipe */
		rc = uring_sock_read(sock);
		if (rc <= 0) {
			return rc;
		}
	}

	return uring_sock_recv_from_pipe(sock, iov, iovcnt);
}

static ssize_t
uring_sock_recv(struct spdk_sock *sock, void *buf, size_t len)
{
	struct iovec iov[1];

	iov[0].iov_base = buf;
	iov[0].iov_len = len;

	return uring_sock_readv(sock, iov, 1);
}

static ssize_t
uring_sock_writev(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct msghdr msg = {
		.msg_iov = iov,
		.msg_iovlen = iovcnt,
	};

	if (sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
		errno = EAGAIN;
		return -1;
	}

	return sendmsg(sock->fd, &msg, MSG_DONTWAIT);
}

static ssize_t
sock_request_advance_offset(struct spdk_sock_request *req, ssize_t rc)
{
	unsigned int offset;
	size_t len;
	int i;

	offset = req->internal.offset;
	for (i = 0; i < req->iovcnt; i++) {
		/* Advance by the offset first */
		if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) {
			offset -= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len;
			continue;
		}

		/* Calculate the remaining length of this element */
		len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset;

		if (len > (size_t)rc) {
			req->internal.offset += rc;
			return -1;
		}

		offset = 0;
		req->internal.offset += len;
		rc -= len;
	}

	return rc;
}

static int
sock_complete_write_reqs(struct spdk_sock *_sock, ssize_t rc, bool is_zcopy)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct spdk_sock_request *req;
	int retval;

	if (is_zcopy) {
		/* Handling overflow case, because we use sock->sendmsg_idx - 1 for the
		 * req->internal.offset, so sendmsg_idx should not be zero */
		if (spdk_unlikely(sock->sendmsg_idx == UINT32_MAX)) {
			sock->sendmsg_idx = 1;
		} else {
			sock->sendmsg_idx++;
		}
	}

	/* Consume the requests that were actually written */
	req = TAILQ_FIRST(&_sock->queued_reqs);
	while (req) {
		/* req->internal.is_zcopy is true when the whole req or part of it is sent with zerocopy */
		req->internal.is_zcopy = is_zcopy;

		rc = sock_request_advance_offset(req, rc);
		if (rc < 0) {
			/* This element was partially sent. */
			return 0;
		}

		/* Handled a full request. */
		spdk_sock_request_pend(_sock, req);

		if (!req->internal.is_zcopy && req == TAILQ_FIRST(&_sock->pending_reqs)) {
			retval = spdk_sock_request_put(_sock, req, 0);
			if (retval) {
				return retval;
			}
		} else {
			/* Re-use the offset field to hold the sendmsg call index. The
			 * index is 0 based, so subtract one here because we've already
			 * incremented above. */
			req->internal.offset = sock->sendmsg_idx - 1;
		}

		if (rc == 0) {
			break;
		}

		req = TAILQ_FIRST(&_sock->queued_reqs);
	}

	return 0;
}

#ifdef SPDK_ZEROCOPY
static int
_sock_check_zcopy(struct spdk_sock *_sock, int status)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	ssize_t rc;
	struct sock_extended_err *serr;
	struct cmsghdr *cm;
	uint32_t idx;
	struct spdk_sock_request *req, *treq;
	bool found;

	assert(sock->zcopy == true);
	if (spdk_unlikely(status < 0)) {
		if (!TAILQ_EMPTY(&_sock->pending_reqs)) {
			SPDK_ERRLOG("Attempting to receive from ERRQUEUE yielded error, but pending list still has orphaned entries, status =%d\n",
				    status);
		} else {
			SPDK_WARNLOG("Recvmsg yielded an error!\n");
		}
		return 0;
	}

	cm = CMSG_FIRSTHDR(&sock->errqueue_task.msg);
	if (!((cm->cmsg_level == SOL_IP && cm->cmsg_type == IP_RECVERR) ||
	      (cm->cmsg_level == SOL_IPV6 && cm->cmsg_type == IPV6_RECVERR))) {
		SPDK_WARNLOG("Unexpected cmsg level or type!\n");
		return 0;
	}

	serr = (struct sock_extended_err *)CMSG_DATA(cm);
	if (serr->ee_errno != 0 || serr->ee_origin != SO_EE_ORIGIN_ZEROCOPY) {
		SPDK_WARNLOG("Unexpected extended error origin\n");
		return 0;
	}

	/* Most of the time, the pending_reqs array is in the exact
	 * order we need such that all of the requests to complete are
	 * in order, in the front. It is guaranteed that all requests
	 * belonging to the same sendmsg call are sequential, so once
	 * we encounter one match we can stop looping as soon as a
	 * non-match is found.
	 */
	for (idx = serr->ee_info; idx <= serr->ee_data; idx++) {
		found = false;
		TAILQ_FOREACH_SAFE(req, &_sock->pending_reqs, internal.link, treq) {
			if (!req->internal.is_zcopy) {
				/* This wasn't a zcopy request. It was just waiting in line to complete */
				rc = spdk_sock_request_put(_sock, req, 0);
				if (rc < 0) {
					return rc;
				}
			} else if (req->internal.offset == idx) {
				found = true;
				rc = spdk_sock_request_put(_sock, req, 0);
				if (rc < 0) {
					return rc;
				}
			} else if (found) {
				break;
			}
		}
	}

	return 0;
}

static void
_sock_prep_errqueue(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct spdk_uring_task *task = &sock->errqueue_task;
	struct io_uring_sqe *sqe;

	if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS) {
		return;
	}

	assert(sock->group != NULL);
	sock->group->io_queued++;

	sqe = io_uring_get_sqe(&sock->group->uring);
	io_uring_prep_recvmsg(sqe, sock->fd, &task->msg, MSG_ERRQUEUE);
	io_uring_sqe_set_data(sqe, task);
	task->status = SPDK_URING_SOCK_TASK_IN_PROCESS;
}

#endif

static void
_sock_flush(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct spdk_uring_task *task = &sock->write_task;
	uint32_t iovcnt;
	struct io_uring_sqe *sqe;
	int flags;

	if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS) {
		return;
	}

#ifdef SPDK_ZEROCOPY
	if (sock->zcopy) {
		flags = MSG_DONTWAIT | sock->zcopy_send_flags;
	} else
#endif
	{
		flags = MSG_DONTWAIT;
	}

	iovcnt = spdk_sock_prep_reqs(&sock->base, task->iovs, task->iov_cnt, &task->last_req, &flags);
	if (!iovcnt) {
		return;
	}

	task->iov_cnt = iovcnt;
	assert(sock->group != NULL);
	task->msg.msg_iov = task->iovs;
	task->msg.msg_iovlen = task->iov_cnt;
#ifdef SPDK_ZEROCOPY
	task->is_zcopy = (flags & MSG_ZEROCOPY) ? true : false;
#endif
	sock->group->io_queued++;

	sqe = io_uring_get_sqe(&sock->group->uring);
	io_uring_prep_sendmsg(sqe, sock->fd, &sock->write_task.msg, flags);
	io_uring_sqe_set_data(sqe, task);
	task->status = SPDK_URING_SOCK_TASK_IN_PROCESS;
}

static void
_sock_prep_pollin(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct spdk_uring_task *task = &sock->pollin_task;
	struct io_uring_sqe *sqe;

	/* Do not prepare pollin event */
	if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS || (sock->pending_recv && !sock->zcopy)) {
		return;
	}

	assert(sock->group != NULL);
	sock->group->io_queued++;

	sqe = io_uring_get_sqe(&sock->group->uring);
	io_uring_prep_poll_add(sqe, sock->fd, POLLIN | POLLERR);
	io_uring_sqe_set_data(sqe, task);
	task->status = SPDK_URING_SOCK_TASK_IN_PROCESS;
}

static void
_sock_prep_cancel_task(struct spdk_sock *_sock, void *user_data)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct spdk_uring_task *task = &sock->cancel_task;
	struct io_uring_sqe *sqe;

	if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS) {
		return;
	}

	assert(sock->group != NULL);
	sock->group->io_queued++;

	sqe = io_uring_get_sqe(&sock->group->uring);
	io_uring_prep_cancel(sqe, user_data, 0);
	io_uring_sqe_set_data(sqe, task);
	task->status = SPDK_URING_SOCK_TASK_IN_PROCESS;
}

static int
sock_uring_group_reap(struct spdk_uring_sock_group_impl *group, int max, int max_read_events,
		      struct spdk_sock **socks)
{
	int i, count, ret;
	struct io_uring_cqe *cqe;
	struct spdk_uring_sock *sock, *tmp;
	struct spdk_uring_task *task;
	int status;
	bool is_zcopy;

	for (i = 0; i < max; i++) {
		ret = io_uring_peek_cqe(&group->uring, &cqe);
		if (ret != 0) {
			break;
		}

		if (cqe == NULL) {
			break;
		}

		task = (struct spdk_uring_task *)cqe->user_data;
		assert(task != NULL);
		sock = task->sock;
		assert(sock != NULL);
		assert(sock->group != NULL);
		assert(sock->group == group);
		sock->group->io_inflight--;
		sock->group->io_avail++;
		status = cqe->res;
		io_uring_cqe_seen(&group->uring, cqe);

		task->status = SPDK_URING_SOCK_TASK_NOT_IN_USE;

		if (spdk_unlikely(status <= 0)) {
			if (status == -EAGAIN || status == -EWOULDBLOCK || (status == -ENOBUFS && sock->zcopy)) {
				continue;
			}
		}

		switch (task->type) {
		case SPDK_SOCK_TASK_POLLIN:
#ifdef SPDK_ZEROCOPY
			if ((status & POLLERR) == POLLERR) {
				_sock_prep_errqueue(&sock->base);
			}
#endif
			if ((status & POLLIN) == POLLIN) {
				if (sock->base.cb_fn != NULL &&
				    sock->pending_recv == false) {
					sock->pending_recv = true;
					TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
				}
			}
			break;
		case SPDK_SOCK_TASK_WRITE:
			task->last_req = NULL;
			task->iov_cnt = 0;
			is_zcopy = task->is_zcopy;
			task->is_zcopy = false;
			if (spdk_unlikely(status < 0)) {
				sock->connection_status = status;
				spdk_sock_abort_requests(&sock->base);
			} else {
				sock_complete_write_reqs(&sock->base, status, is_zcopy);
			}

			break;
#ifdef SPDK_ZEROCOPY
		case SPDK_SOCK_TASK_ERRQUEUE:
			if (spdk_unlikely(status == -ECANCELED)) {
				sock->connection_status = status;
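				/* Recording the -ECANCELED status here makes later
				 * uring_sock_writev_async() calls fail immediately and lets
				 * uring_sock_group_impl_poll() skip this socket until it is
				 * removed from the group. */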
				break;
			}
			_sock_check_zcopy(&sock->base, status);
			break;
#endif
		case SPDK_SOCK_TASK_CANCEL:
			/* Do nothing */
			break;
		default:
			SPDK_UNREACHABLE();
		}
	}

	if (!socks) {
		return 0;
	}
	count = 0;
	TAILQ_FOREACH_SAFE(sock, &group->pending_recv, link, tmp) {
		if (count == max_read_events) {
			break;
		}

		if (spdk_unlikely(sock->base.cb_fn == NULL) ||
		    (sock->recv_pipe == NULL || spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0)) {
			sock->pending_recv = false;
			TAILQ_REMOVE(&group->pending_recv, sock, link);
			if (spdk_unlikely(sock->base.cb_fn == NULL)) {
				/* If the socket's cb_fn is NULL, do not add it to socks array */
				continue;
			}
		}

		socks[count++] = &sock->base;
	}

	/* Cycle the pending_recv list so that each time we poll things aren't
	 * in the same order. Say we have 6 sockets in the list, named as follows:
	 * A B C D E F
	 * And all 6 sockets had the poll events, but max_events is only 3. That means
	 * psock currently points at D. We want to rearrange the list to the following:
	 * D E F A B C
	 *
	 * The variables below are named according to this example to make it easier to
	 * follow the swaps.
	 */
	if (sock != NULL) {
		struct spdk_uring_sock *ua, *uc, *ud, *uf;

		/* Capture pointers to the elements we need */
		ud = sock;

		ua = TAILQ_FIRST(&group->pending_recv);
		if (ua == ud) {
			goto end;
		}

		uf = TAILQ_LAST(&group->pending_recv, pending_recv_list);
		if (uf == ud) {
			TAILQ_REMOVE(&group->pending_recv, ud, link);
			TAILQ_INSERT_HEAD(&group->pending_recv, ud, link);
			goto end;
		}

		uc = TAILQ_PREV(ud, pending_recv_list, link);
		assert(uc != NULL);

		/* Break the link between C and D */
		uc->link.tqe_next = NULL;

		/* Connect F to A */
		uf->link.tqe_next = ua;
		ua->link.tqe_prev = &uf->link.tqe_next;

		/* Fix up the list first/last pointers */
		group->pending_recv.tqh_first = ud;
		group->pending_recv.tqh_last = &uc->link.tqe_next;

		/* D is in front of the list, make tqe prev pointer point to the head of list */
		ud->link.tqe_prev = &group->pending_recv.tqh_first;
	}

end:
	return count;
}

static int uring_sock_flush(struct spdk_sock *_sock);

static void
uring_sock_writev_async(struct spdk_sock *_sock, struct spdk_sock_request *req)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	int rc;

	if (spdk_unlikely(sock->connection_status)) {
		req->cb_fn(req->cb_arg, sock->connection_status);
		return;
	}

	spdk_sock_request_queue(_sock, req);

	if (!sock->group) {
		if (_sock->queued_iovcnt >= IOV_BATCH_SIZE) {
			rc = uring_sock_flush(_sock);
			if (rc < 0 && errno != EAGAIN) {
				spdk_sock_abort_requests(_sock);
			}
		}
	}
}

static void
uring_sock_readv_async(struct spdk_sock *sock, struct spdk_sock_request *req)
{
	req->cb_fn(req->cb_arg, -ENOTSUP);
}

static int
uring_sock_set_recvlowat(struct spdk_sock *_sock, int nbytes)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	int val;
	int rc;

	assert(sock != NULL);

	val = nbytes;
	rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVLOWAT, &val, sizeof val);
	if (rc != 0) {
		return -1;
	}
	return 0;
}

static bool
uring_sock_is_ipv6(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct sockaddr_storage sa;
	socklen_t salen;
	int rc;

	assert(sock != NULL);

	memset(&sa, 0, sizeof sa);
	salen = sizeof sa;
	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
	if (rc != 0) {
		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
		return false;
	}

	return (sa.ss_family == AF_INET6);
}

static bool
uring_sock_is_ipv4(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct sockaddr_storage sa;
	socklen_t salen;
	int rc;

	assert(sock != NULL);

	memset(&sa, 0, sizeof sa);
	salen = sizeof sa;
	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
	if (rc != 0) {
		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
		return false;
	}

	return (sa.ss_family == AF_INET);
}

static bool
uring_sock_is_connected(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	uint8_t byte;
	int rc;

	rc = recv(sock->fd, &byte, 1, MSG_PEEK | MSG_DONTWAIT);
	if (rc == 0) {
		return false;
	}

	if (rc < 0) {
		if (errno == EAGAIN || errno == EWOULDBLOCK) {
			return true;
		}

		return false;
	}

	return true;
}

static struct spdk_sock_group_impl *
uring_sock_group_impl_get_optimal(struct spdk_sock *_sock, struct spdk_sock_group_impl *hint)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct spdk_sock_group_impl *group;

	if (sock->placement_id != -1) {
		spdk_sock_map_lookup(&g_map, sock->placement_id, &group, hint);
		return group;
	}

	return NULL;
}

static struct spdk_sock_group_impl *
uring_sock_group_impl_create(void)
{
	struct spdk_uring_sock_group_impl *group_impl;

	group_impl = calloc(1, sizeof(*group_impl));
	if (group_impl == NULL) {
		SPDK_ERRLOG("group_impl allocation failed\n");
		return NULL;
	}

	group_impl->io_avail = SPDK_SOCK_GROUP_QUEUE_DEPTH;

	if (io_uring_queue_init(SPDK_SOCK_GROUP_QUEUE_DEPTH, &group_impl->uring, 0) < 0) {
		SPDK_ERRLOG("uring I/O context setup failure\n");
		free(group_impl);
		return NULL;
	}

	TAILQ_INIT(&group_impl->pending_recv);

	if (g_spdk_uring_sock_impl_opts.enable_placement_id == PLACEMENT_CPU) {
		spdk_sock_map_insert(&g_map, spdk_env_get_current_core(), &group_impl->base);
	}

	return &group_impl->base;
}

static int
uring_sock_group_impl_add_sock(struct spdk_sock_group_impl *_group,
			       struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group);
	int rc;

	sock->group = group;
	sock->write_task.sock = sock;
	sock->write_task.type = SPDK_SOCK_TASK_WRITE;

	sock->pollin_task.sock = sock;
	sock->pollin_task.type = SPDK_SOCK_TASK_POLLIN;

	sock->errqueue_task.sock = sock;
	sock->errqueue_task.type = SPDK_SOCK_TASK_ERRQUEUE;
	sock->errqueue_task.msg.msg_control = sock->buf;
	sock->errqueue_task.msg.msg_controllen = sizeof(sock->buf);

	sock->cancel_task.sock = sock;
	sock->cancel_task.type = SPDK_SOCK_TASK_CANCEL;

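	/* Each task above is a persistent, per-socket completion context: its address is
	 * attached to the SQE via io_uring_sqe_set_data() and handed back through
	 * cqe->user_data in sock_uring_group_reap(), so no per-I/O allocation is needed.
	 * The errqueue task additionally points msg_control at sock->buf so that
	 * MSG_ERRQUEUE completions can deliver the zero-copy notification parsed by
	 * _sock_check_zcopy(). */
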
	/* switched from another polling group due to scheduling */
	if (spdk_unlikely(sock->recv_pipe != NULL &&
			  (spdk_pipe_reader_bytes_available(sock->recv_pipe) > 0))) {
		assert(sock->pending_recv == false);
		sock->pending_recv = true;
		TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
	}

	if (sock->placement_id != -1) {
		rc = spdk_sock_map_insert(&g_map, sock->placement_id, &group->base);
		if (rc != 0) {
			SPDK_ERRLOG("Failed to insert sock group into map: %d\n", rc);
			/* Do not treat this as an error. The system will continue running. */
		}
	}

	return 0;
}

static int
uring_sock_group_impl_poll(struct spdk_sock_group_impl *_group, int max_events,
			   struct spdk_sock **socks)
{
	struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group);
	int count, ret;
	int to_complete, to_submit;
	struct spdk_sock *_sock, *tmp;
	struct spdk_uring_sock *sock;

	if (spdk_likely(socks)) {
		TAILQ_FOREACH_SAFE(_sock, &group->base.socks, link, tmp) {
			sock = __uring_sock(_sock);
			if (spdk_unlikely(sock->connection_status)) {
				continue;
			}
			_sock_flush(_sock);
			_sock_prep_pollin(_sock);
		}
	}

	to_submit = group->io_queued;

	/* Network sockets cannot be opened with O_DIRECT, so there is no need to call
	 * spdk_io_uring_enter separately here. */
	if (to_submit > 0) {
		/* If there are I/O to submit, use io_uring_submit here.
		 * It will automatically call io_uring_enter appropriately. */
		ret = io_uring_submit(&group->uring);
		if (ret < 0) {
			return 1;
		}
		group->io_queued = 0;
		group->io_inflight += to_submit;
		group->io_avail -= to_submit;
	}

	count = 0;
	to_complete = group->io_inflight;
	if (to_complete > 0 || !TAILQ_EMPTY(&group->pending_recv)) {
		count = sock_uring_group_reap(group, to_complete, max_events, socks);
	}

	return count;
}

static int
uring_sock_group_impl_remove_sock(struct spdk_sock_group_impl *_group,
				  struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group);

	if (sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
		_sock_prep_cancel_task(_sock, &sock->write_task);
		/* Since spdk_sock_group_remove_sock is not an asynchronous interface,
		 * we can simply poll in a loop here until the task completes. */
		while ((sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) ||
		       (sock->cancel_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE)) {
			uring_sock_group_impl_poll(_group, 32, NULL);
		}
	}

	if (sock->pollin_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
		_sock_prep_cancel_task(_sock, &sock->pollin_task);
		/* Since spdk_sock_group_remove_sock is not an asynchronous interface,
		 * we can simply poll in a loop here until the task completes. */
		while ((sock->pollin_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) ||
		       (sock->cancel_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE)) {
			uring_sock_group_impl_poll(_group, 32, NULL);
		}
	}

	if (sock->errqueue_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
		_sock_prep_cancel_task(_sock, &sock->errqueue_task);
		/* Since spdk_sock_group_remove_sock is not an asynchronous interface,
		 * we can simply poll in a loop here until the task completes. */
		while ((sock->errqueue_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) ||
		       (sock->cancel_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE)) {
			uring_sock_group_impl_poll(_group, 32, NULL);
		}
	}

	/* Make sure cancelling the tasks above did not queue any new requests */
	assert(sock->write_task.status == SPDK_URING_SOCK_TASK_NOT_IN_USE);
	assert(sock->pollin_task.status == SPDK_URING_SOCK_TASK_NOT_IN_USE);
	assert(sock->errqueue_task.status == SPDK_URING_SOCK_TASK_NOT_IN_USE);

	if (sock->pending_recv) {
		TAILQ_REMOVE(&group->pending_recv, sock, link);
		sock->pending_recv = false;
	}
	assert(sock->pending_recv == false);

	if (sock->placement_id != -1) {
		spdk_sock_map_release(&g_map, sock->placement_id);
	}

	sock->group = NULL;
	return 0;
}

static int
uring_sock_group_impl_close(struct spdk_sock_group_impl *_group)
{
	struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group);

	/* try to reap all the active I/O */
	while (group->io_inflight) {
		uring_sock_group_impl_poll(_group, 32, NULL);
	}
	assert(group->io_inflight == 0);
	assert(group->io_avail == SPDK_SOCK_GROUP_QUEUE_DEPTH);

	io_uring_queue_exit(&group->uring);

	if (g_spdk_uring_sock_impl_opts.enable_placement_id == PLACEMENT_CPU) {
		spdk_sock_map_release(&g_map, spdk_env_get_current_core());
	}

	free(group);
	return 0;
}

static int
uring_sock_flush(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct msghdr msg = {};
	struct iovec iovs[IOV_BATCH_SIZE];
	int iovcnt;
	ssize_t rc;
	int flags = sock->zcopy_send_flags;
	int retval;
	bool is_zcopy = false;

	/* Can't flush from within a callback or we end up with recursive calls */
	if (_sock->cb_cnt > 0) {
		errno = EAGAIN;
		return -1;
	}

	/* Can't flush while a write is already outstanding */
	if (sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
		errno = EAGAIN;
		return -1;
	}

	/* Gather an iov */
	iovcnt = spdk_sock_prep_reqs(_sock, iovs, 0, NULL, &flags);
	if (iovcnt == 0) {
		/* Nothing to send */
		return 0;
	}

	/* Perform the vectored write */
	msg.msg_iov = iovs;
	msg.msg_iovlen = iovcnt;
	rc = sendmsg(sock->fd, &msg, flags | MSG_DONTWAIT);
	if (rc <= 0) {
		if (rc == 0 || errno == EAGAIN || errno == EWOULDBLOCK || (errno == ENOBUFS && sock->zcopy)) {
			errno = EAGAIN;
		}
		return -1;
	}

#ifdef SPDK_ZEROCOPY
	is_zcopy = flags & MSG_ZEROCOPY;
#endif
	retval = sock_complete_write_reqs(_sock, rc, is_zcopy);
	if (retval < 0) {
		/* if the socket is closed, return to avoid heap-use-after-free error */
		errno = ENOTCONN;
		return -1;
	}

#ifdef SPDK_ZEROCOPY
	if (sock->zcopy && !TAILQ_EMPTY(&_sock->pending_reqs)) {
		_sock_check_zcopy(_sock, 0);
	}
#endif

	return rc;
}

static struct spdk_net_impl g_uring_net_impl = {
	.name = "uring",
	.getaddr = uring_sock_getaddr,
	.connect = uring_sock_connect,
	.listen = uring_sock_listen,
	.accept = uring_sock_accept,
	.close = uring_sock_close,
	.recv = uring_sock_recv,
	.readv = uring_sock_readv,
	.readv_async = uring_sock_readv_async,
	.writev = uring_sock_writev,
	.writev_async = uring_sock_writev_async,
	.flush = uring_sock_flush,
	.set_recvlowat = uring_sock_set_recvlowat,
	.set_recvbuf = uring_sock_set_recvbuf,
	.set_sendbuf = uring_sock_set_sendbuf,
	.is_ipv6 = uring_sock_is_ipv6,
	.is_ipv4 = uring_sock_is_ipv4,
	.is_connected = uring_sock_is_connected,
	.group_impl_get_optimal = uring_sock_group_impl_get_optimal,
	.group_impl_create = uring_sock_group_impl_create,
	.group_impl_add_sock = uring_sock_group_impl_add_sock,
	.group_impl_remove_sock = uring_sock_group_impl_remove_sock,
	.group_impl_poll = uring_sock_group_impl_poll,
	.group_impl_close = uring_sock_group_impl_close,
	.get_opts = uring_sock_impl_get_opts,
	.set_opts = uring_sock_impl_set_opts,
};

SPDK_NET_IMPL_REGISTER(uring, &g_uring_net_impl, DEFAULT_SOCK_PRIORITY + 2);
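
/*
 * Usage sketch (illustrative only, not part of this module): an application would
 * normally tune this transport through the generic spdk_sock API declared in
 * include/spdk/sock.h before creating any sockets. The "uring" name below matches
 * the SPDK_NET_IMPL_REGISTER() call above; the option values are examples, not
 * recommendations.
 *
 *	struct spdk_sock_impl_opts opts;
 *	size_t len = sizeof(opts);
 *
 *	spdk_sock_impl_get_opts("uring", &opts, &len);
 *	opts.enable_zerocopy_send_client = true;
 *	opts.zerocopy_threshold = 4096;
 *	spdk_sock_impl_set_opts("uring", &opts, len);
 *
 *	struct spdk_sock *sock = spdk_sock_connect_ext("127.0.0.1", 4420, NULL, NULL);
 */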