1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 36 #include "bdev_rbd.h" 37 38 #include <rbd/librbd.h> 39 #include <rados/librados.h> 40 #include <sys/eventfd.h> 41 #include <sys/epoll.h> 42 43 #include "spdk/env.h" 44 #include "spdk/bdev.h" 45 #include "spdk/thread.h" 46 #include "spdk/json.h" 47 #include "spdk/string.h" 48 #include "spdk/util.h" 49 #include "spdk/likely.h" 50 51 #include "spdk/bdev_module.h" 52 #include "spdk/log.h" 53 54 #define SPDK_RBD_QUEUE_DEPTH 128 55 #define MAX_EVENTS_PER_POLL 128 56 57 static int bdev_rbd_count = 0; 58 59 struct bdev_rbd { 60 struct spdk_bdev disk; 61 char *rbd_name; 62 char *user_id; 63 char *pool_name; 64 char **config; 65 rbd_image_info_t info; 66 TAILQ_ENTRY(bdev_rbd) tailq; 67 struct spdk_poller *reset_timer; 68 struct spdk_bdev_io *reset_bdev_io; 69 }; 70 71 struct bdev_rbd_group_channel { 72 struct spdk_poller *poller; 73 int epoll_fd; 74 }; 75 76 struct bdev_rbd_io_channel { 77 rados_ioctx_t io_ctx; 78 rados_t cluster; 79 int pfd; 80 rbd_image_t image; 81 struct bdev_rbd *disk; 82 struct bdev_rbd_group_channel *group_ch; 83 }; 84 85 struct bdev_rbd_io { 86 size_t total_len; 87 }; 88 89 static void 90 bdev_rbd_free(struct bdev_rbd *rbd) 91 { 92 if (!rbd) { 93 return; 94 } 95 96 free(rbd->disk.name); 97 free(rbd->rbd_name); 98 free(rbd->user_id); 99 free(rbd->pool_name); 100 bdev_rbd_free_config(rbd->config); 101 free(rbd); 102 } 103 104 void 105 bdev_rbd_free_config(char **config) 106 { 107 char **entry; 108 109 if (config) { 110 for (entry = config; *entry; entry++) { 111 free(*entry); 112 } 113 free(config); 114 } 115 } 116 117 char ** 118 bdev_rbd_dup_config(const char *const *config) 119 { 120 size_t count; 121 char **copy; 122 123 if (!config) { 124 return NULL; 125 } 126 for (count = 0; config[count]; count++) {} 127 copy = calloc(count + 1, sizeof(*copy)); 128 if (!copy) { 129 return NULL; 130 } 131 for (count = 0; config[count]; count++) { 132 if (!(copy[count] = strdup(config[count]))) { 133 bdev_rbd_free_config(copy); 134 return NULL; 135 } 136 } 137 return copy; 138 } 139 140 static int 141 bdev_rados_context_init(const char *user_id, const char *rbd_pool_name, const char *const *config, 142 rados_t *cluster, rados_ioctx_t *io_ctx) 143 { 144 int ret; 145 146 ret = rados_create(cluster, user_id); 147 if (ret < 0) { 148 SPDK_ERRLOG("Failed to create rados_t struct\n"); 149 return -1; 150 } 151 152 if (config) { 153 const char *const *entry = config; 154 while (*entry) { 155 ret = rados_conf_set(*cluster, entry[0], entry[1]); 156 if (ret < 0) { 157 SPDK_ERRLOG("Failed to set %s = %s\n", entry[0], entry[1]); 158 rados_shutdown(*cluster); 159 return -1; 160 } 161 entry += 2; 162 } 163 } else { 164 ret = rados_conf_read_file(*cluster, NULL); 165 if (ret < 0) { 166 SPDK_ERRLOG("Failed to read conf file\n"); 167 rados_shutdown(*cluster); 168 return -1; 169 } 170 } 171 172 ret = rados_connect(*cluster); 173 if (ret < 0) { 174 SPDK_ERRLOG("Failed to connect to rbd_pool\n"); 175 rados_shutdown(*cluster); 176 return -1; 177 } 178 179 ret = rados_ioctx_create(*cluster, rbd_pool_name, io_ctx); 180 181 if (ret < 0) { 182 SPDK_ERRLOG("Failed to create ioctx\n"); 183 rados_shutdown(*cluster); 184 return -1; 185 } 186 187 return 0; 188 } 189 190 static int 191 bdev_rbd_init(const char *user_id, const char *rbd_pool_name, const char *const *config, 192 const char *rbd_name, rbd_image_info_t *info) 193 { 194 int ret; 195 rados_t cluster = NULL; 196 rados_ioctx_t io_ctx = NULL; 197 rbd_image_t image = NULL; 198 199 ret = bdev_rados_context_init(user_id, rbd_pool_name, config, &cluster, &io_ctx); 200 if (ret < 0) { 201 SPDK_ERRLOG("Failed to create rados context for user_id=%s and rbd_pool=%s\n", 202 user_id ? user_id : "admin (the default)", rbd_pool_name); 203 return -1; 204 } 205 206 ret = rbd_open(io_ctx, rbd_name, &image, NULL); 207 if (ret < 0) { 208 SPDK_ERRLOG("Failed to open specified rbd device\n"); 209 goto err; 210 } 211 ret = rbd_stat(image, info, sizeof(*info)); 212 rbd_close(image); 213 if (ret < 0) { 214 SPDK_ERRLOG("Failed to stat specified rbd device\n"); 215 goto err; 216 } 217 218 rados_ioctx_destroy(io_ctx); 219 return 0; 220 err: 221 rados_ioctx_destroy(io_ctx); 222 rados_shutdown(cluster); 223 return -1; 224 } 225 226 static void 227 bdev_rbd_exit(rbd_image_t image) 228 { 229 rbd_flush(image); 230 rbd_close(image); 231 } 232 233 static void 234 bdev_rbd_finish_aiocb(rbd_completion_t cb, void *arg) 235 { 236 /* Doing nothing here */ 237 } 238 239 static void 240 bdev_rbd_start_aio(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io, 241 struct iovec *iov, int iovcnt, uint64_t offset, size_t len) 242 { 243 struct bdev_rbd_io_channel *rbdio_ch = spdk_io_channel_get_ctx(ch); 244 int ret; 245 rbd_completion_t comp; 246 struct bdev_rbd_io *rbd_io; 247 rbd_image_t image = rbdio_ch->image; 248 249 ret = rbd_aio_create_completion(bdev_io, bdev_rbd_finish_aiocb, 250 &comp); 251 if (ret < 0) { 252 goto err; 253 } 254 255 if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) { 256 rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx; 257 rbd_io->total_len = len; 258 if (spdk_likely(iovcnt == 1)) { 259 ret = rbd_aio_read(image, offset, iov[0].iov_len, iov[0].iov_base, comp); 260 } else { 261 ret = rbd_aio_readv(image, iov, iovcnt, offset, comp); 262 } 263 } else if (bdev_io->type == SPDK_BDEV_IO_TYPE_WRITE) { 264 if (spdk_likely(iovcnt == 1)) { 265 ret = rbd_aio_write(image, offset, iov[0].iov_len, iov[0].iov_base, comp); 266 } else { 267 ret = rbd_aio_writev(image, iov, iovcnt, offset, comp); 268 } 269 } else if (bdev_io->type == SPDK_BDEV_IO_TYPE_FLUSH) { 270 ret = rbd_aio_flush(image, comp); 271 } 272 273 if (ret < 0) { 274 rbd_aio_release(comp); 275 goto err; 276 } 277 278 return; 279 280 err: 281 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 282 } 283 284 static int bdev_rbd_library_init(void); 285 286 static void bdev_rbd_library_fini(void); 287 288 static int 289 bdev_rbd_get_ctx_size(void) 290 { 291 return sizeof(struct bdev_rbd_io); 292 } 293 294 static struct spdk_bdev_module rbd_if = { 295 .name = "rbd", 296 .module_init = bdev_rbd_library_init, 297 .module_fini = bdev_rbd_library_fini, 298 .get_ctx_size = bdev_rbd_get_ctx_size, 299 300 }; 301 SPDK_BDEV_MODULE_REGISTER(rbd, &rbd_if) 302 303 static int 304 bdev_rbd_reset_timer(void *arg) 305 { 306 struct bdev_rbd *disk = arg; 307 308 /* 309 * TODO: This should check if any I/O is still in flight before completing the reset. 310 * For now, just complete after the timer expires. 311 */ 312 spdk_bdev_io_complete(disk->reset_bdev_io, SPDK_BDEV_IO_STATUS_SUCCESS); 313 spdk_poller_unregister(&disk->reset_timer); 314 disk->reset_bdev_io = NULL; 315 316 return SPDK_POLLER_BUSY; 317 } 318 319 static void 320 bdev_rbd_reset(struct bdev_rbd *disk, struct spdk_bdev_io *bdev_io) 321 { 322 /* 323 * HACK: Since librbd doesn't provide any way to cancel outstanding aio, just kick off a 324 * timer to wait for in-flight I/O to complete. 325 */ 326 assert(disk->reset_bdev_io == NULL); 327 disk->reset_bdev_io = bdev_io; 328 disk->reset_timer = SPDK_POLLER_REGISTER(bdev_rbd_reset_timer, disk, 1 * 1000 * 1000); 329 } 330 331 static int 332 bdev_rbd_destruct(void *ctx) 333 { 334 struct bdev_rbd *rbd = ctx; 335 336 spdk_io_device_unregister(rbd, NULL); 337 338 bdev_rbd_free(rbd); 339 return 0; 340 } 341 342 static void 343 bdev_rbd_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io, 344 bool success) 345 { 346 if (!success) { 347 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 348 return; 349 } 350 351 bdev_rbd_start_aio(ch, 352 bdev_io, 353 bdev_io->u.bdev.iovs, 354 bdev_io->u.bdev.iovcnt, 355 bdev_io->u.bdev.offset_blocks * bdev_io->bdev->blocklen, 356 bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen); 357 } 358 359 static void 360 bdev_rbd_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io) 361 { 362 switch (bdev_io->type) { 363 case SPDK_BDEV_IO_TYPE_READ: 364 spdk_bdev_io_get_buf(bdev_io, bdev_rbd_get_buf_cb, 365 bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen); 366 break; 367 368 case SPDK_BDEV_IO_TYPE_WRITE: 369 case SPDK_BDEV_IO_TYPE_FLUSH: 370 bdev_rbd_start_aio(ch, 371 bdev_io, 372 bdev_io->u.bdev.iovs, 373 bdev_io->u.bdev.iovcnt, 374 bdev_io->u.bdev.offset_blocks * bdev_io->bdev->blocklen, 375 bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen); 376 break; 377 378 case SPDK_BDEV_IO_TYPE_RESET: 379 bdev_rbd_reset((struct bdev_rbd *)bdev_io->bdev->ctxt, 380 bdev_io); 381 break; 382 383 default: 384 SPDK_ERRLOG("Unsupported IO type =%d\n", bdev_io->type); 385 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 386 break; 387 } 388 } 389 390 static bool 391 bdev_rbd_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type) 392 { 393 switch (io_type) { 394 case SPDK_BDEV_IO_TYPE_READ: 395 case SPDK_BDEV_IO_TYPE_WRITE: 396 case SPDK_BDEV_IO_TYPE_FLUSH: 397 case SPDK_BDEV_IO_TYPE_RESET: 398 return true; 399 400 default: 401 return false; 402 } 403 } 404 405 static void 406 bdev_rbd_io_poll(struct bdev_rbd_io_channel *ch) 407 { 408 int i, io_status, rc; 409 rbd_completion_t comps[SPDK_RBD_QUEUE_DEPTH]; 410 struct spdk_bdev_io *bdev_io; 411 struct bdev_rbd_io *rbd_io; 412 enum spdk_bdev_io_status bio_status; 413 414 rc = rbd_poll_io_events(ch->image, comps, SPDK_RBD_QUEUE_DEPTH); 415 for (i = 0; i < rc; i++) { 416 bdev_io = rbd_aio_get_arg(comps[i]); 417 rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx; 418 io_status = rbd_aio_get_return_value(comps[i]); 419 bio_status = SPDK_BDEV_IO_STATUS_SUCCESS; 420 421 if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) { 422 if ((int)rbd_io->total_len != io_status) { 423 bio_status = SPDK_BDEV_IO_STATUS_FAILED; 424 } 425 } else { 426 /* For others, 0 means success */ 427 if (io_status != 0) { 428 bio_status = SPDK_BDEV_IO_STATUS_FAILED; 429 } 430 } 431 432 rbd_aio_release(comps[i]); 433 434 spdk_bdev_io_complete(bdev_io, bio_status); 435 } 436 } 437 438 static void 439 bdev_rbd_free_channel(struct bdev_rbd_io_channel *ch) 440 { 441 if (!ch) { 442 return; 443 } 444 445 if (ch->image) { 446 bdev_rbd_exit(ch->image); 447 } 448 449 if (ch->io_ctx) { 450 rados_ioctx_destroy(ch->io_ctx); 451 } 452 453 if (ch->cluster) { 454 rados_shutdown(ch->cluster); 455 } 456 457 if (ch->pfd >= 0) { 458 close(ch->pfd); 459 } 460 461 if (ch->group_ch) { 462 spdk_put_io_channel(spdk_io_channel_from_ctx(ch->group_ch)); 463 } 464 } 465 466 static void * 467 bdev_rbd_handle(void *arg) 468 { 469 struct bdev_rbd_io_channel *ch = arg; 470 void *ret = arg; 471 int rc; 472 473 rc = bdev_rados_context_init(ch->disk->user_id, ch->disk->pool_name, 474 (const char *const *)ch->disk->config, 475 &ch->cluster, &ch->io_ctx); 476 if (rc < 0) { 477 SPDK_ERRLOG("Failed to create rados context for user_id %s and rbd_pool=%s\n", 478 ch->disk->user_id ? ch->disk->user_id : "admin (the default)", ch->disk->pool_name); 479 ret = NULL; 480 goto end; 481 } 482 483 if (rbd_open(ch->io_ctx, ch->disk->rbd_name, &ch->image, NULL) < 0) { 484 SPDK_ERRLOG("Failed to open specified rbd device\n"); 485 ret = NULL; 486 } 487 488 end: 489 return ret; 490 } 491 492 static int 493 bdev_rbd_create_cb(void *io_device, void *ctx_buf) 494 { 495 struct bdev_rbd_io_channel *ch = ctx_buf; 496 int ret; 497 struct epoll_event event; 498 499 ch->disk = io_device; 500 ch->image = NULL; 501 ch->io_ctx = NULL; 502 ch->pfd = -1; 503 504 if (spdk_call_unaffinitized(bdev_rbd_handle, ch) == NULL) { 505 goto err; 506 } 507 508 ch->pfd = eventfd(0, EFD_NONBLOCK); 509 if (ch->pfd < 0) { 510 SPDK_ERRLOG("Failed to get eventfd\n"); 511 goto err; 512 } 513 514 ret = rbd_set_image_notification(ch->image, ch->pfd, EVENT_TYPE_EVENTFD); 515 if (ret < 0) { 516 SPDK_ERRLOG("Failed to set rbd image notification\n"); 517 goto err; 518 } 519 520 ch->group_ch = spdk_io_channel_get_ctx(spdk_get_io_channel(&rbd_if)); 521 assert(ch->group_ch != NULL); 522 memset(&event, 0, sizeof(event)); 523 event.events = EPOLLIN; 524 event.data.ptr = ch; 525 526 ret = epoll_ctl(ch->group_ch->epoll_fd, EPOLL_CTL_ADD, ch->pfd, &event); 527 if (ret < 0) { 528 SPDK_ERRLOG("Failed to add the fd of ch(%p) to the epoll group from group_ch=%p\n", ch, 529 ch->group_ch); 530 goto err; 531 } 532 533 return 0; 534 535 err: 536 bdev_rbd_free_channel(ch); 537 return -1; 538 } 539 540 static void 541 bdev_rbd_destroy_cb(void *io_device, void *ctx_buf) 542 { 543 struct bdev_rbd_io_channel *io_channel = ctx_buf; 544 int rc; 545 546 rc = epoll_ctl(io_channel->group_ch->epoll_fd, EPOLL_CTL_DEL, 547 io_channel->pfd, NULL); 548 if (rc < 0) { 549 SPDK_ERRLOG("Failed to remove fd on io_channel=%p from the polling group=%p\n", 550 io_channel, io_channel->group_ch); 551 } 552 553 bdev_rbd_free_channel(io_channel); 554 } 555 556 static struct spdk_io_channel * 557 bdev_rbd_get_io_channel(void *ctx) 558 { 559 struct bdev_rbd *rbd_bdev = ctx; 560 561 return spdk_get_io_channel(rbd_bdev); 562 } 563 564 static int 565 bdev_rbd_dump_info_json(void *ctx, struct spdk_json_write_ctx *w) 566 { 567 struct bdev_rbd *rbd_bdev = ctx; 568 569 spdk_json_write_named_object_begin(w, "rbd"); 570 571 spdk_json_write_named_string(w, "pool_name", rbd_bdev->pool_name); 572 573 spdk_json_write_named_string(w, "rbd_name", rbd_bdev->rbd_name); 574 575 if (rbd_bdev->user_id) { 576 spdk_json_write_named_string(w, "user_id", rbd_bdev->user_id); 577 } 578 579 if (rbd_bdev->config) { 580 char **entry = rbd_bdev->config; 581 582 spdk_json_write_named_object_begin(w, "config"); 583 while (*entry) { 584 spdk_json_write_named_string(w, entry[0], entry[1]); 585 entry += 2; 586 } 587 spdk_json_write_object_end(w); 588 } 589 590 spdk_json_write_object_end(w); 591 592 return 0; 593 } 594 595 static void 596 bdev_rbd_write_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w) 597 { 598 struct bdev_rbd *rbd = bdev->ctxt; 599 600 spdk_json_write_object_begin(w); 601 602 spdk_json_write_named_string(w, "method", "bdev_rbd_create"); 603 604 spdk_json_write_named_object_begin(w, "params"); 605 spdk_json_write_named_string(w, "name", bdev->name); 606 spdk_json_write_named_string(w, "pool_name", rbd->pool_name); 607 spdk_json_write_named_string(w, "rbd_name", rbd->rbd_name); 608 spdk_json_write_named_uint32(w, "block_size", bdev->blocklen); 609 if (rbd->user_id) { 610 spdk_json_write_named_string(w, "user_id", rbd->user_id); 611 } 612 613 if (rbd->config) { 614 char **entry = rbd->config; 615 616 spdk_json_write_named_object_begin(w, "config"); 617 while (*entry) { 618 spdk_json_write_named_string(w, entry[0], entry[1]); 619 entry += 2; 620 } 621 spdk_json_write_object_end(w); 622 } 623 624 spdk_json_write_object_end(w); 625 626 spdk_json_write_object_end(w); 627 } 628 629 static const struct spdk_bdev_fn_table rbd_fn_table = { 630 .destruct = bdev_rbd_destruct, 631 .submit_request = bdev_rbd_submit_request, 632 .io_type_supported = bdev_rbd_io_type_supported, 633 .get_io_channel = bdev_rbd_get_io_channel, 634 .dump_info_json = bdev_rbd_dump_info_json, 635 .write_config_json = bdev_rbd_write_config_json, 636 }; 637 638 int 639 bdev_rbd_create(struct spdk_bdev **bdev, const char *name, const char *user_id, 640 const char *pool_name, 641 const char *const *config, 642 const char *rbd_name, 643 uint32_t block_size) 644 { 645 struct bdev_rbd *rbd; 646 int ret; 647 648 if ((pool_name == NULL) || (rbd_name == NULL)) { 649 return -EINVAL; 650 } 651 652 rbd = calloc(1, sizeof(struct bdev_rbd)); 653 if (rbd == NULL) { 654 SPDK_ERRLOG("Failed to allocate bdev_rbd struct\n"); 655 return -ENOMEM; 656 } 657 658 rbd->rbd_name = strdup(rbd_name); 659 if (!rbd->rbd_name) { 660 bdev_rbd_free(rbd); 661 return -ENOMEM; 662 } 663 664 if (user_id) { 665 rbd->user_id = strdup(user_id); 666 if (!rbd->user_id) { 667 bdev_rbd_free(rbd); 668 return -ENOMEM; 669 } 670 } 671 672 rbd->pool_name = strdup(pool_name); 673 if (!rbd->pool_name) { 674 bdev_rbd_free(rbd); 675 return -ENOMEM; 676 } 677 678 if (config && !(rbd->config = bdev_rbd_dup_config(config))) { 679 bdev_rbd_free(rbd); 680 return -ENOMEM; 681 } 682 683 ret = bdev_rbd_init(rbd->user_id, rbd->pool_name, 684 (const char *const *)rbd->config, 685 rbd_name, &rbd->info); 686 if (ret < 0) { 687 bdev_rbd_free(rbd); 688 SPDK_ERRLOG("Failed to init rbd device\n"); 689 return ret; 690 } 691 692 if (name) { 693 rbd->disk.name = strdup(name); 694 } else { 695 rbd->disk.name = spdk_sprintf_alloc("Ceph%d", bdev_rbd_count); 696 } 697 if (!rbd->disk.name) { 698 bdev_rbd_free(rbd); 699 return -ENOMEM; 700 } 701 rbd->disk.product_name = "Ceph Rbd Disk"; 702 bdev_rbd_count++; 703 704 rbd->disk.write_cache = 0; 705 rbd->disk.blocklen = block_size; 706 rbd->disk.blockcnt = rbd->info.size / rbd->disk.blocklen; 707 rbd->disk.ctxt = rbd; 708 rbd->disk.fn_table = &rbd_fn_table; 709 rbd->disk.module = &rbd_if; 710 711 SPDK_NOTICELOG("Add %s rbd disk to lun\n", rbd->disk.name); 712 713 spdk_io_device_register(rbd, bdev_rbd_create_cb, 714 bdev_rbd_destroy_cb, 715 sizeof(struct bdev_rbd_io_channel), 716 rbd_name); 717 ret = spdk_bdev_register(&rbd->disk); 718 if (ret) { 719 spdk_io_device_unregister(rbd, NULL); 720 bdev_rbd_free(rbd); 721 return ret; 722 } 723 724 *bdev = &(rbd->disk); 725 726 return ret; 727 } 728 729 void 730 bdev_rbd_delete(struct spdk_bdev *bdev, spdk_delete_rbd_complete cb_fn, void *cb_arg) 731 { 732 if (!bdev || bdev->module != &rbd_if) { 733 cb_fn(cb_arg, -ENODEV); 734 return; 735 } 736 737 spdk_bdev_unregister(bdev, cb_fn, cb_arg); 738 } 739 740 int 741 bdev_rbd_resize(struct spdk_bdev *bdev, const uint64_t new_size_in_mb) 742 { 743 struct spdk_io_channel *ch; 744 struct bdev_rbd_io_channel *rbd_io_ch; 745 int rc; 746 uint64_t new_size_in_byte; 747 uint64_t current_size_in_mb; 748 749 if (bdev->module != &rbd_if) { 750 return -EINVAL; 751 } 752 753 current_size_in_mb = bdev->blocklen * bdev->blockcnt / (1024 * 1024); 754 if (current_size_in_mb > new_size_in_mb) { 755 SPDK_ERRLOG("The new bdev size must be lager than current bdev size.\n"); 756 return -EINVAL; 757 } 758 759 ch = bdev_rbd_get_io_channel(bdev); 760 rbd_io_ch = spdk_io_channel_get_ctx(ch); 761 new_size_in_byte = new_size_in_mb * 1024 * 1024; 762 763 rc = rbd_resize(rbd_io_ch->image, new_size_in_byte); 764 if (rc != 0) { 765 SPDK_ERRLOG("failed to resize the ceph bdev.\n"); 766 return rc; 767 } 768 769 rc = spdk_bdev_notify_blockcnt_change(bdev, new_size_in_byte / bdev->blocklen); 770 if (rc != 0) { 771 SPDK_ERRLOG("failed to notify block cnt change.\n"); 772 return rc; 773 } 774 775 return rc; 776 } 777 778 static int 779 bdev_rbd_group_poll(void *arg) 780 { 781 struct bdev_rbd_group_channel *group_ch = arg; 782 struct epoll_event events[MAX_EVENTS_PER_POLL]; 783 int num_events, i; 784 785 num_events = epoll_wait(group_ch->epoll_fd, events, MAX_EVENTS_PER_POLL, 0); 786 787 if (num_events <= 0) { 788 return SPDK_POLLER_IDLE; 789 } 790 791 for (i = 0; i < num_events; i++) { 792 bdev_rbd_io_poll((struct bdev_rbd_io_channel *)events[i].data.ptr); 793 } 794 795 return SPDK_POLLER_BUSY; 796 } 797 798 static int 799 bdev_rbd_group_create_cb(void *io_device, void *ctx_buf) 800 { 801 struct bdev_rbd_group_channel *ch = ctx_buf; 802 803 ch->epoll_fd = epoll_create1(0); 804 if (ch->epoll_fd < 0) { 805 SPDK_ERRLOG("Could not create epoll fd on io device=%p\n", io_device); 806 return -1; 807 } 808 809 ch->poller = SPDK_POLLER_REGISTER(bdev_rbd_group_poll, ch, 0); 810 811 return 0; 812 } 813 814 static void 815 bdev_rbd_group_destroy_cb(void *io_device, void *ctx_buf) 816 { 817 struct bdev_rbd_group_channel *ch = ctx_buf; 818 819 if (ch->epoll_fd >= 0) { 820 close(ch->epoll_fd); 821 } 822 823 spdk_poller_unregister(&ch->poller); 824 } 825 826 static int 827 bdev_rbd_library_init(void) 828 { 829 spdk_io_device_register(&rbd_if, bdev_rbd_group_create_cb, bdev_rbd_group_destroy_cb, 830 sizeof(struct bdev_rbd_group_channel), "bdev_rbd_poll_groups"); 831 832 return 0; 833 } 834 835 static void 836 bdev_rbd_library_fini(void) 837 { 838 spdk_io_device_unregister(&rbd_if, NULL); 839 } 840 841 SPDK_LOG_REGISTER_COMPONENT(bdev_rbd) 842