/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright (C) 2019 Intel Corporation.
 * All rights reserved.
 */

#include "spdk/stdinc.h"
#include "spdk/config.h"

#include <linux/errqueue.h>
#include <sys/epoll.h>
#include <liburing.h>

#include "spdk/barrier.h"
#include "spdk/env.h"
#include "spdk/log.h"
#include "spdk/pipe.h"
#include "spdk/sock.h"
#include "spdk/string.h"
#include "spdk/util.h"

#include "spdk_internal/sock.h"
#include "spdk_internal/assert.h"
#include "../sock_kernel.h"

#define MAX_TMPBUF 1024
#define PORTNUMLEN 32
#define SPDK_SOCK_GROUP_QUEUE_DEPTH 4096
#define SPDK_SOCK_CMG_INFO_SIZE (sizeof(struct cmsghdr) + sizeof(struct sock_extended_err))

enum spdk_sock_task_type {
	SPDK_SOCK_TASK_POLLIN = 0,
	SPDK_SOCK_TASK_ERRQUEUE,
	SPDK_SOCK_TASK_WRITE,
	SPDK_SOCK_TASK_CANCEL,
};

#if defined(SO_ZEROCOPY) && defined(MSG_ZEROCOPY)
#define SPDK_ZEROCOPY
#endif

enum spdk_uring_sock_task_status {
	SPDK_URING_SOCK_TASK_NOT_IN_USE = 0,
	SPDK_URING_SOCK_TASK_IN_PROCESS,
};

struct spdk_uring_task {
	enum spdk_uring_sock_task_status status;
	enum spdk_sock_task_type type;
	struct spdk_uring_sock *sock;
	struct msghdr msg;
	struct iovec iovs[IOV_BATCH_SIZE];
	int iov_cnt;
	struct spdk_sock_request *last_req;
	bool is_zcopy;
	STAILQ_ENTRY(spdk_uring_task) link;
};

struct spdk_uring_sock {
	struct spdk_sock base;
	int fd;
	uint32_t sendmsg_idx;
	struct spdk_uring_sock_group_impl *group;
	struct spdk_uring_task write_task;
	struct spdk_uring_task errqueue_task;
	struct spdk_uring_task pollin_task;
	struct spdk_uring_task cancel_task;
	struct spdk_pipe *recv_pipe;
	void *recv_buf;
	int recv_buf_sz;
	bool zcopy;
	bool pending_recv;
	int zcopy_send_flags;
	int connection_status;
	int placement_id;
	uint8_t buf[SPDK_SOCK_CMG_INFO_SIZE];
	TAILQ_ENTRY(spdk_uring_sock) link;
};

TAILQ_HEAD(pending_recv_list, spdk_uring_sock);

struct spdk_uring_sock_group_impl {
	struct spdk_sock_group_impl base;
	struct io_uring uring;
	uint32_t io_inflight;
	uint32_t io_queued;
	uint32_t io_avail;
	struct pending_recv_list pending_recv;
};

static struct spdk_sock_impl_opts g_spdk_uring_sock_impl_opts = {
	.recv_buf_size = MIN_SO_RCVBUF_SIZE,
	.send_buf_size = MIN_SO_SNDBUF_SIZE,
	.enable_recv_pipe = true,
	.enable_quickack = false,
	.enable_placement_id = PLACEMENT_NONE,
	.enable_zerocopy_send_server = false,
	.enable_zerocopy_send_client = false,
	.zerocopy_threshold = 0,
	.tls_version = 0,
	.enable_ktls = false,
	.psk_key = NULL,
	.psk_identity = NULL
};

static struct spdk_sock_map g_map = {
	.entries = STAILQ_HEAD_INITIALIZER(g_map.entries),
	.mtx = PTHREAD_MUTEX_INITIALIZER
};

__attribute((destructor)) static void
uring_sock_map_cleanup(void)
{
	spdk_sock_map_cleanup(&g_map);
}

#define SPDK_URING_SOCK_REQUEST_IOV(req) ((struct iovec *)((uint8_t *)req + sizeof(struct spdk_sock_request)))

#define __uring_sock(sock) (struct spdk_uring_sock *)sock
#define __uring_group_impl(group) (struct spdk_uring_sock_group_impl *)group

static void
uring_sock_copy_impl_opts(struct spdk_sock_impl_opts *dest, const struct spdk_sock_impl_opts *src,
			  size_t len)
{
#define FIELD_OK(field) \
	offsetof(struct spdk_sock_impl_opts, field) + sizeof(src->field) <= len

#define SET_FIELD(field) \
	if (FIELD_OK(field)) { \
		dest->field = src->field; \
	}

	SET_FIELD(recv_buf_size);
	SET_FIELD(send_buf_size);
	SET_FIELD(enable_recv_pipe);
	SET_FIELD(enable_quickack);
	SET_FIELD(enable_placement_id);
	SET_FIELD(enable_zerocopy_send_server);
	SET_FIELD(enable_zerocopy_send_client);
	SET_FIELD(zerocopy_threshold);
	SET_FIELD(tls_version);
	SET_FIELD(enable_ktls);
	SET_FIELD(psk_key);
	SET_FIELD(psk_identity);

#undef SET_FIELD
#undef FIELD_OK
}

static int
uring_sock_impl_get_opts(struct spdk_sock_impl_opts *opts, size_t *len)
{
	if (!opts || !len) {
		errno = EINVAL;
		return -1;
	}

	assert(sizeof(*opts) >= *len);
	memset(opts, 0, *len);

	uring_sock_copy_impl_opts(opts, &g_spdk_uring_sock_impl_opts, *len);
	*len = spdk_min(*len, sizeof(g_spdk_uring_sock_impl_opts));

	return 0;
}

static int
uring_sock_impl_set_opts(const struct spdk_sock_impl_opts *opts, size_t len)
{
	if (!opts) {
		errno = EINVAL;
		return -1;
	}

	assert(sizeof(*opts) >= len);
	uring_sock_copy_impl_opts(&g_spdk_uring_sock_impl_opts, opts, len);

	return 0;
}

static void
uring_opts_get_impl_opts(const struct spdk_sock_opts *opts, struct spdk_sock_impl_opts *dest)
{
	/* Copy the default impl_opts first to cover cases when user's impl_opts is smaller */
	memcpy(dest, &g_spdk_uring_sock_impl_opts, sizeof(*dest));

	if (opts->impl_opts != NULL) {
		assert(sizeof(*dest) >= opts->impl_opts_size);
		uring_sock_copy_impl_opts(dest, opts->impl_opts, opts->impl_opts_size);
	}
}

static int
uring_sock_getaddr(struct spdk_sock *_sock, char *saddr, int slen, uint16_t *sport,
		   char *caddr, int clen, uint16_t *cport)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct sockaddr_storage sa;
	socklen_t salen;
	int rc;

	assert(sock != NULL);

	memset(&sa, 0, sizeof sa);
	salen = sizeof sa;
	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
	if (rc != 0) {
		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
		return -1;
	}

	switch (sa.ss_family) {
	case AF_UNIX:
		/* Acceptable connection types that don't have IPs */
		return 0;
	case AF_INET:
	case AF_INET6:
		/* Code below will get IP addresses */
		break;
	default:
		/* Unsupported socket family */
		return -1;
	}

	rc = get_addr_str((struct sockaddr *)&sa, saddr, slen);
	if (rc != 0) {
		SPDK_ERRLOG("getnameinfo() failed (errno=%d)\n", errno);
		return -1;
	}

	if (sport) {
		if (sa.ss_family == AF_INET) {
			*sport = ntohs(((struct sockaddr_in *) &sa)->sin_port);
		} else if (sa.ss_family == AF_INET6) {
			*sport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port);
		}
	}

	memset(&sa, 0, sizeof sa);
	salen = sizeof sa;
	rc = getpeername(sock->fd, (struct sockaddr *) &sa, &salen);
	if (rc != 0) {
		SPDK_ERRLOG("getpeername() failed (errno=%d)\n", errno);
		return -1;
	}

	rc = get_addr_str((struct sockaddr *)&sa, caddr, clen);
	if (rc != 0) {
		SPDK_ERRLOG("getnameinfo() failed (errno=%d)\n", errno);
		return -1;
	}

	if (cport) {
		if (sa.ss_family == AF_INET) {
			*cport = ntohs(((struct sockaddr_in *) &sa)->sin_port);
		} else if (sa.ss_family == AF_INET6) {
			*cport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port);
		}
	}

	return 0;
}

enum uring_sock_create_type {
	SPDK_SOCK_CREATE_LISTEN,
	SPDK_SOCK_CREATE_CONNECT,
};

static int
uring_sock_alloc_pipe(struct spdk_uring_sock *sock, int sz)
{
	uint8_t *new_buf;
	struct spdk_pipe *new_pipe;
	struct iovec siov[2];
	struct iovec diov[2];
	int sbytes;
	ssize_t bytes;

	if (sock->recv_buf_sz == sz) {
		return 0;
	}

	/* If the new size is 0, just free the pipe */
	if (sz == 0) {
		spdk_pipe_destroy(sock->recv_pipe);
		free(sock->recv_buf);
		sock->recv_pipe = NULL;
		sock->recv_buf = NULL;
		return 0;
	} else if (sz < MIN_SOCK_PIPE_SIZE) {
		SPDK_ERRLOG("The size of the pipe must be larger than %d\n", MIN_SOCK_PIPE_SIZE);
		return -1;
	}

	/* Round up to next 64 byte multiple */
	new_buf = calloc(SPDK_ALIGN_CEIL(sz + 1, 64), sizeof(uint8_t));
	if (!new_buf) {
		SPDK_ERRLOG("socket recv buf allocation failed\n");
		return -ENOMEM;
	}

	new_pipe = spdk_pipe_create(new_buf, sz + 1);
	if (new_pipe == NULL) {
		SPDK_ERRLOG("socket pipe allocation failed\n");
		free(new_buf);
		return -ENOMEM;
	}

	if (sock->recv_pipe != NULL) {
		/* Pull all of the data out of the old pipe */
		sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov);
		if (sbytes > sz) {
			/* Too much data to fit into the new pipe size */
			spdk_pipe_destroy(new_pipe);
			free(new_buf);
			return -EINVAL;
		}

		sbytes = spdk_pipe_writer_get_buffer(new_pipe, sz, diov);
		assert(sbytes == sz);

		bytes = spdk_iovcpy(siov, 2, diov, 2);
		spdk_pipe_writer_advance(new_pipe, bytes);

		spdk_pipe_destroy(sock->recv_pipe);
		free(sock->recv_buf);
	}

	sock->recv_buf_sz = sz;
	sock->recv_buf = new_buf;
	sock->recv_pipe = new_pipe;

	return 0;
}

static int
uring_sock_set_recvbuf(struct spdk_sock *_sock, int sz)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	int min_size;
	int rc;

	assert(sock != NULL);

	if (_sock->impl_opts.enable_recv_pipe) {
		rc = uring_sock_alloc_pipe(sock, sz);
		if (rc) {
			SPDK_ERRLOG("unable to allocate sufficient recvbuf with sz=%d on sock=%p\n", sz, _sock);
			return rc;
		}
	}

	/* Set kernel buffer size to be at least MIN_SO_RCVBUF_SIZE and
	 * g_spdk_uring_sock_impl_opts.recv_buf_size. */
	min_size = spdk_max(MIN_SO_RCVBUF_SIZE, g_spdk_uring_sock_impl_opts.recv_buf_size);

	if (sz < min_size) {
		sz = min_size;
	}

	rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVBUF, &sz, sizeof(sz));
	if (rc < 0) {
		return rc;
	}

	_sock->impl_opts.recv_buf_size = sz;

	return 0;
}

static int
uring_sock_set_sendbuf(struct spdk_sock *_sock, int sz)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	int min_size;
	int rc;

	assert(sock != NULL);

	/* Set kernel buffer size to be at least MIN_SO_SNDBUF_SIZE and
	 * g_spdk_uring_sock_impl_opts.send_buf_size. */
	min_size = spdk_max(MIN_SO_SNDBUF_SIZE, g_spdk_uring_sock_impl_opts.send_buf_size);

	if (sz < min_size) {
		sz = min_size;
	}

	rc = setsockopt(sock->fd, SOL_SOCKET, SO_SNDBUF, &sz, sizeof(sz));
	if (rc < 0) {
		return rc;
	}

	_sock->impl_opts.send_buf_size = sz;

	return 0;
}

static struct spdk_uring_sock *
uring_sock_alloc(int fd, struct spdk_sock_impl_opts *impl_opts, bool enable_zero_copy)
{
	struct spdk_uring_sock *sock;
#if defined(__linux__)
	int flag;
	int rc;
#endif

	sock = calloc(1, sizeof(*sock));
	if (sock == NULL) {
		SPDK_ERRLOG("sock allocation failed\n");
		return NULL;
	}

	sock->fd = fd;
	memcpy(&sock->base.impl_opts, impl_opts, sizeof(*impl_opts));

#if defined(__linux__)
	flag = 1;

	if (sock->base.impl_opts.enable_quickack) {
		rc = setsockopt(sock->fd, IPPROTO_TCP, TCP_QUICKACK, &flag, sizeof(flag));
		if (rc != 0) {
			SPDK_ERRLOG("Failed to set TCP_QUICKACK\n");
		}
	}

	spdk_sock_get_placement_id(sock->fd, sock->base.impl_opts.enable_placement_id,
				   &sock->placement_id);
#ifdef SPDK_ZEROCOPY
	/* Try to turn on zero copy sends */
	flag = 1;

	if (enable_zero_copy) {
		rc = setsockopt(sock->fd, SOL_SOCKET, SO_ZEROCOPY, &flag, sizeof(flag));
		if (rc == 0) {
			sock->zcopy = true;
			sock->zcopy_send_flags = MSG_ZEROCOPY;
		}
	}
#endif
#endif

	return sock;
}

static struct spdk_sock *
uring_sock_create(const char *ip, int port,
		  enum uring_sock_create_type type,
		  struct spdk_sock_opts *opts)
{
	struct spdk_uring_sock *sock;
	struct spdk_sock_impl_opts impl_opts;
	char buf[MAX_TMPBUF];
	char portnum[PORTNUMLEN];
	char *p;
	struct addrinfo hints, *res, *res0;
	int fd, flag;
	int val = 1;
	int rc;
	bool enable_zcopy_impl_opts = false;
	bool enable_zcopy_user_opts = true;

	assert(opts != NULL);
	uring_opts_get_impl_opts(opts, &impl_opts);

	if (ip == NULL) {
		return NULL;
	}
	if (ip[0] == '[') {
		snprintf(buf, sizeof(buf), "%s", ip + 1);
		p = strchr(buf, ']');
		if (p != NULL) {
			*p = '\0';
		}
		ip = (const char *) &buf[0];
	}

	snprintf(portnum, sizeof portnum, "%d", port);
	memset(&hints, 0, sizeof hints);
	hints.ai_family = PF_UNSPEC;
	hints.ai_socktype = SOCK_STREAM;
	hints.ai_flags = AI_NUMERICSERV;
	hints.ai_flags |= AI_PASSIVE;
	hints.ai_flags |= AI_NUMERICHOST;
	rc = getaddrinfo(ip, portnum, &hints, &res0);
	if (rc != 0) {
		SPDK_ERRLOG("getaddrinfo() failed (errno=%d)\n", errno);
		return NULL;
	}

	/* try listen */
	fd = -1;
	for (res = res0; res != NULL; res = res->ai_next) {
retry:
		fd = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
		if (fd < 0) {
			/* error */
			continue;
		}

		val = impl_opts.recv_buf_size;
		rc = setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &val, sizeof val);
		if (rc) {
			/* Not fatal */
		}

		val = impl_opts.send_buf_size;
		rc = setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &val, sizeof val);
		if (rc) {
			/* Not fatal */
		}

		rc = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof val);
		if (rc != 0) {
			close(fd);
			fd = -1;
			/* error */
			continue;
		}
		rc = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof val);
		if (rc != 0) {
			close(fd);
			fd = -1;
			/* error */
			continue;
		}

		if (opts->ack_timeout) {
#if defined(__linux__)
			val = opts->ack_timeout;
			rc = setsockopt(fd, IPPROTO_TCP, TCP_USER_TIMEOUT, &val, sizeof val);
			if (rc != 0) {
				close(fd);
				fd = -1;
				/* error */
				continue;
			}
#else
			SPDK_WARNLOG("TCP_USER_TIMEOUT is not supported.\n");
#endif
		}

#if defined(SO_PRIORITY)
		if (opts != NULL && opts->priority) {
			rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &opts->priority, sizeof val);
			if (rc != 0) {
				close(fd);
				fd = -1;
				/* error */
				continue;
			}
		}
#endif
		if (res->ai_family == AF_INET6) {
			rc = setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &val, sizeof val);
			if (rc != 0) {
				close(fd);
				fd = -1;
				/* error */
				continue;
			}
		}

		if (type == SPDK_SOCK_CREATE_LISTEN) {
			rc = bind(fd, res->ai_addr, res->ai_addrlen);
			if (rc != 0) {
				SPDK_ERRLOG("bind() failed at port %d, errno = %d\n", port, errno);
				switch (errno) {
				case EINTR:
					/* interrupted? */
					close(fd);
					goto retry;
				case EADDRNOTAVAIL:
					SPDK_ERRLOG("IP address %s not available. "
						    "Verify IP address in config file "
						    "and make sure setup script is "
						    "run before starting spdk app.\n", ip);
				/* FALLTHROUGH */
				default:
					/* try next family */
					close(fd);
					fd = -1;
					continue;
				}
			}
			/* bind OK */
			rc = listen(fd, 512);
			if (rc != 0) {
				SPDK_ERRLOG("listen() failed, errno = %d\n", errno);
				close(fd);
				fd = -1;
				break;
			}

			flag = fcntl(fd, F_GETFL);
			if (fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0) {
				SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno);
				close(fd);
				fd = -1;
				break;
			}

			enable_zcopy_impl_opts = impl_opts.enable_zerocopy_send_server;
		} else if (type == SPDK_SOCK_CREATE_CONNECT) {
			rc = connect(fd, res->ai_addr, res->ai_addrlen);
			if (rc != 0) {
				SPDK_ERRLOG("connect() failed, errno = %d\n", errno);
				/* try next family */
				close(fd);
				fd = -1;
				continue;
			}

			flag = fcntl(fd, F_GETFL);
			if (fcntl(fd, F_SETFL, flag & ~O_NONBLOCK) < 0) {
				SPDK_ERRLOG("fcntl can't set blocking mode for socket, fd: %d (%d)\n", fd, errno);
				close(fd);
				fd = -1;
				break;
			}

			enable_zcopy_impl_opts = impl_opts.enable_zerocopy_send_client;
		}
		break;
	}
	freeaddrinfo(res0);

	if (fd < 0) {
		return NULL;
	}

	enable_zcopy_user_opts = opts->zcopy && !sock_is_loopback(fd);
	sock = uring_sock_alloc(fd, &impl_opts, enable_zcopy_user_opts && enable_zcopy_impl_opts);
	if (sock == NULL) {
		SPDK_ERRLOG("sock allocation failed\n");
		close(fd);
		return NULL;
	}

	return &sock->base;
}

static struct spdk_sock *
uring_sock_listen(const char *ip, int port, struct spdk_sock_opts *opts)
{
	return uring_sock_create(ip, port, SPDK_SOCK_CREATE_LISTEN, opts);
}

static struct spdk_sock *
uring_sock_connect(const char *ip, int port, struct spdk_sock_opts *opts)
{
	return uring_sock_create(ip, port, SPDK_SOCK_CREATE_CONNECT, opts);
}

static struct spdk_sock *
uring_sock_accept(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct sockaddr_storage sa;
	socklen_t salen;
	int rc, fd;
	struct spdk_uring_sock *new_sock;
	int flag;

	memset(&sa, 0, sizeof(sa));
	salen = sizeof(sa);

	assert(sock != NULL);

	rc = accept(sock->fd, (struct sockaddr *)&sa, &salen);

	if (rc == -1) {
		return NULL;
	}

	fd = rc;

	flag = fcntl(fd, F_GETFL);
	if ((flag & O_NONBLOCK) && (fcntl(fd, F_SETFL, flag & ~O_NONBLOCK) < 0)) {
		SPDK_ERRLOG("fcntl can't set blocking mode for socket, fd: %d (%d)\n", fd, errno);
		close(fd);
		return NULL;
	}

#if defined(SO_PRIORITY)
	/* The priority is not inherited, so call this function again */
	if (sock->base.opts.priority) {
		rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &sock->base.opts.priority, sizeof(int));
		if (rc != 0) {
			close(fd);
			return NULL;
		}
	}
#endif

	new_sock = uring_sock_alloc(fd, &sock->base.impl_opts, sock->zcopy);
	if (new_sock == NULL) {
		close(fd);
		return NULL;
	}

	return &new_sock->base;
}

static int
uring_sock_close(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);

	assert(TAILQ_EMPTY(&_sock->pending_reqs));
	assert(sock->group == NULL);

	/* If the socket fails to close, the best choice is to
	 * leak the fd but continue to free the rest of the sock
	 * memory. */
	close(sock->fd);

	spdk_pipe_destroy(sock->recv_pipe);
	free(sock->recv_buf);
	free(sock);

	return 0;
}

static ssize_t
uring_sock_recv_from_pipe(struct spdk_uring_sock *sock, struct iovec *diov, int diovcnt)
{
	struct iovec siov[2];
	int sbytes;
	ssize_t bytes;
	struct spdk_uring_sock_group_impl *group;

	sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov);
	if (sbytes < 0) {
		errno = EINVAL;
		return -1;
	} else if (sbytes == 0) {
		errno = EAGAIN;
		return -1;
	}

	bytes = spdk_iovcpy(siov, 2, diov, diovcnt);

	if (bytes == 0) {
		/* The only way this happens is if diov is 0 length */
		errno = EINVAL;
		return -1;
	}

	spdk_pipe_reader_advance(sock->recv_pipe, bytes);

	/* If we drained the pipe, take it off the level-triggered list */
	if (sock->base.group_impl && spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) {
		group = __uring_group_impl(sock->base.group_impl);
		TAILQ_REMOVE(&group->pending_recv, sock, link);
		sock->pending_recv = false;
	}

	return bytes;
}

static inline ssize_t
sock_readv(int fd, struct iovec *iov, int iovcnt)
{
	struct msghdr msg = {
		.msg_iov = iov,
		.msg_iovlen = iovcnt,
	};

	return recvmsg(fd, &msg, MSG_DONTWAIT);
}

static inline ssize_t
uring_sock_read(struct spdk_uring_sock *sock)
{
	struct iovec iov[2];
	int bytes;
	struct spdk_uring_sock_group_impl *group;

	bytes = spdk_pipe_writer_get_buffer(sock->recv_pipe, sock->recv_buf_sz, iov);

	if (bytes > 0) {
		bytes = sock_readv(sock->fd, iov, 2);
		if (bytes > 0) {
			spdk_pipe_writer_advance(sock->recv_pipe, bytes);
			/* Only queue the socket on the level-triggered list if it is
			 * not already there, so it never gets inserted twice. */
			if (sock->base.group_impl && !sock->pending_recv) {
				group = __uring_group_impl(sock->base.group_impl);
				TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
				sock->pending_recv = true;
			}
		}
	}

	return bytes;
}

static ssize_t
uring_sock_readv(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	int rc, i;
	size_t len;

	if (sock->recv_pipe == NULL) {
		return sock_readv(sock->fd, iov, iovcnt);
	}

	len = 0;
	for (i = 0; i < iovcnt; i++) {
		len += iov[i].iov_len;
	}

	if (spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) {
		/* If the user is receiving a sufficiently large amount of data,
		 * receive directly to their buffers. */
		if (len >= MIN_SOCK_PIPE_SIZE) {
			return sock_readv(sock->fd, iov, iovcnt);
		}

		/* Otherwise, do a big read into our pipe */
		rc = uring_sock_read(sock);
		if (rc <= 0) {
			return rc;
		}
	}

	return uring_sock_recv_from_pipe(sock, iov, iovcnt);
}

static ssize_t
uring_sock_recv(struct spdk_sock *sock, void *buf, size_t len)
{
	struct iovec iov[1];

	iov[0].iov_base = buf;
	iov[0].iov_len = len;

	return uring_sock_readv(sock, iov, 1);
}

static ssize_t
uring_sock_writev(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct msghdr msg = {
		.msg_iov = iov,
		.msg_iovlen = iovcnt,
	};

	if (sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
		errno = EAGAIN;
		return -1;
	}

	return sendmsg(sock->fd, &msg, MSG_DONTWAIT);
}

static ssize_t
sock_request_advance_offset(struct spdk_sock_request *req, ssize_t rc)
{
	unsigned int offset;
	size_t len;
	int i;

	offset = req->internal.offset;
	for (i = 0; i < req->iovcnt; i++) {
		/* Advance by the offset first */
		if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) {
			offset -= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len;
			continue;
		}

		/* Calculate the remaining length of this element */
		len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset;

		if (len > (size_t)rc) {
			req->internal.offset += rc;
			return -1;
		}

		offset = 0;
		req->internal.offset += len;
		rc -= len;
	}

	return rc;
}

static int
sock_complete_write_reqs(struct spdk_sock *_sock, ssize_t rc, bool is_zcopy)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct spdk_sock_request *req;
	int retval;

	if (is_zcopy) {
		/* Handling overflow case, because we use psock->sendmsg_idx - 1 for the
		 * req->internal.offset, so sendmsg_idx should not be zero */
		if (spdk_unlikely(sock->sendmsg_idx == UINT32_MAX)) {
			sock->sendmsg_idx = 1;
		} else {
			sock->sendmsg_idx++;
		}
	}

	/* Consume the requests that were actually written */
	req = TAILQ_FIRST(&_sock->queued_reqs);
	while (req) {
		/* req->internal.is_zcopy is true when the whole req or part of it is sent with zerocopy */
		req->internal.is_zcopy = is_zcopy;

		rc = sock_request_advance_offset(req, rc);
		if (rc < 0) {
			/* This element was partially sent. */
			return 0;
		}

		/* Handled a full request. */
		spdk_sock_request_pend(_sock, req);

		if (!req->internal.is_zcopy && req == TAILQ_FIRST(&_sock->pending_reqs)) {
			retval = spdk_sock_request_put(_sock, req, 0);
			if (retval) {
				return retval;
			}
		} else {
			/* Re-use the offset field to hold the sendmsg call index. The
			 * index is 0 based, so subtract one here because we've already
			 * incremented above. */
			req->internal.offset = sock->sendmsg_idx - 1;
		}

		if (rc == 0) {
			break;
		}

		req = TAILQ_FIRST(&_sock->queued_reqs);
	}

	return 0;
}

#ifdef SPDK_ZEROCOPY
static int
_sock_check_zcopy(struct spdk_sock *_sock, int status)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	ssize_t rc;
	struct sock_extended_err *serr;
	struct cmsghdr *cm;
	uint32_t idx;
	struct spdk_sock_request *req, *treq;
	bool found;

	assert(sock->zcopy == true);
	if (spdk_unlikely(status < 0)) {
		if (!TAILQ_EMPTY(&_sock->pending_reqs)) {
			SPDK_ERRLOG("Attempting to receive from ERRQUEUE yielded error, but pending list still has orphaned entries, status =%d\n",
				    status);
		} else {
			SPDK_WARNLOG("Recvmsg yielded an error!\n");
		}
		return 0;
	}

	cm = CMSG_FIRSTHDR(&sock->errqueue_task.msg);
	if (!((cm->cmsg_level == SOL_IP && cm->cmsg_type == IP_RECVERR) ||
	      (cm->cmsg_level == SOL_IPV6 && cm->cmsg_type == IPV6_RECVERR))) {
		SPDK_WARNLOG("Unexpected cmsg level or type!\n");
		return 0;
	}

	serr = (struct sock_extended_err *)CMSG_DATA(cm);
	if (serr->ee_errno != 0 || serr->ee_origin != SO_EE_ORIGIN_ZEROCOPY) {
		SPDK_WARNLOG("Unexpected extended error origin\n");
		return 0;
	}

	/* Most of the time, the pending_reqs array is in the exact
	 * order we need such that all of the requests to complete are
	 * in order, in the front. It is guaranteed that all requests
	 * belonging to the same sendmsg call are sequential, so once
	 * we encounter one match we can stop looping as soon as a
	 * non-match is found.
	 */
	for (idx = serr->ee_info; idx <= serr->ee_data; idx++) {
		found = false;
		TAILQ_FOREACH_SAFE(req, &_sock->pending_reqs, internal.link, treq) {
			if (!req->internal.is_zcopy) {
				/* This wasn't a zcopy request. It was just waiting in line to complete */
				rc = spdk_sock_request_put(_sock, req, 0);
				if (rc < 0) {
					return rc;
				}
			} else if (req->internal.offset == idx) {
				found = true;
				rc = spdk_sock_request_put(_sock, req, 0);
				if (rc < 0) {
					return rc;
				}
			} else if (found) {
				break;
			}
		}
	}

	return 0;
}

static void
_sock_prep_errqueue(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct spdk_uring_task *task = &sock->errqueue_task;
	struct io_uring_sqe *sqe;

	if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS) {
		return;
	}

	assert(sock->group != NULL);
	sock->group->io_queued++;

	sqe = io_uring_get_sqe(&sock->group->uring);
	io_uring_prep_recvmsg(sqe, sock->fd, &task->msg, MSG_ERRQUEUE);
	io_uring_sqe_set_data(sqe, task);
	task->status = SPDK_URING_SOCK_TASK_IN_PROCESS;
}

#endif

static void
_sock_flush(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct spdk_uring_task *task = &sock->write_task;
	uint32_t iovcnt;
	struct io_uring_sqe *sqe;
	int flags;

	if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS) {
		return;
	}

#ifdef SPDK_ZEROCOPY
	if (sock->zcopy) {
		flags = MSG_DONTWAIT | sock->zcopy_send_flags;
	} else
#endif
	{
		flags = MSG_DONTWAIT;
	}

	iovcnt = spdk_sock_prep_reqs(&sock->base, task->iovs, task->iov_cnt, &task->last_req, &flags);
	if (!iovcnt) {
		return;
	}

	task->iov_cnt = iovcnt;
	assert(sock->group != NULL);
	task->msg.msg_iov = task->iovs;
	task->msg.msg_iovlen = task->iov_cnt;
#ifdef SPDK_ZEROCOPY
	task->is_zcopy = (flags & MSG_ZEROCOPY) ? true : false;
#endif
	sock->group->io_queued++;

	sqe = io_uring_get_sqe(&sock->group->uring);
	io_uring_prep_sendmsg(sqe, sock->fd, &sock->write_task.msg, flags);
	io_uring_sqe_set_data(sqe, task);
	task->status = SPDK_URING_SOCK_TASK_IN_PROCESS;
}

static void
_sock_prep_pollin(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct spdk_uring_task *task = &sock->pollin_task;
	struct io_uring_sqe *sqe;

	/* Do not prepare pollin event */
	if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS || (sock->pending_recv && !sock->zcopy)) {
		return;
	}

	assert(sock->group != NULL);
	sock->group->io_queued++;

	sqe = io_uring_get_sqe(&sock->group->uring);
	io_uring_prep_poll_add(sqe, sock->fd, POLLIN | POLLERR);
	io_uring_sqe_set_data(sqe, task);
	task->status = SPDK_URING_SOCK_TASK_IN_PROCESS;
}

static void
_sock_prep_cancel_task(struct spdk_sock *_sock, void *user_data)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct spdk_uring_task *task = &sock->cancel_task;
	struct io_uring_sqe *sqe;

	if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS) {
		return;
	}

	assert(sock->group != NULL);
	sock->group->io_queued++;

	sqe = io_uring_get_sqe(&sock->group->uring);
	io_uring_prep_cancel(sqe, user_data, 0);
	io_uring_sqe_set_data(sqe, task);
	task->status = SPDK_URING_SOCK_TASK_IN_PROCESS;
}

static int
sock_uring_group_reap(struct spdk_uring_sock_group_impl *group, int max, int max_read_events,
		      struct spdk_sock **socks)
{
	int i, count, ret;
	struct io_uring_cqe *cqe;
	struct spdk_uring_sock *sock, *tmp;
	struct spdk_uring_task *task;
	int status;
	bool is_zcopy;

	for (i = 0; i < max; i++) {
		ret = io_uring_peek_cqe(&group->uring, &cqe);
		if (ret != 0) {
			break;
		}

		if (cqe == NULL) {
			break;
		}

		task = (struct spdk_uring_task *)cqe->user_data;
		assert(task != NULL);
		sock = task->sock;
		assert(sock != NULL);
		assert(sock->group != NULL);
		assert(sock->group == group);
		sock->group->io_inflight--;
		sock->group->io_avail++;
		status = cqe->res;
		io_uring_cqe_seen(&group->uring, cqe);

		task->status = SPDK_URING_SOCK_TASK_NOT_IN_USE;

		if (spdk_unlikely(status <= 0)) {
			if (status == -EAGAIN || status == -EWOULDBLOCK || (status == -ENOBUFS && sock->zcopy)) {
				continue;
			}
		}

		switch (task->type) {
		case SPDK_SOCK_TASK_POLLIN:
#ifdef SPDK_ZEROCOPY
			if ((status & POLLERR) == POLLERR) {
				_sock_prep_errqueue(&sock->base);
			}
#endif
			if ((status & POLLIN) == POLLIN) {
				if (sock->base.cb_fn != NULL &&
				    sock->pending_recv == false) {
					sock->pending_recv = true;
					TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
				}
			}
			break;
		case SPDK_SOCK_TASK_WRITE:
			/* Save the zcopy flag before the task is reset, so the completed
			 * requests are still accounted as zerocopy sends. */
			is_zcopy = task->is_zcopy;
			task->last_req = NULL;
			task->iov_cnt = 0;
			task->is_zcopy = false;
			if (spdk_unlikely(status < 0)) {
				sock->connection_status = status;
				spdk_sock_abort_requests(&sock->base);
			} else {
				sock_complete_write_reqs(&sock->base, status, is_zcopy);
			}

			break;
#ifdef SPDK_ZEROCOPY
		case SPDK_SOCK_TASK_ERRQUEUE:
			if (spdk_unlikely(status == -ECANCELED)) {
				sock->connection_status = status;
				break;
			}
			_sock_check_zcopy(&sock->base, status);
			break;
#endif
		case SPDK_SOCK_TASK_CANCEL:
			/* Do nothing */
			break;
		default:
			SPDK_UNREACHABLE();
		}
	}

	if (!socks) {
		return 0;
	}
	count = 0;
	TAILQ_FOREACH_SAFE(sock, &group->pending_recv, link, tmp) {
		if (count == max_read_events) {
			break;
		}

		if (spdk_unlikely(sock->base.cb_fn == NULL) ||
		    (sock->recv_pipe == NULL || spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0)) {
			sock->pending_recv = false;
			TAILQ_REMOVE(&group->pending_recv, sock, link);
			if (spdk_unlikely(sock->base.cb_fn == NULL)) {
				/* If the socket's cb_fn is NULL, do not add it to socks array */
				continue;
			}
		}

		socks[count++] = &sock->base;
	}

	/* Cycle the pending_recv list so that each time we poll things aren't
	 * in the same order. Say we have 6 sockets in the list, named as follows:
	 * A B C D E F
	 * And all 6 sockets had poll events, but max_read_events is only 3. That means
	 * sock currently points at D. We want to rearrange the list to the following:
	 * D E F A B C
	 *
	 * The variables below are named according to this example to make it easier to
	 * follow the swaps.
	 */
	if (sock != NULL) {
		struct spdk_uring_sock *ua, *uc, *ud, *uf;

		/* Capture pointers to the elements we need */
		ud = sock;

		ua = TAILQ_FIRST(&group->pending_recv);
		if (ua == ud) {
			goto end;
		}

		uf = TAILQ_LAST(&group->pending_recv, pending_recv_list);
		if (uf == ud) {
			TAILQ_REMOVE(&group->pending_recv, ud, link);
			TAILQ_INSERT_HEAD(&group->pending_recv, ud, link);
			goto end;
		}

		uc = TAILQ_PREV(ud, pending_recv_list, link);
		assert(uc != NULL);

		/* Break the link between C and D */
		uc->link.tqe_next = NULL;

		/* Connect F to A */
		uf->link.tqe_next = ua;
		ua->link.tqe_prev = &uf->link.tqe_next;

		/* Fix up the list first/last pointers */
		group->pending_recv.tqh_first = ud;
		group->pending_recv.tqh_last = &uc->link.tqe_next;

		/* D is in front of the list, make tqe prev pointer point to the head of list */
		ud->link.tqe_prev = &group->pending_recv.tqh_first;
	}

end:
	return count;
}

static int
_sock_flush_client(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct msghdr msg = {};
	struct iovec iovs[IOV_BATCH_SIZE];
	int iovcnt;
	ssize_t rc;
	int flags = sock->zcopy_send_flags;
	int retval;
	bool is_zcopy = false;

	/* Can't flush from within a callback or we end up with recursive calls */
	if (_sock->cb_cnt > 0) {
		return 0;
	}

	/* Gather an iov */
	iovcnt = spdk_sock_prep_reqs(_sock, iovs, 0, NULL, &flags);
	if (iovcnt == 0) {
		return 0;
	}

	/* Perform the vectored write */
	msg.msg_iov = iovs;
	msg.msg_iovlen = iovcnt;
	rc = sendmsg(sock->fd, &msg, flags | MSG_DONTWAIT);
	if (rc <= 0) {
		if (errno == EAGAIN || errno == EWOULDBLOCK) {
			return 0;
		}
		return rc;
	}

#ifdef SPDK_ZEROCOPY
	is_zcopy = flags & MSG_ZEROCOPY;
#endif
	retval = sock_complete_write_reqs(_sock, rc, is_zcopy);
	if (retval < 0) {
		/* if the socket is closed, return to avoid heap-use-after-free error */
		return retval;
	}

#ifdef SPDK_ZEROCOPY
	if (sock->zcopy && !TAILQ_EMPTY(&_sock->pending_reqs)) {
		_sock_check_zcopy(_sock, 0);
	}
#endif

	return 0;
}

static void
uring_sock_writev_async(struct spdk_sock *_sock, struct spdk_sock_request *req)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	int rc;

	if (spdk_unlikely(sock->connection_status)) {
		req->cb_fn(req->cb_arg, sock->connection_status);
		return;
	}

	spdk_sock_request_queue(_sock, req);

	if (!sock->group) {
		if (_sock->queued_iovcnt >= IOV_BATCH_SIZE) {
			rc = _sock_flush_client(_sock);
			if (rc) {
				spdk_sock_abort_requests(_sock);
			}
		}
	}
}

static void
uring_sock_readv_async(struct spdk_sock *sock, struct spdk_sock_request *req)
{
	req->cb_fn(req->cb_arg, -ENOTSUP);
}

static int
uring_sock_set_recvlowat(struct spdk_sock *_sock, int nbytes)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	int val;
	int rc;

	assert(sock != NULL);

	val = nbytes;
	rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVLOWAT, &val, sizeof val);
	if (rc != 0) {
		return -1;
	}
	return 0;
}

static bool
uring_sock_is_ipv6(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct sockaddr_storage sa;
	socklen_t salen;
	int rc;

	assert(sock != NULL);

	memset(&sa, 0, sizeof sa);
	salen = sizeof sa;
	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
	if (rc != 0) {
		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
		return false;
	}

	return (sa.ss_family == AF_INET6);
}

static bool
uring_sock_is_ipv4(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct sockaddr_storage sa;
	socklen_t salen;
	int rc;

	assert(sock != NULL);

	memset(&sa, 0, sizeof sa);
	salen = sizeof sa;
	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
	if (rc != 0) {
		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
		return false;
	}

	return (sa.ss_family == AF_INET);
}

static bool
uring_sock_is_connected(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	uint8_t byte;
	int rc;

	rc = recv(sock->fd, &byte, 1, MSG_PEEK | MSG_DONTWAIT);
	if (rc == 0) {
		return false;
	}

	if (rc < 0) {
		if (errno == EAGAIN || errno == EWOULDBLOCK) {
			return true;
		}

		return false;
	}

	return true;
}

static struct spdk_sock_group_impl *
uring_sock_group_impl_get_optimal(struct spdk_sock *_sock, struct spdk_sock_group_impl *hint)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct spdk_sock_group_impl *group;

	if (sock->placement_id != -1) {
		spdk_sock_map_lookup(&g_map, sock->placement_id, &group, hint);
		return group;
	}

	return NULL;
}

static struct spdk_sock_group_impl *
uring_sock_group_impl_create(void)
{
	struct spdk_uring_sock_group_impl *group_impl;

	group_impl = calloc(1, sizeof(*group_impl));
	if (group_impl == NULL) {
		SPDK_ERRLOG("group_impl allocation failed\n");
		return NULL;
	}

	group_impl->io_avail = SPDK_SOCK_GROUP_QUEUE_DEPTH;

	if (io_uring_queue_init(SPDK_SOCK_GROUP_QUEUE_DEPTH, &group_impl->uring, 0) < 0) {
		SPDK_ERRLOG("uring I/O context setup failure\n");
		free(group_impl);
		return NULL;
	}

	TAILQ_INIT(&group_impl->pending_recv);

	if (g_spdk_uring_sock_impl_opts.enable_placement_id == PLACEMENT_CPU) {
		spdk_sock_map_insert(&g_map, spdk_env_get_current_core(), &group_impl->base);
	}

	return &group_impl->base;
}

static int
uring_sock_group_impl_add_sock(struct spdk_sock_group_impl *_group,
			       struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group);
	int rc;

	sock->group = group;
	sock->write_task.sock = sock;
	sock->write_task.type = SPDK_SOCK_TASK_WRITE;

	sock->pollin_task.sock = sock;
	sock->pollin_task.type = SPDK_SOCK_TASK_POLLIN;

	sock->errqueue_task.sock = sock;
	sock->errqueue_task.type = SPDK_SOCK_TASK_ERRQUEUE;
	sock->errqueue_task.msg.msg_control = sock->buf;
	sock->errqueue_task.msg.msg_controllen = sizeof(sock->buf);

	sock->cancel_task.sock = sock;
	sock->cancel_task.type = SPDK_SOCK_TASK_CANCEL;

	/* switched from another polling group due to scheduling */
	if (spdk_unlikely(sock->recv_pipe != NULL &&
			  (spdk_pipe_reader_bytes_available(sock->recv_pipe) > 0))) {
		assert(sock->pending_recv == false);
		sock->pending_recv = true;
		TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
	}

	if (sock->placement_id != -1) {
		rc = spdk_sock_map_insert(&g_map, sock->placement_id, &group->base);
		if (rc != 0) {
			SPDK_ERRLOG("Failed to insert sock group into map: %d\n", rc);
			/* Do not treat this as an error. The system will continue running. */
		}
	}

	return 0;
}

static int
uring_sock_group_impl_poll(struct spdk_sock_group_impl *_group, int max_events,
			   struct spdk_sock **socks)
{
	struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group);
	int count, ret;
	int to_complete, to_submit;
	struct spdk_sock *_sock, *tmp;
	struct spdk_uring_sock *sock;

	if (spdk_likely(socks)) {
		TAILQ_FOREACH_SAFE(_sock, &group->base.socks, link, tmp) {
			sock = __uring_sock(_sock);
			if (spdk_unlikely(sock->connection_status)) {
				continue;
			}
			_sock_flush(_sock);
			_sock_prep_pollin(_sock);
		}
	}

	to_submit = group->io_queued;

	/* For network I/O, it cannot be set with O_DIRECT, so we do not need to call spdk_io_uring_enter */
	if (to_submit > 0) {
		/* If there are I/O to submit, use io_uring_submit here.
		 * It will automatically call io_uring_enter appropriately. */
		ret = io_uring_submit(&group->uring);
		if (ret < 0) {
			return 1;
		}
		group->io_queued = 0;
		group->io_inflight += to_submit;
		group->io_avail -= to_submit;
	}

	count = 0;
	to_complete = group->io_inflight;
	if (to_complete > 0 || !TAILQ_EMPTY(&group->pending_recv)) {
		count = sock_uring_group_reap(group, to_complete, max_events, socks);
	}

	return count;
}

static int
uring_sock_group_impl_remove_sock(struct spdk_sock_group_impl *_group,
				  struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group);

	if (sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
		_sock_prep_cancel_task(_sock, &sock->write_task);
		/* Since spdk_sock_group_remove_sock() is not an asynchronous interface,
		 * it is fine to busy-wait here until the task completes. */
		while ((sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) ||
		       (sock->cancel_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE)) {
			uring_sock_group_impl_poll(_group, 32, NULL);
		}
	}

	if (sock->pollin_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
		_sock_prep_cancel_task(_sock, &sock->pollin_task);
		/* Since spdk_sock_group_remove_sock() is not an asynchronous interface,
		 * it is fine to busy-wait here until the task completes. */
		while ((sock->pollin_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) ||
		       (sock->cancel_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE)) {
			uring_sock_group_impl_poll(_group, 32, NULL);
		}
	}

	if (sock->errqueue_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
		_sock_prep_cancel_task(_sock, &sock->errqueue_task);
		/* Since spdk_sock_group_remove_sock() is not an asynchronous interface,
		 * it is fine to busy-wait here until the task completes. */
		while ((sock->errqueue_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) ||
		       (sock->cancel_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE)) {
			uring_sock_group_impl_poll(_group, 32, NULL);
		}
	}

	/* Make sure that cancelling the tasks above didn't cause any new requests to be submitted */
	assert(sock->write_task.status == SPDK_URING_SOCK_TASK_NOT_IN_USE);
	assert(sock->pollin_task.status == SPDK_URING_SOCK_TASK_NOT_IN_USE);
	assert(sock->errqueue_task.status == SPDK_URING_SOCK_TASK_NOT_IN_USE);

	if (sock->pending_recv) {
		TAILQ_REMOVE(&group->pending_recv, sock, link);
		sock->pending_recv = false;
	}
	assert(sock->pending_recv == false);

	if (sock->placement_id != -1) {
		spdk_sock_map_release(&g_map, sock->placement_id);
	}

	sock->group = NULL;
	return 0;
}

static int
uring_sock_group_impl_close(struct spdk_sock_group_impl *_group)
{
	struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group);

	/* try to reap all the active I/O */
	while (group->io_inflight) {
		uring_sock_group_impl_poll(_group, 32, NULL);
	}
	assert(group->io_inflight == 0);
	assert(group->io_avail == SPDK_SOCK_GROUP_QUEUE_DEPTH);

	io_uring_queue_exit(&group->uring);

	if (g_spdk_uring_sock_impl_opts.enable_placement_id == PLACEMENT_CPU) {
		spdk_sock_map_release(&g_map, spdk_env_get_current_core());
	}

	free(group);
	return 0;
}

static int
uring_sock_flush(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);

	if (!sock->group) {
		return _sock_flush_client(_sock);
	}

	return 0;
}

static struct spdk_net_impl g_uring_net_impl = {
	.name = "uring",
	.getaddr = uring_sock_getaddr,
	.connect = uring_sock_connect,
	.listen = uring_sock_listen,
	.accept = uring_sock_accept,
	.close = uring_sock_close,
	.recv = uring_sock_recv,
	.readv = uring_sock_readv,
	.readv_async = uring_sock_readv_async,
	.writev = uring_sock_writev,
	.writev_async = uring_sock_writev_async,
	.flush = uring_sock_flush,
	.set_recvlowat = uring_sock_set_recvlowat,
	.set_recvbuf = uring_sock_set_recvbuf,
	.set_sendbuf = uring_sock_set_sendbuf,
	.is_ipv6 = uring_sock_is_ipv6,
	.is_ipv4 = uring_sock_is_ipv4,
	.is_connected = uring_sock_is_connected,
	.group_impl_get_optimal = uring_sock_group_impl_get_optimal,
	.group_impl_create = uring_sock_group_impl_create,
	.group_impl_add_sock = uring_sock_group_impl_add_sock,
	.group_impl_remove_sock = uring_sock_group_impl_remove_sock,
	.group_impl_poll = uring_sock_group_impl_poll,
	.group_impl_close = uring_sock_group_impl_close,
	.get_opts = uring_sock_impl_get_opts,
	.set_opts = uring_sock_impl_set_opts,
};

SPDK_NET_IMPL_REGISTER(uring, &g_uring_net_impl, DEFAULT_SOCK_PRIORITY + 2);
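
/*
 * Illustrative sketch (not part of the module itself): one way an application
 * might select this "uring" implementation and tune its impl_opts through the
 * public spdk_sock API. The option values below (zerocopy threshold, address,
 * port) are hypothetical; see include/spdk/sock.h for the authoritative
 * interface.
 *
 *	struct spdk_sock_impl_opts opts = {};
 *	size_t len = sizeof(opts);
 *
 *	spdk_sock_impl_get_opts("uring", &opts, &len);
 *	opts.enable_zerocopy_send_client = true;
 *	opts.zerocopy_threshold = 4096;
 *	spdk_sock_impl_set_opts("uring", &opts, len);
 *
 *	spdk_sock_set_default_impl("uring");
 *	struct spdk_sock *sock = spdk_sock_connect("127.0.0.1", 4420, "uring");
 */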