xref: /spdk/module/bdev/rbd/bdev_rbd.c (revision eb53c23236cccb6b698b7ca70ee783da1c574b5f)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2017 Intel Corporation.
3  *   All rights reserved.
4  */
5 
6 #include "spdk/stdinc.h"
7 
8 #include "bdev_rbd.h"
9 
10 #include <rbd/librbd.h>
11 #include <rados/librados.h>
12 
13 #include "spdk/env.h"
14 #include "spdk/bdev.h"
15 #include "spdk/thread.h"
16 #include "spdk/json.h"
17 #include "spdk/string.h"
18 #include "spdk/util.h"
19 #include "spdk/likely.h"
20 
21 #include "spdk/bdev_module.h"
22 #include "spdk/log.h"
23 
/* Counter for generating unique default bdev names — presumably incremented
 * in bdev_rbd_create() (not visible in this chunk); TODO confirm. */
static int bdev_rbd_count = 0;
25 
/* Per-bdev state for one exposed RBD image. Registered as an io_device;
 * disk.ctxt points back to this struct (see the casts in the submit paths). */
struct bdev_rbd {
	struct spdk_bdev disk;		/* SPDK bdev exposed to upper layers */
	char *rbd_name;			/* name of the RBD image to open */
	char *user_id;			/* Ceph user id; NULL uses the librados default */
	char *pool_name;		/* rados pool holding the image */
	char **config;			/* NULL-terminated key/value pairs for rados_conf_set */

	rados_t cluster;		/* private cluster handle (no cluster_name case) */
	rados_t *cluster_p;		/* &cluster, or a shared registered cluster handle */
	char *cluster_name;		/* name of a pre-registered shared cluster, or NULL */

	rados_ioctx_t io_ctx;		/* pool I/O context */
	rbd_image_t image;		/* open image handle, valid while channels exist */

	rbd_image_info_t info;		/* cached rbd_stat() result */
	pthread_mutex_t mutex;		/* guards ch_count/main_td transitions */
	struct spdk_thread *main_td;	/* thread that owns the image and issues aio */
	struct spdk_thread *destruct_td;	/* thread that initiated destruct */
	uint32_t ch_count;		/* number of live io channels */
	struct spdk_io_channel *group_ch;	/* module-level group channel */

	TAILQ_ENTRY(bdev_rbd) tailq;
	struct spdk_poller *reset_timer;	/* poller draining in-flight I/O during reset */
	struct spdk_bdev_io *reset_bdev_io;	/* outstanding reset request, if any */
};
51 
/* Per-channel context: only a back-pointer to the owning disk. */
struct bdev_rbd_io_channel {
	struct bdev_rbd *disk;
};
55 
/* Per-I/O driver context, carved out of spdk_bdev_io->driver_ctx
 * (see bdev_rbd_get_ctx_size()). */
struct bdev_rbd_io {
	struct			spdk_thread *submit_td;	/* thread the I/O was submitted on */
	enum			spdk_bdev_io_status status;	/* completion status to report */
	rbd_completion_t	comp;			/* librbd aio completion handle */
	size_t			total_len;		/* expected byte count (reads only) */
};
62 
/* A named, shared rados cluster connection that multiple bdevs can
 * reference; lives on g_map_bdev_rbd_cluster. */
struct bdev_rbd_cluster {
	char *name;		/* registry key */
	char *user_id;		/* Ceph user id, or NULL for the default */
	char **config_param;	/* NULL-terminated key/value pairs, or NULL */
	char *config_file;	/* ceph.conf path, or NULL for the default search */
	char *key_file;		/* keyring path, or NULL */
	rados_t cluster;	/* connected cluster handle */
	uint32_t ref;		/* number of bdevs using this cluster */
	STAILQ_ENTRY(bdev_rbd_cluster) link;
};
73 
/* Registry of shared (named) rados clusters; all access is serialized by
 * g_map_bdev_rbd_cluster_mutex. */
static STAILQ_HEAD(, bdev_rbd_cluster) g_map_bdev_rbd_cluster = STAILQ_HEAD_INITIALIZER(
			g_map_bdev_rbd_cluster);
static pthread_mutex_t g_map_bdev_rbd_cluster_mutex = PTHREAD_MUTEX_INITIALIZER;
77 
78 static void
79 bdev_rbd_cluster_free(struct bdev_rbd_cluster *entry)
80 {
81 	assert(entry != NULL);
82 
83 	bdev_rbd_free_config(entry->config_param);
84 	free(entry->config_file);
85 	free(entry->key_file);
86 	free(entry->user_id);
87 	free(entry->name);
88 	free(entry);
89 }
90 
/* Drop a reference on a shared cluster previously taken via
 * bdev_rbd_get_cluster(), and clear the caller's pointer so it cannot be
 * reused after the put. */
static void
bdev_rbd_put_cluster(rados_t **cluster)
{
	struct bdev_rbd_cluster *entry;

	assert(cluster != NULL);

	/* No need go through the map if *cluster equals to NULL */
	if (*cluster == NULL) {
		return;
	}

	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);
	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
		/* Match by the address of the registered handle, which is what
		 * bdev_rbd_get_cluster() handed out. */
		if (*cluster != &entry->cluster) {
			continue;
		}

		assert(entry->ref > 0);
		entry->ref--;
		*cluster = NULL;
		pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
		return;
	}

	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
	/* NOTE(review): this logs the address of the out-param itself, not the
	 * handle (*cluster) that failed to match — looks unintentional. */
	SPDK_ERRLOG("Cannot find the entry for cluster=%p\n", cluster);
}
119 
/* Tear down a bdev_rbd and every resource it owns. Safe to call on a
 * partially-constructed rbd (all paths in bdev_rbd_create() rely on this). */
static void
bdev_rbd_free(struct bdev_rbd *rbd)
{
	if (!rbd) {
		return;
	}

	free(rbd->disk.name);
	free(rbd->rbd_name);
	free(rbd->user_id);
	free(rbd->pool_name);
	bdev_rbd_free_config(rbd->config);

	if (rbd->io_ctx) {
		rados_ioctx_destroy(rbd->io_ctx);
	}

	/* Shared clusters are ref-counted in the registry; a private cluster
	 * is owned by this bdev and must be shut down here. */
	if (rbd->cluster_name) {
		bdev_rbd_put_cluster(&rbd->cluster_p);
		free(rbd->cluster_name);
	} else if (rbd->cluster) {
		rados_shutdown(rbd->cluster);
	}

	pthread_mutex_destroy(&rbd->mutex);
	free(rbd);
}
147 
148 void
149 bdev_rbd_free_config(char **config)
150 {
151 	char **entry;
152 
153 	if (config) {
154 		for (entry = config; *entry; entry++) {
155 			free(*entry);
156 		}
157 		free(config);
158 	}
159 }
160 
161 char **
162 bdev_rbd_dup_config(const char *const *config)
163 {
164 	size_t count;
165 	char **copy;
166 
167 	if (!config) {
168 		return NULL;
169 	}
170 	for (count = 0; config[count]; count++) {}
171 	copy = calloc(count + 1, sizeof(*copy));
172 	if (!copy) {
173 		return NULL;
174 	}
175 	for (count = 0; config[count]; count++) {
176 		if (!(copy[count] = strdup(config[count]))) {
177 			bdev_rbd_free_config(copy);
178 			return NULL;
179 		}
180 	}
181 	return copy;
182 }
183 
/* Create and connect a private rados cluster handle.
 *
 * config, when non-NULL, is assumed to be an even-length NULL-terminated
 * key/value vector (entry[0]=key, entry[1]=value) — TODO confirm callers
 * always pass pairs, an odd-length vector would pass NULL as the value.
 * When config is NULL the default ceph.conf search path is used.
 * Returns 0 on success, -1 on any failure (the handle is shut down). */
static int
bdev_rados_cluster_init(const char *user_id, const char *const *config,
			rados_t *cluster)
{
	int ret;

	ret = rados_create(cluster, user_id);
	if (ret < 0) {
		SPDK_ERRLOG("Failed to create rados_t struct\n");
		return -1;
	}

	if (config) {
		const char *const *entry = config;
		while (*entry) {
			ret = rados_conf_set(*cluster, entry[0], entry[1]);
			if (ret < 0) {
				SPDK_ERRLOG("Failed to set %s = %s\n", entry[0], entry[1]);
				rados_shutdown(*cluster);
				return -1;
			}
			entry += 2;
		}
	} else {
		ret = rados_conf_read_file(*cluster, NULL);
		if (ret < 0) {
			SPDK_ERRLOG("Failed to read conf file\n");
			rados_shutdown(*cluster);
			return -1;
		}
	}

	ret = rados_connect(*cluster);
	if (ret < 0) {
		SPDK_ERRLOG("Failed to connect to rbd_pool\n");
		rados_shutdown(*cluster);
		return -1;
	}

	return 0;
}
225 
/* Look up a registered shared cluster by name and take a reference on it.
 * On success *cluster points at the registry entry's handle and 0 is
 * returned; -1 if the name is unknown. The reference is dropped via
 * bdev_rbd_put_cluster(). */
static int
bdev_rbd_get_cluster(const char *cluster_name, rados_t **cluster)
{
	struct bdev_rbd_cluster *entry;

	if (cluster == NULL) {
		SPDK_ERRLOG("cluster should not be NULL\n");
		return -1;
	}

	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);
	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
		if (strcmp(cluster_name, entry->name) == 0) {
			entry->ref++;
			*cluster = &entry->cluster;
			pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
			return 0;
		}
	}

	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
	return -1;
}
249 
250 static int
251 bdev_rbd_shared_cluster_init(const char *cluster_name, rados_t **cluster)
252 {
253 	int ret;
254 
255 	ret = bdev_rbd_get_cluster(cluster_name, cluster);
256 	if (ret < 0) {
257 		SPDK_ERRLOG("Failed to create rados_t struct\n");
258 		return -1;
259 	}
260 
261 	return ret;
262 }
263 
/* Trampoline run via spdk_call_unaffinitized(): builds the private rados
 * cluster for one bdev. Returns arg on success, NULL on failure (the
 * convention spdk_call_unaffinitized callers check). */
static void *
bdev_rbd_cluster_handle(void *arg)
{
	void *ret = arg;
	struct bdev_rbd *rbd = arg;
	int rc;

	rc = bdev_rados_cluster_init(rbd->user_id, (const char *const *)rbd->config,
				     &rbd->cluster);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to create rados cluster for user_id=%s and rbd_pool=%s\n",
			    rbd->user_id ? rbd->user_id : "admin (the default)", rbd->pool_name);
		ret = NULL;
	}

	return ret;
}
281 
/* Trampoline run via spdk_call_unaffinitized(): creates the pool io_ctx,
 * then opens the image just long enough to rbd_stat() it into rbd->info.
 * The image is reopened per-channel later (see bdev_rbd_handle()).
 * Returns arg on success, NULL on failure; a created io_ctx is left for
 * bdev_rbd_free() to destroy. */
static void *
bdev_rbd_init_context(void *arg)
{
	struct bdev_rbd *rbd = arg;
	int rc;

	if (rados_ioctx_create(*(rbd->cluster_p), rbd->pool_name, &rbd->io_ctx) < 0) {
		SPDK_ERRLOG("Failed to create ioctx on rbd=%p\n", rbd);
		return NULL;
	}

	rc = rbd_open(rbd->io_ctx, rbd->rbd_name, &rbd->image, NULL);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to open specified rbd device\n");
		return NULL;
	}

	/* Close before checking rc: the handle was only needed for the stat. */
	rc = rbd_stat(rbd->image, &rbd->info, sizeof(rbd->info));
	rbd_close(rbd->image);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to stat specified rbd device\n");
		return NULL;
	}

	return arg;
}
308 
/* Establish the cluster connection (private or shared) and gather image
 * info for a new bdev. Returns 0 on success, -1 on failure; partially
 * acquired resources are released by bdev_rbd_free() in the caller. */
static int
bdev_rbd_init(struct bdev_rbd *rbd)
{
	int ret = 0;

	if (!rbd->cluster_name) {
		rbd->cluster_p = &rbd->cluster;
		/* Cluster should be created in non-SPDK thread to avoid conflict between
		 * Rados and SPDK thread */
		if (spdk_call_unaffinitized(bdev_rbd_cluster_handle, rbd) == NULL) {
			SPDK_ERRLOG("Cannot create the rados object on rbd=%p\n", rbd);
			return -1;
		}
	} else {
		ret = bdev_rbd_shared_cluster_init(rbd->cluster_name, &rbd->cluster_p);
		if (ret < 0) {
			SPDK_ERRLOG("Failed to create rados object for rbd =%p on cluster_name=%s\n",
				    rbd, rbd->cluster_name);
			return -1;
		}
	}

	/* Same rationale: run librbd/librados setup off the SPDK reactor. */
	if (spdk_call_unaffinitized(bdev_rbd_init_context, rbd) == NULL) {
		SPDK_ERRLOG("Cannot init rbd context for rbd=%p\n", rbd);
		return -1;
	}

	return ret;
}
338 
/* Best-effort image teardown: flush dirty data, then close. Both return
 * values are deliberately ignored — there is no recovery path here. */
static void
bdev_rbd_exit(rbd_image_t image)
{
	rbd_flush(image);
	rbd_close(image);
}
345 
/* Message handler: completes the bdev_io on the thread it was submitted
 * from, using the status stashed in the driver context. */
static void
_bdev_rbd_io_complete(void *_rbd_io)
{
	struct bdev_rbd_io *rbd_io = _rbd_io;

	spdk_bdev_io_complete(spdk_bdev_io_from_ctx(rbd_io), rbd_io->status);
}
353 
/* Complete a bdev_io with the given status, hopping back to the original
 * submitting thread if we are currently on a different one (SPDK requires
 * completion on the submit thread). */
static void
bdev_rbd_io_complete(struct spdk_bdev_io *bdev_io, enum spdk_bdev_io_status status)
{
	struct bdev_rbd_io *rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx;
	struct spdk_thread *current_thread = spdk_get_thread();

	rbd_io->status = status;
	assert(rbd_io->submit_td != NULL);
	if (rbd_io->submit_td != current_thread) {
		spdk_thread_send_msg(rbd_io->submit_td, _bdev_rbd_io_complete, rbd_io);
	} else {
		_bdev_rbd_io_complete(rbd_io);
	}
}
368 
/* librbd aio completion callback. Translates the librbd return value into
 * a bdev status: reads must transfer exactly the requested length (the
 * return value is the byte count), all other ops must return 0. */
static void
bdev_rbd_finish_aiocb(rbd_completion_t cb, void *arg)
{
	int io_status;
	struct spdk_bdev_io *bdev_io;
	struct bdev_rbd_io *rbd_io;
	enum spdk_bdev_io_status bio_status;

	bdev_io = rbd_aio_get_arg(cb);
	rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx;
	io_status = rbd_aio_get_return_value(cb);
	bio_status = SPDK_BDEV_IO_STATUS_SUCCESS;

	if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) {
		/* A short read (or a negative errno) is treated as failure. */
		if ((int)rbd_io->total_len != io_status) {
			bio_status = SPDK_BDEV_IO_STATUS_FAILED;
		}
	} else {
		/* For others, 0 means success */
		if (io_status != 0) {
			bio_status = SPDK_BDEV_IO_STATUS_FAILED;
		}
	}

	/* Release the completion before reporting, mirroring the create in
	 * bdev_rbd_start_aio(). */
	rbd_aio_release(cb);

	bdev_rbd_io_complete(bdev_io, bio_status);
}
397 
/* Kick off one asynchronous librbd operation for bdev_io. Runs on the
 * disk's main thread. On submission failure the completion is released
 * and the I/O is failed immediately; on success bdev_rbd_finish_aiocb()
 * completes it later. offset/len are in bytes. */
static void
bdev_rbd_start_aio(struct bdev_rbd *disk, struct spdk_bdev_io *bdev_io,
		   struct iovec *iov, int iovcnt, uint64_t offset, size_t len)
{
	int ret;
	struct bdev_rbd_io *rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx;
	rbd_image_t image = disk->image;

	ret = rbd_aio_create_completion(bdev_io, bdev_rbd_finish_aiocb,
					&rbd_io->comp);
	if (ret < 0) {
		goto err;
	}

	if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) {
		/* Remember the expected length so the completion callback can
		 * detect short reads. */
		rbd_io->total_len = len;
		if (spdk_likely(iovcnt == 1)) {
			ret = rbd_aio_read(image, offset, iov[0].iov_len, iov[0].iov_base, rbd_io->comp);
		} else {
			ret = rbd_aio_readv(image, iov, iovcnt, offset, rbd_io->comp);
		}
	} else if (bdev_io->type == SPDK_BDEV_IO_TYPE_WRITE) {
		if (spdk_likely(iovcnt == 1)) {
			ret = rbd_aio_write(image, offset, iov[0].iov_len, iov[0].iov_base, rbd_io->comp);
		} else {
			ret = rbd_aio_writev(image, iov, iovcnt, offset, rbd_io->comp);
		}
	} else if (bdev_io->type == SPDK_BDEV_IO_TYPE_UNMAP) {
		ret = rbd_aio_discard(image, offset, len, rbd_io->comp);
	} else if (bdev_io->type == SPDK_BDEV_IO_TYPE_FLUSH) {
		ret = rbd_aio_flush(image, rbd_io->comp);
	} else if (bdev_io->type == SPDK_BDEV_IO_TYPE_WRITE_ZEROES) {
		ret = rbd_aio_write_zeroes(image, offset, len, rbd_io->comp, /* zero_flags */ 0, /* op_flags */ 0);
	}
	/* NOTE(review): callers only pass the types above; an unexpected type
	 * would fall through with ret == 0 and leak the completion. */

	if (ret < 0) {
		rbd_aio_release(rbd_io->comp);
		goto err;
	}

	return;

err:
	bdev_rbd_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
}
443 
444 static int bdev_rbd_library_init(void);
445 static void bdev_rbd_library_fini(void);
446 
/* Tell the bdev layer how much per-I/O driver context to reserve
 * (spdk_bdev_io->driver_ctx is cast to struct bdev_rbd_io throughout). */
static int
bdev_rbd_get_ctx_size(void)
{
	return sizeof(struct bdev_rbd_io);
}
452 
/* Module descriptor registered with the SPDK bdev layer. */
static struct spdk_bdev_module rbd_if = {
	.name = "rbd",
	.module_init = bdev_rbd_library_init,
	.module_fini = bdev_rbd_library_fini,
	.get_ctx_size = bdev_rbd_get_ctx_size,

};
SPDK_BDEV_MODULE_REGISTER(rbd, &rbd_if)
461 
462 static int bdev_rbd_reset_timer(void *arg);
463 
/* spdk_bdev_get_current_qd() callback used by the reset path: if I/O is
 * still outstanding, re-arm the 1ms poll; otherwise finish the pending
 * reset with success, or failure if the queue-depth query itself failed. */
static void
bdev_rbd_check_outstanding_ios(struct spdk_bdev *bdev, uint64_t current_qd,
			       void *cb_arg, int rc)
{
	struct bdev_rbd *disk = cb_arg;
	enum spdk_bdev_io_status bio_status;

	if (rc == 0 && current_qd > 0) {
		/* Still draining — poll again in 1000us. */
		disk->reset_timer = SPDK_POLLER_REGISTER(bdev_rbd_reset_timer, disk, 1000);
		return;
	}

	if (rc != 0) {
		bio_status = SPDK_BDEV_IO_STATUS_FAILED;
	} else {
		bio_status = SPDK_BDEV_IO_STATUS_SUCCESS;
	}

	bdev_rbd_io_complete(disk->reset_bdev_io, bio_status);
	disk->reset_bdev_io = NULL;
}
485 
/* Reset poller tick: unregister ourselves and re-query the current queue
 * depth; bdev_rbd_check_outstanding_ios() decides whether to re-arm. */
static int
bdev_rbd_reset_timer(void *arg)
{
	struct bdev_rbd *disk = arg;

	spdk_poller_unregister(&disk->reset_timer);

	spdk_bdev_get_current_qd(&disk->disk, bdev_rbd_check_outstanding_ios, disk);

	return SPDK_POLLER_BUSY;
}
497 
/* Handle SPDK_BDEV_IO_TYPE_RESET: park the request and start polling for
 * in-flight I/O to drain. Only one reset may be outstanding at a time. */
static void
bdev_rbd_reset(struct bdev_rbd *disk, struct spdk_bdev_io *bdev_io)
{
	/*
	 * HACK: Since librbd doesn't provide any way to cancel outstanding aio, just kick off a
	 * poller to wait for in-flight I/O to complete.
	 */
	assert(disk->reset_bdev_io == NULL);
	disk->reset_bdev_io = bdev_io;

	bdev_rbd_reset_timer(disk);
}
510 
/* Final destruct step, run on the thread that initiated destruct: notify
 * the bdev layer and release the bdev_rbd itself. */
static void
_bdev_rbd_destruct_done(void *io_device)
{
	struct bdev_rbd *rbd = io_device;

	assert(rbd != NULL);
	assert(rbd->ch_count == 0);

	spdk_bdev_destruct_done(&rbd->disk, 0);
	bdev_rbd_free(rbd);
}
522 
/* io_device unregister callback: bounce to the destruct-initiating thread
 * for the final completion. */
static void
bdev_rbd_free_cb(void *io_device)
{
	struct bdev_rbd *rbd = io_device;

	/* The io device has been unregistered.  Send a message back to the
	 * original thread that started the destruct operation, so that the
	 * bdev unregister callback is invoked on the same thread that started
	 * this whole process.
	 */
	spdk_thread_send_msg(rbd->destruct_td, _bdev_rbd_destruct_done, rbd);
}
535 
/* Runs on the rbd's main thread: kick off io_device unregistration; the
 * rest of the teardown continues in bdev_rbd_free_cb(). */
static void
_bdev_rbd_destruct(void *ctx)
{
	struct bdev_rbd *rbd = ctx;

	spdk_io_device_unregister(rbd, bdev_rbd_free_cb);
}
543 
/* fn_table destruct entry point. Records the calling thread so the final
 * completion can return to it, then hands off to the rbd's main thread.
 * Returns 1: destruction completes asynchronously via
 * spdk_bdev_destruct_done(). */
static int
bdev_rbd_destruct(void *ctx)
{
	struct bdev_rbd *rbd = ctx;
	struct spdk_thread *td;

	/* main_td is NULL when no channel was ever created; any thread works. */
	if (rbd->main_td == NULL) {
		td = spdk_get_thread();
	} else {
		td = rbd->main_td;
	}

	/* Start the destruct operation on the rbd bdev's
	 * main thread.  This guarantees it will only start
	 * executing after any messages related to channel
	 * deletions have finished completing.  *Always*
	 * send a message, even if this function gets called
	 * from the main thread, in case there are pending
	 * channel delete messages in flight to this thread.
	 */
	assert(rbd->destruct_td == NULL);
	rbd->destruct_td = td;
	spdk_thread_send_msg(td, _bdev_rbd_destruct, rbd);

	/* Return 1 to indicate the destruct path is asynchronous. */
	return 1;
}
571 
/* spdk_bdev_io_get_buf() callback for reads: once a data buffer is
 * available, issue the aio; fail the I/O if buffer allocation failed. */
static void
bdev_rbd_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io,
		    bool success)
{
	struct bdev_rbd *disk = (struct bdev_rbd *)bdev_io->bdev->ctxt;

	if (!success) {
		bdev_rbd_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
		return;
	}

	/* Convert block offsets/counts to the byte units librbd expects. */
	bdev_rbd_start_aio(disk,
			   bdev_io,
			   bdev_io->u.bdev.iovs,
			   bdev_io->u.bdev.iovcnt,
			   bdev_io->u.bdev.offset_blocks * bdev_io->bdev->blocklen,
			   bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
}
590 
/* Dispatch one bdev_io on the disk's main thread. Reads go through the
 * buffer-allocation callback first; other supported types are issued
 * directly; anything else is failed. */
static void
_bdev_rbd_submit_request(void *ctx)
{
	struct spdk_bdev_io *bdev_io = ctx;
	struct bdev_rbd *disk = (struct bdev_rbd *)bdev_io->bdev->ctxt;

	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_READ:
		/* The read buffer may not be allocated yet; defer to the
		 * get-buf callback. */
		spdk_bdev_io_get_buf(bdev_io, bdev_rbd_get_buf_cb,
				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
		break;

	case SPDK_BDEV_IO_TYPE_WRITE:
	case SPDK_BDEV_IO_TYPE_UNMAP:
	case SPDK_BDEV_IO_TYPE_FLUSH:
	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
		bdev_rbd_start_aio(disk,
				   bdev_io,
				   bdev_io->u.bdev.iovs,
				   bdev_io->u.bdev.iovcnt,
				   bdev_io->u.bdev.offset_blocks * bdev_io->bdev->blocklen,
				   bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
		break;

	case SPDK_BDEV_IO_TYPE_RESET:
		bdev_rbd_reset((struct bdev_rbd *)bdev_io->bdev->ctxt,
			       bdev_io);
		break;

	default:
		SPDK_ERRLOG("Unsupported IO type =%d\n", bdev_io->type);
		bdev_rbd_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
		break;
	}
}
626 
/* fn_table submit entry point. Records the submitting thread for the
 * completion hop, then forwards the I/O to the disk's main thread, where
 * all librbd calls are made. */
static void
bdev_rbd_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
{
	struct spdk_thread *submit_td = spdk_io_channel_get_thread(ch);
	struct bdev_rbd_io *rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx;
	struct bdev_rbd *disk = (struct bdev_rbd *)bdev_io->bdev->ctxt;

	rbd_io->submit_td = submit_td;
	if (disk->main_td != submit_td) {
		spdk_thread_send_msg(disk->main_td, _bdev_rbd_submit_request, bdev_io);
	} else {
		_bdev_rbd_submit_request(bdev_io);
	}
}
641 
642 static bool
643 bdev_rbd_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type)
644 {
645 	switch (io_type) {
646 	case SPDK_BDEV_IO_TYPE_READ:
647 	case SPDK_BDEV_IO_TYPE_WRITE:
648 	case SPDK_BDEV_IO_TYPE_UNMAP:
649 	case SPDK_BDEV_IO_TYPE_FLUSH:
650 	case SPDK_BDEV_IO_TYPE_RESET:
651 	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
652 		return true;
653 
654 	default:
655 		return false;
656 	}
657 }
658 
/* Release the per-disk resources acquired when the first channel was
 * created. Must run on the main thread after the last channel is gone. */
static void
bdev_rbd_free_channel_resources(struct bdev_rbd *disk)
{
	assert(disk != NULL);
	assert(disk->main_td == spdk_get_thread());
	assert(disk->ch_count == 0);

	spdk_put_io_channel(disk->group_ch);
	if (disk->image) {
		bdev_rbd_exit(disk->image);
	}

	disk->main_td = NULL;
	disk->group_ch = NULL;
}
674 
/* Trampoline run via spdk_call_unaffinitized(): open the image handle
 * used for I/O. Returns arg on success, NULL on failure. */
static void *
bdev_rbd_handle(void *arg)
{
	struct bdev_rbd *disk = arg;
	void *ret = arg;

	if (rbd_open(disk->io_ctx, disk->rbd_name, &disk->image, NULL) < 0) {
		SPDK_ERRLOG("Failed to open specified rbd device\n");
		ret = NULL;
	}

	return ret;
}
688 
/* First-channel setup: take the module group channel and open the image
 * off the SPDK reactor. Returns 0 on success, -1 on failure (resources
 * rolled back). */
static int
_bdev_rbd_create_cb(struct bdev_rbd *disk)
{
	disk->group_ch = spdk_get_io_channel(&rbd_if);
	assert(disk->group_ch != NULL);

	if (spdk_call_unaffinitized(bdev_rbd_handle, disk) == NULL) {
		bdev_rbd_free_channel_resources(disk);
		return -1;
	}

	return 0;
}
702 
/* io_device channel-create callback. The first channel on any thread
 * performs the shared setup and makes the current thread the disk's main
 * thread; later channels just bump the count. Serialized by disk->mutex. */
static int
bdev_rbd_create_cb(void *io_device, void *ctx_buf)
{
	struct bdev_rbd_io_channel *ch = ctx_buf;
	struct bdev_rbd *disk = io_device;
	int rc;

	ch->disk = disk;
	pthread_mutex_lock(&disk->mutex);
	if (disk->ch_count == 0) {
		assert(disk->main_td == NULL);
		rc = _bdev_rbd_create_cb(disk);
		if (rc) {
			SPDK_ERRLOG("Cannot create channel for disk=%p\n", disk);
			pthread_mutex_unlock(&disk->mutex);
			return rc;
		}

		disk->main_td = spdk_get_thread();
	}

	disk->ch_count++;
	pthread_mutex_unlock(&disk->mutex);

	return 0;
}
729 
/* Runs on the main thread after the last channel was destroyed elsewhere:
 * re-check the count under the lock (a new channel may have appeared in
 * the meantime) and free the shared resources only if still zero. */
static void
_bdev_rbd_destroy_cb(void *ctx)
{
	struct bdev_rbd *disk = ctx;

	pthread_mutex_lock(&disk->mutex);
	assert(disk->ch_count > 0);
	disk->ch_count--;

	if (disk->ch_count > 0) {
		/* A new channel was created between when message was sent and this function executed */
		pthread_mutex_unlock(&disk->mutex);
		return;
	}

	bdev_rbd_free_channel_resources(disk);
	pthread_mutex_unlock(&disk->mutex);
}
748 
/* io_device channel-destroy callback. The final channel triggers shared
 * teardown, which must happen on the main thread; if we are elsewhere,
 * the count is temporarily re-incremented and the work is forwarded. */
static void
bdev_rbd_destroy_cb(void *io_device, void *ctx_buf)
{
	struct bdev_rbd *disk = io_device;
	struct spdk_thread *thread;

	pthread_mutex_lock(&disk->mutex);
	assert(disk->ch_count > 0);
	disk->ch_count--;
	if (disk->ch_count == 0) {
		assert(disk->main_td != NULL);
		if (disk->main_td != spdk_get_thread()) {
			/* The final channel was destroyed on a different thread
			 * than where the first channel was created. Pass a message
			 * to the main thread to unregister the poller. */
			disk->ch_count++;
			thread = disk->main_td;
			pthread_mutex_unlock(&disk->mutex);
			spdk_thread_send_msg(thread, _bdev_rbd_destroy_cb, disk);
			return;
		}

		bdev_rbd_free_channel_resources(disk);
	}
	pthread_mutex_unlock(&disk->mutex);
}
775 
/* fn_table get_io_channel entry point: each bdev_rbd is registered as an
 * io_device, so simply hand out a channel keyed on it. ctx is already the
 * io_device pointer — no cast needed. */
static struct spdk_io_channel *
bdev_rbd_get_io_channel(void *ctx)
{
	return spdk_get_io_channel(ctx);
}
783 
/* Write the registry entry matching cluster_name into an already-open
 * JSON object (helper for bdev_rbd_dump_info_json()). Writes nothing if
 * the name is not registered. */
static void
bdev_rbd_cluster_dump_entry(const char *cluster_name, struct spdk_json_write_ctx *w)
{
	struct bdev_rbd_cluster *entry;

	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);
	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
		if (strcmp(cluster_name, entry->name)) {
			continue;
		}
		if (entry->user_id) {
			spdk_json_write_named_string(w, "user_id", entry->user_id);
		}

		if (entry->config_param) {
			char **config_entry = entry->config_param;

			/* config_param is a flat key/value vector; emit pairs. */
			spdk_json_write_named_object_begin(w, "config_param");
			while (*config_entry) {
				spdk_json_write_named_string(w, config_entry[0], config_entry[1]);
				config_entry += 2;
			}
			spdk_json_write_object_end(w);
		}
		if (entry->config_file) {
			spdk_json_write_named_string(w, "config_file", entry->config_file);
		}
		if (entry->key_file) {
			spdk_json_write_named_string(w, "key_file", entry->key_file);
		}

		pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
		return;
	}

	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
}
821 
/* fn_table dump_info_json entry point: emit this bdev's rbd-specific
 * configuration as a named "rbd" object. Always returns 0. */
static int
bdev_rbd_dump_info_json(void *ctx, struct spdk_json_write_ctx *w)
{
	struct bdev_rbd *rbd_bdev = ctx;

	spdk_json_write_named_object_begin(w, "rbd");

	spdk_json_write_named_string(w, "pool_name", rbd_bdev->pool_name);

	spdk_json_write_named_string(w, "rbd_name", rbd_bdev->rbd_name);

	/* Shared-cluster bdevs dump the registry entry instead of their own
	 * user_id/config (those live on the cluster entry). */
	if (rbd_bdev->cluster_name) {
		bdev_rbd_cluster_dump_entry(rbd_bdev->cluster_name, w);
		goto end;
	}

	if (rbd_bdev->user_id) {
		spdk_json_write_named_string(w, "user_id", rbd_bdev->user_id);
	}

	if (rbd_bdev->config) {
		char **entry = rbd_bdev->config;

		spdk_json_write_named_object_begin(w, "config");
		while (*entry) {
			spdk_json_write_named_string(w, entry[0], entry[1]);
			entry += 2;
		}
		spdk_json_write_object_end(w);
	}

end:
	spdk_json_write_object_end(w);

	return 0;
}
858 
/* fn_table write_config_json entry point: emit the bdev_rbd_create RPC
 * call that would recreate this bdev (used for config save/replay). */
static void
bdev_rbd_write_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
{
	struct bdev_rbd *rbd = bdev->ctxt;
	char uuid_str[SPDK_UUID_STRING_LEN];

	spdk_json_write_object_begin(w);

	spdk_json_write_named_string(w, "method", "bdev_rbd_create");

	spdk_json_write_named_object_begin(w, "params");
	spdk_json_write_named_string(w, "name", bdev->name);
	spdk_json_write_named_string(w, "pool_name", rbd->pool_name);
	spdk_json_write_named_string(w, "rbd_name", rbd->rbd_name);
	spdk_json_write_named_uint32(w, "block_size", bdev->blocklen);
	if (rbd->user_id) {
		spdk_json_write_named_string(w, "user_id", rbd->user_id);
	}

	if (rbd->config) {
		char **entry = rbd->config;

		/* Flat key/value vector, emitted as JSON pairs. */
		spdk_json_write_named_object_begin(w, "config");
		while (*entry) {
			spdk_json_write_named_string(w, entry[0], entry[1]);
			entry += 2;
		}
		spdk_json_write_object_end(w);
	}

	/* Persist the UUID so the bdev keeps its identity across restarts. */
	spdk_uuid_fmt_lower(uuid_str, sizeof(uuid_str), &bdev->uuid);
	spdk_json_write_named_string(w, "uuid", uuid_str);

	spdk_json_write_object_end(w);

	spdk_json_write_object_end(w);
}
896 
/* Emit one cluster registry entry as a standalone JSON object (helper for
 * bdev_rbd_get_clusters_info()). Caller holds the registry mutex. */
static void
dump_single_cluster_entry(struct bdev_rbd_cluster *entry, struct spdk_json_write_ctx *w)
{
	assert(entry != NULL);

	spdk_json_write_object_begin(w);
	spdk_json_write_named_string(w, "cluster_name", entry->name);

	if (entry->user_id) {
		spdk_json_write_named_string(w, "user_id", entry->user_id);
	}

	if (entry->config_param) {
		char **config_entry = entry->config_param;

		spdk_json_write_named_object_begin(w, "config_param");
		while (*config_entry) {
			spdk_json_write_named_string(w, config_entry[0], config_entry[1]);
			config_entry += 2;
		}
		spdk_json_write_object_end(w);
	}
	if (entry->config_file) {
		spdk_json_write_named_string(w, "config_file", entry->config_file);
	}
	if (entry->key_file) {
		spdk_json_write_named_string(w, "key_file", entry->key_file);
	}

	spdk_json_write_object_end(w);
}
928 
/* RPC backend: write cluster registry info to the JSON-RPC response.
 * With a name, dumps that single entry; with name == NULL, dumps all
 * entries as an array. Returns 0 on success, -ENOENT when the registry is
 * empty or the name is not found (the caller turns that into an RPC
 * error, so no result is started in that case). */
int
bdev_rbd_get_clusters_info(struct spdk_jsonrpc_request *request, const char *name)
{
	struct bdev_rbd_cluster *entry;
	struct spdk_json_write_ctx *w;

	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);

	if (STAILQ_EMPTY(&g_map_bdev_rbd_cluster)) {
		pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
		return -ENOENT;
	}

	/* If cluster name is provided */
	if (name) {
		STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
			if (strcmp(name, entry->name) == 0) {
				w = spdk_jsonrpc_begin_result(request);
				dump_single_cluster_entry(entry, w);
				spdk_jsonrpc_end_result(request, w);

				pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
				return 0;
			}
		}

		pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
		return -ENOENT;
	}

	w = spdk_jsonrpc_begin_result(request);
	spdk_json_write_array_begin(w);
	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
		dump_single_cluster_entry(entry, w);
	}
	spdk_json_write_array_end(w);
	spdk_jsonrpc_end_result(request, w);
	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);

	return 0;
}
970 
/* Function table wired into each struct spdk_bdev this module creates. */
static const struct spdk_bdev_fn_table rbd_fn_table = {
	.destruct		= bdev_rbd_destruct,
	.submit_request		= bdev_rbd_submit_request,
	.io_type_supported	= bdev_rbd_io_type_supported,
	.get_io_channel		= bdev_rbd_get_io_channel,
	.dump_info_json		= bdev_rbd_dump_info_json,
	.write_config_json	= bdev_rbd_write_config_json,
};
979 
/* Build, connect, and publish a named shared cluster entry.
 *
 * The registry mutex is held for the whole operation — including the
 * rados connect — which serializes registrations and guarantees the name
 * check and insert are atomic. config_param and config_file may be given
 * separately or together; key_file sets the "keyring" option. Runs on a
 * non-SPDK thread (see _bdev_rbd_register_cluster()). Returns 0 on
 * success, -1 on any failure with all partial state released. */
static int
rbd_register_cluster(const char *name, const char *user_id, const char *const *config_param,
		     const char *config_file, const char *key_file)
{
	struct bdev_rbd_cluster *entry;
	int rc;

	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);
	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
		if (strcmp(name, entry->name) == 0) {
			SPDK_ERRLOG("Cluster name=%s already exists\n", name);
			pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
			return -1;
		}
	}

	entry = calloc(1, sizeof(*entry));
	if (!entry) {
		SPDK_ERRLOG("Cannot allocate an entry for name=%s\n", name);
		pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
		return -1;
	}

	entry->name = strdup(name);
	if (entry->name == NULL) {
		SPDK_ERRLOG("Failed to save the name =%s on entry =%p\n", name, entry);
		goto err_handle;
	}

	if (user_id) {
		entry->user_id = strdup(user_id);
		if (entry->user_id == NULL) {
			SPDK_ERRLOG("Failed to save the str =%s on entry =%p\n", user_id, entry);
			goto err_handle;
		}
	}

	/* Support specify config_param or config_file separately, or both of them. */
	if (config_param) {
		entry->config_param = bdev_rbd_dup_config(config_param);
		if (entry->config_param == NULL) {
			SPDK_ERRLOG("Failed to save the config_param=%p on entry = %p\n", config_param, entry);
			goto err_handle;
		}
	}

	if (config_file) {
		entry->config_file = strdup(config_file);
		if (entry->config_file == NULL) {
			SPDK_ERRLOG("Failed to save the config_file=%s on entry = %p\n", config_file, entry);
			goto err_handle;
		}
	}

	if (key_file) {
		entry->key_file = strdup(key_file);
		if (entry->key_file == NULL) {
			SPDK_ERRLOG("Failed to save the key_file=%s on entry = %p\n", key_file, entry);
			goto err_handle;
		}
	}

	rc = rados_create(&entry->cluster, user_id);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to create rados_t struct\n");
		goto err_handle;
	}

	/* Try default location when entry->config_file is NULL, but ignore failure when it is NULL */
	rc = rados_conf_read_file(entry->cluster, entry->config_file);
	if (entry->config_file && rc < 0) {
		SPDK_ERRLOG("Failed to read conf file %s\n", entry->config_file);
		rados_shutdown(entry->cluster);
		goto err_handle;
	}

	/* config_param entries override values from the conf file. Assumed to
	 * be an even-length key/value vector — TODO confirm callers. */
	if (config_param) {
		const char *const *config_entry = config_param;
		while (*config_entry) {
			rc = rados_conf_set(entry->cluster, config_entry[0], config_entry[1]);
			if (rc < 0) {
				SPDK_ERRLOG("Failed to set %s = %s\n", config_entry[0], config_entry[1]);
				rados_shutdown(entry->cluster);
				goto err_handle;
			}
			config_entry += 2;
		}
	}

	if (key_file) {
		rc = rados_conf_set(entry->cluster, "keyring", key_file);
		if (rc < 0) {
			SPDK_ERRLOG("Failed to set keyring = %s\n", key_file);
			rados_shutdown(entry->cluster);
			goto err_handle;
		}
	}

	rc = rados_connect(entry->cluster);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to connect to rbd_pool on cluster=%p\n", entry->cluster);
		rados_shutdown(entry->cluster);
		goto err_handle;
	}

	STAILQ_INSERT_TAIL(&g_map_bdev_rbd_cluster, entry, link);
	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);

	return 0;

err_handle:
	bdev_rbd_cluster_free(entry);
	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
	return -1;
}
1095 
1096 int
1097 bdev_rbd_unregister_cluster(const char *name)
1098 {
1099 	struct bdev_rbd_cluster *entry;
1100 	int rc = 0;
1101 
1102 	if (name == NULL) {
1103 		return -1;
1104 	}
1105 
1106 	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);
1107 	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
1108 		if (strcmp(name, entry->name) == 0) {
1109 			if (entry->ref == 0) {
1110 				STAILQ_REMOVE(&g_map_bdev_rbd_cluster, entry, bdev_rbd_cluster, link);
1111 				rados_shutdown(entry->cluster);
1112 				bdev_rbd_cluster_free(entry);
1113 			} else {
1114 				SPDK_ERRLOG("Cluster with name=%p is still used and we cannot delete it\n",
1115 					    entry->name);
1116 				rc = -1;
1117 			}
1118 
1119 			pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
1120 			return rc;
1121 		}
1122 	}
1123 
1124 	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
1125 
1126 	SPDK_ERRLOG("Could not find the cluster name =%p\n", name);
1127 
1128 	return -1;
1129 }
1130 
/* Trampoline run via spdk_call_unaffinitized(): forward the registration
 * info to rbd_register_cluster(). Returns arg on success, NULL on
 * failure. (The casts are no-ops kept from the original code.) */
static void *
_bdev_rbd_register_cluster(void *arg)
{
	struct cluster_register_info *info = arg;
	void *ret = arg;
	int rc;

	rc = rbd_register_cluster((const char *)info->name, (const char *)info->user_id,
				  (const char *const *)info->config_param, (const char *)info->config_file,
				  (const char *)info->key_file);
	if (rc) {
		ret = NULL;
	}

	return ret;
}
1147 
/* Public entry point: register a named shared cluster described by info.
 * Returns 0 on success, -1 on failure. */
int
bdev_rbd_register_cluster(struct cluster_register_info *info)
{
	assert(info != NULL);

	/* Rados cluster info need to be created in non SPDK-thread to avoid CPU
	 * resource contention */
	if (spdk_call_unaffinitized(_bdev_rbd_register_cluster, info) == NULL) {
		return -1;
	}

	return 0;
}
1161 
1162 int
1163 bdev_rbd_create(struct spdk_bdev **bdev, const char *name, const char *user_id,
1164 		const char *pool_name,
1165 		const char *const *config,
1166 		const char *rbd_name,
1167 		uint32_t block_size,
1168 		const char *cluster_name,
1169 		const struct spdk_uuid *uuid)
1170 {
1171 	struct bdev_rbd *rbd;
1172 	int ret;
1173 
1174 	if ((pool_name == NULL) || (rbd_name == NULL)) {
1175 		return -EINVAL;
1176 	}
1177 
1178 	rbd = calloc(1, sizeof(struct bdev_rbd));
1179 	if (rbd == NULL) {
1180 		SPDK_ERRLOG("Failed to allocate bdev_rbd struct\n");
1181 		return -ENOMEM;
1182 	}
1183 
1184 	ret = pthread_mutex_init(&rbd->mutex, NULL);
1185 	if (ret) {
1186 		SPDK_ERRLOG("Cannot init mutex on rbd=%p\n", rbd->disk.name);
1187 		free(rbd);
1188 		return ret;
1189 	}
1190 
1191 	rbd->rbd_name = strdup(rbd_name);
1192 	if (!rbd->rbd_name) {
1193 		bdev_rbd_free(rbd);
1194 		return -ENOMEM;
1195 	}
1196 
1197 	if (user_id) {
1198 		rbd->user_id = strdup(user_id);
1199 		if (!rbd->user_id) {
1200 			bdev_rbd_free(rbd);
1201 			return -ENOMEM;
1202 		}
1203 	}
1204 
1205 	if (cluster_name) {
1206 		rbd->cluster_name = strdup(cluster_name);
1207 		if (!rbd->cluster_name) {
1208 			bdev_rbd_free(rbd);
1209 			return -ENOMEM;
1210 		}
1211 	}
1212 	rbd->pool_name = strdup(pool_name);
1213 	if (!rbd->pool_name) {
1214 		bdev_rbd_free(rbd);
1215 		return -ENOMEM;
1216 	}
1217 
1218 	if (config && !(rbd->config = bdev_rbd_dup_config(config))) {
1219 		bdev_rbd_free(rbd);
1220 		return -ENOMEM;
1221 	}
1222 
1223 	ret = bdev_rbd_init(rbd);
1224 	if (ret < 0) {
1225 		bdev_rbd_free(rbd);
1226 		SPDK_ERRLOG("Failed to init rbd device\n");
1227 		return ret;
1228 	}
1229 
1230 	if (uuid) {
1231 		rbd->disk.uuid = *uuid;
1232 	} else {
1233 		spdk_uuid_generate(&rbd->disk.uuid);
1234 	}
1235 
1236 	if (name) {
1237 		rbd->disk.name = strdup(name);
1238 	} else {
1239 		rbd->disk.name = spdk_sprintf_alloc("Ceph%d", bdev_rbd_count);
1240 	}
1241 	if (!rbd->disk.name) {
1242 		bdev_rbd_free(rbd);
1243 		return -ENOMEM;
1244 	}
1245 	rbd->disk.product_name = "Ceph Rbd Disk";
1246 	bdev_rbd_count++;
1247 
1248 	rbd->disk.write_cache = 0;
1249 	rbd->disk.blocklen = block_size;
1250 	rbd->disk.blockcnt = rbd->info.size / rbd->disk.blocklen;
1251 	rbd->disk.ctxt = rbd;
1252 	rbd->disk.fn_table = &rbd_fn_table;
1253 	rbd->disk.module = &rbd_if;
1254 
1255 	SPDK_NOTICELOG("Add %s rbd disk to lun\n", rbd->disk.name);
1256 
1257 	spdk_io_device_register(rbd, bdev_rbd_create_cb,
1258 				bdev_rbd_destroy_cb,
1259 				sizeof(struct bdev_rbd_io_channel),
1260 				rbd_name);
1261 	ret = spdk_bdev_register(&rbd->disk);
1262 	if (ret) {
1263 		spdk_io_device_unregister(rbd, NULL);
1264 		bdev_rbd_free(rbd);
1265 		return ret;
1266 	}
1267 
1268 	*bdev = &(rbd->disk);
1269 
1270 	return ret;
1271 }
1272 
1273 void
1274 bdev_rbd_delete(const char *name, spdk_delete_rbd_complete cb_fn, void *cb_arg)
1275 {
1276 	int rc;
1277 
1278 	rc = spdk_bdev_unregister_by_name(name, &rbd_if, cb_fn, cb_arg);
1279 	if (rc != 0) {
1280 		cb_fn(cb_arg, rc);
1281 	}
1282 }
1283 
/* No-op bdev event callback: bdev_rbd_resize() opens the bdev only to resize
 * it and does not need to react to events. */
static void
dummy_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev, void *ctx)
{
}
1288 
1289 int
1290 bdev_rbd_resize(const char *name, const uint64_t new_size_in_mb)
1291 {
1292 	struct spdk_bdev_desc *desc;
1293 	struct spdk_bdev *bdev;
1294 	struct spdk_io_channel *ch;
1295 	struct bdev_rbd_io_channel *rbd_io_ch;
1296 	int rc = 0;
1297 	uint64_t new_size_in_byte;
1298 	uint64_t current_size_in_mb;
1299 
1300 	rc = spdk_bdev_open_ext(name, false, dummy_bdev_event_cb, NULL, &desc);
1301 	if (rc != 0) {
1302 		return rc;
1303 	}
1304 
1305 	bdev = spdk_bdev_desc_get_bdev(desc);
1306 
1307 	if (bdev->module != &rbd_if) {
1308 		rc = -EINVAL;
1309 		goto exit;
1310 	}
1311 
1312 	current_size_in_mb = bdev->blocklen * bdev->blockcnt / (1024 * 1024);
1313 	if (current_size_in_mb > new_size_in_mb) {
1314 		SPDK_ERRLOG("The new bdev size must be larger than current bdev size.\n");
1315 		rc = -EINVAL;
1316 		goto exit;
1317 	}
1318 
1319 	ch = bdev_rbd_get_io_channel(bdev);
1320 	rbd_io_ch = spdk_io_channel_get_ctx(ch);
1321 	new_size_in_byte = new_size_in_mb * 1024 * 1024;
1322 
1323 	rc = rbd_resize(rbd_io_ch->disk->image, new_size_in_byte);
1324 	spdk_put_io_channel(ch);
1325 	if (rc != 0) {
1326 		SPDK_ERRLOG("failed to resize the ceph bdev.\n");
1327 		goto exit;
1328 	}
1329 
1330 	rc = spdk_bdev_notify_blockcnt_change(bdev, new_size_in_byte / bdev->blocklen);
1331 	if (rc != 0) {
1332 		SPDK_ERRLOG("failed to notify block cnt change.\n");
1333 	}
1334 
1335 exit:
1336 	spdk_bdev_close(desc);
1337 	return rc;
1338 }
1339 
/* Poll-group channel constructor: the group channel carries no per-thread
 * state, so there is nothing to initialize. */
static int
bdev_rbd_group_create_cb(void *io_device, void *ctx_buf)
{
	return 0;
}
1345 
/* Poll-group channel destructor: nothing was allocated in the create
 * callback, so nothing to release. */
static void
bdev_rbd_group_destroy_cb(void *io_device, void *ctx_buf)
{
}
1350 
/* Module init hook: register the module-wide io_device used to hand out
 * per-thread poll-group channels (zero-sized channel context). */
static int
bdev_rbd_library_init(void)
{
	spdk_io_device_register(&rbd_if, bdev_rbd_group_create_cb, bdev_rbd_group_destroy_cb,
				0, "bdev_rbd_poll_groups");
	return 0;
}
1358 
/* Module teardown hook: unregister the poll-group io_device registered in
 * bdev_rbd_library_init(). */
static void
bdev_rbd_library_fini(void)
{
	spdk_io_device_unregister(&rbd_if, NULL);
}
1364 
1365 SPDK_LOG_REGISTER_COMPONENT(bdev_rbd)
1366