1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 36 #include "bdev_rbd.h" 37 38 #include <rbd/librbd.h> 39 #include <rados/librados.h> 40 #include <sys/eventfd.h> 41 #include <sys/epoll.h> 42 43 #include "spdk/conf.h" 44 #include "spdk/env.h" 45 #include "spdk/bdev.h" 46 #include "spdk/thread.h" 47 #include "spdk/json.h" 48 #include "spdk/string.h" 49 #include "spdk/util.h" 50 51 #include "spdk/bdev_module.h" 52 #include "spdk_internal/log.h" 53 54 #define SPDK_RBD_QUEUE_DEPTH 128 55 #define MAX_EVENTS_PER_POLL 128 56 57 static int bdev_rbd_count = 0; 58 59 #define BDEV_RBD_POLL_US 50 60 61 struct bdev_rbd { 62 struct spdk_bdev disk; 63 char *rbd_name; 64 char *user_id; 65 char *pool_name; 66 char **config; 67 rbd_image_info_t info; 68 TAILQ_ENTRY(bdev_rbd) tailq; 69 struct spdk_poller *reset_timer; 70 struct spdk_bdev_io *reset_bdev_io; 71 }; 72 73 struct bdev_rbd_group_channel { 74 struct spdk_poller *poller; 75 int epoll_fd; 76 }; 77 78 struct bdev_rbd_io_channel { 79 rados_ioctx_t io_ctx; 80 rados_t cluster; 81 int pfd; 82 rbd_image_t image; 83 struct bdev_rbd *disk; 84 struct bdev_rbd_group_channel *group_ch; 85 }; 86 87 struct bdev_rbd_io { 88 size_t total_len; 89 }; 90 91 static void 92 bdev_rbd_free(struct bdev_rbd *rbd) 93 { 94 if (!rbd) { 95 return; 96 } 97 98 free(rbd->disk.name); 99 free(rbd->rbd_name); 100 free(rbd->user_id); 101 free(rbd->pool_name); 102 bdev_rbd_free_config(rbd->config); 103 free(rbd); 104 } 105 106 void 107 bdev_rbd_free_config(char **config) 108 { 109 char **entry; 110 111 if (config) { 112 for (entry = config; *entry; entry++) { 113 free(*entry); 114 } 115 free(config); 116 } 117 } 118 119 char ** 120 bdev_rbd_dup_config(const char *const *config) 121 { 122 size_t count; 123 char **copy; 124 125 if (!config) { 126 return NULL; 127 } 128 for (count = 0; config[count]; count++) {} 129 copy = calloc(count + 1, sizeof(*copy)); 130 if (!copy) { 131 return NULL; 132 } 133 for (count = 0; config[count]; count++) { 134 if (!(copy[count] = strdup(config[count]))) { 135 bdev_rbd_free_config(copy); 136 return NULL; 137 } 138 } 139 return copy; 140 } 141 142 static int 143 bdev_rados_context_init(const char *user_id, const char *rbd_pool_name, const char *const *config, 144 rados_t *cluster, rados_ioctx_t *io_ctx) 145 { 146 int ret; 147 148 ret = rados_create(cluster, user_id); 149 if (ret < 0) { 150 SPDK_ERRLOG("Failed to create rados_t struct\n"); 151 return -1; 152 } 153 154 if (config) { 155 const char *const *entry = config; 156 while (*entry) { 157 ret = rados_conf_set(*cluster, entry[0], entry[1]); 158 if (ret < 0) { 159 SPDK_ERRLOG("Failed to set %s = %s\n", entry[0], entry[1]); 160 rados_shutdown(*cluster); 161 return -1; 162 } 163 entry += 2; 164 } 165 } else { 166 ret = rados_conf_read_file(*cluster, NULL); 167 if (ret < 0) { 168 SPDK_ERRLOG("Failed to read conf file\n"); 169 rados_shutdown(*cluster); 170 return -1; 171 } 172 } 173 174 ret = rados_connect(*cluster); 175 if (ret < 0) { 176 SPDK_ERRLOG("Failed to connect to rbd_pool\n"); 177 rados_shutdown(*cluster); 178 return -1; 179 } 180 181 ret = rados_ioctx_create(*cluster, rbd_pool_name, io_ctx); 182 183 if (ret < 0) { 184 SPDK_ERRLOG("Failed to create ioctx\n"); 185 rados_shutdown(*cluster); 186 return -1; 187 } 188 189 return 0; 190 } 191 192 static int 193 bdev_rbd_init(const char *user_id, const char *rbd_pool_name, const char *const *config, 194 const char *rbd_name, rbd_image_info_t *info) 195 { 196 int ret; 197 rados_t cluster = NULL; 198 rados_ioctx_t io_ctx = NULL; 199 rbd_image_t image = NULL; 200 201 ret = bdev_rados_context_init(user_id, rbd_pool_name, config, &cluster, &io_ctx); 202 if (ret < 0) { 203 SPDK_ERRLOG("Failed to create rados context for user_id=%s and rbd_pool=%s\n", 204 user_id ? user_id : "admin (the default)", rbd_pool_name); 205 return -1; 206 } 207 208 ret = rbd_open(io_ctx, rbd_name, &image, NULL); 209 if (ret < 0) { 210 SPDK_ERRLOG("Failed to open specified rbd device\n"); 211 goto err; 212 } 213 ret = rbd_stat(image, info, sizeof(*info)); 214 rbd_close(image); 215 if (ret < 0) { 216 SPDK_ERRLOG("Failed to stat specified rbd device\n"); 217 goto err; 218 } 219 220 rados_ioctx_destroy(io_ctx); 221 return 0; 222 err: 223 rados_ioctx_destroy(io_ctx); 224 rados_shutdown(cluster); 225 return -1; 226 } 227 228 static void 229 bdev_rbd_exit(rbd_image_t image) 230 { 231 rbd_flush(image); 232 rbd_close(image); 233 } 234 235 static void 236 bdev_rbd_finish_aiocb(rbd_completion_t cb, void *arg) 237 { 238 /* Doing nothing here */ 239 } 240 241 static int 242 bdev_rbd_start_aio(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io, 243 struct iovec *iov, int iovcnt, uint64_t offset, size_t len) 244 { 245 struct bdev_rbd_io_channel *rbdio_ch = spdk_io_channel_get_ctx(ch); 246 int ret; 247 rbd_completion_t comp; 248 struct bdev_rbd_io *rbd_io; 249 rbd_image_t image = rbdio_ch->image; 250 251 ret = rbd_aio_create_completion(bdev_io, bdev_rbd_finish_aiocb, 252 &comp); 253 if (ret < 0) { 254 return -1; 255 } 256 257 if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) { 258 rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx; 259 rbd_io->total_len = len; 260 ret = rbd_aio_readv(image, iov, iovcnt, offset, comp); 261 } else if (bdev_io->type == SPDK_BDEV_IO_TYPE_WRITE) { 262 ret = rbd_aio_writev(image, iov, iovcnt, offset, comp); 263 } else if (bdev_io->type == SPDK_BDEV_IO_TYPE_FLUSH) { 264 ret = rbd_aio_flush(image, comp); 265 } 266 267 if (ret < 0) { 268 rbd_aio_release(comp); 269 return -1; 270 } 271 272 return 0; 273 } 274 275 static int bdev_rbd_library_init(void); 276 277 static void bdev_rbd_library_fini(void); 278 279 static int 280 bdev_rbd_get_ctx_size(void) 281 { 282 return sizeof(struct bdev_rbd_io); 283 } 284 285 static struct spdk_bdev_module rbd_if = { 286 .name = "rbd", 287 .module_init = bdev_rbd_library_init, 288 .module_fini = bdev_rbd_library_fini, 289 .get_ctx_size = bdev_rbd_get_ctx_size, 290 291 }; 292 SPDK_BDEV_MODULE_REGISTER(rbd, &rbd_if) 293 294 static int 295 bdev_rbd_reset_timer(void *arg) 296 { 297 struct bdev_rbd *disk = arg; 298 299 /* 300 * TODO: This should check if any I/O is still in flight before completing the reset. 301 * For now, just complete after the timer expires. 302 */ 303 spdk_bdev_io_complete(disk->reset_bdev_io, SPDK_BDEV_IO_STATUS_SUCCESS); 304 spdk_poller_unregister(&disk->reset_timer); 305 disk->reset_bdev_io = NULL; 306 307 return SPDK_POLLER_BUSY; 308 } 309 310 static int 311 bdev_rbd_reset(struct bdev_rbd *disk, struct spdk_bdev_io *bdev_io) 312 { 313 /* 314 * HACK: Since librbd doesn't provide any way to cancel outstanding aio, just kick off a 315 * timer to wait for in-flight I/O to complete. 316 */ 317 assert(disk->reset_bdev_io == NULL); 318 disk->reset_bdev_io = bdev_io; 319 disk->reset_timer = SPDK_POLLER_REGISTER(bdev_rbd_reset_timer, disk, 1 * 1000 * 1000); 320 321 return 0; 322 } 323 324 static int 325 bdev_rbd_destruct(void *ctx) 326 { 327 struct bdev_rbd *rbd = ctx; 328 329 spdk_io_device_unregister(rbd, NULL); 330 331 bdev_rbd_free(rbd); 332 return 0; 333 } 334 335 static void 336 bdev_rbd_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io, 337 bool success) 338 { 339 int ret; 340 341 if (!success) { 342 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 343 return; 344 } 345 346 ret = bdev_rbd_start_aio(ch, 347 bdev_io, 348 bdev_io->u.bdev.iovs, 349 bdev_io->u.bdev.iovcnt, 350 bdev_io->u.bdev.offset_blocks * bdev_io->bdev->blocklen, 351 bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen); 352 353 if (ret != 0) { 354 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 355 } 356 } 357 358 static int _bdev_rbd_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io) 359 { 360 switch (bdev_io->type) { 361 case SPDK_BDEV_IO_TYPE_READ: 362 spdk_bdev_io_get_buf(bdev_io, bdev_rbd_get_buf_cb, 363 bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen); 364 return 0; 365 366 case SPDK_BDEV_IO_TYPE_WRITE: 367 case SPDK_BDEV_IO_TYPE_FLUSH: 368 return bdev_rbd_start_aio(ch, 369 bdev_io, 370 bdev_io->u.bdev.iovs, 371 bdev_io->u.bdev.iovcnt, 372 bdev_io->u.bdev.offset_blocks * bdev_io->bdev->blocklen, 373 bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen); 374 375 case SPDK_BDEV_IO_TYPE_RESET: 376 return bdev_rbd_reset((struct bdev_rbd *)bdev_io->bdev->ctxt, 377 bdev_io); 378 379 default: 380 return -1; 381 } 382 return 0; 383 } 384 385 static void bdev_rbd_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io) 386 { 387 if (_bdev_rbd_submit_request(ch, bdev_io) < 0) { 388 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 389 } 390 } 391 392 static bool 393 bdev_rbd_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type) 394 { 395 switch (io_type) { 396 case SPDK_BDEV_IO_TYPE_READ: 397 case SPDK_BDEV_IO_TYPE_WRITE: 398 case SPDK_BDEV_IO_TYPE_FLUSH: 399 case SPDK_BDEV_IO_TYPE_RESET: 400 return true; 401 402 default: 403 return false; 404 } 405 } 406 407 static void 408 bdev_rbd_io_poll(struct bdev_rbd_io_channel *ch) 409 { 410 int i, io_status, rc; 411 rbd_completion_t comps[SPDK_RBD_QUEUE_DEPTH]; 412 struct spdk_bdev_io *bdev_io; 413 struct bdev_rbd_io *rbd_io; 414 enum spdk_bdev_io_status bio_status; 415 416 rc = rbd_poll_io_events(ch->image, comps, SPDK_RBD_QUEUE_DEPTH); 417 for (i = 0; i < rc; i++) { 418 bdev_io = rbd_aio_get_arg(comps[i]); 419 rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx; 420 io_status = rbd_aio_get_return_value(comps[i]); 421 bio_status = SPDK_BDEV_IO_STATUS_SUCCESS; 422 423 if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) { 424 if ((int)rbd_io->total_len != io_status) { 425 bio_status = SPDK_BDEV_IO_STATUS_FAILED; 426 } 427 } else { 428 /* For others, 0 means success */ 429 if (io_status != 0) { 430 bio_status = SPDK_BDEV_IO_STATUS_FAILED; 431 } 432 } 433 434 rbd_aio_release(comps[i]); 435 436 spdk_bdev_io_complete(bdev_io, bio_status); 437 } 438 } 439 440 static void 441 bdev_rbd_free_channel(struct bdev_rbd_io_channel *ch) 442 { 443 if (!ch) { 444 return; 445 } 446 447 if (ch->image) { 448 bdev_rbd_exit(ch->image); 449 } 450 451 if (ch->io_ctx) { 452 rados_ioctx_destroy(ch->io_ctx); 453 } 454 455 if (ch->cluster) { 456 rados_shutdown(ch->cluster); 457 } 458 459 if (ch->pfd >= 0) { 460 close(ch->pfd); 461 } 462 463 if (ch->group_ch) { 464 spdk_put_io_channel(spdk_io_channel_from_ctx(ch->group_ch)); 465 } 466 } 467 468 static void * 469 bdev_rbd_handle(void *arg) 470 { 471 struct bdev_rbd_io_channel *ch = arg; 472 void *ret = arg; 473 int rc; 474 475 rc = bdev_rados_context_init(ch->disk->user_id, ch->disk->pool_name, 476 (const char *const *)ch->disk->config, 477 &ch->cluster, &ch->io_ctx); 478 if (rc < 0) { 479 SPDK_ERRLOG("Failed to create rados context for user_id %s and rbd_pool=%s\n", 480 ch->disk->user_id ? ch->disk->user_id : "admin (the default)", ch->disk->pool_name); 481 ret = NULL; 482 goto end; 483 } 484 485 if (rbd_open(ch->io_ctx, ch->disk->rbd_name, &ch->image, NULL) < 0) { 486 SPDK_ERRLOG("Failed to open specified rbd device\n"); 487 ret = NULL; 488 } 489 490 end: 491 return ret; 492 } 493 494 static int 495 bdev_rbd_create_cb(void *io_device, void *ctx_buf) 496 { 497 struct bdev_rbd_io_channel *ch = ctx_buf; 498 int ret; 499 struct epoll_event event; 500 501 ch->disk = io_device; 502 ch->image = NULL; 503 ch->io_ctx = NULL; 504 ch->pfd = -1; 505 506 if (spdk_call_unaffinitized(bdev_rbd_handle, ch) == NULL) { 507 goto err; 508 } 509 510 ch->pfd = eventfd(0, EFD_NONBLOCK); 511 if (ch->pfd < 0) { 512 SPDK_ERRLOG("Failed to get eventfd\n"); 513 goto err; 514 } 515 516 ret = rbd_set_image_notification(ch->image, ch->pfd, EVENT_TYPE_EVENTFD); 517 if (ret < 0) { 518 SPDK_ERRLOG("Failed to set rbd image notification\n"); 519 goto err; 520 } 521 522 ch->group_ch = spdk_io_channel_get_ctx(spdk_get_io_channel(&rbd_if)); 523 assert(ch->group_ch != NULL); 524 memset(&event, 0, sizeof(event)); 525 event.events = EPOLLIN; 526 event.data.ptr = ch; 527 528 ret = epoll_ctl(ch->group_ch->epoll_fd, EPOLL_CTL_ADD, ch->pfd, &event); 529 if (ret < 0) { 530 SPDK_ERRLOG("Failed to add the fd of ch(%p) to the epoll group from group_ch=%p\n", ch, 531 ch->group_ch); 532 goto err; 533 } 534 535 return 0; 536 537 err: 538 bdev_rbd_free_channel(ch); 539 return -1; 540 } 541 542 static void 543 bdev_rbd_destroy_cb(void *io_device, void *ctx_buf) 544 { 545 struct bdev_rbd_io_channel *io_channel = ctx_buf; 546 int rc; 547 548 rc = epoll_ctl(io_channel->group_ch->epoll_fd, EPOLL_CTL_DEL, 549 io_channel->pfd, NULL); 550 if (rc < 0) { 551 SPDK_ERRLOG("Failed to remove fd on io_channel=%p from the polling group=%p\n", 552 io_channel, io_channel->group_ch); 553 } 554 555 bdev_rbd_free_channel(io_channel); 556 } 557 558 static struct spdk_io_channel * 559 bdev_rbd_get_io_channel(void *ctx) 560 { 561 struct bdev_rbd *rbd_bdev = ctx; 562 563 return spdk_get_io_channel(rbd_bdev); 564 } 565 566 static int 567 bdev_rbd_dump_info_json(void *ctx, struct spdk_json_write_ctx *w) 568 { 569 struct bdev_rbd *rbd_bdev = ctx; 570 571 spdk_json_write_named_object_begin(w, "rbd"); 572 573 spdk_json_write_named_string(w, "pool_name", rbd_bdev->pool_name); 574 575 spdk_json_write_named_string(w, "rbd_name", rbd_bdev->rbd_name); 576 577 if (rbd_bdev->user_id) { 578 spdk_json_write_named_string(w, "user_id", rbd_bdev->user_id); 579 } 580 581 if (rbd_bdev->config) { 582 char **entry = rbd_bdev->config; 583 584 spdk_json_write_named_object_begin(w, "config"); 585 while (*entry) { 586 spdk_json_write_named_string(w, entry[0], entry[1]); 587 entry += 2; 588 } 589 spdk_json_write_object_end(w); 590 } 591 592 spdk_json_write_object_end(w); 593 594 return 0; 595 } 596 597 static void 598 bdev_rbd_write_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w) 599 { 600 struct bdev_rbd *rbd = bdev->ctxt; 601 602 spdk_json_write_object_begin(w); 603 604 spdk_json_write_named_string(w, "method", "bdev_rbd_create"); 605 606 spdk_json_write_named_object_begin(w, "params"); 607 spdk_json_write_named_string(w, "name", bdev->name); 608 spdk_json_write_named_string(w, "pool_name", rbd->pool_name); 609 spdk_json_write_named_string(w, "rbd_name", rbd->rbd_name); 610 spdk_json_write_named_uint32(w, "block_size", bdev->blocklen); 611 if (rbd->user_id) { 612 spdk_json_write_named_string(w, "user_id", rbd->user_id); 613 } 614 615 if (rbd->config) { 616 char **entry = rbd->config; 617 618 spdk_json_write_named_object_begin(w, "config"); 619 while (*entry) { 620 spdk_json_write_named_string(w, entry[0], entry[1]); 621 entry += 2; 622 } 623 spdk_json_write_object_end(w); 624 } 625 626 spdk_json_write_object_end(w); 627 628 spdk_json_write_object_end(w); 629 } 630 631 static const struct spdk_bdev_fn_table rbd_fn_table = { 632 .destruct = bdev_rbd_destruct, 633 .submit_request = bdev_rbd_submit_request, 634 .io_type_supported = bdev_rbd_io_type_supported, 635 .get_io_channel = bdev_rbd_get_io_channel, 636 .dump_info_json = bdev_rbd_dump_info_json, 637 .write_config_json = bdev_rbd_write_config_json, 638 }; 639 640 int 641 bdev_rbd_create(struct spdk_bdev **bdev, const char *name, const char *user_id, 642 const char *pool_name, 643 const char *const *config, 644 const char *rbd_name, 645 uint32_t block_size) 646 { 647 struct bdev_rbd *rbd; 648 int ret; 649 650 if ((pool_name == NULL) || (rbd_name == NULL)) { 651 return -EINVAL; 652 } 653 654 rbd = calloc(1, sizeof(struct bdev_rbd)); 655 if (rbd == NULL) { 656 SPDK_ERRLOG("Failed to allocate bdev_rbd struct\n"); 657 return -ENOMEM; 658 } 659 660 rbd->rbd_name = strdup(rbd_name); 661 if (!rbd->rbd_name) { 662 bdev_rbd_free(rbd); 663 return -ENOMEM; 664 } 665 666 if (user_id) { 667 rbd->user_id = strdup(user_id); 668 if (!rbd->user_id) { 669 bdev_rbd_free(rbd); 670 return -ENOMEM; 671 } 672 } 673 674 rbd->pool_name = strdup(pool_name); 675 if (!rbd->pool_name) { 676 bdev_rbd_free(rbd); 677 return -ENOMEM; 678 } 679 680 if (config && !(rbd->config = bdev_rbd_dup_config(config))) { 681 bdev_rbd_free(rbd); 682 return -ENOMEM; 683 } 684 685 ret = bdev_rbd_init(rbd->user_id, rbd->pool_name, 686 (const char *const *)rbd->config, 687 rbd_name, &rbd->info); 688 if (ret < 0) { 689 bdev_rbd_free(rbd); 690 SPDK_ERRLOG("Failed to init rbd device\n"); 691 return ret; 692 } 693 694 if (name) { 695 rbd->disk.name = strdup(name); 696 } else { 697 rbd->disk.name = spdk_sprintf_alloc("Ceph%d", bdev_rbd_count); 698 } 699 if (!rbd->disk.name) { 700 bdev_rbd_free(rbd); 701 return -ENOMEM; 702 } 703 rbd->disk.product_name = "Ceph Rbd Disk"; 704 bdev_rbd_count++; 705 706 rbd->disk.write_cache = 0; 707 rbd->disk.blocklen = block_size; 708 rbd->disk.blockcnt = rbd->info.size / rbd->disk.blocklen; 709 rbd->disk.ctxt = rbd; 710 rbd->disk.fn_table = &rbd_fn_table; 711 rbd->disk.module = &rbd_if; 712 713 SPDK_NOTICELOG("Add %s rbd disk to lun\n", rbd->disk.name); 714 715 spdk_io_device_register(rbd, bdev_rbd_create_cb, 716 bdev_rbd_destroy_cb, 717 sizeof(struct bdev_rbd_io_channel), 718 rbd_name); 719 ret = spdk_bdev_register(&rbd->disk); 720 if (ret) { 721 spdk_io_device_unregister(rbd, NULL); 722 bdev_rbd_free(rbd); 723 return ret; 724 } 725 726 *bdev = &(rbd->disk); 727 728 return ret; 729 } 730 731 void 732 bdev_rbd_delete(struct spdk_bdev *bdev, spdk_delete_rbd_complete cb_fn, void *cb_arg) 733 { 734 if (!bdev || bdev->module != &rbd_if) { 735 cb_fn(cb_arg, -ENODEV); 736 return; 737 } 738 739 spdk_bdev_unregister(bdev, cb_fn, cb_arg); 740 } 741 742 int 743 bdev_rbd_resize(struct spdk_bdev *bdev, const uint64_t new_size_in_mb) 744 { 745 struct spdk_io_channel *ch; 746 struct bdev_rbd_io_channel *rbd_io_ch; 747 int rc; 748 uint64_t new_size_in_byte; 749 uint64_t current_size_in_mb; 750 751 if (bdev->module != &rbd_if) { 752 return -EINVAL; 753 } 754 755 current_size_in_mb = bdev->blocklen * bdev->blockcnt / (1024 * 1024); 756 if (current_size_in_mb > new_size_in_mb) { 757 SPDK_ERRLOG("The new bdev size must be lager than current bdev size.\n"); 758 return -EINVAL; 759 } 760 761 ch = bdev_rbd_get_io_channel(bdev); 762 rbd_io_ch = spdk_io_channel_get_ctx(ch); 763 new_size_in_byte = new_size_in_mb * 1024 * 1024; 764 765 rc = rbd_resize(rbd_io_ch->image, new_size_in_byte); 766 if (rc != 0) { 767 SPDK_ERRLOG("failed to resize the ceph bdev.\n"); 768 return rc; 769 } 770 771 rc = spdk_bdev_notify_blockcnt_change(bdev, new_size_in_byte / bdev->blocklen); 772 if (rc != 0) { 773 SPDK_ERRLOG("failed to notify block cnt change.\n"); 774 return rc; 775 } 776 777 return rc; 778 } 779 780 static int 781 bdev_rbd_group_poll(void *arg) 782 { 783 struct bdev_rbd_group_channel *group_ch = arg; 784 struct epoll_event events[MAX_EVENTS_PER_POLL]; 785 int num_events, i; 786 787 num_events = epoll_wait(group_ch->epoll_fd, events, MAX_EVENTS_PER_POLL, 0); 788 789 if (num_events <= 0) { 790 return SPDK_POLLER_IDLE; 791 } 792 793 for (i = 0; i < num_events; i++) { 794 bdev_rbd_io_poll((struct bdev_rbd_io_channel *)events[i].data.ptr); 795 } 796 797 return SPDK_POLLER_BUSY; 798 } 799 800 static int 801 bdev_rbd_group_create_cb(void *io_device, void *ctx_buf) 802 { 803 struct bdev_rbd_group_channel *ch = ctx_buf; 804 805 ch->epoll_fd = epoll_create1(0); 806 if (ch->epoll_fd < 0) { 807 SPDK_ERRLOG("Could not create epoll fd on io device=%p\n", io_device); 808 return -1; 809 } 810 811 ch->poller = SPDK_POLLER_REGISTER(bdev_rbd_group_poll, ch, BDEV_RBD_POLL_US); 812 813 return 0; 814 } 815 816 static void 817 bdev_rbd_group_destroy_cb(void *io_device, void *ctx_buf) 818 { 819 struct bdev_rbd_group_channel *ch = ctx_buf; 820 821 if (ch->epoll_fd >= 0) { 822 close(ch->epoll_fd); 823 } 824 825 spdk_poller_unregister(&ch->poller); 826 } 827 828 static int 829 bdev_rbd_library_init(void) 830 { 831 int i, rc = 0; 832 const char *val; 833 const char *pool_name; 834 const char *rbd_name; 835 struct spdk_bdev *bdev; 836 uint32_t block_size; 837 long int tmp; 838 struct spdk_conf_section *sp; 839 840 spdk_io_device_register(&rbd_if, bdev_rbd_group_create_cb, bdev_rbd_group_destroy_cb, 841 sizeof(struct bdev_rbd_group_channel), 842 "bdev_rbd_poll_groups"); 843 844 sp = spdk_conf_find_section(NULL, "Ceph"); 845 if (sp == NULL) { 846 /* 847 * Ceph section not found. Do not initialize any rbd LUNS. 848 */ 849 goto end; 850 } 851 852 /* Init rbd block devices */ 853 for (i = 0; ; i++) { 854 val = spdk_conf_section_get_nval(sp, "Ceph", i); 855 if (val == NULL) { 856 break; 857 } 858 859 /* get the Rbd_pool name */ 860 pool_name = spdk_conf_section_get_nmval(sp, "Ceph", i, 0); 861 if (pool_name == NULL) { 862 SPDK_ERRLOG("Ceph%d: rbd pool name needs to be provided\n", i); 863 rc = -1; 864 goto end; 865 } 866 867 rbd_name = spdk_conf_section_get_nmval(sp, "Ceph", i, 1); 868 if (rbd_name == NULL) { 869 SPDK_ERRLOG("Ceph%d: format error\n", i); 870 rc = -1; 871 goto end; 872 } 873 874 val = spdk_conf_section_get_nmval(sp, "Ceph", i, 2); 875 876 if (val == NULL) { 877 block_size = 512; /* default value */ 878 } else { 879 tmp = spdk_strtol(val, 10); 880 if (tmp <= 0) { 881 SPDK_ERRLOG("Invalid block size\n"); 882 rc = -1; 883 goto end; 884 } else if (tmp & 0x1ff) { 885 SPDK_ERRLOG("current block_size = %ld, it should be multiple of 512\n", 886 tmp); 887 rc = -1; 888 goto end; 889 } 890 block_size = (uint32_t)tmp; 891 } 892 893 /* TODO(?): user_id and rbd config values */ 894 rc = bdev_rbd_create(&bdev, NULL, NULL, pool_name, NULL, rbd_name, block_size); 895 if (rc) { 896 goto end; 897 } 898 } 899 900 end: 901 return rc; 902 } 903 904 static void 905 bdev_rbd_library_fini(void) 906 { 907 spdk_io_device_unregister(&rbd_if, NULL); 908 } 909 910 SPDK_LOG_REGISTER_COMPONENT("bdev_rbd", SPDK_LOG_BDEV_RBD) 911