1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 36 #include "bdev_rbd.h" 37 38 #include <rbd/librbd.h> 39 #include <rados/librados.h> 40 #include <sys/eventfd.h> 41 #include <sys/epoll.h> 42 43 #include "spdk/env.h" 44 #include "spdk/bdev.h" 45 #include "spdk/thread.h" 46 #include "spdk/json.h" 47 #include "spdk/string.h" 48 #include "spdk/util.h" 49 #include "spdk/likely.h" 50 51 #include "spdk/bdev_module.h" 52 #include "spdk/log.h" 53 54 #define SPDK_RBD_QUEUE_DEPTH 128 55 #define MAX_EVENTS_PER_POLL 128 56 57 static int bdev_rbd_count = 0; 58 59 struct bdev_rbd { 60 struct spdk_bdev disk; 61 char *rbd_name; 62 char *user_id; 63 char *pool_name; 64 char **config; 65 rbd_image_info_t info; 66 TAILQ_ENTRY(bdev_rbd) tailq; 67 struct spdk_poller *reset_timer; 68 struct spdk_bdev_io *reset_bdev_io; 69 }; 70 71 struct bdev_rbd_group_channel { 72 struct spdk_poller *poller; 73 int epoll_fd; 74 }; 75 76 struct bdev_rbd_io_channel { 77 rados_ioctx_t io_ctx; 78 rados_t cluster; 79 int pfd; 80 rbd_image_t image; 81 struct bdev_rbd *disk; 82 struct bdev_rbd_group_channel *group_ch; 83 }; 84 85 struct bdev_rbd_io { 86 size_t total_len; 87 }; 88 89 static void 90 bdev_rbd_free(struct bdev_rbd *rbd) 91 { 92 if (!rbd) { 93 return; 94 } 95 96 free(rbd->disk.name); 97 free(rbd->rbd_name); 98 free(rbd->user_id); 99 free(rbd->pool_name); 100 bdev_rbd_free_config(rbd->config); 101 free(rbd); 102 } 103 104 void 105 bdev_rbd_free_config(char **config) 106 { 107 char **entry; 108 109 if (config) { 110 for (entry = config; *entry; entry++) { 111 free(*entry); 112 } 113 free(config); 114 } 115 } 116 117 char ** 118 bdev_rbd_dup_config(const char *const *config) 119 { 120 size_t count; 121 char **copy; 122 123 if (!config) { 124 return NULL; 125 } 126 for (count = 0; config[count]; count++) {} 127 copy = calloc(count + 1, sizeof(*copy)); 128 if (!copy) { 129 return NULL; 130 } 131 for (count = 0; config[count]; count++) { 132 if (!(copy[count] = strdup(config[count]))) { 133 bdev_rbd_free_config(copy); 134 return NULL; 135 } 136 } 137 return copy; 138 } 139 140 static int 141 bdev_rados_context_init(const char *user_id, const char *rbd_pool_name, const char *const *config, 142 rados_t *cluster, rados_ioctx_t *io_ctx) 143 { 144 int ret; 145 146 ret = rados_create(cluster, user_id); 147 if (ret < 0) { 148 SPDK_ERRLOG("Failed to create rados_t struct\n"); 149 return -1; 150 } 151 152 if (config) { 153 const char *const *entry = config; 154 while (*entry) { 155 ret = rados_conf_set(*cluster, entry[0], entry[1]); 156 if (ret < 0) { 157 SPDK_ERRLOG("Failed to set %s = %s\n", entry[0], entry[1]); 158 rados_shutdown(*cluster); 159 return -1; 160 } 161 entry += 2; 162 } 163 } else { 164 ret = rados_conf_read_file(*cluster, NULL); 165 if (ret < 0) { 166 SPDK_ERRLOG("Failed to read conf file\n"); 167 rados_shutdown(*cluster); 168 return -1; 169 } 170 } 171 172 ret = rados_connect(*cluster); 173 if (ret < 0) { 174 SPDK_ERRLOG("Failed to connect to rbd_pool\n"); 175 rados_shutdown(*cluster); 176 return -1; 177 } 178 179 ret = rados_ioctx_create(*cluster, rbd_pool_name, io_ctx); 180 181 if (ret < 0) { 182 SPDK_ERRLOG("Failed to create ioctx\n"); 183 rados_shutdown(*cluster); 184 return -1; 185 } 186 187 return 0; 188 } 189 190 static int 191 bdev_rbd_init(struct bdev_rbd *rbd) 192 { 193 int ret = 0; 194 rados_t cluster = NULL; 195 rados_ioctx_t io_ctx = NULL; 196 rbd_image_t image = NULL; 197 198 ret = bdev_rados_context_init(rbd->user_id, rbd->pool_name, (const char *const *)rbd->config, 199 &cluster, &io_ctx); 200 if (ret < 0) { 201 SPDK_ERRLOG("Failed to create rados context for user_id=%s and rbd_pool=%s\n", 202 rbd->user_id ? rbd->user_id : "admin (the default)", rbd->pool_name); 203 return -1; 204 } 205 206 ret = rbd_open(io_ctx, rbd->rbd_name, &image, NULL); 207 if (ret < 0) { 208 SPDK_ERRLOG("Failed to open specified rbd device\n"); 209 goto end; 210 } 211 ret = rbd_stat(image, &rbd->info, sizeof(rbd->info)); 212 rbd_close(image); 213 if (ret < 0) { 214 SPDK_ERRLOG("Failed to stat specified rbd device\n"); 215 } 216 217 end: 218 rados_ioctx_destroy(io_ctx); 219 rados_shutdown(cluster); 220 return ret; 221 } 222 223 static void 224 bdev_rbd_exit(rbd_image_t image) 225 { 226 rbd_flush(image); 227 rbd_close(image); 228 } 229 230 static void 231 bdev_rbd_finish_aiocb(rbd_completion_t cb, void *arg) 232 { 233 /* Doing nothing here */ 234 } 235 236 static void 237 bdev_rbd_start_aio(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io, 238 struct iovec *iov, int iovcnt, uint64_t offset, size_t len) 239 { 240 struct bdev_rbd_io_channel *rbdio_ch = spdk_io_channel_get_ctx(ch); 241 int ret; 242 rbd_completion_t comp; 243 struct bdev_rbd_io *rbd_io; 244 rbd_image_t image = rbdio_ch->image; 245 246 ret = rbd_aio_create_completion(bdev_io, bdev_rbd_finish_aiocb, 247 &comp); 248 if (ret < 0) { 249 goto err; 250 } 251 252 if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) { 253 rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx; 254 rbd_io->total_len = len; 255 if (spdk_likely(iovcnt == 1)) { 256 ret = rbd_aio_read(image, offset, iov[0].iov_len, iov[0].iov_base, comp); 257 } else { 258 ret = rbd_aio_readv(image, iov, iovcnt, offset, comp); 259 } 260 } else if (bdev_io->type == SPDK_BDEV_IO_TYPE_WRITE) { 261 if (spdk_likely(iovcnt == 1)) { 262 ret = rbd_aio_write(image, offset, iov[0].iov_len, iov[0].iov_base, comp); 263 } else { 264 ret = rbd_aio_writev(image, iov, iovcnt, offset, comp); 265 } 266 } else if (bdev_io->type == SPDK_BDEV_IO_TYPE_FLUSH) { 267 ret = rbd_aio_flush(image, comp); 268 } 269 270 if (ret < 0) { 271 rbd_aio_release(comp); 272 goto err; 273 } 274 275 return; 276 277 err: 278 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 279 } 280 281 static int bdev_rbd_library_init(void); 282 283 static void bdev_rbd_library_fini(void); 284 285 static int 286 bdev_rbd_get_ctx_size(void) 287 { 288 return sizeof(struct bdev_rbd_io); 289 } 290 291 static struct spdk_bdev_module rbd_if = { 292 .name = "rbd", 293 .module_init = bdev_rbd_library_init, 294 .module_fini = bdev_rbd_library_fini, 295 .get_ctx_size = bdev_rbd_get_ctx_size, 296 297 }; 298 SPDK_BDEV_MODULE_REGISTER(rbd, &rbd_if) 299 300 static int 301 bdev_rbd_reset_timer(void *arg) 302 { 303 struct bdev_rbd *disk = arg; 304 305 /* 306 * TODO: This should check if any I/O is still in flight before completing the reset. 307 * For now, just complete after the timer expires. 308 */ 309 spdk_bdev_io_complete(disk->reset_bdev_io, SPDK_BDEV_IO_STATUS_SUCCESS); 310 spdk_poller_unregister(&disk->reset_timer); 311 disk->reset_bdev_io = NULL; 312 313 return SPDK_POLLER_BUSY; 314 } 315 316 static void 317 bdev_rbd_reset(struct bdev_rbd *disk, struct spdk_bdev_io *bdev_io) 318 { 319 /* 320 * HACK: Since librbd doesn't provide any way to cancel outstanding aio, just kick off a 321 * timer to wait for in-flight I/O to complete. 322 */ 323 assert(disk->reset_bdev_io == NULL); 324 disk->reset_bdev_io = bdev_io; 325 disk->reset_timer = SPDK_POLLER_REGISTER(bdev_rbd_reset_timer, disk, 1 * 1000 * 1000); 326 } 327 328 static int 329 bdev_rbd_destruct(void *ctx) 330 { 331 struct bdev_rbd *rbd = ctx; 332 333 spdk_io_device_unregister(rbd, NULL); 334 335 bdev_rbd_free(rbd); 336 return 0; 337 } 338 339 static void 340 bdev_rbd_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io, 341 bool success) 342 { 343 if (!success) { 344 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 345 return; 346 } 347 348 bdev_rbd_start_aio(ch, 349 bdev_io, 350 bdev_io->u.bdev.iovs, 351 bdev_io->u.bdev.iovcnt, 352 bdev_io->u.bdev.offset_blocks * bdev_io->bdev->blocklen, 353 bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen); 354 } 355 356 static void 357 bdev_rbd_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io) 358 { 359 switch (bdev_io->type) { 360 case SPDK_BDEV_IO_TYPE_READ: 361 spdk_bdev_io_get_buf(bdev_io, bdev_rbd_get_buf_cb, 362 bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen); 363 break; 364 365 case SPDK_BDEV_IO_TYPE_WRITE: 366 case SPDK_BDEV_IO_TYPE_FLUSH: 367 bdev_rbd_start_aio(ch, 368 bdev_io, 369 bdev_io->u.bdev.iovs, 370 bdev_io->u.bdev.iovcnt, 371 bdev_io->u.bdev.offset_blocks * bdev_io->bdev->blocklen, 372 bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen); 373 break; 374 375 case SPDK_BDEV_IO_TYPE_RESET: 376 bdev_rbd_reset((struct bdev_rbd *)bdev_io->bdev->ctxt, 377 bdev_io); 378 break; 379 380 default: 381 SPDK_ERRLOG("Unsupported IO type =%d\n", bdev_io->type); 382 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 383 break; 384 } 385 } 386 387 static bool 388 bdev_rbd_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type) 389 { 390 switch (io_type) { 391 case SPDK_BDEV_IO_TYPE_READ: 392 case SPDK_BDEV_IO_TYPE_WRITE: 393 case SPDK_BDEV_IO_TYPE_FLUSH: 394 case SPDK_BDEV_IO_TYPE_RESET: 395 return true; 396 397 default: 398 return false; 399 } 400 } 401 402 static void 403 bdev_rbd_io_poll(struct bdev_rbd_io_channel *ch) 404 { 405 int i, io_status, rc; 406 rbd_completion_t comps[SPDK_RBD_QUEUE_DEPTH]; 407 struct spdk_bdev_io *bdev_io; 408 struct bdev_rbd_io *rbd_io; 409 enum spdk_bdev_io_status bio_status; 410 411 rc = rbd_poll_io_events(ch->image, comps, SPDK_RBD_QUEUE_DEPTH); 412 for (i = 0; i < rc; i++) { 413 bdev_io = rbd_aio_get_arg(comps[i]); 414 rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx; 415 io_status = rbd_aio_get_return_value(comps[i]); 416 bio_status = SPDK_BDEV_IO_STATUS_SUCCESS; 417 418 if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) { 419 if ((int)rbd_io->total_len != io_status) { 420 bio_status = SPDK_BDEV_IO_STATUS_FAILED; 421 } 422 } else { 423 /* For others, 0 means success */ 424 if (io_status != 0) { 425 bio_status = SPDK_BDEV_IO_STATUS_FAILED; 426 } 427 } 428 429 rbd_aio_release(comps[i]); 430 431 spdk_bdev_io_complete(bdev_io, bio_status); 432 } 433 } 434 435 static void 436 bdev_rbd_free_channel(struct bdev_rbd_io_channel *ch) 437 { 438 if (!ch) { 439 return; 440 } 441 442 if (ch->image) { 443 bdev_rbd_exit(ch->image); 444 } 445 446 if (ch->io_ctx) { 447 rados_ioctx_destroy(ch->io_ctx); 448 } 449 450 if (ch->cluster) { 451 rados_shutdown(ch->cluster); 452 } 453 454 if (ch->pfd >= 0) { 455 close(ch->pfd); 456 } 457 458 if (ch->group_ch) { 459 spdk_put_io_channel(spdk_io_channel_from_ctx(ch->group_ch)); 460 } 461 } 462 463 static void * 464 bdev_rbd_handle(void *arg) 465 { 466 struct bdev_rbd_io_channel *ch = arg; 467 void *ret = arg; 468 int rc; 469 470 rc = bdev_rados_context_init(ch->disk->user_id, ch->disk->pool_name, 471 (const char *const *)ch->disk->config, 472 &ch->cluster, &ch->io_ctx); 473 if (rc < 0) { 474 SPDK_ERRLOG("Failed to create rados context for user_id %s and rbd_pool=%s\n", 475 ch->disk->user_id ? ch->disk->user_id : "admin (the default)", ch->disk->pool_name); 476 ret = NULL; 477 goto end; 478 } 479 480 if (rbd_open(ch->io_ctx, ch->disk->rbd_name, &ch->image, NULL) < 0) { 481 SPDK_ERRLOG("Failed to open specified rbd device\n"); 482 ret = NULL; 483 } 484 485 end: 486 return ret; 487 } 488 489 static int 490 bdev_rbd_create_cb(void *io_device, void *ctx_buf) 491 { 492 struct bdev_rbd_io_channel *ch = ctx_buf; 493 int ret; 494 struct epoll_event event; 495 496 ch->disk = io_device; 497 ch->image = NULL; 498 ch->io_ctx = NULL; 499 ch->pfd = -1; 500 501 if (spdk_call_unaffinitized(bdev_rbd_handle, ch) == NULL) { 502 goto err; 503 } 504 505 ch->pfd = eventfd(0, EFD_NONBLOCK); 506 if (ch->pfd < 0) { 507 SPDK_ERRLOG("Failed to get eventfd\n"); 508 goto err; 509 } 510 511 ret = rbd_set_image_notification(ch->image, ch->pfd, EVENT_TYPE_EVENTFD); 512 if (ret < 0) { 513 SPDK_ERRLOG("Failed to set rbd image notification\n"); 514 goto err; 515 } 516 517 ch->group_ch = spdk_io_channel_get_ctx(spdk_get_io_channel(&rbd_if)); 518 assert(ch->group_ch != NULL); 519 memset(&event, 0, sizeof(event)); 520 event.events = EPOLLIN; 521 event.data.ptr = ch; 522 523 ret = epoll_ctl(ch->group_ch->epoll_fd, EPOLL_CTL_ADD, ch->pfd, &event); 524 if (ret < 0) { 525 SPDK_ERRLOG("Failed to add the fd of ch(%p) to the epoll group from group_ch=%p\n", ch, 526 ch->group_ch); 527 goto err; 528 } 529 530 return 0; 531 532 err: 533 bdev_rbd_free_channel(ch); 534 return -1; 535 } 536 537 static void 538 bdev_rbd_destroy_cb(void *io_device, void *ctx_buf) 539 { 540 struct bdev_rbd_io_channel *io_channel = ctx_buf; 541 int rc; 542 543 rc = epoll_ctl(io_channel->group_ch->epoll_fd, EPOLL_CTL_DEL, 544 io_channel->pfd, NULL); 545 if (rc < 0) { 546 SPDK_ERRLOG("Failed to remove fd on io_channel=%p from the polling group=%p\n", 547 io_channel, io_channel->group_ch); 548 } 549 550 bdev_rbd_free_channel(io_channel); 551 } 552 553 static struct spdk_io_channel * 554 bdev_rbd_get_io_channel(void *ctx) 555 { 556 struct bdev_rbd *rbd_bdev = ctx; 557 558 return spdk_get_io_channel(rbd_bdev); 559 } 560 561 static int 562 bdev_rbd_dump_info_json(void *ctx, struct spdk_json_write_ctx *w) 563 { 564 struct bdev_rbd *rbd_bdev = ctx; 565 566 spdk_json_write_named_object_begin(w, "rbd"); 567 568 spdk_json_write_named_string(w, "pool_name", rbd_bdev->pool_name); 569 570 spdk_json_write_named_string(w, "rbd_name", rbd_bdev->rbd_name); 571 572 if (rbd_bdev->user_id) { 573 spdk_json_write_named_string(w, "user_id", rbd_bdev->user_id); 574 } 575 576 if (rbd_bdev->config) { 577 char **entry = rbd_bdev->config; 578 579 spdk_json_write_named_object_begin(w, "config"); 580 while (*entry) { 581 spdk_json_write_named_string(w, entry[0], entry[1]); 582 entry += 2; 583 } 584 spdk_json_write_object_end(w); 585 } 586 587 spdk_json_write_object_end(w); 588 589 return 0; 590 } 591 592 static void 593 bdev_rbd_write_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w) 594 { 595 struct bdev_rbd *rbd = bdev->ctxt; 596 597 spdk_json_write_object_begin(w); 598 599 spdk_json_write_named_string(w, "method", "bdev_rbd_create"); 600 601 spdk_json_write_named_object_begin(w, "params"); 602 spdk_json_write_named_string(w, "name", bdev->name); 603 spdk_json_write_named_string(w, "pool_name", rbd->pool_name); 604 spdk_json_write_named_string(w, "rbd_name", rbd->rbd_name); 605 spdk_json_write_named_uint32(w, "block_size", bdev->blocklen); 606 if (rbd->user_id) { 607 spdk_json_write_named_string(w, "user_id", rbd->user_id); 608 } 609 610 if (rbd->config) { 611 char **entry = rbd->config; 612 613 spdk_json_write_named_object_begin(w, "config"); 614 while (*entry) { 615 spdk_json_write_named_string(w, entry[0], entry[1]); 616 entry += 2; 617 } 618 spdk_json_write_object_end(w); 619 } 620 621 spdk_json_write_object_end(w); 622 623 spdk_json_write_object_end(w); 624 } 625 626 static const struct spdk_bdev_fn_table rbd_fn_table = { 627 .destruct = bdev_rbd_destruct, 628 .submit_request = bdev_rbd_submit_request, 629 .io_type_supported = bdev_rbd_io_type_supported, 630 .get_io_channel = bdev_rbd_get_io_channel, 631 .dump_info_json = bdev_rbd_dump_info_json, 632 .write_config_json = bdev_rbd_write_config_json, 633 }; 634 635 int 636 bdev_rbd_create(struct spdk_bdev **bdev, const char *name, const char *user_id, 637 const char *pool_name, 638 const char *const *config, 639 const char *rbd_name, 640 uint32_t block_size) 641 { 642 struct bdev_rbd *rbd; 643 int ret; 644 645 if ((pool_name == NULL) || (rbd_name == NULL)) { 646 return -EINVAL; 647 } 648 649 rbd = calloc(1, sizeof(struct bdev_rbd)); 650 if (rbd == NULL) { 651 SPDK_ERRLOG("Failed to allocate bdev_rbd struct\n"); 652 return -ENOMEM; 653 } 654 655 rbd->rbd_name = strdup(rbd_name); 656 if (!rbd->rbd_name) { 657 bdev_rbd_free(rbd); 658 return -ENOMEM; 659 } 660 661 if (user_id) { 662 rbd->user_id = strdup(user_id); 663 if (!rbd->user_id) { 664 bdev_rbd_free(rbd); 665 return -ENOMEM; 666 } 667 } 668 669 rbd->pool_name = strdup(pool_name); 670 if (!rbd->pool_name) { 671 bdev_rbd_free(rbd); 672 return -ENOMEM; 673 } 674 675 if (config && !(rbd->config = bdev_rbd_dup_config(config))) { 676 bdev_rbd_free(rbd); 677 return -ENOMEM; 678 } 679 680 ret = bdev_rbd_init(rbd); 681 if (ret < 0) { 682 bdev_rbd_free(rbd); 683 SPDK_ERRLOG("Failed to init rbd device\n"); 684 return ret; 685 } 686 687 if (name) { 688 rbd->disk.name = strdup(name); 689 } else { 690 rbd->disk.name = spdk_sprintf_alloc("Ceph%d", bdev_rbd_count); 691 } 692 if (!rbd->disk.name) { 693 bdev_rbd_free(rbd); 694 return -ENOMEM; 695 } 696 rbd->disk.product_name = "Ceph Rbd Disk"; 697 bdev_rbd_count++; 698 699 rbd->disk.write_cache = 0; 700 rbd->disk.blocklen = block_size; 701 rbd->disk.blockcnt = rbd->info.size / rbd->disk.blocklen; 702 rbd->disk.ctxt = rbd; 703 rbd->disk.fn_table = &rbd_fn_table; 704 rbd->disk.module = &rbd_if; 705 706 SPDK_NOTICELOG("Add %s rbd disk to lun\n", rbd->disk.name); 707 708 spdk_io_device_register(rbd, bdev_rbd_create_cb, 709 bdev_rbd_destroy_cb, 710 sizeof(struct bdev_rbd_io_channel), 711 rbd_name); 712 ret = spdk_bdev_register(&rbd->disk); 713 if (ret) { 714 spdk_io_device_unregister(rbd, NULL); 715 bdev_rbd_free(rbd); 716 return ret; 717 } 718 719 *bdev = &(rbd->disk); 720 721 return ret; 722 } 723 724 void 725 bdev_rbd_delete(struct spdk_bdev *bdev, spdk_delete_rbd_complete cb_fn, void *cb_arg) 726 { 727 if (!bdev || bdev->module != &rbd_if) { 728 cb_fn(cb_arg, -ENODEV); 729 return; 730 } 731 732 spdk_bdev_unregister(bdev, cb_fn, cb_arg); 733 } 734 735 int 736 bdev_rbd_resize(struct spdk_bdev *bdev, const uint64_t new_size_in_mb) 737 { 738 struct spdk_io_channel *ch; 739 struct bdev_rbd_io_channel *rbd_io_ch; 740 int rc; 741 uint64_t new_size_in_byte; 742 uint64_t current_size_in_mb; 743 744 if (bdev->module != &rbd_if) { 745 return -EINVAL; 746 } 747 748 current_size_in_mb = bdev->blocklen * bdev->blockcnt / (1024 * 1024); 749 if (current_size_in_mb > new_size_in_mb) { 750 SPDK_ERRLOG("The new bdev size must be lager than current bdev size.\n"); 751 return -EINVAL; 752 } 753 754 ch = bdev_rbd_get_io_channel(bdev); 755 rbd_io_ch = spdk_io_channel_get_ctx(ch); 756 new_size_in_byte = new_size_in_mb * 1024 * 1024; 757 758 rc = rbd_resize(rbd_io_ch->image, new_size_in_byte); 759 spdk_put_io_channel(ch); 760 if (rc != 0) { 761 SPDK_ERRLOG("failed to resize the ceph bdev.\n"); 762 return rc; 763 } 764 765 rc = spdk_bdev_notify_blockcnt_change(bdev, new_size_in_byte / bdev->blocklen); 766 if (rc != 0) { 767 SPDK_ERRLOG("failed to notify block cnt change.\n"); 768 return rc; 769 } 770 771 return rc; 772 } 773 774 static int 775 bdev_rbd_group_poll(void *arg) 776 { 777 struct bdev_rbd_group_channel *group_ch = arg; 778 struct epoll_event events[MAX_EVENTS_PER_POLL]; 779 int num_events, i; 780 781 num_events = epoll_wait(group_ch->epoll_fd, events, MAX_EVENTS_PER_POLL, 0); 782 783 if (num_events <= 0) { 784 return SPDK_POLLER_IDLE; 785 } 786 787 for (i = 0; i < num_events; i++) { 788 bdev_rbd_io_poll((struct bdev_rbd_io_channel *)events[i].data.ptr); 789 } 790 791 return SPDK_POLLER_BUSY; 792 } 793 794 static int 795 bdev_rbd_group_create_cb(void *io_device, void *ctx_buf) 796 { 797 struct bdev_rbd_group_channel *ch = ctx_buf; 798 799 ch->epoll_fd = epoll_create1(0); 800 if (ch->epoll_fd < 0) { 801 SPDK_ERRLOG("Could not create epoll fd on io device=%p\n", io_device); 802 return -1; 803 } 804 805 ch->poller = SPDK_POLLER_REGISTER(bdev_rbd_group_poll, ch, 0); 806 807 return 0; 808 } 809 810 static void 811 bdev_rbd_group_destroy_cb(void *io_device, void *ctx_buf) 812 { 813 struct bdev_rbd_group_channel *ch = ctx_buf; 814 815 if (ch->epoll_fd >= 0) { 816 close(ch->epoll_fd); 817 } 818 819 spdk_poller_unregister(&ch->poller); 820 } 821 822 static int 823 bdev_rbd_library_init(void) 824 { 825 spdk_io_device_register(&rbd_if, bdev_rbd_group_create_cb, bdev_rbd_group_destroy_cb, 826 sizeof(struct bdev_rbd_group_channel), "bdev_rbd_poll_groups"); 827 828 return 0; 829 } 830 831 static void 832 bdev_rbd_library_fini(void) 833 { 834 spdk_io_device_unregister(&rbd_if, NULL); 835 } 836 837 SPDK_LOG_REGISTER_COMPONENT(bdev_rbd) 838