xref: /spdk/module/bdev/rbd/bdev_rbd.c (revision b30d57cdad6d2bc75cc1e4e2ebbcebcb0d98dcfa)
1  /*-
2   *   BSD LICENSE
3   *
4   *   Copyright (c) Intel Corporation.
5   *   All rights reserved.
6   *
7   *   Redistribution and use in source and binary forms, with or without
8   *   modification, are permitted provided that the following conditions
9   *   are met:
10   *
11   *     * Redistributions of source code must retain the above copyright
12   *       notice, this list of conditions and the following disclaimer.
13   *     * Redistributions in binary form must reproduce the above copyright
14   *       notice, this list of conditions and the following disclaimer in
15   *       the documentation and/or other materials provided with the
16   *       distribution.
17   *     * Neither the name of Intel Corporation nor the names of its
18   *       contributors may be used to endorse or promote products derived
19   *       from this software without specific prior written permission.
20   *
21   *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22   *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23   *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24   *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25   *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26   *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27   *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28   *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29   *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30   *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31   *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32   */
33  
34  #include "spdk/stdinc.h"
35  
36  #include "bdev_rbd.h"
37  
38  #include <rbd/librbd.h>
39  #include <rados/librados.h>
40  #include <sys/eventfd.h>
41  #include <sys/epoll.h>
42  
43  #include "spdk/env.h"
44  #include "spdk/bdev.h"
45  #include "spdk/thread.h"
46  #include "spdk/json.h"
47  #include "spdk/string.h"
48  #include "spdk/util.h"
49  #include "spdk/likely.h"
50  
51  #include "spdk/bdev_module.h"
52  #include "spdk/log.h"
53  
/* Capacity of the completion array handed to rbd_poll_io_events() on each image poll. */
54  #define SPDK_RBD_QUEUE_DEPTH 128
/* Maximum number of events fetched by one epoll_wait() call in the group poller. */
55  #define MAX_EVENTS_PER_POLL 128
56  
/*
 * Counter used to build the default "Ceph<N>" bdev name in bdev_rbd_create();
 * it is incremented there once a name has been assigned (explicit or generated).
 */
57  static int bdev_rbd_count = 0;
58  
/*
 * State kept for one RBD-backed bdev. Allocated in bdev_rbd_create() and
 * released by bdev_rbd_free(). The object itself is the io_device that
 * per-thread bdev_rbd_io_channel contexts are created for.
 */
59  struct bdev_rbd {
	/*
	 * The generic bdev handed to spdk_bdev_register(). Keep it as the first
	 * member: bdev_rbd_resize() passes the spdk_bdev pointer where the
	 * bdev_rbd io_device pointer is expected.
	 */
60  	struct spdk_bdev disk;
	/* Heap copy of the image name (strdup'ed in bdev_rbd_create()). */
61  	char *rbd_name;
	/* Heap copy of the rados user id; NULL when none was supplied. */
62  	char *user_id;
	/* Heap copy of the pool name. */
63  	char *pool_name;
	/* NULL-terminated key/value string array from bdev_rbd_dup_config(); may be NULL. */
64  	char **config;
	/* Image information filled in by rbd_stat() during bdev_rbd_init(). */
65  	rbd_image_info_t info;
	/* List linkage; not used by the code visible in this file. */
66  	TAILQ_ENTRY(bdev_rbd) tailq;
	/* Poller armed by bdev_rbd_reset(), fires bdev_rbd_reset_timer(). */
67  	struct spdk_poller *reset_timer;
	/* Reset request awaiting completion, NULL when no reset is pending. */
68  	struct spdk_bdev_io *reset_bdev_io;
69  };
70  
/*
 * Channel context of the module-level io_device (&rbd_if). It owns the epoll
 * instance that the eventfds of the image channels are added to.
 */
71  struct bdev_rbd_group_channel {
	/* Poller running bdev_rbd_group_poll() with this context as argument. */
72  	struct spdk_poller *poller;
	/* epoll instance created with epoll_create1() in bdev_rbd_group_create_cb(). */
73  	int epoll_fd;
74  };
75  
/*
 * Channel context of a bdev_rbd io_device. Set up in bdev_rbd_create_cb()
 * and torn down by bdev_rbd_free_channel().
 */
76  struct bdev_rbd_io_channel {
	/* Pool I/O context produced by bdev_rados_context_init(). */
77  	rados_ioctx_t io_ctx;
	/* Cluster handle produced by bdev_rados_context_init(). */
78  	rados_t cluster;
	/* eventfd registered with rbd_set_image_notification(); -1 while unset. */
79  	int pfd;
	/* Image handle opened with rbd_open() in bdev_rbd_handle(). */
80  	rbd_image_t image;
	/* Owning bdev_rbd (the io_device this channel was created for). */
81  	struct bdev_rbd *disk;
	/* Group channel whose epoll instance watches pfd. */
82  	struct bdev_rbd_group_channel *group_ch;
83  };
84  
/* Per-I/O driver context; its size is reported by bdev_rbd_get_ctx_size(). */
85  struct bdev_rbd_io {
	/*
	 * Requested length in bytes, recorded for reads in bdev_rbd_start_aio()
	 * and compared with the completion value in bdev_rbd_io_poll().
	 */
86  	size_t	total_len;
87  };
88  
89  static void
90  bdev_rbd_free(struct bdev_rbd *rbd)
91  {
92  	if (!rbd) {
93  		return;
94  	}
95  
96  	free(rbd->disk.name);
97  	free(rbd->rbd_name);
98  	free(rbd->user_id);
99  	free(rbd->pool_name);
100  	bdev_rbd_free_config(rbd->config);
101  	free(rbd);
102  }
103  
/*
 * Free a NULL-terminated array of heap-allocated strings, including the
 * array itself. A NULL array is accepted and ignored.
 */
void
bdev_rbd_free_config(char **config)
{
	size_t i;

	if (config == NULL) {
		return;
	}

	for (i = 0; config[i] != NULL; i++) {
		free(config[i]);
	}
	free(config);
}
116  
/*
 * Deep-copy a NULL-terminated array of strings.
 *
 * Returns the newly allocated, NULL-terminated copy, or NULL when config is
 * NULL or an allocation fails (any partial copy is released first). The
 * result is meant to be released with bdev_rbd_free_config().
 */
char **
bdev_rbd_dup_config(const char *const *config)
{
	size_t n_entries = 0;
	size_t i;
	char **dup;

	if (config == NULL) {
		return NULL;
	}

	while (config[n_entries] != NULL) {
		n_entries++;
	}

	/* The extra zeroed slot doubles as the NULL terminator. */
	dup = calloc(n_entries + 1, sizeof(*dup));
	if (dup == NULL) {
		return NULL;
	}

	for (i = 0; i < n_entries; i++) {
		dup[i] = strdup(config[i]);
		if (dup[i] == NULL) {
			bdev_rbd_free_config(dup);
			return NULL;
		}
	}

	return dup;
}
139  
140  static int
141  bdev_rados_context_init(const char *user_id, const char *rbd_pool_name, const char *const *config,
142  			rados_t *cluster, rados_ioctx_t *io_ctx)
143  {
144  	int ret;
145  
146  	ret = rados_create(cluster, user_id);
147  	if (ret < 0) {
148  		SPDK_ERRLOG("Failed to create rados_t struct\n");
149  		return -1;
150  	}
151  
152  	if (config) {
153  		const char *const *entry = config;
154  		while (*entry) {
155  			ret = rados_conf_set(*cluster, entry[0], entry[1]);
156  			if (ret < 0) {
157  				SPDK_ERRLOG("Failed to set %s = %s\n", entry[0], entry[1]);
158  				rados_shutdown(*cluster);
159  				return -1;
160  			}
161  			entry += 2;
162  		}
163  	} else {
164  		ret = rados_conf_read_file(*cluster, NULL);
165  		if (ret < 0) {
166  			SPDK_ERRLOG("Failed to read conf file\n");
167  			rados_shutdown(*cluster);
168  			return -1;
169  		}
170  	}
171  
172  	ret = rados_connect(*cluster);
173  	if (ret < 0) {
174  		SPDK_ERRLOG("Failed to connect to rbd_pool\n");
175  		rados_shutdown(*cluster);
176  		return -1;
177  	}
178  
179  	ret = rados_ioctx_create(*cluster, rbd_pool_name, io_ctx);
180  
181  	if (ret < 0) {
182  		SPDK_ERRLOG("Failed to create ioctx\n");
183  		rados_shutdown(*cluster);
184  		return -1;
185  	}
186  
187  	return 0;
188  }
189  
190  static int
191  bdev_rbd_init(const char *user_id, const char *rbd_pool_name, const char *const *config,
192  	      const char *rbd_name, rbd_image_info_t *info)
193  {
194  	int ret;
195  	rados_t cluster = NULL;
196  	rados_ioctx_t io_ctx = NULL;
197  	rbd_image_t image = NULL;
198  
199  	ret = bdev_rados_context_init(user_id, rbd_pool_name, config, &cluster, &io_ctx);
200  	if (ret < 0) {
201  		SPDK_ERRLOG("Failed to create rados context for user_id=%s and rbd_pool=%s\n",
202  			    user_id ? user_id : "admin (the default)", rbd_pool_name);
203  		return -1;
204  	}
205  
206  	ret = rbd_open(io_ctx, rbd_name, &image, NULL);
207  	if (ret < 0) {
208  		SPDK_ERRLOG("Failed to open specified rbd device\n");
209  		goto err;
210  	}
211  	ret = rbd_stat(image, info, sizeof(*info));
212  	rbd_close(image);
213  	if (ret < 0) {
214  		SPDK_ERRLOG("Failed to stat specified rbd device\n");
215  		goto err;
216  	}
217  
218  	rados_ioctx_destroy(io_ctx);
219  	return 0;
220  err:
221  	rados_ioctx_destroy(io_ctx);
222  	rados_shutdown(cluster);
223  	return -1;
224  }
225  
226  static void
227  bdev_rbd_exit(rbd_image_t image)
228  {
229  	rbd_flush(image);
230  	rbd_close(image);
231  }
232  
233  static void
234  bdev_rbd_finish_aiocb(rbd_completion_t cb, void *arg)
235  {
236  	/* Doing nothing here */
237  }
238  
239  static int
240  bdev_rbd_start_aio(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io,
241  		   struct iovec *iov, int iovcnt, uint64_t offset, size_t len)
242  {
243  	struct bdev_rbd_io_channel *rbdio_ch = spdk_io_channel_get_ctx(ch);
244  	int ret;
245  	rbd_completion_t comp;
246  	struct bdev_rbd_io *rbd_io;
247  	rbd_image_t image = rbdio_ch->image;
248  
249  	ret = rbd_aio_create_completion(bdev_io, bdev_rbd_finish_aiocb,
250  					&comp);
251  	if (ret < 0) {
252  		return -1;
253  	}
254  
255  	if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) {
256  		rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx;
257  		rbd_io->total_len = len;
258  		if (spdk_likely(iovcnt == 1)) {
259  			ret = rbd_aio_read(image, offset, iov[0].iov_len, iov[0].iov_base, comp);
260  		} else {
261  			ret = rbd_aio_readv(image, iov, iovcnt, offset, comp);
262  		}
263  	} else if (bdev_io->type == SPDK_BDEV_IO_TYPE_WRITE) {
264  		if (spdk_likely(iovcnt == 1)) {
265  			ret = rbd_aio_write(image, offset, iov[0].iov_len, iov[0].iov_base, comp);
266  		} else {
267  			ret = rbd_aio_writev(image, iov, iovcnt, offset, comp);
268  		}
269  	} else if (bdev_io->type == SPDK_BDEV_IO_TYPE_FLUSH) {
270  		ret = rbd_aio_flush(image, comp);
271  	}
272  
273  	if (ret < 0) {
274  		rbd_aio_release(comp);
275  		return -1;
276  	}
277  
278  	return 0;
279  }
280  
/* Forward declaration: referenced by the rbd_if module descriptor before its definition. */
281  static int bdev_rbd_library_init(void);
282  
/* Forward declaration: referenced by the rbd_if module descriptor before its definition. */
283  static void bdev_rbd_library_fini(void);
284  
285  static int
286  bdev_rbd_get_ctx_size(void)
287  {
288  	return sizeof(struct bdev_rbd_io);
289  }
290  
/*
 * Module descriptor for the "rbd" bdev module: wires in the init/fini hooks
 * and the per-I/O context size callback. Its address is also used as the
 * io_device key for the group channels (see bdev_rbd_library_init()).
 */
291  static struct spdk_bdev_module rbd_if = {
292  	.name = "rbd",
293  	.module_init = bdev_rbd_library_init,
294  	.module_fini = bdev_rbd_library_fini,
295  	.get_ctx_size = bdev_rbd_get_ctx_size,
296  
297  };
/* Register the descriptor above with the bdev layer under the name "rbd". */
298  SPDK_BDEV_MODULE_REGISTER(rbd, &rbd_if)
299  
300  static int
301  bdev_rbd_reset_timer(void *arg)
302  {
303  	struct bdev_rbd *disk = arg;
304  
305  	/*
306  	 * TODO: This should check if any I/O is still in flight before completing the reset.
307  	 * For now, just complete after the timer expires.
308  	 */
309  	spdk_bdev_io_complete(disk->reset_bdev_io, SPDK_BDEV_IO_STATUS_SUCCESS);
310  	spdk_poller_unregister(&disk->reset_timer);
311  	disk->reset_bdev_io = NULL;
312  
313  	return SPDK_POLLER_BUSY;
314  }
315  
316  static int
317  bdev_rbd_reset(struct bdev_rbd *disk, struct spdk_bdev_io *bdev_io)
318  {
319  	/*
320  	 * HACK: Since librbd doesn't provide any way to cancel outstanding aio, just kick off a
321  	 * timer to wait for in-flight I/O to complete.
322  	 */
323  	assert(disk->reset_bdev_io == NULL);
324  	disk->reset_bdev_io = bdev_io;
325  	disk->reset_timer = SPDK_POLLER_REGISTER(bdev_rbd_reset_timer, disk, 1 * 1000 * 1000);
326  
327  	return 0;
328  }
329  
/*
 * io_device unregister callback: releases the bdev_rbd once the thread
 * library has finished unregistering it.
 */
static void
bdev_rbd_free_cb(void *io_device)
{
	bdev_rbd_free((struct bdev_rbd *)io_device);
}

/*
 * bdev destruct hook.
 *
 * ctx is the struct bdev_rbd stored in disk.ctxt. The object doubles as the
 * io_device, and its unregistration may be deferred while I/O channels still
 * exist, so the memory is released from the unregister callback rather than
 * immediately after the unregister call. Returns 0.
 */
static int
bdev_rbd_destruct(void *ctx)
{
	struct bdev_rbd *rbd = ctx;

	spdk_io_device_unregister(rbd, bdev_rbd_free_cb);

	return 0;
}
340  
341  static void
342  bdev_rbd_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io,
343  		    bool success)
344  {
345  	int ret;
346  
347  	if (!success) {
348  		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
349  		return;
350  	}
351  
352  	ret = bdev_rbd_start_aio(ch,
353  				 bdev_io,
354  				 bdev_io->u.bdev.iovs,
355  				 bdev_io->u.bdev.iovcnt,
356  				 bdev_io->u.bdev.offset_blocks * bdev_io->bdev->blocklen,
357  				 bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
358  
359  	if (ret != 0) {
360  		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
361  	}
362  }
363  
364  static int _bdev_rbd_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
365  {
366  	switch (bdev_io->type) {
367  	case SPDK_BDEV_IO_TYPE_READ:
368  		spdk_bdev_io_get_buf(bdev_io, bdev_rbd_get_buf_cb,
369  				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
370  		return 0;
371  
372  	case SPDK_BDEV_IO_TYPE_WRITE:
373  	case SPDK_BDEV_IO_TYPE_FLUSH:
374  		return bdev_rbd_start_aio(ch,
375  					  bdev_io,
376  					  bdev_io->u.bdev.iovs,
377  					  bdev_io->u.bdev.iovcnt,
378  					  bdev_io->u.bdev.offset_blocks * bdev_io->bdev->blocklen,
379  					  bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
380  
381  	case SPDK_BDEV_IO_TYPE_RESET:
382  		return bdev_rbd_reset((struct bdev_rbd *)bdev_io->bdev->ctxt,
383  				      bdev_io);
384  
385  	default:
386  		return -1;
387  	}
388  	return 0;
389  }
390  
391  static void bdev_rbd_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
392  {
393  	if (_bdev_rbd_submit_request(ch, bdev_io) < 0) {
394  		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
395  	}
396  }
397  
398  static bool
399  bdev_rbd_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type)
400  {
401  	switch (io_type) {
402  	case SPDK_BDEV_IO_TYPE_READ:
403  	case SPDK_BDEV_IO_TYPE_WRITE:
404  	case SPDK_BDEV_IO_TYPE_FLUSH:
405  	case SPDK_BDEV_IO_TYPE_RESET:
406  		return true;
407  
408  	default:
409  		return false;
410  	}
411  }
412  
413  static void
414  bdev_rbd_io_poll(struct bdev_rbd_io_channel *ch)
415  {
416  	int i, io_status, rc;
417  	rbd_completion_t comps[SPDK_RBD_QUEUE_DEPTH];
418  	struct spdk_bdev_io *bdev_io;
419  	struct bdev_rbd_io *rbd_io;
420  	enum spdk_bdev_io_status bio_status;
421  
422  	rc = rbd_poll_io_events(ch->image, comps, SPDK_RBD_QUEUE_DEPTH);
423  	for (i = 0; i < rc; i++) {
424  		bdev_io = rbd_aio_get_arg(comps[i]);
425  		rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx;
426  		io_status = rbd_aio_get_return_value(comps[i]);
427  		bio_status = SPDK_BDEV_IO_STATUS_SUCCESS;
428  
429  		if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) {
430  			if ((int)rbd_io->total_len != io_status) {
431  				bio_status = SPDK_BDEV_IO_STATUS_FAILED;
432  			}
433  		} else {
434  			/* For others, 0 means success */
435  			if (io_status != 0) {
436  				bio_status = SPDK_BDEV_IO_STATUS_FAILED;
437  			}
438  		}
439  
440  		rbd_aio_release(comps[i]);
441  
442  		spdk_bdev_io_complete(bdev_io, bio_status);
443  	}
444  }
445  
446  static void
447  bdev_rbd_free_channel(struct bdev_rbd_io_channel *ch)
448  {
449  	if (!ch) {
450  		return;
451  	}
452  
453  	if (ch->image) {
454  		bdev_rbd_exit(ch->image);
455  	}
456  
457  	if (ch->io_ctx) {
458  		rados_ioctx_destroy(ch->io_ctx);
459  	}
460  
461  	if (ch->cluster) {
462  		rados_shutdown(ch->cluster);
463  	}
464  
465  	if (ch->pfd >= 0) {
466  		close(ch->pfd);
467  	}
468  
469  	if (ch->group_ch) {
470  		spdk_put_io_channel(spdk_io_channel_from_ctx(ch->group_ch));
471  	}
472  }
473  
474  static void *
475  bdev_rbd_handle(void *arg)
476  {
477  	struct bdev_rbd_io_channel *ch = arg;
478  	void *ret = arg;
479  	int rc;
480  
481  	rc = bdev_rados_context_init(ch->disk->user_id, ch->disk->pool_name,
482  				     (const char *const *)ch->disk->config,
483  				     &ch->cluster, &ch->io_ctx);
484  	if (rc < 0) {
485  		SPDK_ERRLOG("Failed to create rados context for user_id %s and rbd_pool=%s\n",
486  			    ch->disk->user_id ? ch->disk->user_id : "admin (the default)", ch->disk->pool_name);
487  		ret = NULL;
488  		goto end;
489  	}
490  
491  	if (rbd_open(ch->io_ctx, ch->disk->rbd_name, &ch->image, NULL) < 0) {
492  		SPDK_ERRLOG("Failed to open specified rbd device\n");
493  		ret = NULL;
494  	}
495  
496  end:
497  	return ret;
498  }
499  
500  static int
501  bdev_rbd_create_cb(void *io_device, void *ctx_buf)
502  {
503  	struct bdev_rbd_io_channel *ch = ctx_buf;
504  	int ret;
505  	struct epoll_event event;
506  
507  	ch->disk = io_device;
508  	ch->image = NULL;
509  	ch->io_ctx = NULL;
510  	ch->pfd = -1;
511  
512  	if (spdk_call_unaffinitized(bdev_rbd_handle, ch) == NULL) {
513  		goto err;
514  	}
515  
516  	ch->pfd = eventfd(0, EFD_NONBLOCK);
517  	if (ch->pfd < 0) {
518  		SPDK_ERRLOG("Failed to get eventfd\n");
519  		goto err;
520  	}
521  
522  	ret = rbd_set_image_notification(ch->image, ch->pfd, EVENT_TYPE_EVENTFD);
523  	if (ret < 0) {
524  		SPDK_ERRLOG("Failed to set rbd image notification\n");
525  		goto err;
526  	}
527  
528  	ch->group_ch = spdk_io_channel_get_ctx(spdk_get_io_channel(&rbd_if));
529  	assert(ch->group_ch != NULL);
530  	memset(&event, 0, sizeof(event));
531  	event.events = EPOLLIN;
532  	event.data.ptr = ch;
533  
534  	ret = epoll_ctl(ch->group_ch->epoll_fd, EPOLL_CTL_ADD, ch->pfd, &event);
535  	if (ret < 0) {
536  		SPDK_ERRLOG("Failed to add the fd of ch(%p) to the epoll group from group_ch=%p\n", ch,
537  			    ch->group_ch);
538  		goto err;
539  	}
540  
541  	return 0;
542  
543  err:
544  	bdev_rbd_free_channel(ch);
545  	return -1;
546  }
547  
548  static void
549  bdev_rbd_destroy_cb(void *io_device, void *ctx_buf)
550  {
551  	struct bdev_rbd_io_channel *io_channel = ctx_buf;
552  	int rc;
553  
554  	rc = epoll_ctl(io_channel->group_ch->epoll_fd, EPOLL_CTL_DEL,
555  		       io_channel->pfd, NULL);
556  	if (rc < 0) {
557  		SPDK_ERRLOG("Failed to remove fd on io_channel=%p from the polling group=%p\n",
558  			    io_channel, io_channel->group_ch);
559  	}
560  
561  	bdev_rbd_free_channel(io_channel);
562  }
563  
/*
 * bdev get_io_channel hook: ctx is the struct bdev_rbd, which is itself the
 * io_device the channel is requested for.
 */
static struct spdk_io_channel *
bdev_rbd_get_io_channel(void *ctx)
{
	return spdk_get_io_channel(ctx);
}
571  
572  static int
573  bdev_rbd_dump_info_json(void *ctx, struct spdk_json_write_ctx *w)
574  {
575  	struct bdev_rbd *rbd_bdev = ctx;
576  
577  	spdk_json_write_named_object_begin(w, "rbd");
578  
579  	spdk_json_write_named_string(w, "pool_name", rbd_bdev->pool_name);
580  
581  	spdk_json_write_named_string(w, "rbd_name", rbd_bdev->rbd_name);
582  
583  	if (rbd_bdev->user_id) {
584  		spdk_json_write_named_string(w, "user_id", rbd_bdev->user_id);
585  	}
586  
587  	if (rbd_bdev->config) {
588  		char **entry = rbd_bdev->config;
589  
590  		spdk_json_write_named_object_begin(w, "config");
591  		while (*entry) {
592  			spdk_json_write_named_string(w, entry[0], entry[1]);
593  			entry += 2;
594  		}
595  		spdk_json_write_object_end(w);
596  	}
597  
598  	spdk_json_write_object_end(w);
599  
600  	return 0;
601  }
602  
603  static void
604  bdev_rbd_write_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
605  {
606  	struct bdev_rbd *rbd = bdev->ctxt;
607  
608  	spdk_json_write_object_begin(w);
609  
610  	spdk_json_write_named_string(w, "method", "bdev_rbd_create");
611  
612  	spdk_json_write_named_object_begin(w, "params");
613  	spdk_json_write_named_string(w, "name", bdev->name);
614  	spdk_json_write_named_string(w, "pool_name", rbd->pool_name);
615  	spdk_json_write_named_string(w, "rbd_name", rbd->rbd_name);
616  	spdk_json_write_named_uint32(w, "block_size", bdev->blocklen);
617  	if (rbd->user_id) {
618  		spdk_json_write_named_string(w, "user_id", rbd->user_id);
619  	}
620  
621  	if (rbd->config) {
622  		char **entry = rbd->config;
623  
624  		spdk_json_write_named_object_begin(w, "config");
625  		while (*entry) {
626  			spdk_json_write_named_string(w, entry[0], entry[1]);
627  			entry += 2;
628  		}
629  		spdk_json_write_object_end(w);
630  	}
631  
632  	spdk_json_write_object_end(w);
633  
634  	spdk_json_write_object_end(w);
635  }
636  
/*
 * bdev function table installed on every RBD bdev in bdev_rbd_create()
 * (disk.fn_table); maps the generic bdev hooks to this module's functions.
 */
637  static const struct spdk_bdev_fn_table rbd_fn_table = {
638  	.destruct		= bdev_rbd_destruct,
639  	.submit_request		= bdev_rbd_submit_request,
640  	.io_type_supported	= bdev_rbd_io_type_supported,
641  	.get_io_channel		= bdev_rbd_get_io_channel,
642  	.dump_info_json		= bdev_rbd_dump_info_json,
643  	.write_config_json	= bdev_rbd_write_config_json,
644  };
645  
646  int
647  bdev_rbd_create(struct spdk_bdev **bdev, const char *name, const char *user_id,
648  		const char *pool_name,
649  		const char *const *config,
650  		const char *rbd_name,
651  		uint32_t block_size)
652  {
653  	struct bdev_rbd *rbd;
654  	int ret;
655  
656  	if ((pool_name == NULL) || (rbd_name == NULL)) {
657  		return -EINVAL;
658  	}
659  
660  	rbd = calloc(1, sizeof(struct bdev_rbd));
661  	if (rbd == NULL) {
662  		SPDK_ERRLOG("Failed to allocate bdev_rbd struct\n");
663  		return -ENOMEM;
664  	}
665  
666  	rbd->rbd_name = strdup(rbd_name);
667  	if (!rbd->rbd_name) {
668  		bdev_rbd_free(rbd);
669  		return -ENOMEM;
670  	}
671  
672  	if (user_id) {
673  		rbd->user_id = strdup(user_id);
674  		if (!rbd->user_id) {
675  			bdev_rbd_free(rbd);
676  			return -ENOMEM;
677  		}
678  	}
679  
680  	rbd->pool_name = strdup(pool_name);
681  	if (!rbd->pool_name) {
682  		bdev_rbd_free(rbd);
683  		return -ENOMEM;
684  	}
685  
686  	if (config && !(rbd->config = bdev_rbd_dup_config(config))) {
687  		bdev_rbd_free(rbd);
688  		return -ENOMEM;
689  	}
690  
691  	ret = bdev_rbd_init(rbd->user_id, rbd->pool_name,
692  			    (const char *const *)rbd->config,
693  			    rbd_name, &rbd->info);
694  	if (ret < 0) {
695  		bdev_rbd_free(rbd);
696  		SPDK_ERRLOG("Failed to init rbd device\n");
697  		return ret;
698  	}
699  
700  	if (name) {
701  		rbd->disk.name = strdup(name);
702  	} else {
703  		rbd->disk.name = spdk_sprintf_alloc("Ceph%d", bdev_rbd_count);
704  	}
705  	if (!rbd->disk.name) {
706  		bdev_rbd_free(rbd);
707  		return -ENOMEM;
708  	}
709  	rbd->disk.product_name = "Ceph Rbd Disk";
710  	bdev_rbd_count++;
711  
712  	rbd->disk.write_cache = 0;
713  	rbd->disk.blocklen = block_size;
714  	rbd->disk.blockcnt = rbd->info.size / rbd->disk.blocklen;
715  	rbd->disk.ctxt = rbd;
716  	rbd->disk.fn_table = &rbd_fn_table;
717  	rbd->disk.module = &rbd_if;
718  
719  	SPDK_NOTICELOG("Add %s rbd disk to lun\n", rbd->disk.name);
720  
721  	spdk_io_device_register(rbd, bdev_rbd_create_cb,
722  				bdev_rbd_destroy_cb,
723  				sizeof(struct bdev_rbd_io_channel),
724  				rbd_name);
725  	ret = spdk_bdev_register(&rbd->disk);
726  	if (ret) {
727  		spdk_io_device_unregister(rbd, NULL);
728  		bdev_rbd_free(rbd);
729  		return ret;
730  	}
731  
732  	*bdev = &(rbd->disk);
733  
734  	return ret;
735  }
736  
737  void
738  bdev_rbd_delete(struct spdk_bdev *bdev, spdk_delete_rbd_complete cb_fn, void *cb_arg)
739  {
740  	if (!bdev || bdev->module != &rbd_if) {
741  		cb_fn(cb_arg, -ENODEV);
742  		return;
743  	}
744  
745  	spdk_bdev_unregister(bdev, cb_fn, cb_arg);
746  }
747  
748  int
749  bdev_rbd_resize(struct spdk_bdev *bdev, const uint64_t new_size_in_mb)
750  {
751  	struct spdk_io_channel *ch;
752  	struct bdev_rbd_io_channel *rbd_io_ch;
753  	int rc;
754  	uint64_t new_size_in_byte;
755  	uint64_t current_size_in_mb;
756  
757  	if (bdev->module != &rbd_if) {
758  		return -EINVAL;
759  	}
760  
761  	current_size_in_mb = bdev->blocklen * bdev->blockcnt / (1024 * 1024);
762  	if (current_size_in_mb > new_size_in_mb) {
763  		SPDK_ERRLOG("The new bdev size must be lager than current bdev size.\n");
764  		return -EINVAL;
765  	}
766  
767  	ch = bdev_rbd_get_io_channel(bdev);
768  	rbd_io_ch = spdk_io_channel_get_ctx(ch);
769  	new_size_in_byte = new_size_in_mb * 1024 * 1024;
770  
771  	rc = rbd_resize(rbd_io_ch->image, new_size_in_byte);
772  	if (rc != 0) {
773  		SPDK_ERRLOG("failed to resize the ceph bdev.\n");
774  		return rc;
775  	}
776  
777  	rc = spdk_bdev_notify_blockcnt_change(bdev, new_size_in_byte / bdev->blocklen);
778  	if (rc != 0) {
779  		SPDK_ERRLOG("failed to notify block cnt change.\n");
780  		return rc;
781  	}
782  
783  	return rc;
784  }
785  
786  static int
787  bdev_rbd_group_poll(void *arg)
788  {
789  	struct bdev_rbd_group_channel *group_ch = arg;
790  	struct epoll_event events[MAX_EVENTS_PER_POLL];
791  	int num_events, i;
792  
793  	num_events = epoll_wait(group_ch->epoll_fd, events, MAX_EVENTS_PER_POLL, 0);
794  
795  	if (num_events <= 0) {
796  		return SPDK_POLLER_IDLE;
797  	}
798  
799  	for (i = 0; i < num_events; i++) {
800  		bdev_rbd_io_poll((struct bdev_rbd_io_channel *)events[i].data.ptr);
801  	}
802  
803  	return SPDK_POLLER_BUSY;
804  }
805  
806  static int
807  bdev_rbd_group_create_cb(void *io_device, void *ctx_buf)
808  {
809  	struct bdev_rbd_group_channel *ch = ctx_buf;
810  
811  	ch->epoll_fd = epoll_create1(0);
812  	if (ch->epoll_fd < 0) {
813  		SPDK_ERRLOG("Could not create epoll fd on io device=%p\n", io_device);
814  		return -1;
815  	}
816  
817  	ch->poller = SPDK_POLLER_REGISTER(bdev_rbd_group_poll, ch, 0);
818  
819  	return 0;
820  }
821  
822  static void
823  bdev_rbd_group_destroy_cb(void *io_device, void *ctx_buf)
824  {
825  	struct bdev_rbd_group_channel *ch = ctx_buf;
826  
827  	if (ch->epoll_fd >= 0) {
828  		close(ch->epoll_fd);
829  	}
830  
831  	spdk_poller_unregister(&ch->poller);
832  }
833  
834  static int
835  bdev_rbd_library_init(void)
836  {
837  	spdk_io_device_register(&rbd_if, bdev_rbd_group_create_cb, bdev_rbd_group_destroy_cb,
838  				sizeof(struct bdev_rbd_group_channel), "bdev_rbd_poll_groups");
839  
840  	return 0;
841  }
842  
843  static void
844  bdev_rbd_library_fini(void)
845  {
846  	spdk_io_device_unregister(&rbd_if, NULL);
847  }
848  
/* Registers "bdev_rbd" as a log component with the SPDK logging macros (NOTE(review): confirm the exact flag semantics against spdk/log.h). */
849  SPDK_LOG_REGISTER_COMPONENT(bdev_rbd)
850