/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright (C) 2018 Intel Corporation. All rights reserved.
 * Copyright (c) 2020, 2021 Mellanox Technologies LTD. All rights reserved.
 * Copyright (c) 2021 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
 */

#include "spdk/stdinc.h"

#if defined(__FreeBSD__)
#include <sys/event.h>
#define SPDK_KEVENT
#else
#include <sys/epoll.h>
#define SPDK_EPOLL
#endif

#if defined(__linux__)
#include <linux/errqueue.h>
#endif

#include "spdk/env.h"
#include "spdk/log.h"
#include "spdk/pipe.h"
#include "spdk/sock.h"
#include "spdk/util.h"
#include "spdk/string.h"
#include "spdk_internal/sock.h"
#include "../sock_kernel.h"

#include "openssl/crypto.h"
#include "openssl/err.h"
#include "openssl/ssl.h"

#define MAX_TMPBUF 1024
#define PORTNUMLEN 32

#if defined(SO_ZEROCOPY) && defined(MSG_ZEROCOPY)
#define SPDK_ZEROCOPY
#endif

struct spdk_posix_sock {
	struct spdk_sock	base;
	int			fd;

	uint32_t		sendmsg_idx;

	struct spdk_pipe	*recv_pipe;
	void			*recv_buf;
	int			recv_buf_sz;
	bool			pipe_has_data;
	bool			socket_has_data;
	bool			zcopy;

	int			placement_id;

	SSL_CTX			*ctx;
	SSL			*ssl;

	TAILQ_ENTRY(spdk_posix_sock)	link;
};

TAILQ_HEAD(spdk_has_data_list, spdk_posix_sock);

struct spdk_posix_sock_group_impl {
	struct spdk_sock_group_impl	base;
	int				fd;
	struct spdk_has_data_list	socks_with_data;
	int				placement_id;
};

static struct spdk_sock_impl_opts g_posix_impl_opts = {
	.recv_buf_size = DEFAULT_SO_RCVBUF_SIZE,
	.send_buf_size = DEFAULT_SO_SNDBUF_SIZE,
	.enable_recv_pipe = true,
	.enable_quickack = false,
	.enable_placement_id = PLACEMENT_NONE,
	.enable_zerocopy_send_server = true,
	.enable_zerocopy_send_client = false,
	.zerocopy_threshold = 0,
	.tls_version = 0,
	.enable_ktls = false,
	.psk_key = NULL,
	.psk_key_size = 0,
	.psk_identity = NULL,
	.get_key = NULL,
	.get_key_ctx = NULL,
	.tls_cipher_suites = NULL
};

static struct spdk_sock_impl_opts g_ssl_impl_opts = {
	.recv_buf_size = MIN_SO_RCVBUF_SIZE,
	.send_buf_size = MIN_SO_SNDBUF_SIZE,
	.enable_recv_pipe = true,
	.enable_quickack = false,
	.enable_placement_id = PLACEMENT_NONE,
	.enable_zerocopy_send_server = true,
	.enable_zerocopy_send_client = false,
	.zerocopy_threshold = 0,
	.tls_version = 0,
	.enable_ktls = false,
	.psk_key = NULL,
	.psk_identity = NULL
};

static struct spdk_sock_map g_map = {
	.entries = STAILQ_HEAD_INITIALIZER(g_map.entries),
	.mtx = PTHREAD_MUTEX_INITIALIZER
};

__attribute((destructor)) static void
posix_sock_map_cleanup(void)
{
	spdk_sock_map_cleanup(&g_map);
}

#define __posix_sock(sock) (struct spdk_posix_sock *)sock
#define __posix_group_impl(group) (struct spdk_posix_sock_group_impl *)group

static void
posix_sock_copy_impl_opts(struct spdk_sock_impl_opts *dest, const struct spdk_sock_impl_opts *src,
			  size_t len)
{
#define FIELD_OK(field) \
	offsetof(struct spdk_sock_impl_opts, field) + sizeof(src->field) <= len

#define SET_FIELD(field) \
	if (FIELD_OK(field)) { \
		dest->field = src->field; \
	}

	SET_FIELD(recv_buf_size);
	SET_FIELD(send_buf_size);
	SET_FIELD(enable_recv_pipe);
	SET_FIELD(enable_zerocopy_send);
	SET_FIELD(enable_quickack);
	SET_FIELD(enable_placement_id);
	SET_FIELD(enable_zerocopy_send_server);
	SET_FIELD(enable_zerocopy_send_client);
	SET_FIELD(zerocopy_threshold);
	SET_FIELD(tls_version);
	SET_FIELD(enable_ktls);
	SET_FIELD(psk_key);
	SET_FIELD(psk_key_size);
	SET_FIELD(psk_identity);
	SET_FIELD(get_key);
	SET_FIELD(get_key_ctx);
	SET_FIELD(tls_cipher_suites);

#undef SET_FIELD
#undef FIELD_OK
}
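/*
 * Note on the FIELD_OK()/SET_FIELD() pattern above: impl_opts is copied field by
 * field, bounded by the caller-provided length, so callers built against an older,
 * smaller struct spdk_sock_impl_opts keep working. A minimal caller-side sketch,
 * assuming the public spdk_sock_impl_get_opts()/spdk_sock_impl_set_opts() wrappers
 * from spdk/sock.h (not defined in this file):
 *
 *	struct spdk_sock_impl_opts opts = {};
 *	size_t len = sizeof(opts);
 *
 *	if (spdk_sock_impl_get_opts("posix", &opts, &len) == 0) {
 *		opts.enable_recv_pipe = false;
 *		spdk_sock_impl_set_opts("posix", &opts, len);
 *	}
 */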
static int
_sock_impl_get_opts(struct spdk_sock_impl_opts *opts, struct spdk_sock_impl_opts *impl_opts,
		    size_t *len)
{
	if (!opts || !len) {
		errno = EINVAL;
		return -1;
	}

	assert(sizeof(*opts) >= *len);
	memset(opts, 0, *len);

	posix_sock_copy_impl_opts(opts, impl_opts, *len);
	*len = spdk_min(*len, sizeof(*impl_opts));

	return 0;
}

static int
posix_sock_impl_get_opts(struct spdk_sock_impl_opts *opts, size_t *len)
{
	return _sock_impl_get_opts(opts, &g_posix_impl_opts, len);
}

static int
ssl_sock_impl_get_opts(struct spdk_sock_impl_opts *opts, size_t *len)
{
	return _sock_impl_get_opts(opts, &g_ssl_impl_opts, len);
}

static int
_sock_impl_set_opts(const struct spdk_sock_impl_opts *opts, struct spdk_sock_impl_opts *impl_opts,
		    size_t len)
{
	if (!opts) {
		errno = EINVAL;
		return -1;
	}

	assert(sizeof(*opts) >= len);
	posix_sock_copy_impl_opts(impl_opts, opts, len);

	return 0;
}

static int
posix_sock_impl_set_opts(const struct spdk_sock_impl_opts *opts, size_t len)
{
	return _sock_impl_set_opts(opts, &g_posix_impl_opts, len);
}

static int
ssl_sock_impl_set_opts(const struct spdk_sock_impl_opts *opts, size_t len)
{
	return _sock_impl_set_opts(opts, &g_ssl_impl_opts, len);
}

static void
_opts_get_impl_opts(const struct spdk_sock_opts *opts, struct spdk_sock_impl_opts *dest,
		    const struct spdk_sock_impl_opts *default_impl)
{
	/* Copy the default impl_opts first to cover cases when user's impl_opts is smaller */
	memcpy(dest, default_impl, sizeof(*dest));

	if (opts->impl_opts != NULL) {
		assert(sizeof(*dest) >= opts->impl_opts_size);
		posix_sock_copy_impl_opts(dest, opts->impl_opts, opts->impl_opts_size);
	}
}

static int
posix_sock_getaddr(struct spdk_sock *_sock, char *saddr, int slen, uint16_t *sport,
		   char *caddr, int clen, uint16_t *cport)
{
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	struct sockaddr_storage sa;
	socklen_t salen;
	int rc;

	assert(sock != NULL);

	memset(&sa, 0, sizeof sa);
	salen = sizeof sa;
	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
	if (rc != 0) {
		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
		return -1;
	}

	switch (sa.ss_family) {
	case AF_UNIX:
		/* Acceptable connection types that don't have IPs */
		return 0;
	case AF_INET:
	case AF_INET6:
		/* Code below will get IP addresses */
		break;
	default:
		/* Unsupported socket family */
		return -1;
	}

	rc = get_addr_str((struct sockaddr *)&sa, saddr, slen);
	if (rc != 0) {
		SPDK_ERRLOG("getnameinfo() failed (errno=%d)\n", errno);
		return -1;
	}

	if (sport) {
		if (sa.ss_family == AF_INET) {
			*sport = ntohs(((struct sockaddr_in *) &sa)->sin_port);
		} else if (sa.ss_family == AF_INET6) {
			*sport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port);
		}
	}

	memset(&sa, 0, sizeof sa);
	salen = sizeof sa;
	rc = getpeername(sock->fd, (struct sockaddr *) &sa, &salen);
	if (rc != 0) {
		SPDK_ERRLOG("getpeername() failed (errno=%d)\n", errno);
		return -1;
	}

	rc = get_addr_str((struct sockaddr *)&sa, caddr, clen);
	if (rc != 0) {
		SPDK_ERRLOG("getnameinfo() failed (errno=%d)\n", errno);
		return -1;
	}

	if (cport) {
		if (sa.ss_family == AF_INET) {
			*cport = ntohs(((struct sockaddr_in *) &sa)->sin_port);
		} else if (sa.ss_family == AF_INET6) {
			*cport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port);
		}
	}

	return 0;
}
enum posix_sock_create_type {
	SPDK_SOCK_CREATE_LISTEN,
	SPDK_SOCK_CREATE_CONNECT,
};

static int
posix_sock_alloc_pipe(struct spdk_posix_sock *sock, int sz)
{
	uint8_t *new_buf;
	struct spdk_pipe *new_pipe;
	struct iovec siov[2];
	struct iovec diov[2];
	int sbytes;
	ssize_t bytes;
	int rc;

	if (sock->recv_buf_sz == sz) {
		return 0;
	}

	/* If the new size is 0, just free the pipe */
	if (sz == 0) {
		spdk_pipe_destroy(sock->recv_pipe);
		free(sock->recv_buf);
		sock->recv_pipe = NULL;
		sock->recv_buf = NULL;
		return 0;
	} else if (sz < MIN_SOCK_PIPE_SIZE) {
		SPDK_ERRLOG("The size of the pipe must be larger than %d\n", MIN_SOCK_PIPE_SIZE);
		return -1;
	}

	/* Round up to next 64 byte multiple */
	rc = posix_memalign((void **)&new_buf, 64, sz);
	if (rc != 0) {
		SPDK_ERRLOG("socket recv buf allocation failed\n");
		return -ENOMEM;
	}
	memset(new_buf, 0, sz);

	new_pipe = spdk_pipe_create(new_buf, sz);
	if (new_pipe == NULL) {
		SPDK_ERRLOG("socket pipe allocation failed\n");
		free(new_buf);
		return -ENOMEM;
	}

	if (sock->recv_pipe != NULL) {
		/* Pull all of the data out of the old pipe */
		sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov);
		if (sbytes > sz) {
			/* Too much data to fit into the new pipe size */
			spdk_pipe_destroy(new_pipe);
			free(new_buf);
			return -EINVAL;
		}

		sbytes = spdk_pipe_writer_get_buffer(new_pipe, sz, diov);
		assert(sbytes == sz);

		bytes = spdk_iovcpy(siov, 2, diov, 2);
		spdk_pipe_writer_advance(new_pipe, bytes);

		spdk_pipe_destroy(sock->recv_pipe);
		free(sock->recv_buf);
	}

	sock->recv_buf_sz = sz;
	sock->recv_buf = new_buf;
	sock->recv_pipe = new_pipe;

	return 0;
}
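/*
 * The recv pipe above is a single-producer/single-consumer ring built on the
 * spdk_pipe API used throughout this file. A minimal sketch of the fill/drain
 * pattern (buffer size chosen arbitrarily for illustration):
 *
 *	uint8_t buf[4096];
 *	struct spdk_pipe *pipe = spdk_pipe_create(buf, sizeof(buf));
 *	struct iovec iov[2];
 *	int n;
 *
 *	n = spdk_pipe_writer_get_buffer(pipe, sizeof(buf), iov);	// space to fill
 *	// ... read up to n bytes from the socket into iov[0]/iov[1] ...
 *	spdk_pipe_writer_advance(pipe, n);				// publish the bytes
 *
 *	n = spdk_pipe_reader_get_buffer(pipe, sizeof(buf), iov);	// data to drain
 *	// ... copy out of iov ...
 *	spdk_pipe_reader_advance(pipe, n);				// consume the bytes
 *
 *	spdk_pipe_destroy(pipe);
 */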
static int
posix_sock_set_recvbuf(struct spdk_sock *_sock, int sz)
{
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	int min_size;
	int rc;

	assert(sock != NULL);

	if (_sock->impl_opts.enable_recv_pipe) {
		rc = posix_sock_alloc_pipe(sock, sz);
		if (rc) {
			return rc;
		}
	}

	/* Set kernel buffer size to be at least MIN_SO_RCVBUF_SIZE and
	 * _sock->impl_opts.recv_buf_size. */
	min_size = spdk_max(MIN_SO_RCVBUF_SIZE, _sock->impl_opts.recv_buf_size);

	if (sz < min_size) {
		sz = min_size;
	}

	rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVBUF, &sz, sizeof(sz));
	if (rc < 0) {
		return rc;
	}

	_sock->impl_opts.recv_buf_size = sz;

	return 0;
}

static int
posix_sock_set_sendbuf(struct spdk_sock *_sock, int sz)
{
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	int min_size;
	int rc;

	assert(sock != NULL);

	/* Set kernel buffer size to be at least MIN_SO_SNDBUF_SIZE and
	 * _sock->impl_opts.send_buf_size. */
	min_size = spdk_max(MIN_SO_SNDBUF_SIZE, _sock->impl_opts.send_buf_size);

	if (sz < min_size) {
		sz = min_size;
	}

	rc = setsockopt(sock->fd, SOL_SOCKET, SO_SNDBUF, &sz, sizeof(sz));
	if (rc < 0) {
		return rc;
	}

	_sock->impl_opts.send_buf_size = sz;

	return 0;
}

static void
posix_sock_init(struct spdk_posix_sock *sock, bool enable_zero_copy)
{
#if defined(SPDK_ZEROCOPY) || defined(__linux__)
	int flag;
	int rc;
#endif

#if defined(SPDK_ZEROCOPY)
	flag = 1;

	if (enable_zero_copy) {
		/* Try to turn on zero copy sends */
		rc = setsockopt(sock->fd, SOL_SOCKET, SO_ZEROCOPY, &flag, sizeof(flag));
		if (rc == 0) {
			sock->zcopy = true;
		}
	}
#endif

#if defined(__linux__)
	flag = 1;

	if (sock->base.impl_opts.enable_quickack) {
		rc = setsockopt(sock->fd, IPPROTO_TCP, TCP_QUICKACK, &flag, sizeof(flag));
		if (rc != 0) {
			SPDK_ERRLOG("Failed to set TCP_QUICKACK\n");
		}
	}

	spdk_sock_get_placement_id(sock->fd, sock->base.impl_opts.enable_placement_id,
				   &sock->placement_id);

	if (sock->base.impl_opts.enable_placement_id == PLACEMENT_MARK) {
		/* Save placement_id */
		spdk_sock_map_insert(&g_map, sock->placement_id, NULL);
	}
#endif
}

static struct spdk_posix_sock *
posix_sock_alloc(int fd, struct spdk_sock_impl_opts *impl_opts, bool enable_zero_copy)
{
	struct spdk_posix_sock *sock;

	sock = calloc(1, sizeof(*sock));
	if (sock == NULL) {
		SPDK_ERRLOG("sock allocation failed\n");
		return NULL;
	}

	sock->fd = fd;
	memcpy(&sock->base.impl_opts, impl_opts, sizeof(*impl_opts));
	posix_sock_init(sock, enable_zero_copy);

	return sock;
}

static int
posix_fd_create(struct addrinfo *res, struct spdk_sock_opts *opts,
		struct spdk_sock_impl_opts *impl_opts)
{
	int fd;
	int val = 1;
	int rc, sz;
#if defined(__linux__)
	int to;
#endif

	fd = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
	if (fd < 0) {
		/* error */
		return -1;
	}

	sz = impl_opts->recv_buf_size;
	rc = setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &sz, sizeof(sz));
	if (rc) {
		/* Not fatal */
	}

	sz = impl_opts->send_buf_size;
	rc = setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &sz, sizeof(sz));
	if (rc) {
		/* Not fatal */
	}

	rc = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof val);
	if (rc != 0) {
		close(fd);
		/* error */
		return -1;
	}
	rc = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof val);
	if (rc != 0) {
		close(fd);
		/* error */
		return -1;
	}

#if defined(SO_PRIORITY)
	if (opts->priority) {
		rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &opts->priority, sizeof val);
		if (rc != 0) {
			close(fd);
			/* error */
			return -1;
		}
	}
#endif

	if (res->ai_family == AF_INET6) {
		rc = setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &val, sizeof val);
		if (rc != 0) {
			close(fd);
			/* error */
			return -1;
		}
	}

	if (opts->ack_timeout) {
#if defined(__linux__)
		to = opts->ack_timeout;
		rc = setsockopt(fd, IPPROTO_TCP, TCP_USER_TIMEOUT, &to, sizeof(to));
		if (rc != 0) {
			close(fd);
			/* error */
			return -1;
		}
#else
		SPDK_WARNLOG("TCP_USER_TIMEOUT is not supported.\n");
#endif
	}

	return fd;
}
static int
posix_sock_psk_find_session_server_cb(SSL *ssl, const unsigned char *identity,
				      size_t identity_len, SSL_SESSION **sess)
{
	struct spdk_sock_impl_opts *impl_opts = SSL_get_app_data(ssl);
	uint8_t key[SSL_MAX_MASTER_KEY_LENGTH] = {};
	int keylen;
	int rc, i;
	STACK_OF(SSL_CIPHER) *ciphers;
	const SSL_CIPHER *cipher;
	const char *cipher_name;
	const char *user_cipher = NULL;
	bool found = false;

	if (impl_opts->get_key) {
		rc = impl_opts->get_key(key, sizeof(key), &user_cipher, identity, impl_opts->get_key_ctx);
		if (rc < 0) {
			SPDK_ERRLOG("Unable to find PSK for identity: %s\n", identity);
			return 0;
		}
		keylen = rc;
	} else {
		if (impl_opts->psk_key == NULL) {
			SPDK_ERRLOG("PSK is not set\n");
			return 0;
		}

		SPDK_DEBUGLOG(sock_posix, "Length of Client's PSK ID %lu\n", strlen(impl_opts->psk_identity));
		if (strcmp(impl_opts->psk_identity, identity) != 0) {
			SPDK_ERRLOG("Unknown Client's PSK ID\n");
			return 0;
		}
		keylen = impl_opts->psk_key_size;

		memcpy(key, impl_opts->psk_key, keylen);
		user_cipher = impl_opts->tls_cipher_suites;
	}

	if (user_cipher == NULL) {
		SPDK_ERRLOG("Cipher suite not set\n");
		return 0;
	}

	*sess = SSL_SESSION_new();
	if (*sess == NULL) {
		SPDK_ERRLOG("Unable to allocate new SSL session\n");
		return 0;
	}

	ciphers = SSL_get_ciphers(ssl);
	for (i = 0; i < sk_SSL_CIPHER_num(ciphers); i++) {
		cipher = sk_SSL_CIPHER_value(ciphers, i);
		cipher_name = SSL_CIPHER_get_name(cipher);

		if (strcmp(user_cipher, cipher_name) == 0) {
			rc = SSL_SESSION_set_cipher(*sess, cipher);
			if (rc != 1) {
				SPDK_ERRLOG("Unable to set cipher: %s\n", cipher_name);
				goto err;
			}
			found = true;
			break;
		}
	}
	if (found == false) {
		SPDK_ERRLOG("No suitable cipher found\n");
		goto err;
	}

	SPDK_DEBUGLOG(sock_posix, "Cipher selected: %s\n", cipher_name);

	rc = SSL_SESSION_set_protocol_version(*sess, TLS1_3_VERSION);
	if (rc != 1) {
		SPDK_ERRLOG("Unable to set TLS version: %d\n", TLS1_3_VERSION);
		goto err;
	}

	rc = SSL_SESSION_set1_master_key(*sess, key, keylen);
	if (rc != 1) {
		SPDK_ERRLOG("Unable to set PSK for session\n");
		goto err;
	}

	return 1;

err:
	SSL_SESSION_free(*sess);
	*sess = NULL;
	return 0;
}
static int
posix_sock_psk_use_session_client_cb(SSL *ssl, const EVP_MD *md, const unsigned char **identity,
				     size_t *identity_len, SSL_SESSION **sess)
{
	struct spdk_sock_impl_opts *impl_opts = SSL_get_app_data(ssl);
	int rc, i;
	STACK_OF(SSL_CIPHER) *ciphers;
	const SSL_CIPHER *cipher;
	const char *cipher_name;
	long keylen;
	bool found = false;

	if (impl_opts->psk_key == NULL) {
		SPDK_ERRLOG("PSK is not set\n");
		return 0;
	}
	if (impl_opts->psk_key_size > SSL_MAX_MASTER_KEY_LENGTH) {
		SPDK_ERRLOG("PSK too long\n");
		return 0;
	}
	keylen = impl_opts->psk_key_size;

	if (impl_opts->tls_cipher_suites == NULL) {
		SPDK_ERRLOG("Cipher suite not set\n");
		return 0;
	}
	*sess = SSL_SESSION_new();
	if (*sess == NULL) {
		SPDK_ERRLOG("Unable to allocate new SSL session\n");
		return 0;
	}

	ciphers = SSL_get_ciphers(ssl);
	for (i = 0; i < sk_SSL_CIPHER_num(ciphers); i++) {
		cipher = sk_SSL_CIPHER_value(ciphers, i);
		cipher_name = SSL_CIPHER_get_name(cipher);

		if (strcmp(impl_opts->tls_cipher_suites, cipher_name) == 0) {
			rc = SSL_SESSION_set_cipher(*sess, cipher);
			if (rc != 1) {
				SPDK_ERRLOG("Unable to set cipher: %s\n", cipher_name);
				goto err;
			}
			found = true;
			break;
		}
	}
	if (found == false) {
		SPDK_ERRLOG("No suitable cipher found\n");
		goto err;
	}

	SPDK_DEBUGLOG(sock_posix, "Cipher selected: %s\n", cipher_name);

	rc = SSL_SESSION_set_protocol_version(*sess, TLS1_3_VERSION);
	if (rc != 1) {
		SPDK_ERRLOG("Unable to set TLS version: %d\n", TLS1_3_VERSION);
		goto err;
	}

	rc = SSL_SESSION_set1_master_key(*sess, impl_opts->psk_key, keylen);
	if (rc != 1) {
		SPDK_ERRLOG("Unable to set PSK for session\n");
		goto err;
	}

	*identity_len = strlen(impl_opts->psk_identity);
	*identity = impl_opts->psk_identity;

	return 1;

err:
	SSL_SESSION_free(*sess);
	*sess = NULL;
	return 0;
}

static SSL_CTX *
posix_sock_create_ssl_context(const SSL_METHOD *method, struct spdk_sock_opts *opts,
			      struct spdk_sock_impl_opts *impl_opts)
{
	SSL_CTX *ctx;
	int tls_version = 0;
	bool ktls_enabled = false;
#ifdef SSL_OP_ENABLE_KTLS
	long options;
#endif

	SSL_library_init();
	OpenSSL_add_all_algorithms();
	SSL_load_error_strings();
	/* Produce an SSL CTX in SSL V2 and V3 standards compliant way */
	ctx = SSL_CTX_new(method);
	if (!ctx) {
		SPDK_ERRLOG("SSL_CTX_new() failed, msg = %s\n", ERR_error_string(ERR_peek_last_error(), NULL));
		return NULL;
	}
	SPDK_DEBUGLOG(sock_posix, "SSL context created\n");

	switch (impl_opts->tls_version) {
	case 0:
		/* auto-negotiation */
		break;
	case SPDK_TLS_VERSION_1_3:
		tls_version = TLS1_3_VERSION;
		break;
	default:
		SPDK_ERRLOG("Incorrect TLS version provided: %d\n", impl_opts->tls_version);
		goto err;
	}

	if (tls_version) {
		SPDK_DEBUGLOG(sock_posix, "Hardening TLS version to '%d'='0x%X'\n", impl_opts->tls_version,
			      tls_version);
		if (!SSL_CTX_set_min_proto_version(ctx, tls_version)) {
			SPDK_ERRLOG("Unable to set Min TLS version to '%d'='0x%X'\n", impl_opts->tls_version, tls_version);
			goto err;
		}
		if (!SSL_CTX_set_max_proto_version(ctx, tls_version)) {
			SPDK_ERRLOG("Unable to set Max TLS version to '%d'='0x%X'\n", impl_opts->tls_version, tls_version);
			goto err;
		}
	}
	if (impl_opts->enable_ktls) {
		SPDK_DEBUGLOG(sock_posix, "Enabling kTLS offload\n");
#ifdef SSL_OP_ENABLE_KTLS
		options = SSL_CTX_set_options(ctx, SSL_OP_ENABLE_KTLS);
		ktls_enabled = options & SSL_OP_ENABLE_KTLS;
#else
		ktls_enabled = false;
#endif
		if (!ktls_enabled) {
			SPDK_ERRLOG("Unable to set kTLS offload via SSL_CTX_set_options(). Configure openssl with 'enable-ktls'\n");
			goto err;
		}
	}

	/* SSL_CTX_set_ciphersuites() returns 1 if the requested
	 * cipher suite list was configured, and 0 otherwise. */
	if (impl_opts->tls_cipher_suites != NULL &&
	    SSL_CTX_set_ciphersuites(ctx, impl_opts->tls_cipher_suites) != 1) {
		SPDK_ERRLOG("Unable to set TLS cipher suites for SSL\n");
		goto err;
	}

	return ctx;

err:
	SSL_CTX_free(ctx);
	return NULL;
}
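/*
 * For orientation, a stripped-down sketch of the client-side context setup that
 * posix_sock_create_ssl_context() performs (the cipher suite string is only an
 * example value, not a default taken from this file):
 *
 *	SSL_CTX *ctx = SSL_CTX_new(TLS_client_method());
 *
 *	SSL_CTX_set_min_proto_version(ctx, TLS1_3_VERSION);
 *	SSL_CTX_set_max_proto_version(ctx, TLS1_3_VERSION);
 *	SSL_CTX_set_ciphersuites(ctx, "TLS_AES_128_GCM_SHA256");
 *	// per connection: SSL_new(ctx), SSL_set_fd(), and the PSK callback
 *	// registered via SSL_set_psk_use_session_callback() as done below.
 */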
static SSL *
ssl_sock_connect_loop(SSL_CTX *ctx, int fd, struct spdk_sock_impl_opts *impl_opts)
{
	int rc;
	SSL *ssl;
	int ssl_get_error;

	ssl = SSL_new(ctx);
	if (!ssl) {
		SPDK_ERRLOG("SSL_new() failed, msg = %s\n", ERR_error_string(ERR_peek_last_error(), NULL));
		return NULL;
	}
	SSL_set_fd(ssl, fd);
	SSL_set_app_data(ssl, impl_opts);
	SSL_set_psk_use_session_callback(ssl, posix_sock_psk_use_session_client_cb);
	SPDK_DEBUGLOG(sock_posix, "SSL object creation finished: %p\n", ssl);
	SPDK_DEBUGLOG(sock_posix, "%s = SSL_state_string_long(%p)\n", SSL_state_string_long(ssl), ssl);
	while ((rc = SSL_connect(ssl)) != 1) {
		SPDK_DEBUGLOG(sock_posix, "%s = SSL_state_string_long(%p)\n", SSL_state_string_long(ssl), ssl);
		ssl_get_error = SSL_get_error(ssl, rc);
		SPDK_DEBUGLOG(sock_posix, "SSL_connect failed %d = SSL_connect(%p), %d = SSL_get_error(%p, %d)\n",
			      rc, ssl, ssl_get_error, ssl, rc);
		switch (ssl_get_error) {
		case SSL_ERROR_WANT_READ:
		case SSL_ERROR_WANT_WRITE:
			continue;
		default:
			break;
		}
		SPDK_ERRLOG("SSL_connect() failed, errno = %d\n", errno);
		SSL_free(ssl);
		return NULL;
	}
	SPDK_DEBUGLOG(sock_posix, "%s = SSL_state_string_long(%p)\n", SSL_state_string_long(ssl), ssl);
	SPDK_DEBUGLOG(sock_posix, "Negotiated Cipher suite:%s\n",
		      SSL_CIPHER_get_name(SSL_get_current_cipher(ssl)));
	return ssl;
}

static SSL *
ssl_sock_accept_loop(SSL_CTX *ctx, int fd, struct spdk_sock_impl_opts *impl_opts)
{
	int rc;
	SSL *ssl;
	int ssl_get_error;

	ssl = SSL_new(ctx);
	if (!ssl) {
		SPDK_ERRLOG("SSL_new() failed, msg = %s\n", ERR_error_string(ERR_peek_last_error(), NULL));
		return NULL;
	}
	SSL_set_fd(ssl, fd);
	SSL_set_app_data(ssl, impl_opts);
	SSL_set_psk_find_session_callback(ssl, posix_sock_psk_find_session_server_cb);
	SPDK_DEBUGLOG(sock_posix, "SSL object creation finished: %p\n", ssl);
	SPDK_DEBUGLOG(sock_posix, "%s = SSL_state_string_long(%p)\n", SSL_state_string_long(ssl), ssl);
	while ((rc = SSL_accept(ssl)) != 1) {
		SPDK_DEBUGLOG(sock_posix, "%s = SSL_state_string_long(%p)\n", SSL_state_string_long(ssl), ssl);
		ssl_get_error = SSL_get_error(ssl, rc);
		SPDK_DEBUGLOG(sock_posix, "SSL_accept failed %d = SSL_accept(%p), %d = SSL_get_error(%p, %d)\n", rc,
			      ssl, ssl_get_error, ssl, rc);
		switch (ssl_get_error) {
		case SSL_ERROR_WANT_READ:
		case SSL_ERROR_WANT_WRITE:
			continue;
		default:
			break;
		}
		SPDK_ERRLOG("SSL_accept() failed, errno = %d\n", errno);
		SSL_free(ssl);
		return NULL;
	}
	SPDK_DEBUGLOG(sock_posix, "%s = SSL_state_string_long(%p)\n", SSL_state_string_long(ssl), ssl);
	SPDK_DEBUGLOG(sock_posix, "Negotiated Cipher suite:%s\n",
		      SSL_CIPHER_get_name(SSL_get_current_cipher(ssl)));
	return ssl;
}
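/*
 * The SSL_readv()/SSL_writev() wrappers defined below emulate readv()/writev()
 * semantics so the rest of this file can treat TLS and plaintext sockets
 * uniformly: retryable OpenSSL states map to errno = EAGAIN, fatal ones to
 * ENOTCONN. A caller-side sketch (hypothetical variables, for illustration only):
 *
 *	ssize_t n = ssl != NULL ? SSL_readv(ssl, iov, iovcnt) : readv(fd, iov, iovcnt);
 *
 *	if (n < 0 && errno == EAGAIN) {
 *		// no progress right now; retry on the next poll iteration
 *	}
 */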
static ssize_t
SSL_readv(SSL *ssl, const struct iovec *iov, int iovcnt)
{
	int i, rc = 0;
	ssize_t total = 0;

	for (i = 0; i < iovcnt; i++) {
		rc = SSL_read(ssl, iov[i].iov_base, iov[i].iov_len);

		if (rc > 0) {
			total += rc;
		}
		if (rc != (int)iov[i].iov_len) {
			break;
		}
	}
	if (total > 0) {
		errno = 0;
		return total;
	}
	switch (SSL_get_error(ssl, rc)) {
	case SSL_ERROR_ZERO_RETURN:
		errno = ENOTCONN;
		return 0;
	case SSL_ERROR_WANT_READ:
	case SSL_ERROR_WANT_WRITE:
	case SSL_ERROR_WANT_CONNECT:
	case SSL_ERROR_WANT_ACCEPT:
	case SSL_ERROR_WANT_X509_LOOKUP:
	case SSL_ERROR_WANT_ASYNC:
	case SSL_ERROR_WANT_ASYNC_JOB:
	case SSL_ERROR_WANT_CLIENT_HELLO_CB:
		errno = EAGAIN;
		return -1;
	case SSL_ERROR_SYSCALL:
	case SSL_ERROR_SSL:
		errno = ENOTCONN;
		return -1;
	default:
		errno = ENOTCONN;
		return -1;
	}
}

static ssize_t
SSL_writev(SSL *ssl, struct iovec *iov, int iovcnt)
{
	int i, rc = 0;
	ssize_t total = 0;

	for (i = 0; i < iovcnt; i++) {
		rc = SSL_write(ssl, iov[i].iov_base, iov[i].iov_len);

		if (rc > 0) {
			total += rc;
		}
		if (rc != (int)iov[i].iov_len) {
			break;
		}
	}
	if (total > 0) {
		errno = 0;
		return total;
	}
	switch (SSL_get_error(ssl, rc)) {
	case SSL_ERROR_ZERO_RETURN:
		errno = ENOTCONN;
		return 0;
	case SSL_ERROR_WANT_READ:
	case SSL_ERROR_WANT_WRITE:
	case SSL_ERROR_WANT_CONNECT:
	case SSL_ERROR_WANT_ACCEPT:
	case SSL_ERROR_WANT_X509_LOOKUP:
	case SSL_ERROR_WANT_ASYNC:
	case SSL_ERROR_WANT_ASYNC_JOB:
	case SSL_ERROR_WANT_CLIENT_HELLO_CB:
		errno = EAGAIN;
		return -1;
	case SSL_ERROR_SYSCALL:
	case SSL_ERROR_SSL:
		errno = ENOTCONN;
		return -1;
	default:
		errno = ENOTCONN;
		return -1;
	}
}

static struct spdk_sock *
posix_sock_create(const char *ip, int port,
		  enum posix_sock_create_type type,
		  struct spdk_sock_opts *opts,
		  bool enable_ssl)
{
	struct spdk_posix_sock *sock;
	struct spdk_sock_impl_opts impl_opts;
	char buf[MAX_TMPBUF];
	char portnum[PORTNUMLEN];
	char *p;
	struct addrinfo hints, *res, *res0;
	int fd, flag;
	int rc;
	bool enable_zcopy_user_opts = true;
	bool enable_zcopy_impl_opts = true;
	SSL_CTX *ctx = 0;
	SSL *ssl = 0;

	assert(opts != NULL);
	if (enable_ssl) {
		_opts_get_impl_opts(opts, &impl_opts, &g_ssl_impl_opts);
	} else {
		_opts_get_impl_opts(opts, &impl_opts, &g_posix_impl_opts);
	}

	if (ip == NULL) {
		return NULL;
	}
	if (ip[0] == '[') {
		snprintf(buf, sizeof(buf), "%s", ip + 1);
		p = strchr(buf, ']');
		if (p != NULL) {
			*p = '\0';
		}
		ip = (const char *) &buf[0];
	}

	snprintf(portnum, sizeof portnum, "%d", port);
	memset(&hints, 0, sizeof hints);
	hints.ai_family = PF_UNSPEC;
	hints.ai_socktype = SOCK_STREAM;
	hints.ai_flags = AI_NUMERICSERV;
	hints.ai_flags |= AI_PASSIVE;
	hints.ai_flags |= AI_NUMERICHOST;
	rc = getaddrinfo(ip, portnum, &hints, &res0);
	if (rc != 0) {
		SPDK_ERRLOG("getaddrinfo() failed %s (%d)\n", gai_strerror(rc), rc);
		return NULL;
	}

	/* try listen */
	fd = -1;
	for (res = res0; res != NULL; res = res->ai_next) {
retry:
		fd = posix_fd_create(res, opts, &impl_opts);
		if (fd < 0) {
			continue;
		}
		if (type == SPDK_SOCK_CREATE_LISTEN) {
			rc = bind(fd, res->ai_addr, res->ai_addrlen);
			if (rc != 0) {
				SPDK_ERRLOG("bind() failed at port %d, errno = %d\n", port, errno);
				switch (errno) {
				case EINTR:
					/* interrupted? */
					close(fd);
					goto retry;
				case EADDRNOTAVAIL:
					SPDK_ERRLOG("IP address %s not available. "
						    "Verify IP address in config file "
						    "and make sure setup script is "
						    "run before starting spdk app.\n", ip);
				/* FALLTHROUGH */
				default:
					/* try next family */
					close(fd);
					fd = -1;
					continue;
				}
			}
			/* bind OK */
			rc = listen(fd, 512);
			if (rc != 0) {
				SPDK_ERRLOG("listen() failed, errno = %d\n", errno);
				close(fd);
				fd = -1;
				break;
			}
			enable_zcopy_impl_opts = impl_opts.enable_zerocopy_send_server;
		} else if (type == SPDK_SOCK_CREATE_CONNECT) {
			rc = connect(fd, res->ai_addr, res->ai_addrlen);
			if (rc != 0) {
				SPDK_ERRLOG("connect() failed, errno = %d\n", errno);
				/* try next family */
				close(fd);
				fd = -1;
				continue;
			}
			enable_zcopy_impl_opts = impl_opts.enable_zerocopy_send_client;
			if (enable_ssl) {
				ctx = posix_sock_create_ssl_context(TLS_client_method(), opts, &impl_opts);
				if (!ctx) {
					SPDK_ERRLOG("posix_sock_create_ssl_context() failed, errno = %d\n", errno);
					close(fd);
					fd = -1;
					break;
				}
				ssl = ssl_sock_connect_loop(ctx, fd, &impl_opts);
				if (!ssl) {
					SPDK_ERRLOG("ssl_sock_connect_loop() failed, errno = %d\n", errno);
					close(fd);
					fd = -1;
					SSL_CTX_free(ctx);
					break;
				}
			}
		}

		flag = fcntl(fd, F_GETFL);
		if (fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0) {
			SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno);
			SSL_free(ssl);
			SSL_CTX_free(ctx);
			close(fd);
			fd = -1;
			break;
		}
		break;
	}
	freeaddrinfo(res0);

	if (fd < 0) {
		return NULL;
	}

	/* Only enable zero copy for non-loopback and non-ssl sockets. */
	enable_zcopy_user_opts = opts->zcopy && !sock_is_loopback(fd) && !enable_ssl;

	sock = posix_sock_alloc(fd, &impl_opts, enable_zcopy_user_opts && enable_zcopy_impl_opts);
	if (sock == NULL) {
		SPDK_ERRLOG("sock allocation failed\n");
		SSL_free(ssl);
		SSL_CTX_free(ctx);
		close(fd);
		return NULL;
	}

	if (ctx) {
		sock->ctx = ctx;
	}

	if (ssl) {
		sock->ssl = ssl;
	}

	return &sock->base;
}
static struct spdk_sock *
posix_sock_listen(const char *ip, int port, struct spdk_sock_opts *opts)
{
	return posix_sock_create(ip, port, SPDK_SOCK_CREATE_LISTEN, opts, false);
}

static struct spdk_sock *
posix_sock_connect(const char *ip, int port, struct spdk_sock_opts *opts)
{
	return posix_sock_create(ip, port, SPDK_SOCK_CREATE_CONNECT, opts, false);
}

static struct spdk_sock *
_posix_sock_accept(struct spdk_sock *_sock, bool enable_ssl)
{
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	struct sockaddr_storage sa;
	socklen_t salen;
	int rc, fd;
	struct spdk_posix_sock *new_sock;
	int flag;
	SSL_CTX *ctx = 0;
	SSL *ssl = 0;

	memset(&sa, 0, sizeof(sa));
	salen = sizeof(sa);

	assert(sock != NULL);

	rc = accept(sock->fd, (struct sockaddr *)&sa, &salen);

	if (rc == -1) {
		return NULL;
	}

	fd = rc;

	flag = fcntl(fd, F_GETFL);
	if ((!(flag & O_NONBLOCK)) && (fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0)) {
		SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno);
		close(fd);
		return NULL;
	}

#if defined(SO_PRIORITY)
	/* The priority is not inherited, so call this function again */
	if (sock->base.opts.priority) {
		rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &sock->base.opts.priority, sizeof(int));
		if (rc != 0) {
			close(fd);
			return NULL;
		}
	}
#endif

	/* Establish SSL connection */
	if (enable_ssl) {
		ctx = posix_sock_create_ssl_context(TLS_server_method(), &sock->base.opts, &sock->base.impl_opts);
		if (!ctx) {
			SPDK_ERRLOG("posix_sock_create_ssl_context() failed, errno = %d\n", errno);
			close(fd);
			return NULL;
		}
		ssl = ssl_sock_accept_loop(ctx, fd, &sock->base.impl_opts);
		if (!ssl) {
			SPDK_ERRLOG("ssl_sock_accept_loop() failed, errno = %d\n", errno);
			close(fd);
			SSL_CTX_free(ctx);
			return NULL;
		}
	}

	/* Inherit the zero copy feature from the listen socket */
	new_sock = posix_sock_alloc(fd, &sock->base.impl_opts, sock->zcopy);
	if (new_sock == NULL) {
		close(fd);
		SSL_free(ssl);
		SSL_CTX_free(ctx);
		return NULL;
	}

	if (ctx) {
		new_sock->ctx = ctx;
	}

	if (ssl) {
		new_sock->ssl = ssl;
	}

	return &new_sock->base;
}

static struct spdk_sock *
posix_sock_accept(struct spdk_sock *_sock)
{
	return _posix_sock_accept(_sock, false);
}

static int
posix_sock_close(struct spdk_sock *_sock)
{
	struct spdk_posix_sock *sock = __posix_sock(_sock);

	assert(TAILQ_EMPTY(&_sock->pending_reqs));

	if (sock->ssl != NULL) {
		SSL_shutdown(sock->ssl);
	}

	/* If the socket fails to close, the best choice is to
	 * leak the fd but continue to free the rest of the sock
	 * memory. */
	close(sock->fd);

	SSL_free(sock->ssl);
	SSL_CTX_free(sock->ctx);

	spdk_pipe_destroy(sock->recv_pipe);
	free(sock->recv_buf);
	free(sock);

	return 0;
}
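/*
 * The listen/connect/accept/close entry points above are reached through the
 * generic sock layer; applications do not call them directly. A minimal caller
 * sketch, assuming the spdk_sock_connect() and spdk_sock_close() wrappers
 * declared in spdk/sock.h (address and port are example values):
 *
 *	struct spdk_sock *s = spdk_sock_connect("127.0.0.1", 4420, "posix");
 *
 *	if (s != NULL) {
 *		// ... readv/writev or sock group usage ...
 *		spdk_sock_close(&s);
 *	}
 */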
#ifdef SPDK_ZEROCOPY
static int
_sock_check_zcopy(struct spdk_sock *sock)
{
	struct spdk_posix_sock *psock = __posix_sock(sock);
	struct msghdr msgh = {};
	uint8_t buf[sizeof(struct cmsghdr) + sizeof(struct sock_extended_err)];
	ssize_t rc;
	struct sock_extended_err *serr;
	struct cmsghdr *cm;
	uint32_t idx;
	struct spdk_sock_request *req, *treq;
	bool found;

	msgh.msg_control = buf;
	msgh.msg_controllen = sizeof(buf);

	while (true) {
		rc = recvmsg(psock->fd, &msgh, MSG_ERRQUEUE);

		if (rc < 0) {
			if (errno == EWOULDBLOCK || errno == EAGAIN) {
				return 0;
			}

			if (!TAILQ_EMPTY(&sock->pending_reqs)) {
				SPDK_ERRLOG("Attempting to receive from ERRQUEUE yielded error, but pending list still has orphaned entries\n");
			} else {
				SPDK_WARNLOG("Recvmsg yielded an error!\n");
			}
			return 0;
		}

		cm = CMSG_FIRSTHDR(&msgh);
		if (!(cm &&
		      ((cm->cmsg_level == SOL_IP && cm->cmsg_type == IP_RECVERR) ||
		       (cm->cmsg_level == SOL_IPV6 && cm->cmsg_type == IPV6_RECVERR)))) {
			SPDK_WARNLOG("Unexpected cmsg level or type!\n");
			return 0;
		}

		serr = (struct sock_extended_err *)CMSG_DATA(cm);
		if (serr->ee_errno != 0 || serr->ee_origin != SO_EE_ORIGIN_ZEROCOPY) {
			SPDK_WARNLOG("Unexpected extended error origin\n");
			return 0;
		}

		/* Most of the time, the pending_reqs array is in the exact
		 * order we need such that all of the requests to complete are
		 * in order, in the front. It is guaranteed that all requests
		 * belonging to the same sendmsg call are sequential, so once
		 * we encounter one match we can stop looping as soon as a
		 * non-match is found.
		 */
		idx = serr->ee_info;
		while (true) {
			found = false;
			TAILQ_FOREACH_SAFE(req, &sock->pending_reqs, internal.link, treq) {
				if (!req->internal.is_zcopy) {
					/* This wasn't a zcopy request. It was just waiting in line to complete */
					rc = spdk_sock_request_put(sock, req, 0);
					if (rc < 0) {
						return rc;
					}
				} else if (req->internal.offset == idx) {
					found = true;
					rc = spdk_sock_request_put(sock, req, 0);
					if (rc < 0) {
						return rc;
					}
				} else if (found) {
					break;
				}
			}

			if (idx == serr->ee_data) {
				break;
			}

			if (idx == UINT32_MAX) {
				idx = 0;
			} else {
				idx++;
			}
		}
	}

	return 0;
}
#endif
static int
_sock_flush(struct spdk_sock *sock)
{
	struct spdk_posix_sock *psock = __posix_sock(sock);
	struct msghdr msg = {};
	int flags;
	struct iovec iovs[IOV_BATCH_SIZE];
	int iovcnt;
	int retval;
	struct spdk_sock_request *req;
	int i;
	ssize_t rc, sent;
	unsigned int offset;
	size_t len;
	bool is_zcopy = false;

	/* Can't flush from within a callback or we end up with recursive calls */
	if (sock->cb_cnt > 0) {
		errno = EAGAIN;
		return -1;
	}

#ifdef SPDK_ZEROCOPY
	if (psock->zcopy) {
		flags = MSG_ZEROCOPY | MSG_NOSIGNAL;
	} else
#endif
	{
		flags = MSG_NOSIGNAL;
	}

	iovcnt = spdk_sock_prep_reqs(sock, iovs, 0, NULL, &flags);
	if (iovcnt == 0) {
		return 0;
	}

#ifdef SPDK_ZEROCOPY
	is_zcopy = flags & MSG_ZEROCOPY;
#endif

	/* Perform the vectored write */
	msg.msg_iov = iovs;
	msg.msg_iovlen = iovcnt;

	if (psock->ssl) {
		rc = SSL_writev(psock->ssl, iovs, iovcnt);
	} else {
		rc = sendmsg(psock->fd, &msg, flags);
	}
	if (rc <= 0) {
		if (rc == 0 || errno == EAGAIN || errno == EWOULDBLOCK || (errno == ENOBUFS && psock->zcopy)) {
			errno = EAGAIN;
		}
		return -1;
	}

	sent = rc;

	if (is_zcopy) {
		/* Handling overflow case, because we use psock->sendmsg_idx - 1 for the
		 * req->internal.offset, so sendmsg_idx should not be zero */
		if (spdk_unlikely(psock->sendmsg_idx == UINT32_MAX)) {
			psock->sendmsg_idx = 1;
		} else {
			psock->sendmsg_idx++;
		}
	}

	/* Consume the requests that were actually written */
	req = TAILQ_FIRST(&sock->queued_reqs);
	while (req) {
		offset = req->internal.offset;

		/* req->internal.is_zcopy is true when the whole req or part of it is sent with zerocopy */
		req->internal.is_zcopy = is_zcopy;

		for (i = 0; i < req->iovcnt; i++) {
			/* Advance by the offset first */
			if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) {
				offset -= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len;
				continue;
			}

			/* Calculate the remaining length of this element */
			len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset;

			if (len > (size_t)rc) {
				/* This element was partially sent. */
				req->internal.offset += rc;
				return sent;
			}

			offset = 0;
			req->internal.offset += len;
			rc -= len;
		}

		/* Handled a full request. */
		spdk_sock_request_pend(sock, req);

		if (!req->internal.is_zcopy && req == TAILQ_FIRST(&sock->pending_reqs)) {
			/* The sendmsg syscall above isn't currently asynchronous,
			 * so it's already done. */
			retval = spdk_sock_request_put(sock, req, 0);
			if (retval) {
				break;
			}
		} else {
			/* Re-use the offset field to hold the sendmsg call index. The
			 * index is 0 based, so subtract one here because we've already
			 * incremented above. */
			req->internal.offset = psock->sendmsg_idx - 1;
		}

		if (rc == 0) {
			break;
		}

		req = TAILQ_FIRST(&sock->queued_reqs);
	}

	return sent;
}

static int
posix_sock_flush(struct spdk_sock *sock)
{
#ifdef SPDK_ZEROCOPY
	struct spdk_posix_sock *psock = __posix_sock(sock);

	if (psock->zcopy && !TAILQ_EMPTY(&sock->pending_reqs)) {
		_sock_check_zcopy(sock);
	}
#endif

	return _sock_flush(sock);
}
static ssize_t
posix_sock_recv_from_pipe(struct spdk_posix_sock *sock, struct iovec *diov, int diovcnt)
{
	struct iovec siov[2];
	int sbytes;
	ssize_t bytes;
	struct spdk_posix_sock_group_impl *group;

	sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov);
	if (sbytes < 0) {
		errno = EINVAL;
		return -1;
	} else if (sbytes == 0) {
		errno = EAGAIN;
		return -1;
	}

	bytes = spdk_iovcpy(siov, 2, diov, diovcnt);

	if (bytes == 0) {
		/* The only way this happens is if diov is 0 length */
		errno = EINVAL;
		return -1;
	}

	spdk_pipe_reader_advance(sock->recv_pipe, bytes);

	/* If we drained the pipe, mark it appropriately */
	if (spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) {
		assert(sock->pipe_has_data == true);

		group = __posix_group_impl(sock->base.group_impl);
		if (group && !sock->socket_has_data) {
			TAILQ_REMOVE(&group->socks_with_data, sock, link);
		}

		sock->pipe_has_data = false;
	}

	return bytes;
}

static inline ssize_t
posix_sock_read(struct spdk_posix_sock *sock)
{
	struct iovec iov[2];
	int bytes_avail, bytes_recvd;
	struct spdk_posix_sock_group_impl *group;

	bytes_avail = spdk_pipe_writer_get_buffer(sock->recv_pipe, sock->recv_buf_sz, iov);

	if (bytes_avail <= 0) {
		return bytes_avail;
	}

	if (sock->ssl) {
		bytes_recvd = SSL_readv(sock->ssl, iov, 2);
	} else {
		bytes_recvd = readv(sock->fd, iov, 2);
	}

	assert(sock->pipe_has_data == false);

	if (bytes_recvd <= 0) {
		/* Errors count as draining the socket data */
		if (sock->base.group_impl && sock->socket_has_data) {
			group = __posix_group_impl(sock->base.group_impl);
			TAILQ_REMOVE(&group->socks_with_data, sock, link);
		}

		sock->socket_has_data = false;

		return bytes_recvd;
	}

	spdk_pipe_writer_advance(sock->recv_pipe, bytes_recvd);

#if DEBUG
	if (sock->base.group_impl) {
		assert(sock->socket_has_data == true);
	}
#endif

	sock->pipe_has_data = true;
	if (bytes_recvd < bytes_avail) {
		/* We drained the kernel socket entirely. */
		sock->socket_has_data = false;
	}

	return bytes_recvd;
}

static ssize_t
posix_sock_readv(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
{
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	struct spdk_posix_sock_group_impl *group = __posix_group_impl(sock->base.group_impl);
	int rc, i;
	size_t len;

	if (sock->recv_pipe == NULL) {
		assert(sock->pipe_has_data == false);
		if (group && sock->socket_has_data) {
			sock->socket_has_data = false;
			TAILQ_REMOVE(&group->socks_with_data, sock, link);
		}
		if (sock->ssl) {
			return SSL_readv(sock->ssl, iov, iovcnt);
		} else {
			return readv(sock->fd, iov, iovcnt);
		}
	}

	/* If the socket is not in a group, we must assume it always has
	 * data waiting for us because it is not epolled */
	if (!sock->pipe_has_data && (group == NULL || sock->socket_has_data)) {
		/* If the user is receiving a sufficiently large amount of data,
		 * receive directly to their buffers. */
		len = 0;
		for (i = 0; i < iovcnt; i++) {
			len += iov[i].iov_len;
		}

		if (len >= MIN_SOCK_PIPE_SIZE) {
			/* TODO: Should this detect if kernel socket is drained? */
			if (sock->ssl) {
				return SSL_readv(sock->ssl, iov, iovcnt);
			} else {
				return readv(sock->fd, iov, iovcnt);
			}
		}

		/* Otherwise, do a big read into our pipe */
		rc = posix_sock_read(sock);
		if (rc <= 0) {
			return rc;
		}
	}

	return posix_sock_recv_from_pipe(sock, iov, iovcnt);
}

static ssize_t
posix_sock_recv(struct spdk_sock *sock, void *buf, size_t len)
{
	struct iovec iov[1];

	iov[0].iov_base = buf;
	iov[0].iov_len = len;

	return posix_sock_readv(sock, iov, 1);
}
static ssize_t
posix_sock_writev(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
{
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	int rc;

	/* In order to process a writev, we need to flush any asynchronous writes
	 * first. */
	rc = _sock_flush(_sock);
	if (rc < 0) {
		return rc;
	}

	if (!TAILQ_EMPTY(&_sock->queued_reqs)) {
		/* We weren't able to flush all requests */
		errno = EAGAIN;
		return -1;
	}

	if (sock->ssl) {
		return SSL_writev(sock->ssl, iov, iovcnt);
	} else {
		return writev(sock->fd, iov, iovcnt);
	}
}

static int
posix_sock_recv_next(struct spdk_sock *_sock, void **buf, void **ctx)
{
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	struct iovec iov;
	ssize_t rc;

	if (sock->recv_pipe != NULL) {
		errno = ENOTSUP;
		return -1;
	}

	iov.iov_len = spdk_sock_group_get_buf(_sock->group_impl->group, &iov.iov_base, ctx);
	if (iov.iov_len == 0) {
		errno = ENOBUFS;
		return -1;
	}

	rc = posix_sock_readv(_sock, &iov, 1);
	if (rc <= 0) {
		spdk_sock_group_provide_buf(_sock->group_impl->group, iov.iov_base, iov.iov_len, *ctx);
		return rc;
	}

	*buf = iov.iov_base;

	return rc;
}

static void
posix_sock_writev_async(struct spdk_sock *sock, struct spdk_sock_request *req)
{
	int rc;

	spdk_sock_request_queue(sock, req);

	/* If there are a sufficient number queued, just flush them out immediately. */
	if (sock->queued_iovcnt >= IOV_BATCH_SIZE) {
		rc = _sock_flush(sock);
		if (rc < 0 && errno != EAGAIN) {
			spdk_sock_abort_requests(sock);
		}
	}
}

static int
posix_sock_set_recvlowat(struct spdk_sock *_sock, int nbytes)
{
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	int val;
	int rc;

	assert(sock != NULL);

	val = nbytes;
	rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVLOWAT, &val, sizeof val);
	if (rc != 0) {
		return -1;
	}
	return 0;
}

static bool
posix_sock_is_ipv6(struct spdk_sock *_sock)
{
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	struct sockaddr_storage sa;
	socklen_t salen;
	int rc;

	assert(sock != NULL);

	memset(&sa, 0, sizeof sa);
	salen = sizeof sa;
	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
	if (rc != 0) {
		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
		return false;
	}

	return (sa.ss_family == AF_INET6);
}

static bool
posix_sock_is_ipv4(struct spdk_sock *_sock)
{
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	struct sockaddr_storage sa;
	socklen_t salen;
	int rc;

	assert(sock != NULL);

	memset(&sa, 0, sizeof sa);
	salen = sizeof sa;
	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
	if (rc != 0) {
		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
		return false;
	}

	return (sa.ss_family == AF_INET);
}

static bool
posix_sock_is_connected(struct spdk_sock *_sock)
{
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	uint8_t byte;
	int rc;

	rc = recv(sock->fd, &byte, 1, MSG_PEEK);
	if (rc == 0) {
		return false;
	}

	if (rc < 0) {
		if (errno == EAGAIN || errno == EWOULDBLOCK) {
			return true;
		}

		return false;
	}

	return true;
}

static struct spdk_sock_group_impl *
posix_sock_group_impl_get_optimal(struct spdk_sock *_sock, struct spdk_sock_group_impl *hint)
{
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	struct spdk_sock_group_impl *group_impl;

	if (sock->placement_id != -1) {
		spdk_sock_map_lookup(&g_map, sock->placement_id, &group_impl, hint);
		return group_impl;
	}

	return NULL;
}

static struct spdk_sock_group_impl *
_sock_group_impl_create(uint32_t enable_placement_id)
{
	struct spdk_posix_sock_group_impl *group_impl;
	int fd;

#if defined(SPDK_EPOLL)
	fd = epoll_create1(0);
#elif defined(SPDK_KEVENT)
	fd = kqueue();
#endif
	if (fd == -1) {
		return NULL;
	}

	group_impl = calloc(1, sizeof(*group_impl));
	if (group_impl == NULL) {
		SPDK_ERRLOG("group_impl allocation failed\n");
		close(fd);
		return NULL;
	}

	group_impl->fd = fd;
	TAILQ_INIT(&group_impl->socks_with_data);
	group_impl->placement_id = -1;

	if (enable_placement_id == PLACEMENT_CPU) {
		spdk_sock_map_insert(&g_map, spdk_env_get_current_core(), &group_impl->base);
		group_impl->placement_id = spdk_env_get_current_core();
	}

	return &group_impl->base;
}

static struct spdk_sock_group_impl *
posix_sock_group_impl_create(void)
{
	return _sock_group_impl_create(g_posix_impl_opts.enable_placement_id);
}
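/*
 * The group_impl created above backs the generic spdk_sock_group API. A rough
 * application-side sketch of the epoll/kqueue-driven poll loop, assuming the
 * spdk_sock_group_create()/add_sock()/poll() wrappers from spdk/sock.h and a
 * hypothetical ready_cb() callback:
 *
 *	struct spdk_sock_group *group = spdk_sock_group_create(NULL);
 *
 *	spdk_sock_group_add_sock(group, sock, ready_cb, cb_arg);
 *	for (;;) {
 *		// invokes ready_cb for each socket this impl reports as readable
 *		spdk_sock_group_poll(group);
 *	}
 */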
static struct spdk_sock_group_impl *
ssl_sock_group_impl_create(void)
{
	return _sock_group_impl_create(g_ssl_impl_opts.enable_placement_id);
}

static void
posix_sock_mark(struct spdk_posix_sock_group_impl *group, struct spdk_posix_sock *sock,
		int placement_id)
{
#if defined(SO_MARK)
	int rc;

	rc = setsockopt(sock->fd, SOL_SOCKET, SO_MARK,
			&placement_id, sizeof(placement_id));
	if (rc != 0) {
		/* Not fatal */
		SPDK_ERRLOG("Error setting SO_MARK\n");
		return;
	}

	rc = spdk_sock_map_insert(&g_map, placement_id, &group->base);
	if (rc != 0) {
		/* Not fatal */
		SPDK_ERRLOG("Failed to insert sock group into map: %d\n", rc);
		return;
	}

	sock->placement_id = placement_id;
#endif
}

static void
posix_sock_update_mark(struct spdk_sock_group_impl *_group, struct spdk_sock *_sock)
{
	struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group);

	if (group->placement_id == -1) {
		group->placement_id = spdk_sock_map_find_free(&g_map);

		/* If a free placement id is found, update existing sockets in this group */
		if (group->placement_id != -1) {
			struct spdk_sock *sock, *tmp;

			TAILQ_FOREACH_SAFE(sock, &_group->socks, link, tmp) {
				posix_sock_mark(group, __posix_sock(sock), group->placement_id);
			}
		}
	}

	if (group->placement_id != -1) {
		/*
		 * group placement id is already determined for this poll group.
		 * Mark socket with group's placement id.
		 */
		posix_sock_mark(group, __posix_sock(_sock), group->placement_id);
	}
}

static int
posix_sock_group_impl_add_sock(struct spdk_sock_group_impl *_group, struct spdk_sock *_sock)
{
	struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group);
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	int rc;

#if defined(SPDK_EPOLL)
	struct epoll_event event;

	memset(&event, 0, sizeof(event));
	/* EPOLLERR is always on even if we don't set it, but be explicit for clarity */
	event.events = EPOLLIN | EPOLLERR;
	event.data.ptr = sock;

	rc = epoll_ctl(group->fd, EPOLL_CTL_ADD, sock->fd, &event);
#elif defined(SPDK_KEVENT)
	struct kevent event;
	struct timespec ts = {0};

	EV_SET(&event, sock->fd, EVFILT_READ, EV_ADD, 0, 0, sock);

	rc = kevent(group->fd, &event, 1, NULL, 0, &ts);
#endif

	if (rc != 0) {
		return rc;
	}

	/* switched from another polling group due to scheduling */
	if (spdk_unlikely(sock->recv_pipe != NULL &&
			  (spdk_pipe_reader_bytes_available(sock->recv_pipe) > 0))) {
		sock->pipe_has_data = true;
		sock->socket_has_data = false;
		TAILQ_INSERT_TAIL(&group->socks_with_data, sock, link);
	}

	if (_sock->impl_opts.enable_placement_id == PLACEMENT_MARK) {
		posix_sock_update_mark(_group, _sock);
	} else if (sock->placement_id != -1) {
		rc = spdk_sock_map_insert(&g_map, sock->placement_id, &group->base);
		if (rc != 0) {
			SPDK_ERRLOG("Failed to insert sock group into map: %d\n", rc);
			/* Do not treat this as an error. The system will continue running. */
		}
	}

	return rc;
}
static int
posix_sock_group_impl_remove_sock(struct spdk_sock_group_impl *_group, struct spdk_sock *_sock)
{
	struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group);
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	int rc;

	if (sock->pipe_has_data || sock->socket_has_data) {
		TAILQ_REMOVE(&group->socks_with_data, sock, link);
		sock->pipe_has_data = false;
		sock->socket_has_data = false;
	}

	if (sock->placement_id != -1) {
		spdk_sock_map_release(&g_map, sock->placement_id);
	}

#if defined(SPDK_EPOLL)
	struct epoll_event event;

	/* The event parameter is ignored, but some old kernel versions still require it. */
	rc = epoll_ctl(group->fd, EPOLL_CTL_DEL, sock->fd, &event);
#elif defined(SPDK_KEVENT)
	struct kevent event;
	struct timespec ts = {0};

	EV_SET(&event, sock->fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);

	rc = kevent(group->fd, &event, 1, NULL, 0, &ts);
	if (rc == 0 && event.flags & EV_ERROR) {
		rc = -1;
		errno = event.data;
	}
#endif

	spdk_sock_abort_requests(_sock);

	return rc;
}
static int
posix_sock_group_impl_poll(struct spdk_sock_group_impl *_group, int max_events,
			   struct spdk_sock **socks)
{
	struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group);
	struct spdk_sock *sock, *tmp;
	int num_events, i, rc;
	struct spdk_posix_sock *psock, *ptmp;
#if defined(SPDK_EPOLL)
	struct epoll_event events[MAX_EVENTS_PER_POLL];
#elif defined(SPDK_KEVENT)
	struct kevent events[MAX_EVENTS_PER_POLL];
	struct timespec ts = {0};
#endif

#ifdef SPDK_ZEROCOPY
	/* When all of the following conditions are met
	 * - non-blocking socket
	 * - zero copy is enabled
	 * - interrupts suppressed (i.e. busy polling)
	 * - the NIC tx queue is full at the time sendmsg() is called
	 * - epoll_wait determines there is an EPOLLIN event for the socket
	 * then we can get into a situation where data we've sent is queued
	 * up in the kernel network stack, but interrupts have been suppressed
	 * because other traffic is flowing so the kernel misses the signal
	 * to flush the software tx queue. If there wasn't incoming data
	 * pending on the socket, then epoll_wait would have been sufficient
	 * to kick off the send operation, but since there is a pending event
	 * epoll_wait does not trigger the necessary operation.
	 *
	 * We deal with this by checking for all of the above conditions and
	 * additionally looking for EPOLLIN events that were not consumed from
	 * the last poll loop. We take this to mean that the upper layer is
	 * unable to consume them because it is blocked waiting for resources
	 * to free up, and those resources are most likely freed in response
	 * to a pending asynchronous write completing.
	 *
	 * Additionally, sockets that have the same placement_id actually share
	 * an underlying hardware queue. That means polling one of them is
	 * equivalent to polling all of them. As a quick mechanism to avoid
	 * making extra poll() calls, stash the last placement_id during the loop
	 * and only poll if it's not the same. The overwhelmingly common case
	 * is that all sockets in this list have the same placement_id because
	 * SPDK is intentionally grouping sockets by that value, so even
	 * though this won't stop all extra calls to poll(), it's very fast
	 * and will catch all of them in practice.
	 */
	int last_placement_id = -1;

	TAILQ_FOREACH(psock, &group->socks_with_data, link) {
		if (psock->zcopy && psock->placement_id >= 0 &&
		    psock->placement_id != last_placement_id) {
			struct pollfd pfd = {psock->fd, POLLIN | POLLERR, 0};

			poll(&pfd, 1, 0);
			last_placement_id = psock->placement_id;
		}
	}
#endif

	/* This must be a TAILQ_FOREACH_SAFE because while flushing,
	 * a completion callback could remove the sock from the
	 * group. */
	TAILQ_FOREACH_SAFE(sock, &_group->socks, link, tmp) {
		rc = _sock_flush(sock);
		if (rc < 0 && errno != EAGAIN) {
			spdk_sock_abort_requests(sock);
		}
	}

	assert(max_events > 0);

#if defined(SPDK_EPOLL)
	num_events = epoll_wait(group->fd, events, max_events, 0);
#elif defined(SPDK_KEVENT)
	num_events = kevent(group->fd, NULL, 0, events, max_events, &ts);
#endif

	if (num_events == -1) {
		return -1;
	} else if (num_events == 0 && !TAILQ_EMPTY(&_group->socks)) {
		sock = TAILQ_FIRST(&_group->socks);
		psock = __posix_sock(sock);
		/* poll() is called here to busy poll the queue associated with
		 * first socket in list and potentially reap incoming data.
		 */
		if (sock->opts.priority) {
			struct pollfd pfd = {0, 0, 0};

			pfd.fd = psock->fd;
			pfd.events = POLLIN | POLLERR;
			poll(&pfd, 1, 0);
		}
	}

	for (i = 0; i < num_events; i++) {
#if defined(SPDK_EPOLL)
		sock = events[i].data.ptr;
		psock = __posix_sock(sock);

#ifdef SPDK_ZEROCOPY
		if (events[i].events & EPOLLERR) {
			rc = _sock_check_zcopy(sock);
			/* If the socket was closed or removed from
			 * the group in response to a send ack, don't
			 * add it to the array here. */
			if (rc || sock->cb_fn == NULL) {
				continue;
			}
		}
#endif
		if ((events[i].events & EPOLLIN) == 0) {
			continue;
		}

#elif defined(SPDK_KEVENT)
		sock = events[i].udata;
		psock = __posix_sock(sock);
#endif

		/* If the socket is not already in the list, add it now */
		if (!psock->socket_has_data && !psock->pipe_has_data) {
			TAILQ_INSERT_TAIL(&group->socks_with_data, psock, link);
		}
		psock->socket_has_data = true;
	}

	num_events = 0;

	TAILQ_FOREACH_SAFE(psock, &group->socks_with_data, link, ptmp) {
		if (num_events == max_events) {
			break;
		}

		/* If the socket's cb_fn is NULL, just remove it from the
		 * list and do not add it to socks array */
		if (spdk_unlikely(psock->base.cb_fn == NULL)) {
			psock->socket_has_data = false;
			psock->pipe_has_data = false;
			TAILQ_REMOVE(&group->socks_with_data, psock, link);
			continue;
		}

		socks[num_events++] = &psock->base;
	}

	/* Cycle the has_data list so that each time we poll things aren't
	 * in the same order. Say we have 6 sockets in the list, named as follows:
	 * A B C D E F
	 * And all 6 sockets had epoll events, but max_events is only 3. That means
	 * psock currently points at D. We want to rearrange the list to the following:
	 * D E F A B C
	 *
	 * The variables below are named according to this example to make it easier to
	 * follow the swaps.
	 */
	if (psock != NULL) {
		struct spdk_posix_sock *pa, *pc, *pd, *pf;

		/* Capture pointers to the elements we need */
		pd = psock;
		pc = TAILQ_PREV(pd, spdk_has_data_list, link);
		pa = TAILQ_FIRST(&group->socks_with_data);
		pf = TAILQ_LAST(&group->socks_with_data, spdk_has_data_list);

		/* Break the link between C and D */
		pc->link.tqe_next = NULL;

		/* Connect F to A */
		pf->link.tqe_next = pa;
		pa->link.tqe_prev = &pf->link.tqe_next;

		/* Fix up the list first/last pointers */
		group->socks_with_data.tqh_first = pd;
		group->socks_with_data.tqh_last = &pc->link.tqe_next;

		/* D is in front of the list, make tqe prev pointer point to the head of list */
		pd->link.tqe_prev = &group->socks_with_data.tqh_first;
	}

	return num_events;
}
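/*
 * The pointer surgery at the end of posix_sock_group_impl_poll() is just an O(1)
 * rotation of the TAILQ. For readers following the A..F example in the comment
 * above, a conceptually equivalent (but O(n)) rotation using only the standard
 * macros would look like this:
 *
 *	while (TAILQ_FIRST(&group->socks_with_data) != pd) {
 *		struct spdk_posix_sock *head = TAILQ_FIRST(&group->socks_with_data);
 *
 *		TAILQ_REMOVE(&group->socks_with_data, head, link);
 *		TAILQ_INSERT_TAIL(&group->socks_with_data, head, link);
 *	}
 */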
static int
_sock_group_impl_close(struct spdk_sock_group_impl *_group, uint32_t enable_placement_id)
{
	struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group);
	int rc;

	if (enable_placement_id == PLACEMENT_CPU) {
		spdk_sock_map_release(&g_map, spdk_env_get_current_core());
	}

	rc = close(group->fd);
	free(group);
	return rc;
}

static int
posix_sock_group_impl_close(struct spdk_sock_group_impl *_group)
{
	return _sock_group_impl_close(_group, g_posix_impl_opts.enable_placement_id);
}

static int
ssl_sock_group_impl_close(struct spdk_sock_group_impl *_group)
{
	return _sock_group_impl_close(_group, g_ssl_impl_opts.enable_placement_id);
}

static struct spdk_net_impl g_posix_net_impl = {
	.name = "posix",
	.getaddr = posix_sock_getaddr,
	.connect = posix_sock_connect,
	.listen = posix_sock_listen,
	.accept = posix_sock_accept,
	.close = posix_sock_close,
	.recv = posix_sock_recv,
	.readv = posix_sock_readv,
	.writev = posix_sock_writev,
	.recv_next = posix_sock_recv_next,
	.writev_async = posix_sock_writev_async,
	.flush = posix_sock_flush,
	.set_recvlowat = posix_sock_set_recvlowat,
	.set_recvbuf = posix_sock_set_recvbuf,
	.set_sendbuf = posix_sock_set_sendbuf,
	.is_ipv6 = posix_sock_is_ipv6,
	.is_ipv4 = posix_sock_is_ipv4,
	.is_connected = posix_sock_is_connected,
	.group_impl_get_optimal = posix_sock_group_impl_get_optimal,
	.group_impl_create = posix_sock_group_impl_create,
	.group_impl_add_sock = posix_sock_group_impl_add_sock,
	.group_impl_remove_sock = posix_sock_group_impl_remove_sock,
	.group_impl_poll = posix_sock_group_impl_poll,
	.group_impl_close = posix_sock_group_impl_close,
	.get_opts = posix_sock_impl_get_opts,
	.set_opts = posix_sock_impl_set_opts,
};

SPDK_NET_IMPL_REGISTER(posix, &g_posix_net_impl, DEFAULT_SOCK_PRIORITY + 1);

static struct spdk_sock *
ssl_sock_listen(const char *ip, int port, struct spdk_sock_opts *opts)
{
	return posix_sock_create(ip, port, SPDK_SOCK_CREATE_LISTEN, opts, true);
}

static struct spdk_sock *
ssl_sock_connect(const char *ip, int port, struct spdk_sock_opts *opts)
{
	return posix_sock_create(ip, port, SPDK_SOCK_CREATE_CONNECT, opts, true);
}

static struct spdk_sock *
ssl_sock_accept(struct spdk_sock *_sock)
{
	return _posix_sock_accept(_sock, true);
}
static struct spdk_net_impl g_ssl_net_impl = {
	.name = "ssl",
	.getaddr = posix_sock_getaddr,
	.connect = ssl_sock_connect,
	.listen = ssl_sock_listen,
	.accept = ssl_sock_accept,
	.close = posix_sock_close,
	.recv = posix_sock_recv,
	.readv = posix_sock_readv,
	.writev = posix_sock_writev,
	.recv_next = posix_sock_recv_next,
	.writev_async = posix_sock_writev_async,
	.flush = posix_sock_flush,
	.set_recvlowat = posix_sock_set_recvlowat,
	.set_recvbuf = posix_sock_set_recvbuf,
	.set_sendbuf = posix_sock_set_sendbuf,
	.is_ipv6 = posix_sock_is_ipv6,
	.is_ipv4 = posix_sock_is_ipv4,
	.is_connected = posix_sock_is_connected,
	.group_impl_get_optimal = posix_sock_group_impl_get_optimal,
	.group_impl_create = ssl_sock_group_impl_create,
	.group_impl_add_sock = posix_sock_group_impl_add_sock,
	.group_impl_remove_sock = posix_sock_group_impl_remove_sock,
	.group_impl_poll = posix_sock_group_impl_poll,
	.group_impl_close = ssl_sock_group_impl_close,
	.get_opts = ssl_sock_impl_get_opts,
	.set_opts = ssl_sock_impl_set_opts,
};

SPDK_NET_IMPL_REGISTER(ssl, &g_ssl_net_impl, DEFAULT_SOCK_PRIORITY);
SPDK_LOG_REGISTER_COMPONENT(sock_posix)