xref: /spdk/module/sock/uring/uring.c (revision 2172c432cfdaecc5a279d64e37c6b51e794683c1)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 #include "spdk/config.h"
36 
37 #include <sys/epoll.h>
38 #include <liburing.h>
39 
40 #include "spdk/barrier.h"
41 #include "spdk/likely.h"
42 #include "spdk/log.h"
43 #include "spdk/pipe.h"
44 #include "spdk/sock.h"
45 #include "spdk/string.h"
46 #include "spdk/util.h"
47 
48 #include "spdk_internal/sock.h"
49 #include "spdk_internal/assert.h"
50 
51 #define MAX_TMPBUF 1024
52 #define PORTNUMLEN 32
53 #define SPDK_SOCK_GROUP_QUEUE_DEPTH 4096
54 #define IOV_BATCH_SIZE 64
55 
/* Kinds of io_uring operations this module submits on behalf of a socket.
 * Stored in each spdk_uring_task so completions can be dispatched by type
 * in sock_uring_group_reap(). */
enum spdk_sock_task_type {
	/* One-shot poll for readable data (POLLIN). */
	SPDK_SOCK_TASK_POLLIN = 0,
	/* Async sendmsg() flushing queued write requests. */
	SPDK_SOCK_TASK_WRITE,
	/* Cancellation of a previously submitted operation. */
	SPDK_SOCK_TASK_CANCEL,
};
61 
/* Lifecycle state of a spdk_uring_task: idle, or submitted to the ring and
 * awaiting its completion (at most one outstanding instance per task). */
enum spdk_uring_sock_task_status {
	SPDK_URING_SOCK_TASK_NOT_IN_USE = 0,
	SPDK_URING_SOCK_TASK_IN_PROCESS,
};
66 
/* Per-operation context handed to io_uring via sqe user_data and recovered
 * from cqe->user_data on completion. Each sock embeds one instance per
 * task type, so a task is never reused while in flight. */
struct spdk_uring_task {
	enum spdk_uring_sock_task_status	status;
	enum spdk_sock_task_type		type;
	struct spdk_uring_sock			*sock;
	/* sendmsg() header; msg_iov points at iovs[] below (write tasks). */
	struct msghdr				msg;
	struct iovec				iovs[IOV_BATCH_SIZE];
	int					iov_cnt;
	/* Last queued request gathered into iovs[]; lets _sock_flush() resume
	 * batching where the previous gather stopped. */
	struct spdk_sock_request		*last_req;
	STAILQ_ENTRY(spdk_uring_task)		link;
};
77 
/* io_uring-backed socket. Extends the generic spdk_sock with the fd, the
 * owning poll group, one embedded task per operation type, and an optional
 * user-space receive pipe that buffers small reads. */
struct spdk_uring_sock {
	struct spdk_sock			base;
	int					fd;
	/* Poll group this sock belongs to; NULL when ungrouped. */
	struct spdk_uring_sock_group_impl	*group;
	struct spdk_uring_task			write_task;
	struct spdk_uring_task			pollin_task;
	struct spdk_uring_task			cancel_task;
	/* Receive pipe over recv_buf; NULL when the pipe is disabled. */
	struct spdk_pipe			*recv_pipe;
	void					*recv_buf;
	int					recv_buf_sz;
	/* True while this sock sits on its group's pending_recv list. */
	bool					pending_recv;
	/* First fatal (negative) write status; checked before new writes. */
	int					connection_status;
	TAILQ_ENTRY(spdk_uring_sock)		link;
};
92 
/* Poll group backed by a single io_uring instance shared by all member
 * sockets. io_queued counts prepared-but-unsubmitted SQEs, io_inflight
 * counts submitted-but-uncompleted ones, io_avail the remaining capacity. */
struct spdk_uring_sock_group_impl {
	struct spdk_sock_group_impl		base;
	struct io_uring				uring;
	uint32_t				io_inflight;
	uint32_t				io_queued;
	uint32_t				io_avail;
	/* Level-triggered list of sockets with buffered/readable data. */
	TAILQ_HEAD(, spdk_uring_sock)		pending_recv;
};
101 
/* Module-wide tunables for the uring sock implementation.
 * NOTE(review): presumably adjustable through the sock impl opts API; the
 * setter is not visible in this chunk — confirm before relying on it. */
static struct spdk_sock_impl_opts g_spdk_uring_sock_impl_opts = {
	.recv_buf_size = MIN_SO_RCVBUF_SIZE,
	.send_buf_size = MIN_SO_SNDBUF_SIZE,
	.enable_recv_pipe = true,
	.enable_quickack = false,
	.enable_placement_id = false,
};
109 
110 #define SPDK_URING_SOCK_REQUEST_IOV(req) ((struct iovec *)((uint8_t *)req + sizeof(struct spdk_sock_request)))
111 
/*
 * Format the IPv4/IPv6 address in 'sa' as a numeric string into 'host'
 * (capacity 'hlen'). Returns 0 on success; -1 on NULL arguments, an
 * unsupported address family, or inet_ntop() failure (e.g. buffer too
 * small).
 */
static int
get_addr_str(struct sockaddr *sa, char *host, size_t hlen)
{
	const void *addr;

	if (sa == NULL || host == NULL) {
		return -1;
	}

	switch (sa->sa_family) {
	case AF_INET:
		addr = &((struct sockaddr_in *)sa)->sin_addr;
		break;
	case AF_INET6:
		addr = &((struct sockaddr_in6 *)sa)->sin6_addr;
		break;
	default:
		/* Not an IP socket; nothing to format. */
		return -1;
	}

	return (inet_ntop(sa->sa_family, addr, host, hlen) == NULL) ? -1 : 0;
}
140 
/* Downcast the generic sock/group handles to their uring implementations. */
#define __uring_sock(sock) (struct spdk_uring_sock *)sock
#define __uring_group_impl(group) (struct spdk_uring_sock_group_impl *)group
143 
/*
 * Retrieve the local (saddr/sport) and peer (caddr/cport) addresses of the
 * socket as numeric strings. AF_UNIX sockets succeed without filling in
 * any addresses. Returns 0 on success, -1 on failure.
 */
static int
uring_sock_getaddr(struct spdk_sock *_sock, char *saddr, int slen, uint16_t *sport,
		   char *caddr, int clen, uint16_t *cport)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct sockaddr_storage sa;
	socklen_t salen;
	int rc;

	assert(sock != NULL);

	/* Local side first. */
	memset(&sa, 0, sizeof sa);
	salen = sizeof sa;
	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
	if (rc != 0) {
		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
		return -1;
	}

	switch (sa.ss_family) {
	case AF_UNIX:
		/* Acceptable connection types that don't have IPs */
		return 0;
	case AF_INET:
	case AF_INET6:
		/* Code below will get IP addresses */
		break;
	default:
		/* Unsupported socket family */
		return -1;
	}

	rc = get_addr_str((struct sockaddr *)&sa, saddr, slen);
	if (rc != 0) {
		SPDK_ERRLOG("getnameinfo() failed (errno=%d)\n", errno);
		return -1;
	}

	if (sport) {
		if (sa.ss_family == AF_INET) {
			*sport = ntohs(((struct sockaddr_in *) &sa)->sin_port);
		} else if (sa.ss_family == AF_INET6) {
			*sport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port);
		}
	}

	/* Then the peer side of the connection. */
	memset(&sa, 0, sizeof sa);
	salen = sizeof sa;
	rc = getpeername(sock->fd, (struct sockaddr *) &sa, &salen);
	if (rc != 0) {
		SPDK_ERRLOG("getpeername() failed (errno=%d)\n", errno);
		return -1;
	}

	rc = get_addr_str((struct sockaddr *)&sa, caddr, clen);
	if (rc != 0) {
		SPDK_ERRLOG("getnameinfo() failed (errno=%d)\n", errno);
		return -1;
	}

	if (cport) {
		if (sa.ss_family == AF_INET) {
			*cport = ntohs(((struct sockaddr_in *) &sa)->sin_port);
		} else if (sa.ss_family == AF_INET6) {
			*cport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port);
		}
	}

	return 0;
}
214 
/* Whether uring_sock_create() should bind()+listen() or connect(). */
enum uring_sock_create_type {
	SPDK_SOCK_CREATE_LISTEN,
	SPDK_SOCK_CREATE_CONNECT,
};
219 
/*
 * Resize (create, grow/shrink, or destroy) the socket's receive pipe to
 * 'sz' bytes. sz == 0 frees the pipe entirely; any data buffered in an
 * existing pipe is migrated into the new one first. Returns 0 on success,
 * -1 when 0 < sz < MIN_SOCK_PIPE_SIZE, -ENOMEM on allocation failure, or
 * -EINVAL when buffered data would not fit in the smaller pipe.
 */
static int
uring_sock_alloc_pipe(struct spdk_uring_sock *sock, int sz)
{
	uint8_t *new_buf;
	struct spdk_pipe *new_pipe;
	struct iovec siov[2];
	struct iovec diov[2];
	int sbytes;
	ssize_t bytes;

	if (sock->recv_buf_sz == sz) {
		return 0;
	}

	/* If the new size is 0, just free the pipe */
	if (sz == 0) {
		spdk_pipe_destroy(sock->recv_pipe);
		free(sock->recv_buf);
		sock->recv_pipe = NULL;
		sock->recv_buf = NULL;
		return 0;
	} else if (sz < MIN_SOCK_PIPE_SIZE) {
		SPDK_ERRLOG("The size of the pipe must be larger than %d\n", MIN_SOCK_PIPE_SIZE);
		return -1;
	}

	/* Round up to next 64 byte multiple */
	/* sz + 1 because a pipe of capacity N needs N + 1 bytes of backing store. */
	new_buf = calloc(SPDK_ALIGN_CEIL(sz + 1, 64), sizeof(uint8_t));
	if (!new_buf) {
		SPDK_ERRLOG("socket recv buf allocation failed\n");
		return -ENOMEM;
	}

	new_pipe = spdk_pipe_create(new_buf, sz + 1);
	if (new_pipe == NULL) {
		SPDK_ERRLOG("socket pipe allocation failed\n");
		free(new_buf);
		return -ENOMEM;
	}

	if (sock->recv_pipe != NULL) {
		/* Pull all of the data out of the old pipe */
		sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov);
		if (sbytes > sz) {
			/* Too much data to fit into the new pipe size */
			spdk_pipe_destroy(new_pipe);
			free(new_buf);
			return -EINVAL;
		}

		sbytes = spdk_pipe_writer_get_buffer(new_pipe, sz, diov);
		assert(sbytes == sz);

		/* Copy the buffered data across and commit it to the new pipe. */
		bytes = spdk_iovcpy(siov, 2, diov, 2);
		spdk_pipe_writer_advance(new_pipe, bytes);

		spdk_pipe_destroy(sock->recv_pipe);
		free(sock->recv_buf);
	}

	sock->recv_buf_sz = sz;
	sock->recv_buf = new_buf;
	sock->recv_pipe = new_pipe;

	return 0;
}
286 
/*
 * Set the socket's receive buffer size: resizes the user-space receive
 * pipe (when enabled) and then applies SO_RCVBUF, with the kernel value
 * clamped up to MIN_SO_RCVBUF_SIZE. Returns 0 on success or a negative
 * value on failure.
 * NOTE(review): if setsockopt() fails after the pipe was resized, the two
 * sizes diverge — presumably benign since the pipe is only a cache; confirm.
 */
static int
uring_sock_set_recvbuf(struct spdk_sock *_sock, int sz)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	int rc;

	assert(sock != NULL);

	if (g_spdk_uring_sock_impl_opts.enable_recv_pipe) {
		rc = uring_sock_alloc_pipe(sock, sz);
		if (rc) {
			SPDK_ERRLOG("unable to allocate sufficient recvbuf with sz=%d on sock=%p\n", sz, _sock);
			return rc;
		}
	}

	if (sz < MIN_SO_RCVBUF_SIZE) {
		sz = MIN_SO_RCVBUF_SIZE;
	}

	rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVBUF, &sz, sizeof(sz));
	if (rc < 0) {
		return rc;
	}

	return 0;
}
314 
315 static int
316 uring_sock_set_sendbuf(struct spdk_sock *_sock, int sz)
317 {
318 	struct spdk_uring_sock *sock = __uring_sock(_sock);
319 	int rc;
320 
321 	assert(sock != NULL);
322 
323 	if (sz < MIN_SO_SNDBUF_SIZE) {
324 		sz = MIN_SO_SNDBUF_SIZE;
325 	}
326 
327 	rc = setsockopt(sock->fd, SOL_SOCKET, SO_SNDBUF, &sz, sizeof(sz));
328 	if (rc < 0) {
329 		return rc;
330 	}
331 
332 	return 0;
333 }
334 
/*
 * Allocate and zero-initialize a spdk_uring_sock wrapping 'fd', optionally
 * enabling TCP_QUICKACK on Linux (failure there is logged, not fatal).
 * Returns NULL on allocation failure; the caller retains ownership of fd
 * and must close it on failure.
 */
static struct spdk_uring_sock *
uring_sock_alloc(int fd)
{
	struct spdk_uring_sock *sock;
#if defined(__linux__)
	int flag;
	int rc;
#endif

	sock = calloc(1, sizeof(*sock));
	if (sock == NULL) {
		SPDK_ERRLOG("sock allocation failed\n");
		return NULL;
	}

	sock->fd = fd;

#if defined(__linux__)
	flag = 1;

	if (g_spdk_uring_sock_impl_opts.enable_quickack) {
		rc = setsockopt(sock->fd, IPPROTO_TCP, TCP_QUICKACK, &flag, sizeof(flag));
		if (rc != 0) {
			SPDK_ERRLOG("quickack was failed to set\n");
		}
	}
#endif
	return sock;
}
364 
/*
 * Create a TCP socket and either bind()+listen() on or connect() to
 * ip:port. 'ip' may be a bare IPv4/IPv6 address or a bracketed IPv6
 * literal ("[::1]"). Each address from getaddrinfo() is tried in turn;
 * the first usable fd is made non-blocking and wrapped in a
 * spdk_uring_sock. Returns the new sock, or NULL on failure.
 *
 * Fixes relative to the previous version:
 *  - 'val' was clobbered by the recv/send buffer sizes and then reused
 *    for SO_REUSEADDR / TCP_NODELAY / IPV6_V6ONLY, which must be enabled
 *    with the value 1; reset it before those calls.
 *  - getaddrinfo() reports errors via its return value, not errno, so
 *    log gai_strerror(rc) instead.
 */
static struct spdk_sock *
uring_sock_create(const char *ip, int port,
		  enum uring_sock_create_type type,
		  struct spdk_sock_opts *opts)
{
	struct spdk_uring_sock *sock;
	char buf[MAX_TMPBUF];
	char portnum[PORTNUMLEN];
	char *p;
	struct addrinfo hints, *res, *res0;
	int fd, flag;
	int val = 1;
	int rc;

	if (ip == NULL) {
		return NULL;
	}
	if (ip[0] == '[') {
		/* Strip the brackets from an "[addr]" style IPv6 literal. */
		snprintf(buf, sizeof(buf), "%s", ip + 1);
		p = strchr(buf, ']');
		if (p != NULL) {
			*p = '\0';
		}
		ip = (const char *) &buf[0];
	}

	snprintf(portnum, sizeof portnum, "%d", port);
	memset(&hints, 0, sizeof hints);
	hints.ai_family = PF_UNSPEC;
	hints.ai_socktype = SOCK_STREAM;
	hints.ai_flags = AI_NUMERICSERV;
	hints.ai_flags |= AI_PASSIVE;
	hints.ai_flags |= AI_NUMERICHOST;
	rc = getaddrinfo(ip, portnum, &hints, &res0);
	if (rc != 0) {
		SPDK_ERRLOG("getaddrinfo() failed: %s (%d)\n", gai_strerror(rc), rc);
		return NULL;
	}

	/* try listen */
	fd = -1;
	for (res = res0; res != NULL; res = res->ai_next) {
retry:
		fd = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
		if (fd < 0) {
			/* error */
			continue;
		}

		val = g_spdk_uring_sock_impl_opts.recv_buf_size;
		rc = setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &val, sizeof val);
		if (rc) {
			/* Not fatal */
		}

		val = g_spdk_uring_sock_impl_opts.send_buf_size;
		rc = setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &val, sizeof val);
		if (rc) {
			/* Not fatal */
		}

		/* The buffer-size writes above clobbered 'val'; the boolean
		 * options below must be enabled with the value 1. */
		val = 1;

		rc = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof val);
		if (rc != 0) {
			close(fd);
			/* error */
			continue;
		}
		rc = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof val);
		if (rc != 0) {
			close(fd);
			/* error */
			continue;
		}

#if defined(SO_PRIORITY)
		if (opts != NULL && opts->priority) {
			rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &opts->priority, sizeof val);
			if (rc != 0) {
				close(fd);
				/* error */
				continue;
			}
		}
#endif
		if (res->ai_family == AF_INET6) {
			rc = setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &val, sizeof val);
			if (rc != 0) {
				close(fd);
				/* error */
				continue;
			}
		}

		if (type == SPDK_SOCK_CREATE_LISTEN) {
			rc = bind(fd, res->ai_addr, res->ai_addrlen);
			if (rc != 0) {
				SPDK_ERRLOG("bind() failed at port %d, errno = %d\n", port, errno);
				switch (errno) {
				case EINTR:
					/* interrupted? */
					close(fd);
					goto retry;
				case EADDRNOTAVAIL:
					SPDK_ERRLOG("IP address %s not available. "
						    "Verify IP address in config file "
						    "and make sure setup script is "
						    "run before starting spdk app.\n", ip);
				/* FALLTHROUGH */
				default:
					/* try next family */
					close(fd);
					fd = -1;
					continue;
				}
			}
			/* bind OK */
			rc = listen(fd, 512);
			if (rc != 0) {
				SPDK_ERRLOG("listen() failed, errno = %d\n", errno);
				close(fd);
				fd = -1;
				break;
			}
		} else if (type == SPDK_SOCK_CREATE_CONNECT) {
			rc = connect(fd, res->ai_addr, res->ai_addrlen);
			if (rc != 0) {
				SPDK_ERRLOG("connect() failed, errno = %d\n", errno);
				/* try next family */
				close(fd);
				fd = -1;
				continue;
			}
		}

		flag = fcntl(fd, F_GETFL);
		if (fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0) {
			SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno);
			close(fd);
			fd = -1;
			break;
		}
		break;
	}
	freeaddrinfo(res0);

	if (fd < 0) {
		return NULL;
	}

	sock = uring_sock_alloc(fd);
	if (sock == NULL) {
		SPDK_ERRLOG("sock allocation failed\n");
		close(fd);
		return NULL;
	}

	return &sock->base;
}
523 
/* spdk_sock_impl 'listen' callback: create a bound, listening TCP sock. */
static struct spdk_sock *
uring_sock_listen(const char *ip, int port, struct spdk_sock_opts *opts)
{
	return uring_sock_create(ip, port, SPDK_SOCK_CREATE_LISTEN, opts);
}
529 
/* spdk_sock_impl 'connect' callback: create a connected TCP sock. */
static struct spdk_sock *
uring_sock_connect(const char *ip, int port, struct spdk_sock_opts *opts)
{
	return uring_sock_create(ip, port, SPDK_SOCK_CREATE_CONNECT, opts);
}
535 
/*
 * Accept one pending connection on a listening sock. The accepted fd is
 * made non-blocking and SO_PRIORITY is re-applied from the listener's
 * opts (the option is not inherited across accept()). Returns the wrapped
 * sock, or NULL on failure (errno from accept() is preserved, e.g. EAGAIN
 * when no connection is pending).
 */
static struct spdk_sock *
uring_sock_accept(struct spdk_sock *_sock)
{
	struct spdk_uring_sock		*sock = __uring_sock(_sock);
	struct sockaddr_storage		sa;
	socklen_t			salen;
	int				rc, fd;
	struct spdk_uring_sock		*new_sock;
	int				flag;

	memset(&sa, 0, sizeof(sa));
	salen = sizeof(sa);

	assert(sock != NULL);

	rc = accept(sock->fd, (struct sockaddr *)&sa, &salen);

	if (rc == -1) {
		return NULL;
	}

	fd = rc;

	/* Only toggle O_NONBLOCK if the accepted fd didn't inherit it. */
	flag = fcntl(fd, F_GETFL);
	if ((!(flag & O_NONBLOCK)) && (fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0)) {
		SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno);
		close(fd);
		return NULL;
	}

#if defined(SO_PRIORITY)
	/* The priority is not inherited, so call this function again */
	if (sock->base.opts.priority) {
		rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &sock->base.opts.priority, sizeof(int));
		if (rc != 0) {
			close(fd);
			return NULL;
		}
	}
#endif

	new_sock = uring_sock_alloc(fd);
	if (new_sock == NULL) {
		close(fd);
		return NULL;
	}

	return &new_sock->base;
}
585 
/*
 * Close the socket and free its resources. Must only be called once all
 * requests are complete and the sock has left its poll group. The sock
 * struct is freed only if close() succeeds, so the caller can retry on
 * failure.
 * NOTE(review): recv_pipe/recv_buf are freed before close() and are not
 * NULLed, so a retry path must not touch them again — confirm callers
 * only ever re-invoke close on failure.
 */
static int
uring_sock_close(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	int rc;

	assert(TAILQ_EMPTY(&_sock->pending_reqs));
	assert(sock->group == NULL);

	spdk_pipe_destroy(sock->recv_pipe);
	free(sock->recv_buf);
	rc = close(sock->fd);
	if (rc == 0) {
		free(sock);
	}

	return rc;
}
604 
/*
 * Copy buffered data from the socket's receive pipe into 'diov'. Returns
 * the number of bytes copied; -1 with errno = EAGAIN when the pipe is
 * empty, or EINVAL on a bad pipe state / zero-length destination. When
 * the pipe is fully drained, the sock leaves its group's level-triggered
 * pending_recv list.
 * NOTE(review): the TAILQ_REMOVE assumes the sock is on the list whenever
 * the pipe held data — this relies on every pipe fill also inserting the
 * sock; verify the insertion paths keep that invariant.
 */
static ssize_t
uring_sock_recv_from_pipe(struct spdk_uring_sock *sock, struct iovec *diov, int diovcnt)
{
	struct iovec siov[2];
	int sbytes;
	ssize_t bytes;
	struct spdk_uring_sock_group_impl *group;

	sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov);
	if (sbytes < 0) {
		errno = EINVAL;
		return -1;
	} else if (sbytes == 0) {
		errno = EAGAIN;
		return -1;
	}

	bytes = spdk_iovcpy(siov, 2, diov, diovcnt);

	if (bytes == 0) {
		/* The only way this happens is if diov is 0 length */
		errno = EINVAL;
		return -1;
	}

	spdk_pipe_reader_advance(sock->recv_pipe, bytes);

	/* If we drained the pipe, take it off the level-triggered list */
	if (sock->base.group_impl && spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) {
		group = __uring_group_impl(sock->base.group_impl);
		TAILQ_REMOVE(&group->pending_recv, sock, link);
		sock->pending_recv = false;
	}

	return bytes;
}
641 
642 static inline ssize_t
643 uring_sock_read(struct spdk_uring_sock *sock)
644 {
645 	struct iovec iov[2];
646 	int bytes;
647 	struct spdk_uring_sock_group_impl *group;
648 
649 	bytes = spdk_pipe_writer_get_buffer(sock->recv_pipe, sock->recv_buf_sz, iov);
650 
651 	if (bytes > 0) {
652 		bytes = readv(sock->fd, iov, 2);
653 		if (bytes > 0) {
654 			spdk_pipe_writer_advance(sock->recv_pipe, bytes);
655 			if (sock->base.group_impl) {
656 				group = __uring_group_impl(sock->base.group_impl);
657 				TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
658 				sock->pending_recv = true;
659 			}
660 		}
661 	}
662 
663 	return bytes;
664 }
665 
/*
 * Vectored read that prefers data already buffered in the receive pipe.
 * With no pipe configured, or with an empty pipe and a large
 * (>= MIN_SOCK_PIPE_SIZE) destination, the read bypasses the pipe and
 * goes straight to the kernel. Otherwise the pipe is refilled with one
 * big readv() and then drained into the caller's iov.
 */
static ssize_t
uring_sock_readv(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	int rc, i;
	size_t len;

	if (sock->recv_pipe == NULL) {
		return readv(sock->fd, iov, iovcnt);
	}

	/* Total destination capacity, to decide pipe vs direct read. */
	len = 0;
	for (i = 0; i < iovcnt; i++) {
		len += iov[i].iov_len;
	}

	if (spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) {
		/* If the user is receiving a sufficiently large amount of data,
		 * receive directly to their buffers. */
		if (len >= MIN_SOCK_PIPE_SIZE) {
			return readv(sock->fd, iov, iovcnt);
		}

		/* Otherwise, do a big read into our pipe */
		rc = uring_sock_read(sock);
		if (rc <= 0) {
			return rc;
		}
	}

	return uring_sock_recv_from_pipe(sock, iov, iovcnt);
}
698 
/* Scalar convenience wrapper: read up to 'len' bytes into 'buf' via
 * uring_sock_readv() with a single-element iovec. */
static ssize_t
uring_sock_recv(struct spdk_sock *sock, void *buf, size_t len)
{
	struct iovec iov = {
		.iov_base = buf,
		.iov_len = len,
	};

	return uring_sock_readv(sock, &iov, 1);
}
709 
710 static ssize_t
711 uring_sock_writev(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
712 {
713 	struct spdk_uring_sock *sock = __uring_sock(_sock);
714 
715 	if (sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
716 		errno = EAGAIN;
717 		return -1;
718 	}
719 
720 	return writev(sock->fd, iov, iovcnt);
721 }
722 
/*
 * Gather queued-but-unsent requests into iovs[] starting at slot 'index',
 * skipping each request's already-transmitted prefix (internal.offset).
 * At most IOV_BATCH_SIZE entries are produced. When 'last_req' is
 * provided, gathering resumes after *last_req, and *last_req is advanced
 * to the last request whose iovs were fully walked — enabling incremental
 * batching across calls. Returns the total iov count now in iovs[].
 */
static int
sock_prep_reqs(struct spdk_sock *_sock, struct iovec *iovs, int index,
	       struct spdk_sock_request **last_req)
{
	int iovcnt, i;
	struct spdk_sock_request *req;
	unsigned int offset;

	/* Gather an iov */
	iovcnt = index;
	if (spdk_unlikely(iovcnt >= IOV_BATCH_SIZE)) {
		goto end;
	}

	if (last_req != NULL && *last_req != NULL) {
		req = TAILQ_NEXT(*last_req, internal.link);
	} else {
		req = TAILQ_FIRST(&_sock->queued_reqs);
	}

	while (req) {
		offset = req->internal.offset;

		for (i = 0; i < req->iovcnt; i++) {
			/* Consume any offset first */
			if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) {
				offset -= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len;
				continue;
			}

			iovs[iovcnt].iov_base = SPDK_SOCK_REQUEST_IOV(req, i)->iov_base + offset;
			iovs[iovcnt].iov_len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset;
			iovcnt++;

			offset = 0;

			if (iovcnt >= IOV_BATCH_SIZE) {
				break;
			}
		}
		if (iovcnt >= IOV_BATCH_SIZE) {
			/* Batch full mid-request: *last_req deliberately stays at
			 * the previous request so this one is re-gathered later. */
			break;
		}

		if (last_req != NULL) {
			*last_req = req;
		}
		req = TAILQ_NEXT(req, internal.link);
	}

end:
	return iovcnt;
}
776 
/*
 * Account 'rc' bytes of completed transmission against the head of the
 * queued-request list. Fully-sent requests are moved to the pending list
 * and completed via spdk_sock_request_put(); a partially-sent request
 * only has its internal.offset advanced. Returns 0, or the non-zero
 * result of spdk_sock_request_put().
 */
static int
sock_complete_reqs(struct spdk_sock *_sock, ssize_t rc)
{
	struct spdk_sock_request *req;
	int i, retval;
	unsigned int offset;
	size_t len;

	/* Consume the requests that were actually written */
	req = TAILQ_FIRST(&_sock->queued_reqs);
	while (req) {
		offset = req->internal.offset;

		for (i = 0; i < req->iovcnt; i++) {
			/* Advance by the offset first */
			if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) {
				offset -= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len;
				continue;
			}

			/* Calculate the remaining length of this element */
			len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset;

			if (len > (size_t)rc) {
				/* This element was partially sent. */
				req->internal.offset += rc;
				return 0;
			}

			offset = 0;
			req->internal.offset += len;
			rc -= len;
		}

		/* Handled a full request. */
		spdk_sock_request_pend(_sock, req);

		retval = spdk_sock_request_put(_sock, req, 0);
		if (retval) {
			return retval;
		}

		if (rc == 0) {
			break;
		}

		req = TAILQ_FIRST(&_sock->queued_reqs);
	}

	return 0;
}
828 
/*
 * Prepare a sendmsg SQE covering the sock's queued write requests (group
 * sockets only). No-op when a write task is already in flight or nothing
 * is queued. The requests complete later, when the CQE is reaped.
 * NOTE(review): io_uring_get_sqe() can return NULL when the SQ ring is
 * full — presumably prevented by the group's depth/io_avail accounting;
 * confirm before relying on it.
 */
static void
_sock_flush(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct spdk_uring_task *task = &sock->write_task;
	uint32_t iovcnt;
	struct io_uring_sqe *sqe;

	if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS) {
		return;
	}

	/* Resume gathering from where the previous flush stopped. */
	iovcnt = sock_prep_reqs(&sock->base, task->iovs, task->iov_cnt, &task->last_req);
	if (!iovcnt) {
		return;
	}

	task->iov_cnt = iovcnt;
	assert(sock->group != NULL);
	task->msg.msg_iov = task->iovs;
	task->msg.msg_iovlen = task->iov_cnt;

	sock->group->io_queued++;

	sqe = io_uring_get_sqe(&sock->group->uring);
	io_uring_prep_sendmsg(sqe, sock->fd, &sock->write_task.msg, 0);
	io_uring_sqe_set_data(sqe, task);
	task->status = SPDK_URING_SOCK_TASK_IN_PROCESS;
}
858 
/*
 * Arm a one-shot POLLIN on the socket, unless a poll is already in flight
 * or the sock is already known to have readable data (pending_recv).
 */
static void
_sock_prep_pollin(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct spdk_uring_task *task = &sock->pollin_task;
	struct io_uring_sqe *sqe;

	/* Do not prepare pollin event */
	if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS || sock->pending_recv) {
		return;
	}

	assert(sock->group != NULL);
	sock->group->io_queued++;

	sqe = io_uring_get_sqe(&sock->group->uring);
	io_uring_prep_poll_add(sqe, sock->fd, POLLIN);
	io_uring_sqe_set_data(sqe, task);
	task->status = SPDK_URING_SOCK_TASK_IN_PROCESS;
}
879 
/*
 * Submit an async-cancel SQE targeting the in-flight operation whose
 * sqe user_data was 'user_data' (a spdk_uring_task pointer). Used when
 * tearing a sock out of its group while operations are outstanding.
 */
static void
_sock_prep_cancel_task(struct spdk_sock *_sock, void *user_data)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct spdk_uring_task *task = &sock->cancel_task;
	struct io_uring_sqe *sqe;

	if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS) {
		return;
	}

	assert(sock->group != NULL);
	sock->group->io_queued++;

	sqe = io_uring_get_sqe(&sock->group->uring);
	io_uring_prep_cancel(sqe, user_data, 0);
	io_uring_sqe_set_data(sqe, task);
	task->status = SPDK_URING_SOCK_TASK_IN_PROCESS;
}
899 
900 static int
901 sock_uring_group_reap(struct spdk_uring_sock_group_impl *group, int max, int max_read_events,
902 		      struct spdk_sock **socks)
903 {
904 	int i, count, ret;
905 	struct io_uring_cqe *cqe;
906 	struct spdk_uring_sock *sock, *tmp;
907 	struct spdk_uring_task *task;
908 	int status;
909 
910 	for (i = 0; i < max; i++) {
911 		ret = io_uring_peek_cqe(&group->uring, &cqe);
912 		if (ret != 0) {
913 			break;
914 		}
915 
916 		if (cqe == NULL) {
917 			break;
918 		}
919 
920 		task = (struct spdk_uring_task *)cqe->user_data;
921 		assert(task != NULL);
922 		sock = task->sock;
923 		assert(sock != NULL);
924 		assert(sock->group != NULL);
925 		assert(sock->group == group);
926 		sock->group->io_inflight--;
927 		sock->group->io_avail++;
928 		status = cqe->res;
929 		io_uring_cqe_seen(&group->uring, cqe);
930 
931 		task->status = SPDK_URING_SOCK_TASK_NOT_IN_USE;
932 
933 		if (spdk_unlikely(status <= 0)) {
934 			if (status == -EAGAIN || status == -EWOULDBLOCK) {
935 				continue;
936 			}
937 		}
938 
939 		switch (task->type) {
940 		case SPDK_SOCK_TASK_POLLIN:
941 			if ((status & POLLIN) == POLLIN) {
942 				if (sock->base.cb_fn != NULL) {
943 					assert(sock->pending_recv == false);
944 					sock->pending_recv = true;
945 					TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
946 				}
947 			}
948 			break;
949 		case SPDK_SOCK_TASK_WRITE:
950 			assert(TAILQ_EMPTY(&sock->base.pending_reqs));
951 			task->last_req = NULL;
952 			task->iov_cnt = 0;
953 			if (spdk_unlikely(status) < 0) {
954 				sock->connection_status = status;
955 				spdk_sock_abort_requests(&sock->base);
956 			} else {
957 				sock_complete_reqs(&sock->base, status);
958 			}
959 
960 			break;
961 		case SPDK_SOCK_TASK_CANCEL:
962 			/* Do nothing */
963 			break;
964 		default:
965 			SPDK_UNREACHABLE();
966 		}
967 	}
968 
969 	if (!socks) {
970 		return 0;
971 	}
972 	count = 0;
973 	TAILQ_FOREACH_SAFE(sock, &group->pending_recv, link, tmp) {
974 		if (count == max_read_events) {
975 			break;
976 		}
977 
978 		socks[count++] = &sock->base;
979 	}
980 
981 	/* Cycle the pending_recv list so that each time we poll things aren't
982 	 * in the same order. */
983 	for (i = 0; i < count; i++) {
984 		sock = __uring_sock(socks[i]);
985 
986 		TAILQ_REMOVE(&group->pending_recv, sock, link);
987 
988 		if (sock->recv_pipe == NULL || spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) {
989 			sock->pending_recv = false;
990 		} else {
991 			TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
992 		}
993 	}
994 
995 	return count;
996 }
997 
/*
 * Synchronous flush path for sockets NOT in a poll group: gathers queued
 * requests into a single sendmsg(). Returns 0 on success or when the
 * socket would block; a negative value on hard send errors. Skipped while
 * a completion callback is running to avoid recursive flushes.
 */
static int
_sock_flush_client(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct msghdr msg = {};
	struct iovec iovs[IOV_BATCH_SIZE];
	int iovcnt;
	ssize_t rc;

	/* Can't flush from within a callback or we end up with recursive calls */
	if (_sock->cb_cnt > 0) {
		return 0;
	}

	/* Gather an iov */
	iovcnt = sock_prep_reqs(_sock, iovs, 0, NULL);
	if (iovcnt == 0) {
		return 0;
	}

	/* Perform the vectored write */
	msg.msg_iov = iovs;
	msg.msg_iovlen = iovcnt;
	rc = sendmsg(sock->fd, &msg, 0);
	if (rc <= 0) {
		if (errno == EAGAIN || errno == EWOULDBLOCK) {
			return 0;
		}
		return rc;
	}

	sock_complete_reqs(_sock, rc);

	return 0;
}
1033 
/*
 * Queue an asynchronous write request. If the connection already failed,
 * the request is completed immediately with the stored error. Grouped
 * sockets are flushed later by the group poller; ungrouped sockets flush
 * inline once IOV_BATCH_SIZE iovs have accumulated.
 */
static void
uring_sock_writev_async(struct spdk_sock *_sock, struct spdk_sock_request *req)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	int rc;

	if (spdk_unlikely(sock->connection_status)) {
		req->cb_fn(req->cb_arg, sock->connection_status);
		return;
	}

	spdk_sock_request_queue(_sock, req);

	if (!sock->group) {
		if (_sock->queued_iovcnt >= IOV_BATCH_SIZE) {
			rc = _sock_flush_client(_sock);
			if (rc) {
				spdk_sock_abort_requests(_sock);
			}
		}
	}
}
1056 
1057 static int
1058 uring_sock_set_recvlowat(struct spdk_sock *_sock, int nbytes)
1059 {
1060 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1061 	int val;
1062 	int rc;
1063 
1064 	assert(sock != NULL);
1065 
1066 	val = nbytes;
1067 	rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVLOWAT, &val, sizeof val);
1068 	if (rc != 0) {
1069 		return -1;
1070 	}
1071 	return 0;
1072 }
1073 
1074 static bool
1075 uring_sock_is_ipv6(struct spdk_sock *_sock)
1076 {
1077 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1078 	struct sockaddr_storage sa;
1079 	socklen_t salen;
1080 	int rc;
1081 
1082 	assert(sock != NULL);
1083 
1084 	memset(&sa, 0, sizeof sa);
1085 	salen = sizeof sa;
1086 	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
1087 	if (rc != 0) {
1088 		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
1089 		return false;
1090 	}
1091 
1092 	return (sa.ss_family == AF_INET6);
1093 }
1094 
1095 static bool
1096 uring_sock_is_ipv4(struct spdk_sock *_sock)
1097 {
1098 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1099 	struct sockaddr_storage sa;
1100 	socklen_t salen;
1101 	int rc;
1102 
1103 	assert(sock != NULL);
1104 
1105 	memset(&sa, 0, sizeof sa);
1106 	salen = sizeof sa;
1107 	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
1108 	if (rc != 0) {
1109 		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
1110 		return false;
1111 	}
1112 
1113 	return (sa.ss_family == AF_INET);
1114 }
1115 
1116 static bool
1117 uring_sock_is_connected(struct spdk_sock *_sock)
1118 {
1119 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1120 	uint8_t byte;
1121 	int rc;
1122 
1123 	rc = recv(sock->fd, &byte, 1, MSG_PEEK);
1124 	if (rc == 0) {
1125 		return false;
1126 	}
1127 
1128 	if (rc < 0) {
1129 		if (errno == EAGAIN || errno == EWOULDBLOCK) {
1130 			return true;
1131 		}
1132 
1133 		return false;
1134 	}
1135 
1136 	return true;
1137 }
1138 
/*
 * Report the socket's NAPI ID (SO_INCOMING_NAPI_ID) for CPU placement,
 * when the feature is enabled and the platform supports it. Returns 0 on
 * success, -1 when disabled, unsupported, or on getsockopt() failure.
 */
static int
uring_sock_get_placement_id(struct spdk_sock *_sock, int *placement_id)
{
	int rc = -1;

	if (!g_spdk_uring_sock_impl_opts.enable_placement_id) {
		return rc;
	}

#if defined(SO_INCOMING_NAPI_ID)
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	socklen_t salen = sizeof(int);

	rc = getsockopt(sock->fd, SOL_SOCKET, SO_INCOMING_NAPI_ID, placement_id, &salen);
	if (rc != 0) {
		SPDK_ERRLOG("getsockopt() failed (errno=%d)\n", errno);
	}

#endif
	return rc;
}
1160 
/*
 * Allocate a poll group backed by an io_uring with
 * SPDK_SOCK_GROUP_QUEUE_DEPTH entries. Returns NULL on allocation or
 * ring-setup failure.
 */
static struct spdk_sock_group_impl *
uring_sock_group_impl_create(void)
{
	struct spdk_uring_sock_group_impl *group_impl;

	group_impl = calloc(1, sizeof(*group_impl));
	if (group_impl == NULL) {
		SPDK_ERRLOG("group_impl allocation failed\n");
		return NULL;
	}

	group_impl->io_avail = SPDK_SOCK_GROUP_QUEUE_DEPTH;

	if (io_uring_queue_init(SPDK_SOCK_GROUP_QUEUE_DEPTH, &group_impl->uring, 0) < 0) {
		SPDK_ERRLOG("uring I/O context setup failure\n");
		free(group_impl);
		return NULL;
	}

	TAILQ_INIT(&group_impl->pending_recv);

	return &group_impl->base;
}
1184 
/*
 * Bind a sock to this poll group: record the group pointer and initialize
 * the three per-sock uring tasks. If the sock arrives with data still
 * buffered in its receive pipe (e.g. migrated from another group during
 * scheduling), queue it on pending_recv immediately so that data is not
 * stranded. Always returns 0.
 */
static int
uring_sock_group_impl_add_sock(struct spdk_sock_group_impl *_group,
			       struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group);

	sock->group = group;
	sock->write_task.sock = sock;
	sock->write_task.type = SPDK_SOCK_TASK_WRITE;

	sock->pollin_task.sock = sock;
	sock->pollin_task.type = SPDK_SOCK_TASK_POLLIN;

	sock->cancel_task.sock = sock;
	sock->cancel_task.type = SPDK_SOCK_TASK_CANCEL;

	/* switched from another polling group due to scheduling */
	if (spdk_unlikely(sock->recv_pipe != NULL &&
			  (spdk_pipe_reader_bytes_available(sock->recv_pipe) > 0))) {
		assert(sock->pending_recv == false);
		sock->pending_recv = true;
		TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
	}

	return 0;
}
1212 
1213 static int
1214 uring_sock_group_impl_poll(struct spdk_sock_group_impl *_group, int max_events,
1215 			   struct spdk_sock **socks)
1216 {
1217 	struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group);
1218 	int count, ret;
1219 	int to_complete, to_submit;
1220 	struct spdk_sock *_sock, *tmp;
1221 	struct spdk_uring_sock *sock;
1222 
1223 	if (spdk_likely(socks)) {
1224 		TAILQ_FOREACH_SAFE(_sock, &group->base.socks, link, tmp) {
1225 			sock = __uring_sock(_sock);
1226 			if (spdk_unlikely(sock->connection_status)) {
1227 				continue;
1228 			}
1229 			_sock_flush(_sock);
1230 			_sock_prep_pollin(_sock);
1231 		}
1232 	}
1233 
1234 	to_submit = group->io_queued;
1235 
1236 	/* For network I/O, it cannot be set with O_DIRECT, so we do not need to call spdk_io_uring_enter */
1237 	if (to_submit > 0) {
1238 		/* If there are I/O to submit, use io_uring_submit here.
1239 		 * It will automatically call io_uring_enter appropriately. */
1240 		ret = io_uring_submit(&group->uring);
1241 		if (ret < 0) {
1242 			return 1;
1243 		}
1244 		group->io_queued = 0;
1245 		group->io_inflight += to_submit;
1246 		group->io_avail -= to_submit;
1247 	}
1248 
1249 	count = 0;
1250 	to_complete = group->io_inflight;
1251 	if (to_complete > 0) {
1252 		count = sock_uring_group_reap(group, to_complete, max_events, socks);
1253 	}
1254 
1255 	return count;
1256 }
1257 
/* Detach a socket from its polling group.  In-flight uring operations hold
 * pointers to the socket's task objects, so each active task is cancelled
 * and the group is polled until both the task and its cancel request have
 * completed before the socket is unlinked.  Always returns 0. */
static int
uring_sock_group_impl_remove_sock(struct spdk_sock_group_impl *_group,
				  struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group);

	if (sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
		_sock_prep_cancel_task(_sock, &sock->write_task);
		/* Since spdk_sock_group_remove_sock is not asynchronous interface, so
		 * currently can use a while loop here. */
		while ((sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) ||
		       (sock->cancel_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE)) {
			/* NULL socks => reap-only poll: drains completions without
			 * queuing new I/O for other sockets in the group. */
			uring_sock_group_impl_poll(_group, 32, NULL);
		}
	}

	if (sock->pollin_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
		_sock_prep_cancel_task(_sock, &sock->pollin_task);
		/* Since spdk_sock_group_remove_sock is not asynchronous interface, so
		 * currently can use a while loop here. */
		while ((sock->pollin_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) ||
		       (sock->cancel_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE)) {
			uring_sock_group_impl_poll(_group, 32, NULL);
		}
	}

	/* Unlink from the ready list so the group holds no dangling pointer
	 * to this socket after removal. */
	if (sock->pending_recv) {
		TAILQ_REMOVE(&group->pending_recv, sock, link);
		sock->pending_recv = false;
	}
	assert(sock->pending_recv == false);

	sock->group = NULL;
	return 0;
}
1294 
1295 static int
1296 uring_sock_group_impl_close(struct spdk_sock_group_impl *_group)
1297 {
1298 	struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group);
1299 
1300 	/* try to reap all the active I/O */
1301 	while (group->io_inflight) {
1302 		uring_sock_group_impl_poll(_group, 32, NULL);
1303 	}
1304 	assert(group->io_inflight == 0);
1305 	assert(group->io_avail == SPDK_SOCK_GROUP_QUEUE_DEPTH);
1306 
1307 	io_uring_queue_exit(&group->uring);
1308 
1309 	free(group);
1310 	return 0;
1311 }
1312 
1313 static int
1314 uring_sock_impl_get_opts(struct spdk_sock_impl_opts *opts, size_t *len)
1315 {
1316 	if (!opts || !len) {
1317 		errno = EINVAL;
1318 		return -1;
1319 	}
1320 
1321 #define FIELD_OK(field) \
1322 	offsetof(struct spdk_sock_impl_opts, field) + sizeof(opts->field) <= *len
1323 
1324 #define GET_FIELD(field) \
1325 	if (FIELD_OK(field)) { \
1326 		opts->field = g_spdk_uring_sock_impl_opts.field; \
1327 	}
1328 
1329 	GET_FIELD(recv_buf_size);
1330 	GET_FIELD(send_buf_size);
1331 	GET_FIELD(enable_recv_pipe);
1332 	GET_FIELD(enable_quickack);
1333 	GET_FIELD(enable_placement_id);
1334 
1335 #undef GET_FIELD
1336 #undef FIELD_OK
1337 
1338 	*len = spdk_min(*len, sizeof(g_spdk_uring_sock_impl_opts));
1339 	return 0;
1340 }
1341 
1342 static int
1343 uring_sock_impl_set_opts(const struct spdk_sock_impl_opts *opts, size_t len)
1344 {
1345 	if (!opts) {
1346 		errno = EINVAL;
1347 		return -1;
1348 	}
1349 
1350 #define FIELD_OK(field) \
1351 	offsetof(struct spdk_sock_impl_opts, field) + sizeof(opts->field) <= len
1352 
1353 #define SET_FIELD(field) \
1354 	if (FIELD_OK(field)) { \
1355 		g_spdk_uring_sock_impl_opts.field = opts->field; \
1356 	}
1357 
1358 	SET_FIELD(recv_buf_size);
1359 	SET_FIELD(send_buf_size);
1360 	SET_FIELD(enable_recv_pipe);
1361 	SET_FIELD(enable_quickack);
1362 	SET_FIELD(enable_placement_id);
1363 
1364 #undef SET_FIELD
1365 #undef FIELD_OK
1366 
1367 	return 0;
1368 }
1369 
1370 static int
1371 uring_sock_flush(struct spdk_sock *_sock)
1372 {
1373 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1374 
1375 	if (!sock->group) {
1376 		return _sock_flush_client(_sock);
1377 	}
1378 
1379 	return 0;
1380 }
1381 
/* Virtual function table wiring this io_uring-based implementation into the
 * generic spdk_sock layer. */
static struct spdk_net_impl g_uring_net_impl = {
	.name		= "uring",
	.getaddr	= uring_sock_getaddr,
	.connect	= uring_sock_connect,
	.listen		= uring_sock_listen,
	.accept		= uring_sock_accept,
	.close		= uring_sock_close,
	.recv		= uring_sock_recv,
	.readv		= uring_sock_readv,
	.writev		= uring_sock_writev,
	.writev_async	= uring_sock_writev_async,
	.flush          = uring_sock_flush,
	.set_recvlowat	= uring_sock_set_recvlowat,
	.set_recvbuf	= uring_sock_set_recvbuf,
	.set_sendbuf	= uring_sock_set_sendbuf,
	.is_ipv6	= uring_sock_is_ipv6,
	.is_ipv4	= uring_sock_is_ipv4,
	.is_connected   = uring_sock_is_connected,
	.get_placement_id	= uring_sock_get_placement_id,
	.group_impl_create	= uring_sock_group_impl_create,
	.group_impl_add_sock	= uring_sock_group_impl_add_sock,
	.group_impl_remove_sock = uring_sock_group_impl_remove_sock,
	.group_impl_poll	= uring_sock_group_impl_poll,
	.group_impl_close	= uring_sock_group_impl_close,
	.get_opts		= uring_sock_impl_get_opts,
	.set_opts		= uring_sock_impl_set_opts,
};

/* Registered at DEFAULT_SOCK_PRIORITY + 1, i.e. a different priority than
 * the default implementation.  NOTE(review): confirm whether a larger value
 * means preferred or deprioritized in SPDK_NET_IMPL_REGISTER. */
SPDK_NET_IMPL_REGISTER(uring, &g_uring_net_impl, DEFAULT_SOCK_PRIORITY + 1);
1411