/*   SPDX-License-Identifier: BSD-3-Clause
 *   Copyright (C) 2017 Intel Corporation.
 *   All rights reserved.
 */

#include "spdk/stdinc.h"

#include "bdev_rbd.h"

#include <rbd/librbd.h>
#include <rados/librados.h>

#include "spdk/env.h"
#include "spdk/bdev.h"
#include "spdk/thread.h"
#include "spdk/json.h"
#include "spdk/string.h"
#include "spdk/util.h"
#include "spdk/likely.h"

#include "spdk/bdev_module.h"
#include "spdk/log.h"

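/*
 * This module exposes Ceph RADOS Block Device (RBD) images as SPDK bdevs.
 * It uses librados to talk to the cluster and librbd to drive the images.
 * Rados cluster handles and per-pool I/O contexts are cached and reference
 * counted, so multiple bdevs can share a single cluster connection.
 */
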
static int bdev_rbd_count = 0;

struct bdev_rbd_pool_ctx {
	rados_t *cluster_p;
	char *name;
	rados_ioctx_t io_ctx;
	uint32_t ref;
	STAILQ_ENTRY(bdev_rbd_pool_ctx) link;
};

static STAILQ_HEAD(, bdev_rbd_pool_ctx) g_map_bdev_rbd_pool_ctx = STAILQ_HEAD_INITIALIZER(
			g_map_bdev_rbd_pool_ctx);

struct bdev_rbd {
	struct spdk_bdev disk;
	char *rbd_name;
	char *user_id;
	char *pool_name;
	char **config;

	rados_t cluster;
	rados_t *cluster_p;
	char *cluster_name;

	struct bdev_rbd_pool_ctx *ctx;
	rbd_image_t image;

	rbd_image_info_t info;
	struct spdk_thread *destruct_td;

	TAILQ_ENTRY(bdev_rbd) tailq;
	struct spdk_poller *reset_timer;
	struct spdk_bdev_io *reset_bdev_io;
};

struct bdev_rbd_io_channel {
	struct bdev_rbd *disk;
	struct spdk_io_channel *group_ch;
};

struct bdev_rbd_io {
	struct			spdk_thread *submit_td;
	enum			spdk_bdev_io_status status;
	rbd_completion_t	comp;
	size_t			total_len;
};

struct bdev_rbd_cluster {
	char *name;
	char *user_id;
	char **config_param;
	char *config_file;
	char *key_file;
	char *core_mask;
	rados_t cluster;
	uint32_t ref;
	STAILQ_ENTRY(bdev_rbd_cluster) link;
};

static STAILQ_HEAD(, bdev_rbd_cluster) g_map_bdev_rbd_cluster = STAILQ_HEAD_INITIALIZER(
			g_map_bdev_rbd_cluster);
static pthread_mutex_t g_map_bdev_rbd_cluster_mutex = PTHREAD_MUTEX_INITIALIZER;

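/*
 * Entries in g_map_bdev_rbd_cluster are created by bdev_rbd_register_cluster
 * and reference counted: each bdev using a named cluster takes a reference,
 * and an entry can only be unregistered once its count drops back to zero.
 * The map is guarded by g_map_bdev_rbd_cluster_mutex.
 */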
static void
bdev_rbd_cluster_free(struct bdev_rbd_cluster *entry)
{
	assert(entry != NULL);

	bdev_rbd_free_config(entry->config_param);
	free(entry->config_file);
	free(entry->key_file);
	free(entry->user_id);
	free(entry->name);
	free(entry->core_mask);
	free(entry);
}

static void
bdev_rbd_put_cluster(rados_t **cluster)
{
	struct bdev_rbd_cluster *entry;

	assert(cluster != NULL);

	/* No need to go through the map if *cluster is NULL */
	if (*cluster == NULL) {
		return;
	}

	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);
	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
		if (*cluster != &entry->cluster) {
			continue;
		}

		assert(entry->ref > 0);
		entry->ref--;
		*cluster = NULL;
		pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
		return;
	}

	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
	SPDK_ERRLOG("Cannot find the entry for cluster=%p\n", cluster);
}

static void
bdev_rbd_put_pool_ctx(struct bdev_rbd_pool_ctx *entry)
{
	assert(spdk_get_thread() == spdk_thread_get_app_thread());

	assert(entry != NULL);
	assert(entry->ref > 0);
	entry->ref--;
	if (entry->ref == 0) {
		STAILQ_REMOVE(&g_map_bdev_rbd_pool_ctx, entry, bdev_rbd_pool_ctx, link);
		rados_ioctx_destroy(entry->io_ctx);
		free(entry->name);
		free(entry);
	}
}

static void
bdev_rbd_free(struct bdev_rbd *rbd)
{
	if (!rbd) {
		return;
	}

	if (rbd->image) {
		rbd_flush(rbd->image);
		rbd_close(rbd->image);
	}

	free(rbd->disk.name);
	free(rbd->rbd_name);
	free(rbd->user_id);
	free(rbd->pool_name);
	bdev_rbd_free_config(rbd->config);

	/* When the rbd bdev is destructed by bdev_rbd_destruct, this branch is
	 * not taken because the ctx will already have been freed by
	 * bdev_rbd_free_cb in an asynchronous manner.  This path is only taken
	 * during the rbd bdev initialization procedure. */
	if (rbd->ctx) {
		bdev_rbd_put_pool_ctx(rbd->ctx);
		rbd->ctx = NULL;
	}

	if (rbd->cluster_name) {
		bdev_rbd_put_cluster(&rbd->cluster_p);
		free(rbd->cluster_name);
	} else if (rbd->cluster) {
		rados_shutdown(rbd->cluster);
	}

	free(rbd);
}

void
bdev_rbd_free_config(char **config)
{
	char **entry;

	if (config) {
		for (entry = config; *entry; entry++) {
			free(*entry);
		}
		free(config);
	}
}

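/*
 * Config parameters are stored as a NULL-terminated array of strings laid
 * out as alternating key/value pairs, for example (hypothetical values):
 *
 *   const char *config[] = { "mon_host", "192.168.0.1", "key", "AQ...", NULL };
 */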
char **
bdev_rbd_dup_config(const char *const *config)
{
	size_t count;
	char **copy;

	if (!config) {
		return NULL;
	}
	for (count = 0; config[count]; count++) {}
	copy = calloc(count + 1, sizeof(*copy));
	if (!copy) {
		return NULL;
	}
	for (count = 0; config[count]; count++) {
		if (!(copy[count] = strdup(config[count]))) {
			bdev_rbd_free_config(copy);
			return NULL;
		}
	}
	return copy;
}

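/*
 * Create a rados handle for user_id, apply either the given key/value config
 * pairs or the default Ceph config file, and connect to the cluster.  On
 * failure the handle is shut down and *cluster is reset to NULL.
 */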
static int
bdev_rados_cluster_init(const char *user_id, const char *const *config,
			rados_t *cluster)
{
	int ret;

	ret = rados_create(cluster, user_id);
	if (ret < 0) {
		SPDK_ERRLOG("Failed to create rados_t struct\n");
		return -1;
	}

	if (config) {
		const char *const *entry = config;
		while (*entry) {
			ret = rados_conf_set(*cluster, entry[0], entry[1]);
			if (ret < 0) {
				SPDK_ERRLOG("Failed to set %s = %s\n", entry[0], entry[1]);
				rados_shutdown(*cluster);
				*cluster = NULL;
				return -1;
			}
			entry += 2;
		}
	} else {
		ret = rados_conf_read_file(*cluster, NULL);
		if (ret < 0) {
			SPDK_ERRLOG("Failed to read conf file\n");
			rados_shutdown(*cluster);
			*cluster = NULL;
			return -1;
		}
	}

	ret = rados_connect(*cluster);
	if (ret < 0) {
		SPDK_ERRLOG("Failed to connect to the rados cluster\n");
		rados_shutdown(*cluster);
		*cluster = NULL;
		return -1;
	}

	return 0;
}

static int
bdev_rbd_get_cluster(const char *cluster_name, rados_t **cluster)
{
	struct bdev_rbd_cluster *entry;

	if (cluster == NULL) {
		SPDK_ERRLOG("cluster should not be NULL\n");
		return -1;
	}

	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);
	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
		if (strcmp(cluster_name, entry->name) == 0) {
			entry->ref++;
			*cluster = &entry->cluster;
			pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
			return 0;
		}
	}

	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
	return -1;
}

static int
bdev_rbd_shared_cluster_init(const char *cluster_name, rados_t **cluster)
{
	int ret;

	ret = bdev_rbd_get_cluster(cluster_name, cluster);
	if (ret < 0) {
		SPDK_ERRLOG("Failed to find the registered cluster with name=%s\n", cluster_name);
		return -1;
	}

	return ret;
}

static void *
bdev_rbd_cluster_handle(void *arg)
{
	void *ret = arg;
	struct bdev_rbd *rbd = arg;
	int rc;

	rc = bdev_rados_cluster_init(rbd->user_id, (const char *const *)rbd->config,
				     &rbd->cluster);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to create rados cluster for user_id=%s and rbd_pool=%s\n",
			    rbd->user_id ? rbd->user_id : "admin (the default)", rbd->pool_name);
		ret = NULL;
	}

	return ret;
}

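/*
 * Look up (or create) the shared rados_ioctx_t for the given cluster/pool
 * pair.  Contexts are cached in g_map_bdev_rbd_pool_ctx and reference
 * counted, so bdevs backed by the same pool share one ioctx.  The map is
 * only accessed from the application thread, which is why no lock is taken.
 */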
static int
bdev_rbd_get_pool_ctx(rados_t *cluster_p, const char *name, struct bdev_rbd_pool_ctx **ctx)
{
	struct bdev_rbd_pool_ctx *entry;

	assert(spdk_get_thread() == spdk_thread_get_app_thread());

	if (name == NULL || ctx == NULL) {
		return -1;
	}

	STAILQ_FOREACH(entry, &g_map_bdev_rbd_pool_ctx, link) {
		if (strcmp(name, entry->name) == 0 && cluster_p == entry->cluster_p) {
			entry->ref++;
			*ctx = entry;
			return 0;
		}
	}

	entry = calloc(1, sizeof(*entry));
	if (!entry) {
		SPDK_ERRLOG("Cannot allocate an entry for name=%s\n", name);
		return -1;
	}

	entry->name = strdup(name);
	if (entry->name == NULL) {
		SPDK_ERRLOG("Failed to allocate the name=%s on entry=%p\n", name, entry);
		goto err_handle;
	}

	if (rados_ioctx_create(*cluster_p, name, &entry->io_ctx) < 0) {
		goto err_handle1;
	}

	entry->cluster_p = cluster_p;
	entry->ref = 1;
	*ctx = entry;
	STAILQ_INSERT_TAIL(&g_map_bdev_rbd_pool_ctx, entry, link);

	return 0;

err_handle1:
	free(entry->name);
err_handle:
	free(entry);

	return -1;
}

static void *
bdev_rbd_init_context(void *arg)
{
	struct bdev_rbd *rbd = arg;
	int rc;

	if (bdev_rbd_get_pool_ctx(rbd->cluster_p, rbd->pool_name, &rbd->ctx) < 0) {
		SPDK_ERRLOG("Failed to create ioctx on rbd=%p\n", rbd);
		return NULL;
	}

	assert(rbd->ctx != NULL);
	rc = rbd_open(rbd->ctx->io_ctx, rbd->rbd_name, &rbd->image, NULL);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to open specified rbd device\n");
		return NULL;
	}

	rc = rbd_stat(rbd->image, &rbd->info, sizeof(rbd->info));
	if (rc < 0) {
		SPDK_ERRLOG("Failed to stat specified rbd device\n");
		return NULL;
	}

	return arg;
}

static int
bdev_rbd_init(struct bdev_rbd *rbd)
{
	int ret = 0;

	if (!rbd->cluster_name) {
		rbd->cluster_p = &rbd->cluster;
		/* The cluster should be created on a non-SPDK thread to avoid
		 * conflicts between the Rados threads and SPDK threads */
		if (spdk_call_unaffinitized(bdev_rbd_cluster_handle, rbd) == NULL) {
			SPDK_ERRLOG("Cannot create the rados object on rbd=%p\n", rbd);
			return -1;
		}
	} else {
		ret = bdev_rbd_shared_cluster_init(rbd->cluster_name, &rbd->cluster_p);
		if (ret < 0) {
			SPDK_ERRLOG("Failed to create rados object for rbd=%p on cluster_name=%s\n",
				    rbd, rbd->cluster_name);
			return -1;
		}
	}

	if (spdk_call_unaffinitized(bdev_rbd_init_context, rbd) == NULL) {
		SPDK_ERRLOG("Cannot init rbd context for rbd=%p\n", rbd);
		return -1;
	}

	return ret;
}

static void
_bdev_rbd_io_complete(void *_rbd_io)
{
	struct bdev_rbd_io *rbd_io = _rbd_io;

	spdk_bdev_io_complete(spdk_bdev_io_from_ctx(rbd_io), rbd_io->status);
}

static void
bdev_rbd_io_complete(struct spdk_bdev_io *bdev_io, enum spdk_bdev_io_status status)
{
	struct bdev_rbd_io *rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx;
	struct spdk_thread *current_thread = spdk_get_thread();

	rbd_io->status = status;
	assert(rbd_io->submit_td != NULL);
	if (rbd_io->submit_td != current_thread) {
		spdk_thread_send_msg(rbd_io->submit_td, _bdev_rbd_io_complete, rbd_io);
	} else {
		_bdev_rbd_io_complete(rbd_io);
	}
}

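/*
 * AIO completion callback invoked by librbd, potentially from one of its own
 * internal threads.  It translates the librbd return value into a bdev I/O
 * status (reads must transfer the full requested length; compare-and-write
 * maps -EILSEQ to a miscompare) and hands the completion to
 * bdev_rbd_io_complete, which hops back to the submitting SPDK thread if
 * needed.
 */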
static void
bdev_rbd_finish_aiocb(rbd_completion_t cb, void *arg)
{
	int io_status;
	struct spdk_bdev_io *bdev_io;
	struct bdev_rbd_io *rbd_io;
	enum spdk_bdev_io_status bio_status;

	bdev_io = rbd_aio_get_arg(cb);
	rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx;
	io_status = rbd_aio_get_return_value(cb);
	bio_status = SPDK_BDEV_IO_STATUS_SUCCESS;

	if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) {
		if ((int)rbd_io->total_len != io_status) {
			bio_status = SPDK_BDEV_IO_STATUS_FAILED;
		}
#ifdef LIBRBD_SUPPORTS_COMPARE_AND_WRITE_IOVEC
	} else if (bdev_io->type == SPDK_BDEV_IO_TYPE_COMPARE_AND_WRITE && io_status == -EILSEQ) {
		bio_status = SPDK_BDEV_IO_STATUS_MISCOMPARE;
#endif
	} else if (io_status != 0) { /* For others, 0 means success */
		bio_status = SPDK_BDEV_IO_STATUS_FAILED;
	}

	rbd_aio_release(cb);

	bdev_rbd_io_complete(bdev_io, bio_status);
}

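/*
 * Kick off the asynchronous librbd operation for a bdev I/O.  Note that
 * offset and len are in bytes here; the caller converts from blocks using
 * the bdev's block size.
 */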
static void
_bdev_rbd_start_aio(struct bdev_rbd *disk, struct spdk_bdev_io *bdev_io,
		    struct iovec *iov, int iovcnt, uint64_t offset, size_t len)
{
	int ret;
	struct bdev_rbd_io *rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx;
	rbd_image_t image = disk->image;

	ret = rbd_aio_create_completion(bdev_io, bdev_rbd_finish_aiocb,
					&rbd_io->comp);
	if (ret < 0) {
		goto err;
	}

	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_READ:
		rbd_io->total_len = len;
		if (spdk_likely(iovcnt == 1)) {
			ret = rbd_aio_read(image, offset, iov[0].iov_len, iov[0].iov_base,
					   rbd_io->comp);
		} else {
			ret = rbd_aio_readv(image, iov, iovcnt, offset, rbd_io->comp);
		}
		break;
	case SPDK_BDEV_IO_TYPE_WRITE:
		if (spdk_likely(iovcnt == 1)) {
			ret = rbd_aio_write(image, offset, iov[0].iov_len, iov[0].iov_base,
					    rbd_io->comp);
		} else {
			ret = rbd_aio_writev(image, iov, iovcnt, offset, rbd_io->comp);
		}
		break;
	case SPDK_BDEV_IO_TYPE_UNMAP:
		ret = rbd_aio_discard(image, offset, len, rbd_io->comp);
		break;
	case SPDK_BDEV_IO_TYPE_FLUSH:
		ret = rbd_aio_flush(image, rbd_io->comp);
		break;
	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
		ret = rbd_aio_write_zeroes(image, offset, len, rbd_io->comp, /* zero_flags */ 0,
					   /* op_flags */ 0);
		break;
#ifdef LIBRBD_SUPPORTS_COMPARE_AND_WRITE_IOVEC
	case SPDK_BDEV_IO_TYPE_COMPARE_AND_WRITE:
		ret = rbd_aio_compare_and_writev(image, offset, iov /* cmp */, iovcnt,
						 bdev_io->u.bdev.fused_iovs /* write */,
						 bdev_io->u.bdev.fused_iovcnt,
						 rbd_io->comp, NULL,
						 /* op_flags */ 0);
		break;
#endif
	default:
		/* This should not happen.
		 * This function should only be called with the I/O types that
		 * bdev_rbd_submit_request declares supported.
		 */
		SPDK_ERRLOG("Unsupported IO type=%d\n", bdev_io->type);
		ret = -ENOTSUP;
		break;
	}

	if (ret < 0) {
		rbd_aio_release(rbd_io->comp);
		goto err;
	}

	return;

err:
	bdev_rbd_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
}

static void
bdev_rbd_start_aio(void *ctx)
{
	struct spdk_bdev_io *bdev_io = ctx;
	struct bdev_rbd *disk = (struct bdev_rbd *)bdev_io->bdev->ctxt;

	_bdev_rbd_start_aio(disk,
			    bdev_io,
			    bdev_io->u.bdev.iovs,
			    bdev_io->u.bdev.iovcnt,
			    bdev_io->u.bdev.offset_blocks * bdev_io->bdev->blocklen,
			    bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
}

static int bdev_rbd_library_init(void);
static void bdev_rbd_library_fini(void);

static int
bdev_rbd_get_ctx_size(void)
{
	return sizeof(struct bdev_rbd_io);
}

static struct spdk_bdev_module rbd_if = {
	.name = "rbd",
	.module_init = bdev_rbd_library_init,
	.module_fini = bdev_rbd_library_fini,
	.get_ctx_size = bdev_rbd_get_ctx_size,
};
SPDK_BDEV_MODULE_REGISTER(rbd, &rbd_if)

static int bdev_rbd_reset_timer(void *arg);

static void
bdev_rbd_check_outstanding_ios(struct spdk_bdev *bdev, uint64_t current_qd,
			       void *cb_arg, int rc)
{
	struct bdev_rbd *disk = cb_arg;
	enum spdk_bdev_io_status bio_status;

	if (rc == 0 && current_qd > 0) {
		disk->reset_timer = SPDK_POLLER_REGISTER(bdev_rbd_reset_timer, disk, 1000);
		return;
	}

	if (rc != 0) {
		bio_status = SPDK_BDEV_IO_STATUS_FAILED;
	} else {
		bio_status = SPDK_BDEV_IO_STATUS_SUCCESS;
	}

	bdev_rbd_io_complete(disk->reset_bdev_io, bio_status);
	disk->reset_bdev_io = NULL;
}

static int
bdev_rbd_reset_timer(void *arg)
{
	struct bdev_rbd *disk = arg;

	spdk_poller_unregister(&disk->reset_timer);

	spdk_bdev_get_current_qd(&disk->disk, bdev_rbd_check_outstanding_ios, disk);

	return SPDK_POLLER_BUSY;
}

static void
bdev_rbd_reset(void *ctx)
{
	struct spdk_bdev_io *bdev_io = ctx;
	struct bdev_rbd *disk = (struct bdev_rbd *)bdev_io->bdev->ctxt;

	/*
	 * HACK: Since librbd doesn't provide any way to cancel outstanding aio, just kick off a
	 * poller to wait for in-flight I/O to complete.
	 */
	assert(disk->reset_bdev_io == NULL);
	disk->reset_bdev_io = bdev_io;

	bdev_rbd_reset_timer(disk);
}

static void
_bdev_rbd_destruct_done(void *io_device)
{
	struct bdev_rbd *rbd = io_device;

	assert(rbd != NULL);

	spdk_bdev_destruct_done(&rbd->disk, 0);
	bdev_rbd_free(rbd);
}

static void
bdev_rbd_free_cb(void *io_device)
{
	struct bdev_rbd *rbd = io_device;

	assert(spdk_get_thread() == spdk_thread_get_app_thread());

	/* free the ctx */
	if (rbd->ctx) {
		bdev_rbd_put_pool_ctx(rbd->ctx);
		rbd->ctx = NULL;
	}

	/* The io device has been unregistered.  Send a message back to the
	 * original thread that started the destruct operation, so that the
	 * bdev unregister callback is invoked on the same thread that started
	 * this whole process.
	 */
	spdk_thread_send_msg(rbd->destruct_td, _bdev_rbd_destruct_done, rbd);
}

static void
_bdev_rbd_destruct(void *ctx)
{
	struct bdev_rbd *rbd = ctx;

	spdk_io_device_unregister(rbd, bdev_rbd_free_cb);
}

static int
bdev_rbd_destruct(void *ctx)
{
	struct bdev_rbd *rbd = ctx;

	/* Start the destruct operation on the rbd bdev's
	 * main thread.  This guarantees it will only start
	 * executing after any messages related to channel
	 * deletions have finished completing.  *Always*
	 * send a message, even if this function gets called
	 * from the main thread, in case there are pending
	 * channel delete messages in flight to this thread.
	 */
	assert(rbd->destruct_td == NULL);
	rbd->destruct_td = spdk_get_thread();
	spdk_thread_send_msg(spdk_thread_get_app_thread(), _bdev_rbd_destruct, rbd);

	/* Return 1 to indicate the destruct path is asynchronous. */
	return 1;
}

static void
bdev_rbd_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io,
		    bool success)
{
	if (!success) {
		bdev_rbd_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
		return;
	}

	bdev_rbd_start_aio(bdev_io);
}

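/*
 * bdev I/O entry point.  The submitting thread is recorded in the per-I/O
 * context so the completion can be sent back to it.  Reads first obtain a
 * data buffer via spdk_bdev_io_get_buf; resets are forwarded to the
 * application thread, where the wait-for-drain poller runs.
 */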
static void
bdev_rbd_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
{
	struct spdk_thread *submit_td = spdk_io_channel_get_thread(ch);
	struct bdev_rbd_io *rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx;

	rbd_io->submit_td = submit_td;
	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_READ:
		spdk_bdev_io_get_buf(bdev_io, bdev_rbd_get_buf_cb,
				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
		break;

	case SPDK_BDEV_IO_TYPE_WRITE:
	case SPDK_BDEV_IO_TYPE_UNMAP:
	case SPDK_BDEV_IO_TYPE_FLUSH:
	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
#ifdef LIBRBD_SUPPORTS_COMPARE_AND_WRITE_IOVEC
	case SPDK_BDEV_IO_TYPE_COMPARE_AND_WRITE:
#endif
		bdev_rbd_start_aio(bdev_io);
		break;

	case SPDK_BDEV_IO_TYPE_RESET:
		spdk_thread_exec_msg(spdk_thread_get_app_thread(), bdev_rbd_reset, bdev_io);
		break;

	default:
		SPDK_ERRLOG("Unsupported IO type=%d\n", bdev_io->type);
		bdev_rbd_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
		break;
	}
}

static bool
bdev_rbd_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type)
{
	switch (io_type) {
	case SPDK_BDEV_IO_TYPE_READ:
	case SPDK_BDEV_IO_TYPE_WRITE:
	case SPDK_BDEV_IO_TYPE_UNMAP:
	case SPDK_BDEV_IO_TYPE_FLUSH:
	case SPDK_BDEV_IO_TYPE_RESET:
	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
#ifdef LIBRBD_SUPPORTS_COMPARE_AND_WRITE_IOVEC
	case SPDK_BDEV_IO_TYPE_COMPARE_AND_WRITE:
#endif
		return true;

	default:
		return false;
	}
}

static int
bdev_rbd_create_cb(void *io_device, void *ctx_buf)
{
	struct bdev_rbd_io_channel *ch = ctx_buf;
	struct bdev_rbd *disk = io_device;

	ch->disk = disk;
	ch->group_ch = spdk_get_io_channel(&rbd_if);
	assert(ch->group_ch != NULL);

	return 0;
}

static void
bdev_rbd_destroy_cb(void *io_device, void *ctx_buf)
{
	struct bdev_rbd_io_channel *ch = ctx_buf;

	spdk_put_io_channel(ch->group_ch);
}

static struct spdk_io_channel *
bdev_rbd_get_io_channel(void *ctx)
{
	struct bdev_rbd *rbd_bdev = ctx;

	return spdk_get_io_channel(rbd_bdev);
}

static void
bdev_rbd_cluster_dump_entry(const char *cluster_name, struct spdk_json_write_ctx *w)
{
	struct bdev_rbd_cluster *entry;

	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);
	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
		if (strcmp(cluster_name, entry->name)) {
			continue;
		}
		if (entry->user_id) {
			spdk_json_write_named_string(w, "user_id", entry->user_id);
		}

		if (entry->config_param) {
			char **config_entry = entry->config_param;

			spdk_json_write_named_object_begin(w, "config_param");
			while (*config_entry) {
				spdk_json_write_named_string(w, config_entry[0], config_entry[1]);
				config_entry += 2;
			}
			spdk_json_write_object_end(w);
		}
		if (entry->config_file) {
			spdk_json_write_named_string(w, "config_file", entry->config_file);
		}
		if (entry->key_file) {
			spdk_json_write_named_string(w, "key_file", entry->key_file);
		}

		pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
		return;
	}

	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
}

static int
bdev_rbd_dump_info_json(void *ctx, struct spdk_json_write_ctx *w)
{
	struct bdev_rbd *rbd_bdev = ctx;

	spdk_json_write_named_object_begin(w, "rbd");

	spdk_json_write_named_string(w, "pool_name", rbd_bdev->pool_name);

	spdk_json_write_named_string(w, "rbd_name", rbd_bdev->rbd_name);

	if (rbd_bdev->cluster_name) {
		bdev_rbd_cluster_dump_entry(rbd_bdev->cluster_name, w);
		goto end;
	}

	if (rbd_bdev->user_id) {
		spdk_json_write_named_string(w, "user_id", rbd_bdev->user_id);
	}

	if (rbd_bdev->config) {
		char **entry = rbd_bdev->config;

		spdk_json_write_named_object_begin(w, "config");
		while (*entry) {
			spdk_json_write_named_string(w, entry[0], entry[1]);
			entry += 2;
		}
		spdk_json_write_object_end(w);
	}

end:
	spdk_json_write_object_end(w);

	return 0;
}

static void
bdev_rbd_write_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
{
	struct bdev_rbd *rbd = bdev->ctxt;
	char uuid_str[SPDK_UUID_STRING_LEN];

	spdk_json_write_object_begin(w);

	spdk_json_write_named_string(w, "method", "bdev_rbd_create");

	spdk_json_write_named_object_begin(w, "params");
	spdk_json_write_named_string(w, "name", bdev->name);
	spdk_json_write_named_string(w, "pool_name", rbd->pool_name);
	spdk_json_write_named_string(w, "rbd_name", rbd->rbd_name);
	spdk_json_write_named_uint32(w, "block_size", bdev->blocklen);
	if (rbd->user_id) {
		spdk_json_write_named_string(w, "user_id", rbd->user_id);
	}

	if (rbd->config) {
		char **entry = rbd->config;

		spdk_json_write_named_object_begin(w, "config");
		while (*entry) {
			spdk_json_write_named_string(w, entry[0], entry[1]);
			entry += 2;
		}
		spdk_json_write_object_end(w);
	}

	spdk_uuid_fmt_lower(uuid_str, sizeof(uuid_str), &bdev->uuid);
	spdk_json_write_named_string(w, "uuid", uuid_str);

	spdk_json_write_object_end(w);

	spdk_json_write_object_end(w);
}

static void
dump_single_cluster_entry(struct bdev_rbd_cluster *entry, struct spdk_json_write_ctx *w)
{
	assert(entry != NULL);

	spdk_json_write_object_begin(w);
	spdk_json_write_named_string(w, "cluster_name", entry->name);

	if (entry->user_id) {
		spdk_json_write_named_string(w, "user_id", entry->user_id);
	}

	if (entry->config_param) {
		char **config_entry = entry->config_param;

		spdk_json_write_named_object_begin(w, "config_param");
		while (*config_entry) {
			spdk_json_write_named_string(w, config_entry[0], config_entry[1]);
			config_entry += 2;
		}
		spdk_json_write_object_end(w);
	}
	if (entry->config_file) {
		spdk_json_write_named_string(w, "config_file", entry->config_file);
	}
	if (entry->key_file) {
		spdk_json_write_named_string(w, "key_file", entry->key_file);
	}

	if (entry->core_mask) {
		spdk_json_write_named_string(w, "core_mask", entry->core_mask);
	}

	spdk_json_write_object_end(w);
}

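/* Dump one registered cluster by name, or all of them when name is NULL. */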
int
bdev_rbd_get_clusters_info(struct spdk_jsonrpc_request *request, const char *name)
{
	struct bdev_rbd_cluster *entry;
	struct spdk_json_write_ctx *w;

	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);

	if (STAILQ_EMPTY(&g_map_bdev_rbd_cluster)) {
		pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
		return -ENOENT;
	}

	/* If cluster name is provided */
	if (name) {
		STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
			if (strcmp(name, entry->name) == 0) {
				w = spdk_jsonrpc_begin_result(request);
				dump_single_cluster_entry(entry, w);
				spdk_jsonrpc_end_result(request, w);

				pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
				return 0;
			}
		}

		pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
		return -ENOENT;
	}

	w = spdk_jsonrpc_begin_result(request);
	spdk_json_write_array_begin(w);
	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
		dump_single_cluster_entry(entry, w);
	}
	spdk_json_write_array_end(w);
	spdk_jsonrpc_end_result(request, w);
	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);

	return 0;
}

static const struct spdk_bdev_fn_table rbd_fn_table = {
	.destruct		= bdev_rbd_destruct,
	.submit_request		= bdev_rbd_submit_request,
	.io_type_supported	= bdev_rbd_io_type_supported,
	.get_io_channel		= bdev_rbd_get_io_channel,
	.dump_info_json		= bdev_rbd_dump_info_json,
	.write_config_json	= bdev_rbd_write_config_json,
};

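/*
 * Pin the calling (non-SPDK) thread to the cores in the given cpuset.  This
 * is called from the spdk_call_unaffinitized context of
 * bdev_rbd_register_cluster, so threads subsequently spawned by librados
 * inherit the mask and the caller's original affinity is restored once that
 * context ends.
 */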
static int
rbd_thread_set_cpumask(struct spdk_cpuset *set)
{
#ifdef __linux__
	uint32_t lcore;
	cpu_set_t mask;

	assert(set != NULL);
	CPU_ZERO(&mask);

	/* Translate the cores set in the spdk_cpuset into the cpu_set_t mask */
	for (lcore = 0; lcore < SPDK_CPUSET_SIZE; lcore++) {
		if (spdk_cpuset_get_cpu(set, lcore)) {
			CPU_SET(lcore, &mask);
		}
	}

	/* Change the current thread's core affinity */
	if (sched_setaffinity(0, sizeof(mask), &mask) < 0) {
		SPDK_ERRLOG("Failed to set the cpu mask of the non-SPDK thread (errno=%d)\n", errno);
		return -1;
	}

	return 0;
#else
	SPDK_ERRLOG("Setting the cpumask of non-SPDK threads is only supported on Linux.\n");
	return -ENOTSUP;
#endif
}

static int
rbd_register_cluster(const char *name, const char *user_id, const char *const *config_param,
		     const char *config_file, const char *key_file, const char *core_mask)
{
	struct bdev_rbd_cluster *entry;
	struct spdk_cpuset rbd_core_mask = {};
	int rc;

	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);
	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
		if (strcmp(name, entry->name) == 0) {
			SPDK_ERRLOG("Cluster name=%s already exists\n", name);
			pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
			return -1;
		}
	}

	entry = calloc(1, sizeof(*entry));
	if (!entry) {
		SPDK_ERRLOG("Cannot allocate an entry for name=%s\n", name);
		pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
		return -1;
	}

	entry->name = strdup(name);
	if (entry->name == NULL) {
		SPDK_ERRLOG("Failed to save the name=%s on entry=%p\n", name, entry);
		goto err_handle;
	}

	if (user_id) {
		entry->user_id = strdup(user_id);
		if (entry->user_id == NULL) {
			SPDK_ERRLOG("Failed to save the str=%s on entry=%p\n", user_id, entry);
			goto err_handle;
		}
	}

	/* config_param and config_file may be specified separately, or both together. */
	if (config_param) {
		entry->config_param = bdev_rbd_dup_config(config_param);
		if (entry->config_param == NULL) {
			SPDK_ERRLOG("Failed to save the config_param=%p on entry=%p\n", config_param, entry);
			goto err_handle;
		}
	}

	if (config_file) {
		entry->config_file = strdup(config_file);
		if (entry->config_file == NULL) {
			SPDK_ERRLOG("Failed to save the config_file=%s on entry=%p\n", config_file, entry);
			goto err_handle;
		}
	}

	if (key_file) {
		entry->key_file = strdup(key_file);
		if (entry->key_file == NULL) {
			SPDK_ERRLOG("Failed to save the key_file=%s on entry=%p\n", key_file, entry);
			goto err_handle;
		}
	}

	if (core_mask) {
		entry->core_mask = strdup(core_mask);
		if (entry->core_mask == NULL) {
			SPDK_ERRLOG("Core_mask=%s allocation failed on entry=%p\n", core_mask, entry);
			goto err_handle;
		}

		if (spdk_cpuset_parse(&rbd_core_mask, entry->core_mask) < 0) {
			SPDK_ERRLOG("Invalid cpumask=%s on entry=%p\n", entry->core_mask, entry);
			goto err_handle;
		}

		if (rbd_thread_set_cpumask(&rbd_core_mask) < 0) {
			SPDK_ERRLOG("Failed to change rbd threads to core_mask %s on entry=%p\n", core_mask, entry);
			goto err_handle;
		}
	}

	/* If an rbd thread core mask is given, rados_create() must execute with
	 * the affinity set by rbd_thread_set_cpumask().  That affinity is
	 * reverted once rbd_register_cluster() returns and we leave the
	 * spdk_call_unaffinitized context. */
	rc = rados_create(&entry->cluster, user_id);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to create rados_t struct\n");
		goto err_handle;
	}

	/* Try the default location when entry->config_file is NULL, but ignore failure in that case */
	rc = rados_conf_read_file(entry->cluster, entry->config_file);
	if (entry->config_file && rc < 0) {
		SPDK_ERRLOG("Failed to read conf file %s\n", entry->config_file);
		rados_shutdown(entry->cluster);
		goto err_handle;
	}

	if (config_param) {
		const char *const *config_entry = config_param;
		while (*config_entry) {
			rc = rados_conf_set(entry->cluster, config_entry[0], config_entry[1]);
			if (rc < 0) {
				SPDK_ERRLOG("Failed to set %s = %s\n", config_entry[0], config_entry[1]);
				rados_shutdown(entry->cluster);
				goto err_handle;
			}
			config_entry += 2;
		}
	}

	if (key_file) {
		rc = rados_conf_set(entry->cluster, "keyring", key_file);
		if (rc < 0) {
			SPDK_ERRLOG("Failed to set keyring = %s\n", key_file);
			rados_shutdown(entry->cluster);
			goto err_handle;
		}
	}

	rc = rados_connect(entry->cluster);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to connect to the rados cluster=%p\n", entry->cluster);
		rados_shutdown(entry->cluster);
		goto err_handle;
	}

	STAILQ_INSERT_TAIL(&g_map_bdev_rbd_cluster, entry, link);
	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);

	return 0;

err_handle:
	bdev_rbd_cluster_free(entry);
	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
	return -1;
}

int
bdev_rbd_unregister_cluster(const char *name)
{
	struct bdev_rbd_cluster *entry;
	int rc = 0;

	if (name == NULL) {
		return -1;
	}

	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);
	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
		if (strcmp(name, entry->name) == 0) {
			if (entry->ref == 0) {
				STAILQ_REMOVE(&g_map_bdev_rbd_cluster, entry, bdev_rbd_cluster, link);
				rados_shutdown(entry->cluster);
				bdev_rbd_cluster_free(entry);
			} else {
				SPDK_ERRLOG("Cluster with name=%s is still in use and cannot be deleted\n",
					    entry->name);
				rc = -1;
			}

			pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
			return rc;
		}
	}

	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);

	SPDK_ERRLOG("Could not find the cluster name=%s\n", name);

	return -1;
}

static void *
_bdev_rbd_register_cluster(void *arg)
{
	struct cluster_register_info *info = arg;
	void *ret = arg;
	int rc;

	rc = rbd_register_cluster((const char *)info->name, (const char *)info->user_id,
				  (const char *const *)info->config_param, (const char *)info->config_file,
				  (const char *)info->key_file, info->core_mask);
	if (rc) {
		ret = NULL;
	}

	return ret;
}

int
bdev_rbd_register_cluster(struct cluster_register_info *info)
{
	assert(info != NULL);

	/* The rados cluster info needs to be created on a non-SPDK thread to
	 * avoid CPU resource contention */
	if (spdk_call_unaffinitized(_bdev_rbd_register_cluster, info) == NULL) {
		return -1;
	}

	return 0;
}

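/*
 * Create and register an rbd bdev.  pool_name and rbd_name are required.
 * When cluster_name is given, the bdev attaches to a previously registered
 * shared cluster; otherwise a private rados connection is created from
 * user_id/config.  block_size is the logical block size in bytes, and the
 * image size reported by rbd_stat() determines the block count.
 */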
int
bdev_rbd_create(struct spdk_bdev **bdev, const char *name, const char *user_id,
		const char *pool_name,
		const char *const *config,
		const char *rbd_name,
		uint32_t block_size,
		const char *cluster_name,
		const struct spdk_uuid *uuid)
{
	struct bdev_rbd *rbd;
	int ret;

	if ((pool_name == NULL) || (rbd_name == NULL)) {
		return -EINVAL;
	}

	rbd = calloc(1, sizeof(struct bdev_rbd));
	if (rbd == NULL) {
		SPDK_ERRLOG("Failed to allocate bdev_rbd struct\n");
		return -ENOMEM;
	}

	rbd->rbd_name = strdup(rbd_name);
	if (!rbd->rbd_name) {
		bdev_rbd_free(rbd);
		return -ENOMEM;
	}

	if (user_id) {
		rbd->user_id = strdup(user_id);
		if (!rbd->user_id) {
			bdev_rbd_free(rbd);
			return -ENOMEM;
		}
	}

	if (cluster_name) {
		rbd->cluster_name = strdup(cluster_name);
		if (!rbd->cluster_name) {
			bdev_rbd_free(rbd);
			return -ENOMEM;
		}
	}
	rbd->pool_name = strdup(pool_name);
	if (!rbd->pool_name) {
		bdev_rbd_free(rbd);
		return -ENOMEM;
	}

	if (config && !(rbd->config = bdev_rbd_dup_config(config))) {
		bdev_rbd_free(rbd);
		return -ENOMEM;
	}

	ret = bdev_rbd_init(rbd);
	if (ret < 0) {
		bdev_rbd_free(rbd);
		SPDK_ERRLOG("Failed to init rbd device\n");
		return ret;
	}

	if (uuid) {
		rbd->disk.uuid = *uuid;
	}

	if (name) {
		rbd->disk.name = strdup(name);
	} else {
		rbd->disk.name = spdk_sprintf_alloc("Ceph%d", bdev_rbd_count);
	}
	if (!rbd->disk.name) {
		bdev_rbd_free(rbd);
		return -ENOMEM;
	}
	rbd->disk.product_name = "Ceph Rbd Disk";
	bdev_rbd_count++;

	rbd->disk.write_cache = 0;
	rbd->disk.blocklen = block_size;
	rbd->disk.blockcnt = rbd->info.size / rbd->disk.blocklen;
	rbd->disk.ctxt = rbd;
	rbd->disk.fn_table = &rbd_fn_table;
	rbd->disk.module = &rbd_if;

	SPDK_NOTICELOG("Added rbd disk %s\n", rbd->disk.name);

	spdk_io_device_register(rbd, bdev_rbd_create_cb,
				bdev_rbd_destroy_cb,
				sizeof(struct bdev_rbd_io_channel),
				rbd_name);
	ret = spdk_bdev_register(&rbd->disk);
	if (ret) {
		spdk_io_device_unregister(rbd, NULL);
		bdev_rbd_free(rbd);
		return ret;
	}

	*bdev = &(rbd->disk);

	return ret;
}

void
bdev_rbd_delete(const char *name, spdk_delete_rbd_complete cb_fn, void *cb_arg)
{
	int rc;

	rc = spdk_bdev_unregister_by_name(name, &rbd_if, cb_fn, cb_arg);
	if (rc != 0) {
		cb_fn(cb_arg, rc);
	}
}

static void
dummy_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev, void *ctx)
{
}

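/*
 * Grow an rbd bdev to new_size_in_mb.  The new size is given in MiB and must
 * not be smaller than the current size; the resize is issued through librbd
 * and the bdev layer is then notified of the new block count.
 */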
int
bdev_rbd_resize(const char *name, const uint64_t new_size_in_mb)
{
	struct spdk_bdev_desc *desc;
	struct spdk_bdev *bdev;
	struct spdk_io_channel *ch;
	struct bdev_rbd_io_channel *rbd_io_ch;
	int rc = 0;
	uint64_t new_size_in_byte;
	uint64_t current_size_in_mb;

	rc = spdk_bdev_open_ext(name, false, dummy_bdev_event_cb, NULL, &desc);
	if (rc != 0) {
		return rc;
	}

	bdev = spdk_bdev_desc_get_bdev(desc);

	if (bdev->module != &rbd_if) {
		rc = -EINVAL;
		goto exit;
	}

	current_size_in_mb = bdev->blocklen * bdev->blockcnt / (1024 * 1024);
	if (current_size_in_mb > new_size_in_mb) {
		SPDK_ERRLOG("The new bdev size must be larger than the current bdev size.\n");
		rc = -EINVAL;
		goto exit;
	}

	ch = bdev_rbd_get_io_channel(bdev);
	rbd_io_ch = spdk_io_channel_get_ctx(ch);
	new_size_in_byte = new_size_in_mb * 1024 * 1024;

	rc = rbd_resize(rbd_io_ch->disk->image, new_size_in_byte);
	spdk_put_io_channel(ch);
	if (rc != 0) {
		SPDK_ERRLOG("Failed to resize the ceph bdev.\n");
		goto exit;
	}

	rc = spdk_bdev_notify_blockcnt_change(bdev, new_size_in_byte / bdev->blocklen);
	if (rc != 0) {
		SPDK_ERRLOG("Failed to notify the block cnt change.\n");
	}

exit:
	spdk_bdev_close(desc);
	return rc;
}

static int
bdev_rbd_group_create_cb(void *io_device, void *ctx_buf)
{
	return 0;
}

static void
bdev_rbd_group_destroy_cb(void *io_device, void *ctx_buf)
{
}

static int
bdev_rbd_library_init(void)
{
	spdk_io_device_register(&rbd_if, bdev_rbd_group_create_cb, bdev_rbd_group_destroy_cb,
				0, "bdev_rbd_poll_groups");
	return 0;
}

static void
bdev_rbd_library_fini(void)
{
	spdk_io_device_unregister(&rbd_if, NULL);
}

SPDK_LOG_REGISTER_COMPONENT(bdev_rbd)