1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 36 #include "bdev_rbd.h" 37 38 #include <rbd/librbd.h> 39 #include <rados/librados.h> 40 #include <sys/eventfd.h> 41 42 #include "spdk/conf.h" 43 #include "spdk/env.h" 44 #include "spdk/bdev.h" 45 #include "spdk/thread.h" 46 #include "spdk/json.h" 47 #include "spdk/string.h" 48 #include "spdk/util.h" 49 50 #include "spdk/bdev_module.h" 51 #include "spdk_internal/log.h" 52 53 #define SPDK_RBD_QUEUE_DEPTH 128 54 55 static int bdev_rbd_count = 0; 56 57 #define BDEV_RBD_POLL_US 50 58 59 struct bdev_rbd { 60 struct spdk_bdev disk; 61 char *rbd_name; 62 char *user_id; 63 char *pool_name; 64 char **config; 65 rbd_image_info_t info; 66 TAILQ_ENTRY(bdev_rbd) tailq; 67 struct spdk_poller *reset_timer; 68 struct spdk_bdev_io *reset_bdev_io; 69 }; 70 71 struct bdev_rbd_io_channel { 72 rados_ioctx_t io_ctx; 73 rados_t cluster; 74 struct pollfd pfd; 75 rbd_image_t image; 76 struct bdev_rbd *disk; 77 struct spdk_poller *poller; 78 }; 79 80 struct bdev_rbd_io { 81 uint64_t remaining_len; 82 int num_segments; 83 bool failed; 84 }; 85 86 static void 87 bdev_rbd_free(struct bdev_rbd *rbd) 88 { 89 if (!rbd) { 90 return; 91 } 92 93 free(rbd->disk.name); 94 free(rbd->rbd_name); 95 free(rbd->user_id); 96 free(rbd->pool_name); 97 bdev_rbd_free_config(rbd->config); 98 free(rbd); 99 } 100 101 void 102 bdev_rbd_free_config(char **config) 103 { 104 char **entry; 105 106 if (config) { 107 for (entry = config; *entry; entry++) { 108 free(*entry); 109 } 110 free(config); 111 } 112 } 113 114 char ** 115 bdev_rbd_dup_config(const char *const *config) 116 { 117 size_t count; 118 char **copy; 119 120 if (!config) { 121 return NULL; 122 } 123 for (count = 0; config[count]; count++) {} 124 copy = calloc(count + 1, sizeof(*copy)); 125 if (!copy) { 126 return NULL; 127 } 128 for (count = 0; config[count]; count++) { 129 if (!(copy[count] = strdup(config[count]))) { 130 bdev_rbd_free_config(copy); 131 return NULL; 132 } 133 } 134 return copy; 135 } 136 137 static int 138 bdev_rados_context_init(const char *user_id, const char *rbd_pool_name, const char *const *config, 139 rados_t *cluster, rados_ioctx_t *io_ctx) 140 { 141 int ret; 142 143 ret = rados_create(cluster, user_id); 144 if (ret < 0) { 145 SPDK_ERRLOG("Failed to create rados_t struct\n"); 146 return -1; 147 } 148 149 if (config) { 150 const char *const *entry = config; 151 while (*entry) { 152 ret = rados_conf_set(*cluster, entry[0], entry[1]); 153 if (ret < 0) { 154 SPDK_ERRLOG("Failed to set %s = %s\n", entry[0], entry[1]); 155 rados_shutdown(*cluster); 156 return -1; 157 } 158 entry += 2; 159 } 160 } else { 161 ret = rados_conf_read_file(*cluster, NULL); 162 if (ret < 0) { 163 SPDK_ERRLOG("Failed to read conf file\n"); 164 rados_shutdown(*cluster); 165 return -1; 166 } 167 } 168 169 ret = rados_connect(*cluster); 170 if (ret < 0) { 171 SPDK_ERRLOG("Failed to connect to rbd_pool\n"); 172 rados_shutdown(*cluster); 173 return -1; 174 } 175 176 ret = rados_ioctx_create(*cluster, rbd_pool_name, io_ctx); 177 178 if (ret < 0) { 179 SPDK_ERRLOG("Failed to create ioctx\n"); 180 rados_shutdown(*cluster); 181 return -1; 182 } 183 184 return 0; 185 } 186 187 static int 188 bdev_rbd_init(const char *user_id, const char *rbd_pool_name, const char *const *config, 189 const char *rbd_name, rbd_image_info_t *info) 190 { 191 int ret; 192 rados_t cluster = NULL; 193 rados_ioctx_t io_ctx = NULL; 194 rbd_image_t image = NULL; 195 196 ret = bdev_rados_context_init(user_id, rbd_pool_name, config, &cluster, &io_ctx); 197 if (ret < 0) { 198 SPDK_ERRLOG("Failed to create rados context for user_id=%s and rbd_pool=%s\n", 199 user_id ? user_id : "admin (the default)", rbd_pool_name); 200 return -1; 201 } 202 203 ret = rbd_open(io_ctx, rbd_name, &image, NULL); 204 if (ret < 0) { 205 SPDK_ERRLOG("Failed to open specified rbd device\n"); 206 goto err; 207 } 208 ret = rbd_stat(image, info, sizeof(*info)); 209 rbd_close(image); 210 if (ret < 0) { 211 SPDK_ERRLOG("Failed to stat specified rbd device\n"); 212 goto err; 213 } 214 215 rados_ioctx_destroy(io_ctx); 216 return 0; 217 err: 218 rados_ioctx_destroy(io_ctx); 219 rados_shutdown(cluster); 220 return -1; 221 } 222 223 static void 224 bdev_rbd_exit(rbd_image_t image) 225 { 226 rbd_flush(image); 227 rbd_close(image); 228 } 229 230 static void 231 bdev_rbd_finish_aiocb(rbd_completion_t cb, void *arg) 232 { 233 /* Doing nothing here */ 234 } 235 236 static int 237 bdev_rbd_start_aio(rbd_image_t image, struct spdk_bdev_io *bdev_io, 238 void *buf, uint64_t offset, size_t len) 239 { 240 int ret; 241 rbd_completion_t comp; 242 243 ret = rbd_aio_create_completion(bdev_io, bdev_rbd_finish_aiocb, 244 &comp); 245 if (ret < 0) { 246 return -1; 247 } 248 249 if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) { 250 ret = rbd_aio_read(image, offset, len, 251 buf, comp); 252 } else if (bdev_io->type == SPDK_BDEV_IO_TYPE_WRITE) { 253 ret = rbd_aio_write(image, offset, len, 254 buf, comp); 255 } else if (bdev_io->type == SPDK_BDEV_IO_TYPE_FLUSH) { 256 ret = rbd_aio_flush(image, comp); 257 } 258 259 if (ret < 0) { 260 rbd_aio_release(comp); 261 return -1; 262 } 263 264 return 0; 265 } 266 267 static int bdev_rbd_library_init(void); 268 269 static int 270 bdev_rbd_get_ctx_size(void) 271 { 272 return sizeof(struct bdev_rbd_io); 273 } 274 275 static struct spdk_bdev_module rbd_if = { 276 .name = "rbd", 277 .module_init = bdev_rbd_library_init, 278 .get_ctx_size = bdev_rbd_get_ctx_size, 279 280 }; 281 SPDK_BDEV_MODULE_REGISTER(rbd, &rbd_if) 282 283 static int64_t 284 bdev_rbd_rw(struct bdev_rbd *disk, struct spdk_io_channel *ch, 285 struct spdk_bdev_io *bdev_io, struct iovec *iov, 286 int iovcnt, size_t len, uint64_t offset) 287 { 288 struct bdev_rbd_io *rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx; 289 struct bdev_rbd_io_channel *rbdio_ch = spdk_io_channel_get_ctx(ch); 290 size_t remaining = len; 291 int i, rc; 292 293 rbd_io->remaining_len = 0; 294 rbd_io->num_segments = 0; 295 rbd_io->failed = false; 296 297 for (i = 0; i < iovcnt && remaining > 0; i++) { 298 size_t seg_len = spdk_min(remaining, iov[i].iov_len); 299 300 rc = bdev_rbd_start_aio(rbdio_ch->image, bdev_io, iov[i].iov_base, offset, seg_len); 301 if (rc) { 302 /* 303 * This bdev_rbd_start_aio() call failed, but if any previous ones were 304 * submitted, we need to wait for them to finish. 305 */ 306 if (rbd_io->num_segments == 0) { 307 /* No previous I/O submitted - return error code immediately. */ 308 return rc; 309 } 310 311 /* Return and wait for outstanding I/O to complete. */ 312 rbd_io->failed = true; 313 return 0; 314 } 315 316 rbd_io->num_segments++; 317 rbd_io->remaining_len += seg_len; 318 319 offset += seg_len; 320 remaining -= seg_len; 321 } 322 323 return 0; 324 } 325 326 static int64_t 327 bdev_rbd_flush(struct bdev_rbd *disk, struct spdk_io_channel *ch, 328 struct spdk_bdev_io *bdev_io, uint64_t offset, uint64_t nbytes) 329 { 330 struct bdev_rbd_io_channel *rbdio_ch = spdk_io_channel_get_ctx(ch); 331 struct bdev_rbd_io *rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx; 332 333 rbd_io->num_segments++; 334 return bdev_rbd_start_aio(rbdio_ch->image, bdev_io, NULL, offset, nbytes); 335 } 336 337 static int 338 bdev_rbd_reset_timer(void *arg) 339 { 340 struct bdev_rbd *disk = arg; 341 342 /* 343 * TODO: This should check if any I/O is still in flight before completing the reset. 344 * For now, just complete after the timer expires. 345 */ 346 spdk_bdev_io_complete(disk->reset_bdev_io, SPDK_BDEV_IO_STATUS_SUCCESS); 347 spdk_poller_unregister(&disk->reset_timer); 348 disk->reset_bdev_io = NULL; 349 350 return SPDK_POLLER_BUSY; 351 } 352 353 static int 354 bdev_rbd_reset(struct bdev_rbd *disk, struct spdk_bdev_io *bdev_io) 355 { 356 /* 357 * HACK: Since librbd doesn't provide any way to cancel outstanding aio, just kick off a 358 * timer to wait for in-flight I/O to complete. 359 */ 360 assert(disk->reset_bdev_io == NULL); 361 disk->reset_bdev_io = bdev_io; 362 disk->reset_timer = SPDK_POLLER_REGISTER(bdev_rbd_reset_timer, disk, 1 * 1000 * 1000); 363 364 return 0; 365 } 366 367 static int 368 bdev_rbd_destruct(void *ctx) 369 { 370 struct bdev_rbd *rbd = ctx; 371 372 spdk_io_device_unregister(rbd, NULL); 373 374 bdev_rbd_free(rbd); 375 return 0; 376 } 377 378 static void 379 bdev_rbd_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io, 380 bool success) 381 { 382 int ret; 383 384 if (!success) { 385 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 386 return; 387 } 388 389 ret = bdev_rbd_rw(bdev_io->bdev->ctxt, 390 ch, 391 bdev_io, 392 bdev_io->u.bdev.iovs, 393 bdev_io->u.bdev.iovcnt, 394 bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen, 395 bdev_io->u.bdev.offset_blocks * bdev_io->bdev->blocklen); 396 397 if (ret != 0) { 398 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 399 } 400 } 401 402 static int _bdev_rbd_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io) 403 { 404 switch (bdev_io->type) { 405 case SPDK_BDEV_IO_TYPE_READ: 406 spdk_bdev_io_get_buf(bdev_io, bdev_rbd_get_buf_cb, 407 bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen); 408 return 0; 409 410 case SPDK_BDEV_IO_TYPE_WRITE: 411 return bdev_rbd_rw((struct bdev_rbd *)bdev_io->bdev->ctxt, 412 ch, 413 bdev_io, 414 bdev_io->u.bdev.iovs, 415 bdev_io->u.bdev.iovcnt, 416 bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen, 417 bdev_io->u.bdev.offset_blocks * bdev_io->bdev->blocklen); 418 419 case SPDK_BDEV_IO_TYPE_FLUSH: 420 return bdev_rbd_flush((struct bdev_rbd *)bdev_io->bdev->ctxt, 421 ch, 422 bdev_io, 423 bdev_io->u.bdev.offset_blocks * bdev_io->bdev->blocklen, 424 bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen); 425 426 case SPDK_BDEV_IO_TYPE_RESET: 427 return bdev_rbd_reset((struct bdev_rbd *)bdev_io->bdev->ctxt, 428 bdev_io); 429 430 default: 431 return -1; 432 } 433 return 0; 434 } 435 436 static void bdev_rbd_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io) 437 { 438 if (_bdev_rbd_submit_request(ch, bdev_io) < 0) { 439 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 440 } 441 } 442 443 static bool 444 bdev_rbd_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type) 445 { 446 switch (io_type) { 447 case SPDK_BDEV_IO_TYPE_READ: 448 case SPDK_BDEV_IO_TYPE_WRITE: 449 case SPDK_BDEV_IO_TYPE_FLUSH: 450 case SPDK_BDEV_IO_TYPE_RESET: 451 return true; 452 453 default: 454 return false; 455 } 456 } 457 458 static int 459 bdev_rbd_io_poll(void *arg) 460 { 461 struct bdev_rbd_io_channel *ch = arg; 462 int i, io_status, rc; 463 rbd_completion_t comps[SPDK_RBD_QUEUE_DEPTH]; 464 struct spdk_bdev_io *bdev_io; 465 struct bdev_rbd_io *rbd_io; 466 467 rc = poll(&ch->pfd, 1, 0); 468 469 /* check the return value of poll since we have only one fd for each channel */ 470 if (rc != 1) { 471 return SPDK_POLLER_BUSY; 472 } 473 474 rc = rbd_poll_io_events(ch->image, comps, SPDK_RBD_QUEUE_DEPTH); 475 for (i = 0; i < rc; i++) { 476 bdev_io = rbd_aio_get_arg(comps[i]); 477 rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx; 478 io_status = rbd_aio_get_return_value(comps[i]); 479 480 assert(rbd_io->num_segments > 0); 481 rbd_io->num_segments--; 482 483 if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) { 484 if (io_status > 0) { 485 /* For reads, io_status is the length */ 486 rbd_io->remaining_len -= io_status; 487 } 488 489 if (rbd_io->num_segments == 0 && rbd_io->remaining_len != 0) { 490 rbd_io->failed = true; 491 } 492 } else { 493 /* For others, 0 means success */ 494 if (io_status != 0) { 495 rbd_io->failed = true; 496 } 497 } 498 499 rbd_aio_release(comps[i]); 500 501 if (rbd_io->num_segments == 0) { 502 spdk_bdev_io_complete(bdev_io, 503 rbd_io->failed ? SPDK_BDEV_IO_STATUS_FAILED : SPDK_BDEV_IO_STATUS_SUCCESS); 504 } 505 } 506 507 return rc > 0 ? SPDK_POLLER_BUSY : SPDK_POLLER_IDLE; 508 } 509 510 static void 511 bdev_rbd_free_channel(struct bdev_rbd_io_channel *ch) 512 { 513 if (!ch) { 514 return; 515 } 516 517 if (ch->image) { 518 bdev_rbd_exit(ch->image); 519 } 520 521 if (ch->io_ctx) { 522 rados_ioctx_destroy(ch->io_ctx); 523 } 524 525 if (ch->cluster) { 526 rados_shutdown(ch->cluster); 527 } 528 529 if (ch->pfd.fd >= 0) { 530 close(ch->pfd.fd); 531 } 532 } 533 534 static void * 535 bdev_rbd_handle(void *arg) 536 { 537 struct bdev_rbd_io_channel *ch = arg; 538 void *ret = arg; 539 540 if (rbd_open(ch->io_ctx, ch->disk->rbd_name, &ch->image, NULL) < 0) { 541 SPDK_ERRLOG("Failed to open specified rbd device\n"); 542 ret = NULL; 543 } 544 545 return ret; 546 } 547 548 static int 549 bdev_rbd_create_cb(void *io_device, void *ctx_buf) 550 { 551 struct bdev_rbd_io_channel *ch = ctx_buf; 552 int ret; 553 554 ch->disk = io_device; 555 ch->image = NULL; 556 ch->io_ctx = NULL; 557 ch->pfd.fd = -1; 558 559 ret = bdev_rados_context_init(ch->disk->user_id, ch->disk->pool_name, 560 (const char *const *)ch->disk->config, 561 &ch->cluster, &ch->io_ctx); 562 if (ret < 0) { 563 SPDK_ERRLOG("Failed to create rados context for user_id %s and rbd_pool=%s\n", 564 ch->disk->user_id ? ch->disk->user_id : "admin (the default)", ch->disk->pool_name); 565 goto err; 566 } 567 568 if (spdk_call_unaffinitized(bdev_rbd_handle, ch) == NULL) { 569 goto err; 570 } 571 572 ch->pfd.fd = eventfd(0, EFD_NONBLOCK); 573 if (ch->pfd.fd < 0) { 574 SPDK_ERRLOG("Failed to get eventfd\n"); 575 goto err; 576 } 577 578 ch->pfd.events = POLLIN; 579 ret = rbd_set_image_notification(ch->image, ch->pfd.fd, EVENT_TYPE_EVENTFD); 580 if (ret < 0) { 581 SPDK_ERRLOG("Failed to set rbd image notification\n"); 582 goto err; 583 } 584 585 ch->poller = SPDK_POLLER_REGISTER(bdev_rbd_io_poll, ch, BDEV_RBD_POLL_US); 586 587 return 0; 588 589 err: 590 bdev_rbd_free_channel(ch); 591 return -1; 592 } 593 594 static void 595 bdev_rbd_destroy_cb(void *io_device, void *ctx_buf) 596 { 597 struct bdev_rbd_io_channel *io_channel = ctx_buf; 598 599 bdev_rbd_free_channel(io_channel); 600 601 spdk_poller_unregister(&io_channel->poller); 602 } 603 604 static struct spdk_io_channel * 605 bdev_rbd_get_io_channel(void *ctx) 606 { 607 struct bdev_rbd *rbd_bdev = ctx; 608 609 return spdk_get_io_channel(rbd_bdev); 610 } 611 612 static int 613 bdev_rbd_dump_info_json(void *ctx, struct spdk_json_write_ctx *w) 614 { 615 struct bdev_rbd *rbd_bdev = ctx; 616 617 spdk_json_write_named_object_begin(w, "rbd"); 618 619 spdk_json_write_named_string(w, "pool_name", rbd_bdev->pool_name); 620 621 spdk_json_write_named_string(w, "rbd_name", rbd_bdev->rbd_name); 622 623 if (rbd_bdev->user_id) { 624 spdk_json_write_named_string(w, "user_id", rbd_bdev->user_id); 625 } 626 627 if (rbd_bdev->config) { 628 char **entry = rbd_bdev->config; 629 630 spdk_json_write_named_object_begin(w, "config"); 631 while (*entry) { 632 spdk_json_write_named_string(w, entry[0], entry[1]); 633 entry += 2; 634 } 635 spdk_json_write_object_end(w); 636 } 637 638 spdk_json_write_object_end(w); 639 640 return 0; 641 } 642 643 static void 644 bdev_rbd_write_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w) 645 { 646 struct bdev_rbd *rbd = bdev->ctxt; 647 648 spdk_json_write_object_begin(w); 649 650 spdk_json_write_named_string(w, "method", "bdev_rbd_create"); 651 652 spdk_json_write_named_object_begin(w, "params"); 653 spdk_json_write_named_string(w, "name", bdev->name); 654 spdk_json_write_named_string(w, "pool_name", rbd->pool_name); 655 spdk_json_write_named_string(w, "rbd_name", rbd->rbd_name); 656 spdk_json_write_named_uint32(w, "block_size", bdev->blocklen); 657 if (rbd->user_id) { 658 spdk_json_write_named_string(w, "user_id", rbd->user_id); 659 } 660 661 if (rbd->config) { 662 char **entry = rbd->config; 663 664 spdk_json_write_named_object_begin(w, "config"); 665 while (*entry) { 666 spdk_json_write_named_string(w, entry[0], entry[1]); 667 entry += 2; 668 } 669 spdk_json_write_object_end(w); 670 } 671 672 spdk_json_write_object_end(w); 673 674 spdk_json_write_object_end(w); 675 } 676 677 static const struct spdk_bdev_fn_table rbd_fn_table = { 678 .destruct = bdev_rbd_destruct, 679 .submit_request = bdev_rbd_submit_request, 680 .io_type_supported = bdev_rbd_io_type_supported, 681 .get_io_channel = bdev_rbd_get_io_channel, 682 .dump_info_json = bdev_rbd_dump_info_json, 683 .write_config_json = bdev_rbd_write_config_json, 684 }; 685 686 int 687 bdev_rbd_create(struct spdk_bdev **bdev, const char *name, const char *user_id, 688 const char *pool_name, 689 const char *const *config, 690 const char *rbd_name, 691 uint32_t block_size) 692 { 693 struct bdev_rbd *rbd; 694 int ret; 695 696 if ((pool_name == NULL) || (rbd_name == NULL)) { 697 return -EINVAL; 698 } 699 700 rbd = calloc(1, sizeof(struct bdev_rbd)); 701 if (rbd == NULL) { 702 SPDK_ERRLOG("Failed to allocate bdev_rbd struct\n"); 703 return -ENOMEM; 704 } 705 706 rbd->rbd_name = strdup(rbd_name); 707 if (!rbd->rbd_name) { 708 bdev_rbd_free(rbd); 709 return -ENOMEM; 710 } 711 712 if (user_id) { 713 rbd->user_id = strdup(user_id); 714 if (!rbd->user_id) { 715 bdev_rbd_free(rbd); 716 return -ENOMEM; 717 } 718 } 719 720 rbd->pool_name = strdup(pool_name); 721 if (!rbd->pool_name) { 722 bdev_rbd_free(rbd); 723 return -ENOMEM; 724 } 725 726 if (config && !(rbd->config = bdev_rbd_dup_config(config))) { 727 bdev_rbd_free(rbd); 728 return -ENOMEM; 729 } 730 731 ret = bdev_rbd_init(rbd->user_id, rbd->pool_name, 732 (const char *const *)rbd->config, 733 rbd_name, &rbd->info); 734 if (ret < 0) { 735 bdev_rbd_free(rbd); 736 SPDK_ERRLOG("Failed to init rbd device\n"); 737 return ret; 738 } 739 740 if (name) { 741 rbd->disk.name = strdup(name); 742 } else { 743 rbd->disk.name = spdk_sprintf_alloc("Ceph%d", bdev_rbd_count); 744 } 745 if (!rbd->disk.name) { 746 bdev_rbd_free(rbd); 747 return -ENOMEM; 748 } 749 rbd->disk.product_name = "Ceph Rbd Disk"; 750 bdev_rbd_count++; 751 752 rbd->disk.write_cache = 0; 753 rbd->disk.blocklen = block_size; 754 rbd->disk.blockcnt = rbd->info.size / rbd->disk.blocklen; 755 rbd->disk.ctxt = rbd; 756 rbd->disk.fn_table = &rbd_fn_table; 757 rbd->disk.module = &rbd_if; 758 759 SPDK_NOTICELOG("Add %s rbd disk to lun\n", rbd->disk.name); 760 761 spdk_io_device_register(rbd, bdev_rbd_create_cb, 762 bdev_rbd_destroy_cb, 763 sizeof(struct bdev_rbd_io_channel), 764 rbd_name); 765 ret = spdk_bdev_register(&rbd->disk); 766 if (ret) { 767 spdk_io_device_unregister(rbd, NULL); 768 bdev_rbd_free(rbd); 769 return ret; 770 } 771 772 *bdev = &(rbd->disk); 773 774 return ret; 775 } 776 777 void 778 bdev_rbd_delete(struct spdk_bdev *bdev, spdk_delete_rbd_complete cb_fn, void *cb_arg) 779 { 780 if (!bdev || bdev->module != &rbd_if) { 781 cb_fn(cb_arg, -ENODEV); 782 return; 783 } 784 785 spdk_bdev_unregister(bdev, cb_fn, cb_arg); 786 } 787 788 int 789 bdev_rbd_resize(struct spdk_bdev *bdev, const uint64_t new_size_in_mb) 790 { 791 struct spdk_io_channel *ch; 792 struct bdev_rbd_io_channel *rbd_io_ch; 793 int rc; 794 uint64_t new_size_in_byte; 795 uint64_t current_size_in_mb; 796 797 if (bdev->module != &rbd_if) { 798 return -EINVAL; 799 } 800 801 current_size_in_mb = bdev->blocklen * bdev->blockcnt / (1024 * 1024); 802 if (current_size_in_mb > new_size_in_mb) { 803 SPDK_ERRLOG("The new bdev size must be lager than current bdev size.\n"); 804 return -EINVAL; 805 } 806 807 ch = bdev_rbd_get_io_channel(bdev); 808 rbd_io_ch = spdk_io_channel_get_ctx(ch); 809 new_size_in_byte = new_size_in_mb * 1024 * 1024; 810 811 rc = rbd_resize(rbd_io_ch->image, new_size_in_byte); 812 if (rc != 0) { 813 SPDK_ERRLOG("failed to resize the ceph bdev.\n"); 814 return rc; 815 } 816 817 rc = spdk_bdev_notify_blockcnt_change(bdev, new_size_in_byte / bdev->blocklen); 818 if (rc != 0) { 819 SPDK_ERRLOG("failed to notify block cnt change.\n"); 820 return rc; 821 } 822 823 return rc; 824 } 825 826 static int 827 bdev_rbd_library_init(void) 828 { 829 int i, rc = 0; 830 const char *val; 831 const char *pool_name; 832 const char *rbd_name; 833 struct spdk_bdev *bdev; 834 uint32_t block_size; 835 long int tmp; 836 837 struct spdk_conf_section *sp = spdk_conf_find_section(NULL, "Ceph"); 838 839 if (sp == NULL) { 840 /* 841 * Ceph section not found. Do not initialize any rbd LUNS. 842 */ 843 goto end; 844 } 845 846 /* Init rbd block devices */ 847 for (i = 0; ; i++) { 848 val = spdk_conf_section_get_nval(sp, "Ceph", i); 849 if (val == NULL) { 850 break; 851 } 852 853 /* get the Rbd_pool name */ 854 pool_name = spdk_conf_section_get_nmval(sp, "Ceph", i, 0); 855 if (pool_name == NULL) { 856 SPDK_ERRLOG("Ceph%d: rbd pool name needs to be provided\n", i); 857 rc = -1; 858 goto end; 859 } 860 861 rbd_name = spdk_conf_section_get_nmval(sp, "Ceph", i, 1); 862 if (rbd_name == NULL) { 863 SPDK_ERRLOG("Ceph%d: format error\n", i); 864 rc = -1; 865 goto end; 866 } 867 868 val = spdk_conf_section_get_nmval(sp, "Ceph", i, 2); 869 870 if (val == NULL) { 871 block_size = 512; /* default value */ 872 } else { 873 tmp = spdk_strtol(val, 10); 874 if (tmp <= 0) { 875 SPDK_ERRLOG("Invalid block size\n"); 876 rc = -1; 877 goto end; 878 } else if (tmp & 0x1ff) { 879 SPDK_ERRLOG("current block_size = %ld, it should be multiple of 512\n", 880 tmp); 881 rc = -1; 882 goto end; 883 } 884 block_size = (uint32_t)tmp; 885 } 886 887 /* TODO(?): user_id and rbd config values */ 888 rc = bdev_rbd_create(&bdev, NULL, NULL, pool_name, NULL, rbd_name, block_size); 889 if (rc) { 890 goto end; 891 } 892 } 893 894 end: 895 return rc; 896 } 897 898 SPDK_LOG_REGISTER_COMPONENT("bdev_rbd", SPDK_LOG_BDEV_RBD) 899