1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 36 #include "bdev_rbd.h" 37 38 #include <rbd/librbd.h> 39 #include <rados/librados.h> 40 #include <sys/eventfd.h> 41 #include <sys/epoll.h> 42 43 #include "spdk/env.h" 44 #include "spdk/bdev.h" 45 #include "spdk/thread.h" 46 #include "spdk/json.h" 47 #include "spdk/string.h" 48 #include "spdk/util.h" 49 #include "spdk/likely.h" 50 51 #include "spdk/bdev_module.h" 52 #include "spdk/log.h" 53 54 #define SPDK_RBD_QUEUE_DEPTH 128 55 #define MAX_EVENTS_PER_POLL 128 56 57 static int bdev_rbd_count = 0; 58 59 struct bdev_rbd { 60 struct spdk_bdev disk; 61 char *rbd_name; 62 char *user_id; 63 char *pool_name; 64 char **config; 65 rbd_image_info_t info; 66 TAILQ_ENTRY(bdev_rbd) tailq; 67 struct spdk_poller *reset_timer; 68 struct spdk_bdev_io *reset_bdev_io; 69 }; 70 71 struct bdev_rbd_group_channel { 72 struct spdk_poller *poller; 73 int epoll_fd; 74 }; 75 76 struct bdev_rbd_io_channel { 77 rados_ioctx_t io_ctx; 78 rados_t cluster; 79 int pfd; 80 rbd_image_t image; 81 struct bdev_rbd *disk; 82 struct bdev_rbd_group_channel *group_ch; 83 }; 84 85 struct bdev_rbd_io { 86 size_t total_len; 87 }; 88 89 static void 90 bdev_rbd_free(struct bdev_rbd *rbd) 91 { 92 if (!rbd) { 93 return; 94 } 95 96 free(rbd->disk.name); 97 free(rbd->rbd_name); 98 free(rbd->user_id); 99 free(rbd->pool_name); 100 bdev_rbd_free_config(rbd->config); 101 free(rbd); 102 } 103 104 void 105 bdev_rbd_free_config(char **config) 106 { 107 char **entry; 108 109 if (config) { 110 for (entry = config; *entry; entry++) { 111 free(*entry); 112 } 113 free(config); 114 } 115 } 116 117 char ** 118 bdev_rbd_dup_config(const char *const *config) 119 { 120 size_t count; 121 char **copy; 122 123 if (!config) { 124 return NULL; 125 } 126 for (count = 0; config[count]; count++) {} 127 copy = calloc(count + 1, sizeof(*copy)); 128 if (!copy) { 129 return NULL; 130 } 131 for (count = 0; config[count]; count++) { 132 if (!(copy[count] = strdup(config[count]))) { 133 bdev_rbd_free_config(copy); 134 return NULL; 135 } 136 } 137 return copy; 138 } 139 140 static int 141 bdev_rados_context_init(const char *user_id, const char *rbd_pool_name, const char *const *config, 142 rados_t *cluster, rados_ioctx_t *io_ctx) 143 { 144 int ret; 145 146 ret = rados_create(cluster, user_id); 147 if (ret < 0) { 148 SPDK_ERRLOG("Failed to create rados_t struct\n"); 149 return -1; 150 } 151 152 if (config) { 153 const char *const *entry = config; 154 while (*entry) { 155 ret = rados_conf_set(*cluster, entry[0], entry[1]); 156 if (ret < 0) { 157 SPDK_ERRLOG("Failed to set %s = %s\n", entry[0], entry[1]); 158 rados_shutdown(*cluster); 159 return -1; 160 } 161 entry += 2; 162 } 163 } else { 164 ret = rados_conf_read_file(*cluster, NULL); 165 if (ret < 0) { 166 SPDK_ERRLOG("Failed to read conf file\n"); 167 rados_shutdown(*cluster); 168 return -1; 169 } 170 } 171 172 ret = rados_connect(*cluster); 173 if (ret < 0) { 174 SPDK_ERRLOG("Failed to connect to rbd_pool\n"); 175 rados_shutdown(*cluster); 176 return -1; 177 } 178 179 ret = rados_ioctx_create(*cluster, rbd_pool_name, io_ctx); 180 181 if (ret < 0) { 182 SPDK_ERRLOG("Failed to create ioctx\n"); 183 rados_shutdown(*cluster); 184 return -1; 185 } 186 187 return 0; 188 } 189 190 static int 191 bdev_rbd_init(const char *user_id, const char *rbd_pool_name, const char *const *config, 192 const char *rbd_name, rbd_image_info_t *info) 193 { 194 int ret; 195 rados_t cluster = NULL; 196 rados_ioctx_t io_ctx = NULL; 197 rbd_image_t image = NULL; 198 199 ret = bdev_rados_context_init(user_id, rbd_pool_name, config, &cluster, &io_ctx); 200 if (ret < 0) { 201 SPDK_ERRLOG("Failed to create rados context for user_id=%s and rbd_pool=%s\n", 202 user_id ? user_id : "admin (the default)", rbd_pool_name); 203 return -1; 204 } 205 206 ret = rbd_open(io_ctx, rbd_name, &image, NULL); 207 if (ret < 0) { 208 SPDK_ERRLOG("Failed to open specified rbd device\n"); 209 goto err; 210 } 211 ret = rbd_stat(image, info, sizeof(*info)); 212 rbd_close(image); 213 if (ret < 0) { 214 SPDK_ERRLOG("Failed to stat specified rbd device\n"); 215 goto err; 216 } 217 218 rados_ioctx_destroy(io_ctx); 219 return 0; 220 err: 221 rados_ioctx_destroy(io_ctx); 222 rados_shutdown(cluster); 223 return -1; 224 } 225 226 static void 227 bdev_rbd_exit(rbd_image_t image) 228 { 229 rbd_flush(image); 230 rbd_close(image); 231 } 232 233 static void 234 bdev_rbd_finish_aiocb(rbd_completion_t cb, void *arg) 235 { 236 /* Doing nothing here */ 237 } 238 239 static int 240 bdev_rbd_start_aio(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io, 241 struct iovec *iov, int iovcnt, uint64_t offset, size_t len) 242 { 243 struct bdev_rbd_io_channel *rbdio_ch = spdk_io_channel_get_ctx(ch); 244 int ret; 245 rbd_completion_t comp; 246 struct bdev_rbd_io *rbd_io; 247 rbd_image_t image = rbdio_ch->image; 248 249 ret = rbd_aio_create_completion(bdev_io, bdev_rbd_finish_aiocb, 250 &comp); 251 if (ret < 0) { 252 return -1; 253 } 254 255 if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) { 256 rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx; 257 rbd_io->total_len = len; 258 if (spdk_likely(iovcnt == 1)) { 259 ret = rbd_aio_read(image, offset, iov[0].iov_len, iov[0].iov_base, comp); 260 } else { 261 ret = rbd_aio_readv(image, iov, iovcnt, offset, comp); 262 } 263 } else if (bdev_io->type == SPDK_BDEV_IO_TYPE_WRITE) { 264 if (spdk_likely(iovcnt == 1)) { 265 ret = rbd_aio_write(image, offset, iov[0].iov_len, iov[0].iov_base, comp); 266 } else { 267 ret = rbd_aio_writev(image, iov, iovcnt, offset, comp); 268 } 269 } else if (bdev_io->type == SPDK_BDEV_IO_TYPE_FLUSH) { 270 ret = rbd_aio_flush(image, comp); 271 } 272 273 if (ret < 0) { 274 rbd_aio_release(comp); 275 return -1; 276 } 277 278 return 0; 279 } 280 281 static int bdev_rbd_library_init(void); 282 283 static void bdev_rbd_library_fini(void); 284 285 static int 286 bdev_rbd_get_ctx_size(void) 287 { 288 return sizeof(struct bdev_rbd_io); 289 } 290 291 static struct spdk_bdev_module rbd_if = { 292 .name = "rbd", 293 .module_init = bdev_rbd_library_init, 294 .module_fini = bdev_rbd_library_fini, 295 .get_ctx_size = bdev_rbd_get_ctx_size, 296 297 }; 298 SPDK_BDEV_MODULE_REGISTER(rbd, &rbd_if) 299 300 static int 301 bdev_rbd_reset_timer(void *arg) 302 { 303 struct bdev_rbd *disk = arg; 304 305 /* 306 * TODO: This should check if any I/O is still in flight before completing the reset. 307 * For now, just complete after the timer expires. 308 */ 309 spdk_bdev_io_complete(disk->reset_bdev_io, SPDK_BDEV_IO_STATUS_SUCCESS); 310 spdk_poller_unregister(&disk->reset_timer); 311 disk->reset_bdev_io = NULL; 312 313 return SPDK_POLLER_BUSY; 314 } 315 316 static int 317 bdev_rbd_reset(struct bdev_rbd *disk, struct spdk_bdev_io *bdev_io) 318 { 319 /* 320 * HACK: Since librbd doesn't provide any way to cancel outstanding aio, just kick off a 321 * timer to wait for in-flight I/O to complete. 322 */ 323 assert(disk->reset_bdev_io == NULL); 324 disk->reset_bdev_io = bdev_io; 325 disk->reset_timer = SPDK_POLLER_REGISTER(bdev_rbd_reset_timer, disk, 1 * 1000 * 1000); 326 327 return 0; 328 } 329 330 static int 331 bdev_rbd_destruct(void *ctx) 332 { 333 struct bdev_rbd *rbd = ctx; 334 335 spdk_io_device_unregister(rbd, NULL); 336 337 bdev_rbd_free(rbd); 338 return 0; 339 } 340 341 static void 342 bdev_rbd_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io, 343 bool success) 344 { 345 int ret; 346 347 if (!success) { 348 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 349 return; 350 } 351 352 ret = bdev_rbd_start_aio(ch, 353 bdev_io, 354 bdev_io->u.bdev.iovs, 355 bdev_io->u.bdev.iovcnt, 356 bdev_io->u.bdev.offset_blocks * bdev_io->bdev->blocklen, 357 bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen); 358 359 if (ret != 0) { 360 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 361 } 362 } 363 364 static int _bdev_rbd_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io) 365 { 366 switch (bdev_io->type) { 367 case SPDK_BDEV_IO_TYPE_READ: 368 spdk_bdev_io_get_buf(bdev_io, bdev_rbd_get_buf_cb, 369 bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen); 370 return 0; 371 372 case SPDK_BDEV_IO_TYPE_WRITE: 373 case SPDK_BDEV_IO_TYPE_FLUSH: 374 return bdev_rbd_start_aio(ch, 375 bdev_io, 376 bdev_io->u.bdev.iovs, 377 bdev_io->u.bdev.iovcnt, 378 bdev_io->u.bdev.offset_blocks * bdev_io->bdev->blocklen, 379 bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen); 380 381 case SPDK_BDEV_IO_TYPE_RESET: 382 return bdev_rbd_reset((struct bdev_rbd *)bdev_io->bdev->ctxt, 383 bdev_io); 384 385 default: 386 return -1; 387 } 388 return 0; 389 } 390 391 static void bdev_rbd_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io) 392 { 393 if (_bdev_rbd_submit_request(ch, bdev_io) < 0) { 394 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 395 } 396 } 397 398 static bool 399 bdev_rbd_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type) 400 { 401 switch (io_type) { 402 case SPDK_BDEV_IO_TYPE_READ: 403 case SPDK_BDEV_IO_TYPE_WRITE: 404 case SPDK_BDEV_IO_TYPE_FLUSH: 405 case SPDK_BDEV_IO_TYPE_RESET: 406 return true; 407 408 default: 409 return false; 410 } 411 } 412 413 static void 414 bdev_rbd_io_poll(struct bdev_rbd_io_channel *ch) 415 { 416 int i, io_status, rc; 417 rbd_completion_t comps[SPDK_RBD_QUEUE_DEPTH]; 418 struct spdk_bdev_io *bdev_io; 419 struct bdev_rbd_io *rbd_io; 420 enum spdk_bdev_io_status bio_status; 421 422 rc = rbd_poll_io_events(ch->image, comps, SPDK_RBD_QUEUE_DEPTH); 423 for (i = 0; i < rc; i++) { 424 bdev_io = rbd_aio_get_arg(comps[i]); 425 rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx; 426 io_status = rbd_aio_get_return_value(comps[i]); 427 bio_status = SPDK_BDEV_IO_STATUS_SUCCESS; 428 429 if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) { 430 if ((int)rbd_io->total_len != io_status) { 431 bio_status = SPDK_BDEV_IO_STATUS_FAILED; 432 } 433 } else { 434 /* For others, 0 means success */ 435 if (io_status != 0) { 436 bio_status = SPDK_BDEV_IO_STATUS_FAILED; 437 } 438 } 439 440 rbd_aio_release(comps[i]); 441 442 spdk_bdev_io_complete(bdev_io, bio_status); 443 } 444 } 445 446 static void 447 bdev_rbd_free_channel(struct bdev_rbd_io_channel *ch) 448 { 449 if (!ch) { 450 return; 451 } 452 453 if (ch->image) { 454 bdev_rbd_exit(ch->image); 455 } 456 457 if (ch->io_ctx) { 458 rados_ioctx_destroy(ch->io_ctx); 459 } 460 461 if (ch->cluster) { 462 rados_shutdown(ch->cluster); 463 } 464 465 if (ch->pfd >= 0) { 466 close(ch->pfd); 467 } 468 469 if (ch->group_ch) { 470 spdk_put_io_channel(spdk_io_channel_from_ctx(ch->group_ch)); 471 } 472 } 473 474 static void * 475 bdev_rbd_handle(void *arg) 476 { 477 struct bdev_rbd_io_channel *ch = arg; 478 void *ret = arg; 479 int rc; 480 481 rc = bdev_rados_context_init(ch->disk->user_id, ch->disk->pool_name, 482 (const char *const *)ch->disk->config, 483 &ch->cluster, &ch->io_ctx); 484 if (rc < 0) { 485 SPDK_ERRLOG("Failed to create rados context for user_id %s and rbd_pool=%s\n", 486 ch->disk->user_id ? ch->disk->user_id : "admin (the default)", ch->disk->pool_name); 487 ret = NULL; 488 goto end; 489 } 490 491 if (rbd_open(ch->io_ctx, ch->disk->rbd_name, &ch->image, NULL) < 0) { 492 SPDK_ERRLOG("Failed to open specified rbd device\n"); 493 ret = NULL; 494 } 495 496 end: 497 return ret; 498 } 499 500 static int 501 bdev_rbd_create_cb(void *io_device, void *ctx_buf) 502 { 503 struct bdev_rbd_io_channel *ch = ctx_buf; 504 int ret; 505 struct epoll_event event; 506 507 ch->disk = io_device; 508 ch->image = NULL; 509 ch->io_ctx = NULL; 510 ch->pfd = -1; 511 512 if (spdk_call_unaffinitized(bdev_rbd_handle, ch) == NULL) { 513 goto err; 514 } 515 516 ch->pfd = eventfd(0, EFD_NONBLOCK); 517 if (ch->pfd < 0) { 518 SPDK_ERRLOG("Failed to get eventfd\n"); 519 goto err; 520 } 521 522 ret = rbd_set_image_notification(ch->image, ch->pfd, EVENT_TYPE_EVENTFD); 523 if (ret < 0) { 524 SPDK_ERRLOG("Failed to set rbd image notification\n"); 525 goto err; 526 } 527 528 ch->group_ch = spdk_io_channel_get_ctx(spdk_get_io_channel(&rbd_if)); 529 assert(ch->group_ch != NULL); 530 memset(&event, 0, sizeof(event)); 531 event.events = EPOLLIN; 532 event.data.ptr = ch; 533 534 ret = epoll_ctl(ch->group_ch->epoll_fd, EPOLL_CTL_ADD, ch->pfd, &event); 535 if (ret < 0) { 536 SPDK_ERRLOG("Failed to add the fd of ch(%p) to the epoll group from group_ch=%p\n", ch, 537 ch->group_ch); 538 goto err; 539 } 540 541 return 0; 542 543 err: 544 bdev_rbd_free_channel(ch); 545 return -1; 546 } 547 548 static void 549 bdev_rbd_destroy_cb(void *io_device, void *ctx_buf) 550 { 551 struct bdev_rbd_io_channel *io_channel = ctx_buf; 552 int rc; 553 554 rc = epoll_ctl(io_channel->group_ch->epoll_fd, EPOLL_CTL_DEL, 555 io_channel->pfd, NULL); 556 if (rc < 0) { 557 SPDK_ERRLOG("Failed to remove fd on io_channel=%p from the polling group=%p\n", 558 io_channel, io_channel->group_ch); 559 } 560 561 bdev_rbd_free_channel(io_channel); 562 } 563 564 static struct spdk_io_channel * 565 bdev_rbd_get_io_channel(void *ctx) 566 { 567 struct bdev_rbd *rbd_bdev = ctx; 568 569 return spdk_get_io_channel(rbd_bdev); 570 } 571 572 static int 573 bdev_rbd_dump_info_json(void *ctx, struct spdk_json_write_ctx *w) 574 { 575 struct bdev_rbd *rbd_bdev = ctx; 576 577 spdk_json_write_named_object_begin(w, "rbd"); 578 579 spdk_json_write_named_string(w, "pool_name", rbd_bdev->pool_name); 580 581 spdk_json_write_named_string(w, "rbd_name", rbd_bdev->rbd_name); 582 583 if (rbd_bdev->user_id) { 584 spdk_json_write_named_string(w, "user_id", rbd_bdev->user_id); 585 } 586 587 if (rbd_bdev->config) { 588 char **entry = rbd_bdev->config; 589 590 spdk_json_write_named_object_begin(w, "config"); 591 while (*entry) { 592 spdk_json_write_named_string(w, entry[0], entry[1]); 593 entry += 2; 594 } 595 spdk_json_write_object_end(w); 596 } 597 598 spdk_json_write_object_end(w); 599 600 return 0; 601 } 602 603 static void 604 bdev_rbd_write_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w) 605 { 606 struct bdev_rbd *rbd = bdev->ctxt; 607 608 spdk_json_write_object_begin(w); 609 610 spdk_json_write_named_string(w, "method", "bdev_rbd_create"); 611 612 spdk_json_write_named_object_begin(w, "params"); 613 spdk_json_write_named_string(w, "name", bdev->name); 614 spdk_json_write_named_string(w, "pool_name", rbd->pool_name); 615 spdk_json_write_named_string(w, "rbd_name", rbd->rbd_name); 616 spdk_json_write_named_uint32(w, "block_size", bdev->blocklen); 617 if (rbd->user_id) { 618 spdk_json_write_named_string(w, "user_id", rbd->user_id); 619 } 620 621 if (rbd->config) { 622 char **entry = rbd->config; 623 624 spdk_json_write_named_object_begin(w, "config"); 625 while (*entry) { 626 spdk_json_write_named_string(w, entry[0], entry[1]); 627 entry += 2; 628 } 629 spdk_json_write_object_end(w); 630 } 631 632 spdk_json_write_object_end(w); 633 634 spdk_json_write_object_end(w); 635 } 636 637 static const struct spdk_bdev_fn_table rbd_fn_table = { 638 .destruct = bdev_rbd_destruct, 639 .submit_request = bdev_rbd_submit_request, 640 .io_type_supported = bdev_rbd_io_type_supported, 641 .get_io_channel = bdev_rbd_get_io_channel, 642 .dump_info_json = bdev_rbd_dump_info_json, 643 .write_config_json = bdev_rbd_write_config_json, 644 }; 645 646 int 647 bdev_rbd_create(struct spdk_bdev **bdev, const char *name, const char *user_id, 648 const char *pool_name, 649 const char *const *config, 650 const char *rbd_name, 651 uint32_t block_size) 652 { 653 struct bdev_rbd *rbd; 654 int ret; 655 656 if ((pool_name == NULL) || (rbd_name == NULL)) { 657 return -EINVAL; 658 } 659 660 rbd = calloc(1, sizeof(struct bdev_rbd)); 661 if (rbd == NULL) { 662 SPDK_ERRLOG("Failed to allocate bdev_rbd struct\n"); 663 return -ENOMEM; 664 } 665 666 rbd->rbd_name = strdup(rbd_name); 667 if (!rbd->rbd_name) { 668 bdev_rbd_free(rbd); 669 return -ENOMEM; 670 } 671 672 if (user_id) { 673 rbd->user_id = strdup(user_id); 674 if (!rbd->user_id) { 675 bdev_rbd_free(rbd); 676 return -ENOMEM; 677 } 678 } 679 680 rbd->pool_name = strdup(pool_name); 681 if (!rbd->pool_name) { 682 bdev_rbd_free(rbd); 683 return -ENOMEM; 684 } 685 686 if (config && !(rbd->config = bdev_rbd_dup_config(config))) { 687 bdev_rbd_free(rbd); 688 return -ENOMEM; 689 } 690 691 ret = bdev_rbd_init(rbd->user_id, rbd->pool_name, 692 (const char *const *)rbd->config, 693 rbd_name, &rbd->info); 694 if (ret < 0) { 695 bdev_rbd_free(rbd); 696 SPDK_ERRLOG("Failed to init rbd device\n"); 697 return ret; 698 } 699 700 if (name) { 701 rbd->disk.name = strdup(name); 702 } else { 703 rbd->disk.name = spdk_sprintf_alloc("Ceph%d", bdev_rbd_count); 704 } 705 if (!rbd->disk.name) { 706 bdev_rbd_free(rbd); 707 return -ENOMEM; 708 } 709 rbd->disk.product_name = "Ceph Rbd Disk"; 710 bdev_rbd_count++; 711 712 rbd->disk.write_cache = 0; 713 rbd->disk.blocklen = block_size; 714 rbd->disk.blockcnt = rbd->info.size / rbd->disk.blocklen; 715 rbd->disk.ctxt = rbd; 716 rbd->disk.fn_table = &rbd_fn_table; 717 rbd->disk.module = &rbd_if; 718 719 SPDK_NOTICELOG("Add %s rbd disk to lun\n", rbd->disk.name); 720 721 spdk_io_device_register(rbd, bdev_rbd_create_cb, 722 bdev_rbd_destroy_cb, 723 sizeof(struct bdev_rbd_io_channel), 724 rbd_name); 725 ret = spdk_bdev_register(&rbd->disk); 726 if (ret) { 727 spdk_io_device_unregister(rbd, NULL); 728 bdev_rbd_free(rbd); 729 return ret; 730 } 731 732 *bdev = &(rbd->disk); 733 734 return ret; 735 } 736 737 void 738 bdev_rbd_delete(struct spdk_bdev *bdev, spdk_delete_rbd_complete cb_fn, void *cb_arg) 739 { 740 if (!bdev || bdev->module != &rbd_if) { 741 cb_fn(cb_arg, -ENODEV); 742 return; 743 } 744 745 spdk_bdev_unregister(bdev, cb_fn, cb_arg); 746 } 747 748 int 749 bdev_rbd_resize(struct spdk_bdev *bdev, const uint64_t new_size_in_mb) 750 { 751 struct spdk_io_channel *ch; 752 struct bdev_rbd_io_channel *rbd_io_ch; 753 int rc; 754 uint64_t new_size_in_byte; 755 uint64_t current_size_in_mb; 756 757 if (bdev->module != &rbd_if) { 758 return -EINVAL; 759 } 760 761 current_size_in_mb = bdev->blocklen * bdev->blockcnt / (1024 * 1024); 762 if (current_size_in_mb > new_size_in_mb) { 763 SPDK_ERRLOG("The new bdev size must be lager than current bdev size.\n"); 764 return -EINVAL; 765 } 766 767 ch = bdev_rbd_get_io_channel(bdev); 768 rbd_io_ch = spdk_io_channel_get_ctx(ch); 769 new_size_in_byte = new_size_in_mb * 1024 * 1024; 770 771 rc = rbd_resize(rbd_io_ch->image, new_size_in_byte); 772 if (rc != 0) { 773 SPDK_ERRLOG("failed to resize the ceph bdev.\n"); 774 return rc; 775 } 776 777 rc = spdk_bdev_notify_blockcnt_change(bdev, new_size_in_byte / bdev->blocklen); 778 if (rc != 0) { 779 SPDK_ERRLOG("failed to notify block cnt change.\n"); 780 return rc; 781 } 782 783 return rc; 784 } 785 786 static int 787 bdev_rbd_group_poll(void *arg) 788 { 789 struct bdev_rbd_group_channel *group_ch = arg; 790 struct epoll_event events[MAX_EVENTS_PER_POLL]; 791 int num_events, i; 792 793 num_events = epoll_wait(group_ch->epoll_fd, events, MAX_EVENTS_PER_POLL, 0); 794 795 if (num_events <= 0) { 796 return SPDK_POLLER_IDLE; 797 } 798 799 for (i = 0; i < num_events; i++) { 800 bdev_rbd_io_poll((struct bdev_rbd_io_channel *)events[i].data.ptr); 801 } 802 803 return SPDK_POLLER_BUSY; 804 } 805 806 static int 807 bdev_rbd_group_create_cb(void *io_device, void *ctx_buf) 808 { 809 struct bdev_rbd_group_channel *ch = ctx_buf; 810 811 ch->epoll_fd = epoll_create1(0); 812 if (ch->epoll_fd < 0) { 813 SPDK_ERRLOG("Could not create epoll fd on io device=%p\n", io_device); 814 return -1; 815 } 816 817 ch->poller = SPDK_POLLER_REGISTER(bdev_rbd_group_poll, ch, 0); 818 819 return 0; 820 } 821 822 static void 823 bdev_rbd_group_destroy_cb(void *io_device, void *ctx_buf) 824 { 825 struct bdev_rbd_group_channel *ch = ctx_buf; 826 827 if (ch->epoll_fd >= 0) { 828 close(ch->epoll_fd); 829 } 830 831 spdk_poller_unregister(&ch->poller); 832 } 833 834 static int 835 bdev_rbd_library_init(void) 836 { 837 spdk_io_device_register(&rbd_if, bdev_rbd_group_create_cb, bdev_rbd_group_destroy_cb, 838 sizeof(struct bdev_rbd_group_channel), "bdev_rbd_poll_groups"); 839 840 return 0; 841 } 842 843 static void 844 bdev_rbd_library_fini(void) 845 { 846 spdk_io_device_unregister(&rbd_if, NULL); 847 } 848 849 SPDK_LOG_REGISTER_COMPONENT(bdev_rbd) 850