xref: /spdk/module/sock/uring/uring.c (revision 7192849ed24874f3e9cc31e8a33a9b32c49b9506)
/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "spdk/stdinc.h"
#include "spdk/config.h"

#include <sys/epoll.h>
#include <liburing.h>

#include "spdk/barrier.h"
#include "spdk/likely.h"
#include "spdk/log.h"
#include "spdk/pipe.h"
#include "spdk/sock.h"
#include "spdk/string.h"
#include "spdk/util.h"

#include "spdk_internal/sock.h"
#include "spdk_internal/assert.h"

#define MAX_TMPBUF 1024
#define PORTNUMLEN 32
#define SO_RCVBUF_SIZE (2 * 1024 * 1024)
#define SO_SNDBUF_SIZE (2 * 1024 * 1024)
#define SPDK_SOCK_GROUP_QUEUE_DEPTH 4096
#define IOV_BATCH_SIZE 64

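/*
 * Each socket owns one pre-allocated task per operation type. A task moves to
 * IN_PROCESS when its SQE is submitted to the ring and back to NOT_IN_USE when
 * the corresponding CQE is reaped in sock_uring_group_reap().
 */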
enum spdk_sock_task_type {
	SPDK_SOCK_TASK_POLLIN = 0,
	SPDK_SOCK_TASK_WRITE,
	SPDK_SOCK_TASK_CANCEL,
};

enum spdk_uring_sock_task_status {
	SPDK_URING_SOCK_TASK_NOT_IN_USE = 0,
	SPDK_URING_SOCK_TASK_IN_PROCESS,
};

struct spdk_uring_task {
	enum spdk_uring_sock_task_status	status;
	enum spdk_sock_task_type		type;
	struct spdk_uring_sock			*sock;
	struct msghdr				msg;
	struct iovec				iovs[IOV_BATCH_SIZE];
	int					iov_cnt;
	struct spdk_sock_request		*last_req;
	STAILQ_ENTRY(spdk_uring_task)		link;
};

struct spdk_uring_sock {
	struct spdk_sock			base;
	int					fd;
	struct spdk_uring_sock_group_impl	*group;
	struct spdk_uring_task			write_task;
	struct spdk_uring_task			pollin_task;
	struct spdk_uring_task			cancel_task;
	struct spdk_pipe			*recv_pipe;
	void					*recv_buf;
	int					recv_buf_sz;
	bool					pending_recv;
	int					connection_status;
	TAILQ_ENTRY(spdk_uring_sock)		link;
};

struct spdk_uring_sock_group_impl {
	struct spdk_sock_group_impl		base;
	struct io_uring				uring;
	uint32_t				io_inflight;
	uint32_t				io_queued;
	uint32_t				io_avail;
	TAILQ_HEAD(, spdk_uring_sock)		pending_recv;
};

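/* The per-request iovec array is laid out in the memory immediately following
 * struct spdk_sock_request itself. */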
#define SPDK_URING_SOCK_REQUEST_IOV(req) ((struct iovec *)((uint8_t *)req + sizeof(struct spdk_sock_request)))

static int
get_addr_str(struct sockaddr *sa, char *host, size_t hlen)
{
	const char *result = NULL;

	if (sa == NULL || host == NULL) {
		return -1;
	}

	switch (sa->sa_family) {
	case AF_INET:
		result = inet_ntop(AF_INET, &(((struct sockaddr_in *)sa)->sin_addr),
				   host, hlen);
		break;
	case AF_INET6:
		result = inet_ntop(AF_INET6, &(((struct sockaddr_in6 *)sa)->sin6_addr),
				   host, hlen);
		break;
	default:
		break;
	}

	if (result != NULL) {
		return 0;
	} else {
		return -1;
	}
}

#define __uring_sock(sock) ((struct spdk_uring_sock *)(sock))
#define __uring_group_impl(group) ((struct spdk_uring_sock_group_impl *)(group))
137 
138 static int
139 uring_sock_getaddr(struct spdk_sock *_sock, char *saddr, int slen, uint16_t *sport,
140 		   char *caddr, int clen, uint16_t *cport)
141 {
142 	struct spdk_uring_sock *sock = __uring_sock(_sock);
143 	struct sockaddr_storage sa;
144 	socklen_t salen;
145 	int rc;
146 
147 	assert(sock != NULL);
148 
149 	memset(&sa, 0, sizeof sa);
150 	salen = sizeof sa;
151 	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
152 	if (rc != 0) {
153 		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
154 		return -1;
155 	}
156 
157 	switch (sa.ss_family) {
158 	case AF_UNIX:
159 		/* Acceptable connection types that don't have IPs */
160 		return 0;
161 	case AF_INET:
162 	case AF_INET6:
163 		/* Code below will get IP addresses */
164 		break;
165 	default:
166 		/* Unsupported socket family */
167 		return -1;
168 	}
169 
	rc = get_addr_str((struct sockaddr *)&sa, saddr, slen);
	if (rc != 0) {
		SPDK_ERRLOG("get_addr_str() failed (errno=%d)\n", errno);
		return -1;
	}

	if (sport) {
		if (sa.ss_family == AF_INET) {
			*sport = ntohs(((struct sockaddr_in *) &sa)->sin_port);
		} else if (sa.ss_family == AF_INET6) {
			*sport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port);
		}
	}

	memset(&sa, 0, sizeof sa);
	salen = sizeof sa;
	rc = getpeername(sock->fd, (struct sockaddr *) &sa, &salen);
	if (rc != 0) {
		SPDK_ERRLOG("getpeername() failed (errno=%d)\n", errno);
		return -1;
	}

	rc = get_addr_str((struct sockaddr *)&sa, caddr, clen);
	if (rc != 0) {
		SPDK_ERRLOG("get_addr_str() failed (errno=%d)\n", errno);
		return -1;
	}

	if (cport) {
		if (sa.ss_family == AF_INET) {
			*cport = ntohs(((struct sockaddr_in *) &sa)->sin_port);
		} else if (sa.ss_family == AF_INET6) {
			*cport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port);
		}
	}

	return 0;
}

enum uring_sock_create_type {
	SPDK_SOCK_CREATE_LISTEN,
	SPDK_SOCK_CREATE_CONNECT,
};

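/*
 * Resize the socket's receive pipe to sz bytes, preserving any buffered data.
 * Fails with -EINVAL if the currently buffered data does not fit in the new
 * size. The backing buffer is allocated one byte larger than requested because
 * an spdk_pipe over an N-byte buffer can hold at most N - 1 bytes (see the
 * assert on spdk_pipe_writer_get_buffer() below).
 */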
static int
uring_sock_alloc_pipe(struct spdk_uring_sock *sock, int sz)
{
	uint8_t *new_buf;
	struct spdk_pipe *new_pipe;
	struct iovec siov[2];
	struct iovec diov[2];
	int sbytes;
	ssize_t bytes;

	if (sock->recv_buf_sz == sz) {
		return 0;
	}

	/* If the new size is 0, just free the pipe */
	if (sz == 0) {
		spdk_pipe_destroy(sock->recv_pipe);
		free(sock->recv_buf);
		sock->recv_pipe = NULL;
		sock->recv_buf = NULL;
		return 0;
	} else if (sz < MIN_SOCK_PIPE_SIZE) {
		SPDK_ERRLOG("The size of the pipe must be larger than %d\n", MIN_SOCK_PIPE_SIZE);
		return -1;
	}

	/* Round up to next 64 byte multiple */
	new_buf = calloc(SPDK_ALIGN_CEIL(sz + 1, 64), sizeof(uint8_t));
	if (!new_buf) {
		SPDK_ERRLOG("socket recv buf allocation failed\n");
		return -ENOMEM;
	}

	new_pipe = spdk_pipe_create(new_buf, sz + 1);
	if (new_pipe == NULL) {
		SPDK_ERRLOG("socket pipe allocation failed\n");
		free(new_buf);
		return -ENOMEM;
	}

	if (sock->recv_pipe != NULL) {
		/* Pull all of the data out of the old pipe */
		sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov);
		if (sbytes > sz) {
			/* Too much data to fit into the new pipe size */
			spdk_pipe_destroy(new_pipe);
			free(new_buf);
			return -EINVAL;
		}

		sbytes = spdk_pipe_writer_get_buffer(new_pipe, sz, diov);
		assert(sbytes == sz);

		bytes = spdk_iovcpy(siov, 2, diov, 2);
		spdk_pipe_writer_advance(new_pipe, bytes);

		spdk_pipe_destroy(sock->recv_pipe);
		free(sock->recv_buf);
	}

	sock->recv_buf_sz = sz;
	sock->recv_buf = new_buf;
	sock->recv_pipe = new_pipe;

	return 0;
}

static int
uring_sock_set_recvbuf(struct spdk_sock *_sock, int sz)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	int rc;

	assert(sock != NULL);

#ifndef __aarch64__
	/* On ARM systems, this buffering does not help. Skip it. */
	/* The size of the pipe is purely derived from benchmarks. It seems to work well. */
	rc = uring_sock_alloc_pipe(sock, sz);
	if (rc) {
		SPDK_ERRLOG("unable to allocate sufficient recvbuf with sz=%d on sock=%p\n", sz, _sock);
		return rc;
	}
#endif

	if (sz < SO_RCVBUF_SIZE) {
		sz = SO_RCVBUF_SIZE;
	}

	rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVBUF, &sz, sizeof(sz));
	if (rc < 0) {
		return rc;
	}

	return 0;
}

static int
uring_sock_set_sendbuf(struct spdk_sock *_sock, int sz)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	int rc;

	assert(sock != NULL);

	if (sz < SO_SNDBUF_SIZE) {
		sz = SO_SNDBUF_SIZE;
	}

	rc = setsockopt(sock->fd, SOL_SOCKET, SO_SNDBUF, &sz, sizeof(sz));
	if (rc < 0) {
		return rc;
	}

	return 0;
}

static struct spdk_uring_sock *
uring_sock_alloc(int fd)
{
	struct spdk_uring_sock *sock;

	sock = calloc(1, sizeof(*sock));
	if (sock == NULL) {
		SPDK_ERRLOG("sock allocation failed\n");
		return NULL;
	}

	sock->fd = fd;
	return sock;
}

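/*
 * Resolve ip/port and walk the getaddrinfo() results, creating a socket for
 * each candidate until one can be configured and either bound and listened on
 * (SPDK_SOCK_CREATE_LISTEN) or connected (SPDK_SOCK_CREATE_CONNECT). The
 * resulting fd is switched to non-blocking mode before being wrapped in a
 * struct spdk_uring_sock.
 */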
static struct spdk_sock *
uring_sock_create(const char *ip, int port,
		  enum uring_sock_create_type type,
		  struct spdk_sock_opts *opts)
{
	struct spdk_uring_sock *sock;
	char buf[MAX_TMPBUF];
	char portnum[PORTNUMLEN];
	char *p;
	struct addrinfo hints, *res, *res0;
	int fd, flag;
	int val = 1;
	int rc;

	if (ip == NULL) {
		return NULL;
	}
	if (ip[0] == '[') {
		snprintf(buf, sizeof(buf), "%s", ip + 1);
		p = strchr(buf, ']');
		if (p != NULL) {
			*p = '\0';
		}
		ip = (const char *) &buf[0];
	}

	snprintf(portnum, sizeof portnum, "%d", port);
	memset(&hints, 0, sizeof hints);
	hints.ai_family = PF_UNSPEC;
	hints.ai_socktype = SOCK_STREAM;
	hints.ai_flags = AI_NUMERICSERV;
	hints.ai_flags |= AI_PASSIVE;
	hints.ai_flags |= AI_NUMERICHOST;
	rc = getaddrinfo(ip, portnum, &hints, &res0);
	if (rc != 0) {
		/* getaddrinfo() reports errors through its return value, not errno */
		SPDK_ERRLOG("getaddrinfo() failed: %s (%d)\n", gai_strerror(rc), rc);
		return NULL;
	}

	/* Try each resolved address until one succeeds */
	fd = -1;
	for (res = res0; res != NULL; res = res->ai_next) {
retry:
		fd = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
		if (fd < 0) {
			/* error */
			continue;
		}

		val = SO_RCVBUF_SIZE;
		rc = setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &val, sizeof val);
		if (rc) {
			/* Not fatal */
		}

		val = SO_SNDBUF_SIZE;
		rc = setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &val, sizeof val);
		if (rc) {
			/* Not fatal */
		}

		/* The buffer sizes above clobbered val; restore it to 1 for the
		 * boolean options below. */
		val = 1;
		rc = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof val);
		if (rc != 0) {
			close(fd);
			/* error */
			continue;
		}
		rc = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof val);
		if (rc != 0) {
			close(fd);
			/* error */
			continue;
		}

#if defined(SO_PRIORITY)
		if (opts != NULL && opts->priority) {
			rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &opts->priority, sizeof val);
			if (rc != 0) {
				close(fd);
				/* error */
				continue;
			}
		}
#endif
		if (res->ai_family == AF_INET6) {
			rc = setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &val, sizeof val);
			if (rc != 0) {
				close(fd);
				/* error */
				continue;
			}
		}

		if (type == SPDK_SOCK_CREATE_LISTEN) {
			rc = bind(fd, res->ai_addr, res->ai_addrlen);
			if (rc != 0) {
				SPDK_ERRLOG("bind() failed at port %d, errno = %d\n", port, errno);
				switch (errno) {
				case EINTR:
					/* interrupted, retry */
					close(fd);
					goto retry;
				case EADDRNOTAVAIL:
					SPDK_ERRLOG("IP address %s not available. "
						    "Verify IP address in config file "
						    "and make sure setup script is "
						    "run before starting spdk app.\n", ip);
				/* FALLTHROUGH */
				default:
					/* try next family */
					close(fd);
					fd = -1;
					continue;
				}
			}
			/* bind OK */
			rc = listen(fd, 512);
			if (rc != 0) {
				SPDK_ERRLOG("listen() failed, errno = %d\n", errno);
				close(fd);
				fd = -1;
				break;
			}
		} else if (type == SPDK_SOCK_CREATE_CONNECT) {
			rc = connect(fd, res->ai_addr, res->ai_addrlen);
			if (rc != 0) {
				SPDK_ERRLOG("connect() failed, errno = %d\n", errno);
				/* try next family */
				close(fd);
				fd = -1;
				continue;
			}
		}

		flag = fcntl(fd, F_GETFL);
		if (fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0) {
			SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno);
			close(fd);
			fd = -1;
			break;
		}
		break;
	}
	freeaddrinfo(res0);

	if (fd < 0) {
		return NULL;
	}

	sock = uring_sock_alloc(fd);
	if (sock == NULL) {
		SPDK_ERRLOG("sock allocation failed\n");
		close(fd);
		return NULL;
	}

	return &sock->base;
}

static struct spdk_sock *
uring_sock_listen(const char *ip, int port, struct spdk_sock_opts *opts)
{
	return uring_sock_create(ip, port, SPDK_SOCK_CREATE_LISTEN, opts);
}

static struct spdk_sock *
uring_sock_connect(const char *ip, int port, struct spdk_sock_opts *opts)
{
	return uring_sock_create(ip, port, SPDK_SOCK_CREATE_CONNECT, opts);
}

static struct spdk_sock *
uring_sock_accept(struct spdk_sock *_sock)
{
	struct spdk_uring_sock		*sock = __uring_sock(_sock);
	struct sockaddr_storage		sa;
	socklen_t			salen;
	int				rc, fd;
	struct spdk_uring_sock		*new_sock;
	int				flag;

	memset(&sa, 0, sizeof(sa));
	salen = sizeof(sa);

	assert(sock != NULL);

	rc = accept(sock->fd, (struct sockaddr *)&sa, &salen);

	if (rc == -1) {
		return NULL;
	}

	fd = rc;

	flag = fcntl(fd, F_GETFL);
	if ((!(flag & O_NONBLOCK)) && (fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0)) {
		SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno);
		close(fd);
		return NULL;
	}

#if defined(SO_PRIORITY)
	/* The priority is not inherited from the listening socket, so set it
	 * explicitly on the accepted fd. */
	if (sock->base.opts.priority) {
		rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &sock->base.opts.priority, sizeof(int));
		if (rc != 0) {
			close(fd);
			return NULL;
		}
	}
#endif

	new_sock = uring_sock_alloc(fd);
	if (new_sock == NULL) {
		close(fd);
		return NULL;
	}

	return &new_sock->base;
}

static int
uring_sock_close(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	int rc;

	assert(TAILQ_EMPTY(&_sock->pending_reqs));
	assert(sock->group == NULL);

	spdk_pipe_destroy(sock->recv_pipe);
	free(sock->recv_buf);
	rc = close(sock->fd);
	if (rc == 0) {
		free(sock);
	}

	return rc;
}

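/*
 * Copy buffered data out of the receive pipe into the caller's iovecs. If the
 * pipe is drained and the socket belongs to a group, drop it from the
 * level-triggered pending_recv list so it is not polled again spuriously.
 */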
static ssize_t
uring_sock_recv_from_pipe(struct spdk_uring_sock *sock, struct iovec *diov, int diovcnt)
{
	struct iovec siov[2];
	int sbytes;
	ssize_t bytes;
	struct spdk_uring_sock_group_impl *group;

	sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov);
	if (sbytes < 0) {
		errno = EINVAL;
		return -1;
	} else if (sbytes == 0) {
		errno = EAGAIN;
		return -1;
	}

	bytes = spdk_iovcpy(siov, 2, diov, diovcnt);

	if (bytes == 0) {
		/* The only way this happens is if diov is 0 length */
		errno = EINVAL;
		return -1;
	}

	spdk_pipe_reader_advance(sock->recv_pipe, bytes);

	/* If we drained the pipe, take it off the level-triggered list */
	if (sock->base.group_impl && spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) {
		group = __uring_group_impl(sock->base.group_impl);
		TAILQ_REMOVE(&group->pending_recv, sock, link);
		sock->pending_recv = false;
	}

	return bytes;
}

static inline ssize_t
uring_sock_read(struct spdk_uring_sock *sock)
{
	struct iovec iov[2];
	int bytes;
	struct spdk_uring_sock_group_impl *group;

	bytes = spdk_pipe_writer_get_buffer(sock->recv_pipe, sock->recv_buf_sz, iov);

	if (bytes > 0) {
		bytes = readv(sock->fd, iov, 2);
		if (bytes > 0) {
			spdk_pipe_writer_advance(sock->recv_pipe, bytes);
			if (sock->base.group_impl) {
				group = __uring_group_impl(sock->base.group_impl);
				TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
				sock->pending_recv = true;
			}
		}
	}

	return bytes;
}

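/*
 * Read strategy: data already buffered in the pipe is served from there.
 * Otherwise, reads large enough (>= MIN_SOCK_PIPE_SIZE) go straight to the
 * caller's buffers, while smaller reads first fill the pipe with one big
 * readv() to amortize syscall overhead.
 */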
static ssize_t
uring_sock_readv(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	int rc, i;
	size_t len;

	if (sock->recv_pipe == NULL) {
		return readv(sock->fd, iov, iovcnt);
	}

	len = 0;
	for (i = 0; i < iovcnt; i++) {
		len += iov[i].iov_len;
	}

	if (spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) {
		/* If the user is receiving a sufficiently large amount of data,
		 * receive directly to their buffers. */
		if (len >= MIN_SOCK_PIPE_SIZE) {
			return readv(sock->fd, iov, iovcnt);
		}

		/* Otherwise, do a big read into our pipe */
		rc = uring_sock_read(sock);
		if (rc <= 0) {
			return rc;
		}
	}

	return uring_sock_recv_from_pipe(sock, iov, iovcnt);
}

static ssize_t
uring_sock_recv(struct spdk_sock *sock, void *buf, size_t len)
{
	struct iovec iov[1];

	iov[0].iov_base = buf;
	iov[0].iov_len = len;

	return uring_sock_readv(sock, iov, 1);
}

static ssize_t
uring_sock_writev(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);

	if (sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
		errno = EAGAIN;
		return -1;
	}

	return writev(sock->fd, iov, iovcnt);
}

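/*
 * Gather up to IOV_BATCH_SIZE iovecs from the socket's queued requests,
 * starting at the index entries already filled in iovs. Any partially-sent
 * offset within a request is skipped over. If last_req is non-NULL, gathering
 * resumes after *last_req and *last_req is updated to the last request whose
 * iovecs were fully gathered.
 */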
static int
sock_prep_reqs(struct spdk_sock *_sock, struct iovec *iovs, int index,
	       struct spdk_sock_request **last_req)
{
	int iovcnt, i;
	struct spdk_sock_request *req;
	unsigned int offset;

	/* Gather an iov */
	iovcnt = index;
	if (spdk_unlikely(iovcnt >= IOV_BATCH_SIZE)) {
		goto end;
	}

	if (last_req != NULL && *last_req != NULL) {
		req = TAILQ_NEXT(*last_req, internal.link);
	} else {
		req = TAILQ_FIRST(&_sock->queued_reqs);
	}

	while (req) {
		offset = req->internal.offset;

		for (i = 0; i < req->iovcnt; i++) {
			/* Consume any offset first */
			if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) {
				offset -= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len;
				continue;
			}

			iovs[iovcnt].iov_base = SPDK_SOCK_REQUEST_IOV(req, i)->iov_base + offset;
			iovs[iovcnt].iov_len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset;
			iovcnt++;

			offset = 0;

			if (iovcnt >= IOV_BATCH_SIZE) {
				break;
			}
		}
		if (iovcnt >= IOV_BATCH_SIZE) {
			break;
		}

		if (last_req != NULL) {
			*last_req = req;
		}
		req = TAILQ_NEXT(req, internal.link);
	}

end:
	return iovcnt;
}

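/*
 * Walk the queued requests and retire those whose bytes were fully written by
 * the last send (rc = bytes sent). A partially-sent request just has its
 * internal offset advanced and stays at the head of the queue.
 */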
static int
sock_complete_reqs(struct spdk_sock *_sock, ssize_t rc)
{
	struct spdk_sock_request *req;
	int i, retval;
	unsigned int offset;
	size_t len;

	/* Consume the requests that were actually written */
	req = TAILQ_FIRST(&_sock->queued_reqs);
	while (req) {
		offset = req->internal.offset;

		for (i = 0; i < req->iovcnt; i++) {
			/* Advance by the offset first */
			if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) {
				offset -= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len;
				continue;
			}

			/* Calculate the remaining length of this element */
			len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset;

			if (len > (size_t)rc) {
				/* This element was partially sent. */
				req->internal.offset += rc;
				return 0;
			}

			offset = 0;
			req->internal.offset += len;
			rc -= len;
		}

		/* Handled a full request. */
		spdk_sock_request_pend(_sock, req);

		retval = spdk_sock_request_put(_sock, req, 0);
		if (retval) {
			return retval;
		}

		if (rc == 0) {
			break;
		}

		req = TAILQ_FIRST(&_sock->queued_reqs);
	}

	return 0;
}

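/*
 * Queue an asynchronous sendmsg() for the socket's pending write requests.
 * Only one write task per socket may be in flight at a time; its completion is
 * handled in sock_uring_group_reap().
 */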
static void
_sock_flush(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct spdk_uring_task *task = &sock->write_task;
	uint32_t iovcnt;
	struct io_uring_sqe *sqe;

	if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS) {
		return;
	}

	iovcnt = sock_prep_reqs(&sock->base, task->iovs, task->iov_cnt, &task->last_req);
	if (!iovcnt) {
		return;
	}

	task->iov_cnt = iovcnt;
	assert(sock->group != NULL);
	task->msg.msg_iov = task->iovs;
	task->msg.msg_iovlen = task->iov_cnt;

	sock->group->io_queued++;

	sqe = io_uring_get_sqe(&sock->group->uring);
	io_uring_prep_sendmsg(sqe, sock->fd, &sock->write_task.msg, 0);
	io_uring_sqe_set_data(sqe, task);
	task->status = SPDK_URING_SOCK_TASK_IN_PROCESS;
}

static void
_sock_prep_pollin(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct spdk_uring_task *task = &sock->pollin_task;
	struct io_uring_sqe *sqe;

	/* Skip if a POLLIN is already outstanding, or if data is already
	 * waiting in the receive pipe. */
	if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS || sock->pending_recv) {
		return;
	}

	assert(sock->group != NULL);
	sock->group->io_queued++;

	sqe = io_uring_get_sqe(&sock->group->uring);
	io_uring_prep_poll_add(sqe, sock->fd, POLLIN);
	io_uring_sqe_set_data(sqe, task);
	task->status = SPDK_URING_SOCK_TASK_IN_PROCESS;
}

static void
_sock_prep_cancel_task(struct spdk_sock *_sock, void *user_data)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct spdk_uring_task *task = &sock->cancel_task;
	struct io_uring_sqe *sqe;

	if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS) {
		return;
	}

	assert(sock->group != NULL);
	sock->group->io_queued++;

	sqe = io_uring_get_sqe(&sock->group->uring);
	io_uring_prep_cancel(sqe, user_data, 0);
	io_uring_sqe_set_data(sqe, task);
	task->status = SPDK_URING_SOCK_TASK_IN_PROCESS;
}

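/*
 * Reap up to max completions from the ring, then hand back up to
 * max_read_events sockets with pending receive data. The pending_recv list is
 * rotated afterwards so repeated polls do not always service the same sockets
 * first.
 */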
static int
sock_uring_group_reap(struct spdk_uring_sock_group_impl *group, int max, int max_read_events,
		      struct spdk_sock **socks)
{
	int i, count, ret;
	struct io_uring_cqe *cqe;
	struct spdk_uring_sock *sock, *tmp;
	struct spdk_uring_task *task;
	int status;

	for (i = 0; i < max; i++) {
		ret = io_uring_peek_cqe(&group->uring, &cqe);
		if (ret != 0) {
			break;
		}

		if (cqe == NULL) {
			break;
		}

		task = (struct spdk_uring_task *)cqe->user_data;
		assert(task != NULL);
		sock = task->sock;
		assert(sock != NULL);
		assert(sock->group != NULL);
		assert(sock->group == group);
		sock->group->io_inflight--;
		sock->group->io_avail++;
		status = cqe->res;
		io_uring_cqe_seen(&group->uring, cqe);

		task->status = SPDK_URING_SOCK_TASK_NOT_IN_USE;

		if (spdk_unlikely(status <= 0)) {
			if (status == -EAGAIN || status == -EWOULDBLOCK) {
				continue;
			}
		}

		switch (task->type) {
		case SPDK_SOCK_TASK_POLLIN:
			if ((status & POLLIN) == POLLIN) {
				if (sock->base.cb_fn != NULL) {
					assert(sock->pending_recv == false);
					sock->pending_recv = true;
					TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
				}
			}
			break;
		case SPDK_SOCK_TASK_WRITE:
			assert(TAILQ_EMPTY(&sock->base.pending_reqs));
			task->last_req = NULL;
			task->iov_cnt = 0;
			if (spdk_unlikely(status < 0)) {
				sock->connection_status = status;
				spdk_sock_abort_requests(&sock->base);
			} else {
				sock_complete_reqs(&sock->base, status);
			}

			break;
		case SPDK_SOCK_TASK_CANCEL:
			/* Do nothing */
			break;
		default:
			SPDK_UNREACHABLE();
		}
	}

	if (!socks) {
		return 0;
	}
	count = 0;
	TAILQ_FOREACH_SAFE(sock, &group->pending_recv, link, tmp) {
		if (count == max_read_events) {
			break;
		}

		socks[count++] = &sock->base;
	}

	/* Cycle the pending_recv list so that each time we poll things aren't
	 * in the same order. */
	for (i = 0; i < count; i++) {
		sock = __uring_sock(socks[i]);

		TAILQ_REMOVE(&group->pending_recv, sock, link);

		if (sock->recv_pipe == NULL || spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) {
			sock->pending_recv = false;
		} else {
			TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
		}
	}

	return count;
}

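/*
 * Flush path for sockets that are not in a polling group: gather an iovec
 * batch and write it with one sendmsg() call, treating EAGAIN/EWOULDBLOCK as
 * "try again later" rather than an error.
 */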
static int
_sock_flush_client(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct msghdr msg = {};
	struct iovec iovs[IOV_BATCH_SIZE];
	int iovcnt;
	ssize_t rc;

	/* Can't flush from within a callback or we end up with recursive calls */
	if (_sock->cb_cnt > 0) {
		return 0;
	}

	/* Gather an iov */
	iovcnt = sock_prep_reqs(_sock, iovs, 0, NULL);
	if (iovcnt == 0) {
		return 0;
	}

	/* Perform the vectored write */
	msg.msg_iov = iovs;
	msg.msg_iovlen = iovcnt;
	rc = sendmsg(sock->fd, &msg, 0);
	if (rc <= 0) {
		if (errno == EAGAIN || errno == EWOULDBLOCK) {
			return 0;
		}
		return rc;
	}

	sock_complete_reqs(_sock, rc);

	return 0;
}

static void
uring_sock_writev_async(struct spdk_sock *_sock, struct spdk_sock_request *req)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	int rc;

	if (spdk_unlikely(sock->connection_status)) {
		req->cb_fn(req->cb_arg, sock->connection_status);
		return;
	}

	spdk_sock_request_queue(_sock, req);

	if (!sock->group) {
		if (_sock->queued_iovcnt >= IOV_BATCH_SIZE) {
			rc = _sock_flush_client(_sock);
			if (rc) {
				spdk_sock_abort_requests(_sock);
			}
		}
	}
}

static int
uring_sock_set_recvlowat(struct spdk_sock *_sock, int nbytes)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	int val;
	int rc;

	assert(sock != NULL);

	val = nbytes;
	rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVLOWAT, &val, sizeof val);
	if (rc != 0) {
		return -1;
	}
	return 0;
}

static bool
uring_sock_is_ipv6(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct sockaddr_storage sa;
	socklen_t salen;
	int rc;

	assert(sock != NULL);

	memset(&sa, 0, sizeof sa);
	salen = sizeof sa;
	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
	if (rc != 0) {
		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
		return false;
	}

	return (sa.ss_family == AF_INET6);
}

static bool
uring_sock_is_ipv4(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct sockaddr_storage sa;
	socklen_t salen;
	int rc;

	assert(sock != NULL);

	memset(&sa, 0, sizeof sa);
	salen = sizeof sa;
	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
	if (rc != 0) {
		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
		return false;
	}

	return (sa.ss_family == AF_INET);
}

static bool
uring_sock_is_connected(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	uint8_t byte;
	int rc;

	rc = recv(sock->fd, &byte, 1, MSG_PEEK);
	if (rc == 0) {
		return false;
	}

	if (rc < 0) {
		if (errno == EAGAIN || errno == EWOULDBLOCK) {
			return true;
		}

		return false;
	}

	return true;
}

static int
uring_sock_get_placement_id(struct spdk_sock *_sock, int *placement_id)
{
	int rc = -1;

#if defined(SO_INCOMING_NAPI_ID)
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	socklen_t salen = sizeof(int);

	rc = getsockopt(sock->fd, SOL_SOCKET, SO_INCOMING_NAPI_ID, placement_id, &salen);
	if (rc != 0) {
		SPDK_ERRLOG("getsockopt() failed (errno=%d)\n", errno);
	}

#endif
	return rc;
}

static struct spdk_sock_group_impl *
uring_sock_group_impl_create(void)
{
	struct spdk_uring_sock_group_impl *group_impl;

	group_impl = calloc(1, sizeof(*group_impl));
	if (group_impl == NULL) {
		SPDK_ERRLOG("group_impl allocation failed\n");
		return NULL;
	}

	group_impl->io_avail = SPDK_SOCK_GROUP_QUEUE_DEPTH;

	if (io_uring_queue_init(SPDK_SOCK_GROUP_QUEUE_DEPTH, &group_impl->uring, 0) < 0) {
		SPDK_ERRLOG("uring I/O context setup failure\n");
		free(group_impl);
		return NULL;
	}

	TAILQ_INIT(&group_impl->pending_recv);

	return &group_impl->base;
}

static int
uring_sock_group_impl_add_sock(struct spdk_sock_group_impl *_group,
			       struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group);

	sock->group = group;
	sock->write_task.sock = sock;
	sock->write_task.type = SPDK_SOCK_TASK_WRITE;

	sock->pollin_task.sock = sock;
	sock->pollin_task.type = SPDK_SOCK_TASK_POLLIN;

	sock->cancel_task.sock = sock;
	sock->cancel_task.type = SPDK_SOCK_TASK_CANCEL;

	/* If the socket was switched from another polling group due to
	 * scheduling, its receive pipe may still hold data; mark it pending so
	 * that data is delivered. */
	if (spdk_unlikely(sock->recv_pipe != NULL &&
			  (spdk_pipe_reader_bytes_available(sock->recv_pipe) > 0))) {
		assert(sock->pending_recv == false);
		sock->pending_recv = true;
		TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
	}

	return 0;
}

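/*
 * Poll cycle for a group: queue a flush and a POLLIN for every healthy socket,
 * submit the batched SQEs with a single io_uring_submit(), then reap
 * completions and return up to max_events readable sockets.
 */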
static int
uring_sock_group_impl_poll(struct spdk_sock_group_impl *_group, int max_events,
			   struct spdk_sock **socks)
{
	struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group);
	int count, ret;
	int to_complete, to_submit;
	struct spdk_sock *_sock, *tmp;
	struct spdk_uring_sock *sock;

	if (spdk_likely(socks)) {
		TAILQ_FOREACH_SAFE(_sock, &group->base.socks, link, tmp) {
			sock = __uring_sock(_sock);
			if (spdk_unlikely(sock->connection_status)) {
				continue;
			}
			_sock_flush(_sock);
			_sock_prep_pollin(_sock);
		}
	}

	to_submit = group->io_queued;

	if (to_submit > 0) {
		/* Use io_uring_submit() to flush the batched SQEs; it calls
		 * io_uring_enter() internally as needed. Network sockets cannot
		 * be opened with O_DIRECT, so no special enter handling is
		 * required here. */
		ret = io_uring_submit(&group->uring);
		if (ret < 0) {
			return 1;
		}
		group->io_queued = 0;
		group->io_inflight += to_submit;
		group->io_avail -= to_submit;
	}

	count = 0;
	to_complete = group->io_inflight;
	if (to_complete > 0) {
		count = sock_uring_group_reap(group, to_complete, max_events, socks);
	}

	return count;
}

static int
uring_sock_group_impl_remove_sock(struct spdk_sock_group_impl *_group,
				  struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group);

	if (sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
		_sock_prep_cancel_task(_sock, &sock->write_task);
		/* Since spdk_sock_group_remove_sock() is a synchronous
		 * interface, we can busy-wait here until both the task and its
		 * cancellation complete. */
		while ((sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) ||
		       (sock->cancel_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE)) {
			uring_sock_group_impl_poll(_group, 32, NULL);
		}
	}

	if (sock->pollin_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
		_sock_prep_cancel_task(_sock, &sock->pollin_task);
		/* Same as above: poll synchronously until the in-flight task
		 * and its cancellation have both completed. */
		while ((sock->pollin_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) ||
		       (sock->cancel_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE)) {
			uring_sock_group_impl_poll(_group, 32, NULL);
		}
	}

	if (sock->pending_recv) {
		TAILQ_REMOVE(&group->pending_recv, sock, link);
		sock->pending_recv = false;
	}
	assert(sock->pending_recv == false);

	sock->group = NULL;
	return 0;
}

static int
uring_sock_group_impl_close(struct spdk_sock_group_impl *_group)
{
	struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group);

	/* try to reap all the active I/O */
	while (group->io_inflight) {
		uring_sock_group_impl_poll(_group, 32, NULL);
	}
	assert(group->io_inflight == 0);
	assert(group->io_avail == SPDK_SOCK_GROUP_QUEUE_DEPTH);

	io_uring_queue_exit(&group->uring);

	free(group);
	return 0;
}

static int
uring_sock_flush(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);

	if (!sock->group) {
		return _sock_flush_client(_sock);
	}

	return 0;
}

static struct spdk_net_impl g_uring_net_impl = {
	.name		= "uring",
	.getaddr	= uring_sock_getaddr,
	.connect	= uring_sock_connect,
	.listen		= uring_sock_listen,
	.accept		= uring_sock_accept,
	.close		= uring_sock_close,
	.recv		= uring_sock_recv,
	.readv		= uring_sock_readv,
	.writev		= uring_sock_writev,
	.writev_async	= uring_sock_writev_async,
	.flush		= uring_sock_flush,
	.set_recvlowat	= uring_sock_set_recvlowat,
	.set_recvbuf	= uring_sock_set_recvbuf,
	.set_sendbuf	= uring_sock_set_sendbuf,
	.is_ipv6	= uring_sock_is_ipv6,
	.is_ipv4	= uring_sock_is_ipv4,
	.is_connected	= uring_sock_is_connected,
	.get_placement_id	= uring_sock_get_placement_id,
	.group_impl_create	= uring_sock_group_impl_create,
	.group_impl_add_sock	= uring_sock_group_impl_add_sock,
	.group_impl_remove_sock = uring_sock_group_impl_remove_sock,
	.group_impl_poll	= uring_sock_group_impl_poll,
	.group_impl_close	= uring_sock_group_impl_close,
};

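/*
 * Register at DEFAULT_SOCK_PRIORITY + 1. Higher priority wins during default
 * implementation selection, so when this module is compiled in, uring takes
 * precedence over the posix implementation (registered at
 * DEFAULT_SOCK_PRIORITY).
 */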
SPDK_NET_IMPL_REGISTER(uring, &g_uring_net_impl, DEFAULT_SOCK_PRIORITY + 1);