/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright (C) 2019 Intel Corporation.
 * All rights reserved.
 */

#include "spdk/stdinc.h"
#include "spdk/config.h"

#include <linux/errqueue.h>
#include <sys/epoll.h>
#include <liburing.h>

#include "spdk/barrier.h"
#include "spdk/env.h"
#include "spdk/log.h"
#include "spdk/pipe.h"
#include "spdk/sock.h"
#include "spdk/string.h"
#include "spdk/util.h"

#include "spdk_internal/sock.h"
#include "spdk_internal/assert.h"
#include "../sock_kernel.h"

#define MAX_TMPBUF 1024
#define PORTNUMLEN 32
#define SPDK_SOCK_GROUP_QUEUE_DEPTH 4096
#define SPDK_SOCK_CMG_INFO_SIZE (sizeof(struct cmsghdr) + sizeof(struct sock_extended_err))

enum spdk_sock_task_type {
	SPDK_SOCK_TASK_POLLIN = 0,
	SPDK_SOCK_TASK_ERRQUEUE,
	SPDK_SOCK_TASK_WRITE,
	SPDK_SOCK_TASK_CANCEL,
};

#if defined(SO_ZEROCOPY) && defined(MSG_ZEROCOPY)
#define SPDK_ZEROCOPY
#endif

enum spdk_uring_sock_task_status {
	SPDK_URING_SOCK_TASK_NOT_IN_USE = 0,
	SPDK_URING_SOCK_TASK_IN_PROCESS,
};

struct spdk_uring_task {
	enum spdk_uring_sock_task_status status;
	enum spdk_sock_task_type type;
	struct spdk_uring_sock *sock;
	struct msghdr msg;
	struct iovec iovs[IOV_BATCH_SIZE];
	int iov_cnt;
	struct spdk_sock_request *last_req;
	bool is_zcopy;
	STAILQ_ENTRY(spdk_uring_task) link;
};

struct spdk_uring_sock {
	struct spdk_sock base;
	int fd;
	uint32_t sendmsg_idx;
	struct spdk_uring_sock_group_impl *group;
	struct spdk_uring_task write_task;
	struct spdk_uring_task errqueue_task;
	struct spdk_uring_task pollin_task;
	struct spdk_uring_task cancel_task;
	struct spdk_pipe *recv_pipe;
	void *recv_buf;
	int recv_buf_sz;
	bool zcopy;
	bool pending_recv;
	int zcopy_send_flags;
	int connection_status;
	int placement_id;
	uint8_t buf[SPDK_SOCK_CMG_INFO_SIZE];
	TAILQ_ENTRY(spdk_uring_sock) link;
};

TAILQ_HEAD(pending_recv_list, spdk_uring_sock);

struct spdk_uring_sock_group_impl {
	struct spdk_sock_group_impl base;
	struct io_uring uring;
	uint32_t io_inflight;
	uint32_t io_queued;
	uint32_t io_avail;
	struct pending_recv_list pending_recv;
};

static struct spdk_sock_impl_opts g_spdk_uring_sock_impl_opts = {
	.recv_buf_size = MIN_SO_RCVBUF_SIZE,
	.send_buf_size = MIN_SO_SNDBUF_SIZE,
	.enable_recv_pipe = true,
	.enable_quickack = false,
	.enable_placement_id = PLACEMENT_NONE,
	.enable_zerocopy_send_server = false,
	.enable_zerocopy_send_client = false,
	.zerocopy_threshold = 0,
	.tls_version = 0,
	.enable_ktls = false,
	.psk_key = NULL,
	.psk_identity = NULL
};

static struct spdk_sock_map g_map = {
	.entries = STAILQ_HEAD_INITIALIZER(g_map.entries),
	.mtx = PTHREAD_MUTEX_INITIALIZER
};

__attribute((destructor)) static void
uring_sock_map_cleanup(void)
{
	spdk_sock_map_cleanup(&g_map);
}

#define SPDK_URING_SOCK_REQUEST_IOV(req) ((struct iovec *)((uint8_t *)req + sizeof(struct spdk_sock_request)))

#define __uring_sock(sock) (struct spdk_uring_sock *)sock
#define __uring_group_impl(group) (struct spdk_uring_sock_group_impl *)group

static void
uring_sock_copy_impl_opts(struct spdk_sock_impl_opts *dest, const struct spdk_sock_impl_opts *src,
			  size_t len)
{
#define FIELD_OK(field) \
	offsetof(struct spdk_sock_impl_opts, field) + sizeof(src->field) <= len

#define SET_FIELD(field) \
	if (FIELD_OK(field)) { \
		dest->field = src->field; \
	}
	SET_FIELD(recv_buf_size);
	SET_FIELD(send_buf_size);
	SET_FIELD(enable_recv_pipe);
	SET_FIELD(enable_quickack);
	SET_FIELD(enable_placement_id);
	SET_FIELD(enable_zerocopy_send_server);
	SET_FIELD(enable_zerocopy_send_client);
	SET_FIELD(zerocopy_threshold);
	SET_FIELD(tls_version);
	SET_FIELD(enable_ktls);
	SET_FIELD(psk_key);
	SET_FIELD(psk_identity);

#undef SET_FIELD
#undef FIELD_OK
}

static int
uring_sock_impl_get_opts(struct spdk_sock_impl_opts *opts, size_t *len)
{
	if (!opts || !len) {
		errno = EINVAL;
		return -1;
	}

	assert(sizeof(*opts) >= *len);
	memset(opts, 0, *len);

	uring_sock_copy_impl_opts(opts, &g_spdk_uring_sock_impl_opts, *len);
	*len = spdk_min(*len, sizeof(g_spdk_uring_sock_impl_opts));

	return 0;
}

static int
uring_sock_impl_set_opts(const struct spdk_sock_impl_opts *opts, size_t len)
{
	if (!opts) {
		errno = EINVAL;
		return -1;
	}

	assert(sizeof(*opts) >= len);
	uring_sock_copy_impl_opts(&g_spdk_uring_sock_impl_opts, opts, len);

	return 0;
}

static void
uring_opts_get_impl_opts(const struct spdk_sock_opts *opts, struct spdk_sock_impl_opts *dest)
{
	/* Copy the default impl_opts first to cover cases when user's impl_opts is smaller */
	memcpy(dest, &g_spdk_uring_sock_impl_opts, sizeof(*dest));

	if (opts->impl_opts != NULL) {
		assert(sizeof(*dest) >= opts->impl_opts_size);
		uring_sock_copy_impl_opts(dest, opts->impl_opts, opts->impl_opts_size);
	}
}

static int
uring_sock_getaddr(struct spdk_sock *_sock, char *saddr, int slen, uint16_t *sport,
		   char *caddr, int clen, uint16_t *cport)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct sockaddr_storage sa;
	socklen_t salen;
	int rc;

	assert(sock != NULL);

	memset(&sa, 0, sizeof sa);
	salen = sizeof sa;
	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
	if (rc != 0) {
		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
		return -1;
	}

	switch (sa.ss_family) {
	case AF_UNIX:
		/* Acceptable connection types that don't have IPs */
		return 0;
	case AF_INET:
	case AF_INET6:
		/* Code below will get IP addresses */
		break;
	default:
		/* Unsupported socket family */
		return -1;
	}

	rc = get_addr_str((struct sockaddr *)&sa, saddr, slen);
	if (rc != 0) {
		SPDK_ERRLOG("getnameinfo() failed (errno=%d)\n", errno);
		return -1;
	}

	if (sport) {
		if (sa.ss_family == AF_INET) {
			*sport = ntohs(((struct sockaddr_in *) &sa)->sin_port);
		} else if (sa.ss_family == AF_INET6) {
			*sport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port);
		}
	}

	memset(&sa, 0, sizeof sa);
	salen = sizeof sa;
	rc = getpeername(sock->fd, (struct sockaddr *) &sa, &salen);
	if (rc != 0) {
		SPDK_ERRLOG("getpeername() failed (errno=%d)\n", errno);
		return -1;
	}

	rc = get_addr_str((struct sockaddr *)&sa, caddr, clen);
	if (rc != 0) {
		SPDK_ERRLOG("getnameinfo() failed (errno=%d)\n", errno);
		return -1;
	}

	if (cport) {
		if (sa.ss_family == AF_INET) {
			*cport = ntohs(((struct sockaddr_in *) &sa)->sin_port);
		} else if (sa.ss_family == AF_INET6) {
			*cport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port);
		}
	}

	return 0;
}

enum uring_sock_create_type {
	SPDK_SOCK_CREATE_LISTEN,
	SPDK_SOCK_CREATE_CONNECT,
};
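/* Resize (or create) the socket's receive pipe so it can buffer sz bytes.
 * Any data already buffered in the old pipe is copied into the new one;
 * a size of 0 frees the pipe entirely. */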
static int
uring_sock_alloc_pipe(struct spdk_uring_sock *sock, int sz)
{
	uint8_t *new_buf;
	struct spdk_pipe *new_pipe;
	struct iovec siov[2];
	struct iovec diov[2];
	int sbytes;
	ssize_t bytes;

	if (sock->recv_buf_sz == sz) {
		return 0;
	}

	/* If the new size is 0, just free the pipe */
	if (sz == 0) {
		spdk_pipe_destroy(sock->recv_pipe);
		free(sock->recv_buf);
		sock->recv_pipe = NULL;
		sock->recv_buf = NULL;
		return 0;
	} else if (sz < MIN_SOCK_PIPE_SIZE) {
		SPDK_ERRLOG("The size of the pipe must be larger than %d\n", MIN_SOCK_PIPE_SIZE);
		return -1;
	}

	/* Round up to next 64 byte multiple */
	new_buf = calloc(SPDK_ALIGN_CEIL(sz + 1, 64), sizeof(uint8_t));
	if (!new_buf) {
		SPDK_ERRLOG("socket recv buf allocation failed\n");
		return -ENOMEM;
	}

	new_pipe = spdk_pipe_create(new_buf, sz + 1);
	if (new_pipe == NULL) {
		SPDK_ERRLOG("socket pipe allocation failed\n");
		free(new_buf);
		return -ENOMEM;
	}

	if (sock->recv_pipe != NULL) {
		/* Pull all of the data out of the old pipe */
		sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov);
		if (sbytes > sz) {
			/* Too much data to fit into the new pipe size */
			spdk_pipe_destroy(new_pipe);
			free(new_buf);
			return -EINVAL;
		}

		sbytes = spdk_pipe_writer_get_buffer(new_pipe, sz, diov);
		assert(sbytes == sz);

		bytes = spdk_iovcpy(siov, 2, diov, 2);
		spdk_pipe_writer_advance(new_pipe, bytes);

		spdk_pipe_destroy(sock->recv_pipe);
		free(sock->recv_buf);
	}

	sock->recv_buf_sz = sz;
	sock->recv_buf = new_buf;
	sock->recv_pipe = new_pipe;

	return 0;
}

static int
uring_sock_set_recvbuf(struct spdk_sock *_sock, int sz)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	int min_size;
	int rc;

	assert(sock != NULL);

	if (_sock->impl_opts.enable_recv_pipe) {
		rc = uring_sock_alloc_pipe(sock, sz);
		if (rc) {
			SPDK_ERRLOG("unable to allocate sufficient recvbuf with sz=%d on sock=%p\n", sz, _sock);
			return rc;
		}
	}

	/* Set kernel buffer size to be at least MIN_SO_RCVBUF_SIZE and
	 * g_spdk_uring_sock_impl_opts.recv_buf_size. */
	min_size = spdk_max(MIN_SO_RCVBUF_SIZE, g_spdk_uring_sock_impl_opts.recv_buf_size);

	if (sz < min_size) {
		sz = min_size;
	}

	rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVBUF, &sz, sizeof(sz));
	if (rc < 0) {
		return rc;
	}

	_sock->impl_opts.recv_buf_size = sz;

	return 0;
}

static int
uring_sock_set_sendbuf(struct spdk_sock *_sock, int sz)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	int min_size;
	int rc;

	assert(sock != NULL);
	/* Set kernel buffer size to be at least MIN_SO_SNDBUF_SIZE and
	 * g_spdk_uring_sock_impl_opts.send_buf_size. */
	min_size = spdk_max(MIN_SO_SNDBUF_SIZE, g_spdk_uring_sock_impl_opts.send_buf_size);

	if (sz < min_size) {
		sz = min_size;
	}

	rc = setsockopt(sock->fd, SOL_SOCKET, SO_SNDBUF, &sz, sizeof(sz));
	if (rc < 0) {
		return rc;
	}

	_sock->impl_opts.send_buf_size = sz;

	return 0;
}

static struct spdk_uring_sock *
uring_sock_alloc(int fd, struct spdk_sock_impl_opts *impl_opts, bool enable_zero_copy)
{
	struct spdk_uring_sock *sock;
#if defined(__linux__)
	int flag;
	int rc;
#endif

	sock = calloc(1, sizeof(*sock));
	if (sock == NULL) {
		SPDK_ERRLOG("sock allocation failed\n");
		return NULL;
	}

	sock->fd = fd;
	memcpy(&sock->base.impl_opts, impl_opts, sizeof(*impl_opts));

#if defined(__linux__)
	flag = 1;

	if (sock->base.impl_opts.enable_quickack) {
		rc = setsockopt(sock->fd, IPPROTO_TCP, TCP_QUICKACK, &flag, sizeof(flag));
		if (rc != 0) {
			SPDK_ERRLOG("failed to set quickack\n");
		}
	}

	spdk_sock_get_placement_id(sock->fd, sock->base.impl_opts.enable_placement_id,
				   &sock->placement_id);
#ifdef SPDK_ZEROCOPY
	/* Try to turn on zero copy sends */
	flag = 1;

	if (enable_zero_copy) {
		rc = setsockopt(sock->fd, SOL_SOCKET, SO_ZEROCOPY, &flag, sizeof(flag));
		if (rc == 0) {
			sock->zcopy = true;
			sock->zcopy_send_flags = MSG_ZEROCOPY;
		}
	}
#endif
#endif

	return sock;
}

static struct spdk_sock *
uring_sock_create(const char *ip, int port,
		  enum uring_sock_create_type type,
		  struct spdk_sock_opts *opts)
{
	struct spdk_uring_sock *sock;
	struct spdk_sock_impl_opts impl_opts;
	char buf[MAX_TMPBUF];
	char portnum[PORTNUMLEN];
	char *p;
	struct addrinfo hints, *res, *res0;
	int fd, flag;
	int val = 1;
	int rc;
	bool enable_zcopy_impl_opts = false;
	bool enable_zcopy_user_opts = true;

	assert(opts != NULL);
	uring_opts_get_impl_opts(opts, &impl_opts);

	if (ip == NULL) {
		return NULL;
	}
	if (ip[0] == '[') {
		snprintf(buf, sizeof(buf), "%s", ip + 1);
		p = strchr(buf, ']');
		if (p != NULL) {
			*p = '\0';
		}
		ip = (const char *) &buf[0];
	}

	snprintf(portnum, sizeof portnum, "%d", port);
	memset(&hints, 0, sizeof hints);
	hints.ai_family = PF_UNSPEC;
	hints.ai_socktype = SOCK_STREAM;
	hints.ai_flags = AI_NUMERICSERV;
	hints.ai_flags |= AI_PASSIVE;
	hints.ai_flags |= AI_NUMERICHOST;
	rc = getaddrinfo(ip, portnum, &hints, &res0);
	if (rc != 0) {
		SPDK_ERRLOG("getaddrinfo() failed (errno=%d)\n", errno);
		return NULL;
	}

	/* try listen */
	fd = -1;
	for (res = res0; res != NULL; res = res->ai_next) {
retry:
		fd = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
		if (fd < 0) {
			/* error */
			continue;
		}

		val = impl_opts.recv_buf_size;
		rc = setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &val, sizeof val);
		if (rc) {
			/* Not fatal */
		}

		val = impl_opts.send_buf_size;
		rc = setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &val, sizeof val);
		if (rc) {
			/* Not fatal */
		}

		rc = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof val);
		if (rc != 0) {
			close(fd);
			fd = -1;
			/* error */
			continue;
		}
		rc = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof val);
		if (rc != 0) {
			close(fd);
			fd = -1;
			/* error */
			continue;
		}

		if (opts->ack_timeout) {
#if defined(__linux__)
			val = opts->ack_timeout;
			rc = setsockopt(fd, IPPROTO_TCP, TCP_USER_TIMEOUT, &val, sizeof val);
			if (rc != 0) {
				close(fd);
				fd = -1;
				/* error */
				continue;
			}
#else
			SPDK_WARNLOG("TCP_USER_TIMEOUT is not supported.\n");
#endif
		}

#if defined(SO_PRIORITY)
		if (opts != NULL && opts->priority) {
			rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &opts->priority, sizeof val);
			if (rc != 0) {
				close(fd);
				fd = -1;
				/* error */
				continue;
			}
		}
#endif
		if (res->ai_family == AF_INET6) {
			rc = setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &val, sizeof val);
			if (rc != 0) {
				close(fd);
				fd = -1;
				/* error */
				continue;
			}
		}

		if (type == SPDK_SOCK_CREATE_LISTEN) {
			rc = bind(fd, res->ai_addr, res->ai_addrlen);
			if (rc != 0) {
				SPDK_ERRLOG("bind() failed at port %d, errno = %d\n", port, errno);
				switch (errno) {
				case EINTR:
					/* interrupted? */
					close(fd);
					goto retry;
				case EADDRNOTAVAIL:
					SPDK_ERRLOG("IP address %s not available. "
						    "Verify IP address in config file "
						    "and make sure setup script is "
						    "run before starting spdk app.\n", ip);
				/* FALLTHROUGH */
				default:
					/* try next family */
					close(fd);
					fd = -1;
					continue;
				}
			}
			/* bind OK */
			rc = listen(fd, 512);
			if (rc != 0) {
				SPDK_ERRLOG("listen() failed, errno = %d\n", errno);
				close(fd);
				fd = -1;
				break;
			}

			flag = fcntl(fd, F_GETFL);
			if (fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0) {
				SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno);
				close(fd);
				fd = -1;
				break;
			}

			enable_zcopy_impl_opts = impl_opts.enable_zerocopy_send_server;
		} else if (type == SPDK_SOCK_CREATE_CONNECT) {
			rc = connect(fd, res->ai_addr, res->ai_addrlen);
			if (rc != 0) {
				SPDK_ERRLOG("connect() failed, errno = %d\n", errno);
				/* try next family */
				close(fd);
				fd = -1;
				continue;
			}

			flag = fcntl(fd, F_GETFL);
			if (fcntl(fd, F_SETFL, flag & ~O_NONBLOCK) < 0) {
				SPDK_ERRLOG("fcntl can't set blocking mode for socket, fd: %d (%d)\n", fd, errno);
				close(fd);
				fd = -1;
				break;
			}

			enable_zcopy_impl_opts = impl_opts.enable_zerocopy_send_client;
		}
		break;
	}
	freeaddrinfo(res0);

	if (fd < 0) {
		return NULL;
	}

	enable_zcopy_user_opts = opts->zcopy && !sock_is_loopback(fd);
	sock = uring_sock_alloc(fd, &impl_opts, enable_zcopy_user_opts && enable_zcopy_impl_opts);
	if (sock == NULL) {
		SPDK_ERRLOG("sock allocation failed\n");
		close(fd);
		return NULL;
	}

	return &sock->base;
}

static struct spdk_sock *
uring_sock_listen(const char *ip, int port, struct spdk_sock_opts *opts)
{
	return uring_sock_create(ip, port, SPDK_SOCK_CREATE_LISTEN, opts);
}

static struct spdk_sock *
uring_sock_connect(const char *ip, int port, struct spdk_sock_opts *opts)
{
	return uring_sock_create(ip, port, SPDK_SOCK_CREATE_CONNECT, opts);
}

static struct spdk_sock *
uring_sock_accept(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct sockaddr_storage sa;
	socklen_t salen;
	int rc, fd;
	struct spdk_uring_sock *new_sock;
	int flag;

	memset(&sa, 0, sizeof(sa));
	salen = sizeof(sa);

	assert(sock != NULL);

	rc = accept(sock->fd, (struct sockaddr *)&sa, &salen);
	if (rc == -1) {
		return NULL;
	}

	fd = rc;

	flag = fcntl(fd, F_GETFL);
	if ((flag & O_NONBLOCK) && (fcntl(fd, F_SETFL, flag & ~O_NONBLOCK) < 0)) {
		SPDK_ERRLOG("fcntl can't set blocking mode for socket, fd: %d (%d)\n", fd, errno);
		close(fd);
		return NULL;
	}

#if defined(SO_PRIORITY)
	/* The priority is not inherited, so call this function again */
	if (sock->base.opts.priority) {
		rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &sock->base.opts.priority, sizeof(int));
		if (rc != 0) {
			close(fd);
			return NULL;
		}
	}
#endif

	new_sock = uring_sock_alloc(fd, &sock->base.impl_opts, sock->zcopy);
	if (new_sock == NULL) {
		close(fd);
		return NULL;
	}

	return &new_sock->base;
}

static int
uring_sock_close(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);

	assert(TAILQ_EMPTY(&_sock->pending_reqs));
	assert(sock->group == NULL);

	/* If the socket fails to close, the best choice is to
	 * leak the fd but continue to free the rest of the sock
	 * memory. */
	close(sock->fd);

	spdk_pipe_destroy(sock->recv_pipe);
	free(sock->recv_buf);
	free(sock);

	return 0;
}

static ssize_t
uring_sock_recv_from_pipe(struct spdk_uring_sock *sock, struct iovec *diov, int diovcnt)
{
	struct iovec siov[2];
	int sbytes;
	ssize_t bytes;
	struct spdk_uring_sock_group_impl *group;

	sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov);
	if (sbytes < 0) {
		errno = EINVAL;
		return -1;
	} else if (sbytes == 0) {
		errno = EAGAIN;
		return -1;
	}

	bytes = spdk_iovcpy(siov, 2, diov, diovcnt);

	if (bytes == 0) {
		/* The only way this happens is if diov is 0 length */
		errno = EINVAL;
		return -1;
	}

	spdk_pipe_reader_advance(sock->recv_pipe, bytes);

	/* If we drained the pipe, take it off the level-triggered list */
	if (sock->base.group_impl && spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) {
		group = __uring_group_impl(sock->base.group_impl);
		TAILQ_REMOVE(&group->pending_recv, sock, link);
		sock->pending_recv = false;
	}

	return bytes;
}

static inline ssize_t
sock_readv(int fd, struct iovec *iov, int iovcnt)
{
	struct msghdr msg = {
		.msg_iov = iov,
		.msg_iovlen = iovcnt,
	};

	return recvmsg(fd, &msg, MSG_DONTWAIT);
}

static inline ssize_t
uring_sock_read(struct spdk_uring_sock *sock)
{
	struct iovec iov[2];
	int bytes;
	struct spdk_uring_sock_group_impl *group;

	bytes = spdk_pipe_writer_get_buffer(sock->recv_pipe, sock->recv_buf_sz, iov);

	if (bytes > 0) {
		bytes = sock_readv(sock->fd, iov, 2);
		if (bytes > 0) {
			spdk_pipe_writer_advance(sock->recv_pipe, bytes);
			if (sock->base.group_impl) {
				group = __uring_group_impl(sock->base.group_impl);
				TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
				sock->pending_recv = true;
			}
		}
	}

	return bytes;
}

static ssize_t
uring_sock_readv(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	int rc, i;
	size_t len;

	if (sock->recv_pipe == NULL) {
		return sock_readv(sock->fd, iov, iovcnt);
	}

	len = 0;
	for (i = 0; i < iovcnt; i++) {
		len += iov[i].iov_len;
	}

	if (spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) {
		/* If the user is receiving a sufficiently large amount of data,
		 * receive directly to their buffers. */
		if (len >= MIN_SOCK_PIPE_SIZE) {
			return sock_readv(sock->fd, iov, iovcnt);
		}

		/* Otherwise, do a big read into our pipe */
		rc = uring_sock_read(sock);
		if (rc <= 0) {
			return rc;
		}
	}

	return uring_sock_recv_from_pipe(sock, iov, iovcnt);
}

static ssize_t
uring_sock_recv(struct spdk_sock *sock, void *buf, size_t len)
{
	struct iovec iov[1];

	iov[0].iov_base = buf;
	iov[0].iov_len = len;

	return uring_sock_readv(sock, iov, 1);
}

static ssize_t
uring_sock_writev(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct msghdr msg = {
		.msg_iov = iov,
		.msg_iovlen = iovcnt,
	};

	if (sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
		errno = EAGAIN;
		return -1;
	}

	return sendmsg(sock->fd, &msg, MSG_DONTWAIT);
}
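/* Advance the request's internal offset by rc bytes. Returns the number of
 * bytes left over once the request is fully consumed, or -1 if the request
 * was only partially sent and still has data outstanding. */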
static ssize_t
sock_request_advance_offset(struct spdk_sock_request *req, ssize_t rc)
{
	unsigned int offset;
	size_t len;
	int i;

	offset = req->internal.offset;
	for (i = 0; i < req->iovcnt; i++) {
		/* Advance by the offset first */
		if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) {
			offset -= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len;
			continue;
		}

		/* Calculate the remaining length of this element */
		len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset;

		if (len > (size_t)rc) {
			req->internal.offset += rc;
			return -1;
		}

		offset = 0;
		req->internal.offset += len;
		rc -= len;
	}

	return rc;
}

static int
sock_complete_write_reqs(struct spdk_sock *_sock, ssize_t rc, bool is_zcopy)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct spdk_sock_request *req;
	int retval;

	if (is_zcopy) {
		/* Handle the overflow case. Because we use sock->sendmsg_idx - 1 for the
		 * req->internal.offset, sendmsg_idx should not be zero. */
		if (spdk_unlikely(sock->sendmsg_idx == UINT32_MAX)) {
			sock->sendmsg_idx = 1;
		} else {
			sock->sendmsg_idx++;
		}
	}

	/* Consume the requests that were actually written */
	req = TAILQ_FIRST(&_sock->queued_reqs);
	while (req) {
		/* req->internal.is_zcopy is true when the whole req or part of it is sent with zerocopy */
		req->internal.is_zcopy = is_zcopy;

		rc = sock_request_advance_offset(req, rc);
		if (rc < 0) {
			/* This element was partially sent. */
			return 0;
		}

		/* Handled a full request. */
		spdk_sock_request_pend(_sock, req);

		if (!req->internal.is_zcopy && req == TAILQ_FIRST(&_sock->pending_reqs)) {
			retval = spdk_sock_request_put(_sock, req, 0);
			if (retval) {
				return retval;
			}
		} else {
			/* Re-use the offset field to hold the sendmsg call index. The
			 * index is 0 based, so subtract one here because we've already
			 * incremented above. */
			req->internal.offset = sock->sendmsg_idx - 1;
		}

		if (rc == 0) {
			break;
		}

		req = TAILQ_FIRST(&_sock->queued_reqs);
	}

	return 0;
}
#ifdef SPDK_ZEROCOPY
static int
_sock_check_zcopy(struct spdk_sock *_sock, int status)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	ssize_t rc;
	struct sock_extended_err *serr;
	struct cmsghdr *cm;
	uint32_t idx;
	struct spdk_sock_request *req, *treq;
	bool found;

	assert(sock->zcopy == true);
	if (spdk_unlikely(status < 0)) {
		if (!TAILQ_EMPTY(&_sock->pending_reqs)) {
			SPDK_ERRLOG("Attempting to receive from ERRQUEUE yielded error, but pending list still has orphaned entries, status =%d\n",
				    status);
		} else {
			SPDK_WARNLOG("Recvmsg yielded an error!\n");
		}
		return 0;
	}

	cm = CMSG_FIRSTHDR(&sock->errqueue_task.msg);
	if (!((cm->cmsg_level == SOL_IP && cm->cmsg_type == IP_RECVERR) ||
	      (cm->cmsg_level == SOL_IPV6 && cm->cmsg_type == IPV6_RECVERR))) {
		SPDK_WARNLOG("Unexpected cmsg level or type!\n");
		return 0;
	}

	serr = (struct sock_extended_err *)CMSG_DATA(cm);
	if (serr->ee_errno != 0 || serr->ee_origin != SO_EE_ORIGIN_ZEROCOPY) {
		SPDK_WARNLOG("Unexpected extended error origin\n");
		return 0;
	}

	/* Most of the time, the pending_reqs array is in the exact
	 * order we need such that all of the requests to complete are
	 * in order, in the front. It is guaranteed that all requests
	 * belonging to the same sendmsg call are sequential, so once
	 * we encounter one match we can stop looping as soon as a
	 * non-match is found.
	 */
	for (idx = serr->ee_info; idx <= serr->ee_data; idx++) {
		found = false;
		TAILQ_FOREACH_SAFE(req, &_sock->pending_reqs, internal.link, treq) {
			if (!req->internal.is_zcopy) {
				/* This wasn't a zcopy request. It was just waiting in line to complete */
				rc = spdk_sock_request_put(_sock, req, 0);
				if (rc < 0) {
					return rc;
				}
			} else if (req->internal.offset == idx) {
				found = true;
				rc = spdk_sock_request_put(_sock, req, 0);
				if (rc < 0) {
					return rc;
				}
			} else if (found) {
				break;
			}
		}
	}

	return 0;
}

static void
_sock_prep_errqueue(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct spdk_uring_task *task = &sock->errqueue_task;
	struct io_uring_sqe *sqe;

	if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS) {
		return;
	}

	assert(sock->group != NULL);
	sock->group->io_queued++;

	sqe = io_uring_get_sqe(&sock->group->uring);
	io_uring_prep_recvmsg(sqe, sock->fd, &task->msg, MSG_ERRQUEUE);
	io_uring_sqe_set_data(sqe, task);
	task->status = SPDK_URING_SOCK_TASK_IN_PROCESS;
}

#endif
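/* Prepare a single sendmsg SQE covering as many queued requests as fit into the
 * write task's iov array and queue it on the group's ring. The SQE is submitted
 * later in uring_sock_group_impl_poll(). No-op if a write is already in flight
 * for this socket or nothing is queued. */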
static void
_sock_flush(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct spdk_uring_task *task = &sock->write_task;
	uint32_t iovcnt;
	struct io_uring_sqe *sqe;
	int flags;

	if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS) {
		return;
	}

#ifdef SPDK_ZEROCOPY
	if (sock->zcopy) {
		flags = MSG_DONTWAIT | sock->zcopy_send_flags;
	} else
#endif
	{
		flags = MSG_DONTWAIT;
	}

	iovcnt = spdk_sock_prep_reqs(&sock->base, task->iovs, task->iov_cnt, &task->last_req, &flags);
	if (!iovcnt) {
		return;
	}

	task->iov_cnt = iovcnt;
	assert(sock->group != NULL);
	task->msg.msg_iov = task->iovs;
	task->msg.msg_iovlen = task->iov_cnt;
#ifdef SPDK_ZEROCOPY
	task->is_zcopy = (flags & MSG_ZEROCOPY) ? true : false;
#endif
	sock->group->io_queued++;

	sqe = io_uring_get_sqe(&sock->group->uring);
	io_uring_prep_sendmsg(sqe, sock->fd, &sock->write_task.msg, flags);
	io_uring_sqe_set_data(sqe, task);
	task->status = SPDK_URING_SOCK_TASK_IN_PROCESS;
}

static void
_sock_prep_pollin(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct spdk_uring_task *task = &sock->pollin_task;
	struct io_uring_sqe *sqe;

	/* Do not prepare pollin event */
	if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS || (sock->pending_recv && !sock->zcopy)) {
		return;
	}

	assert(sock->group != NULL);
	sock->group->io_queued++;

	sqe = io_uring_get_sqe(&sock->group->uring);
	io_uring_prep_poll_add(sqe, sock->fd, POLLIN | POLLERR);
	io_uring_sqe_set_data(sqe, task);
	task->status = SPDK_URING_SOCK_TASK_IN_PROCESS;
}

static void
_sock_prep_cancel_task(struct spdk_sock *_sock, void *user_data)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct spdk_uring_task *task = &sock->cancel_task;
	struct io_uring_sqe *sqe;

	if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS) {
		return;
	}

	assert(sock->group != NULL);
	sock->group->io_queued++;

	sqe = io_uring_get_sqe(&sock->group->uring);
	io_uring_prep_cancel(sqe, user_data, 0);
	io_uring_sqe_set_data(sqe, task);
	task->status = SPDK_URING_SOCK_TASK_IN_PROCESS;
}
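/* Drain up to max completions from the group's ring, dispatch each by task
 * type, and then (when socks is non-NULL) harvest at most max_read_events
 * readable sockets from the pending_recv list into the socks array. */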
static int
sock_uring_group_reap(struct spdk_uring_sock_group_impl *group, int max, int max_read_events,
		      struct spdk_sock **socks)
{
	int i, count, ret;
	struct io_uring_cqe *cqe;
	struct spdk_uring_sock *sock, *tmp;
	struct spdk_uring_task *task;
	int status;
	bool is_zcopy;

	for (i = 0; i < max; i++) {
		ret = io_uring_peek_cqe(&group->uring, &cqe);
		if (ret != 0) {
			break;
		}

		if (cqe == NULL) {
			break;
		}

		task = (struct spdk_uring_task *)cqe->user_data;
		assert(task != NULL);
		sock = task->sock;
		assert(sock != NULL);
		assert(sock->group != NULL);
		assert(sock->group == group);
		sock->group->io_inflight--;
		sock->group->io_avail++;
		status = cqe->res;
		io_uring_cqe_seen(&group->uring, cqe);

		task->status = SPDK_URING_SOCK_TASK_NOT_IN_USE;

		if (spdk_unlikely(status <= 0)) {
			if (status == -EAGAIN || status == -EWOULDBLOCK || (status == -ENOBUFS && sock->zcopy)) {
				continue;
			}
		}

		switch (task->type) {
		case SPDK_SOCK_TASK_POLLIN:
#ifdef SPDK_ZEROCOPY
			if ((status & POLLERR) == POLLERR) {
				_sock_prep_errqueue(&sock->base);
			}
#endif
			if ((status & POLLIN) == POLLIN) {
				if (sock->base.cb_fn != NULL &&
				    sock->pending_recv == false) {
					sock->pending_recv = true;
					TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
				}
			}
			break;
		case SPDK_SOCK_TASK_WRITE:
			task->last_req = NULL;
			task->iov_cnt = 0;
			is_zcopy = task->is_zcopy;
			task->is_zcopy = false;
			if (spdk_unlikely(status < 0)) {
				sock->connection_status = status;
				spdk_sock_abort_requests(&sock->base);
			} else {
				sock_complete_write_reqs(&sock->base, status, is_zcopy);
			}

			break;
#ifdef SPDK_ZEROCOPY
		case SPDK_SOCK_TASK_ERRQUEUE:
			if (spdk_unlikely(status == -ECANCELED)) {
				sock->connection_status = status;
				break;
			}
			_sock_check_zcopy(&sock->base, status);
			break;
#endif
		case SPDK_SOCK_TASK_CANCEL:
			/* Do nothing */
			break;
		default:
			SPDK_UNREACHABLE();
		}
	}

	if (!socks) {
		return 0;
	}
	count = 0;
	TAILQ_FOREACH_SAFE(sock, &group->pending_recv, link, tmp) {
		if (count == max_read_events) {
			break;
		}

		if (spdk_unlikely(sock->base.cb_fn == NULL) ||
		    (sock->recv_pipe == NULL || spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0)) {
			sock->pending_recv = false;
			TAILQ_REMOVE(&group->pending_recv, sock, link);
			if (spdk_unlikely(sock->base.cb_fn == NULL)) {
				/* If the socket's cb_fn is NULL, do not add it to socks array */
				continue;
			}
		}

		socks[count++] = &sock->base;
	}
	/* Cycle the pending_recv list so that each time we poll things aren't
	 * in the same order. Say we have 6 sockets in the list, named as follows:
	 * A B C D E F
	 * And all 6 sockets had the poll events, but max_read_events is only 3. That means
	 * sock currently points at D. We want to rearrange the list to the following:
	 * D E F A B C
	 *
	 * The variables below are named according to this example to make it easier to
	 * follow the swaps.
	 */
	if (sock != NULL) {
		struct spdk_uring_sock *ua, *uc, *ud, *uf;

		/* Capture pointers to the elements we need */
		ud = sock;

		ua = TAILQ_FIRST(&group->pending_recv);
		if (ua == ud) {
			goto end;
		}

		uf = TAILQ_LAST(&group->pending_recv, pending_recv_list);
		if (uf == ud) {
			TAILQ_REMOVE(&group->pending_recv, ud, link);
			TAILQ_INSERT_HEAD(&group->pending_recv, ud, link);
			goto end;
		}

		uc = TAILQ_PREV(ud, pending_recv_list, link);
		assert(uc != NULL);

		/* Break the link between C and D */
		uc->link.tqe_next = NULL;

		/* Connect F to A */
		uf->link.tqe_next = ua;
		ua->link.tqe_prev = &uf->link.tqe_next;

		/* Fix up the list first/last pointers */
		group->pending_recv.tqh_first = ud;
		group->pending_recv.tqh_last = &uc->link.tqe_next;

		/* D is in front of the list, make tqe prev pointer point to the head of list */
		ud->link.tqe_prev = &group->pending_recv.tqh_first;
	}

end:
	return count;
}
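/* Flush queued requests for a socket that is not attached to a polling group
 * by calling sendmsg() directly (with MSG_DONTWAIT) instead of going through
 * the group's io_uring. */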
static int
_sock_flush_client(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct msghdr msg = {};
	struct iovec iovs[IOV_BATCH_SIZE];
	int iovcnt;
	ssize_t rc;
	int flags = sock->zcopy_send_flags;
	int retval;
	bool is_zcopy = false;

	/* Can't flush from within a callback or we end up with recursive calls */
	if (_sock->cb_cnt > 0) {
		return 0;
	}

	/* Gather an iov */
	iovcnt = spdk_sock_prep_reqs(_sock, iovs, 0, NULL, &flags);
	if (iovcnt == 0) {
		return 0;
	}

	/* Perform the vectored write */
	msg.msg_iov = iovs;
	msg.msg_iovlen = iovcnt;
	rc = sendmsg(sock->fd, &msg, flags | MSG_DONTWAIT);
	if (rc <= 0) {
		if (errno == EAGAIN || errno == EWOULDBLOCK) {
			return 0;
		}
		return rc;
	}

#ifdef SPDK_ZEROCOPY
	is_zcopy = flags & MSG_ZEROCOPY;
#endif
	retval = sock_complete_write_reqs(_sock, rc, is_zcopy);
	if (retval < 0) {
		/* if the socket is closed, return to avoid heap-use-after-free error */
		return retval;
	}

#ifdef SPDK_ZEROCOPY
	if (sock->zcopy && !TAILQ_EMPTY(&_sock->pending_reqs)) {
		_sock_check_zcopy(_sock, 0);
	}
#endif

	return 0;
}

static void
uring_sock_writev_async(struct spdk_sock *_sock, struct spdk_sock_request *req)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	int rc;

	if (spdk_unlikely(sock->connection_status)) {
		req->cb_fn(req->cb_arg, sock->connection_status);
		return;
	}

	spdk_sock_request_queue(_sock, req);

	if (!sock->group) {
		if (_sock->queued_iovcnt >= IOV_BATCH_SIZE) {
			rc = _sock_flush_client(_sock);
			if (rc) {
				spdk_sock_abort_requests(_sock);
			}
		}
	}
}

static void
uring_sock_readv_async(struct spdk_sock *sock, struct spdk_sock_request *req)
{
	req->cb_fn(req->cb_arg, -ENOTSUP);
}

static int
uring_sock_set_recvlowat(struct spdk_sock *_sock, int nbytes)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	int val;
	int rc;

	assert(sock != NULL);

	val = nbytes;
	rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVLOWAT, &val, sizeof val);
	if (rc != 0) {
		return -1;
	}
	return 0;
}

static bool
uring_sock_is_ipv6(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct sockaddr_storage sa;
	socklen_t salen;
	int rc;

	assert(sock != NULL);

	memset(&sa, 0, sizeof sa);
	salen = sizeof sa;
	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
	if (rc != 0) {
		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
		return false;
	}

	return (sa.ss_family == AF_INET6);
}

static bool
uring_sock_is_ipv4(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct sockaddr_storage sa;
	socklen_t salen;
	int rc;

	assert(sock != NULL);

	memset(&sa, 0, sizeof sa);
	salen = sizeof sa;
	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
	if (rc != 0) {
		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
		return false;
	}

	return (sa.ss_family == AF_INET);
}

static bool
uring_sock_is_connected(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	uint8_t byte;
	int rc;

	rc = recv(sock->fd, &byte, 1, MSG_PEEK | MSG_DONTWAIT);
	if (rc == 0) {
		return false;
	}

	if (rc < 0) {
		if (errno == EAGAIN || errno == EWOULDBLOCK) {
			return true;
		}

		return false;
	}

	return true;
}

static struct spdk_sock_group_impl *
uring_sock_group_impl_get_optimal(struct spdk_sock *_sock, struct spdk_sock_group_impl *hint)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct spdk_sock_group_impl *group;

	if (sock->placement_id != -1) {
		spdk_sock_map_lookup(&g_map, sock->placement_id, &group, hint);
		return group;
	}

	return NULL;
}

static struct spdk_sock_group_impl *
uring_sock_group_impl_create(void)
{
	struct spdk_uring_sock_group_impl *group_impl;

	group_impl = calloc(1, sizeof(*group_impl));
	if (group_impl == NULL) {
		SPDK_ERRLOG("group_impl allocation failed\n");
		return NULL;
	}

	group_impl->io_avail = SPDK_SOCK_GROUP_QUEUE_DEPTH;

	if (io_uring_queue_init(SPDK_SOCK_GROUP_QUEUE_DEPTH, &group_impl->uring, 0) < 0) {
		SPDK_ERRLOG("uring I/O context setup failure\n");
		free(group_impl);
		return NULL;
	}

	TAILQ_INIT(&group_impl->pending_recv);

	if (g_spdk_uring_sock_impl_opts.enable_placement_id == PLACEMENT_CPU) {
		spdk_sock_map_insert(&g_map, spdk_env_get_current_core(), &group_impl->base);
	}

	return &group_impl->base;
}
static int
uring_sock_group_impl_add_sock(struct spdk_sock_group_impl *_group,
			       struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group);
	int rc;

	sock->group = group;
	sock->write_task.sock = sock;
	sock->write_task.type = SPDK_SOCK_TASK_WRITE;

	sock->pollin_task.sock = sock;
	sock->pollin_task.type = SPDK_SOCK_TASK_POLLIN;

	sock->errqueue_task.sock = sock;
	sock->errqueue_task.type = SPDK_SOCK_TASK_ERRQUEUE;
	sock->errqueue_task.msg.msg_control = sock->buf;
	sock->errqueue_task.msg.msg_controllen = sizeof(sock->buf);

	sock->cancel_task.sock = sock;
	sock->cancel_task.type = SPDK_SOCK_TASK_CANCEL;

	/* switched from another polling group due to scheduling */
	if (spdk_unlikely(sock->recv_pipe != NULL &&
			  (spdk_pipe_reader_bytes_available(sock->recv_pipe) > 0))) {
		assert(sock->pending_recv == false);
		sock->pending_recv = true;
		TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
	}

	if (sock->placement_id != -1) {
		rc = spdk_sock_map_insert(&g_map, sock->placement_id, &group->base);
		if (rc != 0) {
			SPDK_ERRLOG("Failed to insert sock group into map: %d", rc);
			/* Do not treat this as an error. The system will continue running. */
		}
	}

	return 0;
}

static int
uring_sock_group_impl_poll(struct spdk_sock_group_impl *_group, int max_events,
			   struct spdk_sock **socks)
{
	struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group);
	int count, ret;
	int to_complete, to_submit;
	struct spdk_sock *_sock, *tmp;
	struct spdk_uring_sock *sock;

	if (spdk_likely(socks)) {
		TAILQ_FOREACH_SAFE(_sock, &group->base.socks, link, tmp) {
			sock = __uring_sock(_sock);
			if (spdk_unlikely(sock->connection_status)) {
				continue;
			}
			_sock_flush(_sock);
			_sock_prep_pollin(_sock);
		}
	}

	to_submit = group->io_queued;

	/* Network I/O cannot be set up with O_DIRECT, so we do not need to call spdk_io_uring_enter */
	if (to_submit > 0) {
		/* If there is I/O to submit, use io_uring_submit here.
		 * It will automatically call io_uring_enter appropriately. */
		ret = io_uring_submit(&group->uring);
		if (ret < 0) {
			return 1;
		}
		group->io_queued = 0;
		group->io_inflight += to_submit;
		group->io_avail -= to_submit;
	}

	count = 0;
	to_complete = group->io_inflight;
	if (to_complete > 0 || !TAILQ_EMPTY(&group->pending_recv)) {
		count = sock_uring_group_reap(group, to_complete, max_events, socks);
	}

	return count;
}
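/* Removing a socket from a group must wait for any outstanding io_uring
 * operations on it to finish. Each in-flight task gets a cancel SQE queued,
 * and the group is polled until both the task and the cancel complete. */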
static int
uring_sock_group_impl_remove_sock(struct spdk_sock_group_impl *_group,
				  struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group);

	if (sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
		_sock_prep_cancel_task(_sock, &sock->write_task);
		/* Since spdk_sock_group_remove_sock is not an asynchronous interface,
		 * we can simply poll in a loop here until the task completes. */
		while ((sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) ||
		       (sock->cancel_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE)) {
			uring_sock_group_impl_poll(_group, 32, NULL);
		}
	}

	if (sock->pollin_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
		_sock_prep_cancel_task(_sock, &sock->pollin_task);
		/* Since spdk_sock_group_remove_sock is not an asynchronous interface,
		 * we can simply poll in a loop here until the task completes. */
		while ((sock->pollin_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) ||
		       (sock->cancel_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE)) {
			uring_sock_group_impl_poll(_group, 32, NULL);
		}
	}

	if (sock->errqueue_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
		_sock_prep_cancel_task(_sock, &sock->errqueue_task);
		/* Since spdk_sock_group_remove_sock is not an asynchronous interface,
		 * we can simply poll in a loop here until the task completes. */
		while ((sock->errqueue_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) ||
		       (sock->cancel_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE)) {
			uring_sock_group_impl_poll(_group, 32, NULL);
		}
	}

	/* Make sure that cancelling the tasks above did not queue any new requests */
	assert(sock->write_task.status == SPDK_URING_SOCK_TASK_NOT_IN_USE);
	assert(sock->pollin_task.status == SPDK_URING_SOCK_TASK_NOT_IN_USE);
	assert(sock->errqueue_task.status == SPDK_URING_SOCK_TASK_NOT_IN_USE);

	if (sock->pending_recv) {
		TAILQ_REMOVE(&group->pending_recv, sock, link);
		sock->pending_recv = false;
	}
	assert(sock->pending_recv == false);

	if (sock->placement_id != -1) {
		spdk_sock_map_release(&g_map, sock->placement_id);
	}

	sock->group = NULL;
	return 0;
}

static int
uring_sock_group_impl_close(struct spdk_sock_group_impl *_group)
{
	struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group);

	/* try to reap all the active I/O */
	while (group->io_inflight) {
		uring_sock_group_impl_poll(_group, 32, NULL);
	}
	assert(group->io_inflight == 0);
	assert(group->io_avail == SPDK_SOCK_GROUP_QUEUE_DEPTH);

	io_uring_queue_exit(&group->uring);

	if (g_spdk_uring_sock_impl_opts.enable_placement_id == PLACEMENT_CPU) {
		spdk_sock_map_release(&g_map, spdk_env_get_current_core());
	}

	free(group);
	return 0;
}
static int
uring_sock_flush(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);

	if (!sock->group) {
		return _sock_flush_client(_sock);
	}

	return 0;
}

static struct spdk_net_impl g_uring_net_impl = {
	.name = "uring",
	.getaddr = uring_sock_getaddr,
	.connect = uring_sock_connect,
	.listen = uring_sock_listen,
	.accept = uring_sock_accept,
	.close = uring_sock_close,
	.recv = uring_sock_recv,
	.readv = uring_sock_readv,
	.readv_async = uring_sock_readv_async,
	.writev = uring_sock_writev,
	.writev_async = uring_sock_writev_async,
	.flush = uring_sock_flush,
	.set_recvlowat = uring_sock_set_recvlowat,
	.set_recvbuf = uring_sock_set_recvbuf,
	.set_sendbuf = uring_sock_set_sendbuf,
	.is_ipv6 = uring_sock_is_ipv6,
	.is_ipv4 = uring_sock_is_ipv4,
	.is_connected = uring_sock_is_connected,
	.group_impl_get_optimal = uring_sock_group_impl_get_optimal,
	.group_impl_create = uring_sock_group_impl_create,
	.group_impl_add_sock = uring_sock_group_impl_add_sock,
	.group_impl_remove_sock = uring_sock_group_impl_remove_sock,
	.group_impl_poll = uring_sock_group_impl_poll,
	.group_impl_close = uring_sock_group_impl_close,
	.get_opts = uring_sock_impl_get_opts,
	.set_opts = uring_sock_impl_set_opts,
};

SPDK_NET_IMPL_REGISTER(uring, &g_uring_net_impl, DEFAULT_SOCK_PRIORITY + 2);