1 /* SPDX-License-Identifier: BSD-3-Clause 2 * Copyright (C) 2017 Intel Corporation. 3 * All rights reserved. 4 */ 5 6 #include "spdk/stdinc.h" 7 8 #include "bdev_rbd.h" 9 10 #include <rbd/librbd.h> 11 #include <rados/librados.h> 12 13 #include "spdk/env.h" 14 #include "spdk/bdev.h" 15 #include "spdk/thread.h" 16 #include "spdk/json.h" 17 #include "spdk/string.h" 18 #include "spdk/util.h" 19 #include "spdk/likely.h" 20 21 #include "spdk/bdev_module.h" 22 #include "spdk/log.h" 23 24 static int bdev_rbd_count = 0; 25 26 struct bdev_rbd_pool_ctx { 27 rados_t *cluster_p; 28 char *name; 29 rados_ioctx_t io_ctx; 30 uint32_t ref; 31 STAILQ_ENTRY(bdev_rbd_pool_ctx) link; 32 }; 33 34 static STAILQ_HEAD(, bdev_rbd_pool_ctx) g_map_bdev_rbd_pool_ctx = STAILQ_HEAD_INITIALIZER( 35 g_map_bdev_rbd_pool_ctx); 36 37 struct bdev_rbd { 38 struct spdk_bdev disk; 39 char *rbd_name; 40 char *user_id; 41 char *pool_name; 42 char **config; 43 44 rados_t cluster; 45 rados_t *cluster_p; 46 char *cluster_name; 47 48 struct bdev_rbd_pool_ctx *ctx; 49 rbd_image_t image; 50 51 rbd_image_info_t info; 52 struct spdk_thread *destruct_td; 53 54 TAILQ_ENTRY(bdev_rbd) tailq; 55 struct spdk_poller *reset_timer; 56 struct spdk_bdev_io *reset_bdev_io; 57 }; 58 59 struct bdev_rbd_io_channel { 60 struct bdev_rbd *disk; 61 struct spdk_io_channel *group_ch; 62 }; 63 64 struct bdev_rbd_io { 65 struct spdk_thread *submit_td; 66 enum spdk_bdev_io_status status; 67 rbd_completion_t comp; 68 size_t total_len; 69 }; 70 71 struct bdev_rbd_cluster { 72 char *name; 73 char *user_id; 74 char **config_param; 75 char *config_file; 76 char *key_file; 77 char *core_mask; 78 rados_t cluster; 79 uint32_t ref; 80 STAILQ_ENTRY(bdev_rbd_cluster) link; 81 }; 82 83 static STAILQ_HEAD(, bdev_rbd_cluster) g_map_bdev_rbd_cluster = STAILQ_HEAD_INITIALIZER( 84 g_map_bdev_rbd_cluster); 85 static pthread_mutex_t g_map_bdev_rbd_cluster_mutex = PTHREAD_MUTEX_INITIALIZER; 86 87 static void 88 bdev_rbd_cluster_free(struct bdev_rbd_cluster *entry) 89 { 90 assert(entry != NULL); 91 92 bdev_rbd_free_config(entry->config_param); 93 free(entry->config_file); 94 free(entry->key_file); 95 free(entry->user_id); 96 free(entry->name); 97 free(entry->core_mask); 98 free(entry); 99 } 100 101 static void 102 bdev_rbd_put_cluster(rados_t **cluster) 103 { 104 struct bdev_rbd_cluster *entry; 105 106 assert(cluster != NULL); 107 108 /* No need go through the map if *cluster equals to NULL */ 109 if (*cluster == NULL) { 110 return; 111 } 112 113 pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex); 114 STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) { 115 if (*cluster != &entry->cluster) { 116 continue; 117 } 118 119 assert(entry->ref > 0); 120 entry->ref--; 121 *cluster = NULL; 122 pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex); 123 return; 124 } 125 126 pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex); 127 SPDK_ERRLOG("Cannot find the entry for cluster=%p\n", cluster); 128 } 129 130 static void 131 bdev_rbd_put_pool_ctx(struct bdev_rbd_pool_ctx *entry) 132 { 133 assert(spdk_get_thread() == spdk_thread_get_app_thread()); 134 135 assert(entry != NULL); 136 assert(entry->ref > 0); 137 entry->ref--; 138 if (entry->ref == 0) { 139 STAILQ_REMOVE(&g_map_bdev_rbd_pool_ctx, entry, bdev_rbd_pool_ctx, link); 140 rados_ioctx_destroy(entry->io_ctx); 141 free(entry->name); 142 free(entry); 143 } 144 } 145 146 static void 147 bdev_rbd_free(struct bdev_rbd *rbd) 148 { 149 if (!rbd) { 150 return; 151 } 152 153 if (rbd->image) { 154 rbd_flush(rbd->image); 155 rbd_close(rbd->image); 156 } 157 158 free(rbd->disk.name); 159 free(rbd->rbd_name); 160 free(rbd->user_id); 161 free(rbd->pool_name); 162 bdev_rbd_free_config(rbd->config); 163 164 /* When rbd is destructed by bdev_rbd_destruct, it will not enter here 165 * because the ctx will already freed by bdev_rbd_free_cb in async manner. 166 * This path only happens during the rbd initialization procedure of rbd */ 167 if (rbd->ctx) { 168 bdev_rbd_put_pool_ctx(rbd->ctx); 169 rbd->ctx = NULL; 170 } 171 172 if (rbd->cluster_name) { 173 bdev_rbd_put_cluster(&rbd->cluster_p); 174 free(rbd->cluster_name); 175 } else if (rbd->cluster) { 176 rados_shutdown(rbd->cluster); 177 } 178 179 free(rbd); 180 } 181 182 void 183 bdev_rbd_free_config(char **config) 184 { 185 char **entry; 186 187 if (config) { 188 for (entry = config; *entry; entry++) { 189 free(*entry); 190 } 191 free(config); 192 } 193 } 194 195 char ** 196 bdev_rbd_dup_config(const char *const *config) 197 { 198 size_t count; 199 char **copy; 200 201 if (!config) { 202 return NULL; 203 } 204 for (count = 0; config[count]; count++) {} 205 copy = calloc(count + 1, sizeof(*copy)); 206 if (!copy) { 207 return NULL; 208 } 209 for (count = 0; config[count]; count++) { 210 if (!(copy[count] = strdup(config[count]))) { 211 bdev_rbd_free_config(copy); 212 return NULL; 213 } 214 } 215 return copy; 216 } 217 218 static int 219 bdev_rados_cluster_init(const char *user_id, const char *const *config, 220 rados_t *cluster) 221 { 222 int ret; 223 224 ret = rados_create(cluster, user_id); 225 if (ret < 0) { 226 SPDK_ERRLOG("Failed to create rados_t struct\n"); 227 return -1; 228 } 229 230 if (config) { 231 const char *const *entry = config; 232 while (*entry) { 233 ret = rados_conf_set(*cluster, entry[0], entry[1]); 234 if (ret < 0) { 235 SPDK_ERRLOG("Failed to set %s = %s\n", entry[0], entry[1]); 236 rados_shutdown(*cluster); 237 *cluster = NULL; 238 return -1; 239 } 240 entry += 2; 241 } 242 } else { 243 ret = rados_conf_read_file(*cluster, NULL); 244 if (ret < 0) { 245 SPDK_ERRLOG("Failed to read conf file\n"); 246 rados_shutdown(*cluster); 247 *cluster = NULL; 248 return -1; 249 } 250 } 251 252 ret = rados_connect(*cluster); 253 if (ret < 0) { 254 SPDK_ERRLOG("Failed to connect to rbd_pool\n"); 255 rados_shutdown(*cluster); 256 *cluster = NULL; 257 return -1; 258 } 259 260 return 0; 261 } 262 263 static int 264 bdev_rbd_get_cluster(const char *cluster_name, rados_t **cluster) 265 { 266 struct bdev_rbd_cluster *entry; 267 268 if (cluster == NULL) { 269 SPDK_ERRLOG("cluster should not be NULL\n"); 270 return -1; 271 } 272 273 pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex); 274 STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) { 275 if (strcmp(cluster_name, entry->name) == 0) { 276 entry->ref++; 277 *cluster = &entry->cluster; 278 pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex); 279 return 0; 280 } 281 } 282 283 pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex); 284 return -1; 285 } 286 287 static int 288 bdev_rbd_shared_cluster_init(const char *cluster_name, rados_t **cluster) 289 { 290 int ret; 291 292 ret = bdev_rbd_get_cluster(cluster_name, cluster); 293 if (ret < 0) { 294 SPDK_ERRLOG("Failed to create rados_t struct\n"); 295 return -1; 296 } 297 298 return ret; 299 } 300 301 static void * 302 bdev_rbd_cluster_handle(void *arg) 303 { 304 void *ret = arg; 305 struct bdev_rbd *rbd = arg; 306 int rc; 307 308 rc = bdev_rados_cluster_init(rbd->user_id, (const char *const *)rbd->config, 309 &rbd->cluster); 310 if (rc < 0) { 311 SPDK_ERRLOG("Failed to create rados cluster for user_id=%s and rbd_pool=%s\n", 312 rbd->user_id ? rbd->user_id : "admin (the default)", rbd->pool_name); 313 ret = NULL; 314 } 315 316 return ret; 317 } 318 319 static int 320 bdev_rbd_get_pool_ctx(rados_t *cluster_p, const char *name, struct bdev_rbd_pool_ctx **ctx) 321 { 322 struct bdev_rbd_pool_ctx *entry; 323 324 assert(spdk_get_thread() == spdk_thread_get_app_thread()); 325 326 if (name == NULL || ctx == NULL) { 327 return -1; 328 } 329 330 STAILQ_FOREACH(entry, &g_map_bdev_rbd_pool_ctx, link) { 331 if (strcmp(name, entry->name) == 0 && cluster_p == entry->cluster_p) { 332 entry->ref++; 333 *ctx = entry; 334 return 0; 335 } 336 } 337 338 entry = calloc(1, sizeof(*entry)); 339 if (!entry) { 340 SPDK_ERRLOG("Cannot allocate an entry for name=%s\n", name); 341 return -1; 342 } 343 344 entry->name = strdup(name); 345 if (entry->name == NULL) { 346 SPDK_ERRLOG("Failed to allocate the name =%s space on entry =%p\n", name, entry); 347 goto err_handle; 348 } 349 350 if (rados_ioctx_create(*cluster_p, name, &entry->io_ctx) < 0) { 351 goto err_handle1; 352 } 353 354 entry->cluster_p = cluster_p; 355 entry->ref = 1; 356 *ctx = entry; 357 STAILQ_INSERT_TAIL(&g_map_bdev_rbd_pool_ctx, entry, link); 358 359 return 0; 360 361 err_handle1: 362 free(entry->name); 363 err_handle: 364 free(entry); 365 366 return -1; 367 } 368 369 static void * 370 bdev_rbd_init_context(void *arg) 371 { 372 struct bdev_rbd *rbd = arg; 373 int rc; 374 375 if (bdev_rbd_get_pool_ctx(rbd->cluster_p, rbd->pool_name, &rbd->ctx) < 0) { 376 SPDK_ERRLOG("Failed to create ioctx on rbd=%p\n", rbd); 377 return NULL; 378 } 379 380 assert(rbd->ctx != NULL); 381 rc = rbd_open(rbd->ctx->io_ctx, rbd->rbd_name, &rbd->image, NULL); 382 if (rc < 0) { 383 SPDK_ERRLOG("Failed to open specified rbd device\n"); 384 return NULL; 385 } 386 387 rc = rbd_stat(rbd->image, &rbd->info, sizeof(rbd->info)); 388 if (rc < 0) { 389 SPDK_ERRLOG("Failed to stat specified rbd device\n"); 390 return NULL; 391 } 392 393 return arg; 394 } 395 396 static int 397 bdev_rbd_init(struct bdev_rbd *rbd) 398 { 399 int ret = 0; 400 401 if (!rbd->cluster_name) { 402 rbd->cluster_p = &rbd->cluster; 403 /* Cluster should be created in non-SPDK thread to avoid conflict between 404 * Rados and SPDK thread */ 405 if (spdk_call_unaffinitized(bdev_rbd_cluster_handle, rbd) == NULL) { 406 SPDK_ERRLOG("Cannot create the rados object on rbd=%p\n", rbd); 407 return -1; 408 } 409 } else { 410 ret = bdev_rbd_shared_cluster_init(rbd->cluster_name, &rbd->cluster_p); 411 if (ret < 0) { 412 SPDK_ERRLOG("Failed to create rados object for rbd =%p on cluster_name=%s\n", 413 rbd, rbd->cluster_name); 414 return -1; 415 } 416 } 417 418 if (spdk_call_unaffinitized(bdev_rbd_init_context, rbd) == NULL) { 419 SPDK_ERRLOG("Cannot init rbd context for rbd=%p\n", rbd); 420 return -1; 421 } 422 423 return ret; 424 } 425 426 static void 427 _bdev_rbd_io_complete(void *_rbd_io) 428 { 429 struct bdev_rbd_io *rbd_io = _rbd_io; 430 431 spdk_bdev_io_complete(spdk_bdev_io_from_ctx(rbd_io), rbd_io->status); 432 } 433 434 static void 435 bdev_rbd_io_complete(struct spdk_bdev_io *bdev_io, enum spdk_bdev_io_status status) 436 { 437 struct bdev_rbd_io *rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx; 438 struct spdk_thread *current_thread = spdk_get_thread(); 439 440 rbd_io->status = status; 441 assert(rbd_io->submit_td != NULL); 442 if (rbd_io->submit_td != current_thread) { 443 spdk_thread_send_msg(rbd_io->submit_td, _bdev_rbd_io_complete, rbd_io); 444 } else { 445 _bdev_rbd_io_complete(rbd_io); 446 } 447 } 448 449 static void 450 bdev_rbd_finish_aiocb(rbd_completion_t cb, void *arg) 451 { 452 int io_status; 453 struct spdk_bdev_io *bdev_io; 454 struct bdev_rbd_io *rbd_io; 455 enum spdk_bdev_io_status bio_status; 456 457 bdev_io = rbd_aio_get_arg(cb); 458 rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx; 459 io_status = rbd_aio_get_return_value(cb); 460 bio_status = SPDK_BDEV_IO_STATUS_SUCCESS; 461 462 if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) { 463 if ((int)rbd_io->total_len != io_status) { 464 bio_status = SPDK_BDEV_IO_STATUS_FAILED; 465 } 466 #ifdef LIBRBD_SUPPORTS_COMPARE_AND_WRITE_IOVEC 467 } else if (bdev_io->type == SPDK_BDEV_IO_TYPE_COMPARE_AND_WRITE && io_status == -EILSEQ) { 468 bio_status = SPDK_BDEV_IO_STATUS_MISCOMPARE; 469 #endif 470 } else if (io_status != 0) { /* For others, 0 means success */ 471 bio_status = SPDK_BDEV_IO_STATUS_FAILED; 472 } 473 474 rbd_aio_release(cb); 475 476 bdev_rbd_io_complete(bdev_io, bio_status); 477 } 478 479 static void 480 _bdev_rbd_start_aio(struct bdev_rbd *disk, struct spdk_bdev_io *bdev_io, 481 struct iovec *iov, int iovcnt, uint64_t offset, size_t len) 482 { 483 int ret; 484 struct bdev_rbd_io *rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx; 485 rbd_image_t image = disk->image; 486 487 ret = rbd_aio_create_completion(bdev_io, bdev_rbd_finish_aiocb, 488 &rbd_io->comp); 489 if (ret < 0) { 490 goto err; 491 } 492 493 switch (bdev_io->type) { 494 case SPDK_BDEV_IO_TYPE_READ: 495 rbd_io->total_len = len; 496 if (spdk_likely(iovcnt == 1)) { 497 ret = rbd_aio_read(image, offset, iov[0].iov_len, iov[0].iov_base, 498 rbd_io->comp); 499 } else { 500 ret = rbd_aio_readv(image, iov, iovcnt, offset, rbd_io->comp); 501 } 502 break; 503 case SPDK_BDEV_IO_TYPE_WRITE: 504 if (spdk_likely(iovcnt == 1)) { 505 ret = rbd_aio_write(image, offset, iov[0].iov_len, iov[0].iov_base, 506 rbd_io->comp); 507 } else { 508 ret = rbd_aio_writev(image, iov, iovcnt, offset, rbd_io->comp); 509 } 510 break; 511 case SPDK_BDEV_IO_TYPE_UNMAP: 512 ret = rbd_aio_discard(image, offset, len, rbd_io->comp); 513 break; 514 case SPDK_BDEV_IO_TYPE_FLUSH: 515 ret = rbd_aio_flush(image, rbd_io->comp); 516 break; 517 case SPDK_BDEV_IO_TYPE_WRITE_ZEROES: 518 ret = rbd_aio_write_zeroes(image, offset, len, rbd_io->comp, /* zero_flags */ 0, 519 /* op_flags */ 0); 520 break; 521 #ifdef LIBRBD_SUPPORTS_COMPARE_AND_WRITE_IOVEC 522 case SPDK_BDEV_IO_TYPE_COMPARE_AND_WRITE: 523 ret = rbd_aio_compare_and_writev(image, offset, iov /* cmp */, iovcnt, 524 bdev_io->u.bdev.fused_iovs /* write */, 525 bdev_io->u.bdev.fused_iovcnt, 526 rbd_io->comp, NULL, 527 /* op_flags */ 0); 528 break; 529 #endif 530 default: 531 /* This should not happen. 532 * Function should only be called with supported io types in bdev_rbd_submit_request 533 */ 534 SPDK_ERRLOG("Unsupported IO type =%d\n", bdev_io->type); 535 ret = -ENOTSUP; 536 break; 537 } 538 539 if (ret < 0) { 540 rbd_aio_release(rbd_io->comp); 541 goto err; 542 } 543 544 return; 545 546 err: 547 bdev_rbd_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 548 } 549 550 static void 551 bdev_rbd_start_aio(void *ctx) 552 { 553 struct spdk_bdev_io *bdev_io = ctx; 554 struct bdev_rbd *disk = (struct bdev_rbd *)bdev_io->bdev->ctxt; 555 556 _bdev_rbd_start_aio(disk, 557 bdev_io, 558 bdev_io->u.bdev.iovs, 559 bdev_io->u.bdev.iovcnt, 560 bdev_io->u.bdev.offset_blocks * bdev_io->bdev->blocklen, 561 bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen); 562 } 563 564 static int bdev_rbd_library_init(void); 565 static void bdev_rbd_library_fini(void); 566 567 static int 568 bdev_rbd_get_ctx_size(void) 569 { 570 return sizeof(struct bdev_rbd_io); 571 } 572 573 static struct spdk_bdev_module rbd_if = { 574 .name = "rbd", 575 .module_init = bdev_rbd_library_init, 576 .module_fini = bdev_rbd_library_fini, 577 .get_ctx_size = bdev_rbd_get_ctx_size, 578 579 }; 580 SPDK_BDEV_MODULE_REGISTER(rbd, &rbd_if) 581 582 static int bdev_rbd_reset_timer(void *arg); 583 584 static void 585 bdev_rbd_check_outstanding_ios(struct spdk_bdev *bdev, uint64_t current_qd, 586 void *cb_arg, int rc) 587 { 588 struct bdev_rbd *disk = cb_arg; 589 enum spdk_bdev_io_status bio_status; 590 591 if (rc == 0 && current_qd > 0) { 592 disk->reset_timer = SPDK_POLLER_REGISTER(bdev_rbd_reset_timer, disk, 1000); 593 return; 594 } 595 596 if (rc != 0) { 597 bio_status = SPDK_BDEV_IO_STATUS_FAILED; 598 } else { 599 bio_status = SPDK_BDEV_IO_STATUS_SUCCESS; 600 } 601 602 bdev_rbd_io_complete(disk->reset_bdev_io, bio_status); 603 disk->reset_bdev_io = NULL; 604 } 605 606 static int 607 bdev_rbd_reset_timer(void *arg) 608 { 609 struct bdev_rbd *disk = arg; 610 611 spdk_poller_unregister(&disk->reset_timer); 612 613 spdk_bdev_get_current_qd(&disk->disk, bdev_rbd_check_outstanding_ios, disk); 614 615 return SPDK_POLLER_BUSY; 616 } 617 618 static void 619 bdev_rbd_reset(void *ctx) 620 { 621 struct spdk_bdev_io *bdev_io = ctx; 622 struct bdev_rbd *disk = (struct bdev_rbd *)bdev_io->bdev->ctxt; 623 624 /* 625 * HACK: Since librbd doesn't provide any way to cancel outstanding aio, just kick off a 626 * poller to wait for in-flight I/O to complete. 627 */ 628 assert(disk->reset_bdev_io == NULL); 629 disk->reset_bdev_io = bdev_io; 630 631 bdev_rbd_reset_timer(disk); 632 } 633 634 static void 635 _bdev_rbd_destruct_done(void *io_device) 636 { 637 struct bdev_rbd *rbd = io_device; 638 639 assert(rbd != NULL); 640 641 spdk_bdev_destruct_done(&rbd->disk, 0); 642 bdev_rbd_free(rbd); 643 } 644 645 static void 646 bdev_rbd_free_cb(void *io_device) 647 { 648 struct bdev_rbd *rbd = io_device; 649 650 assert(spdk_get_thread() == spdk_thread_get_app_thread()); 651 652 /* free the ctx */ 653 if (rbd->ctx) { 654 bdev_rbd_put_pool_ctx(rbd->ctx); 655 rbd->ctx = NULL; 656 } 657 658 /* The io device has been unregistered. Send a message back to the 659 * original thread that started the destruct operation, so that the 660 * bdev unregister callback is invoked on the same thread that started 661 * this whole process. 662 */ 663 spdk_thread_send_msg(rbd->destruct_td, _bdev_rbd_destruct_done, rbd); 664 } 665 666 static void 667 _bdev_rbd_destruct(void *ctx) 668 { 669 struct bdev_rbd *rbd = ctx; 670 671 spdk_io_device_unregister(rbd, bdev_rbd_free_cb); 672 } 673 674 static int 675 bdev_rbd_destruct(void *ctx) 676 { 677 struct bdev_rbd *rbd = ctx; 678 679 /* Start the destruct operation on the rbd bdev's 680 * main thread. This guarantees it will only start 681 * executing after any messages related to channel 682 * deletions have finished completing. *Always* 683 * send a message, even if this function gets called 684 * from the main thread, in case there are pending 685 * channel delete messages in flight to this thread. 686 */ 687 assert(rbd->destruct_td == NULL); 688 rbd->destruct_td = spdk_get_thread(); 689 spdk_thread_send_msg(spdk_thread_get_app_thread(), _bdev_rbd_destruct, rbd); 690 691 /* Return 1 to indicate the destruct path is asynchronous. */ 692 return 1; 693 } 694 695 static void 696 bdev_rbd_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io, 697 bool success) 698 { 699 if (!success) { 700 bdev_rbd_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 701 return; 702 } 703 704 bdev_rbd_start_aio(bdev_io); 705 } 706 707 static void 708 bdev_rbd_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io) 709 { 710 struct spdk_thread *submit_td = spdk_io_channel_get_thread(ch); 711 struct bdev_rbd_io *rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx; 712 713 rbd_io->submit_td = submit_td; 714 switch (bdev_io->type) { 715 case SPDK_BDEV_IO_TYPE_READ: 716 spdk_bdev_io_get_buf(bdev_io, bdev_rbd_get_buf_cb, 717 bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen); 718 break; 719 720 case SPDK_BDEV_IO_TYPE_WRITE: 721 case SPDK_BDEV_IO_TYPE_UNMAP: 722 case SPDK_BDEV_IO_TYPE_FLUSH: 723 case SPDK_BDEV_IO_TYPE_WRITE_ZEROES: 724 #ifdef LIBRBD_SUPPORTS_COMPARE_AND_WRITE_IOVEC 725 case SPDK_BDEV_IO_TYPE_COMPARE_AND_WRITE: 726 #endif 727 bdev_rbd_start_aio(bdev_io); 728 break; 729 730 case SPDK_BDEV_IO_TYPE_RESET: 731 spdk_thread_exec_msg(spdk_thread_get_app_thread(), bdev_rbd_reset, bdev_io); 732 break; 733 734 default: 735 SPDK_ERRLOG("Unsupported IO type =%d\n", bdev_io->type); 736 bdev_rbd_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 737 break; 738 } 739 } 740 741 static bool 742 bdev_rbd_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type) 743 { 744 switch (io_type) { 745 case SPDK_BDEV_IO_TYPE_READ: 746 case SPDK_BDEV_IO_TYPE_WRITE: 747 case SPDK_BDEV_IO_TYPE_UNMAP: 748 case SPDK_BDEV_IO_TYPE_FLUSH: 749 case SPDK_BDEV_IO_TYPE_RESET: 750 case SPDK_BDEV_IO_TYPE_WRITE_ZEROES: 751 #ifdef LIBRBD_SUPPORTS_COMPARE_AND_WRITE_IOVEC 752 case SPDK_BDEV_IO_TYPE_COMPARE_AND_WRITE: 753 #endif 754 return true; 755 756 default: 757 return false; 758 } 759 } 760 761 static int 762 bdev_rbd_create_cb(void *io_device, void *ctx_buf) 763 { 764 struct bdev_rbd_io_channel *ch = ctx_buf; 765 struct bdev_rbd *disk = io_device; 766 767 ch->disk = disk; 768 ch->group_ch = spdk_get_io_channel(&rbd_if); 769 assert(ch->group_ch != NULL); 770 771 return 0; 772 } 773 774 static void 775 bdev_rbd_destroy_cb(void *io_device, void *ctx_buf) 776 { 777 struct bdev_rbd_io_channel *ch = ctx_buf; 778 779 spdk_put_io_channel(ch->group_ch); 780 } 781 782 static struct spdk_io_channel * 783 bdev_rbd_get_io_channel(void *ctx) 784 { 785 struct bdev_rbd *rbd_bdev = ctx; 786 787 return spdk_get_io_channel(rbd_bdev); 788 } 789 790 static void 791 bdev_rbd_cluster_dump_entry(const char *cluster_name, struct spdk_json_write_ctx *w) 792 { 793 struct bdev_rbd_cluster *entry; 794 795 pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex); 796 STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) { 797 if (strcmp(cluster_name, entry->name)) { 798 continue; 799 } 800 if (entry->user_id) { 801 spdk_json_write_named_string(w, "user_id", entry->user_id); 802 } 803 804 if (entry->config_param) { 805 char **config_entry = entry->config_param; 806 807 spdk_json_write_named_object_begin(w, "config_param"); 808 while (*config_entry) { 809 spdk_json_write_named_string(w, config_entry[0], config_entry[1]); 810 config_entry += 2; 811 } 812 spdk_json_write_object_end(w); 813 } 814 if (entry->config_file) { 815 spdk_json_write_named_string(w, "config_file", entry->config_file); 816 } 817 if (entry->key_file) { 818 spdk_json_write_named_string(w, "key_file", entry->key_file); 819 } 820 821 pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex); 822 return; 823 } 824 825 pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex); 826 } 827 828 static int 829 bdev_rbd_dump_info_json(void *ctx, struct spdk_json_write_ctx *w) 830 { 831 struct bdev_rbd *rbd_bdev = ctx; 832 833 spdk_json_write_named_object_begin(w, "rbd"); 834 835 spdk_json_write_named_string(w, "pool_name", rbd_bdev->pool_name); 836 837 spdk_json_write_named_string(w, "rbd_name", rbd_bdev->rbd_name); 838 839 if (rbd_bdev->cluster_name) { 840 bdev_rbd_cluster_dump_entry(rbd_bdev->cluster_name, w); 841 goto end; 842 } 843 844 if (rbd_bdev->user_id) { 845 spdk_json_write_named_string(w, "user_id", rbd_bdev->user_id); 846 } 847 848 if (rbd_bdev->config) { 849 char **entry = rbd_bdev->config; 850 851 spdk_json_write_named_object_begin(w, "config"); 852 while (*entry) { 853 spdk_json_write_named_string(w, entry[0], entry[1]); 854 entry += 2; 855 } 856 spdk_json_write_object_end(w); 857 } 858 859 end: 860 spdk_json_write_object_end(w); 861 862 return 0; 863 } 864 865 static void 866 bdev_rbd_write_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w) 867 { 868 struct bdev_rbd *rbd = bdev->ctxt; 869 char uuid_str[SPDK_UUID_STRING_LEN]; 870 871 spdk_json_write_object_begin(w); 872 873 spdk_json_write_named_string(w, "method", "bdev_rbd_create"); 874 875 spdk_json_write_named_object_begin(w, "params"); 876 spdk_json_write_named_string(w, "name", bdev->name); 877 spdk_json_write_named_string(w, "pool_name", rbd->pool_name); 878 spdk_json_write_named_string(w, "rbd_name", rbd->rbd_name); 879 spdk_json_write_named_uint32(w, "block_size", bdev->blocklen); 880 if (rbd->user_id) { 881 spdk_json_write_named_string(w, "user_id", rbd->user_id); 882 } 883 884 if (rbd->config) { 885 char **entry = rbd->config; 886 887 spdk_json_write_named_object_begin(w, "config"); 888 while (*entry) { 889 spdk_json_write_named_string(w, entry[0], entry[1]); 890 entry += 2; 891 } 892 spdk_json_write_object_end(w); 893 } 894 895 spdk_uuid_fmt_lower(uuid_str, sizeof(uuid_str), &bdev->uuid); 896 spdk_json_write_named_string(w, "uuid", uuid_str); 897 898 spdk_json_write_object_end(w); 899 900 spdk_json_write_object_end(w); 901 } 902 903 static void 904 dump_single_cluster_entry(struct bdev_rbd_cluster *entry, struct spdk_json_write_ctx *w) 905 { 906 assert(entry != NULL); 907 908 spdk_json_write_object_begin(w); 909 spdk_json_write_named_string(w, "cluster_name", entry->name); 910 911 if (entry->user_id) { 912 spdk_json_write_named_string(w, "user_id", entry->user_id); 913 } 914 915 if (entry->config_param) { 916 char **config_entry = entry->config_param; 917 918 spdk_json_write_named_object_begin(w, "config_param"); 919 while (*config_entry) { 920 spdk_json_write_named_string(w, config_entry[0], config_entry[1]); 921 config_entry += 2; 922 } 923 spdk_json_write_object_end(w); 924 } 925 if (entry->config_file) { 926 spdk_json_write_named_string(w, "config_file", entry->config_file); 927 } 928 if (entry->key_file) { 929 spdk_json_write_named_string(w, "key_file", entry->key_file); 930 } 931 932 if (entry->core_mask) { 933 spdk_json_write_named_string(w, "core_mask", entry->core_mask); 934 } 935 936 spdk_json_write_object_end(w); 937 } 938 939 int 940 bdev_rbd_get_clusters_info(struct spdk_jsonrpc_request *request, const char *name) 941 { 942 struct bdev_rbd_cluster *entry; 943 struct spdk_json_write_ctx *w; 944 945 pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex); 946 947 if (STAILQ_EMPTY(&g_map_bdev_rbd_cluster)) { 948 pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex); 949 return -ENOENT; 950 } 951 952 /* If cluster name is provided */ 953 if (name) { 954 STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) { 955 if (strcmp(name, entry->name) == 0) { 956 w = spdk_jsonrpc_begin_result(request); 957 dump_single_cluster_entry(entry, w); 958 spdk_jsonrpc_end_result(request, w); 959 960 pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex); 961 return 0; 962 } 963 } 964 965 pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex); 966 return -ENOENT; 967 } 968 969 w = spdk_jsonrpc_begin_result(request); 970 spdk_json_write_array_begin(w); 971 STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) { 972 dump_single_cluster_entry(entry, w); 973 } 974 spdk_json_write_array_end(w); 975 spdk_jsonrpc_end_result(request, w); 976 pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex); 977 978 return 0; 979 } 980 981 static const struct spdk_bdev_fn_table rbd_fn_table = { 982 .destruct = bdev_rbd_destruct, 983 .submit_request = bdev_rbd_submit_request, 984 .io_type_supported = bdev_rbd_io_type_supported, 985 .get_io_channel = bdev_rbd_get_io_channel, 986 .dump_info_json = bdev_rbd_dump_info_json, 987 .write_config_json = bdev_rbd_write_config_json, 988 }; 989 990 static int 991 rbd_thread_set_cpumask(struct spdk_cpuset *set) 992 { 993 #ifdef __linux__ 994 uint32_t lcore; 995 cpu_set_t mask; 996 997 assert(set != NULL); 998 CPU_ZERO(&mask); 999 1000 /* get the core id on current spdk_cpuset and set to cpu_set_t */ 1001 for (lcore = 0; lcore < SPDK_CPUSET_SIZE; lcore++) { 1002 if (spdk_cpuset_get_cpu(set, lcore)) { 1003 CPU_SET(lcore, &mask); 1004 } 1005 } 1006 1007 /* change current thread core mask */ 1008 if (sched_setaffinity(0, sizeof(mask), &mask) < 0) { 1009 SPDK_ERRLOG("Set non SPDK thread cpu mask error (errno=%d)\n", errno); 1010 return -1; 1011 } 1012 1013 return 0; 1014 #else 1015 SPDK_ERRLOG("SPDK non spdk thread cpumask setup supports only Linux platform now.\n"); 1016 return -ENOTSUP; 1017 #endif 1018 } 1019 1020 1021 static int 1022 rbd_register_cluster(const char *name, const char *user_id, const char *const *config_param, 1023 const char *config_file, const char *key_file, const char *core_mask) 1024 { 1025 struct bdev_rbd_cluster *entry; 1026 struct spdk_cpuset rbd_core_mask = {}; 1027 int rc; 1028 1029 pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex); 1030 STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) { 1031 if (strcmp(name, entry->name) == 0) { 1032 SPDK_ERRLOG("Cluster name=%s already exists\n", name); 1033 pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex); 1034 return -1; 1035 } 1036 } 1037 1038 entry = calloc(1, sizeof(*entry)); 1039 if (!entry) { 1040 SPDK_ERRLOG("Cannot allocate an entry for name=%s\n", name); 1041 pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex); 1042 return -1; 1043 } 1044 1045 entry->name = strdup(name); 1046 if (entry->name == NULL) { 1047 SPDK_ERRLOG("Failed to save the name =%s on entry =%p\n", name, entry); 1048 goto err_handle; 1049 } 1050 1051 if (user_id) { 1052 entry->user_id = strdup(user_id); 1053 if (entry->user_id == NULL) { 1054 SPDK_ERRLOG("Failed to save the str =%s on entry =%p\n", user_id, entry); 1055 goto err_handle; 1056 } 1057 } 1058 1059 /* Support specify config_param or config_file separately, or both of them. */ 1060 if (config_param) { 1061 entry->config_param = bdev_rbd_dup_config(config_param); 1062 if (entry->config_param == NULL) { 1063 SPDK_ERRLOG("Failed to save the config_param=%p on entry = %p\n", config_param, entry); 1064 goto err_handle; 1065 } 1066 } 1067 1068 if (config_file) { 1069 entry->config_file = strdup(config_file); 1070 if (entry->config_file == NULL) { 1071 SPDK_ERRLOG("Failed to save the config_file=%s on entry = %p\n", config_file, entry); 1072 goto err_handle; 1073 } 1074 } 1075 1076 if (key_file) { 1077 entry->key_file = strdup(key_file); 1078 if (entry->key_file == NULL) { 1079 SPDK_ERRLOG("Failed to save the key_file=%s on entry = %p\n", key_file, entry); 1080 goto err_handle; 1081 } 1082 } 1083 1084 if (core_mask) { 1085 entry->core_mask = strdup(core_mask); 1086 if (entry->core_mask == NULL) { 1087 SPDK_ERRLOG("Core_mask=%s allocation failed on entry = %p\n", core_mask, entry); 1088 goto err_handle; 1089 } 1090 1091 if (spdk_cpuset_parse(&rbd_core_mask, entry->core_mask) < 0) { 1092 SPDK_ERRLOG("Invalid cpumask=%s on entry = %p\n", entry->core_mask, entry); 1093 goto err_handle; 1094 } 1095 1096 if (rbd_thread_set_cpumask(&rbd_core_mask) < 0) { 1097 SPDK_ERRLOG("Failed to change rbd threads to core_mask %s on entry = %p\n", core_mask, entry); 1098 goto err_handle; 1099 } 1100 } 1101 1102 1103 /* If rbd thread core mask is given, rados_create() must execute with 1104 * the affinity set by rbd_thread_set_cpumask(). The affinity set 1105 * by rbd_thread_set_cpumask() will be reverted once rbd_register_cluster() returns 1106 * and when we leave the spdk_call_unaffinitized context. */ 1107 rc = rados_create(&entry->cluster, user_id); 1108 if (rc < 0) { 1109 SPDK_ERRLOG("Failed to create rados_t struct\n"); 1110 goto err_handle; 1111 } 1112 1113 /* Try default location when entry->config_file is NULL, but ignore failure when it is NULL */ 1114 rc = rados_conf_read_file(entry->cluster, entry->config_file); 1115 if (entry->config_file && rc < 0) { 1116 SPDK_ERRLOG("Failed to read conf file %s\n", entry->config_file); 1117 rados_shutdown(entry->cluster); 1118 goto err_handle; 1119 } 1120 1121 if (config_param) { 1122 const char *const *config_entry = config_param; 1123 while (*config_entry) { 1124 rc = rados_conf_set(entry->cluster, config_entry[0], config_entry[1]); 1125 if (rc < 0) { 1126 SPDK_ERRLOG("Failed to set %s = %s\n", config_entry[0], config_entry[1]); 1127 rados_shutdown(entry->cluster); 1128 goto err_handle; 1129 } 1130 config_entry += 2; 1131 } 1132 } 1133 1134 if (key_file) { 1135 rc = rados_conf_set(entry->cluster, "keyring", key_file); 1136 if (rc < 0) { 1137 SPDK_ERRLOG("Failed to set keyring = %s\n", key_file); 1138 rados_shutdown(entry->cluster); 1139 goto err_handle; 1140 } 1141 } 1142 1143 rc = rados_connect(entry->cluster); 1144 if (rc < 0) { 1145 SPDK_ERRLOG("Failed to connect to rbd_pool on cluster=%p\n", entry->cluster); 1146 rados_shutdown(entry->cluster); 1147 goto err_handle; 1148 } 1149 1150 STAILQ_INSERT_TAIL(&g_map_bdev_rbd_cluster, entry, link); 1151 pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex); 1152 1153 return 0; 1154 1155 err_handle: 1156 bdev_rbd_cluster_free(entry); 1157 pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex); 1158 return -1; 1159 } 1160 1161 int 1162 bdev_rbd_unregister_cluster(const char *name) 1163 { 1164 struct bdev_rbd_cluster *entry; 1165 int rc = 0; 1166 1167 if (name == NULL) { 1168 return -1; 1169 } 1170 1171 pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex); 1172 STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) { 1173 if (strcmp(name, entry->name) == 0) { 1174 if (entry->ref == 0) { 1175 STAILQ_REMOVE(&g_map_bdev_rbd_cluster, entry, bdev_rbd_cluster, link); 1176 rados_shutdown(entry->cluster); 1177 bdev_rbd_cluster_free(entry); 1178 } else { 1179 SPDK_ERRLOG("Cluster with name=%p is still used and we cannot delete it\n", 1180 entry->name); 1181 rc = -1; 1182 } 1183 1184 pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex); 1185 return rc; 1186 } 1187 } 1188 1189 pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex); 1190 1191 SPDK_ERRLOG("Could not find the cluster name =%p\n", name); 1192 1193 return -1; 1194 } 1195 1196 static void * 1197 _bdev_rbd_register_cluster(void *arg) 1198 { 1199 struct cluster_register_info *info = arg; 1200 void *ret = arg; 1201 int rc; 1202 1203 rc = rbd_register_cluster((const char *)info->name, (const char *)info->user_id, 1204 (const char *const *)info->config_param, (const char *)info->config_file, 1205 (const char *)info->key_file, info->core_mask); 1206 if (rc) { 1207 ret = NULL; 1208 } 1209 1210 return ret; 1211 } 1212 1213 int 1214 bdev_rbd_register_cluster(struct cluster_register_info *info) 1215 { 1216 assert(info != NULL); 1217 1218 /* Rados cluster info need to be created in non SPDK-thread to avoid CPU 1219 * resource contention */ 1220 if (spdk_call_unaffinitized(_bdev_rbd_register_cluster, info) == NULL) { 1221 return -1; 1222 } 1223 1224 return 0; 1225 } 1226 1227 int 1228 bdev_rbd_create(struct spdk_bdev **bdev, const char *name, const char *user_id, 1229 const char *pool_name, 1230 const char *const *config, 1231 const char *rbd_name, 1232 uint32_t block_size, 1233 const char *cluster_name, 1234 const struct spdk_uuid *uuid) 1235 { 1236 struct bdev_rbd *rbd; 1237 int ret; 1238 1239 if ((pool_name == NULL) || (rbd_name == NULL)) { 1240 return -EINVAL; 1241 } 1242 1243 rbd = calloc(1, sizeof(struct bdev_rbd)); 1244 if (rbd == NULL) { 1245 SPDK_ERRLOG("Failed to allocate bdev_rbd struct\n"); 1246 return -ENOMEM; 1247 } 1248 1249 rbd->rbd_name = strdup(rbd_name); 1250 if (!rbd->rbd_name) { 1251 bdev_rbd_free(rbd); 1252 return -ENOMEM; 1253 } 1254 1255 if (user_id) { 1256 rbd->user_id = strdup(user_id); 1257 if (!rbd->user_id) { 1258 bdev_rbd_free(rbd); 1259 return -ENOMEM; 1260 } 1261 } 1262 1263 if (cluster_name) { 1264 rbd->cluster_name = strdup(cluster_name); 1265 if (!rbd->cluster_name) { 1266 bdev_rbd_free(rbd); 1267 return -ENOMEM; 1268 } 1269 } 1270 rbd->pool_name = strdup(pool_name); 1271 if (!rbd->pool_name) { 1272 bdev_rbd_free(rbd); 1273 return -ENOMEM; 1274 } 1275 1276 if (config && !(rbd->config = bdev_rbd_dup_config(config))) { 1277 bdev_rbd_free(rbd); 1278 return -ENOMEM; 1279 } 1280 1281 ret = bdev_rbd_init(rbd); 1282 if (ret < 0) { 1283 bdev_rbd_free(rbd); 1284 SPDK_ERRLOG("Failed to init rbd device\n"); 1285 return ret; 1286 } 1287 1288 if (uuid) { 1289 rbd->disk.uuid = *uuid; 1290 } 1291 1292 if (name) { 1293 rbd->disk.name = strdup(name); 1294 } else { 1295 rbd->disk.name = spdk_sprintf_alloc("Ceph%d", bdev_rbd_count); 1296 } 1297 if (!rbd->disk.name) { 1298 bdev_rbd_free(rbd); 1299 return -ENOMEM; 1300 } 1301 rbd->disk.product_name = "Ceph Rbd Disk"; 1302 bdev_rbd_count++; 1303 1304 rbd->disk.write_cache = 0; 1305 rbd->disk.blocklen = block_size; 1306 rbd->disk.blockcnt = rbd->info.size / rbd->disk.blocklen; 1307 rbd->disk.ctxt = rbd; 1308 rbd->disk.fn_table = &rbd_fn_table; 1309 rbd->disk.module = &rbd_if; 1310 1311 SPDK_NOTICELOG("Add %s rbd disk to lun\n", rbd->disk.name); 1312 1313 spdk_io_device_register(rbd, bdev_rbd_create_cb, 1314 bdev_rbd_destroy_cb, 1315 sizeof(struct bdev_rbd_io_channel), 1316 rbd_name); 1317 ret = spdk_bdev_register(&rbd->disk); 1318 if (ret) { 1319 spdk_io_device_unregister(rbd, NULL); 1320 bdev_rbd_free(rbd); 1321 return ret; 1322 } 1323 1324 *bdev = &(rbd->disk); 1325 1326 return ret; 1327 } 1328 1329 void 1330 bdev_rbd_delete(const char *name, spdk_delete_rbd_complete cb_fn, void *cb_arg) 1331 { 1332 int rc; 1333 1334 rc = spdk_bdev_unregister_by_name(name, &rbd_if, cb_fn, cb_arg); 1335 if (rc != 0) { 1336 cb_fn(cb_arg, rc); 1337 } 1338 } 1339 1340 static void 1341 dummy_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev, void *ctx) 1342 { 1343 } 1344 1345 int 1346 bdev_rbd_resize(const char *name, const uint64_t new_size_in_mb) 1347 { 1348 struct spdk_bdev_desc *desc; 1349 struct spdk_bdev *bdev; 1350 struct spdk_io_channel *ch; 1351 struct bdev_rbd_io_channel *rbd_io_ch; 1352 int rc = 0; 1353 uint64_t new_size_in_byte; 1354 uint64_t current_size_in_mb; 1355 1356 rc = spdk_bdev_open_ext(name, false, dummy_bdev_event_cb, NULL, &desc); 1357 if (rc != 0) { 1358 return rc; 1359 } 1360 1361 bdev = spdk_bdev_desc_get_bdev(desc); 1362 1363 if (bdev->module != &rbd_if) { 1364 rc = -EINVAL; 1365 goto exit; 1366 } 1367 1368 current_size_in_mb = bdev->blocklen * bdev->blockcnt / (1024 * 1024); 1369 if (current_size_in_mb > new_size_in_mb) { 1370 SPDK_ERRLOG("The new bdev size must be larger than current bdev size.\n"); 1371 rc = -EINVAL; 1372 goto exit; 1373 } 1374 1375 ch = bdev_rbd_get_io_channel(bdev); 1376 rbd_io_ch = spdk_io_channel_get_ctx(ch); 1377 new_size_in_byte = new_size_in_mb * 1024 * 1024; 1378 1379 rc = rbd_resize(rbd_io_ch->disk->image, new_size_in_byte); 1380 spdk_put_io_channel(ch); 1381 if (rc != 0) { 1382 SPDK_ERRLOG("failed to resize the ceph bdev.\n"); 1383 goto exit; 1384 } 1385 1386 rc = spdk_bdev_notify_blockcnt_change(bdev, new_size_in_byte / bdev->blocklen); 1387 if (rc != 0) { 1388 SPDK_ERRLOG("failed to notify block cnt change.\n"); 1389 } 1390 1391 exit: 1392 spdk_bdev_close(desc); 1393 return rc; 1394 } 1395 1396 static int 1397 bdev_rbd_group_create_cb(void *io_device, void *ctx_buf) 1398 { 1399 return 0; 1400 } 1401 1402 static void 1403 bdev_rbd_group_destroy_cb(void *io_device, void *ctx_buf) 1404 { 1405 } 1406 1407 static int 1408 bdev_rbd_library_init(void) 1409 { 1410 spdk_io_device_register(&rbd_if, bdev_rbd_group_create_cb, bdev_rbd_group_destroy_cb, 1411 0, "bdev_rbd_poll_groups"); 1412 return 0; 1413 } 1414 1415 static void 1416 bdev_rbd_library_fini(void) 1417 { 1418 spdk_io_device_unregister(&rbd_if, NULL); 1419 } 1420 1421 SPDK_LOG_REGISTER_COMPONENT(bdev_rbd) 1422