1 /* SPDX-License-Identifier: BSD-3-Clause 2 * Copyright (C) 2018 Intel Corporation. All rights reserved. 3 * Copyright (c) 2020, 2021 Mellanox Technologies LTD. All rights reserved. 4 * Copyright (c) 2021 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 5 */ 6 7 #include "spdk/stdinc.h" 8 9 #if defined(__FreeBSD__) 10 #include <sys/event.h> 11 #define SPDK_KEVENT 12 #else 13 #include <sys/epoll.h> 14 #define SPDK_EPOLL 15 #endif 16 17 #if defined(__linux__) 18 #include <linux/errqueue.h> 19 #endif 20 21 #include "spdk/env.h" 22 #include "spdk/log.h" 23 #include "spdk/pipe.h" 24 #include "spdk/sock.h" 25 #include "spdk/util.h" 26 #include "spdk/string.h" 27 #include "spdk_internal/sock.h" 28 #include "spdk/net.h" 29 30 #include "openssl/crypto.h" 31 #include "openssl/err.h" 32 #include "openssl/ssl.h" 33 34 #define MAX_TMPBUF 1024 35 #define PORTNUMLEN 32 36 37 #if defined(SO_ZEROCOPY) && defined(MSG_ZEROCOPY) 38 #define SPDK_ZEROCOPY 39 #endif 40 41 struct spdk_posix_sock { 42 struct spdk_sock base; 43 int fd; 44 45 uint32_t sendmsg_idx; 46 47 struct spdk_pipe *recv_pipe; 48 int recv_buf_sz; 49 bool pipe_has_data; 50 bool socket_has_data; 51 bool zcopy; 52 53 int placement_id; 54 55 SSL_CTX *ctx; 56 SSL *ssl; 57 58 TAILQ_ENTRY(spdk_posix_sock) link; 59 }; 60 61 TAILQ_HEAD(spdk_has_data_list, spdk_posix_sock); 62 63 struct spdk_posix_sock_group_impl { 64 struct spdk_sock_group_impl base; 65 int fd; 66 struct spdk_interrupt *intr; 67 struct spdk_has_data_list socks_with_data; 68 int placement_id; 69 struct spdk_pipe_group *pipe_group; 70 }; 71 72 static struct spdk_sock_impl_opts g_posix_impl_opts = { 73 .recv_buf_size = DEFAULT_SO_RCVBUF_SIZE, 74 .send_buf_size = DEFAULT_SO_SNDBUF_SIZE, 75 .enable_recv_pipe = true, 76 .enable_quickack = false, 77 .enable_placement_id = PLACEMENT_NONE, 78 .enable_zerocopy_send_server = true, 79 .enable_zerocopy_send_client = false, 80 .zerocopy_threshold = 0, 81 .tls_version = 0, 82 .enable_ktls = false, 83 .psk_key = NULL, 84 
.psk_key_size = 0, 85 .psk_identity = NULL, 86 .get_key = NULL, 87 .get_key_ctx = NULL, 88 .tls_cipher_suites = NULL 89 }; 90 91 static struct spdk_sock_impl_opts g_ssl_impl_opts = { 92 .recv_buf_size = MIN_SO_RCVBUF_SIZE, 93 .send_buf_size = MIN_SO_SNDBUF_SIZE, 94 .enable_recv_pipe = true, 95 .enable_quickack = false, 96 .enable_placement_id = PLACEMENT_NONE, 97 .enable_zerocopy_send_server = true, 98 .enable_zerocopy_send_client = false, 99 .zerocopy_threshold = 0, 100 .tls_version = 0, 101 .enable_ktls = false, 102 .psk_key = NULL, 103 .psk_identity = NULL 104 }; 105 106 static struct spdk_sock_map g_map = { 107 .entries = STAILQ_HEAD_INITIALIZER(g_map.entries), 108 .mtx = PTHREAD_MUTEX_INITIALIZER 109 }; 110 111 __attribute((destructor)) static void 112 posix_sock_map_cleanup(void) 113 { 114 spdk_sock_map_cleanup(&g_map); 115 } 116 117 #define __posix_sock(sock) (struct spdk_posix_sock *)sock 118 #define __posix_group_impl(group) (struct spdk_posix_sock_group_impl *)group 119 120 static void 121 posix_sock_copy_impl_opts(struct spdk_sock_impl_opts *dest, const struct spdk_sock_impl_opts *src, 122 size_t len) 123 { 124 #define FIELD_OK(field) \ 125 offsetof(struct spdk_sock_impl_opts, field) + sizeof(src->field) <= len 126 127 #define SET_FIELD(field) \ 128 if (FIELD_OK(field)) { \ 129 dest->field = src->field; \ 130 } 131 132 SET_FIELD(recv_buf_size); 133 SET_FIELD(send_buf_size); 134 SET_FIELD(enable_recv_pipe); 135 SET_FIELD(enable_zerocopy_send); 136 SET_FIELD(enable_quickack); 137 SET_FIELD(enable_placement_id); 138 SET_FIELD(enable_zerocopy_send_server); 139 SET_FIELD(enable_zerocopy_send_client); 140 SET_FIELD(zerocopy_threshold); 141 SET_FIELD(tls_version); 142 SET_FIELD(enable_ktls); 143 SET_FIELD(psk_key); 144 SET_FIELD(psk_key_size); 145 SET_FIELD(psk_identity); 146 SET_FIELD(get_key); 147 SET_FIELD(get_key_ctx); 148 SET_FIELD(tls_cipher_suites); 149 150 #undef SET_FIELD 151 #undef FIELD_OK 152 } 153 154 static int 155 _sock_impl_get_opts(struct 
spdk_sock_impl_opts *opts, struct spdk_sock_impl_opts *impl_opts, 156 size_t *len) 157 { 158 if (!opts || !len) { 159 errno = EINVAL; 160 return -1; 161 } 162 163 assert(sizeof(*opts) >= *len); 164 memset(opts, 0, *len); 165 166 posix_sock_copy_impl_opts(opts, impl_opts, *len); 167 *len = spdk_min(*len, sizeof(*impl_opts)); 168 169 return 0; 170 } 171 172 static int 173 posix_sock_impl_get_opts(struct spdk_sock_impl_opts *opts, size_t *len) 174 { 175 return _sock_impl_get_opts(opts, &g_posix_impl_opts, len); 176 } 177 178 static int 179 ssl_sock_impl_get_opts(struct spdk_sock_impl_opts *opts, size_t *len) 180 { 181 return _sock_impl_get_opts(opts, &g_ssl_impl_opts, len); 182 } 183 184 static int 185 _sock_impl_set_opts(const struct spdk_sock_impl_opts *opts, struct spdk_sock_impl_opts *impl_opts, 186 size_t len) 187 { 188 if (!opts) { 189 errno = EINVAL; 190 return -1; 191 } 192 193 assert(sizeof(*opts) >= len); 194 posix_sock_copy_impl_opts(impl_opts, opts, len); 195 196 return 0; 197 } 198 199 static int 200 posix_sock_impl_set_opts(const struct spdk_sock_impl_opts *opts, size_t len) 201 { 202 return _sock_impl_set_opts(opts, &g_posix_impl_opts, len); 203 } 204 205 static int 206 ssl_sock_impl_set_opts(const struct spdk_sock_impl_opts *opts, size_t len) 207 { 208 return _sock_impl_set_opts(opts, &g_ssl_impl_opts, len); 209 } 210 211 static void 212 _opts_get_impl_opts(const struct spdk_sock_opts *opts, struct spdk_sock_impl_opts *dest, 213 const struct spdk_sock_impl_opts *default_impl) 214 { 215 /* Copy the default impl_opts first to cover cases when user's impl_opts is smaller */ 216 memcpy(dest, default_impl, sizeof(*dest)); 217 218 if (opts->impl_opts != NULL) { 219 assert(sizeof(*dest) >= opts->impl_opts_size); 220 posix_sock_copy_impl_opts(dest, opts->impl_opts, opts->impl_opts_size); 221 } 222 } 223 224 static int 225 posix_sock_getaddr(struct spdk_sock *_sock, char *saddr, int slen, uint16_t *sport, 226 char *caddr, int clen, uint16_t *cport) 227 { 228 
struct spdk_posix_sock *sock = __posix_sock(_sock); 229 230 assert(sock != NULL); 231 return spdk_net_getaddr(sock->fd, saddr, slen, sport, caddr, clen, cport); 232 } 233 234 enum posix_sock_create_type { 235 SPDK_SOCK_CREATE_LISTEN, 236 SPDK_SOCK_CREATE_CONNECT, 237 }; 238 239 static int 240 posix_sock_alloc_pipe(struct spdk_posix_sock *sock, int sz) 241 { 242 uint8_t *new_buf, *old_buf; 243 struct spdk_pipe *new_pipe; 244 struct iovec siov[2]; 245 struct iovec diov[2]; 246 int sbytes; 247 ssize_t bytes; 248 int rc; 249 250 if (sock->recv_buf_sz == sz) { 251 return 0; 252 } 253 254 /* If the new size is 0, just free the pipe */ 255 if (sz == 0) { 256 old_buf = spdk_pipe_destroy(sock->recv_pipe); 257 free(old_buf); 258 sock->recv_pipe = NULL; 259 return 0; 260 } else if (sz < MIN_SOCK_PIPE_SIZE) { 261 SPDK_ERRLOG("The size of the pipe must be larger than %d\n", MIN_SOCK_PIPE_SIZE); 262 return -1; 263 } 264 265 /* Round up to next 64 byte multiple */ 266 rc = posix_memalign((void **)&new_buf, 64, sz); 267 if (rc != 0) { 268 SPDK_ERRLOG("socket recv buf allocation failed\n"); 269 return -ENOMEM; 270 } 271 memset(new_buf, 0, sz); 272 273 new_pipe = spdk_pipe_create(new_buf, sz); 274 if (new_pipe == NULL) { 275 SPDK_ERRLOG("socket pipe allocation failed\n"); 276 free(new_buf); 277 return -ENOMEM; 278 } 279 280 if (sock->recv_pipe != NULL) { 281 /* Pull all of the data out of the old pipe */ 282 sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov); 283 if (sbytes > sz) { 284 /* Too much data to fit into the new pipe size */ 285 old_buf = spdk_pipe_destroy(new_pipe); 286 free(old_buf); 287 return -EINVAL; 288 } 289 290 sbytes = spdk_pipe_writer_get_buffer(new_pipe, sz, diov); 291 assert(sbytes == sz); 292 293 bytes = spdk_iovcpy(siov, 2, diov, 2); 294 spdk_pipe_writer_advance(new_pipe, bytes); 295 296 old_buf = spdk_pipe_destroy(sock->recv_pipe); 297 free(old_buf); 298 } 299 300 sock->recv_buf_sz = sz; 301 sock->recv_pipe = new_pipe; 302 303 if 
(sock->base.group_impl) { 304 struct spdk_posix_sock_group_impl *group; 305 306 group = __posix_group_impl(sock->base.group_impl); 307 spdk_pipe_group_add(group->pipe_group, sock->recv_pipe); 308 } 309 310 return 0; 311 } 312 313 static int 314 posix_sock_set_recvbuf(struct spdk_sock *_sock, int sz) 315 { 316 struct spdk_posix_sock *sock = __posix_sock(_sock); 317 int min_size; 318 int rc; 319 320 assert(sock != NULL); 321 322 if (_sock->impl_opts.enable_recv_pipe) { 323 rc = posix_sock_alloc_pipe(sock, sz); 324 if (rc) { 325 return rc; 326 } 327 } 328 329 /* Set kernel buffer size to be at least MIN_SO_RCVBUF_SIZE and 330 * _sock->impl_opts.recv_buf_size. */ 331 min_size = spdk_max(MIN_SO_RCVBUF_SIZE, _sock->impl_opts.recv_buf_size); 332 333 if (sz < min_size) { 334 sz = min_size; 335 } 336 337 rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVBUF, &sz, sizeof(sz)); 338 if (rc < 0) { 339 return rc; 340 } 341 342 _sock->impl_opts.recv_buf_size = sz; 343 344 return 0; 345 } 346 347 static int 348 posix_sock_set_sendbuf(struct spdk_sock *_sock, int sz) 349 { 350 struct spdk_posix_sock *sock = __posix_sock(_sock); 351 int min_size; 352 int rc; 353 354 assert(sock != NULL); 355 356 /* Set kernel buffer size to be at least MIN_SO_SNDBUF_SIZE and 357 * _sock->impl_opts.send_buf_size. 
*/ 358 min_size = spdk_max(MIN_SO_SNDBUF_SIZE, _sock->impl_opts.send_buf_size); 359 360 if (sz < min_size) { 361 sz = min_size; 362 } 363 364 rc = setsockopt(sock->fd, SOL_SOCKET, SO_SNDBUF, &sz, sizeof(sz)); 365 if (rc < 0) { 366 return rc; 367 } 368 369 _sock->impl_opts.send_buf_size = sz; 370 371 return 0; 372 } 373 374 static void 375 posix_sock_init(struct spdk_posix_sock *sock, bool enable_zero_copy) 376 { 377 #if defined(SPDK_ZEROCOPY) || defined(__linux__) 378 int flag; 379 int rc; 380 #endif 381 382 #if defined(SPDK_ZEROCOPY) 383 flag = 1; 384 385 if (enable_zero_copy) { 386 /* Try to turn on zero copy sends */ 387 rc = setsockopt(sock->fd, SOL_SOCKET, SO_ZEROCOPY, &flag, sizeof(flag)); 388 if (rc == 0) { 389 sock->zcopy = true; 390 } 391 } 392 #endif 393 394 #if defined(__linux__) 395 flag = 1; 396 397 if (sock->base.impl_opts.enable_quickack) { 398 rc = setsockopt(sock->fd, IPPROTO_TCP, TCP_QUICKACK, &flag, sizeof(flag)); 399 if (rc != 0) { 400 SPDK_ERRLOG("quickack was failed to set\n"); 401 } 402 } 403 404 spdk_sock_get_placement_id(sock->fd, sock->base.impl_opts.enable_placement_id, 405 &sock->placement_id); 406 407 if (sock->base.impl_opts.enable_placement_id == PLACEMENT_MARK) { 408 /* Save placement_id */ 409 spdk_sock_map_insert(&g_map, sock->placement_id, NULL); 410 } 411 #endif 412 } 413 414 static struct spdk_posix_sock * 415 posix_sock_alloc(int fd, struct spdk_sock_impl_opts *impl_opts, bool enable_zero_copy) 416 { 417 struct spdk_posix_sock *sock; 418 419 sock = calloc(1, sizeof(*sock)); 420 if (sock == NULL) { 421 SPDK_ERRLOG("sock allocation failed\n"); 422 return NULL; 423 } 424 425 sock->fd = fd; 426 memcpy(&sock->base.impl_opts, impl_opts, sizeof(*impl_opts)); 427 posix_sock_init(sock, enable_zero_copy); 428 429 return sock; 430 } 431 432 static int 433 posix_fd_create(struct addrinfo *res, struct spdk_sock_opts *opts, 434 struct spdk_sock_impl_opts *impl_opts) 435 { 436 int fd; 437 int val = 1; 438 int rc, sz; 439 #if 
defined(__linux__) 440 int to; 441 #endif 442 443 fd = socket(res->ai_family, res->ai_socktype, res->ai_protocol); 444 if (fd < 0) { 445 /* error */ 446 return -1; 447 } 448 449 sz = impl_opts->recv_buf_size; 450 rc = setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &sz, sizeof(sz)); 451 if (rc) { 452 /* Not fatal */ 453 } 454 455 sz = impl_opts->send_buf_size; 456 rc = setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &sz, sizeof(sz)); 457 if (rc) { 458 /* Not fatal */ 459 } 460 461 rc = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof val); 462 if (rc != 0) { 463 close(fd); 464 /* error */ 465 return -1; 466 } 467 rc = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof val); 468 if (rc != 0) { 469 close(fd); 470 /* error */ 471 return -1; 472 } 473 474 #if defined(SO_PRIORITY) 475 if (opts->priority) { 476 rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &opts->priority, sizeof val); 477 if (rc != 0) { 478 close(fd); 479 /* error */ 480 return -1; 481 } 482 } 483 #endif 484 485 if (res->ai_family == AF_INET6) { 486 rc = setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &val, sizeof val); 487 if (rc != 0) { 488 close(fd); 489 /* error */ 490 return -1; 491 } 492 } 493 494 if (opts->ack_timeout) { 495 #if defined(__linux__) 496 to = opts->ack_timeout; 497 rc = setsockopt(fd, IPPROTO_TCP, TCP_USER_TIMEOUT, &to, sizeof(to)); 498 if (rc != 0) { 499 close(fd); 500 /* error */ 501 return -1; 502 } 503 #else 504 SPDK_WARNLOG("TCP_USER_TIMEOUT is not supported.\n"); 505 #endif 506 } 507 508 return fd; 509 } 510 511 static int 512 posix_sock_psk_find_session_server_cb(SSL *ssl, const unsigned char *identity, 513 size_t identity_len, SSL_SESSION **sess) 514 { 515 struct spdk_sock_impl_opts *impl_opts = SSL_get_app_data(ssl); 516 uint8_t key[SSL_MAX_MASTER_KEY_LENGTH] = {}; 517 int keylen; 518 int rc, i; 519 STACK_OF(SSL_CIPHER) *ciphers; 520 const SSL_CIPHER *cipher; 521 const char *cipher_name; 522 const char *user_cipher = NULL; 523 bool found = false; 524 525 if (impl_opts->get_key) { 526 rc = 
impl_opts->get_key(key, sizeof(key), &user_cipher, identity, impl_opts->get_key_ctx); 527 if (rc < 0) { 528 SPDK_ERRLOG("Unable to find PSK for identity: %s\n", identity); 529 return 0; 530 } 531 keylen = rc; 532 } else { 533 if (impl_opts->psk_key == NULL) { 534 SPDK_ERRLOG("PSK is not set\n"); 535 return 0; 536 } 537 538 SPDK_DEBUGLOG(sock_posix, "Length of Client's PSK ID %lu\n", strlen(impl_opts->psk_identity)); 539 if (strcmp(impl_opts->psk_identity, identity) != 0) { 540 SPDK_ERRLOG("Unknown Client's PSK ID\n"); 541 return 0; 542 } 543 keylen = impl_opts->psk_key_size; 544 545 memcpy(key, impl_opts->psk_key, keylen); 546 user_cipher = impl_opts->tls_cipher_suites; 547 } 548 549 if (user_cipher == NULL) { 550 SPDK_ERRLOG("Cipher suite not set\n"); 551 return 0; 552 } 553 554 *sess = SSL_SESSION_new(); 555 if (*sess == NULL) { 556 SPDK_ERRLOG("Unable to allocate new SSL session\n"); 557 return 0; 558 } 559 560 ciphers = SSL_get_ciphers(ssl); 561 for (i = 0; i < sk_SSL_CIPHER_num(ciphers); i++) { 562 cipher = sk_SSL_CIPHER_value(ciphers, i); 563 cipher_name = SSL_CIPHER_get_name(cipher); 564 565 if (strcmp(user_cipher, cipher_name) == 0) { 566 rc = SSL_SESSION_set_cipher(*sess, cipher); 567 if (rc != 1) { 568 SPDK_ERRLOG("Unable to set cipher: %s\n", cipher_name); 569 goto err; 570 } 571 found = true; 572 break; 573 } 574 } 575 if (found == false) { 576 SPDK_ERRLOG("No suitable cipher found\n"); 577 goto err; 578 } 579 580 SPDK_DEBUGLOG(sock_posix, "Cipher selected: %s\n", cipher_name); 581 582 rc = SSL_SESSION_set_protocol_version(*sess, TLS1_3_VERSION); 583 if (rc != 1) { 584 SPDK_ERRLOG("Unable to set TLS version: %d\n", TLS1_3_VERSION); 585 goto err; 586 } 587 588 rc = SSL_SESSION_set1_master_key(*sess, key, keylen); 589 if (rc != 1) { 590 SPDK_ERRLOG("Unable to set PSK for session\n"); 591 goto err; 592 } 593 594 return 1; 595 596 err: 597 SSL_SESSION_free(*sess); 598 *sess = NULL; 599 return 0; 600 } 601 602 static int 603 
posix_sock_psk_use_session_client_cb(SSL *ssl, const EVP_MD *md, const unsigned char **identity, 604 size_t *identity_len, SSL_SESSION **sess) 605 { 606 struct spdk_sock_impl_opts *impl_opts = SSL_get_app_data(ssl); 607 int rc, i; 608 STACK_OF(SSL_CIPHER) *ciphers; 609 const SSL_CIPHER *cipher; 610 const char *cipher_name; 611 long keylen; 612 bool found = false; 613 614 if (impl_opts->psk_key == NULL) { 615 SPDK_ERRLOG("PSK is not set\n"); 616 return 0; 617 } 618 if (impl_opts->psk_key_size > SSL_MAX_MASTER_KEY_LENGTH) { 619 SPDK_ERRLOG("PSK too long\n"); 620 return 0; 621 } 622 keylen = impl_opts->psk_key_size; 623 624 if (impl_opts->tls_cipher_suites == NULL) { 625 SPDK_ERRLOG("Cipher suite not set\n"); 626 return 0; 627 } 628 *sess = SSL_SESSION_new(); 629 if (*sess == NULL) { 630 SPDK_ERRLOG("Unable to allocate new SSL session\n"); 631 return 0; 632 } 633 634 ciphers = SSL_get_ciphers(ssl); 635 for (i = 0; i < sk_SSL_CIPHER_num(ciphers); i++) { 636 cipher = sk_SSL_CIPHER_value(ciphers, i); 637 cipher_name = SSL_CIPHER_get_name(cipher); 638 639 if (strcmp(impl_opts->tls_cipher_suites, cipher_name) == 0) { 640 rc = SSL_SESSION_set_cipher(*sess, cipher); 641 if (rc != 1) { 642 SPDK_ERRLOG("Unable to set cipher: %s\n", cipher_name); 643 goto err; 644 } 645 found = true; 646 break; 647 } 648 } 649 if (found == false) { 650 SPDK_ERRLOG("No suitable cipher found\n"); 651 goto err; 652 } 653 654 SPDK_DEBUGLOG(sock_posix, "Cipher selected: %s\n", cipher_name); 655 656 rc = SSL_SESSION_set_protocol_version(*sess, TLS1_3_VERSION); 657 if (rc != 1) { 658 SPDK_ERRLOG("Unable to set TLS version: %d\n", TLS1_3_VERSION); 659 goto err; 660 } 661 662 rc = SSL_SESSION_set1_master_key(*sess, impl_opts->psk_key, keylen); 663 if (rc != 1) { 664 SPDK_ERRLOG("Unable to set PSK for session\n"); 665 goto err; 666 } 667 668 *identity_len = strlen(impl_opts->psk_identity); 669 *identity = impl_opts->psk_identity; 670 671 return 1; 672 673 err: 674 SSL_SESSION_free(*sess); 675 *sess = 
NULL; 676 return 0; 677 } 678 679 static SSL_CTX * 680 posix_sock_create_ssl_context(const SSL_METHOD *method, struct spdk_sock_opts *opts, 681 struct spdk_sock_impl_opts *impl_opts) 682 { 683 SSL_CTX *ctx; 684 int tls_version = 0; 685 bool ktls_enabled = false; 686 #ifdef SSL_OP_ENABLE_KTLS 687 long options; 688 #endif 689 690 SSL_library_init(); 691 OpenSSL_add_all_algorithms(); 692 SSL_load_error_strings(); 693 /* Produce a SSL CTX in SSL V2 and V3 standards compliant way */ 694 ctx = SSL_CTX_new(method); 695 if (!ctx) { 696 SPDK_ERRLOG("SSL_CTX_new() failed, msg = %s\n", ERR_error_string(ERR_peek_last_error(), NULL)); 697 return NULL; 698 } 699 SPDK_DEBUGLOG(sock_posix, "SSL context created\n"); 700 701 switch (impl_opts->tls_version) { 702 case 0: 703 /* auto-negotioation */ 704 break; 705 case SPDK_TLS_VERSION_1_3: 706 tls_version = TLS1_3_VERSION; 707 break; 708 default: 709 SPDK_ERRLOG("Incorrect TLS version provided: %d\n", impl_opts->tls_version); 710 goto err; 711 } 712 713 if (tls_version) { 714 SPDK_DEBUGLOG(sock_posix, "Hardening TLS version to '%d'='0x%X'\n", impl_opts->tls_version, 715 tls_version); 716 if (!SSL_CTX_set_min_proto_version(ctx, tls_version)) { 717 SPDK_ERRLOG("Unable to set Min TLS version to '%d'='0x%X\n", impl_opts->tls_version, tls_version); 718 goto err; 719 } 720 if (!SSL_CTX_set_max_proto_version(ctx, tls_version)) { 721 SPDK_ERRLOG("Unable to set Max TLS version to '%d'='0x%X\n", impl_opts->tls_version, tls_version); 722 goto err; 723 } 724 } 725 if (impl_opts->enable_ktls) { 726 SPDK_DEBUGLOG(sock_posix, "Enabling kTLS offload\n"); 727 #ifdef SSL_OP_ENABLE_KTLS 728 options = SSL_CTX_set_options(ctx, SSL_OP_ENABLE_KTLS); 729 ktls_enabled = options & SSL_OP_ENABLE_KTLS; 730 #else 731 ktls_enabled = false; 732 #endif 733 if (!ktls_enabled) { 734 SPDK_ERRLOG("Unable to set kTLS offload via SSL_CTX_set_options(). 
Configure openssl with 'enable-ktls'\n"); 735 goto err; 736 } 737 } 738 739 /* SSL_CTX_set_ciphersuites() return 1 if the requested 740 * cipher suite list was configured, and 0 otherwise. */ 741 if (impl_opts->tls_cipher_suites != NULL && 742 SSL_CTX_set_ciphersuites(ctx, impl_opts->tls_cipher_suites) != 1) { 743 SPDK_ERRLOG("Unable to set TLS cipher suites for SSL'\n"); 744 goto err; 745 } 746 747 return ctx; 748 749 err: 750 SSL_CTX_free(ctx); 751 return NULL; 752 } 753 754 static SSL * 755 ssl_sock_setup_connect(SSL_CTX *ctx, int fd) 756 { 757 SSL *ssl; 758 759 ssl = SSL_new(ctx); 760 if (!ssl) { 761 SPDK_ERRLOG("SSL_new() failed, msg = %s\n", ERR_error_string(ERR_peek_last_error(), NULL)); 762 return NULL; 763 } 764 SSL_set_fd(ssl, fd); 765 SSL_set_connect_state(ssl); 766 SSL_set_psk_use_session_callback(ssl, posix_sock_psk_use_session_client_cb); 767 SPDK_DEBUGLOG(sock_posix, "SSL object creation finished: %p\n", ssl); 768 SPDK_DEBUGLOG(sock_posix, "%s = SSL_state_string_long(%p)\n", SSL_state_string_long(ssl), ssl); 769 SPDK_DEBUGLOG(sock_posix, "%s = SSL_state_string_long(%p)\n", SSL_state_string_long(ssl), ssl); 770 SPDK_DEBUGLOG(sock_posix, "Negotiated Cipher suite:%s\n", 771 SSL_CIPHER_get_name(SSL_get_current_cipher(ssl))); 772 return ssl; 773 } 774 775 static SSL * 776 ssl_sock_setup_accept(SSL_CTX *ctx, int fd) 777 { 778 SSL *ssl; 779 780 ssl = SSL_new(ctx); 781 if (!ssl) { 782 SPDK_ERRLOG("SSL_new() failed, msg = %s\n", ERR_error_string(ERR_peek_last_error(), NULL)); 783 return NULL; 784 } 785 SSL_set_fd(ssl, fd); 786 SSL_set_accept_state(ssl); 787 SSL_set_psk_find_session_callback(ssl, posix_sock_psk_find_session_server_cb); 788 SPDK_DEBUGLOG(sock_posix, "SSL object creation finished: %p\n", ssl); 789 SPDK_DEBUGLOG(sock_posix, "%s = SSL_state_string_long(%p)\n", SSL_state_string_long(ssl), ssl); 790 SPDK_DEBUGLOG(sock_posix, "%s = SSL_state_string_long(%p)\n", SSL_state_string_long(ssl), ssl); 791 SPDK_DEBUGLOG(sock_posix, "Negotiated Cipher 
suite:%s\n", 792 SSL_CIPHER_get_name(SSL_get_current_cipher(ssl))); 793 return ssl; 794 } 795 796 static ssize_t 797 SSL_readv(SSL *ssl, const struct iovec *iov, int iovcnt) 798 { 799 int i, rc = 0; 800 ssize_t total = 0; 801 802 for (i = 0; i < iovcnt; i++) { 803 rc = SSL_read(ssl, iov[i].iov_base, iov[i].iov_len); 804 805 if (rc > 0) { 806 total += rc; 807 } 808 if (rc != (int)iov[i].iov_len) { 809 break; 810 } 811 } 812 if (total > 0) { 813 errno = 0; 814 return total; 815 } 816 switch (SSL_get_error(ssl, rc)) { 817 case SSL_ERROR_ZERO_RETURN: 818 errno = ENOTCONN; 819 return 0; 820 case SSL_ERROR_WANT_READ: 821 case SSL_ERROR_WANT_WRITE: 822 case SSL_ERROR_WANT_CONNECT: 823 case SSL_ERROR_WANT_ACCEPT: 824 case SSL_ERROR_WANT_X509_LOOKUP: 825 case SSL_ERROR_WANT_ASYNC: 826 case SSL_ERROR_WANT_ASYNC_JOB: 827 case SSL_ERROR_WANT_CLIENT_HELLO_CB: 828 errno = EAGAIN; 829 return -1; 830 case SSL_ERROR_SYSCALL: 831 case SSL_ERROR_SSL: 832 errno = ENOTCONN; 833 return -1; 834 default: 835 errno = ENOTCONN; 836 return -1; 837 } 838 } 839 840 static ssize_t 841 SSL_writev(SSL *ssl, struct iovec *iov, int iovcnt) 842 { 843 int i, rc = 0; 844 ssize_t total = 0; 845 846 for (i = 0; i < iovcnt; i++) { 847 rc = SSL_write(ssl, iov[i].iov_base, iov[i].iov_len); 848 849 if (rc > 0) { 850 total += rc; 851 } 852 if (rc != (int)iov[i].iov_len) { 853 break; 854 } 855 } 856 if (total > 0) { 857 errno = 0; 858 return total; 859 } 860 switch (SSL_get_error(ssl, rc)) { 861 case SSL_ERROR_ZERO_RETURN: 862 errno = ENOTCONN; 863 return 0; 864 case SSL_ERROR_WANT_READ: 865 case SSL_ERROR_WANT_WRITE: 866 case SSL_ERROR_WANT_CONNECT: 867 case SSL_ERROR_WANT_ACCEPT: 868 case SSL_ERROR_WANT_X509_LOOKUP: 869 case SSL_ERROR_WANT_ASYNC: 870 case SSL_ERROR_WANT_ASYNC_JOB: 871 case SSL_ERROR_WANT_CLIENT_HELLO_CB: 872 errno = EAGAIN; 873 return -1; 874 case SSL_ERROR_SYSCALL: 875 case SSL_ERROR_SSL: 876 errno = ENOTCONN; 877 return -1; 878 default: 879 errno = ENOTCONN; 880 return -1; 881 } 882 } 883 
/*
 * Create a listening or connected socket for ip:port.
 *
 * ip may be given in bracketed form ("[::1]"); it must be a numeric address.
 * Every address returned by getaddrinfo() is tried in turn until one can be
 * bound/listened or connected. For TLS client sockets the SSL context and SSL
 * object are created here as well. The resulting fd is switched to
 * non-blocking mode.
 *
 * Returns the new socket, or NULL on failure.
 */
static struct spdk_sock *
posix_sock_create(const char *ip, int port,
		  enum posix_sock_create_type type,
		  struct spdk_sock_opts *opts,
		  bool enable_ssl)
{
	struct spdk_posix_sock *sock;
	struct spdk_sock_impl_opts impl_opts;
	char buf[MAX_TMPBUF];
	char portnum[PORTNUMLEN];
	char *p;
	struct addrinfo hints, *res, *res0;
	int fd, flag;
	int rc;
	bool enable_zcopy_user_opts = true;
	bool enable_zcopy_impl_opts = true;
	SSL_CTX *ctx = NULL;
	SSL *ssl = NULL;

	assert(opts != NULL);
	if (enable_ssl) {
		_opts_get_impl_opts(opts, &impl_opts, &g_ssl_impl_opts);
	} else {
		_opts_get_impl_opts(opts, &impl_opts, &g_posix_impl_opts);
	}

	if (ip == NULL) {
		return NULL;
	}
	if (ip[0] == '[') {
		/* Strip the brackets of an "[addr]" style address */
		snprintf(buf, sizeof(buf), "%s", ip + 1);
		p = strchr(buf, ']');
		if (p != NULL) {
			*p = '\0';
		}
		ip = (const char *) &buf[0];
	}

	snprintf(portnum, sizeof portnum, "%d", port);
	memset(&hints, 0, sizeof hints);
	hints.ai_family = PF_UNSPEC;
	hints.ai_socktype = SOCK_STREAM;
	hints.ai_flags = AI_NUMERICSERV;
	hints.ai_flags |= AI_PASSIVE;
	hints.ai_flags |= AI_NUMERICHOST;
	rc = getaddrinfo(ip, portnum, &hints, &res0);
	if (rc != 0) {
		SPDK_ERRLOG("getaddrinfo() failed %s (%d)\n", gai_strerror(rc), rc);
		return NULL;
	}

	/* try listen */
	fd = -1;
	for (res = res0; res != NULL; res = res->ai_next) {
retry:
		fd = posix_fd_create(res, opts, &impl_opts);
		if (fd < 0) {
			continue;
		}
		if (type == SPDK_SOCK_CREATE_LISTEN) {
			rc = bind(fd, res->ai_addr, res->ai_addrlen);
			if (rc != 0) {
				SPDK_ERRLOG("bind() failed at port %d, errno = %d\n", port, errno);
				switch (errno) {
				case EINTR:
					/* interrupted? */
					close(fd);
					goto retry;
				case EADDRNOTAVAIL:
					SPDK_ERRLOG("IP address %s not available. "
						    "Verify IP address in config file "
						    "and make sure setup script is "
						    "run before starting spdk app.\n", ip);
				/* FALLTHROUGH */
				default:
					/* try next family */
					close(fd);
					fd = -1;
					continue;
				}
			}
			/* bind OK */
			rc = listen(fd, 512);
			if (rc != 0) {
				SPDK_ERRLOG("listen() failed, errno = %d\n", errno);
				close(fd);
				fd = -1;
				break;
			}
			enable_zcopy_impl_opts = impl_opts.enable_zerocopy_send_server;
		} else if (type == SPDK_SOCK_CREATE_CONNECT) {
			rc = connect(fd, res->ai_addr, res->ai_addrlen);
			if (rc != 0) {
				SPDK_ERRLOG("connect() failed, errno = %d\n", errno);
				/* try next family */
				close(fd);
				fd = -1;
				continue;
			}
			enable_zcopy_impl_opts = impl_opts.enable_zerocopy_send_client;
			if (enable_ssl) {
				ctx = posix_sock_create_ssl_context(TLS_client_method(), opts, &impl_opts);
				if (!ctx) {
					SPDK_ERRLOG("posix_sock_create_ssl_context() failed, errno = %d\n", errno);
					close(fd);
					fd = -1;
					break;
				}
				ssl = ssl_sock_setup_connect(ctx, fd);
				if (!ssl) {
					SPDK_ERRLOG("ssl_sock_setup_connect() failed, errno = %d\n", errno);
					close(fd);
					fd = -1;
					SSL_CTX_free(ctx);
					break;
				}
			}
		}

		/* A failed F_GETFL returns -1; never feed that into F_SETFL. */
		flag = fcntl(fd, F_GETFL);
		if (flag < 0 || fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0) {
			SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno);
			SSL_free(ssl);
			SSL_CTX_free(ctx);
			close(fd);
			fd = -1;
			break;
		}
		break;
	}
	freeaddrinfo(res0);

	if (fd < 0) {
		return NULL;
	}

	/* Only enable zero copy for non-loopback and non-ssl sockets. */
	enable_zcopy_user_opts = opts->zcopy && !spdk_net_is_loopback(fd) && !enable_ssl;

	sock = posix_sock_alloc(fd, &impl_opts, enable_zcopy_user_opts && enable_zcopy_impl_opts);
	if (sock == NULL) {
		SPDK_ERRLOG("sock allocation failed\n");
		SSL_free(ssl);
		SSL_CTX_free(ctx);
		close(fd);
		return NULL;
	}

	if (ctx) {
		sock->ctx = ctx;
	}

	if (ssl) {
		sock->ssl = ssl;
		/* The PSK callbacks read the socket's own copy of the options. */
		SSL_set_app_data(ssl, &sock->base.impl_opts);
	}

	return &sock->base;
}

static struct spdk_sock *
posix_sock_listen(const char *ip, int port, struct spdk_sock_opts *opts)
{
	return posix_sock_create(ip, port, SPDK_SOCK_CREATE_LISTEN, opts, false);
}

static struct spdk_sock *
posix_sock_connect(const char *ip, int port, struct spdk_sock_opts *opts)
{
	return posix_sock_create(ip, port, SPDK_SOCK_CREATE_CONNECT, opts, false);
}

/*
 * Accept one pending connection on the listening socket _sock.
 *
 * The new socket is made non-blocking, inherits the listener's priority,
 * implementation options and zero-copy setting, and, when enable_ssl is set,
 * gets a server-side SSL context and SSL object.
 *
 * Returns the new socket, or NULL if accept() or any setup step fails.
 */
static struct spdk_sock *
_posix_sock_accept(struct spdk_sock *_sock, bool enable_ssl)
{
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	struct spdk_posix_sock_group_impl *group;
	struct sockaddr_storage sa;
	socklen_t salen;
	int rc, fd;
	struct spdk_posix_sock *new_sock;
	int flag;
	SSL_CTX *ctx = NULL;
	SSL *ssl = NULL;

	/* Check the pointer before it is dereferenced below. */
	assert(sock != NULL);
	group = __posix_group_impl(sock->base.group_impl);

	memset(&sa, 0, sizeof(sa));
	salen = sizeof(sa);

	/* epoll_wait will trigger again if there is more than one request */
	if (group && sock->socket_has_data) {
		sock->socket_has_data = false;
		TAILQ_REMOVE(&group->socks_with_data, sock, link);
	}

	rc = accept(sock->fd, (struct sockaddr *)&sa, &salen);

	if (rc == -1) {
		return NULL;
	}

	fd = rc;

	/* A failed F_GETFL returns -1, which has every bit set; treating that as
	 * "already non-blocking" would silently leave the socket blocking. */
	flag = fcntl(fd, F_GETFL);
	if (flag < 0 || ((!(flag & O_NONBLOCK)) && (fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0))) {
		SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno);
		close(fd);
		return NULL;
	}

#if defined(SO_PRIORITY)
	/* The priority is not inherited, so call this function again */
	if (sock->base.opts.priority) {
		rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &sock->base.opts.priority, sizeof(int));
		if (rc != 0) {
			close(fd);
			return NULL;
		}
	}
#endif

	/* Establish SSL connection */
	if (enable_ssl) {
		ctx = posix_sock_create_ssl_context(TLS_server_method(), &sock->base.opts, &sock->base.impl_opts);
		if (!ctx) {
			SPDK_ERRLOG("posix_sock_create_ssl_context() failed, errno = %d\n", errno);
			close(fd);
			return NULL;
		}
		ssl = ssl_sock_setup_accept(ctx, fd);
		if (!ssl) {
			SPDK_ERRLOG("ssl_sock_setup_accept() failed, errno = %d\n", errno);
			close(fd);
			SSL_CTX_free(ctx);
			return NULL;
		}
	}

	/* Inherit the zero copy feature from the listen socket */
	new_sock = posix_sock_alloc(fd, &sock->base.impl_opts, sock->zcopy);
	if (new_sock == NULL) {
		close(fd);
		SSL_free(ssl);
		SSL_CTX_free(ctx);
		return NULL;
	}

	if (ctx) {
		new_sock->ctx = ctx;
	}

	if (ssl) {
		new_sock->ssl = ssl;
		/* The PSK callbacks read the new socket's own copy of the options. */
		SSL_set_app_data(ssl, &new_sock->base.impl_opts);
	}

	return &new_sock->base;
}

static struct spdk_sock *
posix_sock_accept(struct spdk_sock *_sock)
{
	return _posix_sock_accept(_sock, false);
}

/*
 * Close the socket and release everything owned by it: TLS state, the fd, the
 * receive pipe and the socket structure itself. All pending requests must
 * already be completed. Always returns 0.
 */
static int
posix_sock_close(struct spdk_sock *_sock)
{
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	void *pipe_buf;

	assert(TAILQ_EMPTY(&_sock->pending_reqs));

	if (sock->ssl != NULL) {
		SSL_shutdown(sock->ssl);
	}

	/* If the socket fails to close, the best choice is to
	 * leak the fd but continue to free the rest of the sock
	 * memory. */
	close(sock->fd);

	SSL_free(sock->ssl);
	SSL_CTX_free(sock->ctx);

	pipe_buf = spdk_pipe_destroy(sock->recv_pipe);
	free(pipe_buf);
	free(sock);

	return 0;
}

#ifdef SPDK_ZEROCOPY
/*
 * Drain the socket error queue and complete the pending requests whose
 * zero-copy sends the kernel reports as finished.
 *
 * Returns 0 when the queue is empty or an unexpected message is seen, or the
 * negative result of spdk_sock_request_put() if completing a request fails.
 */
static int
_sock_check_zcopy(struct spdk_sock *sock)
{
	struct spdk_posix_sock *psock = __posix_sock(sock);
	struct msghdr msgh = {};
	uint8_t buf[sizeof(struct cmsghdr) + sizeof(struct sock_extended_err)];
	ssize_t rc;
	struct sock_extended_err *serr;
	struct cmsghdr *cm;
	uint32_t idx;
	struct spdk_sock_request *req, *treq;
	bool found;

	msgh.msg_control = buf;

	while (true) {
		/* recvmsg() updates msg_controllen on return, so restore the full
		 * buffer size before every call. */
		msgh.msg_controllen = sizeof(buf);
		rc = recvmsg(psock->fd, &msgh, MSG_ERRQUEUE);

		if (rc < 0) {
			if (errno == EWOULDBLOCK || errno == EAGAIN) {
				return 0;
			}

			if (!TAILQ_EMPTY(&sock->pending_reqs)) {
				SPDK_ERRLOG("Attempting to receive from ERRQUEUE yielded error, but pending list still has orphaned entries\n");
			} else {
				SPDK_WARNLOG("Recvmsg yielded an error!\n");
			}
			return 0;
		}

		cm = CMSG_FIRSTHDR(&msgh);
		if (!(cm &&
		      ((cm->cmsg_level == SOL_IP && cm->cmsg_type == IP_RECVERR) ||
		       (cm->cmsg_level == SOL_IPV6 && cm->cmsg_type == IPV6_RECVERR)))) {
			SPDK_WARNLOG("Unexpected cmsg level or type!\n");
			return 0;
		}

		serr = (struct sock_extended_err *)CMSG_DATA(cm);
		if (serr->ee_errno != 0 || serr->ee_origin != SO_EE_ORIGIN_ZEROCOPY) {
			SPDK_WARNLOG("Unexpected extended error origin\n");
			return 0;
		}

		/* Most of the time, the pending_reqs array is in the exact
		 * order we need such that all of the requests to complete are
		 * in order, in the front. It is guaranteed that all requests
		 * belonging to the same sendmsg call are sequential, so once
		 * we encounter one match we can stop looping as soon as a
		 * non-match is found.
		 */
		idx = serr->ee_info;
		while (true) {
			found = false;
			TAILQ_FOREACH_SAFE(req, &sock->pending_reqs, internal.link, treq) {
				if (!req->internal.is_zcopy) {
					/* This wasn't a zcopy request. It was just waiting in line to complete */
					rc = spdk_sock_request_put(sock, req, 0);
					if (rc < 0) {
						return rc;
					}
				} else if (req->internal.offset == idx) {
					found = true;
					rc = spdk_sock_request_put(sock, req, 0);
					if (rc < 0) {
						return rc;
					}
				} else if (found) {
					break;
				}
			}

			/* The notification covers the inclusive range [ee_info, ee_data] */
			if (idx == serr->ee_data) {
				break;
			}

			if (idx == UINT32_MAX) {
				idx = 0;
			} else {
				idx++;
			}
		}
	}

	return 0;
}
#endif

static int
_sock_flush(struct spdk_sock *sock)
{
	struct spdk_posix_sock *psock = __posix_sock(sock);
	struct msghdr msg = {};
	int flags;
	struct iovec iovs[IOV_BATCH_SIZE];
	int iovcnt;
	int retval;
	struct spdk_sock_request *req;
	int i;
	ssize_t rc, sent;
	unsigned int offset;
	size_t len;
	bool is_zcopy = false;

	/* Can't flush from within a callback or we end up with recursive calls */
	if (sock->cb_cnt > 0) {
		errno = EAGAIN;
		return -1;
	}

#ifdef SPDK_ZEROCOPY
	if (psock->zcopy) {
		flags = MSG_ZEROCOPY | MSG_NOSIGNAL;
	} else
#endif
	{
		flags = MSG_NOSIGNAL;
	}

	iovcnt = spdk_sock_prep_reqs(sock, iovs, 0, NULL, &flags);
	if (iovcnt == 0) {
		return 0;
	}

#ifdef SPDK_ZEROCOPY
	is_zcopy = flags & MSG_ZEROCOPY;
#endif

	/* Perform the vectored write */
	msg.msg_iov = iovs;
	msg.msg_iovlen = iovcnt;

	if (psock->ssl) {
		rc = SSL_writev(psock->ssl, iovs, iovcnt);
	} else {
		rc = sendmsg(psock->fd, &msg, flags);
	}
	if (rc <= 0) {
		if (rc == 0 || errno == EAGAIN || errno == EWOULDBLOCK || (errno == ENOBUFS && psock->zcopy)) {
			errno = EAGAIN;
		}
		return -1;
	}

	sent = rc;

	if (is_zcopy) {
		/* Handling overflow case, because we use psock->sendmsg_idx - 1 for the
		 * req->internal.offset, so sendmsg_idx should not be zero */
		if (spdk_unlikely(psock->sendmsg_idx == UINT32_MAX)) {
			psock->sendmsg_idx = 1;
		} else {
			psock->sendmsg_idx++;
		}
	}

	/* Consume the requests that were actually written */
	req = TAILQ_FIRST(&sock->queued_reqs);
	while (req) {
		offset = req->internal.offset;

		/* req->internal.is_zcopy is true when the whole req or part of it is sent with zerocopy */
		req->internal.is_zcopy = is_zcopy;

		for (i = 0; i < req->iovcnt; i++) {
			/* Advance by the offset first */
			if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) {
				offset -= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len;
				continue;
			}

			/* Calculate the remaining length of this element */
			len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset;

			if (len > (size_t)rc) {
				/* This element was partially sent. */
				req->internal.offset += rc;
				return sent;
			}

			offset = 0;
			req->internal.offset += len;
			rc -= len;
		}

		/* Handled a full request. */
		spdk_sock_request_pend(sock, req);

		if (!req->internal.is_zcopy && req == TAILQ_FIRST(&sock->pending_reqs)) {
			/* The sendmsg syscall above isn't currently asynchronous,
			 * so it's already done. */
			retval = spdk_sock_request_put(sock, req, 0);
			if (retval) {
				break;
			}
		} else {
			/* Re-use the offset field to hold the sendmsg call index. The
			 * index is 0 based, so subtract one here because we've already
			 * incremented above.
*/ 1379 req->internal.offset = psock->sendmsg_idx - 1; 1380 } 1381 1382 if (rc == 0) { 1383 break; 1384 } 1385 1386 req = TAILQ_FIRST(&sock->queued_reqs); 1387 } 1388 1389 return sent; 1390 } 1391 1392 static int 1393 posix_sock_flush(struct spdk_sock *sock) 1394 { 1395 #ifdef SPDK_ZEROCOPY 1396 struct spdk_posix_sock *psock = __posix_sock(sock); 1397 1398 if (psock->zcopy && !TAILQ_EMPTY(&sock->pending_reqs)) { 1399 _sock_check_zcopy(sock); 1400 } 1401 #endif 1402 1403 return _sock_flush(sock); 1404 } 1405 1406 static ssize_t 1407 posix_sock_recv_from_pipe(struct spdk_posix_sock *sock, struct iovec *diov, int diovcnt) 1408 { 1409 struct iovec siov[2]; 1410 int sbytes; 1411 ssize_t bytes; 1412 struct spdk_posix_sock_group_impl *group; 1413 1414 sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov); 1415 if (sbytes < 0) { 1416 errno = EINVAL; 1417 return -1; 1418 } else if (sbytes == 0) { 1419 errno = EAGAIN; 1420 return -1; 1421 } 1422 1423 bytes = spdk_iovcpy(siov, 2, diov, diovcnt); 1424 1425 if (bytes == 0) { 1426 /* The only way this happens is if diov is 0 length */ 1427 errno = EINVAL; 1428 return -1; 1429 } 1430 1431 spdk_pipe_reader_advance(sock->recv_pipe, bytes); 1432 1433 /* If we drained the pipe, mark it appropriately */ 1434 if (spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) { 1435 assert(sock->pipe_has_data == true); 1436 1437 group = __posix_group_impl(sock->base.group_impl); 1438 if (group && !sock->socket_has_data) { 1439 TAILQ_REMOVE(&group->socks_with_data, sock, link); 1440 } 1441 1442 sock->pipe_has_data = false; 1443 } 1444 1445 return bytes; 1446 } 1447 1448 static inline ssize_t 1449 posix_sock_read(struct spdk_posix_sock *sock) 1450 { 1451 struct iovec iov[2]; 1452 int bytes_avail, bytes_recvd; 1453 struct spdk_posix_sock_group_impl *group; 1454 1455 bytes_avail = spdk_pipe_writer_get_buffer(sock->recv_pipe, sock->recv_buf_sz, iov); 1456 1457 if (bytes_avail <= 0) { 1458 return bytes_avail; 1459 } 1460 
1461 if (sock->ssl) { 1462 bytes_recvd = SSL_readv(sock->ssl, iov, 2); 1463 } else { 1464 bytes_recvd = readv(sock->fd, iov, 2); 1465 } 1466 1467 assert(sock->pipe_has_data == false); 1468 1469 if (bytes_recvd <= 0) { 1470 /* Errors count as draining the socket data */ 1471 if (sock->base.group_impl && sock->socket_has_data) { 1472 group = __posix_group_impl(sock->base.group_impl); 1473 TAILQ_REMOVE(&group->socks_with_data, sock, link); 1474 } 1475 1476 sock->socket_has_data = false; 1477 1478 return bytes_recvd; 1479 } 1480 1481 spdk_pipe_writer_advance(sock->recv_pipe, bytes_recvd); 1482 1483 #if DEBUG 1484 if (sock->base.group_impl) { 1485 assert(sock->socket_has_data == true); 1486 } 1487 #endif 1488 1489 sock->pipe_has_data = true; 1490 if (bytes_recvd < bytes_avail) { 1491 /* We drained the kernel socket entirely. */ 1492 sock->socket_has_data = false; 1493 } 1494 1495 return bytes_recvd; 1496 } 1497 1498 static ssize_t 1499 posix_sock_readv(struct spdk_sock *_sock, struct iovec *iov, int iovcnt) 1500 { 1501 struct spdk_posix_sock *sock = __posix_sock(_sock); 1502 struct spdk_posix_sock_group_impl *group = __posix_group_impl(sock->base.group_impl); 1503 int rc, i; 1504 size_t len; 1505 1506 if (sock->recv_pipe == NULL) { 1507 assert(sock->pipe_has_data == false); 1508 if (group && sock->socket_has_data) { 1509 sock->socket_has_data = false; 1510 TAILQ_REMOVE(&group->socks_with_data, sock, link); 1511 } 1512 if (sock->ssl) { 1513 return SSL_readv(sock->ssl, iov, iovcnt); 1514 } else { 1515 return readv(sock->fd, iov, iovcnt); 1516 } 1517 } 1518 1519 /* If the socket is not in a group, we must assume it always has 1520 * data waiting for us because it is not epolled */ 1521 if (!sock->pipe_has_data && (group == NULL || sock->socket_has_data)) { 1522 /* If the user is receiving a sufficiently large amount of data, 1523 * receive directly to their buffers. 
*/ 1524 len = 0; 1525 for (i = 0; i < iovcnt; i++) { 1526 len += iov[i].iov_len; 1527 } 1528 1529 if (len >= MIN_SOCK_PIPE_SIZE) { 1530 /* TODO: Should this detect if kernel socket is drained? */ 1531 if (sock->ssl) { 1532 return SSL_readv(sock->ssl, iov, iovcnt); 1533 } else { 1534 return readv(sock->fd, iov, iovcnt); 1535 } 1536 } 1537 1538 /* Otherwise, do a big read into our pipe */ 1539 rc = posix_sock_read(sock); 1540 if (rc <= 0) { 1541 return rc; 1542 } 1543 } 1544 1545 return posix_sock_recv_from_pipe(sock, iov, iovcnt); 1546 } 1547 1548 static ssize_t 1549 posix_sock_recv(struct spdk_sock *sock, void *buf, size_t len) 1550 { 1551 struct iovec iov[1]; 1552 1553 iov[0].iov_base = buf; 1554 iov[0].iov_len = len; 1555 1556 return posix_sock_readv(sock, iov, 1); 1557 } 1558 1559 static ssize_t 1560 posix_sock_writev(struct spdk_sock *_sock, struct iovec *iov, int iovcnt) 1561 { 1562 struct spdk_posix_sock *sock = __posix_sock(_sock); 1563 int rc; 1564 1565 /* In order to process a writev, we need to flush any asynchronous writes 1566 * first. 
*/ 1567 rc = _sock_flush(_sock); 1568 if (rc < 0) { 1569 return rc; 1570 } 1571 1572 if (!TAILQ_EMPTY(&_sock->queued_reqs)) { 1573 /* We weren't able to flush all requests */ 1574 errno = EAGAIN; 1575 return -1; 1576 } 1577 1578 if (sock->ssl) { 1579 return SSL_writev(sock->ssl, iov, iovcnt); 1580 } else { 1581 return writev(sock->fd, iov, iovcnt); 1582 } 1583 } 1584 1585 static int 1586 posix_sock_recv_next(struct spdk_sock *_sock, void **buf, void **ctx) 1587 { 1588 struct spdk_posix_sock *sock = __posix_sock(_sock); 1589 struct iovec iov; 1590 ssize_t rc; 1591 1592 if (sock->recv_pipe != NULL) { 1593 errno = ENOTSUP; 1594 return -1; 1595 } 1596 1597 iov.iov_len = spdk_sock_group_get_buf(_sock->group_impl->group, &iov.iov_base, ctx); 1598 if (iov.iov_len == 0) { 1599 errno = ENOBUFS; 1600 return -1; 1601 } 1602 1603 rc = posix_sock_readv(_sock, &iov, 1); 1604 if (rc <= 0) { 1605 spdk_sock_group_provide_buf(_sock->group_impl->group, iov.iov_base, iov.iov_len, *ctx); 1606 return rc; 1607 } 1608 1609 *buf = iov.iov_base; 1610 1611 return rc; 1612 } 1613 1614 static void 1615 posix_sock_writev_async(struct spdk_sock *sock, struct spdk_sock_request *req) 1616 { 1617 int rc; 1618 1619 spdk_sock_request_queue(sock, req); 1620 1621 /* If there are a sufficient number queued, just flush them out immediately. 
*/ 1622 if (sock->queued_iovcnt >= IOV_BATCH_SIZE) { 1623 rc = _sock_flush(sock); 1624 if (rc < 0 && errno != EAGAIN) { 1625 spdk_sock_abort_requests(sock); 1626 } 1627 } 1628 } 1629 1630 static int 1631 posix_sock_set_recvlowat(struct spdk_sock *_sock, int nbytes) 1632 { 1633 struct spdk_posix_sock *sock = __posix_sock(_sock); 1634 int val; 1635 int rc; 1636 1637 assert(sock != NULL); 1638 1639 val = nbytes; 1640 rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVLOWAT, &val, sizeof val); 1641 if (rc != 0) { 1642 return -1; 1643 } 1644 return 0; 1645 } 1646 1647 static bool 1648 posix_sock_is_ipv6(struct spdk_sock *_sock) 1649 { 1650 struct spdk_posix_sock *sock = __posix_sock(_sock); 1651 struct sockaddr_storage sa; 1652 socklen_t salen; 1653 int rc; 1654 1655 assert(sock != NULL); 1656 1657 memset(&sa, 0, sizeof sa); 1658 salen = sizeof sa; 1659 rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen); 1660 if (rc != 0) { 1661 SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno); 1662 return false; 1663 } 1664 1665 return (sa.ss_family == AF_INET6); 1666 } 1667 1668 static bool 1669 posix_sock_is_ipv4(struct spdk_sock *_sock) 1670 { 1671 struct spdk_posix_sock *sock = __posix_sock(_sock); 1672 struct sockaddr_storage sa; 1673 socklen_t salen; 1674 int rc; 1675 1676 assert(sock != NULL); 1677 1678 memset(&sa, 0, sizeof sa); 1679 salen = sizeof sa; 1680 rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen); 1681 if (rc != 0) { 1682 SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno); 1683 return false; 1684 } 1685 1686 return (sa.ss_family == AF_INET); 1687 } 1688 1689 static bool 1690 posix_sock_is_connected(struct spdk_sock *_sock) 1691 { 1692 struct spdk_posix_sock *sock = __posix_sock(_sock); 1693 uint8_t byte; 1694 int rc; 1695 1696 rc = recv(sock->fd, &byte, 1, MSG_PEEK); 1697 if (rc == 0) { 1698 return false; 1699 } 1700 1701 if (rc < 0) { 1702 if (errno == EAGAIN || errno == EWOULDBLOCK) { 1703 return true; 1704 } 1705 1706 return false; 1707 } 
1708 1709 return true; 1710 } 1711 1712 static struct spdk_sock_group_impl * 1713 posix_sock_group_impl_get_optimal(struct spdk_sock *_sock, struct spdk_sock_group_impl *hint) 1714 { 1715 struct spdk_posix_sock *sock = __posix_sock(_sock); 1716 struct spdk_sock_group_impl *group_impl; 1717 1718 if (sock->placement_id != -1) { 1719 spdk_sock_map_lookup(&g_map, sock->placement_id, &group_impl, hint); 1720 return group_impl; 1721 } 1722 1723 return NULL; 1724 } 1725 1726 static struct spdk_sock_group_impl * 1727 _sock_group_impl_create(uint32_t enable_placement_id) 1728 { 1729 struct spdk_posix_sock_group_impl *group_impl; 1730 int fd; 1731 1732 #if defined(SPDK_EPOLL) 1733 fd = epoll_create1(0); 1734 #elif defined(SPDK_KEVENT) 1735 fd = kqueue(); 1736 #endif 1737 if (fd == -1) { 1738 return NULL; 1739 } 1740 1741 group_impl = calloc(1, sizeof(*group_impl)); 1742 if (group_impl == NULL) { 1743 SPDK_ERRLOG("group_impl allocation failed\n"); 1744 close(fd); 1745 return NULL; 1746 } 1747 1748 group_impl->pipe_group = spdk_pipe_group_create(); 1749 if (group_impl->pipe_group == NULL) { 1750 SPDK_ERRLOG("pipe_group allocation failed\n"); 1751 free(group_impl); 1752 close(fd); 1753 return NULL; 1754 } 1755 1756 group_impl->fd = fd; 1757 TAILQ_INIT(&group_impl->socks_with_data); 1758 group_impl->placement_id = -1; 1759 1760 if (enable_placement_id == PLACEMENT_CPU) { 1761 spdk_sock_map_insert(&g_map, spdk_env_get_current_core(), &group_impl->base); 1762 group_impl->placement_id = spdk_env_get_current_core(); 1763 } 1764 1765 return &group_impl->base; 1766 } 1767 1768 static struct spdk_sock_group_impl * 1769 posix_sock_group_impl_create(void) 1770 { 1771 return _sock_group_impl_create(g_posix_impl_opts.enable_placement_id); 1772 } 1773 1774 static struct spdk_sock_group_impl * 1775 ssl_sock_group_impl_create(void) 1776 { 1777 return _sock_group_impl_create(g_ssl_impl_opts.enable_placement_id); 1778 } 1779 1780 static void 1781 posix_sock_mark(struct 
spdk_posix_sock_group_impl *group, struct spdk_posix_sock *sock, 1782 int placement_id) 1783 { 1784 #if defined(SO_MARK) 1785 int rc; 1786 1787 rc = setsockopt(sock->fd, SOL_SOCKET, SO_MARK, 1788 &placement_id, sizeof(placement_id)); 1789 if (rc != 0) { 1790 /* Not fatal */ 1791 SPDK_ERRLOG("Error setting SO_MARK\n"); 1792 return; 1793 } 1794 1795 rc = spdk_sock_map_insert(&g_map, placement_id, &group->base); 1796 if (rc != 0) { 1797 /* Not fatal */ 1798 SPDK_ERRLOG("Failed to insert sock group into map: %d\n", rc); 1799 return; 1800 } 1801 1802 sock->placement_id = placement_id; 1803 #endif 1804 } 1805 1806 static void 1807 posix_sock_update_mark(struct spdk_sock_group_impl *_group, struct spdk_sock *_sock) 1808 { 1809 struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group); 1810 1811 if (group->placement_id == -1) { 1812 group->placement_id = spdk_sock_map_find_free(&g_map); 1813 1814 /* If a free placement id is found, update existing sockets in this group */ 1815 if (group->placement_id != -1) { 1816 struct spdk_sock *sock, *tmp; 1817 1818 TAILQ_FOREACH_SAFE(sock, &_group->socks, link, tmp) { 1819 posix_sock_mark(group, __posix_sock(sock), group->placement_id); 1820 } 1821 } 1822 } 1823 1824 if (group->placement_id != -1) { 1825 /* 1826 * group placement id is already determined for this poll group. 1827 * Mark socket with group's placement id. 
1828 */ 1829 posix_sock_mark(group, __posix_sock(_sock), group->placement_id); 1830 } 1831 } 1832 1833 static int 1834 posix_sock_group_impl_add_sock(struct spdk_sock_group_impl *_group, struct spdk_sock *_sock) 1835 { 1836 struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group); 1837 struct spdk_posix_sock *sock = __posix_sock(_sock); 1838 int rc; 1839 1840 #if defined(SPDK_EPOLL) 1841 struct epoll_event event; 1842 1843 memset(&event, 0, sizeof(event)); 1844 /* EPOLLERR is always on even if we don't set it, but be explicit for clarity */ 1845 event.events = EPOLLIN | EPOLLERR; 1846 if (spdk_interrupt_mode_is_enabled()) { 1847 event.events |= EPOLLOUT; 1848 } 1849 1850 event.data.ptr = sock; 1851 1852 rc = epoll_ctl(group->fd, EPOLL_CTL_ADD, sock->fd, &event); 1853 #elif defined(SPDK_KEVENT) 1854 struct kevent event; 1855 struct timespec ts = {0}; 1856 1857 EV_SET(&event, sock->fd, EVFILT_READ, EV_ADD, 0, 0, sock); 1858 1859 rc = kevent(group->fd, &event, 1, NULL, 0, &ts); 1860 #endif 1861 1862 if (rc != 0) { 1863 return rc; 1864 } 1865 1866 /* switched from another polling group due to scheduling */ 1867 if (spdk_unlikely(sock->recv_pipe != NULL && 1868 (spdk_pipe_reader_bytes_available(sock->recv_pipe) > 0))) { 1869 sock->pipe_has_data = true; 1870 sock->socket_has_data = false; 1871 TAILQ_INSERT_TAIL(&group->socks_with_data, sock, link); 1872 } else if (sock->recv_pipe != NULL) { 1873 rc = spdk_pipe_group_add(group->pipe_group, sock->recv_pipe); 1874 assert(rc == 0); 1875 } 1876 1877 if (_sock->impl_opts.enable_placement_id == PLACEMENT_MARK) { 1878 posix_sock_update_mark(_group, _sock); 1879 } else if (sock->placement_id != -1) { 1880 rc = spdk_sock_map_insert(&g_map, sock->placement_id, &group->base); 1881 if (rc != 0) { 1882 SPDK_ERRLOG("Failed to insert sock group into map: %d\n", rc); 1883 /* Do not treat this as an error. The system will continue running. 
*/ 1884 } 1885 } 1886 1887 return rc; 1888 } 1889 1890 static int 1891 posix_sock_group_impl_remove_sock(struct spdk_sock_group_impl *_group, struct spdk_sock *_sock) 1892 { 1893 struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group); 1894 struct spdk_posix_sock *sock = __posix_sock(_sock); 1895 int rc; 1896 1897 if (sock->pipe_has_data || sock->socket_has_data) { 1898 TAILQ_REMOVE(&group->socks_with_data, sock, link); 1899 sock->pipe_has_data = false; 1900 sock->socket_has_data = false; 1901 } else if (sock->recv_pipe != NULL) { 1902 rc = spdk_pipe_group_remove(group->pipe_group, sock->recv_pipe); 1903 assert(rc == 0); 1904 } 1905 1906 if (sock->placement_id != -1) { 1907 spdk_sock_map_release(&g_map, sock->placement_id); 1908 } 1909 1910 #if defined(SPDK_EPOLL) 1911 struct epoll_event event; 1912 1913 /* Event parameter is ignored but some old kernel version still require it. */ 1914 rc = epoll_ctl(group->fd, EPOLL_CTL_DEL, sock->fd, &event); 1915 #elif defined(SPDK_KEVENT) 1916 struct kevent event; 1917 struct timespec ts = {0}; 1918 1919 EV_SET(&event, sock->fd, EVFILT_READ, EV_DELETE, 0, 0, NULL); 1920 1921 rc = kevent(group->fd, &event, 1, NULL, 0, &ts); 1922 if (rc == 0 && event.flags & EV_ERROR) { 1923 rc = -1; 1924 errno = event.data; 1925 } 1926 #endif 1927 1928 spdk_sock_abort_requests(_sock); 1929 1930 return rc; 1931 } 1932 1933 static int 1934 posix_sock_group_impl_poll(struct spdk_sock_group_impl *_group, int max_events, 1935 struct spdk_sock **socks) 1936 { 1937 struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group); 1938 struct spdk_sock *sock, *tmp; 1939 int num_events, i, rc; 1940 struct spdk_posix_sock *psock, *ptmp; 1941 #if defined(SPDK_EPOLL) 1942 struct epoll_event events[MAX_EVENTS_PER_POLL]; 1943 #elif defined(SPDK_KEVENT) 1944 struct kevent events[MAX_EVENTS_PER_POLL]; 1945 struct timespec ts = {0}; 1946 #endif 1947 1948 #ifdef SPDK_ZEROCOPY 1949 /* When all of the following conditions are met 1950 * - 
non-blocking socket 1951 * - zero copy is enabled 1952 * - interrupts suppressed (i.e. busy polling) 1953 * - the NIC tx queue is full at the time sendmsg() is called 1954 * - epoll_wait determines there is an EPOLLIN event for the socket 1955 * then we can get into a situation where data we've sent is queued 1956 * up in the kernel network stack, but interrupts have been suppressed 1957 * because other traffic is flowing so the kernel misses the signal 1958 * to flush the software tx queue. If there wasn't incoming data 1959 * pending on the socket, then epoll_wait would have been sufficient 1960 * to kick off the send operation, but since there is a pending event 1961 * epoll_wait does not trigger the necessary operation. 1962 * 1963 * We deal with this by checking for all of the above conditions and 1964 * additionally looking for EPOLLIN events that were not consumed from 1965 * the last poll loop. We take this to mean that the upper layer is 1966 * unable to consume them because it is blocked waiting for resources 1967 * to free up, and those resources are most likely freed in response 1968 * to a pending asynchronous write completing. 1969 * 1970 * Additionally, sockets that have the same placement_id actually share 1971 * an underlying hardware queue. That means polling one of them is 1972 * equivalent to polling all of them. As a quick mechanism to avoid 1973 * making extra poll() calls, stash the last placement_id during the loop 1974 * and only poll if it's not the same. The overwhelmingly common case 1975 * is that all sockets in this list have the same placement_id because 1976 * SPDK is intentionally grouping sockets by that value, so even 1977 * though this won't stop all extra calls to poll(), it's very fast 1978 * and will catch all of them in practice. 
1979 */ 1980 int last_placement_id = -1; 1981 1982 TAILQ_FOREACH(psock, &group->socks_with_data, link) { 1983 if (psock->zcopy && psock->placement_id >= 0 && 1984 psock->placement_id != last_placement_id) { 1985 struct pollfd pfd = {psock->fd, POLLIN | POLLERR, 0}; 1986 1987 poll(&pfd, 1, 0); 1988 last_placement_id = psock->placement_id; 1989 } 1990 } 1991 #endif 1992 1993 /* This must be a TAILQ_FOREACH_SAFE because while flushing, 1994 * a completion callback could remove the sock from the 1995 * group. */ 1996 TAILQ_FOREACH_SAFE(sock, &_group->socks, link, tmp) { 1997 rc = _sock_flush(sock); 1998 if (rc < 0 && errno != EAGAIN) { 1999 spdk_sock_abort_requests(sock); 2000 } 2001 } 2002 2003 assert(max_events > 0); 2004 2005 #if defined(SPDK_EPOLL) 2006 num_events = epoll_wait(group->fd, events, max_events, 0); 2007 #elif defined(SPDK_KEVENT) 2008 num_events = kevent(group->fd, NULL, 0, events, max_events, &ts); 2009 #endif 2010 2011 if (num_events == -1) { 2012 return -1; 2013 } else if (num_events == 0 && !TAILQ_EMPTY(&_group->socks)) { 2014 sock = TAILQ_FIRST(&_group->socks); 2015 psock = __posix_sock(sock); 2016 /* poll() is called here to busy poll the queue associated with 2017 * first socket in list and potentially reap incoming data. 2018 */ 2019 if (sock->opts.priority) { 2020 struct pollfd pfd = {0, 0, 0}; 2021 2022 pfd.fd = psock->fd; 2023 pfd.events = POLLIN | POLLERR; 2024 poll(&pfd, 1, 0); 2025 } 2026 } 2027 2028 for (i = 0; i < num_events; i++) { 2029 #if defined(SPDK_EPOLL) 2030 sock = events[i].data.ptr; 2031 psock = __posix_sock(sock); 2032 2033 #ifdef SPDK_ZEROCOPY 2034 if (events[i].events & EPOLLERR) { 2035 rc = _sock_check_zcopy(sock); 2036 /* If the socket was closed or removed from 2037 * the group in response to a send ack, don't 2038 * add it to the array here. 
*/ 2039 if (rc || sock->cb_fn == NULL) { 2040 continue; 2041 } 2042 } 2043 #endif 2044 if ((events[i].events & EPOLLIN) == 0) { 2045 continue; 2046 } 2047 2048 #elif defined(SPDK_KEVENT) 2049 sock = events[i].udata; 2050 psock = __posix_sock(sock); 2051 #endif 2052 2053 /* If the socket is not already in the list, add it now */ 2054 if (!psock->socket_has_data && !psock->pipe_has_data) { 2055 TAILQ_INSERT_TAIL(&group->socks_with_data, psock, link); 2056 } 2057 psock->socket_has_data = true; 2058 } 2059 2060 num_events = 0; 2061 2062 TAILQ_FOREACH_SAFE(psock, &group->socks_with_data, link, ptmp) { 2063 if (num_events == max_events) { 2064 break; 2065 } 2066 2067 /* If the socket's cb_fn is NULL, just remove it from the 2068 * list and do not add it to socks array */ 2069 if (spdk_unlikely(psock->base.cb_fn == NULL)) { 2070 psock->socket_has_data = false; 2071 psock->pipe_has_data = false; 2072 TAILQ_REMOVE(&group->socks_with_data, psock, link); 2073 continue; 2074 } 2075 2076 socks[num_events++] = &psock->base; 2077 } 2078 2079 /* Cycle the has_data list so that each time we poll things aren't 2080 * in the same order. Say we have 6 sockets in the list, named as follows: 2081 * A B C D E F 2082 * And all 6 sockets had epoll events, but max_events is only 3. That means 2083 * psock currently points at D. We want to rearrange the list to the following: 2084 * D E F A B C 2085 * 2086 * The variables below are named according to this example to make it easier to 2087 * follow the swaps. 
2088 */ 2089 if (psock != NULL) { 2090 struct spdk_posix_sock *pa, *pc, *pd, *pf; 2091 2092 /* Capture pointers to the elements we need */ 2093 pd = psock; 2094 pc = TAILQ_PREV(pd, spdk_has_data_list, link); 2095 pa = TAILQ_FIRST(&group->socks_with_data); 2096 pf = TAILQ_LAST(&group->socks_with_data, spdk_has_data_list); 2097 2098 /* Break the link between C and D */ 2099 pc->link.tqe_next = NULL; 2100 2101 /* Connect F to A */ 2102 pf->link.tqe_next = pa; 2103 pa->link.tqe_prev = &pf->link.tqe_next; 2104 2105 /* Fix up the list first/last pointers */ 2106 group->socks_with_data.tqh_first = pd; 2107 group->socks_with_data.tqh_last = &pc->link.tqe_next; 2108 2109 /* D is in front of the list, make tqe prev pointer point to the head of list */ 2110 pd->link.tqe_prev = &group->socks_with_data.tqh_first; 2111 } 2112 2113 return num_events; 2114 } 2115 2116 static int 2117 posix_sock_group_impl_register_interrupt(struct spdk_sock_group_impl *_group, uint32_t events, 2118 spdk_interrupt_fn fn, void *arg, const char *name) 2119 { 2120 struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group); 2121 2122 group->intr = spdk_interrupt_register_for_events(group->fd, events, fn, arg, name); 2123 2124 return group->intr ? 
0 : -1; 2125 } 2126 2127 static void 2128 posix_sock_group_impl_unregister_interrupt(struct spdk_sock_group_impl *_group) 2129 { 2130 struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group); 2131 2132 spdk_interrupt_unregister(&group->intr); 2133 } 2134 2135 static int 2136 _sock_group_impl_close(struct spdk_sock_group_impl *_group, uint32_t enable_placement_id) 2137 { 2138 struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group); 2139 int rc; 2140 2141 if (enable_placement_id == PLACEMENT_CPU) { 2142 spdk_sock_map_release(&g_map, spdk_env_get_current_core()); 2143 } 2144 2145 spdk_pipe_group_destroy(group->pipe_group); 2146 rc = close(group->fd); 2147 free(group); 2148 return rc; 2149 } 2150 2151 static int 2152 posix_sock_group_impl_close(struct spdk_sock_group_impl *_group) 2153 { 2154 return _sock_group_impl_close(_group, g_posix_impl_opts.enable_placement_id); 2155 } 2156 2157 static int 2158 ssl_sock_group_impl_close(struct spdk_sock_group_impl *_group) 2159 { 2160 return _sock_group_impl_close(_group, g_ssl_impl_opts.enable_placement_id); 2161 } 2162 2163 static struct spdk_net_impl g_posix_net_impl = { 2164 .name = "posix", 2165 .getaddr = posix_sock_getaddr, 2166 .connect = posix_sock_connect, 2167 .listen = posix_sock_listen, 2168 .accept = posix_sock_accept, 2169 .close = posix_sock_close, 2170 .recv = posix_sock_recv, 2171 .readv = posix_sock_readv, 2172 .writev = posix_sock_writev, 2173 .recv_next = posix_sock_recv_next, 2174 .writev_async = posix_sock_writev_async, 2175 .flush = posix_sock_flush, 2176 .set_recvlowat = posix_sock_set_recvlowat, 2177 .set_recvbuf = posix_sock_set_recvbuf, 2178 .set_sendbuf = posix_sock_set_sendbuf, 2179 .is_ipv6 = posix_sock_is_ipv6, 2180 .is_ipv4 = posix_sock_is_ipv4, 2181 .is_connected = posix_sock_is_connected, 2182 .group_impl_get_optimal = posix_sock_group_impl_get_optimal, 2183 .group_impl_create = posix_sock_group_impl_create, 2184 .group_impl_add_sock = posix_sock_group_impl_add_sock, 
2185 .group_impl_remove_sock = posix_sock_group_impl_remove_sock, 2186 .group_impl_poll = posix_sock_group_impl_poll, 2187 .group_impl_register_interrupt = posix_sock_group_impl_register_interrupt, 2188 .group_impl_unregister_interrupt = posix_sock_group_impl_unregister_interrupt, 2189 .group_impl_close = posix_sock_group_impl_close, 2190 .get_opts = posix_sock_impl_get_opts, 2191 .set_opts = posix_sock_impl_set_opts, 2192 }; 2193 2194 SPDK_NET_IMPL_REGISTER_DEFAULT(posix, &g_posix_net_impl); 2195 2196 static struct spdk_sock * 2197 ssl_sock_listen(const char *ip, int port, struct spdk_sock_opts *opts) 2198 { 2199 return posix_sock_create(ip, port, SPDK_SOCK_CREATE_LISTEN, opts, true); 2200 } 2201 2202 static struct spdk_sock * 2203 ssl_sock_connect(const char *ip, int port, struct spdk_sock_opts *opts) 2204 { 2205 return posix_sock_create(ip, port, SPDK_SOCK_CREATE_CONNECT, opts, true); 2206 } 2207 2208 static struct spdk_sock * 2209 ssl_sock_accept(struct spdk_sock *_sock) 2210 { 2211 return _posix_sock_accept(_sock, true); 2212 } 2213 2214 static struct spdk_net_impl g_ssl_net_impl = { 2215 .name = "ssl", 2216 .getaddr = posix_sock_getaddr, 2217 .connect = ssl_sock_connect, 2218 .listen = ssl_sock_listen, 2219 .accept = ssl_sock_accept, 2220 .close = posix_sock_close, 2221 .recv = posix_sock_recv, 2222 .readv = posix_sock_readv, 2223 .writev = posix_sock_writev, 2224 .recv_next = posix_sock_recv_next, 2225 .writev_async = posix_sock_writev_async, 2226 .flush = posix_sock_flush, 2227 .set_recvlowat = posix_sock_set_recvlowat, 2228 .set_recvbuf = posix_sock_set_recvbuf, 2229 .set_sendbuf = posix_sock_set_sendbuf, 2230 .is_ipv6 = posix_sock_is_ipv6, 2231 .is_ipv4 = posix_sock_is_ipv4, 2232 .is_connected = posix_sock_is_connected, 2233 .group_impl_get_optimal = posix_sock_group_impl_get_optimal, 2234 .group_impl_create = ssl_sock_group_impl_create, 2235 .group_impl_add_sock = posix_sock_group_impl_add_sock, 2236 .group_impl_remove_sock = 
posix_sock_group_impl_remove_sock, 2237 .group_impl_poll = posix_sock_group_impl_poll, 2238 .group_impl_register_interrupt = posix_sock_group_impl_register_interrupt, 2239 .group_impl_unregister_interrupt = posix_sock_group_impl_unregister_interrupt, 2240 .group_impl_close = ssl_sock_group_impl_close, 2241 .get_opts = ssl_sock_impl_get_opts, 2242 .set_opts = ssl_sock_impl_set_opts, 2243 }; 2244 2245 SPDK_NET_IMPL_REGISTER(ssl, &g_ssl_net_impl); 2246 SPDK_LOG_REGISTER_COMPONENT(sock_posix) 2247