/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright (C) 2017 Intel Corporation.
 * All rights reserved.
 */

#include "spdk/stdinc.h"

#include "bdev_rbd.h"

#include <rbd/librbd.h>
#include <rados/librados.h>

#include "spdk/env.h"
#include "spdk/bdev.h"
#include "spdk/thread.h"
#include "spdk/json.h"
#include "spdk/string.h"
#include "spdk/util.h"
#include "spdk/likely.h"

#include "spdk/bdev_module.h"
#include "spdk/log.h"

/* Monotonic counter used only to generate default bdev names ("Ceph%d"). */
static int bdev_rbd_count = 0;

/* Per-bdev state for one exported RBD image. */
struct bdev_rbd {
	struct spdk_bdev disk;		/* Must be first: SPDK bdev base object. */
	char *rbd_name;			/* Image name inside the pool. */
	char *user_id;			/* Optional Ceph user id (NULL -> librados default). */
	char *pool_name;		/* Ceph pool containing the image. */
	char **config;			/* NULL-terminated key/value pairs for rados_conf_set(). */

	rados_t cluster;		/* Privately owned cluster handle (when no cluster_name). */
	rados_t *cluster_p;		/* Points at either &cluster or a shared registered cluster. */
	char *cluster_name;		/* Non-NULL when using a shared, pre-registered cluster. */

	rados_ioctx_t io_ctx;		/* Pool IO context. */
	rbd_image_t image;		/* Open image handle (valid while a channel exists). */

	rbd_image_info_t info;		/* Cached rbd_stat() result (size used for blockcnt). */
	pthread_mutex_t mutex;		/* Protects ch_count/main_td channel bookkeeping. */
	struct spdk_thread *main_td;	/* Thread that owns the image; all aio submitted here. */
	struct spdk_thread *destruct_td; /* Thread that initiated destruct; completion is sent back to it. */
	uint32_t ch_count;		/* Number of live IO channels. */
	struct spdk_io_channel *group_ch; /* Channel on the module-wide poll group. */

	TAILQ_ENTRY(bdev_rbd) tailq;
	struct spdk_poller *reset_timer;  /* Polls queue depth during reset. */
	struct spdk_bdev_io *reset_bdev_io; /* The in-flight reset IO, if any. */
};

struct bdev_rbd_io_channel {
	struct bdev_rbd *disk;
};

/* Per-IO driver context stored in spdk_bdev_io->driver_ctx. */
struct bdev_rbd_io {
	struct spdk_thread *submit_td;	/* Thread the IO arrived on; completion returns there. */
	enum spdk_bdev_io_status status;
	rbd_completion_t comp;		/* librbd aio completion handle. */
	size_t total_len;		/* Expected byte count for READ result validation. */
};

/* A named, refcounted, shared Rados cluster connection registered via RPC. */
struct bdev_rbd_cluster {
	char *name;
	char *user_id;
	char **config_param;		/* NULL-terminated key/value pairs. */
	char *config_file;
	char *key_file;
	rados_t cluster;
	uint32_t ref;			/* Number of bdevs currently using this cluster. */
	STAILQ_ENTRY(bdev_rbd_cluster) link;
};

/* Registry of shared clusters, guarded by g_map_bdev_rbd_cluster_mutex. */
static STAILQ_HEAD(, bdev_rbd_cluster) g_map_bdev_rbd_cluster = STAILQ_HEAD_INITIALIZER(
			g_map_bdev_rbd_cluster);
static pthread_mutex_t g_map_bdev_rbd_cluster_mutex = PTHREAD_MUTEX_INITIALIZER;

/* Release all memory owned by a cluster registry entry.
 * Note: does NOT call rados_shutdown(); callers shut the cluster down first
 * when it was successfully created. */
static void
bdev_rbd_cluster_free(struct bdev_rbd_cluster *entry)
{
	assert(entry != NULL);

	bdev_rbd_free_config(entry->config_param);
	free(entry->config_file);
	free(entry->key_file);
	free(entry->user_id);
	free(entry->name);
	free(entry);
}

/* Drop one reference on a shared cluster looked up by its rados_t address,
 * and NULL the caller's pointer so it cannot be reused. */
static void
bdev_rbd_put_cluster(rados_t **cluster)
{
	struct bdev_rbd_cluster *entry;

	assert(cluster != NULL);

	/* No need go through the map if *cluster equals to NULL */
	if (*cluster == NULL) {
		return;
	}

	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);
	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
		/* Match by identity: *cluster points at a registry entry's
		 * embedded rados_t when it came from bdev_rbd_get_cluster(). */
		if (*cluster != &entry->cluster) {
			continue;
		}

		assert(entry->ref > 0);
		entry->ref--;
		*cluster = NULL;
		pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
		return;
	}

	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
	SPDK_ERRLOG("Cannot find the entry for cluster=%p\n", cluster);
}

/* Tear down all resources owned by a bdev_rbd.  Safe on a partially
 * constructed object (NULL members are skipped); NULL rbd is a no-op. */
static void
bdev_rbd_free(struct bdev_rbd *rbd)
{
	if (!rbd) {
		return;
	}

	free(rbd->disk.name);
	free(rbd->rbd_name);
	free(rbd->user_id);
	free(rbd->pool_name);
	bdev_rbd_free_config(rbd->config);

	if (rbd->io_ctx) {
		rados_ioctx_destroy(rbd->io_ctx);
	}

	if (rbd->cluster_name) {
		/* Shared cluster: just drop our reference. */
		bdev_rbd_put_cluster(&rbd->cluster_p);
		free(rbd->cluster_name);
	} else if (rbd->cluster) {
		/* Privately owned cluster: shut it down. */
		rados_shutdown(rbd->cluster);
	}

	pthread_mutex_destroy(&rbd->mutex);
	free(rbd);
}

/* Free a NULL-terminated array of strings (and the array itself). */
void
bdev_rbd_free_config(char **config)
{
	char **entry;

	if (config) {
		for (entry = config; *entry; entry++) {
			free(*entry);
		}
		free(config);
	}
}

/* Deep-copy a NULL-terminated string array.  Returns NULL on allocation
 * failure (partial copies are freed) or when config is NULL. */
char **
bdev_rbd_dup_config(const char *const *config)
{
	size_t count;
	char **copy;

	if (!config) {
		return NULL;
	}
	for (count = 0; config[count]; count++) {}
	copy = calloc(count + 1, sizeof(*copy));
	if (!copy) {
		return NULL;
	}
	for (count = 0; config[count]; count++) {
		if (!(copy[count] = strdup(config[count]))) {
			bdev_rbd_free_config(copy);
			return NULL;
		}
	}
	return copy;
}

/* Create and connect a private rados cluster handle.  config, when given,
 * is a NULL-terminated list of key/value pairs; otherwise the default Ceph
 * conf file is read.  On any failure the handle is shut down and *cluster
 * is reset to NULL; returns 0 on success, -1 on failure. */
static int
bdev_rados_cluster_init(const char *user_id, const char *const *config,
			rados_t *cluster)
{
	int ret;

	ret = rados_create(cluster, user_id);
	if (ret < 0) {
		SPDK_ERRLOG("Failed to create rados_t struct\n");
		return -1;
	}

	if (config) {
		/* config is a flat array: key at [0], value at [1], next key at [2], ... */
		const char *const *entry = config;
		while (*entry) {
			ret = rados_conf_set(*cluster, entry[0], entry[1]);
			if (ret < 0) {
				SPDK_ERRLOG("Failed to set %s = %s\n", entry[0], entry[1]);
				rados_shutdown(*cluster);
				*cluster = NULL;
				return -1;
			}
			entry += 2;
		}
	} else {
		/* No explicit config: fall back to the default conf file search path. */
		ret = rados_conf_read_file(*cluster, NULL);
		if (ret < 0) {
			SPDK_ERRLOG("Failed to read conf file\n");
			rados_shutdown(*cluster);
			*cluster = NULL;
			return -1;
		}
	}

	ret = rados_connect(*cluster);
	if (ret < 0) {
		SPDK_ERRLOG("Failed to connect to rbd_pool\n");
		rados_shutdown(*cluster);
		*cluster = NULL;
		return -1;
	}

	return 0;
}

/* Look up a registered shared cluster by name and take a reference.
 * On success *cluster points at the registry entry's rados_t. */
static int
bdev_rbd_get_cluster(const char *cluster_name, rados_t **cluster)
{
	struct bdev_rbd_cluster *entry;

	if (cluster == NULL) {
		SPDK_ERRLOG("cluster should not be NULL\n");
		return -1;
	}

	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);
	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
		if (strcmp(cluster_name, entry->name) == 0) {
			entry->ref++;
			*cluster = &entry->cluster;
			pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
			return 0;
		}
	}

	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
	return -1;
}

/* Thin wrapper around bdev_rbd_get_cluster() that logs on failure. */
static int
bdev_rbd_shared_cluster_init(const char *cluster_name, rados_t **cluster)
{
	int ret;

	ret = bdev_rbd_get_cluster(cluster_name, cluster);
	if (ret < 0) {
		SPDK_ERRLOG("Failed to create rados_t struct\n");
		return -1;
	}

	return ret;
}

/* spdk_call_unaffinitized() trampoline: create the private cluster on a
 * non-SPDK thread.  Returns arg on success, NULL on failure. */
static void *
bdev_rbd_cluster_handle(void *arg)
{
	void *ret = arg;
	struct bdev_rbd *rbd = arg;
	int rc;

	rc = bdev_rados_cluster_init(rbd->user_id,
				     (const char *const *)rbd->config,
				     &rbd->cluster);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to create rados cluster for user_id=%s and rbd_pool=%s\n",
			    rbd->user_id ? rbd->user_id : "admin (the default)", rbd->pool_name);
		ret = NULL;
	}

	return ret;
}

/* spdk_call_unaffinitized() trampoline: create the pool IO context and
 * validate the image by opening and stat-ing it once (the image is closed
 * again; it is re-opened when the first IO channel is created).
 * Returns arg on success, NULL on failure. */
static void *
bdev_rbd_init_context(void *arg)
{
	struct bdev_rbd *rbd = arg;
	int rc;

	if (rados_ioctx_create(*(rbd->cluster_p), rbd->pool_name, &rbd->io_ctx) < 0) {
		SPDK_ERRLOG("Failed to create ioctx on rbd=%p\n", rbd);
		return NULL;
	}

	rc = rbd_open(rbd->io_ctx, rbd->rbd_name, &rbd->image, NULL);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to open specified rbd device\n");
		return NULL;
	}

	/* Cache image info (size) for blockcnt computation, then close. */
	rc = rbd_stat(rbd->image, &rbd->info, sizeof(rbd->info));
	rbd_close(rbd->image);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to stat specified rbd device\n");
		return NULL;
	}

	return arg;
}

/* Establish the cluster connection (private or shared) and the image
 * context for a new bdev.  Returns 0 on success, negative on failure. */
static int
bdev_rbd_init(struct bdev_rbd *rbd)
{
	int ret = 0;

	if (!rbd->cluster_name) {
		rbd->cluster_p = &rbd->cluster;
		/* Cluster should be created in non-SPDK thread to avoid conflict between
		 * Rados and SPDK thread */
		if (spdk_call_unaffinitized(bdev_rbd_cluster_handle, rbd) == NULL) {
			SPDK_ERRLOG("Cannot create the rados object on rbd=%p\n", rbd);
			return -1;
		}
	} else {
		ret = bdev_rbd_shared_cluster_init(rbd->cluster_name, &rbd->cluster_p);
		if (ret < 0) {
			SPDK_ERRLOG("Failed to create rados object for rbd =%p on cluster_name=%s\n",
				    rbd, rbd->cluster_name);
			return -1;
		}
	}

	if (spdk_call_unaffinitized(bdev_rbd_init_context, rbd) == NULL) {
		SPDK_ERRLOG("Cannot init rbd context for rbd=%p\n", rbd);
		return -1;
	}

	return ret;
}

/* Flush and close an open image (called when the last channel goes away). */
static void
bdev_rbd_exit(rbd_image_t image)
{
	rbd_flush(image);
	rbd_close(image);
}

/* Complete an IO on its submitting thread (message callback). */
static void
_bdev_rbd_io_complete(void *_rbd_io)
{
	struct bdev_rbd_io
*rbd_io = _rbd_io;

	spdk_bdev_io_complete(spdk_bdev_io_from_ctx(rbd_io), rbd_io->status);
}

/* Complete an IO, routing the completion back to the thread that submitted
 * it (spdk_bdev_io_complete must run on the submitting thread). */
static void
bdev_rbd_io_complete(struct spdk_bdev_io *bdev_io, enum spdk_bdev_io_status status)
{
	struct bdev_rbd_io *rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx;
	struct spdk_thread *current_thread = spdk_get_thread();

	rbd_io->status = status;
	assert(rbd_io->submit_td != NULL);
	if (rbd_io->submit_td != current_thread) {
		spdk_thread_send_msg(rbd_io->submit_td, _bdev_rbd_io_complete, rbd_io);
	} else {
		_bdev_rbd_io_complete(rbd_io);
	}
}

/* librbd aio completion callback.  Translates the librbd return value into
 * an SPDK bdev IO status: READs must transfer exactly total_len bytes,
 * everything else must return 0. */
static void
bdev_rbd_finish_aiocb(rbd_completion_t cb, void *arg)
{
	int io_status;
	struct spdk_bdev_io *bdev_io;
	struct bdev_rbd_io *rbd_io;
	enum spdk_bdev_io_status bio_status;

	bdev_io = rbd_aio_get_arg(cb);
	rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx;
	io_status = rbd_aio_get_return_value(cb);
	bio_status = SPDK_BDEV_IO_STATUS_SUCCESS;

	if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) {
		/* A short read is treated as a failure. */
		if ((int)rbd_io->total_len != io_status) {
			bio_status = SPDK_BDEV_IO_STATUS_FAILED;
		}
	} else {
		/* For others, 0 means success */
		if (io_status != 0) {
			bio_status = SPDK_BDEV_IO_STATUS_FAILED;
		}
	}

	rbd_aio_release(cb);

	bdev_rbd_io_complete(bdev_io, bio_status);
}

/* Submit one IO to librbd.  Runs on the bdev's main thread.  On any
 * submission failure the IO is completed with FAILED immediately. */
static void
_bdev_rbd_start_aio(struct bdev_rbd *disk, struct spdk_bdev_io *bdev_io,
		    struct iovec *iov, int iovcnt, uint64_t offset, size_t len)
{
	int ret;
	struct bdev_rbd_io *rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx;
	rbd_image_t image = disk->image;

	ret = rbd_aio_create_completion(bdev_io, bdev_rbd_finish_aiocb,
					&rbd_io->comp);
	if (ret < 0) {
		goto err;
	}

	if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) {
		rbd_io->total_len = len;
		if (spdk_likely(iovcnt == 1)) {
			/* Single-segment fast path avoids the iovec variant. */
			ret = rbd_aio_read(image, offset, iov[0].iov_len, iov[0].iov_base,
					   rbd_io->comp);
		} else {
			ret = rbd_aio_readv(image, iov, iovcnt, offset, rbd_io->comp);
		}
	} else if (bdev_io->type == SPDK_BDEV_IO_TYPE_WRITE) {
		if (spdk_likely(iovcnt == 1)) {
			ret = rbd_aio_write(image, offset, iov[0].iov_len, iov[0].iov_base, rbd_io->comp);
		} else {
			ret = rbd_aio_writev(image, iov, iovcnt, offset, rbd_io->comp);
		}
	} else if (bdev_io->type == SPDK_BDEV_IO_TYPE_UNMAP) {
		ret = rbd_aio_discard(image, offset, len, rbd_io->comp);
	} else if (bdev_io->type == SPDK_BDEV_IO_TYPE_FLUSH) {
		ret = rbd_aio_flush(image, rbd_io->comp);
	} else if (bdev_io->type == SPDK_BDEV_IO_TYPE_WRITE_ZEROES) {
		ret = rbd_aio_write_zeroes(image, offset, len, rbd_io->comp, /* zero_flags */ 0, /* op_flags */ 0);
	}

	if (ret < 0) {
		rbd_aio_release(rbd_io->comp);
		goto err;
	}

	return;

err:
	bdev_rbd_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
}

/* Message callback: unpack the bdev_io and submit it on the main thread. */
static void
bdev_rbd_start_aio(void *ctx)
{
	struct spdk_bdev_io *bdev_io = ctx;
	struct bdev_rbd *disk = (struct bdev_rbd *)bdev_io->bdev->ctxt;

	_bdev_rbd_start_aio(disk,
			    bdev_io,
			    bdev_io->u.bdev.iovs,
			    bdev_io->u.bdev.iovcnt,
			    bdev_io->u.bdev.offset_blocks * bdev_io->bdev->blocklen,
			    bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
}

static int bdev_rbd_library_init(void);
static void bdev_rbd_library_fini(void);

/* Size of the per-IO driver context SPDK allocates alongside each bdev_io. */
static int
bdev_rbd_get_ctx_size(void)
{
	return sizeof(struct bdev_rbd_io);
}

static struct spdk_bdev_module rbd_if = {
	.name = "rbd",
	.module_init = bdev_rbd_library_init,
	.module_fini = bdev_rbd_library_fini,
	.get_ctx_size = bdev_rbd_get_ctx_size,

};
SPDK_BDEV_MODULE_REGISTER(rbd, &rbd_if)

static int bdev_rbd_reset_timer(void *arg);

/* spdk_bdev_get_current_qd() callback used during reset: if IO is still
 * outstanding, re-arm the poller; otherwise complete the pending reset. */
static void
bdev_rbd_check_outstanding_ios(struct spdk_bdev *bdev, uint64_t current_qd,
			       void *cb_arg, int rc)
{
	struct bdev_rbd *disk =
cb_arg;
	enum spdk_bdev_io_status bio_status;

	if (rc == 0 && current_qd > 0) {
		/* IO still in flight: poll again in 1ms. */
		disk->reset_timer = SPDK_POLLER_REGISTER(bdev_rbd_reset_timer, disk, 1000);
		return;
	}

	if (rc != 0) {
		bio_status = SPDK_BDEV_IO_STATUS_FAILED;
	} else {
		bio_status = SPDK_BDEV_IO_STATUS_SUCCESS;
	}

	bdev_rbd_io_complete(disk->reset_bdev_io, bio_status);
	disk->reset_bdev_io = NULL;
}

/* Reset poller tick: re-query the current queue depth. */
static int
bdev_rbd_reset_timer(void *arg)
{
	struct bdev_rbd *disk = arg;

	spdk_poller_unregister(&disk->reset_timer);

	spdk_bdev_get_current_qd(&disk->disk, bdev_rbd_check_outstanding_ios, disk);

	return SPDK_POLLER_BUSY;
}

/* Handle a RESET IO. */
static void
bdev_rbd_reset(void *ctx)
{
	struct spdk_bdev_io *bdev_io = ctx;
	struct bdev_rbd *disk = (struct bdev_rbd *)bdev_io->bdev->ctxt;

	/*
	 * HACK: Since librbd doesn't provide any way to cancel outstanding aio, just kick off a
	 * poller to wait for in-flight I/O to complete.
	 */
	assert(disk->reset_bdev_io == NULL);
	disk->reset_bdev_io = bdev_io;

	bdev_rbd_reset_timer(disk);
}

/* Final destruct step, executed on the thread that started the destruct:
 * acknowledge the destruct to the bdev layer and free the object. */
static void
_bdev_rbd_destruct_done(void *io_device)
{
	struct bdev_rbd *rbd = io_device;

	assert(rbd != NULL);
	assert(rbd->ch_count == 0);

	spdk_bdev_destruct_done(&rbd->disk, 0);
	bdev_rbd_free(rbd);
}

static void
bdev_rbd_free_cb(void *io_device)
{
	struct bdev_rbd *rbd = io_device;

	/* The io device has been unregistered.  Send a message back to the
	 * original thread that started the destruct operation, so that the
	 * bdev unregister callback is invoked on the same thread that started
	 * this whole process.
	 */
	spdk_thread_send_msg(rbd->destruct_td, _bdev_rbd_destruct_done, rbd);
}

/* Runs on the bdev's main thread: unregister the io_device; the free
 * callback fires once all channels are gone. */
static void
_bdev_rbd_destruct(void *ctx)
{
	struct bdev_rbd *rbd = ctx;

	spdk_io_device_unregister(rbd, bdev_rbd_free_cb);
}

/* bdev fn_table destruct entry point. */
static int
bdev_rbd_destruct(void *ctx)
{
	struct bdev_rbd *rbd = ctx;
	struct spdk_thread *td;

	if (rbd->main_td == NULL) {
		/* No channel was ever created; any thread will do. */
		td = spdk_get_thread();
	} else {
		td = rbd->main_td;
	}

	/* Start the destruct operation on the rbd bdev's
	 * main thread.  This guarantees it will only start
	 * executing after any messages related to channel
	 * deletions have finished completing.  *Always*
	 * send a message, even if this function gets called
	 * from the main thread, in case there are pending
	 * channel delete messages in flight to this thread.
	 */
	assert(rbd->destruct_td == NULL);
	rbd->destruct_td = td;
	spdk_thread_send_msg(td, _bdev_rbd_destruct, rbd);

	/* Return 1 to indicate the destruct path is asynchronous.
	 */
	return 1;
}

/* Buffer-allocation callback for READs: submit once the buffer is ready. */
static void
bdev_rbd_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io,
		    bool success)
{
	struct bdev_rbd *disk = (struct bdev_rbd *)bdev_io->bdev->ctxt;

	if (!success) {
		bdev_rbd_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
		return;
	}

	spdk_thread_exec_msg(disk->main_td, bdev_rbd_start_aio, bdev_io);
}

/* bdev fn_table submit entry point.  Records the submitting thread for
 * completion routing, then forwards the IO to the main thread. */
static void
bdev_rbd_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
{
	struct spdk_thread *submit_td = spdk_io_channel_get_thread(ch);
	struct bdev_rbd_io *rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx;
	struct bdev_rbd *disk = (struct bdev_rbd *)bdev_io->bdev->ctxt;

	rbd_io->submit_td = submit_td;
	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_READ:
		/* READ may need a data buffer allocated first. */
		spdk_bdev_io_get_buf(bdev_io, bdev_rbd_get_buf_cb,
				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
		break;

	case SPDK_BDEV_IO_TYPE_WRITE:
	case SPDK_BDEV_IO_TYPE_UNMAP:
	case SPDK_BDEV_IO_TYPE_FLUSH:
	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
		spdk_thread_exec_msg(disk->main_td, bdev_rbd_start_aio, bdev_io);
		break;

	case SPDK_BDEV_IO_TYPE_RESET:
		spdk_thread_exec_msg(disk->main_td, bdev_rbd_reset, bdev_io);
		break;

	default:
		SPDK_ERRLOG("Unsupported IO type =%d\n", bdev_io->type);
		bdev_rbd_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
		break;
	}
}

/* bdev fn_table capability query. */
static bool
bdev_rbd_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type)
{
	switch (io_type) {
	case SPDK_BDEV_IO_TYPE_READ:
	case SPDK_BDEV_IO_TYPE_WRITE:
	case SPDK_BDEV_IO_TYPE_UNMAP:
	case SPDK_BDEV_IO_TYPE_FLUSH:
	case SPDK_BDEV_IO_TYPE_RESET:
	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
		return true;

	default:
		return false;
	}
}

/* Release resources tied to the last IO channel: the group channel and the
 * open image.  Must run on the main thread with ch_count already at 0. */
static void
bdev_rbd_free_channel_resources(struct bdev_rbd *disk)
{
	assert(disk != NULL);
	assert(disk->main_td ==
spdk_get_thread());
	assert(disk->ch_count == 0);

	spdk_put_io_channel(disk->group_ch);
	if (disk->image) {
		bdev_rbd_exit(disk->image);
	}

	disk->main_td = NULL;
	disk->group_ch = NULL;
}

/* spdk_call_unaffinitized() trampoline: open the image for IO.
 * Returns arg on success, NULL on failure. */
static void *
bdev_rbd_handle(void *arg)
{
	struct bdev_rbd *disk = arg;
	void *ret = arg;

	if (rbd_open(disk->io_ctx, disk->rbd_name, &disk->image, NULL) < 0) {
		SPDK_ERRLOG("Failed to open specified rbd device\n");
		ret = NULL;
	}

	return ret;
}

/* First-channel setup: acquire the module poll-group channel and open the
 * image on a non-SPDK thread.  Returns 0 on success, -1 on failure. */
static int
_bdev_rbd_create_cb(struct bdev_rbd *disk)
{
	disk->group_ch = spdk_get_io_channel(&rbd_if);
	assert(disk->group_ch != NULL);

	if (spdk_call_unaffinitized(bdev_rbd_handle, disk) == NULL) {
		bdev_rbd_free_channel_resources(disk);
		return -1;
	}

	return 0;
}

/* io_device channel-create callback.  The first channel created claims the
 * current thread as the bdev's main thread. */
static int
bdev_rbd_create_cb(void *io_device, void *ctx_buf)
{
	struct bdev_rbd_io_channel *ch = ctx_buf;
	struct bdev_rbd *disk = io_device;
	int rc;

	ch->disk = disk;
	pthread_mutex_lock(&disk->mutex);
	if (disk->ch_count == 0) {
		assert(disk->main_td == NULL);
		rc = _bdev_rbd_create_cb(disk);
		if (rc) {
			SPDK_ERRLOG("Cannot create channel for disk=%p\n", disk);
			pthread_mutex_unlock(&disk->mutex);
			return rc;
		}

		disk->main_td = spdk_get_thread();
	}

	disk->ch_count++;
	pthread_mutex_unlock(&disk->mutex);

	return 0;
}

/* Deferred last-channel teardown, executed on the main thread. */
static void
_bdev_rbd_destroy_cb(void *ctx)
{
	struct bdev_rbd *disk = ctx;

	pthread_mutex_lock(&disk->mutex);
	assert(disk->ch_count > 0);
	disk->ch_count--;

	if (disk->ch_count > 0) {
		/* A new channel was created between when message was sent and this function executed */
		pthread_mutex_unlock(&disk->mutex);
		return;
	}

	bdev_rbd_free_channel_resources(disk);
	pthread_mutex_unlock(&disk->mutex);
}

/* io_device channel-destroy callback.  If the final channel dies on a
 * thread other than the main thread, teardown is bounced to the main
 * thread (with the count temporarily re-incremented as a hold). */
static void
bdev_rbd_destroy_cb(void *io_device, void *ctx_buf)
{
	struct bdev_rbd *disk = io_device;
	struct spdk_thread *thread;

	pthread_mutex_lock(&disk->mutex);
	assert(disk->ch_count > 0);
	disk->ch_count--;
	if (disk->ch_count == 0) {
		assert(disk->main_td != NULL);
		if (disk->main_td != spdk_get_thread()) {
			/* The final channel was destroyed on a different thread
			 * than where the first channel was created. Pass a message
			 * to the main thread to unregister the poller. */
			disk->ch_count++;
			thread = disk->main_td;
			pthread_mutex_unlock(&disk->mutex);
			spdk_thread_send_msg(thread, _bdev_rbd_destroy_cb, disk);
			return;
		}

		bdev_rbd_free_channel_resources(disk);
	}
	pthread_mutex_unlock(&disk->mutex);
}

/* bdev fn_table get_io_channel entry point. */
static struct spdk_io_channel *
bdev_rbd_get_io_channel(void *ctx)
{
	struct bdev_rbd *rbd_bdev = ctx;

	return spdk_get_io_channel(rbd_bdev);
}

/* Write the JSON fields of the named shared cluster into w (used by
 * dump_info_json when the bdev references a registered cluster). */
static void
bdev_rbd_cluster_dump_entry(const char *cluster_name, struct spdk_json_write_ctx *w)
{
	struct bdev_rbd_cluster *entry;

	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);
	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
		if (strcmp(cluster_name, entry->name)) {
			continue;
		}
		if (entry->user_id) {
			spdk_json_write_named_string(w, "user_id", entry->user_id);
		}

		if (entry->config_param) {
			char **config_entry = entry->config_param;

			spdk_json_write_named_object_begin(w, "config_param");
			while (*config_entry) {
				/* Flat key/value pair array. */
				spdk_json_write_named_string(w, config_entry[0], config_entry[1]);
				config_entry += 2;
			}
			spdk_json_write_object_end(w);
		}
		if (entry->config_file) {
			spdk_json_write_named_string(w, "config_file", entry->config_file);
		}
		if (entry->key_file) {
			spdk_json_write_named_string(w, "key_file", entry->key_file);
		}

		pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
		return;
	}

	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
}

/* bdev fn_table dump_info_json: emit this bdev's rbd parameters. */
static int
bdev_rbd_dump_info_json(void *ctx, struct spdk_json_write_ctx *w)
{
	struct bdev_rbd *rbd_bdev = ctx;

	spdk_json_write_named_object_begin(w, "rbd");

	spdk_json_write_named_string(w, "pool_name", rbd_bdev->pool_name);

	spdk_json_write_named_string(w, "rbd_name", rbd_bdev->rbd_name);

	if (rbd_bdev->cluster_name) {
		/* Shared cluster: dump the registry entry instead of local config. */
		bdev_rbd_cluster_dump_entry(rbd_bdev->cluster_name, w);
		goto end;
	}

	if (rbd_bdev->user_id) {
		spdk_json_write_named_string(w, "user_id", rbd_bdev->user_id);
	}

	if (rbd_bdev->config) {
		char **entry = rbd_bdev->config;

		spdk_json_write_named_object_begin(w, "config");
		while (*entry) {
			spdk_json_write_named_string(w, entry[0], entry[1]);
			entry += 2;
		}
		spdk_json_write_object_end(w);
	}

end:
	spdk_json_write_object_end(w);

	return 0;
}

/* bdev fn_table write_config_json: emit the bdev_rbd_create RPC call that
 * would recreate this bdev. */
static void
bdev_rbd_write_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
{
	struct bdev_rbd *rbd = bdev->ctxt;
	char uuid_str[SPDK_UUID_STRING_LEN];

	spdk_json_write_object_begin(w);

	spdk_json_write_named_string(w, "method", "bdev_rbd_create");

	spdk_json_write_named_object_begin(w, "params");
	spdk_json_write_named_string(w, "name", bdev->name);
	spdk_json_write_named_string(w, "pool_name", rbd->pool_name);
	spdk_json_write_named_string(w, "rbd_name", rbd->rbd_name);
	spdk_json_write_named_uint32(w, "block_size", bdev->blocklen);
	if (rbd->user_id) {
		spdk_json_write_named_string(w, "user_id", rbd->user_id);
	}

	if (rbd->config) {
		char **entry = rbd->config;

		spdk_json_write_named_object_begin(w, "config");
		while (*entry) {
			spdk_json_write_named_string(w, entry[0], entry[1]);
			entry += 2;
		}
		spdk_json_write_object_end(w);
	}

	spdk_uuid_fmt_lower(uuid_str, sizeof(uuid_str), &bdev->uuid);
	spdk_json_write_named_string(w, "uuid", uuid_str);

	spdk_json_write_object_end(w);

	spdk_json_write_object_end(w);
}

/* Serialize one shared-cluster registry entry as a JSON object.
 * Caller holds g_map_bdev_rbd_cluster_mutex. */
static void
dump_single_cluster_entry(struct bdev_rbd_cluster *entry, struct spdk_json_write_ctx *w)
{
	assert(entry != NULL);

	spdk_json_write_object_begin(w);
	spdk_json_write_named_string(w, "cluster_name", entry->name);

	if (entry->user_id) {
		spdk_json_write_named_string(w, "user_id", entry->user_id);
	}

	if (entry->config_param) {
		char **config_entry = entry->config_param;

		spdk_json_write_named_object_begin(w, "config_param");
		while (*config_entry) {
			spdk_json_write_named_string(w, config_entry[0], config_entry[1]);
			config_entry += 2;
		}
		spdk_json_write_object_end(w);
	}
	if (entry->config_file) {
		spdk_json_write_named_string(w, "config_file", entry->config_file);
	}
	if (entry->key_file) {
		spdk_json_write_named_string(w, "key_file", entry->key_file);
	}

	spdk_json_write_object_end(w);
}

/* RPC helper: respond with one named cluster, or an array of all registered
 * clusters when name is NULL.  Returns 0 on success, -ENOENT when the
 * registry is empty or the named cluster does not exist. */
int
bdev_rbd_get_clusters_info(struct spdk_jsonrpc_request *request, const char *name)
{
	struct bdev_rbd_cluster *entry;
	struct spdk_json_write_ctx *w;

	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);

	if (STAILQ_EMPTY(&g_map_bdev_rbd_cluster)) {
		pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
		return -ENOENT;
	}

	/* If cluster name is provided */
	if (name) {
		STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
			if (strcmp(name, entry->name) == 0) {
				w = spdk_jsonrpc_begin_result(request);
				dump_single_cluster_entry(entry, w);
				spdk_jsonrpc_end_result(request, w);

				pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
				return 0;
			}
		}

		pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
		return -ENOENT;
	}

	w = spdk_jsonrpc_begin_result(request);
	spdk_json_write_array_begin(w);
	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
		dump_single_cluster_entry(entry, w);
	}
	spdk_json_write_array_end(w);
	spdk_jsonrpc_end_result(request, w);
	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);

	return 0;
}

static const struct spdk_bdev_fn_table rbd_fn_table = {
	.destruct		= bdev_rbd_destruct,
	.submit_request		= bdev_rbd_submit_request,
	.io_type_supported	= bdev_rbd_io_type_supported,
	.get_io_channel		= bdev_rbd_get_io_channel,
	.dump_info_json		= bdev_rbd_dump_info_json,
	.write_config_json	= bdev_rbd_write_config_json,
};

/* Create, configure and connect a named shared cluster and add it to the
 * registry.  Rejects duplicate names.  Returns 0 on success, -1 on failure
 * (all partially built state is released). */
static int
rbd_register_cluster(const char *name, const char *user_id, const char *const *config_param,
		     const char *config_file, const char *key_file)
{
	struct bdev_rbd_cluster *entry;
	int rc;

	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);
	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
		if (strcmp(name, entry->name) == 0) {
			SPDK_ERRLOG("Cluster name=%s already exists\n", name);
			pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
			return -1;
		}
	}

	entry = calloc(1, sizeof(*entry));
	if (!entry) {
		SPDK_ERRLOG("Cannot allocate an entry for name=%s\n", name);
		pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
		return -1;
	}

	entry->name = strdup(name);
	if (entry->name == NULL) {
		SPDK_ERRLOG("Failed to save the name =%s on entry =%p\n", name, entry);
		goto err_handle;
	}

	if (user_id) {
		entry->user_id = strdup(user_id);
		if (entry->user_id == NULL) {
			SPDK_ERRLOG("Failed to save the str =%s on entry =%p\n", user_id, entry);
			goto err_handle;
		}
	}

	/* Support specify config_param or config_file separately, or both of them.
	 */
	if (config_param) {
		entry->config_param = bdev_rbd_dup_config(config_param);
		if (entry->config_param == NULL) {
			SPDK_ERRLOG("Failed to save the config_param=%p on entry = %p\n", config_param, entry);
			goto err_handle;
		}
	}

	if (config_file) {
		entry->config_file = strdup(config_file);
		if (entry->config_file == NULL) {
			SPDK_ERRLOG("Failed to save the config_file=%s on entry = %p\n", config_file, entry);
			goto err_handle;
		}
	}

	if (key_file) {
		entry->key_file = strdup(key_file);
		if (entry->key_file == NULL) {
			SPDK_ERRLOG("Failed to save the key_file=%s on entry = %p\n", key_file, entry);
			goto err_handle;
		}
	}

	rc = rados_create(&entry->cluster, user_id);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to create rados_t struct\n");
		goto err_handle;
	}

	/* Try default location when entry->config_file is NULL, but ignore failure when it is NULL */
	rc = rados_conf_read_file(entry->cluster, entry->config_file);
	if (entry->config_file && rc < 0) {
		SPDK_ERRLOG("Failed to read conf file %s\n", entry->config_file);
		rados_shutdown(entry->cluster);
		goto err_handle;
	}

	/* Explicit config_param values override anything from the conf file. */
	if (config_param) {
		const char *const *config_entry = config_param;
		while (*config_entry) {
			rc = rados_conf_set(entry->cluster, config_entry[0], config_entry[1]);
			if (rc < 0) {
				SPDK_ERRLOG("Failed to set %s = %s\n", config_entry[0], config_entry[1]);
				rados_shutdown(entry->cluster);
				goto err_handle;
			}
			config_entry += 2;
		}
	}

	if (key_file) {
		rc = rados_conf_set(entry->cluster, "keyring", key_file);
		if (rc < 0) {
			SPDK_ERRLOG("Failed to set keyring = %s\n", key_file);
			rados_shutdown(entry->cluster);
			goto err_handle;
		}
	}

	rc = rados_connect(entry->cluster);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to connect to rbd_pool on cluster=%p\n",
entry->cluster); 1077 rados_shutdown(entry->cluster); 1078 goto err_handle; 1079 } 1080 1081 STAILQ_INSERT_TAIL(&g_map_bdev_rbd_cluster, entry, link); 1082 pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex); 1083 1084 return 0; 1085 1086 err_handle: 1087 bdev_rbd_cluster_free(entry); 1088 pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex); 1089 return -1; 1090 } 1091 1092 int 1093 bdev_rbd_unregister_cluster(const char *name) 1094 { 1095 struct bdev_rbd_cluster *entry; 1096 int rc = 0; 1097 1098 if (name == NULL) { 1099 return -1; 1100 } 1101 1102 pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex); 1103 STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) { 1104 if (strcmp(name, entry->name) == 0) { 1105 if (entry->ref == 0) { 1106 STAILQ_REMOVE(&g_map_bdev_rbd_cluster, entry, bdev_rbd_cluster, link); 1107 rados_shutdown(entry->cluster); 1108 bdev_rbd_cluster_free(entry); 1109 } else { 1110 SPDK_ERRLOG("Cluster with name=%p is still used and we cannot delete it\n", 1111 entry->name); 1112 rc = -1; 1113 } 1114 1115 pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex); 1116 return rc; 1117 } 1118 } 1119 1120 pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex); 1121 1122 SPDK_ERRLOG("Could not find the cluster name =%p\n", name); 1123 1124 return -1; 1125 } 1126 1127 static void * 1128 _bdev_rbd_register_cluster(void *arg) 1129 { 1130 struct cluster_register_info *info = arg; 1131 void *ret = arg; 1132 int rc; 1133 1134 rc = rbd_register_cluster((const char *)info->name, (const char *)info->user_id, 1135 (const char *const *)info->config_param, (const char *)info->config_file, 1136 (const char *)info->key_file); 1137 if (rc) { 1138 ret = NULL; 1139 } 1140 1141 return ret; 1142 } 1143 1144 int 1145 bdev_rbd_register_cluster(struct cluster_register_info *info) 1146 { 1147 assert(info != NULL); 1148 1149 /* Rados cluster info need to be created in non SPDK-thread to avoid CPU 1150 * resource contention */ 1151 if (spdk_call_unaffinitized(_bdev_rbd_register_cluster, info) 
== NULL) { 1152 return -1; 1153 } 1154 1155 return 0; 1156 } 1157 1158 int 1159 bdev_rbd_create(struct spdk_bdev **bdev, const char *name, const char *user_id, 1160 const char *pool_name, 1161 const char *const *config, 1162 const char *rbd_name, 1163 uint32_t block_size, 1164 const char *cluster_name, 1165 const struct spdk_uuid *uuid) 1166 { 1167 struct bdev_rbd *rbd; 1168 int ret; 1169 1170 if ((pool_name == NULL) || (rbd_name == NULL)) { 1171 return -EINVAL; 1172 } 1173 1174 rbd = calloc(1, sizeof(struct bdev_rbd)); 1175 if (rbd == NULL) { 1176 SPDK_ERRLOG("Failed to allocate bdev_rbd struct\n"); 1177 return -ENOMEM; 1178 } 1179 1180 ret = pthread_mutex_init(&rbd->mutex, NULL); 1181 if (ret) { 1182 SPDK_ERRLOG("Cannot init mutex on rbd=%p\n", rbd->disk.name); 1183 free(rbd); 1184 return ret; 1185 } 1186 1187 rbd->rbd_name = strdup(rbd_name); 1188 if (!rbd->rbd_name) { 1189 bdev_rbd_free(rbd); 1190 return -ENOMEM; 1191 } 1192 1193 if (user_id) { 1194 rbd->user_id = strdup(user_id); 1195 if (!rbd->user_id) { 1196 bdev_rbd_free(rbd); 1197 return -ENOMEM; 1198 } 1199 } 1200 1201 if (cluster_name) { 1202 rbd->cluster_name = strdup(cluster_name); 1203 if (!rbd->cluster_name) { 1204 bdev_rbd_free(rbd); 1205 return -ENOMEM; 1206 } 1207 } 1208 rbd->pool_name = strdup(pool_name); 1209 if (!rbd->pool_name) { 1210 bdev_rbd_free(rbd); 1211 return -ENOMEM; 1212 } 1213 1214 if (config && !(rbd->config = bdev_rbd_dup_config(config))) { 1215 bdev_rbd_free(rbd); 1216 return -ENOMEM; 1217 } 1218 1219 ret = bdev_rbd_init(rbd); 1220 if (ret < 0) { 1221 bdev_rbd_free(rbd); 1222 SPDK_ERRLOG("Failed to init rbd device\n"); 1223 return ret; 1224 } 1225 1226 if (uuid) { 1227 rbd->disk.uuid = *uuid; 1228 } else { 1229 spdk_uuid_generate(&rbd->disk.uuid); 1230 } 1231 1232 if (name) { 1233 rbd->disk.name = strdup(name); 1234 } else { 1235 rbd->disk.name = spdk_sprintf_alloc("Ceph%d", bdev_rbd_count); 1236 } 1237 if (!rbd->disk.name) { 1238 bdev_rbd_free(rbd); 1239 return -ENOMEM; 1240 } 
	rbd->disk.product_name = "Ceph Rbd Disk";
	bdev_rbd_count++;

	rbd->disk.write_cache = 0;
	rbd->disk.blocklen = block_size;
	/* Block count derives from the cached rbd_stat() size. */
	rbd->disk.blockcnt = rbd->info.size / rbd->disk.blocklen;
	rbd->disk.ctxt = rbd;
	rbd->disk.fn_table = &rbd_fn_table;
	rbd->disk.module = &rbd_if;

	SPDK_NOTICELOG("Add %s rbd disk to lun\n", rbd->disk.name);

	spdk_io_device_register(rbd, bdev_rbd_create_cb,
				bdev_rbd_destroy_cb,
				sizeof(struct bdev_rbd_io_channel),
				rbd_name);
	ret = spdk_bdev_register(&rbd->disk);
	if (ret) {
		spdk_io_device_unregister(rbd, NULL);
		bdev_rbd_free(rbd);
		return ret;
	}

	*bdev = &(rbd->disk);

	return ret;
}

/* Unregister an rbd bdev by name.  cb_fn is invoked with the result; on an
 * immediate lookup failure it is called synchronously with the error. */
void
bdev_rbd_delete(const char *name, spdk_delete_rbd_complete cb_fn, void *cb_arg)
{
	int rc;

	rc = spdk_bdev_unregister_by_name(name, &rbd_if, cb_fn, cb_arg);
	if (rc != 0) {
		cb_fn(cb_arg, rc);
	}
}

/* Event callback for the temporary descriptor used by resize; nothing to do. */
static void
dummy_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev, void *ctx)
{
}

/* Grow an rbd bdev to new_size_in_mb megabytes.  Shrinking is rejected.
 * Returns 0 on success or a negative errno. */
int
bdev_rbd_resize(const char *name, const uint64_t new_size_in_mb)
{
	struct spdk_bdev_desc *desc;
	struct spdk_bdev *bdev;
	struct spdk_io_channel *ch;
	struct bdev_rbd_io_channel *rbd_io_ch;
	int rc = 0;
	uint64_t new_size_in_byte;
	uint64_t current_size_in_mb;

	rc = spdk_bdev_open_ext(name, false, dummy_bdev_event_cb, NULL, &desc);
	if (rc != 0) {
		return rc;
	}

	bdev = spdk_bdev_desc_get_bdev(desc);

	/* Only rbd bdevs can be resized through this path. */
	if (bdev->module != &rbd_if) {
		rc = -EINVAL;
		goto exit;
	}

	current_size_in_mb = bdev->blocklen * bdev->blockcnt / (1024 * 1024);
	if (current_size_in_mb > new_size_in_mb) {
		SPDK_ERRLOG("The new bdev size must be larger than current bdev size.\n");
		rc = -EINVAL;
		goto exit;
	}

	/* Getting a channel guarantees the image is open while we resize. */
	ch = bdev_rbd_get_io_channel(bdev);
	rbd_io_ch =
spdk_io_channel_get_ctx(ch);
	new_size_in_byte = new_size_in_mb * 1024 * 1024;

	rc = rbd_resize(rbd_io_ch->disk->image, new_size_in_byte);
	spdk_put_io_channel(ch);
	if (rc != 0) {
		SPDK_ERRLOG("failed to resize the ceph bdev.\n");
		goto exit;
	}

	/* Propagate the new size to the bdev layer. */
	rc = spdk_bdev_notify_blockcnt_change(bdev, new_size_in_byte / bdev->blocklen);
	if (rc != 0) {
		SPDK_ERRLOG("failed to notify block cnt change.\n");
	}

exit:
	spdk_bdev_close(desc);
	return rc;
}

/* Module poll-group channel callbacks: no per-channel state is needed. */
static int
bdev_rbd_group_create_cb(void *io_device, void *ctx_buf)
{
	return 0;
}

static void
bdev_rbd_group_destroy_cb(void *io_device, void *ctx_buf)
{
}

/* Module init: register the module-wide io_device used for group channels. */
static int
bdev_rbd_library_init(void)
{
	spdk_io_device_register(&rbd_if, bdev_rbd_group_create_cb, bdev_rbd_group_destroy_cb,
				0, "bdev_rbd_poll_groups");
	return 0;
}

/* Module teardown: drop the module-wide io_device. */
static void
bdev_rbd_library_fini(void)
{
	spdk_io_device_unregister(&rbd_if, NULL);
}

SPDK_LOG_REGISTER_COMPONENT(bdev_rbd)