1 /* SPDX-License-Identifier: BSD-3-Clause 2 * Copyright (C) 2019 Intel Corporation. 3 * All rights reserved. 4 */ 5 6 #include "spdk/stdinc.h" 7 #include "spdk/config.h" 8 9 #include <linux/errqueue.h> 10 #include <sys/epoll.h> 11 #include <liburing.h> 12 13 #include "spdk/barrier.h" 14 #include "spdk/env.h" 15 #include "spdk/log.h" 16 #include "spdk/pipe.h" 17 #include "spdk/sock.h" 18 #include "spdk/string.h" 19 #include "spdk/util.h" 20 #include "spdk/net.h" 21 #include "spdk/file.h" 22 23 #include "spdk_internal/sock.h" 24 #include "spdk_internal/assert.h" 25 #include "spdk/net.h" 26 27 #define MAX_TMPBUF 1024 28 #define PORTNUMLEN 32 29 #define SPDK_SOCK_GROUP_QUEUE_DEPTH 4096 30 #define SPDK_SOCK_CMG_INFO_SIZE (sizeof(struct cmsghdr) + sizeof(struct sock_extended_err)) 31 32 enum uring_task_type { 33 URING_TASK_READ = 0, 34 URING_TASK_ERRQUEUE, 35 URING_TASK_WRITE, 36 URING_TASK_CANCEL, 37 }; 38 39 #if defined(SO_ZEROCOPY) && defined(MSG_ZEROCOPY) 40 #define SPDK_ZEROCOPY 41 #endif 42 43 /* We don't know how big the buffers that the user posts will be, but this 44 * is the maximum we'll ever allow it to receive in a single command. 45 * If the user buffers are smaller, it will just receive less. */ 46 #define URING_MAX_RECV_SIZE (128 * 1024) 47 48 /* We don't know how many buffers the user will post, but this is the 49 * maximum number we'll take from the pool to post per group. */ 50 #define URING_BUF_POOL_SIZE 128 51 52 /* We use 1 just so it's not zero and we can validate it's right. */ 53 #define URING_BUF_GROUP_ID 1 54 55 enum spdk_uring_sock_task_status { 56 SPDK_URING_SOCK_TASK_NOT_IN_USE = 0, 57 SPDK_URING_SOCK_TASK_IN_PROCESS, 58 }; 59 60 struct spdk_uring_task { 61 enum spdk_uring_sock_task_status status; 62 enum uring_task_type type; 63 struct spdk_uring_sock *sock; 64 struct msghdr msg; 65 struct iovec iovs[IOV_BATCH_SIZE]; 66 int iov_cnt; 67 struct spdk_sock_request *last_req; 68 bool is_zcopy; 69 STAILQ_ENTRY(spdk_uring_task) link; 70 }; 71 72 struct spdk_uring_sock { 73 struct spdk_sock base; 74 int fd; 75 uint32_t sendmsg_idx; 76 struct spdk_uring_sock_group_impl *group; 77 STAILQ_HEAD(, spdk_uring_buf_tracker) recv_stream; 78 size_t recv_offset; 79 struct spdk_uring_task write_task; 80 struct spdk_uring_task errqueue_task; 81 struct spdk_uring_task read_task; 82 struct spdk_uring_task cancel_task; 83 struct spdk_pipe *recv_pipe; 84 void *recv_buf; 85 int recv_buf_sz; 86 bool zcopy; 87 bool pending_recv; 88 bool pending_group_remove; 89 int zcopy_send_flags; 90 int connection_status; 91 int placement_id; 92 uint8_t reserved[4]; 93 uint8_t buf[SPDK_SOCK_CMG_INFO_SIZE]; 94 TAILQ_ENTRY(spdk_uring_sock) link; 95 char interface_name[IFNAMSIZ]; 96 }; 97 /* 'struct cmsghdr' is mapped to the buffer 'buf', and while first element 98 * of this control message header has a size of 8 bytes, 'buf' 99 * must be 8-byte aligned. 
100 */ 101 SPDK_STATIC_ASSERT(offsetof(struct spdk_uring_sock, buf) % 8 == 0, 102 "Incorrect alignment: `buf` must be aligned to 8 bytes"); 103 104 TAILQ_HEAD(pending_recv_list, spdk_uring_sock); 105 106 struct spdk_uring_buf_tracker { 107 void *buf; 108 size_t buflen; 109 size_t len; 110 void *ctx; 111 int id; 112 STAILQ_ENTRY(spdk_uring_buf_tracker) link; 113 }; 114 115 struct spdk_uring_sock_group_impl { 116 struct spdk_sock_group_impl base; 117 struct io_uring uring; 118 uint32_t io_inflight; 119 uint32_t io_queued; 120 uint32_t io_avail; 121 struct pending_recv_list pending_recv; 122 123 struct io_uring_buf_ring *buf_ring; 124 uint32_t buf_ring_count; 125 struct spdk_uring_buf_tracker *trackers; 126 STAILQ_HEAD(, spdk_uring_buf_tracker) free_trackers; 127 }; 128 129 static struct spdk_sock_impl_opts g_spdk_uring_sock_impl_opts = { 130 .recv_buf_size = DEFAULT_SO_RCVBUF_SIZE, 131 .send_buf_size = DEFAULT_SO_SNDBUF_SIZE, 132 .enable_recv_pipe = true, 133 .enable_quickack = false, 134 .enable_placement_id = PLACEMENT_NONE, 135 .enable_zerocopy_send_server = false, 136 .enable_zerocopy_send_client = false, 137 .zerocopy_threshold = 0, 138 .tls_version = 0, 139 .enable_ktls = false, 140 .psk_key = NULL, 141 .psk_identity = NULL 142 }; 143 144 static struct spdk_sock_map g_map = { 145 .entries = STAILQ_HEAD_INITIALIZER(g_map.entries), 146 .mtx = PTHREAD_MUTEX_INITIALIZER 147 }; 148 149 __attribute((destructor)) static void 150 uring_sock_map_cleanup(void) 151 { 152 spdk_sock_map_cleanup(&g_map); 153 } 154 155 #define SPDK_URING_SOCK_REQUEST_IOV(req) ((struct iovec *)((uint8_t *)req + sizeof(struct spdk_sock_request))) 156 157 #define __uring_sock(sock) (struct spdk_uring_sock *)sock 158 #define __uring_group_impl(group) (struct spdk_uring_sock_group_impl *)group 159 160 static void 161 uring_sock_copy_impl_opts(struct spdk_sock_impl_opts *dest, const struct spdk_sock_impl_opts *src, 162 size_t len) 163 { 164 #define FIELD_OK(field) \ 165 offsetof(struct spdk_sock_impl_opts, field) + sizeof(src->field) <= len 166 167 #define SET_FIELD(field) \ 168 if (FIELD_OK(field)) { \ 169 dest->field = src->field; \ 170 } 171 172 SET_FIELD(recv_buf_size); 173 SET_FIELD(send_buf_size); 174 SET_FIELD(enable_recv_pipe); 175 SET_FIELD(enable_quickack); 176 SET_FIELD(enable_placement_id); 177 SET_FIELD(enable_zerocopy_send_server); 178 SET_FIELD(enable_zerocopy_send_client); 179 SET_FIELD(zerocopy_threshold); 180 SET_FIELD(tls_version); 181 SET_FIELD(enable_ktls); 182 SET_FIELD(psk_key); 183 SET_FIELD(psk_identity); 184 185 #undef SET_FIELD 186 #undef FIELD_OK 187 } 188 189 static int 190 uring_sock_impl_get_opts(struct spdk_sock_impl_opts *opts, size_t *len) 191 { 192 if (!opts || !len) { 193 errno = EINVAL; 194 return -1; 195 } 196 197 assert(sizeof(*opts) >= *len); 198 memset(opts, 0, *len); 199 200 uring_sock_copy_impl_opts(opts, &g_spdk_uring_sock_impl_opts, *len); 201 *len = spdk_min(*len, sizeof(g_spdk_uring_sock_impl_opts)); 202 203 return 0; 204 } 205 206 static int 207 uring_sock_impl_set_opts(const struct spdk_sock_impl_opts *opts, size_t len) 208 { 209 if (!opts) { 210 errno = EINVAL; 211 return -1; 212 } 213 214 assert(sizeof(*opts) >= len); 215 uring_sock_copy_impl_opts(&g_spdk_uring_sock_impl_opts, opts, len); 216 217 return 0; 218 } 219 220 static void 221 uring_opts_get_impl_opts(const struct spdk_sock_opts *opts, struct spdk_sock_impl_opts *dest) 222 { 223 /* Copy the default impl_opts first to cover cases when user's impl_opts is smaller */ 224 memcpy(dest, &g_spdk_uring_sock_impl_opts, 
sizeof(*dest)); 225 226 if (opts->impl_opts != NULL) { 227 assert(sizeof(*dest) >= opts->impl_opts_size); 228 uring_sock_copy_impl_opts(dest, opts->impl_opts, opts->impl_opts_size); 229 } 230 } 231 232 static int 233 uring_sock_getaddr(struct spdk_sock *_sock, char *saddr, int slen, uint16_t *sport, 234 char *caddr, int clen, uint16_t *cport) 235 { 236 struct spdk_uring_sock *sock = __uring_sock(_sock); 237 238 assert(sock != NULL); 239 return spdk_net_getaddr(sock->fd, saddr, slen, sport, caddr, clen, cport); 240 } 241 242 static const char * 243 uring_sock_get_interface_name(struct spdk_sock *_sock) 244 { 245 struct spdk_uring_sock *sock = __uring_sock(_sock); 246 char saddr[64]; 247 int rc; 248 249 rc = spdk_net_getaddr(sock->fd, saddr, sizeof(saddr), NULL, NULL, 0, NULL); 250 if (rc != 0) { 251 return NULL; 252 } 253 254 rc = spdk_net_get_interface_name(saddr, sock->interface_name, 255 sizeof(sock->interface_name)); 256 if (rc != 0) { 257 return NULL; 258 } 259 260 return sock->interface_name; 261 } 262 263 static int32_t 264 uring_sock_get_numa_id(struct spdk_sock *sock) 265 { 266 const char *interface_name; 267 uint32_t numa_id; 268 int rc; 269 270 interface_name = uring_sock_get_interface_name(sock); 271 if (interface_name == NULL) { 272 return SPDK_ENV_NUMA_ID_ANY; 273 } 274 275 rc = spdk_read_sysfs_attribute_uint32(&numa_id, 276 "/sys/class/net/%s/device/numa_node", interface_name); 277 if (rc == 0 && numa_id <= INT32_MAX) { 278 return (int32_t)numa_id; 279 } else { 280 return SPDK_ENV_NUMA_ID_ANY; 281 } 282 } 283 284 enum uring_sock_create_type { 285 SPDK_SOCK_CREATE_LISTEN, 286 SPDK_SOCK_CREATE_CONNECT, 287 }; 288 289 static int 290 uring_sock_alloc_pipe(struct spdk_uring_sock *sock, int sz) 291 { 292 uint8_t *new_buf; 293 struct spdk_pipe *new_pipe; 294 struct iovec siov[2]; 295 struct iovec diov[2]; 296 int sbytes; 297 ssize_t bytes; 298 int rc; 299 300 if (sock->recv_buf_sz == sz) { 301 return 0; 302 } 303 304 /* If the new size is 0, just free the pipe */ 305 if (sz == 0) { 306 spdk_pipe_destroy(sock->recv_pipe); 307 free(sock->recv_buf); 308 sock->recv_pipe = NULL; 309 sock->recv_buf = NULL; 310 return 0; 311 } else if (sz < MIN_SOCK_PIPE_SIZE) { 312 SPDK_ERRLOG("The size of the pipe must be larger than %d\n", MIN_SOCK_PIPE_SIZE); 313 return -1; 314 } 315 316 /* Round up to next 64 byte multiple */ 317 rc = posix_memalign((void **)&new_buf, 64, sz); 318 if (rc != 0) { 319 SPDK_ERRLOG("socket recv buf allocation failed\n"); 320 return -ENOMEM; 321 } 322 memset(new_buf, 0, sz); 323 324 new_pipe = spdk_pipe_create(new_buf, sz); 325 if (new_pipe == NULL) { 326 SPDK_ERRLOG("socket pipe allocation failed\n"); 327 free(new_buf); 328 return -ENOMEM; 329 } 330 331 if (sock->recv_pipe != NULL) { 332 /* Pull all of the data out of the old pipe */ 333 sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov); 334 if (sbytes > sz) { 335 /* Too much data to fit into the new pipe size */ 336 spdk_pipe_destroy(new_pipe); 337 free(new_buf); 338 return -EINVAL; 339 } 340 341 sbytes = spdk_pipe_writer_get_buffer(new_pipe, sz, diov); 342 assert(sbytes == sz); 343 344 bytes = spdk_iovcpy(siov, 2, diov, 2); 345 spdk_pipe_writer_advance(new_pipe, bytes); 346 347 spdk_pipe_destroy(sock->recv_pipe); 348 free(sock->recv_buf); 349 } 350 351 sock->recv_buf_sz = sz; 352 sock->recv_buf = new_buf; 353 sock->recv_pipe = new_pipe; 354 355 return 0; 356 } 357 358 static int 359 uring_sock_set_recvbuf(struct spdk_sock *_sock, int sz) 360 { 361 struct spdk_uring_sock *sock = 
__uring_sock(_sock);
	int min_size;
	int rc;

	assert(sock != NULL);

	if (_sock->impl_opts.enable_recv_pipe) {
		rc = uring_sock_alloc_pipe(sock, sz);
		if (rc) {
			SPDK_ERRLOG("unable to allocate sufficient recvbuf with sz=%d on sock=%p\n", sz, _sock);
			return rc;
		}
	}

	/* Set kernel buffer size to be at least MIN_SO_RCVBUF_SIZE and
	 * g_spdk_uring_sock_impl_opts.recv_buf_size. */
	min_size = spdk_max(MIN_SO_RCVBUF_SIZE, g_spdk_uring_sock_impl_opts.recv_buf_size);

	if (sz < min_size) {
		sz = min_size;
	}

	rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVBUF, &sz, sizeof(sz));
	if (rc < 0) {
		return rc;
	}

	_sock->impl_opts.recv_buf_size = sz;

	return 0;
}

static int
uring_sock_set_sendbuf(struct spdk_sock *_sock, int sz)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	int min_size;
	int rc;

	assert(sock != NULL);

	/* Set kernel buffer size to be at least MIN_SO_SNDBUF_SIZE and
	 * g_spdk_uring_sock_impl_opts.send_buf_size. */
	min_size = spdk_max(MIN_SO_SNDBUF_SIZE, g_spdk_uring_sock_impl_opts.send_buf_size);

	if (sz < min_size) {
		sz = min_size;
	}

	rc = setsockopt(sock->fd, SOL_SOCKET, SO_SNDBUF, &sz, sizeof(sz));
	if (rc < 0) {
		return rc;
	}

	_sock->impl_opts.send_buf_size = sz;

	return 0;
}

static struct spdk_uring_sock *
uring_sock_alloc(int fd, struct spdk_sock_impl_opts *impl_opts, bool enable_zero_copy)
{
	struct spdk_uring_sock *sock;
#if defined(__linux__)
	int flag;
	int rc;
#endif

	sock = calloc(1, sizeof(*sock));
	if (sock == NULL) {
		SPDK_ERRLOG("sock allocation failed\n");
		return NULL;
	}

	sock->fd = fd;
	memcpy(&sock->base.impl_opts, impl_opts, sizeof(*impl_opts));

	STAILQ_INIT(&sock->recv_stream);

#if defined(__linux__)
	flag = 1;

	if (sock->base.impl_opts.enable_quickack) {
		rc = setsockopt(sock->fd, IPPROTO_TCP, TCP_QUICKACK, &flag, sizeof(flag));
		if (rc != 0) {
			SPDK_ERRLOG("failed to set quickack\n");
		}
	}

	spdk_sock_get_placement_id(sock->fd, sock->base.impl_opts.enable_placement_id,
				   &sock->placement_id);
#ifdef SPDK_ZEROCOPY
	/* Try to turn on zero copy sends */
	flag = 1;

	if (enable_zero_copy) {
		rc = setsockopt(sock->fd, SOL_SOCKET, SO_ZEROCOPY, &flag, sizeof(flag));
		if (rc == 0) {
			sock->zcopy = true;
			sock->zcopy_send_flags = MSG_ZEROCOPY;
		}
	}
#endif
#endif

	return sock;
}

static struct spdk_sock *
uring_sock_create(const char *ip, int port,
		  enum uring_sock_create_type type,
		  struct spdk_sock_opts *opts)
{
	struct spdk_uring_sock *sock;
	struct spdk_sock_impl_opts impl_opts;
	char buf[MAX_TMPBUF];
	char portnum[PORTNUMLEN];
	char *p;
	const char *src_addr;
	uint16_t src_port;
	struct addrinfo hints, *res, *res0, *src_ai;
	int fd, flag;
	int val = 1;
	int rc;
	bool enable_zcopy_impl_opts = false;
	bool enable_zcopy_user_opts = true;

	assert(opts != NULL);
	uring_opts_get_impl_opts(opts, &impl_opts);

	if (ip == NULL) {
		return NULL;
	}
	if (ip[0] == '[') {
		snprintf(buf, sizeof(buf), "%s", ip + 1);
		p = strchr(buf, ']');
		if (p != NULL) {
			*p = '\0';
		}
		ip = (const char *) &buf[0];
	}

	snprintf(portnum, sizeof portnum, "%d", port);
	memset(&hints, 0, sizeof hints);
	hints.ai_family
= PF_UNSPEC; 506 hints.ai_socktype = SOCK_STREAM; 507 hints.ai_flags = AI_NUMERICSERV; 508 hints.ai_flags |= AI_PASSIVE; 509 hints.ai_flags |= AI_NUMERICHOST; 510 rc = getaddrinfo(ip, portnum, &hints, &res0); 511 if (rc != 0) { 512 SPDK_ERRLOG("getaddrinfo() failed %s (%d)\n", gai_strerror(rc), rc); 513 return NULL; 514 } 515 516 /* try listen */ 517 fd = -1; 518 for (res = res0; res != NULL; res = res->ai_next) { 519 retry: 520 fd = socket(res->ai_family, res->ai_socktype, res->ai_protocol); 521 if (fd < 0) { 522 /* error */ 523 continue; 524 } 525 526 val = impl_opts.recv_buf_size; 527 rc = setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &val, sizeof val); 528 if (rc) { 529 /* Not fatal */ 530 } 531 532 val = impl_opts.send_buf_size; 533 rc = setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &val, sizeof val); 534 if (rc) { 535 /* Not fatal */ 536 } 537 538 rc = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof val); 539 if (rc != 0) { 540 close(fd); 541 fd = -1; 542 /* error */ 543 continue; 544 } 545 rc = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof val); 546 if (rc != 0) { 547 close(fd); 548 fd = -1; 549 /* error */ 550 continue; 551 } 552 553 if (opts->ack_timeout) { 554 #if defined(__linux__) 555 val = opts->ack_timeout; 556 rc = setsockopt(fd, IPPROTO_TCP, TCP_USER_TIMEOUT, &val, sizeof val); 557 if (rc != 0) { 558 close(fd); 559 fd = -1; 560 /* error */ 561 continue; 562 } 563 #else 564 SPDK_WARNLOG("TCP_USER_TIMEOUT is not supported.\n"); 565 #endif 566 } 567 568 569 570 #if defined(SO_PRIORITY) 571 if (opts != NULL && opts->priority) { 572 rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &opts->priority, sizeof val); 573 if (rc != 0) { 574 close(fd); 575 fd = -1; 576 /* error */ 577 continue; 578 } 579 } 580 #endif 581 if (res->ai_family == AF_INET6) { 582 rc = setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &val, sizeof val); 583 if (rc != 0) { 584 close(fd); 585 fd = -1; 586 /* error */ 587 continue; 588 } 589 } 590 591 if (type == SPDK_SOCK_CREATE_LISTEN) { 592 rc = bind(fd, res->ai_addr, res->ai_addrlen); 593 if (rc != 0) { 594 SPDK_ERRLOG("bind() failed at port %d, errno = %d\n", port, errno); 595 switch (errno) { 596 case EINTR: 597 /* interrupted? */ 598 close(fd); 599 goto retry; 600 case EADDRNOTAVAIL: 601 SPDK_ERRLOG("IP address %s not available. " 602 "Verify IP address in config file " 603 "and make sure setup script is " 604 "run before starting spdk app.\n", ip); 605 /* FALLTHROUGH */ 606 default: 607 /* try next family */ 608 close(fd); 609 fd = -1; 610 continue; 611 } 612 } 613 /* bind OK */ 614 rc = listen(fd, 512); 615 if (rc != 0) { 616 SPDK_ERRLOG("listen() failed, errno = %d\n", errno); 617 close(fd); 618 fd = -1; 619 break; 620 } 621 622 flag = fcntl(fd, F_GETFL); 623 if (fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0) { 624 SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno); 625 close(fd); 626 fd = -1; 627 break; 628 } 629 630 enable_zcopy_impl_opts = impl_opts.enable_zerocopy_send_server; 631 } else if (type == SPDK_SOCK_CREATE_CONNECT) { 632 src_addr = SPDK_GET_FIELD(opts, src_addr, NULL, opts->opts_size); 633 src_port = SPDK_GET_FIELD(opts, src_port, 0, opts->opts_size); 634 if (src_addr != NULL || src_port != 0) { 635 snprintf(portnum, sizeof(portnum), "%"PRIu16, src_port); 636 memset(&hints, 0, sizeof hints); 637 hints.ai_family = AF_UNSPEC; 638 hints.ai_socktype = SOCK_STREAM; 639 hints.ai_flags = AI_NUMERICSERV | AI_NUMERICHOST | AI_PASSIVE; 640 rc = getaddrinfo(src_addr, src_port > 0 ? 
portnum : NULL, 641 &hints, &src_ai); 642 if (rc != 0 || src_ai == NULL) { 643 SPDK_ERRLOG("getaddrinfo() failed %s (%d)\n", 644 rc != 0 ? gai_strerror(rc) : "", rc); 645 close(fd); 646 fd = -1; 647 break; 648 } 649 rc = bind(fd, src_ai->ai_addr, src_ai->ai_addrlen); 650 if (rc != 0) { 651 SPDK_ERRLOG("bind() failed errno %d (%s:%s)\n", errno, 652 src_addr ? src_addr : "", portnum); 653 close(fd); 654 fd = -1; 655 freeaddrinfo(src_ai); 656 src_ai = NULL; 657 break; 658 } 659 freeaddrinfo(src_ai); 660 src_ai = NULL; 661 } 662 663 rc = connect(fd, res->ai_addr, res->ai_addrlen); 664 if (rc != 0) { 665 SPDK_ERRLOG("connect() failed, errno = %d\n", errno); 666 /* try next family */ 667 close(fd); 668 fd = -1; 669 continue; 670 } 671 672 flag = fcntl(fd, F_GETFL); 673 if (fcntl(fd, F_SETFL, flag & ~O_NONBLOCK) < 0) { 674 SPDK_ERRLOG("fcntl can't set blocking mode for socket, fd: %d (%d)\n", fd, errno); 675 close(fd); 676 fd = -1; 677 break; 678 } 679 680 enable_zcopy_impl_opts = impl_opts.enable_zerocopy_send_client; 681 } 682 break; 683 } 684 freeaddrinfo(res0); 685 686 if (fd < 0) { 687 return NULL; 688 } 689 690 enable_zcopy_user_opts = opts->zcopy && !spdk_net_is_loopback(fd); 691 sock = uring_sock_alloc(fd, &impl_opts, enable_zcopy_user_opts && enable_zcopy_impl_opts); 692 if (sock == NULL) { 693 SPDK_ERRLOG("sock allocation failed\n"); 694 close(fd); 695 return NULL; 696 } 697 698 return &sock->base; 699 } 700 701 static struct spdk_sock * 702 uring_sock_listen(const char *ip, int port, struct spdk_sock_opts *opts) 703 { 704 if (spdk_interrupt_mode_is_enabled()) { 705 SPDK_ERRLOG("Interrupt mode is not supported in the uring sock implementation."); 706 return NULL; 707 } 708 709 return uring_sock_create(ip, port, SPDK_SOCK_CREATE_LISTEN, opts); 710 } 711 712 static struct spdk_sock * 713 uring_sock_connect(const char *ip, int port, struct spdk_sock_opts *opts) 714 { 715 if (spdk_interrupt_mode_is_enabled()) { 716 SPDK_ERRLOG("Interrupt mode is not supported in the uring sock implementation."); 717 return NULL; 718 } 719 720 return uring_sock_create(ip, port, SPDK_SOCK_CREATE_CONNECT, opts); 721 } 722 723 static struct spdk_sock * 724 uring_sock_accept(struct spdk_sock *_sock) 725 { 726 struct spdk_uring_sock *sock = __uring_sock(_sock); 727 struct sockaddr_storage sa; 728 socklen_t salen; 729 int rc, fd; 730 struct spdk_uring_sock *new_sock; 731 int flag; 732 733 memset(&sa, 0, sizeof(sa)); 734 salen = sizeof(sa); 735 736 assert(sock != NULL); 737 738 rc = accept(sock->fd, (struct sockaddr *)&sa, &salen); 739 740 if (rc == -1) { 741 return NULL; 742 } 743 744 fd = rc; 745 746 flag = fcntl(fd, F_GETFL); 747 if ((flag & O_NONBLOCK) && (fcntl(fd, F_SETFL, flag & ~O_NONBLOCK) < 0)) { 748 SPDK_ERRLOG("fcntl can't set blocking mode for socket, fd: %d (%d)\n", fd, errno); 749 close(fd); 750 return NULL; 751 } 752 753 #if defined(SO_PRIORITY) 754 /* The priority is not inherited, so call this function again */ 755 if (sock->base.opts.priority) { 756 rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &sock->base.opts.priority, sizeof(int)); 757 if (rc != 0) { 758 close(fd); 759 return NULL; 760 } 761 } 762 #endif 763 764 new_sock = uring_sock_alloc(fd, &sock->base.impl_opts, sock->zcopy); 765 if (new_sock == NULL) { 766 close(fd); 767 return NULL; 768 } 769 770 return &new_sock->base; 771 } 772 773 static int 774 uring_sock_close(struct spdk_sock *_sock) 775 { 776 struct spdk_uring_sock *sock = __uring_sock(_sock); 777 778 assert(TAILQ_EMPTY(&_sock->pending_reqs)); 779 assert(sock->group == NULL); 780 
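	/* At this point the upper layer has already aborted or completed every
	 * request and removed the socket from its polling group, as the asserts
	 * above verify. */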
781 /* If the socket fails to close, the best choice is to 782 * leak the fd but continue to free the rest of the sock 783 * memory. */ 784 close(sock->fd); 785 786 spdk_pipe_destroy(sock->recv_pipe); 787 free(sock->recv_buf); 788 free(sock); 789 790 return 0; 791 } 792 793 static ssize_t 794 uring_sock_recv_from_pipe(struct spdk_uring_sock *sock, struct iovec *diov, int diovcnt) 795 { 796 struct iovec siov[2]; 797 int sbytes; 798 ssize_t bytes; 799 struct spdk_uring_sock_group_impl *group; 800 801 sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov); 802 if (sbytes < 0) { 803 errno = EINVAL; 804 return -1; 805 } else if (sbytes == 0) { 806 errno = EAGAIN; 807 return -1; 808 } 809 810 bytes = spdk_iovcpy(siov, 2, diov, diovcnt); 811 812 if (bytes == 0) { 813 /* The only way this happens is if diov is 0 length */ 814 errno = EINVAL; 815 return -1; 816 } 817 818 spdk_pipe_reader_advance(sock->recv_pipe, bytes); 819 820 /* If we drained the pipe, take it off the level-triggered list */ 821 if (sock->base.group_impl && spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) { 822 group = __uring_group_impl(sock->base.group_impl); 823 TAILQ_REMOVE(&group->pending_recv, sock, link); 824 sock->pending_recv = false; 825 } 826 827 return bytes; 828 } 829 830 static inline ssize_t 831 sock_readv(int fd, struct iovec *iov, int iovcnt) 832 { 833 struct msghdr msg = { 834 .msg_iov = iov, 835 .msg_iovlen = iovcnt, 836 }; 837 838 return recvmsg(fd, &msg, MSG_DONTWAIT); 839 } 840 841 static inline ssize_t 842 uring_sock_read(struct spdk_uring_sock *sock) 843 { 844 struct iovec iov[2]; 845 int bytes; 846 struct spdk_uring_sock_group_impl *group; 847 848 bytes = spdk_pipe_writer_get_buffer(sock->recv_pipe, sock->recv_buf_sz, iov); 849 850 if (bytes > 0) { 851 bytes = sock_readv(sock->fd, iov, 2); 852 if (bytes > 0) { 853 spdk_pipe_writer_advance(sock->recv_pipe, bytes); 854 if (sock->base.group_impl && !sock->pending_recv) { 855 group = __uring_group_impl(sock->base.group_impl); 856 TAILQ_INSERT_TAIL(&group->pending_recv, sock, link); 857 sock->pending_recv = true; 858 } 859 } 860 } 861 862 return bytes; 863 } 864 865 static int 866 uring_sock_recv_next(struct spdk_sock *_sock, void **_buf, void **ctx) 867 { 868 struct spdk_uring_sock *sock = __uring_sock(_sock); 869 struct spdk_uring_sock_group_impl *group; 870 struct spdk_uring_buf_tracker *tr; 871 872 if (sock->connection_status < 0) { 873 errno = -sock->connection_status; 874 return -1; 875 } 876 877 if (sock->recv_pipe != NULL) { 878 errno = ENOTSUP; 879 return -1; 880 } 881 882 group = __uring_group_impl(_sock->group_impl); 883 884 tr = STAILQ_FIRST(&sock->recv_stream); 885 if (tr == NULL) { 886 if (sock->group->buf_ring_count > 0) { 887 /* There are buffers posted, but data hasn't arrived. */ 888 errno = EAGAIN; 889 } else { 890 /* There are no buffers posted, so this won't ever 891 * make forward progress. 
*/ 892 errno = ENOBUFS; 893 } 894 return -1; 895 } 896 assert(sock->pending_recv == true); 897 assert(tr->buf != NULL); 898 899 *_buf = tr->buf + sock->recv_offset; 900 *ctx = tr->ctx; 901 902 STAILQ_REMOVE_HEAD(&sock->recv_stream, link); 903 STAILQ_INSERT_HEAD(&group->free_trackers, tr, link); 904 905 if (STAILQ_EMPTY(&sock->recv_stream)) { 906 sock->pending_recv = false; 907 TAILQ_REMOVE(&group->pending_recv, sock, link); 908 } 909 910 return tr->len - sock->recv_offset; 911 } 912 913 static ssize_t 914 uring_sock_readv_no_pipe(struct spdk_sock *_sock, struct iovec *iovs, int iovcnt) 915 { 916 struct spdk_uring_sock *sock = __uring_sock(_sock); 917 struct spdk_uring_buf_tracker *tr; 918 struct iovec iov; 919 ssize_t total, len; 920 int i; 921 922 if (sock->connection_status < 0) { 923 errno = -sock->connection_status; 924 return -1; 925 } 926 927 if (_sock->group_impl == NULL) { 928 /* If not in a group just read from the socket the regular way. */ 929 return sock_readv(sock->fd, iovs, iovcnt); 930 } 931 932 if (STAILQ_EMPTY(&sock->recv_stream)) { 933 if (sock->group->buf_ring_count == 0) { 934 /* If the user hasn't posted any buffers, read from the socket 935 * directly. */ 936 937 if (sock->pending_recv) { 938 sock->pending_recv = false; 939 TAILQ_REMOVE(&(__uring_group_impl(_sock->group_impl))->pending_recv, sock, link); 940 } 941 942 return sock_readv(sock->fd, iovs, iovcnt); 943 } 944 945 errno = EAGAIN; 946 return -1; 947 } 948 949 total = 0; 950 for (i = 0; i < iovcnt; i++) { 951 /* Copy to stack so we can change it */ 952 iov = iovs[i]; 953 954 tr = STAILQ_FIRST(&sock->recv_stream); 955 while (tr != NULL) { 956 len = spdk_min(iov.iov_len, tr->len - sock->recv_offset); 957 memcpy(iov.iov_base, tr->buf + sock->recv_offset, len); 958 959 total += len; 960 sock->recv_offset += len; 961 iov.iov_base += len; 962 iov.iov_len -= len; 963 964 if (sock->recv_offset == tr->len) { 965 sock->recv_offset = 0; 966 STAILQ_REMOVE_HEAD(&sock->recv_stream, link); 967 STAILQ_INSERT_HEAD(&sock->group->free_trackers, tr, link); 968 spdk_sock_group_provide_buf(sock->group->base.group, tr->buf, tr->buflen, tr->ctx); 969 tr = STAILQ_FIRST(&sock->recv_stream); 970 } 971 972 if (iov.iov_len == 0) { 973 break; 974 } 975 } 976 } 977 978 if (STAILQ_EMPTY(&sock->recv_stream)) { 979 struct spdk_uring_sock_group_impl *group; 980 981 group = __uring_group_impl(_sock->group_impl); 982 sock->pending_recv = false; 983 TAILQ_REMOVE(&group->pending_recv, sock, link); 984 } 985 986 assert(total > 0); 987 return total; 988 } 989 990 static ssize_t 991 uring_sock_readv(struct spdk_sock *_sock, struct iovec *iov, int iovcnt) 992 { 993 struct spdk_uring_sock *sock = __uring_sock(_sock); 994 int rc, i; 995 size_t len; 996 997 if (sock->connection_status < 0) { 998 errno = -sock->connection_status; 999 return -1; 1000 } 1001 1002 if (sock->recv_pipe == NULL) { 1003 return uring_sock_readv_no_pipe(_sock, iov, iovcnt); 1004 } 1005 1006 len = 0; 1007 for (i = 0; i < iovcnt; i++) { 1008 len += iov[i].iov_len; 1009 } 1010 1011 if (spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) { 1012 /* If the user is receiving a sufficiently large amount of data, 1013 * receive directly to their buffers. 
*/ 1014 if (len >= MIN_SOCK_PIPE_SIZE) { 1015 return sock_readv(sock->fd, iov, iovcnt); 1016 } 1017 1018 /* Otherwise, do a big read into our pipe */ 1019 rc = uring_sock_read(sock); 1020 if (rc <= 0) { 1021 return rc; 1022 } 1023 } 1024 1025 return uring_sock_recv_from_pipe(sock, iov, iovcnt); 1026 } 1027 1028 static ssize_t 1029 uring_sock_recv(struct spdk_sock *sock, void *buf, size_t len) 1030 { 1031 struct iovec iov[1]; 1032 1033 iov[0].iov_base = buf; 1034 iov[0].iov_len = len; 1035 1036 return uring_sock_readv(sock, iov, 1); 1037 } 1038 1039 static ssize_t 1040 uring_sock_writev(struct spdk_sock *_sock, struct iovec *iov, int iovcnt) 1041 { 1042 struct spdk_uring_sock *sock = __uring_sock(_sock); 1043 struct msghdr msg = { 1044 .msg_iov = iov, 1045 .msg_iovlen = iovcnt, 1046 }; 1047 1048 if (sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) { 1049 errno = EAGAIN; 1050 return -1; 1051 } 1052 1053 return sendmsg(sock->fd, &msg, MSG_DONTWAIT); 1054 } 1055 1056 static ssize_t 1057 sock_request_advance_offset(struct spdk_sock_request *req, ssize_t rc) 1058 { 1059 unsigned int offset; 1060 size_t len; 1061 int i; 1062 1063 offset = req->internal.offset; 1064 for (i = 0; i < req->iovcnt; i++) { 1065 /* Advance by the offset first */ 1066 if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) { 1067 offset -= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len; 1068 continue; 1069 } 1070 1071 /* Calculate the remaining length of this element */ 1072 len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset; 1073 1074 if (len > (size_t)rc) { 1075 req->internal.offset += rc; 1076 return -1; 1077 } 1078 1079 offset = 0; 1080 req->internal.offset += len; 1081 rc -= len; 1082 } 1083 1084 return rc; 1085 } 1086 1087 static int 1088 sock_complete_write_reqs(struct spdk_sock *_sock, ssize_t rc, bool is_zcopy) 1089 { 1090 struct spdk_uring_sock *sock = __uring_sock(_sock); 1091 struct spdk_sock_request *req; 1092 int retval; 1093 1094 if (is_zcopy) { 1095 /* Handling overflow case, because we use psock->sendmsg_idx - 1 for the 1096 * req->internal.offset, so sendmsg_idx should not be zero */ 1097 if (spdk_unlikely(sock->sendmsg_idx == UINT32_MAX)) { 1098 sock->sendmsg_idx = 1; 1099 } else { 1100 sock->sendmsg_idx++; 1101 } 1102 } 1103 1104 /* Consume the requests that were actually written */ 1105 req = TAILQ_FIRST(&_sock->queued_reqs); 1106 while (req) { 1107 /* req->internal.is_zcopy is true when the whole req or part of it is sent with zerocopy */ 1108 req->internal.is_zcopy = is_zcopy; 1109 1110 rc = sock_request_advance_offset(req, rc); 1111 if (rc < 0) { 1112 /* This element was partially sent. */ 1113 return 0; 1114 } 1115 1116 /* Handled a full request. */ 1117 spdk_sock_request_pend(_sock, req); 1118 1119 if (!req->internal.is_zcopy && req == TAILQ_FIRST(&_sock->pending_reqs)) { 1120 retval = spdk_sock_request_put(_sock, req, 0); 1121 if (retval) { 1122 return retval; 1123 } 1124 } else { 1125 /* Re-use the offset field to hold the sendmsg call index. The 1126 * index is 0 based, so subtract one here because we've already 1127 * incremented above. 
*/ 1128 req->internal.offset = sock->sendmsg_idx - 1; 1129 } 1130 1131 if (rc == 0) { 1132 break; 1133 } 1134 1135 req = TAILQ_FIRST(&_sock->queued_reqs); 1136 } 1137 1138 return 0; 1139 } 1140 1141 #ifdef SPDK_ZEROCOPY 1142 static int 1143 _sock_check_zcopy(struct spdk_sock *_sock, int status) 1144 { 1145 struct spdk_uring_sock *sock = __uring_sock(_sock); 1146 ssize_t rc; 1147 struct sock_extended_err *serr; 1148 struct cmsghdr *cm; 1149 uint32_t idx; 1150 struct spdk_sock_request *req, *treq; 1151 bool found; 1152 1153 assert(sock->zcopy == true); 1154 if (spdk_unlikely(status) < 0) { 1155 if (!TAILQ_EMPTY(&_sock->pending_reqs)) { 1156 SPDK_ERRLOG("Attempting to receive from ERRQUEUE yielded error, but pending list still has orphaned entries, status =%d\n", 1157 status); 1158 } else { 1159 SPDK_WARNLOG("Recvmsg yielded an error!\n"); 1160 } 1161 return 0; 1162 } 1163 1164 cm = CMSG_FIRSTHDR(&sock->errqueue_task.msg); 1165 if (!((cm->cmsg_level == SOL_IP && cm->cmsg_type == IP_RECVERR) || 1166 (cm->cmsg_level == SOL_IPV6 && cm->cmsg_type == IPV6_RECVERR))) { 1167 SPDK_WARNLOG("Unexpected cmsg level or type!\n"); 1168 return 0; 1169 } 1170 1171 serr = (struct sock_extended_err *)CMSG_DATA(cm); 1172 if (serr->ee_errno != 0 || serr->ee_origin != SO_EE_ORIGIN_ZEROCOPY) { 1173 SPDK_WARNLOG("Unexpected extended error origin\n"); 1174 return 0; 1175 } 1176 1177 /* Most of the time, the pending_reqs array is in the exact 1178 * order we need such that all of the requests to complete are 1179 * in order, in the front. It is guaranteed that all requests 1180 * belonging to the same sendmsg call are sequential, so once 1181 * we encounter one match we can stop looping as soon as a 1182 * non-match is found. 1183 */ 1184 for (idx = serr->ee_info; idx <= serr->ee_data; idx++) { 1185 found = false; 1186 TAILQ_FOREACH_SAFE(req, &_sock->pending_reqs, internal.link, treq) { 1187 if (!req->internal.is_zcopy) { 1188 /* This wasn't a zcopy request. 
It was just waiting in line to complete */ 1189 rc = spdk_sock_request_put(_sock, req, 0); 1190 if (rc < 0) { 1191 return rc; 1192 } 1193 } else if (req->internal.offset == idx) { 1194 found = true; 1195 rc = spdk_sock_request_put(_sock, req, 0); 1196 if (rc < 0) { 1197 return rc; 1198 } 1199 } else if (found) { 1200 break; 1201 } 1202 } 1203 } 1204 1205 return 0; 1206 } 1207 1208 static void 1209 _sock_prep_errqueue(struct spdk_sock *_sock) 1210 { 1211 struct spdk_uring_sock *sock = __uring_sock(_sock); 1212 struct spdk_uring_task *task = &sock->errqueue_task; 1213 struct io_uring_sqe *sqe; 1214 1215 if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS) { 1216 return; 1217 } 1218 1219 if (sock->pending_group_remove) { 1220 return; 1221 } 1222 1223 assert(sock->group != NULL); 1224 sock->group->io_queued++; 1225 1226 sqe = io_uring_get_sqe(&sock->group->uring); 1227 io_uring_prep_recvmsg(sqe, sock->fd, &task->msg, MSG_ERRQUEUE); 1228 io_uring_sqe_set_data(sqe, task); 1229 task->status = SPDK_URING_SOCK_TASK_IN_PROCESS; 1230 } 1231 1232 #endif 1233 1234 static void 1235 _sock_flush(struct spdk_sock *_sock) 1236 { 1237 struct spdk_uring_sock *sock = __uring_sock(_sock); 1238 struct spdk_uring_task *task = &sock->write_task; 1239 uint32_t iovcnt; 1240 struct io_uring_sqe *sqe; 1241 int flags; 1242 1243 if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS) { 1244 return; 1245 } 1246 1247 #ifdef SPDK_ZEROCOPY 1248 if (sock->zcopy) { 1249 flags = MSG_DONTWAIT | sock->zcopy_send_flags; 1250 } else 1251 #endif 1252 { 1253 flags = MSG_DONTWAIT; 1254 } 1255 1256 iovcnt = spdk_sock_prep_reqs(&sock->base, task->iovs, task->iov_cnt, &task->last_req, &flags); 1257 if (!iovcnt) { 1258 return; 1259 } 1260 1261 task->iov_cnt = iovcnt; 1262 assert(sock->group != NULL); 1263 task->msg.msg_iov = task->iovs; 1264 task->msg.msg_iovlen = task->iov_cnt; 1265 #ifdef SPDK_ZEROCOPY 1266 task->is_zcopy = (flags & MSG_ZEROCOPY) ? 
true : false; 1267 #endif 1268 sock->group->io_queued++; 1269 1270 sqe = io_uring_get_sqe(&sock->group->uring); 1271 io_uring_prep_sendmsg(sqe, sock->fd, &sock->write_task.msg, flags); 1272 io_uring_sqe_set_data(sqe, task); 1273 task->status = SPDK_URING_SOCK_TASK_IN_PROCESS; 1274 } 1275 1276 static void 1277 _sock_prep_read(struct spdk_sock *_sock) 1278 { 1279 struct spdk_uring_sock *sock = __uring_sock(_sock); 1280 struct spdk_uring_task *task = &sock->read_task; 1281 struct io_uring_sqe *sqe; 1282 1283 /* Do not prepare read event */ 1284 if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS) { 1285 return; 1286 } 1287 1288 if (sock->pending_group_remove) { 1289 return; 1290 } 1291 1292 assert(sock->group != NULL); 1293 sock->group->io_queued++; 1294 1295 sqe = io_uring_get_sqe(&sock->group->uring); 1296 io_uring_prep_recv(sqe, sock->fd, NULL, URING_MAX_RECV_SIZE, 0); 1297 sqe->buf_group = URING_BUF_GROUP_ID; 1298 sqe->flags |= IOSQE_BUFFER_SELECT; 1299 io_uring_sqe_set_data(sqe, task); 1300 task->status = SPDK_URING_SOCK_TASK_IN_PROCESS; 1301 } 1302 1303 static void 1304 _sock_prep_cancel_task(struct spdk_sock *_sock, void *user_data) 1305 { 1306 struct spdk_uring_sock *sock = __uring_sock(_sock); 1307 struct spdk_uring_task *task = &sock->cancel_task; 1308 struct io_uring_sqe *sqe; 1309 1310 if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS) { 1311 return; 1312 } 1313 1314 assert(sock->group != NULL); 1315 sock->group->io_queued++; 1316 1317 sqe = io_uring_get_sqe(&sock->group->uring); 1318 io_uring_prep_cancel(sqe, user_data, 0); 1319 io_uring_sqe_set_data(sqe, task); 1320 task->status = SPDK_URING_SOCK_TASK_IN_PROCESS; 1321 } 1322 1323 static void 1324 uring_sock_fail(struct spdk_uring_sock *sock, int status) 1325 { 1326 struct spdk_uring_sock_group_impl *group = sock->group; 1327 int rc; 1328 1329 sock->connection_status = status; 1330 rc = spdk_sock_abort_requests(&sock->base); 1331 1332 /* The user needs to be notified that this socket is dead. */ 1333 if (rc == 0 && sock->base.cb_fn != NULL && 1334 sock->pending_recv == false) { 1335 sock->pending_recv = true; 1336 TAILQ_INSERT_TAIL(&group->pending_recv, sock, link); 1337 } 1338 } 1339 1340 static int 1341 sock_uring_group_reap(struct spdk_uring_sock_group_impl *group, int max, int max_read_events, 1342 struct spdk_sock **socks) 1343 { 1344 int i, count, ret; 1345 struct io_uring_cqe *cqe; 1346 struct spdk_uring_sock *sock, *tmp; 1347 struct spdk_uring_task *task; 1348 int status, bid, flags; 1349 bool is_zcopy; 1350 1351 for (i = 0; i < max; i++) { 1352 ret = io_uring_peek_cqe(&group->uring, &cqe); 1353 if (ret != 0) { 1354 break; 1355 } 1356 1357 if (cqe == NULL) { 1358 break; 1359 } 1360 1361 task = (struct spdk_uring_task *)cqe->user_data; 1362 assert(task != NULL); 1363 sock = task->sock; 1364 assert(sock != NULL); 1365 assert(sock->group != NULL); 1366 assert(sock->group == group); 1367 sock->group->io_inflight--; 1368 sock->group->io_avail++; 1369 status = cqe->res; 1370 flags = cqe->flags; 1371 io_uring_cqe_seen(&group->uring, cqe); 1372 1373 task->status = SPDK_URING_SOCK_TASK_NOT_IN_USE; 1374 1375 switch (task->type) { 1376 case URING_TASK_READ: 1377 if (status == -EAGAIN || status == -EWOULDBLOCK) { 1378 /* This likely shouldn't happen, but would indicate that the 1379 * kernel didn't have enough resources to queue a task internally. 
*/ 1380 _sock_prep_read(&sock->base); 1381 } else if (status == -ECANCELED) { 1382 continue; 1383 } else if (status == -ENOBUFS) { 1384 /* There's data in the socket but the user hasn't provided any buffers. 1385 * We need to notify the user that the socket has data pending. */ 1386 if (sock->base.cb_fn != NULL && 1387 sock->pending_recv == false) { 1388 sock->pending_recv = true; 1389 TAILQ_INSERT_TAIL(&group->pending_recv, sock, link); 1390 } 1391 1392 _sock_prep_read(&sock->base); 1393 } else if (spdk_unlikely(status <= 0)) { 1394 uring_sock_fail(sock, status < 0 ? status : -ECONNRESET); 1395 } else { 1396 struct spdk_uring_buf_tracker *tracker; 1397 1398 assert((flags & IORING_CQE_F_BUFFER) != 0); 1399 1400 bid = flags >> IORING_CQE_BUFFER_SHIFT; 1401 tracker = &group->trackers[bid]; 1402 1403 assert(tracker->buf != NULL); 1404 assert(tracker->len != 0); 1405 1406 /* Append this data to the stream */ 1407 tracker->len = status; 1408 STAILQ_INSERT_TAIL(&sock->recv_stream, tracker, link); 1409 assert(group->buf_ring_count > 0); 1410 group->buf_ring_count--; 1411 1412 if (sock->base.cb_fn != NULL && 1413 sock->pending_recv == false) { 1414 sock->pending_recv = true; 1415 TAILQ_INSERT_TAIL(&group->pending_recv, sock, link); 1416 } 1417 1418 _sock_prep_read(&sock->base); 1419 } 1420 break; 1421 case URING_TASK_WRITE: 1422 if (status == -EAGAIN || status == -EWOULDBLOCK || 1423 (status == -ENOBUFS && sock->zcopy) || 1424 status == -ECANCELED) { 1425 continue; 1426 } else if (spdk_unlikely(status) < 0) { 1427 uring_sock_fail(sock, status); 1428 } else { 1429 task->last_req = NULL; 1430 task->iov_cnt = 0; 1431 is_zcopy = task->is_zcopy; 1432 task->is_zcopy = false; 1433 sock_complete_write_reqs(&sock->base, status, is_zcopy); 1434 } 1435 1436 break; 1437 #ifdef SPDK_ZEROCOPY 1438 case URING_TASK_ERRQUEUE: 1439 if (status == -EAGAIN || status == -EWOULDBLOCK) { 1440 _sock_prep_errqueue(&sock->base); 1441 } else if (status == -ECANCELED) { 1442 continue; 1443 } else if (spdk_unlikely(status < 0)) { 1444 uring_sock_fail(sock, status); 1445 } else { 1446 _sock_check_zcopy(&sock->base, status); 1447 _sock_prep_errqueue(&sock->base); 1448 } 1449 break; 1450 #endif 1451 case URING_TASK_CANCEL: 1452 /* Do nothing */ 1453 break; 1454 default: 1455 SPDK_UNREACHABLE(); 1456 } 1457 } 1458 1459 if (!socks) { 1460 return 0; 1461 } 1462 count = 0; 1463 TAILQ_FOREACH_SAFE(sock, &group->pending_recv, link, tmp) { 1464 if (count == max_read_events) { 1465 break; 1466 } 1467 1468 /* If the socket's cb_fn is NULL, do not add it to socks array */ 1469 if (spdk_unlikely(sock->base.cb_fn == NULL)) { 1470 assert(sock->pending_recv == true); 1471 sock->pending_recv = false; 1472 TAILQ_REMOVE(&group->pending_recv, sock, link); 1473 continue; 1474 } 1475 1476 socks[count++] = &sock->base; 1477 } 1478 1479 1480 /* Cycle the pending_recv list so that each time we poll things aren't 1481 * in the same order. Say we have 6 sockets in the list, named as follows: 1482 * A B C D E F 1483 * And all 6 sockets had the poll events, but max_events is only 3. That means 1484 * psock currently points at D. We want to rearrange the list to the following: 1485 * D E F A B C 1486 * 1487 * The variables below are named according to this example to make it easier to 1488 * follow the swaps. 
1489 */ 1490 if (sock != NULL) { 1491 struct spdk_uring_sock *ua, *uc, *ud, *uf; 1492 1493 /* Capture pointers to the elements we need */ 1494 ud = sock; 1495 1496 ua = TAILQ_FIRST(&group->pending_recv); 1497 if (ua == ud) { 1498 goto end; 1499 } 1500 1501 uf = TAILQ_LAST(&group->pending_recv, pending_recv_list); 1502 if (uf == ud) { 1503 TAILQ_REMOVE(&group->pending_recv, ud, link); 1504 TAILQ_INSERT_HEAD(&group->pending_recv, ud, link); 1505 goto end; 1506 } 1507 1508 uc = TAILQ_PREV(ud, pending_recv_list, link); 1509 assert(uc != NULL); 1510 1511 /* Break the link between C and D */ 1512 uc->link.tqe_next = NULL; 1513 1514 /* Connect F to A */ 1515 uf->link.tqe_next = ua; 1516 ua->link.tqe_prev = &uf->link.tqe_next; 1517 1518 /* Fix up the list first/last pointers */ 1519 group->pending_recv.tqh_first = ud; 1520 group->pending_recv.tqh_last = &uc->link.tqe_next; 1521 1522 /* D is in front of the list, make tqe prev pointer point to the head of list */ 1523 ud->link.tqe_prev = &group->pending_recv.tqh_first; 1524 } 1525 1526 end: 1527 return count; 1528 } 1529 1530 static int uring_sock_flush(struct spdk_sock *_sock); 1531 1532 static void 1533 uring_sock_writev_async(struct spdk_sock *_sock, struct spdk_sock_request *req) 1534 { 1535 struct spdk_uring_sock *sock = __uring_sock(_sock); 1536 int rc; 1537 1538 if (spdk_unlikely(sock->connection_status)) { 1539 req->cb_fn(req->cb_arg, sock->connection_status); 1540 return; 1541 } 1542 1543 spdk_sock_request_queue(_sock, req); 1544 1545 if (!sock->group) { 1546 if (_sock->queued_iovcnt >= IOV_BATCH_SIZE) { 1547 rc = uring_sock_flush(_sock); 1548 if (rc < 0 && errno != EAGAIN) { 1549 spdk_sock_abort_requests(_sock); 1550 } 1551 } 1552 } 1553 } 1554 1555 static int 1556 uring_sock_set_recvlowat(struct spdk_sock *_sock, int nbytes) 1557 { 1558 struct spdk_uring_sock *sock = __uring_sock(_sock); 1559 int val; 1560 int rc; 1561 1562 assert(sock != NULL); 1563 1564 val = nbytes; 1565 rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVLOWAT, &val, sizeof val); 1566 if (rc != 0) { 1567 return -1; 1568 } 1569 return 0; 1570 } 1571 1572 static bool 1573 uring_sock_is_ipv6(struct spdk_sock *_sock) 1574 { 1575 struct spdk_uring_sock *sock = __uring_sock(_sock); 1576 struct sockaddr_storage sa; 1577 socklen_t salen; 1578 int rc; 1579 1580 assert(sock != NULL); 1581 1582 memset(&sa, 0, sizeof sa); 1583 salen = sizeof sa; 1584 rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen); 1585 if (rc != 0) { 1586 SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno); 1587 return false; 1588 } 1589 1590 return (sa.ss_family == AF_INET6); 1591 } 1592 1593 static bool 1594 uring_sock_is_ipv4(struct spdk_sock *_sock) 1595 { 1596 struct spdk_uring_sock *sock = __uring_sock(_sock); 1597 struct sockaddr_storage sa; 1598 socklen_t salen; 1599 int rc; 1600 1601 assert(sock != NULL); 1602 1603 memset(&sa, 0, sizeof sa); 1604 salen = sizeof sa; 1605 rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen); 1606 if (rc != 0) { 1607 SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno); 1608 return false; 1609 } 1610 1611 return (sa.ss_family == AF_INET); 1612 } 1613 1614 static bool 1615 uring_sock_is_connected(struct spdk_sock *_sock) 1616 { 1617 struct spdk_uring_sock *sock = __uring_sock(_sock); 1618 uint8_t byte; 1619 int rc; 1620 1621 rc = recv(sock->fd, &byte, 1, MSG_PEEK | MSG_DONTWAIT); 1622 if (rc == 0) { 1623 return false; 1624 } 1625 1626 if (rc < 0) { 1627 if (errno == EAGAIN || errno == EWOULDBLOCK) { 1628 return true; 1629 } 1630 1631 return false; 1632 } 
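	/* recv() peeked at least one byte without consuming it, so the peer
	 * still has the connection open and data is available. */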
1633 1634 return true; 1635 } 1636 1637 static struct spdk_sock_group_impl * 1638 uring_sock_group_impl_get_optimal(struct spdk_sock *_sock, struct spdk_sock_group_impl *hint) 1639 { 1640 struct spdk_uring_sock *sock = __uring_sock(_sock); 1641 struct spdk_sock_group_impl *group; 1642 1643 if (sock->placement_id != -1) { 1644 spdk_sock_map_lookup(&g_map, sock->placement_id, &group, hint); 1645 return group; 1646 } 1647 1648 return NULL; 1649 } 1650 1651 static int 1652 uring_sock_group_impl_buf_pool_free(struct spdk_uring_sock_group_impl *group_impl) 1653 { 1654 if (group_impl->buf_ring) { 1655 io_uring_unregister_buf_ring(&group_impl->uring, URING_BUF_GROUP_ID); 1656 free(group_impl->buf_ring); 1657 } 1658 1659 free(group_impl->trackers); 1660 1661 return 0; 1662 } 1663 1664 static int 1665 uring_sock_group_impl_buf_pool_alloc(struct spdk_uring_sock_group_impl *group_impl) 1666 { 1667 struct io_uring_buf_reg buf_reg = {}; 1668 struct io_uring_buf_ring *buf_ring; 1669 int i, rc; 1670 1671 rc = posix_memalign((void **)&buf_ring, 0x1000, URING_BUF_POOL_SIZE * sizeof(struct io_uring_buf)); 1672 if (rc != 0) { 1673 /* posix_memalign returns positive errno values */ 1674 return -rc; 1675 } 1676 1677 buf_reg.ring_addr = (unsigned long long)buf_ring; 1678 buf_reg.ring_entries = URING_BUF_POOL_SIZE; 1679 buf_reg.bgid = URING_BUF_GROUP_ID; 1680 1681 rc = io_uring_register_buf_ring(&group_impl->uring, &buf_reg, 0); 1682 if (rc != 0) { 1683 free(buf_ring); 1684 return rc; 1685 } 1686 1687 group_impl->buf_ring = buf_ring; 1688 io_uring_buf_ring_init(group_impl->buf_ring); 1689 group_impl->buf_ring_count = 0; 1690 1691 group_impl->trackers = calloc(URING_BUF_POOL_SIZE, sizeof(struct spdk_uring_buf_tracker)); 1692 if (group_impl->trackers == NULL) { 1693 uring_sock_group_impl_buf_pool_free(group_impl); 1694 return -ENOMEM; 1695 } 1696 1697 STAILQ_INIT(&group_impl->free_trackers); 1698 1699 for (i = 0; i < URING_BUF_POOL_SIZE; i++) { 1700 struct spdk_uring_buf_tracker *tracker = &group_impl->trackers[i]; 1701 1702 tracker->buf = NULL; 1703 tracker->len = 0; 1704 tracker->ctx = NULL; 1705 tracker->id = i; 1706 1707 STAILQ_INSERT_TAIL(&group_impl->free_trackers, tracker, link); 1708 } 1709 1710 return 0; 1711 } 1712 1713 static struct spdk_sock_group_impl * 1714 uring_sock_group_impl_create(void) 1715 { 1716 struct spdk_uring_sock_group_impl *group_impl; 1717 1718 group_impl = calloc(1, sizeof(*group_impl)); 1719 if (group_impl == NULL) { 1720 SPDK_ERRLOG("group_impl allocation failed\n"); 1721 return NULL; 1722 } 1723 1724 group_impl->io_avail = SPDK_SOCK_GROUP_QUEUE_DEPTH; 1725 1726 if (io_uring_queue_init(SPDK_SOCK_GROUP_QUEUE_DEPTH, &group_impl->uring, 0) < 0) { 1727 SPDK_ERRLOG("uring I/O context setup failure\n"); 1728 free(group_impl); 1729 return NULL; 1730 } 1731 1732 TAILQ_INIT(&group_impl->pending_recv); 1733 1734 if (uring_sock_group_impl_buf_pool_alloc(group_impl) < 0) { 1735 SPDK_ERRLOG("Failed to create buffer ring." 
1736 "uring sock implementation is likely not supported on this kernel.\n"); 1737 io_uring_queue_exit(&group_impl->uring); 1738 free(group_impl); 1739 return NULL; 1740 } 1741 1742 if (g_spdk_uring_sock_impl_opts.enable_placement_id == PLACEMENT_CPU) { 1743 spdk_sock_map_insert(&g_map, spdk_env_get_current_core(), &group_impl->base); 1744 } 1745 1746 return &group_impl->base; 1747 } 1748 1749 static int 1750 uring_sock_group_impl_add_sock(struct spdk_sock_group_impl *_group, 1751 struct spdk_sock *_sock) 1752 { 1753 struct spdk_uring_sock *sock = __uring_sock(_sock); 1754 struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group); 1755 int rc; 1756 1757 sock->group = group; 1758 sock->write_task.sock = sock; 1759 sock->write_task.type = URING_TASK_WRITE; 1760 1761 sock->read_task.sock = sock; 1762 sock->read_task.type = URING_TASK_READ; 1763 1764 sock->errqueue_task.sock = sock; 1765 sock->errqueue_task.type = URING_TASK_ERRQUEUE; 1766 sock->errqueue_task.msg.msg_control = sock->buf; 1767 sock->errqueue_task.msg.msg_controllen = sizeof(sock->buf); 1768 1769 sock->cancel_task.sock = sock; 1770 sock->cancel_task.type = URING_TASK_CANCEL; 1771 1772 /* switched from another polling group due to scheduling */ 1773 if (spdk_unlikely(sock->recv_pipe != NULL && 1774 (spdk_pipe_reader_bytes_available(sock->recv_pipe) > 0))) { 1775 assert(sock->pending_recv == false); 1776 sock->pending_recv = true; 1777 TAILQ_INSERT_TAIL(&group->pending_recv, sock, link); 1778 } 1779 1780 if (sock->placement_id != -1) { 1781 rc = spdk_sock_map_insert(&g_map, sock->placement_id, &group->base); 1782 if (rc != 0) { 1783 SPDK_ERRLOG("Failed to insert sock group into map: %d", rc); 1784 /* Do not treat this as an error. The system will continue running. */ 1785 } 1786 } 1787 1788 /* We get an async read going immediately */ 1789 _sock_prep_read(&sock->base); 1790 #ifdef SPDK_ZEROCOPY 1791 if (sock->zcopy) { 1792 _sock_prep_errqueue(_sock); 1793 } 1794 #endif 1795 1796 return 0; 1797 } 1798 1799 static void 1800 uring_sock_group_populate_buf_ring(struct spdk_uring_sock_group_impl *group) 1801 { 1802 struct spdk_uring_buf_tracker *tracker; 1803 int count, mask; 1804 1805 if (g_spdk_uring_sock_impl_opts.enable_recv_pipe) { 1806 /* If recv_pipe is enabled, we do not post buffers. 
		return;
	}

	/* Try to re-populate the io_uring's buffer pool using user-provided buffers */
	tracker = STAILQ_FIRST(&group->free_trackers);
	count = 0;
	mask = io_uring_buf_ring_mask(URING_BUF_POOL_SIZE);
	while (tracker != NULL) {
		tracker->buflen = spdk_sock_group_get_buf(group->base.group, &tracker->buf, &tracker->ctx);
		if (tracker->buflen == 0) {
			break;
		}

		assert(tracker->buf != NULL);
		STAILQ_REMOVE_HEAD(&group->free_trackers, link);
		assert(STAILQ_FIRST(&group->free_trackers) != tracker);

		io_uring_buf_ring_add(group->buf_ring, tracker->buf, tracker->buflen, tracker->id, mask, count);
		count++;
		tracker = STAILQ_FIRST(&group->free_trackers);
	}

	if (count > 0) {
		group->buf_ring_count += count;
		io_uring_buf_ring_advance(group->buf_ring, count);
	}
}

static int
uring_sock_group_impl_poll(struct spdk_sock_group_impl *_group, int max_events,
			   struct spdk_sock **socks)
{
	struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group);
	int count, ret;
	int to_complete, to_submit;
	struct spdk_sock *_sock, *tmp;
	struct spdk_uring_sock *sock;

	if (spdk_likely(socks)) {
		TAILQ_FOREACH_SAFE(_sock, &group->base.socks, link, tmp) {
			sock = __uring_sock(_sock);
			if (spdk_unlikely(sock->connection_status)) {
				continue;
			}
			_sock_flush(_sock);
		}
	}

	/* Try to re-populate the io_uring's buffer pool using user-provided buffers */
	uring_sock_group_populate_buf_ring(group);

	to_submit = group->io_queued;

	/* Network I/O cannot use O_DIRECT, so we do not need to call spdk_io_uring_enter */
	if (to_submit > 0) {
		/* If there are I/O to submit, use io_uring_submit here.
		 * It will automatically call io_uring_enter appropriately. */
		ret = io_uring_submit(&group->uring);
		if (ret < 0) {
			return 1;
		}
		group->io_queued = 0;
		group->io_inflight += to_submit;
		group->io_avail -= to_submit;
	}

	count = 0;
	to_complete = group->io_inflight;
	if (to_complete > 0 || !TAILQ_EMPTY(&group->pending_recv)) {
		count = sock_uring_group_reap(group, to_complete, max_events, socks);
	}

	return count;
}

static int
uring_sock_group_impl_remove_sock(struct spdk_sock_group_impl *_group,
				  struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group);

	sock->pending_group_remove = true;

	if (sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
		_sock_prep_cancel_task(_sock, &sock->write_task);
		/* Since spdk_sock_group_remove_sock() is not an asynchronous interface,
		 * we can simply poll in a loop here until the tasks complete. */
		while ((sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) ||
		       (sock->cancel_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE)) {
			uring_sock_group_impl_poll(_group, 32, NULL);
		}
	}

	if (sock->read_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
		_sock_prep_cancel_task(_sock, &sock->read_task);
		/* Since spdk_sock_group_remove_sock() is not an asynchronous interface,
		 * we can simply poll in a loop here until the tasks complete. */
		while ((sock->read_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) ||
		       (sock->cancel_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE)) {
			uring_sock_group_impl_poll(_group, 32, NULL);
		}
	}

	if (sock->errqueue_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
		_sock_prep_cancel_task(_sock, &sock->errqueue_task);
		/* Since spdk_sock_group_remove_sock() is not an asynchronous interface,
		 * we can simply poll in a loop here until the tasks complete. */
		while ((sock->errqueue_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) ||
		       (sock->cancel_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE)) {
			uring_sock_group_impl_poll(_group, 32, NULL);
		}
	}

	/* Make sure that cancelling the tasks above did not cause any new requests to be sent */
	assert(sock->write_task.status == SPDK_URING_SOCK_TASK_NOT_IN_USE);
	assert(sock->read_task.status == SPDK_URING_SOCK_TASK_NOT_IN_USE);
	assert(sock->errqueue_task.status == SPDK_URING_SOCK_TASK_NOT_IN_USE);

	if (sock->pending_recv) {
		TAILQ_REMOVE(&group->pending_recv, sock, link);
		sock->pending_recv = false;
	}
	assert(sock->pending_recv == false);

	/* We have no way to handle this case. We could let the user read this
	 * buffer, but the buffer came from a group and we have lost the association
	 * to that so we couldn't release it. */
	assert(STAILQ_EMPTY(&sock->recv_stream));

	if (sock->placement_id != -1) {
		spdk_sock_map_release(&g_map, sock->placement_id);
	}

	sock->pending_group_remove = false;
	sock->group = NULL;
	return 0;
}

static int
uring_sock_group_impl_close(struct spdk_sock_group_impl *_group)
{
	struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group);

	/* Try to reap all the active I/O */
	while (group->io_inflight) {
		uring_sock_group_impl_poll(_group, 32, NULL);
	}
	assert(group->io_inflight == 0);
	assert(group->io_avail == SPDK_SOCK_GROUP_QUEUE_DEPTH);

	uring_sock_group_impl_buf_pool_free(group);

	io_uring_queue_exit(&group->uring);

	if (g_spdk_uring_sock_impl_opts.enable_placement_id == PLACEMENT_CPU) {
		spdk_sock_map_release(&g_map, spdk_env_get_current_core());
	}

	free(group);
	return 0;
}

static int
uring_sock_flush(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct msghdr msg = {};
	struct iovec iovs[IOV_BATCH_SIZE];
	int iovcnt;
	ssize_t rc;
	int flags = sock->zcopy_send_flags;
	int retval;
	bool is_zcopy = false;
	struct spdk_uring_task *task = &sock->errqueue_task;

	/* Can't flush from within a callback or we end up with recursive calls */
	if (_sock->cb_cnt > 0) {
		errno = EAGAIN;
		return -1;
	}

	/* Can't flush while a write is already outstanding */
	if (sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
		errno = EAGAIN;
		return -1;
	}

	/* Gather an iov */
	iovcnt = spdk_sock_prep_reqs(_sock, iovs, 0, NULL, &flags);
	if (iovcnt == 0) {
		/* Nothing to send */
		return 0;
	}

	/* Perform the vectored write */
	msg.msg_iov = iovs;
	msg.msg_iovlen = iovcnt;
	rc = sendmsg(sock->fd, &msg, flags | MSG_DONTWAIT);
	if (rc <= 0) {
		if (rc == 0 || errno == EAGAIN || errno == EWOULDBLOCK || (errno == ENOBUFS && sock->zcopy)) {
			errno = EAGAIN;
		}
		return -1;
	}

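	/* Record whether this sendmsg() used zero-copy, so that completed requests
	 * are either released immediately or held on the pending list until the
	 * corresponding MSG_ERRQUEUE notification arrives. */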
#ifdef SPDK_ZEROCOPY
	is_zcopy = flags & MSG_ZEROCOPY;
#endif
	retval = sock_complete_write_reqs(_sock, rc, is_zcopy);
	if (retval < 0) {
		/* If the socket is closed, return to avoid a heap-use-after-free error */
		errno = ENOTCONN;
		return -1;
	}

#ifdef SPDK_ZEROCOPY
	/* Check the error queue at least once for zero copy completions */
	if (sock->zcopy && !TAILQ_EMPTY(&_sock->pending_reqs)) {
		retval = recvmsg(sock->fd, &task->msg, MSG_ERRQUEUE);
		if (retval < 0) {
			if (errno == EWOULDBLOCK || errno == EAGAIN) {
				return rc;
			}
		}
		_sock_check_zcopy(_sock, retval);
	}
#endif

	return rc;
}

static int
uring_sock_group_impl_register_interrupt(struct spdk_sock_group_impl *_group, uint32_t events,
		spdk_interrupt_fn fn, void *arg, const char *name)
{
	SPDK_ERRLOG("Interrupt mode is not supported in the uring sock implementation.\n");

	return -ENOTSUP;
}

static void
uring_sock_group_impl_unregister_interrupt(struct spdk_sock_group_impl *_group)
{
}

static struct spdk_net_impl g_uring_net_impl = {
	.name = "uring",
	.getaddr = uring_sock_getaddr,
	.get_interface_name = uring_sock_get_interface_name,
	.get_numa_id = uring_sock_get_numa_id,
	.connect = uring_sock_connect,
	.listen = uring_sock_listen,
	.accept = uring_sock_accept,
	.close = uring_sock_close,
	.recv = uring_sock_recv,
	.readv = uring_sock_readv,
	.writev = uring_sock_writev,
	.recv_next = uring_sock_recv_next,
	.writev_async = uring_sock_writev_async,
	.flush = uring_sock_flush,
	.set_recvlowat = uring_sock_set_recvlowat,
	.set_recvbuf = uring_sock_set_recvbuf,
	.set_sendbuf = uring_sock_set_sendbuf,
	.is_ipv6 = uring_sock_is_ipv6,
	.is_ipv4 = uring_sock_is_ipv4,
	.is_connected = uring_sock_is_connected,
	.group_impl_get_optimal = uring_sock_group_impl_get_optimal,
	.group_impl_create = uring_sock_group_impl_create,
	.group_impl_add_sock = uring_sock_group_impl_add_sock,
	.group_impl_remove_sock = uring_sock_group_impl_remove_sock,
	.group_impl_poll = uring_sock_group_impl_poll,
	.group_impl_register_interrupt = uring_sock_group_impl_register_interrupt,
	.group_impl_unregister_interrupt = uring_sock_group_impl_unregister_interrupt,
	.group_impl_close = uring_sock_group_impl_close,
	.get_opts = uring_sock_impl_get_opts,
	.set_opts = uring_sock_impl_set_opts,
};

__attribute__((constructor)) static void
net_impl_register_uring(void)
{
	struct spdk_sock_group_impl *impl;

	/* Check if we can create a uring sock group before we register
	 * it as a valid impl. */
	impl = uring_sock_group_impl_create();
	if (impl) {
		uring_sock_group_impl_close(impl);
		spdk_net_impl_register(&g_uring_net_impl);
	}
}
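/*
 * Illustrative usage sketch (not part of this module): an application that
 * wants its sockets served by this implementation can request it by name,
 * matching g_uring_net_impl.name registered above. Address and port below
 * are placeholders and error handling is omitted:
 *
 *	struct spdk_sock_opts opts = {};
 *	struct spdk_sock *sock;
 *
 *	opts.opts_size = sizeof(opts);
 *	spdk_sock_get_default_opts(&opts);
 *	sock = spdk_sock_connect_ext("192.168.0.1", 4420, "uring", &opts);
 */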