/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright (c) Intel Corporation. All rights reserved.
 * Copyright (c) 2020, 2021 Mellanox Technologies LTD. All rights reserved.
 * Copyright (c) 2021 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
 */

#include "spdk/stdinc.h"

#if defined(__FreeBSD__)
#include <sys/event.h>
#define SPDK_KEVENT
#else
#include <sys/epoll.h>
#define SPDK_EPOLL
#endif

#if defined(__linux__)
#include <linux/errqueue.h>
#endif

#include "spdk/env.h"
#include "spdk/log.h"
#include "spdk/pipe.h"
#include "spdk/sock.h"
#include "spdk/util.h"
#include "spdk/string.h"
#include "spdk_internal/sock.h"
#include "../sock_kernel.h"

#include "openssl/crypto.h"
#include "openssl/err.h"
#include "openssl/ssl.h"

#define MAX_TMPBUF 1024
#define PORTNUMLEN 32

#if defined(SO_ZEROCOPY) && defined(MSG_ZEROCOPY)
#define SPDK_ZEROCOPY
#endif

struct spdk_posix_sock {
	struct spdk_sock base;
	int fd;

	uint32_t sendmsg_idx;

	struct spdk_pipe *recv_pipe;
	void *recv_buf;
	int recv_buf_sz;
	bool pipe_has_data;
	bool socket_has_data;
	bool zcopy;

	int placement_id;

	SSL_CTX *ctx;
	SSL *ssl;

	TAILQ_ENTRY(spdk_posix_sock) link;
};

TAILQ_HEAD(spdk_has_data_list, spdk_posix_sock);

struct spdk_posix_sock_group_impl {
	struct spdk_sock_group_impl base;
	int fd;
	struct spdk_has_data_list socks_with_data;
	int placement_id;
};

static struct spdk_sock_impl_opts g_spdk_posix_sock_impl_opts = {
	.recv_buf_size = MIN_SO_RCVBUF_SIZE,
	.send_buf_size = MIN_SO_SNDBUF_SIZE,
	.enable_recv_pipe = true,
	.enable_quickack = false,
	.enable_placement_id = PLACEMENT_NONE,
	.enable_zerocopy_send_server = true,
	.enable_zerocopy_send_client = false,
	.zerocopy_threshold = 0
};

static struct spdk_sock_map g_map = {
	.entries = STAILQ_HEAD_INITIALIZER(g_map.entries),
	.mtx = PTHREAD_MUTEX_INITIALIZER
};

__attribute__((destructor)) static void
posix_sock_map_cleanup(void)
{
	spdk_sock_map_cleanup(&g_map);
}

#define __posix_sock(sock) (struct spdk_posix_sock *)sock
#define __posix_group_impl(group) (struct spdk_posix_sock_group_impl *)group

static int
posix_sock_getaddr(struct spdk_sock *_sock, char *saddr, int slen, uint16_t *sport,
		   char *caddr, int clen, uint16_t *cport)
{
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	struct sockaddr_storage sa;
	socklen_t salen;
	int rc;

	assert(sock != NULL);

	memset(&sa, 0, sizeof sa);
	salen = sizeof sa;
	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
	if (rc != 0) {
		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
		return -1;
	}

	switch (sa.ss_family) {
	case AF_UNIX:
		/* Acceptable connection types that don't have IPs */
		return 0;
	case AF_INET:
	case AF_INET6:
		/* Code below will get IP addresses */
		break;
	default:
		/* Unsupported socket family */
		return -1;
	}

	rc = get_addr_str((struct sockaddr *)&sa, saddr, slen);
	if (rc != 0) {
		SPDK_ERRLOG("getnameinfo() failed (errno=%d)\n", errno);
		return -1;
	}

	if (sport) {
		if (sa.ss_family == AF_INET) {
			*sport = ntohs(((struct sockaddr_in *) &sa)->sin_port);
		} else if (sa.ss_family == AF_INET6) {
			*sport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port);
		}
	}

	memset(&sa, 0, sizeof sa);
	salen = sizeof sa;
	rc = getpeername(sock->fd, (struct sockaddr *) &sa, &salen);
	if (rc != 0) {
		SPDK_ERRLOG("getpeername() failed (errno=%d)\n", errno);
		return -1;
	}

	rc = get_addr_str((struct sockaddr *)&sa, caddr, clen);
	if (rc != 0) {
		SPDK_ERRLOG("getnameinfo() failed (errno=%d)\n", errno);
		return -1;
	}

	if (cport) {
		if (sa.ss_family == AF_INET) {
			*cport = ntohs(((struct sockaddr_in *) &sa)->sin_port);
		} else if (sa.ss_family == AF_INET6) {
			*cport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port);
		}
	}

	return 0;
}

enum posix_sock_create_type {
	SPDK_SOCK_CREATE_LISTEN,
	SPDK_SOCK_CREATE_CONNECT,
};
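
/*
 * Each posix socket can keep a user-space "receive pipe" in front of the
 * kernel socket: reads from the kernel are done in large chunks into the
 * pipe, and small application reads are then served from it, which helps
 * reduce the number of read()/readv() syscalls for workloads that receive
 * many small buffers. posix_sock_alloc_pipe() below (re)allocates that pipe;
 * when the pipe is resized, any data still sitting in the old pipe is copied
 * into the new one before the old buffers are released.
 */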
static int
posix_sock_alloc_pipe(struct spdk_posix_sock *sock, int sz)
{
	uint8_t *new_buf;
	struct spdk_pipe *new_pipe;
	struct iovec siov[2];
	struct iovec diov[2];
	int sbytes;
	ssize_t bytes;

	if (sock->recv_buf_sz == sz) {
		return 0;
	}

	/* If the new size is 0, just free the pipe */
	if (sz == 0) {
		spdk_pipe_destroy(sock->recv_pipe);
		free(sock->recv_buf);
		sock->recv_pipe = NULL;
		sock->recv_buf = NULL;
		return 0;
	} else if (sz < MIN_SOCK_PIPE_SIZE) {
		SPDK_ERRLOG("The size of the pipe must be at least %d\n", MIN_SOCK_PIPE_SIZE);
		return -1;
	}

	/* Round up to next 64 byte multiple */
	new_buf = calloc(SPDK_ALIGN_CEIL(sz + 1, 64), sizeof(uint8_t));
	if (!new_buf) {
		SPDK_ERRLOG("socket recv buf allocation failed\n");
		return -ENOMEM;
	}

	new_pipe = spdk_pipe_create(new_buf, sz + 1);
	if (new_pipe == NULL) {
		SPDK_ERRLOG("socket pipe allocation failed\n");
		free(new_buf);
		return -ENOMEM;
	}

	if (sock->recv_pipe != NULL) {
		/* Pull all of the data out of the old pipe */
		sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov);
		if (sbytes > sz) {
			/* Too much data to fit into the new pipe size */
			spdk_pipe_destroy(new_pipe);
			free(new_buf);
			return -EINVAL;
		}

		sbytes = spdk_pipe_writer_get_buffer(new_pipe, sz, diov);
		assert(sbytes == sz);

		bytes = spdk_iovcpy(siov, 2, diov, 2);
		spdk_pipe_writer_advance(new_pipe, bytes);

		spdk_pipe_destroy(sock->recv_pipe);
		free(sock->recv_buf);
	}

	sock->recv_buf_sz = sz;
	sock->recv_buf = new_buf;
	sock->recv_pipe = new_pipe;

	return 0;
}

static int
posix_sock_set_recvbuf(struct spdk_sock *_sock, int sz)
{
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	int rc;

	assert(sock != NULL);

	if (g_spdk_posix_sock_impl_opts.enable_recv_pipe) {
		rc = posix_sock_alloc_pipe(sock, sz);
		if (rc) {
			return rc;
		}
	}

	/* Set kernel buffer size to be at least MIN_SO_RCVBUF_SIZE */
	if (sz < MIN_SO_RCVBUF_SIZE) {
		sz = MIN_SO_RCVBUF_SIZE;
	}

	rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVBUF, &sz, sizeof(sz));
	if (rc < 0) {
		return rc;
	}

	return 0;
}

static int
posix_sock_set_sendbuf(struct spdk_sock *_sock, int sz)
{
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	int rc;

	assert(sock != NULL);

	if (sz < MIN_SO_SNDBUF_SIZE) {
		sz = MIN_SO_SNDBUF_SIZE;
	}

	rc = setsockopt(sock->fd, SOL_SOCKET, SO_SNDBUF, &sz, sizeof(sz));
	if (rc < 0) {
		return rc;
	}

	return 0;
}
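
/*
 * Per-socket initialization applied to both connected and accepted sockets:
 * optionally request zero-copy sends (SO_ZEROCOPY), enable TCP_QUICKACK if
 * configured, and record the NIC placement_id used for poll-group affinity.
 */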
static void
posix_sock_init(struct spdk_posix_sock *sock, bool enable_zero_copy)
{
#if defined(SPDK_ZEROCOPY) || defined(__linux__)
	int flag;
	int rc;
#endif

#if defined(SPDK_ZEROCOPY)
	flag = 1;

	if (enable_zero_copy) {
		/* Try to turn on zero copy sends */
		rc = setsockopt(sock->fd, SOL_SOCKET, SO_ZEROCOPY, &flag, sizeof(flag));
		if (rc == 0) {
			sock->zcopy = true;
		}
	}
#endif

#if defined(__linux__)
	flag = 1;

	if (g_spdk_posix_sock_impl_opts.enable_quickack) {
		rc = setsockopt(sock->fd, IPPROTO_TCP, TCP_QUICKACK, &flag, sizeof(flag));
		if (rc != 0) {
			SPDK_ERRLOG("Failed to set TCP_QUICKACK\n");
		}
	}

	spdk_sock_get_placement_id(sock->fd, g_spdk_posix_sock_impl_opts.enable_placement_id,
				   &sock->placement_id);

	if (g_spdk_posix_sock_impl_opts.enable_placement_id == PLACEMENT_MARK) {
		/* Save placement_id */
		spdk_sock_map_insert(&g_map, sock->placement_id, NULL);
	}
#endif
}

static struct spdk_posix_sock *
posix_sock_alloc(int fd, bool enable_zero_copy)
{
	struct spdk_posix_sock *sock;

	sock = calloc(1, sizeof(*sock));
	if (sock == NULL) {
		SPDK_ERRLOG("sock allocation failed\n");
		return NULL;
	}

	sock->fd = fd;
	posix_sock_init(sock, enable_zero_copy);

	return sock;
}

static int
posix_fd_create(struct addrinfo *res, struct spdk_sock_opts *opts)
{
	int fd;
	int val = 1;
	int rc, sz;
#if defined(__linux__)
	int to;
#endif

	fd = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
	if (fd < 0) {
		/* error */
		return -1;
	}

	sz = g_spdk_posix_sock_impl_opts.recv_buf_size;
	rc = setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &sz, sizeof(sz));
	if (rc) {
		/* Not fatal */
	}

	sz = g_spdk_posix_sock_impl_opts.send_buf_size;
	rc = setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &sz, sizeof(sz));
	if (rc) {
		/* Not fatal */
	}

	rc = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof val);
	if (rc != 0) {
		close(fd);
		/* error */
		return -1;
	}
	rc = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof val);
	if (rc != 0) {
		close(fd);
		/* error */
		return -1;
	}

#if defined(SO_PRIORITY)
	if (opts->priority) {
		rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &opts->priority, sizeof val);
		if (rc != 0) {
			close(fd);
			/* error */
			return -1;
		}
	}
#endif

	if (res->ai_family == AF_INET6) {
		rc = setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &val, sizeof val);
		if (rc != 0) {
			close(fd);
			/* error */
			return -1;
		}
	}

	if (opts->ack_timeout) {
#if defined(__linux__)
		to = opts->ack_timeout;
		rc = setsockopt(fd, IPPROTO_TCP, TCP_USER_TIMEOUT, &to, sizeof(to));
		if (rc != 0) {
			close(fd);
			/* error */
			return -1;
		}
#else
		SPDK_WARNLOG("TCP_USER_TIMEOUT is not supported.\n");
#endif
	}

	return fd;
}
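
/*
 * TLS is negotiated with a pre-shared key (TLS-PSK). The callbacks below are
 * registered with OpenSSL: the client callback supplies the PSK identity and
 * key for the handshake, and the server callback validates the received
 * identity and returns the matching key. PSK_ID and PSK_KEY are the built-in
 * defaults used by this implementation.
 */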
#define PSK_ID "nqn.2014-08.org.nvmexpress:uuid:f81d4fae-7dec-11d0-a765-00a0c91e6bf6"
#define PSK_KEY "1234567890ABCDEF"

static unsigned int
posix_sock_tls_psk_server_cb(SSL *ssl,
			     const char *id,
			     unsigned char *psk,
			     unsigned int max_psk_len)
{
	long key_len;
	unsigned char *default_psk;

	if (PSK_KEY == NULL) {
		SPDK_ERRLOG("PSK is not set\n");
		goto err;
	}
	SPDK_DEBUGLOG(sock_posix, "Length of Client's PSK ID %lu\n", strlen(PSK_ID));
	if (id == NULL) {
		SPDK_ERRLOG("Received empty PSK ID\n");
		goto err;
	}
	SPDK_DEBUGLOG(sock_posix, "Received PSK ID '%s'\n", id);
	if (strcmp(PSK_ID, id) != 0) {
		SPDK_ERRLOG("Unknown Client's PSK ID\n");
		goto err;
	}

	SPDK_DEBUGLOG(sock_posix, "Length of Client's PSK KEY %u\n", max_psk_len);
	default_psk = OPENSSL_hexstr2buf(PSK_KEY, &key_len);
	if (default_psk == NULL) {
		SPDK_ERRLOG("Could not unhexlify PSK\n");
		goto err;
	}
	if (key_len > max_psk_len) {
		SPDK_ERRLOG("Insufficient buffer size to copy PSK\n");
		goto err;
	}

	memcpy(psk, default_psk, key_len);

	return key_len;

err:
	return 0;
}

static unsigned int
posix_sock_tls_psk_client_cb(SSL *ssl, const char *hint,
			     char *identity,
			     unsigned int max_identity_len,
			     unsigned char *psk,
			     unsigned int max_psk_len)
{
	long key_len;
	unsigned char *default_psk;

	if (hint) {
		SPDK_DEBUGLOG(sock_posix, "Received PSK identity hint '%s'\n", hint);
	}

	if (PSK_KEY == NULL) {
		SPDK_ERRLOG("PSK is not set\n");
		goto err;
	}
	default_psk = OPENSSL_hexstr2buf(PSK_KEY, &key_len);
	if (default_psk == NULL) {
		SPDK_ERRLOG("Could not unhexlify PSK\n");
		goto err;
	}
	if ((strlen(PSK_ID) + 1 > max_identity_len)
	    || (key_len > max_psk_len)) {
		SPDK_ERRLOG("PSK ID or Key buffer is not sufficient\n");
		goto err;
	}
	spdk_strcpy_pad(identity, PSK_ID, strlen(PSK_ID), 0);
	SPDK_DEBUGLOG(sock_posix, "Sending PSK identity '%s'\n", identity);

	memcpy(psk, default_psk, key_len);
	SPDK_DEBUGLOG(sock_posix, "Provided out-of-band (OOB) PSK for TLS1.3 client\n");

	return key_len;

err:
	return 0;
}
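
/*
 * Build an SSL_CTX for either the server (TLS_server_method()) or client
 * (TLS_client_method()) side. When a specific TLS version is requested in the
 * socket options, it is pinned by setting it as both the minimum and maximum
 * protocol version; when kTLS is requested, SSL_OP_ENABLE_KTLS must be
 * available in the OpenSSL build or context creation fails.
 */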
static SSL_CTX *
posix_sock_create_ssl_context(const SSL_METHOD *method, struct spdk_sock_opts *opts)
{
	SSL_CTX *ctx;
	int tls_version = 0;
	bool ktls_enabled = false;
#ifdef SSL_OP_ENABLE_KTLS
	long options;
#endif

	SSL_library_init();
	OpenSSL_add_all_algorithms();
	SSL_load_error_strings();
	/* Produce a SSL CTX in SSL V2 and V3 standards compliant way */
	ctx = SSL_CTX_new(method);
	if (!ctx) {
		SPDK_ERRLOG("SSL_CTX_new() failed, msg = %s\n", ERR_error_string(ERR_peek_last_error(), NULL));
		return NULL;
	}
	SPDK_DEBUGLOG(sock_posix, "SSL context created\n");

	switch (opts->tls_version) {
	case 0:
		/* auto-negotiation */
		break;
	case SPDK_TLS_VERSION_1_1:
		tls_version = TLS1_1_VERSION;
		break;
	case SPDK_TLS_VERSION_1_2:
		tls_version = TLS1_2_VERSION;
		break;
	case SPDK_TLS_VERSION_1_3:
		tls_version = TLS1_3_VERSION;
		break;
	default:
		SPDK_ERRLOG("Incorrect TLS version provided: %d\n", opts->tls_version);
		goto err;
	}

	if (tls_version) {
		SPDK_DEBUGLOG(sock_posix, "Hardening TLS version to '%d'='0x%X'\n", opts->tls_version, tls_version);
		if (!SSL_CTX_set_min_proto_version(ctx, tls_version)) {
			SPDK_ERRLOG("Unable to set Min TLS version to '%d'='0x%X'\n", opts->tls_version, tls_version);
			goto err;
		}
		if (!SSL_CTX_set_max_proto_version(ctx, tls_version)) {
			SPDK_ERRLOG("Unable to set Max TLS version to '%d'='0x%X'\n", opts->tls_version, tls_version);
			goto err;
		}
	}
	if (opts->ktls) {
		SPDK_DEBUGLOG(sock_posix, "Enabling kTLS offload\n");
#ifdef SSL_OP_ENABLE_KTLS
		options = SSL_CTX_set_options(ctx, SSL_OP_ENABLE_KTLS);
		ktls_enabled = options & SSL_OP_ENABLE_KTLS;
#else
		ktls_enabled = false;
#endif
		if (!ktls_enabled) {
			SPDK_ERRLOG("Unable to set kTLS offload via SSL_CTX_set_options(). Configure openssl with 'enable-ktls'\n");
			goto err;
		}
	}

	return ctx;

err:
	SSL_CTX_free(ctx);
	return NULL;
}

static SSL *
ssl_sock_connect_loop(SSL_CTX *ctx, int fd)
{
	int rc;
	SSL *ssl;
	int ssl_get_error;

	ssl = SSL_new(ctx);
	if (!ssl) {
		SPDK_ERRLOG("SSL_new() failed, msg = %s\n", ERR_error_string(ERR_peek_last_error(), NULL));
		return NULL;
	}
	SSL_set_fd(ssl, fd);
	SSL_set_psk_client_callback(ssl, posix_sock_tls_psk_client_cb);
	SPDK_DEBUGLOG(sock_posix, "SSL object creation finished: %p\n", ssl);
	SPDK_DEBUGLOG(sock_posix, "%s = SSL_state_string_long(%p)\n", SSL_state_string_long(ssl), ssl);
	while ((rc = SSL_connect(ssl)) != 1) {
		SPDK_DEBUGLOG(sock_posix, "%s = SSL_state_string_long(%p)\n", SSL_state_string_long(ssl), ssl);
		ssl_get_error = SSL_get_error(ssl, rc);
		SPDK_DEBUGLOG(sock_posix, "SSL_connect failed %d = SSL_connect(%p), %d = SSL_get_error(%p, %d)\n",
			      rc, ssl, ssl_get_error, ssl, rc);
		switch (ssl_get_error) {
		case SSL_ERROR_WANT_READ:
		case SSL_ERROR_WANT_WRITE:
			continue;
		default:
			break;
		}
		SPDK_ERRLOG("SSL_connect() failed, errno = %d\n", errno);
		SSL_free(ssl);
		return NULL;
	}
	SPDK_DEBUGLOG(sock_posix, "%s = SSL_state_string_long(%p)\n", SSL_state_string_long(ssl), ssl);
	SPDK_DEBUGLOG(sock_posix, "Negotiated Cipher suite:%s\n",
		      SSL_CIPHER_get_name(SSL_get_current_cipher(ssl)));
	return ssl;
}

static SSL *
ssl_sock_accept_loop(SSL_CTX *ctx, int fd)
{
	int rc;
	SSL *ssl;
	int ssl_get_error;

	ssl = SSL_new(ctx);
	if (!ssl) {
		SPDK_ERRLOG("SSL_new() failed, msg = %s\n", ERR_error_string(ERR_peek_last_error(), NULL));
		return NULL;
	}
	SSL_set_fd(ssl, fd);
	SSL_set_psk_server_callback(ssl, posix_sock_tls_psk_server_cb);
	SPDK_DEBUGLOG(sock_posix, "SSL object creation finished: %p\n", ssl);
	SPDK_DEBUGLOG(sock_posix, "%s = SSL_state_string_long(%p)\n", SSL_state_string_long(ssl), ssl);
	while ((rc = SSL_accept(ssl)) != 1) {
		SPDK_DEBUGLOG(sock_posix, "%s = SSL_state_string_long(%p)\n", SSL_state_string_long(ssl), ssl);
		ssl_get_error = SSL_get_error(ssl, rc);
		SPDK_DEBUGLOG(sock_posix, "SSL_accept failed %d = SSL_accept(%p), %d = SSL_get_error(%p, %d)\n", rc,
			      ssl, ssl_get_error, ssl, rc);
		switch (ssl_get_error) {
		case SSL_ERROR_WANT_READ:
		case SSL_ERROR_WANT_WRITE:
			continue;
		default:
			break;
		}
		SPDK_ERRLOG("SSL_accept() failed, errno = %d\n", errno);
		SSL_free(ssl);
		return NULL;
	}
	SPDK_DEBUGLOG(sock_posix, "%s = SSL_state_string_long(%p)\n", SSL_state_string_long(ssl), ssl);
	SPDK_DEBUGLOG(sock_posix, "Negotiated Cipher suite:%s\n",
		      SSL_CIPHER_get_name(SSL_get_current_cipher(ssl)));
	return ssl;
}
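
/*
 * OpenSSL has no native scatter/gather interface, so SSL_readv() and
 * SSL_writev() below emulate readv()/writev() by iterating over the iovec
 * array with SSL_read()/SSL_write(). On failure, the OpenSSL error code is
 * translated into an errno value (EAGAIN for the retryable WANT_* cases,
 * ENOTCONN otherwise) so callers can treat these like the plain socket calls.
 */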
static ssize_t
SSL_readv(SSL *ssl, const struct iovec *iov, int iovcnt)
{
	int i, rc = 0;
	ssize_t total = 0;

	for (i = 0; i < iovcnt; i++) {
		rc = SSL_read(ssl, iov[i].iov_base, iov[i].iov_len);

		if (rc > 0) {
			total += rc;
		}
		if (rc != (int)iov[i].iov_len) {
			break;
		}
	}
	if (total > 0) {
		errno = 0;
		return total;
	}
	switch (SSL_get_error(ssl, rc)) {
	case SSL_ERROR_ZERO_RETURN:
		errno = ENOTCONN;
		return 0;
	case SSL_ERROR_WANT_READ:
	case SSL_ERROR_WANT_WRITE:
	case SSL_ERROR_WANT_CONNECT:
	case SSL_ERROR_WANT_ACCEPT:
	case SSL_ERROR_WANT_X509_LOOKUP:
	case SSL_ERROR_WANT_ASYNC:
	case SSL_ERROR_WANT_ASYNC_JOB:
	case SSL_ERROR_WANT_CLIENT_HELLO_CB:
		errno = EAGAIN;
		return -1;
	case SSL_ERROR_SYSCALL:
	case SSL_ERROR_SSL:
		errno = ENOTCONN;
		return -1;
	default:
		errno = ENOTCONN;
		return -1;
	}
}

static ssize_t
SSL_writev(SSL *ssl, struct iovec *iov, int iovcnt)
{
	int i, rc = 0;
	ssize_t total = 0;

	for (i = 0; i < iovcnt; i++) {
		rc = SSL_write(ssl, iov[i].iov_base, iov[i].iov_len);

		if (rc > 0) {
			total += rc;
		}
		if (rc != (int)iov[i].iov_len) {
			break;
		}
	}
	if (total > 0) {
		errno = 0;
		return total;
	}
	switch (SSL_get_error(ssl, rc)) {
	case SSL_ERROR_ZERO_RETURN:
		errno = ENOTCONN;
		return 0;
	case SSL_ERROR_WANT_READ:
	case SSL_ERROR_WANT_WRITE:
	case SSL_ERROR_WANT_CONNECT:
	case SSL_ERROR_WANT_ACCEPT:
	case SSL_ERROR_WANT_X509_LOOKUP:
	case SSL_ERROR_WANT_ASYNC:
	case SSL_ERROR_WANT_ASYNC_JOB:
	case SSL_ERROR_WANT_CLIENT_HELLO_CB:
		errno = EAGAIN;
		return -1;
	case SSL_ERROR_SYSCALL:
	case SSL_ERROR_SSL:
		errno = ENOTCONN;
		return -1;
	default:
		errno = ENOTCONN;
		return -1;
	}
}

static struct spdk_sock *
posix_sock_create(const char *ip, int port,
		  enum posix_sock_create_type type,
		  struct spdk_sock_opts *opts,
		  bool enable_ssl)
{
	struct spdk_posix_sock *sock;
	char buf[MAX_TMPBUF];
	char portnum[PORTNUMLEN];
	char *p;
	struct addrinfo hints, *res, *res0;
	int fd, flag;
	int rc;
	bool enable_zcopy_user_opts = true;
	bool enable_zcopy_impl_opts = true;
	SSL_CTX *ctx = 0;
	SSL *ssl = 0;

	assert(opts != NULL);

	if (ip == NULL) {
		return NULL;
	}
	if (ip[0] == '[') {
		snprintf(buf, sizeof(buf), "%s", ip + 1);
		p = strchr(buf, ']');
		if (p != NULL) {
			*p = '\0';
		}
		ip = (const char *) &buf[0];
	}

	snprintf(portnum, sizeof portnum, "%d", port);
	memset(&hints, 0, sizeof hints);
	hints.ai_family = PF_UNSPEC;
	hints.ai_socktype = SOCK_STREAM;
	hints.ai_flags = AI_NUMERICSERV;
	hints.ai_flags |= AI_PASSIVE;
	hints.ai_flags |= AI_NUMERICHOST;
	rc = getaddrinfo(ip, portnum, &hints, &res0);
	if (rc != 0) {
		SPDK_ERRLOG("getaddrinfo() failed %s (%d)\n", gai_strerror(rc), rc);
		return NULL;
	}

	/* try listen */
	fd = -1;
	for (res = res0; res != NULL; res = res->ai_next) {
retry:
		fd = posix_fd_create(res, opts);
		if (fd < 0) {
			continue;
		}
		if (type == SPDK_SOCK_CREATE_LISTEN) {
			if (enable_ssl) {
				ctx = posix_sock_create_ssl_context(TLS_server_method(), opts);
				if (!ctx) {
					SPDK_ERRLOG("posix_sock_create_ssl_context() failed, errno = %d\n", errno);
					close(fd);
					fd = -1;
					break;
				}
			}
			rc = bind(fd, res->ai_addr, res->ai_addrlen);
			if (rc != 0) {
				SPDK_ERRLOG("bind() failed at port %d, errno = %d\n", port, errno);
				switch (errno) {
				case EINTR:
					/* interrupted? */
					close(fd);
					goto retry;
				case EADDRNOTAVAIL:
					SPDK_ERRLOG("IP address %s not available. "
						    "Verify IP address in config file "
						    "and make sure setup script is "
						    "run before starting spdk app.\n", ip);
				/* FALLTHROUGH */
				default:
					/* try next family */
					close(fd);
					fd = -1;
					continue;
				}
			}
			/* bind OK */
			rc = listen(fd, 512);
			if (rc != 0) {
				SPDK_ERRLOG("listen() failed, errno = %d\n", errno);
				close(fd);
				fd = -1;
				break;
			}
			enable_zcopy_impl_opts = g_spdk_posix_sock_impl_opts.enable_zerocopy_send_server;
		} else if (type == SPDK_SOCK_CREATE_CONNECT) {
			rc = connect(fd, res->ai_addr, res->ai_addrlen);
			if (rc != 0) {
				SPDK_ERRLOG("connect() failed, errno = %d\n", errno);
				/* try next family */
				close(fd);
				fd = -1;
				continue;
			}
			enable_zcopy_impl_opts = g_spdk_posix_sock_impl_opts.enable_zerocopy_send_client;
			if (enable_ssl) {
				ctx = posix_sock_create_ssl_context(TLS_client_method(), opts);
				if (!ctx) {
					SPDK_ERRLOG("posix_sock_create_ssl_context() failed, errno = %d\n", errno);
					close(fd);
					fd = -1;
					break;
				}
				ssl = ssl_sock_connect_loop(ctx, fd);
				if (!ssl) {
					SPDK_ERRLOG("ssl_sock_connect_loop() failed, errno = %d\n", errno);
					close(fd);
					fd = -1;
					SSL_CTX_free(ctx);
					break;
				}
			}
		}

		flag = fcntl(fd, F_GETFL);
		if (fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0) {
			SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno);
			close(fd);
			fd = -1;
			break;
		}
		break;
	}
	freeaddrinfo(res0);

	if (fd < 0) {
		return NULL;
	}

	/* Only enable zero copy for non-loopback and non-ssl sockets. */
	enable_zcopy_user_opts = opts->zcopy && !sock_is_loopback(fd) && !enable_ssl;

	sock = posix_sock_alloc(fd, enable_zcopy_user_opts && enable_zcopy_impl_opts);
	if (sock == NULL) {
		SPDK_ERRLOG("sock allocation failed\n");
		close(fd);
		return NULL;
	}

	if (ctx) {
		sock->ctx = ctx;
	}

	if (ssl) {
		sock->ssl = ssl;
	}

	return &sock->base;
}

static struct spdk_sock *
posix_sock_listen(const char *ip, int port, struct spdk_sock_opts *opts)
{
	return posix_sock_create(ip, port, SPDK_SOCK_CREATE_LISTEN, opts, false);
}

static struct spdk_sock *
posix_sock_connect(const char *ip, int port, struct spdk_sock_opts *opts)
{
	return posix_sock_create(ip, port, SPDK_SOCK_CREATE_CONNECT, opts, false);
}
static struct spdk_sock *
posix_sock_accept(struct spdk_sock *_sock)
{
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	struct sockaddr_storage sa;
	socklen_t salen;
	int rc, fd;
	struct spdk_posix_sock *new_sock;
	int flag;
	SSL *ssl = 0;

	memset(&sa, 0, sizeof(sa));
	salen = sizeof(sa);

	assert(sock != NULL);

	rc = accept(sock->fd, (struct sockaddr *)&sa, &salen);

	if (rc == -1) {
		return NULL;
	}

	fd = rc;

	flag = fcntl(fd, F_GETFL);
	if ((!(flag & O_NONBLOCK)) && (fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0)) {
		SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno);
		close(fd);
		return NULL;
	}

#if defined(SO_PRIORITY)
	/* The priority is not inherited, so call this function again */
	if (sock->base.opts.priority) {
		rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &sock->base.opts.priority, sizeof(int));
		if (rc != 0) {
			close(fd);
			return NULL;
		}
	}
#endif

	/* Establish SSL connection */
	if (sock->ctx) {
		ssl = ssl_sock_accept_loop(sock->ctx, fd);
		if (!ssl) {
			SPDK_ERRLOG("ssl_sock_accept_loop() failed, errno = %d\n", errno);
			close(fd);
			SSL_CTX_free(sock->ctx);
			return NULL;
		}
	}

	/* Inherit the zero copy feature from the listen socket */
	new_sock = posix_sock_alloc(fd, sock->zcopy);
	if (new_sock == NULL) {
		close(fd);
		return NULL;
	}

	if (sock->ctx) {
		new_sock->ctx = sock->ctx;
	}

	if (ssl) {
		new_sock->ssl = ssl;
	}

	return &new_sock->base;
}

static int
posix_sock_close(struct spdk_sock *_sock)
{
	struct spdk_posix_sock *sock = __posix_sock(_sock);

	assert(TAILQ_EMPTY(&_sock->pending_reqs));

	/* If the socket fails to close, the best choice is to
	 * leak the fd but continue to free the rest of the sock
	 * memory. */
	close(sock->fd);

	spdk_pipe_destroy(sock->recv_pipe);
	free(sock->recv_buf);
	free(sock);

	return 0;
}
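
/*
 * With MSG_ZEROCOPY, the kernel reports completion of zero-copy sends through
 * the socket error queue. Each sendmsg() call is numbered via sendmsg_idx, and
 * the requests flushed by that call store (sendmsg_idx - 1) in
 * req->internal.offset. _sock_check_zcopy() drains the error queue and
 * completes every pending request whose stored index falls within the
 * [ee_info, ee_data] range reported by the kernel.
 */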
#ifdef SPDK_ZEROCOPY
static int
_sock_check_zcopy(struct spdk_sock *sock)
{
	struct spdk_posix_sock *psock = __posix_sock(sock);
	struct msghdr msgh = {};
	uint8_t buf[sizeof(struct cmsghdr) + sizeof(struct sock_extended_err)];
	ssize_t rc;
	struct sock_extended_err *serr;
	struct cmsghdr *cm;
	uint32_t idx;
	struct spdk_sock_request *req, *treq;
	bool found;

	msgh.msg_control = buf;
	msgh.msg_controllen = sizeof(buf);

	while (true) {
		rc = recvmsg(psock->fd, &msgh, MSG_ERRQUEUE);

		if (rc < 0) {
			if (errno == EWOULDBLOCK || errno == EAGAIN) {
				return 0;
			}

			if (!TAILQ_EMPTY(&sock->pending_reqs)) {
				SPDK_ERRLOG("Attempting to receive from ERRQUEUE yielded error, but pending list still has orphaned entries\n");
			} else {
				SPDK_WARNLOG("Recvmsg yielded an error!\n");
			}
			return 0;
		}

		cm = CMSG_FIRSTHDR(&msgh);
		if (!(cm &&
		      ((cm->cmsg_level == SOL_IP && cm->cmsg_type == IP_RECVERR) ||
		       (cm->cmsg_level == SOL_IPV6 && cm->cmsg_type == IPV6_RECVERR)))) {
			SPDK_WARNLOG("Unexpected cmsg level or type!\n");
			return 0;
		}

		serr = (struct sock_extended_err *)CMSG_DATA(cm);
		if (serr->ee_errno != 0 || serr->ee_origin != SO_EE_ORIGIN_ZEROCOPY) {
			SPDK_WARNLOG("Unexpected extended error origin\n");
			return 0;
		}

		/* Most of the time, the pending_reqs array is in the exact
		 * order we need such that all of the requests to complete are
		 * in order, in the front. It is guaranteed that all requests
		 * belonging to the same sendmsg call are sequential, so once
		 * we encounter one match we can stop looping as soon as a
		 * non-match is found.
		 */
		for (idx = serr->ee_info; idx <= serr->ee_data; idx++) {
			found = false;
			TAILQ_FOREACH_SAFE(req, &sock->pending_reqs, internal.link, treq) {
				if (!req->internal.is_zcopy) {
					/* This wasn't a zcopy request. It was just waiting in line to complete */
					rc = spdk_sock_request_put(sock, req, 0);
					if (rc < 0) {
						return rc;
					}
				} else if (req->internal.offset == idx) {
					found = true;
					rc = spdk_sock_request_put(sock, req, 0);
					if (rc < 0) {
						return rc;
					}
				} else if (found) {
					break;
				}
			}
		}
	}

	return 0;
}
#endif
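
/*
 * Gather as many queued requests as possible into a single iovec array and
 * push them to the kernel with one sendmsg() (or SSL_writev() for TLS
 * sockets). Fully written requests are moved to the pending list; requests
 * sent with MSG_ZEROCOPY stay pending until the error-queue completion is seen
 * by _sock_check_zcopy(), while ordinary requests are completed immediately.
 */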
static int
_sock_flush(struct spdk_sock *sock)
{
	struct spdk_posix_sock *psock = __posix_sock(sock);
	struct msghdr msg = {};
	int flags;
	struct iovec iovs[IOV_BATCH_SIZE];
	int iovcnt;
	int retval;
	struct spdk_sock_request *req;
	int i;
	ssize_t rc;
	unsigned int offset;
	size_t len;
	bool is_zcopy = false;

	/* Can't flush from within a callback or we end up with recursive calls */
	if (sock->cb_cnt > 0) {
		return 0;
	}

#ifdef SPDK_ZEROCOPY
	if (psock->zcopy) {
		flags = MSG_ZEROCOPY | MSG_NOSIGNAL;
	} else
#endif
	{
		flags = MSG_NOSIGNAL;
	}

	iovcnt = spdk_sock_prep_reqs(sock, iovs, 0, NULL, &flags);
	if (iovcnt == 0) {
		return 0;
	}

#ifdef SPDK_ZEROCOPY
	is_zcopy = flags & MSG_ZEROCOPY;
#endif

	/* Perform the vectored write */
	msg.msg_iov = iovs;
	msg.msg_iovlen = iovcnt;

	if (psock->ssl) {
		rc = SSL_writev(psock->ssl, iovs, iovcnt);
	} else {
		rc = sendmsg(psock->fd, &msg, flags);
	}
	if (rc <= 0) {
		if (errno == EAGAIN || errno == EWOULDBLOCK || (errno == ENOBUFS && psock->zcopy)) {
			return 0;
		}
		return rc;
	}

	if (is_zcopy) {
		/* Handling overflow case, because we use psock->sendmsg_idx - 1 for the
		 * req->internal.offset, so sendmsg_idx should not be zero */
		if (spdk_unlikely(psock->sendmsg_idx == UINT32_MAX)) {
			psock->sendmsg_idx = 1;
		} else {
			psock->sendmsg_idx++;
		}
	}

	/* Consume the requests that were actually written */
	req = TAILQ_FIRST(&sock->queued_reqs);
	while (req) {
		offset = req->internal.offset;

		/* req->internal.is_zcopy is true when the whole req or part of it is sent with zerocopy */
		req->internal.is_zcopy = is_zcopy;

		for (i = 0; i < req->iovcnt; i++) {
			/* Advance by the offset first */
			if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) {
				offset -= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len;
				continue;
			}

			/* Calculate the remaining length of this element */
			len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset;

			if (len > (size_t)rc) {
				/* This element was partially sent. */
				req->internal.offset += rc;
				return 0;
			}

			offset = 0;
			req->internal.offset += len;
			rc -= len;
		}

		/* Handled a full request. */
		spdk_sock_request_pend(sock, req);

		if (!req->internal.is_zcopy && req == TAILQ_FIRST(&sock->pending_reqs)) {
			/* The sendmsg syscall above isn't currently asynchronous,
			 * so it's already done. */
			retval = spdk_sock_request_put(sock, req, 0);
			if (retval) {
				break;
			}
		} else {
			/* Re-use the offset field to hold the sendmsg call index. The
			 * index is 0 based, so subtract one here because we've already
			 * incremented above.
			 */
			req->internal.offset = psock->sendmsg_idx - 1;
		}

		if (rc == 0) {
			break;
		}

		req = TAILQ_FIRST(&sock->queued_reqs);
	}

	return 0;
}

static int
posix_sock_flush(struct spdk_sock *sock)
{
#ifdef SPDK_ZEROCOPY
	struct spdk_posix_sock *psock = __posix_sock(sock);

	if (psock->zcopy && !TAILQ_EMPTY(&sock->pending_reqs)) {
		_sock_check_zcopy(sock);
	}
#endif

	return _sock_flush(sock);
}

static ssize_t
posix_sock_recv_from_pipe(struct spdk_posix_sock *sock, struct iovec *diov, int diovcnt)
{
	struct iovec siov[2];
	int sbytes;
	ssize_t bytes;
	struct spdk_posix_sock_group_impl *group;

	sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov);
	if (sbytes < 0) {
		errno = EINVAL;
		return -1;
	} else if (sbytes == 0) {
		errno = EAGAIN;
		return -1;
	}

	bytes = spdk_iovcpy(siov, 2, diov, diovcnt);

	if (bytes == 0) {
		/* The only way this happens is if diov is 0 length */
		errno = EINVAL;
		return -1;
	}

	spdk_pipe_reader_advance(sock->recv_pipe, bytes);

	/* If we drained the pipe, mark it appropriately */
	if (spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) {
		assert(sock->pipe_has_data == true);

		group = __posix_group_impl(sock->base.group_impl);
		if (group && !sock->socket_has_data) {
			TAILQ_REMOVE(&group->socks_with_data, sock, link);
		}

		sock->pipe_has_data = false;
	}

	return bytes;
}

static inline ssize_t
posix_sock_read(struct spdk_posix_sock *sock)
{
	struct iovec iov[2];
	int bytes_avail, bytes_recvd;
	struct spdk_posix_sock_group_impl *group;

	bytes_avail = spdk_pipe_writer_get_buffer(sock->recv_pipe, sock->recv_buf_sz, iov);

	if (bytes_avail <= 0) {
		return bytes_avail;
	}

	if (sock->ssl) {
		bytes_recvd = SSL_readv(sock->ssl, iov, 2);
	} else {
		bytes_recvd = readv(sock->fd, iov, 2);
	}

	assert(sock->pipe_has_data == false);

	if (bytes_recvd <= 0) {
		/* Errors count as draining the socket data */
		if (sock->base.group_impl && sock->socket_has_data) {
			group = __posix_group_impl(sock->base.group_impl);
			TAILQ_REMOVE(&group->socks_with_data, sock, link);
		}

		sock->socket_has_data = false;

		return bytes_recvd;
	}

	spdk_pipe_writer_advance(sock->recv_pipe, bytes_recvd);

#if DEBUG
	if (sock->base.group_impl) {
		assert(sock->socket_has_data == true);
	}
#endif

	sock->pipe_has_data = true;
	if (bytes_recvd < bytes_avail) {
		/* We drained the kernel socket entirely. */
		sock->socket_has_data = false;
	}

	return bytes_recvd;
}
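
/*
 * Receive path: if no receive pipe is configured, read straight from the
 * kernel socket (or the SSL object). Otherwise, large reads (at least
 * MIN_SOCK_PIPE_SIZE bytes) go directly to the caller's buffers, while
 * smaller reads first refill the pipe with posix_sock_read() and are then
 * served from it. The pipe_has_data/socket_has_data flags keep the poll
 * group's socks_with_data list in sync with where readable data actually
 * lives.
 */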
static ssize_t
posix_sock_readv(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
{
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	struct spdk_posix_sock_group_impl *group = __posix_group_impl(sock->base.group_impl);
	int rc, i;
	size_t len;

	if (sock->recv_pipe == NULL) {
		assert(sock->pipe_has_data == false);
		if (group && sock->socket_has_data) {
			sock->socket_has_data = false;
			TAILQ_REMOVE(&group->socks_with_data, sock, link);
		}
		if (sock->ssl) {
			return SSL_readv(sock->ssl, iov, iovcnt);
		} else {
			return readv(sock->fd, iov, iovcnt);
		}
	}

	/* If the socket is not in a group, we must assume it always has
	 * data waiting for us because it is not epolled */
	if (!sock->pipe_has_data && (group == NULL || sock->socket_has_data)) {
		/* If the user is receiving a sufficiently large amount of data,
		 * receive directly to their buffers. */
		len = 0;
		for (i = 0; i < iovcnt; i++) {
			len += iov[i].iov_len;
		}

		if (len >= MIN_SOCK_PIPE_SIZE) {
			/* TODO: Should this detect if kernel socket is drained? */
			if (sock->ssl) {
				return SSL_readv(sock->ssl, iov, iovcnt);
			} else {
				return readv(sock->fd, iov, iovcnt);
			}
		}

		/* Otherwise, do a big read into our pipe */
		rc = posix_sock_read(sock);
		if (rc <= 0) {
			return rc;
		}
	}

	return posix_sock_recv_from_pipe(sock, iov, iovcnt);
}

static ssize_t
posix_sock_recv(struct spdk_sock *sock, void *buf, size_t len)
{
	struct iovec iov[1];

	iov[0].iov_base = buf;
	iov[0].iov_len = len;

	return posix_sock_readv(sock, iov, 1);
}
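
/*
 * Synchronous writev must not overtake writes that were queued with
 * posix_sock_writev_async(), so any queued requests are flushed first; if
 * they cannot all be flushed, the call fails with EAGAIN instead of
 * reordering the data stream.
 */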
static ssize_t
posix_sock_writev(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
{
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	int rc;

	/* In order to process a writev, we need to flush any asynchronous writes
	 * first. */
	rc = _sock_flush(_sock);
	if (rc < 0) {
		return rc;
	}

	if (!TAILQ_EMPTY(&_sock->queued_reqs)) {
		/* We weren't able to flush all requests */
		errno = EAGAIN;
		return -1;
	}

	if (sock->ssl) {
		return SSL_writev(sock->ssl, iov, iovcnt);
	} else {
		return writev(sock->fd, iov, iovcnt);
	}
}

static void
posix_sock_writev_async(struct spdk_sock *sock, struct spdk_sock_request *req)
{
	int rc;

	spdk_sock_request_queue(sock, req);

	/* If there are a sufficient number queued, just flush them out immediately. */
	if (sock->queued_iovcnt >= IOV_BATCH_SIZE) {
		rc = _sock_flush(sock);
		if (rc) {
			spdk_sock_abort_requests(sock);
		}
	}
}

static int
posix_sock_set_recvlowat(struct spdk_sock *_sock, int nbytes)
{
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	int val;
	int rc;

	assert(sock != NULL);

	val = nbytes;
	rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVLOWAT, &val, sizeof val);
	if (rc != 0) {
		return -1;
	}
	return 0;
}

static bool
posix_sock_is_ipv6(struct spdk_sock *_sock)
{
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	struct sockaddr_storage sa;
	socklen_t salen;
	int rc;

	assert(sock != NULL);

	memset(&sa, 0, sizeof sa);
	salen = sizeof sa;
	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
	if (rc != 0) {
		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
		return false;
	}

	return (sa.ss_family == AF_INET6);
}

static bool
posix_sock_is_ipv4(struct spdk_sock *_sock)
{
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	struct sockaddr_storage sa;
	socklen_t salen;
	int rc;

	assert(sock != NULL);

	memset(&sa, 0, sizeof sa);
	salen = sizeof sa;
	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
	if (rc != 0) {
		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
		return false;
	}

	return (sa.ss_family == AF_INET);
}

static bool
posix_sock_is_connected(struct spdk_sock *_sock)
{
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	uint8_t byte;
	int rc;

	rc = recv(sock->fd, &byte, 1, MSG_PEEK);
	if (rc == 0) {
		return false;
	}

	if (rc < 0) {
		if (errno == EAGAIN || errno == EWOULDBLOCK) {
			return true;
		}

		return false;
	}

	return true;
}

static struct spdk_sock_group_impl *
posix_sock_group_impl_get_optimal(struct spdk_sock *_sock, struct spdk_sock_group_impl *hint)
{
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	struct spdk_sock_group_impl *group_impl;

	if (sock->placement_id != -1) {
		spdk_sock_map_lookup(&g_map, sock->placement_id, &group_impl, hint);
		return group_impl;
	}

	return NULL;
}
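
/*
 * Poll groups are backed by epoll on Linux and kqueue on FreeBSD. When
 * placement is PLACEMENT_CPU, the group is registered in g_map under the
 * current core so posix_sock_group_impl_get_optimal() can route sockets to
 * the group running on the same core; with PLACEMENT_MARK, a placement id is
 * allocated per group and stamped on each member socket via SO_MARK.
 */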
static struct spdk_sock_group_impl *
posix_sock_group_impl_create(void)
{
	struct spdk_posix_sock_group_impl *group_impl;
	int fd;

#if defined(SPDK_EPOLL)
	fd = epoll_create1(0);
#elif defined(SPDK_KEVENT)
	fd = kqueue();
#endif
	if (fd == -1) {
		return NULL;
	}

	group_impl = calloc(1, sizeof(*group_impl));
	if (group_impl == NULL) {
		SPDK_ERRLOG("group_impl allocation failed\n");
		close(fd);
		return NULL;
	}

	group_impl->fd = fd;
	TAILQ_INIT(&group_impl->socks_with_data);
	group_impl->placement_id = -1;

	if (g_spdk_posix_sock_impl_opts.enable_placement_id == PLACEMENT_CPU) {
		spdk_sock_map_insert(&g_map, spdk_env_get_current_core(), &group_impl->base);
		group_impl->placement_id = spdk_env_get_current_core();
	}

	return &group_impl->base;
}

static void
posix_sock_mark(struct spdk_posix_sock_group_impl *group, struct spdk_posix_sock *sock,
		int placement_id)
{
#if defined(SO_MARK)
	int rc;

	rc = setsockopt(sock->fd, SOL_SOCKET, SO_MARK,
			&placement_id, sizeof(placement_id));
	if (rc != 0) {
		/* Not fatal */
		SPDK_ERRLOG("Error setting SO_MARK\n");
		return;
	}

	rc = spdk_sock_map_insert(&g_map, placement_id, &group->base);
	if (rc != 0) {
		/* Not fatal */
		SPDK_ERRLOG("Failed to insert sock group into map: %d\n", rc);
		return;
	}

	sock->placement_id = placement_id;
#endif
}

static void
posix_sock_update_mark(struct spdk_sock_group_impl *_group, struct spdk_sock *_sock)
{
	struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group);

	if (group->placement_id == -1) {
		group->placement_id = spdk_sock_map_find_free(&g_map);

		/* If a free placement id is found, update existing sockets in this group */
		if (group->placement_id != -1) {
			struct spdk_sock *sock, *tmp;

			TAILQ_FOREACH_SAFE(sock, &_group->socks, link, tmp) {
				posix_sock_mark(group, __posix_sock(sock), group->placement_id);
			}
		}
	}

	if (group->placement_id != -1) {
		/*
		 * group placement id is already determined for this poll group.
		 * Mark socket with group's placement id.
		 */
		posix_sock_mark(group, __posix_sock(_sock), group->placement_id);
	}
}

static int
posix_sock_group_impl_add_sock(struct spdk_sock_group_impl *_group, struct spdk_sock *_sock)
{
	struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group);
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	int rc;

#if defined(SPDK_EPOLL)
	struct epoll_event event;

	memset(&event, 0, sizeof(event));
	/* EPOLLERR is always on even if we don't set it, but be explicit for clarity */
	event.events = EPOLLIN | EPOLLERR;
	event.data.ptr = sock;

	rc = epoll_ctl(group->fd, EPOLL_CTL_ADD, sock->fd, &event);
#elif defined(SPDK_KEVENT)
	struct kevent event;
	struct timespec ts = {0};

	EV_SET(&event, sock->fd, EVFILT_READ, EV_ADD, 0, 0, sock);

	rc = kevent(group->fd, &event, 1, NULL, 0, &ts);
#endif

	if (rc != 0) {
		return rc;
	}

	/* switched from another polling group due to scheduling */
	if (spdk_unlikely(sock->recv_pipe != NULL &&
			  (spdk_pipe_reader_bytes_available(sock->recv_pipe) > 0))) {
		sock->pipe_has_data = true;
		sock->socket_has_data = false;
		TAILQ_INSERT_TAIL(&group->socks_with_data, sock, link);
	}

	if (g_spdk_posix_sock_impl_opts.enable_placement_id == PLACEMENT_MARK) {
		posix_sock_update_mark(_group, _sock);
	} else if (sock->placement_id != -1) {
		rc = spdk_sock_map_insert(&g_map, sock->placement_id, &group->base);
		if (rc != 0) {
			SPDK_ERRLOG("Failed to insert sock group into map: %d\n", rc);
			/* Do not treat this as an error. The system will continue running. */
		}
	}

	return rc;
}

static int
posix_sock_group_impl_remove_sock(struct spdk_sock_group_impl *_group, struct spdk_sock *_sock)
{
	struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group);
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	int rc;

	if (sock->pipe_has_data || sock->socket_has_data) {
		TAILQ_REMOVE(&group->socks_with_data, sock, link);
		sock->pipe_has_data = false;
		sock->socket_has_data = false;
	}

	if (sock->placement_id != -1) {
		spdk_sock_map_release(&g_map, sock->placement_id);
	}

#if defined(SPDK_EPOLL)
	struct epoll_event event;

	/* Event parameter is ignored but some old kernel versions still require it. */
	rc = epoll_ctl(group->fd, EPOLL_CTL_DEL, sock->fd, &event);
#elif defined(SPDK_KEVENT)
	struct kevent event;
	struct timespec ts = {0};

	EV_SET(&event, sock->fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);

	rc = kevent(group->fd, &event, 1, NULL, 0, &ts);
	if (rc == 0 && event.flags & EV_ERROR) {
		rc = -1;
		errno = event.data;
	}
#endif

	spdk_sock_abort_requests(_sock);

	return rc;
}
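
/*
 * Poll a group: flush any queued writes on every member socket, gather
 * readiness from epoll/kqueue, fold the results into the socks_with_data list
 * (which also remembers sockets that still hold buffered pipe data), and
 * return up to max_events sockets. The list is rotated after each poll so
 * that the same sockets are not always serviced first.
 */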
static int
posix_sock_group_impl_poll(struct spdk_sock_group_impl *_group, int max_events,
			   struct spdk_sock **socks)
{
	struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group);
	struct spdk_sock *sock, *tmp;
	int num_events, i, rc;
	struct spdk_posix_sock *psock, *ptmp;
#if defined(SPDK_EPOLL)
	struct epoll_event events[MAX_EVENTS_PER_POLL];
#elif defined(SPDK_KEVENT)
	struct kevent events[MAX_EVENTS_PER_POLL];
	struct timespec ts = {0};
#endif

#ifdef SPDK_ZEROCOPY
	/* When all of the following conditions are met
	 * - non-blocking socket
	 * - zero copy is enabled
	 * - interrupts suppressed (i.e. busy polling)
	 * - the NIC tx queue is full at the time sendmsg() is called
	 * - epoll_wait determines there is an EPOLLIN event for the socket
	 * then we can get into a situation where data we've sent is queued
	 * up in the kernel network stack, but interrupts have been suppressed
	 * because other traffic is flowing so the kernel misses the signal
	 * to flush the software tx queue. If there wasn't incoming data
	 * pending on the socket, then epoll_wait would have been sufficient
	 * to kick off the send operation, but since there is a pending event
	 * epoll_wait does not trigger the necessary operation.
	 *
	 * We deal with this by checking for all of the above conditions and
	 * additionally looking for EPOLLIN events that were not consumed from
	 * the last poll loop. We take this to mean that the upper layer is
	 * unable to consume them because it is blocked waiting for resources
	 * to free up, and those resources are most likely freed in response
	 * to a pending asynchronous write completing.
	 *
	 * Additionally, sockets that have the same placement_id actually share
	 * an underlying hardware queue. That means polling one of them is
	 * equivalent to polling all of them. As a quick mechanism to avoid
	 * making extra poll() calls, stash the last placement_id during the loop
	 * and only poll if it's not the same. The overwhelmingly common case
	 * is that all sockets in this list have the same placement_id because
	 * SPDK is intentionally grouping sockets by that value, so even
	 * though this won't stop all extra calls to poll(), it's very fast
	 * and will catch all of them in practice.
	 */
	int last_placement_id = -1;

	TAILQ_FOREACH(psock, &group->socks_with_data, link) {
		if (psock->zcopy && psock->placement_id >= 0 &&
		    psock->placement_id != last_placement_id) {
			struct pollfd pfd = {psock->fd, POLLIN | POLLERR, 0};

			poll(&pfd, 1, 0);
			last_placement_id = psock->placement_id;
		}
	}
#endif

	/* This must be a TAILQ_FOREACH_SAFE because while flushing,
	 * a completion callback could remove the sock from the
	 * group. */
	TAILQ_FOREACH_SAFE(sock, &_group->socks, link, tmp) {
		rc = _sock_flush(sock);
		if (rc) {
			spdk_sock_abort_requests(sock);
		}
	}

	assert(max_events > 0);

#if defined(SPDK_EPOLL)
	num_events = epoll_wait(group->fd, events, max_events, 0);
#elif defined(SPDK_KEVENT)
	num_events = kevent(group->fd, NULL, 0, events, max_events, &ts);
#endif

	if (num_events == -1) {
		return -1;
	} else if (num_events == 0 && !TAILQ_EMPTY(&_group->socks)) {
		sock = TAILQ_FIRST(&_group->socks);
		psock = __posix_sock(sock);
		/* poll() is called here to busy poll the queue associated with
		 * first socket in list and potentially reap incoming data.
		 */
		if (sock->opts.priority) {
			struct pollfd pfd = {0, 0, 0};

			pfd.fd = psock->fd;
			pfd.events = POLLIN | POLLERR;
			poll(&pfd, 1, 0);
		}
	}

	for (i = 0; i < num_events; i++) {
#if defined(SPDK_EPOLL)
		sock = events[i].data.ptr;
		psock = __posix_sock(sock);

#ifdef SPDK_ZEROCOPY
		if (events[i].events & EPOLLERR) {
			rc = _sock_check_zcopy(sock);
			/* If the socket was closed or removed from
			 * the group in response to a send ack, don't
			 * add it to the array here. */
			if (rc || sock->cb_fn == NULL) {
				continue;
			}
		}
#endif
		if ((events[i].events & EPOLLIN) == 0) {
			continue;
		}

#elif defined(SPDK_KEVENT)
		sock = events[i].udata;
		psock = __posix_sock(sock);
#endif

		/* If the socket is not already in the list, add it now */
		if (!psock->socket_has_data && !psock->pipe_has_data) {
			TAILQ_INSERT_TAIL(&group->socks_with_data, psock, link);
		}
		psock->socket_has_data = true;
	}

	num_events = 0;

	TAILQ_FOREACH_SAFE(psock, &group->socks_with_data, link, ptmp) {
		if (num_events == max_events) {
			break;
		}

		/* If the socket's cb_fn is NULL, just remove it from the
		 * list and do not add it to socks array */
		if (spdk_unlikely(psock->base.cb_fn == NULL)) {
			psock->socket_has_data = false;
			psock->pipe_has_data = false;
			TAILQ_REMOVE(&group->socks_with_data, psock, link);
			continue;
		}

		socks[num_events++] = &psock->base;
	}

	/* Cycle the has_data list so that each time we poll things aren't
	 * in the same order. Say we have 6 sockets in the list, named as follows:
	 * A B C D E F
	 * And all 6 sockets had epoll events, but max_events is only 3. That means
	 * psock currently points at D. We want to rearrange the list to the following:
	 * D E F A B C
	 *
	 * The variables below are named according to this example to make it easier to
	 * follow the swaps.
	 */
	if (psock != NULL) {
		struct spdk_posix_sock *pa, *pc, *pd, *pf;

		/* Capture pointers to the elements we need */
		pd = psock;
		pc = TAILQ_PREV(pd, spdk_has_data_list, link);
		pa = TAILQ_FIRST(&group->socks_with_data);
		pf = TAILQ_LAST(&group->socks_with_data, spdk_has_data_list);

		/* Break the link between C and D */
		pc->link.tqe_next = NULL;

		/* Connect F to A */
		pf->link.tqe_next = pa;
		pa->link.tqe_prev = &pf->link.tqe_next;

		/* Fix up the list first/last pointers */
		group->socks_with_data.tqh_first = pd;
		group->socks_with_data.tqh_last = &pc->link.tqe_next;

		/* D is in front of the list, make tqe prev pointer point to the head of list */
		pd->link.tqe_prev = &group->socks_with_data.tqh_first;
	}

	return num_events;
}

static int
posix_sock_group_impl_close(struct spdk_sock_group_impl *_group)
{
	struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group);
	int rc;

	if (g_spdk_posix_sock_impl_opts.enable_placement_id == PLACEMENT_CPU) {
		spdk_sock_map_release(&g_map, spdk_env_get_current_core());
	}

	rc = close(group->fd);
	free(group);
	return rc;
}

static int
posix_sock_impl_get_opts(struct spdk_sock_impl_opts *opts, size_t *len)
{
	if (!opts || !len) {
		errno = EINVAL;
		return -1;
	}
	memset(opts, 0, *len);

#define FIELD_OK(field) \
	offsetof(struct spdk_sock_impl_opts, field) + sizeof(opts->field) <= *len

#define GET_FIELD(field) \
	if (FIELD_OK(field)) { \
		opts->field = g_spdk_posix_sock_impl_opts.field; \
	}

	GET_FIELD(recv_buf_size);
	GET_FIELD(send_buf_size);
	GET_FIELD(enable_recv_pipe);
	GET_FIELD(enable_zerocopy_send);
	GET_FIELD(enable_quickack);
	GET_FIELD(enable_placement_id);
	GET_FIELD(enable_zerocopy_send_server);
	GET_FIELD(enable_zerocopy_send_client);
	GET_FIELD(zerocopy_threshold);

#undef GET_FIELD
#undef FIELD_OK

	*len = spdk_min(*len, sizeof(g_spdk_posix_sock_impl_opts));
	return 0;
}

static int
posix_sock_impl_set_opts(const struct spdk_sock_impl_opts *opts, size_t len)
{
	if (!opts) {
		errno = EINVAL;
		return -1;
	}

#define FIELD_OK(field) \
	offsetof(struct spdk_sock_impl_opts, field) + sizeof(opts->field) <= len

#define SET_FIELD(field) \
	if (FIELD_OK(field)) { \
		g_spdk_posix_sock_impl_opts.field = opts->field; \
	}

	SET_FIELD(recv_buf_size);
	SET_FIELD(send_buf_size);
	SET_FIELD(enable_recv_pipe);
	SET_FIELD(enable_zerocopy_send);
	SET_FIELD(enable_quickack);
	SET_FIELD(enable_placement_id);
	SET_FIELD(enable_zerocopy_send_server);
	SET_FIELD(enable_zerocopy_send_client);
	SET_FIELD(zerocopy_threshold);

#undef SET_FIELD
#undef FIELD_OK

	return 0;
}

static struct spdk_net_impl g_posix_net_impl = {
	.name = "posix",
	.getaddr = posix_sock_getaddr,
	.connect = posix_sock_connect,
	.listen = posix_sock_listen,
	.accept = posix_sock_accept,
	.close = posix_sock_close,
	.recv = posix_sock_recv,
	.readv = posix_sock_readv,
	.writev = posix_sock_writev,
	.writev_async = posix_sock_writev_async,
	.flush = posix_sock_flush,
	.set_recvlowat = posix_sock_set_recvlowat,
	.set_recvbuf = posix_sock_set_recvbuf,
	.set_sendbuf = posix_sock_set_sendbuf,
	.is_ipv6 = posix_sock_is_ipv6,
	.is_ipv4 = posix_sock_is_ipv4,
	.is_connected = posix_sock_is_connected,
	.group_impl_get_optimal = posix_sock_group_impl_get_optimal,
	.group_impl_create = posix_sock_group_impl_create,
	.group_impl_add_sock = posix_sock_group_impl_add_sock,
	.group_impl_remove_sock = posix_sock_group_impl_remove_sock,
	.group_impl_poll = posix_sock_group_impl_poll,
	.group_impl_close = posix_sock_group_impl_close,
	.get_opts = posix_sock_impl_get_opts,
	.set_opts = posix_sock_impl_set_opts,
};

SPDK_NET_IMPL_REGISTER(posix, &g_posix_net_impl, DEFAULT_SOCK_PRIORITY);
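
/*
 * The "ssl" net implementation reuses the posix implementation; the only
 * difference is that listen/connect call posix_sock_create() with
 * enable_ssl = true, which sets up the SSL_CTX/SSL objects used by the read
 * and write paths above.
 */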
static struct spdk_sock *
ssl_sock_listen(const char *ip, int port, struct spdk_sock_opts *opts)
{
	return posix_sock_create(ip, port, SPDK_SOCK_CREATE_LISTEN, opts, true);
}

static struct spdk_sock *
ssl_sock_connect(const char *ip, int port, struct spdk_sock_opts *opts)
{
	return posix_sock_create(ip, port, SPDK_SOCK_CREATE_CONNECT, opts, true);
}

static struct spdk_net_impl g_ssl_net_impl = {
	.name = "ssl",
	.getaddr = posix_sock_getaddr,
	.connect = ssl_sock_connect,
	.listen = ssl_sock_listen,
	.accept = posix_sock_accept,
	.close = posix_sock_close,
	.recv = posix_sock_recv,
	.readv = posix_sock_readv,
	.writev = posix_sock_writev,
	.writev_async = posix_sock_writev_async,
	.flush = posix_sock_flush,
	.set_recvlowat = posix_sock_set_recvlowat,
	.set_recvbuf = posix_sock_set_recvbuf,
	.set_sendbuf = posix_sock_set_sendbuf,
	.is_ipv6 = posix_sock_is_ipv6,
	.is_ipv4 = posix_sock_is_ipv4,
	.is_connected = posix_sock_is_connected,
	.group_impl_get_optimal = posix_sock_group_impl_get_optimal,
	.group_impl_create = posix_sock_group_impl_create,
	.group_impl_add_sock = posix_sock_group_impl_add_sock,
	.group_impl_remove_sock = posix_sock_group_impl_remove_sock,
	.group_impl_poll = posix_sock_group_impl_poll,
	.group_impl_close = posix_sock_group_impl_close,
	.get_opts = posix_sock_impl_get_opts,
	.set_opts = posix_sock_impl_set_opts,
};

SPDK_NET_IMPL_REGISTER(ssl, &g_ssl_net_impl, DEFAULT_SOCK_PRIORITY);
SPDK_LOG_REGISTER_COMPONENT(sock_posix)