xref: /spdk/module/bdev/rbd/bdev_rbd.c (revision 6b79f76769c83dacf162ff2ca2cf1cf133896835)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2017 Intel Corporation.
3  *   All rights reserved.
4  */
5 
6 #include "spdk/stdinc.h"
7 
8 #include "bdev_rbd.h"
9 
10 #include <rbd/librbd.h>
11 #include <rados/librados.h>
12 
13 #include "spdk/env.h"
14 #include "spdk/bdev.h"
15 #include "spdk/thread.h"
16 #include "spdk/json.h"
17 #include "spdk/string.h"
18 #include "spdk/util.h"
19 #include "spdk/likely.h"
20 
21 #include "spdk/bdev_module.h"
22 #include "spdk/log.h"
23 
/* Running index used by bdev_rbd_create() to build default "Ceph%d" bdev names. */
static int bdev_rbd_count;
25 
26 struct bdev_rbd {
27 	struct spdk_bdev disk;
28 	char *rbd_name;
29 	char *user_id;
30 	char *pool_name;
31 	char **config;
32 
33 	rados_t cluster;
34 	rados_t *cluster_p;
35 	char *cluster_name;
36 
37 	rados_ioctx_t io_ctx;
38 	rbd_image_t image;
39 
40 	rbd_image_info_t info;
41 	struct spdk_thread *main_td;
42 	struct spdk_thread *destruct_td;
43 
44 	TAILQ_ENTRY(bdev_rbd) tailq;
45 	struct spdk_poller *reset_timer;
46 	struct spdk_bdev_io *reset_bdev_io;
47 };
48 
/* Per-channel context allocated for the bdev_rbd io_device. */
struct bdev_rbd_io_channel {
	/* The rbd bdev this channel belongs to. */
	struct bdev_rbd *disk;
	/* Channel on the module-wide io_device (&rbd_if), taken in bdev_rbd_create_cb(). */
	struct spdk_io_channel *group_ch;
};
53 
54 struct bdev_rbd_io {
55 	struct			spdk_thread *submit_td;
56 	enum			spdk_bdev_io_status status;
57 	rbd_completion_t	comp;
58 	size_t			total_len;
59 };
60 
61 struct bdev_rbd_cluster {
62 	char *name;
63 	char *user_id;
64 	char **config_param;
65 	char *config_file;
66 	char *key_file;
67 	rados_t cluster;
68 	uint32_t ref;
69 	STAILQ_ENTRY(bdev_rbd_cluster) link;
70 };
71 
/* Registry of every shared cluster created through rbd_register_cluster(). */
static STAILQ_HEAD(, bdev_rbd_cluster) g_map_bdev_rbd_cluster =
	STAILQ_HEAD_INITIALIZER(g_map_bdev_rbd_cluster);

/* Held around every traversal or modification of g_map_bdev_rbd_cluster. */
static pthread_mutex_t g_map_bdev_rbd_cluster_mutex = PTHREAD_MUTEX_INITIALIZER;
75 
76 static void
77 bdev_rbd_cluster_free(struct bdev_rbd_cluster *entry)
78 {
79 	assert(entry != NULL);
80 
81 	bdev_rbd_free_config(entry->config_param);
82 	free(entry->config_file);
83 	free(entry->key_file);
84 	free(entry->user_id);
85 	free(entry->name);
86 	free(entry);
87 }
88 
89 static void
90 bdev_rbd_put_cluster(rados_t **cluster)
91 {
92 	struct bdev_rbd_cluster *entry;
93 
94 	assert(cluster != NULL);
95 
96 	/* No need go through the map if *cluster equals to NULL */
97 	if (*cluster == NULL) {
98 		return;
99 	}
100 
101 	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);
102 	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
103 		if (*cluster != &entry->cluster) {
104 			continue;
105 		}
106 
107 		assert(entry->ref > 0);
108 		entry->ref--;
109 		*cluster = NULL;
110 		pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
111 		return;
112 	}
113 
114 	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
115 	SPDK_ERRLOG("Cannot find the entry for cluster=%p\n", cluster);
116 }
117 
118 static void
119 bdev_rbd_free(struct bdev_rbd *rbd)
120 {
121 	if (!rbd) {
122 		return;
123 	}
124 
125 	if (rbd->image) {
126 		rbd_flush(rbd->image);
127 		rbd_close(rbd->image);
128 	}
129 
130 	free(rbd->disk.name);
131 	free(rbd->rbd_name);
132 	free(rbd->user_id);
133 	free(rbd->pool_name);
134 	bdev_rbd_free_config(rbd->config);
135 
136 	if (rbd->io_ctx) {
137 		rados_ioctx_destroy(rbd->io_ctx);
138 	}
139 
140 	if (rbd->cluster_name) {
141 		bdev_rbd_put_cluster(&rbd->cluster_p);
142 		free(rbd->cluster_name);
143 	} else if (rbd->cluster) {
144 		rados_shutdown(rbd->cluster);
145 	}
146 
147 	free(rbd);
148 }
149 
/*
 * Free a NULL-terminated array of heap-allocated strings, including the
 * array itself. A NULL array is accepted and ignored.
 */
void
bdev_rbd_free_config(char **config)
{
	size_t i;

	if (config == NULL) {
		return;
	}

	for (i = 0; config[i] != NULL; i++) {
		free(config[i]);
	}
	free(config);
}
162 
/*
 * Deep-copy a NULL-terminated array of strings.
 *
 * \return a newly allocated NULL-terminated array the caller releases with
 * bdev_rbd_free_config(), or NULL when config is NULL or allocation fails.
 */
char **
bdev_rbd_dup_config(const char *const *config)
{
	char **dup;
	size_t n = 0;
	size_t i;

	if (config == NULL) {
		return NULL;
	}

	while (config[n] != NULL) {
		n++;
	}

	/* One extra zeroed slot serves as the terminator. */
	dup = calloc(n + 1, sizeof(*dup));
	if (dup == NULL) {
		return NULL;
	}

	for (i = 0; i < n; i++) {
		dup[i] = strdup(config[i]);
		if (dup[i] == NULL) {
			/* Slots past i are still NULL, so the partial copy frees cleanly. */
			bdev_rbd_free_config(dup);
			return NULL;
		}
	}

	return dup;
}
185 
186 static int
187 bdev_rados_cluster_init(const char *user_id, const char *const *config,
188 			rados_t *cluster)
189 {
190 	int ret;
191 
192 	ret = rados_create(cluster, user_id);
193 	if (ret < 0) {
194 		SPDK_ERRLOG("Failed to create rados_t struct\n");
195 		return -1;
196 	}
197 
198 	if (config) {
199 		const char *const *entry = config;
200 		while (*entry) {
201 			ret = rados_conf_set(*cluster, entry[0], entry[1]);
202 			if (ret < 0) {
203 				SPDK_ERRLOG("Failed to set %s = %s\n", entry[0], entry[1]);
204 				rados_shutdown(*cluster);
205 				*cluster = NULL;
206 				return -1;
207 			}
208 			entry += 2;
209 		}
210 	} else {
211 		ret = rados_conf_read_file(*cluster, NULL);
212 		if (ret < 0) {
213 			SPDK_ERRLOG("Failed to read conf file\n");
214 			rados_shutdown(*cluster);
215 			*cluster = NULL;
216 			return -1;
217 		}
218 	}
219 
220 	ret = rados_connect(*cluster);
221 	if (ret < 0) {
222 		SPDK_ERRLOG("Failed to connect to rbd_pool\n");
223 		rados_shutdown(*cluster);
224 		*cluster = NULL;
225 		return -1;
226 	}
227 
228 	return 0;
229 }
230 
231 static int
232 bdev_rbd_get_cluster(const char *cluster_name, rados_t **cluster)
233 {
234 	struct bdev_rbd_cluster *entry;
235 
236 	if (cluster == NULL) {
237 		SPDK_ERRLOG("cluster should not be NULL\n");
238 		return -1;
239 	}
240 
241 	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);
242 	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
243 		if (strcmp(cluster_name, entry->name) == 0) {
244 			entry->ref++;
245 			*cluster = &entry->cluster;
246 			pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
247 			return 0;
248 		}
249 	}
250 
251 	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
252 	return -1;
253 }
254 
255 static int
256 bdev_rbd_shared_cluster_init(const char *cluster_name, rados_t **cluster)
257 {
258 	int ret;
259 
260 	ret = bdev_rbd_get_cluster(cluster_name, cluster);
261 	if (ret < 0) {
262 		SPDK_ERRLOG("Failed to create rados_t struct\n");
263 		return -1;
264 	}
265 
266 	return ret;
267 }
268 
269 static void *
270 bdev_rbd_cluster_handle(void *arg)
271 {
272 	void *ret = arg;
273 	struct bdev_rbd *rbd = arg;
274 	int rc;
275 
276 	rc = bdev_rados_cluster_init(rbd->user_id, (const char *const *)rbd->config,
277 				     &rbd->cluster);
278 	if (rc < 0) {
279 		SPDK_ERRLOG("Failed to create rados cluster for user_id=%s and rbd_pool=%s\n",
280 			    rbd->user_id ? rbd->user_id : "admin (the default)", rbd->pool_name);
281 		ret = NULL;
282 	}
283 
284 	return ret;
285 }
286 
287 static void *
288 bdev_rbd_init_context(void *arg)
289 {
290 	struct bdev_rbd *rbd = arg;
291 	int rc;
292 
293 	if (rados_ioctx_create(*(rbd->cluster_p), rbd->pool_name, &rbd->io_ctx) < 0) {
294 		SPDK_ERRLOG("Failed to create ioctx on rbd=%p\n", rbd);
295 		return NULL;
296 	}
297 
298 	rc = rbd_open(rbd->io_ctx, rbd->rbd_name, &rbd->image, NULL);
299 	if (rc < 0) {
300 		SPDK_ERRLOG("Failed to open specified rbd device\n");
301 		return NULL;
302 	}
303 
304 	rc = rbd_stat(rbd->image, &rbd->info, sizeof(rbd->info));
305 	if (rc < 0) {
306 		SPDK_ERRLOG("Failed to stat specified rbd device\n");
307 		return NULL;
308 	}
309 
310 	return arg;
311 }
312 
313 static int
314 bdev_rbd_init(struct bdev_rbd *rbd)
315 {
316 	int ret = 0;
317 
318 	if (!rbd->cluster_name) {
319 		rbd->cluster_p = &rbd->cluster;
320 		/* Cluster should be created in non-SPDK thread to avoid conflict between
321 		 * Rados and SPDK thread */
322 		if (spdk_call_unaffinitized(bdev_rbd_cluster_handle, rbd) == NULL) {
323 			SPDK_ERRLOG("Cannot create the rados object on rbd=%p\n", rbd);
324 			return -1;
325 		}
326 	} else {
327 		ret = bdev_rbd_shared_cluster_init(rbd->cluster_name, &rbd->cluster_p);
328 		if (ret < 0) {
329 			SPDK_ERRLOG("Failed to create rados object for rbd =%p on cluster_name=%s\n",
330 				    rbd, rbd->cluster_name);
331 			return -1;
332 		}
333 	}
334 
335 	if (spdk_call_unaffinitized(bdev_rbd_init_context, rbd) == NULL) {
336 		SPDK_ERRLOG("Cannot init rbd context for rbd=%p\n", rbd);
337 		return -1;
338 	}
339 
340 	rbd->main_td = spdk_get_thread();
341 
342 	return ret;
343 }
344 
345 static void
346 _bdev_rbd_io_complete(void *_rbd_io)
347 {
348 	struct bdev_rbd_io *rbd_io = _rbd_io;
349 
350 	spdk_bdev_io_complete(spdk_bdev_io_from_ctx(rbd_io), rbd_io->status);
351 }
352 
353 static void
354 bdev_rbd_io_complete(struct spdk_bdev_io *bdev_io, enum spdk_bdev_io_status status)
355 {
356 	struct bdev_rbd_io *rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx;
357 	struct spdk_thread *current_thread = spdk_get_thread();
358 
359 	rbd_io->status = status;
360 	assert(rbd_io->submit_td != NULL);
361 	if (rbd_io->submit_td != current_thread) {
362 		spdk_thread_send_msg(rbd_io->submit_td, _bdev_rbd_io_complete, rbd_io);
363 	} else {
364 		_bdev_rbd_io_complete(rbd_io);
365 	}
366 }
367 
368 static void
369 bdev_rbd_finish_aiocb(rbd_completion_t cb, void *arg)
370 {
371 	int io_status;
372 	struct spdk_bdev_io *bdev_io;
373 	struct bdev_rbd_io *rbd_io;
374 	enum spdk_bdev_io_status bio_status;
375 
376 	bdev_io = rbd_aio_get_arg(cb);
377 	rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx;
378 	io_status = rbd_aio_get_return_value(cb);
379 	bio_status = SPDK_BDEV_IO_STATUS_SUCCESS;
380 
381 	if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) {
382 		if ((int)rbd_io->total_len != io_status) {
383 			bio_status = SPDK_BDEV_IO_STATUS_FAILED;
384 		}
385 	} else {
386 		/* For others, 0 means success */
387 		if (io_status != 0) {
388 			bio_status = SPDK_BDEV_IO_STATUS_FAILED;
389 		}
390 	}
391 
392 	rbd_aio_release(cb);
393 
394 	bdev_rbd_io_complete(bdev_io, bio_status);
395 }
396 
397 static void
398 _bdev_rbd_start_aio(struct bdev_rbd *disk, struct spdk_bdev_io *bdev_io,
399 		    struct iovec *iov, int iovcnt, uint64_t offset, size_t len)
400 {
401 	int ret;
402 	struct bdev_rbd_io *rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx;
403 	rbd_image_t image = disk->image;
404 
405 	ret = rbd_aio_create_completion(bdev_io, bdev_rbd_finish_aiocb,
406 					&rbd_io->comp);
407 	if (ret < 0) {
408 		goto err;
409 	}
410 
411 	if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) {
412 		rbd_io->total_len = len;
413 		if (spdk_likely(iovcnt == 1)) {
414 			ret = rbd_aio_read(image, offset, iov[0].iov_len, iov[0].iov_base, rbd_io->comp);
415 		} else {
416 			ret = rbd_aio_readv(image, iov, iovcnt, offset, rbd_io->comp);
417 		}
418 	} else if (bdev_io->type == SPDK_BDEV_IO_TYPE_WRITE) {
419 		if (spdk_likely(iovcnt == 1)) {
420 			ret = rbd_aio_write(image, offset, iov[0].iov_len, iov[0].iov_base, rbd_io->comp);
421 		} else {
422 			ret = rbd_aio_writev(image, iov, iovcnt, offset, rbd_io->comp);
423 		}
424 	} else if (bdev_io->type == SPDK_BDEV_IO_TYPE_UNMAP) {
425 		ret = rbd_aio_discard(image, offset, len, rbd_io->comp);
426 	} else if (bdev_io->type == SPDK_BDEV_IO_TYPE_FLUSH) {
427 		ret = rbd_aio_flush(image, rbd_io->comp);
428 	} else if (bdev_io->type == SPDK_BDEV_IO_TYPE_WRITE_ZEROES) {
429 		ret = rbd_aio_write_zeroes(image, offset, len, rbd_io->comp, /* zero_flags */ 0, /* op_flags */ 0);
430 	}
431 
432 	if (ret < 0) {
433 		rbd_aio_release(rbd_io->comp);
434 		goto err;
435 	}
436 
437 	return;
438 
439 err:
440 	bdev_rbd_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
441 }
442 
443 static void
444 bdev_rbd_start_aio(void *ctx)
445 {
446 	struct spdk_bdev_io *bdev_io = ctx;
447 	struct bdev_rbd *disk = (struct bdev_rbd *)bdev_io->bdev->ctxt;
448 
449 	_bdev_rbd_start_aio(disk,
450 			    bdev_io,
451 			    bdev_io->u.bdev.iovs,
452 			    bdev_io->u.bdev.iovcnt,
453 			    bdev_io->u.bdev.offset_blocks * bdev_io->bdev->blocklen,
454 			    bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
455 }
456 
457 static int bdev_rbd_library_init(void);
458 static void bdev_rbd_library_fini(void);
459 
460 static int
461 bdev_rbd_get_ctx_size(void)
462 {
463 	return sizeof(struct bdev_rbd_io);
464 }
465 
466 static struct spdk_bdev_module rbd_if = {
467 	.name = "rbd",
468 	.module_init = bdev_rbd_library_init,
469 	.module_fini = bdev_rbd_library_fini,
470 	.get_ctx_size = bdev_rbd_get_ctx_size,
471 
472 };
473 SPDK_BDEV_MODULE_REGISTER(rbd, &rbd_if)
474 
475 static int bdev_rbd_reset_timer(void *arg);
476 
477 static void
478 bdev_rbd_check_outstanding_ios(struct spdk_bdev *bdev, uint64_t current_qd,
479 			       void *cb_arg, int rc)
480 {
481 	struct bdev_rbd *disk = cb_arg;
482 	enum spdk_bdev_io_status bio_status;
483 
484 	if (rc == 0 && current_qd > 0) {
485 		disk->reset_timer = SPDK_POLLER_REGISTER(bdev_rbd_reset_timer, disk, 1000);
486 		return;
487 	}
488 
489 	if (rc != 0) {
490 		bio_status = SPDK_BDEV_IO_STATUS_FAILED;
491 	} else {
492 		bio_status = SPDK_BDEV_IO_STATUS_SUCCESS;
493 	}
494 
495 	bdev_rbd_io_complete(disk->reset_bdev_io, bio_status);
496 	disk->reset_bdev_io = NULL;
497 }
498 
499 static int
500 bdev_rbd_reset_timer(void *arg)
501 {
502 	struct bdev_rbd *disk = arg;
503 
504 	spdk_poller_unregister(&disk->reset_timer);
505 
506 	spdk_bdev_get_current_qd(&disk->disk, bdev_rbd_check_outstanding_ios, disk);
507 
508 	return SPDK_POLLER_BUSY;
509 }
510 
511 static void
512 bdev_rbd_reset(void *ctx)
513 {
514 	struct spdk_bdev_io *bdev_io = ctx;
515 	struct bdev_rbd *disk = (struct bdev_rbd *)bdev_io->bdev->ctxt;
516 
517 	/*
518 	 * HACK: Since librbd doesn't provide any way to cancel outstanding aio, just kick off a
519 	 * poller to wait for in-flight I/O to complete.
520 	 */
521 	assert(disk->reset_bdev_io == NULL);
522 	disk->reset_bdev_io = bdev_io;
523 
524 	bdev_rbd_reset_timer(disk);
525 }
526 
527 static void
528 _bdev_rbd_destruct_done(void *io_device)
529 {
530 	struct bdev_rbd *rbd = io_device;
531 
532 	assert(rbd != NULL);
533 
534 	spdk_bdev_destruct_done(&rbd->disk, 0);
535 	bdev_rbd_free(rbd);
536 }
537 
538 static void
539 bdev_rbd_free_cb(void *io_device)
540 {
541 	struct bdev_rbd *rbd = io_device;
542 
543 	/* The io device has been unregistered.  Send a message back to the
544 	 * original thread that started the destruct operation, so that the
545 	 * bdev unregister callback is invoked on the same thread that started
546 	 * this whole process.
547 	 */
548 	spdk_thread_send_msg(rbd->destruct_td, _bdev_rbd_destruct_done, rbd);
549 }
550 
551 static void
552 _bdev_rbd_destruct(void *ctx)
553 {
554 	struct bdev_rbd *rbd = ctx;
555 
556 	spdk_io_device_unregister(rbd, bdev_rbd_free_cb);
557 }
558 
559 static int
560 bdev_rbd_destruct(void *ctx)
561 {
562 	struct bdev_rbd *rbd = ctx;
563 	struct spdk_thread *td;
564 
565 	if (rbd->main_td == NULL) {
566 		td = spdk_get_thread();
567 	} else {
568 		td = rbd->main_td;
569 	}
570 
571 	/* Start the destruct operation on the rbd bdev's
572 	 * main thread.  This guarantees it will only start
573 	 * executing after any messages related to channel
574 	 * deletions have finished completing.  *Always*
575 	 * send a message, even if this function gets called
576 	 * from the main thread, in case there are pending
577 	 * channel delete messages in flight to this thread.
578 	 */
579 	assert(rbd->destruct_td == NULL);
580 	rbd->destruct_td = td;
581 	spdk_thread_send_msg(td, _bdev_rbd_destruct, rbd);
582 
583 	/* Return 1 to indicate the destruct path is asynchronous. */
584 	return 1;
585 }
586 
587 static void
588 bdev_rbd_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io,
589 		    bool success)
590 {
591 	if (!success) {
592 		bdev_rbd_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
593 		return;
594 	}
595 
596 	bdev_rbd_start_aio(bdev_io);
597 }
598 
599 static void
600 bdev_rbd_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
601 {
602 	struct spdk_thread *submit_td = spdk_io_channel_get_thread(ch);
603 	struct bdev_rbd_io *rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx;
604 	struct bdev_rbd *disk = (struct bdev_rbd *)bdev_io->bdev->ctxt;
605 
606 	rbd_io->submit_td = submit_td;
607 	switch (bdev_io->type) {
608 	case SPDK_BDEV_IO_TYPE_READ:
609 		spdk_bdev_io_get_buf(bdev_io, bdev_rbd_get_buf_cb,
610 				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
611 		break;
612 
613 	case SPDK_BDEV_IO_TYPE_WRITE:
614 	case SPDK_BDEV_IO_TYPE_UNMAP:
615 	case SPDK_BDEV_IO_TYPE_FLUSH:
616 	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
617 		bdev_rbd_start_aio(bdev_io);
618 		break;
619 
620 	case SPDK_BDEV_IO_TYPE_RESET:
621 		spdk_thread_exec_msg(disk->main_td, bdev_rbd_reset, bdev_io);
622 		break;
623 
624 	default:
625 		SPDK_ERRLOG("Unsupported IO type =%d\n", bdev_io->type);
626 		bdev_rbd_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
627 		break;
628 	}
629 }
630 
631 static bool
632 bdev_rbd_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type)
633 {
634 	switch (io_type) {
635 	case SPDK_BDEV_IO_TYPE_READ:
636 	case SPDK_BDEV_IO_TYPE_WRITE:
637 	case SPDK_BDEV_IO_TYPE_UNMAP:
638 	case SPDK_BDEV_IO_TYPE_FLUSH:
639 	case SPDK_BDEV_IO_TYPE_RESET:
640 	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
641 		return true;
642 
643 	default:
644 		return false;
645 	}
646 }
647 
648 static int
649 bdev_rbd_create_cb(void *io_device, void *ctx_buf)
650 {
651 	struct bdev_rbd_io_channel *ch = ctx_buf;
652 	struct bdev_rbd *disk = io_device;
653 
654 	ch->disk = disk;
655 	ch->group_ch = spdk_get_io_channel(&rbd_if);
656 	assert(ch->group_ch != NULL);
657 
658 	return 0;
659 }
660 
661 static void
662 bdev_rbd_destroy_cb(void *io_device, void *ctx_buf)
663 {
664 	struct bdev_rbd_io_channel *ch = ctx_buf;
665 
666 	spdk_put_io_channel(ch->group_ch);
667 }
668 
/* Return an I/O channel for the rbd bdev passed as ctx (a struct bdev_rbd *). */
static struct spdk_io_channel *
bdev_rbd_get_io_channel(void *ctx)
{
	struct bdev_rbd *disk = ctx;

	return spdk_get_io_channel(disk);
}
676 
677 static void
678 bdev_rbd_cluster_dump_entry(const char *cluster_name, struct spdk_json_write_ctx *w)
679 {
680 	struct bdev_rbd_cluster *entry;
681 
682 	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);
683 	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
684 		if (strcmp(cluster_name, entry->name)) {
685 			continue;
686 		}
687 		if (entry->user_id) {
688 			spdk_json_write_named_string(w, "user_id", entry->user_id);
689 		}
690 
691 		if (entry->config_param) {
692 			char **config_entry = entry->config_param;
693 
694 			spdk_json_write_named_object_begin(w, "config_param");
695 			while (*config_entry) {
696 				spdk_json_write_named_string(w, config_entry[0], config_entry[1]);
697 				config_entry += 2;
698 			}
699 			spdk_json_write_object_end(w);
700 		}
701 		if (entry->config_file) {
702 			spdk_json_write_named_string(w, "config_file", entry->config_file);
703 		}
704 		if (entry->key_file) {
705 			spdk_json_write_named_string(w, "key_file", entry->key_file);
706 		}
707 
708 		pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
709 		return;
710 	}
711 
712 	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
713 }
714 
715 static int
716 bdev_rbd_dump_info_json(void *ctx, struct spdk_json_write_ctx *w)
717 {
718 	struct bdev_rbd *rbd_bdev = ctx;
719 
720 	spdk_json_write_named_object_begin(w, "rbd");
721 
722 	spdk_json_write_named_string(w, "pool_name", rbd_bdev->pool_name);
723 
724 	spdk_json_write_named_string(w, "rbd_name", rbd_bdev->rbd_name);
725 
726 	if (rbd_bdev->cluster_name) {
727 		bdev_rbd_cluster_dump_entry(rbd_bdev->cluster_name, w);
728 		goto end;
729 	}
730 
731 	if (rbd_bdev->user_id) {
732 		spdk_json_write_named_string(w, "user_id", rbd_bdev->user_id);
733 	}
734 
735 	if (rbd_bdev->config) {
736 		char **entry = rbd_bdev->config;
737 
738 		spdk_json_write_named_object_begin(w, "config");
739 		while (*entry) {
740 			spdk_json_write_named_string(w, entry[0], entry[1]);
741 			entry += 2;
742 		}
743 		spdk_json_write_object_end(w);
744 	}
745 
746 end:
747 	spdk_json_write_object_end(w);
748 
749 	return 0;
750 }
751 
752 static void
753 bdev_rbd_write_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
754 {
755 	struct bdev_rbd *rbd = bdev->ctxt;
756 	char uuid_str[SPDK_UUID_STRING_LEN];
757 
758 	spdk_json_write_object_begin(w);
759 
760 	spdk_json_write_named_string(w, "method", "bdev_rbd_create");
761 
762 	spdk_json_write_named_object_begin(w, "params");
763 	spdk_json_write_named_string(w, "name", bdev->name);
764 	spdk_json_write_named_string(w, "pool_name", rbd->pool_name);
765 	spdk_json_write_named_string(w, "rbd_name", rbd->rbd_name);
766 	spdk_json_write_named_uint32(w, "block_size", bdev->blocklen);
767 	if (rbd->user_id) {
768 		spdk_json_write_named_string(w, "user_id", rbd->user_id);
769 	}
770 
771 	if (rbd->config) {
772 		char **entry = rbd->config;
773 
774 		spdk_json_write_named_object_begin(w, "config");
775 		while (*entry) {
776 			spdk_json_write_named_string(w, entry[0], entry[1]);
777 			entry += 2;
778 		}
779 		spdk_json_write_object_end(w);
780 	}
781 
782 	spdk_uuid_fmt_lower(uuid_str, sizeof(uuid_str), &bdev->uuid);
783 	spdk_json_write_named_string(w, "uuid", uuid_str);
784 
785 	spdk_json_write_object_end(w);
786 
787 	spdk_json_write_object_end(w);
788 }
789 
790 static void
791 dump_single_cluster_entry(struct bdev_rbd_cluster *entry, struct spdk_json_write_ctx *w)
792 {
793 	assert(entry != NULL);
794 
795 	spdk_json_write_object_begin(w);
796 	spdk_json_write_named_string(w, "cluster_name", entry->name);
797 
798 	if (entry->user_id) {
799 		spdk_json_write_named_string(w, "user_id", entry->user_id);
800 	}
801 
802 	if (entry->config_param) {
803 		char **config_entry = entry->config_param;
804 
805 		spdk_json_write_named_object_begin(w, "config_param");
806 		while (*config_entry) {
807 			spdk_json_write_named_string(w, config_entry[0], config_entry[1]);
808 			config_entry += 2;
809 		}
810 		spdk_json_write_object_end(w);
811 	}
812 	if (entry->config_file) {
813 		spdk_json_write_named_string(w, "config_file", entry->config_file);
814 	}
815 	if (entry->key_file) {
816 		spdk_json_write_named_string(w, "key_file", entry->key_file);
817 	}
818 
819 	spdk_json_write_object_end(w);
820 }
821 
822 int
823 bdev_rbd_get_clusters_info(struct spdk_jsonrpc_request *request, const char *name)
824 {
825 	struct bdev_rbd_cluster *entry;
826 	struct spdk_json_write_ctx *w;
827 
828 	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);
829 
830 	if (STAILQ_EMPTY(&g_map_bdev_rbd_cluster)) {
831 		pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
832 		return -ENOENT;
833 	}
834 
835 	/* If cluster name is provided */
836 	if (name) {
837 		STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
838 			if (strcmp(name, entry->name) == 0) {
839 				w = spdk_jsonrpc_begin_result(request);
840 				dump_single_cluster_entry(entry, w);
841 				spdk_jsonrpc_end_result(request, w);
842 
843 				pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
844 				return 0;
845 			}
846 		}
847 
848 		pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
849 		return -ENOENT;
850 	}
851 
852 	w = spdk_jsonrpc_begin_result(request);
853 	spdk_json_write_array_begin(w);
854 	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
855 		dump_single_cluster_entry(entry, w);
856 	}
857 	spdk_json_write_array_end(w);
858 	spdk_jsonrpc_end_result(request, w);
859 	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
860 
861 	return 0;
862 }
863 
864 static const struct spdk_bdev_fn_table rbd_fn_table = {
865 	.destruct		= bdev_rbd_destruct,
866 	.submit_request		= bdev_rbd_submit_request,
867 	.io_type_supported	= bdev_rbd_io_type_supported,
868 	.get_io_channel		= bdev_rbd_get_io_channel,
869 	.dump_info_json		= bdev_rbd_dump_info_json,
870 	.write_config_json	= bdev_rbd_write_config_json,
871 };
872 
873 static int
874 rbd_register_cluster(const char *name, const char *user_id, const char *const *config_param,
875 		     const char *config_file, const char *key_file)
876 {
877 	struct bdev_rbd_cluster *entry;
878 	int rc;
879 
880 	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);
881 	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
882 		if (strcmp(name, entry->name) == 0) {
883 			SPDK_ERRLOG("Cluster name=%s already exists\n", name);
884 			pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
885 			return -1;
886 		}
887 	}
888 
889 	entry = calloc(1, sizeof(*entry));
890 	if (!entry) {
891 		SPDK_ERRLOG("Cannot allocate an entry for name=%s\n", name);
892 		pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
893 		return -1;
894 	}
895 
896 	entry->name = strdup(name);
897 	if (entry->name == NULL) {
898 		SPDK_ERRLOG("Failed to save the name =%s on entry =%p\n", name, entry);
899 		goto err_handle;
900 	}
901 
902 	if (user_id) {
903 		entry->user_id = strdup(user_id);
904 		if (entry->user_id == NULL) {
905 			SPDK_ERRLOG("Failed to save the str =%s on entry =%p\n", user_id, entry);
906 			goto err_handle;
907 		}
908 	}
909 
910 	/* Support specify config_param or config_file separately, or both of them. */
911 	if (config_param) {
912 		entry->config_param = bdev_rbd_dup_config(config_param);
913 		if (entry->config_param == NULL) {
914 			SPDK_ERRLOG("Failed to save the config_param=%p on entry = %p\n", config_param, entry);
915 			goto err_handle;
916 		}
917 	}
918 
919 	if (config_file) {
920 		entry->config_file = strdup(config_file);
921 		if (entry->config_file == NULL) {
922 			SPDK_ERRLOG("Failed to save the config_file=%s on entry = %p\n", config_file, entry);
923 			goto err_handle;
924 		}
925 	}
926 
927 	if (key_file) {
928 		entry->key_file = strdup(key_file);
929 		if (entry->key_file == NULL) {
930 			SPDK_ERRLOG("Failed to save the key_file=%s on entry = %p\n", key_file, entry);
931 			goto err_handle;
932 		}
933 	}
934 
935 	rc = rados_create(&entry->cluster, user_id);
936 	if (rc < 0) {
937 		SPDK_ERRLOG("Failed to create rados_t struct\n");
938 		goto err_handle;
939 	}
940 
941 	/* Try default location when entry->config_file is NULL, but ignore failure when it is NULL */
942 	rc = rados_conf_read_file(entry->cluster, entry->config_file);
943 	if (entry->config_file && rc < 0) {
944 		SPDK_ERRLOG("Failed to read conf file %s\n", entry->config_file);
945 		rados_shutdown(entry->cluster);
946 		goto err_handle;
947 	}
948 
949 	if (config_param) {
950 		const char *const *config_entry = config_param;
951 		while (*config_entry) {
952 			rc = rados_conf_set(entry->cluster, config_entry[0], config_entry[1]);
953 			if (rc < 0) {
954 				SPDK_ERRLOG("Failed to set %s = %s\n", config_entry[0], config_entry[1]);
955 				rados_shutdown(entry->cluster);
956 				goto err_handle;
957 			}
958 			config_entry += 2;
959 		}
960 	}
961 
962 	if (key_file) {
963 		rc = rados_conf_set(entry->cluster, "keyring", key_file);
964 		if (rc < 0) {
965 			SPDK_ERRLOG("Failed to set keyring = %s\n", key_file);
966 			rados_shutdown(entry->cluster);
967 			goto err_handle;
968 		}
969 	}
970 
971 	rc = rados_connect(entry->cluster);
972 	if (rc < 0) {
973 		SPDK_ERRLOG("Failed to connect to rbd_pool on cluster=%p\n", entry->cluster);
974 		rados_shutdown(entry->cluster);
975 		goto err_handle;
976 	}
977 
978 	STAILQ_INSERT_TAIL(&g_map_bdev_rbd_cluster, entry, link);
979 	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
980 
981 	return 0;
982 
983 err_handle:
984 	bdev_rbd_cluster_free(entry);
985 	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
986 	return -1;
987 }
988 
989 int
990 bdev_rbd_unregister_cluster(const char *name)
991 {
992 	struct bdev_rbd_cluster *entry;
993 	int rc = 0;
994 
995 	if (name == NULL) {
996 		return -1;
997 	}
998 
999 	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);
1000 	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
1001 		if (strcmp(name, entry->name) == 0) {
1002 			if (entry->ref == 0) {
1003 				STAILQ_REMOVE(&g_map_bdev_rbd_cluster, entry, bdev_rbd_cluster, link);
1004 				rados_shutdown(entry->cluster);
1005 				bdev_rbd_cluster_free(entry);
1006 			} else {
1007 				SPDK_ERRLOG("Cluster with name=%p is still used and we cannot delete it\n",
1008 					    entry->name);
1009 				rc = -1;
1010 			}
1011 
1012 			pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
1013 			return rc;
1014 		}
1015 	}
1016 
1017 	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
1018 
1019 	SPDK_ERRLOG("Could not find the cluster name =%p\n", name);
1020 
1021 	return -1;
1022 }
1023 
1024 static void *
1025 _bdev_rbd_register_cluster(void *arg)
1026 {
1027 	struct cluster_register_info *info = arg;
1028 	void *ret = arg;
1029 	int rc;
1030 
1031 	rc = rbd_register_cluster((const char *)info->name, (const char *)info->user_id,
1032 				  (const char *const *)info->config_param, (const char *)info->config_file,
1033 				  (const char *)info->key_file);
1034 	if (rc) {
1035 		ret = NULL;
1036 	}
1037 
1038 	return ret;
1039 }
1040 
1041 int
1042 bdev_rbd_register_cluster(struct cluster_register_info *info)
1043 {
1044 	assert(info != NULL);
1045 
1046 	/* Rados cluster info need to be created in non SPDK-thread to avoid CPU
1047 	 * resource contention */
1048 	if (spdk_call_unaffinitized(_bdev_rbd_register_cluster, info) == NULL) {
1049 		return -1;
1050 	}
1051 
1052 	return 0;
1053 }
1054 
1055 int
1056 bdev_rbd_create(struct spdk_bdev **bdev, const char *name, const char *user_id,
1057 		const char *pool_name,
1058 		const char *const *config,
1059 		const char *rbd_name,
1060 		uint32_t block_size,
1061 		const char *cluster_name,
1062 		const struct spdk_uuid *uuid)
1063 {
1064 	struct bdev_rbd *rbd;
1065 	int ret;
1066 
1067 	if ((pool_name == NULL) || (rbd_name == NULL)) {
1068 		return -EINVAL;
1069 	}
1070 
1071 	rbd = calloc(1, sizeof(struct bdev_rbd));
1072 	if (rbd == NULL) {
1073 		SPDK_ERRLOG("Failed to allocate bdev_rbd struct\n");
1074 		return -ENOMEM;
1075 	}
1076 
1077 	rbd->rbd_name = strdup(rbd_name);
1078 	if (!rbd->rbd_name) {
1079 		bdev_rbd_free(rbd);
1080 		return -ENOMEM;
1081 	}
1082 
1083 	if (user_id) {
1084 		rbd->user_id = strdup(user_id);
1085 		if (!rbd->user_id) {
1086 			bdev_rbd_free(rbd);
1087 			return -ENOMEM;
1088 		}
1089 	}
1090 
1091 	if (cluster_name) {
1092 		rbd->cluster_name = strdup(cluster_name);
1093 		if (!rbd->cluster_name) {
1094 			bdev_rbd_free(rbd);
1095 			return -ENOMEM;
1096 		}
1097 	}
1098 	rbd->pool_name = strdup(pool_name);
1099 	if (!rbd->pool_name) {
1100 		bdev_rbd_free(rbd);
1101 		return -ENOMEM;
1102 	}
1103 
1104 	if (config && !(rbd->config = bdev_rbd_dup_config(config))) {
1105 		bdev_rbd_free(rbd);
1106 		return -ENOMEM;
1107 	}
1108 
1109 	ret = bdev_rbd_init(rbd);
1110 	if (ret < 0) {
1111 		bdev_rbd_free(rbd);
1112 		SPDK_ERRLOG("Failed to init rbd device\n");
1113 		return ret;
1114 	}
1115 
1116 	if (uuid) {
1117 		rbd->disk.uuid = *uuid;
1118 	}
1119 
1120 	if (name) {
1121 		rbd->disk.name = strdup(name);
1122 	} else {
1123 		rbd->disk.name = spdk_sprintf_alloc("Ceph%d", bdev_rbd_count);
1124 	}
1125 	if (!rbd->disk.name) {
1126 		bdev_rbd_free(rbd);
1127 		return -ENOMEM;
1128 	}
1129 	rbd->disk.product_name = "Ceph Rbd Disk";
1130 	bdev_rbd_count++;
1131 
1132 	rbd->disk.write_cache = 0;
1133 	rbd->disk.blocklen = block_size;
1134 	rbd->disk.blockcnt = rbd->info.size / rbd->disk.blocklen;
1135 	rbd->disk.ctxt = rbd;
1136 	rbd->disk.fn_table = &rbd_fn_table;
1137 	rbd->disk.module = &rbd_if;
1138 
1139 	SPDK_NOTICELOG("Add %s rbd disk to lun\n", rbd->disk.name);
1140 
1141 	spdk_io_device_register(rbd, bdev_rbd_create_cb,
1142 				bdev_rbd_destroy_cb,
1143 				sizeof(struct bdev_rbd_io_channel),
1144 				rbd_name);
1145 	ret = spdk_bdev_register(&rbd->disk);
1146 	if (ret) {
1147 		spdk_io_device_unregister(rbd, NULL);
1148 		bdev_rbd_free(rbd);
1149 		return ret;
1150 	}
1151 
1152 	*bdev = &(rbd->disk);
1153 
1154 	return ret;
1155 }
1156 
1157 void
1158 bdev_rbd_delete(const char *name, spdk_delete_rbd_complete cb_fn, void *cb_arg)
1159 {
1160 	int rc;
1161 
1162 	rc = spdk_bdev_unregister_by_name(name, &rbd_if, cb_fn, cb_arg);
1163 	if (rc != 0) {
1164 		cb_fn(cb_arg, rc);
1165 	}
1166 }
1167 
1168 static void
1169 dummy_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev, void *ctx)
1170 {
1171 }
1172 
1173 int
1174 bdev_rbd_resize(const char *name, const uint64_t new_size_in_mb)
1175 {
1176 	struct spdk_bdev_desc *desc;
1177 	struct spdk_bdev *bdev;
1178 	struct spdk_io_channel *ch;
1179 	struct bdev_rbd_io_channel *rbd_io_ch;
1180 	int rc = 0;
1181 	uint64_t new_size_in_byte;
1182 	uint64_t current_size_in_mb;
1183 
1184 	rc = spdk_bdev_open_ext(name, false, dummy_bdev_event_cb, NULL, &desc);
1185 	if (rc != 0) {
1186 		return rc;
1187 	}
1188 
1189 	bdev = spdk_bdev_desc_get_bdev(desc);
1190 
1191 	if (bdev->module != &rbd_if) {
1192 		rc = -EINVAL;
1193 		goto exit;
1194 	}
1195 
1196 	current_size_in_mb = bdev->blocklen * bdev->blockcnt / (1024 * 1024);
1197 	if (current_size_in_mb > new_size_in_mb) {
1198 		SPDK_ERRLOG("The new bdev size must be larger than current bdev size.\n");
1199 		rc = -EINVAL;
1200 		goto exit;
1201 	}
1202 
1203 	ch = bdev_rbd_get_io_channel(bdev);
1204 	rbd_io_ch = spdk_io_channel_get_ctx(ch);
1205 	new_size_in_byte = new_size_in_mb * 1024 * 1024;
1206 
1207 	rc = rbd_resize(rbd_io_ch->disk->image, new_size_in_byte);
1208 	spdk_put_io_channel(ch);
1209 	if (rc != 0) {
1210 		SPDK_ERRLOG("failed to resize the ceph bdev.\n");
1211 		goto exit;
1212 	}
1213 
1214 	rc = spdk_bdev_notify_blockcnt_change(bdev, new_size_in_byte / bdev->blocklen);
1215 	if (rc != 0) {
1216 		SPDK_ERRLOG("failed to notify block cnt change.\n");
1217 	}
1218 
1219 exit:
1220 	spdk_bdev_close(desc);
1221 	return rc;
1222 }
1223 
/* Channel-create callback of the module-wide io_device; there is no state to set up. */
static int
bdev_rbd_group_create_cb(void *io_device, void *ctx_buf)
{
	/* The io_device is registered with a context size of 0. */
	return 0;
}
1229 
/* Channel-destroy callback of the module-wide io_device; nothing to release. */
static void
bdev_rbd_group_destroy_cb(void *io_device, void *ctx_buf)
{
	/* Intentionally empty. */
}
1234 
1235 static int
1236 bdev_rbd_library_init(void)
1237 {
1238 	spdk_io_device_register(&rbd_if, bdev_rbd_group_create_cb, bdev_rbd_group_destroy_cb,
1239 				0, "bdev_rbd_poll_groups");
1240 	return 0;
1241 }
1242 
1243 static void
1244 bdev_rbd_library_fini(void)
1245 {
1246 	spdk_io_device_unregister(&rbd_if, NULL);
1247 }
1248 
1249 SPDK_LOG_REGISTER_COMPONENT(bdev_rbd)
1250