xref: /spdk/module/sock/uring/uring.c (revision a6dbe3721eb3b5990707fc3e378c95e505dd8ab5)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2019 Intel Corporation.
3  *   All rights reserved.
4  */
5 
6 #include "spdk/stdinc.h"
7 #include "spdk/config.h"
8 
9 #include <linux/errqueue.h>
10 #include <sys/epoll.h>
11 #include <liburing.h>
12 
13 #include "spdk/barrier.h"
14 #include "spdk/env.h"
15 #include "spdk/log.h"
16 #include "spdk/pipe.h"
17 #include "spdk/sock.h"
18 #include "spdk/string.h"
19 #include "spdk/util.h"
20 
21 #include "spdk_internal/sock.h"
22 #include "spdk_internal/assert.h"
23 #include "../sock_kernel.h"
24 
25 #define MAX_TMPBUF 1024
26 #define PORTNUMLEN 32
27 #define SPDK_SOCK_GROUP_QUEUE_DEPTH 4096
28 #define SPDK_SOCK_CMG_INFO_SIZE (sizeof(struct cmsghdr) + sizeof(struct sock_extended_err))
29 
30 enum spdk_sock_task_type {
31 	SPDK_SOCK_TASK_POLLIN = 0,
32 	SPDK_SOCK_TASK_ERRQUEUE,
33 	SPDK_SOCK_TASK_WRITE,
34 	SPDK_SOCK_TASK_CANCEL,
35 };
36 
37 #if defined(SO_ZEROCOPY) && defined(MSG_ZEROCOPY)
38 #define SPDK_ZEROCOPY
39 #endif
40 
41 enum spdk_uring_sock_task_status {
42 	SPDK_URING_SOCK_TASK_NOT_IN_USE = 0,
43 	SPDK_URING_SOCK_TASK_IN_PROCESS,
44 };
45 
46 struct spdk_uring_task {
47 	enum spdk_uring_sock_task_status	status;
48 	enum spdk_sock_task_type		type;
49 	struct spdk_uring_sock			*sock;
50 	struct msghdr				msg;
51 	struct iovec				iovs[IOV_BATCH_SIZE];
52 	int					iov_cnt;
53 	struct spdk_sock_request		*last_req;
54 	bool					is_zcopy;
55 	STAILQ_ENTRY(spdk_uring_task)		link;
56 };
57 
58 struct spdk_uring_sock {
59 	struct spdk_sock			base;
60 	int					fd;
61 	uint32_t				sendmsg_idx;
62 	struct spdk_uring_sock_group_impl	*group;
63 	struct spdk_uring_task			write_task;
64 	struct spdk_uring_task			errqueue_task;
65 	struct spdk_uring_task			pollin_task;
66 	struct spdk_uring_task			cancel_task;
67 	struct spdk_pipe			*recv_pipe;
68 	void					*recv_buf;
69 	int					recv_buf_sz;
70 	bool					zcopy;
71 	bool					pending_recv;
72 	int					zcopy_send_flags;
73 	int					connection_status;
74 	int					placement_id;
75 	uint8_t					buf[SPDK_SOCK_CMG_INFO_SIZE];
76 	TAILQ_ENTRY(spdk_uring_sock)		link;
77 };
78 
79 TAILQ_HEAD(pending_recv_list, spdk_uring_sock);
80 
81 struct spdk_uring_sock_group_impl {
82 	struct spdk_sock_group_impl		base;
83 	struct io_uring				uring;
84 	uint32_t				io_inflight;
85 	uint32_t				io_queued;
86 	uint32_t				io_avail;
87 	struct pending_recv_list		pending_recv;
88 };
89 
90 static struct spdk_sock_impl_opts g_spdk_uring_sock_impl_opts = {
91 	.recv_buf_size = MIN_SO_RCVBUF_SIZE,
92 	.send_buf_size = MIN_SO_SNDBUF_SIZE,
93 	.enable_recv_pipe = true,
94 	.enable_quickack = false,
95 	.enable_placement_id = PLACEMENT_NONE,
96 	.enable_zerocopy_send_server = false,
97 	.enable_zerocopy_send_client = false,
98 	.zerocopy_threshold = 0,
99 	.tls_version = 0,
100 	.enable_ktls = false,
101 	.psk_key = NULL,
102 	.psk_identity = NULL
103 };
104 
105 static struct spdk_sock_map g_map = {
106 	.entries = STAILQ_HEAD_INITIALIZER(g_map.entries),
107 	.mtx = PTHREAD_MUTEX_INITIALIZER
108 };
109 
110 __attribute((destructor)) static void
111 uring_sock_map_cleanup(void)
112 {
113 	spdk_sock_map_cleanup(&g_map);
114 }
115 
116 #define SPDK_URING_SOCK_REQUEST_IOV(req) ((struct iovec *)((uint8_t *)req + sizeof(struct spdk_sock_request)))
117 
118 #define __uring_sock(sock) (struct spdk_uring_sock *)sock
119 #define __uring_group_impl(group) (struct spdk_uring_sock_group_impl *)group
120 
121 static void
122 uring_sock_copy_impl_opts(struct spdk_sock_impl_opts *dest, const struct spdk_sock_impl_opts *src,
123 			  size_t len)
124 {
125 #define FIELD_OK(field) \
126 	offsetof(struct spdk_sock_impl_opts, field) + sizeof(src->field) <= len
127 
128 #define SET_FIELD(field) \
129 	if (FIELD_OK(field)) { \
130 		dest->field = src->field; \
131 	}
132 
133 	SET_FIELD(recv_buf_size);
134 	SET_FIELD(send_buf_size);
135 	SET_FIELD(enable_recv_pipe);
136 	SET_FIELD(enable_quickack);
137 	SET_FIELD(enable_placement_id);
138 	SET_FIELD(enable_zerocopy_send_server);
139 	SET_FIELD(enable_zerocopy_send_client);
140 	SET_FIELD(zerocopy_threshold);
141 	SET_FIELD(tls_version);
142 	SET_FIELD(enable_ktls);
143 	SET_FIELD(psk_key);
144 	SET_FIELD(psk_identity);
145 
146 #undef SET_FIELD
147 #undef FIELD_OK
148 }
149 
150 static int
151 uring_sock_impl_get_opts(struct spdk_sock_impl_opts *opts, size_t *len)
152 {
153 	if (!opts || !len) {
154 		errno = EINVAL;
155 		return -1;
156 	}
157 
158 	assert(sizeof(*opts) >= *len);
159 	memset(opts, 0, *len);
160 
161 	uring_sock_copy_impl_opts(opts, &g_spdk_uring_sock_impl_opts, *len);
162 	*len = spdk_min(*len, sizeof(g_spdk_uring_sock_impl_opts));
163 
164 	return 0;
165 }
166 
167 static int
168 uring_sock_impl_set_opts(const struct spdk_sock_impl_opts *opts, size_t len)
169 {
170 	if (!opts) {
171 		errno = EINVAL;
172 		return -1;
173 	}
174 
175 	assert(sizeof(*opts) >= len);
176 	uring_sock_copy_impl_opts(&g_spdk_uring_sock_impl_opts, opts, len);
177 
178 	return 0;
179 }
180 
181 static void
182 uring_opts_get_impl_opts(const struct spdk_sock_opts *opts, struct spdk_sock_impl_opts *dest)
183 {
184 	/* Copy the default impl_opts first to cover cases when user's impl_opts is smaller */
185 	memcpy(dest, &g_spdk_uring_sock_impl_opts, sizeof(*dest));
186 
187 	if (opts->impl_opts != NULL) {
188 		assert(sizeof(*dest) >= opts->impl_opts_size);
189 		uring_sock_copy_impl_opts(dest, opts->impl_opts, opts->impl_opts_size);
190 	}
191 }
192 
193 static int
194 uring_sock_getaddr(struct spdk_sock *_sock, char *saddr, int slen, uint16_t *sport,
195 		   char *caddr, int clen, uint16_t *cport)
196 {
197 	struct spdk_uring_sock *sock = __uring_sock(_sock);
198 	struct sockaddr_storage sa;
199 	socklen_t salen;
200 	int rc;
201 
202 	assert(sock != NULL);
203 
204 	memset(&sa, 0, sizeof sa);
205 	salen = sizeof sa;
206 	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
207 	if (rc != 0) {
208 		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
209 		return -1;
210 	}
211 
212 	switch (sa.ss_family) {
213 	case AF_UNIX:
214 		/* Acceptable connection types that don't have IPs */
215 		return 0;
216 	case AF_INET:
217 	case AF_INET6:
218 		/* Code below will get IP addresses */
219 		break;
220 	default:
221 		/* Unsupported socket family */
222 		return -1;
223 	}
224 
225 	rc = get_addr_str((struct sockaddr *)&sa, saddr, slen);
226 	if (rc != 0) {
227 		SPDK_ERRLOG("getnameinfo() failed (errno=%d)\n", errno);
228 		return -1;
229 	}
230 
231 	if (sport) {
232 		if (sa.ss_family == AF_INET) {
233 			*sport = ntohs(((struct sockaddr_in *) &sa)->sin_port);
234 		} else if (sa.ss_family == AF_INET6) {
235 			*sport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port);
236 		}
237 	}
238 
239 	memset(&sa, 0, sizeof sa);
240 	salen = sizeof sa;
241 	rc = getpeername(sock->fd, (struct sockaddr *) &sa, &salen);
242 	if (rc != 0) {
243 		SPDK_ERRLOG("getpeername() failed (errno=%d)\n", errno);
244 		return -1;
245 	}
246 
247 	rc = get_addr_str((struct sockaddr *)&sa, caddr, clen);
248 	if (rc != 0) {
249 		SPDK_ERRLOG("getnameinfo() failed (errno=%d)\n", errno);
250 		return -1;
251 	}
252 
253 	if (cport) {
254 		if (sa.ss_family == AF_INET) {
255 			*cport = ntohs(((struct sockaddr_in *) &sa)->sin_port);
256 		} else if (sa.ss_family == AF_INET6) {
257 			*cport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port);
258 		}
259 	}
260 
261 	return 0;
262 }
263 
264 enum uring_sock_create_type {
265 	SPDK_SOCK_CREATE_LISTEN,
266 	SPDK_SOCK_CREATE_CONNECT,
267 };
268 
269 static int
270 uring_sock_alloc_pipe(struct spdk_uring_sock *sock, int sz)
271 {
272 	uint8_t *new_buf;
273 	struct spdk_pipe *new_pipe;
274 	struct iovec siov[2];
275 	struct iovec diov[2];
276 	int sbytes;
277 	ssize_t bytes;
278 
279 	if (sock->recv_buf_sz == sz) {
280 		return 0;
281 	}
282 
283 	/* If the new size is 0, just free the pipe */
284 	if (sz == 0) {
285 		spdk_pipe_destroy(sock->recv_pipe);
286 		free(sock->recv_buf);
287 		sock->recv_pipe = NULL;
288 		sock->recv_buf = NULL;
289 		return 0;
290 	} else if (sz < MIN_SOCK_PIPE_SIZE) {
291 		SPDK_ERRLOG("The size of the pipe must be larger than %d\n", MIN_SOCK_PIPE_SIZE);
292 		return -1;
293 	}
294 
295 	/* Round up to next 64 byte multiple */
296 	new_buf = calloc(SPDK_ALIGN_CEIL(sz + 1, 64), sizeof(uint8_t));
297 	if (!new_buf) {
298 		SPDK_ERRLOG("socket recv buf allocation failed\n");
299 		return -ENOMEM;
300 	}
301 
302 	new_pipe = spdk_pipe_create(new_buf, sz + 1);
303 	if (new_pipe == NULL) {
304 		SPDK_ERRLOG("socket pipe allocation failed\n");
305 		free(new_buf);
306 		return -ENOMEM;
307 	}
308 
309 	if (sock->recv_pipe != NULL) {
310 		/* Pull all of the data out of the old pipe */
311 		sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov);
312 		if (sbytes > sz) {
313 			/* Too much data to fit into the new pipe size */
314 			spdk_pipe_destroy(new_pipe);
315 			free(new_buf);
316 			return -EINVAL;
317 		}
318 
319 		sbytes = spdk_pipe_writer_get_buffer(new_pipe, sz, diov);
320 		assert(sbytes == sz);
321 
322 		bytes = spdk_iovcpy(siov, 2, diov, 2);
323 		spdk_pipe_writer_advance(new_pipe, bytes);
324 
325 		spdk_pipe_destroy(sock->recv_pipe);
326 		free(sock->recv_buf);
327 	}
328 
329 	sock->recv_buf_sz = sz;
330 	sock->recv_buf = new_buf;
331 	sock->recv_pipe = new_pipe;
332 
333 	return 0;
334 }
335 
336 static int
337 uring_sock_set_recvbuf(struct spdk_sock *_sock, int sz)
338 {
339 	struct spdk_uring_sock *sock = __uring_sock(_sock);
340 	int min_size;
341 	int rc;
342 
343 	assert(sock != NULL);
344 
345 	if (_sock->impl_opts.enable_recv_pipe) {
346 		rc = uring_sock_alloc_pipe(sock, sz);
347 		if (rc) {
348 			SPDK_ERRLOG("unable to allocate sufficient recvbuf with sz=%d on sock=%p\n", sz, _sock);
349 			return rc;
350 		}
351 	}
352 
353 	/* Set kernel buffer size to be at least MIN_SO_RCVBUF_SIZE and
354 	 * g_spdk_uring_sock_impl_opts.recv_buf_size. */
355 	min_size = spdk_max(MIN_SO_RCVBUF_SIZE, g_spdk_uring_sock_impl_opts.recv_buf_size);
356 
357 	if (sz < min_size) {
358 		sz = min_size;
359 	}
360 
361 	rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVBUF, &sz, sizeof(sz));
362 	if (rc < 0) {
363 		return rc;
364 	}
365 
366 	_sock->impl_opts.recv_buf_size = sz;
367 
368 	return 0;
369 }
370 
371 static int
372 uring_sock_set_sendbuf(struct spdk_sock *_sock, int sz)
373 {
374 	struct spdk_uring_sock *sock = __uring_sock(_sock);
375 	int min_size;
376 	int rc;
377 
378 	assert(sock != NULL);
379 
380 	/* Set kernel buffer size to be at least MIN_SO_SNDBUF_SIZE and
381 	 * g_spdk_uring_sock_impl_opts.seend_buf_size. */
382 	min_size = spdk_max(MIN_SO_SNDBUF_SIZE, g_spdk_uring_sock_impl_opts.send_buf_size);
383 
384 	if (sz < min_size) {
385 		sz = min_size;
386 	}
387 
388 	rc = setsockopt(sock->fd, SOL_SOCKET, SO_SNDBUF, &sz, sizeof(sz));
389 	if (rc < 0) {
390 		return rc;
391 	}
392 
393 	_sock->impl_opts.send_buf_size = sz;
394 
395 	return 0;
396 }
397 
398 static struct spdk_uring_sock *
399 uring_sock_alloc(int fd, struct spdk_sock_impl_opts *impl_opts, bool enable_zero_copy)
400 {
401 	struct spdk_uring_sock *sock;
402 #if defined(__linux__)
403 	int flag;
404 	int rc;
405 #endif
406 
407 	sock = calloc(1, sizeof(*sock));
408 	if (sock == NULL) {
409 		SPDK_ERRLOG("sock allocation failed\n");
410 		return NULL;
411 	}
412 
413 	sock->fd = fd;
414 	memcpy(&sock->base.impl_opts, impl_opts, sizeof(*impl_opts));
415 
416 #if defined(__linux__)
417 	flag = 1;
418 
419 	if (sock->base.impl_opts.enable_quickack) {
420 		rc = setsockopt(sock->fd, IPPROTO_TCP, TCP_QUICKACK, &flag, sizeof(flag));
421 		if (rc != 0) {
422 			SPDK_ERRLOG("quickack was failed to set\n");
423 		}
424 	}
425 
426 	spdk_sock_get_placement_id(sock->fd, sock->base.impl_opts.enable_placement_id,
427 				   &sock->placement_id);
428 #ifdef SPDK_ZEROCOPY
429 	/* Try to turn on zero copy sends */
430 	flag = 1;
431 
432 	if (enable_zero_copy) {
433 		rc = setsockopt(sock->fd, SOL_SOCKET, SO_ZEROCOPY, &flag, sizeof(flag));
434 		if (rc == 0) {
435 			sock->zcopy = true;
436 			sock->zcopy_send_flags = MSG_ZEROCOPY;
437 		}
438 	}
439 #endif
440 #endif
441 
442 	return sock;
443 }
444 
445 static struct spdk_sock *
446 uring_sock_create(const char *ip, int port,
447 		  enum uring_sock_create_type type,
448 		  struct spdk_sock_opts *opts)
449 {
450 	struct spdk_uring_sock *sock;
451 	struct spdk_sock_impl_opts impl_opts;
452 	char buf[MAX_TMPBUF];
453 	char portnum[PORTNUMLEN];
454 	char *p;
455 	struct addrinfo hints, *res, *res0;
456 	int fd, flag;
457 	int val = 1;
458 	int rc;
459 	bool enable_zcopy_impl_opts = false;
460 	bool enable_zcopy_user_opts = true;
461 
462 	assert(opts != NULL);
463 	uring_opts_get_impl_opts(opts, &impl_opts);
464 
465 	if (ip == NULL) {
466 		return NULL;
467 	}
468 	if (ip[0] == '[') {
469 		snprintf(buf, sizeof(buf), "%s", ip + 1);
470 		p = strchr(buf, ']');
471 		if (p != NULL) {
472 			*p = '\0';
473 		}
474 		ip = (const char *) &buf[0];
475 	}
476 
477 	snprintf(portnum, sizeof portnum, "%d", port);
478 	memset(&hints, 0, sizeof hints);
479 	hints.ai_family = PF_UNSPEC;
480 	hints.ai_socktype = SOCK_STREAM;
481 	hints.ai_flags = AI_NUMERICSERV;
482 	hints.ai_flags |= AI_PASSIVE;
483 	hints.ai_flags |= AI_NUMERICHOST;
484 	rc = getaddrinfo(ip, portnum, &hints, &res0);
485 	if (rc != 0) {
486 		SPDK_ERRLOG("getaddrinfo() failed (errno=%d)\n", errno);
487 		return NULL;
488 	}
489 
490 	/* try listen */
491 	fd = -1;
492 	for (res = res0; res != NULL; res = res->ai_next) {
493 retry:
494 		fd = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
495 		if (fd < 0) {
496 			/* error */
497 			continue;
498 		}
499 
500 		val = impl_opts.recv_buf_size;
501 		rc = setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &val, sizeof val);
502 		if (rc) {
503 			/* Not fatal */
504 		}
505 
506 		val = impl_opts.send_buf_size;
507 		rc = setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &val, sizeof val);
508 		if (rc) {
509 			/* Not fatal */
510 		}
511 
512 		rc = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof val);
513 		if (rc != 0) {
514 			close(fd);
515 			fd = -1;
516 			/* error */
517 			continue;
518 		}
519 		rc = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof val);
520 		if (rc != 0) {
521 			close(fd);
522 			fd = -1;
523 			/* error */
524 			continue;
525 		}
526 
527 		if (opts->ack_timeout) {
528 #if defined(__linux__)
529 			val = opts->ack_timeout;
530 			rc = setsockopt(fd, IPPROTO_TCP, TCP_USER_TIMEOUT, &val, sizeof val);
531 			if (rc != 0) {
532 				close(fd);
533 				fd = -1;
534 				/* error */
535 				continue;
536 			}
537 #else
538 			SPDK_WARNLOG("TCP_USER_TIMEOUT is not supported.\n");
539 #endif
540 		}
541 
542 
543 
544 #if defined(SO_PRIORITY)
545 		if (opts != NULL && opts->priority) {
546 			rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &opts->priority, sizeof val);
547 			if (rc != 0) {
548 				close(fd);
549 				fd = -1;
550 				/* error */
551 				continue;
552 			}
553 		}
554 #endif
555 		if (res->ai_family == AF_INET6) {
556 			rc = setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &val, sizeof val);
557 			if (rc != 0) {
558 				close(fd);
559 				fd = -1;
560 				/* error */
561 				continue;
562 			}
563 		}
564 
565 		if (type == SPDK_SOCK_CREATE_LISTEN) {
566 			rc = bind(fd, res->ai_addr, res->ai_addrlen);
567 			if (rc != 0) {
568 				SPDK_ERRLOG("bind() failed at port %d, errno = %d\n", port, errno);
569 				switch (errno) {
570 				case EINTR:
571 					/* interrupted? */
572 					close(fd);
573 					goto retry;
574 				case EADDRNOTAVAIL:
575 					SPDK_ERRLOG("IP address %s not available. "
576 						    "Verify IP address in config file "
577 						    "and make sure setup script is "
578 						    "run before starting spdk app.\n", ip);
579 				/* FALLTHROUGH */
580 				default:
581 					/* try next family */
582 					close(fd);
583 					fd = -1;
584 					continue;
585 				}
586 			}
587 			/* bind OK */
588 			rc = listen(fd, 512);
589 			if (rc != 0) {
590 				SPDK_ERRLOG("listen() failed, errno = %d\n", errno);
591 				close(fd);
592 				fd = -1;
593 				break;
594 			}
595 
596 			flag = fcntl(fd, F_GETFL);
597 			if (fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0) {
598 				SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno);
599 				close(fd);
600 				fd = -1;
601 				break;
602 			}
603 
604 			enable_zcopy_impl_opts = impl_opts.enable_zerocopy_send_server;
605 		} else if (type == SPDK_SOCK_CREATE_CONNECT) {
606 			rc = connect(fd, res->ai_addr, res->ai_addrlen);
607 			if (rc != 0) {
608 				SPDK_ERRLOG("connect() failed, errno = %d\n", errno);
609 				/* try next family */
610 				close(fd);
611 				fd = -1;
612 				continue;
613 			}
614 
615 			flag = fcntl(fd, F_GETFL);
616 			if (fcntl(fd, F_SETFL, flag & ~O_NONBLOCK) < 0) {
617 				SPDK_ERRLOG("fcntl can't set blocking mode for socket, fd: %d (%d)\n", fd, errno);
618 				close(fd);
619 				fd = -1;
620 				break;
621 			}
622 
623 			enable_zcopy_impl_opts = impl_opts.enable_zerocopy_send_client;
624 		}
625 		break;
626 	}
627 	freeaddrinfo(res0);
628 
629 	if (fd < 0) {
630 		return NULL;
631 	}
632 
633 	enable_zcopy_user_opts = opts->zcopy && !sock_is_loopback(fd);
634 	sock = uring_sock_alloc(fd, &impl_opts, enable_zcopy_user_opts && enable_zcopy_impl_opts);
635 	if (sock == NULL) {
636 		SPDK_ERRLOG("sock allocation failed\n");
637 		close(fd);
638 		return NULL;
639 	}
640 
641 	return &sock->base;
642 }
643 
644 static struct spdk_sock *
645 uring_sock_listen(const char *ip, int port, struct spdk_sock_opts *opts)
646 {
647 	return uring_sock_create(ip, port, SPDK_SOCK_CREATE_LISTEN, opts);
648 }
649 
650 static struct spdk_sock *
651 uring_sock_connect(const char *ip, int port, struct spdk_sock_opts *opts)
652 {
653 	return uring_sock_create(ip, port, SPDK_SOCK_CREATE_CONNECT, opts);
654 }
655 
656 static struct spdk_sock *
657 uring_sock_accept(struct spdk_sock *_sock)
658 {
659 	struct spdk_uring_sock		*sock = __uring_sock(_sock);
660 	struct sockaddr_storage		sa;
661 	socklen_t			salen;
662 	int				rc, fd;
663 	struct spdk_uring_sock		*new_sock;
664 	int				flag;
665 
666 	memset(&sa, 0, sizeof(sa));
667 	salen = sizeof(sa);
668 
669 	assert(sock != NULL);
670 
671 	rc = accept(sock->fd, (struct sockaddr *)&sa, &salen);
672 
673 	if (rc == -1) {
674 		return NULL;
675 	}
676 
677 	fd = rc;
678 
679 	flag = fcntl(fd, F_GETFL);
680 	if ((flag & O_NONBLOCK) && (fcntl(fd, F_SETFL, flag & ~O_NONBLOCK) < 0)) {
681 		SPDK_ERRLOG("fcntl can't set blocking mode for socket, fd: %d (%d)\n", fd, errno);
682 		close(fd);
683 		return NULL;
684 	}
685 
686 #if defined(SO_PRIORITY)
687 	/* The priority is not inherited, so call this function again */
688 	if (sock->base.opts.priority) {
689 		rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &sock->base.opts.priority, sizeof(int));
690 		if (rc != 0) {
691 			close(fd);
692 			return NULL;
693 		}
694 	}
695 #endif
696 
697 	new_sock = uring_sock_alloc(fd, &sock->base.impl_opts, sock->zcopy);
698 	if (new_sock == NULL) {
699 		close(fd);
700 		return NULL;
701 	}
702 
703 	return &new_sock->base;
704 }
705 
706 static int
707 uring_sock_close(struct spdk_sock *_sock)
708 {
709 	struct spdk_uring_sock *sock = __uring_sock(_sock);
710 
711 	assert(TAILQ_EMPTY(&_sock->pending_reqs));
712 	assert(sock->group == NULL);
713 
714 	/* If the socket fails to close, the best choice is to
715 	 * leak the fd but continue to free the rest of the sock
716 	 * memory. */
717 	close(sock->fd);
718 
719 	spdk_pipe_destroy(sock->recv_pipe);
720 	free(sock->recv_buf);
721 	free(sock);
722 
723 	return 0;
724 }
725 
726 static ssize_t
727 uring_sock_recv_from_pipe(struct spdk_uring_sock *sock, struct iovec *diov, int diovcnt)
728 {
729 	struct iovec siov[2];
730 	int sbytes;
731 	ssize_t bytes;
732 	struct spdk_uring_sock_group_impl *group;
733 
734 	sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov);
735 	if (sbytes < 0) {
736 		errno = EINVAL;
737 		return -1;
738 	} else if (sbytes == 0) {
739 		errno = EAGAIN;
740 		return -1;
741 	}
742 
743 	bytes = spdk_iovcpy(siov, 2, diov, diovcnt);
744 
745 	if (bytes == 0) {
746 		/* The only way this happens is if diov is 0 length */
747 		errno = EINVAL;
748 		return -1;
749 	}
750 
751 	spdk_pipe_reader_advance(sock->recv_pipe, bytes);
752 
753 	/* If we drained the pipe, take it off the level-triggered list */
754 	if (sock->base.group_impl && spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) {
755 		group = __uring_group_impl(sock->base.group_impl);
756 		TAILQ_REMOVE(&group->pending_recv, sock, link);
757 		sock->pending_recv = false;
758 	}
759 
760 	return bytes;
761 }
762 
763 static inline ssize_t
764 sock_readv(int fd, struct iovec *iov, int iovcnt)
765 {
766 	struct msghdr msg = {
767 		.msg_iov = iov,
768 		.msg_iovlen = iovcnt,
769 	};
770 
771 	return recvmsg(fd, &msg, MSG_DONTWAIT);
772 }
773 
774 static inline ssize_t
775 uring_sock_read(struct spdk_uring_sock *sock)
776 {
777 	struct iovec iov[2];
778 	int bytes;
779 	struct spdk_uring_sock_group_impl *group;
780 
781 	bytes = spdk_pipe_writer_get_buffer(sock->recv_pipe, sock->recv_buf_sz, iov);
782 
783 	if (bytes > 0) {
784 		bytes = sock_readv(sock->fd, iov, 2);
785 		if (bytes > 0) {
786 			spdk_pipe_writer_advance(sock->recv_pipe, bytes);
787 			if (sock->base.group_impl) {
788 				group = __uring_group_impl(sock->base.group_impl);
789 				TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
790 				sock->pending_recv = true;
791 			}
792 		}
793 	}
794 
795 	return bytes;
796 }
797 
798 static ssize_t
799 uring_sock_readv(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
800 {
801 	struct spdk_uring_sock *sock = __uring_sock(_sock);
802 	int rc, i;
803 	size_t len;
804 
805 	if (sock->recv_pipe == NULL) {
806 		return sock_readv(sock->fd, iov, iovcnt);
807 	}
808 
809 	len = 0;
810 	for (i = 0; i < iovcnt; i++) {
811 		len += iov[i].iov_len;
812 	}
813 
814 	if (spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) {
815 		/* If the user is receiving a sufficiently large amount of data,
816 		 * receive directly to their buffers. */
817 		if (len >= MIN_SOCK_PIPE_SIZE) {
818 			return sock_readv(sock->fd, iov, iovcnt);
819 		}
820 
821 		/* Otherwise, do a big read into our pipe */
822 		rc = uring_sock_read(sock);
823 		if (rc <= 0) {
824 			return rc;
825 		}
826 	}
827 
828 	return uring_sock_recv_from_pipe(sock, iov, iovcnt);
829 }
830 
831 static ssize_t
832 uring_sock_recv(struct spdk_sock *sock, void *buf, size_t len)
833 {
834 	struct iovec iov[1];
835 
836 	iov[0].iov_base = buf;
837 	iov[0].iov_len = len;
838 
839 	return uring_sock_readv(sock, iov, 1);
840 }
841 
842 static ssize_t
843 uring_sock_writev(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
844 {
845 	struct spdk_uring_sock *sock = __uring_sock(_sock);
846 	struct msghdr msg = {
847 		.msg_iov = iov,
848 		.msg_iovlen = iovcnt,
849 	};
850 
851 	if (sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
852 		errno = EAGAIN;
853 		return -1;
854 	}
855 
856 	return sendmsg(sock->fd, &msg, MSG_DONTWAIT);
857 }
858 
859 static ssize_t
860 sock_request_advance_offset(struct spdk_sock_request *req, ssize_t rc)
861 {
862 	unsigned int offset;
863 	size_t len;
864 	int i;
865 
866 	offset = req->internal.offset;
867 	for (i = 0; i < req->iovcnt; i++) {
868 		/* Advance by the offset first */
869 		if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) {
870 			offset -= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len;
871 			continue;
872 		}
873 
874 		/* Calculate the remaining length of this element */
875 		len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset;
876 
877 		if (len > (size_t)rc) {
878 			req->internal.offset += rc;
879 			return -1;
880 		}
881 
882 		offset = 0;
883 		req->internal.offset += len;
884 		rc -= len;
885 	}
886 
887 	return rc;
888 }
889 
890 static int
891 sock_complete_write_reqs(struct spdk_sock *_sock, ssize_t rc, bool is_zcopy)
892 {
893 	struct spdk_uring_sock *sock = __uring_sock(_sock);
894 	struct spdk_sock_request *req;
895 	int retval;
896 
897 	if (is_zcopy) {
898 		/* Handling overflow case, because we use psock->sendmsg_idx - 1 for the
899 		 * req->internal.offset, so sendmsg_idx should not be zero */
900 		if (spdk_unlikely(sock->sendmsg_idx == UINT32_MAX)) {
901 			sock->sendmsg_idx = 1;
902 		} else {
903 			sock->sendmsg_idx++;
904 		}
905 	}
906 
907 	/* Consume the requests that were actually written */
908 	req = TAILQ_FIRST(&_sock->queued_reqs);
909 	while (req) {
910 		/* req->internal.is_zcopy is true when the whole req or part of it is sent with zerocopy */
911 		req->internal.is_zcopy = is_zcopy;
912 
913 		rc = sock_request_advance_offset(req, rc);
914 		if (rc < 0) {
915 			/* This element was partially sent. */
916 			return 0;
917 		}
918 
919 		/* Handled a full request. */
920 		spdk_sock_request_pend(_sock, req);
921 
922 		if (!req->internal.is_zcopy && req == TAILQ_FIRST(&_sock->pending_reqs)) {
923 			retval = spdk_sock_request_put(_sock, req, 0);
924 			if (retval) {
925 				return retval;
926 			}
927 		} else {
928 			/* Re-use the offset field to hold the sendmsg call index. The
929 			 * index is 0 based, so subtract one here because we've already
930 			 * incremented above. */
931 			req->internal.offset = sock->sendmsg_idx - 1;
932 		}
933 
934 		if (rc == 0) {
935 			break;
936 		}
937 
938 		req = TAILQ_FIRST(&_sock->queued_reqs);
939 	}
940 
941 	return 0;
942 }
943 
944 #ifdef SPDK_ZEROCOPY
945 static int
946 _sock_check_zcopy(struct spdk_sock *_sock, int status)
947 {
948 	struct spdk_uring_sock *sock = __uring_sock(_sock);
949 	ssize_t rc;
950 	struct sock_extended_err *serr;
951 	struct cmsghdr *cm;
952 	uint32_t idx;
953 	struct spdk_sock_request *req, *treq;
954 	bool found;
955 
956 	assert(sock->zcopy == true);
957 	if (spdk_unlikely(status) < 0) {
958 		if (!TAILQ_EMPTY(&_sock->pending_reqs)) {
959 			SPDK_ERRLOG("Attempting to receive from ERRQUEUE yielded error, but pending list still has orphaned entries, status =%d\n",
960 				    status);
961 		} else {
962 			SPDK_WARNLOG("Recvmsg yielded an error!\n");
963 		}
964 		return 0;
965 	}
966 
967 	cm = CMSG_FIRSTHDR(&sock->errqueue_task.msg);
968 	if (!((cm->cmsg_level == SOL_IP && cm->cmsg_type == IP_RECVERR) ||
969 	      (cm->cmsg_level == SOL_IPV6 && cm->cmsg_type == IPV6_RECVERR))) {
970 		SPDK_WARNLOG("Unexpected cmsg level or type!\n");
971 		return 0;
972 	}
973 
974 	serr = (struct sock_extended_err *)CMSG_DATA(cm);
975 	if (serr->ee_errno != 0 || serr->ee_origin != SO_EE_ORIGIN_ZEROCOPY) {
976 		SPDK_WARNLOG("Unexpected extended error origin\n");
977 		return 0;
978 	}
979 
980 	/* Most of the time, the pending_reqs array is in the exact
981 	 * order we need such that all of the requests to complete are
982 	 * in order, in the front. It is guaranteed that all requests
983 	 * belonging to the same sendmsg call are sequential, so once
984 	 * we encounter one match we can stop looping as soon as a
985 	 * non-match is found.
986 	 */
987 	for (idx = serr->ee_info; idx <= serr->ee_data; idx++) {
988 		found = false;
989 		TAILQ_FOREACH_SAFE(req, &_sock->pending_reqs, internal.link, treq) {
990 			if (!req->internal.is_zcopy) {
991 				/* This wasn't a zcopy request. It was just waiting in line to complete */
992 				rc = spdk_sock_request_put(_sock, req, 0);
993 				if (rc < 0) {
994 					return rc;
995 				}
996 			} else if (req->internal.offset == idx) {
997 				found = true;
998 				rc = spdk_sock_request_put(_sock, req, 0);
999 				if (rc < 0) {
1000 					return rc;
1001 				}
1002 			} else if (found) {
1003 				break;
1004 			}
1005 		}
1006 	}
1007 
1008 	return 0;
1009 }
1010 
1011 static void
1012 _sock_prep_errqueue(struct spdk_sock *_sock)
1013 {
1014 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1015 	struct spdk_uring_task *task = &sock->errqueue_task;
1016 	struct io_uring_sqe *sqe;
1017 
1018 	if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS) {
1019 		return;
1020 	}
1021 
1022 	assert(sock->group != NULL);
1023 	sock->group->io_queued++;
1024 
1025 	sqe = io_uring_get_sqe(&sock->group->uring);
1026 	io_uring_prep_recvmsg(sqe, sock->fd, &task->msg, MSG_ERRQUEUE);
1027 	io_uring_sqe_set_data(sqe, task);
1028 	task->status = SPDK_URING_SOCK_TASK_IN_PROCESS;
1029 }
1030 
1031 #endif
1032 
1033 static void
1034 _sock_flush(struct spdk_sock *_sock)
1035 {
1036 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1037 	struct spdk_uring_task *task = &sock->write_task;
1038 	uint32_t iovcnt;
1039 	struct io_uring_sqe *sqe;
1040 	int flags;
1041 
1042 	if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS) {
1043 		return;
1044 	}
1045 
1046 #ifdef SPDK_ZEROCOPY
1047 	if (sock->zcopy) {
1048 		flags = MSG_DONTWAIT | sock->zcopy_send_flags;
1049 	} else
1050 #endif
1051 	{
1052 		flags = MSG_DONTWAIT;
1053 	}
1054 
1055 	iovcnt = spdk_sock_prep_reqs(&sock->base, task->iovs, task->iov_cnt, &task->last_req, &flags);
1056 	if (!iovcnt) {
1057 		return;
1058 	}
1059 
1060 	task->iov_cnt = iovcnt;
1061 	assert(sock->group != NULL);
1062 	task->msg.msg_iov = task->iovs;
1063 	task->msg.msg_iovlen = task->iov_cnt;
1064 #ifdef SPDK_ZEROCOPY
1065 	task->is_zcopy = (flags & MSG_ZEROCOPY) ? true : false;
1066 #endif
1067 	sock->group->io_queued++;
1068 
1069 	sqe = io_uring_get_sqe(&sock->group->uring);
1070 	io_uring_prep_sendmsg(sqe, sock->fd, &sock->write_task.msg, flags);
1071 	io_uring_sqe_set_data(sqe, task);
1072 	task->status = SPDK_URING_SOCK_TASK_IN_PROCESS;
1073 }
1074 
1075 static void
1076 _sock_prep_pollin(struct spdk_sock *_sock)
1077 {
1078 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1079 	struct spdk_uring_task *task = &sock->pollin_task;
1080 	struct io_uring_sqe *sqe;
1081 
1082 	/* Do not prepare pollin event */
1083 	if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS || (sock->pending_recv && !sock->zcopy)) {
1084 		return;
1085 	}
1086 
1087 	assert(sock->group != NULL);
1088 	sock->group->io_queued++;
1089 
1090 	sqe = io_uring_get_sqe(&sock->group->uring);
1091 	io_uring_prep_poll_add(sqe, sock->fd, POLLIN | POLLERR);
1092 	io_uring_sqe_set_data(sqe, task);
1093 	task->status = SPDK_URING_SOCK_TASK_IN_PROCESS;
1094 }
1095 
1096 static void
1097 _sock_prep_cancel_task(struct spdk_sock *_sock, void *user_data)
1098 {
1099 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1100 	struct spdk_uring_task *task = &sock->cancel_task;
1101 	struct io_uring_sqe *sqe;
1102 
1103 	if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS) {
1104 		return;
1105 	}
1106 
1107 	assert(sock->group != NULL);
1108 	sock->group->io_queued++;
1109 
1110 	sqe = io_uring_get_sqe(&sock->group->uring);
1111 	io_uring_prep_cancel(sqe, user_data, 0);
1112 	io_uring_sqe_set_data(sqe, task);
1113 	task->status = SPDK_URING_SOCK_TASK_IN_PROCESS;
1114 }
1115 
1116 static int
1117 sock_uring_group_reap(struct spdk_uring_sock_group_impl *group, int max, int max_read_events,
1118 		      struct spdk_sock **socks)
1119 {
1120 	int i, count, ret;
1121 	struct io_uring_cqe *cqe;
1122 	struct spdk_uring_sock *sock, *tmp;
1123 	struct spdk_uring_task *task;
1124 	int status;
1125 
1126 	for (i = 0; i < max; i++) {
1127 		ret = io_uring_peek_cqe(&group->uring, &cqe);
1128 		if (ret != 0) {
1129 			break;
1130 		}
1131 
1132 		if (cqe == NULL) {
1133 			break;
1134 		}
1135 
1136 		task = (struct spdk_uring_task *)cqe->user_data;
1137 		assert(task != NULL);
1138 		sock = task->sock;
1139 		assert(sock != NULL);
1140 		assert(sock->group != NULL);
1141 		assert(sock->group == group);
1142 		sock->group->io_inflight--;
1143 		sock->group->io_avail++;
1144 		status = cqe->res;
1145 		io_uring_cqe_seen(&group->uring, cqe);
1146 
1147 		task->status = SPDK_URING_SOCK_TASK_NOT_IN_USE;
1148 
1149 		if (spdk_unlikely(status <= 0)) {
1150 			if (status == -EAGAIN || status == -EWOULDBLOCK || (status == -ENOBUFS && sock->zcopy)) {
1151 				continue;
1152 			}
1153 		}
1154 
1155 		switch (task->type) {
1156 		case SPDK_SOCK_TASK_POLLIN:
1157 #ifdef SPDK_ZEROCOPY
1158 			if ((status & POLLERR) == POLLERR) {
1159 				_sock_prep_errqueue(&sock->base);
1160 			}
1161 #endif
1162 			if ((status & POLLIN) == POLLIN) {
1163 				if (sock->base.cb_fn != NULL &&
1164 				    sock->pending_recv == false) {
1165 					sock->pending_recv = true;
1166 					TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
1167 				}
1168 			}
1169 			break;
1170 		case SPDK_SOCK_TASK_WRITE:
1171 			task->last_req = NULL;
1172 			task->iov_cnt = 0;
1173 			task->is_zcopy = false;
1174 			if (spdk_unlikely(status) < 0) {
1175 				sock->connection_status = status;
1176 				spdk_sock_abort_requests(&sock->base);
1177 			} else {
1178 				sock_complete_write_reqs(&sock->base, status, task->is_zcopy);
1179 			}
1180 
1181 			break;
1182 #ifdef SPDK_ZEROCOPY
1183 		case SPDK_SOCK_TASK_ERRQUEUE:
1184 			if (spdk_unlikely(status == -ECANCELED)) {
1185 				sock->connection_status = status;
1186 				break;
1187 			}
1188 			_sock_check_zcopy(&sock->base, status);
1189 			break;
1190 #endif
1191 		case SPDK_SOCK_TASK_CANCEL:
1192 			/* Do nothing */
1193 			break;
1194 		default:
1195 			SPDK_UNREACHABLE();
1196 		}
1197 	}
1198 
1199 	if (!socks) {
1200 		return 0;
1201 	}
1202 	count = 0;
1203 	TAILQ_FOREACH_SAFE(sock, &group->pending_recv, link, tmp) {
1204 		if (count == max_read_events) {
1205 			break;
1206 		}
1207 
1208 		if (spdk_unlikely(sock->base.cb_fn == NULL) ||
1209 		    (sock->recv_pipe == NULL || spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0)) {
1210 			sock->pending_recv = false;
1211 			TAILQ_REMOVE(&group->pending_recv, sock, link);
1212 			if (spdk_unlikely(sock->base.cb_fn == NULL)) {
1213 				/* If the socket's cb_fn is NULL, do not add it to socks array */
1214 				continue;
1215 			}
1216 		}
1217 
1218 		socks[count++] = &sock->base;
1219 	}
1220 
1221 
1222 	/* Cycle the pending_recv list so that each time we poll things aren't
1223 	 * in the same order. Say we have 6 sockets in the list, named as follows:
1224 	 * A B C D E F
1225 	 * And all 6 sockets had the poll events, but max_events is only 3. That means
1226 	 * psock currently points at D. We want to rearrange the list to the following:
1227 	 * D E F A B C
1228 	 *
1229 	 * The variables below are named according to this example to make it easier to
1230 	 * follow the swaps.
1231 	 */
1232 	if (sock != NULL) {
1233 		struct spdk_uring_sock *ua, *uc, *ud, *uf;
1234 
1235 		/* Capture pointers to the elements we need */
1236 		ud = sock;
1237 
1238 		ua = TAILQ_FIRST(&group->pending_recv);
1239 		if (ua == ud) {
1240 			goto end;
1241 		}
1242 
1243 		uf = TAILQ_LAST(&group->pending_recv, pending_recv_list);
1244 		if (uf == ud) {
1245 			TAILQ_REMOVE(&group->pending_recv, ud, link);
1246 			TAILQ_INSERT_HEAD(&group->pending_recv, ud, link);
1247 			goto end;
1248 		}
1249 
1250 		uc = TAILQ_PREV(ud, pending_recv_list, link);
1251 		assert(uc != NULL);
1252 
1253 		/* Break the link between C and D */
1254 		uc->link.tqe_next = NULL;
1255 
1256 		/* Connect F to A */
1257 		uf->link.tqe_next = ua;
1258 		ua->link.tqe_prev = &uf->link.tqe_next;
1259 
1260 		/* Fix up the list first/last pointers */
1261 		group->pending_recv.tqh_first = ud;
1262 		group->pending_recv.tqh_last = &uc->link.tqe_next;
1263 
1264 		/* D is in front of the list, make tqe prev pointer point to the head of list */
1265 		ud->link.tqe_prev = &group->pending_recv.tqh_first;
1266 	}
1267 
1268 end:
1269 	return count;
1270 }
1271 
1272 static int
1273 _sock_flush_client(struct spdk_sock *_sock)
1274 {
1275 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1276 	struct msghdr msg = {};
1277 	struct iovec iovs[IOV_BATCH_SIZE];
1278 	int iovcnt;
1279 	ssize_t rc;
1280 	int flags = sock->zcopy_send_flags;
1281 	int retval;
1282 	bool is_zcopy = false;
1283 
1284 	/* Can't flush from within a callback or we end up with recursive calls */
1285 	if (_sock->cb_cnt > 0) {
1286 		return 0;
1287 	}
1288 
1289 	/* Gather an iov */
1290 	iovcnt = spdk_sock_prep_reqs(_sock, iovs, 0, NULL, &flags);
1291 	if (iovcnt == 0) {
1292 		return 0;
1293 	}
1294 
1295 	/* Perform the vectored write */
1296 	msg.msg_iov = iovs;
1297 	msg.msg_iovlen = iovcnt;
1298 	rc = sendmsg(sock->fd, &msg, flags | MSG_DONTWAIT);
1299 	if (rc <= 0) {
1300 		if (errno == EAGAIN || errno == EWOULDBLOCK) {
1301 			return 0;
1302 		}
1303 		return rc;
1304 	}
1305 
1306 #ifdef SPDK_ZEROCOPY
1307 	is_zcopy = flags & MSG_ZEROCOPY;
1308 #endif
1309 	retval = sock_complete_write_reqs(_sock, rc, is_zcopy);
1310 	if (retval < 0) {
1311 		/* if the socket is closed, return to avoid heap-use-after-free error */
1312 		return retval;
1313 	}
1314 
1315 #ifdef SPDK_ZEROCOPY
1316 	if (sock->zcopy && !TAILQ_EMPTY(&_sock->pending_reqs)) {
1317 		_sock_check_zcopy(_sock, 0);
1318 	}
1319 #endif
1320 
1321 	return 0;
1322 }
1323 
1324 static void
1325 uring_sock_writev_async(struct spdk_sock *_sock, struct spdk_sock_request *req)
1326 {
1327 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1328 	int rc;
1329 
1330 	if (spdk_unlikely(sock->connection_status)) {
1331 		req->cb_fn(req->cb_arg, sock->connection_status);
1332 		return;
1333 	}
1334 
1335 	spdk_sock_request_queue(_sock, req);
1336 
1337 	if (!sock->group) {
1338 		if (_sock->queued_iovcnt >= IOV_BATCH_SIZE) {
1339 			rc = _sock_flush_client(_sock);
1340 			if (rc) {
1341 				spdk_sock_abort_requests(_sock);
1342 			}
1343 		}
1344 	}
1345 }
1346 
1347 static void
1348 uring_sock_readv_async(struct spdk_sock *sock, struct spdk_sock_request *req)
1349 {
1350 	req->cb_fn(req->cb_arg, -ENOTSUP);
1351 }
1352 
1353 static int
1354 uring_sock_set_recvlowat(struct spdk_sock *_sock, int nbytes)
1355 {
1356 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1357 	int val;
1358 	int rc;
1359 
1360 	assert(sock != NULL);
1361 
1362 	val = nbytes;
1363 	rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVLOWAT, &val, sizeof val);
1364 	if (rc != 0) {
1365 		return -1;
1366 	}
1367 	return 0;
1368 }
1369 
1370 static bool
1371 uring_sock_is_ipv6(struct spdk_sock *_sock)
1372 {
1373 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1374 	struct sockaddr_storage sa;
1375 	socklen_t salen;
1376 	int rc;
1377 
1378 	assert(sock != NULL);
1379 
1380 	memset(&sa, 0, sizeof sa);
1381 	salen = sizeof sa;
1382 	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
1383 	if (rc != 0) {
1384 		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
1385 		return false;
1386 	}
1387 
1388 	return (sa.ss_family == AF_INET6);
1389 }
1390 
1391 static bool
1392 uring_sock_is_ipv4(struct spdk_sock *_sock)
1393 {
1394 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1395 	struct sockaddr_storage sa;
1396 	socklen_t salen;
1397 	int rc;
1398 
1399 	assert(sock != NULL);
1400 
1401 	memset(&sa, 0, sizeof sa);
1402 	salen = sizeof sa;
1403 	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
1404 	if (rc != 0) {
1405 		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
1406 		return false;
1407 	}
1408 
1409 	return (sa.ss_family == AF_INET);
1410 }
1411 
1412 static bool
1413 uring_sock_is_connected(struct spdk_sock *_sock)
1414 {
1415 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1416 	uint8_t byte;
1417 	int rc;
1418 
1419 	rc = recv(sock->fd, &byte, 1, MSG_PEEK | MSG_DONTWAIT);
1420 	if (rc == 0) {
1421 		return false;
1422 	}
1423 
1424 	if (rc < 0) {
1425 		if (errno == EAGAIN || errno == EWOULDBLOCK) {
1426 			return true;
1427 		}
1428 
1429 		return false;
1430 	}
1431 
1432 	return true;
1433 }
1434 
1435 static struct spdk_sock_group_impl *
1436 uring_sock_group_impl_get_optimal(struct spdk_sock *_sock, struct spdk_sock_group_impl *hint)
1437 {
1438 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1439 	struct spdk_sock_group_impl *group;
1440 
1441 	if (sock->placement_id != -1) {
1442 		spdk_sock_map_lookup(&g_map, sock->placement_id, &group, hint);
1443 		return group;
1444 	}
1445 
1446 	return NULL;
1447 }
1448 
1449 static struct spdk_sock_group_impl *
1450 uring_sock_group_impl_create(void)
1451 {
1452 	struct spdk_uring_sock_group_impl *group_impl;
1453 
1454 	group_impl = calloc(1, sizeof(*group_impl));
1455 	if (group_impl == NULL) {
1456 		SPDK_ERRLOG("group_impl allocation failed\n");
1457 		return NULL;
1458 	}
1459 
1460 	group_impl->io_avail = SPDK_SOCK_GROUP_QUEUE_DEPTH;
1461 
1462 	if (io_uring_queue_init(SPDK_SOCK_GROUP_QUEUE_DEPTH, &group_impl->uring, 0) < 0) {
1463 		SPDK_ERRLOG("uring I/O context setup failure\n");
1464 		free(group_impl);
1465 		return NULL;
1466 	}
1467 
1468 	TAILQ_INIT(&group_impl->pending_recv);
1469 
1470 	if (g_spdk_uring_sock_impl_opts.enable_placement_id == PLACEMENT_CPU) {
1471 		spdk_sock_map_insert(&g_map, spdk_env_get_current_core(), &group_impl->base);
1472 	}
1473 
1474 	return &group_impl->base;
1475 }
1476 
1477 static int
1478 uring_sock_group_impl_add_sock(struct spdk_sock_group_impl *_group,
1479 			       struct spdk_sock *_sock)
1480 {
1481 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1482 	struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group);
1483 	int rc;
1484 
1485 	sock->group = group;
1486 	sock->write_task.sock = sock;
1487 	sock->write_task.type = SPDK_SOCK_TASK_WRITE;
1488 
1489 	sock->pollin_task.sock = sock;
1490 	sock->pollin_task.type = SPDK_SOCK_TASK_POLLIN;
1491 
1492 	sock->errqueue_task.sock = sock;
1493 	sock->errqueue_task.type = SPDK_SOCK_TASK_ERRQUEUE;
1494 	sock->errqueue_task.msg.msg_control = sock->buf;
1495 	sock->errqueue_task.msg.msg_controllen = sizeof(sock->buf);
1496 
1497 	sock->cancel_task.sock = sock;
1498 	sock->cancel_task.type = SPDK_SOCK_TASK_CANCEL;
1499 
1500 	/* switched from another polling group due to scheduling */
1501 	if (spdk_unlikely(sock->recv_pipe != NULL &&
1502 			  (spdk_pipe_reader_bytes_available(sock->recv_pipe) > 0))) {
1503 		assert(sock->pending_recv == false);
1504 		sock->pending_recv = true;
1505 		TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
1506 	}
1507 
1508 	if (sock->placement_id != -1) {
1509 		rc = spdk_sock_map_insert(&g_map, sock->placement_id, &group->base);
1510 		if (rc != 0) {
1511 			SPDK_ERRLOG("Failed to insert sock group into map: %d", rc);
1512 			/* Do not treat this as an error. The system will continue running. */
1513 		}
1514 	}
1515 
1516 	return 0;
1517 }
1518 
1519 static int
1520 uring_sock_group_impl_poll(struct spdk_sock_group_impl *_group, int max_events,
1521 			   struct spdk_sock **socks)
1522 {
1523 	struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group);
1524 	int count, ret;
1525 	int to_complete, to_submit;
1526 	struct spdk_sock *_sock, *tmp;
1527 	struct spdk_uring_sock *sock;
1528 
1529 	if (spdk_likely(socks)) {
1530 		TAILQ_FOREACH_SAFE(_sock, &group->base.socks, link, tmp) {
1531 			sock = __uring_sock(_sock);
1532 			if (spdk_unlikely(sock->connection_status)) {
1533 				continue;
1534 			}
1535 			_sock_flush(_sock);
1536 			_sock_prep_pollin(_sock);
1537 		}
1538 	}
1539 
1540 	to_submit = group->io_queued;
1541 
1542 	/* For network I/O, it cannot be set with O_DIRECT, so we do not need to call spdk_io_uring_enter */
1543 	if (to_submit > 0) {
1544 		/* If there are I/O to submit, use io_uring_submit here.
1545 		 * It will automatically call io_uring_enter appropriately. */
1546 		ret = io_uring_submit(&group->uring);
1547 		if (ret < 0) {
1548 			return 1;
1549 		}
1550 		group->io_queued = 0;
1551 		group->io_inflight += to_submit;
1552 		group->io_avail -= to_submit;
1553 	}
1554 
1555 	count = 0;
1556 	to_complete = group->io_inflight;
1557 	if (to_complete > 0 || !TAILQ_EMPTY(&group->pending_recv)) {
1558 		count = sock_uring_group_reap(group, to_complete, max_events, socks);
1559 	}
1560 
1561 	return count;
1562 }
1563 
1564 static int
1565 uring_sock_group_impl_remove_sock(struct spdk_sock_group_impl *_group,
1566 				  struct spdk_sock *_sock)
1567 {
1568 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1569 	struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group);
1570 
1571 	if (sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
1572 		_sock_prep_cancel_task(_sock, &sock->write_task);
1573 		/* Since spdk_sock_group_remove_sock is not asynchronous interface, so
1574 		 * currently can use a while loop here. */
1575 		while ((sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) ||
1576 		       (sock->cancel_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE)) {
1577 			uring_sock_group_impl_poll(_group, 32, NULL);
1578 		}
1579 	}
1580 
1581 	if (sock->pollin_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
1582 		_sock_prep_cancel_task(_sock, &sock->pollin_task);
1583 		/* Since spdk_sock_group_remove_sock is not asynchronous interface, so
1584 		 * currently can use a while loop here. */
1585 		while ((sock->pollin_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) ||
1586 		       (sock->cancel_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE)) {
1587 			uring_sock_group_impl_poll(_group, 32, NULL);
1588 		}
1589 	}
1590 
1591 	if (sock->errqueue_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
1592 		_sock_prep_cancel_task(_sock, &sock->errqueue_task);
1593 		/* Since spdk_sock_group_remove_sock is not asynchronous interface, so
1594 		 * currently can use a while loop here. */
1595 		while ((sock->errqueue_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) ||
1596 		       (sock->cancel_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE)) {
1597 			uring_sock_group_impl_poll(_group, 32, NULL);
1598 		}
1599 	}
1600 
1601 	/* Make sure the cancelling the tasks above didn't cause sending new requests */
1602 	assert(sock->write_task.status == SPDK_URING_SOCK_TASK_NOT_IN_USE);
1603 	assert(sock->pollin_task.status == SPDK_URING_SOCK_TASK_NOT_IN_USE);
1604 	assert(sock->errqueue_task.status == SPDK_URING_SOCK_TASK_NOT_IN_USE);
1605 
1606 	if (sock->pending_recv) {
1607 		TAILQ_REMOVE(&group->pending_recv, sock, link);
1608 		sock->pending_recv = false;
1609 	}
1610 	assert(sock->pending_recv == false);
1611 
1612 	if (sock->placement_id != -1) {
1613 		spdk_sock_map_release(&g_map, sock->placement_id);
1614 	}
1615 
1616 	sock->group = NULL;
1617 	return 0;
1618 }
1619 
1620 static int
1621 uring_sock_group_impl_close(struct spdk_sock_group_impl *_group)
1622 {
1623 	struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group);
1624 
1625 	/* try to reap all the active I/O */
1626 	while (group->io_inflight) {
1627 		uring_sock_group_impl_poll(_group, 32, NULL);
1628 	}
1629 	assert(group->io_inflight == 0);
1630 	assert(group->io_avail == SPDK_SOCK_GROUP_QUEUE_DEPTH);
1631 
1632 	io_uring_queue_exit(&group->uring);
1633 
1634 	if (g_spdk_uring_sock_impl_opts.enable_placement_id == PLACEMENT_CPU) {
1635 		spdk_sock_map_release(&g_map, spdk_env_get_current_core());
1636 	}
1637 
1638 	free(group);
1639 	return 0;
1640 }
1641 
1642 static int
1643 uring_sock_flush(struct spdk_sock *_sock)
1644 {
1645 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1646 
1647 	if (!sock->group) {
1648 		return _sock_flush_client(_sock);
1649 	}
1650 
1651 	return 0;
1652 }
1653 
1654 static struct spdk_net_impl g_uring_net_impl = {
1655 	.name		= "uring",
1656 	.getaddr	= uring_sock_getaddr,
1657 	.connect	= uring_sock_connect,
1658 	.listen		= uring_sock_listen,
1659 	.accept		= uring_sock_accept,
1660 	.close		= uring_sock_close,
1661 	.recv		= uring_sock_recv,
1662 	.readv		= uring_sock_readv,
1663 	.readv_async	= uring_sock_readv_async,
1664 	.writev		= uring_sock_writev,
1665 	.writev_async	= uring_sock_writev_async,
1666 	.flush          = uring_sock_flush,
1667 	.set_recvlowat	= uring_sock_set_recvlowat,
1668 	.set_recvbuf	= uring_sock_set_recvbuf,
1669 	.set_sendbuf	= uring_sock_set_sendbuf,
1670 	.is_ipv6	= uring_sock_is_ipv6,
1671 	.is_ipv4	= uring_sock_is_ipv4,
1672 	.is_connected   = uring_sock_is_connected,
1673 	.group_impl_get_optimal	= uring_sock_group_impl_get_optimal,
1674 	.group_impl_create	= uring_sock_group_impl_create,
1675 	.group_impl_add_sock	= uring_sock_group_impl_add_sock,
1676 	.group_impl_remove_sock = uring_sock_group_impl_remove_sock,
1677 	.group_impl_poll	= uring_sock_group_impl_poll,
1678 	.group_impl_close	= uring_sock_group_impl_close,
1679 	.get_opts		= uring_sock_impl_get_opts,
1680 	.set_opts		= uring_sock_impl_set_opts,
1681 };
1682 
1683 SPDK_NET_IMPL_REGISTER(uring, &g_uring_net_impl, DEFAULT_SOCK_PRIORITY + 2);
1684