1 /* SPDX-License-Identifier: BSD-3-Clause 2 * Copyright (C) 2018 Intel Corporation. All rights reserved. 3 * Copyright (c) 2020, 2021 Mellanox Technologies LTD. All rights reserved. 4 * Copyright (c) 2021 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 5 */ 6 7 #include "spdk/stdinc.h" 8 9 #if defined(__FreeBSD__) 10 #include <sys/event.h> 11 #define SPDK_KEVENT 12 #else 13 #include <sys/epoll.h> 14 #define SPDK_EPOLL 15 #endif 16 17 #if defined(__linux__) 18 #include <linux/errqueue.h> 19 #endif 20 21 #include "spdk/env.h" 22 #include "spdk/log.h" 23 #include "spdk/pipe.h" 24 #include "spdk/sock.h" 25 #include "spdk/util.h" 26 #include "spdk/string.h" 27 #include "spdk_internal/sock.h" 28 #include "../sock_kernel.h" 29 30 #include "openssl/crypto.h" 31 #include "openssl/err.h" 32 #include "openssl/ssl.h" 33 34 #define MAX_TMPBUF 1024 35 #define PORTNUMLEN 32 36 37 #if defined(SO_ZEROCOPY) && defined(MSG_ZEROCOPY) 38 #define SPDK_ZEROCOPY 39 #endif 40 41 struct spdk_posix_sock { 42 struct spdk_sock base; 43 int fd; 44 45 uint32_t sendmsg_idx; 46 47 struct spdk_pipe *recv_pipe; 48 void *recv_buf; 49 int recv_buf_sz; 50 bool pipe_has_data; 51 bool socket_has_data; 52 bool zcopy; 53 54 int placement_id; 55 56 SSL_CTX *ctx; 57 SSL *ssl; 58 59 TAILQ_ENTRY(spdk_posix_sock) link; 60 }; 61 62 TAILQ_HEAD(spdk_has_data_list, spdk_posix_sock); 63 64 struct spdk_posix_sock_group_impl { 65 struct spdk_sock_group_impl base; 66 int fd; 67 struct spdk_has_data_list socks_with_data; 68 int placement_id; 69 }; 70 71 static struct spdk_sock_impl_opts g_spdk_posix_sock_impl_opts = { 72 .recv_buf_size = DEFAULT_SO_RCVBUF_SIZE, 73 .send_buf_size = DEFAULT_SO_SNDBUF_SIZE, 74 .enable_recv_pipe = true, 75 .enable_quickack = false, 76 .enable_placement_id = PLACEMENT_NONE, 77 .enable_zerocopy_send_server = true, 78 .enable_zerocopy_send_client = false, 79 .zerocopy_threshold = 0, 80 .tls_version = 0, 81 .enable_ktls = false, 82 .psk_key = NULL, 83 .psk_key_size = 0, 84 .psk_identity = NULL, 85 .get_key = NULL, 86 .get_key_ctx = NULL, 87 .tls_cipher_suites = NULL 88 }; 89 90 static struct spdk_sock_map g_map = { 91 .entries = STAILQ_HEAD_INITIALIZER(g_map.entries), 92 .mtx = PTHREAD_MUTEX_INITIALIZER 93 }; 94 95 __attribute((destructor)) static void 96 posix_sock_map_cleanup(void) 97 { 98 spdk_sock_map_cleanup(&g_map); 99 } 100 101 #define __posix_sock(sock) (struct spdk_posix_sock *)sock 102 #define __posix_group_impl(group) (struct spdk_posix_sock_group_impl *)group 103 104 static void 105 posix_sock_copy_impl_opts(struct spdk_sock_impl_opts *dest, const struct spdk_sock_impl_opts *src, 106 size_t len) 107 { 108 #define FIELD_OK(field) \ 109 offsetof(struct spdk_sock_impl_opts, field) + sizeof(src->field) <= len 110 111 #define SET_FIELD(field) \ 112 if (FIELD_OK(field)) { \ 113 dest->field = src->field; \ 114 } 115 116 SET_FIELD(recv_buf_size); 117 SET_FIELD(send_buf_size); 118 SET_FIELD(enable_recv_pipe); 119 SET_FIELD(enable_zerocopy_send); 120 SET_FIELD(enable_quickack); 121 SET_FIELD(enable_placement_id); 122 SET_FIELD(enable_zerocopy_send_server); 123 SET_FIELD(enable_zerocopy_send_client); 124 SET_FIELD(zerocopy_threshold); 125 SET_FIELD(tls_version); 126 SET_FIELD(enable_ktls); 127 SET_FIELD(psk_key); 128 SET_FIELD(psk_key_size); 129 SET_FIELD(psk_identity); 130 SET_FIELD(get_key); 131 SET_FIELD(get_key_ctx); 132 SET_FIELD(tls_cipher_suites); 133 134 #undef SET_FIELD 135 #undef FIELD_OK 136 } 137 138 static int 139 posix_sock_impl_get_opts(struct spdk_sock_impl_opts *opts, size_t *len) 140 { 141 if (!opts || !len) { 142 errno = EINVAL; 143 return -1; 144 } 145 146 assert(sizeof(*opts) >= *len); 147 memset(opts, 0, *len); 148 149 posix_sock_copy_impl_opts(opts, &g_spdk_posix_sock_impl_opts, *len); 150 *len = spdk_min(*len, sizeof(g_spdk_posix_sock_impl_opts)); 151 152 return 0; 153 } 154 155 static int 156 posix_sock_impl_set_opts(const struct spdk_sock_impl_opts *opts, size_t len) 157 { 158 if (!opts) { 159 errno = EINVAL; 160 return -1; 161 } 162 163 assert(sizeof(*opts) >= len); 164 posix_sock_copy_impl_opts(&g_spdk_posix_sock_impl_opts, opts, len); 165 166 return 0; 167 } 168 169 static void 170 posix_opts_get_impl_opts(const struct spdk_sock_opts *opts, struct spdk_sock_impl_opts *dest) 171 { 172 /* Copy the default impl_opts first to cover cases when user's impl_opts is smaller */ 173 memcpy(dest, &g_spdk_posix_sock_impl_opts, sizeof(*dest)); 174 175 if (opts->impl_opts != NULL) { 176 assert(sizeof(*dest) >= opts->impl_opts_size); 177 posix_sock_copy_impl_opts(dest, opts->impl_opts, opts->impl_opts_size); 178 } 179 } 180 181 static int 182 posix_sock_getaddr(struct spdk_sock *_sock, char *saddr, int slen, uint16_t *sport, 183 char *caddr, int clen, uint16_t *cport) 184 { 185 struct spdk_posix_sock *sock = __posix_sock(_sock); 186 struct sockaddr_storage sa; 187 socklen_t salen; 188 int rc; 189 190 assert(sock != NULL); 191 192 memset(&sa, 0, sizeof sa); 193 salen = sizeof sa; 194 rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen); 195 if (rc != 0) { 196 SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno); 197 return -1; 198 } 199 200 switch (sa.ss_family) { 201 case AF_UNIX: 202 /* Acceptable connection types that don't have IPs */ 203 return 0; 204 case AF_INET: 205 case AF_INET6: 206 /* Code below will get IP addresses */ 207 break; 208 default: 209 /* Unsupported socket family */ 210 return -1; 211 } 212 213 rc = get_addr_str((struct sockaddr *)&sa, saddr, slen); 214 if (rc != 0) { 215 SPDK_ERRLOG("getnameinfo() failed (errno=%d)\n", errno); 216 return -1; 217 } 218 219 if (sport) { 220 if (sa.ss_family == AF_INET) { 221 *sport = ntohs(((struct sockaddr_in *) &sa)->sin_port); 222 } else if (sa.ss_family == AF_INET6) { 223 *sport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port); 224 } 225 } 226 227 memset(&sa, 0, sizeof sa); 228 salen = sizeof sa; 229 rc = getpeername(sock->fd, (struct sockaddr *) &sa, &salen); 230 if (rc != 0) { 231 SPDK_ERRLOG("getpeername() failed (errno=%d)\n", errno); 232 return -1; 233 } 234 235 rc = get_addr_str((struct sockaddr *)&sa, caddr, clen); 236 if (rc != 0) { 237 SPDK_ERRLOG("getnameinfo() failed (errno=%d)\n", errno); 238 return -1; 239 } 240 241 if (cport) { 242 if (sa.ss_family == AF_INET) { 243 *cport = ntohs(((struct sockaddr_in *) &sa)->sin_port); 244 } else if (sa.ss_family == AF_INET6) { 245 *cport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port); 246 } 247 } 248 249 return 0; 250 } 251 252 enum posix_sock_create_type { 253 SPDK_SOCK_CREATE_LISTEN, 254 SPDK_SOCK_CREATE_CONNECT, 255 }; 256 257 static int 258 posix_sock_alloc_pipe(struct spdk_posix_sock *sock, int sz) 259 { 260 uint8_t *new_buf; 261 struct spdk_pipe *new_pipe; 262 struct iovec siov[2]; 263 struct iovec diov[2]; 264 int sbytes; 265 ssize_t bytes; 266 int rc; 267 268 if (sock->recv_buf_sz == sz) { 269 return 0; 270 } 271 272 /* If the new size is 0, just free the pipe */ 273 if (sz == 0) { 274 spdk_pipe_destroy(sock->recv_pipe); 275 free(sock->recv_buf); 276 sock->recv_pipe = NULL; 277 sock->recv_buf = NULL; 278 return 0; 279 } else if (sz < MIN_SOCK_PIPE_SIZE) { 280 SPDK_ERRLOG("The size of the pipe must be larger than %d\n", MIN_SOCK_PIPE_SIZE); 281 return -1; 282 } 283 284 /* Round up to next 64 byte multiple */ 285 rc = posix_memalign((void **)&new_buf, 64, sz); 286 if (rc != 0) { 287 SPDK_ERRLOG("socket recv buf allocation failed\n"); 288 return -ENOMEM; 289 } 290 memset(new_buf, 0, sz); 291 292 new_pipe = spdk_pipe_create(new_buf, sz); 293 if (new_pipe == NULL) { 294 SPDK_ERRLOG("socket pipe allocation failed\n"); 295 free(new_buf); 296 return -ENOMEM; 297 } 298 299 if (sock->recv_pipe != NULL) { 300 /* Pull all of the data out of the old pipe */ 301 sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov); 302 if (sbytes > sz) { 303 /* Too much data to fit into the new pipe size */ 304 spdk_pipe_destroy(new_pipe); 305 free(new_buf); 306 return -EINVAL; 307 } 308 309 sbytes = spdk_pipe_writer_get_buffer(new_pipe, sz, diov); 310 assert(sbytes == sz); 311 312 bytes = spdk_iovcpy(siov, 2, diov, 2); 313 spdk_pipe_writer_advance(new_pipe, bytes); 314 315 spdk_pipe_destroy(sock->recv_pipe); 316 free(sock->recv_buf); 317 } 318 319 sock->recv_buf_sz = sz; 320 sock->recv_buf = new_buf; 321 sock->recv_pipe = new_pipe; 322 323 return 0; 324 } 325 326 static int 327 posix_sock_set_recvbuf(struct spdk_sock *_sock, int sz) 328 { 329 struct spdk_posix_sock *sock = __posix_sock(_sock); 330 int min_size; 331 int rc; 332 333 assert(sock != NULL); 334 335 if (_sock->impl_opts.enable_recv_pipe) { 336 rc = posix_sock_alloc_pipe(sock, sz); 337 if (rc) { 338 return rc; 339 } 340 } 341 342 /* Set kernel buffer size to be at least MIN_SO_RCVBUF_SIZE and 343 * g_spdk_posix_sock_impl_opts.recv_buf_size. */ 344 min_size = spdk_max(MIN_SO_RCVBUF_SIZE, g_spdk_posix_sock_impl_opts.recv_buf_size); 345 346 if (sz < min_size) { 347 sz = min_size; 348 } 349 350 rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVBUF, &sz, sizeof(sz)); 351 if (rc < 0) { 352 return rc; 353 } 354 355 _sock->impl_opts.recv_buf_size = sz; 356 357 return 0; 358 } 359 360 static int 361 posix_sock_set_sendbuf(struct spdk_sock *_sock, int sz) 362 { 363 struct spdk_posix_sock *sock = __posix_sock(_sock); 364 int min_size; 365 int rc; 366 367 assert(sock != NULL); 368 369 /* Set kernel buffer size to be at least MIN_SO_SNDBUF_SIZE and 370 * g_spdk_posix_sock_impl_opts.send_buf_size. */ 371 min_size = spdk_max(MIN_SO_SNDBUF_SIZE, g_spdk_posix_sock_impl_opts.send_buf_size); 372 373 if (sz < min_size) { 374 sz = min_size; 375 } 376 377 rc = setsockopt(sock->fd, SOL_SOCKET, SO_SNDBUF, &sz, sizeof(sz)); 378 if (rc < 0) { 379 return rc; 380 } 381 382 _sock->impl_opts.send_buf_size = sz; 383 384 return 0; 385 } 386 387 static void 388 posix_sock_init(struct spdk_posix_sock *sock, bool enable_zero_copy) 389 { 390 #if defined(SPDK_ZEROCOPY) || defined(__linux__) 391 int flag; 392 int rc; 393 #endif 394 395 #if defined(SPDK_ZEROCOPY) 396 flag = 1; 397 398 if (enable_zero_copy) { 399 /* Try to turn on zero copy sends */ 400 rc = setsockopt(sock->fd, SOL_SOCKET, SO_ZEROCOPY, &flag, sizeof(flag)); 401 if (rc == 0) { 402 sock->zcopy = true; 403 } 404 } 405 #endif 406 407 #if defined(__linux__) 408 flag = 1; 409 410 if (sock->base.impl_opts.enable_quickack) { 411 rc = setsockopt(sock->fd, IPPROTO_TCP, TCP_QUICKACK, &flag, sizeof(flag)); 412 if (rc != 0) { 413 SPDK_ERRLOG("quickack was failed to set\n"); 414 } 415 } 416 417 spdk_sock_get_placement_id(sock->fd, sock->base.impl_opts.enable_placement_id, 418 &sock->placement_id); 419 420 if (sock->base.impl_opts.enable_placement_id == PLACEMENT_MARK) { 421 /* Save placement_id */ 422 spdk_sock_map_insert(&g_map, sock->placement_id, NULL); 423 } 424 #endif 425 } 426 427 static struct spdk_posix_sock * 428 posix_sock_alloc(int fd, struct spdk_sock_impl_opts *impl_opts, bool enable_zero_copy) 429 { 430 struct spdk_posix_sock *sock; 431 432 sock = calloc(1, sizeof(*sock)); 433 if (sock == NULL) { 434 SPDK_ERRLOG("sock allocation failed\n"); 435 return NULL; 436 } 437 438 sock->fd = fd; 439 memcpy(&sock->base.impl_opts, impl_opts, sizeof(*impl_opts)); 440 posix_sock_init(sock, enable_zero_copy); 441 442 return sock; 443 } 444 445 static int 446 posix_fd_create(struct addrinfo *res, struct spdk_sock_opts *opts, 447 struct spdk_sock_impl_opts *impl_opts) 448 { 449 int fd; 450 int val = 1; 451 int rc, sz; 452 #if defined(__linux__) 453 int to; 454 #endif 455 456 fd = socket(res->ai_family, res->ai_socktype, res->ai_protocol); 457 if (fd < 0) { 458 /* error */ 459 return -1; 460 } 461 462 sz = impl_opts->recv_buf_size; 463 rc = setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &sz, sizeof(sz)); 464 if (rc) { 465 /* Not fatal */ 466 } 467 468 sz = impl_opts->send_buf_size; 469 rc = setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &sz, sizeof(sz)); 470 if (rc) { 471 /* Not fatal */ 472 } 473 474 rc = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof val); 475 if (rc != 0) { 476 close(fd); 477 /* error */ 478 return -1; 479 } 480 rc = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof val); 481 if (rc != 0) { 482 close(fd); 483 /* error */ 484 return -1; 485 } 486 487 #if defined(SO_PRIORITY) 488 if (opts->priority) { 489 rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &opts->priority, sizeof val); 490 if (rc != 0) { 491 close(fd); 492 /* error */ 493 return -1; 494 } 495 } 496 #endif 497 498 if (res->ai_family == AF_INET6) { 499 rc = setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &val, sizeof val); 500 if (rc != 0) { 501 close(fd); 502 /* error */ 503 return -1; 504 } 505 } 506 507 if (opts->ack_timeout) { 508 #if defined(__linux__) 509 to = opts->ack_timeout; 510 rc = setsockopt(fd, IPPROTO_TCP, TCP_USER_TIMEOUT, &to, sizeof(to)); 511 if (rc != 0) { 512 close(fd); 513 /* error */ 514 return -1; 515 } 516 #else 517 SPDK_WARNLOG("TCP_USER_TIMEOUT is not supported.\n"); 518 #endif 519 } 520 521 return fd; 522 } 523 524 static int 525 posix_sock_psk_find_session_server_cb(SSL *ssl, const unsigned char *identity, 526 size_t identity_len, SSL_SESSION **sess) 527 { 528 struct spdk_sock_impl_opts *impl_opts = SSL_get_app_data(ssl); 529 uint8_t key[SSL_MAX_MASTER_KEY_LENGTH] = {}; 530 int keylen; 531 int rc, i; 532 STACK_OF(SSL_CIPHER) *ciphers; 533 const SSL_CIPHER *cipher; 534 const char *cipher_name; 535 const char *user_cipher = NULL; 536 bool found = false; 537 538 if (impl_opts->get_key) { 539 rc = impl_opts->get_key(key, sizeof(key), &user_cipher, identity, impl_opts->get_key_ctx); 540 if (rc < 0) { 541 SPDK_ERRLOG("Unable to find PSK for identity: %s\n", identity); 542 return 0; 543 } 544 keylen = rc; 545 } else { 546 if (impl_opts->psk_key == NULL) { 547 SPDK_ERRLOG("PSK is not set\n"); 548 return 0; 549 } 550 551 SPDK_DEBUGLOG(sock_posix, "Length of Client's PSK ID %lu\n", strlen(impl_opts->psk_identity)); 552 if (strcmp(impl_opts->psk_identity, identity) != 0) { 553 SPDK_ERRLOG("Unknown Client's PSK ID\n"); 554 return 0; 555 } 556 keylen = impl_opts->psk_key_size; 557 558 memcpy(key, impl_opts->psk_key, keylen); 559 user_cipher = impl_opts->tls_cipher_suites; 560 } 561 562 if (user_cipher == NULL) { 563 SPDK_ERRLOG("Cipher suite not set\n"); 564 return 0; 565 } 566 567 *sess = SSL_SESSION_new(); 568 if (*sess == NULL) { 569 SPDK_ERRLOG("Unable to allocate new SSL session\n"); 570 return 0; 571 } 572 573 ciphers = SSL_get_ciphers(ssl); 574 for (i = 0; i < sk_SSL_CIPHER_num(ciphers); i++) { 575 cipher = sk_SSL_CIPHER_value(ciphers, i); 576 cipher_name = SSL_CIPHER_get_name(cipher); 577 578 if (strcmp(user_cipher, cipher_name) == 0) { 579 rc = SSL_SESSION_set_cipher(*sess, cipher); 580 if (rc != 1) { 581 SPDK_ERRLOG("Unable to set cipher: %s\n", cipher_name); 582 goto err; 583 } 584 found = true; 585 break; 586 } 587 } 588 if (found == false) { 589 SPDK_ERRLOG("No suitable cipher found\n"); 590 goto err; 591 } 592 593 SPDK_DEBUGLOG(sock_posix, "Cipher selected: %s\n", cipher_name); 594 595 rc = SSL_SESSION_set_protocol_version(*sess, TLS1_3_VERSION); 596 if (rc != 1) { 597 SPDK_ERRLOG("Unable to set TLS version: %d\n", TLS1_3_VERSION); 598 goto err; 599 } 600 601 rc = SSL_SESSION_set1_master_key(*sess, key, keylen); 602 if (rc != 1) { 603 SPDK_ERRLOG("Unable to set PSK for session\n"); 604 goto err; 605 } 606 607 return 1; 608 609 err: 610 SSL_SESSION_free(*sess); 611 *sess = NULL; 612 return 0; 613 } 614 615 static int 616 posix_sock_psk_use_session_client_cb(SSL *ssl, const EVP_MD *md, const unsigned char **identity, 617 size_t *identity_len, SSL_SESSION **sess) 618 { 619 struct spdk_sock_impl_opts *impl_opts = SSL_get_app_data(ssl); 620 int rc, i; 621 STACK_OF(SSL_CIPHER) *ciphers; 622 const SSL_CIPHER *cipher; 623 const char *cipher_name; 624 long keylen; 625 bool found = false; 626 627 if (impl_opts->psk_key == NULL) { 628 SPDK_ERRLOG("PSK is not set\n"); 629 return 0; 630 } 631 if (impl_opts->psk_key_size > SSL_MAX_MASTER_KEY_LENGTH) { 632 SPDK_ERRLOG("PSK too long\n"); 633 return 0; 634 } 635 keylen = impl_opts->psk_key_size; 636 637 if (impl_opts->tls_cipher_suites == NULL) { 638 SPDK_ERRLOG("Cipher suite not set\n"); 639 return 0; 640 } 641 *sess = SSL_SESSION_new(); 642 if (*sess == NULL) { 643 SPDK_ERRLOG("Unable to allocate new SSL session\n"); 644 return 0; 645 } 646 647 ciphers = SSL_get_ciphers(ssl); 648 for (i = 0; i < sk_SSL_CIPHER_num(ciphers); i++) { 649 cipher = sk_SSL_CIPHER_value(ciphers, i); 650 cipher_name = SSL_CIPHER_get_name(cipher); 651 652 if (strcmp(impl_opts->tls_cipher_suites, cipher_name) == 0) { 653 rc = SSL_SESSION_set_cipher(*sess, cipher); 654 if (rc != 1) { 655 SPDK_ERRLOG("Unable to set cipher: %s\n", cipher_name); 656 goto err; 657 } 658 found = true; 659 break; 660 } 661 } 662 if (found == false) { 663 SPDK_ERRLOG("No suitable cipher found\n"); 664 goto err; 665 } 666 667 SPDK_DEBUGLOG(sock_posix, "Cipher selected: %s\n", cipher_name); 668 669 rc = SSL_SESSION_set_protocol_version(*sess, TLS1_3_VERSION); 670 if (rc != 1) { 671 SPDK_ERRLOG("Unable to set TLS version: %d\n", TLS1_3_VERSION); 672 goto err; 673 } 674 675 rc = SSL_SESSION_set1_master_key(*sess, impl_opts->psk_key, keylen); 676 if (rc != 1) { 677 SPDK_ERRLOG("Unable to set PSK for session\n"); 678 goto err; 679 } 680 681 *identity_len = strlen(impl_opts->psk_identity); 682 *identity = impl_opts->psk_identity; 683 684 return 1; 685 686 err: 687 SSL_SESSION_free(*sess); 688 *sess = NULL; 689 return 0; 690 } 691 692 static SSL_CTX * 693 posix_sock_create_ssl_context(const SSL_METHOD *method, struct spdk_sock_opts *opts, 694 struct spdk_sock_impl_opts *impl_opts) 695 { 696 SSL_CTX *ctx; 697 int tls_version = 0; 698 bool ktls_enabled = false; 699 #ifdef SSL_OP_ENABLE_KTLS 700 long options; 701 #endif 702 703 SSL_library_init(); 704 OpenSSL_add_all_algorithms(); 705 SSL_load_error_strings(); 706 /* Produce a SSL CTX in SSL V2 and V3 standards compliant way */ 707 ctx = SSL_CTX_new(method); 708 if (!ctx) { 709 SPDK_ERRLOG("SSL_CTX_new() failed, msg = %s\n", ERR_error_string(ERR_peek_last_error(), NULL)); 710 return NULL; 711 } 712 SPDK_DEBUGLOG(sock_posix, "SSL context created\n"); 713 714 switch (impl_opts->tls_version) { 715 case 0: 716 /* auto-negotioation */ 717 break; 718 case SPDK_TLS_VERSION_1_3: 719 tls_version = TLS1_3_VERSION; 720 break; 721 default: 722 SPDK_ERRLOG("Incorrect TLS version provided: %d\n", impl_opts->tls_version); 723 goto err; 724 } 725 726 if (tls_version) { 727 SPDK_DEBUGLOG(sock_posix, "Hardening TLS version to '%d'='0x%X'\n", impl_opts->tls_version, 728 tls_version); 729 if (!SSL_CTX_set_min_proto_version(ctx, tls_version)) { 730 SPDK_ERRLOG("Unable to set Min TLS version to '%d'='0x%X\n", impl_opts->tls_version, tls_version); 731 goto err; 732 } 733 if (!SSL_CTX_set_max_proto_version(ctx, tls_version)) { 734 SPDK_ERRLOG("Unable to set Max TLS version to '%d'='0x%X\n", impl_opts->tls_version, tls_version); 735 goto err; 736 } 737 } 738 if (impl_opts->enable_ktls) { 739 SPDK_DEBUGLOG(sock_posix, "Enabling kTLS offload\n"); 740 #ifdef SSL_OP_ENABLE_KTLS 741 options = SSL_CTX_set_options(ctx, SSL_OP_ENABLE_KTLS); 742 ktls_enabled = options & SSL_OP_ENABLE_KTLS; 743 #else 744 ktls_enabled = false; 745 #endif 746 if (!ktls_enabled) { 747 SPDK_ERRLOG("Unable to set kTLS offload via SSL_CTX_set_options(). Configure openssl with 'enable-ktls'\n"); 748 goto err; 749 } 750 } 751 752 /* SSL_CTX_set_ciphersuites() return 1 if the requested 753 * cipher suite list was configured, and 0 otherwise. */ 754 if (impl_opts->tls_cipher_suites != NULL && 755 SSL_CTX_set_ciphersuites(ctx, impl_opts->tls_cipher_suites) != 1) { 756 SPDK_ERRLOG("Unable to set TLS cipher suites for SSL'\n"); 757 goto err; 758 } 759 760 return ctx; 761 762 err: 763 SSL_CTX_free(ctx); 764 return NULL; 765 } 766 767 static SSL * 768 ssl_sock_connect_loop(SSL_CTX *ctx, int fd, struct spdk_sock_impl_opts *impl_opts) 769 { 770 int rc; 771 SSL *ssl; 772 int ssl_get_error; 773 774 ssl = SSL_new(ctx); 775 if (!ssl) { 776 SPDK_ERRLOG("SSL_new() failed, msg = %s\n", ERR_error_string(ERR_peek_last_error(), NULL)); 777 return NULL; 778 } 779 SSL_set_fd(ssl, fd); 780 SSL_set_app_data(ssl, impl_opts); 781 SSL_set_psk_use_session_callback(ssl, posix_sock_psk_use_session_client_cb); 782 SPDK_DEBUGLOG(sock_posix, "SSL object creation finished: %p\n", ssl); 783 SPDK_DEBUGLOG(sock_posix, "%s = SSL_state_string_long(%p)\n", SSL_state_string_long(ssl), ssl); 784 while ((rc = SSL_connect(ssl)) != 1) { 785 SPDK_DEBUGLOG(sock_posix, "%s = SSL_state_string_long(%p)\n", SSL_state_string_long(ssl), ssl); 786 ssl_get_error = SSL_get_error(ssl, rc); 787 SPDK_DEBUGLOG(sock_posix, "SSL_connect failed %d = SSL_connect(%p), %d = SSL_get_error(%p, %d)\n", 788 rc, ssl, ssl_get_error, ssl, rc); 789 switch (ssl_get_error) { 790 case SSL_ERROR_WANT_READ: 791 case SSL_ERROR_WANT_WRITE: 792 continue; 793 default: 794 break; 795 } 796 SPDK_ERRLOG("SSL_connect() failed, errno = %d\n", errno); 797 SSL_free(ssl); 798 return NULL; 799 } 800 SPDK_DEBUGLOG(sock_posix, "%s = SSL_state_string_long(%p)\n", SSL_state_string_long(ssl), ssl); 801 SPDK_DEBUGLOG(sock_posix, "Negotiated Cipher suite:%s\n", 802 SSL_CIPHER_get_name(SSL_get_current_cipher(ssl))); 803 return ssl; 804 } 805 806 static SSL * 807 ssl_sock_accept_loop(SSL_CTX *ctx, int fd, struct spdk_sock_impl_opts *impl_opts) 808 { 809 int rc; 810 SSL *ssl; 811 int ssl_get_error; 812 813 ssl = SSL_new(ctx); 814 if (!ssl) { 815 SPDK_ERRLOG("SSL_new() failed, msg = %s\n", ERR_error_string(ERR_peek_last_error(), NULL)); 816 return NULL; 817 } 818 SSL_set_fd(ssl, fd); 819 SSL_set_app_data(ssl, impl_opts); 820 SSL_set_psk_find_session_callback(ssl, posix_sock_psk_find_session_server_cb); 821 SPDK_DEBUGLOG(sock_posix, "SSL object creation finished: %p\n", ssl); 822 SPDK_DEBUGLOG(sock_posix, "%s = SSL_state_string_long(%p)\n", SSL_state_string_long(ssl), ssl); 823 while ((rc = SSL_accept(ssl)) != 1) { 824 SPDK_DEBUGLOG(sock_posix, "%s = SSL_state_string_long(%p)\n", SSL_state_string_long(ssl), ssl); 825 ssl_get_error = SSL_get_error(ssl, rc); 826 SPDK_DEBUGLOG(sock_posix, "SSL_accept failed %d = SSL_accept(%p), %d = SSL_get_error(%p, %d)\n", rc, 827 ssl, ssl_get_error, ssl, rc); 828 switch (ssl_get_error) { 829 case SSL_ERROR_WANT_READ: 830 case SSL_ERROR_WANT_WRITE: 831 continue; 832 default: 833 break; 834 } 835 SPDK_ERRLOG("SSL_accept() failed, errno = %d\n", errno); 836 SSL_free(ssl); 837 return NULL; 838 } 839 SPDK_DEBUGLOG(sock_posix, "%s = SSL_state_string_long(%p)\n", SSL_state_string_long(ssl), ssl); 840 SPDK_DEBUGLOG(sock_posix, "Negotiated Cipher suite:%s\n", 841 SSL_CIPHER_get_name(SSL_get_current_cipher(ssl))); 842 return ssl; 843 } 844 845 static ssize_t 846 SSL_readv(SSL *ssl, const struct iovec *iov, int iovcnt) 847 { 848 int i, rc = 0; 849 ssize_t total = 0; 850 851 for (i = 0; i < iovcnt; i++) { 852 rc = SSL_read(ssl, iov[i].iov_base, iov[i].iov_len); 853 854 if (rc > 0) { 855 total += rc; 856 } 857 if (rc != (int)iov[i].iov_len) { 858 break; 859 } 860 } 861 if (total > 0) { 862 errno = 0; 863 return total; 864 } 865 switch (SSL_get_error(ssl, rc)) { 866 case SSL_ERROR_ZERO_RETURN: 867 errno = ENOTCONN; 868 return 0; 869 case SSL_ERROR_WANT_READ: 870 case SSL_ERROR_WANT_WRITE: 871 case SSL_ERROR_WANT_CONNECT: 872 case SSL_ERROR_WANT_ACCEPT: 873 case SSL_ERROR_WANT_X509_LOOKUP: 874 case SSL_ERROR_WANT_ASYNC: 875 case SSL_ERROR_WANT_ASYNC_JOB: 876 case SSL_ERROR_WANT_CLIENT_HELLO_CB: 877 errno = EAGAIN; 878 return -1; 879 case SSL_ERROR_SYSCALL: 880 case SSL_ERROR_SSL: 881 errno = ENOTCONN; 882 return -1; 883 default: 884 errno = ENOTCONN; 885 return -1; 886 } 887 } 888 889 static ssize_t 890 SSL_writev(SSL *ssl, struct iovec *iov, int iovcnt) 891 { 892 int i, rc = 0; 893 ssize_t total = 0; 894 895 for (i = 0; i < iovcnt; i++) { 896 rc = SSL_write(ssl, iov[i].iov_base, iov[i].iov_len); 897 898 if (rc > 0) { 899 total += rc; 900 } 901 if (rc != (int)iov[i].iov_len) { 902 break; 903 } 904 } 905 if (total > 0) { 906 errno = 0; 907 return total; 908 } 909 switch (SSL_get_error(ssl, rc)) { 910 case SSL_ERROR_ZERO_RETURN: 911 errno = ENOTCONN; 912 return 0; 913 case SSL_ERROR_WANT_READ: 914 case SSL_ERROR_WANT_WRITE: 915 case SSL_ERROR_WANT_CONNECT: 916 case SSL_ERROR_WANT_ACCEPT: 917 case SSL_ERROR_WANT_X509_LOOKUP: 918 case SSL_ERROR_WANT_ASYNC: 919 case SSL_ERROR_WANT_ASYNC_JOB: 920 case SSL_ERROR_WANT_CLIENT_HELLO_CB: 921 errno = EAGAIN; 922 return -1; 923 case SSL_ERROR_SYSCALL: 924 case SSL_ERROR_SSL: 925 errno = ENOTCONN; 926 return -1; 927 default: 928 errno = ENOTCONN; 929 return -1; 930 } 931 } 932 933 static struct spdk_sock * 934 posix_sock_create(const char *ip, int port, 935 enum posix_sock_create_type type, 936 struct spdk_sock_opts *opts, 937 bool enable_ssl) 938 { 939 struct spdk_posix_sock *sock; 940 struct spdk_sock_impl_opts impl_opts; 941 char buf[MAX_TMPBUF]; 942 char portnum[PORTNUMLEN]; 943 char *p; 944 struct addrinfo hints, *res, *res0; 945 int fd, flag; 946 int rc; 947 bool enable_zcopy_user_opts = true; 948 bool enable_zcopy_impl_opts = true; 949 SSL_CTX *ctx = 0; 950 SSL *ssl = 0; 951 952 assert(opts != NULL); 953 posix_opts_get_impl_opts(opts, &impl_opts); 954 955 if (ip == NULL) { 956 return NULL; 957 } 958 if (ip[0] == '[') { 959 snprintf(buf, sizeof(buf), "%s", ip + 1); 960 p = strchr(buf, ']'); 961 if (p != NULL) { 962 *p = '\0'; 963 } 964 ip = (const char *) &buf[0]; 965 } 966 967 snprintf(portnum, sizeof portnum, "%d", port); 968 memset(&hints, 0, sizeof hints); 969 hints.ai_family = PF_UNSPEC; 970 hints.ai_socktype = SOCK_STREAM; 971 hints.ai_flags = AI_NUMERICSERV; 972 hints.ai_flags |= AI_PASSIVE; 973 hints.ai_flags |= AI_NUMERICHOST; 974 rc = getaddrinfo(ip, portnum, &hints, &res0); 975 if (rc != 0) { 976 SPDK_ERRLOG("getaddrinfo() failed %s (%d)\n", gai_strerror(rc), rc); 977 return NULL; 978 } 979 980 /* try listen */ 981 fd = -1; 982 for (res = res0; res != NULL; res = res->ai_next) { 983 retry: 984 fd = posix_fd_create(res, opts, &impl_opts); 985 if (fd < 0) { 986 continue; 987 } 988 if (type == SPDK_SOCK_CREATE_LISTEN) { 989 rc = bind(fd, res->ai_addr, res->ai_addrlen); 990 if (rc != 0) { 991 SPDK_ERRLOG("bind() failed at port %d, errno = %d\n", port, errno); 992 switch (errno) { 993 case EINTR: 994 /* interrupted? */ 995 close(fd); 996 goto retry; 997 case EADDRNOTAVAIL: 998 SPDK_ERRLOG("IP address %s not available. " 999 "Verify IP address in config file " 1000 "and make sure setup script is " 1001 "run before starting spdk app.\n", ip); 1002 /* FALLTHROUGH */ 1003 default: 1004 /* try next family */ 1005 close(fd); 1006 fd = -1; 1007 continue; 1008 } 1009 } 1010 /* bind OK */ 1011 rc = listen(fd, 512); 1012 if (rc != 0) { 1013 SPDK_ERRLOG("listen() failed, errno = %d\n", errno); 1014 close(fd); 1015 fd = -1; 1016 break; 1017 } 1018 enable_zcopy_impl_opts = impl_opts.enable_zerocopy_send_server; 1019 } else if (type == SPDK_SOCK_CREATE_CONNECT) { 1020 rc = connect(fd, res->ai_addr, res->ai_addrlen); 1021 if (rc != 0) { 1022 SPDK_ERRLOG("connect() failed, errno = %d\n", errno); 1023 /* try next family */ 1024 close(fd); 1025 fd = -1; 1026 continue; 1027 } 1028 enable_zcopy_impl_opts = impl_opts.enable_zerocopy_send_client; 1029 if (enable_ssl) { 1030 ctx = posix_sock_create_ssl_context(TLS_client_method(), opts, &impl_opts); 1031 if (!ctx) { 1032 SPDK_ERRLOG("posix_sock_create_ssl_context() failed, errno = %d\n", errno); 1033 close(fd); 1034 fd = -1; 1035 break; 1036 } 1037 ssl = ssl_sock_connect_loop(ctx, fd, &impl_opts); 1038 if (!ssl) { 1039 SPDK_ERRLOG("ssl_sock_connect_loop() failed, errno = %d\n", errno); 1040 close(fd); 1041 fd = -1; 1042 SSL_CTX_free(ctx); 1043 break; 1044 } 1045 } 1046 } 1047 1048 flag = fcntl(fd, F_GETFL); 1049 if (fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0) { 1050 SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno); 1051 SSL_free(ssl); 1052 SSL_CTX_free(ctx); 1053 close(fd); 1054 fd = -1; 1055 break; 1056 } 1057 break; 1058 } 1059 freeaddrinfo(res0); 1060 1061 if (fd < 0) { 1062 return NULL; 1063 } 1064 1065 /* Only enable zero copy for non-loopback and non-ssl sockets. */ 1066 enable_zcopy_user_opts = opts->zcopy && !sock_is_loopback(fd) && !enable_ssl; 1067 1068 sock = posix_sock_alloc(fd, &impl_opts, enable_zcopy_user_opts && enable_zcopy_impl_opts); 1069 if (sock == NULL) { 1070 SPDK_ERRLOG("sock allocation failed\n"); 1071 SSL_free(ssl); 1072 SSL_CTX_free(ctx); 1073 close(fd); 1074 return NULL; 1075 } 1076 1077 if (ctx) { 1078 sock->ctx = ctx; 1079 } 1080 1081 if (ssl) { 1082 sock->ssl = ssl; 1083 } 1084 1085 return &sock->base; 1086 } 1087 1088 static struct spdk_sock * 1089 posix_sock_listen(const char *ip, int port, struct spdk_sock_opts *opts) 1090 { 1091 return posix_sock_create(ip, port, SPDK_SOCK_CREATE_LISTEN, opts, false); 1092 } 1093 1094 static struct spdk_sock * 1095 posix_sock_connect(const char *ip, int port, struct spdk_sock_opts *opts) 1096 { 1097 return posix_sock_create(ip, port, SPDK_SOCK_CREATE_CONNECT, opts, false); 1098 } 1099 1100 static struct spdk_sock * 1101 _posix_sock_accept(struct spdk_sock *_sock, bool enable_ssl) 1102 { 1103 struct spdk_posix_sock *sock = __posix_sock(_sock); 1104 struct sockaddr_storage sa; 1105 socklen_t salen; 1106 int rc, fd; 1107 struct spdk_posix_sock *new_sock; 1108 int flag; 1109 SSL_CTX *ctx = 0; 1110 SSL *ssl = 0; 1111 1112 memset(&sa, 0, sizeof(sa)); 1113 salen = sizeof(sa); 1114 1115 assert(sock != NULL); 1116 1117 rc = accept(sock->fd, (struct sockaddr *)&sa, &salen); 1118 1119 if (rc == -1) { 1120 return NULL; 1121 } 1122 1123 fd = rc; 1124 1125 flag = fcntl(fd, F_GETFL); 1126 if ((!(flag & O_NONBLOCK)) && (fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0)) { 1127 SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno); 1128 close(fd); 1129 return NULL; 1130 } 1131 1132 #if defined(SO_PRIORITY) 1133 /* The priority is not inherited, so call this function again */ 1134 if (sock->base.opts.priority) { 1135 rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &sock->base.opts.priority, sizeof(int)); 1136 if (rc != 0) { 1137 close(fd); 1138 return NULL; 1139 } 1140 } 1141 #endif 1142 1143 /* Establish SSL connection */ 1144 if (enable_ssl) { 1145 ctx = posix_sock_create_ssl_context(TLS_server_method(), &sock->base.opts, &sock->base.impl_opts); 1146 if (!ctx) { 1147 SPDK_ERRLOG("posix_sock_create_ssl_context() failed, errno = %d\n", errno); 1148 close(fd); 1149 return NULL; 1150 } 1151 ssl = ssl_sock_accept_loop(ctx, fd, &sock->base.impl_opts); 1152 if (!ssl) { 1153 SPDK_ERRLOG("ssl_sock_accept_loop() failed, errno = %d\n", errno); 1154 close(fd); 1155 SSL_CTX_free(ctx); 1156 return NULL; 1157 } 1158 } 1159 1160 /* Inherit the zero copy feature from the listen socket */ 1161 new_sock = posix_sock_alloc(fd, &sock->base.impl_opts, sock->zcopy); 1162 if (new_sock == NULL) { 1163 close(fd); 1164 SSL_free(ssl); 1165 SSL_CTX_free(ctx); 1166 return NULL; 1167 } 1168 1169 if (ctx) { 1170 new_sock->ctx = ctx; 1171 } 1172 1173 if (ssl) { 1174 new_sock->ssl = ssl; 1175 } 1176 1177 return &new_sock->base; 1178 } 1179 1180 static struct spdk_sock * 1181 posix_sock_accept(struct spdk_sock *_sock) 1182 { 1183 return _posix_sock_accept(_sock, false); 1184 } 1185 1186 static int 1187 posix_sock_close(struct spdk_sock *_sock) 1188 { 1189 struct spdk_posix_sock *sock = __posix_sock(_sock); 1190 1191 assert(TAILQ_EMPTY(&_sock->pending_reqs)); 1192 1193 if (sock->ssl != NULL) { 1194 SSL_shutdown(sock->ssl); 1195 } 1196 1197 /* If the socket fails to close, the best choice is to 1198 * leak the fd but continue to free the rest of the sock 1199 * memory. */ 1200 close(sock->fd); 1201 1202 SSL_free(sock->ssl); 1203 SSL_CTX_free(sock->ctx); 1204 1205 spdk_pipe_destroy(sock->recv_pipe); 1206 free(sock->recv_buf); 1207 free(sock); 1208 1209 return 0; 1210 } 1211 1212 #ifdef SPDK_ZEROCOPY 1213 static int 1214 _sock_check_zcopy(struct spdk_sock *sock) 1215 { 1216 struct spdk_posix_sock *psock = __posix_sock(sock); 1217 struct msghdr msgh = {}; 1218 uint8_t buf[sizeof(struct cmsghdr) + sizeof(struct sock_extended_err)]; 1219 ssize_t rc; 1220 struct sock_extended_err *serr; 1221 struct cmsghdr *cm; 1222 uint32_t idx; 1223 struct spdk_sock_request *req, *treq; 1224 bool found; 1225 1226 msgh.msg_control = buf; 1227 msgh.msg_controllen = sizeof(buf); 1228 1229 while (true) { 1230 rc = recvmsg(psock->fd, &msgh, MSG_ERRQUEUE); 1231 1232 if (rc < 0) { 1233 if (errno == EWOULDBLOCK || errno == EAGAIN) { 1234 return 0; 1235 } 1236 1237 if (!TAILQ_EMPTY(&sock->pending_reqs)) { 1238 SPDK_ERRLOG("Attempting to receive from ERRQUEUE yielded error, but pending list still has orphaned entries\n"); 1239 } else { 1240 SPDK_WARNLOG("Recvmsg yielded an error!\n"); 1241 } 1242 return 0; 1243 } 1244 1245 cm = CMSG_FIRSTHDR(&msgh); 1246 if (!(cm && 1247 ((cm->cmsg_level == SOL_IP && cm->cmsg_type == IP_RECVERR) || 1248 (cm->cmsg_level == SOL_IPV6 && cm->cmsg_type == IPV6_RECVERR)))) { 1249 SPDK_WARNLOG("Unexpected cmsg level or type!\n"); 1250 return 0; 1251 } 1252 1253 serr = (struct sock_extended_err *)CMSG_DATA(cm); 1254 if (serr->ee_errno != 0 || serr->ee_origin != SO_EE_ORIGIN_ZEROCOPY) { 1255 SPDK_WARNLOG("Unexpected extended error origin\n"); 1256 return 0; 1257 } 1258 1259 /* Most of the time, the pending_reqs array is in the exact 1260 * order we need such that all of the requests to complete are 1261 * in order, in the front. It is guaranteed that all requests 1262 * belonging to the same sendmsg call are sequential, so once 1263 * we encounter one match we can stop looping as soon as a 1264 * non-match is found. 1265 */ 1266 idx = serr->ee_info; 1267 while (true) { 1268 found = false; 1269 TAILQ_FOREACH_SAFE(req, &sock->pending_reqs, internal.link, treq) { 1270 if (!req->internal.is_zcopy) { 1271 /* This wasn't a zcopy request. It was just waiting in line to complete */ 1272 rc = spdk_sock_request_put(sock, req, 0); 1273 if (rc < 0) { 1274 return rc; 1275 } 1276 } else if (req->internal.offset == idx) { 1277 found = true; 1278 rc = spdk_sock_request_put(sock, req, 0); 1279 if (rc < 0) { 1280 return rc; 1281 } 1282 } else if (found) { 1283 break; 1284 } 1285 } 1286 1287 if (idx == serr->ee_data) { 1288 break; 1289 } 1290 1291 if (idx == UINT32_MAX) { 1292 idx = 0; 1293 } else { 1294 idx++; 1295 } 1296 } 1297 } 1298 1299 return 0; 1300 } 1301 #endif 1302 1303 static int 1304 _sock_flush(struct spdk_sock *sock) 1305 { 1306 struct spdk_posix_sock *psock = __posix_sock(sock); 1307 struct msghdr msg = {}; 1308 int flags; 1309 struct iovec iovs[IOV_BATCH_SIZE]; 1310 int iovcnt; 1311 int retval; 1312 struct spdk_sock_request *req; 1313 int i; 1314 ssize_t rc, sent; 1315 unsigned int offset; 1316 size_t len; 1317 bool is_zcopy = false; 1318 1319 /* Can't flush from within a callback or we end up with recursive calls */ 1320 if (sock->cb_cnt > 0) { 1321 errno = EAGAIN; 1322 return -1; 1323 } 1324 1325 #ifdef SPDK_ZEROCOPY 1326 if (psock->zcopy) { 1327 flags = MSG_ZEROCOPY | MSG_NOSIGNAL; 1328 } else 1329 #endif 1330 { 1331 flags = MSG_NOSIGNAL; 1332 } 1333 1334 iovcnt = spdk_sock_prep_reqs(sock, iovs, 0, NULL, &flags); 1335 if (iovcnt == 0) { 1336 return 0; 1337 } 1338 1339 #ifdef SPDK_ZEROCOPY 1340 is_zcopy = flags & MSG_ZEROCOPY; 1341 #endif 1342 1343 /* Perform the vectored write */ 1344 msg.msg_iov = iovs; 1345 msg.msg_iovlen = iovcnt; 1346 1347 if (psock->ssl) { 1348 rc = SSL_writev(psock->ssl, iovs, iovcnt); 1349 } else { 1350 rc = sendmsg(psock->fd, &msg, flags); 1351 } 1352 if (rc <= 0) { 1353 if (rc == 0 || errno == EAGAIN || errno == EWOULDBLOCK || (errno == ENOBUFS && psock->zcopy)) { 1354 errno = EAGAIN; 1355 } 1356 return -1; 1357 } 1358 1359 sent = rc; 1360 1361 if (is_zcopy) { 1362 /* Handling overflow case, because we use psock->sendmsg_idx - 1 for the 1363 * req->internal.offset, so sendmsg_idx should not be zero */ 1364 if (spdk_unlikely(psock->sendmsg_idx == UINT32_MAX)) { 1365 psock->sendmsg_idx = 1; 1366 } else { 1367 psock->sendmsg_idx++; 1368 } 1369 } 1370 1371 /* Consume the requests that were actually written */ 1372 req = TAILQ_FIRST(&sock->queued_reqs); 1373 while (req) { 1374 offset = req->internal.offset; 1375 1376 /* req->internal.is_zcopy is true when the whole req or part of it is sent with zerocopy */ 1377 req->internal.is_zcopy = is_zcopy; 1378 1379 for (i = 0; i < req->iovcnt; i++) { 1380 /* Advance by the offset first */ 1381 if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) { 1382 offset -= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len; 1383 continue; 1384 } 1385 1386 /* Calculate the remaining length of this element */ 1387 len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset; 1388 1389 if (len > (size_t)rc) { 1390 /* This element was partially sent. */ 1391 req->internal.offset += rc; 1392 return sent; 1393 } 1394 1395 offset = 0; 1396 req->internal.offset += len; 1397 rc -= len; 1398 } 1399 1400 /* Handled a full request. */ 1401 spdk_sock_request_pend(sock, req); 1402 1403 if (!req->internal.is_zcopy && req == TAILQ_FIRST(&sock->pending_reqs)) { 1404 /* The sendmsg syscall above isn't currently asynchronous, 1405 * so it's already done. */ 1406 retval = spdk_sock_request_put(sock, req, 0); 1407 if (retval) { 1408 break; 1409 } 1410 } else { 1411 /* Re-use the offset field to hold the sendmsg call index. The 1412 * index is 0 based, so subtract one here because we've already 1413 * incremented above. */ 1414 req->internal.offset = psock->sendmsg_idx - 1; 1415 } 1416 1417 if (rc == 0) { 1418 break; 1419 } 1420 1421 req = TAILQ_FIRST(&sock->queued_reqs); 1422 } 1423 1424 return sent; 1425 } 1426 1427 static int 1428 posix_sock_flush(struct spdk_sock *sock) 1429 { 1430 #ifdef SPDK_ZEROCOPY 1431 struct spdk_posix_sock *psock = __posix_sock(sock); 1432 1433 if (psock->zcopy && !TAILQ_EMPTY(&sock->pending_reqs)) { 1434 _sock_check_zcopy(sock); 1435 } 1436 #endif 1437 1438 return _sock_flush(sock); 1439 } 1440 1441 static ssize_t 1442 posix_sock_recv_from_pipe(struct spdk_posix_sock *sock, struct iovec *diov, int diovcnt) 1443 { 1444 struct iovec siov[2]; 1445 int sbytes; 1446 ssize_t bytes; 1447 struct spdk_posix_sock_group_impl *group; 1448 1449 sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov); 1450 if (sbytes < 0) { 1451 errno = EINVAL; 1452 return -1; 1453 } else if (sbytes == 0) { 1454 errno = EAGAIN; 1455 return -1; 1456 } 1457 1458 bytes = spdk_iovcpy(siov, 2, diov, diovcnt); 1459 1460 if (bytes == 0) { 1461 /* The only way this happens is if diov is 0 length */ 1462 errno = EINVAL; 1463 return -1; 1464 } 1465 1466 spdk_pipe_reader_advance(sock->recv_pipe, bytes); 1467 1468 /* If we drained the pipe, mark it appropriately */ 1469 if (spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) { 1470 assert(sock->pipe_has_data == true); 1471 1472 group = __posix_group_impl(sock->base.group_impl); 1473 if (group && !sock->socket_has_data) { 1474 TAILQ_REMOVE(&group->socks_with_data, sock, link); 1475 } 1476 1477 sock->pipe_has_data = false; 1478 } 1479 1480 return bytes; 1481 } 1482 1483 static inline ssize_t 1484 posix_sock_read(struct spdk_posix_sock *sock) 1485 { 1486 struct iovec iov[2]; 1487 int bytes_avail, bytes_recvd; 1488 struct spdk_posix_sock_group_impl *group; 1489 1490 bytes_avail = spdk_pipe_writer_get_buffer(sock->recv_pipe, sock->recv_buf_sz, iov); 1491 1492 if (bytes_avail <= 0) { 1493 return bytes_avail; 1494 } 1495 1496 if (sock->ssl) { 1497 bytes_recvd = SSL_readv(sock->ssl, iov, 2); 1498 } else { 1499 bytes_recvd = readv(sock->fd, iov, 2); 1500 } 1501 1502 assert(sock->pipe_has_data == false); 1503 1504 if (bytes_recvd <= 0) { 1505 /* Errors count as draining the socket data */ 1506 if (sock->base.group_impl && sock->socket_has_data) { 1507 group = __posix_group_impl(sock->base.group_impl); 1508 TAILQ_REMOVE(&group->socks_with_data, sock, link); 1509 } 1510 1511 sock->socket_has_data = false; 1512 1513 return bytes_recvd; 1514 } 1515 1516 spdk_pipe_writer_advance(sock->recv_pipe, bytes_recvd); 1517 1518 #if DEBUG 1519 if (sock->base.group_impl) { 1520 assert(sock->socket_has_data == true); 1521 } 1522 #endif 1523 1524 sock->pipe_has_data = true; 1525 if (bytes_recvd < bytes_avail) { 1526 /* We drained the kernel socket entirely. */ 1527 sock->socket_has_data = false; 1528 } 1529 1530 return bytes_recvd; 1531 } 1532 1533 static ssize_t 1534 posix_sock_readv(struct spdk_sock *_sock, struct iovec *iov, int iovcnt) 1535 { 1536 struct spdk_posix_sock *sock = __posix_sock(_sock); 1537 struct spdk_posix_sock_group_impl *group = __posix_group_impl(sock->base.group_impl); 1538 int rc, i; 1539 size_t len; 1540 1541 if (sock->recv_pipe == NULL) { 1542 assert(sock->pipe_has_data == false); 1543 if (group && sock->socket_has_data) { 1544 sock->socket_has_data = false; 1545 TAILQ_REMOVE(&group->socks_with_data, sock, link); 1546 } 1547 if (sock->ssl) { 1548 return SSL_readv(sock->ssl, iov, iovcnt); 1549 } else { 1550 return readv(sock->fd, iov, iovcnt); 1551 } 1552 } 1553 1554 /* If the socket is not in a group, we must assume it always has 1555 * data waiting for us because it is not epolled */ 1556 if (!sock->pipe_has_data && (group == NULL || sock->socket_has_data)) { 1557 /* If the user is receiving a sufficiently large amount of data, 1558 * receive directly to their buffers. */ 1559 len = 0; 1560 for (i = 0; i < iovcnt; i++) { 1561 len += iov[i].iov_len; 1562 } 1563 1564 if (len >= MIN_SOCK_PIPE_SIZE) { 1565 /* TODO: Should this detect if kernel socket is drained? */ 1566 if (sock->ssl) { 1567 return SSL_readv(sock->ssl, iov, iovcnt); 1568 } else { 1569 return readv(sock->fd, iov, iovcnt); 1570 } 1571 } 1572 1573 /* Otherwise, do a big read into our pipe */ 1574 rc = posix_sock_read(sock); 1575 if (rc <= 0) { 1576 return rc; 1577 } 1578 } 1579 1580 return posix_sock_recv_from_pipe(sock, iov, iovcnt); 1581 } 1582 1583 static ssize_t 1584 posix_sock_recv(struct spdk_sock *sock, void *buf, size_t len) 1585 { 1586 struct iovec iov[1]; 1587 1588 iov[0].iov_base = buf; 1589 iov[0].iov_len = len; 1590 1591 return posix_sock_readv(sock, iov, 1); 1592 } 1593 1594 static ssize_t 1595 posix_sock_writev(struct spdk_sock *_sock, struct iovec *iov, int iovcnt) 1596 { 1597 struct spdk_posix_sock *sock = __posix_sock(_sock); 1598 int rc; 1599 1600 /* In order to process a writev, we need to flush any asynchronous writes 1601 * first. */ 1602 rc = _sock_flush(_sock); 1603 if (rc < 0) { 1604 return rc; 1605 } 1606 1607 if (!TAILQ_EMPTY(&_sock->queued_reqs)) { 1608 /* We weren't able to flush all requests */ 1609 errno = EAGAIN; 1610 return -1; 1611 } 1612 1613 if (sock->ssl) { 1614 return SSL_writev(sock->ssl, iov, iovcnt); 1615 } else { 1616 return writev(sock->fd, iov, iovcnt); 1617 } 1618 } 1619 1620 static int 1621 posix_sock_recv_next(struct spdk_sock *_sock, void **buf, void **ctx) 1622 { 1623 struct spdk_posix_sock *sock = __posix_sock(_sock); 1624 struct iovec iov; 1625 ssize_t rc; 1626 1627 if (sock->recv_pipe != NULL) { 1628 errno = ENOTSUP; 1629 return -1; 1630 } 1631 1632 iov.iov_len = spdk_sock_group_get_buf(_sock->group_impl->group, &iov.iov_base, ctx); 1633 if (iov.iov_len == 0) { 1634 errno = ENOBUFS; 1635 return -1; 1636 } 1637 1638 rc = posix_sock_readv(_sock, &iov, 1); 1639 if (rc <= 0) { 1640 spdk_sock_group_provide_buf(_sock->group_impl->group, iov.iov_base, iov.iov_len, *ctx); 1641 return rc; 1642 } 1643 1644 *buf = iov.iov_base; 1645 1646 return rc; 1647 } 1648 1649 static void 1650 posix_sock_writev_async(struct spdk_sock *sock, struct spdk_sock_request *req) 1651 { 1652 int rc; 1653 1654 spdk_sock_request_queue(sock, req); 1655 1656 /* If there are a sufficient number queued, just flush them out immediately. */ 1657 if (sock->queued_iovcnt >= IOV_BATCH_SIZE) { 1658 rc = _sock_flush(sock); 1659 if (rc < 0 && errno != EAGAIN) { 1660 spdk_sock_abort_requests(sock); 1661 } 1662 } 1663 } 1664 1665 static int 1666 posix_sock_set_recvlowat(struct spdk_sock *_sock, int nbytes) 1667 { 1668 struct spdk_posix_sock *sock = __posix_sock(_sock); 1669 int val; 1670 int rc; 1671 1672 assert(sock != NULL); 1673 1674 val = nbytes; 1675 rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVLOWAT, &val, sizeof val); 1676 if (rc != 0) { 1677 return -1; 1678 } 1679 return 0; 1680 } 1681 1682 static bool 1683 posix_sock_is_ipv6(struct spdk_sock *_sock) 1684 { 1685 struct spdk_posix_sock *sock = __posix_sock(_sock); 1686 struct sockaddr_storage sa; 1687 socklen_t salen; 1688 int rc; 1689 1690 assert(sock != NULL); 1691 1692 memset(&sa, 0, sizeof sa); 1693 salen = sizeof sa; 1694 rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen); 1695 if (rc != 0) { 1696 SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno); 1697 return false; 1698 } 1699 1700 return (sa.ss_family == AF_INET6); 1701 } 1702 1703 static bool 1704 posix_sock_is_ipv4(struct spdk_sock *_sock) 1705 { 1706 struct spdk_posix_sock *sock = __posix_sock(_sock); 1707 struct sockaddr_storage sa; 1708 socklen_t salen; 1709 int rc; 1710 1711 assert(sock != NULL); 1712 1713 memset(&sa, 0, sizeof sa); 1714 salen = sizeof sa; 1715 rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen); 1716 if (rc != 0) { 1717 SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno); 1718 return false; 1719 } 1720 1721 return (sa.ss_family == AF_INET); 1722 } 1723 1724 static bool 1725 posix_sock_is_connected(struct spdk_sock *_sock) 1726 { 1727 struct spdk_posix_sock *sock = __posix_sock(_sock); 1728 uint8_t byte; 1729 int rc; 1730 1731 rc = recv(sock->fd, &byte, 1, MSG_PEEK); 1732 if (rc == 0) { 1733 return false; 1734 } 1735 1736 if (rc < 0) { 1737 if (errno == EAGAIN || errno == EWOULDBLOCK) { 1738 return true; 1739 } 1740 1741 return false; 1742 } 1743 1744 return true; 1745 } 1746 1747 static struct spdk_sock_group_impl * 1748 posix_sock_group_impl_get_optimal(struct spdk_sock *_sock, struct spdk_sock_group_impl *hint) 1749 { 1750 struct spdk_posix_sock *sock = __posix_sock(_sock); 1751 struct spdk_sock_group_impl *group_impl; 1752 1753 if (sock->placement_id != -1) { 1754 spdk_sock_map_lookup(&g_map, sock->placement_id, &group_impl, hint); 1755 return group_impl; 1756 } 1757 1758 return NULL; 1759 } 1760 1761 static struct spdk_sock_group_impl * 1762 posix_sock_group_impl_create(void) 1763 { 1764 struct spdk_posix_sock_group_impl *group_impl; 1765 int fd; 1766 1767 #if defined(SPDK_EPOLL) 1768 fd = epoll_create1(0); 1769 #elif defined(SPDK_KEVENT) 1770 fd = kqueue(); 1771 #endif 1772 if (fd == -1) { 1773 return NULL; 1774 } 1775 1776 group_impl = calloc(1, sizeof(*group_impl)); 1777 if (group_impl == NULL) { 1778 SPDK_ERRLOG("group_impl allocation failed\n"); 1779 close(fd); 1780 return NULL; 1781 } 1782 1783 group_impl->fd = fd; 1784 TAILQ_INIT(&group_impl->socks_with_data); 1785 group_impl->placement_id = -1; 1786 1787 if (g_spdk_posix_sock_impl_opts.enable_placement_id == PLACEMENT_CPU) { 1788 spdk_sock_map_insert(&g_map, spdk_env_get_current_core(), &group_impl->base); 1789 group_impl->placement_id = spdk_env_get_current_core(); 1790 } 1791 1792 return &group_impl->base; 1793 } 1794 1795 static void 1796 posix_sock_mark(struct spdk_posix_sock_group_impl *group, struct spdk_posix_sock *sock, 1797 int placement_id) 1798 { 1799 #if defined(SO_MARK) 1800 int rc; 1801 1802 rc = setsockopt(sock->fd, SOL_SOCKET, SO_MARK, 1803 &placement_id, sizeof(placement_id)); 1804 if (rc != 0) { 1805 /* Not fatal */ 1806 SPDK_ERRLOG("Error setting SO_MARK\n"); 1807 return; 1808 } 1809 1810 rc = spdk_sock_map_insert(&g_map, placement_id, &group->base); 1811 if (rc != 0) { 1812 /* Not fatal */ 1813 SPDK_ERRLOG("Failed to insert sock group into map: %d\n", rc); 1814 return; 1815 } 1816 1817 sock->placement_id = placement_id; 1818 #endif 1819 } 1820 1821 static void 1822 posix_sock_update_mark(struct spdk_sock_group_impl *_group, struct spdk_sock *_sock) 1823 { 1824 struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group); 1825 1826 if (group->placement_id == -1) { 1827 group->placement_id = spdk_sock_map_find_free(&g_map); 1828 1829 /* If a free placement id is found, update existing sockets in this group */ 1830 if (group->placement_id != -1) { 1831 struct spdk_sock *sock, *tmp; 1832 1833 TAILQ_FOREACH_SAFE(sock, &_group->socks, link, tmp) { 1834 posix_sock_mark(group, __posix_sock(sock), group->placement_id); 1835 } 1836 } 1837 } 1838 1839 if (group->placement_id != -1) { 1840 /* 1841 * group placement id is already determined for this poll group. 1842 * Mark socket with group's placement id. 1843 */ 1844 posix_sock_mark(group, __posix_sock(_sock), group->placement_id); 1845 } 1846 } 1847 1848 static int 1849 posix_sock_group_impl_add_sock(struct spdk_sock_group_impl *_group, struct spdk_sock *_sock) 1850 { 1851 struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group); 1852 struct spdk_posix_sock *sock = __posix_sock(_sock); 1853 int rc; 1854 1855 #if defined(SPDK_EPOLL) 1856 struct epoll_event event; 1857 1858 memset(&event, 0, sizeof(event)); 1859 /* EPOLLERR is always on even if we don't set it, but be explicit for clarity */ 1860 event.events = EPOLLIN | EPOLLERR; 1861 event.data.ptr = sock; 1862 1863 rc = epoll_ctl(group->fd, EPOLL_CTL_ADD, sock->fd, &event); 1864 #elif defined(SPDK_KEVENT) 1865 struct kevent event; 1866 struct timespec ts = {0}; 1867 1868 EV_SET(&event, sock->fd, EVFILT_READ, EV_ADD, 0, 0, sock); 1869 1870 rc = kevent(group->fd, &event, 1, NULL, 0, &ts); 1871 #endif 1872 1873 if (rc != 0) { 1874 return rc; 1875 } 1876 1877 /* switched from another polling group due to scheduling */ 1878 if (spdk_unlikely(sock->recv_pipe != NULL && 1879 (spdk_pipe_reader_bytes_available(sock->recv_pipe) > 0))) { 1880 sock->pipe_has_data = true; 1881 sock->socket_has_data = false; 1882 TAILQ_INSERT_TAIL(&group->socks_with_data, sock, link); 1883 } 1884 1885 if (g_spdk_posix_sock_impl_opts.enable_placement_id == PLACEMENT_MARK) { 1886 posix_sock_update_mark(_group, _sock); 1887 } else if (sock->placement_id != -1) { 1888 rc = spdk_sock_map_insert(&g_map, sock->placement_id, &group->base); 1889 if (rc != 0) { 1890 SPDK_ERRLOG("Failed to insert sock group into map: %d\n", rc); 1891 /* Do not treat this as an error. The system will continue running. */ 1892 } 1893 } 1894 1895 return rc; 1896 } 1897 1898 static int 1899 posix_sock_group_impl_remove_sock(struct spdk_sock_group_impl *_group, struct spdk_sock *_sock) 1900 { 1901 struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group); 1902 struct spdk_posix_sock *sock = __posix_sock(_sock); 1903 int rc; 1904 1905 if (sock->pipe_has_data || sock->socket_has_data) { 1906 TAILQ_REMOVE(&group->socks_with_data, sock, link); 1907 sock->pipe_has_data = false; 1908 sock->socket_has_data = false; 1909 } 1910 1911 if (sock->placement_id != -1) { 1912 spdk_sock_map_release(&g_map, sock->placement_id); 1913 } 1914 1915 #if defined(SPDK_EPOLL) 1916 struct epoll_event event; 1917 1918 /* Event parameter is ignored but some old kernel version still require it. */ 1919 rc = epoll_ctl(group->fd, EPOLL_CTL_DEL, sock->fd, &event); 1920 #elif defined(SPDK_KEVENT) 1921 struct kevent event; 1922 struct timespec ts = {0}; 1923 1924 EV_SET(&event, sock->fd, EVFILT_READ, EV_DELETE, 0, 0, NULL); 1925 1926 rc = kevent(group->fd, &event, 1, NULL, 0, &ts); 1927 if (rc == 0 && event.flags & EV_ERROR) { 1928 rc = -1; 1929 errno = event.data; 1930 } 1931 #endif 1932 1933 spdk_sock_abort_requests(_sock); 1934 1935 return rc; 1936 } 1937 1938 static int 1939 posix_sock_group_impl_poll(struct spdk_sock_group_impl *_group, int max_events, 1940 struct spdk_sock **socks) 1941 { 1942 struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group); 1943 struct spdk_sock *sock, *tmp; 1944 int num_events, i, rc; 1945 struct spdk_posix_sock *psock, *ptmp; 1946 #if defined(SPDK_EPOLL) 1947 struct epoll_event events[MAX_EVENTS_PER_POLL]; 1948 #elif defined(SPDK_KEVENT) 1949 struct kevent events[MAX_EVENTS_PER_POLL]; 1950 struct timespec ts = {0}; 1951 #endif 1952 1953 #ifdef SPDK_ZEROCOPY 1954 /* When all of the following conditions are met 1955 * - non-blocking socket 1956 * - zero copy is enabled 1957 * - interrupts suppressed (i.e. busy polling) 1958 * - the NIC tx queue is full at the time sendmsg() is called 1959 * - epoll_wait determines there is an EPOLLIN event for the socket 1960 * then we can get into a situation where data we've sent is queued 1961 * up in the kernel network stack, but interrupts have been suppressed 1962 * because other traffic is flowing so the kernel misses the signal 1963 * to flush the software tx queue. If there wasn't incoming data 1964 * pending on the socket, then epoll_wait would have been sufficient 1965 * to kick off the send operation, but since there is a pending event 1966 * epoll_wait does not trigger the necessary operation. 1967 * 1968 * We deal with this by checking for all of the above conditions and 1969 * additionally looking for EPOLLIN events that were not consumed from 1970 * the last poll loop. We take this to mean that the upper layer is 1971 * unable to consume them because it is blocked waiting for resources 1972 * to free up, and those resources are most likely freed in response 1973 * to a pending asynchronous write completing. 1974 * 1975 * Additionally, sockets that have the same placement_id actually share 1976 * an underlying hardware queue. That means polling one of them is 1977 * equivalent to polling all of them. As a quick mechanism to avoid 1978 * making extra poll() calls, stash the last placement_id during the loop 1979 * and only poll if it's not the same. The overwhelmingly common case 1980 * is that all sockets in this list have the same placement_id because 1981 * SPDK is intentionally grouping sockets by that value, so even 1982 * though this won't stop all extra calls to poll(), it's very fast 1983 * and will catch all of them in practice. 1984 */ 1985 int last_placement_id = -1; 1986 1987 TAILQ_FOREACH(psock, &group->socks_with_data, link) { 1988 if (psock->zcopy && psock->placement_id >= 0 && 1989 psock->placement_id != last_placement_id) { 1990 struct pollfd pfd = {psock->fd, POLLIN | POLLERR, 0}; 1991 1992 poll(&pfd, 1, 0); 1993 last_placement_id = psock->placement_id; 1994 } 1995 } 1996 #endif 1997 1998 /* This must be a TAILQ_FOREACH_SAFE because while flushing, 1999 * a completion callback could remove the sock from the 2000 * group. */ 2001 TAILQ_FOREACH_SAFE(sock, &_group->socks, link, tmp) { 2002 rc = _sock_flush(sock); 2003 if (rc < 0 && errno != EAGAIN) { 2004 spdk_sock_abort_requests(sock); 2005 } 2006 } 2007 2008 assert(max_events > 0); 2009 2010 #if defined(SPDK_EPOLL) 2011 num_events = epoll_wait(group->fd, events, max_events, 0); 2012 #elif defined(SPDK_KEVENT) 2013 num_events = kevent(group->fd, NULL, 0, events, max_events, &ts); 2014 #endif 2015 2016 if (num_events == -1) { 2017 return -1; 2018 } else if (num_events == 0 && !TAILQ_EMPTY(&_group->socks)) { 2019 sock = TAILQ_FIRST(&_group->socks); 2020 psock = __posix_sock(sock); 2021 /* poll() is called here to busy poll the queue associated with 2022 * first socket in list and potentially reap incoming data. 2023 */ 2024 if (sock->opts.priority) { 2025 struct pollfd pfd = {0, 0, 0}; 2026 2027 pfd.fd = psock->fd; 2028 pfd.events = POLLIN | POLLERR; 2029 poll(&pfd, 1, 0); 2030 } 2031 } 2032 2033 for (i = 0; i < num_events; i++) { 2034 #if defined(SPDK_EPOLL) 2035 sock = events[i].data.ptr; 2036 psock = __posix_sock(sock); 2037 2038 #ifdef SPDK_ZEROCOPY 2039 if (events[i].events & EPOLLERR) { 2040 rc = _sock_check_zcopy(sock); 2041 /* If the socket was closed or removed from 2042 * the group in response to a send ack, don't 2043 * add it to the array here. */ 2044 if (rc || sock->cb_fn == NULL) { 2045 continue; 2046 } 2047 } 2048 #endif 2049 if ((events[i].events & EPOLLIN) == 0) { 2050 continue; 2051 } 2052 2053 #elif defined(SPDK_KEVENT) 2054 sock = events[i].udata; 2055 psock = __posix_sock(sock); 2056 #endif 2057 2058 /* If the socket is not already in the list, add it now */ 2059 if (!psock->socket_has_data && !psock->pipe_has_data) { 2060 TAILQ_INSERT_TAIL(&group->socks_with_data, psock, link); 2061 } 2062 psock->socket_has_data = true; 2063 } 2064 2065 num_events = 0; 2066 2067 TAILQ_FOREACH_SAFE(psock, &group->socks_with_data, link, ptmp) { 2068 if (num_events == max_events) { 2069 break; 2070 } 2071 2072 /* If the socket's cb_fn is NULL, just remove it from the 2073 * list and do not add it to socks array */ 2074 if (spdk_unlikely(psock->base.cb_fn == NULL)) { 2075 psock->socket_has_data = false; 2076 psock->pipe_has_data = false; 2077 TAILQ_REMOVE(&group->socks_with_data, psock, link); 2078 continue; 2079 } 2080 2081 socks[num_events++] = &psock->base; 2082 } 2083 2084 /* Cycle the has_data list so that each time we poll things aren't 2085 * in the same order. Say we have 6 sockets in the list, named as follows: 2086 * A B C D E F 2087 * And all 6 sockets had epoll events, but max_events is only 3. That means 2088 * psock currently points at D. We want to rearrange the list to the following: 2089 * D E F A B C 2090 * 2091 * The variables below are named according to this example to make it easier to 2092 * follow the swaps. 2093 */ 2094 if (psock != NULL) { 2095 struct spdk_posix_sock *pa, *pc, *pd, *pf; 2096 2097 /* Capture pointers to the elements we need */ 2098 pd = psock; 2099 pc = TAILQ_PREV(pd, spdk_has_data_list, link); 2100 pa = TAILQ_FIRST(&group->socks_with_data); 2101 pf = TAILQ_LAST(&group->socks_with_data, spdk_has_data_list); 2102 2103 /* Break the link between C and D */ 2104 pc->link.tqe_next = NULL; 2105 2106 /* Connect F to A */ 2107 pf->link.tqe_next = pa; 2108 pa->link.tqe_prev = &pf->link.tqe_next; 2109 2110 /* Fix up the list first/last pointers */ 2111 group->socks_with_data.tqh_first = pd; 2112 group->socks_with_data.tqh_last = &pc->link.tqe_next; 2113 2114 /* D is in front of the list, make tqe prev pointer point to the head of list */ 2115 pd->link.tqe_prev = &group->socks_with_data.tqh_first; 2116 } 2117 2118 return num_events; 2119 } 2120 2121 static int 2122 posix_sock_group_impl_close(struct spdk_sock_group_impl *_group) 2123 { 2124 struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group); 2125 int rc; 2126 2127 if (g_spdk_posix_sock_impl_opts.enable_placement_id == PLACEMENT_CPU) { 2128 spdk_sock_map_release(&g_map, spdk_env_get_current_core()); 2129 } 2130 2131 rc = close(group->fd); 2132 free(group); 2133 return rc; 2134 } 2135 2136 static struct spdk_net_impl g_posix_net_impl = { 2137 .name = "posix", 2138 .getaddr = posix_sock_getaddr, 2139 .connect = posix_sock_connect, 2140 .listen = posix_sock_listen, 2141 .accept = posix_sock_accept, 2142 .close = posix_sock_close, 2143 .recv = posix_sock_recv, 2144 .readv = posix_sock_readv, 2145 .writev = posix_sock_writev, 2146 .recv_next = posix_sock_recv_next, 2147 .writev_async = posix_sock_writev_async, 2148 .flush = posix_sock_flush, 2149 .set_recvlowat = posix_sock_set_recvlowat, 2150 .set_recvbuf = posix_sock_set_recvbuf, 2151 .set_sendbuf = posix_sock_set_sendbuf, 2152 .is_ipv6 = posix_sock_is_ipv6, 2153 .is_ipv4 = posix_sock_is_ipv4, 2154 .is_connected = posix_sock_is_connected, 2155 .group_impl_get_optimal = posix_sock_group_impl_get_optimal, 2156 .group_impl_create = posix_sock_group_impl_create, 2157 .group_impl_add_sock = posix_sock_group_impl_add_sock, 2158 .group_impl_remove_sock = posix_sock_group_impl_remove_sock, 2159 .group_impl_poll = posix_sock_group_impl_poll, 2160 .group_impl_close = posix_sock_group_impl_close, 2161 .get_opts = posix_sock_impl_get_opts, 2162 .set_opts = posix_sock_impl_set_opts, 2163 }; 2164 2165 SPDK_NET_IMPL_REGISTER(posix, &g_posix_net_impl, DEFAULT_SOCK_PRIORITY + 1); 2166 2167 static struct spdk_sock * 2168 ssl_sock_listen(const char *ip, int port, struct spdk_sock_opts *opts) 2169 { 2170 return posix_sock_create(ip, port, SPDK_SOCK_CREATE_LISTEN, opts, true); 2171 } 2172 2173 static struct spdk_sock * 2174 ssl_sock_connect(const char *ip, int port, struct spdk_sock_opts *opts) 2175 { 2176 return posix_sock_create(ip, port, SPDK_SOCK_CREATE_CONNECT, opts, true); 2177 } 2178 2179 static struct spdk_sock * 2180 ssl_sock_accept(struct spdk_sock *_sock) 2181 { 2182 return _posix_sock_accept(_sock, true); 2183 } 2184 2185 static struct spdk_net_impl g_ssl_net_impl = { 2186 .name = "ssl", 2187 .getaddr = posix_sock_getaddr, 2188 .connect = ssl_sock_connect, 2189 .listen = ssl_sock_listen, 2190 .accept = ssl_sock_accept, 2191 .close = posix_sock_close, 2192 .recv = posix_sock_recv, 2193 .readv = posix_sock_readv, 2194 .writev = posix_sock_writev, 2195 .recv_next = posix_sock_recv_next, 2196 .writev_async = posix_sock_writev_async, 2197 .flush = posix_sock_flush, 2198 .set_recvlowat = posix_sock_set_recvlowat, 2199 .set_recvbuf = posix_sock_set_recvbuf, 2200 .set_sendbuf = posix_sock_set_sendbuf, 2201 .is_ipv6 = posix_sock_is_ipv6, 2202 .is_ipv4 = posix_sock_is_ipv4, 2203 .is_connected = posix_sock_is_connected, 2204 .group_impl_get_optimal = posix_sock_group_impl_get_optimal, 2205 .group_impl_create = posix_sock_group_impl_create, 2206 .group_impl_add_sock = posix_sock_group_impl_add_sock, 2207 .group_impl_remove_sock = posix_sock_group_impl_remove_sock, 2208 .group_impl_poll = posix_sock_group_impl_poll, 2209 .group_impl_close = posix_sock_group_impl_close, 2210 .get_opts = posix_sock_impl_get_opts, 2211 .set_opts = posix_sock_impl_set_opts, 2212 }; 2213 2214 SPDK_NET_IMPL_REGISTER(ssl, &g_ssl_net_impl, DEFAULT_SOCK_PRIORITY); 2215 SPDK_LOG_REGISTER_COMPONENT(sock_posix) 2216