/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "spdk/stdinc.h"

#include "bdev_rbd.h"

#include <rbd/librbd.h>
#include <rados/librados.h>
#include <sys/eventfd.h>
#include <sys/epoll.h>

#include "spdk/env.h"
#include "spdk/bdev.h"
#include "spdk/thread.h"
#include "spdk/json.h"
#include "spdk/string.h"
#include "spdk/util.h"
#include "spdk/likely.h"

#include "spdk/bdev_module.h"
#include "spdk/log.h"

#define SPDK_RBD_QUEUE_DEPTH 128
#define MAX_EVENTS_PER_POLL 128

static int bdev_rbd_count = 0;

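/*
 * Per-bdev context. One bdev_rbd is allocated for each exported RBD image.
 * All I/O for an image is funneled to the thread that created its first
 * channel (main_td); completions are signaled through an eventfd (pfd) that
 * the module-level poll group watches via epoll.
 */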
struct bdev_rbd {
	struct spdk_bdev disk;
	char *rbd_name;
	char *user_id;
	char *pool_name;
	char **config;

	rados_t cluster;
	rados_t *cluster_p;
	char *cluster_name;

	rados_ioctx_t io_ctx;
	rbd_image_t image;
	int pfd;

	rbd_image_info_t info;
	pthread_mutex_t mutex;
	struct spdk_thread *main_td;
	uint32_t ch_count;
	struct bdev_rbd_group_channel *group_ch;

	bool deferred_free;
	TAILQ_ENTRY(bdev_rbd) tailq;
	struct spdk_poller *reset_timer;
	struct spdk_bdev_io *reset_bdev_io;
};

struct bdev_rbd_group_channel {
	struct spdk_poller *poller;
	int epoll_fd;
};

struct bdev_rbd_io_channel {
	struct bdev_rbd *disk;
};

struct bdev_rbd_io {
	struct	spdk_thread *submit_td;
	enum	spdk_bdev_io_status status;
	size_t	total_len;
};

struct bdev_rbd_cluster {
	char *name;
	char *user_id;
	char **config_param;
	char *config_file;
	rados_t cluster;
	uint32_t ref;
	STAILQ_ENTRY(bdev_rbd_cluster) link;
};

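/*
 * Registry of named, shared Rados clusters created via
 * bdev_rbd_register_cluster(). Multiple bdevs can reference one entry by
 * cluster_name; ref counts the bdevs currently using it. All access is
 * serialized by g_map_bdev_rbd_cluster_mutex.
 */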
static STAILQ_HEAD(, bdev_rbd_cluster) g_map_bdev_rbd_cluster = STAILQ_HEAD_INITIALIZER(
			g_map_bdev_rbd_cluster);
static pthread_mutex_t g_map_bdev_rbd_cluster_mutex = PTHREAD_MUTEX_INITIALIZER;

static void
bdev_rbd_cluster_free(struct bdev_rbd_cluster *entry)
{
	assert(entry != NULL);

	bdev_rbd_free_config(entry->config_param);
	free(entry->config_file);
	free(entry->user_id);
	free(entry->name);
	free(entry);
}

static void
bdev_rbd_put_cluster(rados_t **cluster)
{
	struct bdev_rbd_cluster *entry;

	assert(cluster != NULL);

	/* No need to go through the map if *cluster is NULL */
	if (*cluster == NULL) {
		return;
	}

	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);
	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
		if (*cluster != &entry->cluster) {
			continue;
		}

		assert(entry->ref > 0);
		entry->ref--;
		*cluster = NULL;
		pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
		return;
	}

	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
	SPDK_ERRLOG("Cannot find the entry for cluster=%p\n", cluster);
}

static void
bdev_rbd_free(struct bdev_rbd *rbd)
{
	if (!rbd) {
		return;
	}

	free(rbd->disk.name);
	free(rbd->rbd_name);
	free(rbd->user_id);
	free(rbd->pool_name);
	bdev_rbd_free_config(rbd->config);

	if (rbd->io_ctx) {
		rados_ioctx_destroy(rbd->io_ctx);
	}

	if (rbd->cluster_name) {
		bdev_rbd_put_cluster(&rbd->cluster_p);
		free(rbd->cluster_name);
	} else if (rbd->cluster) {
		rados_shutdown(rbd->cluster);
	}

	pthread_mutex_destroy(&rbd->mutex);
	free(rbd);
}

void
bdev_rbd_free_config(char **config)
{
	char **entry;

	if (config) {
		for (entry = config; *entry; entry++) {
			free(*entry);
		}
		free(config);
	}
}

char **
bdev_rbd_dup_config(const char *const *config)
{
	size_t count;
	char **copy;

	if (!config) {
		return NULL;
	}
	for (count = 0; config[count]; count++) {}
	copy = calloc(count + 1, sizeof(*copy));
	if (!copy) {
		return NULL;
	}
	for (count = 0; config[count]; count++) {
		if (!(copy[count] = strdup(config[count]))) {
			bdev_rbd_free_config(copy);
			return NULL;
		}
	}
	return copy;
}

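/*
 * Note on the config format used above and below: a config is a flat,
 * NULL-terminated array of key/value string pairs. An illustrative
 * (hypothetical) example:
 *
 *   const char *const config[] = {
 *           "mon_host", "10.0.0.1",
 *           "key", "client-secret",
 *           NULL
 *   };
 *
 * entry[0] is a key and entry[1] its value, so iteration advances by two.
 */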
static int
bdev_rados_cluster_init(const char *user_id, const char *const *config,
			rados_t *cluster)
{
	int ret;

	ret = rados_create(cluster, user_id);
	if (ret < 0) {
		SPDK_ERRLOG("Failed to create rados_t struct\n");
		return -1;
	}

	if (config) {
		const char *const *entry = config;
		while (*entry) {
			ret = rados_conf_set(*cluster, entry[0], entry[1]);
			if (ret < 0) {
				SPDK_ERRLOG("Failed to set %s = %s\n", entry[0], entry[1]);
				rados_shutdown(*cluster);
				return -1;
			}
			entry += 2;
		}
	} else {
		ret = rados_conf_read_file(*cluster, NULL);
		if (ret < 0) {
			SPDK_ERRLOG("Failed to read conf file\n");
			rados_shutdown(*cluster);
			return -1;
		}
	}

	ret = rados_connect(*cluster);
	if (ret < 0) {
		SPDK_ERRLOG("Failed to connect to rbd_pool\n");
		rados_shutdown(*cluster);
		return -1;
	}

	return 0;
}

static int
bdev_rbd_get_cluster(const char *cluster_name, rados_t **cluster)
{
	struct bdev_rbd_cluster *entry;

	if (cluster == NULL) {
		SPDK_ERRLOG("cluster should not be NULL\n");
		return -1;
	}

	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);
	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
		if (strcmp(cluster_name, entry->name) == 0) {
			entry->ref++;
			*cluster = &entry->cluster;
			pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
			return 0;
		}
	}

	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
	return -1;
}

static int
bdev_rbd_shared_cluster_init(const char *cluster_name, rados_t **cluster)
{
	int ret;

	ret = bdev_rbd_get_cluster(cluster_name, cluster);
	if (ret < 0) {
		SPDK_ERRLOG("Failed to find the registered cluster named %s\n", cluster_name);
		return -1;
	}

	return ret;
}

static void *
bdev_rbd_cluster_handle(void *arg)
{
	void *ret = arg;
	struct bdev_rbd *rbd = arg;
	int rc;

	rc = bdev_rados_cluster_init(rbd->user_id, (const char *const *)rbd->config,
				     &rbd->cluster);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to create rados cluster for user_id=%s and rbd_pool=%s\n",
			    rbd->user_id ? rbd->user_id : "admin (the default)", rbd->pool_name);
		ret = NULL;
	}

	return ret;
}

static void *
bdev_rbd_init_context(void *arg)
{
	struct bdev_rbd *rbd = arg;
	int rc;

	if (rados_ioctx_create(*(rbd->cluster_p), rbd->pool_name, &rbd->io_ctx) < 0) {
		SPDK_ERRLOG("Failed to create ioctx on rbd=%p\n", rbd);
		return NULL;
	}

	rc = rbd_open(rbd->io_ctx, rbd->rbd_name, &rbd->image, NULL);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to open specified rbd device\n");
		return NULL;
	}

	rc = rbd_stat(rbd->image, &rbd->info, sizeof(rbd->info));
	rbd_close(rbd->image);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to stat specified rbd device\n");
		return NULL;
	}

	return arg;
}

static int
bdev_rbd_init(struct bdev_rbd *rbd)
{
	int ret = 0;

	if (!rbd->cluster_name) {
		rbd->cluster_p = &rbd->cluster;
		/* The cluster should be created in a non-SPDK thread to avoid
		 * conflicts between Rados threads and SPDK threads */
		if (spdk_call_unaffinitized(bdev_rbd_cluster_handle, rbd) == NULL) {
			SPDK_ERRLOG("Cannot create the rados object on rbd=%p\n", rbd);
			return -1;
		}
	} else {
		ret = bdev_rbd_shared_cluster_init(rbd->cluster_name, &rbd->cluster_p);
		if (ret < 0) {
			SPDK_ERRLOG("Failed to create rados object for rbd=%p on cluster_name=%s\n",
				    rbd, rbd->cluster_name);
			return -1;
		}
	}

	if (spdk_call_unaffinitized(bdev_rbd_init_context, rbd) == NULL) {
		SPDK_ERRLOG("Cannot init rbd context for rbd=%p\n", rbd);
		ret = -1;
	}

	return ret;
}

static void
bdev_rbd_exit(rbd_image_t image)
{
	rbd_flush(image);
	rbd_close(image);
}

static void
bdev_rbd_finish_aiocb(rbd_completion_t cb, void *arg)
{
	/* Doing nothing here */
}

static void
_bdev_rbd_io_complete(void *_rbd_io)
{
	struct bdev_rbd_io *rbd_io = _rbd_io;

	spdk_bdev_io_complete(spdk_bdev_io_from_ctx(rbd_io), rbd_io->status);
}

static void
bdev_rbd_io_complete(struct spdk_bdev_io *bdev_io, enum spdk_bdev_io_status status)
{
	struct bdev_rbd_io *rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx;

	rbd_io->status = status;
	if (rbd_io->submit_td != NULL) {
		spdk_thread_send_msg(rbd_io->submit_td, _bdev_rbd_io_complete, rbd_io);
	} else {
		_bdev_rbd_io_complete(rbd_io);
	}
}

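/*
 * Submit a single read, write, or flush to librbd. The bdev_io is attached
 * to the rbd completion as its callback argument, so bdev_rbd_io_poll() can
 * recover it with rbd_aio_get_arg() when the completion fires. On any
 * submission failure the I/O is completed immediately with FAILED status.
 */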
static void
bdev_rbd_start_aio(struct bdev_rbd *disk, struct spdk_bdev_io *bdev_io,
		   struct iovec *iov, int iovcnt, uint64_t offset, size_t len)
{
	int ret;
	rbd_completion_t comp;
	struct bdev_rbd_io *rbd_io;
	rbd_image_t image = disk->image;

	ret = rbd_aio_create_completion(bdev_io, bdev_rbd_finish_aiocb,
					&comp);
	if (ret < 0) {
		goto err;
	}

	if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) {
		rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx;
		rbd_io->total_len = len;
		if (spdk_likely(iovcnt == 1)) {
			ret = rbd_aio_read(image, offset, iov[0].iov_len, iov[0].iov_base, comp);
		} else {
			ret = rbd_aio_readv(image, iov, iovcnt, offset, comp);
		}
	} else if (bdev_io->type == SPDK_BDEV_IO_TYPE_WRITE) {
		if (spdk_likely(iovcnt == 1)) {
			ret = rbd_aio_write(image, offset, iov[0].iov_len, iov[0].iov_base, comp);
		} else {
			ret = rbd_aio_writev(image, iov, iovcnt, offset, comp);
		}
	} else if (bdev_io->type == SPDK_BDEV_IO_TYPE_FLUSH) {
		ret = rbd_aio_flush(image, comp);
	}

	if (ret < 0) {
		rbd_aio_release(comp);
		goto err;
	}

	return;

err:
	bdev_rbd_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
}

static int bdev_rbd_library_init(void);

static void bdev_rbd_library_fini(void);

static int
bdev_rbd_get_ctx_size(void)
{
	return sizeof(struct bdev_rbd_io);
}

static struct spdk_bdev_module rbd_if = {
	.name = "rbd",
	.module_init = bdev_rbd_library_init,
	.module_fini = bdev_rbd_library_fini,
	.get_ctx_size = bdev_rbd_get_ctx_size,
};
SPDK_BDEV_MODULE_REGISTER(rbd, &rbd_if)

static int
bdev_rbd_reset_timer(void *arg)
{
	struct bdev_rbd *disk = arg;

	/*
	 * TODO: This should check if any I/O is still in flight before completing the reset.
	 * For now, just complete after the timer expires.
	 */
	bdev_rbd_io_complete(disk->reset_bdev_io, SPDK_BDEV_IO_STATUS_SUCCESS);
	spdk_poller_unregister(&disk->reset_timer);
	disk->reset_bdev_io = NULL;

	return SPDK_POLLER_BUSY;
}

static void
bdev_rbd_reset(struct bdev_rbd *disk, struct spdk_bdev_io *bdev_io)
{
	/*
	 * HACK: Since librbd doesn't provide any way to cancel outstanding aio, just kick off a
	 * timer to wait for in-flight I/O to complete.
	 */
	assert(disk->reset_bdev_io == NULL);
	disk->reset_bdev_io = bdev_io;
	disk->reset_timer = SPDK_POLLER_REGISTER(bdev_rbd_reset_timer, disk, 1 * 1000 * 1000);
}

static void
bdev_rbd_free_cb(void *io_device)
{
	struct bdev_rbd *rbd = io_device;

	assert(rbd != NULL);

	pthread_mutex_lock(&rbd->mutex);

	if (rbd->ch_count != 0) {
		rbd->deferred_free = true;
		pthread_mutex_unlock(&rbd->mutex);
	} else {
		pthread_mutex_unlock(&rbd->mutex);
		bdev_rbd_free(rbd);
	}
}

static int
bdev_rbd_destruct(void *ctx)
{
	struct bdev_rbd *rbd = ctx;

	spdk_io_device_unregister(rbd, bdev_rbd_free_cb);

	return 0;
}

static void
bdev_rbd_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io,
		    bool success)
{
	struct bdev_rbd *disk = (struct bdev_rbd *)bdev_io->bdev->ctxt;

	if (!success) {
		bdev_rbd_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
		return;
	}

	bdev_rbd_start_aio(disk,
			   bdev_io,
			   bdev_io->u.bdev.iovs,
			   bdev_io->u.bdev.iovcnt,
			   bdev_io->u.bdev.offset_blocks * bdev_io->bdev->blocklen,
			   bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
}

static void
_bdev_rbd_submit_request(void *ctx)
{
	struct spdk_bdev_io *bdev_io = ctx;
	struct bdev_rbd *disk = (struct bdev_rbd *)bdev_io->bdev->ctxt;

	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_READ:
		spdk_bdev_io_get_buf(bdev_io, bdev_rbd_get_buf_cb,
				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
		break;

	case SPDK_BDEV_IO_TYPE_WRITE:
	case SPDK_BDEV_IO_TYPE_FLUSH:
		bdev_rbd_start_aio(disk,
				   bdev_io,
				   bdev_io->u.bdev.iovs,
				   bdev_io->u.bdev.iovcnt,
				   bdev_io->u.bdev.offset_blocks * bdev_io->bdev->blocklen,
				   bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
		break;

	case SPDK_BDEV_IO_TYPE_RESET:
		bdev_rbd_reset(disk, bdev_io);
		break;

	default:
		SPDK_ERRLOG("Unsupported IO type=%d\n", bdev_io->type);
		bdev_rbd_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
		break;
	}
}

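/*
 * Entry point for all I/O. The librbd image and eventfd are owned by the
 * thread that opened the first channel (main_td), so requests arriving on
 * other threads are forwarded there with spdk_thread_send_msg();
 * submit_td records where the completion must be sent back.
 */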
static void
bdev_rbd_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
{
	struct spdk_thread *submit_td = spdk_io_channel_get_thread(ch);
	struct bdev_rbd_io *rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx;
	struct bdev_rbd *disk = (struct bdev_rbd *)bdev_io->bdev->ctxt;

	if (disk->main_td != submit_td) {
		rbd_io->submit_td = submit_td;
		spdk_thread_send_msg(disk->main_td, _bdev_rbd_submit_request, bdev_io);
	} else {
		rbd_io->submit_td = NULL;
		_bdev_rbd_submit_request(bdev_io);
	}
}

static bool
bdev_rbd_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type)
{
	switch (io_type) {
	case SPDK_BDEV_IO_TYPE_READ:
	case SPDK_BDEV_IO_TYPE_WRITE:
	case SPDK_BDEV_IO_TYPE_FLUSH:
	case SPDK_BDEV_IO_TYPE_RESET:
		return true;

	default:
		return false;
	}
}

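/*
 * Drain up to SPDK_RBD_QUEUE_DEPTH completed librbd events. For reads,
 * librbd returns the number of bytes transferred, so anything other than
 * the requested total_len is treated as a failure; for writes and flushes,
 * a return value of 0 means success.
 */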
static void
bdev_rbd_io_poll(struct bdev_rbd *disk)
{
	int i, io_status, rc;
	rbd_completion_t comps[SPDK_RBD_QUEUE_DEPTH];
	struct spdk_bdev_io *bdev_io;
	struct bdev_rbd_io *rbd_io;
	enum spdk_bdev_io_status bio_status;

	rc = rbd_poll_io_events(disk->image, comps, SPDK_RBD_QUEUE_DEPTH);
	for (i = 0; i < rc; i++) {
		bdev_io = rbd_aio_get_arg(comps[i]);
		rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx;
		io_status = rbd_aio_get_return_value(comps[i]);
		bio_status = SPDK_BDEV_IO_STATUS_SUCCESS;

		if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) {
			if ((int)rbd_io->total_len != io_status) {
				bio_status = SPDK_BDEV_IO_STATUS_FAILED;
			}
		} else {
			/* For others, 0 means success */
			if (io_status != 0) {
				bio_status = SPDK_BDEV_IO_STATUS_FAILED;
			}
		}

		rbd_aio_release(comps[i]);

		bdev_rbd_io_complete(bdev_io, bio_status);
	}
}

static void
bdev_rbd_free_channel_resources(struct bdev_rbd *disk)
{
	int rc;

	assert(disk != NULL);
	assert(disk->main_td == spdk_get_thread());
	assert(disk->ch_count == 0);
	assert(disk->group_ch != NULL);
	rc = epoll_ctl(disk->group_ch->epoll_fd, EPOLL_CTL_DEL,
		       disk->pfd, NULL);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to remove fd on disk=%p from the polling group=%p\n",
			    disk, disk->group_ch);
	}
	spdk_put_io_channel(spdk_io_channel_from_ctx(disk->group_ch));

	if (disk->image) {
		bdev_rbd_exit(disk->image);
	}

	if (disk->pfd >= 0) {
		close(disk->pfd);
	}

	disk->main_td = NULL;
	disk->group_ch = NULL;
}

static void *
bdev_rbd_handle(void *arg)
{
	struct bdev_rbd *disk = arg;
	void *ret = arg;

	if (rbd_open(disk->io_ctx, disk->rbd_name, &disk->image, NULL) < 0) {
		SPDK_ERRLOG("Failed to open specified rbd device\n");
		ret = NULL;
	}

	return ret;
}

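/*
 * First-channel setup: open the image (in an unaffinitized thread), create
 * a non-blocking eventfd, ask librbd to signal completions on it, and add
 * the fd to the poll group's epoll set so bdev_rbd_group_poll() can find
 * this disk when completions arrive.
 */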
static int
_bdev_rbd_create_cb(struct bdev_rbd *disk)
{
	int ret;
	struct epoll_event event = {};

	disk->group_ch = spdk_io_channel_get_ctx(spdk_get_io_channel(&rbd_if));
	assert(disk->group_ch != NULL);
	event.events = EPOLLIN;
	event.data.ptr = disk;

	if (spdk_call_unaffinitized(bdev_rbd_handle, disk) == NULL) {
		goto err;
	}

	disk->pfd = eventfd(0, EFD_NONBLOCK);
	if (disk->pfd < 0) {
		SPDK_ERRLOG("Failed to get eventfd\n");
		goto err;
	}

	ret = rbd_set_image_notification(disk->image, disk->pfd, EVENT_TYPE_EVENTFD);
	if (ret < 0) {
		SPDK_ERRLOG("Failed to set rbd image notification\n");
		goto err;
	}

	ret = epoll_ctl(disk->group_ch->epoll_fd, EPOLL_CTL_ADD, disk->pfd, &event);
	if (ret < 0) {
		SPDK_ERRLOG("Failed to add the fd of disk=%p to the epoll group from group_ch=%p\n", disk,
			    disk->group_ch);
		goto err;
	}

	return 0;

err:
	bdev_rbd_free_channel_resources(disk);
	return -1;
}

static int
bdev_rbd_create_cb(void *io_device, void *ctx_buf)
{
	struct bdev_rbd_io_channel *ch = ctx_buf;
	struct bdev_rbd *disk = io_device;
	int rc;

	ch->disk = disk;
	pthread_mutex_lock(&disk->mutex);
	if (disk->ch_count == 0) {
		assert(disk->main_td == NULL);
		rc = _bdev_rbd_create_cb(disk);
		if (rc) {
			SPDK_ERRLOG("Cannot create channel for disk=%p\n", disk);
			pthread_mutex_unlock(&disk->mutex);
			return rc;
		}

		disk->main_td = spdk_get_thread();
	}

	disk->ch_count++;
	pthread_mutex_unlock(&disk->mutex);

	return 0;
}

static void
_bdev_rbd_destroy_cb(void *ctx)
{
	struct bdev_rbd *disk = ctx;
	bool deferred_free;

	pthread_mutex_lock(&disk->mutex);
	assert(disk->ch_count > 0);
	disk->ch_count--;

	if (disk->ch_count > 0) {
		/* A new channel was created between when the message was sent and when this
		 * function executed */
		pthread_mutex_unlock(&disk->mutex);
		return;
	}

	bdev_rbd_free_channel_resources(disk);

	deferred_free = disk->deferred_free;
	pthread_mutex_unlock(&disk->mutex);

	/* Free the rbd structure here if bdev_rbd_destruct already ran and
	 * requested a deferred free */
	if (deferred_free) {
		bdev_rbd_free(disk);
	}
}

static void
bdev_rbd_destroy_cb(void *io_device, void *ctx_buf)
{
	struct bdev_rbd *disk = io_device;
	struct spdk_thread *thread;

	pthread_mutex_lock(&disk->mutex);
	assert(disk->ch_count > 0);
	disk->ch_count--;
	if (disk->ch_count == 0) {
		assert(disk->main_td != NULL);
		if (disk->main_td != spdk_get_thread()) {
			/* The final channel was destroyed on a different thread
			 * than where the first channel was created. Pass a message
			 * to the main thread to unregister the poller. */
			disk->ch_count++;
			thread = disk->main_td;
			pthread_mutex_unlock(&disk->mutex);
			spdk_thread_send_msg(thread, _bdev_rbd_destroy_cb, disk);
			return;
		}

		bdev_rbd_free_channel_resources(disk);
	}
	pthread_mutex_unlock(&disk->mutex);
}

static struct spdk_io_channel *
bdev_rbd_get_io_channel(void *ctx)
{
	struct bdev_rbd *rbd_bdev = ctx;

	return spdk_get_io_channel(rbd_bdev);
}

static void
bdev_rbd_cluster_dump_entry(const char *cluster_name, struct spdk_json_write_ctx *w)
{
	struct bdev_rbd_cluster *entry;

	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);
	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
		if (strcmp(cluster_name, entry->name)) {
			continue;
		}
		if (entry->user_id) {
			spdk_json_write_named_string(w, "user_id", entry->user_id);
		}

		if (entry->config_param) {
			char **config_entry = entry->config_param;

			spdk_json_write_named_object_begin(w, "config_param");
			while (*config_entry) {
				spdk_json_write_named_string(w, config_entry[0], config_entry[1]);
				config_entry += 2;
			}
			spdk_json_write_object_end(w);
		} else if (entry->config_file) {
			spdk_json_write_named_string(w, "config_file", entry->config_file);
		}

		pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
		return;
	}

	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
}

static int
bdev_rbd_dump_info_json(void *ctx, struct spdk_json_write_ctx *w)
{
	struct bdev_rbd *rbd_bdev = ctx;

	spdk_json_write_named_object_begin(w, "rbd");

	spdk_json_write_named_string(w, "pool_name", rbd_bdev->pool_name);

	spdk_json_write_named_string(w, "rbd_name", rbd_bdev->rbd_name);

	if (rbd_bdev->cluster_name) {
		bdev_rbd_cluster_dump_entry(rbd_bdev->cluster_name, w);
		goto end;
	}

	if (rbd_bdev->user_id) {
		spdk_json_write_named_string(w, "user_id", rbd_bdev->user_id);
	}

	if (rbd_bdev->config) {
		char **entry = rbd_bdev->config;

		spdk_json_write_named_object_begin(w, "config");
		while (*entry) {
			spdk_json_write_named_string(w, entry[0], entry[1]);
			entry += 2;
		}
		spdk_json_write_object_end(w);
	}

end:
	spdk_json_write_object_end(w);

	return 0;
}

static void
bdev_rbd_write_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
{
	struct bdev_rbd *rbd = bdev->ctxt;

	spdk_json_write_object_begin(w);

	spdk_json_write_named_string(w, "method", "bdev_rbd_create");

	spdk_json_write_named_object_begin(w, "params");
	spdk_json_write_named_string(w, "name", bdev->name);
	spdk_json_write_named_string(w, "pool_name", rbd->pool_name);
	spdk_json_write_named_string(w, "rbd_name", rbd->rbd_name);
	spdk_json_write_named_uint32(w, "block_size", bdev->blocklen);
	if (rbd->user_id) {
		spdk_json_write_named_string(w, "user_id", rbd->user_id);
	}

	if (rbd->config) {
		char **entry = rbd->config;

		spdk_json_write_named_object_begin(w, "config");
		while (*entry) {
			spdk_json_write_named_string(w, entry[0], entry[1]);
			entry += 2;
		}
		spdk_json_write_object_end(w);
	}

	spdk_json_write_object_end(w);

	spdk_json_write_object_end(w);
}

static void
dump_single_cluster_entry(struct bdev_rbd_cluster *entry, struct spdk_json_write_ctx *w)
{
	assert(entry != NULL);

	spdk_json_write_object_begin(w);
	spdk_json_write_named_string(w, "cluster_name", entry->name);

	if (entry->user_id) {
		spdk_json_write_named_string(w, "user_id", entry->user_id);
	}

	if (entry->config_param) {
		char **config_entry = entry->config_param;

		spdk_json_write_named_object_begin(w, "config_param");
		while (*config_entry) {
			spdk_json_write_named_string(w, config_entry[0], config_entry[1]);
			config_entry += 2;
		}
		spdk_json_write_object_end(w);
	} else if (entry->config_file) {
		spdk_json_write_named_string(w, "config_file", entry->config_file);
	}

	spdk_json_write_object_end(w);
}

int
bdev_rbd_get_clusters_info(struct spdk_jsonrpc_request *request, const char *name)
{
	struct bdev_rbd_cluster *entry;
	struct spdk_json_write_ctx *w;

	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);

	if (STAILQ_EMPTY(&g_map_bdev_rbd_cluster)) {
		pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
		return -ENOENT;
	}

	/* If cluster name is provided */
	if (name) {
		STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
			if (strcmp(name, entry->name) == 0) {
				w = spdk_jsonrpc_begin_result(request);
				dump_single_cluster_entry(entry, w);
				spdk_jsonrpc_end_result(request, w);

				pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
				return 0;
			}
		}

		pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
		return -ENOENT;
	}

	w = spdk_jsonrpc_begin_result(request);
	spdk_json_write_array_begin(w);
	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
		dump_single_cluster_entry(entry, w);
	}
	spdk_json_write_array_end(w);
	spdk_jsonrpc_end_result(request, w);
	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);

	return 0;
}

static const struct spdk_bdev_fn_table rbd_fn_table = {
	.destruct		= bdev_rbd_destruct,
	.submit_request		= bdev_rbd_submit_request,
	.io_type_supported	= bdev_rbd_io_type_supported,
	.get_io_channel		= bdev_rbd_get_io_channel,
	.dump_info_json		= bdev_rbd_dump_info_json,
	.write_config_json	= bdev_rbd_write_config_json,
};

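/*
 * Create and connect a named Rados cluster and add it to the global
 * registry. The caller supplies either a config_param key/value list or a
 * Ceph config_file path; config_param takes precedence.
 */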
static int
rbd_register_cluster(const char *name, const char *user_id, const char *const *config_param,
		     const char *config_file)
{
	struct bdev_rbd_cluster *entry;
	int rc;

	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);
	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
		if (strcmp(name, entry->name) == 0) {
			SPDK_ERRLOG("Cluster name=%s already exists\n", name);
			pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
			return -1;
		}
	}

	entry = calloc(1, sizeof(*entry));
	if (!entry) {
		SPDK_ERRLOG("Cannot allocate an entry for name=%s\n", name);
		pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
		return -1;
	}

	entry->name = strdup(name);
	if (entry->name == NULL) {
		SPDK_ERRLOG("Failed to save the name=%s on entry=%p\n", name, entry);
		goto err_handle;
	}

	if (user_id) {
		entry->user_id = strdup(user_id);
		if (entry->user_id == NULL) {
			SPDK_ERRLOG("Failed to save the user_id=%s on entry=%p\n", user_id, entry);
			goto err_handle;
		}
	}

	/* config_param takes priority; fall back to config_file only if no
	 * config_param was given */
	if (config_param) {
		entry->config_param = bdev_rbd_dup_config(config_param);
		if (entry->config_param == NULL) {
			SPDK_ERRLOG("Failed to save the config_param=%p on entry=%p\n", config_param, entry);
			goto err_handle;
		}
	} else if (config_file) {
		entry->config_file = strdup(config_file);
		if (entry->config_file == NULL) {
			SPDK_ERRLOG("Failed to save the config_file=%s on entry=%p\n", config_file, entry);
			goto err_handle;
		}
	}

	rc = rados_create(&entry->cluster, user_id);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to create rados_t struct\n");
		goto err_handle;
	}

	if (config_param) {
		const char *const *config_entry = config_param;
		while (*config_entry) {
			rc = rados_conf_set(entry->cluster, config_entry[0], config_entry[1]);
			if (rc < 0) {
				SPDK_ERRLOG("Failed to set %s = %s\n", config_entry[0], config_entry[1]);
				rados_shutdown(entry->cluster);
				goto err_handle;
			}
			config_entry += 2;
		}
	} else {
		rc = rados_conf_read_file(entry->cluster, entry->config_file);
		if (rc < 0) {
			SPDK_ERRLOG("Failed to read conf file\n");
			rados_shutdown(entry->cluster);
			goto err_handle;
		}
	}

	rc = rados_connect(entry->cluster);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to connect to rbd_pool on cluster=%p\n", entry->cluster);
		rados_shutdown(entry->cluster);
		goto err_handle;
	}

	STAILQ_INSERT_TAIL(&g_map_bdev_rbd_cluster, entry, link);
	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);

	return 0;

err_handle:
	bdev_rbd_cluster_free(entry);
	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
	return -1;
}

int
bdev_rbd_unregister_cluster(const char *name)
{
	struct bdev_rbd_cluster *entry;
	int rc = 0;

	if (name == NULL) {
		return -1;
	}

	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);
	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
		if (strcmp(name, entry->name) == 0) {
			if (entry->ref == 0) {
				STAILQ_REMOVE(&g_map_bdev_rbd_cluster, entry, bdev_rbd_cluster, link);
				rados_shutdown(entry->cluster);
				bdev_rbd_cluster_free(entry);
			} else {
				SPDK_ERRLOG("Cluster with name=%s is still in use and cannot be deleted\n",
					    entry->name);
				rc = -1;
			}

			pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
			return rc;
		}
	}

	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);

	SPDK_ERRLOG("Could not find the cluster name=%s\n", name);

	return -1;
}

static void *
_bdev_rbd_register_cluster(void *arg)
{
	struct cluster_register_info *info = arg;
	void *ret = arg;
	int rc;

	rc = rbd_register_cluster((const char *)info->name, (const char *)info->user_id,
				  (const char *const *)info->config_param, (const char *)info->config_file);
	if (rc) {
		ret = NULL;
	}

	return ret;
}

int
bdev_rbd_register_cluster(struct cluster_register_info *info)
{
	assert(info != NULL);

	/* The Rados cluster info needs to be created in a non-SPDK thread to
	 * avoid CPU resource contention */
	if (spdk_call_unaffinitized(_bdev_rbd_register_cluster, info) == NULL) {
		return -1;
	}

	return 0;
}

int
bdev_rbd_create(struct spdk_bdev **bdev, const char *name, const char *user_id,
		const char *pool_name,
		const char *const *config,
		const char *rbd_name,
		uint32_t block_size,
		const char *cluster_name)
{
	struct bdev_rbd *rbd;
	int ret;

	if ((pool_name == NULL) || (rbd_name == NULL)) {
		return -EINVAL;
	}

	rbd = calloc(1, sizeof(struct bdev_rbd));
	if (rbd == NULL) {
		SPDK_ERRLOG("Failed to allocate bdev_rbd struct\n");
		return -ENOMEM;
	}

	ret = pthread_mutex_init(&rbd->mutex, NULL);
	if (ret) {
		SPDK_ERRLOG("Cannot init mutex on rbd=%p\n", rbd);
		free(rbd);
		return ret;
	}

	rbd->pfd = -1;
	rbd->rbd_name = strdup(rbd_name);
	if (!rbd->rbd_name) {
		bdev_rbd_free(rbd);
		return -ENOMEM;
	}

	if (user_id) {
		rbd->user_id = strdup(user_id);
		if (!rbd->user_id) {
			bdev_rbd_free(rbd);
			return -ENOMEM;
		}
	}

	if (cluster_name) {
		rbd->cluster_name = strdup(cluster_name);
		if (!rbd->cluster_name) {
			bdev_rbd_free(rbd);
			return -ENOMEM;
		}
	}
	rbd->pool_name = strdup(pool_name);
	if (!rbd->pool_name) {
		bdev_rbd_free(rbd);
		return -ENOMEM;
	}

	if (config && !(rbd->config = bdev_rbd_dup_config(config))) {
		bdev_rbd_free(rbd);
		return -ENOMEM;
	}

	ret = bdev_rbd_init(rbd);
	if (ret < 0) {
		bdev_rbd_free(rbd);
		SPDK_ERRLOG("Failed to init rbd device\n");
		return ret;
	}

	if (name) {
		rbd->disk.name = strdup(name);
	} else {
		rbd->disk.name = spdk_sprintf_alloc("Ceph%d", bdev_rbd_count);
	}
	if (!rbd->disk.name) {
		bdev_rbd_free(rbd);
		return -ENOMEM;
	}
	rbd->disk.product_name = "Ceph Rbd Disk";
	bdev_rbd_count++;

	rbd->disk.write_cache = 0;
	rbd->disk.blocklen = block_size;
	rbd->disk.blockcnt = rbd->info.size / rbd->disk.blocklen;
	rbd->disk.ctxt = rbd;
	rbd->disk.fn_table = &rbd_fn_table;
	rbd->disk.module = &rbd_if;

	SPDK_NOTICELOG("Add %s rbd disk to lun\n", rbd->disk.name);

	spdk_io_device_register(rbd, bdev_rbd_create_cb,
				bdev_rbd_destroy_cb,
				sizeof(struct bdev_rbd_io_channel),
				rbd_name);
	ret = spdk_bdev_register(&rbd->disk);
	if (ret) {
		spdk_io_device_unregister(rbd, NULL);
		bdev_rbd_free(rbd);
		return ret;
	}

	*bdev = &(rbd->disk);

	return ret;
}

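/*
 * A minimal usage sketch for bdev_rbd_create() (hypothetical caller and
 * values; in practice this is driven by the bdev_rbd_create RPC):
 *
 *   struct spdk_bdev *bdev = NULL;
 *   int rc;
 *
 *   rc = bdev_rbd_create(&bdev, "Ceph0", NULL, "rbd", NULL, "my_image",
 *                        512, NULL);
 *
 * A NULL config falls back to the default Ceph conf search path, and a
 * NULL cluster_name creates a private (non-shared) cluster connection.
 */
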
void
bdev_rbd_delete(struct spdk_bdev *bdev, spdk_delete_rbd_complete cb_fn, void *cb_arg)
{
	if (!bdev || bdev->module != &rbd_if) {
		cb_fn(cb_arg, -ENODEV);
		return;
	}

	spdk_bdev_unregister(bdev, cb_fn, cb_arg);
}

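/*
 * Grow the backing RBD image; shrinking is rejected. new_size_in_mb is
 * converted to bytes (e.g. 100 MiB -> 104857600 bytes, i.e. 204800 blocks
 * at a 512-byte blocklen) before calling rbd_resize(), then the bdev layer
 * is notified of the new block count.
 */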
int
bdev_rbd_resize(struct spdk_bdev *bdev, const uint64_t new_size_in_mb)
{
	struct spdk_io_channel *ch;
	struct bdev_rbd_io_channel *rbd_io_ch;
	int rc;
	uint64_t new_size_in_byte;
	uint64_t current_size_in_mb;

	if (bdev->module != &rbd_if) {
		return -EINVAL;
	}

	current_size_in_mb = bdev->blocklen * bdev->blockcnt / (1024 * 1024);
	if (current_size_in_mb > new_size_in_mb) {
		SPDK_ERRLOG("The new bdev size must not be smaller than current bdev size.\n");
		return -EINVAL;
	}

	ch = bdev_rbd_get_io_channel(bdev);
	rbd_io_ch = spdk_io_channel_get_ctx(ch);
	new_size_in_byte = new_size_in_mb * 1024 * 1024;

	rc = rbd_resize(rbd_io_ch->disk->image, new_size_in_byte);
	spdk_put_io_channel(ch);
	if (rc != 0) {
		SPDK_ERRLOG("Failed to resize the ceph bdev.\n");
		return rc;
	}

	rc = spdk_bdev_notify_blockcnt_change(bdev, new_size_in_byte / bdev->blocklen);
	if (rc != 0) {
		SPDK_ERRLOG("Failed to notify block count change.\n");
		return rc;
	}

	return rc;
}

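/*
 * Module-wide poll group. Each group channel owns an epoll fd; every open
 * disk registers its eventfd there with event.data.ptr set to the
 * bdev_rbd, so a ready event maps directly to the disk whose completions
 * need draining.
 */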
static int
bdev_rbd_group_poll(void *arg)
{
	struct bdev_rbd_group_channel *group_ch = arg;
	struct epoll_event events[MAX_EVENTS_PER_POLL];
	int num_events, i;

	num_events = epoll_wait(group_ch->epoll_fd, events, MAX_EVENTS_PER_POLL, 0);

	if (num_events <= 0) {
		return SPDK_POLLER_IDLE;
	}

	for (i = 0; i < num_events; i++) {
		bdev_rbd_io_poll((struct bdev_rbd *)events[i].data.ptr);
	}

	return SPDK_POLLER_BUSY;
}

static int
bdev_rbd_group_create_cb(void *io_device, void *ctx_buf)
{
	struct bdev_rbd_group_channel *ch = ctx_buf;

	ch->epoll_fd = epoll_create1(0);
	if (ch->epoll_fd < 0) {
		SPDK_ERRLOG("Could not create epoll fd on io device=%p\n", io_device);
		return -1;
	}

	ch->poller = SPDK_POLLER_REGISTER(bdev_rbd_group_poll, ch, 0);

	return 0;
}

static void
bdev_rbd_group_destroy_cb(void *io_device, void *ctx_buf)
{
	struct bdev_rbd_group_channel *ch = ctx_buf;

	if (ch->epoll_fd >= 0) {
		close(ch->epoll_fd);
	}

	spdk_poller_unregister(&ch->poller);
}

static int
bdev_rbd_library_init(void)
{
	spdk_io_device_register(&rbd_if, bdev_rbd_group_create_cb, bdev_rbd_group_destroy_cb,
				sizeof(struct bdev_rbd_group_channel), "bdev_rbd_poll_groups");

	return 0;
}

static void
bdev_rbd_library_fini(void)
{
	spdk_io_device_unregister(&rbd_if, NULL);
}

SPDK_LOG_REGISTER_COMPONENT(bdev_rbd)