xref: /spdk/module/sock/posix/posix.c (revision 318515b44ec8b67f83bcc9ca83f0c7d5ea919e62)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2018 Intel Corporation. All rights reserved.
3  *   Copyright (c) 2020, 2021 Mellanox Technologies LTD. All rights reserved.
4  *   Copyright (c) 2021 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
5  */
6 
7 #include "spdk/stdinc.h"
8 
9 #if defined(__FreeBSD__)
10 #include <sys/event.h>
11 #define SPDK_KEVENT
12 #else
13 #define SPDK_EPOLL
14 #endif
15 
16 #if defined(__linux__)
17 #include <linux/errqueue.h>
18 #endif
19 
20 #include "spdk/env.h"
21 #include "spdk/log.h"
22 #include "spdk/pipe.h"
23 #include "spdk/sock.h"
24 #include "spdk/util.h"
25 #include "spdk/string.h"
26 #include "spdk/net.h"
27 #include "spdk/file.h"
28 #include "spdk_internal/sock.h"
29 #include "spdk/net.h"
30 
31 #include "openssl/crypto.h"
32 #include "openssl/err.h"
33 #include "openssl/ssl.h"
34 
35 #define MAX_TMPBUF 1024
36 #define PORTNUMLEN 32
37 
38 #if defined(SO_ZEROCOPY) && defined(MSG_ZEROCOPY)
39 #define SPDK_ZEROCOPY
40 #endif
41 
42 struct spdk_posix_sock {
43 	struct spdk_sock	base;
44 	int			fd;
45 
46 	uint32_t		sendmsg_idx;
47 
48 	struct spdk_pipe	*recv_pipe;
49 	int			recv_buf_sz;
50 	bool			pipe_has_data;
51 	bool			socket_has_data;
52 	bool			zcopy;
53 
54 	int			placement_id;
55 
56 	SSL_CTX			*ctx;
57 	SSL			*ssl;
58 
59 	TAILQ_ENTRY(spdk_posix_sock)	link;
60 
61 	char			interface_name[IFNAMSIZ];
62 };
63 
64 TAILQ_HEAD(spdk_has_data_list, spdk_posix_sock);
65 
66 struct spdk_posix_sock_group_impl {
67 	struct spdk_sock_group_impl	base;
68 	int				fd;
69 	struct spdk_interrupt		*intr;
70 	struct spdk_has_data_list	socks_with_data;
71 	int				placement_id;
72 	struct spdk_pipe_group		*pipe_group;
73 };
74 
75 static struct spdk_sock_impl_opts g_posix_impl_opts = {
76 	.recv_buf_size = DEFAULT_SO_RCVBUF_SIZE,
77 	.send_buf_size = DEFAULT_SO_SNDBUF_SIZE,
78 	.enable_recv_pipe = true,
79 	.enable_quickack = false,
80 	.enable_placement_id = PLACEMENT_NONE,
81 	.enable_zerocopy_send_server = true,
82 	.enable_zerocopy_send_client = false,
83 	.zerocopy_threshold = 0,
84 	.tls_version = 0,
85 	.enable_ktls = false,
86 	.psk_key = NULL,
87 	.psk_key_size = 0,
88 	.psk_identity = NULL,
89 	.get_key = NULL,
90 	.get_key_ctx = NULL,
91 	.tls_cipher_suites = NULL
92 };
93 
94 static struct spdk_sock_impl_opts g_ssl_impl_opts = {
95 	.recv_buf_size = MIN_SO_RCVBUF_SIZE,
96 	.send_buf_size = MIN_SO_SNDBUF_SIZE,
97 	.enable_recv_pipe = true,
98 	.enable_quickack = false,
99 	.enable_placement_id = PLACEMENT_NONE,
100 	.enable_zerocopy_send_server = true,
101 	.enable_zerocopy_send_client = false,
102 	.zerocopy_threshold = 0,
103 	.tls_version = 0,
104 	.enable_ktls = false,
105 	.psk_key = NULL,
106 	.psk_identity = NULL
107 };
108 
109 static struct spdk_sock_map g_map = {
110 	.entries = STAILQ_HEAD_INITIALIZER(g_map.entries),
111 	.mtx = PTHREAD_MUTEX_INITIALIZER
112 };
113 
114 __attribute((destructor)) static void
115 posix_sock_map_cleanup(void)
116 {
117 	spdk_sock_map_cleanup(&g_map);
118 }
119 
/*
 * Downcasts from the generic sock / group pointers to the POSIX specific
 * types. Both the argument and the whole expansion are parenthesized so the
 * macros behave like a function call in any expression, e.g.
 * __posix_sock(p)->fd casts p (not p->fd).
 */
#define __posix_sock(sock) ((struct spdk_posix_sock *)(sock))
#define __posix_group_impl(group) ((struct spdk_posix_sock_group_impl *)(group))
122 
123 static void
124 posix_sock_copy_impl_opts(struct spdk_sock_impl_opts *dest, const struct spdk_sock_impl_opts *src,
125 			  size_t len)
126 {
127 #define FIELD_OK(field) \
128 	offsetof(struct spdk_sock_impl_opts, field) + sizeof(src->field) <= len
129 
130 #define SET_FIELD(field) \
131 	if (FIELD_OK(field)) { \
132 		dest->field = src->field; \
133 	}
134 
135 	SET_FIELD(recv_buf_size);
136 	SET_FIELD(send_buf_size);
137 	SET_FIELD(enable_recv_pipe);
138 	SET_FIELD(enable_zerocopy_send);
139 	SET_FIELD(enable_quickack);
140 	SET_FIELD(enable_placement_id);
141 	SET_FIELD(enable_zerocopy_send_server);
142 	SET_FIELD(enable_zerocopy_send_client);
143 	SET_FIELD(zerocopy_threshold);
144 	SET_FIELD(tls_version);
145 	SET_FIELD(enable_ktls);
146 	SET_FIELD(psk_key);
147 	SET_FIELD(psk_key_size);
148 	SET_FIELD(psk_identity);
149 	SET_FIELD(get_key);
150 	SET_FIELD(get_key_ctx);
151 	SET_FIELD(tls_cipher_suites);
152 
153 #undef SET_FIELD
154 #undef FIELD_OK
155 }
156 
157 static int
158 _sock_impl_get_opts(struct spdk_sock_impl_opts *opts, struct spdk_sock_impl_opts *impl_opts,
159 		    size_t *len)
160 {
161 	if (!opts || !len) {
162 		errno = EINVAL;
163 		return -1;
164 	}
165 
166 	assert(sizeof(*opts) >= *len);
167 	memset(opts, 0, *len);
168 
169 	posix_sock_copy_impl_opts(opts, impl_opts, *len);
170 	*len = spdk_min(*len, sizeof(*impl_opts));
171 
172 	return 0;
173 }
174 
175 static int
176 posix_sock_impl_get_opts(struct spdk_sock_impl_opts *opts, size_t *len)
177 {
178 	return _sock_impl_get_opts(opts, &g_posix_impl_opts, len);
179 }
180 
181 static int
182 ssl_sock_impl_get_opts(struct spdk_sock_impl_opts *opts, size_t *len)
183 {
184 	return _sock_impl_get_opts(opts, &g_ssl_impl_opts, len);
185 }
186 
187 static int
188 _sock_impl_set_opts(const struct spdk_sock_impl_opts *opts, struct spdk_sock_impl_opts *impl_opts,
189 		    size_t len)
190 {
191 	if (!opts) {
192 		errno = EINVAL;
193 		return -1;
194 	}
195 
196 	assert(sizeof(*opts) >= len);
197 	posix_sock_copy_impl_opts(impl_opts, opts, len);
198 
199 	return 0;
200 }
201 
202 static int
203 posix_sock_impl_set_opts(const struct spdk_sock_impl_opts *opts, size_t len)
204 {
205 	return _sock_impl_set_opts(opts, &g_posix_impl_opts, len);
206 }
207 
208 static int
209 ssl_sock_impl_set_opts(const struct spdk_sock_impl_opts *opts, size_t len)
210 {
211 	return _sock_impl_set_opts(opts, &g_ssl_impl_opts, len);
212 }
213 
214 static void
215 _opts_get_impl_opts(const struct spdk_sock_opts *opts, struct spdk_sock_impl_opts *dest,
216 		    const struct spdk_sock_impl_opts *default_impl)
217 {
218 	/* Copy the default impl_opts first to cover cases when user's impl_opts is smaller */
219 	memcpy(dest, default_impl, sizeof(*dest));
220 
221 	if (opts->impl_opts != NULL) {
222 		assert(sizeof(*dest) >= opts->impl_opts_size);
223 		posix_sock_copy_impl_opts(dest, opts->impl_opts, opts->impl_opts_size);
224 	}
225 }
226 
227 static int
228 posix_sock_getaddr(struct spdk_sock *_sock, char *saddr, int slen, uint16_t *sport,
229 		   char *caddr, int clen, uint16_t *cport)
230 {
231 	struct spdk_posix_sock *sock = __posix_sock(_sock);
232 
233 	assert(sock != NULL);
234 	return spdk_net_getaddr(sock->fd, saddr, slen, sport, caddr, clen, cport);
235 }
236 
237 static const char *
238 posix_sock_get_interface_name(struct spdk_sock *_sock)
239 {
240 	struct spdk_posix_sock *sock = __posix_sock(_sock);
241 	char saddr[64];
242 	int rc;
243 
244 	rc = spdk_net_getaddr(sock->fd, saddr, sizeof(saddr), NULL, NULL, 0, NULL);
245 	if (rc != 0) {
246 		return NULL;
247 	}
248 
249 	rc = spdk_net_get_interface_name(saddr, sock->interface_name,
250 					 sizeof(sock->interface_name));
251 	if (rc != 0) {
252 		return NULL;
253 	}
254 
255 	return sock->interface_name;
256 }
257 
258 static int32_t
259 posix_sock_get_numa_id(struct spdk_sock *sock)
260 {
261 	const char *interface_name;
262 	uint32_t numa_id;
263 	int rc;
264 
265 	interface_name = posix_sock_get_interface_name(sock);
266 	if (interface_name == NULL) {
267 		return SPDK_ENV_NUMA_ID_ANY;
268 	}
269 
270 	rc = spdk_read_sysfs_attribute_uint32(&numa_id,
271 					      "/sys/class/net/%s/device/numa_node", interface_name);
272 	if (rc == 0 && numa_id <= INT32_MAX) {
273 		return (int32_t)numa_id;
274 	} else {
275 		return SPDK_ENV_NUMA_ID_ANY;
276 	}
277 }
278 
/* Selects which kind of socket posix_sock_create() sets up. */
enum posix_sock_create_type {
	/* bind() + listen() on the given address */
	SPDK_SOCK_CREATE_LISTEN,
	/* connect() to the given address */
	SPDK_SOCK_CREATE_CONNECT,
};
283 
284 static int
285 posix_sock_alloc_pipe(struct spdk_posix_sock *sock, int sz)
286 {
287 	uint8_t *new_buf, *old_buf;
288 	struct spdk_pipe *new_pipe;
289 	struct iovec siov[2];
290 	struct iovec diov[2];
291 	int sbytes;
292 	ssize_t bytes;
293 	int rc;
294 
295 	if (sock->recv_buf_sz == sz) {
296 		return 0;
297 	}
298 
299 	/* If the new size is 0, just free the pipe */
300 	if (sz == 0) {
301 		old_buf = spdk_pipe_destroy(sock->recv_pipe);
302 		free(old_buf);
303 		sock->recv_pipe = NULL;
304 		return 0;
305 	} else if (sz < MIN_SOCK_PIPE_SIZE) {
306 		SPDK_ERRLOG("The size of the pipe must be larger than %d\n", MIN_SOCK_PIPE_SIZE);
307 		return -1;
308 	}
309 
310 	/* Round up to next 64 byte multiple */
311 	rc = posix_memalign((void **)&new_buf, 64, sz);
312 	if (rc != 0) {
313 		SPDK_ERRLOG("socket recv buf allocation failed\n");
314 		return -ENOMEM;
315 	}
316 	memset(new_buf, 0, sz);
317 
318 	new_pipe = spdk_pipe_create(new_buf, sz);
319 	if (new_pipe == NULL) {
320 		SPDK_ERRLOG("socket pipe allocation failed\n");
321 		free(new_buf);
322 		return -ENOMEM;
323 	}
324 
325 	if (sock->recv_pipe != NULL) {
326 		/* Pull all of the data out of the old pipe */
327 		sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov);
328 		if (sbytes > sz) {
329 			/* Too much data to fit into the new pipe size */
330 			old_buf = spdk_pipe_destroy(new_pipe);
331 			free(old_buf);
332 			return -EINVAL;
333 		}
334 
335 		sbytes = spdk_pipe_writer_get_buffer(new_pipe, sz, diov);
336 		assert(sbytes == sz);
337 
338 		bytes = spdk_iovcpy(siov, 2, diov, 2);
339 		spdk_pipe_writer_advance(new_pipe, bytes);
340 
341 		old_buf = spdk_pipe_destroy(sock->recv_pipe);
342 		free(old_buf);
343 	}
344 
345 	sock->recv_buf_sz = sz;
346 	sock->recv_pipe = new_pipe;
347 
348 	if (sock->base.group_impl) {
349 		struct spdk_posix_sock_group_impl *group;
350 
351 		group = __posix_group_impl(sock->base.group_impl);
352 		spdk_pipe_group_add(group->pipe_group, sock->recv_pipe);
353 	}
354 
355 	return 0;
356 }
357 
358 static int
359 posix_sock_set_recvbuf(struct spdk_sock *_sock, int sz)
360 {
361 	struct spdk_posix_sock *sock = __posix_sock(_sock);
362 	int min_size;
363 	int rc;
364 
365 	assert(sock != NULL);
366 
367 	if (_sock->impl_opts.enable_recv_pipe) {
368 		rc = posix_sock_alloc_pipe(sock, sz);
369 		if (rc) {
370 			return rc;
371 		}
372 	}
373 
374 	/* Set kernel buffer size to be at least MIN_SO_RCVBUF_SIZE and
375 	 * _sock->impl_opts.recv_buf_size. */
376 	min_size = spdk_max(MIN_SO_RCVBUF_SIZE, _sock->impl_opts.recv_buf_size);
377 
378 	if (sz < min_size) {
379 		sz = min_size;
380 	}
381 
382 	rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVBUF, &sz, sizeof(sz));
383 	if (rc < 0) {
384 		return rc;
385 	}
386 
387 	_sock->impl_opts.recv_buf_size = sz;
388 
389 	return 0;
390 }
391 
392 static int
393 posix_sock_set_sendbuf(struct spdk_sock *_sock, int sz)
394 {
395 	struct spdk_posix_sock *sock = __posix_sock(_sock);
396 	int min_size;
397 	int rc;
398 
399 	assert(sock != NULL);
400 
401 	/* Set kernel buffer size to be at least MIN_SO_SNDBUF_SIZE and
402 	 * _sock->impl_opts.send_buf_size. */
403 	min_size = spdk_max(MIN_SO_SNDBUF_SIZE, _sock->impl_opts.send_buf_size);
404 
405 	if (sz < min_size) {
406 		sz = min_size;
407 	}
408 
409 	rc = setsockopt(sock->fd, SOL_SOCKET, SO_SNDBUF, &sz, sizeof(sz));
410 	if (rc < 0) {
411 		return rc;
412 	}
413 
414 	_sock->impl_opts.send_buf_size = sz;
415 
416 	return 0;
417 }
418 
419 static void
420 posix_sock_init(struct spdk_posix_sock *sock, bool enable_zero_copy)
421 {
422 #if defined(SPDK_ZEROCOPY) || defined(__linux__)
423 	int flag;
424 	int rc;
425 #endif
426 
427 #if defined(SPDK_ZEROCOPY)
428 	flag = 1;
429 
430 	if (enable_zero_copy) {
431 		/* Try to turn on zero copy sends */
432 		rc = setsockopt(sock->fd, SOL_SOCKET, SO_ZEROCOPY, &flag, sizeof(flag));
433 		if (rc == 0) {
434 			sock->zcopy = true;
435 		}
436 	}
437 #endif
438 
439 #if defined(__linux__)
440 	flag = 1;
441 
442 	if (sock->base.impl_opts.enable_quickack) {
443 		rc = setsockopt(sock->fd, IPPROTO_TCP, TCP_QUICKACK, &flag, sizeof(flag));
444 		if (rc != 0) {
445 			SPDK_ERRLOG("quickack was failed to set\n");
446 		}
447 	}
448 
449 	spdk_sock_get_placement_id(sock->fd, sock->base.impl_opts.enable_placement_id,
450 				   &sock->placement_id);
451 
452 	if (sock->base.impl_opts.enable_placement_id == PLACEMENT_MARK) {
453 		/* Save placement_id */
454 		spdk_sock_map_insert(&g_map, sock->placement_id, NULL);
455 	}
456 #endif
457 }
458 
459 static struct spdk_posix_sock *
460 posix_sock_alloc(int fd, struct spdk_sock_impl_opts *impl_opts, bool enable_zero_copy)
461 {
462 	struct spdk_posix_sock *sock;
463 
464 	sock = calloc(1, sizeof(*sock));
465 	if (sock == NULL) {
466 		SPDK_ERRLOG("sock allocation failed\n");
467 		return NULL;
468 	}
469 
470 	sock->fd = fd;
471 	memcpy(&sock->base.impl_opts, impl_opts, sizeof(*impl_opts));
472 	posix_sock_init(sock, enable_zero_copy);
473 
474 	return sock;
475 }
476 
477 static int
478 posix_fd_create(struct addrinfo *res, struct spdk_sock_opts *opts,
479 		struct spdk_sock_impl_opts *impl_opts)
480 {
481 	int fd;
482 	int val = 1;
483 	int rc, sz;
484 #if defined(__linux__)
485 	int to;
486 #endif
487 
488 	fd = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
489 	if (fd < 0) {
490 		/* error */
491 		return -1;
492 	}
493 
494 	sz = impl_opts->recv_buf_size;
495 	rc = setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &sz, sizeof(sz));
496 	if (rc) {
497 		/* Not fatal */
498 	}
499 
500 	sz = impl_opts->send_buf_size;
501 	rc = setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &sz, sizeof(sz));
502 	if (rc) {
503 		/* Not fatal */
504 	}
505 
506 	rc = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof val);
507 	if (rc != 0) {
508 		close(fd);
509 		/* error */
510 		return -1;
511 	}
512 	rc = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof val);
513 	if (rc != 0) {
514 		close(fd);
515 		/* error */
516 		return -1;
517 	}
518 
519 #if defined(SO_PRIORITY)
520 	if (opts->priority) {
521 		rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &opts->priority, sizeof val);
522 		if (rc != 0) {
523 			close(fd);
524 			/* error */
525 			return -1;
526 		}
527 	}
528 #endif
529 
530 	if (res->ai_family == AF_INET6) {
531 		rc = setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &val, sizeof val);
532 		if (rc != 0) {
533 			close(fd);
534 			/* error */
535 			return -1;
536 		}
537 	}
538 
539 	if (opts->ack_timeout) {
540 #if defined(__linux__)
541 		to = opts->ack_timeout;
542 		rc = setsockopt(fd, IPPROTO_TCP, TCP_USER_TIMEOUT, &to, sizeof(to));
543 		if (rc != 0) {
544 			close(fd);
545 			/* error */
546 			return -1;
547 		}
548 #else
549 		SPDK_WARNLOG("TCP_USER_TIMEOUT is not supported.\n");
550 #endif
551 	}
552 
553 	return fd;
554 }
555 
556 static int
557 posix_sock_psk_find_session_server_cb(SSL *ssl, const unsigned char *identity,
558 				      size_t identity_len, SSL_SESSION **sess)
559 {
560 	struct spdk_sock_impl_opts *impl_opts = SSL_get_app_data(ssl);
561 	uint8_t key[SSL_MAX_MASTER_KEY_LENGTH] = {};
562 	int keylen;
563 	int rc, i;
564 	STACK_OF(SSL_CIPHER) *ciphers;
565 	const SSL_CIPHER *cipher;
566 	const char *cipher_name;
567 	const char *user_cipher = NULL;
568 	bool found = false;
569 
570 	if (impl_opts->get_key) {
571 		rc = impl_opts->get_key(key, sizeof(key), &user_cipher, identity, impl_opts->get_key_ctx);
572 		if (rc < 0) {
573 			SPDK_ERRLOG("Unable to find PSK for identity: %s\n", identity);
574 			return 0;
575 		}
576 		keylen = rc;
577 	} else {
578 		if (impl_opts->psk_key == NULL) {
579 			SPDK_ERRLOG("PSK is not set\n");
580 			return 0;
581 		}
582 
583 		SPDK_DEBUGLOG(sock_posix, "Length of Client's PSK ID %lu\n", strlen(impl_opts->psk_identity));
584 		if (strcmp(impl_opts->psk_identity, identity) != 0) {
585 			SPDK_ERRLOG("Unknown Client's PSK ID\n");
586 			return 0;
587 		}
588 		keylen = impl_opts->psk_key_size;
589 
590 		memcpy(key, impl_opts->psk_key, keylen);
591 		user_cipher = impl_opts->tls_cipher_suites;
592 	}
593 
594 	if (user_cipher == NULL) {
595 		SPDK_ERRLOG("Cipher suite not set\n");
596 		return 0;
597 	}
598 
599 	*sess = SSL_SESSION_new();
600 	if (*sess == NULL) {
601 		SPDK_ERRLOG("Unable to allocate new SSL session\n");
602 		return 0;
603 	}
604 
605 	ciphers = SSL_get_ciphers(ssl);
606 	for (i = 0; i < sk_SSL_CIPHER_num(ciphers); i++) {
607 		cipher = sk_SSL_CIPHER_value(ciphers, i);
608 		cipher_name = SSL_CIPHER_get_name(cipher);
609 
610 		if (strcmp(user_cipher, cipher_name) == 0) {
611 			rc = SSL_SESSION_set_cipher(*sess, cipher);
612 			if (rc != 1) {
613 				SPDK_ERRLOG("Unable to set cipher: %s\n", cipher_name);
614 				goto err;
615 			}
616 			found = true;
617 			break;
618 		}
619 	}
620 	if (found == false) {
621 		SPDK_ERRLOG("No suitable cipher found\n");
622 		goto err;
623 	}
624 
625 	SPDK_DEBUGLOG(sock_posix, "Cipher selected: %s\n", cipher_name);
626 
627 	rc = SSL_SESSION_set_protocol_version(*sess, TLS1_3_VERSION);
628 	if (rc != 1) {
629 		SPDK_ERRLOG("Unable to set TLS version: %d\n", TLS1_3_VERSION);
630 		goto err;
631 	}
632 
633 	rc = SSL_SESSION_set1_master_key(*sess, key, keylen);
634 	if (rc != 1) {
635 		SPDK_ERRLOG("Unable to set PSK for session\n");
636 		goto err;
637 	}
638 
639 	return 1;
640 
641 err:
642 	SSL_SESSION_free(*sess);
643 	*sess = NULL;
644 	return 0;
645 }
646 
647 static int
648 posix_sock_psk_use_session_client_cb(SSL *ssl, const EVP_MD *md, const unsigned char **identity,
649 				     size_t *identity_len, SSL_SESSION **sess)
650 {
651 	struct spdk_sock_impl_opts *impl_opts = SSL_get_app_data(ssl);
652 	int rc, i;
653 	STACK_OF(SSL_CIPHER) *ciphers;
654 	const SSL_CIPHER *cipher;
655 	const char *cipher_name;
656 	long keylen;
657 	bool found = false;
658 
659 	if (impl_opts->psk_key == NULL) {
660 		SPDK_ERRLOG("PSK is not set\n");
661 		return 0;
662 	}
663 	if (impl_opts->psk_key_size > SSL_MAX_MASTER_KEY_LENGTH) {
664 		SPDK_ERRLOG("PSK too long\n");
665 		return 0;
666 	}
667 	keylen = impl_opts->psk_key_size;
668 
669 	if (impl_opts->tls_cipher_suites == NULL) {
670 		SPDK_ERRLOG("Cipher suite not set\n");
671 		return 0;
672 	}
673 	*sess = SSL_SESSION_new();
674 	if (*sess == NULL) {
675 		SPDK_ERRLOG("Unable to allocate new SSL session\n");
676 		return 0;
677 	}
678 
679 	ciphers = SSL_get_ciphers(ssl);
680 	for (i = 0; i < sk_SSL_CIPHER_num(ciphers); i++) {
681 		cipher = sk_SSL_CIPHER_value(ciphers, i);
682 		cipher_name = SSL_CIPHER_get_name(cipher);
683 
684 		if (strcmp(impl_opts->tls_cipher_suites, cipher_name) == 0) {
685 			rc = SSL_SESSION_set_cipher(*sess, cipher);
686 			if (rc != 1) {
687 				SPDK_ERRLOG("Unable to set cipher: %s\n", cipher_name);
688 				goto err;
689 			}
690 			found = true;
691 			break;
692 		}
693 	}
694 	if (found == false) {
695 		SPDK_ERRLOG("No suitable cipher found\n");
696 		goto err;
697 	}
698 
699 	SPDK_DEBUGLOG(sock_posix, "Cipher selected: %s\n", cipher_name);
700 
701 	rc = SSL_SESSION_set_protocol_version(*sess, TLS1_3_VERSION);
702 	if (rc != 1) {
703 		SPDK_ERRLOG("Unable to set TLS version: %d\n", TLS1_3_VERSION);
704 		goto err;
705 	}
706 
707 	rc = SSL_SESSION_set1_master_key(*sess, impl_opts->psk_key, keylen);
708 	if (rc != 1) {
709 		SPDK_ERRLOG("Unable to set PSK for session\n");
710 		goto err;
711 	}
712 
713 	*identity_len = strlen(impl_opts->psk_identity);
714 	*identity = impl_opts->psk_identity;
715 
716 	return 1;
717 
718 err:
719 	SSL_SESSION_free(*sess);
720 	*sess = NULL;
721 	return 0;
722 }
723 
724 static SSL_CTX *
725 posix_sock_create_ssl_context(const SSL_METHOD *method, struct spdk_sock_opts *opts,
726 			      struct spdk_sock_impl_opts *impl_opts)
727 {
728 	SSL_CTX *ctx;
729 	int tls_version = 0;
730 	bool ktls_enabled = false;
731 #ifdef SSL_OP_ENABLE_KTLS
732 	long options;
733 #endif
734 
735 	SSL_library_init();
736 	OpenSSL_add_all_algorithms();
737 	SSL_load_error_strings();
738 	/* Produce a SSL CTX in SSL V2 and V3 standards compliant way */
739 	ctx = SSL_CTX_new(method);
740 	if (!ctx) {
741 		SPDK_ERRLOG("SSL_CTX_new() failed, msg = %s\n", ERR_error_string(ERR_peek_last_error(), NULL));
742 		return NULL;
743 	}
744 	SPDK_DEBUGLOG(sock_posix, "SSL context created\n");
745 
746 	switch (impl_opts->tls_version) {
747 	case 0:
748 		/* auto-negotiation */
749 		break;
750 	case SPDK_TLS_VERSION_1_3:
751 		tls_version = TLS1_3_VERSION;
752 		break;
753 	default:
754 		SPDK_ERRLOG("Incorrect TLS version provided: %d\n", impl_opts->tls_version);
755 		goto err;
756 	}
757 
758 	if (tls_version) {
759 		SPDK_DEBUGLOG(sock_posix, "Hardening TLS version to '%d'='0x%X'\n", impl_opts->tls_version,
760 			      tls_version);
761 		if (!SSL_CTX_set_min_proto_version(ctx, tls_version)) {
762 			SPDK_ERRLOG("Unable to set Min TLS version to '%d'='0x%X\n", impl_opts->tls_version, tls_version);
763 			goto err;
764 		}
765 		if (!SSL_CTX_set_max_proto_version(ctx, tls_version)) {
766 			SPDK_ERRLOG("Unable to set Max TLS version to '%d'='0x%X\n", impl_opts->tls_version, tls_version);
767 			goto err;
768 		}
769 	}
770 	if (impl_opts->enable_ktls) {
771 		SPDK_DEBUGLOG(sock_posix, "Enabling kTLS offload\n");
772 #ifdef SSL_OP_ENABLE_KTLS
773 		options = SSL_CTX_set_options(ctx, SSL_OP_ENABLE_KTLS);
774 		ktls_enabled = options & SSL_OP_ENABLE_KTLS;
775 #else
776 		ktls_enabled = false;
777 #endif
778 		if (!ktls_enabled) {
779 			SPDK_ERRLOG("Unable to set kTLS offload via SSL_CTX_set_options(). Configure openssl with 'enable-ktls'\n");
780 			goto err;
781 		}
782 	}
783 
784 	/* SSL_CTX_set_ciphersuites() return 1 if the requested
785 	 * cipher suite list was configured, and 0 otherwise. */
786 	if (impl_opts->tls_cipher_suites != NULL &&
787 	    SSL_CTX_set_ciphersuites(ctx, impl_opts->tls_cipher_suites) != 1) {
788 		SPDK_ERRLOG("Unable to set TLS cipher suites for SSL'\n");
789 		goto err;
790 	}
791 
792 	return ctx;
793 
794 err:
795 	SSL_CTX_free(ctx);
796 	return NULL;
797 }
798 
799 static SSL *
800 ssl_sock_setup_connect(SSL_CTX *ctx, int fd)
801 {
802 	SSL *ssl;
803 
804 	ssl = SSL_new(ctx);
805 	if (!ssl) {
806 		SPDK_ERRLOG("SSL_new() failed, msg = %s\n", ERR_error_string(ERR_peek_last_error(), NULL));
807 		return NULL;
808 	}
809 	SSL_set_fd(ssl, fd);
810 	SSL_set_connect_state(ssl);
811 	SSL_set_psk_use_session_callback(ssl, posix_sock_psk_use_session_client_cb);
812 	SPDK_DEBUGLOG(sock_posix, "SSL object creation finished: %p\n", ssl);
813 	SPDK_DEBUGLOG(sock_posix, "%s = SSL_state_string_long(%p)\n", SSL_state_string_long(ssl), ssl);
814 	SPDK_DEBUGLOG(sock_posix, "%s = SSL_state_string_long(%p)\n", SSL_state_string_long(ssl), ssl);
815 	SPDK_DEBUGLOG(sock_posix, "Negotiated Cipher suite:%s\n",
816 		      SSL_CIPHER_get_name(SSL_get_current_cipher(ssl)));
817 	return ssl;
818 }
819 
820 static SSL *
821 ssl_sock_setup_accept(SSL_CTX *ctx, int fd)
822 {
823 	SSL *ssl;
824 
825 	ssl = SSL_new(ctx);
826 	if (!ssl) {
827 		SPDK_ERRLOG("SSL_new() failed, msg = %s\n", ERR_error_string(ERR_peek_last_error(), NULL));
828 		return NULL;
829 	}
830 	SSL_set_fd(ssl, fd);
831 	SSL_set_accept_state(ssl);
832 	SSL_set_psk_find_session_callback(ssl, posix_sock_psk_find_session_server_cb);
833 	SPDK_DEBUGLOG(sock_posix, "SSL object creation finished: %p\n", ssl);
834 	SPDK_DEBUGLOG(sock_posix, "%s = SSL_state_string_long(%p)\n", SSL_state_string_long(ssl), ssl);
835 	SPDK_DEBUGLOG(sock_posix, "%s = SSL_state_string_long(%p)\n", SSL_state_string_long(ssl), ssl);
836 	SPDK_DEBUGLOG(sock_posix, "Negotiated Cipher suite:%s\n",
837 		      SSL_CIPHER_get_name(SSL_get_current_cipher(ssl)));
838 	return ssl;
839 }
840 
841 static ssize_t
842 SSL_readv(SSL *ssl, const struct iovec *iov, int iovcnt)
843 {
844 	int i, rc = 0;
845 	ssize_t total = 0;
846 
847 	for (i = 0; i < iovcnt; i++) {
848 		rc = SSL_read(ssl, iov[i].iov_base, iov[i].iov_len);
849 
850 		if (rc > 0) {
851 			total += rc;
852 		}
853 		if (rc != (int)iov[i].iov_len) {
854 			break;
855 		}
856 	}
857 	if (total > 0) {
858 		errno = 0;
859 		return total;
860 	}
861 	switch (SSL_get_error(ssl, rc)) {
862 	case SSL_ERROR_ZERO_RETURN:
863 		errno = ENOTCONN;
864 		return 0;
865 	case SSL_ERROR_WANT_READ:
866 	case SSL_ERROR_WANT_WRITE:
867 	case SSL_ERROR_WANT_CONNECT:
868 	case SSL_ERROR_WANT_ACCEPT:
869 	case SSL_ERROR_WANT_X509_LOOKUP:
870 	case SSL_ERROR_WANT_ASYNC:
871 	case SSL_ERROR_WANT_ASYNC_JOB:
872 	case SSL_ERROR_WANT_CLIENT_HELLO_CB:
873 		errno = EAGAIN;
874 		return -1;
875 	case SSL_ERROR_SYSCALL:
876 	case SSL_ERROR_SSL:
877 		errno = ENOTCONN;
878 		return -1;
879 	default:
880 		errno = ENOTCONN;
881 		return -1;
882 	}
883 }
884 
885 static ssize_t
886 SSL_writev(SSL *ssl, struct iovec *iov, int iovcnt)
887 {
888 	int i, rc = 0;
889 	ssize_t total = 0;
890 
891 	for (i = 0; i < iovcnt; i++) {
892 		rc = SSL_write(ssl, iov[i].iov_base, iov[i].iov_len);
893 
894 		if (rc > 0) {
895 			total += rc;
896 		}
897 		if (rc != (int)iov[i].iov_len) {
898 			break;
899 		}
900 	}
901 	if (total > 0) {
902 		errno = 0;
903 		return total;
904 	}
905 	switch (SSL_get_error(ssl, rc)) {
906 	case SSL_ERROR_ZERO_RETURN:
907 		errno = ENOTCONN;
908 		return 0;
909 	case SSL_ERROR_WANT_READ:
910 	case SSL_ERROR_WANT_WRITE:
911 	case SSL_ERROR_WANT_CONNECT:
912 	case SSL_ERROR_WANT_ACCEPT:
913 	case SSL_ERROR_WANT_X509_LOOKUP:
914 	case SSL_ERROR_WANT_ASYNC:
915 	case SSL_ERROR_WANT_ASYNC_JOB:
916 	case SSL_ERROR_WANT_CLIENT_HELLO_CB:
917 		errno = EAGAIN;
918 		return -1;
919 	case SSL_ERROR_SYSCALL:
920 	case SSL_ERROR_SSL:
921 		errno = ENOTCONN;
922 		return -1;
923 	default:
924 		errno = ENOTCONN;
925 		return -1;
926 	}
927 }
928 
/*
 * Create a listening or connected socket.
 *
 * \param ip         numeric IPv4/IPv6 address; an IPv6 address may be wrapped
 *                   in square brackets
 * \param port       TCP port to listen on / connect to
 * \param type       SPDK_SOCK_CREATE_LISTEN or SPDK_SOCK_CREATE_CONNECT
 * \param opts       generic socket options (must not be NULL)
 * \param enable_ssl wrap a connecting socket in TLS
 *
 * Every address returned by getaddrinfo() is tried in turn. For a connecting
 * socket an optional source address/port from `opts` is bound first, and with
 * `enable_ssl` an SSL context and SSL object are attached to the result. The
 * descriptor is switched to non-blocking mode before it is wrapped.
 *
 * \return the generic part of the new socket, or NULL on failure.
 */
static struct spdk_sock *
posix_sock_create(const char *ip, int port,
		  enum posix_sock_create_type type,
		  struct spdk_sock_opts *opts,
		  bool enable_ssl)
{
	struct spdk_posix_sock *sock;
	struct spdk_sock_impl_opts impl_opts;
	char buf[MAX_TMPBUF];
	char portnum[PORTNUMLEN];
	char *p;
	const char *src_addr;
	uint16_t src_port;
	struct addrinfo hints, *res, *res0, *src_ai;
	int fd, flag;
	int rc;
	bool enable_zcopy_user_opts = true;
	bool enable_zcopy_impl_opts = true;
	SSL_CTX *ctx = NULL;
	SSL *ssl = NULL;

	assert(opts != NULL);
	if (enable_ssl) {
		_opts_get_impl_opts(opts, &impl_opts, &g_ssl_impl_opts);
	} else {
		_opts_get_impl_opts(opts, &impl_opts, &g_posix_impl_opts);
	}

	if (ip == NULL) {
		return NULL;
	}
	if (ip[0] == '[') {
		/* Strip the brackets of an "[ipv6]" literal */
		snprintf(buf, sizeof(buf), "%s", ip + 1);
		p = strchr(buf, ']');
		if (p != NULL) {
			*p = '\0';
		}
		ip = (const char *) &buf[0];
	}

	snprintf(portnum, sizeof portnum, "%d", port);
	memset(&hints, 0, sizeof hints);
	hints.ai_family = PF_UNSPEC;
	hints.ai_socktype = SOCK_STREAM;
	hints.ai_flags = AI_NUMERICSERV;
	hints.ai_flags |= AI_PASSIVE;
	hints.ai_flags |= AI_NUMERICHOST;
	rc = getaddrinfo(ip, portnum, &hints, &res0);
	if (rc != 0) {
		SPDK_ERRLOG("getaddrinfo() failed %s (%d)\n", gai_strerror(rc), rc);
		return NULL;
	}

	/* try listen */
	fd = -1;
	for (res = res0; res != NULL; res = res->ai_next) {
retry:
		fd = posix_fd_create(res, opts, &impl_opts);
		if (fd < 0) {
			continue;
		}
		if (type == SPDK_SOCK_CREATE_LISTEN) {
			rc = bind(fd, res->ai_addr, res->ai_addrlen);
			if (rc != 0) {
				SPDK_ERRLOG("bind() failed at port %d, errno = %d\n", port, errno);
				switch (errno) {
				case EINTR:
					/* interrupted? */
					close(fd);
					goto retry;
				case EADDRNOTAVAIL:
					SPDK_ERRLOG("IP address %s not available. "
						    "Verify IP address in config file "
						    "and make sure setup script is "
						    "run before starting spdk app.\n", ip);
				/* FALLTHROUGH */
				default:
					/* try next family */
					close(fd);
					fd = -1;
					continue;
				}
			}
			/* bind OK */
			rc = listen(fd, 512);
			if (rc != 0) {
				SPDK_ERRLOG("listen() failed, errno = %d\n", errno);
				close(fd);
				fd = -1;
				break;
			}
			enable_zcopy_impl_opts = impl_opts.enable_zerocopy_send_server;
		} else if (type == SPDK_SOCK_CREATE_CONNECT) {
			src_addr = SPDK_GET_FIELD(opts, src_addr, NULL, opts->opts_size);
			src_port = SPDK_GET_FIELD(opts, src_port, 0, opts->opts_size);
			if (src_addr != NULL || src_port != 0) {
				/* Bind the requested local address/port before connecting.
				 * portnum is reused here; the destination lookup is done. */
				snprintf(portnum, sizeof(portnum), "%"PRIu16, src_port);
				memset(&hints, 0, sizeof hints);
				hints.ai_family = AF_UNSPEC;
				hints.ai_socktype = SOCK_STREAM;
				hints.ai_flags = AI_NUMERICSERV | AI_NUMERICHOST | AI_PASSIVE;
				rc = getaddrinfo(src_addr, src_port > 0 ? portnum : NULL,
						 &hints, &src_ai);
				if (rc != 0 || src_ai == NULL) {
					SPDK_ERRLOG("getaddrinfo() failed %s (%d)\n",
						    rc != 0 ? gai_strerror(rc) : "", rc);
					close(fd);
					fd = -1;
					break;
				}
				rc = bind(fd, src_ai->ai_addr, src_ai->ai_addrlen);
				if (rc != 0) {
					SPDK_ERRLOG("bind() failed errno %d (%s:%s)\n", errno,
						    src_addr ? src_addr : "", portnum);
					close(fd);
					fd = -1;
					freeaddrinfo(src_ai);
					src_ai = NULL;
					break;
				}
				freeaddrinfo(src_ai);
				src_ai = NULL;
			}
			rc = connect(fd, res->ai_addr, res->ai_addrlen);
			if (rc != 0) {
				SPDK_ERRLOG("connect() failed, errno = %d\n", errno);
				/* try next family */
				close(fd);
				fd = -1;
				continue;
			}
			enable_zcopy_impl_opts = impl_opts.enable_zerocopy_send_client;
			if (enable_ssl) {
				ctx = posix_sock_create_ssl_context(TLS_client_method(), opts, &impl_opts);
				if (!ctx) {
					SPDK_ERRLOG("posix_sock_create_ssl_context() failed, errno = %d\n", errno);
					close(fd);
					fd = -1;
					break;
				}
				ssl = ssl_sock_setup_connect(ctx, fd);
				if (!ssl) {
					SPDK_ERRLOG("ssl_sock_setup_connect() failed, errno = %d\n", errno);
					close(fd);
					fd = -1;
					SSL_CTX_free(ctx);
					ctx = NULL;
					break;
				}
			}
		}

		/* A failed F_GETFL must not be OR-ed into the new flag set */
		flag = fcntl(fd, F_GETFL);
		if (flag < 0 || fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0) {
			SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno);
			SSL_free(ssl);
			ssl = NULL;
			SSL_CTX_free(ctx);
			ctx = NULL;
			close(fd);
			fd = -1;
			break;
		}
		break;
	}
	freeaddrinfo(res0);

	if (fd < 0) {
		return NULL;
	}

	/* Only enable zero copy for non-loopback and non-ssl sockets. */
	enable_zcopy_user_opts = opts->zcopy && !spdk_net_is_loopback(fd) && !enable_ssl;

	sock = posix_sock_alloc(fd, &impl_opts, enable_zcopy_user_opts && enable_zcopy_impl_opts);
	if (sock == NULL) {
		SPDK_ERRLOG("sock allocation failed\n");
		SSL_free(ssl);
		SSL_CTX_free(ctx);
		close(fd);
		return NULL;
	}

	if (ctx) {
		sock->ctx = ctx;
	}

	if (ssl) {
		sock->ssl = ssl;
		/* The PSK callbacks read their configuration from the app data */
		SSL_set_app_data(ssl, &sock->base.impl_opts);
	}

	return &sock->base;
}
1120 
1121 static struct spdk_sock *
1122 posix_sock_listen(const char *ip, int port, struct spdk_sock_opts *opts)
1123 {
1124 	return posix_sock_create(ip, port, SPDK_SOCK_CREATE_LISTEN, opts, false);
1125 }
1126 
1127 static struct spdk_sock *
1128 posix_sock_connect(const char *ip, int port, struct spdk_sock_opts *opts)
1129 {
1130 	return posix_sock_create(ip, port, SPDK_SOCK_CREATE_CONNECT, opts, false);
1131 }
1132 
1133 static struct spdk_sock *
1134 _posix_sock_accept(struct spdk_sock *_sock, bool enable_ssl)
1135 {
1136 	struct spdk_posix_sock		*sock = __posix_sock(_sock);
1137 	struct spdk_posix_sock_group_impl *group = __posix_group_impl(sock->base.group_impl);
1138 	struct sockaddr_storage		sa;
1139 	socklen_t			salen;
1140 	int				rc, fd;
1141 	struct spdk_posix_sock		*new_sock;
1142 	int				flag;
1143 	SSL_CTX *ctx = 0;
1144 	SSL *ssl = 0;
1145 
1146 	memset(&sa, 0, sizeof(sa));
1147 	salen = sizeof(sa);
1148 
1149 	assert(sock != NULL);
1150 
1151 	/* epoll_wait will trigger again if there is more than one request */
1152 	if (group && sock->socket_has_data) {
1153 		sock->socket_has_data = false;
1154 		TAILQ_REMOVE(&group->socks_with_data, sock, link);
1155 	}
1156 
1157 	rc = accept(sock->fd, (struct sockaddr *)&sa, &salen);
1158 
1159 	if (rc == -1) {
1160 		return NULL;
1161 	}
1162 
1163 	fd = rc;
1164 
1165 	flag = fcntl(fd, F_GETFL);
1166 	if ((!(flag & O_NONBLOCK)) && (fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0)) {
1167 		SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno);
1168 		close(fd);
1169 		return NULL;
1170 	}
1171 
1172 #if defined(SO_PRIORITY)
1173 	/* The priority is not inherited, so call this function again */
1174 	if (sock->base.opts.priority) {
1175 		rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &sock->base.opts.priority, sizeof(int));
1176 		if (rc != 0) {
1177 			close(fd);
1178 			return NULL;
1179 		}
1180 	}
1181 #endif
1182 
1183 	/* Establish SSL connection */
1184 	if (enable_ssl) {
1185 		ctx = posix_sock_create_ssl_context(TLS_server_method(), &sock->base.opts, &sock->base.impl_opts);
1186 		if (!ctx) {
1187 			SPDK_ERRLOG("posix_sock_create_ssl_context() failed, errno = %d\n", errno);
1188 			close(fd);
1189 			return NULL;
1190 		}
1191 		ssl = ssl_sock_setup_accept(ctx, fd);
1192 		if (!ssl) {
1193 			SPDK_ERRLOG("ssl_sock_setup_accept() failed, errno = %d\n", errno);
1194 			close(fd);
1195 			SSL_CTX_free(ctx);
1196 			return NULL;
1197 		}
1198 	}
1199 
1200 	/* Inherit the zero copy feature from the listen socket */
1201 	new_sock = posix_sock_alloc(fd, &sock->base.impl_opts, sock->zcopy);
1202 	if (new_sock == NULL) {
1203 		close(fd);
1204 		SSL_free(ssl);
1205 		SSL_CTX_free(ctx);
1206 		return NULL;
1207 	}
1208 
1209 	if (ctx) {
1210 		new_sock->ctx = ctx;
1211 	}
1212 
1213 	if (ssl) {
1214 		new_sock->ssl = ssl;
1215 		SSL_set_app_data(ssl, &new_sock->base.impl_opts);
1216 	}
1217 
1218 	return &new_sock->base;
1219 }
1220 
/* Accept a pending connection on _sock with TLS disabled. */
static struct spdk_sock *
posix_sock_accept(struct spdk_sock *_sock)
{
	const bool enable_ssl = false;

	return _posix_sock_accept(_sock, enable_ssl);
}
1226 
1227 static int
1228 posix_sock_close(struct spdk_sock *_sock)
1229 {
1230 	struct spdk_posix_sock *sock = __posix_sock(_sock);
1231 	void *pipe_buf;
1232 
1233 	assert(TAILQ_EMPTY(&_sock->pending_reqs));
1234 
1235 	if (sock->ssl != NULL) {
1236 		SSL_shutdown(sock->ssl);
1237 	}
1238 
1239 	/* If the socket fails to close, the best choice is to
1240 	 * leak the fd but continue to free the rest of the sock
1241 	 * memory. */
1242 	close(sock->fd);
1243 
1244 	SSL_free(sock->ssl);
1245 	SSL_CTX_free(sock->ctx);
1246 
1247 	pipe_buf = spdk_pipe_destroy(sock->recv_pipe);
1248 	free(pipe_buf);
1249 	free(sock);
1250 
1251 	return 0;
1252 }
1253 
#ifdef SPDK_ZEROCOPY
/*
 * Drain the socket's error queue and complete the pending requests whose
 * zero copy sends the kernel has acknowledged.
 *
 * Each error-queue message reports an inclusive range [ee_info, ee_data] of
 * sendmsg call indices. Requests tagged with an index in that range (their
 * internal.offset was re-purposed to hold the index) are completed, along
 * with any non-zcopy requests encountered while scanning.
 *
 * Returns 0 when the queue is drained or an unexpected message is seen, or
 * the negative value returned by spdk_sock_request_put() if completing a
 * request fails.
 */
static int
_sock_check_zcopy(struct spdk_sock *sock)
{
	struct spdk_posix_sock *psock = __posix_sock(sock);
	struct msghdr msgh = {};
	/* The union gives the control buffer the alignment of struct cmsghdr,
	 * which CMSG_FIRSTHDR() relies on when it casts msg_control. */
	union {
		struct cmsghdr	align;
		uint8_t		data[sizeof(struct cmsghdr) + sizeof(struct sock_extended_err)];
	} buf;
	ssize_t rc;
	struct sock_extended_err *serr;
	struct cmsghdr *cm;
	uint32_t idx;
	struct spdk_sock_request *req, *treq;
	bool found;

	while (true) {
		/* msg_controllen and msg_flags are written back by recvmsg(), so
		 * the control buffer must be re-armed before every call. */
		msgh.msg_control = buf.data;
		msgh.msg_controllen = sizeof(buf.data);
		msgh.msg_flags = 0;

		rc = recvmsg(psock->fd, &msgh, MSG_ERRQUEUE);

		if (rc < 0) {
			if (errno == EWOULDBLOCK || errno == EAGAIN) {
				/* Error queue is empty: nothing more to reap. */
				return 0;
			}

			if (!TAILQ_EMPTY(&sock->pending_reqs)) {
				SPDK_ERRLOG("Attempting to receive from ERRQUEUE yielded error, but pending list still has orphaned entries\n");
			} else {
				SPDK_WARNLOG("Recvmsg yielded an error!\n");
			}
			return 0;
		}

		cm = CMSG_FIRSTHDR(&msgh);
		if (!(cm &&
		      ((cm->cmsg_level == SOL_IP && cm->cmsg_type == IP_RECVERR) ||
		       (cm->cmsg_level == SOL_IPV6 && cm->cmsg_type == IPV6_RECVERR)))) {
			SPDK_WARNLOG("Unexpected cmsg level or type!\n");
			return 0;
		}

		serr = (struct sock_extended_err *)CMSG_DATA(cm);
		if (serr->ee_errno != 0 || serr->ee_origin != SO_EE_ORIGIN_ZEROCOPY) {
			SPDK_WARNLOG("Unexpected extended error origin\n");
			return 0;
		}

		/* Most of the time, the pending_reqs array is in the exact
		 * order we need such that all of the requests to complete are
		 * in order, in the front. It is guaranteed that all requests
		 * belonging to the same sendmsg call are sequential, so once
		 * we encounter one match we can stop looping as soon as a
		 * non-match is found.
		 */
		idx = serr->ee_info;
		while (true) {
			found = false;
			TAILQ_FOREACH_SAFE(req, &sock->pending_reqs, internal.link, treq) {
				if (!req->internal.is_zcopy) {
					/* This wasn't a zcopy request. It was just waiting in line to complete */
					rc = spdk_sock_request_put(sock, req, 0);
					if (rc < 0) {
						return rc;
					}
				} else if (req->internal.offset == idx) {
					found = true;
					rc = spdk_sock_request_put(sock, req, 0);
					if (rc < 0) {
						return rc;
					}
				} else if (found) {
					break;
				}
			}

			/* ee_data is the last index of the acknowledged range. */
			if (idx == serr->ee_data) {
				break;
			}

			/* Advance to the next index, wrapping at UINT32_MAX. */
			if (idx == UINT32_MAX) {
				idx = 0;
			} else {
				idx++;
			}
		}
	}

	return 0;
}
#endif
1344 
1345 static int
1346 _sock_flush(struct spdk_sock *sock)
1347 {
1348 	struct spdk_posix_sock *psock = __posix_sock(sock);
1349 	struct msghdr msg = {};
1350 	int flags;
1351 	struct iovec iovs[IOV_BATCH_SIZE];
1352 	int iovcnt;
1353 	int retval;
1354 	struct spdk_sock_request *req;
1355 	int i;
1356 	ssize_t rc, sent;
1357 	unsigned int offset;
1358 	size_t len;
1359 	bool is_zcopy = false;
1360 
1361 	/* Can't flush from within a callback or we end up with recursive calls */
1362 	if (sock->cb_cnt > 0) {
1363 		errno = EAGAIN;
1364 		return -1;
1365 	}
1366 
1367 #ifdef SPDK_ZEROCOPY
1368 	if (psock->zcopy) {
1369 		flags = MSG_ZEROCOPY | MSG_NOSIGNAL;
1370 	} else
1371 #endif
1372 	{
1373 		flags = MSG_NOSIGNAL;
1374 	}
1375 
1376 	iovcnt = spdk_sock_prep_reqs(sock, iovs, 0, NULL, &flags);
1377 	if (iovcnt == 0) {
1378 		return 0;
1379 	}
1380 
1381 #ifdef SPDK_ZEROCOPY
1382 	is_zcopy = flags & MSG_ZEROCOPY;
1383 #endif
1384 
1385 	/* Perform the vectored write */
1386 	msg.msg_iov = iovs;
1387 	msg.msg_iovlen = iovcnt;
1388 
1389 	if (psock->ssl) {
1390 		rc = SSL_writev(psock->ssl, iovs, iovcnt);
1391 	} else {
1392 		rc = sendmsg(psock->fd, &msg, flags);
1393 	}
1394 	if (rc <= 0) {
1395 		if (rc == 0 || errno == EAGAIN || errno == EWOULDBLOCK || (errno == ENOBUFS && psock->zcopy)) {
1396 			errno = EAGAIN;
1397 		}
1398 		return -1;
1399 	}
1400 
1401 	sent = rc;
1402 
1403 	if (is_zcopy) {
1404 		/* Handling overflow case, because we use psock->sendmsg_idx - 1 for the
1405 		 * req->internal.offset, so sendmsg_idx should not be zero  */
1406 		if (spdk_unlikely(psock->sendmsg_idx == UINT32_MAX)) {
1407 			psock->sendmsg_idx = 1;
1408 		} else {
1409 			psock->sendmsg_idx++;
1410 		}
1411 	}
1412 
1413 	/* Consume the requests that were actually written */
1414 	req = TAILQ_FIRST(&sock->queued_reqs);
1415 	while (req) {
1416 		offset = req->internal.offset;
1417 
1418 		/* req->internal.is_zcopy is true when the whole req or part of it is sent with zerocopy */
1419 		req->internal.is_zcopy = is_zcopy;
1420 
1421 		for (i = 0; i < req->iovcnt; i++) {
1422 			/* Advance by the offset first */
1423 			if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) {
1424 				offset -= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len;
1425 				continue;
1426 			}
1427 
1428 			/* Calculate the remaining length of this element */
1429 			len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset;
1430 
1431 			if (len > (size_t)rc) {
1432 				/* This element was partially sent. */
1433 				req->internal.offset += rc;
1434 				return sent;
1435 			}
1436 
1437 			offset = 0;
1438 			req->internal.offset += len;
1439 			rc -= len;
1440 		}
1441 
1442 		/* Handled a full request. */
1443 		spdk_sock_request_pend(sock, req);
1444 
1445 		if (!req->internal.is_zcopy && req == TAILQ_FIRST(&sock->pending_reqs)) {
1446 			/* The sendmsg syscall above isn't currently asynchronous,
1447 			* so it's already done. */
1448 			retval = spdk_sock_request_put(sock, req, 0);
1449 			if (retval) {
1450 				break;
1451 			}
1452 		} else {
1453 			/* Re-use the offset field to hold the sendmsg call index. The
1454 			 * index is 0 based, so subtract one here because we've already
1455 			 * incremented above. */
1456 			req->internal.offset = psock->sendmsg_idx - 1;
1457 		}
1458 
1459 		if (rc == 0) {
1460 			break;
1461 		}
1462 
1463 		req = TAILQ_FIRST(&sock->queued_reqs);
1464 	}
1465 
1466 	return sent;
1467 }
1468 
/*
 * Flush queued writes on sock. On zero copy sockets with requests still
 * pending, completions are reaped from the error queue first.
 * Returns the result of _sock_flush().
 */
static int
posix_sock_flush(struct spdk_sock *sock)
{
#ifdef SPDK_ZEROCOPY
	struct spdk_posix_sock *psock = __posix_sock(sock);
	bool has_pending = !TAILQ_EMPTY(&sock->pending_reqs);

	if (psock->zcopy && has_pending) {
		/* The return value is intentionally not examined here. */
		_sock_check_zcopy(sock);
	}
#endif

	return _sock_flush(sock);
}
1482 
1483 static ssize_t
1484 posix_sock_recv_from_pipe(struct spdk_posix_sock *sock, struct iovec *diov, int diovcnt)
1485 {
1486 	struct iovec siov[2];
1487 	int sbytes;
1488 	ssize_t bytes;
1489 	struct spdk_posix_sock_group_impl *group;
1490 
1491 	sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov);
1492 	if (sbytes < 0) {
1493 		errno = EINVAL;
1494 		return -1;
1495 	} else if (sbytes == 0) {
1496 		errno = EAGAIN;
1497 		return -1;
1498 	}
1499 
1500 	bytes = spdk_iovcpy(siov, 2, diov, diovcnt);
1501 
1502 	if (bytes == 0) {
1503 		/* The only way this happens is if diov is 0 length */
1504 		errno = EINVAL;
1505 		return -1;
1506 	}
1507 
1508 	spdk_pipe_reader_advance(sock->recv_pipe, bytes);
1509 
1510 	/* If we drained the pipe, mark it appropriately */
1511 	if (spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) {
1512 		assert(sock->pipe_has_data == true);
1513 
1514 		group = __posix_group_impl(sock->base.group_impl);
1515 		if (group && !sock->socket_has_data) {
1516 			TAILQ_REMOVE(&group->socks_with_data, sock, link);
1517 		}
1518 
1519 		sock->pipe_has_data = false;
1520 	}
1521 
1522 	return bytes;
1523 }
1524 
1525 static inline ssize_t
1526 posix_sock_read(struct spdk_posix_sock *sock)
1527 {
1528 	struct iovec iov[2];
1529 	int bytes_avail, bytes_recvd;
1530 	struct spdk_posix_sock_group_impl *group;
1531 
1532 	bytes_avail = spdk_pipe_writer_get_buffer(sock->recv_pipe, sock->recv_buf_sz, iov);
1533 
1534 	if (bytes_avail <= 0) {
1535 		return bytes_avail;
1536 	}
1537 
1538 	if (sock->ssl) {
1539 		bytes_recvd = SSL_readv(sock->ssl, iov, 2);
1540 	} else {
1541 		bytes_recvd = readv(sock->fd, iov, 2);
1542 	}
1543 
1544 	assert(sock->pipe_has_data == false);
1545 
1546 	if (bytes_recvd <= 0) {
1547 		/* Errors count as draining the socket data */
1548 		if (sock->base.group_impl && sock->socket_has_data) {
1549 			group = __posix_group_impl(sock->base.group_impl);
1550 			TAILQ_REMOVE(&group->socks_with_data, sock, link);
1551 		}
1552 
1553 		sock->socket_has_data = false;
1554 
1555 		return bytes_recvd;
1556 	}
1557 
1558 	spdk_pipe_writer_advance(sock->recv_pipe, bytes_recvd);
1559 
1560 #if DEBUG
1561 	if (sock->base.group_impl) {
1562 		assert(sock->socket_has_data == true);
1563 	}
1564 #endif
1565 
1566 	sock->pipe_has_data = true;
1567 	if (bytes_recvd < bytes_avail) {
1568 		/* We drained the kernel socket entirely. */
1569 		sock->socket_has_data = false;
1570 	}
1571 
1572 	return bytes_recvd;
1573 }
1574 
1575 static ssize_t
1576 posix_sock_readv(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
1577 {
1578 	struct spdk_posix_sock *sock = __posix_sock(_sock);
1579 	struct spdk_posix_sock_group_impl *group = __posix_group_impl(sock->base.group_impl);
1580 	int rc, i;
1581 	size_t len;
1582 
1583 	if (sock->recv_pipe == NULL) {
1584 		assert(sock->pipe_has_data == false);
1585 		if (group && sock->socket_has_data) {
1586 			sock->socket_has_data = false;
1587 			TAILQ_REMOVE(&group->socks_with_data, sock, link);
1588 		}
1589 		if (sock->ssl) {
1590 			return SSL_readv(sock->ssl, iov, iovcnt);
1591 		} else {
1592 			return readv(sock->fd, iov, iovcnt);
1593 		}
1594 	}
1595 
1596 	/* If the socket is not in a group, we must assume it always has
1597 	 * data waiting for us because it is not epolled */
1598 	if (!sock->pipe_has_data && (group == NULL || sock->socket_has_data)) {
1599 		/* If the user is receiving a sufficiently large amount of data,
1600 		 * receive directly to their buffers. */
1601 		len = 0;
1602 		for (i = 0; i < iovcnt; i++) {
1603 			len += iov[i].iov_len;
1604 		}
1605 
1606 		if (len >= MIN_SOCK_PIPE_SIZE) {
1607 			/* TODO: Should this detect if kernel socket is drained? */
1608 			if (sock->ssl) {
1609 				return SSL_readv(sock->ssl, iov, iovcnt);
1610 			} else {
1611 				return readv(sock->fd, iov, iovcnt);
1612 			}
1613 		}
1614 
1615 		/* Otherwise, do a big read into our pipe */
1616 		rc = posix_sock_read(sock);
1617 		if (rc <= 0) {
1618 			return rc;
1619 		}
1620 	}
1621 
1622 	return posix_sock_recv_from_pipe(sock, iov, iovcnt);
1623 }
1624 
/* Receive into a single contiguous buffer by wrapping it in a one-element iovec. */
static ssize_t
posix_sock_recv(struct spdk_sock *sock, void *buf, size_t len)
{
	struct iovec single;

	single.iov_base = buf;
	single.iov_len = len;

	return posix_sock_readv(sock, &single, 1);
}
1635 
1636 static ssize_t
1637 posix_sock_writev(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
1638 {
1639 	struct spdk_posix_sock *sock = __posix_sock(_sock);
1640 	int rc;
1641 
1642 	/* In order to process a writev, we need to flush any asynchronous writes
1643 	 * first. */
1644 	rc = _sock_flush(_sock);
1645 	if (rc < 0) {
1646 		return rc;
1647 	}
1648 
1649 	if (!TAILQ_EMPTY(&_sock->queued_reqs)) {
1650 		/* We weren't able to flush all requests */
1651 		errno = EAGAIN;
1652 		return -1;
1653 	}
1654 
1655 	if (sock->ssl) {
1656 		return SSL_writev(sock->ssl, iov, iovcnt);
1657 	} else {
1658 		return writev(sock->fd, iov, iovcnt);
1659 	}
1660 }
1661 
1662 static int
1663 posix_sock_recv_next(struct spdk_sock *_sock, void **buf, void **ctx)
1664 {
1665 	struct spdk_posix_sock *sock = __posix_sock(_sock);
1666 	struct iovec iov;
1667 	ssize_t rc;
1668 
1669 	if (sock->recv_pipe != NULL) {
1670 		errno = ENOTSUP;
1671 		return -1;
1672 	}
1673 
1674 	iov.iov_len = spdk_sock_group_get_buf(_sock->group_impl->group, &iov.iov_base, ctx);
1675 	if (iov.iov_len == 0) {
1676 		errno = ENOBUFS;
1677 		return -1;
1678 	}
1679 
1680 	rc = posix_sock_readv(_sock, &iov, 1);
1681 	if (rc <= 0) {
1682 		spdk_sock_group_provide_buf(_sock->group_impl->group, iov.iov_base, iov.iov_len, *ctx);
1683 		return rc;
1684 	}
1685 
1686 	*buf = iov.iov_base;
1687 
1688 	return rc;
1689 }
1690 
1691 static void
1692 posix_sock_writev_async(struct spdk_sock *sock, struct spdk_sock_request *req)
1693 {
1694 	int rc;
1695 
1696 	spdk_sock_request_queue(sock, req);
1697 
1698 	/* If there are a sufficient number queued, just flush them out immediately. */
1699 	if (sock->queued_iovcnt >= IOV_BATCH_SIZE) {
1700 		rc = _sock_flush(sock);
1701 		if (rc < 0 && errno != EAGAIN) {
1702 			spdk_sock_abort_requests(sock);
1703 		}
1704 	}
1705 }
1706 
1707 static int
1708 posix_sock_set_recvlowat(struct spdk_sock *_sock, int nbytes)
1709 {
1710 	struct spdk_posix_sock *sock = __posix_sock(_sock);
1711 	int val;
1712 	int rc;
1713 
1714 	assert(sock != NULL);
1715 
1716 	val = nbytes;
1717 	rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVLOWAT, &val, sizeof val);
1718 	if (rc != 0) {
1719 		return -1;
1720 	}
1721 	return 0;
1722 }
1723 
1724 static bool
1725 posix_sock_is_ipv6(struct spdk_sock *_sock)
1726 {
1727 	struct spdk_posix_sock *sock = __posix_sock(_sock);
1728 	struct sockaddr_storage sa;
1729 	socklen_t salen;
1730 	int rc;
1731 
1732 	assert(sock != NULL);
1733 
1734 	memset(&sa, 0, sizeof sa);
1735 	salen = sizeof sa;
1736 	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
1737 	if (rc != 0) {
1738 		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
1739 		return false;
1740 	}
1741 
1742 	return (sa.ss_family == AF_INET6);
1743 }
1744 
1745 static bool
1746 posix_sock_is_ipv4(struct spdk_sock *_sock)
1747 {
1748 	struct spdk_posix_sock *sock = __posix_sock(_sock);
1749 	struct sockaddr_storage sa;
1750 	socklen_t salen;
1751 	int rc;
1752 
1753 	assert(sock != NULL);
1754 
1755 	memset(&sa, 0, sizeof sa);
1756 	salen = sizeof sa;
1757 	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
1758 	if (rc != 0) {
1759 		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
1760 		return false;
1761 	}
1762 
1763 	return (sa.ss_family == AF_INET);
1764 }
1765 
1766 static bool
1767 posix_sock_is_connected(struct spdk_sock *_sock)
1768 {
1769 	struct spdk_posix_sock *sock = __posix_sock(_sock);
1770 	uint8_t byte;
1771 	int rc;
1772 
1773 	rc = recv(sock->fd, &byte, 1, MSG_PEEK);
1774 	if (rc == 0) {
1775 		return false;
1776 	}
1777 
1778 	if (rc < 0) {
1779 		if (errno == EAGAIN || errno == EWOULDBLOCK) {
1780 			return true;
1781 		}
1782 
1783 		return false;
1784 	}
1785 
1786 	return true;
1787 }
1788 
1789 static struct spdk_sock_group_impl *
1790 posix_sock_group_impl_get_optimal(struct spdk_sock *_sock, struct spdk_sock_group_impl *hint)
1791 {
1792 	struct spdk_posix_sock *sock = __posix_sock(_sock);
1793 	struct spdk_sock_group_impl *group_impl;
1794 
1795 	if (sock->placement_id != -1) {
1796 		spdk_sock_map_lookup(&g_map, sock->placement_id, &group_impl, hint);
1797 		return group_impl;
1798 	}
1799 
1800 	return NULL;
1801 }
1802 
1803 static struct spdk_sock_group_impl *
1804 _sock_group_impl_create(uint32_t enable_placement_id)
1805 {
1806 	struct spdk_posix_sock_group_impl *group_impl;
1807 	int fd;
1808 
1809 #if defined(SPDK_EPOLL)
1810 	fd = epoll_create1(0);
1811 #elif defined(SPDK_KEVENT)
1812 	fd = kqueue();
1813 #endif
1814 	if (fd == -1) {
1815 		return NULL;
1816 	}
1817 
1818 	group_impl = calloc(1, sizeof(*group_impl));
1819 	if (group_impl == NULL) {
1820 		SPDK_ERRLOG("group_impl allocation failed\n");
1821 		close(fd);
1822 		return NULL;
1823 	}
1824 
1825 	group_impl->pipe_group = spdk_pipe_group_create();
1826 	if (group_impl->pipe_group == NULL) {
1827 		SPDK_ERRLOG("pipe_group allocation failed\n");
1828 		free(group_impl);
1829 		close(fd);
1830 		return NULL;
1831 	}
1832 
1833 	group_impl->fd = fd;
1834 	TAILQ_INIT(&group_impl->socks_with_data);
1835 	group_impl->placement_id = -1;
1836 
1837 	if (enable_placement_id == PLACEMENT_CPU) {
1838 		spdk_sock_map_insert(&g_map, spdk_env_get_current_core(), &group_impl->base);
1839 		group_impl->placement_id = spdk_env_get_current_core();
1840 	}
1841 
1842 	return &group_impl->base;
1843 }
1844 
1845 static struct spdk_sock_group_impl *
1846 posix_sock_group_impl_create(void)
1847 {
1848 	return _sock_group_impl_create(g_posix_impl_opts.enable_placement_id);
1849 }
1850 
1851 static struct spdk_sock_group_impl *
1852 ssl_sock_group_impl_create(void)
1853 {
1854 	return _sock_group_impl_create(g_ssl_impl_opts.enable_placement_id);
1855 }
1856 
/*
 * Tag the socket with placement_id via SO_MARK and register the group in
 * g_map under that id. Failures are logged and otherwise ignored; the
 * socket's placement_id is only recorded when both steps succeed.
 * Does nothing on platforms without SO_MARK.
 */
static void
posix_sock_mark(struct spdk_posix_sock_group_impl *group, struct spdk_posix_sock *sock,
		int placement_id)
{
#if defined(SO_MARK)
	int rc;

	if (setsockopt(sock->fd, SOL_SOCKET, SO_MARK,
		       &placement_id, sizeof(placement_id)) != 0) {
		/* Not fatal */
		SPDK_ERRLOG("Error setting SO_MARK\n");
		return;
	}

	rc = spdk_sock_map_insert(&g_map, placement_id, &group->base);
	if (rc != 0) {
		/* Not fatal */
		SPDK_ERRLOG("Failed to insert sock group into map: %d\n", rc);
		return;
	}

	sock->placement_id = placement_id;
#endif
}
1882 
1883 static void
1884 posix_sock_update_mark(struct spdk_sock_group_impl *_group, struct spdk_sock *_sock)
1885 {
1886 	struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group);
1887 
1888 	if (group->placement_id == -1) {
1889 		group->placement_id = spdk_sock_map_find_free(&g_map);
1890 
1891 		/* If a free placement id is found, update existing sockets in this group */
1892 		if (group->placement_id != -1) {
1893 			struct spdk_sock  *sock, *tmp;
1894 
1895 			TAILQ_FOREACH_SAFE(sock, &_group->socks, link, tmp) {
1896 				posix_sock_mark(group, __posix_sock(sock), group->placement_id);
1897 			}
1898 		}
1899 	}
1900 
1901 	if (group->placement_id != -1) {
1902 		/*
1903 		 * group placement id is already determined for this poll group.
1904 		 * Mark socket with group's placement id.
1905 		 */
1906 		posix_sock_mark(group, __posix_sock(_sock), group->placement_id);
1907 	}
1908 }
1909 
1910 static int
1911 posix_sock_group_impl_add_sock(struct spdk_sock_group_impl *_group, struct spdk_sock *_sock)
1912 {
1913 	struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group);
1914 	struct spdk_posix_sock *sock = __posix_sock(_sock);
1915 	int rc;
1916 
1917 #if defined(SPDK_EPOLL)
1918 	struct epoll_event event;
1919 
1920 	memset(&event, 0, sizeof(event));
1921 	/* EPOLLERR is always on even if we don't set it, but be explicit for clarity */
1922 	event.events = EPOLLIN | EPOLLERR;
1923 	event.data.ptr = sock;
1924 
1925 	rc = epoll_ctl(group->fd, EPOLL_CTL_ADD, sock->fd, &event);
1926 #elif defined(SPDK_KEVENT)
1927 	struct kevent event;
1928 	struct timespec ts = {0};
1929 
1930 	EV_SET(&event, sock->fd, EVFILT_READ, EV_ADD, 0, 0, sock);
1931 
1932 	rc = kevent(group->fd, &event, 1, NULL, 0, &ts);
1933 #endif
1934 
1935 	if (rc != 0) {
1936 		return rc;
1937 	}
1938 
1939 	/* switched from another polling group due to scheduling */
1940 	if (spdk_unlikely(sock->recv_pipe != NULL  &&
1941 			  (spdk_pipe_reader_bytes_available(sock->recv_pipe) > 0))) {
1942 		sock->pipe_has_data = true;
1943 		sock->socket_has_data = false;
1944 		TAILQ_INSERT_TAIL(&group->socks_with_data, sock, link);
1945 	} else if (sock->recv_pipe != NULL) {
1946 		rc = spdk_pipe_group_add(group->pipe_group, sock->recv_pipe);
1947 		assert(rc == 0);
1948 	}
1949 
1950 	if (_sock->impl_opts.enable_placement_id == PLACEMENT_MARK) {
1951 		posix_sock_update_mark(_group, _sock);
1952 	} else if (sock->placement_id != -1) {
1953 		rc = spdk_sock_map_insert(&g_map, sock->placement_id, &group->base);
1954 		if (rc != 0) {
1955 			SPDK_ERRLOG("Failed to insert sock group into map: %d\n", rc);
1956 			/* Do not treat this as an error. The system will continue running. */
1957 		}
1958 	}
1959 
1960 	return rc;
1961 }
1962 
1963 static int
1964 posix_sock_group_impl_remove_sock(struct spdk_sock_group_impl *_group, struct spdk_sock *_sock)
1965 {
1966 	struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group);
1967 	struct spdk_posix_sock *sock = __posix_sock(_sock);
1968 	int rc;
1969 
1970 	if (sock->pipe_has_data || sock->socket_has_data) {
1971 		TAILQ_REMOVE(&group->socks_with_data, sock, link);
1972 		sock->pipe_has_data = false;
1973 		sock->socket_has_data = false;
1974 	} else if (sock->recv_pipe != NULL) {
1975 		rc = spdk_pipe_group_remove(group->pipe_group, sock->recv_pipe);
1976 		assert(rc == 0);
1977 	}
1978 
1979 	if (sock->placement_id != -1) {
1980 		spdk_sock_map_release(&g_map, sock->placement_id);
1981 	}
1982 
1983 #if defined(SPDK_EPOLL)
1984 	struct epoll_event event;
1985 
1986 	/* Event parameter is ignored but some old kernel version still require it. */
1987 	rc = epoll_ctl(group->fd, EPOLL_CTL_DEL, sock->fd, &event);
1988 #elif defined(SPDK_KEVENT)
1989 	struct kevent event;
1990 	struct timespec ts = {0};
1991 
1992 	EV_SET(&event, sock->fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
1993 
1994 	rc = kevent(group->fd, &event, 1, NULL, 0, &ts);
1995 	if (rc == 0 && event.flags & EV_ERROR) {
1996 		rc = -1;
1997 		errno = event.data;
1998 	}
1999 #endif
2000 
2001 	spdk_sock_abort_requests(_sock);
2002 
2003 	return rc;
2004 }
2005 
2006 static int
2007 posix_sock_group_impl_poll(struct spdk_sock_group_impl *_group, int max_events,
2008 			   struct spdk_sock **socks)
2009 {
2010 	struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group);
2011 	struct spdk_sock *sock, *tmp;
2012 	int num_events, i, rc;
2013 	struct spdk_posix_sock *psock, *ptmp;
2014 #if defined(SPDK_EPOLL)
2015 	struct epoll_event events[MAX_EVENTS_PER_POLL];
2016 #elif defined(SPDK_KEVENT)
2017 	struct kevent events[MAX_EVENTS_PER_POLL];
2018 	struct timespec ts = {0};
2019 #endif
2020 
2021 #ifdef SPDK_ZEROCOPY
2022 	/* When all of the following conditions are met
2023 	 * - non-blocking socket
2024 	 * - zero copy is enabled
2025 	 * - interrupts suppressed (i.e. busy polling)
2026 	 * - the NIC tx queue is full at the time sendmsg() is called
2027 	 * - epoll_wait determines there is an EPOLLIN event for the socket
2028 	 * then we can get into a situation where data we've sent is queued
2029 	 * up in the kernel network stack, but interrupts have been suppressed
2030 	 * because other traffic is flowing so the kernel misses the signal
2031 	 * to flush the software tx queue. If there wasn't incoming data
2032 	 * pending on the socket, then epoll_wait would have been sufficient
2033 	 * to kick off the send operation, but since there is a pending event
2034 	 * epoll_wait does not trigger the necessary operation.
2035 	 *
2036 	 * We deal with this by checking for all of the above conditions and
2037 	 * additionally looking for EPOLLIN events that were not consumed from
2038 	 * the last poll loop. We take this to mean that the upper layer is
2039 	 * unable to consume them because it is blocked waiting for resources
2040 	 * to free up, and those resources are most likely freed in response
2041 	 * to a pending asynchronous write completing.
2042 	 *
2043 	 * Additionally, sockets that have the same placement_id actually share
2044 	 * an underlying hardware queue. That means polling one of them is
2045 	 * equivalent to polling all of them. As a quick mechanism to avoid
2046 	 * making extra poll() calls, stash the last placement_id during the loop
2047 	 * and only poll if it's not the same. The overwhelmingly common case
2048 	 * is that all sockets in this list have the same placement_id because
2049 	 * SPDK is intentionally grouping sockets by that value, so even
2050 	 * though this won't stop all extra calls to poll(), it's very fast
2051 	 * and will catch all of them in practice.
2052 	 */
2053 	int last_placement_id = -1;
2054 
2055 	TAILQ_FOREACH(psock, &group->socks_with_data, link) {
2056 		if (psock->zcopy && psock->placement_id >= 0 &&
2057 		    psock->placement_id != last_placement_id) {
2058 			struct pollfd pfd = {psock->fd, POLLIN | POLLERR, 0};
2059 
2060 			poll(&pfd, 1, 0);
2061 			last_placement_id = psock->placement_id;
2062 		}
2063 	}
2064 #endif
2065 
2066 	/* This must be a TAILQ_FOREACH_SAFE because while flushing,
2067 	 * a completion callback could remove the sock from the
2068 	 * group. */
2069 	TAILQ_FOREACH_SAFE(sock, &_group->socks, link, tmp) {
2070 		rc = _sock_flush(sock);
2071 		if (rc < 0 && errno != EAGAIN) {
2072 			spdk_sock_abort_requests(sock);
2073 		}
2074 	}
2075 
2076 	assert(max_events > 0);
2077 
2078 #if defined(SPDK_EPOLL)
2079 	num_events = epoll_wait(group->fd, events, max_events, 0);
2080 #elif defined(SPDK_KEVENT)
2081 	num_events = kevent(group->fd, NULL, 0, events, max_events, &ts);
2082 #endif
2083 
2084 	if (num_events == -1) {
2085 		return -1;
2086 	} else if (num_events == 0 && !TAILQ_EMPTY(&_group->socks)) {
2087 		sock = TAILQ_FIRST(&_group->socks);
2088 		psock = __posix_sock(sock);
2089 		/* poll() is called here to busy poll the queue associated with
2090 		 * first socket in list and potentially reap incoming data.
2091 		 */
2092 		if (sock->opts.priority) {
2093 			struct pollfd pfd = {0, 0, 0};
2094 
2095 			pfd.fd = psock->fd;
2096 			pfd.events = POLLIN | POLLERR;
2097 			poll(&pfd, 1, 0);
2098 		}
2099 	}
2100 
2101 	for (i = 0; i < num_events; i++) {
2102 #if defined(SPDK_EPOLL)
2103 		sock = events[i].data.ptr;
2104 		psock = __posix_sock(sock);
2105 
2106 #ifdef SPDK_ZEROCOPY
2107 		if (events[i].events & EPOLLERR) {
2108 			rc = _sock_check_zcopy(sock);
2109 			/* If the socket was closed or removed from
2110 			 * the group in response to a send ack, don't
2111 			 * add it to the array here. */
2112 			if (rc || sock->cb_fn == NULL) {
2113 				continue;
2114 			}
2115 		}
2116 #endif
2117 		if ((events[i].events & EPOLLIN) == 0) {
2118 			continue;
2119 		}
2120 
2121 #elif defined(SPDK_KEVENT)
2122 		sock = events[i].udata;
2123 		psock = __posix_sock(sock);
2124 #endif
2125 
2126 		/* If the socket is not already in the list, add it now */
2127 		if (!psock->socket_has_data && !psock->pipe_has_data) {
2128 			TAILQ_INSERT_TAIL(&group->socks_with_data, psock, link);
2129 		}
2130 		psock->socket_has_data = true;
2131 	}
2132 
2133 	num_events = 0;
2134 
2135 	TAILQ_FOREACH_SAFE(psock, &group->socks_with_data, link, ptmp) {
2136 		if (num_events == max_events) {
2137 			break;
2138 		}
2139 
2140 		/* If the socket's cb_fn is NULL, just remove it from the
2141 		 * list and do not add it to socks array */
2142 		if (spdk_unlikely(psock->base.cb_fn == NULL)) {
2143 			psock->socket_has_data = false;
2144 			psock->pipe_has_data = false;
2145 			TAILQ_REMOVE(&group->socks_with_data, psock, link);
2146 			continue;
2147 		}
2148 
2149 		socks[num_events++] = &psock->base;
2150 	}
2151 
2152 	/* Cycle the has_data list so that each time we poll things aren't
2153 	 * in the same order. Say we have 6 sockets in the list, named as follows:
2154 	 * A B C D E F
2155 	 * And all 6 sockets had epoll events, but max_events is only 3. That means
2156 	 * psock currently points at D. We want to rearrange the list to the following:
2157 	 * D E F A B C
2158 	 *
2159 	 * The variables below are named according to this example to make it easier to
2160 	 * follow the swaps.
2161 	 */
2162 	if (psock != NULL) {
2163 		struct spdk_posix_sock *pa, *pc, *pd, *pf;
2164 
2165 		/* Capture pointers to the elements we need */
2166 		pd = psock;
2167 		pc = TAILQ_PREV(pd, spdk_has_data_list, link);
2168 		pa = TAILQ_FIRST(&group->socks_with_data);
2169 		pf = TAILQ_LAST(&group->socks_with_data, spdk_has_data_list);
2170 
2171 		/* Break the link between C and D */
2172 		pc->link.tqe_next = NULL;
2173 
2174 		/* Connect F to A */
2175 		pf->link.tqe_next = pa;
2176 		pa->link.tqe_prev = &pf->link.tqe_next;
2177 
2178 		/* Fix up the list first/last pointers */
2179 		group->socks_with_data.tqh_first = pd;
2180 		group->socks_with_data.tqh_last = &pc->link.tqe_next;
2181 
2182 		/* D is in front of the list, make tqe prev pointer point to the head of list */
2183 		pd->link.tqe_prev = &group->socks_with_data.tqh_first;
2184 	}
2185 
2186 	return num_events;
2187 }
2188 
2189 static int
2190 posix_sock_group_impl_register_interrupt(struct spdk_sock_group_impl *_group, uint32_t events,
2191 		spdk_interrupt_fn fn, void *arg, const char *name)
2192 {
2193 	struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group);
2194 
2195 	group->intr = spdk_interrupt_register_for_events(group->fd, events, fn, arg, name);
2196 
2197 	return group->intr ? 0 : -1;
2198 }
2199 
2200 static void
2201 posix_sock_group_impl_unregister_interrupt(struct spdk_sock_group_impl *_group)
2202 {
2203 	struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group);
2204 
2205 	spdk_interrupt_unregister(&group->intr);
2206 }
2207 
2208 static int
2209 _sock_group_impl_close(struct spdk_sock_group_impl *_group, uint32_t enable_placement_id)
2210 {
2211 	struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group);
2212 	int rc;
2213 
2214 	if (enable_placement_id == PLACEMENT_CPU) {
2215 		spdk_sock_map_release(&g_map, spdk_env_get_current_core());
2216 	}
2217 
2218 	spdk_pipe_group_destroy(group->pipe_group);
2219 	rc = close(group->fd);
2220 	free(group);
2221 	return rc;
2222 }
2223 
2224 static int
2225 posix_sock_group_impl_close(struct spdk_sock_group_impl *_group)
2226 {
2227 	return _sock_group_impl_close(_group, g_posix_impl_opts.enable_placement_id);
2228 }
2229 
2230 static int
2231 ssl_sock_group_impl_close(struct spdk_sock_group_impl *_group)
2232 {
2233 	return _sock_group_impl_close(_group, g_ssl_impl_opts.enable_placement_id);
2234 }
2235 
2236 static struct spdk_net_impl g_posix_net_impl = {
2237 	.name		= "posix",
2238 	.getaddr	= posix_sock_getaddr,
2239 	.get_interface_name = posix_sock_get_interface_name,
2240 	.get_numa_id	= posix_sock_get_numa_id,
2241 	.connect	= posix_sock_connect,
2242 	.listen		= posix_sock_listen,
2243 	.accept		= posix_sock_accept,
2244 	.close		= posix_sock_close,
2245 	.recv		= posix_sock_recv,
2246 	.readv		= posix_sock_readv,
2247 	.writev		= posix_sock_writev,
2248 	.recv_next	= posix_sock_recv_next,
2249 	.writev_async	= posix_sock_writev_async,
2250 	.flush		= posix_sock_flush,
2251 	.set_recvlowat	= posix_sock_set_recvlowat,
2252 	.set_recvbuf	= posix_sock_set_recvbuf,
2253 	.set_sendbuf	= posix_sock_set_sendbuf,
2254 	.is_ipv6	= posix_sock_is_ipv6,
2255 	.is_ipv4	= posix_sock_is_ipv4,
2256 	.is_connected	= posix_sock_is_connected,
2257 	.group_impl_get_optimal	= posix_sock_group_impl_get_optimal,
2258 	.group_impl_create	= posix_sock_group_impl_create,
2259 	.group_impl_add_sock	= posix_sock_group_impl_add_sock,
2260 	.group_impl_remove_sock = posix_sock_group_impl_remove_sock,
2261 	.group_impl_poll	= posix_sock_group_impl_poll,
2262 	.group_impl_register_interrupt     = posix_sock_group_impl_register_interrupt,
2263 	.group_impl_unregister_interrupt  = posix_sock_group_impl_unregister_interrupt,
2264 	.group_impl_close	= posix_sock_group_impl_close,
2265 	.get_opts	= posix_sock_impl_get_opts,
2266 	.set_opts	= posix_sock_impl_set_opts,
2267 };
2268 
2269 SPDK_NET_IMPL_REGISTER_DEFAULT(posix, &g_posix_net_impl);
2270 
2271 static struct spdk_sock *
2272 ssl_sock_listen(const char *ip, int port, struct spdk_sock_opts *opts)
2273 {
2274 	return posix_sock_create(ip, port, SPDK_SOCK_CREATE_LISTEN, opts, true);
2275 }
2276 
2277 static struct spdk_sock *
2278 ssl_sock_connect(const char *ip, int port, struct spdk_sock_opts *opts)
2279 {
2280 	return posix_sock_create(ip, port, SPDK_SOCK_CREATE_CONNECT, opts, true);
2281 }
2282 
/*
 * Accept callback for the "ssl" implementation. Delegates to
 * _posix_sock_accept() with its second argument set to `true`
 * (presumably the SSL-enable flag -- confirm against _posix_sock_accept()).
 */
static struct spdk_sock *
ssl_sock_accept(struct spdk_sock *_sock)
{
	struct spdk_sock *accepted;

	accepted = _posix_sock_accept(_sock, true);

	return accepted;
}
2288 
2289 static struct spdk_net_impl g_ssl_net_impl = {
2290 	.name		= "ssl",
2291 	.getaddr	= posix_sock_getaddr,
2292 	.get_interface_name = posix_sock_get_interface_name,
2293 	.get_numa_id	= posix_sock_get_numa_id,
2294 	.connect	= ssl_sock_connect,
2295 	.listen		= ssl_sock_listen,
2296 	.accept		= ssl_sock_accept,
2297 	.close		= posix_sock_close,
2298 	.recv		= posix_sock_recv,
2299 	.readv		= posix_sock_readv,
2300 	.writev		= posix_sock_writev,
2301 	.recv_next	= posix_sock_recv_next,
2302 	.writev_async	= posix_sock_writev_async,
2303 	.flush		= posix_sock_flush,
2304 	.set_recvlowat	= posix_sock_set_recvlowat,
2305 	.set_recvbuf	= posix_sock_set_recvbuf,
2306 	.set_sendbuf	= posix_sock_set_sendbuf,
2307 	.is_ipv6	= posix_sock_is_ipv6,
2308 	.is_ipv4	= posix_sock_is_ipv4,
2309 	.is_connected	= posix_sock_is_connected,
2310 	.group_impl_get_optimal	= posix_sock_group_impl_get_optimal,
2311 	.group_impl_create	= ssl_sock_group_impl_create,
2312 	.group_impl_add_sock	= posix_sock_group_impl_add_sock,
2313 	.group_impl_remove_sock = posix_sock_group_impl_remove_sock,
2314 	.group_impl_poll	= posix_sock_group_impl_poll,
2315 	.group_impl_register_interrupt    = posix_sock_group_impl_register_interrupt,
2316 	.group_impl_unregister_interrupt  = posix_sock_group_impl_unregister_interrupt,
2317 	.group_impl_close	= ssl_sock_group_impl_close,
2318 	.get_opts	= ssl_sock_impl_get_opts,
2319 	.set_opts	= ssl_sock_impl_set_opts,
2320 };
2321 
2322 SPDK_NET_IMPL_REGISTER(ssl, &g_ssl_net_impl);
2323 SPDK_LOG_REGISTER_COMPONENT(sock_posix)
2324