xref: /spdk/module/sock/posix/posix.c (revision 03b6183a3c6c54099cc23d23ef9730c9ff87be83)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2018 Intel Corporation. All rights reserved.
3  *   Copyright (c) 2020, 2021 Mellanox Technologies LTD. All rights reserved.
4  *   Copyright (c) 2021 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
5  */
6 
7 #include "spdk/stdinc.h"
8 
9 #if defined(__FreeBSD__)
10 #include <sys/event.h>
11 #define SPDK_KEVENT
12 #else
13 #include <sys/epoll.h>
14 #define SPDK_EPOLL
15 #endif
16 
17 #if defined(__linux__)
18 #include <linux/errqueue.h>
19 #endif
20 
21 #include "spdk/env.h"
22 #include "spdk/log.h"
23 #include "spdk/pipe.h"
24 #include "spdk/sock.h"
25 #include "spdk/util.h"
26 #include "spdk/string.h"
27 #include "spdk_internal/sock.h"
28 #include "../sock_kernel.h"
29 
30 #include "openssl/crypto.h"
31 #include "openssl/err.h"
32 #include "openssl/ssl.h"
33 
/* Scratch buffer used by posix_sock_create() to strip "[...]" from IPv6 literals. */
#define MAX_TMPBUF	(1024)
/* Buffer holding the decimal port string handed to getaddrinfo(). */
#define PORTNUMLEN	(32)

/* Zero-copy send needs both the socket option and the sendmsg() flag. */
#if defined(SO_ZEROCOPY) && defined(MSG_ZEROCOPY)
#define SPDK_ZEROCOPY
#endif
40 
41 struct spdk_posix_sock {
42 	struct spdk_sock	base;
43 	int			fd;
44 
45 	uint32_t		sendmsg_idx;
46 
47 	struct spdk_pipe	*recv_pipe;
48 	void			*recv_buf;
49 	int			recv_buf_sz;
50 	bool			pipe_has_data;
51 	bool			socket_has_data;
52 	bool			zcopy;
53 
54 	int			placement_id;
55 
56 	SSL_CTX			*ctx;
57 	SSL			*ssl;
58 
59 	TAILQ_ENTRY(spdk_posix_sock)	link;
60 };
61 
62 TAILQ_HEAD(spdk_has_data_list, spdk_posix_sock);
63 
64 struct spdk_posix_sock_group_impl {
65 	struct spdk_sock_group_impl	base;
66 	int				fd;
67 	struct spdk_has_data_list	socks_with_data;
68 	int				placement_id;
69 };
70 
71 static struct spdk_sock_impl_opts g_spdk_posix_sock_impl_opts = {
72 	.recv_buf_size = DEFAULT_SO_RCVBUF_SIZE,
73 	.send_buf_size = DEFAULT_SO_SNDBUF_SIZE,
74 	.enable_recv_pipe = true,
75 	.enable_quickack = false,
76 	.enable_placement_id = PLACEMENT_NONE,
77 	.enable_zerocopy_send_server = true,
78 	.enable_zerocopy_send_client = false,
79 	.zerocopy_threshold = 0,
80 	.tls_version = 0,
81 	.enable_ktls = false,
82 	.psk_key = NULL,
83 	.psk_key_size = 0,
84 	.psk_identity = NULL,
85 	.get_key = NULL,
86 	.get_key_ctx = NULL,
87 	.tls_cipher_suites = NULL
88 };
89 
90 static struct spdk_sock_map g_map = {
91 	.entries = STAILQ_HEAD_INITIALIZER(g_map.entries),
92 	.mtx = PTHREAD_MUTEX_INITIALIZER
93 };
94 
95 __attribute((destructor)) static void
96 posix_sock_map_cleanup(void)
97 {
98 	spdk_sock_map_cleanup(&g_map);
99 }
100 
/* Downcast helpers: the generic object is the first member of the posix
 * object, so a pointer cast is sufficient. The argument and the whole
 * expansion are parenthesized so the macros compose safely with `->`,
 * further casts and pointer arithmetic. */
#define __posix_sock(sock) ((struct spdk_posix_sock *)(sock))
#define __posix_group_impl(group) ((struct spdk_posix_sock_group_impl *)(group))
103 
104 static void
105 posix_sock_copy_impl_opts(struct spdk_sock_impl_opts *dest, const struct spdk_sock_impl_opts *src,
106 			  size_t len)
107 {
108 #define FIELD_OK(field) \
109 	offsetof(struct spdk_sock_impl_opts, field) + sizeof(src->field) <= len
110 
111 #define SET_FIELD(field) \
112 	if (FIELD_OK(field)) { \
113 		dest->field = src->field; \
114 	}
115 
116 	SET_FIELD(recv_buf_size);
117 	SET_FIELD(send_buf_size);
118 	SET_FIELD(enable_recv_pipe);
119 	SET_FIELD(enable_zerocopy_send);
120 	SET_FIELD(enable_quickack);
121 	SET_FIELD(enable_placement_id);
122 	SET_FIELD(enable_zerocopy_send_server);
123 	SET_FIELD(enable_zerocopy_send_client);
124 	SET_FIELD(zerocopy_threshold);
125 	SET_FIELD(tls_version);
126 	SET_FIELD(enable_ktls);
127 	SET_FIELD(psk_key);
128 	SET_FIELD(psk_key_size);
129 	SET_FIELD(psk_identity);
130 	SET_FIELD(get_key);
131 	SET_FIELD(get_key_ctx);
132 	SET_FIELD(tls_cipher_suites);
133 
134 #undef SET_FIELD
135 #undef FIELD_OK
136 }
137 
138 static int
139 posix_sock_impl_get_opts(struct spdk_sock_impl_opts *opts, size_t *len)
140 {
141 	if (!opts || !len) {
142 		errno = EINVAL;
143 		return -1;
144 	}
145 
146 	assert(sizeof(*opts) >= *len);
147 	memset(opts, 0, *len);
148 
149 	posix_sock_copy_impl_opts(opts, &g_spdk_posix_sock_impl_opts, *len);
150 	*len = spdk_min(*len, sizeof(g_spdk_posix_sock_impl_opts));
151 
152 	return 0;
153 }
154 
155 static int
156 posix_sock_impl_set_opts(const struct spdk_sock_impl_opts *opts, size_t len)
157 {
158 	if (!opts) {
159 		errno = EINVAL;
160 		return -1;
161 	}
162 
163 	assert(sizeof(*opts) >= len);
164 	posix_sock_copy_impl_opts(&g_spdk_posix_sock_impl_opts, opts, len);
165 
166 	return 0;
167 }
168 
169 static void
170 posix_opts_get_impl_opts(const struct spdk_sock_opts *opts, struct spdk_sock_impl_opts *dest)
171 {
172 	/* Copy the default impl_opts first to cover cases when user's impl_opts is smaller */
173 	memcpy(dest, &g_spdk_posix_sock_impl_opts, sizeof(*dest));
174 
175 	if (opts->impl_opts != NULL) {
176 		assert(sizeof(*dest) >= opts->impl_opts_size);
177 		posix_sock_copy_impl_opts(dest, opts->impl_opts, opts->impl_opts_size);
178 	}
179 }
180 
181 static int
182 posix_sock_getaddr(struct spdk_sock *_sock, char *saddr, int slen, uint16_t *sport,
183 		   char *caddr, int clen, uint16_t *cport)
184 {
185 	struct spdk_posix_sock *sock = __posix_sock(_sock);
186 	struct sockaddr_storage sa;
187 	socklen_t salen;
188 	int rc;
189 
190 	assert(sock != NULL);
191 
192 	memset(&sa, 0, sizeof sa);
193 	salen = sizeof sa;
194 	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
195 	if (rc != 0) {
196 		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
197 		return -1;
198 	}
199 
200 	switch (sa.ss_family) {
201 	case AF_UNIX:
202 		/* Acceptable connection types that don't have IPs */
203 		return 0;
204 	case AF_INET:
205 	case AF_INET6:
206 		/* Code below will get IP addresses */
207 		break;
208 	default:
209 		/* Unsupported socket family */
210 		return -1;
211 	}
212 
213 	rc = get_addr_str((struct sockaddr *)&sa, saddr, slen);
214 	if (rc != 0) {
215 		SPDK_ERRLOG("getnameinfo() failed (errno=%d)\n", errno);
216 		return -1;
217 	}
218 
219 	if (sport) {
220 		if (sa.ss_family == AF_INET) {
221 			*sport = ntohs(((struct sockaddr_in *) &sa)->sin_port);
222 		} else if (sa.ss_family == AF_INET6) {
223 			*sport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port);
224 		}
225 	}
226 
227 	memset(&sa, 0, sizeof sa);
228 	salen = sizeof sa;
229 	rc = getpeername(sock->fd, (struct sockaddr *) &sa, &salen);
230 	if (rc != 0) {
231 		SPDK_ERRLOG("getpeername() failed (errno=%d)\n", errno);
232 		return -1;
233 	}
234 
235 	rc = get_addr_str((struct sockaddr *)&sa, caddr, clen);
236 	if (rc != 0) {
237 		SPDK_ERRLOG("getnameinfo() failed (errno=%d)\n", errno);
238 		return -1;
239 	}
240 
241 	if (cport) {
242 		if (sa.ss_family == AF_INET) {
243 			*cport = ntohs(((struct sockaddr_in *) &sa)->sin_port);
244 		} else if (sa.ss_family == AF_INET6) {
245 			*cport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port);
246 		}
247 	}
248 
249 	return 0;
250 }
251 
/* What posix_sock_create() does with the socket it creates. */
enum posix_sock_create_type {
	SPDK_SOCK_CREATE_LISTEN = 0,	/* bind() and listen() */
	SPDK_SOCK_CREATE_CONNECT = 1,	/* connect() to the remote address */
};
256 
257 static int
258 posix_sock_alloc_pipe(struct spdk_posix_sock *sock, int sz)
259 {
260 	uint8_t *new_buf;
261 	struct spdk_pipe *new_pipe;
262 	struct iovec siov[2];
263 	struct iovec diov[2];
264 	int sbytes;
265 	ssize_t bytes;
266 	int rc;
267 
268 	if (sock->recv_buf_sz == sz) {
269 		return 0;
270 	}
271 
272 	/* If the new size is 0, just free the pipe */
273 	if (sz == 0) {
274 		spdk_pipe_destroy(sock->recv_pipe);
275 		free(sock->recv_buf);
276 		sock->recv_pipe = NULL;
277 		sock->recv_buf = NULL;
278 		return 0;
279 	} else if (sz < MIN_SOCK_PIPE_SIZE) {
280 		SPDK_ERRLOG("The size of the pipe must be larger than %d\n", MIN_SOCK_PIPE_SIZE);
281 		return -1;
282 	}
283 
284 	/* Round up to next 64 byte multiple */
285 	rc = posix_memalign((void **)&new_buf, 64, sz);
286 	if (rc != 0) {
287 		SPDK_ERRLOG("socket recv buf allocation failed\n");
288 		return -ENOMEM;
289 	}
290 	memset(new_buf, 0, sz);
291 
292 	new_pipe = spdk_pipe_create(new_buf, sz);
293 	if (new_pipe == NULL) {
294 		SPDK_ERRLOG("socket pipe allocation failed\n");
295 		free(new_buf);
296 		return -ENOMEM;
297 	}
298 
299 	if (sock->recv_pipe != NULL) {
300 		/* Pull all of the data out of the old pipe */
301 		sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov);
302 		if (sbytes > sz) {
303 			/* Too much data to fit into the new pipe size */
304 			spdk_pipe_destroy(new_pipe);
305 			free(new_buf);
306 			return -EINVAL;
307 		}
308 
309 		sbytes = spdk_pipe_writer_get_buffer(new_pipe, sz, diov);
310 		assert(sbytes == sz);
311 
312 		bytes = spdk_iovcpy(siov, 2, diov, 2);
313 		spdk_pipe_writer_advance(new_pipe, bytes);
314 
315 		spdk_pipe_destroy(sock->recv_pipe);
316 		free(sock->recv_buf);
317 	}
318 
319 	sock->recv_buf_sz = sz;
320 	sock->recv_buf = new_buf;
321 	sock->recv_pipe = new_pipe;
322 
323 	return 0;
324 }
325 
326 static int
327 posix_sock_set_recvbuf(struct spdk_sock *_sock, int sz)
328 {
329 	struct spdk_posix_sock *sock = __posix_sock(_sock);
330 	int min_size;
331 	int rc;
332 
333 	assert(sock != NULL);
334 
335 	if (_sock->impl_opts.enable_recv_pipe) {
336 		rc = posix_sock_alloc_pipe(sock, sz);
337 		if (rc) {
338 			return rc;
339 		}
340 	}
341 
342 	/* Set kernel buffer size to be at least MIN_SO_RCVBUF_SIZE and
343 	 * g_spdk_posix_sock_impl_opts.recv_buf_size. */
344 	min_size = spdk_max(MIN_SO_RCVBUF_SIZE, g_spdk_posix_sock_impl_opts.recv_buf_size);
345 
346 	if (sz < min_size) {
347 		sz = min_size;
348 	}
349 
350 	rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVBUF, &sz, sizeof(sz));
351 	if (rc < 0) {
352 		return rc;
353 	}
354 
355 	_sock->impl_opts.recv_buf_size = sz;
356 
357 	return 0;
358 }
359 
360 static int
361 posix_sock_set_sendbuf(struct spdk_sock *_sock, int sz)
362 {
363 	struct spdk_posix_sock *sock = __posix_sock(_sock);
364 	int min_size;
365 	int rc;
366 
367 	assert(sock != NULL);
368 
369 	/* Set kernel buffer size to be at least MIN_SO_SNDBUF_SIZE and
370 	 * g_spdk_posix_sock_impl_opts.send_buf_size. */
371 	min_size = spdk_max(MIN_SO_SNDBUF_SIZE, g_spdk_posix_sock_impl_opts.send_buf_size);
372 
373 	if (sz < min_size) {
374 		sz = min_size;
375 	}
376 
377 	rc = setsockopt(sock->fd, SOL_SOCKET, SO_SNDBUF, &sz, sizeof(sz));
378 	if (rc < 0) {
379 		return rc;
380 	}
381 
382 	_sock->impl_opts.send_buf_size = sz;
383 
384 	return 0;
385 }
386 
387 static void
388 posix_sock_init(struct spdk_posix_sock *sock, bool enable_zero_copy)
389 {
390 #if defined(SPDK_ZEROCOPY) || defined(__linux__)
391 	int flag;
392 	int rc;
393 #endif
394 
395 #if defined(SPDK_ZEROCOPY)
396 	flag = 1;
397 
398 	if (enable_zero_copy) {
399 		/* Try to turn on zero copy sends */
400 		rc = setsockopt(sock->fd, SOL_SOCKET, SO_ZEROCOPY, &flag, sizeof(flag));
401 		if (rc == 0) {
402 			sock->zcopy = true;
403 		}
404 	}
405 #endif
406 
407 #if defined(__linux__)
408 	flag = 1;
409 
410 	if (sock->base.impl_opts.enable_quickack) {
411 		rc = setsockopt(sock->fd, IPPROTO_TCP, TCP_QUICKACK, &flag, sizeof(flag));
412 		if (rc != 0) {
413 			SPDK_ERRLOG("quickack was failed to set\n");
414 		}
415 	}
416 
417 	spdk_sock_get_placement_id(sock->fd, sock->base.impl_opts.enable_placement_id,
418 				   &sock->placement_id);
419 
420 	if (sock->base.impl_opts.enable_placement_id == PLACEMENT_MARK) {
421 		/* Save placement_id */
422 		spdk_sock_map_insert(&g_map, sock->placement_id, NULL);
423 	}
424 #endif
425 }
426 
427 static struct spdk_posix_sock *
428 posix_sock_alloc(int fd, struct spdk_sock_impl_opts *impl_opts, bool enable_zero_copy)
429 {
430 	struct spdk_posix_sock *sock;
431 
432 	sock = calloc(1, sizeof(*sock));
433 	if (sock == NULL) {
434 		SPDK_ERRLOG("sock allocation failed\n");
435 		return NULL;
436 	}
437 
438 	sock->fd = fd;
439 	memcpy(&sock->base.impl_opts, impl_opts, sizeof(*impl_opts));
440 	posix_sock_init(sock, enable_zero_copy);
441 
442 	return sock;
443 }
444 
445 static int
446 posix_fd_create(struct addrinfo *res, struct spdk_sock_opts *opts,
447 		struct spdk_sock_impl_opts *impl_opts)
448 {
449 	int fd;
450 	int val = 1;
451 	int rc, sz;
452 #if defined(__linux__)
453 	int to;
454 #endif
455 
456 	fd = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
457 	if (fd < 0) {
458 		/* error */
459 		return -1;
460 	}
461 
462 	sz = impl_opts->recv_buf_size;
463 	rc = setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &sz, sizeof(sz));
464 	if (rc) {
465 		/* Not fatal */
466 	}
467 
468 	sz = impl_opts->send_buf_size;
469 	rc = setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &sz, sizeof(sz));
470 	if (rc) {
471 		/* Not fatal */
472 	}
473 
474 	rc = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof val);
475 	if (rc != 0) {
476 		close(fd);
477 		/* error */
478 		return -1;
479 	}
480 	rc = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof val);
481 	if (rc != 0) {
482 		close(fd);
483 		/* error */
484 		return -1;
485 	}
486 
487 #if defined(SO_PRIORITY)
488 	if (opts->priority) {
489 		rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &opts->priority, sizeof val);
490 		if (rc != 0) {
491 			close(fd);
492 			/* error */
493 			return -1;
494 		}
495 	}
496 #endif
497 
498 	if (res->ai_family == AF_INET6) {
499 		rc = setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &val, sizeof val);
500 		if (rc != 0) {
501 			close(fd);
502 			/* error */
503 			return -1;
504 		}
505 	}
506 
507 	if (opts->ack_timeout) {
508 #if defined(__linux__)
509 		to = opts->ack_timeout;
510 		rc = setsockopt(fd, IPPROTO_TCP, TCP_USER_TIMEOUT, &to, sizeof(to));
511 		if (rc != 0) {
512 			close(fd);
513 			/* error */
514 			return -1;
515 		}
516 #else
517 		SPDK_WARNLOG("TCP_USER_TIMEOUT is not supported.\n");
518 #endif
519 	}
520 
521 	return fd;
522 }
523 
524 static int
525 posix_sock_psk_find_session_server_cb(SSL *ssl, const unsigned char *identity,
526 				      size_t identity_len, SSL_SESSION **sess)
527 {
528 	struct spdk_sock_impl_opts *impl_opts = SSL_get_app_data(ssl);
529 	uint8_t key[SSL_MAX_MASTER_KEY_LENGTH] = {};
530 	int keylen;
531 	int rc, i;
532 	STACK_OF(SSL_CIPHER) *ciphers;
533 	const SSL_CIPHER *cipher;
534 	const char *cipher_name;
535 	const char *user_cipher = NULL;
536 	bool found = false;
537 
538 	if (impl_opts->get_key) {
539 		rc = impl_opts->get_key(key, sizeof(key), &user_cipher, identity, impl_opts->get_key_ctx);
540 		if (rc < 0) {
541 			SPDK_ERRLOG("Unable to find PSK for identity: %s\n", identity);
542 			return 0;
543 		}
544 		keylen = rc;
545 	} else {
546 		if (impl_opts->psk_key == NULL) {
547 			SPDK_ERRLOG("PSK is not set\n");
548 			return 0;
549 		}
550 
551 		SPDK_DEBUGLOG(sock_posix, "Length of Client's PSK ID %lu\n", strlen(impl_opts->psk_identity));
552 		if (strcmp(impl_opts->psk_identity, identity) != 0) {
553 			SPDK_ERRLOG("Unknown Client's PSK ID\n");
554 			return 0;
555 		}
556 		keylen = impl_opts->psk_key_size;
557 
558 		memcpy(key, impl_opts->psk_key, keylen);
559 		user_cipher = impl_opts->tls_cipher_suites;
560 	}
561 
562 	if (user_cipher == NULL) {
563 		SPDK_ERRLOG("Cipher suite not set\n");
564 		return 0;
565 	}
566 
567 	*sess = SSL_SESSION_new();
568 	if (*sess == NULL) {
569 		SPDK_ERRLOG("Unable to allocate new SSL session\n");
570 		return 0;
571 	}
572 
573 	ciphers = SSL_get_ciphers(ssl);
574 	for (i = 0; i < sk_SSL_CIPHER_num(ciphers); i++) {
575 		cipher = sk_SSL_CIPHER_value(ciphers, i);
576 		cipher_name = SSL_CIPHER_get_name(cipher);
577 
578 		if (strcmp(user_cipher, cipher_name) == 0) {
579 			rc = SSL_SESSION_set_cipher(*sess, cipher);
580 			if (rc != 1) {
581 				SPDK_ERRLOG("Unable to set cipher: %s\n", cipher_name);
582 				goto err;
583 			}
584 			found = true;
585 			break;
586 		}
587 	}
588 	if (found == false) {
589 		SPDK_ERRLOG("No suitable cipher found\n");
590 		goto err;
591 	}
592 
593 	SPDK_DEBUGLOG(sock_posix, "Cipher selected: %s\n", cipher_name);
594 
595 	rc = SSL_SESSION_set_protocol_version(*sess, TLS1_3_VERSION);
596 	if (rc != 1) {
597 		SPDK_ERRLOG("Unable to set TLS version: %d\n", TLS1_3_VERSION);
598 		goto err;
599 	}
600 
601 	rc = SSL_SESSION_set1_master_key(*sess, key, keylen);
602 	if (rc != 1) {
603 		SPDK_ERRLOG("Unable to set PSK for session\n");
604 		goto err;
605 	}
606 
607 	return 1;
608 
609 err:
610 	SSL_SESSION_free(*sess);
611 	*sess = NULL;
612 	return 0;
613 }
614 
615 static int
616 posix_sock_psk_use_session_client_cb(SSL *ssl, const EVP_MD *md, const unsigned char **identity,
617 				     size_t *identity_len, SSL_SESSION **sess)
618 {
619 	struct spdk_sock_impl_opts *impl_opts = SSL_get_app_data(ssl);
620 	int rc, i;
621 	STACK_OF(SSL_CIPHER) *ciphers;
622 	const SSL_CIPHER *cipher;
623 	const char *cipher_name;
624 	long keylen;
625 	bool found = false;
626 
627 	if (impl_opts->psk_key == NULL) {
628 		SPDK_ERRLOG("PSK is not set\n");
629 		return 0;
630 	}
631 	if (impl_opts->psk_key_size > SSL_MAX_MASTER_KEY_LENGTH) {
632 		SPDK_ERRLOG("PSK too long\n");
633 		return 0;
634 	}
635 	keylen = impl_opts->psk_key_size;
636 
637 	if (impl_opts->tls_cipher_suites == NULL) {
638 		SPDK_ERRLOG("Cipher suite not set\n");
639 		return 0;
640 	}
641 	*sess = SSL_SESSION_new();
642 	if (*sess == NULL) {
643 		SPDK_ERRLOG("Unable to allocate new SSL session\n");
644 		return 0;
645 	}
646 
647 	ciphers = SSL_get_ciphers(ssl);
648 	for (i = 0; i < sk_SSL_CIPHER_num(ciphers); i++) {
649 		cipher = sk_SSL_CIPHER_value(ciphers, i);
650 		cipher_name = SSL_CIPHER_get_name(cipher);
651 
652 		if (strcmp(impl_opts->tls_cipher_suites, cipher_name) == 0) {
653 			rc = SSL_SESSION_set_cipher(*sess, cipher);
654 			if (rc != 1) {
655 				SPDK_ERRLOG("Unable to set cipher: %s\n", cipher_name);
656 				goto err;
657 			}
658 			found = true;
659 			break;
660 		}
661 	}
662 	if (found == false) {
663 		SPDK_ERRLOG("No suitable cipher found\n");
664 		goto err;
665 	}
666 
667 	SPDK_DEBUGLOG(sock_posix, "Cipher selected: %s\n", cipher_name);
668 
669 	rc = SSL_SESSION_set_protocol_version(*sess, TLS1_3_VERSION);
670 	if (rc != 1) {
671 		SPDK_ERRLOG("Unable to set TLS version: %d\n", TLS1_3_VERSION);
672 		goto err;
673 	}
674 
675 	rc = SSL_SESSION_set1_master_key(*sess, impl_opts->psk_key, keylen);
676 	if (rc != 1) {
677 		SPDK_ERRLOG("Unable to set PSK for session\n");
678 		goto err;
679 	}
680 
681 	*identity_len = strlen(impl_opts->psk_identity);
682 	*identity = impl_opts->psk_identity;
683 
684 	return 1;
685 
686 err:
687 	SSL_SESSION_free(*sess);
688 	*sess = NULL;
689 	return 0;
690 }
691 
692 static SSL_CTX *
693 posix_sock_create_ssl_context(const SSL_METHOD *method, struct spdk_sock_opts *opts,
694 			      struct spdk_sock_impl_opts *impl_opts)
695 {
696 	SSL_CTX *ctx;
697 	int tls_version = 0;
698 	bool ktls_enabled = false;
699 #ifdef SSL_OP_ENABLE_KTLS
700 	long options;
701 #endif
702 
703 	SSL_library_init();
704 	OpenSSL_add_all_algorithms();
705 	SSL_load_error_strings();
706 	/* Produce a SSL CTX in SSL V2 and V3 standards compliant way */
707 	ctx = SSL_CTX_new(method);
708 	if (!ctx) {
709 		SPDK_ERRLOG("SSL_CTX_new() failed, msg = %s\n", ERR_error_string(ERR_peek_last_error(), NULL));
710 		return NULL;
711 	}
712 	SPDK_DEBUGLOG(sock_posix, "SSL context created\n");
713 
714 	switch (impl_opts->tls_version) {
715 	case 0:
716 		/* auto-negotioation */
717 		break;
718 	case SPDK_TLS_VERSION_1_3:
719 		tls_version = TLS1_3_VERSION;
720 		break;
721 	default:
722 		SPDK_ERRLOG("Incorrect TLS version provided: %d\n", impl_opts->tls_version);
723 		goto err;
724 	}
725 
726 	if (tls_version) {
727 		SPDK_DEBUGLOG(sock_posix, "Hardening TLS version to '%d'='0x%X'\n", impl_opts->tls_version,
728 			      tls_version);
729 		if (!SSL_CTX_set_min_proto_version(ctx, tls_version)) {
730 			SPDK_ERRLOG("Unable to set Min TLS version to '%d'='0x%X\n", impl_opts->tls_version, tls_version);
731 			goto err;
732 		}
733 		if (!SSL_CTX_set_max_proto_version(ctx, tls_version)) {
734 			SPDK_ERRLOG("Unable to set Max TLS version to '%d'='0x%X\n", impl_opts->tls_version, tls_version);
735 			goto err;
736 		}
737 	}
738 	if (impl_opts->enable_ktls) {
739 		SPDK_DEBUGLOG(sock_posix, "Enabling kTLS offload\n");
740 #ifdef SSL_OP_ENABLE_KTLS
741 		options = SSL_CTX_set_options(ctx, SSL_OP_ENABLE_KTLS);
742 		ktls_enabled = options & SSL_OP_ENABLE_KTLS;
743 #else
744 		ktls_enabled = false;
745 #endif
746 		if (!ktls_enabled) {
747 			SPDK_ERRLOG("Unable to set kTLS offload via SSL_CTX_set_options(). Configure openssl with 'enable-ktls'\n");
748 			goto err;
749 		}
750 	}
751 
752 	/* SSL_CTX_set_ciphersuites() return 1 if the requested
753 	 * cipher suite list was configured, and 0 otherwise. */
754 	if (impl_opts->tls_cipher_suites != NULL &&
755 	    SSL_CTX_set_ciphersuites(ctx, impl_opts->tls_cipher_suites) != 1) {
756 		SPDK_ERRLOG("Unable to set TLS cipher suites for SSL'\n");
757 		goto err;
758 	}
759 
760 	return ctx;
761 
762 err:
763 	SSL_CTX_free(ctx);
764 	return NULL;
765 }
766 
767 static SSL *
768 ssl_sock_connect_loop(SSL_CTX *ctx, int fd, struct spdk_sock_impl_opts *impl_opts)
769 {
770 	int rc;
771 	SSL *ssl;
772 	int ssl_get_error;
773 
774 	ssl = SSL_new(ctx);
775 	if (!ssl) {
776 		SPDK_ERRLOG("SSL_new() failed, msg = %s\n", ERR_error_string(ERR_peek_last_error(), NULL));
777 		return NULL;
778 	}
779 	SSL_set_fd(ssl, fd);
780 	SSL_set_app_data(ssl, impl_opts);
781 	SSL_set_psk_use_session_callback(ssl, posix_sock_psk_use_session_client_cb);
782 	SPDK_DEBUGLOG(sock_posix, "SSL object creation finished: %p\n", ssl);
783 	SPDK_DEBUGLOG(sock_posix, "%s = SSL_state_string_long(%p)\n", SSL_state_string_long(ssl), ssl);
784 	while ((rc = SSL_connect(ssl)) != 1) {
785 		SPDK_DEBUGLOG(sock_posix, "%s = SSL_state_string_long(%p)\n", SSL_state_string_long(ssl), ssl);
786 		ssl_get_error = SSL_get_error(ssl, rc);
787 		SPDK_DEBUGLOG(sock_posix, "SSL_connect failed %d = SSL_connect(%p), %d = SSL_get_error(%p, %d)\n",
788 			      rc, ssl, ssl_get_error, ssl, rc);
789 		switch (ssl_get_error) {
790 		case SSL_ERROR_WANT_READ:
791 		case SSL_ERROR_WANT_WRITE:
792 			continue;
793 		default:
794 			break;
795 		}
796 		SPDK_ERRLOG("SSL_connect() failed, errno = %d\n", errno);
797 		SSL_free(ssl);
798 		return NULL;
799 	}
800 	SPDK_DEBUGLOG(sock_posix, "%s = SSL_state_string_long(%p)\n", SSL_state_string_long(ssl), ssl);
801 	SPDK_DEBUGLOG(sock_posix, "Negotiated Cipher suite:%s\n",
802 		      SSL_CIPHER_get_name(SSL_get_current_cipher(ssl)));
803 	return ssl;
804 }
805 
806 static SSL *
807 ssl_sock_accept_loop(SSL_CTX *ctx, int fd, struct spdk_sock_impl_opts *impl_opts)
808 {
809 	int rc;
810 	SSL *ssl;
811 	int ssl_get_error;
812 
813 	ssl = SSL_new(ctx);
814 	if (!ssl) {
815 		SPDK_ERRLOG("SSL_new() failed, msg = %s\n", ERR_error_string(ERR_peek_last_error(), NULL));
816 		return NULL;
817 	}
818 	SSL_set_fd(ssl, fd);
819 	SSL_set_app_data(ssl, impl_opts);
820 	SSL_set_psk_find_session_callback(ssl, posix_sock_psk_find_session_server_cb);
821 	SPDK_DEBUGLOG(sock_posix, "SSL object creation finished: %p\n", ssl);
822 	SPDK_DEBUGLOG(sock_posix, "%s = SSL_state_string_long(%p)\n", SSL_state_string_long(ssl), ssl);
823 	while ((rc = SSL_accept(ssl)) != 1) {
824 		SPDK_DEBUGLOG(sock_posix, "%s = SSL_state_string_long(%p)\n", SSL_state_string_long(ssl), ssl);
825 		ssl_get_error = SSL_get_error(ssl, rc);
826 		SPDK_DEBUGLOG(sock_posix, "SSL_accept failed %d = SSL_accept(%p), %d = SSL_get_error(%p, %d)\n", rc,
827 			      ssl, ssl_get_error, ssl, rc);
828 		switch (ssl_get_error) {
829 		case SSL_ERROR_WANT_READ:
830 		case SSL_ERROR_WANT_WRITE:
831 			continue;
832 		default:
833 			break;
834 		}
835 		SPDK_ERRLOG("SSL_accept() failed, errno = %d\n", errno);
836 		SSL_free(ssl);
837 		return NULL;
838 	}
839 	SPDK_DEBUGLOG(sock_posix, "%s = SSL_state_string_long(%p)\n", SSL_state_string_long(ssl), ssl);
840 	SPDK_DEBUGLOG(sock_posix, "Negotiated Cipher suite:%s\n",
841 		      SSL_CIPHER_get_name(SSL_get_current_cipher(ssl)));
842 	return ssl;
843 }
844 
845 static ssize_t
846 SSL_readv(SSL *ssl, const struct iovec *iov, int iovcnt)
847 {
848 	int i, rc = 0;
849 	ssize_t total = 0;
850 
851 	for (i = 0; i < iovcnt; i++) {
852 		rc = SSL_read(ssl, iov[i].iov_base, iov[i].iov_len);
853 
854 		if (rc > 0) {
855 			total += rc;
856 		}
857 		if (rc != (int)iov[i].iov_len) {
858 			break;
859 		}
860 	}
861 	if (total > 0) {
862 		errno = 0;
863 		return total;
864 	}
865 	switch (SSL_get_error(ssl, rc)) {
866 	case SSL_ERROR_ZERO_RETURN:
867 		errno = ENOTCONN;
868 		return 0;
869 	case SSL_ERROR_WANT_READ:
870 	case SSL_ERROR_WANT_WRITE:
871 	case SSL_ERROR_WANT_CONNECT:
872 	case SSL_ERROR_WANT_ACCEPT:
873 	case SSL_ERROR_WANT_X509_LOOKUP:
874 	case SSL_ERROR_WANT_ASYNC:
875 	case SSL_ERROR_WANT_ASYNC_JOB:
876 	case SSL_ERROR_WANT_CLIENT_HELLO_CB:
877 		errno = EAGAIN;
878 		return -1;
879 	case SSL_ERROR_SYSCALL:
880 	case SSL_ERROR_SSL:
881 		errno = ENOTCONN;
882 		return -1;
883 	default:
884 		errno = ENOTCONN;
885 		return -1;
886 	}
887 }
888 
889 static ssize_t
890 SSL_writev(SSL *ssl, struct iovec *iov, int iovcnt)
891 {
892 	int i, rc = 0;
893 	ssize_t total = 0;
894 
895 	for (i = 0; i < iovcnt; i++) {
896 		rc = SSL_write(ssl, iov[i].iov_base, iov[i].iov_len);
897 
898 		if (rc > 0) {
899 			total += rc;
900 		}
901 		if (rc != (int)iov[i].iov_len) {
902 			break;
903 		}
904 	}
905 	if (total > 0) {
906 		errno = 0;
907 		return total;
908 	}
909 	switch (SSL_get_error(ssl, rc)) {
910 	case SSL_ERROR_ZERO_RETURN:
911 		errno = ENOTCONN;
912 		return 0;
913 	case SSL_ERROR_WANT_READ:
914 	case SSL_ERROR_WANT_WRITE:
915 	case SSL_ERROR_WANT_CONNECT:
916 	case SSL_ERROR_WANT_ACCEPT:
917 	case SSL_ERROR_WANT_X509_LOOKUP:
918 	case SSL_ERROR_WANT_ASYNC:
919 	case SSL_ERROR_WANT_ASYNC_JOB:
920 	case SSL_ERROR_WANT_CLIENT_HELLO_CB:
921 		errno = EAGAIN;
922 		return -1;
923 	case SSL_ERROR_SYSCALL:
924 	case SSL_ERROR_SSL:
925 		errno = ENOTCONN;
926 		return -1;
927 	default:
928 		errno = ENOTCONN;
929 		return -1;
930 	}
931 }
932 
/* Create a listening or connected TCP socket for ip:port.
 *
 * ip must be a numeric host; an IPv6 literal may be wrapped in brackets
 * ("[::1]"). The getaddrinfo() results are tried in order. For
 * SPDK_SOCK_CREATE_CONNECT with enable_ssl, the TLS client handshake is
 * completed before the fd is switched to non-blocking mode. Zero-copy is
 * requested only for non-loopback, non-TLS sockets, and only when both
 * opts->zcopy and the server/client zero-copy impl option allow it.
 * Returns the new socket, or NULL on failure. */
static struct spdk_sock *
posix_sock_create(const char *ip, int port,
		  enum posix_sock_create_type type,
		  struct spdk_sock_opts *opts,
		  bool enable_ssl)
{
	struct spdk_posix_sock *sock;
	struct spdk_sock_impl_opts impl_opts;
	char buf[MAX_TMPBUF];
	char portnum[PORTNUMLEN];
	char *p;
	struct addrinfo hints, *res, *res0;
	int fd, flag;
	int rc;
	int saved_errno;
	bool enable_zcopy_user_opts = true;
	bool enable_zcopy_impl_opts = true;
	SSL_CTX *ctx = NULL;
	SSL *ssl = NULL;

	assert(opts != NULL);
	posix_opts_get_impl_opts(opts, &impl_opts);

	if (ip == NULL) {
		return NULL;
	}
	if (ip[0] == '[') {
		/* Strip the brackets from an IPv6 literal such as "[::1]". */
		snprintf(buf, sizeof(buf), "%s", ip + 1);
		p = strchr(buf, ']');
		if (p != NULL) {
			*p = '\0';
		}
		ip = (const char *) &buf[0];
	}

	snprintf(portnum, sizeof portnum, "%d", port);
	memset(&hints, 0, sizeof hints);
	hints.ai_family = PF_UNSPEC;
	hints.ai_socktype = SOCK_STREAM;
	hints.ai_flags = AI_NUMERICSERV;
	hints.ai_flags |= AI_PASSIVE;
	hints.ai_flags |= AI_NUMERICHOST;
	rc = getaddrinfo(ip, portnum, &hints, &res0);
	if (rc != 0) {
		SPDK_ERRLOG("getaddrinfo() failed %s (%d)\n", gai_strerror(rc), rc);
		return NULL;
	}

	/* try listen */
	fd = -1;
	for (res = res0; res != NULL; res = res->ai_next) {
retry:
		fd = posix_fd_create(res, opts, &impl_opts);
		if (fd < 0) {
			continue;
		}
		if (type == SPDK_SOCK_CREATE_LISTEN) {
			rc = bind(fd, res->ai_addr, res->ai_addrlen);
			if (rc != 0) {
				/* Capture errno before logging: the log call may clobber it
				 * and the switch below depends on it. */
				saved_errno = errno;
				SPDK_ERRLOG("bind() failed at port %d, errno = %d\n", port, saved_errno);
				switch (saved_errno) {
				case EINTR:
					/* interrupted? */
					close(fd);
					goto retry;
				case EADDRNOTAVAIL:
					SPDK_ERRLOG("IP address %s not available. "
						    "Verify IP address in config file "
						    "and make sure setup script is "
						    "run before starting spdk app.\n", ip);
				/* FALLTHROUGH */
				default:
					/* try next family */
					close(fd);
					fd = -1;
					continue;
				}
			}
			/* bind OK */
			rc = listen(fd, 512);
			if (rc != 0) {
				SPDK_ERRLOG("listen() failed, errno = %d\n", errno);
				close(fd);
				fd = -1;
				break;
			}
			enable_zcopy_impl_opts = impl_opts.enable_zerocopy_send_server;
		} else if (type == SPDK_SOCK_CREATE_CONNECT) {
			rc = connect(fd, res->ai_addr, res->ai_addrlen);
			if (rc != 0) {
				SPDK_ERRLOG("connect() failed, errno = %d\n", errno);
				/* try next family */
				close(fd);
				fd = -1;
				continue;
			}
			enable_zcopy_impl_opts = impl_opts.enable_zerocopy_send_client;
			if (enable_ssl) {
				ctx = posix_sock_create_ssl_context(TLS_client_method(), opts, &impl_opts);
				if (!ctx) {
					SPDK_ERRLOG("posix_sock_create_ssl_context() failed, errno = %d\n", errno);
					close(fd);
					fd = -1;
					break;
				}
				ssl = ssl_sock_connect_loop(ctx, fd, &impl_opts);
				if (!ssl) {
					SPDK_ERRLOG("ssl_sock_connect_loop() failed, errno = %d\n", errno);
					close(fd);
					fd = -1;
					SSL_CTX_free(ctx);
					ctx = NULL;
					break;
				}
			}
		}

		/* F_GETFL can fail; OR-ing O_NONBLOCK into -1 would set bogus flags. */
		flag = fcntl(fd, F_GETFL);
		if (flag < 0 || fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0) {
			SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno);
			SSL_free(ssl);
			SSL_CTX_free(ctx);
			ssl = NULL;
			ctx = NULL;
			close(fd);
			fd = -1;
			break;
		}
		break;
	}
	freeaddrinfo(res0);

	if (fd < 0) {
		return NULL;
	}

	/* Only enable zero copy for non-loopback and non-ssl sockets. */
	enable_zcopy_user_opts = opts->zcopy && !sock_is_loopback(fd) && !enable_ssl;

	sock = posix_sock_alloc(fd, &impl_opts, enable_zcopy_user_opts && enable_zcopy_impl_opts);
	if (sock == NULL) {
		SPDK_ERRLOG("sock allocation failed\n");
		SSL_free(ssl);
		SSL_CTX_free(ctx);
		close(fd);
		return NULL;
	}

	if (ctx) {
		sock->ctx = ctx;
	}

	if (ssl) {
		sock->ssl = ssl;
	}

	return &sock->base;
}
1087 
1088 static struct spdk_sock *
1089 posix_sock_listen(const char *ip, int port, struct spdk_sock_opts *opts)
1090 {
1091 	return posix_sock_create(ip, port, SPDK_SOCK_CREATE_LISTEN, opts, false);
1092 }
1093 
1094 static struct spdk_sock *
1095 posix_sock_connect(const char *ip, int port, struct spdk_sock_opts *opts)
1096 {
1097 	return posix_sock_create(ip, port, SPDK_SOCK_CREATE_CONNECT, opts, false);
1098 }
1099 
1100 static struct spdk_sock *
1101 _posix_sock_accept(struct spdk_sock *_sock, bool enable_ssl)
1102 {
1103 	struct spdk_posix_sock		*sock = __posix_sock(_sock);
1104 	struct sockaddr_storage		sa;
1105 	socklen_t			salen;
1106 	int				rc, fd;
1107 	struct spdk_posix_sock		*new_sock;
1108 	int				flag;
1109 	SSL_CTX *ctx = 0;
1110 	SSL *ssl = 0;
1111 
1112 	memset(&sa, 0, sizeof(sa));
1113 	salen = sizeof(sa);
1114 
1115 	assert(sock != NULL);
1116 
1117 	rc = accept(sock->fd, (struct sockaddr *)&sa, &salen);
1118 
1119 	if (rc == -1) {
1120 		return NULL;
1121 	}
1122 
1123 	fd = rc;
1124 
1125 	flag = fcntl(fd, F_GETFL);
1126 	if ((!(flag & O_NONBLOCK)) && (fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0)) {
1127 		SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno);
1128 		close(fd);
1129 		return NULL;
1130 	}
1131 
1132 #if defined(SO_PRIORITY)
1133 	/* The priority is not inherited, so call this function again */
1134 	if (sock->base.opts.priority) {
1135 		rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &sock->base.opts.priority, sizeof(int));
1136 		if (rc != 0) {
1137 			close(fd);
1138 			return NULL;
1139 		}
1140 	}
1141 #endif
1142 
1143 	/* Establish SSL connection */
1144 	if (enable_ssl) {
1145 		ctx = posix_sock_create_ssl_context(TLS_server_method(), &sock->base.opts, &sock->base.impl_opts);
1146 		if (!ctx) {
1147 			SPDK_ERRLOG("posix_sock_create_ssl_context() failed, errno = %d\n", errno);
1148 			close(fd);
1149 			return NULL;
1150 		}
1151 		ssl = ssl_sock_accept_loop(ctx, fd, &sock->base.impl_opts);
1152 		if (!ssl) {
1153 			SPDK_ERRLOG("ssl_sock_accept_loop() failed, errno = %d\n", errno);
1154 			close(fd);
1155 			SSL_CTX_free(ctx);
1156 			return NULL;
1157 		}
1158 	}
1159 
1160 	/* Inherit the zero copy feature from the listen socket */
1161 	new_sock = posix_sock_alloc(fd, &sock->base.impl_opts, sock->zcopy);
1162 	if (new_sock == NULL) {
1163 		close(fd);
1164 		SSL_free(ssl);
1165 		SSL_CTX_free(ctx);
1166 		return NULL;
1167 	}
1168 
1169 	if (ctx) {
1170 		new_sock->ctx = ctx;
1171 	}
1172 
1173 	if (ssl) {
1174 		new_sock->ssl = ssl;
1175 	}
1176 
1177 	return &new_sock->base;
1178 }
1179 
/* .accept callback of the "posix" implementation: accept without TLS. */
static struct spdk_sock *
posix_sock_accept(struct spdk_sock *_sock)
{
	const bool enable_ssl = false;

	return _posix_sock_accept(_sock, enable_ssl);
}
1185 
1186 static int
1187 posix_sock_close(struct spdk_sock *_sock)
1188 {
1189 	struct spdk_posix_sock *sock = __posix_sock(_sock);
1190 
1191 	assert(TAILQ_EMPTY(&_sock->pending_reqs));
1192 
1193 	if (sock->ssl != NULL) {
1194 		SSL_shutdown(sock->ssl);
1195 	}
1196 
1197 	/* If the socket fails to close, the best choice is to
1198 	 * leak the fd but continue to free the rest of the sock
1199 	 * memory. */
1200 	close(sock->fd);
1201 
1202 	SSL_free(sock->ssl);
1203 	SSL_CTX_free(sock->ctx);
1204 
1205 	spdk_pipe_destroy(sock->recv_pipe);
1206 	free(sock->recv_buf);
1207 	free(sock);
1208 
1209 	return 0;
1210 }
1211 
#ifdef SPDK_ZEROCOPY
/*
 * Reap MSG_ZEROCOPY completion notifications from the socket's error queue
 * and complete the corresponding requests on pending_reqs.
 *
 * Each notification carries an inclusive range [ee_info, ee_data] of sendmsg
 * call indices. These are matched against req->internal.offset of zero-copy
 * requests. Requests that were not sent with zero copy are completed whenever
 * they are encountered during the scan, since they were only waiting in line.
 *
 * Returns 0 once the error queue is empty or an unexpected message/error is
 * seen, or the negative value returned by spdk_sock_request_put().
 */
static int
_sock_check_zcopy(struct spdk_sock *sock)
{
	struct spdk_posix_sock *psock = __posix_sock(sock);
	struct msghdr msgh = {};
	/* Control buffer sized with CMSG_SPACE() and overlaid with a struct
	 * cmsghdr, so that CMSG_FIRSTHDR()/CMSG_DATA() dereference suitably
	 * aligned memory (a bare uint8_t array has no alignment guarantee). */
	union {
		uint8_t buf[CMSG_SPACE(sizeof(struct sock_extended_err))];
		struct cmsghdr align;
	} control;
	ssize_t rc;
	struct sock_extended_err *serr;
	struct cmsghdr *cm;
	uint32_t idx;
	struct spdk_sock_request *req, *treq;
	bool found;

	while (true) {
		/* recvmsg() overwrites msg_controllen with the length actually
		 * received, so re-arm the control buffer on every iteration. */
		msgh.msg_control = control.buf;
		msgh.msg_controllen = sizeof(control.buf);

		rc = recvmsg(psock->fd, &msgh, MSG_ERRQUEUE);

		if (rc < 0) {
			if (errno == EWOULDBLOCK || errno == EAGAIN) {
				/* Error queue drained. */
				return 0;
			}

			if (!TAILQ_EMPTY(&sock->pending_reqs)) {
				SPDK_ERRLOG("Attempting to receive from ERRQUEUE yielded error, but pending list still has orphaned entries\n");
			} else {
				SPDK_WARNLOG("Recvmsg yielded an error!\n");
			}
			return 0;
		}

		cm = CMSG_FIRSTHDR(&msgh);
		if (!(cm &&
		      ((cm->cmsg_level == SOL_IP && cm->cmsg_type == IP_RECVERR) ||
		       (cm->cmsg_level == SOL_IPV6 && cm->cmsg_type == IPV6_RECVERR)))) {
			SPDK_WARNLOG("Unexpected cmsg level or type!\n");
			return 0;
		}

		serr = (struct sock_extended_err *)CMSG_DATA(cm);
		if (serr->ee_errno != 0 || serr->ee_origin != SO_EE_ORIGIN_ZEROCOPY) {
			SPDK_WARNLOG("Unexpected extended error origin\n");
			return 0;
		}

		/* Most of the time, the pending_reqs array is in the exact
		 * order we need such that all of the requests to complete are
		 * in order, in the front. It is guaranteed that all requests
		 * belonging to the same sendmsg call are sequential, so once
		 * we encounter one match we can stop looping as soon as a
		 * non-match is found.
		 */
		idx = serr->ee_info;
		while (true) {
			found = false;
			TAILQ_FOREACH_SAFE(req, &sock->pending_reqs, internal.link, treq) {
				if (!req->internal.is_zcopy) {
					/* This wasn't a zcopy request. It was just waiting in line to complete */
					rc = spdk_sock_request_put(sock, req, 0);
					if (rc < 0) {
						return rc;
					}
				} else if (req->internal.offset == idx) {
					found = true;
					rc = spdk_sock_request_put(sock, req, 0);
					if (rc < 0) {
						return rc;
					}
				} else if (found) {
					break;
				}
			}

			if (idx == serr->ee_data) {
				break;
			}

			/* Indices are 32-bit and wrap around. */
			if (idx == UINT32_MAX) {
				idx = 0;
			} else {
				idx++;
			}
		}
	}

	return 0;
}
#endif
1302 
1303 static int
1304 _sock_flush(struct spdk_sock *sock)
1305 {
1306 	struct spdk_posix_sock *psock = __posix_sock(sock);
1307 	struct msghdr msg = {};
1308 	int flags;
1309 	struct iovec iovs[IOV_BATCH_SIZE];
1310 	int iovcnt;
1311 	int retval;
1312 	struct spdk_sock_request *req;
1313 	int i;
1314 	ssize_t rc, sent;
1315 	unsigned int offset;
1316 	size_t len;
1317 	bool is_zcopy = false;
1318 
1319 	/* Can't flush from within a callback or we end up with recursive calls */
1320 	if (sock->cb_cnt > 0) {
1321 		errno = EAGAIN;
1322 		return -1;
1323 	}
1324 
1325 #ifdef SPDK_ZEROCOPY
1326 	if (psock->zcopy) {
1327 		flags = MSG_ZEROCOPY | MSG_NOSIGNAL;
1328 	} else
1329 #endif
1330 	{
1331 		flags = MSG_NOSIGNAL;
1332 	}
1333 
1334 	iovcnt = spdk_sock_prep_reqs(sock, iovs, 0, NULL, &flags);
1335 	if (iovcnt == 0) {
1336 		return 0;
1337 	}
1338 
1339 #ifdef SPDK_ZEROCOPY
1340 	is_zcopy = flags & MSG_ZEROCOPY;
1341 #endif
1342 
1343 	/* Perform the vectored write */
1344 	msg.msg_iov = iovs;
1345 	msg.msg_iovlen = iovcnt;
1346 
1347 	if (psock->ssl) {
1348 		rc = SSL_writev(psock->ssl, iovs, iovcnt);
1349 	} else {
1350 		rc = sendmsg(psock->fd, &msg, flags);
1351 	}
1352 	if (rc <= 0) {
1353 		if (rc == 0 || errno == EAGAIN || errno == EWOULDBLOCK || (errno == ENOBUFS && psock->zcopy)) {
1354 			errno = EAGAIN;
1355 		}
1356 		return -1;
1357 	}
1358 
1359 	sent = rc;
1360 
1361 	if (is_zcopy) {
1362 		/* Handling overflow case, because we use psock->sendmsg_idx - 1 for the
1363 		 * req->internal.offset, so sendmsg_idx should not be zero  */
1364 		if (spdk_unlikely(psock->sendmsg_idx == UINT32_MAX)) {
1365 			psock->sendmsg_idx = 1;
1366 		} else {
1367 			psock->sendmsg_idx++;
1368 		}
1369 	}
1370 
1371 	/* Consume the requests that were actually written */
1372 	req = TAILQ_FIRST(&sock->queued_reqs);
1373 	while (req) {
1374 		offset = req->internal.offset;
1375 
1376 		/* req->internal.is_zcopy is true when the whole req or part of it is sent with zerocopy */
1377 		req->internal.is_zcopy = is_zcopy;
1378 
1379 		for (i = 0; i < req->iovcnt; i++) {
1380 			/* Advance by the offset first */
1381 			if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) {
1382 				offset -= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len;
1383 				continue;
1384 			}
1385 
1386 			/* Calculate the remaining length of this element */
1387 			len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset;
1388 
1389 			if (len > (size_t)rc) {
1390 				/* This element was partially sent. */
1391 				req->internal.offset += rc;
1392 				return sent;
1393 			}
1394 
1395 			offset = 0;
1396 			req->internal.offset += len;
1397 			rc -= len;
1398 		}
1399 
1400 		/* Handled a full request. */
1401 		spdk_sock_request_pend(sock, req);
1402 
1403 		if (!req->internal.is_zcopy && req == TAILQ_FIRST(&sock->pending_reqs)) {
1404 			/* The sendmsg syscall above isn't currently asynchronous,
1405 			* so it's already done. */
1406 			retval = spdk_sock_request_put(sock, req, 0);
1407 			if (retval) {
1408 				break;
1409 			}
1410 		} else {
1411 			/* Re-use the offset field to hold the sendmsg call index. The
1412 			 * index is 0 based, so subtract one here because we've already
1413 			 * incremented above. */
1414 			req->internal.offset = psock->sendmsg_idx - 1;
1415 		}
1416 
1417 		if (rc == 0) {
1418 			break;
1419 		}
1420 
1421 		req = TAILQ_FIRST(&sock->queued_reqs);
1422 	}
1423 
1424 	return sent;
1425 }
1426 
/*
 * .flush callback. On zero-copy sockets with requests awaiting completion,
 * first reap kernel completion notifications via _sock_check_zcopy(); then
 * write out queued requests with _sock_flush() and return its result.
 */
static int
posix_sock_flush(struct spdk_sock *sock)
{
#ifdef SPDK_ZEROCOPY
	if (__posix_sock(sock)->zcopy && !TAILQ_EMPTY(&sock->pending_reqs)) {
		_sock_check_zcopy(sock);
	}
#endif

	return _sock_flush(sock);
}
1440 
1441 static ssize_t
1442 posix_sock_recv_from_pipe(struct spdk_posix_sock *sock, struct iovec *diov, int diovcnt)
1443 {
1444 	struct iovec siov[2];
1445 	int sbytes;
1446 	ssize_t bytes;
1447 	struct spdk_posix_sock_group_impl *group;
1448 
1449 	sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov);
1450 	if (sbytes < 0) {
1451 		errno = EINVAL;
1452 		return -1;
1453 	} else if (sbytes == 0) {
1454 		errno = EAGAIN;
1455 		return -1;
1456 	}
1457 
1458 	bytes = spdk_iovcpy(siov, 2, diov, diovcnt);
1459 
1460 	if (bytes == 0) {
1461 		/* The only way this happens is if diov is 0 length */
1462 		errno = EINVAL;
1463 		return -1;
1464 	}
1465 
1466 	spdk_pipe_reader_advance(sock->recv_pipe, bytes);
1467 
1468 	/* If we drained the pipe, mark it appropriately */
1469 	if (spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) {
1470 		assert(sock->pipe_has_data == true);
1471 
1472 		group = __posix_group_impl(sock->base.group_impl);
1473 		if (group && !sock->socket_has_data) {
1474 			TAILQ_REMOVE(&group->socks_with_data, sock, link);
1475 		}
1476 
1477 		sock->pipe_has_data = false;
1478 	}
1479 
1480 	return bytes;
1481 }
1482 
1483 static inline ssize_t
1484 posix_sock_read(struct spdk_posix_sock *sock)
1485 {
1486 	struct iovec iov[2];
1487 	int bytes_avail, bytes_recvd;
1488 	struct spdk_posix_sock_group_impl *group;
1489 
1490 	bytes_avail = spdk_pipe_writer_get_buffer(sock->recv_pipe, sock->recv_buf_sz, iov);
1491 
1492 	if (bytes_avail <= 0) {
1493 		return bytes_avail;
1494 	}
1495 
1496 	if (sock->ssl) {
1497 		bytes_recvd = SSL_readv(sock->ssl, iov, 2);
1498 	} else {
1499 		bytes_recvd = readv(sock->fd, iov, 2);
1500 	}
1501 
1502 	assert(sock->pipe_has_data == false);
1503 
1504 	if (bytes_recvd <= 0) {
1505 		/* Errors count as draining the socket data */
1506 		if (sock->base.group_impl && sock->socket_has_data) {
1507 			group = __posix_group_impl(sock->base.group_impl);
1508 			TAILQ_REMOVE(&group->socks_with_data, sock, link);
1509 		}
1510 
1511 		sock->socket_has_data = false;
1512 
1513 		return bytes_recvd;
1514 	}
1515 
1516 	spdk_pipe_writer_advance(sock->recv_pipe, bytes_recvd);
1517 
1518 #if DEBUG
1519 	if (sock->base.group_impl) {
1520 		assert(sock->socket_has_data == true);
1521 	}
1522 #endif
1523 
1524 	sock->pipe_has_data = true;
1525 	if (bytes_recvd < bytes_avail) {
1526 		/* We drained the kernel socket entirely. */
1527 		sock->socket_has_data = false;
1528 	}
1529 
1530 	return bytes_recvd;
1531 }
1532 
1533 static ssize_t
1534 posix_sock_readv(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
1535 {
1536 	struct spdk_posix_sock *sock = __posix_sock(_sock);
1537 	struct spdk_posix_sock_group_impl *group = __posix_group_impl(sock->base.group_impl);
1538 	int rc, i;
1539 	size_t len;
1540 
1541 	if (sock->recv_pipe == NULL) {
1542 		assert(sock->pipe_has_data == false);
1543 		if (group && sock->socket_has_data) {
1544 			sock->socket_has_data = false;
1545 			TAILQ_REMOVE(&group->socks_with_data, sock, link);
1546 		}
1547 		if (sock->ssl) {
1548 			return SSL_readv(sock->ssl, iov, iovcnt);
1549 		} else {
1550 			return readv(sock->fd, iov, iovcnt);
1551 		}
1552 	}
1553 
1554 	/* If the socket is not in a group, we must assume it always has
1555 	 * data waiting for us because it is not epolled */
1556 	if (!sock->pipe_has_data && (group == NULL || sock->socket_has_data)) {
1557 		/* If the user is receiving a sufficiently large amount of data,
1558 		 * receive directly to their buffers. */
1559 		len = 0;
1560 		for (i = 0; i < iovcnt; i++) {
1561 			len += iov[i].iov_len;
1562 		}
1563 
1564 		if (len >= MIN_SOCK_PIPE_SIZE) {
1565 			/* TODO: Should this detect if kernel socket is drained? */
1566 			if (sock->ssl) {
1567 				return SSL_readv(sock->ssl, iov, iovcnt);
1568 			} else {
1569 				return readv(sock->fd, iov, iovcnt);
1570 			}
1571 		}
1572 
1573 		/* Otherwise, do a big read into our pipe */
1574 		rc = posix_sock_read(sock);
1575 		if (rc <= 0) {
1576 			return rc;
1577 		}
1578 	}
1579 
1580 	return posix_sock_recv_from_pipe(sock, iov, iovcnt);
1581 }
1582 
/* .recv callback: receive into one flat buffer by delegating to posix_sock_readv(). */
static ssize_t
posix_sock_recv(struct spdk_sock *sock, void *buf, size_t len)
{
	struct iovec iov = {
		.iov_base = buf,
		.iov_len = len,
	};

	return posix_sock_readv(sock, &iov, 1);
}
1593 
1594 static ssize_t
1595 posix_sock_writev(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
1596 {
1597 	struct spdk_posix_sock *sock = __posix_sock(_sock);
1598 	int rc;
1599 
1600 	/* In order to process a writev, we need to flush any asynchronous writes
1601 	 * first. */
1602 	rc = _sock_flush(_sock);
1603 	if (rc < 0) {
1604 		return rc;
1605 	}
1606 
1607 	if (!TAILQ_EMPTY(&_sock->queued_reqs)) {
1608 		/* We weren't able to flush all requests */
1609 		errno = EAGAIN;
1610 		return -1;
1611 	}
1612 
1613 	if (sock->ssl) {
1614 		return SSL_writev(sock->ssl, iov, iovcnt);
1615 	} else {
1616 		return writev(sock->fd, iov, iovcnt);
1617 	}
1618 }
1619 
1620 static int
1621 posix_sock_recv_next(struct spdk_sock *_sock, void **buf, void **ctx)
1622 {
1623 	struct spdk_posix_sock *sock = __posix_sock(_sock);
1624 	struct iovec iov;
1625 	ssize_t rc;
1626 
1627 	if (sock->recv_pipe != NULL) {
1628 		errno = ENOTSUP;
1629 		return -1;
1630 	}
1631 
1632 	iov.iov_len = spdk_sock_group_get_buf(_sock->group_impl->group, &iov.iov_base, ctx);
1633 	if (iov.iov_len == 0) {
1634 		errno = ENOBUFS;
1635 		return -1;
1636 	}
1637 
1638 	rc = posix_sock_readv(_sock, &iov, 1);
1639 	if (rc <= 0) {
1640 		spdk_sock_group_provide_buf(_sock->group_impl->group, iov.iov_base, iov.iov_len, *ctx);
1641 		return rc;
1642 	}
1643 
1644 	*buf = iov.iov_base;
1645 
1646 	return rc;
1647 }
1648 
1649 static void
1650 posix_sock_writev_async(struct spdk_sock *sock, struct spdk_sock_request *req)
1651 {
1652 	int rc;
1653 
1654 	spdk_sock_request_queue(sock, req);
1655 
1656 	/* If there are a sufficient number queued, just flush them out immediately. */
1657 	if (sock->queued_iovcnt >= IOV_BATCH_SIZE) {
1658 		rc = _sock_flush(sock);
1659 		if (rc < 0 && errno != EAGAIN) {
1660 			spdk_sock_abort_requests(sock);
1661 		}
1662 	}
1663 }
1664 
1665 static int
1666 posix_sock_set_recvlowat(struct spdk_sock *_sock, int nbytes)
1667 {
1668 	struct spdk_posix_sock *sock = __posix_sock(_sock);
1669 	int val;
1670 	int rc;
1671 
1672 	assert(sock != NULL);
1673 
1674 	val = nbytes;
1675 	rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVLOWAT, &val, sizeof val);
1676 	if (rc != 0) {
1677 		return -1;
1678 	}
1679 	return 0;
1680 }
1681 
1682 static bool
1683 posix_sock_is_ipv6(struct spdk_sock *_sock)
1684 {
1685 	struct spdk_posix_sock *sock = __posix_sock(_sock);
1686 	struct sockaddr_storage sa;
1687 	socklen_t salen;
1688 	int rc;
1689 
1690 	assert(sock != NULL);
1691 
1692 	memset(&sa, 0, sizeof sa);
1693 	salen = sizeof sa;
1694 	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
1695 	if (rc != 0) {
1696 		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
1697 		return false;
1698 	}
1699 
1700 	return (sa.ss_family == AF_INET6);
1701 }
1702 
1703 static bool
1704 posix_sock_is_ipv4(struct spdk_sock *_sock)
1705 {
1706 	struct spdk_posix_sock *sock = __posix_sock(_sock);
1707 	struct sockaddr_storage sa;
1708 	socklen_t salen;
1709 	int rc;
1710 
1711 	assert(sock != NULL);
1712 
1713 	memset(&sa, 0, sizeof sa);
1714 	salen = sizeof sa;
1715 	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
1716 	if (rc != 0) {
1717 		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
1718 		return false;
1719 	}
1720 
1721 	return (sa.ss_family == AF_INET);
1722 }
1723 
1724 static bool
1725 posix_sock_is_connected(struct spdk_sock *_sock)
1726 {
1727 	struct spdk_posix_sock *sock = __posix_sock(_sock);
1728 	uint8_t byte;
1729 	int rc;
1730 
1731 	rc = recv(sock->fd, &byte, 1, MSG_PEEK);
1732 	if (rc == 0) {
1733 		return false;
1734 	}
1735 
1736 	if (rc < 0) {
1737 		if (errno == EAGAIN || errno == EWOULDBLOCK) {
1738 			return true;
1739 		}
1740 
1741 		return false;
1742 	}
1743 
1744 	return true;
1745 }
1746 
1747 static struct spdk_sock_group_impl *
1748 posix_sock_group_impl_get_optimal(struct spdk_sock *_sock, struct spdk_sock_group_impl *hint)
1749 {
1750 	struct spdk_posix_sock *sock = __posix_sock(_sock);
1751 	struct spdk_sock_group_impl *group_impl;
1752 
1753 	if (sock->placement_id != -1) {
1754 		spdk_sock_map_lookup(&g_map, sock->placement_id, &group_impl, hint);
1755 		return group_impl;
1756 	}
1757 
1758 	return NULL;
1759 }
1760 
1761 static struct spdk_sock_group_impl *
1762 posix_sock_group_impl_create(void)
1763 {
1764 	struct spdk_posix_sock_group_impl *group_impl;
1765 	int fd;
1766 
1767 #if defined(SPDK_EPOLL)
1768 	fd = epoll_create1(0);
1769 #elif defined(SPDK_KEVENT)
1770 	fd = kqueue();
1771 #endif
1772 	if (fd == -1) {
1773 		return NULL;
1774 	}
1775 
1776 	group_impl = calloc(1, sizeof(*group_impl));
1777 	if (group_impl == NULL) {
1778 		SPDK_ERRLOG("group_impl allocation failed\n");
1779 		close(fd);
1780 		return NULL;
1781 	}
1782 
1783 	group_impl->fd = fd;
1784 	TAILQ_INIT(&group_impl->socks_with_data);
1785 	group_impl->placement_id = -1;
1786 
1787 	if (g_spdk_posix_sock_impl_opts.enable_placement_id == PLACEMENT_CPU) {
1788 		spdk_sock_map_insert(&g_map, spdk_env_get_current_core(), &group_impl->base);
1789 		group_impl->placement_id = spdk_env_get_current_core();
1790 	}
1791 
1792 	return &group_impl->base;
1793 }
1794 
/*
 * Set SO_MARK = @placement_id on @sock's fd and map that id to @group in
 * g_map. sock->placement_id is updated only when both steps succeed. Either
 * failure is logged and otherwise ignored (not fatal). Compiles to a no-op
 * where SO_MARK is not defined.
 */
static void
posix_sock_mark(struct spdk_posix_sock_group_impl *group, struct spdk_posix_sock *sock,
		int placement_id)
{
#if defined(SO_MARK)
	int rc;

	if (setsockopt(sock->fd, SOL_SOCKET, SO_MARK, &placement_id, sizeof(placement_id)) != 0) {
		/* Not fatal */
		SPDK_ERRLOG("Error setting SO_MARK\n");
		return;
	}

	rc = spdk_sock_map_insert(&g_map, placement_id, &group->base);
	if (rc == 0) {
		sock->placement_id = placement_id;
		return;
	}

	/* Not fatal */
	SPDK_ERRLOG("Failed to insert sock group into map: %d\n", rc);
#endif
}
1820 
1821 static void
1822 posix_sock_update_mark(struct spdk_sock_group_impl *_group, struct spdk_sock *_sock)
1823 {
1824 	struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group);
1825 
1826 	if (group->placement_id == -1) {
1827 		group->placement_id = spdk_sock_map_find_free(&g_map);
1828 
1829 		/* If a free placement id is found, update existing sockets in this group */
1830 		if (group->placement_id != -1) {
1831 			struct spdk_sock  *sock, *tmp;
1832 
1833 			TAILQ_FOREACH_SAFE(sock, &_group->socks, link, tmp) {
1834 				posix_sock_mark(group, __posix_sock(sock), group->placement_id);
1835 			}
1836 		}
1837 	}
1838 
1839 	if (group->placement_id != -1) {
1840 		/*
1841 		 * group placement id is already determined for this poll group.
1842 		 * Mark socket with group's placement id.
1843 		 */
1844 		posix_sock_mark(group, __posix_sock(_sock), group->placement_id);
1845 	}
1846 }
1847 
1848 static int
1849 posix_sock_group_impl_add_sock(struct spdk_sock_group_impl *_group, struct spdk_sock *_sock)
1850 {
1851 	struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group);
1852 	struct spdk_posix_sock *sock = __posix_sock(_sock);
1853 	int rc;
1854 
1855 #if defined(SPDK_EPOLL)
1856 	struct epoll_event event;
1857 
1858 	memset(&event, 0, sizeof(event));
1859 	/* EPOLLERR is always on even if we don't set it, but be explicit for clarity */
1860 	event.events = EPOLLIN | EPOLLERR;
1861 	event.data.ptr = sock;
1862 
1863 	rc = epoll_ctl(group->fd, EPOLL_CTL_ADD, sock->fd, &event);
1864 #elif defined(SPDK_KEVENT)
1865 	struct kevent event;
1866 	struct timespec ts = {0};
1867 
1868 	EV_SET(&event, sock->fd, EVFILT_READ, EV_ADD, 0, 0, sock);
1869 
1870 	rc = kevent(group->fd, &event, 1, NULL, 0, &ts);
1871 #endif
1872 
1873 	if (rc != 0) {
1874 		return rc;
1875 	}
1876 
1877 	/* switched from another polling group due to scheduling */
1878 	if (spdk_unlikely(sock->recv_pipe != NULL  &&
1879 			  (spdk_pipe_reader_bytes_available(sock->recv_pipe) > 0))) {
1880 		sock->pipe_has_data = true;
1881 		sock->socket_has_data = false;
1882 		TAILQ_INSERT_TAIL(&group->socks_with_data, sock, link);
1883 	}
1884 
1885 	if (g_spdk_posix_sock_impl_opts.enable_placement_id == PLACEMENT_MARK) {
1886 		posix_sock_update_mark(_group, _sock);
1887 	} else if (sock->placement_id != -1) {
1888 		rc = spdk_sock_map_insert(&g_map, sock->placement_id, &group->base);
1889 		if (rc != 0) {
1890 			SPDK_ERRLOG("Failed to insert sock group into map: %d\n", rc);
1891 			/* Do not treat this as an error. The system will continue running. */
1892 		}
1893 	}
1894 
1895 	return rc;
1896 }
1897 
1898 static int
1899 posix_sock_group_impl_remove_sock(struct spdk_sock_group_impl *_group, struct spdk_sock *_sock)
1900 {
1901 	struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group);
1902 	struct spdk_posix_sock *sock = __posix_sock(_sock);
1903 	int rc;
1904 
1905 	if (sock->pipe_has_data || sock->socket_has_data) {
1906 		TAILQ_REMOVE(&group->socks_with_data, sock, link);
1907 		sock->pipe_has_data = false;
1908 		sock->socket_has_data = false;
1909 	}
1910 
1911 	if (sock->placement_id != -1) {
1912 		spdk_sock_map_release(&g_map, sock->placement_id);
1913 	}
1914 
1915 #if defined(SPDK_EPOLL)
1916 	struct epoll_event event;
1917 
1918 	/* Event parameter is ignored but some old kernel version still require it. */
1919 	rc = epoll_ctl(group->fd, EPOLL_CTL_DEL, sock->fd, &event);
1920 #elif defined(SPDK_KEVENT)
1921 	struct kevent event;
1922 	struct timespec ts = {0};
1923 
1924 	EV_SET(&event, sock->fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
1925 
1926 	rc = kevent(group->fd, &event, 1, NULL, 0, &ts);
1927 	if (rc == 0 && event.flags & EV_ERROR) {
1928 		rc = -1;
1929 		errno = event.data;
1930 	}
1931 #endif
1932 
1933 	spdk_sock_abort_requests(_sock);
1934 
1935 	return rc;
1936 }
1937 
1938 static int
1939 posix_sock_group_impl_poll(struct spdk_sock_group_impl *_group, int max_events,
1940 			   struct spdk_sock **socks)
1941 {
1942 	struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group);
1943 	struct spdk_sock *sock, *tmp;
1944 	int num_events, i, rc;
1945 	struct spdk_posix_sock *psock, *ptmp;
1946 #if defined(SPDK_EPOLL)
1947 	struct epoll_event events[MAX_EVENTS_PER_POLL];
1948 #elif defined(SPDK_KEVENT)
1949 	struct kevent events[MAX_EVENTS_PER_POLL];
1950 	struct timespec ts = {0};
1951 #endif
1952 
1953 #ifdef SPDK_ZEROCOPY
1954 	/* When all of the following conditions are met
1955 	 * - non-blocking socket
1956 	 * - zero copy is enabled
1957 	 * - interrupts suppressed (i.e. busy polling)
1958 	 * - the NIC tx queue is full at the time sendmsg() is called
1959 	 * - epoll_wait determines there is an EPOLLIN event for the socket
1960 	 * then we can get into a situation where data we've sent is queued
1961 	 * up in the kernel network stack, but interrupts have been suppressed
1962 	 * because other traffic is flowing so the kernel misses the signal
1963 	 * to flush the software tx queue. If there wasn't incoming data
1964 	 * pending on the socket, then epoll_wait would have been sufficient
1965 	 * to kick off the send operation, but since there is a pending event
1966 	 * epoll_wait does not trigger the necessary operation.
1967 	 *
1968 	 * We deal with this by checking for all of the above conditions and
1969 	 * additionally looking for EPOLLIN events that were not consumed from
1970 	 * the last poll loop. We take this to mean that the upper layer is
1971 	 * unable to consume them because it is blocked waiting for resources
1972 	 * to free up, and those resources are most likely freed in response
1973 	 * to a pending asynchronous write completing.
1974 	 *
1975 	 * Additionally, sockets that have the same placement_id actually share
1976 	 * an underlying hardware queue. That means polling one of them is
1977 	 * equivalent to polling all of them. As a quick mechanism to avoid
1978 	 * making extra poll() calls, stash the last placement_id during the loop
1979 	 * and only poll if it's not the same. The overwhelmingly common case
1980 	 * is that all sockets in this list have the same placement_id because
1981 	 * SPDK is intentionally grouping sockets by that value, so even
1982 	 * though this won't stop all extra calls to poll(), it's very fast
1983 	 * and will catch all of them in practice.
1984 	 */
1985 	int last_placement_id = -1;
1986 
1987 	TAILQ_FOREACH(psock, &group->socks_with_data, link) {
1988 		if (psock->zcopy && psock->placement_id >= 0 &&
1989 		    psock->placement_id != last_placement_id) {
1990 			struct pollfd pfd = {psock->fd, POLLIN | POLLERR, 0};
1991 
1992 			poll(&pfd, 1, 0);
1993 			last_placement_id = psock->placement_id;
1994 		}
1995 	}
1996 #endif
1997 
1998 	/* This must be a TAILQ_FOREACH_SAFE because while flushing,
1999 	 * a completion callback could remove the sock from the
2000 	 * group. */
2001 	TAILQ_FOREACH_SAFE(sock, &_group->socks, link, tmp) {
2002 		rc = _sock_flush(sock);
2003 		if (rc < 0 && errno != EAGAIN) {
2004 			spdk_sock_abort_requests(sock);
2005 		}
2006 	}
2007 
2008 	assert(max_events > 0);
2009 
2010 #if defined(SPDK_EPOLL)
2011 	num_events = epoll_wait(group->fd, events, max_events, 0);
2012 #elif defined(SPDK_KEVENT)
2013 	num_events = kevent(group->fd, NULL, 0, events, max_events, &ts);
2014 #endif
2015 
2016 	if (num_events == -1) {
2017 		return -1;
2018 	} else if (num_events == 0 && !TAILQ_EMPTY(&_group->socks)) {
2019 		sock = TAILQ_FIRST(&_group->socks);
2020 		psock = __posix_sock(sock);
2021 		/* poll() is called here to busy poll the queue associated with
2022 		 * first socket in list and potentially reap incoming data.
2023 		 */
2024 		if (sock->opts.priority) {
2025 			struct pollfd pfd = {0, 0, 0};
2026 
2027 			pfd.fd = psock->fd;
2028 			pfd.events = POLLIN | POLLERR;
2029 			poll(&pfd, 1, 0);
2030 		}
2031 	}
2032 
2033 	for (i = 0; i < num_events; i++) {
2034 #if defined(SPDK_EPOLL)
2035 		sock = events[i].data.ptr;
2036 		psock = __posix_sock(sock);
2037 
2038 #ifdef SPDK_ZEROCOPY
2039 		if (events[i].events & EPOLLERR) {
2040 			rc = _sock_check_zcopy(sock);
2041 			/* If the socket was closed or removed from
2042 			 * the group in response to a send ack, don't
2043 			 * add it to the array here. */
2044 			if (rc || sock->cb_fn == NULL) {
2045 				continue;
2046 			}
2047 		}
2048 #endif
2049 		if ((events[i].events & EPOLLIN) == 0) {
2050 			continue;
2051 		}
2052 
2053 #elif defined(SPDK_KEVENT)
2054 		sock = events[i].udata;
2055 		psock = __posix_sock(sock);
2056 #endif
2057 
2058 		/* If the socket is not already in the list, add it now */
2059 		if (!psock->socket_has_data && !psock->pipe_has_data) {
2060 			TAILQ_INSERT_TAIL(&group->socks_with_data, psock, link);
2061 		}
2062 		psock->socket_has_data = true;
2063 	}
2064 
2065 	num_events = 0;
2066 
2067 	TAILQ_FOREACH_SAFE(psock, &group->socks_with_data, link, ptmp) {
2068 		if (num_events == max_events) {
2069 			break;
2070 		}
2071 
2072 		/* If the socket's cb_fn is NULL, just remove it from the
2073 		 * list and do not add it to socks array */
2074 		if (spdk_unlikely(psock->base.cb_fn == NULL)) {
2075 			psock->socket_has_data = false;
2076 			psock->pipe_has_data = false;
2077 			TAILQ_REMOVE(&group->socks_with_data, psock, link);
2078 			continue;
2079 		}
2080 
2081 		socks[num_events++] = &psock->base;
2082 	}
2083 
2084 	/* Cycle the has_data list so that each time we poll things aren't
2085 	 * in the same order. Say we have 6 sockets in the list, named as follows:
2086 	 * A B C D E F
2087 	 * And all 6 sockets had epoll events, but max_events is only 3. That means
2088 	 * psock currently points at D. We want to rearrange the list to the following:
2089 	 * D E F A B C
2090 	 *
2091 	 * The variables below are named according to this example to make it easier to
2092 	 * follow the swaps.
2093 	 */
2094 	if (psock != NULL) {
2095 		struct spdk_posix_sock *pa, *pc, *pd, *pf;
2096 
2097 		/* Capture pointers to the elements we need */
2098 		pd = psock;
2099 		pc = TAILQ_PREV(pd, spdk_has_data_list, link);
2100 		pa = TAILQ_FIRST(&group->socks_with_data);
2101 		pf = TAILQ_LAST(&group->socks_with_data, spdk_has_data_list);
2102 
2103 		/* Break the link between C and D */
2104 		pc->link.tqe_next = NULL;
2105 
2106 		/* Connect F to A */
2107 		pf->link.tqe_next = pa;
2108 		pa->link.tqe_prev = &pf->link.tqe_next;
2109 
2110 		/* Fix up the list first/last pointers */
2111 		group->socks_with_data.tqh_first = pd;
2112 		group->socks_with_data.tqh_last = &pc->link.tqe_next;
2113 
2114 		/* D is in front of the list, make tqe prev pointer point to the head of list */
2115 		pd->link.tqe_prev = &group->socks_with_data.tqh_first;
2116 	}
2117 
2118 	return num_events;
2119 }
2120 
2121 static int
2122 posix_sock_group_impl_close(struct spdk_sock_group_impl *_group)
2123 {
2124 	struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group);
2125 	int rc;
2126 
2127 	if (g_spdk_posix_sock_impl_opts.enable_placement_id == PLACEMENT_CPU) {
2128 		spdk_sock_map_release(&g_map, spdk_env_get_current_core());
2129 	}
2130 
2131 	rc = close(group->fd);
2132 	free(group);
2133 	return rc;
2134 }
2135 
2136 static struct spdk_net_impl g_posix_net_impl = {
2137 	.name		= "posix",
2138 	.getaddr	= posix_sock_getaddr,
2139 	.connect	= posix_sock_connect,
2140 	.listen		= posix_sock_listen,
2141 	.accept		= posix_sock_accept,
2142 	.close		= posix_sock_close,
2143 	.recv		= posix_sock_recv,
2144 	.readv		= posix_sock_readv,
2145 	.writev		= posix_sock_writev,
2146 	.recv_next	= posix_sock_recv_next,
2147 	.writev_async	= posix_sock_writev_async,
2148 	.flush		= posix_sock_flush,
2149 	.set_recvlowat	= posix_sock_set_recvlowat,
2150 	.set_recvbuf	= posix_sock_set_recvbuf,
2151 	.set_sendbuf	= posix_sock_set_sendbuf,
2152 	.is_ipv6	= posix_sock_is_ipv6,
2153 	.is_ipv4	= posix_sock_is_ipv4,
2154 	.is_connected	= posix_sock_is_connected,
2155 	.group_impl_get_optimal	= posix_sock_group_impl_get_optimal,
2156 	.group_impl_create	= posix_sock_group_impl_create,
2157 	.group_impl_add_sock	= posix_sock_group_impl_add_sock,
2158 	.group_impl_remove_sock = posix_sock_group_impl_remove_sock,
2159 	.group_impl_poll	= posix_sock_group_impl_poll,
2160 	.group_impl_close	= posix_sock_group_impl_close,
2161 	.get_opts	= posix_sock_impl_get_opts,
2162 	.set_opts	= posix_sock_impl_set_opts,
2163 };
2164 
2165 SPDK_NET_IMPL_REGISTER(posix, &g_posix_net_impl, DEFAULT_SOCK_PRIORITY + 1);
2166 
2167 static struct spdk_sock *
2168 ssl_sock_listen(const char *ip, int port, struct spdk_sock_opts *opts)
2169 {
2170 	return posix_sock_create(ip, port, SPDK_SOCK_CREATE_LISTEN, opts, true);
2171 }
2172 
2173 static struct spdk_sock *
2174 ssl_sock_connect(const char *ip, int port, struct spdk_sock_opts *opts)
2175 {
2176 	return posix_sock_create(ip, port, SPDK_SOCK_CREATE_CONNECT, opts, true);
2177 }
2178 
/* .accept callback of the "ssl" implementation: accept and run the TLS handshake. */
static struct spdk_sock *
ssl_sock_accept(struct spdk_sock *_sock)
{
	const bool enable_ssl = true;

	return _posix_sock_accept(_sock, enable_ssl);
}
2184 
2185 static struct spdk_net_impl g_ssl_net_impl = {
2186 	.name		= "ssl",
2187 	.getaddr	= posix_sock_getaddr,
2188 	.connect	= ssl_sock_connect,
2189 	.listen		= ssl_sock_listen,
2190 	.accept		= ssl_sock_accept,
2191 	.close		= posix_sock_close,
2192 	.recv		= posix_sock_recv,
2193 	.readv		= posix_sock_readv,
2194 	.writev		= posix_sock_writev,
2195 	.recv_next	= posix_sock_recv_next,
2196 	.writev_async	= posix_sock_writev_async,
2197 	.flush		= posix_sock_flush,
2198 	.set_recvlowat	= posix_sock_set_recvlowat,
2199 	.set_recvbuf	= posix_sock_set_recvbuf,
2200 	.set_sendbuf	= posix_sock_set_sendbuf,
2201 	.is_ipv6	= posix_sock_is_ipv6,
2202 	.is_ipv4	= posix_sock_is_ipv4,
2203 	.is_connected	= posix_sock_is_connected,
2204 	.group_impl_get_optimal	= posix_sock_group_impl_get_optimal,
2205 	.group_impl_create	= posix_sock_group_impl_create,
2206 	.group_impl_add_sock	= posix_sock_group_impl_add_sock,
2207 	.group_impl_remove_sock = posix_sock_group_impl_remove_sock,
2208 	.group_impl_poll	= posix_sock_group_impl_poll,
2209 	.group_impl_close	= posix_sock_group_impl_close,
2210 	.get_opts	= posix_sock_impl_get_opts,
2211 	.set_opts	= posix_sock_impl_set_opts,
2212 };
2213 
2214 SPDK_NET_IMPL_REGISTER(ssl, &g_ssl_net_impl, DEFAULT_SOCK_PRIORITY);
2215 SPDK_LOG_REGISTER_COMPONENT(sock_posix)
2216