xref: /spdk/module/sock/uring/uring.c (revision 7c0177b049f1f9eb05ea28cbca2d4353e38909c7)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2019 Intel Corporation.
3  *   All rights reserved.
4  */
5 
6 #include "spdk/stdinc.h"
7 #include "spdk/config.h"
8 
9 #include <linux/errqueue.h>
10 #include <sys/epoll.h>
11 #include <liburing.h>
12 
13 #include "spdk/barrier.h"
14 #include "spdk/env.h"
15 #include "spdk/log.h"
16 #include "spdk/pipe.h"
17 #include "spdk/sock.h"
18 #include "spdk/string.h"
19 #include "spdk/util.h"
20 #include "spdk/net.h"
21 #include "spdk/file.h"
22 
23 #include "spdk_internal/sock.h"
24 #include "spdk_internal/assert.h"
25 #include "spdk/net.h"
26 
27 #define MAX_TMPBUF 1024
28 #define PORTNUMLEN 32
29 #define SPDK_SOCK_GROUP_QUEUE_DEPTH 4096
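/* Space needed for the control message read from MSG_ERRQUEUE when reaping
 * zero-copy send completions: one cmsghdr followed by one sock_extended_err. */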
30 #define SPDK_SOCK_CMG_INFO_SIZE (sizeof(struct cmsghdr) + sizeof(struct sock_extended_err))
31 
32 enum uring_task_type {
33 	URING_TASK_READ = 0,
34 	URING_TASK_ERRQUEUE,
35 	URING_TASK_WRITE,
36 	URING_TASK_CANCEL,
37 };
38 
39 #if defined(SO_ZEROCOPY) && defined(MSG_ZEROCOPY)
40 #define SPDK_ZEROCOPY
41 #endif
42 
43 /* We don't know how big the buffers that the user posts will be, but this
44  * is the maximum we'll ever allow it to receive in a single command.
45  * If the user buffers are smaller, it will just receive less. */
46 #define URING_MAX_RECV_SIZE (128 * 1024)
47 
48 /* We don't know how many buffers the user will post, but this is the
49  * maximum number we'll take from the pool to post per group. */
50 #define URING_BUF_POOL_SIZE 128
51 
52 /* We use 1 just so it's not zero and we can validate it's right. */
53 #define URING_BUF_GROUP_ID 1
54 
55 enum spdk_uring_sock_task_status {
56 	SPDK_URING_SOCK_TASK_NOT_IN_USE = 0,
57 	SPDK_URING_SOCK_TASK_IN_PROCESS,
58 };
59 
60 struct spdk_uring_task {
61 	enum spdk_uring_sock_task_status	status;
62 	enum uring_task_type		type;
63 	struct spdk_uring_sock			*sock;
64 	struct msghdr				msg;
65 	struct iovec				iovs[IOV_BATCH_SIZE];
66 	int					iov_cnt;
67 	struct spdk_sock_request		*last_req;
68 	bool					is_zcopy;
69 	STAILQ_ENTRY(spdk_uring_task)		link;
70 };
71 
72 struct spdk_uring_sock {
73 	struct spdk_sock			base;
74 	int					fd;
75 	uint32_t				sendmsg_idx;
76 	struct spdk_uring_sock_group_impl	*group;
77 	STAILQ_HEAD(, spdk_uring_buf_tracker)	recv_stream;
78 	size_t					recv_offset;
79 	struct spdk_uring_task			write_task;
80 	struct spdk_uring_task			errqueue_task;
81 	struct spdk_uring_task			read_task;
82 	struct spdk_uring_task			cancel_task;
83 	struct spdk_pipe			*recv_pipe;
84 	void					*recv_buf;
85 	int					recv_buf_sz;
86 	bool					zcopy;
87 	bool					pending_recv;
88 	bool					pending_group_remove;
89 	int					zcopy_send_flags;
90 	int					connection_status;
91 	int					placement_id;
92 	uint8_t                                 reserved[4];
93 	uint8_t					buf[SPDK_SOCK_CMG_INFO_SIZE];
94 	TAILQ_ENTRY(spdk_uring_sock)		link;
95 	char					interface_name[IFNAMSIZ];
96 };
97 /* 'struct cmsghdr' is mapped onto the buffer 'buf'. Since the first member
98  * of this control message header is 8 bytes in size, 'buf' must be
99  * 8-byte aligned.
100  */
101 SPDK_STATIC_ASSERT(offsetof(struct spdk_uring_sock, buf) % 8 == 0,
102 		   "Incorrect alignment: `buf` must be aligned to 8 bytes");
103 
104 TAILQ_HEAD(pending_recv_list, spdk_uring_sock);
105 
106 struct spdk_uring_buf_tracker {
107 	void					*buf;
108 	size_t					buflen;
109 	size_t					len;
110 	void					*ctx;
111 	int					id;
112 	STAILQ_ENTRY(spdk_uring_buf_tracker)	link;
113 };
114 
115 struct spdk_uring_sock_group_impl {
116 	struct spdk_sock_group_impl		base;
117 	struct io_uring				uring;
118 	uint32_t				io_inflight;
119 	uint32_t				io_queued;
120 	uint32_t				io_avail;
121 	struct pending_recv_list		pending_recv;
122 
123 	struct io_uring_buf_ring		*buf_ring;
124 	uint32_t				buf_ring_count;
125 	struct spdk_uring_buf_tracker		*trackers;
126 	STAILQ_HEAD(, spdk_uring_buf_tracker)	free_trackers;
127 };
128 
129 static struct spdk_sock_impl_opts g_spdk_uring_sock_impl_opts = {
130 	.recv_buf_size = DEFAULT_SO_RCVBUF_SIZE,
131 	.send_buf_size = DEFAULT_SO_SNDBUF_SIZE,
132 	.enable_recv_pipe = true,
133 	.enable_quickack = false,
134 	.enable_placement_id = PLACEMENT_NONE,
135 	.enable_zerocopy_send_server = false,
136 	.enable_zerocopy_send_client = false,
137 	.zerocopy_threshold = 0,
138 	.tls_version = 0,
139 	.enable_ktls = false,
140 	.psk_key = NULL,
141 	.psk_identity = NULL
142 };
143 
144 static struct spdk_sock_map g_map = {
145 	.entries = STAILQ_HEAD_INITIALIZER(g_map.entries),
146 	.mtx = PTHREAD_MUTEX_INITIALIZER
147 };
148 
149 __attribute__((destructor)) static void
150 uring_sock_map_cleanup(void)
151 {
152 	spdk_sock_map_cleanup(&g_map);
153 }
154 
155 #define SPDK_URING_SOCK_REQUEST_IOV(req) ((struct iovec *)((uint8_t *)req + sizeof(struct spdk_sock_request)))
156 
157 #define __uring_sock(sock) (struct spdk_uring_sock *)sock
158 #define __uring_group_impl(group) (struct spdk_uring_sock_group_impl *)group
159 
160 static void
161 uring_sock_copy_impl_opts(struct spdk_sock_impl_opts *dest, const struct spdk_sock_impl_opts *src,
162 			  size_t len)
163 {
164 #define FIELD_OK(field) \
165 	offsetof(struct spdk_sock_impl_opts, field) + sizeof(src->field) <= len
166 
167 #define SET_FIELD(field) \
168 	if (FIELD_OK(field)) { \
169 		dest->field = src->field; \
170 	}
171 
172 	SET_FIELD(recv_buf_size);
173 	SET_FIELD(send_buf_size);
174 	SET_FIELD(enable_recv_pipe);
175 	SET_FIELD(enable_quickack);
176 	SET_FIELD(enable_placement_id);
177 	SET_FIELD(enable_zerocopy_send_server);
178 	SET_FIELD(enable_zerocopy_send_client);
179 	SET_FIELD(zerocopy_threshold);
180 	SET_FIELD(tls_version);
181 	SET_FIELD(enable_ktls);
182 	SET_FIELD(psk_key);
183 	SET_FIELD(psk_identity);
184 
185 #undef SET_FIELD
186 #undef FIELD_OK
187 }
188 
189 static int
190 uring_sock_impl_get_opts(struct spdk_sock_impl_opts *opts, size_t *len)
191 {
192 	if (!opts || !len) {
193 		errno = EINVAL;
194 		return -1;
195 	}
196 
197 	assert(sizeof(*opts) >= *len);
198 	memset(opts, 0, *len);
199 
200 	uring_sock_copy_impl_opts(opts, &g_spdk_uring_sock_impl_opts, *len);
201 	*len = spdk_min(*len, sizeof(g_spdk_uring_sock_impl_opts));
202 
203 	return 0;
204 }
205 
206 static int
207 uring_sock_impl_set_opts(const struct spdk_sock_impl_opts *opts, size_t len)
208 {
209 	if (!opts) {
210 		errno = EINVAL;
211 		return -1;
212 	}
213 
214 	assert(sizeof(*opts) >= len);
215 	uring_sock_copy_impl_opts(&g_spdk_uring_sock_impl_opts, opts, len);
216 
217 	return 0;
218 }
219 
220 static void
221 uring_opts_get_impl_opts(const struct spdk_sock_opts *opts, struct spdk_sock_impl_opts *dest)
222 {
223 	/* Copy the default impl_opts first to cover cases when user's impl_opts is smaller */
224 	memcpy(dest, &g_spdk_uring_sock_impl_opts, sizeof(*dest));
225 
226 	if (opts->impl_opts != NULL) {
227 		assert(sizeof(*dest) >= opts->impl_opts_size);
228 		uring_sock_copy_impl_opts(dest, opts->impl_opts, opts->impl_opts_size);
229 	}
230 }
231 
232 static int
233 uring_sock_getaddr(struct spdk_sock *_sock, char *saddr, int slen, uint16_t *sport,
234 		   char *caddr, int clen, uint16_t *cport)
235 {
236 	struct spdk_uring_sock *sock = __uring_sock(_sock);
237 
238 	assert(sock != NULL);
239 	return spdk_net_getaddr(sock->fd, saddr, slen, sport, caddr, clen, cport);
240 }
241 
242 static const char *
243 uring_sock_get_interface_name(struct spdk_sock *_sock)
244 {
245 	struct spdk_uring_sock *sock = __uring_sock(_sock);
246 	char saddr[64];
247 	int rc;
248 
249 	rc = spdk_net_getaddr(sock->fd, saddr, sizeof(saddr), NULL, NULL, 0, NULL);
250 	if (rc != 0) {
251 		return NULL;
252 	}
253 
254 	rc = spdk_net_get_interface_name(saddr, sock->interface_name,
255 					 sizeof(sock->interface_name));
256 	if (rc != 0) {
257 		return NULL;
258 	}
259 
260 	return sock->interface_name;
261 }
262 
263 static uint32_t
264 uring_sock_get_numa_socket_id(struct spdk_sock *sock)
265 {
266 	const char *interface_name;
267 	uint32_t numa_socket_id;
268 	int rc;
269 
270 	interface_name = uring_sock_get_interface_name(sock);
271 	if (interface_name == NULL) {
272 		return SPDK_ENV_SOCKET_ID_ANY;
273 	}
274 
275 	rc = spdk_read_sysfs_attribute_uint32(&numa_socket_id,
276 					      "/sys/class/net/%s/device/numa_node", interface_name);
277 	if (rc == 0) {
278 		return numa_socket_id;
279 	} else {
280 		return SPDK_ENV_SOCKET_ID_ANY;
281 	}
282 }
283 
284 enum uring_sock_create_type {
285 	SPDK_SOCK_CREATE_LISTEN,
286 	SPDK_SOCK_CREATE_CONNECT,
287 };
288 
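/* (Re)size the optional receive pipe to 'sz' bytes. The backing buffer is
 * 64-byte aligned, and any data still buffered in the old pipe is migrated
 * into the new one. A size of 0 frees the pipe entirely. */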
289 static int
290 uring_sock_alloc_pipe(struct spdk_uring_sock *sock, int sz)
291 {
292 	uint8_t *new_buf;
293 	struct spdk_pipe *new_pipe;
294 	struct iovec siov[2];
295 	struct iovec diov[2];
296 	int sbytes;
297 	ssize_t bytes;
298 	int rc;
299 
300 	if (sock->recv_buf_sz == sz) {
301 		return 0;
302 	}
303 
304 	/* If the new size is 0, just free the pipe */
305 	if (sz == 0) {
306 		spdk_pipe_destroy(sock->recv_pipe);
307 		free(sock->recv_buf);
308 		sock->recv_pipe = NULL;
309 		sock->recv_buf = NULL;
310 		return 0;
311 	} else if (sz < MIN_SOCK_PIPE_SIZE) {
312 		SPDK_ERRLOG("The size of the pipe must be larger than %d\n", MIN_SOCK_PIPE_SIZE);
313 		return -1;
314 	}
315 
316 	/* Allocate a new buffer, aligned to a 64-byte boundary */
317 	rc = posix_memalign((void **)&new_buf, 64, sz);
318 	if (rc != 0) {
319 		SPDK_ERRLOG("socket recv buf allocation failed\n");
320 		return -ENOMEM;
321 	}
322 	memset(new_buf, 0, sz);
323 
324 	new_pipe = spdk_pipe_create(new_buf, sz);
325 	if (new_pipe == NULL) {
326 		SPDK_ERRLOG("socket pipe allocation failed\n");
327 		free(new_buf);
328 		return -ENOMEM;
329 	}
330 
331 	if (sock->recv_pipe != NULL) {
332 		/* Pull all of the data out of the old pipe */
333 		sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov);
334 		if (sbytes > sz) {
335 			/* Too much data to fit into the new pipe size */
336 			spdk_pipe_destroy(new_pipe);
337 			free(new_buf);
338 			return -EINVAL;
339 		}
340 
341 		sbytes = spdk_pipe_writer_get_buffer(new_pipe, sz, diov);
342 		assert(sbytes == sz);
343 
344 		bytes = spdk_iovcpy(siov, 2, diov, 2);
345 		spdk_pipe_writer_advance(new_pipe, bytes);
346 
347 		spdk_pipe_destroy(sock->recv_pipe);
348 		free(sock->recv_buf);
349 	}
350 
351 	sock->recv_buf_sz = sz;
352 	sock->recv_buf = new_buf;
353 	sock->recv_pipe = new_pipe;
354 
355 	return 0;
356 }
357 
358 static int
359 uring_sock_set_recvbuf(struct spdk_sock *_sock, int sz)
360 {
361 	struct spdk_uring_sock *sock = __uring_sock(_sock);
362 	int min_size;
363 	int rc;
364 
365 	assert(sock != NULL);
366 
367 	if (_sock->impl_opts.enable_recv_pipe) {
368 		rc = uring_sock_alloc_pipe(sock, sz);
369 		if (rc) {
370 			SPDK_ERRLOG("unable to allocate sufficient recvbuf with sz=%d on sock=%p\n", sz, _sock);
371 			return rc;
372 		}
373 	}
374 
375 	/* Set kernel buffer size to be at least MIN_SO_RCVBUF_SIZE and
376 	 * g_spdk_uring_sock_impl_opts.recv_buf_size. */
377 	min_size = spdk_max(MIN_SO_RCVBUF_SIZE, g_spdk_uring_sock_impl_opts.recv_buf_size);
378 
379 	if (sz < min_size) {
380 		sz = min_size;
381 	}
382 
383 	rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVBUF, &sz, sizeof(sz));
384 	if (rc < 0) {
385 		return rc;
386 	}
387 
388 	_sock->impl_opts.recv_buf_size = sz;
389 
390 	return 0;
391 }
392 
393 static int
394 uring_sock_set_sendbuf(struct spdk_sock *_sock, int sz)
395 {
396 	struct spdk_uring_sock *sock = __uring_sock(_sock);
397 	int min_size;
398 	int rc;
399 
400 	assert(sock != NULL);
401 
402 	/* Set kernel buffer size to be at least MIN_SO_SNDBUF_SIZE and
403 	 * g_spdk_uring_sock_impl_opts.send_buf_size. */
404 	min_size = spdk_max(MIN_SO_SNDBUF_SIZE, g_spdk_uring_sock_impl_opts.send_buf_size);
405 
406 	if (sz < min_size) {
407 		sz = min_size;
408 	}
409 
410 	rc = setsockopt(sock->fd, SOL_SOCKET, SO_SNDBUF, &sz, sizeof(sz));
411 	if (rc < 0) {
412 		return rc;
413 	}
414 
415 	_sock->impl_opts.send_buf_size = sz;
416 
417 	return 0;
418 }
419 
420 static struct spdk_uring_sock *
421 uring_sock_alloc(int fd, struct spdk_sock_impl_opts *impl_opts, bool enable_zero_copy)
422 {
423 	struct spdk_uring_sock *sock;
424 #if defined(__linux__)
425 	int flag;
426 	int rc;
427 #endif
428 
429 	sock = calloc(1, sizeof(*sock));
430 	if (sock == NULL) {
431 		SPDK_ERRLOG("sock allocation failed\n");
432 		return NULL;
433 	}
434 
435 	sock->fd = fd;
436 	memcpy(&sock->base.impl_opts, impl_opts, sizeof(*impl_opts));
437 
438 	STAILQ_INIT(&sock->recv_stream);
439 
440 #if defined(__linux__)
441 	flag = 1;
442 
443 	if (sock->base.impl_opts.enable_quickack) {
444 		rc = setsockopt(sock->fd, IPPROTO_TCP, TCP_QUICKACK, &flag, sizeof(flag));
445 		if (rc != 0) {
446 			SPDK_ERRLOG("Failed to set TCP_QUICKACK\n");
447 		}
448 	}
449 
450 	spdk_sock_get_placement_id(sock->fd, sock->base.impl_opts.enable_placement_id,
451 				   &sock->placement_id);
452 #ifdef SPDK_ZEROCOPY
453 	/* Try to turn on zero copy sends */
454 	flag = 1;
455 
456 	if (enable_zero_copy) {
457 		rc = setsockopt(sock->fd, SOL_SOCKET, SO_ZEROCOPY, &flag, sizeof(flag));
458 		if (rc == 0) {
459 			sock->zcopy = true;
460 			sock->zcopy_send_flags = MSG_ZEROCOPY;
461 		}
462 	}
463 #endif
464 #endif
465 
466 	return sock;
467 }
468 
469 static struct spdk_sock *
470 uring_sock_create(const char *ip, int port,
471 		  enum uring_sock_create_type type,
472 		  struct spdk_sock_opts *opts)
473 {
474 	struct spdk_uring_sock *sock;
475 	struct spdk_sock_impl_opts impl_opts;
476 	char buf[MAX_TMPBUF];
477 	char portnum[PORTNUMLEN];
478 	char *p;
479 	struct addrinfo hints, *res, *res0;
480 	int fd, flag;
481 	int val = 1;
482 	int rc;
483 	bool enable_zcopy_impl_opts = false;
484 	bool enable_zcopy_user_opts = true;
485 
486 	assert(opts != NULL);
487 	uring_opts_get_impl_opts(opts, &impl_opts);
488 
489 	if (ip == NULL) {
490 		return NULL;
491 	}
492 	if (ip[0] == '[') {
493 		snprintf(buf, sizeof(buf), "%s", ip + 1);
494 		p = strchr(buf, ']');
495 		if (p != NULL) {
496 			*p = '\0';
497 		}
498 		ip = (const char *) &buf[0];
499 	}
500 
501 	snprintf(portnum, sizeof portnum, "%d", port);
502 	memset(&hints, 0, sizeof hints);
503 	hints.ai_family = PF_UNSPEC;
504 	hints.ai_socktype = SOCK_STREAM;
505 	hints.ai_flags = AI_NUMERICSERV;
506 	hints.ai_flags |= AI_PASSIVE;
507 	hints.ai_flags |= AI_NUMERICHOST;
508 	rc = getaddrinfo(ip, portnum, &hints, &res0);
509 	if (rc != 0) {
510 		SPDK_ERRLOG("getaddrinfo() failed %s (%d)\n", gai_strerror(rc), rc);
511 		return NULL;
512 	}
513 
514 	/* try listen */
515 	fd = -1;
516 	for (res = res0; res != NULL; res = res->ai_next) {
517 retry:
518 		fd = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
519 		if (fd < 0) {
520 			/* error */
521 			continue;
522 		}
523 
524 		val = impl_opts.recv_buf_size;
525 		rc = setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &val, sizeof val);
526 		if (rc) {
527 			/* Not fatal */
528 		}
529 
530 		val = impl_opts.send_buf_size;
531 		rc = setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &val, sizeof val);
532 		if (rc) {
533 			/* Not fatal */
534 		}
535 
536 		rc = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof val);
537 		if (rc != 0) {
538 			close(fd);
539 			fd = -1;
540 			/* error */
541 			continue;
542 		}
543 		rc = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof val);
544 		if (rc != 0) {
545 			close(fd);
546 			fd = -1;
547 			/* error */
548 			continue;
549 		}
550 
551 		if (opts->ack_timeout) {
552 #if defined(__linux__)
553 			val = opts->ack_timeout;
554 			rc = setsockopt(fd, IPPROTO_TCP, TCP_USER_TIMEOUT, &val, sizeof val);
555 			if (rc != 0) {
556 				close(fd);
557 				fd = -1;
558 				/* error */
559 				continue;
560 			}
561 #else
562 			SPDK_WARNLOG("TCP_USER_TIMEOUT is not supported.\n");
563 #endif
564 		}
565 
566 
567 
568 #if defined(SO_PRIORITY)
569 		if (opts != NULL && opts->priority) {
570 			rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &opts->priority, sizeof val);
571 			if (rc != 0) {
572 				close(fd);
573 				fd = -1;
574 				/* error */
575 				continue;
576 			}
577 		}
578 #endif
579 		if (res->ai_family == AF_INET6) {
580 			rc = setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &val, sizeof val);
581 			if (rc != 0) {
582 				close(fd);
583 				fd = -1;
584 				/* error */
585 				continue;
586 			}
587 		}
588 
589 		if (type == SPDK_SOCK_CREATE_LISTEN) {
590 			rc = bind(fd, res->ai_addr, res->ai_addrlen);
591 			if (rc != 0) {
592 				SPDK_ERRLOG("bind() failed at port %d, errno = %d\n", port, errno);
593 				switch (errno) {
594 				case EINTR:
595 					/* interrupted? */
596 					close(fd);
597 					goto retry;
598 				case EADDRNOTAVAIL:
599 					SPDK_ERRLOG("IP address %s not available. "
600 						    "Verify IP address in config file "
601 						    "and make sure setup script is "
602 						    "run before starting spdk app.\n", ip);
603 				/* FALLTHROUGH */
604 				default:
605 					/* try next family */
606 					close(fd);
607 					fd = -1;
608 					continue;
609 				}
610 			}
611 			/* bind OK */
612 			rc = listen(fd, 512);
613 			if (rc != 0) {
614 				SPDK_ERRLOG("listen() failed, errno = %d\n", errno);
615 				close(fd);
616 				fd = -1;
617 				break;
618 			}
619 
620 			flag = fcntl(fd, F_GETFL);
621 			if (fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0) {
622 				SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno);
623 				close(fd);
624 				fd = -1;
625 				break;
626 			}
627 
628 			enable_zcopy_impl_opts = impl_opts.enable_zerocopy_send_server;
629 		} else if (type == SPDK_SOCK_CREATE_CONNECT) {
630 			rc = connect(fd, res->ai_addr, res->ai_addrlen);
631 			if (rc != 0) {
632 				SPDK_ERRLOG("connect() failed, errno = %d\n", errno);
633 				/* try next family */
634 				close(fd);
635 				fd = -1;
636 				continue;
637 			}
638 
639 			flag = fcntl(fd, F_GETFL);
640 			if (fcntl(fd, F_SETFL, flag & ~O_NONBLOCK) < 0) {
641 				SPDK_ERRLOG("fcntl can't set blocking mode for socket, fd: %d (%d)\n", fd, errno);
642 				close(fd);
643 				fd = -1;
644 				break;
645 			}
646 
647 			enable_zcopy_impl_opts = impl_opts.enable_zerocopy_send_client;
648 		}
649 		break;
650 	}
651 	freeaddrinfo(res0);
652 
653 	if (fd < 0) {
654 		return NULL;
655 	}
656 
657 	enable_zcopy_user_opts = opts->zcopy && !spdk_net_is_loopback(fd);
658 	sock = uring_sock_alloc(fd, &impl_opts, enable_zcopy_user_opts && enable_zcopy_impl_opts);
659 	if (sock == NULL) {
660 		SPDK_ERRLOG("sock allocation failed\n");
661 		close(fd);
662 		return NULL;
663 	}
664 
665 	return &sock->base;
666 }
667 
668 static struct spdk_sock *
669 uring_sock_listen(const char *ip, int port, struct spdk_sock_opts *opts)
670 {
671 	if (spdk_interrupt_mode_is_enabled()) {
672 		SPDK_ERRLOG("Interrupt mode is not supported in the uring sock implementation.\n");
673 		return NULL;
674 	}
675 
676 	return uring_sock_create(ip, port, SPDK_SOCK_CREATE_LISTEN, opts);
677 }
678 
679 static struct spdk_sock *
680 uring_sock_connect(const char *ip, int port, struct spdk_sock_opts *opts)
681 {
682 	if (spdk_interrupt_mode_is_enabled()) {
683 		SPDK_ERRLOG("Interrupt mode is not supported in the uring sock implementation.\n");
684 		return NULL;
685 	}
686 
687 	return uring_sock_create(ip, port, SPDK_SOCK_CREATE_CONNECT, opts);
688 }
689 
690 static struct spdk_sock *
691 uring_sock_accept(struct spdk_sock *_sock)
692 {
693 	struct spdk_uring_sock		*sock = __uring_sock(_sock);
694 	struct sockaddr_storage		sa;
695 	socklen_t			salen;
696 	int				rc, fd;
697 	struct spdk_uring_sock		*new_sock;
698 	int				flag;
699 
700 	memset(&sa, 0, sizeof(sa));
701 	salen = sizeof(sa);
702 
703 	assert(sock != NULL);
704 
705 	rc = accept(sock->fd, (struct sockaddr *)&sa, &salen);
706 
707 	if (rc == -1) {
708 		return NULL;
709 	}
710 
711 	fd = rc;
712 
713 	flag = fcntl(fd, F_GETFL);
714 	if ((flag & O_NONBLOCK) && (fcntl(fd, F_SETFL, flag & ~O_NONBLOCK) < 0)) {
715 		SPDK_ERRLOG("fcntl can't set blocking mode for socket, fd: %d (%d)\n", fd, errno);
716 		close(fd);
717 		return NULL;
718 	}
719 
720 #if defined(SO_PRIORITY)
721 	/* The priority is not inherited, so call this function again */
722 	if (sock->base.opts.priority) {
723 		rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &sock->base.opts.priority, sizeof(int));
724 		if (rc != 0) {
725 			close(fd);
726 			return NULL;
727 		}
728 	}
729 #endif
730 
731 	new_sock = uring_sock_alloc(fd, &sock->base.impl_opts, sock->zcopy);
732 	if (new_sock == NULL) {
733 		close(fd);
734 		return NULL;
735 	}
736 
737 	return &new_sock->base;
738 }
739 
740 static int
741 uring_sock_close(struct spdk_sock *_sock)
742 {
743 	struct spdk_uring_sock *sock = __uring_sock(_sock);
744 
745 	assert(TAILQ_EMPTY(&_sock->pending_reqs));
746 	assert(sock->group == NULL);
747 
748 	/* If the socket fails to close, the best choice is to
749 	 * leak the fd but continue to free the rest of the sock
750 	 * memory. */
751 	close(sock->fd);
752 
753 	spdk_pipe_destroy(sock->recv_pipe);
754 	free(sock->recv_buf);
755 	free(sock);
756 
757 	return 0;
758 }
759 
760 static ssize_t
761 uring_sock_recv_from_pipe(struct spdk_uring_sock *sock, struct iovec *diov, int diovcnt)
762 {
763 	struct iovec siov[2];
764 	int sbytes;
765 	ssize_t bytes;
766 	struct spdk_uring_sock_group_impl *group;
767 
768 	sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov);
769 	if (sbytes < 0) {
770 		errno = EINVAL;
771 		return -1;
772 	} else if (sbytes == 0) {
773 		errno = EAGAIN;
774 		return -1;
775 	}
776 
777 	bytes = spdk_iovcpy(siov, 2, diov, diovcnt);
778 
779 	if (bytes == 0) {
780 		/* The only way this happens is if diov is 0 length */
781 		errno = EINVAL;
782 		return -1;
783 	}
784 
785 	spdk_pipe_reader_advance(sock->recv_pipe, bytes);
786 
787 	/* If we drained the pipe, take it off the level-triggered list */
788 	if (sock->base.group_impl && spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) {
789 		group = __uring_group_impl(sock->base.group_impl);
790 		TAILQ_REMOVE(&group->pending_recv, sock, link);
791 		sock->pending_recv = false;
792 	}
793 
794 	return bytes;
795 }
796 
797 static inline ssize_t
798 sock_readv(int fd, struct iovec *iov, int iovcnt)
799 {
800 	struct msghdr msg = {
801 		.msg_iov = iov,
802 		.msg_iovlen = iovcnt,
803 	};
804 
805 	return recvmsg(fd, &msg, MSG_DONTWAIT);
806 }
807 
808 static inline ssize_t
809 uring_sock_read(struct spdk_uring_sock *sock)
810 {
811 	struct iovec iov[2];
812 	int bytes;
813 	struct spdk_uring_sock_group_impl *group;
814 
815 	bytes = spdk_pipe_writer_get_buffer(sock->recv_pipe, sock->recv_buf_sz, iov);
816 
817 	if (bytes > 0) {
818 		bytes = sock_readv(sock->fd, iov, 2);
819 		if (bytes > 0) {
820 			spdk_pipe_writer_advance(sock->recv_pipe, bytes);
821 			if (sock->base.group_impl && !sock->pending_recv) {
822 				group = __uring_group_impl(sock->base.group_impl);
823 				TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
824 				sock->pending_recv = true;
825 			}
826 		}
827 	}
828 
829 	return bytes;
830 }
831 
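/* Zero-copy receive: hand the caller a pointer into the next completed
 * receive buffer (posted through the group's buffer ring) instead of copying
 * into a caller-provided iovec. Returns the number of valid bytes, or -1 with
 * errno set (EAGAIN if buffers are posted but idle, ENOBUFS if none are posted). */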
832 static int
833 uring_sock_recv_next(struct spdk_sock *_sock, void **_buf, void **ctx)
834 {
835 	struct spdk_uring_sock *sock = __uring_sock(_sock);
836 	struct spdk_uring_sock_group_impl *group;
837 	struct spdk_uring_buf_tracker *tr;
838 
839 	if (sock->connection_status < 0) {
840 		errno = -sock->connection_status;
841 		return -1;
842 	}
843 
844 	if (sock->recv_pipe != NULL) {
845 		errno = ENOTSUP;
846 		return -1;
847 	}
848 
849 	group = __uring_group_impl(_sock->group_impl);
850 
851 	tr = STAILQ_FIRST(&sock->recv_stream);
852 	if (tr == NULL) {
853 		if (sock->group->buf_ring_count > 0) {
854 			/* There are buffers posted, but data hasn't arrived. */
855 			errno = EAGAIN;
856 		} else {
857 			/* There are no buffers posted, so this won't ever
858 			 * make forward progress. */
859 			errno = ENOBUFS;
860 		}
861 		return -1;
862 	}
863 	assert(sock->pending_recv == true);
864 	assert(tr->buf != NULL);
865 
866 	*_buf = tr->buf + sock->recv_offset;
867 	*ctx = tr->ctx;
868 
869 	STAILQ_REMOVE_HEAD(&sock->recv_stream, link);
870 	STAILQ_INSERT_HEAD(&group->free_trackers, tr, link);
871 
872 	if (STAILQ_EMPTY(&sock->recv_stream)) {
873 		sock->pending_recv = false;
874 		TAILQ_REMOVE(&group->pending_recv, sock, link);
875 	}
876 
877 	return tr->len - sock->recv_offset;
878 }
879 
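/* Receive path used when the recv pipe is disabled: copy data out of buffers
 * completed through the buffer ring into the caller's iovecs, returning each
 * fully consumed buffer to the group. Falls back to a plain readv() when the
 * socket is not in a group or no buffers have been posted. */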
880 static ssize_t
881 uring_sock_readv_no_pipe(struct spdk_sock *_sock, struct iovec *iovs, int iovcnt)
882 {
883 	struct spdk_uring_sock *sock = __uring_sock(_sock);
884 	struct spdk_uring_buf_tracker *tr;
885 	struct iovec iov;
886 	ssize_t total, len;
887 	int i;
888 
889 	if (sock->connection_status < 0) {
890 		errno = -sock->connection_status;
891 		return -1;
892 	}
893 
894 	if (_sock->group_impl == NULL) {
895 		/* If not in a group just read from the socket the regular way. */
896 		return sock_readv(sock->fd, iovs, iovcnt);
897 	}
898 
899 	if (STAILQ_EMPTY(&sock->recv_stream)) {
900 		if (sock->group->buf_ring_count == 0) {
901 			/* If the user hasn't posted any buffers, read from the socket
902 			 * directly. */
903 
904 			if (sock->pending_recv) {
905 				sock->pending_recv = false;
906 				TAILQ_REMOVE(&(__uring_group_impl(_sock->group_impl))->pending_recv, sock, link);
907 			}
908 
909 			return sock_readv(sock->fd, iovs, iovcnt);
910 		}
911 
912 		errno = EAGAIN;
913 		return -1;
914 	}
915 
916 	total = 0;
917 	for (i = 0; i < iovcnt; i++) {
918 		/* Copy to stack so we can change it */
919 		iov = iovs[i];
920 
921 		tr = STAILQ_FIRST(&sock->recv_stream);
922 		while (tr != NULL) {
923 			len = spdk_min(iov.iov_len, tr->len - sock->recv_offset);
924 			memcpy(iov.iov_base, tr->buf + sock->recv_offset, len);
925 
926 			total += len;
927 			sock->recv_offset += len;
928 			iov.iov_base += len;
929 			iov.iov_len -= len;
930 
931 			if (sock->recv_offset == tr->len) {
932 				sock->recv_offset = 0;
933 				STAILQ_REMOVE_HEAD(&sock->recv_stream, link);
934 				STAILQ_INSERT_HEAD(&sock->group->free_trackers, tr, link);
935 				spdk_sock_group_provide_buf(sock->group->base.group, tr->buf, tr->buflen, tr->ctx);
936 				tr = STAILQ_FIRST(&sock->recv_stream);
937 			}
938 
939 			if (iov.iov_len == 0) {
940 				break;
941 			}
942 		}
943 	}
944 
945 	if (STAILQ_EMPTY(&sock->recv_stream)) {
946 		struct spdk_uring_sock_group_impl *group;
947 
948 		group = __uring_group_impl(_sock->group_impl);
949 		sock->pending_recv = false;
950 		TAILQ_REMOVE(&group->pending_recv, sock, link);
951 	}
952 
953 	assert(total > 0);
954 	return total;
955 }
956 
957 static ssize_t
958 uring_sock_readv(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
959 {
960 	struct spdk_uring_sock *sock = __uring_sock(_sock);
961 	int rc, i;
962 	size_t len;
963 
964 	if (sock->connection_status < 0) {
965 		errno = -sock->connection_status;
966 		return -1;
967 	}
968 
969 	if (sock->recv_pipe == NULL) {
970 		return uring_sock_readv_no_pipe(_sock, iov, iovcnt);
971 	}
972 
973 	len = 0;
974 	for (i = 0; i < iovcnt; i++) {
975 		len += iov[i].iov_len;
976 	}
977 
978 	if (spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) {
979 		/* If the user is receiving a sufficiently large amount of data,
980 		 * receive directly to their buffers. */
981 		if (len >= MIN_SOCK_PIPE_SIZE) {
982 			return sock_readv(sock->fd, iov, iovcnt);
983 		}
984 
985 		/* Otherwise, do a big read into our pipe */
986 		rc = uring_sock_read(sock);
987 		if (rc <= 0) {
988 			return rc;
989 		}
990 	}
991 
992 	return uring_sock_recv_from_pipe(sock, iov, iovcnt);
993 }
994 
995 static ssize_t
996 uring_sock_recv(struct spdk_sock *sock, void *buf, size_t len)
997 {
998 	struct iovec iov[1];
999 
1000 	iov[0].iov_base = buf;
1001 	iov[0].iov_len = len;
1002 
1003 	return uring_sock_readv(sock, iov, 1);
1004 }
1005 
1006 static ssize_t
1007 uring_sock_writev(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
1008 {
1009 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1010 	struct msghdr msg = {
1011 		.msg_iov = iov,
1012 		.msg_iovlen = iovcnt,
1013 	};
1014 
1015 	if (sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
1016 		errno = EAGAIN;
1017 		return -1;
1018 	}
1019 
1020 	return sendmsg(sock->fd, &msg, MSG_DONTWAIT);
1021 }
1022 
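/* Advance req->internal.offset by up to 'rc' bytes. Returns the portion of
 * 'rc' left over after the whole request is consumed, or -1 if the request
 * was only partially sent. */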
1023 static ssize_t
1024 sock_request_advance_offset(struct spdk_sock_request *req, ssize_t rc)
1025 {
1026 	unsigned int offset;
1027 	size_t len;
1028 	int i;
1029 
1030 	offset = req->internal.offset;
1031 	for (i = 0; i < req->iovcnt; i++) {
1032 		/* Advance by the offset first */
1033 		if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) {
1034 			offset -= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len;
1035 			continue;
1036 		}
1037 
1038 		/* Calculate the remaining length of this element */
1039 		len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset;
1040 
1041 		if (len > (size_t)rc) {
1042 			req->internal.offset += rc;
1043 			return -1;
1044 		}
1045 
1046 		offset = 0;
1047 		req->internal.offset += len;
1048 		rc -= len;
1049 	}
1050 
1051 	return rc;
1052 }
1053 
1054 static int
1055 sock_complete_write_reqs(struct spdk_sock *_sock, ssize_t rc, bool is_zcopy)
1056 {
1057 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1058 	struct spdk_sock_request *req;
1059 	int retval;
1060 
1061 	if (is_zcopy) {
1062 		/* Handle the overflow case. Because sock->sendmsg_idx - 1 is stored in
1063 		 * req->internal.offset, sendmsg_idx must never be zero. */
1064 		if (spdk_unlikely(sock->sendmsg_idx == UINT32_MAX)) {
1065 			sock->sendmsg_idx = 1;
1066 		} else {
1067 			sock->sendmsg_idx++;
1068 		}
1069 	}
1070 
1071 	/* Consume the requests that were actually written */
1072 	req = TAILQ_FIRST(&_sock->queued_reqs);
1073 	while (req) {
1074 		/* req->internal.is_zcopy is true when the whole req or part of it is sent with zerocopy */
1075 		req->internal.is_zcopy = is_zcopy;
1076 
1077 		rc = sock_request_advance_offset(req, rc);
1078 		if (rc < 0) {
1079 			/* This element was partially sent. */
1080 			return 0;
1081 		}
1082 
1083 		/* Handled a full request. */
1084 		spdk_sock_request_pend(_sock, req);
1085 
1086 		if (!req->internal.is_zcopy && req == TAILQ_FIRST(&_sock->pending_reqs)) {
1087 			retval = spdk_sock_request_put(_sock, req, 0);
1088 			if (retval) {
1089 				return retval;
1090 			}
1091 		} else {
1092 			/* Re-use the offset field to hold the sendmsg call index. The
1093 			 * index is 0 based, so subtract one here because we've already
1094 			 * incremented above. */
1095 			req->internal.offset = sock->sendmsg_idx - 1;
1096 		}
1097 
1098 		if (rc == 0) {
1099 			break;
1100 		}
1101 
1102 		req = TAILQ_FIRST(&_sock->queued_reqs);
1103 	}
1104 
1105 	return 0;
1106 }
1107 
1108 #ifdef SPDK_ZEROCOPY
1109 static int
1110 _sock_check_zcopy(struct spdk_sock *_sock, int status)
1111 {
1112 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1113 	ssize_t rc;
1114 	struct sock_extended_err *serr;
1115 	struct cmsghdr *cm;
1116 	uint32_t idx;
1117 	struct spdk_sock_request *req, *treq;
1118 	bool found;
1119 
1120 	assert(sock->zcopy == true);
1121 	if (spdk_unlikely(status < 0)) {
1122 		if (!TAILQ_EMPTY(&_sock->pending_reqs)) {
1123 			SPDK_ERRLOG("Attempting to receive from ERRQUEUE yielded an error, but the pending list still has orphaned entries, status = %d\n",
1124 				    status);
1125 		} else {
1126 			SPDK_WARNLOG("Recvmsg yielded an error!\n");
1127 		}
1128 		return 0;
1129 	}
1130 
1131 	cm = CMSG_FIRSTHDR(&sock->errqueue_task.msg);
1132 	if (!((cm->cmsg_level == SOL_IP && cm->cmsg_type == IP_RECVERR) ||
1133 	      (cm->cmsg_level == SOL_IPV6 && cm->cmsg_type == IPV6_RECVERR))) {
1134 		SPDK_WARNLOG("Unexpected cmsg level or type!\n");
1135 		return 0;
1136 	}
1137 
1138 	serr = (struct sock_extended_err *)CMSG_DATA(cm);
1139 	if (serr->ee_errno != 0 || serr->ee_origin != SO_EE_ORIGIN_ZEROCOPY) {
1140 		SPDK_WARNLOG("Unexpected extended error origin\n");
1141 		return 0;
1142 	}
1143 
1144 	/* Most of the time, the pending_reqs array is in the exact
1145 	 * order we need such that all of the requests to complete are
1146 	 * in order, in the front. It is guaranteed that all requests
1147 	 * belonging to the same sendmsg call are sequential, so once
1148 	 * we encounter one match we can stop looping as soon as a
1149 	 * non-match is found.
1150 	 */
1151 	for (idx = serr->ee_info; idx <= serr->ee_data; idx++) {
1152 		found = false;
1153 		TAILQ_FOREACH_SAFE(req, &_sock->pending_reqs, internal.link, treq) {
1154 			if (!req->internal.is_zcopy) {
1155 				/* This wasn't a zcopy request. It was just waiting in line to complete */
1156 				rc = spdk_sock_request_put(_sock, req, 0);
1157 				if (rc < 0) {
1158 					return rc;
1159 				}
1160 			} else if (req->internal.offset == idx) {
1161 				found = true;
1162 				rc = spdk_sock_request_put(_sock, req, 0);
1163 				if (rc < 0) {
1164 					return rc;
1165 				}
1166 			} else if (found) {
1167 				break;
1168 			}
1169 		}
1170 	}
1171 
1172 	return 0;
1173 }
1174 
1175 static void
1176 _sock_prep_errqueue(struct spdk_sock *_sock)
1177 {
1178 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1179 	struct spdk_uring_task *task = &sock->errqueue_task;
1180 	struct io_uring_sqe *sqe;
1181 
1182 	if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS) {
1183 		return;
1184 	}
1185 
1186 	if (sock->pending_group_remove) {
1187 		return;
1188 	}
1189 
1190 	assert(sock->group != NULL);
1191 	sock->group->io_queued++;
1192 
1193 	sqe = io_uring_get_sqe(&sock->group->uring);
1194 	io_uring_prep_recvmsg(sqe, sock->fd, &task->msg, MSG_ERRQUEUE);
1195 	io_uring_sqe_set_data(sqe, task);
1196 	task->status = SPDK_URING_SOCK_TASK_IN_PROCESS;
1197 }
1198 
1199 #endif
1200 
1201 static void
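/* Gather queued requests into a single iovec batch and submit one sendmsg SQE
 * for them (adding MSG_ZEROCOPY when enabled). The completion is handled in
 * sock_uring_group_reap(). */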
1202 _sock_flush(struct spdk_sock *_sock)
1203 {
1204 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1205 	struct spdk_uring_task *task = &sock->write_task;
1206 	uint32_t iovcnt;
1207 	struct io_uring_sqe *sqe;
1208 	int flags;
1209 
1210 	if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS) {
1211 		return;
1212 	}
1213 
1214 #ifdef SPDK_ZEROCOPY
1215 	if (sock->zcopy) {
1216 		flags = MSG_DONTWAIT | sock->zcopy_send_flags;
1217 	} else
1218 #endif
1219 	{
1220 		flags = MSG_DONTWAIT;
1221 	}
1222 
1223 	iovcnt = spdk_sock_prep_reqs(&sock->base, task->iovs, task->iov_cnt, &task->last_req, &flags);
1224 	if (!iovcnt) {
1225 		return;
1226 	}
1227 
1228 	task->iov_cnt = iovcnt;
1229 	assert(sock->group != NULL);
1230 	task->msg.msg_iov = task->iovs;
1231 	task->msg.msg_iovlen = task->iov_cnt;
1232 #ifdef SPDK_ZEROCOPY
1233 	task->is_zcopy = (flags & MSG_ZEROCOPY) ? true : false;
1234 #endif
1235 	sock->group->io_queued++;
1236 
1237 	sqe = io_uring_get_sqe(&sock->group->uring);
1238 	io_uring_prep_sendmsg(sqe, sock->fd, &sock->write_task.msg, flags);
1239 	io_uring_sqe_set_data(sqe, task);
1240 	task->status = SPDK_URING_SOCK_TASK_IN_PROCESS;
1241 }
1242 
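/* Arm a recv SQE with IOSQE_BUFFER_SELECT so the kernel picks a buffer from
 * the registered buffer ring (URING_BUF_GROUP_ID) when data arrives. */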
1243 static void
1244 _sock_prep_read(struct spdk_sock *_sock)
1245 {
1246 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1247 	struct spdk_uring_task *task = &sock->read_task;
1248 	struct io_uring_sqe *sqe;
1249 
1250 	/* Do not prepare read event */
1251 	if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS) {
1252 		return;
1253 	}
1254 
1255 	if (sock->pending_group_remove) {
1256 		return;
1257 	}
1258 
1259 	assert(sock->group != NULL);
1260 	sock->group->io_queued++;
1261 
1262 	sqe = io_uring_get_sqe(&sock->group->uring);
1263 	io_uring_prep_recv(sqe, sock->fd, NULL, URING_MAX_RECV_SIZE, 0);
1264 	sqe->buf_group = URING_BUF_GROUP_ID;
1265 	sqe->flags |= IOSQE_BUFFER_SELECT;
1266 	io_uring_sqe_set_data(sqe, task);
1267 	task->status = SPDK_URING_SOCK_TASK_IN_PROCESS;
1268 }
1269 
1270 static void
1271 _sock_prep_cancel_task(struct spdk_sock *_sock, void *user_data)
1272 {
1273 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1274 	struct spdk_uring_task *task = &sock->cancel_task;
1275 	struct io_uring_sqe *sqe;
1276 
1277 	if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS) {
1278 		return;
1279 	}
1280 
1281 	assert(sock->group != NULL);
1282 	sock->group->io_queued++;
1283 
1284 	sqe = io_uring_get_sqe(&sock->group->uring);
1285 	io_uring_prep_cancel(sqe, user_data, 0);
1286 	io_uring_sqe_set_data(sqe, task);
1287 	task->status = SPDK_URING_SOCK_TASK_IN_PROCESS;
1288 }
1289 
1290 static void
1291 uring_sock_fail(struct spdk_uring_sock *sock, int status)
1292 {
1293 	struct spdk_uring_sock_group_impl *group = sock->group;
1294 	int rc;
1295 
1296 	sock->connection_status = status;
1297 	rc = spdk_sock_abort_requests(&sock->base);
1298 
1299 	/* The user needs to be notified that this socket is dead. */
1300 	if (rc == 0 && sock->base.cb_fn != NULL &&
1301 	    sock->pending_recv == false) {
1302 		sock->pending_recv = true;
1303 		TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
1304 	}
1305 }
1306 
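/* Process up to 'max' completions from the ring, re-arming read/errqueue SQEs
 * as needed, and then fill 'socks' with up to 'max_read_events' sockets that
 * have receive data (or an error) pending. */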
1307 static int
1308 sock_uring_group_reap(struct spdk_uring_sock_group_impl *group, int max, int max_read_events,
1309 		      struct spdk_sock **socks)
1310 {
1311 	int i, count, ret;
1312 	struct io_uring_cqe *cqe;
1313 	struct spdk_uring_sock *sock, *tmp;
1314 	struct spdk_uring_task *task;
1315 	int status, bid, flags;
1316 	bool is_zcopy;
1317 
1318 	for (i = 0; i < max; i++) {
1319 		ret = io_uring_peek_cqe(&group->uring, &cqe);
1320 		if (ret != 0) {
1321 			break;
1322 		}
1323 
1324 		if (cqe == NULL) {
1325 			break;
1326 		}
1327 
1328 		task = (struct spdk_uring_task *)cqe->user_data;
1329 		assert(task != NULL);
1330 		sock = task->sock;
1331 		assert(sock != NULL);
1332 		assert(sock->group != NULL);
1333 		assert(sock->group == group);
1334 		sock->group->io_inflight--;
1335 		sock->group->io_avail++;
1336 		status = cqe->res;
1337 		flags = cqe->flags;
1338 		io_uring_cqe_seen(&group->uring, cqe);
1339 
1340 		task->status = SPDK_URING_SOCK_TASK_NOT_IN_USE;
1341 
1342 		switch (task->type) {
1343 		case URING_TASK_READ:
1344 			if (status == -EAGAIN || status == -EWOULDBLOCK) {
1345 				/* This likely shouldn't happen, but would indicate that the
1346 				 * kernel didn't have enough resources to queue a task internally. */
1347 				_sock_prep_read(&sock->base);
1348 			} else if (status == -ECANCELED) {
1349 				continue;
1350 			} else if (status == -ENOBUFS) {
1351 				/* There's data in the socket but the user hasn't provided any buffers.
1352 				 * We need to notify the user that the socket has data pending. */
1353 				if (sock->base.cb_fn != NULL &&
1354 				    sock->pending_recv == false) {
1355 					sock->pending_recv = true;
1356 					TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
1357 				}
1358 
1359 				_sock_prep_read(&sock->base);
1360 			} else if (spdk_unlikely(status <= 0)) {
1361 				uring_sock_fail(sock, status < 0 ? status : -ECONNRESET);
1362 			} else {
1363 				struct spdk_uring_buf_tracker *tracker;
1364 
1365 				assert((flags & IORING_CQE_F_BUFFER) != 0);
1366 
1367 				bid = flags >> IORING_CQE_BUFFER_SHIFT;
1368 				tracker = &group->trackers[bid];
1369 
1370 				assert(tracker->buf != NULL);
1371 				assert(tracker->len != 0);
1372 
1373 				/* Append this data to the stream */
1374 				tracker->len = status;
1375 				STAILQ_INSERT_TAIL(&sock->recv_stream, tracker, link);
1376 				assert(group->buf_ring_count > 0);
1377 				group->buf_ring_count--;
1378 
1379 				if (sock->base.cb_fn != NULL &&
1380 				    sock->pending_recv == false) {
1381 					sock->pending_recv = true;
1382 					TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
1383 				}
1384 
1385 				_sock_prep_read(&sock->base);
1386 			}
1387 			break;
1388 		case URING_TASK_WRITE:
1389 			if (status == -EAGAIN || status == -EWOULDBLOCK ||
1390 			    (status == -ENOBUFS && sock->zcopy) ||
1391 			    status == -ECANCELED) {
1392 				continue;
1393 			} else if (spdk_unlikely(status < 0)) {
1394 				uring_sock_fail(sock, status);
1395 			} else {
1396 				task->last_req = NULL;
1397 				task->iov_cnt = 0;
1398 				is_zcopy = task->is_zcopy;
1399 				task->is_zcopy = false;
1400 				sock_complete_write_reqs(&sock->base, status, is_zcopy);
1401 			}
1402 
1403 			break;
1404 #ifdef SPDK_ZEROCOPY
1405 		case URING_TASK_ERRQUEUE:
1406 			if (status == -EAGAIN || status == -EWOULDBLOCK) {
1407 				_sock_prep_errqueue(&sock->base);
1408 			} else if (status == -ECANCELED) {
1409 				continue;
1410 			} else if (spdk_unlikely(status < 0)) {
1411 				uring_sock_fail(sock, status);
1412 			} else {
1413 				_sock_check_zcopy(&sock->base, status);
1414 				_sock_prep_errqueue(&sock->base);
1415 			}
1416 			break;
1417 #endif
1418 		case URING_TASK_CANCEL:
1419 			/* Do nothing */
1420 			break;
1421 		default:
1422 			SPDK_UNREACHABLE();
1423 		}
1424 	}
1425 
1426 	if (!socks) {
1427 		return 0;
1428 	}
1429 	count = 0;
1430 	TAILQ_FOREACH_SAFE(sock, &group->pending_recv, link, tmp) {
1431 		if (count == max_read_events) {
1432 			break;
1433 		}
1434 
1435 		/* If the socket's cb_fn is NULL, do not add it to socks array */
1436 		if (spdk_unlikely(sock->base.cb_fn == NULL)) {
1437 			assert(sock->pending_recv == true);
1438 			sock->pending_recv = false;
1439 			TAILQ_REMOVE(&group->pending_recv, sock, link);
1440 			continue;
1441 		}
1442 
1443 		socks[count++] = &sock->base;
1444 	}
1445 
1446 
1447 	/* Cycle the pending_recv list so that each time we poll things aren't
1448 	 * in the same order. Say we have 6 sockets in the list, named as follows:
1449 	 * A B C D E F
1450 	 * And all 6 sockets had the poll events, but max_events is only 3. That means
1451 	 * psock currently points at D. We want to rearrange the list to the following:
1452 	 * D E F A B C
1453 	 *
1454 	 * The variables below are named according to this example to make it easier to
1455 	 * follow the swaps.
1456 	 */
1457 	if (sock != NULL) {
1458 		struct spdk_uring_sock *ua, *uc, *ud, *uf;
1459 
1460 		/* Capture pointers to the elements we need */
1461 		ud = sock;
1462 
1463 		ua = TAILQ_FIRST(&group->pending_recv);
1464 		if (ua == ud) {
1465 			goto end;
1466 		}
1467 
1468 		uf = TAILQ_LAST(&group->pending_recv, pending_recv_list);
1469 		if (uf == ud) {
1470 			TAILQ_REMOVE(&group->pending_recv, ud, link);
1471 			TAILQ_INSERT_HEAD(&group->pending_recv, ud, link);
1472 			goto end;
1473 		}
1474 
1475 		uc = TAILQ_PREV(ud, pending_recv_list, link);
1476 		assert(uc != NULL);
1477 
1478 		/* Break the link between C and D */
1479 		uc->link.tqe_next = NULL;
1480 
1481 		/* Connect F to A */
1482 		uf->link.tqe_next = ua;
1483 		ua->link.tqe_prev = &uf->link.tqe_next;
1484 
1485 		/* Fix up the list first/last pointers */
1486 		group->pending_recv.tqh_first = ud;
1487 		group->pending_recv.tqh_last = &uc->link.tqe_next;
1488 
1489 		/* D is in front of the list, make tqe prev pointer point to the head of list */
1490 		ud->link.tqe_prev = &group->pending_recv.tqh_first;
1491 	}
1492 
1493 end:
1494 	return count;
1495 }
1496 
1497 static int uring_sock_flush(struct spdk_sock *_sock);
1498 
1499 static void
1500 uring_sock_writev_async(struct spdk_sock *_sock, struct spdk_sock_request *req)
1501 {
1502 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1503 	int rc;
1504 
1505 	if (spdk_unlikely(sock->connection_status)) {
1506 		req->cb_fn(req->cb_arg, sock->connection_status);
1507 		return;
1508 	}
1509 
1510 	spdk_sock_request_queue(_sock, req);
1511 
1512 	if (!sock->group) {
1513 		if (_sock->queued_iovcnt >= IOV_BATCH_SIZE) {
1514 			rc = uring_sock_flush(_sock);
1515 			if (rc < 0 && errno != EAGAIN) {
1516 				spdk_sock_abort_requests(_sock);
1517 			}
1518 		}
1519 	}
1520 }
1521 
1522 static int
1523 uring_sock_set_recvlowat(struct spdk_sock *_sock, int nbytes)
1524 {
1525 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1526 	int val;
1527 	int rc;
1528 
1529 	assert(sock != NULL);
1530 
1531 	val = nbytes;
1532 	rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVLOWAT, &val, sizeof val);
1533 	if (rc != 0) {
1534 		return -1;
1535 	}
1536 	return 0;
1537 }
1538 
1539 static bool
1540 uring_sock_is_ipv6(struct spdk_sock *_sock)
1541 {
1542 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1543 	struct sockaddr_storage sa;
1544 	socklen_t salen;
1545 	int rc;
1546 
1547 	assert(sock != NULL);
1548 
1549 	memset(&sa, 0, sizeof sa);
1550 	salen = sizeof sa;
1551 	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
1552 	if (rc != 0) {
1553 		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
1554 		return false;
1555 	}
1556 
1557 	return (sa.ss_family == AF_INET6);
1558 }
1559 
1560 static bool
1561 uring_sock_is_ipv4(struct spdk_sock *_sock)
1562 {
1563 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1564 	struct sockaddr_storage sa;
1565 	socklen_t salen;
1566 	int rc;
1567 
1568 	assert(sock != NULL);
1569 
1570 	memset(&sa, 0, sizeof sa);
1571 	salen = sizeof sa;
1572 	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
1573 	if (rc != 0) {
1574 		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
1575 		return false;
1576 	}
1577 
1578 	return (sa.ss_family == AF_INET);
1579 }
1580 
1581 static bool
1582 uring_sock_is_connected(struct spdk_sock *_sock)
1583 {
1584 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1585 	uint8_t byte;
1586 	int rc;
1587 
1588 	rc = recv(sock->fd, &byte, 1, MSG_PEEK | MSG_DONTWAIT);
1589 	if (rc == 0) {
1590 		return false;
1591 	}
1592 
1593 	if (rc < 0) {
1594 		if (errno == EAGAIN || errno == EWOULDBLOCK) {
1595 			return true;
1596 		}
1597 
1598 		return false;
1599 	}
1600 
1601 	return true;
1602 }
1603 
1604 static struct spdk_sock_group_impl *
1605 uring_sock_group_impl_get_optimal(struct spdk_sock *_sock, struct spdk_sock_group_impl *hint)
1606 {
1607 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1608 	struct spdk_sock_group_impl *group;
1609 
1610 	if (sock->placement_id != -1) {
1611 		spdk_sock_map_lookup(&g_map, sock->placement_id, &group, hint);
1612 		return group;
1613 	}
1614 
1615 	return NULL;
1616 }
1617 
1618 static int
1619 uring_sock_group_impl_buf_pool_free(struct spdk_uring_sock_group_impl *group_impl)
1620 {
1621 	if (group_impl->buf_ring) {
1622 		io_uring_unregister_buf_ring(&group_impl->uring, URING_BUF_GROUP_ID);
1623 		free(group_impl->buf_ring);
1624 	}
1625 
1626 	free(group_impl->trackers);
1627 
1628 	return 0;
1629 }
1630 
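/* Register a provided-buffer ring of URING_BUF_POOL_SIZE entries with io_uring
 * and set up one tracker per slot. The actual data buffers are attached later,
 * in uring_sock_group_populate_buf_ring(), from buffers the user provides to
 * the group. */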
1631 static int
1632 uring_sock_group_impl_buf_pool_alloc(struct spdk_uring_sock_group_impl *group_impl)
1633 {
1634 	struct io_uring_buf_reg buf_reg = {};
1635 	struct io_uring_buf_ring *buf_ring;
1636 	int i, rc;
1637 
1638 	rc = posix_memalign((void **)&buf_ring, 0x1000, URING_BUF_POOL_SIZE * sizeof(struct io_uring_buf));
1639 	if (rc != 0) {
1640 		/* posix_memalign returns positive errno values */
1641 		return -rc;
1642 	}
1643 
1644 	buf_reg.ring_addr = (unsigned long long)buf_ring;
1645 	buf_reg.ring_entries = URING_BUF_POOL_SIZE;
1646 	buf_reg.bgid = URING_BUF_GROUP_ID;
1647 
1648 	rc = io_uring_register_buf_ring(&group_impl->uring, &buf_reg, 0);
1649 	if (rc != 0) {
1650 		free(buf_ring);
1651 		return rc;
1652 	}
1653 
1654 	group_impl->buf_ring = buf_ring;
1655 	io_uring_buf_ring_init(group_impl->buf_ring);
1656 	group_impl->buf_ring_count = 0;
1657 
1658 	group_impl->trackers = calloc(URING_BUF_POOL_SIZE, sizeof(struct spdk_uring_buf_tracker));
1659 	if (group_impl->trackers == NULL) {
1660 		uring_sock_group_impl_buf_pool_free(group_impl);
1661 		return -ENOMEM;
1662 	}
1663 
1664 	STAILQ_INIT(&group_impl->free_trackers);
1665 
1666 	for (i = 0; i < URING_BUF_POOL_SIZE; i++) {
1667 		struct spdk_uring_buf_tracker *tracker = &group_impl->trackers[i];
1668 
1669 		tracker->buf = NULL;
1670 		tracker->len = 0;
1671 		tracker->ctx = NULL;
1672 		tracker->id = i;
1673 
1674 		STAILQ_INSERT_TAIL(&group_impl->free_trackers, tracker, link);
1675 	}
1676 
1677 	return 0;
1678 }
1679 
1680 static struct spdk_sock_group_impl *
1681 uring_sock_group_impl_create(void)
1682 {
1683 	struct spdk_uring_sock_group_impl *group_impl;
1684 
1685 	group_impl = calloc(1, sizeof(*group_impl));
1686 	if (group_impl == NULL) {
1687 		SPDK_ERRLOG("group_impl allocation failed\n");
1688 		return NULL;
1689 	}
1690 
1691 	group_impl->io_avail = SPDK_SOCK_GROUP_QUEUE_DEPTH;
1692 
1693 	if (io_uring_queue_init(SPDK_SOCK_GROUP_QUEUE_DEPTH, &group_impl->uring, 0) < 0) {
1694 		SPDK_ERRLOG("uring I/O context setup failure\n");
1695 		free(group_impl);
1696 		return NULL;
1697 	}
1698 
1699 	TAILQ_INIT(&group_impl->pending_recv);
1700 
1701 	if (uring_sock_group_impl_buf_pool_alloc(group_impl) < 0) {
1702 		SPDK_ERRLOG("Failed to create buffer ring. "
1703 			    "uring sock implementation is likely not supported on this kernel.\n");
1704 		io_uring_queue_exit(&group_impl->uring);
1705 		free(group_impl);
1706 		return NULL;
1707 	}
1708 
1709 	if (g_spdk_uring_sock_impl_opts.enable_placement_id == PLACEMENT_CPU) {
1710 		spdk_sock_map_insert(&g_map, spdk_env_get_current_core(), &group_impl->base);
1711 	}
1712 
1713 	return &group_impl->base;
1714 }
1715 
1716 static int
1717 uring_sock_group_impl_add_sock(struct spdk_sock_group_impl *_group,
1718 			       struct spdk_sock *_sock)
1719 {
1720 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1721 	struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group);
1722 	int rc;
1723 
1724 	sock->group = group;
1725 	sock->write_task.sock = sock;
1726 	sock->write_task.type = URING_TASK_WRITE;
1727 
1728 	sock->read_task.sock = sock;
1729 	sock->read_task.type = URING_TASK_READ;
1730 
1731 	sock->errqueue_task.sock = sock;
1732 	sock->errqueue_task.type = URING_TASK_ERRQUEUE;
1733 	sock->errqueue_task.msg.msg_control = sock->buf;
1734 	sock->errqueue_task.msg.msg_controllen = sizeof(sock->buf);
1735 
1736 	sock->cancel_task.sock = sock;
1737 	sock->cancel_task.type = URING_TASK_CANCEL;
1738 
1739 	/* switched from another polling group due to scheduling */
1740 	if (spdk_unlikely(sock->recv_pipe != NULL &&
1741 			  (spdk_pipe_reader_bytes_available(sock->recv_pipe) > 0))) {
1742 		assert(sock->pending_recv == false);
1743 		sock->pending_recv = true;
1744 		TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
1745 	}
1746 
1747 	if (sock->placement_id != -1) {
1748 		rc = spdk_sock_map_insert(&g_map, sock->placement_id, &group->base);
1749 		if (rc != 0) {
1750 			SPDK_ERRLOG("Failed to insert sock group into map: %d\n", rc);
1751 			/* Do not treat this as an error. The system will continue running. */
1752 		}
1753 	}
1754 
1755 	/* We get an async read going immediately */
1756 	_sock_prep_read(&sock->base);
1757 #ifdef SPDK_ZEROCOPY
1758 	if (sock->zcopy) {
1759 		_sock_prep_errqueue(_sock);
1760 	}
1761 #endif
1762 
1763 	return 0;
1764 }
1765 
1766 static void
1767 uring_sock_group_populate_buf_ring(struct spdk_uring_sock_group_impl *group)
1768 {
1769 	struct spdk_uring_buf_tracker *tracker;
1770 	int count, mask;
1771 
1772 	if (g_spdk_uring_sock_impl_opts.enable_recv_pipe) {
1773 		/* If recv_pipe is enabled, we do not post buffers. */
1774 		return;
1775 	}
1776 
1777 	/* Try to re-populate the io_uring's buffer pool using user-provided buffers */
1778 	tracker = STAILQ_FIRST(&group->free_trackers);
1779 	count = 0;
1780 	mask = io_uring_buf_ring_mask(URING_BUF_POOL_SIZE);
1781 	while (tracker != NULL) {
1782 		tracker->buflen = spdk_sock_group_get_buf(group->base.group, &tracker->buf, &tracker->ctx);
1783 		if (tracker->buflen == 0) {
1784 			break;
1785 		}
1786 
1787 		assert(tracker->buf != NULL);
1788 		STAILQ_REMOVE_HEAD(&group->free_trackers, link);
1789 		assert(STAILQ_FIRST(&group->free_trackers) != tracker);
1790 
1791 		io_uring_buf_ring_add(group->buf_ring, tracker->buf, tracker->buflen, tracker->id, mask, count);
1792 		count++;
1793 		tracker = STAILQ_FIRST(&group->free_trackers);
1794 	}
1795 
1796 	if (count > 0) {
1797 		group->buf_ring_count += count;
1798 		io_uring_buf_ring_advance(group->buf_ring, count);
1799 	}
1800 }
1801 
1802 static int
1803 uring_sock_group_impl_poll(struct spdk_sock_group_impl *_group, int max_events,
1804 			   struct spdk_sock **socks)
1805 {
1806 	struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group);
1807 	int count, ret;
1808 	int to_complete, to_submit;
1809 	struct spdk_sock *_sock, *tmp;
1810 	struct spdk_uring_sock *sock;
1811 
1812 	if (spdk_likely(socks)) {
1813 		TAILQ_FOREACH_SAFE(_sock, &group->base.socks, link, tmp) {
1814 			sock = __uring_sock(_sock);
1815 			if (spdk_unlikely(sock->connection_status)) {
1816 				continue;
1817 			}
1818 			_sock_flush(_sock);
1819 		}
1820 	}
1821 
1822 	/* Try to re-populate the io_uring's buffer pool using user-provided buffers */
1823 	uring_sock_group_populate_buf_ring(group);
1824 
1825 	to_submit = group->io_queued;
1826 
1827 	/* For network I/O, it cannot be set with O_DIRECT, so we do not need to call spdk_io_uring_enter */
1828 	if (to_submit > 0) {
1829 		/* If there are I/O to submit, use io_uring_submit here.
1830 		 * It will automatically call io_uring_enter appropriately. */
1831 		ret = io_uring_submit(&group->uring);
1832 		if (ret < 0) {
1833 			return 1;
1834 		}
1835 		group->io_queued = 0;
1836 		group->io_inflight += to_submit;
1837 		group->io_avail -= to_submit;
1838 	}
1839 
1840 	count = 0;
1841 	to_complete = group->io_inflight;
1842 	if (to_complete > 0 || !TAILQ_EMPTY(&group->pending_recv)) {
1843 		count = sock_uring_group_reap(group, to_complete, max_events, socks);
1844 	}
1845 
1846 	return count;
1847 }
1848 
1849 static int
1850 uring_sock_group_impl_remove_sock(struct spdk_sock_group_impl *_group,
1851 				  struct spdk_sock *_sock)
1852 {
1853 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1854 	struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group);
1855 
1856 	sock->pending_group_remove = true;
1857 
1858 	if (sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
1859 		_sock_prep_cancel_task(_sock, &sock->write_task);
1860 		/* Since spdk_sock_group_remove_sock is not an asynchronous interface,
1861 		 * we can simply poll in a while loop here until the cancel completes. */
1862 		while ((sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) ||
1863 		       (sock->cancel_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE)) {
1864 			uring_sock_group_impl_poll(_group, 32, NULL);
1865 		}
1866 	}
1867 
1868 	if (sock->read_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
1869 		_sock_prep_cancel_task(_sock, &sock->read_task);
1870 		/* Since spdk_sock_group_remove_sock is not an asynchronous interface,
1871 		 * we can simply poll in a while loop here until the cancel completes. */
1872 		while ((sock->read_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) ||
1873 		       (sock->cancel_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE)) {
1874 			uring_sock_group_impl_poll(_group, 32, NULL);
1875 		}
1876 	}
1877 
1878 	if (sock->errqueue_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
1879 		_sock_prep_cancel_task(_sock, &sock->errqueue_task);
1880 		/* Since spdk_sock_group_remove_sock is not an asynchronous interface,
1881 		 * we can simply poll in a while loop here until the cancel completes. */
1882 		while ((sock->errqueue_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) ||
1883 		       (sock->cancel_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE)) {
1884 			uring_sock_group_impl_poll(_group, 32, NULL);
1885 		}
1886 	}
1887 
1888 	/* Make sure that cancelling the tasks above didn't cause new requests to be submitted */
1889 	assert(sock->write_task.status == SPDK_URING_SOCK_TASK_NOT_IN_USE);
1890 	assert(sock->read_task.status == SPDK_URING_SOCK_TASK_NOT_IN_USE);
1891 	assert(sock->errqueue_task.status == SPDK_URING_SOCK_TASK_NOT_IN_USE);
1892 
1893 	if (sock->pending_recv) {
1894 		TAILQ_REMOVE(&group->pending_recv, sock, link);
1895 		sock->pending_recv = false;
1896 	}
1897 	assert(sock->pending_recv == false);
1898 
1899 	/* We have no way to handle this case. We could let the user read this
1900 	 * buffer, but the buffer came from a group and we have lost the association
1901 	 * to that so we couldn't release it. */
1902 	assert(STAILQ_EMPTY(&sock->recv_stream));
1903 
1904 	if (sock->placement_id != -1) {
1905 		spdk_sock_map_release(&g_map, sock->placement_id);
1906 	}
1907 
1908 	sock->pending_group_remove = false;
1909 	sock->group = NULL;
1910 	return 0;
1911 }
1912 
1913 static int
1914 uring_sock_group_impl_close(struct spdk_sock_group_impl *_group)
1915 {
1916 	struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group);
1917 
1918 	/* try to reap all the active I/O */
1919 	while (group->io_inflight) {
1920 		uring_sock_group_impl_poll(_group, 32, NULL);
1921 	}
1922 	assert(group->io_inflight == 0);
1923 	assert(group->io_avail == SPDK_SOCK_GROUP_QUEUE_DEPTH);
1924 
1925 	uring_sock_group_impl_buf_pool_free(group);
1926 
1927 	io_uring_queue_exit(&group->uring);
1928 
1929 	if (g_spdk_uring_sock_impl_opts.enable_placement_id == PLACEMENT_CPU) {
1930 		spdk_sock_map_release(&g_map, spdk_env_get_current_core());
1931 	}
1932 
1933 	free(group);
1934 	return 0;
1935 }
1936 
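/* Flush queued requests with a direct sendmsg() rather than through io_uring.
 * Used for sockets that are not driven by a polling group (see
 * uring_sock_writev_async()) and as the generic flush callback. */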
1937 static int
1938 uring_sock_flush(struct spdk_sock *_sock)
1939 {
1940 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1941 	struct msghdr msg = {};
1942 	struct iovec iovs[IOV_BATCH_SIZE];
1943 	int iovcnt;
1944 	ssize_t rc;
1945 	int flags = sock->zcopy_send_flags;
1946 	int retval;
1947 	bool is_zcopy = false;
1948 	struct spdk_uring_task *task = &sock->errqueue_task;
1949 
1950 	/* Can't flush from within a callback or we end up with recursive calls */
1951 	if (_sock->cb_cnt > 0) {
1952 		errno = EAGAIN;
1953 		return -1;
1954 	}
1955 
1956 	/* Can't flush while a write is already outstanding */
1957 	if (sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
1958 		errno = EAGAIN;
1959 		return -1;
1960 	}
1961 
1962 	/* Gather an iov */
1963 	iovcnt = spdk_sock_prep_reqs(_sock, iovs, 0, NULL, &flags);
1964 	if (iovcnt == 0) {
1965 		/* Nothing to send */
1966 		return 0;
1967 	}
1968 
1969 	/* Perform the vectored write */
1970 	msg.msg_iov = iovs;
1971 	msg.msg_iovlen = iovcnt;
1972 	rc = sendmsg(sock->fd, &msg, flags | MSG_DONTWAIT);
1973 	if (rc <= 0) {
1974 		if (rc == 0 || errno == EAGAIN || errno == EWOULDBLOCK || (errno == ENOBUFS && sock->zcopy)) {
1975 			errno = EAGAIN;
1976 		}
1977 		return -1;
1978 	}
1979 
1980 #ifdef SPDK_ZEROCOPY
1981 	is_zcopy = flags & MSG_ZEROCOPY;
1982 #endif
1983 	retval = sock_complete_write_reqs(_sock, rc, is_zcopy);
1984 	if (retval < 0) {
1985 		/* if the socket is closed, return to avoid heap-use-after-free error */
1986 		errno = ENOTCONN;
1987 		return -1;
1988 	}
1989 
1990 #ifdef SPDK_ZEROCOPY
1991 	/* At least do once to check zero copy case */
1992 	if (sock->zcopy && !TAILQ_EMPTY(&_sock->pending_reqs)) {
1993 		retval = recvmsg(sock->fd, &task->msg, MSG_ERRQUEUE);
1994 		if (retval < 0) {
1995 			if (errno == EWOULDBLOCK || errno == EAGAIN) {
1996 				return rc;
1997 			}
1998 		}
1999 		_sock_check_zcopy(_sock, retval);
2000 	}
2001 #endif
2002 
2003 	return rc;
2004 }
2005 
2006 static int
2007 uring_sock_group_impl_register_interrupt(struct spdk_sock_group_impl *_group, uint32_t events,
2008 		spdk_interrupt_fn fn, void *arg, const char *name)
2009 {
2010 	SPDK_ERRLOG("Interrupt mode is not supported in the uring sock implementation.\n");
2011 
2012 	return -ENOTSUP;
2013 }
2014 
2015 static void
2016 uring_sock_group_impl_unregister_interrupt(struct spdk_sock_group_impl *_group)
2017 {
2018 }
2019 
2020 static struct spdk_net_impl g_uring_net_impl = {
2021 	.name		= "uring",
2022 	.getaddr	= uring_sock_getaddr,
2023 	.get_interface_name = uring_sock_get_interface_name,
2024 	.get_numa_socket_id = uring_sock_get_numa_socket_id,
2025 	.connect	= uring_sock_connect,
2026 	.listen		= uring_sock_listen,
2027 	.accept		= uring_sock_accept,
2028 	.close		= uring_sock_close,
2029 	.recv		= uring_sock_recv,
2030 	.readv		= uring_sock_readv,
2031 	.writev		= uring_sock_writev,
2032 	.recv_next	= uring_sock_recv_next,
2033 	.writev_async	= uring_sock_writev_async,
2034 	.flush          = uring_sock_flush,
2035 	.set_recvlowat	= uring_sock_set_recvlowat,
2036 	.set_recvbuf	= uring_sock_set_recvbuf,
2037 	.set_sendbuf	= uring_sock_set_sendbuf,
2038 	.is_ipv6	= uring_sock_is_ipv6,
2039 	.is_ipv4	= uring_sock_is_ipv4,
2040 	.is_connected   = uring_sock_is_connected,
2041 	.group_impl_get_optimal	= uring_sock_group_impl_get_optimal,
2042 	.group_impl_create	= uring_sock_group_impl_create,
2043 	.group_impl_add_sock	= uring_sock_group_impl_add_sock,
2044 	.group_impl_remove_sock	= uring_sock_group_impl_remove_sock,
2045 	.group_impl_poll	= uring_sock_group_impl_poll,
2046 	.group_impl_register_interrupt    = uring_sock_group_impl_register_interrupt,
2047 	.group_impl_unregister_interrupt  = uring_sock_group_impl_unregister_interrupt,
2048 	.group_impl_close	= uring_sock_group_impl_close,
2049 	.get_opts		= uring_sock_impl_get_opts,
2050 	.set_opts		= uring_sock_impl_set_opts,
2051 };
2052 
2053 __attribute__((constructor)) static void
2054 net_impl_register_uring(void)
2055 {
2056 	struct spdk_sock_group_impl *impl;
2057 
2058 	/* Check if we can create a uring sock group before we register
2059 	 * it as a valid impl. */
2060 	impl = uring_sock_group_impl_create();
2061 	if (impl) {
2062 		uring_sock_group_impl_close(impl);
2063 		spdk_net_impl_register(&g_uring_net_impl);
2064 	}
2065 }
2066