/*   SPDX-License-Identifier: BSD-3-Clause
 *   Copyright (C) 2017 Intel Corporation.
 *   All rights reserved.
 */

#include "spdk/stdinc.h"

#include "bdev_rbd.h"

#include <rbd/librbd.h>
#include <rados/librados.h>

#include "spdk/env.h"
#include "spdk/bdev.h"
#include "spdk/thread.h"
#include "spdk/json.h"
#include "spdk/string.h"
#include "spdk/util.h"
#include "spdk/likely.h"

#include "spdk/bdev_module.h"
#include "spdk/log.h"

static int bdev_rbd_count = 0;

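/* A rados pool io context, shared by all rbd bdevs created against the same
 * registered cluster and pool name.  Entries are ref-counted and only
 * accessed on the SPDK app thread.
 */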
struct bdev_rbd_pool_ctx {
	rados_t *cluster_p;
	char *name;
	rados_ioctx_t io_ctx;
	uint32_t ref;
	STAILQ_ENTRY(bdev_rbd_pool_ctx) link;
};

static STAILQ_HEAD(, bdev_rbd_pool_ctx) g_map_bdev_rbd_pool_ctx = STAILQ_HEAD_INITIALIZER(
			g_map_bdev_rbd_pool_ctx);

struct bdev_rbd {
	struct spdk_bdev disk;
	char *rbd_name;
	char *user_id;
	char *pool_name;
	char **config;

	rados_t cluster;
	rados_t *cluster_p;
	char *cluster_name;

	union rbd_ctx {
		rados_ioctx_t io_ctx;
		struct bdev_rbd_pool_ctx *ctx;
	} rados_ctx;

	rbd_image_t image;

	rbd_image_info_t info;
	struct spdk_thread *destruct_td;

	TAILQ_ENTRY(bdev_rbd) tailq;
	struct spdk_poller *reset_timer;
	struct spdk_bdev_io *reset_bdev_io;
};

struct bdev_rbd_io_channel {
	struct bdev_rbd *disk;
	struct spdk_io_channel *group_ch;
};

struct bdev_rbd_io {
	struct			spdk_thread *submit_td;
	enum			spdk_bdev_io_status status;
	rbd_completion_t	comp;
	size_t			total_len;
};

struct bdev_rbd_cluster {
	char *name;
	char *user_id;
	char **config_param;
	char *config_file;
	char *key_file;
	char *core_mask;
	rados_t cluster;
	uint32_t ref;
	STAILQ_ENTRY(bdev_rbd_cluster) link;
};

static STAILQ_HEAD(, bdev_rbd_cluster) g_map_bdev_rbd_cluster = STAILQ_HEAD_INITIALIZER(
			g_map_bdev_rbd_cluster);
static pthread_mutex_t g_map_bdev_rbd_cluster_mutex = PTHREAD_MUTEX_INITIALIZER;

static void
bdev_rbd_cluster_free(struct bdev_rbd_cluster *entry)
{
	assert(entry != NULL);

	bdev_rbd_free_config(entry->config_param);
	free(entry->config_file);
	free(entry->key_file);
	free(entry->user_id);
	free(entry->name);
	free(entry->core_mask);
	free(entry);
}

static void
bdev_rbd_put_cluster(rados_t **cluster)
{
	struct bdev_rbd_cluster *entry;

	assert(cluster != NULL);

	/* No need to go through the map if *cluster is NULL */
	if (*cluster == NULL) {
		return;
	}

	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);
	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
		if (*cluster != &entry->cluster) {
			continue;
		}

		assert(entry->ref > 0);
		entry->ref--;
		*cluster = NULL;
		pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
		return;
	}

	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
	SPDK_ERRLOG("Cannot find the entry for cluster=%p\n", cluster);
}

static void
bdev_rbd_put_pool_ctx(struct bdev_rbd_pool_ctx *entry)
{
	assert(spdk_get_thread() == spdk_thread_get_app_thread());

	assert(entry != NULL);
	assert(entry->ref > 0);
	entry->ref--;
	if (entry->ref == 0) {
		STAILQ_REMOVE(&g_map_bdev_rbd_pool_ctx, entry, bdev_rbd_pool_ctx, link);
		rados_ioctx_destroy(entry->io_ctx);
		free(entry->name);
		free(entry);
	}
}

static void
bdev_rbd_free(struct bdev_rbd *rbd)
{
	if (!rbd) {
		return;
	}

	if (rbd->image) {
		rbd_flush(rbd->image);
		rbd_close(rbd->image);
	}

	free(rbd->disk.name);
	free(rbd->rbd_name);
	free(rbd->user_id);
	free(rbd->pool_name);
	bdev_rbd_free_config(rbd->config);

	if (rbd->cluster_name) {
		/* When the rbd is destructed by bdev_rbd_destruct, this branch is not
		 * taken because the ctx has already been freed asynchronously by
		 * bdev_rbd_free_cb.  It is only reached when rbd initialization fails. */
		if (rbd->rados_ctx.ctx) {
			bdev_rbd_put_pool_ctx(rbd->rados_ctx.ctx);
			rbd->rados_ctx.ctx = NULL;
		}

		bdev_rbd_put_cluster(&rbd->cluster_p);
		free(rbd->cluster_name);
	} else if (rbd->cluster) {
		if (rbd->rados_ctx.io_ctx) {
			rados_ioctx_destroy(rbd->rados_ctx.io_ctx);
		}
		rados_shutdown(rbd->cluster);
	}

	free(rbd);
}

void
bdev_rbd_free_config(char **config)
{
	char **entry;

	if (config) {
		for (entry = config; *entry; entry++) {
			free(*entry);
		}
		free(config);
	}
}

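/* Deep-copy a NULL-terminated array of strings.  Returns NULL if config is
 * NULL or if any allocation fails.
 */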
char **
bdev_rbd_dup_config(const char *const *config)
{
	size_t count;
	char **copy;

	if (!config) {
		return NULL;
	}
	for (count = 0; config[count]; count++) {}
	copy = calloc(count + 1, sizeof(*copy));
	if (!copy) {
		return NULL;
	}
	for (count = 0; config[count]; count++) {
		if (!(copy[count] = strdup(config[count]))) {
			bdev_rbd_free_config(copy);
			return NULL;
		}
	}
	return copy;
}

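/* Create and connect a rados_t for the given user.  The config, if given, is a
 * flat, NULL-terminated array of key/value string pairs passed to
 * rados_conf_set(); otherwise the default ceph config file locations are read.
 */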
static int
bdev_rados_cluster_init(const char *user_id, const char *const *config,
			rados_t *cluster)
{
	int ret;

	ret = rados_create(cluster, user_id);
	if (ret < 0) {
		SPDK_ERRLOG("Failed to create rados_t struct\n");
		return -1;
	}

	if (config) {
		const char *const *entry = config;
		while (*entry) {
			ret = rados_conf_set(*cluster, entry[0], entry[1]);
			if (ret < 0) {
				SPDK_ERRLOG("Failed to set %s = %s\n", entry[0], entry[1]);
				rados_shutdown(*cluster);
				*cluster = NULL;
				return -1;
			}
			entry += 2;
		}
	} else {
		ret = rados_conf_read_file(*cluster, NULL);
		if (ret < 0) {
			SPDK_ERRLOG("Failed to read conf file\n");
			rados_shutdown(*cluster);
			*cluster = NULL;
			return -1;
		}
	}

	ret = rados_connect(*cluster);
	if (ret < 0) {
		SPDK_ERRLOG("Failed to connect to the rados cluster\n");
		rados_shutdown(*cluster);
		*cluster = NULL;
		return -1;
	}

	return 0;
}

static int
bdev_rbd_get_cluster(const char *cluster_name, rados_t **cluster)
{
	struct bdev_rbd_cluster *entry;

	if (cluster == NULL) {
		SPDK_ERRLOG("cluster should not be NULL\n");
		return -1;
	}

	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);
	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
		if (strcmp(cluster_name, entry->name) == 0) {
			entry->ref++;
			*cluster = &entry->cluster;
			pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
			return 0;
		}
	}

	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
	return -1;
}

static int
bdev_rbd_shared_cluster_init(const char *cluster_name, rados_t **cluster)
{
	int ret;

	ret = bdev_rbd_get_cluster(cluster_name, cluster);
	if (ret < 0) {
		SPDK_ERRLOG("Failed to get the registered cluster %s\n", cluster_name);
		return -1;
	}

	return ret;
}

static void *
bdev_rbd_cluster_handle(void *arg)
{
	void *ret = arg;
	struct bdev_rbd *rbd = arg;
	int rc;

	rc = bdev_rados_cluster_init(rbd->user_id, (const char *const *)rbd->config,
				     &rbd->cluster);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to create rados cluster for user_id=%s and rbd_pool=%s\n",
			    rbd->user_id ? rbd->user_id : "admin (the default)", rbd->pool_name);
		ret = NULL;
	}

	return ret;
}

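/* Look up the ref-counted pool context for (cluster, pool name), creating and
 * linking a new one if none exists yet.  Must be called on the app thread.
 */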
static int
bdev_rbd_get_pool_ctx(rados_t *cluster_p, const char *name, struct bdev_rbd_pool_ctx **ctx)
{
	struct bdev_rbd_pool_ctx *entry;

	assert(spdk_get_thread() == spdk_thread_get_app_thread());

	if (name == NULL || ctx == NULL) {
		return -1;
	}

	STAILQ_FOREACH(entry, &g_map_bdev_rbd_pool_ctx, link) {
		if (strcmp(name, entry->name) == 0 && cluster_p == entry->cluster_p) {
			entry->ref++;
			*ctx = entry;
			return 0;
		}
	}

	entry = calloc(1, sizeof(*entry));
	if (!entry) {
		SPDK_ERRLOG("Cannot allocate an entry for name=%s\n", name);
		return -1;
	}

	entry->name = strdup(name);
	if (entry->name == NULL) {
		SPDK_ERRLOG("Failed to allocate the name=%s on entry=%p\n", name, entry);
		goto err_handle;
	}

	if (rados_ioctx_create(*cluster_p, name, &entry->io_ctx) < 0) {
		goto err_handle1;
	}

	entry->cluster_p = cluster_p;
	entry->ref = 1;
	*ctx = entry;
	STAILQ_INSERT_TAIL(&g_map_bdev_rbd_pool_ctx, entry, link);

	return 0;

err_handle1:
	free(entry->name);
err_handle:
	free(entry);

	return -1;
}

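/* Runs via spdk_call_unaffinitized() so that threads spawned by
 * librados/librbd do not inherit the SPDK core affinity.  Creates (or looks
 * up) the pool io context, opens the rbd image and stats it to learn its
 * size.  Returns arg on success, NULL on failure.
 */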
static void *
bdev_rbd_init_context(void *arg)
{
	struct bdev_rbd *rbd = arg;
	int rc;
	rados_ioctx_t *io_ctx = NULL;

	if (rbd->cluster_name) {
		if (bdev_rbd_get_pool_ctx(rbd->cluster_p, rbd->pool_name, &rbd->rados_ctx.ctx) < 0) {
			SPDK_ERRLOG("Failed to create ioctx on rbd=%p with cluster_name=%s\n",
				    rbd, rbd->cluster_name);
			return NULL;
		}
		io_ctx = &rbd->rados_ctx.ctx->io_ctx;
	} else {
		if (rados_ioctx_create(*(rbd->cluster_p), rbd->pool_name, &rbd->rados_ctx.io_ctx) < 0) {
			SPDK_ERRLOG("Failed to create ioctx on rbd=%p\n", rbd);
			return NULL;
		}
		io_ctx = &rbd->rados_ctx.io_ctx;
	}

	assert(io_ctx != NULL);
	rc = rbd_open(*io_ctx, rbd->rbd_name, &rbd->image, NULL);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to open specified rbd device\n");
		return NULL;
	}

	rc = rbd_stat(rbd->image, &rbd->info, sizeof(rbd->info));
	if (rc < 0) {
		SPDK_ERRLOG("Failed to stat specified rbd device\n");
		return NULL;
	}

	return arg;
}

static int
bdev_rbd_init(struct bdev_rbd *rbd)
{
	int ret = 0;

	if (!rbd->cluster_name) {
		rbd->cluster_p = &rbd->cluster;
		/* Create the cluster while CPU affinity is unset, so that threads
		 * spawned by rados do not get pinned to the SPDK cores */
		if (spdk_call_unaffinitized(bdev_rbd_cluster_handle, rbd) == NULL) {
			SPDK_ERRLOG("Cannot create the rados object on rbd=%p\n", rbd);
			return -1;
		}
	} else {
		ret = bdev_rbd_shared_cluster_init(rbd->cluster_name, &rbd->cluster_p);
		if (ret < 0) {
			SPDK_ERRLOG("Failed to create rados object for rbd=%p on cluster_name=%s\n",
				    rbd, rbd->cluster_name);
			return -1;
		}
	}

	if (spdk_call_unaffinitized(bdev_rbd_init_context, rbd) == NULL) {
		SPDK_ERRLOG("Cannot init rbd context for rbd=%p\n", rbd);
		return -1;
	}

	return ret;
}

static void
_bdev_rbd_io_complete(void *_rbd_io)
{
	struct bdev_rbd_io *rbd_io = _rbd_io;

	spdk_bdev_io_complete(spdk_bdev_io_from_ctx(rbd_io), rbd_io->status);
}

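/* Complete a bdev_io on the thread that submitted it, sending a thread
 * message if the completion is happening on a different SPDK thread.
 */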
static void
bdev_rbd_io_complete(struct spdk_bdev_io *bdev_io, enum spdk_bdev_io_status status)
{
	struct bdev_rbd_io *rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx;
	struct spdk_thread *current_thread = spdk_get_thread();

	rbd_io->status = status;
	assert(rbd_io->submit_td != NULL);
	if (rbd_io->submit_td != current_thread) {
		spdk_thread_send_msg(rbd_io->submit_td, _bdev_rbd_io_complete, rbd_io);
	} else {
		_bdev_rbd_io_complete(rbd_io);
	}
}

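/* librbd completion callback.  Translate the aio return value into a bdev io
 * status: reads must return exactly the requested length, compare-and-write
 * maps -EILSEQ to a miscompare, and any other non-zero value is a failure.
 */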
static void
bdev_rbd_finish_aiocb(rbd_completion_t cb, void *arg)
{
	int io_status;
	struct spdk_bdev_io *bdev_io;
	struct bdev_rbd_io *rbd_io;
	enum spdk_bdev_io_status bio_status;

	bdev_io = rbd_aio_get_arg(cb);
	rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx;
	io_status = rbd_aio_get_return_value(cb);
	bio_status = SPDK_BDEV_IO_STATUS_SUCCESS;

	if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) {
		if ((int)rbd_io->total_len != io_status) {
			bio_status = SPDK_BDEV_IO_STATUS_FAILED;
		}
#ifdef LIBRBD_SUPPORTS_COMPARE_AND_WRITE_IOVEC
	} else if (bdev_io->type == SPDK_BDEV_IO_TYPE_COMPARE_AND_WRITE && io_status == -EILSEQ) {
		bio_status = SPDK_BDEV_IO_STATUS_MISCOMPARE;
#endif
	} else if (io_status != 0) { /* For others, 0 means success */
		bio_status = SPDK_BDEV_IO_STATUS_FAILED;
	}

	rbd_aio_release(cb);

	bdev_rbd_io_complete(bdev_io, bio_status);
}

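/* Map a bdev_io onto the matching librbd aio call.  Every path attaches
 * bdev_rbd_finish_aiocb as the completion; if the submission itself fails,
 * the completion is released and the io is failed immediately.
 */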
static void
_bdev_rbd_start_aio(struct bdev_rbd *disk, struct spdk_bdev_io *bdev_io,
		    struct iovec *iov, int iovcnt, uint64_t offset, size_t len)
{
	int ret;
	struct bdev_rbd_io *rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx;
	rbd_image_t image = disk->image;

	ret = rbd_aio_create_completion(bdev_io, bdev_rbd_finish_aiocb,
					&rbd_io->comp);
	if (ret < 0) {
		goto err;
	}

	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_READ:
		rbd_io->total_len = len;
		if (spdk_likely(iovcnt == 1)) {
			ret = rbd_aio_read(image, offset, iov[0].iov_len, iov[0].iov_base,
					   rbd_io->comp);
		} else {
			ret = rbd_aio_readv(image, iov, iovcnt, offset, rbd_io->comp);
		}
		break;
	case SPDK_BDEV_IO_TYPE_WRITE:
		if (spdk_likely(iovcnt == 1)) {
			ret = rbd_aio_write(image, offset, iov[0].iov_len, iov[0].iov_base,
					    rbd_io->comp);
		} else {
			ret = rbd_aio_writev(image, iov, iovcnt, offset, rbd_io->comp);
		}
		break;
	case SPDK_BDEV_IO_TYPE_UNMAP:
		ret = rbd_aio_discard(image, offset, len, rbd_io->comp);
		break;
	case SPDK_BDEV_IO_TYPE_FLUSH:
		ret = rbd_aio_flush(image, rbd_io->comp);
		break;
	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
		ret = rbd_aio_write_zeroes(image, offset, len, rbd_io->comp, /* zero_flags */ 0,
					   /* op_flags */ 0);
		break;
#ifdef LIBRBD_SUPPORTS_COMPARE_AND_WRITE_IOVEC
	case SPDK_BDEV_IO_TYPE_COMPARE_AND_WRITE:
		ret = rbd_aio_compare_and_writev(image, offset, iov /* cmp */, iovcnt,
						 bdev_io->u.bdev.fused_iovs /* write */,
						 bdev_io->u.bdev.fused_iovcnt,
						 rbd_io->comp, NULL,
						 /* op_flags */ 0);
		break;
#endif
	default:
		/* This should not happen.
		 * This function should only be called with the io types supported
		 * by bdev_rbd_submit_request.
		 */
		SPDK_ERRLOG("Unsupported IO type=%d\n", bdev_io->type);
		ret = -ENOTSUP;
		break;
	}

	if (ret < 0) {
		rbd_aio_release(rbd_io->comp);
		goto err;
	}

	return;

err:
	bdev_rbd_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
}

static void
bdev_rbd_start_aio(void *ctx)
{
	struct spdk_bdev_io *bdev_io = ctx;
	struct bdev_rbd *disk = (struct bdev_rbd *)bdev_io->bdev->ctxt;

	_bdev_rbd_start_aio(disk,
			    bdev_io,
			    bdev_io->u.bdev.iovs,
			    bdev_io->u.bdev.iovcnt,
			    bdev_io->u.bdev.offset_blocks * bdev_io->bdev->blocklen,
			    bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
}

static int bdev_rbd_library_init(void);
static void bdev_rbd_library_fini(void);

static int
bdev_rbd_get_ctx_size(void)
{
	return sizeof(struct bdev_rbd_io);
}

static struct spdk_bdev_module rbd_if = {
	.name = "rbd",
	.module_init = bdev_rbd_library_init,
	.module_fini = bdev_rbd_library_fini,
	.get_ctx_size = bdev_rbd_get_ctx_size,
};
SPDK_BDEV_MODULE_REGISTER(rbd, &rbd_if)

static int bdev_rbd_reset_timer(void *arg);

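/* Reset handling: librbd offers no way to cancel in-flight aio, so a reset
 * simply polls the bdev's queue depth once per millisecond and completes the
 * reset once no I/O remains outstanding (or fails it if the query errors out).
 */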
static void
bdev_rbd_check_outstanding_ios(struct spdk_bdev *bdev, uint64_t current_qd,
			       void *cb_arg, int rc)
{
	struct bdev_rbd *disk = cb_arg;
	enum spdk_bdev_io_status bio_status;

	if (rc == 0 && current_qd > 0) {
		disk->reset_timer = SPDK_POLLER_REGISTER(bdev_rbd_reset_timer, disk, 1000);
		return;
	}

	if (rc != 0) {
		bio_status = SPDK_BDEV_IO_STATUS_FAILED;
	} else {
		bio_status = SPDK_BDEV_IO_STATUS_SUCCESS;
	}

	bdev_rbd_io_complete(disk->reset_bdev_io, bio_status);
	disk->reset_bdev_io = NULL;
}

static int
bdev_rbd_reset_timer(void *arg)
{
	struct bdev_rbd *disk = arg;

	spdk_poller_unregister(&disk->reset_timer);

	spdk_bdev_get_current_qd(&disk->disk, bdev_rbd_check_outstanding_ios, disk);

	return SPDK_POLLER_BUSY;
}

static void
bdev_rbd_reset(void *ctx)
{
	struct spdk_bdev_io *bdev_io = ctx;
	struct bdev_rbd *disk = (struct bdev_rbd *)bdev_io->bdev->ctxt;

	/*
	 * HACK: Since librbd doesn't provide any way to cancel outstanding aio, just kick off a
	 * poller to wait for in-flight I/O to complete.
	 */
	assert(disk->reset_bdev_io == NULL);
	disk->reset_bdev_io = bdev_io;

	bdev_rbd_reset_timer(disk);
}

static void
_bdev_rbd_destruct_done(void *io_device)
{
	struct bdev_rbd *rbd = io_device;

	assert(rbd != NULL);

	spdk_bdev_destruct_done(&rbd->disk, 0);
	bdev_rbd_free(rbd);
}

static void
bdev_rbd_free_cb(void *io_device)
{
	struct bdev_rbd *rbd = io_device;

	assert(spdk_get_thread() == spdk_thread_get_app_thread());

	/* free the ctx */
	if (rbd->cluster_name && rbd->rados_ctx.ctx) {
		bdev_rbd_put_pool_ctx(rbd->rados_ctx.ctx);
		rbd->rados_ctx.ctx = NULL;
	}

	/* The io device has been unregistered.  Send a message back to the
	 * original thread that started the destruct operation, so that the
	 * bdev unregister callback is invoked on the same thread that started
	 * this whole process.
	 */
	spdk_thread_send_msg(rbd->destruct_td, _bdev_rbd_destruct_done, rbd);
}

static void
_bdev_rbd_destruct(void *ctx)
{
	struct bdev_rbd *rbd = ctx;

	spdk_io_device_unregister(rbd, bdev_rbd_free_cb);
}

static int
bdev_rbd_destruct(void *ctx)
{
	struct bdev_rbd *rbd = ctx;

	/* Start the destruct operation on the rbd bdev's
	 * main thread.  This guarantees it will only start
	 * executing after any messages related to channel
	 * deletions have finished completing.  *Always*
	 * send a message, even if this function gets called
	 * from the main thread, in case there are pending
	 * channel delete messages in flight to this thread.
	 */
	assert(rbd->destruct_td == NULL);
	rbd->destruct_td = spdk_get_thread();
	spdk_thread_send_msg(spdk_thread_get_app_thread(), _bdev_rbd_destruct, rbd);

	/* Return 1 to indicate the destruct path is asynchronous. */
	return 1;
}

static void
bdev_rbd_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io,
		    bool success)
{
	if (!success) {
		bdev_rbd_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
		return;
	}

	bdev_rbd_start_aio(bdev_io);
}

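/* Entry point for all I/O.  Reads first acquire a data buffer via
 * spdk_bdev_io_get_buf(), resets are forwarded to the app thread, and all
 * other supported types are submitted to librbd directly.
 */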
static void
bdev_rbd_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
{
	struct spdk_thread *submit_td = spdk_io_channel_get_thread(ch);
	struct bdev_rbd_io *rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx;

	rbd_io->submit_td = submit_td;
	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_READ:
		spdk_bdev_io_get_buf(bdev_io, bdev_rbd_get_buf_cb,
				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
		break;

	case SPDK_BDEV_IO_TYPE_WRITE:
	case SPDK_BDEV_IO_TYPE_UNMAP:
	case SPDK_BDEV_IO_TYPE_FLUSH:
	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
#ifdef LIBRBD_SUPPORTS_COMPARE_AND_WRITE_IOVEC
	case SPDK_BDEV_IO_TYPE_COMPARE_AND_WRITE:
#endif
		bdev_rbd_start_aio(bdev_io);
		break;

	case SPDK_BDEV_IO_TYPE_RESET:
		spdk_thread_exec_msg(spdk_thread_get_app_thread(), bdev_rbd_reset, bdev_io);
		break;

	default:
		SPDK_ERRLOG("Unsupported IO type=%d\n", bdev_io->type);
		bdev_rbd_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
		break;
	}
}

static bool
bdev_rbd_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type)
{
	switch (io_type) {
	case SPDK_BDEV_IO_TYPE_READ:
	case SPDK_BDEV_IO_TYPE_WRITE:
	case SPDK_BDEV_IO_TYPE_UNMAP:
	case SPDK_BDEV_IO_TYPE_FLUSH:
	case SPDK_BDEV_IO_TYPE_RESET:
	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
#ifdef LIBRBD_SUPPORTS_COMPARE_AND_WRITE_IOVEC
	case SPDK_BDEV_IO_TYPE_COMPARE_AND_WRITE:
#endif
		return true;

	default:
		return false;
	}
}

static int
bdev_rbd_create_cb(void *io_device, void *ctx_buf)
{
	struct bdev_rbd_io_channel *ch = ctx_buf;
	struct bdev_rbd *disk = io_device;

	ch->disk = disk;
	ch->group_ch = spdk_get_io_channel(&rbd_if);
	assert(ch->group_ch != NULL);

	return 0;
}

static void
bdev_rbd_destroy_cb(void *io_device, void *ctx_buf)
{
	struct bdev_rbd_io_channel *ch = ctx_buf;

	spdk_put_io_channel(ch->group_ch);
}

static struct spdk_io_channel *
bdev_rbd_get_io_channel(void *ctx)
{
	struct bdev_rbd *rbd_bdev = ctx;

	return spdk_get_io_channel(rbd_bdev);
}

static void
bdev_rbd_cluster_dump_entry(const char *cluster_name, struct spdk_json_write_ctx *w)
{
	struct bdev_rbd_cluster *entry;

	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);
	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
		if (strcmp(cluster_name, entry->name)) {
			continue;
		}
		if (entry->user_id) {
			spdk_json_write_named_string(w, "user_id", entry->user_id);
		}

		if (entry->config_param) {
			char **config_entry = entry->config_param;

			spdk_json_write_named_object_begin(w, "config_param");
			while (*config_entry) {
				spdk_json_write_named_string(w, config_entry[0], config_entry[1]);
				config_entry += 2;
			}
			spdk_json_write_object_end(w);
		}
		if (entry->config_file) {
			spdk_json_write_named_string(w, "config_file", entry->config_file);
		}
		if (entry->key_file) {
			spdk_json_write_named_string(w, "key_file", entry->key_file);
		}

		pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
		return;
	}

	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
}

static int
bdev_rbd_dump_info_json(void *ctx, struct spdk_json_write_ctx *w)
{
	struct bdev_rbd *rbd_bdev = ctx;

	spdk_json_write_named_object_begin(w, "rbd");

	spdk_json_write_named_string(w, "pool_name", rbd_bdev->pool_name);

	spdk_json_write_named_string(w, "rbd_name", rbd_bdev->rbd_name);

	if (rbd_bdev->cluster_name) {
		bdev_rbd_cluster_dump_entry(rbd_bdev->cluster_name, w);
		goto end;
	}

	if (rbd_bdev->user_id) {
		spdk_json_write_named_string(w, "user_id", rbd_bdev->user_id);
	}

	if (rbd_bdev->config) {
		char **entry = rbd_bdev->config;

		spdk_json_write_named_object_begin(w, "config");
		while (*entry) {
			spdk_json_write_named_string(w, entry[0], entry[1]);
			entry += 2;
		}
		spdk_json_write_object_end(w);
	}

end:
	spdk_json_write_object_end(w);

	return 0;
}

static void
bdev_rbd_write_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
{
	struct bdev_rbd *rbd = bdev->ctxt;
	char uuid_str[SPDK_UUID_STRING_LEN];

	spdk_json_write_object_begin(w);

	spdk_json_write_named_string(w, "method", "bdev_rbd_create");

	spdk_json_write_named_object_begin(w, "params");
	spdk_json_write_named_string(w, "name", bdev->name);
	spdk_json_write_named_string(w, "pool_name", rbd->pool_name);
	spdk_json_write_named_string(w, "rbd_name", rbd->rbd_name);
	spdk_json_write_named_uint32(w, "block_size", bdev->blocklen);
	if (rbd->user_id) {
		spdk_json_write_named_string(w, "user_id", rbd->user_id);
	}

	if (rbd->config) {
		char **entry = rbd->config;

		spdk_json_write_named_object_begin(w, "config");
		while (*entry) {
			spdk_json_write_named_string(w, entry[0], entry[1]);
			entry += 2;
		}
		spdk_json_write_object_end(w);
	}

	spdk_uuid_fmt_lower(uuid_str, sizeof(uuid_str), &bdev->uuid);
	spdk_json_write_named_string(w, "uuid", uuid_str);

	spdk_json_write_object_end(w);

	spdk_json_write_object_end(w);
}

static void
dump_single_cluster_entry(struct bdev_rbd_cluster *entry, struct spdk_json_write_ctx *w)
{
	assert(entry != NULL);

	spdk_json_write_object_begin(w);
	spdk_json_write_named_string(w, "cluster_name", entry->name);

	if (entry->user_id) {
		spdk_json_write_named_string(w, "user_id", entry->user_id);
	}

	if (entry->config_param) {
		char **config_entry = entry->config_param;

		spdk_json_write_named_object_begin(w, "config_param");
		while (*config_entry) {
			spdk_json_write_named_string(w, config_entry[0], config_entry[1]);
			config_entry += 2;
		}
		spdk_json_write_object_end(w);
	}
	if (entry->config_file) {
		spdk_json_write_named_string(w, "config_file", entry->config_file);
	}
	if (entry->key_file) {
		spdk_json_write_named_string(w, "key_file", entry->key_file);
	}

	if (entry->core_mask) {
		spdk_json_write_named_string(w, "core_mask", entry->core_mask);
	}

	spdk_json_write_object_end(w);
}

int
bdev_rbd_get_clusters_info(struct spdk_jsonrpc_request *request, const char *name)
{
	struct bdev_rbd_cluster *entry;
	struct spdk_json_write_ctx *w;

	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);

	if (STAILQ_EMPTY(&g_map_bdev_rbd_cluster)) {
		pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
		return -ENOENT;
	}

	/* If cluster name is provided */
	if (name) {
		STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
			if (strcmp(name, entry->name) == 0) {
				w = spdk_jsonrpc_begin_result(request);
				dump_single_cluster_entry(entry, w);
				spdk_jsonrpc_end_result(request, w);

				pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
				return 0;
			}
		}

		pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
		return -ENOENT;
	}

	w = spdk_jsonrpc_begin_result(request);
	spdk_json_write_array_begin(w);
	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
		dump_single_cluster_entry(entry, w);
	}
	spdk_json_write_array_end(w);
	spdk_jsonrpc_end_result(request, w);
	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);

	return 0;
}

static const struct spdk_bdev_fn_table rbd_fn_table = {
	.destruct		= bdev_rbd_destruct,
	.submit_request		= bdev_rbd_submit_request,
	.io_type_supported	= bdev_rbd_io_type_supported,
	.get_io_channel		= bdev_rbd_get_io_channel,
	.dump_info_json		= bdev_rbd_dump_info_json,
	.write_config_json	= bdev_rbd_write_config_json,
};

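/* Apply an SPDK cpuset to the calling thread via sched_setaffinity(), so that
 * threads subsequently spawned by librados/librbd inherit that mask.  Linux
 * only.
 */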
static int
rbd_thread_set_cpumask(struct spdk_cpuset *set)
{
#ifdef __linux__
	uint32_t lcore;
	cpu_set_t mask;

	assert(set != NULL);
	CPU_ZERO(&mask);

	/* Copy each core set in the spdk_cpuset into the cpu_set_t */
	for (lcore = 0; lcore < SPDK_CPUSET_SIZE; lcore++) {
		if (spdk_cpuset_get_cpu(set, lcore)) {
			CPU_SET(lcore, &mask);
		}
	}

	/* Change the current thread's core mask */
	if (sched_setaffinity(0, sizeof(mask), &mask) < 0) {
		SPDK_ERRLOG("Failed to set the cpu mask of the non-SPDK thread (errno=%d)\n", errno);
		return -1;
	}

	return 0;
#else
	SPDK_ERRLOG("Setting the cpumask of a non-SPDK thread is only supported on Linux.\n");
	return -ENOTSUP;
#endif
}

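/* Create, configure and connect a named rados cluster and add it to the
 * global cluster map.  Called while unaffinitized; if core_mask is given, the
 * calling thread's affinity is changed first so that the threads librados
 * spawns during rados_create()/rados_connect() are pinned to that mask.
 */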
static int
rbd_register_cluster(const char *name, const char *user_id, const char *const *config_param,
		     const char *config_file, const char *key_file, const char *core_mask)
{
	struct bdev_rbd_cluster *entry;
	struct spdk_cpuset rbd_core_mask = {};
	int rc;

	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);
	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
		if (strcmp(name, entry->name) == 0) {
			SPDK_ERRLOG("Cluster name=%s already exists\n", name);
			pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
			return -1;
		}
	}

	entry = calloc(1, sizeof(*entry));
	if (!entry) {
		SPDK_ERRLOG("Cannot allocate an entry for name=%s\n", name);
		pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
		return -1;
	}

	entry->name = strdup(name);
	if (entry->name == NULL) {
		SPDK_ERRLOG("Failed to save the name=%s on entry=%p\n", name, entry);
		goto err_handle;
	}

	if (user_id) {
		entry->user_id = strdup(user_id);
		if (entry->user_id == NULL) {
			SPDK_ERRLOG("Failed to save the user_id=%s on entry=%p\n", user_id, entry);
			goto err_handle;
		}
	}

	/* config_param and config_file may be specified separately or together. */
	if (config_param) {
		entry->config_param = bdev_rbd_dup_config(config_param);
		if (entry->config_param == NULL) {
			SPDK_ERRLOG("Failed to save the config_param=%p on entry=%p\n", config_param, entry);
			goto err_handle;
		}
	}

	if (config_file) {
		entry->config_file = strdup(config_file);
		if (entry->config_file == NULL) {
			SPDK_ERRLOG("Failed to save the config_file=%s on entry=%p\n", config_file, entry);
			goto err_handle;
		}
	}

	if (key_file) {
		entry->key_file = strdup(key_file);
		if (entry->key_file == NULL) {
			SPDK_ERRLOG("Failed to save the key_file=%s on entry=%p\n", key_file, entry);
			goto err_handle;
		}
	}

	if (core_mask) {
		entry->core_mask = strdup(core_mask);
		if (entry->core_mask == NULL) {
			SPDK_ERRLOG("Failed to save the core_mask=%s on entry=%p\n", core_mask, entry);
			goto err_handle;
		}

		if (spdk_cpuset_parse(&rbd_core_mask, entry->core_mask) < 0) {
			SPDK_ERRLOG("Invalid cpumask=%s on entry=%p\n", entry->core_mask, entry);
			goto err_handle;
		}

		if (rbd_thread_set_cpumask(&rbd_core_mask) < 0) {
			SPDK_ERRLOG("Failed to change rbd threads to core_mask %s on entry=%p\n", core_mask, entry);
			goto err_handle;
		}
	}

	/* If an rbd thread core mask is given, rados_create() must execute with
	 * the affinity set by rbd_thread_set_cpumask().  That affinity is reverted
	 * once rbd_register_cluster() returns and we leave the
	 * spdk_call_unaffinitized() context. */
	rc = rados_create(&entry->cluster, user_id);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to create rados_t struct\n");
		goto err_handle;
	}

	/* If entry->config_file is NULL, try the default locations and ignore any failure */
	rc = rados_conf_read_file(entry->cluster, entry->config_file);
	if (entry->config_file && rc < 0) {
		SPDK_ERRLOG("Failed to read conf file %s\n", entry->config_file);
		rados_shutdown(entry->cluster);
		goto err_handle;
	}

	if (config_param) {
		const char *const *config_entry = config_param;
		while (*config_entry) {
			rc = rados_conf_set(entry->cluster, config_entry[0], config_entry[1]);
			if (rc < 0) {
				SPDK_ERRLOG("Failed to set %s = %s\n", config_entry[0], config_entry[1]);
				rados_shutdown(entry->cluster);
				goto err_handle;
			}
			config_entry += 2;
		}
	}

	if (key_file) {
		rc = rados_conf_set(entry->cluster, "keyring", key_file);
		if (rc < 0) {
			SPDK_ERRLOG("Failed to set keyring = %s\n", key_file);
			rados_shutdown(entry->cluster);
			goto err_handle;
		}
	}

	rc = rados_connect(entry->cluster);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to connect to rados cluster=%p\n", entry->cluster);
		rados_shutdown(entry->cluster);
		goto err_handle;
	}

	STAILQ_INSERT_TAIL(&g_map_bdev_rbd_cluster, entry, link);
	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);

	return 0;

err_handle:
	bdev_rbd_cluster_free(entry);
	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
	return -1;
}

int
bdev_rbd_unregister_cluster(const char *name)
{
	struct bdev_rbd_cluster *entry;
	int rc = 0;

	if (name == NULL) {
		return -1;
	}

	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);
	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
		if (strcmp(name, entry->name) == 0) {
			if (entry->ref == 0) {
				STAILQ_REMOVE(&g_map_bdev_rbd_cluster, entry, bdev_rbd_cluster, link);
				rados_shutdown(entry->cluster);
				bdev_rbd_cluster_free(entry);
			} else {
				SPDK_ERRLOG("Cluster with name=%s is still in use and cannot be deleted\n",
					    entry->name);
				rc = -1;
			}

			pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
			return rc;
		}
	}

	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);

	SPDK_ERRLOG("Could not find the cluster name=%s\n", name);

	return -1;
}

static void *
_bdev_rbd_register_cluster(void *arg)
{
	struct cluster_register_info *info = arg;
	void *ret = arg;
	int rc;

	rc = rbd_register_cluster((const char *)info->name, (const char *)info->user_id,
				  (const char *const *)info->config_param, (const char *)info->config_file,
				  (const char *)info->key_file, info->core_mask);
	if (rc) {
		ret = NULL;
	}

	return ret;
}

int
bdev_rbd_register_cluster(struct cluster_register_info *info)
{
	assert(info != NULL);

	/* The rados cluster must be created while unaffinitized to avoid CPU
	 * resource contention with the SPDK cores */
	if (spdk_call_unaffinitized(_bdev_rbd_register_cluster, info) == NULL) {
		return -1;
	}

	return 0;
}

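/* Create and register an rbd bdev backed by pool_name/rbd_name.  If
 * cluster_name is given, the image is opened through the matching registered
 * cluster; otherwise a private rados cluster is created from user_id/config.
 * The parameters mirror what bdev_rbd_write_config_json() emits, so an
 * illustrative bdev_rbd_create RPC request (names and values are examples)
 * might look like:
 *
 *   {
 *     "method": "bdev_rbd_create",
 *     "params": {
 *       "name": "Rbd0",
 *       "pool_name": "rbd",
 *       "rbd_name": "foo",
 *       "block_size": 512
 *     }
 *   }
 */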
int
bdev_rbd_create(struct spdk_bdev **bdev, const char *name, const char *user_id,
		const char *pool_name,
		const char *const *config,
		const char *rbd_name,
		uint32_t block_size,
		const char *cluster_name,
		const struct spdk_uuid *uuid)
{
	struct bdev_rbd *rbd;
	int ret;

	if ((pool_name == NULL) || (rbd_name == NULL)) {
		return -EINVAL;
	}

	rbd = calloc(1, sizeof(struct bdev_rbd));
	if (rbd == NULL) {
		SPDK_ERRLOG("Failed to allocate bdev_rbd struct\n");
		return -ENOMEM;
	}

	rbd->rbd_name = strdup(rbd_name);
	if (!rbd->rbd_name) {
		bdev_rbd_free(rbd);
		return -ENOMEM;
	}

	if (user_id) {
		rbd->user_id = strdup(user_id);
		if (!rbd->user_id) {
			bdev_rbd_free(rbd);
			return -ENOMEM;
		}
	}

	if (cluster_name) {
		rbd->cluster_name = strdup(cluster_name);
		if (!rbd->cluster_name) {
			bdev_rbd_free(rbd);
			return -ENOMEM;
		}
	}
	rbd->pool_name = strdup(pool_name);
	if (!rbd->pool_name) {
		bdev_rbd_free(rbd);
		return -ENOMEM;
	}

	if (config && !(rbd->config = bdev_rbd_dup_config(config))) {
		bdev_rbd_free(rbd);
		return -ENOMEM;
	}

	ret = bdev_rbd_init(rbd);
	if (ret < 0) {
		bdev_rbd_free(rbd);
		SPDK_ERRLOG("Failed to init rbd device\n");
		return ret;
	}

	if (uuid) {
		rbd->disk.uuid = *uuid;
	}

	if (name) {
		rbd->disk.name = strdup(name);
	} else {
		rbd->disk.name = spdk_sprintf_alloc("Ceph%d", bdev_rbd_count);
	}
	if (!rbd->disk.name) {
		bdev_rbd_free(rbd);
		return -ENOMEM;
	}
	rbd->disk.product_name = "Ceph Rbd Disk";
	bdev_rbd_count++;

	rbd->disk.write_cache = 0;
	rbd->disk.blocklen = block_size;
	rbd->disk.blockcnt = rbd->info.size / rbd->disk.blocklen;
	rbd->disk.ctxt = rbd;
	rbd->disk.fn_table = &rbd_fn_table;
	rbd->disk.module = &rbd_if;

	SPDK_NOTICELOG("Added rbd disk %s\n", rbd->disk.name);

	spdk_io_device_register(rbd, bdev_rbd_create_cb,
				bdev_rbd_destroy_cb,
				sizeof(struct bdev_rbd_io_channel),
				rbd_name);
	ret = spdk_bdev_register(&rbd->disk);
	if (ret) {
		spdk_io_device_unregister(rbd, NULL);
		bdev_rbd_free(rbd);
		return ret;
	}

	*bdev = &(rbd->disk);

	return ret;
}

void
bdev_rbd_delete(const char *name, spdk_delete_rbd_complete cb_fn, void *cb_arg)
{
	int rc;

	rc = spdk_bdev_unregister_by_name(name, &rbd_if, cb_fn, cb_arg);
	if (rc != 0) {
		cb_fn(cb_arg, rc);
	}
}

static void
dummy_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev, void *ctx)
{
}

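/* Grow an rbd bdev to new_size_in_mb (shrinking is rejected).  The image is
 * resized through librbd first and the bdev layer is then notified of the
 * new block count.
 */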
int
bdev_rbd_resize(const char *name, const uint64_t new_size_in_mb)
{
	struct spdk_bdev_desc *desc;
	struct spdk_bdev *bdev;
	struct spdk_io_channel *ch;
	struct bdev_rbd_io_channel *rbd_io_ch;
	int rc = 0;
	uint64_t new_size_in_byte;
	uint64_t current_size_in_mb;

	rc = spdk_bdev_open_ext(name, false, dummy_bdev_event_cb, NULL, &desc);
	if (rc != 0) {
		return rc;
	}

	bdev = spdk_bdev_desc_get_bdev(desc);

	if (bdev->module != &rbd_if) {
		rc = -EINVAL;
		goto exit;
	}

	current_size_in_mb = bdev->blocklen * bdev->blockcnt / (1024 * 1024);
	if (current_size_in_mb > new_size_in_mb) {
		SPDK_ERRLOG("The new bdev size must not be smaller than the current bdev size.\n");
		rc = -EINVAL;
		goto exit;
	}

	ch = bdev_rbd_get_io_channel(bdev);
	rbd_io_ch = spdk_io_channel_get_ctx(ch);
	new_size_in_byte = new_size_in_mb * 1024 * 1024;

	rc = rbd_resize(rbd_io_ch->disk->image, new_size_in_byte);
	spdk_put_io_channel(ch);
	if (rc != 0) {
		SPDK_ERRLOG("Failed to resize the ceph bdev.\n");
		goto exit;
	}

	rc = spdk_bdev_notify_blockcnt_change(bdev, new_size_in_byte / bdev->blocklen);
	if (rc != 0) {
		SPDK_ERRLOG("Failed to notify block cnt change.\n");
	}

exit:
	spdk_bdev_close(desc);
	return rc;
}

static int
bdev_rbd_group_create_cb(void *io_device, void *ctx_buf)
{
	return 0;
}

static void
bdev_rbd_group_destroy_cb(void *io_device, void *ctx_buf)
{
}

static int
bdev_rbd_library_init(void)
{
	spdk_io_device_register(&rbd_if, bdev_rbd_group_create_cb, bdev_rbd_group_destroy_cb,
				0, "bdev_rbd_poll_groups");
	return 0;
}

static void
bdev_rbd_library_fini(void)
{
	spdk_io_device_unregister(&rbd_if, NULL);
}

SPDK_LOG_REGISTER_COMPONENT(bdev_rbd)