/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "spdk/stdinc.h"

#include "bdev_rbd.h"

#include <rbd/librbd.h>
#include <rados/librados.h>

#include "spdk/env.h"
#include "spdk/bdev.h"
#include "spdk/thread.h"
#include "spdk/json.h"
#include "spdk/string.h"
#include "spdk/util.h"
#include "spdk/likely.h"

#include "spdk/bdev_module.h"
#include "spdk/log.h"

static int bdev_rbd_count = 0;

struct bdev_rbd {
	struct spdk_bdev disk;
	char *rbd_name;
	char *user_id;
	char *pool_name;
	char **config;

	rados_t cluster;
	rados_t *cluster_p;
	char *cluster_name;

	rados_ioctx_t io_ctx;
	rbd_image_t image;

	rbd_image_info_t info;
	pthread_mutex_t mutex;
	struct spdk_thread *main_td;
	struct spdk_thread *destruct_td;
	uint32_t ch_count;
	struct spdk_io_channel *group_ch;

	TAILQ_ENTRY(bdev_rbd) tailq;
	struct spdk_poller *reset_timer;
	struct spdk_bdev_io *reset_bdev_io;
};

struct bdev_rbd_io_channel {
	struct bdev_rbd *disk;
};

struct bdev_rbd_io {
	struct spdk_thread	*submit_td;
	enum spdk_bdev_io_status status;
	rbd_completion_t	comp;
	size_t			total_len;
};

struct bdev_rbd_cluster {
	char *name;
	char *user_id;
	char **config_param;
	char *config_file;
	rados_t cluster;
	uint32_t ref;
	STAILQ_ENTRY(bdev_rbd_cluster) link;
};
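/*
 * Registry of named, shared rados clusters (see bdev_rbd_register_cluster()).
 * Entries are reference counted: each bdev created with a cluster_name takes a
 * reference via bdev_rbd_get_cluster() and drops it in bdev_rbd_put_cluster().
 * All accesses are serialized by g_map_bdev_rbd_cluster_mutex.
 */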
static STAILQ_HEAD(, bdev_rbd_cluster) g_map_bdev_rbd_cluster = STAILQ_HEAD_INITIALIZER(
			g_map_bdev_rbd_cluster);
static pthread_mutex_t g_map_bdev_rbd_cluster_mutex = PTHREAD_MUTEX_INITIALIZER;

static void
bdev_rbd_cluster_free(struct bdev_rbd_cluster *entry)
{
	assert(entry != NULL);

	bdev_rbd_free_config(entry->config_param);
	free(entry->config_file);
	free(entry->user_id);
	free(entry->name);
	free(entry);
}

static void
bdev_rbd_put_cluster(rados_t **cluster)
{
	struct bdev_rbd_cluster *entry;

	assert(cluster != NULL);

	/* No need to go through the map if *cluster is NULL */
	if (*cluster == NULL) {
		return;
	}

	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);
	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
		if (*cluster != &entry->cluster) {
			continue;
		}

		assert(entry->ref > 0);
		entry->ref--;
		*cluster = NULL;
		pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
		return;
	}

	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
	SPDK_ERRLOG("Cannot find the entry for cluster=%p\n", cluster);
}

static void
bdev_rbd_free(struct bdev_rbd *rbd)
{
	if (!rbd) {
		return;
	}

	free(rbd->disk.name);
	free(rbd->rbd_name);
	free(rbd->user_id);
	free(rbd->pool_name);
	bdev_rbd_free_config(rbd->config);

	if (rbd->io_ctx) {
		rados_ioctx_destroy(rbd->io_ctx);
	}

	if (rbd->cluster_name) {
		bdev_rbd_put_cluster(&rbd->cluster_p);
		free(rbd->cluster_name);
	} else if (rbd->cluster) {
		rados_shutdown(rbd->cluster);
	}

	pthread_mutex_destroy(&rbd->mutex);
	free(rbd);
}

void
bdev_rbd_free_config(char **config)
{
	char **entry;

	if (config) {
		for (entry = config; *entry; entry++) {
			free(*entry);
		}
		free(config);
	}
}

char **
bdev_rbd_dup_config(const char *const *config)
{
	size_t count;
	char **copy;

	if (!config) {
		return NULL;
	}
	for (count = 0; config[count]; count++) {}
	copy = calloc(count + 1, sizeof(*copy));
	if (!copy) {
		return NULL;
	}
	for (count = 0; config[count]; count++) {
		if (!(copy[count] = strdup(config[count]))) {
			bdev_rbd_free_config(copy);
			return NULL;
		}
	}
	return copy;
}

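/*
 * The config array is a NULL-terminated list of key/value string pairs that is
 * fed to rados_conf_set(), e.g. (illustrative values only):
 *
 *     const char *config[] = { "mon_host", "10.0.0.1", "key", "AQB...", NULL };
 *
 * When no config array is given, rados_conf_read_file(cluster, NULL) falls
 * back to the default Ceph config search path (e.g. /etc/ceph/ceph.conf).
 */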
static int
bdev_rados_cluster_init(const char *user_id, const char *const *config,
			rados_t *cluster)
{
	int ret;

	ret = rados_create(cluster, user_id);
	if (ret < 0) {
		SPDK_ERRLOG("Failed to create rados_t struct\n");
		return -1;
	}

	if (config) {
		const char *const *entry = config;
		while (*entry) {
			ret = rados_conf_set(*cluster, entry[0], entry[1]);
			if (ret < 0) {
				SPDK_ERRLOG("Failed to set %s = %s\n", entry[0], entry[1]);
				rados_shutdown(*cluster);
				return -1;
			}
			entry += 2;
		}
	} else {
		ret = rados_conf_read_file(*cluster, NULL);
		if (ret < 0) {
			SPDK_ERRLOG("Failed to read conf file\n");
			rados_shutdown(*cluster);
			return -1;
		}
	}

	ret = rados_connect(*cluster);
	if (ret < 0) {
		SPDK_ERRLOG("Failed to connect to the rados cluster\n");
		rados_shutdown(*cluster);
		return -1;
	}

	return 0;
}

static int
bdev_rbd_get_cluster(const char *cluster_name, rados_t **cluster)
{
	struct bdev_rbd_cluster *entry;

	if (cluster == NULL) {
		SPDK_ERRLOG("cluster should not be NULL\n");
		return -1;
	}

	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);
	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
		if (strcmp(cluster_name, entry->name) == 0) {
			entry->ref++;
			*cluster = &entry->cluster;
			pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
			return 0;
		}
	}

	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
	return -1;
}

static int
bdev_rbd_shared_cluster_init(const char *cluster_name, rados_t **cluster)
{
	int ret;

	ret = bdev_rbd_get_cluster(cluster_name, cluster);
	if (ret < 0) {
		SPDK_ERRLOG("Failed to find the registered cluster with name=%s\n", cluster_name);
		return -1;
	}

	return ret;
}

static void *
bdev_rbd_cluster_handle(void *arg)
{
	void *ret = arg;
	struct bdev_rbd *rbd = arg;
	int rc;

	rc = bdev_rados_cluster_init(rbd->user_id, (const char *const *)rbd->config,
				     &rbd->cluster);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to create rados cluster for user_id=%s and rbd_pool=%s\n",
			    rbd->user_id ? rbd->user_id : "admin (the default)", rbd->pool_name);
		ret = NULL;
	}

	return ret;
}

static void *
bdev_rbd_init_context(void *arg)
{
	struct bdev_rbd *rbd = arg;
	int rc;

	if (rados_ioctx_create(*(rbd->cluster_p), rbd->pool_name, &rbd->io_ctx) < 0) {
		SPDK_ERRLOG("Failed to create ioctx on rbd=%p\n", rbd);
		return NULL;
	}

	rc = rbd_open(rbd->io_ctx, rbd->rbd_name, &rbd->image, NULL);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to open specified rbd device\n");
		return NULL;
	}

	rc = rbd_stat(rbd->image, &rbd->info, sizeof(rbd->info));
	rbd_close(rbd->image);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to stat specified rbd device\n");
		return NULL;
	}

	return arg;
}

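/*
 * Two modes of cluster setup: with a cluster_name the bdev borrows a shared,
 * registered cluster from the map above; without one it creates a private
 * rados_t from user_id/config, which is shut down again in bdev_rbd_free().
 */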
static int
bdev_rbd_init(struct bdev_rbd *rbd)
{
	int ret = 0;

	if (!rbd->cluster_name) {
		rbd->cluster_p = &rbd->cluster;
		/* The cluster should be created on a non-SPDK thread to avoid
		 * conflicts between the Rados threads and the SPDK thread */
		if (spdk_call_unaffinitized(bdev_rbd_cluster_handle, rbd) == NULL) {
			SPDK_ERRLOG("Cannot create the rados object on rbd=%p\n", rbd);
			return -1;
		}
	} else {
		ret = bdev_rbd_shared_cluster_init(rbd->cluster_name, &rbd->cluster_p);
		if (ret < 0) {
			SPDK_ERRLOG("Failed to create rados object for rbd=%p on cluster_name=%s\n",
				    rbd, rbd->cluster_name);
			return -1;
		}
	}

	if (spdk_call_unaffinitized(bdev_rbd_init_context, rbd) == NULL) {
		SPDK_ERRLOG("Cannot init rbd context for rbd=%p\n", rbd);
		return -1;
	}

	return ret;
}

static void
bdev_rbd_exit(rbd_image_t image)
{
	rbd_flush(image);
	rbd_close(image);
}

static void
_bdev_rbd_io_complete(void *_rbd_io)
{
	struct bdev_rbd_io *rbd_io = _rbd_io;

	spdk_bdev_io_complete(spdk_bdev_io_from_ctx(rbd_io), rbd_io->status);
}

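/*
 * Completions must be delivered on the thread that submitted the I/O
 * (rbd_io->submit_td).  If we are on any other thread, bounce the completion
 * over with spdk_thread_send_msg(); otherwise complete inline.
 */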
static void
bdev_rbd_io_complete(struct spdk_bdev_io *bdev_io, enum spdk_bdev_io_status status)
{
	struct bdev_rbd_io *rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx;
	struct spdk_thread *current_thread = spdk_get_thread();

	rbd_io->status = status;
	assert(rbd_io->submit_td != NULL);
	if (rbd_io->submit_td != current_thread) {
		spdk_thread_send_msg(rbd_io->submit_td, _bdev_rbd_io_complete, rbd_io);
	} else {
		_bdev_rbd_io_complete(rbd_io);
	}
}

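/*
 * librbd invokes this callback when an aio completes; note that this is
 * typically not an SPDK thread, which is why bdev_rbd_io_complete() above
 * re-routes the completion.  For reads, rbd_aio_get_return_value() returns
 * the number of bytes read, so success is checked against the expected
 * length; for writes and flushes, 0 means success.
 */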
static void
bdev_rbd_finish_aiocb(rbd_completion_t cb, void *arg)
{
	int io_status;
	struct spdk_bdev_io *bdev_io;
	struct bdev_rbd_io *rbd_io;
	enum spdk_bdev_io_status bio_status;

	bdev_io = rbd_aio_get_arg(cb);
	rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx;
	io_status = rbd_aio_get_return_value(cb);
	bio_status = SPDK_BDEV_IO_STATUS_SUCCESS;

	if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) {
		if ((int)rbd_io->total_len != io_status) {
			bio_status = SPDK_BDEV_IO_STATUS_FAILED;
		}
	} else {
		/* For others, 0 means success */
		if (io_status != 0) {
			bio_status = SPDK_BDEV_IO_STATUS_FAILED;
		}
	}

	rbd_aio_release(cb);

	bdev_rbd_io_complete(bdev_io, bio_status);
}

static void
bdev_rbd_start_aio(struct bdev_rbd *disk, struct spdk_bdev_io *bdev_io,
		   struct iovec *iov, int iovcnt, uint64_t offset, size_t len)
{
	int ret;
	struct bdev_rbd_io *rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx;
	rbd_image_t image = disk->image;

	ret = rbd_aio_create_completion(bdev_io, bdev_rbd_finish_aiocb,
					&rbd_io->comp);
	if (ret < 0) {
		goto err;
	}

	/* Callers only pass READ, WRITE or FLUSH here (see _bdev_rbd_submit_request()). */
	if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) {
		rbd_io->total_len = len;
		if (spdk_likely(iovcnt == 1)) {
			ret = rbd_aio_read(image, offset, iov[0].iov_len, iov[0].iov_base, rbd_io->comp);
		} else {
			ret = rbd_aio_readv(image, iov, iovcnt, offset, rbd_io->comp);
		}
	} else if (bdev_io->type == SPDK_BDEV_IO_TYPE_WRITE) {
		if (spdk_likely(iovcnt == 1)) {
			ret = rbd_aio_write(image, offset, iov[0].iov_len, iov[0].iov_base, rbd_io->comp);
		} else {
			ret = rbd_aio_writev(image, iov, iovcnt, offset, rbd_io->comp);
		}
	} else if (bdev_io->type == SPDK_BDEV_IO_TYPE_FLUSH) {
		ret = rbd_aio_flush(image, rbd_io->comp);
	}

	if (ret < 0) {
		rbd_aio_release(rbd_io->comp);
		goto err;
	}

	return;

err:
	bdev_rbd_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
}

static int bdev_rbd_library_init(void);
static void bdev_rbd_library_fini(void);

static int
bdev_rbd_get_ctx_size(void)
{
	return sizeof(struct bdev_rbd_io);
}

static struct spdk_bdev_module rbd_if = {
	.name = "rbd",
	.module_init = bdev_rbd_library_init,
	.module_fini = bdev_rbd_library_fini,
	.get_ctx_size = bdev_rbd_get_ctx_size,
};
SPDK_BDEV_MODULE_REGISTER(rbd, &rbd_if)

static int
bdev_rbd_reset_timer(void *arg)
{
	struct bdev_rbd *disk = arg;

	/*
	 * TODO: This should check if any I/O is still in flight before completing the reset.
	 * For now, just complete after the timer expires.
	 */
	bdev_rbd_io_complete(disk->reset_bdev_io, SPDK_BDEV_IO_STATUS_SUCCESS);
	spdk_poller_unregister(&disk->reset_timer);
	disk->reset_bdev_io = NULL;

	return SPDK_POLLER_BUSY;
}

static void
bdev_rbd_reset(struct bdev_rbd *disk, struct spdk_bdev_io *bdev_io)
{
	/*
	 * HACK: Since librbd doesn't provide any way to cancel outstanding aio, just kick off a
	 * timer to wait for in-flight I/O to complete.
	 */
	assert(disk->reset_bdev_io == NULL);
	disk->reset_bdev_io = bdev_io;
	disk->reset_timer = SPDK_POLLER_REGISTER(bdev_rbd_reset_timer, disk, 1 * 1000 * 1000);
}

static void
_bdev_rbd_destruct_done(void *io_device)
{
	struct bdev_rbd *rbd = io_device;

	assert(rbd != NULL);
	assert(rbd->ch_count == 0);

	spdk_bdev_destruct_done(&rbd->disk, 0);
	bdev_rbd_free(rbd);
}

static void
bdev_rbd_free_cb(void *io_device)
{
	struct bdev_rbd *rbd = io_device;

	/* The io device has been unregistered.  Send a message back to the
	 * original thread that started the destruct operation, so that the
	 * bdev unregister callback is invoked on the same thread that started
	 * this whole process.
	 */
	spdk_thread_send_msg(rbd->destruct_td, _bdev_rbd_destruct_done, rbd);
}

static void
_bdev_rbd_destruct(void *ctx)
{
	struct bdev_rbd *rbd = ctx;

	spdk_io_device_unregister(rbd, bdev_rbd_free_cb);
}

static int
bdev_rbd_destruct(void *ctx)
{
	struct bdev_rbd *rbd = ctx;
	struct spdk_thread *td;

	if (rbd->main_td == NULL) {
		td = spdk_get_thread();
	} else {
		td = rbd->main_td;
	}

	/* Start the destruct operation on the rbd bdev's
	 * main thread.  This guarantees it will only start
	 * executing after any messages related to channel
	 * deletions have finished completing.  *Always*
	 * send a message, even if this function gets called
	 * from the main thread, in case there are pending
	 * channel delete messages in flight to this thread.
	 */
	assert(rbd->destruct_td == NULL);
	rbd->destruct_td = td;
	spdk_thread_send_msg(td, _bdev_rbd_destruct, rbd);

	/* Return 1 to indicate the destruct path is asynchronous. */
	return 1;
}

static void
bdev_rbd_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io,
		    bool success)
{
	struct bdev_rbd *disk = (struct bdev_rbd *)bdev_io->bdev->ctxt;

	if (!success) {
		bdev_rbd_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
		return;
	}

	bdev_rbd_start_aio(disk,
			   bdev_io,
			   bdev_io->u.bdev.iovs,
			   bdev_io->u.bdev.iovcnt,
			   bdev_io->u.bdev.offset_blocks * bdev_io->bdev->blocklen,
			   bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
}

static void
_bdev_rbd_submit_request(void *ctx)
{
	struct spdk_bdev_io *bdev_io = ctx;
	struct bdev_rbd *disk = (struct bdev_rbd *)bdev_io->bdev->ctxt;

	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_READ:
		spdk_bdev_io_get_buf(bdev_io, bdev_rbd_get_buf_cb,
				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
		break;

	case SPDK_BDEV_IO_TYPE_WRITE:
	case SPDK_BDEV_IO_TYPE_FLUSH:
		bdev_rbd_start_aio(disk,
				   bdev_io,
				   bdev_io->u.bdev.iovs,
				   bdev_io->u.bdev.iovcnt,
				   bdev_io->u.bdev.offset_blocks * bdev_io->bdev->blocklen,
				   bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
		break;

	case SPDK_BDEV_IO_TYPE_RESET:
		bdev_rbd_reset(disk, bdev_io);
		break;

	default:
		SPDK_ERRLOG("Unsupported IO type=%d\n", bdev_io->type);
		bdev_rbd_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
		break;
	}
}

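/*
 * All I/O is funneled to the bdev's main thread (the thread that created the
 * first channel and opened the rbd_image_t), since a single image handle is
 * shared across channels.  The submitting thread is recorded so the
 * completion can be routed back to it.
 */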
static void
bdev_rbd_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
{
	struct spdk_thread *submit_td = spdk_io_channel_get_thread(ch);
	struct bdev_rbd_io *rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx;
	struct bdev_rbd *disk = (struct bdev_rbd *)bdev_io->bdev->ctxt;

	rbd_io->submit_td = submit_td;
	if (disk->main_td != submit_td) {
		spdk_thread_send_msg(disk->main_td, _bdev_rbd_submit_request, bdev_io);
	} else {
		_bdev_rbd_submit_request(bdev_io);
	}
}

static bool
bdev_rbd_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type)
{
	switch (io_type) {
	case SPDK_BDEV_IO_TYPE_READ:
	case SPDK_BDEV_IO_TYPE_WRITE:
	case SPDK_BDEV_IO_TYPE_FLUSH:
	case SPDK_BDEV_IO_TYPE_RESET:
		return true;

	default:
		return false;
	}
}

static void
bdev_rbd_free_channel_resources(struct bdev_rbd *disk)
{
	assert(disk != NULL);
	assert(disk->main_td == spdk_get_thread());
	assert(disk->ch_count == 0);

	spdk_put_io_channel(disk->group_ch);
	if (disk->image) {
		bdev_rbd_exit(disk->image);
	}

	disk->main_td = NULL;
	disk->group_ch = NULL;
}

static void *
bdev_rbd_handle(void *arg)
{
	struct bdev_rbd *disk = arg;
	void *ret = arg;

	if (rbd_open(disk->io_ctx, disk->rbd_name, &disk->image, NULL) < 0) {
		SPDK_ERRLOG("Failed to open specified rbd device\n");
		ret = NULL;
	}

	return ret;
}

static int
_bdev_rbd_create_cb(struct bdev_rbd *disk)
{
	disk->group_ch = spdk_get_io_channel(&rbd_if);
	assert(disk->group_ch != NULL);

	if (spdk_call_unaffinitized(bdev_rbd_handle, disk) == NULL) {
		bdev_rbd_free_channel_resources(disk);
		return -1;
	}

	return 0;
}

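/*
 * Channels are reference counted per bdev.  The first channel created opens
 * the rbd image and makes the creating thread the bdev's main thread; later
 * channels on any thread only bump ch_count under the mutex.
 */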
static int
bdev_rbd_create_cb(void *io_device, void *ctx_buf)
{
	struct bdev_rbd_io_channel *ch = ctx_buf;
	struct bdev_rbd *disk = io_device;
	int rc;

	ch->disk = disk;
	pthread_mutex_lock(&disk->mutex);
	if (disk->ch_count == 0) {
		assert(disk->main_td == NULL);
		rc = _bdev_rbd_create_cb(disk);
		if (rc) {
			SPDK_ERRLOG("Cannot create channel for disk=%p\n", disk);
			pthread_mutex_unlock(&disk->mutex);
			return rc;
		}

		disk->main_td = spdk_get_thread();
	}

	disk->ch_count++;
	pthread_mutex_unlock(&disk->mutex);

	return 0;
}

static void
_bdev_rbd_destroy_cb(void *ctx)
{
	struct bdev_rbd *disk = ctx;

	pthread_mutex_lock(&disk->mutex);
	assert(disk->ch_count > 0);
	disk->ch_count--;

	if (disk->ch_count > 0) {
		/* A new channel was created between when the message was sent
		 * and when this function executed on the main thread. */
		pthread_mutex_unlock(&disk->mutex);
		return;
	}

	bdev_rbd_free_channel_resources(disk);
	pthread_mutex_unlock(&disk->mutex);
}

static void
bdev_rbd_destroy_cb(void *io_device, void *ctx_buf)
{
	struct bdev_rbd *disk = io_device;
	struct spdk_thread *thread;

	pthread_mutex_lock(&disk->mutex);
	assert(disk->ch_count > 0);
	disk->ch_count--;
	if (disk->ch_count == 0) {
		assert(disk->main_td != NULL);
		if (disk->main_td != spdk_get_thread()) {
			/* The final channel was destroyed on a different thread
			 * than where the first channel was created. Pass a message
			 * to the main thread to release the channel resources there. */
			disk->ch_count++;
			thread = disk->main_td;
			pthread_mutex_unlock(&disk->mutex);
			spdk_thread_send_msg(thread, _bdev_rbd_destroy_cb, disk);
			return;
		}

		bdev_rbd_free_channel_resources(disk);
	}
	pthread_mutex_unlock(&disk->mutex);
}

static struct spdk_io_channel *
bdev_rbd_get_io_channel(void *ctx)
{
	struct bdev_rbd *rbd_bdev = ctx;

	return spdk_get_io_channel(rbd_bdev);
}

static void
bdev_rbd_cluster_dump_entry(const char *cluster_name, struct spdk_json_write_ctx *w)
{
	struct bdev_rbd_cluster *entry;

	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);
	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
		if (strcmp(cluster_name, entry->name)) {
			continue;
		}
		if (entry->user_id) {
			spdk_json_write_named_string(w, "user_id", entry->user_id);
		}

		if (entry->config_param) {
			char **config_entry = entry->config_param;

			spdk_json_write_named_object_begin(w, "config_param");
			while (*config_entry) {
				spdk_json_write_named_string(w, config_entry[0], config_entry[1]);
				config_entry += 2;
			}
			spdk_json_write_object_end(w);
		} else if (entry->config_file) {
			spdk_json_write_named_string(w, "config_file", entry->config_file);
		}

		pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
		return;
	}

	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
}

static int
bdev_rbd_dump_info_json(void *ctx, struct spdk_json_write_ctx *w)
{
	struct bdev_rbd *rbd_bdev = ctx;

	spdk_json_write_named_object_begin(w, "rbd");

	spdk_json_write_named_string(w, "pool_name", rbd_bdev->pool_name);

	spdk_json_write_named_string(w, "rbd_name", rbd_bdev->rbd_name);

	if (rbd_bdev->cluster_name) {
		bdev_rbd_cluster_dump_entry(rbd_bdev->cluster_name, w);
		goto end;
	}

	if (rbd_bdev->user_id) {
		spdk_json_write_named_string(w, "user_id", rbd_bdev->user_id);
	}

	if (rbd_bdev->config) {
		char **entry = rbd_bdev->config;

		spdk_json_write_named_object_begin(w, "config");
		while (*entry) {
			spdk_json_write_named_string(w, entry[0], entry[1]);
			entry += 2;
		}
		spdk_json_write_object_end(w);
	}

end:
	spdk_json_write_object_end(w);

	return 0;
}

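/*
 * Emits the JSON-RPC call that would recreate this bdev, roughly (values are
 * illustrative only):
 *
 *     {
 *       "method": "bdev_rbd_create",
 *       "params": {
 *         "name": "Ceph0",
 *         "pool_name": "rbd",
 *         "rbd_name": "foo",
 *         "block_size": 512
 *       }
 *     }
 *
 * plus optional "user_id" and "config" members when they were supplied.
 */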
static void
bdev_rbd_write_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
{
	struct bdev_rbd *rbd = bdev->ctxt;

	spdk_json_write_object_begin(w);

	spdk_json_write_named_string(w, "method", "bdev_rbd_create");

	spdk_json_write_named_object_begin(w, "params");
	spdk_json_write_named_string(w, "name", bdev->name);
	spdk_json_write_named_string(w, "pool_name", rbd->pool_name);
	spdk_json_write_named_string(w, "rbd_name", rbd->rbd_name);
	spdk_json_write_named_uint32(w, "block_size", bdev->blocklen);
	if (rbd->user_id) {
		spdk_json_write_named_string(w, "user_id", rbd->user_id);
	}

	if (rbd->config) {
		char **entry = rbd->config;

		spdk_json_write_named_object_begin(w, "config");
		while (*entry) {
			spdk_json_write_named_string(w, entry[0], entry[1]);
			entry += 2;
		}
		spdk_json_write_object_end(w);
	}

	spdk_json_write_object_end(w);

	spdk_json_write_object_end(w);
}

static void
dump_single_cluster_entry(struct bdev_rbd_cluster *entry, struct spdk_json_write_ctx *w)
{
	assert(entry != NULL);

	spdk_json_write_object_begin(w);
	spdk_json_write_named_string(w, "cluster_name", entry->name);

	if (entry->user_id) {
		spdk_json_write_named_string(w, "user_id", entry->user_id);
	}

	if (entry->config_param) {
		char **config_entry = entry->config_param;

		spdk_json_write_named_object_begin(w, "config_param");
		while (*config_entry) {
			spdk_json_write_named_string(w, config_entry[0], config_entry[1]);
			config_entry += 2;
		}
		spdk_json_write_object_end(w);
	} else if (entry->config_file) {
		spdk_json_write_named_string(w, "config_file", entry->config_file);
	}

	spdk_json_write_object_end(w);
}

int
bdev_rbd_get_clusters_info(struct spdk_jsonrpc_request *request, const char *name)
{
	struct bdev_rbd_cluster *entry;
	struct spdk_json_write_ctx *w;

	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);

	if (STAILQ_EMPTY(&g_map_bdev_rbd_cluster)) {
		pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
		return -ENOENT;
	}

	/* If cluster name is provided */
	if (name) {
		STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
			if (strcmp(name, entry->name) == 0) {
				w = spdk_jsonrpc_begin_result(request);
				dump_single_cluster_entry(entry, w);
				spdk_jsonrpc_end_result(request, w);

				pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
				return 0;
			}
		}

		pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
		return -ENOENT;
	}

	w = spdk_jsonrpc_begin_result(request);
	spdk_json_write_array_begin(w);
	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
		dump_single_cluster_entry(entry, w);
	}
	spdk_json_write_array_end(w);
	spdk_jsonrpc_end_result(request, w);
	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);

	return 0;
}

static const struct spdk_bdev_fn_table rbd_fn_table = {
	.destruct		= bdev_rbd_destruct,
	.submit_request		= bdev_rbd_submit_request,
	.io_type_supported	= bdev_rbd_io_type_supported,
	.get_io_channel		= bdev_rbd_get_io_channel,
	.dump_info_json		= bdev_rbd_dump_info_json,
	.write_config_json	= bdev_rbd_write_config_json,
};

static int
rbd_register_cluster(const char *name, const char *user_id, const char *const *config_param,
		     const char *config_file)
{
	struct bdev_rbd_cluster *entry;
	int rc;

	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);
	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
		if (strcmp(name, entry->name) == 0) {
			SPDK_ERRLOG("Cluster name=%s already exists\n", name);
			pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
			return -1;
		}
	}

	entry = calloc(1, sizeof(*entry));
	if (!entry) {
		SPDK_ERRLOG("Cannot allocate an entry for name=%s\n", name);
		pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
		return -1;
	}

	entry->name = strdup(name);
	if (entry->name == NULL) {
		SPDK_ERRLOG("Failed to save the name=%s on entry=%p\n", name, entry);
		goto err_handle;
	}

	if (user_id) {
		entry->user_id = strdup(user_id);
		if (entry->user_id == NULL) {
			SPDK_ERRLOG("Failed to save the user_id=%s on entry=%p\n", user_id, entry);
			goto err_handle;
		}
	}

	/* config_param takes priority; fall back to config_file otherwise */
	if (config_param) {
		entry->config_param = bdev_rbd_dup_config(config_param);
		if (entry->config_param == NULL) {
			SPDK_ERRLOG("Failed to save the config_param=%p on entry=%p\n", config_param, entry);
			goto err_handle;
		}
	} else if (config_file) {
		entry->config_file = strdup(config_file);
		if (entry->config_file == NULL) {
			SPDK_ERRLOG("Failed to save the config_file=%s on entry=%p\n", config_file, entry);
			goto err_handle;
		}
	}

	rc = rados_create(&entry->cluster, user_id);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to create rados_t struct\n");
		goto err_handle;
	}

	if (config_param) {
		const char *const *config_entry = config_param;
		while (*config_entry) {
			rc = rados_conf_set(entry->cluster, config_entry[0], config_entry[1]);
			if (rc < 0) {
				SPDK_ERRLOG("Failed to set %s = %s\n", config_entry[0], config_entry[1]);
				rados_shutdown(entry->cluster);
				goto err_handle;
			}
			config_entry += 2;
		}
	} else {
		rc = rados_conf_read_file(entry->cluster, entry->config_file);
		if (rc < 0) {
			SPDK_ERRLOG("Failed to read conf file\n");
			rados_shutdown(entry->cluster);
			goto err_handle;
		}
	}

	rc = rados_connect(entry->cluster);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to connect to the rados cluster=%p\n", entry->cluster);
		rados_shutdown(entry->cluster);
		goto err_handle;
	}

	STAILQ_INSERT_TAIL(&g_map_bdev_rbd_cluster, entry, link);
	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);

	return 0;

err_handle:
	bdev_rbd_cluster_free(entry);
	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
	return -1;
}

int
bdev_rbd_unregister_cluster(const char *name)
{
	struct bdev_rbd_cluster *entry;
	int rc = 0;

	if (name == NULL) {
		return -1;
	}

	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);
	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
		if (strcmp(name, entry->name) == 0) {
			if (entry->ref == 0) {
				STAILQ_REMOVE(&g_map_bdev_rbd_cluster, entry, bdev_rbd_cluster, link);
				rados_shutdown(entry->cluster);
				bdev_rbd_cluster_free(entry);
			} else {
				SPDK_ERRLOG("Cluster with name=%s is still in use and cannot be deleted\n",
					    entry->name);
				rc = -1;
			}

			pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
			return rc;
		}
	}

	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);

	SPDK_ERRLOG("Could not find the cluster name=%s\n", name);

	return -1;
}

static void *
_bdev_rbd_register_cluster(void *arg)
{
	struct cluster_register_info *info = arg;
	void *ret = arg;
	int rc;

	rc = rbd_register_cluster((const char *)info->name, (const char *)info->user_id,
				  (const char *const *)info->config_param, (const char *)info->config_file);
	if (rc) {
		ret = NULL;
	}

	return ret;
}

int
bdev_rbd_register_cluster(struct cluster_register_info *info)
{
	assert(info != NULL);

	/* The Rados cluster info needs to be created on a non-SPDK thread to
	 * avoid CPU resource contention */
	if (spdk_call_unaffinitized(_bdev_rbd_register_cluster, info) == NULL) {
		return -1;
	}

	return 0;
}

int
bdev_rbd_create(struct spdk_bdev **bdev, const char *name, const char *user_id,
		const char *pool_name,
		const char *const *config,
		const char *rbd_name,
		uint32_t block_size,
		const char *cluster_name)
{
	struct bdev_rbd *rbd;
	int ret;

	if ((pool_name == NULL) || (rbd_name == NULL)) {
		return -EINVAL;
	}

	rbd = calloc(1, sizeof(struct bdev_rbd));
	if (rbd == NULL) {
		SPDK_ERRLOG("Failed to allocate bdev_rbd struct\n");
		return -ENOMEM;
	}

	ret = pthread_mutex_init(&rbd->mutex, NULL);
	if (ret) {
		SPDK_ERRLOG("Cannot init mutex on rbd=%p\n", rbd);
		free(rbd);
		return ret;
	}

	rbd->rbd_name = strdup(rbd_name);
	if (!rbd->rbd_name) {
		bdev_rbd_free(rbd);
		return -ENOMEM;
	}

	if (user_id) {
		rbd->user_id = strdup(user_id);
		if (!rbd->user_id) {
			bdev_rbd_free(rbd);
			return -ENOMEM;
		}
	}

	if (cluster_name) {
		rbd->cluster_name = strdup(cluster_name);
		if (!rbd->cluster_name) {
			bdev_rbd_free(rbd);
			return -ENOMEM;
		}
	}

	rbd->pool_name = strdup(pool_name);
	if (!rbd->pool_name) {
		bdev_rbd_free(rbd);
		return -ENOMEM;
	}

	if (config && !(rbd->config = bdev_rbd_dup_config(config))) {
		bdev_rbd_free(rbd);
		return -ENOMEM;
	}

	ret = bdev_rbd_init(rbd);
	if (ret < 0) {
		bdev_rbd_free(rbd);
		SPDK_ERRLOG("Failed to init rbd device\n");
		return ret;
	}

	if (name) {
		rbd->disk.name = strdup(name);
	} else {
		rbd->disk.name = spdk_sprintf_alloc("Ceph%d", bdev_rbd_count);
	}
	if (!rbd->disk.name) {
		bdev_rbd_free(rbd);
		return -ENOMEM;
	}
	rbd->disk.product_name = "Ceph Rbd Disk";
	bdev_rbd_count++;

	rbd->disk.write_cache = 0;
	rbd->disk.blocklen = block_size;
	rbd->disk.blockcnt = rbd->info.size / rbd->disk.blocklen;
	rbd->disk.ctxt = rbd;
	rbd->disk.fn_table = &rbd_fn_table;
	rbd->disk.module = &rbd_if;

	SPDK_NOTICELOG("Add %s rbd disk to lun\n", rbd->disk.name);

	spdk_io_device_register(rbd, bdev_rbd_create_cb,
				bdev_rbd_destroy_cb,
				sizeof(struct bdev_rbd_io_channel),
				rbd_name);
	ret = spdk_bdev_register(&rbd->disk);
	if (ret) {
		spdk_io_device_unregister(rbd, NULL);
		bdev_rbd_free(rbd);
		return ret;
	}

	*bdev = &(rbd->disk);

	return ret;
}

void
bdev_rbd_delete(struct spdk_bdev *bdev, spdk_delete_rbd_complete cb_fn, void *cb_arg)
{
	if (!bdev || bdev->module != &rbd_if) {
		cb_fn(cb_arg, -ENODEV);
		return;
	}

	spdk_bdev_unregister(bdev, cb_fn, cb_arg);
}

int
bdev_rbd_resize(struct spdk_bdev *bdev, const uint64_t new_size_in_mb)
{
	struct spdk_io_channel *ch;
	struct bdev_rbd_io_channel *rbd_io_ch;
	int rc;
	uint64_t new_size_in_byte;
	uint64_t current_size_in_mb;

	if (bdev->module != &rbd_if) {
		return -EINVAL;
	}

	current_size_in_mb = bdev->blocklen * bdev->blockcnt / (1024 * 1024);
	if (current_size_in_mb > new_size_in_mb) {
		SPDK_ERRLOG("The new bdev size must be larger than the current bdev size.\n");
		return -EINVAL;
	}

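	/* Passing the spdk_bdev pointer as the ctx works because "disk" is the
	 * first member of struct bdev_rbd, so the spdk_bdev * and the
	 * struct bdev_rbd * registered as the io_device alias each other. */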
	ch = bdev_rbd_get_io_channel(bdev);
	rbd_io_ch = spdk_io_channel_get_ctx(ch);
	new_size_in_byte = new_size_in_mb * 1024 * 1024;

	rc = rbd_resize(rbd_io_ch->disk->image, new_size_in_byte);
	spdk_put_io_channel(ch);
	if (rc != 0) {
		SPDK_ERRLOG("failed to resize the ceph bdev.\n");
		return rc;
	}

	rc = spdk_bdev_notify_blockcnt_change(bdev, new_size_in_byte / bdev->blocklen);
	if (rc != 0) {
		SPDK_ERRLOG("failed to notify block cnt change.\n");
		return rc;
	}

	return rc;
}

static int
bdev_rbd_group_create_cb(void *io_device, void *ctx_buf)
{
	return 0;
}

static void
bdev_rbd_group_destroy_cb(void *io_device, void *ctx_buf)
{
}

static int
bdev_rbd_library_init(void)
{
	spdk_io_device_register(&rbd_if, bdev_rbd_group_create_cb, bdev_rbd_group_destroy_cb,
				0, "bdev_rbd_poll_groups");
	return 0;
}

static void
bdev_rbd_library_fini(void)
{
	spdk_io_device_unregister(&rbd_if, NULL);
}

SPDK_LOG_REGISTER_COMPONENT(bdev_rbd)