xref: /spdk/module/bdev/rbd/bdev_rbd.c (revision 88e3ffd7b6c5ec1ea1a660354d25f02c766092e1)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 
36 #include "bdev_rbd.h"
37 
38 #include <rbd/librbd.h>
39 #include <rados/librados.h>
40 #include <sys/eventfd.h>
41 #include <sys/epoll.h>
42 
43 #include "spdk/env.h"
44 #include "spdk/bdev.h"
45 #include "spdk/thread.h"
46 #include "spdk/json.h"
47 #include "spdk/string.h"
48 #include "spdk/util.h"
49 #include "spdk/likely.h"
50 
51 #include "spdk/bdev_module.h"
52 #include "spdk/log.h"
53 
/* Upper bound on completions reaped from one image per rbd_poll_io_events() call. */
54 #define SPDK_RBD_QUEUE_DEPTH 128
/* Upper bound on epoll events fetched by one run of the group poller. */
55 #define MAX_EVENTS_PER_POLL 128
56 
/* Running count of created bdevs; used to build the default "Ceph%d" bdev name. */
57 static int bdev_rbd_count = 0;
58 
/*
 * Per-bdev context for one RBD image exposed through the SPDK bdev layer.
 * The structure itself is also the io_device handle passed to
 * spdk_io_device_register() and spdk_get_io_channel().
 */
59 struct bdev_rbd {
60 	struct spdk_bdev disk;	/* bdev handed to spdk_bdev_register(); disk.ctxt points back to this struct */
61 	char *rbd_name;	/* heap copy of the image name, opened with rbd_open() */
62 	char *user_id;	/* heap copy of the user id, or NULL when none was given */
63 	char *pool_name;	/* heap copy of the pool name used to create the rados ioctx */
64 	char **config;	/* NULL-terminated copy of the caller's config array, walked as key/value pairs; may be NULL */
65 	rbd_image_info_t info;	/* filled by rbd_stat() at create time; info.size determines blockcnt */
66 	TAILQ_ENTRY(bdev_rbd) tailq;	/* list linkage; not referenced by the code visible in this file */
67 	struct spdk_poller *reset_timer;	/* timer poller armed by bdev_rbd_reset() */
68 	struct spdk_bdev_io *reset_bdev_io;	/* reset request waiting for reset_timer, NULL when no reset is pending */
69 };
70 
/* Context of the poll-group channel created on the rbd_if io_device. */
71 struct bdev_rbd_group_channel {
72 	struct spdk_poller *poller;	/* runs bdev_rbd_group_poll() for this group */
73 	int epoll_fd;	/* epoll instance that the eventfds of the I/O channels are added to */
74 };
75 
/* Per-channel context: each I/O channel owns its own rados connection and open image. */
76 struct bdev_rbd_io_channel {
77 	rados_ioctx_t io_ctx;	/* pool I/O context created by bdev_rados_context_init() */
78 	rados_t cluster;	/* cluster handle created by bdev_rados_context_init() */
79 	int pfd;	/* eventfd passed to rbd_set_image_notification(); -1 while not open */
80 	rbd_image_t image;	/* image handle from rbd_open(); NULL while not open */
81 	struct bdev_rbd *disk;	/* owning bdev context (the io_device) */
82 	struct bdev_rbd_group_channel *group_ch;	/* poll group whose epoll set watches pfd */
83 };
84 
/* Per-I/O driver context stored in spdk_bdev_io->driver_ctx. */
85 struct bdev_rbd_io {
86 	size_t	total_len;	/* byte length recorded for reads; compared with the completion's return value */
87 };
88 
89 static void
90 bdev_rbd_free(struct bdev_rbd *rbd)
91 {
92 	if (!rbd) {
93 		return;
94 	}
95 
96 	free(rbd->disk.name);
97 	free(rbd->rbd_name);
98 	free(rbd->user_id);
99 	free(rbd->pool_name);
100 	bdev_rbd_free_config(rbd->config);
101 	free(rbd);
102 }
103 
/*
 * Free a NULL-terminated array of heap-allocated strings.
 *
 * Every entry up to (not including) the terminating NULL is freed, followed
 * by the array itself. Passing NULL is a no-op.
 */
void
bdev_rbd_free_config(char **config)
{
	size_t i;

	if (config == NULL) {
		return;
	}

	for (i = 0; config[i] != NULL; i++) {
		free(config[i]);
	}
	free(config);
}
116 
/*
 * Deep-copy a NULL-terminated array of strings.
 *
 * Returns a newly allocated NULL-terminated array holding strdup() copies of
 * every entry, or NULL when config is NULL or an allocation fails. The
 * result is released with bdev_rbd_free_config().
 */
char **
bdev_rbd_dup_config(const char *const *config)
{
	size_t n_entries = 0;
	size_t i;
	char **dup;

	if (config == NULL) {
		return NULL;
	}

	while (config[n_entries] != NULL) {
		n_entries++;
	}

	/* One extra zeroed slot serves as the terminator. */
	dup = calloc(n_entries + 1, sizeof(*dup));
	if (dup == NULL) {
		return NULL;
	}

	for (i = 0; i < n_entries; i++) {
		dup[i] = strdup(config[i]);
		if (dup[i] == NULL) {
			/* The zeroed tail terminates the partial copy, so it can be freed as usual. */
			bdev_rbd_free_config(dup);
			return NULL;
		}
	}

	return dup;
}
139 
140 static int
141 bdev_rados_context_init(const char *user_id, const char *rbd_pool_name, const char *const *config,
142 			rados_t *cluster, rados_ioctx_t *io_ctx)
143 {
144 	int ret;
145 
146 	ret = rados_create(cluster, user_id);
147 	if (ret < 0) {
148 		SPDK_ERRLOG("Failed to create rados_t struct\n");
149 		return -1;
150 	}
151 
152 	if (config) {
153 		const char *const *entry = config;
154 		while (*entry) {
155 			ret = rados_conf_set(*cluster, entry[0], entry[1]);
156 			if (ret < 0) {
157 				SPDK_ERRLOG("Failed to set %s = %s\n", entry[0], entry[1]);
158 				rados_shutdown(*cluster);
159 				return -1;
160 			}
161 			entry += 2;
162 		}
163 	} else {
164 		ret = rados_conf_read_file(*cluster, NULL);
165 		if (ret < 0) {
166 			SPDK_ERRLOG("Failed to read conf file\n");
167 			rados_shutdown(*cluster);
168 			return -1;
169 		}
170 	}
171 
172 	ret = rados_connect(*cluster);
173 	if (ret < 0) {
174 		SPDK_ERRLOG("Failed to connect to rbd_pool\n");
175 		rados_shutdown(*cluster);
176 		return -1;
177 	}
178 
179 	ret = rados_ioctx_create(*cluster, rbd_pool_name, io_ctx);
180 
181 	if (ret < 0) {
182 		SPDK_ERRLOG("Failed to create ioctx\n");
183 		rados_shutdown(*cluster);
184 		return -1;
185 	}
186 
187 	return 0;
188 }
189 
190 static int
191 bdev_rbd_init(const char *user_id, const char *rbd_pool_name, const char *const *config,
192 	      const char *rbd_name, rbd_image_info_t *info)
193 {
194 	int ret;
195 	rados_t cluster = NULL;
196 	rados_ioctx_t io_ctx = NULL;
197 	rbd_image_t image = NULL;
198 
199 	ret = bdev_rados_context_init(user_id, rbd_pool_name, config, &cluster, &io_ctx);
200 	if (ret < 0) {
201 		SPDK_ERRLOG("Failed to create rados context for user_id=%s and rbd_pool=%s\n",
202 			    user_id ? user_id : "admin (the default)", rbd_pool_name);
203 		return -1;
204 	}
205 
206 	ret = rbd_open(io_ctx, rbd_name, &image, NULL);
207 	if (ret < 0) {
208 		SPDK_ERRLOG("Failed to open specified rbd device\n");
209 		goto err;
210 	}
211 	ret = rbd_stat(image, info, sizeof(*info));
212 	rbd_close(image);
213 	if (ret < 0) {
214 		SPDK_ERRLOG("Failed to stat specified rbd device\n");
215 		goto err;
216 	}
217 
218 	rados_ioctx_destroy(io_ctx);
219 	return 0;
220 err:
221 	rados_ioctx_destroy(io_ctx);
222 	rados_shutdown(cluster);
223 	return -1;
224 }
225 
226 static void
227 bdev_rbd_exit(rbd_image_t image)
228 {
229 	rbd_flush(image);
230 	rbd_close(image);
231 }
232 
233 static void
234 bdev_rbd_finish_aiocb(rbd_completion_t cb, void *arg)
235 {
236 	/* Doing nothing here */
237 }
238 
239 static int
240 bdev_rbd_start_aio(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io,
241 		   struct iovec *iov, int iovcnt, uint64_t offset, size_t len)
242 {
243 	struct bdev_rbd_io_channel *rbdio_ch = spdk_io_channel_get_ctx(ch);
244 	int ret;
245 	rbd_completion_t comp;
246 	struct bdev_rbd_io *rbd_io;
247 	rbd_image_t image = rbdio_ch->image;
248 
249 	ret = rbd_aio_create_completion(bdev_io, bdev_rbd_finish_aiocb,
250 					&comp);
251 	if (ret < 0) {
252 		return -1;
253 	}
254 
255 	if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) {
256 		rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx;
257 		rbd_io->total_len = len;
258 		if (spdk_likely(iovcnt == 1)) {
259 			ret = rbd_aio_read(image, offset, iov[0].iov_len, iov[0].iov_base, comp);
260 		} else {
261 			ret = rbd_aio_readv(image, iov, iovcnt, offset, comp);
262 		}
263 	} else if (bdev_io->type == SPDK_BDEV_IO_TYPE_WRITE) {
264 		if (spdk_likely(iovcnt == 1)) {
265 			ret = rbd_aio_write(image, offset, iov[0].iov_len, iov[0].iov_base, comp);
266 		} else {
267 			ret = rbd_aio_writev(image, iov, iovcnt, offset, comp);
268 		}
269 	} else if (bdev_io->type == SPDK_BDEV_IO_TYPE_FLUSH) {
270 		ret = rbd_aio_flush(image, comp);
271 	}
272 
273 	if (ret < 0) {
274 		rbd_aio_release(comp);
275 		return -1;
276 	}
277 
278 	return 0;
279 }
280 
/* Forward declarations: the rbd_if module descriptor below refers to both before they are defined. */
281 static int bdev_rbd_library_init(void);
282 
283 static void bdev_rbd_library_fini(void);
284 
285 static int
286 bdev_rbd_get_ctx_size(void)
287 {
288 	return sizeof(struct bdev_rbd_io);
289 }
290 
/*
 * Module descriptor for the "rbd" bdev module. Its address doubles as the
 * io_device key for the poll-group channels (see bdev_rbd_library_init() and
 * bdev_rbd_create_cb()).
 */
291 static struct spdk_bdev_module rbd_if = {
292 	.name = "rbd",
293 	.module_init = bdev_rbd_library_init,
294 	.module_fini = bdev_rbd_library_fini,
295 	.get_ctx_size = bdev_rbd_get_ctx_size,
296 
297 };
/* Hands the descriptor to the SPDK bdev layer. */
298 SPDK_BDEV_MODULE_REGISTER(rbd, &rbd_if)
299 
300 static int
301 bdev_rbd_reset_timer(void *arg)
302 {
303 	struct bdev_rbd *disk = arg;
304 
305 	/*
306 	 * TODO: This should check if any I/O is still in flight before completing the reset.
307 	 * For now, just complete after the timer expires.
308 	 */
309 	spdk_bdev_io_complete(disk->reset_bdev_io, SPDK_BDEV_IO_STATUS_SUCCESS);
310 	spdk_poller_unregister(&disk->reset_timer);
311 	disk->reset_bdev_io = NULL;
312 
313 	return SPDK_POLLER_BUSY;
314 }
315 
316 static int
317 bdev_rbd_reset(struct bdev_rbd *disk, struct spdk_bdev_io *bdev_io)
318 {
319 	/*
320 	 * HACK: Since librbd doesn't provide any way to cancel outstanding aio, just kick off a
321 	 * timer to wait for in-flight I/O to complete.
322 	 */
323 	assert(disk->reset_bdev_io == NULL);
324 	disk->reset_bdev_io = bdev_io;
325 	disk->reset_timer = SPDK_POLLER_REGISTER(bdev_rbd_reset_timer, disk, 1 * 1000 * 1000);
326 
327 	return 0;
328 }
329 
/*
 * io_device unregister callback: releases the bdev_rbd context once the
 * io_device has been fully unregistered.
 */
static void
bdev_rbd_free_cb(void *io_device)
{
	bdev_rbd_free((struct bdev_rbd *)io_device);
}

/*
 * bdev destruct hook.
 *
 * Unregisters the io_device and defers freeing the context to the unregister
 * callback. The context was previously freed right after the unregister
 * call, which is unsafe when the unregistration has to wait for channels
 * that are still being torn down, because their destroy callback still uses
 * the context.
 *
 * Returns 0.
 */
static int
bdev_rbd_destruct(void *ctx)
{
	struct bdev_rbd *rbd = ctx;

	spdk_io_device_unregister(rbd, bdev_rbd_free_cb);

	return 0;
}
340 
341 static void
342 bdev_rbd_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io,
343 		    bool success)
344 {
345 	int ret;
346 
347 	if (!success) {
348 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
349 		return;
350 	}
351 
352 	ret = bdev_rbd_start_aio(ch,
353 				 bdev_io,
354 				 bdev_io->u.bdev.iovs,
355 				 bdev_io->u.bdev.iovcnt,
356 				 bdev_io->u.bdev.offset_blocks * bdev_io->bdev->blocklen,
357 				 bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
358 
359 	if (ret != 0) {
360 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
361 	}
362 }
363 
364 static int _bdev_rbd_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
365 {
366 	switch (bdev_io->type) {
367 	case SPDK_BDEV_IO_TYPE_READ:
368 		spdk_bdev_io_get_buf(bdev_io, bdev_rbd_get_buf_cb,
369 				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
370 		return 0;
371 
372 	case SPDK_BDEV_IO_TYPE_WRITE:
373 	case SPDK_BDEV_IO_TYPE_FLUSH:
374 		return bdev_rbd_start_aio(ch,
375 					  bdev_io,
376 					  bdev_io->u.bdev.iovs,
377 					  bdev_io->u.bdev.iovcnt,
378 					  bdev_io->u.bdev.offset_blocks * bdev_io->bdev->blocklen,
379 					  bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
380 
381 	case SPDK_BDEV_IO_TYPE_RESET:
382 		return bdev_rbd_reset((struct bdev_rbd *)bdev_io->bdev->ctxt,
383 				      bdev_io);
384 
385 	default:
386 		return -1;
387 	}
388 	return 0;
389 }
390 
391 static void bdev_rbd_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
392 {
393 	if (_bdev_rbd_submit_request(ch, bdev_io) < 0) {
394 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
395 	}
396 }
397 
398 static bool
399 bdev_rbd_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type)
400 {
401 	switch (io_type) {
402 	case SPDK_BDEV_IO_TYPE_READ:
403 	case SPDK_BDEV_IO_TYPE_WRITE:
404 	case SPDK_BDEV_IO_TYPE_FLUSH:
405 	case SPDK_BDEV_IO_TYPE_RESET:
406 		return true;
407 
408 	default:
409 		return false;
410 	}
411 }
412 
413 static void
414 bdev_rbd_io_poll(struct bdev_rbd_io_channel *ch)
415 {
416 	int i, io_status, rc;
417 	rbd_completion_t comps[SPDK_RBD_QUEUE_DEPTH];
418 	struct spdk_bdev_io *bdev_io;
419 	struct bdev_rbd_io *rbd_io;
420 	enum spdk_bdev_io_status bio_status;
421 
422 	rc = rbd_poll_io_events(ch->image, comps, SPDK_RBD_QUEUE_DEPTH);
423 	for (i = 0; i < rc; i++) {
424 		bdev_io = rbd_aio_get_arg(comps[i]);
425 		rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx;
426 		io_status = rbd_aio_get_return_value(comps[i]);
427 		bio_status = SPDK_BDEV_IO_STATUS_SUCCESS;
428 
429 		if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) {
430 			if ((int)rbd_io->total_len != io_status) {
431 				bio_status = SPDK_BDEV_IO_STATUS_FAILED;
432 			}
433 		} else {
434 			/* For others, 0 means success */
435 			if (io_status != 0) {
436 				bio_status = SPDK_BDEV_IO_STATUS_FAILED;
437 			}
438 		}
439 
440 		rbd_aio_release(comps[i]);
441 
442 		spdk_bdev_io_complete(bdev_io, bio_status);
443 	}
444 }
445 
446 static void
447 bdev_rbd_free_channel(struct bdev_rbd_io_channel *ch)
448 {
449 	if (!ch) {
450 		return;
451 	}
452 
453 	if (ch->image) {
454 		bdev_rbd_exit(ch->image);
455 	}
456 
457 	if (ch->io_ctx) {
458 		rados_ioctx_destroy(ch->io_ctx);
459 	}
460 
461 	if (ch->cluster) {
462 		rados_shutdown(ch->cluster);
463 	}
464 
465 	if (ch->pfd >= 0) {
466 		close(ch->pfd);
467 	}
468 
469 	if (ch->group_ch) {
470 		spdk_put_io_channel(spdk_io_channel_from_ctx(ch->group_ch));
471 	}
472 }
473 
474 static void *
475 bdev_rbd_handle(void *arg)
476 {
477 	struct bdev_rbd_io_channel *ch = arg;
478 	void *ret = arg;
479 	int rc;
480 
481 	rc = bdev_rados_context_init(ch->disk->user_id, ch->disk->pool_name,
482 				     (const char *const *)ch->disk->config,
483 				     &ch->cluster, &ch->io_ctx);
484 	if (rc < 0) {
485 		SPDK_ERRLOG("Failed to create rados context for user_id %s and rbd_pool=%s\n",
486 			    ch->disk->user_id ? ch->disk->user_id : "admin (the default)", ch->disk->pool_name);
487 		ret = NULL;
488 		goto end;
489 	}
490 
491 	if (rbd_open(ch->io_ctx, ch->disk->rbd_name, &ch->image, NULL) < 0) {
492 		SPDK_ERRLOG("Failed to open specified rbd device\n");
493 		ret = NULL;
494 	}
495 
496 end:
497 	return ret;
498 }
499 
500 static int
501 bdev_rbd_create_cb(void *io_device, void *ctx_buf)
502 {
503 	struct bdev_rbd_io_channel *ch = ctx_buf;
504 	int ret;
505 	struct epoll_event event;
506 
507 	ch->disk = io_device;
508 	ch->image = NULL;
509 	ch->io_ctx = NULL;
510 	ch->pfd = -1;
511 
512 	if (spdk_call_unaffinitized(bdev_rbd_handle, ch) == NULL) {
513 		goto err;
514 	}
515 
516 	ch->pfd = eventfd(0, EFD_NONBLOCK);
517 	if (ch->pfd < 0) {
518 		SPDK_ERRLOG("Failed to get eventfd\n");
519 		goto err;
520 	}
521 
522 	ret = rbd_set_image_notification(ch->image, ch->pfd, EVENT_TYPE_EVENTFD);
523 	if (ret < 0) {
524 		SPDK_ERRLOG("Failed to set rbd image notification\n");
525 		goto err;
526 	}
527 
528 	ch->group_ch = spdk_io_channel_get_ctx(spdk_get_io_channel(&rbd_if));
529 	assert(ch->group_ch != NULL);
530 	memset(&event, 0, sizeof(event));
531 	event.events = EPOLLIN;
532 	event.data.ptr = ch;
533 
534 	ret = epoll_ctl(ch->group_ch->epoll_fd, EPOLL_CTL_ADD, ch->pfd, &event);
535 	if (ret < 0) {
536 		SPDK_ERRLOG("Failed to add the fd of ch(%p) to the epoll group from group_ch=%p\n", ch,
537 			    ch->group_ch);
538 		goto err;
539 	}
540 
541 	return 0;
542 
543 err:
544 	bdev_rbd_free_channel(ch);
545 	return -1;
546 }
547 
548 static void
549 bdev_rbd_destroy_cb(void *io_device, void *ctx_buf)
550 {
551 	struct bdev_rbd_io_channel *io_channel = ctx_buf;
552 	int rc;
553 
554 	rc = epoll_ctl(io_channel->group_ch->epoll_fd, EPOLL_CTL_DEL,
555 		       io_channel->pfd, NULL);
556 	if (rc < 0) {
557 		SPDK_ERRLOG("Failed to remove fd on io_channel=%p from the polling group=%p\n",
558 			    io_channel, io_channel->group_ch);
559 	}
560 
561 	bdev_rbd_free_channel(io_channel);
562 }
563 
/*
 * bdev get_io_channel hook: the context pointer is the io_device, so the
 * channel is obtained for it directly.
 */
static struct spdk_io_channel *
bdev_rbd_get_io_channel(void *ctx)
{
	struct bdev_rbd *io_device = ctx;
	struct spdk_io_channel *io_ch = spdk_get_io_channel(io_device);

	return io_ch;
}
571 
572 static int
573 bdev_rbd_dump_info_json(void *ctx, struct spdk_json_write_ctx *w)
574 {
575 	struct bdev_rbd *rbd_bdev = ctx;
576 
577 	spdk_json_write_named_object_begin(w, "rbd");
578 
579 	spdk_json_write_named_string(w, "pool_name", rbd_bdev->pool_name);
580 
581 	spdk_json_write_named_string(w, "rbd_name", rbd_bdev->rbd_name);
582 
583 	if (rbd_bdev->user_id) {
584 		spdk_json_write_named_string(w, "user_id", rbd_bdev->user_id);
585 	}
586 
587 	if (rbd_bdev->config) {
588 		char **entry = rbd_bdev->config;
589 
590 		spdk_json_write_named_object_begin(w, "config");
591 		while (*entry) {
592 			spdk_json_write_named_string(w, entry[0], entry[1]);
593 			entry += 2;
594 		}
595 		spdk_json_write_object_end(w);
596 	}
597 
598 	spdk_json_write_object_end(w);
599 
600 	return 0;
601 }
602 
603 static void
604 bdev_rbd_write_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
605 {
606 	struct bdev_rbd *rbd = bdev->ctxt;
607 
608 	spdk_json_write_object_begin(w);
609 
610 	spdk_json_write_named_string(w, "method", "bdev_rbd_create");
611 
612 	spdk_json_write_named_object_begin(w, "params");
613 	spdk_json_write_named_string(w, "name", bdev->name);
614 	spdk_json_write_named_string(w, "pool_name", rbd->pool_name);
615 	spdk_json_write_named_string(w, "rbd_name", rbd->rbd_name);
616 	spdk_json_write_named_uint32(w, "block_size", bdev->blocklen);
617 	if (rbd->user_id) {
618 		spdk_json_write_named_string(w, "user_id", rbd->user_id);
619 	}
620 
621 	if (rbd->config) {
622 		char **entry = rbd->config;
623 
624 		spdk_json_write_named_object_begin(w, "config");
625 		while (*entry) {
626 			spdk_json_write_named_string(w, entry[0], entry[1]);
627 			entry += 2;
628 		}
629 		spdk_json_write_object_end(w);
630 	}
631 
632 	spdk_json_write_object_end(w);
633 
634 	spdk_json_write_object_end(w);
635 }
636 
/* bdev function table installed as disk.fn_table on every bdev made by bdev_rbd_create(). */
637 static const struct spdk_bdev_fn_table rbd_fn_table = {
638 	.destruct		= bdev_rbd_destruct,
639 	.submit_request		= bdev_rbd_submit_request,
640 	.io_type_supported	= bdev_rbd_io_type_supported,
641 	.get_io_channel		= bdev_rbd_get_io_channel,
642 	.dump_info_json		= bdev_rbd_dump_info_json,
643 	.write_config_json	= bdev_rbd_write_config_json,
644 };
645 
646 int
647 bdev_rbd_create(struct spdk_bdev **bdev, const char *name, const char *user_id,
648 		const char *pool_name,
649 		const char *const *config,
650 		const char *rbd_name,
651 		uint32_t block_size)
652 {
653 	struct bdev_rbd *rbd;
654 	int ret;
655 
656 	if ((pool_name == NULL) || (rbd_name == NULL)) {
657 		return -EINVAL;
658 	}
659 
660 	rbd = calloc(1, sizeof(struct bdev_rbd));
661 	if (rbd == NULL) {
662 		SPDK_ERRLOG("Failed to allocate bdev_rbd struct\n");
663 		return -ENOMEM;
664 	}
665 
666 	rbd->rbd_name = strdup(rbd_name);
667 	if (!rbd->rbd_name) {
668 		bdev_rbd_free(rbd);
669 		return -ENOMEM;
670 	}
671 
672 	if (user_id) {
673 		rbd->user_id = strdup(user_id);
674 		if (!rbd->user_id) {
675 			bdev_rbd_free(rbd);
676 			return -ENOMEM;
677 		}
678 	}
679 
680 	rbd->pool_name = strdup(pool_name);
681 	if (!rbd->pool_name) {
682 		bdev_rbd_free(rbd);
683 		return -ENOMEM;
684 	}
685 
686 	if (config && !(rbd->config = bdev_rbd_dup_config(config))) {
687 		bdev_rbd_free(rbd);
688 		return -ENOMEM;
689 	}
690 
691 	ret = bdev_rbd_init(rbd->user_id, rbd->pool_name,
692 			    (const char *const *)rbd->config,
693 			    rbd_name, &rbd->info);
694 	if (ret < 0) {
695 		bdev_rbd_free(rbd);
696 		SPDK_ERRLOG("Failed to init rbd device\n");
697 		return ret;
698 	}
699 
700 	if (name) {
701 		rbd->disk.name = strdup(name);
702 	} else {
703 		rbd->disk.name = spdk_sprintf_alloc("Ceph%d", bdev_rbd_count);
704 	}
705 	if (!rbd->disk.name) {
706 		bdev_rbd_free(rbd);
707 		return -ENOMEM;
708 	}
709 	rbd->disk.product_name = "Ceph Rbd Disk";
710 	bdev_rbd_count++;
711 
712 	rbd->disk.write_cache = 0;
713 	rbd->disk.blocklen = block_size;
714 	rbd->disk.blockcnt = rbd->info.size / rbd->disk.blocklen;
715 	rbd->disk.ctxt = rbd;
716 	rbd->disk.fn_table = &rbd_fn_table;
717 	rbd->disk.module = &rbd_if;
718 
719 	SPDK_NOTICELOG("Add %s rbd disk to lun\n", rbd->disk.name);
720 
721 	spdk_io_device_register(rbd, bdev_rbd_create_cb,
722 				bdev_rbd_destroy_cb,
723 				sizeof(struct bdev_rbd_io_channel),
724 				rbd_name);
725 	ret = spdk_bdev_register(&rbd->disk);
726 	if (ret) {
727 		spdk_io_device_unregister(rbd, NULL);
728 		bdev_rbd_free(rbd);
729 		return ret;
730 	}
731 
732 	*bdev = &(rbd->disk);
733 
734 	return ret;
735 }
736 
737 void
738 bdev_rbd_delete(struct spdk_bdev *bdev, spdk_delete_rbd_complete cb_fn, void *cb_arg)
739 {
740 	if (!bdev || bdev->module != &rbd_if) {
741 		cb_fn(cb_arg, -ENODEV);
742 		return;
743 	}
744 
745 	spdk_bdev_unregister(bdev, cb_fn, cb_arg);
746 }
747 
748 int
749 bdev_rbd_resize(struct spdk_bdev *bdev, const uint64_t new_size_in_mb)
750 {
751 	struct spdk_io_channel *ch;
752 	struct bdev_rbd_io_channel *rbd_io_ch;
753 	int rc;
754 	uint64_t new_size_in_byte;
755 	uint64_t current_size_in_mb;
756 
757 	if (bdev->module != &rbd_if) {
758 		return -EINVAL;
759 	}
760 
761 	current_size_in_mb = bdev->blocklen * bdev->blockcnt / (1024 * 1024);
762 	if (current_size_in_mb > new_size_in_mb) {
763 		SPDK_ERRLOG("The new bdev size must be lager than current bdev size.\n");
764 		return -EINVAL;
765 	}
766 
767 	ch = bdev_rbd_get_io_channel(bdev);
768 	rbd_io_ch = spdk_io_channel_get_ctx(ch);
769 	new_size_in_byte = new_size_in_mb * 1024 * 1024;
770 
771 	rc = rbd_resize(rbd_io_ch->image, new_size_in_byte);
772 	if (rc != 0) {
773 		SPDK_ERRLOG("failed to resize the ceph bdev.\n");
774 		return rc;
775 	}
776 
777 	rc = spdk_bdev_notify_blockcnt_change(bdev, new_size_in_byte / bdev->blocklen);
778 	if (rc != 0) {
779 		SPDK_ERRLOG("failed to notify block cnt change.\n");
780 		return rc;
781 	}
782 
783 	return rc;
784 }
785 
786 static int
787 bdev_rbd_group_poll(void *arg)
788 {
789 	struct bdev_rbd_group_channel *group_ch = arg;
790 	struct epoll_event events[MAX_EVENTS_PER_POLL];
791 	int num_events, i;
792 
793 	num_events = epoll_wait(group_ch->epoll_fd, events, MAX_EVENTS_PER_POLL, 0);
794 
795 	if (num_events <= 0) {
796 		return SPDK_POLLER_IDLE;
797 	}
798 
799 	for (i = 0; i < num_events; i++) {
800 		bdev_rbd_io_poll((struct bdev_rbd_io_channel *)events[i].data.ptr);
801 	}
802 
803 	return SPDK_POLLER_BUSY;
804 }
805 
806 static int
807 bdev_rbd_group_create_cb(void *io_device, void *ctx_buf)
808 {
809 	struct bdev_rbd_group_channel *ch = ctx_buf;
810 
811 	ch->epoll_fd = epoll_create1(0);
812 	if (ch->epoll_fd < 0) {
813 		SPDK_ERRLOG("Could not create epoll fd on io device=%p\n", io_device);
814 		return -1;
815 	}
816 
817 	ch->poller = SPDK_POLLER_REGISTER(bdev_rbd_group_poll, ch, 0);
818 
819 	return 0;
820 }
821 
822 static void
823 bdev_rbd_group_destroy_cb(void *io_device, void *ctx_buf)
824 {
825 	struct bdev_rbd_group_channel *ch = ctx_buf;
826 
827 	if (ch->epoll_fd >= 0) {
828 		close(ch->epoll_fd);
829 	}
830 
831 	spdk_poller_unregister(&ch->poller);
832 }
833 
834 static int
835 bdev_rbd_library_init(void)
836 {
837 	spdk_io_device_register(&rbd_if, bdev_rbd_group_create_cb, bdev_rbd_group_destroy_cb,
838 				sizeof(struct bdev_rbd_group_channel), "bdev_rbd_poll_groups");
839 
840 	return 0;
841 }
842 
843 static void
844 bdev_rbd_library_fini(void)
845 {
846 	spdk_io_device_unregister(&rbd_if, NULL);
847 }
848 
/* Registers "bdev_rbd" as a log component; presumably enables a per-component log flag - confirm in spdk/log.h. */
849 SPDK_LOG_REGISTER_COMPONENT(bdev_rbd)
850