xref: /spdk/module/fsdev/aio/aio_mgr.c (revision 8afdeef3becfe9409cc9e7372bd0bc10e8b7d46d)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (c) 2023 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
3  */
4 #include "spdk/stdinc.h"
5 #include "spdk/util.h"
6 #include "spdk/log.h"
7 #include "aio_mgr.h"
8 
9 #define REQS_PER_AIO 10
10 
11 struct spdk_aio_mgr_req {
12 	struct aiocb io;
13 	TAILQ_ENTRY(spdk_aio_mgr_req) link;
14 };
15 
16 struct spdk_aio_mgr_io {
17 	TAILQ_ENTRY(spdk_aio_mgr_io) link;
18 	TAILQ_HEAD(, spdk_aio_mgr_req) reqs;
19 	struct spdk_aio_mgr *mgr;
20 	fsdev_aio_done_cb clb;
21 	void *ctx;
22 	uint32_t data_size;
23 	int err;
24 };
25 
26 struct spdk_aio_mgr {
27 	TAILQ_HEAD(, spdk_aio_mgr_io) in_flight;
28 	struct {
29 		struct spdk_aio_mgr_req *arr;
30 		uint32_t size;
31 		TAILQ_HEAD(, spdk_aio_mgr_req) pool;
32 	} reqs;
33 	struct {
34 		struct spdk_aio_mgr_io *arr;
35 		uint32_t size;
36 		TAILQ_HEAD(, spdk_aio_mgr_io) pool;
37 	} aios;
38 };
39 
40 static inline struct spdk_aio_mgr_req *
41 aio_mgr_get_aio_req(struct spdk_aio_mgr *mgr)
42 {
43 	struct spdk_aio_mgr_req *req = TAILQ_FIRST(&mgr->reqs.pool);
44 
45 	if (req) {
46 		TAILQ_REMOVE(&mgr->reqs.pool, req, link);
47 	}
48 
49 	return req;
50 }
51 
52 static inline void
53 aio_mgr_put_aio_req(struct spdk_aio_mgr *mgr, struct spdk_aio_mgr_req *req)
54 {
55 	TAILQ_INSERT_TAIL(&mgr->reqs.pool, req, link);
56 }
57 
58 static uint32_t
59 fsdev_aio_submit(struct spdk_aio_mgr_io *aio, int fd, uint64_t offs, uint32_t size,
60 		 struct iovec *iovs,
61 		 uint32_t iovcnt, int (*aio_submit_func)(struct aiocb *), const char *aio_type)
62 {
63 	uint32_t bytes_handled = 0;
64 	uint32_t iov_idx = 0;
65 
66 	assert(aio->mgr);
67 	assert(!aio->err);
68 	assert(iovs != NULL);
69 	assert(iovcnt != 0);
70 
71 	while (size && iov_idx < iovcnt) {
72 		struct spdk_aio_mgr_req *req;
73 		struct iovec *iov = &iovs[iov_idx];
74 		size_t ho_handle = spdk_min(iov->iov_len, size);
75 
76 		req = aio_mgr_get_aio_req(aio->mgr);
77 		if (!req) {
78 			SPDK_ERRLOG("cannot get aio req\n");
79 			aio->err = EINVAL;
80 			break;
81 		}
82 
83 		memset(&req->io, 0, sizeof(req->io));
84 
85 		req->io.aio_nbytes = ho_handle;
86 		req->io.aio_buf = iov->iov_base;
87 		req->io.aio_offset = offs + bytes_handled;
88 		req->io.aio_fildes = fd;
89 
90 		SPDK_DEBUGLOG(spdk_aio_mgr_io,
91 			      "aio to %s: aio=%p req=%p aio_nbytes=%zu aio_buf=%p aio_offset=%" PRIu64 " aio_fildes=%d\n",
92 			      aio_type, aio, req, req->io.aio_nbytes, req->io.aio_buf, (uint64_t)req->io.aio_offset,
93 			      req->io.aio_fildes);
94 
95 		if (aio_submit_func(&req->io)) {
96 			aio->err = errno;
97 			SPDK_ERRLOG("aio_%s of io[%" PRIu32 "] at offset %" PRIu64 " failed with err=%d\n",
98 				    aio_type, iov_idx, offs, aio->err);
99 			aio_mgr_put_aio_req(aio->mgr, req);
100 			break;
101 		}
102 
103 		TAILQ_INSERT_TAIL(&aio->reqs, req, link);
104 
105 		bytes_handled += ho_handle;
106 		size -= ho_handle;
107 
108 		iov_idx++;
109 	}
110 
111 	return bytes_handled;
112 }
113 
114 static void
115 fsdev_aio_cancel(struct spdk_aio_mgr_io *aio)
116 {
117 	struct spdk_aio_mgr_req *req;
118 	TAILQ_FOREACH(req, &aio->reqs, link) {
119 		aio_cancel(req->io.aio_fildes, &req->io);
120 	}
121 }
122 
123 static struct spdk_aio_mgr_io *
124 aio_mgr_get_aio(struct spdk_aio_mgr *mgr, fsdev_aio_done_cb clb, void *ctx)
125 {
126 	struct spdk_aio_mgr_io *aio = TAILQ_FIRST(&mgr->aios.pool);
127 
128 	if (aio) {
129 		aio->mgr = mgr;
130 		aio->clb = clb;
131 		aio->ctx = ctx;
132 		aio->err = 0;
133 		aio->data_size = 0;
134 		TAILQ_INIT(&aio->reqs);
135 		TAILQ_REMOVE(&mgr->aios.pool, aio, link);
136 	}
137 
138 	return aio;
139 }
140 
141 static inline void
142 aio_mgr_put_aio(struct spdk_aio_mgr *mgr, struct spdk_aio_mgr_io *aio)
143 {
144 	TAILQ_INSERT_TAIL(&aio->mgr->aios.pool, aio, link);
145 }
146 
147 static struct spdk_aio_mgr_io *
148 spdk_aio_mgr_submit_io(struct spdk_aio_mgr *mgr, fsdev_aio_done_cb clb, void *ctx,
149 		       int fd, uint64_t offs, uint32_t size, struct iovec *iovs, uint32_t iovcnt,
150 		       int (*aio_submit_func)(struct aiocb *), const char *aio_type)
151 {
152 	struct spdk_aio_mgr_io *aio;
153 	uint32_t bytes_handled;
154 
155 	SPDK_DEBUGLOG(spdk_aio_mgr_io, "%s: fd=%d offs=%" PRIu64 " size=%" PRIu32 " iovcnt=%" PRIu32 "\n",
156 		      aio_type, fd, offs, size, iovcnt);
157 
158 	aio = aio_mgr_get_aio(mgr, clb, ctx);
159 	if (!aio) {
160 		SPDK_ERRLOG("Cannot get aio\n");
161 		clb(ctx, 0, EFAULT);
162 		return NULL;
163 	}
164 
165 	bytes_handled = fsdev_aio_submit(aio, fd, offs, size, iovs, iovcnt, aio_submit_func, aio_type);
166 	SPDK_DEBUGLOG(spdk_aio_mgr_io, "%s: aio=%p: handled %" PRIu32 " bytes\n", aio_type, aio,
167 		      bytes_handled);
168 	if (bytes_handled) {
169 		TAILQ_INSERT_TAIL(&aio->mgr->in_flight, aio, link);
170 		return aio;
171 	} else {
172 		aio->clb(aio->ctx, 0, aio->err);
173 		aio_mgr_put_aio(mgr, aio);
174 		return NULL;
175 	}
176 }
177 
178 struct spdk_aio_mgr *
179 spdk_aio_mgr_create(uint32_t max_aios)
180 {
181 	struct spdk_aio_mgr *mgr;
182 	uint32_t i;
183 
184 	mgr = calloc(1, sizeof(*mgr));
185 	if (!mgr) {
186 		SPDK_ERRLOG("cannot alloc mgr of %zu bytes\n", sizeof(*mgr));
187 		return NULL;
188 	}
189 
190 	mgr->reqs.arr = calloc(max_aios * REQS_PER_AIO, sizeof(mgr->reqs.arr[0]));
191 	if (!mgr->reqs.arr) {
192 		SPDK_ERRLOG("cannot alloc req pool of %" PRIu32 " * %d\n", max_aios, REQS_PER_AIO);
193 		free(mgr);
194 		return NULL;
195 	}
196 
197 	mgr->aios.arr = calloc(max_aios, sizeof(mgr->aios.arr[0]));
198 	if (!mgr->aios.arr) {
199 		SPDK_ERRLOG("cannot alloc aios pool of %" PRIu32 "\n", max_aios);
200 		free(mgr->reqs.arr);
201 		free(mgr);
202 		return NULL;
203 	}
204 
205 	TAILQ_INIT(&mgr->in_flight);
206 	TAILQ_INIT(&mgr->reqs.pool);
207 	TAILQ_INIT(&mgr->aios.pool);
208 
209 	for (i = 0; i < max_aios * REQS_PER_AIO; i++) {
210 		TAILQ_INSERT_TAIL(&mgr->reqs.pool, &mgr->reqs.arr[i], link);
211 	}
212 
213 	for (i = 0; i < max_aios; i++) {
214 		TAILQ_INSERT_TAIL(&mgr->aios.pool, &mgr->aios.arr[i], link);
215 	}
216 
217 	return mgr;
218 }
219 
220 struct spdk_aio_mgr_io *
221 spdk_aio_mgr_read(struct spdk_aio_mgr *mgr, fsdev_aio_done_cb clb, void *ctx,
222 		  int fd, uint64_t offs, uint32_t size, struct iovec *iovs, uint32_t iovcnt)
223 {
224 	return spdk_aio_mgr_submit_io(mgr, clb, ctx, fd, offs, size, iovs, iovcnt, aio_read, "read");
225 }
226 
227 struct spdk_aio_mgr_io *
228 spdk_aio_mgr_write(struct spdk_aio_mgr *mgr, fsdev_aio_done_cb clb, void *ctx,
229 		   int fd, uint64_t offs, uint32_t size, const struct iovec *iovs, uint32_t iovcnt)
230 {
231 	return spdk_aio_mgr_submit_io(mgr, clb, ctx, fd, offs, size, (struct iovec *)iovs, iovcnt,
232 				      aio_write, "write");
233 }
234 
235 
236 void
237 spdk_aio_mgr_cancel(struct spdk_aio_mgr *mgr, struct spdk_aio_mgr_io *aio)
238 {
239 	assert(mgr == aio->mgr);
240 
241 	SPDK_DEBUGLOG(spdk_aio_mgr_io, "aio=%p cancelled\n", aio);
242 	fsdev_aio_cancel(aio);
243 }
244 
245 void
246 spdk_aio_mgr_poll(struct spdk_aio_mgr *mgr)
247 {
248 	struct spdk_aio_mgr_io *aio, *tmp_aio;
249 	TAILQ_FOREACH_SAFE(aio, &mgr->in_flight, link, tmp_aio) {
250 		struct spdk_aio_mgr_req *req, *tmp_req;
251 		TAILQ_FOREACH_SAFE(req, &aio->reqs, link, tmp_req) {
252 			ssize_t ret;
253 			int err = aio_error(&req->io);
254 			if (err == EINPROGRESS) { /* the request has not been completed yet */
255 				break; /* stop checking completions for this aio */
256 			}
257 
258 			if (!err) { /* the request completed successfull */
259 				;
260 			} else if (err == ECANCELED) { /* the request was canceled */
261 				SPDK_WARNLOG("aio processing was cancelled\n");
262 				aio->err = EAGAIN;
263 			} else {
264 				SPDK_ERRLOG("aio processing failed with err=%d\n", err);
265 				aio->err = err;
266 			}
267 
268 			ret = aio_return(&req->io);
269 			if (ret > 0) {
270 				aio->data_size += ret;
271 			}
272 
273 			SPDK_DEBUGLOG(spdk_aio_mgr_io, "aio completed: aio=%p req=%p err=%d ret=%zd\n", aio, req, err, ret);
274 
275 			/* the request processing is done */
276 			TAILQ_REMOVE(&aio->reqs, req, link); /* remove the req from the aio */
277 			TAILQ_INSERT_TAIL(&mgr->reqs.pool, req, link); /* return the req to the pool */
278 			if (TAILQ_EMPTY(&aio->reqs)) { /* all the aio's requests have been processed */
279 				SPDK_DEBUGLOG(spdk_aio_mgr_io, "aio=%p is done (data_size=%" PRIu32 ")\n", aio, aio->data_size);
280 				aio->clb(aio->ctx, aio->data_size, aio->err); /* call the user's callback */
281 				TAILQ_REMOVE(&mgr->in_flight, aio, link); /* remove the aio from the in_flight aios list */
282 				TAILQ_INSERT_TAIL(&mgr->aios.pool, aio, link); /* return the aio to the pool */
283 			}
284 		}
285 	}
286 }
287 
288 void
289 spdk_aio_mgr_delete(struct spdk_aio_mgr *mgr)
290 {
291 	assert(TAILQ_EMPTY(&mgr->in_flight));
292 	free(mgr->aios.arr);
293 	free(mgr->reqs.arr);
294 	free(mgr);
295 }
296 
297 SPDK_LOG_REGISTER_COMPONENT(spdk_aio_mgr_io)
298