xref: /spdk/module/sock/uring/uring.c (revision aaba5d9c9e8fca9925d5812030ff3ec9ba869fa3)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2019 Intel Corporation.
3  *   All rights reserved.
4  */
5 
6 #include "spdk/stdinc.h"
7 #include "spdk/config.h"
8 
9 #include <linux/errqueue.h>
10 #include <sys/epoll.h>
11 #include <liburing.h>
12 
13 #include "spdk/barrier.h"
14 #include "spdk/env.h"
15 #include "spdk/log.h"
16 #include "spdk/pipe.h"
17 #include "spdk/sock.h"
18 #include "spdk/string.h"
19 #include "spdk/util.h"
20 
21 #include "spdk_internal/sock.h"
22 #include "spdk_internal/assert.h"
23 #include "../sock_kernel.h"
24 
25 #define MAX_TMPBUF 1024
26 #define PORTNUMLEN 32
27 #define SPDK_SOCK_GROUP_QUEUE_DEPTH 4096
28 #define SPDK_SOCK_CMG_INFO_SIZE (sizeof(struct cmsghdr) + sizeof(struct sock_extended_err))
29 
30 enum spdk_sock_task_type {
31 	SPDK_SOCK_TASK_POLLIN = 0,
32 	SPDK_SOCK_TASK_ERRQUEUE,
33 	SPDK_SOCK_TASK_WRITE,
34 	SPDK_SOCK_TASK_CANCEL,
35 };
36 
37 #if defined(SO_ZEROCOPY) && defined(MSG_ZEROCOPY)
38 #define SPDK_ZEROCOPY
39 #endif
40 
41 enum spdk_uring_sock_task_status {
42 	SPDK_URING_SOCK_TASK_NOT_IN_USE = 0,
43 	SPDK_URING_SOCK_TASK_IN_PROCESS,
44 };
45 
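/* Context for a single outstanding io_uring operation on a socket. Each
 * spdk_uring_sock embeds one task per operation type (write, pollin,
 * errqueue read and cancel), so at most one of each can be in flight. */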
46 struct spdk_uring_task {
47 	enum spdk_uring_sock_task_status	status;
48 	enum spdk_sock_task_type		type;
49 	struct spdk_uring_sock			*sock;
50 	struct msghdr				msg;
51 	struct iovec				iovs[IOV_BATCH_SIZE];
52 	int					iov_cnt;
53 	struct spdk_sock_request		*last_req;
54 	bool					is_zcopy;
55 	STAILQ_ENTRY(spdk_uring_task)		link;
56 };
57 
58 struct spdk_uring_sock {
59 	struct spdk_sock			base;
60 	int					fd;
61 	uint32_t				sendmsg_idx;
62 	struct spdk_uring_sock_group_impl	*group;
63 	struct spdk_uring_task			write_task;
64 	struct spdk_uring_task			errqueue_task;
65 	struct spdk_uring_task			pollin_task;
66 	struct spdk_uring_task			cancel_task;
67 	struct spdk_pipe			*recv_pipe;
68 	void					*recv_buf;
69 	int					recv_buf_sz;
70 	bool					zcopy;
71 	bool					pending_recv;
72 	int					zcopy_send_flags;
73 	int					connection_status;
74 	int					placement_id;
75 	uint8_t					buf[SPDK_SOCK_CMG_INFO_SIZE];
76 	TAILQ_ENTRY(spdk_uring_sock)		link;
77 };
78 
79 TAILQ_HEAD(pending_recv_list, spdk_uring_sock);
80 
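/* A polling group backed by one io_uring instance. io_queued counts SQEs that
 * have been prepared but not yet submitted, io_inflight counts submitted but
 * uncompleted operations, and pending_recv lists sockets with data ready to read. */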
81 struct spdk_uring_sock_group_impl {
82 	struct spdk_sock_group_impl		base;
83 	struct io_uring				uring;
84 	uint32_t				io_inflight;
85 	uint32_t				io_queued;
86 	uint32_t				io_avail;
87 	struct pending_recv_list		pending_recv;
88 };
89 
90 static struct spdk_sock_impl_opts g_spdk_uring_sock_impl_opts = {
91 	.recv_buf_size = DEFAULT_SO_RCVBUF_SIZE,
92 	.send_buf_size = DEFAULT_SO_SNDBUF_SIZE,
93 	.enable_recv_pipe = true,
94 	.enable_quickack = false,
95 	.enable_placement_id = PLACEMENT_NONE,
96 	.enable_zerocopy_send_server = false,
97 	.enable_zerocopy_send_client = false,
98 	.zerocopy_threshold = 0,
99 	.tls_version = 0,
100 	.enable_ktls = false,
101 	.psk_key = NULL,
102 	.psk_identity = NULL
103 };
104 
105 static struct spdk_sock_map g_map = {
106 	.entries = STAILQ_HEAD_INITIALIZER(g_map.entries),
107 	.mtx = PTHREAD_MUTEX_INITIALIZER
108 };
109 
110 __attribute__((destructor)) static void
111 uring_sock_map_cleanup(void)
112 {
113 	spdk_sock_map_cleanup(&g_map);
114 }
115 
116 #define SPDK_URING_SOCK_REQUEST_IOV(req) ((struct iovec *)((uint8_t *)req + sizeof(struct spdk_sock_request)))
117 
118 #define __uring_sock(sock) (struct spdk_uring_sock *)sock
119 #define __uring_group_impl(group) (struct spdk_uring_sock_group_impl *)group
120 
121 static void
122 uring_sock_copy_impl_opts(struct spdk_sock_impl_opts *dest, const struct spdk_sock_impl_opts *src,
123 			  size_t len)
124 {
125 #define FIELD_OK(field) \
126 	offsetof(struct spdk_sock_impl_opts, field) + sizeof(src->field) <= len
127 
128 #define SET_FIELD(field) \
129 	if (FIELD_OK(field)) { \
130 		dest->field = src->field; \
131 	}
132 
133 	SET_FIELD(recv_buf_size);
134 	SET_FIELD(send_buf_size);
135 	SET_FIELD(enable_recv_pipe);
136 	SET_FIELD(enable_quickack);
137 	SET_FIELD(enable_placement_id);
138 	SET_FIELD(enable_zerocopy_send_server);
139 	SET_FIELD(enable_zerocopy_send_client);
140 	SET_FIELD(zerocopy_threshold);
141 	SET_FIELD(tls_version);
142 	SET_FIELD(enable_ktls);
143 	SET_FIELD(psk_key);
144 	SET_FIELD(psk_identity);
145 
146 #undef SET_FIELD
147 #undef FIELD_OK
148 }
149 
150 static int
151 uring_sock_impl_get_opts(struct spdk_sock_impl_opts *opts, size_t *len)
152 {
153 	if (!opts || !len) {
154 		errno = EINVAL;
155 		return -1;
156 	}
157 
158 	assert(sizeof(*opts) >= *len);
159 	memset(opts, 0, *len);
160 
161 	uring_sock_copy_impl_opts(opts, &g_spdk_uring_sock_impl_opts, *len);
162 	*len = spdk_min(*len, sizeof(g_spdk_uring_sock_impl_opts));
163 
164 	return 0;
165 }
166 
167 static int
168 uring_sock_impl_set_opts(const struct spdk_sock_impl_opts *opts, size_t len)
169 {
170 	if (!opts) {
171 		errno = EINVAL;
172 		return -1;
173 	}
174 
175 	assert(sizeof(*opts) >= len);
176 	uring_sock_copy_impl_opts(&g_spdk_uring_sock_impl_opts, opts, len);
177 
178 	return 0;
179 }
180 
181 static void
182 uring_opts_get_impl_opts(const struct spdk_sock_opts *opts, struct spdk_sock_impl_opts *dest)
183 {
184 	/* Copy the default impl_opts first to cover cases when user's impl_opts is smaller */
185 	memcpy(dest, &g_spdk_uring_sock_impl_opts, sizeof(*dest));
186 
187 	if (opts->impl_opts != NULL) {
188 		assert(sizeof(*dest) >= opts->impl_opts_size);
189 		uring_sock_copy_impl_opts(dest, opts->impl_opts, opts->impl_opts_size);
190 	}
191 }
192 
193 static int
194 uring_sock_getaddr(struct spdk_sock *_sock, char *saddr, int slen, uint16_t *sport,
195 		   char *caddr, int clen, uint16_t *cport)
196 {
197 	struct spdk_uring_sock *sock = __uring_sock(_sock);
198 	struct sockaddr_storage sa;
199 	socklen_t salen;
200 	int rc;
201 
202 	assert(sock != NULL);
203 
204 	memset(&sa, 0, sizeof sa);
205 	salen = sizeof sa;
206 	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
207 	if (rc != 0) {
208 		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
209 		return -1;
210 	}
211 
212 	switch (sa.ss_family) {
213 	case AF_UNIX:
214 		/* Acceptable connection types that don't have IPs */
215 		return 0;
216 	case AF_INET:
217 	case AF_INET6:
218 		/* Code below will get IP addresses */
219 		break;
220 	default:
221 		/* Unsupported socket family */
222 		return -1;
223 	}
224 
225 	rc = get_addr_str((struct sockaddr *)&sa, saddr, slen);
226 	if (rc != 0) {
227 		SPDK_ERRLOG("getnameinfo() failed (errno=%d)\n", errno);
228 		return -1;
229 	}
230 
231 	if (sport) {
232 		if (sa.ss_family == AF_INET) {
233 			*sport = ntohs(((struct sockaddr_in *) &sa)->sin_port);
234 		} else if (sa.ss_family == AF_INET6) {
235 			*sport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port);
236 		}
237 	}
238 
239 	memset(&sa, 0, sizeof sa);
240 	salen = sizeof sa;
241 	rc = getpeername(sock->fd, (struct sockaddr *) &sa, &salen);
242 	if (rc != 0) {
243 		SPDK_ERRLOG("getpeername() failed (errno=%d)\n", errno);
244 		return -1;
245 	}
246 
247 	rc = get_addr_str((struct sockaddr *)&sa, caddr, clen);
248 	if (rc != 0) {
249 		SPDK_ERRLOG("getnameinfo() failed (errno=%d)\n", errno);
250 		return -1;
251 	}
252 
253 	if (cport) {
254 		if (sa.ss_family == AF_INET) {
255 			*cport = ntohs(((struct sockaddr_in *) &sa)->sin_port);
256 		} else if (sa.ss_family == AF_INET6) {
257 			*cport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port);
258 		}
259 	}
260 
261 	return 0;
262 }
263 
264 enum uring_sock_create_type {
265 	SPDK_SOCK_CREATE_LISTEN,
266 	SPDK_SOCK_CREATE_CONNECT,
267 };
268 
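/* (Re)size the socket's receive pipe to sz bytes, migrating any data still
 * buffered in the old pipe. A size of 0 frees the pipe entirely. */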
269 static int
270 uring_sock_alloc_pipe(struct spdk_uring_sock *sock, int sz)
271 {
272 	uint8_t *new_buf;
273 	struct spdk_pipe *new_pipe;
274 	struct iovec siov[2];
275 	struct iovec diov[2];
276 	int sbytes;
277 	ssize_t bytes;
278 	int rc;
279 
280 	if (sock->recv_buf_sz == sz) {
281 		return 0;
282 	}
283 
284 	/* If the new size is 0, just free the pipe */
285 	if (sz == 0) {
286 		spdk_pipe_destroy(sock->recv_pipe);
287 		free(sock->recv_buf);
288 		sock->recv_pipe = NULL;
289 		sock->recv_buf = NULL;
290 		return 0;
291 	} else if (sz < MIN_SOCK_PIPE_SIZE) {
292 		SPDK_ERRLOG("The size of the pipe must be at least %d\n", MIN_SOCK_PIPE_SIZE);
293 		return -1;
294 	}
295 
296 	/* Allocate the buffer aligned to a 64-byte boundary */
297 	rc = posix_memalign((void **)&new_buf, 64, sz);
298 	if (rc != 0) {
299 		SPDK_ERRLOG("socket recv buf allocation failed\n");
300 		return -ENOMEM;
301 	}
302 	memset(new_buf, 0, sz);
303 
304 	new_pipe = spdk_pipe_create(new_buf, sz);
305 	if (new_pipe == NULL) {
306 		SPDK_ERRLOG("socket pipe allocation failed\n");
307 		free(new_buf);
308 		return -ENOMEM;
309 	}
310 
311 	if (sock->recv_pipe != NULL) {
312 		/* Pull all of the data out of the old pipe */
313 		sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov);
314 		if (sbytes > sz) {
315 			/* Too much data to fit into the new pipe size */
316 			spdk_pipe_destroy(new_pipe);
317 			free(new_buf);
318 			return -EINVAL;
319 		}
320 
321 		sbytes = spdk_pipe_writer_get_buffer(new_pipe, sz, diov);
322 		assert(sbytes == sz);
323 
324 		bytes = spdk_iovcpy(siov, 2, diov, 2);
325 		spdk_pipe_writer_advance(new_pipe, bytes);
326 
327 		spdk_pipe_destroy(sock->recv_pipe);
328 		free(sock->recv_buf);
329 	}
330 
331 	sock->recv_buf_sz = sz;
332 	sock->recv_buf = new_buf;
333 	sock->recv_pipe = new_pipe;
334 
335 	return 0;
336 }
337 
338 static int
339 uring_sock_set_recvbuf(struct spdk_sock *_sock, int sz)
340 {
341 	struct spdk_uring_sock *sock = __uring_sock(_sock);
342 	int min_size;
343 	int rc;
344 
345 	assert(sock != NULL);
346 
347 	if (_sock->impl_opts.enable_recv_pipe) {
348 		rc = uring_sock_alloc_pipe(sock, sz);
349 		if (rc) {
350 			SPDK_ERRLOG("unable to allocate sufficient recvbuf with sz=%d on sock=%p\n", sz, _sock);
351 			return rc;
352 		}
353 	}
354 
355 	/* Set kernel buffer size to be at least MIN_SO_RCVBUF_SIZE and
356 	 * g_spdk_uring_sock_impl_opts.recv_buf_size. */
357 	min_size = spdk_max(MIN_SO_RCVBUF_SIZE, g_spdk_uring_sock_impl_opts.recv_buf_size);
358 
359 	if (sz < min_size) {
360 		sz = min_size;
361 	}
362 
363 	rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVBUF, &sz, sizeof(sz));
364 	if (rc < 0) {
365 		return rc;
366 	}
367 
368 	_sock->impl_opts.recv_buf_size = sz;
369 
370 	return 0;
371 }
372 
373 static int
374 uring_sock_set_sendbuf(struct spdk_sock *_sock, int sz)
375 {
376 	struct spdk_uring_sock *sock = __uring_sock(_sock);
377 	int min_size;
378 	int rc;
379 
380 	assert(sock != NULL);
381 
382 	/* Set kernel buffer size to be at least MIN_SO_SNDBUF_SIZE and
383 	 * g_spdk_uring_sock_impl_opts.send_buf_size. */
384 	min_size = spdk_max(MIN_SO_SNDBUF_SIZE, g_spdk_uring_sock_impl_opts.send_buf_size);
385 
386 	if (sz < min_size) {
387 		sz = min_size;
388 	}
389 
390 	rc = setsockopt(sock->fd, SOL_SOCKET, SO_SNDBUF, &sz, sizeof(sz));
391 	if (rc < 0) {
392 		return rc;
393 	}
394 
395 	_sock->impl_opts.send_buf_size = sz;
396 
397 	return 0;
398 }
399 
400 static struct spdk_uring_sock *
401 uring_sock_alloc(int fd, struct spdk_sock_impl_opts *impl_opts, bool enable_zero_copy)
402 {
403 	struct spdk_uring_sock *sock;
404 #if defined(__linux__)
405 	int flag;
406 	int rc;
407 #endif
408 
409 	sock = calloc(1, sizeof(*sock));
410 	if (sock == NULL) {
411 		SPDK_ERRLOG("sock allocation failed\n");
412 		return NULL;
413 	}
414 
415 	sock->fd = fd;
416 	memcpy(&sock->base.impl_opts, impl_opts, sizeof(*impl_opts));
417 
418 #if defined(__linux__)
419 	flag = 1;
420 
421 	if (sock->base.impl_opts.enable_quickack) {
422 		rc = setsockopt(sock->fd, IPPROTO_TCP, TCP_QUICKACK, &flag, sizeof(flag));
423 		if (rc != 0) {
424 			SPDK_ERRLOG("Failed to set TCP_QUICKACK\n");
425 		}
426 	}
427 
428 	spdk_sock_get_placement_id(sock->fd, sock->base.impl_opts.enable_placement_id,
429 				   &sock->placement_id);
430 #ifdef SPDK_ZEROCOPY
431 	/* Try to turn on zero copy sends */
432 	flag = 1;
433 
434 	if (enable_zero_copy) {
435 		rc = setsockopt(sock->fd, SOL_SOCKET, SO_ZEROCOPY, &flag, sizeof(flag));
436 		if (rc == 0) {
437 			sock->zcopy = true;
438 			sock->zcopy_send_flags = MSG_ZEROCOPY;
439 		}
440 	}
441 #endif
442 #endif
443 
444 	return sock;
445 }
446 
447 static struct spdk_sock *
448 uring_sock_create(const char *ip, int port,
449 		  enum uring_sock_create_type type,
450 		  struct spdk_sock_opts *opts)
451 {
452 	struct spdk_uring_sock *sock;
453 	struct spdk_sock_impl_opts impl_opts;
454 	char buf[MAX_TMPBUF];
455 	char portnum[PORTNUMLEN];
456 	char *p;
457 	struct addrinfo hints, *res, *res0;
458 	int fd, flag;
459 	int val = 1;
460 	int rc;
461 	bool enable_zcopy_impl_opts = false;
462 	bool enable_zcopy_user_opts = true;
463 
464 	assert(opts != NULL);
465 	uring_opts_get_impl_opts(opts, &impl_opts);
466 
467 	if (ip == NULL) {
468 		return NULL;
469 	}
470 	if (ip[0] == '[') {
471 		snprintf(buf, sizeof(buf), "%s", ip + 1);
472 		p = strchr(buf, ']');
473 		if (p != NULL) {
474 			*p = '\0';
475 		}
476 		ip = (const char *) &buf[0];
477 	}
478 
479 	snprintf(portnum, sizeof portnum, "%d", port);
480 	memset(&hints, 0, sizeof hints);
481 	hints.ai_family = PF_UNSPEC;
482 	hints.ai_socktype = SOCK_STREAM;
483 	hints.ai_flags = AI_NUMERICSERV;
484 	hints.ai_flags |= AI_PASSIVE;
485 	hints.ai_flags |= AI_NUMERICHOST;
486 	rc = getaddrinfo(ip, portnum, &hints, &res0);
487 	if (rc != 0) {
488 		SPDK_ERRLOG("getaddrinfo() failed %s (%d)\n", gai_strerror(rc), rc);
489 		return NULL;
490 	}
491 
492 	/* try listen */
493 	fd = -1;
494 	for (res = res0; res != NULL; res = res->ai_next) {
495 retry:
496 		fd = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
497 		if (fd < 0) {
498 			/* error */
499 			continue;
500 		}
501 
502 		val = impl_opts.recv_buf_size;
503 		rc = setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &val, sizeof val);
504 		if (rc) {
505 			/* Not fatal */
506 		}
507 
508 		val = impl_opts.send_buf_size;
509 		rc = setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &val, sizeof val);
510 		if (rc) {
511 			/* Not fatal */
512 		}
513 
		val = 1;
514 		rc = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof val);
515 		if (rc != 0) {
516 			close(fd);
517 			fd = -1;
518 			/* error */
519 			continue;
520 		}
521 		rc = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof val);
522 		if (rc != 0) {
523 			close(fd);
524 			fd = -1;
525 			/* error */
526 			continue;
527 		}
528 
529 		if (opts->ack_timeout) {
530 #if defined(__linux__)
531 			val = opts->ack_timeout;
532 			rc = setsockopt(fd, IPPROTO_TCP, TCP_USER_TIMEOUT, &val, sizeof val);
533 			if (rc != 0) {
534 				close(fd);
535 				fd = -1;
536 				/* error */
537 				continue;
538 			}
539 #else
540 			SPDK_WARNLOG("TCP_USER_TIMEOUT is not supported.\n");
541 #endif
542 		}
543 
544 
545 
546 #if defined(SO_PRIORITY)
547 		if (opts != NULL && opts->priority) {
548 			rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &opts->priority, sizeof val);
549 			if (rc != 0) {
550 				close(fd);
551 				fd = -1;
552 				/* error */
553 				continue;
554 			}
555 		}
556 #endif
557 		if (res->ai_family == AF_INET6) {
558 			rc = setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &val, sizeof val);
559 			if (rc != 0) {
560 				close(fd);
561 				fd = -1;
562 				/* error */
563 				continue;
564 			}
565 		}
566 
567 		if (type == SPDK_SOCK_CREATE_LISTEN) {
568 			rc = bind(fd, res->ai_addr, res->ai_addrlen);
569 			if (rc != 0) {
570 				SPDK_ERRLOG("bind() failed at port %d, errno = %d\n", port, errno);
571 				switch (errno) {
572 				case EINTR:
573 					/* interrupted? */
574 					close(fd);
575 					goto retry;
576 				case EADDRNOTAVAIL:
577 					SPDK_ERRLOG("IP address %s not available. "
578 						    "Verify IP address in config file "
579 						    "and make sure setup script is "
580 						    "run before starting spdk app.\n", ip);
581 				/* FALLTHROUGH */
582 				default:
583 					/* try next family */
584 					close(fd);
585 					fd = -1;
586 					continue;
587 				}
588 			}
589 			/* bind OK */
590 			rc = listen(fd, 512);
591 			if (rc != 0) {
592 				SPDK_ERRLOG("listen() failed, errno = %d\n", errno);
593 				close(fd);
594 				fd = -1;
595 				break;
596 			}
597 
598 			flag = fcntl(fd, F_GETFL);
599 			if (fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0) {
600 				SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno);
601 				close(fd);
602 				fd = -1;
603 				break;
604 			}
605 
606 			enable_zcopy_impl_opts = impl_opts.enable_zerocopy_send_server;
607 		} else if (type == SPDK_SOCK_CREATE_CONNECT) {
608 			rc = connect(fd, res->ai_addr, res->ai_addrlen);
609 			if (rc != 0) {
610 				SPDK_ERRLOG("connect() failed, errno = %d\n", errno);
611 				/* try next family */
612 				close(fd);
613 				fd = -1;
614 				continue;
615 			}
616 
617 			flag = fcntl(fd, F_GETFL);
618 			if (fcntl(fd, F_SETFL, flag & ~O_NONBLOCK) < 0) {
619 				SPDK_ERRLOG("fcntl can't set blocking mode for socket, fd: %d (%d)\n", fd, errno);
620 				close(fd);
621 				fd = -1;
622 				break;
623 			}
624 
625 			enable_zcopy_impl_opts = impl_opts.enable_zerocopy_send_client;
626 		}
627 		break;
628 	}
629 	freeaddrinfo(res0);
630 
631 	if (fd < 0) {
632 		return NULL;
633 	}
634 
635 	enable_zcopy_user_opts = opts->zcopy && !sock_is_loopback(fd);
636 	sock = uring_sock_alloc(fd, &impl_opts, enable_zcopy_user_opts && enable_zcopy_impl_opts);
637 	if (sock == NULL) {
638 		SPDK_ERRLOG("sock allocation failed\n");
639 		close(fd);
640 		return NULL;
641 	}
642 
643 	return &sock->base;
644 }
645 
646 static struct spdk_sock *
647 uring_sock_listen(const char *ip, int port, struct spdk_sock_opts *opts)
648 {
649 	return uring_sock_create(ip, port, SPDK_SOCK_CREATE_LISTEN, opts);
650 }
651 
652 static struct spdk_sock *
653 uring_sock_connect(const char *ip, int port, struct spdk_sock_opts *opts)
654 {
655 	return uring_sock_create(ip, port, SPDK_SOCK_CREATE_CONNECT, opts);
656 }
657 
658 static struct spdk_sock *
659 uring_sock_accept(struct spdk_sock *_sock)
660 {
661 	struct spdk_uring_sock		*sock = __uring_sock(_sock);
662 	struct sockaddr_storage		sa;
663 	socklen_t			salen;
664 	int				rc, fd;
665 	struct spdk_uring_sock		*new_sock;
666 	int				flag;
667 
668 	memset(&sa, 0, sizeof(sa));
669 	salen = sizeof(sa);
670 
671 	assert(sock != NULL);
672 
673 	rc = accept(sock->fd, (struct sockaddr *)&sa, &salen);
674 
675 	if (rc == -1) {
676 		return NULL;
677 	}
678 
679 	fd = rc;
680 
681 	flag = fcntl(fd, F_GETFL);
682 	if ((flag & O_NONBLOCK) && (fcntl(fd, F_SETFL, flag & ~O_NONBLOCK) < 0)) {
683 		SPDK_ERRLOG("fcntl can't set blocking mode for socket, fd: %d (%d)\n", fd, errno);
684 		close(fd);
685 		return NULL;
686 	}
687 
688 #if defined(SO_PRIORITY)
689 	/* The priority is not inherited by the accepted socket, so set it again explicitly */
690 	if (sock->base.opts.priority) {
691 		rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &sock->base.opts.priority, sizeof(int));
692 		if (rc != 0) {
693 			close(fd);
694 			return NULL;
695 		}
696 	}
697 #endif
698 
699 	new_sock = uring_sock_alloc(fd, &sock->base.impl_opts, sock->zcopy);
700 	if (new_sock == NULL) {
701 		close(fd);
702 		return NULL;
703 	}
704 
705 	return &new_sock->base;
706 }
707 
708 static int
709 uring_sock_close(struct spdk_sock *_sock)
710 {
711 	struct spdk_uring_sock *sock = __uring_sock(_sock);
712 
713 	assert(TAILQ_EMPTY(&_sock->pending_reqs));
714 	assert(sock->group == NULL);
715 
716 	/* If the socket fails to close, the best choice is to
717 	 * leak the fd but continue to free the rest of the sock
718 	 * memory. */
719 	close(sock->fd);
720 
721 	spdk_pipe_destroy(sock->recv_pipe);
722 	free(sock->recv_buf);
723 	free(sock);
724 
725 	return 0;
726 }
727 
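/* Copy buffered data from the receive pipe into the caller's iovecs. Once the
 * pipe has been drained, the socket is taken off the group's level-triggered
 * pending_recv list. */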
728 static ssize_t
729 uring_sock_recv_from_pipe(struct spdk_uring_sock *sock, struct iovec *diov, int diovcnt)
730 {
731 	struct iovec siov[2];
732 	int sbytes;
733 	ssize_t bytes;
734 	struct spdk_uring_sock_group_impl *group;
735 
736 	sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov);
737 	if (sbytes < 0) {
738 		errno = EINVAL;
739 		return -1;
740 	} else if (sbytes == 0) {
741 		errno = EAGAIN;
742 		return -1;
743 	}
744 
745 	bytes = spdk_iovcpy(siov, 2, diov, diovcnt);
746 
747 	if (bytes == 0) {
748 		/* The only way this happens is if diov is 0 length */
749 		errno = EINVAL;
750 		return -1;
751 	}
752 
753 	spdk_pipe_reader_advance(sock->recv_pipe, bytes);
754 
755 	/* If we drained the pipe, take it off the level-triggered list */
756 	if (sock->base.group_impl && spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) {
757 		group = __uring_group_impl(sock->base.group_impl);
758 		TAILQ_REMOVE(&group->pending_recv, sock, link);
759 		sock->pending_recv = false;
760 	}
761 
762 	return bytes;
763 }
764 
765 static inline ssize_t
766 sock_readv(int fd, struct iovec *iov, int iovcnt)
767 {
768 	struct msghdr msg = {
769 		.msg_iov = iov,
770 		.msg_iovlen = iovcnt,
771 	};
772 
773 	return recvmsg(fd, &msg, MSG_DONTWAIT);
774 }
775 
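/* Read from the kernel socket directly into the receive pipe. If anything was
 * read and the socket belongs to a group, mark the socket as having pending
 * receive data. */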
776 static inline ssize_t
777 uring_sock_read(struct spdk_uring_sock *sock)
778 {
779 	struct iovec iov[2];
780 	int bytes;
781 	struct spdk_uring_sock_group_impl *group;
782 
783 	bytes = spdk_pipe_writer_get_buffer(sock->recv_pipe, sock->recv_buf_sz, iov);
784 
785 	if (bytes > 0) {
786 		bytes = sock_readv(sock->fd, iov, 2);
787 		if (bytes > 0) {
788 			spdk_pipe_writer_advance(sock->recv_pipe, bytes);
789 			if (sock->base.group_impl) {
790 				group = __uring_group_impl(sock->base.group_impl);
791 				TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
792 				sock->pending_recv = true;
793 			}
794 		}
795 	}
796 
797 	return bytes;
798 }
799 
800 static ssize_t
801 uring_sock_readv(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
802 {
803 	struct spdk_uring_sock *sock = __uring_sock(_sock);
804 	int rc, i;
805 	size_t len;
806 
807 	if (sock->recv_pipe == NULL) {
808 		return sock_readv(sock->fd, iov, iovcnt);
809 	}
810 
811 	len = 0;
812 	for (i = 0; i < iovcnt; i++) {
813 		len += iov[i].iov_len;
814 	}
815 
816 	if (spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) {
817 		/* If the user is receiving a sufficiently large amount of data,
818 		 * receive directly to their buffers. */
819 		if (len >= MIN_SOCK_PIPE_SIZE) {
820 			return sock_readv(sock->fd, iov, iovcnt);
821 		}
822 
823 		/* Otherwise, do a big read into our pipe */
824 		rc = uring_sock_read(sock);
825 		if (rc <= 0) {
826 			return rc;
827 		}
828 	}
829 
830 	return uring_sock_recv_from_pipe(sock, iov, iovcnt);
831 }
832 
833 static ssize_t
834 uring_sock_recv(struct spdk_sock *sock, void *buf, size_t len)
835 {
836 	struct iovec iov[1];
837 
838 	iov[0].iov_base = buf;
839 	iov[0].iov_len = len;
840 
841 	return uring_sock_readv(sock, iov, 1);
842 }
843 
844 static ssize_t
845 uring_sock_writev(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
846 {
847 	struct spdk_uring_sock *sock = __uring_sock(_sock);
848 	struct msghdr msg = {
849 		.msg_iov = iov,
850 		.msg_iovlen = iovcnt,
851 	};
852 
853 	if (sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
854 		errno = EAGAIN;
855 		return -1;
856 	}
857 
858 	return sendmsg(sock->fd, &msg, MSG_DONTWAIT);
859 }
860 
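/* Advance req->internal.offset by the bytes just sent. Returns the number of
 * bytes left over once the request is fully consumed, or -1 if the request was
 * only partially sent. */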
861 static ssize_t
862 sock_request_advance_offset(struct spdk_sock_request *req, ssize_t rc)
863 {
864 	unsigned int offset;
865 	size_t len;
866 	int i;
867 
868 	offset = req->internal.offset;
869 	for (i = 0; i < req->iovcnt; i++) {
870 		/* Advance by the offset first */
871 		if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) {
872 			offset -= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len;
873 			continue;
874 		}
875 
876 		/* Calculate the remaining length of this element */
877 		len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset;
878 
879 		if (len > (size_t)rc) {
880 			req->internal.offset += rc;
881 			return -1;
882 		}
883 
884 		offset = 0;
885 		req->internal.offset += len;
886 		rc -= len;
887 	}
888 
889 	return rc;
890 }
891 
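/* Complete the queued requests covered by the rc bytes just sent. Requests sent
 * with zero-copy are only moved to the pending list (tagged with the sendmsg
 * index) and are released later, when the MSG_ZEROCOPY notification arrives. */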
892 static int
893 sock_complete_write_reqs(struct spdk_sock *_sock, ssize_t rc, bool is_zcopy)
894 {
895 	struct spdk_uring_sock *sock = __uring_sock(_sock);
896 	struct spdk_sock_request *req;
897 	int retval;
898 
899 	if (is_zcopy) {
900 		/* Handle the overflow case: we use sock->sendmsg_idx - 1 as the
901 		 * req->internal.offset, so sendmsg_idx must never be zero */
902 		if (spdk_unlikely(sock->sendmsg_idx == UINT32_MAX)) {
903 			sock->sendmsg_idx = 1;
904 		} else {
905 			sock->sendmsg_idx++;
906 		}
907 	}
908 
909 	/* Consume the requests that were actually written */
910 	req = TAILQ_FIRST(&_sock->queued_reqs);
911 	while (req) {
912 		/* req->internal.is_zcopy is true when the whole req or part of it is sent with zerocopy */
913 		req->internal.is_zcopy = is_zcopy;
914 
915 		rc = sock_request_advance_offset(req, rc);
916 		if (rc < 0) {
917 			/* This element was partially sent. */
918 			return 0;
919 		}
920 
921 		/* Handled a full request. */
922 		spdk_sock_request_pend(_sock, req);
923 
924 		if (!req->internal.is_zcopy && req == TAILQ_FIRST(&_sock->pending_reqs)) {
925 			retval = spdk_sock_request_put(_sock, req, 0);
926 			if (retval) {
927 				return retval;
928 			}
929 		} else {
930 			/* Re-use the offset field to hold the sendmsg call index. The
931 			 * index is 0 based, so subtract one here because we've already
932 			 * incremented above. */
933 			req->internal.offset = sock->sendmsg_idx - 1;
934 		}
935 
936 		if (rc == 0) {
937 			break;
938 		}
939 
940 		req = TAILQ_FIRST(&_sock->queued_reqs);
941 	}
942 
943 	return 0;
944 }
945 
946 #ifdef SPDK_ZEROCOPY
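/* Drain MSG_ZEROCOPY completion notifications from the socket's error queue.
 * Each notification covers a range of sendmsg indices (ee_info..ee_data); pending
 * requests whose stored index falls in that range can now be released. */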
947 static int
948 _sock_check_zcopy(struct spdk_sock *_sock, int status)
949 {
950 	struct spdk_uring_sock *sock = __uring_sock(_sock);
951 	ssize_t rc;
952 	struct sock_extended_err *serr;
953 	struct cmsghdr *cm;
954 	uint32_t idx;
955 	struct spdk_sock_request *req, *treq;
956 	bool found;
957 
958 	assert(sock->zcopy == true);
959 	if (spdk_unlikely(status < 0)) {
960 		if (!TAILQ_EMPTY(&_sock->pending_reqs)) {
961 			SPDK_ERRLOG("Attempting to receive from ERRQUEUE yielded error, but pending list still has orphaned entries, status =%d\n",
962 				    status);
963 		} else {
964 			SPDK_WARNLOG("Recvmsg yielded an error!\n");
965 		}
966 		return 0;
967 	}
968 
969 	cm = CMSG_FIRSTHDR(&sock->errqueue_task.msg);
970 	if (!((cm->cmsg_level == SOL_IP && cm->cmsg_type == IP_RECVERR) ||
971 	      (cm->cmsg_level == SOL_IPV6 && cm->cmsg_type == IPV6_RECVERR))) {
972 		SPDK_WARNLOG("Unexpected cmsg level or type!\n");
973 		return 0;
974 	}
975 
976 	serr = (struct sock_extended_err *)CMSG_DATA(cm);
977 	if (serr->ee_errno != 0 || serr->ee_origin != SO_EE_ORIGIN_ZEROCOPY) {
978 		SPDK_WARNLOG("Unexpected extended error origin\n");
979 		return 0;
980 	}
981 
982 	/* Most of the time, the pending_reqs array is in the exact
983 	 * order we need such that all of the requests to complete are
984 	 * in order, in the front. It is guaranteed that all requests
985 	 * belonging to the same sendmsg call are sequential, so once
986 	 * we encounter one match we can stop looping as soon as a
987 	 * non-match is found.
988 	 */
989 	for (idx = serr->ee_info; idx <= serr->ee_data; idx++) {
990 		found = false;
991 		TAILQ_FOREACH_SAFE(req, &_sock->pending_reqs, internal.link, treq) {
992 			if (!req->internal.is_zcopy) {
993 				/* This wasn't a zcopy request. It was just waiting in line to complete */
994 				rc = spdk_sock_request_put(_sock, req, 0);
995 				if (rc < 0) {
996 					return rc;
997 				}
998 			} else if (req->internal.offset == idx) {
999 				found = true;
1000 				rc = spdk_sock_request_put(_sock, req, 0);
1001 				if (rc < 0) {
1002 					return rc;
1003 				}
1004 			} else if (found) {
1005 				break;
1006 			}
1007 		}
1008 	}
1009 
1010 	return 0;
1011 }
1012 
1013 static void
1014 _sock_prep_errqueue(struct spdk_sock *_sock)
1015 {
1016 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1017 	struct spdk_uring_task *task = &sock->errqueue_task;
1018 	struct io_uring_sqe *sqe;
1019 
1020 	if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS) {
1021 		return;
1022 	}
1023 
1024 	assert(sock->group != NULL);
1025 	sock->group->io_queued++;
1026 
1027 	sqe = io_uring_get_sqe(&sock->group->uring);
1028 	io_uring_prep_recvmsg(sqe, sock->fd, &task->msg, MSG_ERRQUEUE);
1029 	io_uring_sqe_set_data(sqe, task);
1030 	task->status = SPDK_URING_SOCK_TASK_IN_PROCESS;
1031 }
1032 
1033 #endif
1034 
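/* Prepare an io_uring sendmsg SQE for the socket's currently queued requests.
 * Only one write task may be outstanding per socket at any time. */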
1035 static void
1036 _sock_flush(struct spdk_sock *_sock)
1037 {
1038 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1039 	struct spdk_uring_task *task = &sock->write_task;
1040 	uint32_t iovcnt;
1041 	struct io_uring_sqe *sqe;
1042 	int flags;
1043 
1044 	if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS) {
1045 		return;
1046 	}
1047 
1048 #ifdef SPDK_ZEROCOPY
1049 	if (sock->zcopy) {
1050 		flags = MSG_DONTWAIT | sock->zcopy_send_flags;
1051 	} else
1052 #endif
1053 	{
1054 		flags = MSG_DONTWAIT;
1055 	}
1056 
1057 	iovcnt = spdk_sock_prep_reqs(&sock->base, task->iovs, task->iov_cnt, &task->last_req, &flags);
1058 	if (!iovcnt) {
1059 		return;
1060 	}
1061 
1062 	task->iov_cnt = iovcnt;
1063 	assert(sock->group != NULL);
1064 	task->msg.msg_iov = task->iovs;
1065 	task->msg.msg_iovlen = task->iov_cnt;
1066 #ifdef SPDK_ZEROCOPY
1067 	task->is_zcopy = (flags & MSG_ZEROCOPY) ? true : false;
1068 #endif
1069 	sock->group->io_queued++;
1070 
1071 	sqe = io_uring_get_sqe(&sock->group->uring);
1072 	io_uring_prep_sendmsg(sqe, sock->fd, &sock->write_task.msg, flags);
1073 	io_uring_sqe_set_data(sqe, task);
1074 	task->status = SPDK_URING_SOCK_TASK_IN_PROCESS;
1075 }
1076 
1077 static void
1078 _sock_prep_pollin(struct spdk_sock *_sock)
1079 {
1080 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1081 	struct spdk_uring_task *task = &sock->pollin_task;
1082 	struct io_uring_sqe *sqe;
1083 
1084 	/* Do not prepare another pollin event if one is already in flight, or if data is
	 * already pending (zero-copy sockets still need to poll for error-queue events) */
1085 	if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS || (sock->pending_recv && !sock->zcopy)) {
1086 		return;
1087 	}
1088 
1089 	assert(sock->group != NULL);
1090 	sock->group->io_queued++;
1091 
1092 	sqe = io_uring_get_sqe(&sock->group->uring);
1093 	io_uring_prep_poll_add(sqe, sock->fd, POLLIN | POLLERR);
1094 	io_uring_sqe_set_data(sqe, task);
1095 	task->status = SPDK_URING_SOCK_TASK_IN_PROCESS;
1096 }
1097 
1098 static void
1099 _sock_prep_cancel_task(struct spdk_sock *_sock, void *user_data)
1100 {
1101 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1102 	struct spdk_uring_task *task = &sock->cancel_task;
1103 	struct io_uring_sqe *sqe;
1104 
1105 	if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS) {
1106 		return;
1107 	}
1108 
1109 	assert(sock->group != NULL);
1110 	sock->group->io_queued++;
1111 
1112 	sqe = io_uring_get_sqe(&sock->group->uring);
1113 	io_uring_prep_cancel(sqe, user_data, 0);
1114 	io_uring_sqe_set_data(sqe, task);
1115 	task->status = SPDK_URING_SOCK_TASK_IN_PROCESS;
1116 }
1117 
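/* Reap up to 'max' completions from the group's ring, then fill 'socks' with up
 * to max_read_events sockets that have pending receive data. */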
1118 static int
1119 sock_uring_group_reap(struct spdk_uring_sock_group_impl *group, int max, int max_read_events,
1120 		      struct spdk_sock **socks)
1121 {
1122 	int i, count, ret;
1123 	struct io_uring_cqe *cqe;
1124 	struct spdk_uring_sock *sock, *tmp;
1125 	struct spdk_uring_task *task;
1126 	int status;
1127 	bool is_zcopy;
1128 
1129 	for (i = 0; i < max; i++) {
1130 		ret = io_uring_peek_cqe(&group->uring, &cqe);
1131 		if (ret != 0) {
1132 			break;
1133 		}
1134 
1135 		if (cqe == NULL) {
1136 			break;
1137 		}
1138 
1139 		task = (struct spdk_uring_task *)cqe->user_data;
1140 		assert(task != NULL);
1141 		sock = task->sock;
1142 		assert(sock != NULL);
1143 		assert(sock->group != NULL);
1144 		assert(sock->group == group);
1145 		sock->group->io_inflight--;
1146 		sock->group->io_avail++;
1147 		status = cqe->res;
1148 		io_uring_cqe_seen(&group->uring, cqe);
1149 
1150 		task->status = SPDK_URING_SOCK_TASK_NOT_IN_USE;
1151 
1152 		if (spdk_unlikely(status <= 0)) {
1153 			if (status == -EAGAIN || status == -EWOULDBLOCK || (status == -ENOBUFS && sock->zcopy)) {
1154 				continue;
1155 			}
1156 		}
1157 
1158 		switch (task->type) {
1159 		case SPDK_SOCK_TASK_POLLIN:
1160 #ifdef SPDK_ZEROCOPY
1161 			if ((status & POLLERR) == POLLERR) {
1162 				_sock_prep_errqueue(&sock->base);
1163 			}
1164 #endif
1165 			if ((status & POLLIN) == POLLIN) {
1166 				if (sock->base.cb_fn != NULL &&
1167 				    sock->pending_recv == false) {
1168 					sock->pending_recv = true;
1169 					TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
1170 				}
1171 			}
1172 			break;
1173 		case SPDK_SOCK_TASK_WRITE:
1174 			task->last_req = NULL;
1175 			task->iov_cnt = 0;
1176 			is_zcopy = task->is_zcopy;
1177 			task->is_zcopy = false;
1178 			if (spdk_unlikely(status < 0)) {
1179 				sock->connection_status = status;
1180 				spdk_sock_abort_requests(&sock->base);
1181 			} else {
1182 				sock_complete_write_reqs(&sock->base, status, is_zcopy);
1183 			}
1184 
1185 			break;
1186 #ifdef SPDK_ZEROCOPY
1187 		case SPDK_SOCK_TASK_ERRQUEUE:
1188 			if (spdk_unlikely(status == -ECANCELED)) {
1189 				sock->connection_status = status;
1190 				break;
1191 			}
1192 			_sock_check_zcopy(&sock->base, status);
1193 			break;
1194 #endif
1195 		case SPDK_SOCK_TASK_CANCEL:
1196 			/* Do nothing */
1197 			break;
1198 		default:
1199 			SPDK_UNREACHABLE();
1200 		}
1201 	}
1202 
1203 	if (!socks) {
1204 		return 0;
1205 	}
1206 	count = 0;
1207 	TAILQ_FOREACH_SAFE(sock, &group->pending_recv, link, tmp) {
1208 		if (count == max_read_events) {
1209 			break;
1210 		}
1211 
1212 		if (spdk_unlikely(sock->base.cb_fn == NULL) ||
1213 		    (sock->recv_pipe == NULL || spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0)) {
1214 			sock->pending_recv = false;
1215 			TAILQ_REMOVE(&group->pending_recv, sock, link);
1216 			if (spdk_unlikely(sock->base.cb_fn == NULL)) {
1217 				/* If the socket's cb_fn is NULL, do not add it to socks array */
1218 				continue;
1219 			}
1220 		}
1221 
1222 		socks[count++] = &sock->base;
1223 	}
1224 
1225 
1226 	/* Cycle the pending_recv list so that each time we poll things aren't
1227 	 * in the same order. Say we have 6 sockets in the list, named as follows:
1228 	 * A B C D E F
1229 	 * And all 6 sockets had poll events, but max_read_events is only 3. That means
1230 	 * sock currently points at D. We want to rearrange the list to the following:
1231 	 * D E F A B C
1232 	 *
1233 	 * The variables below are named according to this example to make it easier to
1234 	 * follow the swaps.
1235 	 */
1236 	if (sock != NULL) {
1237 		struct spdk_uring_sock *ua, *uc, *ud, *uf;
1238 
1239 		/* Capture pointers to the elements we need */
1240 		ud = sock;
1241 
1242 		ua = TAILQ_FIRST(&group->pending_recv);
1243 		if (ua == ud) {
1244 			goto end;
1245 		}
1246 
1247 		uf = TAILQ_LAST(&group->pending_recv, pending_recv_list);
1248 		if (uf == ud) {
1249 			TAILQ_REMOVE(&group->pending_recv, ud, link);
1250 			TAILQ_INSERT_HEAD(&group->pending_recv, ud, link);
1251 			goto end;
1252 		}
1253 
1254 		uc = TAILQ_PREV(ud, pending_recv_list, link);
1255 		assert(uc != NULL);
1256 
1257 		/* Break the link between C and D */
1258 		uc->link.tqe_next = NULL;
1259 
1260 		/* Connect F to A */
1261 		uf->link.tqe_next = ua;
1262 		ua->link.tqe_prev = &uf->link.tqe_next;
1263 
1264 		/* Fix up the list first/last pointers */
1265 		group->pending_recv.tqh_first = ud;
1266 		group->pending_recv.tqh_last = &uc->link.tqe_next;
1267 
1268 		/* D is in front of the list, make tqe prev pointer point to the head of list */
1269 		ud->link.tqe_prev = &group->pending_recv.tqh_first;
1270 	}
1271 
1272 end:
1273 	return count;
1274 }
1275 
1276 static int uring_sock_flush(struct spdk_sock *_sock);
1277 
1278 static void
1279 uring_sock_writev_async(struct spdk_sock *_sock, struct spdk_sock_request *req)
1280 {
1281 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1282 	int rc;
1283 
1284 	if (spdk_unlikely(sock->connection_status)) {
1285 		req->cb_fn(req->cb_arg, sock->connection_status);
1286 		return;
1287 	}
1288 
1289 	spdk_sock_request_queue(_sock, req);
1290 
1291 	if (!sock->group) {
1292 		if (_sock->queued_iovcnt >= IOV_BATCH_SIZE) {
1293 			rc = uring_sock_flush(_sock);
1294 			if (rc < 0 && errno != EAGAIN) {
1295 				spdk_sock_abort_requests(_sock);
1296 			}
1297 		}
1298 	}
1299 }
1300 
1301 static void
1302 uring_sock_readv_async(struct spdk_sock *sock, struct spdk_sock_request *req)
1303 {
1304 	req->cb_fn(req->cb_arg, -ENOTSUP);
1305 }
1306 
1307 static int
1308 uring_sock_set_recvlowat(struct spdk_sock *_sock, int nbytes)
1309 {
1310 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1311 	int val;
1312 	int rc;
1313 
1314 	assert(sock != NULL);
1315 
1316 	val = nbytes;
1317 	rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVLOWAT, &val, sizeof val);
1318 	if (rc != 0) {
1319 		return -1;
1320 	}
1321 	return 0;
1322 }
1323 
1324 static bool
1325 uring_sock_is_ipv6(struct spdk_sock *_sock)
1326 {
1327 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1328 	struct sockaddr_storage sa;
1329 	socklen_t salen;
1330 	int rc;
1331 
1332 	assert(sock != NULL);
1333 
1334 	memset(&sa, 0, sizeof sa);
1335 	salen = sizeof sa;
1336 	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
1337 	if (rc != 0) {
1338 		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
1339 		return false;
1340 	}
1341 
1342 	return (sa.ss_family == AF_INET6);
1343 }
1344 
1345 static bool
1346 uring_sock_is_ipv4(struct spdk_sock *_sock)
1347 {
1348 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1349 	struct sockaddr_storage sa;
1350 	socklen_t salen;
1351 	int rc;
1352 
1353 	assert(sock != NULL);
1354 
1355 	memset(&sa, 0, sizeof sa);
1356 	salen = sizeof sa;
1357 	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
1358 	if (rc != 0) {
1359 		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
1360 		return false;
1361 	}
1362 
1363 	return (sa.ss_family == AF_INET);
1364 }
1365 
1366 static bool
1367 uring_sock_is_connected(struct spdk_sock *_sock)
1368 {
1369 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1370 	uint8_t byte;
1371 	int rc;
1372 
1373 	rc = recv(sock->fd, &byte, 1, MSG_PEEK | MSG_DONTWAIT);
1374 	if (rc == 0) {
1375 		return false;
1376 	}
1377 
1378 	if (rc < 0) {
1379 		if (errno == EAGAIN || errno == EWOULDBLOCK) {
1380 			return true;
1381 		}
1382 
1383 		return false;
1384 	}
1385 
1386 	return true;
1387 }
1388 
1389 static struct spdk_sock_group_impl *
1390 uring_sock_group_impl_get_optimal(struct spdk_sock *_sock, struct spdk_sock_group_impl *hint)
1391 {
1392 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1393 	struct spdk_sock_group_impl *group;
1394 
1395 	if (sock->placement_id != -1) {
1396 		spdk_sock_map_lookup(&g_map, sock->placement_id, &group, hint);
1397 		return group;
1398 	}
1399 
1400 	return NULL;
1401 }
1402 
1403 static struct spdk_sock_group_impl *
1404 uring_sock_group_impl_create(void)
1405 {
1406 	struct spdk_uring_sock_group_impl *group_impl;
1407 
1408 	group_impl = calloc(1, sizeof(*group_impl));
1409 	if (group_impl == NULL) {
1410 		SPDK_ERRLOG("group_impl allocation failed\n");
1411 		return NULL;
1412 	}
1413 
1414 	group_impl->io_avail = SPDK_SOCK_GROUP_QUEUE_DEPTH;
1415 
1416 	if (io_uring_queue_init(SPDK_SOCK_GROUP_QUEUE_DEPTH, &group_impl->uring, 0) < 0) {
1417 		SPDK_ERRLOG("uring I/O context setup failure\n");
1418 		free(group_impl);
1419 		return NULL;
1420 	}
1421 
1422 	TAILQ_INIT(&group_impl->pending_recv);
1423 
1424 	if (g_spdk_uring_sock_impl_opts.enable_placement_id == PLACEMENT_CPU) {
1425 		spdk_sock_map_insert(&g_map, spdk_env_get_current_core(), &group_impl->base);
1426 	}
1427 
1428 	return &group_impl->base;
1429 }
1430 
1431 static int
1432 uring_sock_group_impl_add_sock(struct spdk_sock_group_impl *_group,
1433 			       struct spdk_sock *_sock)
1434 {
1435 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1436 	struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group);
1437 	int rc;
1438 
1439 	sock->group = group;
1440 	sock->write_task.sock = sock;
1441 	sock->write_task.type = SPDK_SOCK_TASK_WRITE;
1442 
1443 	sock->pollin_task.sock = sock;
1444 	sock->pollin_task.type = SPDK_SOCK_TASK_POLLIN;
1445 
1446 	sock->errqueue_task.sock = sock;
1447 	sock->errqueue_task.type = SPDK_SOCK_TASK_ERRQUEUE;
1448 	sock->errqueue_task.msg.msg_control = sock->buf;
1449 	sock->errqueue_task.msg.msg_controllen = sizeof(sock->buf);
1450 
1451 	sock->cancel_task.sock = sock;
1452 	sock->cancel_task.type = SPDK_SOCK_TASK_CANCEL;
1453 
1454 	/* The socket may have been switched from another polling group due to
	 * scheduling and may still have data buffered in its receive pipe */
1455 	if (spdk_unlikely(sock->recv_pipe != NULL &&
1456 			  (spdk_pipe_reader_bytes_available(sock->recv_pipe) > 0))) {
1457 		assert(sock->pending_recv == false);
1458 		sock->pending_recv = true;
1459 		TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
1460 	}
1461 
1462 	if (sock->placement_id != -1) {
1463 		rc = spdk_sock_map_insert(&g_map, sock->placement_id, &group->base);
1464 		if (rc != 0) {
1465 			SPDK_ERRLOG("Failed to insert sock group into map: %d\n", rc);
1466 			/* Do not treat this as an error. The system will continue running. */
1467 		}
1468 	}
1469 
1470 	return 0;
1471 }
1472 
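/* Group poll entry point: flush writes and arm pollin for every socket, submit
 * all prepared SQEs with a single io_uring_submit() call, then reap completions
 * and return the sockets that have data ready. */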
1473 static int
1474 uring_sock_group_impl_poll(struct spdk_sock_group_impl *_group, int max_events,
1475 			   struct spdk_sock **socks)
1476 {
1477 	struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group);
1478 	int count, ret;
1479 	int to_complete, to_submit;
1480 	struct spdk_sock *_sock, *tmp;
1481 	struct spdk_uring_sock *sock;
1482 
1483 	if (spdk_likely(socks)) {
1484 		TAILQ_FOREACH_SAFE(_sock, &group->base.socks, link, tmp) {
1485 			sock = __uring_sock(_sock);
1486 			if (spdk_unlikely(sock->connection_status)) {
1487 				continue;
1488 			}
1489 			_sock_flush(_sock);
1490 			_sock_prep_pollin(_sock);
1491 		}
1492 	}
1493 
1494 	to_submit = group->io_queued;
1495 
1496 	/* Network I/O never uses O_DIRECT, so there is no need to call io_uring_enter() separately for it */
1497 	if (to_submit > 0) {
1498 		/* If there are I/O to submit, use io_uring_submit here.
1499 		 * It will automatically call io_uring_enter appropriately. */
1500 		ret = io_uring_submit(&group->uring);
1501 		if (ret < 0) {
1502 			return 1;
1503 		}
1504 		group->io_queued = 0;
1505 		group->io_inflight += to_submit;
1506 		group->io_avail -= to_submit;
1507 	}
1508 
1509 	count = 0;
1510 	to_complete = group->io_inflight;
1511 	if (to_complete > 0 || !TAILQ_EMPTY(&group->pending_recv)) {
1512 		count = sock_uring_group_reap(group, to_complete, max_events, socks);
1513 	}
1514 
1515 	return count;
1516 }
1517 
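/* Removing a socket from the group requires cancelling any io_uring operations
 * that still reference the socket's embedded tasks, and waiting for those
 * cancellations to complete before the socket can be detached. */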
1518 static int
1519 uring_sock_group_impl_remove_sock(struct spdk_sock_group_impl *_group,
1520 				  struct spdk_sock *_sock)
1521 {
1522 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1523 	struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group);
1524 
1525 	if (sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
1526 		_sock_prep_cancel_task(_sock, &sock->write_task);
1527 		/* spdk_sock_group_remove_sock() is a synchronous interface, so it is
1528 		 * safe to poll in a while loop here until the task completes. */
1529 		while ((sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) ||
1530 		       (sock->cancel_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE)) {
1531 			uring_sock_group_impl_poll(_group, 32, NULL);
1532 		}
1533 	}
1534 
1535 	if (sock->pollin_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
1536 		_sock_prep_cancel_task(_sock, &sock->pollin_task);
1537 		/* spdk_sock_group_remove_sock() is a synchronous interface, so it is
1538 		 * safe to poll in a while loop here until the task completes. */
1539 		while ((sock->pollin_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) ||
1540 		       (sock->cancel_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE)) {
1541 			uring_sock_group_impl_poll(_group, 32, NULL);
1542 		}
1543 	}
1544 
1545 	if (sock->errqueue_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
1546 		_sock_prep_cancel_task(_sock, &sock->errqueue_task);
1547 		/* spdk_sock_group_remove_sock() is a synchronous interface, so it is
1548 		 * safe to poll in a while loop here until the task completes. */
1549 		while ((sock->errqueue_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) ||
1550 		       (sock->cancel_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE)) {
1551 			uring_sock_group_impl_poll(_group, 32, NULL);
1552 		}
1553 	}
1554 
1555 	/* Make sure that cancelling the tasks above did not cause new requests to be submitted */
1556 	assert(sock->write_task.status == SPDK_URING_SOCK_TASK_NOT_IN_USE);
1557 	assert(sock->pollin_task.status == SPDK_URING_SOCK_TASK_NOT_IN_USE);
1558 	assert(sock->errqueue_task.status == SPDK_URING_SOCK_TASK_NOT_IN_USE);
1559 
1560 	if (sock->pending_recv) {
1561 		TAILQ_REMOVE(&group->pending_recv, sock, link);
1562 		sock->pending_recv = false;
1563 	}
1564 	assert(sock->pending_recv == false);
1565 
1566 	if (sock->placement_id != -1) {
1567 		spdk_sock_map_release(&g_map, sock->placement_id);
1568 	}
1569 
1570 	sock->group = NULL;
1571 	return 0;
1572 }
1573 
1574 static int
1575 uring_sock_group_impl_close(struct spdk_sock_group_impl *_group)
1576 {
1577 	struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group);
1578 
1579 	/* try to reap all the active I/O */
1580 	while (group->io_inflight) {
1581 		uring_sock_group_impl_poll(_group, 32, NULL);
1582 	}
1583 	assert(group->io_inflight == 0);
1584 	assert(group->io_avail == SPDK_SOCK_GROUP_QUEUE_DEPTH);
1585 
1586 	io_uring_queue_exit(&group->uring);
1587 
1588 	if (g_spdk_uring_sock_impl_opts.enable_placement_id == PLACEMENT_CPU) {
1589 		spdk_sock_map_release(&g_map, spdk_env_get_current_core());
1590 	}
1591 
1592 	free(group);
1593 	return 0;
1594 }
1595 
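/* Synchronous flush path (used, for example, when the socket is not part of a
 * polling group): gather the queued requests into an iovec and send them with a
 * single sendmsg() call. */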
1596 static int
1597 uring_sock_flush(struct spdk_sock *_sock)
1598 {
1599 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1600 	struct msghdr msg = {};
1601 	struct iovec iovs[IOV_BATCH_SIZE];
1602 	int iovcnt;
1603 	ssize_t rc;
1604 	int flags = sock->zcopy_send_flags;
1605 	int retval;
1606 	bool is_zcopy = false;
1607 
1608 	/* Can't flush from within a callback or we end up with recursive calls */
1609 	if (_sock->cb_cnt > 0) {
1610 		errno = EAGAIN;
1611 		return -1;
1612 	}
1613 
1614 	/* Can't flush while a write is already outstanding */
1615 	if (sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
1616 		errno = EAGAIN;
1617 		return -1;
1618 	}
1619 
1620 	/* Gather an iov */
1621 	iovcnt = spdk_sock_prep_reqs(_sock, iovs, 0, NULL, &flags);
1622 	if (iovcnt == 0) {
1623 		/* Nothing to send */
1624 		return 0;
1625 	}
1626 
1627 	/* Perform the vectored write */
1628 	msg.msg_iov = iovs;
1629 	msg.msg_iovlen = iovcnt;
1630 	rc = sendmsg(sock->fd, &msg, flags | MSG_DONTWAIT);
1631 	if (rc <= 0) {
1632 		if (rc == 0 || errno == EAGAIN || errno == EWOULDBLOCK || (errno == ENOBUFS && sock->zcopy)) {
1633 			errno = EAGAIN;
1634 		}
1635 		return -1;
1636 	}
1637 
1638 #ifdef SPDK_ZEROCOPY
1639 	is_zcopy = flags & MSG_ZEROCOPY;
1640 #endif
1641 	retval = sock_complete_write_reqs(_sock, rc, is_zcopy);
1642 	if (retval < 0) {
1643 		/* If the socket was closed while completing requests, return now to avoid a heap-use-after-free */
1644 		errno = ENOTCONN;
1645 		return -1;
1646 	}
1647 
1648 #ifdef SPDK_ZEROCOPY
1649 	if (sock->zcopy && !TAILQ_EMPTY(&_sock->pending_reqs)) {
1650 		_sock_check_zcopy(_sock, 0);
1651 	}
1652 #endif
1653 
1654 	return rc;
1655 }
1656 
1657 static struct spdk_net_impl g_uring_net_impl = {
1658 	.name		= "uring",
1659 	.getaddr	= uring_sock_getaddr,
1660 	.connect	= uring_sock_connect,
1661 	.listen		= uring_sock_listen,
1662 	.accept		= uring_sock_accept,
1663 	.close		= uring_sock_close,
1664 	.recv		= uring_sock_recv,
1665 	.readv		= uring_sock_readv,
1666 	.readv_async	= uring_sock_readv_async,
1667 	.writev		= uring_sock_writev,
1668 	.writev_async	= uring_sock_writev_async,
1669 	.flush          = uring_sock_flush,
1670 	.set_recvlowat	= uring_sock_set_recvlowat,
1671 	.set_recvbuf	= uring_sock_set_recvbuf,
1672 	.set_sendbuf	= uring_sock_set_sendbuf,
1673 	.is_ipv6	= uring_sock_is_ipv6,
1674 	.is_ipv4	= uring_sock_is_ipv4,
1675 	.is_connected   = uring_sock_is_connected,
1676 	.group_impl_get_optimal	= uring_sock_group_impl_get_optimal,
1677 	.group_impl_create	= uring_sock_group_impl_create,
1678 	.group_impl_add_sock	= uring_sock_group_impl_add_sock,
1679 	.group_impl_remove_sock = uring_sock_group_impl_remove_sock,
1680 	.group_impl_poll	= uring_sock_group_impl_poll,
1681 	.group_impl_close	= uring_sock_group_impl_close,
1682 	.get_opts		= uring_sock_impl_get_opts,
1683 	.set_opts		= uring_sock_impl_set_opts,
1684 };
1685 
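/* Usage sketch (hypothetical application code, relying on the public spdk_sock
 * API rather than anything defined in this file): once registered below, the
 * transport can be selected by name, e.g.
 *
 *     struct spdk_sock *s = spdk_sock_connect("127.0.0.1", 4420, "uring");
 *
 * or made the process-wide default with spdk_sock_set_default_impl("uring"). */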
1686 SPDK_NET_IMPL_REGISTER(uring, &g_uring_net_impl, DEFAULT_SOCK_PRIORITY + 2);
1687