/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "spdk/stdinc.h"
#include "spdk/config.h"

#include <linux/errqueue.h>
#include <sys/epoll.h>
#include <liburing.h>

#include "spdk/barrier.h"
#include "spdk/env.h"
#include "spdk/log.h"
#include "spdk/pipe.h"
#include "spdk/sock.h"
#include "spdk/string.h"
#include "spdk/util.h"

#include "spdk_internal/sock.h"
#include "spdk_internal/assert.h"
#include "../sock_kernel.h"

#define MAX_TMPBUF 1024
#define PORTNUMLEN 32
#define SPDK_SOCK_GROUP_QUEUE_DEPTH 4096
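/* Size of the ancillary-data buffer used to receive MSG_ZEROCOPY completion
 * notifications from the socket error queue: one cmsghdr followed by a
 * sock_extended_err payload. */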
#define SPDK_SOCK_CMG_INFO_SIZE (sizeof(struct cmsghdr) + sizeof(struct sock_extended_err))

enum spdk_sock_task_type {
	SPDK_SOCK_TASK_POLLIN = 0,
	SPDK_SOCK_TASK_RECV,
	SPDK_SOCK_TASK_WRITE,
	SPDK_SOCK_TASK_CANCEL,
};

#if defined(SO_ZEROCOPY) && defined(MSG_ZEROCOPY)
#define SPDK_ZEROCOPY
#endif

enum spdk_uring_sock_task_status {
	SPDK_URING_SOCK_TASK_NOT_IN_USE = 0,
	SPDK_URING_SOCK_TASK_IN_PROCESS,
};

struct spdk_uring_task {
	enum spdk_uring_sock_task_status	status;
	enum spdk_sock_task_type		type;
	struct spdk_uring_sock			*sock;
	struct msghdr				msg;
	struct iovec				iovs[IOV_BATCH_SIZE];
	int					iov_cnt;
	struct spdk_sock_request		*last_req;
	STAILQ_ENTRY(spdk_uring_task)		link;
};

struct spdk_uring_sock {
	struct spdk_sock			base;
	int					fd;
	uint32_t				sendmsg_idx;
	struct spdk_uring_sock_group_impl	*group;
	struct spdk_uring_task			write_task;
	struct spdk_uring_task			recv_task;
	struct spdk_uring_task			pollin_task;
	struct spdk_uring_task			cancel_task;
	struct spdk_pipe			*recv_pipe;
	void					*recv_buf;
	int					recv_buf_sz;
	bool					zcopy;
	bool					pending_recv;
	int					zcopy_send_flags;
	int					connection_status;
	int					placement_id;
	uint8_t					buf[SPDK_SOCK_CMG_INFO_SIZE];
	TAILQ_ENTRY(spdk_uring_sock)		link;
};

TAILQ_HEAD(pending_recv_list, spdk_uring_sock);

struct spdk_uring_sock_group_impl {
	struct spdk_sock_group_impl		base;
	struct io_uring				uring;
	uint32_t				io_inflight;
	uint32_t				io_queued;
	uint32_t				io_avail;
	struct pending_recv_list		pending_recv;
};

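/* Module-wide defaults for the uring sock implementation. Applications can
 * override these at runtime through the generic spdk_sock_impl_get_opts() /
 * spdk_sock_impl_set_opts() API. A minimal sketch, assuming the standard
 * public API and omitting error handling:
 *
 *	struct spdk_sock_impl_opts impl_opts;
 *	size_t sz = sizeof(impl_opts);
 *
 *	spdk_sock_impl_get_opts("uring", &impl_opts, &sz);
 *	impl_opts.enable_zerocopy_send_server = true;
 *	spdk_sock_impl_set_opts("uring", &impl_opts, sz);
 */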
static struct spdk_sock_impl_opts g_spdk_uring_sock_impl_opts = {
	.recv_buf_size = MIN_SO_RCVBUF_SIZE,
	.send_buf_size = MIN_SO_SNDBUF_SIZE,
	.enable_recv_pipe = true,
	.enable_quickack = false,
	.enable_placement_id = PLACEMENT_NONE,
	.enable_zerocopy_send_server = false,
	.enable_zerocopy_send_client = false,
};

static struct spdk_sock_map g_map = {
	.entries = STAILQ_HEAD_INITIALIZER(g_map.entries),
	.mtx = PTHREAD_MUTEX_INITIALIZER
};

__attribute__((destructor)) static void
uring_sock_map_cleanup(void)
{
	spdk_sock_map_cleanup(&g_map);
}

#define SPDK_URING_SOCK_REQUEST_IOV(req) ((struct iovec *)((uint8_t *)req + sizeof(struct spdk_sock_request)))

#define __uring_sock(sock) (struct spdk_uring_sock *)sock
#define __uring_group_impl(group) (struct spdk_uring_sock_group_impl *)group

static int
uring_sock_getaddr(struct spdk_sock *_sock, char *saddr, int slen, uint16_t *sport,
		   char *caddr, int clen, uint16_t *cport)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct sockaddr_storage sa;
	socklen_t salen;
	int rc;

	assert(sock != NULL);

	memset(&sa, 0, sizeof sa);
	salen = sizeof sa;
	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
	if (rc != 0) {
		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
		return -1;
	}

	switch (sa.ss_family) {
	case AF_UNIX:
		/* Acceptable connection types that don't have IPs */
		return 0;
	case AF_INET:
	case AF_INET6:
		/* Code below will get IP addresses */
		break;
	default:
		/* Unsupported socket family */
		return -1;
	}

	rc = get_addr_str((struct sockaddr *)&sa, saddr, slen);
	if (rc != 0) {
		SPDK_ERRLOG("getnameinfo() failed (errno=%d)\n", errno);
		return -1;
	}

	if (sport) {
		if (sa.ss_family == AF_INET) {
			*sport = ntohs(((struct sockaddr_in *) &sa)->sin_port);
		} else if (sa.ss_family == AF_INET6) {
			*sport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port);
		}
	}

	memset(&sa, 0, sizeof sa);
	salen = sizeof sa;
	rc = getpeername(sock->fd, (struct sockaddr *) &sa, &salen);
	if (rc != 0) {
		SPDK_ERRLOG("getpeername() failed (errno=%d)\n", errno);
		return -1;
	}

	rc = get_addr_str((struct sockaddr *)&sa, caddr, clen);
	if (rc != 0) {
		SPDK_ERRLOG("getnameinfo() failed (errno=%d)\n", errno);
		return -1;
	}

	if (cport) {
		if (sa.ss_family == AF_INET) {
			*cport = ntohs(((struct sockaddr_in *) &sa)->sin_port);
		} else if (sa.ss_family == AF_INET6) {
			*cport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port);
		}
	}

	return 0;
}

enum uring_sock_create_type {
	SPDK_SOCK_CREATE_LISTEN,
	SPDK_SOCK_CREATE_CONNECT,
};

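/* Resize the socket's receive pipe to sz bytes. Any data still sitting in the
 * old pipe is migrated into the new one, so the pipe can only shrink as far
 * as the currently buffered data allows (-EINVAL otherwise). A size of 0
 * tears the pipe down entirely and reverts the socket to direct readv(). */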
static int
uring_sock_alloc_pipe(struct spdk_uring_sock *sock, int sz)
{
	uint8_t *new_buf;
	struct spdk_pipe *new_pipe;
	struct iovec siov[2];
	struct iovec diov[2];
	int sbytes;
	ssize_t bytes;

	if (sock->recv_buf_sz == sz) {
		return 0;
	}

	/* If the new size is 0, just free the pipe */
	if (sz == 0) {
		spdk_pipe_destroy(sock->recv_pipe);
		free(sock->recv_buf);
		sock->recv_pipe = NULL;
		sock->recv_buf = NULL;
		return 0;
	} else if (sz < MIN_SOCK_PIPE_SIZE) {
		SPDK_ERRLOG("The size of the pipe must be larger than %d\n", MIN_SOCK_PIPE_SIZE);
		return -1;
	}

	/* Round up to next 64 byte multiple */
	new_buf = calloc(SPDK_ALIGN_CEIL(sz + 1, 64), sizeof(uint8_t));
	if (!new_buf) {
		SPDK_ERRLOG("socket recv buf allocation failed\n");
		return -ENOMEM;
	}

	new_pipe = spdk_pipe_create(new_buf, sz + 1);
	if (new_pipe == NULL) {
		SPDK_ERRLOG("socket pipe allocation failed\n");
		free(new_buf);
		return -ENOMEM;
	}

	if (sock->recv_pipe != NULL) {
		/* Pull all of the data out of the old pipe */
		sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov);
		if (sbytes > sz) {
			/* Too much data to fit into the new pipe size */
			spdk_pipe_destroy(new_pipe);
			free(new_buf);
			return -EINVAL;
		}

		sbytes = spdk_pipe_writer_get_buffer(new_pipe, sz, diov);
		assert(sbytes == sz);

		bytes = spdk_iovcpy(siov, 2, diov, 2);
		spdk_pipe_writer_advance(new_pipe, bytes);

		spdk_pipe_destroy(sock->recv_pipe);
		free(sock->recv_buf);
	}

	sock->recv_buf_sz = sz;
	sock->recv_buf = new_buf;
	sock->recv_pipe = new_pipe;

	return 0;
}

static int
uring_sock_set_recvbuf(struct spdk_sock *_sock, int sz)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	int rc;

	assert(sock != NULL);

	if (g_spdk_uring_sock_impl_opts.enable_recv_pipe) {
		rc = uring_sock_alloc_pipe(sock, sz);
		if (rc) {
			SPDK_ERRLOG("unable to allocate sufficient recvbuf with sz=%d on sock=%p\n", sz, _sock);
			return rc;
		}
	}

	if (sz < MIN_SO_RCVBUF_SIZE) {
		sz = MIN_SO_RCVBUF_SIZE;
	}

	rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVBUF, &sz, sizeof(sz));
	if (rc < 0) {
		return rc;
	}

	return 0;
}

static int
uring_sock_set_sendbuf(struct spdk_sock *_sock, int sz)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	int rc;

	assert(sock != NULL);

	if (sz < MIN_SO_SNDBUF_SIZE) {
		sz = MIN_SO_SNDBUF_SIZE;
	}

	rc = setsockopt(sock->fd, SOL_SOCKET, SO_SNDBUF, &sz, sizeof(sz));
	if (rc < 0) {
		return rc;
	}

	return 0;
}

static struct spdk_uring_sock *
uring_sock_alloc(int fd, bool enable_zero_copy)
{
	struct spdk_uring_sock *sock;
#if defined(__linux__)
	int flag;
	int rc;
#endif

	sock = calloc(1, sizeof(*sock));
	if (sock == NULL) {
		SPDK_ERRLOG("sock allocation failed\n");
		return NULL;
	}

	sock->fd = fd;

#if defined(__linux__)
	flag = 1;

	if (g_spdk_uring_sock_impl_opts.enable_quickack) {
		rc = setsockopt(sock->fd, IPPROTO_TCP, TCP_QUICKACK, &flag, sizeof(flag));
		if (rc != 0) {
			SPDK_ERRLOG("Failed to set TCP_QUICKACK\n");
		}
	}

	spdk_sock_get_placement_id(sock->fd, g_spdk_uring_sock_impl_opts.enable_placement_id,
				   &sock->placement_id);
#ifdef SPDK_ZEROCOPY
	/* Try to turn on zero copy sends */
	flag = 1;

	if (enable_zero_copy) {
		rc = setsockopt(sock->fd, SOL_SOCKET, SO_ZEROCOPY, &flag, sizeof(flag));
		if (rc == 0) {
			sock->zcopy = true;
			sock->zcopy_send_flags = MSG_ZEROCOPY;
		}
	}
#endif
#endif

	return sock;
}

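/* Shared helper behind both listen()- and connect()-style socket creation:
 * walk the getaddrinfo() results, apply the configured socket options to each
 * candidate, and either bind+listen or connect depending on 'type'. The first
 * address that succeeds wins, and the fd is always made non-blocking. */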
static struct spdk_sock *
uring_sock_create(const char *ip, int port,
		  enum uring_sock_create_type type,
		  struct spdk_sock_opts *opts)
{
	struct spdk_uring_sock *sock;
	char buf[MAX_TMPBUF];
	char portnum[PORTNUMLEN];
	char *p;
	struct addrinfo hints, *res, *res0;
	int fd, flag;
	int val = 1;
	int rc;
	bool enable_zcopy_impl_opts = false;
	bool enable_zcopy_user_opts = true;

	assert(opts != NULL);

	if (ip == NULL) {
		return NULL;
	}
	if (ip[0] == '[') {
		snprintf(buf, sizeof(buf), "%s", ip + 1);
		p = strchr(buf, ']');
		if (p != NULL) {
			*p = '\0';
		}
		ip = (const char *) &buf[0];
	}

	snprintf(portnum, sizeof portnum, "%d", port);
	memset(&hints, 0, sizeof hints);
	hints.ai_family = PF_UNSPEC;
	hints.ai_socktype = SOCK_STREAM;
	hints.ai_flags = AI_NUMERICSERV;
	hints.ai_flags |= AI_PASSIVE;
	hints.ai_flags |= AI_NUMERICHOST;
	rc = getaddrinfo(ip, portnum, &hints, &res0);
	if (rc != 0) {
		SPDK_ERRLOG("getaddrinfo() failed: %s (%d)\n", gai_strerror(rc), rc);
		return NULL;
	}

	/* try listen */
	fd = -1;
	for (res = res0; res != NULL; res = res->ai_next) {
retry:
		fd = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
		if (fd < 0) {
			/* error */
			continue;
		}

		val = g_spdk_uring_sock_impl_opts.recv_buf_size;
		rc = setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &val, sizeof val);
		if (rc) {
			/* Not fatal */
		}

		val = g_spdk_uring_sock_impl_opts.send_buf_size;
		rc = setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &val, sizeof val);
		if (rc) {
			/* Not fatal */
		}

		/* val still holds a buffer size from above; reset it before it is
		 * used as a boolean flag for the remaining options. */
		val = 1;
		rc = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof val);
		if (rc != 0) {
			close(fd);
			fd = -1;
			/* error */
			continue;
		}
		rc = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof val);
		if (rc != 0) {
			close(fd);
			fd = -1;
			/* error */
			continue;
		}

#if defined(SO_PRIORITY)
		if (opts != NULL && opts->priority) {
			rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &opts->priority, sizeof val);
			if (rc != 0) {
				close(fd);
				fd = -1;
				/* error */
				continue;
			}
		}
#endif
		if (res->ai_family == AF_INET6) {
			rc = setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &val, sizeof val);
			if (rc != 0) {
				close(fd);
				fd = -1;
				/* error */
				continue;
			}
		}

		if (type == SPDK_SOCK_CREATE_LISTEN) {
			rc = bind(fd, res->ai_addr, res->ai_addrlen);
			if (rc != 0) {
				SPDK_ERRLOG("bind() failed at port %d, errno = %d\n", port, errno);
				switch (errno) {
				case EINTR:
					/* interrupted? */
					close(fd);
					goto retry;
				case EADDRNOTAVAIL:
					SPDK_ERRLOG("IP address %s not available. "
						    "Verify IP address in config file "
						    "and make sure setup script is "
						    "run before starting spdk app.\n", ip);
				/* FALLTHROUGH */
				default:
					/* try next family */
					close(fd);
					fd = -1;
					continue;
				}
			}
			/* bind OK */
			rc = listen(fd, 512);
			if (rc != 0) {
				SPDK_ERRLOG("listen() failed, errno = %d\n", errno);
				close(fd);
				fd = -1;
				break;
			}
			enable_zcopy_impl_opts = g_spdk_uring_sock_impl_opts.enable_zerocopy_send_server;
		} else if (type == SPDK_SOCK_CREATE_CONNECT) {
			rc = connect(fd, res->ai_addr, res->ai_addrlen);
			if (rc != 0) {
				SPDK_ERRLOG("connect() failed, errno = %d\n", errno);
				/* try next family */
				close(fd);
				fd = -1;
				continue;
			}

			enable_zcopy_impl_opts = g_spdk_uring_sock_impl_opts.enable_zerocopy_send_client;
		}

		flag = fcntl(fd, F_GETFL);
		if (fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0) {
			SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno);
			close(fd);
			fd = -1;
			break;
		}
		break;
	}
	freeaddrinfo(res0);

	if (fd < 0) {
		return NULL;
	}

	enable_zcopy_user_opts = opts->zcopy && !sock_is_loopback(fd);
	sock = uring_sock_alloc(fd, enable_zcopy_user_opts && enable_zcopy_impl_opts);
	if (sock == NULL) {
		SPDK_ERRLOG("sock allocation failed\n");
		close(fd);
		return NULL;
	}

	return &sock->base;
}

static struct spdk_sock *
uring_sock_listen(const char *ip, int port, struct spdk_sock_opts *opts)
{
	return uring_sock_create(ip, port, SPDK_SOCK_CREATE_LISTEN, opts);
}

static struct spdk_sock *
uring_sock_connect(const char *ip, int port, struct spdk_sock_opts *opts)
{
	return uring_sock_create(ip, port, SPDK_SOCK_CREATE_CONNECT, opts);
}

static struct spdk_sock *
uring_sock_accept(struct spdk_sock *_sock)
{
	struct spdk_uring_sock		*sock = __uring_sock(_sock);
	struct sockaddr_storage		sa;
	socklen_t			salen;
	int				rc, fd;
	struct spdk_uring_sock		*new_sock;
	int				flag;

	memset(&sa, 0, sizeof(sa));
	salen = sizeof(sa);

	assert(sock != NULL);

	rc = accept(sock->fd, (struct sockaddr *)&sa, &salen);

	if (rc == -1) {
		return NULL;
	}

	fd = rc;

	flag = fcntl(fd, F_GETFL);
	if ((!(flag & O_NONBLOCK)) && (fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0)) {
		SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno);
		close(fd);
		return NULL;
	}

#if defined(SO_PRIORITY)
	/* The priority is not inherited, so call this function again */
	if (sock->base.opts.priority) {
		rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &sock->base.opts.priority, sizeof(int));
		if (rc != 0) {
			close(fd);
			return NULL;
		}
	}
#endif

	new_sock = uring_sock_alloc(fd, sock->zcopy);
	if (new_sock == NULL) {
		close(fd);
		return NULL;
	}

	return &new_sock->base;
}

static int
uring_sock_close(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);

	assert(TAILQ_EMPTY(&_sock->pending_reqs));
	assert(sock->group == NULL);

	/* If the socket fails to close, the best choice is to
	 * leak the fd but continue to free the rest of the sock
	 * memory. */
	close(sock->fd);

	spdk_pipe_destroy(sock->recv_pipe);
	free(sock->recv_buf);
	free(sock);

	return 0;
}

static ssize_t
uring_sock_recv_from_pipe(struct spdk_uring_sock *sock, struct iovec *diov, int diovcnt)
{
	struct iovec siov[2];
	int sbytes;
	ssize_t bytes;
	struct spdk_uring_sock_group_impl *group;

	sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov);
	if (sbytes < 0) {
		errno = EINVAL;
		return -1;
	} else if (sbytes == 0) {
		errno = EAGAIN;
		return -1;
	}

	bytes = spdk_iovcpy(siov, 2, diov, diovcnt);

	if (bytes == 0) {
		/* The only way this happens is if diov is 0 length */
		errno = EINVAL;
		return -1;
	}

	spdk_pipe_reader_advance(sock->recv_pipe, bytes);

	/* If we drained the pipe, take it off the level-triggered list */
	if (sock->base.group_impl && spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) {
		group = __uring_group_impl(sock->base.group_impl);
		TAILQ_REMOVE(&group->pending_recv, sock, link);
		sock->pending_recv = false;
	}

	return bytes;
}

static inline ssize_t
uring_sock_read(struct spdk_uring_sock *sock)
{
	struct iovec iov[2];
	int bytes;
	struct spdk_uring_sock_group_impl *group;

	bytes = spdk_pipe_writer_get_buffer(sock->recv_pipe, sock->recv_buf_sz, iov);

	if (bytes > 0) {
		bytes = readv(sock->fd, iov, 2);
		if (bytes > 0) {
			spdk_pipe_writer_advance(sock->recv_pipe, bytes);
			if (sock->base.group_impl) {
				group = __uring_group_impl(sock->base.group_impl);
				TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
				sock->pending_recv = true;
			}
		}
	}

	return bytes;
}

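/* readv() with an optional pipe in front of it: when the pipe is empty, large
 * reads go straight into the caller's iovecs, while small reads first fill
 * the pipe with one big readv() and are then copied out. This trades an extra
 * memcpy for fewer syscalls when the application reads in small chunks. */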
static ssize_t
uring_sock_readv(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	int rc, i;
	size_t len;

	if (sock->recv_pipe == NULL) {
		return readv(sock->fd, iov, iovcnt);
	}

	len = 0;
	for (i = 0; i < iovcnt; i++) {
		len += iov[i].iov_len;
	}

	if (spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) {
		/* If the user is receiving a sufficiently large amount of data,
		 * receive directly to their buffers. */
		if (len >= MIN_SOCK_PIPE_SIZE) {
			return readv(sock->fd, iov, iovcnt);
		}

		/* Otherwise, do a big read into our pipe */
		rc = uring_sock_read(sock);
		if (rc <= 0) {
			return rc;
		}
	}

	return uring_sock_recv_from_pipe(sock, iov, iovcnt);
}

static ssize_t
uring_sock_recv(struct spdk_sock *sock, void *buf, size_t len)
{
	struct iovec iov[1];

	iov[0].iov_base = buf;
	iov[0].iov_len = len;

	return uring_sock_readv(sock, iov, 1);
}

static ssize_t
uring_sock_writev(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);

	if (sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
		errno = EAGAIN;
		return -1;
	}

	return writev(sock->fd, iov, iovcnt);
}

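/* Walk the queued requests and retire everything covered by the rc bytes that
 * a send just reported. A request whose iovecs are only partially covered
 * stays queued with internal.offset advanced. With zerocopy the kernel still
 * owns the data at this point, so completed requests are only moved to the
 * pending list; they are released later when the MSG_ZEROCOPY notification
 * for their sendmsg index arrives (see _sock_check_zcopy()). */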
static int
sock_complete_reqs(struct spdk_sock *_sock, ssize_t rc)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct spdk_sock_request *req;
	int i, retval;
	unsigned int offset;
	size_t len;

	if (sock->zcopy) {
		/* Handle wrap-around: sock->sendmsg_idx - 1 is stored in
		 * req->internal.offset below, so sendmsg_idx must never be zero. */
		if (spdk_unlikely(sock->sendmsg_idx == UINT32_MAX)) {
			sock->sendmsg_idx = 1;
		} else {
			sock->sendmsg_idx++;
		}
	}

	/* Consume the requests that were actually written */
	req = TAILQ_FIRST(&_sock->queued_reqs);
	while (req) {
		offset = req->internal.offset;

		for (i = 0; i < req->iovcnt; i++) {
			/* Advance by the offset first */
			if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) {
				offset -= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len;
				continue;
			}

			/* Calculate the remaining length of this element */
			len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset;

			if (len > (size_t)rc) {
				/* This element was partially sent. */
				req->internal.offset += rc;
				return 0;
			}

			offset = 0;
			req->internal.offset += len;
			rc -= len;
		}

		/* Handled a full request. */
		spdk_sock_request_pend(_sock, req);

		if (!sock->zcopy) {
			retval = spdk_sock_request_put(_sock, req, 0);
			if (retval) {
				return retval;
			}
		} else {
			/* Re-use the offset field to hold the sendmsg call index. The
			 * index is 0 based, so subtract one here because we've already
			 * incremented above. */
			req->internal.offset = sock->sendmsg_idx - 1;
		}

		if (rc == 0) {
			break;
		}

		req = TAILQ_FIRST(&_sock->queued_reqs);
	}

	return 0;
}

#ifdef SPDK_ZEROCOPY
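/* Process MSG_ZEROCOPY completion notifications. The kernel queues a
 * sock_extended_err on the socket's error queue describing an inclusive range
 * [ee_info, ee_data] of completed sendmsg calls, which is read back with
 * something roughly equivalent to this sketch:
 *
 *	struct msghdr msg = {};
 *	char control[SPDK_SOCK_CMG_INFO_SIZE];
 *
 *	msg.msg_control = control;
 *	msg.msg_controllen = sizeof(control);
 *	recvmsg(fd, &msg, MSG_ERRQUEUE);
 *
 * Here that recvmsg is issued asynchronously through the uring instead (see
 * _sock_prep_recv() below), and this function matches the completed indices
 * against the sendmsg index stashed in each pending request's offset field. */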
static int
_sock_check_zcopy(struct spdk_sock *_sock, int status)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	ssize_t rc;
	struct sock_extended_err *serr;
	struct cmsghdr *cm;
	uint32_t idx;
	struct spdk_sock_request *req, *treq;
	bool found;

	assert(sock->zcopy == true);
	if (spdk_unlikely(status < 0)) {
		if (!TAILQ_EMPTY(&_sock->pending_reqs)) {
			SPDK_ERRLOG("Attempting to receive from ERRQUEUE yielded error, but pending list still has orphaned entries, status = %d\n",
				    status);
		} else {
			SPDK_WARNLOG("Recvmsg yielded an error!\n");
		}
		return 0;
	}

	cm = CMSG_FIRSTHDR(&sock->recv_task.msg);
	if (cm->cmsg_level != SOL_IP || cm->cmsg_type != IP_RECVERR) {
		SPDK_WARNLOG("Unexpected cmsg level or type!\n");
		return 0;
	}

	serr = (struct sock_extended_err *)CMSG_DATA(cm);
	if (serr->ee_errno != 0 || serr->ee_origin != SO_EE_ORIGIN_ZEROCOPY) {
		SPDK_WARNLOG("Unexpected extended error origin\n");
		return 0;
	}

	/* Most of the time, the pending_reqs array is in the exact
	 * order we need such that all of the requests to complete are
	 * in order, in the front. It is guaranteed that all requests
	 * belonging to the same sendmsg call are sequential, so once
	 * we encounter one match we can stop looping as soon as a
	 * non-match is found.
	 */
	for (idx = serr->ee_info; idx <= serr->ee_data; idx++) {
		found = false;
		TAILQ_FOREACH_SAFE(req, &_sock->pending_reqs, internal.link, treq) {
			if (req->internal.offset == idx) {
				found = true;
				rc = spdk_sock_request_put(_sock, req, 0);
				if (rc < 0) {
					return rc;
				}

			} else if (found) {
				break;
			}
		}
	}

	return 0;
}

static void
_sock_prep_recv(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct spdk_uring_task *task = &sock->recv_task;
	struct io_uring_sqe *sqe;

	if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS) {
		return;
	}

	assert(sock->group != NULL);
	sock->group->io_queued++;

	sqe = io_uring_get_sqe(&sock->group->uring);
	io_uring_prep_recvmsg(sqe, sock->fd, &task->msg, MSG_ERRQUEUE);
	io_uring_sqe_set_data(sqe, task);
	task->status = SPDK_URING_SOCK_TASK_IN_PROCESS;
}

#endif

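/* Queue one asynchronous sendmsg covering as many queued requests as fit into
 * IOV_BATCH_SIZE iovecs. Only a single write task per socket is ever in
 * flight; its completion is handled in sock_uring_group_reap(), which calls
 * sock_complete_reqs() with the number of bytes actually sent. */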
static void
_sock_flush(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct spdk_uring_task *task = &sock->write_task;
	uint32_t iovcnt;
	struct io_uring_sqe *sqe;
	int flags = MSG_DONTWAIT | sock->zcopy_send_flags;

	if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS) {
		return;
	}

	iovcnt = spdk_sock_prep_reqs(&sock->base, task->iovs, task->iov_cnt, &task->last_req);
	if (!iovcnt) {
		return;
	}

	task->iov_cnt = iovcnt;
	assert(sock->group != NULL);
	task->msg.msg_iov = task->iovs;
	task->msg.msg_iovlen = task->iov_cnt;

	sock->group->io_queued++;

	sqe = io_uring_get_sqe(&sock->group->uring);
	io_uring_prep_sendmsg(sqe, sock->fd, &sock->write_task.msg, flags);
	io_uring_sqe_set_data(sqe, task);
	task->status = SPDK_URING_SOCK_TASK_IN_PROCESS;
}

static void
_sock_prep_pollin(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct spdk_uring_task *task = &sock->pollin_task;
	struct io_uring_sqe *sqe;

	/* Do not prepare pollin event */
	if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS || (sock->pending_recv && !sock->zcopy)) {
		return;
	}

	assert(sock->group != NULL);
	sock->group->io_queued++;

	sqe = io_uring_get_sqe(&sock->group->uring);
	io_uring_prep_poll_add(sqe, sock->fd, POLLIN | POLLERR);
	io_uring_sqe_set_data(sqe, task);
	task->status = SPDK_URING_SOCK_TASK_IN_PROCESS;
}

static void
_sock_prep_cancel_task(struct spdk_sock *_sock, void *user_data)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct spdk_uring_task *task = &sock->cancel_task;
	struct io_uring_sqe *sqe;

	if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS) {
		return;
	}

	assert(sock->group != NULL);
	sock->group->io_queued++;

	sqe = io_uring_get_sqe(&sock->group->uring);
	io_uring_prep_cancel(sqe, user_data, 0);
	io_uring_sqe_set_data(sqe, task);
	task->status = SPDK_URING_SOCK_TASK_IN_PROCESS;
}

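/* Drain up to 'max' completions from the uring and dispatch them by task
 * type, then harvest at most max_read_events sockets from the level-triggered
 * pending_recv list into the caller's socks array. */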
static int
sock_uring_group_reap(struct spdk_uring_sock_group_impl *group, int max, int max_read_events,
		      struct spdk_sock **socks)
{
	int i, count, ret;
	struct io_uring_cqe *cqe;
	struct spdk_uring_sock *sock, *tmp;
	struct spdk_uring_task *task;
	int status;

	for (i = 0; i < max; i++) {
		ret = io_uring_peek_cqe(&group->uring, &cqe);
		if (ret != 0) {
			break;
		}

		if (cqe == NULL) {
			break;
		}

		task = (struct spdk_uring_task *)cqe->user_data;
		assert(task != NULL);
		sock = task->sock;
		assert(sock != NULL);
		assert(sock->group != NULL);
		assert(sock->group == group);
		sock->group->io_inflight--;
		sock->group->io_avail++;
		status = cqe->res;
		io_uring_cqe_seen(&group->uring, cqe);

		task->status = SPDK_URING_SOCK_TASK_NOT_IN_USE;

		if (spdk_unlikely(status <= 0)) {
			if (status == -EAGAIN || status == -EWOULDBLOCK || (status == -ENOBUFS && sock->zcopy)) {
				continue;
			}
		}

		switch (task->type) {
		case SPDK_SOCK_TASK_POLLIN:
#ifdef SPDK_ZEROCOPY
			if ((status & POLLERR) == POLLERR) {
				_sock_prep_recv(&sock->base);
			}
#endif
			if ((status & POLLIN) == POLLIN) {
				if (sock->base.cb_fn != NULL &&
				    sock->pending_recv == false) {
					sock->pending_recv = true;
					TAILQ_INSERT_HEAD(&group->pending_recv, sock, link);
				}
			}
			break;
		case SPDK_SOCK_TASK_WRITE:
			task->last_req = NULL;
			task->iov_cnt = 0;
			if (spdk_unlikely(status < 0)) {
				sock->connection_status = status;
				spdk_sock_abort_requests(&sock->base);
			} else {
				sock_complete_reqs(&sock->base, status);
			}

			break;
#ifdef SPDK_ZEROCOPY
		case SPDK_SOCK_TASK_RECV:
			if (spdk_unlikely(status == -ECANCELED)) {
				sock->connection_status = status;
				break;
			}
			_sock_check_zcopy(&sock->base, status);
			break;
#endif
		case SPDK_SOCK_TASK_CANCEL:
			/* Do nothing */
			break;
		default:
			SPDK_UNREACHABLE();
		}
	}

	if (!socks) {
		return 0;
	}
	count = 0;
	TAILQ_FOREACH_SAFE(sock, &group->pending_recv, link, tmp) {
		if (count == max_read_events) {
			break;
		}

		if (spdk_unlikely(sock->base.cb_fn == NULL) ||
		    (sock->recv_pipe == NULL || spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0)) {
			sock->pending_recv = false;
			TAILQ_REMOVE(&group->pending_recv, sock, link);
			if (spdk_unlikely(sock->base.cb_fn == NULL)) {
				/* If the socket's cb_fn is NULL, do not add it to socks array */
				continue;
			}
		}

		socks[count++] = &sock->base;
	}
	/* Cycle the pending_recv list so that each time we poll things aren't
	 * in the same order. Say we have 6 sockets in the list, named as follows:
	 * A B C D E F
	 * And all 6 sockets had poll events, but max_events is only 3. That means
	 * sock currently points at D. We want to rearrange the list to the following:
	 * D E F A B C
	 *
	 * The variables below are named according to this example to make it easier to
	 * follow the swaps.
	 */
	if (sock != NULL) {
		struct spdk_uring_sock *ua, *uc, *ud, *uf;

		/* Capture pointers to the elements we need */
		ud = sock;

		ua = TAILQ_FIRST(&group->pending_recv);
		if (ua == ud) {
			goto end;
		}

		uf = TAILQ_LAST(&group->pending_recv, pending_recv_list);
		if (uf == ud) {
			TAILQ_REMOVE(&group->pending_recv, ud, link);
			TAILQ_INSERT_HEAD(&group->pending_recv, ud, link);
			goto end;
		}

		uc = TAILQ_PREV(ud, pending_recv_list, link);
		assert(uc != NULL);

		/* Break the link between C and D */
		uc->link.tqe_next = NULL;

		/* Connect F to A */
		uf->link.tqe_next = ua;
		ua->link.tqe_prev = &uf->link.tqe_next;

		/* Fix up the list first/last pointers */
		group->pending_recv.tqh_first = ud;
		group->pending_recv.tqh_last = &uc->link.tqe_next;

		/* D is in front of the list, make tqe prev pointer point to the head of list */
		ud->link.tqe_prev = &group->pending_recv.tqh_first;
	}

end:
	return count;
}

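/* Flush for sockets that are not in a polling group: since there is no uring
 * to submit to, perform the gathered write synchronously with sendmsg(). */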
static int
_sock_flush_client(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct msghdr msg = {};
	struct iovec iovs[IOV_BATCH_SIZE];
	int iovcnt;
	ssize_t rc;
	int flags = sock->zcopy_send_flags;

	/* Can't flush from within a callback or we end up with recursive calls */
	if (_sock->cb_cnt > 0) {
		return 0;
	}

	/* Gather an iov */
	iovcnt = spdk_sock_prep_reqs(_sock, iovs, 0, NULL);
	if (iovcnt == 0) {
		return 0;
	}

	/* Perform the vectored write */
	msg.msg_iov = iovs;
	msg.msg_iovlen = iovcnt;
	rc = sendmsg(sock->fd, &msg, flags);
	if (rc <= 0) {
		if (errno == EAGAIN || errno == EWOULDBLOCK) {
			return 0;
		}
		return rc;
	}

	sock_complete_reqs(_sock, rc);

#ifdef SPDK_ZEROCOPY
	if (sock->zcopy && !TAILQ_EMPTY(&_sock->pending_reqs)) {
		_sock_check_zcopy(_sock, 0);
	}
#endif

	return 0;
}

static void
uring_sock_writev_async(struct spdk_sock *_sock, struct spdk_sock_request *req)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	int rc;

	if (spdk_unlikely(sock->connection_status)) {
		req->cb_fn(req->cb_arg, sock->connection_status);
		return;
	}

	spdk_sock_request_queue(_sock, req);

	if (!sock->group) {
		if (_sock->queued_iovcnt >= IOV_BATCH_SIZE) {
			rc = _sock_flush_client(_sock);
			if (rc) {
				spdk_sock_abort_requests(_sock);
			}
		}
	}
}

static int
uring_sock_set_recvlowat(struct spdk_sock *_sock, int nbytes)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	int val;
	int rc;

	assert(sock != NULL);

	val = nbytes;
	rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVLOWAT, &val, sizeof val);
	if (rc != 0) {
		return -1;
	}
	return 0;
}

static bool
uring_sock_is_ipv6(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct sockaddr_storage sa;
	socklen_t salen;
	int rc;

	assert(sock != NULL);

	memset(&sa, 0, sizeof sa);
	salen = sizeof sa;
	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
	if (rc != 0) {
		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
		return false;
	}

	return (sa.ss_family == AF_INET6);
}

static bool
uring_sock_is_ipv4(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct sockaddr_storage sa;
	socklen_t salen;
	int rc;

	assert(sock != NULL);

	memset(&sa, 0, sizeof sa);
	salen = sizeof sa;
	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
	if (rc != 0) {
		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
		return false;
	}

	return (sa.ss_family == AF_INET);
}

static bool
uring_sock_is_connected(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	uint8_t byte;
	int rc;

	rc = recv(sock->fd, &byte, 1, MSG_PEEK);
	if (rc == 0) {
		return false;
	}

	if (rc < 0) {
		if (errno == EAGAIN || errno == EWOULDBLOCK) {
			return true;
		}

		return false;
	}

	return true;
}

static struct spdk_sock_group_impl *
uring_sock_group_impl_get_optimal(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct spdk_sock_group_impl *group;

	if (sock->placement_id != -1) {
		spdk_sock_map_lookup(&g_map, sock->placement_id, &group);
		return group;
	}

	return NULL;
}

static struct spdk_sock_group_impl *
uring_sock_group_impl_create(void)
{
	struct spdk_uring_sock_group_impl *group_impl;

	group_impl = calloc(1, sizeof(*group_impl));
	if (group_impl == NULL) {
		SPDK_ERRLOG("group_impl allocation failed\n");
		return NULL;
	}

	group_impl->io_avail = SPDK_SOCK_GROUP_QUEUE_DEPTH;

	if (io_uring_queue_init(SPDK_SOCK_GROUP_QUEUE_DEPTH, &group_impl->uring, 0) < 0) {
		SPDK_ERRLOG("uring I/O context setup failure\n");
		free(group_impl);
		return NULL;
	}

	TAILQ_INIT(&group_impl->pending_recv);

	if (g_spdk_uring_sock_impl_opts.enable_placement_id == PLACEMENT_CPU) {
		spdk_sock_map_insert(&g_map, spdk_env_get_current_core(), &group_impl->base);
	}

	return &group_impl->base;
}

static int
uring_sock_group_impl_add_sock(struct spdk_sock_group_impl *_group,
			       struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group);
	int rc;

	sock->group = group;
	sock->write_task.sock = sock;
	sock->write_task.type = SPDK_SOCK_TASK_WRITE;

	sock->pollin_task.sock = sock;
	sock->pollin_task.type = SPDK_SOCK_TASK_POLLIN;

	sock->recv_task.sock = sock;
	sock->recv_task.type = SPDK_SOCK_TASK_RECV;
	sock->recv_task.msg.msg_control = sock->buf;
	sock->recv_task.msg.msg_controllen = sizeof(sock->buf);

	sock->cancel_task.sock = sock;
	sock->cancel_task.type = SPDK_SOCK_TASK_CANCEL;

	/* switched from another polling group due to scheduling */
	if (spdk_unlikely(sock->recv_pipe != NULL &&
			  (spdk_pipe_reader_bytes_available(sock->recv_pipe) > 0))) {
		assert(sock->pending_recv == false);
		sock->pending_recv = true;
		TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
	}

	if (sock->placement_id != -1) {
		rc = spdk_sock_map_insert(&g_map, sock->placement_id, &group->base);
		if (rc != 0) {
			SPDK_ERRLOG("Failed to insert sock group into map: %d\n", rc);
			/* Do not treat this as an error. The system will continue running. */
		}
	}

	return 0;
}

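/* One poll iteration for the group: flush pending writes and arm POLLIN for
 * every healthy socket, submit the accumulated SQEs with a single
 * io_uring_submit() call, then reap completions and pending-recv sockets. */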
static int
uring_sock_group_impl_poll(struct spdk_sock_group_impl *_group, int max_events,
			   struct spdk_sock **socks)
{
	struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group);
	int count, ret;
	int to_complete, to_submit;
	struct spdk_sock *_sock, *tmp;
	struct spdk_uring_sock *sock;

	if (spdk_likely(socks)) {
		TAILQ_FOREACH_SAFE(_sock, &group->base.socks, link, tmp) {
			sock = __uring_sock(_sock);
			if (spdk_unlikely(sock->connection_status)) {
				continue;
			}
			_sock_flush(_sock);
			_sock_prep_pollin(_sock);
		}
	}

	to_submit = group->io_queued;

	if (to_submit > 0) {
		/* If there is I/O to submit, use io_uring_submit() here.
		 * It calls io_uring_enter() internally as needed; network I/O
		 * has no O_DIRECT-style requirement for a separate enter call. */
		ret = io_uring_submit(&group->uring);
		if (ret < 0) {
			return 1;
		}
		group->io_queued = 0;
		group->io_inflight += to_submit;
		group->io_avail -= to_submit;
	}

	count = 0;
	to_complete = group->io_inflight;
	if (to_complete > 0 || !TAILQ_EMPTY(&group->pending_recv)) {
		count = sock_uring_group_reap(group, to_complete, max_events, socks);
	}

	return count;
}

static int
uring_sock_group_impl_remove_sock(struct spdk_sock_group_impl *_group,
				  struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group);

	if (sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
		_sock_prep_cancel_task(_sock, &sock->write_task);
		/* spdk_sock_group_remove_sock() is a synchronous interface, so
		 * busy-polling in a loop here is acceptable. */
		while ((sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) ||
		       (sock->cancel_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE)) {
			uring_sock_group_impl_poll(_group, 32, NULL);
		}
	}

	if (sock->pollin_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
		_sock_prep_cancel_task(_sock, &sock->pollin_task);
		/* spdk_sock_group_remove_sock() is a synchronous interface, so
		 * busy-polling in a loop here is acceptable. */
		while ((sock->pollin_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) ||
		       (sock->cancel_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE)) {
			uring_sock_group_impl_poll(_group, 32, NULL);
		}
	}

	if (sock->recv_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
		_sock_prep_cancel_task(_sock, &sock->recv_task);
		/* spdk_sock_group_remove_sock() is a synchronous interface, so
		 * busy-polling in a loop here is acceptable. */
		while ((sock->recv_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) ||
		       (sock->cancel_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE)) {
			uring_sock_group_impl_poll(_group, 32, NULL);
		}
	}

	/* Make sure cancelling the tasks above didn't queue any new requests */
	assert(sock->write_task.status == SPDK_URING_SOCK_TASK_NOT_IN_USE);
	assert(sock->pollin_task.status == SPDK_URING_SOCK_TASK_NOT_IN_USE);
	assert(sock->recv_task.status == SPDK_URING_SOCK_TASK_NOT_IN_USE);

	if (sock->pending_recv) {
		TAILQ_REMOVE(&group->pending_recv, sock, link);
		sock->pending_recv = false;
	}
	assert(sock->pending_recv == false);

	if (sock->placement_id != -1) {
		spdk_sock_map_release(&g_map, sock->placement_id);
	}

	sock->group = NULL;
	return 0;
}

static int
uring_sock_group_impl_close(struct spdk_sock_group_impl *_group)
{
	struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group);

	/* try to reap all the active I/O */
	while (group->io_inflight) {
		uring_sock_group_impl_poll(_group, 32, NULL);
	}
	assert(group->io_inflight == 0);
	assert(group->io_avail == SPDK_SOCK_GROUP_QUEUE_DEPTH);

	io_uring_queue_exit(&group->uring);

	if (g_spdk_uring_sock_impl_opts.enable_placement_id == PLACEMENT_CPU) {
		spdk_sock_map_release(&g_map, spdk_env_get_current_core());
	}

	free(group);
	return 0;
}

static int
uring_sock_impl_get_opts(struct spdk_sock_impl_opts *opts, size_t *len)
{
	if (!opts || !len) {
		errno = EINVAL;
		return -1;
	}
	memset(opts, 0, *len);

#define FIELD_OK(field) \
	offsetof(struct spdk_sock_impl_opts, field) + sizeof(opts->field) <= *len

#define GET_FIELD(field) \
	if (FIELD_OK(field)) { \
		opts->field = g_spdk_uring_sock_impl_opts.field; \
	}

	GET_FIELD(recv_buf_size);
	GET_FIELD(send_buf_size);
	GET_FIELD(enable_recv_pipe);
	GET_FIELD(enable_quickack);
	GET_FIELD(enable_placement_id);
	GET_FIELD(enable_zerocopy_send_server);
	GET_FIELD(enable_zerocopy_send_client);

#undef GET_FIELD
#undef FIELD_OK

	*len = spdk_min(*len, sizeof(g_spdk_uring_sock_impl_opts));
	return 0;
}

static int
uring_sock_impl_set_opts(const struct spdk_sock_impl_opts *opts, size_t len)
{
	if (!opts) {
		errno = EINVAL;
		return -1;
	}

#define FIELD_OK(field) \
	offsetof(struct spdk_sock_impl_opts, field) + sizeof(opts->field) <= len

#define SET_FIELD(field) \
	if (FIELD_OK(field)) { \
		g_spdk_uring_sock_impl_opts.field = opts->field; \
	}

	SET_FIELD(recv_buf_size);
	SET_FIELD(send_buf_size);
	SET_FIELD(enable_recv_pipe);
	SET_FIELD(enable_quickack);
	SET_FIELD(enable_placement_id);
	SET_FIELD(enable_zerocopy_send_server);
	SET_FIELD(enable_zerocopy_send_client);

#undef SET_FIELD
#undef FIELD_OK

	return 0;
}

static int
uring_sock_flush(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);

	if (!sock->group) {
		return _sock_flush_client(_sock);
	}

	return 0;
}

static struct spdk_net_impl g_uring_net_impl = {
	.name		= "uring",
	.getaddr	= uring_sock_getaddr,
	.connect	= uring_sock_connect,
	.listen		= uring_sock_listen,
	.accept		= uring_sock_accept,
	.close		= uring_sock_close,
	.recv		= uring_sock_recv,
	.readv		= uring_sock_readv,
	.writev		= uring_sock_writev,
	.writev_async	= uring_sock_writev_async,
	.flush          = uring_sock_flush,
	.set_recvlowat	= uring_sock_set_recvlowat,
	.set_recvbuf	= uring_sock_set_recvbuf,
	.set_sendbuf	= uring_sock_set_sendbuf,
	.is_ipv6	= uring_sock_is_ipv6,
	.is_ipv4	= uring_sock_is_ipv4,
	.is_connected   = uring_sock_is_connected,
	.group_impl_get_optimal	= uring_sock_group_impl_get_optimal,
	.group_impl_create	= uring_sock_group_impl_create,
	.group_impl_add_sock	= uring_sock_group_impl_add_sock,
	.group_impl_remove_sock = uring_sock_group_impl_remove_sock,
	.group_impl_poll	= uring_sock_group_impl_poll,
	.group_impl_close	= uring_sock_group_impl_close,
	.get_opts		= uring_sock_impl_get_opts,
	.set_opts		= uring_sock_impl_set_opts,
};

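/* Register with a priority one above the default so that, in builds with
 * liburing support, this implementation takes precedence over the posix one
 * during implementation selection. */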
SPDK_NET_IMPL_REGISTER(uring, &g_uring_net_impl, DEFAULT_SOCK_PRIORITY + 1);