xref: /spdk/module/sock/uring/uring.c (revision b30d57cdad6d2bc75cc1e4e2ebbcebcb0d98dcfa)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 #include "spdk/config.h"
36 
37 #include <sys/epoll.h>
38 #include <liburing.h>
39 
40 #include "spdk/barrier.h"
41 #include "spdk/log.h"
42 #include "spdk/pipe.h"
43 #include "spdk/sock.h"
44 #include "spdk/string.h"
45 #include "spdk/util.h"
46 
47 #include "spdk_internal/sock.h"
48 #include "spdk_internal/assert.h"
49 
#define MAX_TMPBUF 1024				/* scratch buffer for de-bracketing "[IPv6]" literals */
#define PORTNUMLEN 32				/* max length of a printed decimal port number */
#define SPDK_SOCK_GROUP_QUEUE_DEPTH 4096	/* io_uring queue depth per poll group */
53 
/* Kind of operation a spdk_uring_task submits to the io_uring. */
enum spdk_sock_task_type {
	SPDK_SOCK_TASK_POLLIN = 0,	/* poll the fd for readability (POLLIN) */
	SPDK_SOCK_TASK_WRITE,		/* async sendmsg of queued write requests */
	SPDK_SOCK_TASK_CANCEL,		/* cancel a previously submitted task */
};
59 
/* Lifecycle state of a spdk_uring_task; IN_PROCESS means an SQE has been
 * prepared/submitted and its CQE has not been reaped yet. */
enum spdk_uring_sock_task_status {
	SPDK_URING_SOCK_TASK_NOT_IN_USE = 0,
	SPDK_URING_SOCK_TASK_IN_PROCESS,
};
64 
/* One outstanding io_uring operation owned by a socket. */
struct spdk_uring_task {
	enum spdk_uring_sock_task_status	status;	/* NOT_IN_USE or IN_PROCESS */
	enum spdk_sock_task_type		type;
	struct spdk_uring_sock			*sock;	/* back-pointer to the owning socket */
	struct msghdr				msg;	/* sendmsg descriptor for WRITE tasks */
	struct iovec				iovs[IOV_BATCH_SIZE];	/* gathered from queued_reqs */
	int					iov_cnt;
	struct spdk_sock_request		*last_req;	/* last request covered by iovs */
	STAILQ_ENTRY(spdk_uring_task)		link;
};
75 
/* uring implementation of a spdk_sock. */
struct spdk_uring_sock {
	struct spdk_sock			base;
	int					fd;		/* underlying TCP socket */
	struct spdk_uring_sock_group_impl	*group;		/* owning poll group, NULL if ungrouped */
	struct spdk_uring_task			write_task;	/* async sendmsg task */
	struct spdk_uring_task			pollin_task;	/* POLLIN poll task */
	struct spdk_uring_task			cancel_task;	/* used to cancel the other tasks */
	struct spdk_pipe			*recv_pipe;	/* optional user-space receive buffer */
	void					*recv_buf;	/* backing storage for recv_pipe */
	int					recv_buf_sz;
	bool					pending_recv;	/* true iff on group->pending_recv list */
	int					connection_status;	/* first fatal send error; 0 when healthy */
	TAILQ_ENTRY(spdk_uring_sock)		link;		/* entry in group->pending_recv */
};
90 
/* A poll group backed by a single io_uring instance. */
struct spdk_uring_sock_group_impl {
	struct spdk_sock_group_impl		base;
	struct io_uring				uring;
	uint32_t				io_inflight;	/* submitted SQEs whose CQE is unreaped */
	uint32_t				io_queued;	/* prepared SQEs not yet submitted */
	uint32_t				io_avail;	/* remaining queue-depth budget */
	TAILQ_HEAD(, spdk_uring_sock)		pending_recv;	/* sockets believed readable */
};
99 
/* Default implementation options for the uring sock module. */
static struct spdk_sock_impl_opts g_spdk_uring_sock_impl_opts = {
	.recv_buf_size = MIN_SO_RCVBUF_SIZE,
	.send_buf_size = MIN_SO_SNDBUF_SIZE,
	.enable_recv_pipe = true,
	.enable_quickack = false,
	.enable_placement_id = false,
};
107 
108 #define SPDK_URING_SOCK_REQUEST_IOV(req) ((struct iovec *)((uint8_t *)req + sizeof(struct spdk_sock_request)))
109 
/*
 * Format the IPv4/IPv6 address in sa as a string into host (capacity hlen).
 * Returns 0 on success, -1 on NULL arguments, unsupported address family,
 * or inet_ntop() failure.
 */
static int
get_addr_str(struct sockaddr *sa, char *host, size_t hlen)
{
	const void *addr = NULL;

	if (sa == NULL || host == NULL) {
		return -1;
	}

	if (sa->sa_family == AF_INET) {
		addr = &((struct sockaddr_in *)sa)->sin_addr;
	} else if (sa->sa_family == AF_INET6) {
		addr = &((struct sockaddr_in6 *)sa)->sin6_addr;
	} else {
		/* Not an IP socket. */
		return -1;
	}

	return (inet_ntop(sa->sa_family, addr, host, hlen) == NULL) ? -1 : 0;
}
138 
/* Downcasts from the generic sock/group types to the uring implementations. */
#define __uring_sock(sock) (struct spdk_uring_sock *)sock
#define __uring_group_impl(group) (struct spdk_uring_sock_group_impl *)group
141 
142 static int
143 uring_sock_getaddr(struct spdk_sock *_sock, char *saddr, int slen, uint16_t *sport,
144 		   char *caddr, int clen, uint16_t *cport)
145 {
146 	struct spdk_uring_sock *sock = __uring_sock(_sock);
147 	struct sockaddr_storage sa;
148 	socklen_t salen;
149 	int rc;
150 
151 	assert(sock != NULL);
152 
153 	memset(&sa, 0, sizeof sa);
154 	salen = sizeof sa;
155 	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
156 	if (rc != 0) {
157 		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
158 		return -1;
159 	}
160 
161 	switch (sa.ss_family) {
162 	case AF_UNIX:
163 		/* Acceptable connection types that don't have IPs */
164 		return 0;
165 	case AF_INET:
166 	case AF_INET6:
167 		/* Code below will get IP addresses */
168 		break;
169 	default:
170 		/* Unsupported socket family */
171 		return -1;
172 	}
173 
174 	rc = get_addr_str((struct sockaddr *)&sa, saddr, slen);
175 	if (rc != 0) {
176 		SPDK_ERRLOG("getnameinfo() failed (errno=%d)\n", errno);
177 		return -1;
178 	}
179 
180 	if (sport) {
181 		if (sa.ss_family == AF_INET) {
182 			*sport = ntohs(((struct sockaddr_in *) &sa)->sin_port);
183 		} else if (sa.ss_family == AF_INET6) {
184 			*sport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port);
185 		}
186 	}
187 
188 	memset(&sa, 0, sizeof sa);
189 	salen = sizeof sa;
190 	rc = getpeername(sock->fd, (struct sockaddr *) &sa, &salen);
191 	if (rc != 0) {
192 		SPDK_ERRLOG("getpeername() failed (errno=%d)\n", errno);
193 		return -1;
194 	}
195 
196 	rc = get_addr_str((struct sockaddr *)&sa, caddr, clen);
197 	if (rc != 0) {
198 		SPDK_ERRLOG("getnameinfo() failed (errno=%d)\n", errno);
199 		return -1;
200 	}
201 
202 	if (cport) {
203 		if (sa.ss_family == AF_INET) {
204 			*cport = ntohs(((struct sockaddr_in *) &sa)->sin_port);
205 		} else if (sa.ss_family == AF_INET6) {
206 			*cport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port);
207 		}
208 	}
209 
210 	return 0;
211 }
212 
/* Whether uring_sock_create() should produce a listening or connecting socket. */
enum uring_sock_create_type {
	SPDK_SOCK_CREATE_LISTEN,
	SPDK_SOCK_CREATE_CONNECT,
};
217 
/*
 * Resize (or create/destroy) the socket's receive pipe to sz bytes,
 * migrating any data still buffered in the old pipe into the new one.
 * sz == 0 tears the pipe down entirely. Returns 0 on success, -1 if sz
 * is below the minimum, -ENOMEM/-EINVAL on allocation/migration failure.
 */
static int
uring_sock_alloc_pipe(struct spdk_uring_sock *sock, int sz)
{
	uint8_t *new_buf;
	struct spdk_pipe *new_pipe;
	struct iovec siov[2];
	struct iovec diov[2];
	int sbytes;
	ssize_t bytes;

	/* Already at the requested size: nothing to do. */
	if (sock->recv_buf_sz == sz) {
		return 0;
	}

	/* If the new size is 0, just free the pipe */
	if (sz == 0) {
		spdk_pipe_destroy(sock->recv_pipe);
		free(sock->recv_buf);
		sock->recv_pipe = NULL;
		sock->recv_buf = NULL;
		return 0;
	} else if (sz < MIN_SOCK_PIPE_SIZE) {
		SPDK_ERRLOG("The size of the pipe must be larger than %d\n", MIN_SOCK_PIPE_SIZE);
		return -1;
	}

	/* Round up to next 64 byte multiple */
	/* sz + 1 because a circular pipe of capacity N needs N+1 bytes of storage. */
	new_buf = calloc(SPDK_ALIGN_CEIL(sz + 1, 64), sizeof(uint8_t));
	if (!new_buf) {
		SPDK_ERRLOG("socket recv buf allocation failed\n");
		return -ENOMEM;
	}

	new_pipe = spdk_pipe_create(new_buf, sz + 1);
	if (new_pipe == NULL) {
		SPDK_ERRLOG("socket pipe allocation failed\n");
		free(new_buf);
		return -ENOMEM;
	}

	if (sock->recv_pipe != NULL) {
		/* Pull all of the data out of the old pipe */
		sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov);
		if (sbytes > sz) {
			/* Too much data to fit into the new pipe size */
			spdk_pipe_destroy(new_pipe);
			free(new_buf);
			return -EINVAL;
		}

		/* The new pipe is empty, so the full sz must be writable. */
		sbytes = spdk_pipe_writer_get_buffer(new_pipe, sz, diov);
		assert(sbytes == sz);

		/* Copy the buffered data across, then release the old pipe. */
		bytes = spdk_iovcpy(siov, 2, diov, 2);
		spdk_pipe_writer_advance(new_pipe, bytes);

		spdk_pipe_destroy(sock->recv_pipe);
		free(sock->recv_buf);
	}

	sock->recv_buf_sz = sz;
	sock->recv_buf = new_buf;
	sock->recv_pipe = new_pipe;

	return 0;
}
284 
285 static int
286 uring_sock_set_recvbuf(struct spdk_sock *_sock, int sz)
287 {
288 	struct spdk_uring_sock *sock = __uring_sock(_sock);
289 	int rc;
290 
291 	assert(sock != NULL);
292 
293 	if (g_spdk_uring_sock_impl_opts.enable_recv_pipe) {
294 		rc = uring_sock_alloc_pipe(sock, sz);
295 		if (rc) {
296 			SPDK_ERRLOG("unable to allocate sufficient recvbuf with sz=%d on sock=%p\n", sz, _sock);
297 			return rc;
298 		}
299 	}
300 
301 	if (sz < MIN_SO_RCVBUF_SIZE) {
302 		sz = MIN_SO_RCVBUF_SIZE;
303 	}
304 
305 	rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVBUF, &sz, sizeof(sz));
306 	if (rc < 0) {
307 		return rc;
308 	}
309 
310 	return 0;
311 }
312 
313 static int
314 uring_sock_set_sendbuf(struct spdk_sock *_sock, int sz)
315 {
316 	struct spdk_uring_sock *sock = __uring_sock(_sock);
317 	int rc;
318 
319 	assert(sock != NULL);
320 
321 	if (sz < MIN_SO_SNDBUF_SIZE) {
322 		sz = MIN_SO_SNDBUF_SIZE;
323 	}
324 
325 	rc = setsockopt(sock->fd, SOL_SOCKET, SO_SNDBUF, &sz, sizeof(sz));
326 	if (rc < 0) {
327 		return rc;
328 	}
329 
330 	return 0;
331 }
332 
333 static struct spdk_uring_sock *
334 uring_sock_alloc(int fd)
335 {
336 	struct spdk_uring_sock *sock;
337 #if defined(__linux__)
338 	int flag;
339 	int rc;
340 #endif
341 
342 	sock = calloc(1, sizeof(*sock));
343 	if (sock == NULL) {
344 		SPDK_ERRLOG("sock allocation failed\n");
345 		return NULL;
346 	}
347 
348 	sock->fd = fd;
349 
350 #if defined(__linux__)
351 	flag = 1;
352 
353 	if (g_spdk_uring_sock_impl_opts.enable_quickack) {
354 		rc = setsockopt(sock->fd, IPPROTO_TCP, TCP_QUICKACK, &flag, sizeof(flag));
355 		if (rc != 0) {
356 			SPDK_ERRLOG("quickack was failed to set\n");
357 		}
358 	}
359 #endif
360 	return sock;
361 }
362 
/*
 * Create a TCP socket on ip:port and either bind+listen
 * (SPDK_SOCK_CREATE_LISTEN) or connect (SPDK_SOCK_CREATE_CONNECT).
 * "[IPv6]" bracketed literals are accepted. Iterates over all address
 * families returned by getaddrinfo() until one succeeds. The resulting
 * socket is set non-blocking. Returns NULL on failure.
 */
static struct spdk_sock *
uring_sock_create(const char *ip, int port,
		  enum uring_sock_create_type type,
		  struct spdk_sock_opts *opts)
{
	struct spdk_uring_sock *sock;
	char buf[MAX_TMPBUF];
	char portnum[PORTNUMLEN];
	char *p;
	struct addrinfo hints, *res, *res0;
	int fd, flag;
	int val = 1;
	int rc;

	if (ip == NULL) {
		return NULL;
	}
	if (ip[0] == '[') {
		/* Strip the brackets from an "[IPv6]" literal. */
		snprintf(buf, sizeof(buf), "%s", ip + 1);
		p = strchr(buf, ']');
		if (p != NULL) {
			*p = '\0';
		}
		ip = (const char *) &buf[0];
	}

	snprintf(portnum, sizeof portnum, "%d", port);
	memset(&hints, 0, sizeof hints);
	hints.ai_family = PF_UNSPEC;
	hints.ai_socktype = SOCK_STREAM;
	hints.ai_flags = AI_NUMERICSERV;
	hints.ai_flags |= AI_PASSIVE;
	hints.ai_flags |= AI_NUMERICHOST;
	rc = getaddrinfo(ip, portnum, &hints, &res0);
	if (rc != 0) {
		/* Bug fix: getaddrinfo() reports errors via its return value,
		 * not errno -- the old log printed an unrelated errno. */
		SPDK_ERRLOG("getaddrinfo() failed (rc=%d): %s\n", rc, gai_strerror(rc));
		return NULL;
	}

	/* try listen */
	fd = -1;
	for (res = res0; res != NULL; res = res->ai_next) {
retry:
		fd = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
		if (fd < 0) {
			/* error */
			continue;
		}

		val = g_spdk_uring_sock_impl_opts.recv_buf_size;
		rc = setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &val, sizeof val);
		if (rc) {
			/* Not fatal */
		}

		val = g_spdk_uring_sock_impl_opts.send_buf_size;
		rc = setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &val, sizeof val);
		if (rc) {
			/* Not fatal */
		}

		/* Bug fix: val was left holding send_buf_size here, but the
		 * boolean options below (SO_REUSEADDR, TCP_NODELAY,
		 * IPV6_V6ONLY) expect the value 1. */
		val = 1;

		rc = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof val);
		if (rc != 0) {
			close(fd);
			/* error */
			continue;
		}
		rc = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof val);
		if (rc != 0) {
			close(fd);
			/* error */
			continue;
		}

#if defined(SO_PRIORITY)
		if (opts != NULL && opts->priority) {
			rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &opts->priority, sizeof val);
			if (rc != 0) {
				close(fd);
				/* error */
				continue;
			}
		}
#endif
		if (res->ai_family == AF_INET6) {
			rc = setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &val, sizeof val);
			if (rc != 0) {
				close(fd);
				/* error */
				continue;
			}
		}

		if (type == SPDK_SOCK_CREATE_LISTEN) {
			rc = bind(fd, res->ai_addr, res->ai_addrlen);
			if (rc != 0) {
				SPDK_ERRLOG("bind() failed at port %d, errno = %d\n", port, errno);
				switch (errno) {
				case EINTR:
					/* interrupted? */
					close(fd);
					goto retry;
				case EADDRNOTAVAIL:
					SPDK_ERRLOG("IP address %s not available. "
						    "Verify IP address in config file "
						    "and make sure setup script is "
						    "run before starting spdk app.\n", ip);
				/* FALLTHROUGH */
				default:
					/* try next family */
					close(fd);
					fd = -1;
					continue;
				}
			}
			/* bind OK */
			rc = listen(fd, 512);
			if (rc != 0) {
				SPDK_ERRLOG("listen() failed, errno = %d\n", errno);
				close(fd);
				fd = -1;
				break;
			}
		} else if (type == SPDK_SOCK_CREATE_CONNECT) {
			rc = connect(fd, res->ai_addr, res->ai_addrlen);
			if (rc != 0) {
				SPDK_ERRLOG("connect() failed, errno = %d\n", errno);
				/* try next family */
				close(fd);
				fd = -1;
				continue;
			}
		}

		/* All further I/O on this socket is non-blocking. */
		flag = fcntl(fd, F_GETFL);
		if (fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0) {
			SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno);
			close(fd);
			fd = -1;
			break;
		}
		break;
	}
	freeaddrinfo(res0);

	if (fd < 0) {
		return NULL;
	}

	sock = uring_sock_alloc(fd);
	if (sock == NULL) {
		SPDK_ERRLOG("sock allocation failed\n");
		close(fd);
		return NULL;
	}

	return &sock->base;
}
521 
/* Convenience wrapper: create a listening socket on ip:port. */
static struct spdk_sock *
uring_sock_listen(const char *ip, int port, struct spdk_sock_opts *opts)
{
	return uring_sock_create(ip, port, SPDK_SOCK_CREATE_LISTEN, opts);
}
527 
/* Convenience wrapper: create a socket connected to ip:port. */
static struct spdk_sock *
uring_sock_connect(const char *ip, int port, struct spdk_sock_opts *opts)
{
	return uring_sock_create(ip, port, SPDK_SOCK_CREATE_CONNECT, opts);
}
533 
/*
 * Accept one pending connection on a listening socket. The new fd is
 * made non-blocking and inherits the listener's SO_PRIORITY (which the
 * kernel does not propagate on accept). Returns NULL if no connection
 * is pending or on error.
 */
static struct spdk_sock *
uring_sock_accept(struct spdk_sock *_sock)
{
	struct spdk_uring_sock		*sock = __uring_sock(_sock);
	struct sockaddr_storage		sa;
	socklen_t			salen;
	int				rc, fd;
	struct spdk_uring_sock		*new_sock;
	int				flag;

	memset(&sa, 0, sizeof(sa));
	salen = sizeof(sa);

	assert(sock != NULL);

	rc = accept(sock->fd, (struct sockaddr *)&sa, &salen);

	if (rc == -1) {
		return NULL;
	}

	fd = rc;

	/* Only toggle O_NONBLOCK if it is not already set. */
	flag = fcntl(fd, F_GETFL);
	if ((!(flag & O_NONBLOCK)) && (fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0)) {
		SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno);
		close(fd);
		return NULL;
	}

#if defined(SO_PRIORITY)
	/* The priority is not inherited, so call this function again */
	if (sock->base.opts.priority) {
		rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &sock->base.opts.priority, sizeof(int));
		if (rc != 0) {
			close(fd);
			return NULL;
		}
	}
#endif

	new_sock = uring_sock_alloc(fd);
	if (new_sock == NULL) {
		close(fd);
		return NULL;
	}

	return &new_sock->base;
}
583 
584 static int
585 uring_sock_close(struct spdk_sock *_sock)
586 {
587 	struct spdk_uring_sock *sock = __uring_sock(_sock);
588 	int rc;
589 
590 	assert(TAILQ_EMPTY(&_sock->pending_reqs));
591 	assert(sock->group == NULL);
592 
593 	spdk_pipe_destroy(sock->recv_pipe);
594 	free(sock->recv_buf);
595 	rc = close(sock->fd);
596 	if (rc == 0) {
597 		free(sock);
598 	}
599 
600 	return rc;
601 }
602 
/*
 * Copy buffered data from the socket's receive pipe into the caller's
 * iovecs. Returns the number of bytes copied, or -1 with errno set to
 * EAGAIN (pipe empty) or EINVAL (bad pipe state / zero-length diov).
 * When the pipe is fully drained, the socket is taken off the group's
 * level-triggered pending_recv list.
 */
static ssize_t
uring_sock_recv_from_pipe(struct spdk_uring_sock *sock, struct iovec *diov, int diovcnt)
{
	struct iovec siov[2];
	int sbytes;
	ssize_t bytes;
	struct spdk_uring_sock_group_impl *group;

	sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov);
	if (sbytes < 0) {
		errno = EINVAL;
		return -1;
	} else if (sbytes == 0) {
		errno = EAGAIN;
		return -1;
	}

	bytes = spdk_iovcpy(siov, 2, diov, diovcnt);

	if (bytes == 0) {
		/* The only way this happens is if diov is 0 length */
		errno = EINVAL;
		return -1;
	}

	spdk_pipe_reader_advance(sock->recv_pipe, bytes);

	/* If we drained the pipe, take it off the level-triggered list */
	if (sock->base.group_impl && spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) {
		group = __uring_group_impl(sock->base.group_impl);
		TAILQ_REMOVE(&group->pending_recv, sock, link);
		sock->pending_recv = false;
	}

	return bytes;
}
639 
640 static inline ssize_t
641 uring_sock_read(struct spdk_uring_sock *sock)
642 {
643 	struct iovec iov[2];
644 	int bytes;
645 	struct spdk_uring_sock_group_impl *group;
646 
647 	bytes = spdk_pipe_writer_get_buffer(sock->recv_pipe, sock->recv_buf_sz, iov);
648 
649 	if (bytes > 0) {
650 		bytes = readv(sock->fd, iov, 2);
651 		if (bytes > 0) {
652 			spdk_pipe_writer_advance(sock->recv_pipe, bytes);
653 			if (sock->base.group_impl) {
654 				group = __uring_group_impl(sock->base.group_impl);
655 				TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
656 				sock->pending_recv = true;
657 			}
658 		}
659 	}
660 
661 	return bytes;
662 }
663 
664 static ssize_t
665 uring_sock_readv(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
666 {
667 	struct spdk_uring_sock *sock = __uring_sock(_sock);
668 	int rc, i;
669 	size_t len;
670 
671 	if (sock->recv_pipe == NULL) {
672 		return readv(sock->fd, iov, iovcnt);
673 	}
674 
675 	len = 0;
676 	for (i = 0; i < iovcnt; i++) {
677 		len += iov[i].iov_len;
678 	}
679 
680 	if (spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) {
681 		/* If the user is receiving a sufficiently large amount of data,
682 		 * receive directly to their buffers. */
683 		if (len >= MIN_SOCK_PIPE_SIZE) {
684 			return readv(sock->fd, iov, iovcnt);
685 		}
686 
687 		/* Otherwise, do a big read into our pipe */
688 		rc = uring_sock_read(sock);
689 		if (rc <= 0) {
690 			return rc;
691 		}
692 	}
693 
694 	return uring_sock_recv_from_pipe(sock, iov, iovcnt);
695 }
696 
/* Single-buffer read: delegate to the vectored path with one iovec. */
static ssize_t
uring_sock_recv(struct spdk_sock *sock, void *buf, size_t len)
{
	struct iovec iov = { .iov_base = buf, .iov_len = len };

	return uring_sock_readv(sock, &iov, 1);
}
707 
/*
 * Synchronous scatter-write. Fails with EAGAIN while an async uring write
 * task is in flight so the two paths cannot interleave data on the wire.
 */
static ssize_t
uring_sock_writev(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);

	if (sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
		errno = EAGAIN;
		return -1;
	}

	return writev(sock->fd, iov, iovcnt);
}
720 
/*
 * Account rc bytes of completed transmission against the socket's queued
 * requests, completing (pending + put) every request that was fully sent
 * and recording a partial offset in the first request that was not.
 * Returns 0, or the first non-zero value from spdk_sock_request_put().
 */
static int
sock_complete_reqs(struct spdk_sock *_sock, ssize_t rc)
{
	struct spdk_sock_request *req;
	int i, retval;
	unsigned int offset;
	size_t len;

	/* Consume the requests that were actually written */
	req = TAILQ_FIRST(&_sock->queued_reqs);
	while (req) {
		/* internal.offset tracks how much of this request was sent previously. */
		offset = req->internal.offset;

		for (i = 0; i < req->iovcnt; i++) {
			/* Advance by the offset first */
			if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) {
				offset -= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len;
				continue;
			}

			/* Calculate the remaining length of this element */
			len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset;

			if (len > (size_t)rc) {
				/* This element was partially sent. */
				req->internal.offset += rc;
				return 0;
			}

			offset = 0;
			req->internal.offset += len;
			rc -= len;
		}

		/* Handled a full request. */
		spdk_sock_request_pend(_sock, req);

		retval = spdk_sock_request_put(_sock, req, 0);
		if (retval) {
			return retval;
		}

		if (rc == 0) {
			break;
		}

		req = TAILQ_FIRST(&_sock->queued_reqs);
	}

	return 0;
}
772 
773 static void
774 _sock_flush(struct spdk_sock *_sock)
775 {
776 	struct spdk_uring_sock *sock = __uring_sock(_sock);
777 	struct spdk_uring_task *task = &sock->write_task;
778 	uint32_t iovcnt;
779 	struct io_uring_sqe *sqe;
780 
781 	if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS) {
782 		return;
783 	}
784 
785 	iovcnt = spdk_sock_prep_reqs(&sock->base, task->iovs, task->iov_cnt, &task->last_req);
786 	if (!iovcnt) {
787 		return;
788 	}
789 
790 	task->iov_cnt = iovcnt;
791 	assert(sock->group != NULL);
792 	task->msg.msg_iov = task->iovs;
793 	task->msg.msg_iovlen = task->iov_cnt;
794 
795 	sock->group->io_queued++;
796 
797 	sqe = io_uring_get_sqe(&sock->group->uring);
798 	io_uring_prep_sendmsg(sqe, sock->fd, &sock->write_task.msg, 0);
799 	io_uring_sqe_set_data(sqe, task);
800 	task->status = SPDK_URING_SOCK_TASK_IN_PROCESS;
801 }
802 
803 static void
804 _sock_prep_pollin(struct spdk_sock *_sock)
805 {
806 	struct spdk_uring_sock *sock = __uring_sock(_sock);
807 	struct spdk_uring_task *task = &sock->pollin_task;
808 	struct io_uring_sqe *sqe;
809 
810 	/* Do not prepare pollin event */
811 	if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS || sock->pending_recv) {
812 		return;
813 	}
814 
815 	assert(sock->group != NULL);
816 	sock->group->io_queued++;
817 
818 	sqe = io_uring_get_sqe(&sock->group->uring);
819 	io_uring_prep_poll_add(sqe, sock->fd, POLLIN);
820 	io_uring_sqe_set_data(sqe, task);
821 	task->status = SPDK_URING_SOCK_TASK_IN_PROCESS;
822 }
823 
824 static void
825 _sock_prep_cancel_task(struct spdk_sock *_sock, void *user_data)
826 {
827 	struct spdk_uring_sock *sock = __uring_sock(_sock);
828 	struct spdk_uring_task *task = &sock->cancel_task;
829 	struct io_uring_sqe *sqe;
830 
831 	if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS) {
832 		return;
833 	}
834 
835 	assert(sock->group != NULL);
836 	sock->group->io_queued++;
837 
838 	sqe = io_uring_get_sqe(&sock->group->uring);
839 	io_uring_prep_cancel(sqe, user_data, 0);
840 	io_uring_sqe_set_data(sqe, task);
841 	task->status = SPDK_URING_SOCK_TASK_IN_PROCESS;
842 }
843 
844 static int
845 sock_uring_group_reap(struct spdk_uring_sock_group_impl *group, int max, int max_read_events,
846 		      struct spdk_sock **socks)
847 {
848 	int i, count, ret;
849 	struct io_uring_cqe *cqe;
850 	struct spdk_uring_sock *sock, *tmp;
851 	struct spdk_uring_task *task;
852 	int status;
853 
854 	for (i = 0; i < max; i++) {
855 		ret = io_uring_peek_cqe(&group->uring, &cqe);
856 		if (ret != 0) {
857 			break;
858 		}
859 
860 		if (cqe == NULL) {
861 			break;
862 		}
863 
864 		task = (struct spdk_uring_task *)cqe->user_data;
865 		assert(task != NULL);
866 		sock = task->sock;
867 		assert(sock != NULL);
868 		assert(sock->group != NULL);
869 		assert(sock->group == group);
870 		sock->group->io_inflight--;
871 		sock->group->io_avail++;
872 		status = cqe->res;
873 		io_uring_cqe_seen(&group->uring, cqe);
874 
875 		task->status = SPDK_URING_SOCK_TASK_NOT_IN_USE;
876 
877 		if (spdk_unlikely(status <= 0)) {
878 			if (status == -EAGAIN || status == -EWOULDBLOCK) {
879 				continue;
880 			}
881 		}
882 
883 		switch (task->type) {
884 		case SPDK_SOCK_TASK_POLLIN:
885 			if ((status & POLLIN) == POLLIN) {
886 				if (sock->base.cb_fn != NULL) {
887 					assert(sock->pending_recv == false);
888 					sock->pending_recv = true;
889 					TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
890 				}
891 			}
892 			break;
893 		case SPDK_SOCK_TASK_WRITE:
894 			assert(TAILQ_EMPTY(&sock->base.pending_reqs));
895 			task->last_req = NULL;
896 			task->iov_cnt = 0;
897 			if (spdk_unlikely(status) < 0) {
898 				sock->connection_status = status;
899 				spdk_sock_abort_requests(&sock->base);
900 			} else {
901 				sock_complete_reqs(&sock->base, status);
902 			}
903 
904 			break;
905 		case SPDK_SOCK_TASK_CANCEL:
906 			/* Do nothing */
907 			break;
908 		default:
909 			SPDK_UNREACHABLE();
910 		}
911 	}
912 
913 	if (!socks) {
914 		return 0;
915 	}
916 	count = 0;
917 	TAILQ_FOREACH_SAFE(sock, &group->pending_recv, link, tmp) {
918 		if (count == max_read_events) {
919 			break;
920 		}
921 
922 		socks[count++] = &sock->base;
923 	}
924 
925 	/* Cycle the pending_recv list so that each time we poll things aren't
926 	 * in the same order. */
927 	for (i = 0; i < count; i++) {
928 		sock = __uring_sock(socks[i]);
929 
930 		TAILQ_REMOVE(&group->pending_recv, sock, link);
931 
932 		if (sock->recv_pipe == NULL || spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) {
933 			sock->pending_recv = false;
934 		} else {
935 			TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
936 		}
937 	}
938 
939 	return count;
940 }
941 
/*
 * Synchronous flush path for sockets that are not in a poll group:
 * gather queued requests into an iovec batch and sendmsg() them.
 * Returns 0 on success or when the socket would block, a negative
 * value on a real send failure. No-op while inside a completion
 * callback to avoid recursion.
 */
static int
_sock_flush_client(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct msghdr msg = {};
	struct iovec iovs[IOV_BATCH_SIZE];
	int iovcnt;
	ssize_t rc;

	/* Can't flush from within a callback or we end up with recursive calls */
	if (_sock->cb_cnt > 0) {
		return 0;
	}

	/* Gather an iov */
	iovcnt = spdk_sock_prep_reqs(_sock, iovs, 0, NULL);
	if (iovcnt == 0) {
		return 0;
	}

	/* Perform the vectored write */
	msg.msg_iov = iovs;
	msg.msg_iovlen = iovcnt;
	rc = sendmsg(sock->fd, &msg, 0);
	if (rc <= 0) {
		if (errno == EAGAIN || errno == EWOULDBLOCK) {
			return 0;
		}
		return rc;
	}

	sock_complete_reqs(_sock, rc);

	return 0;
}
977 
978 static void
979 uring_sock_writev_async(struct spdk_sock *_sock, struct spdk_sock_request *req)
980 {
981 	struct spdk_uring_sock *sock = __uring_sock(_sock);
982 	int rc;
983 
984 	if (spdk_unlikely(sock->connection_status)) {
985 		req->cb_fn(req->cb_arg, sock->connection_status);
986 		return;
987 	}
988 
989 	spdk_sock_request_queue(_sock, req);
990 
991 	if (!sock->group) {
992 		if (_sock->queued_iovcnt >= IOV_BATCH_SIZE) {
993 			rc = _sock_flush_client(_sock);
994 			if (rc) {
995 				spdk_sock_abort_requests(_sock);
996 			}
997 		}
998 	}
999 }
1000 
1001 static int
1002 uring_sock_set_recvlowat(struct spdk_sock *_sock, int nbytes)
1003 {
1004 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1005 	int val;
1006 	int rc;
1007 
1008 	assert(sock != NULL);
1009 
1010 	val = nbytes;
1011 	rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVLOWAT, &val, sizeof val);
1012 	if (rc != 0) {
1013 		return -1;
1014 	}
1015 	return 0;
1016 }
1017 
1018 static bool
1019 uring_sock_is_ipv6(struct spdk_sock *_sock)
1020 {
1021 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1022 	struct sockaddr_storage sa;
1023 	socklen_t salen;
1024 	int rc;
1025 
1026 	assert(sock != NULL);
1027 
1028 	memset(&sa, 0, sizeof sa);
1029 	salen = sizeof sa;
1030 	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
1031 	if (rc != 0) {
1032 		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
1033 		return false;
1034 	}
1035 
1036 	return (sa.ss_family == AF_INET6);
1037 }
1038 
1039 static bool
1040 uring_sock_is_ipv4(struct spdk_sock *_sock)
1041 {
1042 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1043 	struct sockaddr_storage sa;
1044 	socklen_t salen;
1045 	int rc;
1046 
1047 	assert(sock != NULL);
1048 
1049 	memset(&sa, 0, sizeof sa);
1050 	salen = sizeof sa;
1051 	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
1052 	if (rc != 0) {
1053 		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
1054 		return false;
1055 	}
1056 
1057 	return (sa.ss_family == AF_INET);
1058 }
1059 
1060 static bool
1061 uring_sock_is_connected(struct spdk_sock *_sock)
1062 {
1063 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1064 	uint8_t byte;
1065 	int rc;
1066 
1067 	rc = recv(sock->fd, &byte, 1, MSG_PEEK);
1068 	if (rc == 0) {
1069 		return false;
1070 	}
1071 
1072 	if (rc < 0) {
1073 		if (errno == EAGAIN || errno == EWOULDBLOCK) {
1074 			return true;
1075 		}
1076 
1077 		return false;
1078 	}
1079 
1080 	return true;
1081 }
1082 
/*
 * Report the socket's NAPI ID as its placement id, when the option is
 * enabled and the platform supports SO_INCOMING_NAPI_ID. Returns 0 on
 * success, -1 when disabled, unsupported, or on getsockopt() failure.
 */
static int
uring_sock_get_placement_id(struct spdk_sock *_sock, int *placement_id)
{
	int rc = -1;

	if (!g_spdk_uring_sock_impl_opts.enable_placement_id) {
		return rc;
	}

#if defined(SO_INCOMING_NAPI_ID)
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	socklen_t salen = sizeof(int);

	rc = getsockopt(sock->fd, SOL_SOCKET, SO_INCOMING_NAPI_ID, placement_id, &salen);
	if (rc != 0) {
		SPDK_ERRLOG("getsockopt() failed (errno=%d)\n", errno);
	}

#endif
	return rc;
}
1104 
1105 static struct spdk_sock_group_impl *
1106 uring_sock_group_impl_create(void)
1107 {
1108 	struct spdk_uring_sock_group_impl *group_impl;
1109 
1110 	group_impl = calloc(1, sizeof(*group_impl));
1111 	if (group_impl == NULL) {
1112 		SPDK_ERRLOG("group_impl allocation failed\n");
1113 		return NULL;
1114 	}
1115 
1116 	group_impl->io_avail = SPDK_SOCK_GROUP_QUEUE_DEPTH;
1117 
1118 	if (io_uring_queue_init(SPDK_SOCK_GROUP_QUEUE_DEPTH, &group_impl->uring, 0) < 0) {
1119 		SPDK_ERRLOG("uring I/O context setup failure\n");
1120 		free(group_impl);
1121 		return NULL;
1122 	}
1123 
1124 	TAILQ_INIT(&group_impl->pending_recv);
1125 
1126 	return &group_impl->base;
1127 }
1128 
/*
 * Register a socket with a uring poll group: wire up the per-socket task
 * back-pointers/types and, when the socket arrives with buffered pipe
 * data (e.g. after being moved from another group), put it straight onto
 * the pending_recv list so the data is delivered. Always returns 0.
 */
static int
uring_sock_group_impl_add_sock(struct spdk_sock_group_impl *_group,
			       struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group);

	sock->group = group;
	sock->write_task.sock = sock;
	sock->write_task.type = SPDK_SOCK_TASK_WRITE;

	sock->pollin_task.sock = sock;
	sock->pollin_task.type = SPDK_SOCK_TASK_POLLIN;

	sock->cancel_task.sock = sock;
	sock->cancel_task.type = SPDK_SOCK_TASK_CANCEL;

	/* switched from another polling group due to scheduling */
	if (spdk_unlikely(sock->recv_pipe != NULL &&
			  (spdk_pipe_reader_bytes_available(sock->recv_pipe) > 0))) {
		assert(sock->pending_recv == false);
		sock->pending_recv = true;
		TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
	}

	return 0;
}
1156 
/*
 * Poll the group: flush writes and arm POLLIN for every healthy socket
 * (only when collecting events, i.e. socks != NULL), submit any queued
 * SQEs, then reap completions and readable sockets into socks.
 * Returns the number of sockets collected.
 */
static int
uring_sock_group_impl_poll(struct spdk_sock_group_impl *_group, int max_events,
			   struct spdk_sock **socks)
{
	struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group);
	int count, ret;
	int to_complete, to_submit;
	struct spdk_sock *_sock, *tmp;
	struct spdk_uring_sock *sock;

	if (spdk_likely(socks)) {
		TAILQ_FOREACH_SAFE(_sock, &group->base.socks, link, tmp) {
			sock = __uring_sock(_sock);
			/* Sockets with a recorded error are left alone. */
			if (spdk_unlikely(sock->connection_status)) {
				continue;
			}
			_sock_flush(_sock);
			_sock_prep_pollin(_sock);
		}
	}

	to_submit = group->io_queued;

	/* For network I/O, it cannot be set with O_DIRECT, so we do not need to call spdk_io_uring_enter */
	if (to_submit > 0) {
		/* If there are I/O to submit, use io_uring_submit here.
		 * It will automatically call io_uring_enter appropriately. */
		ret = io_uring_submit(&group->uring);
		if (ret < 0) {
			/* NOTE(review): returns 1 (not a negative errno) on submit
			 * failure -- confirm callers treat the return purely as an
			 * event count. */
			return 1;
		}
		group->io_queued = 0;
		group->io_inflight += to_submit;
		group->io_avail -= to_submit;
	}

	count = 0;
	to_complete = group->io_inflight;
	if (to_complete > 0 || !TAILQ_EMPTY(&group->pending_recv)) {
		count = sock_uring_group_reap(group, to_complete, max_events, socks);
	}

	return count;
}
1201 
1202 static int
1203 uring_sock_group_impl_remove_sock(struct spdk_sock_group_impl *_group,
1204 				  struct spdk_sock *_sock)
1205 {
1206 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1207 	struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group);
1208 
1209 	if (sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
1210 		_sock_prep_cancel_task(_sock, &sock->write_task);
1211 		/* Since spdk_sock_group_remove_sock is not asynchronous interface, so
1212 		 * currently can use a while loop here. */
1213 		while ((sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) ||
1214 		       (sock->cancel_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE)) {
1215 			uring_sock_group_impl_poll(_group, 32, NULL);
1216 		}
1217 	}
1218 
1219 	if (sock->pollin_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
1220 		_sock_prep_cancel_task(_sock, &sock->pollin_task);
1221 		/* Since spdk_sock_group_remove_sock is not asynchronous interface, so
1222 		 * currently can use a while loop here. */
1223 		while ((sock->pollin_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) ||
1224 		       (sock->cancel_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE)) {
1225 			uring_sock_group_impl_poll(_group, 32, NULL);
1226 		}
1227 	}
1228 
1229 	if (sock->pending_recv) {
1230 		TAILQ_REMOVE(&group->pending_recv, sock, link);
1231 		sock->pending_recv = false;
1232 	}
1233 	assert(sock->pending_recv == false);
1234 
1235 	sock->group = NULL;
1236 	return 0;
1237 }
1238 
1239 static int
1240 uring_sock_group_impl_close(struct spdk_sock_group_impl *_group)
1241 {
1242 	struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group);
1243 
1244 	/* try to reap all the active I/O */
1245 	while (group->io_inflight) {
1246 		uring_sock_group_impl_poll(_group, 32, NULL);
1247 	}
1248 	assert(group->io_inflight == 0);
1249 	assert(group->io_avail == SPDK_SOCK_GROUP_QUEUE_DEPTH);
1250 
1251 	io_uring_queue_exit(&group->uring);
1252 
1253 	free(group);
1254 	return 0;
1255 }
1256 
1257 static int
1258 uring_sock_impl_get_opts(struct spdk_sock_impl_opts *opts, size_t *len)
1259 {
1260 	if (!opts || !len) {
1261 		errno = EINVAL;
1262 		return -1;
1263 	}
1264 	memset(opts, 0, *len);
1265 
1266 #define FIELD_OK(field) \
1267 	offsetof(struct spdk_sock_impl_opts, field) + sizeof(opts->field) <= *len
1268 
1269 #define GET_FIELD(field) \
1270 	if (FIELD_OK(field)) { \
1271 		opts->field = g_spdk_uring_sock_impl_opts.field; \
1272 	}
1273 
1274 	GET_FIELD(recv_buf_size);
1275 	GET_FIELD(send_buf_size);
1276 	GET_FIELD(enable_recv_pipe);
1277 	GET_FIELD(enable_quickack);
1278 	GET_FIELD(enable_placement_id);
1279 
1280 #undef GET_FIELD
1281 #undef FIELD_OK
1282 
1283 	*len = spdk_min(*len, sizeof(g_spdk_uring_sock_impl_opts));
1284 	return 0;
1285 }
1286 
1287 static int
1288 uring_sock_impl_set_opts(const struct spdk_sock_impl_opts *opts, size_t len)
1289 {
1290 	if (!opts) {
1291 		errno = EINVAL;
1292 		return -1;
1293 	}
1294 
1295 #define FIELD_OK(field) \
1296 	offsetof(struct spdk_sock_impl_opts, field) + sizeof(opts->field) <= len
1297 
1298 #define SET_FIELD(field) \
1299 	if (FIELD_OK(field)) { \
1300 		g_spdk_uring_sock_impl_opts.field = opts->field; \
1301 	}
1302 
1303 	SET_FIELD(recv_buf_size);
1304 	SET_FIELD(send_buf_size);
1305 	SET_FIELD(enable_recv_pipe);
1306 	SET_FIELD(enable_quickack);
1307 	SET_FIELD(enable_placement_id);
1308 
1309 #undef SET_FIELD
1310 #undef FIELD_OK
1311 
1312 	return 0;
1313 }
1314 
1315 static int
1316 uring_sock_flush(struct spdk_sock *_sock)
1317 {
1318 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1319 
1320 	if (!sock->group) {
1321 		return _sock_flush_client(_sock);
1322 	}
1323 
1324 	return 0;
1325 }
1326 
/* Function table exposing this io_uring-based implementation to the generic
 * spdk_sock layer. */
static struct spdk_net_impl g_uring_net_impl = {
	.name		= "uring",
	.getaddr	= uring_sock_getaddr,
	.connect	= uring_sock_connect,
	.listen		= uring_sock_listen,
	.accept		= uring_sock_accept,
	.close		= uring_sock_close,
	.recv		= uring_sock_recv,
	.readv		= uring_sock_readv,
	.writev		= uring_sock_writev,
	.writev_async	= uring_sock_writev_async,
	.flush          = uring_sock_flush,
	.set_recvlowat	= uring_sock_set_recvlowat,
	.set_recvbuf	= uring_sock_set_recvbuf,
	.set_sendbuf	= uring_sock_set_sendbuf,
	.is_ipv6	= uring_sock_is_ipv6,
	.is_ipv4	= uring_sock_is_ipv4,
	.is_connected   = uring_sock_is_connected,
	.get_placement_id	= uring_sock_get_placement_id,
	.group_impl_create	= uring_sock_group_impl_create,
	.group_impl_add_sock	= uring_sock_group_impl_add_sock,
	.group_impl_remove_sock = uring_sock_group_impl_remove_sock,
	.group_impl_poll	= uring_sock_group_impl_poll,
	.group_impl_close	= uring_sock_group_impl_close,
	.get_opts		= uring_sock_impl_get_opts,
	.set_opts		= uring_sock_impl_set_opts,
};

/* Register at DEFAULT_SOCK_PRIORITY + 1 so this implementation is preferred
 * over the default (posix) one when both are available. */
SPDK_NET_IMPL_REGISTER(uring, &g_uring_net_impl, DEFAULT_SOCK_PRIORITY + 1);
1356