xref: /spdk/module/bdev/aio/bdev_aio.c (revision 60982c759db49b4f4579f16e3b24df0725ba4b94)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2017 Intel Corporation.
3  *   All rights reserved.
4  *   Copyright (c) 2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
5  */
6 
7 #include "bdev_aio.h"
8 
9 #include "spdk/stdinc.h"
10 
11 #include "spdk/barrier.h"
12 #include "spdk/bdev.h"
13 #include "spdk/bdev_module.h"
14 #include "spdk/env.h"
15 #include "spdk/fd.h"
16 #include "spdk/likely.h"
17 #include "spdk/thread.h"
18 #include "spdk/json.h"
19 #include "spdk/util.h"
20 #include "spdk/string.h"
21 
22 #include "spdk/log.h"
23 
24 #include <sys/eventfd.h>
25 
26 #ifndef __FreeBSD__
27 #include <libaio.h>
28 #endif
29 
30 struct bdev_aio_io_channel {
31 	uint64_t				io_inflight;
32 #ifdef __FreeBSD__
33 	int					kqfd;
34 #else
35 	io_context_t				io_ctx;
36 #endif
37 	struct bdev_aio_group_channel		*group_ch;
38 	TAILQ_ENTRY(bdev_aio_io_channel)	link;
39 };
40 
/* Module-level channel context that polls every I/O channel linked to it. */
struct bdev_aio_group_channel {
	/* eventfd used for I/O completion notification in interrupt mode.
	 * A negative value such as '-1' means it is invalid or unused.
	 */
	int					efd;
	/* Interrupt registered on efd (interrupt mode only). */
	struct spdk_interrupt			*intr;
	/* Poller running bdev_aio_group_poll() for this group. */
	struct spdk_poller			*poller;
	/* I/O channels whose completions are reaped by this group. */
	TAILQ_HEAD(, bdev_aio_io_channel)	io_ch_head;
};
50 
51 struct bdev_aio_task {
52 #ifdef __FreeBSD__
53 	struct aiocb			aiocb;
54 #else
55 	struct iocb			iocb;
56 #endif
57 	uint64_t			len;
58 	struct bdev_aio_io_channel	*ch;
59 };
60 
61 struct file_disk {
62 	struct bdev_aio_task	*reset_task;
63 	struct spdk_poller	*reset_retry_timer;
64 	struct spdk_bdev	disk;
65 	char			*filename;
66 	int			fd;
67 	TAILQ_ENTRY(file_disk)  link;
68 	bool			block_size_override;
69 	bool			readonly;
70 };
71 
/* Header of the completion ring that an io_context_t is cast to, so completions
 * can be reaped from user space. The field order and widths must not change.
 */
struct spdk_aio_ring {
	uint32_t id;
	/* Number of slots in the event array. */
	uint32_t size;
	/* Index of the next event to consume. */
	uint32_t head;
	/* Index one past the most recently produced event. */
	uint32_t tail;

	/* Compared against SPDK_AIO_RING_VERSION before the ring is read directly. */
	uint32_t version;
	uint32_t compat_features;
	/* Must be zero for the direct-read path to be used. */
	uint32_t incompat_features;
	/* Byte offset from the ring start to the event array. */
	uint32_t header_length;
};
84 
/* Ring version for which bdev_user_io_getevents() reads completions directly. */
#define SPDK_AIO_RING_VERSION	0xa10a10a1

/* Forward declarations for functions referenced before their definitions. */
static int bdev_aio_initialize(void);
static void bdev_aio_fini(void);
static void aio_free_disk(struct file_disk *fdisk);

/* Every AIO disk created by create_aio_bdev() is linked here. */
static TAILQ_HEAD(, file_disk) g_aio_disk_head = TAILQ_HEAD_INITIALIZER(g_aio_disk_head);

/* Depth requested from io_setup() and size of the per-poll event array. */
#define SPDK_AIO_QUEUE_DEPTH 128
#define MAX_EVENTS_PER_POLL 32
94 
95 static int
96 bdev_aio_get_ctx_size(void)
97 {
98 	return sizeof(struct bdev_aio_task);
99 }
100 
101 static struct spdk_bdev_module aio_if = {
102 	.name		= "aio",
103 	.module_init	= bdev_aio_initialize,
104 	.module_fini	= bdev_aio_fini,
105 	.get_ctx_size	= bdev_aio_get_ctx_size,
106 };
107 
108 SPDK_BDEV_MODULE_REGISTER(aio, &aio_if)
109 
110 static int
111 bdev_aio_open(struct file_disk *disk)
112 {
113 	int fd;
114 	int io_flag = disk->readonly ? O_RDONLY : O_RDWR;
115 
116 	fd = open(disk->filename, io_flag | O_DIRECT);
117 	if (fd < 0) {
118 		/* Try without O_DIRECT for non-disk files */
119 		fd = open(disk->filename, io_flag);
120 		if (fd < 0) {
121 			SPDK_ERRLOG("open() failed (file:%s), errno %d: %s\n",
122 				    disk->filename, errno, spdk_strerror(errno));
123 			disk->fd = -1;
124 			return -1;
125 		}
126 	}
127 
128 	disk->fd = fd;
129 
130 	return 0;
131 }
132 
133 static int
134 bdev_aio_close(struct file_disk *disk)
135 {
136 	int rc;
137 
138 	if (disk->fd == -1) {
139 		return 0;
140 	}
141 
142 	rc = close(disk->fd);
143 	if (rc < 0) {
144 		SPDK_ERRLOG("close() failed (fd=%d), errno %d: %s\n",
145 			    disk->fd, errno, spdk_strerror(errno));
146 		return -1;
147 	}
148 
149 	disk->fd = -1;
150 
151 	return 0;
152 }
153 
154 #ifdef __FreeBSD__
155 static int
156 bdev_aio_submit_io(enum spdk_bdev_io_type type, struct file_disk *fdisk,
157 		   struct spdk_io_channel *ch, struct bdev_aio_task *aio_task,
158 		   struct iovec *iov, int iovcnt, uint64_t nbytes, uint64_t offset)
159 {
160 	struct aiocb *aiocb = &aio_task->aiocb;
161 	struct bdev_aio_io_channel *aio_ch = spdk_io_channel_get_ctx(ch);
162 
163 	memset(aiocb, 0, sizeof(struct aiocb));
164 	aiocb->aio_fildes = fdisk->fd;
165 	aiocb->aio_iov = iov;
166 	aiocb->aio_iovcnt = iovcnt;
167 	aiocb->aio_offset = offset;
168 	aiocb->aio_sigevent.sigev_notify_kqueue = aio_ch->kqfd;
169 	aiocb->aio_sigevent.sigev_value.sival_ptr = aio_task;
170 	aiocb->aio_sigevent.sigev_notify = SIGEV_KEVENT;
171 
172 	aio_task->len = nbytes;
173 	aio_task->ch = aio_ch;
174 
175 	if (type == SPDK_BDEV_IO_TYPE_READ) {
176 		return aio_readv(aiocb);
177 	}
178 
179 	return aio_writev(aiocb);
180 }
181 #else
182 static int
183 bdev_aio_submit_io(enum spdk_bdev_io_type type, struct file_disk *fdisk,
184 		   struct spdk_io_channel *ch, struct bdev_aio_task *aio_task,
185 		   struct iovec *iov, int iovcnt, uint64_t nbytes, uint64_t offset)
186 {
187 	struct iocb *iocb = &aio_task->iocb;
188 	struct bdev_aio_io_channel *aio_ch = spdk_io_channel_get_ctx(ch);
189 
190 	if (type == SPDK_BDEV_IO_TYPE_READ) {
191 		io_prep_preadv(iocb, fdisk->fd, iov, iovcnt, offset);
192 	} else {
193 		io_prep_pwritev(iocb, fdisk->fd, iov, iovcnt, offset);
194 	}
195 
196 	if (aio_ch->group_ch->efd >= 0) {
197 		io_set_eventfd(iocb, aio_ch->group_ch->efd);
198 	}
199 	iocb->data = aio_task;
200 	aio_task->len = nbytes;
201 	aio_task->ch = aio_ch;
202 
203 	return io_submit(aio_ch->io_ctx, 1, &iocb);
204 }
205 #endif
206 
207 static void
208 bdev_aio_rw(enum spdk_bdev_io_type type, struct file_disk *fdisk,
209 	    struct spdk_io_channel *ch, struct bdev_aio_task *aio_task,
210 	    struct iovec *iov, int iovcnt, uint64_t nbytes, uint64_t offset)
211 {
212 	struct bdev_aio_io_channel *aio_ch = spdk_io_channel_get_ctx(ch);
213 	int rc;
214 
215 	if (type == SPDK_BDEV_IO_TYPE_READ) {
216 		SPDK_DEBUGLOG(aio, "read %d iovs size %lu to off: %#lx\n",
217 			      iovcnt, nbytes, offset);
218 	} else {
219 		SPDK_DEBUGLOG(aio, "write %d iovs size %lu from off: %#lx\n",
220 			      iovcnt, nbytes, offset);
221 	}
222 
223 	rc = bdev_aio_submit_io(type, fdisk, ch, aio_task, iov, iovcnt, nbytes, offset);
224 	if (spdk_unlikely(rc < 0)) {
225 		if (rc == -EAGAIN) {
226 			spdk_bdev_io_complete(spdk_bdev_io_from_ctx(aio_task), SPDK_BDEV_IO_STATUS_NOMEM);
227 		} else {
228 			spdk_bdev_io_complete_aio_status(spdk_bdev_io_from_ctx(aio_task), rc);
229 			SPDK_ERRLOG("%s: io_submit returned %d\n", __func__, rc);
230 		}
231 	} else {
232 		aio_ch->io_inflight++;
233 	}
234 }
235 
236 static void
237 bdev_aio_flush(struct file_disk *fdisk, struct bdev_aio_task *aio_task)
238 {
239 	int rc = fsync(fdisk->fd);
240 
241 	if (rc == 0) {
242 		spdk_bdev_io_complete(spdk_bdev_io_from_ctx(aio_task), SPDK_BDEV_IO_STATUS_SUCCESS);
243 	} else {
244 		spdk_bdev_io_complete_aio_status(spdk_bdev_io_from_ctx(aio_task), -errno);
245 	}
246 }
247 
248 static void
249 bdev_aio_destruct_cb(void *io_device)
250 {
251 	struct file_disk *fdisk = io_device;
252 	int rc = 0;
253 
254 	TAILQ_REMOVE(&g_aio_disk_head, fdisk, link);
255 	rc = bdev_aio_close(fdisk);
256 	if (rc < 0) {
257 		SPDK_ERRLOG("bdev_aio_close() failed\n");
258 	}
259 	aio_free_disk(fdisk);
260 }
261 
262 static int
263 bdev_aio_destruct(void *ctx)
264 {
265 	struct file_disk *fdisk = ctx;
266 
267 	spdk_io_device_unregister(fdisk, bdev_aio_destruct_cb);
268 
269 	return 0;
270 }
271 
272 #ifdef __FreeBSD__
273 static int
274 bdev_user_io_getevents(int kq, unsigned int max, struct kevent *events)
275 {
276 	struct timespec ts;
277 	int count;
278 
279 	memset(events, 0, max * sizeof(struct kevent));
280 	memset(&ts, 0, sizeof(ts));
281 
282 	count = kevent(kq, NULL, 0, events, max, &ts);
283 	if (count < 0) {
284 		SPDK_ERRLOG("failed to get kevents: %s.\n", spdk_strerror(errno));
285 		return -errno;
286 	}
287 
288 	return count;
289 }
290 
291 static int
292 bdev_aio_io_channel_poll(struct bdev_aio_io_channel *io_ch)
293 {
294 	int nr, i, res = 0;
295 	struct bdev_aio_task *aio_task;
296 	struct kevent events[SPDK_AIO_QUEUE_DEPTH];
297 
298 	nr = bdev_user_io_getevents(io_ch->kqfd, SPDK_AIO_QUEUE_DEPTH, events);
299 	if (nr < 0) {
300 		return 0;
301 	}
302 
303 	for (i = 0; i < nr; i++) {
304 		aio_task = events[i].udata;
305 		aio_task->ch->io_inflight--;
306 		if (aio_task == NULL) {
307 			spdk_bdev_io_complete(spdk_bdev_io_from_ctx(aio_task), SPDK_BDEV_IO_STATUS_FAILED);
308 			break;
309 		} else if ((uint64_t)aio_return(&aio_task->aiocb) == aio_task->len) {
310 			spdk_bdev_io_complete(spdk_bdev_io_from_ctx(aio_task), SPDK_BDEV_IO_STATUS_SUCCESS);
311 		} else {
312 			SPDK_ERRLOG("failed to complete aio: rc %d\n", aio_error(&aio_task->aiocb));
313 			res = aio_error(&aio_task->aiocb);
314 			if (res != 0) {
315 				spdk_bdev_io_complete_aio_status(spdk_bdev_io_from_ctx(aio_task), res);
316 			} else {
317 				spdk_bdev_io_complete(spdk_bdev_io_from_ctx(aio_task), SPDK_BDEV_IO_STATUS_FAILED);
318 			}
319 		}
320 	}
321 
322 	return nr;
323 }
324 #else
325 static int
326 bdev_user_io_getevents(io_context_t io_ctx, unsigned int max, struct io_event *uevents)
327 {
328 	uint32_t head, tail, count;
329 	struct spdk_aio_ring *ring;
330 	struct timespec timeout;
331 	struct io_event *kevents;
332 
333 	ring = (struct spdk_aio_ring *)io_ctx;
334 
335 	if (spdk_unlikely(ring->version != SPDK_AIO_RING_VERSION || ring->incompat_features != 0)) {
336 		timeout.tv_sec = 0;
337 		timeout.tv_nsec = 0;
338 
339 		return io_getevents(io_ctx, 0, max, uevents, &timeout);
340 	}
341 
342 	/* Read the current state out of the ring */
343 	head = ring->head;
344 	tail = ring->tail;
345 
346 	/* This memory barrier is required to prevent the loads above
347 	 * from being re-ordered with stores to the events array
348 	 * potentially occurring on other threads. */
349 	spdk_smp_rmb();
350 
351 	/* Calculate how many items are in the circular ring */
352 	count = tail - head;
353 	if (tail < head) {
354 		count += ring->size;
355 	}
356 
357 	/* Reduce the count to the limit provided by the user */
358 	count = spdk_min(max, count);
359 
360 	/* Grab the memory location of the event array */
361 	kevents = (struct io_event *)((uintptr_t)ring + ring->header_length);
362 
363 	/* Copy the events out of the ring. */
364 	if ((head + count) <= ring->size) {
365 		/* Only one copy is required */
366 		memcpy(uevents, &kevents[head], count * sizeof(struct io_event));
367 	} else {
368 		uint32_t first_part = ring->size - head;
369 		/* Two copies are required */
370 		memcpy(uevents, &kevents[head], first_part * sizeof(struct io_event));
371 		memcpy(&uevents[first_part], &kevents[0], (count - first_part) * sizeof(struct io_event));
372 	}
373 
374 	/* Update the head pointer. On x86, stores will not be reordered with older loads,
375 	 * so the copies out of the event array will always be complete prior to this
376 	 * update becoming visible. On other architectures this is not guaranteed, so
377 	 * add a barrier. */
378 #if defined(__i386__) || defined(__x86_64__)
379 	spdk_compiler_barrier();
380 #else
381 	spdk_smp_mb();
382 #endif
383 	ring->head = (head + count) % ring->size;
384 
385 	return count;
386 }
387 
388 static int
389 bdev_aio_io_channel_poll(struct bdev_aio_io_channel *io_ch)
390 {
391 	int nr, i, res = 0;
392 	struct bdev_aio_task *aio_task;
393 	struct io_event events[SPDK_AIO_QUEUE_DEPTH];
394 
395 	nr = bdev_user_io_getevents(io_ch->io_ctx, SPDK_AIO_QUEUE_DEPTH, events);
396 	if (nr < 0) {
397 		return 0;
398 	}
399 
400 	for (i = 0; i < nr; i++) {
401 		aio_task = events[i].data;
402 		aio_task->ch->io_inflight--;
403 		if (events[i].res == aio_task->len) {
404 			spdk_bdev_io_complete(spdk_bdev_io_from_ctx(aio_task), SPDK_BDEV_IO_STATUS_SUCCESS);
405 		} else {
406 			/* From aio_abi.h, io_event.res is defined __s64, negative errno
407 			 * will be assigned to io_event.res for error situation.
408 			 * But from libaio.h, io_event.res is defined unsigned long, so
409 			 * convert it to signed value for error detection.
410 			 */
411 			SPDK_ERRLOG("failed to complete aio: rc %"PRId64"\n", events[i].res);
412 			res = (int)events[i].res;
413 			if (res < 0) {
414 				spdk_bdev_io_complete_aio_status(spdk_bdev_io_from_ctx(aio_task), res);
415 			} else {
416 				spdk_bdev_io_complete(spdk_bdev_io_from_ctx(aio_task), SPDK_BDEV_IO_STATUS_FAILED);
417 			}
418 		}
419 	}
420 
421 	return nr;
422 }
423 #endif
424 
425 static int
426 bdev_aio_group_poll(void *arg)
427 {
428 	struct bdev_aio_group_channel *group_ch = arg;
429 	struct bdev_aio_io_channel *io_ch;
430 	int nr = 0;
431 
432 	TAILQ_FOREACH(io_ch, &group_ch->io_ch_head, link) {
433 		nr += bdev_aio_io_channel_poll(io_ch);
434 	}
435 
436 	return nr > 0 ? SPDK_POLLER_BUSY : SPDK_POLLER_IDLE;
437 }
438 
439 static int
440 bdev_aio_group_interrupt(void *arg)
441 {
442 	struct bdev_aio_group_channel *group_ch = arg;
443 	int rc;
444 	uint64_t num_events;
445 
446 	assert(group_ch->efd >= 0);
447 
448 	/* if completed IO number is larger than SPDK_AIO_QUEUE_DEPTH,
449 	 * io_getevent should be called again to ensure all completed IO are processed.
450 	 */
451 	rc = read(group_ch->efd, &num_events, sizeof(num_events));
452 	if (rc < 0) {
453 		SPDK_ERRLOG("failed to acknowledge aio group: %s.\n", spdk_strerror(errno));
454 		return -errno;
455 	}
456 
457 	if (num_events > SPDK_AIO_QUEUE_DEPTH) {
458 		num_events -= SPDK_AIO_QUEUE_DEPTH;
459 		rc = write(group_ch->efd, &num_events, sizeof(num_events));
460 		if (rc < 0) {
461 			SPDK_ERRLOG("failed to notify aio group: %s.\n", spdk_strerror(errno));
462 		}
463 	}
464 
465 	return bdev_aio_group_poll(group_ch);
466 }
467 
468 static void
469 _bdev_aio_get_io_inflight(struct spdk_io_channel_iter *i)
470 {
471 	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
472 	struct bdev_aio_io_channel *aio_ch = spdk_io_channel_get_ctx(ch);
473 
474 	if (aio_ch->io_inflight) {
475 		spdk_for_each_channel_continue(i, -1);
476 		return;
477 	}
478 
479 	spdk_for_each_channel_continue(i, 0);
480 }
481 
482 static int bdev_aio_reset_retry_timer(void *arg);
483 
484 static void
485 _bdev_aio_get_io_inflight_done(struct spdk_io_channel_iter *i, int status)
486 {
487 	struct file_disk *fdisk = spdk_io_channel_iter_get_ctx(i);
488 
489 	if (status == -1) {
490 		fdisk->reset_retry_timer = SPDK_POLLER_REGISTER(bdev_aio_reset_retry_timer, fdisk, 500);
491 		return;
492 	}
493 
494 	spdk_bdev_io_complete(spdk_bdev_io_from_ctx(fdisk->reset_task), SPDK_BDEV_IO_STATUS_SUCCESS);
495 }
496 
497 static int
498 bdev_aio_reset_retry_timer(void *arg)
499 {
500 	struct file_disk *fdisk = arg;
501 
502 	if (fdisk->reset_retry_timer) {
503 		spdk_poller_unregister(&fdisk->reset_retry_timer);
504 	}
505 
506 	spdk_for_each_channel(fdisk,
507 			      _bdev_aio_get_io_inflight,
508 			      fdisk,
509 			      _bdev_aio_get_io_inflight_done);
510 
511 	return SPDK_POLLER_BUSY;
512 }
513 
514 static void
515 bdev_aio_reset(struct file_disk *fdisk, struct bdev_aio_task *aio_task)
516 {
517 	fdisk->reset_task = aio_task;
518 
519 	bdev_aio_reset_retry_timer(fdisk);
520 }
521 
522 static void
523 bdev_aio_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io,
524 		    bool success)
525 {
526 	if (!success) {
527 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
528 		return;
529 	}
530 
531 	switch (bdev_io->type) {
532 	case SPDK_BDEV_IO_TYPE_READ:
533 	case SPDK_BDEV_IO_TYPE_WRITE:
534 		bdev_aio_rw(bdev_io->type,
535 			    (struct file_disk *)bdev_io->bdev->ctxt,
536 			    ch,
537 			    (struct bdev_aio_task *)bdev_io->driver_ctx,
538 			    bdev_io->u.bdev.iovs,
539 			    bdev_io->u.bdev.iovcnt,
540 			    bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen,
541 			    bdev_io->u.bdev.offset_blocks * bdev_io->bdev->blocklen);
542 		break;
543 	default:
544 		SPDK_ERRLOG("Wrong io type\n");
545 		break;
546 	}
547 }
548 
549 static int
550 _bdev_aio_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
551 {
552 	struct file_disk *fdisk = (struct file_disk *)bdev_io->bdev->ctxt;
553 
554 	switch (bdev_io->type) {
555 	/* Read and write operations must be performed on buffers aligned to
556 	 * bdev->required_alignment. If user specified unaligned buffers,
557 	 * get the aligned buffer from the pool by calling spdk_bdev_io_get_buf. */
558 	case SPDK_BDEV_IO_TYPE_READ:
559 		spdk_bdev_io_get_buf(bdev_io, bdev_aio_get_buf_cb,
560 				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
561 		return 0;
562 	case SPDK_BDEV_IO_TYPE_WRITE:
563 		if (fdisk->readonly) {
564 			spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
565 		} else {
566 			spdk_bdev_io_get_buf(bdev_io, bdev_aio_get_buf_cb,
567 					     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
568 		}
569 		return 0;
570 
571 	case SPDK_BDEV_IO_TYPE_FLUSH:
572 		bdev_aio_flush((struct file_disk *)bdev_io->bdev->ctxt,
573 			       (struct bdev_aio_task *)bdev_io->driver_ctx);
574 		return 0;
575 
576 	case SPDK_BDEV_IO_TYPE_RESET:
577 		bdev_aio_reset((struct file_disk *)bdev_io->bdev->ctxt,
578 			       (struct bdev_aio_task *)bdev_io->driver_ctx);
579 		return 0;
580 	default:
581 		return -1;
582 	}
583 }
584 
585 static void
586 bdev_aio_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
587 {
588 	if (_bdev_aio_submit_request(ch, bdev_io) < 0) {
589 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
590 	}
591 }
592 
593 static bool
594 bdev_aio_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type)
595 {
596 	switch (io_type) {
597 	case SPDK_BDEV_IO_TYPE_READ:
598 	case SPDK_BDEV_IO_TYPE_WRITE:
599 	case SPDK_BDEV_IO_TYPE_FLUSH:
600 	case SPDK_BDEV_IO_TYPE_RESET:
601 		return true;
602 
603 	default:
604 		return false;
605 	}
606 }
607 
608 #ifdef __FreeBSD__
609 static int
610 bdev_aio_create_io(struct bdev_aio_io_channel *ch)
611 {
612 	ch->kqfd = kqueue();
613 	if (ch->kqfd < 0) {
614 		SPDK_ERRLOG("async I/O context setup failure: %s.\n", spdk_strerror(errno));
615 		return -1;
616 	}
617 
618 	return 0;
619 }
620 
621 static void
622 bdev_aio_destroy_io(struct bdev_aio_io_channel *ch)
623 {
624 	close(ch->kqfd);
625 }
626 #else
627 static int
628 bdev_aio_create_io(struct bdev_aio_io_channel *ch)
629 {
630 	if (io_setup(SPDK_AIO_QUEUE_DEPTH, &ch->io_ctx) < 0) {
631 		SPDK_ERRLOG("Async I/O context setup failure, likely due to exceeding kernel limit.\n");
632 		SPDK_ERRLOG("This limit may be increased using 'sysctl -w fs.aio-max-nr'.\n");
633 		return -1;
634 	}
635 
636 	return 0;
637 }
638 
639 static void
640 bdev_aio_destroy_io(struct bdev_aio_io_channel *ch)
641 {
642 	io_destroy(ch->io_ctx);
643 }
644 #endif
645 
646 static int
647 bdev_aio_create_cb(void *io_device, void *ctx_buf)
648 {
649 	struct bdev_aio_io_channel *ch = ctx_buf;
650 	int rc;
651 
652 	rc = bdev_aio_create_io(ch);
653 	if (rc < 0) {
654 		return rc;
655 	}
656 
657 	ch->group_ch = spdk_io_channel_get_ctx(spdk_get_io_channel(&aio_if));
658 	TAILQ_INSERT_TAIL(&ch->group_ch->io_ch_head, ch, link);
659 
660 	return 0;
661 }
662 
663 static void
664 bdev_aio_destroy_cb(void *io_device, void *ctx_buf)
665 {
666 	struct bdev_aio_io_channel *ch = ctx_buf;
667 
668 	bdev_aio_destroy_io(ch);
669 
670 	assert(ch->group_ch);
671 	TAILQ_REMOVE(&ch->group_ch->io_ch_head, ch, link);
672 
673 	spdk_put_io_channel(spdk_io_channel_from_ctx(ch->group_ch));
674 }
675 
/* get_io_channel hook: the disk itself is the io_device key. */
static struct spdk_io_channel *
bdev_aio_get_io_channel(void *ctx)
{
	struct file_disk *disk = ctx;
	struct spdk_io_channel *io_ch = spdk_get_io_channel(disk);

	return io_ch;
}
683 
684 
685 static int
686 bdev_aio_dump_info_json(void *ctx, struct spdk_json_write_ctx *w)
687 {
688 	struct file_disk *fdisk = ctx;
689 
690 	spdk_json_write_named_object_begin(w, "aio");
691 
692 	spdk_json_write_named_string(w, "filename", fdisk->filename);
693 
694 	spdk_json_write_named_bool(w, "block_size_override", fdisk->block_size_override);
695 
696 	spdk_json_write_named_bool(w, "readonly", fdisk->readonly);
697 
698 	spdk_json_write_object_end(w);
699 
700 	return 0;
701 }
702 
703 static void
704 bdev_aio_write_json_config(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
705 {
706 	struct file_disk *fdisk = bdev->ctxt;
707 
708 	spdk_json_write_object_begin(w);
709 
710 	spdk_json_write_named_string(w, "method", "bdev_aio_create");
711 
712 	spdk_json_write_named_object_begin(w, "params");
713 	spdk_json_write_named_string(w, "name", bdev->name);
714 	if (fdisk->block_size_override) {
715 		spdk_json_write_named_uint32(w, "block_size", bdev->blocklen);
716 	}
717 	spdk_json_write_named_string(w, "filename", fdisk->filename);
718 	spdk_json_write_named_bool(w, "readonly", fdisk->readonly);
719 	spdk_json_write_object_end(w);
720 
721 	spdk_json_write_object_end(w);
722 }
723 
724 static const struct spdk_bdev_fn_table aio_fn_table = {
725 	.destruct		= bdev_aio_destruct,
726 	.submit_request		= bdev_aio_submit_request,
727 	.io_type_supported	= bdev_aio_io_type_supported,
728 	.get_io_channel		= bdev_aio_get_io_channel,
729 	.dump_info_json		= bdev_aio_dump_info_json,
730 	.write_config_json	= bdev_aio_write_json_config,
731 };
732 
733 static void
734 aio_free_disk(struct file_disk *fdisk)
735 {
736 	if (fdisk == NULL) {
737 		return;
738 	}
739 	free(fdisk->filename);
740 	free(fdisk->disk.name);
741 	free(fdisk);
742 }
743 
744 static int
745 bdev_aio_register_interrupt(struct bdev_aio_group_channel *ch)
746 {
747 	int efd;
748 
749 	efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
750 	if (efd < 0) {
751 		return -1;
752 	}
753 
754 	ch->intr = SPDK_INTERRUPT_REGISTER(efd, bdev_aio_group_interrupt, ch);
755 	if (ch->intr == NULL) {
756 		close(efd);
757 		return -1;
758 	}
759 	ch->efd = efd;
760 
761 	return 0;
762 }
763 
764 static void
765 bdev_aio_unregister_interrupt(struct bdev_aio_group_channel *ch)
766 {
767 	spdk_interrupt_unregister(&ch->intr);
768 	close(ch->efd);
769 	ch->efd = -1;
770 }
771 
/* Interrupt-mode switch callback for the group poller; intentionally a no-op. */
static void
bdev_aio_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode)
{
}
777 
778 static int
779 bdev_aio_group_create_cb(void *io_device, void *ctx_buf)
780 {
781 	struct bdev_aio_group_channel *ch = ctx_buf;
782 	int rc;
783 
784 	TAILQ_INIT(&ch->io_ch_head);
785 	/* Initialize ch->efd to be invalid and unused. */
786 	ch->efd = -1;
787 	if (spdk_interrupt_mode_is_enabled()) {
788 		rc = bdev_aio_register_interrupt(ch);
789 		if (rc < 0) {
790 			SPDK_ERRLOG("Failed to prepare intr resource to bdev_aio\n");
791 			return rc;
792 		}
793 	}
794 
795 	ch->poller = SPDK_POLLER_REGISTER(bdev_aio_group_poll, ch, 0);
796 	spdk_poller_register_interrupt(ch->poller, bdev_aio_poller_set_interrupt_mode, NULL);
797 
798 	return 0;
799 }
800 
801 static void
802 bdev_aio_group_destroy_cb(void *io_device, void *ctx_buf)
803 {
804 	struct bdev_aio_group_channel *ch = ctx_buf;
805 
806 	if (!TAILQ_EMPTY(&ch->io_ch_head)) {
807 		SPDK_ERRLOG("Group channel of bdev aio has uncleared io channel\n");
808 	}
809 
810 	spdk_poller_unregister(&ch->poller);
811 	if (spdk_interrupt_mode_is_enabled()) {
812 		bdev_aio_unregister_interrupt(ch);
813 	}
814 }
815 
816 int
817 create_aio_bdev(const char *name, const char *filename, uint32_t block_size, bool readonly)
818 {
819 	struct file_disk *fdisk;
820 	uint32_t detected_block_size;
821 	uint64_t disk_size;
822 	int rc;
823 
824 	fdisk = calloc(1, sizeof(*fdisk));
825 	if (!fdisk) {
826 		SPDK_ERRLOG("Unable to allocate enough memory for aio backend\n");
827 		return -ENOMEM;
828 	}
829 	fdisk->readonly = readonly;
830 
831 	fdisk->filename = strdup(filename);
832 	if (!fdisk->filename) {
833 		rc = -ENOMEM;
834 		goto error_return;
835 	}
836 
837 	if (bdev_aio_open(fdisk)) {
838 		SPDK_ERRLOG("Unable to open file %s. fd: %d errno: %d\n", filename, fdisk->fd, errno);
839 		rc = -errno;
840 		goto error_return;
841 	}
842 
843 	disk_size = spdk_fd_get_size(fdisk->fd);
844 
845 	fdisk->disk.name = strdup(name);
846 	if (!fdisk->disk.name) {
847 		rc = -ENOMEM;
848 		goto error_return;
849 	}
850 	fdisk->disk.product_name = "AIO disk";
851 	fdisk->disk.module = &aio_if;
852 
853 	fdisk->disk.write_cache = 1;
854 
855 	detected_block_size = spdk_fd_get_blocklen(fdisk->fd);
856 	if (block_size == 0) {
857 		/* User did not specify block size - use autodetected block size. */
858 		if (detected_block_size == 0) {
859 			SPDK_ERRLOG("Block size could not be auto-detected\n");
860 			rc = -EINVAL;
861 			goto error_return;
862 		}
863 		fdisk->block_size_override = false;
864 		block_size = detected_block_size;
865 	} else {
866 		if (block_size < detected_block_size) {
867 			SPDK_ERRLOG("Specified block size %" PRIu32 " is smaller than "
868 				    "auto-detected block size %" PRIu32 "\n",
869 				    block_size, detected_block_size);
870 			rc = -EINVAL;
871 			goto error_return;
872 		} else if (detected_block_size != 0 && block_size != detected_block_size) {
873 			SPDK_WARNLOG("Specified block size %" PRIu32 " does not match "
874 				     "auto-detected block size %" PRIu32 "\n",
875 				     block_size, detected_block_size);
876 		}
877 		fdisk->block_size_override = true;
878 	}
879 
880 	if (block_size < 512) {
881 		SPDK_ERRLOG("Invalid block size %" PRIu32 " (must be at least 512).\n", block_size);
882 		rc = -EINVAL;
883 		goto error_return;
884 	}
885 
886 	if (!spdk_u32_is_pow2(block_size)) {
887 		SPDK_ERRLOG("Invalid block size %" PRIu32 " (must be a power of 2.)\n", block_size);
888 		rc = -EINVAL;
889 		goto error_return;
890 	}
891 
892 	fdisk->disk.blocklen = block_size;
893 	if (fdisk->block_size_override && detected_block_size) {
894 		fdisk->disk.required_alignment = spdk_u32log2(detected_block_size);
895 	} else {
896 		fdisk->disk.required_alignment = spdk_u32log2(block_size);
897 	}
898 
899 	if (disk_size % fdisk->disk.blocklen != 0) {
900 		SPDK_ERRLOG("Disk size %" PRIu64 " is not a multiple of block size %" PRIu32 "\n",
901 			    disk_size, fdisk->disk.blocklen);
902 		rc = -EINVAL;
903 		goto error_return;
904 	}
905 
906 	fdisk->disk.blockcnt = disk_size / fdisk->disk.blocklen;
907 	fdisk->disk.ctxt = fdisk;
908 
909 	fdisk->disk.fn_table = &aio_fn_table;
910 
911 	spdk_io_device_register(fdisk, bdev_aio_create_cb, bdev_aio_destroy_cb,
912 				sizeof(struct bdev_aio_io_channel),
913 				fdisk->disk.name);
914 	rc = spdk_bdev_register(&fdisk->disk);
915 	if (rc) {
916 		spdk_io_device_unregister(fdisk, NULL);
917 		goto error_return;
918 	}
919 
920 	TAILQ_INSERT_TAIL(&g_aio_disk_head, fdisk, link);
921 	return 0;
922 
923 error_return:
924 	bdev_aio_close(fdisk);
925 	aio_free_disk(fdisk);
926 	return rc;
927 }
928 
929 static void
930 dummy_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev, void *ctx)
931 {
932 }
933 
934 int
935 bdev_aio_rescan(const char *name)
936 {
937 	struct spdk_bdev_desc *desc;
938 	struct spdk_bdev *bdev;
939 	struct file_disk *fdisk;
940 	uint64_t disk_size, blockcnt;
941 	int rc;
942 
943 	rc = spdk_bdev_open_ext(name, false, dummy_bdev_event_cb, NULL, &desc);
944 	if (rc != 0) {
945 		return rc;
946 	}
947 
948 	bdev = spdk_bdev_desc_get_bdev(desc);
949 	if (bdev->module != &aio_if) {
950 		rc = -ENODEV;
951 		goto exit;
952 	}
953 
954 	fdisk = SPDK_CONTAINEROF(bdev, struct file_disk, disk);
955 	disk_size = spdk_fd_get_size(fdisk->fd);
956 	blockcnt = disk_size / bdev->blocklen;
957 
958 	if (bdev->blockcnt != blockcnt) {
959 		SPDK_NOTICELOG("AIO device is resized: bdev name %s, old block count %" PRIu64 ", new block count %"
960 			       PRIu64 "\n",
961 			       fdisk->filename,
962 			       bdev->blockcnt,
963 			       blockcnt);
964 		rc = spdk_bdev_notify_blockcnt_change(bdev, blockcnt);
965 		if (rc != 0) {
966 			SPDK_ERRLOG("Could not change num blocks for aio bdev: name %s, errno: %d.\n",
967 				    fdisk->filename, rc);
968 			goto exit;
969 		}
970 	}
971 
972 exit:
973 	spdk_bdev_close(desc);
974 	return rc;
975 }
976 
977 struct delete_aio_bdev_ctx {
978 	delete_aio_bdev_complete cb_fn;
979 	void *cb_arg;
980 };
981 
982 static void
983 aio_bdev_unregister_cb(void *arg, int bdeverrno)
984 {
985 	struct delete_aio_bdev_ctx *ctx = arg;
986 
987 	ctx->cb_fn(ctx->cb_arg, bdeverrno);
988 	free(ctx);
989 }
990 
991 void
992 bdev_aio_delete(const char *name, delete_aio_bdev_complete cb_fn, void *cb_arg)
993 {
994 	struct delete_aio_bdev_ctx *ctx;
995 	int rc;
996 
997 	ctx = calloc(1, sizeof(*ctx));
998 	if (ctx == NULL) {
999 		cb_fn(cb_arg, -ENOMEM);
1000 		return;
1001 	}
1002 
1003 	ctx->cb_fn = cb_fn;
1004 	ctx->cb_arg = cb_arg;
1005 	rc = spdk_bdev_unregister_by_name(name, &aio_if, aio_bdev_unregister_cb, ctx);
1006 	if (rc != 0) {
1007 		aio_bdev_unregister_cb(ctx, rc);
1008 	}
1009 }
1010 
1011 static int
1012 bdev_aio_initialize(void)
1013 {
1014 	spdk_io_device_register(&aio_if, bdev_aio_group_create_cb, bdev_aio_group_destroy_cb,
1015 				sizeof(struct bdev_aio_group_channel), "aio_module");
1016 
1017 	return 0;
1018 }
1019 
1020 static void
1021 bdev_aio_fini(void)
1022 {
1023 	spdk_io_device_unregister(&aio_if, NULL);
1024 }
1025 
1026 SPDK_LOG_REGISTER_COMPONENT(aio)
1027