/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "spdk/stdinc.h"

#include "bdev_rbd.h"

#include <rbd/librbd.h>
#include <rados/librados.h>
#include <sys/eventfd.h>
#include <sys/epoll.h>

#include "spdk/env.h"
#include "spdk/bdev.h"
#include "spdk/thread.h"
#include "spdk/json.h"
#include "spdk/string.h"
#include "spdk/util.h"
#include "spdk/likely.h"

#include "spdk/bdev_module.h"
#include "spdk/log.h"

#define SPDK_RBD_QUEUE_DEPTH 128
#define MAX_EVENTS_PER_POLL 128

static int bdev_rbd_count = 0;

struct bdev_rbd {
	struct spdk_bdev disk;
	char *rbd_name;
	char *user_id;
	char *pool_name;
	char **config;

	rados_t cluster;
	rados_t *cluster_p;
	char *cluster_name;

	rados_ioctx_t io_ctx;
	rbd_image_t image;
	int pfd;

	rbd_image_info_t info;
	pthread_mutex_t mutex;
	struct spdk_thread *main_td;
	struct spdk_thread *destruct_td;
	uint32_t ch_count;
	struct bdev_rbd_group_channel *group_ch;

	TAILQ_ENTRY(bdev_rbd) tailq;
	struct spdk_poller *reset_timer;
	struct spdk_bdev_io *reset_bdev_io;
};

struct bdev_rbd_group_channel {
	struct spdk_poller *poller;
	int epoll_fd;
};

struct bdev_rbd_io_channel {
	struct bdev_rbd *disk;
};

struct bdev_rbd_io {
	struct	spdk_thread *submit_td;
	enum	spdk_bdev_io_status status;
	size_t	total_len;
};

struct bdev_rbd_cluster {
	char *name;
	char *user_id;
	char **config_param;
	char *config_file;
	rados_t cluster;
	uint32_t ref;
	STAILQ_ENTRY(bdev_rbd_cluster) link;
};

static STAILQ_HEAD(, bdev_rbd_cluster) g_map_bdev_rbd_cluster = STAILQ_HEAD_INITIALIZER(
			g_map_bdev_rbd_cluster);
static pthread_mutex_t g_map_bdev_rbd_cluster_mutex = PTHREAD_MUTEX_INITIALIZER;

static void
bdev_rbd_cluster_free(struct bdev_rbd_cluster *entry)
{
	assert(entry != NULL);

	bdev_rbd_free_config(entry->config_param);
	free(entry->config_file);
	free(entry->user_id);
	free(entry->name);
	free(entry);
}

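/* Release a reference on a shared cluster obtained from bdev_rbd_get_cluster().
 * Note: this only drops the refcount; the rados_t stays connected until the
 * cluster is explicitly unregistered via bdev_rbd_unregister_cluster().
 */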
static void
bdev_rbd_put_cluster(rados_t **cluster)
{
	struct bdev_rbd_cluster *entry;

	assert(cluster != NULL);

	/* No need to go through the map if *cluster is NULL */
	if (*cluster == NULL) {
		return;
	}

	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);
	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
		if (*cluster != &entry->cluster) {
			continue;
		}

		assert(entry->ref > 0);
		entry->ref--;
		*cluster = NULL;
		pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
		return;
	}

	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
	SPDK_ERRLOG("Cannot find the entry for cluster=%p\n", cluster);
}

static void
bdev_rbd_free(struct bdev_rbd *rbd)
{
	if (!rbd) {
		return;
	}

	free(rbd->disk.name);
	free(rbd->rbd_name);
	free(rbd->user_id);
	free(rbd->pool_name);
	bdev_rbd_free_config(rbd->config);

	if (rbd->io_ctx) {
		rados_ioctx_destroy(rbd->io_ctx);
	}

	if (rbd->cluster_name) {
		bdev_rbd_put_cluster(&rbd->cluster_p);
		free(rbd->cluster_name);
	} else if (rbd->cluster) {
		rados_shutdown(rbd->cluster);
	}

	pthread_mutex_destroy(&rbd->mutex);
	free(rbd);
}

void
bdev_rbd_free_config(char **config)
{
	char **entry;

	if (config) {
		for (entry = config; *entry; entry++) {
			free(*entry);
		}
		free(config);
	}
}

char **
bdev_rbd_dup_config(const char *const *config)
{
	size_t count;
	char **copy;

	if (!config) {
		return NULL;
	}
	for (count = 0; config[count]; count++) {}
	copy = calloc(count + 1, sizeof(*copy));
	if (!copy) {
		return NULL;
	}
	for (count = 0; config[count]; count++) {
		if (!(copy[count] = strdup(config[count]))) {
			bdev_rbd_free_config(copy);
			return NULL;
		}
	}
	return copy;
}

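/* Create a rados_t for the given user and connect it.  `config` is a
 * NULL-terminated array of alternating key/value strings applied with
 * rados_conf_set(); when it is NULL, the default Ceph config file is read.
 */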
static int
bdev_rados_cluster_init(const char *user_id, const char *const *config,
			rados_t *cluster)
{
	int ret;

	ret = rados_create(cluster, user_id);
	if (ret < 0) {
		SPDK_ERRLOG("Failed to create rados_t struct\n");
		return -1;
	}

	if (config) {
		const char *const *entry = config;
		while (*entry) {
			ret = rados_conf_set(*cluster, entry[0], entry[1]);
			if (ret < 0) {
				SPDK_ERRLOG("Failed to set %s = %s\n", entry[0], entry[1]);
				rados_shutdown(*cluster);
				return -1;
			}
			entry += 2;
		}
	} else {
		ret = rados_conf_read_file(*cluster, NULL);
		if (ret < 0) {
			SPDK_ERRLOG("Failed to read conf file\n");
			rados_shutdown(*cluster);
			return -1;
		}
	}

	ret = rados_connect(*cluster);
	if (ret < 0) {
		SPDK_ERRLOG("Failed to connect to rbd_pool\n");
		rados_shutdown(*cluster);
		return -1;
	}

	return 0;
}

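/* Look up a registered cluster by name.  On a hit, take a reference and
 * return a pointer to the shared rados_t through *cluster.
 */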
static int
bdev_rbd_get_cluster(const char *cluster_name, rados_t **cluster)
{
	struct bdev_rbd_cluster *entry;

	if (cluster == NULL) {
		SPDK_ERRLOG("cluster should not be NULL\n");
		return -1;
	}

	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);
	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
		if (strcmp(cluster_name, entry->name) == 0) {
			entry->ref++;
			*cluster = &entry->cluster;
			pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
			return 0;
		}
	}

	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
	return -1;
}

static int
bdev_rbd_shared_cluster_init(const char *cluster_name, rados_t **cluster)
{
	int ret;

	ret = bdev_rbd_get_cluster(cluster_name, cluster);
	if (ret < 0) {
		SPDK_ERRLOG("Failed to get rados cluster with name=%s\n", cluster_name);
		return -1;
	}

	return ret;
}

static void *
bdev_rbd_cluster_handle(void *arg)
{
	void *ret = arg;
	struct bdev_rbd *rbd = arg;
	int rc;

	rc = bdev_rados_cluster_init(rbd->user_id, (const char *const *)rbd->config,
				     &rbd->cluster);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to create rados cluster for user_id=%s and rbd_pool=%s\n",
			    rbd->user_id ? rbd->user_id : "admin (the default)", rbd->pool_name);
		ret = NULL;
	}

	return ret;
}

static void *
bdev_rbd_init_context(void *arg)
{
	struct bdev_rbd *rbd = arg;
	int rc;

	if (rados_ioctx_create(*(rbd->cluster_p), rbd->pool_name, &rbd->io_ctx) < 0) {
		SPDK_ERRLOG("Failed to create ioctx on rbd=%p\n", rbd);
		return NULL;
	}

	rc = rbd_open(rbd->io_ctx, rbd->rbd_name, &rbd->image, NULL);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to open specified rbd device\n");
		return NULL;
	}

	rc = rbd_stat(rbd->image, &rbd->info, sizeof(rbd->info));
	rbd_close(rbd->image);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to stat specified rbd device\n");
		return NULL;
	}

	return arg;
}

static int
bdev_rbd_init(struct bdev_rbd *rbd)
{
	int ret = 0;

	if (!rbd->cluster_name) {
		rbd->cluster_p = &rbd->cluster;
		/* The cluster should be created on a non-SPDK thread to avoid
		 * conflicts between the Rados and SPDK threads */
		if (spdk_call_unaffinitized(bdev_rbd_cluster_handle, rbd) == NULL) {
			SPDK_ERRLOG("Cannot create the rados object on rbd=%p\n", rbd);
			return -1;
		}
	} else {
		ret = bdev_rbd_shared_cluster_init(rbd->cluster_name, &rbd->cluster_p);
		if (ret < 0) {
			SPDK_ERRLOG("Failed to create rados object for rbd=%p on cluster_name=%s\n",
				    rbd, rbd->cluster_name);
			return -1;
		}
	}

	if (spdk_call_unaffinitized(bdev_rbd_init_context, rbd) == NULL) {
		SPDK_ERRLOG("Cannot init rbd context for rbd=%p\n", rbd);
		return -1;
	}

	return ret;
}

static void
bdev_rbd_exit(rbd_image_t image)
{
	rbd_flush(image);
	rbd_close(image);
}

static void
bdev_rbd_finish_aiocb(rbd_completion_t cb, void *arg)
{
	/* Doing nothing here */
}

static void
_bdev_rbd_io_complete(void *_rbd_io)
{
	struct bdev_rbd_io *rbd_io = _rbd_io;

	spdk_bdev_io_complete(spdk_bdev_io_from_ctx(rbd_io), rbd_io->status);
}

static void
bdev_rbd_io_complete(struct spdk_bdev_io *bdev_io, enum spdk_bdev_io_status status)
{
	struct bdev_rbd_io *rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx;

	rbd_io->status = status;
	if (rbd_io->submit_td != NULL) {
		spdk_thread_send_msg(rbd_io->submit_td, _bdev_rbd_io_complete, rbd_io);
	} else {
		_bdev_rbd_io_complete(rbd_io);
	}
}

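/* Dispatch a read, write, or flush as librbd aio.  The completion callback is
 * intentionally a no-op: finished operations are reaped in bdev_rbd_io_poll()
 * via rbd_poll_io_events(), driven by the image's eventfd notification.
 */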
static void
bdev_rbd_start_aio(struct bdev_rbd *disk, struct spdk_bdev_io *bdev_io,
		   struct iovec *iov, int iovcnt, uint64_t offset, size_t len)
{
	int ret;
	rbd_completion_t comp;
	struct bdev_rbd_io *rbd_io;
	rbd_image_t image = disk->image;

	ret = rbd_aio_create_completion(bdev_io, bdev_rbd_finish_aiocb,
					&comp);
	if (ret < 0) {
		goto err;
	}

	if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) {
		rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx;
		rbd_io->total_len = len;
		if (spdk_likely(iovcnt == 1)) {
			ret = rbd_aio_read(image, offset, iov[0].iov_len, iov[0].iov_base, comp);
		} else {
			ret = rbd_aio_readv(image, iov, iovcnt, offset, comp);
		}
	} else if (bdev_io->type == SPDK_BDEV_IO_TYPE_WRITE) {
		if (spdk_likely(iovcnt == 1)) {
			ret = rbd_aio_write(image, offset, iov[0].iov_len, iov[0].iov_base, comp);
		} else {
			ret = rbd_aio_writev(image, iov, iovcnt, offset, comp);
		}
	} else if (bdev_io->type == SPDK_BDEV_IO_TYPE_FLUSH) {
		ret = rbd_aio_flush(image, comp);
	}

	if (ret < 0) {
		rbd_aio_release(comp);
		goto err;
	}

	return;

err:
	bdev_rbd_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
}

static int bdev_rbd_library_init(void);

static void bdev_rbd_library_fini(void);

static int
bdev_rbd_get_ctx_size(void)
{
	return sizeof(struct bdev_rbd_io);
}

static struct spdk_bdev_module rbd_if = {
	.name = "rbd",
	.module_init = bdev_rbd_library_init,
	.module_fini = bdev_rbd_library_fini,
	.get_ctx_size = bdev_rbd_get_ctx_size,
};
SPDK_BDEV_MODULE_REGISTER(rbd, &rbd_if)

static int
bdev_rbd_reset_timer(void *arg)
{
	struct bdev_rbd *disk = arg;

	/*
	 * TODO: This should check if any I/O is still in flight before completing the reset.
	 * For now, just complete after the timer expires.
	 */
	bdev_rbd_io_complete(disk->reset_bdev_io, SPDK_BDEV_IO_STATUS_SUCCESS);
	spdk_poller_unregister(&disk->reset_timer);
	disk->reset_bdev_io = NULL;

	return SPDK_POLLER_BUSY;
}

static void
bdev_rbd_reset(struct bdev_rbd *disk, struct spdk_bdev_io *bdev_io)
{
	/*
	 * HACK: Since librbd doesn't provide any way to cancel outstanding aio, just kick off a
	 * timer to wait for in-flight I/O to complete.
	 */
	assert(disk->reset_bdev_io == NULL);
	disk->reset_bdev_io = bdev_io;
	disk->reset_timer = SPDK_POLLER_REGISTER(bdev_rbd_reset_timer, disk, 1 * 1000 * 1000);
}

static void
_bdev_rbd_destruct_done(void *io_device)
{
	struct bdev_rbd *rbd = io_device;

	assert(rbd != NULL);
	assert(rbd->ch_count == 0);

	spdk_bdev_destruct_done(&rbd->disk, 0);
	bdev_rbd_free(rbd);
}

static void
bdev_rbd_free_cb(void *io_device)
{
	struct bdev_rbd *rbd = io_device;

	/* The io device has been unregistered.  Send a message back to the
	 * original thread that started the destruct operation, so that the
	 * bdev unregister callback is invoked on the same thread that started
	 * this whole process.
	 */
	spdk_thread_send_msg(rbd->destruct_td, _bdev_rbd_destruct_done, rbd);
}

static void
_bdev_rbd_destruct(void *ctx)
{
	struct bdev_rbd *rbd = ctx;

	spdk_io_device_unregister(rbd, bdev_rbd_free_cb);
}

static int
bdev_rbd_destruct(void *ctx)
{
	struct bdev_rbd *rbd = ctx;
	struct spdk_thread *td;

	if (rbd->main_td == NULL) {
		td = spdk_get_thread();
	} else {
		td = rbd->main_td;
	}

	/* Start the destruct operation on the rbd bdev's
	 * main thread.  This guarantees it will only start
	 * executing after any messages related to channel
	 * deletions have finished completing.  *Always*
	 * send a message, even if this function gets called
	 * from the main thread, in case there are pending
	 * channel delete messages in flight to this thread.
	 */
	assert(rbd->destruct_td == NULL);
	rbd->destruct_td = td;
	spdk_thread_send_msg(td, _bdev_rbd_destruct, rbd);

	/* Return 1 to indicate the destruct path is asynchronous. */
	return 1;
}

static void
bdev_rbd_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io,
		    bool success)
{
	struct bdev_rbd *disk = (struct bdev_rbd *)bdev_io->bdev->ctxt;

	if (!success) {
		bdev_rbd_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
		return;
	}

	bdev_rbd_start_aio(disk,
			   bdev_io,
			   bdev_io->u.bdev.iovs,
			   bdev_io->u.bdev.iovcnt,
			   bdev_io->u.bdev.offset_blocks * bdev_io->bdev->blocklen,
			   bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
}

static void
_bdev_rbd_submit_request(void *ctx)
{
	struct spdk_bdev_io *bdev_io = ctx;
	struct bdev_rbd *disk = (struct bdev_rbd *)bdev_io->bdev->ctxt;

	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_READ:
		spdk_bdev_io_get_buf(bdev_io, bdev_rbd_get_buf_cb,
				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
		break;

	case SPDK_BDEV_IO_TYPE_WRITE:
	case SPDK_BDEV_IO_TYPE_FLUSH:
		bdev_rbd_start_aio(disk,
				   bdev_io,
				   bdev_io->u.bdev.iovs,
				   bdev_io->u.bdev.iovcnt,
				   bdev_io->u.bdev.offset_blocks * bdev_io->bdev->blocklen,
				   bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
		break;

	case SPDK_BDEV_IO_TYPE_RESET:
		bdev_rbd_reset((struct bdev_rbd *)bdev_io->bdev->ctxt,
			       bdev_io);
		break;

	default:
		SPDK_ERRLOG("Unsupported IO type=%d\n", bdev_io->type);
		bdev_rbd_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
		break;
	}
}

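/* All I/O for a bdev is funneled to the thread that created its first channel
 * (main_td); completions are bounced back to the submitting thread in
 * bdev_rbd_io_complete().
 */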
static void
bdev_rbd_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
{
	struct spdk_thread *submit_td = spdk_io_channel_get_thread(ch);
	struct bdev_rbd_io *rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx;
	struct bdev_rbd *disk = (struct bdev_rbd *)bdev_io->bdev->ctxt;

	if (disk->main_td != submit_td) {
		rbd_io->submit_td = submit_td;
		spdk_thread_send_msg(disk->main_td, _bdev_rbd_submit_request, bdev_io);
	} else {
		rbd_io->submit_td = NULL;
		_bdev_rbd_submit_request(bdev_io);
	}
}

static bool
bdev_rbd_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type)
{
	switch (io_type) {
	case SPDK_BDEV_IO_TYPE_READ:
	case SPDK_BDEV_IO_TYPE_WRITE:
	case SPDK_BDEV_IO_TYPE_FLUSH:
	case SPDK_BDEV_IO_TYPE_RESET:
		return true;

	default:
		return false;
	}
}

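/* Reap up to SPDK_RBD_QUEUE_DEPTH completed aio events.  For reads, librbd
 * returns the number of bytes read, so success means the return value matches
 * the requested length; for writes and flushes, 0 means success.
 */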
static void
bdev_rbd_io_poll(struct bdev_rbd *disk)
{
	int i, io_status, rc;
	rbd_completion_t comps[SPDK_RBD_QUEUE_DEPTH];
	struct spdk_bdev_io *bdev_io;
	struct bdev_rbd_io *rbd_io;
	enum spdk_bdev_io_status bio_status;

	rc = rbd_poll_io_events(disk->image, comps, SPDK_RBD_QUEUE_DEPTH);
	for (i = 0; i < rc; i++) {
		bdev_io = rbd_aio_get_arg(comps[i]);
		rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx;
		io_status = rbd_aio_get_return_value(comps[i]);
		bio_status = SPDK_BDEV_IO_STATUS_SUCCESS;

		if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) {
			if ((int)rbd_io->total_len != io_status) {
				bio_status = SPDK_BDEV_IO_STATUS_FAILED;
			}
		} else {
			/* For others, 0 means success */
			if (io_status != 0) {
				bio_status = SPDK_BDEV_IO_STATUS_FAILED;
			}
		}

		rbd_aio_release(comps[i]);

		bdev_rbd_io_complete(bdev_io, bio_status);
	}
}

static void
bdev_rbd_free_channel_resources(struct bdev_rbd *disk)
{
	int rc;

	assert(disk != NULL);
	assert(disk->main_td == spdk_get_thread());
	assert(disk->ch_count == 0);
	assert(disk->group_ch != NULL);
	rc = epoll_ctl(disk->group_ch->epoll_fd, EPOLL_CTL_DEL,
		       disk->pfd, NULL);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to remove fd on disk=%p from the polling group=%p\n",
			    disk, disk->group_ch);
	}
	spdk_put_io_channel(spdk_io_channel_from_ctx(disk->group_ch));

	if (disk->image) {
		bdev_rbd_exit(disk->image);
	}

	if (disk->pfd >= 0) {
		close(disk->pfd);
	}

	disk->main_td = NULL;
	disk->group_ch = NULL;
}

static void *
bdev_rbd_handle(void *arg)
{
	struct bdev_rbd *disk = arg;
	void *ret = arg;

	if (rbd_open(disk->io_ctx, disk->rbd_name, &disk->image, NULL) < 0) {
		SPDK_ERRLOG("Failed to open specified rbd device\n");
		ret = NULL;
	}

	return ret;
}

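/* First-channel setup, run on what becomes the bdev's main thread: open the
 * image (on an unaffinitized thread), create an eventfd for image
 * notifications, and register it with the poll group's epoll instance.
 */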
static int
_bdev_rbd_create_cb(struct bdev_rbd *disk)
{
	int ret;
	struct epoll_event event = {};

	disk->group_ch = spdk_io_channel_get_ctx(spdk_get_io_channel(&rbd_if));
	assert(disk->group_ch != NULL);
	event.events = EPOLLIN;
	event.data.ptr = disk;

	if (spdk_call_unaffinitized(bdev_rbd_handle, disk) == NULL) {
		goto err;
	}

	disk->pfd = eventfd(0, EFD_NONBLOCK);
	if (disk->pfd < 0) {
		SPDK_ERRLOG("Failed to get eventfd\n");
		goto err;
	}

	ret = rbd_set_image_notification(disk->image, disk->pfd, EVENT_TYPE_EVENTFD);
	if (ret < 0) {
		SPDK_ERRLOG("Failed to set rbd image notification\n");
		goto err;
	}

	ret = epoll_ctl(disk->group_ch->epoll_fd, EPOLL_CTL_ADD, disk->pfd, &event);
	if (ret < 0) {
		SPDK_ERRLOG("Failed to add the fd of disk=%p to the epoll group from group_ch=%p\n", disk,
			    disk->group_ch);
		goto err;
	}

	return 0;

err:
	bdev_rbd_free_channel_resources(disk);
	return -1;
}

static int
bdev_rbd_create_cb(void *io_device, void *ctx_buf)
{
	struct bdev_rbd_io_channel *ch = ctx_buf;
	struct bdev_rbd *disk = io_device;
	int rc;

	ch->disk = disk;
	pthread_mutex_lock(&disk->mutex);
	if (disk->ch_count == 0) {
		assert(disk->main_td == NULL);
		rc = _bdev_rbd_create_cb(disk);
		if (rc) {
			SPDK_ERRLOG("Cannot create channel for disk=%p\n", disk);
			pthread_mutex_unlock(&disk->mutex);
			return rc;
		}

		disk->main_td = spdk_get_thread();
	}

	disk->ch_count++;
	pthread_mutex_unlock(&disk->mutex);

	return 0;
}

static void
_bdev_rbd_destroy_cb(void *ctx)
{
	struct bdev_rbd *disk = ctx;

	pthread_mutex_lock(&disk->mutex);
	assert(disk->ch_count > 0);
	disk->ch_count--;

	if (disk->ch_count > 0) {
		/* A new channel was created between when the message was sent and when this function executed */
		pthread_mutex_unlock(&disk->mutex);
		return;
	}

	bdev_rbd_free_channel_resources(disk);
	pthread_mutex_unlock(&disk->mutex);
}

static void
bdev_rbd_destroy_cb(void *io_device, void *ctx_buf)
{
	struct bdev_rbd *disk = io_device;
	struct spdk_thread *thread;

	pthread_mutex_lock(&disk->mutex);
	assert(disk->ch_count > 0);
	disk->ch_count--;
	if (disk->ch_count == 0) {
		assert(disk->main_td != NULL);
		if (disk->main_td != spdk_get_thread()) {
			/* The final channel was destroyed on a different thread
			 * than where the first channel was created. Pass a message
			 * to the main thread to unregister the poller. */
			disk->ch_count++;
			thread = disk->main_td;
			pthread_mutex_unlock(&disk->mutex);
			spdk_thread_send_msg(thread, _bdev_rbd_destroy_cb, disk);
			return;
		}

		bdev_rbd_free_channel_resources(disk);
	}
	pthread_mutex_unlock(&disk->mutex);
}

static struct spdk_io_channel *
bdev_rbd_get_io_channel(void *ctx)
{
	struct bdev_rbd *rbd_bdev = ctx;

	return spdk_get_io_channel(rbd_bdev);
}

static void
bdev_rbd_cluster_dump_entry(const char *cluster_name, struct spdk_json_write_ctx *w)
{
	struct bdev_rbd_cluster *entry;

	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);
	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
		if (strcmp(cluster_name, entry->name)) {
			continue;
		}
		if (entry->user_id) {
			spdk_json_write_named_string(w, "user_id", entry->user_id);
		}

		if (entry->config_param) {
			char **config_entry = entry->config_param;

			spdk_json_write_named_object_begin(w, "config_param");
			while (*config_entry) {
				spdk_json_write_named_string(w, config_entry[0], config_entry[1]);
				config_entry += 2;
			}
			spdk_json_write_object_end(w);
		} else if (entry->config_file) {
			spdk_json_write_named_string(w, "config_file", entry->config_file);
		}

		pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
		return;
	}

	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
}

static int
bdev_rbd_dump_info_json(void *ctx, struct spdk_json_write_ctx *w)
{
	struct bdev_rbd *rbd_bdev = ctx;

	spdk_json_write_named_object_begin(w, "rbd");

	spdk_json_write_named_string(w, "pool_name", rbd_bdev->pool_name);

	spdk_json_write_named_string(w, "rbd_name", rbd_bdev->rbd_name);

	if (rbd_bdev->cluster_name) {
		bdev_rbd_cluster_dump_entry(rbd_bdev->cluster_name, w);
		goto end;
	}

	if (rbd_bdev->user_id) {
		spdk_json_write_named_string(w, "user_id", rbd_bdev->user_id);
	}

	if (rbd_bdev->config) {
		char **entry = rbd_bdev->config;

		spdk_json_write_named_object_begin(w, "config");
		while (*entry) {
			spdk_json_write_named_string(w, entry[0], entry[1]);
			entry += 2;
		}
		spdk_json_write_object_end(w);
	}

end:
	spdk_json_write_object_end(w);

	return 0;
}

static void
bdev_rbd_write_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
{
	struct bdev_rbd *rbd = bdev->ctxt;

	spdk_json_write_object_begin(w);

	spdk_json_write_named_string(w, "method", "bdev_rbd_create");

	spdk_json_write_named_object_begin(w, "params");
	spdk_json_write_named_string(w, "name", bdev->name);
	spdk_json_write_named_string(w, "pool_name", rbd->pool_name);
	spdk_json_write_named_string(w, "rbd_name", rbd->rbd_name);
	spdk_json_write_named_uint32(w, "block_size", bdev->blocklen);
	if (rbd->user_id) {
		spdk_json_write_named_string(w, "user_id", rbd->user_id);
	}

	if (rbd->config) {
		char **entry = rbd->config;

		spdk_json_write_named_object_begin(w, "config");
		while (*entry) {
			spdk_json_write_named_string(w, entry[0], entry[1]);
			entry += 2;
		}
		spdk_json_write_object_end(w);
	}

	spdk_json_write_object_end(w);

	spdk_json_write_object_end(w);
}

static void
dump_single_cluster_entry(struct bdev_rbd_cluster *entry, struct spdk_json_write_ctx *w)
{
	assert(entry != NULL);

	spdk_json_write_object_begin(w);
	spdk_json_write_named_string(w, "cluster_name", entry->name);

	if (entry->user_id) {
		spdk_json_write_named_string(w, "user_id", entry->user_id);
	}

	if (entry->config_param) {
		char **config_entry = entry->config_param;

		spdk_json_write_named_object_begin(w, "config_param");
		while (*config_entry) {
			spdk_json_write_named_string(w, config_entry[0], config_entry[1]);
			config_entry += 2;
		}
		spdk_json_write_object_end(w);
	} else if (entry->config_file) {
		spdk_json_write_named_string(w, "config_file", entry->config_file);
	}

	spdk_json_write_object_end(w);
}

int
bdev_rbd_get_clusters_info(struct spdk_jsonrpc_request *request, const char *name)
{
	struct bdev_rbd_cluster *entry;
	struct spdk_json_write_ctx *w;

	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);

	if (STAILQ_EMPTY(&g_map_bdev_rbd_cluster)) {
		pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
		return -ENOENT;
	}

	/* If cluster name is provided */
	if (name) {
		STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
			if (strcmp(name, entry->name) == 0) {
				w = spdk_jsonrpc_begin_result(request);
				dump_single_cluster_entry(entry, w);
				spdk_jsonrpc_end_result(request, w);

				pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
				return 0;
			}
		}

		pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
		return -ENOENT;
	}

	w = spdk_jsonrpc_begin_result(request);
	spdk_json_write_array_begin(w);
	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
		dump_single_cluster_entry(entry, w);
	}
	spdk_json_write_array_end(w);
	spdk_jsonrpc_end_result(request, w);
	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);

	return 0;
}

static const struct spdk_bdev_fn_table rbd_fn_table = {
	.destruct		= bdev_rbd_destruct,
	.submit_request		= bdev_rbd_submit_request,
	.io_type_supported	= bdev_rbd_io_type_supported,
	.get_io_channel		= bdev_rbd_get_io_channel,
	.dump_info_json		= bdev_rbd_dump_info_json,
	.write_config_json	= bdev_rbd_write_config_json,
};

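/* Register a named, shared rados cluster: connect it once up front and add it
 * to the global list so that multiple bdevs can attach to it by cluster_name.
 */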
static int
rbd_register_cluster(const char *name, const char *user_id, const char *const *config_param,
		     const char *config_file)
{
	struct bdev_rbd_cluster *entry;
	int rc;

	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);
	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
		if (strcmp(name, entry->name) == 0) {
			SPDK_ERRLOG("Cluster name=%s already exists\n", name);
			pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
			return -1;
		}
	}

	entry = calloc(1, sizeof(*entry));
	if (!entry) {
		SPDK_ERRLOG("Cannot allocate an entry for name=%s\n", name);
		pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
		return -1;
	}

	entry->name = strdup(name);
	if (entry->name == NULL) {
		SPDK_ERRLOG("Failed to save the name=%s on entry=%p\n", name, entry);
		goto err_handle;
	}

	if (user_id) {
		entry->user_id = strdup(user_id);
		if (entry->user_id == NULL) {
			SPDK_ERRLOG("Failed to save the user_id=%s on entry=%p\n", user_id, entry);
			goto err_handle;
		}
	}

	/* config_param takes priority; fall back to config_file otherwise */
	if (config_param) {
		entry->config_param = bdev_rbd_dup_config(config_param);
		if (entry->config_param == NULL) {
			SPDK_ERRLOG("Failed to save the config_param=%p on entry=%p\n", config_param, entry);
			goto err_handle;
		}
	} else if (config_file) {
		entry->config_file = strdup(config_file);
		if (entry->config_file == NULL) {
			SPDK_ERRLOG("Failed to save the config_file=%s on entry=%p\n", config_file, entry);
			goto err_handle;
		}
	}

	rc = rados_create(&entry->cluster, user_id);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to create rados_t struct\n");
		goto err_handle;
	}

	if (config_param) {
		const char *const *config_entry = config_param;
		while (*config_entry) {
			rc = rados_conf_set(entry->cluster, config_entry[0], config_entry[1]);
			if (rc < 0) {
				SPDK_ERRLOG("Failed to set %s = %s\n", config_entry[0], config_entry[1]);
				rados_shutdown(entry->cluster);
				goto err_handle;
			}
			config_entry += 2;
		}
	} else {
		rc = rados_conf_read_file(entry->cluster, entry->config_file);
		if (rc < 0) {
			SPDK_ERRLOG("Failed to read conf file\n");
			rados_shutdown(entry->cluster);
			goto err_handle;
		}
	}

	rc = rados_connect(entry->cluster);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to connect to rbd_pool on cluster=%p\n", entry->cluster);
		rados_shutdown(entry->cluster);
		goto err_handle;
	}

	STAILQ_INSERT_TAIL(&g_map_bdev_rbd_cluster, entry, link);
	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);

	return 0;

err_handle:
	bdev_rbd_cluster_free(entry);
	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
	return -1;
}

int
bdev_rbd_unregister_cluster(const char *name)
{
	struct bdev_rbd_cluster *entry;
	int rc = 0;

	if (name == NULL) {
		return -1;
	}

	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);
	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
		if (strcmp(name, entry->name) == 0) {
			if (entry->ref == 0) {
				STAILQ_REMOVE(&g_map_bdev_rbd_cluster, entry, bdev_rbd_cluster, link);
				rados_shutdown(entry->cluster);
				bdev_rbd_cluster_free(entry);
			} else {
				SPDK_ERRLOG("Cluster with name=%s is still in use and cannot be deleted\n",
					    entry->name);
				rc = -1;
			}

			pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
			return rc;
		}
	}

	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);

	SPDK_ERRLOG("Could not find the cluster with name=%s\n", name);

	return -1;
}

static void *
_bdev_rbd_register_cluster(void *arg)
{
	struct cluster_register_info *info = arg;
	void *ret = arg;
	int rc;

	rc = rbd_register_cluster((const char *)info->name, (const char *)info->user_id,
				  (const char *const *)info->config_param, (const char *)info->config_file);
	if (rc) {
		ret = NULL;
	}

	return ret;
}

int
bdev_rbd_register_cluster(struct cluster_register_info *info)
{
	assert(info != NULL);

	/* The Rados cluster info needs to be created on a non-SPDK thread to
	 * avoid CPU resource contention */
	if (spdk_call_unaffinitized(_bdev_rbd_register_cluster, info) == NULL) {
		return -1;
	}

	return 0;
}

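/* Create and register an rbd bdev.  Exactly one connection strategy is used:
 * a pre-registered shared cluster when cluster_name is set, otherwise a
 * private rados_t built from user_id/config.
 */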
int
bdev_rbd_create(struct spdk_bdev **bdev, const char *name, const char *user_id,
		const char *pool_name,
		const char *const *config,
		const char *rbd_name,
		uint32_t block_size,
		const char *cluster_name)
{
	struct bdev_rbd *rbd;
	int ret;

	if ((pool_name == NULL) || (rbd_name == NULL)) {
		return -EINVAL;
	}

	rbd = calloc(1, sizeof(struct bdev_rbd));
	if (rbd == NULL) {
		SPDK_ERRLOG("Failed to allocate bdev_rbd struct\n");
		return -ENOMEM;
	}

	ret = pthread_mutex_init(&rbd->mutex, NULL);
	if (ret) {
		SPDK_ERRLOG("Cannot init mutex on rbd=%p\n", rbd);
		free(rbd);
		return ret;
	}

	rbd->pfd = -1;
	rbd->rbd_name = strdup(rbd_name);
	if (!rbd->rbd_name) {
		bdev_rbd_free(rbd);
		return -ENOMEM;
	}

	if (user_id) {
		rbd->user_id = strdup(user_id);
		if (!rbd->user_id) {
			bdev_rbd_free(rbd);
			return -ENOMEM;
		}
	}

	if (cluster_name) {
		rbd->cluster_name = strdup(cluster_name);
		if (!rbd->cluster_name) {
			bdev_rbd_free(rbd);
			return -ENOMEM;
		}
	}

	rbd->pool_name = strdup(pool_name);
	if (!rbd->pool_name) {
		bdev_rbd_free(rbd);
		return -ENOMEM;
	}

	if (config && !(rbd->config = bdev_rbd_dup_config(config))) {
		bdev_rbd_free(rbd);
		return -ENOMEM;
	}

	ret = bdev_rbd_init(rbd);
	if (ret < 0) {
		bdev_rbd_free(rbd);
		SPDK_ERRLOG("Failed to init rbd device\n");
		return ret;
	}

	if (name) {
		rbd->disk.name = strdup(name);
	} else {
		rbd->disk.name = spdk_sprintf_alloc("Ceph%d", bdev_rbd_count);
	}
	if (!rbd->disk.name) {
		bdev_rbd_free(rbd);
		return -ENOMEM;
	}
	rbd->disk.product_name = "Ceph Rbd Disk";
	bdev_rbd_count++;

	rbd->disk.write_cache = 0;
	rbd->disk.blocklen = block_size;
	rbd->disk.blockcnt = rbd->info.size / rbd->disk.blocklen;
	rbd->disk.ctxt = rbd;
	rbd->disk.fn_table = &rbd_fn_table;
	rbd->disk.module = &rbd_if;

	SPDK_NOTICELOG("Add %s rbd disk to lun\n", rbd->disk.name);

	spdk_io_device_register(rbd, bdev_rbd_create_cb,
				bdev_rbd_destroy_cb,
				sizeof(struct bdev_rbd_io_channel),
				rbd_name);
	ret = spdk_bdev_register(&rbd->disk);
	if (ret) {
		spdk_io_device_unregister(rbd, NULL);
		bdev_rbd_free(rbd);
		return ret;
	}

	*bdev = &(rbd->disk);

	return ret;
}

void
bdev_rbd_delete(struct spdk_bdev *bdev, spdk_delete_rbd_complete cb_fn, void *cb_arg)
{
	if (!bdev || bdev->module != &rbd_if) {
		cb_fn(cb_arg, -ENODEV);
		return;
	}

	spdk_bdev_unregister(bdev, cb_fn, cb_arg);
}

int
bdev_rbd_resize(struct spdk_bdev *bdev, const uint64_t new_size_in_mb)
{
	struct spdk_io_channel *ch;
	struct bdev_rbd_io_channel *rbd_io_ch;
	int rc;
	uint64_t new_size_in_byte;
	uint64_t current_size_in_mb;

	if (bdev->module != &rbd_if) {
		return -EINVAL;
	}

	current_size_in_mb = bdev->blocklen * bdev->blockcnt / (1024 * 1024);
	if (current_size_in_mb > new_size_in_mb) {
		SPDK_ERRLOG("The new bdev size must be larger than the current bdev size.\n");
		return -EINVAL;
	}

	ch = bdev_rbd_get_io_channel(bdev);
	rbd_io_ch = spdk_io_channel_get_ctx(ch);
	new_size_in_byte = new_size_in_mb * 1024 * 1024;

	rc = rbd_resize(rbd_io_ch->disk->image, new_size_in_byte);
	spdk_put_io_channel(ch);
	if (rc != 0) {
		SPDK_ERRLOG("failed to resize the ceph bdev.\n");
		return rc;
	}

	rc = spdk_bdev_notify_blockcnt_change(bdev, new_size_in_byte / bdev->blocklen);
	if (rc != 0) {
		SPDK_ERRLOG("failed to notify block cnt change.\n");
		return rc;
	}

	return rc;
}

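/* Group-channel poller: a non-blocking epoll_wait() over the eventfds of all
 * disks in the group; each ready fd is handed to bdev_rbd_io_poll().
 */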
static int
bdev_rbd_group_poll(void *arg)
{
	struct bdev_rbd_group_channel *group_ch = arg;
	struct epoll_event events[MAX_EVENTS_PER_POLL];
	int num_events, i;

	num_events = epoll_wait(group_ch->epoll_fd, events, MAX_EVENTS_PER_POLL, 0);

	if (num_events <= 0) {
		return SPDK_POLLER_IDLE;
	}

	for (i = 0; i < num_events; i++) {
		bdev_rbd_io_poll((struct bdev_rbd *)events[i].data.ptr);
	}

	return SPDK_POLLER_BUSY;
}

static int
bdev_rbd_group_create_cb(void *io_device, void *ctx_buf)
{
	struct bdev_rbd_group_channel *ch = ctx_buf;

	ch->epoll_fd = epoll_create1(0);
	if (ch->epoll_fd < 0) {
		SPDK_ERRLOG("Could not create epoll fd on io device=%p\n", io_device);
		return -1;
	}

	ch->poller = SPDK_POLLER_REGISTER(bdev_rbd_group_poll, ch, 0);

	return 0;
}

static void
bdev_rbd_group_destroy_cb(void *io_device, void *ctx_buf)
{
	struct bdev_rbd_group_channel *ch = ctx_buf;

	if (ch->epoll_fd >= 0) {
		close(ch->epoll_fd);
	}

	spdk_poller_unregister(&ch->poller);
}

static int
bdev_rbd_library_init(void)
{
	spdk_io_device_register(&rbd_if, bdev_rbd_group_create_cb, bdev_rbd_group_destroy_cb,
				sizeof(struct bdev_rbd_group_channel), "bdev_rbd_poll_groups");

	return 0;
}

static void
bdev_rbd_library_fini(void)
{
	spdk_io_device_unregister(&rbd_if, NULL);
}

SPDK_LOG_REGISTER_COMPONENT(bdev_rbd)