xref: /spdk/module/sock/uring/uring.c (revision cdb0726b95631d46eaf4f2e39ddb6533f150fd27)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (c) Intel Corporation.
3  *   All rights reserved.
4  */
5 
6 #include "spdk/stdinc.h"
7 #include "spdk/config.h"
8 
9 #include <linux/errqueue.h>
10 #include <sys/epoll.h>
11 #include <liburing.h>
12 
13 #include "spdk/barrier.h"
14 #include "spdk/env.h"
15 #include "spdk/log.h"
16 #include "spdk/pipe.h"
17 #include "spdk/sock.h"
18 #include "spdk/string.h"
19 #include "spdk/util.h"
20 
21 #include "spdk_internal/sock.h"
22 #include "spdk_internal/assert.h"
23 #include "../sock_kernel.h"
24 
25 #define MAX_TMPBUF 1024
26 #define PORTNUMLEN 32
27 #define SPDK_SOCK_GROUP_QUEUE_DEPTH 4096
28 #define SPDK_SOCK_CMG_INFO_SIZE (sizeof(struct cmsghdr) + sizeof(struct sock_extended_err))
29 
30 enum spdk_sock_task_type {
31 	SPDK_SOCK_TASK_POLLIN = 0,
32 	SPDK_SOCK_TASK_ERRQUEUE,
33 	SPDK_SOCK_TASK_WRITE,
34 	SPDK_SOCK_TASK_CANCEL,
35 };
36 
37 #if defined(SO_ZEROCOPY) && defined(MSG_ZEROCOPY)
38 #define SPDK_ZEROCOPY
39 #endif
40 
41 enum spdk_uring_sock_task_status {
42 	SPDK_URING_SOCK_TASK_NOT_IN_USE = 0,
43 	SPDK_URING_SOCK_TASK_IN_PROCESS,
44 };
45 
46 struct spdk_uring_task {
47 	enum spdk_uring_sock_task_status	status;
48 	enum spdk_sock_task_type		type;
49 	struct spdk_uring_sock			*sock;
50 	struct msghdr				msg;
51 	struct iovec				iovs[IOV_BATCH_SIZE];
52 	int					iov_cnt;
53 	struct spdk_sock_request		*last_req;
54 	bool					is_zcopy;
55 	STAILQ_ENTRY(spdk_uring_task)		link;
56 };
57 
58 struct spdk_uring_sock {
59 	struct spdk_sock			base;
60 	int					fd;
61 	uint32_t				sendmsg_idx;
62 	struct spdk_uring_sock_group_impl	*group;
63 	struct spdk_uring_task			write_task;
64 	struct spdk_uring_task			errqueue_task;
65 	struct spdk_uring_task			pollin_task;
66 	struct spdk_uring_task			cancel_task;
67 	struct spdk_pipe			*recv_pipe;
68 	void					*recv_buf;
69 	int					recv_buf_sz;
70 	bool					zcopy;
71 	bool					pending_recv;
72 	int					zcopy_send_flags;
73 	int					connection_status;
74 	int					placement_id;
75 	uint8_t					buf[SPDK_SOCK_CMG_INFO_SIZE];
76 	TAILQ_ENTRY(spdk_uring_sock)		link;
77 };
78 
79 TAILQ_HEAD(pending_recv_list, spdk_uring_sock);
80 
81 struct spdk_uring_sock_group_impl {
82 	struct spdk_sock_group_impl		base;
83 	struct io_uring				uring;
84 	uint32_t				io_inflight;
85 	uint32_t				io_queued;
86 	uint32_t				io_avail;
87 	struct pending_recv_list		pending_recv;
88 };
89 
90 static struct spdk_sock_impl_opts g_spdk_uring_sock_impl_opts = {
91 	.recv_buf_size = MIN_SO_RCVBUF_SIZE,
92 	.send_buf_size = MIN_SO_SNDBUF_SIZE,
93 	.enable_recv_pipe = true,
94 	.enable_quickack = false,
95 	.enable_placement_id = PLACEMENT_NONE,
96 	.enable_zerocopy_send_server = false,
97 	.enable_zerocopy_send_client = false,
98 	.zerocopy_threshold = 0,
99 	.tls_version = 0,
100 	.enable_ktls = false,
101 	.psk_key = NULL,
102 	.psk_identity = NULL
103 };
104 
105 static struct spdk_sock_map g_map = {
106 	.entries = STAILQ_HEAD_INITIALIZER(g_map.entries),
107 	.mtx = PTHREAD_MUTEX_INITIALIZER
108 };
109 
110 __attribute((destructor)) static void
111 uring_sock_map_cleanup(void)
112 {
113 	spdk_sock_map_cleanup(&g_map);
114 }
115 
116 #define SPDK_URING_SOCK_REQUEST_IOV(req) ((struct iovec *)((uint8_t *)req + sizeof(struct spdk_sock_request)))
117 
118 #define __uring_sock(sock) (struct spdk_uring_sock *)sock
119 #define __uring_group_impl(group) (struct spdk_uring_sock_group_impl *)group
120 
121 static void
122 uring_sock_copy_impl_opts(struct spdk_sock_impl_opts *dest, const struct spdk_sock_impl_opts *src,
123 			  size_t len)
124 {
125 #define FIELD_OK(field) \
126 	offsetof(struct spdk_sock_impl_opts, field) + sizeof(src->field) <= len
127 
128 #define SET_FIELD(field) \
129 	if (FIELD_OK(field)) { \
130 		dest->field = src->field; \
131 	}
132 
133 	SET_FIELD(recv_buf_size);
134 	SET_FIELD(send_buf_size);
135 	SET_FIELD(enable_recv_pipe);
136 	SET_FIELD(enable_quickack);
137 	SET_FIELD(enable_placement_id);
138 	SET_FIELD(enable_zerocopy_send_server);
139 	SET_FIELD(enable_zerocopy_send_client);
140 	SET_FIELD(zerocopy_threshold);
141 	SET_FIELD(tls_version);
142 	SET_FIELD(enable_ktls);
143 	SET_FIELD(psk_key);
144 	SET_FIELD(psk_identity);
145 
146 #undef SET_FIELD
147 #undef FIELD_OK
148 }
149 
150 static int
151 uring_sock_impl_get_opts(struct spdk_sock_impl_opts *opts, size_t *len)
152 {
153 	if (!opts || !len) {
154 		errno = EINVAL;
155 		return -1;
156 	}
157 
158 	assert(sizeof(*opts) >= *len);
159 	memset(opts, 0, *len);
160 
161 	uring_sock_copy_impl_opts(opts, &g_spdk_uring_sock_impl_opts, *len);
162 	*len = spdk_min(*len, sizeof(g_spdk_uring_sock_impl_opts));
163 
164 	return 0;
165 }
166 
167 static int
168 uring_sock_impl_set_opts(const struct spdk_sock_impl_opts *opts, size_t len)
169 {
170 	if (!opts) {
171 		errno = EINVAL;
172 		return -1;
173 	}
174 
175 	assert(sizeof(*opts) >= len);
176 	uring_sock_copy_impl_opts(&g_spdk_uring_sock_impl_opts, opts, len);
177 
178 	return 0;
179 }
180 
181 static void
182 uring_opts_get_impl_opts(const struct spdk_sock_opts *opts, struct spdk_sock_impl_opts *dest)
183 {
184 	/* Copy the default impl_opts first to cover cases when user's impl_opts is smaller */
185 	memcpy(dest, &g_spdk_uring_sock_impl_opts, sizeof(*dest));
186 
187 	if (opts->impl_opts != NULL) {
188 		assert(sizeof(*dest) >= opts->impl_opts_size);
189 		uring_sock_copy_impl_opts(dest, opts->impl_opts, opts->impl_opts_size);
190 	}
191 }
192 
193 static int
194 uring_sock_getaddr(struct spdk_sock *_sock, char *saddr, int slen, uint16_t *sport,
195 		   char *caddr, int clen, uint16_t *cport)
196 {
197 	struct spdk_uring_sock *sock = __uring_sock(_sock);
198 	struct sockaddr_storage sa;
199 	socklen_t salen;
200 	int rc;
201 
202 	assert(sock != NULL);
203 
204 	memset(&sa, 0, sizeof sa);
205 	salen = sizeof sa;
206 	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
207 	if (rc != 0) {
208 		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
209 		return -1;
210 	}
211 
212 	switch (sa.ss_family) {
213 	case AF_UNIX:
214 		/* Acceptable connection types that don't have IPs */
215 		return 0;
216 	case AF_INET:
217 	case AF_INET6:
218 		/* Code below will get IP addresses */
219 		break;
220 	default:
221 		/* Unsupported socket family */
222 		return -1;
223 	}
224 
225 	rc = get_addr_str((struct sockaddr *)&sa, saddr, slen);
226 	if (rc != 0) {
227 		SPDK_ERRLOG("getnameinfo() failed (errno=%d)\n", errno);
228 		return -1;
229 	}
230 
231 	if (sport) {
232 		if (sa.ss_family == AF_INET) {
233 			*sport = ntohs(((struct sockaddr_in *) &sa)->sin_port);
234 		} else if (sa.ss_family == AF_INET6) {
235 			*sport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port);
236 		}
237 	}
238 
239 	memset(&sa, 0, sizeof sa);
240 	salen = sizeof sa;
241 	rc = getpeername(sock->fd, (struct sockaddr *) &sa, &salen);
242 	if (rc != 0) {
243 		SPDK_ERRLOG("getpeername() failed (errno=%d)\n", errno);
244 		return -1;
245 	}
246 
247 	rc = get_addr_str((struct sockaddr *)&sa, caddr, clen);
248 	if (rc != 0) {
249 		SPDK_ERRLOG("getnameinfo() failed (errno=%d)\n", errno);
250 		return -1;
251 	}
252 
253 	if (cport) {
254 		if (sa.ss_family == AF_INET) {
255 			*cport = ntohs(((struct sockaddr_in *) &sa)->sin_port);
256 		} else if (sa.ss_family == AF_INET6) {
257 			*cport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port);
258 		}
259 	}
260 
261 	return 0;
262 }
263 
264 enum uring_sock_create_type {
265 	SPDK_SOCK_CREATE_LISTEN,
266 	SPDK_SOCK_CREATE_CONNECT,
267 };
268 
269 static int
270 uring_sock_alloc_pipe(struct spdk_uring_sock *sock, int sz)
271 {
272 	uint8_t *new_buf;
273 	struct spdk_pipe *new_pipe;
274 	struct iovec siov[2];
275 	struct iovec diov[2];
276 	int sbytes;
277 	ssize_t bytes;
278 
279 	if (sock->recv_buf_sz == sz) {
280 		return 0;
281 	}
282 
283 	/* If the new size is 0, just free the pipe */
284 	if (sz == 0) {
285 		spdk_pipe_destroy(sock->recv_pipe);
286 		free(sock->recv_buf);
287 		sock->recv_pipe = NULL;
288 		sock->recv_buf = NULL;
289 		return 0;
290 	} else if (sz < MIN_SOCK_PIPE_SIZE) {
291 		SPDK_ERRLOG("The size of the pipe must be larger than %d\n", MIN_SOCK_PIPE_SIZE);
292 		return -1;
293 	}
294 
295 	/* Round up to next 64 byte multiple */
296 	new_buf = calloc(SPDK_ALIGN_CEIL(sz + 1, 64), sizeof(uint8_t));
297 	if (!new_buf) {
298 		SPDK_ERRLOG("socket recv buf allocation failed\n");
299 		return -ENOMEM;
300 	}
301 
302 	new_pipe = spdk_pipe_create(new_buf, sz + 1);
303 	if (new_pipe == NULL) {
304 		SPDK_ERRLOG("socket pipe allocation failed\n");
305 		free(new_buf);
306 		return -ENOMEM;
307 	}
308 
309 	if (sock->recv_pipe != NULL) {
310 		/* Pull all of the data out of the old pipe */
311 		sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov);
312 		if (sbytes > sz) {
313 			/* Too much data to fit into the new pipe size */
314 			spdk_pipe_destroy(new_pipe);
315 			free(new_buf);
316 			return -EINVAL;
317 		}
318 
319 		sbytes = spdk_pipe_writer_get_buffer(new_pipe, sz, diov);
320 		assert(sbytes == sz);
321 
322 		bytes = spdk_iovcpy(siov, 2, diov, 2);
323 		spdk_pipe_writer_advance(new_pipe, bytes);
324 
325 		spdk_pipe_destroy(sock->recv_pipe);
326 		free(sock->recv_buf);
327 	}
328 
329 	sock->recv_buf_sz = sz;
330 	sock->recv_buf = new_buf;
331 	sock->recv_pipe = new_pipe;
332 
333 	return 0;
334 }
335 
336 static int
337 uring_sock_set_recvbuf(struct spdk_sock *_sock, int sz)
338 {
339 	struct spdk_uring_sock *sock = __uring_sock(_sock);
340 	int rc;
341 
342 	assert(sock != NULL);
343 
344 	if (_sock->impl_opts.enable_recv_pipe) {
345 		rc = uring_sock_alloc_pipe(sock, sz);
346 		if (rc) {
347 			SPDK_ERRLOG("unable to allocate sufficient recvbuf with sz=%d on sock=%p\n", sz, _sock);
348 			return rc;
349 		}
350 	}
351 
352 	if (sz < MIN_SO_RCVBUF_SIZE) {
353 		sz = MIN_SO_RCVBUF_SIZE;
354 	}
355 
356 	rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVBUF, &sz, sizeof(sz));
357 	if (rc < 0) {
358 		return rc;
359 	}
360 
361 	_sock->impl_opts.recv_buf_size = sz;
362 
363 	return 0;
364 }
365 
366 static int
367 uring_sock_set_sendbuf(struct spdk_sock *_sock, int sz)
368 {
369 	struct spdk_uring_sock *sock = __uring_sock(_sock);
370 	int rc;
371 
372 	assert(sock != NULL);
373 
374 	if (sz < MIN_SO_SNDBUF_SIZE) {
375 		sz = MIN_SO_SNDBUF_SIZE;
376 	}
377 
378 	rc = setsockopt(sock->fd, SOL_SOCKET, SO_SNDBUF, &sz, sizeof(sz));
379 	if (rc < 0) {
380 		return rc;
381 	}
382 
383 	_sock->impl_opts.send_buf_size = sz;
384 
385 	return 0;
386 }
387 
388 static struct spdk_uring_sock *
389 uring_sock_alloc(int fd, struct spdk_sock_impl_opts *impl_opts, bool enable_zero_copy)
390 {
391 	struct spdk_uring_sock *sock;
392 #if defined(__linux__)
393 	int flag;
394 	int rc;
395 #endif
396 
397 	sock = calloc(1, sizeof(*sock));
398 	if (sock == NULL) {
399 		SPDK_ERRLOG("sock allocation failed\n");
400 		return NULL;
401 	}
402 
403 	sock->fd = fd;
404 	memcpy(&sock->base.impl_opts, impl_opts, sizeof(*impl_opts));
405 
406 #if defined(__linux__)
407 	flag = 1;
408 
409 	if (sock->base.impl_opts.enable_quickack) {
410 		rc = setsockopt(sock->fd, IPPROTO_TCP, TCP_QUICKACK, &flag, sizeof(flag));
411 		if (rc != 0) {
412 			SPDK_ERRLOG("quickack was failed to set\n");
413 		}
414 	}
415 
416 	spdk_sock_get_placement_id(sock->fd, sock->base.impl_opts.enable_placement_id,
417 				   &sock->placement_id);
418 #ifdef SPDK_ZEROCOPY
419 	/* Try to turn on zero copy sends */
420 	flag = 1;
421 
422 	if (enable_zero_copy) {
423 		rc = setsockopt(sock->fd, SOL_SOCKET, SO_ZEROCOPY, &flag, sizeof(flag));
424 		if (rc == 0) {
425 			sock->zcopy = true;
426 			sock->zcopy_send_flags = MSG_ZEROCOPY;
427 		}
428 	}
429 #endif
430 #endif
431 
432 	return sock;
433 }
434 
435 static struct spdk_sock *
436 uring_sock_create(const char *ip, int port,
437 		  enum uring_sock_create_type type,
438 		  struct spdk_sock_opts *opts)
439 {
440 	struct spdk_uring_sock *sock;
441 	struct spdk_sock_impl_opts impl_opts;
442 	char buf[MAX_TMPBUF];
443 	char portnum[PORTNUMLEN];
444 	char *p;
445 	struct addrinfo hints, *res, *res0;
446 	int fd, flag;
447 	int val = 1;
448 	int rc;
449 	bool enable_zcopy_impl_opts = false;
450 	bool enable_zcopy_user_opts = true;
451 
452 	assert(opts != NULL);
453 	uring_opts_get_impl_opts(opts, &impl_opts);
454 
455 	if (ip == NULL) {
456 		return NULL;
457 	}
458 	if (ip[0] == '[') {
459 		snprintf(buf, sizeof(buf), "%s", ip + 1);
460 		p = strchr(buf, ']');
461 		if (p != NULL) {
462 			*p = '\0';
463 		}
464 		ip = (const char *) &buf[0];
465 	}
466 
467 	snprintf(portnum, sizeof portnum, "%d", port);
468 	memset(&hints, 0, sizeof hints);
469 	hints.ai_family = PF_UNSPEC;
470 	hints.ai_socktype = SOCK_STREAM;
471 	hints.ai_flags = AI_NUMERICSERV;
472 	hints.ai_flags |= AI_PASSIVE;
473 	hints.ai_flags |= AI_NUMERICHOST;
474 	rc = getaddrinfo(ip, portnum, &hints, &res0);
475 	if (rc != 0) {
476 		SPDK_ERRLOG("getaddrinfo() failed (errno=%d)\n", errno);
477 		return NULL;
478 	}
479 
480 	/* try listen */
481 	fd = -1;
482 	for (res = res0; res != NULL; res = res->ai_next) {
483 retry:
484 		fd = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
485 		if (fd < 0) {
486 			/* error */
487 			continue;
488 		}
489 
490 		val = impl_opts.recv_buf_size;
491 		rc = setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &val, sizeof val);
492 		if (rc) {
493 			/* Not fatal */
494 		}
495 
496 		val = impl_opts.send_buf_size;
497 		rc = setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &val, sizeof val);
498 		if (rc) {
499 			/* Not fatal */
500 		}
501 
502 		rc = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof val);
503 		if (rc != 0) {
504 			close(fd);
505 			fd = -1;
506 			/* error */
507 			continue;
508 		}
509 		rc = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof val);
510 		if (rc != 0) {
511 			close(fd);
512 			fd = -1;
513 			/* error */
514 			continue;
515 		}
516 
517 		if (opts->ack_timeout) {
518 #if defined(__linux__)
519 			val = opts->ack_timeout;
520 			rc = setsockopt(fd, IPPROTO_TCP, TCP_USER_TIMEOUT, &val, sizeof val);
521 			if (rc != 0) {
522 				close(fd);
523 				fd = -1;
524 				/* error */
525 				continue;
526 			}
527 #else
528 			SPDK_WARNLOG("TCP_USER_TIMEOUT is not supported.\n");
529 #endif
530 		}
531 
532 
533 
534 #if defined(SO_PRIORITY)
535 		if (opts != NULL && opts->priority) {
536 			rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &opts->priority, sizeof val);
537 			if (rc != 0) {
538 				close(fd);
539 				fd = -1;
540 				/* error */
541 				continue;
542 			}
543 		}
544 #endif
545 		if (res->ai_family == AF_INET6) {
546 			rc = setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &val, sizeof val);
547 			if (rc != 0) {
548 				close(fd);
549 				fd = -1;
550 				/* error */
551 				continue;
552 			}
553 		}
554 
555 		if (type == SPDK_SOCK_CREATE_LISTEN) {
556 			rc = bind(fd, res->ai_addr, res->ai_addrlen);
557 			if (rc != 0) {
558 				SPDK_ERRLOG("bind() failed at port %d, errno = %d\n", port, errno);
559 				switch (errno) {
560 				case EINTR:
561 					/* interrupted? */
562 					close(fd);
563 					goto retry;
564 				case EADDRNOTAVAIL:
565 					SPDK_ERRLOG("IP address %s not available. "
566 						    "Verify IP address in config file "
567 						    "and make sure setup script is "
568 						    "run before starting spdk app.\n", ip);
569 				/* FALLTHROUGH */
570 				default:
571 					/* try next family */
572 					close(fd);
573 					fd = -1;
574 					continue;
575 				}
576 			}
577 			/* bind OK */
578 			rc = listen(fd, 512);
579 			if (rc != 0) {
580 				SPDK_ERRLOG("listen() failed, errno = %d\n", errno);
581 				close(fd);
582 				fd = -1;
583 				break;
584 			}
585 
586 			flag = fcntl(fd, F_GETFL);
587 			if (fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0) {
588 				SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno);
589 				close(fd);
590 				fd = -1;
591 				break;
592 			}
593 
594 			enable_zcopy_impl_opts = impl_opts.enable_zerocopy_send_server;
595 		} else if (type == SPDK_SOCK_CREATE_CONNECT) {
596 			rc = connect(fd, res->ai_addr, res->ai_addrlen);
597 			if (rc != 0) {
598 				SPDK_ERRLOG("connect() failed, errno = %d\n", errno);
599 				/* try next family */
600 				close(fd);
601 				fd = -1;
602 				continue;
603 			}
604 
605 			flag = fcntl(fd, F_GETFL);
606 			if (fcntl(fd, F_SETFL, flag & ~O_NONBLOCK) < 0) {
607 				SPDK_ERRLOG("fcntl can't set blocking mode for socket, fd: %d (%d)\n", fd, errno);
608 				close(fd);
609 				fd = -1;
610 				break;
611 			}
612 
613 			enable_zcopy_impl_opts = impl_opts.enable_zerocopy_send_client;
614 		}
615 		break;
616 	}
617 	freeaddrinfo(res0);
618 
619 	if (fd < 0) {
620 		return NULL;
621 	}
622 
623 	enable_zcopy_user_opts = opts->zcopy && !sock_is_loopback(fd);
624 	sock = uring_sock_alloc(fd, &impl_opts, enable_zcopy_user_opts && enable_zcopy_impl_opts);
625 	if (sock == NULL) {
626 		SPDK_ERRLOG("sock allocation failed\n");
627 		close(fd);
628 		return NULL;
629 	}
630 
631 	return &sock->base;
632 }
633 
634 static struct spdk_sock *
635 uring_sock_listen(const char *ip, int port, struct spdk_sock_opts *opts)
636 {
637 	return uring_sock_create(ip, port, SPDK_SOCK_CREATE_LISTEN, opts);
638 }
639 
640 static struct spdk_sock *
641 uring_sock_connect(const char *ip, int port, struct spdk_sock_opts *opts)
642 {
643 	return uring_sock_create(ip, port, SPDK_SOCK_CREATE_CONNECT, opts);
644 }
645 
646 static struct spdk_sock *
647 uring_sock_accept(struct spdk_sock *_sock)
648 {
649 	struct spdk_uring_sock		*sock = __uring_sock(_sock);
650 	struct sockaddr_storage		sa;
651 	socklen_t			salen;
652 	int				rc, fd;
653 	struct spdk_uring_sock		*new_sock;
654 	int				flag;
655 
656 	memset(&sa, 0, sizeof(sa));
657 	salen = sizeof(sa);
658 
659 	assert(sock != NULL);
660 
661 	rc = accept(sock->fd, (struct sockaddr *)&sa, &salen);
662 
663 	if (rc == -1) {
664 		return NULL;
665 	}
666 
667 	fd = rc;
668 
669 	flag = fcntl(fd, F_GETFL);
670 	if ((flag & O_NONBLOCK) && (fcntl(fd, F_SETFL, flag & ~O_NONBLOCK) < 0)) {
671 		SPDK_ERRLOG("fcntl can't set blocking mode for socket, fd: %d (%d)\n", fd, errno);
672 		close(fd);
673 		return NULL;
674 	}
675 
676 #if defined(SO_PRIORITY)
677 	/* The priority is not inherited, so call this function again */
678 	if (sock->base.opts.priority) {
679 		rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &sock->base.opts.priority, sizeof(int));
680 		if (rc != 0) {
681 			close(fd);
682 			return NULL;
683 		}
684 	}
685 #endif
686 
687 	new_sock = uring_sock_alloc(fd, &sock->base.impl_opts, sock->zcopy);
688 	if (new_sock == NULL) {
689 		close(fd);
690 		return NULL;
691 	}
692 
693 	return &new_sock->base;
694 }
695 
696 static int
697 uring_sock_close(struct spdk_sock *_sock)
698 {
699 	struct spdk_uring_sock *sock = __uring_sock(_sock);
700 
701 	assert(TAILQ_EMPTY(&_sock->pending_reqs));
702 	assert(sock->group == NULL);
703 
704 	/* If the socket fails to close, the best choice is to
705 	 * leak the fd but continue to free the rest of the sock
706 	 * memory. */
707 	close(sock->fd);
708 
709 	spdk_pipe_destroy(sock->recv_pipe);
710 	free(sock->recv_buf);
711 	free(sock);
712 
713 	return 0;
714 }
715 
716 static ssize_t
717 uring_sock_recv_from_pipe(struct spdk_uring_sock *sock, struct iovec *diov, int diovcnt)
718 {
719 	struct iovec siov[2];
720 	int sbytes;
721 	ssize_t bytes;
722 	struct spdk_uring_sock_group_impl *group;
723 
724 	sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov);
725 	if (sbytes < 0) {
726 		errno = EINVAL;
727 		return -1;
728 	} else if (sbytes == 0) {
729 		errno = EAGAIN;
730 		return -1;
731 	}
732 
733 	bytes = spdk_iovcpy(siov, 2, diov, diovcnt);
734 
735 	if (bytes == 0) {
736 		/* The only way this happens is if diov is 0 length */
737 		errno = EINVAL;
738 		return -1;
739 	}
740 
741 	spdk_pipe_reader_advance(sock->recv_pipe, bytes);
742 
743 	/* If we drained the pipe, take it off the level-triggered list */
744 	if (sock->base.group_impl && spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) {
745 		group = __uring_group_impl(sock->base.group_impl);
746 		TAILQ_REMOVE(&group->pending_recv, sock, link);
747 		sock->pending_recv = false;
748 	}
749 
750 	return bytes;
751 }
752 
753 static inline ssize_t
754 sock_readv(int fd, struct iovec *iov, int iovcnt)
755 {
756 	struct msghdr msg = {
757 		.msg_iov = iov,
758 		.msg_iovlen = iovcnt,
759 	};
760 
761 	return recvmsg(fd, &msg, MSG_DONTWAIT);
762 }
763 
764 static inline ssize_t
765 uring_sock_read(struct spdk_uring_sock *sock)
766 {
767 	struct iovec iov[2];
768 	int bytes;
769 	struct spdk_uring_sock_group_impl *group;
770 
771 	bytes = spdk_pipe_writer_get_buffer(sock->recv_pipe, sock->recv_buf_sz, iov);
772 
773 	if (bytes > 0) {
774 		bytes = sock_readv(sock->fd, iov, 2);
775 		if (bytes > 0) {
776 			spdk_pipe_writer_advance(sock->recv_pipe, bytes);
777 			if (sock->base.group_impl) {
778 				group = __uring_group_impl(sock->base.group_impl);
779 				TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
780 				sock->pending_recv = true;
781 			}
782 		}
783 	}
784 
785 	return bytes;
786 }
787 
788 static ssize_t
789 uring_sock_readv(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
790 {
791 	struct spdk_uring_sock *sock = __uring_sock(_sock);
792 	int rc, i;
793 	size_t len;
794 
795 	if (sock->recv_pipe == NULL) {
796 		return sock_readv(sock->fd, iov, iovcnt);
797 	}
798 
799 	len = 0;
800 	for (i = 0; i < iovcnt; i++) {
801 		len += iov[i].iov_len;
802 	}
803 
804 	if (spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) {
805 		/* If the user is receiving a sufficiently large amount of data,
806 		 * receive directly to their buffers. */
807 		if (len >= MIN_SOCK_PIPE_SIZE) {
808 			return sock_readv(sock->fd, iov, iovcnt);
809 		}
810 
811 		/* Otherwise, do a big read into our pipe */
812 		rc = uring_sock_read(sock);
813 		if (rc <= 0) {
814 			return rc;
815 		}
816 	}
817 
818 	return uring_sock_recv_from_pipe(sock, iov, iovcnt);
819 }
820 
821 static ssize_t
822 uring_sock_recv(struct spdk_sock *sock, void *buf, size_t len)
823 {
824 	struct iovec iov[1];
825 
826 	iov[0].iov_base = buf;
827 	iov[0].iov_len = len;
828 
829 	return uring_sock_readv(sock, iov, 1);
830 }
831 
832 static ssize_t
833 uring_sock_writev(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
834 {
835 	struct spdk_uring_sock *sock = __uring_sock(_sock);
836 	struct msghdr msg = {
837 		.msg_iov = iov,
838 		.msg_iovlen = iovcnt,
839 	};
840 
841 	if (sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
842 		errno = EAGAIN;
843 		return -1;
844 	}
845 
846 	return sendmsg(sock->fd, &msg, MSG_DONTWAIT);
847 }
848 
849 static ssize_t
850 sock_request_advance_offset(struct spdk_sock_request *req, ssize_t rc)
851 {
852 	unsigned int offset;
853 	size_t len;
854 	int i;
855 
856 	offset = req->internal.offset;
857 	for (i = 0; i < req->iovcnt; i++) {
858 		/* Advance by the offset first */
859 		if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) {
860 			offset -= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len;
861 			continue;
862 		}
863 
864 		/* Calculate the remaining length of this element */
865 		len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset;
866 
867 		if (len > (size_t)rc) {
868 			req->internal.offset += rc;
869 			return -1;
870 		}
871 
872 		offset = 0;
873 		req->internal.offset += len;
874 		rc -= len;
875 	}
876 
877 	return rc;
878 }
879 
880 static int
881 sock_complete_write_reqs(struct spdk_sock *_sock, ssize_t rc, bool is_zcopy)
882 {
883 	struct spdk_uring_sock *sock = __uring_sock(_sock);
884 	struct spdk_sock_request *req;
885 	int retval;
886 
887 	if (is_zcopy) {
888 		/* Handling overflow case, because we use psock->sendmsg_idx - 1 for the
889 		 * req->internal.offset, so sendmsg_idx should not be zero */
890 		if (spdk_unlikely(sock->sendmsg_idx == UINT32_MAX)) {
891 			sock->sendmsg_idx = 1;
892 		} else {
893 			sock->sendmsg_idx++;
894 		}
895 	}
896 
897 	/* Consume the requests that were actually written */
898 	req = TAILQ_FIRST(&_sock->queued_reqs);
899 	while (req) {
900 		/* req->internal.is_zcopy is true when the whole req or part of it is sent with zerocopy */
901 		req->internal.is_zcopy = is_zcopy;
902 
903 		rc = sock_request_advance_offset(req, rc);
904 		if (rc < 0) {
905 			/* This element was partially sent. */
906 			return 0;
907 		}
908 
909 		/* Handled a full request. */
910 		spdk_sock_request_pend(_sock, req);
911 
912 		if (!req->internal.is_zcopy && req == TAILQ_FIRST(&_sock->pending_reqs)) {
913 			retval = spdk_sock_request_put(_sock, req, 0);
914 			if (retval) {
915 				return retval;
916 			}
917 		} else {
918 			/* Re-use the offset field to hold the sendmsg call index. The
919 			 * index is 0 based, so subtract one here because we've already
920 			 * incremented above. */
921 			req->internal.offset = sock->sendmsg_idx - 1;
922 		}
923 
924 		if (rc == 0) {
925 			break;
926 		}
927 
928 		req = TAILQ_FIRST(&_sock->queued_reqs);
929 	}
930 
931 	return 0;
932 }
933 
934 #ifdef SPDK_ZEROCOPY
935 static int
936 _sock_check_zcopy(struct spdk_sock *_sock, int status)
937 {
938 	struct spdk_uring_sock *sock = __uring_sock(_sock);
939 	ssize_t rc;
940 	struct sock_extended_err *serr;
941 	struct cmsghdr *cm;
942 	uint32_t idx;
943 	struct spdk_sock_request *req, *treq;
944 	bool found;
945 
946 	assert(sock->zcopy == true);
947 	if (spdk_unlikely(status) < 0) {
948 		if (!TAILQ_EMPTY(&_sock->pending_reqs)) {
949 			SPDK_ERRLOG("Attempting to receive from ERRQUEUE yielded error, but pending list still has orphaned entries, status =%d\n",
950 				    status);
951 		} else {
952 			SPDK_WARNLOG("Recvmsg yielded an error!\n");
953 		}
954 		return 0;
955 	}
956 
957 	cm = CMSG_FIRSTHDR(&sock->errqueue_task.msg);
958 	if (!((cm->cmsg_level == SOL_IP && cm->cmsg_type == IP_RECVERR) ||
959 	      (cm->cmsg_level == SOL_IPV6 && cm->cmsg_type == IPV6_RECVERR))) {
960 		SPDK_WARNLOG("Unexpected cmsg level or type!\n");
961 		return 0;
962 	}
963 
964 	serr = (struct sock_extended_err *)CMSG_DATA(cm);
965 	if (serr->ee_errno != 0 || serr->ee_origin != SO_EE_ORIGIN_ZEROCOPY) {
966 		SPDK_WARNLOG("Unexpected extended error origin\n");
967 		return 0;
968 	}
969 
970 	/* Most of the time, the pending_reqs array is in the exact
971 	 * order we need such that all of the requests to complete are
972 	 * in order, in the front. It is guaranteed that all requests
973 	 * belonging to the same sendmsg call are sequential, so once
974 	 * we encounter one match we can stop looping as soon as a
975 	 * non-match is found.
976 	 */
977 	for (idx = serr->ee_info; idx <= serr->ee_data; idx++) {
978 		found = false;
979 		TAILQ_FOREACH_SAFE(req, &_sock->pending_reqs, internal.link, treq) {
980 			if (!req->internal.is_zcopy) {
981 				/* This wasn't a zcopy request. It was just waiting in line to complete */
982 				rc = spdk_sock_request_put(_sock, req, 0);
983 				if (rc < 0) {
984 					return rc;
985 				}
986 			} else if (req->internal.offset == idx) {
987 				found = true;
988 				rc = spdk_sock_request_put(_sock, req, 0);
989 				if (rc < 0) {
990 					return rc;
991 				}
992 			} else if (found) {
993 				break;
994 			}
995 		}
996 	}
997 
998 	return 0;
999 }
1000 
1001 static void
1002 _sock_prep_errqueue(struct spdk_sock *_sock)
1003 {
1004 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1005 	struct spdk_uring_task *task = &sock->errqueue_task;
1006 	struct io_uring_sqe *sqe;
1007 
1008 	if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS) {
1009 		return;
1010 	}
1011 
1012 	assert(sock->group != NULL);
1013 	sock->group->io_queued++;
1014 
1015 	sqe = io_uring_get_sqe(&sock->group->uring);
1016 	io_uring_prep_recvmsg(sqe, sock->fd, &task->msg, MSG_ERRQUEUE);
1017 	io_uring_sqe_set_data(sqe, task);
1018 	task->status = SPDK_URING_SOCK_TASK_IN_PROCESS;
1019 }
1020 
1021 #endif
1022 
1023 static void
1024 _sock_flush(struct spdk_sock *_sock)
1025 {
1026 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1027 	struct spdk_uring_task *task = &sock->write_task;
1028 	uint32_t iovcnt;
1029 	struct io_uring_sqe *sqe;
1030 	int flags;
1031 
1032 	if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS) {
1033 		return;
1034 	}
1035 
1036 #ifdef SPDK_ZEROCOPY
1037 	if (sock->zcopy) {
1038 		flags = MSG_DONTWAIT | sock->zcopy_send_flags;
1039 	} else
1040 #endif
1041 	{
1042 		flags = MSG_DONTWAIT;
1043 	}
1044 
1045 	iovcnt = spdk_sock_prep_reqs(&sock->base, task->iovs, task->iov_cnt, &task->last_req, &flags);
1046 	if (!iovcnt) {
1047 		return;
1048 	}
1049 
1050 	task->iov_cnt = iovcnt;
1051 	assert(sock->group != NULL);
1052 	task->msg.msg_iov = task->iovs;
1053 	task->msg.msg_iovlen = task->iov_cnt;
1054 #ifdef SPDK_ZEROCOPY
1055 	task->is_zcopy = (flags & MSG_ZEROCOPY) ? true : false;
1056 #endif
1057 	sock->group->io_queued++;
1058 
1059 	sqe = io_uring_get_sqe(&sock->group->uring);
1060 	io_uring_prep_sendmsg(sqe, sock->fd, &sock->write_task.msg, flags);
1061 	io_uring_sqe_set_data(sqe, task);
1062 	task->status = SPDK_URING_SOCK_TASK_IN_PROCESS;
1063 }
1064 
1065 static void
1066 _sock_prep_pollin(struct spdk_sock *_sock)
1067 {
1068 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1069 	struct spdk_uring_task *task = &sock->pollin_task;
1070 	struct io_uring_sqe *sqe;
1071 
1072 	/* Do not prepare pollin event */
1073 	if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS || (sock->pending_recv && !sock->zcopy)) {
1074 		return;
1075 	}
1076 
1077 	assert(sock->group != NULL);
1078 	sock->group->io_queued++;
1079 
1080 	sqe = io_uring_get_sqe(&sock->group->uring);
1081 	io_uring_prep_poll_add(sqe, sock->fd, POLLIN | POLLERR);
1082 	io_uring_sqe_set_data(sqe, task);
1083 	task->status = SPDK_URING_SOCK_TASK_IN_PROCESS;
1084 }
1085 
1086 static void
1087 _sock_prep_cancel_task(struct spdk_sock *_sock, void *user_data)
1088 {
1089 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1090 	struct spdk_uring_task *task = &sock->cancel_task;
1091 	struct io_uring_sqe *sqe;
1092 
1093 	if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS) {
1094 		return;
1095 	}
1096 
1097 	assert(sock->group != NULL);
1098 	sock->group->io_queued++;
1099 
1100 	sqe = io_uring_get_sqe(&sock->group->uring);
1101 	io_uring_prep_cancel(sqe, user_data, 0);
1102 	io_uring_sqe_set_data(sqe, task);
1103 	task->status = SPDK_URING_SOCK_TASK_IN_PROCESS;
1104 }
1105 
1106 static int
1107 sock_uring_group_reap(struct spdk_uring_sock_group_impl *group, int max, int max_read_events,
1108 		      struct spdk_sock **socks)
1109 {
1110 	int i, count, ret;
1111 	struct io_uring_cqe *cqe;
1112 	struct spdk_uring_sock *sock, *tmp;
1113 	struct spdk_uring_task *task;
1114 	int status;
1115 
1116 	for (i = 0; i < max; i++) {
1117 		ret = io_uring_peek_cqe(&group->uring, &cqe);
1118 		if (ret != 0) {
1119 			break;
1120 		}
1121 
1122 		if (cqe == NULL) {
1123 			break;
1124 		}
1125 
1126 		task = (struct spdk_uring_task *)cqe->user_data;
1127 		assert(task != NULL);
1128 		sock = task->sock;
1129 		assert(sock != NULL);
1130 		assert(sock->group != NULL);
1131 		assert(sock->group == group);
1132 		sock->group->io_inflight--;
1133 		sock->group->io_avail++;
1134 		status = cqe->res;
1135 		io_uring_cqe_seen(&group->uring, cqe);
1136 
1137 		task->status = SPDK_URING_SOCK_TASK_NOT_IN_USE;
1138 
1139 		if (spdk_unlikely(status <= 0)) {
1140 			if (status == -EAGAIN || status == -EWOULDBLOCK || (status == -ENOBUFS && sock->zcopy)) {
1141 				continue;
1142 			}
1143 		}
1144 
1145 		switch (task->type) {
1146 		case SPDK_SOCK_TASK_POLLIN:
1147 #ifdef SPDK_ZEROCOPY
1148 			if ((status & POLLERR) == POLLERR) {
1149 				_sock_prep_errqueue(&sock->base);
1150 			}
1151 #endif
1152 			if ((status & POLLIN) == POLLIN) {
1153 				if (sock->base.cb_fn != NULL &&
1154 				    sock->pending_recv == false) {
1155 					sock->pending_recv = true;
1156 					TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
1157 				}
1158 			}
1159 			break;
1160 		case SPDK_SOCK_TASK_WRITE:
1161 			task->last_req = NULL;
1162 			task->iov_cnt = 0;
1163 			task->is_zcopy = false;
1164 			if (spdk_unlikely(status) < 0) {
1165 				sock->connection_status = status;
1166 				spdk_sock_abort_requests(&sock->base);
1167 			} else {
1168 				sock_complete_write_reqs(&sock->base, status, task->is_zcopy);
1169 			}
1170 
1171 			break;
1172 #ifdef SPDK_ZEROCOPY
1173 		case SPDK_SOCK_TASK_ERRQUEUE:
1174 			if (spdk_unlikely(status == -ECANCELED)) {
1175 				sock->connection_status = status;
1176 				break;
1177 			}
1178 			_sock_check_zcopy(&sock->base, status);
1179 			break;
1180 #endif
1181 		case SPDK_SOCK_TASK_CANCEL:
1182 			/* Do nothing */
1183 			break;
1184 		default:
1185 			SPDK_UNREACHABLE();
1186 		}
1187 	}
1188 
1189 	if (!socks) {
1190 		return 0;
1191 	}
1192 	count = 0;
1193 	TAILQ_FOREACH_SAFE(sock, &group->pending_recv, link, tmp) {
1194 		if (count == max_read_events) {
1195 			break;
1196 		}
1197 
1198 		if (spdk_unlikely(sock->base.cb_fn == NULL) ||
1199 		    (sock->recv_pipe == NULL || spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0)) {
1200 			sock->pending_recv = false;
1201 			TAILQ_REMOVE(&group->pending_recv, sock, link);
1202 			if (spdk_unlikely(sock->base.cb_fn == NULL)) {
1203 				/* If the socket's cb_fn is NULL, do not add it to socks array */
1204 				continue;
1205 			}
1206 		}
1207 
1208 		socks[count++] = &sock->base;
1209 	}
1210 
1211 
1212 	/* Cycle the pending_recv list so that each time we poll things aren't
1213 	 * in the same order. Say we have 6 sockets in the list, named as follows:
1214 	 * A B C D E F
1215 	 * And all 6 sockets had the poll events, but max_events is only 3. That means
1216 	 * psock currently points at D. We want to rearrange the list to the following:
1217 	 * D E F A B C
1218 	 *
1219 	 * The variables below are named according to this example to make it easier to
1220 	 * follow the swaps.
1221 	 */
1222 	if (sock != NULL) {
1223 		struct spdk_uring_sock *ua, *uc, *ud, *uf;
1224 
1225 		/* Capture pointers to the elements we need */
1226 		ud = sock;
1227 
1228 		ua = TAILQ_FIRST(&group->pending_recv);
1229 		if (ua == ud) {
1230 			goto end;
1231 		}
1232 
1233 		uf = TAILQ_LAST(&group->pending_recv, pending_recv_list);
1234 		if (uf == ud) {
1235 			TAILQ_REMOVE(&group->pending_recv, ud, link);
1236 			TAILQ_INSERT_HEAD(&group->pending_recv, ud, link);
1237 			goto end;
1238 		}
1239 
1240 		uc = TAILQ_PREV(ud, pending_recv_list, link);
1241 		assert(uc != NULL);
1242 
1243 		/* Break the link between C and D */
1244 		uc->link.tqe_next = NULL;
1245 
1246 		/* Connect F to A */
1247 		uf->link.tqe_next = ua;
1248 		ua->link.tqe_prev = &uf->link.tqe_next;
1249 
1250 		/* Fix up the list first/last pointers */
1251 		group->pending_recv.tqh_first = ud;
1252 		group->pending_recv.tqh_last = &uc->link.tqe_next;
1253 
1254 		/* D is in front of the list, make tqe prev pointer point to the head of list */
1255 		ud->link.tqe_prev = &group->pending_recv.tqh_first;
1256 	}
1257 
1258 end:
1259 	return count;
1260 }
1261 
1262 static int
1263 _sock_flush_client(struct spdk_sock *_sock)
1264 {
1265 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1266 	struct msghdr msg = {};
1267 	struct iovec iovs[IOV_BATCH_SIZE];
1268 	int iovcnt;
1269 	ssize_t rc;
1270 	int flags = sock->zcopy_send_flags;
1271 	int retval;
1272 	bool is_zcopy = false;
1273 
1274 	/* Can't flush from within a callback or we end up with recursive calls */
1275 	if (_sock->cb_cnt > 0) {
1276 		return 0;
1277 	}
1278 
1279 	/* Gather an iov */
1280 	iovcnt = spdk_sock_prep_reqs(_sock, iovs, 0, NULL, &flags);
1281 	if (iovcnt == 0) {
1282 		return 0;
1283 	}
1284 
1285 	/* Perform the vectored write */
1286 	msg.msg_iov = iovs;
1287 	msg.msg_iovlen = iovcnt;
1288 	rc = sendmsg(sock->fd, &msg, flags | MSG_DONTWAIT);
1289 	if (rc <= 0) {
1290 		if (errno == EAGAIN || errno == EWOULDBLOCK) {
1291 			return 0;
1292 		}
1293 		return rc;
1294 	}
1295 
1296 #ifdef SPDK_ZEROCOPY
1297 	is_zcopy = flags & MSG_ZEROCOPY;
1298 #endif
1299 	retval = sock_complete_write_reqs(_sock, rc, is_zcopy);
1300 	if (retval < 0) {
1301 		/* if the socket is closed, return to avoid heap-use-after-free error */
1302 		return retval;
1303 	}
1304 
1305 #ifdef SPDK_ZEROCOPY
1306 	if (sock->zcopy && !TAILQ_EMPTY(&_sock->pending_reqs)) {
1307 		_sock_check_zcopy(_sock, 0);
1308 	}
1309 #endif
1310 
1311 	return 0;
1312 }
1313 
1314 static void
1315 uring_sock_writev_async(struct spdk_sock *_sock, struct spdk_sock_request *req)
1316 {
1317 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1318 	int rc;
1319 
1320 	if (spdk_unlikely(sock->connection_status)) {
1321 		req->cb_fn(req->cb_arg, sock->connection_status);
1322 		return;
1323 	}
1324 
1325 	spdk_sock_request_queue(_sock, req);
1326 
1327 	if (!sock->group) {
1328 		if (_sock->queued_iovcnt >= IOV_BATCH_SIZE) {
1329 			rc = _sock_flush_client(_sock);
1330 			if (rc) {
1331 				spdk_sock_abort_requests(_sock);
1332 			}
1333 		}
1334 	}
1335 }
1336 
1337 static void
1338 uring_sock_readv_async(struct spdk_sock *sock, struct spdk_sock_request *req)
1339 {
1340 	req->cb_fn(req->cb_arg, -ENOTSUP);
1341 }
1342 
1343 static int
1344 uring_sock_set_recvlowat(struct spdk_sock *_sock, int nbytes)
1345 {
1346 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1347 	int val;
1348 	int rc;
1349 
1350 	assert(sock != NULL);
1351 
1352 	val = nbytes;
1353 	rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVLOWAT, &val, sizeof val);
1354 	if (rc != 0) {
1355 		return -1;
1356 	}
1357 	return 0;
1358 }
1359 
1360 static bool
1361 uring_sock_is_ipv6(struct spdk_sock *_sock)
1362 {
1363 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1364 	struct sockaddr_storage sa;
1365 	socklen_t salen;
1366 	int rc;
1367 
1368 	assert(sock != NULL);
1369 
1370 	memset(&sa, 0, sizeof sa);
1371 	salen = sizeof sa;
1372 	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
1373 	if (rc != 0) {
1374 		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
1375 		return false;
1376 	}
1377 
1378 	return (sa.ss_family == AF_INET6);
1379 }
1380 
1381 static bool
1382 uring_sock_is_ipv4(struct spdk_sock *_sock)
1383 {
1384 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1385 	struct sockaddr_storage sa;
1386 	socklen_t salen;
1387 	int rc;
1388 
1389 	assert(sock != NULL);
1390 
1391 	memset(&sa, 0, sizeof sa);
1392 	salen = sizeof sa;
1393 	rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
1394 	if (rc != 0) {
1395 		SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
1396 		return false;
1397 	}
1398 
1399 	return (sa.ss_family == AF_INET);
1400 }
1401 
1402 static bool
1403 uring_sock_is_connected(struct spdk_sock *_sock)
1404 {
1405 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1406 	uint8_t byte;
1407 	int rc;
1408 
1409 	rc = recv(sock->fd, &byte, 1, MSG_PEEK | MSG_DONTWAIT);
1410 	if (rc == 0) {
1411 		return false;
1412 	}
1413 
1414 	if (rc < 0) {
1415 		if (errno == EAGAIN || errno == EWOULDBLOCK) {
1416 			return true;
1417 		}
1418 
1419 		return false;
1420 	}
1421 
1422 	return true;
1423 }
1424 
1425 static struct spdk_sock_group_impl *
1426 uring_sock_group_impl_get_optimal(struct spdk_sock *_sock, struct spdk_sock_group_impl *hint)
1427 {
1428 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1429 	struct spdk_sock_group_impl *group;
1430 
1431 	if (sock->placement_id != -1) {
1432 		spdk_sock_map_lookup(&g_map, sock->placement_id, &group, hint);
1433 		return group;
1434 	}
1435 
1436 	return NULL;
1437 }
1438 
1439 static struct spdk_sock_group_impl *
1440 uring_sock_group_impl_create(void)
1441 {
1442 	struct spdk_uring_sock_group_impl *group_impl;
1443 
1444 	group_impl = calloc(1, sizeof(*group_impl));
1445 	if (group_impl == NULL) {
1446 		SPDK_ERRLOG("group_impl allocation failed\n");
1447 		return NULL;
1448 	}
1449 
1450 	group_impl->io_avail = SPDK_SOCK_GROUP_QUEUE_DEPTH;
1451 
1452 	if (io_uring_queue_init(SPDK_SOCK_GROUP_QUEUE_DEPTH, &group_impl->uring, 0) < 0) {
1453 		SPDK_ERRLOG("uring I/O context setup failure\n");
1454 		free(group_impl);
1455 		return NULL;
1456 	}
1457 
1458 	TAILQ_INIT(&group_impl->pending_recv);
1459 
1460 	if (g_spdk_uring_sock_impl_opts.enable_placement_id == PLACEMENT_CPU) {
1461 		spdk_sock_map_insert(&g_map, spdk_env_get_current_core(), &group_impl->base);
1462 	}
1463 
1464 	return &group_impl->base;
1465 }
1466 
1467 static int
1468 uring_sock_group_impl_add_sock(struct spdk_sock_group_impl *_group,
1469 			       struct spdk_sock *_sock)
1470 {
1471 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1472 	struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group);
1473 	int rc;
1474 
1475 	sock->group = group;
1476 	sock->write_task.sock = sock;
1477 	sock->write_task.type = SPDK_SOCK_TASK_WRITE;
1478 
1479 	sock->pollin_task.sock = sock;
1480 	sock->pollin_task.type = SPDK_SOCK_TASK_POLLIN;
1481 
1482 	sock->errqueue_task.sock = sock;
1483 	sock->errqueue_task.type = SPDK_SOCK_TASK_ERRQUEUE;
1484 	sock->errqueue_task.msg.msg_control = sock->buf;
1485 	sock->errqueue_task.msg.msg_controllen = sizeof(sock->buf);
1486 
1487 	sock->cancel_task.sock = sock;
1488 	sock->cancel_task.type = SPDK_SOCK_TASK_CANCEL;
1489 
1490 	/* switched from another polling group due to scheduling */
1491 	if (spdk_unlikely(sock->recv_pipe != NULL &&
1492 			  (spdk_pipe_reader_bytes_available(sock->recv_pipe) > 0))) {
1493 		assert(sock->pending_recv == false);
1494 		sock->pending_recv = true;
1495 		TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
1496 	}
1497 
1498 	if (sock->placement_id != -1) {
1499 		rc = spdk_sock_map_insert(&g_map, sock->placement_id, &group->base);
1500 		if (rc != 0) {
1501 			SPDK_ERRLOG("Failed to insert sock group into map: %d", rc);
1502 			/* Do not treat this as an error. The system will continue running. */
1503 		}
1504 	}
1505 
1506 	return 0;
1507 }
1508 
1509 static int
1510 uring_sock_group_impl_poll(struct spdk_sock_group_impl *_group, int max_events,
1511 			   struct spdk_sock **socks)
1512 {
1513 	struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group);
1514 	int count, ret;
1515 	int to_complete, to_submit;
1516 	struct spdk_sock *_sock, *tmp;
1517 	struct spdk_uring_sock *sock;
1518 
1519 	if (spdk_likely(socks)) {
1520 		TAILQ_FOREACH_SAFE(_sock, &group->base.socks, link, tmp) {
1521 			sock = __uring_sock(_sock);
1522 			if (spdk_unlikely(sock->connection_status)) {
1523 				continue;
1524 			}
1525 			_sock_flush(_sock);
1526 			_sock_prep_pollin(_sock);
1527 		}
1528 	}
1529 
1530 	to_submit = group->io_queued;
1531 
1532 	/* For network I/O, it cannot be set with O_DIRECT, so we do not need to call spdk_io_uring_enter */
1533 	if (to_submit > 0) {
1534 		/* If there are I/O to submit, use io_uring_submit here.
1535 		 * It will automatically call io_uring_enter appropriately. */
1536 		ret = io_uring_submit(&group->uring);
1537 		if (ret < 0) {
1538 			return 1;
1539 		}
1540 		group->io_queued = 0;
1541 		group->io_inflight += to_submit;
1542 		group->io_avail -= to_submit;
1543 	}
1544 
1545 	count = 0;
1546 	to_complete = group->io_inflight;
1547 	if (to_complete > 0 || !TAILQ_EMPTY(&group->pending_recv)) {
1548 		count = sock_uring_group_reap(group, to_complete, max_events, socks);
1549 	}
1550 
1551 	return count;
1552 }
1553 
1554 static int
1555 uring_sock_group_impl_remove_sock(struct spdk_sock_group_impl *_group,
1556 				  struct spdk_sock *_sock)
1557 {
1558 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1559 	struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group);
1560 
1561 	if (sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
1562 		_sock_prep_cancel_task(_sock, &sock->write_task);
1563 		/* Since spdk_sock_group_remove_sock is not asynchronous interface, so
1564 		 * currently can use a while loop here. */
1565 		while ((sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) ||
1566 		       (sock->cancel_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE)) {
1567 			uring_sock_group_impl_poll(_group, 32, NULL);
1568 		}
1569 	}
1570 
1571 	if (sock->pollin_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
1572 		_sock_prep_cancel_task(_sock, &sock->pollin_task);
1573 		/* Since spdk_sock_group_remove_sock is not asynchronous interface, so
1574 		 * currently can use a while loop here. */
1575 		while ((sock->pollin_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) ||
1576 		       (sock->cancel_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE)) {
1577 			uring_sock_group_impl_poll(_group, 32, NULL);
1578 		}
1579 	}
1580 
1581 	if (sock->errqueue_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
1582 		_sock_prep_cancel_task(_sock, &sock->errqueue_task);
1583 		/* Since spdk_sock_group_remove_sock is not asynchronous interface, so
1584 		 * currently can use a while loop here. */
1585 		while ((sock->errqueue_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) ||
1586 		       (sock->cancel_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE)) {
1587 			uring_sock_group_impl_poll(_group, 32, NULL);
1588 		}
1589 	}
1590 
1591 	/* Make sure the cancelling the tasks above didn't cause sending new requests */
1592 	assert(sock->write_task.status == SPDK_URING_SOCK_TASK_NOT_IN_USE);
1593 	assert(sock->pollin_task.status == SPDK_URING_SOCK_TASK_NOT_IN_USE);
1594 	assert(sock->errqueue_task.status == SPDK_URING_SOCK_TASK_NOT_IN_USE);
1595 
1596 	if (sock->pending_recv) {
1597 		TAILQ_REMOVE(&group->pending_recv, sock, link);
1598 		sock->pending_recv = false;
1599 	}
1600 	assert(sock->pending_recv == false);
1601 
1602 	if (sock->placement_id != -1) {
1603 		spdk_sock_map_release(&g_map, sock->placement_id);
1604 	}
1605 
1606 	sock->group = NULL;
1607 	return 0;
1608 }
1609 
1610 static int
1611 uring_sock_group_impl_close(struct spdk_sock_group_impl *_group)
1612 {
1613 	struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group);
1614 
1615 	/* try to reap all the active I/O */
1616 	while (group->io_inflight) {
1617 		uring_sock_group_impl_poll(_group, 32, NULL);
1618 	}
1619 	assert(group->io_inflight == 0);
1620 	assert(group->io_avail == SPDK_SOCK_GROUP_QUEUE_DEPTH);
1621 
1622 	io_uring_queue_exit(&group->uring);
1623 
1624 	if (g_spdk_uring_sock_impl_opts.enable_placement_id == PLACEMENT_CPU) {
1625 		spdk_sock_map_release(&g_map, spdk_env_get_current_core());
1626 	}
1627 
1628 	free(group);
1629 	return 0;
1630 }
1631 
1632 static int
1633 uring_sock_flush(struct spdk_sock *_sock)
1634 {
1635 	struct spdk_uring_sock *sock = __uring_sock(_sock);
1636 
1637 	if (!sock->group) {
1638 		return _sock_flush_client(_sock);
1639 	}
1640 
1641 	return 0;
1642 }
1643 
1644 static struct spdk_net_impl g_uring_net_impl = {
1645 	.name		= "uring",
1646 	.getaddr	= uring_sock_getaddr,
1647 	.connect	= uring_sock_connect,
1648 	.listen		= uring_sock_listen,
1649 	.accept		= uring_sock_accept,
1650 	.close		= uring_sock_close,
1651 	.recv		= uring_sock_recv,
1652 	.readv		= uring_sock_readv,
1653 	.readv_async	= uring_sock_readv_async,
1654 	.writev		= uring_sock_writev,
1655 	.writev_async	= uring_sock_writev_async,
1656 	.flush          = uring_sock_flush,
1657 	.set_recvlowat	= uring_sock_set_recvlowat,
1658 	.set_recvbuf	= uring_sock_set_recvbuf,
1659 	.set_sendbuf	= uring_sock_set_sendbuf,
1660 	.is_ipv6	= uring_sock_is_ipv6,
1661 	.is_ipv4	= uring_sock_is_ipv4,
1662 	.is_connected   = uring_sock_is_connected,
1663 	.group_impl_get_optimal	= uring_sock_group_impl_get_optimal,
1664 	.group_impl_create	= uring_sock_group_impl_create,
1665 	.group_impl_add_sock	= uring_sock_group_impl_add_sock,
1666 	.group_impl_remove_sock = uring_sock_group_impl_remove_sock,
1667 	.group_impl_poll	= uring_sock_group_impl_poll,
1668 	.group_impl_close	= uring_sock_group_impl_close,
1669 	.get_opts		= uring_sock_impl_get_opts,
1670 	.set_opts		= uring_sock_impl_set_opts,
1671 };
1672 
1673 SPDK_NET_IMPL_REGISTER(uring, &g_uring_net_impl, DEFAULT_SOCK_PRIORITY + 1);
1674