/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright (C) 2017 Intel Corporation.
 * All rights reserved.
 */

#include "spdk/stdinc.h"

#include "bdev_rbd.h"

#include <rbd/librbd.h>
#include <rados/librados.h>

#include "spdk/env.h"
#include "spdk/bdev.h"
#include "spdk/thread.h"
#include "spdk/json.h"
#include "spdk/string.h"
#include "spdk/util.h"
#include "spdk/likely.h"

#include "spdk/bdev_module.h"
#include "spdk/log.h"

static int bdev_rbd_count = 0;

struct bdev_rbd_pool_ctx {
	rados_t *cluster_p;
	char *name;
	rados_ioctx_t io_ctx;
	uint32_t ref;
	STAILQ_ENTRY(bdev_rbd_pool_ctx) link;
};

static STAILQ_HEAD(, bdev_rbd_pool_ctx) g_map_bdev_rbd_pool_ctx = STAILQ_HEAD_INITIALIZER(
			g_map_bdev_rbd_pool_ctx);

struct bdev_rbd {
	struct spdk_bdev disk;
	char *rbd_name;
	char *user_id;
	char *pool_name;
	char **config;

	rados_t cluster;
	rados_t *cluster_p;
	char *cluster_name;

	union rbd_ctx {
		rados_ioctx_t io_ctx;
		struct bdev_rbd_pool_ctx *ctx;
	} rados_ctx;

	rbd_image_t image;

	rbd_image_info_t info;
	struct spdk_thread *destruct_td;

	TAILQ_ENTRY(bdev_rbd) tailq;
	struct spdk_poller *reset_timer;
	struct spdk_bdev_io *reset_bdev_io;
};

struct bdev_rbd_io_channel {
	struct bdev_rbd *disk;
	struct spdk_io_channel *group_ch;
};

struct bdev_rbd_io {
	struct spdk_thread *submit_td;
	enum spdk_bdev_io_status status;
	rbd_completion_t comp;
	size_t total_len;
};

struct bdev_rbd_cluster {
	char *name;
	char *user_id;
	char **config_param;
	char *config_file;
	char *key_file;
	char *core_mask;
	rados_t cluster;
	uint32_t ref;
	STAILQ_ENTRY(bdev_rbd_cluster) link;
};

static STAILQ_HEAD(, bdev_rbd_cluster) g_map_bdev_rbd_cluster = STAILQ_HEAD_INITIALIZER(
			g_map_bdev_rbd_cluster);
static pthread_mutex_t g_map_bdev_rbd_cluster_mutex = PTHREAD_MUTEX_INITIALIZER;

static void
bdev_rbd_cluster_free(struct bdev_rbd_cluster *entry)
{
	assert(entry != NULL);

	bdev_rbd_free_config(entry->config_param);
	free(entry->config_file);
	free(entry->key_file);
	free(entry->user_id);
	free(entry->name);
	free(entry->core_mask);
	free(entry);
}

static void
bdev_rbd_put_cluster(rados_t **cluster)
{
	struct bdev_rbd_cluster *entry;

	assert(cluster != NULL);

	/* No need to go through the map if *cluster is NULL */
	if (*cluster == NULL) {
		return;
	}

	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);
	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
		if (*cluster != &entry->cluster) {
			continue;
		}

		assert(entry->ref > 0);
		entry->ref--;
		*cluster = NULL;
		pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
		return;
	}

	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
	SPDK_ERRLOG("Cannot find the entry for cluster=%p\n", cluster);
}

static void
bdev_rbd_put_pool_ctx(struct bdev_rbd_pool_ctx *entry)
{
	assert(spdk_get_thread() == spdk_thread_get_app_thread());

	assert(entry != NULL);
	assert(entry->ref > 0);
	entry->ref--;
	if (entry->ref == 0) {
		STAILQ_REMOVE(&g_map_bdev_rbd_pool_ctx, entry, bdev_rbd_pool_ctx, link);
		rados_ioctx_destroy(entry->io_ctx);
		free(entry->name);
		free(entry);
	}
}
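
/*
 * Pool contexts are shared per (cluster, pool name) pair and reference
 * counted. A minimal usage sketch, assuming the caller runs on the app
 * thread as both helpers assert (cluster_p and the pool name "rbd" are
 * hypothetical):
 *
 *	struct bdev_rbd_pool_ctx *ctx;
 *
 *	if (bdev_rbd_get_pool_ctx(cluster_p, "rbd", &ctx) == 0) {
 *		// ... use ctx->io_ctx for rados/rbd calls ...
 *		bdev_rbd_put_pool_ctx(ctx);
 *	}
 */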

static void
bdev_rbd_free(struct bdev_rbd *rbd)
{
	if (!rbd) {
		return;
	}

	if (rbd->image) {
		rbd_flush(rbd->image);
		rbd_close(rbd->image);
	}

	free(rbd->disk.name);
	free(rbd->rbd_name);
	free(rbd->user_id);
	free(rbd->pool_name);
	bdev_rbd_free_config(rbd->config);

	if (rbd->cluster_name) {
		/* When the rbd is destructed by bdev_rbd_destruct, this path is not
		 * taken because the ctx has already been freed asynchronously by
		 * bdev_rbd_free_cb. It is only reached when rbd initialization fails. */
		if (rbd->rados_ctx.ctx) {
			bdev_rbd_put_pool_ctx(rbd->rados_ctx.ctx);
			rbd->rados_ctx.ctx = NULL;
		}

		bdev_rbd_put_cluster(&rbd->cluster_p);
		free(rbd->cluster_name);
	} else if (rbd->cluster) {
		if (rbd->rados_ctx.io_ctx) {
			rados_ioctx_destroy(rbd->rados_ctx.io_ctx);
		}
		rados_shutdown(rbd->cluster);
	}

	free(rbd);
}

void
bdev_rbd_free_config(char **config)
{
	char **entry;

	if (config) {
		for (entry = config; *entry; entry++) {
			free(*entry);
		}
		free(config);
	}
}

char **
bdev_rbd_dup_config(const char *const *config)
{
	size_t count;
	char **copy;

	if (!config) {
		return NULL;
	}
	for (count = 0; config[count]; count++) {}
	copy = calloc(count + 1, sizeof(*copy));
	if (!copy) {
		return NULL;
	}
	for (count = 0; config[count]; count++) {
		if (!(copy[count] = strdup(config[count]))) {
			bdev_rbd_free_config(copy);
			return NULL;
		}
	}
	return copy;
}

static int
bdev_rados_cluster_init(const char *user_id, const char *const *config,
			rados_t *cluster)
{
	int ret;

	ret = rados_create(cluster, user_id);
	if (ret < 0) {
		SPDK_ERRLOG("Failed to create rados_t struct\n");
		return -1;
	}

	if (config) {
		const char *const *entry = config;
		while (*entry) {
			ret = rados_conf_set(*cluster, entry[0], entry[1]);
			if (ret < 0) {
				SPDK_ERRLOG("Failed to set %s = %s\n", entry[0], entry[1]);
				rados_shutdown(*cluster);
				*cluster = NULL;
				return -1;
			}
			entry += 2;
		}
	} else {
		ret = rados_conf_read_file(*cluster, NULL);
		if (ret < 0) {
			SPDK_ERRLOG("Failed to read conf file\n");
			rados_shutdown(*cluster);
			*cluster = NULL;
			return -1;
		}
	}

	ret = rados_connect(*cluster);
	if (ret < 0) {
		SPDK_ERRLOG("Failed to connect to rbd_pool\n");
		rados_shutdown(*cluster);
		*cluster = NULL;
		return -1;
	}

	return 0;
}

static int
bdev_rbd_get_cluster(const char *cluster_name, rados_t **cluster)
{
	struct bdev_rbd_cluster *entry;

	if (cluster == NULL) {
		SPDK_ERRLOG("cluster should not be NULL\n");
		return -1;
	}

	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);
	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
		if (strcmp(cluster_name, entry->name) == 0) {
			entry->ref++;
			*cluster = &entry->cluster;
			pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
			return 0;
		}
	}

	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
	return -1;
}

static int
bdev_rbd_shared_cluster_init(const char *cluster_name, rados_t **cluster)
{
	int ret;

	ret = bdev_rbd_get_cluster(cluster_name, cluster);
	if (ret < 0) {
		SPDK_ERRLOG("Failed to get the registered cluster with name=%s\n", cluster_name);
		return -1;
	}

	return ret;
}
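
/*
 * The config arrays handled above are flat, NULL-terminated key/value
 * lists. A hypothetical example of input for bdev_rados_cluster_init()
 * (the monitor address and options are placeholders, not a working
 * cluster):
 *
 *	static const char *const config[] = {
 *		"mon_host", "192.168.1.1:6789",
 *		"auth_client_required", "none",
 *		NULL,
 *	};
 */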

static void *
bdev_rbd_cluster_handle(void *arg)
{
	void *ret = arg;
	struct bdev_rbd *rbd = arg;
	int rc;

	rc = bdev_rados_cluster_init(rbd->user_id, (const char *const *)rbd->config,
				     &rbd->cluster);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to create rados cluster for user_id=%s and rbd_pool=%s\n",
			    rbd->user_id ? rbd->user_id : "admin (the default)", rbd->pool_name);
		ret = NULL;
	}

	return ret;
}

static int
bdev_rbd_get_pool_ctx(rados_t *cluster_p, const char *name, struct bdev_rbd_pool_ctx **ctx)
{
	struct bdev_rbd_pool_ctx *entry;

	assert(spdk_get_thread() == spdk_thread_get_app_thread());

	if (name == NULL || ctx == NULL) {
		return -1;
	}

	STAILQ_FOREACH(entry, &g_map_bdev_rbd_pool_ctx, link) {
		if (strcmp(name, entry->name) == 0 && cluster_p == entry->cluster_p) {
			entry->ref++;
			*ctx = entry;
			return 0;
		}
	}

	entry = calloc(1, sizeof(*entry));
	if (!entry) {
		SPDK_ERRLOG("Cannot allocate an entry for name=%s\n", name);
		return -1;
	}

	entry->name = strdup(name);
	if (entry->name == NULL) {
		SPDK_ERRLOG("Failed to allocate the name=%s space on entry=%p\n", name, entry);
		goto err_handle;
	}

	if (rados_ioctx_create(*cluster_p, name, &entry->io_ctx) < 0) {
		goto err_handle1;
	}

	entry->cluster_p = cluster_p;
	entry->ref = 1;
	*ctx = entry;
	STAILQ_INSERT_TAIL(&g_map_bdev_rbd_pool_ctx, entry, link);

	return 0;

err_handle1:
	free(entry->name);
err_handle:
	free(entry);

	return -1;
}

static void *
bdev_rbd_init_context(void *arg)
{
	struct bdev_rbd *rbd = arg;
	int rc;
	rados_ioctx_t *io_ctx = NULL;

	if (rbd->cluster_name) {
		if (bdev_rbd_get_pool_ctx(rbd->cluster_p, rbd->pool_name, &rbd->rados_ctx.ctx) < 0) {
			SPDK_ERRLOG("Failed to create ioctx on rbd=%p with cluster_name=%s\n",
				    rbd, rbd->cluster_name);
			return NULL;
		}
		io_ctx = &rbd->rados_ctx.ctx->io_ctx;
	} else {
		if (rados_ioctx_create(*(rbd->cluster_p), rbd->pool_name, &rbd->rados_ctx.io_ctx) < 0) {
			SPDK_ERRLOG("Failed to create ioctx on rbd=%p\n", rbd);
			return NULL;
		}
		io_ctx = &rbd->rados_ctx.io_ctx;
	}

	assert(io_ctx != NULL);
	rc = rbd_open(*io_ctx, rbd->rbd_name, &rbd->image, NULL);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to open specified rbd device\n");
		return NULL;
	}

	rc = rbd_stat(rbd->image, &rbd->info, sizeof(rbd->info));
	if (rc < 0) {
		SPDK_ERRLOG("Failed to stat specified rbd device\n");
		return NULL;
	}

	return arg;
}

static int
bdev_rbd_init(struct bdev_rbd *rbd)
{
	int ret = 0;

	if (!rbd->cluster_name) {
		rbd->cluster_p = &rbd->cluster;
		/* The cluster should be created on a non-SPDK thread to avoid
		 * conflicts between rados threads and SPDK threads */
		if (spdk_call_unaffinitized(bdev_rbd_cluster_handle, rbd) == NULL) {
			SPDK_ERRLOG("Cannot create the rados object on rbd=%p\n", rbd);
			return -1;
		}
	} else {
		ret = bdev_rbd_shared_cluster_init(rbd->cluster_name, &rbd->cluster_p);
		if (ret < 0) {
			SPDK_ERRLOG("Failed to create rados object for rbd=%p on cluster_name=%s\n",
				    rbd, rbd->cluster_name);
			return -1;
		}
	}

	if (spdk_call_unaffinitized(bdev_rbd_init_context, rbd) == NULL) {
		SPDK_ERRLOG("Cannot init rbd context for rbd=%p\n", rbd);
		return -1;
	}

	return ret;
}
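
/*
 * Completion path note: librbd invokes its AIO callback on one of its
 * own (non-SPDK) threads, so completions below are bounced back to the
 * SPDK thread that submitted the I/O with spdk_thread_send_msg() before
 * spdk_bdev_io_complete() is called.
 */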

static void
_bdev_rbd_io_complete(void *_rbd_io)
{
	struct bdev_rbd_io *rbd_io = _rbd_io;

	spdk_bdev_io_complete(spdk_bdev_io_from_ctx(rbd_io), rbd_io->status);
}

static void
bdev_rbd_io_complete(struct spdk_bdev_io *bdev_io, enum spdk_bdev_io_status status)
{
	struct bdev_rbd_io *rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx;
	struct spdk_thread *current_thread = spdk_get_thread();

	rbd_io->status = status;
	assert(rbd_io->submit_td != NULL);
	if (rbd_io->submit_td != current_thread) {
		spdk_thread_send_msg(rbd_io->submit_td, _bdev_rbd_io_complete, rbd_io);
	} else {
		_bdev_rbd_io_complete(rbd_io);
	}
}

static void
bdev_rbd_finish_aiocb(rbd_completion_t cb, void *arg)
{
	int io_status;
	struct spdk_bdev_io *bdev_io;
	struct bdev_rbd_io *rbd_io;
	enum spdk_bdev_io_status bio_status;

	bdev_io = rbd_aio_get_arg(cb);
	rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx;
	io_status = rbd_aio_get_return_value(cb);
	bio_status = SPDK_BDEV_IO_STATUS_SUCCESS;

	if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) {
		if ((int)rbd_io->total_len != io_status) {
			bio_status = SPDK_BDEV_IO_STATUS_FAILED;
		}
#ifdef LIBRBD_SUPPORTS_COMPARE_AND_WRITE_IOVEC
	} else if (bdev_io->type == SPDK_BDEV_IO_TYPE_COMPARE_AND_WRITE && io_status == -EILSEQ) {
		bio_status = SPDK_BDEV_IO_STATUS_MISCOMPARE;
#endif
	} else if (io_status != 0) { /* For others, 0 means success */
		bio_status = SPDK_BDEV_IO_STATUS_FAILED;
	}

	rbd_aio_release(cb);

	bdev_rbd_io_complete(bdev_io, bio_status);
}
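
/*
 * For reads, rbd_aio_get_return_value() reports the number of bytes
 * transferred, so a short read is treated as a failure above; for the
 * other I/O types it returns 0 on success or a negative errno.
 */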

static void
_bdev_rbd_start_aio(struct bdev_rbd *disk, struct spdk_bdev_io *bdev_io,
		    struct iovec *iov, int iovcnt, uint64_t offset, size_t len)
{
	int ret;
	struct bdev_rbd_io *rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx;
	rbd_image_t image = disk->image;

	ret = rbd_aio_create_completion(bdev_io, bdev_rbd_finish_aiocb,
					&rbd_io->comp);
	if (ret < 0) {
		goto err;
	}

	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_READ:
		rbd_io->total_len = len;
		if (spdk_likely(iovcnt == 1)) {
			ret = rbd_aio_read(image, offset, iov[0].iov_len, iov[0].iov_base,
					   rbd_io->comp);
		} else {
			ret = rbd_aio_readv(image, iov, iovcnt, offset, rbd_io->comp);
		}
		break;
	case SPDK_BDEV_IO_TYPE_WRITE:
		if (spdk_likely(iovcnt == 1)) {
			ret = rbd_aio_write(image, offset, iov[0].iov_len, iov[0].iov_base,
					    rbd_io->comp);
		} else {
			ret = rbd_aio_writev(image, iov, iovcnt, offset, rbd_io->comp);
		}
		break;
	case SPDK_BDEV_IO_TYPE_UNMAP:
		ret = rbd_aio_discard(image, offset, len, rbd_io->comp);
		break;
	case SPDK_BDEV_IO_TYPE_FLUSH:
		ret = rbd_aio_flush(image, rbd_io->comp);
		break;
	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
		ret = rbd_aio_write_zeroes(image, offset, len, rbd_io->comp, /* zero_flags */ 0,
					   /* op_flags */ 0);
		break;
#ifdef LIBRBD_SUPPORTS_COMPARE_AND_WRITE_IOVEC
	case SPDK_BDEV_IO_TYPE_COMPARE_AND_WRITE:
		ret = rbd_aio_compare_and_writev(image, offset, iov /* cmp */, iovcnt,
						 bdev_io->u.bdev.fused_iovs /* write */,
						 bdev_io->u.bdev.fused_iovcnt,
						 rbd_io->comp, NULL,
						 /* op_flags */ 0);
		break;
#endif
	default:
		/* This should not happen; this function is only called with the
		 * I/O types accepted in bdev_rbd_submit_request.
		 */
		SPDK_ERRLOG("Unsupported IO type =%d\n", bdev_io->type);
		ret = -ENOTSUP;
		break;
	}

	if (ret < 0) {
		rbd_aio_release(rbd_io->comp);
		goto err;
	}

	return;

err:
	bdev_rbd_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
}

static void
bdev_rbd_start_aio(void *ctx)
{
	struct spdk_bdev_io *bdev_io = ctx;
	struct bdev_rbd *disk = (struct bdev_rbd *)bdev_io->bdev->ctxt;

	_bdev_rbd_start_aio(disk,
			    bdev_io,
			    bdev_io->u.bdev.iovs,
			    bdev_io->u.bdev.iovcnt,
			    bdev_io->u.bdev.offset_blocks * bdev_io->bdev->blocklen,
			    bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
}

static int bdev_rbd_library_init(void);
static void bdev_rbd_library_fini(void);

static int
bdev_rbd_get_ctx_size(void)
{
	return sizeof(struct bdev_rbd_io);
}

static struct spdk_bdev_module rbd_if = {
	.name = "rbd",
	.module_init = bdev_rbd_library_init,
	.module_fini = bdev_rbd_library_fini,
	.get_ctx_size = bdev_rbd_get_ctx_size,
};
SPDK_BDEV_MODULE_REGISTER(rbd, &rbd_if)

static int bdev_rbd_reset_timer(void *arg);

static void
bdev_rbd_check_outstanding_ios(struct spdk_bdev *bdev, uint64_t current_qd,
			       void *cb_arg, int rc)
{
	struct bdev_rbd *disk = cb_arg;
	enum spdk_bdev_io_status bio_status;

	if (rc == 0 && current_qd > 0) {
		disk->reset_timer = SPDK_POLLER_REGISTER(bdev_rbd_reset_timer, disk, 1000);
		return;
	}

	if (rc != 0) {
		bio_status = SPDK_BDEV_IO_STATUS_FAILED;
	} else {
		bio_status = SPDK_BDEV_IO_STATUS_SUCCESS;
	}

	bdev_rbd_io_complete(disk->reset_bdev_io, bio_status);
	disk->reset_bdev_io = NULL;
}

static int
bdev_rbd_reset_timer(void *arg)
{
	struct bdev_rbd *disk = arg;

	spdk_poller_unregister(&disk->reset_timer);

	spdk_bdev_get_current_qd(&disk->disk, bdev_rbd_check_outstanding_ios, disk);

	return SPDK_POLLER_BUSY;
}

static void
bdev_rbd_reset(void *ctx)
{
	struct spdk_bdev_io *bdev_io = ctx;
	struct bdev_rbd *disk = (struct bdev_rbd *)bdev_io->bdev->ctxt;

	/*
	 * HACK: Since librbd doesn't provide any way to cancel outstanding aio, just kick off a
	 * poller to wait for in-flight I/O to complete.
	 */
	assert(disk->reset_bdev_io == NULL);
	disk->reset_bdev_io = bdev_io;

	bdev_rbd_reset_timer(disk);
}
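
/*
 * The reset poller above re-arms itself every 1000 microseconds until
 * spdk_bdev_get_current_qd() reports zero outstanding I/O (or fails),
 * at which point the pending reset bdev_io is completed.
 */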

static void
_bdev_rbd_destruct_done(void *io_device)
{
	struct bdev_rbd *rbd = io_device;

	assert(rbd != NULL);

	spdk_bdev_destruct_done(&rbd->disk, 0);
	bdev_rbd_free(rbd);
}

static void
bdev_rbd_free_cb(void *io_device)
{
	struct bdev_rbd *rbd = io_device;

	assert(spdk_get_thread() == spdk_thread_get_app_thread());

	/* free the ctx */
	if (rbd->cluster_name && rbd->rados_ctx.ctx) {
		bdev_rbd_put_pool_ctx(rbd->rados_ctx.ctx);
		rbd->rados_ctx.ctx = NULL;
	}

	/* The io device has been unregistered. Send a message back to the
	 * original thread that started the destruct operation, so that the
	 * bdev unregister callback is invoked on the same thread that started
	 * this whole process.
	 */
	spdk_thread_send_msg(rbd->destruct_td, _bdev_rbd_destruct_done, rbd);
}

static void
_bdev_rbd_destruct(void *ctx)
{
	struct bdev_rbd *rbd = ctx;

	spdk_io_device_unregister(rbd, bdev_rbd_free_cb);
}

static int
bdev_rbd_destruct(void *ctx)
{
	struct bdev_rbd *rbd = ctx;

	/* Start the destruct operation on the rbd bdev's
	 * main thread. This guarantees it will only start
	 * executing after any messages related to channel
	 * deletions have finished completing. *Always*
	 * send a message, even if this function gets called
	 * from the main thread, in case there are pending
	 * channel delete messages in flight to this thread.
	 */
	assert(rbd->destruct_td == NULL);
	rbd->destruct_td = spdk_get_thread();
	spdk_thread_send_msg(spdk_thread_get_app_thread(), _bdev_rbd_destruct, rbd);

	/* Return 1 to indicate the destruct path is asynchronous. */
	return 1;
}

static void
bdev_rbd_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io,
		    bool success)
{
	if (!success) {
		bdev_rbd_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
		return;
	}

	bdev_rbd_start_aio(bdev_io);
}

static void
bdev_rbd_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
{
	struct spdk_thread *submit_td = spdk_io_channel_get_thread(ch);
	struct bdev_rbd_io *rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx;

	rbd_io->submit_td = submit_td;
	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_READ:
		spdk_bdev_io_get_buf(bdev_io, bdev_rbd_get_buf_cb,
				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
		break;

	case SPDK_BDEV_IO_TYPE_WRITE:
	case SPDK_BDEV_IO_TYPE_UNMAP:
	case SPDK_BDEV_IO_TYPE_FLUSH:
	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
#ifdef LIBRBD_SUPPORTS_COMPARE_AND_WRITE_IOVEC
	case SPDK_BDEV_IO_TYPE_COMPARE_AND_WRITE:
#endif
		bdev_rbd_start_aio(bdev_io);
		break;

	case SPDK_BDEV_IO_TYPE_RESET:
		spdk_thread_exec_msg(spdk_thread_get_app_thread(), bdev_rbd_reset, bdev_io);
		break;

	default:
		SPDK_ERRLOG("Unsupported IO type =%d\n", bdev_io->type);
		bdev_rbd_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
		break;
	}
}

static bool
bdev_rbd_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type)
{
	switch (io_type) {
	case SPDK_BDEV_IO_TYPE_READ:
	case SPDK_BDEV_IO_TYPE_WRITE:
	case SPDK_BDEV_IO_TYPE_UNMAP:
	case SPDK_BDEV_IO_TYPE_FLUSH:
	case SPDK_BDEV_IO_TYPE_RESET:
	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
#ifdef LIBRBD_SUPPORTS_COMPARE_AND_WRITE_IOVEC
	case SPDK_BDEV_IO_TYPE_COMPARE_AND_WRITE:
#endif
		return true;

	default:
		return false;
	}
}

static int
bdev_rbd_create_cb(void *io_device, void *ctx_buf)
{
	struct bdev_rbd_io_channel *ch = ctx_buf;
	struct bdev_rbd *disk = io_device;

	ch->disk = disk;
	ch->group_ch = spdk_get_io_channel(&rbd_if);
	assert(ch->group_ch != NULL);

	return 0;
}

static void
bdev_rbd_destroy_cb(void *io_device, void *ctx_buf)
{
	struct bdev_rbd_io_channel *ch = ctx_buf;

	spdk_put_io_channel(ch->group_ch);
}

static struct spdk_io_channel *
bdev_rbd_get_io_channel(void *ctx)
{
	struct bdev_rbd *rbd_bdev = ctx;

	return spdk_get_io_channel(rbd_bdev);
}
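
/*
 * Channel note: every per-bdev I/O channel holds a reference to the
 * module-level group channel (the "bdev_rbd_poll_groups" io_device
 * registered on &rbd_if in bdev_rbd_library_init), so per-channel state
 * is just the disk pointer plus that shared group channel.
 */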

static void
bdev_rbd_cluster_dump_entry(const char *cluster_name, struct spdk_json_write_ctx *w)
{
	struct bdev_rbd_cluster *entry;

	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);
	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
		if (strcmp(cluster_name, entry->name)) {
			continue;
		}
		if (entry->user_id) {
			spdk_json_write_named_string(w, "user_id", entry->user_id);
		}

		if (entry->config_param) {
			char **config_entry = entry->config_param;

			spdk_json_write_named_object_begin(w, "config_param");
			while (*config_entry) {
				spdk_json_write_named_string(w, config_entry[0], config_entry[1]);
				config_entry += 2;
			}
			spdk_json_write_object_end(w);
		}
		if (entry->config_file) {
			spdk_json_write_named_string(w, "config_file", entry->config_file);
		}
		if (entry->key_file) {
			spdk_json_write_named_string(w, "key_file", entry->key_file);
		}

		pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
		return;
	}

	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
}

static int
bdev_rbd_dump_info_json(void *ctx, struct spdk_json_write_ctx *w)
{
	struct bdev_rbd *rbd_bdev = ctx;

	spdk_json_write_named_object_begin(w, "rbd");

	spdk_json_write_named_string(w, "pool_name", rbd_bdev->pool_name);

	spdk_json_write_named_string(w, "rbd_name", rbd_bdev->rbd_name);

	if (rbd_bdev->cluster_name) {
		bdev_rbd_cluster_dump_entry(rbd_bdev->cluster_name, w);
		goto end;
	}

	if (rbd_bdev->user_id) {
		spdk_json_write_named_string(w, "user_id", rbd_bdev->user_id);
	}

	if (rbd_bdev->config) {
		char **entry = rbd_bdev->config;

		spdk_json_write_named_object_begin(w, "config");
		while (*entry) {
			spdk_json_write_named_string(w, entry[0], entry[1]);
			entry += 2;
		}
		spdk_json_write_object_end(w);
	}

end:
	spdk_json_write_object_end(w);

	return 0;
}
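
/*
 * bdev_rbd_write_config_json() below replays the bdev as a
 * "bdev_rbd_create" call in the JSON config dump. A hypothetical example
 * of the emitted object (values are placeholders):
 *
 *	{
 *		"method": "bdev_rbd_create",
 *		"params": {
 *			"name": "Ceph0",
 *			"pool_name": "rbd",
 *			"rbd_name": "foo",
 *			"block_size": 512,
 *			"uuid": "..."
 *		}
 *	}
 */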

static void
bdev_rbd_write_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
{
	struct bdev_rbd *rbd = bdev->ctxt;
	char uuid_str[SPDK_UUID_STRING_LEN];

	spdk_json_write_object_begin(w);

	spdk_json_write_named_string(w, "method", "bdev_rbd_create");

	spdk_json_write_named_object_begin(w, "params");
	spdk_json_write_named_string(w, "name", bdev->name);
	spdk_json_write_named_string(w, "pool_name", rbd->pool_name);
	spdk_json_write_named_string(w, "rbd_name", rbd->rbd_name);
	spdk_json_write_named_uint32(w, "block_size", bdev->blocklen);
	if (rbd->user_id) {
		spdk_json_write_named_string(w, "user_id", rbd->user_id);
	}

	if (rbd->config) {
		char **entry = rbd->config;

		spdk_json_write_named_object_begin(w, "config");
		while (*entry) {
			spdk_json_write_named_string(w, entry[0], entry[1]);
			entry += 2;
		}
		spdk_json_write_object_end(w);
	}

	spdk_uuid_fmt_lower(uuid_str, sizeof(uuid_str), &bdev->uuid);
	spdk_json_write_named_string(w, "uuid", uuid_str);

	spdk_json_write_object_end(w);

	spdk_json_write_object_end(w);
}

static void
dump_single_cluster_entry(struct bdev_rbd_cluster *entry, struct spdk_json_write_ctx *w)
{
	assert(entry != NULL);

	spdk_json_write_object_begin(w);
	spdk_json_write_named_string(w, "cluster_name", entry->name);

	if (entry->user_id) {
		spdk_json_write_named_string(w, "user_id", entry->user_id);
	}

	if (entry->config_param) {
		char **config_entry = entry->config_param;

		spdk_json_write_named_object_begin(w, "config_param");
		while (*config_entry) {
			spdk_json_write_named_string(w, config_entry[0], config_entry[1]);
			config_entry += 2;
		}
		spdk_json_write_object_end(w);
	}
	if (entry->config_file) {
		spdk_json_write_named_string(w, "config_file", entry->config_file);
	}
	if (entry->key_file) {
		spdk_json_write_named_string(w, "key_file", entry->key_file);
	}

	if (entry->core_mask) {
		spdk_json_write_named_string(w, "core_mask", entry->core_mask);
	}

	spdk_json_write_object_end(w);
}

int
bdev_rbd_get_clusters_info(struct spdk_jsonrpc_request *request, const char *name)
{
	struct bdev_rbd_cluster *entry;
	struct spdk_json_write_ctx *w;

	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);

	if (STAILQ_EMPTY(&g_map_bdev_rbd_cluster)) {
		pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
		return -ENOENT;
	}

	/* If cluster name is provided */
	if (name) {
		STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
			if (strcmp(name, entry->name) == 0) {
				w = spdk_jsonrpc_begin_result(request);
				dump_single_cluster_entry(entry, w);
				spdk_jsonrpc_end_result(request, w);

				pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
				return 0;
			}
		}

		pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
		return -ENOENT;
	}

	w = spdk_jsonrpc_begin_result(request);
	spdk_json_write_array_begin(w);
	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
		dump_single_cluster_entry(entry, w);
	}
	spdk_json_write_array_end(w);
	spdk_jsonrpc_end_result(request, w);
	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);

	return 0;
}

static const struct spdk_bdev_fn_table rbd_fn_table = {
	.destruct = bdev_rbd_destruct,
	.submit_request = bdev_rbd_submit_request,
	.io_type_supported = bdev_rbd_io_type_supported,
	.get_io_channel = bdev_rbd_get_io_channel,
	.dump_info_json = bdev_rbd_dump_info_json,
	.write_config_json = bdev_rbd_write_config_json,
};

static int
rbd_thread_set_cpumask(struct spdk_cpuset *set)
{
#ifdef __linux__
	uint32_t lcore;
	cpu_set_t mask;

	assert(set != NULL);
	CPU_ZERO(&mask);

	/* get the core ids set in the spdk_cpuset and mark them in cpu_set_t */
	for (lcore = 0; lcore < SPDK_CPUSET_SIZE; lcore++) {
		if (spdk_cpuset_get_cpu(set, lcore)) {
			CPU_SET(lcore, &mask);
		}
	}

	/* change the core mask of the current thread */
	if (sched_setaffinity(0, sizeof(mask), &mask) < 0) {
		SPDK_ERRLOG("Failed to set the cpu mask of the non-SPDK thread (errno=%d)\n", errno);
		return -1;
	}

	return 0;
#else
	SPDK_ERRLOG("Setting the cpumask of non-SPDK threads is currently only supported on Linux.\n");
	return -ENOTSUP;
#endif
}
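
/*
 * A small usage sketch of the helper above (the mask value is
 * hypothetical): parsing core_mask "0x3" with spdk_cpuset_parse()
 * selects cores 0 and 1, and rbd_thread_set_cpumask() then pins the
 * calling thread there via sched_setaffinity(). Threads librbd spawns
 * afterwards inherit this affinity, which is why rados_create() below
 * must run while the mask is applied.
 */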

static int
rbd_register_cluster(const char *name, const char *user_id, const char *const *config_param,
		     const char *config_file, const char *key_file, const char *core_mask)
{
	struct bdev_rbd_cluster *entry;
	struct spdk_cpuset rbd_core_mask = {};
	int rc;

	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);
	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
		if (strcmp(name, entry->name) == 0) {
			SPDK_ERRLOG("Cluster name=%s already exists\n", name);
			pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
			return -1;
		}
	}

	entry = calloc(1, sizeof(*entry));
	if (!entry) {
		SPDK_ERRLOG("Cannot allocate an entry for name=%s\n", name);
		pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
		return -1;
	}

	entry->name = strdup(name);
	if (entry->name == NULL) {
		SPDK_ERRLOG("Failed to save the name=%s on entry=%p\n", name, entry);
		goto err_handle;
	}

	if (user_id) {
		entry->user_id = strdup(user_id);
		if (entry->user_id == NULL) {
			SPDK_ERRLOG("Failed to save the str=%s on entry=%p\n", user_id, entry);
			goto err_handle;
		}
	}

	/* config_param and config_file may be specified separately or together. */
	if (config_param) {
		entry->config_param = bdev_rbd_dup_config(config_param);
		if (entry->config_param == NULL) {
			SPDK_ERRLOG("Failed to save the config_param=%p on entry=%p\n", config_param, entry);
			goto err_handle;
		}
	}

	if (config_file) {
		entry->config_file = strdup(config_file);
		if (entry->config_file == NULL) {
			SPDK_ERRLOG("Failed to save the config_file=%s on entry=%p\n", config_file, entry);
			goto err_handle;
		}
	}

	if (key_file) {
		entry->key_file = strdup(key_file);
		if (entry->key_file == NULL) {
			SPDK_ERRLOG("Failed to save the key_file=%s on entry=%p\n", key_file, entry);
			goto err_handle;
		}
	}

	if (core_mask) {
		entry->core_mask = strdup(core_mask);
		if (entry->core_mask == NULL) {
			SPDK_ERRLOG("Core_mask=%s allocation failed on entry=%p\n", core_mask, entry);
			goto err_handle;
		}

		if (spdk_cpuset_parse(&rbd_core_mask, entry->core_mask) < 0) {
			SPDK_ERRLOG("Invalid cpumask=%s on entry=%p\n", entry->core_mask, entry);
			goto err_handle;
		}

		if (rbd_thread_set_cpumask(&rbd_core_mask) < 0) {
			SPDK_ERRLOG("Failed to change rbd threads to core_mask %s on entry=%p\n", core_mask, entry);
			goto err_handle;
		}
	}

	/* If an rbd thread core mask is given, rados_create() must execute with
	 * the affinity set by rbd_thread_set_cpumask(). That affinity is
	 * reverted once rbd_register_cluster() returns and we leave the
	 * spdk_call_unaffinitized context.
	 */
	rc = rados_create(&entry->cluster, user_id);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to create rados_t struct\n");
		goto err_handle;
	}

	/* Try the default location when entry->config_file is NULL, and ignore the failure in that case */
	rc = rados_conf_read_file(entry->cluster, entry->config_file);
	if (entry->config_file && rc < 0) {
		SPDK_ERRLOG("Failed to read conf file %s\n", entry->config_file);
		rados_shutdown(entry->cluster);
		goto err_handle;
	}

	if (config_param) {
		const char *const *config_entry = config_param;
		while (*config_entry) {
			rc = rados_conf_set(entry->cluster, config_entry[0], config_entry[1]);
			if (rc < 0) {
				SPDK_ERRLOG("Failed to set %s = %s\n", config_entry[0], config_entry[1]);
				rados_shutdown(entry->cluster);
				goto err_handle;
			}
			config_entry += 2;
		}
	}

	if (key_file) {
		rc = rados_conf_set(entry->cluster, "keyring", key_file);
		if (rc < 0) {
			SPDK_ERRLOG("Failed to set keyring = %s\n", key_file);
			rados_shutdown(entry->cluster);
			goto err_handle;
		}
	}

	rc = rados_connect(entry->cluster);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to connect to rbd_pool on cluster=%p\n", entry->cluster);
		rados_shutdown(entry->cluster);
		goto err_handle;
	}

	STAILQ_INSERT_TAIL(&g_map_bdev_rbd_cluster, entry, link);
	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);

	return 0;

err_handle:
	bdev_rbd_cluster_free(entry);
	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
	return -1;
}
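
/*
 * Unregistering below is refused while any rbd bdev still holds a
 * reference to the cluster (entry->ref > 0); references are taken in
 * bdev_rbd_get_cluster() and dropped in bdev_rbd_put_cluster().
 */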

int
bdev_rbd_unregister_cluster(const char *name)
{
	struct bdev_rbd_cluster *entry;
	int rc = 0;

	if (name == NULL) {
		return -1;
	}

	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);
	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
		if (strcmp(name, entry->name) == 0) {
			if (entry->ref == 0) {
				STAILQ_REMOVE(&g_map_bdev_rbd_cluster, entry, bdev_rbd_cluster, link);
				rados_shutdown(entry->cluster);
				bdev_rbd_cluster_free(entry);
			} else {
				SPDK_ERRLOG("Cluster with name=%s is still in use and cannot be deleted\n",
					    entry->name);
				rc = -1;
			}

			pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
			return rc;
		}
	}

	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);

	SPDK_ERRLOG("Could not find the cluster name=%s\n", name);

	return -1;
}

static void *
_bdev_rbd_register_cluster(void *arg)
{
	struct cluster_register_info *info = arg;
	void *ret = arg;
	int rc;

	rc = rbd_register_cluster((const char *)info->name, (const char *)info->user_id,
				  (const char *const *)info->config_param, (const char *)info->config_file,
				  (const char *)info->key_file, info->core_mask);
	if (rc) {
		ret = NULL;
	}

	return ret;
}

int
bdev_rbd_register_cluster(struct cluster_register_info *info)
{
	assert(info != NULL);

	/* The rados cluster info needs to be created on a non-SPDK thread to
	 * avoid CPU resource contention */
	if (spdk_call_unaffinitized(_bdev_rbd_register_cluster, info) == NULL) {
		return -1;
	}

	return 0;
}
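
/*
 * A minimal end-to-end sketch of registering a cluster and creating a
 * bdev on it (all names, paths, and sizes below are placeholders):
 *
 *	struct cluster_register_info info = {
 *		.name = "cluster0",
 *		.user_id = "admin",
 *		.config_file = "/etc/ceph/ceph.conf",
 *	};
 *	struct spdk_bdev *bdev;
 *
 *	rc = bdev_rbd_register_cluster(&info);
 *	rc = bdev_rbd_create(&bdev, "Ceph0", NULL, "rbd",
 *			     NULL, "foo", 512, "cluster0", NULL);
 */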

int
bdev_rbd_create(struct spdk_bdev **bdev, const char *name, const char *user_id,
		const char *pool_name,
		const char *const *config,
		const char *rbd_name,
		uint32_t block_size,
		const char *cluster_name,
		const struct spdk_uuid *uuid)
{
	struct bdev_rbd *rbd;
	int ret;

	if ((pool_name == NULL) || (rbd_name == NULL)) {
		return -EINVAL;
	}

	rbd = calloc(1, sizeof(struct bdev_rbd));
	if (rbd == NULL) {
		SPDK_ERRLOG("Failed to allocate bdev_rbd struct\n");
		return -ENOMEM;
	}

	rbd->rbd_name = strdup(rbd_name);
	if (!rbd->rbd_name) {
		bdev_rbd_free(rbd);
		return -ENOMEM;
	}

	if (user_id) {
		rbd->user_id = strdup(user_id);
		if (!rbd->user_id) {
			bdev_rbd_free(rbd);
			return -ENOMEM;
		}
	}

	if (cluster_name) {
		rbd->cluster_name = strdup(cluster_name);
		if (!rbd->cluster_name) {
			bdev_rbd_free(rbd);
			return -ENOMEM;
		}
	}
	rbd->pool_name = strdup(pool_name);
	if (!rbd->pool_name) {
		bdev_rbd_free(rbd);
		return -ENOMEM;
	}

	if (config && !(rbd->config = bdev_rbd_dup_config(config))) {
		bdev_rbd_free(rbd);
		return -ENOMEM;
	}

	ret = bdev_rbd_init(rbd);
	if (ret < 0) {
		bdev_rbd_free(rbd);
		SPDK_ERRLOG("Failed to init rbd device\n");
		return ret;
	}

	if (uuid) {
		rbd->disk.uuid = *uuid;
	}

	if (name) {
		rbd->disk.name = strdup(name);
	} else {
		rbd->disk.name = spdk_sprintf_alloc("Ceph%d", bdev_rbd_count);
	}
	if (!rbd->disk.name) {
		bdev_rbd_free(rbd);
		return -ENOMEM;
	}
	rbd->disk.product_name = "Ceph Rbd Disk";
	bdev_rbd_count++;

	rbd->disk.write_cache = 0;
	rbd->disk.blocklen = block_size;
	rbd->disk.blockcnt = rbd->info.size / rbd->disk.blocklen;
	rbd->disk.ctxt = rbd;
	rbd->disk.fn_table = &rbd_fn_table;
	rbd->disk.module = &rbd_if;

	SPDK_NOTICELOG("Add %s rbd disk to lun\n", rbd->disk.name);

	spdk_io_device_register(rbd, bdev_rbd_create_cb,
				bdev_rbd_destroy_cb,
				sizeof(struct bdev_rbd_io_channel),
				rbd_name);
	ret = spdk_bdev_register(&rbd->disk);
	if (ret) {
		spdk_io_device_unregister(rbd, NULL);
		bdev_rbd_free(rbd);
		return ret;
	}

	*bdev = &(rbd->disk);

	return ret;
}

void
bdev_rbd_delete(const char *name, spdk_delete_rbd_complete cb_fn, void *cb_arg)
{
	int rc;

	rc = spdk_bdev_unregister_by_name(name, &rbd_if, cb_fn, cb_arg);
	if (rc != 0) {
		cb_fn(cb_arg, rc);
	}
}

static void
dummy_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev, void *ctx)
{
}
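
/*
 * Resize note: new_size_in_mb below is in MiB. The helper converts it
 * to bytes, grows the image via rbd_resize(), and then notifies the
 * bdev layer of the new block count; shrinking is rejected.
 */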

int
bdev_rbd_resize(const char *name, const uint64_t new_size_in_mb)
{
	struct spdk_bdev_desc *desc;
	struct spdk_bdev *bdev;
	struct spdk_io_channel *ch;
	struct bdev_rbd_io_channel *rbd_io_ch;
	int rc = 0;
	uint64_t new_size_in_byte;
	uint64_t current_size_in_mb;

	rc = spdk_bdev_open_ext(name, false, dummy_bdev_event_cb, NULL, &desc);
	if (rc != 0) {
		return rc;
	}

	bdev = spdk_bdev_desc_get_bdev(desc);

	if (bdev->module != &rbd_if) {
		rc = -EINVAL;
		goto exit;
	}

	current_size_in_mb = bdev->blocklen * bdev->blockcnt / (1024 * 1024);
	if (current_size_in_mb > new_size_in_mb) {
		SPDK_ERRLOG("The new bdev size must not be smaller than the current bdev size.\n");
		rc = -EINVAL;
		goto exit;
	}

	ch = bdev_rbd_get_io_channel(bdev);
	rbd_io_ch = spdk_io_channel_get_ctx(ch);
	new_size_in_byte = new_size_in_mb * 1024 * 1024;

	rc = rbd_resize(rbd_io_ch->disk->image, new_size_in_byte);
	spdk_put_io_channel(ch);
	if (rc != 0) {
		SPDK_ERRLOG("failed to resize the ceph bdev.\n");
		goto exit;
	}

	rc = spdk_bdev_notify_blockcnt_change(bdev, new_size_in_byte / bdev->blocklen);
	if (rc != 0) {
		SPDK_ERRLOG("failed to notify block cnt change.\n");
	}

exit:
	spdk_bdev_close(desc);
	return rc;
}

static int
bdev_rbd_group_create_cb(void *io_device, void *ctx_buf)
{
	return 0;
}

static void
bdev_rbd_group_destroy_cb(void *io_device, void *ctx_buf)
{
}

static int
bdev_rbd_library_init(void)
{
	spdk_io_device_register(&rbd_if, bdev_rbd_group_create_cb, bdev_rbd_group_destroy_cb,
				0, "bdev_rbd_poll_groups");
	return 0;
}

static void
bdev_rbd_library_fini(void)
{
	spdk_io_device_unregister(&rbd_if, NULL);
}

SPDK_LOG_REGISTER_COMPONENT(bdev_rbd)