/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "spdk/stdinc.h"

#include "bdev_rbd.h"

#include <rbd/librbd.h>
#include <rados/librados.h>

#include "spdk/env.h"
#include "spdk/bdev.h"
#include "spdk/thread.h"
#include "spdk/json.h"
#include "spdk/string.h"
#include "spdk/util.h"
#include "spdk/likely.h"

#include "spdk/bdev_module.h"
#include "spdk/log.h"

static int bdev_rbd_count = 0;

struct bdev_rbd {
	struct spdk_bdev disk;
	char *rbd_name;
	char *user_id;
	char *pool_name;
	char **config;

	rados_t cluster;
	rados_t *cluster_p;
	char *cluster_name;

	rados_ioctx_t io_ctx;
	rbd_image_t image;

	rbd_image_info_t info;
	pthread_mutex_t mutex;
	struct spdk_thread *main_td;
	struct spdk_thread *destruct_td;
	uint32_t ch_count;
	struct spdk_io_channel *group_ch;

	TAILQ_ENTRY(bdev_rbd) tailq;
	struct spdk_poller *reset_timer;
	struct spdk_bdev_io *reset_bdev_io;
};

struct bdev_rbd_io_channel {
	struct bdev_rbd *disk;
};

struct bdev_rbd_io {
	struct spdk_thread	*submit_td;
	enum spdk_bdev_io_status status;
	rbd_completion_t	comp;
	size_t			total_len;
};

struct bdev_rbd_cluster {
	char *name;
	char *user_id;
	char **config_param;
	char *config_file;
	char *key_file;
	rados_t cluster;
	uint32_t ref;
	STAILQ_ENTRY(bdev_rbd_cluster) link;
};

static STAILQ_HEAD(, bdev_rbd_cluster) g_map_bdev_rbd_cluster = STAILQ_HEAD_INITIALIZER(
			g_map_bdev_rbd_cluster);
static pthread_mutex_t g_map_bdev_rbd_cluster_mutex = PTHREAD_MUTEX_INITIALIZER;

static void
bdev_rbd_cluster_free(struct bdev_rbd_cluster *entry)
{
	assert(entry != NULL);

	bdev_rbd_free_config(entry->config_param);
	free(entry->config_file);
	free(entry->key_file);
	free(entry->user_id);
	free(entry->name);
	free(entry);
}

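/* Drop the reference that bdev_rbd_get_cluster() took on a registered cluster.
 * Only the reference count is decremented here; the rados_t itself stays
 * connected until bdev_rbd_unregister_cluster() shuts it down.
 */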
static void
bdev_rbd_put_cluster(rados_t **cluster)
{
	struct bdev_rbd_cluster *entry;

	assert(cluster != NULL);

	/* No need to go through the map if *cluster is NULL */
	if (*cluster == NULL) {
		return;
	}

	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);
	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
		if (*cluster != &entry->cluster) {
			continue;
		}

		assert(entry->ref > 0);
		entry->ref--;
		*cluster = NULL;
		pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
		return;
	}

	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
	SPDK_ERRLOG("Cannot find the entry for cluster=%p\n", *cluster);
}

static void
bdev_rbd_free(struct bdev_rbd *rbd)
{
	if (!rbd) {
		return;
	}

	free(rbd->disk.name);
	free(rbd->rbd_name);
	free(rbd->user_id);
	free(rbd->pool_name);
	bdev_rbd_free_config(rbd->config);

	if (rbd->io_ctx) {
		rados_ioctx_destroy(rbd->io_ctx);
	}

	if (rbd->cluster_name) {
		bdev_rbd_put_cluster(&rbd->cluster_p);
		free(rbd->cluster_name);
	} else if (rbd->cluster) {
		rados_shutdown(rbd->cluster);
	}

	pthread_mutex_destroy(&rbd->mutex);
	free(rbd);
}

void
bdev_rbd_free_config(char **config)
{
	char **entry;

	if (config) {
		for (entry = config; *entry; entry++) {
			free(*entry);
		}
		free(config);
	}
}

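/* Deep-copy a NULL-terminated array of strings.  Returns NULL when config is
 * NULL or on allocation failure (any partially built copy is freed).
 */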
char **
bdev_rbd_dup_config(const char *const *config)
{
	size_t count;
	char **copy;

	if (!config) {
		return NULL;
	}
	for (count = 0; config[count]; count++) {}
	copy = calloc(count + 1, sizeof(*copy));
	if (!copy) {
		return NULL;
	}
	for (count = 0; config[count]; count++) {
		if (!(copy[count] = strdup(config[count]))) {
			bdev_rbd_free_config(copy);
			return NULL;
		}
	}
	return copy;
}

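/* Create and connect a rados_t.  When provided, the config array is a
 * NULL-terminated list of flattened key/value pairs applied with
 * rados_conf_set(); otherwise the default Ceph config file locations are
 * read.  An illustrative (hypothetical) config array:
 *
 *   const char *config[] = { "mon_host", "10.0.0.1", "key", "<secret>", NULL };
 */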
static int
bdev_rados_cluster_init(const char *user_id, const char *const *config,
			rados_t *cluster)
{
	int ret;

	ret = rados_create(cluster, user_id);
	if (ret < 0) {
		SPDK_ERRLOG("Failed to create rados_t struct\n");
		return -1;
	}

	if (config) {
		const char *const *entry = config;
		while (*entry) {
			ret = rados_conf_set(*cluster, entry[0], entry[1]);
			if (ret < 0) {
				SPDK_ERRLOG("Failed to set %s = %s\n", entry[0], entry[1]);
				rados_shutdown(*cluster);
				return -1;
			}
			entry += 2;
		}
	} else {
		ret = rados_conf_read_file(*cluster, NULL);
		if (ret < 0) {
			SPDK_ERRLOG("Failed to read conf file\n");
			rados_shutdown(*cluster);
			return -1;
		}
	}

	ret = rados_connect(*cluster);
	if (ret < 0) {
		SPDK_ERRLOG("Failed to connect to rados cluster\n");
		rados_shutdown(*cluster);
		return -1;
	}

	return 0;
}

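/* Look up a registered cluster by name and take a reference on it.  On
 * success, *cluster points at the entry's rados_t handle.
 */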
static int
bdev_rbd_get_cluster(const char *cluster_name, rados_t **cluster)
{
	struct bdev_rbd_cluster *entry;

	if (cluster == NULL) {
		SPDK_ERRLOG("cluster should not be NULL\n");
		return -1;
	}

	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);
	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
		if (strcmp(cluster_name, entry->name) == 0) {
			entry->ref++;
			*cluster = &entry->cluster;
			pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
			return 0;
		}
	}

	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
	return -1;
}

static int
bdev_rbd_shared_cluster_init(const char *cluster_name, rados_t **cluster)
{
	int ret;

	ret = bdev_rbd_get_cluster(cluster_name, cluster);
	if (ret < 0) {
		SPDK_ERRLOG("Failed to get the registered cluster with name=%s\n", cluster_name);
		return -1;
	}

	return ret;
}

static void *
bdev_rbd_cluster_handle(void *arg)
{
	void *ret = arg;
	struct bdev_rbd *rbd = arg;
	int rc;

	rc = bdev_rados_cluster_init(rbd->user_id, (const char *const *)rbd->config,
				     &rbd->cluster);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to create rados cluster for user_id=%s and rbd_pool=%s\n",
			    rbd->user_id ? rbd->user_id : "admin (the default)", rbd->pool_name);
		ret = NULL;
	}

	return ret;
}

static void *
bdev_rbd_init_context(void *arg)
{
	struct bdev_rbd *rbd = arg;
	int rc;

	if (rados_ioctx_create(*(rbd->cluster_p), rbd->pool_name, &rbd->io_ctx) < 0) {
		SPDK_ERRLOG("Failed to create ioctx on rbd=%p\n", rbd);
		return NULL;
	}

	rc = rbd_open(rbd->io_ctx, rbd->rbd_name, &rbd->image, NULL);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to open specified rbd device\n");
		return NULL;
	}

	rc = rbd_stat(rbd->image, &rbd->info, sizeof(rbd->info));
	rbd_close(rbd->image);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to stat specified rbd device\n");
		return NULL;
	}

	return arg;
}

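/* Connect the bdev to a Ceph cluster: either borrow a cluster registered via
 * bdev_rbd_register_cluster() (cluster_name set) or create a private one.
 * The image is then opened once, on an unaffinitized thread, so that
 * rbd_stat() can fill rbd->info before the bdev is registered.
 */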
static int
bdev_rbd_init(struct bdev_rbd *rbd)
{
	int ret = 0;

	if (!rbd->cluster_name) {
		rbd->cluster_p = &rbd->cluster;
		/* The cluster should be created in a non-SPDK thread to avoid
		 * conflicts between the Rados and SPDK threads */
		if (spdk_call_unaffinitized(bdev_rbd_cluster_handle, rbd) == NULL) {
			SPDK_ERRLOG("Cannot create the rados object on rbd=%p\n", rbd);
			return -1;
		}
	} else {
		ret = bdev_rbd_shared_cluster_init(rbd->cluster_name, &rbd->cluster_p);
		if (ret < 0) {
			SPDK_ERRLOG("Failed to create rados object for rbd=%p on cluster_name=%s\n",
				    rbd, rbd->cluster_name);
			return -1;
		}
	}

	if (spdk_call_unaffinitized(bdev_rbd_init_context, rbd) == NULL) {
		SPDK_ERRLOG("Cannot init rbd context for rbd=%p\n", rbd);
		return -1;
	}

	return ret;
}

static void
bdev_rbd_exit(rbd_image_t image)
{
	rbd_flush(image);
	rbd_close(image);
}

static void
_bdev_rbd_io_complete(void *_rbd_io)
{
	struct bdev_rbd_io *rbd_io = _rbd_io;

	spdk_bdev_io_complete(spdk_bdev_io_from_ctx(rbd_io), rbd_io->status);
}

static void
bdev_rbd_io_complete(struct spdk_bdev_io *bdev_io, enum spdk_bdev_io_status status)
{
	struct bdev_rbd_io *rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx;
	struct spdk_thread *current_thread = spdk_get_thread();

	rbd_io->status = status;
	assert(rbd_io->submit_td != NULL);
	if (rbd_io->submit_td != current_thread) {
		spdk_thread_send_msg(rbd_io->submit_td, _bdev_rbd_io_complete, rbd_io);
	} else {
		_bdev_rbd_io_complete(rbd_io);
	}
}

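/* librbd completion callback, typically invoked from one of librbd's own
 * (non-SPDK) threads; bdev_rbd_io_complete() forwards the result back to the
 * submitting SPDK thread.  For reads, success means the returned byte count
 * matches the requested length; for all other I/O types 0 means success.
 */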
static void
bdev_rbd_finish_aiocb(rbd_completion_t cb, void *arg)
{
	int io_status;
	struct spdk_bdev_io *bdev_io;
	struct bdev_rbd_io *rbd_io;
	enum spdk_bdev_io_status bio_status;

	bdev_io = rbd_aio_get_arg(cb);
	rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx;
	io_status = rbd_aio_get_return_value(cb);
	bio_status = SPDK_BDEV_IO_STATUS_SUCCESS;

	if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) {
		if ((int)rbd_io->total_len != io_status) {
			bio_status = SPDK_BDEV_IO_STATUS_FAILED;
		}
	} else {
		/* For others, 0 means success */
		if (io_status != 0) {
			bio_status = SPDK_BDEV_IO_STATUS_FAILED;
		}
	}

	rbd_aio_release(cb);

	bdev_rbd_io_complete(bdev_io, bio_status);
}

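/* Dispatch one bdev I/O to librbd as an asynchronous request.  Called on the
 * disk's main thread, since a single rbd_image_t handle is shared by all
 * channels.  Any submission failure completes the I/O with
 * SPDK_BDEV_IO_STATUS_FAILED.
 */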
static void
bdev_rbd_start_aio(struct bdev_rbd *disk, struct spdk_bdev_io *bdev_io,
		   struct iovec *iov, int iovcnt, uint64_t offset, size_t len)
{
	int ret;
	struct bdev_rbd_io *rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx;
	rbd_image_t image = disk->image;

	ret = rbd_aio_create_completion(bdev_io, bdev_rbd_finish_aiocb,
					&rbd_io->comp);
	if (ret < 0) {
		goto err;
	}

	if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) {
		rbd_io->total_len = len;
		if (spdk_likely(iovcnt == 1)) {
			ret = rbd_aio_read(image, offset, iov[0].iov_len, iov[0].iov_base, rbd_io->comp);
		} else {
			ret = rbd_aio_readv(image, iov, iovcnt, offset, rbd_io->comp);
		}
	} else if (bdev_io->type == SPDK_BDEV_IO_TYPE_WRITE) {
		if (spdk_likely(iovcnt == 1)) {
			ret = rbd_aio_write(image, offset, iov[0].iov_len, iov[0].iov_base, rbd_io->comp);
		} else {
			ret = rbd_aio_writev(image, iov, iovcnt, offset, rbd_io->comp);
		}
	} else if (bdev_io->type == SPDK_BDEV_IO_TYPE_UNMAP) {
		ret = rbd_aio_discard(image, offset, len, rbd_io->comp);
	} else if (bdev_io->type == SPDK_BDEV_IO_TYPE_FLUSH) {
		ret = rbd_aio_flush(image, rbd_io->comp);
	} else if (bdev_io->type == SPDK_BDEV_IO_TYPE_WRITE_ZEROES) {
		ret = rbd_aio_write_zeroes(image, offset, len, rbd_io->comp, /* zero_flags */ 0, /* op_flags */ 0);
	}

	if (ret < 0) {
		rbd_aio_release(rbd_io->comp);
		goto err;
	}

	return;

err:
	bdev_rbd_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
}

static int bdev_rbd_library_init(void);
static void bdev_rbd_library_fini(void);

static int
bdev_rbd_get_ctx_size(void)
{
	return sizeof(struct bdev_rbd_io);
}

static struct spdk_bdev_module rbd_if = {
	.name = "rbd",
	.module_init = bdev_rbd_library_init,
	.module_fini = bdev_rbd_library_fini,
	.get_ctx_size = bdev_rbd_get_ctx_size,
};
SPDK_BDEV_MODULE_REGISTER(rbd, &rbd_if)

static int
bdev_rbd_reset_timer(void *arg)
{
	struct bdev_rbd *disk = arg;

	/*
	 * TODO: This should check if any I/O is still in flight before completing the reset.
	 * For now, just complete after the timer expires.
	 */
	bdev_rbd_io_complete(disk->reset_bdev_io, SPDK_BDEV_IO_STATUS_SUCCESS);
	spdk_poller_unregister(&disk->reset_timer);
	disk->reset_bdev_io = NULL;

	return SPDK_POLLER_BUSY;
}

static void
bdev_rbd_reset(struct bdev_rbd *disk, struct spdk_bdev_io *bdev_io)
{
	/*
	 * HACK: Since librbd doesn't provide any way to cancel outstanding aio, just kick off a
	 * timer to wait for in-flight I/O to complete.
	 */
	assert(disk->reset_bdev_io == NULL);
	disk->reset_bdev_io = bdev_io;
	disk->reset_timer = SPDK_POLLER_REGISTER(bdev_rbd_reset_timer, disk, 1 * 1000 * 1000);
}

static void
_bdev_rbd_destruct_done(void *io_device)
{
	struct bdev_rbd *rbd = io_device;

	assert(rbd != NULL);
	assert(rbd->ch_count == 0);

	spdk_bdev_destruct_done(&rbd->disk, 0);
	bdev_rbd_free(rbd);
}

static void
bdev_rbd_free_cb(void *io_device)
{
	struct bdev_rbd *rbd = io_device;

	/* The io device has been unregistered.  Send a message back to the
	 * original thread that started the destruct operation, so that the
	 * bdev unregister callback is invoked on the same thread that started
	 * this whole process.
	 */
	spdk_thread_send_msg(rbd->destruct_td, _bdev_rbd_destruct_done, rbd);
}

static void
_bdev_rbd_destruct(void *ctx)
{
	struct bdev_rbd *rbd = ctx;

	spdk_io_device_unregister(rbd, bdev_rbd_free_cb);
}

static int
bdev_rbd_destruct(void *ctx)
{
	struct bdev_rbd *rbd = ctx;
	struct spdk_thread *td;

	if (rbd->main_td == NULL) {
		td = spdk_get_thread();
	} else {
		td = rbd->main_td;
	}

	/* Start the destruct operation on the rbd bdev's
	 * main thread.  This guarantees it will only start
	 * executing after any messages related to channel
	 * deletions have finished completing.  *Always*
	 * send a message, even if this function gets called
	 * from the main thread, in case there are pending
	 * channel delete messages in flight to this thread.
	 */
	assert(rbd->destruct_td == NULL);
	rbd->destruct_td = td;
	spdk_thread_send_msg(td, _bdev_rbd_destruct, rbd);

	/* Return 1 to indicate the destruct path is asynchronous. */
	return 1;
}

static void
bdev_rbd_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io,
		    bool success)
{
	struct bdev_rbd *disk = (struct bdev_rbd *)bdev_io->bdev->ctxt;

	if (!success) {
		bdev_rbd_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
		return;
	}

	bdev_rbd_start_aio(disk,
			   bdev_io,
			   bdev_io->u.bdev.iovs,
			   bdev_io->u.bdev.iovcnt,
			   bdev_io->u.bdev.offset_blocks * bdev_io->bdev->blocklen,
			   bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
}

static void
_bdev_rbd_submit_request(void *ctx)
{
	struct spdk_bdev_io *bdev_io = ctx;
	struct bdev_rbd *disk = (struct bdev_rbd *)bdev_io->bdev->ctxt;

	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_READ:
		spdk_bdev_io_get_buf(bdev_io, bdev_rbd_get_buf_cb,
				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
		break;

	case SPDK_BDEV_IO_TYPE_WRITE:
	case SPDK_BDEV_IO_TYPE_UNMAP:
	case SPDK_BDEV_IO_TYPE_FLUSH:
	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
		bdev_rbd_start_aio(disk,
				   bdev_io,
				   bdev_io->u.bdev.iovs,
				   bdev_io->u.bdev.iovcnt,
				   bdev_io->u.bdev.offset_blocks * bdev_io->bdev->blocklen,
				   bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
		break;

	case SPDK_BDEV_IO_TYPE_RESET:
		bdev_rbd_reset(disk, bdev_io);
		break;

	default:
		SPDK_ERRLOG("Unsupported IO type=%d\n", bdev_io->type);
		bdev_rbd_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
		break;
	}
}

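/* All submissions are funneled to the disk's main thread (the thread that
 * created the first channel), because every channel shares the single
 * rbd_image_t opened in _bdev_rbd_create_cb().  The submitting thread is
 * remembered so the completion can be sent back to it.
 */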
static void
bdev_rbd_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
{
	struct spdk_thread *submit_td = spdk_io_channel_get_thread(ch);
	struct bdev_rbd_io *rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx;
	struct bdev_rbd *disk = (struct bdev_rbd *)bdev_io->bdev->ctxt;

	rbd_io->submit_td = submit_td;
	if (disk->main_td != submit_td) {
		spdk_thread_send_msg(disk->main_td, _bdev_rbd_submit_request, bdev_io);
	} else {
		_bdev_rbd_submit_request(bdev_io);
	}
}

static bool
bdev_rbd_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type)
{
	switch (io_type) {
	case SPDK_BDEV_IO_TYPE_READ:
	case SPDK_BDEV_IO_TYPE_WRITE:
	case SPDK_BDEV_IO_TYPE_UNMAP:
	case SPDK_BDEV_IO_TYPE_FLUSH:
	case SPDK_BDEV_IO_TYPE_RESET:
	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
		return true;

	default:
		return false;
	}
}

static void
bdev_rbd_free_channel_resources(struct bdev_rbd *disk)
{
	assert(disk != NULL);
	assert(disk->main_td == spdk_get_thread());
	assert(disk->ch_count == 0);

	spdk_put_io_channel(disk->group_ch);
	if (disk->image) {
		bdev_rbd_exit(disk->image);
	}

	disk->main_td = NULL;
	disk->group_ch = NULL;
}

static void *
bdev_rbd_handle(void *arg)
{
	struct bdev_rbd *disk = arg;
	void *ret = arg;

	if (rbd_open(disk->io_ctx, disk->rbd_name, &disk->image, NULL) < 0) {
		SPDK_ERRLOG("Failed to open specified rbd device\n");
		ret = NULL;
	}

	return ret;
}

static int
_bdev_rbd_create_cb(struct bdev_rbd *disk)
{
	disk->group_ch = spdk_get_io_channel(&rbd_if);
	assert(disk->group_ch != NULL);

	if (spdk_call_unaffinitized(bdev_rbd_handle, disk) == NULL) {
		bdev_rbd_free_channel_resources(disk);
		return -1;
	}

	return 0;
}

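/* I/O channel create callback.  The first channel created for a disk opens
 * the rbd image and records the current thread as the disk's main thread;
 * subsequent channels only bump ch_count.
 */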
static int
bdev_rbd_create_cb(void *io_device, void *ctx_buf)
{
	struct bdev_rbd_io_channel *ch = ctx_buf;
	struct bdev_rbd *disk = io_device;
	int rc;

	ch->disk = disk;
	pthread_mutex_lock(&disk->mutex);
	if (disk->ch_count == 0) {
		assert(disk->main_td == NULL);
		rc = _bdev_rbd_create_cb(disk);
		if (rc) {
			SPDK_ERRLOG("Cannot create channel for disk=%p\n", disk);
			pthread_mutex_unlock(&disk->mutex);
			return rc;
		}

		disk->main_td = spdk_get_thread();
	}

	disk->ch_count++;
	pthread_mutex_unlock(&disk->mutex);

	return 0;
}

static void
_bdev_rbd_destroy_cb(void *ctx)
{
	struct bdev_rbd *disk = ctx;

	pthread_mutex_lock(&disk->mutex);
	assert(disk->ch_count > 0);
	disk->ch_count--;

	if (disk->ch_count > 0) {
		/* A new channel was created between when the message was sent
		 * and when this function executed */
		pthread_mutex_unlock(&disk->mutex);
		return;
	}

	bdev_rbd_free_channel_resources(disk);
	pthread_mutex_unlock(&disk->mutex);
}

static void
bdev_rbd_destroy_cb(void *io_device, void *ctx_buf)
{
	struct bdev_rbd *disk = io_device;
	struct spdk_thread *thread;

	pthread_mutex_lock(&disk->mutex);
	assert(disk->ch_count > 0);
	disk->ch_count--;
	if (disk->ch_count == 0) {
		assert(disk->main_td != NULL);
		if (disk->main_td != spdk_get_thread()) {
			/* The final channel was destroyed on a different thread
			 * than where the first channel was created. Pass a message
			 * to the main thread to release the channel resources there. */
			disk->ch_count++;
			thread = disk->main_td;
			pthread_mutex_unlock(&disk->mutex);
			spdk_thread_send_msg(thread, _bdev_rbd_destroy_cb, disk);
			return;
		}

		bdev_rbd_free_channel_resources(disk);
	}
	pthread_mutex_unlock(&disk->mutex);
}

static struct spdk_io_channel *
bdev_rbd_get_io_channel(void *ctx)
{
	struct bdev_rbd *rbd_bdev = ctx;

	return spdk_get_io_channel(rbd_bdev);
}

static void
bdev_rbd_cluster_dump_entry(const char *cluster_name, struct spdk_json_write_ctx *w)
{
	struct bdev_rbd_cluster *entry;

	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);
	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
		if (strcmp(cluster_name, entry->name)) {
			continue;
		}
		if (entry->user_id) {
			spdk_json_write_named_string(w, "user_id", entry->user_id);
		}

		if (entry->config_param) {
			char **config_entry = entry->config_param;

			spdk_json_write_named_object_begin(w, "config_param");
			while (*config_entry) {
				spdk_json_write_named_string(w, config_entry[0], config_entry[1]);
				config_entry += 2;
			}
			spdk_json_write_object_end(w);
		}
		if (entry->config_file) {
			spdk_json_write_named_string(w, "config_file", entry->config_file);
		}
		if (entry->key_file) {
			spdk_json_write_named_string(w, "key_file", entry->key_file);
		}

		pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
		return;
	}

	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
}

static int
bdev_rbd_dump_info_json(void *ctx, struct spdk_json_write_ctx *w)
{
	struct bdev_rbd *rbd_bdev = ctx;

	spdk_json_write_named_object_begin(w, "rbd");

	spdk_json_write_named_string(w, "pool_name", rbd_bdev->pool_name);
	spdk_json_write_named_string(w, "rbd_name", rbd_bdev->rbd_name);

	if (rbd_bdev->cluster_name) {
		bdev_rbd_cluster_dump_entry(rbd_bdev->cluster_name, w);
		goto end;
	}

	if (rbd_bdev->user_id) {
		spdk_json_write_named_string(w, "user_id", rbd_bdev->user_id);
	}

	if (rbd_bdev->config) {
		char **entry = rbd_bdev->config;

		spdk_json_write_named_object_begin(w, "config");
		while (*entry) {
			spdk_json_write_named_string(w, entry[0], entry[1]);
			entry += 2;
		}
		spdk_json_write_object_end(w);
	}

end:
	spdk_json_write_object_end(w);

	return 0;
}

static void
bdev_rbd_write_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
{
	struct bdev_rbd *rbd = bdev->ctxt;
	char uuid_str[SPDK_UUID_STRING_LEN];

	spdk_json_write_object_begin(w);

	spdk_json_write_named_string(w, "method", "bdev_rbd_create");

	spdk_json_write_named_object_begin(w, "params");
	spdk_json_write_named_string(w, "name", bdev->name);
	spdk_json_write_named_string(w, "pool_name", rbd->pool_name);
	spdk_json_write_named_string(w, "rbd_name", rbd->rbd_name);
	spdk_json_write_named_uint32(w, "block_size", bdev->blocklen);
	if (rbd->user_id) {
		spdk_json_write_named_string(w, "user_id", rbd->user_id);
	}

	if (rbd->config) {
		char **entry = rbd->config;

		spdk_json_write_named_object_begin(w, "config");
		while (*entry) {
			spdk_json_write_named_string(w, entry[0], entry[1]);
			entry += 2;
		}
		spdk_json_write_object_end(w);
	}

	spdk_uuid_fmt_lower(uuid_str, sizeof(uuid_str), &bdev->uuid);
	spdk_json_write_named_string(w, "uuid", uuid_str);

	spdk_json_write_object_end(w);

	spdk_json_write_object_end(w);
}

static void
dump_single_cluster_entry(struct bdev_rbd_cluster *entry, struct spdk_json_write_ctx *w)
{
	assert(entry != NULL);

	spdk_json_write_object_begin(w);
	spdk_json_write_named_string(w, "cluster_name", entry->name);

	if (entry->user_id) {
		spdk_json_write_named_string(w, "user_id", entry->user_id);
	}

	if (entry->config_param) {
		char **config_entry = entry->config_param;

		spdk_json_write_named_object_begin(w, "config_param");
		while (*config_entry) {
			spdk_json_write_named_string(w, config_entry[0], config_entry[1]);
			config_entry += 2;
		}
		spdk_json_write_object_end(w);
	}
	if (entry->config_file) {
		spdk_json_write_named_string(w, "config_file", entry->config_file);
	}
	if (entry->key_file) {
		spdk_json_write_named_string(w, "key_file", entry->key_file);
	}

	spdk_json_write_object_end(w);
}

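/* JSON-RPC helper: dump one registered cluster (when name is given) or an
 * array of all of them.  Based on dump_single_cluster_entry(), each entry
 * would look roughly like this (illustrative values):
 *
 *   {
 *     "cluster_name": "cluster0",
 *     "user_id": "admin",
 *     "config_param": { "mon_host": "10.0.0.1" },
 *     "config_file": "/etc/ceph/ceph.conf",
 *     "key_file": "/etc/ceph/ceph.client.admin.keyring"
 *   }
 */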
int
bdev_rbd_get_clusters_info(struct spdk_jsonrpc_request *request, const char *name)
{
	struct bdev_rbd_cluster *entry;
	struct spdk_json_write_ctx *w;

	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);

	if (STAILQ_EMPTY(&g_map_bdev_rbd_cluster)) {
		pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
		return -ENOENT;
	}

	/* If a cluster name is provided, dump only the matching entry */
	if (name) {
		STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
			if (strcmp(name, entry->name) == 0) {
				w = spdk_jsonrpc_begin_result(request);
				dump_single_cluster_entry(entry, w);
				spdk_jsonrpc_end_result(request, w);

				pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
				return 0;
			}
		}

		pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
		return -ENOENT;
	}

	w = spdk_jsonrpc_begin_result(request);
	spdk_json_write_array_begin(w);
	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
		dump_single_cluster_entry(entry, w);
	}
	spdk_json_write_array_end(w);
	spdk_jsonrpc_end_result(request, w);
	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);

	return 0;
}

static const struct spdk_bdev_fn_table rbd_fn_table = {
	.destruct		= bdev_rbd_destruct,
	.submit_request		= bdev_rbd_submit_request,
	.io_type_supported	= bdev_rbd_io_type_supported,
	.get_io_channel		= bdev_rbd_get_io_channel,
	.dump_info_json		= bdev_rbd_dump_info_json,
	.write_config_json	= bdev_rbd_write_config_json,
};

static int
rbd_register_cluster(const char *name, const char *user_id, const char *const *config_param,
		     const char *config_file, const char *key_file)
{
	struct bdev_rbd_cluster *entry;
	int rc;

	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);
	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
		if (strcmp(name, entry->name) == 0) {
			SPDK_ERRLOG("Cluster name=%s already exists\n", name);
			pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
			return -1;
		}
	}

	entry = calloc(1, sizeof(*entry));
	if (!entry) {
		SPDK_ERRLOG("Cannot allocate an entry for name=%s\n", name);
		pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
		return -1;
	}

	entry->name = strdup(name);
	if (entry->name == NULL) {
		SPDK_ERRLOG("Failed to save the name=%s on entry=%p\n", name, entry);
		goto err_handle;
	}

	if (user_id) {
		entry->user_id = strdup(user_id);
		if (entry->user_id == NULL) {
			SPDK_ERRLOG("Failed to save the user_id=%s on entry=%p\n", user_id, entry);
			goto err_handle;
		}
	}

	/* config_param and config_file may be specified separately or together. */
	if (config_param) {
		entry->config_param = bdev_rbd_dup_config(config_param);
		if (entry->config_param == NULL) {
			SPDK_ERRLOG("Failed to save the config_param=%p on entry=%p\n", config_param, entry);
			goto err_handle;
		}
	}

	if (config_file) {
		entry->config_file = strdup(config_file);
		if (entry->config_file == NULL) {
			SPDK_ERRLOG("Failed to save the config_file=%s on entry=%p\n", config_file, entry);
			goto err_handle;
		}
	}

	if (key_file) {
		entry->key_file = strdup(key_file);
		if (entry->key_file == NULL) {
			SPDK_ERRLOG("Failed to save the key_file=%s on entry=%p\n", key_file, entry);
			goto err_handle;
		}
	}

	rc = rados_create(&entry->cluster, user_id);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to create rados_t struct\n");
		goto err_handle;
	}

	/* When entry->config_file is NULL, the default config file locations are
	 * tried and any read failure is ignored. */
	rc = rados_conf_read_file(entry->cluster, entry->config_file);
	if (entry->config_file && rc < 0) {
		SPDK_ERRLOG("Failed to read conf file %s\n", entry->config_file);
		rados_shutdown(entry->cluster);
		goto err_handle;
	}

	if (config_param) {
		const char *const *config_entry = config_param;
		while (*config_entry) {
			rc = rados_conf_set(entry->cluster, config_entry[0], config_entry[1]);
			if (rc < 0) {
				SPDK_ERRLOG("Failed to set %s = %s\n", config_entry[0], config_entry[1]);
				rados_shutdown(entry->cluster);
				goto err_handle;
			}
			config_entry += 2;
		}
	}

	if (key_file) {
		rc = rados_conf_set(entry->cluster, "keyring", key_file);
		if (rc < 0) {
			SPDK_ERRLOG("Failed to set keyring = %s\n", key_file);
			rados_shutdown(entry->cluster);
			goto err_handle;
		}
	}

	rc = rados_connect(entry->cluster);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to connect to rados cluster=%p\n", entry->cluster);
		rados_shutdown(entry->cluster);
		goto err_handle;
	}

	STAILQ_INSERT_TAIL(&g_map_bdev_rbd_cluster, entry, link);
	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);

	return 0;

err_handle:
	bdev_rbd_cluster_free(entry);
	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
	return -1;
}

int
bdev_rbd_unregister_cluster(const char *name)
{
	struct bdev_rbd_cluster *entry;
	int rc = 0;

	if (name == NULL) {
		return -1;
	}

	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);
	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
		if (strcmp(name, entry->name) == 0) {
			if (entry->ref == 0) {
				STAILQ_REMOVE(&g_map_bdev_rbd_cluster, entry, bdev_rbd_cluster, link);
				rados_shutdown(entry->cluster);
				bdev_rbd_cluster_free(entry);
			} else {
				SPDK_ERRLOG("Cluster with name=%s is still in use and cannot be deleted\n",
					    entry->name);
				rc = -1;
			}

			pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
			return rc;
		}
	}

	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);

	SPDK_ERRLOG("Could not find the cluster name=%s\n", name);

	return -1;
}

static void *
_bdev_rbd_register_cluster(void *arg)
{
	struct cluster_register_info *info = arg;
	void *ret = arg;
	int rc;

	rc = rbd_register_cluster(info->name, info->user_id,
				  (const char *const *)info->config_param,
				  info->config_file, info->key_file);
	if (rc) {
		ret = NULL;
	}

	return ret;
}

int
bdev_rbd_register_cluster(struct cluster_register_info *info)
{
	assert(info != NULL);

	/* The rados cluster info needs to be created in a non-SPDK thread to
	 * avoid CPU resource contention */
	if (spdk_call_unaffinitized(_bdev_rbd_register_cluster, info) == NULL) {
		return -1;
	}

	return 0;
}

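/* Create and register an rbd bdev.  pool_name and rbd_name are required;
 * everything else is optional.  Mirroring the parameters emitted by
 * bdev_rbd_write_config_json() above, a matching bdev_rbd_create RPC call
 * would look roughly like this (illustrative values; uuid and the other
 * optional parameters omitted):
 *
 *   {
 *     "method": "bdev_rbd_create",
 *     "params": {
 *       "name": "Ceph0",
 *       "pool_name": "rbd",
 *       "rbd_name": "foo",
 *       "block_size": 512
 *     }
 *   }
 */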
int
bdev_rbd_create(struct spdk_bdev **bdev, const char *name, const char *user_id,
		const char *pool_name,
		const char *const *config,
		const char *rbd_name,
		uint32_t block_size,
		const char *cluster_name,
		const struct spdk_uuid *uuid)
{
	struct bdev_rbd *rbd;
	int ret;

	if ((pool_name == NULL) || (rbd_name == NULL)) {
		return -EINVAL;
	}

	rbd = calloc(1, sizeof(struct bdev_rbd));
	if (rbd == NULL) {
		SPDK_ERRLOG("Failed to allocate bdev_rbd struct\n");
		return -ENOMEM;
	}

	ret = pthread_mutex_init(&rbd->mutex, NULL);
	if (ret) {
		SPDK_ERRLOG("Cannot init mutex on rbd=%p\n", rbd);
		free(rbd);
		return ret;
	}

	rbd->rbd_name = strdup(rbd_name);
	if (!rbd->rbd_name) {
		bdev_rbd_free(rbd);
		return -ENOMEM;
	}

	if (user_id) {
		rbd->user_id = strdup(user_id);
		if (!rbd->user_id) {
			bdev_rbd_free(rbd);
			return -ENOMEM;
		}
	}

	if (cluster_name) {
		rbd->cluster_name = strdup(cluster_name);
		if (!rbd->cluster_name) {
			bdev_rbd_free(rbd);
			return -ENOMEM;
		}
	}

	rbd->pool_name = strdup(pool_name);
	if (!rbd->pool_name) {
		bdev_rbd_free(rbd);
		return -ENOMEM;
	}

	if (config && !(rbd->config = bdev_rbd_dup_config(config))) {
		bdev_rbd_free(rbd);
		return -ENOMEM;
	}

	ret = bdev_rbd_init(rbd);
	if (ret < 0) {
		bdev_rbd_free(rbd);
		SPDK_ERRLOG("Failed to init rbd device\n");
		return ret;
	}

	if (uuid) {
		rbd->disk.uuid = *uuid;
	} else {
		spdk_uuid_generate(&rbd->disk.uuid);
	}

	if (name) {
		rbd->disk.name = strdup(name);
	} else {
		rbd->disk.name = spdk_sprintf_alloc("Ceph%d", bdev_rbd_count);
	}
	if (!rbd->disk.name) {
		bdev_rbd_free(rbd);
		return -ENOMEM;
	}
	rbd->disk.product_name = "Ceph Rbd Disk";
	bdev_rbd_count++;

	rbd->disk.write_cache = 0;
	rbd->disk.blocklen = block_size;
	rbd->disk.blockcnt = rbd->info.size / rbd->disk.blocklen;
	rbd->disk.ctxt = rbd;
	rbd->disk.fn_table = &rbd_fn_table;
	rbd->disk.module = &rbd_if;

	SPDK_NOTICELOG("Added rbd bdev %s\n", rbd->disk.name);

	spdk_io_device_register(rbd, bdev_rbd_create_cb,
				bdev_rbd_destroy_cb,
				sizeof(struct bdev_rbd_io_channel),
				rbd_name);
	ret = spdk_bdev_register(&rbd->disk);
	if (ret) {
		spdk_io_device_unregister(rbd, NULL);
		bdev_rbd_free(rbd);
		return ret;
	}

	*bdev = &(rbd->disk);

	return ret;
}

void
bdev_rbd_delete(const char *name, spdk_delete_rbd_complete cb_fn, void *cb_arg)
{
	int rc;

	rc = spdk_bdev_unregister_by_name(name, &rbd_if, cb_fn, cb_arg);
	if (rc != 0) {
		cb_fn(cb_arg, rc);
	}
}

static void
dummy_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev, void *ctx)
{
}

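/* Grow an rbd bdev to new_size_in_mb (the size is in MiB; shrinking is
 * rejected).  The image is resized through librbd first, then the bdev layer
 * is notified of the new block count.
 */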
int
bdev_rbd_resize(const char *name, const uint64_t new_size_in_mb)
{
	struct spdk_bdev_desc *desc;
	struct spdk_bdev *bdev;
	struct spdk_io_channel *ch;
	struct bdev_rbd_io_channel *rbd_io_ch;
	int rc = 0;
	uint64_t new_size_in_byte;
	uint64_t current_size_in_mb;

	rc = spdk_bdev_open_ext(name, false, dummy_bdev_event_cb, NULL, &desc);
	if (rc != 0) {
		return rc;
	}

	bdev = spdk_bdev_desc_get_bdev(desc);

	if (bdev->module != &rbd_if) {
		rc = -EINVAL;
		goto exit;
	}

	current_size_in_mb = bdev->blocklen * bdev->blockcnt / (1024 * 1024);
	if (current_size_in_mb > new_size_in_mb) {
		SPDK_ERRLOG("The new bdev size must not be smaller than current bdev size.\n");
		rc = -EINVAL;
		goto exit;
	}

	ch = bdev_rbd_get_io_channel(bdev->ctxt);
	rbd_io_ch = spdk_io_channel_get_ctx(ch);
	new_size_in_byte = new_size_in_mb * 1024 * 1024;

	rc = rbd_resize(rbd_io_ch->disk->image, new_size_in_byte);
	spdk_put_io_channel(ch);
	if (rc != 0) {
		SPDK_ERRLOG("Failed to resize the ceph bdev.\n");
		goto exit;
	}

	rc = spdk_bdev_notify_blockcnt_change(bdev, new_size_in_byte / bdev->blocklen);
	if (rc != 0) {
		SPDK_ERRLOG("Failed to notify block cnt change.\n");
	}

exit:
	spdk_bdev_close(desc);
	return rc;
}

static int
bdev_rbd_group_create_cb(void *io_device, void *ctx_buf)
{
	return 0;
}

static void
bdev_rbd_group_destroy_cb(void *io_device, void *ctx_buf)
{
}

static int
bdev_rbd_library_init(void)
{
	spdk_io_device_register(&rbd_if, bdev_rbd_group_create_cb, bdev_rbd_group_destroy_cb,
				0, "bdev_rbd_poll_groups");
	return 0;
}

static void
bdev_rbd_library_fini(void)
{
	spdk_io_device_unregister(&rbd_if, NULL);
}

SPDK_LOG_REGISTER_COMPONENT(bdev_rbd)