xref: /spdk/module/sock/posix/posix.c (revision d33497d3f4a2b8f96e5764a91b516ba7ba11e316)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (c) Intel Corporation. All rights reserved.
3  *   Copyright (c) 2020, 2021 Mellanox Technologies LTD. All rights reserved.
4  *   Copyright (c) 2021 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
5  */
6 
7 #include "spdk/stdinc.h"
8 
9 #if defined(__FreeBSD__)
10 #include <sys/event.h>
11 #define SPDK_KEVENT
12 #else
13 #include <sys/epoll.h>
14 #define SPDK_EPOLL
15 #endif
16 
17 #if defined(__linux__)
18 #include <linux/errqueue.h>
19 #endif
20 
21 #include "spdk/env.h"
22 #include "spdk/log.h"
23 #include "spdk/pipe.h"
24 #include "spdk/sock.h"
25 #include "spdk/util.h"
26 #include "spdk/string.h"
27 #include "spdk_internal/sock.h"
28 #include "../sock_kernel.h"
29 
30 #include "openssl/crypto.h"
31 #include "openssl/err.h"
32 #include "openssl/ssl.h"
33 
34 #define MAX_TMPBUF 1024
35 #define PORTNUMLEN 32
36 
37 #if defined(SO_ZEROCOPY) && defined(MSG_ZEROCOPY)
38 #define SPDK_ZEROCOPY
39 #endif
40 
41 struct spdk_posix_sock {
42 	struct spdk_sock	base;
43 	int			fd;
44 
45 	uint32_t		sendmsg_idx;
46 
47 	struct spdk_pipe	*recv_pipe;
48 	void			*recv_buf;
49 	int			recv_buf_sz;
50 	bool			pipe_has_data;
51 	bool			socket_has_data;
52 	bool			zcopy;
53 
54 	int			placement_id;
55 
56 	SSL_CTX			*ctx;
57 	SSL			*ssl;
58 
59 	TAILQ_ENTRY(spdk_posix_sock)	link;
60 };
61 
62 TAILQ_HEAD(spdk_has_data_list, spdk_posix_sock);
63 
64 struct spdk_posix_sock_group_impl {
65 	struct spdk_sock_group_impl	base;
66 	int				fd;
67 	struct spdk_has_data_list	socks_with_data;
68 	int				placement_id;
69 };
70 
71 static struct spdk_sock_impl_opts g_spdk_posix_sock_impl_opts = {
72 	.recv_buf_size = MIN_SO_RCVBUF_SIZE,
73 	.send_buf_size = MIN_SO_SNDBUF_SIZE,
74 	.enable_recv_pipe = true,
75 	.enable_quickack = false,
76 	.enable_placement_id = PLACEMENT_NONE,
77 	.enable_zerocopy_send_server = true,
78 	.enable_zerocopy_send_client = false,
79 	.zerocopy_threshold = 0,
80 	.tls_version = 0,
81 	.enable_ktls = false
82 };
83 
84 static struct spdk_sock_map g_map = {
85 	.entries = STAILQ_HEAD_INITIALIZER(g_map.entries),
86 	.mtx = PTHREAD_MUTEX_INITIALIZER
87 };
88 
89 __attribute((destructor)) static void
90 posix_sock_map_cleanup(void)
91 {
92 	spdk_sock_map_cleanup(&g_map);
93 }
94 
/*
 * Downcast helpers from the generic sock / group types to the posix-specific
 * ones. Both the argument and the whole expansion are parenthesized so the
 * macros can be used inside larger expressions, e.g. __posix_sock(s)->fd.
 */
#define __posix_sock(sock) ((struct spdk_posix_sock *)(sock))
#define __posix_group_impl(group) ((struct spdk_posix_sock_group_impl *)(group))
97 
98 static void
99 posix_sock_copy_impl_opts(struct spdk_sock_impl_opts *dest, const struct spdk_sock_impl_opts *src,
100 			  size_t len)
101 {
102 #define FIELD_OK(field) \
103 	offsetof(struct spdk_sock_impl_opts, field) + sizeof(src->field) <= len
104 
105 #define SET_FIELD(field) \
106 	if (FIELD_OK(field)) { \
107 		dest->field = src->field; \
108 	}
109 
110 	SET_FIELD(recv_buf_size);
111 	SET_FIELD(send_buf_size);
112 	SET_FIELD(enable_recv_pipe);
113 	SET_FIELD(enable_zerocopy_send);
114 	SET_FIELD(enable_quickack);
115 	SET_FIELD(enable_placement_id);
116 	SET_FIELD(enable_zerocopy_send_server);
117 	SET_FIELD(enable_zerocopy_send_client);
118 	SET_FIELD(zerocopy_threshold);
119 	SET_FIELD(tls_version);
120 	SET_FIELD(enable_ktls);
121 
122 #undef SET_FIELD
123 #undef FIELD_OK
124 }
125 
126 static int
127 posix_sock_impl_get_opts(struct spdk_sock_impl_opts *opts, size_t *len)
128 {
129 	if (!opts || !len) {
130 		errno = EINVAL;
131 		return -1;
132 	}
133 
134 	assert(sizeof(*opts) >= *len);
135 	memset(opts, 0, *len);
136 
137 	posix_sock_copy_impl_opts(opts, &g_spdk_posix_sock_impl_opts, *len);
138 	*len = spdk_min(*len, sizeof(g_spdk_posix_sock_impl_opts));
139 
140 	return 0;
141 }
142 
143 static int
144 posix_sock_impl_set_opts(const struct spdk_sock_impl_opts *opts, size_t len)
145 {
146 	if (!opts) {
147 		errno = EINVAL;
148 		return -1;
149 	}
150 
151 	assert(sizeof(*opts) >= len);
152 	posix_sock_copy_impl_opts(&g_spdk_posix_sock_impl_opts, opts, len);
153 
154 	return 0;
155 }
156 
157 static void
158 posix_opts_get_impl_opts(const struct spdk_sock_opts *opts, struct spdk_sock_impl_opts *dest)
159 {
160 	/* Copy the default impl_opts first to cover cases when user's impl_opts is smaller */
161 	memcpy(dest, &g_spdk_posix_sock_impl_opts, sizeof(*dest));
162 
163 	if (opts->impl_opts != NULL) {
164 		assert(sizeof(*dest) >= opts->impl_opts_size);
165 		posix_sock_copy_impl_opts(dest, opts->impl_opts, opts->impl_opts_size);
166 	}
167 }
168 
169 static int
170 posix_sock_getaddr(struct spdk_sock *_sock, char *saddr, int slen, uint16_t *sport,
171 		   char *caddr, int clen, uint16_t *cport)
172 {
173 	struct spdk_posix_sock *sock = __posix_sock(_sock);
174 	struct sockaddr_storage sa;
175 	socklen_t salen;
176 	int rc;
177 
178 	assert(sock != NULL);
179 
180 	memset(&sa, 0, sizeof sa);
181 	salen = sizeof sa;
182 	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
183 	if (rc != 0) {
184 		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
185 		return -1;
186 	}
187 
188 	switch (sa.ss_family) {
189 	case AF_UNIX:
190 		/* Acceptable connection types that don't have IPs */
191 		return 0;
192 	case AF_INET:
193 	case AF_INET6:
194 		/* Code below will get IP addresses */
195 		break;
196 	default:
197 		/* Unsupported socket family */
198 		return -1;
199 	}
200 
201 	rc = get_addr_str((struct sockaddr *)&sa, saddr, slen);
202 	if (rc != 0) {
203 		SPDK_ERRLOG("getnameinfo() failed (errno=%d)\n", errno);
204 		return -1;
205 	}
206 
207 	if (sport) {
208 		if (sa.ss_family == AF_INET) {
209 			*sport = ntohs(((struct sockaddr_in *) &sa)->sin_port);
210 		} else if (sa.ss_family == AF_INET6) {
211 			*sport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port);
212 		}
213 	}
214 
215 	memset(&sa, 0, sizeof sa);
216 	salen = sizeof sa;
217 	rc = getpeername(sock->fd, (struct sockaddr *) &sa, &salen);
218 	if (rc != 0) {
219 		SPDK_ERRLOG("getpeername() failed (errno=%d)\n", errno);
220 		return -1;
221 	}
222 
223 	rc = get_addr_str((struct sockaddr *)&sa, caddr, clen);
224 	if (rc != 0) {
225 		SPDK_ERRLOG("getnameinfo() failed (errno=%d)\n", errno);
226 		return -1;
227 	}
228 
229 	if (cport) {
230 		if (sa.ss_family == AF_INET) {
231 			*cport = ntohs(((struct sockaddr_in *) &sa)->sin_port);
232 		} else if (sa.ss_family == AF_INET6) {
233 			*cport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port);
234 		}
235 	}
236 
237 	return 0;
238 }
239 
/* Role requested from posix_sock_create() */
enum posix_sock_create_type {
	/* bind() + listen(): passive, server-side socket */
	SPDK_SOCK_CREATE_LISTEN,
	/* connect(): active, client-side socket */
	SPDK_SOCK_CREATE_CONNECT,
};
244 
245 static int
246 posix_sock_alloc_pipe(struct spdk_posix_sock *sock, int sz)
247 {
248 	uint8_t *new_buf;
249 	struct spdk_pipe *new_pipe;
250 	struct iovec siov[2];
251 	struct iovec diov[2];
252 	int sbytes;
253 	ssize_t bytes;
254 
255 	if (sock->recv_buf_sz == sz) {
256 		return 0;
257 	}
258 
259 	/* If the new size is 0, just free the pipe */
260 	if (sz == 0) {
261 		spdk_pipe_destroy(sock->recv_pipe);
262 		free(sock->recv_buf);
263 		sock->recv_pipe = NULL;
264 		sock->recv_buf = NULL;
265 		return 0;
266 	} else if (sz < MIN_SOCK_PIPE_SIZE) {
267 		SPDK_ERRLOG("The size of the pipe must be larger than %d\n", MIN_SOCK_PIPE_SIZE);
268 		return -1;
269 	}
270 
271 	/* Round up to next 64 byte multiple */
272 	new_buf = calloc(SPDK_ALIGN_CEIL(sz + 1, 64), sizeof(uint8_t));
273 	if (!new_buf) {
274 		SPDK_ERRLOG("socket recv buf allocation failed\n");
275 		return -ENOMEM;
276 	}
277 
278 	new_pipe = spdk_pipe_create(new_buf, sz + 1);
279 	if (new_pipe == NULL) {
280 		SPDK_ERRLOG("socket pipe allocation failed\n");
281 		free(new_buf);
282 		return -ENOMEM;
283 	}
284 
285 	if (sock->recv_pipe != NULL) {
286 		/* Pull all of the data out of the old pipe */
287 		sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov);
288 		if (sbytes > sz) {
289 			/* Too much data to fit into the new pipe size */
290 			spdk_pipe_destroy(new_pipe);
291 			free(new_buf);
292 			return -EINVAL;
293 		}
294 
295 		sbytes = spdk_pipe_writer_get_buffer(new_pipe, sz, diov);
296 		assert(sbytes == sz);
297 
298 		bytes = spdk_iovcpy(siov, 2, diov, 2);
299 		spdk_pipe_writer_advance(new_pipe, bytes);
300 
301 		spdk_pipe_destroy(sock->recv_pipe);
302 		free(sock->recv_buf);
303 	}
304 
305 	sock->recv_buf_sz = sz;
306 	sock->recv_buf = new_buf;
307 	sock->recv_pipe = new_pipe;
308 
309 	return 0;
310 }
311 
312 static int
313 posix_sock_set_recvbuf(struct spdk_sock *_sock, int sz)
314 {
315 	struct spdk_posix_sock *sock = __posix_sock(_sock);
316 	int rc;
317 
318 	assert(sock != NULL);
319 
320 	if (_sock->impl_opts.enable_recv_pipe) {
321 		rc = posix_sock_alloc_pipe(sock, sz);
322 		if (rc) {
323 			return rc;
324 		}
325 	}
326 
327 	/* Set kernel buffer size to be at least MIN_SO_RCVBUF_SIZE */
328 	if (sz < MIN_SO_RCVBUF_SIZE) {
329 		sz = MIN_SO_RCVBUF_SIZE;
330 	}
331 
332 	rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVBUF, &sz, sizeof(sz));
333 	if (rc < 0) {
334 		return rc;
335 	}
336 
337 	_sock->impl_opts.recv_buf_size = sz;
338 
339 	return 0;
340 }
341 
342 static int
343 posix_sock_set_sendbuf(struct spdk_sock *_sock, int sz)
344 {
345 	struct spdk_posix_sock *sock = __posix_sock(_sock);
346 	int rc;
347 
348 	assert(sock != NULL);
349 
350 	if (sz < MIN_SO_SNDBUF_SIZE) {
351 		sz = MIN_SO_SNDBUF_SIZE;
352 	}
353 
354 	rc = setsockopt(sock->fd, SOL_SOCKET, SO_SNDBUF, &sz, sizeof(sz));
355 	if (rc < 0) {
356 		return rc;
357 	}
358 
359 	_sock->impl_opts.send_buf_size = sz;
360 
361 	return 0;
362 }
363 
364 static void
365 posix_sock_init(struct spdk_posix_sock *sock, bool enable_zero_copy)
366 {
367 #if defined(SPDK_ZEROCOPY) || defined(__linux__)
368 	int flag;
369 	int rc;
370 #endif
371 
372 #if defined(SPDK_ZEROCOPY)
373 	flag = 1;
374 
375 	if (enable_zero_copy) {
376 		/* Try to turn on zero copy sends */
377 		rc = setsockopt(sock->fd, SOL_SOCKET, SO_ZEROCOPY, &flag, sizeof(flag));
378 		if (rc == 0) {
379 			sock->zcopy = true;
380 		}
381 	}
382 #endif
383 
384 #if defined(__linux__)
385 	flag = 1;
386 
387 	if (sock->base.impl_opts.enable_quickack) {
388 		rc = setsockopt(sock->fd, IPPROTO_TCP, TCP_QUICKACK, &flag, sizeof(flag));
389 		if (rc != 0) {
390 			SPDK_ERRLOG("quickack was failed to set\n");
391 		}
392 	}
393 
394 	spdk_sock_get_placement_id(sock->fd, sock->base.impl_opts.enable_placement_id,
395 				   &sock->placement_id);
396 
397 	if (sock->base.impl_opts.enable_placement_id == PLACEMENT_MARK) {
398 		/* Save placement_id */
399 		spdk_sock_map_insert(&g_map, sock->placement_id, NULL);
400 	}
401 #endif
402 }
403 
404 static struct spdk_posix_sock *
405 posix_sock_alloc(int fd, struct spdk_sock_impl_opts *impl_opts, bool enable_zero_copy)
406 {
407 	struct spdk_posix_sock *sock;
408 
409 	sock = calloc(1, sizeof(*sock));
410 	if (sock == NULL) {
411 		SPDK_ERRLOG("sock allocation failed\n");
412 		return NULL;
413 	}
414 
415 	sock->fd = fd;
416 	memcpy(&sock->base.impl_opts, impl_opts, sizeof(*impl_opts));
417 	posix_sock_init(sock, enable_zero_copy);
418 
419 	return sock;
420 }
421 
422 static int
423 posix_fd_create(struct addrinfo *res, struct spdk_sock_opts *opts,
424 		struct spdk_sock_impl_opts *impl_opts)
425 {
426 	int fd;
427 	int val = 1;
428 	int rc, sz;
429 #if defined(__linux__)
430 	int to;
431 #endif
432 
433 	fd = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
434 	if (fd < 0) {
435 		/* error */
436 		return -1;
437 	}
438 
439 	sz = impl_opts->recv_buf_size;
440 	rc = setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &sz, sizeof(sz));
441 	if (rc) {
442 		/* Not fatal */
443 	}
444 
445 	sz = impl_opts->send_buf_size;
446 	rc = setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &sz, sizeof(sz));
447 	if (rc) {
448 		/* Not fatal */
449 	}
450 
451 	rc = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof val);
452 	if (rc != 0) {
453 		close(fd);
454 		/* error */
455 		return -1;
456 	}
457 	rc = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof val);
458 	if (rc != 0) {
459 		close(fd);
460 		/* error */
461 		return -1;
462 	}
463 
464 #if defined(SO_PRIORITY)
465 	if (opts->priority) {
466 		rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &opts->priority, sizeof val);
467 		if (rc != 0) {
468 			close(fd);
469 			/* error */
470 			return -1;
471 		}
472 	}
473 #endif
474 
475 	if (res->ai_family == AF_INET6) {
476 		rc = setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &val, sizeof val);
477 		if (rc != 0) {
478 			close(fd);
479 			/* error */
480 			return -1;
481 		}
482 	}
483 
484 	if (opts->ack_timeout) {
485 #if defined(__linux__)
486 		to = opts->ack_timeout;
487 		rc = setsockopt(fd, IPPROTO_TCP, TCP_USER_TIMEOUT, &to, sizeof(to));
488 		if (rc != 0) {
489 			close(fd);
490 			/* error */
491 			return -1;
492 		}
493 #else
494 		SPDK_WARNLOG("TCP_USER_TIMEOUT is not supported.\n");
495 #endif
496 	}
497 
498 	return fd;
499 }
500 
/*
 * Hard-coded pre-shared key credentials used by both TLS PSK callbacks below.
 * PSK_KEY is a hex string that the callbacks decode with OPENSSL_hexstr2buf().
 * NOTE(review): compile-time constants like these look like placeholders -
 * confirm they are not meant to protect production traffic.
 */
#define PSK_ID  "nqn.2014-08.org.nvmexpress:uuid:f81d4fae-7dec-11d0-a765-00a0c91e6bf6"
#define PSK_KEY "1234567890ABCDEF"
503 
504 static unsigned int
505 posix_sock_tls_psk_server_cb(SSL *ssl,
506 			     const char *id,
507 			     unsigned char *psk,
508 			     unsigned int max_psk_len)
509 {
510 	long key_len;
511 	unsigned char *default_psk;
512 
513 	if (PSK_KEY == NULL) {
514 		SPDK_ERRLOG("PSK is not set\n");
515 		goto err;
516 	}
517 	SPDK_DEBUGLOG(sock_posix, "Length of Client's PSK ID %lu\n", strlen(PSK_ID));
518 	if (id == NULL) {
519 		SPDK_ERRLOG("Received empty PSK ID\n");
520 		goto err;
521 	}
522 	SPDK_DEBUGLOG(sock_posix,  "Received PSK ID '%s'\n", id);
523 	if (strcmp(PSK_ID, id) != 0) {
524 		SPDK_ERRLOG("Unknown Client's PSK ID\n");
525 		goto err;
526 	}
527 
528 	SPDK_DEBUGLOG(sock_posix, "Length of Client's PSK KEY %u\n", max_psk_len);
529 	default_psk = OPENSSL_hexstr2buf(PSK_KEY, &key_len);
530 	if (default_psk == NULL) {
531 		SPDK_ERRLOG("Could not unhexlify PSK\n");
532 		goto err;
533 	}
534 	if (key_len > max_psk_len) {
535 		SPDK_ERRLOG("Insufficient buffer size to copy PSK\n");
536 		goto err;
537 	}
538 
539 	memcpy(psk, default_psk, key_len);
540 
541 	return key_len;
542 
543 err:
544 	return 0;
545 }
546 
547 static unsigned int
548 posix_sock_tls_psk_client_cb(SSL *ssl, const char *hint,
549 			     char *identity,
550 			     unsigned int max_identity_len,
551 			     unsigned char *psk,
552 			     unsigned int max_psk_len)
553 {
554 	long key_len;
555 	unsigned char *default_psk;
556 
557 	if (hint) {
558 		SPDK_DEBUGLOG(sock_posix,  "Received PSK identity hint '%s'\n", hint);
559 	}
560 
561 	if (PSK_KEY == NULL) {
562 		SPDK_ERRLOG("PSK is not set\n");
563 		goto err;
564 	}
565 	default_psk = OPENSSL_hexstr2buf(PSK_KEY, &key_len);
566 	if (default_psk == NULL) {
567 		SPDK_ERRLOG("Could not unhexlify PSK\n");
568 		goto err;
569 	}
570 	if ((strlen(PSK_ID) + 1 > max_identity_len)
571 	    || (key_len > max_psk_len)) {
572 		SPDK_ERRLOG("PSK ID or Key buffer is not sufficient\n");
573 		goto err;
574 	}
575 	spdk_strcpy_pad(identity, PSK_ID, strlen(PSK_ID), 0);
576 	SPDK_DEBUGLOG(sock_posix, "Sending PSK identity '%s'\n", identity);
577 
578 	memcpy(psk, default_psk, key_len);
579 	SPDK_DEBUGLOG(sock_posix, "Provided out-of-band (OOB) PSK for TLS1.3 client\n");
580 
581 	return key_len;
582 
583 err:
584 	return 0;
585 }
586 
587 static SSL_CTX *
588 posix_sock_create_ssl_context(const SSL_METHOD *method, struct spdk_sock_opts *opts,
589 			      struct spdk_sock_impl_opts *impl_opts)
590 {
591 	SSL_CTX *ctx;
592 	int tls_version = 0;
593 	bool ktls_enabled = false;
594 #ifdef SSL_OP_ENABLE_KTLS
595 	long options;
596 #endif
597 
598 	SSL_library_init();
599 	OpenSSL_add_all_algorithms();
600 	SSL_load_error_strings();
601 	/* Produce a SSL CTX in SSL V2 and V3 standards compliant way */
602 	ctx = SSL_CTX_new(method);
603 	if (!ctx) {
604 		SPDK_ERRLOG("SSL_CTX_new() failed, msg = %s\n", ERR_error_string(ERR_peek_last_error(), NULL));
605 		return NULL;
606 	}
607 	SPDK_DEBUGLOG(sock_posix, "SSL context created\n");
608 
609 	switch (impl_opts->tls_version) {
610 	case 0:
611 		/* auto-negotioation */
612 		break;
613 	case SPDK_TLS_VERSION_1_1:
614 		tls_version = TLS1_1_VERSION;
615 		break;
616 	case SPDK_TLS_VERSION_1_2:
617 		tls_version = TLS1_2_VERSION;
618 		break;
619 	case SPDK_TLS_VERSION_1_3:
620 		tls_version = TLS1_3_VERSION;
621 		break;
622 	default:
623 		SPDK_ERRLOG("Incorrect TLS version provided: %d\n", impl_opts->tls_version);
624 		goto err;
625 	}
626 
627 	if (tls_version) {
628 		SPDK_DEBUGLOG(sock_posix, "Hardening TLS version to '%d'='0x%X'\n", impl_opts->tls_version,
629 			      tls_version);
630 		if (!SSL_CTX_set_min_proto_version(ctx, tls_version)) {
631 			SPDK_ERRLOG("Unable to set Min TLS version to '%d'='0x%X\n", impl_opts->tls_version, tls_version);
632 			goto err;
633 		}
634 		if (!SSL_CTX_set_max_proto_version(ctx, tls_version)) {
635 			SPDK_ERRLOG("Unable to set Max TLS version to '%d'='0x%X\n", impl_opts->tls_version, tls_version);
636 			goto err;
637 		}
638 	}
639 	if (impl_opts->enable_ktls) {
640 		SPDK_DEBUGLOG(sock_posix, "Enabling kTLS offload\n");
641 #ifdef SSL_OP_ENABLE_KTLS
642 		options = SSL_CTX_set_options(ctx, SSL_OP_ENABLE_KTLS);
643 		ktls_enabled = options & SSL_OP_ENABLE_KTLS;
644 #else
645 		ktls_enabled = false;
646 #endif
647 		if (!ktls_enabled) {
648 			SPDK_ERRLOG("Unable to set kTLS offload via SSL_CTX_set_options(). Configure openssl with 'enable-ktls'\n");
649 			goto err;
650 		}
651 	}
652 
653 	return ctx;
654 
655 err:
656 	SSL_CTX_free(ctx);
657 	return NULL;
658 }
659 
660 static SSL *
661 ssl_sock_connect_loop(SSL_CTX *ctx, int fd)
662 {
663 	int rc;
664 	SSL *ssl;
665 	int ssl_get_error;
666 
667 	ssl = SSL_new(ctx);
668 	if (!ssl) {
669 		SPDK_ERRLOG("SSL_new() failed, msg = %s\n", ERR_error_string(ERR_peek_last_error(), NULL));
670 		return NULL;
671 	}
672 	SSL_set_fd(ssl, fd);
673 	SSL_set_psk_client_callback(ssl, posix_sock_tls_psk_client_cb);
674 	SPDK_DEBUGLOG(sock_posix, "SSL object creation finished: %p\n", ssl);
675 	SPDK_DEBUGLOG(sock_posix, "%s = SSL_state_string_long(%p)\n", SSL_state_string_long(ssl), ssl);
676 	while ((rc = SSL_connect(ssl)) != 1) {
677 		SPDK_DEBUGLOG(sock_posix, "%s = SSL_state_string_long(%p)\n", SSL_state_string_long(ssl), ssl);
678 		ssl_get_error = SSL_get_error(ssl, rc);
679 		SPDK_DEBUGLOG(sock_posix, "SSL_connect failed %d = SSL_connect(%p), %d = SSL_get_error(%p, %d)\n",
680 			      rc, ssl, ssl_get_error, ssl, rc);
681 		switch (ssl_get_error) {
682 		case SSL_ERROR_WANT_READ:
683 		case SSL_ERROR_WANT_WRITE:
684 			continue;
685 		default:
686 			break;
687 		}
688 		SPDK_ERRLOG("SSL_connect() failed, errno = %d\n", errno);
689 		SSL_free(ssl);
690 		return NULL;
691 	}
692 	SPDK_DEBUGLOG(sock_posix, "%s = SSL_state_string_long(%p)\n", SSL_state_string_long(ssl), ssl);
693 	SPDK_DEBUGLOG(sock_posix, "Negotiated Cipher suite:%s\n",
694 		      SSL_CIPHER_get_name(SSL_get_current_cipher(ssl)));
695 	return ssl;
696 }
697 
698 static SSL *
699 ssl_sock_accept_loop(SSL_CTX *ctx, int fd)
700 {
701 	int rc;
702 	SSL *ssl;
703 	int ssl_get_error;
704 
705 	ssl = SSL_new(ctx);
706 	if (!ssl) {
707 		SPDK_ERRLOG("SSL_new() failed, msg = %s\n", ERR_error_string(ERR_peek_last_error(), NULL));
708 		return NULL;
709 	}
710 	SSL_set_fd(ssl, fd);
711 	SSL_set_psk_server_callback(ssl, posix_sock_tls_psk_server_cb);
712 	SPDK_DEBUGLOG(sock_posix, "SSL object creation finished: %p\n", ssl);
713 	SPDK_DEBUGLOG(sock_posix, "%s = SSL_state_string_long(%p)\n", SSL_state_string_long(ssl), ssl);
714 	while ((rc = SSL_accept(ssl)) != 1) {
715 		SPDK_DEBUGLOG(sock_posix, "%s = SSL_state_string_long(%p)\n", SSL_state_string_long(ssl), ssl);
716 		ssl_get_error = SSL_get_error(ssl, rc);
717 		SPDK_DEBUGLOG(sock_posix, "SSL_accept failed %d = SSL_accept(%p), %d = SSL_get_error(%p, %d)\n", rc,
718 			      ssl, ssl_get_error, ssl, rc);
719 		switch (ssl_get_error) {
720 		case SSL_ERROR_WANT_READ:
721 		case SSL_ERROR_WANT_WRITE:
722 			continue;
723 		default:
724 			break;
725 		}
726 		SPDK_ERRLOG("SSL_accept() failed, errno = %d\n", errno);
727 		SSL_free(ssl);
728 		return NULL;
729 	}
730 	SPDK_DEBUGLOG(sock_posix, "%s = SSL_state_string_long(%p)\n", SSL_state_string_long(ssl), ssl);
731 	SPDK_DEBUGLOG(sock_posix, "Negotiated Cipher suite:%s\n",
732 		      SSL_CIPHER_get_name(SSL_get_current_cipher(ssl)));
733 	return ssl;
734 }
735 
736 static ssize_t
737 SSL_readv(SSL *ssl, const struct iovec *iov, int iovcnt)
738 {
739 	int i, rc = 0;
740 	ssize_t total = 0;
741 
742 	for (i = 0; i < iovcnt; i++) {
743 		rc = SSL_read(ssl, iov[i].iov_base, iov[i].iov_len);
744 
745 		if (rc > 0) {
746 			total += rc;
747 		}
748 		if (rc != (int)iov[i].iov_len) {
749 			break;
750 		}
751 	}
752 	if (total > 0) {
753 		errno = 0;
754 		return total;
755 	}
756 	switch (SSL_get_error(ssl, rc)) {
757 	case SSL_ERROR_ZERO_RETURN:
758 		errno = ENOTCONN;
759 		return 0;
760 	case SSL_ERROR_WANT_READ:
761 	case SSL_ERROR_WANT_WRITE:
762 	case SSL_ERROR_WANT_CONNECT:
763 	case SSL_ERROR_WANT_ACCEPT:
764 	case SSL_ERROR_WANT_X509_LOOKUP:
765 	case SSL_ERROR_WANT_ASYNC:
766 	case SSL_ERROR_WANT_ASYNC_JOB:
767 	case SSL_ERROR_WANT_CLIENT_HELLO_CB:
768 		errno = EAGAIN;
769 		return -1;
770 	case SSL_ERROR_SYSCALL:
771 	case SSL_ERROR_SSL:
772 		errno = ENOTCONN;
773 		return -1;
774 	default:
775 		errno = ENOTCONN;
776 		return -1;
777 	}
778 }
779 
780 static ssize_t
781 SSL_writev(SSL *ssl, struct iovec *iov, int iovcnt)
782 {
783 	int i, rc = 0;
784 	ssize_t total = 0;
785 
786 	for (i = 0; i < iovcnt; i++) {
787 		rc = SSL_write(ssl, iov[i].iov_base, iov[i].iov_len);
788 
789 		if (rc > 0) {
790 			total += rc;
791 		}
792 		if (rc != (int)iov[i].iov_len) {
793 			break;
794 		}
795 	}
796 	if (total > 0) {
797 		errno = 0;
798 		return total;
799 	}
800 	switch (SSL_get_error(ssl, rc)) {
801 	case SSL_ERROR_ZERO_RETURN:
802 		errno = ENOTCONN;
803 		return 0;
804 	case SSL_ERROR_WANT_READ:
805 	case SSL_ERROR_WANT_WRITE:
806 	case SSL_ERROR_WANT_CONNECT:
807 	case SSL_ERROR_WANT_ACCEPT:
808 	case SSL_ERROR_WANT_X509_LOOKUP:
809 	case SSL_ERROR_WANT_ASYNC:
810 	case SSL_ERROR_WANT_ASYNC_JOB:
811 	case SSL_ERROR_WANT_CLIENT_HELLO_CB:
812 		errno = EAGAIN;
813 		return -1;
814 	case SSL_ERROR_SYSCALL:
815 	case SSL_ERROR_SSL:
816 		errno = ENOTCONN;
817 		return -1;
818 	default:
819 		errno = ENOTCONN;
820 		return -1;
821 	}
822 }
823 
/*
 * Create a listening or connected socket for ip:port.
 *
 * ip must be a numeric IPv4/IPv6 address, optionally wrapped in brackets
 * ("[::1]"). Each address returned by getaddrinfo() is tried in turn until
 * one can be bound/listened on (SPDK_SOCK_CREATE_LISTEN) or connected to
 * (SPDK_SOCK_CREATE_CONNECT). With enable_ssl, an SSL context is created and,
 * for client sockets, the TLS handshake is completed before returning. The
 * resulting descriptor is switched to non-blocking mode.
 *
 * Returns the generic socket on success, NULL on failure. On every failure
 * path the descriptor and any SSL context / SSL object created along the way
 * are released.
 */
static struct spdk_sock *
posix_sock_create(const char *ip, int port,
		  enum posix_sock_create_type type,
		  struct spdk_sock_opts *opts,
		  bool enable_ssl)
{
	struct spdk_posix_sock *sock;
	struct spdk_sock_impl_opts impl_opts;
	char buf[MAX_TMPBUF];
	char portnum[PORTNUMLEN];
	char *p;
	struct addrinfo hints, *res, *res0;
	int fd, flag;
	int rc;
	int saved_errno;
	bool enable_zcopy_user_opts = true;
	bool enable_zcopy_impl_opts = true;
	SSL_CTX *ctx = NULL;
	SSL *ssl = NULL;

	assert(opts != NULL);
	posix_opts_get_impl_opts(opts, &impl_opts);

	if (ip == NULL) {
		return NULL;
	}
	if (ip[0] == '[') {
		/* Strip the brackets of an "[addr]" literal */
		snprintf(buf, sizeof(buf), "%s", ip + 1);
		p = strchr(buf, ']');
		if (p != NULL) {
			*p = '\0';
		}
		ip = (const char *) &buf[0];
	}

	snprintf(portnum, sizeof portnum, "%d", port);
	memset(&hints, 0, sizeof hints);
	hints.ai_family = PF_UNSPEC;
	hints.ai_socktype = SOCK_STREAM;
	hints.ai_flags = AI_NUMERICSERV;
	hints.ai_flags |= AI_PASSIVE;
	hints.ai_flags |= AI_NUMERICHOST;
	rc = getaddrinfo(ip, portnum, &hints, &res0);
	if (rc != 0) {
		SPDK_ERRLOG("getaddrinfo() failed %s (%d)\n", gai_strerror(rc), rc);
		return NULL;
	}

	/* try listen */
	fd = -1;
	for (res = res0; res != NULL; res = res->ai_next) {
retry:
		fd = posix_fd_create(res, opts, &impl_opts);
		if (fd < 0) {
			continue;
		}
		if (type == SPDK_SOCK_CREATE_LISTEN) {
			if (enable_ssl) {
				ctx = posix_sock_create_ssl_context(TLS_server_method(), opts, &impl_opts);
				if (!ctx) {
					SPDK_ERRLOG("posix_sock_create_ssl_context() failed, errno = %d\n", errno);
					close(fd);
					fd = -1;
					break;
				}
			}
			rc = bind(fd, res->ai_addr, res->ai_addrlen);
			if (rc != 0) {
				/* Capture errno before logging gets a chance to change it */
				saved_errno = errno;
				SPDK_ERRLOG("bind() failed at port %d, errno = %d\n", port, saved_errno);
				if (saved_errno == EADDRNOTAVAIL) {
					SPDK_ERRLOG("IP address %s not available. "
						    "Verify IP address in config file "
						    "and make sure setup script is "
						    "run before starting spdk app.\n", ip);
				}
				/* A new context is created on every attempt, so release this
				 * one before retrying or moving to the next address. */
				SSL_CTX_free(ctx);
				ctx = NULL;
				close(fd);
				fd = -1;
				if (saved_errno == EINTR) {
					/* interrupted? */
					goto retry;
				}
				/* try next family */
				continue;
			}
			/* bind OK */
			rc = listen(fd, 512);
			if (rc != 0) {
				SPDK_ERRLOG("listen() failed, errno = %d\n", errno);
				close(fd);
				fd = -1;
				break;
			}
			enable_zcopy_impl_opts = impl_opts.enable_zerocopy_send_server;
		} else if (type == SPDK_SOCK_CREATE_CONNECT) {
			rc = connect(fd, res->ai_addr, res->ai_addrlen);
			if (rc != 0) {
				SPDK_ERRLOG("connect() failed, errno = %d\n", errno);
				/* try next family */
				close(fd);
				fd = -1;
				continue;
			}
			enable_zcopy_impl_opts = impl_opts.enable_zerocopy_send_client;
			if (enable_ssl) {
				ctx = posix_sock_create_ssl_context(TLS_client_method(), opts, &impl_opts);
				if (!ctx) {
					SPDK_ERRLOG("posix_sock_create_ssl_context() failed, errno = %d\n", errno);
					close(fd);
					fd = -1;
					break;
				}
				ssl = ssl_sock_connect_loop(ctx, fd);
				if (!ssl) {
					SPDK_ERRLOG("ssl_sock_connect_loop() failed, errno = %d\n", errno);
					close(fd);
					fd = -1;
					/* ctx is released by the common failure path below */
					break;
				}
			}
		}

		flag = fcntl(fd, F_GETFL);
		if (flag < 0 || fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0) {
			SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno);
			close(fd);
			fd = -1;
			break;
		}
		break;
	}
	freeaddrinfo(res0);

	if (fd < 0) {
		/* Common failure path: drop whatever TLS state was created. Both calls
		 * accept NULL. */
		SSL_free(ssl);
		SSL_CTX_free(ctx);
		return NULL;
	}

	/* Only enable zero copy for non-loopback and non-ssl sockets. */
	enable_zcopy_user_opts = opts->zcopy && !sock_is_loopback(fd) && !enable_ssl;

	sock = posix_sock_alloc(fd, &impl_opts, enable_zcopy_user_opts && enable_zcopy_impl_opts);
	if (sock == NULL) {
		SPDK_ERRLOG("sock allocation failed\n");
		SSL_free(ssl);
		SSL_CTX_free(ctx);
		close(fd);
		return NULL;
	}

	/* Ownership of the TLS state moves to the socket object */
	sock->ctx = ctx;
	sock->ssl = ssl;

	return &sock->base;
}
983 
984 static struct spdk_sock *
985 posix_sock_listen(const char *ip, int port, struct spdk_sock_opts *opts)
986 {
987 	return posix_sock_create(ip, port, SPDK_SOCK_CREATE_LISTEN, opts, false);
988 }
989 
990 static struct spdk_sock *
991 posix_sock_connect(const char *ip, int port, struct spdk_sock_opts *opts)
992 {
993 	return posix_sock_create(ip, port, SPDK_SOCK_CREATE_CONNECT, opts, false);
994 }
995 
996 static struct spdk_sock *
997 posix_sock_accept(struct spdk_sock *_sock)
998 {
999 	struct spdk_posix_sock		*sock = __posix_sock(_sock);
1000 	struct sockaddr_storage		sa;
1001 	socklen_t			salen;
1002 	int				rc, fd;
1003 	struct spdk_posix_sock		*new_sock;
1004 	int				flag;
1005 	SSL *ssl = 0;
1006 
1007 	memset(&sa, 0, sizeof(sa));
1008 	salen = sizeof(sa);
1009 
1010 	assert(sock != NULL);
1011 
1012 	rc = accept(sock->fd, (struct sockaddr *)&sa, &salen);
1013 
1014 	if (rc == -1) {
1015 		return NULL;
1016 	}
1017 
1018 	fd = rc;
1019 
1020 	flag = fcntl(fd, F_GETFL);
1021 	if ((!(flag & O_NONBLOCK)) && (fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0)) {
1022 		SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno);
1023 		close(fd);
1024 		return NULL;
1025 	}
1026 
1027 #if defined(SO_PRIORITY)
1028 	/* The priority is not inherited, so call this function again */
1029 	if (sock->base.opts.priority) {
1030 		rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &sock->base.opts.priority, sizeof(int));
1031 		if (rc != 0) {
1032 			close(fd);
1033 			return NULL;
1034 		}
1035 	}
1036 #endif
1037 
1038 	/* Establish SSL connection */
1039 	if (sock->ctx) {
1040 		ssl = ssl_sock_accept_loop(sock->ctx, fd);
1041 		if (!ssl) {
1042 			SPDK_ERRLOG("ssl_sock_accept_loop() failed, errno = %d\n", errno);
1043 			close(fd);
1044 			SSL_CTX_free(sock->ctx);
1045 			return NULL;
1046 		}
1047 	}
1048 
1049 	/* Inherit the zero copy feature from the listen socket */
1050 	new_sock = posix_sock_alloc(fd, &sock->base.impl_opts, sock->zcopy);
1051 	if (new_sock == NULL) {
1052 		close(fd);
1053 		return NULL;
1054 	}
1055 
1056 	if (sock->ctx) {
1057 		new_sock->ctx = sock->ctx;
1058 	}
1059 
1060 	if (ssl) {
1061 		new_sock->ssl = ssl;
1062 	}
1063 
1064 	return &new_sock->base;
1065 }
1066 
1067 static int
1068 posix_sock_close(struct spdk_sock *_sock)
1069 {
1070 	struct spdk_posix_sock *sock = __posix_sock(_sock);
1071 
1072 	assert(TAILQ_EMPTY(&_sock->pending_reqs));
1073 
1074 	/* If the socket fails to close, the best choice is to
1075 	 * leak the fd but continue to free the rest of the sock
1076 	 * memory. */
1077 	close(sock->fd);
1078 
1079 	spdk_pipe_destroy(sock->recv_pipe);
1080 	free(sock->recv_buf);
1081 	free(sock);
1082 
1083 	return 0;
1084 }
1085 
#ifdef SPDK_ZEROCOPY
/*
 * Drain zero-copy completion notifications from the socket's error queue and
 * complete the pending requests they cover.
 *
 * Each notification carries an inclusive range [ee_info, ee_data] of send
 * indices; pending zero-copy requests whose internal.offset falls in that
 * range are completed, as are non-zero-copy requests queued in front of them.
 *
 * Returns 0 when the queue is drained or an unexpected message is seen, or
 * the negative value reported by spdk_sock_request_put().
 */
static int
_sock_check_zcopy(struct spdk_sock *sock)
{
	struct spdk_posix_sock *psock = __posix_sock(sock);
	struct msghdr msgh = {};
	uint8_t buf[sizeof(struct cmsghdr) + sizeof(struct sock_extended_err)];
	ssize_t rc;
	struct sock_extended_err *serr;
	struct cmsghdr *cm;
	uint32_t idx;
	struct spdk_sock_request *req, *treq;
	bool found;

	msgh.msg_control = buf;

	while (true) {
		/* recvmsg() overwrites msg_controllen with the length it actually
		 * stored, so restore the full buffer size before every call. */
		msgh.msg_controllen = sizeof(buf);
		rc = recvmsg(psock->fd, &msgh, MSG_ERRQUEUE);

		if (rc < 0) {
			if (errno == EWOULDBLOCK || errno == EAGAIN) {
				/* Error queue is empty: done */
				return 0;
			}

			if (!TAILQ_EMPTY(&sock->pending_reqs)) {
				SPDK_ERRLOG("Attempting to receive from ERRQUEUE yielded error, but pending list still has orphaned entries\n");
			} else {
				SPDK_WARNLOG("Recvmsg yielded an error!\n");
			}
			return 0;
		}

		cm = CMSG_FIRSTHDR(&msgh);
		if (!(cm &&
		      ((cm->cmsg_level == SOL_IP && cm->cmsg_type == IP_RECVERR) ||
		       (cm->cmsg_level == SOL_IPV6 && cm->cmsg_type == IPV6_RECVERR)))) {
			SPDK_WARNLOG("Unexpected cmsg level or type!\n");
			return 0;
		}

		serr = (struct sock_extended_err *)CMSG_DATA(cm);
		if (serr->ee_errno != 0 || serr->ee_origin != SO_EE_ORIGIN_ZEROCOPY) {
			SPDK_WARNLOG("Unexpected extended error origin\n");
			return 0;
		}

		/* Most of the time, the pending_reqs array is in the exact
		 * order we need such that all of the requests to complete are
		 * in order, in the front. It is guaranteed that all requests
		 * belonging to the same sendmsg call are sequential, so once
		 * we encounter one match we can stop looping as soon as a
		 * non-match is found.
		 */
		/* NOTE(review): idx is 32-bit, so ee_data == UINT32_MAX would never end this loop - confirm it cannot occur */
		for (idx = serr->ee_info; idx <= serr->ee_data; idx++) {
			found = false;
			TAILQ_FOREACH_SAFE(req, &sock->pending_reqs, internal.link, treq) {
				if (!req->internal.is_zcopy) {
					/* This wasn't a zcopy request. It was just waiting in line to complete */
					rc = spdk_sock_request_put(sock, req, 0);
					if (rc < 0) {
						return rc;
					}
				} else if (req->internal.offset == idx) {
					found = true;
					rc = spdk_sock_request_put(sock, req, 0);
					if (rc < 0) {
						return rc;
					}
				} else if (found) {
					break;
				}
			}
		}
	}

	return 0;
}
#endif
1165 
1166 static int
1167 _sock_flush(struct spdk_sock *sock)
1168 {
1169 	struct spdk_posix_sock *psock = __posix_sock(sock);
1170 	struct msghdr msg = {};
1171 	int flags;
1172 	struct iovec iovs[IOV_BATCH_SIZE];
1173 	int iovcnt;
1174 	int retval;
1175 	struct spdk_sock_request *req;
1176 	int i;
1177 	ssize_t rc;
1178 	unsigned int offset;
1179 	size_t len;
1180 	bool is_zcopy = false;
1181 
1182 	/* Can't flush from within a callback or we end up with recursive calls */
1183 	if (sock->cb_cnt > 0) {
1184 		return 0;
1185 	}
1186 
1187 #ifdef SPDK_ZEROCOPY
1188 	if (psock->zcopy) {
1189 		flags = MSG_ZEROCOPY | MSG_NOSIGNAL;
1190 	} else
1191 #endif
1192 	{
1193 		flags = MSG_NOSIGNAL;
1194 	}
1195 
1196 	iovcnt = spdk_sock_prep_reqs(sock, iovs, 0, NULL, &flags);
1197 	if (iovcnt == 0) {
1198 		return 0;
1199 	}
1200 
1201 #ifdef SPDK_ZEROCOPY
1202 	is_zcopy = flags & MSG_ZEROCOPY;
1203 #endif
1204 
1205 	/* Perform the vectored write */
1206 	msg.msg_iov = iovs;
1207 	msg.msg_iovlen = iovcnt;
1208 
1209 	if (psock->ssl) {
1210 		rc = SSL_writev(psock->ssl, iovs, iovcnt);
1211 	} else {
1212 		rc = sendmsg(psock->fd, &msg, flags);
1213 	}
1214 	if (rc <= 0) {
1215 		if (errno == EAGAIN || errno == EWOULDBLOCK || (errno == ENOBUFS && psock->zcopy)) {
1216 			return 0;
1217 		}
1218 		return rc;
1219 	}
1220 
1221 	if (is_zcopy) {
1222 		/* Handling overflow case, because we use psock->sendmsg_idx - 1 for the
1223 		 * req->internal.offset, so sendmsg_idx should not be zero  */
1224 		if (spdk_unlikely(psock->sendmsg_idx == UINT32_MAX)) {
1225 			psock->sendmsg_idx = 1;
1226 		} else {
1227 			psock->sendmsg_idx++;
1228 		}
1229 	}
1230 
1231 	/* Consume the requests that were actually written */
1232 	req = TAILQ_FIRST(&sock->queued_reqs);
1233 	while (req) {
1234 		offset = req->internal.offset;
1235 
1236 		/* req->internal.is_zcopy is true when the whole req or part of it is sent with zerocopy */
1237 		req->internal.is_zcopy = is_zcopy;
1238 
1239 		for (i = 0; i < req->iovcnt; i++) {
1240 			/* Advance by the offset first */
1241 			if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) {
1242 				offset -= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len;
1243 				continue;
1244 			}
1245 
1246 			/* Calculate the remaining length of this element */
1247 			len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset;
1248 
1249 			if (len > (size_t)rc) {
1250 				/* This element was partially sent. */
1251 				req->internal.offset += rc;
1252 				return 0;
1253 			}
1254 
1255 			offset = 0;
1256 			req->internal.offset += len;
1257 			rc -= len;
1258 		}
1259 
1260 		/* Handled a full request. */
1261 		spdk_sock_request_pend(sock, req);
1262 
1263 		if (!req->internal.is_zcopy && req == TAILQ_FIRST(&sock->pending_reqs)) {
1264 			/* The sendmsg syscall above isn't currently asynchronous,
1265 			* so it's already done. */
1266 			retval = spdk_sock_request_put(sock, req, 0);
1267 			if (retval) {
1268 				break;
1269 			}
1270 		} else {
1271 			/* Re-use the offset field to hold the sendmsg call index. The
1272 			 * index is 0 based, so subtract one here because we've already
1273 			 * incremented above. */
1274 			req->internal.offset = psock->sendmsg_idx - 1;
1275 		}
1276 
1277 		if (rc == 0) {
1278 			break;
1279 		}
1280 
1281 		req = TAILQ_FIRST(&sock->queued_reqs);
1282 	}
1283 
1284 	return 0;
1285 }
1286 
/*
 * Flush entry point of the net implementation: on zero copy sockets with
 * pending requests, run the zero copy completion check first (its return
 * value is ignored), then flush the queued writes.
 *
 * Returns the result of _sock_flush().
 */
static int
posix_sock_flush(struct spdk_sock *sock)
{
#ifdef SPDK_ZEROCOPY
	if (__posix_sock(sock)->zcopy && !TAILQ_EMPTY(&sock->pending_reqs)) {
		_sock_check_zcopy(sock);
	}
#endif

	return _sock_flush(sock);
}
1300 
1301 static ssize_t
1302 posix_sock_recv_from_pipe(struct spdk_posix_sock *sock, struct iovec *diov, int diovcnt)
1303 {
1304 	struct iovec siov[2];
1305 	int sbytes;
1306 	ssize_t bytes;
1307 	struct spdk_posix_sock_group_impl *group;
1308 
1309 	sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov);
1310 	if (sbytes < 0) {
1311 		errno = EINVAL;
1312 		return -1;
1313 	} else if (sbytes == 0) {
1314 		errno = EAGAIN;
1315 		return -1;
1316 	}
1317 
1318 	bytes = spdk_iovcpy(siov, 2, diov, diovcnt);
1319 
1320 	if (bytes == 0) {
1321 		/* The only way this happens is if diov is 0 length */
1322 		errno = EINVAL;
1323 		return -1;
1324 	}
1325 
1326 	spdk_pipe_reader_advance(sock->recv_pipe, bytes);
1327 
1328 	/* If we drained the pipe, mark it appropriately */
1329 	if (spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) {
1330 		assert(sock->pipe_has_data == true);
1331 
1332 		group = __posix_group_impl(sock->base.group_impl);
1333 		if (group && !sock->socket_has_data) {
1334 			TAILQ_REMOVE(&group->socks_with_data, sock, link);
1335 		}
1336 
1337 		sock->pipe_has_data = false;
1338 	}
1339 
1340 	return bytes;
1341 }
1342 
1343 static inline ssize_t
1344 posix_sock_read(struct spdk_posix_sock *sock)
1345 {
1346 	struct iovec iov[2];
1347 	int bytes_avail, bytes_recvd;
1348 	struct spdk_posix_sock_group_impl *group;
1349 
1350 	bytes_avail = spdk_pipe_writer_get_buffer(sock->recv_pipe, sock->recv_buf_sz, iov);
1351 
1352 	if (bytes_avail <= 0) {
1353 		return bytes_avail;
1354 	}
1355 
1356 	if (sock->ssl) {
1357 		bytes_recvd = SSL_readv(sock->ssl, iov, 2);
1358 	} else {
1359 		bytes_recvd = readv(sock->fd, iov, 2);
1360 	}
1361 
1362 	assert(sock->pipe_has_data == false);
1363 
1364 	if (bytes_recvd <= 0) {
1365 		/* Errors count as draining the socket data */
1366 		if (sock->base.group_impl && sock->socket_has_data) {
1367 			group = __posix_group_impl(sock->base.group_impl);
1368 			TAILQ_REMOVE(&group->socks_with_data, sock, link);
1369 		}
1370 
1371 		sock->socket_has_data = false;
1372 
1373 		return bytes_recvd;
1374 	}
1375 
1376 	spdk_pipe_writer_advance(sock->recv_pipe, bytes_recvd);
1377 
1378 #if DEBUG
1379 	if (sock->base.group_impl) {
1380 		assert(sock->socket_has_data == true);
1381 	}
1382 #endif
1383 
1384 	sock->pipe_has_data = true;
1385 	if (bytes_recvd < bytes_avail) {
1386 		/* We drained the kernel socket entirely. */
1387 		sock->socket_has_data = false;
1388 	}
1389 
1390 	return bytes_recvd;
1391 }
1392 
1393 static ssize_t
1394 posix_sock_readv(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
1395 {
1396 	struct spdk_posix_sock *sock = __posix_sock(_sock);
1397 	struct spdk_posix_sock_group_impl *group = __posix_group_impl(sock->base.group_impl);
1398 	int rc, i;
1399 	size_t len;
1400 
1401 	if (sock->recv_pipe == NULL) {
1402 		assert(sock->pipe_has_data == false);
1403 		if (group && sock->socket_has_data) {
1404 			sock->socket_has_data = false;
1405 			TAILQ_REMOVE(&group->socks_with_data, sock, link);
1406 		}
1407 		if (sock->ssl) {
1408 			return SSL_readv(sock->ssl, iov, iovcnt);
1409 		} else {
1410 			return readv(sock->fd, iov, iovcnt);
1411 		}
1412 	}
1413 
1414 	/* If the socket is not in a group, we must assume it always has
1415 	 * data waiting for us because it is not epolled */
1416 	if (!sock->pipe_has_data && (group == NULL || sock->socket_has_data)) {
1417 		/* If the user is receiving a sufficiently large amount of data,
1418 		 * receive directly to their buffers. */
1419 		len = 0;
1420 		for (i = 0; i < iovcnt; i++) {
1421 			len += iov[i].iov_len;
1422 		}
1423 
1424 		if (len >= MIN_SOCK_PIPE_SIZE) {
1425 			/* TODO: Should this detect if kernel socket is drained? */
1426 			if (sock->ssl) {
1427 				return SSL_readv(sock->ssl, iov, iovcnt);
1428 			} else {
1429 				return readv(sock->fd, iov, iovcnt);
1430 			}
1431 		}
1432 
1433 		/* Otherwise, do a big read into our pipe */
1434 		rc = posix_sock_read(sock);
1435 		if (rc <= 0) {
1436 			return rc;
1437 		}
1438 	}
1439 
1440 	return posix_sock_recv_from_pipe(sock, iov, iovcnt);
1441 }
1442 
/*
 * Receive into a single flat buffer by wrapping it in a one-element iovec
 * and delegating to posix_sock_readv().
 */
static ssize_t
posix_sock_recv(struct spdk_sock *sock, void *buf, size_t len)
{
	struct iovec single = {
		.iov_base = buf,
		.iov_len = len,
	};

	return posix_sock_readv(sock, &single, 1);
}
1453 
1454 static void
1455 posix_sock_readv_async(struct spdk_sock *sock, struct spdk_sock_request *req)
1456 {
1457 	req->cb_fn(req->cb_arg, -ENOTSUP);
1458 }
1459 
1460 static ssize_t
1461 posix_sock_writev(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
1462 {
1463 	struct spdk_posix_sock *sock = __posix_sock(_sock);
1464 	int rc;
1465 
1466 	/* In order to process a writev, we need to flush any asynchronous writes
1467 	 * first. */
1468 	rc = _sock_flush(_sock);
1469 	if (rc < 0) {
1470 		return rc;
1471 	}
1472 
1473 	if (!TAILQ_EMPTY(&_sock->queued_reqs)) {
1474 		/* We weren't able to flush all requests */
1475 		errno = EAGAIN;
1476 		return -1;
1477 	}
1478 
1479 	if (sock->ssl) {
1480 		return SSL_writev(sock->ssl, iov, iovcnt);
1481 	} else {
1482 		return writev(sock->fd, iov, iovcnt);
1483 	}
1484 }
1485 
1486 static void
1487 posix_sock_writev_async(struct spdk_sock *sock, struct spdk_sock_request *req)
1488 {
1489 	int rc;
1490 
1491 	spdk_sock_request_queue(sock, req);
1492 
1493 	/* If there are a sufficient number queued, just flush them out immediately. */
1494 	if (sock->queued_iovcnt >= IOV_BATCH_SIZE) {
1495 		rc = _sock_flush(sock);
1496 		if (rc) {
1497 			spdk_sock_abort_requests(sock);
1498 		}
1499 	}
1500 }
1501 
1502 static int
1503 posix_sock_set_recvlowat(struct spdk_sock *_sock, int nbytes)
1504 {
1505 	struct spdk_posix_sock *sock = __posix_sock(_sock);
1506 	int val;
1507 	int rc;
1508 
1509 	assert(sock != NULL);
1510 
1511 	val = nbytes;
1512 	rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVLOWAT, &val, sizeof val);
1513 	if (rc != 0) {
1514 		return -1;
1515 	}
1516 	return 0;
1517 }
1518 
1519 static bool
1520 posix_sock_is_ipv6(struct spdk_sock *_sock)
1521 {
1522 	struct spdk_posix_sock *sock = __posix_sock(_sock);
1523 	struct sockaddr_storage sa;
1524 	socklen_t salen;
1525 	int rc;
1526 
1527 	assert(sock != NULL);
1528 
1529 	memset(&sa, 0, sizeof sa);
1530 	salen = sizeof sa;
1531 	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
1532 	if (rc != 0) {
1533 		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
1534 		return false;
1535 	}
1536 
1537 	return (sa.ss_family == AF_INET6);
1538 }
1539 
1540 static bool
1541 posix_sock_is_ipv4(struct spdk_sock *_sock)
1542 {
1543 	struct spdk_posix_sock *sock = __posix_sock(_sock);
1544 	struct sockaddr_storage sa;
1545 	socklen_t salen;
1546 	int rc;
1547 
1548 	assert(sock != NULL);
1549 
1550 	memset(&sa, 0, sizeof sa);
1551 	salen = sizeof sa;
1552 	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
1553 	if (rc != 0) {
1554 		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
1555 		return false;
1556 	}
1557 
1558 	return (sa.ss_family == AF_INET);
1559 }
1560 
1561 static bool
1562 posix_sock_is_connected(struct spdk_sock *_sock)
1563 {
1564 	struct spdk_posix_sock *sock = __posix_sock(_sock);
1565 	uint8_t byte;
1566 	int rc;
1567 
1568 	rc = recv(sock->fd, &byte, 1, MSG_PEEK);
1569 	if (rc == 0) {
1570 		return false;
1571 	}
1572 
1573 	if (rc < 0) {
1574 		if (errno == EAGAIN || errno == EWOULDBLOCK) {
1575 			return true;
1576 		}
1577 
1578 		return false;
1579 	}
1580 
1581 	return true;
1582 }
1583 
1584 static struct spdk_sock_group_impl *
1585 posix_sock_group_impl_get_optimal(struct spdk_sock *_sock, struct spdk_sock_group_impl *hint)
1586 {
1587 	struct spdk_posix_sock *sock = __posix_sock(_sock);
1588 	struct spdk_sock_group_impl *group_impl;
1589 
1590 	if (sock->placement_id != -1) {
1591 		spdk_sock_map_lookup(&g_map, sock->placement_id, &group_impl, hint);
1592 		return group_impl;
1593 	}
1594 
1595 	return NULL;
1596 }
1597 
1598 static struct spdk_sock_group_impl *
1599 posix_sock_group_impl_create(void)
1600 {
1601 	struct spdk_posix_sock_group_impl *group_impl;
1602 	int fd;
1603 
1604 #if defined(SPDK_EPOLL)
1605 	fd = epoll_create1(0);
1606 #elif defined(SPDK_KEVENT)
1607 	fd = kqueue();
1608 #endif
1609 	if (fd == -1) {
1610 		return NULL;
1611 	}
1612 
1613 	group_impl = calloc(1, sizeof(*group_impl));
1614 	if (group_impl == NULL) {
1615 		SPDK_ERRLOG("group_impl allocation failed\n");
1616 		close(fd);
1617 		return NULL;
1618 	}
1619 
1620 	group_impl->fd = fd;
1621 	TAILQ_INIT(&group_impl->socks_with_data);
1622 	group_impl->placement_id = -1;
1623 
1624 	if (g_spdk_posix_sock_impl_opts.enable_placement_id == PLACEMENT_CPU) {
1625 		spdk_sock_map_insert(&g_map, spdk_env_get_current_core(), &group_impl->base);
1626 		group_impl->placement_id = spdk_env_get_current_core();
1627 	}
1628 
1629 	return &group_impl->base;
1630 }
1631 
/*
 * Tag the socket with `placement_id` through SO_MARK and enter the group
 * into the placement map under that id. The socket's placement_id field is
 * only updated when both steps succeed; failures are logged and are not
 * fatal. Does nothing on platforms that do not define SO_MARK.
 */
static void
posix_sock_mark(struct spdk_posix_sock_group_impl *group, struct spdk_posix_sock *sock,
		int placement_id)
{
#if defined(SO_MARK)
	int rc;

	if (setsockopt(sock->fd, SOL_SOCKET, SO_MARK,
		       &placement_id, sizeof(placement_id)) != 0) {
		/* Not fatal */
		SPDK_ERRLOG("Error setting SO_MARK\n");
		return;
	}

	rc = spdk_sock_map_insert(&g_map, placement_id, &group->base);
	if (rc == 0) {
		sock->placement_id = placement_id;
	} else {
		/* Not fatal */
		SPDK_ERRLOG("Failed to insert sock group into map: %d\n", rc);
	}
#endif
}
1657 
1658 static void
1659 posix_sock_update_mark(struct spdk_sock_group_impl *_group, struct spdk_sock *_sock)
1660 {
1661 	struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group);
1662 
1663 	if (group->placement_id == -1) {
1664 		group->placement_id = spdk_sock_map_find_free(&g_map);
1665 
1666 		/* If a free placement id is found, update existing sockets in this group */
1667 		if (group->placement_id != -1) {
1668 			struct spdk_sock  *sock, *tmp;
1669 
1670 			TAILQ_FOREACH_SAFE(sock, &_group->socks, link, tmp) {
1671 				posix_sock_mark(group, __posix_sock(sock), group->placement_id);
1672 			}
1673 		}
1674 	}
1675 
1676 	if (group->placement_id != -1) {
1677 		/*
1678 		 * group placement id is already determined for this poll group.
1679 		 * Mark socket with group's placement id.
1680 		 */
1681 		posix_sock_mark(group, __posix_sock(_sock), group->placement_id);
1682 	}
1683 }
1684 
1685 static int
1686 posix_sock_group_impl_add_sock(struct spdk_sock_group_impl *_group, struct spdk_sock *_sock)
1687 {
1688 	struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group);
1689 	struct spdk_posix_sock *sock = __posix_sock(_sock);
1690 	int rc;
1691 
1692 #if defined(SPDK_EPOLL)
1693 	struct epoll_event event;
1694 
1695 	memset(&event, 0, sizeof(event));
1696 	/* EPOLLERR is always on even if we don't set it, but be explicit for clarity */
1697 	event.events = EPOLLIN | EPOLLERR;
1698 	event.data.ptr = sock;
1699 
1700 	rc = epoll_ctl(group->fd, EPOLL_CTL_ADD, sock->fd, &event);
1701 #elif defined(SPDK_KEVENT)
1702 	struct kevent event;
1703 	struct timespec ts = {0};
1704 
1705 	EV_SET(&event, sock->fd, EVFILT_READ, EV_ADD, 0, 0, sock);
1706 
1707 	rc = kevent(group->fd, &event, 1, NULL, 0, &ts);
1708 #endif
1709 
1710 	if (rc != 0) {
1711 		return rc;
1712 	}
1713 
1714 	/* switched from another polling group due to scheduling */
1715 	if (spdk_unlikely(sock->recv_pipe != NULL  &&
1716 			  (spdk_pipe_reader_bytes_available(sock->recv_pipe) > 0))) {
1717 		sock->pipe_has_data = true;
1718 		sock->socket_has_data = false;
1719 		TAILQ_INSERT_TAIL(&group->socks_with_data, sock, link);
1720 	}
1721 
1722 	if (g_spdk_posix_sock_impl_opts.enable_placement_id == PLACEMENT_MARK) {
1723 		posix_sock_update_mark(_group, _sock);
1724 	} else if (sock->placement_id != -1) {
1725 		rc = spdk_sock_map_insert(&g_map, sock->placement_id, &group->base);
1726 		if (rc != 0) {
1727 			SPDK_ERRLOG("Failed to insert sock group into map: %d\n", rc);
1728 			/* Do not treat this as an error. The system will continue running. */
1729 		}
1730 	}
1731 
1732 	return rc;
1733 }
1734 
1735 static int
1736 posix_sock_group_impl_remove_sock(struct spdk_sock_group_impl *_group, struct spdk_sock *_sock)
1737 {
1738 	struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group);
1739 	struct spdk_posix_sock *sock = __posix_sock(_sock);
1740 	int rc;
1741 
1742 	if (sock->pipe_has_data || sock->socket_has_data) {
1743 		TAILQ_REMOVE(&group->socks_with_data, sock, link);
1744 		sock->pipe_has_data = false;
1745 		sock->socket_has_data = false;
1746 	}
1747 
1748 	if (sock->placement_id != -1) {
1749 		spdk_sock_map_release(&g_map, sock->placement_id);
1750 	}
1751 
1752 #if defined(SPDK_EPOLL)
1753 	struct epoll_event event;
1754 
1755 	/* Event parameter is ignored but some old kernel version still require it. */
1756 	rc = epoll_ctl(group->fd, EPOLL_CTL_DEL, sock->fd, &event);
1757 #elif defined(SPDK_KEVENT)
1758 	struct kevent event;
1759 	struct timespec ts = {0};
1760 
1761 	EV_SET(&event, sock->fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
1762 
1763 	rc = kevent(group->fd, &event, 1, NULL, 0, &ts);
1764 	if (rc == 0 && event.flags & EV_ERROR) {
1765 		rc = -1;
1766 		errno = event.data;
1767 	}
1768 #endif
1769 
1770 	spdk_sock_abort_requests(_sock);
1771 
1772 	return rc;
1773 }
1774 
1775 static int
1776 posix_sock_group_impl_poll(struct spdk_sock_group_impl *_group, int max_events,
1777 			   struct spdk_sock **socks)
1778 {
1779 	struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group);
1780 	struct spdk_sock *sock, *tmp;
1781 	int num_events, i, rc;
1782 	struct spdk_posix_sock *psock, *ptmp;
1783 #if defined(SPDK_EPOLL)
1784 	struct epoll_event events[MAX_EVENTS_PER_POLL];
1785 #elif defined(SPDK_KEVENT)
1786 	struct kevent events[MAX_EVENTS_PER_POLL];
1787 	struct timespec ts = {0};
1788 #endif
1789 
1790 #ifdef SPDK_ZEROCOPY
1791 	/* When all of the following conditions are met
1792 	 * - non-blocking socket
1793 	 * - zero copy is enabled
1794 	 * - interrupts suppressed (i.e. busy polling)
1795 	 * - the NIC tx queue is full at the time sendmsg() is called
1796 	 * - epoll_wait determines there is an EPOLLIN event for the socket
1797 	 * then we can get into a situation where data we've sent is queued
1798 	 * up in the kernel network stack, but interrupts have been suppressed
1799 	 * because other traffic is flowing so the kernel misses the signal
1800 	 * to flush the software tx queue. If there wasn't incoming data
1801 	 * pending on the socket, then epoll_wait would have been sufficient
1802 	 * to kick off the send operation, but since there is a pending event
1803 	 * epoll_wait does not trigger the necessary operation.
1804 	 *
1805 	 * We deal with this by checking for all of the above conditions and
1806 	 * additionally looking for EPOLLIN events that were not consumed from
1807 	 * the last poll loop. We take this to mean that the upper layer is
1808 	 * unable to consume them because it is blocked waiting for resources
1809 	 * to free up, and those resources are most likely freed in response
1810 	 * to a pending asynchronous write completing.
1811 	 *
1812 	 * Additionally, sockets that have the same placement_id actually share
1813 	 * an underlying hardware queue. That means polling one of them is
1814 	 * equivalent to polling all of them. As a quick mechanism to avoid
1815 	 * making extra poll() calls, stash the last placement_id during the loop
1816 	 * and only poll if it's not the same. The overwhelmingly common case
1817 	 * is that all sockets in this list have the same placement_id because
1818 	 * SPDK is intentionally grouping sockets by that value, so even
1819 	 * though this won't stop all extra calls to poll(), it's very fast
1820 	 * and will catch all of them in practice.
1821 	 */
1822 	int last_placement_id = -1;
1823 
1824 	TAILQ_FOREACH(psock, &group->socks_with_data, link) {
1825 		if (psock->zcopy && psock->placement_id >= 0 &&
1826 		    psock->placement_id != last_placement_id) {
1827 			struct pollfd pfd = {psock->fd, POLLIN | POLLERR, 0};
1828 
1829 			poll(&pfd, 1, 0);
1830 			last_placement_id = psock->placement_id;
1831 		}
1832 	}
1833 #endif
1834 
1835 	/* This must be a TAILQ_FOREACH_SAFE because while flushing,
1836 	 * a completion callback could remove the sock from the
1837 	 * group. */
1838 	TAILQ_FOREACH_SAFE(sock, &_group->socks, link, tmp) {
1839 		rc = _sock_flush(sock);
1840 		if (rc) {
1841 			spdk_sock_abort_requests(sock);
1842 		}
1843 	}
1844 
1845 	assert(max_events > 0);
1846 
1847 #if defined(SPDK_EPOLL)
1848 	num_events = epoll_wait(group->fd, events, max_events, 0);
1849 #elif defined(SPDK_KEVENT)
1850 	num_events = kevent(group->fd, NULL, 0, events, max_events, &ts);
1851 #endif
1852 
1853 	if (num_events == -1) {
1854 		return -1;
1855 	} else if (num_events == 0 && !TAILQ_EMPTY(&_group->socks)) {
1856 		sock = TAILQ_FIRST(&_group->socks);
1857 		psock = __posix_sock(sock);
1858 		/* poll() is called here to busy poll the queue associated with
1859 		 * first socket in list and potentially reap incoming data.
1860 		 */
1861 		if (sock->opts.priority) {
1862 			struct pollfd pfd = {0, 0, 0};
1863 
1864 			pfd.fd = psock->fd;
1865 			pfd.events = POLLIN | POLLERR;
1866 			poll(&pfd, 1, 0);
1867 		}
1868 	}
1869 
1870 	for (i = 0; i < num_events; i++) {
1871 #if defined(SPDK_EPOLL)
1872 		sock = events[i].data.ptr;
1873 		psock = __posix_sock(sock);
1874 
1875 #ifdef SPDK_ZEROCOPY
1876 		if (events[i].events & EPOLLERR) {
1877 			rc = _sock_check_zcopy(sock);
1878 			/* If the socket was closed or removed from
1879 			 * the group in response to a send ack, don't
1880 			 * add it to the array here. */
1881 			if (rc || sock->cb_fn == NULL) {
1882 				continue;
1883 			}
1884 		}
1885 #endif
1886 		if ((events[i].events & EPOLLIN) == 0) {
1887 			continue;
1888 		}
1889 
1890 #elif defined(SPDK_KEVENT)
1891 		sock = events[i].udata;
1892 		psock = __posix_sock(sock);
1893 #endif
1894 
1895 		/* If the socket is not already in the list, add it now */
1896 		if (!psock->socket_has_data && !psock->pipe_has_data) {
1897 			TAILQ_INSERT_TAIL(&group->socks_with_data, psock, link);
1898 		}
1899 		psock->socket_has_data = true;
1900 	}
1901 
1902 	num_events = 0;
1903 
1904 	TAILQ_FOREACH_SAFE(psock, &group->socks_with_data, link, ptmp) {
1905 		if (num_events == max_events) {
1906 			break;
1907 		}
1908 
1909 		/* If the socket's cb_fn is NULL, just remove it from the
1910 		 * list and do not add it to socks array */
1911 		if (spdk_unlikely(psock->base.cb_fn == NULL)) {
1912 			psock->socket_has_data = false;
1913 			psock->pipe_has_data = false;
1914 			TAILQ_REMOVE(&group->socks_with_data, psock, link);
1915 			continue;
1916 		}
1917 
1918 		socks[num_events++] = &psock->base;
1919 	}
1920 
1921 	/* Cycle the has_data list so that each time we poll things aren't
1922 	 * in the same order. Say we have 6 sockets in the list, named as follows:
1923 	 * A B C D E F
1924 	 * And all 6 sockets had epoll events, but max_events is only 3. That means
1925 	 * psock currently points at D. We want to rearrange the list to the following:
1926 	 * D E F A B C
1927 	 *
1928 	 * The variables below are named according to this example to make it easier to
1929 	 * follow the swaps.
1930 	 */
1931 	if (psock != NULL) {
1932 		struct spdk_posix_sock *pa, *pc, *pd, *pf;
1933 
1934 		/* Capture pointers to the elements we need */
1935 		pd = psock;
1936 		pc = TAILQ_PREV(pd, spdk_has_data_list, link);
1937 		pa = TAILQ_FIRST(&group->socks_with_data);
1938 		pf = TAILQ_LAST(&group->socks_with_data, spdk_has_data_list);
1939 
1940 		/* Break the link between C and D */
1941 		pc->link.tqe_next = NULL;
1942 
1943 		/* Connect F to A */
1944 		pf->link.tqe_next = pa;
1945 		pa->link.tqe_prev = &pf->link.tqe_next;
1946 
1947 		/* Fix up the list first/last pointers */
1948 		group->socks_with_data.tqh_first = pd;
1949 		group->socks_with_data.tqh_last = &pc->link.tqe_next;
1950 
1951 		/* D is in front of the list, make tqe prev pointer point to the head of list */
1952 		pd->link.tqe_prev = &group->socks_with_data.tqh_first;
1953 	}
1954 
1955 	return num_events;
1956 }
1957 
1958 static int
1959 posix_sock_group_impl_close(struct spdk_sock_group_impl *_group)
1960 {
1961 	struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group);
1962 	int rc;
1963 
1964 	if (g_spdk_posix_sock_impl_opts.enable_placement_id == PLACEMENT_CPU) {
1965 		spdk_sock_map_release(&g_map, spdk_env_get_current_core());
1966 	}
1967 
1968 	rc = close(group->fd);
1969 	free(group);
1970 	return rc;
1971 }
1972 
1973 static struct spdk_net_impl g_posix_net_impl = {
1974 	.name		= "posix",
1975 	.getaddr	= posix_sock_getaddr,
1976 	.connect	= posix_sock_connect,
1977 	.listen		= posix_sock_listen,
1978 	.accept		= posix_sock_accept,
1979 	.close		= posix_sock_close,
1980 	.recv		= posix_sock_recv,
1981 	.readv		= posix_sock_readv,
1982 	.readv_async	= posix_sock_readv_async,
1983 	.writev		= posix_sock_writev,
1984 	.writev_async	= posix_sock_writev_async,
1985 	.flush		= posix_sock_flush,
1986 	.set_recvlowat	= posix_sock_set_recvlowat,
1987 	.set_recvbuf	= posix_sock_set_recvbuf,
1988 	.set_sendbuf	= posix_sock_set_sendbuf,
1989 	.is_ipv6	= posix_sock_is_ipv6,
1990 	.is_ipv4	= posix_sock_is_ipv4,
1991 	.is_connected	= posix_sock_is_connected,
1992 	.group_impl_get_optimal	= posix_sock_group_impl_get_optimal,
1993 	.group_impl_create	= posix_sock_group_impl_create,
1994 	.group_impl_add_sock	= posix_sock_group_impl_add_sock,
1995 	.group_impl_remove_sock = posix_sock_group_impl_remove_sock,
1996 	.group_impl_poll	= posix_sock_group_impl_poll,
1997 	.group_impl_close	= posix_sock_group_impl_close,
1998 	.get_opts	= posix_sock_impl_get_opts,
1999 	.set_opts	= posix_sock_impl_set_opts,
2000 };
2001 
2002 SPDK_NET_IMPL_REGISTER(posix, &g_posix_net_impl, DEFAULT_SOCK_PRIORITY);
2003 
2004 static struct spdk_sock *
2005 ssl_sock_listen(const char *ip, int port, struct spdk_sock_opts *opts)
2006 {
2007 	return posix_sock_create(ip, port, SPDK_SOCK_CREATE_LISTEN, opts, true);
2008 }
2009 
2010 static struct spdk_sock *
2011 ssl_sock_connect(const char *ip, int port, struct spdk_sock_opts *opts)
2012 {
2013 	return posix_sock_create(ip, port, SPDK_SOCK_CREATE_CONNECT, opts, true);
2014 }
2015 
2016 static struct spdk_net_impl g_ssl_net_impl = {
2017 	.name		= "ssl",
2018 	.getaddr	= posix_sock_getaddr,
2019 	.connect	= ssl_sock_connect,
2020 	.listen		= ssl_sock_listen,
2021 	.accept		= posix_sock_accept,
2022 	.close		= posix_sock_close,
2023 	.recv		= posix_sock_recv,
2024 	.readv		= posix_sock_readv,
2025 	.writev		= posix_sock_writev,
2026 	.writev_async	= posix_sock_writev_async,
2027 	.flush		= posix_sock_flush,
2028 	.set_recvlowat	= posix_sock_set_recvlowat,
2029 	.set_recvbuf	= posix_sock_set_recvbuf,
2030 	.set_sendbuf	= posix_sock_set_sendbuf,
2031 	.is_ipv6	= posix_sock_is_ipv6,
2032 	.is_ipv4	= posix_sock_is_ipv4,
2033 	.is_connected	= posix_sock_is_connected,
2034 	.group_impl_get_optimal	= posix_sock_group_impl_get_optimal,
2035 	.group_impl_create	= posix_sock_group_impl_create,
2036 	.group_impl_add_sock	= posix_sock_group_impl_add_sock,
2037 	.group_impl_remove_sock = posix_sock_group_impl_remove_sock,
2038 	.group_impl_poll	= posix_sock_group_impl_poll,
2039 	.group_impl_close	= posix_sock_group_impl_close,
2040 	.get_opts	= posix_sock_impl_get_opts,
2041 	.set_opts	= posix_sock_impl_set_opts,
2042 };
2043 
2044 SPDK_NET_IMPL_REGISTER(ssl, &g_ssl_net_impl, DEFAULT_SOCK_PRIORITY);
2045 SPDK_LOG_REGISTER_COMPONENT(sock_posix)
2046