1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 36 #include "bdev_rbd.h" 37 38 #include <rbd/librbd.h> 39 #include <rados/librados.h> 40 #include <sys/eventfd.h> 41 #include <sys/epoll.h> 42 43 #include "spdk/env.h" 44 #include "spdk/bdev.h" 45 #include "spdk/thread.h" 46 #include "spdk/json.h" 47 #include "spdk/string.h" 48 #include "spdk/util.h" 49 #include "spdk/likely.h" 50 51 #include "spdk/bdev_module.h" 52 #include "spdk/log.h" 53 54 #define SPDK_RBD_QUEUE_DEPTH 128 55 #define MAX_EVENTS_PER_POLL 128 56 57 static int bdev_rbd_count = 0; 58 59 struct bdev_rbd { 60 struct spdk_bdev disk; 61 char *rbd_name; 62 char *user_id; 63 char *pool_name; 64 char **config; 65 66 rados_t cluster; 67 rados_t *cluster_p; 68 char *cluster_name; 69 70 rados_ioctx_t io_ctx; 71 rbd_image_t image; 72 int pfd; 73 74 rbd_image_info_t info; 75 pthread_mutex_t mutex; 76 struct spdk_thread *main_td; 77 struct spdk_thread *destruct_td; 78 uint32_t ch_count; 79 struct bdev_rbd_group_channel *group_ch; 80 81 TAILQ_ENTRY(bdev_rbd) tailq; 82 struct spdk_poller *reset_timer; 83 struct spdk_bdev_io *reset_bdev_io; 84 }; 85 86 struct bdev_rbd_group_channel { 87 struct spdk_poller *poller; 88 int epoll_fd; 89 }; 90 91 struct bdev_rbd_io_channel { 92 struct bdev_rbd *disk; 93 }; 94 95 struct bdev_rbd_io { 96 struct spdk_thread *submit_td; 97 enum spdk_bdev_io_status status; 98 rbd_completion_t comp; 99 size_t total_len; 100 }; 101 102 struct bdev_rbd_cluster { 103 char *name; 104 char *user_id; 105 char **config_param; 106 char *config_file; 107 rados_t cluster; 108 uint32_t ref; 109 STAILQ_ENTRY(bdev_rbd_cluster) link; 110 }; 111 112 static STAILQ_HEAD(, bdev_rbd_cluster) g_map_bdev_rbd_cluster = STAILQ_HEAD_INITIALIZER( 113 g_map_bdev_rbd_cluster); 114 static pthread_mutex_t g_map_bdev_rbd_cluster_mutex = PTHREAD_MUTEX_INITIALIZER; 115 116 static void 117 bdev_rbd_cluster_free(struct bdev_rbd_cluster *entry) 118 { 119 assert(entry != NULL); 120 121 bdev_rbd_free_config(entry->config_param); 122 free(entry->config_file); 123 free(entry->user_id); 124 free(entry->name); 125 free(entry); 126 } 127 128 static void 129 bdev_rbd_put_cluster(rados_t **cluster) 130 { 131 struct bdev_rbd_cluster *entry; 132 133 assert(cluster != NULL); 134 135 /* No need go through the map if *cluster equals to NULL */ 136 if (*cluster == NULL) { 137 return; 138 } 139 140 pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex); 141 STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) { 142 if (*cluster != &entry->cluster) { 143 continue; 144 } 145 146 assert(entry->ref > 0); 147 entry->ref--; 148 *cluster = NULL; 149 pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex); 150 return; 151 } 152 153 pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex); 154 SPDK_ERRLOG("Cannot find the entry for cluster=%p\n", cluster); 155 } 156 157 static void 158 bdev_rbd_free(struct bdev_rbd *rbd) 159 { 160 if (!rbd) { 161 return; 162 } 163 164 free(rbd->disk.name); 165 free(rbd->rbd_name); 166 free(rbd->user_id); 167 free(rbd->pool_name); 168 bdev_rbd_free_config(rbd->config); 169 170 if (rbd->io_ctx) { 171 rados_ioctx_destroy(rbd->io_ctx); 172 } 173 174 if (rbd->cluster_name) { 175 bdev_rbd_put_cluster(&rbd->cluster_p); 176 free(rbd->cluster_name); 177 } else if (rbd->cluster) { 178 rados_shutdown(rbd->cluster); 179 } 180 181 pthread_mutex_destroy(&rbd->mutex); 182 free(rbd); 183 } 184 185 void 186 bdev_rbd_free_config(char **config) 187 { 188 char **entry; 189 190 if (config) { 191 for (entry = config; *entry; entry++) { 192 free(*entry); 193 } 194 free(config); 195 } 196 } 197 198 char ** 199 bdev_rbd_dup_config(const char *const *config) 200 { 201 size_t count; 202 char **copy; 203 204 if (!config) { 205 return NULL; 206 } 207 for (count = 0; config[count]; count++) {} 208 copy = calloc(count + 1, sizeof(*copy)); 209 if (!copy) { 210 return NULL; 211 } 212 for (count = 0; config[count]; count++) { 213 if (!(copy[count] = strdup(config[count]))) { 214 bdev_rbd_free_config(copy); 215 return NULL; 216 } 217 } 218 return copy; 219 } 220 221 static int 222 bdev_rados_cluster_init(const char *user_id, const char *const *config, 223 rados_t *cluster) 224 { 225 int ret; 226 227 ret = rados_create(cluster, user_id); 228 if (ret < 0) { 229 SPDK_ERRLOG("Failed to create rados_t struct\n"); 230 return -1; 231 } 232 233 if (config) { 234 const char *const *entry = config; 235 while (*entry) { 236 ret = rados_conf_set(*cluster, entry[0], entry[1]); 237 if (ret < 0) { 238 SPDK_ERRLOG("Failed to set %s = %s\n", entry[0], entry[1]); 239 rados_shutdown(*cluster); 240 return -1; 241 } 242 entry += 2; 243 } 244 } else { 245 ret = rados_conf_read_file(*cluster, NULL); 246 if (ret < 0) { 247 SPDK_ERRLOG("Failed to read conf file\n"); 248 rados_shutdown(*cluster); 249 return -1; 250 } 251 } 252 253 ret = rados_connect(*cluster); 254 if (ret < 0) { 255 SPDK_ERRLOG("Failed to connect to rbd_pool\n"); 256 rados_shutdown(*cluster); 257 return -1; 258 } 259 260 return 0; 261 } 262 263 static int 264 bdev_rbd_get_cluster(const char *cluster_name, rados_t **cluster) 265 { 266 struct bdev_rbd_cluster *entry; 267 268 if (cluster == NULL) { 269 SPDK_ERRLOG("cluster should not be NULL\n"); 270 return -1; 271 } 272 273 pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex); 274 STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) { 275 if (strcmp(cluster_name, entry->name) == 0) { 276 entry->ref++; 277 *cluster = &entry->cluster; 278 pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex); 279 return 0; 280 } 281 } 282 283 pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex); 284 return -1; 285 } 286 287 static int 288 bdev_rbd_shared_cluster_init(const char *cluster_name, rados_t **cluster) 289 { 290 int ret; 291 292 ret = bdev_rbd_get_cluster(cluster_name, cluster); 293 if (ret < 0) { 294 SPDK_ERRLOG("Failed to create rados_t struct\n"); 295 return -1; 296 } 297 298 return ret; 299 } 300 301 static void * 302 bdev_rbd_cluster_handle(void *arg) 303 { 304 void *ret = arg; 305 struct bdev_rbd *rbd = arg; 306 int rc; 307 308 rc = bdev_rados_cluster_init(rbd->user_id, (const char *const *)rbd->config, 309 &rbd->cluster); 310 if (rc < 0) { 311 SPDK_ERRLOG("Failed to create rados cluster for user_id=%s and rbd_pool=%s\n", 312 rbd->user_id ? rbd->user_id : "admin (the default)", rbd->pool_name); 313 ret = NULL; 314 } 315 316 return ret; 317 } 318 319 static void * 320 bdev_rbd_init_context(void *arg) 321 { 322 struct bdev_rbd *rbd = arg; 323 int rc; 324 325 if (rados_ioctx_create(*(rbd->cluster_p), rbd->pool_name, &rbd->io_ctx) < 0) { 326 SPDK_ERRLOG("Failed to create ioctx on rbd=%p\n", rbd); 327 return NULL; 328 } 329 330 rc = rbd_open(rbd->io_ctx, rbd->rbd_name, &rbd->image, NULL); 331 if (rc < 0) { 332 SPDK_ERRLOG("Failed to open specified rbd device\n"); 333 return NULL; 334 } 335 336 rc = rbd_stat(rbd->image, &rbd->info, sizeof(rbd->info)); 337 rbd_close(rbd->image); 338 if (rc < 0) { 339 SPDK_ERRLOG("Failed to stat specified rbd device\n"); 340 return NULL; 341 } 342 343 return arg; 344 } 345 346 static int 347 bdev_rbd_init(struct bdev_rbd *rbd) 348 { 349 int ret = 0; 350 351 if (!rbd->cluster_name) { 352 rbd->cluster_p = &rbd->cluster; 353 /* Cluster should be created in non-SPDK thread to avoid conflict between 354 * Rados and SPDK thread */ 355 if (spdk_call_unaffinitized(bdev_rbd_cluster_handle, rbd) == NULL) { 356 SPDK_ERRLOG("Cannot create the rados object on rbd=%p\n", rbd); 357 return -1; 358 } 359 } else { 360 ret = bdev_rbd_shared_cluster_init(rbd->cluster_name, &rbd->cluster_p); 361 if (ret < 0) { 362 SPDK_ERRLOG("Failed to create rados object for rbd =%p on cluster_name=%s\n", 363 rbd, rbd->cluster_name); 364 return -1; 365 } 366 } 367 368 if (spdk_call_unaffinitized(bdev_rbd_init_context, rbd) == NULL) { 369 SPDK_ERRLOG("Cannot init rbd context for rbd=%p\n", rbd); 370 } 371 372 return ret; 373 } 374 375 static void 376 bdev_rbd_exit(rbd_image_t image) 377 { 378 rbd_flush(image); 379 rbd_close(image); 380 } 381 382 static void 383 bdev_rbd_finish_aiocb(rbd_completion_t cb, void *arg) 384 { 385 /* Doing nothing here */ 386 } 387 388 static void 389 _bdev_rbd_io_complete(void *_rbd_io) 390 { 391 struct bdev_rbd_io *rbd_io = _rbd_io; 392 393 spdk_bdev_io_complete(spdk_bdev_io_from_ctx(rbd_io), rbd_io->status); 394 } 395 396 static void 397 bdev_rbd_io_complete(struct spdk_bdev_io *bdev_io, enum spdk_bdev_io_status status) 398 { 399 struct bdev_rbd_io *rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx; 400 struct spdk_thread *current_thread = spdk_get_thread(); 401 402 rbd_io->status = status; 403 assert(rbd_io->submit_td != NULL); 404 if (rbd_io->submit_td != current_thread) { 405 spdk_thread_send_msg(rbd_io->submit_td, _bdev_rbd_io_complete, rbd_io); 406 } else { 407 _bdev_rbd_io_complete(rbd_io); 408 } 409 } 410 411 static void 412 bdev_rbd_start_aio(struct bdev_rbd *disk, struct spdk_bdev_io *bdev_io, 413 struct iovec *iov, int iovcnt, uint64_t offset, size_t len) 414 { 415 int ret; 416 struct bdev_rbd_io *rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx; 417 rbd_image_t image = disk->image; 418 419 ret = rbd_aio_create_completion(bdev_io, bdev_rbd_finish_aiocb, 420 &rbd_io->comp); 421 if (ret < 0) { 422 goto err; 423 } 424 425 if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) { 426 rbd_io->total_len = len; 427 if (spdk_likely(iovcnt == 1)) { 428 ret = rbd_aio_read(image, offset, iov[0].iov_len, iov[0].iov_base, rbd_io->comp); 429 } else { 430 ret = rbd_aio_readv(image, iov, iovcnt, offset, rbd_io->comp); 431 } 432 } else if (bdev_io->type == SPDK_BDEV_IO_TYPE_WRITE) { 433 if (spdk_likely(iovcnt == 1)) { 434 ret = rbd_aio_write(image, offset, iov[0].iov_len, iov[0].iov_base, rbd_io->comp); 435 } else { 436 ret = rbd_aio_writev(image, iov, iovcnt, offset, rbd_io->comp); 437 } 438 } else if (bdev_io->type == SPDK_BDEV_IO_TYPE_FLUSH) { 439 ret = rbd_aio_flush(image, rbd_io->comp); 440 } 441 442 if (ret < 0) { 443 rbd_aio_release(rbd_io->comp); 444 goto err; 445 } 446 447 return; 448 449 err: 450 bdev_rbd_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 451 } 452 453 static int bdev_rbd_library_init(void); 454 455 static void bdev_rbd_library_fini(void); 456 457 static int 458 bdev_rbd_get_ctx_size(void) 459 { 460 return sizeof(struct bdev_rbd_io); 461 } 462 463 static struct spdk_bdev_module rbd_if = { 464 .name = "rbd", 465 .module_init = bdev_rbd_library_init, 466 .module_fini = bdev_rbd_library_fini, 467 .get_ctx_size = bdev_rbd_get_ctx_size, 468 469 }; 470 SPDK_BDEV_MODULE_REGISTER(rbd, &rbd_if) 471 472 static int 473 bdev_rbd_reset_timer(void *arg) 474 { 475 struct bdev_rbd *disk = arg; 476 477 /* 478 * TODO: This should check if any I/O is still in flight before completing the reset. 479 * For now, just complete after the timer expires. 480 */ 481 bdev_rbd_io_complete(disk->reset_bdev_io, SPDK_BDEV_IO_STATUS_SUCCESS); 482 spdk_poller_unregister(&disk->reset_timer); 483 disk->reset_bdev_io = NULL; 484 485 return SPDK_POLLER_BUSY; 486 } 487 488 static void 489 bdev_rbd_reset(struct bdev_rbd *disk, struct spdk_bdev_io *bdev_io) 490 { 491 /* 492 * HACK: Since librbd doesn't provide any way to cancel outstanding aio, just kick off a 493 * timer to wait for in-flight I/O to complete. 494 */ 495 assert(disk->reset_bdev_io == NULL); 496 disk->reset_bdev_io = bdev_io; 497 disk->reset_timer = SPDK_POLLER_REGISTER(bdev_rbd_reset_timer, disk, 1 * 1000 * 1000); 498 } 499 500 static void 501 _bdev_rbd_destruct_done(void *io_device) 502 { 503 struct bdev_rbd *rbd = io_device; 504 505 assert(rbd != NULL); 506 assert(rbd->ch_count == 0); 507 508 spdk_bdev_destruct_done(&rbd->disk, 0); 509 bdev_rbd_free(rbd); 510 } 511 512 static void 513 bdev_rbd_free_cb(void *io_device) 514 { 515 struct bdev_rbd *rbd = io_device; 516 517 /* The io device has been unregistered. Send a message back to the 518 * original thread that started the destruct operation, so that the 519 * bdev unregister callback is invoked on the same thread that started 520 * this whole process. 521 */ 522 spdk_thread_send_msg(rbd->destruct_td, _bdev_rbd_destruct_done, rbd); 523 } 524 525 static void 526 _bdev_rbd_destruct(void *ctx) 527 { 528 struct bdev_rbd *rbd = ctx; 529 530 spdk_io_device_unregister(rbd, bdev_rbd_free_cb); 531 } 532 533 static int 534 bdev_rbd_destruct(void *ctx) 535 { 536 struct bdev_rbd *rbd = ctx; 537 struct spdk_thread *td; 538 539 if (rbd->main_td == NULL) { 540 td = spdk_get_thread(); 541 } else { 542 td = rbd->main_td; 543 } 544 545 /* Start the destruct operation on the rbd bdev's 546 * main thread. This guarantees it will only start 547 * executing after any messages related to channel 548 * deletions have finished completing. *Always* 549 * send a message, even if this function gets called 550 * from the main thread, in case there are pending 551 * channel delete messages in flight to this thread. 552 */ 553 assert(rbd->destruct_td == NULL); 554 rbd->destruct_td = td; 555 spdk_thread_send_msg(td, _bdev_rbd_destruct, rbd); 556 557 /* Return 1 to indicate the destruct path is asynchronous. */ 558 return 1; 559 } 560 561 static void 562 bdev_rbd_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io, 563 bool success) 564 { 565 struct bdev_rbd *disk = (struct bdev_rbd *)bdev_io->bdev->ctxt; 566 567 if (!success) { 568 bdev_rbd_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 569 return; 570 } 571 572 bdev_rbd_start_aio(disk, 573 bdev_io, 574 bdev_io->u.bdev.iovs, 575 bdev_io->u.bdev.iovcnt, 576 bdev_io->u.bdev.offset_blocks * bdev_io->bdev->blocklen, 577 bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen); 578 } 579 580 static void 581 _bdev_rbd_submit_request(void *ctx) 582 { 583 struct spdk_bdev_io *bdev_io = ctx; 584 struct bdev_rbd *disk = (struct bdev_rbd *)bdev_io->bdev->ctxt; 585 586 switch (bdev_io->type) { 587 case SPDK_BDEV_IO_TYPE_READ: 588 spdk_bdev_io_get_buf(bdev_io, bdev_rbd_get_buf_cb, 589 bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen); 590 break; 591 592 case SPDK_BDEV_IO_TYPE_WRITE: 593 case SPDK_BDEV_IO_TYPE_FLUSH: 594 bdev_rbd_start_aio(disk, 595 bdev_io, 596 bdev_io->u.bdev.iovs, 597 bdev_io->u.bdev.iovcnt, 598 bdev_io->u.bdev.offset_blocks * bdev_io->bdev->blocklen, 599 bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen); 600 break; 601 602 case SPDK_BDEV_IO_TYPE_RESET: 603 bdev_rbd_reset((struct bdev_rbd *)bdev_io->bdev->ctxt, 604 bdev_io); 605 break; 606 607 default: 608 SPDK_ERRLOG("Unsupported IO type =%d\n", bdev_io->type); 609 bdev_rbd_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 610 break; 611 } 612 } 613 614 static void 615 bdev_rbd_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io) 616 { 617 struct spdk_thread *submit_td = spdk_io_channel_get_thread(ch); 618 struct bdev_rbd_io *rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx; 619 struct bdev_rbd *disk = (struct bdev_rbd *)bdev_io->bdev->ctxt; 620 621 rbd_io->submit_td = submit_td; 622 if (disk->main_td != submit_td) { 623 spdk_thread_send_msg(disk->main_td, _bdev_rbd_submit_request, bdev_io); 624 } else { 625 _bdev_rbd_submit_request(bdev_io); 626 } 627 } 628 629 static bool 630 bdev_rbd_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type) 631 { 632 switch (io_type) { 633 case SPDK_BDEV_IO_TYPE_READ: 634 case SPDK_BDEV_IO_TYPE_WRITE: 635 case SPDK_BDEV_IO_TYPE_FLUSH: 636 case SPDK_BDEV_IO_TYPE_RESET: 637 return true; 638 639 default: 640 return false; 641 } 642 } 643 644 static void 645 bdev_rbd_io_poll(struct bdev_rbd *disk) 646 { 647 int i, io_status, rc; 648 rbd_completion_t comps[SPDK_RBD_QUEUE_DEPTH]; 649 struct spdk_bdev_io *bdev_io; 650 struct bdev_rbd_io *rbd_io; 651 enum spdk_bdev_io_status bio_status; 652 653 rc = rbd_poll_io_events(disk->image, comps, SPDK_RBD_QUEUE_DEPTH); 654 for (i = 0; i < rc; i++) { 655 bdev_io = rbd_aio_get_arg(comps[i]); 656 rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx; 657 io_status = rbd_aio_get_return_value(comps[i]); 658 bio_status = SPDK_BDEV_IO_STATUS_SUCCESS; 659 660 if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) { 661 if ((int)rbd_io->total_len != io_status) { 662 bio_status = SPDK_BDEV_IO_STATUS_FAILED; 663 } 664 } else { 665 /* For others, 0 means success */ 666 if (io_status != 0) { 667 bio_status = SPDK_BDEV_IO_STATUS_FAILED; 668 } 669 } 670 671 rbd_aio_release(comps[i]); 672 673 bdev_rbd_io_complete(bdev_io, bio_status); 674 } 675 } 676 677 static void 678 bdev_rbd_free_channel_resources(struct bdev_rbd *disk) 679 { 680 int rc; 681 682 assert(disk != NULL); 683 assert(disk->main_td == spdk_get_thread()); 684 assert(disk->ch_count == 0); 685 assert(disk->group_ch != NULL); 686 rc = epoll_ctl(disk->group_ch->epoll_fd, EPOLL_CTL_DEL, 687 disk->pfd, NULL); 688 if (rc < 0) { 689 SPDK_ERRLOG("Failed to remove fd on disk=%p from the polling group=%p\n", 690 disk, disk->group_ch); 691 } 692 spdk_put_io_channel(spdk_io_channel_from_ctx(disk->group_ch)); 693 694 if (disk->image) { 695 bdev_rbd_exit(disk->image); 696 } 697 698 if (disk->pfd >= 0) { 699 close(disk->pfd); 700 } 701 702 disk->main_td = NULL; 703 disk->group_ch = NULL; 704 } 705 706 static void * 707 bdev_rbd_handle(void *arg) 708 { 709 struct bdev_rbd *disk = arg; 710 void *ret = arg; 711 712 if (rbd_open(disk->io_ctx, disk->rbd_name, &disk->image, NULL) < 0) { 713 SPDK_ERRLOG("Failed to open specified rbd device\n"); 714 ret = NULL; 715 } 716 717 return ret; 718 } 719 720 static int 721 _bdev_rbd_create_cb(struct bdev_rbd *disk) 722 { 723 int ret; 724 struct epoll_event event = {}; 725 726 disk->group_ch = spdk_io_channel_get_ctx(spdk_get_io_channel(&rbd_if)); 727 assert(disk->group_ch != NULL); 728 event.events = EPOLLIN; 729 event.data.ptr = disk; 730 731 if (spdk_call_unaffinitized(bdev_rbd_handle, disk) == NULL) { 732 goto err; 733 } 734 735 disk->pfd = eventfd(0, EFD_NONBLOCK); 736 if (disk->pfd < 0) { 737 SPDK_ERRLOG("Failed to get eventfd\n"); 738 goto err; 739 } 740 741 ret = rbd_set_image_notification(disk->image, disk->pfd, EVENT_TYPE_EVENTFD); 742 if (ret < 0) { 743 SPDK_ERRLOG("Failed to set rbd image notification\n"); 744 goto err; 745 } 746 747 ret = epoll_ctl(disk->group_ch->epoll_fd, EPOLL_CTL_ADD, disk->pfd, &event); 748 if (ret < 0) { 749 SPDK_ERRLOG("Failed to add the fd of disk=%p to the epoll group from group_ch=%p\n", disk, 750 disk->group_ch); 751 goto err; 752 } 753 754 return 0; 755 756 err: 757 bdev_rbd_free_channel_resources(disk); 758 return -1; 759 } 760 761 static int 762 bdev_rbd_create_cb(void *io_device, void *ctx_buf) 763 { 764 struct bdev_rbd_io_channel *ch = ctx_buf; 765 struct bdev_rbd *disk = io_device; 766 int rc; 767 768 ch->disk = disk; 769 pthread_mutex_lock(&disk->mutex); 770 if (disk->ch_count == 0) { 771 assert(disk->main_td == NULL); 772 rc = _bdev_rbd_create_cb(disk); 773 if (rc) { 774 SPDK_ERRLOG("Cannot create channel for disk=%p\n", disk); 775 pthread_mutex_unlock(&disk->mutex); 776 return rc; 777 } 778 779 disk->main_td = spdk_get_thread(); 780 } 781 782 disk->ch_count++; 783 pthread_mutex_unlock(&disk->mutex); 784 785 return 0; 786 } 787 788 static void 789 _bdev_rbd_destroy_cb(void *ctx) 790 { 791 struct bdev_rbd *disk = ctx; 792 793 pthread_mutex_lock(&disk->mutex); 794 assert(disk->ch_count > 0); 795 disk->ch_count--; 796 797 if (disk->ch_count > 0) { 798 /* A new channel was created between when message was sent and this function executed */ 799 pthread_mutex_unlock(&disk->mutex); 800 return; 801 } 802 803 bdev_rbd_free_channel_resources(disk); 804 pthread_mutex_unlock(&disk->mutex); 805 } 806 807 static void 808 bdev_rbd_destroy_cb(void *io_device, void *ctx_buf) 809 { 810 struct bdev_rbd *disk = io_device; 811 struct spdk_thread *thread; 812 813 pthread_mutex_lock(&disk->mutex); 814 assert(disk->ch_count > 0); 815 disk->ch_count--; 816 if (disk->ch_count == 0) { 817 assert(disk->main_td != NULL); 818 if (disk->main_td != spdk_get_thread()) { 819 /* The final channel was destroyed on a different thread 820 * than where the first channel was created. Pass a message 821 * to the main thread to unregister the poller. */ 822 disk->ch_count++; 823 thread = disk->main_td; 824 pthread_mutex_unlock(&disk->mutex); 825 spdk_thread_send_msg(thread, _bdev_rbd_destroy_cb, disk); 826 return; 827 } 828 829 bdev_rbd_free_channel_resources(disk); 830 } 831 pthread_mutex_unlock(&disk->mutex); 832 } 833 834 static struct spdk_io_channel * 835 bdev_rbd_get_io_channel(void *ctx) 836 { 837 struct bdev_rbd *rbd_bdev = ctx; 838 839 return spdk_get_io_channel(rbd_bdev); 840 } 841 842 static void 843 bdev_rbd_cluster_dump_entry(const char *cluster_name, struct spdk_json_write_ctx *w) 844 { 845 struct bdev_rbd_cluster *entry; 846 847 pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex); 848 STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) { 849 if (strcmp(cluster_name, entry->name)) { 850 continue; 851 } 852 if (entry->user_id) { 853 spdk_json_write_named_string(w, "user_id", entry->user_id); 854 } 855 856 if (entry->config_param) { 857 char **config_entry = entry->config_param; 858 859 spdk_json_write_named_object_begin(w, "config_param"); 860 while (*config_entry) { 861 spdk_json_write_named_string(w, config_entry[0], config_entry[1]); 862 config_entry += 2; 863 } 864 spdk_json_write_object_end(w); 865 } else if (entry->config_file) { 866 spdk_json_write_named_string(w, "config_file", entry->config_file); 867 } 868 869 pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex); 870 return; 871 } 872 873 pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex); 874 } 875 876 static int 877 bdev_rbd_dump_info_json(void *ctx, struct spdk_json_write_ctx *w) 878 { 879 struct bdev_rbd *rbd_bdev = ctx; 880 881 spdk_json_write_named_object_begin(w, "rbd"); 882 883 spdk_json_write_named_string(w, "pool_name", rbd_bdev->pool_name); 884 885 spdk_json_write_named_string(w, "rbd_name", rbd_bdev->rbd_name); 886 887 if (rbd_bdev->cluster_name) { 888 bdev_rbd_cluster_dump_entry(rbd_bdev->cluster_name, w); 889 goto end; 890 } 891 892 if (rbd_bdev->user_id) { 893 spdk_json_write_named_string(w, "user_id", rbd_bdev->user_id); 894 } 895 896 if (rbd_bdev->config) { 897 char **entry = rbd_bdev->config; 898 899 spdk_json_write_named_object_begin(w, "config"); 900 while (*entry) { 901 spdk_json_write_named_string(w, entry[0], entry[1]); 902 entry += 2; 903 } 904 spdk_json_write_object_end(w); 905 } 906 907 end: 908 spdk_json_write_object_end(w); 909 910 return 0; 911 } 912 913 static void 914 bdev_rbd_write_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w) 915 { 916 struct bdev_rbd *rbd = bdev->ctxt; 917 918 spdk_json_write_object_begin(w); 919 920 spdk_json_write_named_string(w, "method", "bdev_rbd_create"); 921 922 spdk_json_write_named_object_begin(w, "params"); 923 spdk_json_write_named_string(w, "name", bdev->name); 924 spdk_json_write_named_string(w, "pool_name", rbd->pool_name); 925 spdk_json_write_named_string(w, "rbd_name", rbd->rbd_name); 926 spdk_json_write_named_uint32(w, "block_size", bdev->blocklen); 927 if (rbd->user_id) { 928 spdk_json_write_named_string(w, "user_id", rbd->user_id); 929 } 930 931 if (rbd->config) { 932 char **entry = rbd->config; 933 934 spdk_json_write_named_object_begin(w, "config"); 935 while (*entry) { 936 spdk_json_write_named_string(w, entry[0], entry[1]); 937 entry += 2; 938 } 939 spdk_json_write_object_end(w); 940 } 941 942 spdk_json_write_object_end(w); 943 944 spdk_json_write_object_end(w); 945 } 946 947 static void 948 dump_single_cluster_entry(struct bdev_rbd_cluster *entry, struct spdk_json_write_ctx *w) 949 { 950 assert(entry != NULL); 951 952 spdk_json_write_object_begin(w); 953 spdk_json_write_named_string(w, "cluster_name", entry->name); 954 955 if (entry->user_id) { 956 spdk_json_write_named_string(w, "user_id", entry->user_id); 957 } 958 959 if (entry->config_param) { 960 char **config_entry = entry->config_param; 961 962 spdk_json_write_named_object_begin(w, "config_param"); 963 while (*config_entry) { 964 spdk_json_write_named_string(w, config_entry[0], config_entry[1]); 965 config_entry += 2; 966 } 967 spdk_json_write_object_end(w); 968 } else if (entry->config_file) { 969 spdk_json_write_named_string(w, "config_file", entry->config_file); 970 } 971 972 spdk_json_write_object_end(w); 973 } 974 975 int 976 bdev_rbd_get_clusters_info(struct spdk_jsonrpc_request *request, const char *name) 977 { 978 struct bdev_rbd_cluster *entry; 979 struct spdk_json_write_ctx *w; 980 981 pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex); 982 983 if (STAILQ_EMPTY(&g_map_bdev_rbd_cluster)) { 984 pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex); 985 return -ENOENT; 986 } 987 988 /* If cluster name is provided */ 989 if (name) { 990 STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) { 991 if (strcmp(name, entry->name) == 0) { 992 w = spdk_jsonrpc_begin_result(request); 993 dump_single_cluster_entry(entry, w); 994 spdk_jsonrpc_end_result(request, w); 995 996 pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex); 997 return 0; 998 } 999 } 1000 1001 pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex); 1002 return -ENOENT; 1003 } 1004 1005 w = spdk_jsonrpc_begin_result(request); 1006 spdk_json_write_array_begin(w); 1007 STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) { 1008 dump_single_cluster_entry(entry, w); 1009 } 1010 spdk_json_write_array_end(w); 1011 spdk_jsonrpc_end_result(request, w); 1012 pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex); 1013 1014 return 0; 1015 } 1016 1017 static const struct spdk_bdev_fn_table rbd_fn_table = { 1018 .destruct = bdev_rbd_destruct, 1019 .submit_request = bdev_rbd_submit_request, 1020 .io_type_supported = bdev_rbd_io_type_supported, 1021 .get_io_channel = bdev_rbd_get_io_channel, 1022 .dump_info_json = bdev_rbd_dump_info_json, 1023 .write_config_json = bdev_rbd_write_config_json, 1024 }; 1025 1026 static int 1027 rbd_register_cluster(const char *name, const char *user_id, const char *const *config_param, 1028 const char *config_file) 1029 { 1030 struct bdev_rbd_cluster *entry; 1031 int rc; 1032 1033 pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex); 1034 STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) { 1035 if (strcmp(name, entry->name) == 0) { 1036 SPDK_ERRLOG("Cluster name=%s already exists\n", name); 1037 pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex); 1038 return -1; 1039 } 1040 } 1041 1042 entry = calloc(1, sizeof(*entry)); 1043 if (!entry) { 1044 SPDK_ERRLOG("Cannot allocate an entry for name=%s\n", name); 1045 pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex); 1046 return -1; 1047 } 1048 1049 entry->name = strdup(name); 1050 if (entry->name == NULL) { 1051 SPDK_ERRLOG("Failed to save the name =%s on entry =%p\n", name, entry); 1052 goto err_handle; 1053 } 1054 1055 if (user_id) { 1056 entry->user_id = strdup(user_id); 1057 if (entry->user_id == NULL) { 1058 SPDK_ERRLOG("Failed to save the str =%s on entry =%p\n", user_id, entry); 1059 goto err_handle; 1060 } 1061 } 1062 1063 /* The first priority is the config_param, then we use the config_file */ 1064 if (config_param) { 1065 entry->config_param = bdev_rbd_dup_config(config_param); 1066 if (entry->config_param == NULL) { 1067 SPDK_ERRLOG("Failed to save the config_param=%p on entry = %p\n", config_param, entry); 1068 goto err_handle; 1069 } 1070 } else if (config_file) { 1071 entry->config_file = strdup(config_file); 1072 if (entry->config_file == NULL) { 1073 SPDK_ERRLOG("Failed to save the config_file=%s on entry = %p\n", config_file, entry); 1074 goto err_handle; 1075 } 1076 } 1077 1078 rc = rados_create(&entry->cluster, user_id); 1079 if (rc < 0) { 1080 SPDK_ERRLOG("Failed to create rados_t struct\n"); 1081 goto err_handle; 1082 } 1083 1084 if (config_param) { 1085 const char *const *config_entry = config_param; 1086 while (*config_entry) { 1087 rc = rados_conf_set(entry->cluster, config_entry[0], config_entry[1]); 1088 if (rc < 0) { 1089 SPDK_ERRLOG("Failed to set %s = %s\n", config_entry[0], config_entry[1]); 1090 rados_shutdown(entry->cluster); 1091 goto err_handle; 1092 } 1093 config_entry += 2; 1094 } 1095 } else { 1096 rc = rados_conf_read_file(entry->cluster, entry->config_file); 1097 if (rc < 0) { 1098 SPDK_ERRLOG("Failed to read conf file\n"); 1099 rados_shutdown(entry->cluster); 1100 goto err_handle; 1101 } 1102 } 1103 1104 rc = rados_connect(entry->cluster); 1105 if (rc < 0) { 1106 SPDK_ERRLOG("Failed to connect to rbd_pool on cluster=%p\n", entry->cluster); 1107 rados_shutdown(entry->cluster); 1108 goto err_handle; 1109 } 1110 1111 STAILQ_INSERT_TAIL(&g_map_bdev_rbd_cluster, entry, link); 1112 pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex); 1113 1114 return 0; 1115 1116 err_handle: 1117 bdev_rbd_cluster_free(entry); 1118 pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex); 1119 return -1; 1120 } 1121 1122 int 1123 bdev_rbd_unregister_cluster(const char *name) 1124 { 1125 struct bdev_rbd_cluster *entry; 1126 int rc = 0; 1127 1128 if (name == NULL) { 1129 return -1; 1130 } 1131 1132 pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex); 1133 STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) { 1134 if (strcmp(name, entry->name) == 0) { 1135 if (entry->ref == 0) { 1136 STAILQ_REMOVE(&g_map_bdev_rbd_cluster, entry, bdev_rbd_cluster, link); 1137 rados_shutdown(entry->cluster); 1138 bdev_rbd_cluster_free(entry); 1139 } else { 1140 SPDK_ERRLOG("Cluster with name=%p is still used and we cannot delete it\n", 1141 entry->name); 1142 rc = -1; 1143 } 1144 1145 pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex); 1146 return rc; 1147 } 1148 } 1149 1150 pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex); 1151 1152 SPDK_ERRLOG("Could not find the cluster name =%p\n", name); 1153 1154 return -1; 1155 } 1156 1157 static void * 1158 _bdev_rbd_register_cluster(void *arg) 1159 { 1160 struct cluster_register_info *info = arg; 1161 void *ret = arg; 1162 int rc; 1163 1164 rc = rbd_register_cluster((const char *)info->name, (const char *)info->user_id, 1165 (const char *const *)info->config_param, (const char *)info->config_file); 1166 if (rc) { 1167 ret = NULL; 1168 } 1169 1170 return ret; 1171 } 1172 1173 int 1174 bdev_rbd_register_cluster(struct cluster_register_info *info) 1175 { 1176 assert(info != NULL); 1177 1178 /* Rados cluster info need to be created in non SPDK-thread to avoid CPU 1179 * resource contention */ 1180 if (spdk_call_unaffinitized(_bdev_rbd_register_cluster, info) == NULL) { 1181 return -1; 1182 } 1183 1184 return 0; 1185 } 1186 1187 int 1188 bdev_rbd_create(struct spdk_bdev **bdev, const char *name, const char *user_id, 1189 const char *pool_name, 1190 const char *const *config, 1191 const char *rbd_name, 1192 uint32_t block_size, 1193 const char *cluster_name) 1194 { 1195 struct bdev_rbd *rbd; 1196 int ret; 1197 1198 if ((pool_name == NULL) || (rbd_name == NULL)) { 1199 return -EINVAL; 1200 } 1201 1202 rbd = calloc(1, sizeof(struct bdev_rbd)); 1203 if (rbd == NULL) { 1204 SPDK_ERRLOG("Failed to allocate bdev_rbd struct\n"); 1205 return -ENOMEM; 1206 } 1207 1208 ret = pthread_mutex_init(&rbd->mutex, NULL); 1209 if (ret) { 1210 SPDK_ERRLOG("Cannot init mutex on rbd=%p\n", rbd->disk.name); 1211 free(rbd); 1212 return ret; 1213 } 1214 1215 rbd->pfd = -1; 1216 rbd->rbd_name = strdup(rbd_name); 1217 if (!rbd->rbd_name) { 1218 bdev_rbd_free(rbd); 1219 return -ENOMEM; 1220 } 1221 1222 if (user_id) { 1223 rbd->user_id = strdup(user_id); 1224 if (!rbd->user_id) { 1225 bdev_rbd_free(rbd); 1226 return -ENOMEM; 1227 } 1228 } 1229 1230 if (cluster_name) { 1231 rbd->cluster_name = strdup(cluster_name); 1232 if (!rbd->cluster_name) { 1233 bdev_rbd_free(rbd); 1234 return -ENOMEM; 1235 } 1236 } 1237 rbd->pool_name = strdup(pool_name); 1238 if (!rbd->pool_name) { 1239 bdev_rbd_free(rbd); 1240 return -ENOMEM; 1241 } 1242 1243 if (config && !(rbd->config = bdev_rbd_dup_config(config))) { 1244 bdev_rbd_free(rbd); 1245 return -ENOMEM; 1246 } 1247 1248 ret = bdev_rbd_init(rbd); 1249 if (ret < 0) { 1250 bdev_rbd_free(rbd); 1251 SPDK_ERRLOG("Failed to init rbd device\n"); 1252 return ret; 1253 } 1254 1255 if (name) { 1256 rbd->disk.name = strdup(name); 1257 } else { 1258 rbd->disk.name = spdk_sprintf_alloc("Ceph%d", bdev_rbd_count); 1259 } 1260 if (!rbd->disk.name) { 1261 bdev_rbd_free(rbd); 1262 return -ENOMEM; 1263 } 1264 rbd->disk.product_name = "Ceph Rbd Disk"; 1265 bdev_rbd_count++; 1266 1267 rbd->disk.write_cache = 0; 1268 rbd->disk.blocklen = block_size; 1269 rbd->disk.blockcnt = rbd->info.size / rbd->disk.blocklen; 1270 rbd->disk.ctxt = rbd; 1271 rbd->disk.fn_table = &rbd_fn_table; 1272 rbd->disk.module = &rbd_if; 1273 1274 SPDK_NOTICELOG("Add %s rbd disk to lun\n", rbd->disk.name); 1275 1276 spdk_io_device_register(rbd, bdev_rbd_create_cb, 1277 bdev_rbd_destroy_cb, 1278 sizeof(struct bdev_rbd_io_channel), 1279 rbd_name); 1280 ret = spdk_bdev_register(&rbd->disk); 1281 if (ret) { 1282 spdk_io_device_unregister(rbd, NULL); 1283 bdev_rbd_free(rbd); 1284 return ret; 1285 } 1286 1287 *bdev = &(rbd->disk); 1288 1289 return ret; 1290 } 1291 1292 void 1293 bdev_rbd_delete(struct spdk_bdev *bdev, spdk_delete_rbd_complete cb_fn, void *cb_arg) 1294 { 1295 if (!bdev || bdev->module != &rbd_if) { 1296 cb_fn(cb_arg, -ENODEV); 1297 return; 1298 } 1299 1300 spdk_bdev_unregister(bdev, cb_fn, cb_arg); 1301 } 1302 1303 int 1304 bdev_rbd_resize(struct spdk_bdev *bdev, const uint64_t new_size_in_mb) 1305 { 1306 struct spdk_io_channel *ch; 1307 struct bdev_rbd_io_channel *rbd_io_ch; 1308 int rc; 1309 uint64_t new_size_in_byte; 1310 uint64_t current_size_in_mb; 1311 1312 if (bdev->module != &rbd_if) { 1313 return -EINVAL; 1314 } 1315 1316 current_size_in_mb = bdev->blocklen * bdev->blockcnt / (1024 * 1024); 1317 if (current_size_in_mb > new_size_in_mb) { 1318 SPDK_ERRLOG("The new bdev size must be lager than current bdev size.\n"); 1319 return -EINVAL; 1320 } 1321 1322 ch = bdev_rbd_get_io_channel(bdev); 1323 rbd_io_ch = spdk_io_channel_get_ctx(ch); 1324 new_size_in_byte = new_size_in_mb * 1024 * 1024; 1325 1326 rc = rbd_resize(rbd_io_ch->disk->image, new_size_in_byte); 1327 spdk_put_io_channel(ch); 1328 if (rc != 0) { 1329 SPDK_ERRLOG("failed to resize the ceph bdev.\n"); 1330 return rc; 1331 } 1332 1333 rc = spdk_bdev_notify_blockcnt_change(bdev, new_size_in_byte / bdev->blocklen); 1334 if (rc != 0) { 1335 SPDK_ERRLOG("failed to notify block cnt change.\n"); 1336 return rc; 1337 } 1338 1339 return rc; 1340 } 1341 1342 static int 1343 bdev_rbd_group_poll(void *arg) 1344 { 1345 struct bdev_rbd_group_channel *group_ch = arg; 1346 struct epoll_event events[MAX_EVENTS_PER_POLL]; 1347 int num_events, i; 1348 1349 num_events = epoll_wait(group_ch->epoll_fd, events, MAX_EVENTS_PER_POLL, 0); 1350 1351 if (num_events <= 0) { 1352 return SPDK_POLLER_IDLE; 1353 } 1354 1355 for (i = 0; i < num_events; i++) { 1356 bdev_rbd_io_poll((struct bdev_rbd *)events[i].data.ptr); 1357 } 1358 1359 return SPDK_POLLER_BUSY; 1360 } 1361 1362 static int 1363 bdev_rbd_group_create_cb(void *io_device, void *ctx_buf) 1364 { 1365 struct bdev_rbd_group_channel *ch = ctx_buf; 1366 1367 ch->epoll_fd = epoll_create1(0); 1368 if (ch->epoll_fd < 0) { 1369 SPDK_ERRLOG("Could not create epoll fd on io device=%p\n", io_device); 1370 return -1; 1371 } 1372 1373 ch->poller = SPDK_POLLER_REGISTER(bdev_rbd_group_poll, ch, 0); 1374 1375 return 0; 1376 } 1377 1378 static void 1379 bdev_rbd_group_destroy_cb(void *io_device, void *ctx_buf) 1380 { 1381 struct bdev_rbd_group_channel *ch = ctx_buf; 1382 1383 if (ch->epoll_fd >= 0) { 1384 close(ch->epoll_fd); 1385 } 1386 1387 spdk_poller_unregister(&ch->poller); 1388 } 1389 1390 static int 1391 bdev_rbd_library_init(void) 1392 { 1393 spdk_io_device_register(&rbd_if, bdev_rbd_group_create_cb, bdev_rbd_group_destroy_cb, 1394 sizeof(struct bdev_rbd_group_channel), "bdev_rbd_poll_groups"); 1395 1396 return 0; 1397 } 1398 1399 static void 1400 bdev_rbd_library_fini(void) 1401 { 1402 spdk_io_device_unregister(&rbd_if, NULL); 1403 } 1404 1405 SPDK_LOG_REGISTER_COMPONENT(bdev_rbd) 1406