xref: /spdk/module/sock/uring/uring.c (revision 441431d22872ae4e05a1bf8b78e9aeff1eba1eb3)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 #include "spdk/config.h"
36 
37 #include <sys/epoll.h>
38 #include <liburing.h>
39 
40 #include "spdk/barrier.h"
41 #include "spdk/env.h"
42 #include "spdk/log.h"
43 #include "spdk/pipe.h"
44 #include "spdk/sock.h"
45 #include "spdk/string.h"
46 #include "spdk/util.h"
47 
48 #include "spdk_internal/sock.h"
49 #include "spdk_internal/assert.h"
50 
51 #define MAX_TMPBUF 1024
52 #define PORTNUMLEN 32
53 #define SPDK_SOCK_GROUP_QUEUE_DEPTH 4096
54 
/* Kinds of asynchronous operations a socket can have outstanding on its
 * poll group's io_uring. */
enum spdk_sock_task_type {
	SPDK_SOCK_TASK_POLLIN = 0,	/* poll the fd for readability (POLLIN) */
	SPDK_SOCK_TASK_WRITE,		/* vectored sendmsg of queued write requests */
	SPDK_SOCK_TASK_CANCEL,		/* cancel a previously submitted SQE */
};
60 
/* Lifecycle state of a spdk_uring_task: either idle, or an SQE has been
 * prepared/submitted whose completion has not yet been reaped. */
enum spdk_uring_sock_task_status {
	SPDK_URING_SOCK_TASK_NOT_IN_USE = 0,
	SPDK_URING_SOCK_TASK_IN_PROCESS,
};
65 
/* State for one asynchronous operation (write, pollin, or cancel) on a
 * socket.  A pointer to this struct is used as the SQE/CQE user_data, so
 * completions can be routed back to the owning socket. */
struct spdk_uring_task {
	enum spdk_uring_sock_task_status	status;		/* idle vs. SQE in flight */
	enum spdk_sock_task_type		type;
	struct spdk_uring_sock			*sock;		/* back-pointer to the owning socket */
	struct msghdr				msg;		/* sendmsg header used by write tasks */
	struct iovec				iovs[IOV_BATCH_SIZE];	/* gathered iovs of queued requests */
	int					iov_cnt;	/* valid entries in iovs */
	struct spdk_sock_request		*last_req;	/* last request covered by iovs */
	STAILQ_ENTRY(spdk_uring_task)		link;
};
76 
/* io_uring-backed socket.  Embeds the generic spdk_sock as its first member
 * so the __uring_sock() cast is valid. */
struct spdk_uring_sock {
	struct spdk_sock			base;
	int					fd;
	struct spdk_uring_sock_group_impl	*group;		/* NULL until added to a poll group */
	struct spdk_uring_task			write_task;
	struct spdk_uring_task			pollin_task;
	struct spdk_uring_task			cancel_task;
	struct spdk_pipe			*recv_pipe;	/* optional buffered-receive pipe */
	void					*recv_buf;	/* backing storage for recv_pipe */
	int					recv_buf_sz;
	bool					pending_recv;	/* true while on group->pending_recv list */
	int					connection_status;	/* 0, or first fatal (negative) send status */
	int					placement_id;	/* -1 when placement is disabled/unknown */
	TAILQ_ENTRY(spdk_uring_sock)		link;		/* linkage on group->pending_recv */
};
92 
/* Per-poll-group state: a single io_uring shared by all member sockets,
 * plus a level-triggered list of sockets with buffered receive data. */
struct spdk_uring_sock_group_impl {
	struct spdk_sock_group_impl		base;
	struct io_uring				uring;
	uint32_t				io_inflight;	/* submitted; CQE not yet reaped */
	uint32_t				io_queued;	/* SQEs prepared, not yet submitted */
	uint32_t				io_avail;	/* remaining queue-depth budget */
	TAILQ_HEAD(, spdk_uring_sock)		pending_recv;	/* sockets with readable data */
};
101 
/* Default implementation options for uring sockets.
 * NOTE(review): presumably mutated elsewhere via the sock impl opts API;
 * only the defaults are visible here -- confirm before relying on them. */
static struct spdk_sock_impl_opts g_spdk_uring_sock_impl_opts = {
	.recv_buf_size = MIN_SO_RCVBUF_SIZE,
	.send_buf_size = MIN_SO_SNDBUF_SIZE,
	.enable_recv_pipe = true,
	.enable_quickack = false,
	.enable_placement_id = PLACEMENT_NONE,
};
109 
/* Global placement-id -> poll-group map, shared by all uring sockets and
 * torn down by the destructor below at process exit. */
static struct spdk_sock_map g_map = {
	.entries = STAILQ_HEAD_INITIALIZER(g_map.entries),
	.mtx = PTHREAD_MUTEX_INITIALIZER
};
114 
115 __attribute((destructor)) static void
116 uring_sock_map_cleanup(void)
117 {
118 	spdk_sock_map_cleanup(&g_map);
119 }
120 
121 #define SPDK_URING_SOCK_REQUEST_IOV(req) ((struct iovec *)((uint8_t *)req + sizeof(struct spdk_sock_request)))
122 
/* Render the IPv4/IPv6 address in sa as a string into host (capacity hlen).
 * Returns 0 on success, -1 on NULL arguments, an unsupported address
 * family, or an inet_ntop() failure. */
static int
get_addr_str(struct sockaddr *sa, char *host, size_t hlen)
{
	const void *src;

	if (sa == NULL || host == NULL) {
		return -1;
	}

	switch (sa->sa_family) {
	case AF_INET:
		src = &((struct sockaddr_in *)sa)->sin_addr;
		break;
	case AF_INET6:
		src = &((struct sockaddr_in6 *)sa)->sin6_addr;
		break;
	default:
		return -1;
	}

	return inet_ntop(sa->sa_family, src, host, hlen) != NULL ? 0 : -1;
}
151 
152 #define __uring_sock(sock) (struct spdk_uring_sock *)sock
153 #define __uring_group_impl(group) (struct spdk_uring_sock_group_impl *)group
154 
155 static int
156 uring_sock_getaddr(struct spdk_sock *_sock, char *saddr, int slen, uint16_t *sport,
157 		   char *caddr, int clen, uint16_t *cport)
158 {
159 	struct spdk_uring_sock *sock = __uring_sock(_sock);
160 	struct sockaddr_storage sa;
161 	socklen_t salen;
162 	int rc;
163 
164 	assert(sock != NULL);
165 
166 	memset(&sa, 0, sizeof sa);
167 	salen = sizeof sa;
168 	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
169 	if (rc != 0) {
170 		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
171 		return -1;
172 	}
173 
174 	switch (sa.ss_family) {
175 	case AF_UNIX:
176 		/* Acceptable connection types that don't have IPs */
177 		return 0;
178 	case AF_INET:
179 	case AF_INET6:
180 		/* Code below will get IP addresses */
181 		break;
182 	default:
183 		/* Unsupported socket family */
184 		return -1;
185 	}
186 
187 	rc = get_addr_str((struct sockaddr *)&sa, saddr, slen);
188 	if (rc != 0) {
189 		SPDK_ERRLOG("getnameinfo() failed (errno=%d)\n", errno);
190 		return -1;
191 	}
192 
193 	if (sport) {
194 		if (sa.ss_family == AF_INET) {
195 			*sport = ntohs(((struct sockaddr_in *) &sa)->sin_port);
196 		} else if (sa.ss_family == AF_INET6) {
197 			*sport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port);
198 		}
199 	}
200 
201 	memset(&sa, 0, sizeof sa);
202 	salen = sizeof sa;
203 	rc = getpeername(sock->fd, (struct sockaddr *) &sa, &salen);
204 	if (rc != 0) {
205 		SPDK_ERRLOG("getpeername() failed (errno=%d)\n", errno);
206 		return -1;
207 	}
208 
209 	rc = get_addr_str((struct sockaddr *)&sa, caddr, clen);
210 	if (rc != 0) {
211 		SPDK_ERRLOG("getnameinfo() failed (errno=%d)\n", errno);
212 		return -1;
213 	}
214 
215 	if (cport) {
216 		if (sa.ss_family == AF_INET) {
217 			*cport = ntohs(((struct sockaddr_in *) &sa)->sin_port);
218 		} else if (sa.ss_family == AF_INET6) {
219 			*cport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port);
220 		}
221 	}
222 
223 	return 0;
224 }
225 
226 enum uring_sock_create_type {
227 	SPDK_SOCK_CREATE_LISTEN,
228 	SPDK_SOCK_CREATE_CONNECT,
229 };
230 
/* Resize (or create/destroy) the socket's buffered-receive pipe to sz
 * bytes, migrating any data already buffered in the old pipe into the new
 * one.  sz == 0 destroys the pipe; sz in (0, MIN_SOCK_PIPE_SIZE) is
 * rejected.  Returns 0 on success, -1 on an invalid size, -ENOMEM on
 * allocation failure, -EINVAL if the buffered data does not fit in sz. */
static int
uring_sock_alloc_pipe(struct spdk_uring_sock *sock, int sz)
{
	uint8_t *new_buf;
	struct spdk_pipe *new_pipe;
	struct iovec siov[2];
	struct iovec diov[2];
	int sbytes;
	ssize_t bytes;

	if (sock->recv_buf_sz == sz) {
		return 0;
	}

	/* If the new size is 0, just free the pipe */
	if (sz == 0) {
		spdk_pipe_destroy(sock->recv_pipe);
		free(sock->recv_buf);
		sock->recv_pipe = NULL;
		sock->recv_buf = NULL;
		return 0;
	} else if (sz < MIN_SOCK_PIPE_SIZE) {
		SPDK_ERRLOG("The size of the pipe must be larger than %d\n", MIN_SOCK_PIPE_SIZE);
		return -1;
	}

	/* Round up to next 64 byte multiple */
	new_buf = calloc(SPDK_ALIGN_CEIL(sz + 1, 64), sizeof(uint8_t));
	if (!new_buf) {
		SPDK_ERRLOG("socket recv buf allocation failed\n");
		return -ENOMEM;
	}

	/* The +1 byte lets the pipe distinguish full from empty, so a pipe of
	 * capacity sz needs sz + 1 bytes of storage. */
	new_pipe = spdk_pipe_create(new_buf, sz + 1);
	if (new_pipe == NULL) {
		SPDK_ERRLOG("socket pipe allocation failed\n");
		free(new_buf);
		return -ENOMEM;
	}

	if (sock->recv_pipe != NULL) {
		/* Pull all of the data out of the old pipe */
		sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov);
		if (sbytes > sz) {
			/* Too much data to fit into the new pipe size */
			spdk_pipe_destroy(new_pipe);
			free(new_buf);
			return -EINVAL;
		}

		/* The new pipe is empty, so the full capacity sz is writable. */
		sbytes = spdk_pipe_writer_get_buffer(new_pipe, sz, diov);
		assert(sbytes == sz);

		/* Copy the old contents (up to two wrap-around segments) across. */
		bytes = spdk_iovcpy(siov, 2, diov, 2);
		spdk_pipe_writer_advance(new_pipe, bytes);

		spdk_pipe_destroy(sock->recv_pipe);
		free(sock->recv_buf);
	}

	sock->recv_buf_sz = sz;
	sock->recv_buf = new_buf;
	sock->recv_pipe = new_pipe;

	return 0;
}
297 
298 static int
299 uring_sock_set_recvbuf(struct spdk_sock *_sock, int sz)
300 {
301 	struct spdk_uring_sock *sock = __uring_sock(_sock);
302 	int rc;
303 
304 	assert(sock != NULL);
305 
306 	if (g_spdk_uring_sock_impl_opts.enable_recv_pipe) {
307 		rc = uring_sock_alloc_pipe(sock, sz);
308 		if (rc) {
309 			SPDK_ERRLOG("unable to allocate sufficient recvbuf with sz=%d on sock=%p\n", sz, _sock);
310 			return rc;
311 		}
312 	}
313 
314 	if (sz < MIN_SO_RCVBUF_SIZE) {
315 		sz = MIN_SO_RCVBUF_SIZE;
316 	}
317 
318 	rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVBUF, &sz, sizeof(sz));
319 	if (rc < 0) {
320 		return rc;
321 	}
322 
323 	return 0;
324 }
325 
326 static int
327 uring_sock_set_sendbuf(struct spdk_sock *_sock, int sz)
328 {
329 	struct spdk_uring_sock *sock = __uring_sock(_sock);
330 	int rc;
331 
332 	assert(sock != NULL);
333 
334 	if (sz < MIN_SO_SNDBUF_SIZE) {
335 		sz = MIN_SO_SNDBUF_SIZE;
336 	}
337 
338 	rc = setsockopt(sock->fd, SOL_SOCKET, SO_SNDBUF, &sz, sizeof(sz));
339 	if (rc < 0) {
340 		return rc;
341 	}
342 
343 	return 0;
344 }
345 
346 static struct spdk_uring_sock *
347 uring_sock_alloc(int fd)
348 {
349 	struct spdk_uring_sock *sock;
350 #if defined(__linux__)
351 	int flag;
352 	int rc;
353 #endif
354 
355 	sock = calloc(1, sizeof(*sock));
356 	if (sock == NULL) {
357 		SPDK_ERRLOG("sock allocation failed\n");
358 		return NULL;
359 	}
360 
361 	sock->fd = fd;
362 
363 #if defined(__linux__)
364 	flag = 1;
365 
366 	if (g_spdk_uring_sock_impl_opts.enable_quickack) {
367 		rc = setsockopt(sock->fd, IPPROTO_TCP, TCP_QUICKACK, &flag, sizeof(flag));
368 		if (rc != 0) {
369 			SPDK_ERRLOG("quickack was failed to set\n");
370 		}
371 	}
372 
373 	spdk_sock_get_placement_id(sock->fd, g_spdk_uring_sock_impl_opts.enable_placement_id,
374 				   &sock->placement_id);
375 #endif
376 
377 	return sock;
378 }
379 
/* Create a TCP socket and either listen on or connect to ip:port.
 * ip may be a bare IPv4/IPv6 address or a bracketed "[addr]" IPv6 literal.
 * Each getaddrinfo() result is tried in turn until one succeeds; the
 * resulting fd is made non-blocking.  Returns the new spdk_sock, or NULL.
 *
 * Fixes: (1) `val` was clobbered by the recv/send buffer sizes and then
 * reused for the boolean SO_REUSEADDR / TCP_NODELAY / IPV6_V6ONLY options,
 * which only worked because any nonzero value enables them; reset it to 1
 * explicitly.  (2) getaddrinfo() reports errors through its return value,
 * not errno, so log rc / gai_strerror(rc). */
static struct spdk_sock *
uring_sock_create(const char *ip, int port,
		  enum uring_sock_create_type type,
		  struct spdk_sock_opts *opts)
{
	struct spdk_uring_sock *sock;
	char buf[MAX_TMPBUF];
	char portnum[PORTNUMLEN];
	char *p;
	struct addrinfo hints, *res, *res0;
	int fd, flag;
	int val = 1;
	int rc;

	if (ip == NULL) {
		return NULL;
	}
	if (ip[0] == '[') {
		/* Strip the brackets from an "[addr]" style IPv6 literal. */
		snprintf(buf, sizeof(buf), "%s", ip + 1);
		p = strchr(buf, ']');
		if (p != NULL) {
			*p = '\0';
		}
		ip = (const char *) &buf[0];
	}

	snprintf(portnum, sizeof portnum, "%d", port);
	memset(&hints, 0, sizeof hints);
	hints.ai_family = PF_UNSPEC;
	hints.ai_socktype = SOCK_STREAM;
	hints.ai_flags = AI_NUMERICSERV;
	hints.ai_flags |= AI_PASSIVE;
	hints.ai_flags |= AI_NUMERICHOST;
	rc = getaddrinfo(ip, portnum, &hints, &res0);
	if (rc != 0) {
		SPDK_ERRLOG("getaddrinfo() failed %s (%d)\n", gai_strerror(rc), rc);
		return NULL;
	}

	/* try listen */
	fd = -1;
	for (res = res0; res != NULL; res = res->ai_next) {
retry:
		fd = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
		if (fd < 0) {
			/* error */
			continue;
		}

		val = g_spdk_uring_sock_impl_opts.recv_buf_size;
		rc = setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &val, sizeof val);
		if (rc) {
			/* Not fatal */
		}

		val = g_spdk_uring_sock_impl_opts.send_buf_size;
		rc = setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &val, sizeof val);
		if (rc) {
			/* Not fatal */
		}

		/* val was clobbered by the buffer sizes above; the boolean
		 * options below must be set with exactly 1. */
		val = 1;

		rc = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof val);
		if (rc != 0) {
			close(fd);
			/* error */
			continue;
		}
		rc = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof val);
		if (rc != 0) {
			close(fd);
			/* error */
			continue;
		}

#if defined(SO_PRIORITY)
		if (opts != NULL && opts->priority) {
			rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &opts->priority, sizeof val);
			if (rc != 0) {
				close(fd);
				/* error */
				continue;
			}
		}
#endif
		if (res->ai_family == AF_INET6) {
			rc = setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &val, sizeof val);
			if (rc != 0) {
				close(fd);
				/* error */
				continue;
			}
		}

		if (type == SPDK_SOCK_CREATE_LISTEN) {
			rc = bind(fd, res->ai_addr, res->ai_addrlen);
			if (rc != 0) {
				SPDK_ERRLOG("bind() failed at port %d, errno = %d\n", port, errno);
				switch (errno) {
				case EINTR:
					/* interrupted? */
					close(fd);
					goto retry;
				case EADDRNOTAVAIL:
					SPDK_ERRLOG("IP address %s not available. "
						    "Verify IP address in config file "
						    "and make sure setup script is "
						    "run before starting spdk app.\n", ip);
				/* FALLTHROUGH */
				default:
					/* try next family */
					close(fd);
					fd = -1;
					continue;
				}
			}
			/* bind OK */
			rc = listen(fd, 512);
			if (rc != 0) {
				SPDK_ERRLOG("listen() failed, errno = %d\n", errno);
				close(fd);
				fd = -1;
				break;
			}
		} else if (type == SPDK_SOCK_CREATE_CONNECT) {
			rc = connect(fd, res->ai_addr, res->ai_addrlen);
			if (rc != 0) {
				SPDK_ERRLOG("connect() failed, errno = %d\n", errno);
				/* try next family */
				close(fd);
				fd = -1;
				continue;
			}
		}

		flag = fcntl(fd, F_GETFL);
		if (fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0) {
			SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno);
			close(fd);
			fd = -1;
			break;
		}
		break;
	}
	freeaddrinfo(res0);

	if (fd < 0) {
		return NULL;
	}

	sock = uring_sock_alloc(fd);
	if (sock == NULL) {
		SPDK_ERRLOG("sock allocation failed\n");
		close(fd);
		return NULL;
	}

	return &sock->base;
}
538 
/* Create a listening socket bound to ip:port (spdk_sock_impl listen hook).
 * Thin wrapper over uring_sock_create(). */
static struct spdk_sock *
uring_sock_listen(const char *ip, int port, struct spdk_sock_opts *opts)
{
	return uring_sock_create(ip, port, SPDK_SOCK_CREATE_LISTEN, opts);
}
544 
/* Create a socket connected to ip:port (spdk_sock_impl connect hook).
 * Thin wrapper over uring_sock_create(). */
static struct spdk_sock *
uring_sock_connect(const char *ip, int port, struct spdk_sock_opts *opts)
{
	return uring_sock_create(ip, port, SPDK_SOCK_CREATE_CONNECT, opts);
}
550 
551 static struct spdk_sock *
552 uring_sock_accept(struct spdk_sock *_sock)
553 {
554 	struct spdk_uring_sock		*sock = __uring_sock(_sock);
555 	struct sockaddr_storage		sa;
556 	socklen_t			salen;
557 	int				rc, fd;
558 	struct spdk_uring_sock		*new_sock;
559 	int				flag;
560 
561 	memset(&sa, 0, sizeof(sa));
562 	salen = sizeof(sa);
563 
564 	assert(sock != NULL);
565 
566 	rc = accept(sock->fd, (struct sockaddr *)&sa, &salen);
567 
568 	if (rc == -1) {
569 		return NULL;
570 	}
571 
572 	fd = rc;
573 
574 	flag = fcntl(fd, F_GETFL);
575 	if ((!(flag & O_NONBLOCK)) && (fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0)) {
576 		SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno);
577 		close(fd);
578 		return NULL;
579 	}
580 
581 #if defined(SO_PRIORITY)
582 	/* The priority is not inherited, so call this function again */
583 	if (sock->base.opts.priority) {
584 		rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &sock->base.opts.priority, sizeof(int));
585 		if (rc != 0) {
586 			close(fd);
587 			return NULL;
588 		}
589 	}
590 #endif
591 
592 	new_sock = uring_sock_alloc(fd);
593 	if (new_sock == NULL) {
594 		close(fd);
595 		return NULL;
596 	}
597 
598 	return &new_sock->base;
599 }
600 
/* Close the socket and free all memory it owns.  The caller must already
 * have removed it from any poll group and drained/aborted its requests
 * (enforced by the asserts).  Always returns 0. */
static int
uring_sock_close(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);

	assert(TAILQ_EMPTY(&_sock->pending_reqs));
	assert(sock->group == NULL);

	/* If the socket fails to close, the best choice is to
	 * leak the fd but continue to free the rest of the sock
	 * memory. */
	close(sock->fd);

	spdk_pipe_destroy(sock->recv_pipe);
	free(sock->recv_buf);
	free(sock);

	return 0;
}
620 
/* Copy buffered data out of the socket's receive pipe into diov.  When the
 * pipe is completely drained, the socket is removed from its group's
 * level-triggered pending_recv list.  Returns the number of bytes copied,
 * or -1 with errno set (EAGAIN if the pipe is empty, EINVAL on a
 * zero-length diov or a pipe error). */
static ssize_t
uring_sock_recv_from_pipe(struct spdk_uring_sock *sock, struct iovec *diov, int diovcnt)
{
	struct iovec siov[2];
	int sbytes;
	ssize_t bytes;
	struct spdk_uring_sock_group_impl *group;

	/* The pipe exposes its readable region as up to two wrap-around segments. */
	sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov);
	if (sbytes < 0) {
		errno = EINVAL;
		return -1;
	} else if (sbytes == 0) {
		errno = EAGAIN;
		return -1;
	}

	bytes = spdk_iovcpy(siov, 2, diov, diovcnt);

	if (bytes == 0) {
		/* The only way this happens is if diov is 0 length */
		errno = EINVAL;
		return -1;
	}

	spdk_pipe_reader_advance(sock->recv_pipe, bytes);

	/* If we drained the pipe, take it off the level-triggered list */
	if (sock->base.group_impl && spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) {
		group = __uring_group_impl(sock->base.group_impl);
		TAILQ_REMOVE(&group->pending_recv, sock, link);
		sock->pending_recv = false;
	}

	return bytes;
}
657 
658 static inline ssize_t
659 uring_sock_read(struct spdk_uring_sock *sock)
660 {
661 	struct iovec iov[2];
662 	int bytes;
663 	struct spdk_uring_sock_group_impl *group;
664 
665 	bytes = spdk_pipe_writer_get_buffer(sock->recv_pipe, sock->recv_buf_sz, iov);
666 
667 	if (bytes > 0) {
668 		bytes = readv(sock->fd, iov, 2);
669 		if (bytes > 0) {
670 			spdk_pipe_writer_advance(sock->recv_pipe, bytes);
671 			if (sock->base.group_impl) {
672 				group = __uring_group_impl(sock->base.group_impl);
673 				TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
674 				sock->pending_recv = true;
675 			}
676 		}
677 	}
678 
679 	return bytes;
680 }
681 
682 static ssize_t
683 uring_sock_readv(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
684 {
685 	struct spdk_uring_sock *sock = __uring_sock(_sock);
686 	int rc, i;
687 	size_t len;
688 
689 	if (sock->recv_pipe == NULL) {
690 		return readv(sock->fd, iov, iovcnt);
691 	}
692 
693 	len = 0;
694 	for (i = 0; i < iovcnt; i++) {
695 		len += iov[i].iov_len;
696 	}
697 
698 	if (spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) {
699 		/* If the user is receiving a sufficiently large amount of data,
700 		 * receive directly to their buffers. */
701 		if (len >= MIN_SOCK_PIPE_SIZE) {
702 			return readv(sock->fd, iov, iovcnt);
703 		}
704 
705 		/* Otherwise, do a big read into our pipe */
706 		rc = uring_sock_read(sock);
707 		if (rc <= 0) {
708 			return rc;
709 		}
710 	}
711 
712 	return uring_sock_recv_from_pipe(sock, iov, iovcnt);
713 }
714 
/* Scalar receive: wrap (buf, len) in a single iovec and delegate to the
 * vectored path. */
static ssize_t
uring_sock_recv(struct spdk_sock *sock, void *buf, size_t len)
{
	struct iovec iov = { .iov_base = buf, .iov_len = len };

	return uring_sock_readv(sock, &iov, 1);
}
725 
726 static ssize_t
727 uring_sock_writev(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
728 {
729 	struct spdk_uring_sock *sock = __uring_sock(_sock);
730 
731 	if (sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
732 		errno = EAGAIN;
733 		return -1;
734 	}
735 
736 	return writev(sock->fd, iov, iovcnt);
737 }
738 
/* Account rc bytes (the result of a vectored send) against the socket's
 * queued requests, in FIFO order.  Fully-sent requests are moved to the
 * pending list and completed via spdk_sock_request_put(); a partially-sent
 * request just has internal.offset advanced.  Returns 0 on success, or the
 * first non-zero value returned by spdk_sock_request_put(). */
static int
sock_complete_reqs(struct spdk_sock *_sock, ssize_t rc)
{
	struct spdk_sock_request *req;
	int i, retval;
	unsigned int offset;
	size_t len;

	/* Consume the requests that were actually written */
	req = TAILQ_FIRST(&_sock->queued_reqs);
	while (req) {
		offset = req->internal.offset;

		for (i = 0; i < req->iovcnt; i++) {
			/* Advance by the offset first */
			if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) {
				offset -= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len;
				continue;
			}

			/* Calculate the remaining length of this element */
			len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset;

			if (len > (size_t)rc) {
				/* This element was partially sent. */
				req->internal.offset += rc;
				return 0;
			}

			offset = 0;
			req->internal.offset += len;
			rc -= len;
		}

		/* Handled a full request. */
		spdk_sock_request_pend(_sock, req);

		retval = spdk_sock_request_put(_sock, req, 0);
		if (retval) {
			return retval;
		}

		if (rc == 0) {
			break;
		}

		req = TAILQ_FIRST(&_sock->queued_reqs);
	}

	return 0;
}
790 
/* Gather the socket's queued requests into the write task's iov array and
 * queue a single sendmsg SQE on the group's ring.  No-op when a write is
 * already in flight or there is nothing to send.  Only prepares the SQE;
 * submission happens later in the group poll. */
static void
_sock_flush(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct spdk_uring_task *task = &sock->write_task;
	uint32_t iovcnt;
	struct io_uring_sqe *sqe;

	/* At most one write task per socket may be outstanding. */
	if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS) {
		return;
	}

	iovcnt = spdk_sock_prep_reqs(&sock->base, task->iovs, task->iov_cnt, &task->last_req);
	if (!iovcnt) {
		return;
	}

	task->iov_cnt = iovcnt;
	assert(sock->group != NULL);
	task->msg.msg_iov = task->iovs;
	task->msg.msg_iovlen = task->iov_cnt;

	sock->group->io_queued++;

	/* NOTE(review): io_uring_get_sqe() can return NULL when the SQ ring is
	 * full; this code appears to rely on the group's queue-depth accounting
	 * to prevent that -- confirm the sqe cannot be NULL here. */
	sqe = io_uring_get_sqe(&sock->group->uring);
	io_uring_prep_sendmsg(sqe, sock->fd, &sock->write_task.msg, 0);
	io_uring_sqe_set_data(sqe, task);
	task->status = SPDK_URING_SOCK_TASK_IN_PROCESS;
}
820 
/* Queue a POLLIN SQE for the socket on its group's ring.  Skipped when a
 * pollin is already outstanding, or when the socket is already on the
 * pending_recv list (buffered data will be reported without polling). */
static void
_sock_prep_pollin(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct spdk_uring_task *task = &sock->pollin_task;
	struct io_uring_sqe *sqe;

	/* Do not prepare pollin event */
	if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS || sock->pending_recv) {
		return;
	}

	assert(sock->group != NULL);
	sock->group->io_queued++;

	/* NOTE(review): io_uring_get_sqe() may return NULL on a full SQ ring;
	 * presumed prevented by queue-depth accounting -- confirm. */
	sqe = io_uring_get_sqe(&sock->group->uring);
	io_uring_prep_poll_add(sqe, sock->fd, POLLIN);
	io_uring_sqe_set_data(sqe, task);
	task->status = SPDK_URING_SOCK_TASK_IN_PROCESS;
}
841 
/* Queue an SQE that cancels a previously submitted operation identified by
 * user_data (the task pointer of the operation to cancel).  Skipped if a
 * cancel is already outstanding for this socket. */
static void
_sock_prep_cancel_task(struct spdk_sock *_sock, void *user_data)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct spdk_uring_task *task = &sock->cancel_task;
	struct io_uring_sqe *sqe;

	if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS) {
		return;
	}

	assert(sock->group != NULL);
	sock->group->io_queued++;

	/* NOTE(review): io_uring_get_sqe() may return NULL on a full SQ ring;
	 * presumed prevented by queue-depth accounting -- confirm. */
	sqe = io_uring_get_sqe(&sock->group->uring);
	io_uring_prep_cancel(sqe, user_data, 0);
	io_uring_sqe_set_data(sqe, task);
	task->status = SPDK_URING_SOCK_TASK_IN_PROCESS;
}
861 
862 static int
863 sock_uring_group_reap(struct spdk_uring_sock_group_impl *group, int max, int max_read_events,
864 		      struct spdk_sock **socks)
865 {
866 	int i, count, ret;
867 	struct io_uring_cqe *cqe;
868 	struct spdk_uring_sock *sock, *tmp;
869 	struct spdk_uring_task *task;
870 	int status;
871 
872 	for (i = 0; i < max; i++) {
873 		ret = io_uring_peek_cqe(&group->uring, &cqe);
874 		if (ret != 0) {
875 			break;
876 		}
877 
878 		if (cqe == NULL) {
879 			break;
880 		}
881 
882 		task = (struct spdk_uring_task *)cqe->user_data;
883 		assert(task != NULL);
884 		sock = task->sock;
885 		assert(sock != NULL);
886 		assert(sock->group != NULL);
887 		assert(sock->group == group);
888 		sock->group->io_inflight--;
889 		sock->group->io_avail++;
890 		status = cqe->res;
891 		io_uring_cqe_seen(&group->uring, cqe);
892 
893 		task->status = SPDK_URING_SOCK_TASK_NOT_IN_USE;
894 
895 		if (spdk_unlikely(status <= 0)) {
896 			if (status == -EAGAIN || status == -EWOULDBLOCK) {
897 				continue;
898 			}
899 		}
900 
901 		switch (task->type) {
902 		case SPDK_SOCK_TASK_POLLIN:
903 			if ((status & POLLIN) == POLLIN) {
904 				if (sock->base.cb_fn != NULL) {
905 					assert(sock->pending_recv == false);
906 					sock->pending_recv = true;
907 					TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
908 				}
909 			}
910 			break;
911 		case SPDK_SOCK_TASK_WRITE:
912 			assert(TAILQ_EMPTY(&sock->base.pending_reqs));
913 			task->last_req = NULL;
914 			task->iov_cnt = 0;
915 			if (spdk_unlikely(status) < 0) {
916 				sock->connection_status = status;
917 				spdk_sock_abort_requests(&sock->base);
918 			} else {
919 				sock_complete_reqs(&sock->base, status);
920 			}
921 
922 			break;
923 		case SPDK_SOCK_TASK_CANCEL:
924 			/* Do nothing */
925 			break;
926 		default:
927 			SPDK_UNREACHABLE();
928 		}
929 	}
930 
931 	if (!socks) {
932 		return 0;
933 	}
934 	count = 0;
935 	TAILQ_FOREACH_SAFE(sock, &group->pending_recv, link, tmp) {
936 		if (count == max_read_events) {
937 			break;
938 		}
939 
940 		/* If the socket's cb_fn is NULL, just remove it from
941 		 * the list and do not add it to socks array */
942 		if (spdk_unlikely(sock->base.cb_fn == NULL)) {
943 			sock->pending_recv = false;
944 			TAILQ_REMOVE(&group->pending_recv, sock, link);
945 			continue;
946 		}
947 
948 		socks[count++] = &sock->base;
949 	}
950 
951 	/* Cycle the pending_recv list so that each time we poll things aren't
952 	 * in the same order. */
953 	for (i = 0; i < count; i++) {
954 		sock = __uring_sock(socks[i]);
955 
956 		TAILQ_REMOVE(&group->pending_recv, sock, link);
957 
958 		if (sock->recv_pipe == NULL || spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) {
959 			sock->pending_recv = false;
960 		} else {
961 			TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
962 		}
963 	}
964 
965 	return count;
966 }
967 
968 static int
969 _sock_flush_client(struct spdk_sock *_sock)
970 {
971 	struct spdk_uring_sock *sock = __uring_sock(_sock);
972 	struct msghdr msg = {};
973 	struct iovec iovs[IOV_BATCH_SIZE];
974 	int iovcnt;
975 	ssize_t rc;
976 
977 	/* Can't flush from within a callback or we end up with recursive calls */
978 	if (_sock->cb_cnt > 0) {
979 		return 0;
980 	}
981 
982 	/* Gather an iov */
983 	iovcnt = spdk_sock_prep_reqs(_sock, iovs, 0, NULL);
984 	if (iovcnt == 0) {
985 		return 0;
986 	}
987 
988 	/* Perform the vectored write */
989 	msg.msg_iov = iovs;
990 	msg.msg_iovlen = iovcnt;
991 	rc = sendmsg(sock->fd, &msg, 0);
992 	if (rc <= 0) {
993 		if (errno == EAGAIN || errno == EWOULDBLOCK) {
994 			return 0;
995 		}
996 		return rc;
997 	}
998 
999 	sock_complete_reqs(_sock, rc);
1000 
1001 	return 0;
1002 }
1003 
1004 static void
1005 uring_sock_writev_async(struct spdk_sock *_sock, struct spdk_sock_request *req)
1006 {
1007 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1008 	int rc;
1009 
1010 	if (spdk_unlikely(sock->connection_status)) {
1011 		req->cb_fn(req->cb_arg, sock->connection_status);
1012 		return;
1013 	}
1014 
1015 	spdk_sock_request_queue(_sock, req);
1016 
1017 	if (!sock->group) {
1018 		if (_sock->queued_iovcnt >= IOV_BATCH_SIZE) {
1019 			rc = _sock_flush_client(_sock);
1020 			if (rc) {
1021 				spdk_sock_abort_requests(_sock);
1022 			}
1023 		}
1024 	}
1025 }
1026 
1027 static int
1028 uring_sock_set_recvlowat(struct spdk_sock *_sock, int nbytes)
1029 {
1030 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1031 	int val;
1032 	int rc;
1033 
1034 	assert(sock != NULL);
1035 
1036 	val = nbytes;
1037 	rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVLOWAT, &val, sizeof val);
1038 	if (rc != 0) {
1039 		return -1;
1040 	}
1041 	return 0;
1042 }
1043 
1044 static bool
1045 uring_sock_is_ipv6(struct spdk_sock *_sock)
1046 {
1047 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1048 	struct sockaddr_storage sa;
1049 	socklen_t salen;
1050 	int rc;
1051 
1052 	assert(sock != NULL);
1053 
1054 	memset(&sa, 0, sizeof sa);
1055 	salen = sizeof sa;
1056 	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
1057 	if (rc != 0) {
1058 		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
1059 		return false;
1060 	}
1061 
1062 	return (sa.ss_family == AF_INET6);
1063 }
1064 
1065 static bool
1066 uring_sock_is_ipv4(struct spdk_sock *_sock)
1067 {
1068 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1069 	struct sockaddr_storage sa;
1070 	socklen_t salen;
1071 	int rc;
1072 
1073 	assert(sock != NULL);
1074 
1075 	memset(&sa, 0, sizeof sa);
1076 	salen = sizeof sa;
1077 	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
1078 	if (rc != 0) {
1079 		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
1080 		return false;
1081 	}
1082 
1083 	return (sa.ss_family == AF_INET);
1084 }
1085 
1086 static bool
1087 uring_sock_is_connected(struct spdk_sock *_sock)
1088 {
1089 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1090 	uint8_t byte;
1091 	int rc;
1092 
1093 	rc = recv(sock->fd, &byte, 1, MSG_PEEK);
1094 	if (rc == 0) {
1095 		return false;
1096 	}
1097 
1098 	if (rc < 0) {
1099 		if (errno == EAGAIN || errno == EWOULDBLOCK) {
1100 			return true;
1101 		}
1102 
1103 		return false;
1104 	}
1105 
1106 	return true;
1107 }
1108 
1109 static struct spdk_sock_group_impl *
1110 uring_sock_group_impl_get_optimal(struct spdk_sock *_sock)
1111 {
1112 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1113 	struct spdk_sock_group_impl *group;
1114 
1115 	if (sock->placement_id != -1) {
1116 		spdk_sock_map_lookup(&g_map, sock->placement_id, &group);
1117 		return group;
1118 	}
1119 
1120 	return NULL;
1121 }
1122 
1123 static struct spdk_sock_group_impl *
1124 uring_sock_group_impl_create(void)
1125 {
1126 	struct spdk_uring_sock_group_impl *group_impl;
1127 
1128 	group_impl = calloc(1, sizeof(*group_impl));
1129 	if (group_impl == NULL) {
1130 		SPDK_ERRLOG("group_impl allocation failed\n");
1131 		return NULL;
1132 	}
1133 
1134 	group_impl->io_avail = SPDK_SOCK_GROUP_QUEUE_DEPTH;
1135 
1136 	if (io_uring_queue_init(SPDK_SOCK_GROUP_QUEUE_DEPTH, &group_impl->uring, 0) < 0) {
1137 		SPDK_ERRLOG("uring I/O context setup failure\n");
1138 		free(group_impl);
1139 		return NULL;
1140 	}
1141 
1142 	TAILQ_INIT(&group_impl->pending_recv);
1143 
1144 	if (g_spdk_uring_sock_impl_opts.enable_placement_id == PLACEMENT_CPU) {
1145 		spdk_sock_map_insert(&g_map, spdk_env_get_current_core(), &group_impl->base);
1146 	}
1147 
1148 	return &group_impl->base;
1149 }
1150 
/* Attach a socket to a poll group: wire up its three uring tasks, carry
 * over any data still buffered in its recv pipe (the socket may have been
 * moved from another group by the scheduler), and record the placement-id
 * mapping.  Always returns 0. */
static int
uring_sock_group_impl_add_sock(struct spdk_sock_group_impl *_group,
			       struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group);
	int rc;

	sock->group = group;
	sock->write_task.sock = sock;
	sock->write_task.type = SPDK_SOCK_TASK_WRITE;

	sock->pollin_task.sock = sock;
	sock->pollin_task.type = SPDK_SOCK_TASK_POLLIN;

	sock->cancel_task.sock = sock;
	sock->cancel_task.type = SPDK_SOCK_TASK_CANCEL;

	/* switched from another polling group due to scheduling */
	if (spdk_unlikely(sock->recv_pipe != NULL &&
			  (spdk_pipe_reader_bytes_available(sock->recv_pipe) > 0))) {
		assert(sock->pending_recv == false);
		sock->pending_recv = true;
		TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
	}

	if (sock->placement_id != -1) {
		rc = spdk_sock_map_insert(&g_map, sock->placement_id, &group->base);
		if (rc != 0) {
			SPDK_ERRLOG("Failed to insert sock group into map: %d", rc);
			/* Do not treat this as an error. The system will continue running. */
		}
	}

	return 0;
}
1187 
/*
 * Poll the group: flush and re-arm POLLIN for every healthy socket (only
 * when the caller wants events, i.e. socks != NULL), submit all queued
 * SQEs in one io_uring_submit() call, then reap completions and pending
 * pipe data into 'socks' (up to max_events). Returns the number of ready
 * sockets reported.
 */
static int
uring_sock_group_impl_poll(struct spdk_sock_group_impl *_group, int max_events,
			   struct spdk_sock **socks)
{
	struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group);
	int count, ret;
	int to_complete, to_submit;
	struct spdk_sock *_sock, *tmp;
	struct spdk_uring_sock *sock;

	if (spdk_likely(socks)) {
		TAILQ_FOREACH_SAFE(_sock, &group->base.socks, link, tmp) {
			sock = __uring_sock(_sock);
			/* Skip sockets already marked with a connection error. */
			if (spdk_unlikely(sock->connection_status)) {
				continue;
			}
			_sock_flush(_sock);
			_sock_prep_pollin(_sock);
		}
	}

	to_submit = group->io_queued;

	/* For network I/O, it cannot be set with O_DIRECT, so we do not need to call spdk_io_uring_enter */
	if (to_submit > 0) {
		/* If there are I/O to submit, use io_uring_submit here.
		 * It will automatically call io_uring_enter appropriately. */
		ret = io_uring_submit(&group->uring);
		if (ret < 0) {
			/* NOTE(review): returning 1 (not a negative errno) on submit
			 * failure looks suspicious, but the drain loops in
			 * remove_sock/close call this with socks == NULL and only
			 * need progress, so leave as-is; verify against callers. */
			return 1;
		}
		/* Account the batch: queued -> inflight, and SQ slots consumed. */
		group->io_queued = 0;
		group->io_inflight += to_submit;
		group->io_avail -= to_submit;
	}

	count = 0;
	to_complete = group->io_inflight;
	/* Reap even with zero inflight if sockets still hold buffered pipe data. */
	if (to_complete > 0 || !TAILQ_EMPTY(&group->pending_recv)) {
		count = sock_uring_group_reap(group, to_complete, max_events, socks);
	}

	return count;
}
1232 
/*
 * Detach a socket from its polling group. Any in-flight write or pollin
 * SQE is cancelled via a cancel SQE, and we poll the ring until both the
 * original task and the cancel task have completed, since this interface
 * is synchronous. Finally the socket is unlinked from the pending-recv
 * list and the placement map. Always returns 0.
 */
static int
uring_sock_group_impl_remove_sock(struct spdk_sock_group_impl *_group,
				  struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group);

	if (sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
		_sock_prep_cancel_task(_sock, &sock->write_task);
		/* Since spdk_sock_group_remove_sock is not asynchronous interface, so
		 * currently can use a while loop here. */
		while ((sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) ||
		       (sock->cancel_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE)) {
			uring_sock_group_impl_poll(_group, 32, NULL);
		}
	}

	if (sock->pollin_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
		_sock_prep_cancel_task(_sock, &sock->pollin_task);
		/* Since spdk_sock_group_remove_sock is not asynchronous interface, so
		 * currently can use a while loop here. */
		while ((sock->pollin_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) ||
		       (sock->cancel_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE)) {
			uring_sock_group_impl_poll(_group, 32, NULL);
		}
	}

	/* Drop the socket from the ready list so the group no longer references it. */
	if (sock->pending_recv) {
		TAILQ_REMOVE(&group->pending_recv, sock, link);
		sock->pending_recv = false;
	}
	assert(sock->pending_recv == false);

	if (sock->placement_id != -1) {
		spdk_sock_map_release(&g_map, sock->placement_id);
	}

	sock->group = NULL;
	return 0;
}
1273 
1274 static int
1275 uring_sock_group_impl_close(struct spdk_sock_group_impl *_group)
1276 {
1277 	struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group);
1278 
1279 	/* try to reap all the active I/O */
1280 	while (group->io_inflight) {
1281 		uring_sock_group_impl_poll(_group, 32, NULL);
1282 	}
1283 	assert(group->io_inflight == 0);
1284 	assert(group->io_avail == SPDK_SOCK_GROUP_QUEUE_DEPTH);
1285 
1286 	io_uring_queue_exit(&group->uring);
1287 
1288 	if (g_spdk_uring_sock_impl_opts.enable_placement_id == PLACEMENT_CPU) {
1289 		spdk_sock_map_release(&g_map, spdk_env_get_current_core());
1290 	}
1291 
1292 	free(group);
1293 	return 0;
1294 }
1295 
1296 static int
1297 uring_sock_impl_get_opts(struct spdk_sock_impl_opts *opts, size_t *len)
1298 {
1299 	if (!opts || !len) {
1300 		errno = EINVAL;
1301 		return -1;
1302 	}
1303 	memset(opts, 0, *len);
1304 
1305 #define FIELD_OK(field) \
1306 	offsetof(struct spdk_sock_impl_opts, field) + sizeof(opts->field) <= *len
1307 
1308 #define GET_FIELD(field) \
1309 	if (FIELD_OK(field)) { \
1310 		opts->field = g_spdk_uring_sock_impl_opts.field; \
1311 	}
1312 
1313 	GET_FIELD(recv_buf_size);
1314 	GET_FIELD(send_buf_size);
1315 	GET_FIELD(enable_recv_pipe);
1316 	GET_FIELD(enable_quickack);
1317 	GET_FIELD(enable_placement_id);
1318 
1319 #undef GET_FIELD
1320 #undef FIELD_OK
1321 
1322 	*len = spdk_min(*len, sizeof(g_spdk_uring_sock_impl_opts));
1323 	return 0;
1324 }
1325 
1326 static int
1327 uring_sock_impl_set_opts(const struct spdk_sock_impl_opts *opts, size_t len)
1328 {
1329 	if (!opts) {
1330 		errno = EINVAL;
1331 		return -1;
1332 	}
1333 
1334 #define FIELD_OK(field) \
1335 	offsetof(struct spdk_sock_impl_opts, field) + sizeof(opts->field) <= len
1336 
1337 #define SET_FIELD(field) \
1338 	if (FIELD_OK(field)) { \
1339 		g_spdk_uring_sock_impl_opts.field = opts->field; \
1340 	}
1341 
1342 	SET_FIELD(recv_buf_size);
1343 	SET_FIELD(send_buf_size);
1344 	SET_FIELD(enable_recv_pipe);
1345 	SET_FIELD(enable_quickack);
1346 	SET_FIELD(enable_placement_id);
1347 
1348 #undef SET_FIELD
1349 #undef FIELD_OK
1350 
1351 	return 0;
1352 }
1353 
1354 static int
1355 uring_sock_flush(struct spdk_sock *_sock)
1356 {
1357 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1358 
1359 	if (!sock->group) {
1360 		return _sock_flush_client(_sock);
1361 	}
1362 
1363 	return 0;
1364 }
1365 
/* Dispatch table wiring this module's functions into the generic
 * spdk_sock layer; entries map one-to-one onto the spdk_net_impl ops. */
static struct spdk_net_impl g_uring_net_impl = {
	.name		= "uring",
	.getaddr	= uring_sock_getaddr,
	.connect	= uring_sock_connect,
	.listen		= uring_sock_listen,
	.accept		= uring_sock_accept,
	.close		= uring_sock_close,
	.recv		= uring_sock_recv,
	.readv		= uring_sock_readv,
	.writev		= uring_sock_writev,
	.writev_async	= uring_sock_writev_async,
	.flush          = uring_sock_flush,
	.set_recvlowat	= uring_sock_set_recvlowat,
	.set_recvbuf	= uring_sock_set_recvbuf,
	.set_sendbuf	= uring_sock_set_sendbuf,
	.is_ipv6	= uring_sock_is_ipv6,
	.is_ipv4	= uring_sock_is_ipv4,
	.is_connected   = uring_sock_is_connected,
	.group_impl_get_optimal	= uring_sock_group_impl_get_optimal,
	.group_impl_create	= uring_sock_group_impl_create,
	.group_impl_add_sock	= uring_sock_group_impl_add_sock,
	.group_impl_remove_sock = uring_sock_group_impl_remove_sock,
	.group_impl_poll	= uring_sock_group_impl_poll,
	.group_impl_close	= uring_sock_group_impl_close,
	.get_opts		= uring_sock_impl_get_opts,
	.set_opts		= uring_sock_impl_set_opts,
};

/* Register as "uring" at DEFAULT_SOCK_PRIORITY + 1 — presumably ranks it
 * relative to the posix implementation; confirm against SPDK_NET_IMPL_REGISTER
 * priority ordering. */
SPDK_NET_IMPL_REGISTER(uring, &g_uring_net_impl, DEFAULT_SOCK_PRIORITY + 1);
1395