xref: /spdk/module/bdev/rbd/bdev_rbd.c (revision f869197b76ff6981e901b6d9a05789e1b993494a)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (c) Intel Corporation.
3  *   All rights reserved.
4  */
5 
6 #include "spdk/stdinc.h"
7 
8 #include "bdev_rbd.h"
9 
10 #include <rbd/librbd.h>
11 #include <rados/librados.h>
12 
13 #include "spdk/env.h"
14 #include "spdk/bdev.h"
15 #include "spdk/thread.h"
16 #include "spdk/json.h"
17 #include "spdk/string.h"
18 #include "spdk/util.h"
19 #include "spdk/likely.h"
20 
21 #include "spdk/bdev_module.h"
22 #include "spdk/log.h"
23 
/* Module-wide counter of created rbd bdevs.
 * NOTE(review): not referenced in this chunk — presumably used for default
 * bdev naming elsewhere in the file; confirm. */
static int bdev_rbd_count = 0;

/* Per-bdev state for one exported rbd image. */
struct bdev_rbd {
	struct spdk_bdev disk;		/* embedded SPDK bdev */
	char *rbd_name;			/* image name within the pool */
	char *user_id;			/* optional ceph user id */
	char *pool_name;		/* rados pool holding the image */
	char **config;			/* NULL-terminated key/value pairs (see bdev_rbd_dup_config) */

	rados_t cluster;		/* private cluster handle (when no cluster_name) */
	rados_t *cluster_p;		/* points at 'cluster' above or a shared registry entry */
	char *cluster_name;		/* name of a shared registered cluster, or NULL */

	rados_ioctx_t io_ctx;		/* pool io context */
	rbd_image_t image;		/* open image handle (per-channel lifetime) */

	rbd_image_info_t info;		/* filled once by rbd_stat() at init */
	pthread_mutex_t mutex;		/* guards ch_count / main_td transitions */
	struct spdk_thread *main_td;	/* thread that created the first channel */
	struct spdk_thread *destruct_td; /* thread that initiated destruct */
	uint32_t ch_count;		/* number of live io channels */
	struct spdk_io_channel *group_ch; /* module-level group channel */

	TAILQ_ENTRY(bdev_rbd) tailq;	/* list linkage */
	struct spdk_poller *reset_timer; /* poller draining I/O during reset */
	struct spdk_bdev_io *reset_bdev_io; /* in-flight reset request, if any */
};

/* Per-channel context: just a back-pointer to the disk. */
struct bdev_rbd_io_channel {
	struct bdev_rbd *disk;
};

/* Per-I/O driver context (lives in spdk_bdev_io->driver_ctx). */
struct bdev_rbd_io {
	struct			spdk_thread *submit_td;	/* thread the I/O was submitted from */
	enum			spdk_bdev_io_status status; /* completion status to report */
	rbd_completion_t	comp;	/* librbd aio completion handle */
	size_t			total_len; /* expected byte count (reads only) */
};

/* One entry in the global registry of shareable, named rados clusters. */
struct bdev_rbd_cluster {
	char *name;
	char *user_id;
	char **config_param;	/* NULL-terminated key/value pairs */
	char *config_file;
	char *key_file;
	rados_t cluster;
	uint32_t ref;		/* number of bdevs currently attached */
	STAILQ_ENTRY(bdev_rbd_cluster) link;
};

/* Global cluster registry and the mutex protecting it. */
static STAILQ_HEAD(, bdev_rbd_cluster) g_map_bdev_rbd_cluster = STAILQ_HEAD_INITIALIZER(
			g_map_bdev_rbd_cluster);
static pthread_mutex_t g_map_bdev_rbd_cluster_mutex = PTHREAD_MUTEX_INITIALIZER;
77 
78 static void
79 bdev_rbd_cluster_free(struct bdev_rbd_cluster *entry)
80 {
81 	assert(entry != NULL);
82 
83 	bdev_rbd_free_config(entry->config_param);
84 	free(entry->config_file);
85 	free(entry->key_file);
86 	free(entry->user_id);
87 	free(entry->name);
88 	free(entry);
89 }
90 
/* Drop one reference on a shared cluster previously obtained through
 * bdev_rbd_get_cluster().  On success *cluster is cleared so the caller
 * cannot keep using the borrowed pointer.
 */
static void
bdev_rbd_put_cluster(rados_t **cluster)
{
	struct bdev_rbd_cluster *entry;

	assert(cluster != NULL);

	/* No need go through the map if *cluster equals to NULL */
	if (*cluster == NULL) {
		return;
	}

	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);
	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
		/* Match by address: *cluster points at the entry's embedded rados_t. */
		if (*cluster != &entry->cluster) {
			continue;
		}

		assert(entry->ref > 0);
		entry->ref--;
		*cluster = NULL;
		pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
		return;
	}

	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
	SPDK_ERRLOG("Cannot find the entry for cluster=%p\n", cluster);
}
119 
/* Release every resource owned by an rbd bdev: strings, duplicated config,
 * io context, the cluster (reference for shared, shutdown for private),
 * the mutex, and finally the structure itself.  NULL is accepted.
 */
static void
bdev_rbd_free(struct bdev_rbd *rbd)
{
	if (!rbd) {
		return;
	}

	free(rbd->disk.name);
	free(rbd->rbd_name);
	free(rbd->user_id);
	free(rbd->pool_name);
	bdev_rbd_free_config(rbd->config);

	if (rbd->io_ctx) {
		rados_ioctx_destroy(rbd->io_ctx);
	}

	if (rbd->cluster_name) {
		/* Shared cluster: just drop our reference, never shut it down. */
		bdev_rbd_put_cluster(&rbd->cluster_p);
		free(rbd->cluster_name);
	} else if (rbd->cluster) {
		/* Private cluster created in bdev_rbd_cluster_handle(). */
		rados_shutdown(rbd->cluster);
	}

	pthread_mutex_destroy(&rbd->mutex);
	free(rbd);
}
147 
/* Free a NULL-terminated array of strings, e.g. one produced by
 * bdev_rbd_dup_config().  A NULL config is accepted and ignored.
 */
void
bdev_rbd_free_config(char **config)
{
	char **p;

	if (config == NULL) {
		return;
	}

	for (p = config; *p != NULL; p++) {
		free(*p);
	}
	free(config);
}
160 
/* Deep-copy a NULL-terminated string array.  Returns a freshly allocated
 * copy (caller frees with bdev_rbd_free_config()), or NULL when config is
 * NULL or an allocation fails.
 */
char **
bdev_rbd_dup_config(const char *const *config)
{
	size_t n, i;
	char **dup;

	if (config == NULL) {
		return NULL;
	}

	for (n = 0; config[n] != NULL; n++) {
	}

	dup = calloc(n + 1, sizeof(*dup));
	if (dup == NULL) {
		return NULL;
	}

	for (i = 0; i < n; i++) {
		dup[i] = strdup(config[i]);
		if (dup[i] == NULL) {
			/* Unwind the partial copy. */
			while (i > 0) {
				free(dup[--i]);
			}
			free(dup);
			return NULL;
		}
	}

	return dup;
}
183 
184 static int
185 bdev_rados_cluster_init(const char *user_id, const char *const *config,
186 			rados_t *cluster)
187 {
188 	int ret;
189 
190 	ret = rados_create(cluster, user_id);
191 	if (ret < 0) {
192 		SPDK_ERRLOG("Failed to create rados_t struct\n");
193 		return -1;
194 	}
195 
196 	if (config) {
197 		const char *const *entry = config;
198 		while (*entry) {
199 			ret = rados_conf_set(*cluster, entry[0], entry[1]);
200 			if (ret < 0) {
201 				SPDK_ERRLOG("Failed to set %s = %s\n", entry[0], entry[1]);
202 				rados_shutdown(*cluster);
203 				return -1;
204 			}
205 			entry += 2;
206 		}
207 	} else {
208 		ret = rados_conf_read_file(*cluster, NULL);
209 		if (ret < 0) {
210 			SPDK_ERRLOG("Failed to read conf file\n");
211 			rados_shutdown(*cluster);
212 			return -1;
213 		}
214 	}
215 
216 	ret = rados_connect(*cluster);
217 	if (ret < 0) {
218 		SPDK_ERRLOG("Failed to connect to rbd_pool\n");
219 		rados_shutdown(*cluster);
220 		return -1;
221 	}
222 
223 	return 0;
224 }
225 
226 static int
227 bdev_rbd_get_cluster(const char *cluster_name, rados_t **cluster)
228 {
229 	struct bdev_rbd_cluster *entry;
230 
231 	if (cluster == NULL) {
232 		SPDK_ERRLOG("cluster should not be NULL\n");
233 		return -1;
234 	}
235 
236 	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);
237 	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
238 		if (strcmp(cluster_name, entry->name) == 0) {
239 			entry->ref++;
240 			*cluster = &entry->cluster;
241 			pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
242 			return 0;
243 		}
244 	}
245 
246 	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
247 	return -1;
248 }
249 
250 static int
251 bdev_rbd_shared_cluster_init(const char *cluster_name, rados_t **cluster)
252 {
253 	int ret;
254 
255 	ret = bdev_rbd_get_cluster(cluster_name, cluster);
256 	if (ret < 0) {
257 		SPDK_ERRLOG("Failed to create rados_t struct\n");
258 		return -1;
259 	}
260 
261 	return ret;
262 }
263 
264 static void *
265 bdev_rbd_cluster_handle(void *arg)
266 {
267 	void *ret = arg;
268 	struct bdev_rbd *rbd = arg;
269 	int rc;
270 
271 	rc = bdev_rados_cluster_init(rbd->user_id, (const char *const *)rbd->config,
272 				     &rbd->cluster);
273 	if (rc < 0) {
274 		SPDK_ERRLOG("Failed to create rados cluster for user_id=%s and rbd_pool=%s\n",
275 			    rbd->user_id ? rbd->user_id : "admin (the default)", rbd->pool_name);
276 		ret = NULL;
277 	}
278 
279 	return ret;
280 }
281 
/* spdk_call_unaffinitized() helper: create the pool io context, then open
 * the image briefly to fetch its geometry into rbd->info.  Returns arg on
 * success, NULL on failure.  On failure a created io_ctx is left for
 * bdev_rbd_free() to destroy.
 */
static void *
bdev_rbd_init_context(void *arg)
{
	struct bdev_rbd *rbd = arg;
	int rc;

	if (rados_ioctx_create(*(rbd->cluster_p), rbd->pool_name, &rbd->io_ctx) < 0) {
		SPDK_ERRLOG("Failed to create ioctx on rbd=%p\n", rbd);
		return NULL;
	}

	rc = rbd_open(rbd->io_ctx, rbd->rbd_name, &rbd->image, NULL);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to open specified rbd device\n");
		return NULL;
	}

	/* The image is only needed long enough to stat it here; the channel
	 * path re-opens it in bdev_rbd_handle(). */
	rc = rbd_stat(rbd->image, &rbd->info, sizeof(rbd->info));
	rbd_close(rbd->image);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to stat specified rbd device\n");
		return NULL;
	}

	return arg;
}
308 
309 static int
310 bdev_rbd_init(struct bdev_rbd *rbd)
311 {
312 	int ret = 0;
313 
314 	if (!rbd->cluster_name) {
315 		rbd->cluster_p = &rbd->cluster;
316 		/* Cluster should be created in non-SPDK thread to avoid conflict between
317 		 * Rados and SPDK thread */
318 		if (spdk_call_unaffinitized(bdev_rbd_cluster_handle, rbd) == NULL) {
319 			SPDK_ERRLOG("Cannot create the rados object on rbd=%p\n", rbd);
320 			return -1;
321 		}
322 	} else {
323 		ret = bdev_rbd_shared_cluster_init(rbd->cluster_name, &rbd->cluster_p);
324 		if (ret < 0) {
325 			SPDK_ERRLOG("Failed to create rados object for rbd =%p on cluster_name=%s\n",
326 				    rbd, rbd->cluster_name);
327 			return -1;
328 		}
329 	}
330 
331 	if (spdk_call_unaffinitized(bdev_rbd_init_context, rbd) == NULL) {
332 		SPDK_ERRLOG("Cannot init rbd context for rbd=%p\n", rbd);
333 	}
334 
335 	return ret;
336 }
337 
/* Flush any buffered writes, then close the image handle. */
static void
bdev_rbd_exit(rbd_image_t image)
{
	rbd_flush(image);
	rbd_close(image);
}
344 
345 static void
346 _bdev_rbd_io_complete(void *_rbd_io)
347 {
348 	struct bdev_rbd_io *rbd_io = _rbd_io;
349 
350 	spdk_bdev_io_complete(spdk_bdev_io_from_ctx(rbd_io), rbd_io->status);
351 }
352 
353 static void
354 bdev_rbd_io_complete(struct spdk_bdev_io *bdev_io, enum spdk_bdev_io_status status)
355 {
356 	struct bdev_rbd_io *rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx;
357 	struct spdk_thread *current_thread = spdk_get_thread();
358 
359 	rbd_io->status = status;
360 	assert(rbd_io->submit_td != NULL);
361 	if (rbd_io->submit_td != current_thread) {
362 		spdk_thread_send_msg(rbd_io->submit_td, _bdev_rbd_io_complete, rbd_io);
363 	} else {
364 		_bdev_rbd_io_complete(rbd_io);
365 	}
366 }
367 
/* librbd aio completion callback.
 * NOTE(review): presumably invoked from a librbd-internal thread, hence the
 * thread hop inside bdev_rbd_io_complete() — confirm against librbd docs.
 */
static void
bdev_rbd_finish_aiocb(rbd_completion_t cb, void *arg)
{
	int io_status;
	struct spdk_bdev_io *bdev_io;
	struct bdev_rbd_io *rbd_io;
	enum spdk_bdev_io_status bio_status;

	bdev_io = rbd_aio_get_arg(cb);
	rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx;
	io_status = rbd_aio_get_return_value(cb);
	bio_status = SPDK_BDEV_IO_STATUS_SUCCESS;

	if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) {
		/* For reads the return value is the byte count transferred;
		 * anything short of the requested length is a failure. */
		if ((int)rbd_io->total_len != io_status) {
			bio_status = SPDK_BDEV_IO_STATUS_FAILED;
		}
	} else {
		/* For others, 0 means success */
		if (io_status != 0) {
			bio_status = SPDK_BDEV_IO_STATUS_FAILED;
		}
	}

	rbd_aio_release(cb);

	bdev_rbd_io_complete(bdev_io, bio_status);
}
396 
397 static void
398 bdev_rbd_start_aio(struct bdev_rbd *disk, struct spdk_bdev_io *bdev_io,
399 		   struct iovec *iov, int iovcnt, uint64_t offset, size_t len)
400 {
401 	int ret;
402 	struct bdev_rbd_io *rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx;
403 	rbd_image_t image = disk->image;
404 
405 	ret = rbd_aio_create_completion(bdev_io, bdev_rbd_finish_aiocb,
406 					&rbd_io->comp);
407 	if (ret < 0) {
408 		goto err;
409 	}
410 
411 	if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) {
412 		rbd_io->total_len = len;
413 		if (spdk_likely(iovcnt == 1)) {
414 			ret = rbd_aio_read(image, offset, iov[0].iov_len, iov[0].iov_base, rbd_io->comp);
415 		} else {
416 			ret = rbd_aio_readv(image, iov, iovcnt, offset, rbd_io->comp);
417 		}
418 	} else if (bdev_io->type == SPDK_BDEV_IO_TYPE_WRITE) {
419 		if (spdk_likely(iovcnt == 1)) {
420 			ret = rbd_aio_write(image, offset, iov[0].iov_len, iov[0].iov_base, rbd_io->comp);
421 		} else {
422 			ret = rbd_aio_writev(image, iov, iovcnt, offset, rbd_io->comp);
423 		}
424 	} else if (bdev_io->type == SPDK_BDEV_IO_TYPE_UNMAP) {
425 		ret = rbd_aio_discard(image, offset, len, rbd_io->comp);
426 	} else if (bdev_io->type == SPDK_BDEV_IO_TYPE_FLUSH) {
427 		ret = rbd_aio_flush(image, rbd_io->comp);
428 	} else if (bdev_io->type == SPDK_BDEV_IO_TYPE_WRITE_ZEROES) {
429 		ret = rbd_aio_write_zeroes(image, offset, len, rbd_io->comp, /* zero_flags */ 0, /* op_flags */ 0);
430 	}
431 
432 	if (ret < 0) {
433 		rbd_aio_release(rbd_io->comp);
434 		goto err;
435 	}
436 
437 	return;
438 
439 err:
440 	bdev_rbd_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
441 }
442 
443 static int bdev_rbd_library_init(void);
444 static void bdev_rbd_library_fini(void);
445 
/* Tell the bdev layer how much per-I/O driver context to allocate. */
static int
bdev_rbd_get_ctx_size(void)
{
	return sizeof(struct bdev_rbd_io);
}
451 
/* Module hooks wiring the rbd backend into the SPDK bdev layer. */
static struct spdk_bdev_module rbd_if = {
	.name = "rbd",
	.module_init = bdev_rbd_library_init,
	.module_fini = bdev_rbd_library_fini,
	.get_ctx_size = bdev_rbd_get_ctx_size,

};
SPDK_BDEV_MODULE_REGISTER(rbd, &rbd_if)
460 
461 static int bdev_rbd_reset_timer(void *arg);
462 
463 static void
464 bdev_rbd_check_outstanding_ios(struct spdk_bdev *bdev, uint64_t current_qd,
465 			       void *cb_arg, int rc)
466 {
467 	struct bdev_rbd *disk = cb_arg;
468 	enum spdk_bdev_io_status bio_status;
469 
470 	if (rc == 0 && current_qd > 0) {
471 		disk->reset_timer = SPDK_POLLER_REGISTER(bdev_rbd_reset_timer, disk, 1000);
472 		return;
473 	}
474 
475 	if (rc != 0) {
476 		bio_status = SPDK_BDEV_IO_STATUS_FAILED;
477 	} else {
478 		bio_status = SPDK_BDEV_IO_STATUS_SUCCESS;
479 	}
480 
481 	bdev_rbd_io_complete(disk->reset_bdev_io, bio_status);
482 	disk->reset_bdev_io = NULL;
483 }
484 
485 static int
486 bdev_rbd_reset_timer(void *arg)
487 {
488 	struct bdev_rbd *disk = arg;
489 
490 	spdk_poller_unregister(&disk->reset_timer);
491 
492 	spdk_bdev_get_current_qd(&disk->disk, bdev_rbd_check_outstanding_ios, disk);
493 
494 	return SPDK_POLLER_BUSY;
495 }
496 
/* Handle SPDK_BDEV_IO_TYPE_RESET: remember the reset request and poll until
 * all outstanding I/O has drained.
 */
static void
bdev_rbd_reset(struct bdev_rbd *disk, struct spdk_bdev_io *bdev_io)
{
	/*
	 * HACK: Since librbd doesn't provide any way to cancel outstanding aio, just kick off a
	 * poller to wait for in-flight I/O to complete.
	 */
	assert(disk->reset_bdev_io == NULL);
	disk->reset_bdev_io = bdev_io;

	bdev_rbd_reset_timer(disk);
}
509 
/* Last step of the destruct sequence, back on the thread that started it:
 * acknowledge the bdev layer and free the rbd object.
 */
static void
_bdev_rbd_destruct_done(void *io_device)
{
	struct bdev_rbd *rbd = io_device;

	assert(rbd != NULL);
	assert(rbd->ch_count == 0);

	spdk_bdev_destruct_done(&rbd->disk, 0);
	bdev_rbd_free(rbd);
}
521 
/* io_device unregister callback. */
static void
bdev_rbd_free_cb(void *io_device)
{
	struct bdev_rbd *rbd = io_device;

	/* The io device has been unregistered.  Send a message back to the
	 * original thread that started the destruct operation, so that the
	 * bdev unregister callback is invoked on the same thread that started
	 * this whole process.
	 */
	spdk_thread_send_msg(rbd->destruct_td, _bdev_rbd_destruct_done, rbd);
}
534 
/* Runs on the rbd bdev's main thread: start the io_device unregistration;
 * any channel-deletion messages have already drained by the time this runs.
 */
static void
_bdev_rbd_destruct(void *ctx)
{
	struct bdev_rbd *rbd = ctx;

	spdk_io_device_unregister(rbd, bdev_rbd_free_cb);
}
542 
/* bdev destruct hook.  Always asynchronous: teardown is staged through the
 * bdev's main thread and completion is reported later via
 * spdk_bdev_destruct_done() in _bdev_rbd_destruct_done().
 */
static int
bdev_rbd_destruct(void *ctx)
{
	struct bdev_rbd *rbd = ctx;
	struct spdk_thread *td;

	/* main_td is NULL when no channel was ever created. */
	if (rbd->main_td == NULL) {
		td = spdk_get_thread();
	} else {
		td = rbd->main_td;
	}

	/* Start the destruct operation on the rbd bdev's
	 * main thread.  This guarantees it will only start
	 * executing after any messages related to channel
	 * deletions have finished completing.  *Always*
	 * send a message, even if this function gets called
	 * from the main thread, in case there are pending
	 * channel delete messages in flight to this thread.
	 */
	assert(rbd->destruct_td == NULL);
	rbd->destruct_td = td;
	spdk_thread_send_msg(td, _bdev_rbd_destruct, rbd);

	/* Return 1 to indicate the destruct path is asynchronous. */
	return 1;
}
570 
571 static void
572 bdev_rbd_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io,
573 		    bool success)
574 {
575 	struct bdev_rbd *disk = (struct bdev_rbd *)bdev_io->bdev->ctxt;
576 
577 	if (!success) {
578 		bdev_rbd_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
579 		return;
580 	}
581 
582 	bdev_rbd_start_aio(disk,
583 			   bdev_io,
584 			   bdev_io->u.bdev.iovs,
585 			   bdev_io->u.bdev.iovcnt,
586 			   bdev_io->u.bdev.offset_blocks * bdev_io->bdev->blocklen,
587 			   bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
588 }
589 
590 static void
591 _bdev_rbd_submit_request(void *ctx)
592 {
593 	struct spdk_bdev_io *bdev_io = ctx;
594 	struct bdev_rbd *disk = (struct bdev_rbd *)bdev_io->bdev->ctxt;
595 
596 	switch (bdev_io->type) {
597 	case SPDK_BDEV_IO_TYPE_READ:
598 		spdk_bdev_io_get_buf(bdev_io, bdev_rbd_get_buf_cb,
599 				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
600 		break;
601 
602 	case SPDK_BDEV_IO_TYPE_WRITE:
603 	case SPDK_BDEV_IO_TYPE_UNMAP:
604 	case SPDK_BDEV_IO_TYPE_FLUSH:
605 	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
606 		bdev_rbd_start_aio(disk,
607 				   bdev_io,
608 				   bdev_io->u.bdev.iovs,
609 				   bdev_io->u.bdev.iovcnt,
610 				   bdev_io->u.bdev.offset_blocks * bdev_io->bdev->blocklen,
611 				   bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
612 		break;
613 
614 	case SPDK_BDEV_IO_TYPE_RESET:
615 		bdev_rbd_reset((struct bdev_rbd *)bdev_io->bdev->ctxt,
616 			       bdev_io);
617 		break;
618 
619 	default:
620 		SPDK_ERRLOG("Unsupported IO type =%d\n", bdev_io->type);
621 		bdev_rbd_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
622 		break;
623 	}
624 }
625 
626 static void
627 bdev_rbd_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
628 {
629 	struct spdk_thread *submit_td = spdk_io_channel_get_thread(ch);
630 	struct bdev_rbd_io *rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx;
631 	struct bdev_rbd *disk = (struct bdev_rbd *)bdev_io->bdev->ctxt;
632 
633 	rbd_io->submit_td = submit_td;
634 	if (disk->main_td != submit_td) {
635 		spdk_thread_send_msg(disk->main_td, _bdev_rbd_submit_request, bdev_io);
636 	} else {
637 		_bdev_rbd_submit_request(bdev_io);
638 	}
639 }
640 
/* bdev hook: report which I/O types this backend implements. */
static bool
bdev_rbd_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type)
{
	switch (io_type) {
	case SPDK_BDEV_IO_TYPE_READ:
	case SPDK_BDEV_IO_TYPE_WRITE:
	case SPDK_BDEV_IO_TYPE_UNMAP:
	case SPDK_BDEV_IO_TYPE_FLUSH:
	case SPDK_BDEV_IO_TYPE_RESET:
	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
		return true;

	default:
		return false;
	}
}
657 
/* Tear down the per-disk channel state once the last io channel is gone.
 * Must run on the disk's main thread with ch_count already at 0.
 */
static void
bdev_rbd_free_channel_resources(struct bdev_rbd *disk)
{
	assert(disk != NULL);
	assert(disk->main_td == spdk_get_thread());
	assert(disk->ch_count == 0);

	spdk_put_io_channel(disk->group_ch);
	if (disk->image) {
		bdev_rbd_exit(disk->image);
	}

	disk->main_td = NULL;
	disk->group_ch = NULL;
}
673 
674 static void *
675 bdev_rbd_handle(void *arg)
676 {
677 	struct bdev_rbd *disk = arg;
678 	void *ret = arg;
679 
680 	if (rbd_open(disk->io_ctx, disk->rbd_name, &disk->image, NULL) < 0) {
681 		SPDK_ERRLOG("Failed to open specified rbd device\n");
682 		ret = NULL;
683 	}
684 
685 	return ret;
686 }
687 
688 static int
689 _bdev_rbd_create_cb(struct bdev_rbd *disk)
690 {
691 	disk->group_ch = spdk_get_io_channel(&rbd_if);
692 	assert(disk->group_ch != NULL);
693 
694 	if (spdk_call_unaffinitized(bdev_rbd_handle, disk) == NULL) {
695 		bdev_rbd_free_channel_resources(disk);
696 		return -1;
697 	}
698 
699 	return 0;
700 }
701 
/* io_device channel-create callback.  The first channel performs the
 * one-time setup (group channel + image open) and pins the creating thread
 * as the disk's main thread; later channels only bump the count.
 */
static int
bdev_rbd_create_cb(void *io_device, void *ctx_buf)
{
	struct bdev_rbd_io_channel *ch = ctx_buf;
	struct bdev_rbd *disk = io_device;
	int rc;

	ch->disk = disk;
	pthread_mutex_lock(&disk->mutex);
	if (disk->ch_count == 0) {
		assert(disk->main_td == NULL);
		rc = _bdev_rbd_create_cb(disk);
		if (rc) {
			SPDK_ERRLOG("Cannot create channel for disk=%p\n", disk);
			pthread_mutex_unlock(&disk->mutex);
			return rc;
		}

		disk->main_td = spdk_get_thread();
	}

	disk->ch_count++;
	pthread_mutex_unlock(&disk->mutex);

	return 0;
}
728 
729 static void
730 _bdev_rbd_destroy_cb(void *ctx)
731 {
732 	struct bdev_rbd *disk = ctx;
733 
734 	pthread_mutex_lock(&disk->mutex);
735 	assert(disk->ch_count > 0);
736 	disk->ch_count--;
737 
738 	if (disk->ch_count > 0) {
739 		/* A new channel was created between when message was sent and this function executed */
740 		pthread_mutex_unlock(&disk->mutex);
741 		return;
742 	}
743 
744 	bdev_rbd_free_channel_resources(disk);
745 	pthread_mutex_unlock(&disk->mutex);
746 }
747 
/* io_device channel-destroy callback.  When the final channel dies on a
 * thread other than the disk's main thread, the teardown is bounced to the
 * main thread (temporarily re-incrementing ch_count to keep the disk alive
 * until the message runs).
 */
static void
bdev_rbd_destroy_cb(void *io_device, void *ctx_buf)
{
	struct bdev_rbd *disk = io_device;
	struct spdk_thread *thread;

	pthread_mutex_lock(&disk->mutex);
	assert(disk->ch_count > 0);
	disk->ch_count--;
	if (disk->ch_count == 0) {
		assert(disk->main_td != NULL);
		if (disk->main_td != spdk_get_thread()) {
			/* The final channel was destroyed on a different thread
			 * than where the first channel was created. Pass a message
			 * to the main thread to unregister the poller. */
			disk->ch_count++;
			thread = disk->main_td;
			pthread_mutex_unlock(&disk->mutex);
			spdk_thread_send_msg(thread, _bdev_rbd_destroy_cb, disk);
			return;
		}

		bdev_rbd_free_channel_resources(disk);
	}
	pthread_mutex_unlock(&disk->mutex);
}
774 
/* bdev hook: hand out a per-thread channel for this rbd io_device. */
static struct spdk_io_channel *
bdev_rbd_get_io_channel(void *ctx)
{
	return spdk_get_io_channel(ctx);
}
782 
/* Write the named cluster's registration parameters into w as flat named
 * fields (no wrapping object).  Writes nothing when the name is not found.
 */
static void
bdev_rbd_cluster_dump_entry(const char *cluster_name, struct spdk_json_write_ctx *w)
{
	struct bdev_rbd_cluster *entry;

	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);
	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
		if (strcmp(cluster_name, entry->name)) {
			continue;
		}
		if (entry->user_id) {
			spdk_json_write_named_string(w, "user_id", entry->user_id);
		}

		if (entry->config_param) {
			/* config_param is a NULL-terminated key/value pair array. */
			char **config_entry = entry->config_param;

			spdk_json_write_named_object_begin(w, "config_param");
			while (*config_entry) {
				spdk_json_write_named_string(w, config_entry[0], config_entry[1]);
				config_entry += 2;
			}
			spdk_json_write_object_end(w);
		}
		if (entry->config_file) {
			spdk_json_write_named_string(w, "config_file", entry->config_file);
		}
		if (entry->key_file) {
			spdk_json_write_named_string(w, "key_file", entry->key_file);
		}

		pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
		return;
	}

	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
}
820 
/* bdev hook: dump rbd-specific details under an "rbd" object.  For a bdev
 * on a shared cluster the registered cluster parameters are emitted instead
 * of the per-bdev user_id/config.  Always returns 0.
 */
static int
bdev_rbd_dump_info_json(void *ctx, struct spdk_json_write_ctx *w)
{
	struct bdev_rbd *rbd_bdev = ctx;

	spdk_json_write_named_object_begin(w, "rbd");

	spdk_json_write_named_string(w, "pool_name", rbd_bdev->pool_name);

	spdk_json_write_named_string(w, "rbd_name", rbd_bdev->rbd_name);

	if (rbd_bdev->cluster_name) {
		bdev_rbd_cluster_dump_entry(rbd_bdev->cluster_name, w);
		goto end;
	}

	if (rbd_bdev->user_id) {
		spdk_json_write_named_string(w, "user_id", rbd_bdev->user_id);
	}

	if (rbd_bdev->config) {
		/* config is a NULL-terminated key/value pair array. */
		char **entry = rbd_bdev->config;

		spdk_json_write_named_object_begin(w, "config");
		while (*entry) {
			spdk_json_write_named_string(w, entry[0], entry[1]);
			entry += 2;
		}
		spdk_json_write_object_end(w);
	}

end:
	spdk_json_write_object_end(w);

	return 0;
}
857 
/* bdev hook: emit the bdev_rbd_create RPC call that would recreate this
 * bdev on restart.
 * NOTE(review): cluster_name is not written out here, so a bdev attached to
 * a shared cluster may not be fully reproducible from this output — verify.
 */
static void
bdev_rbd_write_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
{
	struct bdev_rbd *rbd = bdev->ctxt;
	char uuid_str[SPDK_UUID_STRING_LEN];

	spdk_json_write_object_begin(w);

	spdk_json_write_named_string(w, "method", "bdev_rbd_create");

	spdk_json_write_named_object_begin(w, "params");
	spdk_json_write_named_string(w, "name", bdev->name);
	spdk_json_write_named_string(w, "pool_name", rbd->pool_name);
	spdk_json_write_named_string(w, "rbd_name", rbd->rbd_name);
	spdk_json_write_named_uint32(w, "block_size", bdev->blocklen);
	if (rbd->user_id) {
		spdk_json_write_named_string(w, "user_id", rbd->user_id);
	}

	if (rbd->config) {
		/* config is a NULL-terminated key/value pair array. */
		char **entry = rbd->config;

		spdk_json_write_named_object_begin(w, "config");
		while (*entry) {
			spdk_json_write_named_string(w, entry[0], entry[1]);
			entry += 2;
		}
		spdk_json_write_object_end(w);
	}

	spdk_uuid_fmt_lower(uuid_str, sizeof(uuid_str), &bdev->uuid);
	spdk_json_write_named_string(w, "uuid", uuid_str);

	spdk_json_write_object_end(w);

	spdk_json_write_object_end(w);
}
895 
896 static void
897 dump_single_cluster_entry(struct bdev_rbd_cluster *entry, struct spdk_json_write_ctx *w)
898 {
899 	assert(entry != NULL);
900 
901 	spdk_json_write_object_begin(w);
902 	spdk_json_write_named_string(w, "cluster_name", entry->name);
903 
904 	if (entry->user_id) {
905 		spdk_json_write_named_string(w, "user_id", entry->user_id);
906 	}
907 
908 	if (entry->config_param) {
909 		char **config_entry = entry->config_param;
910 
911 		spdk_json_write_named_object_begin(w, "config_param");
912 		while (*config_entry) {
913 			spdk_json_write_named_string(w, config_entry[0], config_entry[1]);
914 			config_entry += 2;
915 		}
916 		spdk_json_write_object_end(w);
917 	}
918 	if (entry->config_file) {
919 		spdk_json_write_named_string(w, "config_file", entry->config_file);
920 	}
921 	if (entry->key_file) {
922 		spdk_json_write_named_string(w, "key_file", entry->key_file);
923 	}
924 
925 	spdk_json_write_object_end(w);
926 }
927 
/* RPC helper: dump one registered cluster (when name is non-NULL) or all of
 * them as a JSON array.  Returns 0 on success, -ENOENT when the registry is
 * empty or the requested name is not found.
 */
int
bdev_rbd_get_clusters_info(struct spdk_jsonrpc_request *request, const char *name)
{
	struct bdev_rbd_cluster *entry;
	struct spdk_json_write_ctx *w;

	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);

	if (STAILQ_EMPTY(&g_map_bdev_rbd_cluster)) {
		pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
		return -ENOENT;
	}

	/* If cluster name is provided */
	if (name) {
		STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
			if (strcmp(name, entry->name) == 0) {
				w = spdk_jsonrpc_begin_result(request);
				dump_single_cluster_entry(entry, w);
				spdk_jsonrpc_end_result(request, w);

				pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
				return 0;
			}
		}

		pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
		return -ENOENT;
	}

	w = spdk_jsonrpc_begin_result(request);
	spdk_json_write_array_begin(w);
	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
		dump_single_cluster_entry(entry, w);
	}
	spdk_json_write_array_end(w);
	spdk_jsonrpc_end_result(request, w);
	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);

	return 0;
}
969 
/* Function table wired into every rbd bdev. */
static const struct spdk_bdev_fn_table rbd_fn_table = {
	.destruct		= bdev_rbd_destruct,
	.submit_request		= bdev_rbd_submit_request,
	.io_type_supported	= bdev_rbd_io_type_supported,
	.get_io_channel		= bdev_rbd_get_io_channel,
	.dump_info_json		= bdev_rbd_dump_info_json,
	.write_config_json	= bdev_rbd_write_config_json,
};
978 
/* Create, configure and connect a named, shareable rados cluster and add it
 * to the global registry.  user_id, config_param, config_file and key_file
 * are all optional.  The registry mutex is held for the entire operation.
 * Returns 0 on success, -1 on any failure (the partially built entry is
 * freed via bdev_rbd_cluster_free()).
 */
static int
rbd_register_cluster(const char *name, const char *user_id, const char *const *config_param,
		     const char *config_file, const char *key_file)
{
	struct bdev_rbd_cluster *entry;
	int rc;

	/* Reject duplicate names up front. */
	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);
	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
		if (strcmp(name, entry->name) == 0) {
			SPDK_ERRLOG("Cluster name=%s already exists\n", name);
			pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
			return -1;
		}
	}

	entry = calloc(1, sizeof(*entry));
	if (!entry) {
		SPDK_ERRLOG("Cannot allocate an entry for name=%s\n", name);
		pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
		return -1;
	}

	entry->name = strdup(name);
	if (entry->name == NULL) {
		SPDK_ERRLOG("Failed to save the name =%s on entry =%p\n", name, entry);
		goto err_handle;
	}

	if (user_id) {
		entry->user_id = strdup(user_id);
		if (entry->user_id == NULL) {
			SPDK_ERRLOG("Failed to save the str =%s on entry =%p\n", user_id, entry);
			goto err_handle;
		}
	}

	/* Support specify config_param or config_file separately, or both of them. */
	if (config_param) {
		entry->config_param = bdev_rbd_dup_config(config_param);
		if (entry->config_param == NULL) {
			SPDK_ERRLOG("Failed to save the config_param=%p on entry = %p\n", config_param, entry);
			goto err_handle;
		}
	}

	if (config_file) {
		entry->config_file = strdup(config_file);
		if (entry->config_file == NULL) {
			SPDK_ERRLOG("Failed to save the config_file=%s on entry = %p\n", config_file, entry);
			goto err_handle;
		}
	}

	if (key_file) {
		entry->key_file = strdup(key_file);
		if (entry->key_file == NULL) {
			SPDK_ERRLOG("Failed to save the key_file=%s on entry = %p\n", key_file, entry);
			goto err_handle;
		}
	}

	rc = rados_create(&entry->cluster, user_id);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to create rados_t struct\n");
		goto err_handle;
	}

	/* Try default location when entry->config_file is NULL, but ignore failure when it is NULL */
	rc = rados_conf_read_file(entry->cluster, entry->config_file);
	if (entry->config_file && rc < 0) {
		SPDK_ERRLOG("Failed to read conf file %s\n", entry->config_file);
		rados_shutdown(entry->cluster);
		goto err_handle;
	}

	/* Explicit key/value settings override the configuration file. */
	if (config_param) {
		const char *const *config_entry = config_param;
		while (*config_entry) {
			rc = rados_conf_set(entry->cluster, config_entry[0], config_entry[1]);
			if (rc < 0) {
				SPDK_ERRLOG("Failed to set %s = %s\n", config_entry[0], config_entry[1]);
				rados_shutdown(entry->cluster);
				goto err_handle;
			}
			config_entry += 2;
		}
	}

	if (key_file) {
		rc = rados_conf_set(entry->cluster, "keyring", key_file);
		if (rc < 0) {
			SPDK_ERRLOG("Failed to set keyring = %s\n", key_file);
			rados_shutdown(entry->cluster);
			goto err_handle;
		}
	}

	rc = rados_connect(entry->cluster);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to connect to rbd_pool on cluster=%p\n", entry->cluster);
		rados_shutdown(entry->cluster);
		goto err_handle;
	}

	STAILQ_INSERT_TAIL(&g_map_bdev_rbd_cluster, entry, link);
	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);

	return 0;

err_handle:
	bdev_rbd_cluster_free(entry);
	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
	return -1;
}
1094 
1095 int
1096 bdev_rbd_unregister_cluster(const char *name)
1097 {
1098 	struct bdev_rbd_cluster *entry;
1099 	int rc = 0;
1100 
1101 	if (name == NULL) {
1102 		return -1;
1103 	}
1104 
1105 	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);
1106 	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
1107 		if (strcmp(name, entry->name) == 0) {
1108 			if (entry->ref == 0) {
1109 				STAILQ_REMOVE(&g_map_bdev_rbd_cluster, entry, bdev_rbd_cluster, link);
1110 				rados_shutdown(entry->cluster);
1111 				bdev_rbd_cluster_free(entry);
1112 			} else {
1113 				SPDK_ERRLOG("Cluster with name=%p is still used and we cannot delete it\n",
1114 					    entry->name);
1115 				rc = -1;
1116 			}
1117 
1118 			pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
1119 			return rc;
1120 		}
1121 	}
1122 
1123 	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
1124 
1125 	SPDK_ERRLOG("Could not find the cluster name =%p\n", name);
1126 
1127 	return -1;
1128 }
1129 
1130 static void *
1131 _bdev_rbd_register_cluster(void *arg)
1132 {
1133 	struct cluster_register_info *info = arg;
1134 	void *ret = arg;
1135 	int rc;
1136 
1137 	rc = rbd_register_cluster((const char *)info->name, (const char *)info->user_id,
1138 				  (const char *const *)info->config_param, (const char *)info->config_file,
1139 				  (const char *)info->key_file);
1140 	if (rc) {
1141 		ret = NULL;
1142 	}
1143 
1144 	return ret;
1145 }
1146 
1147 int
1148 bdev_rbd_register_cluster(struct cluster_register_info *info)
1149 {
1150 	assert(info != NULL);
1151 
1152 	/* Rados cluster info need to be created in non SPDK-thread to avoid CPU
1153 	 * resource contention */
1154 	if (spdk_call_unaffinitized(_bdev_rbd_register_cluster, info) == NULL) {
1155 		return -1;
1156 	}
1157 
1158 	return 0;
1159 }
1160 
1161 int
1162 bdev_rbd_create(struct spdk_bdev **bdev, const char *name, const char *user_id,
1163 		const char *pool_name,
1164 		const char *const *config,
1165 		const char *rbd_name,
1166 		uint32_t block_size,
1167 		const char *cluster_name,
1168 		const struct spdk_uuid *uuid)
1169 {
1170 	struct bdev_rbd *rbd;
1171 	int ret;
1172 
1173 	if ((pool_name == NULL) || (rbd_name == NULL)) {
1174 		return -EINVAL;
1175 	}
1176 
1177 	rbd = calloc(1, sizeof(struct bdev_rbd));
1178 	if (rbd == NULL) {
1179 		SPDK_ERRLOG("Failed to allocate bdev_rbd struct\n");
1180 		return -ENOMEM;
1181 	}
1182 
1183 	ret = pthread_mutex_init(&rbd->mutex, NULL);
1184 	if (ret) {
1185 		SPDK_ERRLOG("Cannot init mutex on rbd=%p\n", rbd->disk.name);
1186 		free(rbd);
1187 		return ret;
1188 	}
1189 
1190 	rbd->rbd_name = strdup(rbd_name);
1191 	if (!rbd->rbd_name) {
1192 		bdev_rbd_free(rbd);
1193 		return -ENOMEM;
1194 	}
1195 
1196 	if (user_id) {
1197 		rbd->user_id = strdup(user_id);
1198 		if (!rbd->user_id) {
1199 			bdev_rbd_free(rbd);
1200 			return -ENOMEM;
1201 		}
1202 	}
1203 
1204 	if (cluster_name) {
1205 		rbd->cluster_name = strdup(cluster_name);
1206 		if (!rbd->cluster_name) {
1207 			bdev_rbd_free(rbd);
1208 			return -ENOMEM;
1209 		}
1210 	}
1211 	rbd->pool_name = strdup(pool_name);
1212 	if (!rbd->pool_name) {
1213 		bdev_rbd_free(rbd);
1214 		return -ENOMEM;
1215 	}
1216 
1217 	if (config && !(rbd->config = bdev_rbd_dup_config(config))) {
1218 		bdev_rbd_free(rbd);
1219 		return -ENOMEM;
1220 	}
1221 
1222 	ret = bdev_rbd_init(rbd);
1223 	if (ret < 0) {
1224 		bdev_rbd_free(rbd);
1225 		SPDK_ERRLOG("Failed to init rbd device\n");
1226 		return ret;
1227 	}
1228 
1229 	if (uuid) {
1230 		rbd->disk.uuid = *uuid;
1231 	} else {
1232 		spdk_uuid_generate(&rbd->disk.uuid);
1233 	}
1234 
1235 	if (name) {
1236 		rbd->disk.name = strdup(name);
1237 	} else {
1238 		rbd->disk.name = spdk_sprintf_alloc("Ceph%d", bdev_rbd_count);
1239 	}
1240 	if (!rbd->disk.name) {
1241 		bdev_rbd_free(rbd);
1242 		return -ENOMEM;
1243 	}
1244 	rbd->disk.product_name = "Ceph Rbd Disk";
1245 	bdev_rbd_count++;
1246 
1247 	rbd->disk.write_cache = 0;
1248 	rbd->disk.blocklen = block_size;
1249 	rbd->disk.blockcnt = rbd->info.size / rbd->disk.blocklen;
1250 	rbd->disk.ctxt = rbd;
1251 	rbd->disk.fn_table = &rbd_fn_table;
1252 	rbd->disk.module = &rbd_if;
1253 
1254 	SPDK_NOTICELOG("Add %s rbd disk to lun\n", rbd->disk.name);
1255 
1256 	spdk_io_device_register(rbd, bdev_rbd_create_cb,
1257 				bdev_rbd_destroy_cb,
1258 				sizeof(struct bdev_rbd_io_channel),
1259 				rbd_name);
1260 	ret = spdk_bdev_register(&rbd->disk);
1261 	if (ret) {
1262 		spdk_io_device_unregister(rbd, NULL);
1263 		bdev_rbd_free(rbd);
1264 		return ret;
1265 	}
1266 
1267 	*bdev = &(rbd->disk);
1268 
1269 	return ret;
1270 }
1271 
1272 void
1273 bdev_rbd_delete(const char *name, spdk_delete_rbd_complete cb_fn, void *cb_arg)
1274 {
1275 	int rc;
1276 
1277 	rc = spdk_bdev_unregister_by_name(name, &rbd_if, cb_fn, cb_arg);
1278 	if (rc != 0) {
1279 		cb_fn(cb_arg, rc);
1280 	}
1281 }
1282 
/* Intentionally-empty bdev event callback: bdev_rbd_resize() opens the bdev
 * only briefly and does not need to react to events on it. */
static void
dummy_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev, void *ctx)
{
}
1287 
1288 int
1289 bdev_rbd_resize(const char *name, const uint64_t new_size_in_mb)
1290 {
1291 	struct spdk_bdev_desc *desc;
1292 	struct spdk_bdev *bdev;
1293 	struct spdk_io_channel *ch;
1294 	struct bdev_rbd_io_channel *rbd_io_ch;
1295 	int rc = 0;
1296 	uint64_t new_size_in_byte;
1297 	uint64_t current_size_in_mb;
1298 
1299 	rc = spdk_bdev_open_ext(name, false, dummy_bdev_event_cb, NULL, &desc);
1300 	if (rc != 0) {
1301 		return rc;
1302 	}
1303 
1304 	bdev = spdk_bdev_desc_get_bdev(desc);
1305 
1306 	if (bdev->module != &rbd_if) {
1307 		rc = -EINVAL;
1308 		goto exit;
1309 	}
1310 
1311 	current_size_in_mb = bdev->blocklen * bdev->blockcnt / (1024 * 1024);
1312 	if (current_size_in_mb > new_size_in_mb) {
1313 		SPDK_ERRLOG("The new bdev size must be larger than current bdev size.\n");
1314 		rc = -EINVAL;
1315 		goto exit;
1316 	}
1317 
1318 	ch = bdev_rbd_get_io_channel(bdev);
1319 	rbd_io_ch = spdk_io_channel_get_ctx(ch);
1320 	new_size_in_byte = new_size_in_mb * 1024 * 1024;
1321 
1322 	rc = rbd_resize(rbd_io_ch->disk->image, new_size_in_byte);
1323 	spdk_put_io_channel(ch);
1324 	if (rc != 0) {
1325 		SPDK_ERRLOG("failed to resize the ceph bdev.\n");
1326 		goto exit;
1327 	}
1328 
1329 	rc = spdk_bdev_notify_blockcnt_change(bdev, new_size_in_byte / bdev->blocklen);
1330 	if (rc != 0) {
1331 		SPDK_ERRLOG("failed to notify block cnt change.\n");
1332 	}
1333 
1334 exit:
1335 	spdk_bdev_close(desc);
1336 	return rc;
1337 }
1338 
/* Per-thread group channel constructor: the group channel carries no state,
 * so there is nothing to initialize. Always succeeds. */
static int
bdev_rbd_group_create_cb(void *io_device, void *ctx_buf)
{
	(void)io_device;
	(void)ctx_buf;

	return 0;
}
1344 
/* Per-thread group channel destructor: nothing was allocated in the
 * constructor, so there is nothing to release. */
static void
bdev_rbd_group_destroy_cb(void *io_device, void *ctx_buf)
{
	(void)io_device;
	(void)ctx_buf;
}
1349 
/* Module init hook: register the module-wide io_device used for per-thread
 * poll-group channels (zero-sized context; see the group create/destroy
 * callbacks above). Always succeeds. */
static int
bdev_rbd_library_init(void)
{
	spdk_io_device_register(&rbd_if, bdev_rbd_group_create_cb, bdev_rbd_group_destroy_cb,
				0, "bdev_rbd_poll_groups");
	return 0;
}
1357 
/* Module teardown hook: unregister the io_device created in
 * bdev_rbd_library_init(). No completion callback is needed. */
static void
bdev_rbd_library_fini(void)
{
	spdk_io_device_unregister(&rbd_if, NULL);
}
1363 
1364 SPDK_LOG_REGISTER_COMPONENT(bdev_rbd)
1365