/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright (C) 2017 Intel Corporation.
 * All rights reserved.
 */

#include "spdk/stdinc.h"

#include "bdev_rbd.h"

#include <rbd/librbd.h>
#include <rados/librados.h>

#include "spdk/env.h"
#include "spdk/bdev.h"
#include "spdk/thread.h"
#include "spdk/json.h"
#include "spdk/string.h"
#include "spdk/util.h"
#include "spdk/likely.h"

#include "spdk/bdev_module.h"
#include "spdk/log.h"

static int bdev_rbd_count = 0;

struct bdev_rbd_pool_ctx {
	rados_t *cluster_p;
	char *name;
	rados_ioctx_t io_ctx;
	uint32_t ref;
	STAILQ_ENTRY(bdev_rbd_pool_ctx) link;
};

static STAILQ_HEAD(, bdev_rbd_pool_ctx) g_map_bdev_rbd_pool_ctx = STAILQ_HEAD_INITIALIZER(
			g_map_bdev_rbd_pool_ctx);

struct bdev_rbd {
	struct spdk_bdev disk;
	char *rbd_name;
	char *user_id;
	char *pool_name;
	char **config;

	rados_t cluster;
	rados_t *cluster_p;
	char *cluster_name;

	union rbd_ctx {
		rados_ioctx_t io_ctx;
		struct bdev_rbd_pool_ctx *ctx;
	} rados_ctx;

	rbd_image_t image;

	rbd_image_info_t info;
	struct spdk_thread *destruct_td;

	TAILQ_ENTRY(bdev_rbd) tailq;
	struct spdk_poller *reset_timer;
	struct spdk_bdev_io *reset_bdev_io;

	uint64_t rbd_watch_handle;
};

struct bdev_rbd_io_channel {
	struct bdev_rbd *disk;
	struct spdk_io_channel *group_ch;
};

struct bdev_rbd_io {
	struct spdk_thread *submit_td;
	enum spdk_bdev_io_status status;
	rbd_completion_t comp;
	size_t total_len;
};

struct bdev_rbd_cluster {
	char *name;
	char *user_id;
	char **config_param;
	char *config_file;
	char *key_file;
	char *core_mask;
	rados_t cluster;
	uint32_t ref;
	STAILQ_ENTRY(bdev_rbd_cluster) link;
};

static STAILQ_HEAD(, bdev_rbd_cluster) g_map_bdev_rbd_cluster = STAILQ_HEAD_INITIALIZER(
			g_map_bdev_rbd_cluster);
static pthread_mutex_t g_map_bdev_rbd_cluster_mutex = PTHREAD_MUTEX_INITIALIZER;

static struct spdk_io_channel *bdev_rbd_get_io_channel(void *ctx);

static void
_rbd_update_callback(void *arg)
{
	struct bdev_rbd *rbd = arg;
	uint64_t current_size_in_bytes = 0;
	int rc;

	rc = rbd_get_size(rbd->image, &current_size_in_bytes);
	if (rc < 0) {
		SPDK_ERRLOG("Failed getting size %d\n", rc);
		return;
	}

	rc = spdk_bdev_notify_blockcnt_change(&rbd->disk, current_size_in_bytes / rbd->disk.blocklen);
	if (rc != 0) {
		SPDK_ERRLOG("failed to notify block cnt change.\n");
	}
}

static void
rbd_update_callback(void *arg)
{
	spdk_thread_send_msg(spdk_thread_get_app_thread(), _rbd_update_callback, arg);
}

static void
bdev_rbd_cluster_free(struct bdev_rbd_cluster *entry)
{
	assert(entry != NULL);

	bdev_rbd_free_config(entry->config_param);
	free(entry->config_file);
	free(entry->key_file);
	free(entry->user_id);
	free(entry->name);
	free(entry->core_mask);
	free(entry);
}

static void
bdev_rbd_put_cluster(rados_t **cluster)
{
	struct bdev_rbd_cluster *entry;

	assert(cluster != NULL);

	/* No need to go through the map if *cluster is NULL */
	if (*cluster == NULL) {
		return;
	}

	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);
	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
		if (*cluster != &entry->cluster) {
			continue;
		}

		assert(entry->ref > 0);
		entry->ref--;
		*cluster = NULL;
		pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
		return;
	}

	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
	SPDK_ERRLOG("Cannot find the entry for cluster=%p\n", cluster);
}

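/*
 * Pool contexts (struct bdev_rbd_pool_ctx) are shared per (cluster, pool name)
 * pair and reference counted, so bdevs whose images live in the same pool reuse
 * a single rados_ioctx_t. Unlike the cluster map above, g_map_bdev_rbd_pool_ctx
 * is only ever accessed from the SPDK app thread (see the asserts below), which
 * is why it needs no mutex.
 */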
static void
bdev_rbd_put_pool_ctx(struct bdev_rbd_pool_ctx *entry)
{
	assert(spdk_get_thread() == spdk_thread_get_app_thread());

	assert(entry != NULL);
	assert(entry->ref > 0);
	entry->ref--;
	if (entry->ref == 0) {
		STAILQ_REMOVE(&g_map_bdev_rbd_pool_ctx, entry, bdev_rbd_pool_ctx, link);
		rados_ioctx_destroy(entry->io_ctx);
		free(entry->name);
		free(entry);
	}
}

static void
bdev_rbd_free(struct bdev_rbd *rbd)
{
	if (!rbd) {
		return;
	}

	if (rbd->image) {
		rbd_update_unwatch(rbd->image, rbd->rbd_watch_handle);
		rbd_flush(rbd->image);
		rbd_close(rbd->image);
	}

	free(rbd->disk.name);
	free(rbd->rbd_name);
	free(rbd->user_id);
	free(rbd->pool_name);
	bdev_rbd_free_config(rbd->config);

	if (rbd->cluster_name) {
		/* When rbd is destructed by bdev_rbd_destruct, it will not enter here
		 * because the ctx will already have been freed asynchronously by
		 * bdev_rbd_free_cb. This path is only taken while an rbd bdev is
		 * being initialized. */
		if (rbd->rados_ctx.ctx) {
			bdev_rbd_put_pool_ctx(rbd->rados_ctx.ctx);
			rbd->rados_ctx.ctx = NULL;
		}

		bdev_rbd_put_cluster(&rbd->cluster_p);
		free(rbd->cluster_name);
	} else if (rbd->cluster) {
		if (rbd->rados_ctx.io_ctx) {
			rados_ioctx_destroy(rbd->rados_ctx.io_ctx);
		}
		rados_shutdown(rbd->cluster);
	}

	free(rbd);
}

void
bdev_rbd_free_config(char **config)
{
	char **entry;

	if (config) {
		for (entry = config; *entry; entry++) {
			free(*entry);
		}
		free(config);
	}
}

char **
bdev_rbd_dup_config(const char *const *config)
{
	size_t count;
	char **copy;

	if (!config) {
		return NULL;
	}
	for (count = 0; config[count]; count++) {}
	copy = calloc(count + 1, sizeof(*copy));
	if (!copy) {
		return NULL;
	}
	for (count = 0; config[count]; count++) {
		if (!(copy[count] = strdup(config[count]))) {
			bdev_rbd_free_config(copy);
			return NULL;
		}
	}
	return copy;
}

static int
bdev_rados_cluster_init(const char *user_id, const char *const *config,
			rados_t *cluster)
{
	int ret;

	ret = rados_create(cluster, user_id);
	if (ret < 0) {
		SPDK_ERRLOG("Failed to create rados_t struct\n");
		return -1;
	}

	if (config) {
		const char *const *entry = config;
		while (*entry) {
			ret = rados_conf_set(*cluster, entry[0], entry[1]);
			if (ret < 0) {
				SPDK_ERRLOG("Failed to set %s = %s\n", entry[0], entry[1]);
				rados_shutdown(*cluster);
				*cluster = NULL;
				return -1;
			}
			entry += 2;
		}
	} else {
		ret = rados_conf_read_file(*cluster, NULL);
		if (ret < 0) {
			SPDK_ERRLOG("Failed to read conf file\n");
			rados_shutdown(*cluster);
			*cluster = NULL;
			return -1;
		}
	}

	ret = rados_connect(*cluster);
	if (ret < 0) {
		SPDK_ERRLOG("Failed to connect to rbd_pool\n");
		rados_shutdown(*cluster);
		*cluster = NULL;
		return -1;
	}

	return 0;
}

static int
bdev_rbd_get_cluster(const char *cluster_name, rados_t **cluster)
{
	struct bdev_rbd_cluster *entry;

	if (cluster == NULL) {
		SPDK_ERRLOG("cluster should not be NULL\n");
		return -1;
	}

	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);
	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
		if (strcmp(cluster_name, entry->name) == 0) {
			entry->ref++;
			*cluster = &entry->cluster;
			pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
			return 0;
		}
	}

	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
	return -1;
}

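/*
 * The config arrays handled by bdev_rbd_dup_config() and
 * bdev_rados_cluster_init() above are flat, NULL-terminated key/value
 * sequences consumed in pairs, e.g. (illustrative values only):
 *
 *	const char *config[] = { "mon_host", "192.168.1.1", NULL };
 *
 * Each pair is applied with rados_conf_set(); when no array is given, the
 * default Ceph config file is read instead.
 */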
static int
bdev_rbd_shared_cluster_init(const char *cluster_name, rados_t **cluster)
{
	int ret;

	ret = bdev_rbd_get_cluster(cluster_name, cluster);
	if (ret < 0) {
		SPDK_ERRLOG("Failed to get the registered cluster with name=%s\n", cluster_name);
		return -1;
	}

	return ret;
}

static void *
bdev_rbd_cluster_handle(void *arg)
{
	void *ret = arg;
	struct bdev_rbd *rbd = arg;
	int rc;

	rc = bdev_rados_cluster_init(rbd->user_id, (const char *const *)rbd->config,
				     &rbd->cluster);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to create rados cluster for user_id=%s and rbd_pool=%s\n",
			    rbd->user_id ? rbd->user_id : "admin (the default)", rbd->pool_name);
		ret = NULL;
	}

	return ret;
}

static int
bdev_rbd_get_pool_ctx(rados_t *cluster_p, const char *name, struct bdev_rbd_pool_ctx **ctx)
{
	struct bdev_rbd_pool_ctx *entry;

	assert(spdk_get_thread() == spdk_thread_get_app_thread());

	if (name == NULL || ctx == NULL) {
		return -1;
	}

	STAILQ_FOREACH(entry, &g_map_bdev_rbd_pool_ctx, link) {
		if (strcmp(name, entry->name) == 0 && cluster_p == entry->cluster_p) {
			entry->ref++;
			*ctx = entry;
			return 0;
		}
	}

	entry = calloc(1, sizeof(*entry));
	if (!entry) {
		SPDK_ERRLOG("Cannot allocate an entry for name=%s\n", name);
		return -1;
	}

	entry->name = strdup(name);
	if (entry->name == NULL) {
		SPDK_ERRLOG("Failed to allocate the name=%s space on entry=%p\n", name, entry);
		goto err_handle;
	}

	if (rados_ioctx_create(*cluster_p, name, &entry->io_ctx) < 0) {
		goto err_handle1;
	}

	entry->cluster_p = cluster_p;
	entry->ref = 1;
	*ctx = entry;
	STAILQ_INSERT_TAIL(&g_map_bdev_rbd_pool_ctx, entry, link);

	return 0;

err_handle1:
	free(entry->name);
err_handle:
	free(entry);

	return -1;
}

static void *
bdev_rbd_init_context(void *arg)
{
	struct bdev_rbd *rbd = arg;
	int rc;
	rados_ioctx_t *io_ctx = NULL;

	if (rbd->cluster_name) {
		if (bdev_rbd_get_pool_ctx(rbd->cluster_p, rbd->pool_name, &rbd->rados_ctx.ctx) < 0) {
			SPDK_ERRLOG("Failed to create ioctx on rbd=%p with cluster_name=%s\n",
				    rbd, rbd->cluster_name);
			return NULL;
		}
		io_ctx = &rbd->rados_ctx.ctx->io_ctx;
	} else {
		if (rados_ioctx_create(*(rbd->cluster_p), rbd->pool_name, &rbd->rados_ctx.io_ctx) < 0) {
			SPDK_ERRLOG("Failed to create ioctx on rbd=%p\n", rbd);
			return NULL;
		}
		io_ctx = &rbd->rados_ctx.io_ctx;
	}

	assert(io_ctx != NULL);
	rc = rbd_open(*io_ctx, rbd->rbd_name, &rbd->image, NULL);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to open specified rbd device\n");
		return NULL;
	}

	rc = rbd_update_watch(rbd->image, &rbd->rbd_watch_handle, rbd_update_callback, (void *)rbd);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to set up watch %d\n", rc);
	}

	rc = rbd_stat(rbd->image, &rbd->info, sizeof(rbd->info));
	if (rc < 0) {
		SPDK_ERRLOG("Failed to stat specified rbd device\n");
		return NULL;
	}

	return arg;
}

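/*
 * An rbd bdev reaches Ceph in one of two ways: either it owns a private
 * rados_t, created below from user_id/config on an unaffinitized thread, or
 * it refers by name to a cluster previously registered with
 * bdev_rbd_register_cluster(), in which case the connection and the pool
 * ioctx are shared and reference counted.
 */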
static int
bdev_rbd_init(struct bdev_rbd *rbd)
{
	int ret = 0;

	if (!rbd->cluster_name) {
		rbd->cluster_p = &rbd->cluster;
		/* The cluster should be created on a non-SPDK thread to avoid
		 * conflicts between Rados and SPDK threads */
		if (spdk_call_unaffinitized(bdev_rbd_cluster_handle, rbd) == NULL) {
			SPDK_ERRLOG("Cannot create the rados object on rbd=%p\n", rbd);
			return -1;
		}
	} else {
		ret = bdev_rbd_shared_cluster_init(rbd->cluster_name, &rbd->cluster_p);
		if (ret < 0) {
			SPDK_ERRLOG("Failed to create rados object for rbd=%p on cluster_name=%s\n",
				    rbd, rbd->cluster_name);
			return -1;
		}
	}

	if (spdk_call_unaffinitized(bdev_rbd_init_context, rbd) == NULL) {
		SPDK_ERRLOG("Cannot init rbd context for rbd=%p\n", rbd);
		return -1;
	}

	return ret;
}

static void
_bdev_rbd_io_complete(void *_rbd_io)
{
	struct bdev_rbd_io *rbd_io = _rbd_io;

	spdk_bdev_io_complete(spdk_bdev_io_from_ctx(rbd_io), rbd_io->status);
}

static void
bdev_rbd_io_complete(struct spdk_bdev_io *bdev_io, enum spdk_bdev_io_status status)
{
	struct bdev_rbd_io *rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx;
	struct spdk_thread *current_thread = spdk_get_thread();

	rbd_io->status = status;
	assert(rbd_io->submit_td != NULL);
	if (rbd_io->submit_td != current_thread) {
		spdk_thread_send_msg(rbd_io->submit_td, _bdev_rbd_io_complete, rbd_io);
	} else {
		_bdev_rbd_io_complete(rbd_io);
	}
}

static void
bdev_rbd_finish_aiocb(rbd_completion_t cb, void *arg)
{
	int io_status;
	struct spdk_bdev_io *bdev_io;
	struct bdev_rbd_io *rbd_io;
	enum spdk_bdev_io_status bio_status;

	bdev_io = rbd_aio_get_arg(cb);
	rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx;
	io_status = rbd_aio_get_return_value(cb);
	bio_status = SPDK_BDEV_IO_STATUS_SUCCESS;

	if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) {
		if ((int)rbd_io->total_len != io_status) {
			bio_status = SPDK_BDEV_IO_STATUS_FAILED;
		}
#ifdef LIBRBD_SUPPORTS_COMPARE_AND_WRITE_IOVEC
	} else if (bdev_io->type == SPDK_BDEV_IO_TYPE_COMPARE_AND_WRITE && io_status == -EILSEQ) {
		bio_status = SPDK_BDEV_IO_STATUS_MISCOMPARE;
#endif
	} else if (io_status != 0) { /* For others, 0 means success */
		bio_status = SPDK_BDEV_IO_STATUS_FAILED;
	}

	rbd_aio_release(cb);

	bdev_rbd_io_complete(bdev_io, bio_status);
}

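/*
 * I/O is submitted to librbd on the SPDK thread recorded in
 * rbd_io->submit_td; librbd invokes bdev_rbd_finish_aiocb() above, typically
 * from one of its own callback threads, and bdev_rbd_io_complete() then
 * bounces the completion back to the submitting thread. Note that only reads
 * compare the return value against the expected byte count; for the other
 * I/O types librbd reports 0 on success.
 */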
static void
_bdev_rbd_start_aio(struct bdev_rbd *disk, struct spdk_bdev_io *bdev_io,
		    struct iovec *iov, int iovcnt, uint64_t offset, size_t len)
{
	int ret;
	struct bdev_rbd_io *rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx;
	rbd_image_t image = disk->image;

	ret = rbd_aio_create_completion(bdev_io, bdev_rbd_finish_aiocb,
					&rbd_io->comp);
	if (ret < 0) {
		goto err;
	}

	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_READ:
		rbd_io->total_len = len;
		if (spdk_likely(iovcnt == 1)) {
			ret = rbd_aio_read(image, offset, iov[0].iov_len, iov[0].iov_base,
					   rbd_io->comp);
		} else {
			ret = rbd_aio_readv(image, iov, iovcnt, offset, rbd_io->comp);
		}
		break;
	case SPDK_BDEV_IO_TYPE_WRITE:
		if (spdk_likely(iovcnt == 1)) {
			ret = rbd_aio_write(image, offset, iov[0].iov_len, iov[0].iov_base,
					    rbd_io->comp);
		} else {
			ret = rbd_aio_writev(image, iov, iovcnt, offset, rbd_io->comp);
		}
		break;
	case SPDK_BDEV_IO_TYPE_UNMAP:
		ret = rbd_aio_discard(image, offset, len, rbd_io->comp);
		break;
	case SPDK_BDEV_IO_TYPE_FLUSH:
		ret = rbd_aio_flush(image, rbd_io->comp);
		break;
	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
		ret = rbd_aio_write_zeroes(image, offset, len, rbd_io->comp, /* zero_flags */ 0,
					   /* op_flags */ 0);
		break;
#ifdef LIBRBD_SUPPORTS_COMPARE_AND_WRITE_IOVEC
	case SPDK_BDEV_IO_TYPE_COMPARE_AND_WRITE:
		ret = rbd_aio_compare_and_writev(image, offset, iov /* cmp */, iovcnt,
						 bdev_io->u.bdev.fused_iovs /* write */,
						 bdev_io->u.bdev.fused_iovcnt,
						 rbd_io->comp, NULL,
						 /* op_flags */ 0);
		break;
#endif
	default:
		/* This should not happen.
		 * This function should only be called with supported io types from
		 * bdev_rbd_submit_request().
		 */
		SPDK_ERRLOG("Unsupported IO type =%d\n", bdev_io->type);
		ret = -ENOTSUP;
		break;
	}

	if (ret < 0) {
		rbd_aio_release(rbd_io->comp);
		goto err;
	}

	return;

err:
	bdev_rbd_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
}

static void
bdev_rbd_start_aio(void *ctx)
{
	struct spdk_bdev_io *bdev_io = ctx;
	struct bdev_rbd *disk = (struct bdev_rbd *)bdev_io->bdev->ctxt;

	_bdev_rbd_start_aio(disk,
			    bdev_io,
			    bdev_io->u.bdev.iovs,
			    bdev_io->u.bdev.iovcnt,
			    bdev_io->u.bdev.offset_blocks * bdev_io->bdev->blocklen,
			    bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
}

static int bdev_rbd_library_init(void);
static void bdev_rbd_library_fini(void);

static int
bdev_rbd_get_ctx_size(void)
{
	return sizeof(struct bdev_rbd_io);
}

static struct spdk_bdev_module rbd_if = {
	.name = "rbd",
	.module_init = bdev_rbd_library_init,
	.module_fini = bdev_rbd_library_fini,
	.get_ctx_size = bdev_rbd_get_ctx_size,
};
SPDK_BDEV_MODULE_REGISTER(rbd, &rbd_if)

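/*
 * Reset handling: librbd offers no way to cancel outstanding AIO, so a reset
 * is completed by polling spdk_bdev_get_current_qd() once per millisecond
 * (the 1000 usec poller below) until no I/O remains outstanding, then
 * completing the saved reset_bdev_io.
 */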
static int bdev_rbd_reset_timer(void *arg);

static void
bdev_rbd_check_outstanding_ios(struct spdk_bdev *bdev, uint64_t current_qd,
			       void *cb_arg, int rc)
{
	struct bdev_rbd *disk = cb_arg;
	enum spdk_bdev_io_status bio_status;

	if (rc == 0 && current_qd > 0) {
		disk->reset_timer = SPDK_POLLER_REGISTER(bdev_rbd_reset_timer, disk, 1000);
		return;
	}

	if (rc != 0) {
		bio_status = SPDK_BDEV_IO_STATUS_FAILED;
	} else {
		bio_status = SPDK_BDEV_IO_STATUS_SUCCESS;
	}

	bdev_rbd_io_complete(disk->reset_bdev_io, bio_status);
	disk->reset_bdev_io = NULL;
}

static int
bdev_rbd_reset_timer(void *arg)
{
	struct bdev_rbd *disk = arg;

	spdk_poller_unregister(&disk->reset_timer);

	spdk_bdev_get_current_qd(&disk->disk, bdev_rbd_check_outstanding_ios, disk);

	return SPDK_POLLER_BUSY;
}

static void
bdev_rbd_reset(void *ctx)
{
	struct spdk_bdev_io *bdev_io = ctx;
	struct bdev_rbd *disk = (struct bdev_rbd *)bdev_io->bdev->ctxt;

	/*
	 * HACK: Since librbd doesn't provide any way to cancel outstanding aio, just kick off a
	 * poller to wait for in-flight I/O to complete.
	 */
	assert(disk->reset_bdev_io == NULL);
	disk->reset_bdev_io = bdev_io;

	bdev_rbd_reset_timer(disk);
}

static void
_bdev_rbd_destruct_done(void *io_device)
{
	struct bdev_rbd *rbd = io_device;

	assert(rbd != NULL);

	spdk_bdev_destruct_done(&rbd->disk, 0);
	bdev_rbd_free(rbd);
}

static void
bdev_rbd_free_cb(void *io_device)
{
	struct bdev_rbd *rbd = io_device;

	assert(spdk_get_thread() == spdk_thread_get_app_thread());

	/* free the ctx */
	if (rbd->cluster_name && rbd->rados_ctx.ctx) {
		bdev_rbd_put_pool_ctx(rbd->rados_ctx.ctx);
		rbd->rados_ctx.ctx = NULL;
	}

	/* The io device has been unregistered. Send a message back to the
	 * original thread that started the destruct operation, so that the
	 * bdev unregister callback is invoked on the same thread that started
	 * this whole process.
	 */
	spdk_thread_send_msg(rbd->destruct_td, _bdev_rbd_destruct_done, rbd);
}

static void
_bdev_rbd_destruct(void *ctx)
{
	struct bdev_rbd *rbd = ctx;

	spdk_io_device_unregister(rbd, bdev_rbd_free_cb);
}

static int
bdev_rbd_destruct(void *ctx)
{
	struct bdev_rbd *rbd = ctx;

	/* Start the destruct operation on the rbd bdev's
	 * main thread. This guarantees it will only start
	 * executing after any messages related to channel
	 * deletions have finished completing. *Always*
	 * send a message, even if this function gets called
	 * from the main thread, in case there are pending
	 * channel delete messages in flight to this thread.
	 */
	assert(rbd->destruct_td == NULL);
	rbd->destruct_td = spdk_get_thread();
	spdk_thread_send_msg(spdk_thread_get_app_thread(), _bdev_rbd_destruct, rbd);

	/* Return 1 to indicate the destruct path is asynchronous. */
	return 1;
}

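/*
 * Request dispatch below: reads first obtain a data buffer via
 * spdk_bdev_io_get_buf() and start the AIO from the buffer callback; writes,
 * unmaps, flushes, write-zeroes and (when supported) compare-and-write start
 * immediately on the submitting thread; resets are forwarded to the app
 * thread, where the reset poller machinery above lives.
 */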
static void
bdev_rbd_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io,
		    bool success)
{
	if (!success) {
		bdev_rbd_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
		return;
	}

	bdev_rbd_start_aio(bdev_io);
}

static void
bdev_rbd_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
{
	struct spdk_thread *submit_td = spdk_io_channel_get_thread(ch);
	struct bdev_rbd_io *rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx;

	rbd_io->submit_td = submit_td;
	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_READ:
		spdk_bdev_io_get_buf(bdev_io, bdev_rbd_get_buf_cb,
				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
		break;

	case SPDK_BDEV_IO_TYPE_WRITE:
	case SPDK_BDEV_IO_TYPE_UNMAP:
	case SPDK_BDEV_IO_TYPE_FLUSH:
	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
#ifdef LIBRBD_SUPPORTS_COMPARE_AND_WRITE_IOVEC
	case SPDK_BDEV_IO_TYPE_COMPARE_AND_WRITE:
#endif
		bdev_rbd_start_aio(bdev_io);
		break;

	case SPDK_BDEV_IO_TYPE_RESET:
		spdk_thread_exec_msg(spdk_thread_get_app_thread(), bdev_rbd_reset, bdev_io);
		break;

	default:
		SPDK_ERRLOG("Unsupported IO type =%d\n", bdev_io->type);
		bdev_rbd_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
		break;
	}
}

static bool
bdev_rbd_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type)
{
	switch (io_type) {
	case SPDK_BDEV_IO_TYPE_READ:
	case SPDK_BDEV_IO_TYPE_WRITE:
	case SPDK_BDEV_IO_TYPE_UNMAP:
	case SPDK_BDEV_IO_TYPE_FLUSH:
	case SPDK_BDEV_IO_TYPE_RESET:
	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
#ifdef LIBRBD_SUPPORTS_COMPARE_AND_WRITE_IOVEC
	case SPDK_BDEV_IO_TYPE_COMPARE_AND_WRITE:
#endif
		return true;

	default:
		return false;
	}
}

static int
bdev_rbd_create_cb(void *io_device, void *ctx_buf)
{
	struct bdev_rbd_io_channel *ch = ctx_buf;
	struct bdev_rbd *disk = io_device;

	ch->disk = disk;
	ch->group_ch = spdk_get_io_channel(&rbd_if);
	assert(ch->group_ch != NULL);

	return 0;
}

static void
bdev_rbd_destroy_cb(void *io_device, void *ctx_buf)
{
	struct bdev_rbd_io_channel *ch = ctx_buf;

	spdk_put_io_channel(ch->group_ch);
}

static struct spdk_io_channel *
bdev_rbd_get_io_channel(void *ctx)
{
	struct bdev_rbd *rbd_bdev = ctx;

	return spdk_get_io_channel(rbd_bdev);
}

static void
bdev_rbd_cluster_dump_entry(const char *cluster_name, struct spdk_json_write_ctx *w)
{
	struct bdev_rbd_cluster *entry;

	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);
	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
		if (strcmp(cluster_name, entry->name)) {
			continue;
		}
		if (entry->user_id) {
			spdk_json_write_named_string(w, "user_id", entry->user_id);
		}

		if (entry->config_param) {
			char **config_entry = entry->config_param;

			spdk_json_write_named_object_begin(w, "config_param");
			while (*config_entry) {
				spdk_json_write_named_string(w, config_entry[0], config_entry[1]);
				config_entry += 2;
			}
			spdk_json_write_object_end(w);
		}
		if (entry->config_file) {
			spdk_json_write_named_string(w, "config_file", entry->config_file);
		}
		if (entry->key_file) {
			spdk_json_write_named_string(w, "key_file", entry->key_file);
		}

		pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
		return;
	}

	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
}

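/*
 * The "rbd" object emitted by bdev_rbd_dump_info_json() below looks roughly
 * like this (values illustrative):
 *
 *	"rbd": {
 *	  "pool_name": "rbd",
 *	  "rbd_name": "image0",
 *	  "user_id": "admin",
 *	  "config": { "mon_host": "192.168.1.1" }
 *	}
 *
 * When the bdev was created against a registered cluster, that cluster's
 * user_id/config_param/config_file/key_file are dumped instead of the
 * per-bdev values.
 */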
static int
bdev_rbd_dump_info_json(void *ctx, struct spdk_json_write_ctx *w)
{
	struct bdev_rbd *rbd_bdev = ctx;

	spdk_json_write_named_object_begin(w, "rbd");

	spdk_json_write_named_string(w, "pool_name", rbd_bdev->pool_name);

	spdk_json_write_named_string(w, "rbd_name", rbd_bdev->rbd_name);

	if (rbd_bdev->cluster_name) {
		bdev_rbd_cluster_dump_entry(rbd_bdev->cluster_name, w);
		goto end;
	}

	if (rbd_bdev->user_id) {
		spdk_json_write_named_string(w, "user_id", rbd_bdev->user_id);
	}

	if (rbd_bdev->config) {
		char **entry = rbd_bdev->config;

		spdk_json_write_named_object_begin(w, "config");
		while (*entry) {
			spdk_json_write_named_string(w, entry[0], entry[1]);
			entry += 2;
		}
		spdk_json_write_object_end(w);
	}

end:
	spdk_json_write_object_end(w);

	return 0;
}

static void
bdev_rbd_write_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
{
	struct bdev_rbd *rbd = bdev->ctxt;
	char uuid_str[SPDK_UUID_STRING_LEN];

	spdk_json_write_object_begin(w);

	spdk_json_write_named_string(w, "method", "bdev_rbd_create");

	spdk_json_write_named_object_begin(w, "params");
	spdk_json_write_named_string(w, "name", bdev->name);
	spdk_json_write_named_string(w, "pool_name", rbd->pool_name);
	spdk_json_write_named_string(w, "rbd_name", rbd->rbd_name);
	spdk_json_write_named_uint32(w, "block_size", bdev->blocklen);
	if (rbd->user_id) {
		spdk_json_write_named_string(w, "user_id", rbd->user_id);
	}

	if (rbd->config) {
		char **entry = rbd->config;

		spdk_json_write_named_object_begin(w, "config");
		while (*entry) {
			spdk_json_write_named_string(w, entry[0], entry[1]);
			entry += 2;
		}
		spdk_json_write_object_end(w);
	}

	spdk_uuid_fmt_lower(uuid_str, sizeof(uuid_str), &bdev->uuid);
	spdk_json_write_named_string(w, "uuid", uuid_str);

	spdk_json_write_object_end(w);

	spdk_json_write_object_end(w);
}

static void
dump_single_cluster_entry(struct bdev_rbd_cluster *entry, struct spdk_json_write_ctx *w)
{
	assert(entry != NULL);

	spdk_json_write_object_begin(w);
	spdk_json_write_named_string(w, "cluster_name", entry->name);

	if (entry->user_id) {
		spdk_json_write_named_string(w, "user_id", entry->user_id);
	}

	if (entry->config_param) {
		char **config_entry = entry->config_param;

		spdk_json_write_named_object_begin(w, "config_param");
		while (*config_entry) {
			spdk_json_write_named_string(w, config_entry[0], config_entry[1]);
			config_entry += 2;
		}
		spdk_json_write_object_end(w);
	}
	if (entry->config_file) {
		spdk_json_write_named_string(w, "config_file", entry->config_file);
	}
	if (entry->key_file) {
		spdk_json_write_named_string(w, "key_file", entry->key_file);
	}

	if (entry->core_mask) {
		spdk_json_write_named_string(w, "core_mask", entry->core_mask);
	}

	spdk_json_write_object_end(w);
}

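/*
 * bdev_rbd_get_clusters_info() answers the cluster-info RPC: with a name it
 * returns that single cluster object, without one it returns an array with
 * one entry per registered cluster; -ENOENT is returned when nothing matches.
 */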
int
bdev_rbd_get_clusters_info(struct spdk_jsonrpc_request *request, const char *name)
{
	struct bdev_rbd_cluster *entry;
	struct spdk_json_write_ctx *w;

	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);

	if (STAILQ_EMPTY(&g_map_bdev_rbd_cluster)) {
		pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
		return -ENOENT;
	}

	/* If a cluster name is provided */
	if (name) {
		STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
			if (strcmp(name, entry->name) == 0) {
				w = spdk_jsonrpc_begin_result(request);
				dump_single_cluster_entry(entry, w);
				spdk_jsonrpc_end_result(request, w);

				pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
				return 0;
			}
		}

		pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
		return -ENOENT;
	}

	w = spdk_jsonrpc_begin_result(request);
	spdk_json_write_array_begin(w);
	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
		dump_single_cluster_entry(entry, w);
	}
	spdk_json_write_array_end(w);
	spdk_jsonrpc_end_result(request, w);
	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);

	return 0;
}

static const struct spdk_bdev_fn_table rbd_fn_table = {
	.destruct = bdev_rbd_destruct,
	.submit_request = bdev_rbd_submit_request,
	.io_type_supported = bdev_rbd_io_type_supported,
	.get_io_channel = bdev_rbd_get_io_channel,
	.dump_info_json = bdev_rbd_dump_info_json,
	.write_config_json = bdev_rbd_write_config_json,
};

static int
rbd_thread_set_cpumask(struct spdk_cpuset *set)
{
#ifdef __linux__
	uint32_t lcore;
	cpu_set_t mask;

	assert(set != NULL);
	CPU_ZERO(&mask);

	/* Copy each core set in the spdk_cpuset into the cpu_set_t */
	for (lcore = 0; lcore < SPDK_CPUSET_SIZE; lcore++) {
		if (spdk_cpuset_get_cpu(set, lcore)) {
			CPU_SET(lcore, &mask);
		}
	}

	/* Change the core mask of the current thread */
	if (sched_setaffinity(0, sizeof(mask), &mask) < 0) {
		SPDK_ERRLOG("Failed to set the cpu mask of a non-SPDK thread (errno=%d)\n", errno);
		return -1;
	}

	return 0;
#else
	SPDK_ERRLOG("Setting the cpumask of a non-SPDK thread is currently supported only on Linux.\n");
	return -ENOTSUP;
#endif
}

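/*
 * Registering a named cluster: after rejecting duplicates and copying the
 * parameters, the optional core_mask is applied to the *calling* thread via
 * rbd_thread_set_cpumask() so that the service threads librados spawns in
 * rados_create()/rados_connect() inherit that affinity; the caller's own
 * affinity is restored when spdk_call_unaffinitized() returns.
 */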
static int
rbd_register_cluster(const char *name, const char *user_id, const char *const *config_param,
		     const char *config_file, const char *key_file, const char *core_mask)
{
	struct bdev_rbd_cluster *entry;
	struct spdk_cpuset rbd_core_mask = {};
	int rc;

	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);
	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
		if (strcmp(name, entry->name) == 0) {
			SPDK_ERRLOG("Cluster name=%s already exists\n", name);
			pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
			return -1;
		}
	}

	entry = calloc(1, sizeof(*entry));
	if (!entry) {
		SPDK_ERRLOG("Cannot allocate an entry for name=%s\n", name);
		pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
		return -1;
	}

	entry->name = strdup(name);
	if (entry->name == NULL) {
		SPDK_ERRLOG("Failed to save the name=%s on entry=%p\n", name, entry);
		goto err_handle;
	}

	if (user_id) {
		entry->user_id = strdup(user_id);
		if (entry->user_id == NULL) {
			SPDK_ERRLOG("Failed to save the user_id=%s on entry=%p\n", user_id, entry);
			goto err_handle;
		}
	}

	/* config_param and config_file may be specified separately, or both together. */
	if (config_param) {
		entry->config_param = bdev_rbd_dup_config(config_param);
		if (entry->config_param == NULL) {
			SPDK_ERRLOG("Failed to save the config_param=%p on entry=%p\n", config_param, entry);
			goto err_handle;
		}
	}

	if (config_file) {
		entry->config_file = strdup(config_file);
		if (entry->config_file == NULL) {
			SPDK_ERRLOG("Failed to save the config_file=%s on entry=%p\n", config_file, entry);
			goto err_handle;
		}
	}

	if (key_file) {
		entry->key_file = strdup(key_file);
		if (entry->key_file == NULL) {
			SPDK_ERRLOG("Failed to save the key_file=%s on entry=%p\n", key_file, entry);
			goto err_handle;
		}
	}

	if (core_mask) {
		entry->core_mask = strdup(core_mask);
		if (entry->core_mask == NULL) {
			SPDK_ERRLOG("Core_mask=%s allocation failed on entry=%p\n", core_mask, entry);
			goto err_handle;
		}

		if (spdk_cpuset_parse(&rbd_core_mask, entry->core_mask) < 0) {
			SPDK_ERRLOG("Invalid cpumask=%s on entry=%p\n", entry->core_mask, entry);
			goto err_handle;
		}

		if (rbd_thread_set_cpumask(&rbd_core_mask) < 0) {
			SPDK_ERRLOG("Failed to change rbd threads to core_mask %s on entry=%p\n", core_mask, entry);
			goto err_handle;
		}
	}

	/* If an rbd thread core mask is given, rados_create() must execute with
	 * the affinity set by rbd_thread_set_cpumask(). That affinity is
	 * reverted once rbd_register_cluster() returns and we leave the
	 * spdk_call_unaffinitized() context. */
	rc = rados_create(&entry->cluster, user_id);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to create rados_t struct\n");
		goto err_handle;
	}

	/* Try the default location when entry->config_file is NULL, but ignore failure in that case */
	rc = rados_conf_read_file(entry->cluster, entry->config_file);
	if (entry->config_file && rc < 0) {
		SPDK_ERRLOG("Failed to read conf file %s\n", entry->config_file);
		rados_shutdown(entry->cluster);
		goto err_handle;
	}

	if (config_param) {
		const char *const *config_entry = config_param;
		while (*config_entry) {
			rc = rados_conf_set(entry->cluster, config_entry[0], config_entry[1]);
			if (rc < 0) {
				SPDK_ERRLOG("Failed to set %s = %s\n", config_entry[0], config_entry[1]);
				rados_shutdown(entry->cluster);
				goto err_handle;
			}
			config_entry += 2;
		}
	}

	if (key_file) {
		rc = rados_conf_set(entry->cluster, "keyring", key_file);
		if (rc < 0) {
			SPDK_ERRLOG("Failed to set keyring = %s\n", key_file);
			rados_shutdown(entry->cluster);
			goto err_handle;
		}
	}

	rc = rados_connect(entry->cluster);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to connect to rbd_pool on cluster=%p\n", entry->cluster);
		rados_shutdown(entry->cluster);
		goto err_handle;
	}

	STAILQ_INSERT_TAIL(&g_map_bdev_rbd_cluster, entry, link);
	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);

	return 0;

err_handle:
	bdev_rbd_cluster_free(entry);
	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
	return -1;
}

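/*
 * A registered cluster can only be unregistered while no rbd bdev holds a
 * reference to it (entry->ref, taken in bdev_rbd_get_cluster() and dropped
 * in bdev_rbd_put_cluster(), must be 0).
 */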
int
bdev_rbd_unregister_cluster(const char *name)
{
	struct bdev_rbd_cluster *entry;
	int rc = 0;

	if (name == NULL) {
		return -1;
	}

	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);
	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
		if (strcmp(name, entry->name) == 0) {
			if (entry->ref == 0) {
				STAILQ_REMOVE(&g_map_bdev_rbd_cluster, entry, bdev_rbd_cluster, link);
				rados_shutdown(entry->cluster);
				bdev_rbd_cluster_free(entry);
			} else {
				SPDK_ERRLOG("Cluster with name=%s is still in use and cannot be deleted\n",
					    entry->name);
				rc = -1;
			}

			pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
			return rc;
		}
	}

	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);

	SPDK_ERRLOG("Could not find the cluster name=%s\n", name);

	return -1;
}

static void *
_bdev_rbd_register_cluster(void *arg)
{
	struct cluster_register_info *info = arg;
	void *ret = arg;
	int rc;

	rc = rbd_register_cluster((const char *)info->name, (const char *)info->user_id,
				  (const char *const *)info->config_param, (const char *)info->config_file,
				  (const char *)info->key_file, info->core_mask);
	if (rc) {
		ret = NULL;
	}

	return ret;
}

int
bdev_rbd_register_cluster(struct cluster_register_info *info)
{
	assert(info != NULL);

	/* The rados cluster info needs to be created on a non-SPDK thread to
	 * avoid CPU resource contention */
	if (spdk_call_unaffinitized(_bdev_rbd_register_cluster, info) == NULL) {
		return -1;
	}

	return 0;
}

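/*
 * Hypothetical call sketch for reference; in practice this is driven by the
 * bdev_rbd_create RPC (all values illustrative, and uuid must point to a
 * valid struct spdk_uuid since it is dereferenced unconditionally):
 *
 *	struct spdk_bdev *bdev;
 *	int rc = bdev_rbd_create(&bdev, "Ceph0", NULL, "rbd", NULL, "image0",
 *				 512, "cluster0", &uuid);
 */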
int
bdev_rbd_create(struct spdk_bdev **bdev, const char *name, const char *user_id,
		const char *pool_name,
		const char *const *config,
		const char *rbd_name,
		uint32_t block_size,
		const char *cluster_name,
		const struct spdk_uuid *uuid)
{
	struct bdev_rbd *rbd;
	int ret;

	if ((pool_name == NULL) || (rbd_name == NULL)) {
		return -EINVAL;
	}

	rbd = calloc(1, sizeof(struct bdev_rbd));
	if (rbd == NULL) {
		SPDK_ERRLOG("Failed to allocate bdev_rbd struct\n");
		return -ENOMEM;
	}

	rbd->rbd_name = strdup(rbd_name);
	if (!rbd->rbd_name) {
		bdev_rbd_free(rbd);
		return -ENOMEM;
	}

	if (user_id) {
		rbd->user_id = strdup(user_id);
		if (!rbd->user_id) {
			bdev_rbd_free(rbd);
			return -ENOMEM;
		}
	}

	if (cluster_name) {
		rbd->cluster_name = strdup(cluster_name);
		if (!rbd->cluster_name) {
			bdev_rbd_free(rbd);
			return -ENOMEM;
		}
	}
	rbd->pool_name = strdup(pool_name);
	if (!rbd->pool_name) {
		bdev_rbd_free(rbd);
		return -ENOMEM;
	}

	if (config && !(rbd->config = bdev_rbd_dup_config(config))) {
		bdev_rbd_free(rbd);
		return -ENOMEM;
	}

	ret = bdev_rbd_init(rbd);
	if (ret < 0) {
		bdev_rbd_free(rbd);
		SPDK_ERRLOG("Failed to init rbd device\n");
		return ret;
	}

	rbd->disk.uuid = *uuid;
	if (name) {
		rbd->disk.name = strdup(name);
	} else {
		rbd->disk.name = spdk_sprintf_alloc("Ceph%d", bdev_rbd_count);
	}
	if (!rbd->disk.name) {
		bdev_rbd_free(rbd);
		return -ENOMEM;
	}
	rbd->disk.product_name = "Ceph Rbd Disk";
	bdev_rbd_count++;

	rbd->disk.write_cache = 0;
	rbd->disk.blocklen = block_size;
	rbd->disk.blockcnt = rbd->info.size / rbd->disk.blocklen;
	rbd->disk.ctxt = rbd;
	rbd->disk.fn_table = &rbd_fn_table;
	rbd->disk.module = &rbd_if;

	SPDK_NOTICELOG("Add %s rbd disk to lun\n", rbd->disk.name);

	spdk_io_device_register(rbd, bdev_rbd_create_cb,
				bdev_rbd_destroy_cb,
				sizeof(struct bdev_rbd_io_channel),
				rbd_name);
	ret = spdk_bdev_register(&rbd->disk);
	if (ret) {
		spdk_io_device_unregister(rbd, NULL);
		bdev_rbd_free(rbd);
		return ret;
	}

	*bdev = &(rbd->disk);

	return ret;
}

void
bdev_rbd_delete(const char *name, spdk_delete_rbd_complete cb_fn, void *cb_arg)
{
	int rc;

	rc = spdk_bdev_unregister_by_name(name, &rbd_if, cb_fn, cb_arg);
	if (rc != 0) {
		cb_fn(cb_arg, rc);
	}
}

static void
dummy_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev, void *ctx)
{
}

int
bdev_rbd_resize(const char *name, const uint64_t new_size_in_mb)
{
	struct spdk_bdev_desc *desc;
	struct spdk_bdev *bdev;
	struct bdev_rbd *rbd;
	int rc = 0;
	uint64_t new_size_in_byte;
	uint64_t current_size_in_mb;

	rc = spdk_bdev_open_ext(name, false, dummy_bdev_event_cb, NULL, &desc);
	if (rc != 0) {
		return rc;
	}

	bdev = spdk_bdev_desc_get_bdev(desc);

	if (bdev->module != &rbd_if) {
		rc = -EINVAL;
		goto exit;
	}

	current_size_in_mb = bdev->blocklen * bdev->blockcnt / (1024 * 1024);
	if (current_size_in_mb > new_size_in_mb) {
		SPDK_ERRLOG("The new bdev size must be larger than current bdev size.\n");
		rc = -EINVAL;
		goto exit;
	}

	rbd = SPDK_CONTAINEROF(bdev, struct bdev_rbd, disk);
	new_size_in_byte = new_size_in_mb * 1024 * 1024;
	rc = rbd_resize(rbd->image, new_size_in_byte);
	if (rc != 0) {
		SPDK_ERRLOG("failed to resize the ceph bdev.\n");
		goto exit;
	}

	rc = spdk_bdev_notify_blockcnt_change(bdev, new_size_in_byte / bdev->blocklen);
	if (rc != 0) {
		SPDK_ERRLOG("failed to notify block cnt change.\n");
	}

exit:
	spdk_bdev_close(desc);
	return rc;
}

static int
bdev_rbd_group_create_cb(void *io_device, void *ctx_buf)
{
	return 0;
}

static void
bdev_rbd_group_destroy_cb(void *io_device, void *ctx_buf)
{
}

static int
bdev_rbd_library_init(void)
{
	spdk_io_device_register(&rbd_if, bdev_rbd_group_create_cb, bdev_rbd_group_destroy_cb,
				0, "bdev_rbd_poll_groups");
	return 0;
}

static void
bdev_rbd_library_fini(void)
{
	spdk_io_device_unregister(&rbd_if, NULL);
}

SPDK_LOG_REGISTER_COMPONENT(bdev_rbd)