1 /* SPDX-License-Identifier: BSD-3-Clause 2 * Copyright (C) 2019 Intel Corporation. 3 * All rights reserved. 4 */ 5 6 #include "spdk/stdinc.h" 7 #include "spdk/config.h" 8 9 #include <linux/errqueue.h> 10 #include <sys/epoll.h> 11 #include <liburing.h> 12 13 #include "spdk/barrier.h" 14 #include "spdk/env.h" 15 #include "spdk/log.h" 16 #include "spdk/pipe.h" 17 #include "spdk/sock.h" 18 #include "spdk/string.h" 19 #include "spdk/util.h" 20 21 #include "spdk_internal/sock.h" 22 #include "spdk_internal/assert.h" 23 #include "../sock_kernel.h" 24 25 #define MAX_TMPBUF 1024 26 #define PORTNUMLEN 32 27 #define SPDK_SOCK_GROUP_QUEUE_DEPTH 4096 28 #define SPDK_SOCK_CMG_INFO_SIZE (sizeof(struct cmsghdr) + sizeof(struct sock_extended_err)) 29 30 enum uring_task_type { 31 URING_TASK_READ = 0, 32 URING_TASK_ERRQUEUE, 33 URING_TASK_WRITE, 34 URING_TASK_CANCEL, 35 }; 36 37 #if defined(SO_ZEROCOPY) && defined(MSG_ZEROCOPY) 38 #define SPDK_ZEROCOPY 39 #endif 40 41 /* We don't know how big the buffers that the user posts will be, but this 42 * is the maximum we'll ever allow it to receive in a single command. 43 * If the user buffers are smaller, it will just receive less. */ 44 #define URING_MAX_RECV_SIZE (128 * 1024) 45 46 /* We don't know how many buffers the user will post, but this is the 47 * maximum number we'll take from the pool to post per group. */ 48 #define URING_BUF_POOL_SIZE 128 49 50 /* We use 1 just so it's not zero and we can validate it's right. */ 51 #define URING_BUF_GROUP_ID 1 52 53 enum spdk_uring_sock_task_status { 54 SPDK_URING_SOCK_TASK_NOT_IN_USE = 0, 55 SPDK_URING_SOCK_TASK_IN_PROCESS, 56 }; 57 58 struct spdk_uring_task { 59 enum spdk_uring_sock_task_status status; 60 enum uring_task_type type; 61 struct spdk_uring_sock *sock; 62 struct msghdr msg; 63 struct iovec iovs[IOV_BATCH_SIZE]; 64 int iov_cnt; 65 struct spdk_sock_request *last_req; 66 bool is_zcopy; 67 STAILQ_ENTRY(spdk_uring_task) link; 68 }; 69 70 struct spdk_uring_sock { 71 struct spdk_sock base; 72 int fd; 73 uint32_t sendmsg_idx; 74 struct spdk_uring_sock_group_impl *group; 75 STAILQ_HEAD(, spdk_uring_buf_tracker) recv_stream; 76 size_t recv_offset; 77 struct spdk_uring_task write_task; 78 struct spdk_uring_task errqueue_task; 79 struct spdk_uring_task read_task; 80 struct spdk_uring_task cancel_task; 81 struct spdk_pipe *recv_pipe; 82 void *recv_buf; 83 int recv_buf_sz; 84 bool zcopy; 85 bool pending_recv; 86 bool pending_group_remove; 87 int zcopy_send_flags; 88 int connection_status; 89 int placement_id; 90 uint8_t reserved[4]; 91 uint8_t buf[SPDK_SOCK_CMG_INFO_SIZE]; 92 TAILQ_ENTRY(spdk_uring_sock) link; 93 }; 94 /* 'struct cmsghdr' is mapped to the buffer 'buf', and while first element 95 * of this control message header has a size of 8 bytes, 'buf' 96 * must be 8-byte aligned. 
97 */ 98 SPDK_STATIC_ASSERT(offsetof(struct spdk_uring_sock, buf) % 8 == 0, 99 "Incorrect alignment: `buf` must be aligned to 8 bytes"); 100 101 TAILQ_HEAD(pending_recv_list, spdk_uring_sock); 102 103 struct spdk_uring_buf_tracker { 104 void *buf; 105 size_t buflen; 106 size_t len; 107 void *ctx; 108 int id; 109 STAILQ_ENTRY(spdk_uring_buf_tracker) link; 110 }; 111 112 struct spdk_uring_sock_group_impl { 113 struct spdk_sock_group_impl base; 114 struct io_uring uring; 115 uint32_t io_inflight; 116 uint32_t io_queued; 117 uint32_t io_avail; 118 struct pending_recv_list pending_recv; 119 120 struct io_uring_buf_ring *buf_ring; 121 uint32_t buf_ring_count; 122 struct spdk_uring_buf_tracker *trackers; 123 STAILQ_HEAD(, spdk_uring_buf_tracker) free_trackers; 124 }; 125 126 static struct spdk_sock_impl_opts g_spdk_uring_sock_impl_opts = { 127 .recv_buf_size = DEFAULT_SO_RCVBUF_SIZE, 128 .send_buf_size = DEFAULT_SO_SNDBUF_SIZE, 129 .enable_recv_pipe = true, 130 .enable_quickack = false, 131 .enable_placement_id = PLACEMENT_NONE, 132 .enable_zerocopy_send_server = false, 133 .enable_zerocopy_send_client = false, 134 .zerocopy_threshold = 0, 135 .tls_version = 0, 136 .enable_ktls = false, 137 .psk_key = NULL, 138 .psk_identity = NULL 139 }; 140 141 static struct spdk_sock_map g_map = { 142 .entries = STAILQ_HEAD_INITIALIZER(g_map.entries), 143 .mtx = PTHREAD_MUTEX_INITIALIZER 144 }; 145 146 __attribute((destructor)) static void 147 uring_sock_map_cleanup(void) 148 { 149 spdk_sock_map_cleanup(&g_map); 150 } 151 152 #define SPDK_URING_SOCK_REQUEST_IOV(req) ((struct iovec *)((uint8_t *)req + sizeof(struct spdk_sock_request))) 153 154 #define __uring_sock(sock) (struct spdk_uring_sock *)sock 155 #define __uring_group_impl(group) (struct spdk_uring_sock_group_impl *)group 156 157 static void 158 uring_sock_copy_impl_opts(struct spdk_sock_impl_opts *dest, const struct spdk_sock_impl_opts *src, 159 size_t len) 160 { 161 #define FIELD_OK(field) \ 162 offsetof(struct spdk_sock_impl_opts, field) + sizeof(src->field) <= len 163 164 #define SET_FIELD(field) \ 165 if (FIELD_OK(field)) { \ 166 dest->field = src->field; \ 167 } 168 169 SET_FIELD(recv_buf_size); 170 SET_FIELD(send_buf_size); 171 SET_FIELD(enable_recv_pipe); 172 SET_FIELD(enable_quickack); 173 SET_FIELD(enable_placement_id); 174 SET_FIELD(enable_zerocopy_send_server); 175 SET_FIELD(enable_zerocopy_send_client); 176 SET_FIELD(zerocopy_threshold); 177 SET_FIELD(tls_version); 178 SET_FIELD(enable_ktls); 179 SET_FIELD(psk_key); 180 SET_FIELD(psk_identity); 181 182 #undef SET_FIELD 183 #undef FIELD_OK 184 } 185 186 static int 187 uring_sock_impl_get_opts(struct spdk_sock_impl_opts *opts, size_t *len) 188 { 189 if (!opts || !len) { 190 errno = EINVAL; 191 return -1; 192 } 193 194 assert(sizeof(*opts) >= *len); 195 memset(opts, 0, *len); 196 197 uring_sock_copy_impl_opts(opts, &g_spdk_uring_sock_impl_opts, *len); 198 *len = spdk_min(*len, sizeof(g_spdk_uring_sock_impl_opts)); 199 200 return 0; 201 } 202 203 static int 204 uring_sock_impl_set_opts(const struct spdk_sock_impl_opts *opts, size_t len) 205 { 206 if (!opts) { 207 errno = EINVAL; 208 return -1; 209 } 210 211 assert(sizeof(*opts) >= len); 212 uring_sock_copy_impl_opts(&g_spdk_uring_sock_impl_opts, opts, len); 213 214 return 0; 215 } 216 217 static void 218 uring_opts_get_impl_opts(const struct spdk_sock_opts *opts, struct spdk_sock_impl_opts *dest) 219 { 220 /* Copy the default impl_opts first to cover cases when user's impl_opts is smaller */ 221 memcpy(dest, &g_spdk_uring_sock_impl_opts, 
sizeof(*dest)); 222 223 if (opts->impl_opts != NULL) { 224 assert(sizeof(*dest) >= opts->impl_opts_size); 225 uring_sock_copy_impl_opts(dest, opts->impl_opts, opts->impl_opts_size); 226 } 227 } 228 229 static int 230 uring_sock_getaddr(struct spdk_sock *_sock, char *saddr, int slen, uint16_t *sport, 231 char *caddr, int clen, uint16_t *cport) 232 { 233 struct spdk_uring_sock *sock = __uring_sock(_sock); 234 struct sockaddr_storage sa; 235 socklen_t salen; 236 int rc; 237 238 assert(sock != NULL); 239 240 memset(&sa, 0, sizeof sa); 241 salen = sizeof sa; 242 rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen); 243 if (rc != 0) { 244 SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno); 245 return -1; 246 } 247 248 switch (sa.ss_family) { 249 case AF_UNIX: 250 /* Acceptable connection types that don't have IPs */ 251 return 0; 252 case AF_INET: 253 case AF_INET6: 254 /* Code below will get IP addresses */ 255 break; 256 default: 257 /* Unsupported socket family */ 258 return -1; 259 } 260 261 rc = get_addr_str((struct sockaddr *)&sa, saddr, slen); 262 if (rc != 0) { 263 SPDK_ERRLOG("getnameinfo() failed (errno=%d)\n", errno); 264 return -1; 265 } 266 267 if (sport) { 268 if (sa.ss_family == AF_INET) { 269 *sport = ntohs(((struct sockaddr_in *) &sa)->sin_port); 270 } else if (sa.ss_family == AF_INET6) { 271 *sport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port); 272 } 273 } 274 275 memset(&sa, 0, sizeof sa); 276 salen = sizeof sa; 277 rc = getpeername(sock->fd, (struct sockaddr *) &sa, &salen); 278 if (rc != 0) { 279 SPDK_ERRLOG("getpeername() failed (errno=%d)\n", errno); 280 return -1; 281 } 282 283 rc = get_addr_str((struct sockaddr *)&sa, caddr, clen); 284 if (rc != 0) { 285 SPDK_ERRLOG("getnameinfo() failed (errno=%d)\n", errno); 286 return -1; 287 } 288 289 if (cport) { 290 if (sa.ss_family == AF_INET) { 291 *cport = ntohs(((struct sockaddr_in *) &sa)->sin_port); 292 } else if (sa.ss_family == AF_INET6) { 293 *cport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port); 294 } 295 } 296 297 return 0; 298 } 299 300 enum uring_sock_create_type { 301 SPDK_SOCK_CREATE_LISTEN, 302 SPDK_SOCK_CREATE_CONNECT, 303 }; 304 305 static int 306 uring_sock_alloc_pipe(struct spdk_uring_sock *sock, int sz) 307 { 308 uint8_t *new_buf; 309 struct spdk_pipe *new_pipe; 310 struct iovec siov[2]; 311 struct iovec diov[2]; 312 int sbytes; 313 ssize_t bytes; 314 int rc; 315 316 if (sock->recv_buf_sz == sz) { 317 return 0; 318 } 319 320 /* If the new size is 0, just free the pipe */ 321 if (sz == 0) { 322 spdk_pipe_destroy(sock->recv_pipe); 323 free(sock->recv_buf); 324 sock->recv_pipe = NULL; 325 sock->recv_buf = NULL; 326 return 0; 327 } else if (sz < MIN_SOCK_PIPE_SIZE) { 328 SPDK_ERRLOG("The size of the pipe must be larger than %d\n", MIN_SOCK_PIPE_SIZE); 329 return -1; 330 } 331 332 /* Round up to next 64 byte multiple */ 333 rc = posix_memalign((void **)&new_buf, 64, sz); 334 if (rc != 0) { 335 SPDK_ERRLOG("socket recv buf allocation failed\n"); 336 return -ENOMEM; 337 } 338 memset(new_buf, 0, sz); 339 340 new_pipe = spdk_pipe_create(new_buf, sz); 341 if (new_pipe == NULL) { 342 SPDK_ERRLOG("socket pipe allocation failed\n"); 343 free(new_buf); 344 return -ENOMEM; 345 } 346 347 if (sock->recv_pipe != NULL) { 348 /* Pull all of the data out of the old pipe */ 349 sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov); 350 if (sbytes > sz) { 351 /* Too much data to fit into the new pipe size */ 352 spdk_pipe_destroy(new_pipe); 353 free(new_buf); 354 return -EINVAL; 355 } 356 
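		/* Migrate whatever is still sitting in the old pipe into the new one so
		 * that no received data is lost across the resize. */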
		sbytes = spdk_pipe_writer_get_buffer(new_pipe, sz, diov);
		assert(sbytes == sz);

		bytes = spdk_iovcpy(siov, 2, diov, 2);
		spdk_pipe_writer_advance(new_pipe, bytes);

		spdk_pipe_destroy(sock->recv_pipe);
		free(sock->recv_buf);
	}

	sock->recv_buf_sz = sz;
	sock->recv_buf = new_buf;
	sock->recv_pipe = new_pipe;

	return 0;
}

static int
uring_sock_set_recvbuf(struct spdk_sock *_sock, int sz)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	int min_size;
	int rc;

	assert(sock != NULL);

	if (_sock->impl_opts.enable_recv_pipe) {
		rc = uring_sock_alloc_pipe(sock, sz);
		if (rc) {
			SPDK_ERRLOG("unable to allocate sufficient recvbuf with sz=%d on sock=%p\n", sz, _sock);
			return rc;
		}
	}

	/* Set kernel buffer size to be at least MIN_SO_RCVBUF_SIZE and
	 * g_spdk_uring_sock_impl_opts.recv_buf_size. */
	min_size = spdk_max(MIN_SO_RCVBUF_SIZE, g_spdk_uring_sock_impl_opts.recv_buf_size);

	if (sz < min_size) {
		sz = min_size;
	}

	rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVBUF, &sz, sizeof(sz));
	if (rc < 0) {
		return rc;
	}

	_sock->impl_opts.recv_buf_size = sz;

	return 0;
}

static int
uring_sock_set_sendbuf(struct spdk_sock *_sock, int sz)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	int min_size;
	int rc;

	assert(sock != NULL);

	/* Set kernel buffer size to be at least MIN_SO_SNDBUF_SIZE and
	 * g_spdk_uring_sock_impl_opts.send_buf_size. */
	min_size = spdk_max(MIN_SO_SNDBUF_SIZE, g_spdk_uring_sock_impl_opts.send_buf_size);

	if (sz < min_size) {
		sz = min_size;
	}

	rc = setsockopt(sock->fd, SOL_SOCKET, SO_SNDBUF, &sz, sizeof(sz));
	if (rc < 0) {
		return rc;
	}

	_sock->impl_opts.send_buf_size = sz;

	return 0;
}

static struct spdk_uring_sock *
uring_sock_alloc(int fd, struct spdk_sock_impl_opts *impl_opts, bool enable_zero_copy)
{
	struct spdk_uring_sock *sock;
#if defined(__linux__)
	int flag;
	int rc;
#endif

	sock = calloc(1, sizeof(*sock));
	if (sock == NULL) {
		SPDK_ERRLOG("sock allocation failed\n");
		return NULL;
	}

	sock->fd = fd;
	memcpy(&sock->base.impl_opts, impl_opts, sizeof(*impl_opts));

	STAILQ_INIT(&sock->recv_stream);

#if defined(__linux__)
	flag = 1;

	if (sock->base.impl_opts.enable_quickack) {
		rc = setsockopt(sock->fd, IPPROTO_TCP, TCP_QUICKACK, &flag, sizeof(flag));
		if (rc != 0) {
			SPDK_ERRLOG("Failed to set quickack\n");
		}
	}

	spdk_sock_get_placement_id(sock->fd, sock->base.impl_opts.enable_placement_id,
				   &sock->placement_id);
#ifdef SPDK_ZEROCOPY
	/* Try to turn on zero copy sends */
	flag = 1;

	if (enable_zero_copy) {
		rc = setsockopt(sock->fd, SOL_SOCKET, SO_ZEROCOPY, &flag, sizeof(flag));
		if (rc == 0) {
			sock->zcopy = true;
			sock->zcopy_send_flags = MSG_ZEROCOPY;
		}
	}
#endif
#endif

	return sock;
}

static struct spdk_sock *
uring_sock_create(const char *ip, int port,
		  enum uring_sock_create_type type,
		  struct spdk_sock_opts *opts)
{
	struct spdk_uring_sock *sock;
	struct spdk_sock_impl_opts impl_opts;
	char buf[MAX_TMPBUF];
	char portnum[PORTNUMLEN];
	char *p;
	struct addrinfo hints, *res, *res0;
	int fd, flag;
	int val = 1;
	int rc;
	bool enable_zcopy_impl_opts = false;
	bool
enable_zcopy_user_opts = true; 501 502 assert(opts != NULL); 503 uring_opts_get_impl_opts(opts, &impl_opts); 504 505 if (ip == NULL) { 506 return NULL; 507 } 508 if (ip[0] == '[') { 509 snprintf(buf, sizeof(buf), "%s", ip + 1); 510 p = strchr(buf, ']'); 511 if (p != NULL) { 512 *p = '\0'; 513 } 514 ip = (const char *) &buf[0]; 515 } 516 517 snprintf(portnum, sizeof portnum, "%d", port); 518 memset(&hints, 0, sizeof hints); 519 hints.ai_family = PF_UNSPEC; 520 hints.ai_socktype = SOCK_STREAM; 521 hints.ai_flags = AI_NUMERICSERV; 522 hints.ai_flags |= AI_PASSIVE; 523 hints.ai_flags |= AI_NUMERICHOST; 524 rc = getaddrinfo(ip, portnum, &hints, &res0); 525 if (rc != 0) { 526 SPDK_ERRLOG("getaddrinfo() failed %s (%d)\n", gai_strerror(rc), rc); 527 return NULL; 528 } 529 530 /* try listen */ 531 fd = -1; 532 for (res = res0; res != NULL; res = res->ai_next) { 533 retry: 534 fd = socket(res->ai_family, res->ai_socktype, res->ai_protocol); 535 if (fd < 0) { 536 /* error */ 537 continue; 538 } 539 540 val = impl_opts.recv_buf_size; 541 rc = setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &val, sizeof val); 542 if (rc) { 543 /* Not fatal */ 544 } 545 546 val = impl_opts.send_buf_size; 547 rc = setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &val, sizeof val); 548 if (rc) { 549 /* Not fatal */ 550 } 551 552 rc = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof val); 553 if (rc != 0) { 554 close(fd); 555 fd = -1; 556 /* error */ 557 continue; 558 } 559 rc = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof val); 560 if (rc != 0) { 561 close(fd); 562 fd = -1; 563 /* error */ 564 continue; 565 } 566 567 if (opts->ack_timeout) { 568 #if defined(__linux__) 569 val = opts->ack_timeout; 570 rc = setsockopt(fd, IPPROTO_TCP, TCP_USER_TIMEOUT, &val, sizeof val); 571 if (rc != 0) { 572 close(fd); 573 fd = -1; 574 /* error */ 575 continue; 576 } 577 #else 578 SPDK_WARNLOG("TCP_USER_TIMEOUT is not supported.\n"); 579 #endif 580 } 581 582 583 584 #if defined(SO_PRIORITY) 585 if (opts != NULL && opts->priority) { 586 rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &opts->priority, sizeof val); 587 if (rc != 0) { 588 close(fd); 589 fd = -1; 590 /* error */ 591 continue; 592 } 593 } 594 #endif 595 if (res->ai_family == AF_INET6) { 596 rc = setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &val, sizeof val); 597 if (rc != 0) { 598 close(fd); 599 fd = -1; 600 /* error */ 601 continue; 602 } 603 } 604 605 if (type == SPDK_SOCK_CREATE_LISTEN) { 606 rc = bind(fd, res->ai_addr, res->ai_addrlen); 607 if (rc != 0) { 608 SPDK_ERRLOG("bind() failed at port %d, errno = %d\n", port, errno); 609 switch (errno) { 610 case EINTR: 611 /* interrupted? */ 612 close(fd); 613 goto retry; 614 case EADDRNOTAVAIL: 615 SPDK_ERRLOG("IP address %s not available. 
" 616 "Verify IP address in config file " 617 "and make sure setup script is " 618 "run before starting spdk app.\n", ip); 619 /* FALLTHROUGH */ 620 default: 621 /* try next family */ 622 close(fd); 623 fd = -1; 624 continue; 625 } 626 } 627 /* bind OK */ 628 rc = listen(fd, 512); 629 if (rc != 0) { 630 SPDK_ERRLOG("listen() failed, errno = %d\n", errno); 631 close(fd); 632 fd = -1; 633 break; 634 } 635 636 flag = fcntl(fd, F_GETFL); 637 if (fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0) { 638 SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno); 639 close(fd); 640 fd = -1; 641 break; 642 } 643 644 enable_zcopy_impl_opts = impl_opts.enable_zerocopy_send_server; 645 } else if (type == SPDK_SOCK_CREATE_CONNECT) { 646 rc = connect(fd, res->ai_addr, res->ai_addrlen); 647 if (rc != 0) { 648 SPDK_ERRLOG("connect() failed, errno = %d\n", errno); 649 /* try next family */ 650 close(fd); 651 fd = -1; 652 continue; 653 } 654 655 flag = fcntl(fd, F_GETFL); 656 if (fcntl(fd, F_SETFL, flag & ~O_NONBLOCK) < 0) { 657 SPDK_ERRLOG("fcntl can't set blocking mode for socket, fd: %d (%d)\n", fd, errno); 658 close(fd); 659 fd = -1; 660 break; 661 } 662 663 enable_zcopy_impl_opts = impl_opts.enable_zerocopy_send_client; 664 } 665 break; 666 } 667 freeaddrinfo(res0); 668 669 if (fd < 0) { 670 return NULL; 671 } 672 673 enable_zcopy_user_opts = opts->zcopy && !sock_is_loopback(fd); 674 sock = uring_sock_alloc(fd, &impl_opts, enable_zcopy_user_opts && enable_zcopy_impl_opts); 675 if (sock == NULL) { 676 SPDK_ERRLOG("sock allocation failed\n"); 677 close(fd); 678 return NULL; 679 } 680 681 return &sock->base; 682 } 683 684 static struct spdk_sock * 685 uring_sock_listen(const char *ip, int port, struct spdk_sock_opts *opts) 686 { 687 if (spdk_interrupt_mode_is_enabled()) { 688 SPDK_ERRLOG("Interrupt mode is not supported in the uring sock implementation."); 689 return NULL; 690 } 691 692 return uring_sock_create(ip, port, SPDK_SOCK_CREATE_LISTEN, opts); 693 } 694 695 static struct spdk_sock * 696 uring_sock_connect(const char *ip, int port, struct spdk_sock_opts *opts) 697 { 698 if (spdk_interrupt_mode_is_enabled()) { 699 SPDK_ERRLOG("Interrupt mode is not supported in the uring sock implementation."); 700 return NULL; 701 } 702 703 return uring_sock_create(ip, port, SPDK_SOCK_CREATE_CONNECT, opts); 704 } 705 706 static struct spdk_sock * 707 uring_sock_accept(struct spdk_sock *_sock) 708 { 709 struct spdk_uring_sock *sock = __uring_sock(_sock); 710 struct sockaddr_storage sa; 711 socklen_t salen; 712 int rc, fd; 713 struct spdk_uring_sock *new_sock; 714 int flag; 715 716 memset(&sa, 0, sizeof(sa)); 717 salen = sizeof(sa); 718 719 assert(sock != NULL); 720 721 rc = accept(sock->fd, (struct sockaddr *)&sa, &salen); 722 723 if (rc == -1) { 724 return NULL; 725 } 726 727 fd = rc; 728 729 flag = fcntl(fd, F_GETFL); 730 if ((flag & O_NONBLOCK) && (fcntl(fd, F_SETFL, flag & ~O_NONBLOCK) < 0)) { 731 SPDK_ERRLOG("fcntl can't set blocking mode for socket, fd: %d (%d)\n", fd, errno); 732 close(fd); 733 return NULL; 734 } 735 736 #if defined(SO_PRIORITY) 737 /* The priority is not inherited, so call this function again */ 738 if (sock->base.opts.priority) { 739 rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &sock->base.opts.priority, sizeof(int)); 740 if (rc != 0) { 741 close(fd); 742 return NULL; 743 } 744 } 745 #endif 746 747 new_sock = uring_sock_alloc(fd, &sock->base.impl_opts, sock->zcopy); 748 if (new_sock == NULL) { 749 close(fd); 750 return NULL; 751 } 752 753 return &new_sock->base; 754 } 
755 756 static int 757 uring_sock_close(struct spdk_sock *_sock) 758 { 759 struct spdk_uring_sock *sock = __uring_sock(_sock); 760 761 assert(TAILQ_EMPTY(&_sock->pending_reqs)); 762 assert(sock->group == NULL); 763 764 /* If the socket fails to close, the best choice is to 765 * leak the fd but continue to free the rest of the sock 766 * memory. */ 767 close(sock->fd); 768 769 spdk_pipe_destroy(sock->recv_pipe); 770 free(sock->recv_buf); 771 free(sock); 772 773 return 0; 774 } 775 776 static ssize_t 777 uring_sock_recv_from_pipe(struct spdk_uring_sock *sock, struct iovec *diov, int diovcnt) 778 { 779 struct iovec siov[2]; 780 int sbytes; 781 ssize_t bytes; 782 struct spdk_uring_sock_group_impl *group; 783 784 sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov); 785 if (sbytes < 0) { 786 errno = EINVAL; 787 return -1; 788 } else if (sbytes == 0) { 789 errno = EAGAIN; 790 return -1; 791 } 792 793 bytes = spdk_iovcpy(siov, 2, diov, diovcnt); 794 795 if (bytes == 0) { 796 /* The only way this happens is if diov is 0 length */ 797 errno = EINVAL; 798 return -1; 799 } 800 801 spdk_pipe_reader_advance(sock->recv_pipe, bytes); 802 803 /* If we drained the pipe, take it off the level-triggered list */ 804 if (sock->base.group_impl && spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) { 805 group = __uring_group_impl(sock->base.group_impl); 806 TAILQ_REMOVE(&group->pending_recv, sock, link); 807 sock->pending_recv = false; 808 } 809 810 return bytes; 811 } 812 813 static inline ssize_t 814 sock_readv(int fd, struct iovec *iov, int iovcnt) 815 { 816 struct msghdr msg = { 817 .msg_iov = iov, 818 .msg_iovlen = iovcnt, 819 }; 820 821 return recvmsg(fd, &msg, MSG_DONTWAIT); 822 } 823 824 static inline ssize_t 825 uring_sock_read(struct spdk_uring_sock *sock) 826 { 827 struct iovec iov[2]; 828 int bytes; 829 struct spdk_uring_sock_group_impl *group; 830 831 bytes = spdk_pipe_writer_get_buffer(sock->recv_pipe, sock->recv_buf_sz, iov); 832 833 if (bytes > 0) { 834 bytes = sock_readv(sock->fd, iov, 2); 835 if (bytes > 0) { 836 spdk_pipe_writer_advance(sock->recv_pipe, bytes); 837 if (sock->base.group_impl && !sock->pending_recv) { 838 group = __uring_group_impl(sock->base.group_impl); 839 TAILQ_INSERT_TAIL(&group->pending_recv, sock, link); 840 sock->pending_recv = true; 841 } 842 } 843 } 844 845 return bytes; 846 } 847 848 static int 849 uring_sock_recv_next(struct spdk_sock *_sock, void **_buf, void **ctx) 850 { 851 struct spdk_uring_sock *sock = __uring_sock(_sock); 852 struct spdk_uring_sock_group_impl *group; 853 struct spdk_uring_buf_tracker *tr; 854 855 if (sock->connection_status < 0) { 856 errno = -sock->connection_status; 857 return -1; 858 } 859 860 if (sock->recv_pipe != NULL) { 861 errno = ENOTSUP; 862 return -1; 863 } 864 865 group = __uring_group_impl(_sock->group_impl); 866 867 tr = STAILQ_FIRST(&sock->recv_stream); 868 if (tr == NULL) { 869 if (sock->group->buf_ring_count > 0) { 870 /* There are buffers posted, but data hasn't arrived. */ 871 errno = EAGAIN; 872 } else { 873 /* There are no buffers posted, so this won't ever 874 * make forward progress. 
*/ 875 errno = ENOBUFS; 876 } 877 return -1; 878 } 879 assert(sock->pending_recv == true); 880 assert(tr->buf != NULL); 881 882 *_buf = tr->buf + sock->recv_offset; 883 *ctx = tr->ctx; 884 885 STAILQ_REMOVE_HEAD(&sock->recv_stream, link); 886 STAILQ_INSERT_HEAD(&group->free_trackers, tr, link); 887 888 if (STAILQ_EMPTY(&sock->recv_stream)) { 889 sock->pending_recv = false; 890 TAILQ_REMOVE(&group->pending_recv, sock, link); 891 } 892 893 return tr->len - sock->recv_offset; 894 } 895 896 static ssize_t 897 uring_sock_readv_no_pipe(struct spdk_sock *_sock, struct iovec *iovs, int iovcnt) 898 { 899 struct spdk_uring_sock *sock = __uring_sock(_sock); 900 struct spdk_uring_buf_tracker *tr; 901 struct iovec iov; 902 ssize_t total, len; 903 int i; 904 905 if (sock->connection_status < 0) { 906 errno = -sock->connection_status; 907 return -1; 908 } 909 910 if (_sock->group_impl == NULL) { 911 /* If not in a group just read from the socket the regular way. */ 912 return sock_readv(sock->fd, iovs, iovcnt); 913 } 914 915 if (STAILQ_EMPTY(&sock->recv_stream)) { 916 if (sock->group->buf_ring_count == 0) { 917 /* If the user hasn't posted any buffers, read from the socket 918 * directly. */ 919 920 if (sock->pending_recv) { 921 sock->pending_recv = false; 922 TAILQ_REMOVE(&(__uring_group_impl(_sock->group_impl))->pending_recv, sock, link); 923 } 924 925 return sock_readv(sock->fd, iovs, iovcnt); 926 } 927 928 errno = EAGAIN; 929 return -1; 930 } 931 932 total = 0; 933 for (i = 0; i < iovcnt; i++) { 934 /* Copy to stack so we can change it */ 935 iov = iovs[i]; 936 937 tr = STAILQ_FIRST(&sock->recv_stream); 938 while (tr != NULL) { 939 len = spdk_min(iov.iov_len, tr->len - sock->recv_offset); 940 memcpy(iov.iov_base, tr->buf + sock->recv_offset, len); 941 942 total += len; 943 sock->recv_offset += len; 944 iov.iov_base += len; 945 iov.iov_len -= len; 946 947 if (sock->recv_offset == tr->len) { 948 sock->recv_offset = 0; 949 STAILQ_REMOVE_HEAD(&sock->recv_stream, link); 950 STAILQ_INSERT_HEAD(&sock->group->free_trackers, tr, link); 951 spdk_sock_group_provide_buf(sock->group->base.group, tr->buf, tr->buflen, tr->ctx); 952 tr = STAILQ_FIRST(&sock->recv_stream); 953 } 954 955 if (iov.iov_len == 0) { 956 break; 957 } 958 } 959 } 960 961 if (STAILQ_EMPTY(&sock->recv_stream)) { 962 struct spdk_uring_sock_group_impl *group; 963 964 group = __uring_group_impl(_sock->group_impl); 965 sock->pending_recv = false; 966 TAILQ_REMOVE(&group->pending_recv, sock, link); 967 } 968 969 assert(total > 0); 970 return total; 971 } 972 973 static ssize_t 974 uring_sock_readv(struct spdk_sock *_sock, struct iovec *iov, int iovcnt) 975 { 976 struct spdk_uring_sock *sock = __uring_sock(_sock); 977 int rc, i; 978 size_t len; 979 980 if (sock->connection_status < 0) { 981 errno = -sock->connection_status; 982 return -1; 983 } 984 985 if (sock->recv_pipe == NULL) { 986 return uring_sock_readv_no_pipe(_sock, iov, iovcnt); 987 } 988 989 len = 0; 990 for (i = 0; i < iovcnt; i++) { 991 len += iov[i].iov_len; 992 } 993 994 if (spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) { 995 /* If the user is receiving a sufficiently large amount of data, 996 * receive directly to their buffers. 
*/ 997 if (len >= MIN_SOCK_PIPE_SIZE) { 998 return sock_readv(sock->fd, iov, iovcnt); 999 } 1000 1001 /* Otherwise, do a big read into our pipe */ 1002 rc = uring_sock_read(sock); 1003 if (rc <= 0) { 1004 return rc; 1005 } 1006 } 1007 1008 return uring_sock_recv_from_pipe(sock, iov, iovcnt); 1009 } 1010 1011 static ssize_t 1012 uring_sock_recv(struct spdk_sock *sock, void *buf, size_t len) 1013 { 1014 struct iovec iov[1]; 1015 1016 iov[0].iov_base = buf; 1017 iov[0].iov_len = len; 1018 1019 return uring_sock_readv(sock, iov, 1); 1020 } 1021 1022 static ssize_t 1023 uring_sock_writev(struct spdk_sock *_sock, struct iovec *iov, int iovcnt) 1024 { 1025 struct spdk_uring_sock *sock = __uring_sock(_sock); 1026 struct msghdr msg = { 1027 .msg_iov = iov, 1028 .msg_iovlen = iovcnt, 1029 }; 1030 1031 if (sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) { 1032 errno = EAGAIN; 1033 return -1; 1034 } 1035 1036 return sendmsg(sock->fd, &msg, MSG_DONTWAIT); 1037 } 1038 1039 static ssize_t 1040 sock_request_advance_offset(struct spdk_sock_request *req, ssize_t rc) 1041 { 1042 unsigned int offset; 1043 size_t len; 1044 int i; 1045 1046 offset = req->internal.offset; 1047 for (i = 0; i < req->iovcnt; i++) { 1048 /* Advance by the offset first */ 1049 if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) { 1050 offset -= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len; 1051 continue; 1052 } 1053 1054 /* Calculate the remaining length of this element */ 1055 len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset; 1056 1057 if (len > (size_t)rc) { 1058 req->internal.offset += rc; 1059 return -1; 1060 } 1061 1062 offset = 0; 1063 req->internal.offset += len; 1064 rc -= len; 1065 } 1066 1067 return rc; 1068 } 1069 1070 static int 1071 sock_complete_write_reqs(struct spdk_sock *_sock, ssize_t rc, bool is_zcopy) 1072 { 1073 struct spdk_uring_sock *sock = __uring_sock(_sock); 1074 struct spdk_sock_request *req; 1075 int retval; 1076 1077 if (is_zcopy) { 1078 /* Handling overflow case, because we use psock->sendmsg_idx - 1 for the 1079 * req->internal.offset, so sendmsg_idx should not be zero */ 1080 if (spdk_unlikely(sock->sendmsg_idx == UINT32_MAX)) { 1081 sock->sendmsg_idx = 1; 1082 } else { 1083 sock->sendmsg_idx++; 1084 } 1085 } 1086 1087 /* Consume the requests that were actually written */ 1088 req = TAILQ_FIRST(&_sock->queued_reqs); 1089 while (req) { 1090 /* req->internal.is_zcopy is true when the whole req or part of it is sent with zerocopy */ 1091 req->internal.is_zcopy = is_zcopy; 1092 1093 rc = sock_request_advance_offset(req, rc); 1094 if (rc < 0) { 1095 /* This element was partially sent. */ 1096 return 0; 1097 } 1098 1099 /* Handled a full request. */ 1100 spdk_sock_request_pend(_sock, req); 1101 1102 if (!req->internal.is_zcopy && req == TAILQ_FIRST(&_sock->pending_reqs)) { 1103 retval = spdk_sock_request_put(_sock, req, 0); 1104 if (retval) { 1105 return retval; 1106 } 1107 } else { 1108 /* Re-use the offset field to hold the sendmsg call index. The 1109 * index is 0 based, so subtract one here because we've already 1110 * incremented above. 
			 */
			req->internal.offset = sock->sendmsg_idx - 1;
		}

		if (rc == 0) {
			break;
		}

		req = TAILQ_FIRST(&_sock->queued_reqs);
	}

	return 0;
}

#ifdef SPDK_ZEROCOPY
static int
_sock_check_zcopy(struct spdk_sock *_sock, int status)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	ssize_t rc;
	struct sock_extended_err *serr;
	struct cmsghdr *cm;
	uint32_t idx;
	struct spdk_sock_request *req, *treq;
	bool found;

	assert(sock->zcopy == true);
	if (spdk_unlikely(status < 0)) {
		if (!TAILQ_EMPTY(&_sock->pending_reqs)) {
			SPDK_ERRLOG("Attempting to receive from ERRQUEUE yielded error, but pending list still has orphaned entries, status =%d\n",
				    status);
		} else {
			SPDK_WARNLOG("Recvmsg yielded an error!\n");
		}
		return 0;
	}

	cm = CMSG_FIRSTHDR(&sock->errqueue_task.msg);
	if (!((cm->cmsg_level == SOL_IP && cm->cmsg_type == IP_RECVERR) ||
	      (cm->cmsg_level == SOL_IPV6 && cm->cmsg_type == IPV6_RECVERR))) {
		SPDK_WARNLOG("Unexpected cmsg level or type!\n");
		return 0;
	}

	serr = (struct sock_extended_err *)CMSG_DATA(cm);
	if (serr->ee_errno != 0 || serr->ee_origin != SO_EE_ORIGIN_ZEROCOPY) {
		SPDK_WARNLOG("Unexpected extended error origin\n");
		return 0;
	}

	/* Most of the time, the pending_reqs array is in the exact
	 * order we need such that all of the requests to complete are
	 * in order, in the front. It is guaranteed that all requests
	 * belonging to the same sendmsg call are sequential, so once
	 * we encounter one match we can stop looping as soon as a
	 * non-match is found.
	 */
	for (idx = serr->ee_info; idx <= serr->ee_data; idx++) {
		found = false;
		TAILQ_FOREACH_SAFE(req, &_sock->pending_reqs, internal.link, treq) {
			if (!req->internal.is_zcopy) {
				/* This wasn't a zcopy request.
It was just waiting in line to complete */ 1172 rc = spdk_sock_request_put(_sock, req, 0); 1173 if (rc < 0) { 1174 return rc; 1175 } 1176 } else if (req->internal.offset == idx) { 1177 found = true; 1178 rc = spdk_sock_request_put(_sock, req, 0); 1179 if (rc < 0) { 1180 return rc; 1181 } 1182 } else if (found) { 1183 break; 1184 } 1185 } 1186 } 1187 1188 return 0; 1189 } 1190 1191 static void 1192 _sock_prep_errqueue(struct spdk_sock *_sock) 1193 { 1194 struct spdk_uring_sock *sock = __uring_sock(_sock); 1195 struct spdk_uring_task *task = &sock->errqueue_task; 1196 struct io_uring_sqe *sqe; 1197 1198 if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS) { 1199 return; 1200 } 1201 1202 if (sock->pending_group_remove) { 1203 return; 1204 } 1205 1206 assert(sock->group != NULL); 1207 sock->group->io_queued++; 1208 1209 sqe = io_uring_get_sqe(&sock->group->uring); 1210 io_uring_prep_recvmsg(sqe, sock->fd, &task->msg, MSG_ERRQUEUE); 1211 io_uring_sqe_set_data(sqe, task); 1212 task->status = SPDK_URING_SOCK_TASK_IN_PROCESS; 1213 } 1214 1215 #endif 1216 1217 static void 1218 _sock_flush(struct spdk_sock *_sock) 1219 { 1220 struct spdk_uring_sock *sock = __uring_sock(_sock); 1221 struct spdk_uring_task *task = &sock->write_task; 1222 uint32_t iovcnt; 1223 struct io_uring_sqe *sqe; 1224 int flags; 1225 1226 if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS) { 1227 return; 1228 } 1229 1230 #ifdef SPDK_ZEROCOPY 1231 if (sock->zcopy) { 1232 flags = MSG_DONTWAIT | sock->zcopy_send_flags; 1233 } else 1234 #endif 1235 { 1236 flags = MSG_DONTWAIT; 1237 } 1238 1239 iovcnt = spdk_sock_prep_reqs(&sock->base, task->iovs, task->iov_cnt, &task->last_req, &flags); 1240 if (!iovcnt) { 1241 return; 1242 } 1243 1244 task->iov_cnt = iovcnt; 1245 assert(sock->group != NULL); 1246 task->msg.msg_iov = task->iovs; 1247 task->msg.msg_iovlen = task->iov_cnt; 1248 #ifdef SPDK_ZEROCOPY 1249 task->is_zcopy = (flags & MSG_ZEROCOPY) ? 
true : false; 1250 #endif 1251 sock->group->io_queued++; 1252 1253 sqe = io_uring_get_sqe(&sock->group->uring); 1254 io_uring_prep_sendmsg(sqe, sock->fd, &sock->write_task.msg, flags); 1255 io_uring_sqe_set_data(sqe, task); 1256 task->status = SPDK_URING_SOCK_TASK_IN_PROCESS; 1257 } 1258 1259 static void 1260 _sock_prep_read(struct spdk_sock *_sock) 1261 { 1262 struct spdk_uring_sock *sock = __uring_sock(_sock); 1263 struct spdk_uring_task *task = &sock->read_task; 1264 struct io_uring_sqe *sqe; 1265 1266 /* Do not prepare read event */ 1267 if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS) { 1268 return; 1269 } 1270 1271 if (sock->pending_group_remove) { 1272 return; 1273 } 1274 1275 assert(sock->group != NULL); 1276 sock->group->io_queued++; 1277 1278 sqe = io_uring_get_sqe(&sock->group->uring); 1279 io_uring_prep_recv(sqe, sock->fd, NULL, URING_MAX_RECV_SIZE, 0); 1280 sqe->buf_group = URING_BUF_GROUP_ID; 1281 sqe->flags |= IOSQE_BUFFER_SELECT; 1282 io_uring_sqe_set_data(sqe, task); 1283 task->status = SPDK_URING_SOCK_TASK_IN_PROCESS; 1284 } 1285 1286 static void 1287 _sock_prep_cancel_task(struct spdk_sock *_sock, void *user_data) 1288 { 1289 struct spdk_uring_sock *sock = __uring_sock(_sock); 1290 struct spdk_uring_task *task = &sock->cancel_task; 1291 struct io_uring_sqe *sqe; 1292 1293 if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS) { 1294 return; 1295 } 1296 1297 assert(sock->group != NULL); 1298 sock->group->io_queued++; 1299 1300 sqe = io_uring_get_sqe(&sock->group->uring); 1301 io_uring_prep_cancel(sqe, user_data, 0); 1302 io_uring_sqe_set_data(sqe, task); 1303 task->status = SPDK_URING_SOCK_TASK_IN_PROCESS; 1304 } 1305 1306 static void 1307 uring_sock_fail(struct spdk_uring_sock *sock, int status) 1308 { 1309 struct spdk_uring_sock_group_impl *group = sock->group; 1310 int rc; 1311 1312 sock->connection_status = status; 1313 rc = spdk_sock_abort_requests(&sock->base); 1314 1315 /* The user needs to be notified that this socket is dead. */ 1316 if (rc == 0 && sock->base.cb_fn != NULL && 1317 sock->pending_recv == false) { 1318 sock->pending_recv = true; 1319 TAILQ_INSERT_TAIL(&group->pending_recv, sock, link); 1320 } 1321 } 1322 1323 static int 1324 sock_uring_group_reap(struct spdk_uring_sock_group_impl *group, int max, int max_read_events, 1325 struct spdk_sock **socks) 1326 { 1327 int i, count, ret; 1328 struct io_uring_cqe *cqe; 1329 struct spdk_uring_sock *sock, *tmp; 1330 struct spdk_uring_task *task; 1331 int status, bid, flags; 1332 bool is_zcopy; 1333 1334 for (i = 0; i < max; i++) { 1335 ret = io_uring_peek_cqe(&group->uring, &cqe); 1336 if (ret != 0) { 1337 break; 1338 } 1339 1340 if (cqe == NULL) { 1341 break; 1342 } 1343 1344 task = (struct spdk_uring_task *)cqe->user_data; 1345 assert(task != NULL); 1346 sock = task->sock; 1347 assert(sock != NULL); 1348 assert(sock->group != NULL); 1349 assert(sock->group == group); 1350 sock->group->io_inflight--; 1351 sock->group->io_avail++; 1352 status = cqe->res; 1353 flags = cqe->flags; 1354 io_uring_cqe_seen(&group->uring, cqe); 1355 1356 task->status = SPDK_URING_SOCK_TASK_NOT_IN_USE; 1357 1358 switch (task->type) { 1359 case URING_TASK_READ: 1360 if (status == -EAGAIN || status == -EWOULDBLOCK) { 1361 /* This likely shouldn't happen, but would indicate that the 1362 * kernel didn't have enough resources to queue a task internally. 
				 */
				_sock_prep_read(&sock->base);
			} else if (status == -ECANCELED) {
				continue;
			} else if (status == -ENOBUFS) {
				/* There's data in the socket but the user hasn't provided any buffers.
				 * We need to notify the user that the socket has data pending. */
				if (sock->base.cb_fn != NULL &&
				    sock->pending_recv == false) {
					sock->pending_recv = true;
					TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
				}

				_sock_prep_read(&sock->base);
			} else if (spdk_unlikely(status <= 0)) {
				uring_sock_fail(sock, status < 0 ? status : -ECONNRESET);
			} else {
				struct spdk_uring_buf_tracker *tracker;

				assert((flags & IORING_CQE_F_BUFFER) != 0);

				bid = flags >> IORING_CQE_BUFFER_SHIFT;
				tracker = &group->trackers[bid];

				assert(tracker->buf != NULL);
				assert(tracker->len != 0);

				/* Append this data to the stream */
				tracker->len = status;
				STAILQ_INSERT_TAIL(&sock->recv_stream, tracker, link);
				assert(group->buf_ring_count > 0);
				group->buf_ring_count--;

				if (sock->base.cb_fn != NULL &&
				    sock->pending_recv == false) {
					sock->pending_recv = true;
					TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
				}

				_sock_prep_read(&sock->base);
			}
			break;
		case URING_TASK_WRITE:
			if (status == -EAGAIN || status == -EWOULDBLOCK ||
			    (status == -ENOBUFS && sock->zcopy) ||
			    status == -ECANCELED) {
				continue;
			} else if (spdk_unlikely(status < 0)) {
				uring_sock_fail(sock, status);
			} else {
				task->last_req = NULL;
				task->iov_cnt = 0;
				is_zcopy = task->is_zcopy;
				task->is_zcopy = false;
				sock_complete_write_reqs(&sock->base, status, is_zcopy);
			}

			break;
#ifdef SPDK_ZEROCOPY
		case URING_TASK_ERRQUEUE:
			if (status == -EAGAIN || status == -EWOULDBLOCK) {
				_sock_prep_errqueue(&sock->base);
			} else if (status == -ECANCELED) {
				continue;
			} else if (spdk_unlikely(status < 0)) {
				uring_sock_fail(sock, status);
			} else {
				_sock_check_zcopy(&sock->base, status);
				_sock_prep_errqueue(&sock->base);
			}
			break;
#endif
		case URING_TASK_CANCEL:
			/* Do nothing */
			break;
		default:
			SPDK_UNREACHABLE();
		}
	}

	if (!socks) {
		return 0;
	}
	count = 0;
	TAILQ_FOREACH_SAFE(sock, &group->pending_recv, link, tmp) {
		if (count == max_read_events) {
			break;
		}

		/* If the socket's cb_fn is NULL, do not add it to socks array */
		if (spdk_unlikely(sock->base.cb_fn == NULL)) {
			assert(sock->pending_recv == true);
			sock->pending_recv = false;
			TAILQ_REMOVE(&group->pending_recv, sock, link);
			continue;
		}

		socks[count++] = &sock->base;
	}

	/* Cycle the pending_recv list so that each time we poll things aren't
	 * in the same order. Say we have 6 sockets in the list, named as follows:
	 * A B C D E F
	 * And all 6 sockets had the poll events, but max_events is only 3. That means
	 * psock currently points at D. We want to rearrange the list to the following:
	 * D E F A B C
	 *
	 * The variables below are named according to this example to make it easier to
	 * follow the swaps.
1472 */ 1473 if (sock != NULL) { 1474 struct spdk_uring_sock *ua, *uc, *ud, *uf; 1475 1476 /* Capture pointers to the elements we need */ 1477 ud = sock; 1478 1479 ua = TAILQ_FIRST(&group->pending_recv); 1480 if (ua == ud) { 1481 goto end; 1482 } 1483 1484 uf = TAILQ_LAST(&group->pending_recv, pending_recv_list); 1485 if (uf == ud) { 1486 TAILQ_REMOVE(&group->pending_recv, ud, link); 1487 TAILQ_INSERT_HEAD(&group->pending_recv, ud, link); 1488 goto end; 1489 } 1490 1491 uc = TAILQ_PREV(ud, pending_recv_list, link); 1492 assert(uc != NULL); 1493 1494 /* Break the link between C and D */ 1495 uc->link.tqe_next = NULL; 1496 1497 /* Connect F to A */ 1498 uf->link.tqe_next = ua; 1499 ua->link.tqe_prev = &uf->link.tqe_next; 1500 1501 /* Fix up the list first/last pointers */ 1502 group->pending_recv.tqh_first = ud; 1503 group->pending_recv.tqh_last = &uc->link.tqe_next; 1504 1505 /* D is in front of the list, make tqe prev pointer point to the head of list */ 1506 ud->link.tqe_prev = &group->pending_recv.tqh_first; 1507 } 1508 1509 end: 1510 return count; 1511 } 1512 1513 static int uring_sock_flush(struct spdk_sock *_sock); 1514 1515 static void 1516 uring_sock_writev_async(struct spdk_sock *_sock, struct spdk_sock_request *req) 1517 { 1518 struct spdk_uring_sock *sock = __uring_sock(_sock); 1519 int rc; 1520 1521 if (spdk_unlikely(sock->connection_status)) { 1522 req->cb_fn(req->cb_arg, sock->connection_status); 1523 return; 1524 } 1525 1526 spdk_sock_request_queue(_sock, req); 1527 1528 if (!sock->group) { 1529 if (_sock->queued_iovcnt >= IOV_BATCH_SIZE) { 1530 rc = uring_sock_flush(_sock); 1531 if (rc < 0 && errno != EAGAIN) { 1532 spdk_sock_abort_requests(_sock); 1533 } 1534 } 1535 } 1536 } 1537 1538 static int 1539 uring_sock_set_recvlowat(struct spdk_sock *_sock, int nbytes) 1540 { 1541 struct spdk_uring_sock *sock = __uring_sock(_sock); 1542 int val; 1543 int rc; 1544 1545 assert(sock != NULL); 1546 1547 val = nbytes; 1548 rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVLOWAT, &val, sizeof val); 1549 if (rc != 0) { 1550 return -1; 1551 } 1552 return 0; 1553 } 1554 1555 static bool 1556 uring_sock_is_ipv6(struct spdk_sock *_sock) 1557 { 1558 struct spdk_uring_sock *sock = __uring_sock(_sock); 1559 struct sockaddr_storage sa; 1560 socklen_t salen; 1561 int rc; 1562 1563 assert(sock != NULL); 1564 1565 memset(&sa, 0, sizeof sa); 1566 salen = sizeof sa; 1567 rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen); 1568 if (rc != 0) { 1569 SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno); 1570 return false; 1571 } 1572 1573 return (sa.ss_family == AF_INET6); 1574 } 1575 1576 static bool 1577 uring_sock_is_ipv4(struct spdk_sock *_sock) 1578 { 1579 struct spdk_uring_sock *sock = __uring_sock(_sock); 1580 struct sockaddr_storage sa; 1581 socklen_t salen; 1582 int rc; 1583 1584 assert(sock != NULL); 1585 1586 memset(&sa, 0, sizeof sa); 1587 salen = sizeof sa; 1588 rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen); 1589 if (rc != 0) { 1590 SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno); 1591 return false; 1592 } 1593 1594 return (sa.ss_family == AF_INET); 1595 } 1596 1597 static bool 1598 uring_sock_is_connected(struct spdk_sock *_sock) 1599 { 1600 struct spdk_uring_sock *sock = __uring_sock(_sock); 1601 uint8_t byte; 1602 int rc; 1603 1604 rc = recv(sock->fd, &byte, 1, MSG_PEEK | MSG_DONTWAIT); 1605 if (rc == 0) { 1606 return false; 1607 } 1608 1609 if (rc < 0) { 1610 if (errno == EAGAIN || errno == EWOULDBLOCK) { 1611 return true; 1612 } 1613 1614 return false; 1615 } 
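	/* recv() with MSG_PEEK returned at least one byte without consuming it, so the
	 * peer has not closed the connection. */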
1616 1617 return true; 1618 } 1619 1620 static struct spdk_sock_group_impl * 1621 uring_sock_group_impl_get_optimal(struct spdk_sock *_sock, struct spdk_sock_group_impl *hint) 1622 { 1623 struct spdk_uring_sock *sock = __uring_sock(_sock); 1624 struct spdk_sock_group_impl *group; 1625 1626 if (sock->placement_id != -1) { 1627 spdk_sock_map_lookup(&g_map, sock->placement_id, &group, hint); 1628 return group; 1629 } 1630 1631 return NULL; 1632 } 1633 1634 static int 1635 uring_sock_group_impl_buf_pool_free(struct spdk_uring_sock_group_impl *group_impl) 1636 { 1637 if (group_impl->buf_ring) { 1638 io_uring_unregister_buf_ring(&group_impl->uring, URING_BUF_GROUP_ID); 1639 free(group_impl->buf_ring); 1640 } 1641 1642 free(group_impl->trackers); 1643 1644 return 0; 1645 } 1646 1647 static int 1648 uring_sock_group_impl_buf_pool_alloc(struct spdk_uring_sock_group_impl *group_impl) 1649 { 1650 struct io_uring_buf_reg buf_reg = {}; 1651 struct io_uring_buf_ring *buf_ring; 1652 int i, rc; 1653 1654 rc = posix_memalign((void **)&buf_ring, 0x1000, URING_BUF_POOL_SIZE * sizeof(struct io_uring_buf)); 1655 if (rc != 0) { 1656 /* posix_memalign returns positive errno values */ 1657 return -rc; 1658 } 1659 1660 buf_reg.ring_addr = (unsigned long long)buf_ring; 1661 buf_reg.ring_entries = URING_BUF_POOL_SIZE; 1662 buf_reg.bgid = URING_BUF_GROUP_ID; 1663 1664 rc = io_uring_register_buf_ring(&group_impl->uring, &buf_reg, 0); 1665 if (rc != 0) { 1666 free(buf_ring); 1667 return rc; 1668 } 1669 1670 group_impl->buf_ring = buf_ring; 1671 io_uring_buf_ring_init(group_impl->buf_ring); 1672 group_impl->buf_ring_count = 0; 1673 1674 group_impl->trackers = calloc(URING_BUF_POOL_SIZE, sizeof(struct spdk_uring_buf_tracker)); 1675 if (group_impl->trackers == NULL) { 1676 uring_sock_group_impl_buf_pool_free(group_impl); 1677 return -ENOMEM; 1678 } 1679 1680 STAILQ_INIT(&group_impl->free_trackers); 1681 1682 for (i = 0; i < URING_BUF_POOL_SIZE; i++) { 1683 struct spdk_uring_buf_tracker *tracker = &group_impl->trackers[i]; 1684 1685 tracker->buf = NULL; 1686 tracker->len = 0; 1687 tracker->ctx = NULL; 1688 tracker->id = i; 1689 1690 STAILQ_INSERT_TAIL(&group_impl->free_trackers, tracker, link); 1691 } 1692 1693 return 0; 1694 } 1695 1696 static struct spdk_sock_group_impl * 1697 uring_sock_group_impl_create(void) 1698 { 1699 struct spdk_uring_sock_group_impl *group_impl; 1700 1701 group_impl = calloc(1, sizeof(*group_impl)); 1702 if (group_impl == NULL) { 1703 SPDK_ERRLOG("group_impl allocation failed\n"); 1704 return NULL; 1705 } 1706 1707 group_impl->io_avail = SPDK_SOCK_GROUP_QUEUE_DEPTH; 1708 1709 if (io_uring_queue_init(SPDK_SOCK_GROUP_QUEUE_DEPTH, &group_impl->uring, 0) < 0) { 1710 SPDK_ERRLOG("uring I/O context setup failure\n"); 1711 free(group_impl); 1712 return NULL; 1713 } 1714 1715 TAILQ_INIT(&group_impl->pending_recv); 1716 1717 if (uring_sock_group_impl_buf_pool_alloc(group_impl) < 0) { 1718 SPDK_ERRLOG("Failed to create buffer ring. Your kernel is likely not new enough. 
" 1719 "Please switch to the POSIX sock implementation instead.\n"); 1720 io_uring_queue_exit(&group_impl->uring); 1721 free(group_impl); 1722 return NULL; 1723 } 1724 1725 if (g_spdk_uring_sock_impl_opts.enable_placement_id == PLACEMENT_CPU) { 1726 spdk_sock_map_insert(&g_map, spdk_env_get_current_core(), &group_impl->base); 1727 } 1728 1729 return &group_impl->base; 1730 } 1731 1732 static int 1733 uring_sock_group_impl_add_sock(struct spdk_sock_group_impl *_group, 1734 struct spdk_sock *_sock) 1735 { 1736 struct spdk_uring_sock *sock = __uring_sock(_sock); 1737 struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group); 1738 int rc; 1739 1740 sock->group = group; 1741 sock->write_task.sock = sock; 1742 sock->write_task.type = URING_TASK_WRITE; 1743 1744 sock->read_task.sock = sock; 1745 sock->read_task.type = URING_TASK_READ; 1746 1747 sock->errqueue_task.sock = sock; 1748 sock->errqueue_task.type = URING_TASK_ERRQUEUE; 1749 sock->errqueue_task.msg.msg_control = sock->buf; 1750 sock->errqueue_task.msg.msg_controllen = sizeof(sock->buf); 1751 1752 sock->cancel_task.sock = sock; 1753 sock->cancel_task.type = URING_TASK_CANCEL; 1754 1755 /* switched from another polling group due to scheduling */ 1756 if (spdk_unlikely(sock->recv_pipe != NULL && 1757 (spdk_pipe_reader_bytes_available(sock->recv_pipe) > 0))) { 1758 assert(sock->pending_recv == false); 1759 sock->pending_recv = true; 1760 TAILQ_INSERT_TAIL(&group->pending_recv, sock, link); 1761 } 1762 1763 if (sock->placement_id != -1) { 1764 rc = spdk_sock_map_insert(&g_map, sock->placement_id, &group->base); 1765 if (rc != 0) { 1766 SPDK_ERRLOG("Failed to insert sock group into map: %d", rc); 1767 /* Do not treat this as an error. The system will continue running. */ 1768 } 1769 } 1770 1771 /* We get an async read going immediately */ 1772 _sock_prep_read(&sock->base); 1773 #ifdef SPDK_ZEROCOPY 1774 if (sock->zcopy) { 1775 _sock_prep_errqueue(_sock); 1776 } 1777 #endif 1778 1779 return 0; 1780 } 1781 1782 static void 1783 uring_sock_group_populate_buf_ring(struct spdk_uring_sock_group_impl *group) 1784 { 1785 struct spdk_uring_buf_tracker *tracker; 1786 int count, mask; 1787 1788 if (g_spdk_uring_sock_impl_opts.enable_recv_pipe) { 1789 /* If recv_pipe is enabled, we do not post buffers. 
		 */
		return;
	}

	/* Try to re-populate the io_uring's buffer pool using user-provided buffers */
	tracker = STAILQ_FIRST(&group->free_trackers);
	count = 0;
	mask = io_uring_buf_ring_mask(URING_BUF_POOL_SIZE);
	while (tracker != NULL) {
		tracker->buflen = spdk_sock_group_get_buf(group->base.group, &tracker->buf, &tracker->ctx);
		if (tracker->buflen == 0) {
			break;
		}

		assert(tracker->buf != NULL);
		STAILQ_REMOVE_HEAD(&group->free_trackers, link);
		assert(STAILQ_FIRST(&group->free_trackers) != tracker);

		io_uring_buf_ring_add(group->buf_ring, tracker->buf, tracker->buflen, tracker->id, mask, count);
		count++;
		tracker = STAILQ_FIRST(&group->free_trackers);
	}

	if (count > 0) {
		group->buf_ring_count += count;
		io_uring_buf_ring_advance(group->buf_ring, count);
	}
}

static int
uring_sock_group_impl_poll(struct spdk_sock_group_impl *_group, int max_events,
			   struct spdk_sock **socks)
{
	struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group);
	int count, ret;
	int to_complete, to_submit;
	struct spdk_sock *_sock, *tmp;
	struct spdk_uring_sock *sock;

	if (spdk_likely(socks)) {
		TAILQ_FOREACH_SAFE(_sock, &group->base.socks, link, tmp) {
			sock = __uring_sock(_sock);
			if (spdk_unlikely(sock->connection_status)) {
				continue;
			}
			_sock_flush(_sock);
		}
	}

	/* Try to re-populate the io_uring's buffer pool using user-provided buffers */
	uring_sock_group_populate_buf_ring(group);

	to_submit = group->io_queued;

	/* For network I/O, O_DIRECT cannot be used, so we do not need to call spdk_io_uring_enter */
	if (to_submit > 0) {
		/* If there are I/O to submit, use io_uring_submit here.
		 * It will automatically call io_uring_enter appropriately. */
		ret = io_uring_submit(&group->uring);
		if (ret < 0) {
			return 1;
		}
		group->io_queued = 0;
		group->io_inflight += to_submit;
		group->io_avail -= to_submit;
	}

	count = 0;
	to_complete = group->io_inflight;
	if (to_complete > 0 || !TAILQ_EMPTY(&group->pending_recv)) {
		count = sock_uring_group_reap(group, to_complete, max_events, socks);
	}

	return count;
}

static int
uring_sock_group_impl_remove_sock(struct spdk_sock_group_impl *_group,
				  struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group);

	sock->pending_group_remove = true;

	if (sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
		_sock_prep_cancel_task(_sock, &sock->write_task);
		/* Since spdk_sock_group_remove_sock is not an asynchronous interface,
		 * we can simply wait in a while loop here. */
		while ((sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) ||
		       (sock->cancel_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE)) {
			uring_sock_group_impl_poll(_group, 32, NULL);
		}
	}

	if (sock->read_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
		_sock_prep_cancel_task(_sock, &sock->read_task);
		/* Since spdk_sock_group_remove_sock is not an asynchronous interface,
		 * we can simply wait in a while loop here. */
		while ((sock->read_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) ||
		       (sock->cancel_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE)) {
			uring_sock_group_impl_poll(_group, 32, NULL);
		}
	}

	if (sock->errqueue_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
		_sock_prep_cancel_task(_sock, &sock->errqueue_task);
		/* Since spdk_sock_group_remove_sock is not an asynchronous interface,
		 * we can simply wait in a while loop here. */
		while ((sock->errqueue_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) ||
		       (sock->cancel_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE)) {
			uring_sock_group_impl_poll(_group, 32, NULL);
		}
	}

	/* Make sure that cancelling the tasks above didn't cause any new requests to be sent */
	assert(sock->write_task.status == SPDK_URING_SOCK_TASK_NOT_IN_USE);
	assert(sock->read_task.status == SPDK_URING_SOCK_TASK_NOT_IN_USE);
	assert(sock->errqueue_task.status == SPDK_URING_SOCK_TASK_NOT_IN_USE);

	if (sock->pending_recv) {
		TAILQ_REMOVE(&group->pending_recv, sock, link);
		sock->pending_recv = false;
	}
	assert(sock->pending_recv == false);

	/* We have no way to handle this case. We could let the user read this
	 * buffer, but the buffer came from a group and we have lost the association
	 * to that so we couldn't release it. */
	assert(STAILQ_EMPTY(&sock->recv_stream));

	if (sock->placement_id != -1) {
		spdk_sock_map_release(&g_map, sock->placement_id);
	}

	sock->pending_group_remove = false;
	sock->group = NULL;
	return 0;
}

static int
uring_sock_group_impl_close(struct spdk_sock_group_impl *_group)
{
	struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group);

	/* try to reap all the active I/O */
	while (group->io_inflight) {
		uring_sock_group_impl_poll(_group, 32, NULL);
	}
	assert(group->io_inflight == 0);
	assert(group->io_avail == SPDK_SOCK_GROUP_QUEUE_DEPTH);

	uring_sock_group_impl_buf_pool_free(group);

	io_uring_queue_exit(&group->uring);

	if (g_spdk_uring_sock_impl_opts.enable_placement_id == PLACEMENT_CPU) {
		spdk_sock_map_release(&g_map, spdk_env_get_current_core());
	}

	free(group);
	return 0;
}

static int
uring_sock_flush(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct msghdr msg = {};
	struct iovec iovs[IOV_BATCH_SIZE];
	int iovcnt;
	ssize_t rc;
	int flags = sock->zcopy_send_flags;
	int retval;
	bool is_zcopy = false;
	struct spdk_uring_task *task = &sock->errqueue_task;

	/* Can't flush from within a callback or we end up with recursive calls */
	if (_sock->cb_cnt > 0) {
		errno = EAGAIN;
		return -1;
	}

	/* Can't flush while a write is already outstanding */
	if (sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
		errno = EAGAIN;
		return -1;
	}

	/* Gather an iov */
	iovcnt = spdk_sock_prep_reqs(_sock, iovs, 0, NULL, &flags);
	if (iovcnt == 0) {
		/* Nothing to send */
		return 0;
	}

	/* Perform the vectored write */
	msg.msg_iov = iovs;
	msg.msg_iovlen = iovcnt;
	rc = sendmsg(sock->fd, &msg, flags | MSG_DONTWAIT);
	if (rc <= 0) {
		if (rc == 0 || errno == EAGAIN || errno == EWOULDBLOCK || (errno == ENOBUFS && sock->zcopy)) {
			errno = EAGAIN;
		}
		return -1;
	}

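	/* If this sendmsg used MSG_ZEROCOPY, the requests completed above stay on
	 * pending_reqs until the kernel confirms the transmission on the socket's
	 * error queue, which is checked below. */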
#ifdef SPDK_ZEROCOPY
	is_zcopy = flags & MSG_ZEROCOPY;
#endif
	retval = sock_complete_write_reqs(_sock, rc, is_zcopy);
	if (retval < 0) {
		/* if the socket is closed, return to avoid heap-use-after-free error */
		errno = ENOTCONN;
		return -1;
	}

#ifdef SPDK_ZEROCOPY
	/* Check the error queue at least once for zero copy completions */
	if (sock->zcopy && !TAILQ_EMPTY(&_sock->pending_reqs)) {
		retval = recvmsg(sock->fd, &task->msg, MSG_ERRQUEUE);
		if (retval < 0) {
			if (errno == EWOULDBLOCK || errno == EAGAIN) {
				return rc;
			}
		}
		_sock_check_zcopy(_sock, retval);
	}
#endif

	return rc;
}

static int
uring_sock_group_impl_register_interrupt(struct spdk_sock_group_impl *_group, uint32_t events,
					 spdk_interrupt_fn fn, void *arg, const char *name)
{
	SPDK_ERRLOG("Interrupt mode is not supported in the uring sock implementation.");

	return -ENOTSUP;
}

static void
uring_sock_group_impl_unregister_interrupt(struct spdk_sock_group_impl *_group)
{
}

static struct spdk_net_impl g_uring_net_impl = {
	.name = "uring",
	.getaddr = uring_sock_getaddr,
	.connect = uring_sock_connect,
	.listen = uring_sock_listen,
	.accept = uring_sock_accept,
	.close = uring_sock_close,
	.recv = uring_sock_recv,
	.readv = uring_sock_readv,
	.writev = uring_sock_writev,
	.recv_next = uring_sock_recv_next,
	.writev_async = uring_sock_writev_async,
	.flush = uring_sock_flush,
	.set_recvlowat = uring_sock_set_recvlowat,
	.set_recvbuf = uring_sock_set_recvbuf,
	.set_sendbuf = uring_sock_set_sendbuf,
	.is_ipv6 = uring_sock_is_ipv6,
	.is_ipv4 = uring_sock_is_ipv4,
	.is_connected = uring_sock_is_connected,
	.group_impl_get_optimal = uring_sock_group_impl_get_optimal,
	.group_impl_create = uring_sock_group_impl_create,
	.group_impl_add_sock = uring_sock_group_impl_add_sock,
	.group_impl_remove_sock = uring_sock_group_impl_remove_sock,
	.group_impl_poll = uring_sock_group_impl_poll,
	.group_impl_register_interrupt = uring_sock_group_impl_register_interrupt,
	.group_impl_unregister_interrupt = uring_sock_group_impl_unregister_interrupt,
	.group_impl_close = uring_sock_group_impl_close,
	.get_opts = uring_sock_impl_get_opts,
	.set_opts = uring_sock_impl_set_opts,
};

SPDK_NET_IMPL_REGISTER(uring, &g_uring_net_impl);
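/*
 * Usage note (a minimal sketch, not part of this module): an application opts in
 * to this implementation by name through the public spdk_sock API. The address,
 * port and option values below are placeholders.
 *
 *	struct spdk_sock_impl_opts impl_opts;
 *	size_t sz = sizeof(impl_opts);
 *	struct spdk_sock_opts opts = { .opts_size = sizeof(opts) };
 *	struct spdk_sock *sock;
 *
 *	spdk_sock_impl_get_opts("uring", &impl_opts, &sz);
 *	impl_opts.enable_zerocopy_send_client = true;
 *	spdk_sock_impl_set_opts("uring", &impl_opts, sz);
 *
 *	spdk_sock_get_default_opts(&opts);
 *	sock = spdk_sock_connect_ext("127.0.0.1", 4420, "uring", &opts);
 */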