xref: /spdk/module/bdev/rbd/bdev_rbd.c (revision 877573897ad52be4fa8989f7617bd655b87e05c4)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2017 Intel Corporation.
3  *   All rights reserved.
4  */
5 
6 #include "spdk/stdinc.h"
7 
8 #include "bdev_rbd.h"
9 
10 #include <rbd/librbd.h>
11 #include <rados/librados.h>
12 
13 #include "spdk/env.h"
14 #include "spdk/bdev.h"
15 #include "spdk/thread.h"
16 #include "spdk/json.h"
17 #include "spdk/string.h"
18 #include "spdk/util.h"
19 #include "spdk/likely.h"
20 
21 #include "spdk/bdev_module.h"
22 #include "spdk/log.h"
23 
/* Counter used to generate default "Ceph%d" bdev names in bdev_rbd_create(). */
static int bdev_rbd_count = 0;
25 
/* State for one RBD-backed bdev.  Allocated in bdev_rbd_create() and
 * released by bdev_rbd_free(). */
struct bdev_rbd {
	struct spdk_bdev disk;		/* embedded generic bdev object */
	char *rbd_name;			/* image name passed to rbd_open() */
	char *user_id;			/* Ceph user for rados_create(); NULL = default */
	char *pool_name;		/* pool used for rados_ioctx_create() */
	char **config;			/* NULL-terminated key/value pairs for rados_conf_set() */

	rados_t cluster;		/* privately created cluster (when no cluster_name) */
	rados_t *cluster_p;		/* points at 'cluster' or at a shared cluster entry */
	char *cluster_name;		/* name of a shared pre-registered cluster, if any */

	rados_ioctx_t io_ctx;		/* pool I/O context */
	rbd_image_t image;		/* open image handle while channels exist */

	rbd_image_info_t info;		/* filled by rbd_stat() during init */
	pthread_mutex_t mutex;		/* guards ch_count / main_td transitions */
	struct spdk_thread *main_td;	/* thread owning the image; aio submitted here */
	struct spdk_thread *destruct_td;	/* thread that initiated destruct */
	uint32_t ch_count;		/* number of open I/O channels */
	struct spdk_io_channel *group_ch;	/* module-level group channel */

	TAILQ_ENTRY(bdev_rbd) tailq;	/* list linkage (list head not visible in this file chunk) */
	struct spdk_poller *reset_timer;	/* polls for queue drain during reset */
	struct spdk_bdev_io *reset_bdev_io;	/* in-flight reset request, if any */
};
51 
/* Per-channel context; only records which disk the channel belongs to. */
struct bdev_rbd_io_channel {
	struct bdev_rbd *disk;
};
55 
/* Per-I/O driver context carried in spdk_bdev_io->driver_ctx. */
struct bdev_rbd_io {
	struct			spdk_thread *submit_td;	/* thread that submitted the I/O */
	enum			spdk_bdev_io_status status;	/* status reported at completion */
	rbd_completion_t	comp;	/* librbd async completion handle */
	size_t			total_len;	/* expected byte count (reads only) */
};
62 
/* A shared Ceph cluster registered via bdev_rbd_register_cluster().
 * 'ref' counts the rbd bdevs currently attached to it; the entry can
 * only be unregistered when ref == 0. */
struct bdev_rbd_cluster {
	char *name;		/* unique registration name */
	char *user_id;		/* optional Ceph user */
	char **config_param;	/* NULL-terminated key/value pairs */
	char *config_file;	/* optional ceph.conf path */
	char *key_file;		/* optional keyring path */
	rados_t cluster;	/* connected rados handle */
	uint32_t ref;		/* number of bdevs using this cluster */
	STAILQ_ENTRY(bdev_rbd_cluster) link;
};
73 
/* Registry of shared clusters created by bdev_rbd_register_cluster(), and
 * the mutex that guards every access to it. */
static STAILQ_HEAD(, bdev_rbd_cluster) g_map_bdev_rbd_cluster = STAILQ_HEAD_INITIALIZER(
			g_map_bdev_rbd_cluster);
static pthread_mutex_t g_map_bdev_rbd_cluster_mutex = PTHREAD_MUTEX_INITIALIZER;
77 
78 static void
79 bdev_rbd_cluster_free(struct bdev_rbd_cluster *entry)
80 {
81 	assert(entry != NULL);
82 
83 	bdev_rbd_free_config(entry->config_param);
84 	free(entry->config_file);
85 	free(entry->key_file);
86 	free(entry->user_id);
87 	free(entry->name);
88 	free(entry);
89 }
90 
/*
 * Drop one reference on the shared cluster entry whose rados_t the caller's
 * pointer refers to, and NULL out the caller's pointer so it cannot be put
 * twice.  Logs an error if no registry entry matches.
 */
static void
bdev_rbd_put_cluster(rados_t **cluster)
{
	struct bdev_rbd_cluster *entry;

	assert(cluster != NULL);

	/* No need go through the map if *cluster equals to NULL */
	if (*cluster == NULL) {
		return;
	}

	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);
	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
		/* Match by address: *cluster points into the entry itself. */
		if (*cluster != &entry->cluster) {
			continue;
		}

		assert(entry->ref > 0);
		entry->ref--;
		*cluster = NULL;
		pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
		return;
	}

	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
	SPDK_ERRLOG("Cannot find the entry for cluster=%p\n", cluster);
}
119 
/*
 * Release everything owned by an rbd bdev.  Safe to call with NULL or with
 * a partially constructed object (used on bdev_rbd_create() error paths).
 */
static void
bdev_rbd_free(struct bdev_rbd *rbd)
{
	if (!rbd) {
		return;
	}

	free(rbd->disk.name);
	free(rbd->rbd_name);
	free(rbd->user_id);
	free(rbd->pool_name);
	bdev_rbd_free_config(rbd->config);

	if (rbd->io_ctx) {
		rados_ioctx_destroy(rbd->io_ctx);
	}

	/* Shared clusters are refcounted; a private cluster is shut down here. */
	if (rbd->cluster_name) {
		bdev_rbd_put_cluster(&rbd->cluster_p);
		free(rbd->cluster_name);
	} else if (rbd->cluster) {
		rados_shutdown(rbd->cluster);
	}

	pthread_mutex_destroy(&rbd->mutex);
	free(rbd);
}
147 
/* Free a NULL-terminated array of heap-allocated strings and the array
 * itself.  A NULL argument is a no-op. */
void
bdev_rbd_free_config(char **config)
{
	size_t i;

	if (config == NULL) {
		return;
	}

	for (i = 0; config[i] != NULL; i++) {
		free(config[i]);
	}
	free(config);
}
160 
/* Deep-copy a NULL-terminated array of strings.  Returns NULL if the input
 * is NULL or on allocation failure (in which case nothing is leaked). */
char **
bdev_rbd_dup_config(const char *const *config)
{
	size_t n = 0;
	size_t i;
	char **copy;

	if (config == NULL) {
		return NULL;
	}

	while (config[n] != NULL) {
		n++;
	}

	copy = calloc(n + 1, sizeof(*copy));
	if (copy == NULL) {
		return NULL;
	}

	for (i = 0; i < n; i++) {
		size_t len = strlen(config[i]) + 1;

		copy[i] = malloc(len);
		if (copy[i] == NULL) {
			/* Unwind the strings copied so far. */
			while (i > 0) {
				free(copy[--i]);
			}
			free(copy);
			return NULL;
		}
		memcpy(copy[i], config[i], len);
	}

	return copy;
}
183 
184 static int
185 bdev_rados_cluster_init(const char *user_id, const char *const *config,
186 			rados_t *cluster)
187 {
188 	int ret;
189 
190 	ret = rados_create(cluster, user_id);
191 	if (ret < 0) {
192 		SPDK_ERRLOG("Failed to create rados_t struct\n");
193 		return -1;
194 	}
195 
196 	if (config) {
197 		const char *const *entry = config;
198 		while (*entry) {
199 			ret = rados_conf_set(*cluster, entry[0], entry[1]);
200 			if (ret < 0) {
201 				SPDK_ERRLOG("Failed to set %s = %s\n", entry[0], entry[1]);
202 				rados_shutdown(*cluster);
203 				return -1;
204 			}
205 			entry += 2;
206 		}
207 	} else {
208 		ret = rados_conf_read_file(*cluster, NULL);
209 		if (ret < 0) {
210 			SPDK_ERRLOG("Failed to read conf file\n");
211 			rados_shutdown(*cluster);
212 			return -1;
213 		}
214 	}
215 
216 	ret = rados_connect(*cluster);
217 	if (ret < 0) {
218 		SPDK_ERRLOG("Failed to connect to rbd_pool\n");
219 		rados_shutdown(*cluster);
220 		return -1;
221 	}
222 
223 	return 0;
224 }
225 
226 static int
227 bdev_rbd_get_cluster(const char *cluster_name, rados_t **cluster)
228 {
229 	struct bdev_rbd_cluster *entry;
230 
231 	if (cluster == NULL) {
232 		SPDK_ERRLOG("cluster should not be NULL\n");
233 		return -1;
234 	}
235 
236 	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);
237 	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
238 		if (strcmp(cluster_name, entry->name) == 0) {
239 			entry->ref++;
240 			*cluster = &entry->cluster;
241 			pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
242 			return 0;
243 		}
244 	}
245 
246 	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
247 	return -1;
248 }
249 
250 static int
251 bdev_rbd_shared_cluster_init(const char *cluster_name, rados_t **cluster)
252 {
253 	int ret;
254 
255 	ret = bdev_rbd_get_cluster(cluster_name, cluster);
256 	if (ret < 0) {
257 		SPDK_ERRLOG("Failed to create rados_t struct\n");
258 		return -1;
259 	}
260 
261 	return ret;
262 }
263 
264 static void *
265 bdev_rbd_cluster_handle(void *arg)
266 {
267 	void *ret = arg;
268 	struct bdev_rbd *rbd = arg;
269 	int rc;
270 
271 	rc = bdev_rados_cluster_init(rbd->user_id, (const char *const *)rbd->config,
272 				     &rbd->cluster);
273 	if (rc < 0) {
274 		SPDK_ERRLOG("Failed to create rados cluster for user_id=%s and rbd_pool=%s\n",
275 			    rbd->user_id ? rbd->user_id : "admin (the default)", rbd->pool_name);
276 		ret = NULL;
277 	}
278 
279 	return ret;
280 }
281 
282 static void *
283 bdev_rbd_init_context(void *arg)
284 {
285 	struct bdev_rbd *rbd = arg;
286 	int rc;
287 
288 	if (rados_ioctx_create(*(rbd->cluster_p), rbd->pool_name, &rbd->io_ctx) < 0) {
289 		SPDK_ERRLOG("Failed to create ioctx on rbd=%p\n", rbd);
290 		return NULL;
291 	}
292 
293 	rc = rbd_open(rbd->io_ctx, rbd->rbd_name, &rbd->image, NULL);
294 	if (rc < 0) {
295 		SPDK_ERRLOG("Failed to open specified rbd device\n");
296 		return NULL;
297 	}
298 
299 	rc = rbd_stat(rbd->image, &rbd->info, sizeof(rbd->info));
300 	rbd_close(rbd->image);
301 	if (rc < 0) {
302 		SPDK_ERRLOG("Failed to stat specified rbd device\n");
303 		return NULL;
304 	}
305 
306 	return arg;
307 }
308 
309 static int
310 bdev_rbd_init(struct bdev_rbd *rbd)
311 {
312 	int ret = 0;
313 
314 	if (!rbd->cluster_name) {
315 		rbd->cluster_p = &rbd->cluster;
316 		/* Cluster should be created in non-SPDK thread to avoid conflict between
317 		 * Rados and SPDK thread */
318 		if (spdk_call_unaffinitized(bdev_rbd_cluster_handle, rbd) == NULL) {
319 			SPDK_ERRLOG("Cannot create the rados object on rbd=%p\n", rbd);
320 			return -1;
321 		}
322 	} else {
323 		ret = bdev_rbd_shared_cluster_init(rbd->cluster_name, &rbd->cluster_p);
324 		if (ret < 0) {
325 			SPDK_ERRLOG("Failed to create rados object for rbd =%p on cluster_name=%s\n",
326 				    rbd, rbd->cluster_name);
327 			return -1;
328 		}
329 	}
330 
331 	if (spdk_call_unaffinitized(bdev_rbd_init_context, rbd) == NULL) {
332 		SPDK_ERRLOG("Cannot init rbd context for rbd=%p\n", rbd);
333 		return -1;
334 	}
335 
336 	return ret;
337 }
338 
/* Flush pending writes, then close the image handle.  Errors from both
 * calls are ignored (best-effort teardown). */
static void
bdev_rbd_exit(rbd_image_t image)
{
	rbd_flush(image);
	rbd_close(image);
}
345 
/* Complete the bdev_io on the current thread using the status previously
 * stashed in the driver context by bdev_rbd_io_complete(). */
static void
_bdev_rbd_io_complete(void *_rbd_io)
{
	struct bdev_rbd_io *rbd_io = _rbd_io;

	spdk_bdev_io_complete(spdk_bdev_io_from_ctx(rbd_io), rbd_io->status);
}
353 
354 static void
355 bdev_rbd_io_complete(struct spdk_bdev_io *bdev_io, enum spdk_bdev_io_status status)
356 {
357 	struct bdev_rbd_io *rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx;
358 	struct spdk_thread *current_thread = spdk_get_thread();
359 
360 	rbd_io->status = status;
361 	assert(rbd_io->submit_td != NULL);
362 	if (rbd_io->submit_td != current_thread) {
363 		spdk_thread_send_msg(rbd_io->submit_td, _bdev_rbd_io_complete, rbd_io);
364 	} else {
365 		_bdev_rbd_io_complete(rbd_io);
366 	}
367 }
368 
369 static void
370 bdev_rbd_finish_aiocb(rbd_completion_t cb, void *arg)
371 {
372 	int io_status;
373 	struct spdk_bdev_io *bdev_io;
374 	struct bdev_rbd_io *rbd_io;
375 	enum spdk_bdev_io_status bio_status;
376 
377 	bdev_io = rbd_aio_get_arg(cb);
378 	rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx;
379 	io_status = rbd_aio_get_return_value(cb);
380 	bio_status = SPDK_BDEV_IO_STATUS_SUCCESS;
381 
382 	if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) {
383 		if ((int)rbd_io->total_len != io_status) {
384 			bio_status = SPDK_BDEV_IO_STATUS_FAILED;
385 		}
386 	} else {
387 		/* For others, 0 means success */
388 		if (io_status != 0) {
389 			bio_status = SPDK_BDEV_IO_STATUS_FAILED;
390 		}
391 	}
392 
393 	rbd_aio_release(cb);
394 
395 	bdev_rbd_io_complete(bdev_io, bio_status);
396 }
397 
/*
 * Issue one async librbd operation for 'bdev_io' on 'disk'.  Must run on the
 * disk's main thread (which owns the open image handle).  'offset' and 'len'
 * are in bytes.  On any setup or submission failure the I/O is completed
 * with SPDK_BDEV_IO_STATUS_FAILED.
 */
static void
_bdev_rbd_start_aio(struct bdev_rbd *disk, struct spdk_bdev_io *bdev_io,
		    struct iovec *iov, int iovcnt, uint64_t offset, size_t len)
{
	int ret;
	struct bdev_rbd_io *rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx;
	rbd_image_t image = disk->image;

	/* The completion carries bdev_io so the callback can find it again. */
	ret = rbd_aio_create_completion(bdev_io, bdev_rbd_finish_aiocb,
					&rbd_io->comp);
	if (ret < 0) {
		goto err;
	}

	if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) {
		/* Remember the expected length; the callback fails short reads. */
		rbd_io->total_len = len;
		/* Single-iov I/O uses the cheaper non-vectored librbd call. */
		if (spdk_likely(iovcnt == 1)) {
			ret = rbd_aio_read(image, offset, iov[0].iov_len, iov[0].iov_base, rbd_io->comp);
		} else {
			ret = rbd_aio_readv(image, iov, iovcnt, offset, rbd_io->comp);
		}
	} else if (bdev_io->type == SPDK_BDEV_IO_TYPE_WRITE) {
		if (spdk_likely(iovcnt == 1)) {
			ret = rbd_aio_write(image, offset, iov[0].iov_len, iov[0].iov_base, rbd_io->comp);
		} else {
			ret = rbd_aio_writev(image, iov, iovcnt, offset, rbd_io->comp);
		}
	} else if (bdev_io->type == SPDK_BDEV_IO_TYPE_UNMAP) {
		ret = rbd_aio_discard(image, offset, len, rbd_io->comp);
	} else if (bdev_io->type == SPDK_BDEV_IO_TYPE_FLUSH) {
		ret = rbd_aio_flush(image, rbd_io->comp);
	} else if (bdev_io->type == SPDK_BDEV_IO_TYPE_WRITE_ZEROES) {
		ret = rbd_aio_write_zeroes(image, offset, len, rbd_io->comp, /* zero_flags */ 0, /* op_flags */ 0);
	}

	if (ret < 0) {
		/* Submission failed: the callback will never run, so drop the
		 * completion here. */
		rbd_aio_release(rbd_io->comp);
		goto err;
	}

	return;

err:
	bdev_rbd_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
}
443 
444 static void
445 bdev_rbd_start_aio(void *ctx)
446 {
447 	struct spdk_bdev_io *bdev_io = ctx;
448 	struct bdev_rbd *disk = (struct bdev_rbd *)bdev_io->bdev->ctxt;
449 
450 	_bdev_rbd_start_aio(disk,
451 			    bdev_io,
452 			    bdev_io->u.bdev.iovs,
453 			    bdev_io->u.bdev.iovcnt,
454 			    bdev_io->u.bdev.offset_blocks * bdev_io->bdev->blocklen,
455 			    bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
456 }
457 
458 static int bdev_rbd_library_init(void);
459 static void bdev_rbd_library_fini(void);
460 
461 static int
462 bdev_rbd_get_ctx_size(void)
463 {
464 	return sizeof(struct bdev_rbd_io);
465 }
466 
/* Module descriptor registered with the SPDK bdev layer. */
static struct spdk_bdev_module rbd_if = {
	.name = "rbd",
	.module_init = bdev_rbd_library_init,
	.module_fini = bdev_rbd_library_fini,
	.get_ctx_size = bdev_rbd_get_ctx_size,

};
SPDK_BDEV_MODULE_REGISTER(rbd, &rbd_if)
475 
476 static int bdev_rbd_reset_timer(void *arg);
477 
478 static void
479 bdev_rbd_check_outstanding_ios(struct spdk_bdev *bdev, uint64_t current_qd,
480 			       void *cb_arg, int rc)
481 {
482 	struct bdev_rbd *disk = cb_arg;
483 	enum spdk_bdev_io_status bio_status;
484 
485 	if (rc == 0 && current_qd > 0) {
486 		disk->reset_timer = SPDK_POLLER_REGISTER(bdev_rbd_reset_timer, disk, 1000);
487 		return;
488 	}
489 
490 	if (rc != 0) {
491 		bio_status = SPDK_BDEV_IO_STATUS_FAILED;
492 	} else {
493 		bio_status = SPDK_BDEV_IO_STATUS_SUCCESS;
494 	}
495 
496 	bdev_rbd_io_complete(disk->reset_bdev_io, bio_status);
497 	disk->reset_bdev_io = NULL;
498 }
499 
500 static int
501 bdev_rbd_reset_timer(void *arg)
502 {
503 	struct bdev_rbd *disk = arg;
504 
505 	spdk_poller_unregister(&disk->reset_timer);
506 
507 	spdk_bdev_get_current_qd(&disk->disk, bdev_rbd_check_outstanding_ios, disk);
508 
509 	return SPDK_POLLER_BUSY;
510 }
511 
512 static void
513 bdev_rbd_reset(void *ctx)
514 {
515 	struct spdk_bdev_io *bdev_io = ctx;
516 	struct bdev_rbd *disk = (struct bdev_rbd *)bdev_io->bdev->ctxt;
517 
518 	/*
519 	 * HACK: Since librbd doesn't provide any way to cancel outstanding aio, just kick off a
520 	 * poller to wait for in-flight I/O to complete.
521 	 */
522 	assert(disk->reset_bdev_io == NULL);
523 	disk->reset_bdev_io = bdev_io;
524 
525 	bdev_rbd_reset_timer(disk);
526 }
527 
528 static void
529 _bdev_rbd_destruct_done(void *io_device)
530 {
531 	struct bdev_rbd *rbd = io_device;
532 
533 	assert(rbd != NULL);
534 	assert(rbd->ch_count == 0);
535 
536 	spdk_bdev_destruct_done(&rbd->disk, 0);
537 	bdev_rbd_free(rbd);
538 }
539 
540 static void
541 bdev_rbd_free_cb(void *io_device)
542 {
543 	struct bdev_rbd *rbd = io_device;
544 
545 	/* The io device has been unregistered.  Send a message back to the
546 	 * original thread that started the destruct operation, so that the
547 	 * bdev unregister callback is invoked on the same thread that started
548 	 * this whole process.
549 	 */
550 	spdk_thread_send_msg(rbd->destruct_td, _bdev_rbd_destruct_done, rbd);
551 }
552 
/* Runs on the bdev's main thread: unregister the io_device; the rest of the
 * teardown continues in bdev_rbd_free_cb(). */
static void
_bdev_rbd_destruct(void *ctx)
{
	struct bdev_rbd *rbd = ctx;

	spdk_io_device_unregister(rbd, bdev_rbd_free_cb);
}
560 
561 static int
562 bdev_rbd_destruct(void *ctx)
563 {
564 	struct bdev_rbd *rbd = ctx;
565 	struct spdk_thread *td;
566 
567 	if (rbd->main_td == NULL) {
568 		td = spdk_get_thread();
569 	} else {
570 		td = rbd->main_td;
571 	}
572 
573 	/* Start the destruct operation on the rbd bdev's
574 	 * main thread.  This guarantees it will only start
575 	 * executing after any messages related to channel
576 	 * deletions have finished completing.  *Always*
577 	 * send a message, even if this function gets called
578 	 * from the main thread, in case there are pending
579 	 * channel delete messages in flight to this thread.
580 	 */
581 	assert(rbd->destruct_td == NULL);
582 	rbd->destruct_td = td;
583 	spdk_thread_send_msg(td, _bdev_rbd_destruct, rbd);
584 
585 	/* Return 1 to indicate the destruct path is asynchronous. */
586 	return 1;
587 }
588 
589 static void
590 bdev_rbd_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io,
591 		    bool success)
592 {
593 	struct bdev_rbd *disk = (struct bdev_rbd *)bdev_io->bdev->ctxt;
594 
595 	if (!success) {
596 		bdev_rbd_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
597 		return;
598 	}
599 
600 	spdk_thread_exec_msg(disk->main_td, bdev_rbd_start_aio, bdev_io);
601 }
602 
603 static void
604 bdev_rbd_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
605 {
606 	struct spdk_thread *submit_td = spdk_io_channel_get_thread(ch);
607 	struct bdev_rbd_io *rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx;
608 	struct bdev_rbd *disk = (struct bdev_rbd *)bdev_io->bdev->ctxt;
609 
610 	rbd_io->submit_td = submit_td;
611 	switch (bdev_io->type) {
612 	case SPDK_BDEV_IO_TYPE_READ:
613 		spdk_bdev_io_get_buf(bdev_io, bdev_rbd_get_buf_cb,
614 				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
615 		break;
616 
617 	case SPDK_BDEV_IO_TYPE_WRITE:
618 	case SPDK_BDEV_IO_TYPE_UNMAP:
619 	case SPDK_BDEV_IO_TYPE_FLUSH:
620 	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
621 		spdk_thread_exec_msg(disk->main_td, bdev_rbd_start_aio, bdev_io);
622 		break;
623 
624 	case SPDK_BDEV_IO_TYPE_RESET:
625 		spdk_thread_exec_msg(disk->main_td, bdev_rbd_reset, bdev_io);
626 		break;
627 
628 	default:
629 		SPDK_ERRLOG("Unsupported IO type =%d\n", bdev_io->type);
630 		bdev_rbd_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
631 		break;
632 	}
633 }
634 
635 static bool
636 bdev_rbd_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type)
637 {
638 	switch (io_type) {
639 	case SPDK_BDEV_IO_TYPE_READ:
640 	case SPDK_BDEV_IO_TYPE_WRITE:
641 	case SPDK_BDEV_IO_TYPE_UNMAP:
642 	case SPDK_BDEV_IO_TYPE_FLUSH:
643 	case SPDK_BDEV_IO_TYPE_RESET:
644 	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
645 		return true;
646 
647 	default:
648 		return false;
649 	}
650 }
651 
652 static void
653 bdev_rbd_free_channel_resources(struct bdev_rbd *disk)
654 {
655 	assert(disk != NULL);
656 	assert(disk->main_td == spdk_get_thread());
657 	assert(disk->ch_count == 0);
658 
659 	spdk_put_io_channel(disk->group_ch);
660 	if (disk->image) {
661 		bdev_rbd_exit(disk->image);
662 	}
663 
664 	disk->main_td = NULL;
665 	disk->group_ch = NULL;
666 }
667 
668 static void *
669 bdev_rbd_handle(void *arg)
670 {
671 	struct bdev_rbd *disk = arg;
672 	void *ret = arg;
673 
674 	if (rbd_open(disk->io_ctx, disk->rbd_name, &disk->image, NULL) < 0) {
675 		SPDK_ERRLOG("Failed to open specified rbd device\n");
676 		ret = NULL;
677 	}
678 
679 	return ret;
680 }
681 
682 static int
683 _bdev_rbd_create_cb(struct bdev_rbd *disk)
684 {
685 	disk->group_ch = spdk_get_io_channel(&rbd_if);
686 	assert(disk->group_ch != NULL);
687 
688 	if (spdk_call_unaffinitized(bdev_rbd_handle, disk) == NULL) {
689 		bdev_rbd_free_channel_resources(disk);
690 		return -1;
691 	}
692 
693 	return 0;
694 }
695 
696 static int
697 bdev_rbd_create_cb(void *io_device, void *ctx_buf)
698 {
699 	struct bdev_rbd_io_channel *ch = ctx_buf;
700 	struct bdev_rbd *disk = io_device;
701 	int rc;
702 
703 	ch->disk = disk;
704 	pthread_mutex_lock(&disk->mutex);
705 	if (disk->ch_count == 0) {
706 		assert(disk->main_td == NULL);
707 		rc = _bdev_rbd_create_cb(disk);
708 		if (rc) {
709 			SPDK_ERRLOG("Cannot create channel for disk=%p\n", disk);
710 			pthread_mutex_unlock(&disk->mutex);
711 			return rc;
712 		}
713 
714 		disk->main_td = spdk_get_thread();
715 	}
716 
717 	disk->ch_count++;
718 	pthread_mutex_unlock(&disk->mutex);
719 
720 	return 0;
721 }
722 
723 static void
724 _bdev_rbd_destroy_cb(void *ctx)
725 {
726 	struct bdev_rbd *disk = ctx;
727 
728 	pthread_mutex_lock(&disk->mutex);
729 	assert(disk->ch_count > 0);
730 	disk->ch_count--;
731 
732 	if (disk->ch_count > 0) {
733 		/* A new channel was created between when message was sent and this function executed */
734 		pthread_mutex_unlock(&disk->mutex);
735 		return;
736 	}
737 
738 	bdev_rbd_free_channel_resources(disk);
739 	pthread_mutex_unlock(&disk->mutex);
740 }
741 
/*
 * io_device channel-destroy callback.  When the final channel goes away the
 * per-disk resources must be released on the disk's main thread; if that is
 * not the current thread, a temporary extra reference keeps the disk alive
 * while a message is forwarded there.
 */
static void
bdev_rbd_destroy_cb(void *io_device, void *ctx_buf)
{
	struct bdev_rbd *disk = io_device;
	struct spdk_thread *thread;

	pthread_mutex_lock(&disk->mutex);
	assert(disk->ch_count > 0);
	disk->ch_count--;
	if (disk->ch_count == 0) {
		assert(disk->main_td != NULL);
		if (disk->main_td != spdk_get_thread()) {
			/* The final channel was destroyed on a different thread
			 * than where the first channel was created. Pass a message
			 * to the main thread to unregister the poller. */
			disk->ch_count++;
			thread = disk->main_td;
			pthread_mutex_unlock(&disk->mutex);
			spdk_thread_send_msg(thread, _bdev_rbd_destroy_cb, disk);
			return;
		}

		bdev_rbd_free_channel_resources(disk);
	}
	pthread_mutex_unlock(&disk->mutex);
}
768 
/* bdev get_io_channel entry point: channels are keyed on the bdev_rbd
 * object registered as an io_device. */
static struct spdk_io_channel *
bdev_rbd_get_io_channel(void *ctx)
{
	struct bdev_rbd *rbd_bdev = ctx;

	return spdk_get_io_channel(rbd_bdev);
}
776 
777 static void
778 bdev_rbd_cluster_dump_entry(const char *cluster_name, struct spdk_json_write_ctx *w)
779 {
780 	struct bdev_rbd_cluster *entry;
781 
782 	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);
783 	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
784 		if (strcmp(cluster_name, entry->name)) {
785 			continue;
786 		}
787 		if (entry->user_id) {
788 			spdk_json_write_named_string(w, "user_id", entry->user_id);
789 		}
790 
791 		if (entry->config_param) {
792 			char **config_entry = entry->config_param;
793 
794 			spdk_json_write_named_object_begin(w, "config_param");
795 			while (*config_entry) {
796 				spdk_json_write_named_string(w, config_entry[0], config_entry[1]);
797 				config_entry += 2;
798 			}
799 			spdk_json_write_object_end(w);
800 		}
801 		if (entry->config_file) {
802 			spdk_json_write_named_string(w, "config_file", entry->config_file);
803 		}
804 		if (entry->key_file) {
805 			spdk_json_write_named_string(w, "key_file", entry->key_file);
806 		}
807 
808 		pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
809 		return;
810 	}
811 
812 	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
813 }
814 
815 static int
816 bdev_rbd_dump_info_json(void *ctx, struct spdk_json_write_ctx *w)
817 {
818 	struct bdev_rbd *rbd_bdev = ctx;
819 
820 	spdk_json_write_named_object_begin(w, "rbd");
821 
822 	spdk_json_write_named_string(w, "pool_name", rbd_bdev->pool_name);
823 
824 	spdk_json_write_named_string(w, "rbd_name", rbd_bdev->rbd_name);
825 
826 	if (rbd_bdev->cluster_name) {
827 		bdev_rbd_cluster_dump_entry(rbd_bdev->cluster_name, w);
828 		goto end;
829 	}
830 
831 	if (rbd_bdev->user_id) {
832 		spdk_json_write_named_string(w, "user_id", rbd_bdev->user_id);
833 	}
834 
835 	if (rbd_bdev->config) {
836 		char **entry = rbd_bdev->config;
837 
838 		spdk_json_write_named_object_begin(w, "config");
839 		while (*entry) {
840 			spdk_json_write_named_string(w, entry[0], entry[1]);
841 			entry += 2;
842 		}
843 		spdk_json_write_object_end(w);
844 	}
845 
846 end:
847 	spdk_json_write_object_end(w);
848 
849 	return 0;
850 }
851 
852 static void
853 bdev_rbd_write_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
854 {
855 	struct bdev_rbd *rbd = bdev->ctxt;
856 	char uuid_str[SPDK_UUID_STRING_LEN];
857 
858 	spdk_json_write_object_begin(w);
859 
860 	spdk_json_write_named_string(w, "method", "bdev_rbd_create");
861 
862 	spdk_json_write_named_object_begin(w, "params");
863 	spdk_json_write_named_string(w, "name", bdev->name);
864 	spdk_json_write_named_string(w, "pool_name", rbd->pool_name);
865 	spdk_json_write_named_string(w, "rbd_name", rbd->rbd_name);
866 	spdk_json_write_named_uint32(w, "block_size", bdev->blocklen);
867 	if (rbd->user_id) {
868 		spdk_json_write_named_string(w, "user_id", rbd->user_id);
869 	}
870 
871 	if (rbd->config) {
872 		char **entry = rbd->config;
873 
874 		spdk_json_write_named_object_begin(w, "config");
875 		while (*entry) {
876 			spdk_json_write_named_string(w, entry[0], entry[1]);
877 			entry += 2;
878 		}
879 		spdk_json_write_object_end(w);
880 	}
881 
882 	spdk_uuid_fmt_lower(uuid_str, sizeof(uuid_str), &bdev->uuid);
883 	spdk_json_write_named_string(w, "uuid", uuid_str);
884 
885 	spdk_json_write_object_end(w);
886 
887 	spdk_json_write_object_end(w);
888 }
889 
890 static void
891 dump_single_cluster_entry(struct bdev_rbd_cluster *entry, struct spdk_json_write_ctx *w)
892 {
893 	assert(entry != NULL);
894 
895 	spdk_json_write_object_begin(w);
896 	spdk_json_write_named_string(w, "cluster_name", entry->name);
897 
898 	if (entry->user_id) {
899 		spdk_json_write_named_string(w, "user_id", entry->user_id);
900 	}
901 
902 	if (entry->config_param) {
903 		char **config_entry = entry->config_param;
904 
905 		spdk_json_write_named_object_begin(w, "config_param");
906 		while (*config_entry) {
907 			spdk_json_write_named_string(w, config_entry[0], config_entry[1]);
908 			config_entry += 2;
909 		}
910 		spdk_json_write_object_end(w);
911 	}
912 	if (entry->config_file) {
913 		spdk_json_write_named_string(w, "config_file", entry->config_file);
914 	}
915 	if (entry->key_file) {
916 		spdk_json_write_named_string(w, "key_file", entry->key_file);
917 	}
918 
919 	spdk_json_write_object_end(w);
920 }
921 
922 int
923 bdev_rbd_get_clusters_info(struct spdk_jsonrpc_request *request, const char *name)
924 {
925 	struct bdev_rbd_cluster *entry;
926 	struct spdk_json_write_ctx *w;
927 
928 	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);
929 
930 	if (STAILQ_EMPTY(&g_map_bdev_rbd_cluster)) {
931 		pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
932 		return -ENOENT;
933 	}
934 
935 	/* If cluster name is provided */
936 	if (name) {
937 		STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
938 			if (strcmp(name, entry->name) == 0) {
939 				w = spdk_jsonrpc_begin_result(request);
940 				dump_single_cluster_entry(entry, w);
941 				spdk_jsonrpc_end_result(request, w);
942 
943 				pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
944 				return 0;
945 			}
946 		}
947 
948 		pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
949 		return -ENOENT;
950 	}
951 
952 	w = spdk_jsonrpc_begin_result(request);
953 	spdk_json_write_array_begin(w);
954 	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
955 		dump_single_cluster_entry(entry, w);
956 	}
957 	spdk_json_write_array_end(w);
958 	spdk_jsonrpc_end_result(request, w);
959 	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
960 
961 	return 0;
962 }
963 
/* Function table exported to the bdev layer for rbd bdevs. */
static const struct spdk_bdev_fn_table rbd_fn_table = {
	.destruct		= bdev_rbd_destruct,
	.submit_request		= bdev_rbd_submit_request,
	.io_type_supported	= bdev_rbd_io_type_supported,
	.get_io_channel		= bdev_rbd_get_io_channel,
	.dump_info_json		= bdev_rbd_dump_info_json,
	.write_config_json	= bdev_rbd_write_config_json,
};
972 
/*
 * Create, configure, connect and register a shared Ceph cluster under
 * 'name'.  Duplicate names are rejected.  Configuration may come from
 * key/value pairs, a conf file, a keyring file, or any combination.
 * Called with no locks held; takes g_map_bdev_rbd_cluster_mutex for the
 * whole operation.  Returns 0 on success, -1 on any failure (with all
 * partial state cleaned up via the err_handle path).
 */
static int
rbd_register_cluster(const char *name, const char *user_id, const char *const *config_param,
		     const char *config_file, const char *key_file)
{
	struct bdev_rbd_cluster *entry;
	int rc;

	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);
	/* Reject duplicate registrations up front. */
	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
		if (strcmp(name, entry->name) == 0) {
			SPDK_ERRLOG("Cluster name=%s already exists\n", name);
			pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
			return -1;
		}
	}

	entry = calloc(1, sizeof(*entry));
	if (!entry) {
		SPDK_ERRLOG("Cannot allocate an entry for name=%s\n", name);
		pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
		return -1;
	}

	entry->name = strdup(name);
	if (entry->name == NULL) {
		SPDK_ERRLOG("Failed to save the name =%s on entry =%p\n", name, entry);
		goto err_handle;
	}

	if (user_id) {
		entry->user_id = strdup(user_id);
		if (entry->user_id == NULL) {
			SPDK_ERRLOG("Failed to save the str =%s on entry =%p\n", user_id, entry);
			goto err_handle;
		}
	}

	/* Support specify config_param or config_file separately, or both of them. */
	if (config_param) {
		entry->config_param = bdev_rbd_dup_config(config_param);
		if (entry->config_param == NULL) {
			SPDK_ERRLOG("Failed to save the config_param=%p on entry = %p\n", config_param, entry);
			goto err_handle;
		}
	}

	if (config_file) {
		entry->config_file = strdup(config_file);
		if (entry->config_file == NULL) {
			SPDK_ERRLOG("Failed to save the config_file=%s on entry = %p\n", config_file, entry);
			goto err_handle;
		}
	}

	if (key_file) {
		entry->key_file = strdup(key_file);
		if (entry->key_file == NULL) {
			SPDK_ERRLOG("Failed to save the key_file=%s on entry = %p\n", key_file, entry);
			goto err_handle;
		}
	}

	rc = rados_create(&entry->cluster, user_id);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to create rados_t struct\n");
		goto err_handle;
	}

	/* Try default location when entry->config_file is NULL, but ignore failure when it is NULL */
	rc = rados_conf_read_file(entry->cluster, entry->config_file);
	if (entry->config_file && rc < 0) {
		SPDK_ERRLOG("Failed to read conf file %s\n", entry->config_file);
		rados_shutdown(entry->cluster);
		goto err_handle;
	}

	/* Explicit key/value parameters override anything from the conf file. */
	if (config_param) {
		const char *const *config_entry = config_param;
		while (*config_entry) {
			rc = rados_conf_set(entry->cluster, config_entry[0], config_entry[1]);
			if (rc < 0) {
				SPDK_ERRLOG("Failed to set %s = %s\n", config_entry[0], config_entry[1]);
				rados_shutdown(entry->cluster);
				goto err_handle;
			}
			config_entry += 2;
		}
	}

	if (key_file) {
		rc = rados_conf_set(entry->cluster, "keyring", key_file);
		if (rc < 0) {
			SPDK_ERRLOG("Failed to set keyring = %s\n", key_file);
			rados_shutdown(entry->cluster);
			goto err_handle;
		}
	}

	rc = rados_connect(entry->cluster);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to connect to rbd_pool on cluster=%p\n", entry->cluster);
		rados_shutdown(entry->cluster);
		goto err_handle;
	}

	STAILQ_INSERT_TAIL(&g_map_bdev_rbd_cluster, entry, link);
	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);

	return 0;

err_handle:
	bdev_rbd_cluster_free(entry);
	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
	return -1;
}
1088 
1089 int
1090 bdev_rbd_unregister_cluster(const char *name)
1091 {
1092 	struct bdev_rbd_cluster *entry;
1093 	int rc = 0;
1094 
1095 	if (name == NULL) {
1096 		return -1;
1097 	}
1098 
1099 	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);
1100 	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
1101 		if (strcmp(name, entry->name) == 0) {
1102 			if (entry->ref == 0) {
1103 				STAILQ_REMOVE(&g_map_bdev_rbd_cluster, entry, bdev_rbd_cluster, link);
1104 				rados_shutdown(entry->cluster);
1105 				bdev_rbd_cluster_free(entry);
1106 			} else {
1107 				SPDK_ERRLOG("Cluster with name=%p is still used and we cannot delete it\n",
1108 					    entry->name);
1109 				rc = -1;
1110 			}
1111 
1112 			pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
1113 			return rc;
1114 		}
1115 	}
1116 
1117 	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
1118 
1119 	SPDK_ERRLOG("Could not find the cluster name =%p\n", name);
1120 
1121 	return -1;
1122 }
1123 
1124 static void *
1125 _bdev_rbd_register_cluster(void *arg)
1126 {
1127 	struct cluster_register_info *info = arg;
1128 	void *ret = arg;
1129 	int rc;
1130 
1131 	rc = rbd_register_cluster((const char *)info->name, (const char *)info->user_id,
1132 				  (const char *const *)info->config_param, (const char *)info->config_file,
1133 				  (const char *)info->key_file);
1134 	if (rc) {
1135 		ret = NULL;
1136 	}
1137 
1138 	return ret;
1139 }
1140 
1141 int
1142 bdev_rbd_register_cluster(struct cluster_register_info *info)
1143 {
1144 	assert(info != NULL);
1145 
1146 	/* Rados cluster info need to be created in non SPDK-thread to avoid CPU
1147 	 * resource contention */
1148 	if (spdk_call_unaffinitized(_bdev_rbd_register_cluster, info) == NULL) {
1149 		return -1;
1150 	}
1151 
1152 	return 0;
1153 }
1154 
1155 int
1156 bdev_rbd_create(struct spdk_bdev **bdev, const char *name, const char *user_id,
1157 		const char *pool_name,
1158 		const char *const *config,
1159 		const char *rbd_name,
1160 		uint32_t block_size,
1161 		const char *cluster_name,
1162 		const struct spdk_uuid *uuid)
1163 {
1164 	struct bdev_rbd *rbd;
1165 	int ret;
1166 
1167 	if ((pool_name == NULL) || (rbd_name == NULL)) {
1168 		return -EINVAL;
1169 	}
1170 
1171 	rbd = calloc(1, sizeof(struct bdev_rbd));
1172 	if (rbd == NULL) {
1173 		SPDK_ERRLOG("Failed to allocate bdev_rbd struct\n");
1174 		return -ENOMEM;
1175 	}
1176 
1177 	ret = pthread_mutex_init(&rbd->mutex, NULL);
1178 	if (ret) {
1179 		SPDK_ERRLOG("Cannot init mutex on rbd=%p\n", rbd->disk.name);
1180 		free(rbd);
1181 		return ret;
1182 	}
1183 
1184 	rbd->rbd_name = strdup(rbd_name);
1185 	if (!rbd->rbd_name) {
1186 		bdev_rbd_free(rbd);
1187 		return -ENOMEM;
1188 	}
1189 
1190 	if (user_id) {
1191 		rbd->user_id = strdup(user_id);
1192 		if (!rbd->user_id) {
1193 			bdev_rbd_free(rbd);
1194 			return -ENOMEM;
1195 		}
1196 	}
1197 
1198 	if (cluster_name) {
1199 		rbd->cluster_name = strdup(cluster_name);
1200 		if (!rbd->cluster_name) {
1201 			bdev_rbd_free(rbd);
1202 			return -ENOMEM;
1203 		}
1204 	}
1205 	rbd->pool_name = strdup(pool_name);
1206 	if (!rbd->pool_name) {
1207 		bdev_rbd_free(rbd);
1208 		return -ENOMEM;
1209 	}
1210 
1211 	if (config && !(rbd->config = bdev_rbd_dup_config(config))) {
1212 		bdev_rbd_free(rbd);
1213 		return -ENOMEM;
1214 	}
1215 
1216 	ret = bdev_rbd_init(rbd);
1217 	if (ret < 0) {
1218 		bdev_rbd_free(rbd);
1219 		SPDK_ERRLOG("Failed to init rbd device\n");
1220 		return ret;
1221 	}
1222 
1223 	if (uuid) {
1224 		rbd->disk.uuid = *uuid;
1225 	} else {
1226 		spdk_uuid_generate(&rbd->disk.uuid);
1227 	}
1228 
1229 	if (name) {
1230 		rbd->disk.name = strdup(name);
1231 	} else {
1232 		rbd->disk.name = spdk_sprintf_alloc("Ceph%d", bdev_rbd_count);
1233 	}
1234 	if (!rbd->disk.name) {
1235 		bdev_rbd_free(rbd);
1236 		return -ENOMEM;
1237 	}
1238 	rbd->disk.product_name = "Ceph Rbd Disk";
1239 	bdev_rbd_count++;
1240 
1241 	rbd->disk.write_cache = 0;
1242 	rbd->disk.blocklen = block_size;
1243 	rbd->disk.blockcnt = rbd->info.size / rbd->disk.blocklen;
1244 	rbd->disk.ctxt = rbd;
1245 	rbd->disk.fn_table = &rbd_fn_table;
1246 	rbd->disk.module = &rbd_if;
1247 
1248 	SPDK_NOTICELOG("Add %s rbd disk to lun\n", rbd->disk.name);
1249 
1250 	spdk_io_device_register(rbd, bdev_rbd_create_cb,
1251 				bdev_rbd_destroy_cb,
1252 				sizeof(struct bdev_rbd_io_channel),
1253 				rbd_name);
1254 	ret = spdk_bdev_register(&rbd->disk);
1255 	if (ret) {
1256 		spdk_io_device_unregister(rbd, NULL);
1257 		bdev_rbd_free(rbd);
1258 		return ret;
1259 	}
1260 
1261 	*bdev = &(rbd->disk);
1262 
1263 	return ret;
1264 }
1265 
1266 void
1267 bdev_rbd_delete(const char *name, spdk_delete_rbd_complete cb_fn, void *cb_arg)
1268 {
1269 	int rc;
1270 
1271 	rc = spdk_bdev_unregister_by_name(name, &rbd_if, cb_fn, cb_arg);
1272 	if (rc != 0) {
1273 		cb_fn(cb_arg, rc);
1274 	}
1275 }
1276 
/* No-op bdev event callback used when opening a bdev descriptor purely to
 * inspect/resize it (see bdev_rbd_resize); events are intentionally ignored. */
static void
dummy_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev, void *ctx)
{
}
1281 
/*
 * Grow the named RBD bdev to new_size_in_mb megabytes.
 *
 * Only growing is supported: if the requested size is smaller than the
 * current size, -EINVAL is returned. On success the underlying RBD image
 * is resized and the bdev layer is notified of the new block count.
 *
 * Returns 0 on success or a negative errno on failure.
 */
int
bdev_rbd_resize(const char *name, const uint64_t new_size_in_mb)
{
	struct spdk_bdev_desc *desc;
	struct spdk_bdev *bdev;
	struct spdk_io_channel *ch;
	struct bdev_rbd_io_channel *rbd_io_ch;
	int rc = 0;
	uint64_t new_size_in_byte;
	uint64_t current_size_in_mb;

	/* Open read-only: the descriptor is only used to locate the bdev and
	 * keep it alive for the duration of the resize. */
	rc = spdk_bdev_open_ext(name, false, dummy_bdev_event_cb, NULL, &desc);
	if (rc != 0) {
		return rc;
	}

	bdev = spdk_bdev_desc_get_bdev(desc);

	/* Make sure this bdev actually belongs to the RBD module before
	 * casting its channel context below. */
	if (bdev->module != &rbd_if) {
		rc = -EINVAL;
		goto exit;
	}

	current_size_in_mb = bdev->blocklen * bdev->blockcnt / (1024 * 1024);
	if (current_size_in_mb > new_size_in_mb) {
		SPDK_ERRLOG("The new bdev size must be larger than current bdev size.\n");
		rc = -EINVAL;
		goto exit;
	}

	/* Take an io channel to reach the bdev_rbd context (and its open
	 * rbd image handle); released right after the resize call. */
	ch = bdev_rbd_get_io_channel(bdev);
	rbd_io_ch = spdk_io_channel_get_ctx(ch);
	new_size_in_byte = new_size_in_mb * 1024 * 1024;

	rc = rbd_resize(rbd_io_ch->disk->image, new_size_in_byte);
	spdk_put_io_channel(ch);
	if (rc != 0) {
		SPDK_ERRLOG("failed to resize the ceph bdev.\n");
		goto exit;
	}

	/* Propagate the new size to the SPDK bdev layer so upper layers see
	 * the enlarged block count. */
	rc = spdk_bdev_notify_blockcnt_change(bdev, new_size_in_byte / bdev->blocklen);
	if (rc != 0) {
		SPDK_ERRLOG("failed to notify block cnt change.\n");
	}

exit:
	spdk_bdev_close(desc);
	return rc;
}
1332 
/* Per-thread poll-group channel create callback; no per-channel state is
 * needed, so this always succeeds. */
static int
bdev_rbd_group_create_cb(void *io_device, void *ctx_buf)
{
	return 0;
}
1338 
/* Per-thread poll-group channel destroy callback; nothing to clean up. */
static void
bdev_rbd_group_destroy_cb(void *io_device, void *ctx_buf)
{
}
1343 
/* Module init: register the module-wide io_device used for the per-thread
 * poll groups (zero-sized channel context). Always succeeds. */
static int
bdev_rbd_library_init(void)
{
	spdk_io_device_register(&rbd_if, bdev_rbd_group_create_cb, bdev_rbd_group_destroy_cb,
				0, "bdev_rbd_poll_groups");
	return 0;
}
1351 
/* Module teardown: unregister the poll-group io_device created in
 * bdev_rbd_library_init(). */
static void
bdev_rbd_library_fini(void)
{
	spdk_io_device_unregister(&rbd_if, NULL);
}
1357 
1358 SPDK_LOG_REGISTER_COMPONENT(bdev_rbd)
1359