/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "spdk/stdinc.h"
#include "spdk/config.h"

#include <sys/epoll.h>
#include <liburing.h>

#include "spdk/barrier.h"
#include "spdk/likely.h"
#include "spdk/log.h"
#include "spdk/pipe.h"
#include "spdk/sock.h"
#include "spdk/string.h"
#include "spdk/util.h"

#include "spdk_internal/sock.h"
#include "spdk_internal/assert.h"

#define MAX_TMPBUF 1024
#define PORTNUMLEN 32
#define SO_RCVBUF_SIZE (2 * 1024 * 1024)
#define SO_SNDBUF_SIZE (2 * 1024 * 1024)
#define SPDK_SOCK_GROUP_QUEUE_DEPTH 4096
#define IOV_BATCH_SIZE 64

enum spdk_sock_task_type {
	SPDK_SOCK_TASK_POLLIN = 0,
	SPDK_SOCK_TASK_WRITE,
};

enum spdk_uring_sock_task_status {
	SPDK_URING_SOCK_TASK_NOT_IN_USE = 0,
	SPDK_URING_SOCK_TASK_IN_PROCESS,
};

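/* A uring task tracks a single outstanding io_uring operation for a socket:
 * either a POLLIN poll or a batched sendmsg write. Each socket owns exactly
 * one task of each type, so at most one operation of each kind is in flight
 * per socket at any time.
 */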
struct spdk_uring_task {
	enum spdk_uring_sock_task_status	status;
	enum spdk_sock_task_type		type;
	struct spdk_uring_sock			*sock;
	struct msghdr				msg;
	struct iovec				iovs[IOV_BATCH_SIZE];
	int					iov_cnt;
	struct spdk_sock_request		*last_req;
	STAILQ_ENTRY(spdk_uring_task)		link;
};

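/* recv_pipe is an optional buffer used to batch small reads: data is pulled
 * from the kernel into the pipe in large chunks and then copied out to user
 * buffers. pending_recv indicates membership in the group's pending_recv
 * list, i.e. that the socket has data ready to be received.
 */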
struct spdk_uring_sock {
	struct spdk_sock			base;
	int					fd;
	struct spdk_uring_sock_group_impl	*group;
	struct spdk_uring_task			write_task;
	struct spdk_uring_task			pollin_task;
	int					outstanding_io;
	struct spdk_pipe			*recv_pipe;
	void					*recv_buf;
	int					recv_buf_sz;
	bool					pending_recv;
	TAILQ_ENTRY(spdk_uring_sock)		link;
};

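/* io_queued counts SQEs prepared but not yet submitted, io_inflight counts
 * submitted operations awaiting completion, and io_avail tracks how much of
 * SPDK_SOCK_GROUP_QUEUE_DEPTH remains available.
 */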
struct spdk_uring_sock_group_impl {
	struct spdk_sock_group_impl		base;
	struct io_uring				uring;
	uint32_t				io_inflight;
	uint32_t				io_queued;
	uint32_t				io_avail;
	TAILQ_HEAD(, spdk_uring_sock)		pending_recv;
};

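/* The sock layer allocates each request with its iovec array placed
 * immediately after the spdk_sock_request structure; this macro recovers a
 * pointer to that array.
 */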
#define SPDK_URING_SOCK_REQUEST_IOV(req) ((struct iovec *)((uint8_t *)(req) + sizeof(struct spdk_sock_request)))

static int
get_addr_str(struct sockaddr *sa, char *host, size_t hlen)
{
	const char *result = NULL;

	if (sa == NULL || host == NULL) {
		return -1;
	}

	switch (sa->sa_family) {
	case AF_INET:
		result = inet_ntop(AF_INET, &(((struct sockaddr_in *)sa)->sin_addr),
				   host, hlen);
		break;
	case AF_INET6:
		result = inet_ntop(AF_INET6, &(((struct sockaddr_in6 *)sa)->sin6_addr),
				   host, hlen);
		break;
	default:
		break;
	}

	if (result != NULL) {
		return 0;
	} else {
		return -1;
	}
}

#define __uring_sock(sock) ((struct spdk_uring_sock *)(sock))
#define __uring_group_impl(group) ((struct spdk_uring_sock_group_impl *)(group))

static int
spdk_uring_sock_getaddr(struct spdk_sock *_sock, char *saddr, int slen, uint16_t *sport,
			char *caddr, int clen, uint16_t *cport)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct sockaddr_storage sa;
	socklen_t salen;
	int rc;

	assert(sock != NULL);

	memset(&sa, 0, sizeof sa);
	salen = sizeof sa;
	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
	if (rc != 0) {
		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
		return -1;
	}

	switch (sa.ss_family) {
	case AF_UNIX:
		/* Acceptable connection types that don't have IPs */
		return 0;
	case AF_INET:
	case AF_INET6:
		/* Code below will get IP addresses */
		break;
	default:
		/* Unsupported socket family */
		return -1;
	}

	rc = get_addr_str((struct sockaddr *)&sa, saddr, slen);
	if (rc != 0) {
		SPDK_ERRLOG("get_addr_str() failed (errno=%d)\n", errno);
		return -1;
	}

	if (sport) {
		if (sa.ss_family == AF_INET) {
			*sport = ntohs(((struct sockaddr_in *) &sa)->sin_port);
		} else if (sa.ss_family == AF_INET6) {
			*sport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port);
		}
	}

	memset(&sa, 0, sizeof sa);
	salen = sizeof sa;
	rc = getpeername(sock->fd, (struct sockaddr *) &sa, &salen);
	if (rc != 0) {
		SPDK_ERRLOG("getpeername() failed (errno=%d)\n", errno);
		return -1;
	}

	rc = get_addr_str((struct sockaddr *)&sa, caddr, clen);
	if (rc != 0) {
		SPDK_ERRLOG("get_addr_str() failed (errno=%d)\n", errno);
		return -1;
	}

	if (cport) {
		if (sa.ss_family == AF_INET) {
			*cport = ntohs(((struct sockaddr_in *) &sa)->sin_port);
		} else if (sa.ss_family == AF_INET6) {
			*cport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port);
		}
	}

	return 0;
}

enum spdk_uring_sock_create_type {
	SPDK_SOCK_CREATE_LISTEN,
	SPDK_SOCK_CREATE_CONNECT,
};

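/* Resize the socket's receive pipe. Any data buffered in the old pipe is
 * copied into the new one before the old pipe is destroyed, so no received
 * bytes are lost across a resize. Shrinking below the number of currently
 * buffered bytes fails with -EINVAL.
 */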
static int
spdk_uring_sock_alloc_pipe(struct spdk_uring_sock *sock, int sz)
{
	uint8_t *new_buf;
	struct spdk_pipe *new_pipe;
	struct iovec siov[2];
	struct iovec diov[2];
	int sbytes;
	ssize_t bytes;

	if (sock->recv_buf_sz == sz) {
		return 0;
	}

	/* If the new size is 0, just free the pipe */
	if (sz == 0) {
		spdk_pipe_destroy(sock->recv_pipe);
		free(sock->recv_buf);
		sock->recv_pipe = NULL;
		sock->recv_buf = NULL;
		return 0;
	}

	/* Round up to next 64 byte multiple */
	new_buf = calloc(SPDK_ALIGN_CEIL(sz + 1, 64), sizeof(uint8_t));
	if (!new_buf) {
		SPDK_ERRLOG("socket recv buf allocation failed\n");
		return -ENOMEM;
	}

	new_pipe = spdk_pipe_create(new_buf, sz + 1);
	if (new_pipe == NULL) {
		SPDK_ERRLOG("socket pipe allocation failed\n");
		free(new_buf);
		return -ENOMEM;
	}

	if (sock->recv_pipe != NULL) {
		/* Pull all of the data out of the old pipe */
		sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov);
		if (sbytes > sz) {
			/* Too much data to fit into the new pipe size */
			spdk_pipe_destroy(new_pipe);
			free(new_buf);
			return -EINVAL;
		}

		sbytes = spdk_pipe_writer_get_buffer(new_pipe, sz, diov);
		assert(sbytes == sz);

		bytes = spdk_iovcpy(siov, 2, diov, 2);
		spdk_pipe_writer_advance(new_pipe, bytes);

		spdk_pipe_destroy(sock->recv_pipe);
		free(sock->recv_buf);
	}

	sock->recv_buf_sz = sz;
	sock->recv_buf = new_buf;
	sock->recv_pipe = new_pipe;

	return 0;
}

static int
spdk_uring_sock_set_recvbuf(struct spdk_sock *_sock, int sz)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	int rc;

	assert(sock != NULL);

#ifndef __aarch64__
	/* The receive pipe does not improve performance on ARM systems, so it
	 * is skipped there. The pipe size is purely derived from benchmarks;
	 * it seems to work well. */
	rc = spdk_uring_sock_alloc_pipe(sock, sz);
	if (rc) {
		SPDK_ERRLOG("unable to allocate sufficient recvbuf with sz=%d on sock=%p\n", sz, _sock);
		return rc;
	}
#endif

	if (sz < SO_RCVBUF_SIZE) {
		sz = SO_RCVBUF_SIZE;
	}

	rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVBUF, &sz, sizeof(sz));
	if (rc < 0) {
		return rc;
	}

	return 0;
}

static int
spdk_uring_sock_set_sendbuf(struct spdk_sock *_sock, int sz)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	int rc;

	assert(sock != NULL);

	if (sz < SO_SNDBUF_SIZE) {
		sz = SO_SNDBUF_SIZE;
	}

	rc = setsockopt(sock->fd, SOL_SOCKET, SO_SNDBUF, &sz, sizeof(sz));
	if (rc < 0) {
		return rc;
	}

	return 0;
}

static struct spdk_uring_sock *
_spdk_uring_sock_alloc(int fd)
{
	struct spdk_uring_sock *sock;

	sock = calloc(1, sizeof(*sock));
	if (sock == NULL) {
		SPDK_ERRLOG("sock allocation failed\n");
		return NULL;
	}

	sock->fd = fd;
	return sock;
}

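/* Common creation path for both listening and connecting sockets: resolve
 * the address, create and configure a socket for each returned addrinfo
 * entry, and either bind()/listen() or connect() depending on the type.
 * The first file descriptor that succeeds is switched to non-blocking mode
 * and wrapped in a spdk_uring_sock.
 */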
static struct spdk_sock *
spdk_uring_sock_create(const char *ip, int port, enum spdk_uring_sock_create_type type)
{
	struct spdk_uring_sock *sock;
	char buf[MAX_TMPBUF];
	char portnum[PORTNUMLEN];
	char *p;
	struct addrinfo hints, *res, *res0;
	int fd, flag;
	int val = 1;
	int rc;

	if (ip == NULL) {
		return NULL;
	}
	if (ip[0] == '[') {
		snprintf(buf, sizeof(buf), "%s", ip + 1);
		p = strchr(buf, ']');
		if (p != NULL) {
			*p = '\0';
		}
		ip = (const char *) &buf[0];
	}

	snprintf(portnum, sizeof portnum, "%d", port);
	memset(&hints, 0, sizeof hints);
	hints.ai_family = PF_UNSPEC;
	hints.ai_socktype = SOCK_STREAM;
	hints.ai_flags = AI_NUMERICSERV;
	hints.ai_flags |= AI_PASSIVE;
	hints.ai_flags |= AI_NUMERICHOST;
	rc = getaddrinfo(ip, portnum, &hints, &res0);
	if (rc != 0) {
		/* getaddrinfo() reports errors via its return value, not errno */
		SPDK_ERRLOG("getaddrinfo() failed (rc=%d): %s\n", rc, gai_strerror(rc));
		return NULL;
	}

	/* try listen */
	fd = -1;
	for (res = res0; res != NULL; res = res->ai_next) {
retry:
		fd = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
		if (fd < 0) {
			/* error */
			continue;
		}

		val = SO_RCVBUF_SIZE;
		rc = setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &val, sizeof val);
		if (rc) {
			/* Not fatal */
		}

		val = SO_SNDBUF_SIZE;
		rc = setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &val, sizeof val);
		if (rc) {
			/* Not fatal */
		}

		/* Reset val so the buffer sizes above do not leak into the
		 * boolean socket options below. */
		val = 1;
		rc = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof val);
		if (rc != 0) {
			close(fd);
			/* error */
			continue;
		}
		rc = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof val);
		if (rc != 0) {
			close(fd);
			/* error */
			continue;
		}

		if (res->ai_family == AF_INET6) {
			rc = setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &val, sizeof val);
			if (rc != 0) {
				close(fd);
				/* error */
				continue;
			}
		}

		if (type == SPDK_SOCK_CREATE_LISTEN) {
			rc = bind(fd, res->ai_addr, res->ai_addrlen);
			if (rc != 0) {
				SPDK_ERRLOG("bind() failed at port %d, errno = %d\n", port, errno);
				switch (errno) {
				case EINTR:
					/* A signal interrupted bind(); retry on the same address */
					close(fd);
					goto retry;
				case EADDRNOTAVAIL:
					SPDK_ERRLOG("IP address %s not available. "
						    "Verify IP address in config file "
						    "and make sure setup script is "
						    "run before starting spdk app.\n", ip);
				/* FALLTHROUGH */
				default:
					/* try next family */
					close(fd);
					fd = -1;
					continue;
				}
			}
			/* bind OK */
			rc = listen(fd, 512);
			if (rc != 0) {
				SPDK_ERRLOG("listen() failed, errno = %d\n", errno);
				close(fd);
				fd = -1;
				break;
			}
		} else if (type == SPDK_SOCK_CREATE_CONNECT) {
			rc = connect(fd, res->ai_addr, res->ai_addrlen);
			if (rc != 0) {
				SPDK_ERRLOG("connect() failed, errno = %d\n", errno);
				/* try next family */
				close(fd);
				fd = -1;
				continue;
			}
		}

		flag = fcntl(fd, F_GETFL);
		if (fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0) {
			SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno);
			close(fd);
			fd = -1;
			break;
		}
		break;
	}
	freeaddrinfo(res0);

	if (fd < 0) {
		return NULL;
	}

	sock = _spdk_uring_sock_alloc(fd);
	if (sock == NULL) {
		SPDK_ERRLOG("sock allocation failed\n");
		close(fd);
		return NULL;
	}

	return &sock->base;
}

static struct spdk_sock *
spdk_uring_sock_listen(const char *ip, int port)
{
	return spdk_uring_sock_create(ip, port, SPDK_SOCK_CREATE_LISTEN);
}

static struct spdk_sock *
spdk_uring_sock_connect(const char *ip, int port)
{
	return spdk_uring_sock_create(ip, port, SPDK_SOCK_CREATE_CONNECT);
}

static struct spdk_sock *
spdk_uring_sock_accept(struct spdk_sock *_sock)
{
	struct spdk_uring_sock		*sock = __uring_sock(_sock);
	struct sockaddr_storage		sa;
	socklen_t			salen;
	int				rc, fd;
	struct spdk_uring_sock		*new_sock;
	int				flag;

	memset(&sa, 0, sizeof(sa));
	salen = sizeof(sa);

	assert(sock != NULL);

	rc = accept(sock->fd, (struct sockaddr *)&sa, &salen);

	if (rc == -1) {
		return NULL;
	}

	fd = rc;

	flag = fcntl(fd, F_GETFL);
	if ((!(flag & O_NONBLOCK)) && (fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0)) {
		SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno);
		close(fd);
		return NULL;
	}

	new_sock = _spdk_uring_sock_alloc(fd);
	if (new_sock == NULL) {
		close(fd);
		return NULL;
	}

	return &new_sock->base;
}

static int
spdk_uring_sock_close(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	int rc;

	/* defer the socket close if there is outstanding I/O */
	if (sock->outstanding_io) {
		return 0;
	}

	assert(TAILQ_EMPTY(&_sock->pending_reqs));
	assert(sock->group == NULL);

	spdk_pipe_destroy(sock->recv_pipe);
	free(sock->recv_buf);
	rc = close(sock->fd);
	if (rc == 0) {
		free(sock);
	}

	return rc;
}

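/* Copy buffered data out of the receive pipe into the caller's iovecs. If
 * this drains the pipe, the socket no longer has pending data and is removed
 * from the group's level-triggered pending_recv list.
 */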
static ssize_t
spdk_uring_sock_recv_from_pipe(struct spdk_uring_sock *sock, struct iovec *diov, int diovcnt)
{
	struct iovec siov[2];
	int sbytes;
	ssize_t bytes;
	struct spdk_uring_sock_group_impl *group;

	sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov);
	if (sbytes < 0) {
		errno = EINVAL;
		return -1;
	} else if (sbytes == 0) {
		errno = EAGAIN;
		return -1;
	}

	bytes = spdk_iovcpy(siov, 2, diov, diovcnt);

	if (bytes == 0) {
		/* The only way this happens is if diov is 0 length */
		errno = EINVAL;
		return -1;
	}

	spdk_pipe_reader_advance(sock->recv_pipe, bytes);

	/* If we drained the pipe, take it off the level-triggered list */
	if (sock->base.group_impl && spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) {
		group = __uring_group_impl(sock->base.group_impl);
		TAILQ_REMOVE(&group->pending_recv, sock, link);
		sock->pending_recv = false;
	}

	return bytes;
}

static inline ssize_t
_spdk_uring_sock_read(struct spdk_uring_sock *sock)
{
	struct iovec iov[2];
	int bytes;
	struct spdk_uring_sock_group_impl *group;

	bytes = spdk_pipe_writer_get_buffer(sock->recv_pipe, sock->recv_buf_sz, iov);

	if (bytes > 0) {
		bytes = readv(sock->fd, iov, 2);
		if (bytes > 0) {
			spdk_pipe_writer_advance(sock->recv_pipe, bytes);
			/* Guard against double insertion: the socket may already
			 * be on the pending_recv list if a POLLIN completed
			 * earlier. */
			if (sock->base.group_impl && !sock->pending_recv) {
				group = __uring_group_impl(sock->base.group_impl);
				TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
				sock->pending_recv = true;
			}
		}
	}

	return bytes;
}

static ssize_t
spdk_uring_sock_readv(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	int rc, i;
	size_t len;

	if (sock->recv_pipe == NULL) {
		return readv(sock->fd, iov, iovcnt);
	}

	len = 0;
	for (i = 0; i < iovcnt; i++) {
		len += iov[i].iov_len;
	}

	if (spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) {
		/* If the user is receiving a sufficiently large amount of data,
		 * receive directly to their buffers. */
		if (len >= 1024) {
			return readv(sock->fd, iov, iovcnt);
		}

		/* Otherwise, do a big read into our pipe */
		rc = _spdk_uring_sock_read(sock);
		if (rc <= 0) {
			return rc;
		}
	}

	return spdk_uring_sock_recv_from_pipe(sock, iov, iovcnt);
}

static ssize_t
spdk_uring_sock_recv(struct spdk_sock *sock, void *buf, size_t len)
{
	struct iovec iov[1];

	iov[0].iov_base = buf;
	iov[0].iov_len = len;

	return spdk_uring_sock_readv(sock, iov, 1);
}

static ssize_t
spdk_uring_sock_writev(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);

	if (sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
		errno = EAGAIN;
		return -1;
	}

	return writev(sock->fd, iov, iovcnt);
}

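/* Gather up to IOV_BATCH_SIZE iovecs from the socket's queued requests,
 * starting at the given index, skipping over any portion of a request that
 * was already sent (tracked in internal.offset). If last_req is provided,
 * gathering resumes after *last_req and *last_req is updated to point at
 * the last fully gathered request.
 */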
static int
spdk_sock_prep_reqs(struct spdk_sock *_sock, struct iovec *iovs, int index,
		    struct spdk_sock_request **last_req)
{
	int iovcnt, i;
	struct spdk_sock_request *req;
	unsigned int offset;

	/* Gather an iov */
	iovcnt = index;
	if (spdk_unlikely(iovcnt >= IOV_BATCH_SIZE)) {
		goto end;
	}

	if (last_req != NULL && *last_req != NULL) {
		req = TAILQ_NEXT(*last_req, internal.link);
	} else {
		req = TAILQ_FIRST(&_sock->queued_reqs);
	}

	while (req) {
		offset = req->internal.offset;

		for (i = 0; i < req->iovcnt; i++) {
			/* Consume any offset first */
			if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) {
				offset -= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len;
				continue;
			}

			iovs[iovcnt].iov_base = SPDK_SOCK_REQUEST_IOV(req, i)->iov_base + offset;
			iovs[iovcnt].iov_len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset;
			iovcnt++;

			offset = 0;

			if (iovcnt >= IOV_BATCH_SIZE) {
				break;
			}
		}
		if (iovcnt >= IOV_BATCH_SIZE) {
			break;
		}

		if (last_req != NULL) {
			*last_req = req;
		}
		req = TAILQ_NEXT(req, internal.link);
	}

end:
	return iovcnt;
}

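/* Walk the queued requests and account for rc bytes of completed writes.
 * Fully written requests are completed and their callbacks invoked; a
 * partially written request just has its internal.offset advanced so the
 * next flush resumes where this one stopped.
 */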
static int
spdk_sock_complete_reqs(struct spdk_sock *_sock, ssize_t rc)
{
	struct spdk_sock_request *req;
	int i, retval;
	unsigned int offset;
	size_t len;

	/* Consume the requests that were actually written */
	req = TAILQ_FIRST(&_sock->queued_reqs);
	while (req) {
		offset = req->internal.offset;

		for (i = 0; i < req->iovcnt; i++) {
			/* Advance by the offset first */
			if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) {
				offset -= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len;
				continue;
			}

			/* Calculate the remaining length of this element */
			len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset;

			if (len > (size_t)rc) {
				/* This element was partially sent. */
				req->internal.offset += rc;
				return 0;
			}

			offset = 0;
			req->internal.offset += len;
			rc -= len;
		}

		/* Handled a full request. */
		spdk_sock_request_pend(_sock, req);

		retval = spdk_sock_request_put(_sock, req, 0);
		if (retval) {
			return retval;
		}

		if (rc == 0) {
			break;
		}

		req = TAILQ_FIRST(&_sock->queued_reqs);
	}

	return 0;
}

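/* Queue a single sendmsg SQE covering as many queued requests as fit in the
 * task's iovec array. Only one write task per socket is ever in flight.
 * Note: io_uring_get_sqe() is assumed not to fail here because the group
 * never queues more than SPDK_SOCK_GROUP_QUEUE_DEPTH operations between
 * submissions.
 */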
static void
_sock_flush(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct spdk_uring_task *task = &sock->write_task;
	uint32_t iovcnt;
	struct io_uring_sqe *sqe;

	if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS) {
		return;
	}

	iovcnt = spdk_sock_prep_reqs(&sock->base, task->iovs, task->iov_cnt, &task->last_req);
	if (!iovcnt) {
		return;
	}

	task->iov_cnt = iovcnt;
	assert(sock->group != NULL);
	task->msg.msg_iov = task->iovs;
	task->msg.msg_iovlen = task->iov_cnt;

	sock->group->io_queued++;

	sqe = io_uring_get_sqe(&sock->group->uring);
	io_uring_prep_sendmsg(sqe, sock->fd, &task->msg, 0);
	io_uring_sqe_set_data(sqe, task);
	task->status = SPDK_URING_SOCK_TASK_IN_PROCESS;
}

static void
_sock_prep_pollin(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct spdk_uring_task *task = &sock->pollin_task;
	struct io_uring_sqe *sqe;

	/* Do not prepare a pollin event if one is already in flight or if
	 * received data is already pending for this socket. */
	if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS || sock->pending_recv) {
		return;
	}

	assert(sock->group != NULL);
	sock->group->io_queued++;

	sqe = io_uring_get_sqe(&sock->group->uring);
	io_uring_prep_poll_add(sqe, sock->fd, POLLIN);
	io_uring_sqe_set_data(sqe, task);
	task->status = SPDK_URING_SOCK_TASK_IN_PROCESS;
}

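/* Reap up to max completions from the ring, then report up to
 * max_read_events sockets with pending data to the caller. POLLIN
 * completions put their socket on the group's pending_recv list; write
 * completions consume the bytes reported in cqe->res from the queued
 * requests. Finally the pending_recv list is rotated so repeated polls do
 * not always service sockets in the same order.
 */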
static int
spdk_sock_uring_group_reap(struct spdk_uring_sock_group_impl *group, int max, int max_read_events,
			   struct spdk_sock **socks)
{
	int i, count, ret;
	struct io_uring_cqe *cqe;
	struct spdk_uring_sock *sock, *tmp;
	struct spdk_uring_task *task;
	int status;

	for (i = 0; i < max; i++) {
		ret = io_uring_peek_cqe(&group->uring, &cqe);
		if (ret != 0) {
			break;
		}

		if (cqe == NULL) {
			break;
		}

		task = (struct spdk_uring_task *)cqe->user_data;
		assert(task != NULL);
		sock = task->sock;
		assert(sock != NULL);
		assert(sock->group != NULL);
		assert(sock->group == group);
		sock->group->io_inflight--;
		sock->group->io_avail++;
		status = cqe->res;
		io_uring_cqe_seen(&group->uring, cqe);

		task->status = SPDK_URING_SOCK_TASK_NOT_IN_USE;

		if (spdk_unlikely(status <= 0)) {
			if (status == -EAGAIN || status == -EWOULDBLOCK) {
				continue;
			}
		}

		switch (task->type) {
		case SPDK_SOCK_TASK_POLLIN:
			if ((status & POLLIN) == POLLIN) {
				if ((socks != NULL) && (sock->base.cb_fn != NULL)) {
					assert(sock->pending_recv == false);
					sock->pending_recv = true;
					TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
				}
			} else {
				SPDK_UNREACHABLE();
			}
			break;
		case SPDK_SOCK_TASK_WRITE:
			assert(TAILQ_EMPTY(&sock->base.pending_reqs));
			task->last_req = NULL;
			task->iov_cnt = 0;
			spdk_sock_complete_reqs(&sock->base, status);

			/* Handle a socket that was removed from the group while
			 * it still had I/O outstanding. */
			if (spdk_unlikely(task->sock->outstanding_io > 0 &&
					  TAILQ_EMPTY(&sock->base.pending_reqs))) {
				if (--sock->outstanding_io == 0) {
					sock->group = NULL;
					/* If the close was deferred, finish it now. */
					if (sock->base.flags.closed) {
						spdk_uring_sock_close(&sock->base);
					}
				}
			}

			break;
		default:
			SPDK_UNREACHABLE();
		}
	}

	count = 0;
	TAILQ_FOREACH_SAFE(sock, &group->pending_recv, link, tmp) {
		if (count == max_read_events) {
			break;
		}

		socks[count++] = &sock->base;
	}

	/* Cycle the pending_recv list so that each time we poll things aren't
	 * in the same order. */
	for (i = 0; i < count; i++) {
		sock = __uring_sock(socks[i]);

		TAILQ_REMOVE(&group->pending_recv, sock, link);

		if (sock->recv_pipe == NULL || spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) {
			sock->pending_recv = false;
		} else {
			TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
		}
	}

	return count;
}

static int
_sock_flush_client(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct msghdr msg = {};
	struct iovec iovs[IOV_BATCH_SIZE];
	int iovcnt;
	ssize_t rc;

	/* Can't flush from within a callback or we end up with recursive calls */
	if (_sock->cb_cnt > 0) {
		return 0;
	}

	/* Gather an iov */
	iovcnt = spdk_sock_prep_reqs(_sock, iovs, 0, NULL);
	if (iovcnt == 0) {
		return 0;
	}

	/* Perform the vectored write */
	msg.msg_iov = iovs;
	msg.msg_iovlen = iovcnt;
	rc = sendmsg(sock->fd, &msg, 0);
	if (rc <= 0) {
		if (errno == EAGAIN || errno == EWOULDBLOCK) {
			return 0;
		}
		return rc;
	}

	spdk_sock_complete_reqs(_sock, rc);

	return 0;
}

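/* Queue an asynchronous write. Sockets in a group are flushed from the
 * group's poll loop; ungrouped sockets are flushed inline once enough
 * iovecs (IOV_BATCH_SIZE) have accumulated to make the syscall worthwhile.
 */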
static void
spdk_uring_sock_writev_async(struct spdk_sock *_sock, struct spdk_sock_request *req)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	int rc;

	spdk_sock_request_queue(_sock, req);

	if (!sock->group) {
		if (_sock->queued_iovcnt >= IOV_BATCH_SIZE) {
			rc = _sock_flush_client(_sock);
			if (rc) {
				spdk_sock_abort_requests(_sock);
			}
		}
	}
}

static int
spdk_uring_sock_set_recvlowat(struct spdk_sock *_sock, int nbytes)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	int val;
	int rc;

	assert(sock != NULL);

	val = nbytes;
	rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVLOWAT, &val, sizeof val);
	if (rc != 0) {
		return -1;
	}
	return 0;
}

static bool
spdk_uring_sock_is_ipv6(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct sockaddr_storage sa;
	socklen_t salen;
	int rc;

	assert(sock != NULL);

	memset(&sa, 0, sizeof sa);
	salen = sizeof sa;
	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
	if (rc != 0) {
		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
		return false;
	}

	return (sa.ss_family == AF_INET6);
}

static bool
spdk_uring_sock_is_ipv4(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct sockaddr_storage sa;
	socklen_t salen;
	int rc;

	assert(sock != NULL);

	memset(&sa, 0, sizeof sa);
	salen = sizeof sa;
	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
	if (rc != 0) {
		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
		return false;
	}

	return (sa.ss_family == AF_INET);
}

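/* Probe connection state with a 1-byte MSG_PEEK recv: 0 means the peer
 * performed an orderly shutdown, EAGAIN/EWOULDBLOCK means the connection is
 * alive but idle, and any other error means it is down.
 */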
static bool
spdk_uring_sock_is_connected(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	uint8_t byte;
	int rc;

	rc = recv(sock->fd, &byte, 1, MSG_PEEK);
	if (rc == 0) {
		return false;
	}

	if (rc < 0) {
		if (errno == EAGAIN || errno == EWOULDBLOCK) {
			return true;
		}

		return false;
	}

	return true;
}

static int
spdk_uring_sock_get_placement_id(struct spdk_sock *_sock, int *placement_id)
{
	int rc = -1;

#if defined(SO_INCOMING_NAPI_ID)
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	socklen_t salen = sizeof(int);

	rc = getsockopt(sock->fd, SOL_SOCKET, SO_INCOMING_NAPI_ID, placement_id, &salen);
	if (rc != 0) {
		SPDK_ERRLOG("getsockopt() failed (errno=%d)\n", errno);
	}

#endif
	return rc;
}

static struct spdk_sock_group_impl *
spdk_uring_sock_group_impl_create(void)
{
	struct spdk_uring_sock_group_impl *group_impl;

	group_impl = calloc(1, sizeof(*group_impl));
	if (group_impl == NULL) {
		SPDK_ERRLOG("group_impl allocation failed\n");
		return NULL;
	}

	group_impl->io_avail = SPDK_SOCK_GROUP_QUEUE_DEPTH;

	if (io_uring_queue_init(SPDK_SOCK_GROUP_QUEUE_DEPTH, &group_impl->uring, 0) < 0) {
		SPDK_ERRLOG("uring I/O context setup failure\n");
		free(group_impl);
		return NULL;
	}

	TAILQ_INIT(&group_impl->pending_recv);

	return &group_impl->base;
}

static int
spdk_uring_sock_group_impl_add_sock(struct spdk_sock_group_impl *_group,
				    struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group);

	sock->group = group;
	sock->write_task.sock = sock;
	sock->write_task.type = SPDK_SOCK_TASK_WRITE;

	sock->pollin_task.sock = sock;
	sock->pollin_task.type = SPDK_SOCK_TASK_POLLIN;

	return 0;
}

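/* Removing a socket with uring operations still in flight cannot complete
 * immediately: outstanding_io is set so that the group association (and a
 * deferred close, if requested) is released only after the remaining
 * completions are reaped.
 */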
static int
spdk_uring_sock_group_impl_remove_sock(struct spdk_sock_group_impl *_group,
				       struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group);

	if (sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
		sock->outstanding_io++;
	}

	if (sock->pollin_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
		sock->outstanding_io++;
	}

	/* The socket may be on the pending_recv list either because its pipe
	 * has buffered data or because a POLLIN completed; in both cases it
	 * must be unlinked before it leaves the group. */
	if (sock->pending_recv) {
		TAILQ_REMOVE(&group->pending_recv, sock, link);
		sock->pending_recv = false;
	}

	if (!sock->outstanding_io) {
		sock->group = NULL;
	}

	return 0;
}

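/* Poll the group: gather write SQEs and pollin SQEs for every member
 * socket, submit them in one io_uring_submit() call, then reap completions
 * and return the sockets that have data ready.
 */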
static int
spdk_uring_sock_group_impl_poll(struct spdk_sock_group_impl *_group, int max_events,
				struct spdk_sock **socks)
{
	struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group);
	int count, ret;
	int to_complete, to_submit;
	struct spdk_sock *_sock, *tmp;

	TAILQ_FOREACH_SAFE(_sock, &group->base.socks, link, tmp) {
		_sock_flush(_sock);
		_sock_prep_pollin(_sock);
	}

	to_submit = group->io_queued;

	if (to_submit > 0) {
		/* If there are I/O to submit, use io_uring_submit() here.
		 * It calls io_uring_enter() internally as needed; network I/O
		 * never uses O_DIRECT, so no additional enter call is
		 * required. */
		ret = io_uring_submit(&group->uring);
		if (ret < 0) {
			return 1;
		}
		group->io_queued = 0;
		group->io_inflight += to_submit;
		group->io_avail -= to_submit;
	}

	count = 0;
	to_complete = group->io_inflight;
	if (to_complete > 0) {
		count = spdk_sock_uring_group_reap(group, to_complete, max_events, socks);
	}

	return count;
}

static int
spdk_uring_sock_group_impl_close(struct spdk_sock_group_impl *_group)
{
	struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group);

	/* try to reap all the active I/O */
	while (group->io_inflight) {
		spdk_uring_sock_group_impl_poll(_group, 32, NULL);
	}
	assert(group->io_inflight == 0);
	assert(group->io_avail == SPDK_SOCK_GROUP_QUEUE_DEPTH);

	/* io_uring_queue_exit() closes the ring fd itself, so do not close it
	 * separately here. */
	io_uring_queue_exit(&group->uring);

	free(group);
	return 0;
}

static int
spdk_uring_sock_flush(struct spdk_sock *_sock)
{
	return _sock_flush_client(_sock);
}

static struct spdk_net_impl g_uring_net_impl = {
	.name		= "uring",
	.getaddr	= spdk_uring_sock_getaddr,
	.connect	= spdk_uring_sock_connect,
	.listen		= spdk_uring_sock_listen,
	.accept		= spdk_uring_sock_accept,
	.close		= spdk_uring_sock_close,
	.recv		= spdk_uring_sock_recv,
	.readv		= spdk_uring_sock_readv,
	.writev		= spdk_uring_sock_writev,
	.writev_async	= spdk_uring_sock_writev_async,
	.flush		= spdk_uring_sock_flush,
	.set_recvlowat	= spdk_uring_sock_set_recvlowat,
	.set_recvbuf	= spdk_uring_sock_set_recvbuf,
	.set_sendbuf	= spdk_uring_sock_set_sendbuf,
	.is_ipv6	= spdk_uring_sock_is_ipv6,
	.is_ipv4	= spdk_uring_sock_is_ipv4,
	.is_connected	= spdk_uring_sock_is_connected,
	.get_placement_id	= spdk_uring_sock_get_placement_id,
	.group_impl_create	= spdk_uring_sock_group_impl_create,
	.group_impl_add_sock	= spdk_uring_sock_group_impl_add_sock,
	.group_impl_remove_sock = spdk_uring_sock_group_impl_remove_sock,
	.group_impl_poll	= spdk_uring_sock_group_impl_poll,
	.group_impl_close	= spdk_uring_sock_group_impl_close,
};

SPDK_NET_IMPL_REGISTER(uring, &g_uring_net_impl, DEFAULT_SOCK_PRIORITY + 1);
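
/* A minimal usage sketch (not part of this module): because this
 * implementation registers at DEFAULT_SOCK_PRIORITY + 1, the generic
 * spdk_sock API is expected to prefer it over the posix implementation when
 * both are built in; the exact selection rules live in lib/sock. Assuming
 * the spdk_sock API of this SPDK version, an application might do:
 *
 *	struct spdk_sock *listener, *conn;
 *
 *	listener = spdk_sock_listen("127.0.0.1", 4420);
 *	if (listener == NULL) {
 *		// handle error
 *	}
 *
 *	conn = spdk_sock_accept(listener);
 *	if (conn != NULL) {
 *		// add conn to an spdk_sock_group and poll the group
 *	}
 */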