xref: /spdk/module/bdev/rbd/bdev_rbd.c (revision a9bcb7f2613c28948b141fdc3fd9cc7b1d3f30a9)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2017 Intel Corporation.
3  *   All rights reserved.
4  */
5 
6 #include "spdk/stdinc.h"
7 
8 #include "bdev_rbd.h"
9 
10 #include <rbd/librbd.h>
11 #include <rados/librados.h>
12 
13 #include "spdk/env.h"
14 #include "spdk/bdev.h"
15 #include "spdk/thread.h"
16 #include "spdk/json.h"
17 #include "spdk/string.h"
18 #include "spdk/util.h"
19 #include "spdk/likely.h"
20 
21 #include "spdk/bdev_module.h"
22 #include "spdk/log.h"
23 
/* Count of RBD bdevs created so far; used to build the default "Ceph%d" bdev name. */
static int bdev_rbd_count = 0;
25 
/* Per-bdev state for one exported RBD image. */
struct bdev_rbd {
	struct spdk_bdev disk;		/* Embedded generic bdev; must stay first for ctxt casts. */
	char *rbd_name;			/* Name of the RBD image inside the pool. */
	char *user_id;			/* Ceph user id; NULL means the librados default. */
	char *pool_name;		/* Rados pool containing the image. */
	char **config;			/* NULL-terminated flat key/value pairs for rados_conf_set(). */

	rados_t cluster;		/* Privately-owned cluster handle (unused when cluster_name is set). */
	rados_t *cluster_p;		/* Points at either &cluster or a shared registry entry's handle. */
	char *cluster_name;		/* Name of a shared, pre-registered cluster; NULL for a private one. */

	rados_ioctx_t io_ctx;		/* I/O context bound to pool_name. */
	rbd_image_t image;		/* Open image handle while channels exist. */

	rbd_image_info_t info;		/* Image geometry captured at init time via rbd_stat(). */
	pthread_mutex_t mutex;		/* Guards ch_count/main_td channel bookkeeping. */
	struct spdk_thread *main_td;	/* Thread that created the first channel; owns image/group_ch. */
	struct spdk_thread *destruct_td; /* Thread that initiated destruct; receives the completion msg. */
	uint32_t ch_count;		/* Number of live I/O channels. */
	struct spdk_io_channel *group_ch; /* Module-level channel held while ch_count > 0. */

	TAILQ_ENTRY(bdev_rbd) tailq;	/* Linkage; list head not visible in this file chunk. */
	struct spdk_poller *reset_timer;   /* Poller that waits for in-flight I/O during reset. */
	struct spdk_bdev_io *reset_bdev_io; /* Parked reset request completed once QD reaches 0. */
};
51 
/* Per-channel context: only back-links to the owning disk. */
struct bdev_rbd_io_channel {
	struct bdev_rbd *disk;
};
55 
/* Per-I/O driver context carved out of spdk_bdev_io->driver_ctx. */
struct bdev_rbd_io {
	struct			spdk_thread *submit_td;	/* Thread that submitted; completion bounces back here. */
	enum			spdk_bdev_io_status status; /* Final status handed to spdk_bdev_io_complete(). */
	rbd_completion_t	comp;			/* librbd aio completion for this request. */
	size_t			total_len;		/* Expected byte count for READ result validation. */
};
62 
/* One named, shared rados cluster in the global registry. */
struct bdev_rbd_cluster {
	char *name;		/* Registry key. */
	char *user_id;		/* Optional Ceph user id. */
	char **config_param;	/* NULL-terminated flat key/value pairs. */
	char *config_file;	/* Optional ceph.conf path. */
	char *key_file;		/* Optional keyring path (set as "keyring" conf option). */
	rados_t cluster;	/* Connected cluster handle shared by referencing bdevs. */
	uint32_t ref;		/* Number of bdevs currently using this cluster. */
	STAILQ_ENTRY(bdev_rbd_cluster) link;
};
73 
/* Registry of named shared clusters created via bdev_rbd_register_cluster(). */
static STAILQ_HEAD(, bdev_rbd_cluster) g_map_bdev_rbd_cluster = STAILQ_HEAD_INITIALIZER(
			g_map_bdev_rbd_cluster);
/* Serializes all access to the registry list and the ref counts inside it. */
static pthread_mutex_t g_map_bdev_rbd_cluster_mutex = PTHREAD_MUTEX_INITIALIZER;
77 
78 static void
79 bdev_rbd_cluster_free(struct bdev_rbd_cluster *entry)
80 {
81 	assert(entry != NULL);
82 
83 	bdev_rbd_free_config(entry->config_param);
84 	free(entry->config_file);
85 	free(entry->key_file);
86 	free(entry->user_id);
87 	free(entry->name);
88 	free(entry);
89 }
90 
91 static void
92 bdev_rbd_put_cluster(rados_t **cluster)
93 {
94 	struct bdev_rbd_cluster *entry;
95 
96 	assert(cluster != NULL);
97 
98 	/* No need go through the map if *cluster equals to NULL */
99 	if (*cluster == NULL) {
100 		return;
101 	}
102 
103 	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);
104 	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
105 		if (*cluster != &entry->cluster) {
106 			continue;
107 		}
108 
109 		assert(entry->ref > 0);
110 		entry->ref--;
111 		*cluster = NULL;
112 		pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
113 		return;
114 	}
115 
116 	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
117 	SPDK_ERRLOG("Cannot find the entry for cluster=%p\n", cluster);
118 }
119 
/*
 * Tear down a bdev_rbd and everything it owns.  Safe on NULL and on a
 * partially-constructed object: unset fields are skipped.
 */
static void
bdev_rbd_free(struct bdev_rbd *rbd)
{
	if (!rbd) {
		return;
	}

	free(rbd->disk.name);
	free(rbd->rbd_name);
	free(rbd->user_id);
	free(rbd->pool_name);
	bdev_rbd_free_config(rbd->config);

	if (rbd->io_ctx) {
		rados_ioctx_destroy(rbd->io_ctx);
	}

	if (rbd->cluster_name) {
		/* Shared cluster: drop our registry reference instead of shutting it down. */
		bdev_rbd_put_cluster(&rbd->cluster_p);
		free(rbd->cluster_name);
	} else if (rbd->cluster) {
		/* Private cluster: this bdev owns the rados handle outright. */
		rados_shutdown(rbd->cluster);
	}

	pthread_mutex_destroy(&rbd->mutex);
	free(rbd);
}
147 
148 void
149 bdev_rbd_free_config(char **config)
150 {
151 	char **entry;
152 
153 	if (config) {
154 		for (entry = config; *entry; entry++) {
155 			free(*entry);
156 		}
157 		free(config);
158 	}
159 }
160 
/*
 * Deep-copy a NULL-terminated string array.  Returns a heap-allocated copy
 * the caller must release with bdev_rbd_free_config(), or NULL when the
 * input is NULL or allocation fails.
 */
char **
bdev_rbd_dup_config(const char *const *config)
{
	size_t n;
	size_t i;
	char **dup;

	if (config == NULL) {
		return NULL;
	}

	for (n = 0; config[n] != NULL; n++) {}

	/* One extra slot keeps the copy NULL-terminated as well. */
	dup = calloc(n + 1, sizeof(*dup));
	if (dup == NULL) {
		return NULL;
	}

	for (i = 0; i < n; i++) {
		dup[i] = strdup(config[i]);
		if (dup[i] == NULL) {
			bdev_rbd_free_config(dup);
			return NULL;
		}
	}

	return dup;
}
183 
184 static int
185 bdev_rados_cluster_init(const char *user_id, const char *const *config,
186 			rados_t *cluster)
187 {
188 	int ret;
189 
190 	ret = rados_create(cluster, user_id);
191 	if (ret < 0) {
192 		SPDK_ERRLOG("Failed to create rados_t struct\n");
193 		return -1;
194 	}
195 
196 	if (config) {
197 		const char *const *entry = config;
198 		while (*entry) {
199 			ret = rados_conf_set(*cluster, entry[0], entry[1]);
200 			if (ret < 0) {
201 				SPDK_ERRLOG("Failed to set %s = %s\n", entry[0], entry[1]);
202 				rados_shutdown(*cluster);
203 				*cluster = NULL;
204 				return -1;
205 			}
206 			entry += 2;
207 		}
208 	} else {
209 		ret = rados_conf_read_file(*cluster, NULL);
210 		if (ret < 0) {
211 			SPDK_ERRLOG("Failed to read conf file\n");
212 			rados_shutdown(*cluster);
213 			*cluster = NULL;
214 			return -1;
215 		}
216 	}
217 
218 	ret = rados_connect(*cluster);
219 	if (ret < 0) {
220 		SPDK_ERRLOG("Failed to connect to rbd_pool\n");
221 		rados_shutdown(*cluster);
222 		*cluster = NULL;
223 		return -1;
224 	}
225 
226 	return 0;
227 }
228 
229 static int
230 bdev_rbd_get_cluster(const char *cluster_name, rados_t **cluster)
231 {
232 	struct bdev_rbd_cluster *entry;
233 
234 	if (cluster == NULL) {
235 		SPDK_ERRLOG("cluster should not be NULL\n");
236 		return -1;
237 	}
238 
239 	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);
240 	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
241 		if (strcmp(cluster_name, entry->name) == 0) {
242 			entry->ref++;
243 			*cluster = &entry->cluster;
244 			pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
245 			return 0;
246 		}
247 	}
248 
249 	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
250 	return -1;
251 }
252 
253 static int
254 bdev_rbd_shared_cluster_init(const char *cluster_name, rados_t **cluster)
255 {
256 	int ret;
257 
258 	ret = bdev_rbd_get_cluster(cluster_name, cluster);
259 	if (ret < 0) {
260 		SPDK_ERRLOG("Failed to create rados_t struct\n");
261 		return -1;
262 	}
263 
264 	return ret;
265 }
266 
267 static void *
268 bdev_rbd_cluster_handle(void *arg)
269 {
270 	void *ret = arg;
271 	struct bdev_rbd *rbd = arg;
272 	int rc;
273 
274 	rc = bdev_rados_cluster_init(rbd->user_id, (const char *const *)rbd->config,
275 				     &rbd->cluster);
276 	if (rc < 0) {
277 		SPDK_ERRLOG("Failed to create rados cluster for user_id=%s and rbd_pool=%s\n",
278 			    rbd->user_id ? rbd->user_id : "admin (the default)", rbd->pool_name);
279 		ret = NULL;
280 	}
281 
282 	return ret;
283 }
284 
285 static void *
286 bdev_rbd_init_context(void *arg)
287 {
288 	struct bdev_rbd *rbd = arg;
289 	int rc;
290 
291 	if (rados_ioctx_create(*(rbd->cluster_p), rbd->pool_name, &rbd->io_ctx) < 0) {
292 		SPDK_ERRLOG("Failed to create ioctx on rbd=%p\n", rbd);
293 		return NULL;
294 	}
295 
296 	rc = rbd_open(rbd->io_ctx, rbd->rbd_name, &rbd->image, NULL);
297 	if (rc < 0) {
298 		SPDK_ERRLOG("Failed to open specified rbd device\n");
299 		return NULL;
300 	}
301 
302 	rc = rbd_stat(rbd->image, &rbd->info, sizeof(rbd->info));
303 	rbd_close(rbd->image);
304 	if (rc < 0) {
305 		SPDK_ERRLOG("Failed to stat specified rbd device\n");
306 		return NULL;
307 	}
308 
309 	return arg;
310 }
311 
/*
 * Connect the bdev to a rados cluster (private or shared) and stat the
 * image.  Returns 0 on success, -1 on failure; on failure the caller is
 * expected to release the bdev with bdev_rbd_free().
 */
static int
bdev_rbd_init(struct bdev_rbd *rbd)
{
	int ret = 0;

	if (!rbd->cluster_name) {
		rbd->cluster_p = &rbd->cluster;
		/* Cluster should be created in non-SPDK thread to avoid conflict between
		 * Rados and SPDK thread */
		if (spdk_call_unaffinitized(bdev_rbd_cluster_handle, rbd) == NULL) {
			SPDK_ERRLOG("Cannot create the rados object on rbd=%p\n", rbd);
			return -1;
		}
	} else {
		ret = bdev_rbd_shared_cluster_init(rbd->cluster_name, &rbd->cluster_p);
		if (ret < 0) {
			SPDK_ERRLOG("Failed to create rados object for rbd =%p on cluster_name=%s\n",
				    rbd, rbd->cluster_name);
			return -1;
		}
	}

	/* ioctx creation and image stat also run unaffinitized, like the cluster setup. */
	if (spdk_call_unaffinitized(bdev_rbd_init_context, rbd) == NULL) {
		SPDK_ERRLOG("Cannot init rbd context for rbd=%p\n", rbd);
		return -1;
	}

	return ret;
}
341 
/* Flush any outstanding writes, then close the image handle. */
static void
bdev_rbd_exit(rbd_image_t image)
{
	rbd_flush(image);
	rbd_close(image);
}
348 
349 static void
350 _bdev_rbd_io_complete(void *_rbd_io)
351 {
352 	struct bdev_rbd_io *rbd_io = _rbd_io;
353 
354 	spdk_bdev_io_complete(spdk_bdev_io_from_ctx(rbd_io), rbd_io->status);
355 }
356 
357 static void
358 bdev_rbd_io_complete(struct spdk_bdev_io *bdev_io, enum spdk_bdev_io_status status)
359 {
360 	struct bdev_rbd_io *rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx;
361 	struct spdk_thread *current_thread = spdk_get_thread();
362 
363 	rbd_io->status = status;
364 	assert(rbd_io->submit_td != NULL);
365 	if (rbd_io->submit_td != current_thread) {
366 		spdk_thread_send_msg(rbd_io->submit_td, _bdev_rbd_io_complete, rbd_io);
367 	} else {
368 		_bdev_rbd_io_complete(rbd_io);
369 	}
370 }
371 
372 static void
373 bdev_rbd_finish_aiocb(rbd_completion_t cb, void *arg)
374 {
375 	int io_status;
376 	struct spdk_bdev_io *bdev_io;
377 	struct bdev_rbd_io *rbd_io;
378 	enum spdk_bdev_io_status bio_status;
379 
380 	bdev_io = rbd_aio_get_arg(cb);
381 	rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx;
382 	io_status = rbd_aio_get_return_value(cb);
383 	bio_status = SPDK_BDEV_IO_STATUS_SUCCESS;
384 
385 	if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) {
386 		if ((int)rbd_io->total_len != io_status) {
387 			bio_status = SPDK_BDEV_IO_STATUS_FAILED;
388 		}
389 	} else {
390 		/* For others, 0 means success */
391 		if (io_status != 0) {
392 			bio_status = SPDK_BDEV_IO_STATUS_FAILED;
393 		}
394 	}
395 
396 	rbd_aio_release(cb);
397 
398 	bdev_rbd_io_complete(bdev_io, bio_status);
399 }
400 
401 static void
402 _bdev_rbd_start_aio(struct bdev_rbd *disk, struct spdk_bdev_io *bdev_io,
403 		    struct iovec *iov, int iovcnt, uint64_t offset, size_t len)
404 {
405 	int ret;
406 	struct bdev_rbd_io *rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx;
407 	rbd_image_t image = disk->image;
408 
409 	ret = rbd_aio_create_completion(bdev_io, bdev_rbd_finish_aiocb,
410 					&rbd_io->comp);
411 	if (ret < 0) {
412 		goto err;
413 	}
414 
415 	if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) {
416 		rbd_io->total_len = len;
417 		if (spdk_likely(iovcnt == 1)) {
418 			ret = rbd_aio_read(image, offset, iov[0].iov_len, iov[0].iov_base, rbd_io->comp);
419 		} else {
420 			ret = rbd_aio_readv(image, iov, iovcnt, offset, rbd_io->comp);
421 		}
422 	} else if (bdev_io->type == SPDK_BDEV_IO_TYPE_WRITE) {
423 		if (spdk_likely(iovcnt == 1)) {
424 			ret = rbd_aio_write(image, offset, iov[0].iov_len, iov[0].iov_base, rbd_io->comp);
425 		} else {
426 			ret = rbd_aio_writev(image, iov, iovcnt, offset, rbd_io->comp);
427 		}
428 	} else if (bdev_io->type == SPDK_BDEV_IO_TYPE_UNMAP) {
429 		ret = rbd_aio_discard(image, offset, len, rbd_io->comp);
430 	} else if (bdev_io->type == SPDK_BDEV_IO_TYPE_FLUSH) {
431 		ret = rbd_aio_flush(image, rbd_io->comp);
432 	} else if (bdev_io->type == SPDK_BDEV_IO_TYPE_WRITE_ZEROES) {
433 		ret = rbd_aio_write_zeroes(image, offset, len, rbd_io->comp, /* zero_flags */ 0, /* op_flags */ 0);
434 	}
435 
436 	if (ret < 0) {
437 		rbd_aio_release(rbd_io->comp);
438 		goto err;
439 	}
440 
441 	return;
442 
443 err:
444 	bdev_rbd_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
445 }
446 
447 static void
448 bdev_rbd_start_aio(void *ctx)
449 {
450 	struct spdk_bdev_io *bdev_io = ctx;
451 	struct bdev_rbd *disk = (struct bdev_rbd *)bdev_io->bdev->ctxt;
452 
453 	_bdev_rbd_start_aio(disk,
454 			    bdev_io,
455 			    bdev_io->u.bdev.iovs,
456 			    bdev_io->u.bdev.iovcnt,
457 			    bdev_io->u.bdev.offset_blocks * bdev_io->bdev->blocklen,
458 			    bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
459 }
460 
461 static int bdev_rbd_library_init(void);
462 static void bdev_rbd_library_fini(void);
463 
464 static int
465 bdev_rbd_get_ctx_size(void)
466 {
467 	return sizeof(struct bdev_rbd_io);
468 }
469 
/* Module descriptor registered with the bdev layer at load time. */
static struct spdk_bdev_module rbd_if = {
	.name = "rbd",
	.module_init = bdev_rbd_library_init,
	.module_fini = bdev_rbd_library_fini,
	.get_ctx_size = bdev_rbd_get_ctx_size,

};
SPDK_BDEV_MODULE_REGISTER(rbd, &rbd_if)
478 
479 static int bdev_rbd_reset_timer(void *arg);
480 
481 static void
482 bdev_rbd_check_outstanding_ios(struct spdk_bdev *bdev, uint64_t current_qd,
483 			       void *cb_arg, int rc)
484 {
485 	struct bdev_rbd *disk = cb_arg;
486 	enum spdk_bdev_io_status bio_status;
487 
488 	if (rc == 0 && current_qd > 0) {
489 		disk->reset_timer = SPDK_POLLER_REGISTER(bdev_rbd_reset_timer, disk, 1000);
490 		return;
491 	}
492 
493 	if (rc != 0) {
494 		bio_status = SPDK_BDEV_IO_STATUS_FAILED;
495 	} else {
496 		bio_status = SPDK_BDEV_IO_STATUS_SUCCESS;
497 	}
498 
499 	bdev_rbd_io_complete(disk->reset_bdev_io, bio_status);
500 	disk->reset_bdev_io = NULL;
501 }
502 
503 static int
504 bdev_rbd_reset_timer(void *arg)
505 {
506 	struct bdev_rbd *disk = arg;
507 
508 	spdk_poller_unregister(&disk->reset_timer);
509 
510 	spdk_bdev_get_current_qd(&disk->disk, bdev_rbd_check_outstanding_ios, disk);
511 
512 	return SPDK_POLLER_BUSY;
513 }
514 
515 static void
516 bdev_rbd_reset(void *ctx)
517 {
518 	struct spdk_bdev_io *bdev_io = ctx;
519 	struct bdev_rbd *disk = (struct bdev_rbd *)bdev_io->bdev->ctxt;
520 
521 	/*
522 	 * HACK: Since librbd doesn't provide any way to cancel outstanding aio, just kick off a
523 	 * poller to wait for in-flight I/O to complete.
524 	 */
525 	assert(disk->reset_bdev_io == NULL);
526 	disk->reset_bdev_io = bdev_io;
527 
528 	bdev_rbd_reset_timer(disk);
529 }
530 
531 static void
532 _bdev_rbd_destruct_done(void *io_device)
533 {
534 	struct bdev_rbd *rbd = io_device;
535 
536 	assert(rbd != NULL);
537 	assert(rbd->ch_count == 0);
538 
539 	spdk_bdev_destruct_done(&rbd->disk, 0);
540 	bdev_rbd_free(rbd);
541 }
542 
543 static void
544 bdev_rbd_free_cb(void *io_device)
545 {
546 	struct bdev_rbd *rbd = io_device;
547 
548 	/* The io device has been unregistered.  Send a message back to the
549 	 * original thread that started the destruct operation, so that the
550 	 * bdev unregister callback is invoked on the same thread that started
551 	 * this whole process.
552 	 */
553 	spdk_thread_send_msg(rbd->destruct_td, _bdev_rbd_destruct_done, rbd);
554 }
555 
556 static void
557 _bdev_rbd_destruct(void *ctx)
558 {
559 	struct bdev_rbd *rbd = ctx;
560 
561 	spdk_io_device_unregister(rbd, bdev_rbd_free_cb);
562 }
563 
/*
 * bdev destruct entry point.  Always asynchronous: records the calling
 * thread in destruct_td and forwards the work to the bdev's main thread.
 */
static int
bdev_rbd_destruct(void *ctx)
{
	struct bdev_rbd *rbd = ctx;
	struct spdk_thread *td;

	/* main_td is NULL when no channel was ever created. */
	if (rbd->main_td == NULL) {
		td = spdk_get_thread();
	} else {
		td = rbd->main_td;
	}

	/* Start the destruct operation on the rbd bdev's
	 * main thread.  This guarantees it will only start
	 * executing after any messages related to channel
	 * deletions have finished completing.  *Always*
	 * send a message, even if this function gets called
	 * from the main thread, in case there are pending
	 * channel delete messages in flight to this thread.
	 */
	assert(rbd->destruct_td == NULL);
	rbd->destruct_td = td;
	spdk_thread_send_msg(td, _bdev_rbd_destruct, rbd);

	/* Return 1 to indicate the destruct path is asynchronous. */
	return 1;
}
591 
592 static void
593 bdev_rbd_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io,
594 		    bool success)
595 {
596 	struct bdev_rbd *disk = (struct bdev_rbd *)bdev_io->bdev->ctxt;
597 
598 	if (!success) {
599 		bdev_rbd_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
600 		return;
601 	}
602 
603 	spdk_thread_exec_msg(disk->main_td, bdev_rbd_start_aio, bdev_io);
604 }
605 
606 static void
607 bdev_rbd_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
608 {
609 	struct spdk_thread *submit_td = spdk_io_channel_get_thread(ch);
610 	struct bdev_rbd_io *rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx;
611 	struct bdev_rbd *disk = (struct bdev_rbd *)bdev_io->bdev->ctxt;
612 
613 	rbd_io->submit_td = submit_td;
614 	switch (bdev_io->type) {
615 	case SPDK_BDEV_IO_TYPE_READ:
616 		spdk_bdev_io_get_buf(bdev_io, bdev_rbd_get_buf_cb,
617 				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
618 		break;
619 
620 	case SPDK_BDEV_IO_TYPE_WRITE:
621 	case SPDK_BDEV_IO_TYPE_UNMAP:
622 	case SPDK_BDEV_IO_TYPE_FLUSH:
623 	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
624 		spdk_thread_exec_msg(disk->main_td, bdev_rbd_start_aio, bdev_io);
625 		break;
626 
627 	case SPDK_BDEV_IO_TYPE_RESET:
628 		spdk_thread_exec_msg(disk->main_td, bdev_rbd_reset, bdev_io);
629 		break;
630 
631 	default:
632 		SPDK_ERRLOG("Unsupported IO type =%d\n", bdev_io->type);
633 		bdev_rbd_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
634 		break;
635 	}
636 }
637 
/* Report which I/O types this module handles (must match the dispatch in
 * bdev_rbd_submit_request). */
static bool
bdev_rbd_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type)
{
	switch (io_type) {
	case SPDK_BDEV_IO_TYPE_READ:
	case SPDK_BDEV_IO_TYPE_WRITE:
	case SPDK_BDEV_IO_TYPE_UNMAP:
	case SPDK_BDEV_IO_TYPE_FLUSH:
	case SPDK_BDEV_IO_TYPE_RESET:
	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
		return true;

	default:
		return false;
	}
}
654 
655 static void
656 bdev_rbd_free_channel_resources(struct bdev_rbd *disk)
657 {
658 	assert(disk != NULL);
659 	assert(disk->main_td == spdk_get_thread());
660 	assert(disk->ch_count == 0);
661 
662 	spdk_put_io_channel(disk->group_ch);
663 	if (disk->image) {
664 		bdev_rbd_exit(disk->image);
665 	}
666 
667 	disk->main_td = NULL;
668 	disk->group_ch = NULL;
669 }
670 
671 static void *
672 bdev_rbd_handle(void *arg)
673 {
674 	struct bdev_rbd *disk = arg;
675 	void *ret = arg;
676 
677 	if (rbd_open(disk->io_ctx, disk->rbd_name, &disk->image, NULL) < 0) {
678 		SPDK_ERRLOG("Failed to open specified rbd device\n");
679 		ret = NULL;
680 	}
681 
682 	return ret;
683 }
684 
685 static int
686 _bdev_rbd_create_cb(struct bdev_rbd *disk)
687 {
688 	disk->group_ch = spdk_get_io_channel(&rbd_if);
689 	assert(disk->group_ch != NULL);
690 
691 	if (spdk_call_unaffinitized(bdev_rbd_handle, disk) == NULL) {
692 		bdev_rbd_free_channel_resources(disk);
693 		return -1;
694 	}
695 
696 	return 0;
697 }
698 
699 static int
700 bdev_rbd_create_cb(void *io_device, void *ctx_buf)
701 {
702 	struct bdev_rbd_io_channel *ch = ctx_buf;
703 	struct bdev_rbd *disk = io_device;
704 	int rc;
705 
706 	ch->disk = disk;
707 	pthread_mutex_lock(&disk->mutex);
708 	if (disk->ch_count == 0) {
709 		assert(disk->main_td == NULL);
710 		rc = _bdev_rbd_create_cb(disk);
711 		if (rc) {
712 			SPDK_ERRLOG("Cannot create channel for disk=%p\n", disk);
713 			pthread_mutex_unlock(&disk->mutex);
714 			return rc;
715 		}
716 
717 		disk->main_td = spdk_get_thread();
718 	}
719 
720 	disk->ch_count++;
721 	pthread_mutex_unlock(&disk->mutex);
722 
723 	return 0;
724 }
725 
726 static void
727 _bdev_rbd_destroy_cb(void *ctx)
728 {
729 	struct bdev_rbd *disk = ctx;
730 
731 	pthread_mutex_lock(&disk->mutex);
732 	assert(disk->ch_count > 0);
733 	disk->ch_count--;
734 
735 	if (disk->ch_count > 0) {
736 		/* A new channel was created between when message was sent and this function executed */
737 		pthread_mutex_unlock(&disk->mutex);
738 		return;
739 	}
740 
741 	bdev_rbd_free_channel_resources(disk);
742 	pthread_mutex_unlock(&disk->mutex);
743 }
744 
/*
 * io_device channel-destroy callback.  When the last channel dies on the
 * main thread, tear down channel resources inline; when it dies on another
 * thread, re-dispatch the teardown to the main thread (temporarily keeping
 * the count non-zero so the state stays consistent in between).
 */
static void
bdev_rbd_destroy_cb(void *io_device, void *ctx_buf)
{
	struct bdev_rbd *disk = io_device;
	struct spdk_thread *thread;

	pthread_mutex_lock(&disk->mutex);
	assert(disk->ch_count > 0);
	disk->ch_count--;
	if (disk->ch_count == 0) {
		assert(disk->main_td != NULL);
		if (disk->main_td != spdk_get_thread()) {
			/* The final channel was destroyed on a different thread
			 * than where the first channel was created. Pass a message
			 * to the main thread to unregister the poller. */
			disk->ch_count++;
			thread = disk->main_td;
			pthread_mutex_unlock(&disk->mutex);
			spdk_thread_send_msg(thread, _bdev_rbd_destroy_cb, disk);
			return;
		}

		bdev_rbd_free_channel_resources(disk);
	}
	pthread_mutex_unlock(&disk->mutex);
}
771 
/* Hand out an I/O channel for this bdev (the bdev itself is the io_device). */
static struct spdk_io_channel *
bdev_rbd_get_io_channel(void *ctx)
{
	struct bdev_rbd *disk = ctx;

	return spdk_get_io_channel(disk);
}
779 
780 static void
781 bdev_rbd_cluster_dump_entry(const char *cluster_name, struct spdk_json_write_ctx *w)
782 {
783 	struct bdev_rbd_cluster *entry;
784 
785 	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);
786 	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
787 		if (strcmp(cluster_name, entry->name)) {
788 			continue;
789 		}
790 		if (entry->user_id) {
791 			spdk_json_write_named_string(w, "user_id", entry->user_id);
792 		}
793 
794 		if (entry->config_param) {
795 			char **config_entry = entry->config_param;
796 
797 			spdk_json_write_named_object_begin(w, "config_param");
798 			while (*config_entry) {
799 				spdk_json_write_named_string(w, config_entry[0], config_entry[1]);
800 				config_entry += 2;
801 			}
802 			spdk_json_write_object_end(w);
803 		}
804 		if (entry->config_file) {
805 			spdk_json_write_named_string(w, "config_file", entry->config_file);
806 		}
807 		if (entry->key_file) {
808 			spdk_json_write_named_string(w, "key_file", entry->key_file);
809 		}
810 
811 		pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
812 		return;
813 	}
814 
815 	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
816 }
817 
818 static int
819 bdev_rbd_dump_info_json(void *ctx, struct spdk_json_write_ctx *w)
820 {
821 	struct bdev_rbd *rbd_bdev = ctx;
822 
823 	spdk_json_write_named_object_begin(w, "rbd");
824 
825 	spdk_json_write_named_string(w, "pool_name", rbd_bdev->pool_name);
826 
827 	spdk_json_write_named_string(w, "rbd_name", rbd_bdev->rbd_name);
828 
829 	if (rbd_bdev->cluster_name) {
830 		bdev_rbd_cluster_dump_entry(rbd_bdev->cluster_name, w);
831 		goto end;
832 	}
833 
834 	if (rbd_bdev->user_id) {
835 		spdk_json_write_named_string(w, "user_id", rbd_bdev->user_id);
836 	}
837 
838 	if (rbd_bdev->config) {
839 		char **entry = rbd_bdev->config;
840 
841 		spdk_json_write_named_object_begin(w, "config");
842 		while (*entry) {
843 			spdk_json_write_named_string(w, entry[0], entry[1]);
844 			entry += 2;
845 		}
846 		spdk_json_write_object_end(w);
847 	}
848 
849 end:
850 	spdk_json_write_object_end(w);
851 
852 	return 0;
853 }
854 
855 static void
856 bdev_rbd_write_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
857 {
858 	struct bdev_rbd *rbd = bdev->ctxt;
859 	char uuid_str[SPDK_UUID_STRING_LEN];
860 
861 	spdk_json_write_object_begin(w);
862 
863 	spdk_json_write_named_string(w, "method", "bdev_rbd_create");
864 
865 	spdk_json_write_named_object_begin(w, "params");
866 	spdk_json_write_named_string(w, "name", bdev->name);
867 	spdk_json_write_named_string(w, "pool_name", rbd->pool_name);
868 	spdk_json_write_named_string(w, "rbd_name", rbd->rbd_name);
869 	spdk_json_write_named_uint32(w, "block_size", bdev->blocklen);
870 	if (rbd->user_id) {
871 		spdk_json_write_named_string(w, "user_id", rbd->user_id);
872 	}
873 
874 	if (rbd->config) {
875 		char **entry = rbd->config;
876 
877 		spdk_json_write_named_object_begin(w, "config");
878 		while (*entry) {
879 			spdk_json_write_named_string(w, entry[0], entry[1]);
880 			entry += 2;
881 		}
882 		spdk_json_write_object_end(w);
883 	}
884 
885 	spdk_uuid_fmt_lower(uuid_str, sizeof(uuid_str), &bdev->uuid);
886 	spdk_json_write_named_string(w, "uuid", uuid_str);
887 
888 	spdk_json_write_object_end(w);
889 
890 	spdk_json_write_object_end(w);
891 }
892 
893 static void
894 dump_single_cluster_entry(struct bdev_rbd_cluster *entry, struct spdk_json_write_ctx *w)
895 {
896 	assert(entry != NULL);
897 
898 	spdk_json_write_object_begin(w);
899 	spdk_json_write_named_string(w, "cluster_name", entry->name);
900 
901 	if (entry->user_id) {
902 		spdk_json_write_named_string(w, "user_id", entry->user_id);
903 	}
904 
905 	if (entry->config_param) {
906 		char **config_entry = entry->config_param;
907 
908 		spdk_json_write_named_object_begin(w, "config_param");
909 		while (*config_entry) {
910 			spdk_json_write_named_string(w, config_entry[0], config_entry[1]);
911 			config_entry += 2;
912 		}
913 		spdk_json_write_object_end(w);
914 	}
915 	if (entry->config_file) {
916 		spdk_json_write_named_string(w, "config_file", entry->config_file);
917 	}
918 	if (entry->key_file) {
919 		spdk_json_write_named_string(w, "key_file", entry->key_file);
920 	}
921 
922 	spdk_json_write_object_end(w);
923 }
924 
925 int
926 bdev_rbd_get_clusters_info(struct spdk_jsonrpc_request *request, const char *name)
927 {
928 	struct bdev_rbd_cluster *entry;
929 	struct spdk_json_write_ctx *w;
930 
931 	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);
932 
933 	if (STAILQ_EMPTY(&g_map_bdev_rbd_cluster)) {
934 		pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
935 		return -ENOENT;
936 	}
937 
938 	/* If cluster name is provided */
939 	if (name) {
940 		STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
941 			if (strcmp(name, entry->name) == 0) {
942 				w = spdk_jsonrpc_begin_result(request);
943 				dump_single_cluster_entry(entry, w);
944 				spdk_jsonrpc_end_result(request, w);
945 
946 				pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
947 				return 0;
948 			}
949 		}
950 
951 		pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
952 		return -ENOENT;
953 	}
954 
955 	w = spdk_jsonrpc_begin_result(request);
956 	spdk_json_write_array_begin(w);
957 	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
958 		dump_single_cluster_entry(entry, w);
959 	}
960 	spdk_json_write_array_end(w);
961 	spdk_jsonrpc_end_result(request, w);
962 	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
963 
964 	return 0;
965 }
966 
/* Function table handed to the bdev layer for every RBD bdev. */
static const struct spdk_bdev_fn_table rbd_fn_table = {
	.destruct		= bdev_rbd_destruct,
	.submit_request		= bdev_rbd_submit_request,
	.io_type_supported	= bdev_rbd_io_type_supported,
	.get_io_channel		= bdev_rbd_get_io_channel,
	.dump_info_json		= bdev_rbd_dump_info_json,
	.write_config_json	= bdev_rbd_write_config_json,
};
975 
/*
 * Create, configure, and connect a named shared cluster, then add it to the
 * global registry.  Fails if the name is already registered or any rados
 * step fails; on failure every partially-acquired resource is released via
 * the err_handle path.  Returns 0 on success, -1 on failure.
 */
static int
rbd_register_cluster(const char *name, const char *user_id, const char *const *config_param,
		     const char *config_file, const char *key_file)
{
	struct bdev_rbd_cluster *entry;
	int rc;

	/* Held across the whole registration so a concurrent caller cannot
	 * insert a duplicate name between the check and the insert. */
	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);
	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
		if (strcmp(name, entry->name) == 0) {
			SPDK_ERRLOG("Cluster name=%s already exists\n", name);
			pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
			return -1;
		}
	}

	entry = calloc(1, sizeof(*entry));
	if (!entry) {
		SPDK_ERRLOG("Cannot allocate an entry for name=%s\n", name);
		pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
		return -1;
	}

	entry->name = strdup(name);
	if (entry->name == NULL) {
		SPDK_ERRLOG("Failed to save the name =%s on entry =%p\n", name, entry);
		goto err_handle;
	}

	if (user_id) {
		entry->user_id = strdup(user_id);
		if (entry->user_id == NULL) {
			SPDK_ERRLOG("Failed to save the str =%s on entry =%p\n", user_id, entry);
			goto err_handle;
		}
	}

	/* Support specify config_param or config_file separately, or both of them. */
	if (config_param) {
		entry->config_param = bdev_rbd_dup_config(config_param);
		if (entry->config_param == NULL) {
			SPDK_ERRLOG("Failed to save the config_param=%p on entry = %p\n", config_param, entry);
			goto err_handle;
		}
	}

	if (config_file) {
		entry->config_file = strdup(config_file);
		if (entry->config_file == NULL) {
			SPDK_ERRLOG("Failed to save the config_file=%s on entry = %p\n", config_file, entry);
			goto err_handle;
		}
	}

	if (key_file) {
		entry->key_file = strdup(key_file);
		if (entry->key_file == NULL) {
			SPDK_ERRLOG("Failed to save the key_file=%s on entry = %p\n", key_file, entry);
			goto err_handle;
		}
	}

	rc = rados_create(&entry->cluster, user_id);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to create rados_t struct\n");
		goto err_handle;
	}

	/* Try default location when entry->config_file is NULL, but ignore failure when it is NULL */
	rc = rados_conf_read_file(entry->cluster, entry->config_file);
	if (entry->config_file && rc < 0) {
		SPDK_ERRLOG("Failed to read conf file %s\n", entry->config_file);
		rados_shutdown(entry->cluster);
		goto err_handle;
	}

	/* Explicit config_param values override anything read from the conf file. */
	if (config_param) {
		const char *const *config_entry = config_param;
		while (*config_entry) {
			rc = rados_conf_set(entry->cluster, config_entry[0], config_entry[1]);
			if (rc < 0) {
				SPDK_ERRLOG("Failed to set %s = %s\n", config_entry[0], config_entry[1]);
				rados_shutdown(entry->cluster);
				goto err_handle;
			}
			config_entry += 2;
		}
	}

	if (key_file) {
		rc = rados_conf_set(entry->cluster, "keyring", key_file);
		if (rc < 0) {
			SPDK_ERRLOG("Failed to set keyring = %s\n", key_file);
			rados_shutdown(entry->cluster);
			goto err_handle;
		}
	}

	rc = rados_connect(entry->cluster);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to connect to rbd_pool on cluster=%p\n", entry->cluster);
		rados_shutdown(entry->cluster);
		goto err_handle;
	}

	STAILQ_INSERT_TAIL(&g_map_bdev_rbd_cluster, entry, link);
	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);

	return 0;

err_handle:
	bdev_rbd_cluster_free(entry);
	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
	return -1;
}
1091 
1092 int
1093 bdev_rbd_unregister_cluster(const char *name)
1094 {
1095 	struct bdev_rbd_cluster *entry;
1096 	int rc = 0;
1097 
1098 	if (name == NULL) {
1099 		return -1;
1100 	}
1101 
1102 	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);
1103 	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
1104 		if (strcmp(name, entry->name) == 0) {
1105 			if (entry->ref == 0) {
1106 				STAILQ_REMOVE(&g_map_bdev_rbd_cluster, entry, bdev_rbd_cluster, link);
1107 				rados_shutdown(entry->cluster);
1108 				bdev_rbd_cluster_free(entry);
1109 			} else {
1110 				SPDK_ERRLOG("Cluster with name=%p is still used and we cannot delete it\n",
1111 					    entry->name);
1112 				rc = -1;
1113 			}
1114 
1115 			pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
1116 			return rc;
1117 		}
1118 	}
1119 
1120 	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
1121 
1122 	SPDK_ERRLOG("Could not find the cluster name =%p\n", name);
1123 
1124 	return -1;
1125 }
1126 
1127 static void *
1128 _bdev_rbd_register_cluster(void *arg)
1129 {
1130 	struct cluster_register_info *info = arg;
1131 	void *ret = arg;
1132 	int rc;
1133 
1134 	rc = rbd_register_cluster((const char *)info->name, (const char *)info->user_id,
1135 				  (const char *const *)info->config_param, (const char *)info->config_file,
1136 				  (const char *)info->key_file);
1137 	if (rc) {
1138 		ret = NULL;
1139 	}
1140 
1141 	return ret;
1142 }
1143 
1144 int
1145 bdev_rbd_register_cluster(struct cluster_register_info *info)
1146 {
1147 	assert(info != NULL);
1148 
1149 	/* Rados cluster info need to be created in non SPDK-thread to avoid CPU
1150 	 * resource contention */
1151 	if (spdk_call_unaffinitized(_bdev_rbd_register_cluster, info) == NULL) {
1152 		return -1;
1153 	}
1154 
1155 	return 0;
1156 }
1157 
/*
 * Create and register a new RBD bdev.
 *
 * On success, *bdev is set to the new spdk_bdev and 0 is returned.
 * On failure a negative errno is returned and all partially-initialized
 * state is released via bdev_rbd_free().
 *
 * pool_name and rbd_name are mandatory; user_id, config, cluster_name and
 * uuid are optional (a UUID is generated when none is supplied).
 */
int
bdev_rbd_create(struct spdk_bdev **bdev, const char *name, const char *user_id,
		const char *pool_name,
		const char *const *config,
		const char *rbd_name,
		uint32_t block_size,
		const char *cluster_name,
		const struct spdk_uuid *uuid)
{
	struct bdev_rbd *rbd;
	int ret;

	if ((pool_name == NULL) || (rbd_name == NULL)) {
		return -EINVAL;
	}

	rbd = calloc(1, sizeof(struct bdev_rbd));
	if (rbd == NULL) {
		SPDK_ERRLOG("Failed to allocate bdev_rbd struct\n");
		return -ENOMEM;
	}

	/* The mutex must exist before bdev_rbd_free() may be called on this
	 * object, so initialize it first and free manually if it fails. */
	ret = pthread_mutex_init(&rbd->mutex, NULL);
	if (ret) {
		SPDK_ERRLOG("Cannot init mutex on rbd=%p\n", rbd->disk.name);
		free(rbd);
		return ret;
	}

	rbd->rbd_name = strdup(rbd_name);
	if (!rbd->rbd_name) {
		bdev_rbd_free(rbd);
		return -ENOMEM;
	}

	if (user_id) {
		rbd->user_id = strdup(user_id);
		if (!rbd->user_id) {
			bdev_rbd_free(rbd);
			return -ENOMEM;
		}
	}

	if (cluster_name) {
		rbd->cluster_name = strdup(cluster_name);
		if (!rbd->cluster_name) {
			bdev_rbd_free(rbd);
			return -ENOMEM;
		}
	}
	rbd->pool_name = strdup(pool_name);
	if (!rbd->pool_name) {
		bdev_rbd_free(rbd);
		return -ENOMEM;
	}

	/* Deep-copy the optional config key/value list so the caller's copy
	 * may be freed independently. */
	if (config && !(rbd->config = bdev_rbd_dup_config(config))) {
		bdev_rbd_free(rbd);
		return -ENOMEM;
	}

	/* Connects to the cluster and fills rbd->info (image size, etc.). */
	ret = bdev_rbd_init(rbd);
	if (ret < 0) {
		bdev_rbd_free(rbd);
		SPDK_ERRLOG("Failed to init rbd device\n");
		return ret;
	}

	if (uuid) {
		rbd->disk.uuid = *uuid;
	} else {
		spdk_uuid_generate(&rbd->disk.uuid);
	}

	/* Without an explicit name, auto-name the disk "Ceph<N>". */
	if (name) {
		rbd->disk.name = strdup(name);
	} else {
		rbd->disk.name = spdk_sprintf_alloc("Ceph%d", bdev_rbd_count);
	}
	if (!rbd->disk.name) {
		bdev_rbd_free(rbd);
		return -ENOMEM;
	}
	rbd->disk.product_name = "Ceph Rbd Disk";
	bdev_rbd_count++;

	rbd->disk.write_cache = 0;
	rbd->disk.blocklen = block_size;
	/* Block count derives from the image size reported by bdev_rbd_init(). */
	rbd->disk.blockcnt = rbd->info.size / rbd->disk.blocklen;
	rbd->disk.ctxt = rbd;
	rbd->disk.fn_table = &rbd_fn_table;
	rbd->disk.module = &rbd_if;

	SPDK_NOTICELOG("Add %s rbd disk to lun\n", rbd->disk.name);

	/* Register the io_device before the bdev so channels can be created
	 * as soon as the bdev becomes visible. */
	spdk_io_device_register(rbd, bdev_rbd_create_cb,
				bdev_rbd_destroy_cb,
				sizeof(struct bdev_rbd_io_channel),
				rbd_name);
	ret = spdk_bdev_register(&rbd->disk);
	if (ret) {
		spdk_io_device_unregister(rbd, NULL);
		bdev_rbd_free(rbd);
		return ret;
	}

	*bdev = &(rbd->disk);

	return ret;
}
1268 
1269 void
1270 bdev_rbd_delete(const char *name, spdk_delete_rbd_complete cb_fn, void *cb_arg)
1271 {
1272 	int rc;
1273 
1274 	rc = spdk_bdev_unregister_by_name(name, &rbd_if, cb_fn, cb_arg);
1275 	if (rc != 0) {
1276 		cb_fn(cb_arg, rc);
1277 	}
1278 }
1279 
/* No-op bdev event callback; used when opening a bdev descriptor only for
 * administrative operations (e.g. resize) where events are irrelevant. */
static void
dummy_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev, void *ctx)
{
}
1284 
/*
 * Grow the RBD bdev named `name` to new_size_in_mb megabytes.
 *
 * Only growing is supported: the new size must be >= the current size.
 * Resizes the backing RBD image first, then notifies the bdev layer of
 * the new block count. Returns 0 on success or a negative errno.
 */
int
bdev_rbd_resize(const char *name, const uint64_t new_size_in_mb)
{
	struct spdk_bdev_desc *desc;
	struct spdk_bdev *bdev;
	struct spdk_io_channel *ch;
	struct bdev_rbd_io_channel *rbd_io_ch;
	int rc = 0;
	uint64_t new_size_in_byte;
	uint64_t current_size_in_mb;

	/* Open read-only: we only need the descriptor to pin the bdev. */
	rc = spdk_bdev_open_ext(name, false, dummy_bdev_event_cb, NULL, &desc);
	if (rc != 0) {
		return rc;
	}

	bdev = spdk_bdev_desc_get_bdev(desc);

	/* Reject bdevs that belong to other modules. */
	if (bdev->module != &rbd_if) {
		rc = -EINVAL;
		goto exit;
	}

	current_size_in_mb = bdev->blocklen * bdev->blockcnt / (1024 * 1024);
	if (current_size_in_mb > new_size_in_mb) {
		SPDK_ERRLOG("The new bdev size must be larger than current bdev size.\n");
		rc = -EINVAL;
		goto exit;
	}

	/* An io channel is taken only to reach the rbd_image_t handle. */
	ch = bdev_rbd_get_io_channel(bdev);
	rbd_io_ch = spdk_io_channel_get_ctx(ch);
	new_size_in_byte = new_size_in_mb * 1024 * 1024;

	rc = rbd_resize(rbd_io_ch->disk->image, new_size_in_byte);
	spdk_put_io_channel(ch);
	if (rc != 0) {
		SPDK_ERRLOG("failed to resize the ceph bdev.\n");
		goto exit;
	}

	rc = spdk_bdev_notify_blockcnt_change(bdev, new_size_in_byte / bdev->blocklen);
	if (rc != 0) {
		SPDK_ERRLOG("failed to notify block cnt change.\n");
	}

exit:
	spdk_bdev_close(desc);
	return rc;
}
1335 
/* Per-thread poll-group channel create callback; no per-channel state. */
static int
bdev_rbd_group_create_cb(void *io_device, void *ctx_buf)
{
	return 0;
}
1341 
/* Per-thread poll-group channel destroy callback; nothing to tear down. */
static void
bdev_rbd_group_destroy_cb(void *io_device, void *ctx_buf)
{
}
1346 
/* Module init: register the module-wide io_device used for poll groups. */
static int
bdev_rbd_library_init(void)
{
	spdk_io_device_register(&rbd_if, bdev_rbd_group_create_cb, bdev_rbd_group_destroy_cb,
				0, "bdev_rbd_poll_groups");
	return 0;
}
1354 
/* Module teardown: unregister the poll-group io_device. */
static void
bdev_rbd_library_fini(void)
{
	spdk_io_device_unregister(&rbd_if, NULL);
}
1360 
/* Register the "bdev_rbd" log flag so tracing can be toggled at runtime. */
SPDK_LOG_REGISTER_COMPONENT(bdev_rbd)
1362