/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "spdk/stdinc.h"
#include "spdk/config.h"

#include <sys/epoll.h>
#include <liburing.h>

#include "spdk/barrier.h"
#include "spdk/likely.h"
#include "spdk/log.h"
#include "spdk/pipe.h"
#include "spdk/sock.h"
#include "spdk/string.h"
#include "spdk/util.h"

#include "spdk_internal/sock.h"
#include "spdk_internal/assert.h"

#define MAX_TMPBUF 1024
#define PORTNUMLEN 32
#define SO_RCVBUF_SIZE (2 * 1024 * 1024)
#define SO_SNDBUF_SIZE (2 * 1024 * 1024)
#define SPDK_SOCK_GROUP_QUEUE_DEPTH 4096
#define IOV_BATCH_SIZE 64

enum spdk_sock_task_type {
	SPDK_SOCK_TASK_POLLIN = 0,
	SPDK_SOCK_TASK_WRITE,
	SPDK_SOCK_TASK_CANCEL,
};

enum spdk_uring_sock_task_status {
	SPDK_URING_SOCK_TASK_NOT_IN_USE = 0,
	SPDK_URING_SOCK_TASK_IN_PROCESS,
};

struct spdk_uring_task {
	enum spdk_uring_sock_task_status	status;
	enum spdk_sock_task_type		type;
	struct spdk_uring_sock			*sock;
	struct msghdr				msg;
	struct iovec				iovs[IOV_BATCH_SIZE];
	int					iov_cnt;
	struct spdk_sock_request		*last_req;
	STAILQ_ENTRY(spdk_uring_task)		link;
};

struct spdk_uring_sock {
	struct spdk_sock			base;
	int					fd;
	struct spdk_uring_sock_group_impl	*group;
	struct spdk_uring_task			write_task;
	struct spdk_uring_task			pollin_task;
	struct spdk_uring_task			cancel_task;
	int					outstanding_io;
	struct spdk_pipe			*recv_pipe;
	void					*recv_buf;
	int					recv_buf_sz;
	bool					pending_recv;
	TAILQ_ENTRY(spdk_uring_sock)		link;
};

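/* Per-group state. Each group owns one io_uring instance shared by all of its
 * sockets. io_queued counts SQEs prepared but not yet submitted, io_inflight
 * counts SQEs submitted but not yet reaped, and io_avail tracks the remaining
 * queue-depth budget. The pending_recv list emulates level-triggered
 * readiness: sockets stay on it while their recv pipe still holds buffered
 * data, so the poller reports them again without a new POLLIN completion. */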
struct spdk_uring_sock_group_impl {
	struct spdk_sock_group_impl		base;
	struct io_uring				uring;
	uint32_t				io_inflight;
	uint32_t				io_queued;
	uint32_t				io_avail;
	TAILQ_HEAD(, spdk_uring_sock)		pending_recv;
};

#define SPDK_URING_SOCK_REQUEST_IOV(req) ((struct iovec *)((uint8_t *)req + sizeof(struct spdk_sock_request)))

static int
get_addr_str(struct sockaddr *sa, char *host, size_t hlen)
{
	const char *result = NULL;

	if (sa == NULL || host == NULL) {
		return -1;
	}

	switch (sa->sa_family) {
	case AF_INET:
		result = inet_ntop(AF_INET, &(((struct sockaddr_in *)sa)->sin_addr),
				   host, hlen);
		break;
	case AF_INET6:
		result = inet_ntop(AF_INET6, &(((struct sockaddr_in6 *)sa)->sin6_addr),
				   host, hlen);
		break;
	default:
		break;
	}

	if (result != NULL) {
		return 0;
	} else {
		return -1;
	}
}

#define __uring_sock(sock) ((struct spdk_uring_sock *)(sock))
#define __uring_group_impl(group) ((struct spdk_uring_sock_group_impl *)(group))

static int
uring_sock_getaddr(struct spdk_sock *_sock, char *saddr, int slen, uint16_t *sport,
		   char *caddr, int clen, uint16_t *cport)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct sockaddr_storage sa;
	socklen_t salen;
	int rc;

	assert(sock != NULL);

	memset(&sa, 0, sizeof sa);
	salen = sizeof sa;
	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
	if (rc != 0) {
		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
		return -1;
	}

	switch (sa.ss_family) {
	case AF_UNIX:
		/* Acceptable connection types that don't have IPs */
		return 0;
	case AF_INET:
	case AF_INET6:
		/* Code below will get IP addresses */
		break;
	default:
		/* Unsupported socket family */
		return -1;
	}

	rc = get_addr_str((struct sockaddr *)&sa, saddr, slen);
	if (rc != 0) {
		SPDK_ERRLOG("get_addr_str() failed (errno=%d)\n", errno);
		return -1;
	}

	if (sport) {
		if (sa.ss_family == AF_INET) {
			*sport = ntohs(((struct sockaddr_in *) &sa)->sin_port);
		} else if (sa.ss_family == AF_INET6) {
			*sport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port);
		}
	}

	memset(&sa, 0, sizeof sa);
	salen = sizeof sa;
	rc = getpeername(sock->fd, (struct sockaddr *) &sa, &salen);
	if (rc != 0) {
		SPDK_ERRLOG("getpeername() failed (errno=%d)\n", errno);
		return -1;
	}

	rc = get_addr_str((struct sockaddr *)&sa, caddr, clen);
	if (rc != 0) {
		SPDK_ERRLOG("get_addr_str() failed (errno=%d)\n", errno);
		return -1;
	}

	if (cport) {
		if (sa.ss_family == AF_INET) {
			*cport = ntohs(((struct sockaddr_in *) &sa)->sin_port);
		} else if (sa.ss_family == AF_INET6) {
			*cport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port);
		}
	}

	return 0;
}

enum uring_sock_create_type {
	SPDK_SOCK_CREATE_LISTEN,
	SPDK_SOCK_CREATE_CONNECT,
};

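/* Sockets optionally buffer received data in an spdk_pipe sized by
 * uring_sock_set_recvbuf(). Small user reads are then served from the pipe
 * (see uring_sock_readv()), amortizing one large readv() syscall across many
 * small receives. Resizing copies any buffered data from the old pipe into
 * the new one. */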
static int
uring_sock_alloc_pipe(struct spdk_uring_sock *sock, int sz)
{
	uint8_t *new_buf;
	struct spdk_pipe *new_pipe;
	struct iovec siov[2];
	struct iovec diov[2];
	int sbytes;
	ssize_t bytes;

	if (sock->recv_buf_sz == sz) {
		return 0;
	}

	/* If the new size is 0, just free the pipe */
	if (sz == 0) {
		spdk_pipe_destroy(sock->recv_pipe);
		free(sock->recv_buf);
		sock->recv_pipe = NULL;
		sock->recv_buf = NULL;
		return 0;
	}

	/* Round up to next 64 byte multiple */
	new_buf = calloc(SPDK_ALIGN_CEIL(sz + 1, 64), sizeof(uint8_t));
	if (!new_buf) {
		SPDK_ERRLOG("socket recv buf allocation failed\n");
		return -ENOMEM;
	}

	new_pipe = spdk_pipe_create(new_buf, sz + 1);
	if (new_pipe == NULL) {
		SPDK_ERRLOG("socket pipe allocation failed\n");
		free(new_buf);
		return -ENOMEM;
	}

	if (sock->recv_pipe != NULL) {
		/* Pull all of the data out of the old pipe */
		sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov);
		if (sbytes > sz) {
			/* Too much data to fit into the new pipe size */
			spdk_pipe_destroy(new_pipe);
			free(new_buf);
			return -EINVAL;
		}

		sbytes = spdk_pipe_writer_get_buffer(new_pipe, sz, diov);
		assert(sbytes == sz);

		bytes = spdk_iovcpy(siov, 2, diov, 2);
		spdk_pipe_writer_advance(new_pipe, bytes);

		spdk_pipe_destroy(sock->recv_pipe);
		free(sock->recv_buf);
	}

	sock->recv_buf_sz = sz;
	sock->recv_buf = new_buf;
	sock->recv_pipe = new_pipe;

	return 0;
}

static int
uring_sock_set_recvbuf(struct spdk_sock *_sock, int sz)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	int rc;

	assert(sock != NULL);

#ifndef __aarch64__
	/* On ARM systems this pipe buffering does not help, so skip it there.
	 * The pipe size itself is purely derived from benchmarks; it seems to work well. */
	rc = uring_sock_alloc_pipe(sock, sz);
	if (rc) {
		SPDK_ERRLOG("unable to allocate sufficient recvbuf with sz=%d on sock=%p\n", sz, _sock);
		return rc;
	}
#endif

	if (sz < SO_RCVBUF_SIZE) {
		sz = SO_RCVBUF_SIZE;
	}

	rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVBUF, &sz, sizeof(sz));
	if (rc < 0) {
		return rc;
	}

	return 0;
}

static int
uring_sock_set_sendbuf(struct spdk_sock *_sock, int sz)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	int rc;

	assert(sock != NULL);

	if (sz < SO_SNDBUF_SIZE) {
		sz = SO_SNDBUF_SIZE;
	}

	rc = setsockopt(sock->fd, SOL_SOCKET, SO_SNDBUF, &sz, sizeof(sz));
	if (rc < 0) {
		return rc;
	}

	return 0;
}

static struct spdk_uring_sock *
uring_sock_alloc(int fd)
{
	struct spdk_uring_sock *sock;

	sock = calloc(1, sizeof(*sock));
	if (sock == NULL) {
		SPDK_ERRLOG("sock allocation failed\n");
		return NULL;
	}

	sock->fd = fd;
	return sock;
}

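/* Common creation path for both listening and connecting sockets: walk the
 * getaddrinfo() results, apply the socket options below, then either
 * bind()/listen() or connect() depending on the requested type. The first
 * address that succeeds wins, and the fd is switched to non-blocking mode. */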
static struct spdk_sock *
uring_sock_create(const char *ip, int port,
		  enum uring_sock_create_type type,
		  struct spdk_sock_opts *opts)
{
	struct spdk_uring_sock *sock;
	char buf[MAX_TMPBUF];
	char portnum[PORTNUMLEN];
	char *p;
	struct addrinfo hints, *res, *res0;
	int fd, flag;
	int val = 1;
	int rc;

	if (ip == NULL) {
		return NULL;
	}
	if (ip[0] == '[') {
		snprintf(buf, sizeof(buf), "%s", ip + 1);
		p = strchr(buf, ']');
		if (p != NULL) {
			*p = '\0';
		}
		ip = (const char *) &buf[0];
	}

	snprintf(portnum, sizeof portnum, "%d", port);
	memset(&hints, 0, sizeof hints);
	hints.ai_family = PF_UNSPEC;
	hints.ai_socktype = SOCK_STREAM;
	hints.ai_flags = AI_NUMERICSERV;
	hints.ai_flags |= AI_PASSIVE;
	hints.ai_flags |= AI_NUMERICHOST;
	rc = getaddrinfo(ip, portnum, &hints, &res0);
	if (rc != 0) {
		/* getaddrinfo() reports errors via its return value, not errno */
		SPDK_ERRLOG("getaddrinfo() failed: %s (%d)\n", gai_strerror(rc), rc);
		return NULL;
	}

	/* try each resolved address until bind/listen or connect succeeds */
	fd = -1;
	for (res = res0; res != NULL; res = res->ai_next) {
retry:
		fd = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
		if (fd < 0) {
			/* error */
			continue;
		}

		val = SO_RCVBUF_SIZE;
		rc = setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &val, sizeof val);
		if (rc) {
			/* Not fatal */
		}

		val = SO_SNDBUF_SIZE;
		rc = setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &val, sizeof val);
		if (rc) {
			/* Not fatal */
		}

		/* Reset val to 1 for the boolean options below; it still holds
		 * SO_SNDBUF_SIZE from the setsockopt() call above. */
		val = 1;
		rc = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof val);
		if (rc != 0) {
			close(fd);
			/* error */
			continue;
		}
		rc = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof val);
		if (rc != 0) {
			close(fd);
			/* error */
			continue;
		}

#if defined(SO_PRIORITY)
		if (opts != NULL && opts->priority) {
			rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &opts->priority, sizeof val);
			if (rc != 0) {
				close(fd);
				/* error */
				continue;
			}
		}
#endif
		if (res->ai_family == AF_INET6) {
			rc = setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &val, sizeof val);
			if (rc != 0) {
				close(fd);
				/* error */
				continue;
			}
		}

		if (type == SPDK_SOCK_CREATE_LISTEN) {
			rc = bind(fd, res->ai_addr, res->ai_addrlen);
			if (rc != 0) {
				SPDK_ERRLOG("bind() failed at port %d, errno = %d\n", port, errno);
				switch (errno) {
				case EINTR:
					/* interrupted? */
					close(fd);
					goto retry;
				case EADDRNOTAVAIL:
					SPDK_ERRLOG("IP address %s not available. "
						    "Verify IP address in config file "
						    "and make sure setup script is "
						    "run before starting spdk app.\n", ip);
				/* FALLTHROUGH */
				default:
					/* try next family */
					close(fd);
					fd = -1;
					continue;
				}
			}
			/* bind OK */
			rc = listen(fd, 512);
			if (rc != 0) {
				SPDK_ERRLOG("listen() failed, errno = %d\n", errno);
				close(fd);
				fd = -1;
				break;
			}
		} else if (type == SPDK_SOCK_CREATE_CONNECT) {
			rc = connect(fd, res->ai_addr, res->ai_addrlen);
			if (rc != 0) {
				SPDK_ERRLOG("connect() failed, errno = %d\n", errno);
				/* try next family */
				close(fd);
				fd = -1;
				continue;
			}
		}

		flag = fcntl(fd, F_GETFL);
		if (fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0) {
			SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno);
			close(fd);
			fd = -1;
			break;
		}
		break;
	}
	freeaddrinfo(res0);

	if (fd < 0) {
		return NULL;
	}

	sock = uring_sock_alloc(fd);
	if (sock == NULL) {
		/* uring_sock_alloc() already logged the failure */
		close(fd);
		return NULL;
	}

	return &sock->base;
}

static struct spdk_sock *
uring_sock_listen(const char *ip, int port, struct spdk_sock_opts *opts)
{
	return uring_sock_create(ip, port, SPDK_SOCK_CREATE_LISTEN, opts);
}

static struct spdk_sock *
uring_sock_connect(const char *ip, int port, struct spdk_sock_opts *opts)
{
	return uring_sock_create(ip, port, SPDK_SOCK_CREATE_CONNECT, opts);
}

static struct spdk_sock *
uring_sock_accept(struct spdk_sock *_sock)
{
	struct spdk_uring_sock		*sock = __uring_sock(_sock);
	struct sockaddr_storage		sa;
	socklen_t			salen;
	int				rc, fd;
	struct spdk_uring_sock		*new_sock;
	int				flag;

	memset(&sa, 0, sizeof(sa));
	salen = sizeof(sa);

	assert(sock != NULL);

	rc = accept(sock->fd, (struct sockaddr *)&sa, &salen);

	if (rc == -1) {
		return NULL;
	}

	fd = rc;

	flag = fcntl(fd, F_GETFL);
	if ((!(flag & O_NONBLOCK)) && (fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0)) {
		SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno);
		close(fd);
		return NULL;
	}

#if defined(SO_PRIORITY)
	/* The priority is not inherited, so call this function again */
	if (sock->base.opts.priority) {
		rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &sock->base.opts.priority, sizeof(int));
		if (rc != 0) {
			close(fd);
			return NULL;
		}
	}
#endif

	new_sock = uring_sock_alloc(fd);
	if (new_sock == NULL) {
		close(fd);
		return NULL;
	}

	return &new_sock->base;
}

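/* Closing is deferred while uring operations are still outstanding:
 * uring_sock_group_impl_remove_sock() bumps outstanding_io for each in-flight
 * task, and the completion path in sock_uring_group_reap() re-invokes this
 * function once the count drains to zero. */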
static int
uring_sock_close(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	int rc;

	/* defer the socket close if there is outstanding I/O */
	if (sock->outstanding_io) {
		return 0;
	}

	assert(TAILQ_EMPTY(&_sock->pending_reqs));
	assert(sock->group == NULL);

	spdk_pipe_destroy(sock->recv_pipe);
	free(sock->recv_buf);
	rc = close(sock->fd);
	if (rc == 0) {
		free(sock);
	}

	return rc;
}

static ssize_t
uring_sock_recv_from_pipe(struct spdk_uring_sock *sock, struct iovec *diov, int diovcnt)
{
	struct iovec siov[2];
	int sbytes;
	ssize_t bytes;
	struct spdk_uring_sock_group_impl *group;

	sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov);
	if (sbytes < 0) {
		errno = EINVAL;
		return -1;
	} else if (sbytes == 0) {
		errno = EAGAIN;
		return -1;
	}

	bytes = spdk_iovcpy(siov, 2, diov, diovcnt);

	if (bytes == 0) {
		/* The only way this happens is if diov is 0 length */
		errno = EINVAL;
		return -1;
	}

	spdk_pipe_reader_advance(sock->recv_pipe, bytes);

	/* If we drained the pipe, take it off the level-triggered list */
	if (sock->base.group_impl && spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) {
		group = __uring_group_impl(sock->base.group_impl);
		TAILQ_REMOVE(&group->pending_recv, sock, link);
		sock->pending_recv = false;
	}

	return bytes;
}

static inline ssize_t
uring_sock_read(struct spdk_uring_sock *sock)
{
	struct iovec iov[2];
	int bytes;
	struct spdk_uring_sock_group_impl *group;

	bytes = spdk_pipe_writer_get_buffer(sock->recv_pipe, sock->recv_buf_sz, iov);

	if (bytes > 0) {
		bytes = readv(sock->fd, iov, 2);
		if (bytes > 0) {
			spdk_pipe_writer_advance(sock->recv_pipe, bytes);
			/* Only insert once; the socket may already be on the
			 * level-triggered list from an earlier POLLIN or read. */
			if (sock->base.group_impl && !sock->pending_recv) {
				group = __uring_group_impl(sock->base.group_impl);
				TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
				sock->pending_recv = true;
			}
		}
	}

	return bytes;
}

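/* Read strategy: large requests (>= 1024 bytes here, a tuning constant)
 * bypass the pipe and read straight into the caller's iovecs; smaller
 * requests trigger one big readv() into the pipe and are then satisfied
 * from it. */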
static ssize_t
uring_sock_readv(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	int rc, i;
	size_t len;

	if (sock->recv_pipe == NULL) {
		return readv(sock->fd, iov, iovcnt);
	}

	len = 0;
	for (i = 0; i < iovcnt; i++) {
		len += iov[i].iov_len;
	}

	if (spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) {
		/* If the user is receiving a sufficiently large amount of data,
		 * receive directly to their buffers. */
		if (len >= 1024) {
			return readv(sock->fd, iov, iovcnt);
		}

		/* Otherwise, do a big read into our pipe */
		rc = uring_sock_read(sock);
		if (rc <= 0) {
			return rc;
		}
	}

	return uring_sock_recv_from_pipe(sock, iov, iovcnt);
}

static ssize_t
uring_sock_recv(struct spdk_sock *sock, void *buf, size_t len)
{
	struct iovec iov[1];

	iov[0].iov_base = buf;
	iov[0].iov_len = len;

	return uring_sock_readv(sock, iov, 1);
}

static ssize_t
uring_sock_writev(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);

	if (sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
		errno = EAGAIN;
		return -1;
	}

	return writev(sock->fd, iov, iovcnt);
}

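/* Build an iovec array (up to IOV_BATCH_SIZE entries) from the socket's
 * queued requests, skipping the portion of each request that was already
 * sent (internal.offset). When last_req is provided, gathering resumes after
 * that request, which lets the uring write task accumulate iovecs across
 * multiple calls. */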
static int
sock_prep_reqs(struct spdk_sock *_sock, struct iovec *iovs, int index,
	       struct spdk_sock_request **last_req)
{
	int iovcnt, i;
	struct spdk_sock_request *req;
	unsigned int offset;

	/* Gather an iov */
	iovcnt = index;
	if (spdk_unlikely(iovcnt >= IOV_BATCH_SIZE)) {
		goto end;
	}

	if (last_req != NULL && *last_req != NULL) {
		req = TAILQ_NEXT(*last_req, internal.link);
	} else {
		req = TAILQ_FIRST(&_sock->queued_reqs);
	}

	while (req) {
		offset = req->internal.offset;

		for (i = 0; i < req->iovcnt; i++) {
			/* Consume any offset first */
			if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) {
				offset -= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len;
				continue;
			}

			iovs[iovcnt].iov_base = SPDK_SOCK_REQUEST_IOV(req, i)->iov_base + offset;
			iovs[iovcnt].iov_len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset;
			iovcnt++;

			offset = 0;

			if (iovcnt >= IOV_BATCH_SIZE) {
				break;
			}
		}
		if (iovcnt >= IOV_BATCH_SIZE) {
			break;
		}

		if (last_req != NULL) {
			*last_req = req;
		}
		req = TAILQ_NEXT(req, internal.link);
	}

end:
	return iovcnt;
}

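/* Consume rc bytes of completed writes from the head of queued_reqs. Fully
 * written requests are moved to pending_reqs and completed via
 * spdk_sock_request_put(); a partially written request just has its
 * internal.offset advanced for the next flush. */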
static int
sock_complete_reqs(struct spdk_sock *_sock, ssize_t rc)
{
	struct spdk_sock_request *req;
	int i, retval;
	unsigned int offset;
	size_t len;

	/* Consume the requests that were actually written */
	req = TAILQ_FIRST(&_sock->queued_reqs);
	while (req) {
		offset = req->internal.offset;

		for (i = 0; i < req->iovcnt; i++) {
			/* Advance by the offset first */
			if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) {
				offset -= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len;
				continue;
			}

			/* Calculate the remaining length of this element */
			len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset;

			if (len > (size_t)rc) {
				/* This element was partially sent. */
				req->internal.offset += rc;
				return 0;
			}

			offset = 0;
			req->internal.offset += len;
			rc -= len;
		}

		/* Handled a full request. */
		spdk_sock_request_pend(_sock, req);

		retval = spdk_sock_request_put(_sock, req, 0);
		if (retval) {
			return retval;
		}

		if (rc == 0) {
			break;
		}

		req = TAILQ_FIRST(&_sock->queued_reqs);
	}

	return 0;
}

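/* Group flush path: pack queued requests into the per-socket write task and
 * queue a single IORING_OP_SENDMSG. Only one write task per socket is in
 * flight at a time; its completion is handled in sock_uring_group_reap(). */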
static void
_sock_flush(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct spdk_uring_task *task = &sock->write_task;
	uint32_t iovcnt;
	struct io_uring_sqe *sqe;

	if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS) {
		return;
	}

	iovcnt = sock_prep_reqs(&sock->base, task->iovs, task->iov_cnt, &task->last_req);
	if (!iovcnt) {
		return;
	}

	task->iov_cnt = iovcnt;
	assert(sock->group != NULL);
	task->msg.msg_iov = task->iovs;
	task->msg.msg_iovlen = task->iov_cnt;

	sock->group->io_queued++;

	sqe = io_uring_get_sqe(&sock->group->uring);
	io_uring_prep_sendmsg(sqe, sock->fd, &task->msg, 0);
	io_uring_sqe_set_data(sqe, task);
	task->status = SPDK_URING_SOCK_TASK_IN_PROCESS;
}

static void
_sock_prep_pollin(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct spdk_uring_task *task = &sock->pollin_task;
	struct io_uring_sqe *sqe;

	/* Do not prepare a pollin event if one is already in flight or
	 * received data is already pending for this socket. */
	if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS || sock->pending_recv) {
		return;
	}

	assert(sock->group != NULL);
	sock->group->io_queued++;

	sqe = io_uring_get_sqe(&sock->group->uring);
	io_uring_prep_poll_add(sqe, sock->fd, POLLIN);
	io_uring_sqe_set_data(sqe, task);
	task->status = SPDK_URING_SOCK_TASK_IN_PROCESS;
}

static void
_sock_prep_cancel_task(struct spdk_sock *_sock, void *user_data)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct spdk_uring_task *task = &sock->cancel_task;
	struct io_uring_sqe *sqe;

	if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS) {
		return;
	}

	assert(sock->group != NULL);
	sock->group->io_queued++;

	sqe = io_uring_get_sqe(&sock->group->uring);
	io_uring_prep_cancel(sqe, user_data, 0);
	io_uring_sqe_set_data(sqe, task);
	task->status = SPDK_URING_SOCK_TASK_IN_PROCESS;
}

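/* Drain up to max CQEs and dispatch by task type, then report up to
 * max_read_events readable sockets from the pending_recv list. The list is
 * rotated on each poll so no single socket can monopolize the reported
 * slots. */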
static int
sock_uring_group_reap(struct spdk_uring_sock_group_impl *group, int max, int max_read_events,
		      struct spdk_sock **socks)
{
	int count, i, completed_cqe_num;
	struct io_uring_cqe *cqes[SPDK_SOCK_GROUP_QUEUE_DEPTH];
	struct spdk_uring_sock *sock, *tmp;
	struct spdk_uring_task *task;
	int status;

	max = spdk_min(max, SPDK_SOCK_GROUP_QUEUE_DEPTH);
	completed_cqe_num = io_uring_peek_batch_cqe(&group->uring, cqes, max);
	for (i = 0; i < completed_cqe_num; i++) {
		assert(cqes[i] != NULL);

		task = (struct spdk_uring_task *)cqes[i]->user_data;
		assert(task != NULL);
		sock = task->sock;
		assert(sock != NULL);
		assert(sock->group != NULL);
		assert(sock->group == group);
		sock->group->io_inflight--;
		sock->group->io_avail++;
		status = cqes[i]->res;
		io_uring_cqe_seen(&group->uring, cqes[i]);

		task->status = SPDK_URING_SOCK_TASK_NOT_IN_USE;

		if (spdk_unlikely(status <= 0)) {
			if (status == -EAGAIN || status == -EWOULDBLOCK) {
				continue;
			}
		}

		switch (task->type) {
		case SPDK_SOCK_TASK_POLLIN:
			if ((status & POLLIN) == POLLIN) {
				if ((socks != NULL) && (sock->base.cb_fn != NULL)) {
					assert(sock->pending_recv == false);
					sock->pending_recv = true;
					TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
				}
			} else {
				SPDK_UNREACHABLE();
			}
			break;
		case SPDK_SOCK_TASK_WRITE:
			assert(TAILQ_EMPTY(&sock->base.pending_reqs));
			task->last_req = NULL;
			task->iov_cnt = 0;
			sock_complete_reqs(&sock->base, status);

			/* The socket was removed from the group but still has
			 * outstanding I/O. */
			if (spdk_unlikely(task->sock->outstanding_io > 0 &&
					  TAILQ_EMPTY(&sock->base.pending_reqs))) {
				if (--sock->outstanding_io == 0) {
					/* Finish a deferred close, if one was requested */
					if (sock->base.flags.closed) {
						uring_sock_close(&sock->base);
					}
				}
			}

			break;
		case SPDK_SOCK_TASK_CANCEL:
			if ((status == 0) && (sock->outstanding_io > 0)) {
				sock->outstanding_io--;
			}
			break;
		default:
			SPDK_UNREACHABLE();
		}
	}

	count = 0;
	/* socks may be NULL when the caller only wants to reap completions
	 * (e.g. while removing a socket from the group). */
	if (socks != NULL) {
		TAILQ_FOREACH_SAFE(sock, &group->pending_recv, link, tmp) {
			if (count == max_read_events) {
				break;
			}

			socks[count++] = &sock->base;
		}

		/* Cycle the pending_recv list so that each time we poll things aren't
		 * in the same order. */
		for (i = 0; i < count; i++) {
			sock = __uring_sock(socks[i]);

			TAILQ_REMOVE(&group->pending_recv, sock, link);

			if (sock->recv_pipe == NULL || spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) {
				sock->pending_recv = false;
			} else {
				TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
			}
		}
	}

	return count;
}

static int
_sock_flush_client(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct msghdr msg = {};
	struct iovec iovs[IOV_BATCH_SIZE];
	int iovcnt;
	ssize_t rc;

	/* Can't flush from within a callback or we end up with recursive calls */
	if (_sock->cb_cnt > 0) {
		return 0;
	}

	/* Gather an iov */
	iovcnt = sock_prep_reqs(_sock, iovs, 0, NULL);
	if (iovcnt == 0) {
		return 0;
	}

	/* Perform the vectored write */
	msg.msg_iov = iovs;
	msg.msg_iovlen = iovcnt;
	rc = sendmsg(sock->fd, &msg, 0);
	if (rc <= 0) {
		if (errno == EAGAIN || errno == EWOULDBLOCK) {
			return 0;
		}
		return rc;
	}

	sock_complete_reqs(_sock, rc);

	return 0;
}

static void
uring_sock_writev_async(struct spdk_sock *_sock, struct spdk_sock_request *req)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	int rc;

	spdk_sock_request_queue(_sock, req);

	if (!sock->group) {
		if (_sock->queued_iovcnt >= IOV_BATCH_SIZE) {
			rc = _sock_flush_client(_sock);
			if (rc) {
				spdk_sock_abort_requests(_sock);
			}
		}
	}
}

static int
uring_sock_set_recvlowat(struct spdk_sock *_sock, int nbytes)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	int val;
	int rc;

	assert(sock != NULL);

	val = nbytes;
	rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVLOWAT, &val, sizeof val);
	if (rc != 0) {
		return -1;
	}
	return 0;
}

static bool
uring_sock_is_ipv6(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct sockaddr_storage sa;
	socklen_t salen;
	int rc;

	assert(sock != NULL);

	memset(&sa, 0, sizeof sa);
	salen = sizeof sa;
	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
	if (rc != 0) {
		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
		return false;
	}

	return (sa.ss_family == AF_INET6);
}

static bool
uring_sock_is_ipv4(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct sockaddr_storage sa;
	socklen_t salen;
	int rc;

	assert(sock != NULL);

	memset(&sa, 0, sizeof sa);
	salen = sizeof sa;
	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
	if (rc != 0) {
		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
		return false;
	}

	return (sa.ss_family == AF_INET);
}

static bool
uring_sock_is_connected(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	uint8_t byte;
	int rc;

	rc = recv(sock->fd, &byte, 1, MSG_PEEK);
	if (rc == 0) {
		return false;
	}

	if (rc < 0) {
		if (errno == EAGAIN || errno == EWOULDBLOCK) {
			return true;
		}

		return false;
	}

	return true;
}

static int
uring_sock_get_placement_id(struct spdk_sock *_sock, int *placement_id)
{
	int rc = -1;

#if defined(SO_INCOMING_NAPI_ID)
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	socklen_t salen = sizeof(int);

	rc = getsockopt(sock->fd, SOL_SOCKET, SO_INCOMING_NAPI_ID, placement_id, &salen);
	if (rc != 0) {
		SPDK_ERRLOG("getsockopt() failed (errno=%d)\n", errno);
	}

#endif
	return rc;
}

static struct spdk_sock_group_impl *
uring_sock_group_impl_create(void)
{
	struct spdk_uring_sock_group_impl *group_impl;

	group_impl = calloc(1, sizeof(*group_impl));
	if (group_impl == NULL) {
		SPDK_ERRLOG("group_impl allocation failed\n");
		return NULL;
	}

	group_impl->io_avail = SPDK_SOCK_GROUP_QUEUE_DEPTH;

	if (io_uring_queue_init(SPDK_SOCK_GROUP_QUEUE_DEPTH, &group_impl->uring, 0) < 0) {
		SPDK_ERRLOG("uring I/O context setup failure\n");
		free(group_impl);
		return NULL;
	}

	TAILQ_INIT(&group_impl->pending_recv);

	return &group_impl->base;
}

static int
uring_sock_group_impl_add_sock(struct spdk_sock_group_impl *_group,
			       struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group);

	sock->group = group;
	sock->write_task.sock = sock;
	sock->write_task.type = SPDK_SOCK_TASK_WRITE;

	sock->pollin_task.sock = sock;
	sock->pollin_task.type = SPDK_SOCK_TASK_POLLIN;

	sock->cancel_task.sock = sock;
	sock->cancel_task.type = SPDK_SOCK_TASK_CANCEL;

	/* The socket may have switched from another polling group due to
	 * scheduling, with data still sitting in its recv pipe. */
	if (spdk_unlikely(sock->recv_pipe != NULL &&
			  (spdk_pipe_reader_bytes_available(sock->recv_pipe) > 0))) {
		assert(sock->pending_recv == false);
		sock->pending_recv = true;
		TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
	}

	return 0;
}

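/* One poll iteration: prepare a write SQE and a POLLIN SQE for every socket
 * as needed, submit the whole batch with a single io_uring_submit(), then
 * reap completions. */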
static int
uring_sock_group_impl_poll(struct spdk_sock_group_impl *_group, int max_events,
			   struct spdk_sock **socks)
{
	struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group);
	int count, ret;
	int to_complete, to_submit;
	struct spdk_sock *_sock, *tmp;

	TAILQ_FOREACH_SAFE(_sock, &group->base.socks, link, tmp) {
		_sock_flush(_sock);
		_sock_prep_pollin(_sock);
	}

	to_submit = group->io_queued;

	if (to_submit > 0) {
		/* If there are I/Os to submit, use io_uring_submit() here.
		 * It will automatically call io_uring_enter() as needed. */
		ret = io_uring_submit(&group->uring);
		if (ret < 0) {
			return 1;
		}
		group->io_queued = 0;
		group->io_inflight += to_submit;
		group->io_avail -= to_submit;
	}

	count = 0;
	to_complete = group->io_inflight;
	if (to_complete > 0) {
		count = sock_uring_group_reap(group, to_complete, max_events, socks);
	}

	return count;
}

static int
uring_sock_group_impl_remove_sock(struct spdk_sock_group_impl *_group,
				  struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group);

	if (sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
		/* A write does not need to be canceled; just let it complete */
		sock->outstanding_io++;
	}

	if (sock->pollin_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
		sock->outstanding_io++;
		_sock_prep_cancel_task(_sock, &sock->pollin_task);
	}

	/* Since spdk_sock_group_remove_sock() is not an asynchronous interface,
	 * we can simply poll here until the pollin task completes. */
	while (sock->pollin_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
		uring_sock_group_impl_poll(_group, 32, NULL);
	}

	/* Use the pending_recv flag, not the pipe contents, to decide whether
	 * the socket is on the level-triggered list: a POLLIN completion can
	 * place it there even when the recv pipe is empty or absent. */
	if (sock->pending_recv) {
		TAILQ_REMOVE(&group->pending_recv, sock, link);
		sock->pending_recv = false;
	}
	assert(sock->pending_recv == false);

	sock->group = NULL;
	return 0;
}

static int
uring_sock_group_impl_close(struct spdk_sock_group_impl *_group)
{
	struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group);

	/* try to reap all the active I/O */
	while (group->io_inflight) {
		uring_sock_group_impl_poll(_group, 32, NULL);
	}
	assert(group->io_inflight == 0);
	assert(group->io_avail == SPDK_SOCK_GROUP_QUEUE_DEPTH);

	/* io_uring_queue_exit() closes the ring fd itself, so do not close it
	 * separately here (doing so would close the fd twice). */
	io_uring_queue_exit(&group->uring);

	free(group);
	return 0;
}

static int
uring_sock_flush(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);

	if (!sock->group) {
		return _sock_flush_client(_sock);
	}

	return 0;
}

static struct spdk_net_impl g_uring_net_impl = {
	.name			= "uring",
	.getaddr		= uring_sock_getaddr,
	.connect		= uring_sock_connect,
	.listen			= uring_sock_listen,
	.accept			= uring_sock_accept,
	.close			= uring_sock_close,
	.recv			= uring_sock_recv,
	.readv			= uring_sock_readv,
	.writev			= uring_sock_writev,
	.writev_async		= uring_sock_writev_async,
	.flush			= uring_sock_flush,
	.set_recvlowat		= uring_sock_set_recvlowat,
	.set_recvbuf		= uring_sock_set_recvbuf,
	.set_sendbuf		= uring_sock_set_sendbuf,
	.is_ipv6		= uring_sock_is_ipv6,
	.is_ipv4		= uring_sock_is_ipv4,
	.is_connected		= uring_sock_is_connected,
	.get_placement_id	= uring_sock_get_placement_id,
	.group_impl_create	= uring_sock_group_impl_create,
	.group_impl_add_sock	= uring_sock_group_impl_add_sock,
	.group_impl_remove_sock	= uring_sock_group_impl_remove_sock,
	.group_impl_poll	= uring_sock_group_impl_poll,
	.group_impl_close	= uring_sock_group_impl_close,
};

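/* Registering at DEFAULT_SOCK_PRIORITY + 1 appears intended to place this
 * implementation ahead of the posix module in the net impl list, making
 * uring the preferred transport when SPDK is built with liburing support. */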
SPDK_NET_IMPL_REGISTER(uring, &g_uring_net_impl, DEFAULT_SOCK_PRIORITY + 1);