xref: /spdk/module/bdev/rbd/bdev_rbd.c (revision 0ed85362c8132a2d1927757fbcade66b6660d26a)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 
36 #include "bdev_rbd.h"
37 
38 #include <rbd/librbd.h>
39 #include <rados/librados.h>
40 #include <sys/eventfd.h>
41 #include <sys/epoll.h>
42 
43 #include "spdk/conf.h"
44 #include "spdk/env.h"
45 #include "spdk/bdev.h"
46 #include "spdk/thread.h"
47 #include "spdk/json.h"
48 #include "spdk/string.h"
49 #include "spdk/util.h"
50 
51 #include "spdk/bdev_module.h"
52 #include "spdk_internal/log.h"
53 
/* Upper bound on librbd completions reaped from one image per poll pass. */
#define SPDK_RBD_QUEUE_DEPTH 128
/* Upper bound on epoll events fetched by one group poll. */
#define MAX_EVENTS_PER_POLL 128
/* Period handed to SPDK_POLLER_REGISTER() for the group poller. */
#define BDEV_RBD_POLL_US 50

/* Number of rbd bdevs created so far; also used to build default "Ceph%d" names. */
static int bdev_rbd_count;
60 
61 struct bdev_rbd {
62 	struct spdk_bdev disk;
63 	char *rbd_name;
64 	char *user_id;
65 	char *pool_name;
66 	char **config;
67 	rbd_image_info_t info;
68 	TAILQ_ENTRY(bdev_rbd) tailq;
69 	struct spdk_poller *reset_timer;
70 	struct spdk_bdev_io *reset_bdev_io;
71 };
72 
/* Per-thread context of the module-wide io_device (&rbd_if). */
struct bdev_rbd_group_channel {
	/* Runs bdev_rbd_group_poll() on this context. */
	struct spdk_poller *poller;
	/* epoll instance collecting the eventfds of the rbd I/O channels. */
	int epoll_fd;
};
77 
78 struct bdev_rbd_io_channel {
79 	rados_ioctx_t io_ctx;
80 	rados_t cluster;
81 	int pfd;
82 	rbd_image_t image;
83 	struct bdev_rbd *disk;
84 	struct bdev_rbd_group_channel *group_ch;
85 };
86 
/* Driver context stored in every spdk_bdev_io handled by this module. */
struct bdev_rbd_io {
	/* Requested length of a read; compared with librbd's return value on completion. */
	size_t total_len;
};
90 
91 static void
92 bdev_rbd_free(struct bdev_rbd *rbd)
93 {
94 	if (!rbd) {
95 		return;
96 	}
97 
98 	free(rbd->disk.name);
99 	free(rbd->rbd_name);
100 	free(rbd->user_id);
101 	free(rbd->pool_name);
102 	bdev_rbd_free_config(rbd->config);
103 	free(rbd);
104 }
105 
/*
 * Free a NULL-terminated array of heap strings and the array itself.
 * Passing NULL is a no-op.
 */
void
bdev_rbd_free_config(char **config)
{
	size_t idx;

	if (config == NULL) {
		return;
	}

	for (idx = 0; config[idx] != NULL; idx++) {
		free(config[idx]);
	}

	free(config);
}
118 
/*
 * Deep-copy a NULL-terminated string array.
 *
 * Returns a newly allocated, NULL-terminated copy that the caller releases
 * with bdev_rbd_free_config(), or NULL when config is NULL or an allocation
 * fails.
 */
char **
bdev_rbd_dup_config(const char *const *config)
{
	size_t num_entries = 0;
	size_t i;
	char **dup;

	if (config == NULL) {
		return NULL;
	}

	while (config[num_entries] != NULL) {
		num_entries++;
	}

	/* One extra zeroed slot keeps the copy NULL-terminated at every stage. */
	dup = calloc(num_entries + 1, sizeof(*dup));
	if (dup == NULL) {
		return NULL;
	}

	for (i = 0; i < num_entries; i++) {
		dup[i] = strdup(config[i]);
		if (dup[i] == NULL) {
			bdev_rbd_free_config(dup);
			return NULL;
		}
	}

	return dup;
}
141 
142 static int
143 bdev_rados_context_init(const char *user_id, const char *rbd_pool_name, const char *const *config,
144 			rados_t *cluster, rados_ioctx_t *io_ctx)
145 {
146 	int ret;
147 
148 	ret = rados_create(cluster, user_id);
149 	if (ret < 0) {
150 		SPDK_ERRLOG("Failed to create rados_t struct\n");
151 		return -1;
152 	}
153 
154 	if (config) {
155 		const char *const *entry = config;
156 		while (*entry) {
157 			ret = rados_conf_set(*cluster, entry[0], entry[1]);
158 			if (ret < 0) {
159 				SPDK_ERRLOG("Failed to set %s = %s\n", entry[0], entry[1]);
160 				rados_shutdown(*cluster);
161 				return -1;
162 			}
163 			entry += 2;
164 		}
165 	} else {
166 		ret = rados_conf_read_file(*cluster, NULL);
167 		if (ret < 0) {
168 			SPDK_ERRLOG("Failed to read conf file\n");
169 			rados_shutdown(*cluster);
170 			return -1;
171 		}
172 	}
173 
174 	ret = rados_connect(*cluster);
175 	if (ret < 0) {
176 		SPDK_ERRLOG("Failed to connect to rbd_pool\n");
177 		rados_shutdown(*cluster);
178 		return -1;
179 	}
180 
181 	ret = rados_ioctx_create(*cluster, rbd_pool_name, io_ctx);
182 
183 	if (ret < 0) {
184 		SPDK_ERRLOG("Failed to create ioctx\n");
185 		rados_shutdown(*cluster);
186 		return -1;
187 	}
188 
189 	return 0;
190 }
191 
192 static int
193 bdev_rbd_init(const char *user_id, const char *rbd_pool_name, const char *const *config,
194 	      const char *rbd_name, rbd_image_info_t *info)
195 {
196 	int ret;
197 	rados_t cluster = NULL;
198 	rados_ioctx_t io_ctx = NULL;
199 	rbd_image_t image = NULL;
200 
201 	ret = bdev_rados_context_init(user_id, rbd_pool_name, config, &cluster, &io_ctx);
202 	if (ret < 0) {
203 		SPDK_ERRLOG("Failed to create rados context for user_id=%s and rbd_pool=%s\n",
204 			    user_id ? user_id : "admin (the default)", rbd_pool_name);
205 		return -1;
206 	}
207 
208 	ret = rbd_open(io_ctx, rbd_name, &image, NULL);
209 	if (ret < 0) {
210 		SPDK_ERRLOG("Failed to open specified rbd device\n");
211 		goto err;
212 	}
213 	ret = rbd_stat(image, info, sizeof(*info));
214 	rbd_close(image);
215 	if (ret < 0) {
216 		SPDK_ERRLOG("Failed to stat specified rbd device\n");
217 		goto err;
218 	}
219 
220 	rados_ioctx_destroy(io_ctx);
221 	return 0;
222 err:
223 	rados_ioctx_destroy(io_ctx);
224 	rados_shutdown(cluster);
225 	return -1;
226 }
227 
228 static void
229 bdev_rbd_exit(rbd_image_t image)
230 {
231 	rbd_flush(image);
232 	rbd_close(image);
233 }
234 
235 static void
236 bdev_rbd_finish_aiocb(rbd_completion_t cb, void *arg)
237 {
238 	/* Doing nothing here */
239 }
240 
241 static int
242 bdev_rbd_start_aio(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io,
243 		   struct iovec *iov, int iovcnt, uint64_t offset, size_t len)
244 {
245 	struct bdev_rbd_io_channel *rbdio_ch = spdk_io_channel_get_ctx(ch);
246 	int ret;
247 	rbd_completion_t comp;
248 	struct bdev_rbd_io *rbd_io;
249 	rbd_image_t image = rbdio_ch->image;
250 
251 	ret = rbd_aio_create_completion(bdev_io, bdev_rbd_finish_aiocb,
252 					&comp);
253 	if (ret < 0) {
254 		return -1;
255 	}
256 
257 	if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) {
258 		rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx;
259 		rbd_io->total_len = len;
260 		ret = rbd_aio_readv(image, iov, iovcnt, offset, comp);
261 	} else if (bdev_io->type == SPDK_BDEV_IO_TYPE_WRITE) {
262 		ret = rbd_aio_writev(image, iov, iovcnt, offset, comp);
263 	} else if (bdev_io->type == SPDK_BDEV_IO_TYPE_FLUSH) {
264 		ret = rbd_aio_flush(image, comp);
265 	}
266 
267 	if (ret < 0) {
268 		rbd_aio_release(comp);
269 		return -1;
270 	}
271 
272 	return 0;
273 }
274 
/* Module entry points referenced by rbd_if; both are defined later in this file. */
static int bdev_rbd_library_init(void);
static void bdev_rbd_library_fini(void);
278 
279 static int
280 bdev_rbd_get_ctx_size(void)
281 {
282 	return sizeof(struct bdev_rbd_io);
283 }
284 
285 static struct spdk_bdev_module rbd_if = {
286 	.name = "rbd",
287 	.module_init = bdev_rbd_library_init,
288 	.module_fini = bdev_rbd_library_fini,
289 	.get_ctx_size = bdev_rbd_get_ctx_size,
290 
291 };
292 SPDK_BDEV_MODULE_REGISTER(rbd, &rbd_if)
293 
294 static int
295 bdev_rbd_reset_timer(void *arg)
296 {
297 	struct bdev_rbd *disk = arg;
298 
299 	/*
300 	 * TODO: This should check if any I/O is still in flight before completing the reset.
301 	 * For now, just complete after the timer expires.
302 	 */
303 	spdk_bdev_io_complete(disk->reset_bdev_io, SPDK_BDEV_IO_STATUS_SUCCESS);
304 	spdk_poller_unregister(&disk->reset_timer);
305 	disk->reset_bdev_io = NULL;
306 
307 	return SPDK_POLLER_BUSY;
308 }
309 
310 static int
311 bdev_rbd_reset(struct bdev_rbd *disk, struct spdk_bdev_io *bdev_io)
312 {
313 	/*
314 	 * HACK: Since librbd doesn't provide any way to cancel outstanding aio, just kick off a
315 	 * timer to wait for in-flight I/O to complete.
316 	 */
317 	assert(disk->reset_bdev_io == NULL);
318 	disk->reset_bdev_io = bdev_io;
319 	disk->reset_timer = SPDK_POLLER_REGISTER(bdev_rbd_reset_timer, disk, 1 * 1000 * 1000);
320 
321 	return 0;
322 }
323 
/*
 * fn_table destruct hook: unregister the bdev's io_device and free the
 * bdev_rbd passed as ctx.  Always returns 0.
 */
static int
bdev_rbd_destruct(void *ctx)
{
	struct bdev_rbd *rbd_bdev = ctx;

	spdk_io_device_unregister(rbd_bdev, NULL);
	bdev_rbd_free(rbd_bdev);

	return 0;
}
334 
335 static void
336 bdev_rbd_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io,
337 		    bool success)
338 {
339 	int ret;
340 
341 	if (!success) {
342 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
343 		return;
344 	}
345 
346 	ret = bdev_rbd_start_aio(ch,
347 				 bdev_io,
348 				 bdev_io->u.bdev.iovs,
349 				 bdev_io->u.bdev.iovcnt,
350 				 bdev_io->u.bdev.offset_blocks * bdev_io->bdev->blocklen,
351 				 bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
352 
353 	if (ret != 0) {
354 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
355 	}
356 }
357 
358 static int _bdev_rbd_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
359 {
360 	switch (bdev_io->type) {
361 	case SPDK_BDEV_IO_TYPE_READ:
362 		spdk_bdev_io_get_buf(bdev_io, bdev_rbd_get_buf_cb,
363 				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
364 		return 0;
365 
366 	case SPDK_BDEV_IO_TYPE_WRITE:
367 	case SPDK_BDEV_IO_TYPE_FLUSH:
368 		return bdev_rbd_start_aio(ch,
369 					  bdev_io,
370 					  bdev_io->u.bdev.iovs,
371 					  bdev_io->u.bdev.iovcnt,
372 					  bdev_io->u.bdev.offset_blocks * bdev_io->bdev->blocklen,
373 					  bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
374 
375 	case SPDK_BDEV_IO_TYPE_RESET:
376 		return bdev_rbd_reset((struct bdev_rbd *)bdev_io->bdev->ctxt,
377 				      bdev_io);
378 
379 	default:
380 		return -1;
381 	}
382 	return 0;
383 }
384 
385 static void bdev_rbd_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
386 {
387 	if (_bdev_rbd_submit_request(ch, bdev_io) < 0) {
388 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
389 	}
390 }
391 
392 static bool
393 bdev_rbd_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type)
394 {
395 	switch (io_type) {
396 	case SPDK_BDEV_IO_TYPE_READ:
397 	case SPDK_BDEV_IO_TYPE_WRITE:
398 	case SPDK_BDEV_IO_TYPE_FLUSH:
399 	case SPDK_BDEV_IO_TYPE_RESET:
400 		return true;
401 
402 	default:
403 		return false;
404 	}
405 }
406 
407 static void
408 bdev_rbd_io_poll(struct bdev_rbd_io_channel *ch)
409 {
410 	int i, io_status, rc;
411 	rbd_completion_t comps[SPDK_RBD_QUEUE_DEPTH];
412 	struct spdk_bdev_io *bdev_io;
413 	struct bdev_rbd_io *rbd_io;
414 	enum spdk_bdev_io_status bio_status;
415 
416 	rc = rbd_poll_io_events(ch->image, comps, SPDK_RBD_QUEUE_DEPTH);
417 	for (i = 0; i < rc; i++) {
418 		bdev_io = rbd_aio_get_arg(comps[i]);
419 		rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx;
420 		io_status = rbd_aio_get_return_value(comps[i]);
421 		bio_status = SPDK_BDEV_IO_STATUS_SUCCESS;
422 
423 		if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) {
424 			if ((int)rbd_io->total_len != io_status) {
425 				bio_status = SPDK_BDEV_IO_STATUS_FAILED;
426 			}
427 		} else {
428 			/* For others, 0 means success */
429 			if (io_status != 0) {
430 				bio_status = SPDK_BDEV_IO_STATUS_FAILED;
431 			}
432 		}
433 
434 		rbd_aio_release(comps[i]);
435 
436 		spdk_bdev_io_complete(bdev_io, bio_status);
437 	}
438 }
439 
440 static void
441 bdev_rbd_free_channel(struct bdev_rbd_io_channel *ch)
442 {
443 	if (!ch) {
444 		return;
445 	}
446 
447 	if (ch->image) {
448 		bdev_rbd_exit(ch->image);
449 	}
450 
451 	if (ch->io_ctx) {
452 		rados_ioctx_destroy(ch->io_ctx);
453 	}
454 
455 	if (ch->cluster) {
456 		rados_shutdown(ch->cluster);
457 	}
458 
459 	if (ch->pfd >= 0) {
460 		close(ch->pfd);
461 	}
462 
463 	if (ch->group_ch) {
464 		spdk_put_io_channel(spdk_io_channel_from_ctx(ch->group_ch));
465 	}
466 }
467 
468 static void *
469 bdev_rbd_handle(void *arg)
470 {
471 	struct bdev_rbd_io_channel *ch = arg;
472 	void *ret = arg;
473 	int rc;
474 
475 	rc = bdev_rados_context_init(ch->disk->user_id, ch->disk->pool_name,
476 				     (const char *const *)ch->disk->config,
477 				     &ch->cluster, &ch->io_ctx);
478 	if (rc < 0) {
479 		SPDK_ERRLOG("Failed to create rados context for user_id %s and rbd_pool=%s\n",
480 			    ch->disk->user_id ? ch->disk->user_id : "admin (the default)", ch->disk->pool_name);
481 		ret = NULL;
482 		goto end;
483 	}
484 
485 	if (rbd_open(ch->io_ctx, ch->disk->rbd_name, &ch->image, NULL) < 0) {
486 		SPDK_ERRLOG("Failed to open specified rbd device\n");
487 		ret = NULL;
488 	}
489 
490 end:
491 	return ret;
492 }
493 
494 static int
495 bdev_rbd_create_cb(void *io_device, void *ctx_buf)
496 {
497 	struct bdev_rbd_io_channel *ch = ctx_buf;
498 	int ret;
499 	struct epoll_event event;
500 
501 	ch->disk = io_device;
502 	ch->image = NULL;
503 	ch->io_ctx = NULL;
504 	ch->pfd = -1;
505 
506 	if (spdk_call_unaffinitized(bdev_rbd_handle, ch) == NULL) {
507 		goto err;
508 	}
509 
510 	ch->pfd = eventfd(0, EFD_NONBLOCK);
511 	if (ch->pfd < 0) {
512 		SPDK_ERRLOG("Failed to get eventfd\n");
513 		goto err;
514 	}
515 
516 	ret = rbd_set_image_notification(ch->image, ch->pfd, EVENT_TYPE_EVENTFD);
517 	if (ret < 0) {
518 		SPDK_ERRLOG("Failed to set rbd image notification\n");
519 		goto err;
520 	}
521 
522 	ch->group_ch = spdk_io_channel_get_ctx(spdk_get_io_channel(&rbd_if));
523 	assert(ch->group_ch != NULL);
524 	memset(&event, 0, sizeof(event));
525 	event.events = EPOLLIN;
526 	event.data.ptr = ch;
527 
528 	ret = epoll_ctl(ch->group_ch->epoll_fd, EPOLL_CTL_ADD, ch->pfd, &event);
529 	if (ret < 0) {
530 		SPDK_ERRLOG("Failed to add the fd of ch(%p) to the epoll group from group_ch=%p\n", ch,
531 			    ch->group_ch);
532 		goto err;
533 	}
534 
535 	return 0;
536 
537 err:
538 	bdev_rbd_free_channel(ch);
539 	return -1;
540 }
541 
542 static void
543 bdev_rbd_destroy_cb(void *io_device, void *ctx_buf)
544 {
545 	struct bdev_rbd_io_channel *io_channel = ctx_buf;
546 	int rc;
547 
548 	rc = epoll_ctl(io_channel->group_ch->epoll_fd, EPOLL_CTL_DEL,
549 		       io_channel->pfd, NULL);
550 	if (rc < 0) {
551 		SPDK_ERRLOG("Failed to remove fd on io_channel=%p from the polling group=%p\n",
552 			    io_channel, io_channel->group_ch);
553 	}
554 
555 	bdev_rbd_free_channel(io_channel);
556 }
557 
/* fn_table hook: return an I/O channel for the io_device identified by ctx (a struct bdev_rbd). */
static struct spdk_io_channel *
bdev_rbd_get_io_channel(void *ctx)
{
	struct bdev_rbd *io_device = ctx;
	struct spdk_io_channel *io_ch = spdk_get_io_channel(io_device);

	return io_ch;
}
565 
566 static int
567 bdev_rbd_dump_info_json(void *ctx, struct spdk_json_write_ctx *w)
568 {
569 	struct bdev_rbd *rbd_bdev = ctx;
570 
571 	spdk_json_write_named_object_begin(w, "rbd");
572 
573 	spdk_json_write_named_string(w, "pool_name", rbd_bdev->pool_name);
574 
575 	spdk_json_write_named_string(w, "rbd_name", rbd_bdev->rbd_name);
576 
577 	if (rbd_bdev->user_id) {
578 		spdk_json_write_named_string(w, "user_id", rbd_bdev->user_id);
579 	}
580 
581 	if (rbd_bdev->config) {
582 		char **entry = rbd_bdev->config;
583 
584 		spdk_json_write_named_object_begin(w, "config");
585 		while (*entry) {
586 			spdk_json_write_named_string(w, entry[0], entry[1]);
587 			entry += 2;
588 		}
589 		spdk_json_write_object_end(w);
590 	}
591 
592 	spdk_json_write_object_end(w);
593 
594 	return 0;
595 }
596 
597 static void
598 bdev_rbd_write_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
599 {
600 	struct bdev_rbd *rbd = bdev->ctxt;
601 
602 	spdk_json_write_object_begin(w);
603 
604 	spdk_json_write_named_string(w, "method", "bdev_rbd_create");
605 
606 	spdk_json_write_named_object_begin(w, "params");
607 	spdk_json_write_named_string(w, "name", bdev->name);
608 	spdk_json_write_named_string(w, "pool_name", rbd->pool_name);
609 	spdk_json_write_named_string(w, "rbd_name", rbd->rbd_name);
610 	spdk_json_write_named_uint32(w, "block_size", bdev->blocklen);
611 	if (rbd->user_id) {
612 		spdk_json_write_named_string(w, "user_id", rbd->user_id);
613 	}
614 
615 	if (rbd->config) {
616 		char **entry = rbd->config;
617 
618 		spdk_json_write_named_object_begin(w, "config");
619 		while (*entry) {
620 			spdk_json_write_named_string(w, entry[0], entry[1]);
621 			entry += 2;
622 		}
623 		spdk_json_write_object_end(w);
624 	}
625 
626 	spdk_json_write_object_end(w);
627 
628 	spdk_json_write_object_end(w);
629 }
630 
631 static const struct spdk_bdev_fn_table rbd_fn_table = {
632 	.destruct		= bdev_rbd_destruct,
633 	.submit_request		= bdev_rbd_submit_request,
634 	.io_type_supported	= bdev_rbd_io_type_supported,
635 	.get_io_channel		= bdev_rbd_get_io_channel,
636 	.dump_info_json		= bdev_rbd_dump_info_json,
637 	.write_config_json	= bdev_rbd_write_config_json,
638 };
639 
640 int
641 bdev_rbd_create(struct spdk_bdev **bdev, const char *name, const char *user_id,
642 		const char *pool_name,
643 		const char *const *config,
644 		const char *rbd_name,
645 		uint32_t block_size)
646 {
647 	struct bdev_rbd *rbd;
648 	int ret;
649 
650 	if ((pool_name == NULL) || (rbd_name == NULL)) {
651 		return -EINVAL;
652 	}
653 
654 	rbd = calloc(1, sizeof(struct bdev_rbd));
655 	if (rbd == NULL) {
656 		SPDK_ERRLOG("Failed to allocate bdev_rbd struct\n");
657 		return -ENOMEM;
658 	}
659 
660 	rbd->rbd_name = strdup(rbd_name);
661 	if (!rbd->rbd_name) {
662 		bdev_rbd_free(rbd);
663 		return -ENOMEM;
664 	}
665 
666 	if (user_id) {
667 		rbd->user_id = strdup(user_id);
668 		if (!rbd->user_id) {
669 			bdev_rbd_free(rbd);
670 			return -ENOMEM;
671 		}
672 	}
673 
674 	rbd->pool_name = strdup(pool_name);
675 	if (!rbd->pool_name) {
676 		bdev_rbd_free(rbd);
677 		return -ENOMEM;
678 	}
679 
680 	if (config && !(rbd->config = bdev_rbd_dup_config(config))) {
681 		bdev_rbd_free(rbd);
682 		return -ENOMEM;
683 	}
684 
685 	ret = bdev_rbd_init(rbd->user_id, rbd->pool_name,
686 			    (const char *const *)rbd->config,
687 			    rbd_name, &rbd->info);
688 	if (ret < 0) {
689 		bdev_rbd_free(rbd);
690 		SPDK_ERRLOG("Failed to init rbd device\n");
691 		return ret;
692 	}
693 
694 	if (name) {
695 		rbd->disk.name = strdup(name);
696 	} else {
697 		rbd->disk.name = spdk_sprintf_alloc("Ceph%d", bdev_rbd_count);
698 	}
699 	if (!rbd->disk.name) {
700 		bdev_rbd_free(rbd);
701 		return -ENOMEM;
702 	}
703 	rbd->disk.product_name = "Ceph Rbd Disk";
704 	bdev_rbd_count++;
705 
706 	rbd->disk.write_cache = 0;
707 	rbd->disk.blocklen = block_size;
708 	rbd->disk.blockcnt = rbd->info.size / rbd->disk.blocklen;
709 	rbd->disk.ctxt = rbd;
710 	rbd->disk.fn_table = &rbd_fn_table;
711 	rbd->disk.module = &rbd_if;
712 
713 	SPDK_NOTICELOG("Add %s rbd disk to lun\n", rbd->disk.name);
714 
715 	spdk_io_device_register(rbd, bdev_rbd_create_cb,
716 				bdev_rbd_destroy_cb,
717 				sizeof(struct bdev_rbd_io_channel),
718 				rbd_name);
719 	ret = spdk_bdev_register(&rbd->disk);
720 	if (ret) {
721 		spdk_io_device_unregister(rbd, NULL);
722 		bdev_rbd_free(rbd);
723 		return ret;
724 	}
725 
726 	*bdev = &(rbd->disk);
727 
728 	return ret;
729 }
730 
731 void
732 bdev_rbd_delete(struct spdk_bdev *bdev, spdk_delete_rbd_complete cb_fn, void *cb_arg)
733 {
734 	if (!bdev || bdev->module != &rbd_if) {
735 		cb_fn(cb_arg, -ENODEV);
736 		return;
737 	}
738 
739 	spdk_bdev_unregister(bdev, cb_fn, cb_arg);
740 }
741 
742 int
743 bdev_rbd_resize(struct spdk_bdev *bdev, const uint64_t new_size_in_mb)
744 {
745 	struct spdk_io_channel *ch;
746 	struct bdev_rbd_io_channel *rbd_io_ch;
747 	int rc;
748 	uint64_t new_size_in_byte;
749 	uint64_t current_size_in_mb;
750 
751 	if (bdev->module != &rbd_if) {
752 		return -EINVAL;
753 	}
754 
755 	current_size_in_mb = bdev->blocklen * bdev->blockcnt / (1024 * 1024);
756 	if (current_size_in_mb > new_size_in_mb) {
757 		SPDK_ERRLOG("The new bdev size must be lager than current bdev size.\n");
758 		return -EINVAL;
759 	}
760 
761 	ch = bdev_rbd_get_io_channel(bdev);
762 	rbd_io_ch = spdk_io_channel_get_ctx(ch);
763 	new_size_in_byte = new_size_in_mb * 1024 * 1024;
764 
765 	rc = rbd_resize(rbd_io_ch->image, new_size_in_byte);
766 	if (rc != 0) {
767 		SPDK_ERRLOG("failed to resize the ceph bdev.\n");
768 		return rc;
769 	}
770 
771 	rc = spdk_bdev_notify_blockcnt_change(bdev, new_size_in_byte / bdev->blocklen);
772 	if (rc != 0) {
773 		SPDK_ERRLOG("failed to notify block cnt change.\n");
774 		return rc;
775 	}
776 
777 	return rc;
778 }
779 
780 static int
781 bdev_rbd_group_poll(void *arg)
782 {
783 	struct bdev_rbd_group_channel *group_ch = arg;
784 	struct epoll_event events[MAX_EVENTS_PER_POLL];
785 	int num_events, i;
786 
787 	num_events = epoll_wait(group_ch->epoll_fd, events, MAX_EVENTS_PER_POLL, 0);
788 
789 	if (num_events <= 0) {
790 		return SPDK_POLLER_IDLE;
791 	}
792 
793 	for (i = 0; i < num_events; i++) {
794 		bdev_rbd_io_poll((struct bdev_rbd_io_channel *)events[i].data.ptr);
795 	}
796 
797 	return SPDK_POLLER_BUSY;
798 }
799 
800 static int
801 bdev_rbd_group_create_cb(void *io_device, void *ctx_buf)
802 {
803 	struct bdev_rbd_group_channel *ch = ctx_buf;
804 
805 	ch->epoll_fd = epoll_create1(0);
806 	if (ch->epoll_fd < 0) {
807 		SPDK_ERRLOG("Could not create epoll fd on io device=%p\n", io_device);
808 		return -1;
809 	}
810 
811 	ch->poller = SPDK_POLLER_REGISTER(bdev_rbd_group_poll, ch, BDEV_RBD_POLL_US);
812 
813 	return 0;
814 }
815 
816 static void
817 bdev_rbd_group_destroy_cb(void *io_device, void *ctx_buf)
818 {
819 	struct bdev_rbd_group_channel *ch = ctx_buf;
820 
821 	if (ch->epoll_fd >= 0) {
822 		close(ch->epoll_fd);
823 	}
824 
825 	spdk_poller_unregister(&ch->poller);
826 }
827 
828 static int
829 bdev_rbd_library_init(void)
830 {
831 	int i, rc = 0;
832 	const char *val;
833 	const char *pool_name;
834 	const char *rbd_name;
835 	struct spdk_bdev *bdev;
836 	uint32_t block_size;
837 	long int tmp;
838 	struct spdk_conf_section *sp;
839 
840 	spdk_io_device_register(&rbd_if, bdev_rbd_group_create_cb, bdev_rbd_group_destroy_cb,
841 				sizeof(struct bdev_rbd_group_channel),
842 				"bdev_rbd_poll_groups");
843 
844 	sp = spdk_conf_find_section(NULL, "Ceph");
845 	if (sp == NULL) {
846 		/*
847 		 * Ceph section not found.  Do not initialize any rbd LUNS.
848 		 */
849 		goto end;
850 	}
851 
852 	/* Init rbd block devices */
853 	for (i = 0; ; i++) {
854 		val = spdk_conf_section_get_nval(sp, "Ceph", i);
855 		if (val == NULL) {
856 			break;
857 		}
858 
859 		/* get the Rbd_pool name */
860 		pool_name = spdk_conf_section_get_nmval(sp, "Ceph", i, 0);
861 		if (pool_name == NULL) {
862 			SPDK_ERRLOG("Ceph%d: rbd pool name needs to be provided\n", i);
863 			rc = -1;
864 			goto end;
865 		}
866 
867 		rbd_name = spdk_conf_section_get_nmval(sp, "Ceph", i, 1);
868 		if (rbd_name == NULL) {
869 			SPDK_ERRLOG("Ceph%d: format error\n", i);
870 			rc = -1;
871 			goto end;
872 		}
873 
874 		val = spdk_conf_section_get_nmval(sp, "Ceph", i, 2);
875 
876 		if (val == NULL) {
877 			block_size = 512; /* default value */
878 		} else {
879 			tmp = spdk_strtol(val, 10);
880 			if (tmp <= 0) {
881 				SPDK_ERRLOG("Invalid block size\n");
882 				rc = -1;
883 				goto end;
884 			} else if (tmp & 0x1ff) {
885 				SPDK_ERRLOG("current block_size = %ld, it should be multiple of 512\n",
886 					    tmp);
887 				rc = -1;
888 				goto end;
889 			}
890 			block_size = (uint32_t)tmp;
891 		}
892 
893 		/* TODO(?): user_id and rbd config values */
894 		rc = bdev_rbd_create(&bdev, NULL, NULL, pool_name, NULL, rbd_name, block_size);
895 		if (rc) {
896 			goto end;
897 		}
898 	}
899 
900 end:
901 	return rc;
902 }
903 
904 static void
905 bdev_rbd_library_fini(void)
906 {
907 	spdk_io_device_unregister(&rbd_if, NULL);
908 }
909 
910 SPDK_LOG_REGISTER_COMPONENT("bdev_rbd", SPDK_LOG_BDEV_RBD)
911