xref: /spdk/module/sock/uring/uring.c (revision e1d06d9954b871531c9b376069d620d2c6cee854)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 #include "spdk/config.h"
36 
37 #include <sys/epoll.h>
38 #include <liburing.h>
39 
40 #include "spdk/barrier.h"
41 #include "spdk/env.h"
42 #include "spdk/log.h"
43 #include "spdk/pipe.h"
44 #include "spdk/sock.h"
45 #include "spdk/string.h"
46 #include "spdk/util.h"
47 
48 #include "spdk_internal/sock.h"
49 #include "spdk_internal/assert.h"
50 
51 #define MAX_TMPBUF 1024
52 #define PORTNUMLEN 32
53 #define SPDK_SOCK_GROUP_QUEUE_DEPTH 4096
54 
55 enum spdk_sock_task_type {
56 	SPDK_SOCK_TASK_POLLIN = 0,
57 	SPDK_SOCK_TASK_WRITE,
58 	SPDK_SOCK_TASK_CANCEL,
59 };
60 
61 enum spdk_uring_sock_task_status {
62 	SPDK_URING_SOCK_TASK_NOT_IN_USE = 0,
63 	SPDK_URING_SOCK_TASK_IN_PROCESS,
64 };
65 
66 struct spdk_uring_task {
67 	enum spdk_uring_sock_task_status	status;
68 	enum spdk_sock_task_type		type;
69 	struct spdk_uring_sock			*sock;
70 	struct msghdr				msg;
71 	struct iovec				iovs[IOV_BATCH_SIZE];
72 	int					iov_cnt;
73 	struct spdk_sock_request		*last_req;
74 	STAILQ_ENTRY(spdk_uring_task)		link;
75 };
76 
77 struct spdk_uring_sock {
78 	struct spdk_sock			base;
79 	int					fd;
80 	struct spdk_uring_sock_group_impl	*group;
81 	struct spdk_uring_task			write_task;
82 	struct spdk_uring_task			pollin_task;
83 	struct spdk_uring_task			cancel_task;
84 	struct spdk_pipe			*recv_pipe;
85 	void					*recv_buf;
86 	int					recv_buf_sz;
87 	bool					pending_recv;
88 	int					connection_status;
89 	int					placement_id;
90 	TAILQ_ENTRY(spdk_uring_sock)		link;
91 };
92 
93 struct spdk_uring_sock_group_impl {
94 	struct spdk_sock_group_impl		base;
95 	struct io_uring				uring;
96 	uint32_t				io_inflight;
97 	uint32_t				io_queued;
98 	uint32_t				io_avail;
99 	TAILQ_HEAD(, spdk_uring_sock)		pending_recv;
100 };
101 
102 static struct spdk_sock_impl_opts g_spdk_uring_sock_impl_opts = {
103 	.recv_buf_size = MIN_SO_RCVBUF_SIZE,
104 	.send_buf_size = MIN_SO_SNDBUF_SIZE,
105 	.enable_recv_pipe = true,
106 	.enable_quickack = false,
107 	.enable_placement_id = PLACEMENT_NONE,
108 };
109 
110 static struct spdk_sock_map g_map = {
111 	.entries = STAILQ_HEAD_INITIALIZER(g_map.entries),
112 	.mtx = PTHREAD_MUTEX_INITIALIZER
113 };
114 
115 __attribute((destructor)) static void
116 uring_sock_map_cleanup(void)
117 {
118 	spdk_sock_map_cleanup(&g_map);
119 }
120 
121 #define SPDK_URING_SOCK_REQUEST_IOV(req) ((struct iovec *)((uint8_t *)req + sizeof(struct spdk_sock_request)))
122 
123 static int
124 get_addr_str(struct sockaddr *sa, char *host, size_t hlen)
125 {
126 	const char *result = NULL;
127 
128 	if (sa == NULL || host == NULL) {
129 		return -1;
130 	}
131 
132 	switch (sa->sa_family) {
133 	case AF_INET:
134 		result = inet_ntop(AF_INET, &(((struct sockaddr_in *)sa)->sin_addr),
135 				   host, hlen);
136 		break;
137 	case AF_INET6:
138 		result = inet_ntop(AF_INET6, &(((struct sockaddr_in6 *)sa)->sin6_addr),
139 				   host, hlen);
140 		break;
141 	default:
142 		break;
143 	}
144 
145 	if (result != NULL) {
146 		return 0;
147 	} else {
148 		return -1;
149 	}
150 }
151 
152 #define __uring_sock(sock) (struct spdk_uring_sock *)sock
153 #define __uring_group_impl(group) (struct spdk_uring_sock_group_impl *)group
154 
155 static int
156 uring_sock_getaddr(struct spdk_sock *_sock, char *saddr, int slen, uint16_t *sport,
157 		   char *caddr, int clen, uint16_t *cport)
158 {
159 	struct spdk_uring_sock *sock = __uring_sock(_sock);
160 	struct sockaddr_storage sa;
161 	socklen_t salen;
162 	int rc;
163 
164 	assert(sock != NULL);
165 
166 	memset(&sa, 0, sizeof sa);
167 	salen = sizeof sa;
168 	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
169 	if (rc != 0) {
170 		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
171 		return -1;
172 	}
173 
174 	switch (sa.ss_family) {
175 	case AF_UNIX:
176 		/* Acceptable connection types that don't have IPs */
177 		return 0;
178 	case AF_INET:
179 	case AF_INET6:
180 		/* Code below will get IP addresses */
181 		break;
182 	default:
183 		/* Unsupported socket family */
184 		return -1;
185 	}
186 
187 	rc = get_addr_str((struct sockaddr *)&sa, saddr, slen);
188 	if (rc != 0) {
189 		SPDK_ERRLOG("getnameinfo() failed (errno=%d)\n", errno);
190 		return -1;
191 	}
192 
193 	if (sport) {
194 		if (sa.ss_family == AF_INET) {
195 			*sport = ntohs(((struct sockaddr_in *) &sa)->sin_port);
196 		} else if (sa.ss_family == AF_INET6) {
197 			*sport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port);
198 		}
199 	}
200 
201 	memset(&sa, 0, sizeof sa);
202 	salen = sizeof sa;
203 	rc = getpeername(sock->fd, (struct sockaddr *) &sa, &salen);
204 	if (rc != 0) {
205 		SPDK_ERRLOG("getpeername() failed (errno=%d)\n", errno);
206 		return -1;
207 	}
208 
209 	rc = get_addr_str((struct sockaddr *)&sa, caddr, clen);
210 	if (rc != 0) {
211 		SPDK_ERRLOG("getnameinfo() failed (errno=%d)\n", errno);
212 		return -1;
213 	}
214 
215 	if (cport) {
216 		if (sa.ss_family == AF_INET) {
217 			*cport = ntohs(((struct sockaddr_in *) &sa)->sin_port);
218 		} else if (sa.ss_family == AF_INET6) {
219 			*cport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port);
220 		}
221 	}
222 
223 	return 0;
224 }
225 
226 enum uring_sock_create_type {
227 	SPDK_SOCK_CREATE_LISTEN,
228 	SPDK_SOCK_CREATE_CONNECT,
229 };
230 
231 static int
232 uring_sock_alloc_pipe(struct spdk_uring_sock *sock, int sz)
233 {
234 	uint8_t *new_buf;
235 	struct spdk_pipe *new_pipe;
236 	struct iovec siov[2];
237 	struct iovec diov[2];
238 	int sbytes;
239 	ssize_t bytes;
240 
241 	if (sock->recv_buf_sz == sz) {
242 		return 0;
243 	}
244 
245 	/* If the new size is 0, just free the pipe */
246 	if (sz == 0) {
247 		spdk_pipe_destroy(sock->recv_pipe);
248 		free(sock->recv_buf);
249 		sock->recv_pipe = NULL;
250 		sock->recv_buf = NULL;
251 		return 0;
252 	} else if (sz < MIN_SOCK_PIPE_SIZE) {
253 		SPDK_ERRLOG("The size of the pipe must be larger than %d\n", MIN_SOCK_PIPE_SIZE);
254 		return -1;
255 	}
256 
257 	/* Round up to next 64 byte multiple */
258 	new_buf = calloc(SPDK_ALIGN_CEIL(sz + 1, 64), sizeof(uint8_t));
259 	if (!new_buf) {
260 		SPDK_ERRLOG("socket recv buf allocation failed\n");
261 		return -ENOMEM;
262 	}
263 
264 	new_pipe = spdk_pipe_create(new_buf, sz + 1);
265 	if (new_pipe == NULL) {
266 		SPDK_ERRLOG("socket pipe allocation failed\n");
267 		free(new_buf);
268 		return -ENOMEM;
269 	}
270 
271 	if (sock->recv_pipe != NULL) {
272 		/* Pull all of the data out of the old pipe */
273 		sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov);
274 		if (sbytes > sz) {
275 			/* Too much data to fit into the new pipe size */
276 			spdk_pipe_destroy(new_pipe);
277 			free(new_buf);
278 			return -EINVAL;
279 		}
280 
281 		sbytes = spdk_pipe_writer_get_buffer(new_pipe, sz, diov);
282 		assert(sbytes == sz);
283 
284 		bytes = spdk_iovcpy(siov, 2, diov, 2);
285 		spdk_pipe_writer_advance(new_pipe, bytes);
286 
287 		spdk_pipe_destroy(sock->recv_pipe);
288 		free(sock->recv_buf);
289 	}
290 
291 	sock->recv_buf_sz = sz;
292 	sock->recv_buf = new_buf;
293 	sock->recv_pipe = new_pipe;
294 
295 	return 0;
296 }
297 
298 static int
299 uring_sock_set_recvbuf(struct spdk_sock *_sock, int sz)
300 {
301 	struct spdk_uring_sock *sock = __uring_sock(_sock);
302 	int rc;
303 
304 	assert(sock != NULL);
305 
306 	if (g_spdk_uring_sock_impl_opts.enable_recv_pipe) {
307 		rc = uring_sock_alloc_pipe(sock, sz);
308 		if (rc) {
309 			SPDK_ERRLOG("unable to allocate sufficient recvbuf with sz=%d on sock=%p\n", sz, _sock);
310 			return rc;
311 		}
312 	}
313 
314 	if (sz < MIN_SO_RCVBUF_SIZE) {
315 		sz = MIN_SO_RCVBUF_SIZE;
316 	}
317 
318 	rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVBUF, &sz, sizeof(sz));
319 	if (rc < 0) {
320 		return rc;
321 	}
322 
323 	return 0;
324 }
325 
326 static int
327 uring_sock_set_sendbuf(struct spdk_sock *_sock, int sz)
328 {
329 	struct spdk_uring_sock *sock = __uring_sock(_sock);
330 	int rc;
331 
332 	assert(sock != NULL);
333 
334 	if (sz < MIN_SO_SNDBUF_SIZE) {
335 		sz = MIN_SO_SNDBUF_SIZE;
336 	}
337 
338 	rc = setsockopt(sock->fd, SOL_SOCKET, SO_SNDBUF, &sz, sizeof(sz));
339 	if (rc < 0) {
340 		return rc;
341 	}
342 
343 	return 0;
344 }
345 
346 static struct spdk_uring_sock *
347 uring_sock_alloc(int fd)
348 {
349 	struct spdk_uring_sock *sock;
350 #if defined(__linux__)
351 	int flag;
352 	int rc;
353 #endif
354 
355 	sock = calloc(1, sizeof(*sock));
356 	if (sock == NULL) {
357 		SPDK_ERRLOG("sock allocation failed\n");
358 		return NULL;
359 	}
360 
361 	sock->fd = fd;
362 
363 #if defined(__linux__)
364 	flag = 1;
365 
366 	if (g_spdk_uring_sock_impl_opts.enable_quickack) {
367 		rc = setsockopt(sock->fd, IPPROTO_TCP, TCP_QUICKACK, &flag, sizeof(flag));
368 		if (rc != 0) {
369 			SPDK_ERRLOG("quickack was failed to set\n");
370 		}
371 	}
372 
373 	spdk_sock_get_placement_id(sock->fd, g_spdk_uring_sock_impl_opts.enable_placement_id,
374 				   &sock->placement_id);
375 #endif
376 
377 	return sock;
378 }
379 
380 static struct spdk_sock *
381 uring_sock_create(const char *ip, int port,
382 		  enum uring_sock_create_type type,
383 		  struct spdk_sock_opts *opts)
384 {
385 	struct spdk_uring_sock *sock;
386 	char buf[MAX_TMPBUF];
387 	char portnum[PORTNUMLEN];
388 	char *p;
389 	struct addrinfo hints, *res, *res0;
390 	int fd, flag;
391 	int val = 1;
392 	int rc;
393 
394 	if (ip == NULL) {
395 		return NULL;
396 	}
397 	if (ip[0] == '[') {
398 		snprintf(buf, sizeof(buf), "%s", ip + 1);
399 		p = strchr(buf, ']');
400 		if (p != NULL) {
401 			*p = '\0';
402 		}
403 		ip = (const char *) &buf[0];
404 	}
405 
406 	snprintf(portnum, sizeof portnum, "%d", port);
407 	memset(&hints, 0, sizeof hints);
408 	hints.ai_family = PF_UNSPEC;
409 	hints.ai_socktype = SOCK_STREAM;
410 	hints.ai_flags = AI_NUMERICSERV;
411 	hints.ai_flags |= AI_PASSIVE;
412 	hints.ai_flags |= AI_NUMERICHOST;
413 	rc = getaddrinfo(ip, portnum, &hints, &res0);
414 	if (rc != 0) {
415 		SPDK_ERRLOG("getaddrinfo() failed (errno=%d)\n", errno);
416 		return NULL;
417 	}
418 
419 	/* try listen */
420 	fd = -1;
421 	for (res = res0; res != NULL; res = res->ai_next) {
422 retry:
423 		fd = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
424 		if (fd < 0) {
425 			/* error */
426 			continue;
427 		}
428 
429 		val = g_spdk_uring_sock_impl_opts.recv_buf_size;
430 		rc = setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &val, sizeof val);
431 		if (rc) {
432 			/* Not fatal */
433 		}
434 
435 		val = g_spdk_uring_sock_impl_opts.send_buf_size;
436 		rc = setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &val, sizeof val);
437 		if (rc) {
438 			/* Not fatal */
439 		}
440 
441 		rc = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof val);
442 		if (rc != 0) {
443 			close(fd);
444 			fd = -1;
445 			/* error */
446 			continue;
447 		}
448 		rc = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof val);
449 		if (rc != 0) {
450 			close(fd);
451 			fd = -1;
452 			/* error */
453 			continue;
454 		}
455 
456 #if defined(SO_PRIORITY)
457 		if (opts != NULL && opts->priority) {
458 			rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &opts->priority, sizeof val);
459 			if (rc != 0) {
460 				close(fd);
461 				fd = -1;
462 				/* error */
463 				continue;
464 			}
465 		}
466 #endif
467 		if (res->ai_family == AF_INET6) {
468 			rc = setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &val, sizeof val);
469 			if (rc != 0) {
470 				close(fd);
471 				fd = -1;
472 				/* error */
473 				continue;
474 			}
475 		}
476 
477 		if (type == SPDK_SOCK_CREATE_LISTEN) {
478 			rc = bind(fd, res->ai_addr, res->ai_addrlen);
479 			if (rc != 0) {
480 				SPDK_ERRLOG("bind() failed at port %d, errno = %d\n", port, errno);
481 				switch (errno) {
482 				case EINTR:
483 					/* interrupted? */
484 					close(fd);
485 					goto retry;
486 				case EADDRNOTAVAIL:
487 					SPDK_ERRLOG("IP address %s not available. "
488 						    "Verify IP address in config file "
489 						    "and make sure setup script is "
490 						    "run before starting spdk app.\n", ip);
491 				/* FALLTHROUGH */
492 				default:
493 					/* try next family */
494 					close(fd);
495 					fd = -1;
496 					continue;
497 				}
498 			}
499 			/* bind OK */
500 			rc = listen(fd, 512);
501 			if (rc != 0) {
502 				SPDK_ERRLOG("listen() failed, errno = %d\n", errno);
503 				close(fd);
504 				fd = -1;
505 				break;
506 			}
507 		} else if (type == SPDK_SOCK_CREATE_CONNECT) {
508 			rc = connect(fd, res->ai_addr, res->ai_addrlen);
509 			if (rc != 0) {
510 				SPDK_ERRLOG("connect() failed, errno = %d\n", errno);
511 				/* try next family */
512 				close(fd);
513 				fd = -1;
514 				continue;
515 			}
516 		}
517 
518 		flag = fcntl(fd, F_GETFL);
519 		if (fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0) {
520 			SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno);
521 			close(fd);
522 			fd = -1;
523 			break;
524 		}
525 		break;
526 	}
527 	freeaddrinfo(res0);
528 
529 	if (fd < 0) {
530 		return NULL;
531 	}
532 
533 	sock = uring_sock_alloc(fd);
534 	if (sock == NULL) {
535 		SPDK_ERRLOG("sock allocation failed\n");
536 		close(fd);
537 		return NULL;
538 	}
539 
540 	return &sock->base;
541 }
542 
543 static struct spdk_sock *
544 uring_sock_listen(const char *ip, int port, struct spdk_sock_opts *opts)
545 {
546 	return uring_sock_create(ip, port, SPDK_SOCK_CREATE_LISTEN, opts);
547 }
548 
549 static struct spdk_sock *
550 uring_sock_connect(const char *ip, int port, struct spdk_sock_opts *opts)
551 {
552 	return uring_sock_create(ip, port, SPDK_SOCK_CREATE_CONNECT, opts);
553 }
554 
555 static struct spdk_sock *
556 uring_sock_accept(struct spdk_sock *_sock)
557 {
558 	struct spdk_uring_sock		*sock = __uring_sock(_sock);
559 	struct sockaddr_storage		sa;
560 	socklen_t			salen;
561 	int				rc, fd;
562 	struct spdk_uring_sock		*new_sock;
563 	int				flag;
564 
565 	memset(&sa, 0, sizeof(sa));
566 	salen = sizeof(sa);
567 
568 	assert(sock != NULL);
569 
570 	rc = accept(sock->fd, (struct sockaddr *)&sa, &salen);
571 
572 	if (rc == -1) {
573 		return NULL;
574 	}
575 
576 	fd = rc;
577 
578 	flag = fcntl(fd, F_GETFL);
579 	if ((!(flag & O_NONBLOCK)) && (fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0)) {
580 		SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno);
581 		close(fd);
582 		return NULL;
583 	}
584 
585 #if defined(SO_PRIORITY)
586 	/* The priority is not inherited, so call this function again */
587 	if (sock->base.opts.priority) {
588 		rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &sock->base.opts.priority, sizeof(int));
589 		if (rc != 0) {
590 			close(fd);
591 			return NULL;
592 		}
593 	}
594 #endif
595 
596 	new_sock = uring_sock_alloc(fd);
597 	if (new_sock == NULL) {
598 		close(fd);
599 		return NULL;
600 	}
601 
602 	return &new_sock->base;
603 }
604 
605 static int
606 uring_sock_close(struct spdk_sock *_sock)
607 {
608 	struct spdk_uring_sock *sock = __uring_sock(_sock);
609 
610 	assert(TAILQ_EMPTY(&_sock->pending_reqs));
611 	assert(sock->group == NULL);
612 
613 	/* If the socket fails to close, the best choice is to
614 	 * leak the fd but continue to free the rest of the sock
615 	 * memory. */
616 	close(sock->fd);
617 
618 	spdk_pipe_destroy(sock->recv_pipe);
619 	free(sock->recv_buf);
620 	free(sock);
621 
622 	return 0;
623 }
624 
625 static ssize_t
626 uring_sock_recv_from_pipe(struct spdk_uring_sock *sock, struct iovec *diov, int diovcnt)
627 {
628 	struct iovec siov[2];
629 	int sbytes;
630 	ssize_t bytes;
631 	struct spdk_uring_sock_group_impl *group;
632 
633 	sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov);
634 	if (sbytes < 0) {
635 		errno = EINVAL;
636 		return -1;
637 	} else if (sbytes == 0) {
638 		errno = EAGAIN;
639 		return -1;
640 	}
641 
642 	bytes = spdk_iovcpy(siov, 2, diov, diovcnt);
643 
644 	if (bytes == 0) {
645 		/* The only way this happens is if diov is 0 length */
646 		errno = EINVAL;
647 		return -1;
648 	}
649 
650 	spdk_pipe_reader_advance(sock->recv_pipe, bytes);
651 
652 	/* If we drained the pipe, take it off the level-triggered list */
653 	if (sock->base.group_impl && spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) {
654 		group = __uring_group_impl(sock->base.group_impl);
655 		TAILQ_REMOVE(&group->pending_recv, sock, link);
656 		sock->pending_recv = false;
657 	}
658 
659 	return bytes;
660 }
661 
662 static inline ssize_t
663 uring_sock_read(struct spdk_uring_sock *sock)
664 {
665 	struct iovec iov[2];
666 	int bytes;
667 	struct spdk_uring_sock_group_impl *group;
668 
669 	bytes = spdk_pipe_writer_get_buffer(sock->recv_pipe, sock->recv_buf_sz, iov);
670 
671 	if (bytes > 0) {
672 		bytes = readv(sock->fd, iov, 2);
673 		if (bytes > 0) {
674 			spdk_pipe_writer_advance(sock->recv_pipe, bytes);
675 			if (sock->base.group_impl) {
676 				group = __uring_group_impl(sock->base.group_impl);
677 				TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
678 				sock->pending_recv = true;
679 			}
680 		}
681 	}
682 
683 	return bytes;
684 }
685 
686 static ssize_t
687 uring_sock_readv(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
688 {
689 	struct spdk_uring_sock *sock = __uring_sock(_sock);
690 	int rc, i;
691 	size_t len;
692 
693 	if (sock->recv_pipe == NULL) {
694 		return readv(sock->fd, iov, iovcnt);
695 	}
696 
697 	len = 0;
698 	for (i = 0; i < iovcnt; i++) {
699 		len += iov[i].iov_len;
700 	}
701 
702 	if (spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) {
703 		/* If the user is receiving a sufficiently large amount of data,
704 		 * receive directly to their buffers. */
705 		if (len >= MIN_SOCK_PIPE_SIZE) {
706 			return readv(sock->fd, iov, iovcnt);
707 		}
708 
709 		/* Otherwise, do a big read into our pipe */
710 		rc = uring_sock_read(sock);
711 		if (rc <= 0) {
712 			return rc;
713 		}
714 	}
715 
716 	return uring_sock_recv_from_pipe(sock, iov, iovcnt);
717 }
718 
719 static ssize_t
720 uring_sock_recv(struct spdk_sock *sock, void *buf, size_t len)
721 {
722 	struct iovec iov[1];
723 
724 	iov[0].iov_base = buf;
725 	iov[0].iov_len = len;
726 
727 	return uring_sock_readv(sock, iov, 1);
728 }
729 
730 static ssize_t
731 uring_sock_writev(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
732 {
733 	struct spdk_uring_sock *sock = __uring_sock(_sock);
734 
735 	if (sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
736 		errno = EAGAIN;
737 		return -1;
738 	}
739 
740 	return writev(sock->fd, iov, iovcnt);
741 }
742 
743 static int
744 sock_complete_reqs(struct spdk_sock *_sock, ssize_t rc)
745 {
746 	struct spdk_sock_request *req;
747 	int i, retval;
748 	unsigned int offset;
749 	size_t len;
750 
751 	/* Consume the requests that were actually written */
752 	req = TAILQ_FIRST(&_sock->queued_reqs);
753 	while (req) {
754 		offset = req->internal.offset;
755 
756 		for (i = 0; i < req->iovcnt; i++) {
757 			/* Advance by the offset first */
758 			if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) {
759 				offset -= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len;
760 				continue;
761 			}
762 
763 			/* Calculate the remaining length of this element */
764 			len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset;
765 
766 			if (len > (size_t)rc) {
767 				/* This element was partially sent. */
768 				req->internal.offset += rc;
769 				return 0;
770 			}
771 
772 			offset = 0;
773 			req->internal.offset += len;
774 			rc -= len;
775 		}
776 
777 		/* Handled a full request. */
778 		spdk_sock_request_pend(_sock, req);
779 
780 		retval = spdk_sock_request_put(_sock, req, 0);
781 		if (retval) {
782 			return retval;
783 		}
784 
785 		if (rc == 0) {
786 			break;
787 		}
788 
789 		req = TAILQ_FIRST(&_sock->queued_reqs);
790 	}
791 
792 	return 0;
793 }
794 
795 static void
796 _sock_flush(struct spdk_sock *_sock)
797 {
798 	struct spdk_uring_sock *sock = __uring_sock(_sock);
799 	struct spdk_uring_task *task = &sock->write_task;
800 	uint32_t iovcnt;
801 	struct io_uring_sqe *sqe;
802 
803 	if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS) {
804 		return;
805 	}
806 
807 	iovcnt = spdk_sock_prep_reqs(&sock->base, task->iovs, task->iov_cnt, &task->last_req);
808 	if (!iovcnt) {
809 		return;
810 	}
811 
812 	task->iov_cnt = iovcnt;
813 	assert(sock->group != NULL);
814 	task->msg.msg_iov = task->iovs;
815 	task->msg.msg_iovlen = task->iov_cnt;
816 
817 	sock->group->io_queued++;
818 
819 	sqe = io_uring_get_sqe(&sock->group->uring);
820 	io_uring_prep_sendmsg(sqe, sock->fd, &sock->write_task.msg, 0);
821 	io_uring_sqe_set_data(sqe, task);
822 	task->status = SPDK_URING_SOCK_TASK_IN_PROCESS;
823 }
824 
825 static void
826 _sock_prep_pollin(struct spdk_sock *_sock)
827 {
828 	struct spdk_uring_sock *sock = __uring_sock(_sock);
829 	struct spdk_uring_task *task = &sock->pollin_task;
830 	struct io_uring_sqe *sqe;
831 
832 	/* Do not prepare pollin event */
833 	if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS || sock->pending_recv) {
834 		return;
835 	}
836 
837 	assert(sock->group != NULL);
838 	sock->group->io_queued++;
839 
840 	sqe = io_uring_get_sqe(&sock->group->uring);
841 	io_uring_prep_poll_add(sqe, sock->fd, POLLIN);
842 	io_uring_sqe_set_data(sqe, task);
843 	task->status = SPDK_URING_SOCK_TASK_IN_PROCESS;
844 }
845 
846 static void
847 _sock_prep_cancel_task(struct spdk_sock *_sock, void *user_data)
848 {
849 	struct spdk_uring_sock *sock = __uring_sock(_sock);
850 	struct spdk_uring_task *task = &sock->cancel_task;
851 	struct io_uring_sqe *sqe;
852 
853 	if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS) {
854 		return;
855 	}
856 
857 	assert(sock->group != NULL);
858 	sock->group->io_queued++;
859 
860 	sqe = io_uring_get_sqe(&sock->group->uring);
861 	io_uring_prep_cancel(sqe, user_data, 0);
862 	io_uring_sqe_set_data(sqe, task);
863 	task->status = SPDK_URING_SOCK_TASK_IN_PROCESS;
864 }
865 
866 static int
867 sock_uring_group_reap(struct spdk_uring_sock_group_impl *group, int max, int max_read_events,
868 		      struct spdk_sock **socks)
869 {
870 	int i, count, ret;
871 	struct io_uring_cqe *cqe;
872 	struct spdk_uring_sock *sock, *tmp;
873 	struct spdk_uring_task *task;
874 	int status;
875 
876 	for (i = 0; i < max; i++) {
877 		ret = io_uring_peek_cqe(&group->uring, &cqe);
878 		if (ret != 0) {
879 			break;
880 		}
881 
882 		if (cqe == NULL) {
883 			break;
884 		}
885 
886 		task = (struct spdk_uring_task *)cqe->user_data;
887 		assert(task != NULL);
888 		sock = task->sock;
889 		assert(sock != NULL);
890 		assert(sock->group != NULL);
891 		assert(sock->group == group);
892 		sock->group->io_inflight--;
893 		sock->group->io_avail++;
894 		status = cqe->res;
895 		io_uring_cqe_seen(&group->uring, cqe);
896 
897 		task->status = SPDK_URING_SOCK_TASK_NOT_IN_USE;
898 
899 		if (spdk_unlikely(status <= 0)) {
900 			if (status == -EAGAIN || status == -EWOULDBLOCK) {
901 				continue;
902 			}
903 		}
904 
905 		switch (task->type) {
906 		case SPDK_SOCK_TASK_POLLIN:
907 			if ((status & POLLIN) == POLLIN) {
908 				if (sock->base.cb_fn != NULL) {
909 					assert(sock->pending_recv == false);
910 					sock->pending_recv = true;
911 					TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
912 				}
913 			}
914 			break;
915 		case SPDK_SOCK_TASK_WRITE:
916 			assert(TAILQ_EMPTY(&sock->base.pending_reqs));
917 			task->last_req = NULL;
918 			task->iov_cnt = 0;
919 			if (spdk_unlikely(status) < 0) {
920 				sock->connection_status = status;
921 				spdk_sock_abort_requests(&sock->base);
922 			} else {
923 				sock_complete_reqs(&sock->base, status);
924 			}
925 
926 			break;
927 		case SPDK_SOCK_TASK_CANCEL:
928 			/* Do nothing */
929 			break;
930 		default:
931 			SPDK_UNREACHABLE();
932 		}
933 	}
934 
935 	if (!socks) {
936 		return 0;
937 	}
938 	count = 0;
939 	TAILQ_FOREACH_SAFE(sock, &group->pending_recv, link, tmp) {
940 		if (count == max_read_events) {
941 			break;
942 		}
943 
944 		/* If the socket's cb_fn is NULL, just remove it from
945 		 * the list and do not add it to socks array */
946 		if (spdk_unlikely(sock->base.cb_fn == NULL)) {
947 			sock->pending_recv = false;
948 			TAILQ_REMOVE(&group->pending_recv, sock, link);
949 			continue;
950 		}
951 
952 		socks[count++] = &sock->base;
953 	}
954 
955 	/* Cycle the pending_recv list so that each time we poll things aren't
956 	 * in the same order. */
957 	for (i = 0; i < count; i++) {
958 		sock = __uring_sock(socks[i]);
959 
960 		TAILQ_REMOVE(&group->pending_recv, sock, link);
961 
962 		if (sock->recv_pipe == NULL || spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) {
963 			sock->pending_recv = false;
964 		} else {
965 			TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
966 		}
967 	}
968 
969 	return count;
970 }
971 
972 static int
973 _sock_flush_client(struct spdk_sock *_sock)
974 {
975 	struct spdk_uring_sock *sock = __uring_sock(_sock);
976 	struct msghdr msg = {};
977 	struct iovec iovs[IOV_BATCH_SIZE];
978 	int iovcnt;
979 	ssize_t rc;
980 
981 	/* Can't flush from within a callback or we end up with recursive calls */
982 	if (_sock->cb_cnt > 0) {
983 		return 0;
984 	}
985 
986 	/* Gather an iov */
987 	iovcnt = spdk_sock_prep_reqs(_sock, iovs, 0, NULL);
988 	if (iovcnt == 0) {
989 		return 0;
990 	}
991 
992 	/* Perform the vectored write */
993 	msg.msg_iov = iovs;
994 	msg.msg_iovlen = iovcnt;
995 	rc = sendmsg(sock->fd, &msg, 0);
996 	if (rc <= 0) {
997 		if (errno == EAGAIN || errno == EWOULDBLOCK) {
998 			return 0;
999 		}
1000 		return rc;
1001 	}
1002 
1003 	sock_complete_reqs(_sock, rc);
1004 
1005 	return 0;
1006 }
1007 
1008 static void
1009 uring_sock_writev_async(struct spdk_sock *_sock, struct spdk_sock_request *req)
1010 {
1011 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1012 	int rc;
1013 
1014 	if (spdk_unlikely(sock->connection_status)) {
1015 		req->cb_fn(req->cb_arg, sock->connection_status);
1016 		return;
1017 	}
1018 
1019 	spdk_sock_request_queue(_sock, req);
1020 
1021 	if (!sock->group) {
1022 		if (_sock->queued_iovcnt >= IOV_BATCH_SIZE) {
1023 			rc = _sock_flush_client(_sock);
1024 			if (rc) {
1025 				spdk_sock_abort_requests(_sock);
1026 			}
1027 		}
1028 	}
1029 }
1030 
1031 static int
1032 uring_sock_set_recvlowat(struct spdk_sock *_sock, int nbytes)
1033 {
1034 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1035 	int val;
1036 	int rc;
1037 
1038 	assert(sock != NULL);
1039 
1040 	val = nbytes;
1041 	rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVLOWAT, &val, sizeof val);
1042 	if (rc != 0) {
1043 		return -1;
1044 	}
1045 	return 0;
1046 }
1047 
1048 static bool
1049 uring_sock_is_ipv6(struct spdk_sock *_sock)
1050 {
1051 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1052 	struct sockaddr_storage sa;
1053 	socklen_t salen;
1054 	int rc;
1055 
1056 	assert(sock != NULL);
1057 
1058 	memset(&sa, 0, sizeof sa);
1059 	salen = sizeof sa;
1060 	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
1061 	if (rc != 0) {
1062 		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
1063 		return false;
1064 	}
1065 
1066 	return (sa.ss_family == AF_INET6);
1067 }
1068 
1069 static bool
1070 uring_sock_is_ipv4(struct spdk_sock *_sock)
1071 {
1072 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1073 	struct sockaddr_storage sa;
1074 	socklen_t salen;
1075 	int rc;
1076 
1077 	assert(sock != NULL);
1078 
1079 	memset(&sa, 0, sizeof sa);
1080 	salen = sizeof sa;
1081 	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
1082 	if (rc != 0) {
1083 		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
1084 		return false;
1085 	}
1086 
1087 	return (sa.ss_family == AF_INET);
1088 }
1089 
1090 static bool
1091 uring_sock_is_connected(struct spdk_sock *_sock)
1092 {
1093 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1094 	uint8_t byte;
1095 	int rc;
1096 
1097 	rc = recv(sock->fd, &byte, 1, MSG_PEEK);
1098 	if (rc == 0) {
1099 		return false;
1100 	}
1101 
1102 	if (rc < 0) {
1103 		if (errno == EAGAIN || errno == EWOULDBLOCK) {
1104 			return true;
1105 		}
1106 
1107 		return false;
1108 	}
1109 
1110 	return true;
1111 }
1112 
1113 static struct spdk_sock_group_impl *
1114 uring_sock_group_impl_get_optimal(struct spdk_sock *_sock)
1115 {
1116 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1117 	struct spdk_sock_group_impl *group;
1118 
1119 	if (sock->placement_id != -1) {
1120 		spdk_sock_map_lookup(&g_map, sock->placement_id, &group);
1121 		return group;
1122 	}
1123 
1124 	return NULL;
1125 }
1126 
1127 static struct spdk_sock_group_impl *
1128 uring_sock_group_impl_create(void)
1129 {
1130 	struct spdk_uring_sock_group_impl *group_impl;
1131 
1132 	group_impl = calloc(1, sizeof(*group_impl));
1133 	if (group_impl == NULL) {
1134 		SPDK_ERRLOG("group_impl allocation failed\n");
1135 		return NULL;
1136 	}
1137 
1138 	group_impl->io_avail = SPDK_SOCK_GROUP_QUEUE_DEPTH;
1139 
1140 	if (io_uring_queue_init(SPDK_SOCK_GROUP_QUEUE_DEPTH, &group_impl->uring, 0) < 0) {
1141 		SPDK_ERRLOG("uring I/O context setup failure\n");
1142 		free(group_impl);
1143 		return NULL;
1144 	}
1145 
1146 	TAILQ_INIT(&group_impl->pending_recv);
1147 
1148 	if (g_spdk_uring_sock_impl_opts.enable_placement_id == PLACEMENT_CPU) {
1149 		spdk_sock_map_insert(&g_map, spdk_env_get_current_core(), &group_impl->base);
1150 	}
1151 
1152 	return &group_impl->base;
1153 }
1154 
1155 static int
1156 uring_sock_group_impl_add_sock(struct spdk_sock_group_impl *_group,
1157 			       struct spdk_sock *_sock)
1158 {
1159 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1160 	struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group);
1161 	int rc;
1162 
1163 	sock->group = group;
1164 	sock->write_task.sock = sock;
1165 	sock->write_task.type = SPDK_SOCK_TASK_WRITE;
1166 
1167 	sock->pollin_task.sock = sock;
1168 	sock->pollin_task.type = SPDK_SOCK_TASK_POLLIN;
1169 
1170 	sock->cancel_task.sock = sock;
1171 	sock->cancel_task.type = SPDK_SOCK_TASK_CANCEL;
1172 
1173 	/* switched from another polling group due to scheduling */
1174 	if (spdk_unlikely(sock->recv_pipe != NULL &&
1175 			  (spdk_pipe_reader_bytes_available(sock->recv_pipe) > 0))) {
1176 		assert(sock->pending_recv == false);
1177 		sock->pending_recv = true;
1178 		TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
1179 	}
1180 
1181 	if (sock->placement_id != -1) {
1182 		rc = spdk_sock_map_insert(&g_map, sock->placement_id, &group->base);
1183 		if (rc != 0) {
1184 			SPDK_ERRLOG("Failed to insert sock group into map: %d", rc);
1185 			/* Do not treat this as an error. The system will continue running. */
1186 		}
1187 	}
1188 
1189 	return 0;
1190 }
1191 
1192 static int
1193 uring_sock_group_impl_poll(struct spdk_sock_group_impl *_group, int max_events,
1194 			   struct spdk_sock **socks)
1195 {
1196 	struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group);
1197 	int count, ret;
1198 	int to_complete, to_submit;
1199 	struct spdk_sock *_sock, *tmp;
1200 	struct spdk_uring_sock *sock;
1201 
1202 	if (spdk_likely(socks)) {
1203 		TAILQ_FOREACH_SAFE(_sock, &group->base.socks, link, tmp) {
1204 			sock = __uring_sock(_sock);
1205 			if (spdk_unlikely(sock->connection_status)) {
1206 				continue;
1207 			}
1208 			_sock_flush(_sock);
1209 			_sock_prep_pollin(_sock);
1210 		}
1211 	}
1212 
1213 	to_submit = group->io_queued;
1214 
1215 	/* For network I/O, it cannot be set with O_DIRECT, so we do not need to call spdk_io_uring_enter */
1216 	if (to_submit > 0) {
1217 		/* If there are I/O to submit, use io_uring_submit here.
1218 		 * It will automatically call io_uring_enter appropriately. */
1219 		ret = io_uring_submit(&group->uring);
1220 		if (ret < 0) {
1221 			return 1;
1222 		}
1223 		group->io_queued = 0;
1224 		group->io_inflight += to_submit;
1225 		group->io_avail -= to_submit;
1226 	}
1227 
1228 	count = 0;
1229 	to_complete = group->io_inflight;
1230 	if (to_complete > 0 || !TAILQ_EMPTY(&group->pending_recv)) {
1231 		count = sock_uring_group_reap(group, to_complete, max_events, socks);
1232 	}
1233 
1234 	return count;
1235 }
1236 
1237 static int
1238 uring_sock_group_impl_remove_sock(struct spdk_sock_group_impl *_group,
1239 				  struct spdk_sock *_sock)
1240 {
1241 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1242 	struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group);
1243 
1244 	if (sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
1245 		_sock_prep_cancel_task(_sock, &sock->write_task);
1246 		/* Since spdk_sock_group_remove_sock is not asynchronous interface, so
1247 		 * currently can use a while loop here. */
1248 		while ((sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) ||
1249 		       (sock->cancel_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE)) {
1250 			uring_sock_group_impl_poll(_group, 32, NULL);
1251 		}
1252 	}
1253 
1254 	if (sock->pollin_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
1255 		_sock_prep_cancel_task(_sock, &sock->pollin_task);
1256 		/* Since spdk_sock_group_remove_sock is not asynchronous interface, so
1257 		 * currently can use a while loop here. */
1258 		while ((sock->pollin_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) ||
1259 		       (sock->cancel_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE)) {
1260 			uring_sock_group_impl_poll(_group, 32, NULL);
1261 		}
1262 	}
1263 
1264 	if (sock->pending_recv) {
1265 		TAILQ_REMOVE(&group->pending_recv, sock, link);
1266 		sock->pending_recv = false;
1267 	}
1268 	assert(sock->pending_recv == false);
1269 
1270 	if (sock->placement_id != -1) {
1271 		spdk_sock_map_release(&g_map, sock->placement_id);
1272 	}
1273 
1274 	sock->group = NULL;
1275 	return 0;
1276 }
1277 
1278 static int
1279 uring_sock_group_impl_close(struct spdk_sock_group_impl *_group)
1280 {
1281 	struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group);
1282 
1283 	/* try to reap all the active I/O */
1284 	while (group->io_inflight) {
1285 		uring_sock_group_impl_poll(_group, 32, NULL);
1286 	}
1287 	assert(group->io_inflight == 0);
1288 	assert(group->io_avail == SPDK_SOCK_GROUP_QUEUE_DEPTH);
1289 
1290 	io_uring_queue_exit(&group->uring);
1291 
1292 	if (g_spdk_uring_sock_impl_opts.enable_placement_id == PLACEMENT_CPU) {
1293 		spdk_sock_map_release(&g_map, spdk_env_get_current_core());
1294 	}
1295 
1296 	free(group);
1297 	return 0;
1298 }
1299 
1300 static int
1301 uring_sock_impl_get_opts(struct spdk_sock_impl_opts *opts, size_t *len)
1302 {
1303 	if (!opts || !len) {
1304 		errno = EINVAL;
1305 		return -1;
1306 	}
1307 	memset(opts, 0, *len);
1308 
1309 #define FIELD_OK(field) \
1310 	offsetof(struct spdk_sock_impl_opts, field) + sizeof(opts->field) <= *len
1311 
1312 #define GET_FIELD(field) \
1313 	if (FIELD_OK(field)) { \
1314 		opts->field = g_spdk_uring_sock_impl_opts.field; \
1315 	}
1316 
1317 	GET_FIELD(recv_buf_size);
1318 	GET_FIELD(send_buf_size);
1319 	GET_FIELD(enable_recv_pipe);
1320 	GET_FIELD(enable_quickack);
1321 	GET_FIELD(enable_placement_id);
1322 
1323 #undef GET_FIELD
1324 #undef FIELD_OK
1325 
1326 	*len = spdk_min(*len, sizeof(g_spdk_uring_sock_impl_opts));
1327 	return 0;
1328 }
1329 
1330 static int
1331 uring_sock_impl_set_opts(const struct spdk_sock_impl_opts *opts, size_t len)
1332 {
1333 	if (!opts) {
1334 		errno = EINVAL;
1335 		return -1;
1336 	}
1337 
1338 #define FIELD_OK(field) \
1339 	offsetof(struct spdk_sock_impl_opts, field) + sizeof(opts->field) <= len
1340 
1341 #define SET_FIELD(field) \
1342 	if (FIELD_OK(field)) { \
1343 		g_spdk_uring_sock_impl_opts.field = opts->field; \
1344 	}
1345 
1346 	SET_FIELD(recv_buf_size);
1347 	SET_FIELD(send_buf_size);
1348 	SET_FIELD(enable_recv_pipe);
1349 	SET_FIELD(enable_quickack);
1350 	SET_FIELD(enable_placement_id);
1351 
1352 #undef SET_FIELD
1353 #undef FIELD_OK
1354 
1355 	return 0;
1356 }
1357 
1358 static int
1359 uring_sock_flush(struct spdk_sock *_sock)
1360 {
1361 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1362 
1363 	if (!sock->group) {
1364 		return _sock_flush_client(_sock);
1365 	}
1366 
1367 	return 0;
1368 }
1369 
1370 static struct spdk_net_impl g_uring_net_impl = {
1371 	.name		= "uring",
1372 	.getaddr	= uring_sock_getaddr,
1373 	.connect	= uring_sock_connect,
1374 	.listen		= uring_sock_listen,
1375 	.accept		= uring_sock_accept,
1376 	.close		= uring_sock_close,
1377 	.recv		= uring_sock_recv,
1378 	.readv		= uring_sock_readv,
1379 	.writev		= uring_sock_writev,
1380 	.writev_async	= uring_sock_writev_async,
1381 	.flush          = uring_sock_flush,
1382 	.set_recvlowat	= uring_sock_set_recvlowat,
1383 	.set_recvbuf	= uring_sock_set_recvbuf,
1384 	.set_sendbuf	= uring_sock_set_sendbuf,
1385 	.is_ipv6	= uring_sock_is_ipv6,
1386 	.is_ipv4	= uring_sock_is_ipv4,
1387 	.is_connected   = uring_sock_is_connected,
1388 	.group_impl_get_optimal	= uring_sock_group_impl_get_optimal,
1389 	.group_impl_create	= uring_sock_group_impl_create,
1390 	.group_impl_add_sock	= uring_sock_group_impl_add_sock,
1391 	.group_impl_remove_sock = uring_sock_group_impl_remove_sock,
1392 	.group_impl_poll	= uring_sock_group_impl_poll,
1393 	.group_impl_close	= uring_sock_group_impl_close,
1394 	.get_opts		= uring_sock_impl_get_opts,
1395 	.set_opts		= uring_sock_impl_set_opts,
1396 };
1397 
1398 SPDK_NET_IMPL_REGISTER(uring, &g_uring_net_impl, DEFAULT_SOCK_PRIORITY + 1);
1399