1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 36 #include "bdev_rbd.h" 37 38 #include <rbd/librbd.h> 39 #include <rados/librados.h> 40 #include <sys/eventfd.h> 41 #include <sys/epoll.h> 42 43 #include "spdk/conf.h" 44 #include "spdk/env.h" 45 #include "spdk/bdev.h" 46 #include "spdk/thread.h" 47 #include "spdk/json.h" 48 #include "spdk/string.h" 49 #include "spdk/util.h" 50 #include "spdk/likely.h" 51 52 #include "spdk/bdev_module.h" 53 #include "spdk_internal/log.h" 54 55 #define SPDK_RBD_QUEUE_DEPTH 128 56 #define MAX_EVENTS_PER_POLL 128 57 58 static int bdev_rbd_count = 0; 59 60 struct bdev_rbd { 61 struct spdk_bdev disk; 62 char *rbd_name; 63 char *user_id; 64 char *pool_name; 65 char **config; 66 rbd_image_info_t info; 67 TAILQ_ENTRY(bdev_rbd) tailq; 68 struct spdk_poller *reset_timer; 69 struct spdk_bdev_io *reset_bdev_io; 70 }; 71 72 struct bdev_rbd_group_channel { 73 struct spdk_poller *poller; 74 int epoll_fd; 75 }; 76 77 struct bdev_rbd_io_channel { 78 rados_ioctx_t io_ctx; 79 rados_t cluster; 80 int pfd; 81 rbd_image_t image; 82 struct bdev_rbd *disk; 83 struct bdev_rbd_group_channel *group_ch; 84 }; 85 86 struct bdev_rbd_io { 87 size_t total_len; 88 }; 89 90 static void 91 bdev_rbd_free(struct bdev_rbd *rbd) 92 { 93 if (!rbd) { 94 return; 95 } 96 97 free(rbd->disk.name); 98 free(rbd->rbd_name); 99 free(rbd->user_id); 100 free(rbd->pool_name); 101 bdev_rbd_free_config(rbd->config); 102 free(rbd); 103 } 104 105 void 106 bdev_rbd_free_config(char **config) 107 { 108 char **entry; 109 110 if (config) { 111 for (entry = config; *entry; entry++) { 112 free(*entry); 113 } 114 free(config); 115 } 116 } 117 118 char ** 119 bdev_rbd_dup_config(const char *const *config) 120 { 121 size_t count; 122 char **copy; 123 124 if (!config) { 125 return NULL; 126 } 127 for (count = 0; config[count]; count++) {} 128 copy = calloc(count + 1, sizeof(*copy)); 129 if (!copy) { 130 return NULL; 131 } 132 for (count = 0; config[count]; count++) { 133 if (!(copy[count] = strdup(config[count]))) { 134 bdev_rbd_free_config(copy); 135 return NULL; 136 } 137 } 138 return copy; 139 } 140 141 static int 142 bdev_rados_context_init(const char *user_id, const char *rbd_pool_name, const char *const *config, 143 rados_t *cluster, rados_ioctx_t *io_ctx) 144 { 145 int ret; 146 147 ret = rados_create(cluster, user_id); 148 if (ret < 0) { 149 SPDK_ERRLOG("Failed to create rados_t struct\n"); 150 return -1; 151 } 152 153 if (config) { 154 const char *const *entry = config; 155 while (*entry) { 156 ret = rados_conf_set(*cluster, entry[0], entry[1]); 157 if (ret < 0) { 158 SPDK_ERRLOG("Failed to set %s = %s\n", entry[0], entry[1]); 159 rados_shutdown(*cluster); 160 return -1; 161 } 162 entry += 2; 163 } 164 } else { 165 ret = rados_conf_read_file(*cluster, NULL); 166 if (ret < 0) { 167 SPDK_ERRLOG("Failed to read conf file\n"); 168 rados_shutdown(*cluster); 169 return -1; 170 } 171 } 172 173 ret = rados_connect(*cluster); 174 if (ret < 0) { 175 SPDK_ERRLOG("Failed to connect to rbd_pool\n"); 176 rados_shutdown(*cluster); 177 return -1; 178 } 179 180 ret = rados_ioctx_create(*cluster, rbd_pool_name, io_ctx); 181 182 if (ret < 0) { 183 SPDK_ERRLOG("Failed to create ioctx\n"); 184 rados_shutdown(*cluster); 185 return -1; 186 } 187 188 return 0; 189 } 190 191 static int 192 bdev_rbd_init(const char *user_id, const char *rbd_pool_name, const char *const *config, 193 const char *rbd_name, rbd_image_info_t *info) 194 { 195 int ret; 196 rados_t cluster = NULL; 197 rados_ioctx_t io_ctx = NULL; 198 rbd_image_t image = NULL; 199 200 ret = bdev_rados_context_init(user_id, rbd_pool_name, config, &cluster, &io_ctx); 201 if (ret < 0) { 202 SPDK_ERRLOG("Failed to create rados context for user_id=%s and rbd_pool=%s\n", 203 user_id ? user_id : "admin (the default)", rbd_pool_name); 204 return -1; 205 } 206 207 ret = rbd_open(io_ctx, rbd_name, &image, NULL); 208 if (ret < 0) { 209 SPDK_ERRLOG("Failed to open specified rbd device\n"); 210 goto err; 211 } 212 ret = rbd_stat(image, info, sizeof(*info)); 213 rbd_close(image); 214 if (ret < 0) { 215 SPDK_ERRLOG("Failed to stat specified rbd device\n"); 216 goto err; 217 } 218 219 rados_ioctx_destroy(io_ctx); 220 return 0; 221 err: 222 rados_ioctx_destroy(io_ctx); 223 rados_shutdown(cluster); 224 return -1; 225 } 226 227 static void 228 bdev_rbd_exit(rbd_image_t image) 229 { 230 rbd_flush(image); 231 rbd_close(image); 232 } 233 234 static void 235 bdev_rbd_finish_aiocb(rbd_completion_t cb, void *arg) 236 { 237 /* Doing nothing here */ 238 } 239 240 static int 241 bdev_rbd_start_aio(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io, 242 struct iovec *iov, int iovcnt, uint64_t offset, size_t len) 243 { 244 struct bdev_rbd_io_channel *rbdio_ch = spdk_io_channel_get_ctx(ch); 245 int ret; 246 rbd_completion_t comp; 247 struct bdev_rbd_io *rbd_io; 248 rbd_image_t image = rbdio_ch->image; 249 250 ret = rbd_aio_create_completion(bdev_io, bdev_rbd_finish_aiocb, 251 &comp); 252 if (ret < 0) { 253 return -1; 254 } 255 256 if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) { 257 rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx; 258 rbd_io->total_len = len; 259 if (spdk_likely(iovcnt == 1)) { 260 ret = rbd_aio_read(image, offset, iov[0].iov_len, iov[0].iov_base, comp); 261 } else { 262 ret = rbd_aio_readv(image, iov, iovcnt, offset, comp); 263 } 264 } else if (bdev_io->type == SPDK_BDEV_IO_TYPE_WRITE) { 265 if (spdk_likely(iovcnt == 1)) { 266 ret = rbd_aio_write(image, offset, iov[0].iov_len, iov[0].iov_base, comp); 267 } else { 268 ret = rbd_aio_writev(image, iov, iovcnt, offset, comp); 269 } 270 } else if (bdev_io->type == SPDK_BDEV_IO_TYPE_FLUSH) { 271 ret = rbd_aio_flush(image, comp); 272 } 273 274 if (ret < 0) { 275 rbd_aio_release(comp); 276 return -1; 277 } 278 279 return 0; 280 } 281 282 static int bdev_rbd_library_init(void); 283 284 static void bdev_rbd_library_fini(void); 285 286 static int 287 bdev_rbd_get_ctx_size(void) 288 { 289 return sizeof(struct bdev_rbd_io); 290 } 291 292 static struct spdk_bdev_module rbd_if = { 293 .name = "rbd", 294 .module_init = bdev_rbd_library_init, 295 .module_fini = bdev_rbd_library_fini, 296 .get_ctx_size = bdev_rbd_get_ctx_size, 297 298 }; 299 SPDK_BDEV_MODULE_REGISTER(rbd, &rbd_if) 300 301 static int 302 bdev_rbd_reset_timer(void *arg) 303 { 304 struct bdev_rbd *disk = arg; 305 306 /* 307 * TODO: This should check if any I/O is still in flight before completing the reset. 308 * For now, just complete after the timer expires. 309 */ 310 spdk_bdev_io_complete(disk->reset_bdev_io, SPDK_BDEV_IO_STATUS_SUCCESS); 311 spdk_poller_unregister(&disk->reset_timer); 312 disk->reset_bdev_io = NULL; 313 314 return SPDK_POLLER_BUSY; 315 } 316 317 static int 318 bdev_rbd_reset(struct bdev_rbd *disk, struct spdk_bdev_io *bdev_io) 319 { 320 /* 321 * HACK: Since librbd doesn't provide any way to cancel outstanding aio, just kick off a 322 * timer to wait for in-flight I/O to complete. 323 */ 324 assert(disk->reset_bdev_io == NULL); 325 disk->reset_bdev_io = bdev_io; 326 disk->reset_timer = SPDK_POLLER_REGISTER(bdev_rbd_reset_timer, disk, 1 * 1000 * 1000); 327 328 return 0; 329 } 330 331 static int 332 bdev_rbd_destruct(void *ctx) 333 { 334 struct bdev_rbd *rbd = ctx; 335 336 spdk_io_device_unregister(rbd, NULL); 337 338 bdev_rbd_free(rbd); 339 return 0; 340 } 341 342 static void 343 bdev_rbd_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io, 344 bool success) 345 { 346 int ret; 347 348 if (!success) { 349 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 350 return; 351 } 352 353 ret = bdev_rbd_start_aio(ch, 354 bdev_io, 355 bdev_io->u.bdev.iovs, 356 bdev_io->u.bdev.iovcnt, 357 bdev_io->u.bdev.offset_blocks * bdev_io->bdev->blocklen, 358 bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen); 359 360 if (ret != 0) { 361 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 362 } 363 } 364 365 static int _bdev_rbd_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io) 366 { 367 switch (bdev_io->type) { 368 case SPDK_BDEV_IO_TYPE_READ: 369 spdk_bdev_io_get_buf(bdev_io, bdev_rbd_get_buf_cb, 370 bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen); 371 return 0; 372 373 case SPDK_BDEV_IO_TYPE_WRITE: 374 case SPDK_BDEV_IO_TYPE_FLUSH: 375 return bdev_rbd_start_aio(ch, 376 bdev_io, 377 bdev_io->u.bdev.iovs, 378 bdev_io->u.bdev.iovcnt, 379 bdev_io->u.bdev.offset_blocks * bdev_io->bdev->blocklen, 380 bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen); 381 382 case SPDK_BDEV_IO_TYPE_RESET: 383 return bdev_rbd_reset((struct bdev_rbd *)bdev_io->bdev->ctxt, 384 bdev_io); 385 386 default: 387 return -1; 388 } 389 return 0; 390 } 391 392 static void bdev_rbd_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io) 393 { 394 if (_bdev_rbd_submit_request(ch, bdev_io) < 0) { 395 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 396 } 397 } 398 399 static bool 400 bdev_rbd_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type) 401 { 402 switch (io_type) { 403 case SPDK_BDEV_IO_TYPE_READ: 404 case SPDK_BDEV_IO_TYPE_WRITE: 405 case SPDK_BDEV_IO_TYPE_FLUSH: 406 case SPDK_BDEV_IO_TYPE_RESET: 407 return true; 408 409 default: 410 return false; 411 } 412 } 413 414 static void 415 bdev_rbd_io_poll(struct bdev_rbd_io_channel *ch) 416 { 417 int i, io_status, rc; 418 rbd_completion_t comps[SPDK_RBD_QUEUE_DEPTH]; 419 struct spdk_bdev_io *bdev_io; 420 struct bdev_rbd_io *rbd_io; 421 enum spdk_bdev_io_status bio_status; 422 423 rc = rbd_poll_io_events(ch->image, comps, SPDK_RBD_QUEUE_DEPTH); 424 for (i = 0; i < rc; i++) { 425 bdev_io = rbd_aio_get_arg(comps[i]); 426 rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx; 427 io_status = rbd_aio_get_return_value(comps[i]); 428 bio_status = SPDK_BDEV_IO_STATUS_SUCCESS; 429 430 if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) { 431 if ((int)rbd_io->total_len != io_status) { 432 bio_status = SPDK_BDEV_IO_STATUS_FAILED; 433 } 434 } else { 435 /* For others, 0 means success */ 436 if (io_status != 0) { 437 bio_status = SPDK_BDEV_IO_STATUS_FAILED; 438 } 439 } 440 441 rbd_aio_release(comps[i]); 442 443 spdk_bdev_io_complete(bdev_io, bio_status); 444 } 445 } 446 447 static void 448 bdev_rbd_free_channel(struct bdev_rbd_io_channel *ch) 449 { 450 if (!ch) { 451 return; 452 } 453 454 if (ch->image) { 455 bdev_rbd_exit(ch->image); 456 } 457 458 if (ch->io_ctx) { 459 rados_ioctx_destroy(ch->io_ctx); 460 } 461 462 if (ch->cluster) { 463 rados_shutdown(ch->cluster); 464 } 465 466 if (ch->pfd >= 0) { 467 close(ch->pfd); 468 } 469 470 if (ch->group_ch) { 471 spdk_put_io_channel(spdk_io_channel_from_ctx(ch->group_ch)); 472 } 473 } 474 475 static void * 476 bdev_rbd_handle(void *arg) 477 { 478 struct bdev_rbd_io_channel *ch = arg; 479 void *ret = arg; 480 int rc; 481 482 rc = bdev_rados_context_init(ch->disk->user_id, ch->disk->pool_name, 483 (const char *const *)ch->disk->config, 484 &ch->cluster, &ch->io_ctx); 485 if (rc < 0) { 486 SPDK_ERRLOG("Failed to create rados context for user_id %s and rbd_pool=%s\n", 487 ch->disk->user_id ? ch->disk->user_id : "admin (the default)", ch->disk->pool_name); 488 ret = NULL; 489 goto end; 490 } 491 492 if (rbd_open(ch->io_ctx, ch->disk->rbd_name, &ch->image, NULL) < 0) { 493 SPDK_ERRLOG("Failed to open specified rbd device\n"); 494 ret = NULL; 495 } 496 497 end: 498 return ret; 499 } 500 501 static int 502 bdev_rbd_create_cb(void *io_device, void *ctx_buf) 503 { 504 struct bdev_rbd_io_channel *ch = ctx_buf; 505 int ret; 506 struct epoll_event event; 507 508 ch->disk = io_device; 509 ch->image = NULL; 510 ch->io_ctx = NULL; 511 ch->pfd = -1; 512 513 if (spdk_call_unaffinitized(bdev_rbd_handle, ch) == NULL) { 514 goto err; 515 } 516 517 ch->pfd = eventfd(0, EFD_NONBLOCK); 518 if (ch->pfd < 0) { 519 SPDK_ERRLOG("Failed to get eventfd\n"); 520 goto err; 521 } 522 523 ret = rbd_set_image_notification(ch->image, ch->pfd, EVENT_TYPE_EVENTFD); 524 if (ret < 0) { 525 SPDK_ERRLOG("Failed to set rbd image notification\n"); 526 goto err; 527 } 528 529 ch->group_ch = spdk_io_channel_get_ctx(spdk_get_io_channel(&rbd_if)); 530 assert(ch->group_ch != NULL); 531 memset(&event, 0, sizeof(event)); 532 event.events = EPOLLIN; 533 event.data.ptr = ch; 534 535 ret = epoll_ctl(ch->group_ch->epoll_fd, EPOLL_CTL_ADD, ch->pfd, &event); 536 if (ret < 0) { 537 SPDK_ERRLOG("Failed to add the fd of ch(%p) to the epoll group from group_ch=%p\n", ch, 538 ch->group_ch); 539 goto err; 540 } 541 542 return 0; 543 544 err: 545 bdev_rbd_free_channel(ch); 546 return -1; 547 } 548 549 static void 550 bdev_rbd_destroy_cb(void *io_device, void *ctx_buf) 551 { 552 struct bdev_rbd_io_channel *io_channel = ctx_buf; 553 int rc; 554 555 rc = epoll_ctl(io_channel->group_ch->epoll_fd, EPOLL_CTL_DEL, 556 io_channel->pfd, NULL); 557 if (rc < 0) { 558 SPDK_ERRLOG("Failed to remove fd on io_channel=%p from the polling group=%p\n", 559 io_channel, io_channel->group_ch); 560 } 561 562 bdev_rbd_free_channel(io_channel); 563 } 564 565 static struct spdk_io_channel * 566 bdev_rbd_get_io_channel(void *ctx) 567 { 568 struct bdev_rbd *rbd_bdev = ctx; 569 570 return spdk_get_io_channel(rbd_bdev); 571 } 572 573 static int 574 bdev_rbd_dump_info_json(void *ctx, struct spdk_json_write_ctx *w) 575 { 576 struct bdev_rbd *rbd_bdev = ctx; 577 578 spdk_json_write_named_object_begin(w, "rbd"); 579 580 spdk_json_write_named_string(w, "pool_name", rbd_bdev->pool_name); 581 582 spdk_json_write_named_string(w, "rbd_name", rbd_bdev->rbd_name); 583 584 if (rbd_bdev->user_id) { 585 spdk_json_write_named_string(w, "user_id", rbd_bdev->user_id); 586 } 587 588 if (rbd_bdev->config) { 589 char **entry = rbd_bdev->config; 590 591 spdk_json_write_named_object_begin(w, "config"); 592 while (*entry) { 593 spdk_json_write_named_string(w, entry[0], entry[1]); 594 entry += 2; 595 } 596 spdk_json_write_object_end(w); 597 } 598 599 spdk_json_write_object_end(w); 600 601 return 0; 602 } 603 604 static void 605 bdev_rbd_write_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w) 606 { 607 struct bdev_rbd *rbd = bdev->ctxt; 608 609 spdk_json_write_object_begin(w); 610 611 spdk_json_write_named_string(w, "method", "bdev_rbd_create"); 612 613 spdk_json_write_named_object_begin(w, "params"); 614 spdk_json_write_named_string(w, "name", bdev->name); 615 spdk_json_write_named_string(w, "pool_name", rbd->pool_name); 616 spdk_json_write_named_string(w, "rbd_name", rbd->rbd_name); 617 spdk_json_write_named_uint32(w, "block_size", bdev->blocklen); 618 if (rbd->user_id) { 619 spdk_json_write_named_string(w, "user_id", rbd->user_id); 620 } 621 622 if (rbd->config) { 623 char **entry = rbd->config; 624 625 spdk_json_write_named_object_begin(w, "config"); 626 while (*entry) { 627 spdk_json_write_named_string(w, entry[0], entry[1]); 628 entry += 2; 629 } 630 spdk_json_write_object_end(w); 631 } 632 633 spdk_json_write_object_end(w); 634 635 spdk_json_write_object_end(w); 636 } 637 638 static const struct spdk_bdev_fn_table rbd_fn_table = { 639 .destruct = bdev_rbd_destruct, 640 .submit_request = bdev_rbd_submit_request, 641 .io_type_supported = bdev_rbd_io_type_supported, 642 .get_io_channel = bdev_rbd_get_io_channel, 643 .dump_info_json = bdev_rbd_dump_info_json, 644 .write_config_json = bdev_rbd_write_config_json, 645 }; 646 647 int 648 bdev_rbd_create(struct spdk_bdev **bdev, const char *name, const char *user_id, 649 const char *pool_name, 650 const char *const *config, 651 const char *rbd_name, 652 uint32_t block_size) 653 { 654 struct bdev_rbd *rbd; 655 int ret; 656 657 if ((pool_name == NULL) || (rbd_name == NULL)) { 658 return -EINVAL; 659 } 660 661 rbd = calloc(1, sizeof(struct bdev_rbd)); 662 if (rbd == NULL) { 663 SPDK_ERRLOG("Failed to allocate bdev_rbd struct\n"); 664 return -ENOMEM; 665 } 666 667 rbd->rbd_name = strdup(rbd_name); 668 if (!rbd->rbd_name) { 669 bdev_rbd_free(rbd); 670 return -ENOMEM; 671 } 672 673 if (user_id) { 674 rbd->user_id = strdup(user_id); 675 if (!rbd->user_id) { 676 bdev_rbd_free(rbd); 677 return -ENOMEM; 678 } 679 } 680 681 rbd->pool_name = strdup(pool_name); 682 if (!rbd->pool_name) { 683 bdev_rbd_free(rbd); 684 return -ENOMEM; 685 } 686 687 if (config && !(rbd->config = bdev_rbd_dup_config(config))) { 688 bdev_rbd_free(rbd); 689 return -ENOMEM; 690 } 691 692 ret = bdev_rbd_init(rbd->user_id, rbd->pool_name, 693 (const char *const *)rbd->config, 694 rbd_name, &rbd->info); 695 if (ret < 0) { 696 bdev_rbd_free(rbd); 697 SPDK_ERRLOG("Failed to init rbd device\n"); 698 return ret; 699 } 700 701 if (name) { 702 rbd->disk.name = strdup(name); 703 } else { 704 rbd->disk.name = spdk_sprintf_alloc("Ceph%d", bdev_rbd_count); 705 } 706 if (!rbd->disk.name) { 707 bdev_rbd_free(rbd); 708 return -ENOMEM; 709 } 710 rbd->disk.product_name = "Ceph Rbd Disk"; 711 bdev_rbd_count++; 712 713 rbd->disk.write_cache = 0; 714 rbd->disk.blocklen = block_size; 715 rbd->disk.blockcnt = rbd->info.size / rbd->disk.blocklen; 716 rbd->disk.ctxt = rbd; 717 rbd->disk.fn_table = &rbd_fn_table; 718 rbd->disk.module = &rbd_if; 719 720 SPDK_NOTICELOG("Add %s rbd disk to lun\n", rbd->disk.name); 721 722 spdk_io_device_register(rbd, bdev_rbd_create_cb, 723 bdev_rbd_destroy_cb, 724 sizeof(struct bdev_rbd_io_channel), 725 rbd_name); 726 ret = spdk_bdev_register(&rbd->disk); 727 if (ret) { 728 spdk_io_device_unregister(rbd, NULL); 729 bdev_rbd_free(rbd); 730 return ret; 731 } 732 733 *bdev = &(rbd->disk); 734 735 return ret; 736 } 737 738 void 739 bdev_rbd_delete(struct spdk_bdev *bdev, spdk_delete_rbd_complete cb_fn, void *cb_arg) 740 { 741 if (!bdev || bdev->module != &rbd_if) { 742 cb_fn(cb_arg, -ENODEV); 743 return; 744 } 745 746 spdk_bdev_unregister(bdev, cb_fn, cb_arg); 747 } 748 749 int 750 bdev_rbd_resize(struct spdk_bdev *bdev, const uint64_t new_size_in_mb) 751 { 752 struct spdk_io_channel *ch; 753 struct bdev_rbd_io_channel *rbd_io_ch; 754 int rc; 755 uint64_t new_size_in_byte; 756 uint64_t current_size_in_mb; 757 758 if (bdev->module != &rbd_if) { 759 return -EINVAL; 760 } 761 762 current_size_in_mb = bdev->blocklen * bdev->blockcnt / (1024 * 1024); 763 if (current_size_in_mb > new_size_in_mb) { 764 SPDK_ERRLOG("The new bdev size must be lager than current bdev size.\n"); 765 return -EINVAL; 766 } 767 768 ch = bdev_rbd_get_io_channel(bdev); 769 rbd_io_ch = spdk_io_channel_get_ctx(ch); 770 new_size_in_byte = new_size_in_mb * 1024 * 1024; 771 772 rc = rbd_resize(rbd_io_ch->image, new_size_in_byte); 773 if (rc != 0) { 774 SPDK_ERRLOG("failed to resize the ceph bdev.\n"); 775 return rc; 776 } 777 778 rc = spdk_bdev_notify_blockcnt_change(bdev, new_size_in_byte / bdev->blocklen); 779 if (rc != 0) { 780 SPDK_ERRLOG("failed to notify block cnt change.\n"); 781 return rc; 782 } 783 784 return rc; 785 } 786 787 static int 788 bdev_rbd_group_poll(void *arg) 789 { 790 struct bdev_rbd_group_channel *group_ch = arg; 791 struct epoll_event events[MAX_EVENTS_PER_POLL]; 792 int num_events, i; 793 794 num_events = epoll_wait(group_ch->epoll_fd, events, MAX_EVENTS_PER_POLL, 0); 795 796 if (num_events <= 0) { 797 return SPDK_POLLER_IDLE; 798 } 799 800 for (i = 0; i < num_events; i++) { 801 bdev_rbd_io_poll((struct bdev_rbd_io_channel *)events[i].data.ptr); 802 } 803 804 return SPDK_POLLER_BUSY; 805 } 806 807 static int 808 bdev_rbd_group_create_cb(void *io_device, void *ctx_buf) 809 { 810 struct bdev_rbd_group_channel *ch = ctx_buf; 811 812 ch->epoll_fd = epoll_create1(0); 813 if (ch->epoll_fd < 0) { 814 SPDK_ERRLOG("Could not create epoll fd on io device=%p\n", io_device); 815 return -1; 816 } 817 818 ch->poller = SPDK_POLLER_REGISTER(bdev_rbd_group_poll, ch, 0); 819 820 return 0; 821 } 822 823 static void 824 bdev_rbd_group_destroy_cb(void *io_device, void *ctx_buf) 825 { 826 struct bdev_rbd_group_channel *ch = ctx_buf; 827 828 if (ch->epoll_fd >= 0) { 829 close(ch->epoll_fd); 830 } 831 832 spdk_poller_unregister(&ch->poller); 833 } 834 835 static int 836 bdev_rbd_library_init(void) 837 { 838 int i, rc = 0; 839 const char *val; 840 const char *pool_name; 841 const char *rbd_name; 842 struct spdk_bdev *bdev; 843 uint32_t block_size; 844 long int tmp; 845 struct spdk_conf_section *sp; 846 847 spdk_io_device_register(&rbd_if, bdev_rbd_group_create_cb, bdev_rbd_group_destroy_cb, 848 sizeof(struct bdev_rbd_group_channel), 849 "bdev_rbd_poll_groups"); 850 851 sp = spdk_conf_find_section(NULL, "Ceph"); 852 if (sp == NULL) { 853 /* 854 * Ceph section not found. Do not initialize any rbd LUNS. 855 */ 856 goto end; 857 } 858 859 /* Init rbd block devices */ 860 for (i = 0; ; i++) { 861 val = spdk_conf_section_get_nval(sp, "Ceph", i); 862 if (val == NULL) { 863 break; 864 } 865 866 /* get the Rbd_pool name */ 867 pool_name = spdk_conf_section_get_nmval(sp, "Ceph", i, 0); 868 if (pool_name == NULL) { 869 SPDK_ERRLOG("Ceph%d: rbd pool name needs to be provided\n", i); 870 rc = -1; 871 goto end; 872 } 873 874 rbd_name = spdk_conf_section_get_nmval(sp, "Ceph", i, 1); 875 if (rbd_name == NULL) { 876 SPDK_ERRLOG("Ceph%d: format error\n", i); 877 rc = -1; 878 goto end; 879 } 880 881 val = spdk_conf_section_get_nmval(sp, "Ceph", i, 2); 882 883 if (val == NULL) { 884 block_size = 512; /* default value */ 885 } else { 886 tmp = spdk_strtol(val, 10); 887 if (tmp <= 0) { 888 SPDK_ERRLOG("Invalid block size\n"); 889 rc = -1; 890 goto end; 891 } else if (tmp & 0x1ff) { 892 SPDK_ERRLOG("current block_size = %ld, it should be multiple of 512\n", 893 tmp); 894 rc = -1; 895 goto end; 896 } 897 block_size = (uint32_t)tmp; 898 } 899 900 /* TODO(?): user_id and rbd config values */ 901 rc = bdev_rbd_create(&bdev, NULL, NULL, pool_name, NULL, rbd_name, block_size); 902 if (rc) { 903 goto end; 904 } 905 } 906 907 end: 908 return rc; 909 } 910 911 static void 912 bdev_rbd_library_fini(void) 913 { 914 spdk_io_device_unregister(&rbd_if, NULL); 915 } 916 917 SPDK_LOG_REGISTER_COMPONENT(bdev_rbd) 918