xref: /spdk/module/sock/uring/uring.c (revision 9ca3be6fa39191ea878a1a25daf0bd91dad237d1)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2019 Intel Corporation.
3  *   All rights reserved.
4  */
5 
6 #include "spdk/stdinc.h"
7 #include "spdk/config.h"
8 
9 #include <linux/errqueue.h>
10 #include <sys/epoll.h>
11 #include <liburing.h>
12 
13 #include "spdk/barrier.h"
14 #include "spdk/env.h"
15 #include "spdk/log.h"
16 #include "spdk/pipe.h"
17 #include "spdk/sock.h"
18 #include "spdk/string.h"
19 #include "spdk/util.h"
20 #include "spdk/net.h"
21 #include "spdk/file.h"
22 
23 #include "spdk_internal/sock.h"
24 #include "spdk_internal/assert.h"
25 #include "spdk/net.h"
26 
27 #define MAX_TMPBUF 1024
28 #define PORTNUMLEN 32
29 #define SPDK_SOCK_GROUP_QUEUE_DEPTH 4096
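/* Size of the control message buffer needed to receive a single zero-copy
 * completion notification (a cmsghdr followed by a sock_extended_err) from the
 * socket's error queue. */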
30 #define SPDK_SOCK_CMG_INFO_SIZE (sizeof(struct cmsghdr) + sizeof(struct sock_extended_err))
31 
32 enum uring_task_type {
33 	URING_TASK_READ = 0,
34 	URING_TASK_ERRQUEUE,
35 	URING_TASK_WRITE,
36 	URING_TASK_CANCEL,
37 };
38 
39 #if defined(SO_ZEROCOPY) && defined(MSG_ZEROCOPY)
40 #define SPDK_ZEROCOPY
41 #endif
42 
43 /* We don't know how big the buffers that the user posts will be, but this
44  * is the maximum we'll ever allow it to receive in a single command.
45  * If the user buffers are smaller, it will just receive less. */
46 #define URING_MAX_RECV_SIZE (128 * 1024)
47 
48 /* We don't know how many buffers the user will post, but this is the
49  * maximum number we'll take from the pool to post per group. */
50 #define URING_BUF_POOL_SIZE 128
51 
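/* Receive buffers are registered with the kernel under a buffer group id. Receives
 * submitted with IOSQE_BUFFER_SELECT let the kernel pick a buffer from that group
 * and report which one it used in the CQE flags. */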
52 /* We use 1 just so it's not zero and we can validate it's right. */
53 #define URING_BUF_GROUP_ID 1
54 
55 enum spdk_uring_sock_task_status {
56 	SPDK_URING_SOCK_TASK_NOT_IN_USE = 0,
57 	SPDK_URING_SOCK_TASK_IN_PROCESS,
58 };
59 
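/* Each socket pre-allocates one task per operation type (read, write, errqueue,
 * cancel). A task carries the msghdr/iovecs for its SQE and is set as the SQE's
 * user_data so the completion can be routed back to the owning socket. */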
60 struct spdk_uring_task {
61 	enum spdk_uring_sock_task_status	status;
62 	enum uring_task_type		type;
63 	struct spdk_uring_sock			*sock;
64 	struct msghdr				msg;
65 	struct iovec				iovs[IOV_BATCH_SIZE];
66 	int					iov_cnt;
67 	struct spdk_sock_request		*last_req;
68 	bool					is_zcopy;
69 	STAILQ_ENTRY(spdk_uring_task)		link;
70 };
71 
72 struct spdk_uring_sock {
73 	struct spdk_sock			base;
74 	int					fd;
75 	uint32_t				sendmsg_idx;
76 	struct spdk_uring_sock_group_impl	*group;
77 	STAILQ_HEAD(, spdk_uring_buf_tracker)	recv_stream;
78 	size_t					recv_offset;
79 	struct spdk_uring_task			write_task;
80 	struct spdk_uring_task			errqueue_task;
81 	struct spdk_uring_task			read_task;
82 	struct spdk_uring_task			cancel_task;
83 	struct spdk_pipe			*recv_pipe;
84 	void					*recv_buf;
85 	int					recv_buf_sz;
86 	bool					zcopy;
87 	bool					pending_recv;
88 	bool					pending_group_remove;
89 	int					zcopy_send_flags;
90 	int					connection_status;
91 	int					placement_id;
92 	uint8_t                                 reserved[4];
93 	uint8_t					buf[SPDK_SOCK_CMG_INFO_SIZE];
94 	TAILQ_ENTRY(spdk_uring_sock)		link;
95 	char					interface_name[IFNAMSIZ];
96 };
97 /* 'struct cmsghdr' is mapped onto the buffer 'buf'. Since the first element
98  * of this control message header is 8 bytes in size, 'buf'
99  * must be 8-byte aligned.
100  */
101 SPDK_STATIC_ASSERT(offsetof(struct spdk_uring_sock, buf) % 8 == 0,
102 		   "Incorrect alignment: `buf` must be aligned to 8 bytes");
103 
104 TAILQ_HEAD(pending_recv_list, spdk_uring_sock);
105 
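/* Tracks one user-provided receive buffer posted to the io_uring buffer ring.
 * 'id' is the index the kernel reports back in the CQE flags when it consumes
 * the buffer. */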
106 struct spdk_uring_buf_tracker {
107 	void					*buf;
108 	size_t					buflen;
109 	size_t					len;
110 	void					*ctx;
111 	int					id;
112 	STAILQ_ENTRY(spdk_uring_buf_tracker)	link;
113 };
114 
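/* Per poll-group state: the io_uring instance shared by all sockets in the group,
 * plus the provided-buffer ring and the trackers describing the buffers posted
 * to it. */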
115 struct spdk_uring_sock_group_impl {
116 	struct spdk_sock_group_impl		base;
117 	struct io_uring				uring;
118 	uint32_t				io_inflight;
119 	uint32_t				io_queued;
120 	uint32_t				io_avail;
121 	struct pending_recv_list		pending_recv;
122 
123 	struct io_uring_buf_ring		*buf_ring;
124 	uint32_t				buf_ring_count;
125 	struct spdk_uring_buf_tracker		*trackers;
126 	STAILQ_HEAD(, spdk_uring_buf_tracker)	free_trackers;
127 };
128 
129 static struct spdk_sock_impl_opts g_spdk_uring_sock_impl_opts = {
130 	.recv_buf_size = DEFAULT_SO_RCVBUF_SIZE,
131 	.send_buf_size = DEFAULT_SO_SNDBUF_SIZE,
132 	.enable_recv_pipe = true,
133 	.enable_quickack = false,
134 	.enable_placement_id = PLACEMENT_NONE,
135 	.enable_zerocopy_send_server = false,
136 	.enable_zerocopy_send_client = false,
137 	.zerocopy_threshold = 0,
138 	.tls_version = 0,
139 	.enable_ktls = false,
140 	.psk_key = NULL,
141 	.psk_identity = NULL
142 };
143 
144 static struct spdk_sock_map g_map = {
145 	.entries = STAILQ_HEAD_INITIALIZER(g_map.entries),
146 	.mtx = PTHREAD_MUTEX_INITIALIZER
147 };
148 
149 __attribute__((destructor)) static void
150 uring_sock_map_cleanup(void)
151 {
152 	spdk_sock_map_cleanup(&g_map);
153 }
154 
155 #define SPDK_URING_SOCK_REQUEST_IOV(req) ((struct iovec *)((uint8_t *)req + sizeof(struct spdk_sock_request)))
156 
157 #define __uring_sock(sock) (struct spdk_uring_sock *)sock
158 #define __uring_group_impl(group) (struct spdk_uring_sock_group_impl *)group
159 
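/* Copy only the fields that fit within 'len'. This keeps get_opts/set_opts
 * compatible with callers built against an older, smaller spdk_sock_impl_opts. */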
160 static void
161 uring_sock_copy_impl_opts(struct spdk_sock_impl_opts *dest, const struct spdk_sock_impl_opts *src,
162 			  size_t len)
163 {
164 #define FIELD_OK(field) \
165 	offsetof(struct spdk_sock_impl_opts, field) + sizeof(src->field) <= len
166 
167 #define SET_FIELD(field) \
168 	if (FIELD_OK(field)) { \
169 		dest->field = src->field; \
170 	}
171 
172 	SET_FIELD(recv_buf_size);
173 	SET_FIELD(send_buf_size);
174 	SET_FIELD(enable_recv_pipe);
175 	SET_FIELD(enable_quickack);
176 	SET_FIELD(enable_placement_id);
177 	SET_FIELD(enable_zerocopy_send_server);
178 	SET_FIELD(enable_zerocopy_send_client);
179 	SET_FIELD(zerocopy_threshold);
180 	SET_FIELD(tls_version);
181 	SET_FIELD(enable_ktls);
182 	SET_FIELD(psk_key);
183 	SET_FIELD(psk_identity);
184 
185 #undef SET_FIELD
186 #undef FIELD_OK
187 }
188 
189 static int
190 uring_sock_impl_get_opts(struct spdk_sock_impl_opts *opts, size_t *len)
191 {
192 	if (!opts || !len) {
193 		errno = EINVAL;
194 		return -1;
195 	}
196 
197 	assert(sizeof(*opts) >= *len);
198 	memset(opts, 0, *len);
199 
200 	uring_sock_copy_impl_opts(opts, &g_spdk_uring_sock_impl_opts, *len);
201 	*len = spdk_min(*len, sizeof(g_spdk_uring_sock_impl_opts));
202 
203 	return 0;
204 }
205 
206 static int
207 uring_sock_impl_set_opts(const struct spdk_sock_impl_opts *opts, size_t len)
208 {
209 	if (!opts) {
210 		errno = EINVAL;
211 		return -1;
212 	}
213 
214 	assert(sizeof(*opts) >= len);
215 	uring_sock_copy_impl_opts(&g_spdk_uring_sock_impl_opts, opts, len);
216 
217 	return 0;
218 }
219 
220 static void
221 uring_opts_get_impl_opts(const struct spdk_sock_opts *opts, struct spdk_sock_impl_opts *dest)
222 {
223 	/* Copy the default impl_opts first to cover cases when user's impl_opts is smaller */
224 	memcpy(dest, &g_spdk_uring_sock_impl_opts, sizeof(*dest));
225 
226 	if (opts->impl_opts != NULL) {
227 		assert(sizeof(*dest) >= opts->impl_opts_size);
228 		uring_sock_copy_impl_opts(dest, opts->impl_opts, opts->impl_opts_size);
229 	}
230 }
231 
232 static int
233 uring_sock_getaddr(struct spdk_sock *_sock, char *saddr, int slen, uint16_t *sport,
234 		   char *caddr, int clen, uint16_t *cport)
235 {
236 	struct spdk_uring_sock *sock = __uring_sock(_sock);
237 
238 	assert(sock != NULL);
239 	return spdk_net_getaddr(sock->fd, saddr, slen, sport, caddr, clen, cport);
240 }
241 
242 static const char *
243 uring_sock_get_interface_name(struct spdk_sock *_sock)
244 {
245 	struct spdk_uring_sock *sock = __uring_sock(_sock);
246 	char saddr[64];
247 	int rc;
248 
249 	rc = spdk_net_getaddr(sock->fd, saddr, sizeof(saddr), NULL, NULL, 0, NULL);
250 	if (rc != 0) {
251 		return NULL;
252 	}
253 
254 	rc = spdk_net_get_interface_name(saddr, sock->interface_name,
255 					 sizeof(sock->interface_name));
256 	if (rc != 0) {
257 		return NULL;
258 	}
259 
260 	return sock->interface_name;
261 }
262 
263 static uint32_t
264 uring_sock_get_numa_socket_id(struct spdk_sock *sock)
265 {
266 	const char *interface_name;
267 	uint32_t numa_socket_id;
268 	int rc;
269 
270 	interface_name = uring_sock_get_interface_name(sock);
271 	if (interface_name == NULL) {
272 		return SPDK_ENV_SOCKET_ID_ANY;
273 	}
274 
275 	rc = spdk_read_sysfs_attribute_uint32(&numa_socket_id,
276 					      "/sys/class/net/%s/device/numa_node", interface_name);
277 	if (rc == 0) {
278 		return numa_socket_id;
279 	} else {
280 		return SPDK_ENV_SOCKET_ID_ANY;
281 	}
282 }
283 
284 enum uring_sock_create_type {
285 	SPDK_SOCK_CREATE_LISTEN,
286 	SPDK_SOCK_CREATE_CONNECT,
287 };
288 
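/* (Re)size the receive pipe used to batch small reads. Any data still buffered in
 * the old pipe is copied into the new one before the old pipe is destroyed. */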
289 static int
290 uring_sock_alloc_pipe(struct spdk_uring_sock *sock, int sz)
291 {
292 	uint8_t *new_buf;
293 	struct spdk_pipe *new_pipe;
294 	struct iovec siov[2];
295 	struct iovec diov[2];
296 	int sbytes;
297 	ssize_t bytes;
298 	int rc;
299 
300 	if (sock->recv_buf_sz == sz) {
301 		return 0;
302 	}
303 
304 	/* If the new size is 0, just free the pipe */
305 	if (sz == 0) {
306 		spdk_pipe_destroy(sock->recv_pipe);
307 		free(sock->recv_buf);
308 		sock->recv_pipe = NULL;
309 		sock->recv_buf = NULL;
310 		return 0;
311 	} else if (sz < MIN_SOCK_PIPE_SIZE) {
312 		SPDK_ERRLOG("The size of the pipe must be at least %d\n", MIN_SOCK_PIPE_SIZE);
313 		return -1;
314 	}
315 
316 	/* Allocate the new buffer aligned to a 64-byte boundary */
317 	rc = posix_memalign((void **)&new_buf, 64, sz);
318 	if (rc != 0) {
319 		SPDK_ERRLOG("socket recv buf allocation failed\n");
320 		return -ENOMEM;
321 	}
322 	memset(new_buf, 0, sz);
323 
324 	new_pipe = spdk_pipe_create(new_buf, sz);
325 	if (new_pipe == NULL) {
326 		SPDK_ERRLOG("socket pipe allocation failed\n");
327 		free(new_buf);
328 		return -ENOMEM;
329 	}
330 
331 	if (sock->recv_pipe != NULL) {
332 		/* Pull all of the data out of the old pipe */
333 		sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov);
334 		if (sbytes > sz) {
335 			/* Too much data to fit into the new pipe size */
336 			spdk_pipe_destroy(new_pipe);
337 			free(new_buf);
338 			return -EINVAL;
339 		}
340 
341 		sbytes = spdk_pipe_writer_get_buffer(new_pipe, sz, diov);
342 		assert(sbytes == sz);
343 
344 		bytes = spdk_iovcpy(siov, 2, diov, 2);
345 		spdk_pipe_writer_advance(new_pipe, bytes);
346 
347 		spdk_pipe_destroy(sock->recv_pipe);
348 		free(sock->recv_buf);
349 	}
350 
351 	sock->recv_buf_sz = sz;
352 	sock->recv_buf = new_buf;
353 	sock->recv_pipe = new_pipe;
354 
355 	return 0;
356 }
357 
358 static int
359 uring_sock_set_recvbuf(struct spdk_sock *_sock, int sz)
360 {
361 	struct spdk_uring_sock *sock = __uring_sock(_sock);
362 	int min_size;
363 	int rc;
364 
365 	assert(sock != NULL);
366 
367 	if (_sock->impl_opts.enable_recv_pipe) {
368 		rc = uring_sock_alloc_pipe(sock, sz);
369 		if (rc) {
370 			SPDK_ERRLOG("unable to allocate sufficient recvbuf with sz=%d on sock=%p\n", sz, _sock);
371 			return rc;
372 		}
373 	}
374 
375 	/* Set kernel buffer size to be at least MIN_SO_RCVBUF_SIZE and
376 	 * g_spdk_uring_sock_impl_opts.recv_buf_size. */
377 	min_size = spdk_max(MIN_SO_RCVBUF_SIZE, g_spdk_uring_sock_impl_opts.recv_buf_size);
378 
379 	if (sz < min_size) {
380 		sz = min_size;
381 	}
382 
383 	rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVBUF, &sz, sizeof(sz));
384 	if (rc < 0) {
385 		return rc;
386 	}
387 
388 	_sock->impl_opts.recv_buf_size = sz;
389 
390 	return 0;
391 }
392 
393 static int
394 uring_sock_set_sendbuf(struct spdk_sock *_sock, int sz)
395 {
396 	struct spdk_uring_sock *sock = __uring_sock(_sock);
397 	int min_size;
398 	int rc;
399 
400 	assert(sock != NULL);
401 
402 	/* Set kernel buffer size to be at least MIN_SO_SNDBUF_SIZE and
403 	 * g_spdk_uring_sock_impl_opts.send_buf_size. */
404 	min_size = spdk_max(MIN_SO_SNDBUF_SIZE, g_spdk_uring_sock_impl_opts.send_buf_size);
405 
406 	if (sz < min_size) {
407 		sz = min_size;
408 	}
409 
410 	rc = setsockopt(sock->fd, SOL_SOCKET, SO_SNDBUF, &sz, sizeof(sz));
411 	if (rc < 0) {
412 		return rc;
413 	}
414 
415 	_sock->impl_opts.send_buf_size = sz;
416 
417 	return 0;
418 }
419 
420 static struct spdk_uring_sock *
421 uring_sock_alloc(int fd, struct spdk_sock_impl_opts *impl_opts, bool enable_zero_copy)
422 {
423 	struct spdk_uring_sock *sock;
424 #if defined(__linux__)
425 	int flag;
426 	int rc;
427 #endif
428 
429 	sock = calloc(1, sizeof(*sock));
430 	if (sock == NULL) {
431 		SPDK_ERRLOG("sock allocation failed\n");
432 		return NULL;
433 	}
434 
435 	sock->fd = fd;
436 	memcpy(&sock->base.impl_opts, impl_opts, sizeof(*impl_opts));
437 
438 	STAILQ_INIT(&sock->recv_stream);
439 
440 #if defined(__linux__)
441 	flag = 1;
442 
443 	if (sock->base.impl_opts.enable_quickack) {
444 		rc = setsockopt(sock->fd, IPPROTO_TCP, TCP_QUICKACK, &flag, sizeof(flag));
445 		if (rc != 0) {
446 			SPDK_ERRLOG("failed to set TCP_QUICKACK\n");
447 		}
448 	}
449 
450 	spdk_sock_get_placement_id(sock->fd, sock->base.impl_opts.enable_placement_id,
451 				   &sock->placement_id);
452 #ifdef SPDK_ZEROCOPY
453 	/* Try to turn on zero copy sends */
454 	flag = 1;
455 
456 	if (enable_zero_copy) {
457 		rc = setsockopt(sock->fd, SOL_SOCKET, SO_ZEROCOPY, &flag, sizeof(flag));
458 		if (rc == 0) {
459 			sock->zcopy = true;
460 			sock->zcopy_send_flags = MSG_ZEROCOPY;
461 		}
462 	}
463 #endif
464 #endif
465 
466 	return sock;
467 }
468 
469 static struct spdk_sock *
470 uring_sock_create(const char *ip, int port,
471 		  enum uring_sock_create_type type,
472 		  struct spdk_sock_opts *opts)
473 {
474 	struct spdk_uring_sock *sock;
475 	struct spdk_sock_impl_opts impl_opts;
476 	char buf[MAX_TMPBUF];
477 	char portnum[PORTNUMLEN];
478 	char *p;
479 	const char *src_addr;
480 	uint16_t src_port;
481 	struct addrinfo hints, *res, *res0, *src_ai;
482 	int fd, flag;
483 	int val = 1;
484 	int rc;
485 	bool enable_zcopy_impl_opts = false;
486 	bool enable_zcopy_user_opts = true;
487 
488 	assert(opts != NULL);
489 	uring_opts_get_impl_opts(opts, &impl_opts);
490 
491 	if (ip == NULL) {
492 		return NULL;
493 	}
494 	if (ip[0] == '[') {
495 		snprintf(buf, sizeof(buf), "%s", ip + 1);
496 		p = strchr(buf, ']');
497 		if (p != NULL) {
498 			*p = '\0';
499 		}
500 		ip = (const char *) &buf[0];
501 	}
502 
503 	snprintf(portnum, sizeof portnum, "%d", port);
504 	memset(&hints, 0, sizeof hints);
505 	hints.ai_family = PF_UNSPEC;
506 	hints.ai_socktype = SOCK_STREAM;
507 	hints.ai_flags = AI_NUMERICSERV;
508 	hints.ai_flags |= AI_PASSIVE;
509 	hints.ai_flags |= AI_NUMERICHOST;
510 	rc = getaddrinfo(ip, portnum, &hints, &res0);
511 	if (rc != 0) {
512 		SPDK_ERRLOG("getaddrinfo() failed %s (%d)\n", gai_strerror(rc), rc);
513 		return NULL;
514 	}
515 
516 	/* try listen */
517 	fd = -1;
518 	for (res = res0; res != NULL; res = res->ai_next) {
519 retry:
520 		fd = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
521 		if (fd < 0) {
522 			/* error */
523 			continue;
524 		}
525 
526 		val = impl_opts.recv_buf_size;
527 		rc = setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &val, sizeof val);
528 		if (rc) {
529 			/* Not fatal */
530 		}
531 
532 		val = impl_opts.send_buf_size;
533 		rc = setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &val, sizeof val);
534 		if (rc) {
535 			/* Not fatal */
536 		}
537 
538 		rc = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof val);
539 		if (rc != 0) {
540 			close(fd);
541 			fd = -1;
542 			/* error */
543 			continue;
544 		}
545 		rc = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof val);
546 		if (rc != 0) {
547 			close(fd);
548 			fd = -1;
549 			/* error */
550 			continue;
551 		}
552 
553 		if (opts->ack_timeout) {
554 #if defined(__linux__)
555 			val = opts->ack_timeout;
556 			rc = setsockopt(fd, IPPROTO_TCP, TCP_USER_TIMEOUT, &val, sizeof val);
557 			if (rc != 0) {
558 				close(fd);
559 				fd = -1;
560 				/* error */
561 				continue;
562 			}
563 #else
564 			SPDK_WARNLOG("TCP_USER_TIMEOUT is not supported.\n");
565 #endif
566 		}
567 
570 #if defined(SO_PRIORITY)
571 		if (opts != NULL && opts->priority) {
572 			rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &opts->priority, sizeof val);
573 			if (rc != 0) {
574 				close(fd);
575 				fd = -1;
576 				/* error */
577 				continue;
578 			}
579 		}
580 #endif
581 		if (res->ai_family == AF_INET6) {
582 			rc = setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &val, sizeof val);
583 			if (rc != 0) {
584 				close(fd);
585 				fd = -1;
586 				/* error */
587 				continue;
588 			}
589 		}
590 
591 		if (type == SPDK_SOCK_CREATE_LISTEN) {
592 			rc = bind(fd, res->ai_addr, res->ai_addrlen);
593 			if (rc != 0) {
594 				SPDK_ERRLOG("bind() failed at port %d, errno = %d\n", port, errno);
595 				switch (errno) {
596 				case EINTR:
597 					/* interrupted? */
598 					close(fd);
599 					goto retry;
600 				case EADDRNOTAVAIL:
601 					SPDK_ERRLOG("IP address %s not available. "
602 						    "Verify IP address in config file "
603 						    "and make sure setup script is "
604 						    "run before starting spdk app.\n", ip);
605 				/* FALLTHROUGH */
606 				default:
607 					/* try next family */
608 					close(fd);
609 					fd = -1;
610 					continue;
611 				}
612 			}
613 			/* bind OK */
614 			rc = listen(fd, 512);
615 			if (rc != 0) {
616 				SPDK_ERRLOG("listen() failed, errno = %d\n", errno);
617 				close(fd);
618 				fd = -1;
619 				break;
620 			}
621 
622 			flag = fcntl(fd, F_GETFL);
623 			if (fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0) {
624 				SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno);
625 				close(fd);
626 				fd = -1;
627 				break;
628 			}
629 
630 			enable_zcopy_impl_opts = impl_opts.enable_zerocopy_send_server;
631 		} else if (type == SPDK_SOCK_CREATE_CONNECT) {
632 			src_addr = SPDK_GET_FIELD(opts, src_addr, NULL, opts->opts_size);
633 			src_port = SPDK_GET_FIELD(opts, src_port, 0, opts->opts_size);
634 			if (src_addr != NULL || src_port != 0) {
635 				snprintf(portnum, sizeof(portnum), "%"PRIu16, src_port);
636 				memset(&hints, 0, sizeof hints);
637 				hints.ai_family = AF_UNSPEC;
638 				hints.ai_socktype = SOCK_STREAM;
639 				hints.ai_flags = AI_NUMERICSERV | AI_NUMERICHOST | AI_PASSIVE;
640 				rc = getaddrinfo(src_addr, src_port > 0 ? portnum : NULL,
641 						 &hints, &src_ai);
642 				if (rc != 0 || src_ai == NULL) {
643 					SPDK_ERRLOG("getaddrinfo() failed %s (%d)\n",
644 						    rc != 0 ? gai_strerror(rc) : "", rc);
645 					close(fd);
646 					fd = -1;
647 					break;
648 				}
649 				rc = bind(fd, src_ai->ai_addr, src_ai->ai_addrlen);
650 				if (rc != 0) {
651 					SPDK_ERRLOG("bind() failed errno %d (%s:%s)\n", errno,
652 						    src_addr ? src_addr : "", portnum);
653 					close(fd);
654 					fd = -1;
655 					break;
656 				}
657 				freeaddrinfo(src_ai);
658 				src_ai = NULL;
659 			}
660 
661 			rc = connect(fd, res->ai_addr, res->ai_addrlen);
662 			if (rc != 0) {
663 				SPDK_ERRLOG("connect() failed, errno = %d\n", errno);
664 				/* try next family */
665 				close(fd);
666 				fd = -1;
667 				continue;
668 			}
669 
670 			flag = fcntl(fd, F_GETFL);
671 			if (fcntl(fd, F_SETFL, flag & ~O_NONBLOCK) < 0) {
672 				SPDK_ERRLOG("fcntl can't set blocking mode for socket, fd: %d (%d)\n", fd, errno);
673 				close(fd);
674 				fd = -1;
675 				break;
676 			}
677 
678 			enable_zcopy_impl_opts = impl_opts.enable_zerocopy_send_client;
679 		}
680 		break;
681 	}
682 	freeaddrinfo(res0);
683 
684 	if (fd < 0) {
685 		return NULL;
686 	}
687 
688 	enable_zcopy_user_opts = opts->zcopy && !spdk_net_is_loopback(fd);
689 	sock = uring_sock_alloc(fd, &impl_opts, enable_zcopy_user_opts && enable_zcopy_impl_opts);
690 	if (sock == NULL) {
691 		SPDK_ERRLOG("sock allocation failed\n");
692 		close(fd);
693 		return NULL;
694 	}
695 
696 	return &sock->base;
697 }
698 
699 static struct spdk_sock *
700 uring_sock_listen(const char *ip, int port, struct spdk_sock_opts *opts)
701 {
702 	if (spdk_interrupt_mode_is_enabled()) {
703 		SPDK_ERRLOG("Interrupt mode is not supported in the uring sock implementation.\n");
704 		return NULL;
705 	}
706 
707 	return uring_sock_create(ip, port, SPDK_SOCK_CREATE_LISTEN, opts);
708 }
709 
710 static struct spdk_sock *
711 uring_sock_connect(const char *ip, int port, struct spdk_sock_opts *opts)
712 {
713 	if (spdk_interrupt_mode_is_enabled()) {
714 		SPDK_ERRLOG("Interrupt mode is not supported in the uring sock implementation.\n");
715 		return NULL;
716 	}
717 
718 	return uring_sock_create(ip, port, SPDK_SOCK_CREATE_CONNECT, opts);
719 }
720 
721 static struct spdk_sock *
722 uring_sock_accept(struct spdk_sock *_sock)
723 {
724 	struct spdk_uring_sock		*sock = __uring_sock(_sock);
725 	struct sockaddr_storage		sa;
726 	socklen_t			salen;
727 	int				rc, fd;
728 	struct spdk_uring_sock		*new_sock;
729 	int				flag;
730 
731 	memset(&sa, 0, sizeof(sa));
732 	salen = sizeof(sa);
733 
734 	assert(sock != NULL);
735 
736 	rc = accept(sock->fd, (struct sockaddr *)&sa, &salen);
737 
738 	if (rc == -1) {
739 		return NULL;
740 	}
741 
742 	fd = rc;
743 
744 	flag = fcntl(fd, F_GETFL);
745 	if ((flag & O_NONBLOCK) && (fcntl(fd, F_SETFL, flag & ~O_NONBLOCK) < 0)) {
746 		SPDK_ERRLOG("fcntl can't set blocking mode for socket, fd: %d (%d)\n", fd, errno);
747 		close(fd);
748 		return NULL;
749 	}
750 
751 #if defined(SO_PRIORITY)
752 	/* The priority is not inherited, so call this function again */
753 	if (sock->base.opts.priority) {
754 		rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &sock->base.opts.priority, sizeof(int));
755 		if (rc != 0) {
756 			close(fd);
757 			return NULL;
758 		}
759 	}
760 #endif
761 
762 	new_sock = uring_sock_alloc(fd, &sock->base.impl_opts, sock->zcopy);
763 	if (new_sock == NULL) {
764 		close(fd);
765 		return NULL;
766 	}
767 
768 	return &new_sock->base;
769 }
770 
771 static int
772 uring_sock_close(struct spdk_sock *_sock)
773 {
774 	struct spdk_uring_sock *sock = __uring_sock(_sock);
775 
776 	assert(TAILQ_EMPTY(&_sock->pending_reqs));
777 	assert(sock->group == NULL);
778 
779 	/* If the socket fails to close, the best choice is to
780 	 * leak the fd but continue to free the rest of the sock
781 	 * memory. */
782 	close(sock->fd);
783 
784 	spdk_pipe_destroy(sock->recv_pipe);
785 	free(sock->recv_buf);
786 	free(sock);
787 
788 	return 0;
789 }
790 
791 static ssize_t
792 uring_sock_recv_from_pipe(struct spdk_uring_sock *sock, struct iovec *diov, int diovcnt)
793 {
794 	struct iovec siov[2];
795 	int sbytes;
796 	ssize_t bytes;
797 	struct spdk_uring_sock_group_impl *group;
798 
799 	sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov);
800 	if (sbytes < 0) {
801 		errno = EINVAL;
802 		return -1;
803 	} else if (sbytes == 0) {
804 		errno = EAGAIN;
805 		return -1;
806 	}
807 
808 	bytes = spdk_iovcpy(siov, 2, diov, diovcnt);
809 
810 	if (bytes == 0) {
811 		/* The only way this happens is if diov is 0 length */
812 		errno = EINVAL;
813 		return -1;
814 	}
815 
816 	spdk_pipe_reader_advance(sock->recv_pipe, bytes);
817 
818 	/* If we drained the pipe, take it off the level-triggered list */
819 	if (sock->base.group_impl && spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) {
820 		group = __uring_group_impl(sock->base.group_impl);
821 		TAILQ_REMOVE(&group->pending_recv, sock, link);
822 		sock->pending_recv = false;
823 	}
824 
825 	return bytes;
826 }
827 
828 static inline ssize_t
829 sock_readv(int fd, struct iovec *iov, int iovcnt)
830 {
831 	struct msghdr msg = {
832 		.msg_iov = iov,
833 		.msg_iovlen = iovcnt,
834 	};
835 
836 	return recvmsg(fd, &msg, MSG_DONTWAIT);
837 }
838 
839 static inline ssize_t
840 uring_sock_read(struct spdk_uring_sock *sock)
841 {
842 	struct iovec iov[2];
843 	int bytes;
844 	struct spdk_uring_sock_group_impl *group;
845 
846 	bytes = spdk_pipe_writer_get_buffer(sock->recv_pipe, sock->recv_buf_sz, iov);
847 
848 	if (bytes > 0) {
849 		bytes = sock_readv(sock->fd, iov, 2);
850 		if (bytes > 0) {
851 			spdk_pipe_writer_advance(sock->recv_pipe, bytes);
852 			if (sock->base.group_impl && !sock->pending_recv) {
853 				group = __uring_group_impl(sock->base.group_impl);
854 				TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
855 				sock->pending_recv = true;
856 			}
857 		}
858 	}
859 
860 	return bytes;
861 }
862 
863 static int
864 uring_sock_recv_next(struct spdk_sock *_sock, void **_buf, void **ctx)
865 {
866 	struct spdk_uring_sock *sock = __uring_sock(_sock);
867 	struct spdk_uring_sock_group_impl *group;
868 	struct spdk_uring_buf_tracker *tr;
869 
870 	if (sock->connection_status < 0) {
871 		errno = -sock->connection_status;
872 		return -1;
873 	}
874 
875 	if (sock->recv_pipe != NULL) {
876 		errno = ENOTSUP;
877 		return -1;
878 	}
879 
880 	group = __uring_group_impl(_sock->group_impl);
881 
882 	tr = STAILQ_FIRST(&sock->recv_stream);
883 	if (tr == NULL) {
884 		if (sock->group->buf_ring_count > 0) {
885 			/* There are buffers posted, but data hasn't arrived. */
886 			errno = EAGAIN;
887 		} else {
888 			/* There are no buffers posted, so this won't ever
889 			 * make forward progress. */
890 			errno = ENOBUFS;
891 		}
892 		return -1;
893 	}
894 	assert(sock->pending_recv == true);
895 	assert(tr->buf != NULL);
896 
897 	*_buf = tr->buf + sock->recv_offset;
898 	*ctx = tr->ctx;
899 
900 	STAILQ_REMOVE_HEAD(&sock->recv_stream, link);
901 	STAILQ_INSERT_HEAD(&group->free_trackers, tr, link);
902 
903 	if (STAILQ_EMPTY(&sock->recv_stream)) {
904 		sock->pending_recv = false;
905 		TAILQ_REMOVE(&group->pending_recv, sock, link);
906 	}
907 
908 	return tr->len - sock->recv_offset;
909 }
910 
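/* Copy data out of the stream of completed provided-buffer receives into the
 * caller's iovecs. Each buffer that is fully consumed is handed back to the
 * group so it can be re-posted to the buffer ring. */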
911 static ssize_t
912 uring_sock_readv_no_pipe(struct spdk_sock *_sock, struct iovec *iovs, int iovcnt)
913 {
914 	struct spdk_uring_sock *sock = __uring_sock(_sock);
915 	struct spdk_uring_buf_tracker *tr;
916 	struct iovec iov;
917 	ssize_t total, len;
918 	int i;
919 
920 	if (sock->connection_status < 0) {
921 		errno = -sock->connection_status;
922 		return -1;
923 	}
924 
925 	if (_sock->group_impl == NULL) {
926 		/* If not in a group just read from the socket the regular way. */
927 		return sock_readv(sock->fd, iovs, iovcnt);
928 	}
929 
930 	if (STAILQ_EMPTY(&sock->recv_stream)) {
931 		if (sock->group->buf_ring_count == 0) {
932 			/* If the user hasn't posted any buffers, read from the socket
933 			 * directly. */
934 
935 			if (sock->pending_recv) {
936 				sock->pending_recv = false;
937 				TAILQ_REMOVE(&(__uring_group_impl(_sock->group_impl))->pending_recv, sock, link);
938 			}
939 
940 			return sock_readv(sock->fd, iovs, iovcnt);
941 		}
942 
943 		errno = EAGAIN;
944 		return -1;
945 	}
946 
947 	total = 0;
948 	for (i = 0; i < iovcnt; i++) {
949 		/* Copy to stack so we can change it */
950 		iov = iovs[i];
951 
952 		tr = STAILQ_FIRST(&sock->recv_stream);
953 		while (tr != NULL) {
954 			len = spdk_min(iov.iov_len, tr->len - sock->recv_offset);
955 			memcpy(iov.iov_base, tr->buf + sock->recv_offset, len);
956 
957 			total += len;
958 			sock->recv_offset += len;
959 			iov.iov_base += len;
960 			iov.iov_len -= len;
961 
962 			if (sock->recv_offset == tr->len) {
963 				sock->recv_offset = 0;
964 				STAILQ_REMOVE_HEAD(&sock->recv_stream, link);
965 				STAILQ_INSERT_HEAD(&sock->group->free_trackers, tr, link);
966 				spdk_sock_group_provide_buf(sock->group->base.group, tr->buf, tr->buflen, tr->ctx);
967 				tr = STAILQ_FIRST(&sock->recv_stream);
968 			}
969 
970 			if (iov.iov_len == 0) {
971 				break;
972 			}
973 		}
974 	}
975 
976 	if (STAILQ_EMPTY(&sock->recv_stream)) {
977 		struct spdk_uring_sock_group_impl *group;
978 
979 		group = __uring_group_impl(_sock->group_impl);
980 		sock->pending_recv = false;
981 		TAILQ_REMOVE(&group->pending_recv, sock, link);
982 	}
983 
984 	assert(total > 0);
985 	return total;
986 }
987 
988 static ssize_t
989 uring_sock_readv(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
990 {
991 	struct spdk_uring_sock *sock = __uring_sock(_sock);
992 	int rc, i;
993 	size_t len;
994 
995 	if (sock->connection_status < 0) {
996 		errno = -sock->connection_status;
997 		return -1;
998 	}
999 
1000 	if (sock->recv_pipe == NULL) {
1001 		return uring_sock_readv_no_pipe(_sock, iov, iovcnt);
1002 	}
1003 
1004 	len = 0;
1005 	for (i = 0; i < iovcnt; i++) {
1006 		len += iov[i].iov_len;
1007 	}
1008 
1009 	if (spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) {
1010 		/* If the user is receiving a sufficiently large amount of data,
1011 		 * receive directly to their buffers. */
1012 		if (len >= MIN_SOCK_PIPE_SIZE) {
1013 			return sock_readv(sock->fd, iov, iovcnt);
1014 		}
1015 
1016 		/* Otherwise, do a big read into our pipe */
1017 		rc = uring_sock_read(sock);
1018 		if (rc <= 0) {
1019 			return rc;
1020 		}
1021 	}
1022 
1023 	return uring_sock_recv_from_pipe(sock, iov, iovcnt);
1024 }
1025 
1026 static ssize_t
1027 uring_sock_recv(struct spdk_sock *sock, void *buf, size_t len)
1028 {
1029 	struct iovec iov[1];
1030 
1031 	iov[0].iov_base = buf;
1032 	iov[0].iov_len = len;
1033 
1034 	return uring_sock_readv(sock, iov, 1);
1035 }
1036 
1037 static ssize_t
1038 uring_sock_writev(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
1039 {
1040 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1041 	struct msghdr msg = {
1042 		.msg_iov = iov,
1043 		.msg_iovlen = iovcnt,
1044 	};
1045 
1046 	if (sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
1047 		errno = EAGAIN;
1048 		return -1;
1049 	}
1050 
1051 	return sendmsg(sock->fd, &msg, MSG_DONTWAIT);
1052 }
1053 
1054 static ssize_t
1055 sock_request_advance_offset(struct spdk_sock_request *req, ssize_t rc)
1056 {
1057 	unsigned int offset;
1058 	size_t len;
1059 	int i;
1060 
1061 	offset = req->internal.offset;
1062 	for (i = 0; i < req->iovcnt; i++) {
1063 		/* Advance by the offset first */
1064 		if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) {
1065 			offset -= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len;
1066 			continue;
1067 		}
1068 
1069 		/* Calculate the remaining length of this element */
1070 		len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset;
1071 
1072 		if (len > (size_t)rc) {
1073 			req->internal.offset += rc;
1074 			return -1;
1075 		}
1076 
1077 		offset = 0;
1078 		req->internal.offset += len;
1079 		rc -= len;
1080 	}
1081 
1082 	return rc;
1083 }
1084 
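/* Consume up to 'rc' bytes worth of queued requests. Requests sent without
 * zerocopy are completed as soon as they reach the front of the pending list;
 * zerocopy requests are completed later, when the MSG_ZEROCOPY notification for
 * their sendmsg_idx arrives on the error queue. */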
1085 static int
1086 sock_complete_write_reqs(struct spdk_sock *_sock, ssize_t rc, bool is_zcopy)
1087 {
1088 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1089 	struct spdk_sock_request *req;
1090 	int retval;
1091 
1092 	if (is_zcopy) {
1093 		/* Handle the overflow case: we use sock->sendmsg_idx - 1 for
1094 		 * req->internal.offset, so sendmsg_idx must never be zero */
1095 		if (spdk_unlikely(sock->sendmsg_idx == UINT32_MAX)) {
1096 			sock->sendmsg_idx = 1;
1097 		} else {
1098 			sock->sendmsg_idx++;
1099 		}
1100 	}
1101 
1102 	/* Consume the requests that were actually written */
1103 	req = TAILQ_FIRST(&_sock->queued_reqs);
1104 	while (req) {
1105 		/* req->internal.is_zcopy is true when the whole req or part of it is sent with zerocopy */
1106 		req->internal.is_zcopy = is_zcopy;
1107 
1108 		rc = sock_request_advance_offset(req, rc);
1109 		if (rc < 0) {
1110 			/* This element was partially sent. */
1111 			return 0;
1112 		}
1113 
1114 		/* Handled a full request. */
1115 		spdk_sock_request_pend(_sock, req);
1116 
1117 		if (!req->internal.is_zcopy && req == TAILQ_FIRST(&_sock->pending_reqs)) {
1118 			retval = spdk_sock_request_put(_sock, req, 0);
1119 			if (retval) {
1120 				return retval;
1121 			}
1122 		} else {
1123 			/* Re-use the offset field to hold the sendmsg call index. The
1124 			 * index is 0 based, so subtract one here because we've already
1125 			 * incremented above. */
1126 			req->internal.offset = sock->sendmsg_idx - 1;
1127 		}
1128 
1129 		if (rc == 0) {
1130 			break;
1131 		}
1132 
1133 		req = TAILQ_FIRST(&_sock->queued_reqs);
1134 	}
1135 
1136 	return 0;
1137 }
1138 
1139 #ifdef SPDK_ZEROCOPY
1140 static int
1141 _sock_check_zcopy(struct spdk_sock *_sock, int status)
1142 {
1143 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1144 	ssize_t rc;
1145 	struct sock_extended_err *serr;
1146 	struct cmsghdr *cm;
1147 	uint32_t idx;
1148 	struct spdk_sock_request *req, *treq;
1149 	bool found;
1150 
1151 	assert(sock->zcopy == true);
1152 	if (spdk_unlikely(status < 0)) {
1153 		if (!TAILQ_EMPTY(&_sock->pending_reqs)) {
1154 			SPDK_ERRLOG("Attempting to receive from ERRQUEUE yielded error, but pending list still has orphaned entries, status = %d\n",
1155 				    status);
1156 		} else {
1157 			SPDK_WARNLOG("Recvmsg yielded an error!\n");
1158 		}
1159 		return 0;
1160 	}
1161 
1162 	cm = CMSG_FIRSTHDR(&sock->errqueue_task.msg);
1163 	if (!((cm->cmsg_level == SOL_IP && cm->cmsg_type == IP_RECVERR) ||
1164 	      (cm->cmsg_level == SOL_IPV6 && cm->cmsg_type == IPV6_RECVERR))) {
1165 		SPDK_WARNLOG("Unexpected cmsg level or type!\n");
1166 		return 0;
1167 	}
1168 
1169 	serr = (struct sock_extended_err *)CMSG_DATA(cm);
1170 	if (serr->ee_errno != 0 || serr->ee_origin != SO_EE_ORIGIN_ZEROCOPY) {
1171 		SPDK_WARNLOG("Unexpected extended error origin\n");
1172 		return 0;
1173 	}
1174 
1175 	/* Most of the time, the pending_reqs array is in the exact
1176 	 * order we need such that all of the requests to complete are
1177 	 * in order, in the front. It is guaranteed that all requests
1178 	 * belonging to the same sendmsg call are sequential, so once
1179 	 * we encounter one match we can stop looping as soon as a
1180 	 * non-match is found.
1181 	 */
1182 	for (idx = serr->ee_info; idx <= serr->ee_data; idx++) {
1183 		found = false;
1184 		TAILQ_FOREACH_SAFE(req, &_sock->pending_reqs, internal.link, treq) {
1185 			if (!req->internal.is_zcopy) {
1186 				/* This wasn't a zcopy request. It was just waiting in line to complete */
1187 				rc = spdk_sock_request_put(_sock, req, 0);
1188 				if (rc < 0) {
1189 					return rc;
1190 				}
1191 			} else if (req->internal.offset == idx) {
1192 				found = true;
1193 				rc = spdk_sock_request_put(_sock, req, 0);
1194 				if (rc < 0) {
1195 					return rc;
1196 				}
1197 			} else if (found) {
1198 				break;
1199 			}
1200 		}
1201 	}
1202 
1203 	return 0;
1204 }
1205 
1206 static void
1207 _sock_prep_errqueue(struct spdk_sock *_sock)
1208 {
1209 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1210 	struct spdk_uring_task *task = &sock->errqueue_task;
1211 	struct io_uring_sqe *sqe;
1212 
1213 	if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS) {
1214 		return;
1215 	}
1216 
1217 	if (sock->pending_group_remove) {
1218 		return;
1219 	}
1220 
1221 	assert(sock->group != NULL);
1222 	sock->group->io_queued++;
1223 
1224 	sqe = io_uring_get_sqe(&sock->group->uring);
1225 	io_uring_prep_recvmsg(sqe, sock->fd, &task->msg, MSG_ERRQUEUE);
1226 	io_uring_sqe_set_data(sqe, task);
1227 	task->status = SPDK_URING_SOCK_TASK_IN_PROCESS;
1228 }
1229 
1230 #endif
1231 
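/* Queue an asynchronous sendmsg SQE covering as many queued requests as fit in
 * the task's iovec array. Only one write task per socket is in flight at a time. */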
1232 static void
1233 _sock_flush(struct spdk_sock *_sock)
1234 {
1235 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1236 	struct spdk_uring_task *task = &sock->write_task;
1237 	uint32_t iovcnt;
1238 	struct io_uring_sqe *sqe;
1239 	int flags;
1240 
1241 	if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS) {
1242 		return;
1243 	}
1244 
1245 #ifdef SPDK_ZEROCOPY
1246 	if (sock->zcopy) {
1247 		flags = MSG_DONTWAIT | sock->zcopy_send_flags;
1248 	} else
1249 #endif
1250 	{
1251 		flags = MSG_DONTWAIT;
1252 	}
1253 
1254 	iovcnt = spdk_sock_prep_reqs(&sock->base, task->iovs, task->iov_cnt, &task->last_req, &flags);
1255 	if (!iovcnt) {
1256 		return;
1257 	}
1258 
1259 	task->iov_cnt = iovcnt;
1260 	assert(sock->group != NULL);
1261 	task->msg.msg_iov = task->iovs;
1262 	task->msg.msg_iovlen = task->iov_cnt;
1263 #ifdef SPDK_ZEROCOPY
1264 	task->is_zcopy = (flags & MSG_ZEROCOPY) ? true : false;
1265 #endif
1266 	sock->group->io_queued++;
1267 
1268 	sqe = io_uring_get_sqe(&sock->group->uring);
1269 	io_uring_prep_sendmsg(sqe, sock->fd, &sock->write_task.msg, flags);
1270 	io_uring_sqe_set_data(sqe, task);
1271 	task->status = SPDK_URING_SOCK_TASK_IN_PROCESS;
1272 }
1273 
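/* Queue a recv SQE with IOSQE_BUFFER_SELECT so the kernel fills one of the
 * buffers posted to the group's buffer ring. The completion handler re-arms
 * the read. */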
1274 static void
1275 _sock_prep_read(struct spdk_sock *_sock)
1276 {
1277 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1278 	struct spdk_uring_task *task = &sock->read_task;
1279 	struct io_uring_sqe *sqe;
1280 
1281 	/* Do not prepare another read event if one is already outstanding */
1282 	if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS) {
1283 		return;
1284 	}
1285 
1286 	if (sock->pending_group_remove) {
1287 		return;
1288 	}
1289 
1290 	assert(sock->group != NULL);
1291 	sock->group->io_queued++;
1292 
1293 	sqe = io_uring_get_sqe(&sock->group->uring);
1294 	io_uring_prep_recv(sqe, sock->fd, NULL, URING_MAX_RECV_SIZE, 0);
1295 	sqe->buf_group = URING_BUF_GROUP_ID;
1296 	sqe->flags |= IOSQE_BUFFER_SELECT;
1297 	io_uring_sqe_set_data(sqe, task);
1298 	task->status = SPDK_URING_SOCK_TASK_IN_PROCESS;
1299 }
1300 
1301 static void
1302 _sock_prep_cancel_task(struct spdk_sock *_sock, void *user_data)
1303 {
1304 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1305 	struct spdk_uring_task *task = &sock->cancel_task;
1306 	struct io_uring_sqe *sqe;
1307 
1308 	if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS) {
1309 		return;
1310 	}
1311 
1312 	assert(sock->group != NULL);
1313 	sock->group->io_queued++;
1314 
1315 	sqe = io_uring_get_sqe(&sock->group->uring);
1316 	io_uring_prep_cancel(sqe, user_data, 0);
1317 	io_uring_sqe_set_data(sqe, task);
1318 	task->status = SPDK_URING_SOCK_TASK_IN_PROCESS;
1319 }
1320 
1321 static void
1322 uring_sock_fail(struct spdk_uring_sock *sock, int status)
1323 {
1324 	struct spdk_uring_sock_group_impl *group = sock->group;
1325 	int rc;
1326 
1327 	sock->connection_status = status;
1328 	rc = spdk_sock_abort_requests(&sock->base);
1329 
1330 	/* The user needs to be notified that this socket is dead. */
1331 	if (rc == 0 && sock->base.cb_fn != NULL &&
1332 	    sock->pending_recv == false) {
1333 		sock->pending_recv = true;
1334 		TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
1335 	}
1336 }
1337 
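/* Drain up to 'max' completions, dispatching each one by task type, then gather
 * up to 'max_read_events' sockets with pending receive data into 'socks'. */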
1338 static int
1339 sock_uring_group_reap(struct spdk_uring_sock_group_impl *group, int max, int max_read_events,
1340 		      struct spdk_sock **socks)
1341 {
1342 	int i, count, ret;
1343 	struct io_uring_cqe *cqe;
1344 	struct spdk_uring_sock *sock, *tmp;
1345 	struct spdk_uring_task *task;
1346 	int status, bid, flags;
1347 	bool is_zcopy;
1348 
1349 	for (i = 0; i < max; i++) {
1350 		ret = io_uring_peek_cqe(&group->uring, &cqe);
1351 		if (ret != 0) {
1352 			break;
1353 		}
1354 
1355 		if (cqe == NULL) {
1356 			break;
1357 		}
1358 
1359 		task = (struct spdk_uring_task *)cqe->user_data;
1360 		assert(task != NULL);
1361 		sock = task->sock;
1362 		assert(sock != NULL);
1363 		assert(sock->group != NULL);
1364 		assert(sock->group == group);
1365 		sock->group->io_inflight--;
1366 		sock->group->io_avail++;
1367 		status = cqe->res;
1368 		flags = cqe->flags;
1369 		io_uring_cqe_seen(&group->uring, cqe);
1370 
1371 		task->status = SPDK_URING_SOCK_TASK_NOT_IN_USE;
1372 
1373 		switch (task->type) {
1374 		case URING_TASK_READ:
1375 			if (status == -EAGAIN || status == -EWOULDBLOCK) {
1376 				/* This likely shouldn't happen, but would indicate that the
1377 				 * kernel didn't have enough resources to queue a task internally. */
1378 				_sock_prep_read(&sock->base);
1379 			} else if (status == -ECANCELED) {
1380 				continue;
1381 			} else if (status == -ENOBUFS) {
1382 				/* There's data in the socket but the user hasn't provided any buffers.
1383 				 * We need to notify the user that the socket has data pending. */
1384 				if (sock->base.cb_fn != NULL &&
1385 				    sock->pending_recv == false) {
1386 					sock->pending_recv = true;
1387 					TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
1388 				}
1389 
1390 				_sock_prep_read(&sock->base);
1391 			} else if (spdk_unlikely(status <= 0)) {
1392 				uring_sock_fail(sock, status < 0 ? status : -ECONNRESET);
1393 			} else {
1394 				struct spdk_uring_buf_tracker *tracker;
1395 
1396 				assert((flags & IORING_CQE_F_BUFFER) != 0);
1397 
1398 				bid = flags >> IORING_CQE_BUFFER_SHIFT;
1399 				tracker = &group->trackers[bid];
1400 
1401 				assert(tracker->buf != NULL);
1402 				assert(tracker->len != 0);
1403 
1404 				/* Append this data to the stream */
1405 				tracker->len = status;
1406 				STAILQ_INSERT_TAIL(&sock->recv_stream, tracker, link);
1407 				assert(group->buf_ring_count > 0);
1408 				group->buf_ring_count--;
1409 
1410 				if (sock->base.cb_fn != NULL &&
1411 				    sock->pending_recv == false) {
1412 					sock->pending_recv = true;
1413 					TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
1414 				}
1415 
1416 				_sock_prep_read(&sock->base);
1417 			}
1418 			break;
1419 		case URING_TASK_WRITE:
1420 			if (status == -EAGAIN || status == -EWOULDBLOCK ||
1421 			    (status == -ENOBUFS && sock->zcopy) ||
1422 			    status == -ECANCELED) {
1423 				continue;
1424 			} else if (spdk_unlikely(status < 0)) {
1425 				uring_sock_fail(sock, status);
1426 			} else {
1427 				task->last_req = NULL;
1428 				task->iov_cnt = 0;
1429 				is_zcopy = task->is_zcopy;
1430 				task->is_zcopy = false;
1431 				sock_complete_write_reqs(&sock->base, status, is_zcopy);
1432 			}
1433 
1434 			break;
1435 #ifdef SPDK_ZEROCOPY
1436 		case URING_TASK_ERRQUEUE:
1437 			if (status == -EAGAIN || status == -EWOULDBLOCK) {
1438 				_sock_prep_errqueue(&sock->base);
1439 			} else if (status == -ECANCELED) {
1440 				continue;
1441 			} else if (spdk_unlikely(status < 0)) {
1442 				uring_sock_fail(sock, status);
1443 			} else {
1444 				_sock_check_zcopy(&sock->base, status);
1445 				_sock_prep_errqueue(&sock->base);
1446 			}
1447 			break;
1448 #endif
1449 		case URING_TASK_CANCEL:
1450 			/* Do nothing */
1451 			break;
1452 		default:
1453 			SPDK_UNREACHABLE();
1454 		}
1455 	}
1456 
1457 	if (!socks) {
1458 		return 0;
1459 	}
1460 	count = 0;
1461 	TAILQ_FOREACH_SAFE(sock, &group->pending_recv, link, tmp) {
1462 		if (count == max_read_events) {
1463 			break;
1464 		}
1465 
1466 		/* If the socket's cb_fn is NULL, do not add it to socks array */
1467 		if (spdk_unlikely(sock->base.cb_fn == NULL)) {
1468 			assert(sock->pending_recv == true);
1469 			sock->pending_recv = false;
1470 			TAILQ_REMOVE(&group->pending_recv, sock, link);
1471 			continue;
1472 		}
1473 
1474 		socks[count++] = &sock->base;
1475 	}
1476 
1477 
1478 	/* Cycle the pending_recv list so that each time we poll, things aren't
1479 	 * in the same order. Say we have 6 sockets in the list, named as follows:
1480 	 * A B C D E F
1481 	 * Suppose all 6 sockets have poll events pending, but max_events is only 3. That means
1482 	 * 'sock' currently points at D. We want to rearrange the list to the following:
1483 	 * D E F A B C
1484 	 *
1485 	 * The variables below are named according to this example to make it easier to
1486 	 * follow the swaps.
1487 	 */
1488 	if (sock != NULL) {
1489 		struct spdk_uring_sock *ua, *uc, *ud, *uf;
1490 
1491 		/* Capture pointers to the elements we need */
1492 		ud = sock;
1493 
1494 		ua = TAILQ_FIRST(&group->pending_recv);
1495 		if (ua == ud) {
1496 			goto end;
1497 		}
1498 
1499 		uf = TAILQ_LAST(&group->pending_recv, pending_recv_list);
1500 		if (uf == ud) {
1501 			TAILQ_REMOVE(&group->pending_recv, ud, link);
1502 			TAILQ_INSERT_HEAD(&group->pending_recv, ud, link);
1503 			goto end;
1504 		}
1505 
1506 		uc = TAILQ_PREV(ud, pending_recv_list, link);
1507 		assert(uc != NULL);
1508 
1509 		/* Break the link between C and D */
1510 		uc->link.tqe_next = NULL;
1511 
1512 		/* Connect F to A */
1513 		uf->link.tqe_next = ua;
1514 		ua->link.tqe_prev = &uf->link.tqe_next;
1515 
1516 		/* Fix up the list first/last pointers */
1517 		group->pending_recv.tqh_first = ud;
1518 		group->pending_recv.tqh_last = &uc->link.tqe_next;
1519 
1520 		/* D is in front of the list, make tqe prev pointer point to the head of list */
1521 		ud->link.tqe_prev = &group->pending_recv.tqh_first;
1522 	}
1523 
1524 end:
1525 	return count;
1526 }
1527 
1528 static int uring_sock_flush(struct spdk_sock *_sock);
1529 
1530 static void
1531 uring_sock_writev_async(struct spdk_sock *_sock, struct spdk_sock_request *req)
1532 {
1533 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1534 	int rc;
1535 
1536 	if (spdk_unlikely(sock->connection_status)) {
1537 		req->cb_fn(req->cb_arg, sock->connection_status);
1538 		return;
1539 	}
1540 
1541 	spdk_sock_request_queue(_sock, req);
1542 
1543 	if (!sock->group) {
1544 		if (_sock->queued_iovcnt >= IOV_BATCH_SIZE) {
1545 			rc = uring_sock_flush(_sock);
1546 			if (rc < 0 && errno != EAGAIN) {
1547 				spdk_sock_abort_requests(_sock);
1548 			}
1549 		}
1550 	}
1551 }
1552 
1553 static int
1554 uring_sock_set_recvlowat(struct spdk_sock *_sock, int nbytes)
1555 {
1556 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1557 	int val;
1558 	int rc;
1559 
1560 	assert(sock != NULL);
1561 
1562 	val = nbytes;
1563 	rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVLOWAT, &val, sizeof val);
1564 	if (rc != 0) {
1565 		return -1;
1566 	}
1567 	return 0;
1568 }
1569 
1570 static bool
1571 uring_sock_is_ipv6(struct spdk_sock *_sock)
1572 {
1573 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1574 	struct sockaddr_storage sa;
1575 	socklen_t salen;
1576 	int rc;
1577 
1578 	assert(sock != NULL);
1579 
1580 	memset(&sa, 0, sizeof sa);
1581 	salen = sizeof sa;
1582 	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
1583 	if (rc != 0) {
1584 		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
1585 		return false;
1586 	}
1587 
1588 	return (sa.ss_family == AF_INET6);
1589 }
1590 
1591 static bool
1592 uring_sock_is_ipv4(struct spdk_sock *_sock)
1593 {
1594 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1595 	struct sockaddr_storage sa;
1596 	socklen_t salen;
1597 	int rc;
1598 
1599 	assert(sock != NULL);
1600 
1601 	memset(&sa, 0, sizeof sa);
1602 	salen = sizeof sa;
1603 	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
1604 	if (rc != 0) {
1605 		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
1606 		return false;
1607 	}
1608 
1609 	return (sa.ss_family == AF_INET);
1610 }
1611 
1612 static bool
1613 uring_sock_is_connected(struct spdk_sock *_sock)
1614 {
1615 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1616 	uint8_t byte;
1617 	int rc;
1618 
1619 	rc = recv(sock->fd, &byte, 1, MSG_PEEK | MSG_DONTWAIT);
1620 	if (rc == 0) {
1621 		return false;
1622 	}
1623 
1624 	if (rc < 0) {
1625 		if (errno == EAGAIN || errno == EWOULDBLOCK) {
1626 			return true;
1627 		}
1628 
1629 		return false;
1630 	}
1631 
1632 	return true;
1633 }
1634 
1635 static struct spdk_sock_group_impl *
1636 uring_sock_group_impl_get_optimal(struct spdk_sock *_sock, struct spdk_sock_group_impl *hint)
1637 {
1638 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1639 	struct spdk_sock_group_impl *group;
1640 
1641 	if (sock->placement_id != -1) {
1642 		spdk_sock_map_lookup(&g_map, sock->placement_id, &group, hint);
1643 		return group;
1644 	}
1645 
1646 	return NULL;
1647 }
1648 
1649 static int
1650 uring_sock_group_impl_buf_pool_free(struct spdk_uring_sock_group_impl *group_impl)
1651 {
1652 	if (group_impl->buf_ring) {
1653 		io_uring_unregister_buf_ring(&group_impl->uring, URING_BUF_GROUP_ID);
1654 		free(group_impl->buf_ring);
1655 	}
1656 
1657 	free(group_impl->trackers);
1658 
1659 	return 0;
1660 }
1661 
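/* Register a page-aligned io_uring buffer ring with URING_BUF_POOL_SIZE entries
 * and allocate one tracker per entry. The data buffers themselves are attached
 * later from the sock group's pool in uring_sock_group_populate_buf_ring(). */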
1662 static int
1663 uring_sock_group_impl_buf_pool_alloc(struct spdk_uring_sock_group_impl *group_impl)
1664 {
1665 	struct io_uring_buf_reg buf_reg = {};
1666 	struct io_uring_buf_ring *buf_ring;
1667 	int i, rc;
1668 
1669 	rc = posix_memalign((void **)&buf_ring, 0x1000, URING_BUF_POOL_SIZE * sizeof(struct io_uring_buf));
1670 	if (rc != 0) {
1671 		/* posix_memalign returns positive errno values */
1672 		return -rc;
1673 	}
1674 
1675 	buf_reg.ring_addr = (unsigned long long)buf_ring;
1676 	buf_reg.ring_entries = URING_BUF_POOL_SIZE;
1677 	buf_reg.bgid = URING_BUF_GROUP_ID;
1678 
1679 	rc = io_uring_register_buf_ring(&group_impl->uring, &buf_reg, 0);
1680 	if (rc != 0) {
1681 		free(buf_ring);
1682 		return rc;
1683 	}
1684 
1685 	group_impl->buf_ring = buf_ring;
1686 	io_uring_buf_ring_init(group_impl->buf_ring);
1687 	group_impl->buf_ring_count = 0;
1688 
1689 	group_impl->trackers = calloc(URING_BUF_POOL_SIZE, sizeof(struct spdk_uring_buf_tracker));
1690 	if (group_impl->trackers == NULL) {
1691 		uring_sock_group_impl_buf_pool_free(group_impl);
1692 		return -ENOMEM;
1693 	}
1694 
1695 	STAILQ_INIT(&group_impl->free_trackers);
1696 
1697 	for (i = 0; i < URING_BUF_POOL_SIZE; i++) {
1698 		struct spdk_uring_buf_tracker *tracker = &group_impl->trackers[i];
1699 
1700 		tracker->buf = NULL;
1701 		tracker->len = 0;
1702 		tracker->ctx = NULL;
1703 		tracker->id = i;
1704 
1705 		STAILQ_INSERT_TAIL(&group_impl->free_trackers, tracker, link);
1706 	}
1707 
1708 	return 0;
1709 }
1710 
1711 static struct spdk_sock_group_impl *
1712 uring_sock_group_impl_create(void)
1713 {
1714 	struct spdk_uring_sock_group_impl *group_impl;
1715 
1716 	group_impl = calloc(1, sizeof(*group_impl));
1717 	if (group_impl == NULL) {
1718 		SPDK_ERRLOG("group_impl allocation failed\n");
1719 		return NULL;
1720 	}
1721 
1722 	group_impl->io_avail = SPDK_SOCK_GROUP_QUEUE_DEPTH;
1723 
1724 	if (io_uring_queue_init(SPDK_SOCK_GROUP_QUEUE_DEPTH, &group_impl->uring, 0) < 0) {
1725 		SPDK_ERRLOG("uring I/O context setup failure\n");
1726 		free(group_impl);
1727 		return NULL;
1728 	}
1729 
1730 	TAILQ_INIT(&group_impl->pending_recv);
1731 
1732 	if (uring_sock_group_impl_buf_pool_alloc(group_impl) < 0) {
1733 		SPDK_ERRLOG("Failed to create buffer ring. "
1734 			    "The uring sock implementation is likely not supported on this kernel.\n");
1735 		io_uring_queue_exit(&group_impl->uring);
1736 		free(group_impl);
1737 		return NULL;
1738 	}
1739 
1740 	if (g_spdk_uring_sock_impl_opts.enable_placement_id == PLACEMENT_CPU) {
1741 		spdk_sock_map_insert(&g_map, spdk_env_get_current_core(), &group_impl->base);
1742 	}
1743 
1744 	return &group_impl->base;
1745 }
1746 
1747 static int
1748 uring_sock_group_impl_add_sock(struct spdk_sock_group_impl *_group,
1749 			       struct spdk_sock *_sock)
1750 {
1751 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1752 	struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group);
1753 	int rc;
1754 
1755 	sock->group = group;
1756 	sock->write_task.sock = sock;
1757 	sock->write_task.type = URING_TASK_WRITE;
1758 
1759 	sock->read_task.sock = sock;
1760 	sock->read_task.type = URING_TASK_READ;
1761 
1762 	sock->errqueue_task.sock = sock;
1763 	sock->errqueue_task.type = URING_TASK_ERRQUEUE;
1764 	sock->errqueue_task.msg.msg_control = sock->buf;
1765 	sock->errqueue_task.msg.msg_controllen = sizeof(sock->buf);
1766 
1767 	sock->cancel_task.sock = sock;
1768 	sock->cancel_task.type = URING_TASK_CANCEL;
1769 
1770 	/* switched from another polling group due to scheduling */
1771 	if (spdk_unlikely(sock->recv_pipe != NULL &&
1772 			  (spdk_pipe_reader_bytes_available(sock->recv_pipe) > 0))) {
1773 		assert(sock->pending_recv == false);
1774 		sock->pending_recv = true;
1775 		TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
1776 	}
1777 
1778 	if (sock->placement_id != -1) {
1779 		rc = spdk_sock_map_insert(&g_map, sock->placement_id, &group->base);
1780 		if (rc != 0) {
1781 			SPDK_ERRLOG("Failed to insert sock group into map: %d\n", rc);
1782 			/* Do not treat this as an error. The system will continue running. */
1783 		}
1784 	}
1785 
1786 	/* We get an async read going immediately */
1787 	_sock_prep_read(&sock->base);
1788 #ifdef SPDK_ZEROCOPY
1789 	if (sock->zcopy) {
1790 		_sock_prep_errqueue(_sock);
1791 	}
1792 #endif
1793 
1794 	return 0;
1795 }
1796 
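/* Pull buffers from the sock group's pool and publish them to the kernel's
 * buffer ring so that subsequent IOSQE_BUFFER_SELECT receives can use them. */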
1797 static void
1798 uring_sock_group_populate_buf_ring(struct spdk_uring_sock_group_impl *group)
1799 {
1800 	struct spdk_uring_buf_tracker *tracker;
1801 	int count, mask;
1802 
1803 	if (g_spdk_uring_sock_impl_opts.enable_recv_pipe) {
1804 		/* If recv_pipe is enabled, we do not post buffers. */
1805 		return;
1806 	}
1807 
1808 	/* Try to re-populate the io_uring's buffer pool using user-provided buffers */
1809 	tracker = STAILQ_FIRST(&group->free_trackers);
1810 	count = 0;
1811 	mask = io_uring_buf_ring_mask(URING_BUF_POOL_SIZE);
1812 	while (tracker != NULL) {
1813 		tracker->buflen = spdk_sock_group_get_buf(group->base.group, &tracker->buf, &tracker->ctx);
1814 		if (tracker->buflen == 0) {
1815 			break;
1816 		}
1817 
1818 		assert(tracker->buf != NULL);
1819 		STAILQ_REMOVE_HEAD(&group->free_trackers, link);
1820 		assert(STAILQ_FIRST(&group->free_trackers) != tracker);
1821 
1822 		io_uring_buf_ring_add(group->buf_ring, tracker->buf, tracker->buflen, tracker->id, mask, count);
1823 		count++;
1824 		tracker = STAILQ_FIRST(&group->free_trackers);
1825 	}
1826 
1827 	if (count > 0) {
1828 		group->buf_ring_count += count;
1829 		io_uring_buf_ring_advance(group->buf_ring, count);
1830 	}
1831 }
1832 
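/* Poll the group: flush queued writes for every socket, re-post receive buffers,
 * submit any queued SQEs, then reap completions and return readable sockets. */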
1833 static int
1834 uring_sock_group_impl_poll(struct spdk_sock_group_impl *_group, int max_events,
1835 			   struct spdk_sock **socks)
1836 {
1837 	struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group);
1838 	int count, ret;
1839 	int to_complete, to_submit;
1840 	struct spdk_sock *_sock, *tmp;
1841 	struct spdk_uring_sock *sock;
1842 
1843 	if (spdk_likely(socks)) {
1844 		TAILQ_FOREACH_SAFE(_sock, &group->base.socks, link, tmp) {
1845 			sock = __uring_sock(_sock);
1846 			if (spdk_unlikely(sock->connection_status)) {
1847 				continue;
1848 			}
1849 			_sock_flush(_sock);
1850 		}
1851 	}
1852 
1853 	/* Try to re-populate the io_uring's buffer pool using user-provided buffers */
1854 	uring_sock_group_populate_buf_ring(group);
1855 
1856 	to_submit = group->io_queued;
1857 
1858 	/* Network I/O never uses O_DIRECT, so we do not need a separate spdk_io_uring_enter call */
1859 	if (to_submit > 0) {
1860 		/* If there are I/O to submit, use io_uring_submit here.
1861 		 * It will automatically call io_uring_enter appropriately. */
1862 		ret = io_uring_submit(&group->uring);
1863 		if (ret < 0) {
1864 			return 1;
1865 		}
1866 		group->io_queued = 0;
1867 		group->io_inflight += to_submit;
1868 		group->io_avail -= to_submit;
1869 	}
1870 
1871 	count = 0;
1872 	to_complete = group->io_inflight;
1873 	if (to_complete > 0 || !TAILQ_EMPTY(&group->pending_recv)) {
1874 		count = sock_uring_group_reap(group, to_complete, max_events, socks);
1875 	}
1876 
1877 	return count;
1878 }
1879 
1880 static int
1881 uring_sock_group_impl_remove_sock(struct spdk_sock_group_impl *_group,
1882 				  struct spdk_sock *_sock)
1883 {
1884 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1885 	struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group);
1886 
1887 	sock->pending_group_remove = true;
1888 
1889 	if (sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
1890 		_sock_prep_cancel_task(_sock, &sock->write_task);
1891 		/* Since spdk_sock_group_remove_sock() is not an asynchronous interface,
1892 		 * we can simply poll in a while loop here. */
1893 		while ((sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) ||
1894 		       (sock->cancel_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE)) {
1895 			uring_sock_group_impl_poll(_group, 32, NULL);
1896 		}
1897 	}
1898 
1899 	if (sock->read_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
1900 		_sock_prep_cancel_task(_sock, &sock->read_task);
1901 		/* Since spdk_sock_group_remove_sock() is not an asynchronous interface,
1902 		 * we can simply poll in a while loop here. */
1903 		while ((sock->read_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) ||
1904 		       (sock->cancel_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE)) {
1905 			uring_sock_group_impl_poll(_group, 32, NULL);
1906 		}
1907 	}
1908 
1909 	if (sock->errqueue_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
1910 		_sock_prep_cancel_task(_sock, &sock->errqueue_task);
1911 		/* Since spdk_sock_group_remove_sock() is not an asynchronous interface,
1912 		 * we can simply poll in a while loop here. */
1913 		while ((sock->errqueue_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) ||
1914 		       (sock->cancel_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE)) {
1915 			uring_sock_group_impl_poll(_group, 32, NULL);
1916 		}
1917 	}
1918 
1919 	/* Make sure that cancelling the tasks above didn't cause any new requests to be submitted */
1920 	assert(sock->write_task.status == SPDK_URING_SOCK_TASK_NOT_IN_USE);
1921 	assert(sock->read_task.status == SPDK_URING_SOCK_TASK_NOT_IN_USE);
1922 	assert(sock->errqueue_task.status == SPDK_URING_SOCK_TASK_NOT_IN_USE);
1923 
1924 	if (sock->pending_recv) {
1925 		TAILQ_REMOVE(&group->pending_recv, sock, link);
1926 		sock->pending_recv = false;
1927 	}
1928 	assert(sock->pending_recv == false);
1929 
1930 	/* We have no way to handle this case. We could let the user read this
1931 	 * buffer, but the buffer came from a group and we have lost the association
1932 	 * to that group, so we could not release it. */
1933 	assert(STAILQ_EMPTY(&sock->recv_stream));
1934 
1935 	if (sock->placement_id != -1) {
1936 		spdk_sock_map_release(&g_map, sock->placement_id);
1937 	}
1938 
1939 	sock->pending_group_remove = false;
1940 	sock->group = NULL;
1941 	return 0;
1942 }
1943 
1944 static int
1945 uring_sock_group_impl_close(struct spdk_sock_group_impl *_group)
1946 {
1947 	struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group);
1948 
1949 	/* try to reap all the active I/O */
1950 	while (group->io_inflight) {
1951 		uring_sock_group_impl_poll(_group, 32, NULL);
1952 	}
1953 	assert(group->io_inflight == 0);
1954 	assert(group->io_avail == SPDK_SOCK_GROUP_QUEUE_DEPTH);
1955 
1956 	uring_sock_group_impl_buf_pool_free(group);
1957 
1958 	io_uring_queue_exit(&group->uring);
1959 
1960 	if (g_spdk_uring_sock_impl_opts.enable_placement_id == PLACEMENT_CPU) {
1961 		spdk_sock_map_release(&g_map, spdk_env_get_current_core());
1962 	}
1963 
1964 	free(group);
1965 	return 0;
1966 }
1967 
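/* Synchronously flush queued requests with a non-blocking sendmsg(). For
 * zero-copy sockets, the error queue is also polled once to reap any completed
 * MSG_ZEROCOPY notifications. */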
1968 static int
1969 uring_sock_flush(struct spdk_sock *_sock)
1970 {
1971 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1972 	struct msghdr msg = {};
1973 	struct iovec iovs[IOV_BATCH_SIZE];
1974 	int iovcnt;
1975 	ssize_t rc;
1976 	int flags = sock->zcopy_send_flags;
1977 	int retval;
1978 	bool is_zcopy = false;
1979 	struct spdk_uring_task *task = &sock->errqueue_task;
1980 
1981 	/* Can't flush from within a callback or we end up with recursive calls */
1982 	if (_sock->cb_cnt > 0) {
1983 		errno = EAGAIN;
1984 		return -1;
1985 	}
1986 
1987 	/* Can't flush while a write is already outstanding */
1988 	if (sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
1989 		errno = EAGAIN;
1990 		return -1;
1991 	}
1992 
1993 	/* Gather an iov */
1994 	iovcnt = spdk_sock_prep_reqs(_sock, iovs, 0, NULL, &flags);
1995 	if (iovcnt == 0) {
1996 		/* Nothing to send */
1997 		return 0;
1998 	}
1999 
2000 	/* Perform the vectored write */
2001 	msg.msg_iov = iovs;
2002 	msg.msg_iovlen = iovcnt;
2003 	rc = sendmsg(sock->fd, &msg, flags | MSG_DONTWAIT);
2004 	if (rc <= 0) {
2005 		if (rc == 0 || errno == EAGAIN || errno == EWOULDBLOCK || (errno == ENOBUFS && sock->zcopy)) {
2006 			errno = EAGAIN;
2007 		}
2008 		return -1;
2009 	}
2010 
2011 #ifdef SPDK_ZEROCOPY
2012 	is_zcopy = flags & MSG_ZEROCOPY;
2013 #endif
2014 	retval = sock_complete_write_reqs(_sock, rc, is_zcopy);
2015 	if (retval < 0) {
2016 		/* if the socket is closed, return to avoid heap-use-after-free error */
2017 		errno = ENOTCONN;
2018 		return -1;
2019 	}
2020 
2021 #ifdef SPDK_ZEROCOPY
2022 	/* At least do once to check zero copy case */
2023 	if (sock->zcopy && !TAILQ_EMPTY(&_sock->pending_reqs)) {
2024 		retval = recvmsg(sock->fd, &task->msg, MSG_ERRQUEUE);
2025 		if (retval < 0) {
2026 			if (errno == EWOULDBLOCK || errno == EAGAIN) {
2027 				return rc;
2028 			}
2029 		}
2030 		_sock_check_zcopy(_sock, retval);
2031 	}
2032 #endif
2033 
2034 	return rc;
2035 }
2036 
2037 static int
2038 uring_sock_group_impl_register_interrupt(struct spdk_sock_group_impl *_group, uint32_t events,
2039 		spdk_interrupt_fn fn, void *arg, const char *name)
2040 {
2041 	SPDK_ERRLOG("Interrupt mode is not supported in the uring sock implementation.\n");
2042 
2043 	return -ENOTSUP;
2044 }
2045 
2046 static void
2047 uring_sock_group_impl_unregister_interrupt(struct spdk_sock_group_impl *_group)
2048 {
2049 }
2050 
2051 static struct spdk_net_impl g_uring_net_impl = {
2052 	.name		= "uring",
2053 	.getaddr	= uring_sock_getaddr,
2054 	.get_interface_name = uring_sock_get_interface_name,
2055 	.get_numa_socket_id = uring_sock_get_numa_socket_id,
2056 	.connect	= uring_sock_connect,
2057 	.listen		= uring_sock_listen,
2058 	.accept		= uring_sock_accept,
2059 	.close		= uring_sock_close,
2060 	.recv		= uring_sock_recv,
2061 	.readv		= uring_sock_readv,
2062 	.writev		= uring_sock_writev,
2063 	.recv_next	= uring_sock_recv_next,
2064 	.writev_async	= uring_sock_writev_async,
2065 	.flush          = uring_sock_flush,
2066 	.set_recvlowat	= uring_sock_set_recvlowat,
2067 	.set_recvbuf	= uring_sock_set_recvbuf,
2068 	.set_sendbuf	= uring_sock_set_sendbuf,
2069 	.is_ipv6	= uring_sock_is_ipv6,
2070 	.is_ipv4	= uring_sock_is_ipv4,
2071 	.is_connected   = uring_sock_is_connected,
2072 	.group_impl_get_optimal	= uring_sock_group_impl_get_optimal,
2073 	.group_impl_create	= uring_sock_group_impl_create,
2074 	.group_impl_add_sock	= uring_sock_group_impl_add_sock,
2075 	.group_impl_remove_sock	= uring_sock_group_impl_remove_sock,
2076 	.group_impl_poll	= uring_sock_group_impl_poll,
2077 	.group_impl_register_interrupt    = uring_sock_group_impl_register_interrupt,
2078 	.group_impl_unregister_interrupt  = uring_sock_group_impl_unregister_interrupt,
2079 	.group_impl_close	= uring_sock_group_impl_close,
2080 	.get_opts		= uring_sock_impl_get_opts,
2081 	.set_opts		= uring_sock_impl_set_opts,
2082 };
2083 
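/* Probe by creating (and immediately closing) a group so the impl is only
 * registered when the kernel supports the required io_uring features, e.g.
 * registered buffer rings. */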
2084 __attribute__((constructor)) static void
2085 net_impl_register_uring(void)
2086 {
2087 	struct spdk_sock_group_impl *impl;
2088 
2089 	/* Check if we can create a uring sock group before we register
2090 	 * it as a valid impl. */
2091 	impl = uring_sock_group_impl_create();
2092 	if (impl) {
2093 		uring_sock_group_impl_close(impl);
2094 		spdk_net_impl_register(&g_uring_net_impl);
2095 	}
2096 }
2097