1 /* SPDX-License-Identifier: BSD-3-Clause 2 * Copyright (C) 2018 Intel Corporation. All rights reserved. 3 * Copyright (c) 2020, 2021 Mellanox Technologies LTD. All rights reserved. 4 * Copyright (c) 2021 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 5 */ 6 7 #include "spdk/stdinc.h" 8 9 #if defined(__FreeBSD__) 10 #include <sys/event.h> 11 #define SPDK_KEVENT 12 #else 13 #include <sys/epoll.h> 14 #define SPDK_EPOLL 15 #endif 16 17 #if defined(__linux__) 18 #include <linux/errqueue.h> 19 #endif 20 21 #include "spdk/env.h" 22 #include "spdk/log.h" 23 #include "spdk/pipe.h" 24 #include "spdk/sock.h" 25 #include "spdk/util.h" 26 #include "spdk/string.h" 27 #include "spdk_internal/sock.h" 28 #include "../sock_kernel.h" 29 30 #include "openssl/crypto.h" 31 #include "openssl/err.h" 32 #include "openssl/ssl.h" 33 34 #define MAX_TMPBUF 1024 35 #define PORTNUMLEN 32 36 37 #if defined(SO_ZEROCOPY) && defined(MSG_ZEROCOPY) 38 #define SPDK_ZEROCOPY 39 #endif 40 41 struct spdk_posix_sock { 42 struct spdk_sock base; 43 int fd; 44 45 uint32_t sendmsg_idx; 46 47 struct spdk_pipe *recv_pipe; 48 int recv_buf_sz; 49 bool pipe_has_data; 50 bool socket_has_data; 51 bool zcopy; 52 53 int placement_id; 54 55 SSL_CTX *ctx; 56 SSL *ssl; 57 58 TAILQ_ENTRY(spdk_posix_sock) link; 59 }; 60 61 TAILQ_HEAD(spdk_has_data_list, spdk_posix_sock); 62 63 struct spdk_posix_sock_group_impl { 64 struct spdk_sock_group_impl base; 65 int fd; 66 struct spdk_has_data_list socks_with_data; 67 int placement_id; 68 struct spdk_pipe_group *pipe_group; 69 }; 70 71 static struct spdk_sock_impl_opts g_posix_impl_opts = { 72 .recv_buf_size = DEFAULT_SO_RCVBUF_SIZE, 73 .send_buf_size = DEFAULT_SO_SNDBUF_SIZE, 74 .enable_recv_pipe = true, 75 .enable_quickack = false, 76 .enable_placement_id = PLACEMENT_NONE, 77 .enable_zerocopy_send_server = true, 78 .enable_zerocopy_send_client = false, 79 .zerocopy_threshold = 0, 80 .tls_version = 0, 81 .enable_ktls = false, 82 .psk_key = NULL, 83 .psk_key_size = 0, 84 .psk_identity = NULL, 85 .get_key = NULL, 86 .get_key_ctx = NULL, 87 .tls_cipher_suites = NULL 88 }; 89 90 static struct spdk_sock_impl_opts g_ssl_impl_opts = { 91 .recv_buf_size = MIN_SO_RCVBUF_SIZE, 92 .send_buf_size = MIN_SO_SNDBUF_SIZE, 93 .enable_recv_pipe = true, 94 .enable_quickack = false, 95 .enable_placement_id = PLACEMENT_NONE, 96 .enable_zerocopy_send_server = true, 97 .enable_zerocopy_send_client = false, 98 .zerocopy_threshold = 0, 99 .tls_version = 0, 100 .enable_ktls = false, 101 .psk_key = NULL, 102 .psk_identity = NULL 103 }; 104 105 static struct spdk_sock_map g_map = { 106 .entries = STAILQ_HEAD_INITIALIZER(g_map.entries), 107 .mtx = PTHREAD_MUTEX_INITIALIZER 108 }; 109 110 __attribute((destructor)) static void 111 posix_sock_map_cleanup(void) 112 { 113 spdk_sock_map_cleanup(&g_map); 114 } 115 116 #define __posix_sock(sock) (struct spdk_posix_sock *)sock 117 #define __posix_group_impl(group) (struct spdk_posix_sock_group_impl *)group 118 119 static void 120 posix_sock_copy_impl_opts(struct spdk_sock_impl_opts *dest, const struct spdk_sock_impl_opts *src, 121 size_t len) 122 { 123 #define FIELD_OK(field) \ 124 offsetof(struct spdk_sock_impl_opts, field) + sizeof(src->field) <= len 125 126 #define SET_FIELD(field) \ 127 if (FIELD_OK(field)) { \ 128 dest->field = src->field; \ 129 } 130 131 SET_FIELD(recv_buf_size); 132 SET_FIELD(send_buf_size); 133 SET_FIELD(enable_recv_pipe); 134 SET_FIELD(enable_zerocopy_send); 135 SET_FIELD(enable_quickack); 136 SET_FIELD(enable_placement_id); 137 SET_FIELD(enable_zerocopy_send_server); 138 SET_FIELD(enable_zerocopy_send_client); 139 SET_FIELD(zerocopy_threshold); 140 SET_FIELD(tls_version); 141 SET_FIELD(enable_ktls); 142 SET_FIELD(psk_key); 143 SET_FIELD(psk_key_size); 144 SET_FIELD(psk_identity); 145 SET_FIELD(get_key); 146 SET_FIELD(get_key_ctx); 147 SET_FIELD(tls_cipher_suites); 148 149 #undef SET_FIELD 150 #undef FIELD_OK 151 } 152 153 static int 154 _sock_impl_get_opts(struct spdk_sock_impl_opts *opts, struct spdk_sock_impl_opts *impl_opts, 155 size_t *len) 156 { 157 if (!opts || !len) { 158 errno = EINVAL; 159 return -1; 160 } 161 162 assert(sizeof(*opts) >= *len); 163 memset(opts, 0, *len); 164 165 posix_sock_copy_impl_opts(opts, impl_opts, *len); 166 *len = spdk_min(*len, sizeof(*impl_opts)); 167 168 return 0; 169 } 170 171 static int 172 posix_sock_impl_get_opts(struct spdk_sock_impl_opts *opts, size_t *len) 173 { 174 return _sock_impl_get_opts(opts, &g_posix_impl_opts, len); 175 } 176 177 static int 178 ssl_sock_impl_get_opts(struct spdk_sock_impl_opts *opts, size_t *len) 179 { 180 return _sock_impl_get_opts(opts, &g_ssl_impl_opts, len); 181 } 182 183 static int 184 _sock_impl_set_opts(const struct spdk_sock_impl_opts *opts, struct spdk_sock_impl_opts *impl_opts, 185 size_t len) 186 { 187 if (!opts) { 188 errno = EINVAL; 189 return -1; 190 } 191 192 assert(sizeof(*opts) >= len); 193 posix_sock_copy_impl_opts(impl_opts, opts, len); 194 195 return 0; 196 } 197 198 static int 199 posix_sock_impl_set_opts(const struct spdk_sock_impl_opts *opts, size_t len) 200 { 201 return _sock_impl_set_opts(opts, &g_posix_impl_opts, len); 202 } 203 204 static int 205 ssl_sock_impl_set_opts(const struct spdk_sock_impl_opts *opts, size_t len) 206 { 207 return _sock_impl_set_opts(opts, &g_ssl_impl_opts, len); 208 } 209 210 static void 211 _opts_get_impl_opts(const struct spdk_sock_opts *opts, struct spdk_sock_impl_opts *dest, 212 const struct spdk_sock_impl_opts *default_impl) 213 { 214 /* Copy the default impl_opts first to cover cases when user's impl_opts is smaller */ 215 memcpy(dest, default_impl, sizeof(*dest)); 216 217 if (opts->impl_opts != NULL) { 218 assert(sizeof(*dest) >= opts->impl_opts_size); 219 posix_sock_copy_impl_opts(dest, opts->impl_opts, opts->impl_opts_size); 220 } 221 } 222 223 static int 224 posix_sock_getaddr(struct spdk_sock *_sock, char *saddr, int slen, uint16_t *sport, 225 char *caddr, int clen, uint16_t *cport) 226 { 227 struct spdk_posix_sock *sock = __posix_sock(_sock); 228 struct sockaddr_storage sa; 229 socklen_t salen; 230 int rc; 231 232 assert(sock != NULL); 233 234 memset(&sa, 0, sizeof sa); 235 salen = sizeof sa; 236 rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen); 237 if (rc != 0) { 238 SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno); 239 return -1; 240 } 241 242 switch (sa.ss_family) { 243 case AF_UNIX: 244 /* Acceptable connection types that don't have IPs */ 245 return 0; 246 case AF_INET: 247 case AF_INET6: 248 /* Code below will get IP addresses */ 249 break; 250 default: 251 /* Unsupported socket family */ 252 return -1; 253 } 254 255 rc = get_addr_str((struct sockaddr *)&sa, saddr, slen); 256 if (rc != 0) { 257 SPDK_ERRLOG("getnameinfo() failed (errno=%d)\n", errno); 258 return -1; 259 } 260 261 if (sport) { 262 if (sa.ss_family == AF_INET) { 263 *sport = ntohs(((struct sockaddr_in *) &sa)->sin_port); 264 } else if (sa.ss_family == AF_INET6) { 265 *sport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port); 266 } 267 } 268 269 memset(&sa, 0, sizeof sa); 270 salen = sizeof sa; 271 rc = getpeername(sock->fd, (struct sockaddr *) &sa, &salen); 272 if (rc != 0) { 273 SPDK_ERRLOG("getpeername() failed (errno=%d)\n", errno); 274 return -1; 275 } 276 277 rc = get_addr_str((struct sockaddr *)&sa, caddr, clen); 278 if (rc != 0) { 279 SPDK_ERRLOG("getnameinfo() failed (errno=%d)\n", errno); 280 return -1; 281 } 282 283 if (cport) { 284 if (sa.ss_family == AF_INET) { 285 *cport = ntohs(((struct sockaddr_in *) &sa)->sin_port); 286 } else if (sa.ss_family == AF_INET6) { 287 *cport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port); 288 } 289 } 290 291 return 0; 292 } 293 294 enum posix_sock_create_type { 295 SPDK_SOCK_CREATE_LISTEN, 296 SPDK_SOCK_CREATE_CONNECT, 297 }; 298 299 static int 300 posix_sock_alloc_pipe(struct spdk_posix_sock *sock, int sz) 301 { 302 uint8_t *new_buf, *old_buf; 303 struct spdk_pipe *new_pipe; 304 struct iovec siov[2]; 305 struct iovec diov[2]; 306 int sbytes; 307 ssize_t bytes; 308 int rc; 309 310 if (sock->recv_buf_sz == sz) { 311 return 0; 312 } 313 314 /* If the new size is 0, just free the pipe */ 315 if (sz == 0) { 316 old_buf = spdk_pipe_destroy(sock->recv_pipe); 317 free(old_buf); 318 sock->recv_pipe = NULL; 319 return 0; 320 } else if (sz < MIN_SOCK_PIPE_SIZE) { 321 SPDK_ERRLOG("The size of the pipe must be larger than %d\n", MIN_SOCK_PIPE_SIZE); 322 return -1; 323 } 324 325 /* Round up to next 64 byte multiple */ 326 rc = posix_memalign((void **)&new_buf, 64, sz); 327 if (rc != 0) { 328 SPDK_ERRLOG("socket recv buf allocation failed\n"); 329 return -ENOMEM; 330 } 331 memset(new_buf, 0, sz); 332 333 new_pipe = spdk_pipe_create(new_buf, sz); 334 if (new_pipe == NULL) { 335 SPDK_ERRLOG("socket pipe allocation failed\n"); 336 free(new_buf); 337 return -ENOMEM; 338 } 339 340 if (sock->recv_pipe != NULL) { 341 /* Pull all of the data out of the old pipe */ 342 sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov); 343 if (sbytes > sz) { 344 /* Too much data to fit into the new pipe size */ 345 old_buf = spdk_pipe_destroy(new_pipe); 346 free(old_buf); 347 return -EINVAL; 348 } 349 350 sbytes = spdk_pipe_writer_get_buffer(new_pipe, sz, diov); 351 assert(sbytes == sz); 352 353 bytes = spdk_iovcpy(siov, 2, diov, 2); 354 spdk_pipe_writer_advance(new_pipe, bytes); 355 356 old_buf = spdk_pipe_destroy(sock->recv_pipe); 357 free(old_buf); 358 } 359 360 sock->recv_buf_sz = sz; 361 sock->recv_pipe = new_pipe; 362 363 if (sock->base.group_impl) { 364 struct spdk_posix_sock_group_impl *group; 365 366 group = __posix_group_impl(sock->base.group_impl); 367 spdk_pipe_group_add(group->pipe_group, sock->recv_pipe); 368 } 369 370 return 0; 371 } 372 373 static int 374 posix_sock_set_recvbuf(struct spdk_sock *_sock, int sz) 375 { 376 struct spdk_posix_sock *sock = __posix_sock(_sock); 377 int min_size; 378 int rc; 379 380 assert(sock != NULL); 381 382 if (_sock->impl_opts.enable_recv_pipe) { 383 rc = posix_sock_alloc_pipe(sock, sz); 384 if (rc) { 385 return rc; 386 } 387 } 388 389 /* Set kernel buffer size to be at least MIN_SO_RCVBUF_SIZE and 390 * _sock->impl_opts.recv_buf_size. */ 391 min_size = spdk_max(MIN_SO_RCVBUF_SIZE, _sock->impl_opts.recv_buf_size); 392 393 if (sz < min_size) { 394 sz = min_size; 395 } 396 397 rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVBUF, &sz, sizeof(sz)); 398 if (rc < 0) { 399 return rc; 400 } 401 402 _sock->impl_opts.recv_buf_size = sz; 403 404 return 0; 405 } 406 407 static int 408 posix_sock_set_sendbuf(struct spdk_sock *_sock, int sz) 409 { 410 struct spdk_posix_sock *sock = __posix_sock(_sock); 411 int min_size; 412 int rc; 413 414 assert(sock != NULL); 415 416 /* Set kernel buffer size to be at least MIN_SO_SNDBUF_SIZE and 417 * _sock->impl_opts.send_buf_size. */ 418 min_size = spdk_max(MIN_SO_SNDBUF_SIZE, _sock->impl_opts.send_buf_size); 419 420 if (sz < min_size) { 421 sz = min_size; 422 } 423 424 rc = setsockopt(sock->fd, SOL_SOCKET, SO_SNDBUF, &sz, sizeof(sz)); 425 if (rc < 0) { 426 return rc; 427 } 428 429 _sock->impl_opts.send_buf_size = sz; 430 431 return 0; 432 } 433 434 static void 435 posix_sock_init(struct spdk_posix_sock *sock, bool enable_zero_copy) 436 { 437 #if defined(SPDK_ZEROCOPY) || defined(__linux__) 438 int flag; 439 int rc; 440 #endif 441 442 #if defined(SPDK_ZEROCOPY) 443 flag = 1; 444 445 if (enable_zero_copy) { 446 /* Try to turn on zero copy sends */ 447 rc = setsockopt(sock->fd, SOL_SOCKET, SO_ZEROCOPY, &flag, sizeof(flag)); 448 if (rc == 0) { 449 sock->zcopy = true; 450 } 451 } 452 #endif 453 454 #if defined(__linux__) 455 flag = 1; 456 457 if (sock->base.impl_opts.enable_quickack) { 458 rc = setsockopt(sock->fd, IPPROTO_TCP, TCP_QUICKACK, &flag, sizeof(flag)); 459 if (rc != 0) { 460 SPDK_ERRLOG("quickack was failed to set\n"); 461 } 462 } 463 464 spdk_sock_get_placement_id(sock->fd, sock->base.impl_opts.enable_placement_id, 465 &sock->placement_id); 466 467 if (sock->base.impl_opts.enable_placement_id == PLACEMENT_MARK) { 468 /* Save placement_id */ 469 spdk_sock_map_insert(&g_map, sock->placement_id, NULL); 470 } 471 #endif 472 } 473 474 static struct spdk_posix_sock * 475 posix_sock_alloc(int fd, struct spdk_sock_impl_opts *impl_opts, bool enable_zero_copy) 476 { 477 struct spdk_posix_sock *sock; 478 479 sock = calloc(1, sizeof(*sock)); 480 if (sock == NULL) { 481 SPDK_ERRLOG("sock allocation failed\n"); 482 return NULL; 483 } 484 485 sock->fd = fd; 486 memcpy(&sock->base.impl_opts, impl_opts, sizeof(*impl_opts)); 487 posix_sock_init(sock, enable_zero_copy); 488 489 return sock; 490 } 491 492 static int 493 posix_fd_create(struct addrinfo *res, struct spdk_sock_opts *opts, 494 struct spdk_sock_impl_opts *impl_opts) 495 { 496 int fd; 497 int val = 1; 498 int rc, sz; 499 #if defined(__linux__) 500 int to; 501 #endif 502 503 fd = socket(res->ai_family, res->ai_socktype, res->ai_protocol); 504 if (fd < 0) { 505 /* error */ 506 return -1; 507 } 508 509 sz = impl_opts->recv_buf_size; 510 rc = setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &sz, sizeof(sz)); 511 if (rc) { 512 /* Not fatal */ 513 } 514 515 sz = impl_opts->send_buf_size; 516 rc = setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &sz, sizeof(sz)); 517 if (rc) { 518 /* Not fatal */ 519 } 520 521 rc = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof val); 522 if (rc != 0) { 523 close(fd); 524 /* error */ 525 return -1; 526 } 527 rc = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof val); 528 if (rc != 0) { 529 close(fd); 530 /* error */ 531 return -1; 532 } 533 534 #if defined(SO_PRIORITY) 535 if (opts->priority) { 536 rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &opts->priority, sizeof val); 537 if (rc != 0) { 538 close(fd); 539 /* error */ 540 return -1; 541 } 542 } 543 #endif 544 545 if (res->ai_family == AF_INET6) { 546 rc = setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &val, sizeof val); 547 if (rc != 0) { 548 close(fd); 549 /* error */ 550 return -1; 551 } 552 } 553 554 if (opts->ack_timeout) { 555 #if defined(__linux__) 556 to = opts->ack_timeout; 557 rc = setsockopt(fd, IPPROTO_TCP, TCP_USER_TIMEOUT, &to, sizeof(to)); 558 if (rc != 0) { 559 close(fd); 560 /* error */ 561 return -1; 562 } 563 #else 564 SPDK_WARNLOG("TCP_USER_TIMEOUT is not supported.\n"); 565 #endif 566 } 567 568 return fd; 569 } 570 571 static int 572 posix_sock_psk_find_session_server_cb(SSL *ssl, const unsigned char *identity, 573 size_t identity_len, SSL_SESSION **sess) 574 { 575 struct spdk_sock_impl_opts *impl_opts = SSL_get_app_data(ssl); 576 uint8_t key[SSL_MAX_MASTER_KEY_LENGTH] = {}; 577 int keylen; 578 int rc, i; 579 STACK_OF(SSL_CIPHER) *ciphers; 580 const SSL_CIPHER *cipher; 581 const char *cipher_name; 582 const char *user_cipher = NULL; 583 bool found = false; 584 585 if (impl_opts->get_key) { 586 rc = impl_opts->get_key(key, sizeof(key), &user_cipher, identity, impl_opts->get_key_ctx); 587 if (rc < 0) { 588 SPDK_ERRLOG("Unable to find PSK for identity: %s\n", identity); 589 return 0; 590 } 591 keylen = rc; 592 } else { 593 if (impl_opts->psk_key == NULL) { 594 SPDK_ERRLOG("PSK is not set\n"); 595 return 0; 596 } 597 598 SPDK_DEBUGLOG(sock_posix, "Length of Client's PSK ID %lu\n", strlen(impl_opts->psk_identity)); 599 if (strcmp(impl_opts->psk_identity, identity) != 0) { 600 SPDK_ERRLOG("Unknown Client's PSK ID\n"); 601 return 0; 602 } 603 keylen = impl_opts->psk_key_size; 604 605 memcpy(key, impl_opts->psk_key, keylen); 606 user_cipher = impl_opts->tls_cipher_suites; 607 } 608 609 if (user_cipher == NULL) { 610 SPDK_ERRLOG("Cipher suite not set\n"); 611 return 0; 612 } 613 614 *sess = SSL_SESSION_new(); 615 if (*sess == NULL) { 616 SPDK_ERRLOG("Unable to allocate new SSL session\n"); 617 return 0; 618 } 619 620 ciphers = SSL_get_ciphers(ssl); 621 for (i = 0; i < sk_SSL_CIPHER_num(ciphers); i++) { 622 cipher = sk_SSL_CIPHER_value(ciphers, i); 623 cipher_name = SSL_CIPHER_get_name(cipher); 624 625 if (strcmp(user_cipher, cipher_name) == 0) { 626 rc = SSL_SESSION_set_cipher(*sess, cipher); 627 if (rc != 1) { 628 SPDK_ERRLOG("Unable to set cipher: %s\n", cipher_name); 629 goto err; 630 } 631 found = true; 632 break; 633 } 634 } 635 if (found == false) { 636 SPDK_ERRLOG("No suitable cipher found\n"); 637 goto err; 638 } 639 640 SPDK_DEBUGLOG(sock_posix, "Cipher selected: %s\n", cipher_name); 641 642 rc = SSL_SESSION_set_protocol_version(*sess, TLS1_3_VERSION); 643 if (rc != 1) { 644 SPDK_ERRLOG("Unable to set TLS version: %d\n", TLS1_3_VERSION); 645 goto err; 646 } 647 648 rc = SSL_SESSION_set1_master_key(*sess, key, keylen); 649 if (rc != 1) { 650 SPDK_ERRLOG("Unable to set PSK for session\n"); 651 goto err; 652 } 653 654 return 1; 655 656 err: 657 SSL_SESSION_free(*sess); 658 *sess = NULL; 659 return 0; 660 } 661 662 static int 663 posix_sock_psk_use_session_client_cb(SSL *ssl, const EVP_MD *md, const unsigned char **identity, 664 size_t *identity_len, SSL_SESSION **sess) 665 { 666 struct spdk_sock_impl_opts *impl_opts = SSL_get_app_data(ssl); 667 int rc, i; 668 STACK_OF(SSL_CIPHER) *ciphers; 669 const SSL_CIPHER *cipher; 670 const char *cipher_name; 671 long keylen; 672 bool found = false; 673 674 if (impl_opts->psk_key == NULL) { 675 SPDK_ERRLOG("PSK is not set\n"); 676 return 0; 677 } 678 if (impl_opts->psk_key_size > SSL_MAX_MASTER_KEY_LENGTH) { 679 SPDK_ERRLOG("PSK too long\n"); 680 return 0; 681 } 682 keylen = impl_opts->psk_key_size; 683 684 if (impl_opts->tls_cipher_suites == NULL) { 685 SPDK_ERRLOG("Cipher suite not set\n"); 686 return 0; 687 } 688 *sess = SSL_SESSION_new(); 689 if (*sess == NULL) { 690 SPDK_ERRLOG("Unable to allocate new SSL session\n"); 691 return 0; 692 } 693 694 ciphers = SSL_get_ciphers(ssl); 695 for (i = 0; i < sk_SSL_CIPHER_num(ciphers); i++) { 696 cipher = sk_SSL_CIPHER_value(ciphers, i); 697 cipher_name = SSL_CIPHER_get_name(cipher); 698 699 if (strcmp(impl_opts->tls_cipher_suites, cipher_name) == 0) { 700 rc = SSL_SESSION_set_cipher(*sess, cipher); 701 if (rc != 1) { 702 SPDK_ERRLOG("Unable to set cipher: %s\n", cipher_name); 703 goto err; 704 } 705 found = true; 706 break; 707 } 708 } 709 if (found == false) { 710 SPDK_ERRLOG("No suitable cipher found\n"); 711 goto err; 712 } 713 714 SPDK_DEBUGLOG(sock_posix, "Cipher selected: %s\n", cipher_name); 715 716 rc = SSL_SESSION_set_protocol_version(*sess, TLS1_3_VERSION); 717 if (rc != 1) { 718 SPDK_ERRLOG("Unable to set TLS version: %d\n", TLS1_3_VERSION); 719 goto err; 720 } 721 722 rc = SSL_SESSION_set1_master_key(*sess, impl_opts->psk_key, keylen); 723 if (rc != 1) { 724 SPDK_ERRLOG("Unable to set PSK for session\n"); 725 goto err; 726 } 727 728 *identity_len = strlen(impl_opts->psk_identity); 729 *identity = impl_opts->psk_identity; 730 731 return 1; 732 733 err: 734 SSL_SESSION_free(*sess); 735 *sess = NULL; 736 return 0; 737 } 738 739 static SSL_CTX * 740 posix_sock_create_ssl_context(const SSL_METHOD *method, struct spdk_sock_opts *opts, 741 struct spdk_sock_impl_opts *impl_opts) 742 { 743 SSL_CTX *ctx; 744 int tls_version = 0; 745 bool ktls_enabled = false; 746 #ifdef SSL_OP_ENABLE_KTLS 747 long options; 748 #endif 749 750 SSL_library_init(); 751 OpenSSL_add_all_algorithms(); 752 SSL_load_error_strings(); 753 /* Produce a SSL CTX in SSL V2 and V3 standards compliant way */ 754 ctx = SSL_CTX_new(method); 755 if (!ctx) { 756 SPDK_ERRLOG("SSL_CTX_new() failed, msg = %s\n", ERR_error_string(ERR_peek_last_error(), NULL)); 757 return NULL; 758 } 759 SPDK_DEBUGLOG(sock_posix, "SSL context created\n"); 760 761 switch (impl_opts->tls_version) { 762 case 0: 763 /* auto-negotioation */ 764 break; 765 case SPDK_TLS_VERSION_1_3: 766 tls_version = TLS1_3_VERSION; 767 break; 768 default: 769 SPDK_ERRLOG("Incorrect TLS version provided: %d\n", impl_opts->tls_version); 770 goto err; 771 } 772 773 if (tls_version) { 774 SPDK_DEBUGLOG(sock_posix, "Hardening TLS version to '%d'='0x%X'\n", impl_opts->tls_version, 775 tls_version); 776 if (!SSL_CTX_set_min_proto_version(ctx, tls_version)) { 777 SPDK_ERRLOG("Unable to set Min TLS version to '%d'='0x%X\n", impl_opts->tls_version, tls_version); 778 goto err; 779 } 780 if (!SSL_CTX_set_max_proto_version(ctx, tls_version)) { 781 SPDK_ERRLOG("Unable to set Max TLS version to '%d'='0x%X\n", impl_opts->tls_version, tls_version); 782 goto err; 783 } 784 } 785 if (impl_opts->enable_ktls) { 786 SPDK_DEBUGLOG(sock_posix, "Enabling kTLS offload\n"); 787 #ifdef SSL_OP_ENABLE_KTLS 788 options = SSL_CTX_set_options(ctx, SSL_OP_ENABLE_KTLS); 789 ktls_enabled = options & SSL_OP_ENABLE_KTLS; 790 #else 791 ktls_enabled = false; 792 #endif 793 if (!ktls_enabled) { 794 SPDK_ERRLOG("Unable to set kTLS offload via SSL_CTX_set_options(). Configure openssl with 'enable-ktls'\n"); 795 goto err; 796 } 797 } 798 799 /* SSL_CTX_set_ciphersuites() return 1 if the requested 800 * cipher suite list was configured, and 0 otherwise. */ 801 if (impl_opts->tls_cipher_suites != NULL && 802 SSL_CTX_set_ciphersuites(ctx, impl_opts->tls_cipher_suites) != 1) { 803 SPDK_ERRLOG("Unable to set TLS cipher suites for SSL'\n"); 804 goto err; 805 } 806 807 return ctx; 808 809 err: 810 SSL_CTX_free(ctx); 811 return NULL; 812 } 813 814 static SSL * 815 ssl_sock_setup_connect(SSL_CTX *ctx, int fd) 816 { 817 SSL *ssl; 818 819 ssl = SSL_new(ctx); 820 if (!ssl) { 821 SPDK_ERRLOG("SSL_new() failed, msg = %s\n", ERR_error_string(ERR_peek_last_error(), NULL)); 822 return NULL; 823 } 824 SSL_set_fd(ssl, fd); 825 SSL_set_connect_state(ssl); 826 SSL_set_psk_use_session_callback(ssl, posix_sock_psk_use_session_client_cb); 827 SPDK_DEBUGLOG(sock_posix, "SSL object creation finished: %p\n", ssl); 828 SPDK_DEBUGLOG(sock_posix, "%s = SSL_state_string_long(%p)\n", SSL_state_string_long(ssl), ssl); 829 SPDK_DEBUGLOG(sock_posix, "%s = SSL_state_string_long(%p)\n", SSL_state_string_long(ssl), ssl); 830 SPDK_DEBUGLOG(sock_posix, "Negotiated Cipher suite:%s\n", 831 SSL_CIPHER_get_name(SSL_get_current_cipher(ssl))); 832 return ssl; 833 } 834 835 static SSL * 836 ssl_sock_setup_accept(SSL_CTX *ctx, int fd) 837 { 838 SSL *ssl; 839 840 ssl = SSL_new(ctx); 841 if (!ssl) { 842 SPDK_ERRLOG("SSL_new() failed, msg = %s\n", ERR_error_string(ERR_peek_last_error(), NULL)); 843 return NULL; 844 } 845 SSL_set_fd(ssl, fd); 846 SSL_set_accept_state(ssl); 847 SSL_set_psk_find_session_callback(ssl, posix_sock_psk_find_session_server_cb); 848 SPDK_DEBUGLOG(sock_posix, "SSL object creation finished: %p\n", ssl); 849 SPDK_DEBUGLOG(sock_posix, "%s = SSL_state_string_long(%p)\n", SSL_state_string_long(ssl), ssl); 850 SPDK_DEBUGLOG(sock_posix, "%s = SSL_state_string_long(%p)\n", SSL_state_string_long(ssl), ssl); 851 SPDK_DEBUGLOG(sock_posix, "Negotiated Cipher suite:%s\n", 852 SSL_CIPHER_get_name(SSL_get_current_cipher(ssl))); 853 return ssl; 854 } 855 856 static ssize_t 857 SSL_readv(SSL *ssl, const struct iovec *iov, int iovcnt) 858 { 859 int i, rc = 0; 860 ssize_t total = 0; 861 862 for (i = 0; i < iovcnt; i++) { 863 rc = SSL_read(ssl, iov[i].iov_base, iov[i].iov_len); 864 865 if (rc > 0) { 866 total += rc; 867 } 868 if (rc != (int)iov[i].iov_len) { 869 break; 870 } 871 } 872 if (total > 0) { 873 errno = 0; 874 return total; 875 } 876 switch (SSL_get_error(ssl, rc)) { 877 case SSL_ERROR_ZERO_RETURN: 878 errno = ENOTCONN; 879 return 0; 880 case SSL_ERROR_WANT_READ: 881 case SSL_ERROR_WANT_WRITE: 882 case SSL_ERROR_WANT_CONNECT: 883 case SSL_ERROR_WANT_ACCEPT: 884 case SSL_ERROR_WANT_X509_LOOKUP: 885 case SSL_ERROR_WANT_ASYNC: 886 case SSL_ERROR_WANT_ASYNC_JOB: 887 case SSL_ERROR_WANT_CLIENT_HELLO_CB: 888 errno = EAGAIN; 889 return -1; 890 case SSL_ERROR_SYSCALL: 891 case SSL_ERROR_SSL: 892 errno = ENOTCONN; 893 return -1; 894 default: 895 errno = ENOTCONN; 896 return -1; 897 } 898 } 899 900 static ssize_t 901 SSL_writev(SSL *ssl, struct iovec *iov, int iovcnt) 902 { 903 int i, rc = 0; 904 ssize_t total = 0; 905 906 for (i = 0; i < iovcnt; i++) { 907 rc = SSL_write(ssl, iov[i].iov_base, iov[i].iov_len); 908 909 if (rc > 0) { 910 total += rc; 911 } 912 if (rc != (int)iov[i].iov_len) { 913 break; 914 } 915 } 916 if (total > 0) { 917 errno = 0; 918 return total; 919 } 920 switch (SSL_get_error(ssl, rc)) { 921 case SSL_ERROR_ZERO_RETURN: 922 errno = ENOTCONN; 923 return 0; 924 case SSL_ERROR_WANT_READ: 925 case SSL_ERROR_WANT_WRITE: 926 case SSL_ERROR_WANT_CONNECT: 927 case SSL_ERROR_WANT_ACCEPT: 928 case SSL_ERROR_WANT_X509_LOOKUP: 929 case SSL_ERROR_WANT_ASYNC: 930 case SSL_ERROR_WANT_ASYNC_JOB: 931 case SSL_ERROR_WANT_CLIENT_HELLO_CB: 932 errno = EAGAIN; 933 return -1; 934 case SSL_ERROR_SYSCALL: 935 case SSL_ERROR_SSL: 936 errno = ENOTCONN; 937 return -1; 938 default: 939 errno = ENOTCONN; 940 return -1; 941 } 942 } 943 944 static struct spdk_sock * 945 posix_sock_create(const char *ip, int port, 946 enum posix_sock_create_type type, 947 struct spdk_sock_opts *opts, 948 bool enable_ssl) 949 { 950 struct spdk_posix_sock *sock; 951 struct spdk_sock_impl_opts impl_opts; 952 char buf[MAX_TMPBUF]; 953 char portnum[PORTNUMLEN]; 954 char *p; 955 struct addrinfo hints, *res, *res0; 956 int fd, flag; 957 int rc; 958 bool enable_zcopy_user_opts = true; 959 bool enable_zcopy_impl_opts = true; 960 SSL_CTX *ctx = 0; 961 SSL *ssl = 0; 962 963 assert(opts != NULL); 964 if (enable_ssl) { 965 _opts_get_impl_opts(opts, &impl_opts, &g_ssl_impl_opts); 966 } else { 967 _opts_get_impl_opts(opts, &impl_opts, &g_posix_impl_opts); 968 } 969 970 if (ip == NULL) { 971 return NULL; 972 } 973 if (ip[0] == '[') { 974 snprintf(buf, sizeof(buf), "%s", ip + 1); 975 p = strchr(buf, ']'); 976 if (p != NULL) { 977 *p = '\0'; 978 } 979 ip = (const char *) &buf[0]; 980 } 981 982 snprintf(portnum, sizeof portnum, "%d", port); 983 memset(&hints, 0, sizeof hints); 984 hints.ai_family = PF_UNSPEC; 985 hints.ai_socktype = SOCK_STREAM; 986 hints.ai_flags = AI_NUMERICSERV; 987 hints.ai_flags |= AI_PASSIVE; 988 hints.ai_flags |= AI_NUMERICHOST; 989 rc = getaddrinfo(ip, portnum, &hints, &res0); 990 if (rc != 0) { 991 SPDK_ERRLOG("getaddrinfo() failed %s (%d)\n", gai_strerror(rc), rc); 992 return NULL; 993 } 994 995 /* try listen */ 996 fd = -1; 997 for (res = res0; res != NULL; res = res->ai_next) { 998 retry: 999 fd = posix_fd_create(res, opts, &impl_opts); 1000 if (fd < 0) { 1001 continue; 1002 } 1003 if (type == SPDK_SOCK_CREATE_LISTEN) { 1004 rc = bind(fd, res->ai_addr, res->ai_addrlen); 1005 if (rc != 0) { 1006 SPDK_ERRLOG("bind() failed at port %d, errno = %d\n", port, errno); 1007 switch (errno) { 1008 case EINTR: 1009 /* interrupted? */ 1010 close(fd); 1011 goto retry; 1012 case EADDRNOTAVAIL: 1013 SPDK_ERRLOG("IP address %s not available. " 1014 "Verify IP address in config file " 1015 "and make sure setup script is " 1016 "run before starting spdk app.\n", ip); 1017 /* FALLTHROUGH */ 1018 default: 1019 /* try next family */ 1020 close(fd); 1021 fd = -1; 1022 continue; 1023 } 1024 } 1025 /* bind OK */ 1026 rc = listen(fd, 512); 1027 if (rc != 0) { 1028 SPDK_ERRLOG("listen() failed, errno = %d\n", errno); 1029 close(fd); 1030 fd = -1; 1031 break; 1032 } 1033 enable_zcopy_impl_opts = impl_opts.enable_zerocopy_send_server; 1034 } else if (type == SPDK_SOCK_CREATE_CONNECT) { 1035 rc = connect(fd, res->ai_addr, res->ai_addrlen); 1036 if (rc != 0) { 1037 SPDK_ERRLOG("connect() failed, errno = %d\n", errno); 1038 /* try next family */ 1039 close(fd); 1040 fd = -1; 1041 continue; 1042 } 1043 enable_zcopy_impl_opts = impl_opts.enable_zerocopy_send_client; 1044 if (enable_ssl) { 1045 ctx = posix_sock_create_ssl_context(TLS_client_method(), opts, &impl_opts); 1046 if (!ctx) { 1047 SPDK_ERRLOG("posix_sock_create_ssl_context() failed, errno = %d\n", errno); 1048 close(fd); 1049 fd = -1; 1050 break; 1051 } 1052 ssl = ssl_sock_setup_connect(ctx, fd); 1053 if (!ssl) { 1054 SPDK_ERRLOG("ssl_sock_setup_connect() failed, errno = %d\n", errno); 1055 close(fd); 1056 fd = -1; 1057 SSL_CTX_free(ctx); 1058 break; 1059 } 1060 } 1061 } 1062 1063 flag = fcntl(fd, F_GETFL); 1064 if (fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0) { 1065 SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno); 1066 SSL_free(ssl); 1067 SSL_CTX_free(ctx); 1068 close(fd); 1069 fd = -1; 1070 break; 1071 } 1072 break; 1073 } 1074 freeaddrinfo(res0); 1075 1076 if (fd < 0) { 1077 return NULL; 1078 } 1079 1080 /* Only enable zero copy for non-loopback and non-ssl sockets. */ 1081 enable_zcopy_user_opts = opts->zcopy && !sock_is_loopback(fd) && !enable_ssl; 1082 1083 sock = posix_sock_alloc(fd, &impl_opts, enable_zcopy_user_opts && enable_zcopy_impl_opts); 1084 if (sock == NULL) { 1085 SPDK_ERRLOG("sock allocation failed\n"); 1086 SSL_free(ssl); 1087 SSL_CTX_free(ctx); 1088 close(fd); 1089 return NULL; 1090 } 1091 1092 if (ctx) { 1093 sock->ctx = ctx; 1094 } 1095 1096 if (ssl) { 1097 sock->ssl = ssl; 1098 SSL_set_app_data(ssl, &sock->base.impl_opts); 1099 } 1100 1101 return &sock->base; 1102 } 1103 1104 static struct spdk_sock * 1105 posix_sock_listen(const char *ip, int port, struct spdk_sock_opts *opts) 1106 { 1107 return posix_sock_create(ip, port, SPDK_SOCK_CREATE_LISTEN, opts, false); 1108 } 1109 1110 static struct spdk_sock * 1111 posix_sock_connect(const char *ip, int port, struct spdk_sock_opts *opts) 1112 { 1113 return posix_sock_create(ip, port, SPDK_SOCK_CREATE_CONNECT, opts, false); 1114 } 1115 1116 static struct spdk_sock * 1117 _posix_sock_accept(struct spdk_sock *_sock, bool enable_ssl) 1118 { 1119 struct spdk_posix_sock *sock = __posix_sock(_sock); 1120 struct sockaddr_storage sa; 1121 socklen_t salen; 1122 int rc, fd; 1123 struct spdk_posix_sock *new_sock; 1124 int flag; 1125 SSL_CTX *ctx = 0; 1126 SSL *ssl = 0; 1127 1128 memset(&sa, 0, sizeof(sa)); 1129 salen = sizeof(sa); 1130 1131 assert(sock != NULL); 1132 1133 rc = accept(sock->fd, (struct sockaddr *)&sa, &salen); 1134 1135 if (rc == -1) { 1136 return NULL; 1137 } 1138 1139 fd = rc; 1140 1141 flag = fcntl(fd, F_GETFL); 1142 if ((!(flag & O_NONBLOCK)) && (fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0)) { 1143 SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno); 1144 close(fd); 1145 return NULL; 1146 } 1147 1148 #if defined(SO_PRIORITY) 1149 /* The priority is not inherited, so call this function again */ 1150 if (sock->base.opts.priority) { 1151 rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &sock->base.opts.priority, sizeof(int)); 1152 if (rc != 0) { 1153 close(fd); 1154 return NULL; 1155 } 1156 } 1157 #endif 1158 1159 /* Establish SSL connection */ 1160 if (enable_ssl) { 1161 ctx = posix_sock_create_ssl_context(TLS_server_method(), &sock->base.opts, &sock->base.impl_opts); 1162 if (!ctx) { 1163 SPDK_ERRLOG("posix_sock_create_ssl_context() failed, errno = %d\n", errno); 1164 close(fd); 1165 return NULL; 1166 } 1167 ssl = ssl_sock_setup_accept(ctx, fd); 1168 if (!ssl) { 1169 SPDK_ERRLOG("ssl_sock_setup_accept() failed, errno = %d\n", errno); 1170 close(fd); 1171 SSL_CTX_free(ctx); 1172 return NULL; 1173 } 1174 } 1175 1176 /* Inherit the zero copy feature from the listen socket */ 1177 new_sock = posix_sock_alloc(fd, &sock->base.impl_opts, sock->zcopy); 1178 if (new_sock == NULL) { 1179 close(fd); 1180 SSL_free(ssl); 1181 SSL_CTX_free(ctx); 1182 return NULL; 1183 } 1184 1185 if (ctx) { 1186 new_sock->ctx = ctx; 1187 } 1188 1189 if (ssl) { 1190 new_sock->ssl = ssl; 1191 SSL_set_app_data(ssl, &new_sock->base.impl_opts); 1192 } 1193 1194 return &new_sock->base; 1195 } 1196 1197 static struct spdk_sock * 1198 posix_sock_accept(struct spdk_sock *_sock) 1199 { 1200 return _posix_sock_accept(_sock, false); 1201 } 1202 1203 static int 1204 posix_sock_close(struct spdk_sock *_sock) 1205 { 1206 struct spdk_posix_sock *sock = __posix_sock(_sock); 1207 void *pipe_buf; 1208 1209 assert(TAILQ_EMPTY(&_sock->pending_reqs)); 1210 1211 if (sock->ssl != NULL) { 1212 SSL_shutdown(sock->ssl); 1213 } 1214 1215 /* If the socket fails to close, the best choice is to 1216 * leak the fd but continue to free the rest of the sock 1217 * memory. */ 1218 close(sock->fd); 1219 1220 SSL_free(sock->ssl); 1221 SSL_CTX_free(sock->ctx); 1222 1223 pipe_buf = spdk_pipe_destroy(sock->recv_pipe); 1224 free(pipe_buf); 1225 free(sock); 1226 1227 return 0; 1228 } 1229 1230 #ifdef SPDK_ZEROCOPY 1231 static int 1232 _sock_check_zcopy(struct spdk_sock *sock) 1233 { 1234 struct spdk_posix_sock *psock = __posix_sock(sock); 1235 struct msghdr msgh = {}; 1236 uint8_t buf[sizeof(struct cmsghdr) + sizeof(struct sock_extended_err)]; 1237 ssize_t rc; 1238 struct sock_extended_err *serr; 1239 struct cmsghdr *cm; 1240 uint32_t idx; 1241 struct spdk_sock_request *req, *treq; 1242 bool found; 1243 1244 msgh.msg_control = buf; 1245 msgh.msg_controllen = sizeof(buf); 1246 1247 while (true) { 1248 rc = recvmsg(psock->fd, &msgh, MSG_ERRQUEUE); 1249 1250 if (rc < 0) { 1251 if (errno == EWOULDBLOCK || errno == EAGAIN) { 1252 return 0; 1253 } 1254 1255 if (!TAILQ_EMPTY(&sock->pending_reqs)) { 1256 SPDK_ERRLOG("Attempting to receive from ERRQUEUE yielded error, but pending list still has orphaned entries\n"); 1257 } else { 1258 SPDK_WARNLOG("Recvmsg yielded an error!\n"); 1259 } 1260 return 0; 1261 } 1262 1263 cm = CMSG_FIRSTHDR(&msgh); 1264 if (!(cm && 1265 ((cm->cmsg_level == SOL_IP && cm->cmsg_type == IP_RECVERR) || 1266 (cm->cmsg_level == SOL_IPV6 && cm->cmsg_type == IPV6_RECVERR)))) { 1267 SPDK_WARNLOG("Unexpected cmsg level or type!\n"); 1268 return 0; 1269 } 1270 1271 serr = (struct sock_extended_err *)CMSG_DATA(cm); 1272 if (serr->ee_errno != 0 || serr->ee_origin != SO_EE_ORIGIN_ZEROCOPY) { 1273 SPDK_WARNLOG("Unexpected extended error origin\n"); 1274 return 0; 1275 } 1276 1277 /* Most of the time, the pending_reqs array is in the exact 1278 * order we need such that all of the requests to complete are 1279 * in order, in the front. It is guaranteed that all requests 1280 * belonging to the same sendmsg call are sequential, so once 1281 * we encounter one match we can stop looping as soon as a 1282 * non-match is found. 1283 */ 1284 idx = serr->ee_info; 1285 while (true) { 1286 found = false; 1287 TAILQ_FOREACH_SAFE(req, &sock->pending_reqs, internal.link, treq) { 1288 if (!req->internal.is_zcopy) { 1289 /* This wasn't a zcopy request. It was just waiting in line to complete */ 1290 rc = spdk_sock_request_put(sock, req, 0); 1291 if (rc < 0) { 1292 return rc; 1293 } 1294 } else if (req->internal.offset == idx) { 1295 found = true; 1296 rc = spdk_sock_request_put(sock, req, 0); 1297 if (rc < 0) { 1298 return rc; 1299 } 1300 } else if (found) { 1301 break; 1302 } 1303 } 1304 1305 if (idx == serr->ee_data) { 1306 break; 1307 } 1308 1309 if (idx == UINT32_MAX) { 1310 idx = 0; 1311 } else { 1312 idx++; 1313 } 1314 } 1315 } 1316 1317 return 0; 1318 } 1319 #endif 1320 1321 static int 1322 _sock_flush(struct spdk_sock *sock) 1323 { 1324 struct spdk_posix_sock *psock = __posix_sock(sock); 1325 struct msghdr msg = {}; 1326 int flags; 1327 struct iovec iovs[IOV_BATCH_SIZE]; 1328 int iovcnt; 1329 int retval; 1330 struct spdk_sock_request *req; 1331 int i; 1332 ssize_t rc, sent; 1333 unsigned int offset; 1334 size_t len; 1335 bool is_zcopy = false; 1336 1337 /* Can't flush from within a callback or we end up with recursive calls */ 1338 if (sock->cb_cnt > 0) { 1339 errno = EAGAIN; 1340 return -1; 1341 } 1342 1343 #ifdef SPDK_ZEROCOPY 1344 if (psock->zcopy) { 1345 flags = MSG_ZEROCOPY | MSG_NOSIGNAL; 1346 } else 1347 #endif 1348 { 1349 flags = MSG_NOSIGNAL; 1350 } 1351 1352 iovcnt = spdk_sock_prep_reqs(sock, iovs, 0, NULL, &flags); 1353 if (iovcnt == 0) { 1354 return 0; 1355 } 1356 1357 #ifdef SPDK_ZEROCOPY 1358 is_zcopy = flags & MSG_ZEROCOPY; 1359 #endif 1360 1361 /* Perform the vectored write */ 1362 msg.msg_iov = iovs; 1363 msg.msg_iovlen = iovcnt; 1364 1365 if (psock->ssl) { 1366 rc = SSL_writev(psock->ssl, iovs, iovcnt); 1367 } else { 1368 rc = sendmsg(psock->fd, &msg, flags); 1369 } 1370 if (rc <= 0) { 1371 if (rc == 0 || errno == EAGAIN || errno == EWOULDBLOCK || (errno == ENOBUFS && psock->zcopy)) { 1372 errno = EAGAIN; 1373 } 1374 return -1; 1375 } 1376 1377 sent = rc; 1378 1379 if (is_zcopy) { 1380 /* Handling overflow case, because we use psock->sendmsg_idx - 1 for the 1381 * req->internal.offset, so sendmsg_idx should not be zero */ 1382 if (spdk_unlikely(psock->sendmsg_idx == UINT32_MAX)) { 1383 psock->sendmsg_idx = 1; 1384 } else { 1385 psock->sendmsg_idx++; 1386 } 1387 } 1388 1389 /* Consume the requests that were actually written */ 1390 req = TAILQ_FIRST(&sock->queued_reqs); 1391 while (req) { 1392 offset = req->internal.offset; 1393 1394 /* req->internal.is_zcopy is true when the whole req or part of it is sent with zerocopy */ 1395 req->internal.is_zcopy = is_zcopy; 1396 1397 for (i = 0; i < req->iovcnt; i++) { 1398 /* Advance by the offset first */ 1399 if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) { 1400 offset -= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len; 1401 continue; 1402 } 1403 1404 /* Calculate the remaining length of this element */ 1405 len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset; 1406 1407 if (len > (size_t)rc) { 1408 /* This element was partially sent. */ 1409 req->internal.offset += rc; 1410 return sent; 1411 } 1412 1413 offset = 0; 1414 req->internal.offset += len; 1415 rc -= len; 1416 } 1417 1418 /* Handled a full request. */ 1419 spdk_sock_request_pend(sock, req); 1420 1421 if (!req->internal.is_zcopy && req == TAILQ_FIRST(&sock->pending_reqs)) { 1422 /* The sendmsg syscall above isn't currently asynchronous, 1423 * so it's already done. */ 1424 retval = spdk_sock_request_put(sock, req, 0); 1425 if (retval) { 1426 break; 1427 } 1428 } else { 1429 /* Re-use the offset field to hold the sendmsg call index. The 1430 * index is 0 based, so subtract one here because we've already 1431 * incremented above. */ 1432 req->internal.offset = psock->sendmsg_idx - 1; 1433 } 1434 1435 if (rc == 0) { 1436 break; 1437 } 1438 1439 req = TAILQ_FIRST(&sock->queued_reqs); 1440 } 1441 1442 return sent; 1443 } 1444 1445 static int 1446 posix_sock_flush(struct spdk_sock *sock) 1447 { 1448 #ifdef SPDK_ZEROCOPY 1449 struct spdk_posix_sock *psock = __posix_sock(sock); 1450 1451 if (psock->zcopy && !TAILQ_EMPTY(&sock->pending_reqs)) { 1452 _sock_check_zcopy(sock); 1453 } 1454 #endif 1455 1456 return _sock_flush(sock); 1457 } 1458 1459 static ssize_t 1460 posix_sock_recv_from_pipe(struct spdk_posix_sock *sock, struct iovec *diov, int diovcnt) 1461 { 1462 struct iovec siov[2]; 1463 int sbytes; 1464 ssize_t bytes; 1465 struct spdk_posix_sock_group_impl *group; 1466 1467 sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov); 1468 if (sbytes < 0) { 1469 errno = EINVAL; 1470 return -1; 1471 } else if (sbytes == 0) { 1472 errno = EAGAIN; 1473 return -1; 1474 } 1475 1476 bytes = spdk_iovcpy(siov, 2, diov, diovcnt); 1477 1478 if (bytes == 0) { 1479 /* The only way this happens is if diov is 0 length */ 1480 errno = EINVAL; 1481 return -1; 1482 } 1483 1484 spdk_pipe_reader_advance(sock->recv_pipe, bytes); 1485 1486 /* If we drained the pipe, mark it appropriately */ 1487 if (spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) { 1488 assert(sock->pipe_has_data == true); 1489 1490 group = __posix_group_impl(sock->base.group_impl); 1491 if (group && !sock->socket_has_data) { 1492 TAILQ_REMOVE(&group->socks_with_data, sock, link); 1493 } 1494 1495 sock->pipe_has_data = false; 1496 } 1497 1498 return bytes; 1499 } 1500 1501 static inline ssize_t 1502 posix_sock_read(struct spdk_posix_sock *sock) 1503 { 1504 struct iovec iov[2]; 1505 int bytes_avail, bytes_recvd; 1506 struct spdk_posix_sock_group_impl *group; 1507 1508 bytes_avail = spdk_pipe_writer_get_buffer(sock->recv_pipe, sock->recv_buf_sz, iov); 1509 1510 if (bytes_avail <= 0) { 1511 return bytes_avail; 1512 } 1513 1514 if (sock->ssl) { 1515 bytes_recvd = SSL_readv(sock->ssl, iov, 2); 1516 } else { 1517 bytes_recvd = readv(sock->fd, iov, 2); 1518 } 1519 1520 assert(sock->pipe_has_data == false); 1521 1522 if (bytes_recvd <= 0) { 1523 /* Errors count as draining the socket data */ 1524 if (sock->base.group_impl && sock->socket_has_data) { 1525 group = __posix_group_impl(sock->base.group_impl); 1526 TAILQ_REMOVE(&group->socks_with_data, sock, link); 1527 } 1528 1529 sock->socket_has_data = false; 1530 1531 return bytes_recvd; 1532 } 1533 1534 spdk_pipe_writer_advance(sock->recv_pipe, bytes_recvd); 1535 1536 #if DEBUG 1537 if (sock->base.group_impl) { 1538 assert(sock->socket_has_data == true); 1539 } 1540 #endif 1541 1542 sock->pipe_has_data = true; 1543 if (bytes_recvd < bytes_avail) { 1544 /* We drained the kernel socket entirely. */ 1545 sock->socket_has_data = false; 1546 } 1547 1548 return bytes_recvd; 1549 } 1550 1551 static ssize_t 1552 posix_sock_readv(struct spdk_sock *_sock, struct iovec *iov, int iovcnt) 1553 { 1554 struct spdk_posix_sock *sock = __posix_sock(_sock); 1555 struct spdk_posix_sock_group_impl *group = __posix_group_impl(sock->base.group_impl); 1556 int rc, i; 1557 size_t len; 1558 1559 if (sock->recv_pipe == NULL) { 1560 assert(sock->pipe_has_data == false); 1561 if (group && sock->socket_has_data) { 1562 sock->socket_has_data = false; 1563 TAILQ_REMOVE(&group->socks_with_data, sock, link); 1564 } 1565 if (sock->ssl) { 1566 return SSL_readv(sock->ssl, iov, iovcnt); 1567 } else { 1568 return readv(sock->fd, iov, iovcnt); 1569 } 1570 } 1571 1572 /* If the socket is not in a group, we must assume it always has 1573 * data waiting for us because it is not epolled */ 1574 if (!sock->pipe_has_data && (group == NULL || sock->socket_has_data)) { 1575 /* If the user is receiving a sufficiently large amount of data, 1576 * receive directly to their buffers. */ 1577 len = 0; 1578 for (i = 0; i < iovcnt; i++) { 1579 len += iov[i].iov_len; 1580 } 1581 1582 if (len >= MIN_SOCK_PIPE_SIZE) { 1583 /* TODO: Should this detect if kernel socket is drained? */ 1584 if (sock->ssl) { 1585 return SSL_readv(sock->ssl, iov, iovcnt); 1586 } else { 1587 return readv(sock->fd, iov, iovcnt); 1588 } 1589 } 1590 1591 /* Otherwise, do a big read into our pipe */ 1592 rc = posix_sock_read(sock); 1593 if (rc <= 0) { 1594 return rc; 1595 } 1596 } 1597 1598 return posix_sock_recv_from_pipe(sock, iov, iovcnt); 1599 } 1600 1601 static ssize_t 1602 posix_sock_recv(struct spdk_sock *sock, void *buf, size_t len) 1603 { 1604 struct iovec iov[1]; 1605 1606 iov[0].iov_base = buf; 1607 iov[0].iov_len = len; 1608 1609 return posix_sock_readv(sock, iov, 1); 1610 } 1611 1612 static ssize_t 1613 posix_sock_writev(struct spdk_sock *_sock, struct iovec *iov, int iovcnt) 1614 { 1615 struct spdk_posix_sock *sock = __posix_sock(_sock); 1616 int rc; 1617 1618 /* In order to process a writev, we need to flush any asynchronous writes 1619 * first. */ 1620 rc = _sock_flush(_sock); 1621 if (rc < 0) { 1622 return rc; 1623 } 1624 1625 if (!TAILQ_EMPTY(&_sock->queued_reqs)) { 1626 /* We weren't able to flush all requests */ 1627 errno = EAGAIN; 1628 return -1; 1629 } 1630 1631 if (sock->ssl) { 1632 return SSL_writev(sock->ssl, iov, iovcnt); 1633 } else { 1634 return writev(sock->fd, iov, iovcnt); 1635 } 1636 } 1637 1638 static int 1639 posix_sock_recv_next(struct spdk_sock *_sock, void **buf, void **ctx) 1640 { 1641 struct spdk_posix_sock *sock = __posix_sock(_sock); 1642 struct iovec iov; 1643 ssize_t rc; 1644 1645 if (sock->recv_pipe != NULL) { 1646 errno = ENOTSUP; 1647 return -1; 1648 } 1649 1650 iov.iov_len = spdk_sock_group_get_buf(_sock->group_impl->group, &iov.iov_base, ctx); 1651 if (iov.iov_len == 0) { 1652 errno = ENOBUFS; 1653 return -1; 1654 } 1655 1656 rc = posix_sock_readv(_sock, &iov, 1); 1657 if (rc <= 0) { 1658 spdk_sock_group_provide_buf(_sock->group_impl->group, iov.iov_base, iov.iov_len, *ctx); 1659 return rc; 1660 } 1661 1662 *buf = iov.iov_base; 1663 1664 return rc; 1665 } 1666 1667 static void 1668 posix_sock_writev_async(struct spdk_sock *sock, struct spdk_sock_request *req) 1669 { 1670 int rc; 1671 1672 spdk_sock_request_queue(sock, req); 1673 1674 /* If there are a sufficient number queued, just flush them out immediately. */ 1675 if (sock->queued_iovcnt >= IOV_BATCH_SIZE) { 1676 rc = _sock_flush(sock); 1677 if (rc < 0 && errno != EAGAIN) { 1678 spdk_sock_abort_requests(sock); 1679 } 1680 } 1681 } 1682 1683 static int 1684 posix_sock_set_recvlowat(struct spdk_sock *_sock, int nbytes) 1685 { 1686 struct spdk_posix_sock *sock = __posix_sock(_sock); 1687 int val; 1688 int rc; 1689 1690 assert(sock != NULL); 1691 1692 val = nbytes; 1693 rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVLOWAT, &val, sizeof val); 1694 if (rc != 0) { 1695 return -1; 1696 } 1697 return 0; 1698 } 1699 1700 static bool 1701 posix_sock_is_ipv6(struct spdk_sock *_sock) 1702 { 1703 struct spdk_posix_sock *sock = __posix_sock(_sock); 1704 struct sockaddr_storage sa; 1705 socklen_t salen; 1706 int rc; 1707 1708 assert(sock != NULL); 1709 1710 memset(&sa, 0, sizeof sa); 1711 salen = sizeof sa; 1712 rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen); 1713 if (rc != 0) { 1714 SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno); 1715 return false; 1716 } 1717 1718 return (sa.ss_family == AF_INET6); 1719 } 1720 1721 static bool 1722 posix_sock_is_ipv4(struct spdk_sock *_sock) 1723 { 1724 struct spdk_posix_sock *sock = __posix_sock(_sock); 1725 struct sockaddr_storage sa; 1726 socklen_t salen; 1727 int rc; 1728 1729 assert(sock != NULL); 1730 1731 memset(&sa, 0, sizeof sa); 1732 salen = sizeof sa; 1733 rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen); 1734 if (rc != 0) { 1735 SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno); 1736 return false; 1737 } 1738 1739 return (sa.ss_family == AF_INET); 1740 } 1741 1742 static bool 1743 posix_sock_is_connected(struct spdk_sock *_sock) 1744 { 1745 struct spdk_posix_sock *sock = __posix_sock(_sock); 1746 uint8_t byte; 1747 int rc; 1748 1749 rc = recv(sock->fd, &byte, 1, MSG_PEEK); 1750 if (rc == 0) { 1751 return false; 1752 } 1753 1754 if (rc < 0) { 1755 if (errno == EAGAIN || errno == EWOULDBLOCK) { 1756 return true; 1757 } 1758 1759 return false; 1760 } 1761 1762 return true; 1763 } 1764 1765 static struct spdk_sock_group_impl * 1766 posix_sock_group_impl_get_optimal(struct spdk_sock *_sock, struct spdk_sock_group_impl *hint) 1767 { 1768 struct spdk_posix_sock *sock = __posix_sock(_sock); 1769 struct spdk_sock_group_impl *group_impl; 1770 1771 if (sock->placement_id != -1) { 1772 spdk_sock_map_lookup(&g_map, sock->placement_id, &group_impl, hint); 1773 return group_impl; 1774 } 1775 1776 return NULL; 1777 } 1778 1779 static struct spdk_sock_group_impl * 1780 _sock_group_impl_create(uint32_t enable_placement_id) 1781 { 1782 struct spdk_posix_sock_group_impl *group_impl; 1783 int fd; 1784 1785 #if defined(SPDK_EPOLL) 1786 fd = epoll_create1(0); 1787 #elif defined(SPDK_KEVENT) 1788 fd = kqueue(); 1789 #endif 1790 if (fd == -1) { 1791 return NULL; 1792 } 1793 1794 group_impl = calloc(1, sizeof(*group_impl)); 1795 if (group_impl == NULL) { 1796 SPDK_ERRLOG("group_impl allocation failed\n"); 1797 close(fd); 1798 return NULL; 1799 } 1800 1801 group_impl->pipe_group = spdk_pipe_group_create(); 1802 if (group_impl->pipe_group == NULL) { 1803 SPDK_ERRLOG("pipe_group allocation failed\n"); 1804 free(group_impl); 1805 close(fd); 1806 return NULL; 1807 } 1808 1809 group_impl->fd = fd; 1810 TAILQ_INIT(&group_impl->socks_with_data); 1811 group_impl->placement_id = -1; 1812 1813 if (enable_placement_id == PLACEMENT_CPU) { 1814 spdk_sock_map_insert(&g_map, spdk_env_get_current_core(), &group_impl->base); 1815 group_impl->placement_id = spdk_env_get_current_core(); 1816 } 1817 1818 return &group_impl->base; 1819 } 1820 1821 static struct spdk_sock_group_impl * 1822 posix_sock_group_impl_create(void) 1823 { 1824 return _sock_group_impl_create(g_posix_impl_opts.enable_placement_id); 1825 } 1826 1827 static struct spdk_sock_group_impl * 1828 ssl_sock_group_impl_create(void) 1829 { 1830 return _sock_group_impl_create(g_ssl_impl_opts.enable_placement_id); 1831 } 1832 1833 static void 1834 posix_sock_mark(struct spdk_posix_sock_group_impl *group, struct spdk_posix_sock *sock, 1835 int placement_id) 1836 { 1837 #if defined(SO_MARK) 1838 int rc; 1839 1840 rc = setsockopt(sock->fd, SOL_SOCKET, SO_MARK, 1841 &placement_id, sizeof(placement_id)); 1842 if (rc != 0) { 1843 /* Not fatal */ 1844 SPDK_ERRLOG("Error setting SO_MARK\n"); 1845 return; 1846 } 1847 1848 rc = spdk_sock_map_insert(&g_map, placement_id, &group->base); 1849 if (rc != 0) { 1850 /* Not fatal */ 1851 SPDK_ERRLOG("Failed to insert sock group into map: %d\n", rc); 1852 return; 1853 } 1854 1855 sock->placement_id = placement_id; 1856 #endif 1857 } 1858 1859 static void 1860 posix_sock_update_mark(struct spdk_sock_group_impl *_group, struct spdk_sock *_sock) 1861 { 1862 struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group); 1863 1864 if (group->placement_id == -1) { 1865 group->placement_id = spdk_sock_map_find_free(&g_map); 1866 1867 /* If a free placement id is found, update existing sockets in this group */ 1868 if (group->placement_id != -1) { 1869 struct spdk_sock *sock, *tmp; 1870 1871 TAILQ_FOREACH_SAFE(sock, &_group->socks, link, tmp) { 1872 posix_sock_mark(group, __posix_sock(sock), group->placement_id); 1873 } 1874 } 1875 } 1876 1877 if (group->placement_id != -1) { 1878 /* 1879 * group placement id is already determined for this poll group. 1880 * Mark socket with group's placement id. 1881 */ 1882 posix_sock_mark(group, __posix_sock(_sock), group->placement_id); 1883 } 1884 } 1885 1886 static int 1887 posix_sock_group_impl_add_sock(struct spdk_sock_group_impl *_group, struct spdk_sock *_sock) 1888 { 1889 struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group); 1890 struct spdk_posix_sock *sock = __posix_sock(_sock); 1891 int rc; 1892 1893 #if defined(SPDK_EPOLL) 1894 struct epoll_event event; 1895 1896 memset(&event, 0, sizeof(event)); 1897 /* EPOLLERR is always on even if we don't set it, but be explicit for clarity */ 1898 event.events = EPOLLIN | EPOLLERR; 1899 event.data.ptr = sock; 1900 1901 rc = epoll_ctl(group->fd, EPOLL_CTL_ADD, sock->fd, &event); 1902 #elif defined(SPDK_KEVENT) 1903 struct kevent event; 1904 struct timespec ts = {0}; 1905 1906 EV_SET(&event, sock->fd, EVFILT_READ, EV_ADD, 0, 0, sock); 1907 1908 rc = kevent(group->fd, &event, 1, NULL, 0, &ts); 1909 #endif 1910 1911 if (rc != 0) { 1912 return rc; 1913 } 1914 1915 /* switched from another polling group due to scheduling */ 1916 if (spdk_unlikely(sock->recv_pipe != NULL && 1917 (spdk_pipe_reader_bytes_available(sock->recv_pipe) > 0))) { 1918 sock->pipe_has_data = true; 1919 sock->socket_has_data = false; 1920 TAILQ_INSERT_TAIL(&group->socks_with_data, sock, link); 1921 } else if (sock->recv_pipe != NULL) { 1922 rc = spdk_pipe_group_add(group->pipe_group, sock->recv_pipe); 1923 assert(rc == 0); 1924 } 1925 1926 if (_sock->impl_opts.enable_placement_id == PLACEMENT_MARK) { 1927 posix_sock_update_mark(_group, _sock); 1928 } else if (sock->placement_id != -1) { 1929 rc = spdk_sock_map_insert(&g_map, sock->placement_id, &group->base); 1930 if (rc != 0) { 1931 SPDK_ERRLOG("Failed to insert sock group into map: %d\n", rc); 1932 /* Do not treat this as an error. The system will continue running. */ 1933 } 1934 } 1935 1936 return rc; 1937 } 1938 1939 static int 1940 posix_sock_group_impl_remove_sock(struct spdk_sock_group_impl *_group, struct spdk_sock *_sock) 1941 { 1942 struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group); 1943 struct spdk_posix_sock *sock = __posix_sock(_sock); 1944 int rc; 1945 1946 if (sock->pipe_has_data || sock->socket_has_data) { 1947 TAILQ_REMOVE(&group->socks_with_data, sock, link); 1948 sock->pipe_has_data = false; 1949 sock->socket_has_data = false; 1950 } else if (sock->recv_pipe != NULL) { 1951 rc = spdk_pipe_group_remove(group->pipe_group, sock->recv_pipe); 1952 assert(rc == 0); 1953 } 1954 1955 if (sock->placement_id != -1) { 1956 spdk_sock_map_release(&g_map, sock->placement_id); 1957 } 1958 1959 #if defined(SPDK_EPOLL) 1960 struct epoll_event event; 1961 1962 /* Event parameter is ignored but some old kernel version still require it. */ 1963 rc = epoll_ctl(group->fd, EPOLL_CTL_DEL, sock->fd, &event); 1964 #elif defined(SPDK_KEVENT) 1965 struct kevent event; 1966 struct timespec ts = {0}; 1967 1968 EV_SET(&event, sock->fd, EVFILT_READ, EV_DELETE, 0, 0, NULL); 1969 1970 rc = kevent(group->fd, &event, 1, NULL, 0, &ts); 1971 if (rc == 0 && event.flags & EV_ERROR) { 1972 rc = -1; 1973 errno = event.data; 1974 } 1975 #endif 1976 1977 spdk_sock_abort_requests(_sock); 1978 1979 return rc; 1980 } 1981 1982 static int 1983 posix_sock_group_impl_poll(struct spdk_sock_group_impl *_group, int max_events, 1984 struct spdk_sock **socks) 1985 { 1986 struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group); 1987 struct spdk_sock *sock, *tmp; 1988 int num_events, i, rc; 1989 struct spdk_posix_sock *psock, *ptmp; 1990 #if defined(SPDK_EPOLL) 1991 struct epoll_event events[MAX_EVENTS_PER_POLL]; 1992 #elif defined(SPDK_KEVENT) 1993 struct kevent events[MAX_EVENTS_PER_POLL]; 1994 struct timespec ts = {0}; 1995 #endif 1996 1997 #ifdef SPDK_ZEROCOPY 1998 /* When all of the following conditions are met 1999 * - non-blocking socket 2000 * - zero copy is enabled 2001 * - interrupts suppressed (i.e. busy polling) 2002 * - the NIC tx queue is full at the time sendmsg() is called 2003 * - epoll_wait determines there is an EPOLLIN event for the socket 2004 * then we can get into a situation where data we've sent is queued 2005 * up in the kernel network stack, but interrupts have been suppressed 2006 * because other traffic is flowing so the kernel misses the signal 2007 * to flush the software tx queue. If there wasn't incoming data 2008 * pending on the socket, then epoll_wait would have been sufficient 2009 * to kick off the send operation, but since there is a pending event 2010 * epoll_wait does not trigger the necessary operation. 2011 * 2012 * We deal with this by checking for all of the above conditions and 2013 * additionally looking for EPOLLIN events that were not consumed from 2014 * the last poll loop. We take this to mean that the upper layer is 2015 * unable to consume them because it is blocked waiting for resources 2016 * to free up, and those resources are most likely freed in response 2017 * to a pending asynchronous write completing. 2018 * 2019 * Additionally, sockets that have the same placement_id actually share 2020 * an underlying hardware queue. That means polling one of them is 2021 * equivalent to polling all of them. As a quick mechanism to avoid 2022 * making extra poll() calls, stash the last placement_id during the loop 2023 * and only poll if it's not the same. The overwhelmingly common case 2024 * is that all sockets in this list have the same placement_id because 2025 * SPDK is intentionally grouping sockets by that value, so even 2026 * though this won't stop all extra calls to poll(), it's very fast 2027 * and will catch all of them in practice. 2028 */ 2029 int last_placement_id = -1; 2030 2031 TAILQ_FOREACH(psock, &group->socks_with_data, link) { 2032 if (psock->zcopy && psock->placement_id >= 0 && 2033 psock->placement_id != last_placement_id) { 2034 struct pollfd pfd = {psock->fd, POLLIN | POLLERR, 0}; 2035 2036 poll(&pfd, 1, 0); 2037 last_placement_id = psock->placement_id; 2038 } 2039 } 2040 #endif 2041 2042 /* This must be a TAILQ_FOREACH_SAFE because while flushing, 2043 * a completion callback could remove the sock from the 2044 * group. */ 2045 TAILQ_FOREACH_SAFE(sock, &_group->socks, link, tmp) { 2046 rc = _sock_flush(sock); 2047 if (rc < 0 && errno != EAGAIN) { 2048 spdk_sock_abort_requests(sock); 2049 } 2050 } 2051 2052 assert(max_events > 0); 2053 2054 #if defined(SPDK_EPOLL) 2055 num_events = epoll_wait(group->fd, events, max_events, 0); 2056 #elif defined(SPDK_KEVENT) 2057 num_events = kevent(group->fd, NULL, 0, events, max_events, &ts); 2058 #endif 2059 2060 if (num_events == -1) { 2061 return -1; 2062 } else if (num_events == 0 && !TAILQ_EMPTY(&_group->socks)) { 2063 sock = TAILQ_FIRST(&_group->socks); 2064 psock = __posix_sock(sock); 2065 /* poll() is called here to busy poll the queue associated with 2066 * first socket in list and potentially reap incoming data. 2067 */ 2068 if (sock->opts.priority) { 2069 struct pollfd pfd = {0, 0, 0}; 2070 2071 pfd.fd = psock->fd; 2072 pfd.events = POLLIN | POLLERR; 2073 poll(&pfd, 1, 0); 2074 } 2075 } 2076 2077 for (i = 0; i < num_events; i++) { 2078 #if defined(SPDK_EPOLL) 2079 sock = events[i].data.ptr; 2080 psock = __posix_sock(sock); 2081 2082 #ifdef SPDK_ZEROCOPY 2083 if (events[i].events & EPOLLERR) { 2084 rc = _sock_check_zcopy(sock); 2085 /* If the socket was closed or removed from 2086 * the group in response to a send ack, don't 2087 * add it to the array here. */ 2088 if (rc || sock->cb_fn == NULL) { 2089 continue; 2090 } 2091 } 2092 #endif 2093 if ((events[i].events & EPOLLIN) == 0) { 2094 continue; 2095 } 2096 2097 #elif defined(SPDK_KEVENT) 2098 sock = events[i].udata; 2099 psock = __posix_sock(sock); 2100 #endif 2101 2102 /* If the socket is not already in the list, add it now */ 2103 if (!psock->socket_has_data && !psock->pipe_has_data) { 2104 TAILQ_INSERT_TAIL(&group->socks_with_data, psock, link); 2105 } 2106 psock->socket_has_data = true; 2107 } 2108 2109 num_events = 0; 2110 2111 TAILQ_FOREACH_SAFE(psock, &group->socks_with_data, link, ptmp) { 2112 if (num_events == max_events) { 2113 break; 2114 } 2115 2116 /* If the socket's cb_fn is NULL, just remove it from the 2117 * list and do not add it to socks array */ 2118 if (spdk_unlikely(psock->base.cb_fn == NULL)) { 2119 psock->socket_has_data = false; 2120 psock->pipe_has_data = false; 2121 TAILQ_REMOVE(&group->socks_with_data, psock, link); 2122 continue; 2123 } 2124 2125 socks[num_events++] = &psock->base; 2126 } 2127 2128 /* Cycle the has_data list so that each time we poll things aren't 2129 * in the same order. Say we have 6 sockets in the list, named as follows: 2130 * A B C D E F 2131 * And all 6 sockets had epoll events, but max_events is only 3. That means 2132 * psock currently points at D. We want to rearrange the list to the following: 2133 * D E F A B C 2134 * 2135 * The variables below are named according to this example to make it easier to 2136 * follow the swaps. 2137 */ 2138 if (psock != NULL) { 2139 struct spdk_posix_sock *pa, *pc, *pd, *pf; 2140 2141 /* Capture pointers to the elements we need */ 2142 pd = psock; 2143 pc = TAILQ_PREV(pd, spdk_has_data_list, link); 2144 pa = TAILQ_FIRST(&group->socks_with_data); 2145 pf = TAILQ_LAST(&group->socks_with_data, spdk_has_data_list); 2146 2147 /* Break the link between C and D */ 2148 pc->link.tqe_next = NULL; 2149 2150 /* Connect F to A */ 2151 pf->link.tqe_next = pa; 2152 pa->link.tqe_prev = &pf->link.tqe_next; 2153 2154 /* Fix up the list first/last pointers */ 2155 group->socks_with_data.tqh_first = pd; 2156 group->socks_with_data.tqh_last = &pc->link.tqe_next; 2157 2158 /* D is in front of the list, make tqe prev pointer point to the head of list */ 2159 pd->link.tqe_prev = &group->socks_with_data.tqh_first; 2160 } 2161 2162 return num_events; 2163 } 2164 2165 static int 2166 _sock_group_impl_close(struct spdk_sock_group_impl *_group, uint32_t enable_placement_id) 2167 { 2168 struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group); 2169 int rc; 2170 2171 if (enable_placement_id == PLACEMENT_CPU) { 2172 spdk_sock_map_release(&g_map, spdk_env_get_current_core()); 2173 } 2174 2175 spdk_pipe_group_destroy(group->pipe_group); 2176 rc = close(group->fd); 2177 free(group); 2178 return rc; 2179 } 2180 2181 static int 2182 posix_sock_group_impl_close(struct spdk_sock_group_impl *_group) 2183 { 2184 return _sock_group_impl_close(_group, g_posix_impl_opts.enable_placement_id); 2185 } 2186 2187 static int 2188 ssl_sock_group_impl_close(struct spdk_sock_group_impl *_group) 2189 { 2190 return _sock_group_impl_close(_group, g_ssl_impl_opts.enable_placement_id); 2191 } 2192 2193 static struct spdk_net_impl g_posix_net_impl = { 2194 .name = "posix", 2195 .getaddr = posix_sock_getaddr, 2196 .connect = posix_sock_connect, 2197 .listen = posix_sock_listen, 2198 .accept = posix_sock_accept, 2199 .close = posix_sock_close, 2200 .recv = posix_sock_recv, 2201 .readv = posix_sock_readv, 2202 .writev = posix_sock_writev, 2203 .recv_next = posix_sock_recv_next, 2204 .writev_async = posix_sock_writev_async, 2205 .flush = posix_sock_flush, 2206 .set_recvlowat = posix_sock_set_recvlowat, 2207 .set_recvbuf = posix_sock_set_recvbuf, 2208 .set_sendbuf = posix_sock_set_sendbuf, 2209 .is_ipv6 = posix_sock_is_ipv6, 2210 .is_ipv4 = posix_sock_is_ipv4, 2211 .is_connected = posix_sock_is_connected, 2212 .group_impl_get_optimal = posix_sock_group_impl_get_optimal, 2213 .group_impl_create = posix_sock_group_impl_create, 2214 .group_impl_add_sock = posix_sock_group_impl_add_sock, 2215 .group_impl_remove_sock = posix_sock_group_impl_remove_sock, 2216 .group_impl_poll = posix_sock_group_impl_poll, 2217 .group_impl_close = posix_sock_group_impl_close, 2218 .get_opts = posix_sock_impl_get_opts, 2219 .set_opts = posix_sock_impl_set_opts, 2220 }; 2221 2222 SPDK_NET_IMPL_REGISTER(posix, &g_posix_net_impl, DEFAULT_SOCK_PRIORITY + 1); 2223 2224 static struct spdk_sock * 2225 ssl_sock_listen(const char *ip, int port, struct spdk_sock_opts *opts) 2226 { 2227 return posix_sock_create(ip, port, SPDK_SOCK_CREATE_LISTEN, opts, true); 2228 } 2229 2230 static struct spdk_sock * 2231 ssl_sock_connect(const char *ip, int port, struct spdk_sock_opts *opts) 2232 { 2233 return posix_sock_create(ip, port, SPDK_SOCK_CREATE_CONNECT, opts, true); 2234 } 2235 2236 static struct spdk_sock * 2237 ssl_sock_accept(struct spdk_sock *_sock) 2238 { 2239 return _posix_sock_accept(_sock, true); 2240 } 2241 2242 static struct spdk_net_impl g_ssl_net_impl = { 2243 .name = "ssl", 2244 .getaddr = posix_sock_getaddr, 2245 .connect = ssl_sock_connect, 2246 .listen = ssl_sock_listen, 2247 .accept = ssl_sock_accept, 2248 .close = posix_sock_close, 2249 .recv = posix_sock_recv, 2250 .readv = posix_sock_readv, 2251 .writev = posix_sock_writev, 2252 .recv_next = posix_sock_recv_next, 2253 .writev_async = posix_sock_writev_async, 2254 .flush = posix_sock_flush, 2255 .set_recvlowat = posix_sock_set_recvlowat, 2256 .set_recvbuf = posix_sock_set_recvbuf, 2257 .set_sendbuf = posix_sock_set_sendbuf, 2258 .is_ipv6 = posix_sock_is_ipv6, 2259 .is_ipv4 = posix_sock_is_ipv4, 2260 .is_connected = posix_sock_is_connected, 2261 .group_impl_get_optimal = posix_sock_group_impl_get_optimal, 2262 .group_impl_create = ssl_sock_group_impl_create, 2263 .group_impl_add_sock = posix_sock_group_impl_add_sock, 2264 .group_impl_remove_sock = posix_sock_group_impl_remove_sock, 2265 .group_impl_poll = posix_sock_group_impl_poll, 2266 .group_impl_close = ssl_sock_group_impl_close, 2267 .get_opts = ssl_sock_impl_get_opts, 2268 .set_opts = ssl_sock_impl_set_opts, 2269 }; 2270 2271 SPDK_NET_IMPL_REGISTER(ssl, &g_ssl_net_impl, DEFAULT_SOCK_PRIORITY); 2272 SPDK_LOG_REGISTER_COMPONENT(sock_posix) 2273