xref: /spdk/module/bdev/aio/bdev_aio.c (revision 7506a7aa53d239f533af3bc768f0d2af55e735fe)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *   Copyright (c) 2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
7  *
8  *   Redistribution and use in source and binary forms, with or without
9  *   modification, are permitted provided that the following conditions
10  *   are met:
11  *
12  *     * Redistributions of source code must retain the above copyright
13  *       notice, this list of conditions and the following disclaimer.
14  *     * Redistributions in binary form must reproduce the above copyright
15  *       notice, this list of conditions and the following disclaimer in
16  *       the documentation and/or other materials provided with the
17  *       distribution.
18  *     * Neither the name of Intel Corporation nor the names of its
19  *       contributors may be used to endorse or promote products derived
20  *       from this software without specific prior written permission.
21  *
22  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
25  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
26  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
27  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
28  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
29  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
30  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
31  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
32  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
33  */
34 
35 #include "bdev_aio.h"
36 
37 #include "spdk/stdinc.h"
38 
39 #include "spdk/barrier.h"
40 #include "spdk/bdev.h"
41 #include "spdk/bdev_module.h"
42 #include "spdk/env.h"
43 #include "spdk/fd.h"
44 #include "spdk/likely.h"
45 #include "spdk/thread.h"
46 #include "spdk/json.h"
47 #include "spdk/util.h"
48 #include "spdk/string.h"
49 
50 #include "spdk/log.h"
51 
52 #include <sys/eventfd.h>
53 #include <libaio.h>
54 
/* Per-thread I/O channel state: one kernel AIO context per channel. */
struct bdev_aio_io_channel {
	uint64_t				io_inflight;	/* iocbs submitted but not yet reaped */
	io_context_t				io_ctx;		/* kernel AIO context owned by this channel */
	struct bdev_aio_group_channel		*group_ch;	/* group channel whose poller reaps this channel */
	TAILQ_ENTRY(bdev_aio_io_channel)	link;		/* entry in group_ch->io_ch_head */
};
61 
/* Per-thread group channel: polls (or is interrupted for) all aio io channels
 * created on the same thread.
 */
struct bdev_aio_group_channel {
	/* eventfd for io completion notification in interrupt mode.
	 * Negative value like '-1' indicates it is invalid or unused.
	 */
	int					efd;
	struct spdk_interrupt			*intr;		/* interrupt handle wrapping efd (interrupt mode only) */
	struct spdk_poller			*poller;	/* poller driving bdev_aio_group_poll() */
	TAILQ_HEAD(, bdev_aio_io_channel)	io_ch_head;	/* io channels registered with this group */
};
71 
/* Per-I/O context, allocated by the bdev layer in each spdk_bdev_io's
 * driver_ctx (see bdev_aio_get_ctx_size()).
 */
struct bdev_aio_task {
	struct iocb			iocb;	/* kernel AIO control block for this I/O */
	uint64_t			len;	/* expected transfer length, compared against io_event.res */
	struct bdev_aio_io_channel	*ch;	/* channel the iocb was submitted on */
};
77 
/* One aio bdev backed by a regular file or block device. */
struct file_disk {
	struct bdev_aio_task	*reset_task;		/* in-flight reset waiting for io_inflight to drain */
	struct spdk_poller	*reset_retry_timer;	/* retry timer while channels still have inflight I/O */
	struct spdk_bdev	disk;			/* embedded bdev; ctxt points back to this struct */
	char			*filename;		/* heap-owned path of the backing file */
	int			fd;			/* open fd, or -1 when closed */
	TAILQ_ENTRY(file_disk)  link;			/* entry in g_aio_disk_head */
	bool			block_size_override;	/* true if the user forced a block size */
};
87 
/* For user space reaping of completions.
 * NOTE(review): presumably this mirrors the kernel's internal aio ring header
 * that io_setup() maps into user space — verify against the running kernel's
 * layout before relying on new fields.
 */
struct spdk_aio_ring {
	uint32_t id;
	uint32_t size;			/* number of io_event slots in the ring */
	uint32_t head;			/* consumer index (updated by us) */
	uint32_t tail;			/* producer index (updated by the kernel) */

	uint32_t version;		/* must match SPDK_AIO_RING_VERSION for user-space reaping */
	uint32_t compat_features;
	uint32_t incompat_features;	/* non-zero forces fallback to io_getevents() */
	uint32_t header_length;		/* byte offset from ring base to the event array */
};
100 
101 #define SPDK_AIO_RING_VERSION	0xa10a10a1
102 
103 static int bdev_aio_initialize(void);
104 static void bdev_aio_fini(void);
105 static void aio_free_disk(struct file_disk *fdisk);
106 static TAILQ_HEAD(, file_disk) g_aio_disk_head = TAILQ_HEAD_INITIALIZER(g_aio_disk_head);
107 
108 #define SPDK_AIO_QUEUE_DEPTH 128
109 #define MAX_EVENTS_PER_POLL 32
110 
111 static int
112 bdev_aio_get_ctx_size(void)
113 {
114 	return sizeof(struct bdev_aio_task);
115 }
116 
/* Module descriptor registered with the bdev layer via SPDK_BDEV_MODULE_REGISTER. */
static struct spdk_bdev_module aio_if = {
	.name		= "aio",
	.module_init	= bdev_aio_initialize,
	.module_fini	= bdev_aio_fini,
	.get_ctx_size	= bdev_aio_get_ctx_size,
};
123 
124 SPDK_BDEV_MODULE_REGISTER(aio, &aio_if)
125 
126 static int
127 bdev_aio_open(struct file_disk *disk)
128 {
129 	int fd;
130 
131 	fd = open(disk->filename, O_RDWR | O_DIRECT);
132 	if (fd < 0) {
133 		/* Try without O_DIRECT for non-disk files */
134 		fd = open(disk->filename, O_RDWR);
135 		if (fd < 0) {
136 			SPDK_ERRLOG("open() failed (file:%s), errno %d: %s\n",
137 				    disk->filename, errno, spdk_strerror(errno));
138 			disk->fd = -1;
139 			return -1;
140 		}
141 	}
142 
143 	disk->fd = fd;
144 
145 	return 0;
146 }
147 
148 static int
149 bdev_aio_close(struct file_disk *disk)
150 {
151 	int rc;
152 
153 	if (disk->fd == -1) {
154 		return 0;
155 	}
156 
157 	rc = close(disk->fd);
158 	if (rc < 0) {
159 		SPDK_ERRLOG("close() failed (fd=%d), errno %d: %s\n",
160 			    disk->fd, errno, spdk_strerror(errno));
161 		return -1;
162 	}
163 
164 	disk->fd = -1;
165 
166 	return 0;
167 }
168 
169 static void
170 bdev_aio_readv(struct file_disk *fdisk, struct spdk_io_channel *ch,
171 	       struct bdev_aio_task *aio_task,
172 	       struct iovec *iov, int iovcnt, uint64_t nbytes, uint64_t offset)
173 {
174 	struct iocb *iocb = &aio_task->iocb;
175 	struct bdev_aio_io_channel *aio_ch = spdk_io_channel_get_ctx(ch);
176 	int rc;
177 
178 	io_prep_preadv(iocb, fdisk->fd, iov, iovcnt, offset);
179 	if (aio_ch->group_ch->efd >= 0) {
180 		io_set_eventfd(iocb, aio_ch->group_ch->efd);
181 	}
182 	iocb->data = aio_task;
183 	aio_task->len = nbytes;
184 	aio_task->ch = aio_ch;
185 
186 	SPDK_DEBUGLOG(aio, "read %d iovs size %lu to off: %#lx\n",
187 		      iovcnt, nbytes, offset);
188 
189 	rc = io_submit(aio_ch->io_ctx, 1, &iocb);
190 	if (spdk_unlikely(rc < 0)) {
191 		if (rc == -EAGAIN) {
192 			spdk_bdev_io_complete(spdk_bdev_io_from_ctx(aio_task), SPDK_BDEV_IO_STATUS_NOMEM);
193 		} else {
194 			spdk_bdev_io_complete_aio_status(spdk_bdev_io_from_ctx(aio_task), rc);
195 			SPDK_ERRLOG("%s: io_submit returned %d\n", __func__, rc);
196 		}
197 	} else {
198 		aio_ch->io_inflight++;
199 	}
200 }
201 
202 static void
203 bdev_aio_writev(struct file_disk *fdisk, struct spdk_io_channel *ch,
204 		struct bdev_aio_task *aio_task,
205 		struct iovec *iov, int iovcnt, size_t len, uint64_t offset)
206 {
207 	struct iocb *iocb = &aio_task->iocb;
208 	struct bdev_aio_io_channel *aio_ch = spdk_io_channel_get_ctx(ch);
209 	int rc;
210 
211 	io_prep_pwritev(iocb, fdisk->fd, iov, iovcnt, offset);
212 	if (aio_ch->group_ch->efd >= 0) {
213 		io_set_eventfd(iocb, aio_ch->group_ch->efd);
214 	}
215 	iocb->data = aio_task;
216 	aio_task->len = len;
217 	aio_task->ch = aio_ch;
218 
219 	SPDK_DEBUGLOG(aio, "write %d iovs size %lu from off: %#lx\n",
220 		      iovcnt, len, offset);
221 
222 	rc = io_submit(aio_ch->io_ctx, 1, &iocb);
223 	if (spdk_unlikely(rc < 0)) {
224 		if (rc == -EAGAIN) {
225 			spdk_bdev_io_complete(spdk_bdev_io_from_ctx(aio_task), SPDK_BDEV_IO_STATUS_NOMEM);
226 		} else {
227 			spdk_bdev_io_complete_aio_status(spdk_bdev_io_from_ctx(aio_task), rc);
228 			SPDK_ERRLOG("%s: io_submit returned %d\n", __func__, rc);
229 		}
230 	} else {
231 		aio_ch->io_inflight++;
232 	}
233 }
234 
/*
 * Flush the backing file with fsync(2) and complete the bdev I/O inline.
 *
 * NOTE(review): fsync() is synchronous, so this blocks the calling reactor
 * thread until the kernel finishes writing back dirty data for the fd.
 */
static void
bdev_aio_flush(struct file_disk *fdisk, struct bdev_aio_task *aio_task)
{
	int rc = fsync(fdisk->fd);

	if (rc == 0) {
		spdk_bdev_io_complete(spdk_bdev_io_from_ctx(aio_task), SPDK_BDEV_IO_STATUS_SUCCESS);
	} else {
		/* Report the fsync() errno as a negated aio status code. */
		spdk_bdev_io_complete_aio_status(spdk_bdev_io_from_ctx(aio_task), -errno);
	}
}
246 
/*
 * Final stage of bdev destruction, invoked once all io channels of the
 * fdisk io_device are gone: unlink the disk from the module list, close the
 * backing fd and free all memory.
 */
static void
bdev_aio_destruct_cb(void *io_device)
{
	struct file_disk *fdisk = io_device;
	int rc = 0;

	TAILQ_REMOVE(&g_aio_disk_head, fdisk, link);
	rc = bdev_aio_close(fdisk);
	if (rc < 0) {
		SPDK_ERRLOG("bdev_aio_close() failed\n");
	}

	aio_free_disk(fdisk);
}
261 
262 static int
263 bdev_aio_destruct(void *ctx)
264 {
265 	struct file_disk *fdisk = ctx;
266 
267 	spdk_io_device_unregister(fdisk, bdev_aio_destruct_cb);
268 
269 	return 0;
270 }
271 
/*
 * Reap up to 'max' AIO completions directly from the kernel's user-mapped
 * completion ring, avoiding an io_getevents() syscall in the common case.
 *
 * The io_context_t handle is actually a pointer to the mapped ring header
 * (struct spdk_aio_ring). If the ring layout is not one we understand, fall
 * back to a non-blocking io_getevents() syscall. Returns the number of
 * events copied into 'uevents'.
 */
static int
bdev_user_io_getevents(io_context_t io_ctx, unsigned int max, struct io_event *uevents)
{
	uint32_t head, tail, count;
	struct spdk_aio_ring *ring;
	struct timespec timeout;
	struct io_event *kevents;

	ring = (struct spdk_aio_ring *)io_ctx;

	/* Unknown ring version or incompatible features: let the kernel reap. */
	if (spdk_unlikely(ring->version != SPDK_AIO_RING_VERSION || ring->incompat_features != 0)) {
		timeout.tv_sec = 0;
		timeout.tv_nsec = 0;

		return io_getevents(io_ctx, 0, max, uevents, &timeout);
	}

	/* Read the current state out of the ring */
	head = ring->head;
	tail = ring->tail;

	/* This memory barrier is required to prevent the loads above
	 * from being re-ordered with stores to the events array
	 * potentially occurring on other threads. */
	spdk_smp_rmb();

	/* Calculate how many items are in the circular ring */
	count = tail - head;
	if (tail < head) {
		count += ring->size;
	}

	/* Reduce the count to the limit provided by the user */
	count = spdk_min(max, count);

	/* Grab the memory location of the event array */
	kevents = (struct io_event *)((uintptr_t)ring + ring->header_length);

	/* Copy the events out of the ring. */
	if ((head + count) <= ring->size) {
		/* Only one copy is required */
		memcpy(uevents, &kevents[head], count * sizeof(struct io_event));
	} else {
		uint32_t first_part = ring->size - head;
		/* Two copies are required */
		memcpy(uevents, &kevents[head], first_part * sizeof(struct io_event));
		memcpy(&uevents[first_part], &kevents[0], (count - first_part) * sizeof(struct io_event));
	}

	/* Update the head pointer. On x86, stores will not be reordered with older loads,
	 * so the copies out of the event array will always be complete prior to this
	 * update becoming visible. On other architectures this is not guaranteed, so
	 * add a barrier. */
#if defined(__i386__) || defined(__x86_64__)
	spdk_compiler_barrier();
#else
	spdk_smp_mb();
#endif
	ring->head = (head + count) % ring->size;

	return count;
}
334 
/*
 * Reap and complete up to SPDK_AIO_QUEUE_DEPTH completions for one io channel.
 * Returns the number of events processed (0 on reap failure).
 */
static int
bdev_aio_io_channel_poll(struct bdev_aio_io_channel *io_ch)
{
	int nr, i = 0;
	struct bdev_aio_task *aio_task;
	struct io_event events[SPDK_AIO_QUEUE_DEPTH];
	uint64_t io_result;

	nr = bdev_user_io_getevents(io_ch->io_ctx, SPDK_AIO_QUEUE_DEPTH, events);

	if (nr < 0) {
		return 0;
	}

	/* io_event.res carries either the byte count transferred or a small
	 * (positive) errno value; values below MAX_AIO_ERRNO are treated as
	 * errno since no I/O here transfers fewer than 256 bytes than requested
	 * successfully. */
#define MAX_AIO_ERRNO 256
	for (i = 0; i < nr; i++) {
		aio_task = events[i].data;
		aio_task->ch->io_inflight--;
		io_result = events[i].res;
		if (io_result == aio_task->len) {
			/* Full transfer: success. */
			spdk_bdev_io_complete(spdk_bdev_io_from_ctx(aio_task), SPDK_BDEV_IO_STATUS_SUCCESS);
		} else if (io_result < MAX_AIO_ERRNO) {
			/* Linux AIO will return its errno to io_event.res */
			int aio_errno = io_result;

			spdk_bdev_io_complete_aio_status(spdk_bdev_io_from_ctx(aio_task), -aio_errno);
		} else {
			/* Short transfer that is not a recognizable errno: fail it. */
			SPDK_ERRLOG("failed to complete aio: rc %"PRId64"\n", events[i].res);
			spdk_bdev_io_complete(spdk_bdev_io_from_ctx(aio_task), SPDK_BDEV_IO_STATUS_FAILED);
		}
	}

	return nr;
}
369 
370 static int
371 bdev_aio_group_poll(void *arg)
372 {
373 	struct bdev_aio_group_channel *group_ch = arg;
374 	struct bdev_aio_io_channel *io_ch;
375 	int nr = 0;
376 
377 	TAILQ_FOREACH(io_ch, &group_ch->io_ch_head, link) {
378 		nr += bdev_aio_io_channel_poll(io_ch);
379 	}
380 
381 	return nr > 0 ? SPDK_POLLER_BUSY : SPDK_POLLER_IDLE;
382 }
383 
/*
 * Interrupt-mode handler for the group eventfd: acknowledge the pending
 * completion count, then run one poll pass over all channels.
 *
 * The eventfd counts completed I/Os. A single poll pass reaps at most
 * SPDK_AIO_QUEUE_DEPTH events per channel, so any excess count is written
 * back to the eventfd to re-arm the interrupt and trigger another pass.
 */
static int
bdev_aio_group_interrupt(void *arg)
{
	struct bdev_aio_group_channel *group_ch = arg;
	int rc;
	uint64_t num_events;

	assert(group_ch->efd >= 0);

	/* if completed IO number is larger than SPDK_AIO_QUEUE_DEPTH,
	 * io_getevent should be called again to ensure all completed IO are processed.
	 */
	rc = read(group_ch->efd, &num_events, sizeof(num_events));
	if (rc < 0) {
		SPDK_ERRLOG("failed to acknowledge aio group: %s.\n", spdk_strerror(errno));
		return -errno;
	}

	if (num_events > SPDK_AIO_QUEUE_DEPTH) {
		num_events -= SPDK_AIO_QUEUE_DEPTH;
		rc = write(group_ch->efd, &num_events, sizeof(num_events));
		if (rc < 0) {
			SPDK_ERRLOG("failed to notify aio group: %s.\n", spdk_strerror(errno));
		}
	}

	return bdev_aio_group_poll(group_ch);
}
412 
413 static void
414 _bdev_aio_get_io_inflight(struct spdk_io_channel_iter *i)
415 {
416 	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
417 	struct bdev_aio_io_channel *aio_ch = spdk_io_channel_get_ctx(ch);
418 
419 	if (aio_ch->io_inflight) {
420 		spdk_for_each_channel_continue(i, -1);
421 		return;
422 	}
423 
424 	spdk_for_each_channel_continue(i, 0);
425 }
426 
427 static int bdev_aio_reset_retry_timer(void *arg);
428 
/*
 * Completion of the inflight scan: if any channel reported inflight I/O
 * (status == -1), retry in 500 us; otherwise the reset is done and the
 * pending reset bdev_io is completed successfully.
 */
static void
_bdev_aio_get_io_inflight_done(struct spdk_io_channel_iter *i, int status)
{
	struct file_disk *fdisk = spdk_io_channel_iter_get_ctx(i);

	if (status == -1) {
		fdisk->reset_retry_timer = SPDK_POLLER_REGISTER(bdev_aio_reset_retry_timer, fdisk, 500);
		return;
	}

	spdk_bdev_io_complete(spdk_bdev_io_from_ctx(fdisk->reset_task), SPDK_BDEV_IO_STATUS_SUCCESS);
}
441 
/*
 * Start (or restart, from the retry poller) a scan of all io channels to
 * check for inflight I/O; _bdev_aio_get_io_inflight_done() decides whether
 * to retry or finish the reset.
 */
static int
bdev_aio_reset_retry_timer(void *arg)
{
	struct file_disk *fdisk = arg;

	/* One-shot: unregister ourselves when invoked from the retry poller. */
	if (fdisk->reset_retry_timer) {
		spdk_poller_unregister(&fdisk->reset_retry_timer);
	}

	spdk_for_each_channel(fdisk,
			      _bdev_aio_get_io_inflight,
			      fdisk,
			      _bdev_aio_get_io_inflight_done);

	return SPDK_POLLER_BUSY;
}
458 
/*
 * Begin a reset: remember the reset bdev_io and wait (via the retry timer
 * machinery) for all inflight I/O to drain before completing it.
 */
static void
bdev_aio_reset(struct file_disk *fdisk, struct bdev_aio_task *aio_task)
{
	fdisk->reset_task = aio_task;

	bdev_aio_reset_retry_timer(fdisk);
}
466 
467 static void
468 bdev_aio_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io,
469 		    bool success)
470 {
471 	if (!success) {
472 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
473 		return;
474 	}
475 
476 	switch (bdev_io->type) {
477 	case SPDK_BDEV_IO_TYPE_READ:
478 		bdev_aio_readv((struct file_disk *)bdev_io->bdev->ctxt,
479 			       ch,
480 			       (struct bdev_aio_task *)bdev_io->driver_ctx,
481 			       bdev_io->u.bdev.iovs,
482 			       bdev_io->u.bdev.iovcnt,
483 			       bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen,
484 			       bdev_io->u.bdev.offset_blocks * bdev_io->bdev->blocklen);
485 		break;
486 	case SPDK_BDEV_IO_TYPE_WRITE:
487 		bdev_aio_writev((struct file_disk *)bdev_io->bdev->ctxt,
488 				ch,
489 				(struct bdev_aio_task *)bdev_io->driver_ctx,
490 				bdev_io->u.bdev.iovs,
491 				bdev_io->u.bdev.iovcnt,
492 				bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen,
493 				bdev_io->u.bdev.offset_blocks * bdev_io->bdev->blocklen);
494 		break;
495 	default:
496 		SPDK_ERRLOG("Wrong io type\n");
497 		break;
498 	}
499 }
500 
501 static int _bdev_aio_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
502 {
503 	switch (bdev_io->type) {
504 	/* Read and write operations must be performed on buffers aligned to
505 	 * bdev->required_alignment. If user specified unaligned buffers,
506 	 * get the aligned buffer from the pool by calling spdk_bdev_io_get_buf. */
507 	case SPDK_BDEV_IO_TYPE_READ:
508 	case SPDK_BDEV_IO_TYPE_WRITE:
509 		spdk_bdev_io_get_buf(bdev_io, bdev_aio_get_buf_cb,
510 				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
511 		return 0;
512 	case SPDK_BDEV_IO_TYPE_FLUSH:
513 		bdev_aio_flush((struct file_disk *)bdev_io->bdev->ctxt,
514 			       (struct bdev_aio_task *)bdev_io->driver_ctx);
515 		return 0;
516 
517 	case SPDK_BDEV_IO_TYPE_RESET:
518 		bdev_aio_reset((struct file_disk *)bdev_io->bdev->ctxt,
519 			       (struct bdev_aio_task *)bdev_io->driver_ctx);
520 		return 0;
521 	default:
522 		return -1;
523 	}
524 }
525 
526 static void bdev_aio_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
527 {
528 	if (_bdev_aio_submit_request(ch, bdev_io) < 0) {
529 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
530 	}
531 }
532 
533 static bool
534 bdev_aio_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type)
535 {
536 	switch (io_type) {
537 	case SPDK_BDEV_IO_TYPE_READ:
538 	case SPDK_BDEV_IO_TYPE_WRITE:
539 	case SPDK_BDEV_IO_TYPE_FLUSH:
540 	case SPDK_BDEV_IO_TYPE_RESET:
541 		return true;
542 
543 	default:
544 		return false;
545 	}
546 }
547 
548 static int
549 bdev_aio_create_cb(void *io_device, void *ctx_buf)
550 {
551 	struct bdev_aio_io_channel *ch = ctx_buf;
552 
553 	if (io_setup(SPDK_AIO_QUEUE_DEPTH, &ch->io_ctx) < 0) {
554 		SPDK_ERRLOG("async I/O context setup failure\n");
555 		return -1;
556 	}
557 
558 	ch->group_ch = spdk_io_channel_get_ctx(spdk_get_io_channel(&aio_if));
559 	TAILQ_INSERT_TAIL(&ch->group_ch->io_ch_head, ch, link);
560 
561 	return 0;
562 }
563 
/*
 * io_device channel destructor: tear down the kernel AIO context, detach
 * from the group channel and drop the group channel reference taken in
 * bdev_aio_create_cb().
 */
static void
bdev_aio_destroy_cb(void *io_device, void *ctx_buf)
{
	struct bdev_aio_io_channel *ch = ctx_buf;

	io_destroy(ch->io_ctx);

	assert(ch->group_ch);
	TAILQ_REMOVE(&ch->group_ch->io_ch_head, ch, link);

	spdk_put_io_channel(spdk_io_channel_from_ctx(ch->group_ch));
}
576 
/* fn_table hook: hand out the per-thread io channel of this fdisk io_device. */
static struct spdk_io_channel *
bdev_aio_get_io_channel(void *ctx)
{
	struct file_disk *fdisk = ctx;

	return spdk_get_io_channel(fdisk);
}
584 
585 
586 static int
587 bdev_aio_dump_info_json(void *ctx, struct spdk_json_write_ctx *w)
588 {
589 	struct file_disk *fdisk = ctx;
590 
591 	spdk_json_write_named_object_begin(w, "aio");
592 
593 	spdk_json_write_named_string(w, "filename", fdisk->filename);
594 
595 	spdk_json_write_object_end(w);
596 
597 	return 0;
598 }
599 
/*
 * fn_table hook: write the bdev_aio_create RPC call that would recreate this
 * bdev in a saved configuration. block_size is emitted only when the user
 * explicitly overrode it at creation time.
 */
static void
bdev_aio_write_json_config(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
{
	struct file_disk *fdisk = bdev->ctxt;

	spdk_json_write_object_begin(w);

	spdk_json_write_named_string(w, "method", "bdev_aio_create");

	spdk_json_write_named_object_begin(w, "params");
	spdk_json_write_named_string(w, "name", bdev->name);
	if (fdisk->block_size_override) {
		spdk_json_write_named_uint32(w, "block_size", bdev->blocklen);
	}
	spdk_json_write_named_string(w, "filename", fdisk->filename);
	spdk_json_write_object_end(w);

	spdk_json_write_object_end(w);
}
619 
/* bdev function table installed on every aio bdev in create_aio_bdev(). */
static const struct spdk_bdev_fn_table aio_fn_table = {
	.destruct		= bdev_aio_destruct,
	.submit_request		= bdev_aio_submit_request,
	.io_type_supported	= bdev_aio_io_type_supported,
	.get_io_channel		= bdev_aio_get_io_channel,
	.dump_info_json		= bdev_aio_dump_info_json,
	.write_config_json	= bdev_aio_write_json_config,
};
628 
629 static void aio_free_disk(struct file_disk *fdisk)
630 {
631 	if (fdisk == NULL) {
632 		return;
633 	}
634 	free(fdisk->filename);
635 	free(fdisk->disk.name);
636 	free(fdisk);
637 }
638 
/*
 * Create the eventfd for a group channel and register it as an SPDK
 * interrupt source driving bdev_aio_group_interrupt().
 *
 * Returns 0 on success; -1 if eventfd creation or interrupt registration
 * fails, leaving ch->efd untouched (still -1 from the caller's init).
 */
static int
bdev_aio_register_interrupt(struct bdev_aio_group_channel *ch)
{
	int efd;

	efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
	if (efd < 0) {
		return -1;
	}

	ch->intr = SPDK_INTERRUPT_REGISTER(efd, bdev_aio_group_interrupt, ch);
	if (ch->intr == NULL) {
		/* Registration failed: don't leak the eventfd. */
		close(efd);
		return -1;
	}
	ch->efd = efd;

	return 0;
}
658 
/*
 * Undo bdev_aio_register_interrupt(): drop the interrupt registration,
 * close the eventfd and mark it invalid again.
 */
static void
bdev_aio_unregister_interrupt(struct bdev_aio_group_channel *ch)
{
	spdk_interrupt_unregister(&ch->intr);
	close(ch->efd);
	ch->efd = -1;
}
666 
667 static void
668 bdev_aio_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode)
669 {
670 	return;
671 }
672 
/*
 * Per-thread group channel constructor: set up the channel list, optional
 * interrupt resources (eventfd + handler) and the reaping poller.
 */
static int
bdev_aio_group_create_cb(void *io_device, void *ctx_buf)
{
	struct bdev_aio_group_channel *ch = ctx_buf;
	int rc;

	TAILQ_INIT(&ch->io_ch_head);
	/* Initialize ch->efd to be invalid and unused. */
	ch->efd = -1;
	if (spdk_interrupt_mode_is_enabled()) {
		rc = bdev_aio_register_interrupt(ch);
		if (rc < 0) {
			SPDK_ERRLOG("Failed to prepare intr resource to bdev_aio\n");
			return rc;
		}
	}

	ch->poller = SPDK_POLLER_REGISTER(bdev_aio_group_poll, ch, 0);
	spdk_poller_register_interrupt(ch->poller, bdev_aio_poller_set_interrupt_mode, NULL);

	return 0;
}
695 
/*
 * Per-thread group channel destructor: all io channels should already have
 * detached themselves (bdev_aio_destroy_cb); release the poller and any
 * interrupt resources.
 */
static void
bdev_aio_group_destroy_cb(void *io_device, void *ctx_buf)
{
	struct bdev_aio_group_channel *ch = ctx_buf;

	if (!TAILQ_EMPTY(&ch->io_ch_head)) {
		SPDK_ERRLOG("Group channel of bdev aio has uncleared io channel\n");
	}

	spdk_poller_unregister(&ch->poller);
	if (spdk_interrupt_mode_is_enabled()) {
		bdev_aio_unregister_interrupt(ch);
	}
}
710 
/*
 * Create and register an aio bdev named 'name' backed by 'filename'.
 *
 * block_size == 0 means "auto-detect from the backing device"; a non-zero
 * value overrides it (recorded in block_size_override so the config writer
 * re-emits it), but must be a power of two, >= 512, and >= the detected
 * block size so that O_DIRECT alignment requirements still hold.
 *
 * Returns 0 on success, negative errno on failure. On any error path all
 * partially-initialized resources (fd, strings, fdisk) are released.
 */
int
create_aio_bdev(const char *name, const char *filename, uint32_t block_size)
{
	struct file_disk *fdisk;
	uint32_t detected_block_size;
	uint64_t disk_size;
	int rc;

	fdisk = calloc(1, sizeof(*fdisk));
	if (!fdisk) {
		SPDK_ERRLOG("Unable to allocate enough memory for aio backend\n");
		return -ENOMEM;
	}

	fdisk->filename = strdup(filename);
	if (!fdisk->filename) {
		rc = -ENOMEM;
		goto error_return;
	}

	if (bdev_aio_open(fdisk)) {
		SPDK_ERRLOG("Unable to open file %s. fd: %d errno: %d\n", filename, fdisk->fd, errno);
		rc = -errno;
		goto error_return;
	}

	disk_size = spdk_fd_get_size(fdisk->fd);

	fdisk->disk.name = strdup(name);
	if (!fdisk->disk.name) {
		rc = -ENOMEM;
		goto error_return;
	}
	fdisk->disk.product_name = "AIO disk";
	fdisk->disk.module = &aio_if;

	fdisk->disk.write_cache = 1;

	detected_block_size = spdk_fd_get_blocklen(fdisk->fd);
	if (block_size == 0) {
		/* User did not specify block size - use autodetected block size. */
		if (detected_block_size == 0) {
			SPDK_ERRLOG("Block size could not be auto-detected\n");
			rc = -EINVAL;
			goto error_return;
		}
		fdisk->block_size_override = false;
		block_size = detected_block_size;
	} else {
		if (block_size < detected_block_size) {
			SPDK_ERRLOG("Specified block size %" PRIu32 " is smaller than "
				    "auto-detected block size %" PRIu32 "\n",
				    block_size, detected_block_size);
			rc = -EINVAL;
			goto error_return;
		} else if (detected_block_size != 0 && block_size != detected_block_size) {
			SPDK_WARNLOG("Specified block size %" PRIu32 " does not match "
				     "auto-detected block size %" PRIu32 "\n",
				     block_size, detected_block_size);
		}
		fdisk->block_size_override = true;
	}

	if (block_size < 512) {
		SPDK_ERRLOG("Invalid block size %" PRIu32 " (must be at least 512).\n", block_size);
		rc = -EINVAL;
		goto error_return;
	}

	if (!spdk_u32_is_pow2(block_size)) {
		SPDK_ERRLOG("Invalid block size %" PRIu32 " (must be a power of 2.)\n", block_size);
		rc = -EINVAL;
		goto error_return;
	}

	fdisk->disk.blocklen = block_size;
	/* Buffer alignment follows the device's native block size even when the
	 * logical block size was overridden to something larger. */
	if (fdisk->block_size_override && detected_block_size) {
		fdisk->disk.required_alignment = spdk_u32log2(detected_block_size);
	} else {
		fdisk->disk.required_alignment = spdk_u32log2(block_size);
	}

	if (disk_size % fdisk->disk.blocklen != 0) {
		SPDK_ERRLOG("Disk size %" PRIu64 " is not a multiple of block size %" PRIu32 "\n",
			    disk_size, fdisk->disk.blocklen);
		rc = -EINVAL;
		goto error_return;
	}

	fdisk->disk.blockcnt = disk_size / fdisk->disk.blocklen;
	fdisk->disk.ctxt = fdisk;

	fdisk->disk.fn_table = &aio_fn_table;

	/* Register the io_device before the bdev so channels can be created as
	 * soon as the bdev is visible. */
	spdk_io_device_register(fdisk, bdev_aio_create_cb, bdev_aio_destroy_cb,
				sizeof(struct bdev_aio_io_channel),
				fdisk->disk.name);
	rc = spdk_bdev_register(&fdisk->disk);
	if (rc) {
		spdk_io_device_unregister(fdisk, NULL);
		goto error_return;
	}

	TAILQ_INSERT_TAIL(&g_aio_disk_head, fdisk, link);
	return 0;

error_return:
	bdev_aio_close(fdisk);
	aio_free_disk(fdisk);
	return rc;
}
822 
/* No-op event callback for the internal descriptor opened by bdev_aio_rescan(). */
static void
dummy_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev, void *ctx)
{
}
827 
/*
 * Re-read the backing file's size for the named aio bdev and, if the block
 * count changed, notify the bdev layer so upper layers see the resize.
 *
 * Returns 0 on success (including "size unchanged"), -ENODEV if the bdev is
 * not an aio bdev, or the error from open/notify.
 */
int
bdev_aio_rescan(const char *name)
{
	struct spdk_bdev_desc *desc;
	struct spdk_bdev *bdev;
	struct file_disk *fdisk;
	uint64_t disk_size, blockcnt;
	int rc;

	/* Open read-only just to pin the bdev while we inspect it. */
	rc = spdk_bdev_open_ext(name, false, dummy_bdev_event_cb, NULL, &desc);
	if (rc != 0) {
		return rc;
	}

	bdev = spdk_bdev_desc_get_bdev(desc);
	if (bdev->module != &aio_if) {
		rc = -ENODEV;
		goto exit;
	}

	fdisk = SPDK_CONTAINEROF(bdev, struct file_disk, disk);
	disk_size = spdk_fd_get_size(fdisk->fd);
	blockcnt = disk_size / bdev->blocklen;

	if (bdev->blockcnt != blockcnt) {
		SPDK_NOTICELOG("AIO device is resized: bdev name %s, old block count %" PRIu64 ", new block count %"
			       PRIu64 "\n",
			       fdisk->filename,
			       bdev->blockcnt,
			       blockcnt);
		rc = spdk_bdev_notify_blockcnt_change(bdev, blockcnt);
		if (rc != 0) {
			SPDK_ERRLOG("Could not change num blocks for aio bdev: name %s, errno: %d.\n",
				    fdisk->filename, rc);
			goto exit;
		}
	}

exit:
	spdk_bdev_close(desc);
	return rc;
}
870 
/* Context carrying the user's completion callback through an async delete. */
struct delete_aio_bdev_ctx {
	delete_aio_bdev_complete cb_fn;		/* invoked once unregister completes */
	void *cb_arg;				/* opaque argument passed to cb_fn */
};
875 
/*
 * Unregister completion: forward the result to the caller's callback and
 * free the delete context (also used as the inline error path in
 * bdev_aio_delete()).
 */
static void
aio_bdev_unregister_cb(void *arg, int bdeverrno)
{
	struct delete_aio_bdev_ctx *ctx = arg;

	ctx->cb_fn(ctx->cb_arg, bdeverrno);
	free(ctx);
}
884 
/*
 * Asynchronously delete the named aio bdev; cb_fn(cb_arg, rc) is always
 * invoked exactly once, with -ENOMEM on allocation failure, the immediate
 * error from unregister-by-name, or the final unregister result.
 */
void
bdev_aio_delete(const char *name, delete_aio_bdev_complete cb_fn, void *cb_arg)
{
	struct delete_aio_bdev_ctx *ctx;
	int rc;

	ctx = calloc(1, sizeof(*ctx));
	if (ctx == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;
	rc = spdk_bdev_unregister_by_name(name, &aio_if, aio_bdev_unregister_cb, ctx);
	if (rc != 0) {
		/* Unregister never started; report the error through the same path. */
		aio_bdev_unregister_cb(ctx, rc);
	}
}
904 
/* Module init: register the module-wide io_device providing group channels. */
static int
bdev_aio_initialize(void)
{
	spdk_io_device_register(&aio_if, bdev_aio_group_create_cb, bdev_aio_group_destroy_cb,
				sizeof(struct bdev_aio_group_channel), "aio_module");

	return 0;
}
913 
/* Module teardown: unregister the module-wide io_device. */
static void
bdev_aio_fini(void)
{
	spdk_io_device_unregister(&aio_if, NULL);
}
919 
920 SPDK_LOG_REGISTER_COMPONENT(aio)
921