xref: /spdk/module/bdev/aio/bdev_aio.c (revision c164db9ffe3718ad4e4f5bab380ccfa62c2fa672)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2017 Intel Corporation.
3  *   All rights reserved.
4  *   Copyright (c) 2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
5  */
6 
7 #include "bdev_aio.h"
8 
9 #include "spdk/stdinc.h"
10 
11 #include "spdk/barrier.h"
12 #include "spdk/bdev.h"
13 #include "spdk/bdev_module.h"
14 #include "spdk/env.h"
15 #include "spdk/fd.h"
16 #include "spdk/likely.h"
17 #include "spdk/thread.h"
18 #include "spdk/json.h"
19 #include "spdk/util.h"
20 #include "spdk/string.h"
21 
22 #include "spdk/log.h"
23 
24 #include <sys/eventfd.h>
25 
26 #ifndef __FreeBSD__
27 #include <libaio.h>
28 #endif
29 
struct bdev_aio_io_channel {
	/* Number of I/Os submitted on this channel that have not yet completed. */
	uint64_t				io_inflight;
#ifdef __FreeBSD__
	/* kqueue descriptor used to reap POSIX aio completion events. */
	int					kqfd;
#else
	/* libaio context used for io_submit()/io_getevents() on this channel. */
	io_context_t				io_ctx;
#endif
	/* Group channel whose poller reaps completions for this channel. */
	struct bdev_aio_group_channel		*group_ch;
	/* Entry in group_ch->io_ch_head. */
	TAILQ_ENTRY(bdev_aio_io_channel)	link;
};
40 
struct bdev_aio_group_channel {
	/* eventfd for io completion notification in interrupt mode.
	 * Negative value like '-1' indicates it is invalid or unused.
	 */
	int					efd;
	/* Interrupt handler registered on efd (interrupt mode only). */
	struct spdk_interrupt			*intr;
	/* Poller that reaps completions from all attached io channels. */
	struct spdk_poller			*poller;
	/* All bdev_aio_io_channels serviced by this group channel. */
	TAILQ_HEAD(, bdev_aio_io_channel)	io_ch_head;
};
50 
/* Per-I/O driver context; allocated by the bdev layer as bdev_io->driver_ctx. */
struct bdev_aio_task {
#ifdef __FreeBSD__
	struct aiocb			aiocb;
#else
	struct iocb			iocb;
#endif
	/* Expected transfer length in bytes, used to validate completions. */
	uint64_t			len;
	/* Channel the task was submitted on; used to maintain io_inflight. */
	struct bdev_aio_io_channel	*ch;
};
60 
/* Per-bdev state for one AIO-backed file or block device. */
struct file_disk {
	/* Outstanding reset task; completed once all channels drain. */
	struct bdev_aio_task	*reset_task;
	/* Retry poller used while waiting for in-flight I/O during reset. */
	struct spdk_poller	*reset_retry_timer;
	struct spdk_bdev	disk;
	char			*filename;
	/* Open file descriptor, or -1 when closed. */
	int			fd;
#ifdef RWF_NOWAIT
	/* True when the backing file supports RWF_NOWAIT (block devices only). */
	bool			use_nowait;
#endif
	/* Entry in g_aio_disk_head. */
	TAILQ_ENTRY(file_disk)  link;
	/* True when the user overrode the auto-detected block size. */
	bool			block_size_override;
	bool			readonly;
	/* True when UNMAP/WRITE_ZEROES via fallocate() is enabled. */
	bool			fallocate;
};
75 
/* For user space reaping of completions */
struct spdk_aio_ring {
	uint32_t id;
	uint32_t size;
	/* Consumer index; advanced by us after copying events out. */
	uint32_t head;
	/* Producer index; advanced by the kernel as I/Os complete. */
	uint32_t tail;

	uint32_t version;
	uint32_t compat_features;
	/* Non-zero means the layout is unknown; fall back to io_getevents(). */
	uint32_t incompat_features;
	/* Offset from the ring base to the io_event array. */
	uint32_t header_length;
};
88 
/* Magic expected in spdk_aio_ring::version for user-space completion reaping. */
#define SPDK_AIO_RING_VERSION	0xa10a10a1

static int bdev_aio_initialize(void);
static void bdev_aio_fini(void);
static void aio_free_disk(struct file_disk *fdisk);
/* List of all registered AIO bdevs. */
static TAILQ_HEAD(, file_disk) g_aio_disk_head = TAILQ_HEAD_INITIALIZER(g_aio_disk_head);

/* Maximum queue depth per libaio context and per reap batch. */
#define SPDK_AIO_QUEUE_DEPTH 128
#define MAX_EVENTS_PER_POLL 32
98 
/* Tell the bdev layer how much per-I/O driver context to allocate. */
static int
bdev_aio_get_ctx_size(void)
{
	return sizeof(struct bdev_aio_task);
}
104 
/* Module descriptor hooking this backend into the bdev layer. */
static struct spdk_bdev_module aio_if = {
	.name		= "aio",
	.module_init	= bdev_aio_initialize,
	.module_fini	= bdev_aio_fini,
	.get_ctx_size	= bdev_aio_get_ctx_size,
};

SPDK_BDEV_MODULE_REGISTER(aio, &aio_if)
113 
114 static int
115 bdev_aio_open(struct file_disk *disk)
116 {
117 	int fd;
118 	int io_flag = disk->readonly ? O_RDONLY : O_RDWR;
119 #ifdef RWF_NOWAIT
120 	struct stat st;
121 #endif
122 
123 	fd = open(disk->filename, io_flag | O_DIRECT);
124 	if (fd < 0) {
125 		/* Try without O_DIRECT for non-disk files */
126 		fd = open(disk->filename, io_flag);
127 		if (fd < 0) {
128 			SPDK_ERRLOG("open() failed (file:%s), errno %d: %s\n",
129 				    disk->filename, errno, spdk_strerror(errno));
130 			disk->fd = -1;
131 			return -1;
132 		}
133 	}
134 
135 	disk->fd = fd;
136 
137 #ifdef RWF_NOWAIT
138 	/* Some aio operations can block, for example if number outstanding
139 	 * I/O exceeds number of block layer tags. But not all files can
140 	 * support RWF_NOWAIT flag. So use RWF_NOWAIT on block devices only.
141 	 */
142 	disk->use_nowait = fstat(fd, &st) == 0 && S_ISBLK(st.st_mode);
143 #endif
144 
145 	return 0;
146 }
147 
148 static int
149 bdev_aio_close(struct file_disk *disk)
150 {
151 	int rc;
152 
153 	if (disk->fd == -1) {
154 		return 0;
155 	}
156 
157 	rc = close(disk->fd);
158 	if (rc < 0) {
159 		SPDK_ERRLOG("close() failed (fd=%d), errno %d: %s\n",
160 			    disk->fd, errno, spdk_strerror(errno));
161 		return -1;
162 	}
163 
164 	disk->fd = -1;
165 
166 	return 0;
167 }
168 
169 #ifdef __FreeBSD__
/* Submit one readv/writev via FreeBSD POSIX aio.  Completion is delivered as
 * a kevent on the channel's kqueue with the task pointer in sigev_value.
 * Returns the aio_readv()/aio_writev() result (0 on success, -1 on error).
 */
static int
bdev_aio_submit_io(enum spdk_bdev_io_type type, struct file_disk *fdisk,
		   struct spdk_io_channel *ch, struct bdev_aio_task *aio_task,
		   struct iovec *iov, int iovcnt, uint64_t nbytes, uint64_t offset)
{
	struct aiocb *aiocb = &aio_task->aiocb;
	struct bdev_aio_io_channel *aio_ch = spdk_io_channel_get_ctx(ch);

	memset(aiocb, 0, sizeof(struct aiocb));
	aiocb->aio_fildes = fdisk->fd;
	aiocb->aio_iov = iov;
	aiocb->aio_iovcnt = iovcnt;
	aiocb->aio_offset = offset;
	/* Route the completion kevent to this channel's kqueue. */
	aiocb->aio_sigevent.sigev_notify_kqueue = aio_ch->kqfd;
	aiocb->aio_sigevent.sigev_value.sival_ptr = aio_task;
	aiocb->aio_sigevent.sigev_notify = SIGEV_KEVENT;

	/* Remember the expected transfer size to validate the completion. */
	aio_task->len = nbytes;
	aio_task->ch = aio_ch;

	if (type == SPDK_BDEV_IO_TYPE_READ) {
		return aio_readv(aiocb);
	}

	return aio_writev(aiocb);
}
196 #else
/* Prepare and submit one readv/writev iocb on the channel's libaio context.
 * Returns io_submit()'s result: 1 (iocbs submitted) on success, a negative
 * errno on failure.
 */
static int
bdev_aio_submit_io(enum spdk_bdev_io_type type, struct file_disk *fdisk,
		   struct spdk_io_channel *ch, struct bdev_aio_task *aio_task,
		   struct iovec *iov, int iovcnt, uint64_t nbytes, uint64_t offset)
{
	struct iocb *iocb = &aio_task->iocb;
	struct bdev_aio_io_channel *aio_ch = spdk_io_channel_get_ctx(ch);

	if (type == SPDK_BDEV_IO_TYPE_READ) {
		io_prep_preadv(iocb, fdisk->fd, iov, iovcnt, offset);
	} else {
		io_prep_pwritev(iocb, fdisk->fd, iov, iovcnt, offset);
	}

	/* In interrupt mode, completions also signal the group's eventfd. */
	if (aio_ch->group_ch->efd >= 0) {
		io_set_eventfd(iocb, aio_ch->group_ch->efd);
	}
	iocb->data = aio_task;
#ifdef RWF_NOWAIT
	/* Only set on block devices; see bdev_aio_open(). */
	if (fdisk->use_nowait) {
		iocb->aio_rw_flags = RWF_NOWAIT;
	}
#endif
	/* Remember the expected transfer size to validate the completion. */
	aio_task->len = nbytes;
	aio_task->ch = aio_ch;

	return io_submit(aio_ch->io_ctx, 1, &iocb);
}
225 #endif
226 
227 static void
228 bdev_aio_rw(enum spdk_bdev_io_type type, struct file_disk *fdisk,
229 	    struct spdk_io_channel *ch, struct bdev_aio_task *aio_task,
230 	    struct iovec *iov, int iovcnt, uint64_t nbytes, uint64_t offset)
231 {
232 	struct bdev_aio_io_channel *aio_ch = spdk_io_channel_get_ctx(ch);
233 	int rc;
234 
235 	if (type == SPDK_BDEV_IO_TYPE_READ) {
236 		SPDK_DEBUGLOG(aio, "read %d iovs size %lu to off: %#lx\n",
237 			      iovcnt, nbytes, offset);
238 	} else {
239 		SPDK_DEBUGLOG(aio, "write %d iovs size %lu from off: %#lx\n",
240 			      iovcnt, nbytes, offset);
241 	}
242 
243 	rc = bdev_aio_submit_io(type, fdisk, ch, aio_task, iov, iovcnt, nbytes, offset);
244 	if (spdk_unlikely(rc < 0)) {
245 		if (rc == -EAGAIN) {
246 			spdk_bdev_io_complete(spdk_bdev_io_from_ctx(aio_task), SPDK_BDEV_IO_STATUS_NOMEM);
247 		} else {
248 			spdk_bdev_io_complete_aio_status(spdk_bdev_io_from_ctx(aio_task), rc);
249 			SPDK_ERRLOG("%s: io_submit returned %d\n", __func__, rc);
250 		}
251 	} else {
252 		aio_ch->io_inflight++;
253 	}
254 }
255 
256 static void
257 bdev_aio_flush(struct file_disk *fdisk, struct bdev_aio_task *aio_task)
258 {
259 	int rc = fsync(fdisk->fd);
260 
261 	if (rc == 0) {
262 		spdk_bdev_io_complete(spdk_bdev_io_from_ctx(aio_task), SPDK_BDEV_IO_STATUS_SUCCESS);
263 	} else {
264 		spdk_bdev_io_complete_aio_status(spdk_bdev_io_from_ctx(aio_task), -errno);
265 	}
266 }
267 
268 #ifndef __FreeBSD__
269 static void
270 bdev_aio_fallocate(struct spdk_bdev_io *bdev_io, int mode)
271 {
272 	struct file_disk *fdisk = (struct file_disk *)bdev_io->bdev->ctxt;
273 	struct bdev_aio_task *aio_task = (struct bdev_aio_task *)bdev_io->driver_ctx;
274 	uint64_t offset_bytes = bdev_io->u.bdev.offset_blocks * bdev_io->bdev->blocklen;
275 	uint64_t length_bytes = bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen;
276 	int rc;
277 
278 	if (!fdisk->fallocate) {
279 		spdk_bdev_io_complete_aio_status(spdk_bdev_io_from_ctx(aio_task), -ENOTSUP);
280 		return;
281 	}
282 
283 	rc = fallocate(fdisk->fd, mode, offset_bytes, length_bytes);
284 	if (rc == 0) {
285 		spdk_bdev_io_complete(spdk_bdev_io_from_ctx(aio_task), SPDK_BDEV_IO_STATUS_SUCCESS);
286 	} else {
287 		spdk_bdev_io_complete_aio_status(spdk_bdev_io_from_ctx(aio_task), -errno);
288 	}
289 }
290 
291 static void
292 bdev_aio_unmap(struct spdk_bdev_io *bdev_io)
293 {
294 	int mode = FALLOC_FL_KEEP_SIZE | FALLOC_FL_PUNCH_HOLE;
295 
296 	bdev_aio_fallocate(bdev_io, mode);
297 }
298 
299 
300 static void
301 bdev_aio_write_zeros(struct spdk_bdev_io *bdev_io)
302 {
303 	int mode = FALLOC_FL_ZERO_RANGE;
304 
305 	bdev_aio_fallocate(bdev_io, mode);
306 }
307 #endif
308 
309 static void
310 bdev_aio_destruct_cb(void *io_device)
311 {
312 	struct file_disk *fdisk = io_device;
313 	int rc = 0;
314 
315 	TAILQ_REMOVE(&g_aio_disk_head, fdisk, link);
316 	rc = bdev_aio_close(fdisk);
317 	if (rc < 0) {
318 		SPDK_ERRLOG("bdev_aio_close() failed\n");
319 	}
320 	aio_free_disk(fdisk);
321 }
322 
/* bdev destruct entry point: unregister the io device; actual cleanup runs
 * in bdev_aio_destruct_cb after all channels are released.
 */
static int
bdev_aio_destruct(void *ctx)
{
	struct file_disk *fdisk = ctx;

	spdk_io_device_unregister(fdisk, bdev_aio_destruct_cb);

	return 0;
}
332 
333 #ifdef __FreeBSD__
/* Non-blocking reap of up to 'max' aio completion kevents from 'kq'.
 * Returns the number of events fetched, or -errno on kevent() failure.
 */
static int
bdev_user_io_getevents(int kq, unsigned int max, struct kevent *events)
{
	struct timespec ts;
	int count;

	memset(events, 0, max * sizeof(struct kevent));
	/* Zero timeout: poll without blocking. */
	memset(&ts, 0, sizeof(ts));

	count = kevent(kq, NULL, 0, events, max, &ts);
	if (count < 0) {
		SPDK_ERRLOG("failed to get kevents: %s.\n", spdk_strerror(errno));
		return -errno;
	}

	return count;
}
351 
352 static int
353 bdev_aio_io_channel_poll(struct bdev_aio_io_channel *io_ch)
354 {
355 	int nr, i, res = 0;
356 	struct bdev_aio_task *aio_task;
357 	struct kevent events[SPDK_AIO_QUEUE_DEPTH];
358 
359 	nr = bdev_user_io_getevents(io_ch->kqfd, SPDK_AIO_QUEUE_DEPTH, events);
360 	if (nr < 0) {
361 		return 0;
362 	}
363 
364 	for (i = 0; i < nr; i++) {
365 		aio_task = events[i].udata;
366 		aio_task->ch->io_inflight--;
367 		if (aio_task == NULL) {
368 			spdk_bdev_io_complete(spdk_bdev_io_from_ctx(aio_task), SPDK_BDEV_IO_STATUS_FAILED);
369 			break;
370 		} else if ((uint64_t)aio_return(&aio_task->aiocb) == aio_task->len) {
371 			spdk_bdev_io_complete(spdk_bdev_io_from_ctx(aio_task), SPDK_BDEV_IO_STATUS_SUCCESS);
372 		} else {
373 			SPDK_ERRLOG("failed to complete aio: rc %d\n", aio_error(&aio_task->aiocb));
374 			res = aio_error(&aio_task->aiocb);
375 			if (res != 0) {
376 				spdk_bdev_io_complete_aio_status(spdk_bdev_io_from_ctx(aio_task), res);
377 			} else {
378 				spdk_bdev_io_complete(spdk_bdev_io_from_ctx(aio_task), SPDK_BDEV_IO_STATUS_FAILED);
379 			}
380 		}
381 	}
382 
383 	return nr;
384 }
385 #else
/* Reap completed I/Os directly from the kernel's aio completion ring mapped
 * into user space, avoiding an io_getevents() syscall in the common case.
 * Returns the number of events copied into 'uevents' (at most 'max').
 */
static int
bdev_user_io_getevents(io_context_t io_ctx, unsigned int max, struct io_event *uevents)
{
	uint32_t head, tail, count;
	struct spdk_aio_ring *ring;
	struct timespec timeout;
	struct io_event *kevents;

	/* The io_context_t value is the user-space address of the ring. */
	ring = (struct spdk_aio_ring *)io_ctx;

	/* Unknown ring layout: fall back to a non-blocking io_getevents() call. */
	if (spdk_unlikely(ring->version != SPDK_AIO_RING_VERSION || ring->incompat_features != 0)) {
		timeout.tv_sec = 0;
		timeout.tv_nsec = 0;

		return io_getevents(io_ctx, 0, max, uevents, &timeout);
	}

	/* Read the current state out of the ring */
	head = ring->head;
	tail = ring->tail;

	/* This memory barrier is required to prevent the loads above
	 * from being re-ordered with stores to the events array
	 * potentially occurring on other threads. */
	spdk_smp_rmb();

	/* Calculate how many items are in the circular ring */
	count = tail - head;
	if (tail < head) {
		count += ring->size;
	}

	/* Reduce the count to the limit provided by the user */
	count = spdk_min(max, count);

	/* Grab the memory location of the event array */
	kevents = (struct io_event *)((uintptr_t)ring + ring->header_length);

	/* Copy the events out of the ring. */
	if ((head + count) <= ring->size) {
		/* Only one copy is required */
		memcpy(uevents, &kevents[head], count * sizeof(struct io_event));
	} else {
		uint32_t first_part = ring->size - head;
		/* Two copies are required */
		memcpy(uevents, &kevents[head], first_part * sizeof(struct io_event));
		memcpy(&uevents[first_part], &kevents[0], (count - first_part) * sizeof(struct io_event));
	}

	/* Update the head pointer. On x86, stores will not be reordered with older loads,
	 * so the copies out of the event array will always be complete prior to this
	 * update becoming visible. On other architectures this is not guaranteed, so
	 * add a barrier. */
#if defined(__i386__) || defined(__x86_64__)
	spdk_compiler_barrier();
#else
	spdk_smp_mb();
#endif
	ring->head = (head + count) % ring->size;

	return count;
}
448 
/* Reap and complete up to SPDK_AIO_QUEUE_DEPTH finished I/Os on one channel.
 * Returns the number of events processed (0 on reap failure).
 */
static int
bdev_aio_io_channel_poll(struct bdev_aio_io_channel *io_ch)
{
	int nr, i, res = 0;
	struct bdev_aio_task *aio_task;
	struct io_event events[SPDK_AIO_QUEUE_DEPTH];

	nr = bdev_user_io_getevents(io_ch->io_ctx, SPDK_AIO_QUEUE_DEPTH, events);
	if (nr < 0) {
		return 0;
	}

	for (i = 0; i < nr; i++) {
		aio_task = events[i].data;
		aio_task->ch->io_inflight--;
		/* A full-length transfer is the success case. */
		if (events[i].res == aio_task->len) {
			spdk_bdev_io_complete(spdk_bdev_io_from_ctx(aio_task), SPDK_BDEV_IO_STATUS_SUCCESS);
		} else {
			/* From aio_abi.h, io_event.res is defined __s64, negative errno
			 * will be assigned to io_event.res for error situation.
			 * But from libaio.h, io_event.res is defined unsigned long, so
			 * convert it to signed value for error detection.
			 */
			res = (int)events[i].res;
			if (res < 0) {
				if (res == -EAGAIN) {
					/* NOMEM lets the bdev layer queue and retry the I/O. */
					spdk_bdev_io_complete(spdk_bdev_io_from_ctx(aio_task), SPDK_BDEV_IO_STATUS_NOMEM);
				} else {
					SPDK_ERRLOG("failed to complete aio: rc %"PRId64"\n", events[i].res);
					spdk_bdev_io_complete_aio_status(spdk_bdev_io_from_ctx(aio_task), res);
				}
			} else {
				/* Short transfer with no errno: report a generic failure. */
				SPDK_ERRLOG("failed to complete aio: rc %"PRId64"\n", events[i].res);
				spdk_bdev_io_complete(spdk_bdev_io_from_ctx(aio_task), SPDK_BDEV_IO_STATUS_FAILED);
			}
		}
	}

	return nr;
}
489 #endif
490 
491 static int
492 bdev_aio_group_poll(void *arg)
493 {
494 	struct bdev_aio_group_channel *group_ch = arg;
495 	struct bdev_aio_io_channel *io_ch;
496 	int nr = 0;
497 
498 	TAILQ_FOREACH(io_ch, &group_ch->io_ch_head, link) {
499 		nr += bdev_aio_io_channel_poll(io_ch);
500 	}
501 
502 	return nr > 0 ? SPDK_POLLER_BUSY : SPDK_POLLER_IDLE;
503 }
504 
/* Interrupt-mode handler for the group's eventfd.  Drains the completion
 * counter and polls all attached channels.  If more events were signalled
 * than one poll pass can reap, re-arm the eventfd with the remainder so the
 * handler fires again.
 */
static int
bdev_aio_group_interrupt(void *arg)
{
	struct bdev_aio_group_channel *group_ch = arg;
	int rc;
	uint64_t num_events;

	assert(group_ch->efd >= 0);

	/* if completed IO number is larger than SPDK_AIO_QUEUE_DEPTH,
	 * io_getevent should be called again to ensure all completed IO are processed.
	 */
	rc = read(group_ch->efd, &num_events, sizeof(num_events));
	if (rc < 0) {
		SPDK_ERRLOG("failed to acknowledge aio group: %s.\n", spdk_strerror(errno));
		return -errno;
	}

	if (num_events > SPDK_AIO_QUEUE_DEPTH) {
		num_events -= SPDK_AIO_QUEUE_DEPTH;
		/* Writing adds to the eventfd counter, re-triggering this handler. */
		rc = write(group_ch->efd, &num_events, sizeof(num_events));
		if (rc < 0) {
			SPDK_ERRLOG("failed to notify aio group: %s.\n", spdk_strerror(errno));
		}
	}

	return bdev_aio_group_poll(group_ch);
}
533 
534 static void
535 _bdev_aio_get_io_inflight(struct spdk_io_channel_iter *i)
536 {
537 	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
538 	struct bdev_aio_io_channel *aio_ch = spdk_io_channel_get_ctx(ch);
539 
540 	if (aio_ch->io_inflight) {
541 		spdk_for_each_channel_continue(i, -1);
542 		return;
543 	}
544 
545 	spdk_for_each_channel_continue(i, 0);
546 }
547 
548 static int bdev_aio_reset_retry_timer(void *arg);
549 
550 static void
551 _bdev_aio_get_io_inflight_done(struct spdk_io_channel_iter *i, int status)
552 {
553 	struct file_disk *fdisk = spdk_io_channel_iter_get_ctx(i);
554 
555 	if (status == -1) {
556 		fdisk->reset_retry_timer = SPDK_POLLER_REGISTER(bdev_aio_reset_retry_timer, fdisk, 500);
557 		return;
558 	}
559 
560 	spdk_bdev_io_complete(spdk_bdev_io_from_ctx(fdisk->reset_task), SPDK_BDEV_IO_STATUS_SUCCESS);
561 }
562 
563 static int
564 bdev_aio_reset_retry_timer(void *arg)
565 {
566 	struct file_disk *fdisk = arg;
567 
568 	if (fdisk->reset_retry_timer) {
569 		spdk_poller_unregister(&fdisk->reset_retry_timer);
570 	}
571 
572 	spdk_for_each_channel(fdisk,
573 			      _bdev_aio_get_io_inflight,
574 			      fdisk,
575 			      _bdev_aio_get_io_inflight_done);
576 
577 	return SPDK_POLLER_BUSY;
578 }
579 
580 static void
581 bdev_aio_reset(struct file_disk *fdisk, struct bdev_aio_task *aio_task)
582 {
583 	fdisk->reset_task = aio_task;
584 
585 	bdev_aio_reset_retry_timer(fdisk);
586 }
587 
588 static void
589 bdev_aio_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io,
590 		    bool success)
591 {
592 	if (!success) {
593 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
594 		return;
595 	}
596 
597 	switch (bdev_io->type) {
598 	case SPDK_BDEV_IO_TYPE_READ:
599 	case SPDK_BDEV_IO_TYPE_WRITE:
600 		bdev_aio_rw(bdev_io->type,
601 			    (struct file_disk *)bdev_io->bdev->ctxt,
602 			    ch,
603 			    (struct bdev_aio_task *)bdev_io->driver_ctx,
604 			    bdev_io->u.bdev.iovs,
605 			    bdev_io->u.bdev.iovcnt,
606 			    bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen,
607 			    bdev_io->u.bdev.offset_blocks * bdev_io->bdev->blocklen);
608 		break;
609 	default:
610 		SPDK_ERRLOG("Wrong io type\n");
611 		break;
612 	}
613 }
614 
/* Dispatch one bdev_io to the matching backend path.  Returns 0 when the
 * request was accepted (completion is reported asynchronously or inline by
 * the handler), negative when the I/O type is unsupported.
 */
static int
_bdev_aio_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
{
	struct file_disk *fdisk = (struct file_disk *)bdev_io->bdev->ctxt;

	switch (bdev_io->type) {
	/* Read and write operations must be performed on buffers aligned to
	 * bdev->required_alignment. If user specified unaligned buffers,
	 * get the aligned buffer from the pool by calling spdk_bdev_io_get_buf. */
	case SPDK_BDEV_IO_TYPE_READ:
		spdk_bdev_io_get_buf(bdev_io, bdev_aio_get_buf_cb,
				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
		return 0;
	case SPDK_BDEV_IO_TYPE_WRITE:
		/* Writes to a read-only bdev fail immediately. */
		if (fdisk->readonly) {
			spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
		} else {
			spdk_bdev_io_get_buf(bdev_io, bdev_aio_get_buf_cb,
					     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
		}
		return 0;

	case SPDK_BDEV_IO_TYPE_FLUSH:
		bdev_aio_flush((struct file_disk *)bdev_io->bdev->ctxt,
			       (struct bdev_aio_task *)bdev_io->driver_ctx);
		return 0;

	case SPDK_BDEV_IO_TYPE_RESET:
		bdev_aio_reset((struct file_disk *)bdev_io->bdev->ctxt,
			       (struct bdev_aio_task *)bdev_io->driver_ctx);
		return 0;

#ifndef __FreeBSD__
	case SPDK_BDEV_IO_TYPE_UNMAP:
		bdev_aio_unmap(bdev_io);
		return 0;

	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
		bdev_aio_write_zeros(bdev_io);
		return 0;
#endif

	default:
		return -1;
	}
}
661 
662 static void
663 bdev_aio_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
664 {
665 	if (_bdev_aio_submit_request(ch, bdev_io) < 0) {
666 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
667 	}
668 }
669 
670 static bool
671 bdev_aio_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type)
672 {
673 	struct file_disk *fdisk = ctx;
674 
675 	switch (io_type) {
676 	case SPDK_BDEV_IO_TYPE_READ:
677 	case SPDK_BDEV_IO_TYPE_WRITE:
678 	case SPDK_BDEV_IO_TYPE_FLUSH:
679 	case SPDK_BDEV_IO_TYPE_RESET:
680 		return true;
681 
682 	case SPDK_BDEV_IO_TYPE_UNMAP:
683 	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
684 		return fdisk->fallocate;
685 
686 	default:
687 		return false;
688 	}
689 }
690 
691 #ifdef __FreeBSD__
/* Create the per-channel kqueue used to reap aio completion events.
 * Returns 0 on success, -1 on failure.
 */
static int
bdev_aio_create_io(struct bdev_aio_io_channel *ch)
{
	ch->kqfd = kqueue();
	if (ch->kqfd < 0) {
		SPDK_ERRLOG("async I/O context setup failure: %s.\n", spdk_strerror(errno));
		return -1;
	}

	return 0;
}
703 
/* Release the per-channel kqueue created by bdev_aio_create_io(). */
static void
bdev_aio_destroy_io(struct bdev_aio_io_channel *ch)
{
	close(ch->kqfd);
}
709 #else
710 static int
711 bdev_aio_create_io(struct bdev_aio_io_channel *ch)
712 {
713 	if (io_setup(SPDK_AIO_QUEUE_DEPTH, &ch->io_ctx) < 0) {
714 		SPDK_ERRLOG("Async I/O context setup failure, likely due to exceeding kernel limit.\n");
715 		SPDK_ERRLOG("This limit may be increased using 'sysctl -w fs.aio-max-nr'.\n");
716 		return -1;
717 	}
718 
719 	return 0;
720 }
721 
/* Release the per-channel libaio context created by bdev_aio_create_io(). */
static void
bdev_aio_destroy_io(struct bdev_aio_io_channel *ch)
{
	io_destroy(ch->io_ctx);
}
727 #endif
728 
729 static int
730 bdev_aio_create_cb(void *io_device, void *ctx_buf)
731 {
732 	struct bdev_aio_io_channel *ch = ctx_buf;
733 	int rc;
734 
735 	rc = bdev_aio_create_io(ch);
736 	if (rc < 0) {
737 		return rc;
738 	}
739 
740 	ch->group_ch = spdk_io_channel_get_ctx(spdk_get_io_channel(&aio_if));
741 	TAILQ_INSERT_TAIL(&ch->group_ch->io_ch_head, ch, link);
742 
743 	return 0;
744 }
745 
746 static void
747 bdev_aio_destroy_cb(void *io_device, void *ctx_buf)
748 {
749 	struct bdev_aio_io_channel *ch = ctx_buf;
750 
751 	bdev_aio_destroy_io(ch);
752 
753 	assert(ch->group_ch);
754 	TAILQ_REMOVE(&ch->group_ch->io_ch_head, ch, link);
755 
756 	spdk_put_io_channel(spdk_io_channel_from_ctx(ch->group_ch));
757 }
758 
/* Return the calling thread's io channel for this disk (the io device). */
static struct spdk_io_channel *
bdev_aio_get_io_channel(void *ctx)
{
	struct file_disk *fdisk = ctx;

	return spdk_get_io_channel(fdisk);
}
766 
767 
/* Emit this bdev's module-specific attributes under an "aio" object (used by
 * bdev_get_bdevs).  Always returns 0.
 */
static int
bdev_aio_dump_info_json(void *ctx, struct spdk_json_write_ctx *w)
{
	struct file_disk *fdisk = ctx;

	spdk_json_write_named_object_begin(w, "aio");

	spdk_json_write_named_string(w, "filename", fdisk->filename);

	spdk_json_write_named_bool(w, "block_size_override", fdisk->block_size_override);

	spdk_json_write_named_bool(w, "readonly", fdisk->readonly);

	spdk_json_write_named_bool(w, "fallocate", fdisk->fallocate);

	spdk_json_write_object_end(w);

	return 0;
}
787 
/* Emit a "bdev_aio_create" RPC object that recreates this bdev on config
 * replay.  block_size is only written when the user originally overrode the
 * auto-detected value; the uuid is only written when non-null.
 */
static void
bdev_aio_write_json_config(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
{
	struct file_disk *fdisk = bdev->ctxt;
	const struct spdk_uuid *uuid = spdk_bdev_get_uuid(bdev);

	spdk_json_write_object_begin(w);

	spdk_json_write_named_string(w, "method", "bdev_aio_create");

	spdk_json_write_named_object_begin(w, "params");
	spdk_json_write_named_string(w, "name", bdev->name);
	if (fdisk->block_size_override) {
		spdk_json_write_named_uint32(w, "block_size", bdev->blocklen);
	}
	spdk_json_write_named_string(w, "filename", fdisk->filename);
	spdk_json_write_named_bool(w, "readonly", fdisk->readonly);
	spdk_json_write_named_bool(w, "fallocate", fdisk->fallocate);
	if (!spdk_uuid_is_null(uuid)) {
		spdk_json_write_named_uuid(w, "uuid", uuid);
	}
	spdk_json_write_object_end(w);

	spdk_json_write_object_end(w);
}
813 
/* Function table hooking this module's bdevs into the generic bdev layer. */
static const struct spdk_bdev_fn_table aio_fn_table = {
	.destruct		= bdev_aio_destruct,
	.submit_request		= bdev_aio_submit_request,
	.io_type_supported	= bdev_aio_io_type_supported,
	.get_io_channel		= bdev_aio_get_io_channel,
	.dump_info_json		= bdev_aio_dump_info_json,
	.write_config_json	= bdev_aio_write_json_config,
};
822 
823 static void
824 aio_free_disk(struct file_disk *fdisk)
825 {
826 	if (fdisk == NULL) {
827 		return;
828 	}
829 	free(fdisk->filename);
830 	free(fdisk->disk.name);
831 	free(fdisk);
832 }
833 
834 static int
835 bdev_aio_register_interrupt(struct bdev_aio_group_channel *ch)
836 {
837 	int efd;
838 
839 	efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
840 	if (efd < 0) {
841 		return -1;
842 	}
843 
844 	ch->intr = SPDK_INTERRUPT_REGISTER(efd, bdev_aio_group_interrupt, ch);
845 	if (ch->intr == NULL) {
846 		close(efd);
847 		return -1;
848 	}
849 	ch->efd = efd;
850 
851 	return 0;
852 }
853 
/* Tear down interrupt-mode resources: unregister the handler, close the
 * eventfd, and mark efd invalid again.
 */
static void
bdev_aio_unregister_interrupt(struct bdev_aio_group_channel *ch)
{
	spdk_interrupt_unregister(&ch->intr);
	close(ch->efd);
	ch->efd = -1;
}
861 
/* Per-thread group channel setup: start the completion poller and, in
 * interrupt mode, wire up an eventfd-based interrupt handler.
 */
static int
bdev_aio_group_create_cb(void *io_device, void *ctx_buf)
{
	struct bdev_aio_group_channel *ch = ctx_buf;
	int rc;

	TAILQ_INIT(&ch->io_ch_head);
	/* Initialize ch->efd to be invalid and unused. */
	ch->efd = -1;
	if (spdk_interrupt_mode_is_enabled()) {
		rc = bdev_aio_register_interrupt(ch);
		if (rc < 0) {
			SPDK_ERRLOG("Failed to prepare intr resource to bdev_aio\n");
			return rc;
		}
	}

	ch->poller = SPDK_POLLER_REGISTER(bdev_aio_group_poll, ch, 0);
	spdk_poller_register_interrupt(ch->poller, NULL, NULL);

	return 0;
}
884 
/* Per-thread group channel teardown: stop the poller and release interrupt
 * resources.  All io channels should already have detached themselves.
 */
static void
bdev_aio_group_destroy_cb(void *io_device, void *ctx_buf)
{
	struct bdev_aio_group_channel *ch = ctx_buf;

	if (!TAILQ_EMPTY(&ch->io_ch_head)) {
		SPDK_ERRLOG("Group channel of bdev aio has uncleared io channel\n");
	}

	spdk_poller_unregister(&ch->poller);
	if (spdk_interrupt_mode_is_enabled()) {
		bdev_aio_unregister_interrupt(ch);
	}
}
899 
/* Create and register an AIO bdev backed by 'filename'.  A block_size of 0
 * means use the auto-detected size.  Returns 0 on success or a negative
 * errno; on failure all partially created state is released.
 */
int
create_aio_bdev(const char *name, const char *filename, uint32_t block_size, bool readonly,
		bool fallocate, const struct spdk_uuid *uuid)
{
	struct file_disk *fdisk;
	uint32_t detected_block_size;
	uint64_t disk_size;
	int rc;

#ifdef __FreeBSD__
	/* UNMAP/WRITE_ZEROES are backed by Linux fallocate(); reject the option here. */
	if (fallocate) {
		SPDK_ERRLOG("Unable to support fallocate on this platform\n");
		return -ENOTSUP;
	}
#endif

	fdisk = calloc(1, sizeof(*fdisk));
	if (!fdisk) {
		SPDK_ERRLOG("Unable to allocate enough memory for aio backend\n");
		return -ENOMEM;
	}
	fdisk->readonly = readonly;
	fdisk->fallocate = fallocate;

	fdisk->filename = strdup(filename);
	if (!fdisk->filename) {
		rc = -ENOMEM;
		goto error_return;
	}

	if (bdev_aio_open(fdisk)) {
		SPDK_ERRLOG("Unable to open file %s. fd: %d errno: %d\n", filename, fdisk->fd, errno);
		rc = -errno;
		goto error_return;
	}

	disk_size = spdk_fd_get_size(fdisk->fd);

	fdisk->disk.name = strdup(name);
	if (!fdisk->disk.name) {
		rc = -ENOMEM;
		goto error_return;
	}
	fdisk->disk.product_name = "AIO disk";
	fdisk->disk.module = &aio_if;

	fdisk->disk.write_cache = 1;

	detected_block_size = spdk_fd_get_blocklen(fdisk->fd);
	if (block_size == 0) {
		/* User did not specify block size - use autodetected block size. */
		if (detected_block_size == 0) {
			SPDK_ERRLOG("Block size could not be auto-detected\n");
			rc = -EINVAL;
			goto error_return;
		}
		fdisk->block_size_override = false;
		block_size = detected_block_size;
	} else {
		if (block_size < detected_block_size) {
			SPDK_ERRLOG("Specified block size %" PRIu32 " is smaller than "
				    "auto-detected block size %" PRIu32 "\n",
				    block_size, detected_block_size);
			rc = -EINVAL;
			goto error_return;
		} else if (detected_block_size != 0 && block_size != detected_block_size) {
			SPDK_WARNLOG("Specified block size %" PRIu32 " does not match "
				     "auto-detected block size %" PRIu32 "\n",
				     block_size, detected_block_size);
		}
		fdisk->block_size_override = true;
	}

	if (block_size < 512) {
		SPDK_ERRLOG("Invalid block size %" PRIu32 " (must be at least 512).\n", block_size);
		rc = -EINVAL;
		goto error_return;
	}

	if (!spdk_u32_is_pow2(block_size)) {
		SPDK_ERRLOG("Invalid block size %" PRIu32 " (must be a power of 2.)\n", block_size);
		rc = -EINVAL;
		goto error_return;
	}

	fdisk->disk.blocklen = block_size;
	/* Buffers must stay aligned to the detected (physical) block size even
	 * when the logical block size was overridden to a larger value. */
	if (fdisk->block_size_override && detected_block_size) {
		fdisk->disk.required_alignment = spdk_u32log2(detected_block_size);
	} else {
		fdisk->disk.required_alignment = spdk_u32log2(block_size);
	}

	if (disk_size % fdisk->disk.blocklen != 0) {
		SPDK_ERRLOG("Disk size %" PRIu64 " is not a multiple of block size %" PRIu32 "\n",
			    disk_size, fdisk->disk.blocklen);
		rc = -EINVAL;
		goto error_return;
	}

	fdisk->disk.blockcnt = disk_size / fdisk->disk.blocklen;
	fdisk->disk.ctxt = fdisk;
	spdk_uuid_copy(&fdisk->disk.uuid, uuid);

	fdisk->disk.fn_table = &aio_fn_table;

	spdk_io_device_register(fdisk, bdev_aio_create_cb, bdev_aio_destroy_cb,
				sizeof(struct bdev_aio_io_channel),
				fdisk->disk.name);
	rc = spdk_bdev_register(&fdisk->disk);
	if (rc) {
		spdk_io_device_unregister(fdisk, NULL);
		goto error_return;
	}

	TAILQ_INSERT_TAIL(&g_aio_disk_head, fdisk, link);
	return 0;

error_return:
	bdev_aio_close(fdisk);
	aio_free_disk(fdisk);
	return rc;
}
1022 
/* No-op event callback: bdev_aio_rescan opens the bdev only to query it, so
 * bdev events need no handling here.
 */
static void
dummy_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev, void *ctx)
{
}
1027 
/* Re-read the backing file's size and, if it changed, notify the bdev layer
 * of the new block count.  Returns 0 on success or a negative errno.
 */
int
bdev_aio_rescan(const char *name)
{
	struct spdk_bdev_desc *desc;
	struct spdk_bdev *bdev;
	struct file_disk *fdisk;
	uint64_t disk_size, blockcnt;
	int rc;

	rc = spdk_bdev_open_ext(name, false, dummy_bdev_event_cb, NULL, &desc);
	if (rc != 0) {
		return rc;
	}

	bdev = spdk_bdev_desc_get_bdev(desc);
	/* Only bdevs owned by this module can be rescanned here. */
	if (bdev->module != &aio_if) {
		rc = -ENODEV;
		goto exit;
	}

	fdisk = SPDK_CONTAINEROF(bdev, struct file_disk, disk);
	disk_size = spdk_fd_get_size(fdisk->fd);
	blockcnt = disk_size / bdev->blocklen;

	if (bdev->blockcnt != blockcnt) {
		SPDK_NOTICELOG("AIO device is resized: bdev name %s, old block count %" PRIu64 ", new block count %"
			       PRIu64 "\n",
			       fdisk->filename,
			       bdev->blockcnt,
			       blockcnt);
		rc = spdk_bdev_notify_blockcnt_change(bdev, blockcnt);
		if (rc != 0) {
			SPDK_ERRLOG("Could not change num blocks for aio bdev: name %s, errno: %d.\n",
				    fdisk->filename, rc);
			goto exit;
		}
	}

exit:
	spdk_bdev_close(desc);
	return rc;
}
1070 
/* Context carrying the user's completion callback across the asynchronous
 * bdev unregister in bdev_aio_delete().
 */
struct delete_aio_bdev_ctx {
	delete_aio_bdev_complete cb_fn;
	void *cb_arg;
};
1075 
1076 static void
1077 aio_bdev_unregister_cb(void *arg, int bdeverrno)
1078 {
1079 	struct delete_aio_bdev_ctx *ctx = arg;
1080 
1081 	ctx->cb_fn(ctx->cb_arg, bdeverrno);
1082 	free(ctx);
1083 }
1084 
1085 void
1086 bdev_aio_delete(const char *name, delete_aio_bdev_complete cb_fn, void *cb_arg)
1087 {
1088 	struct delete_aio_bdev_ctx *ctx;
1089 	int rc;
1090 
1091 	ctx = calloc(1, sizeof(*ctx));
1092 	if (ctx == NULL) {
1093 		cb_fn(cb_arg, -ENOMEM);
1094 		return;
1095 	}
1096 
1097 	ctx->cb_fn = cb_fn;
1098 	ctx->cb_arg = cb_arg;
1099 	rc = spdk_bdev_unregister_by_name(name, &aio_if, aio_bdev_unregister_cb, ctx);
1100 	if (rc != 0) {
1101 		aio_bdev_unregister_cb(ctx, rc);
1102 	}
1103 }
1104 
/* Module init: register the io device that backs per-thread group channels. */
static int
bdev_aio_initialize(void)
{
	spdk_io_device_register(&aio_if, bdev_aio_group_create_cb, bdev_aio_group_destroy_cb,
				sizeof(struct bdev_aio_group_channel), "aio_module");

	return 0;
}
1113 
/* Module teardown: unregister the group-channel io device. */
static void
bdev_aio_fini(void)
{
	spdk_io_device_unregister(&aio_if, NULL);
}
1119 
1120 SPDK_LOG_REGISTER_COMPONENT(aio)
1121