xref: /spdk/module/bdev/rbd/bdev_rbd.c (revision 2172c432cfdaecc5a279d64e37c6b51e794683c1)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 
36 #include "bdev_rbd.h"
37 
38 #include <rbd/librbd.h>
39 #include <rados/librados.h>
40 #include <sys/eventfd.h>
41 #include <sys/epoll.h>
42 
43 #include "spdk/conf.h"
44 #include "spdk/env.h"
45 #include "spdk/bdev.h"
46 #include "spdk/thread.h"
47 #include "spdk/json.h"
48 #include "spdk/string.h"
49 #include "spdk/util.h"
50 #include "spdk/likely.h"
51 
52 #include "spdk/bdev_module.h"
53 #include "spdk_internal/log.h"
54 
/* Size of the completion array passed to rbd_poll_io_events() for one image per poll. */
#define SPDK_RBD_QUEUE_DEPTH 128
/* Size of the event array passed to epoll_wait() by the group poller. */
#define MAX_EVENTS_PER_POLL 128

/* Incremented by bdev_rbd_create(); used as the suffix of auto-generated "Ceph%d" bdev names. */
static int bdev_rbd_count = 0;
59 
/* Per-bdev state for one RBD image exposed through the bdev layer. */
struct bdev_rbd {
	/* The bdev object handed to spdk_bdev_register(). */
	struct spdk_bdev disk;
	/* Heap copy of the image name passed to bdev_rbd_create(). */
	char *rbd_name;
	/* Heap copy of the rados user id; NULL when none was supplied. */
	char *user_id;
	/* Heap copy of the pool name passed to bdev_rbd_create(). */
	char *pool_name;
	/* NULL-terminated array of alternating key/value strings, or NULL. */
	char **config;
	/* Image information filled in by rbd_stat() during bdev_rbd_init(). */
	rbd_image_info_t info;
	/* List linkage; not referenced in the visible part of this file. */
	TAILQ_ENTRY(bdev_rbd) tailq;
	/* Poller that completes the pending reset; see bdev_rbd_reset(). */
	struct spdk_poller *reset_timer;
	/* The reset request waiting for reset_timer to fire, or NULL. */
	struct spdk_bdev_io *reset_bdev_io;
};
71 
/* Channel context of the module-wide io_device (&rbd_if): one epoll set plus its poller. */
struct bdev_rbd_group_channel {
	/* Poller running bdev_rbd_group_poll() on this channel. */
	struct spdk_poller *poller;
	/* epoll instance to which each rbd io channel adds its eventfd. */
	int epoll_fd;
};
76 
/* Channel context of a bdev_rbd io_device: a private rados connection and open image. */
struct bdev_rbd_io_channel {
	/* Pool I/O context created by bdev_rados_context_init(). */
	rados_ioctx_t io_ctx;
	/* Cluster handle created by bdev_rados_context_init(). */
	rados_t cluster;
	/* eventfd registered with rbd_set_image_notification(); -1 when not open. */
	int pfd;
	/* Image handle opened by rbd_open() in bdev_rbd_handle(). */
	rbd_image_t image;
	/* The bdev_rbd this channel was created for. */
	struct bdev_rbd *disk;
	/* Group channel whose epoll set watches pfd. */
	struct bdev_rbd_group_channel *group_ch;
};
85 
/* Per-I/O driver context stored in spdk_bdev_io::driver_ctx. */
struct bdev_rbd_io {
	/* Byte length requested for a READ; compared with the completion's return value. */
	size_t	total_len;
};
89 
90 static void
91 bdev_rbd_free(struct bdev_rbd *rbd)
92 {
93 	if (!rbd) {
94 		return;
95 	}
96 
97 	free(rbd->disk.name);
98 	free(rbd->rbd_name);
99 	free(rbd->user_id);
100 	free(rbd->pool_name);
101 	bdev_rbd_free_config(rbd->config);
102 	free(rbd);
103 }
104 
/*
 * Free a NULL-terminated array of heap strings and the array itself.
 * Passing NULL is a no-op.
 */
void
bdev_rbd_free_config(char **config)
{
	size_t idx;

	if (config == NULL) {
		return;
	}

	for (idx = 0; config[idx] != NULL; idx++) {
		free(config[idx]);
	}
	free(config);
}
117 
/*
 * Deep-copy a NULL-terminated array of strings.
 *
 * Returns a newly allocated, NULL-terminated array that the caller releases
 * with bdev_rbd_free_config(), or NULL when config is NULL or an allocation
 * fails (any partial copy is freed before returning).
 */
char **
bdev_rbd_dup_config(const char *const *config)
{
	size_t n_entries = 0;
	size_t i;
	char **dup;

	if (config == NULL) {
		return NULL;
	}

	while (config[n_entries] != NULL) {
		n_entries++;
	}

	/* The extra zero-initialized slot acts as the NULL terminator. */
	dup = calloc(n_entries + 1, sizeof(*dup));
	if (dup == NULL) {
		return NULL;
	}

	for (i = 0; i < n_entries; i++) {
		dup[i] = strdup(config[i]);
		if (dup[i] == NULL) {
			bdev_rbd_free_config(dup);
			return NULL;
		}
	}

	return dup;
}
140 
141 static int
142 bdev_rados_context_init(const char *user_id, const char *rbd_pool_name, const char *const *config,
143 			rados_t *cluster, rados_ioctx_t *io_ctx)
144 {
145 	int ret;
146 
147 	ret = rados_create(cluster, user_id);
148 	if (ret < 0) {
149 		SPDK_ERRLOG("Failed to create rados_t struct\n");
150 		return -1;
151 	}
152 
153 	if (config) {
154 		const char *const *entry = config;
155 		while (*entry) {
156 			ret = rados_conf_set(*cluster, entry[0], entry[1]);
157 			if (ret < 0) {
158 				SPDK_ERRLOG("Failed to set %s = %s\n", entry[0], entry[1]);
159 				rados_shutdown(*cluster);
160 				return -1;
161 			}
162 			entry += 2;
163 		}
164 	} else {
165 		ret = rados_conf_read_file(*cluster, NULL);
166 		if (ret < 0) {
167 			SPDK_ERRLOG("Failed to read conf file\n");
168 			rados_shutdown(*cluster);
169 			return -1;
170 		}
171 	}
172 
173 	ret = rados_connect(*cluster);
174 	if (ret < 0) {
175 		SPDK_ERRLOG("Failed to connect to rbd_pool\n");
176 		rados_shutdown(*cluster);
177 		return -1;
178 	}
179 
180 	ret = rados_ioctx_create(*cluster, rbd_pool_name, io_ctx);
181 
182 	if (ret < 0) {
183 		SPDK_ERRLOG("Failed to create ioctx\n");
184 		rados_shutdown(*cluster);
185 		return -1;
186 	}
187 
188 	return 0;
189 }
190 
191 static int
192 bdev_rbd_init(const char *user_id, const char *rbd_pool_name, const char *const *config,
193 	      const char *rbd_name, rbd_image_info_t *info)
194 {
195 	int ret;
196 	rados_t cluster = NULL;
197 	rados_ioctx_t io_ctx = NULL;
198 	rbd_image_t image = NULL;
199 
200 	ret = bdev_rados_context_init(user_id, rbd_pool_name, config, &cluster, &io_ctx);
201 	if (ret < 0) {
202 		SPDK_ERRLOG("Failed to create rados context for user_id=%s and rbd_pool=%s\n",
203 			    user_id ? user_id : "admin (the default)", rbd_pool_name);
204 		return -1;
205 	}
206 
207 	ret = rbd_open(io_ctx, rbd_name, &image, NULL);
208 	if (ret < 0) {
209 		SPDK_ERRLOG("Failed to open specified rbd device\n");
210 		goto err;
211 	}
212 	ret = rbd_stat(image, info, sizeof(*info));
213 	rbd_close(image);
214 	if (ret < 0) {
215 		SPDK_ERRLOG("Failed to stat specified rbd device\n");
216 		goto err;
217 	}
218 
219 	rados_ioctx_destroy(io_ctx);
220 	return 0;
221 err:
222 	rados_ioctx_destroy(io_ctx);
223 	rados_shutdown(cluster);
224 	return -1;
225 }
226 
/* Flush the image with rbd_flush() and then close it with rbd_close(). */
static void
bdev_rbd_exit(rbd_image_t image)
{
	rbd_flush(image);
	rbd_close(image);
}
233 
/*
 * Completion callback handed to rbd_aio_create_completion(). It is
 * intentionally empty: finished completions are collected later by
 * bdev_rbd_io_poll() through rbd_poll_io_events().
 */
static void
bdev_rbd_finish_aiocb(rbd_completion_t cb, void *arg)
{
	/* Doing nothing here */
}
239 
240 static int
241 bdev_rbd_start_aio(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io,
242 		   struct iovec *iov, int iovcnt, uint64_t offset, size_t len)
243 {
244 	struct bdev_rbd_io_channel *rbdio_ch = spdk_io_channel_get_ctx(ch);
245 	int ret;
246 	rbd_completion_t comp;
247 	struct bdev_rbd_io *rbd_io;
248 	rbd_image_t image = rbdio_ch->image;
249 
250 	ret = rbd_aio_create_completion(bdev_io, bdev_rbd_finish_aiocb,
251 					&comp);
252 	if (ret < 0) {
253 		return -1;
254 	}
255 
256 	if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) {
257 		rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx;
258 		rbd_io->total_len = len;
259 		if (spdk_likely(iovcnt == 1)) {
260 			ret = rbd_aio_read(image, offset, iov[0].iov_len, iov[0].iov_base, comp);
261 		} else {
262 			ret = rbd_aio_readv(image, iov, iovcnt, offset, comp);
263 		}
264 	} else if (bdev_io->type == SPDK_BDEV_IO_TYPE_WRITE) {
265 		if (spdk_likely(iovcnt == 1)) {
266 			ret = rbd_aio_write(image, offset, iov[0].iov_len, iov[0].iov_base, comp);
267 		} else {
268 			ret = rbd_aio_writev(image, iov, iovcnt, offset, comp);
269 		}
270 	} else if (bdev_io->type == SPDK_BDEV_IO_TYPE_FLUSH) {
271 		ret = rbd_aio_flush(image, comp);
272 	}
273 
274 	if (ret < 0) {
275 		rbd_aio_release(comp);
276 		return -1;
277 	}
278 
279 	return 0;
280 }
281 
/* Forward declarations: the rbd_if module descriptor refers to these before their definitions. */
static int bdev_rbd_library_init(void);

static void bdev_rbd_library_fini(void);
285 
/*
 * Report the size of this module's per-I/O context (struct bdev_rbd_io).
 * Installed as the get_ctx_size hook of rbd_if.
 */
static int
bdev_rbd_get_ctx_size(void)
{
	return sizeof(struct bdev_rbd_io);
}
291 
/*
 * Module descriptor for the "rbd" bdev module. Its address also serves as the
 * io_device key for the per-thread poll group channels registered in
 * bdev_rbd_library_init().
 */
static struct spdk_bdev_module rbd_if = {
	.name = "rbd",
	.module_init = bdev_rbd_library_init,
	.module_fini = bdev_rbd_library_fini,
	.get_ctx_size = bdev_rbd_get_ctx_size,

};
SPDK_BDEV_MODULE_REGISTER(rbd, &rbd_if)
300 
301 static int
302 bdev_rbd_reset_timer(void *arg)
303 {
304 	struct bdev_rbd *disk = arg;
305 
306 	/*
307 	 * TODO: This should check if any I/O is still in flight before completing the reset.
308 	 * For now, just complete after the timer expires.
309 	 */
310 	spdk_bdev_io_complete(disk->reset_bdev_io, SPDK_BDEV_IO_STATUS_SUCCESS);
311 	spdk_poller_unregister(&disk->reset_timer);
312 	disk->reset_bdev_io = NULL;
313 
314 	return SPDK_POLLER_BUSY;
315 }
316 
317 static int
318 bdev_rbd_reset(struct bdev_rbd *disk, struct spdk_bdev_io *bdev_io)
319 {
320 	/*
321 	 * HACK: Since librbd doesn't provide any way to cancel outstanding aio, just kick off a
322 	 * timer to wait for in-flight I/O to complete.
323 	 */
324 	assert(disk->reset_bdev_io == NULL);
325 	disk->reset_bdev_io = bdev_io;
326 	disk->reset_timer = SPDK_POLLER_REGISTER(bdev_rbd_reset_timer, disk, 1 * 1000 * 1000);
327 
328 	return 0;
329 }
330 
/*
 * destruct hook of rbd_fn_table: unregister the bdev_rbd io_device and free
 * the object. Always returns 0.
 */
static int
bdev_rbd_destruct(void *ctx)
{
	struct bdev_rbd *disk = (struct bdev_rbd *)ctx;

	spdk_io_device_unregister(disk, NULL);
	bdev_rbd_free(disk);

	return 0;
}
341 
342 static void
343 bdev_rbd_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io,
344 		    bool success)
345 {
346 	int ret;
347 
348 	if (!success) {
349 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
350 		return;
351 	}
352 
353 	ret = bdev_rbd_start_aio(ch,
354 				 bdev_io,
355 				 bdev_io->u.bdev.iovs,
356 				 bdev_io->u.bdev.iovcnt,
357 				 bdev_io->u.bdev.offset_blocks * bdev_io->bdev->blocklen,
358 				 bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
359 
360 	if (ret != 0) {
361 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
362 	}
363 }
364 
365 static int _bdev_rbd_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
366 {
367 	switch (bdev_io->type) {
368 	case SPDK_BDEV_IO_TYPE_READ:
369 		spdk_bdev_io_get_buf(bdev_io, bdev_rbd_get_buf_cb,
370 				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
371 		return 0;
372 
373 	case SPDK_BDEV_IO_TYPE_WRITE:
374 	case SPDK_BDEV_IO_TYPE_FLUSH:
375 		return bdev_rbd_start_aio(ch,
376 					  bdev_io,
377 					  bdev_io->u.bdev.iovs,
378 					  bdev_io->u.bdev.iovcnt,
379 					  bdev_io->u.bdev.offset_blocks * bdev_io->bdev->blocklen,
380 					  bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
381 
382 	case SPDK_BDEV_IO_TYPE_RESET:
383 		return bdev_rbd_reset((struct bdev_rbd *)bdev_io->bdev->ctxt,
384 				      bdev_io);
385 
386 	default:
387 		return -1;
388 	}
389 	return 0;
390 }
391 
392 static void bdev_rbd_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
393 {
394 	if (_bdev_rbd_submit_request(ch, bdev_io) < 0) {
395 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
396 	}
397 }
398 
399 static bool
400 bdev_rbd_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type)
401 {
402 	switch (io_type) {
403 	case SPDK_BDEV_IO_TYPE_READ:
404 	case SPDK_BDEV_IO_TYPE_WRITE:
405 	case SPDK_BDEV_IO_TYPE_FLUSH:
406 	case SPDK_BDEV_IO_TYPE_RESET:
407 		return true;
408 
409 	default:
410 		return false;
411 	}
412 }
413 
414 static void
415 bdev_rbd_io_poll(struct bdev_rbd_io_channel *ch)
416 {
417 	int i, io_status, rc;
418 	rbd_completion_t comps[SPDK_RBD_QUEUE_DEPTH];
419 	struct spdk_bdev_io *bdev_io;
420 	struct bdev_rbd_io *rbd_io;
421 	enum spdk_bdev_io_status bio_status;
422 
423 	rc = rbd_poll_io_events(ch->image, comps, SPDK_RBD_QUEUE_DEPTH);
424 	for (i = 0; i < rc; i++) {
425 		bdev_io = rbd_aio_get_arg(comps[i]);
426 		rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx;
427 		io_status = rbd_aio_get_return_value(comps[i]);
428 		bio_status = SPDK_BDEV_IO_STATUS_SUCCESS;
429 
430 		if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) {
431 			if ((int)rbd_io->total_len != io_status) {
432 				bio_status = SPDK_BDEV_IO_STATUS_FAILED;
433 			}
434 		} else {
435 			/* For others, 0 means success */
436 			if (io_status != 0) {
437 				bio_status = SPDK_BDEV_IO_STATUS_FAILED;
438 			}
439 		}
440 
441 		rbd_aio_release(comps[i]);
442 
443 		spdk_bdev_io_complete(bdev_io, bio_status);
444 	}
445 }
446 
447 static void
448 bdev_rbd_free_channel(struct bdev_rbd_io_channel *ch)
449 {
450 	if (!ch) {
451 		return;
452 	}
453 
454 	if (ch->image) {
455 		bdev_rbd_exit(ch->image);
456 	}
457 
458 	if (ch->io_ctx) {
459 		rados_ioctx_destroy(ch->io_ctx);
460 	}
461 
462 	if (ch->cluster) {
463 		rados_shutdown(ch->cluster);
464 	}
465 
466 	if (ch->pfd >= 0) {
467 		close(ch->pfd);
468 	}
469 
470 	if (ch->group_ch) {
471 		spdk_put_io_channel(spdk_io_channel_from_ctx(ch->group_ch));
472 	}
473 }
474 
475 static void *
476 bdev_rbd_handle(void *arg)
477 {
478 	struct bdev_rbd_io_channel *ch = arg;
479 	void *ret = arg;
480 	int rc;
481 
482 	rc = bdev_rados_context_init(ch->disk->user_id, ch->disk->pool_name,
483 				     (const char *const *)ch->disk->config,
484 				     &ch->cluster, &ch->io_ctx);
485 	if (rc < 0) {
486 		SPDK_ERRLOG("Failed to create rados context for user_id %s and rbd_pool=%s\n",
487 			    ch->disk->user_id ? ch->disk->user_id : "admin (the default)", ch->disk->pool_name);
488 		ret = NULL;
489 		goto end;
490 	}
491 
492 	if (rbd_open(ch->io_ctx, ch->disk->rbd_name, &ch->image, NULL) < 0) {
493 		SPDK_ERRLOG("Failed to open specified rbd device\n");
494 		ret = NULL;
495 	}
496 
497 end:
498 	return ret;
499 }
500 
501 static int
502 bdev_rbd_create_cb(void *io_device, void *ctx_buf)
503 {
504 	struct bdev_rbd_io_channel *ch = ctx_buf;
505 	int ret;
506 	struct epoll_event event;
507 
508 	ch->disk = io_device;
509 	ch->image = NULL;
510 	ch->io_ctx = NULL;
511 	ch->pfd = -1;
512 
513 	if (spdk_call_unaffinitized(bdev_rbd_handle, ch) == NULL) {
514 		goto err;
515 	}
516 
517 	ch->pfd = eventfd(0, EFD_NONBLOCK);
518 	if (ch->pfd < 0) {
519 		SPDK_ERRLOG("Failed to get eventfd\n");
520 		goto err;
521 	}
522 
523 	ret = rbd_set_image_notification(ch->image, ch->pfd, EVENT_TYPE_EVENTFD);
524 	if (ret < 0) {
525 		SPDK_ERRLOG("Failed to set rbd image notification\n");
526 		goto err;
527 	}
528 
529 	ch->group_ch = spdk_io_channel_get_ctx(spdk_get_io_channel(&rbd_if));
530 	assert(ch->group_ch != NULL);
531 	memset(&event, 0, sizeof(event));
532 	event.events = EPOLLIN;
533 	event.data.ptr = ch;
534 
535 	ret = epoll_ctl(ch->group_ch->epoll_fd, EPOLL_CTL_ADD, ch->pfd, &event);
536 	if (ret < 0) {
537 		SPDK_ERRLOG("Failed to add the fd of ch(%p) to the epoll group from group_ch=%p\n", ch,
538 			    ch->group_ch);
539 		goto err;
540 	}
541 
542 	return 0;
543 
544 err:
545 	bdev_rbd_free_channel(ch);
546 	return -1;
547 }
548 
549 static void
550 bdev_rbd_destroy_cb(void *io_device, void *ctx_buf)
551 {
552 	struct bdev_rbd_io_channel *io_channel = ctx_buf;
553 	int rc;
554 
555 	rc = epoll_ctl(io_channel->group_ch->epoll_fd, EPOLL_CTL_DEL,
556 		       io_channel->pfd, NULL);
557 	if (rc < 0) {
558 		SPDK_ERRLOG("Failed to remove fd on io_channel=%p from the polling group=%p\n",
559 			    io_channel, io_channel->group_ch);
560 	}
561 
562 	bdev_rbd_free_channel(io_channel);
563 }
564 
/*
 * get_io_channel hook of rbd_fn_table: ctx is used directly as the io_device
 * key passed to spdk_get_io_channel().
 */
static struct spdk_io_channel *
bdev_rbd_get_io_channel(void *ctx)
{
	return spdk_get_io_channel(ctx);
}
572 
573 static int
574 bdev_rbd_dump_info_json(void *ctx, struct spdk_json_write_ctx *w)
575 {
576 	struct bdev_rbd *rbd_bdev = ctx;
577 
578 	spdk_json_write_named_object_begin(w, "rbd");
579 
580 	spdk_json_write_named_string(w, "pool_name", rbd_bdev->pool_name);
581 
582 	spdk_json_write_named_string(w, "rbd_name", rbd_bdev->rbd_name);
583 
584 	if (rbd_bdev->user_id) {
585 		spdk_json_write_named_string(w, "user_id", rbd_bdev->user_id);
586 	}
587 
588 	if (rbd_bdev->config) {
589 		char **entry = rbd_bdev->config;
590 
591 		spdk_json_write_named_object_begin(w, "config");
592 		while (*entry) {
593 			spdk_json_write_named_string(w, entry[0], entry[1]);
594 			entry += 2;
595 		}
596 		spdk_json_write_object_end(w);
597 	}
598 
599 	spdk_json_write_object_end(w);
600 
601 	return 0;
602 }
603 
604 static void
605 bdev_rbd_write_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
606 {
607 	struct bdev_rbd *rbd = bdev->ctxt;
608 
609 	spdk_json_write_object_begin(w);
610 
611 	spdk_json_write_named_string(w, "method", "bdev_rbd_create");
612 
613 	spdk_json_write_named_object_begin(w, "params");
614 	spdk_json_write_named_string(w, "name", bdev->name);
615 	spdk_json_write_named_string(w, "pool_name", rbd->pool_name);
616 	spdk_json_write_named_string(w, "rbd_name", rbd->rbd_name);
617 	spdk_json_write_named_uint32(w, "block_size", bdev->blocklen);
618 	if (rbd->user_id) {
619 		spdk_json_write_named_string(w, "user_id", rbd->user_id);
620 	}
621 
622 	if (rbd->config) {
623 		char **entry = rbd->config;
624 
625 		spdk_json_write_named_object_begin(w, "config");
626 		while (*entry) {
627 			spdk_json_write_named_string(w, entry[0], entry[1]);
628 			entry += 2;
629 		}
630 		spdk_json_write_object_end(w);
631 	}
632 
633 	spdk_json_write_object_end(w);
634 
635 	spdk_json_write_object_end(w);
636 }
637 
/* Function table assigned to every rbd bdev's disk.fn_table in bdev_rbd_create(). */
static const struct spdk_bdev_fn_table rbd_fn_table = {
	.destruct		= bdev_rbd_destruct,
	.submit_request		= bdev_rbd_submit_request,
	.io_type_supported	= bdev_rbd_io_type_supported,
	.get_io_channel		= bdev_rbd_get_io_channel,
	.dump_info_json		= bdev_rbd_dump_info_json,
	.write_config_json	= bdev_rbd_write_config_json,
};
646 
647 int
648 bdev_rbd_create(struct spdk_bdev **bdev, const char *name, const char *user_id,
649 		const char *pool_name,
650 		const char *const *config,
651 		const char *rbd_name,
652 		uint32_t block_size)
653 {
654 	struct bdev_rbd *rbd;
655 	int ret;
656 
657 	if ((pool_name == NULL) || (rbd_name == NULL)) {
658 		return -EINVAL;
659 	}
660 
661 	rbd = calloc(1, sizeof(struct bdev_rbd));
662 	if (rbd == NULL) {
663 		SPDK_ERRLOG("Failed to allocate bdev_rbd struct\n");
664 		return -ENOMEM;
665 	}
666 
667 	rbd->rbd_name = strdup(rbd_name);
668 	if (!rbd->rbd_name) {
669 		bdev_rbd_free(rbd);
670 		return -ENOMEM;
671 	}
672 
673 	if (user_id) {
674 		rbd->user_id = strdup(user_id);
675 		if (!rbd->user_id) {
676 			bdev_rbd_free(rbd);
677 			return -ENOMEM;
678 		}
679 	}
680 
681 	rbd->pool_name = strdup(pool_name);
682 	if (!rbd->pool_name) {
683 		bdev_rbd_free(rbd);
684 		return -ENOMEM;
685 	}
686 
687 	if (config && !(rbd->config = bdev_rbd_dup_config(config))) {
688 		bdev_rbd_free(rbd);
689 		return -ENOMEM;
690 	}
691 
692 	ret = bdev_rbd_init(rbd->user_id, rbd->pool_name,
693 			    (const char *const *)rbd->config,
694 			    rbd_name, &rbd->info);
695 	if (ret < 0) {
696 		bdev_rbd_free(rbd);
697 		SPDK_ERRLOG("Failed to init rbd device\n");
698 		return ret;
699 	}
700 
701 	if (name) {
702 		rbd->disk.name = strdup(name);
703 	} else {
704 		rbd->disk.name = spdk_sprintf_alloc("Ceph%d", bdev_rbd_count);
705 	}
706 	if (!rbd->disk.name) {
707 		bdev_rbd_free(rbd);
708 		return -ENOMEM;
709 	}
710 	rbd->disk.product_name = "Ceph Rbd Disk";
711 	bdev_rbd_count++;
712 
713 	rbd->disk.write_cache = 0;
714 	rbd->disk.blocklen = block_size;
715 	rbd->disk.blockcnt = rbd->info.size / rbd->disk.blocklen;
716 	rbd->disk.ctxt = rbd;
717 	rbd->disk.fn_table = &rbd_fn_table;
718 	rbd->disk.module = &rbd_if;
719 
720 	SPDK_NOTICELOG("Add %s rbd disk to lun\n", rbd->disk.name);
721 
722 	spdk_io_device_register(rbd, bdev_rbd_create_cb,
723 				bdev_rbd_destroy_cb,
724 				sizeof(struct bdev_rbd_io_channel),
725 				rbd_name);
726 	ret = spdk_bdev_register(&rbd->disk);
727 	if (ret) {
728 		spdk_io_device_unregister(rbd, NULL);
729 		bdev_rbd_free(rbd);
730 		return ret;
731 	}
732 
733 	*bdev = &(rbd->disk);
734 
735 	return ret;
736 }
737 
738 void
739 bdev_rbd_delete(struct spdk_bdev *bdev, spdk_delete_rbd_complete cb_fn, void *cb_arg)
740 {
741 	if (!bdev || bdev->module != &rbd_if) {
742 		cb_fn(cb_arg, -ENODEV);
743 		return;
744 	}
745 
746 	spdk_bdev_unregister(bdev, cb_fn, cb_arg);
747 }
748 
749 int
750 bdev_rbd_resize(struct spdk_bdev *bdev, const uint64_t new_size_in_mb)
751 {
752 	struct spdk_io_channel *ch;
753 	struct bdev_rbd_io_channel *rbd_io_ch;
754 	int rc;
755 	uint64_t new_size_in_byte;
756 	uint64_t current_size_in_mb;
757 
758 	if (bdev->module != &rbd_if) {
759 		return -EINVAL;
760 	}
761 
762 	current_size_in_mb = bdev->blocklen * bdev->blockcnt / (1024 * 1024);
763 	if (current_size_in_mb > new_size_in_mb) {
764 		SPDK_ERRLOG("The new bdev size must be lager than current bdev size.\n");
765 		return -EINVAL;
766 	}
767 
768 	ch = bdev_rbd_get_io_channel(bdev);
769 	rbd_io_ch = spdk_io_channel_get_ctx(ch);
770 	new_size_in_byte = new_size_in_mb * 1024 * 1024;
771 
772 	rc = rbd_resize(rbd_io_ch->image, new_size_in_byte);
773 	if (rc != 0) {
774 		SPDK_ERRLOG("failed to resize the ceph bdev.\n");
775 		return rc;
776 	}
777 
778 	rc = spdk_bdev_notify_blockcnt_change(bdev, new_size_in_byte / bdev->blocklen);
779 	if (rc != 0) {
780 		SPDK_ERRLOG("failed to notify block cnt change.\n");
781 		return rc;
782 	}
783 
784 	return rc;
785 }
786 
787 static int
788 bdev_rbd_group_poll(void *arg)
789 {
790 	struct bdev_rbd_group_channel *group_ch = arg;
791 	struct epoll_event events[MAX_EVENTS_PER_POLL];
792 	int num_events, i;
793 
794 	num_events = epoll_wait(group_ch->epoll_fd, events, MAX_EVENTS_PER_POLL, 0);
795 
796 	if (num_events <= 0) {
797 		return SPDK_POLLER_IDLE;
798 	}
799 
800 	for (i = 0; i < num_events; i++) {
801 		bdev_rbd_io_poll((struct bdev_rbd_io_channel *)events[i].data.ptr);
802 	}
803 
804 	return SPDK_POLLER_BUSY;
805 }
806 
807 static int
808 bdev_rbd_group_create_cb(void *io_device, void *ctx_buf)
809 {
810 	struct bdev_rbd_group_channel *ch = ctx_buf;
811 
812 	ch->epoll_fd = epoll_create1(0);
813 	if (ch->epoll_fd < 0) {
814 		SPDK_ERRLOG("Could not create epoll fd on io device=%p\n", io_device);
815 		return -1;
816 	}
817 
818 	ch->poller = SPDK_POLLER_REGISTER(bdev_rbd_group_poll, ch, 0);
819 
820 	return 0;
821 }
822 
823 static void
824 bdev_rbd_group_destroy_cb(void *io_device, void *ctx_buf)
825 {
826 	struct bdev_rbd_group_channel *ch = ctx_buf;
827 
828 	if (ch->epoll_fd >= 0) {
829 		close(ch->epoll_fd);
830 	}
831 
832 	spdk_poller_unregister(&ch->poller);
833 }
834 
835 static int
836 bdev_rbd_library_init(void)
837 {
838 	int i, rc = 0;
839 	const char *val;
840 	const char *pool_name;
841 	const char *rbd_name;
842 	struct spdk_bdev *bdev;
843 	uint32_t block_size;
844 	long int tmp;
845 	struct spdk_conf_section *sp;
846 
847 	spdk_io_device_register(&rbd_if, bdev_rbd_group_create_cb, bdev_rbd_group_destroy_cb,
848 				sizeof(struct bdev_rbd_group_channel),
849 				"bdev_rbd_poll_groups");
850 
851 	sp = spdk_conf_find_section(NULL, "Ceph");
852 	if (sp == NULL) {
853 		/*
854 		 * Ceph section not found.  Do not initialize any rbd LUNS.
855 		 */
856 		goto end;
857 	}
858 
859 	/* Init rbd block devices */
860 	for (i = 0; ; i++) {
861 		val = spdk_conf_section_get_nval(sp, "Ceph", i);
862 		if (val == NULL) {
863 			break;
864 		}
865 
866 		/* get the Rbd_pool name */
867 		pool_name = spdk_conf_section_get_nmval(sp, "Ceph", i, 0);
868 		if (pool_name == NULL) {
869 			SPDK_ERRLOG("Ceph%d: rbd pool name needs to be provided\n", i);
870 			rc = -1;
871 			goto end;
872 		}
873 
874 		rbd_name = spdk_conf_section_get_nmval(sp, "Ceph", i, 1);
875 		if (rbd_name == NULL) {
876 			SPDK_ERRLOG("Ceph%d: format error\n", i);
877 			rc = -1;
878 			goto end;
879 		}
880 
881 		val = spdk_conf_section_get_nmval(sp, "Ceph", i, 2);
882 
883 		if (val == NULL) {
884 			block_size = 512; /* default value */
885 		} else {
886 			tmp = spdk_strtol(val, 10);
887 			if (tmp <= 0) {
888 				SPDK_ERRLOG("Invalid block size\n");
889 				rc = -1;
890 				goto end;
891 			} else if (tmp & 0x1ff) {
892 				SPDK_ERRLOG("current block_size = %ld, it should be multiple of 512\n",
893 					    tmp);
894 				rc = -1;
895 				goto end;
896 			}
897 			block_size = (uint32_t)tmp;
898 		}
899 
900 		/* TODO(?): user_id and rbd config values */
901 		rc = bdev_rbd_create(&bdev, NULL, NULL, pool_name, NULL, rbd_name, block_size);
902 		if (rc) {
903 			goto end;
904 		}
905 	}
906 
907 end:
908 	return rc;
909 }
910 
/* Module fini hook: unregister the poll group io_device registered in bdev_rbd_library_init(). */
static void
bdev_rbd_library_fini(void)
{
	spdk_io_device_unregister(&rbd_if, NULL);
}
916 
/* Registers the "bdev_rbd" log component (presumably a log flag for this module; confirm against spdk_internal/log.h). */
SPDK_LOG_REGISTER_COMPONENT(bdev_rbd)
918