/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright (C) 2017 Intel Corporation.
 * All rights reserved.
 */

#include "spdk/stdinc.h"

#include "bdev_rbd.h"

#include <rbd/librbd.h>
#include <rados/librados.h>

#include "spdk/env.h"
#include "spdk/bdev.h"
#include "spdk/thread.h"
#include "spdk/json.h"
#include "spdk/string.h"
#include "spdk/util.h"
#include "spdk/likely.h"

#include "spdk/bdev_module.h"
#include "spdk/log.h"

static int bdev_rbd_count = 0;

struct bdev_rbd {
	struct spdk_bdev disk;
	char *rbd_name;
	char *user_id;
	char *pool_name;
	char **config;

	rados_t cluster;
	rados_t *cluster_p;
	char *cluster_name;

	rados_ioctx_t io_ctx;
	rbd_image_t image;

	rbd_image_info_t info;
	pthread_mutex_t mutex;
	struct spdk_thread *main_td;
	struct spdk_thread *destruct_td;
	uint32_t ch_count;
	struct spdk_io_channel *group_ch;

	TAILQ_ENTRY(bdev_rbd) tailq;
	struct spdk_poller *reset_timer;
	struct spdk_bdev_io *reset_bdev_io;
};

struct bdev_rbd_io_channel {
	struct bdev_rbd *disk;
};

struct bdev_rbd_io {
	struct spdk_thread *submit_td;
	enum spdk_bdev_io_status status;
	rbd_completion_t comp;
	size_t total_len;
};

struct bdev_rbd_cluster {
	char *name;
	char *user_id;
	char **config_param;
	char *config_file;
	char *key_file;
	rados_t cluster;
	uint32_t ref;
	STAILQ_ENTRY(bdev_rbd_cluster) link;
};

static STAILQ_HEAD(, bdev_rbd_cluster) g_map_bdev_rbd_cluster = STAILQ_HEAD_INITIALIZER(
			g_map_bdev_rbd_cluster);
static pthread_mutex_t g_map_bdev_rbd_cluster_mutex = PTHREAD_MUTEX_INITIALIZER;

static void
bdev_rbd_cluster_free(struct bdev_rbd_cluster *entry)
{
	assert(entry != NULL);

	bdev_rbd_free_config(entry->config_param);
	free(entry->config_file);
	free(entry->key_file);
	free(entry->user_id);
	free(entry->name);
	free(entry);
}

static void
bdev_rbd_put_cluster(rados_t **cluster)
{
	struct bdev_rbd_cluster *entry;

	assert(cluster != NULL);

	/* No need to go through the map if *cluster is NULL */
	if (*cluster == NULL) {
		return;
	}

	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);
	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
		if (*cluster != &entry->cluster) {
			continue;
		}

		assert(entry->ref > 0);
		entry->ref--;
		*cluster = NULL;
		pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
		return;
	}

	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
	SPDK_ERRLOG("Cannot find the entry for cluster=%p\n", cluster);
}

static void
bdev_rbd_free(struct bdev_rbd *rbd)
{
	if (!rbd) {
		return;
	}

	free(rbd->disk.name);
	free(rbd->rbd_name);
	free(rbd->user_id);
	free(rbd->pool_name);
	bdev_rbd_free_config(rbd->config);

	if (rbd->io_ctx) {
		rados_ioctx_destroy(rbd->io_ctx);
	}

	if (rbd->cluster_name) {
		bdev_rbd_put_cluster(&rbd->cluster_p);
		free(rbd->cluster_name);
	} else if (rbd->cluster) {
		rados_shutdown(rbd->cluster);
	}

	pthread_mutex_destroy(&rbd->mutex);
	free(rbd);
}

void
bdev_rbd_free_config(char **config)
{
	char **entry;

	if (config) {
		for (entry = config; *entry; entry++) {
			free(*entry);
		}
		free(config);
	}
}
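
/* Deep-copy a NULL-terminated array of "key", "value" strings. The copy is
 * released with bdev_rbd_free_config(); NULL is returned if config is NULL
 * or on allocation failure.
 */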
char **
bdev_rbd_dup_config(const char *const *config)
{
	size_t count;
	char **copy;

	if (!config) {
		return NULL;
	}
	for (count = 0; config[count]; count++) {}
	copy = calloc(count + 1, sizeof(*copy));
	if (!copy) {
		return NULL;
	}
	for (count = 0; config[count]; count++) {
		if (!(copy[count] = strdup(config[count]))) {
			bdev_rbd_free_config(copy);
			return NULL;
		}
	}
	return copy;
}

static int
bdev_rados_cluster_init(const char *user_id, const char *const *config,
			rados_t *cluster)
{
	int ret;

	ret = rados_create(cluster, user_id);
	if (ret < 0) {
		SPDK_ERRLOG("Failed to create rados_t struct\n");
		return -1;
	}

	if (config) {
		const char *const *entry = config;
		while (*entry) {
			ret = rados_conf_set(*cluster, entry[0], entry[1]);
			if (ret < 0) {
				SPDK_ERRLOG("Failed to set %s = %s\n", entry[0], entry[1]);
				rados_shutdown(*cluster);
				return -1;
			}
			entry += 2;
		}
	} else {
		ret = rados_conf_read_file(*cluster, NULL);
		if (ret < 0) {
			SPDK_ERRLOG("Failed to read conf file\n");
			rados_shutdown(*cluster);
			return -1;
		}
	}

	ret = rados_connect(*cluster);
	if (ret < 0) {
		SPDK_ERRLOG("Failed to connect to rbd_pool\n");
		rados_shutdown(*cluster);
		return -1;
	}

	return 0;
}

static int
bdev_rbd_get_cluster(const char *cluster_name, rados_t **cluster)
{
	struct bdev_rbd_cluster *entry;

	if (cluster == NULL) {
		SPDK_ERRLOG("cluster should not be NULL\n");
		return -1;
	}

	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);
	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
		if (strcmp(cluster_name, entry->name) == 0) {
			entry->ref++;
			*cluster = &entry->cluster;
			pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
			return 0;
		}
	}

	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
	return -1;
}

static int
bdev_rbd_shared_cluster_init(const char *cluster_name, rados_t **cluster)
{
	int ret;

	ret = bdev_rbd_get_cluster(cluster_name, cluster);
	if (ret < 0) {
		SPDK_ERRLOG("Failed to get the rados cluster with name=%s\n", cluster_name);
		return -1;
	}

	return ret;
}
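
/* Runs on an unaffinitized (non-SPDK) pthread via spdk_call_unaffinitized().
 * Returns the bdev pointer on success and NULL on failure, which is the
 * convention the spdk_call_unaffinitized() callers below use to detect errors.
 */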
static void *
bdev_rbd_cluster_handle(void *arg)
{
	void *ret = arg;
	struct bdev_rbd *rbd = arg;
	int rc;

	rc = bdev_rados_cluster_init(rbd->user_id, (const char *const *)rbd->config,
				     &rbd->cluster);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to create rados cluster for user_id=%s and rbd_pool=%s\n",
			    rbd->user_id ? rbd->user_id : "admin (the default)", rbd->pool_name);
		ret = NULL;
	}

	return ret;
}

static void *
bdev_rbd_init_context(void *arg)
{
	struct bdev_rbd *rbd = arg;
	int rc;

	if (rados_ioctx_create(*(rbd->cluster_p), rbd->pool_name, &rbd->io_ctx) < 0) {
		SPDK_ERRLOG("Failed to create ioctx on rbd=%p\n", rbd);
		return NULL;
	}

	rc = rbd_open(rbd->io_ctx, rbd->rbd_name, &rbd->image, NULL);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to open specified rbd device\n");
		return NULL;
	}

	rc = rbd_stat(rbd->image, &rbd->info, sizeof(rbd->info));
	rbd_close(rbd->image);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to stat specified rbd device\n");
		return NULL;
	}

	return arg;
}

static int
bdev_rbd_init(struct bdev_rbd *rbd)
{
	int ret = 0;

	if (!rbd->cluster_name) {
		rbd->cluster_p = &rbd->cluster;
		/* The cluster should be created on a non-SPDK thread to avoid
		 * conflicts between the Rados threads and the SPDK thread */
		if (spdk_call_unaffinitized(bdev_rbd_cluster_handle, rbd) == NULL) {
			SPDK_ERRLOG("Cannot create the rados object on rbd=%p\n", rbd);
			return -1;
		}
	} else {
		ret = bdev_rbd_shared_cluster_init(rbd->cluster_name, &rbd->cluster_p);
		if (ret < 0) {
			SPDK_ERRLOG("Failed to create rados object for rbd=%p on cluster_name=%s\n",
				    rbd, rbd->cluster_name);
			return -1;
		}
	}

	if (spdk_call_unaffinitized(bdev_rbd_init_context, rbd) == NULL) {
		SPDK_ERRLOG("Cannot init rbd context for rbd=%p\n", rbd);
		return -1;
	}

	return ret;
}

static void
bdev_rbd_exit(rbd_image_t image)
{
	rbd_flush(image);
	rbd_close(image);
}

static void
_bdev_rbd_io_complete(void *_rbd_io)
{
	struct bdev_rbd_io *rbd_io = _rbd_io;

	spdk_bdev_io_complete(spdk_bdev_io_from_ctx(rbd_io), rbd_io->status);
}

static void
bdev_rbd_io_complete(struct spdk_bdev_io *bdev_io, enum spdk_bdev_io_status status)
{
	struct bdev_rbd_io *rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx;
	struct spdk_thread *current_thread = spdk_get_thread();

	rbd_io->status = status;
	assert(rbd_io->submit_td != NULL);
	if (rbd_io->submit_td != current_thread) {
		spdk_thread_send_msg(rbd_io->submit_td, _bdev_rbd_io_complete, rbd_io);
	} else {
		_bdev_rbd_io_complete(rbd_io);
	}
}

static void
bdev_rbd_finish_aiocb(rbd_completion_t cb, void *arg)
{
	int io_status;
	struct spdk_bdev_io *bdev_io;
	struct bdev_rbd_io *rbd_io;
	enum spdk_bdev_io_status bio_status;

	bdev_io = rbd_aio_get_arg(cb);
	rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx;
	io_status = rbd_aio_get_return_value(cb);
	bio_status = SPDK_BDEV_IO_STATUS_SUCCESS;

	if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) {
		/* For reads, the return value is the number of bytes read */
		if ((int)rbd_io->total_len != io_status) {
			bio_status = SPDK_BDEV_IO_STATUS_FAILED;
		}
	} else {
		/* For others, 0 means success */
		if (io_status != 0) {
			bio_status = SPDK_BDEV_IO_STATUS_FAILED;
		}
	}

	rbd_aio_release(cb);

	bdev_rbd_io_complete(bdev_io, bio_status);
}
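
/* Submit one bdev I/O to librbd. A librbd AIO completion is created with
 * bdev_rbd_finish_aiocb() as its callback; on any submission error the I/O
 * is failed immediately. Called on the bdev's main thread.
 */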
static void
bdev_rbd_start_aio(struct bdev_rbd *disk, struct spdk_bdev_io *bdev_io,
		   struct iovec *iov, int iovcnt, uint64_t offset, size_t len)
{
	int ret;
	struct bdev_rbd_io *rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx;
	rbd_image_t image = disk->image;

	ret = rbd_aio_create_completion(bdev_io, bdev_rbd_finish_aiocb,
					&rbd_io->comp);
	if (ret < 0) {
		goto err;
	}

	if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) {
		rbd_io->total_len = len;
		if (spdk_likely(iovcnt == 1)) {
			ret = rbd_aio_read(image, offset, iov[0].iov_len, iov[0].iov_base, rbd_io->comp);
		} else {
			ret = rbd_aio_readv(image, iov, iovcnt, offset, rbd_io->comp);
		}
	} else if (bdev_io->type == SPDK_BDEV_IO_TYPE_WRITE) {
		if (spdk_likely(iovcnt == 1)) {
			ret = rbd_aio_write(image, offset, iov[0].iov_len, iov[0].iov_base, rbd_io->comp);
		} else {
			ret = rbd_aio_writev(image, iov, iovcnt, offset, rbd_io->comp);
		}
	} else if (bdev_io->type == SPDK_BDEV_IO_TYPE_UNMAP) {
		ret = rbd_aio_discard(image, offset, len, rbd_io->comp);
	} else if (bdev_io->type == SPDK_BDEV_IO_TYPE_FLUSH) {
		ret = rbd_aio_flush(image, rbd_io->comp);
	} else if (bdev_io->type == SPDK_BDEV_IO_TYPE_WRITE_ZEROES) {
		ret = rbd_aio_write_zeroes(image, offset, len, rbd_io->comp, /* zero_flags */ 0, /* op_flags */ 0);
	}

	if (ret < 0) {
		rbd_aio_release(rbd_io->comp);
		goto err;
	}

	return;

err:
	bdev_rbd_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
}

static int bdev_rbd_library_init(void);
static void bdev_rbd_library_fini(void);

static int
bdev_rbd_get_ctx_size(void)
{
	return sizeof(struct bdev_rbd_io);
}

static struct spdk_bdev_module rbd_if = {
	.name = "rbd",
	.module_init = bdev_rbd_library_init,
	.module_fini = bdev_rbd_library_fini,
	.get_ctx_size = bdev_rbd_get_ctx_size,
};
SPDK_BDEV_MODULE_REGISTER(rbd, &rbd_if)

static int bdev_rbd_reset_timer(void *arg);

static void
bdev_rbd_check_outstanding_ios(struct spdk_bdev *bdev, uint64_t current_qd,
			       void *cb_arg, int rc)
{
	struct bdev_rbd *disk = cb_arg;
	enum spdk_bdev_io_status bio_status;

	if (rc == 0 && current_qd > 0) {
		/* I/O is still outstanding; poll again in 1 ms */
		disk->reset_timer = SPDK_POLLER_REGISTER(bdev_rbd_reset_timer, disk, 1000);
		return;
	}

	if (rc != 0) {
		bio_status = SPDK_BDEV_IO_STATUS_FAILED;
	} else {
		bio_status = SPDK_BDEV_IO_STATUS_SUCCESS;
	}

	bdev_rbd_io_complete(disk->reset_bdev_io, bio_status);
	disk->reset_bdev_io = NULL;
}

static int
bdev_rbd_reset_timer(void *arg)
{
	struct bdev_rbd *disk = arg;

	spdk_poller_unregister(&disk->reset_timer);

	spdk_bdev_get_current_qd(&disk->disk, bdev_rbd_check_outstanding_ios, disk);

	return SPDK_POLLER_BUSY;
}

static void
bdev_rbd_reset(struct bdev_rbd *disk, struct spdk_bdev_io *bdev_io)
{
	/*
	 * HACK: Since librbd doesn't provide any way to cancel outstanding aio, just kick off a
	 * poller to wait for in-flight I/O to complete.
	 */
	assert(disk->reset_bdev_io == NULL);
	disk->reset_bdev_io = bdev_io;

	bdev_rbd_reset_timer(disk);
}
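
/* Destruct path: bdev_rbd_destruct() forwards to the bdev's main thread,
 * which unregisters the io_device. When unregistration finishes,
 * bdev_rbd_free_cb() sends a message to the stored destruct thread so that
 * spdk_bdev_destruct_done() and the final free run there.
 */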
static void
_bdev_rbd_destruct_done(void *io_device)
{
	struct bdev_rbd *rbd = io_device;

	assert(rbd != NULL);
	assert(rbd->ch_count == 0);

	spdk_bdev_destruct_done(&rbd->disk, 0);
	bdev_rbd_free(rbd);
}

static void
bdev_rbd_free_cb(void *io_device)
{
	struct bdev_rbd *rbd = io_device;

	/* The io device has been unregistered. Send a message back to the
	 * original thread that started the destruct operation, so that the
	 * bdev unregister callback is invoked on the same thread that started
	 * this whole process.
	 */
	spdk_thread_send_msg(rbd->destruct_td, _bdev_rbd_destruct_done, rbd);
}

static void
_bdev_rbd_destruct(void *ctx)
{
	struct bdev_rbd *rbd = ctx;

	spdk_io_device_unregister(rbd, bdev_rbd_free_cb);
}

static int
bdev_rbd_destruct(void *ctx)
{
	struct bdev_rbd *rbd = ctx;
	struct spdk_thread *td;

	if (rbd->main_td == NULL) {
		td = spdk_get_thread();
	} else {
		td = rbd->main_td;
	}

	/* Start the destruct operation on the rbd bdev's
	 * main thread. This guarantees it will only start
	 * executing after any messages related to channel
	 * deletions have finished completing. *Always*
	 * send a message, even if this function gets called
	 * from the main thread, in case there are pending
	 * channel delete messages in flight to this thread.
	 */
	assert(rbd->destruct_td == NULL);
	rbd->destruct_td = td;
	spdk_thread_send_msg(td, _bdev_rbd_destruct, rbd);

	/* Return 1 to indicate the destruct path is asynchronous. */
	return 1;
}

static void
bdev_rbd_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io,
		    bool success)
{
	struct bdev_rbd *disk = (struct bdev_rbd *)bdev_io->bdev->ctxt;

	if (!success) {
		bdev_rbd_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
		return;
	}

	bdev_rbd_start_aio(disk,
			   bdev_io,
			   bdev_io->u.bdev.iovs,
			   bdev_io->u.bdev.iovcnt,
			   bdev_io->u.bdev.offset_blocks * bdev_io->bdev->blocklen,
			   bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
}

static void
_bdev_rbd_submit_request(void *ctx)
{
	struct spdk_bdev_io *bdev_io = ctx;
	struct bdev_rbd *disk = (struct bdev_rbd *)bdev_io->bdev->ctxt;

	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_READ:
		spdk_bdev_io_get_buf(bdev_io, bdev_rbd_get_buf_cb,
				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
		break;

	case SPDK_BDEV_IO_TYPE_WRITE:
	case SPDK_BDEV_IO_TYPE_UNMAP:
	case SPDK_BDEV_IO_TYPE_FLUSH:
	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
		bdev_rbd_start_aio(disk,
				   bdev_io,
				   bdev_io->u.bdev.iovs,
				   bdev_io->u.bdev.iovcnt,
				   bdev_io->u.bdev.offset_blocks * bdev_io->bdev->blocklen,
				   bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
		break;

	case SPDK_BDEV_IO_TYPE_RESET:
		bdev_rbd_reset(disk, bdev_io);
		break;

	default:
		SPDK_ERRLOG("Unsupported IO type = %d\n", bdev_io->type);
		bdev_rbd_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
		break;
	}
}

static void
bdev_rbd_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
{
	struct spdk_thread *submit_td = spdk_io_channel_get_thread(ch);
	struct bdev_rbd_io *rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx;
	struct bdev_rbd *disk = (struct bdev_rbd *)bdev_io->bdev->ctxt;

	/* All I/O is funneled to the bdev's main thread; the submitting thread
	 * is remembered so the completion can be sent back to it.
	 */
	rbd_io->submit_td = submit_td;
	if (disk->main_td != submit_td) {
		spdk_thread_send_msg(disk->main_td, _bdev_rbd_submit_request, bdev_io);
	} else {
		_bdev_rbd_submit_request(bdev_io);
	}
}

static bool
bdev_rbd_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type)
{
	switch (io_type) {
	case SPDK_BDEV_IO_TYPE_READ:
	case SPDK_BDEV_IO_TYPE_WRITE:
	case SPDK_BDEV_IO_TYPE_UNMAP:
	case SPDK_BDEV_IO_TYPE_FLUSH:
	case SPDK_BDEV_IO_TYPE_RESET:
	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
		return true;

	default:
		return false;
	}
}
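
/* Release the per-bdev channel state: drop the module-level group channel and
 * close the rbd image. Runs on the main thread once the last io channel for
 * this bdev has been destroyed.
 */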
static void
bdev_rbd_free_channel_resources(struct bdev_rbd *disk)
{
	assert(disk != NULL);
	assert(disk->main_td == spdk_get_thread());
	assert(disk->ch_count == 0);

	spdk_put_io_channel(disk->group_ch);
	if (disk->image) {
		bdev_rbd_exit(disk->image);
	}

	disk->main_td = NULL;
	disk->group_ch = NULL;
}

static void *
bdev_rbd_handle(void *arg)
{
	struct bdev_rbd *disk = arg;
	void *ret = arg;

	if (rbd_open(disk->io_ctx, disk->rbd_name, &disk->image, NULL) < 0) {
		SPDK_ERRLOG("Failed to open specified rbd device\n");
		ret = NULL;
	}

	return ret;
}

static int
_bdev_rbd_create_cb(struct bdev_rbd *disk)
{
	disk->group_ch = spdk_get_io_channel(&rbd_if);
	assert(disk->group_ch != NULL);

	/* Open the rbd image off the SPDK thread, as with cluster creation */
	if (spdk_call_unaffinitized(bdev_rbd_handle, disk) == NULL) {
		bdev_rbd_free_channel_resources(disk);
		return -1;
	}

	return 0;
}

static int
bdev_rbd_create_cb(void *io_device, void *ctx_buf)
{
	struct bdev_rbd_io_channel *ch = ctx_buf;
	struct bdev_rbd *disk = io_device;
	int rc;

	ch->disk = disk;
	pthread_mutex_lock(&disk->mutex);
	if (disk->ch_count == 0) {
		assert(disk->main_td == NULL);
		rc = _bdev_rbd_create_cb(disk);
		if (rc) {
			SPDK_ERRLOG("Cannot create channel for disk=%p\n", disk);
			pthread_mutex_unlock(&disk->mutex);
			return rc;
		}

		/* The thread that creates the first channel becomes the bdev's main thread */
		disk->main_td = spdk_get_thread();
	}

	disk->ch_count++;
	pthread_mutex_unlock(&disk->mutex);

	return 0;
}
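
/* Channel teardown: the last channel to go away releases the shared resources,
 * and that release must happen on the bdev's main thread. If the final destroy
 * callback fires on another thread, ch_count is temporarily bumped and a
 * message is sent to the main thread to do the actual cleanup.
 */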
static void
_bdev_rbd_destroy_cb(void *ctx)
{
	struct bdev_rbd *disk = ctx;

	pthread_mutex_lock(&disk->mutex);
	assert(disk->ch_count > 0);
	disk->ch_count--;

	if (disk->ch_count > 0) {
		/* A new channel was created between when the message was sent and when this function executed */
		pthread_mutex_unlock(&disk->mutex);
		return;
	}

	bdev_rbd_free_channel_resources(disk);
	pthread_mutex_unlock(&disk->mutex);
}

static void
bdev_rbd_destroy_cb(void *io_device, void *ctx_buf)
{
	struct bdev_rbd *disk = io_device;
	struct spdk_thread *thread;

	pthread_mutex_lock(&disk->mutex);
	assert(disk->ch_count > 0);
	disk->ch_count--;
	if (disk->ch_count == 0) {
		assert(disk->main_td != NULL);
		if (disk->main_td != spdk_get_thread()) {
			/* The final channel was destroyed on a different thread
			 * than where the first channel was created. Pass a message
			 * to the main thread to release the channel resources there.
			 */
			disk->ch_count++;
			thread = disk->main_td;
			pthread_mutex_unlock(&disk->mutex);
			spdk_thread_send_msg(thread, _bdev_rbd_destroy_cb, disk);
			return;
		}

		bdev_rbd_free_channel_resources(disk);
	}
	pthread_mutex_unlock(&disk->mutex);
}

static struct spdk_io_channel *
bdev_rbd_get_io_channel(void *ctx)
{
	struct bdev_rbd *rbd_bdev = ctx;

	return spdk_get_io_channel(rbd_bdev);
}

static void
bdev_rbd_cluster_dump_entry(const char *cluster_name, struct spdk_json_write_ctx *w)
{
	struct bdev_rbd_cluster *entry;

	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);
	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
		if (strcmp(cluster_name, entry->name)) {
			continue;
		}
		if (entry->user_id) {
			spdk_json_write_named_string(w, "user_id", entry->user_id);
		}

		if (entry->config_param) {
			char **config_entry = entry->config_param;

			spdk_json_write_named_object_begin(w, "config_param");
			while (*config_entry) {
				spdk_json_write_named_string(w, config_entry[0], config_entry[1]);
				config_entry += 2;
			}
			spdk_json_write_object_end(w);
		}
		if (entry->config_file) {
			spdk_json_write_named_string(w, "config_file", entry->config_file);
		}
		if (entry->key_file) {
			spdk_json_write_named_string(w, "key_file", entry->key_file);
		}

		pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
		return;
	}

	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
}

static int
bdev_rbd_dump_info_json(void *ctx, struct spdk_json_write_ctx *w)
{
	struct bdev_rbd *rbd_bdev = ctx;

	spdk_json_write_named_object_begin(w, "rbd");

	spdk_json_write_named_string(w, "pool_name", rbd_bdev->pool_name);

	spdk_json_write_named_string(w, "rbd_name", rbd_bdev->rbd_name);

	if (rbd_bdev->cluster_name) {
		bdev_rbd_cluster_dump_entry(rbd_bdev->cluster_name, w);
		goto end;
	}

	if (rbd_bdev->user_id) {
		spdk_json_write_named_string(w, "user_id", rbd_bdev->user_id);
	}

	if (rbd_bdev->config) {
		char **entry = rbd_bdev->config;

		spdk_json_write_named_object_begin(w, "config");
		while (*entry) {
			spdk_json_write_named_string(w, entry[0], entry[1]);
			entry += 2;
		}
		spdk_json_write_object_end(w);
	}

end:
	spdk_json_write_object_end(w);

	return 0;
}
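
/* Emit the bdev_rbd_create RPC method and parameters needed to recreate this
 * bdev when the configuration is saved and replayed.
 */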
static void
bdev_rbd_write_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
{
	struct bdev_rbd *rbd = bdev->ctxt;
	char uuid_str[SPDK_UUID_STRING_LEN];

	spdk_json_write_object_begin(w);

	spdk_json_write_named_string(w, "method", "bdev_rbd_create");

	spdk_json_write_named_object_begin(w, "params");
	spdk_json_write_named_string(w, "name", bdev->name);
	spdk_json_write_named_string(w, "pool_name", rbd->pool_name);
	spdk_json_write_named_string(w, "rbd_name", rbd->rbd_name);
	spdk_json_write_named_uint32(w, "block_size", bdev->blocklen);
	if (rbd->user_id) {
		spdk_json_write_named_string(w, "user_id", rbd->user_id);
	}

	if (rbd->config) {
		char **entry = rbd->config;

		spdk_json_write_named_object_begin(w, "config");
		while (*entry) {
			spdk_json_write_named_string(w, entry[0], entry[1]);
			entry += 2;
		}
		spdk_json_write_object_end(w);
	}

	spdk_uuid_fmt_lower(uuid_str, sizeof(uuid_str), &bdev->uuid);
	spdk_json_write_named_string(w, "uuid", uuid_str);

	spdk_json_write_object_end(w);

	spdk_json_write_object_end(w);
}

static void
dump_single_cluster_entry(struct bdev_rbd_cluster *entry, struct spdk_json_write_ctx *w)
{
	assert(entry != NULL);

	spdk_json_write_object_begin(w);
	spdk_json_write_named_string(w, "cluster_name", entry->name);

	if (entry->user_id) {
		spdk_json_write_named_string(w, "user_id", entry->user_id);
	}

	if (entry->config_param) {
		char **config_entry = entry->config_param;

		spdk_json_write_named_object_begin(w, "config_param");
		while (*config_entry) {
			spdk_json_write_named_string(w, config_entry[0], config_entry[1]);
			config_entry += 2;
		}
		spdk_json_write_object_end(w);
	}
	if (entry->config_file) {
		spdk_json_write_named_string(w, "config_file", entry->config_file);
	}
	if (entry->key_file) {
		spdk_json_write_named_string(w, "key_file", entry->key_file);
	}

	spdk_json_write_object_end(w);
}

int
bdev_rbd_get_clusters_info(struct spdk_jsonrpc_request *request, const char *name)
{
	struct bdev_rbd_cluster *entry;
	struct spdk_json_write_ctx *w;

	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);

	if (STAILQ_EMPTY(&g_map_bdev_rbd_cluster)) {
		pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
		return -ENOENT;
	}

	/* If a cluster name is provided, dump only that entry */
	if (name) {
		STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
			if (strcmp(name, entry->name) == 0) {
				w = spdk_jsonrpc_begin_result(request);
				dump_single_cluster_entry(entry, w);
				spdk_jsonrpc_end_result(request, w);

				pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
				return 0;
			}
		}

		pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
		return -ENOENT;
	}

	w = spdk_jsonrpc_begin_result(request);
	spdk_json_write_array_begin(w);
	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
		dump_single_cluster_entry(entry, w);
	}
	spdk_json_write_array_end(w);
	spdk_jsonrpc_end_result(request, w);
	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);

	return 0;
}

static const struct spdk_bdev_fn_table rbd_fn_table = {
	.destruct = bdev_rbd_destruct,
	.submit_request = bdev_rbd_submit_request,
	.io_type_supported = bdev_rbd_io_type_supported,
	.get_io_channel = bdev_rbd_get_io_channel,
	.dump_info_json = bdev_rbd_dump_info_json,
	.write_config_json = bdev_rbd_write_config_json,
};
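
/* Create a named, shared Rados cluster connection and add it to the global
 * map. Returns 0 on success and -1 on any failure, releasing all partially
 * built state via bdev_rbd_cluster_free().
 */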
static int
rbd_register_cluster(const char *name, const char *user_id, const char *const *config_param,
		     const char *config_file, const char *key_file)
{
	struct bdev_rbd_cluster *entry;
	int rc;

	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);
	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
		if (strcmp(name, entry->name) == 0) {
			SPDK_ERRLOG("Cluster name=%s already exists\n", name);
			pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
			return -1;
		}
	}

	entry = calloc(1, sizeof(*entry));
	if (!entry) {
		SPDK_ERRLOG("Cannot allocate an entry for name=%s\n", name);
		pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
		return -1;
	}

	entry->name = strdup(name);
	if (entry->name == NULL) {
		SPDK_ERRLOG("Failed to save the name=%s on entry=%p\n", name, entry);
		goto err_handle;
	}

	if (user_id) {
		entry->user_id = strdup(user_id);
		if (entry->user_id == NULL) {
			SPDK_ERRLOG("Failed to save the user_id=%s on entry=%p\n", user_id, entry);
			goto err_handle;
		}
	}

	/* config_param and config_file may be specified separately, or both at once. */
	if (config_param) {
		entry->config_param = bdev_rbd_dup_config(config_param);
		if (entry->config_param == NULL) {
			SPDK_ERRLOG("Failed to save the config_param=%p on entry=%p\n", config_param, entry);
			goto err_handle;
		}
	}

	if (config_file) {
		entry->config_file = strdup(config_file);
		if (entry->config_file == NULL) {
			SPDK_ERRLOG("Failed to save the config_file=%s on entry=%p\n", config_file, entry);
			goto err_handle;
		}
	}

	if (key_file) {
		entry->key_file = strdup(key_file);
		if (entry->key_file == NULL) {
			SPDK_ERRLOG("Failed to save the key_file=%s on entry=%p\n", key_file, entry);
			goto err_handle;
		}
	}

	rc = rados_create(&entry->cluster, user_id);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to create rados_t struct\n");
		goto err_handle;
	}

	/* Try the default location when entry->config_file is NULL, but ignore failure in that case */
	rc = rados_conf_read_file(entry->cluster, entry->config_file);
	if (entry->config_file && rc < 0) {
		SPDK_ERRLOG("Failed to read conf file %s\n", entry->config_file);
		rados_shutdown(entry->cluster);
		goto err_handle;
	}

	if (config_param) {
		const char *const *config_entry = config_param;
		while (*config_entry) {
			rc = rados_conf_set(entry->cluster, config_entry[0], config_entry[1]);
			if (rc < 0) {
				SPDK_ERRLOG("Failed to set %s = %s\n", config_entry[0], config_entry[1]);
				rados_shutdown(entry->cluster);
				goto err_handle;
			}
			config_entry += 2;
		}
	}

	if (key_file) {
		rc = rados_conf_set(entry->cluster, "keyring", key_file);
		if (rc < 0) {
			SPDK_ERRLOG("Failed to set keyring = %s\n", key_file);
			rados_shutdown(entry->cluster);
			goto err_handle;
		}
	}

	rc = rados_connect(entry->cluster);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to connect to rbd_pool on cluster=%p\n", entry->cluster);
		rados_shutdown(entry->cluster);
		goto err_handle;
	}

	STAILQ_INSERT_TAIL(&g_map_bdev_rbd_cluster, entry, link);
	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);

	return 0;

err_handle:
	bdev_rbd_cluster_free(entry);
	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
	return -1;
}

int
bdev_rbd_unregister_cluster(const char *name)
{
	struct bdev_rbd_cluster *entry;
	int rc = 0;

	if (name == NULL) {
		return -1;
	}

	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);
	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
		if (strcmp(name, entry->name) == 0) {
			if (entry->ref == 0) {
				STAILQ_REMOVE(&g_map_bdev_rbd_cluster, entry, bdev_rbd_cluster, link);
				rados_shutdown(entry->cluster);
				bdev_rbd_cluster_free(entry);
			} else {
				SPDK_ERRLOG("Cluster with name=%s is still in use and cannot be deleted\n",
					    entry->name);
				rc = -1;
			}

			pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
			return rc;
		}
	}

	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);

	SPDK_ERRLOG("Could not find the cluster with name=%s\n", name);

	return -1;
}
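
/* Thin wrapper so that rbd_register_cluster() can run through
 * spdk_call_unaffinitized(): returns the info pointer on success and NULL on
 * failure.
 */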
static void *
_bdev_rbd_register_cluster(void *arg)
{
	struct cluster_register_info *info = arg;
	void *ret = arg;
	int rc;

	rc = rbd_register_cluster((const char *)info->name, (const char *)info->user_id,
				  (const char *const *)info->config_param, (const char *)info->config_file,
				  (const char *)info->key_file);
	if (rc) {
		ret = NULL;
	}

	return ret;
}

int
bdev_rbd_register_cluster(struct cluster_register_info *info)
{
	assert(info != NULL);

	/* The Rados cluster info needs to be created on a non-SPDK thread to avoid
	 * CPU resource contention */
	if (spdk_call_unaffinitized(_bdev_rbd_register_cluster, info) == NULL) {
		return -1;
	}

	return 0;
}

int
bdev_rbd_create(struct spdk_bdev **bdev, const char *name, const char *user_id,
		const char *pool_name,
		const char *const *config,
		const char *rbd_name,
		uint32_t block_size,
		const char *cluster_name,
		const struct spdk_uuid *uuid)
{
	struct bdev_rbd *rbd;
	int ret;

	if ((pool_name == NULL) || (rbd_name == NULL)) {
		return -EINVAL;
	}

	rbd = calloc(1, sizeof(struct bdev_rbd));
	if (rbd == NULL) {
		SPDK_ERRLOG("Failed to allocate bdev_rbd struct\n");
		return -ENOMEM;
	}

	ret = pthread_mutex_init(&rbd->mutex, NULL);
	if (ret) {
		SPDK_ERRLOG("Cannot init mutex on rbd=%p\n", rbd);
		free(rbd);
		return ret;
	}

	rbd->rbd_name = strdup(rbd_name);
	if (!rbd->rbd_name) {
		bdev_rbd_free(rbd);
		return -ENOMEM;
	}

	if (user_id) {
		rbd->user_id = strdup(user_id);
		if (!rbd->user_id) {
			bdev_rbd_free(rbd);
			return -ENOMEM;
		}
	}

	if (cluster_name) {
		rbd->cluster_name = strdup(cluster_name);
		if (!rbd->cluster_name) {
			bdev_rbd_free(rbd);
			return -ENOMEM;
		}
	}

	rbd->pool_name = strdup(pool_name);
	if (!rbd->pool_name) {
		bdev_rbd_free(rbd);
		return -ENOMEM;
	}

	if (config && !(rbd->config = bdev_rbd_dup_config(config))) {
		bdev_rbd_free(rbd);
		return -ENOMEM;
	}

	ret = bdev_rbd_init(rbd);
	if (ret < 0) {
		bdev_rbd_free(rbd);
		SPDK_ERRLOG("Failed to init rbd device\n");
		return ret;
	}

	if (uuid) {
		rbd->disk.uuid = *uuid;
	} else {
		spdk_uuid_generate(&rbd->disk.uuid);
	}

	if (name) {
		rbd->disk.name = strdup(name);
	} else {
		rbd->disk.name = spdk_sprintf_alloc("Ceph%d", bdev_rbd_count);
	}
	if (!rbd->disk.name) {
		bdev_rbd_free(rbd);
		return -ENOMEM;
	}
	rbd->disk.product_name = "Ceph Rbd Disk";
	bdev_rbd_count++;

	rbd->disk.write_cache = 0;
	rbd->disk.blocklen = block_size;
	rbd->disk.blockcnt = rbd->info.size / rbd->disk.blocklen;
	rbd->disk.ctxt = rbd;
	rbd->disk.fn_table = &rbd_fn_table;
	rbd->disk.module = &rbd_if;

	SPDK_NOTICELOG("Add %s rbd disk to lun\n", rbd->disk.name);

	spdk_io_device_register(rbd, bdev_rbd_create_cb,
				bdev_rbd_destroy_cb,
				sizeof(struct bdev_rbd_io_channel),
				rbd_name);
	ret = spdk_bdev_register(&rbd->disk);
	if (ret) {
		spdk_io_device_unregister(rbd, NULL);
		bdev_rbd_free(rbd);
		return ret;
	}

	*bdev = &(rbd->disk);

	return ret;
}
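
/* Unregister the named rbd bdev. Completion is reported through cb_fn; if the
 * unregister call itself fails (e.g. the bdev is not found), cb_fn is invoked
 * immediately with the error code.
 */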
void
bdev_rbd_delete(const char *name, spdk_delete_rbd_complete cb_fn, void *cb_arg)
{
	int rc;

	rc = spdk_bdev_unregister_by_name(name, &rbd_if, cb_fn, cb_arg);
	if (rc != 0) {
		cb_fn(cb_arg, rc);
	}
}

static void
dummy_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev, void *ctx)
{
}

int
bdev_rbd_resize(const char *name, const uint64_t new_size_in_mb)
{
	struct spdk_bdev_desc *desc;
	struct spdk_bdev *bdev;
	struct spdk_io_channel *ch;
	struct bdev_rbd_io_channel *rbd_io_ch;
	int rc = 0;
	uint64_t new_size_in_byte;
	uint64_t current_size_in_mb;

	rc = spdk_bdev_open_ext(name, false, dummy_bdev_event_cb, NULL, &desc);
	if (rc != 0) {
		return rc;
	}

	bdev = spdk_bdev_desc_get_bdev(desc);

	if (bdev->module != &rbd_if) {
		rc = -EINVAL;
		goto exit;
	}

	current_size_in_mb = bdev->blocklen * bdev->blockcnt / (1024 * 1024);
	if (current_size_in_mb > new_size_in_mb) {
		SPDK_ERRLOG("The new bdev size must not be smaller than the current bdev size.\n");
		rc = -EINVAL;
		goto exit;
	}

	ch = bdev_rbd_get_io_channel(bdev);
	rbd_io_ch = spdk_io_channel_get_ctx(ch);
	new_size_in_byte = new_size_in_mb * 1024 * 1024;

	rc = rbd_resize(rbd_io_ch->disk->image, new_size_in_byte);
	spdk_put_io_channel(ch);
	if (rc != 0) {
		SPDK_ERRLOG("Failed to resize the ceph bdev.\n");
		goto exit;
	}

	rc = spdk_bdev_notify_blockcnt_change(bdev, new_size_in_byte / bdev->blocklen);
	if (rc != 0) {
		SPDK_ERRLOG("Failed to notify block count change.\n");
	}

exit:
	spdk_bdev_close(desc);
	return rc;
}

static int
bdev_rbd_group_create_cb(void *io_device, void *ctx_buf)
{
	return 0;
}

static void
bdev_rbd_group_destroy_cb(void *io_device, void *ctx_buf)
{
}

static int
bdev_rbd_library_init(void)
{
	spdk_io_device_register(&rbd_if, bdev_rbd_group_create_cb, bdev_rbd_group_destroy_cb,
				0, "bdev_rbd_poll_groups");
	return 0;
}

static void
bdev_rbd_library_fini(void)
{
	spdk_io_device_unregister(&rbd_if, NULL);
}

SPDK_LOG_REGISTER_COMPONENT(bdev_rbd)