xref: /spdk/module/bdev/rbd/bdev_rbd.c (revision d919a197d60e407aa1137d7512f8b0af92f3d593)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 
36 #include "bdev_rbd.h"
37 
38 #include <rbd/librbd.h>
39 #include <rados/librados.h>
40 #include <sys/eventfd.h>
41 #include <sys/epoll.h>
42 
43 #include "spdk/env.h"
44 #include "spdk/bdev.h"
45 #include "spdk/thread.h"
46 #include "spdk/json.h"
47 #include "spdk/string.h"
48 #include "spdk/util.h"
49 #include "spdk/likely.h"
50 
51 #include "spdk/bdev_module.h"
52 #include "spdk/log.h"
53 
54 #define SPDK_RBD_QUEUE_DEPTH 128
55 #define MAX_EVENTS_PER_POLL 128
56 
/* Count of rbd bdevs created; presumably used to generate unique default
 * bdev names — the creation path is outside this view, confirm there. */
static int bdev_rbd_count = 0;
58 
/* Per-bdev state for a single exported RBD image. */
struct bdev_rbd {
	struct spdk_bdev disk;	/* generic bdev registered with the bdev layer */
	char *rbd_name;		/* image name inside the pool */
	char *user_id;		/* Ceph user; NULL means the "admin" default */
	char *pool_name;	/* RADOS pool containing the image */
	char **config;		/* flat key/value pairs for rados_conf_set, NULL-terminated */

	rados_t cluster;	/* private cluster handle (used when cluster_name == NULL) */
	rados_t *cluster_p;	/* points at 'cluster' or at a shared registered cluster */
	char *cluster_name;	/* name of a shared cluster, or NULL for a private one */

	rados_ioctx_t io_ctx;	/* I/O context bound to pool_name */
	rbd_image_t image;	/* open image handle, valid while a channel exists */
	int pfd;		/* eventfd librbd signals on I/O completion */

	rbd_image_info_t info;	/* cached rbd_stat() result */
	pthread_mutex_t mutex;	/* guards ch_count/main_td transitions */
	struct spdk_thread *main_td;	/* thread owning image/eventfd resources */
	struct spdk_thread *destruct_td;	/* thread that initiated destruct */
	uint32_t ch_count;	/* number of live I/O channels */
	struct bdev_rbd_group_channel *group_ch;	/* shared epoll group channel */

	TAILQ_ENTRY(bdev_rbd) tailq;
	struct spdk_poller *reset_timer;	/* drain timer, see bdev_rbd_reset */
	struct spdk_bdev_io *reset_bdev_io;	/* in-flight reset request */
};
85 
/* Module-level channel shared by all rbd disks on a thread: a single epoll
 * instance collects every disk's eventfd.  The poller that drains epoll_fd
 * is registered elsewhere in this file (outside this view). */
struct bdev_rbd_group_channel {
	struct spdk_poller *poller;
	int epoll_fd;
};
90 
/* Per-thread I/O channel context; just back-references the disk. */
struct bdev_rbd_io_channel {
	struct bdev_rbd *disk;
};
94 
/* Per-I/O driver context (sized via bdev_rbd_get_ctx_size). */
struct bdev_rbd_io {
	struct			spdk_thread *submit_td;	/* submitting thread; completion is sent back there */
	enum			spdk_bdev_io_status status;	/* final status to report */
	rbd_completion_t	comp;	/* librbd async completion handle */
	size_t			total_len;	/* expected read length, compared to aio return value */
};
101 
/* One named, shared Rados cluster registered via bdev_rbd_register_cluster. */
struct bdev_rbd_cluster {
	char *name;
	char *user_id;
	char **config_param;	/* flat key/value pairs, NULL-terminated; preferred over config_file */
	char *config_file;
	rados_t cluster;
	uint32_t ref;		/* bdevs using this cluster; guarded by g_map_bdev_rbd_cluster_mutex */
	STAILQ_ENTRY(bdev_rbd_cluster) link;
};
111 
/* Global registry of shared clusters, protected by the mutex below. */
static STAILQ_HEAD(, bdev_rbd_cluster) g_map_bdev_rbd_cluster = STAILQ_HEAD_INITIALIZER(
			g_map_bdev_rbd_cluster);
static pthread_mutex_t g_map_bdev_rbd_cluster_mutex = PTHREAD_MUTEX_INITIALIZER;
115 
116 static void
117 bdev_rbd_cluster_free(struct bdev_rbd_cluster *entry)
118 {
119 	assert(entry != NULL);
120 
121 	bdev_rbd_free_config(entry->config_param);
122 	free(entry->config_file);
123 	free(entry->user_id);
124 	free(entry->name);
125 	free(entry);
126 }
127 
/*
 * Drop one reference on the shared cluster that *cluster points into, and
 * NULL out the caller's pointer.  Pairs with bdev_rbd_get_cluster().
 */
static void
bdev_rbd_put_cluster(rados_t **cluster)
{
	struct bdev_rbd_cluster *entry;

	assert(cluster != NULL);

	/* No need go through the map if *cluster equals to NULL */
	if (*cluster == NULL) {
		return;
	}

	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);
	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
		/* Match by address: a caller's cluster_p points directly at
		 * entry->cluster (set in bdev_rbd_get_cluster). */
		if (*cluster != &entry->cluster) {
			continue;
		}

		assert(entry->ref > 0);
		entry->ref--;
		*cluster = NULL;
		pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
		return;
	}

	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
	SPDK_ERRLOG("Cannot find the entry for cluster=%p\n", cluster);
}
156 
/*
 * Release all memory and cluster resources owned by an rbd bdev.
 * Safe to call on a partially-initialized bdev (NULL members are skipped).
 */
static void
bdev_rbd_free(struct bdev_rbd *rbd)
{
	if (!rbd) {
		return;
	}

	free(rbd->disk.name);
	free(rbd->rbd_name);
	free(rbd->user_id);
	free(rbd->pool_name);
	bdev_rbd_free_config(rbd->config);

	if (rbd->io_ctx) {
		rados_ioctx_destroy(rbd->io_ctx);
	}

	/* A named cluster is shared: only drop our reference.  A private
	 * cluster is owned by this bdev and must be shut down here. */
	if (rbd->cluster_name) {
		bdev_rbd_put_cluster(&rbd->cluster_p);
		free(rbd->cluster_name);
	} else if (rbd->cluster) {
		rados_shutdown(rbd->cluster);
	}

	pthread_mutex_destroy(&rbd->mutex);
	free(rbd);
}
184 
/*
 * Free a NULL-terminated array of heap strings (as built by
 * bdev_rbd_dup_config) along with the array itself.  NULL is a no-op.
 */
void
bdev_rbd_free_config(char **config)
{
	char **p;

	if (config == NULL) {
		return;
	}

	for (p = config; *p != NULL; p++) {
		free(*p);
	}
	free(config);
}
197 
/*
 * Deep-copy a NULL-terminated array of strings.  Returns a freshly
 * allocated array (free with bdev_rbd_free_config), or NULL if the input
 * is NULL or any allocation fails.
 */
char **
bdev_rbd_dup_config(const char *const *config)
{
	size_t n = 0;
	size_t i;
	char **dup;

	if (config == NULL) {
		return NULL;
	}

	while (config[n] != NULL) {
		n++;
	}

	dup = calloc(n + 1, sizeof(char *));
	if (dup == NULL) {
		return NULL;
	}

	for (i = 0; i < n; i++) {
		dup[i] = strdup(config[i]);
		if (dup[i] == NULL) {
			/* Unwind the entries duplicated so far. */
			while (i > 0) {
				free(dup[--i]);
			}
			free(dup);
			return NULL;
		}
	}

	return dup;
}
220 
221 static int
222 bdev_rados_cluster_init(const char *user_id, const char *const *config,
223 			rados_t *cluster)
224 {
225 	int ret;
226 
227 	ret = rados_create(cluster, user_id);
228 	if (ret < 0) {
229 		SPDK_ERRLOG("Failed to create rados_t struct\n");
230 		return -1;
231 	}
232 
233 	if (config) {
234 		const char *const *entry = config;
235 		while (*entry) {
236 			ret = rados_conf_set(*cluster, entry[0], entry[1]);
237 			if (ret < 0) {
238 				SPDK_ERRLOG("Failed to set %s = %s\n", entry[0], entry[1]);
239 				rados_shutdown(*cluster);
240 				return -1;
241 			}
242 			entry += 2;
243 		}
244 	} else {
245 		ret = rados_conf_read_file(*cluster, NULL);
246 		if (ret < 0) {
247 			SPDK_ERRLOG("Failed to read conf file\n");
248 			rados_shutdown(*cluster);
249 			return -1;
250 		}
251 	}
252 
253 	ret = rados_connect(*cluster);
254 	if (ret < 0) {
255 		SPDK_ERRLOG("Failed to connect to rbd_pool\n");
256 		rados_shutdown(*cluster);
257 		return -1;
258 	}
259 
260 	return 0;
261 }
262 
263 static int
264 bdev_rbd_get_cluster(const char *cluster_name, rados_t **cluster)
265 {
266 	struct bdev_rbd_cluster *entry;
267 
268 	if (cluster == NULL) {
269 		SPDK_ERRLOG("cluster should not be NULL\n");
270 		return -1;
271 	}
272 
273 	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);
274 	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
275 		if (strcmp(cluster_name, entry->name) == 0) {
276 			entry->ref++;
277 			*cluster = &entry->cluster;
278 			pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
279 			return 0;
280 		}
281 	}
282 
283 	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
284 	return -1;
285 }
286 
287 static int
288 bdev_rbd_shared_cluster_init(const char *cluster_name, rados_t **cluster)
289 {
290 	int ret;
291 
292 	ret = bdev_rbd_get_cluster(cluster_name, cluster);
293 	if (ret < 0) {
294 		SPDK_ERRLOG("Failed to create rados_t struct\n");
295 		return -1;
296 	}
297 
298 	return ret;
299 }
300 
301 static void *
302 bdev_rbd_cluster_handle(void *arg)
303 {
304 	void *ret = arg;
305 	struct bdev_rbd *rbd = arg;
306 	int rc;
307 
308 	rc = bdev_rados_cluster_init(rbd->user_id, (const char *const *)rbd->config,
309 				     &rbd->cluster);
310 	if (rc < 0) {
311 		SPDK_ERRLOG("Failed to create rados cluster for user_id=%s and rbd_pool=%s\n",
312 			    rbd->user_id ? rbd->user_id : "admin (the default)", rbd->pool_name);
313 		ret = NULL;
314 	}
315 
316 	return ret;
317 }
318 
/*
 * Runs on a non-SPDK thread (via spdk_call_unaffinitized): create the pool
 * I/O context and stat the image so disk geometry can be filled in.
 * Returns arg on success, NULL on failure.
 */
static void *
bdev_rbd_init_context(void *arg)
{
	struct bdev_rbd *rbd = arg;
	int rc;

	if (rados_ioctx_create(*(rbd->cluster_p), rbd->pool_name, &rbd->io_ctx) < 0) {
		SPDK_ERRLOG("Failed to create ioctx on rbd=%p\n", rbd);
		return NULL;
	}

	rc = rbd_open(rbd->io_ctx, rbd->rbd_name, &rbd->image, NULL);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to open specified rbd device\n");
		return NULL;
	}

	/* The image is opened only long enough to stat it; it is re-opened
	 * when the first I/O channel is created (_bdev_rbd_create_cb). */
	rc = rbd_stat(rbd->image, &rbd->info, sizeof(rbd->info));
	rbd_close(rbd->image);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to stat specified rbd device\n");
		return NULL;
	}

	return arg;
}
345 
346 static int
347 bdev_rbd_init(struct bdev_rbd *rbd)
348 {
349 	int ret = 0;
350 
351 	if (!rbd->cluster_name) {
352 		rbd->cluster_p = &rbd->cluster;
353 		/* Cluster should be created in non-SPDK thread to avoid conflict between
354 		 * Rados and SPDK thread */
355 		if (spdk_call_unaffinitized(bdev_rbd_cluster_handle, rbd) == NULL) {
356 			SPDK_ERRLOG("Cannot create the rados object on rbd=%p\n", rbd);
357 			return -1;
358 		}
359 	} else {
360 		ret = bdev_rbd_shared_cluster_init(rbd->cluster_name, &rbd->cluster_p);
361 		if (ret < 0) {
362 			SPDK_ERRLOG("Failed to create rados object for rbd =%p on cluster_name=%s\n",
363 				    rbd, rbd->cluster_name);
364 			return -1;
365 		}
366 	}
367 
368 	if (spdk_call_unaffinitized(bdev_rbd_init_context, rbd) == NULL) {
369 		SPDK_ERRLOG("Cannot init rbd context for rbd=%p\n", rbd);
370 	}
371 
372 	return ret;
373 }
374 
/* Flush any buffered writes and close the image handle. */
static void
bdev_rbd_exit(rbd_image_t image)
{
	rbd_flush(image);
	rbd_close(image);
}
381 
/* librbd completion callback.  Intentionally empty: completions are
 * harvested from the SPDK side via rbd_poll_io_events in bdev_rbd_io_poll,
 * driven by the eventfd notification. */
static void
bdev_rbd_finish_aiocb(rbd_completion_t cb, void *arg)
{
	/* Doing nothing here */
}
387 
/* Thread-message handler: complete the bdev_io with the status previously
 * recorded in the driver context. */
static void
_bdev_rbd_io_complete(void *_rbd_io)
{
	struct bdev_rbd_io *rbd_io = _rbd_io;

	spdk_bdev_io_complete(spdk_bdev_io_from_ctx(rbd_io), rbd_io->status);
}
395 
/*
 * Record the final status and complete the bdev_io on the thread it was
 * submitted from; completions can arrive on the disk's main thread, so a
 * message hop may be needed.
 */
static void
bdev_rbd_io_complete(struct spdk_bdev_io *bdev_io, enum spdk_bdev_io_status status)
{
	struct bdev_rbd_io *rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx;
	struct spdk_thread *current_thread = spdk_get_thread();

	rbd_io->status = status;
	assert(rbd_io->submit_td != NULL);
	if (rbd_io->submit_td != current_thread) {
		spdk_thread_send_msg(rbd_io->submit_td, _bdev_rbd_io_complete, rbd_io);
	} else {
		_bdev_rbd_io_complete(rbd_io);
	}
}
410 
/*
 * Kick off an asynchronous librbd read/write/flush.  Runs on the disk's
 * main thread.  On any failure the bdev_io is completed with FAILED.
 */
static void
bdev_rbd_start_aio(struct bdev_rbd *disk, struct spdk_bdev_io *bdev_io,
		   struct iovec *iov, int iovcnt, uint64_t offset, size_t len)
{
	int ret;
	struct bdev_rbd_io *rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx;
	rbd_image_t image = disk->image;

	/* bdev_io is stored as the completion arg so bdev_rbd_io_poll can
	 * recover it via rbd_aio_get_arg(). */
	ret = rbd_aio_create_completion(bdev_io, bdev_rbd_finish_aiocb,
					&rbd_io->comp);
	if (ret < 0) {
		goto err;
	}

	if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) {
		/* Remember the expected length; the poller compares it with
		 * the aio return value to detect short reads. */
		rbd_io->total_len = len;
		if (spdk_likely(iovcnt == 1)) {
			ret = rbd_aio_read(image, offset, iov[0].iov_len, iov[0].iov_base, rbd_io->comp);
		} else {
			ret = rbd_aio_readv(image, iov, iovcnt, offset, rbd_io->comp);
		}
	} else if (bdev_io->type == SPDK_BDEV_IO_TYPE_WRITE) {
		if (spdk_likely(iovcnt == 1)) {
			ret = rbd_aio_write(image, offset, iov[0].iov_len, iov[0].iov_base, rbd_io->comp);
		} else {
			ret = rbd_aio_writev(image, iov, iovcnt, offset, rbd_io->comp);
		}
	} else if (bdev_io->type == SPDK_BDEV_IO_TYPE_FLUSH) {
		ret = rbd_aio_flush(image, rbd_io->comp);
	}

	if (ret < 0) {
		/* The submission failed, so the completion will never fire;
		 * release it here before failing the I/O. */
		rbd_aio_release(rbd_io->comp);
		goto err;
	}

	return;

err:
	bdev_rbd_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
}
452 
453 static int bdev_rbd_library_init(void);
454 
455 static void bdev_rbd_library_fini(void);
456 
/* Tell the bdev layer how much per-I/O driver context to allocate. */
static int
bdev_rbd_get_ctx_size(void)
{
	return sizeof(struct bdev_rbd_io);
}
462 
/* Module descriptor: ties the init/fini hooks and per-I/O context size
 * into the bdev layer. */
static struct spdk_bdev_module rbd_if = {
	.name = "rbd",
	.module_init = bdev_rbd_library_init,
	.module_fini = bdev_rbd_library_fini,
	.get_ctx_size = bdev_rbd_get_ctx_size,

};
SPDK_BDEV_MODULE_REGISTER(rbd, &rbd_if)
471 
/*
 * One-shot poller (armed by bdev_rbd_reset with a 1 s period) that
 * completes the pending reset and disarms itself.
 */
static int
bdev_rbd_reset_timer(void *arg)
{
	struct bdev_rbd *disk = arg;

	/*
	 * TODO: This should check if any I/O is still in flight before completing the reset.
	 * For now, just complete after the timer expires.
	 */
	bdev_rbd_io_complete(disk->reset_bdev_io, SPDK_BDEV_IO_STATUS_SUCCESS);
	spdk_poller_unregister(&disk->reset_timer);
	disk->reset_bdev_io = NULL;

	return SPDK_POLLER_BUSY;
}
487 
/*
 * Handle a RESET request: only one may be outstanding at a time; the
 * completion is deferred to bdev_rbd_reset_timer one second later.
 */
static void
bdev_rbd_reset(struct bdev_rbd *disk, struct spdk_bdev_io *bdev_io)
{
	/*
	 * HACK: Since librbd doesn't provide any way to cancel outstanding aio, just kick off a
	 * timer to wait for in-flight I/O to complete.
	 */
	assert(disk->reset_bdev_io == NULL);
	disk->reset_bdev_io = bdev_io;
	disk->reset_timer = SPDK_POLLER_REGISTER(bdev_rbd_reset_timer, disk, 1 * 1000 * 1000);
}
499 
/* Final destruct step, run on the thread that initiated destruct: notify
 * the bdev layer and free all bdev memory. */
static void
_bdev_rbd_destruct_done(void *io_device)
{
	struct bdev_rbd *rbd = io_device;

	assert(rbd != NULL);
	assert(rbd->ch_count == 0);

	spdk_bdev_destruct_done(&rbd->disk, 0);
	bdev_rbd_free(rbd);
}
511 
/* io_device unregister callback: hop back to the destruct thread to finish. */
static void
bdev_rbd_free_cb(void *io_device)
{
	struct bdev_rbd *rbd = io_device;

	/* The io device has been unregistered.  Send a message back to the
	 * original thread that started the destruct operation, so that the
	 * bdev unregister callback is invoked on the same thread that started
	 * this whole process.
	 */
	spdk_thread_send_msg(rbd->destruct_td, _bdev_rbd_destruct_done, rbd);
}
524 
/* Runs on the bdev's main thread: unregister the io_device; once all
 * channels are gone, bdev_rbd_free_cb continues the teardown. */
static void
_bdev_rbd_destruct(void *ctx)
{
	struct bdev_rbd *rbd = ctx;

	spdk_io_device_unregister(rbd, bdev_rbd_free_cb);
}
532 
/*
 * bdev destruct callback.  Starts the teardown sequence on the bdev's main
 * thread and returns 1 because completion is signalled asynchronously via
 * spdk_bdev_destruct_done().
 */
static int
bdev_rbd_destruct(void *ctx)
{
	struct bdev_rbd *rbd = ctx;
	struct spdk_thread *td;

	/* main_td is NULL if no I/O channel was ever created. */
	if (rbd->main_td == NULL) {
		td = spdk_get_thread();
	} else {
		td = rbd->main_td;
	}

	/* Start the destruct operation on the rbd bdev's
	 * main thread.  This guarantees it will only start
	 * executing after any messages related to channel
	 * deletions have finished completing.  *Always*
	 * send a message, even if this function gets called
	 * from the main thread, in case there are pending
	 * channel delete messages in flight to this thread.
	 */
	assert(rbd->destruct_td == NULL);
	rbd->destruct_td = td;
	spdk_thread_send_msg(td, _bdev_rbd_destruct, rbd);

	/* Return 1 to indicate the destruct path is asynchronous. */
	return 1;
}
560 
/* Read-buffer allocation callback: start the aio once buffers are ready,
 * or fail the I/O if buffer allocation failed. */
static void
bdev_rbd_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io,
		    bool success)
{
	struct bdev_rbd *disk = (struct bdev_rbd *)bdev_io->bdev->ctxt;

	if (!success) {
		bdev_rbd_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
		return;
	}

	bdev_rbd_start_aio(disk,
			   bdev_io,
			   bdev_io->u.bdev.iovs,
			   bdev_io->u.bdev.iovcnt,
			   bdev_io->u.bdev.offset_blocks * bdev_io->bdev->blocklen,
			   bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
}
579 
/*
 * Dispatch a bdev_io by type.  Always executes on the disk's main thread
 * (see bdev_rbd_submit_request), so all librbd calls stay on one thread.
 */
static void
_bdev_rbd_submit_request(void *ctx)
{
	struct spdk_bdev_io *bdev_io = ctx;
	struct bdev_rbd *disk = (struct bdev_rbd *)bdev_io->bdev->ctxt;

	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_READ:
		/* Reads may need buffers allocated first. */
		spdk_bdev_io_get_buf(bdev_io, bdev_rbd_get_buf_cb,
				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
		break;

	case SPDK_BDEV_IO_TYPE_WRITE:
	case SPDK_BDEV_IO_TYPE_FLUSH:
		bdev_rbd_start_aio(disk,
				   bdev_io,
				   bdev_io->u.bdev.iovs,
				   bdev_io->u.bdev.iovcnt,
				   bdev_io->u.bdev.offset_blocks * bdev_io->bdev->blocklen,
				   bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
		break;

	case SPDK_BDEV_IO_TYPE_RESET:
		bdev_rbd_reset((struct bdev_rbd *)bdev_io->bdev->ctxt,
			       bdev_io);
		break;

	default:
		SPDK_ERRLOG("Unsupported IO type =%d\n", bdev_io->type);
		bdev_rbd_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
		break;
	}
}
613 
/*
 * bdev submit_request entry point.  Records the submitting thread (so the
 * completion can hop back) and forwards the request to the disk's main
 * thread if needed.
 */
static void
bdev_rbd_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
{
	struct spdk_thread *submit_td = spdk_io_channel_get_thread(ch);
	struct bdev_rbd_io *rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx;
	struct bdev_rbd *disk = (struct bdev_rbd *)bdev_io->bdev->ctxt;

	rbd_io->submit_td = submit_td;
	if (disk->main_td != submit_td) {
		spdk_thread_send_msg(disk->main_td, _bdev_rbd_submit_request, bdev_io);
	} else {
		_bdev_rbd_submit_request(bdev_io);
	}
}
628 
629 static bool
630 bdev_rbd_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type)
631 {
632 	switch (io_type) {
633 	case SPDK_BDEV_IO_TYPE_READ:
634 	case SPDK_BDEV_IO_TYPE_WRITE:
635 	case SPDK_BDEV_IO_TYPE_FLUSH:
636 	case SPDK_BDEV_IO_TYPE_RESET:
637 		return true;
638 
639 	default:
640 		return false;
641 	}
642 }
643 
/*
 * Harvest up to SPDK_RBD_QUEUE_DEPTH completed librbd events for this disk
 * and complete the corresponding bdev_ios.  Invoked when the disk's
 * eventfd fires (assumption based on the epoll setup in
 * _bdev_rbd_create_cb; the group poller itself is outside this view).
 */
static void
bdev_rbd_io_poll(struct bdev_rbd *disk)
{
	int i, io_status, rc;
	rbd_completion_t comps[SPDK_RBD_QUEUE_DEPTH];
	struct spdk_bdev_io *bdev_io;
	struct bdev_rbd_io *rbd_io;
	enum spdk_bdev_io_status bio_status;

	rc = rbd_poll_io_events(disk->image, comps, SPDK_RBD_QUEUE_DEPTH);
	for (i = 0; i < rc; i++) {
		/* The bdev_io was stashed as the completion arg in
		 * bdev_rbd_start_aio. */
		bdev_io = rbd_aio_get_arg(comps[i]);
		rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx;
		io_status = rbd_aio_get_return_value(comps[i]);
		bio_status = SPDK_BDEV_IO_STATUS_SUCCESS;

		if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) {
			/* Reads return bytes transferred; a short read is a failure. */
			if ((int)rbd_io->total_len != io_status) {
				bio_status = SPDK_BDEV_IO_STATUS_FAILED;
			}
		} else {
			/* For others, 0 means success */
			if (io_status != 0) {
				bio_status = SPDK_BDEV_IO_STATUS_FAILED;
			}
		}

		rbd_aio_release(comps[i]);

		bdev_rbd_io_complete(bdev_io, bio_status);
	}
}
676 
/*
 * Tear down the per-disk resources created by _bdev_rbd_create_cb: remove
 * the eventfd from the group epoll set, drop the group channel reference,
 * flush/close the image and close the eventfd.  Must run on the disk's
 * main thread with ch_count == 0.
 */
static void
bdev_rbd_free_channel_resources(struct bdev_rbd *disk)
{
	int rc;

	assert(disk != NULL);
	assert(disk->main_td == spdk_get_thread());
	assert(disk->ch_count == 0);
	assert(disk->group_ch != NULL);
	rc = epoll_ctl(disk->group_ch->epoll_fd, EPOLL_CTL_DEL,
		       disk->pfd, NULL);
	if (rc < 0) {
		/* Non-fatal: can legitimately fail if the fd was never added
		 * (error path in _bdev_rbd_create_cb). */
		SPDK_ERRLOG("Failed to remove fd on disk=%p from the polling group=%p\n",
			    disk, disk->group_ch);
	}
	spdk_put_io_channel(spdk_io_channel_from_ctx(disk->group_ch));

	if (disk->image) {
		bdev_rbd_exit(disk->image);
	}

	if (disk->pfd >= 0) {
		close(disk->pfd);
	}

	disk->main_td = NULL;
	disk->group_ch = NULL;
}
705 
706 static void *
707 bdev_rbd_handle(void *arg)
708 {
709 	struct bdev_rbd *disk = arg;
710 	void *ret = arg;
711 
712 	if (rbd_open(disk->io_ctx, disk->rbd_name, &disk->image, NULL) < 0) {
713 		SPDK_ERRLOG("Failed to open specified rbd device\n");
714 		ret = NULL;
715 	}
716 
717 	return ret;
718 }
719 
/*
 * First-channel setup for a disk: take a reference on the module's group
 * channel, re-open the image (on a non-SPDK thread), create an eventfd for
 * completion notification and register it with the group's epoll set.
 * Returns 0 on success; on failure all partial state is torn down.
 *
 * NOTE(review): if bdev_rbd_handle fails before eventfd(), disk->pfd still
 * holds its initial value and bdev_rbd_free_channel_resources closes any
 * pfd >= 0 — confirm pfd is initialized to -1 at bdev creation.
 */
static int
_bdev_rbd_create_cb(struct bdev_rbd *disk)
{
	int ret;
	struct epoll_event event = {};

	disk->group_ch = spdk_io_channel_get_ctx(spdk_get_io_channel(&rbd_if));
	assert(disk->group_ch != NULL);
	event.events = EPOLLIN;
	event.data.ptr = disk;

	if (spdk_call_unaffinitized(bdev_rbd_handle, disk) == NULL) {
		goto err;
	}

	disk->pfd = eventfd(0, EFD_NONBLOCK);
	if (disk->pfd < 0) {
		SPDK_ERRLOG("Failed to get eventfd\n");
		goto err;
	}

	/* Ask librbd to signal this eventfd when completions are ready. */
	ret = rbd_set_image_notification(disk->image, disk->pfd, EVENT_TYPE_EVENTFD);
	if (ret < 0) {
		SPDK_ERRLOG("Failed to set rbd image notification\n");
		goto err;
	}

	ret = epoll_ctl(disk->group_ch->epoll_fd, EPOLL_CTL_ADD, disk->pfd, &event);
	if (ret < 0) {
		SPDK_ERRLOG("Failed to add the fd of disk=%p to the epoll group from group_ch=%p\n", disk,
			    disk->group_ch);
		goto err;
	}

	return 0;

err:
	bdev_rbd_free_channel_resources(disk);
	return -1;
}
760 
/*
 * Per-channel create callback.  The first channel on a disk sets up the
 * shared per-disk resources and records the owning (main) thread; later
 * channels only bump the count.
 */
static int
bdev_rbd_create_cb(void *io_device, void *ctx_buf)
{
	struct bdev_rbd_io_channel *ch = ctx_buf;
	struct bdev_rbd *disk = io_device;
	int rc;

	ch->disk = disk;
	pthread_mutex_lock(&disk->mutex);
	if (disk->ch_count == 0) {
		assert(disk->main_td == NULL);
		rc = _bdev_rbd_create_cb(disk);
		if (rc) {
			SPDK_ERRLOG("Cannot create channel for disk=%p\n", disk);
			pthread_mutex_unlock(&disk->mutex);
			return rc;
		}

		disk->main_td = spdk_get_thread();
	}

	disk->ch_count++;
	pthread_mutex_unlock(&disk->mutex);

	return 0;
}
787 
/*
 * Runs on the main thread when the final channel was destroyed on another
 * thread.  Re-checks ch_count under the lock because a new channel may
 * have been created while the message was in flight.
 */
static void
_bdev_rbd_destroy_cb(void *ctx)
{
	struct bdev_rbd *disk = ctx;

	pthread_mutex_lock(&disk->mutex);
	assert(disk->ch_count > 0);
	disk->ch_count--;

	if (disk->ch_count > 0) {
		/* A new channel was created between when message was sent and this function executed */
		pthread_mutex_unlock(&disk->mutex);
		return;
	}

	bdev_rbd_free_channel_resources(disk);
	pthread_mutex_unlock(&disk->mutex);
}
806 
/*
 * Per-channel destroy callback.  When the last channel goes away, the
 * per-disk resources must be torn down on the main thread; if we are on a
 * different thread, temporarily re-take the channel count and bounce a
 * message there so the disk is not freed underneath the message.
 */
static void
bdev_rbd_destroy_cb(void *io_device, void *ctx_buf)
{
	struct bdev_rbd *disk = io_device;
	struct spdk_thread *thread;

	pthread_mutex_lock(&disk->mutex);
	assert(disk->ch_count > 0);
	disk->ch_count--;
	if (disk->ch_count == 0) {
		assert(disk->main_td != NULL);
		if (disk->main_td != spdk_get_thread()) {
			/* The final channel was destroyed on a different thread
			 * than where the first channel was created. Pass a message
			 * to the main thread to unregister the poller. */
			disk->ch_count++;
			thread = disk->main_td;
			pthread_mutex_unlock(&disk->mutex);
			spdk_thread_send_msg(thread, _bdev_rbd_destroy_cb, disk);
			return;
		}

		bdev_rbd_free_channel_resources(disk);
	}
	pthread_mutex_unlock(&disk->mutex);
}
833 
/* bdev get_io_channel callback: channels are keyed on the bdev itself. */
static struct spdk_io_channel *
bdev_rbd_get_io_channel(void *ctx)
{
	struct bdev_rbd *rbd_bdev = ctx;

	return spdk_get_io_channel(rbd_bdev);
}
841 
/*
 * Write the named shared cluster's connection parameters (user_id and
 * config) into the caller's current JSON object.  No-op if the name is
 * not registered.
 */
static void
bdev_rbd_cluster_dump_entry(const char *cluster_name, struct spdk_json_write_ctx *w)
{
	struct bdev_rbd_cluster *entry;

	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);
	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
		if (strcmp(cluster_name, entry->name)) {
			continue;
		}
		if (entry->user_id) {
			spdk_json_write_named_string(w, "user_id", entry->user_id);
		}

		if (entry->config_param) {
			char **config_entry = entry->config_param;

			/* config_param is a flat key/value array. */
			spdk_json_write_named_object_begin(w, "config_param");
			while (*config_entry) {
				spdk_json_write_named_string(w, config_entry[0], config_entry[1]);
				config_entry += 2;
			}
			spdk_json_write_object_end(w);
		} else if (entry->config_file) {
			spdk_json_write_named_string(w, "config_file", entry->config_file);
		}

		pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
		return;
	}

	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
}
875 
/*
 * bdev dump_info_json callback: emit pool/image details and the connection
 * parameters (shared-cluster or private) under an "rbd" object.
 */
static int
bdev_rbd_dump_info_json(void *ctx, struct spdk_json_write_ctx *w)
{
	struct bdev_rbd *rbd_bdev = ctx;

	spdk_json_write_named_object_begin(w, "rbd");

	spdk_json_write_named_string(w, "pool_name", rbd_bdev->pool_name);

	spdk_json_write_named_string(w, "rbd_name", rbd_bdev->rbd_name);

	/* Shared-cluster bdevs report the registered cluster's parameters
	 * instead of their own user_id/config. */
	if (rbd_bdev->cluster_name) {
		bdev_rbd_cluster_dump_entry(rbd_bdev->cluster_name, w);
		goto end;
	}

	if (rbd_bdev->user_id) {
		spdk_json_write_named_string(w, "user_id", rbd_bdev->user_id);
	}

	if (rbd_bdev->config) {
		char **entry = rbd_bdev->config;

		spdk_json_write_named_object_begin(w, "config");
		while (*entry) {
			spdk_json_write_named_string(w, entry[0], entry[1]);
			entry += 2;
		}
		spdk_json_write_object_end(w);
	}

end:
	spdk_json_write_object_end(w);

	return 0;
}
912 
913 static void
914 bdev_rbd_write_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
915 {
916 	struct bdev_rbd *rbd = bdev->ctxt;
917 
918 	spdk_json_write_object_begin(w);
919 
920 	spdk_json_write_named_string(w, "method", "bdev_rbd_create");
921 
922 	spdk_json_write_named_object_begin(w, "params");
923 	spdk_json_write_named_string(w, "name", bdev->name);
924 	spdk_json_write_named_string(w, "pool_name", rbd->pool_name);
925 	spdk_json_write_named_string(w, "rbd_name", rbd->rbd_name);
926 	spdk_json_write_named_uint32(w, "block_size", bdev->blocklen);
927 	if (rbd->user_id) {
928 		spdk_json_write_named_string(w, "user_id", rbd->user_id);
929 	}
930 
931 	if (rbd->config) {
932 		char **entry = rbd->config;
933 
934 		spdk_json_write_named_object_begin(w, "config");
935 		while (*entry) {
936 			spdk_json_write_named_string(w, entry[0], entry[1]);
937 			entry += 2;
938 		}
939 		spdk_json_write_object_end(w);
940 	}
941 
942 	spdk_json_write_object_end(w);
943 
944 	spdk_json_write_object_end(w);
945 }
946 
/* Serialize one registered cluster entry as a JSON object (used by the
 * bdev_rbd_get_clusters_info RPC).  Caller holds the registry mutex. */
static void
dump_single_cluster_entry(struct bdev_rbd_cluster *entry, struct spdk_json_write_ctx *w)
{
	assert(entry != NULL);

	spdk_json_write_object_begin(w);
	spdk_json_write_named_string(w, "cluster_name", entry->name);

	if (entry->user_id) {
		spdk_json_write_named_string(w, "user_id", entry->user_id);
	}

	if (entry->config_param) {
		char **config_entry = entry->config_param;

		spdk_json_write_named_object_begin(w, "config_param");
		while (*config_entry) {
			spdk_json_write_named_string(w, config_entry[0], config_entry[1]);
			config_entry += 2;
		}
		spdk_json_write_object_end(w);
	} else if (entry->config_file) {
		spdk_json_write_named_string(w, "config_file", entry->config_file);
	}

	spdk_json_write_object_end(w);
}
974 
/*
 * JSON-RPC helper: dump the named registered cluster, or all registered
 * clusters as an array when name is NULL.  Returns 0 on success, -ENOENT
 * if the registry is empty or the named cluster does not exist.
 */
int
bdev_rbd_get_clusters_info(struct spdk_jsonrpc_request *request, const char *name)
{
	struct bdev_rbd_cluster *entry;
	struct spdk_json_write_ctx *w;

	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);

	if (STAILQ_EMPTY(&g_map_bdev_rbd_cluster)) {
		pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
		return -ENOENT;
	}

	/* If cluster name is provided */
	if (name) {
		STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
			if (strcmp(name, entry->name) == 0) {
				w = spdk_jsonrpc_begin_result(request);
				dump_single_cluster_entry(entry, w);
				spdk_jsonrpc_end_result(request, w);

				pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
				return 0;
			}
		}

		pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
		return -ENOENT;
	}

	w = spdk_jsonrpc_begin_result(request);
	spdk_json_write_array_begin(w);
	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
		dump_single_cluster_entry(entry, w);
	}
	spdk_json_write_array_end(w);
	spdk_jsonrpc_end_result(request, w);
	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);

	return 0;
}
1016 
/* bdev function table wiring this module's callbacks into the bdev layer. */
static const struct spdk_bdev_fn_table rbd_fn_table = {
	.destruct		= bdev_rbd_destruct,
	.submit_request		= bdev_rbd_submit_request,
	.io_type_supported	= bdev_rbd_io_type_supported,
	.get_io_channel		= bdev_rbd_get_io_channel,
	.dump_info_json		= bdev_rbd_dump_info_json,
	.write_config_json	= bdev_rbd_write_config_json,
};
1025 
/*
 * Create, connect and register a named shared Rados cluster.  config_param
 * takes precedence over config_file when both are given.  Returns 0 on
 * success, -1 on any failure (duplicate name, allocation failure, Rados
 * error).  The registry mutex is held for the whole operation so
 * concurrent registrations of the same name are serialized.
 */
static int
rbd_register_cluster(const char *name, const char *user_id, const char *const *config_param,
		     const char *config_file)
{
	struct bdev_rbd_cluster *entry;
	int rc;

	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);
	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
		if (strcmp(name, entry->name) == 0) {
			SPDK_ERRLOG("Cluster name=%s already exists\n", name);
			pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
			return -1;
		}
	}

	entry = calloc(1, sizeof(*entry));
	if (!entry) {
		SPDK_ERRLOG("Cannot allocate an entry for name=%s\n", name);
		pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
		return -1;
	}

	entry->name = strdup(name);
	if (entry->name == NULL) {
		SPDK_ERRLOG("Failed to save the name =%s on entry =%p\n", name, entry);
		goto err_handle;
	}

	if (user_id) {
		entry->user_id = strdup(user_id);
		if (entry->user_id == NULL) {
			SPDK_ERRLOG("Failed to save the str =%s on entry =%p\n", user_id, entry);
			goto err_handle;
		}
	}

	/* The first priority is the config_param, then we use the config_file */
	if (config_param) {
		entry->config_param = bdev_rbd_dup_config(config_param);
		if (entry->config_param == NULL) {
			SPDK_ERRLOG("Failed to save the config_param=%p on entry = %p\n", config_param, entry);
			goto err_handle;
		}
	} else if (config_file) {
		entry->config_file = strdup(config_file);
		if (entry->config_file == NULL) {
			SPDK_ERRLOG("Failed to save the config_file=%s on entry = %p\n", config_file, entry);
			goto err_handle;
		}
	}

	rc = rados_create(&entry->cluster, user_id);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to create rados_t struct\n");
		goto err_handle;
	}

	if (config_param) {
		const char *const *config_entry = config_param;
		while (*config_entry) {
			rc = rados_conf_set(entry->cluster, config_entry[0], config_entry[1]);
			if (rc < 0) {
				SPDK_ERRLOG("Failed to set %s = %s\n", config_entry[0], config_entry[1]);
				rados_shutdown(entry->cluster);
				goto err_handle;
			}
			config_entry += 2;
		}
	} else {
		/* config_file may be NULL here, in which case librados reads
		 * its default configuration locations. */
		rc = rados_conf_read_file(entry->cluster, entry->config_file);
		if (rc < 0) {
			SPDK_ERRLOG("Failed to read conf file\n");
			rados_shutdown(entry->cluster);
			goto err_handle;
		}
	}

	rc = rados_connect(entry->cluster);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to connect to rbd_pool on cluster=%p\n", entry->cluster);
		rados_shutdown(entry->cluster);
		goto err_handle;
	}

	STAILQ_INSERT_TAIL(&g_map_bdev_rbd_cluster, entry, link);
	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);

	return 0;

err_handle:
	bdev_rbd_cluster_free(entry);
	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
	return -1;
}
1121 
1122 int
1123 bdev_rbd_unregister_cluster(const char *name)
1124 {
1125 	struct bdev_rbd_cluster *entry;
1126 	int rc = 0;
1127 
1128 	if (name == NULL) {
1129 		return -1;
1130 	}
1131 
1132 	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);
1133 	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
1134 		if (strcmp(name, entry->name) == 0) {
1135 			if (entry->ref == 0) {
1136 				STAILQ_REMOVE(&g_map_bdev_rbd_cluster, entry, bdev_rbd_cluster, link);
1137 				rados_shutdown(entry->cluster);
1138 				bdev_rbd_cluster_free(entry);
1139 			} else {
1140 				SPDK_ERRLOG("Cluster with name=%p is still used and we cannot delete it\n",
1141 					    entry->name);
1142 				rc = -1;
1143 			}
1144 
1145 			pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
1146 			return rc;
1147 		}
1148 	}
1149 
1150 	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
1151 
1152 	SPDK_ERRLOG("Could not find the cluster name =%p\n", name);
1153 
1154 	return -1;
1155 }
1156 
1157 static void *
1158 _bdev_rbd_register_cluster(void *arg)
1159 {
1160 	struct cluster_register_info *info = arg;
1161 	void *ret = arg;
1162 	int rc;
1163 
1164 	rc = rbd_register_cluster((const char *)info->name, (const char *)info->user_id,
1165 				  (const char *const *)info->config_param, (const char *)info->config_file);
1166 	if (rc) {
1167 		ret = NULL;
1168 	}
1169 
1170 	return ret;
1171 }
1172 
1173 int
1174 bdev_rbd_register_cluster(struct cluster_register_info *info)
1175 {
1176 	assert(info != NULL);
1177 
1178 	/* Rados cluster info need to be created in non SPDK-thread to avoid CPU
1179 	 * resource contention */
1180 	if (spdk_call_unaffinitized(_bdev_rbd_register_cluster, info) == NULL) {
1181 		return -1;
1182 	}
1183 
1184 	return 0;
1185 }
1186 
1187 int
1188 bdev_rbd_create(struct spdk_bdev **bdev, const char *name, const char *user_id,
1189 		const char *pool_name,
1190 		const char *const *config,
1191 		const char *rbd_name,
1192 		uint32_t block_size,
1193 		const char *cluster_name)
1194 {
1195 	struct bdev_rbd *rbd;
1196 	int ret;
1197 
1198 	if ((pool_name == NULL) || (rbd_name == NULL)) {
1199 		return -EINVAL;
1200 	}
1201 
1202 	rbd = calloc(1, sizeof(struct bdev_rbd));
1203 	if (rbd == NULL) {
1204 		SPDK_ERRLOG("Failed to allocate bdev_rbd struct\n");
1205 		return -ENOMEM;
1206 	}
1207 
1208 	ret = pthread_mutex_init(&rbd->mutex, NULL);
1209 	if (ret) {
1210 		SPDK_ERRLOG("Cannot init mutex on rbd=%p\n", rbd->disk.name);
1211 		free(rbd);
1212 		return ret;
1213 	}
1214 
1215 	rbd->pfd = -1;
1216 	rbd->rbd_name = strdup(rbd_name);
1217 	if (!rbd->rbd_name) {
1218 		bdev_rbd_free(rbd);
1219 		return -ENOMEM;
1220 	}
1221 
1222 	if (user_id) {
1223 		rbd->user_id = strdup(user_id);
1224 		if (!rbd->user_id) {
1225 			bdev_rbd_free(rbd);
1226 			return -ENOMEM;
1227 		}
1228 	}
1229 
1230 	if (cluster_name) {
1231 		rbd->cluster_name = strdup(cluster_name);
1232 		if (!rbd->cluster_name) {
1233 			bdev_rbd_free(rbd);
1234 			return -ENOMEM;
1235 		}
1236 	}
1237 	rbd->pool_name = strdup(pool_name);
1238 	if (!rbd->pool_name) {
1239 		bdev_rbd_free(rbd);
1240 		return -ENOMEM;
1241 	}
1242 
1243 	if (config && !(rbd->config = bdev_rbd_dup_config(config))) {
1244 		bdev_rbd_free(rbd);
1245 		return -ENOMEM;
1246 	}
1247 
1248 	ret = bdev_rbd_init(rbd);
1249 	if (ret < 0) {
1250 		bdev_rbd_free(rbd);
1251 		SPDK_ERRLOG("Failed to init rbd device\n");
1252 		return ret;
1253 	}
1254 
1255 	if (name) {
1256 		rbd->disk.name = strdup(name);
1257 	} else {
1258 		rbd->disk.name = spdk_sprintf_alloc("Ceph%d", bdev_rbd_count);
1259 	}
1260 	if (!rbd->disk.name) {
1261 		bdev_rbd_free(rbd);
1262 		return -ENOMEM;
1263 	}
1264 	rbd->disk.product_name = "Ceph Rbd Disk";
1265 	bdev_rbd_count++;
1266 
1267 	rbd->disk.write_cache = 0;
1268 	rbd->disk.blocklen = block_size;
1269 	rbd->disk.blockcnt = rbd->info.size / rbd->disk.blocklen;
1270 	rbd->disk.ctxt = rbd;
1271 	rbd->disk.fn_table = &rbd_fn_table;
1272 	rbd->disk.module = &rbd_if;
1273 
1274 	SPDK_NOTICELOG("Add %s rbd disk to lun\n", rbd->disk.name);
1275 
1276 	spdk_io_device_register(rbd, bdev_rbd_create_cb,
1277 				bdev_rbd_destroy_cb,
1278 				sizeof(struct bdev_rbd_io_channel),
1279 				rbd_name);
1280 	ret = spdk_bdev_register(&rbd->disk);
1281 	if (ret) {
1282 		spdk_io_device_unregister(rbd, NULL);
1283 		bdev_rbd_free(rbd);
1284 		return ret;
1285 	}
1286 
1287 	*bdev = &(rbd->disk);
1288 
1289 	return ret;
1290 }
1291 
1292 void
1293 bdev_rbd_delete(struct spdk_bdev *bdev, spdk_delete_rbd_complete cb_fn, void *cb_arg)
1294 {
1295 	if (!bdev || bdev->module != &rbd_if) {
1296 		cb_fn(cb_arg, -ENODEV);
1297 		return;
1298 	}
1299 
1300 	spdk_bdev_unregister(bdev, cb_fn, cb_arg);
1301 }
1302 
1303 int
1304 bdev_rbd_resize(struct spdk_bdev *bdev, const uint64_t new_size_in_mb)
1305 {
1306 	struct spdk_io_channel *ch;
1307 	struct bdev_rbd_io_channel *rbd_io_ch;
1308 	int rc;
1309 	uint64_t new_size_in_byte;
1310 	uint64_t current_size_in_mb;
1311 
1312 	if (bdev->module != &rbd_if) {
1313 		return -EINVAL;
1314 	}
1315 
1316 	current_size_in_mb = bdev->blocklen * bdev->blockcnt / (1024 * 1024);
1317 	if (current_size_in_mb > new_size_in_mb) {
1318 		SPDK_ERRLOG("The new bdev size must be lager than current bdev size.\n");
1319 		return -EINVAL;
1320 	}
1321 
1322 	ch = bdev_rbd_get_io_channel(bdev);
1323 	rbd_io_ch = spdk_io_channel_get_ctx(ch);
1324 	new_size_in_byte = new_size_in_mb * 1024 * 1024;
1325 
1326 	rc = rbd_resize(rbd_io_ch->disk->image, new_size_in_byte);
1327 	spdk_put_io_channel(ch);
1328 	if (rc != 0) {
1329 		SPDK_ERRLOG("failed to resize the ceph bdev.\n");
1330 		return rc;
1331 	}
1332 
1333 	rc = spdk_bdev_notify_blockcnt_change(bdev, new_size_in_byte / bdev->blocklen);
1334 	if (rc != 0) {
1335 		SPDK_ERRLOG("failed to notify block cnt change.\n");
1336 		return rc;
1337 	}
1338 
1339 	return rc;
1340 }
1341 
1342 static int
1343 bdev_rbd_group_poll(void *arg)
1344 {
1345 	struct bdev_rbd_group_channel *group_ch = arg;
1346 	struct epoll_event events[MAX_EVENTS_PER_POLL];
1347 	int num_events, i;
1348 
1349 	num_events = epoll_wait(group_ch->epoll_fd, events, MAX_EVENTS_PER_POLL, 0);
1350 
1351 	if (num_events <= 0) {
1352 		return SPDK_POLLER_IDLE;
1353 	}
1354 
1355 	for (i = 0; i < num_events; i++) {
1356 		bdev_rbd_io_poll((struct bdev_rbd *)events[i].data.ptr);
1357 	}
1358 
1359 	return SPDK_POLLER_BUSY;
1360 }
1361 
1362 static int
1363 bdev_rbd_group_create_cb(void *io_device, void *ctx_buf)
1364 {
1365 	struct bdev_rbd_group_channel *ch = ctx_buf;
1366 
1367 	ch->epoll_fd = epoll_create1(0);
1368 	if (ch->epoll_fd < 0) {
1369 		SPDK_ERRLOG("Could not create epoll fd on io device=%p\n", io_device);
1370 		return -1;
1371 	}
1372 
1373 	ch->poller = SPDK_POLLER_REGISTER(bdev_rbd_group_poll, ch, 0);
1374 
1375 	return 0;
1376 }
1377 
1378 static void
1379 bdev_rbd_group_destroy_cb(void *io_device, void *ctx_buf)
1380 {
1381 	struct bdev_rbd_group_channel *ch = ctx_buf;
1382 
1383 	if (ch->epoll_fd >= 0) {
1384 		close(ch->epoll_fd);
1385 	}
1386 
1387 	spdk_poller_unregister(&ch->poller);
1388 }
1389 
1390 static int
1391 bdev_rbd_library_init(void)
1392 {
1393 	spdk_io_device_register(&rbd_if, bdev_rbd_group_create_cb, bdev_rbd_group_destroy_cb,
1394 				sizeof(struct bdev_rbd_group_channel), "bdev_rbd_poll_groups");
1395 
1396 	return 0;
1397 }
1398 
1399 static void
1400 bdev_rbd_library_fini(void)
1401 {
1402 	spdk_io_device_unregister(&rbd_if, NULL);
1403 }
1404 
1405 SPDK_LOG_REGISTER_COMPONENT(bdev_rbd)
1406