xref: /spdk/module/bdev/aio/bdev_aio.c (revision cc6920a4763d4b9a43aa40583c8397d8f14fa100)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "bdev_aio.h"
35 
36 #include "spdk/stdinc.h"
37 
38 #include "spdk/barrier.h"
39 #include "spdk/bdev.h"
40 #include "spdk/bdev_module.h"
41 #include "spdk/env.h"
42 #include "spdk/fd.h"
43 #include "spdk/likely.h"
44 #include "spdk/thread.h"
45 #include "spdk/json.h"
46 #include "spdk/util.h"
47 #include "spdk/string.h"
48 
49 #include "spdk/log.h"
50 
51 #include <sys/eventfd.h>
52 #include <libaio.h>
53 
/* Per-SPDK-thread I/O channel: owns one kernel AIO context and tracks the
 * I/Os submitted through it that have not yet been reaped.
 */
struct bdev_aio_io_channel {
	uint64_t				io_inflight;	/* iocbs submitted but not yet completed */
	io_context_t				io_ctx;		/* kernel AIO context for this channel */
	struct bdev_aio_group_channel		*group_ch;	/* group channel whose poller reaps us */
	TAILQ_ENTRY(bdev_aio_io_channel)	link;		/* entry in group_ch->io_ch_head */
};
60 
/* Per-thread group channel shared by all aio bdev channels on that thread.
 * Owns the poller (polled mode) or eventfd + interrupt (interrupt mode).
 */
struct bdev_aio_group_channel {
	/* eventfd for io completion notification in interrupt mode.
	 * Negative value like '-1' indicates it is invalid or unused.
	 */
	int					efd;
	struct spdk_interrupt			*intr;		/* interrupt handler registered on efd */
	struct spdk_poller			*poller;	/* completion-reaping poller */
	TAILQ_HEAD(, bdev_aio_io_channel)	io_ch_head;	/* io channels served by this group */
};
70 
/* Per-I/O driver context, carved out of each spdk_bdev_io's driver_ctx. */
struct bdev_aio_task {
	struct iocb			iocb;	/* libaio control block for this I/O */
	uint64_t			len;	/* expected transfer length in bytes */
	struct bdev_aio_io_channel	*ch;	/* channel the I/O was submitted on */
};
76 
/* One aio-backed bdev: wraps a file (or block device) opened via open(2). */
struct file_disk {
	struct bdev_aio_task	*reset_task;		/* in-progress reset, if any */
	struct spdk_poller	*reset_retry_timer;	/* retry timer while I/O is draining */
	struct spdk_bdev	disk;			/* generic bdev, embedded */
	char			*filename;		/* owned copy of the backing file path */
	int			fd;			/* open fd, or -1 when closed */
	TAILQ_ENTRY(file_disk)  link;			/* entry in g_aio_disk_head */
	bool			block_size_override;	/* true if user supplied block_size */
};
86 
/* For user space reaping of completions.
 * Mirrors the header of the kernel's AIO completion ring that io_setup()
 * maps into the process; io_context_t points at this structure.
 */
struct spdk_aio_ring {
	uint32_t id;		/* kernel-internal ring id */
	uint32_t size;		/* number of io_event slots in the ring */
	uint32_t head;		/* consumer index (we advance this) */
	uint32_t tail;		/* producer index (kernel advances this) */

	uint32_t version;	/* must equal SPDK_AIO_RING_VERSION to reap in userspace */
	uint32_t compat_features;
	uint32_t incompat_features;	/* non-zero means layout we don't understand */
	uint32_t header_length;		/* byte offset from ring start to the event array */
};
99 
/* Magic value identifying a kernel aio_ring layout we can reap from userspace. */
#define SPDK_AIO_RING_VERSION	0xa10a10a1

static int bdev_aio_initialize(void);
static void bdev_aio_fini(void);
static void aio_free_disk(struct file_disk *fdisk);
/* All aio bdevs created by this module; guarded by running on the app thread. */
static TAILQ_HEAD(, file_disk) g_aio_disk_head = TAILQ_HEAD_INITIALIZER(g_aio_disk_head);

/* Max outstanding iocbs per io channel (io_setup queue depth). */
#define SPDK_AIO_QUEUE_DEPTH 128
#define MAX_EVENTS_PER_POLL 32
109 
110 static int
111 bdev_aio_get_ctx_size(void)
112 {
113 	return sizeof(struct bdev_aio_task);
114 }
115 
/* Module descriptor registered with the generic bdev layer. */
static struct spdk_bdev_module aio_if = {
	.name		= "aio",
	.module_init	= bdev_aio_initialize,
	.module_fini	= bdev_aio_fini,
	.get_ctx_size	= bdev_aio_get_ctx_size,
};

SPDK_BDEV_MODULE_REGISTER(aio, &aio_if)
124 
125 static int
126 bdev_aio_open(struct file_disk *disk)
127 {
128 	int fd;
129 
130 	fd = open(disk->filename, O_RDWR | O_DIRECT);
131 	if (fd < 0) {
132 		/* Try without O_DIRECT for non-disk files */
133 		fd = open(disk->filename, O_RDWR);
134 		if (fd < 0) {
135 			SPDK_ERRLOG("open() failed (file:%s), errno %d: %s\n",
136 				    disk->filename, errno, spdk_strerror(errno));
137 			disk->fd = -1;
138 			return -1;
139 		}
140 	}
141 
142 	disk->fd = fd;
143 
144 	return 0;
145 }
146 
147 static int
148 bdev_aio_close(struct file_disk *disk)
149 {
150 	int rc;
151 
152 	if (disk->fd == -1) {
153 		return 0;
154 	}
155 
156 	rc = close(disk->fd);
157 	if (rc < 0) {
158 		SPDK_ERRLOG("close() failed (fd=%d), errno %d: %s\n",
159 			    disk->fd, errno, spdk_strerror(errno));
160 		return -1;
161 	}
162 
163 	disk->fd = -1;
164 
165 	return 0;
166 }
167 
168 static void
169 bdev_aio_readv(struct file_disk *fdisk, struct spdk_io_channel *ch,
170 	       struct bdev_aio_task *aio_task,
171 	       struct iovec *iov, int iovcnt, uint64_t nbytes, uint64_t offset)
172 {
173 	struct iocb *iocb = &aio_task->iocb;
174 	struct bdev_aio_io_channel *aio_ch = spdk_io_channel_get_ctx(ch);
175 	int rc;
176 
177 	io_prep_preadv(iocb, fdisk->fd, iov, iovcnt, offset);
178 	if (aio_ch->group_ch->efd >= 0) {
179 		io_set_eventfd(iocb, aio_ch->group_ch->efd);
180 	}
181 	iocb->data = aio_task;
182 	aio_task->len = nbytes;
183 	aio_task->ch = aio_ch;
184 
185 	SPDK_DEBUGLOG(aio, "read %d iovs size %lu to off: %#lx\n",
186 		      iovcnt, nbytes, offset);
187 
188 	rc = io_submit(aio_ch->io_ctx, 1, &iocb);
189 	if (spdk_unlikely(rc < 0)) {
190 		if (rc == -EAGAIN) {
191 			spdk_bdev_io_complete(spdk_bdev_io_from_ctx(aio_task), SPDK_BDEV_IO_STATUS_NOMEM);
192 		} else {
193 			spdk_bdev_io_complete_aio_status(spdk_bdev_io_from_ctx(aio_task), rc);
194 			SPDK_ERRLOG("%s: io_submit returned %d\n", __func__, rc);
195 		}
196 	} else {
197 		aio_ch->io_inflight++;
198 	}
199 }
200 
201 static void
202 bdev_aio_writev(struct file_disk *fdisk, struct spdk_io_channel *ch,
203 		struct bdev_aio_task *aio_task,
204 		struct iovec *iov, int iovcnt, size_t len, uint64_t offset)
205 {
206 	struct iocb *iocb = &aio_task->iocb;
207 	struct bdev_aio_io_channel *aio_ch = spdk_io_channel_get_ctx(ch);
208 	int rc;
209 
210 	io_prep_pwritev(iocb, fdisk->fd, iov, iovcnt, offset);
211 	if (aio_ch->group_ch->efd >= 0) {
212 		io_set_eventfd(iocb, aio_ch->group_ch->efd);
213 	}
214 	iocb->data = aio_task;
215 	aio_task->len = len;
216 	aio_task->ch = aio_ch;
217 
218 	SPDK_DEBUGLOG(aio, "write %d iovs size %lu from off: %#lx\n",
219 		      iovcnt, len, offset);
220 
221 	rc = io_submit(aio_ch->io_ctx, 1, &iocb);
222 	if (spdk_unlikely(rc < 0)) {
223 		if (rc == -EAGAIN) {
224 			spdk_bdev_io_complete(spdk_bdev_io_from_ctx(aio_task), SPDK_BDEV_IO_STATUS_NOMEM);
225 		} else {
226 			spdk_bdev_io_complete_aio_status(spdk_bdev_io_from_ctx(aio_task), rc);
227 			SPDK_ERRLOG("%s: io_submit returned %d\n", __func__, rc);
228 		}
229 	} else {
230 		aio_ch->io_inflight++;
231 	}
232 }
233 
234 static void
235 bdev_aio_flush(struct file_disk *fdisk, struct bdev_aio_task *aio_task)
236 {
237 	int rc = fsync(fdisk->fd);
238 
239 	if (rc == 0) {
240 		spdk_bdev_io_complete(spdk_bdev_io_from_ctx(aio_task), SPDK_BDEV_IO_STATUS_SUCCESS);
241 	} else {
242 		spdk_bdev_io_complete_aio_status(spdk_bdev_io_from_ctx(aio_task), -errno);
243 	}
244 }
245 
246 static void
247 bdev_aio_destruct_cb(void *io_device)
248 {
249 	struct file_disk *fdisk = io_device;
250 	int rc = 0;
251 
252 	TAILQ_REMOVE(&g_aio_disk_head, fdisk, link);
253 	rc = bdev_aio_close(fdisk);
254 	if (rc < 0) {
255 		SPDK_ERRLOG("bdev_aio_close() failed\n");
256 	}
257 
258 	aio_free_disk(fdisk);
259 }
260 
261 static int
262 bdev_aio_destruct(void *ctx)
263 {
264 	struct file_disk *fdisk = ctx;
265 
266 	spdk_io_device_unregister(fdisk, bdev_aio_destruct_cb);
267 
268 	return 0;
269 }
270 
/* Reap completion events directly from the kernel AIO completion ring mapped
 * into userspace, avoiding an io_getevents(2) syscall. Falls back to a
 * non-blocking io_getevents() when the ring version or features are not
 * recognized. Returns the number of events copied into uevents (or the
 * io_getevents return value on the fallback path).
 */
static int
bdev_user_io_getevents(io_context_t io_ctx, unsigned int max, struct io_event *uevents)
{
	uint32_t head, tail, count;
	struct spdk_aio_ring *ring;
	struct timespec timeout;
	struct io_event *kevents;

	/* io_context_t is a pointer to the kernel's mapped ring header. */
	ring = (struct spdk_aio_ring *)io_ctx;

	if (spdk_unlikely(ring->version != SPDK_AIO_RING_VERSION || ring->incompat_features != 0)) {
		timeout.tv_sec = 0;
		timeout.tv_nsec = 0;

		return io_getevents(io_ctx, 0, max, uevents, &timeout);
	}

	/* Read the current state out of the ring */
	head = ring->head;
	tail = ring->tail;

	/* This memory barrier is required to prevent the loads above
	 * from being re-ordered with stores to the events array
	 * potentially occurring on other threads. */
	spdk_smp_rmb();

	/* Calculate how many items are in the circular ring */
	count = tail - head;
	if (tail < head) {
		count += ring->size;
	}

	/* Reduce the count to the limit provided by the user */
	count = spdk_min(max, count);

	/* Grab the memory location of the event array */
	kevents = (struct io_event *)((uintptr_t)ring + ring->header_length);

	/* Copy the events out of the ring. */
	if ((head + count) <= ring->size) {
		/* Only one copy is required */
		memcpy(uevents, &kevents[head], count * sizeof(struct io_event));
	} else {
		uint32_t first_part = ring->size - head;
		/* Two copies are required */
		memcpy(uevents, &kevents[head], first_part * sizeof(struct io_event));
		memcpy(&uevents[first_part], &kevents[0], (count - first_part) * sizeof(struct io_event));
	}

	/* Update the head pointer. On x86, stores will not be reordered with older loads,
	 * so the copies out of the event array will always be complete prior to this
	 * update becoming visible. On other architectures this is not guaranteed, so
	 * add a barrier. */
#if defined(__i386__) || defined(__x86_64__)
	spdk_compiler_barrier();
#else
	spdk_smp_mb();
#endif
	ring->head = (head + count) % ring->size;

	return count;
}
333 
/* Reap up to SPDK_AIO_QUEUE_DEPTH completions from one io channel and
 * complete the corresponding bdev I/Os. Returns the number of events reaped.
 */
static int
bdev_aio_io_channel_poll(struct bdev_aio_io_channel *io_ch)
{
	int nr, i = 0;
	struct bdev_aio_task *aio_task;
	struct io_event events[SPDK_AIO_QUEUE_DEPTH];
	uint64_t io_result;

	nr = bdev_user_io_getevents(io_ch->io_ctx, SPDK_AIO_QUEUE_DEPTH, events);

	if (nr < 0) {
		return 0;
	}

	/* io_event.res holds either the number of bytes transferred or a
	 * positive errno; short results below this bound are treated as errno. */
#define MAX_AIO_ERRNO 256
	for (i = 0; i < nr; i++) {
		aio_task = events[i].data;
		aio_task->ch->io_inflight--;
		io_result = events[i].res;
		if (io_result == aio_task->len) {
			spdk_bdev_io_complete(spdk_bdev_io_from_ctx(aio_task), SPDK_BDEV_IO_STATUS_SUCCESS);
		} else if (io_result < MAX_AIO_ERRNO) {
			/* Linux AIO will return its errno to io_event.res */
			int aio_errno = io_result;

			spdk_bdev_io_complete_aio_status(spdk_bdev_io_from_ctx(aio_task), -aio_errno);
		} else {
			/* Partial transfer that is not a recognizable errno: fail the I/O. */
			SPDK_ERRLOG("failed to complete aio: requested len is %lu, but completed len is %lu.\n",
				    aio_task->len, io_result);
			spdk_bdev_io_complete(spdk_bdev_io_from_ctx(aio_task), SPDK_BDEV_IO_STATUS_FAILED);
		}
	}

	return nr;
}
369 
370 static int
371 bdev_aio_group_poll(void *arg)
372 {
373 	struct bdev_aio_group_channel *group_ch = arg;
374 	struct bdev_aio_io_channel *io_ch;
375 	int nr = 0;
376 
377 	TAILQ_FOREACH(io_ch, &group_ch->io_ch_head, link) {
378 		nr += bdev_aio_io_channel_poll(io_ch);
379 	}
380 
381 	return nr > 0 ? SPDK_POLLER_BUSY : SPDK_POLLER_IDLE;
382 }
383 
384 static int
385 bdev_aio_group_interrupt(void *arg)
386 {
387 	struct bdev_aio_group_channel *group_ch = arg;
388 	int rc;
389 	uint64_t num_events;
390 
391 	assert(group_ch->efd >= 0);
392 
393 	/* if completed IO number is larger than SPDK_AIO_QUEUE_DEPTH,
394 	 * io_getevent should be called again to ensure all completed IO are processed.
395 	 */
396 	rc = read(group_ch->efd, &num_events, sizeof(num_events));
397 	if (rc < 0) {
398 		SPDK_ERRLOG("failed to acknowledge aio group: %s.\n", spdk_strerror(errno));
399 		return -errno;
400 	}
401 
402 	if (num_events > SPDK_AIO_QUEUE_DEPTH) {
403 		num_events -= SPDK_AIO_QUEUE_DEPTH;
404 		rc = write(group_ch->efd, &num_events, sizeof(num_events));
405 		if (rc < 0) {
406 			SPDK_ERRLOG("failed to notify aio group: %s.\n", spdk_strerror(errno));
407 		}
408 	}
409 
410 	return bdev_aio_group_poll(group_ch);
411 }
412 
413 static void
414 _bdev_aio_get_io_inflight(struct spdk_io_channel_iter *i)
415 {
416 	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
417 	struct bdev_aio_io_channel *aio_ch = spdk_io_channel_get_ctx(ch);
418 
419 	if (aio_ch->io_inflight) {
420 		spdk_for_each_channel_continue(i, -1);
421 		return;
422 	}
423 
424 	spdk_for_each_channel_continue(i, 0);
425 }
426 
427 static int bdev_aio_reset_retry_timer(void *arg);
428 
429 static void
430 _bdev_aio_get_io_inflight_done(struct spdk_io_channel_iter *i, int status)
431 {
432 	struct file_disk *fdisk = spdk_io_channel_iter_get_ctx(i);
433 
434 	if (status == -1) {
435 		fdisk->reset_retry_timer = SPDK_POLLER_REGISTER(bdev_aio_reset_retry_timer, fdisk, 500);
436 		return;
437 	}
438 
439 	spdk_bdev_io_complete(spdk_bdev_io_from_ctx(fdisk->reset_task), SPDK_BDEV_IO_STATUS_SUCCESS);
440 }
441 
442 static int
443 bdev_aio_reset_retry_timer(void *arg)
444 {
445 	struct file_disk *fdisk = arg;
446 
447 	if (fdisk->reset_retry_timer) {
448 		spdk_poller_unregister(&fdisk->reset_retry_timer);
449 	}
450 
451 	spdk_for_each_channel(fdisk,
452 			      _bdev_aio_get_io_inflight,
453 			      fdisk,
454 			      _bdev_aio_get_io_inflight_done);
455 
456 	return SPDK_POLLER_BUSY;
457 }
458 
459 static void
460 bdev_aio_reset(struct file_disk *fdisk, struct bdev_aio_task *aio_task)
461 {
462 	fdisk->reset_task = aio_task;
463 
464 	bdev_aio_reset_retry_timer(fdisk);
465 }
466 
467 static void
468 bdev_aio_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io,
469 		    bool success)
470 {
471 	if (!success) {
472 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
473 		return;
474 	}
475 
476 	switch (bdev_io->type) {
477 	case SPDK_BDEV_IO_TYPE_READ:
478 		bdev_aio_readv((struct file_disk *)bdev_io->bdev->ctxt,
479 			       ch,
480 			       (struct bdev_aio_task *)bdev_io->driver_ctx,
481 			       bdev_io->u.bdev.iovs,
482 			       bdev_io->u.bdev.iovcnt,
483 			       bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen,
484 			       bdev_io->u.bdev.offset_blocks * bdev_io->bdev->blocklen);
485 		break;
486 	case SPDK_BDEV_IO_TYPE_WRITE:
487 		bdev_aio_writev((struct file_disk *)bdev_io->bdev->ctxt,
488 				ch,
489 				(struct bdev_aio_task *)bdev_io->driver_ctx,
490 				bdev_io->u.bdev.iovs,
491 				bdev_io->u.bdev.iovcnt,
492 				bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen,
493 				bdev_io->u.bdev.offset_blocks * bdev_io->bdev->blocklen);
494 		break;
495 	default:
496 		SPDK_ERRLOG("Wrong io type\n");
497 		break;
498 	}
499 }
500 
501 static int _bdev_aio_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
502 {
503 	switch (bdev_io->type) {
504 	/* Read and write operations must be performed on buffers aligned to
505 	 * bdev->required_alignment. If user specified unaligned buffers,
506 	 * get the aligned buffer from the pool by calling spdk_bdev_io_get_buf. */
507 	case SPDK_BDEV_IO_TYPE_READ:
508 	case SPDK_BDEV_IO_TYPE_WRITE:
509 		spdk_bdev_io_get_buf(bdev_io, bdev_aio_get_buf_cb,
510 				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
511 		return 0;
512 	case SPDK_BDEV_IO_TYPE_FLUSH:
513 		bdev_aio_flush((struct file_disk *)bdev_io->bdev->ctxt,
514 			       (struct bdev_aio_task *)bdev_io->driver_ctx);
515 		return 0;
516 
517 	case SPDK_BDEV_IO_TYPE_RESET:
518 		bdev_aio_reset((struct file_disk *)bdev_io->bdev->ctxt,
519 			       (struct bdev_aio_task *)bdev_io->driver_ctx);
520 		return 0;
521 	default:
522 		return -1;
523 	}
524 }
525 
526 static void bdev_aio_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
527 {
528 	if (_bdev_aio_submit_request(ch, bdev_io) < 0) {
529 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
530 	}
531 }
532 
533 static bool
534 bdev_aio_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type)
535 {
536 	switch (io_type) {
537 	case SPDK_BDEV_IO_TYPE_READ:
538 	case SPDK_BDEV_IO_TYPE_WRITE:
539 	case SPDK_BDEV_IO_TYPE_FLUSH:
540 	case SPDK_BDEV_IO_TYPE_RESET:
541 		return true;
542 
543 	default:
544 		return false;
545 	}
546 }
547 
548 static int
549 bdev_aio_create_cb(void *io_device, void *ctx_buf)
550 {
551 	struct bdev_aio_io_channel *ch = ctx_buf;
552 
553 	if (io_setup(SPDK_AIO_QUEUE_DEPTH, &ch->io_ctx) < 0) {
554 		SPDK_ERRLOG("async I/O context setup failure\n");
555 		return -1;
556 	}
557 
558 	ch->group_ch = spdk_io_channel_get_ctx(spdk_get_io_channel(&aio_if));
559 	TAILQ_INSERT_TAIL(&ch->group_ch->io_ch_head, ch, link);
560 
561 	return 0;
562 }
563 
564 static void
565 bdev_aio_destroy_cb(void *io_device, void *ctx_buf)
566 {
567 	struct bdev_aio_io_channel *ch = ctx_buf;
568 
569 	io_destroy(ch->io_ctx);
570 
571 	assert(ch->group_ch);
572 	TAILQ_REMOVE(&ch->group_ch->io_ch_head, ch, link);
573 
574 	spdk_put_io_channel(spdk_io_channel_from_ctx(ch->group_ch));
575 }
576 
/* bdev hook: hand out this disk's per-thread io channel. */
static struct spdk_io_channel *
bdev_aio_get_io_channel(void *ctx)
{
	return spdk_get_io_channel((struct file_disk *)ctx);
}
584 
585 
586 static int
587 bdev_aio_dump_info_json(void *ctx, struct spdk_json_write_ctx *w)
588 {
589 	struct file_disk *fdisk = ctx;
590 
591 	spdk_json_write_named_object_begin(w, "aio");
592 
593 	spdk_json_write_named_string(w, "filename", fdisk->filename);
594 
595 	spdk_json_write_object_end(w);
596 
597 	return 0;
598 }
599 
600 static void
601 bdev_aio_write_json_config(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
602 {
603 	struct file_disk *fdisk = bdev->ctxt;
604 
605 	spdk_json_write_object_begin(w);
606 
607 	spdk_json_write_named_string(w, "method", "bdev_aio_create");
608 
609 	spdk_json_write_named_object_begin(w, "params");
610 	spdk_json_write_named_string(w, "name", bdev->name);
611 	if (fdisk->block_size_override) {
612 		spdk_json_write_named_uint32(w, "block_size", bdev->blocklen);
613 	}
614 	spdk_json_write_named_string(w, "filename", fdisk->filename);
615 	spdk_json_write_object_end(w);
616 
617 	spdk_json_write_object_end(w);
618 }
619 
/* Function table hooking aio bdevs into the generic bdev layer. */
static const struct spdk_bdev_fn_table aio_fn_table = {
	.destruct		= bdev_aio_destruct,
	.submit_request		= bdev_aio_submit_request,
	.io_type_supported	= bdev_aio_io_type_supported,
	.get_io_channel		= bdev_aio_get_io_channel,
	.dump_info_json		= bdev_aio_dump_info_json,
	.write_config_json	= bdev_aio_write_json_config,
};
628 
629 static void aio_free_disk(struct file_disk *fdisk)
630 {
631 	if (fdisk == NULL) {
632 		return;
633 	}
634 	free(fdisk->filename);
635 	free(fdisk->disk.name);
636 	free(fdisk);
637 }
638 
639 static int
640 bdev_aio_register_interrupt(struct bdev_aio_group_channel *ch)
641 {
642 	int efd;
643 
644 	efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
645 	if (efd < 0) {
646 		return -1;
647 	}
648 
649 	ch->intr = SPDK_INTERRUPT_REGISTER(efd, bdev_aio_group_interrupt, ch);
650 	if (ch->intr == NULL) {
651 		close(efd);
652 		return -1;
653 	}
654 	ch->efd = efd;
655 
656 	return 0;
657 }
658 
659 static void
660 bdev_aio_unregister_interrupt(struct bdev_aio_group_channel *ch)
661 {
662 	spdk_interrupt_unregister(&ch->intr);
663 	close(ch->efd);
664 	ch->efd = -1;
665 }
666 
667 static void
668 bdev_aio_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode)
669 {
670 	return;
671 }
672 
673 static int
674 bdev_aio_group_create_cb(void *io_device, void *ctx_buf)
675 {
676 	struct bdev_aio_group_channel *ch = ctx_buf;
677 	int rc;
678 
679 	TAILQ_INIT(&ch->io_ch_head);
680 	/* Initialize ch->efd to be invalid and unused. */
681 	ch->efd = -1;
682 	if (spdk_interrupt_mode_is_enabled()) {
683 		rc = bdev_aio_register_interrupt(ch);
684 		if (rc < 0) {
685 			SPDK_ERRLOG("Failed to prepare intr resource to bdev_aio\n");
686 			return rc;
687 		}
688 	}
689 
690 	ch->poller = SPDK_POLLER_REGISTER(bdev_aio_group_poll, ch, 0);
691 	spdk_poller_register_interrupt(ch->poller, bdev_aio_poller_set_interrupt_mode, NULL);
692 
693 	return 0;
694 }
695 
696 static void
697 bdev_aio_group_destroy_cb(void *io_device, void *ctx_buf)
698 {
699 	struct bdev_aio_group_channel *ch = ctx_buf;
700 
701 	if (!TAILQ_EMPTY(&ch->io_ch_head)) {
702 		SPDK_ERRLOG("Group channel of bdev aio has uncleared io channel\n");
703 	}
704 
705 	spdk_poller_unregister(&ch->poller);
706 	if (spdk_interrupt_mode_is_enabled()) {
707 		bdev_aio_unregister_interrupt(ch);
708 	}
709 }
710 
/* Create and register an aio bdev over the given file.
 *
 * name        - bdev name (copied).
 * filename    - backing file path (copied).
 * block_size  - logical block size; 0 means auto-detect from the file.
 *
 * Returns 0 on success or a negative errno; on failure all partially
 * acquired resources (fd, strings, struct) are released.
 */
int
create_aio_bdev(const char *name, const char *filename, uint32_t block_size)
{
	struct file_disk *fdisk;
	uint32_t detected_block_size;
	uint64_t disk_size;
	int rc;

	fdisk = calloc(1, sizeof(*fdisk));
	if (!fdisk) {
		SPDK_ERRLOG("Unable to allocate enough memory for aio backend\n");
		return -ENOMEM;
	}

	fdisk->filename = strdup(filename);
	if (!fdisk->filename) {
		rc = -ENOMEM;
		goto error_return;
	}

	if (bdev_aio_open(fdisk)) {
		/* NOTE(review): the SPDK_ERRLOG inside bdev_aio_open may clobber
		 * errno before it is captured here — verify rc is reliable. */
		SPDK_ERRLOG("Unable to open file %s. fd: %d errno: %d\n", filename, fdisk->fd, errno);
		rc = -errno;
		goto error_return;
	}

	disk_size = spdk_fd_get_size(fdisk->fd);

	fdisk->disk.name = strdup(name);
	if (!fdisk->disk.name) {
		rc = -ENOMEM;
		goto error_return;
	}
	fdisk->disk.product_name = "AIO disk";
	fdisk->disk.module = &aio_if;

	fdisk->disk.write_cache = 1;

	detected_block_size = spdk_fd_get_blocklen(fdisk->fd);
	if (block_size == 0) {
		/* User did not specify block size - use autodetected block size. */
		if (detected_block_size == 0) {
			SPDK_ERRLOG("Block size could not be auto-detected\n");
			rc = -EINVAL;
			goto error_return;
		}
		fdisk->block_size_override = false;
		block_size = detected_block_size;
	} else {
		if (block_size < detected_block_size) {
			SPDK_ERRLOG("Specified block size %" PRIu32 " is smaller than "
				    "auto-detected block size %" PRIu32 "\n",
				    block_size, detected_block_size);
			rc = -EINVAL;
			goto error_return;
		} else if (detected_block_size != 0 && block_size != detected_block_size) {
			SPDK_WARNLOG("Specified block size %" PRIu32 " does not match "
				     "auto-detected block size %" PRIu32 "\n",
				     block_size, detected_block_size);
		}
		fdisk->block_size_override = true;
	}

	/* Block size must be a power of two, at least 512 bytes. */
	if (block_size < 512) {
		SPDK_ERRLOG("Invalid block size %" PRIu32 " (must be at least 512).\n", block_size);
		rc = -EINVAL;
		goto error_return;
	}

	if (!spdk_u32_is_pow2(block_size)) {
		SPDK_ERRLOG("Invalid block size %" PRIu32 " (must be a power of 2.)\n", block_size);
		rc = -EINVAL;
		goto error_return;
	}

	fdisk->disk.blocklen = block_size;
	/* Buffers must stay aligned to the device's native block size even when
	 * the user overrides the logical block size. */
	if (fdisk->block_size_override && detected_block_size) {
		fdisk->disk.required_alignment = spdk_u32log2(detected_block_size);
	} else {
		fdisk->disk.required_alignment = spdk_u32log2(block_size);
	}

	if (disk_size % fdisk->disk.blocklen != 0) {
		SPDK_ERRLOG("Disk size %" PRIu64 " is not a multiple of block size %" PRIu32 "\n",
			    disk_size, fdisk->disk.blocklen);
		rc = -EINVAL;
		goto error_return;
	}

	fdisk->disk.blockcnt = disk_size / fdisk->disk.blocklen;
	fdisk->disk.ctxt = fdisk;

	fdisk->disk.fn_table = &aio_fn_table;

	/* Register the io_device first so channels can be created as soon as
	 * the bdev is visible; undone below if bdev registration fails. */
	spdk_io_device_register(fdisk, bdev_aio_create_cb, bdev_aio_destroy_cb,
				sizeof(struct bdev_aio_io_channel),
				fdisk->disk.name);
	rc = spdk_bdev_register(&fdisk->disk);
	if (rc) {
		spdk_io_device_unregister(fdisk, NULL);
		goto error_return;
	}

	TAILQ_INSERT_TAIL(&g_aio_disk_head, fdisk, link);
	return 0;

error_return:
	bdev_aio_close(fdisk);
	aio_free_disk(fdisk);
	return rc;
}
822 
/* Context carried through the async bdev unregister during delete. */
struct delete_aio_bdev_ctx {
	delete_aio_bdev_complete cb_fn;		/* user completion callback */
	void *cb_arg;				/* opaque argument for cb_fn */
};
827 
828 static void
829 aio_bdev_unregister_cb(void *arg, int bdeverrno)
830 {
831 	struct delete_aio_bdev_ctx *ctx = arg;
832 
833 	ctx->cb_fn(ctx->cb_arg, bdeverrno);
834 	free(ctx);
835 }
836 
837 void
838 bdev_aio_delete(struct spdk_bdev *bdev, delete_aio_bdev_complete cb_fn, void *cb_arg)
839 {
840 	struct delete_aio_bdev_ctx *ctx;
841 
842 	if (!bdev || bdev->module != &aio_if) {
843 		cb_fn(cb_arg, -ENODEV);
844 		return;
845 	}
846 
847 	ctx = calloc(1, sizeof(*ctx));
848 	if (ctx == NULL) {
849 		cb_fn(cb_arg, -ENOMEM);
850 		return;
851 	}
852 
853 	ctx->cb_fn = cb_fn;
854 	ctx->cb_arg = cb_arg;
855 	spdk_bdev_unregister(bdev, aio_bdev_unregister_cb, ctx);
856 }
857 
858 static int
859 bdev_aio_initialize(void)
860 {
861 	spdk_io_device_register(&aio_if, bdev_aio_group_create_cb, bdev_aio_group_destroy_cb,
862 				sizeof(struct bdev_aio_group_channel), "aio_module");
863 
864 	return 0;
865 }
866 
867 static void
868 bdev_aio_fini(void)
869 {
870 	spdk_io_device_unregister(&aio_if, NULL);
871 }
872 
873 SPDK_LOG_REGISTER_COMPONENT(aio)
874