1 /* SPDX-License-Identifier: BSD-3-Clause 2 * Copyright (C) 2019 Intel Corporation. 3 * All rights reserved. 4 */ 5 6 #include "spdk/stdinc.h" 7 #include "spdk/config.h" 8 9 #include <linux/errqueue.h> 10 #include <sys/epoll.h> 11 #include <liburing.h> 12 13 #include "spdk/barrier.h" 14 #include "spdk/env.h" 15 #include "spdk/log.h" 16 #include "spdk/pipe.h" 17 #include "spdk/sock.h" 18 #include "spdk/string.h" 19 #include "spdk/util.h" 20 21 #include "spdk_internal/sock.h" 22 #include "spdk_internal/assert.h" 23 #include "../sock_kernel.h" 24 25 #define MAX_TMPBUF 1024 26 #define PORTNUMLEN 32 27 #define SPDK_SOCK_GROUP_QUEUE_DEPTH 4096 28 #define SPDK_SOCK_CMG_INFO_SIZE (sizeof(struct cmsghdr) + sizeof(struct sock_extended_err)) 29 30 enum uring_task_type { 31 URING_TASK_READ = 0, 32 URING_TASK_ERRQUEUE, 33 URING_TASK_WRITE, 34 URING_TASK_CANCEL, 35 }; 36 37 #if defined(SO_ZEROCOPY) && defined(MSG_ZEROCOPY) 38 #define SPDK_ZEROCOPY 39 #endif 40 41 /* We don't know how big the buffers that the user posts will be, but this 42 * is the maximum we'll ever allow it to receive in a single command. 43 * If the user buffers are smaller, it will just receive less. */ 44 #define URING_MAX_RECV_SIZE (128 * 1024) 45 46 /* We don't know how many buffers the user will post, but this is the 47 * maximum number we'll take from the pool to post per group. */ 48 #define URING_BUF_POOL_SIZE 128 49 50 /* We use 1 just so it's not zero and we can validate it's right. */ 51 #define URING_BUF_GROUP_ID 1 52 53 enum spdk_uring_sock_task_status { 54 SPDK_URING_SOCK_TASK_NOT_IN_USE = 0, 55 SPDK_URING_SOCK_TASK_IN_PROCESS, 56 }; 57 58 struct spdk_uring_task { 59 enum spdk_uring_sock_task_status status; 60 enum uring_task_type type; 61 struct spdk_uring_sock *sock; 62 struct msghdr msg; 63 struct iovec iovs[IOV_BATCH_SIZE]; 64 int iov_cnt; 65 struct spdk_sock_request *last_req; 66 bool is_zcopy; 67 STAILQ_ENTRY(spdk_uring_task) link; 68 }; 69 70 struct spdk_uring_sock { 71 struct spdk_sock base; 72 int fd; 73 uint32_t sendmsg_idx; 74 struct spdk_uring_sock_group_impl *group; 75 STAILQ_HEAD(, spdk_uring_buf_tracker) recv_stream; 76 size_t recv_offset; 77 struct spdk_uring_task write_task; 78 struct spdk_uring_task errqueue_task; 79 struct spdk_uring_task read_task; 80 struct spdk_uring_task cancel_task; 81 struct spdk_pipe *recv_pipe; 82 void *recv_buf; 83 int recv_buf_sz; 84 bool zcopy; 85 bool pending_recv; 86 bool pending_group_remove; 87 int zcopy_send_flags; 88 int connection_status; 89 int placement_id; 90 uint8_t buf[SPDK_SOCK_CMG_INFO_SIZE]; 91 TAILQ_ENTRY(spdk_uring_sock) link; 92 }; 93 94 TAILQ_HEAD(pending_recv_list, spdk_uring_sock); 95 96 struct spdk_uring_buf_tracker { 97 void *buf; 98 size_t buflen; 99 size_t len; 100 void *ctx; 101 int id; 102 STAILQ_ENTRY(spdk_uring_buf_tracker) link; 103 }; 104 105 struct spdk_uring_sock_group_impl { 106 struct spdk_sock_group_impl base; 107 struct io_uring uring; 108 uint32_t io_inflight; 109 uint32_t io_queued; 110 uint32_t io_avail; 111 struct pending_recv_list pending_recv; 112 113 struct io_uring_buf_ring *buf_ring; 114 uint32_t buf_ring_count; 115 struct spdk_uring_buf_tracker *trackers; 116 STAILQ_HEAD(, spdk_uring_buf_tracker) free_trackers; 117 }; 118 119 static struct spdk_sock_impl_opts g_spdk_uring_sock_impl_opts = { 120 .recv_buf_size = DEFAULT_SO_RCVBUF_SIZE, 121 .send_buf_size = DEFAULT_SO_SNDBUF_SIZE, 122 .enable_recv_pipe = true, 123 .enable_quickack = false, 124 .enable_placement_id = PLACEMENT_NONE, 125 .enable_zerocopy_send_server = false, 126 
.enable_zerocopy_send_client = false, 127 .zerocopy_threshold = 0, 128 .tls_version = 0, 129 .enable_ktls = false, 130 .psk_key = NULL, 131 .psk_identity = NULL 132 }; 133 134 static struct spdk_sock_map g_map = { 135 .entries = STAILQ_HEAD_INITIALIZER(g_map.entries), 136 .mtx = PTHREAD_MUTEX_INITIALIZER 137 }; 138 139 __attribute((destructor)) static void 140 uring_sock_map_cleanup(void) 141 { 142 spdk_sock_map_cleanup(&g_map); 143 } 144 145 #define SPDK_URING_SOCK_REQUEST_IOV(req) ((struct iovec *)((uint8_t *)req + sizeof(struct spdk_sock_request))) 146 147 #define __uring_sock(sock) (struct spdk_uring_sock *)sock 148 #define __uring_group_impl(group) (struct spdk_uring_sock_group_impl *)group 149 150 static void 151 uring_sock_copy_impl_opts(struct spdk_sock_impl_opts *dest, const struct spdk_sock_impl_opts *src, 152 size_t len) 153 { 154 #define FIELD_OK(field) \ 155 offsetof(struct spdk_sock_impl_opts, field) + sizeof(src->field) <= len 156 157 #define SET_FIELD(field) \ 158 if (FIELD_OK(field)) { \ 159 dest->field = src->field; \ 160 } 161 162 SET_FIELD(recv_buf_size); 163 SET_FIELD(send_buf_size); 164 SET_FIELD(enable_recv_pipe); 165 SET_FIELD(enable_quickack); 166 SET_FIELD(enable_placement_id); 167 SET_FIELD(enable_zerocopy_send_server); 168 SET_FIELD(enable_zerocopy_send_client); 169 SET_FIELD(zerocopy_threshold); 170 SET_FIELD(tls_version); 171 SET_FIELD(enable_ktls); 172 SET_FIELD(psk_key); 173 SET_FIELD(psk_identity); 174 175 #undef SET_FIELD 176 #undef FIELD_OK 177 } 178 179 static int 180 uring_sock_impl_get_opts(struct spdk_sock_impl_opts *opts, size_t *len) 181 { 182 if (!opts || !len) { 183 errno = EINVAL; 184 return -1; 185 } 186 187 assert(sizeof(*opts) >= *len); 188 memset(opts, 0, *len); 189 190 uring_sock_copy_impl_opts(opts, &g_spdk_uring_sock_impl_opts, *len); 191 *len = spdk_min(*len, sizeof(g_spdk_uring_sock_impl_opts)); 192 193 return 0; 194 } 195 196 static int 197 uring_sock_impl_set_opts(const struct spdk_sock_impl_opts *opts, size_t len) 198 { 199 if (!opts) { 200 errno = EINVAL; 201 return -1; 202 } 203 204 assert(sizeof(*opts) >= len); 205 uring_sock_copy_impl_opts(&g_spdk_uring_sock_impl_opts, opts, len); 206 207 return 0; 208 } 209 210 static void 211 uring_opts_get_impl_opts(const struct spdk_sock_opts *opts, struct spdk_sock_impl_opts *dest) 212 { 213 /* Copy the default impl_opts first to cover cases when user's impl_opts is smaller */ 214 memcpy(dest, &g_spdk_uring_sock_impl_opts, sizeof(*dest)); 215 216 if (opts->impl_opts != NULL) { 217 assert(sizeof(*dest) >= opts->impl_opts_size); 218 uring_sock_copy_impl_opts(dest, opts->impl_opts, opts->impl_opts_size); 219 } 220 } 221 222 static int 223 uring_sock_getaddr(struct spdk_sock *_sock, char *saddr, int slen, uint16_t *sport, 224 char *caddr, int clen, uint16_t *cport) 225 { 226 struct spdk_uring_sock *sock = __uring_sock(_sock); 227 struct sockaddr_storage sa; 228 socklen_t salen; 229 int rc; 230 231 assert(sock != NULL); 232 233 memset(&sa, 0, sizeof sa); 234 salen = sizeof sa; 235 rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen); 236 if (rc != 0) { 237 SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno); 238 return -1; 239 } 240 241 switch (sa.ss_family) { 242 case AF_UNIX: 243 /* Acceptable connection types that don't have IPs */ 244 return 0; 245 case AF_INET: 246 case AF_INET6: 247 /* Code below will get IP addresses */ 248 break; 249 default: 250 /* Unsupported socket family */ 251 return -1; 252 } 253 254 rc = get_addr_str((struct sockaddr *)&sa, saddr, slen); 255 if (rc != 0) 
{ 256 SPDK_ERRLOG("getnameinfo() failed (errno=%d)\n", errno); 257 return -1; 258 } 259 260 if (sport) { 261 if (sa.ss_family == AF_INET) { 262 *sport = ntohs(((struct sockaddr_in *) &sa)->sin_port); 263 } else if (sa.ss_family == AF_INET6) { 264 *sport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port); 265 } 266 } 267 268 memset(&sa, 0, sizeof sa); 269 salen = sizeof sa; 270 rc = getpeername(sock->fd, (struct sockaddr *) &sa, &salen); 271 if (rc != 0) { 272 SPDK_ERRLOG("getpeername() failed (errno=%d)\n", errno); 273 return -1; 274 } 275 276 rc = get_addr_str((struct sockaddr *)&sa, caddr, clen); 277 if (rc != 0) { 278 SPDK_ERRLOG("getnameinfo() failed (errno=%d)\n", errno); 279 return -1; 280 } 281 282 if (cport) { 283 if (sa.ss_family == AF_INET) { 284 *cport = ntohs(((struct sockaddr_in *) &sa)->sin_port); 285 } else if (sa.ss_family == AF_INET6) { 286 *cport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port); 287 } 288 } 289 290 return 0; 291 } 292 293 enum uring_sock_create_type { 294 SPDK_SOCK_CREATE_LISTEN, 295 SPDK_SOCK_CREATE_CONNECT, 296 }; 297 298 static int 299 uring_sock_alloc_pipe(struct spdk_uring_sock *sock, int sz) 300 { 301 uint8_t *new_buf; 302 struct spdk_pipe *new_pipe; 303 struct iovec siov[2]; 304 struct iovec diov[2]; 305 int sbytes; 306 ssize_t bytes; 307 int rc; 308 309 if (sock->recv_buf_sz == sz) { 310 return 0; 311 } 312 313 /* If the new size is 0, just free the pipe */ 314 if (sz == 0) { 315 spdk_pipe_destroy(sock->recv_pipe); 316 free(sock->recv_buf); 317 sock->recv_pipe = NULL; 318 sock->recv_buf = NULL; 319 return 0; 320 } else if (sz < MIN_SOCK_PIPE_SIZE) { 321 SPDK_ERRLOG("The size of the pipe must be larger than %d\n", MIN_SOCK_PIPE_SIZE); 322 return -1; 323 } 324 325 /* Round up to next 64 byte multiple */ 326 rc = posix_memalign((void **)&new_buf, 64, sz); 327 if (rc != 0) { 328 SPDK_ERRLOG("socket recv buf allocation failed\n"); 329 return -ENOMEM; 330 } 331 memset(new_buf, 0, sz); 332 333 new_pipe = spdk_pipe_create(new_buf, sz); 334 if (new_pipe == NULL) { 335 SPDK_ERRLOG("socket pipe allocation failed\n"); 336 free(new_buf); 337 return -ENOMEM; 338 } 339 340 if (sock->recv_pipe != NULL) { 341 /* Pull all of the data out of the old pipe */ 342 sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov); 343 if (sbytes > sz) { 344 /* Too much data to fit into the new pipe size */ 345 spdk_pipe_destroy(new_pipe); 346 free(new_buf); 347 return -EINVAL; 348 } 349 350 sbytes = spdk_pipe_writer_get_buffer(new_pipe, sz, diov); 351 assert(sbytes == sz); 352 353 bytes = spdk_iovcpy(siov, 2, diov, 2); 354 spdk_pipe_writer_advance(new_pipe, bytes); 355 356 spdk_pipe_destroy(sock->recv_pipe); 357 free(sock->recv_buf); 358 } 359 360 sock->recv_buf_sz = sz; 361 sock->recv_buf = new_buf; 362 sock->recv_pipe = new_pipe; 363 364 return 0; 365 } 366 367 static int 368 uring_sock_set_recvbuf(struct spdk_sock *_sock, int sz) 369 { 370 struct spdk_uring_sock *sock = __uring_sock(_sock); 371 int min_size; 372 int rc; 373 374 assert(sock != NULL); 375 376 if (_sock->impl_opts.enable_recv_pipe) { 377 rc = uring_sock_alloc_pipe(sock, sz); 378 if (rc) { 379 SPDK_ERRLOG("unable to allocate sufficient recvbuf with sz=%d on sock=%p\n", sz, _sock); 380 return rc; 381 } 382 } 383 384 /* Set kernel buffer size to be at least MIN_SO_RCVBUF_SIZE and 385 * g_spdk_uring_sock_impl_opts.recv_buf_size. 
*/ 386 min_size = spdk_max(MIN_SO_RCVBUF_SIZE, g_spdk_uring_sock_impl_opts.recv_buf_size); 387 388 if (sz < min_size) { 389 sz = min_size; 390 } 391 392 rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVBUF, &sz, sizeof(sz)); 393 if (rc < 0) { 394 return rc; 395 } 396 397 _sock->impl_opts.recv_buf_size = sz; 398 399 return 0; 400 } 401 402 static int 403 uring_sock_set_sendbuf(struct spdk_sock *_sock, int sz) 404 { 405 struct spdk_uring_sock *sock = __uring_sock(_sock); 406 int min_size; 407 int rc; 408 409 assert(sock != NULL); 410 411 /* Set kernel buffer size to be at least MIN_SO_SNDBUF_SIZE and 412 * g_spdk_uring_sock_impl_opts.send_buf_size. */ 413 min_size = spdk_max(MIN_SO_SNDBUF_SIZE, g_spdk_uring_sock_impl_opts.send_buf_size); 414 415 if (sz < min_size) { 416 sz = min_size; 417 } 418 419 rc = setsockopt(sock->fd, SOL_SOCKET, SO_SNDBUF, &sz, sizeof(sz)); 420 if (rc < 0) { 421 return rc; 422 } 423 424 _sock->impl_opts.send_buf_size = sz; 425 426 return 0; 427 } 428 429 static struct spdk_uring_sock * 430 uring_sock_alloc(int fd, struct spdk_sock_impl_opts *impl_opts, bool enable_zero_copy) 431 { 432 struct spdk_uring_sock *sock; 433 #if defined(__linux__) 434 int flag; 435 int rc; 436 #endif 437 438 sock = calloc(1, sizeof(*sock)); 439 if (sock == NULL) { 440 SPDK_ERRLOG("sock allocation failed\n"); 441 return NULL; 442 } 443 444 sock->fd = fd; 445 memcpy(&sock->base.impl_opts, impl_opts, sizeof(*impl_opts)); 446 447 STAILQ_INIT(&sock->recv_stream); 448 449 #if defined(__linux__) 450 flag = 1; 451 452 if (sock->base.impl_opts.enable_quickack) { 453 rc = setsockopt(sock->fd, IPPROTO_TCP, TCP_QUICKACK, &flag, sizeof(flag)); 454 if (rc != 0) { 455 SPDK_ERRLOG("Failed to set TCP_QUICKACK\n"); 456 } 457 } 458 459 spdk_sock_get_placement_id(sock->fd, sock->base.impl_opts.enable_placement_id, 460 &sock->placement_id); 461 #ifdef SPDK_ZEROCOPY 462 /* Try to turn on zero copy sends */ 463 flag = 1; 464 465 if (enable_zero_copy) { 466 rc = setsockopt(sock->fd, SOL_SOCKET, SO_ZEROCOPY, &flag, sizeof(flag)); 467 if (rc == 0) { 468 sock->zcopy = true; 469 sock->zcopy_send_flags = MSG_ZEROCOPY; 470 } 471 } 472 #endif 473 #endif 474 475 return sock; 476 } 477 478 static struct spdk_sock * 479 uring_sock_create(const char *ip, int port, 480 enum uring_sock_create_type type, 481 struct spdk_sock_opts *opts) 482 { 483 struct spdk_uring_sock *sock; 484 struct spdk_sock_impl_opts impl_opts; 485 char buf[MAX_TMPBUF]; 486 char portnum[PORTNUMLEN]; 487 char *p; 488 struct addrinfo hints, *res, *res0; 489 int fd, flag; 490 int val = 1; 491 int rc; 492 bool enable_zcopy_impl_opts = false; 493 bool enable_zcopy_user_opts = true; 494 495 assert(opts != NULL); 496 uring_opts_get_impl_opts(opts, &impl_opts); 497 498 if (ip == NULL) { 499 return NULL; 500 } 501 if (ip[0] == '[') { 502 snprintf(buf, sizeof(buf), "%s", ip + 1); 503 p = strchr(buf, ']'); 504 if (p != NULL) { 505 *p = '\0'; 506 } 507 ip = (const char *) &buf[0]; 508 } 509 510 snprintf(portnum, sizeof portnum, "%d", port); 511 memset(&hints, 0, sizeof hints); 512 hints.ai_family = PF_UNSPEC; 513 hints.ai_socktype = SOCK_STREAM; 514 hints.ai_flags = AI_NUMERICSERV; 515 hints.ai_flags |= AI_PASSIVE; 516 hints.ai_flags |= AI_NUMERICHOST; 517 rc = getaddrinfo(ip, portnum, &hints, &res0); 518 if (rc != 0) { 519 SPDK_ERRLOG("getaddrinfo() failed %s (%d)\n", gai_strerror(rc), rc); 520 return NULL; 521 } 522 523 /* try listen */ 524 fd = -1; 525 for (res = res0; res != NULL; res = res->ai_next) { 526 retry: 527 fd = socket(res->ai_family, res->ai_socktype,
res->ai_protocol); 528 if (fd < 0) { 529 /* error */ 530 continue; 531 } 532 533 val = impl_opts.recv_buf_size; 534 rc = setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &val, sizeof val); 535 if (rc) { 536 /* Not fatal */ 537 } 538 539 val = impl_opts.send_buf_size; 540 rc = setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &val, sizeof val); 541 if (rc) { 542 /* Not fatal */ 543 } 544 545 rc = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof val); 546 if (rc != 0) { 547 close(fd); 548 fd = -1; 549 /* error */ 550 continue; 551 } 552 rc = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof val); 553 if (rc != 0) { 554 close(fd); 555 fd = -1; 556 /* error */ 557 continue; 558 } 559 560 if (opts->ack_timeout) { 561 #if defined(__linux__) 562 val = opts->ack_timeout; 563 rc = setsockopt(fd, IPPROTO_TCP, TCP_USER_TIMEOUT, &val, sizeof val); 564 if (rc != 0) { 565 close(fd); 566 fd = -1; 567 /* error */ 568 continue; 569 } 570 #else 571 SPDK_WARNLOG("TCP_USER_TIMEOUT is not supported.\n"); 572 #endif 573 } 574 575 576 577 #if defined(SO_PRIORITY) 578 if (opts != NULL && opts->priority) { 579 rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &opts->priority, sizeof val); 580 if (rc != 0) { 581 close(fd); 582 fd = -1; 583 /* error */ 584 continue; 585 } 586 } 587 #endif 588 if (res->ai_family == AF_INET6) { 589 rc = setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &val, sizeof val); 590 if (rc != 0) { 591 close(fd); 592 fd = -1; 593 /* error */ 594 continue; 595 } 596 } 597 598 if (type == SPDK_SOCK_CREATE_LISTEN) { 599 rc = bind(fd, res->ai_addr, res->ai_addrlen); 600 if (rc != 0) { 601 SPDK_ERRLOG("bind() failed at port %d, errno = %d\n", port, errno); 602 switch (errno) { 603 case EINTR: 604 /* interrupted? */ 605 close(fd); 606 goto retry; 607 case EADDRNOTAVAIL: 608 SPDK_ERRLOG("IP address %s not available. 
" 609 "Verify IP address in config file " 610 "and make sure setup script is " 611 "run before starting spdk app.\n", ip); 612 /* FALLTHROUGH */ 613 default: 614 /* try next family */ 615 close(fd); 616 fd = -1; 617 continue; 618 } 619 } 620 /* bind OK */ 621 rc = listen(fd, 512); 622 if (rc != 0) { 623 SPDK_ERRLOG("listen() failed, errno = %d\n", errno); 624 close(fd); 625 fd = -1; 626 break; 627 } 628 629 flag = fcntl(fd, F_GETFL); 630 if (fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0) { 631 SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno); 632 close(fd); 633 fd = -1; 634 break; 635 } 636 637 enable_zcopy_impl_opts = impl_opts.enable_zerocopy_send_server; 638 } else if (type == SPDK_SOCK_CREATE_CONNECT) { 639 rc = connect(fd, res->ai_addr, res->ai_addrlen); 640 if (rc != 0) { 641 SPDK_ERRLOG("connect() failed, errno = %d\n", errno); 642 /* try next family */ 643 close(fd); 644 fd = -1; 645 continue; 646 } 647 648 flag = fcntl(fd, F_GETFL); 649 if (fcntl(fd, F_SETFL, flag & ~O_NONBLOCK) < 0) { 650 SPDK_ERRLOG("fcntl can't set blocking mode for socket, fd: %d (%d)\n", fd, errno); 651 close(fd); 652 fd = -1; 653 break; 654 } 655 656 enable_zcopy_impl_opts = impl_opts.enable_zerocopy_send_client; 657 } 658 break; 659 } 660 freeaddrinfo(res0); 661 662 if (fd < 0) { 663 return NULL; 664 } 665 666 enable_zcopy_user_opts = opts->zcopy && !sock_is_loopback(fd); 667 sock = uring_sock_alloc(fd, &impl_opts, enable_zcopy_user_opts && enable_zcopy_impl_opts); 668 if (sock == NULL) { 669 SPDK_ERRLOG("sock allocation failed\n"); 670 close(fd); 671 return NULL; 672 } 673 674 return &sock->base; 675 } 676 677 static struct spdk_sock * 678 uring_sock_listen(const char *ip, int port, struct spdk_sock_opts *opts) 679 { 680 return uring_sock_create(ip, port, SPDK_SOCK_CREATE_LISTEN, opts); 681 } 682 683 static struct spdk_sock * 684 uring_sock_connect(const char *ip, int port, struct spdk_sock_opts *opts) 685 { 686 return uring_sock_create(ip, port, SPDK_SOCK_CREATE_CONNECT, opts); 687 } 688 689 static struct spdk_sock * 690 uring_sock_accept(struct spdk_sock *_sock) 691 { 692 struct spdk_uring_sock *sock = __uring_sock(_sock); 693 struct sockaddr_storage sa; 694 socklen_t salen; 695 int rc, fd; 696 struct spdk_uring_sock *new_sock; 697 int flag; 698 699 memset(&sa, 0, sizeof(sa)); 700 salen = sizeof(sa); 701 702 assert(sock != NULL); 703 704 rc = accept(sock->fd, (struct sockaddr *)&sa, &salen); 705 706 if (rc == -1) { 707 return NULL; 708 } 709 710 fd = rc; 711 712 flag = fcntl(fd, F_GETFL); 713 if ((flag & O_NONBLOCK) && (fcntl(fd, F_SETFL, flag & ~O_NONBLOCK) < 0)) { 714 SPDK_ERRLOG("fcntl can't set blocking mode for socket, fd: %d (%d)\n", fd, errno); 715 close(fd); 716 return NULL; 717 } 718 719 #if defined(SO_PRIORITY) 720 /* The priority is not inherited, so call this function again */ 721 if (sock->base.opts.priority) { 722 rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &sock->base.opts.priority, sizeof(int)); 723 if (rc != 0) { 724 close(fd); 725 return NULL; 726 } 727 } 728 #endif 729 730 new_sock = uring_sock_alloc(fd, &sock->base.impl_opts, sock->zcopy); 731 if (new_sock == NULL) { 732 close(fd); 733 return NULL; 734 } 735 736 return &new_sock->base; 737 } 738 739 static int 740 uring_sock_close(struct spdk_sock *_sock) 741 { 742 struct spdk_uring_sock *sock = __uring_sock(_sock); 743 744 assert(TAILQ_EMPTY(&_sock->pending_reqs)); 745 assert(sock->group == NULL); 746 747 /* If the socket fails to close, the best choice is to 748 * leak the fd but continue to free the 
rest of the sock 749 * memory. */ 750 close(sock->fd); 751 752 spdk_pipe_destroy(sock->recv_pipe); 753 free(sock->recv_buf); 754 free(sock); 755 756 return 0; 757 } 758 759 static ssize_t 760 uring_sock_recv_from_pipe(struct spdk_uring_sock *sock, struct iovec *diov, int diovcnt) 761 { 762 struct iovec siov[2]; 763 int sbytes; 764 ssize_t bytes; 765 struct spdk_uring_sock_group_impl *group; 766 767 sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov); 768 if (sbytes < 0) { 769 errno = EINVAL; 770 return -1; 771 } else if (sbytes == 0) { 772 errno = EAGAIN; 773 return -1; 774 } 775 776 bytes = spdk_iovcpy(siov, 2, diov, diovcnt); 777 778 if (bytes == 0) { 779 /* The only way this happens is if diov is 0 length */ 780 errno = EINVAL; 781 return -1; 782 } 783 784 spdk_pipe_reader_advance(sock->recv_pipe, bytes); 785 786 /* If we drained the pipe, take it off the level-triggered list */ 787 if (sock->base.group_impl && spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) { 788 group = __uring_group_impl(sock->base.group_impl); 789 TAILQ_REMOVE(&group->pending_recv, sock, link); 790 sock->pending_recv = false; 791 } 792 793 return bytes; 794 } 795 796 static inline ssize_t 797 sock_readv(int fd, struct iovec *iov, int iovcnt) 798 { 799 struct msghdr msg = { 800 .msg_iov = iov, 801 .msg_iovlen = iovcnt, 802 }; 803 804 return recvmsg(fd, &msg, MSG_DONTWAIT); 805 } 806 807 static inline ssize_t 808 uring_sock_read(struct spdk_uring_sock *sock) 809 { 810 struct iovec iov[2]; 811 int bytes; 812 struct spdk_uring_sock_group_impl *group; 813 814 bytes = spdk_pipe_writer_get_buffer(sock->recv_pipe, sock->recv_buf_sz, iov); 815 816 if (bytes > 0) { 817 bytes = sock_readv(sock->fd, iov, 2); 818 if (bytes > 0) { 819 spdk_pipe_writer_advance(sock->recv_pipe, bytes); 820 if (sock->base.group_impl && !sock->pending_recv) { 821 group = __uring_group_impl(sock->base.group_impl); 822 TAILQ_INSERT_TAIL(&group->pending_recv, sock, link); 823 sock->pending_recv = true; 824 } 825 } 826 } 827 828 return bytes; 829 } 830 831 static int 832 uring_sock_recv_next(struct spdk_sock *_sock, void **_buf, void **ctx) 833 { 834 struct spdk_uring_sock *sock = __uring_sock(_sock); 835 struct spdk_uring_sock_group_impl *group; 836 struct spdk_uring_buf_tracker *tr; 837 838 if (sock->connection_status < 0) { 839 errno = -sock->connection_status; 840 return -1; 841 } 842 843 if (sock->recv_pipe != NULL) { 844 errno = ENOTSUP; 845 return -1; 846 } 847 848 group = __uring_group_impl(_sock->group_impl); 849 850 tr = STAILQ_FIRST(&sock->recv_stream); 851 if (tr == NULL) { 852 if (sock->group->buf_ring_count > 0) { 853 /* There are buffers posted, but data hasn't arrived. */ 854 errno = EAGAIN; 855 } else { 856 /* There are no buffers posted, so this won't ever 857 * make forward progress. 
*/ 858 errno = ENOBUFS; 859 } 860 return -1; 861 } 862 assert(sock->pending_recv == true); 863 assert(tr->buf != NULL); 864 865 *_buf = tr->buf + sock->recv_offset; 866 *ctx = tr->ctx; 867 868 STAILQ_REMOVE_HEAD(&sock->recv_stream, link); 869 STAILQ_INSERT_HEAD(&group->free_trackers, tr, link); 870 871 if (STAILQ_EMPTY(&sock->recv_stream)) { 872 sock->pending_recv = false; 873 TAILQ_REMOVE(&group->pending_recv, sock, link); 874 } 875 876 return tr->len - sock->recv_offset; 877 } 878 879 static ssize_t 880 uring_sock_readv_no_pipe(struct spdk_sock *_sock, struct iovec *iovs, int iovcnt) 881 { 882 struct spdk_uring_sock *sock = __uring_sock(_sock); 883 struct spdk_uring_buf_tracker *tr; 884 struct iovec iov; 885 ssize_t total, len; 886 int i; 887 888 if (sock->connection_status < 0) { 889 errno = -sock->connection_status; 890 return -1; 891 } 892 893 if (_sock->group_impl == NULL) { 894 /* If not in a group just read from the socket the regular way. */ 895 return sock_readv(sock->fd, iovs, iovcnt); 896 } 897 898 if (STAILQ_EMPTY(&sock->recv_stream)) { 899 if (sock->group->buf_ring_count == 0) { 900 /* If the user hasn't posted any buffers, read from the socket 901 * directly. */ 902 903 if (sock->pending_recv) { 904 sock->pending_recv = false; 905 TAILQ_REMOVE(&(__uring_group_impl(_sock->group_impl))->pending_recv, sock, link); 906 } 907 908 return sock_readv(sock->fd, iovs, iovcnt); 909 } 910 911 errno = EAGAIN; 912 return -1; 913 } 914 915 total = 0; 916 for (i = 0; i < iovcnt; i++) { 917 /* Copy to stack so we can change it */ 918 iov = iovs[i]; 919 920 tr = STAILQ_FIRST(&sock->recv_stream); 921 while (tr != NULL) { 922 len = spdk_min(iov.iov_len, tr->len - sock->recv_offset); 923 memcpy(iov.iov_base, tr->buf + sock->recv_offset, len); 924 925 total += len; 926 sock->recv_offset += len; 927 iov.iov_base += len; 928 iov.iov_len -= len; 929 930 if (sock->recv_offset == tr->len) { 931 sock->recv_offset = 0; 932 STAILQ_REMOVE_HEAD(&sock->recv_stream, link); 933 STAILQ_INSERT_HEAD(&sock->group->free_trackers, tr, link); 934 spdk_sock_group_provide_buf(sock->group->base.group, tr->buf, tr->buflen, tr->ctx); 935 tr = STAILQ_FIRST(&sock->recv_stream); 936 } 937 938 if (iov.iov_len == 0) { 939 break; 940 } 941 } 942 } 943 944 if (STAILQ_EMPTY(&sock->recv_stream)) { 945 struct spdk_uring_sock_group_impl *group; 946 947 group = __uring_group_impl(_sock->group_impl); 948 sock->pending_recv = false; 949 TAILQ_REMOVE(&group->pending_recv, sock, link); 950 } 951 952 assert(total > 0); 953 return total; 954 } 955 956 static ssize_t 957 uring_sock_readv(struct spdk_sock *_sock, struct iovec *iov, int iovcnt) 958 { 959 struct spdk_uring_sock *sock = __uring_sock(_sock); 960 int rc, i; 961 size_t len; 962 963 if (sock->connection_status < 0) { 964 errno = -sock->connection_status; 965 return -1; 966 } 967 968 if (sock->recv_pipe == NULL) { 969 return uring_sock_readv_no_pipe(_sock, iov, iovcnt); 970 } 971 972 len = 0; 973 for (i = 0; i < iovcnt; i++) { 974 len += iov[i].iov_len; 975 } 976 977 if (spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) { 978 /* If the user is receiving a sufficiently large amount of data, 979 * receive directly to their buffers. 
*/ 980 if (len >= MIN_SOCK_PIPE_SIZE) { 981 return sock_readv(sock->fd, iov, iovcnt); 982 } 983 984 /* Otherwise, do a big read into our pipe */ 985 rc = uring_sock_read(sock); 986 if (rc <= 0) { 987 return rc; 988 } 989 } 990 991 return uring_sock_recv_from_pipe(sock, iov, iovcnt); 992 } 993 994 static ssize_t 995 uring_sock_recv(struct spdk_sock *sock, void *buf, size_t len) 996 { 997 struct iovec iov[1]; 998 999 iov[0].iov_base = buf; 1000 iov[0].iov_len = len; 1001 1002 return uring_sock_readv(sock, iov, 1); 1003 } 1004 1005 static ssize_t 1006 uring_sock_writev(struct spdk_sock *_sock, struct iovec *iov, int iovcnt) 1007 { 1008 struct spdk_uring_sock *sock = __uring_sock(_sock); 1009 struct msghdr msg = { 1010 .msg_iov = iov, 1011 .msg_iovlen = iovcnt, 1012 }; 1013 1014 if (sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) { 1015 errno = EAGAIN; 1016 return -1; 1017 } 1018 1019 return sendmsg(sock->fd, &msg, MSG_DONTWAIT); 1020 } 1021 1022 static ssize_t 1023 sock_request_advance_offset(struct spdk_sock_request *req, ssize_t rc) 1024 { 1025 unsigned int offset; 1026 size_t len; 1027 int i; 1028 1029 offset = req->internal.offset; 1030 for (i = 0; i < req->iovcnt; i++) { 1031 /* Advance by the offset first */ 1032 if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) { 1033 offset -= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len; 1034 continue; 1035 } 1036 1037 /* Calculate the remaining length of this element */ 1038 len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset; 1039 1040 if (len > (size_t)rc) { 1041 req->internal.offset += rc; 1042 return -1; 1043 } 1044 1045 offset = 0; 1046 req->internal.offset += len; 1047 rc -= len; 1048 } 1049 1050 return rc; 1051 } 1052 1053 static int 1054 sock_complete_write_reqs(struct spdk_sock *_sock, ssize_t rc, bool is_zcopy) 1055 { 1056 struct spdk_uring_sock *sock = __uring_sock(_sock); 1057 struct spdk_sock_request *req; 1058 int retval; 1059 1060 if (is_zcopy) { 1061 /* Handling overflow case, because we use psock->sendmsg_idx - 1 for the 1062 * req->internal.offset, so sendmsg_idx should not be zero */ 1063 if (spdk_unlikely(sock->sendmsg_idx == UINT32_MAX)) { 1064 sock->sendmsg_idx = 1; 1065 } else { 1066 sock->sendmsg_idx++; 1067 } 1068 } 1069 1070 /* Consume the requests that were actually written */ 1071 req = TAILQ_FIRST(&_sock->queued_reqs); 1072 while (req) { 1073 /* req->internal.is_zcopy is true when the whole req or part of it is sent with zerocopy */ 1074 req->internal.is_zcopy = is_zcopy; 1075 1076 rc = sock_request_advance_offset(req, rc); 1077 if (rc < 0) { 1078 /* This element was partially sent. */ 1079 return 0; 1080 } 1081 1082 /* Handled a full request. */ 1083 spdk_sock_request_pend(_sock, req); 1084 1085 if (!req->internal.is_zcopy && req == TAILQ_FIRST(&_sock->pending_reqs)) { 1086 retval = spdk_sock_request_put(_sock, req, 0); 1087 if (retval) { 1088 return retval; 1089 } 1090 } else { 1091 /* Re-use the offset field to hold the sendmsg call index. The 1092 * index is 0 based, so subtract one here because we've already 1093 * incremented above. 
*/ 1094 req->internal.offset = sock->sendmsg_idx - 1; 1095 } 1096 1097 if (rc == 0) { 1098 break; 1099 } 1100 1101 req = TAILQ_FIRST(&_sock->queued_reqs); 1102 } 1103 1104 return 0; 1105 } 1106 1107 #ifdef SPDK_ZEROCOPY 1108 static int 1109 _sock_check_zcopy(struct spdk_sock *_sock, int status) 1110 { 1111 struct spdk_uring_sock *sock = __uring_sock(_sock); 1112 ssize_t rc; 1113 struct sock_extended_err *serr; 1114 struct cmsghdr *cm; 1115 uint32_t idx; 1116 struct spdk_sock_request *req, *treq; 1117 bool found; 1118 1119 assert(sock->zcopy == true); 1120 if (spdk_unlikely(status < 0)) { 1121 if (!TAILQ_EMPTY(&_sock->pending_reqs)) { 1122 SPDK_ERRLOG("Attempting to receive from ERRQUEUE yielded error, but pending list still has orphaned entries, status = %d\n", 1123 status); 1124 } else { 1125 SPDK_WARNLOG("Recvmsg yielded an error!\n"); 1126 } 1127 return 0; 1128 } 1129 1130 cm = CMSG_FIRSTHDR(&sock->errqueue_task.msg); 1131 if (!((cm->cmsg_level == SOL_IP && cm->cmsg_type == IP_RECVERR) || 1132 (cm->cmsg_level == SOL_IPV6 && cm->cmsg_type == IPV6_RECVERR))) { 1133 SPDK_WARNLOG("Unexpected cmsg level or type!\n"); 1134 return 0; 1135 } 1136 1137 serr = (struct sock_extended_err *)CMSG_DATA(cm); 1138 if (serr->ee_errno != 0 || serr->ee_origin != SO_EE_ORIGIN_ZEROCOPY) { 1139 SPDK_WARNLOG("Unexpected extended error origin\n"); 1140 return 0; 1141 } 1142 1143 /* Most of the time, the pending_reqs list is in the exact 1144 * order we need such that all of the requests to complete are 1145 * in order, in the front. It is guaranteed that all requests 1146 * belonging to the same sendmsg call are sequential, so once 1147 * we encounter one match we can stop looping as soon as a 1148 * non-match is found. 1149 */ 1150 for (idx = serr->ee_info; idx <= serr->ee_data; idx++) { 1151 found = false; 1152 TAILQ_FOREACH_SAFE(req, &_sock->pending_reqs, internal.link, treq) { 1153 if (!req->internal.is_zcopy) { 1154 /* This wasn't a zcopy request.
It was just waiting in line to complete */ 1155 rc = spdk_sock_request_put(_sock, req, 0); 1156 if (rc < 0) { 1157 return rc; 1158 } 1159 } else if (req->internal.offset == idx) { 1160 found = true; 1161 rc = spdk_sock_request_put(_sock, req, 0); 1162 if (rc < 0) { 1163 return rc; 1164 } 1165 } else if (found) { 1166 break; 1167 } 1168 } 1169 } 1170 1171 return 0; 1172 } 1173 1174 static void 1175 _sock_prep_errqueue(struct spdk_sock *_sock) 1176 { 1177 struct spdk_uring_sock *sock = __uring_sock(_sock); 1178 struct spdk_uring_task *task = &sock->errqueue_task; 1179 struct io_uring_sqe *sqe; 1180 1181 if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS) { 1182 return; 1183 } 1184 1185 if (sock->pending_group_remove) { 1186 return; 1187 } 1188 1189 assert(sock->group != NULL); 1190 sock->group->io_queued++; 1191 1192 sqe = io_uring_get_sqe(&sock->group->uring); 1193 io_uring_prep_recvmsg(sqe, sock->fd, &task->msg, MSG_ERRQUEUE); 1194 io_uring_sqe_set_data(sqe, task); 1195 task->status = SPDK_URING_SOCK_TASK_IN_PROCESS; 1196 } 1197 1198 #endif 1199 1200 static void 1201 _sock_flush(struct spdk_sock *_sock) 1202 { 1203 struct spdk_uring_sock *sock = __uring_sock(_sock); 1204 struct spdk_uring_task *task = &sock->write_task; 1205 uint32_t iovcnt; 1206 struct io_uring_sqe *sqe; 1207 int flags; 1208 1209 if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS) { 1210 return; 1211 } 1212 1213 #ifdef SPDK_ZEROCOPY 1214 if (sock->zcopy) { 1215 flags = MSG_DONTWAIT | sock->zcopy_send_flags; 1216 } else 1217 #endif 1218 { 1219 flags = MSG_DONTWAIT; 1220 } 1221 1222 iovcnt = spdk_sock_prep_reqs(&sock->base, task->iovs, task->iov_cnt, &task->last_req, &flags); 1223 if (!iovcnt) { 1224 return; 1225 } 1226 1227 task->iov_cnt = iovcnt; 1228 assert(sock->group != NULL); 1229 task->msg.msg_iov = task->iovs; 1230 task->msg.msg_iovlen = task->iov_cnt; 1231 #ifdef SPDK_ZEROCOPY 1232 task->is_zcopy = (flags & MSG_ZEROCOPY) ? 
true : false; 1233 #endif 1234 sock->group->io_queued++; 1235 1236 sqe = io_uring_get_sqe(&sock->group->uring); 1237 io_uring_prep_sendmsg(sqe, sock->fd, &sock->write_task.msg, flags); 1238 io_uring_sqe_set_data(sqe, task); 1239 task->status = SPDK_URING_SOCK_TASK_IN_PROCESS; 1240 } 1241 1242 static void 1243 _sock_prep_read(struct spdk_sock *_sock) 1244 { 1245 struct spdk_uring_sock *sock = __uring_sock(_sock); 1246 struct spdk_uring_task *task = &sock->read_task; 1247 struct io_uring_sqe *sqe; 1248 1249 /* Do not prepare read event */ 1250 if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS) { 1251 return; 1252 } 1253 1254 if (sock->pending_group_remove) { 1255 return; 1256 } 1257 1258 assert(sock->group != NULL); 1259 sock->group->io_queued++; 1260 1261 sqe = io_uring_get_sqe(&sock->group->uring); 1262 io_uring_prep_recv(sqe, sock->fd, NULL, URING_MAX_RECV_SIZE, 0); 1263 sqe->buf_group = URING_BUF_GROUP_ID; 1264 sqe->flags |= IOSQE_BUFFER_SELECT; 1265 io_uring_sqe_set_data(sqe, task); 1266 task->status = SPDK_URING_SOCK_TASK_IN_PROCESS; 1267 } 1268 1269 static void 1270 _sock_prep_cancel_task(struct spdk_sock *_sock, void *user_data) 1271 { 1272 struct spdk_uring_sock *sock = __uring_sock(_sock); 1273 struct spdk_uring_task *task = &sock->cancel_task; 1274 struct io_uring_sqe *sqe; 1275 1276 if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS) { 1277 return; 1278 } 1279 1280 assert(sock->group != NULL); 1281 sock->group->io_queued++; 1282 1283 sqe = io_uring_get_sqe(&sock->group->uring); 1284 io_uring_prep_cancel(sqe, user_data, 0); 1285 io_uring_sqe_set_data(sqe, task); 1286 task->status = SPDK_URING_SOCK_TASK_IN_PROCESS; 1287 } 1288 1289 static void 1290 uring_sock_fail(struct spdk_uring_sock *sock, int status) 1291 { 1292 struct spdk_uring_sock_group_impl *group = sock->group; 1293 int rc; 1294 1295 sock->connection_status = status; 1296 rc = spdk_sock_abort_requests(&sock->base); 1297 1298 /* The user needs to be notified that this socket is dead. */ 1299 if (rc == 0 && sock->base.cb_fn != NULL && 1300 sock->pending_recv == false) { 1301 sock->pending_recv = true; 1302 TAILQ_INSERT_TAIL(&group->pending_recv, sock, link); 1303 } 1304 } 1305 1306 static int 1307 sock_uring_group_reap(struct spdk_uring_sock_group_impl *group, int max, int max_read_events, 1308 struct spdk_sock **socks) 1309 { 1310 int i, count, ret; 1311 struct io_uring_cqe *cqe; 1312 struct spdk_uring_sock *sock, *tmp; 1313 struct spdk_uring_task *task; 1314 int status, bid, flags; 1315 bool is_zcopy; 1316 1317 for (i = 0; i < max; i++) { 1318 ret = io_uring_peek_cqe(&group->uring, &cqe); 1319 if (ret != 0) { 1320 break; 1321 } 1322 1323 if (cqe == NULL) { 1324 break; 1325 } 1326 1327 task = (struct spdk_uring_task *)cqe->user_data; 1328 assert(task != NULL); 1329 sock = task->sock; 1330 assert(sock != NULL); 1331 assert(sock->group != NULL); 1332 assert(sock->group == group); 1333 sock->group->io_inflight--; 1334 sock->group->io_avail++; 1335 status = cqe->res; 1336 flags = cqe->flags; 1337 io_uring_cqe_seen(&group->uring, cqe); 1338 1339 task->status = SPDK_URING_SOCK_TASK_NOT_IN_USE; 1340 1341 switch (task->type) { 1342 case URING_TASK_READ: 1343 if (status == -EAGAIN || status == -EWOULDBLOCK) { 1344 /* This likely shouldn't happen, but would indicate that the 1345 * kernel didn't have enough resources to queue a task internally. 
*/ 1346 _sock_prep_read(&sock->base); 1347 } else if (status == -ECANCELED) { 1348 continue; 1349 } else if (status == -ENOBUFS) { 1350 /* There's data in the socket but the user hasn't provided any buffers. 1351 * We need to notify the user that the socket has data pending. */ 1352 if (sock->base.cb_fn != NULL && 1353 sock->pending_recv == false) { 1354 sock->pending_recv = true; 1355 TAILQ_INSERT_TAIL(&group->pending_recv, sock, link); 1356 } 1357 1358 _sock_prep_read(&sock->base); 1359 } else if (spdk_unlikely(status <= 0)) { 1360 uring_sock_fail(sock, status < 0 ? status : -ECONNRESET); 1361 } else { 1362 struct spdk_uring_buf_tracker *tracker; 1363 1364 assert((flags & IORING_CQE_F_BUFFER) != 0); 1365 1366 bid = flags >> IORING_CQE_BUFFER_SHIFT; 1367 tracker = &group->trackers[bid]; 1368 1369 assert(tracker->buf != NULL); 1370 assert(tracker->len != 0); 1371 1372 /* Append this data to the stream */ 1373 tracker->len = status; 1374 STAILQ_INSERT_TAIL(&sock->recv_stream, tracker, link); 1375 assert(group->buf_ring_count > 0); 1376 group->buf_ring_count--; 1377 1378 if (sock->base.cb_fn != NULL && 1379 sock->pending_recv == false) { 1380 sock->pending_recv = true; 1381 TAILQ_INSERT_TAIL(&group->pending_recv, sock, link); 1382 } 1383 1384 _sock_prep_read(&sock->base); 1385 } 1386 break; 1387 case URING_TASK_WRITE: 1388 if (status == -EAGAIN || status == -EWOULDBLOCK || 1389 (status == -ENOBUFS && sock->zcopy) || 1390 status == -ECANCELED) { 1391 continue; 1392 } else if (spdk_unlikely(status < 0)) { 1393 uring_sock_fail(sock, status); 1394 } else { 1395 task->last_req = NULL; 1396 task->iov_cnt = 0; 1397 is_zcopy = task->is_zcopy; 1398 task->is_zcopy = false; 1399 sock_complete_write_reqs(&sock->base, status, is_zcopy); 1400 } 1401 1402 break; 1403 #ifdef SPDK_ZEROCOPY 1404 case URING_TASK_ERRQUEUE: 1405 if (status == -EAGAIN || status == -EWOULDBLOCK) { 1406 _sock_prep_errqueue(&sock->base); 1407 } else if (status == -ECANCELED) { 1408 continue; 1409 } else if (spdk_unlikely(status < 0)) { 1410 uring_sock_fail(sock, status); 1411 } else { 1412 _sock_check_zcopy(&sock->base, status); 1413 _sock_prep_errqueue(&sock->base); 1414 } 1415 break; 1416 #endif 1417 case URING_TASK_CANCEL: 1418 /* Do nothing */ 1419 break; 1420 default: 1421 SPDK_UNREACHABLE(); 1422 } 1423 } 1424 1425 if (!socks) { 1426 return 0; 1427 } 1428 count = 0; 1429 TAILQ_FOREACH_SAFE(sock, &group->pending_recv, link, tmp) { 1430 if (count == max_read_events) { 1431 break; 1432 } 1433 1434 /* If the socket's cb_fn is NULL, do not add it to socks array */ 1435 if (spdk_unlikely(sock->base.cb_fn == NULL)) { 1436 assert(sock->pending_recv == true); 1437 sock->pending_recv = false; 1438 TAILQ_REMOVE(&group->pending_recv, sock, link); 1439 continue; 1440 } 1441 1442 socks[count++] = &sock->base; 1443 } 1444 1445 1446 /* Cycle the pending_recv list so that each time we poll things aren't 1447 * in the same order. Say we have 6 sockets in the list, named as follows: 1448 * A B C D E F 1449 * And all 6 sockets had poll events, but max_read_events is only 3. That means 1450 * sock currently points at D. We want to rearrange the list to the following: 1451 * D E F A B C 1452 * 1453 * The variables below are named according to this example to make it easier to 1454 * follow the swaps.
1455 */ 1456 if (sock != NULL) { 1457 struct spdk_uring_sock *ua, *uc, *ud, *uf; 1458 1459 /* Capture pointers to the elements we need */ 1460 ud = sock; 1461 1462 ua = TAILQ_FIRST(&group->pending_recv); 1463 if (ua == ud) { 1464 goto end; 1465 } 1466 1467 uf = TAILQ_LAST(&group->pending_recv, pending_recv_list); 1468 if (uf == ud) { 1469 TAILQ_REMOVE(&group->pending_recv, ud, link); 1470 TAILQ_INSERT_HEAD(&group->pending_recv, ud, link); 1471 goto end; 1472 } 1473 1474 uc = TAILQ_PREV(ud, pending_recv_list, link); 1475 assert(uc != NULL); 1476 1477 /* Break the link between C and D */ 1478 uc->link.tqe_next = NULL; 1479 1480 /* Connect F to A */ 1481 uf->link.tqe_next = ua; 1482 ua->link.tqe_prev = &uf->link.tqe_next; 1483 1484 /* Fix up the list first/last pointers */ 1485 group->pending_recv.tqh_first = ud; 1486 group->pending_recv.tqh_last = &uc->link.tqe_next; 1487 1488 /* D is in front of the list, make tqe prev pointer point to the head of list */ 1489 ud->link.tqe_prev = &group->pending_recv.tqh_first; 1490 } 1491 1492 end: 1493 return count; 1494 } 1495 1496 static int uring_sock_flush(struct spdk_sock *_sock); 1497 1498 static void 1499 uring_sock_writev_async(struct spdk_sock *_sock, struct spdk_sock_request *req) 1500 { 1501 struct spdk_uring_sock *sock = __uring_sock(_sock); 1502 int rc; 1503 1504 if (spdk_unlikely(sock->connection_status)) { 1505 req->cb_fn(req->cb_arg, sock->connection_status); 1506 return; 1507 } 1508 1509 spdk_sock_request_queue(_sock, req); 1510 1511 if (!sock->group) { 1512 if (_sock->queued_iovcnt >= IOV_BATCH_SIZE) { 1513 rc = uring_sock_flush(_sock); 1514 if (rc < 0 && errno != EAGAIN) { 1515 spdk_sock_abort_requests(_sock); 1516 } 1517 } 1518 } 1519 } 1520 1521 static int 1522 uring_sock_set_recvlowat(struct spdk_sock *_sock, int nbytes) 1523 { 1524 struct spdk_uring_sock *sock = __uring_sock(_sock); 1525 int val; 1526 int rc; 1527 1528 assert(sock != NULL); 1529 1530 val = nbytes; 1531 rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVLOWAT, &val, sizeof val); 1532 if (rc != 0) { 1533 return -1; 1534 } 1535 return 0; 1536 } 1537 1538 static bool 1539 uring_sock_is_ipv6(struct spdk_sock *_sock) 1540 { 1541 struct spdk_uring_sock *sock = __uring_sock(_sock); 1542 struct sockaddr_storage sa; 1543 socklen_t salen; 1544 int rc; 1545 1546 assert(sock != NULL); 1547 1548 memset(&sa, 0, sizeof sa); 1549 salen = sizeof sa; 1550 rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen); 1551 if (rc != 0) { 1552 SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno); 1553 return false; 1554 } 1555 1556 return (sa.ss_family == AF_INET6); 1557 } 1558 1559 static bool 1560 uring_sock_is_ipv4(struct spdk_sock *_sock) 1561 { 1562 struct spdk_uring_sock *sock = __uring_sock(_sock); 1563 struct sockaddr_storage sa; 1564 socklen_t salen; 1565 int rc; 1566 1567 assert(sock != NULL); 1568 1569 memset(&sa, 0, sizeof sa); 1570 salen = sizeof sa; 1571 rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen); 1572 if (rc != 0) { 1573 SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno); 1574 return false; 1575 } 1576 1577 return (sa.ss_family == AF_INET); 1578 } 1579 1580 static bool 1581 uring_sock_is_connected(struct spdk_sock *_sock) 1582 { 1583 struct spdk_uring_sock *sock = __uring_sock(_sock); 1584 uint8_t byte; 1585 int rc; 1586 1587 rc = recv(sock->fd, &byte, 1, MSG_PEEK | MSG_DONTWAIT); 1588 if (rc == 0) { 1589 return false; 1590 } 1591 1592 if (rc < 0) { 1593 if (errno == EAGAIN || errno == EWOULDBLOCK) { 1594 return true; 1595 } 1596 1597 return false; 1598 } 
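	/* recv() with MSG_PEEK saw at least one byte without consuming it, so the peer is still connected. */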
1599 1600 return true; 1601 } 1602 1603 static struct spdk_sock_group_impl * 1604 uring_sock_group_impl_get_optimal(struct spdk_sock *_sock, struct spdk_sock_group_impl *hint) 1605 { 1606 struct spdk_uring_sock *sock = __uring_sock(_sock); 1607 struct spdk_sock_group_impl *group; 1608 1609 if (sock->placement_id != -1) { 1610 spdk_sock_map_lookup(&g_map, sock->placement_id, &group, hint); 1611 return group; 1612 } 1613 1614 return NULL; 1615 } 1616 1617 static int 1618 uring_sock_group_impl_buf_pool_free(struct spdk_uring_sock_group_impl *group_impl) 1619 { 1620 if (group_impl->buf_ring) { 1621 io_uring_unregister_buf_ring(&group_impl->uring, URING_BUF_GROUP_ID); 1622 free(group_impl->buf_ring); 1623 } 1624 1625 free(group_impl->trackers); 1626 1627 return 0; 1628 } 1629 1630 static int 1631 uring_sock_group_impl_buf_pool_alloc(struct spdk_uring_sock_group_impl *group_impl) 1632 { 1633 struct io_uring_buf_reg buf_reg = {}; 1634 struct io_uring_buf_ring *buf_ring; 1635 int i, rc; 1636 1637 rc = posix_memalign((void **)&buf_ring, 0x1000, URING_BUF_POOL_SIZE * sizeof(struct io_uring_buf)); 1638 if (rc != 0) { 1639 /* posix_memalign returns positive errno values */ 1640 return -rc; 1641 } 1642 1643 buf_reg.ring_addr = (unsigned long long)buf_ring; 1644 buf_reg.ring_entries = URING_BUF_POOL_SIZE; 1645 buf_reg.bgid = URING_BUF_GROUP_ID; 1646 1647 rc = io_uring_register_buf_ring(&group_impl->uring, &buf_reg, 0); 1648 if (rc != 0) { 1649 free(buf_ring); 1650 return rc; 1651 } 1652 1653 group_impl->buf_ring = buf_ring; 1654 io_uring_buf_ring_init(group_impl->buf_ring); 1655 group_impl->buf_ring_count = 0; 1656 1657 group_impl->trackers = calloc(URING_BUF_POOL_SIZE, sizeof(struct spdk_uring_buf_tracker)); 1658 if (group_impl->trackers == NULL) { 1659 uring_sock_group_impl_buf_pool_free(group_impl); 1660 return -ENOMEM; 1661 } 1662 1663 STAILQ_INIT(&group_impl->free_trackers); 1664 1665 for (i = 0; i < URING_BUF_POOL_SIZE; i++) { 1666 struct spdk_uring_buf_tracker *tracker = &group_impl->trackers[i]; 1667 1668 tracker->buf = NULL; 1669 tracker->len = 0; 1670 tracker->ctx = NULL; 1671 tracker->id = i; 1672 1673 STAILQ_INSERT_TAIL(&group_impl->free_trackers, tracker, link); 1674 } 1675 1676 return 0; 1677 } 1678 1679 static struct spdk_sock_group_impl * 1680 uring_sock_group_impl_create(void) 1681 { 1682 struct spdk_uring_sock_group_impl *group_impl; 1683 1684 group_impl = calloc(1, sizeof(*group_impl)); 1685 if (group_impl == NULL) { 1686 SPDK_ERRLOG("group_impl allocation failed\n"); 1687 return NULL; 1688 } 1689 1690 group_impl->io_avail = SPDK_SOCK_GROUP_QUEUE_DEPTH; 1691 1692 if (io_uring_queue_init(SPDK_SOCK_GROUP_QUEUE_DEPTH, &group_impl->uring, 0) < 0) { 1693 SPDK_ERRLOG("uring I/O context setup failure\n"); 1694 free(group_impl); 1695 return NULL; 1696 } 1697 1698 TAILQ_INIT(&group_impl->pending_recv); 1699 1700 if (uring_sock_group_impl_buf_pool_alloc(group_impl) < 0) { 1701 SPDK_ERRLOG("Failed to create buffer ring. Your kernel is likely not new enough. 
" 1702 "Please switch to the POSIX sock implementation instead.\n"); 1703 io_uring_queue_exit(&group_impl->uring); 1704 free(group_impl); 1705 return NULL; 1706 } 1707 1708 if (g_spdk_uring_sock_impl_opts.enable_placement_id == PLACEMENT_CPU) { 1709 spdk_sock_map_insert(&g_map, spdk_env_get_current_core(), &group_impl->base); 1710 } 1711 1712 return &group_impl->base; 1713 } 1714 1715 static int 1716 uring_sock_group_impl_add_sock(struct spdk_sock_group_impl *_group, 1717 struct spdk_sock *_sock) 1718 { 1719 struct spdk_uring_sock *sock = __uring_sock(_sock); 1720 struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group); 1721 int rc; 1722 1723 sock->group = group; 1724 sock->write_task.sock = sock; 1725 sock->write_task.type = URING_TASK_WRITE; 1726 1727 sock->read_task.sock = sock; 1728 sock->read_task.type = URING_TASK_READ; 1729 1730 sock->errqueue_task.sock = sock; 1731 sock->errqueue_task.type = URING_TASK_ERRQUEUE; 1732 sock->errqueue_task.msg.msg_control = sock->buf; 1733 sock->errqueue_task.msg.msg_controllen = sizeof(sock->buf); 1734 1735 sock->cancel_task.sock = sock; 1736 sock->cancel_task.type = URING_TASK_CANCEL; 1737 1738 /* switched from another polling group due to scheduling */ 1739 if (spdk_unlikely(sock->recv_pipe != NULL && 1740 (spdk_pipe_reader_bytes_available(sock->recv_pipe) > 0))) { 1741 assert(sock->pending_recv == false); 1742 sock->pending_recv = true; 1743 TAILQ_INSERT_TAIL(&group->pending_recv, sock, link); 1744 } 1745 1746 if (sock->placement_id != -1) { 1747 rc = spdk_sock_map_insert(&g_map, sock->placement_id, &group->base); 1748 if (rc != 0) { 1749 SPDK_ERRLOG("Failed to insert sock group into map: %d", rc); 1750 /* Do not treat this as an error. The system will continue running. */ 1751 } 1752 } 1753 1754 /* We get an async read going immediately */ 1755 _sock_prep_read(&sock->base); 1756 #ifdef SPDK_ZEROCOPY 1757 if (sock->zcopy) { 1758 _sock_prep_errqueue(_sock); 1759 } 1760 #endif 1761 1762 return 0; 1763 } 1764 1765 static void 1766 uring_sock_group_populate_buf_ring(struct spdk_uring_sock_group_impl *group) 1767 { 1768 struct spdk_uring_buf_tracker *tracker; 1769 int count, mask; 1770 1771 if (g_spdk_uring_sock_impl_opts.enable_recv_pipe) { 1772 /* If recv_pipe is enabled, we do not post buffers. 
*/ 1773 return; 1774 } 1775 1776 /* Try to re-populate the io_uring's buffer pool using user-provided buffers */ 1777 tracker = STAILQ_FIRST(&group->free_trackers); 1778 count = 0; 1779 mask = io_uring_buf_ring_mask(URING_BUF_POOL_SIZE); 1780 while (tracker != NULL) { 1781 tracker->buflen = spdk_sock_group_get_buf(group->base.group, &tracker->buf, &tracker->ctx); 1782 if (tracker->buflen == 0) { 1783 break; 1784 } 1785 1786 assert(tracker->buf != NULL); 1787 STAILQ_REMOVE_HEAD(&group->free_trackers, link); 1788 assert(STAILQ_FIRST(&group->free_trackers) != tracker); 1789 1790 io_uring_buf_ring_add(group->buf_ring, tracker->buf, tracker->buflen, tracker->id, mask, count); 1791 count++; 1792 tracker = STAILQ_FIRST(&group->free_trackers); 1793 } 1794 1795 if (count > 0) { 1796 group->buf_ring_count += count; 1797 io_uring_buf_ring_advance(group->buf_ring, count); 1798 } 1799 } 1800 1801 static int 1802 uring_sock_group_impl_poll(struct spdk_sock_group_impl *_group, int max_events, 1803 struct spdk_sock **socks) 1804 { 1805 struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group); 1806 int count, ret; 1807 int to_complete, to_submit; 1808 struct spdk_sock *_sock, *tmp; 1809 struct spdk_uring_sock *sock; 1810 1811 if (spdk_likely(socks)) { 1812 TAILQ_FOREACH_SAFE(_sock, &group->base.socks, link, tmp) { 1813 sock = __uring_sock(_sock); 1814 if (spdk_unlikely(sock->connection_status)) { 1815 continue; 1816 } 1817 _sock_flush(_sock); 1818 } 1819 } 1820 1821 /* Try to re-populate the io_uring's buffer pool using user-provided buffers */ 1822 uring_sock_group_populate_buf_ring(group); 1823 1824 to_submit = group->io_queued; 1825 1826 /* Network I/O cannot be done with O_DIRECT, so we do not need to call io_uring_enter() separately */ 1827 if (to_submit > 0) { 1828 /* If there is I/O to submit, use io_uring_submit here. 1829 * It will automatically call io_uring_enter appropriately. */ 1830 ret = io_uring_submit(&group->uring); 1831 if (ret < 0) { 1832 return 1; 1833 } 1834 group->io_queued = 0; 1835 group->io_inflight += to_submit; 1836 group->io_avail -= to_submit; 1837 } 1838 1839 count = 0; 1840 to_complete = group->io_inflight; 1841 if (to_complete > 0 || !TAILQ_EMPTY(&group->pending_recv)) { 1842 count = sock_uring_group_reap(group, to_complete, max_events, socks); 1843 } 1844 1845 return count; 1846 } 1847 1848 static int 1849 uring_sock_group_impl_remove_sock(struct spdk_sock_group_impl *_group, 1850 struct spdk_sock *_sock) 1851 { 1852 struct spdk_uring_sock *sock = __uring_sock(_sock); 1853 struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group); 1854 1855 sock->pending_group_remove = true; 1856 1857 if (sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) { 1858 _sock_prep_cancel_task(_sock, &sock->write_task); 1859 /* spdk_sock_group_remove_sock() is not an asynchronous interface, so 1860 * it is safe to busy-wait in a while loop here. */ 1861 while ((sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) || 1862 (sock->cancel_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE)) { 1863 uring_sock_group_impl_poll(_group, 32, NULL); 1864 } 1865 } 1866 1867 if (sock->read_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) { 1868 _sock_prep_cancel_task(_sock, &sock->read_task); 1869 /* spdk_sock_group_remove_sock() is not an asynchronous interface, so 1870 * it is safe to busy-wait in a while loop here.
*/ 1871 while ((sock->read_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) || 1872 (sock->cancel_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE)) { 1873 uring_sock_group_impl_poll(_group, 32, NULL); 1874 } 1875 } 1876 1877 if (sock->errqueue_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) { 1878 _sock_prep_cancel_task(_sock, &sock->errqueue_task); 1879 /* spdk_sock_group_remove_sock() is not an asynchronous interface, so 1880 * it is safe to busy-wait in a while loop here. */ 1881 while ((sock->errqueue_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) || 1882 (sock->cancel_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE)) { 1883 uring_sock_group_impl_poll(_group, 32, NULL); 1884 } 1885 } 1886 1887 /* Make sure that cancelling the tasks above did not queue any new requests */ 1888 assert(sock->write_task.status == SPDK_URING_SOCK_TASK_NOT_IN_USE); 1889 assert(sock->read_task.status == SPDK_URING_SOCK_TASK_NOT_IN_USE); 1890 assert(sock->errqueue_task.status == SPDK_URING_SOCK_TASK_NOT_IN_USE); 1891 1892 if (sock->pending_recv) { 1893 TAILQ_REMOVE(&group->pending_recv, sock, link); 1894 sock->pending_recv = false; 1895 } 1896 assert(sock->pending_recv == false); 1897 1898 /* We have no way to handle this case. We could let the user read this 1899 * buffer, but the buffer came from a group and we have lost the association 1900 * to that group, so we could not release it. */ 1901 assert(STAILQ_EMPTY(&sock->recv_stream)); 1902 1903 if (sock->placement_id != -1) { 1904 spdk_sock_map_release(&g_map, sock->placement_id); 1905 } 1906 1907 sock->pending_group_remove = false; 1908 sock->group = NULL; 1909 return 0; 1910 } 1911 1912 static int 1913 uring_sock_group_impl_close(struct spdk_sock_group_impl *_group) 1914 { 1915 struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group); 1916 1917 /* try to reap all the active I/O */ 1918 while (group->io_inflight) { 1919 uring_sock_group_impl_poll(_group, 32, NULL); 1920 } 1921 assert(group->io_inflight == 0); 1922 assert(group->io_avail == SPDK_SOCK_GROUP_QUEUE_DEPTH); 1923 1924 uring_sock_group_impl_buf_pool_free(group); 1925 1926 io_uring_queue_exit(&group->uring); 1927 1928 if (g_spdk_uring_sock_impl_opts.enable_placement_id == PLACEMENT_CPU) { 1929 spdk_sock_map_release(&g_map, spdk_env_get_current_core()); 1930 } 1931 1932 free(group); 1933 return 0; 1934 } 1935 1936 static int 1937 uring_sock_flush(struct spdk_sock *_sock) 1938 { 1939 struct spdk_uring_sock *sock = __uring_sock(_sock); 1940 struct msghdr msg = {}; 1941 struct iovec iovs[IOV_BATCH_SIZE]; 1942 int iovcnt; 1943 ssize_t rc; 1944 int flags = sock->zcopy_send_flags; 1945 int retval; 1946 bool is_zcopy = false; 1947 struct spdk_uring_task *task = &sock->errqueue_task; 1948 1949 /* Can't flush from within a callback or we end up with recursive calls */ 1950 if (_sock->cb_cnt > 0) { 1951 errno = EAGAIN; 1952 return -1; 1953 } 1954 1955 /* Can't flush while a write is already outstanding */ 1956 if (sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) { 1957 errno = EAGAIN; 1958 return -1; 1959 } 1960 1961 /* Gather an iov */ 1962 iovcnt = spdk_sock_prep_reqs(_sock, iovs, 0, NULL, &flags); 1963 if (iovcnt == 0) { 1964 /* Nothing to send */ 1965 return 0; 1966 } 1967 1968 /* Perform the vectored write */ 1969 msg.msg_iov = iovs; 1970 msg.msg_iovlen = iovcnt; 1971 rc = sendmsg(sock->fd, &msg, flags | MSG_DONTWAIT); 1972 if (rc <= 0) { 1973 if (rc == 0 || errno == EAGAIN || errno == EWOULDBLOCK || (errno == ENOBUFS && sock->zcopy)) { 1974 errno = EAGAIN; 1975 } 1976 return -1; 1977 } 1978 1979
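	/* When flags included MSG_ZEROCOPY, the sendmsg() above only queued the payload;
	 * the kernel reports completion later on the socket error queue, which is drained
	 * below via recvmsg(MSG_ERRQUEUE) and _sock_check_zcopy(). */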
#ifdef SPDK_ZEROCOPY 1980 is_zcopy = flags & MSG_ZEROCOPY; 1981 #endif 1982 retval = sock_complete_write_reqs(_sock, rc, is_zcopy); 1983 if (retval < 0) { 1984 /* If the socket is closed, return to avoid a heap-use-after-free error */ 1985 errno = ENOTCONN; 1986 return -1; 1987 } 1988 1989 #ifdef SPDK_ZEROCOPY 1990 /* Check the error queue at least once for zero-copy completions */ 1991 if (sock->zcopy && !TAILQ_EMPTY(&_sock->pending_reqs)) { 1992 retval = recvmsg(sock->fd, &task->msg, MSG_ERRQUEUE); 1993 if (retval < 0) { 1994 if (errno == EWOULDBLOCK || errno == EAGAIN) { 1995 return rc; 1996 } 1997 } 1998 _sock_check_zcopy(_sock, retval); 1999 } 2000 #endif 2001 2002 return rc; 2003 } 2004 2005 static struct spdk_net_impl g_uring_net_impl = { 2006 .name = "uring", 2007 .getaddr = uring_sock_getaddr, 2008 .connect = uring_sock_connect, 2009 .listen = uring_sock_listen, 2010 .accept = uring_sock_accept, 2011 .close = uring_sock_close, 2012 .recv = uring_sock_recv, 2013 .readv = uring_sock_readv, 2014 .writev = uring_sock_writev, 2015 .recv_next = uring_sock_recv_next, 2016 .writev_async = uring_sock_writev_async, 2017 .flush = uring_sock_flush, 2018 .set_recvlowat = uring_sock_set_recvlowat, 2019 .set_recvbuf = uring_sock_set_recvbuf, 2020 .set_sendbuf = uring_sock_set_sendbuf, 2021 .is_ipv6 = uring_sock_is_ipv6, 2022 .is_ipv4 = uring_sock_is_ipv4, 2023 .is_connected = uring_sock_is_connected, 2024 .group_impl_get_optimal = uring_sock_group_impl_get_optimal, 2025 .group_impl_create = uring_sock_group_impl_create, 2026 .group_impl_add_sock = uring_sock_group_impl_add_sock, 2027 .group_impl_remove_sock = uring_sock_group_impl_remove_sock, 2028 .group_impl_poll = uring_sock_group_impl_poll, 2029 .group_impl_close = uring_sock_group_impl_close, 2030 .get_opts = uring_sock_impl_get_opts, 2031 .set_opts = uring_sock_impl_set_opts, 2032 }; 2033 2034 SPDK_NET_IMPL_REGISTER(uring, &g_uring_net_impl, DEFAULT_SOCK_PRIORITY + 2); 2035
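/*
 * Usage sketch, for illustration only (kept in a comment so it is not compiled as
 * part of this module). It shows how an application might tune and select this
 * "uring" implementation and then drive a socket through a poll group via the
 * generic spdk_sock API that this file backs. The callback body, address, port and
 * the endless poll loop are assumptions made for the example, not anything defined
 * in this file.
 *
 *	#include "spdk/sock.h"
 *
 *	static void
 *	example_read_cb(void *arg, struct spdk_sock_group *group, struct spdk_sock *sock)
 *	{
 *		char buf[512];
 *
 *		// Called by spdk_sock_group_poll() when data (or a socket error) is pending.
 *		(void)spdk_sock_recv(sock, buf, sizeof(buf));
 *	}
 *
 *	static void
 *	example(void)
 *	{
 *		struct spdk_sock_impl_opts opts;
 *		size_t opts_len = sizeof(opts);
 *		struct spdk_sock *sock;
 *		struct spdk_sock_group *group;
 *
 *		// Flip one of the impl options handled by uring_sock_impl_get_opts()/set_opts().
 *		spdk_sock_impl_get_opts("uring", &opts, &opts_len);
 *		opts.enable_zerocopy_send_client = true;
 *		spdk_sock_impl_set_opts("uring", &opts, opts_len);
 *
 *		sock = spdk_sock_connect("127.0.0.1", 4420, "uring");
 *		group = spdk_sock_group_create(NULL);
 *		spdk_sock_group_add_sock(group, sock, example_read_cb, NULL);
 *
 *		for (;;) {
 *			// Ends up in uring_sock_group_impl_poll(): flush queued writes,
 *			// submit SQEs, reap CQEs and invoke the callback for ready sockets.
 *			spdk_sock_group_poll(group);
 *		}
 *	}
 */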