1 /* SPDX-License-Identifier: BSD-3-Clause 2 * Copyright (C) 2018 Intel Corporation. All rights reserved. 3 * Copyright (c) 2020, 2021 Mellanox Technologies LTD. All rights reserved. 4 * Copyright (c) 2021 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 5 */ 6 7 #include "spdk/stdinc.h" 8 9 #if defined(__FreeBSD__) 10 #include <sys/event.h> 11 #define SPDK_KEVENT 12 #else 13 #define SPDK_EPOLL 14 #endif 15 16 #if defined(__linux__) 17 #include <linux/errqueue.h> 18 #endif 19 20 #include "spdk/env.h" 21 #include "spdk/log.h" 22 #include "spdk/pipe.h" 23 #include "spdk/sock.h" 24 #include "spdk/util.h" 25 #include "spdk/string.h" 26 #include "spdk/net.h" 27 #include "spdk/file.h" 28 #include "spdk_internal/sock.h" 29 #include "spdk/net.h" 30 31 #include "openssl/crypto.h" 32 #include "openssl/err.h" 33 #include "openssl/ssl.h" 34 35 #define MAX_TMPBUF 1024 36 #define PORTNUMLEN 32 37 38 #if defined(SO_ZEROCOPY) && defined(MSG_ZEROCOPY) 39 #define SPDK_ZEROCOPY 40 #endif 41 42 struct spdk_posix_sock { 43 struct spdk_sock base; 44 int fd; 45 46 uint32_t sendmsg_idx; 47 48 struct spdk_pipe *recv_pipe; 49 int recv_buf_sz; 50 bool pipe_has_data; 51 bool socket_has_data; 52 bool zcopy; 53 54 int placement_id; 55 56 SSL_CTX *ctx; 57 SSL *ssl; 58 59 TAILQ_ENTRY(spdk_posix_sock) link; 60 61 char interface_name[IFNAMSIZ]; 62 }; 63 64 TAILQ_HEAD(spdk_has_data_list, spdk_posix_sock); 65 66 struct spdk_posix_sock_group_impl { 67 struct spdk_sock_group_impl base; 68 int fd; 69 struct spdk_interrupt *intr; 70 struct spdk_has_data_list socks_with_data; 71 int placement_id; 72 struct spdk_pipe_group *pipe_group; 73 }; 74 75 static struct spdk_sock_impl_opts g_posix_impl_opts = { 76 .recv_buf_size = DEFAULT_SO_RCVBUF_SIZE, 77 .send_buf_size = DEFAULT_SO_SNDBUF_SIZE, 78 .enable_recv_pipe = true, 79 .enable_quickack = false, 80 .enable_placement_id = PLACEMENT_NONE, 81 .enable_zerocopy_send_server = true, 82 .enable_zerocopy_send_client = false, 83 .zerocopy_threshold = 0, 84 .tls_version = 0, 85 .enable_ktls = false, 86 .psk_key = NULL, 87 .psk_key_size = 0, 88 .psk_identity = NULL, 89 .get_key = NULL, 90 .get_key_ctx = NULL, 91 .tls_cipher_suites = NULL 92 }; 93 94 static struct spdk_sock_impl_opts g_ssl_impl_opts = { 95 .recv_buf_size = MIN_SO_RCVBUF_SIZE, 96 .send_buf_size = MIN_SO_SNDBUF_SIZE, 97 .enable_recv_pipe = true, 98 .enable_quickack = false, 99 .enable_placement_id = PLACEMENT_NONE, 100 .enable_zerocopy_send_server = true, 101 .enable_zerocopy_send_client = false, 102 .zerocopy_threshold = 0, 103 .tls_version = 0, 104 .enable_ktls = false, 105 .psk_key = NULL, 106 .psk_identity = NULL 107 }; 108 109 static struct spdk_sock_map g_map = { 110 .entries = STAILQ_HEAD_INITIALIZER(g_map.entries), 111 .mtx = PTHREAD_MUTEX_INITIALIZER 112 }; 113 114 __attribute((destructor)) static void 115 posix_sock_map_cleanup(void) 116 { 117 spdk_sock_map_cleanup(&g_map); 118 } 119 120 #define __posix_sock(sock) (struct spdk_posix_sock *)sock 121 #define __posix_group_impl(group) (struct spdk_posix_sock_group_impl *)group 122 123 static void 124 posix_sock_copy_impl_opts(struct spdk_sock_impl_opts *dest, const struct spdk_sock_impl_opts *src, 125 size_t len) 126 { 127 #define FIELD_OK(field) \ 128 offsetof(struct spdk_sock_impl_opts, field) + sizeof(src->field) <= len 129 130 #define SET_FIELD(field) \ 131 if (FIELD_OK(field)) { \ 132 dest->field = src->field; \ 133 } 134 135 SET_FIELD(recv_buf_size); 136 SET_FIELD(send_buf_size); 137 SET_FIELD(enable_recv_pipe); 138 SET_FIELD(enable_zerocopy_send); 139 SET_FIELD(enable_quickack); 140 SET_FIELD(enable_placement_id); 141 SET_FIELD(enable_zerocopy_send_server); 142 SET_FIELD(enable_zerocopy_send_client); 143 SET_FIELD(zerocopy_threshold); 144 SET_FIELD(tls_version); 145 SET_FIELD(enable_ktls); 146 SET_FIELD(psk_key); 147 SET_FIELD(psk_key_size); 148 SET_FIELD(psk_identity); 149 SET_FIELD(get_key); 150 SET_FIELD(get_key_ctx); 151 SET_FIELD(tls_cipher_suites); 152 153 #undef SET_FIELD 154 #undef FIELD_OK 155 } 156 157 static int 158 _sock_impl_get_opts(struct spdk_sock_impl_opts *opts, struct spdk_sock_impl_opts *impl_opts, 159 size_t *len) 160 { 161 if (!opts || !len) { 162 errno = EINVAL; 163 return -1; 164 } 165 166 assert(sizeof(*opts) >= *len); 167 memset(opts, 0, *len); 168 169 posix_sock_copy_impl_opts(opts, impl_opts, *len); 170 *len = spdk_min(*len, sizeof(*impl_opts)); 171 172 return 0; 173 } 174 175 static int 176 posix_sock_impl_get_opts(struct spdk_sock_impl_opts *opts, size_t *len) 177 { 178 return _sock_impl_get_opts(opts, &g_posix_impl_opts, len); 179 } 180 181 static int 182 ssl_sock_impl_get_opts(struct spdk_sock_impl_opts *opts, size_t *len) 183 { 184 return _sock_impl_get_opts(opts, &g_ssl_impl_opts, len); 185 } 186 187 static int 188 _sock_impl_set_opts(const struct spdk_sock_impl_opts *opts, struct spdk_sock_impl_opts *impl_opts, 189 size_t len) 190 { 191 if (!opts) { 192 errno = EINVAL; 193 return -1; 194 } 195 196 assert(sizeof(*opts) >= len); 197 posix_sock_copy_impl_opts(impl_opts, opts, len); 198 199 return 0; 200 } 201 202 static int 203 posix_sock_impl_set_opts(const struct spdk_sock_impl_opts *opts, size_t len) 204 { 205 return _sock_impl_set_opts(opts, &g_posix_impl_opts, len); 206 } 207 208 static int 209 ssl_sock_impl_set_opts(const struct spdk_sock_impl_opts *opts, size_t len) 210 { 211 return _sock_impl_set_opts(opts, &g_ssl_impl_opts, len); 212 } 213 214 static void 215 _opts_get_impl_opts(const struct spdk_sock_opts *opts, struct spdk_sock_impl_opts *dest, 216 const struct spdk_sock_impl_opts *default_impl) 217 { 218 /* Copy the default impl_opts first to cover cases when user's impl_opts is smaller */ 219 memcpy(dest, default_impl, sizeof(*dest)); 220 221 if (opts->impl_opts != NULL) { 222 assert(sizeof(*dest) >= opts->impl_opts_size); 223 posix_sock_copy_impl_opts(dest, opts->impl_opts, opts->impl_opts_size); 224 } 225 } 226 227 static int 228 posix_sock_getaddr(struct spdk_sock *_sock, char *saddr, int slen, uint16_t *sport, 229 char *caddr, int clen, uint16_t *cport) 230 { 231 struct spdk_posix_sock *sock = __posix_sock(_sock); 232 233 assert(sock != NULL); 234 return spdk_net_getaddr(sock->fd, saddr, slen, sport, caddr, clen, cport); 235 } 236 237 static const char * 238 posix_sock_get_interface_name(struct spdk_sock *_sock) 239 { 240 struct spdk_posix_sock *sock = __posix_sock(_sock); 241 char saddr[64]; 242 int rc; 243 244 rc = spdk_net_getaddr(sock->fd, saddr, sizeof(saddr), NULL, NULL, 0, NULL); 245 if (rc != 0) { 246 return NULL; 247 } 248 249 rc = spdk_net_get_interface_name(saddr, sock->interface_name, 250 sizeof(sock->interface_name)); 251 if (rc != 0) { 252 return NULL; 253 } 254 255 return sock->interface_name; 256 } 257 258 static int32_t 259 posix_sock_get_numa_id(struct spdk_sock *sock) 260 { 261 const char *interface_name; 262 uint32_t numa_id; 263 int rc; 264 265 interface_name = posix_sock_get_interface_name(sock); 266 if (interface_name == NULL) { 267 return SPDK_ENV_NUMA_ID_ANY; 268 } 269 270 rc = spdk_read_sysfs_attribute_uint32(&numa_id, 271 "/sys/class/net/%s/device/numa_node", interface_name); 272 if (rc == 0 && numa_id <= INT32_MAX) { 273 return (int32_t)numa_id; 274 } else { 275 return SPDK_ENV_NUMA_ID_ANY; 276 } 277 } 278 279 enum posix_sock_create_type { 280 SPDK_SOCK_CREATE_LISTEN, 281 SPDK_SOCK_CREATE_CONNECT, 282 }; 283 284 static int 285 posix_sock_alloc_pipe(struct spdk_posix_sock *sock, int sz) 286 { 287 uint8_t *new_buf, *old_buf; 288 struct spdk_pipe *new_pipe; 289 struct iovec siov[2]; 290 struct iovec diov[2]; 291 int sbytes; 292 ssize_t bytes; 293 int rc; 294 295 if (sock->recv_buf_sz == sz) { 296 return 0; 297 } 298 299 /* If the new size is 0, just free the pipe */ 300 if (sz == 0) { 301 old_buf = spdk_pipe_destroy(sock->recv_pipe); 302 free(old_buf); 303 sock->recv_pipe = NULL; 304 return 0; 305 } else if (sz < MIN_SOCK_PIPE_SIZE) { 306 SPDK_ERRLOG("The size of the pipe must be larger than %d\n", MIN_SOCK_PIPE_SIZE); 307 return -1; 308 } 309 310 /* Round up to next 64 byte multiple */ 311 rc = posix_memalign((void **)&new_buf, 64, sz); 312 if (rc != 0) { 313 SPDK_ERRLOG("socket recv buf allocation failed\n"); 314 return -ENOMEM; 315 } 316 memset(new_buf, 0, sz); 317 318 new_pipe = spdk_pipe_create(new_buf, sz); 319 if (new_pipe == NULL) { 320 SPDK_ERRLOG("socket pipe allocation failed\n"); 321 free(new_buf); 322 return -ENOMEM; 323 } 324 325 if (sock->recv_pipe != NULL) { 326 /* Pull all of the data out of the old pipe */ 327 sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov); 328 if (sbytes > sz) { 329 /* Too much data to fit into the new pipe size */ 330 old_buf = spdk_pipe_destroy(new_pipe); 331 free(old_buf); 332 return -EINVAL; 333 } 334 335 sbytes = spdk_pipe_writer_get_buffer(new_pipe, sz, diov); 336 assert(sbytes == sz); 337 338 bytes = spdk_iovcpy(siov, 2, diov, 2); 339 spdk_pipe_writer_advance(new_pipe, bytes); 340 341 old_buf = spdk_pipe_destroy(sock->recv_pipe); 342 free(old_buf); 343 } 344 345 sock->recv_buf_sz = sz; 346 sock->recv_pipe = new_pipe; 347 348 if (sock->base.group_impl) { 349 struct spdk_posix_sock_group_impl *group; 350 351 group = __posix_group_impl(sock->base.group_impl); 352 spdk_pipe_group_add(group->pipe_group, sock->recv_pipe); 353 } 354 355 return 0; 356 } 357 358 static int 359 posix_sock_set_recvbuf(struct spdk_sock *_sock, int sz) 360 { 361 struct spdk_posix_sock *sock = __posix_sock(_sock); 362 int min_size; 363 int rc; 364 365 assert(sock != NULL); 366 367 if (_sock->impl_opts.enable_recv_pipe) { 368 rc = posix_sock_alloc_pipe(sock, sz); 369 if (rc) { 370 return rc; 371 } 372 } 373 374 /* Set kernel buffer size to be at least MIN_SO_RCVBUF_SIZE and 375 * _sock->impl_opts.recv_buf_size. */ 376 min_size = spdk_max(MIN_SO_RCVBUF_SIZE, _sock->impl_opts.recv_buf_size); 377 378 if (sz < min_size) { 379 sz = min_size; 380 } 381 382 rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVBUF, &sz, sizeof(sz)); 383 if (rc < 0) { 384 return rc; 385 } 386 387 _sock->impl_opts.recv_buf_size = sz; 388 389 return 0; 390 } 391 392 static int 393 posix_sock_set_sendbuf(struct spdk_sock *_sock, int sz) 394 { 395 struct spdk_posix_sock *sock = __posix_sock(_sock); 396 int min_size; 397 int rc; 398 399 assert(sock != NULL); 400 401 /* Set kernel buffer size to be at least MIN_SO_SNDBUF_SIZE and 402 * _sock->impl_opts.send_buf_size. */ 403 min_size = spdk_max(MIN_SO_SNDBUF_SIZE, _sock->impl_opts.send_buf_size); 404 405 if (sz < min_size) { 406 sz = min_size; 407 } 408 409 rc = setsockopt(sock->fd, SOL_SOCKET, SO_SNDBUF, &sz, sizeof(sz)); 410 if (rc < 0) { 411 return rc; 412 } 413 414 _sock->impl_opts.send_buf_size = sz; 415 416 return 0; 417 } 418 419 static void 420 posix_sock_init(struct spdk_posix_sock *sock, bool enable_zero_copy) 421 { 422 #if defined(SPDK_ZEROCOPY) || defined(__linux__) 423 int flag; 424 int rc; 425 #endif 426 427 #if defined(SPDK_ZEROCOPY) 428 flag = 1; 429 430 if (enable_zero_copy) { 431 /* Try to turn on zero copy sends */ 432 rc = setsockopt(sock->fd, SOL_SOCKET, SO_ZEROCOPY, &flag, sizeof(flag)); 433 if (rc == 0) { 434 sock->zcopy = true; 435 } 436 } 437 #endif 438 439 #if defined(__linux__) 440 flag = 1; 441 442 if (sock->base.impl_opts.enable_quickack) { 443 rc = setsockopt(sock->fd, IPPROTO_TCP, TCP_QUICKACK, &flag, sizeof(flag)); 444 if (rc != 0) { 445 SPDK_ERRLOG("quickack was failed to set\n"); 446 } 447 } 448 449 spdk_sock_get_placement_id(sock->fd, sock->base.impl_opts.enable_placement_id, 450 &sock->placement_id); 451 452 if (sock->base.impl_opts.enable_placement_id == PLACEMENT_MARK) { 453 /* Save placement_id */ 454 spdk_sock_map_insert(&g_map, sock->placement_id, NULL); 455 } 456 #endif 457 } 458 459 static struct spdk_posix_sock * 460 posix_sock_alloc(int fd, struct spdk_sock_impl_opts *impl_opts, bool enable_zero_copy) 461 { 462 struct spdk_posix_sock *sock; 463 464 sock = calloc(1, sizeof(*sock)); 465 if (sock == NULL) { 466 SPDK_ERRLOG("sock allocation failed\n"); 467 return NULL; 468 } 469 470 sock->fd = fd; 471 memcpy(&sock->base.impl_opts, impl_opts, sizeof(*impl_opts)); 472 posix_sock_init(sock, enable_zero_copy); 473 474 return sock; 475 } 476 477 static int 478 posix_fd_create(struct addrinfo *res, struct spdk_sock_opts *opts, 479 struct spdk_sock_impl_opts *impl_opts) 480 { 481 int fd; 482 int val = 1; 483 int rc, sz; 484 #if defined(__linux__) 485 int to; 486 #endif 487 488 fd = socket(res->ai_family, res->ai_socktype, res->ai_protocol); 489 if (fd < 0) { 490 /* error */ 491 return -1; 492 } 493 494 sz = impl_opts->recv_buf_size; 495 rc = setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &sz, sizeof(sz)); 496 if (rc) { 497 /* Not fatal */ 498 } 499 500 sz = impl_opts->send_buf_size; 501 rc = setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &sz, sizeof(sz)); 502 if (rc) { 503 /* Not fatal */ 504 } 505 506 rc = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof val); 507 if (rc != 0) { 508 close(fd); 509 /* error */ 510 return -1; 511 } 512 rc = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof val); 513 if (rc != 0) { 514 close(fd); 515 /* error */ 516 return -1; 517 } 518 519 #if defined(SO_PRIORITY) 520 if (opts->priority) { 521 rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &opts->priority, sizeof val); 522 if (rc != 0) { 523 close(fd); 524 /* error */ 525 return -1; 526 } 527 } 528 #endif 529 530 if (res->ai_family == AF_INET6) { 531 rc = setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &val, sizeof val); 532 if (rc != 0) { 533 close(fd); 534 /* error */ 535 return -1; 536 } 537 } 538 539 if (opts->ack_timeout) { 540 #if defined(__linux__) 541 to = opts->ack_timeout; 542 rc = setsockopt(fd, IPPROTO_TCP, TCP_USER_TIMEOUT, &to, sizeof(to)); 543 if (rc != 0) { 544 close(fd); 545 /* error */ 546 return -1; 547 } 548 #else 549 SPDK_WARNLOG("TCP_USER_TIMEOUT is not supported.\n"); 550 #endif 551 } 552 553 return fd; 554 } 555 556 static int 557 posix_sock_psk_find_session_server_cb(SSL *ssl, const unsigned char *identity, 558 size_t identity_len, SSL_SESSION **sess) 559 { 560 struct spdk_sock_impl_opts *impl_opts = SSL_get_app_data(ssl); 561 uint8_t key[SSL_MAX_MASTER_KEY_LENGTH] = {}; 562 int keylen; 563 int rc, i; 564 STACK_OF(SSL_CIPHER) *ciphers; 565 const SSL_CIPHER *cipher; 566 const char *cipher_name; 567 const char *user_cipher = NULL; 568 bool found = false; 569 570 if (impl_opts->get_key) { 571 rc = impl_opts->get_key(key, sizeof(key), &user_cipher, identity, impl_opts->get_key_ctx); 572 if (rc < 0) { 573 SPDK_ERRLOG("Unable to find PSK for identity: %s\n", identity); 574 return 0; 575 } 576 keylen = rc; 577 } else { 578 if (impl_opts->psk_key == NULL) { 579 SPDK_ERRLOG("PSK is not set\n"); 580 return 0; 581 } 582 583 SPDK_DEBUGLOG(sock_posix, "Length of Client's PSK ID %lu\n", strlen(impl_opts->psk_identity)); 584 if (strcmp(impl_opts->psk_identity, identity) != 0) { 585 SPDK_ERRLOG("Unknown Client's PSK ID\n"); 586 return 0; 587 } 588 keylen = impl_opts->psk_key_size; 589 590 memcpy(key, impl_opts->psk_key, keylen); 591 user_cipher = impl_opts->tls_cipher_suites; 592 } 593 594 if (user_cipher == NULL) { 595 SPDK_ERRLOG("Cipher suite not set\n"); 596 return 0; 597 } 598 599 *sess = SSL_SESSION_new(); 600 if (*sess == NULL) { 601 SPDK_ERRLOG("Unable to allocate new SSL session\n"); 602 return 0; 603 } 604 605 ciphers = SSL_get_ciphers(ssl); 606 for (i = 0; i < sk_SSL_CIPHER_num(ciphers); i++) { 607 cipher = sk_SSL_CIPHER_value(ciphers, i); 608 cipher_name = SSL_CIPHER_get_name(cipher); 609 610 if (strcmp(user_cipher, cipher_name) == 0) { 611 rc = SSL_SESSION_set_cipher(*sess, cipher); 612 if (rc != 1) { 613 SPDK_ERRLOG("Unable to set cipher: %s\n", cipher_name); 614 goto err; 615 } 616 found = true; 617 break; 618 } 619 } 620 if (found == false) { 621 SPDK_ERRLOG("No suitable cipher found\n"); 622 goto err; 623 } 624 625 SPDK_DEBUGLOG(sock_posix, "Cipher selected: %s\n", cipher_name); 626 627 rc = SSL_SESSION_set_protocol_version(*sess, TLS1_3_VERSION); 628 if (rc != 1) { 629 SPDK_ERRLOG("Unable to set TLS version: %d\n", TLS1_3_VERSION); 630 goto err; 631 } 632 633 rc = SSL_SESSION_set1_master_key(*sess, key, keylen); 634 if (rc != 1) { 635 SPDK_ERRLOG("Unable to set PSK for session\n"); 636 goto err; 637 } 638 639 return 1; 640 641 err: 642 SSL_SESSION_free(*sess); 643 *sess = NULL; 644 return 0; 645 } 646 647 static int 648 posix_sock_psk_use_session_client_cb(SSL *ssl, const EVP_MD *md, const unsigned char **identity, 649 size_t *identity_len, SSL_SESSION **sess) 650 { 651 struct spdk_sock_impl_opts *impl_opts = SSL_get_app_data(ssl); 652 int rc, i; 653 STACK_OF(SSL_CIPHER) *ciphers; 654 const SSL_CIPHER *cipher; 655 const char *cipher_name; 656 long keylen; 657 bool found = false; 658 659 if (impl_opts->psk_key == NULL) { 660 SPDK_ERRLOG("PSK is not set\n"); 661 return 0; 662 } 663 if (impl_opts->psk_key_size > SSL_MAX_MASTER_KEY_LENGTH) { 664 SPDK_ERRLOG("PSK too long\n"); 665 return 0; 666 } 667 keylen = impl_opts->psk_key_size; 668 669 if (impl_opts->tls_cipher_suites == NULL) { 670 SPDK_ERRLOG("Cipher suite not set\n"); 671 return 0; 672 } 673 *sess = SSL_SESSION_new(); 674 if (*sess == NULL) { 675 SPDK_ERRLOG("Unable to allocate new SSL session\n"); 676 return 0; 677 } 678 679 ciphers = SSL_get_ciphers(ssl); 680 for (i = 0; i < sk_SSL_CIPHER_num(ciphers); i++) { 681 cipher = sk_SSL_CIPHER_value(ciphers, i); 682 cipher_name = SSL_CIPHER_get_name(cipher); 683 684 if (strcmp(impl_opts->tls_cipher_suites, cipher_name) == 0) { 685 rc = SSL_SESSION_set_cipher(*sess, cipher); 686 if (rc != 1) { 687 SPDK_ERRLOG("Unable to set cipher: %s\n", cipher_name); 688 goto err; 689 } 690 found = true; 691 break; 692 } 693 } 694 if (found == false) { 695 SPDK_ERRLOG("No suitable cipher found\n"); 696 goto err; 697 } 698 699 SPDK_DEBUGLOG(sock_posix, "Cipher selected: %s\n", cipher_name); 700 701 rc = SSL_SESSION_set_protocol_version(*sess, TLS1_3_VERSION); 702 if (rc != 1) { 703 SPDK_ERRLOG("Unable to set TLS version: %d\n", TLS1_3_VERSION); 704 goto err; 705 } 706 707 rc = SSL_SESSION_set1_master_key(*sess, impl_opts->psk_key, keylen); 708 if (rc != 1) { 709 SPDK_ERRLOG("Unable to set PSK for session\n"); 710 goto err; 711 } 712 713 *identity_len = strlen(impl_opts->psk_identity); 714 *identity = impl_opts->psk_identity; 715 716 return 1; 717 718 err: 719 SSL_SESSION_free(*sess); 720 *sess = NULL; 721 return 0; 722 } 723 724 static SSL_CTX * 725 posix_sock_create_ssl_context(const SSL_METHOD *method, struct spdk_sock_opts *opts, 726 struct spdk_sock_impl_opts *impl_opts) 727 { 728 SSL_CTX *ctx; 729 int tls_version = 0; 730 bool ktls_enabled = false; 731 #ifdef SSL_OP_ENABLE_KTLS 732 long options; 733 #endif 734 735 SSL_library_init(); 736 OpenSSL_add_all_algorithms(); 737 SSL_load_error_strings(); 738 /* Produce a SSL CTX in SSL V2 and V3 standards compliant way */ 739 ctx = SSL_CTX_new(method); 740 if (!ctx) { 741 SPDK_ERRLOG("SSL_CTX_new() failed, msg = %s\n", ERR_error_string(ERR_peek_last_error(), NULL)); 742 return NULL; 743 } 744 SPDK_DEBUGLOG(sock_posix, "SSL context created\n"); 745 746 switch (impl_opts->tls_version) { 747 case 0: 748 /* auto-negotiation */ 749 break; 750 case SPDK_TLS_VERSION_1_3: 751 tls_version = TLS1_3_VERSION; 752 break; 753 default: 754 SPDK_ERRLOG("Incorrect TLS version provided: %d\n", impl_opts->tls_version); 755 goto err; 756 } 757 758 if (tls_version) { 759 SPDK_DEBUGLOG(sock_posix, "Hardening TLS version to '%d'='0x%X'\n", impl_opts->tls_version, 760 tls_version); 761 if (!SSL_CTX_set_min_proto_version(ctx, tls_version)) { 762 SPDK_ERRLOG("Unable to set Min TLS version to '%d'='0x%X\n", impl_opts->tls_version, tls_version); 763 goto err; 764 } 765 if (!SSL_CTX_set_max_proto_version(ctx, tls_version)) { 766 SPDK_ERRLOG("Unable to set Max TLS version to '%d'='0x%X\n", impl_opts->tls_version, tls_version); 767 goto err; 768 } 769 } 770 if (impl_opts->enable_ktls) { 771 SPDK_DEBUGLOG(sock_posix, "Enabling kTLS offload\n"); 772 #ifdef SSL_OP_ENABLE_KTLS 773 options = SSL_CTX_set_options(ctx, SSL_OP_ENABLE_KTLS); 774 ktls_enabled = options & SSL_OP_ENABLE_KTLS; 775 #else 776 ktls_enabled = false; 777 #endif 778 if (!ktls_enabled) { 779 SPDK_ERRLOG("Unable to set kTLS offload via SSL_CTX_set_options(). Configure openssl with 'enable-ktls'\n"); 780 goto err; 781 } 782 } 783 784 /* SSL_CTX_set_ciphersuites() return 1 if the requested 785 * cipher suite list was configured, and 0 otherwise. */ 786 if (impl_opts->tls_cipher_suites != NULL && 787 SSL_CTX_set_ciphersuites(ctx, impl_opts->tls_cipher_suites) != 1) { 788 SPDK_ERRLOG("Unable to set TLS cipher suites for SSL'\n"); 789 goto err; 790 } 791 792 return ctx; 793 794 err: 795 SSL_CTX_free(ctx); 796 return NULL; 797 } 798 799 static SSL * 800 ssl_sock_setup_connect(SSL_CTX *ctx, int fd) 801 { 802 SSL *ssl; 803 804 ssl = SSL_new(ctx); 805 if (!ssl) { 806 SPDK_ERRLOG("SSL_new() failed, msg = %s\n", ERR_error_string(ERR_peek_last_error(), NULL)); 807 return NULL; 808 } 809 SSL_set_fd(ssl, fd); 810 SSL_set_connect_state(ssl); 811 SSL_set_psk_use_session_callback(ssl, posix_sock_psk_use_session_client_cb); 812 SPDK_DEBUGLOG(sock_posix, "SSL object creation finished: %p\n", ssl); 813 SPDK_DEBUGLOG(sock_posix, "%s = SSL_state_string_long(%p)\n", SSL_state_string_long(ssl), ssl); 814 SPDK_DEBUGLOG(sock_posix, "%s = SSL_state_string_long(%p)\n", SSL_state_string_long(ssl), ssl); 815 SPDK_DEBUGLOG(sock_posix, "Negotiated Cipher suite:%s\n", 816 SSL_CIPHER_get_name(SSL_get_current_cipher(ssl))); 817 return ssl; 818 } 819 820 static SSL * 821 ssl_sock_setup_accept(SSL_CTX *ctx, int fd) 822 { 823 SSL *ssl; 824 825 ssl = SSL_new(ctx); 826 if (!ssl) { 827 SPDK_ERRLOG("SSL_new() failed, msg = %s\n", ERR_error_string(ERR_peek_last_error(), NULL)); 828 return NULL; 829 } 830 SSL_set_fd(ssl, fd); 831 SSL_set_accept_state(ssl); 832 SSL_set_psk_find_session_callback(ssl, posix_sock_psk_find_session_server_cb); 833 SPDK_DEBUGLOG(sock_posix, "SSL object creation finished: %p\n", ssl); 834 SPDK_DEBUGLOG(sock_posix, "%s = SSL_state_string_long(%p)\n", SSL_state_string_long(ssl), ssl); 835 SPDK_DEBUGLOG(sock_posix, "%s = SSL_state_string_long(%p)\n", SSL_state_string_long(ssl), ssl); 836 SPDK_DEBUGLOG(sock_posix, "Negotiated Cipher suite:%s\n", 837 SSL_CIPHER_get_name(SSL_get_current_cipher(ssl))); 838 return ssl; 839 } 840 841 static ssize_t 842 SSL_readv(SSL *ssl, const struct iovec *iov, int iovcnt) 843 { 844 int i, rc = 0; 845 ssize_t total = 0; 846 847 for (i = 0; i < iovcnt; i++) { 848 rc = SSL_read(ssl, iov[i].iov_base, iov[i].iov_len); 849 850 if (rc > 0) { 851 total += rc; 852 } 853 if (rc != (int)iov[i].iov_len) { 854 break; 855 } 856 } 857 if (total > 0) { 858 errno = 0; 859 return total; 860 } 861 switch (SSL_get_error(ssl, rc)) { 862 case SSL_ERROR_ZERO_RETURN: 863 errno = ENOTCONN; 864 return 0; 865 case SSL_ERROR_WANT_READ: 866 case SSL_ERROR_WANT_WRITE: 867 case SSL_ERROR_WANT_CONNECT: 868 case SSL_ERROR_WANT_ACCEPT: 869 case SSL_ERROR_WANT_X509_LOOKUP: 870 case SSL_ERROR_WANT_ASYNC: 871 case SSL_ERROR_WANT_ASYNC_JOB: 872 case SSL_ERROR_WANT_CLIENT_HELLO_CB: 873 errno = EAGAIN; 874 return -1; 875 case SSL_ERROR_SYSCALL: 876 case SSL_ERROR_SSL: 877 errno = ENOTCONN; 878 return -1; 879 default: 880 errno = ENOTCONN; 881 return -1; 882 } 883 } 884 885 static ssize_t 886 SSL_writev(SSL *ssl, struct iovec *iov, int iovcnt) 887 { 888 int i, rc = 0; 889 ssize_t total = 0; 890 891 for (i = 0; i < iovcnt; i++) { 892 rc = SSL_write(ssl, iov[i].iov_base, iov[i].iov_len); 893 894 if (rc > 0) { 895 total += rc; 896 } 897 if (rc != (int)iov[i].iov_len) { 898 break; 899 } 900 } 901 if (total > 0) { 902 errno = 0; 903 return total; 904 } 905 switch (SSL_get_error(ssl, rc)) { 906 case SSL_ERROR_ZERO_RETURN: 907 errno = ENOTCONN; 908 return 0; 909 case SSL_ERROR_WANT_READ: 910 case SSL_ERROR_WANT_WRITE: 911 case SSL_ERROR_WANT_CONNECT: 912 case SSL_ERROR_WANT_ACCEPT: 913 case SSL_ERROR_WANT_X509_LOOKUP: 914 case SSL_ERROR_WANT_ASYNC: 915 case SSL_ERROR_WANT_ASYNC_JOB: 916 case SSL_ERROR_WANT_CLIENT_HELLO_CB: 917 errno = EAGAIN; 918 return -1; 919 case SSL_ERROR_SYSCALL: 920 case SSL_ERROR_SSL: 921 errno = ENOTCONN; 922 return -1; 923 default: 924 errno = ENOTCONN; 925 return -1; 926 } 927 } 928 929 static struct spdk_sock * 930 posix_sock_create(const char *ip, int port, 931 enum posix_sock_create_type type, 932 struct spdk_sock_opts *opts, 933 bool enable_ssl) 934 { 935 struct spdk_posix_sock *sock; 936 struct spdk_sock_impl_opts impl_opts; 937 char buf[MAX_TMPBUF]; 938 char portnum[PORTNUMLEN]; 939 char *p; 940 const char *src_addr; 941 uint16_t src_port; 942 struct addrinfo hints, *res, *res0, *src_ai; 943 int fd, flag; 944 int rc; 945 bool enable_zcopy_user_opts = true; 946 bool enable_zcopy_impl_opts = true; 947 SSL_CTX *ctx = 0; 948 SSL *ssl = 0; 949 950 assert(opts != NULL); 951 if (enable_ssl) { 952 _opts_get_impl_opts(opts, &impl_opts, &g_ssl_impl_opts); 953 } else { 954 _opts_get_impl_opts(opts, &impl_opts, &g_posix_impl_opts); 955 } 956 957 if (ip == NULL) { 958 return NULL; 959 } 960 if (ip[0] == '[') { 961 snprintf(buf, sizeof(buf), "%s", ip + 1); 962 p = strchr(buf, ']'); 963 if (p != NULL) { 964 *p = '\0'; 965 } 966 ip = (const char *) &buf[0]; 967 } 968 969 snprintf(portnum, sizeof portnum, "%d", port); 970 memset(&hints, 0, sizeof hints); 971 hints.ai_family = PF_UNSPEC; 972 hints.ai_socktype = SOCK_STREAM; 973 hints.ai_flags = AI_NUMERICSERV; 974 hints.ai_flags |= AI_PASSIVE; 975 hints.ai_flags |= AI_NUMERICHOST; 976 rc = getaddrinfo(ip, portnum, &hints, &res0); 977 if (rc != 0) { 978 SPDK_ERRLOG("getaddrinfo() failed %s (%d)\n", gai_strerror(rc), rc); 979 return NULL; 980 } 981 982 /* try listen */ 983 fd = -1; 984 for (res = res0; res != NULL; res = res->ai_next) { 985 retry: 986 fd = posix_fd_create(res, opts, &impl_opts); 987 if (fd < 0) { 988 continue; 989 } 990 if (type == SPDK_SOCK_CREATE_LISTEN) { 991 rc = bind(fd, res->ai_addr, res->ai_addrlen); 992 if (rc != 0) { 993 SPDK_ERRLOG("bind() failed at port %d, errno = %d\n", port, errno); 994 switch (errno) { 995 case EINTR: 996 /* interrupted? */ 997 close(fd); 998 goto retry; 999 case EADDRNOTAVAIL: 1000 SPDK_ERRLOG("IP address %s not available. " 1001 "Verify IP address in config file " 1002 "and make sure setup script is " 1003 "run before starting spdk app.\n", ip); 1004 /* FALLTHROUGH */ 1005 default: 1006 /* try next family */ 1007 close(fd); 1008 fd = -1; 1009 continue; 1010 } 1011 } 1012 /* bind OK */ 1013 rc = listen(fd, 512); 1014 if (rc != 0) { 1015 SPDK_ERRLOG("listen() failed, errno = %d\n", errno); 1016 close(fd); 1017 fd = -1; 1018 break; 1019 } 1020 enable_zcopy_impl_opts = impl_opts.enable_zerocopy_send_server; 1021 } else if (type == SPDK_SOCK_CREATE_CONNECT) { 1022 src_addr = SPDK_GET_FIELD(opts, src_addr, NULL, opts->opts_size); 1023 src_port = SPDK_GET_FIELD(opts, src_port, 0, opts->opts_size); 1024 if (src_addr != NULL || src_port != 0) { 1025 snprintf(portnum, sizeof(portnum), "%"PRIu16, src_port); 1026 memset(&hints, 0, sizeof hints); 1027 hints.ai_family = AF_UNSPEC; 1028 hints.ai_socktype = SOCK_STREAM; 1029 hints.ai_flags = AI_NUMERICSERV | AI_NUMERICHOST | AI_PASSIVE; 1030 rc = getaddrinfo(src_addr, src_port > 0 ? portnum : NULL, 1031 &hints, &src_ai); 1032 if (rc != 0 || src_ai == NULL) { 1033 SPDK_ERRLOG("getaddrinfo() failed %s (%d)\n", 1034 rc != 0 ? gai_strerror(rc) : "", rc); 1035 close(fd); 1036 fd = -1; 1037 break; 1038 } 1039 rc = bind(fd, src_ai->ai_addr, src_ai->ai_addrlen); 1040 if (rc != 0) { 1041 SPDK_ERRLOG("bind() failed errno %d (%s:%s)\n", errno, 1042 src_addr ? src_addr : "", portnum); 1043 close(fd); 1044 fd = -1; 1045 freeaddrinfo(src_ai); 1046 src_ai = NULL; 1047 break; 1048 } 1049 freeaddrinfo(src_ai); 1050 src_ai = NULL; 1051 } 1052 rc = connect(fd, res->ai_addr, res->ai_addrlen); 1053 if (rc != 0) { 1054 SPDK_ERRLOG("connect() failed, errno = %d\n", errno); 1055 /* try next family */ 1056 close(fd); 1057 fd = -1; 1058 continue; 1059 } 1060 enable_zcopy_impl_opts = impl_opts.enable_zerocopy_send_client; 1061 if (enable_ssl) { 1062 ctx = posix_sock_create_ssl_context(TLS_client_method(), opts, &impl_opts); 1063 if (!ctx) { 1064 SPDK_ERRLOG("posix_sock_create_ssl_context() failed, errno = %d\n", errno); 1065 close(fd); 1066 fd = -1; 1067 break; 1068 } 1069 ssl = ssl_sock_setup_connect(ctx, fd); 1070 if (!ssl) { 1071 SPDK_ERRLOG("ssl_sock_setup_connect() failed, errno = %d\n", errno); 1072 close(fd); 1073 fd = -1; 1074 SSL_CTX_free(ctx); 1075 break; 1076 } 1077 } 1078 } 1079 1080 flag = fcntl(fd, F_GETFL); 1081 if (fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0) { 1082 SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno); 1083 SSL_free(ssl); 1084 SSL_CTX_free(ctx); 1085 close(fd); 1086 fd = -1; 1087 break; 1088 } 1089 break; 1090 } 1091 freeaddrinfo(res0); 1092 1093 if (fd < 0) { 1094 return NULL; 1095 } 1096 1097 /* Only enable zero copy for non-loopback and non-ssl sockets. */ 1098 enable_zcopy_user_opts = opts->zcopy && !spdk_net_is_loopback(fd) && !enable_ssl; 1099 1100 sock = posix_sock_alloc(fd, &impl_opts, enable_zcopy_user_opts && enable_zcopy_impl_opts); 1101 if (sock == NULL) { 1102 SPDK_ERRLOG("sock allocation failed\n"); 1103 SSL_free(ssl); 1104 SSL_CTX_free(ctx); 1105 close(fd); 1106 return NULL; 1107 } 1108 1109 if (ctx) { 1110 sock->ctx = ctx; 1111 } 1112 1113 if (ssl) { 1114 sock->ssl = ssl; 1115 SSL_set_app_data(ssl, &sock->base.impl_opts); 1116 } 1117 1118 return &sock->base; 1119 } 1120 1121 static struct spdk_sock * 1122 posix_sock_listen(const char *ip, int port, struct spdk_sock_opts *opts) 1123 { 1124 return posix_sock_create(ip, port, SPDK_SOCK_CREATE_LISTEN, opts, false); 1125 } 1126 1127 static struct spdk_sock * 1128 posix_sock_connect(const char *ip, int port, struct spdk_sock_opts *opts) 1129 { 1130 return posix_sock_create(ip, port, SPDK_SOCK_CREATE_CONNECT, opts, false); 1131 } 1132 1133 static struct spdk_sock * 1134 _posix_sock_accept(struct spdk_sock *_sock, bool enable_ssl) 1135 { 1136 struct spdk_posix_sock *sock = __posix_sock(_sock); 1137 struct spdk_posix_sock_group_impl *group = __posix_group_impl(sock->base.group_impl); 1138 struct sockaddr_storage sa; 1139 socklen_t salen; 1140 int rc, fd; 1141 struct spdk_posix_sock *new_sock; 1142 int flag; 1143 SSL_CTX *ctx = 0; 1144 SSL *ssl = 0; 1145 1146 memset(&sa, 0, sizeof(sa)); 1147 salen = sizeof(sa); 1148 1149 assert(sock != NULL); 1150 1151 /* epoll_wait will trigger again if there is more than one request */ 1152 if (group && sock->socket_has_data) { 1153 sock->socket_has_data = false; 1154 TAILQ_REMOVE(&group->socks_with_data, sock, link); 1155 } 1156 1157 rc = accept(sock->fd, (struct sockaddr *)&sa, &salen); 1158 1159 if (rc == -1) { 1160 return NULL; 1161 } 1162 1163 fd = rc; 1164 1165 flag = fcntl(fd, F_GETFL); 1166 if ((!(flag & O_NONBLOCK)) && (fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0)) { 1167 SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno); 1168 close(fd); 1169 return NULL; 1170 } 1171 1172 #if defined(SO_PRIORITY) 1173 /* The priority is not inherited, so call this function again */ 1174 if (sock->base.opts.priority) { 1175 rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &sock->base.opts.priority, sizeof(int)); 1176 if (rc != 0) { 1177 close(fd); 1178 return NULL; 1179 } 1180 } 1181 #endif 1182 1183 /* Establish SSL connection */ 1184 if (enable_ssl) { 1185 ctx = posix_sock_create_ssl_context(TLS_server_method(), &sock->base.opts, &sock->base.impl_opts); 1186 if (!ctx) { 1187 SPDK_ERRLOG("posix_sock_create_ssl_context() failed, errno = %d\n", errno); 1188 close(fd); 1189 return NULL; 1190 } 1191 ssl = ssl_sock_setup_accept(ctx, fd); 1192 if (!ssl) { 1193 SPDK_ERRLOG("ssl_sock_setup_accept() failed, errno = %d\n", errno); 1194 close(fd); 1195 SSL_CTX_free(ctx); 1196 return NULL; 1197 } 1198 } 1199 1200 /* Inherit the zero copy feature from the listen socket */ 1201 new_sock = posix_sock_alloc(fd, &sock->base.impl_opts, sock->zcopy); 1202 if (new_sock == NULL) { 1203 close(fd); 1204 SSL_free(ssl); 1205 SSL_CTX_free(ctx); 1206 return NULL; 1207 } 1208 1209 if (ctx) { 1210 new_sock->ctx = ctx; 1211 } 1212 1213 if (ssl) { 1214 new_sock->ssl = ssl; 1215 SSL_set_app_data(ssl, &new_sock->base.impl_opts); 1216 } 1217 1218 return &new_sock->base; 1219 } 1220 1221 static struct spdk_sock * 1222 posix_sock_accept(struct spdk_sock *_sock) 1223 { 1224 return _posix_sock_accept(_sock, false); 1225 } 1226 1227 static int 1228 posix_sock_close(struct spdk_sock *_sock) 1229 { 1230 struct spdk_posix_sock *sock = __posix_sock(_sock); 1231 void *pipe_buf; 1232 1233 assert(TAILQ_EMPTY(&_sock->pending_reqs)); 1234 1235 if (sock->ssl != NULL) { 1236 SSL_shutdown(sock->ssl); 1237 } 1238 1239 /* If the socket fails to close, the best choice is to 1240 * leak the fd but continue to free the rest of the sock 1241 * memory. */ 1242 close(sock->fd); 1243 1244 SSL_free(sock->ssl); 1245 SSL_CTX_free(sock->ctx); 1246 1247 pipe_buf = spdk_pipe_destroy(sock->recv_pipe); 1248 free(pipe_buf); 1249 free(sock); 1250 1251 return 0; 1252 } 1253 1254 #ifdef SPDK_ZEROCOPY 1255 static int 1256 _sock_check_zcopy(struct spdk_sock *sock) 1257 { 1258 struct spdk_posix_sock *psock = __posix_sock(sock); 1259 struct msghdr msgh = {}; 1260 uint8_t buf[sizeof(struct cmsghdr) + sizeof(struct sock_extended_err)]; 1261 ssize_t rc; 1262 struct sock_extended_err *serr; 1263 struct cmsghdr *cm; 1264 uint32_t idx; 1265 struct spdk_sock_request *req, *treq; 1266 bool found; 1267 1268 msgh.msg_control = buf; 1269 msgh.msg_controllen = sizeof(buf); 1270 1271 while (true) { 1272 rc = recvmsg(psock->fd, &msgh, MSG_ERRQUEUE); 1273 1274 if (rc < 0) { 1275 if (errno == EWOULDBLOCK || errno == EAGAIN) { 1276 return 0; 1277 } 1278 1279 if (!TAILQ_EMPTY(&sock->pending_reqs)) { 1280 SPDK_ERRLOG("Attempting to receive from ERRQUEUE yielded error, but pending list still has orphaned entries\n"); 1281 } else { 1282 SPDK_WARNLOG("Recvmsg yielded an error!\n"); 1283 } 1284 return 0; 1285 } 1286 1287 cm = CMSG_FIRSTHDR(&msgh); 1288 if (!(cm && 1289 ((cm->cmsg_level == SOL_IP && cm->cmsg_type == IP_RECVERR) || 1290 (cm->cmsg_level == SOL_IPV6 && cm->cmsg_type == IPV6_RECVERR)))) { 1291 SPDK_WARNLOG("Unexpected cmsg level or type!\n"); 1292 return 0; 1293 } 1294 1295 serr = (struct sock_extended_err *)CMSG_DATA(cm); 1296 if (serr->ee_errno != 0 || serr->ee_origin != SO_EE_ORIGIN_ZEROCOPY) { 1297 SPDK_WARNLOG("Unexpected extended error origin\n"); 1298 return 0; 1299 } 1300 1301 /* Most of the time, the pending_reqs array is in the exact 1302 * order we need such that all of the requests to complete are 1303 * in order, in the front. It is guaranteed that all requests 1304 * belonging to the same sendmsg call are sequential, so once 1305 * we encounter one match we can stop looping as soon as a 1306 * non-match is found. 1307 */ 1308 idx = serr->ee_info; 1309 while (true) { 1310 found = false; 1311 TAILQ_FOREACH_SAFE(req, &sock->pending_reqs, internal.link, treq) { 1312 if (!req->internal.is_zcopy) { 1313 /* This wasn't a zcopy request. It was just waiting in line to complete */ 1314 rc = spdk_sock_request_put(sock, req, 0); 1315 if (rc < 0) { 1316 return rc; 1317 } 1318 } else if (req->internal.offset == idx) { 1319 found = true; 1320 rc = spdk_sock_request_put(sock, req, 0); 1321 if (rc < 0) { 1322 return rc; 1323 } 1324 } else if (found) { 1325 break; 1326 } 1327 } 1328 1329 if (idx == serr->ee_data) { 1330 break; 1331 } 1332 1333 if (idx == UINT32_MAX) { 1334 idx = 0; 1335 } else { 1336 idx++; 1337 } 1338 } 1339 } 1340 1341 return 0; 1342 } 1343 #endif 1344 1345 static int 1346 _sock_flush(struct spdk_sock *sock) 1347 { 1348 struct spdk_posix_sock *psock = __posix_sock(sock); 1349 struct msghdr msg = {}; 1350 int flags; 1351 struct iovec iovs[IOV_BATCH_SIZE]; 1352 int iovcnt; 1353 int retval; 1354 struct spdk_sock_request *req; 1355 int i; 1356 ssize_t rc, sent; 1357 unsigned int offset; 1358 size_t len; 1359 bool is_zcopy = false; 1360 1361 /* Can't flush from within a callback or we end up with recursive calls */ 1362 if (sock->cb_cnt > 0) { 1363 errno = EAGAIN; 1364 return -1; 1365 } 1366 1367 #ifdef SPDK_ZEROCOPY 1368 if (psock->zcopy) { 1369 flags = MSG_ZEROCOPY | MSG_NOSIGNAL; 1370 } else 1371 #endif 1372 { 1373 flags = MSG_NOSIGNAL; 1374 } 1375 1376 iovcnt = spdk_sock_prep_reqs(sock, iovs, 0, NULL, &flags); 1377 if (iovcnt == 0) { 1378 return 0; 1379 } 1380 1381 #ifdef SPDK_ZEROCOPY 1382 is_zcopy = flags & MSG_ZEROCOPY; 1383 #endif 1384 1385 /* Perform the vectored write */ 1386 msg.msg_iov = iovs; 1387 msg.msg_iovlen = iovcnt; 1388 1389 if (psock->ssl) { 1390 rc = SSL_writev(psock->ssl, iovs, iovcnt); 1391 } else { 1392 rc = sendmsg(psock->fd, &msg, flags); 1393 } 1394 if (rc <= 0) { 1395 if (rc == 0 || errno == EAGAIN || errno == EWOULDBLOCK || (errno == ENOBUFS && psock->zcopy)) { 1396 errno = EAGAIN; 1397 } 1398 return -1; 1399 } 1400 1401 sent = rc; 1402 1403 if (is_zcopy) { 1404 /* Handling overflow case, because we use psock->sendmsg_idx - 1 for the 1405 * req->internal.offset, so sendmsg_idx should not be zero */ 1406 if (spdk_unlikely(psock->sendmsg_idx == UINT32_MAX)) { 1407 psock->sendmsg_idx = 1; 1408 } else { 1409 psock->sendmsg_idx++; 1410 } 1411 } 1412 1413 /* Consume the requests that were actually written */ 1414 req = TAILQ_FIRST(&sock->queued_reqs); 1415 while (req) { 1416 offset = req->internal.offset; 1417 1418 /* req->internal.is_zcopy is true when the whole req or part of it is sent with zerocopy */ 1419 req->internal.is_zcopy = is_zcopy; 1420 1421 for (i = 0; i < req->iovcnt; i++) { 1422 /* Advance by the offset first */ 1423 if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) { 1424 offset -= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len; 1425 continue; 1426 } 1427 1428 /* Calculate the remaining length of this element */ 1429 len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset; 1430 1431 if (len > (size_t)rc) { 1432 /* This element was partially sent. */ 1433 req->internal.offset += rc; 1434 return sent; 1435 } 1436 1437 offset = 0; 1438 req->internal.offset += len; 1439 rc -= len; 1440 } 1441 1442 /* Handled a full request. */ 1443 spdk_sock_request_pend(sock, req); 1444 1445 if (!req->internal.is_zcopy && req == TAILQ_FIRST(&sock->pending_reqs)) { 1446 /* The sendmsg syscall above isn't currently asynchronous, 1447 * so it's already done. */ 1448 retval = spdk_sock_request_put(sock, req, 0); 1449 if (retval) { 1450 break; 1451 } 1452 } else { 1453 /* Re-use the offset field to hold the sendmsg call index. The 1454 * index is 0 based, so subtract one here because we've already 1455 * incremented above. */ 1456 req->internal.offset = psock->sendmsg_idx - 1; 1457 } 1458 1459 if (rc == 0) { 1460 break; 1461 } 1462 1463 req = TAILQ_FIRST(&sock->queued_reqs); 1464 } 1465 1466 return sent; 1467 } 1468 1469 static int 1470 posix_sock_flush(struct spdk_sock *sock) 1471 { 1472 #ifdef SPDK_ZEROCOPY 1473 struct spdk_posix_sock *psock = __posix_sock(sock); 1474 1475 if (psock->zcopy && !TAILQ_EMPTY(&sock->pending_reqs)) { 1476 _sock_check_zcopy(sock); 1477 } 1478 #endif 1479 1480 return _sock_flush(sock); 1481 } 1482 1483 static ssize_t 1484 posix_sock_recv_from_pipe(struct spdk_posix_sock *sock, struct iovec *diov, int diovcnt) 1485 { 1486 struct iovec siov[2]; 1487 int sbytes; 1488 ssize_t bytes; 1489 struct spdk_posix_sock_group_impl *group; 1490 1491 sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov); 1492 if (sbytes < 0) { 1493 errno = EINVAL; 1494 return -1; 1495 } else if (sbytes == 0) { 1496 errno = EAGAIN; 1497 return -1; 1498 } 1499 1500 bytes = spdk_iovcpy(siov, 2, diov, diovcnt); 1501 1502 if (bytes == 0) { 1503 /* The only way this happens is if diov is 0 length */ 1504 errno = EINVAL; 1505 return -1; 1506 } 1507 1508 spdk_pipe_reader_advance(sock->recv_pipe, bytes); 1509 1510 /* If we drained the pipe, mark it appropriately */ 1511 if (spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) { 1512 assert(sock->pipe_has_data == true); 1513 1514 group = __posix_group_impl(sock->base.group_impl); 1515 if (group && !sock->socket_has_data) { 1516 TAILQ_REMOVE(&group->socks_with_data, sock, link); 1517 } 1518 1519 sock->pipe_has_data = false; 1520 } 1521 1522 return bytes; 1523 } 1524 1525 static inline ssize_t 1526 posix_sock_read(struct spdk_posix_sock *sock) 1527 { 1528 struct iovec iov[2]; 1529 int bytes_avail, bytes_recvd; 1530 struct spdk_posix_sock_group_impl *group; 1531 1532 bytes_avail = spdk_pipe_writer_get_buffer(sock->recv_pipe, sock->recv_buf_sz, iov); 1533 1534 if (bytes_avail <= 0) { 1535 return bytes_avail; 1536 } 1537 1538 if (sock->ssl) { 1539 bytes_recvd = SSL_readv(sock->ssl, iov, 2); 1540 } else { 1541 bytes_recvd = readv(sock->fd, iov, 2); 1542 } 1543 1544 assert(sock->pipe_has_data == false); 1545 1546 if (bytes_recvd <= 0) { 1547 /* Errors count as draining the socket data */ 1548 if (sock->base.group_impl && sock->socket_has_data) { 1549 group = __posix_group_impl(sock->base.group_impl); 1550 TAILQ_REMOVE(&group->socks_with_data, sock, link); 1551 } 1552 1553 sock->socket_has_data = false; 1554 1555 return bytes_recvd; 1556 } 1557 1558 spdk_pipe_writer_advance(sock->recv_pipe, bytes_recvd); 1559 1560 #if DEBUG 1561 if (sock->base.group_impl) { 1562 assert(sock->socket_has_data == true); 1563 } 1564 #endif 1565 1566 sock->pipe_has_data = true; 1567 if (bytes_recvd < bytes_avail) { 1568 /* We drained the kernel socket entirely. */ 1569 sock->socket_has_data = false; 1570 } 1571 1572 return bytes_recvd; 1573 } 1574 1575 static ssize_t 1576 posix_sock_readv(struct spdk_sock *_sock, struct iovec *iov, int iovcnt) 1577 { 1578 struct spdk_posix_sock *sock = __posix_sock(_sock); 1579 struct spdk_posix_sock_group_impl *group = __posix_group_impl(sock->base.group_impl); 1580 int rc, i; 1581 size_t len; 1582 1583 if (sock->recv_pipe == NULL) { 1584 assert(sock->pipe_has_data == false); 1585 if (group && sock->socket_has_data) { 1586 sock->socket_has_data = false; 1587 TAILQ_REMOVE(&group->socks_with_data, sock, link); 1588 } 1589 if (sock->ssl) { 1590 return SSL_readv(sock->ssl, iov, iovcnt); 1591 } else { 1592 return readv(sock->fd, iov, iovcnt); 1593 } 1594 } 1595 1596 /* If the socket is not in a group, we must assume it always has 1597 * data waiting for us because it is not epolled */ 1598 if (!sock->pipe_has_data && (group == NULL || sock->socket_has_data)) { 1599 /* If the user is receiving a sufficiently large amount of data, 1600 * receive directly to their buffers. */ 1601 len = 0; 1602 for (i = 0; i < iovcnt; i++) { 1603 len += iov[i].iov_len; 1604 } 1605 1606 if (len >= MIN_SOCK_PIPE_SIZE) { 1607 /* TODO: Should this detect if kernel socket is drained? */ 1608 if (sock->ssl) { 1609 return SSL_readv(sock->ssl, iov, iovcnt); 1610 } else { 1611 return readv(sock->fd, iov, iovcnt); 1612 } 1613 } 1614 1615 /* Otherwise, do a big read into our pipe */ 1616 rc = posix_sock_read(sock); 1617 if (rc <= 0) { 1618 return rc; 1619 } 1620 } 1621 1622 return posix_sock_recv_from_pipe(sock, iov, iovcnt); 1623 } 1624 1625 static ssize_t 1626 posix_sock_recv(struct spdk_sock *sock, void *buf, size_t len) 1627 { 1628 struct iovec iov[1]; 1629 1630 iov[0].iov_base = buf; 1631 iov[0].iov_len = len; 1632 1633 return posix_sock_readv(sock, iov, 1); 1634 } 1635 1636 static ssize_t 1637 posix_sock_writev(struct spdk_sock *_sock, struct iovec *iov, int iovcnt) 1638 { 1639 struct spdk_posix_sock *sock = __posix_sock(_sock); 1640 int rc; 1641 1642 /* In order to process a writev, we need to flush any asynchronous writes 1643 * first. */ 1644 rc = _sock_flush(_sock); 1645 if (rc < 0) { 1646 return rc; 1647 } 1648 1649 if (!TAILQ_EMPTY(&_sock->queued_reqs)) { 1650 /* We weren't able to flush all requests */ 1651 errno = EAGAIN; 1652 return -1; 1653 } 1654 1655 if (sock->ssl) { 1656 return SSL_writev(sock->ssl, iov, iovcnt); 1657 } else { 1658 return writev(sock->fd, iov, iovcnt); 1659 } 1660 } 1661 1662 static int 1663 posix_sock_recv_next(struct spdk_sock *_sock, void **buf, void **ctx) 1664 { 1665 struct spdk_posix_sock *sock = __posix_sock(_sock); 1666 struct iovec iov; 1667 ssize_t rc; 1668 1669 if (sock->recv_pipe != NULL) { 1670 errno = ENOTSUP; 1671 return -1; 1672 } 1673 1674 iov.iov_len = spdk_sock_group_get_buf(_sock->group_impl->group, &iov.iov_base, ctx); 1675 if (iov.iov_len == 0) { 1676 errno = ENOBUFS; 1677 return -1; 1678 } 1679 1680 rc = posix_sock_readv(_sock, &iov, 1); 1681 if (rc <= 0) { 1682 spdk_sock_group_provide_buf(_sock->group_impl->group, iov.iov_base, iov.iov_len, *ctx); 1683 return rc; 1684 } 1685 1686 *buf = iov.iov_base; 1687 1688 return rc; 1689 } 1690 1691 static void 1692 posix_sock_writev_async(struct spdk_sock *sock, struct spdk_sock_request *req) 1693 { 1694 int rc; 1695 1696 spdk_sock_request_queue(sock, req); 1697 1698 /* If there are a sufficient number queued, just flush them out immediately. */ 1699 if (sock->queued_iovcnt >= IOV_BATCH_SIZE) { 1700 rc = _sock_flush(sock); 1701 if (rc < 0 && errno != EAGAIN) { 1702 spdk_sock_abort_requests(sock); 1703 } 1704 } 1705 } 1706 1707 static int 1708 posix_sock_set_recvlowat(struct spdk_sock *_sock, int nbytes) 1709 { 1710 struct spdk_posix_sock *sock = __posix_sock(_sock); 1711 int val; 1712 int rc; 1713 1714 assert(sock != NULL); 1715 1716 val = nbytes; 1717 rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVLOWAT, &val, sizeof val); 1718 if (rc != 0) { 1719 return -1; 1720 } 1721 return 0; 1722 } 1723 1724 static bool 1725 posix_sock_is_ipv6(struct spdk_sock *_sock) 1726 { 1727 struct spdk_posix_sock *sock = __posix_sock(_sock); 1728 struct sockaddr_storage sa; 1729 socklen_t salen; 1730 int rc; 1731 1732 assert(sock != NULL); 1733 1734 memset(&sa, 0, sizeof sa); 1735 salen = sizeof sa; 1736 rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen); 1737 if (rc != 0) { 1738 SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno); 1739 return false; 1740 } 1741 1742 return (sa.ss_family == AF_INET6); 1743 } 1744 1745 static bool 1746 posix_sock_is_ipv4(struct spdk_sock *_sock) 1747 { 1748 struct spdk_posix_sock *sock = __posix_sock(_sock); 1749 struct sockaddr_storage sa; 1750 socklen_t salen; 1751 int rc; 1752 1753 assert(sock != NULL); 1754 1755 memset(&sa, 0, sizeof sa); 1756 salen = sizeof sa; 1757 rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen); 1758 if (rc != 0) { 1759 SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno); 1760 return false; 1761 } 1762 1763 return (sa.ss_family == AF_INET); 1764 } 1765 1766 static bool 1767 posix_sock_is_connected(struct spdk_sock *_sock) 1768 { 1769 struct spdk_posix_sock *sock = __posix_sock(_sock); 1770 uint8_t byte; 1771 int rc; 1772 1773 rc = recv(sock->fd, &byte, 1, MSG_PEEK); 1774 if (rc == 0) { 1775 return false; 1776 } 1777 1778 if (rc < 0) { 1779 if (errno == EAGAIN || errno == EWOULDBLOCK) { 1780 return true; 1781 } 1782 1783 return false; 1784 } 1785 1786 return true; 1787 } 1788 1789 static struct spdk_sock_group_impl * 1790 posix_sock_group_impl_get_optimal(struct spdk_sock *_sock, struct spdk_sock_group_impl *hint) 1791 { 1792 struct spdk_posix_sock *sock = __posix_sock(_sock); 1793 struct spdk_sock_group_impl *group_impl; 1794 1795 if (sock->placement_id != -1) { 1796 spdk_sock_map_lookup(&g_map, sock->placement_id, &group_impl, hint); 1797 return group_impl; 1798 } 1799 1800 return NULL; 1801 } 1802 1803 static struct spdk_sock_group_impl * 1804 _sock_group_impl_create(uint32_t enable_placement_id) 1805 { 1806 struct spdk_posix_sock_group_impl *group_impl; 1807 int fd; 1808 1809 #if defined(SPDK_EPOLL) 1810 fd = epoll_create1(0); 1811 #elif defined(SPDK_KEVENT) 1812 fd = kqueue(); 1813 #endif 1814 if (fd == -1) { 1815 return NULL; 1816 } 1817 1818 group_impl = calloc(1, sizeof(*group_impl)); 1819 if (group_impl == NULL) { 1820 SPDK_ERRLOG("group_impl allocation failed\n"); 1821 close(fd); 1822 return NULL; 1823 } 1824 1825 group_impl->pipe_group = spdk_pipe_group_create(); 1826 if (group_impl->pipe_group == NULL) { 1827 SPDK_ERRLOG("pipe_group allocation failed\n"); 1828 free(group_impl); 1829 close(fd); 1830 return NULL; 1831 } 1832 1833 group_impl->fd = fd; 1834 TAILQ_INIT(&group_impl->socks_with_data); 1835 group_impl->placement_id = -1; 1836 1837 if (enable_placement_id == PLACEMENT_CPU) { 1838 spdk_sock_map_insert(&g_map, spdk_env_get_current_core(), &group_impl->base); 1839 group_impl->placement_id = spdk_env_get_current_core(); 1840 } 1841 1842 return &group_impl->base; 1843 } 1844 1845 static struct spdk_sock_group_impl * 1846 posix_sock_group_impl_create(void) 1847 { 1848 return _sock_group_impl_create(g_posix_impl_opts.enable_placement_id); 1849 } 1850 1851 static struct spdk_sock_group_impl * 1852 ssl_sock_group_impl_create(void) 1853 { 1854 return _sock_group_impl_create(g_ssl_impl_opts.enable_placement_id); 1855 } 1856 1857 static void 1858 posix_sock_mark(struct spdk_posix_sock_group_impl *group, struct spdk_posix_sock *sock, 1859 int placement_id) 1860 { 1861 #if defined(SO_MARK) 1862 int rc; 1863 1864 rc = setsockopt(sock->fd, SOL_SOCKET, SO_MARK, 1865 &placement_id, sizeof(placement_id)); 1866 if (rc != 0) { 1867 /* Not fatal */ 1868 SPDK_ERRLOG("Error setting SO_MARK\n"); 1869 return; 1870 } 1871 1872 rc = spdk_sock_map_insert(&g_map, placement_id, &group->base); 1873 if (rc != 0) { 1874 /* Not fatal */ 1875 SPDK_ERRLOG("Failed to insert sock group into map: %d\n", rc); 1876 return; 1877 } 1878 1879 sock->placement_id = placement_id; 1880 #endif 1881 } 1882 1883 static void 1884 posix_sock_update_mark(struct spdk_sock_group_impl *_group, struct spdk_sock *_sock) 1885 { 1886 struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group); 1887 1888 if (group->placement_id == -1) { 1889 group->placement_id = spdk_sock_map_find_free(&g_map); 1890 1891 /* If a free placement id is found, update existing sockets in this group */ 1892 if (group->placement_id != -1) { 1893 struct spdk_sock *sock, *tmp; 1894 1895 TAILQ_FOREACH_SAFE(sock, &_group->socks, link, tmp) { 1896 posix_sock_mark(group, __posix_sock(sock), group->placement_id); 1897 } 1898 } 1899 } 1900 1901 if (group->placement_id != -1) { 1902 /* 1903 * group placement id is already determined for this poll group. 1904 * Mark socket with group's placement id. 1905 */ 1906 posix_sock_mark(group, __posix_sock(_sock), group->placement_id); 1907 } 1908 } 1909 1910 static int 1911 posix_sock_group_impl_add_sock(struct spdk_sock_group_impl *_group, struct spdk_sock *_sock) 1912 { 1913 struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group); 1914 struct spdk_posix_sock *sock = __posix_sock(_sock); 1915 int rc; 1916 1917 #if defined(SPDK_EPOLL) 1918 struct epoll_event event; 1919 1920 memset(&event, 0, sizeof(event)); 1921 /* EPOLLERR is always on even if we don't set it, but be explicit for clarity */ 1922 event.events = EPOLLIN | EPOLLERR; 1923 event.data.ptr = sock; 1924 1925 rc = epoll_ctl(group->fd, EPOLL_CTL_ADD, sock->fd, &event); 1926 #elif defined(SPDK_KEVENT) 1927 struct kevent event; 1928 struct timespec ts = {0}; 1929 1930 EV_SET(&event, sock->fd, EVFILT_READ, EV_ADD, 0, 0, sock); 1931 1932 rc = kevent(group->fd, &event, 1, NULL, 0, &ts); 1933 #endif 1934 1935 if (rc != 0) { 1936 return rc; 1937 } 1938 1939 /* switched from another polling group due to scheduling */ 1940 if (spdk_unlikely(sock->recv_pipe != NULL && 1941 (spdk_pipe_reader_bytes_available(sock->recv_pipe) > 0))) { 1942 sock->pipe_has_data = true; 1943 sock->socket_has_data = false; 1944 TAILQ_INSERT_TAIL(&group->socks_with_data, sock, link); 1945 } else if (sock->recv_pipe != NULL) { 1946 rc = spdk_pipe_group_add(group->pipe_group, sock->recv_pipe); 1947 assert(rc == 0); 1948 } 1949 1950 if (_sock->impl_opts.enable_placement_id == PLACEMENT_MARK) { 1951 posix_sock_update_mark(_group, _sock); 1952 } else if (sock->placement_id != -1) { 1953 rc = spdk_sock_map_insert(&g_map, sock->placement_id, &group->base); 1954 if (rc != 0) { 1955 SPDK_ERRLOG("Failed to insert sock group into map: %d\n", rc); 1956 /* Do not treat this as an error. The system will continue running. */ 1957 } 1958 } 1959 1960 return rc; 1961 } 1962 1963 static int 1964 posix_sock_group_impl_remove_sock(struct spdk_sock_group_impl *_group, struct spdk_sock *_sock) 1965 { 1966 struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group); 1967 struct spdk_posix_sock *sock = __posix_sock(_sock); 1968 int rc; 1969 1970 if (sock->pipe_has_data || sock->socket_has_data) { 1971 TAILQ_REMOVE(&group->socks_with_data, sock, link); 1972 sock->pipe_has_data = false; 1973 sock->socket_has_data = false; 1974 } else if (sock->recv_pipe != NULL) { 1975 rc = spdk_pipe_group_remove(group->pipe_group, sock->recv_pipe); 1976 assert(rc == 0); 1977 } 1978 1979 if (sock->placement_id != -1) { 1980 spdk_sock_map_release(&g_map, sock->placement_id); 1981 } 1982 1983 #if defined(SPDK_EPOLL) 1984 struct epoll_event event; 1985 1986 /* Event parameter is ignored but some old kernel version still require it. */ 1987 rc = epoll_ctl(group->fd, EPOLL_CTL_DEL, sock->fd, &event); 1988 #elif defined(SPDK_KEVENT) 1989 struct kevent event; 1990 struct timespec ts = {0}; 1991 1992 EV_SET(&event, sock->fd, EVFILT_READ, EV_DELETE, 0, 0, NULL); 1993 1994 rc = kevent(group->fd, &event, 1, NULL, 0, &ts); 1995 if (rc == 0 && event.flags & EV_ERROR) { 1996 rc = -1; 1997 errno = event.data; 1998 } 1999 #endif 2000 2001 spdk_sock_abort_requests(_sock); 2002 2003 return rc; 2004 } 2005 2006 static int 2007 posix_sock_group_impl_poll(struct spdk_sock_group_impl *_group, int max_events, 2008 struct spdk_sock **socks) 2009 { 2010 struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group); 2011 struct spdk_sock *sock, *tmp; 2012 int num_events, i, rc; 2013 struct spdk_posix_sock *psock, *ptmp; 2014 #if defined(SPDK_EPOLL) 2015 struct epoll_event events[MAX_EVENTS_PER_POLL]; 2016 #elif defined(SPDK_KEVENT) 2017 struct kevent events[MAX_EVENTS_PER_POLL]; 2018 struct timespec ts = {0}; 2019 #endif 2020 2021 #ifdef SPDK_ZEROCOPY 2022 /* When all of the following conditions are met 2023 * - non-blocking socket 2024 * - zero copy is enabled 2025 * - interrupts suppressed (i.e. busy polling) 2026 * - the NIC tx queue is full at the time sendmsg() is called 2027 * - epoll_wait determines there is an EPOLLIN event for the socket 2028 * then we can get into a situation where data we've sent is queued 2029 * up in the kernel network stack, but interrupts have been suppressed 2030 * because other traffic is flowing so the kernel misses the signal 2031 * to flush the software tx queue. If there wasn't incoming data 2032 * pending on the socket, then epoll_wait would have been sufficient 2033 * to kick off the send operation, but since there is a pending event 2034 * epoll_wait does not trigger the necessary operation. 2035 * 2036 * We deal with this by checking for all of the above conditions and 2037 * additionally looking for EPOLLIN events that were not consumed from 2038 * the last poll loop. We take this to mean that the upper layer is 2039 * unable to consume them because it is blocked waiting for resources 2040 * to free up, and those resources are most likely freed in response 2041 * to a pending asynchronous write completing. 2042 * 2043 * Additionally, sockets that have the same placement_id actually share 2044 * an underlying hardware queue. That means polling one of them is 2045 * equivalent to polling all of them. As a quick mechanism to avoid 2046 * making extra poll() calls, stash the last placement_id during the loop 2047 * and only poll if it's not the same. The overwhelmingly common case 2048 * is that all sockets in this list have the same placement_id because 2049 * SPDK is intentionally grouping sockets by that value, so even 2050 * though this won't stop all extra calls to poll(), it's very fast 2051 * and will catch all of them in practice. 2052 */ 2053 int last_placement_id = -1; 2054 2055 TAILQ_FOREACH(psock, &group->socks_with_data, link) { 2056 if (psock->zcopy && psock->placement_id >= 0 && 2057 psock->placement_id != last_placement_id) { 2058 struct pollfd pfd = {psock->fd, POLLIN | POLLERR, 0}; 2059 2060 poll(&pfd, 1, 0); 2061 last_placement_id = psock->placement_id; 2062 } 2063 } 2064 #endif 2065 2066 /* This must be a TAILQ_FOREACH_SAFE because while flushing, 2067 * a completion callback could remove the sock from the 2068 * group. */ 2069 TAILQ_FOREACH_SAFE(sock, &_group->socks, link, tmp) { 2070 rc = _sock_flush(sock); 2071 if (rc < 0 && errno != EAGAIN) { 2072 spdk_sock_abort_requests(sock); 2073 } 2074 } 2075 2076 assert(max_events > 0); 2077 2078 #if defined(SPDK_EPOLL) 2079 num_events = epoll_wait(group->fd, events, max_events, 0); 2080 #elif defined(SPDK_KEVENT) 2081 num_events = kevent(group->fd, NULL, 0, events, max_events, &ts); 2082 #endif 2083 2084 if (num_events == -1) { 2085 return -1; 2086 } else if (num_events == 0 && !TAILQ_EMPTY(&_group->socks)) { 2087 sock = TAILQ_FIRST(&_group->socks); 2088 psock = __posix_sock(sock); 2089 /* poll() is called here to busy poll the queue associated with 2090 * first socket in list and potentially reap incoming data. 2091 */ 2092 if (sock->opts.priority) { 2093 struct pollfd pfd = {0, 0, 0}; 2094 2095 pfd.fd = psock->fd; 2096 pfd.events = POLLIN | POLLERR; 2097 poll(&pfd, 1, 0); 2098 } 2099 } 2100 2101 for (i = 0; i < num_events; i++) { 2102 #if defined(SPDK_EPOLL) 2103 sock = events[i].data.ptr; 2104 psock = __posix_sock(sock); 2105 2106 #ifdef SPDK_ZEROCOPY 2107 if (events[i].events & EPOLLERR) { 2108 rc = _sock_check_zcopy(sock); 2109 /* If the socket was closed or removed from 2110 * the group in response to a send ack, don't 2111 * add it to the array here. */ 2112 if (rc || sock->cb_fn == NULL) { 2113 continue; 2114 } 2115 } 2116 #endif 2117 if ((events[i].events & EPOLLIN) == 0) { 2118 continue; 2119 } 2120 2121 #elif defined(SPDK_KEVENT) 2122 sock = events[i].udata; 2123 psock = __posix_sock(sock); 2124 #endif 2125 2126 /* If the socket is not already in the list, add it now */ 2127 if (!psock->socket_has_data && !psock->pipe_has_data) { 2128 TAILQ_INSERT_TAIL(&group->socks_with_data, psock, link); 2129 } 2130 psock->socket_has_data = true; 2131 } 2132 2133 num_events = 0; 2134 2135 TAILQ_FOREACH_SAFE(psock, &group->socks_with_data, link, ptmp) { 2136 if (num_events == max_events) { 2137 break; 2138 } 2139 2140 /* If the socket's cb_fn is NULL, just remove it from the 2141 * list and do not add it to socks array */ 2142 if (spdk_unlikely(psock->base.cb_fn == NULL)) { 2143 psock->socket_has_data = false; 2144 psock->pipe_has_data = false; 2145 TAILQ_REMOVE(&group->socks_with_data, psock, link); 2146 continue; 2147 } 2148 2149 socks[num_events++] = &psock->base; 2150 } 2151 2152 /* Cycle the has_data list so that each time we poll things aren't 2153 * in the same order. Say we have 6 sockets in the list, named as follows: 2154 * A B C D E F 2155 * And all 6 sockets had epoll events, but max_events is only 3. That means 2156 * psock currently points at D. We want to rearrange the list to the following: 2157 * D E F A B C 2158 * 2159 * The variables below are named according to this example to make it easier to 2160 * follow the swaps. 2161 */ 2162 if (psock != NULL) { 2163 struct spdk_posix_sock *pa, *pc, *pd, *pf; 2164 2165 /* Capture pointers to the elements we need */ 2166 pd = psock; 2167 pc = TAILQ_PREV(pd, spdk_has_data_list, link); 2168 pa = TAILQ_FIRST(&group->socks_with_data); 2169 pf = TAILQ_LAST(&group->socks_with_data, spdk_has_data_list); 2170 2171 /* Break the link between C and D */ 2172 pc->link.tqe_next = NULL; 2173 2174 /* Connect F to A */ 2175 pf->link.tqe_next = pa; 2176 pa->link.tqe_prev = &pf->link.tqe_next; 2177 2178 /* Fix up the list first/last pointers */ 2179 group->socks_with_data.tqh_first = pd; 2180 group->socks_with_data.tqh_last = &pc->link.tqe_next; 2181 2182 /* D is in front of the list, make tqe prev pointer point to the head of list */ 2183 pd->link.tqe_prev = &group->socks_with_data.tqh_first; 2184 } 2185 2186 return num_events; 2187 } 2188 2189 static int 2190 posix_sock_group_impl_register_interrupt(struct spdk_sock_group_impl *_group, uint32_t events, 2191 spdk_interrupt_fn fn, void *arg, const char *name) 2192 { 2193 struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group); 2194 2195 group->intr = spdk_interrupt_register_for_events(group->fd, events, fn, arg, name); 2196 2197 return group->intr ? 0 : -1; 2198 } 2199 2200 static void 2201 posix_sock_group_impl_unregister_interrupt(struct spdk_sock_group_impl *_group) 2202 { 2203 struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group); 2204 2205 spdk_interrupt_unregister(&group->intr); 2206 } 2207 2208 static int 2209 _sock_group_impl_close(struct spdk_sock_group_impl *_group, uint32_t enable_placement_id) 2210 { 2211 struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group); 2212 int rc; 2213 2214 if (enable_placement_id == PLACEMENT_CPU) { 2215 spdk_sock_map_release(&g_map, spdk_env_get_current_core()); 2216 } 2217 2218 spdk_pipe_group_destroy(group->pipe_group); 2219 rc = close(group->fd); 2220 free(group); 2221 return rc; 2222 } 2223 2224 static int 2225 posix_sock_group_impl_close(struct spdk_sock_group_impl *_group) 2226 { 2227 return _sock_group_impl_close(_group, g_posix_impl_opts.enable_placement_id); 2228 } 2229 2230 static int 2231 ssl_sock_group_impl_close(struct spdk_sock_group_impl *_group) 2232 { 2233 return _sock_group_impl_close(_group, g_ssl_impl_opts.enable_placement_id); 2234 } 2235 2236 static struct spdk_net_impl g_posix_net_impl = { 2237 .name = "posix", 2238 .getaddr = posix_sock_getaddr, 2239 .get_interface_name = posix_sock_get_interface_name, 2240 .get_numa_id = posix_sock_get_numa_id, 2241 .connect = posix_sock_connect, 2242 .listen = posix_sock_listen, 2243 .accept = posix_sock_accept, 2244 .close = posix_sock_close, 2245 .recv = posix_sock_recv, 2246 .readv = posix_sock_readv, 2247 .writev = posix_sock_writev, 2248 .recv_next = posix_sock_recv_next, 2249 .writev_async = posix_sock_writev_async, 2250 .flush = posix_sock_flush, 2251 .set_recvlowat = posix_sock_set_recvlowat, 2252 .set_recvbuf = posix_sock_set_recvbuf, 2253 .set_sendbuf = posix_sock_set_sendbuf, 2254 .is_ipv6 = posix_sock_is_ipv6, 2255 .is_ipv4 = posix_sock_is_ipv4, 2256 .is_connected = posix_sock_is_connected, 2257 .group_impl_get_optimal = posix_sock_group_impl_get_optimal, 2258 .group_impl_create = posix_sock_group_impl_create, 2259 .group_impl_add_sock = posix_sock_group_impl_add_sock, 2260 .group_impl_remove_sock = posix_sock_group_impl_remove_sock, 2261 .group_impl_poll = posix_sock_group_impl_poll, 2262 .group_impl_register_interrupt = posix_sock_group_impl_register_interrupt, 2263 .group_impl_unregister_interrupt = posix_sock_group_impl_unregister_interrupt, 2264 .group_impl_close = posix_sock_group_impl_close, 2265 .get_opts = posix_sock_impl_get_opts, 2266 .set_opts = posix_sock_impl_set_opts, 2267 }; 2268 2269 SPDK_NET_IMPL_REGISTER_DEFAULT(posix, &g_posix_net_impl); 2270 2271 static struct spdk_sock * 2272 ssl_sock_listen(const char *ip, int port, struct spdk_sock_opts *opts) 2273 { 2274 return posix_sock_create(ip, port, SPDK_SOCK_CREATE_LISTEN, opts, true); 2275 } 2276 2277 static struct spdk_sock * 2278 ssl_sock_connect(const char *ip, int port, struct spdk_sock_opts *opts) 2279 { 2280 return posix_sock_create(ip, port, SPDK_SOCK_CREATE_CONNECT, opts, true); 2281 } 2282 2283 static struct spdk_sock * 2284 ssl_sock_accept(struct spdk_sock *_sock) 2285 { 2286 return _posix_sock_accept(_sock, true); 2287 } 2288 2289 static struct spdk_net_impl g_ssl_net_impl = { 2290 .name = "ssl", 2291 .getaddr = posix_sock_getaddr, 2292 .get_interface_name = posix_sock_get_interface_name, 2293 .get_numa_id = posix_sock_get_numa_id, 2294 .connect = ssl_sock_connect, 2295 .listen = ssl_sock_listen, 2296 .accept = ssl_sock_accept, 2297 .close = posix_sock_close, 2298 .recv = posix_sock_recv, 2299 .readv = posix_sock_readv, 2300 .writev = posix_sock_writev, 2301 .recv_next = posix_sock_recv_next, 2302 .writev_async = posix_sock_writev_async, 2303 .flush = posix_sock_flush, 2304 .set_recvlowat = posix_sock_set_recvlowat, 2305 .set_recvbuf = posix_sock_set_recvbuf, 2306 .set_sendbuf = posix_sock_set_sendbuf, 2307 .is_ipv6 = posix_sock_is_ipv6, 2308 .is_ipv4 = posix_sock_is_ipv4, 2309 .is_connected = posix_sock_is_connected, 2310 .group_impl_get_optimal = posix_sock_group_impl_get_optimal, 2311 .group_impl_create = ssl_sock_group_impl_create, 2312 .group_impl_add_sock = posix_sock_group_impl_add_sock, 2313 .group_impl_remove_sock = posix_sock_group_impl_remove_sock, 2314 .group_impl_poll = posix_sock_group_impl_poll, 2315 .group_impl_register_interrupt = posix_sock_group_impl_register_interrupt, 2316 .group_impl_unregister_interrupt = posix_sock_group_impl_unregister_interrupt, 2317 .group_impl_close = ssl_sock_group_impl_close, 2318 .get_opts = ssl_sock_impl_get_opts, 2319 .set_opts = ssl_sock_impl_set_opts, 2320 }; 2321 2322 SPDK_NET_IMPL_REGISTER(ssl, &g_ssl_net_impl); 2323 SPDK_LOG_REGISTER_COMPONENT(sock_posix) 2324