1 /* SPDX-License-Identifier: BSD-3-Clause 2 * Copyright (C) 2019 Intel Corporation. 3 * All rights reserved. 4 */ 5 6 #include "spdk/stdinc.h" 7 #include "spdk/config.h" 8 9 #include <linux/errqueue.h> 10 #include <liburing.h> 11 12 #include "spdk/barrier.h" 13 #include "spdk/env.h" 14 #include "spdk/log.h" 15 #include "spdk/pipe.h" 16 #include "spdk/sock.h" 17 #include "spdk/string.h" 18 #include "spdk/util.h" 19 #include "spdk/net.h" 20 #include "spdk/file.h" 21 22 #include "spdk_internal/sock.h" 23 #include "spdk_internal/assert.h" 24 #include "spdk/net.h" 25 26 #define MAX_TMPBUF 1024 27 #define PORTNUMLEN 32 28 #define SPDK_SOCK_GROUP_QUEUE_DEPTH 4096 29 #define SPDK_SOCK_CMG_INFO_SIZE (sizeof(struct cmsghdr) + sizeof(struct sock_extended_err)) 30 31 enum uring_task_type { 32 URING_TASK_READ = 0, 33 URING_TASK_ERRQUEUE, 34 URING_TASK_WRITE, 35 URING_TASK_CANCEL, 36 }; 37 38 #if defined(SO_ZEROCOPY) && defined(MSG_ZEROCOPY) 39 #define SPDK_ZEROCOPY 40 #endif 41 42 /* We don't know how big the buffers that the user posts will be, but this 43 * is the maximum we'll ever allow it to receive in a single command. 44 * If the user buffers are smaller, it will just receive less. */ 45 #define URING_MAX_RECV_SIZE (128 * 1024) 46 47 /* We don't know how many buffers the user will post, but this is the 48 * maximum number we'll take from the pool to post per group. */ 49 #define URING_BUF_POOL_SIZE 128 50 51 /* We use 1 just so it's not zero and we can validate it's right. */ 52 #define URING_BUF_GROUP_ID 1 53 54 enum spdk_uring_sock_task_status { 55 SPDK_URING_SOCK_TASK_NOT_IN_USE = 0, 56 SPDK_URING_SOCK_TASK_IN_PROCESS, 57 }; 58 59 struct spdk_uring_task { 60 enum spdk_uring_sock_task_status status; 61 enum uring_task_type type; 62 struct spdk_uring_sock *sock; 63 struct msghdr msg; 64 struct iovec iovs[IOV_BATCH_SIZE]; 65 int iov_cnt; 66 struct spdk_sock_request *last_req; 67 bool is_zcopy; 68 STAILQ_ENTRY(spdk_uring_task) link; 69 }; 70 71 struct spdk_uring_sock { 72 struct spdk_sock base; 73 int fd; 74 uint32_t sendmsg_idx; 75 struct spdk_uring_sock_group_impl *group; 76 STAILQ_HEAD(, spdk_uring_buf_tracker) recv_stream; 77 size_t recv_offset; 78 struct spdk_uring_task write_task; 79 struct spdk_uring_task errqueue_task; 80 struct spdk_uring_task read_task; 81 struct spdk_uring_task cancel_task; 82 struct spdk_pipe *recv_pipe; 83 void *recv_buf; 84 int recv_buf_sz; 85 bool zcopy; 86 bool pending_recv; 87 bool pending_group_remove; 88 int zcopy_send_flags; 89 int connection_status; 90 int placement_id; 91 uint8_t reserved[4]; 92 uint8_t buf[SPDK_SOCK_CMG_INFO_SIZE]; 93 TAILQ_ENTRY(spdk_uring_sock) link; 94 char interface_name[IFNAMSIZ]; 95 }; 96 /* 'struct cmsghdr' is mapped to the buffer 'buf', and while first element 97 * of this control message header has a size of 8 bytes, 'buf' 98 * must be 8-byte aligned. 
99 */ 100 SPDK_STATIC_ASSERT(offsetof(struct spdk_uring_sock, buf) % 8 == 0, 101 "Incorrect alignment: `buf` must be aligned to 8 bytes"); 102 103 TAILQ_HEAD(pending_recv_list, spdk_uring_sock); 104 105 struct spdk_uring_buf_tracker { 106 void *buf; 107 size_t buflen; 108 size_t len; 109 void *ctx; 110 int id; 111 STAILQ_ENTRY(spdk_uring_buf_tracker) link; 112 }; 113 114 struct spdk_uring_sock_group_impl { 115 struct spdk_sock_group_impl base; 116 struct io_uring uring; 117 uint32_t io_inflight; 118 uint32_t io_queued; 119 uint32_t io_avail; 120 struct pending_recv_list pending_recv; 121 122 struct io_uring_buf_ring *buf_ring; 123 uint32_t buf_ring_count; 124 struct spdk_uring_buf_tracker *trackers; 125 STAILQ_HEAD(, spdk_uring_buf_tracker) free_trackers; 126 }; 127 128 static struct spdk_sock_impl_opts g_spdk_uring_sock_impl_opts = { 129 .recv_buf_size = DEFAULT_SO_RCVBUF_SIZE, 130 .send_buf_size = DEFAULT_SO_SNDBUF_SIZE, 131 .enable_recv_pipe = true, 132 .enable_quickack = false, 133 .enable_placement_id = PLACEMENT_NONE, 134 .enable_zerocopy_send_server = false, 135 .enable_zerocopy_send_client = false, 136 .zerocopy_threshold = 0, 137 .tls_version = 0, 138 .enable_ktls = false, 139 .psk_key = NULL, 140 .psk_identity = NULL 141 }; 142 143 static struct spdk_sock_map g_map = { 144 .entries = STAILQ_HEAD_INITIALIZER(g_map.entries), 145 .mtx = PTHREAD_MUTEX_INITIALIZER 146 }; 147 148 __attribute((destructor)) static void 149 uring_sock_map_cleanup(void) 150 { 151 spdk_sock_map_cleanup(&g_map); 152 } 153 154 #define SPDK_URING_SOCK_REQUEST_IOV(req) ((struct iovec *)((uint8_t *)req + sizeof(struct spdk_sock_request))) 155 156 #define __uring_sock(sock) (struct spdk_uring_sock *)sock 157 #define __uring_group_impl(group) (struct spdk_uring_sock_group_impl *)group 158 159 static void 160 uring_sock_copy_impl_opts(struct spdk_sock_impl_opts *dest, const struct spdk_sock_impl_opts *src, 161 size_t len) 162 { 163 #define FIELD_OK(field) \ 164 offsetof(struct spdk_sock_impl_opts, field) + sizeof(src->field) <= len 165 166 #define SET_FIELD(field) \ 167 if (FIELD_OK(field)) { \ 168 dest->field = src->field; \ 169 } 170 171 SET_FIELD(recv_buf_size); 172 SET_FIELD(send_buf_size); 173 SET_FIELD(enable_recv_pipe); 174 SET_FIELD(enable_quickack); 175 SET_FIELD(enable_placement_id); 176 SET_FIELD(enable_zerocopy_send_server); 177 SET_FIELD(enable_zerocopy_send_client); 178 SET_FIELD(zerocopy_threshold); 179 SET_FIELD(tls_version); 180 SET_FIELD(enable_ktls); 181 SET_FIELD(psk_key); 182 SET_FIELD(psk_identity); 183 184 #undef SET_FIELD 185 #undef FIELD_OK 186 } 187 188 static int 189 uring_sock_impl_get_opts(struct spdk_sock_impl_opts *opts, size_t *len) 190 { 191 if (!opts || !len) { 192 errno = EINVAL; 193 return -1; 194 } 195 196 assert(sizeof(*opts) >= *len); 197 memset(opts, 0, *len); 198 199 uring_sock_copy_impl_opts(opts, &g_spdk_uring_sock_impl_opts, *len); 200 *len = spdk_min(*len, sizeof(g_spdk_uring_sock_impl_opts)); 201 202 return 0; 203 } 204 205 static int 206 uring_sock_impl_set_opts(const struct spdk_sock_impl_opts *opts, size_t len) 207 { 208 if (!opts) { 209 errno = EINVAL; 210 return -1; 211 } 212 213 assert(sizeof(*opts) >= len); 214 uring_sock_copy_impl_opts(&g_spdk_uring_sock_impl_opts, opts, len); 215 216 return 0; 217 } 218 219 static void 220 uring_opts_get_impl_opts(const struct spdk_sock_opts *opts, struct spdk_sock_impl_opts *dest) 221 { 222 /* Copy the default impl_opts first to cover cases when user's impl_opts is smaller */ 223 memcpy(dest, &g_spdk_uring_sock_impl_opts, 
sizeof(*dest)); 224 225 if (opts->impl_opts != NULL) { 226 assert(sizeof(*dest) >= opts->impl_opts_size); 227 uring_sock_copy_impl_opts(dest, opts->impl_opts, opts->impl_opts_size); 228 } 229 } 230 231 static int 232 uring_sock_getaddr(struct spdk_sock *_sock, char *saddr, int slen, uint16_t *sport, 233 char *caddr, int clen, uint16_t *cport) 234 { 235 struct spdk_uring_sock *sock = __uring_sock(_sock); 236 237 assert(sock != NULL); 238 return spdk_net_getaddr(sock->fd, saddr, slen, sport, caddr, clen, cport); 239 } 240 241 static const char * 242 uring_sock_get_interface_name(struct spdk_sock *_sock) 243 { 244 struct spdk_uring_sock *sock = __uring_sock(_sock); 245 char saddr[64]; 246 int rc; 247 248 rc = spdk_net_getaddr(sock->fd, saddr, sizeof(saddr), NULL, NULL, 0, NULL); 249 if (rc != 0) { 250 return NULL; 251 } 252 253 rc = spdk_net_get_interface_name(saddr, sock->interface_name, 254 sizeof(sock->interface_name)); 255 if (rc != 0) { 256 return NULL; 257 } 258 259 return sock->interface_name; 260 } 261 262 static int32_t 263 uring_sock_get_numa_id(struct spdk_sock *sock) 264 { 265 const char *interface_name; 266 uint32_t numa_id; 267 int rc; 268 269 interface_name = uring_sock_get_interface_name(sock); 270 if (interface_name == NULL) { 271 return SPDK_ENV_NUMA_ID_ANY; 272 } 273 274 rc = spdk_read_sysfs_attribute_uint32(&numa_id, 275 "/sys/class/net/%s/device/numa_node", interface_name); 276 if (rc == 0 && numa_id <= INT32_MAX) { 277 return (int32_t)numa_id; 278 } else { 279 return SPDK_ENV_NUMA_ID_ANY; 280 } 281 } 282 283 enum uring_sock_create_type { 284 SPDK_SOCK_CREATE_LISTEN, 285 SPDK_SOCK_CREATE_CONNECT, 286 }; 287 288 static int 289 uring_sock_alloc_pipe(struct spdk_uring_sock *sock, int sz) 290 { 291 uint8_t *new_buf; 292 struct spdk_pipe *new_pipe; 293 struct iovec siov[2]; 294 struct iovec diov[2]; 295 int sbytes; 296 ssize_t bytes; 297 int rc; 298 299 if (sock->recv_buf_sz == sz) { 300 return 0; 301 } 302 303 /* If the new size is 0, just free the pipe */ 304 if (sz == 0) { 305 spdk_pipe_destroy(sock->recv_pipe); 306 free(sock->recv_buf); 307 sock->recv_pipe = NULL; 308 sock->recv_buf = NULL; 309 return 0; 310 } else if (sz < MIN_SOCK_PIPE_SIZE) { 311 SPDK_ERRLOG("The size of the pipe must be larger than %d\n", MIN_SOCK_PIPE_SIZE); 312 return -1; 313 } 314 315 /* Round up to next 64 byte multiple */ 316 rc = posix_memalign((void **)&new_buf, 64, sz); 317 if (rc != 0) { 318 SPDK_ERRLOG("socket recv buf allocation failed\n"); 319 return -ENOMEM; 320 } 321 memset(new_buf, 0, sz); 322 323 new_pipe = spdk_pipe_create(new_buf, sz); 324 if (new_pipe == NULL) { 325 SPDK_ERRLOG("socket pipe allocation failed\n"); 326 free(new_buf); 327 return -ENOMEM; 328 } 329 330 if (sock->recv_pipe != NULL) { 331 /* Pull all of the data out of the old pipe */ 332 sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov); 333 if (sbytes > sz) { 334 /* Too much data to fit into the new pipe size */ 335 spdk_pipe_destroy(new_pipe); 336 free(new_buf); 337 return -EINVAL; 338 } 339 340 sbytes = spdk_pipe_writer_get_buffer(new_pipe, sz, diov); 341 assert(sbytes == sz); 342 343 bytes = spdk_iovcpy(siov, 2, diov, 2); 344 spdk_pipe_writer_advance(new_pipe, bytes); 345 346 spdk_pipe_destroy(sock->recv_pipe); 347 free(sock->recv_buf); 348 } 349 350 sock->recv_buf_sz = sz; 351 sock->recv_buf = new_buf; 352 sock->recv_pipe = new_pipe; 353 354 return 0; 355 } 356 357 static int 358 uring_sock_set_recvbuf(struct spdk_sock *_sock, int sz) 359 { 360 struct spdk_uring_sock *sock = 
__uring_sock(_sock); 361 int min_size; 362 int rc; 363 364 assert(sock != NULL); 365 366 if (_sock->impl_opts.enable_recv_pipe) { 367 rc = uring_sock_alloc_pipe(sock, sz); 368 if (rc) { 369 SPDK_ERRLOG("unable to allocate sufficient recvbuf with sz=%d on sock=%p\n", sz, _sock); 370 return rc; 371 } 372 } 373 374 /* Set kernel buffer size to be at least MIN_SO_RCVBUF_SIZE and 375 * g_spdk_uring_sock_impl_opts.recv_buf_size. */ 376 min_size = spdk_max(MIN_SO_RCVBUF_SIZE, g_spdk_uring_sock_impl_opts.recv_buf_size); 377 378 if (sz < min_size) { 379 sz = min_size; 380 } 381 382 rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVBUF, &sz, sizeof(sz)); 383 if (rc < 0) { 384 return rc; 385 } 386 387 _sock->impl_opts.recv_buf_size = sz; 388 389 return 0; 390 } 391 392 static int 393 uring_sock_set_sendbuf(struct spdk_sock *_sock, int sz) 394 { 395 struct spdk_uring_sock *sock = __uring_sock(_sock); 396 int min_size; 397 int rc; 398 399 assert(sock != NULL); 400 401 /* Set kernel buffer size to be at least MIN_SO_SNDBUF_SIZE and 402 * g_spdk_uring_sock_impl_opts.seend_buf_size. */ 403 min_size = spdk_max(MIN_SO_SNDBUF_SIZE, g_spdk_uring_sock_impl_opts.send_buf_size); 404 405 if (sz < min_size) { 406 sz = min_size; 407 } 408 409 rc = setsockopt(sock->fd, SOL_SOCKET, SO_SNDBUF, &sz, sizeof(sz)); 410 if (rc < 0) { 411 return rc; 412 } 413 414 _sock->impl_opts.send_buf_size = sz; 415 416 return 0; 417 } 418 419 static struct spdk_uring_sock * 420 uring_sock_alloc(int fd, struct spdk_sock_impl_opts *impl_opts, bool enable_zero_copy) 421 { 422 struct spdk_uring_sock *sock; 423 #if defined(__linux__) 424 int flag; 425 int rc; 426 #endif 427 428 sock = calloc(1, sizeof(*sock)); 429 if (sock == NULL) { 430 SPDK_ERRLOG("sock allocation failed\n"); 431 return NULL; 432 } 433 434 sock->fd = fd; 435 memcpy(&sock->base.impl_opts, impl_opts, sizeof(*impl_opts)); 436 437 STAILQ_INIT(&sock->recv_stream); 438 439 #if defined(__linux__) 440 flag = 1; 441 442 if (sock->base.impl_opts.enable_quickack) { 443 rc = setsockopt(sock->fd, IPPROTO_TCP, TCP_QUICKACK, &flag, sizeof(flag)); 444 if (rc != 0) { 445 SPDK_ERRLOG("quickack was failed to set\n"); 446 } 447 } 448 449 spdk_sock_get_placement_id(sock->fd, sock->base.impl_opts.enable_placement_id, 450 &sock->placement_id); 451 #ifdef SPDK_ZEROCOPY 452 /* Try to turn on zero copy sends */ 453 flag = 1; 454 455 if (enable_zero_copy) { 456 rc = setsockopt(sock->fd, SOL_SOCKET, SO_ZEROCOPY, &flag, sizeof(flag)); 457 if (rc == 0) { 458 sock->zcopy = true; 459 sock->zcopy_send_flags = MSG_ZEROCOPY; 460 } 461 } 462 #endif 463 #endif 464 465 return sock; 466 } 467 468 static struct spdk_sock * 469 uring_sock_create(const char *ip, int port, 470 enum uring_sock_create_type type, 471 struct spdk_sock_opts *opts) 472 { 473 struct spdk_uring_sock *sock; 474 struct spdk_sock_impl_opts impl_opts; 475 char buf[MAX_TMPBUF]; 476 char portnum[PORTNUMLEN]; 477 char *p; 478 const char *src_addr; 479 uint16_t src_port; 480 struct addrinfo hints, *res, *res0, *src_ai; 481 int fd, flag; 482 int val = 1; 483 int rc; 484 bool enable_zcopy_impl_opts = false; 485 bool enable_zcopy_user_opts = true; 486 487 assert(opts != NULL); 488 uring_opts_get_impl_opts(opts, &impl_opts); 489 490 if (ip == NULL) { 491 return NULL; 492 } 493 if (ip[0] == '[') { 494 snprintf(buf, sizeof(buf), "%s", ip + 1); 495 p = strchr(buf, ']'); 496 if (p != NULL) { 497 *p = '\0'; 498 } 499 ip = (const char *) &buf[0]; 500 } 501 502 snprintf(portnum, sizeof portnum, "%d", port); 503 memset(&hints, 0, sizeof hints); 504 hints.ai_family 
= PF_UNSPEC; 505 hints.ai_socktype = SOCK_STREAM; 506 hints.ai_flags = AI_NUMERICSERV; 507 hints.ai_flags |= AI_PASSIVE; 508 hints.ai_flags |= AI_NUMERICHOST; 509 rc = getaddrinfo(ip, portnum, &hints, &res0); 510 if (rc != 0) { 511 SPDK_ERRLOG("getaddrinfo() failed %s (%d)\n", gai_strerror(rc), rc); 512 return NULL; 513 } 514 515 /* try listen */ 516 fd = -1; 517 for (res = res0; res != NULL; res = res->ai_next) { 518 retry: 519 fd = socket(res->ai_family, res->ai_socktype, res->ai_protocol); 520 if (fd < 0) { 521 /* error */ 522 continue; 523 } 524 525 val = impl_opts.recv_buf_size; 526 rc = setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &val, sizeof val); 527 if (rc) { 528 /* Not fatal */ 529 } 530 531 val = impl_opts.send_buf_size; 532 rc = setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &val, sizeof val); 533 if (rc) { 534 /* Not fatal */ 535 } 536 537 rc = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof val); 538 if (rc != 0) { 539 close(fd); 540 fd = -1; 541 /* error */ 542 continue; 543 } 544 rc = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof val); 545 if (rc != 0) { 546 close(fd); 547 fd = -1; 548 /* error */ 549 continue; 550 } 551 552 if (opts->ack_timeout) { 553 #if defined(__linux__) 554 val = opts->ack_timeout; 555 rc = setsockopt(fd, IPPROTO_TCP, TCP_USER_TIMEOUT, &val, sizeof val); 556 if (rc != 0) { 557 close(fd); 558 fd = -1; 559 /* error */ 560 continue; 561 } 562 #else 563 SPDK_WARNLOG("TCP_USER_TIMEOUT is not supported.\n"); 564 #endif 565 } 566 567 568 569 #if defined(SO_PRIORITY) 570 if (opts != NULL && opts->priority) { 571 rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &opts->priority, sizeof val); 572 if (rc != 0) { 573 close(fd); 574 fd = -1; 575 /* error */ 576 continue; 577 } 578 } 579 #endif 580 if (res->ai_family == AF_INET6) { 581 rc = setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &val, sizeof val); 582 if (rc != 0) { 583 close(fd); 584 fd = -1; 585 /* error */ 586 continue; 587 } 588 } 589 590 if (type == SPDK_SOCK_CREATE_LISTEN) { 591 rc = bind(fd, res->ai_addr, res->ai_addrlen); 592 if (rc != 0) { 593 SPDK_ERRLOG("bind() failed at port %d, errno = %d\n", port, errno); 594 switch (errno) { 595 case EINTR: 596 /* interrupted? */ 597 close(fd); 598 goto retry; 599 case EADDRNOTAVAIL: 600 SPDK_ERRLOG("IP address %s not available. " 601 "Verify IP address in config file " 602 "and make sure setup script is " 603 "run before starting spdk app.\n", ip); 604 /* FALLTHROUGH */ 605 default: 606 /* try next family */ 607 close(fd); 608 fd = -1; 609 continue; 610 } 611 } 612 /* bind OK */ 613 rc = listen(fd, 512); 614 if (rc != 0) { 615 SPDK_ERRLOG("listen() failed, errno = %d\n", errno); 616 close(fd); 617 fd = -1; 618 break; 619 } 620 621 flag = fcntl(fd, F_GETFL); 622 if (fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0) { 623 SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno); 624 close(fd); 625 fd = -1; 626 break; 627 } 628 629 enable_zcopy_impl_opts = impl_opts.enable_zerocopy_send_server; 630 } else if (type == SPDK_SOCK_CREATE_CONNECT) { 631 src_addr = SPDK_GET_FIELD(opts, src_addr, NULL, opts->opts_size); 632 src_port = SPDK_GET_FIELD(opts, src_port, 0, opts->opts_size); 633 if (src_addr != NULL || src_port != 0) { 634 snprintf(portnum, sizeof(portnum), "%"PRIu16, src_port); 635 memset(&hints, 0, sizeof hints); 636 hints.ai_family = AF_UNSPEC; 637 hints.ai_socktype = SOCK_STREAM; 638 hints.ai_flags = AI_NUMERICSERV | AI_NUMERICHOST | AI_PASSIVE; 639 rc = getaddrinfo(src_addr, src_port > 0 ? 
portnum : NULL, 640 &hints, &src_ai); 641 if (rc != 0 || src_ai == NULL) { 642 SPDK_ERRLOG("getaddrinfo() failed %s (%d)\n", 643 rc != 0 ? gai_strerror(rc) : "", rc); 644 close(fd); 645 fd = -1; 646 break; 647 } 648 rc = bind(fd, src_ai->ai_addr, src_ai->ai_addrlen); 649 if (rc != 0) { 650 SPDK_ERRLOG("bind() failed errno %d (%s:%s)\n", errno, 651 src_addr ? src_addr : "", portnum); 652 close(fd); 653 fd = -1; 654 freeaddrinfo(src_ai); 655 src_ai = NULL; 656 break; 657 } 658 freeaddrinfo(src_ai); 659 src_ai = NULL; 660 } 661 662 rc = connect(fd, res->ai_addr, res->ai_addrlen); 663 if (rc != 0) { 664 SPDK_ERRLOG("connect() failed, errno = %d\n", errno); 665 /* try next family */ 666 close(fd); 667 fd = -1; 668 continue; 669 } 670 671 flag = fcntl(fd, F_GETFL); 672 if (fcntl(fd, F_SETFL, flag & ~O_NONBLOCK) < 0) { 673 SPDK_ERRLOG("fcntl can't set blocking mode for socket, fd: %d (%d)\n", fd, errno); 674 close(fd); 675 fd = -1; 676 break; 677 } 678 679 enable_zcopy_impl_opts = impl_opts.enable_zerocopy_send_client; 680 } 681 break; 682 } 683 freeaddrinfo(res0); 684 685 if (fd < 0) { 686 return NULL; 687 } 688 689 enable_zcopy_user_opts = opts->zcopy && !spdk_net_is_loopback(fd); 690 sock = uring_sock_alloc(fd, &impl_opts, enable_zcopy_user_opts && enable_zcopy_impl_opts); 691 if (sock == NULL) { 692 SPDK_ERRLOG("sock allocation failed\n"); 693 close(fd); 694 return NULL; 695 } 696 697 return &sock->base; 698 } 699 700 static struct spdk_sock * 701 uring_sock_listen(const char *ip, int port, struct spdk_sock_opts *opts) 702 { 703 if (spdk_interrupt_mode_is_enabled()) { 704 SPDK_ERRLOG("Interrupt mode is not supported in the uring sock implementation."); 705 return NULL; 706 } 707 708 return uring_sock_create(ip, port, SPDK_SOCK_CREATE_LISTEN, opts); 709 } 710 711 static struct spdk_sock * 712 uring_sock_connect(const char *ip, int port, struct spdk_sock_opts *opts) 713 { 714 if (spdk_interrupt_mode_is_enabled()) { 715 SPDK_ERRLOG("Interrupt mode is not supported in the uring sock implementation."); 716 return NULL; 717 } 718 719 return uring_sock_create(ip, port, SPDK_SOCK_CREATE_CONNECT, opts); 720 } 721 722 static struct spdk_sock * 723 uring_sock_accept(struct spdk_sock *_sock) 724 { 725 struct spdk_uring_sock *sock = __uring_sock(_sock); 726 struct sockaddr_storage sa; 727 socklen_t salen; 728 int rc, fd; 729 struct spdk_uring_sock *new_sock; 730 int flag; 731 732 memset(&sa, 0, sizeof(sa)); 733 salen = sizeof(sa); 734 735 assert(sock != NULL); 736 737 rc = accept(sock->fd, (struct sockaddr *)&sa, &salen); 738 739 if (rc == -1) { 740 return NULL; 741 } 742 743 fd = rc; 744 745 flag = fcntl(fd, F_GETFL); 746 if ((flag & O_NONBLOCK) && (fcntl(fd, F_SETFL, flag & ~O_NONBLOCK) < 0)) { 747 SPDK_ERRLOG("fcntl can't set blocking mode for socket, fd: %d (%d)\n", fd, errno); 748 close(fd); 749 return NULL; 750 } 751 752 #if defined(SO_PRIORITY) 753 /* The priority is not inherited, so call this function again */ 754 if (sock->base.opts.priority) { 755 rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &sock->base.opts.priority, sizeof(int)); 756 if (rc != 0) { 757 close(fd); 758 return NULL; 759 } 760 } 761 #endif 762 763 new_sock = uring_sock_alloc(fd, &sock->base.impl_opts, sock->zcopy); 764 if (new_sock == NULL) { 765 close(fd); 766 return NULL; 767 } 768 769 return &new_sock->base; 770 } 771 772 static int 773 uring_sock_close(struct spdk_sock *_sock) 774 { 775 struct spdk_uring_sock *sock = __uring_sock(_sock); 776 777 assert(TAILQ_EMPTY(&_sock->pending_reqs)); 778 assert(sock->group == NULL); 779 
780 /* If the socket fails to close, the best choice is to 781 * leak the fd but continue to free the rest of the sock 782 * memory. */ 783 close(sock->fd); 784 785 spdk_pipe_destroy(sock->recv_pipe); 786 free(sock->recv_buf); 787 free(sock); 788 789 return 0; 790 } 791 792 static ssize_t 793 uring_sock_recv_from_pipe(struct spdk_uring_sock *sock, struct iovec *diov, int diovcnt) 794 { 795 struct iovec siov[2]; 796 int sbytes; 797 ssize_t bytes; 798 struct spdk_uring_sock_group_impl *group; 799 800 sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov); 801 if (sbytes < 0) { 802 errno = EINVAL; 803 return -1; 804 } else if (sbytes == 0) { 805 errno = EAGAIN; 806 return -1; 807 } 808 809 bytes = spdk_iovcpy(siov, 2, diov, diovcnt); 810 811 if (bytes == 0) { 812 /* The only way this happens is if diov is 0 length */ 813 errno = EINVAL; 814 return -1; 815 } 816 817 spdk_pipe_reader_advance(sock->recv_pipe, bytes); 818 819 /* If we drained the pipe, take it off the level-triggered list */ 820 if (sock->base.group_impl && spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) { 821 group = __uring_group_impl(sock->base.group_impl); 822 TAILQ_REMOVE(&group->pending_recv, sock, link); 823 sock->pending_recv = false; 824 } 825 826 return bytes; 827 } 828 829 static inline ssize_t 830 sock_readv(int fd, struct iovec *iov, int iovcnt) 831 { 832 struct msghdr msg = { 833 .msg_iov = iov, 834 .msg_iovlen = iovcnt, 835 }; 836 837 return recvmsg(fd, &msg, MSG_DONTWAIT); 838 } 839 840 static inline ssize_t 841 uring_sock_read(struct spdk_uring_sock *sock) 842 { 843 struct iovec iov[2]; 844 int bytes; 845 struct spdk_uring_sock_group_impl *group; 846 847 bytes = spdk_pipe_writer_get_buffer(sock->recv_pipe, sock->recv_buf_sz, iov); 848 849 if (bytes > 0) { 850 bytes = sock_readv(sock->fd, iov, 2); 851 if (bytes > 0) { 852 spdk_pipe_writer_advance(sock->recv_pipe, bytes); 853 if (sock->base.group_impl && !sock->pending_recv) { 854 group = __uring_group_impl(sock->base.group_impl); 855 TAILQ_INSERT_TAIL(&group->pending_recv, sock, link); 856 sock->pending_recv = true; 857 } 858 } 859 } 860 861 return bytes; 862 } 863 864 static int 865 uring_sock_recv_next(struct spdk_sock *_sock, void **_buf, void **ctx) 866 { 867 struct spdk_uring_sock *sock = __uring_sock(_sock); 868 struct spdk_uring_sock_group_impl *group; 869 struct spdk_uring_buf_tracker *tr; 870 871 if (sock->connection_status < 0) { 872 errno = -sock->connection_status; 873 return -1; 874 } 875 876 if (sock->recv_pipe != NULL) { 877 errno = ENOTSUP; 878 return -1; 879 } 880 881 group = __uring_group_impl(_sock->group_impl); 882 883 tr = STAILQ_FIRST(&sock->recv_stream); 884 if (tr == NULL) { 885 if (sock->group->buf_ring_count > 0) { 886 /* There are buffers posted, but data hasn't arrived. */ 887 errno = EAGAIN; 888 } else { 889 /* There are no buffers posted, so this won't ever 890 * make forward progress. 
*/ 891 errno = ENOBUFS; 892 } 893 return -1; 894 } 895 assert(sock->pending_recv == true); 896 assert(tr->buf != NULL); 897 898 *_buf = tr->buf + sock->recv_offset; 899 *ctx = tr->ctx; 900 901 STAILQ_REMOVE_HEAD(&sock->recv_stream, link); 902 STAILQ_INSERT_HEAD(&group->free_trackers, tr, link); 903 904 if (STAILQ_EMPTY(&sock->recv_stream)) { 905 sock->pending_recv = false; 906 TAILQ_REMOVE(&group->pending_recv, sock, link); 907 } 908 909 return tr->len - sock->recv_offset; 910 } 911 912 static ssize_t 913 uring_sock_readv_no_pipe(struct spdk_sock *_sock, struct iovec *iovs, int iovcnt) 914 { 915 struct spdk_uring_sock *sock = __uring_sock(_sock); 916 struct spdk_uring_buf_tracker *tr; 917 struct iovec iov; 918 ssize_t total, len; 919 int i; 920 921 if (sock->connection_status < 0) { 922 errno = -sock->connection_status; 923 return -1; 924 } 925 926 if (_sock->group_impl == NULL) { 927 /* If not in a group just read from the socket the regular way. */ 928 return sock_readv(sock->fd, iovs, iovcnt); 929 } 930 931 if (STAILQ_EMPTY(&sock->recv_stream)) { 932 if (sock->group->buf_ring_count == 0) { 933 /* If the user hasn't posted any buffers, read from the socket 934 * directly. */ 935 936 if (sock->pending_recv) { 937 sock->pending_recv = false; 938 TAILQ_REMOVE(&(__uring_group_impl(_sock->group_impl))->pending_recv, sock, link); 939 } 940 941 return sock_readv(sock->fd, iovs, iovcnt); 942 } 943 944 errno = EAGAIN; 945 return -1; 946 } 947 948 total = 0; 949 for (i = 0; i < iovcnt; i++) { 950 /* Copy to stack so we can change it */ 951 iov = iovs[i]; 952 953 tr = STAILQ_FIRST(&sock->recv_stream); 954 while (tr != NULL) { 955 len = spdk_min(iov.iov_len, tr->len - sock->recv_offset); 956 memcpy(iov.iov_base, tr->buf + sock->recv_offset, len); 957 958 total += len; 959 sock->recv_offset += len; 960 iov.iov_base += len; 961 iov.iov_len -= len; 962 963 if (sock->recv_offset == tr->len) { 964 sock->recv_offset = 0; 965 STAILQ_REMOVE_HEAD(&sock->recv_stream, link); 966 STAILQ_INSERT_HEAD(&sock->group->free_trackers, tr, link); 967 spdk_sock_group_provide_buf(sock->group->base.group, tr->buf, tr->buflen, tr->ctx); 968 tr = STAILQ_FIRST(&sock->recv_stream); 969 } 970 971 if (iov.iov_len == 0) { 972 break; 973 } 974 } 975 } 976 977 if (STAILQ_EMPTY(&sock->recv_stream)) { 978 struct spdk_uring_sock_group_impl *group; 979 980 group = __uring_group_impl(_sock->group_impl); 981 sock->pending_recv = false; 982 TAILQ_REMOVE(&group->pending_recv, sock, link); 983 } 984 985 assert(total > 0); 986 return total; 987 } 988 989 static ssize_t 990 uring_sock_readv(struct spdk_sock *_sock, struct iovec *iov, int iovcnt) 991 { 992 struct spdk_uring_sock *sock = __uring_sock(_sock); 993 int rc, i; 994 size_t len; 995 996 if (sock->connection_status < 0) { 997 errno = -sock->connection_status; 998 return -1; 999 } 1000 1001 if (sock->recv_pipe == NULL) { 1002 return uring_sock_readv_no_pipe(_sock, iov, iovcnt); 1003 } 1004 1005 len = 0; 1006 for (i = 0; i < iovcnt; i++) { 1007 len += iov[i].iov_len; 1008 } 1009 1010 if (spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) { 1011 /* If the user is receiving a sufficiently large amount of data, 1012 * receive directly to their buffers. 
*/ 1013 if (len >= MIN_SOCK_PIPE_SIZE) { 1014 return sock_readv(sock->fd, iov, iovcnt); 1015 } 1016 1017 /* Otherwise, do a big read into our pipe */ 1018 rc = uring_sock_read(sock); 1019 if (rc <= 0) { 1020 return rc; 1021 } 1022 } 1023 1024 return uring_sock_recv_from_pipe(sock, iov, iovcnt); 1025 } 1026 1027 static ssize_t 1028 uring_sock_recv(struct spdk_sock *sock, void *buf, size_t len) 1029 { 1030 struct iovec iov[1]; 1031 1032 iov[0].iov_base = buf; 1033 iov[0].iov_len = len; 1034 1035 return uring_sock_readv(sock, iov, 1); 1036 } 1037 1038 static ssize_t 1039 uring_sock_writev(struct spdk_sock *_sock, struct iovec *iov, int iovcnt) 1040 { 1041 struct spdk_uring_sock *sock = __uring_sock(_sock); 1042 struct msghdr msg = { 1043 .msg_iov = iov, 1044 .msg_iovlen = iovcnt, 1045 }; 1046 1047 if (sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) { 1048 errno = EAGAIN; 1049 return -1; 1050 } 1051 1052 return sendmsg(sock->fd, &msg, MSG_DONTWAIT); 1053 } 1054 1055 static ssize_t 1056 sock_request_advance_offset(struct spdk_sock_request *req, ssize_t rc) 1057 { 1058 unsigned int offset; 1059 size_t len; 1060 int i; 1061 1062 offset = req->internal.offset; 1063 for (i = 0; i < req->iovcnt; i++) { 1064 /* Advance by the offset first */ 1065 if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) { 1066 offset -= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len; 1067 continue; 1068 } 1069 1070 /* Calculate the remaining length of this element */ 1071 len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset; 1072 1073 if (len > (size_t)rc) { 1074 req->internal.offset += rc; 1075 return -1; 1076 } 1077 1078 offset = 0; 1079 req->internal.offset += len; 1080 rc -= len; 1081 } 1082 1083 return rc; 1084 } 1085 1086 static int 1087 sock_complete_write_reqs(struct spdk_sock *_sock, ssize_t rc, bool is_zcopy) 1088 { 1089 struct spdk_uring_sock *sock = __uring_sock(_sock); 1090 struct spdk_sock_request *req; 1091 int retval; 1092 1093 if (is_zcopy) { 1094 /* Handling overflow case, because we use psock->sendmsg_idx - 1 for the 1095 * req->internal.offset, so sendmsg_idx should not be zero */ 1096 if (spdk_unlikely(sock->sendmsg_idx == UINT32_MAX)) { 1097 sock->sendmsg_idx = 1; 1098 } else { 1099 sock->sendmsg_idx++; 1100 } 1101 } 1102 1103 /* Consume the requests that were actually written */ 1104 req = TAILQ_FIRST(&_sock->queued_reqs); 1105 while (req) { 1106 /* req->internal.is_zcopy is true when the whole req or part of it is sent with zerocopy */ 1107 req->internal.is_zcopy = is_zcopy; 1108 1109 rc = sock_request_advance_offset(req, rc); 1110 if (rc < 0) { 1111 /* This element was partially sent. */ 1112 return 0; 1113 } 1114 1115 /* Handled a full request. */ 1116 spdk_sock_request_pend(_sock, req); 1117 1118 if (!req->internal.is_zcopy && req == TAILQ_FIRST(&_sock->pending_reqs)) { 1119 retval = spdk_sock_request_put(_sock, req, 0); 1120 if (retval) { 1121 return retval; 1122 } 1123 } else { 1124 /* Re-use the offset field to hold the sendmsg call index. The 1125 * index is 0 based, so subtract one here because we've already 1126 * incremented above. 
			 */
			req->internal.offset = sock->sendmsg_idx - 1;
		}

		if (rc == 0) {
			break;
		}

		req = TAILQ_FIRST(&_sock->queued_reqs);
	}

	return 0;
}

#ifdef SPDK_ZEROCOPY
static int
_sock_check_zcopy(struct spdk_sock *_sock, int status)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	ssize_t rc;
	struct sock_extended_err *serr;
	struct cmsghdr *cm;
	uint32_t idx;
	struct spdk_sock_request *req, *treq;
	bool found;

	assert(sock->zcopy == true);
	if (spdk_unlikely(status < 0)) {
		if (!TAILQ_EMPTY(&_sock->pending_reqs)) {
			SPDK_ERRLOG("Attempting to receive from ERRQUEUE yielded error, but pending list still has orphaned entries, status =%d\n",
				    status);
		} else {
			SPDK_WARNLOG("Recvmsg yielded an error!\n");
		}
		return 0;
	}

	cm = CMSG_FIRSTHDR(&sock->errqueue_task.msg);
	if (!((cm->cmsg_level == SOL_IP && cm->cmsg_type == IP_RECVERR) ||
	      (cm->cmsg_level == SOL_IPV6 && cm->cmsg_type == IPV6_RECVERR))) {
		SPDK_WARNLOG("Unexpected cmsg level or type!\n");
		return 0;
	}

	serr = (struct sock_extended_err *)CMSG_DATA(cm);
	if (serr->ee_errno != 0 || serr->ee_origin != SO_EE_ORIGIN_ZEROCOPY) {
		SPDK_WARNLOG("Unexpected extended error origin\n");
		return 0;
	}

	/* Most of the time, the pending_reqs list is in the exact
	 * order we need such that all of the requests to complete are
	 * in order, in the front. It is guaranteed that all requests
	 * belonging to the same sendmsg call are sequential, so once
	 * we encounter a match, we can stop looping as soon as a
	 * non-match is found.
	 */
	for (idx = serr->ee_info; idx <= serr->ee_data; idx++) {
		found = false;
		TAILQ_FOREACH_SAFE(req, &_sock->pending_reqs, internal.link, treq) {
			if (!req->internal.is_zcopy) {
				/* This wasn't a zcopy request.
It was just waiting in line to complete */ 1188 rc = spdk_sock_request_put(_sock, req, 0); 1189 if (rc < 0) { 1190 return rc; 1191 } 1192 } else if (req->internal.offset == idx) { 1193 found = true; 1194 rc = spdk_sock_request_put(_sock, req, 0); 1195 if (rc < 0) { 1196 return rc; 1197 } 1198 } else if (found) { 1199 break; 1200 } 1201 } 1202 } 1203 1204 return 0; 1205 } 1206 1207 static void 1208 _sock_prep_errqueue(struct spdk_sock *_sock) 1209 { 1210 struct spdk_uring_sock *sock = __uring_sock(_sock); 1211 struct spdk_uring_task *task = &sock->errqueue_task; 1212 struct io_uring_sqe *sqe; 1213 1214 if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS) { 1215 return; 1216 } 1217 1218 if (sock->pending_group_remove) { 1219 return; 1220 } 1221 1222 assert(sock->group != NULL); 1223 sock->group->io_queued++; 1224 1225 sqe = io_uring_get_sqe(&sock->group->uring); 1226 io_uring_prep_recvmsg(sqe, sock->fd, &task->msg, MSG_ERRQUEUE); 1227 io_uring_sqe_set_data(sqe, task); 1228 task->status = SPDK_URING_SOCK_TASK_IN_PROCESS; 1229 } 1230 1231 #endif 1232 1233 static void 1234 _sock_flush(struct spdk_sock *_sock) 1235 { 1236 struct spdk_uring_sock *sock = __uring_sock(_sock); 1237 struct spdk_uring_task *task = &sock->write_task; 1238 uint32_t iovcnt; 1239 struct io_uring_sqe *sqe; 1240 int flags; 1241 1242 if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS) { 1243 return; 1244 } 1245 1246 #ifdef SPDK_ZEROCOPY 1247 if (sock->zcopy) { 1248 flags = MSG_DONTWAIT | sock->zcopy_send_flags; 1249 } else 1250 #endif 1251 { 1252 flags = MSG_DONTWAIT; 1253 } 1254 1255 iovcnt = spdk_sock_prep_reqs(&sock->base, task->iovs, task->iov_cnt, &task->last_req, &flags); 1256 if (!iovcnt) { 1257 return; 1258 } 1259 1260 task->iov_cnt = iovcnt; 1261 assert(sock->group != NULL); 1262 task->msg.msg_iov = task->iovs; 1263 task->msg.msg_iovlen = task->iov_cnt; 1264 #ifdef SPDK_ZEROCOPY 1265 task->is_zcopy = (flags & MSG_ZEROCOPY) ? 
true : false; 1266 #endif 1267 sock->group->io_queued++; 1268 1269 sqe = io_uring_get_sqe(&sock->group->uring); 1270 io_uring_prep_sendmsg(sqe, sock->fd, &sock->write_task.msg, flags); 1271 io_uring_sqe_set_data(sqe, task); 1272 task->status = SPDK_URING_SOCK_TASK_IN_PROCESS; 1273 } 1274 1275 static void 1276 _sock_prep_read(struct spdk_sock *_sock) 1277 { 1278 struct spdk_uring_sock *sock = __uring_sock(_sock); 1279 struct spdk_uring_task *task = &sock->read_task; 1280 struct io_uring_sqe *sqe; 1281 1282 /* Do not prepare read event */ 1283 if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS) { 1284 return; 1285 } 1286 1287 if (sock->pending_group_remove) { 1288 return; 1289 } 1290 1291 assert(sock->group != NULL); 1292 sock->group->io_queued++; 1293 1294 sqe = io_uring_get_sqe(&sock->group->uring); 1295 io_uring_prep_recv(sqe, sock->fd, NULL, URING_MAX_RECV_SIZE, 0); 1296 sqe->buf_group = URING_BUF_GROUP_ID; 1297 sqe->flags |= IOSQE_BUFFER_SELECT; 1298 io_uring_sqe_set_data(sqe, task); 1299 task->status = SPDK_URING_SOCK_TASK_IN_PROCESS; 1300 } 1301 1302 static void 1303 _sock_prep_cancel_task(struct spdk_sock *_sock, void *user_data) 1304 { 1305 struct spdk_uring_sock *sock = __uring_sock(_sock); 1306 struct spdk_uring_task *task = &sock->cancel_task; 1307 struct io_uring_sqe *sqe; 1308 1309 if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS) { 1310 return; 1311 } 1312 1313 assert(sock->group != NULL); 1314 sock->group->io_queued++; 1315 1316 sqe = io_uring_get_sqe(&sock->group->uring); 1317 io_uring_prep_cancel(sqe, user_data, 0); 1318 io_uring_sqe_set_data(sqe, task); 1319 task->status = SPDK_URING_SOCK_TASK_IN_PROCESS; 1320 } 1321 1322 static void 1323 uring_sock_fail(struct spdk_uring_sock *sock, int status) 1324 { 1325 struct spdk_uring_sock_group_impl *group = sock->group; 1326 int rc; 1327 1328 sock->connection_status = status; 1329 rc = spdk_sock_abort_requests(&sock->base); 1330 1331 /* The user needs to be notified that this socket is dead. */ 1332 if (rc == 0 && sock->base.cb_fn != NULL && 1333 sock->pending_recv == false) { 1334 sock->pending_recv = true; 1335 TAILQ_INSERT_TAIL(&group->pending_recv, sock, link); 1336 } 1337 } 1338 1339 static int 1340 sock_uring_group_reap(struct spdk_uring_sock_group_impl *group, int max, int max_read_events, 1341 struct spdk_sock **socks) 1342 { 1343 int i, count, ret; 1344 struct io_uring_cqe *cqe; 1345 struct spdk_uring_sock *sock, *tmp; 1346 struct spdk_uring_task *task; 1347 int status, bid, flags; 1348 bool is_zcopy; 1349 1350 for (i = 0; i < max; i++) { 1351 ret = io_uring_peek_cqe(&group->uring, &cqe); 1352 if (ret != 0) { 1353 break; 1354 } 1355 1356 if (cqe == NULL) { 1357 break; 1358 } 1359 1360 task = (struct spdk_uring_task *)cqe->user_data; 1361 assert(task != NULL); 1362 sock = task->sock; 1363 assert(sock != NULL); 1364 assert(sock->group != NULL); 1365 assert(sock->group == group); 1366 sock->group->io_inflight--; 1367 sock->group->io_avail++; 1368 status = cqe->res; 1369 flags = cqe->flags; 1370 io_uring_cqe_seen(&group->uring, cqe); 1371 1372 task->status = SPDK_URING_SOCK_TASK_NOT_IN_USE; 1373 1374 switch (task->type) { 1375 case URING_TASK_READ: 1376 if (status == -EAGAIN || status == -EWOULDBLOCK) { 1377 /* This likely shouldn't happen, but would indicate that the 1378 * kernel didn't have enough resources to queue a task internally. 
				 */
				_sock_prep_read(&sock->base);
			} else if (status == -ECANCELED) {
				continue;
			} else if (status == -ENOBUFS) {
				/* There's data in the socket but the user hasn't provided any buffers.
				 * We need to notify the user that the socket has data pending. */
				if (sock->base.cb_fn != NULL &&
				    sock->pending_recv == false) {
					sock->pending_recv = true;
					TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
				}

				_sock_prep_read(&sock->base);
			} else if (spdk_unlikely(status <= 0)) {
				uring_sock_fail(sock, status < 0 ? status : -ECONNRESET);
			} else {
				struct spdk_uring_buf_tracker *tracker;

				assert((flags & IORING_CQE_F_BUFFER) != 0);

				bid = flags >> IORING_CQE_BUFFER_SHIFT;
				tracker = &group->trackers[bid];

				assert(tracker->buf != NULL);
				assert(tracker->len != 0);

				/* Append this data to the stream */
				tracker->len = status;
				STAILQ_INSERT_TAIL(&sock->recv_stream, tracker, link);
				assert(group->buf_ring_count > 0);
				group->buf_ring_count--;

				if (sock->base.cb_fn != NULL &&
				    sock->pending_recv == false) {
					sock->pending_recv = true;
					TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
				}

				_sock_prep_read(&sock->base);
			}
			break;
		case URING_TASK_WRITE:
			if (status == -EAGAIN || status == -EWOULDBLOCK ||
			    (status == -ENOBUFS && sock->zcopy) ||
			    status == -ECANCELED) {
				continue;
			} else if (spdk_unlikely(status < 0)) {
				uring_sock_fail(sock, status);
			} else {
				task->last_req = NULL;
				task->iov_cnt = 0;
				is_zcopy = task->is_zcopy;
				task->is_zcopy = false;
				sock_complete_write_reqs(&sock->base, status, is_zcopy);
			}

			break;
#ifdef SPDK_ZEROCOPY
		case URING_TASK_ERRQUEUE:
			if (status == -EAGAIN || status == -EWOULDBLOCK) {
				_sock_prep_errqueue(&sock->base);
			} else if (status == -ECANCELED) {
				continue;
			} else if (spdk_unlikely(status < 0)) {
				uring_sock_fail(sock, status);
			} else {
				_sock_check_zcopy(&sock->base, status);
				_sock_prep_errqueue(&sock->base);
			}
			break;
#endif
		case URING_TASK_CANCEL:
			/* Do nothing */
			break;
		default:
			SPDK_UNREACHABLE();
		}
	}

	if (!socks) {
		return 0;
	}
	count = 0;
	TAILQ_FOREACH_SAFE(sock, &group->pending_recv, link, tmp) {
		if (count == max_read_events) {
			break;
		}

		/* If the socket's cb_fn is NULL, do not add it to the socks array */
		if (spdk_unlikely(sock->base.cb_fn == NULL)) {
			assert(sock->pending_recv == true);
			sock->pending_recv = false;
			TAILQ_REMOVE(&group->pending_recv, sock, link);
			continue;
		}

		socks[count++] = &sock->base;
	}

	/* Cycle the pending_recv list so that each time we poll things aren't
	 * in the same order. Say we have 6 sockets in the list, named as follows:
	 * A B C D E F
	 * And all 6 sockets had poll events, but max_events is only 3. That means
	 * sock currently points at D. We want to rearrange the list to the following:
	 * D E F A B C
	 *
	 * The variables below are named according to this example to make it easier to
	 * follow the swaps.
1488 */ 1489 if (sock != NULL) { 1490 struct spdk_uring_sock *ua, *uc, *ud, *uf; 1491 1492 /* Capture pointers to the elements we need */ 1493 ud = sock; 1494 1495 ua = TAILQ_FIRST(&group->pending_recv); 1496 if (ua == ud) { 1497 goto end; 1498 } 1499 1500 uf = TAILQ_LAST(&group->pending_recv, pending_recv_list); 1501 if (uf == ud) { 1502 TAILQ_REMOVE(&group->pending_recv, ud, link); 1503 TAILQ_INSERT_HEAD(&group->pending_recv, ud, link); 1504 goto end; 1505 } 1506 1507 uc = TAILQ_PREV(ud, pending_recv_list, link); 1508 assert(uc != NULL); 1509 1510 /* Break the link between C and D */ 1511 uc->link.tqe_next = NULL; 1512 1513 /* Connect F to A */ 1514 uf->link.tqe_next = ua; 1515 ua->link.tqe_prev = &uf->link.tqe_next; 1516 1517 /* Fix up the list first/last pointers */ 1518 group->pending_recv.tqh_first = ud; 1519 group->pending_recv.tqh_last = &uc->link.tqe_next; 1520 1521 /* D is in front of the list, make tqe prev pointer point to the head of list */ 1522 ud->link.tqe_prev = &group->pending_recv.tqh_first; 1523 } 1524 1525 end: 1526 return count; 1527 } 1528 1529 static int uring_sock_flush(struct spdk_sock *_sock); 1530 1531 static void 1532 uring_sock_writev_async(struct spdk_sock *_sock, struct spdk_sock_request *req) 1533 { 1534 struct spdk_uring_sock *sock = __uring_sock(_sock); 1535 int rc; 1536 1537 if (spdk_unlikely(sock->connection_status)) { 1538 req->cb_fn(req->cb_arg, sock->connection_status); 1539 return; 1540 } 1541 1542 spdk_sock_request_queue(_sock, req); 1543 1544 if (!sock->group) { 1545 if (_sock->queued_iovcnt >= IOV_BATCH_SIZE) { 1546 rc = uring_sock_flush(_sock); 1547 if (rc < 0 && errno != EAGAIN) { 1548 spdk_sock_abort_requests(_sock); 1549 } 1550 } 1551 } 1552 } 1553 1554 static int 1555 uring_sock_set_recvlowat(struct spdk_sock *_sock, int nbytes) 1556 { 1557 struct spdk_uring_sock *sock = __uring_sock(_sock); 1558 int val; 1559 int rc; 1560 1561 assert(sock != NULL); 1562 1563 val = nbytes; 1564 rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVLOWAT, &val, sizeof val); 1565 if (rc != 0) { 1566 return -1; 1567 } 1568 return 0; 1569 } 1570 1571 static bool 1572 uring_sock_is_ipv6(struct spdk_sock *_sock) 1573 { 1574 struct spdk_uring_sock *sock = __uring_sock(_sock); 1575 struct sockaddr_storage sa; 1576 socklen_t salen; 1577 int rc; 1578 1579 assert(sock != NULL); 1580 1581 memset(&sa, 0, sizeof sa); 1582 salen = sizeof sa; 1583 rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen); 1584 if (rc != 0) { 1585 SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno); 1586 return false; 1587 } 1588 1589 return (sa.ss_family == AF_INET6); 1590 } 1591 1592 static bool 1593 uring_sock_is_ipv4(struct spdk_sock *_sock) 1594 { 1595 struct spdk_uring_sock *sock = __uring_sock(_sock); 1596 struct sockaddr_storage sa; 1597 socklen_t salen; 1598 int rc; 1599 1600 assert(sock != NULL); 1601 1602 memset(&sa, 0, sizeof sa); 1603 salen = sizeof sa; 1604 rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen); 1605 if (rc != 0) { 1606 SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno); 1607 return false; 1608 } 1609 1610 return (sa.ss_family == AF_INET); 1611 } 1612 1613 static bool 1614 uring_sock_is_connected(struct spdk_sock *_sock) 1615 { 1616 struct spdk_uring_sock *sock = __uring_sock(_sock); 1617 uint8_t byte; 1618 int rc; 1619 1620 rc = recv(sock->fd, &byte, 1, MSG_PEEK | MSG_DONTWAIT); 1621 if (rc == 0) { 1622 return false; 1623 } 1624 1625 if (rc < 0) { 1626 if (errno == EAGAIN || errno == EWOULDBLOCK) { 1627 return true; 1628 } 1629 1630 return false; 1631 } 
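	/* recv() with MSG_PEEK does not consume data, so reaching this point means
	 * at least one byte is queued on the socket and the peer has not shut the
	 * connection down: report the socket as connected. */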
1632 1633 return true; 1634 } 1635 1636 static struct spdk_sock_group_impl * 1637 uring_sock_group_impl_get_optimal(struct spdk_sock *_sock, struct spdk_sock_group_impl *hint) 1638 { 1639 struct spdk_uring_sock *sock = __uring_sock(_sock); 1640 struct spdk_sock_group_impl *group; 1641 1642 if (sock->placement_id != -1) { 1643 spdk_sock_map_lookup(&g_map, sock->placement_id, &group, hint); 1644 return group; 1645 } 1646 1647 return NULL; 1648 } 1649 1650 static int 1651 uring_sock_group_impl_buf_pool_free(struct spdk_uring_sock_group_impl *group_impl) 1652 { 1653 if (group_impl->buf_ring) { 1654 io_uring_unregister_buf_ring(&group_impl->uring, URING_BUF_GROUP_ID); 1655 free(group_impl->buf_ring); 1656 } 1657 1658 free(group_impl->trackers); 1659 1660 return 0; 1661 } 1662 1663 static int 1664 uring_sock_group_impl_buf_pool_alloc(struct spdk_uring_sock_group_impl *group_impl) 1665 { 1666 struct io_uring_buf_reg buf_reg = {}; 1667 struct io_uring_buf_ring *buf_ring; 1668 int i, rc; 1669 1670 rc = posix_memalign((void **)&buf_ring, 0x1000, URING_BUF_POOL_SIZE * sizeof(struct io_uring_buf)); 1671 if (rc != 0) { 1672 /* posix_memalign returns positive errno values */ 1673 return -rc; 1674 } 1675 1676 buf_reg.ring_addr = (unsigned long long)buf_ring; 1677 buf_reg.ring_entries = URING_BUF_POOL_SIZE; 1678 buf_reg.bgid = URING_BUF_GROUP_ID; 1679 1680 rc = io_uring_register_buf_ring(&group_impl->uring, &buf_reg, 0); 1681 if (rc != 0) { 1682 free(buf_ring); 1683 return rc; 1684 } 1685 1686 group_impl->buf_ring = buf_ring; 1687 io_uring_buf_ring_init(group_impl->buf_ring); 1688 group_impl->buf_ring_count = 0; 1689 1690 group_impl->trackers = calloc(URING_BUF_POOL_SIZE, sizeof(struct spdk_uring_buf_tracker)); 1691 if (group_impl->trackers == NULL) { 1692 uring_sock_group_impl_buf_pool_free(group_impl); 1693 return -ENOMEM; 1694 } 1695 1696 STAILQ_INIT(&group_impl->free_trackers); 1697 1698 for (i = 0; i < URING_BUF_POOL_SIZE; i++) { 1699 struct spdk_uring_buf_tracker *tracker = &group_impl->trackers[i]; 1700 1701 tracker->buf = NULL; 1702 tracker->len = 0; 1703 tracker->ctx = NULL; 1704 tracker->id = i; 1705 1706 STAILQ_INSERT_TAIL(&group_impl->free_trackers, tracker, link); 1707 } 1708 1709 return 0; 1710 } 1711 1712 static struct spdk_sock_group_impl * 1713 uring_sock_group_impl_create(void) 1714 { 1715 struct spdk_uring_sock_group_impl *group_impl; 1716 1717 group_impl = calloc(1, sizeof(*group_impl)); 1718 if (group_impl == NULL) { 1719 SPDK_ERRLOG("group_impl allocation failed\n"); 1720 return NULL; 1721 } 1722 1723 group_impl->io_avail = SPDK_SOCK_GROUP_QUEUE_DEPTH; 1724 1725 if (io_uring_queue_init(SPDK_SOCK_GROUP_QUEUE_DEPTH, &group_impl->uring, 0) < 0) { 1726 SPDK_ERRLOG("uring I/O context setup failure\n"); 1727 free(group_impl); 1728 return NULL; 1729 } 1730 1731 TAILQ_INIT(&group_impl->pending_recv); 1732 1733 if (uring_sock_group_impl_buf_pool_alloc(group_impl) < 0) { 1734 SPDK_ERRLOG("Failed to create buffer ring." 
1735 "uring sock implementation is likely not supported on this kernel.\n"); 1736 io_uring_queue_exit(&group_impl->uring); 1737 free(group_impl); 1738 return NULL; 1739 } 1740 1741 if (g_spdk_uring_sock_impl_opts.enable_placement_id == PLACEMENT_CPU) { 1742 spdk_sock_map_insert(&g_map, spdk_env_get_current_core(), &group_impl->base); 1743 } 1744 1745 return &group_impl->base; 1746 } 1747 1748 static int 1749 uring_sock_group_impl_add_sock(struct spdk_sock_group_impl *_group, 1750 struct spdk_sock *_sock) 1751 { 1752 struct spdk_uring_sock *sock = __uring_sock(_sock); 1753 struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group); 1754 int rc; 1755 1756 sock->group = group; 1757 sock->write_task.sock = sock; 1758 sock->write_task.type = URING_TASK_WRITE; 1759 1760 sock->read_task.sock = sock; 1761 sock->read_task.type = URING_TASK_READ; 1762 1763 sock->errqueue_task.sock = sock; 1764 sock->errqueue_task.type = URING_TASK_ERRQUEUE; 1765 sock->errqueue_task.msg.msg_control = sock->buf; 1766 sock->errqueue_task.msg.msg_controllen = sizeof(sock->buf); 1767 1768 sock->cancel_task.sock = sock; 1769 sock->cancel_task.type = URING_TASK_CANCEL; 1770 1771 /* switched from another polling group due to scheduling */ 1772 if (spdk_unlikely(sock->recv_pipe != NULL && 1773 (spdk_pipe_reader_bytes_available(sock->recv_pipe) > 0))) { 1774 assert(sock->pending_recv == false); 1775 sock->pending_recv = true; 1776 TAILQ_INSERT_TAIL(&group->pending_recv, sock, link); 1777 } 1778 1779 if (sock->placement_id != -1) { 1780 rc = spdk_sock_map_insert(&g_map, sock->placement_id, &group->base); 1781 if (rc != 0) { 1782 SPDK_ERRLOG("Failed to insert sock group into map: %d", rc); 1783 /* Do not treat this as an error. The system will continue running. */ 1784 } 1785 } 1786 1787 /* We get an async read going immediately */ 1788 _sock_prep_read(&sock->base); 1789 #ifdef SPDK_ZEROCOPY 1790 if (sock->zcopy) { 1791 _sock_prep_errqueue(_sock); 1792 } 1793 #endif 1794 1795 return 0; 1796 } 1797 1798 static void 1799 uring_sock_group_populate_buf_ring(struct spdk_uring_sock_group_impl *group) 1800 { 1801 struct spdk_uring_buf_tracker *tracker; 1802 int count, mask; 1803 1804 if (g_spdk_uring_sock_impl_opts.enable_recv_pipe) { 1805 /* If recv_pipe is enabled, we do not post buffers. 
*/ 1806 return; 1807 } 1808 1809 /* Try to re-populate the io_uring's buffer pool using user-provided buffers */ 1810 tracker = STAILQ_FIRST(&group->free_trackers); 1811 count = 0; 1812 mask = io_uring_buf_ring_mask(URING_BUF_POOL_SIZE); 1813 while (tracker != NULL) { 1814 tracker->buflen = spdk_sock_group_get_buf(group->base.group, &tracker->buf, &tracker->ctx); 1815 if (tracker->buflen == 0) { 1816 break; 1817 } 1818 1819 assert(tracker->buf != NULL); 1820 STAILQ_REMOVE_HEAD(&group->free_trackers, link); 1821 assert(STAILQ_FIRST(&group->free_trackers) != tracker); 1822 1823 io_uring_buf_ring_add(group->buf_ring, tracker->buf, tracker->buflen, tracker->id, mask, count); 1824 count++; 1825 tracker = STAILQ_FIRST(&group->free_trackers); 1826 } 1827 1828 if (count > 0) { 1829 group->buf_ring_count += count; 1830 io_uring_buf_ring_advance(group->buf_ring, count); 1831 } 1832 } 1833 1834 static int 1835 uring_sock_group_impl_poll(struct spdk_sock_group_impl *_group, int max_events, 1836 struct spdk_sock **socks) 1837 { 1838 struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group); 1839 int count, ret; 1840 int to_complete, to_submit; 1841 struct spdk_sock *_sock, *tmp; 1842 struct spdk_uring_sock *sock; 1843 1844 if (spdk_likely(socks)) { 1845 TAILQ_FOREACH_SAFE(_sock, &group->base.socks, link, tmp) { 1846 sock = __uring_sock(_sock); 1847 if (spdk_unlikely(sock->connection_status)) { 1848 continue; 1849 } 1850 _sock_flush(_sock); 1851 } 1852 } 1853 1854 /* Try to re-populate the io_uring's buffer pool using user-provided buffers */ 1855 uring_sock_group_populate_buf_ring(group); 1856 1857 to_submit = group->io_queued; 1858 1859 /* For network I/O, it cannot be set with O_DIRECT, so we do not need to call spdk_io_uring_enter */ 1860 if (to_submit > 0) { 1861 /* If there are I/O to submit, use io_uring_submit here. 1862 * It will automatically call io_uring_enter appropriately. */ 1863 ret = io_uring_submit(&group->uring); 1864 if (ret < 0) { 1865 return 1; 1866 } 1867 group->io_queued = 0; 1868 group->io_inflight += to_submit; 1869 group->io_avail -= to_submit; 1870 } 1871 1872 count = 0; 1873 to_complete = group->io_inflight; 1874 if (to_complete > 0 || !TAILQ_EMPTY(&group->pending_recv)) { 1875 count = sock_uring_group_reap(group, to_complete, max_events, socks); 1876 } 1877 1878 return count; 1879 } 1880 1881 static int 1882 uring_sock_group_impl_remove_sock(struct spdk_sock_group_impl *_group, 1883 struct spdk_sock *_sock) 1884 { 1885 struct spdk_uring_sock *sock = __uring_sock(_sock); 1886 struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group); 1887 1888 sock->pending_group_remove = true; 1889 1890 if (sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) { 1891 _sock_prep_cancel_task(_sock, &sock->write_task); 1892 /* Since spdk_sock_group_remove_sock is not asynchronous interface, so 1893 * currently can use a while loop here. */ 1894 while ((sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) || 1895 (sock->cancel_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE)) { 1896 uring_sock_group_impl_poll(_group, 32, NULL); 1897 } 1898 } 1899 1900 if (sock->read_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) { 1901 _sock_prep_cancel_task(_sock, &sock->read_task); 1902 /* Since spdk_sock_group_remove_sock is not asynchronous interface, so 1903 * currently can use a while loop here. 
*/ 1904 while ((sock->read_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) || 1905 (sock->cancel_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE)) { 1906 uring_sock_group_impl_poll(_group, 32, NULL); 1907 } 1908 } 1909 1910 if (sock->errqueue_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) { 1911 _sock_prep_cancel_task(_sock, &sock->errqueue_task); 1912 /* Since spdk_sock_group_remove_sock is not asynchronous interface, so 1913 * currently can use a while loop here. */ 1914 while ((sock->errqueue_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) || 1915 (sock->cancel_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE)) { 1916 uring_sock_group_impl_poll(_group, 32, NULL); 1917 } 1918 } 1919 1920 /* Make sure the cancelling the tasks above didn't cause sending new requests */ 1921 assert(sock->write_task.status == SPDK_URING_SOCK_TASK_NOT_IN_USE); 1922 assert(sock->read_task.status == SPDK_URING_SOCK_TASK_NOT_IN_USE); 1923 assert(sock->errqueue_task.status == SPDK_URING_SOCK_TASK_NOT_IN_USE); 1924 1925 if (sock->pending_recv) { 1926 TAILQ_REMOVE(&group->pending_recv, sock, link); 1927 sock->pending_recv = false; 1928 } 1929 assert(sock->pending_recv == false); 1930 1931 /* We have no way to handle this case. We could let the user read this 1932 * buffer, but the buffer came from a group and we have lost the association 1933 * to that so we couldn't release it. */ 1934 assert(STAILQ_EMPTY(&sock->recv_stream)); 1935 1936 if (sock->placement_id != -1) { 1937 spdk_sock_map_release(&g_map, sock->placement_id); 1938 } 1939 1940 sock->pending_group_remove = false; 1941 sock->group = NULL; 1942 return 0; 1943 } 1944 1945 static int 1946 uring_sock_group_impl_close(struct spdk_sock_group_impl *_group) 1947 { 1948 struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group); 1949 1950 /* try to reap all the active I/O */ 1951 while (group->io_inflight) { 1952 uring_sock_group_impl_poll(_group, 32, NULL); 1953 } 1954 assert(group->io_inflight == 0); 1955 assert(group->io_avail == SPDK_SOCK_GROUP_QUEUE_DEPTH); 1956 1957 uring_sock_group_impl_buf_pool_free(group); 1958 1959 io_uring_queue_exit(&group->uring); 1960 1961 if (g_spdk_uring_sock_impl_opts.enable_placement_id == PLACEMENT_CPU) { 1962 spdk_sock_map_release(&g_map, spdk_env_get_current_core()); 1963 } 1964 1965 free(group); 1966 return 0; 1967 } 1968 1969 static int 1970 uring_sock_flush(struct spdk_sock *_sock) 1971 { 1972 struct spdk_uring_sock *sock = __uring_sock(_sock); 1973 struct msghdr msg = {}; 1974 struct iovec iovs[IOV_BATCH_SIZE]; 1975 int iovcnt; 1976 ssize_t rc; 1977 int flags = sock->zcopy_send_flags; 1978 int retval; 1979 bool is_zcopy = false; 1980 struct spdk_uring_task *task = &sock->errqueue_task; 1981 1982 /* Can't flush from within a callback or we end up with recursive calls */ 1983 if (_sock->cb_cnt > 0) { 1984 errno = EAGAIN; 1985 return -1; 1986 } 1987 1988 /* Can't flush while a write is already outstanding */ 1989 if (sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) { 1990 errno = EAGAIN; 1991 return -1; 1992 } 1993 1994 /* Gather an iov */ 1995 iovcnt = spdk_sock_prep_reqs(_sock, iovs, 0, NULL, &flags); 1996 if (iovcnt == 0) { 1997 /* Nothing to send */ 1998 return 0; 1999 } 2000 2001 /* Perform the vectored write */ 2002 msg.msg_iov = iovs; 2003 msg.msg_iovlen = iovcnt; 2004 rc = sendmsg(sock->fd, &msg, flags | MSG_DONTWAIT); 2005 if (rc <= 0) { 2006 if (rc == 0 || errno == EAGAIN || errno == EWOULDBLOCK || (errno == ENOBUFS && sock->zcopy)) { 2007 errno = EAGAIN; 2008 } 2009 return -1; 2010 } 2011 2012 
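	/* With MSG_ZEROCOPY, a successful sendmsg() only means the kernel accepted
	 * the data; the pages may still be referenced for transmission, so requests
	 * completed below cannot be released to the user until a completion
	 * notification is reaped from the socket's error queue (see the
	 * MSG_ERRQUEUE handling that follows). */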
#ifdef SPDK_ZEROCOPY
	is_zcopy = flags & MSG_ZEROCOPY;
#endif
	retval = sock_complete_write_reqs(_sock, rc, is_zcopy);
	if (retval < 0) {
		/* if the socket is closed, return to avoid heap-use-after-free error */
		errno = ENOTCONN;
		return -1;
	}

#ifdef SPDK_ZEROCOPY
	/* Check the error queue at least once for zero-copy completions. */
	if (sock->zcopy && !TAILQ_EMPTY(&_sock->pending_reqs)) {
		retval = recvmsg(sock->fd, &task->msg, MSG_ERRQUEUE);
		if (retval < 0) {
			if (errno == EWOULDBLOCK || errno == EAGAIN) {
				return rc;
			}
		}
		_sock_check_zcopy(_sock, retval);
	}
#endif

	return rc;
}

static int
uring_sock_group_impl_register_interrupt(struct spdk_sock_group_impl *_group, uint32_t events,
					 spdk_interrupt_fn fn, void *arg, const char *name)
{
	SPDK_ERRLOG("Interrupt mode is not supported in the uring sock implementation.\n");

	return -ENOTSUP;
}

static void
uring_sock_group_impl_unregister_interrupt(struct spdk_sock_group_impl *_group)
{
}

static struct spdk_net_impl g_uring_net_impl = {
	.name = "uring",
	.getaddr = uring_sock_getaddr,
	.get_interface_name = uring_sock_get_interface_name,
	.get_numa_id = uring_sock_get_numa_id,
	.connect = uring_sock_connect,
	.listen = uring_sock_listen,
	.accept = uring_sock_accept,
	.close = uring_sock_close,
	.recv = uring_sock_recv,
	.readv = uring_sock_readv,
	.writev = uring_sock_writev,
	.recv_next = uring_sock_recv_next,
	.writev_async = uring_sock_writev_async,
	.flush = uring_sock_flush,
	.set_recvlowat = uring_sock_set_recvlowat,
	.set_recvbuf = uring_sock_set_recvbuf,
	.set_sendbuf = uring_sock_set_sendbuf,
	.is_ipv6 = uring_sock_is_ipv6,
	.is_ipv4 = uring_sock_is_ipv4,
	.is_connected = uring_sock_is_connected,
	.group_impl_get_optimal = uring_sock_group_impl_get_optimal,
	.group_impl_create = uring_sock_group_impl_create,
	.group_impl_add_sock = uring_sock_group_impl_add_sock,
	.group_impl_remove_sock = uring_sock_group_impl_remove_sock,
	.group_impl_poll = uring_sock_group_impl_poll,
	.group_impl_register_interrupt = uring_sock_group_impl_register_interrupt,
	.group_impl_unregister_interrupt = uring_sock_group_impl_unregister_interrupt,
	.group_impl_close = uring_sock_group_impl_close,
	.get_opts = uring_sock_impl_get_opts,
	.set_opts = uring_sock_impl_set_opts,
};

__attribute__((constructor)) static void
net_impl_register_uring(void)
{
	struct spdk_sock_group_impl *impl;

	/* Check if we can create a uring sock group before we register
	 * it as a valid impl. */
	impl = uring_sock_group_impl_create();
	if (impl) {
		uring_sock_group_impl_close(impl);
		spdk_net_impl_register(&g_uring_net_impl);
	}
}
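/*
 * Illustrative usage sketch (not part of this module): one way an application
 * could select and tune the "uring" implementation through the generic
 * spdk_sock API from spdk/sock.h. The address, port and threshold below are
 * placeholder values and error handling is omitted for brevity.
 *
 *	#include "spdk/sock.h"
 *
 *	static struct spdk_sock *
 *	connect_with_uring(void)
 *	{
 *		struct spdk_sock_impl_opts impl_opts = {};
 *		size_t len = sizeof(impl_opts);
 *
 *		spdk_sock_impl_get_opts("uring", &impl_opts, &len);
 *		impl_opts.enable_zerocopy_send_client = true;
 *		impl_opts.zerocopy_threshold = 4096;
 *		spdk_sock_impl_set_opts("uring", &impl_opts, len);
 *
 *		return spdk_sock_connect("127.0.0.1", 4420, "uring");
 *	}
 */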