xref: /spdk/module/bdev/rbd/bdev_rbd.c (revision 441431d22872ae4e05a1bf8b78e9aeff1eba1eb3)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 
36 #include "bdev_rbd.h"
37 
38 #include <rbd/librbd.h>
39 #include <rados/librados.h>
40 #include <sys/eventfd.h>
41 #include <sys/epoll.h>
42 
43 #include "spdk/env.h"
44 #include "spdk/bdev.h"
45 #include "spdk/thread.h"
46 #include "spdk/json.h"
47 #include "spdk/string.h"
48 #include "spdk/util.h"
49 #include "spdk/likely.h"
50 
51 #include "spdk/bdev_module.h"
52 #include "spdk/log.h"
53 
54 #define SPDK_RBD_QUEUE_DEPTH 128
55 #define MAX_EVENTS_PER_POLL 128
56 
57 static int bdev_rbd_count = 0;
58 
/* Per-bdev state for one exported RBD image. */
struct bdev_rbd {
	struct spdk_bdev disk;			/* generic bdev exposed to the bdev layer */
	char *rbd_name;				/* name of the RBD image inside the pool */
	char *user_id;				/* Ceph user id; NULL selects the librados default */
	char *pool_name;			/* Rados pool that contains the image */
	char **config;				/* NULL-terminated key/value pairs fed to rados_conf_set() */
	rados_t cluster;			/* privately owned cluster handle (non-shared case) */
	rados_t *cluster_p;			/* points at 'cluster' above, or at a shared registry entry */
	char *cluster_name;			/* non-NULL when attached to a registered shared cluster */
	rbd_image_info_t info;			/* image info captured by rbd_stat() in bdev_rbd_init() */
	TAILQ_ENTRY(bdev_rbd) tailq;
	struct spdk_poller *reset_timer;	/* one-shot poller that completes a pending reset */
	struct spdk_bdev_io *reset_bdev_io;	/* the in-flight reset request, if any */
};
73 
/* Per-thread group channel: one epoll set and one poller shared by all
 * RBD io channels on that thread. */
struct bdev_rbd_group_channel {
	struct spdk_poller *poller;	/* runs bdev_rbd_group_poll() */
	int epoll_fd;			/* watches each channel's eventfd */
};
78 
/* Per-thread, per-bdev I/O channel state. */
struct bdev_rbd_io_channel {
	rados_ioctx_t io_ctx;			/* rados I/O context for the pool */
	int pfd;				/* eventfd signalled by librbd on completion */
	rbd_image_t image;			/* per-channel open handle on the image */
	struct bdev_rbd *disk;			/* back-pointer to the owning bdev */
	struct bdev_rbd_group_channel *group_ch;	/* group channel whose epoll watches pfd */
};
86 
/* Per-I/O driver context (see bdev_rbd_get_ctx_size()). */
struct bdev_rbd_io {
	size_t	total_len;	/* expected byte count for reads; checked in bdev_rbd_io_poll() */
};
90 
/* A named, shared Rados cluster registered via bdev_rbd_register_cluster(). */
struct bdev_rbd_cluster {
	char *name;		/* registry key */
	char *user_id;		/* Ceph user id; NULL selects the librados default */
	char **config_param;	/* NULL-terminated key/value pairs; takes precedence over config_file */
	char *config_file;	/* path passed to rados_conf_read_file() when no config_param */
	rados_t cluster;	/* connected cluster handle shared by referencing bdevs */
	uint32_t ref;		/* number of bdevs currently using this cluster */
	STAILQ_ENTRY(bdev_rbd_cluster) link;
};
100 
/* Registry of shared Rados clusters added via bdev_rbd_register_cluster(). */
static STAILQ_HEAD(, bdev_rbd_cluster) g_map_bdev_rbd_cluster = STAILQ_HEAD_INITIALIZER(
			g_map_bdev_rbd_cluster);
/* Serializes access to the registry and to each entry's ref count. */
static pthread_mutex_t g_map_bdev_rbd_cluster_mutex = PTHREAD_MUTEX_INITIALIZER;
104 
105 static void
106 bdev_rbd_cluster_free(struct bdev_rbd_cluster *entry)
107 {
108 	assert(entry != NULL);
109 
110 	bdev_rbd_free_config(entry->config_param);
111 	free(entry->config_file);
112 	free(entry->user_id);
113 	free(entry->name);
114 	free(entry);
115 }
116 
117 static void
118 bdev_rbd_put_cluster(rados_t **cluster)
119 {
120 	struct bdev_rbd_cluster *entry;
121 
122 	assert(cluster != NULL);
123 
124 	/* No need go through the map if *cluster equals to NULL */
125 	if (*cluster == NULL) {
126 		return;
127 	}
128 
129 	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);
130 	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
131 		if (*cluster != &entry->cluster) {
132 			continue;
133 		}
134 
135 		assert(entry->ref > 0);
136 		entry->ref--;
137 		*cluster = NULL;
138 		pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
139 		return;
140 	}
141 
142 	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
143 	SPDK_ERRLOG("Cannot find the entry for cluster=%p\n", cluster);
144 }
145 
146 static void
147 bdev_rbd_free(struct bdev_rbd *rbd)
148 {
149 	if (!rbd) {
150 		return;
151 	}
152 
153 	free(rbd->disk.name);
154 	free(rbd->rbd_name);
155 	free(rbd->user_id);
156 	free(rbd->pool_name);
157 	bdev_rbd_free_config(rbd->config);
158 
159 	if (rbd->cluster_name) {
160 		bdev_rbd_put_cluster(&rbd->cluster_p);
161 		free(rbd->cluster_name);
162 	} else if (rbd->cluster) {
163 		rados_shutdown(rbd->cluster);
164 	}
165 
166 	free(rbd);
167 }
168 
/* Free a NULL-terminated array of heap strings, then the array itself.
 * A NULL argument is a no-op. */
void
bdev_rbd_free_config(char **config)
{
	char **p;

	if (config == NULL) {
		return;
	}

	for (p = config; *p != NULL; p++) {
		free(*p);
	}
	free(config);
}
181 
/* Deep-copy a NULL-terminated string array.  Returns a freshly allocated,
 * NULL-terminated copy, or NULL on NULL input or allocation failure (in
 * which case any partial copy is released). */
char **
bdev_rbd_dup_config(const char *const *config)
{
	size_t n, i, len;
	char **dup;

	if (config == NULL) {
		return NULL;
	}

	for (n = 0; config[n] != NULL; n++) {
	}

	dup = calloc(n + 1, sizeof(*dup));
	if (dup == NULL) {
		return NULL;
	}

	for (i = 0; i < n; i++) {
		len = strlen(config[i]) + 1;
		dup[i] = malloc(len);
		if (dup[i] == NULL) {
			/* Unwind the strings copied so far. */
			while (i > 0) {
				free(dup[--i]);
			}
			free(dup);
			return NULL;
		}
		memcpy(dup[i], config[i], len);
	}

	return dup;
}
204 
205 static int
206 bdev_rados_cluster_init(const char *user_id, const char *const *config,
207 			rados_t *cluster)
208 {
209 	int ret;
210 
211 	ret = rados_create(cluster, user_id);
212 	if (ret < 0) {
213 		SPDK_ERRLOG("Failed to create rados_t struct\n");
214 		return -1;
215 	}
216 
217 	if (config) {
218 		const char *const *entry = config;
219 		while (*entry) {
220 			ret = rados_conf_set(*cluster, entry[0], entry[1]);
221 			if (ret < 0) {
222 				SPDK_ERRLOG("Failed to set %s = %s\n", entry[0], entry[1]);
223 				rados_shutdown(*cluster);
224 				return -1;
225 			}
226 			entry += 2;
227 		}
228 	} else {
229 		ret = rados_conf_read_file(*cluster, NULL);
230 		if (ret < 0) {
231 			SPDK_ERRLOG("Failed to read conf file\n");
232 			rados_shutdown(*cluster);
233 			return -1;
234 		}
235 	}
236 
237 	ret = rados_connect(*cluster);
238 	if (ret < 0) {
239 		SPDK_ERRLOG("Failed to connect to rbd_pool\n");
240 		rados_shutdown(*cluster);
241 		return -1;
242 	}
243 
244 	return 0;
245 }
246 
247 static int
248 bdev_rbd_get_cluster(const char *cluster_name, rados_t **cluster)
249 {
250 	struct bdev_rbd_cluster *entry;
251 
252 	if (cluster == NULL) {
253 		SPDK_ERRLOG("cluster should not be NULL\n");
254 		return -1;
255 	}
256 
257 	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);
258 	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
259 		if (strcmp(cluster_name, entry->name) == 0) {
260 			entry->ref++;
261 			*cluster = &entry->cluster;
262 			pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
263 			return 0;
264 		}
265 	}
266 
267 	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
268 	return -1;
269 }
270 
271 static int
272 bdev_rbd_shared_cluster_init(const char *cluster_name, rados_t **cluster)
273 {
274 	int ret;
275 
276 	ret = bdev_rbd_get_cluster(cluster_name, cluster);
277 	if (ret < 0) {
278 		SPDK_ERRLOG("Failed to create rados_t struct\n");
279 		return -1;
280 	}
281 
282 	return ret;
283 }
284 
285 static void *
286 bdev_rbd_cluster_handle(void *arg)
287 {
288 	void *ret = arg;
289 	struct bdev_rbd *rbd = arg;
290 	int rc;
291 
292 	rc = bdev_rados_cluster_init(rbd->user_id, (const char *const *)rbd->config,
293 				     &rbd->cluster);
294 	if (rc < 0) {
295 		SPDK_ERRLOG("Failed to create rados cluster for user_id=%s and rbd_pool=%s\n",
296 			    rbd->user_id ? rbd->user_id : "admin (the default)", rbd->pool_name);
297 		ret = NULL;
298 	}
299 
300 	return ret;
301 }
302 
/* Attach the bdev to a Rados cluster (private or shared) and capture the
 * image's metadata into rbd->info.  Returns 0 on success, negative on
 * failure.  On failure the cluster reference/handle (if any) is released
 * later by bdev_rbd_free(). */
static int
bdev_rbd_init(struct bdev_rbd *rbd)
{
	int ret = 0;
	rados_ioctx_t io_ctx = NULL;
	rbd_image_t image = NULL;

	if (!rbd->cluster_name) {
		rbd->cluster_p = &rbd->cluster;
		/* Cluster should be created in non-SPDK thread to avoid conflict between
		 * Rados and SPDK thread */
		if (spdk_call_unaffinitized(bdev_rbd_cluster_handle, rbd) == NULL) {
			SPDK_ERRLOG("Cannot create the rados object on rbd=%p\n", rbd);
			return -1;
		}
	} else {
		/* Shared cluster: take a reference on the registered entry. */
		ret = bdev_rbd_shared_cluster_init(rbd->cluster_name, &rbd->cluster_p);
		if (ret < 0) {
			SPDK_ERRLOG("Failed to create rados object for rbd =%p on cluster_name=%s\n",
				    rbd, rbd->cluster_name);
			return -1;
		}
	}

	ret = rados_ioctx_create(*(rbd->cluster_p), rbd->pool_name, &io_ctx);
	if (ret < 0) {
		SPDK_ERRLOG("Failed to create ioctx\n");
		return -1;
	}

	/* Open the image only long enough to rbd_stat() it; per-channel image
	 * handles are opened later in bdev_rbd_create_cb(). */
	ret = rbd_open(io_ctx, rbd->rbd_name, &image, NULL);
	if (ret < 0) {
		SPDK_ERRLOG("Failed to open specified rbd device\n");
		goto end;
	}
	ret = rbd_stat(image, &rbd->info, sizeof(rbd->info));
	rbd_close(image);
	if (ret < 0) {
		SPDK_ERRLOG("Failed to stat specified rbd device\n");
	}

end:
	rados_ioctx_destroy(io_ctx);
	return ret;
}
348 
/* Flush any pending writes, then close the image handle. */
static void
bdev_rbd_exit(rbd_image_t image)
{
	rbd_flush(image);
	rbd_close(image);
}
355 
static void
bdev_rbd_finish_aiocb(rbd_completion_t cb, void *arg)
{
	/* Intentionally empty: completions are reaped by the group poller via
	 * rbd_poll_io_events() (see bdev_rbd_io_poll()), not in this callback. */
}
361 
362 static void
363 bdev_rbd_start_aio(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io,
364 		   struct iovec *iov, int iovcnt, uint64_t offset, size_t len)
365 {
366 	struct bdev_rbd_io_channel *rbdio_ch = spdk_io_channel_get_ctx(ch);
367 	int ret;
368 	rbd_completion_t comp;
369 	struct bdev_rbd_io *rbd_io;
370 	rbd_image_t image = rbdio_ch->image;
371 
372 	ret = rbd_aio_create_completion(bdev_io, bdev_rbd_finish_aiocb,
373 					&comp);
374 	if (ret < 0) {
375 		goto err;
376 	}
377 
378 	if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) {
379 		rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx;
380 		rbd_io->total_len = len;
381 		if (spdk_likely(iovcnt == 1)) {
382 			ret = rbd_aio_read(image, offset, iov[0].iov_len, iov[0].iov_base, comp);
383 		} else {
384 			ret = rbd_aio_readv(image, iov, iovcnt, offset, comp);
385 		}
386 	} else if (bdev_io->type == SPDK_BDEV_IO_TYPE_WRITE) {
387 		if (spdk_likely(iovcnt == 1)) {
388 			ret = rbd_aio_write(image, offset, iov[0].iov_len, iov[0].iov_base, comp);
389 		} else {
390 			ret = rbd_aio_writev(image, iov, iovcnt, offset, comp);
391 		}
392 	} else if (bdev_io->type == SPDK_BDEV_IO_TYPE_FLUSH) {
393 		ret = rbd_aio_flush(image, comp);
394 	}
395 
396 	if (ret < 0) {
397 		rbd_aio_release(comp);
398 		goto err;
399 	}
400 
401 	return;
402 
403 err:
404 	spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
405 }
406 
407 static int bdev_rbd_library_init(void);
408 
409 static void bdev_rbd_library_fini(void);
410 
/* Size of the per-I/O driver context the bdev layer must allocate. */
static int
bdev_rbd_get_ctx_size(void)
{
	return sizeof(struct bdev_rbd_io);
}
416 
/* Module descriptor registered with the SPDK bdev layer; also serves as the
 * io_device key for the per-thread group channels (see
 * bdev_rbd_library_init()). */
static struct spdk_bdev_module rbd_if = {
	.name = "rbd",
	.module_init = bdev_rbd_library_init,
	.module_fini = bdev_rbd_library_fini,
	.get_ctx_size = bdev_rbd_get_ctx_size,

};
SPDK_BDEV_MODULE_REGISTER(rbd, &rbd_if)
425 
/* One-shot poller armed by bdev_rbd_reset(): completes the pending reset
 * request and disarms itself. */
static int
bdev_rbd_reset_timer(void *arg)
{
	struct bdev_rbd *disk = arg;

	/*
	 * TODO: This should check if any I/O is still in flight before completing the reset.
	 * For now, just complete after the timer expires.
	 */
	spdk_bdev_io_complete(disk->reset_bdev_io, SPDK_BDEV_IO_STATUS_SUCCESS);
	spdk_poller_unregister(&disk->reset_timer);
	disk->reset_bdev_io = NULL;

	return SPDK_POLLER_BUSY;
}
441 
/* Handle a RESET request by arming a one-shot timer; only one reset may be
 * outstanding at a time (asserted below). */
static void
bdev_rbd_reset(struct bdev_rbd *disk, struct spdk_bdev_io *bdev_io)
{
	/*
	 * HACK: Since librbd doesn't provide any way to cancel outstanding aio, just kick off a
	 * timer to wait for in-flight I/O to complete.
	 */
	assert(disk->reset_bdev_io == NULL);
	disk->reset_bdev_io = bdev_io;
	/* 1 * 1000 * 1000 poller period — presumably microseconds, i.e. one
	 * second; confirm against the SPDK poller API. */
	disk->reset_timer = SPDK_POLLER_REGISTER(bdev_rbd_reset_timer, disk, 1 * 1000 * 1000);
}
453 
454 static void
455 bdev_rbd_free_cb(void *io_device)
456 {
457 	struct bdev_rbd *rbd = io_device;
458 
459 	assert(rbd != NULL);
460 
461 	bdev_rbd_free((struct bdev_rbd *)rbd);
462 }
463 
/* bdev destruct hook: unregister the per-bdev io_device; the bdev_rbd
 * itself is freed in bdev_rbd_free_cb() once all channels are released. */
static int
bdev_rbd_destruct(void *ctx)
{
	struct bdev_rbd *rbd = ctx;

	spdk_io_device_unregister(rbd, bdev_rbd_free_cb);

	return 0;
}
473 
474 static void
475 bdev_rbd_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io,
476 		    bool success)
477 {
478 	if (!success) {
479 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
480 		return;
481 	}
482 
483 	bdev_rbd_start_aio(ch,
484 			   bdev_io,
485 			   bdev_io->u.bdev.iovs,
486 			   bdev_io->u.bdev.iovcnt,
487 			   bdev_io->u.bdev.offset_blocks * bdev_io->bdev->blocklen,
488 			   bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
489 }
490 
/* bdev submit_request hook: dispatch an I/O to the matching handler.
 * Reads first acquire a buffer; writes/flushes go straight to librbd. */
static void
bdev_rbd_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
{
	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_READ:
		/* Buffer may not be allocated yet; the aio is started in
		 * bdev_rbd_get_buf_cb() once it is. */
		spdk_bdev_io_get_buf(bdev_io, bdev_rbd_get_buf_cb,
				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
		break;

	case SPDK_BDEV_IO_TYPE_WRITE:
	case SPDK_BDEV_IO_TYPE_FLUSH:
		bdev_rbd_start_aio(ch,
				   bdev_io,
				   bdev_io->u.bdev.iovs,
				   bdev_io->u.bdev.iovcnt,
				   bdev_io->u.bdev.offset_blocks * bdev_io->bdev->blocklen,
				   bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
		break;

	case SPDK_BDEV_IO_TYPE_RESET:
		/* Timer-based reset; see bdev_rbd_reset(). */
		bdev_rbd_reset((struct bdev_rbd *)bdev_io->bdev->ctxt,
			       bdev_io);
		break;

	default:
		SPDK_ERRLOG("Unsupported IO type =%d\n", bdev_io->type);
		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
		break;
	}
}
521 
522 static bool
523 bdev_rbd_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type)
524 {
525 	switch (io_type) {
526 	case SPDK_BDEV_IO_TYPE_READ:
527 	case SPDK_BDEV_IO_TYPE_WRITE:
528 	case SPDK_BDEV_IO_TYPE_FLUSH:
529 	case SPDK_BDEV_IO_TYPE_RESET:
530 		return true;
531 
532 	default:
533 		return false;
534 	}
535 }
536 
/* Reap up to SPDK_RBD_QUEUE_DEPTH finished librbd completions for this
 * channel and complete the corresponding bdev I/Os. */
static void
bdev_rbd_io_poll(struct bdev_rbd_io_channel *ch)
{
	int i, io_status, rc;
	rbd_completion_t comps[SPDK_RBD_QUEUE_DEPTH];
	struct spdk_bdev_io *bdev_io;
	struct bdev_rbd_io *rbd_io;
	enum spdk_bdev_io_status bio_status;

	rc = rbd_poll_io_events(ch->image, comps, SPDK_RBD_QUEUE_DEPTH);
	for (i = 0; i < rc; i++) {
		bdev_io = rbd_aio_get_arg(comps[i]);
		rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx;
		io_status = rbd_aio_get_return_value(comps[i]);
		bio_status = SPDK_BDEV_IO_STATUS_SUCCESS;

		if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) {
			/* A successful read returns exactly the byte count
			 * recorded at submit time (see bdev_rbd_start_aio). */
			if ((int)rbd_io->total_len != io_status) {
				bio_status = SPDK_BDEV_IO_STATUS_FAILED;
			}
		} else {
			/* For others, 0 means success */
			if (io_status != 0) {
				bio_status = SPDK_BDEV_IO_STATUS_FAILED;
			}
		}

		rbd_aio_release(comps[i]);

		spdk_bdev_io_complete(bdev_io, bio_status);
	}
}
569 
/* Tear down a per-thread I/O channel.  Tolerates partially-initialized
 * channels (used on the bdev_rbd_create_cb() error path); the image is
 * flushed/closed before its ioctx is destroyed. */
static void
bdev_rbd_free_channel(struct bdev_rbd_io_channel *ch)
{
	if (!ch) {
		return;
	}

	if (ch->image) {
		bdev_rbd_exit(ch->image);
	}

	if (ch->io_ctx) {
		rados_ioctx_destroy(ch->io_ctx);
	}

	if (ch->pfd >= 0) {
		close(ch->pfd);
	}

	if (ch->group_ch) {
		spdk_put_io_channel(spdk_io_channel_from_ctx(ch->group_ch));
	}
}
593 
594 static void *
595 bdev_rbd_handle(void *arg)
596 {
597 	struct bdev_rbd_io_channel *ch = arg;
598 	void *ret = arg;
599 
600 	assert(ch->disk->cluster_p != NULL);
601 
602 	if (rados_ioctx_create(*(ch->disk->cluster_p), ch->disk->pool_name, &ch->io_ctx) < 0) {
603 		SPDK_ERRLOG("Failed to create ioctx\n");
604 		ret = NULL;
605 		return ret;
606 	}
607 
608 	if (rbd_open(ch->io_ctx, ch->disk->rbd_name, &ch->image, NULL) < 0) {
609 		SPDK_ERRLOG("Failed to open specified rbd device\n");
610 		ret = NULL;
611 	}
612 
613 	return ret;
614 }
615 
/* Per-thread channel constructor: open the image off the SPDK thread,
 * create an eventfd for librbd completion notification and add it to the
 * group channel's epoll set.  Returns 0 on success, -1 on failure (with
 * all partial state released). */
static int
bdev_rbd_create_cb(void *io_device, void *ctx_buf)
{
	struct bdev_rbd_io_channel *ch = ctx_buf;
	int ret;
	struct epoll_event event;

	/* Initialize to "empty" so bdev_rbd_free_channel() is safe on the
	 * error path. */
	ch->disk = io_device;
	ch->image = NULL;
	ch->io_ctx = NULL;
	ch->pfd = -1;

	if (spdk_call_unaffinitized(bdev_rbd_handle, ch) == NULL) {
		goto err;
	}

	ch->pfd = eventfd(0, EFD_NONBLOCK);
	if (ch->pfd < 0) {
		SPDK_ERRLOG("Failed to get eventfd\n");
		goto err;
	}

	/* librbd will signal completions by writing to this eventfd. */
	ret = rbd_set_image_notification(ch->image, ch->pfd, EVENT_TYPE_EVENTFD);
	if (ret < 0) {
		SPDK_ERRLOG("Failed to set rbd image notification\n");
		goto err;
	}

	ch->group_ch = spdk_io_channel_get_ctx(spdk_get_io_channel(&rbd_if));
	assert(ch->group_ch != NULL);
	memset(&event, 0, sizeof(event));
	event.events = EPOLLIN;
	event.data.ptr = ch;

	ret = epoll_ctl(ch->group_ch->epoll_fd, EPOLL_CTL_ADD, ch->pfd, &event);
	if (ret < 0) {
		SPDK_ERRLOG("Failed to add the fd of ch(%p) to the epoll group from group_ch=%p\n", ch,
			    ch->group_ch);
		goto err;
	}

	return 0;

err:
	bdev_rbd_free_channel(ch);
	return -1;
}
663 
/* Per-thread channel destructor: detach the channel's eventfd from the
 * group epoll set, then release all channel resources. */
static void
bdev_rbd_destroy_cb(void *io_device, void *ctx_buf)
{
	struct bdev_rbd_io_channel *io_channel = ctx_buf;
	int rc;

	rc = epoll_ctl(io_channel->group_ch->epoll_fd, EPOLL_CTL_DEL,
		       io_channel->pfd, NULL);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to remove fd on io_channel=%p from the polling group=%p\n",
			    io_channel, io_channel->group_ch);
	}

	bdev_rbd_free_channel(io_channel);
}
679 
/* bdev get_io_channel hook: the bdev_rbd itself is the io_device key. */
static struct spdk_io_channel *
bdev_rbd_get_io_channel(void *ctx)
{
	struct bdev_rbd *rbd_bdev = ctx;

	return spdk_get_io_channel(rbd_bdev);
}
687 
688 static void
689 bdev_rbd_cluster_dump_entry(const char *cluster_name, struct spdk_json_write_ctx *w)
690 {
691 	struct bdev_rbd_cluster *entry;
692 
693 	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);
694 	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
695 		if (strcmp(cluster_name, entry->name)) {
696 			continue;
697 		}
698 		if (entry->user_id) {
699 			spdk_json_write_named_string(w, "user_id", entry->user_id);
700 		}
701 
702 		if (entry->config_param) {
703 			char **config_entry = entry->config_param;
704 
705 			spdk_json_write_named_object_begin(w, "config_param");
706 			while (*config_entry) {
707 				spdk_json_write_named_string(w, config_entry[0], config_entry[1]);
708 				config_entry += 2;
709 			}
710 			spdk_json_write_object_end(w);
711 		} else if (entry->config_file) {
712 			spdk_json_write_named_string(w, "config_file", entry->config_file);
713 		}
714 
715 		pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
716 		return;
717 	}
718 
719 	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
720 }
721 
/* bdev dump_info_json hook: emit this bdev's RBD parameters.  For a shared
 * cluster the registered entry's credentials are emitted instead of the
 * per-bdev ones. */
static int
bdev_rbd_dump_info_json(void *ctx, struct spdk_json_write_ctx *w)
{
	struct bdev_rbd *rbd_bdev = ctx;

	spdk_json_write_named_object_begin(w, "rbd");

	spdk_json_write_named_string(w, "pool_name", rbd_bdev->pool_name);

	spdk_json_write_named_string(w, "rbd_name", rbd_bdev->rbd_name);

	if (rbd_bdev->cluster_name) {
		bdev_rbd_cluster_dump_entry(rbd_bdev->cluster_name, w);
		goto end;
	}

	if (rbd_bdev->user_id) {
		spdk_json_write_named_string(w, "user_id", rbd_bdev->user_id);
	}

	if (rbd_bdev->config) {
		char **entry = rbd_bdev->config;

		/* config is stored as a NULL-terminated key/value pair array. */
		spdk_json_write_named_object_begin(w, "config");
		while (*entry) {
			spdk_json_write_named_string(w, entry[0], entry[1]);
			entry += 2;
		}
		spdk_json_write_object_end(w);
	}

end:
	spdk_json_write_object_end(w);

	return 0;
}
758 
/* bdev write_config_json hook: emit the bdev_rbd_create RPC call that would
 * recreate this bdev. */
static void
bdev_rbd_write_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
{
	struct bdev_rbd *rbd = bdev->ctxt;

	spdk_json_write_object_begin(w);

	spdk_json_write_named_string(w, "method", "bdev_rbd_create");

	spdk_json_write_named_object_begin(w, "params");
	spdk_json_write_named_string(w, "name", bdev->name);
	spdk_json_write_named_string(w, "pool_name", rbd->pool_name);
	spdk_json_write_named_string(w, "rbd_name", rbd->rbd_name);
	spdk_json_write_named_uint32(w, "block_size", bdev->blocklen);
	if (rbd->user_id) {
		spdk_json_write_named_string(w, "user_id", rbd->user_id);
	}

	if (rbd->config) {
		char **entry = rbd->config;

		/* config is stored as a NULL-terminated key/value pair array. */
		spdk_json_write_named_object_begin(w, "config");
		while (*entry) {
			spdk_json_write_named_string(w, entry[0], entry[1]);
			entry += 2;
		}
		spdk_json_write_object_end(w);
	}

	spdk_json_write_object_end(w);

	spdk_json_write_object_end(w);
}
792 
793 static void
794 dump_single_cluster_entry(struct bdev_rbd_cluster *entry, struct spdk_json_write_ctx *w)
795 {
796 	assert(entry != NULL);
797 
798 	spdk_json_write_object_begin(w);
799 	spdk_json_write_named_string(w, "cluster_name", entry->name);
800 
801 	if (entry->user_id) {
802 		spdk_json_write_named_string(w, "user_id", entry->user_id);
803 	}
804 
805 	if (entry->config_param) {
806 		char **config_entry = entry->config_param;
807 
808 		spdk_json_write_named_object_begin(w, "config_param");
809 		while (*config_entry) {
810 			spdk_json_write_named_string(w, config_entry[0], config_entry[1]);
811 			config_entry += 2;
812 		}
813 		spdk_json_write_object_end(w);
814 	} else if (entry->config_file) {
815 		spdk_json_write_named_string(w, "config_file", entry->config_file);
816 	}
817 
818 	spdk_json_write_object_end(w);
819 }
820 
/* RPC helper: respond with one registered cluster (when 'name' is given) or
 * with a JSON array of all registered clusters.  Returns 0 on success and
 * -ENOENT when the registry is empty or the name is unknown. */
int
bdev_rbd_get_clusters_info(struct spdk_jsonrpc_request *request, const char *name)
{
	struct bdev_rbd_cluster *entry;
	struct spdk_json_write_ctx *w;

	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);

	if (STAILQ_EMPTY(&g_map_bdev_rbd_cluster)) {
		pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
		return -ENOENT;
	}

	/* If cluster name is provided */
	if (name) {
		STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
			if (strcmp(name, entry->name) == 0) {
				w = spdk_jsonrpc_begin_result(request);
				dump_single_cluster_entry(entry, w);
				spdk_jsonrpc_end_result(request, w);

				pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
				return 0;
			}
		}

		pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
		return -ENOENT;
	}

	/* No name given: dump every registered cluster. */
	w = spdk_jsonrpc_begin_result(request);
	spdk_json_write_array_begin(w);
	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
		dump_single_cluster_entry(entry, w);
	}
	spdk_json_write_array_end(w);
	spdk_jsonrpc_end_result(request, w);
	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);

	return 0;
}
862 
/* Function table wired into every RBD bdev (rbd->disk.fn_table). */
static const struct spdk_bdev_fn_table rbd_fn_table = {
	.destruct		= bdev_rbd_destruct,
	.submit_request		= bdev_rbd_submit_request,
	.io_type_supported	= bdev_rbd_io_type_supported,
	.get_io_channel		= bdev_rbd_get_io_channel,
	.dump_info_json		= bdev_rbd_dump_info_json,
	.write_config_json	= bdev_rbd_write_config_json,
};
871 
/* Create, connect and register a shared Rados cluster under 'name'.
 * Called off the SPDK thread (see _bdev_rbd_register_cluster).  The
 * registry mutex is held for the whole operation, including the (blocking)
 * rados_connect(), which also serializes concurrent registrations.
 * Returns 0 on success, -1 on any failure. */
static int
rbd_register_cluster(const char *name, const char *user_id, const char *const *config_param,
		     const char *config_file)
{
	struct bdev_rbd_cluster *entry;
	int rc;

	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);
	/* Reject duplicate names up front. */
	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
		if (strcmp(name, entry->name) == 0) {
			SPDK_ERRLOG("Cluster name=%s already exists\n", name);
			pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
			return -1;
		}
	}

	entry = calloc(1, sizeof(*entry));
	if (!entry) {
		SPDK_ERRLOG("Cannot allocate an entry for name=%s\n", name);
		pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
		return -1;
	}

	entry->name = strdup(name);
	if (entry->name == NULL) {
		SPDK_ERRLOG("Failed to save the name =%s on entry =%p\n", name, entry);
		goto err_handle;
	}

	if (user_id) {
		entry->user_id = strdup(user_id);
		if (entry->user_id == NULL) {
			SPDK_ERRLOG("Failed to save the str =%s on entry =%p\n", user_id, entry);
			goto err_handle;
		}
	}

	/* The first priority is the config_param, then we use the config_file */
	if (config_param) {
		entry->config_param = bdev_rbd_dup_config(config_param);
		if (entry->config_param == NULL) {
			SPDK_ERRLOG("Failed to save the config_param=%p on entry = %p\n", config_param, entry);
			goto err_handle;
		}
	} else if (config_file) {
		entry->config_file = strdup(config_file);
		if (entry->config_file == NULL) {
			SPDK_ERRLOG("Failed to save the config_file=%s on entry = %p\n", config_file, entry);
			goto err_handle;
		}
	}

	rc = rados_create(&entry->cluster, user_id);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to create rados_t struct\n");
		goto err_handle;
	}

	if (config_param) {
		/* Apply explicit NULL-terminated key/value pairs. */
		const char *const *config_entry = config_param;
		while (*config_entry) {
			rc = rados_conf_set(entry->cluster, config_entry[0], config_entry[1]);
			if (rc < 0) {
				SPDK_ERRLOG("Failed to set %s = %s\n", config_entry[0], config_entry[1]);
				rados_shutdown(entry->cluster);
				goto err_handle;
			}
			config_entry += 2;
		}
	} else {
		/* NULL config_file here means the librados default locations. */
		rc = rados_conf_read_file(entry->cluster, entry->config_file);
		if (rc < 0) {
			SPDK_ERRLOG("Failed to read conf file\n");
			rados_shutdown(entry->cluster);
			goto err_handle;
		}
	}

	rc = rados_connect(entry->cluster);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to connect to rbd_pool on cluster=%p\n", entry->cluster);
		rados_shutdown(entry->cluster);
		goto err_handle;
	}

	STAILQ_INSERT_TAIL(&g_map_bdev_rbd_cluster, entry, link);
	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);

	return 0;

err_handle:
	bdev_rbd_cluster_free(entry);
	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
	return -1;
}
967 
968 int
969 bdev_rbd_unregister_cluster(const char *name)
970 {
971 	struct bdev_rbd_cluster *entry;
972 	int rc = 0;
973 
974 	if (name == NULL) {
975 		return -1;
976 	}
977 
978 	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);
979 	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
980 		if (strcmp(name, entry->name) == 0) {
981 			if (entry->ref == 0) {
982 				STAILQ_REMOVE(&g_map_bdev_rbd_cluster, entry, bdev_rbd_cluster, link);
983 				rados_shutdown(entry->cluster);
984 				bdev_rbd_cluster_free(entry);
985 			} else {
986 				SPDK_ERRLOG("Cluster with name=%p is still used and we cannot delete it\n",
987 					    entry->name);
988 				rc = -1;
989 			}
990 
991 			pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
992 			return rc;
993 		}
994 	}
995 
996 	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
997 
998 	SPDK_ERRLOG("Could not find the cluster name =%p\n", name);
999 
1000 	return -1;
1001 }
1002 
1003 static void *
1004 _bdev_rbd_register_cluster(void *arg)
1005 {
1006 	struct cluster_register_info *info = arg;
1007 	void *ret = arg;
1008 	int rc;
1009 
1010 	rc = rbd_register_cluster((const char *)info->name, (const char *)info->user_id,
1011 				  (const char *const *)info->config_param, (const char *)info->config_file);
1012 	if (rc) {
1013 		ret = NULL;
1014 	}
1015 
1016 	return ret;
1017 }
1018 
/* Public entry point: register a shared cluster described by 'info'.
 * Returns 0 on success, -1 on failure. */
int
bdev_rbd_register_cluster(struct cluster_register_info *info)
{
	assert(info != NULL);

	/* Rados cluster info need to be created in non SPDK-thread to avoid CPU
	 * resource contention */
	if (spdk_call_unaffinitized(_bdev_rbd_register_cluster, info) == NULL) {
		return -1;
	}

	return 0;
}
1032 
1033 int
1034 bdev_rbd_create(struct spdk_bdev **bdev, const char *name, const char *user_id,
1035 		const char *pool_name,
1036 		const char *const *config,
1037 		const char *rbd_name,
1038 		uint32_t block_size,
1039 		const char *cluster_name)
1040 {
1041 	struct bdev_rbd *rbd;
1042 	int ret;
1043 
1044 	if ((pool_name == NULL) || (rbd_name == NULL)) {
1045 		return -EINVAL;
1046 	}
1047 
1048 	rbd = calloc(1, sizeof(struct bdev_rbd));
1049 	if (rbd == NULL) {
1050 		SPDK_ERRLOG("Failed to allocate bdev_rbd struct\n");
1051 		return -ENOMEM;
1052 	}
1053 
1054 	rbd->rbd_name = strdup(rbd_name);
1055 	if (!rbd->rbd_name) {
1056 		bdev_rbd_free(rbd);
1057 		return -ENOMEM;
1058 	}
1059 
1060 	if (user_id) {
1061 		rbd->user_id = strdup(user_id);
1062 		if (!rbd->user_id) {
1063 			bdev_rbd_free(rbd);
1064 			return -ENOMEM;
1065 		}
1066 	}
1067 
1068 	if (cluster_name) {
1069 		rbd->cluster_name = strdup(cluster_name);
1070 		if (!rbd->cluster_name) {
1071 			bdev_rbd_free(rbd);
1072 			return -ENOMEM;
1073 		}
1074 	}
1075 	rbd->pool_name = strdup(pool_name);
1076 	if (!rbd->pool_name) {
1077 		bdev_rbd_free(rbd);
1078 		return -ENOMEM;
1079 	}
1080 
1081 	if (config && !(rbd->config = bdev_rbd_dup_config(config))) {
1082 		bdev_rbd_free(rbd);
1083 		return -ENOMEM;
1084 	}
1085 
1086 	ret = bdev_rbd_init(rbd);
1087 	if (ret < 0) {
1088 		bdev_rbd_free(rbd);
1089 		SPDK_ERRLOG("Failed to init rbd device\n");
1090 		return ret;
1091 	}
1092 
1093 	if (name) {
1094 		rbd->disk.name = strdup(name);
1095 	} else {
1096 		rbd->disk.name = spdk_sprintf_alloc("Ceph%d", bdev_rbd_count);
1097 	}
1098 	if (!rbd->disk.name) {
1099 		bdev_rbd_free(rbd);
1100 		return -ENOMEM;
1101 	}
1102 	rbd->disk.product_name = "Ceph Rbd Disk";
1103 	bdev_rbd_count++;
1104 
1105 	rbd->disk.write_cache = 0;
1106 	rbd->disk.blocklen = block_size;
1107 	rbd->disk.blockcnt = rbd->info.size / rbd->disk.blocklen;
1108 	rbd->disk.ctxt = rbd;
1109 	rbd->disk.fn_table = &rbd_fn_table;
1110 	rbd->disk.module = &rbd_if;
1111 
1112 	SPDK_NOTICELOG("Add %s rbd disk to lun\n", rbd->disk.name);
1113 
1114 	spdk_io_device_register(rbd, bdev_rbd_create_cb,
1115 				bdev_rbd_destroy_cb,
1116 				sizeof(struct bdev_rbd_io_channel),
1117 				rbd_name);
1118 	ret = spdk_bdev_register(&rbd->disk);
1119 	if (ret) {
1120 		spdk_io_device_unregister(rbd, NULL);
1121 		bdev_rbd_free(rbd);
1122 		return ret;
1123 	}
1124 
1125 	*bdev = &(rbd->disk);
1126 
1127 	return ret;
1128 }
1129 
1130 void
1131 bdev_rbd_delete(struct spdk_bdev *bdev, spdk_delete_rbd_complete cb_fn, void *cb_arg)
1132 {
1133 	if (!bdev || bdev->module != &rbd_if) {
1134 		cb_fn(cb_arg, -ENODEV);
1135 		return;
1136 	}
1137 
1138 	spdk_bdev_unregister(bdev, cb_fn, cb_arg);
1139 }
1140 
1141 int
1142 bdev_rbd_resize(struct spdk_bdev *bdev, const uint64_t new_size_in_mb)
1143 {
1144 	struct spdk_io_channel *ch;
1145 	struct bdev_rbd_io_channel *rbd_io_ch;
1146 	int rc;
1147 	uint64_t new_size_in_byte;
1148 	uint64_t current_size_in_mb;
1149 
1150 	if (bdev->module != &rbd_if) {
1151 		return -EINVAL;
1152 	}
1153 
1154 	current_size_in_mb = bdev->blocklen * bdev->blockcnt / (1024 * 1024);
1155 	if (current_size_in_mb > new_size_in_mb) {
1156 		SPDK_ERRLOG("The new bdev size must be lager than current bdev size.\n");
1157 		return -EINVAL;
1158 	}
1159 
1160 	ch = bdev_rbd_get_io_channel(bdev);
1161 	rbd_io_ch = spdk_io_channel_get_ctx(ch);
1162 	new_size_in_byte = new_size_in_mb * 1024 * 1024;
1163 
1164 	rc = rbd_resize(rbd_io_ch->image, new_size_in_byte);
1165 	spdk_put_io_channel(ch);
1166 	if (rc != 0) {
1167 		SPDK_ERRLOG("failed to resize the ceph bdev.\n");
1168 		return rc;
1169 	}
1170 
1171 	rc = spdk_bdev_notify_blockcnt_change(bdev, new_size_in_byte / bdev->blocklen);
1172 	if (rc != 0) {
1173 		SPDK_ERRLOG("failed to notify block cnt change.\n");
1174 		return rc;
1175 	}
1176 
1177 	return rc;
1178 }
1179 
1180 static int
1181 bdev_rbd_group_poll(void *arg)
1182 {
1183 	struct bdev_rbd_group_channel *group_ch = arg;
1184 	struct epoll_event events[MAX_EVENTS_PER_POLL];
1185 	int num_events, i;
1186 
1187 	num_events = epoll_wait(group_ch->epoll_fd, events, MAX_EVENTS_PER_POLL, 0);
1188 
1189 	if (num_events <= 0) {
1190 		return SPDK_POLLER_IDLE;
1191 	}
1192 
1193 	for (i = 0; i < num_events; i++) {
1194 		bdev_rbd_io_poll((struct bdev_rbd_io_channel *)events[i].data.ptr);
1195 	}
1196 
1197 	return SPDK_POLLER_BUSY;
1198 }
1199 
/* Group-channel constructor: create the epoll set and start the poller
 * that drains completions for all RBD channels on this thread. */
static int
bdev_rbd_group_create_cb(void *io_device, void *ctx_buf)
{
	struct bdev_rbd_group_channel *ch = ctx_buf;

	ch->epoll_fd = epoll_create1(0);
	if (ch->epoll_fd < 0) {
		SPDK_ERRLOG("Could not create epoll fd on io device=%p\n", io_device);
		return -1;
	}

	/* NOTE(review): the poller registration result is not checked here —
	 * confirm whether a NULL poller should fail channel creation. */
	ch->poller = SPDK_POLLER_REGISTER(bdev_rbd_group_poll, ch, 0);

	return 0;
}
1215 
/* Group-channel destructor: close the epoll set (if created) and stop the
 * group poller. */
static void
bdev_rbd_group_destroy_cb(void *io_device, void *ctx_buf)
{
	struct bdev_rbd_group_channel *ch = ctx_buf;

	if (ch->epoll_fd >= 0) {
		close(ch->epoll_fd);
	}

	spdk_poller_unregister(&ch->poller);
}
1227 
/* Module init: register the group-channel io_device keyed on rbd_if. */
static int
bdev_rbd_library_init(void)
{
	spdk_io_device_register(&rbd_if, bdev_rbd_group_create_cb, bdev_rbd_group_destroy_cb,
				sizeof(struct bdev_rbd_group_channel), "bdev_rbd_poll_groups");

	return 0;
}
1236 
/* Module teardown: unregister the group-channel io_device. */
static void
bdev_rbd_library_fini(void)
{
	spdk_io_device_unregister(&rbd_if, NULL);
}
1242 
1243 SPDK_LOG_REGISTER_COMPONENT(bdev_rbd)
1244