1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 36 #include "bdev_rbd.h" 37 38 #include <rbd/librbd.h> 39 #include <rados/librados.h> 40 #include <sys/eventfd.h> 41 42 #include "spdk/conf.h" 43 #include "spdk/env.h" 44 #include "spdk/bdev.h" 45 #include "spdk/thread.h" 46 #include "spdk/json.h" 47 #include "spdk/string.h" 48 #include "spdk/util.h" 49 50 #include "spdk/bdev_module.h" 51 #include "spdk_internal/log.h" 52 53 #define SPDK_RBD_QUEUE_DEPTH 128 54 55 static int bdev_rbd_count = 0; 56 57 #define BDEV_RBD_POLL_US 50 58 59 struct bdev_rbd { 60 struct spdk_bdev disk; 61 char *rbd_name; 62 char *user_id; 63 char *pool_name; 64 char **config; 65 rbd_image_info_t info; 66 TAILQ_ENTRY(bdev_rbd) tailq; 67 struct spdk_poller *reset_timer; 68 struct spdk_bdev_io *reset_bdev_io; 69 }; 70 71 struct bdev_rbd_io_channel { 72 rados_ioctx_t io_ctx; 73 rados_t cluster; 74 struct pollfd pfd; 75 rbd_image_t image; 76 struct bdev_rbd *disk; 77 struct spdk_poller *poller; 78 }; 79 80 struct bdev_rbd_io { 81 uint64_t remaining_len; 82 int num_segments; 83 bool failed; 84 }; 85 86 static void 87 bdev_rbd_free(struct bdev_rbd *rbd) 88 { 89 if (!rbd) { 90 return; 91 } 92 93 free(rbd->disk.name); 94 free(rbd->rbd_name); 95 free(rbd->user_id); 96 free(rbd->pool_name); 97 spdk_bdev_rbd_free_config(rbd->config); 98 free(rbd); 99 } 100 101 void 102 spdk_bdev_rbd_free_config(char **config) 103 { 104 char **entry; 105 106 if (config) { 107 for (entry = config; *entry; entry++) { 108 free(*entry); 109 } 110 free(config); 111 } 112 } 113 114 char ** 115 spdk_bdev_rbd_dup_config(const char *const *config) 116 { 117 size_t count; 118 char **copy; 119 120 if (!config) { 121 return NULL; 122 } 123 for (count = 0; config[count]; count++) {} 124 copy = calloc(count + 1, sizeof(*copy)); 125 if (!copy) { 126 return NULL; 127 } 128 for (count = 0; config[count]; count++) { 129 if (!(copy[count] = strdup(config[count]))) { 130 spdk_bdev_rbd_free_config(copy); 131 return NULL; 132 } 133 } 134 return copy; 135 } 136 137 static int 138 bdev_rados_context_init(const char *user_id, const char *rbd_pool_name, const char *const *config, 139 rados_t *cluster, rados_ioctx_t *io_ctx) 140 { 141 int ret; 142 143 ret = rados_create(cluster, user_id); 144 if (ret < 0) { 145 SPDK_ERRLOG("Failed to create rados_t struct\n"); 146 return -1; 147 } 148 149 if (config) { 150 const char *const *entry = config; 151 while (*entry) { 152 ret = rados_conf_set(*cluster, entry[0], entry[1]); 153 if (ret < 0) { 154 SPDK_ERRLOG("Failed to set %s = %s\n", entry[0], entry[1]); 155 rados_shutdown(*cluster); 156 return -1; 157 } 158 entry += 2; 159 } 160 } else { 161 ret = rados_conf_read_file(*cluster, NULL); 162 if (ret < 0) { 163 SPDK_ERRLOG("Failed to read conf file\n"); 164 rados_shutdown(*cluster); 165 return -1; 166 } 167 } 168 169 ret = rados_connect(*cluster); 170 if (ret < 0) { 171 SPDK_ERRLOG("Failed to connect to rbd_pool\n"); 172 rados_shutdown(*cluster); 173 return -1; 174 } 175 176 ret = rados_ioctx_create(*cluster, rbd_pool_name, io_ctx); 177 178 if (ret < 0) { 179 SPDK_ERRLOG("Failed to create ioctx\n"); 180 rados_shutdown(*cluster); 181 return -1; 182 } 183 184 return 0; 185 } 186 187 static int 188 bdev_rbd_init(const char *user_id, const char *rbd_pool_name, const char *const *config, 189 const char *rbd_name, rbd_image_info_t *info) 190 { 191 int ret; 192 rados_t cluster = NULL; 193 rados_ioctx_t io_ctx = NULL; 194 rbd_image_t image = NULL; 195 196 ret = bdev_rados_context_init(user_id, rbd_pool_name, config, &cluster, &io_ctx); 197 if (ret < 0) { 198 SPDK_ERRLOG("Failed to create rados context for user_id=%s and rbd_pool=%s\n", 199 user_id ? user_id : "admin (the default)", rbd_pool_name); 200 return -1; 201 } 202 203 ret = rbd_open(io_ctx, rbd_name, &image, NULL); 204 if (ret < 0) { 205 SPDK_ERRLOG("Failed to open specified rbd device\n"); 206 goto err; 207 } 208 ret = rbd_stat(image, info, sizeof(*info)); 209 rbd_close(image); 210 if (ret < 0) { 211 SPDK_ERRLOG("Failed to stat specified rbd device\n"); 212 goto err; 213 } 214 215 rados_ioctx_destroy(io_ctx); 216 return 0; 217 err: 218 rados_ioctx_destroy(io_ctx); 219 rados_shutdown(cluster); 220 return -1; 221 } 222 223 static void 224 bdev_rbd_exit(rbd_image_t image) 225 { 226 rbd_flush(image); 227 rbd_close(image); 228 } 229 230 static void 231 bdev_rbd_finish_aiocb(rbd_completion_t cb, void *arg) 232 { 233 /* Doing nothing here */ 234 } 235 236 static int 237 bdev_rbd_start_aio(rbd_image_t image, struct spdk_bdev_io *bdev_io, 238 void *buf, uint64_t offset, size_t len) 239 { 240 int ret; 241 rbd_completion_t comp; 242 243 ret = rbd_aio_create_completion(bdev_io, bdev_rbd_finish_aiocb, 244 &comp); 245 if (ret < 0) { 246 return -1; 247 } 248 249 if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) { 250 ret = rbd_aio_read(image, offset, len, 251 buf, comp); 252 } else if (bdev_io->type == SPDK_BDEV_IO_TYPE_WRITE) { 253 ret = rbd_aio_write(image, offset, len, 254 buf, comp); 255 } else if (bdev_io->type == SPDK_BDEV_IO_TYPE_FLUSH) { 256 ret = rbd_aio_flush(image, comp); 257 } 258 259 if (ret < 0) { 260 rbd_aio_release(comp); 261 return -1; 262 } 263 264 return 0; 265 } 266 267 static int bdev_rbd_library_init(void); 268 269 static int 270 bdev_rbd_get_ctx_size(void) 271 { 272 return sizeof(struct bdev_rbd_io); 273 } 274 275 static struct spdk_bdev_module rbd_if = { 276 .name = "rbd", 277 .module_init = bdev_rbd_library_init, 278 .get_ctx_size = bdev_rbd_get_ctx_size, 279 280 }; 281 SPDK_BDEV_MODULE_REGISTER(rbd, &rbd_if) 282 283 static int64_t 284 bdev_rbd_rw(struct bdev_rbd *disk, struct spdk_io_channel *ch, 285 struct spdk_bdev_io *bdev_io, struct iovec *iov, 286 int iovcnt, size_t len, uint64_t offset) 287 { 288 struct bdev_rbd_io *rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx; 289 struct bdev_rbd_io_channel *rbdio_ch = spdk_io_channel_get_ctx(ch); 290 size_t remaining = len; 291 int i, rc; 292 293 rbd_io->remaining_len = 0; 294 rbd_io->num_segments = 0; 295 rbd_io->failed = false; 296 297 for (i = 0; i < iovcnt && remaining > 0; i++) { 298 size_t seg_len = spdk_min(remaining, iov[i].iov_len); 299 300 rc = bdev_rbd_start_aio(rbdio_ch->image, bdev_io, iov[i].iov_base, offset, seg_len); 301 if (rc) { 302 /* 303 * This bdev_rbd_start_aio() call failed, but if any previous ones were 304 * submitted, we need to wait for them to finish. 305 */ 306 if (rbd_io->num_segments == 0) { 307 /* No previous I/O submitted - return error code immediately. */ 308 return rc; 309 } 310 311 /* Return and wait for outstanding I/O to complete. */ 312 rbd_io->failed = true; 313 return 0; 314 } 315 316 rbd_io->num_segments++; 317 rbd_io->remaining_len += seg_len; 318 319 offset += seg_len; 320 remaining -= seg_len; 321 } 322 323 return 0; 324 } 325 326 static int64_t 327 bdev_rbd_flush(struct bdev_rbd *disk, struct spdk_io_channel *ch, 328 struct spdk_bdev_io *bdev_io, uint64_t offset, uint64_t nbytes) 329 { 330 struct bdev_rbd_io_channel *rbdio_ch = spdk_io_channel_get_ctx(ch); 331 332 return bdev_rbd_start_aio(rbdio_ch->image, bdev_io, NULL, offset, nbytes); 333 } 334 335 static int 336 bdev_rbd_reset_timer(void *arg) 337 { 338 struct bdev_rbd *disk = arg; 339 340 /* 341 * TODO: This should check if any I/O is still in flight before completing the reset. 342 * For now, just complete after the timer expires. 343 */ 344 spdk_bdev_io_complete(disk->reset_bdev_io, SPDK_BDEV_IO_STATUS_SUCCESS); 345 spdk_poller_unregister(&disk->reset_timer); 346 disk->reset_bdev_io = NULL; 347 348 return -1; 349 } 350 351 static int 352 bdev_rbd_reset(struct bdev_rbd *disk, struct spdk_bdev_io *bdev_io) 353 { 354 /* 355 * HACK: Since librbd doesn't provide any way to cancel outstanding aio, just kick off a 356 * timer to wait for in-flight I/O to complete. 357 */ 358 assert(disk->reset_bdev_io == NULL); 359 disk->reset_bdev_io = bdev_io; 360 disk->reset_timer = spdk_poller_register(bdev_rbd_reset_timer, disk, 1 * 1000 * 1000); 361 362 return 0; 363 } 364 365 static int 366 bdev_rbd_destruct(void *ctx) 367 { 368 struct bdev_rbd *rbd = ctx; 369 370 spdk_io_device_unregister(rbd, NULL); 371 372 bdev_rbd_free(rbd); 373 return 0; 374 } 375 376 static void 377 bdev_rbd_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io, 378 bool success) 379 { 380 int ret; 381 382 if (!success) { 383 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 384 return; 385 } 386 387 ret = bdev_rbd_rw(bdev_io->bdev->ctxt, 388 ch, 389 bdev_io, 390 bdev_io->u.bdev.iovs, 391 bdev_io->u.bdev.iovcnt, 392 bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen, 393 bdev_io->u.bdev.offset_blocks * bdev_io->bdev->blocklen); 394 395 if (ret != 0) { 396 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 397 } 398 } 399 400 static int _bdev_rbd_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io) 401 { 402 switch (bdev_io->type) { 403 case SPDK_BDEV_IO_TYPE_READ: 404 spdk_bdev_io_get_buf(bdev_io, bdev_rbd_get_buf_cb, 405 bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen); 406 return 0; 407 408 case SPDK_BDEV_IO_TYPE_WRITE: 409 return bdev_rbd_rw((struct bdev_rbd *)bdev_io->bdev->ctxt, 410 ch, 411 bdev_io, 412 bdev_io->u.bdev.iovs, 413 bdev_io->u.bdev.iovcnt, 414 bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen, 415 bdev_io->u.bdev.offset_blocks * bdev_io->bdev->blocklen); 416 417 case SPDK_BDEV_IO_TYPE_FLUSH: 418 return bdev_rbd_flush((struct bdev_rbd *)bdev_io->bdev->ctxt, 419 ch, 420 bdev_io, 421 bdev_io->u.bdev.offset_blocks * bdev_io->bdev->blocklen, 422 bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen); 423 424 case SPDK_BDEV_IO_TYPE_RESET: 425 return bdev_rbd_reset((struct bdev_rbd *)bdev_io->bdev->ctxt, 426 bdev_io); 427 428 default: 429 return -1; 430 } 431 return 0; 432 } 433 434 static void bdev_rbd_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io) 435 { 436 if (_bdev_rbd_submit_request(ch, bdev_io) < 0) { 437 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 438 } 439 } 440 441 static bool 442 bdev_rbd_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type) 443 { 444 switch (io_type) { 445 case SPDK_BDEV_IO_TYPE_READ: 446 case SPDK_BDEV_IO_TYPE_WRITE: 447 case SPDK_BDEV_IO_TYPE_FLUSH: 448 case SPDK_BDEV_IO_TYPE_RESET: 449 return true; 450 451 default: 452 return false; 453 } 454 } 455 456 static int 457 bdev_rbd_io_poll(void *arg) 458 { 459 struct bdev_rbd_io_channel *ch = arg; 460 int i, io_status, rc; 461 rbd_completion_t comps[SPDK_RBD_QUEUE_DEPTH]; 462 struct spdk_bdev_io *bdev_io; 463 struct bdev_rbd_io *rbd_io; 464 465 rc = poll(&ch->pfd, 1, 0); 466 467 /* check the return value of poll since we have only one fd for each channel */ 468 if (rc != 1) { 469 return 0; 470 } 471 472 rc = rbd_poll_io_events(ch->image, comps, SPDK_RBD_QUEUE_DEPTH); 473 for (i = 0; i < rc; i++) { 474 bdev_io = rbd_aio_get_arg(comps[i]); 475 rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx; 476 io_status = rbd_aio_get_return_value(comps[i]); 477 478 assert(rbd_io->num_segments > 0); 479 rbd_io->num_segments--; 480 481 if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) { 482 if (io_status > 0) { 483 /* For reads, io_status is the length */ 484 rbd_io->remaining_len -= io_status; 485 } 486 487 if (rbd_io->num_segments == 0 && rbd_io->remaining_len != 0) { 488 rbd_io->failed = true; 489 } 490 } else { 491 /* For others, 0 means success */ 492 if (io_status != 0) { 493 rbd_io->failed = true; 494 } 495 } 496 497 rbd_aio_release(comps[i]); 498 499 if (rbd_io->num_segments == 0) { 500 spdk_bdev_io_complete(bdev_io, 501 rbd_io->failed ? SPDK_BDEV_IO_STATUS_FAILED : SPDK_BDEV_IO_STATUS_SUCCESS); 502 } 503 } 504 505 return rc; 506 } 507 508 static void 509 bdev_rbd_free_channel(struct bdev_rbd_io_channel *ch) 510 { 511 if (!ch) { 512 return; 513 } 514 515 if (ch->image) { 516 bdev_rbd_exit(ch->image); 517 } 518 519 if (ch->io_ctx) { 520 rados_ioctx_destroy(ch->io_ctx); 521 } 522 523 if (ch->cluster) { 524 rados_shutdown(ch->cluster); 525 } 526 527 if (ch->pfd.fd >= 0) { 528 close(ch->pfd.fd); 529 } 530 } 531 532 static void * 533 bdev_rbd_handle(void *arg) 534 { 535 struct bdev_rbd_io_channel *ch = arg; 536 void *ret = arg; 537 538 if (rbd_open(ch->io_ctx, ch->disk->rbd_name, &ch->image, NULL) < 0) { 539 SPDK_ERRLOG("Failed to open specified rbd device\n"); 540 ret = NULL; 541 } 542 543 return ret; 544 } 545 546 static int 547 bdev_rbd_create_cb(void *io_device, void *ctx_buf) 548 { 549 struct bdev_rbd_io_channel *ch = ctx_buf; 550 int ret; 551 552 ch->disk = io_device; 553 ch->image = NULL; 554 ch->io_ctx = NULL; 555 ch->pfd.fd = -1; 556 557 ret = bdev_rados_context_init(ch->disk->user_id, ch->disk->pool_name, 558 (const char *const *)ch->disk->config, 559 &ch->cluster, &ch->io_ctx); 560 if (ret < 0) { 561 SPDK_ERRLOG("Failed to create rados context for user_id %s and rbd_pool=%s\n", 562 ch->disk->user_id ? ch->disk->user_id : "admin (the default)", ch->disk->pool_name); 563 goto err; 564 } 565 566 if (spdk_call_unaffinitized(bdev_rbd_handle, ch) == NULL) { 567 goto err; 568 } 569 570 ch->pfd.fd = eventfd(0, EFD_NONBLOCK); 571 if (ch->pfd.fd < 0) { 572 SPDK_ERRLOG("Failed to get eventfd\n"); 573 goto err; 574 } 575 576 ch->pfd.events = POLLIN; 577 ret = rbd_set_image_notification(ch->image, ch->pfd.fd, EVENT_TYPE_EVENTFD); 578 if (ret < 0) { 579 SPDK_ERRLOG("Failed to set rbd image notification\n"); 580 goto err; 581 } 582 583 ch->poller = spdk_poller_register(bdev_rbd_io_poll, ch, BDEV_RBD_POLL_US); 584 585 return 0; 586 587 err: 588 bdev_rbd_free_channel(ch); 589 return -1; 590 } 591 592 static void 593 bdev_rbd_destroy_cb(void *io_device, void *ctx_buf) 594 { 595 struct bdev_rbd_io_channel *io_channel = ctx_buf; 596 597 bdev_rbd_free_channel(io_channel); 598 599 spdk_poller_unregister(&io_channel->poller); 600 } 601 602 static struct spdk_io_channel * 603 bdev_rbd_get_io_channel(void *ctx) 604 { 605 struct bdev_rbd *rbd_bdev = ctx; 606 607 return spdk_get_io_channel(rbd_bdev); 608 } 609 610 static int 611 bdev_rbd_dump_info_json(void *ctx, struct spdk_json_write_ctx *w) 612 { 613 struct bdev_rbd *rbd_bdev = ctx; 614 615 spdk_json_write_named_object_begin(w, "rbd"); 616 617 spdk_json_write_named_string(w, "pool_name", rbd_bdev->pool_name); 618 619 spdk_json_write_named_string(w, "rbd_name", rbd_bdev->rbd_name); 620 621 if (rbd_bdev->user_id) { 622 spdk_json_write_named_string(w, "user_id", rbd_bdev->user_id); 623 } 624 625 if (rbd_bdev->config) { 626 char **entry = rbd_bdev->config; 627 628 spdk_json_write_named_object_begin(w, "config"); 629 while (*entry) { 630 spdk_json_write_named_string(w, entry[0], entry[1]); 631 entry += 2; 632 } 633 spdk_json_write_object_end(w); 634 } 635 636 spdk_json_write_object_end(w); 637 638 return 0; 639 } 640 641 static void 642 bdev_rbd_write_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w) 643 { 644 struct bdev_rbd *rbd = bdev->ctxt; 645 646 spdk_json_write_object_begin(w); 647 648 spdk_json_write_named_string(w, "method", "bdev_rbd_create"); 649 650 spdk_json_write_named_object_begin(w, "params"); 651 spdk_json_write_named_string(w, "name", bdev->name); 652 spdk_json_write_named_string(w, "pool_name", rbd->pool_name); 653 spdk_json_write_named_string(w, "rbd_name", rbd->rbd_name); 654 spdk_json_write_named_uint32(w, "block_size", bdev->blocklen); 655 if (rbd->user_id) { 656 spdk_json_write_named_string(w, "user_id", rbd->user_id); 657 } 658 659 if (rbd->config) { 660 char **entry = rbd->config; 661 662 spdk_json_write_named_object_begin(w, "config"); 663 while (*entry) { 664 spdk_json_write_named_string(w, entry[0], entry[1]); 665 entry += 2; 666 } 667 spdk_json_write_object_end(w); 668 } 669 670 spdk_json_write_object_end(w); 671 672 spdk_json_write_object_end(w); 673 } 674 675 static const struct spdk_bdev_fn_table rbd_fn_table = { 676 .destruct = bdev_rbd_destruct, 677 .submit_request = bdev_rbd_submit_request, 678 .io_type_supported = bdev_rbd_io_type_supported, 679 .get_io_channel = bdev_rbd_get_io_channel, 680 .dump_info_json = bdev_rbd_dump_info_json, 681 .write_config_json = bdev_rbd_write_config_json, 682 }; 683 684 int 685 spdk_bdev_rbd_create(struct spdk_bdev **bdev, const char *name, const char *user_id, 686 const char *pool_name, 687 const char *const *config, 688 const char *rbd_name, 689 uint32_t block_size) 690 { 691 struct bdev_rbd *rbd; 692 int ret; 693 694 if ((pool_name == NULL) || (rbd_name == NULL)) { 695 return -EINVAL; 696 } 697 698 rbd = calloc(1, sizeof(struct bdev_rbd)); 699 if (rbd == NULL) { 700 SPDK_ERRLOG("Failed to allocate bdev_rbd struct\n"); 701 return -ENOMEM; 702 } 703 704 rbd->rbd_name = strdup(rbd_name); 705 if (!rbd->rbd_name) { 706 bdev_rbd_free(rbd); 707 return -ENOMEM; 708 } 709 710 if (user_id) { 711 rbd->user_id = strdup(user_id); 712 if (!rbd->user_id) { 713 bdev_rbd_free(rbd); 714 return -ENOMEM; 715 } 716 } 717 718 rbd->pool_name = strdup(pool_name); 719 if (!rbd->pool_name) { 720 bdev_rbd_free(rbd); 721 return -ENOMEM; 722 } 723 724 if (config && !(rbd->config = spdk_bdev_rbd_dup_config(config))) { 725 bdev_rbd_free(rbd); 726 return -ENOMEM; 727 } 728 729 ret = bdev_rbd_init(rbd->user_id, rbd->pool_name, 730 (const char *const *)rbd->config, 731 rbd_name, &rbd->info); 732 if (ret < 0) { 733 bdev_rbd_free(rbd); 734 SPDK_ERRLOG("Failed to init rbd device\n"); 735 return ret; 736 } 737 738 if (name) { 739 rbd->disk.name = strdup(name); 740 } else { 741 rbd->disk.name = spdk_sprintf_alloc("Ceph%d", bdev_rbd_count); 742 } 743 if (!rbd->disk.name) { 744 bdev_rbd_free(rbd); 745 return -ENOMEM; 746 } 747 rbd->disk.product_name = "Ceph Rbd Disk"; 748 bdev_rbd_count++; 749 750 rbd->disk.write_cache = 0; 751 rbd->disk.blocklen = block_size; 752 rbd->disk.blockcnt = rbd->info.size / rbd->disk.blocklen; 753 rbd->disk.ctxt = rbd; 754 rbd->disk.fn_table = &rbd_fn_table; 755 rbd->disk.module = &rbd_if; 756 757 SPDK_NOTICELOG("Add %s rbd disk to lun\n", rbd->disk.name); 758 759 spdk_io_device_register(rbd, bdev_rbd_create_cb, 760 bdev_rbd_destroy_cb, 761 sizeof(struct bdev_rbd_io_channel), 762 rbd_name); 763 ret = spdk_bdev_register(&rbd->disk); 764 if (ret) { 765 spdk_io_device_unregister(rbd, NULL); 766 bdev_rbd_free(rbd); 767 return ret; 768 } 769 770 *bdev = &(rbd->disk); 771 772 return ret; 773 } 774 775 void 776 spdk_bdev_rbd_delete(struct spdk_bdev *bdev, spdk_delete_rbd_complete cb_fn, void *cb_arg) 777 { 778 if (!bdev || bdev->module != &rbd_if) { 779 cb_fn(cb_arg, -ENODEV); 780 return; 781 } 782 783 spdk_bdev_unregister(bdev, cb_fn, cb_arg); 784 } 785 786 static int 787 bdev_rbd_library_init(void) 788 { 789 int i, rc = 0; 790 const char *val; 791 const char *pool_name; 792 const char *rbd_name; 793 struct spdk_bdev *bdev; 794 uint32_t block_size; 795 long int tmp; 796 797 struct spdk_conf_section *sp = spdk_conf_find_section(NULL, "Ceph"); 798 799 if (sp == NULL) { 800 /* 801 * Ceph section not found. Do not initialize any rbd LUNS. 802 */ 803 goto end; 804 } 805 806 /* Init rbd block devices */ 807 for (i = 0; ; i++) { 808 val = spdk_conf_section_get_nval(sp, "Ceph", i); 809 if (val == NULL) { 810 break; 811 } 812 813 /* get the Rbd_pool name */ 814 pool_name = spdk_conf_section_get_nmval(sp, "Ceph", i, 0); 815 if (pool_name == NULL) { 816 SPDK_ERRLOG("Ceph%d: rbd pool name needs to be provided\n", i); 817 rc = -1; 818 goto end; 819 } 820 821 rbd_name = spdk_conf_section_get_nmval(sp, "Ceph", i, 1); 822 if (rbd_name == NULL) { 823 SPDK_ERRLOG("Ceph%d: format error\n", i); 824 rc = -1; 825 goto end; 826 } 827 828 val = spdk_conf_section_get_nmval(sp, "Ceph", i, 2); 829 830 if (val == NULL) { 831 block_size = 512; /* default value */ 832 } else { 833 tmp = spdk_strtol(val, 10); 834 if (tmp <= 0) { 835 SPDK_ERRLOG("Invalid block size\n"); 836 rc = -1; 837 goto end; 838 } else if (tmp & 0x1ff) { 839 SPDK_ERRLOG("current block_size = %ld, it should be multiple of 512\n", 840 tmp); 841 rc = -1; 842 goto end; 843 } 844 block_size = (uint32_t)tmp; 845 } 846 847 /* TODO(?): user_id and rbd config values */ 848 rc = spdk_bdev_rbd_create(&bdev, NULL, NULL, pool_name, NULL, rbd_name, block_size); 849 if (rc) { 850 goto end; 851 } 852 } 853 854 end: 855 return rc; 856 } 857 858 SPDK_LOG_REGISTER_COMPONENT("bdev_rbd", SPDK_LOG_BDEV_RBD) 859