xref: /spdk/module/sock/uring/uring.c (revision 30afc27748e69257ca50f7e3a4b4ca6466ffc26b)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2019 Intel Corporation.
3  *   All rights reserved.
4  */
5 
6 #include "spdk/stdinc.h"
7 #include "spdk/config.h"
8 
9 #include <linux/errqueue.h>
10 #include <sys/epoll.h>
11 #include <liburing.h>
12 
13 #include "spdk/barrier.h"
14 #include "spdk/env.h"
15 #include "spdk/log.h"
16 #include "spdk/pipe.h"
17 #include "spdk/sock.h"
18 #include "spdk/string.h"
19 #include "spdk/util.h"
20 #include "spdk/net.h"
21 #include "spdk/file.h"
22 
23 #include "spdk_internal/sock.h"
24 #include "spdk_internal/assert.h"
25 #include "spdk/net.h"
26 
27 #define MAX_TMPBUF 1024
28 #define PORTNUMLEN 32
29 #define SPDK_SOCK_GROUP_QUEUE_DEPTH 4096
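/* Size of the control message returned by recvmsg(MSG_ERRQUEUE) when reaping
 * MSG_ZEROCOPY completions: one cmsghdr followed by a sock_extended_err. This
 * sizes the per-socket 'buf' used as msg_control for the errqueue task. */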
30 #define SPDK_SOCK_CMG_INFO_SIZE (sizeof(struct cmsghdr) + sizeof(struct sock_extended_err))
31 
32 enum uring_task_type {
33 	URING_TASK_READ = 0,
34 	URING_TASK_ERRQUEUE,
35 	URING_TASK_WRITE,
36 	URING_TASK_CANCEL,
37 };
38 
39 #if defined(SO_ZEROCOPY) && defined(MSG_ZEROCOPY)
40 #define SPDK_ZEROCOPY
41 #endif
42 
43 /* We don't know how big the buffers that the user posts will be, but this
44  * is the maximum we'll ever allow it to receive in a single command.
45  * If the user buffers are smaller, it will just receive less. */
46 #define URING_MAX_RECV_SIZE (128 * 1024)
47 
48 /* We don't know how many buffers the user will post, but this is the
49  * maximum number we'll take from the pool to post per group. */
50 #define URING_BUF_POOL_SIZE 128
51 
52 /* We use 1 just so it's not zero and we can validate it's right. */
53 #define URING_BUF_GROUP_ID 1
54 
55 enum spdk_uring_sock_task_status {
56 	SPDK_URING_SOCK_TASK_NOT_IN_USE = 0,
57 	SPDK_URING_SOCK_TASK_IN_PROCESS,
58 };
59 
60 struct spdk_uring_task {
61 	enum spdk_uring_sock_task_status	status;
62 	enum uring_task_type		type;
63 	struct spdk_uring_sock			*sock;
64 	struct msghdr				msg;
65 	struct iovec				iovs[IOV_BATCH_SIZE];
66 	int					iov_cnt;
67 	struct spdk_sock_request		*last_req;
68 	bool					is_zcopy;
69 	STAILQ_ENTRY(spdk_uring_task)		link;
70 };
71 
72 struct spdk_uring_sock {
73 	struct spdk_sock			base;
74 	int					fd;
75 	uint32_t				sendmsg_idx;
76 	struct spdk_uring_sock_group_impl	*group;
77 	STAILQ_HEAD(, spdk_uring_buf_tracker)	recv_stream;
78 	size_t					recv_offset;
79 	struct spdk_uring_task			write_task;
80 	struct spdk_uring_task			errqueue_task;
81 	struct spdk_uring_task			read_task;
82 	struct spdk_uring_task			cancel_task;
83 	struct spdk_pipe			*recv_pipe;
84 	void					*recv_buf;
85 	int					recv_buf_sz;
86 	bool					zcopy;
87 	bool					pending_recv;
88 	bool					pending_group_remove;
89 	int					zcopy_send_flags;
90 	int					connection_status;
91 	int					placement_id;
92 	uint8_t                                 reserved[4];
93 	uint8_t					buf[SPDK_SOCK_CMG_INFO_SIZE];
94 	TAILQ_ENTRY(spdk_uring_sock)		link;
95 	char					interface_name[IFNAMSIZ];
96 };
97 /* 'struct cmsghdr' is mapped onto the buffer 'buf'. Since the first element
98  * of this control message header is 8 bytes in size, 'buf' must be
99  * 8-byte aligned.
100  */
101 SPDK_STATIC_ASSERT(offsetof(struct spdk_uring_sock, buf) % 8 == 0,
102 		   "Incorrect alignment: `buf` must be aligned to 8 bytes");
103 
104 TAILQ_HEAD(pending_recv_list, spdk_uring_sock);
105 
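/* Tracks one user-provided receive buffer posted to the io_uring provided-buffer
 * ring. 'id' is the buffer id (bid) reported back in cqe->flags on completion,
 * 'buflen' is the size of the posted buffer, and 'len' is set to the number of
 * bytes actually received once the buffer completes. */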
106 struct spdk_uring_buf_tracker {
107 	void					*buf;
108 	size_t					buflen;
109 	size_t					len;
110 	void					*ctx;
111 	int					id;
112 	STAILQ_ENTRY(spdk_uring_buf_tracker)	link;
113 };
114 
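/* Per-poll-group state. 'io_queued' counts SQEs prepared but not yet submitted,
 * 'io_inflight' counts SQEs submitted and awaiting completion, and 'io_avail'
 * is the remaining capacity out of SPDK_SOCK_GROUP_QUEUE_DEPTH. 'buf_ring_count'
 * is the number of receive buffers currently posted to the kernel's buffer ring. */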
115 struct spdk_uring_sock_group_impl {
116 	struct spdk_sock_group_impl		base;
117 	struct io_uring				uring;
118 	uint32_t				io_inflight;
119 	uint32_t				io_queued;
120 	uint32_t				io_avail;
121 	struct pending_recv_list		pending_recv;
122 
123 	struct io_uring_buf_ring		*buf_ring;
124 	uint32_t				buf_ring_count;
125 	struct spdk_uring_buf_tracker		*trackers;
126 	STAILQ_HEAD(, spdk_uring_buf_tracker)	free_trackers;
127 };
128 
129 static struct spdk_sock_impl_opts g_spdk_uring_sock_impl_opts = {
130 	.recv_buf_size = DEFAULT_SO_RCVBUF_SIZE,
131 	.send_buf_size = DEFAULT_SO_SNDBUF_SIZE,
132 	.enable_recv_pipe = true,
133 	.enable_quickack = false,
134 	.enable_placement_id = PLACEMENT_NONE,
135 	.enable_zerocopy_send_server = false,
136 	.enable_zerocopy_send_client = false,
137 	.zerocopy_threshold = 0,
138 	.tls_version = 0,
139 	.enable_ktls = false,
140 	.psk_key = NULL,
141 	.psk_identity = NULL
142 };
143 
144 static struct spdk_sock_map g_map = {
145 	.entries = STAILQ_HEAD_INITIALIZER(g_map.entries),
146 	.mtx = PTHREAD_MUTEX_INITIALIZER
147 };
148 
149 __attribute__((destructor)) static void
150 uring_sock_map_cleanup(void)
151 {
152 	spdk_sock_map_cleanup(&g_map);
153 }
154 
155 #define SPDK_URING_SOCK_REQUEST_IOV(req) ((struct iovec *)((uint8_t *)req + sizeof(struct spdk_sock_request)))
156 
157 #define __uring_sock(sock) (struct spdk_uring_sock *)sock
158 #define __uring_group_impl(group) (struct spdk_uring_sock_group_impl *)group
159 
160 static void
161 uring_sock_copy_impl_opts(struct spdk_sock_impl_opts *dest, const struct spdk_sock_impl_opts *src,
162 			  size_t len)
163 {
164 #define FIELD_OK(field) \
165 	offsetof(struct spdk_sock_impl_opts, field) + sizeof(src->field) <= len
166 
167 #define SET_FIELD(field) \
168 	if (FIELD_OK(field)) { \
169 		dest->field = src->field; \
170 	}
171 
172 	SET_FIELD(recv_buf_size);
173 	SET_FIELD(send_buf_size);
174 	SET_FIELD(enable_recv_pipe);
175 	SET_FIELD(enable_quickack);
176 	SET_FIELD(enable_placement_id);
177 	SET_FIELD(enable_zerocopy_send_server);
178 	SET_FIELD(enable_zerocopy_send_client);
179 	SET_FIELD(zerocopy_threshold);
180 	SET_FIELD(tls_version);
181 	SET_FIELD(enable_ktls);
182 	SET_FIELD(psk_key);
183 	SET_FIELD(psk_identity);
184 
185 #undef SET_FIELD
186 #undef FIELD_OK
187 }
188 
189 static int
190 uring_sock_impl_get_opts(struct spdk_sock_impl_opts *opts, size_t *len)
191 {
192 	if (!opts || !len) {
193 		errno = EINVAL;
194 		return -1;
195 	}
196 
197 	assert(sizeof(*opts) >= *len);
198 	memset(opts, 0, *len);
199 
200 	uring_sock_copy_impl_opts(opts, &g_spdk_uring_sock_impl_opts, *len);
201 	*len = spdk_min(*len, sizeof(g_spdk_uring_sock_impl_opts));
202 
203 	return 0;
204 }
205 
206 static int
207 uring_sock_impl_set_opts(const struct spdk_sock_impl_opts *opts, size_t len)
208 {
209 	if (!opts) {
210 		errno = EINVAL;
211 		return -1;
212 	}
213 
214 	assert(sizeof(*opts) >= len);
215 	uring_sock_copy_impl_opts(&g_spdk_uring_sock_impl_opts, opts, len);
216 
217 	return 0;
218 }
219 
220 static void
221 uring_opts_get_impl_opts(const struct spdk_sock_opts *opts, struct spdk_sock_impl_opts *dest)
222 {
223 	/* Copy the default impl_opts first to cover cases when user's impl_opts is smaller */
224 	memcpy(dest, &g_spdk_uring_sock_impl_opts, sizeof(*dest));
225 
226 	if (opts->impl_opts != NULL) {
227 		assert(sizeof(*dest) >= opts->impl_opts_size);
228 		uring_sock_copy_impl_opts(dest, opts->impl_opts, opts->impl_opts_size);
229 	}
230 }
231 
232 static int
233 uring_sock_getaddr(struct spdk_sock *_sock, char *saddr, int slen, uint16_t *sport,
234 		   char *caddr, int clen, uint16_t *cport)
235 {
236 	struct spdk_uring_sock *sock = __uring_sock(_sock);
237 
238 	assert(sock != NULL);
239 	return spdk_net_getaddr(sock->fd, saddr, slen, sport, caddr, clen, cport);
240 }
241 
242 static const char *
243 uring_sock_get_interface_name(struct spdk_sock *_sock)
244 {
245 	struct spdk_uring_sock *sock = __uring_sock(_sock);
246 	char saddr[64];
247 	int rc;
248 
249 	rc = spdk_net_getaddr(sock->fd, saddr, sizeof(saddr), NULL, NULL, 0, NULL);
250 	if (rc != 0) {
251 		return NULL;
252 	}
253 
254 	rc = spdk_net_get_interface_name(saddr, sock->interface_name,
255 					 sizeof(sock->interface_name));
256 	if (rc != 0) {
257 		return NULL;
258 	}
259 
260 	return sock->interface_name;
261 }
262 
263 static int32_t
264 uring_sock_get_numa_id(struct spdk_sock *sock)
265 {
266 	const char *interface_name;
267 	uint32_t numa_id;
268 	int rc;
269 
270 	interface_name = uring_sock_get_interface_name(sock);
271 	if (interface_name == NULL) {
272 		return SPDK_ENV_NUMA_ID_ANY;
273 	}
274 
275 	rc = spdk_read_sysfs_attribute_uint32(&numa_id,
276 					      "/sys/class/net/%s/device/numa_node", interface_name);
277 	if (rc == 0 && numa_id <= INT32_MAX) {
278 		return (int32_t)numa_id;
279 	} else {
280 		return SPDK_ENV_NUMA_ID_ANY;
281 	}
282 }
283 
284 enum uring_sock_create_type {
285 	SPDK_SOCK_CREATE_LISTEN,
286 	SPDK_SOCK_CREATE_CONNECT,
287 };
288 
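/* Create, resize, or destroy the socket's receive pipe. Any data already buffered
 * in the old pipe is copied into the new one before the old pipe is freed. */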
289 static int
290 uring_sock_alloc_pipe(struct spdk_uring_sock *sock, int sz)
291 {
292 	uint8_t *new_buf;
293 	struct spdk_pipe *new_pipe;
294 	struct iovec siov[2];
295 	struct iovec diov[2];
296 	int sbytes;
297 	ssize_t bytes;
298 	int rc;
299 
300 	if (sock->recv_buf_sz == sz) {
301 		return 0;
302 	}
303 
304 	/* If the new size is 0, just free the pipe */
305 	if (sz == 0) {
306 		spdk_pipe_destroy(sock->recv_pipe);
307 		free(sock->recv_buf);
308 		sock->recv_pipe = NULL;
309 		sock->recv_buf = NULL;
310 		return 0;
311 	} else if (sz < MIN_SOCK_PIPE_SIZE) {
312 		SPDK_ERRLOG("The size of the pipe must be larger than %d\n", MIN_SOCK_PIPE_SIZE);
313 		return -1;
314 	}
315 
316 	/* Allocate the new buffer aligned to 64 bytes */
317 	rc = posix_memalign((void **)&new_buf, 64, sz);
318 	if (rc != 0) {
319 		SPDK_ERRLOG("socket recv buf allocation failed\n");
320 		return -ENOMEM;
321 	}
322 	memset(new_buf, 0, sz);
323 
324 	new_pipe = spdk_pipe_create(new_buf, sz);
325 	if (new_pipe == NULL) {
326 		SPDK_ERRLOG("socket pipe allocation failed\n");
327 		free(new_buf);
328 		return -ENOMEM;
329 	}
330 
331 	if (sock->recv_pipe != NULL) {
332 		/* Pull all of the data out of the old pipe */
333 		sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov);
334 		if (sbytes > sz) {
335 			/* Too much data to fit into the new pipe size */
336 			spdk_pipe_destroy(new_pipe);
337 			free(new_buf);
338 			return -EINVAL;
339 		}
340 
341 		sbytes = spdk_pipe_writer_get_buffer(new_pipe, sz, diov);
342 		assert(sbytes == sz);
343 
344 		bytes = spdk_iovcpy(siov, 2, diov, 2);
345 		spdk_pipe_writer_advance(new_pipe, bytes);
346 
347 		spdk_pipe_destroy(sock->recv_pipe);
348 		free(sock->recv_buf);
349 	}
350 
351 	sock->recv_buf_sz = sz;
352 	sock->recv_buf = new_buf;
353 	sock->recv_pipe = new_pipe;
354 
355 	return 0;
356 }
357 
358 static int
359 uring_sock_set_recvbuf(struct spdk_sock *_sock, int sz)
360 {
361 	struct spdk_uring_sock *sock = __uring_sock(_sock);
362 	int min_size;
363 	int rc;
364 
365 	assert(sock != NULL);
366 
367 	if (_sock->impl_opts.enable_recv_pipe) {
368 		rc = uring_sock_alloc_pipe(sock, sz);
369 		if (rc) {
370 			SPDK_ERRLOG("unable to allocate sufficient recvbuf with sz=%d on sock=%p\n", sz, _sock);
371 			return rc;
372 		}
373 	}
374 
375 	/* Set kernel buffer size to be at least MIN_SO_RCVBUF_SIZE and
376 	 * g_spdk_uring_sock_impl_opts.recv_buf_size. */
377 	min_size = spdk_max(MIN_SO_RCVBUF_SIZE, g_spdk_uring_sock_impl_opts.recv_buf_size);
378 
379 	if (sz < min_size) {
380 		sz = min_size;
381 	}
382 
383 	rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVBUF, &sz, sizeof(sz));
384 	if (rc < 0) {
385 		return rc;
386 	}
387 
388 	_sock->impl_opts.recv_buf_size = sz;
389 
390 	return 0;
391 }
392 
393 static int
394 uring_sock_set_sendbuf(struct spdk_sock *_sock, int sz)
395 {
396 	struct spdk_uring_sock *sock = __uring_sock(_sock);
397 	int min_size;
398 	int rc;
399 
400 	assert(sock != NULL);
401 
402 	/* Set kernel buffer size to be at least MIN_SO_SNDBUF_SIZE and
403 	 * g_spdk_uring_sock_impl_opts.send_buf_size. */
404 	min_size = spdk_max(MIN_SO_SNDBUF_SIZE, g_spdk_uring_sock_impl_opts.send_buf_size);
405 
406 	if (sz < min_size) {
407 		sz = min_size;
408 	}
409 
410 	rc = setsockopt(sock->fd, SOL_SOCKET, SO_SNDBUF, &sz, sizeof(sz));
411 	if (rc < 0) {
412 		return rc;
413 	}
414 
415 	_sock->impl_opts.send_buf_size = sz;
416 
417 	return 0;
418 }
419 
420 static struct spdk_uring_sock *
421 uring_sock_alloc(int fd, struct spdk_sock_impl_opts *impl_opts, bool enable_zero_copy)
422 {
423 	struct spdk_uring_sock *sock;
424 #if defined(__linux__)
425 	int flag;
426 	int rc;
427 #endif
428 
429 	sock = calloc(1, sizeof(*sock));
430 	if (sock == NULL) {
431 		SPDK_ERRLOG("sock allocation failed\n");
432 		return NULL;
433 	}
434 
435 	sock->fd = fd;
436 	memcpy(&sock->base.impl_opts, impl_opts, sizeof(*impl_opts));
437 
438 	STAILQ_INIT(&sock->recv_stream);
439 
440 #if defined(__linux__)
441 	flag = 1;
442 
443 	if (sock->base.impl_opts.enable_quickack) {
444 		rc = setsockopt(sock->fd, IPPROTO_TCP, TCP_QUICKACK, &flag, sizeof(flag));
445 		if (rc != 0) {
446 			SPDK_ERRLOG("Failed to set TCP_QUICKACK\n");
447 		}
448 	}
449 
450 	spdk_sock_get_placement_id(sock->fd, sock->base.impl_opts.enable_placement_id,
451 				   &sock->placement_id);
452 #ifdef SPDK_ZEROCOPY
453 	/* Try to turn on zero copy sends */
454 	flag = 1;
455 
456 	if (enable_zero_copy) {
457 		rc = setsockopt(sock->fd, SOL_SOCKET, SO_ZEROCOPY, &flag, sizeof(flag));
458 		if (rc == 0) {
459 			sock->zcopy = true;
460 			sock->zcopy_send_flags = MSG_ZEROCOPY;
461 		}
462 	}
463 #endif
464 #endif
465 
466 	return sock;
467 }
468 
469 static struct spdk_sock *
470 uring_sock_create(const char *ip, int port,
471 		  enum uring_sock_create_type type,
472 		  struct spdk_sock_opts *opts)
473 {
474 	struct spdk_uring_sock *sock;
475 	struct spdk_sock_impl_opts impl_opts;
476 	char buf[MAX_TMPBUF];
477 	char portnum[PORTNUMLEN];
478 	char *p;
479 	const char *src_addr;
480 	uint16_t src_port;
481 	struct addrinfo hints, *res, *res0, *src_ai;
482 	int fd, flag;
483 	int val = 1;
484 	int rc;
485 	bool enable_zcopy_impl_opts = false;
486 	bool enable_zcopy_user_opts = true;
487 
488 	assert(opts != NULL);
489 	uring_opts_get_impl_opts(opts, &impl_opts);
490 
491 	if (ip == NULL) {
492 		return NULL;
493 	}
494 	if (ip[0] == '[') {
495 		snprintf(buf, sizeof(buf), "%s", ip + 1);
496 		p = strchr(buf, ']');
497 		if (p != NULL) {
498 			*p = '\0';
499 		}
500 		ip = (const char *) &buf[0];
501 	}
502 
503 	snprintf(portnum, sizeof portnum, "%d", port);
504 	memset(&hints, 0, sizeof hints);
505 	hints.ai_family = PF_UNSPEC;
506 	hints.ai_socktype = SOCK_STREAM;
507 	hints.ai_flags = AI_NUMERICSERV;
508 	hints.ai_flags |= AI_PASSIVE;
509 	hints.ai_flags |= AI_NUMERICHOST;
510 	rc = getaddrinfo(ip, portnum, &hints, &res0);
511 	if (rc != 0) {
512 		SPDK_ERRLOG("getaddrinfo() failed %s (%d)\n", gai_strerror(rc), rc);
513 		return NULL;
514 	}
515 
516 	/* try listen */
517 	fd = -1;
518 	for (res = res0; res != NULL; res = res->ai_next) {
519 retry:
520 		fd = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
521 		if (fd < 0) {
522 			/* error */
523 			continue;
524 		}
525 
526 		val = impl_opts.recv_buf_size;
527 		rc = setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &val, sizeof val);
528 		if (rc) {
529 			/* Not fatal */
530 		}
531 
532 		val = impl_opts.send_buf_size;
533 		rc = setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &val, sizeof val);
534 		if (rc) {
535 			/* Not fatal */
536 		}
537 
538 		rc = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof val);
539 		if (rc != 0) {
540 			close(fd);
541 			fd = -1;
542 			/* error */
543 			continue;
544 		}
545 		rc = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof val);
546 		if (rc != 0) {
547 			close(fd);
548 			fd = -1;
549 			/* error */
550 			continue;
551 		}
552 
553 		if (opts->ack_timeout) {
554 #if defined(__linux__)
555 			val = opts->ack_timeout;
556 			rc = setsockopt(fd, IPPROTO_TCP, TCP_USER_TIMEOUT, &val, sizeof val);
557 			if (rc != 0) {
558 				close(fd);
559 				fd = -1;
560 				/* error */
561 				continue;
562 			}
563 #else
564 			SPDK_WARNLOG("TCP_USER_TIMEOUT is not supported.\n");
565 #endif
566 		}
567 
568 
569 
570 #if defined(SO_PRIORITY)
571 		if (opts != NULL && opts->priority) {
572 			rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &opts->priority, sizeof val);
573 			if (rc != 0) {
574 				close(fd);
575 				fd = -1;
576 				/* error */
577 				continue;
578 			}
579 		}
580 #endif
581 		if (res->ai_family == AF_INET6) {
582 			rc = setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &val, sizeof val);
583 			if (rc != 0) {
584 				close(fd);
585 				fd = -1;
586 				/* error */
587 				continue;
588 			}
589 		}
590 
591 		if (type == SPDK_SOCK_CREATE_LISTEN) {
592 			rc = bind(fd, res->ai_addr, res->ai_addrlen);
593 			if (rc != 0) {
594 				SPDK_ERRLOG("bind() failed at port %d, errno = %d\n", port, errno);
595 				switch (errno) {
596 				case EINTR:
597 					/* interrupted? */
598 					close(fd);
599 					goto retry;
600 				case EADDRNOTAVAIL:
601 					SPDK_ERRLOG("IP address %s not available. "
602 						    "Verify IP address in config file "
603 						    "and make sure setup script is "
604 						    "run before starting spdk app.\n", ip);
605 				/* FALLTHROUGH */
606 				default:
607 					/* try next family */
608 					close(fd);
609 					fd = -1;
610 					continue;
611 				}
612 			}
613 			/* bind OK */
614 			rc = listen(fd, 512);
615 			if (rc != 0) {
616 				SPDK_ERRLOG("listen() failed, errno = %d\n", errno);
617 				close(fd);
618 				fd = -1;
619 				break;
620 			}
621 
622 			flag = fcntl(fd, F_GETFL);
623 			if (fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0) {
624 				SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno);
625 				close(fd);
626 				fd = -1;
627 				break;
628 			}
629 
630 			enable_zcopy_impl_opts = impl_opts.enable_zerocopy_send_server;
631 		} else if (type == SPDK_SOCK_CREATE_CONNECT) {
632 			src_addr = SPDK_GET_FIELD(opts, src_addr, NULL, opts->opts_size);
633 			src_port = SPDK_GET_FIELD(opts, src_port, 0, opts->opts_size);
634 			if (src_addr != NULL || src_port != 0) {
635 				snprintf(portnum, sizeof(portnum), "%"PRIu16, src_port);
636 				memset(&hints, 0, sizeof hints);
637 				hints.ai_family = AF_UNSPEC;
638 				hints.ai_socktype = SOCK_STREAM;
639 				hints.ai_flags = AI_NUMERICSERV | AI_NUMERICHOST | AI_PASSIVE;
640 				rc = getaddrinfo(src_addr, src_port > 0 ? portnum : NULL,
641 						 &hints, &src_ai);
642 				if (rc != 0 || src_ai == NULL) {
643 					SPDK_ERRLOG("getaddrinfo() failed %s (%d)\n",
644 						    rc != 0 ? gai_strerror(rc) : "", rc);
645 					close(fd);
646 					fd = -1;
647 					break;
648 				}
649 				rc = bind(fd, src_ai->ai_addr, src_ai->ai_addrlen);
650 				if (rc != 0) {
651 					SPDK_ERRLOG("bind() failed errno %d (%s:%s)\n", errno,
652 						    src_addr ? src_addr : "", portnum);
653 					close(fd);
654 					fd = -1;
655 					freeaddrinfo(src_ai);
656 					src_ai = NULL;
657 					break;
658 				}
659 				freeaddrinfo(src_ai);
660 				src_ai = NULL;
661 			}
662 
663 			rc = connect(fd, res->ai_addr, res->ai_addrlen);
664 			if (rc != 0) {
665 				SPDK_ERRLOG("connect() failed, errno = %d\n", errno);
666 				/* try next family */
667 				close(fd);
668 				fd = -1;
669 				continue;
670 			}
671 
672 			flag = fcntl(fd, F_GETFL);
673 			if (fcntl(fd, F_SETFL, flag & ~O_NONBLOCK) < 0) {
674 				SPDK_ERRLOG("fcntl can't set blocking mode for socket, fd: %d (%d)\n", fd, errno);
675 				close(fd);
676 				fd = -1;
677 				break;
678 			}
679 
680 			enable_zcopy_impl_opts = impl_opts.enable_zerocopy_send_client;
681 		}
682 		break;
683 	}
684 	freeaddrinfo(res0);
685 
686 	if (fd < 0) {
687 		return NULL;
688 	}
689 
690 	enable_zcopy_user_opts = opts->zcopy && !spdk_net_is_loopback(fd);
691 	sock = uring_sock_alloc(fd, &impl_opts, enable_zcopy_user_opts && enable_zcopy_impl_opts);
692 	if (sock == NULL) {
693 		SPDK_ERRLOG("sock allocation failed\n");
694 		close(fd);
695 		return NULL;
696 	}
697 
698 	return &sock->base;
699 }
700 
701 static struct spdk_sock *
702 uring_sock_listen(const char *ip, int port, struct spdk_sock_opts *opts)
703 {
704 	if (spdk_interrupt_mode_is_enabled()) {
705 		SPDK_ERRLOG("Interrupt mode is not supported in the uring sock implementation.\n");
706 		return NULL;
707 	}
708 
709 	return uring_sock_create(ip, port, SPDK_SOCK_CREATE_LISTEN, opts);
710 }
711 
712 static struct spdk_sock *
713 uring_sock_connect(const char *ip, int port, struct spdk_sock_opts *opts)
714 {
715 	if (spdk_interrupt_mode_is_enabled()) {
716 		SPDK_ERRLOG("Interrupt mode is not supported in the uring sock implementation.\n");
717 		return NULL;
718 	}
719 
720 	return uring_sock_create(ip, port, SPDK_SOCK_CREATE_CONNECT, opts);
721 }
722 
723 static struct spdk_sock *
724 uring_sock_accept(struct spdk_sock *_sock)
725 {
726 	struct spdk_uring_sock		*sock = __uring_sock(_sock);
727 	struct sockaddr_storage		sa;
728 	socklen_t			salen;
729 	int				rc, fd;
730 	struct spdk_uring_sock		*new_sock;
731 	int				flag;
732 
733 	memset(&sa, 0, sizeof(sa));
734 	salen = sizeof(sa);
735 
736 	assert(sock != NULL);
737 
738 	rc = accept(sock->fd, (struct sockaddr *)&sa, &salen);
739 
740 	if (rc == -1) {
741 		return NULL;
742 	}
743 
744 	fd = rc;
745 
746 	flag = fcntl(fd, F_GETFL);
747 	if ((flag & O_NONBLOCK) && (fcntl(fd, F_SETFL, flag & ~O_NONBLOCK) < 0)) {
748 		SPDK_ERRLOG("fcntl can't set blocking mode for socket, fd: %d (%d)\n", fd, errno);
749 		close(fd);
750 		return NULL;
751 	}
752 
753 #if defined(SO_PRIORITY)
754 	/* The priority is not inherited, so call this function again */
755 	if (sock->base.opts.priority) {
756 		rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &sock->base.opts.priority, sizeof(int));
757 		if (rc != 0) {
758 			close(fd);
759 			return NULL;
760 		}
761 	}
762 #endif
763 
764 	new_sock = uring_sock_alloc(fd, &sock->base.impl_opts, sock->zcopy);
765 	if (new_sock == NULL) {
766 		close(fd);
767 		return NULL;
768 	}
769 
770 	return &new_sock->base;
771 }
772 
773 static int
774 uring_sock_close(struct spdk_sock *_sock)
775 {
776 	struct spdk_uring_sock *sock = __uring_sock(_sock);
777 
778 	assert(TAILQ_EMPTY(&_sock->pending_reqs));
779 	assert(sock->group == NULL);
780 
781 	/* If the socket fails to close, the best choice is to
782 	 * leak the fd but continue to free the rest of the sock
783 	 * memory. */
784 	close(sock->fd);
785 
786 	spdk_pipe_destroy(sock->recv_pipe);
787 	free(sock->recv_buf);
788 	free(sock);
789 
790 	return 0;
791 }
792 
793 static ssize_t
794 uring_sock_recv_from_pipe(struct spdk_uring_sock *sock, struct iovec *diov, int diovcnt)
795 {
796 	struct iovec siov[2];
797 	int sbytes;
798 	ssize_t bytes;
799 	struct spdk_uring_sock_group_impl *group;
800 
801 	sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov);
802 	if (sbytes < 0) {
803 		errno = EINVAL;
804 		return -1;
805 	} else if (sbytes == 0) {
806 		errno = EAGAIN;
807 		return -1;
808 	}
809 
810 	bytes = spdk_iovcpy(siov, 2, diov, diovcnt);
811 
812 	if (bytes == 0) {
813 		/* The only way this happens is if diov is 0 length */
814 		errno = EINVAL;
815 		return -1;
816 	}
817 
818 	spdk_pipe_reader_advance(sock->recv_pipe, bytes);
819 
820 	/* If we drained the pipe, take it off the level-triggered list */
821 	if (sock->base.group_impl && spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) {
822 		group = __uring_group_impl(sock->base.group_impl);
823 		TAILQ_REMOVE(&group->pending_recv, sock, link);
824 		sock->pending_recv = false;
825 	}
826 
827 	return bytes;
828 }
829 
830 static inline ssize_t
831 sock_readv(int fd, struct iovec *iov, int iovcnt)
832 {
833 	struct msghdr msg = {
834 		.msg_iov = iov,
835 		.msg_iovlen = iovcnt,
836 	};
837 
838 	return recvmsg(fd, &msg, MSG_DONTWAIT);
839 }
840 
841 static inline ssize_t
842 uring_sock_read(struct spdk_uring_sock *sock)
843 {
844 	struct iovec iov[2];
845 	int bytes;
846 	struct spdk_uring_sock_group_impl *group;
847 
848 	bytes = spdk_pipe_writer_get_buffer(sock->recv_pipe, sock->recv_buf_sz, iov);
849 
850 	if (bytes > 0) {
851 		bytes = sock_readv(sock->fd, iov, 2);
852 		if (bytes > 0) {
853 			spdk_pipe_writer_advance(sock->recv_pipe, bytes);
854 			if (sock->base.group_impl && !sock->pending_recv) {
855 				group = __uring_group_impl(sock->base.group_impl);
856 				TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
857 				sock->pending_recv = true;
858 			}
859 		}
860 	}
861 
862 	return bytes;
863 }
864 
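/* Hand the caller a pointer directly into the next completed group buffer instead
 * of copying the data out. Only supported when the recv_pipe is disabled and the
 * socket belongs to a polling group that posts buffers. */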
865 static int
866 uring_sock_recv_next(struct spdk_sock *_sock, void **_buf, void **ctx)
867 {
868 	struct spdk_uring_sock *sock = __uring_sock(_sock);
869 	struct spdk_uring_sock_group_impl *group;
870 	struct spdk_uring_buf_tracker *tr;
871 
872 	if (sock->connection_status < 0) {
873 		errno = -sock->connection_status;
874 		return -1;
875 	}
876 
877 	if (sock->recv_pipe != NULL) {
878 		errno = ENOTSUP;
879 		return -1;
880 	}
881 
882 	group = __uring_group_impl(_sock->group_impl);
883 
884 	tr = STAILQ_FIRST(&sock->recv_stream);
885 	if (tr == NULL) {
886 		if (sock->group->buf_ring_count > 0) {
887 			/* There are buffers posted, but data hasn't arrived. */
888 			errno = EAGAIN;
889 		} else {
890 			/* There are no buffers posted, so this won't ever
891 			 * make forward progress. */
892 			errno = ENOBUFS;
893 		}
894 		return -1;
895 	}
896 	assert(sock->pending_recv == true);
897 	assert(tr->buf != NULL);
898 
899 	*_buf = tr->buf + sock->recv_offset;
900 	*ctx = tr->ctx;
901 
902 	STAILQ_REMOVE_HEAD(&sock->recv_stream, link);
903 	STAILQ_INSERT_HEAD(&group->free_trackers, tr, link);
904 
905 	if (STAILQ_EMPTY(&sock->recv_stream)) {
906 		sock->pending_recv = false;
907 		TAILQ_REMOVE(&group->pending_recv, sock, link);
908 	}
909 
910 	return tr->len - sock->recv_offset;
911 }
912 
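/* Receive path used when the recv_pipe is disabled: copy data out of the group
 * buffers that completed through the provided-buffer ring, recycling each buffer
 * back to the group as it is drained. Falls back to a plain readv() when the
 * socket is not in a group or no buffers have been posted. */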
913 static ssize_t
914 uring_sock_readv_no_pipe(struct spdk_sock *_sock, struct iovec *iovs, int iovcnt)
915 {
916 	struct spdk_uring_sock *sock = __uring_sock(_sock);
917 	struct spdk_uring_buf_tracker *tr;
918 	struct iovec iov;
919 	ssize_t total, len;
920 	int i;
921 
922 	if (sock->connection_status < 0) {
923 		errno = -sock->connection_status;
924 		return -1;
925 	}
926 
927 	if (_sock->group_impl == NULL) {
928 		/* If not in a group just read from the socket the regular way. */
929 		return sock_readv(sock->fd, iovs, iovcnt);
930 	}
931 
932 	if (STAILQ_EMPTY(&sock->recv_stream)) {
933 		if (sock->group->buf_ring_count == 0) {
934 			/* If the user hasn't posted any buffers, read from the socket
935 			 * directly. */
936 
937 			if (sock->pending_recv) {
938 				sock->pending_recv = false;
939 				TAILQ_REMOVE(&(__uring_group_impl(_sock->group_impl))->pending_recv, sock, link);
940 			}
941 
942 			return sock_readv(sock->fd, iovs, iovcnt);
943 		}
944 
945 		errno = EAGAIN;
946 		return -1;
947 	}
948 
949 	total = 0;
950 	for (i = 0; i < iovcnt; i++) {
951 		/* Copy to stack so we can change it */
952 		iov = iovs[i];
953 
954 		tr = STAILQ_FIRST(&sock->recv_stream);
955 		while (tr != NULL) {
956 			len = spdk_min(iov.iov_len, tr->len - sock->recv_offset);
957 			memcpy(iov.iov_base, tr->buf + sock->recv_offset, len);
958 
959 			total += len;
960 			sock->recv_offset += len;
961 			iov.iov_base += len;
962 			iov.iov_len -= len;
963 
964 			if (sock->recv_offset == tr->len) {
965 				sock->recv_offset = 0;
966 				STAILQ_REMOVE_HEAD(&sock->recv_stream, link);
967 				STAILQ_INSERT_HEAD(&sock->group->free_trackers, tr, link);
968 				spdk_sock_group_provide_buf(sock->group->base.group, tr->buf, tr->buflen, tr->ctx);
969 				tr = STAILQ_FIRST(&sock->recv_stream);
970 			}
971 
972 			if (iov.iov_len == 0) {
973 				break;
974 			}
975 		}
976 	}
977 
978 	if (STAILQ_EMPTY(&sock->recv_stream)) {
979 		struct spdk_uring_sock_group_impl *group;
980 
981 		group = __uring_group_impl(_sock->group_impl);
982 		sock->pending_recv = false;
983 		TAILQ_REMOVE(&group->pending_recv, sock, link);
984 	}
985 
986 	assert(total > 0);
987 	return total;
988 }
989 
990 static ssize_t
991 uring_sock_readv(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
992 {
993 	struct spdk_uring_sock *sock = __uring_sock(_sock);
994 	int rc, i;
995 	size_t len;
996 
997 	if (sock->connection_status < 0) {
998 		errno = -sock->connection_status;
999 		return -1;
1000 	}
1001 
1002 	if (sock->recv_pipe == NULL) {
1003 		return uring_sock_readv_no_pipe(_sock, iov, iovcnt);
1004 	}
1005 
1006 	len = 0;
1007 	for (i = 0; i < iovcnt; i++) {
1008 		len += iov[i].iov_len;
1009 	}
1010 
1011 	if (spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) {
1012 		/* If the user is receiving a sufficiently large amount of data,
1013 		 * receive directly to their buffers. */
1014 		if (len >= MIN_SOCK_PIPE_SIZE) {
1015 			return sock_readv(sock->fd, iov, iovcnt);
1016 		}
1017 
1018 		/* Otherwise, do a big read into our pipe */
1019 		rc = uring_sock_read(sock);
1020 		if (rc <= 0) {
1021 			return rc;
1022 		}
1023 	}
1024 
1025 	return uring_sock_recv_from_pipe(sock, iov, iovcnt);
1026 }
1027 
1028 static ssize_t
1029 uring_sock_recv(struct spdk_sock *sock, void *buf, size_t len)
1030 {
1031 	struct iovec iov[1];
1032 
1033 	iov[0].iov_base = buf;
1034 	iov[0].iov_len = len;
1035 
1036 	return uring_sock_readv(sock, iov, 1);
1037 }
1038 
1039 static ssize_t
1040 uring_sock_writev(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
1041 {
1042 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1043 	struct msghdr msg = {
1044 		.msg_iov = iov,
1045 		.msg_iovlen = iovcnt,
1046 	};
1047 
1048 	if (sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
1049 		errno = EAGAIN;
1050 		return -1;
1051 	}
1052 
1053 	return sendmsg(sock->fd, &msg, MSG_DONTWAIT);
1054 }
1055 
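/* Advance the request's internal offset by up to 'rc' sent bytes. Returns the
 * number of bytes left over once the request is fully consumed, or -1 if the
 * request was only partially sent. */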
1056 static ssize_t
1057 sock_request_advance_offset(struct spdk_sock_request *req, ssize_t rc)
1058 {
1059 	unsigned int offset;
1060 	size_t len;
1061 	int i;
1062 
1063 	offset = req->internal.offset;
1064 	for (i = 0; i < req->iovcnt; i++) {
1065 		/* Advance by the offset first */
1066 		if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) {
1067 			offset -= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len;
1068 			continue;
1069 		}
1070 
1071 		/* Calculate the remaining length of this element */
1072 		len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset;
1073 
1074 		if (len > (size_t)rc) {
1075 			req->internal.offset += rc;
1076 			return -1;
1077 		}
1078 
1079 		offset = 0;
1080 		req->internal.offset += len;
1081 		rc -= len;
1082 	}
1083 
1084 	return rc;
1085 }
1086 
1087 static int
1088 sock_complete_write_reqs(struct spdk_sock *_sock, ssize_t rc, bool is_zcopy)
1089 {
1090 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1091 	struct spdk_sock_request *req;
1092 	int retval;
1093 
1094 	if (is_zcopy) {
1095 		/* Handle the overflow case: we use sock->sendmsg_idx - 1 for
1096 		 * req->internal.offset, so sendmsg_idx must never be zero. */
1097 		if (spdk_unlikely(sock->sendmsg_idx == UINT32_MAX)) {
1098 			sock->sendmsg_idx = 1;
1099 		} else {
1100 			sock->sendmsg_idx++;
1101 		}
1102 	}
1103 
1104 	/* Consume the requests that were actually written */
1105 	req = TAILQ_FIRST(&_sock->queued_reqs);
1106 	while (req) {
1107 		/* req->internal.is_zcopy is true when the whole req or part of it is sent with zerocopy */
1108 		req->internal.is_zcopy = is_zcopy;
1109 
1110 		rc = sock_request_advance_offset(req, rc);
1111 		if (rc < 0) {
1112 			/* This element was partially sent. */
1113 			return 0;
1114 		}
1115 
1116 		/* Handled a full request. */
1117 		spdk_sock_request_pend(_sock, req);
1118 
1119 		if (!req->internal.is_zcopy && req == TAILQ_FIRST(&_sock->pending_reqs)) {
1120 			retval = spdk_sock_request_put(_sock, req, 0);
1121 			if (retval) {
1122 				return retval;
1123 			}
1124 		} else {
1125 			/* Re-use the offset field to hold the sendmsg call index. The
1126 			 * index is 0 based, so subtract one here because we've already
1127 			 * incremented above. */
1128 			req->internal.offset = sock->sendmsg_idx - 1;
1129 		}
1130 
1131 		if (rc == 0) {
1132 			break;
1133 		}
1134 
1135 		req = TAILQ_FIRST(&_sock->queued_reqs);
1136 	}
1137 
1138 	return 0;
1139 }
1140 
1141 #ifdef SPDK_ZEROCOPY
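/* Reap MSG_ZEROCOPY completions from the socket error queue. The kernel reports
 * a range of completed sendmsg calls in ee_info..ee_data; each pending request
 * was stamped with its sendmsg index in sock_complete_write_reqs, so matching
 * requests can now be released back to the user. */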
1142 static int
1143 _sock_check_zcopy(struct spdk_sock *_sock, int status)
1144 {
1145 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1146 	ssize_t rc;
1147 	struct sock_extended_err *serr;
1148 	struct cmsghdr *cm;
1149 	uint32_t idx;
1150 	struct spdk_sock_request *req, *treq;
1151 	bool found;
1152 
1153 	assert(sock->zcopy == true);
1154 	if (spdk_unlikely(status < 0)) {
1155 		if (!TAILQ_EMPTY(&_sock->pending_reqs)) {
1156 			SPDK_ERRLOG("Attempting to receive from ERRQUEUE yielded error, but pending list still has orphaned entries, status =%d\n",
1157 				    status);
1158 		} else {
1159 			SPDK_WARNLOG("Recvmsg yielded an error!\n");
1160 		}
1161 		return 0;
1162 	}
1163 
1164 	cm = CMSG_FIRSTHDR(&sock->errqueue_task.msg);
1165 	if (!((cm->cmsg_level == SOL_IP && cm->cmsg_type == IP_RECVERR) ||
1166 	      (cm->cmsg_level == SOL_IPV6 && cm->cmsg_type == IPV6_RECVERR))) {
1167 		SPDK_WARNLOG("Unexpected cmsg level or type!\n");
1168 		return 0;
1169 	}
1170 
1171 	serr = (struct sock_extended_err *)CMSG_DATA(cm);
1172 	if (serr->ee_errno != 0 || serr->ee_origin != SO_EE_ORIGIN_ZEROCOPY) {
1173 		SPDK_WARNLOG("Unexpected extended error origin\n");
1174 		return 0;
1175 	}
1176 
1177 	/* Most of the time, the pending_reqs array is in the exact
1178 	 * order we need such that all of the requests to complete are
1179 	 * in order, in the front. It is guaranteed that all requests
1180 	 * belonging to the same sendmsg call are sequential, so once
1181 	 * we encounter one match we can stop looping as soon as a
1182 	 * non-match is found.
1183 	 */
1184 	for (idx = serr->ee_info; idx <= serr->ee_data; idx++) {
1185 		found = false;
1186 		TAILQ_FOREACH_SAFE(req, &_sock->pending_reqs, internal.link, treq) {
1187 			if (!req->internal.is_zcopy) {
1188 				/* This wasn't a zcopy request. It was just waiting in line to complete */
1189 				rc = spdk_sock_request_put(_sock, req, 0);
1190 				if (rc < 0) {
1191 					return rc;
1192 				}
1193 			} else if (req->internal.offset == idx) {
1194 				found = true;
1195 				rc = spdk_sock_request_put(_sock, req, 0);
1196 				if (rc < 0) {
1197 					return rc;
1198 				}
1199 			} else if (found) {
1200 				break;
1201 			}
1202 		}
1203 	}
1204 
1205 	return 0;
1206 }
1207 
1208 static void
1209 _sock_prep_errqueue(struct spdk_sock *_sock)
1210 {
1211 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1212 	struct spdk_uring_task *task = &sock->errqueue_task;
1213 	struct io_uring_sqe *sqe;
1214 
1215 	if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS) {
1216 		return;
1217 	}
1218 
1219 	if (sock->pending_group_remove) {
1220 		return;
1221 	}
1222 
1223 	assert(sock->group != NULL);
1224 	sock->group->io_queued++;
1225 
1226 	sqe = io_uring_get_sqe(&sock->group->uring);
1227 	io_uring_prep_recvmsg(sqe, sock->fd, &task->msg, MSG_ERRQUEUE);
1228 	io_uring_sqe_set_data(sqe, task);
1229 	task->status = SPDK_URING_SOCK_TASK_IN_PROCESS;
1230 }
1231 
1232 #endif
1233 
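/* Queue a single sendmsg SQE covering as many queued requests as fit into the
 * task's IOV_BATCH_SIZE iovecs. At most one write task per socket is in flight;
 * its completion is handled as URING_TASK_WRITE in sock_uring_group_reap(). */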
1234 static void
1235 _sock_flush(struct spdk_sock *_sock)
1236 {
1237 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1238 	struct spdk_uring_task *task = &sock->write_task;
1239 	uint32_t iovcnt;
1240 	struct io_uring_sqe *sqe;
1241 	int flags;
1242 
1243 	if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS) {
1244 		return;
1245 	}
1246 
1247 #ifdef SPDK_ZEROCOPY
1248 	if (sock->zcopy) {
1249 		flags = MSG_DONTWAIT | sock->zcopy_send_flags;
1250 	} else
1251 #endif
1252 	{
1253 		flags = MSG_DONTWAIT;
1254 	}
1255 
1256 	iovcnt = spdk_sock_prep_reqs(&sock->base, task->iovs, task->iov_cnt, &task->last_req, &flags);
1257 	if (!iovcnt) {
1258 		return;
1259 	}
1260 
1261 	task->iov_cnt = iovcnt;
1262 	assert(sock->group != NULL);
1263 	task->msg.msg_iov = task->iovs;
1264 	task->msg.msg_iovlen = task->iov_cnt;
1265 #ifdef SPDK_ZEROCOPY
1266 	task->is_zcopy = (flags & MSG_ZEROCOPY) ? true : false;
1267 #endif
1268 	sock->group->io_queued++;
1269 
1270 	sqe = io_uring_get_sqe(&sock->group->uring);
1271 	io_uring_prep_sendmsg(sqe, sock->fd, &sock->write_task.msg, flags);
1272 	io_uring_sqe_set_data(sqe, task);
1273 	task->status = SPDK_URING_SOCK_TASK_IN_PROCESS;
1274 }
1275 
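/* Queue a recv SQE with IOSQE_BUFFER_SELECT so the kernel picks a buffer from
 * our provided-buffer ring (group URING_BUF_GROUP_ID). The completion reports
 * the chosen buffer id in cqe->flags. */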
1276 static void
1277 _sock_prep_read(struct spdk_sock *_sock)
1278 {
1279 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1280 	struct spdk_uring_task *task = &sock->read_task;
1281 	struct io_uring_sqe *sqe;
1282 
1283 	/* Do not prepare read event */
1284 	if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS) {
1285 		return;
1286 	}
1287 
1288 	if (sock->pending_group_remove) {
1289 		return;
1290 	}
1291 
1292 	assert(sock->group != NULL);
1293 	sock->group->io_queued++;
1294 
1295 	sqe = io_uring_get_sqe(&sock->group->uring);
1296 	io_uring_prep_recv(sqe, sock->fd, NULL, URING_MAX_RECV_SIZE, 0);
1297 	sqe->buf_group = URING_BUF_GROUP_ID;
1298 	sqe->flags |= IOSQE_BUFFER_SELECT;
1299 	io_uring_sqe_set_data(sqe, task);
1300 	task->status = SPDK_URING_SOCK_TASK_IN_PROCESS;
1301 }
1302 
1303 static void
1304 _sock_prep_cancel_task(struct spdk_sock *_sock, void *user_data)
1305 {
1306 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1307 	struct spdk_uring_task *task = &sock->cancel_task;
1308 	struct io_uring_sqe *sqe;
1309 
1310 	if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS) {
1311 		return;
1312 	}
1313 
1314 	assert(sock->group != NULL);
1315 	sock->group->io_queued++;
1316 
1317 	sqe = io_uring_get_sqe(&sock->group->uring);
1318 	io_uring_prep_cancel(sqe, user_data, 0);
1319 	io_uring_sqe_set_data(sqe, task);
1320 	task->status = SPDK_URING_SOCK_TASK_IN_PROCESS;
1321 }
1322 
1323 static void
1324 uring_sock_fail(struct spdk_uring_sock *sock, int status)
1325 {
1326 	struct spdk_uring_sock_group_impl *group = sock->group;
1327 	int rc;
1328 
1329 	sock->connection_status = status;
1330 	rc = spdk_sock_abort_requests(&sock->base);
1331 
1332 	/* The user needs to be notified that this socket is dead. */
1333 	if (rc == 0 && sock->base.cb_fn != NULL &&
1334 	    sock->pending_recv == false) {
1335 		sock->pending_recv = true;
1336 		TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
1337 	}
1338 }
1339 
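/* Drain up to 'max' completions from the ring, re-arming read and errqueue polls
 * as needed, then fill 'socks' with up to 'max_read_events' sockets that have
 * pending receive data. The pending_recv list is rotated afterwards so sockets
 * are serviced fairly across polls. */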
1340 static int
1341 sock_uring_group_reap(struct spdk_uring_sock_group_impl *group, int max, int max_read_events,
1342 		      struct spdk_sock **socks)
1343 {
1344 	int i, count, ret;
1345 	struct io_uring_cqe *cqe;
1346 	struct spdk_uring_sock *sock, *tmp;
1347 	struct spdk_uring_task *task;
1348 	int status, bid, flags;
1349 	bool is_zcopy;
1350 
1351 	for (i = 0; i < max; i++) {
1352 		ret = io_uring_peek_cqe(&group->uring, &cqe);
1353 		if (ret != 0) {
1354 			break;
1355 		}
1356 
1357 		if (cqe == NULL) {
1358 			break;
1359 		}
1360 
1361 		task = (struct spdk_uring_task *)cqe->user_data;
1362 		assert(task != NULL);
1363 		sock = task->sock;
1364 		assert(sock != NULL);
1365 		assert(sock->group != NULL);
1366 		assert(sock->group == group);
1367 		sock->group->io_inflight--;
1368 		sock->group->io_avail++;
1369 		status = cqe->res;
1370 		flags = cqe->flags;
1371 		io_uring_cqe_seen(&group->uring, cqe);
1372 
1373 		task->status = SPDK_URING_SOCK_TASK_NOT_IN_USE;
1374 
1375 		switch (task->type) {
1376 		case URING_TASK_READ:
1377 			if (status == -EAGAIN || status == -EWOULDBLOCK) {
1378 				/* This likely shouldn't happen, but would indicate that the
1379 				 * kernel didn't have enough resources to queue a task internally. */
1380 				_sock_prep_read(&sock->base);
1381 			} else if (status == -ECANCELED) {
1382 				continue;
1383 			} else if (status == -ENOBUFS) {
1384 				/* There's data in the socket but the user hasn't provided any buffers.
1385 				 * We need to notify the user that the socket has data pending. */
1386 				if (sock->base.cb_fn != NULL &&
1387 				    sock->pending_recv == false) {
1388 					sock->pending_recv = true;
1389 					TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
1390 				}
1391 
1392 				_sock_prep_read(&sock->base);
1393 			} else if (spdk_unlikely(status <= 0)) {
1394 				uring_sock_fail(sock, status < 0 ? status : -ECONNRESET);
1395 			} else {
1396 				struct spdk_uring_buf_tracker *tracker;
1397 
1398 				assert((flags & IORING_CQE_F_BUFFER) != 0);
1399 
1400 				bid = flags >> IORING_CQE_BUFFER_SHIFT;
1401 				tracker = &group->trackers[bid];
1402 
1403 				assert(tracker->buf != NULL);
1404 				assert(tracker->len != 0);
1405 
1406 				/* Append this data to the stream */
1407 				tracker->len = status;
1408 				STAILQ_INSERT_TAIL(&sock->recv_stream, tracker, link);
1409 				assert(group->buf_ring_count > 0);
1410 				group->buf_ring_count--;
1411 
1412 				if (sock->base.cb_fn != NULL &&
1413 				    sock->pending_recv == false) {
1414 					sock->pending_recv = true;
1415 					TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
1416 				}
1417 
1418 				_sock_prep_read(&sock->base);
1419 			}
1420 			break;
1421 		case URING_TASK_WRITE:
1422 			if (status == -EAGAIN || status == -EWOULDBLOCK ||
1423 			    (status == -ENOBUFS && sock->zcopy) ||
1424 			    status == -ECANCELED) {
1425 				continue;
1426 			} else if (spdk_unlikely(status < 0)) {
1427 				uring_sock_fail(sock, status);
1428 			} else {
1429 				task->last_req = NULL;
1430 				task->iov_cnt = 0;
1431 				is_zcopy = task->is_zcopy;
1432 				task->is_zcopy = false;
1433 				sock_complete_write_reqs(&sock->base, status, is_zcopy);
1434 			}
1435 
1436 			break;
1437 #ifdef SPDK_ZEROCOPY
1438 		case URING_TASK_ERRQUEUE:
1439 			if (status == -EAGAIN || status == -EWOULDBLOCK) {
1440 				_sock_prep_errqueue(&sock->base);
1441 			} else if (status == -ECANCELED) {
1442 				continue;
1443 			} else if (spdk_unlikely(status < 0)) {
1444 				uring_sock_fail(sock, status);
1445 			} else {
1446 				_sock_check_zcopy(&sock->base, status);
1447 				_sock_prep_errqueue(&sock->base);
1448 			}
1449 			break;
1450 #endif
1451 		case URING_TASK_CANCEL:
1452 			/* Do nothing */
1453 			break;
1454 		default:
1455 			SPDK_UNREACHABLE();
1456 		}
1457 	}
1458 
1459 	if (!socks) {
1460 		return 0;
1461 	}
1462 	count = 0;
1463 	TAILQ_FOREACH_SAFE(sock, &group->pending_recv, link, tmp) {
1464 		if (count == max_read_events) {
1465 			break;
1466 		}
1467 
1468 		/* If the socket's cb_fn is NULL, do not add it to socks array */
1469 		if (spdk_unlikely(sock->base.cb_fn == NULL)) {
1470 			assert(sock->pending_recv == true);
1471 			sock->pending_recv = false;
1472 			TAILQ_REMOVE(&group->pending_recv, sock, link);
1473 			continue;
1474 		}
1475 
1476 		socks[count++] = &sock->base;
1477 	}
1478 
1479 
1480 	/* Cycle the pending_recv list so that each time we poll things aren't
1481 	 * in the same order. Say we have 6 sockets in the list, named as follows:
1482 	 * A B C D E F
1483 	 * And all 6 sockets had poll events, but max_read_events is only 3. That means
1484 	 * 'sock' currently points at D. We want to rearrange the list to the following:
1485 	 * D E F A B C
1486 	 *
1487 	 * The variables below are named according to this example to make it easier to
1488 	 * follow the swaps.
1489 	 */
1490 	if (sock != NULL) {
1491 		struct spdk_uring_sock *ua, *uc, *ud, *uf;
1492 
1493 		/* Capture pointers to the elements we need */
1494 		ud = sock;
1495 
1496 		ua = TAILQ_FIRST(&group->pending_recv);
1497 		if (ua == ud) {
1498 			goto end;
1499 		}
1500 
1501 		uf = TAILQ_LAST(&group->pending_recv, pending_recv_list);
1502 		if (uf == ud) {
1503 			TAILQ_REMOVE(&group->pending_recv, ud, link);
1504 			TAILQ_INSERT_HEAD(&group->pending_recv, ud, link);
1505 			goto end;
1506 		}
1507 
1508 		uc = TAILQ_PREV(ud, pending_recv_list, link);
1509 		assert(uc != NULL);
1510 
1511 		/* Break the link between C and D */
1512 		uc->link.tqe_next = NULL;
1513 
1514 		/* Connect F to A */
1515 		uf->link.tqe_next = ua;
1516 		ua->link.tqe_prev = &uf->link.tqe_next;
1517 
1518 		/* Fix up the list first/last pointers */
1519 		group->pending_recv.tqh_first = ud;
1520 		group->pending_recv.tqh_last = &uc->link.tqe_next;
1521 
1522 		/* D is in front of the list, make tqe prev pointer point to the head of list */
1523 		ud->link.tqe_prev = &group->pending_recv.tqh_first;
1524 	}
1525 
1526 end:
1527 	return count;
1528 }
1529 
1530 static int uring_sock_flush(struct spdk_sock *_sock);
1531 
1532 static void
1533 uring_sock_writev_async(struct spdk_sock *_sock, struct spdk_sock_request *req)
1534 {
1535 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1536 	int rc;
1537 
1538 	if (spdk_unlikely(sock->connection_status)) {
1539 		req->cb_fn(req->cb_arg, sock->connection_status);
1540 		return;
1541 	}
1542 
1543 	spdk_sock_request_queue(_sock, req);
1544 
1545 	if (!sock->group) {
1546 		if (_sock->queued_iovcnt >= IOV_BATCH_SIZE) {
1547 			rc = uring_sock_flush(_sock);
1548 			if (rc < 0 && errno != EAGAIN) {
1549 				spdk_sock_abort_requests(_sock);
1550 			}
1551 		}
1552 	}
1553 }
1554 
1555 static int
1556 uring_sock_set_recvlowat(struct spdk_sock *_sock, int nbytes)
1557 {
1558 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1559 	int val;
1560 	int rc;
1561 
1562 	assert(sock != NULL);
1563 
1564 	val = nbytes;
1565 	rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVLOWAT, &val, sizeof val);
1566 	if (rc != 0) {
1567 		return -1;
1568 	}
1569 	return 0;
1570 }
1571 
1572 static bool
1573 uring_sock_is_ipv6(struct spdk_sock *_sock)
1574 {
1575 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1576 	struct sockaddr_storage sa;
1577 	socklen_t salen;
1578 	int rc;
1579 
1580 	assert(sock != NULL);
1581 
1582 	memset(&sa, 0, sizeof sa);
1583 	salen = sizeof sa;
1584 	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
1585 	if (rc != 0) {
1586 		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
1587 		return false;
1588 	}
1589 
1590 	return (sa.ss_family == AF_INET6);
1591 }
1592 
1593 static bool
1594 uring_sock_is_ipv4(struct spdk_sock *_sock)
1595 {
1596 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1597 	struct sockaddr_storage sa;
1598 	socklen_t salen;
1599 	int rc;
1600 
1601 	assert(sock != NULL);
1602 
1603 	memset(&sa, 0, sizeof sa);
1604 	salen = sizeof sa;
1605 	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
1606 	if (rc != 0) {
1607 		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
1608 		return false;
1609 	}
1610 
1611 	return (sa.ss_family == AF_INET);
1612 }
1613 
1614 static bool
1615 uring_sock_is_connected(struct spdk_sock *_sock)
1616 {
1617 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1618 	uint8_t byte;
1619 	int rc;
1620 
1621 	rc = recv(sock->fd, &byte, 1, MSG_PEEK | MSG_DONTWAIT);
1622 	if (rc == 0) {
1623 		return false;
1624 	}
1625 
1626 	if (rc < 0) {
1627 		if (errno == EAGAIN || errno == EWOULDBLOCK) {
1628 			return true;
1629 		}
1630 
1631 		return false;
1632 	}
1633 
1634 	return true;
1635 }
1636 
1637 static struct spdk_sock_group_impl *
1638 uring_sock_group_impl_get_optimal(struct spdk_sock *_sock, struct spdk_sock_group_impl *hint)
1639 {
1640 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1641 	struct spdk_sock_group_impl *group;
1642 
1643 	if (sock->placement_id != -1) {
1644 		spdk_sock_map_lookup(&g_map, sock->placement_id, &group, hint);
1645 		return group;
1646 	}
1647 
1648 	return NULL;
1649 }
1650 
1651 static int
1652 uring_sock_group_impl_buf_pool_free(struct spdk_uring_sock_group_impl *group_impl)
1653 {
1654 	if (group_impl->buf_ring) {
1655 		io_uring_unregister_buf_ring(&group_impl->uring, URING_BUF_GROUP_ID);
1656 		free(group_impl->buf_ring);
1657 	}
1658 
1659 	free(group_impl->trackers);
1660 
1661 	return 0;
1662 }
1663 
1664 static int
1665 uring_sock_group_impl_buf_pool_alloc(struct spdk_uring_sock_group_impl *group_impl)
1666 {
1667 	struct io_uring_buf_reg buf_reg = {};
1668 	struct io_uring_buf_ring *buf_ring;
1669 	int i, rc;
1670 
1671 	rc = posix_memalign((void **)&buf_ring, 0x1000, URING_BUF_POOL_SIZE * sizeof(struct io_uring_buf));
1672 	if (rc != 0) {
1673 		/* posix_memalign returns positive errno values */
1674 		return -rc;
1675 	}
1676 
1677 	buf_reg.ring_addr = (unsigned long long)buf_ring;
1678 	buf_reg.ring_entries = URING_BUF_POOL_SIZE;
1679 	buf_reg.bgid = URING_BUF_GROUP_ID;
1680 
1681 	rc = io_uring_register_buf_ring(&group_impl->uring, &buf_reg, 0);
1682 	if (rc != 0) {
1683 		free(buf_ring);
1684 		return rc;
1685 	}
1686 
1687 	group_impl->buf_ring = buf_ring;
1688 	io_uring_buf_ring_init(group_impl->buf_ring);
1689 	group_impl->buf_ring_count = 0;
1690 
1691 	group_impl->trackers = calloc(URING_BUF_POOL_SIZE, sizeof(struct spdk_uring_buf_tracker));
1692 	if (group_impl->trackers == NULL) {
1693 		uring_sock_group_impl_buf_pool_free(group_impl);
1694 		return -ENOMEM;
1695 	}
1696 
1697 	STAILQ_INIT(&group_impl->free_trackers);
1698 
1699 	for (i = 0; i < URING_BUF_POOL_SIZE; i++) {
1700 		struct spdk_uring_buf_tracker *tracker = &group_impl->trackers[i];
1701 
1702 		tracker->buf = NULL;
1703 		tracker->len = 0;
1704 		tracker->ctx = NULL;
1705 		tracker->id = i;
1706 
1707 		STAILQ_INSERT_TAIL(&group_impl->free_trackers, tracker, link);
1708 	}
1709 
1710 	return 0;
1711 }
1712 
1713 static struct spdk_sock_group_impl *
1714 uring_sock_group_impl_create(void)
1715 {
1716 	struct spdk_uring_sock_group_impl *group_impl;
1717 
1718 	group_impl = calloc(1, sizeof(*group_impl));
1719 	if (group_impl == NULL) {
1720 		SPDK_ERRLOG("group_impl allocation failed\n");
1721 		return NULL;
1722 	}
1723 
1724 	group_impl->io_avail = SPDK_SOCK_GROUP_QUEUE_DEPTH;
1725 
1726 	if (io_uring_queue_init(SPDK_SOCK_GROUP_QUEUE_DEPTH, &group_impl->uring, 0) < 0) {
1727 		SPDK_ERRLOG("uring I/O context setup failure\n");
1728 		free(group_impl);
1729 		return NULL;
1730 	}
1731 
1732 	TAILQ_INIT(&group_impl->pending_recv);
1733 
1734 	if (uring_sock_group_impl_buf_pool_alloc(group_impl) < 0) {
1735 		SPDK_ERRLOG("Failed to create buffer ring. "
1736 			    "uring sock implementation is likely not supported on this kernel.\n");
1737 		io_uring_queue_exit(&group_impl->uring);
1738 		free(group_impl);
1739 		return NULL;
1740 	}
1741 
1742 	if (g_spdk_uring_sock_impl_opts.enable_placement_id == PLACEMENT_CPU) {
1743 		spdk_sock_map_insert(&g_map, spdk_env_get_current_core(), &group_impl->base);
1744 	}
1745 
1746 	return &group_impl->base;
1747 }
1748 
1749 static int
1750 uring_sock_group_impl_add_sock(struct spdk_sock_group_impl *_group,
1751 			       struct spdk_sock *_sock)
1752 {
1753 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1754 	struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group);
1755 	int rc;
1756 
1757 	sock->group = group;
1758 	sock->write_task.sock = sock;
1759 	sock->write_task.type = URING_TASK_WRITE;
1760 
1761 	sock->read_task.sock = sock;
1762 	sock->read_task.type = URING_TASK_READ;
1763 
1764 	sock->errqueue_task.sock = sock;
1765 	sock->errqueue_task.type = URING_TASK_ERRQUEUE;
1766 	sock->errqueue_task.msg.msg_control = sock->buf;
1767 	sock->errqueue_task.msg.msg_controllen = sizeof(sock->buf);
1768 
1769 	sock->cancel_task.sock = sock;
1770 	sock->cancel_task.type = URING_TASK_CANCEL;
1771 
1772 	/* switched from another polling group due to scheduling */
1773 	if (spdk_unlikely(sock->recv_pipe != NULL &&
1774 			  (spdk_pipe_reader_bytes_available(sock->recv_pipe) > 0))) {
1775 		assert(sock->pending_recv == false);
1776 		sock->pending_recv = true;
1777 		TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
1778 	}
1779 
1780 	if (sock->placement_id != -1) {
1781 		rc = spdk_sock_map_insert(&g_map, sock->placement_id, &group->base);
1782 		if (rc != 0) {
1783 			SPDK_ERRLOG("Failed to insert sock group into map: %d\n", rc);
1784 			/* Do not treat this as an error. The system will continue running. */
1785 		}
1786 	}
1787 
1788 	/* We get an async read going immediately */
1789 	_sock_prep_read(&sock->base);
1790 #ifdef SPDK_ZEROCOPY
1791 	if (sock->zcopy) {
1792 		_sock_prep_errqueue(_sock);
1793 	}
1794 #endif
1795 
1796 	return 0;
1797 }
1798 
1799 static void
1800 uring_sock_group_populate_buf_ring(struct spdk_uring_sock_group_impl *group)
1801 {
1802 	struct spdk_uring_buf_tracker *tracker;
1803 	int count, mask;
1804 
1805 	if (g_spdk_uring_sock_impl_opts.enable_recv_pipe) {
1806 		/* If recv_pipe is enabled, we do not post buffers. */
1807 		return;
1808 	}
1809 
1810 	/* Try to re-populate the io_uring's buffer pool using user-provided buffers */
1811 	tracker = STAILQ_FIRST(&group->free_trackers);
1812 	count = 0;
1813 	mask = io_uring_buf_ring_mask(URING_BUF_POOL_SIZE);
1814 	while (tracker != NULL) {
1815 		tracker->buflen = spdk_sock_group_get_buf(group->base.group, &tracker->buf, &tracker->ctx);
1816 		if (tracker->buflen == 0) {
1817 			break;
1818 		}
1819 
1820 		assert(tracker->buf != NULL);
1821 		STAILQ_REMOVE_HEAD(&group->free_trackers, link);
1822 		assert(STAILQ_FIRST(&group->free_trackers) != tracker);
1823 
1824 		io_uring_buf_ring_add(group->buf_ring, tracker->buf, tracker->buflen, tracker->id, mask, count);
1825 		count++;
1826 		tracker = STAILQ_FIRST(&group->free_trackers);
1827 	}
1828 
1829 	if (count > 0) {
1830 		group->buf_ring_count += count;
1831 		io_uring_buf_ring_advance(group->buf_ring, count);
1832 	}
1833 }
1834 
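/* One poll iteration: flush queued writes for every healthy socket, repost any
 * available receive buffers, submit prepared SQEs with a single io_uring_submit()
 * call, then reap completions and return the sockets with pending data. */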
1835 static int
1836 uring_sock_group_impl_poll(struct spdk_sock_group_impl *_group, int max_events,
1837 			   struct spdk_sock **socks)
1838 {
1839 	struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group);
1840 	int count, ret;
1841 	int to_complete, to_submit;
1842 	struct spdk_sock *_sock, *tmp;
1843 	struct spdk_uring_sock *sock;
1844 
1845 	if (spdk_likely(socks)) {
1846 		TAILQ_FOREACH_SAFE(_sock, &group->base.socks, link, tmp) {
1847 			sock = __uring_sock(_sock);
1848 			if (spdk_unlikely(sock->connection_status)) {
1849 				continue;
1850 			}
1851 			_sock_flush(_sock);
1852 		}
1853 	}
1854 
1855 	/* Try to re-populate the io_uring's buffer pool using user-provided buffers */
1856 	uring_sock_group_populate_buf_ring(group);
1857 
1858 	to_submit = group->io_queued;
1859 
1860 	/* Network I/O cannot use O_DIRECT, so there is no need to call spdk_io_uring_enter */
1861 	if (to_submit > 0) {
1862 		/* If there are I/O to submit, use io_uring_submit here.
1863 		 * It will automatically call io_uring_enter appropriately. */
1864 		ret = io_uring_submit(&group->uring);
1865 		if (ret < 0) {
1866 			return 1;
1867 		}
1868 		group->io_queued = 0;
1869 		group->io_inflight += to_submit;
1870 		group->io_avail -= to_submit;
1871 	}
1872 
1873 	count = 0;
1874 	to_complete = group->io_inflight;
1875 	if (to_complete > 0 || !TAILQ_EMPTY(&group->pending_recv)) {
1876 		count = sock_uring_group_reap(group, to_complete, max_events, socks);
1877 	}
1878 
1879 	return count;
1880 }
1881 
1882 static int
1883 uring_sock_group_impl_remove_sock(struct spdk_sock_group_impl *_group,
1884 				  struct spdk_sock *_sock)
1885 {
1886 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1887 	struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group);
1888 
1889 	sock->pending_group_remove = true;
1890 
1891 	if (sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
1892 		_sock_prep_cancel_task(_sock, &sock->write_task);
1893 		/* Since spdk_sock_group_remove_sock is not an asynchronous interface,
1894 		 * we can use a while loop here. */
1895 		while ((sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) ||
1896 		       (sock->cancel_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE)) {
1897 			uring_sock_group_impl_poll(_group, 32, NULL);
1898 		}
1899 	}
1900 
1901 	if (sock->read_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
1902 		_sock_prep_cancel_task(_sock, &sock->read_task);
1903 		/* Since spdk_sock_group_remove_sock is not an asynchronous interface,
1904 		 * we can use a while loop here. */
1905 		while ((sock->read_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) ||
1906 		       (sock->cancel_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE)) {
1907 			uring_sock_group_impl_poll(_group, 32, NULL);
1908 		}
1909 	}
1910 
1911 	if (sock->errqueue_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
1912 		_sock_prep_cancel_task(_sock, &sock->errqueue_task);
1913 		/* Since spdk_sock_group_remove_sock is not an asynchronous interface,
1914 		 * we can use a while loop here. */
1915 		while ((sock->errqueue_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) ||
1916 		       (sock->cancel_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE)) {
1917 			uring_sock_group_impl_poll(_group, 32, NULL);
1918 		}
1919 	}
1920 
1921 	/* Make sure that cancelling the tasks above didn't cause any new requests to be submitted */
1922 	assert(sock->write_task.status == SPDK_URING_SOCK_TASK_NOT_IN_USE);
1923 	assert(sock->read_task.status == SPDK_URING_SOCK_TASK_NOT_IN_USE);
1924 	assert(sock->errqueue_task.status == SPDK_URING_SOCK_TASK_NOT_IN_USE);
1925 
1926 	if (sock->pending_recv) {
1927 		TAILQ_REMOVE(&group->pending_recv, sock, link);
1928 		sock->pending_recv = false;
1929 	}
1930 	assert(sock->pending_recv == false);
1931 
1932 	/* We have no way to handle this case. We could let the user read this
1933 	 * buffer, but the buffer came from a group and we have lost the association
1934 	 * to that group, so we could not release it. */
1935 	assert(STAILQ_EMPTY(&sock->recv_stream));
1936 
1937 	if (sock->placement_id != -1) {
1938 		spdk_sock_map_release(&g_map, sock->placement_id);
1939 	}
1940 
1941 	sock->pending_group_remove = false;
1942 	sock->group = NULL;
1943 	return 0;
1944 }
1945 
1946 static int
1947 uring_sock_group_impl_close(struct spdk_sock_group_impl *_group)
1948 {
1949 	struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group);
1950 
1951 	/* try to reap all the active I/O */
1952 	while (group->io_inflight) {
1953 		uring_sock_group_impl_poll(_group, 32, NULL);
1954 	}
1955 	assert(group->io_inflight == 0);
1956 	assert(group->io_avail == SPDK_SOCK_GROUP_QUEUE_DEPTH);
1957 
1958 	uring_sock_group_impl_buf_pool_free(group);
1959 
1960 	io_uring_queue_exit(&group->uring);
1961 
1962 	if (g_spdk_uring_sock_impl_opts.enable_placement_id == PLACEMENT_CPU) {
1963 		spdk_sock_map_release(&g_map, spdk_env_get_current_core());
1964 	}
1965 
1966 	free(group);
1967 	return 0;
1968 }
1969 
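/* Synchronous flush path: send queued requests with a single sendmsg() call.
 * Used by the generic .flush callback and by writev_async() when the socket is
 * not in a polling group. For zero-copy sockets this also opportunistically
 * reaps MSG_ZEROCOPY completions from the error queue. */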
1970 static int
1971 uring_sock_flush(struct spdk_sock *_sock)
1972 {
1973 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1974 	struct msghdr msg = {};
1975 	struct iovec iovs[IOV_BATCH_SIZE];
1976 	int iovcnt;
1977 	ssize_t rc;
1978 	int flags = sock->zcopy_send_flags;
1979 	int retval;
1980 	bool is_zcopy = false;
1981 	struct spdk_uring_task *task = &sock->errqueue_task;
1982 
1983 	/* Can't flush from within a callback or we end up with recursive calls */
1984 	if (_sock->cb_cnt > 0) {
1985 		errno = EAGAIN;
1986 		return -1;
1987 	}
1988 
1989 	/* Can't flush while a write is already outstanding */
1990 	if (sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
1991 		errno = EAGAIN;
1992 		return -1;
1993 	}
1994 
1995 	/* Gather an iov */
1996 	iovcnt = spdk_sock_prep_reqs(_sock, iovs, 0, NULL, &flags);
1997 	if (iovcnt == 0) {
1998 		/* Nothing to send */
1999 		return 0;
2000 	}
2001 
2002 	/* Perform the vectored write */
2003 	msg.msg_iov = iovs;
2004 	msg.msg_iovlen = iovcnt;
2005 	rc = sendmsg(sock->fd, &msg, flags | MSG_DONTWAIT);
2006 	if (rc <= 0) {
2007 		if (rc == 0 || errno == EAGAIN || errno == EWOULDBLOCK || (errno == ENOBUFS && sock->zcopy)) {
2008 			errno = EAGAIN;
2009 		}
2010 		return -1;
2011 	}
2012 
2013 #ifdef SPDK_ZEROCOPY
2014 	is_zcopy = flags & MSG_ZEROCOPY;
2015 #endif
2016 	retval = sock_complete_write_reqs(_sock, rc, is_zcopy);
2017 	if (retval < 0) {
2018 		/* if the socket is closed, return to avoid heap-use-after-free error */
2019 		errno = ENOTCONN;
2020 		return -1;
2021 	}
2022 
2023 #ifdef SPDK_ZEROCOPY
2024 	/* Check for zero-copy completions at least once */
2025 	if (sock->zcopy && !TAILQ_EMPTY(&_sock->pending_reqs)) {
2026 		retval = recvmsg(sock->fd, &task->msg, MSG_ERRQUEUE);
2027 		if (retval < 0) {
2028 			if (errno == EWOULDBLOCK || errno == EAGAIN) {
2029 				return rc;
2030 			}
2031 		}
2032 		_sock_check_zcopy(_sock, retval);
2033 	}
2034 #endif
2035 
2036 	return rc;
2037 }
2038 
2039 static int
2040 uring_sock_group_impl_register_interrupt(struct spdk_sock_group_impl *_group, uint32_t events,
2041 		spdk_interrupt_fn fn, void *arg, const char *name)
2042 {
2043 	SPDK_ERRLOG("Interrupt mode is not supported in the uring sock implementation.\n");
2044 
2045 	return -ENOTSUP;
2046 }
2047 
2048 static void
2049 uring_sock_group_impl_unregister_interrupt(struct spdk_sock_group_impl *_group)
2050 {
2051 }
2052 
2053 static struct spdk_net_impl g_uring_net_impl = {
2054 	.name		= "uring",
2055 	.getaddr	= uring_sock_getaddr,
2056 	.get_interface_name = uring_sock_get_interface_name,
2057 	.get_numa_id	= uring_sock_get_numa_id,
2058 	.connect	= uring_sock_connect,
2059 	.listen		= uring_sock_listen,
2060 	.accept		= uring_sock_accept,
2061 	.close		= uring_sock_close,
2062 	.recv		= uring_sock_recv,
2063 	.readv		= uring_sock_readv,
2064 	.writev		= uring_sock_writev,
2065 	.recv_next	= uring_sock_recv_next,
2066 	.writev_async	= uring_sock_writev_async,
2067 	.flush          = uring_sock_flush,
2068 	.set_recvlowat	= uring_sock_set_recvlowat,
2069 	.set_recvbuf	= uring_sock_set_recvbuf,
2070 	.set_sendbuf	= uring_sock_set_sendbuf,
2071 	.is_ipv6	= uring_sock_is_ipv6,
2072 	.is_ipv4	= uring_sock_is_ipv4,
2073 	.is_connected   = uring_sock_is_connected,
2074 	.group_impl_get_optimal	= uring_sock_group_impl_get_optimal,
2075 	.group_impl_create	= uring_sock_group_impl_create,
2076 	.group_impl_add_sock	= uring_sock_group_impl_add_sock,
2077 	.group_impl_remove_sock	= uring_sock_group_impl_remove_sock,
2078 	.group_impl_poll	= uring_sock_group_impl_poll,
2079 	.group_impl_register_interrupt    = uring_sock_group_impl_register_interrupt,
2080 	.group_impl_unregister_interrupt  = uring_sock_group_impl_unregister_interrupt,
2081 	.group_impl_close	= uring_sock_group_impl_close,
2082 	.get_opts		= uring_sock_impl_get_opts,
2083 	.set_opts		= uring_sock_impl_set_opts,
2084 };
2085 
2086 __attribute__((constructor)) static void
2087 net_impl_register_uring(void)
2088 {
2089 	struct spdk_sock_group_impl *impl;
2090 
2091 	/* Check if we can create a uring sock group before we register
2092 	 * it as a valid impl. */
2093 	impl = uring_sock_group_impl_create();
2094 	if (impl) {
2095 		uring_sock_group_impl_close(impl);
2096 		spdk_net_impl_register(&g_uring_net_impl);
2097 	}
2098 }
2099