xref: /spdk/module/bdev/rbd/bdev_rbd.c (revision b02581a89058ebaebe03bd0e16e3b58adfe406c1)
/*   SPDX-License-Identifier: BSD-3-Clause
 *   Copyright (C) 2017 Intel Corporation.
 *   All rights reserved.
 */

#include "spdk/stdinc.h"

#include "bdev_rbd.h"

#include <rbd/librbd.h>
#include <rados/librados.h>

#include "spdk/env.h"
#include "spdk/bdev.h"
#include "spdk/thread.h"
#include "spdk/json.h"
#include "spdk/string.h"
#include "spdk/util.h"
#include "spdk/likely.h"

#include "spdk/bdev_module.h"
#include "spdk/log.h"

static int bdev_rbd_count = 0;

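/* A rados_ioctx_t opened on a named pool of a registered cluster.  Entries are
 * reference counted and shared by every rbd bdev that uses the same
 * (cluster, pool) pair; see bdev_rbd_get_pool_ctx() and bdev_rbd_put_pool_ctx().
 */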
struct bdev_rbd_pool_ctx {
	rados_t *cluster_p;
	char *name;
	rados_ioctx_t io_ctx;
	uint32_t ref;
	STAILQ_ENTRY(bdev_rbd_pool_ctx) link;
};

static STAILQ_HEAD(, bdev_rbd_pool_ctx) g_map_bdev_rbd_pool_ctx = STAILQ_HEAD_INITIALIZER(
			g_map_bdev_rbd_pool_ctx);

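/* Per-bdev state.  rados_ctx is a union: io_ctx is used when the bdev owns a
 * private cluster connection, ctx when it shares a registered cluster
 * (i.e. cluster_name != NULL).
 */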
struct bdev_rbd {
	struct spdk_bdev disk;
	char *rbd_name;
	char *user_id;
	char *pool_name;
	char **config;

	rados_t cluster;
	rados_t *cluster_p;
	char *cluster_name;

	union rbd_ctx {
		rados_ioctx_t io_ctx;
		struct bdev_rbd_pool_ctx *ctx;
	} rados_ctx;

	rbd_image_t image;

	rbd_image_info_t info;
	struct spdk_thread *destruct_td;

	TAILQ_ENTRY(bdev_rbd) tailq;
	struct spdk_poller *reset_timer;
	struct spdk_bdev_io *reset_bdev_io;

	uint64_t rbd_watch_handle;
};

struct bdev_rbd_io_channel {
	struct bdev_rbd *disk;
	struct spdk_io_channel *group_ch;
};

struct bdev_rbd_io {
	struct spdk_thread	*submit_td;
	enum spdk_bdev_io_status status;
	rbd_completion_t	comp;
	size_t			total_len;
};

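/* A cluster registered via bdev_rbd_register_cluster().  The connection is
 * shared by all bdevs created with its cluster_name and is reference counted;
 * the map is protected by g_map_bdev_rbd_cluster_mutex.
 */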
struct bdev_rbd_cluster {
	char *name;
	char *user_id;
	char **config_param;
	char *config_file;
	char *key_file;
	char *core_mask;
	rados_t cluster;
	uint32_t ref;
	STAILQ_ENTRY(bdev_rbd_cluster) link;
};

static STAILQ_HEAD(, bdev_rbd_cluster) g_map_bdev_rbd_cluster = STAILQ_HEAD_INITIALIZER(
			g_map_bdev_rbd_cluster);
static pthread_mutex_t g_map_bdev_rbd_cluster_mutex = PTHREAD_MUTEX_INITIALIZER;

static struct spdk_io_channel *bdev_rbd_get_io_channel(void *ctx);

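/* librbd invokes rbd_update_callback() on one of its own threads whenever the
 * watched image changes (e.g. it was resized); the work is forwarded to the
 * SPDK app thread, which rereads the image size and notifies the bdev layer.
 */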
static void
_rbd_update_callback(void *arg)
{
	struct bdev_rbd *rbd = arg;
	uint64_t current_size_in_bytes = 0;
	int rc;

	rc = rbd_get_size(rbd->image, &current_size_in_bytes);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to get image size: %d\n", rc);
		return;
	}

	rc = spdk_bdev_notify_blockcnt_change(&rbd->disk, current_size_in_bytes / rbd->disk.blocklen);
	if (rc != 0) {
		SPDK_ERRLOG("Failed to notify block count change.\n");
	}
}

static void
rbd_update_callback(void *arg)
{
	spdk_thread_send_msg(spdk_thread_get_app_thread(), _rbd_update_callback, arg);
}

static void
bdev_rbd_cluster_free(struct bdev_rbd_cluster *entry)
{
	assert(entry != NULL);

	bdev_rbd_free_config(entry->config_param);
	free(entry->config_file);
	free(entry->key_file);
	free(entry->user_id);
	free(entry->name);
	free(entry->core_mask);
	free(entry);
}

static void
bdev_rbd_put_cluster(rados_t **cluster)
{
	struct bdev_rbd_cluster *entry;

	assert(cluster != NULL);

	/* No need to go through the map if *cluster is NULL */
	if (*cluster == NULL) {
		return;
	}

	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);
	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
		if (*cluster != &entry->cluster) {
			continue;
		}

		assert(entry->ref > 0);
		entry->ref--;
		*cluster = NULL;
		pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
		return;
	}

	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
	SPDK_ERRLOG("Cannot find the entry for cluster=%p\n", cluster);
}

static void
bdev_rbd_put_pool_ctx(struct bdev_rbd_pool_ctx *entry)
{
	assert(spdk_get_thread() == spdk_thread_get_app_thread());

	assert(entry != NULL);
	assert(entry->ref > 0);
	entry->ref--;
	if (entry->ref == 0) {
		STAILQ_REMOVE(&g_map_bdev_rbd_pool_ctx, entry, bdev_rbd_pool_ctx, link);
		rados_ioctx_destroy(entry->io_ctx);
		free(entry->name);
		free(entry);
	}
}

static void
bdev_rbd_free(struct bdev_rbd *rbd)
{
	if (!rbd) {
		return;
	}

	if (rbd->image) {
		rbd_update_unwatch(rbd->image, rbd->rbd_watch_handle);
		rbd_flush(rbd->image);
		rbd_close(rbd->image);
	}

	free(rbd->disk.name);
	free(rbd->rbd_name);
	free(rbd->user_id);
	free(rbd->pool_name);
	bdev_rbd_free_config(rbd->config);

	if (rbd->cluster_name) {
		/* When the rbd is destructed via bdev_rbd_destruct, this branch is not
		 * taken because the ctx will already have been freed asynchronously by
		 * bdev_rbd_free_cb.  This path is only taken during the rbd
		 * initialization procedure. */
		if (rbd->rados_ctx.ctx) {
			bdev_rbd_put_pool_ctx(rbd->rados_ctx.ctx);
			rbd->rados_ctx.ctx = NULL;
		}

		bdev_rbd_put_cluster(&rbd->cluster_p);
		free(rbd->cluster_name);
	} else if (rbd->cluster) {
		if (rbd->rados_ctx.io_ctx) {
			rados_ioctx_destroy(rbd->rados_ctx.io_ctx);
		}
		rados_shutdown(rbd->cluster);
	}

	free(rbd);
}

void
bdev_rbd_free_config(char **config)
{
	char **entry;

	if (config) {
		for (entry = config; *entry; entry++) {
			free(*entry);
		}
		free(config);
	}
}

char **
bdev_rbd_dup_config(const char *const *config)
{
	size_t count;
	char **copy;

	if (!config) {
		return NULL;
	}
	for (count = 0; config[count]; count++) {}
	copy = calloc(count + 1, sizeof(*copy));
	if (!copy) {
		return NULL;
	}
	for (count = 0; config[count]; count++) {
		if (!(copy[count] = strdup(config[count]))) {
			bdev_rbd_free_config(copy);
			return NULL;
		}
	}
	return copy;
}

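/* Create a private rados connection for a single bdev: apply the key/value
 * pairs from config (or read the default Ceph config file when config is
 * NULL), then connect to the cluster.
 */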
static int
bdev_rados_cluster_init(const char *user_id, const char *const *config,
			rados_t *cluster)
{
	int ret;

	ret = rados_create(cluster, user_id);
	if (ret < 0) {
		SPDK_ERRLOG("Failed to create rados_t struct\n");
		return -1;
	}

	if (config) {
		const char *const *entry = config;
		while (*entry) {
			ret = rados_conf_set(*cluster, entry[0], entry[1]);
			if (ret < 0) {
				SPDK_ERRLOG("Failed to set %s = %s\n", entry[0], entry[1]);
				rados_shutdown(*cluster);
				*cluster = NULL;
				return -1;
			}
			entry += 2;
		}
	} else {
		ret = rados_conf_read_file(*cluster, NULL);
		if (ret < 0) {
			SPDK_ERRLOG("Failed to read conf file\n");
			rados_shutdown(*cluster);
			*cluster = NULL;
			return -1;
		}
	}

	ret = rados_connect(*cluster);
	if (ret < 0) {
		SPDK_ERRLOG("Failed to connect to the rados cluster\n");
		rados_shutdown(*cluster);
		*cluster = NULL;
		return -1;
	}

	return 0;
}

static int
bdev_rbd_get_cluster(const char *cluster_name, rados_t **cluster)
{
	struct bdev_rbd_cluster *entry;

	if (cluster == NULL) {
		SPDK_ERRLOG("cluster should not be NULL\n");
		return -1;
	}

	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);
	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
		if (strcmp(cluster_name, entry->name) == 0) {
			entry->ref++;
			*cluster = &entry->cluster;
			pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
			return 0;
		}
	}

	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
	return -1;
}

static int
bdev_rbd_shared_cluster_init(const char *cluster_name, rados_t **cluster)
{
	int ret;

	ret = bdev_rbd_get_cluster(cluster_name, cluster);
	if (ret < 0) {
		SPDK_ERRLOG("Failed to find a registered cluster with name=%s\n", cluster_name);
		return -1;
	}

	return ret;
}

static void *
bdev_rbd_cluster_handle(void *arg)
{
	void *ret = arg;
	struct bdev_rbd *rbd = arg;
	int rc;

	rc = bdev_rados_cluster_init(rbd->user_id, (const char *const *)rbd->config,
				     &rbd->cluster);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to create rados cluster for user_id=%s and rbd_pool=%s\n",
			    rbd->user_id ? rbd->user_id : "admin (the default)", rbd->pool_name);
		ret = NULL;
	}

	return ret;
}

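/* Look up or create the shared ioctx for the given pool on a registered
 * cluster.  Must run on the app thread: g_map_bdev_rbd_pool_ctx is only ever
 * touched there, so no lock is needed.
 */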
static int
bdev_rbd_get_pool_ctx(rados_t *cluster_p, const char *name, struct bdev_rbd_pool_ctx **ctx)
{
	struct bdev_rbd_pool_ctx *entry;

	assert(spdk_get_thread() == spdk_thread_get_app_thread());

	if (name == NULL || ctx == NULL) {
		return -1;
	}

	STAILQ_FOREACH(entry, &g_map_bdev_rbd_pool_ctx, link) {
		if (strcmp(name, entry->name) == 0 && cluster_p == entry->cluster_p) {
			entry->ref++;
			*ctx = entry;
			return 0;
		}
	}

	entry = calloc(1, sizeof(*entry));
	if (!entry) {
		SPDK_ERRLOG("Cannot allocate an entry for name=%s\n", name);
		return -1;
	}

	entry->name = strdup(name);
	if (entry->name == NULL) {
		SPDK_ERRLOG("Failed to allocate the name=%s on entry=%p\n", name, entry);
		goto err_handle;
	}

	if (rados_ioctx_create(*cluster_p, name, &entry->io_ctx) < 0) {
		goto err_handle1;
	}

	entry->cluster_p = cluster_p;
	entry->ref = 1;
	*ctx = entry;
	STAILQ_INSERT_TAIL(&g_map_bdev_rbd_pool_ctx, entry, link);

	return 0;

err_handle1:
	free(entry->name);
err_handle:
	free(entry);

	return -1;
}

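/* Runs unaffinitized: open the pool ioctx and the rbd image, register the
 * update watch and stat the image.  Returns arg on success and NULL on
 * failure; a failed watch registration is logged but not fatal.
 */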
static void *
bdev_rbd_init_context(void *arg)
{
	struct bdev_rbd *rbd = arg;
	int rc;
	rados_ioctx_t *io_ctx = NULL;

	if (rbd->cluster_name) {
		if (bdev_rbd_get_pool_ctx(rbd->cluster_p, rbd->pool_name, &rbd->rados_ctx.ctx) < 0) {
			SPDK_ERRLOG("Failed to create ioctx on rbd=%p with cluster_name=%s\n",
				    rbd, rbd->cluster_name);
			return NULL;
		}
		io_ctx = &rbd->rados_ctx.ctx->io_ctx;
	} else {
		if (rados_ioctx_create(*(rbd->cluster_p), rbd->pool_name, &rbd->rados_ctx.io_ctx) < 0) {
			SPDK_ERRLOG("Failed to create ioctx on rbd=%p\n", rbd);
			return NULL;
		}
		io_ctx = &rbd->rados_ctx.io_ctx;
	}

	assert(io_ctx != NULL);
	rc = rbd_open(*io_ctx, rbd->rbd_name, &rbd->image, NULL);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to open specified rbd device\n");
		return NULL;
	}

	rc = rbd_update_watch(rbd->image, &rbd->rbd_watch_handle, rbd_update_callback, (void *)rbd);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to set up watch %d\n", rc);
	}

	rc = rbd_stat(rbd->image, &rbd->info, sizeof(rbd->info));
	if (rc < 0) {
		SPDK_ERRLOG("Failed to stat specified rbd device\n");
		return NULL;
	}

	return arg;
}

static int
bdev_rbd_init(struct bdev_rbd *rbd)
{
	int ret = 0;

	if (!rbd->cluster_name) {
		rbd->cluster_p = &rbd->cluster;
		/* The cluster should be created on a non-SPDK thread to avoid conflicts
		 * between the Rados threads and the SPDK thread */
		if (spdk_call_unaffinitized(bdev_rbd_cluster_handle, rbd) == NULL) {
			SPDK_ERRLOG("Cannot create the rados object on rbd=%p\n", rbd);
			return -1;
		}
	} else {
		ret = bdev_rbd_shared_cluster_init(rbd->cluster_name, &rbd->cluster_p);
		if (ret < 0) {
			SPDK_ERRLOG("Failed to create rados object for rbd=%p on cluster_name=%s\n",
				    rbd, rbd->cluster_name);
			return -1;
		}
	}

	if (spdk_call_unaffinitized(bdev_rbd_init_context, rbd) == NULL) {
		SPDK_ERRLOG("Cannot init rbd context for rbd=%p\n", rbd);
		return -1;
	}

	return ret;
}

static void
_bdev_rbd_io_complete(void *_rbd_io)
{
	struct bdev_rbd_io *rbd_io = _rbd_io;

	spdk_bdev_io_complete(spdk_bdev_io_from_ctx(rbd_io), rbd_io->status);
}

static void
bdev_rbd_io_complete(struct spdk_bdev_io *bdev_io, enum spdk_bdev_io_status status)
{
	struct bdev_rbd_io *rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx;
	struct spdk_thread *current_thread = spdk_get_thread();

	rbd_io->status = status;
	assert(rbd_io->submit_td != NULL);
	if (rbd_io->submit_td != current_thread) {
		spdk_thread_send_msg(rbd_io->submit_td, _bdev_rbd_io_complete, rbd_io);
	} else {
		_bdev_rbd_io_complete(rbd_io);
	}
}

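/* librbd completion callback.  This may run on a librbd thread, so
 * bdev_rbd_io_complete() forwards the completion back to the thread that
 * submitted the I/O.  For reads, the aio return value is the number of bytes
 * read, so anything short of total_len is treated as a failure.
 */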
static void
bdev_rbd_finish_aiocb(rbd_completion_t cb, void *arg)
{
	int io_status;
	struct spdk_bdev_io *bdev_io;
	struct bdev_rbd_io *rbd_io;
	enum spdk_bdev_io_status bio_status;

	bdev_io = rbd_aio_get_arg(cb);
	rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx;
	io_status = rbd_aio_get_return_value(cb);
	bio_status = SPDK_BDEV_IO_STATUS_SUCCESS;

	if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) {
		if ((int)rbd_io->total_len != io_status) {
			bio_status = SPDK_BDEV_IO_STATUS_FAILED;
		}
#ifdef LIBRBD_SUPPORTS_COMPARE_AND_WRITE_IOVEC
	} else if (bdev_io->type == SPDK_BDEV_IO_TYPE_COMPARE_AND_WRITE && io_status == -EILSEQ) {
		bio_status = SPDK_BDEV_IO_STATUS_MISCOMPARE;
#endif
	} else if (io_status != 0) { /* For others, 0 means success */
		bio_status = SPDK_BDEV_IO_STATUS_FAILED;
	}

	rbd_aio_release(cb);

	bdev_rbd_io_complete(bdev_io, bio_status);
}

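/* Translate a bdev I/O into the matching librbd async call.  offset and len
 * are in bytes; the completion fires bdev_rbd_finish_aiocb() above.
 */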
static void
_bdev_rbd_start_aio(struct bdev_rbd *disk, struct spdk_bdev_io *bdev_io,
		    struct iovec *iov, int iovcnt, uint64_t offset, size_t len)
{
	int ret;
	struct bdev_rbd_io *rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx;
	rbd_image_t image = disk->image;

	ret = rbd_aio_create_completion(bdev_io, bdev_rbd_finish_aiocb,
					&rbd_io->comp);
	if (ret < 0) {
		goto err;
	}

	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_READ:
		rbd_io->total_len = len;
		if (spdk_likely(iovcnt == 1)) {
			ret = rbd_aio_read(image, offset, iov[0].iov_len, iov[0].iov_base,
					   rbd_io->comp);
		} else {
			ret = rbd_aio_readv(image, iov, iovcnt, offset, rbd_io->comp);
		}
		break;
	case SPDK_BDEV_IO_TYPE_WRITE:
		if (spdk_likely(iovcnt == 1)) {
			ret = rbd_aio_write(image, offset, iov[0].iov_len, iov[0].iov_base,
					    rbd_io->comp);
		} else {
			ret = rbd_aio_writev(image, iov, iovcnt, offset, rbd_io->comp);
		}
		break;
	case SPDK_BDEV_IO_TYPE_UNMAP:
		ret = rbd_aio_discard(image, offset, len, rbd_io->comp);
		break;
	case SPDK_BDEV_IO_TYPE_FLUSH:
		ret = rbd_aio_flush(image, rbd_io->comp);
		break;
	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
		ret = rbd_aio_write_zeroes(image, offset, len, rbd_io->comp, /* zero_flags */ 0,
					   /* op_flags */ 0);
		break;
#ifdef LIBRBD_SUPPORTS_COMPARE_AND_WRITE_IOVEC
	case SPDK_BDEV_IO_TYPE_COMPARE_AND_WRITE:
		ret = rbd_aio_compare_and_writev(image, offset, iov /* cmp */, iovcnt,
						 bdev_io->u.bdev.fused_iovs /* write */,
						 bdev_io->u.bdev.fused_iovcnt,
						 rbd_io->comp, NULL,
						 /* op_flags */ 0);
		break;
#endif
	default:
		/* This should not happen.
		 * This function should only be called with io types that
		 * bdev_rbd_submit_request declares supported.
		 */
		SPDK_ERRLOG("Unsupported IO type =%d\n", bdev_io->type);
		ret = -ENOTSUP;
		break;
	}

	if (ret < 0) {
		rbd_aio_release(rbd_io->comp);
		goto err;
	}

	return;

err:
	bdev_rbd_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
}


static void
bdev_rbd_start_aio(void *ctx)
{
	struct spdk_bdev_io *bdev_io = ctx;
	struct bdev_rbd *disk = (struct bdev_rbd *)bdev_io->bdev->ctxt;

	_bdev_rbd_start_aio(disk,
			    bdev_io,
			    bdev_io->u.bdev.iovs,
			    bdev_io->u.bdev.iovcnt,
			    bdev_io->u.bdev.offset_blocks * bdev_io->bdev->blocklen,
			    bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
}

static int bdev_rbd_library_init(void);
static void bdev_rbd_library_fini(void);

static int
bdev_rbd_get_ctx_size(void)
{
	return sizeof(struct bdev_rbd_io);
}

static struct spdk_bdev_module rbd_if = {
	.name = "rbd",
	.module_init = bdev_rbd_library_init,
	.module_fini = bdev_rbd_library_fini,
	.get_ctx_size = bdev_rbd_get_ctx_size,
};
SPDK_BDEV_MODULE_REGISTER(rbd, &rbd_if)

static int bdev_rbd_reset_timer(void *arg);

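/* Reset handling: librbd provides no way to cancel outstanding aio, so a
 * 1 ms poller watches the bdev's queue depth and completes the reset once all
 * in-flight I/O has drained.
 */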
static void
bdev_rbd_check_outstanding_ios(struct spdk_bdev *bdev, uint64_t current_qd,
			       void *cb_arg, int rc)
{
	struct bdev_rbd *disk = cb_arg;
	enum spdk_bdev_io_status bio_status;

	if (rc == 0 && current_qd > 0) {
		disk->reset_timer = SPDK_POLLER_REGISTER(bdev_rbd_reset_timer, disk, 1000);
		return;
	}

	if (rc != 0) {
		bio_status = SPDK_BDEV_IO_STATUS_FAILED;
	} else {
		bio_status = SPDK_BDEV_IO_STATUS_SUCCESS;
	}

	bdev_rbd_io_complete(disk->reset_bdev_io, bio_status);
	disk->reset_bdev_io = NULL;
}

static int
bdev_rbd_reset_timer(void *arg)
{
	struct bdev_rbd *disk = arg;

	spdk_poller_unregister(&disk->reset_timer);

	spdk_bdev_get_current_qd(&disk->disk, bdev_rbd_check_outstanding_ios, disk);

	return SPDK_POLLER_BUSY;
}

static void
bdev_rbd_reset(void *ctx)
{
	struct spdk_bdev_io *bdev_io = ctx;
	struct bdev_rbd *disk = (struct bdev_rbd *)bdev_io->bdev->ctxt;

	/*
	 * HACK: Since librbd doesn't provide any way to cancel outstanding aio, just kick off a
	 * poller to wait for in-flight I/O to complete.
	 */
	assert(disk->reset_bdev_io == NULL);
	disk->reset_bdev_io = bdev_io;

	bdev_rbd_reset_timer(disk);
}

static void
_bdev_rbd_destruct_done(void *io_device)
{
	struct bdev_rbd *rbd = io_device;

	assert(rbd != NULL);

	spdk_bdev_destruct_done(&rbd->disk, 0);
	bdev_rbd_free(rbd);
}

static void
bdev_rbd_free_cb(void *io_device)
{
	struct bdev_rbd *rbd = io_device;

	assert(spdk_get_thread() == spdk_thread_get_app_thread());

	/* free the ctx */
	if (rbd->cluster_name && rbd->rados_ctx.ctx) {
		bdev_rbd_put_pool_ctx(rbd->rados_ctx.ctx);
		rbd->rados_ctx.ctx = NULL;
	}

	/* The io device has been unregistered.  Send a message back to the
	 * original thread that started the destruct operation, so that the
	 * bdev unregister callback is invoked on the same thread that started
	 * this whole process.
	 */
	spdk_thread_send_msg(rbd->destruct_td, _bdev_rbd_destruct_done, rbd);
}

static void
_bdev_rbd_destruct(void *ctx)
{
	struct bdev_rbd *rbd = ctx;

	spdk_io_device_unregister(rbd, bdev_rbd_free_cb);
}

static int
bdev_rbd_destruct(void *ctx)
{
	struct bdev_rbd *rbd = ctx;

	/* Start the destruct operation on the rbd bdev's
	 * main thread.  This guarantees it will only start
	 * executing after any messages related to channel
	 * deletions have finished completing.  *Always*
	 * send a message, even if this function gets called
	 * from the main thread, in case there are pending
	 * channel delete messages in flight to this thread.
	 */
	assert(rbd->destruct_td == NULL);
	rbd->destruct_td = spdk_get_thread();
	spdk_thread_send_msg(spdk_thread_get_app_thread(), _bdev_rbd_destruct, rbd);

	/* Return 1 to indicate the destruct path is asynchronous. */
	return 1;
}

static void
bdev_rbd_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io,
		    bool success)
{
	if (!success) {
		bdev_rbd_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
		return;
	}

	bdev_rbd_start_aio(bdev_io);
}

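/* I/O submission entry point.  Reads first acquire a data buffer via
 * spdk_bdev_io_get_buf(); resets are forwarded to the app thread.
 */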
static void
bdev_rbd_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
{
	struct spdk_thread *submit_td = spdk_io_channel_get_thread(ch);
	struct bdev_rbd_io *rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx;

	rbd_io->submit_td = submit_td;
	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_READ:
		spdk_bdev_io_get_buf(bdev_io, bdev_rbd_get_buf_cb,
				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
		break;

	case SPDK_BDEV_IO_TYPE_WRITE:
	case SPDK_BDEV_IO_TYPE_UNMAP:
	case SPDK_BDEV_IO_TYPE_FLUSH:
	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
#ifdef LIBRBD_SUPPORTS_COMPARE_AND_WRITE_IOVEC
	case SPDK_BDEV_IO_TYPE_COMPARE_AND_WRITE:
#endif
		bdev_rbd_start_aio(bdev_io);
		break;

	case SPDK_BDEV_IO_TYPE_RESET:
		spdk_thread_exec_msg(spdk_thread_get_app_thread(), bdev_rbd_reset, bdev_io);
		break;

	default:
		SPDK_ERRLOG("Unsupported IO type =%d\n", bdev_io->type);
		bdev_rbd_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
		break;
	}
}

static bool
bdev_rbd_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type)
{
	switch (io_type) {
	case SPDK_BDEV_IO_TYPE_READ:
	case SPDK_BDEV_IO_TYPE_WRITE:
	case SPDK_BDEV_IO_TYPE_UNMAP:
	case SPDK_BDEV_IO_TYPE_FLUSH:
	case SPDK_BDEV_IO_TYPE_RESET:
	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
#ifdef LIBRBD_SUPPORTS_COMPARE_AND_WRITE_IOVEC
	case SPDK_BDEV_IO_TYPE_COMPARE_AND_WRITE:
#endif
		return true;

	default:
		return false;
	}
}

static int
bdev_rbd_create_cb(void *io_device, void *ctx_buf)
{
	struct bdev_rbd_io_channel *ch = ctx_buf;
	struct bdev_rbd *disk = io_device;

	ch->disk = disk;
	ch->group_ch = spdk_get_io_channel(&rbd_if);
	assert(ch->group_ch != NULL);

	return 0;
}

static void
bdev_rbd_destroy_cb(void *io_device, void *ctx_buf)
{
	struct bdev_rbd_io_channel *ch = ctx_buf;

	spdk_put_io_channel(ch->group_ch);
}

static struct spdk_io_channel *
bdev_rbd_get_io_channel(void *ctx)
{
	struct bdev_rbd *rbd_bdev = ctx;

	return spdk_get_io_channel(rbd_bdev);
}

static void
bdev_rbd_cluster_dump_entry(const char *cluster_name, struct spdk_json_write_ctx *w)
{
	struct bdev_rbd_cluster *entry;

	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);
	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
		if (strcmp(cluster_name, entry->name)) {
			continue;
		}
		if (entry->user_id) {
			spdk_json_write_named_string(w, "user_id", entry->user_id);
		}

		if (entry->config_param) {
			char **config_entry = entry->config_param;

			spdk_json_write_named_object_begin(w, "config_param");
			while (*config_entry) {
				spdk_json_write_named_string(w, config_entry[0], config_entry[1]);
				config_entry += 2;
			}
			spdk_json_write_object_end(w);
		}
		if (entry->config_file) {
			spdk_json_write_named_string(w, "config_file", entry->config_file);
		}
		if (entry->key_file) {
			spdk_json_write_named_string(w, "key_file", entry->key_file);
		}

		pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
		return;
	}

	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
}

static int
bdev_rbd_dump_info_json(void *ctx, struct spdk_json_write_ctx *w)
{
	struct bdev_rbd *rbd_bdev = ctx;

	spdk_json_write_named_object_begin(w, "rbd");

	spdk_json_write_named_string(w, "pool_name", rbd_bdev->pool_name);

	spdk_json_write_named_string(w, "rbd_name", rbd_bdev->rbd_name);

	if (rbd_bdev->cluster_name) {
		bdev_rbd_cluster_dump_entry(rbd_bdev->cluster_name, w);
		goto end;
	}

	if (rbd_bdev->user_id) {
		spdk_json_write_named_string(w, "user_id", rbd_bdev->user_id);
	}

	if (rbd_bdev->config) {
		char **entry = rbd_bdev->config;

		spdk_json_write_named_object_begin(w, "config");
		while (*entry) {
			spdk_json_write_named_string(w, entry[0], entry[1]);
			entry += 2;
		}
		spdk_json_write_object_end(w);
	}

end:
	spdk_json_write_object_end(w);

	return 0;
}

static void
bdev_rbd_write_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
{
	struct bdev_rbd *rbd = bdev->ctxt;

	spdk_json_write_object_begin(w);

	spdk_json_write_named_string(w, "method", "bdev_rbd_create");

	spdk_json_write_named_object_begin(w, "params");
	spdk_json_write_named_string(w, "name", bdev->name);
	spdk_json_write_named_string(w, "pool_name", rbd->pool_name);
	spdk_json_write_named_string(w, "rbd_name", rbd->rbd_name);
	spdk_json_write_named_uint32(w, "block_size", bdev->blocklen);
	if (rbd->user_id) {
		spdk_json_write_named_string(w, "user_id", rbd->user_id);
	}

	if (rbd->config) {
		char **entry = rbd->config;

		spdk_json_write_named_object_begin(w, "config");
		while (*entry) {
			spdk_json_write_named_string(w, entry[0], entry[1]);
			entry += 2;
		}
		spdk_json_write_object_end(w);
	}

	spdk_json_write_named_uuid(w, "uuid", &bdev->uuid);

	spdk_json_write_object_end(w);

	spdk_json_write_object_end(w);
}

static void
dump_single_cluster_entry(struct bdev_rbd_cluster *entry, struct spdk_json_write_ctx *w)
{
	assert(entry != NULL);

	spdk_json_write_object_begin(w);
	spdk_json_write_named_string(w, "cluster_name", entry->name);

	if (entry->user_id) {
		spdk_json_write_named_string(w, "user_id", entry->user_id);
	}

	if (entry->config_param) {
		char **config_entry = entry->config_param;

		spdk_json_write_named_object_begin(w, "config_param");
		while (*config_entry) {
			spdk_json_write_named_string(w, config_entry[0], config_entry[1]);
			config_entry += 2;
		}
		spdk_json_write_object_end(w);
	}
	if (entry->config_file) {
		spdk_json_write_named_string(w, "config_file", entry->config_file);
	}
	if (entry->key_file) {
		spdk_json_write_named_string(w, "key_file", entry->key_file);
	}

	if (entry->core_mask) {
		spdk_json_write_named_string(w, "core_mask", entry->core_mask);
	}

	spdk_json_write_object_end(w);
}

int
bdev_rbd_get_clusters_info(struct spdk_jsonrpc_request *request, const char *name)
{
	struct bdev_rbd_cluster *entry;
	struct spdk_json_write_ctx *w;

	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);

	if (STAILQ_EMPTY(&g_map_bdev_rbd_cluster)) {
		pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
		return -ENOENT;
	}

	/* If cluster name is provided */
	if (name) {
		STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
			if (strcmp(name, entry->name) == 0) {
				w = spdk_jsonrpc_begin_result(request);
				dump_single_cluster_entry(entry, w);
				spdk_jsonrpc_end_result(request, w);

				pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
				return 0;
			}
		}

		pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
		return -ENOENT;
	}

	w = spdk_jsonrpc_begin_result(request);
	spdk_json_write_array_begin(w);
	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
		dump_single_cluster_entry(entry, w);
	}
	spdk_json_write_array_end(w);
	spdk_jsonrpc_end_result(request, w);
	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);

	return 0;
}

static const struct spdk_bdev_fn_table rbd_fn_table = {
	.destruct		= bdev_rbd_destruct,
	.submit_request		= bdev_rbd_submit_request,
	.io_type_supported	= bdev_rbd_io_type_supported,
	.get_io_channel		= bdev_rbd_get_io_channel,
	.dump_info_json		= bdev_rbd_dump_info_json,
	.write_config_json	= bdev_rbd_write_config_json,
};

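/* Apply an SPDK cpuset to the calling (non-SPDK) thread via sched_setaffinity().
 * rados_create() is then called while this affinity is in effect, so the
 * service threads it spawns run on the requested cores.
 */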
static int
rbd_thread_set_cpumask(struct spdk_cpuset *set)
{
#ifdef __linux__
	uint32_t lcore;
	cpu_set_t mask;

	assert(set != NULL);
	CPU_ZERO(&mask);

	/* Copy the cores set in the spdk_cpuset into the cpu_set_t */
	for (lcore = 0; lcore < SPDK_CPUSET_SIZE; lcore++) {
		if (spdk_cpuset_get_cpu(set, lcore)) {
			CPU_SET(lcore, &mask);
		}
	}

	/* Change the core mask of the current thread */
	if (sched_setaffinity(0, sizeof(mask), &mask) < 0) {
		SPDK_ERRLOG("Failed to set the cpu mask of a non-SPDK thread (errno=%d)\n", errno);
		return -1;
	}

	return 0;
#else
	SPDK_ERRLOG("Setting the cpumask of a non-SPDK thread is only supported on Linux.\n");
	return -ENOTSUP;
#endif
}

static int
rbd_register_cluster(const char *name, const char *user_id, const char *const *config_param,
		     const char *config_file, const char *key_file, const char *core_mask)
{
	struct bdev_rbd_cluster *entry;
	struct spdk_cpuset rbd_core_mask = {};
	int rc;

	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);
	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
		if (strcmp(name, entry->name) == 0) {
			SPDK_ERRLOG("Cluster name=%s already exists\n", name);
			pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
			return -1;
		}
	}

	entry = calloc(1, sizeof(*entry));
	if (!entry) {
		SPDK_ERRLOG("Cannot allocate an entry for name=%s\n", name);
		pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
		return -1;
	}

	entry->name = strdup(name);
	if (entry->name == NULL) {
		SPDK_ERRLOG("Failed to save the name=%s on entry=%p\n", name, entry);
		goto err_handle;
	}

	if (user_id) {
		entry->user_id = strdup(user_id);
		if (entry->user_id == NULL) {
			SPDK_ERRLOG("Failed to save the user_id=%s on entry=%p\n", user_id, entry);
			goto err_handle;
		}
	}

	/* config_param and config_file may be specified separately or together. */
	if (config_param) {
		entry->config_param = bdev_rbd_dup_config(config_param);
		if (entry->config_param == NULL) {
			SPDK_ERRLOG("Failed to save the config_param=%p on entry=%p\n", config_param, entry);
			goto err_handle;
		}
	}

	if (config_file) {
		entry->config_file = strdup(config_file);
		if (entry->config_file == NULL) {
			SPDK_ERRLOG("Failed to save the config_file=%s on entry=%p\n", config_file, entry);
			goto err_handle;
		}
	}

	if (key_file) {
		entry->key_file = strdup(key_file);
		if (entry->key_file == NULL) {
			SPDK_ERRLOG("Failed to save the key_file=%s on entry=%p\n", key_file, entry);
			goto err_handle;
		}
	}

	if (core_mask) {
		entry->core_mask = strdup(core_mask);
		if (entry->core_mask == NULL) {
			SPDK_ERRLOG("Failed to save the core_mask=%s on entry=%p\n", core_mask, entry);
			goto err_handle;
		}

		if (spdk_cpuset_parse(&rbd_core_mask, entry->core_mask) < 0) {
			SPDK_ERRLOG("Invalid cpumask=%s on entry=%p\n", entry->core_mask, entry);
			goto err_handle;
		}

		if (rbd_thread_set_cpumask(&rbd_core_mask) < 0) {
			SPDK_ERRLOG("Failed to change rbd threads to core_mask %s on entry=%p\n", core_mask, entry);
			goto err_handle;
		}
	}

	/* If an rbd thread core mask was given, rados_create() must execute with the
	 * affinity set by rbd_thread_set_cpumask().  That affinity is reverted once
	 * rbd_register_cluster() returns and we leave the spdk_call_unaffinitized
	 * context. */
	rc = rados_create(&entry->cluster, user_id);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to create rados_t struct\n");
		goto err_handle;
	}

	/* When entry->config_file is NULL, try the default locations and ignore any failure */
	rc = rados_conf_read_file(entry->cluster, entry->config_file);
	if (entry->config_file && rc < 0) {
		SPDK_ERRLOG("Failed to read conf file %s\n", entry->config_file);
		rados_shutdown(entry->cluster);
		goto err_handle;
	}

	if (config_param) {
		const char *const *config_entry = config_param;
		while (*config_entry) {
			rc = rados_conf_set(entry->cluster, config_entry[0], config_entry[1]);
			if (rc < 0) {
				SPDK_ERRLOG("Failed to set %s = %s\n", config_entry[0], config_entry[1]);
				rados_shutdown(entry->cluster);
				goto err_handle;
			}
			config_entry += 2;
		}
	}

	if (key_file) {
		rc = rados_conf_set(entry->cluster, "keyring", key_file);
		if (rc < 0) {
			SPDK_ERRLOG("Failed to set keyring = %s\n", key_file);
			rados_shutdown(entry->cluster);
			goto err_handle;
		}
	}

	rc = rados_connect(entry->cluster);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to connect to the rados cluster=%p\n", entry->cluster);
		rados_shutdown(entry->cluster);
		goto err_handle;
	}

	STAILQ_INSERT_TAIL(&g_map_bdev_rbd_cluster, entry, link);
	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);

	return 0;

err_handle:
	bdev_rbd_cluster_free(entry);
	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
	return -1;
}

int
bdev_rbd_unregister_cluster(const char *name)
{
	struct bdev_rbd_cluster *entry;
	int rc = 0;

	if (name == NULL) {
		return -1;
	}

	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);
	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
		if (strcmp(name, entry->name) == 0) {
			if (entry->ref == 0) {
				STAILQ_REMOVE(&g_map_bdev_rbd_cluster, entry, bdev_rbd_cluster, link);
				rados_shutdown(entry->cluster);
				bdev_rbd_cluster_free(entry);
			} else {
				SPDK_ERRLOG("Cluster with name=%s is still in use and cannot be deleted\n",
					    entry->name);
				rc = -1;
			}

			pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
			return rc;
		}
	}

	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);

	SPDK_ERRLOG("Could not find the cluster name=%s\n", name);

	return -1;
}

static void *
_bdev_rbd_register_cluster(void *arg)
{
	struct cluster_register_info *info = arg;
	void *ret = arg;
	int rc;

	rc = rbd_register_cluster((const char *)info->name, (const char *)info->user_id,
				  (const char *const *)info->config_param, (const char *)info->config_file,
				  (const char *)info->key_file, info->core_mask);
	if (rc) {
		ret = NULL;
	}

	return ret;
}

int
bdev_rbd_register_cluster(struct cluster_register_info *info)
{
	assert(info != NULL);

	/* The rados cluster info needs to be created on a non-SPDK thread to avoid
	 * CPU resource contention */
	if (spdk_call_unaffinitized(_bdev_rbd_register_cluster, info) == NULL) {
		return -1;
	}

	return 0;
}

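/* Create and register an rbd bdev.  When cluster_name is given, the bdev
 * shares a previously registered cluster connection; otherwise a private
 * connection is created from user_id/config.  On success, *bdev points at the
 * newly registered bdev.
 */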
int
bdev_rbd_create(struct spdk_bdev **bdev, const char *name, const char *user_id,
		const char *pool_name,
		const char *const *config,
		const char *rbd_name,
		uint32_t block_size,
		const char *cluster_name,
		const struct spdk_uuid *uuid)
{
	struct bdev_rbd *rbd;
	int ret;

	if ((pool_name == NULL) || (rbd_name == NULL)) {
		return -EINVAL;
	}

	rbd = calloc(1, sizeof(struct bdev_rbd));
	if (rbd == NULL) {
		SPDK_ERRLOG("Failed to allocate bdev_rbd struct\n");
		return -ENOMEM;
	}

	rbd->rbd_name = strdup(rbd_name);
	if (!rbd->rbd_name) {
		bdev_rbd_free(rbd);
		return -ENOMEM;
	}

	if (user_id) {
		rbd->user_id = strdup(user_id);
		if (!rbd->user_id) {
			bdev_rbd_free(rbd);
			return -ENOMEM;
		}
	}

	if (cluster_name) {
		rbd->cluster_name = strdup(cluster_name);
		if (!rbd->cluster_name) {
			bdev_rbd_free(rbd);
			return -ENOMEM;
		}
	}
	rbd->pool_name = strdup(pool_name);
	if (!rbd->pool_name) {
		bdev_rbd_free(rbd);
		return -ENOMEM;
	}

	if (config && !(rbd->config = bdev_rbd_dup_config(config))) {
		bdev_rbd_free(rbd);
		return -ENOMEM;
	}

	ret = bdev_rbd_init(rbd);
	if (ret < 0) {
		bdev_rbd_free(rbd);
		SPDK_ERRLOG("Failed to init rbd device\n");
		return ret;
	}

	rbd->disk.uuid = *uuid;
	if (name) {
		rbd->disk.name = strdup(name);
	} else {
		rbd->disk.name = spdk_sprintf_alloc("Ceph%d", bdev_rbd_count);
	}
	if (!rbd->disk.name) {
		bdev_rbd_free(rbd);
		return -ENOMEM;
	}
	rbd->disk.product_name = "Ceph Rbd Disk";
	bdev_rbd_count++;

	rbd->disk.write_cache = 0;
	rbd->disk.blocklen = block_size;
	rbd->disk.blockcnt = rbd->info.size / rbd->disk.blocklen;
	rbd->disk.ctxt = rbd;
	rbd->disk.fn_table = &rbd_fn_table;
	rbd->disk.module = &rbd_if;

	SPDK_NOTICELOG("Add %s rbd disk to lun\n", rbd->disk.name);

	spdk_io_device_register(rbd, bdev_rbd_create_cb,
				bdev_rbd_destroy_cb,
				sizeof(struct bdev_rbd_io_channel),
				rbd_name);
	ret = spdk_bdev_register(&rbd->disk);
	if (ret) {
		spdk_io_device_unregister(rbd, NULL);
		bdev_rbd_free(rbd);
		return ret;
	}

	*bdev = &(rbd->disk);

	return ret;
}

void
bdev_rbd_delete(const char *name, spdk_delete_rbd_complete cb_fn, void *cb_arg)
{
	int rc;

	rc = spdk_bdev_unregister_by_name(name, &rbd_if, cb_fn, cb_arg);
	if (rc != 0) {
		cb_fn(cb_arg, rc);
	}
}

static void
dummy_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev, void *ctx)
{
}

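/* Grow the rbd image to new_size_in_mb and propagate the new block count to
 * the bdev layer.  Shrinking is rejected.
 */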
int
bdev_rbd_resize(const char *name, const uint64_t new_size_in_mb)
{
	struct spdk_bdev_desc *desc;
	struct spdk_bdev *bdev;
	struct bdev_rbd *rbd;
	int rc = 0;
	uint64_t new_size_in_byte;
	uint64_t current_size_in_mb;

	rc = spdk_bdev_open_ext(name, false, dummy_bdev_event_cb, NULL, &desc);
	if (rc != 0) {
		return rc;
	}

	bdev = spdk_bdev_desc_get_bdev(desc);

	if (bdev->module != &rbd_if) {
		rc = -EINVAL;
		goto exit;
	}

	current_size_in_mb = bdev->blocklen * bdev->blockcnt / (1024 * 1024);
	if (current_size_in_mb > new_size_in_mb) {
		SPDK_ERRLOG("The new bdev size must not be smaller than the current bdev size.\n");
		rc = -EINVAL;
		goto exit;
	}

	rbd = SPDK_CONTAINEROF(bdev, struct bdev_rbd, disk);
	new_size_in_byte = new_size_in_mb * 1024 * 1024;
	rc = rbd_resize(rbd->image, new_size_in_byte);
	if (rc != 0) {
		SPDK_ERRLOG("Failed to resize the ceph bdev.\n");
		goto exit;
	}

	rc = spdk_bdev_notify_blockcnt_change(bdev, new_size_in_byte / bdev->blocklen);
	if (rc != 0) {
		SPDK_ERRLOG("Failed to notify block count change.\n");
	}

exit:
	spdk_bdev_close(desc);
	return rc;
}

static int
bdev_rbd_group_create_cb(void *io_device, void *ctx_buf)
{
	return 0;
}

static void
bdev_rbd_group_destroy_cb(void *io_device, void *ctx_buf)
{
}

static int
bdev_rbd_library_init(void)
{
	spdk_io_device_register(&rbd_if, bdev_rbd_group_create_cb, bdev_rbd_group_destroy_cb,
				0, "bdev_rbd_poll_groups");
	return 0;
}

static void
bdev_rbd_library_fini(void)
{
	spdk_io_device_unregister(&rbd_if, NULL);
}

SPDK_LOG_REGISTER_COMPONENT(bdev_rbd)