/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright (c) Intel Corporation.
 * All rights reserved.
 */

#include "spdk/stdinc.h"

#include "bdev_rbd.h"

#include <rbd/librbd.h>
#include <rados/librados.h>

#include "spdk/env.h"
#include "spdk/bdev.h"
#include "spdk/thread.h"
#include "spdk/json.h"
#include "spdk/string.h"
#include "spdk/util.h"
#include "spdk/likely.h"

#include "spdk/bdev_module.h"
#include "spdk/log.h"

/* Monotonic counter used to generate default "CephN" bdev names. */
static int bdev_rbd_count = 0;

/* Per-bdev state for one exported RBD image. */
struct bdev_rbd {
	struct spdk_bdev disk;		/* Embedded first so bdev/ctxt casts resolve to this struct. */
	char *rbd_name;			/* Image name inside the pool. */
	char *user_id;			/* Optional Ceph user id; NULL selects librados' default. */
	char *pool_name;
	char **config;			/* NULL-terminated key/value pairs for rados_conf_set(). */

	rados_t cluster;		/* Private cluster handle (used when cluster_name == NULL). */
	rados_t *cluster_p;		/* Points at either &cluster or a shared registered cluster. */
	char *cluster_name;		/* Non-NULL when a pre-registered shared cluster is used. */

	rados_ioctx_t io_ctx;
	rbd_image_t image;

	rbd_image_info_t info;		/* Cached rbd_stat() result (image size, etc.). */
	pthread_mutex_t mutex;		/* Guards ch_count / main_td transitions. */
	struct spdk_thread *main_td;	/* Thread that owns the image handle and runs the AIO. */
	struct spdk_thread *destruct_td;	/* Thread that started destruct; gets the done message. */
	uint32_t ch_count;		/* Number of live I/O channels on this bdev. */
	struct spdk_io_channel *group_ch;

	TAILQ_ENTRY(bdev_rbd) tailq;
	struct spdk_poller *reset_timer;	/* Delays reset completion (librbd cannot cancel AIO). */
	struct spdk_bdev_io *reset_bdev_io;	/* Outstanding reset request, if any. */
};

struct bdev_rbd_io_channel {
	struct bdev_rbd *disk;
};

/* Per-I/O driver context, stored in spdk_bdev_io->driver_ctx. */
struct bdev_rbd_io {
	struct spdk_thread *submit_td;	/* Submitting thread; completion is bounced back to it. */
	enum spdk_bdev_io_status status;
	rbd_completion_t comp;
	size_t total_len;		/* Expected read byte count, verified on completion. */
};

/* A named, shareable rados cluster connection (registered via RPC). */
struct bdev_rbd_cluster {
	char *name;
	char *user_id;
	char **config_param;		/* NULL-terminated key/value pairs. */
	char *config_file;
	char *key_file;
	rados_t cluster;
	uint32_t ref;			/* Number of bdevs currently holding this cluster. */
	STAILQ_ENTRY(bdev_rbd_cluster) link;
};

static STAILQ_HEAD(, bdev_rbd_cluster) g_map_bdev_rbd_cluster = STAILQ_HEAD_INITIALIZER(
			g_map_bdev_rbd_cluster);
static pthread_mutex_t g_map_bdev_rbd_cluster_mutex = PTHREAD_MUTEX_INITIALIZER;

/* Release all memory owned by a cluster entry.
 * NOTE: does not rados_shutdown() the handle; callers do that when needed. */
static void
bdev_rbd_cluster_free(struct bdev_rbd_cluster *entry)
{
	assert(entry != NULL);

	bdev_rbd_free_config(entry->config_param);
	free(entry->config_file);
	free(entry->key_file);
	free(entry->user_id);
	free(entry->name);
	free(entry);
}

static void
bdev_rbd_put_cluster(rados_t **cluster) 93 { 94 struct bdev_rbd_cluster *entry; 95 96 assert(cluster != NULL); 97 98 /* No need go through the map if *cluster equals to NULL */ 99 if (*cluster == NULL) { 100 return; 101 } 102 103 pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex); 104 STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) { 105 if (*cluster != &entry->cluster) { 106 continue; 107 } 108 109 assert(entry->ref > 0); 110 entry->ref--; 111 *cluster = NULL; 112 pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex); 113 return; 114 } 115 116 pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex); 117 SPDK_ERRLOG("Cannot find the entry for cluster=%p\n", cluster); 118 } 119 120 static void 121 bdev_rbd_free(struct bdev_rbd *rbd) 122 { 123 if (!rbd) { 124 return; 125 } 126 127 free(rbd->disk.name); 128 free(rbd->rbd_name); 129 free(rbd->user_id); 130 free(rbd->pool_name); 131 bdev_rbd_free_config(rbd->config); 132 133 if (rbd->io_ctx) { 134 rados_ioctx_destroy(rbd->io_ctx); 135 } 136 137 if (rbd->cluster_name) { 138 bdev_rbd_put_cluster(&rbd->cluster_p); 139 free(rbd->cluster_name); 140 } else if (rbd->cluster) { 141 rados_shutdown(rbd->cluster); 142 } 143 144 pthread_mutex_destroy(&rbd->mutex); 145 free(rbd); 146 } 147 148 void 149 bdev_rbd_free_config(char **config) 150 { 151 char **entry; 152 153 if (config) { 154 for (entry = config; *entry; entry++) { 155 free(*entry); 156 } 157 free(config); 158 } 159 } 160 161 char ** 162 bdev_rbd_dup_config(const char *const *config) 163 { 164 size_t count; 165 char **copy; 166 167 if (!config) { 168 return NULL; 169 } 170 for (count = 0; config[count]; count++) {} 171 copy = calloc(count + 1, sizeof(*copy)); 172 if (!copy) { 173 return NULL; 174 } 175 for (count = 0; config[count]; count++) { 176 if (!(copy[count] = strdup(config[count]))) { 177 bdev_rbd_free_config(copy); 178 return NULL; 179 } 180 } 181 return copy; 182 } 183 184 static int 185 bdev_rados_cluster_init(const char *user_id, const char *const *config, 186 
rados_t *cluster) 187 { 188 int ret; 189 190 ret = rados_create(cluster, user_id); 191 if (ret < 0) { 192 SPDK_ERRLOG("Failed to create rados_t struct\n"); 193 return -1; 194 } 195 196 if (config) { 197 const char *const *entry = config; 198 while (*entry) { 199 ret = rados_conf_set(*cluster, entry[0], entry[1]); 200 if (ret < 0) { 201 SPDK_ERRLOG("Failed to set %s = %s\n", entry[0], entry[1]); 202 rados_shutdown(*cluster); 203 return -1; 204 } 205 entry += 2; 206 } 207 } else { 208 ret = rados_conf_read_file(*cluster, NULL); 209 if (ret < 0) { 210 SPDK_ERRLOG("Failed to read conf file\n"); 211 rados_shutdown(*cluster); 212 return -1; 213 } 214 } 215 216 ret = rados_connect(*cluster); 217 if (ret < 0) { 218 SPDK_ERRLOG("Failed to connect to rbd_pool\n"); 219 rados_shutdown(*cluster); 220 return -1; 221 } 222 223 return 0; 224 } 225 226 static int 227 bdev_rbd_get_cluster(const char *cluster_name, rados_t **cluster) 228 { 229 struct bdev_rbd_cluster *entry; 230 231 if (cluster == NULL) { 232 SPDK_ERRLOG("cluster should not be NULL\n"); 233 return -1; 234 } 235 236 pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex); 237 STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) { 238 if (strcmp(cluster_name, entry->name) == 0) { 239 entry->ref++; 240 *cluster = &entry->cluster; 241 pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex); 242 return 0; 243 } 244 } 245 246 pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex); 247 return -1; 248 } 249 250 static int 251 bdev_rbd_shared_cluster_init(const char *cluster_name, rados_t **cluster) 252 { 253 int ret; 254 255 ret = bdev_rbd_get_cluster(cluster_name, cluster); 256 if (ret < 0) { 257 SPDK_ERRLOG("Failed to create rados_t struct\n"); 258 return -1; 259 } 260 261 return ret; 262 } 263 264 static void * 265 bdev_rbd_cluster_handle(void *arg) 266 { 267 void *ret = arg; 268 struct bdev_rbd *rbd = arg; 269 int rc; 270 271 rc = bdev_rados_cluster_init(rbd->user_id, (const char *const *)rbd->config, 272 &rbd->cluster); 273 if (rc < 
0) { 274 SPDK_ERRLOG("Failed to create rados cluster for user_id=%s and rbd_pool=%s\n", 275 rbd->user_id ? rbd->user_id : "admin (the default)", rbd->pool_name); 276 ret = NULL; 277 } 278 279 return ret; 280 } 281 282 static void * 283 bdev_rbd_init_context(void *arg) 284 { 285 struct bdev_rbd *rbd = arg; 286 int rc; 287 288 if (rados_ioctx_create(*(rbd->cluster_p), rbd->pool_name, &rbd->io_ctx) < 0) { 289 SPDK_ERRLOG("Failed to create ioctx on rbd=%p\n", rbd); 290 return NULL; 291 } 292 293 rc = rbd_open(rbd->io_ctx, rbd->rbd_name, &rbd->image, NULL); 294 if (rc < 0) { 295 SPDK_ERRLOG("Failed to open specified rbd device\n"); 296 return NULL; 297 } 298 299 rc = rbd_stat(rbd->image, &rbd->info, sizeof(rbd->info)); 300 rbd_close(rbd->image); 301 if (rc < 0) { 302 SPDK_ERRLOG("Failed to stat specified rbd device\n"); 303 return NULL; 304 } 305 306 return arg; 307 } 308 309 static int 310 bdev_rbd_init(struct bdev_rbd *rbd) 311 { 312 int ret = 0; 313 314 if (!rbd->cluster_name) { 315 rbd->cluster_p = &rbd->cluster; 316 /* Cluster should be created in non-SPDK thread to avoid conflict between 317 * Rados and SPDK thread */ 318 if (spdk_call_unaffinitized(bdev_rbd_cluster_handle, rbd) == NULL) { 319 SPDK_ERRLOG("Cannot create the rados object on rbd=%p\n", rbd); 320 return -1; 321 } 322 } else { 323 ret = bdev_rbd_shared_cluster_init(rbd->cluster_name, &rbd->cluster_p); 324 if (ret < 0) { 325 SPDK_ERRLOG("Failed to create rados object for rbd =%p on cluster_name=%s\n", 326 rbd, rbd->cluster_name); 327 return -1; 328 } 329 } 330 331 if (spdk_call_unaffinitized(bdev_rbd_init_context, rbd) == NULL) { 332 SPDK_ERRLOG("Cannot init rbd context for rbd=%p\n", rbd); 333 } 334 335 return ret; 336 } 337 338 static void 339 bdev_rbd_exit(rbd_image_t image) 340 { 341 rbd_flush(image); 342 rbd_close(image); 343 } 344 345 static void 346 _bdev_rbd_io_complete(void *_rbd_io) 347 { 348 struct bdev_rbd_io *rbd_io = _rbd_io; 349 350 spdk_bdev_io_complete(spdk_bdev_io_from_ctx(rbd_io), 
rbd_io->status);
}

/* Record the final status and route completion back to the submitting
 * thread; completes inline when already on that thread. */
static void
bdev_rbd_io_complete(struct spdk_bdev_io *bdev_io, enum spdk_bdev_io_status status)
{
	struct bdev_rbd_io *rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx;
	struct spdk_thread *current_thread = spdk_get_thread();

	rbd_io->status = status;
	assert(rbd_io->submit_td != NULL);
	if (rbd_io->submit_td != current_thread) {
		spdk_thread_send_msg(rbd_io->submit_td, _bdev_rbd_io_complete, rbd_io);
	} else {
		_bdev_rbd_io_complete(rbd_io);
	}
}

/* librbd AIO completion callback. Translates the librbd return value into
 * an SPDK bdev status and releases the rbd completion object. */
static void
bdev_rbd_finish_aiocb(rbd_completion_t cb, void *arg)
{
	int io_status;
	struct spdk_bdev_io *bdev_io;
	struct bdev_rbd_io *rbd_io;
	enum spdk_bdev_io_status bio_status;

	bdev_io = rbd_aio_get_arg(cb);
	rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx;
	io_status = rbd_aio_get_return_value(cb);
	bio_status = SPDK_BDEV_IO_STATUS_SUCCESS;

	if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) {
		/* Reads return the byte count; anything short of the request fails. */
		if ((int)rbd_io->total_len != io_status) {
			bio_status = SPDK_BDEV_IO_STATUS_FAILED;
		}
	} else {
		/* For others, 0 means success */
		if (io_status != 0) {
			bio_status = SPDK_BDEV_IO_STATUS_FAILED;
		}
	}

	rbd_aio_release(cb);

	bdev_rbd_io_complete(bdev_io, bio_status);
}

/* Kick off the librbd async operation matching bdev_io->type.
 * Must run on disk->main_td (the thread that owns disk->image). */
static void
bdev_rbd_start_aio(struct bdev_rbd *disk, struct spdk_bdev_io *bdev_io,
		   struct iovec *iov, int iovcnt, uint64_t offset, size_t len)
{
	int ret;
	struct bdev_rbd_io *rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx;
	rbd_image_t image = disk->image;

	ret = rbd_aio_create_completion(bdev_io, bdev_rbd_finish_aiocb,
					&rbd_io->comp);
	if (ret < 0) {
		goto err;
	}

	if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) {
		rbd_io->total_len = len;
		/* Single-element iovecs take the cheaper non-vectored path. */
		if (spdk_likely(iovcnt == 1)) {
			ret = rbd_aio_read(image, offset, iov[0].iov_len, iov[0].iov_base, rbd_io->comp);
		} else {
			ret = rbd_aio_readv(image, iov, iovcnt, offset, rbd_io->comp);
		}
	} else if (bdev_io->type == SPDK_BDEV_IO_TYPE_WRITE) {
		if (spdk_likely(iovcnt == 1)) {
			ret = rbd_aio_write(image, offset, iov[0].iov_len, iov[0].iov_base, rbd_io->comp);
		} else {
			ret = rbd_aio_writev(image, iov, iovcnt, offset, rbd_io->comp);
		}
	} else if (bdev_io->type == SPDK_BDEV_IO_TYPE_UNMAP) {
		ret = rbd_aio_discard(image, offset, len, rbd_io->comp);
	} else if (bdev_io->type == SPDK_BDEV_IO_TYPE_FLUSH) {
		ret = rbd_aio_flush(image, rbd_io->comp);
	} else if (bdev_io->type == SPDK_BDEV_IO_TYPE_WRITE_ZEROES) {
		ret = rbd_aio_write_zeroes(image, offset, len, rbd_io->comp, /* zero_flags */ 0, /* op_flags */ 0);
	}
	/* NOTE(review): an unexpected type leaves ret >= 0 (from create_completion)
	 * and the comp unsubmitted; unreachable today because submit_request
	 * filters types, but worth a defensive else if types are added. */

	if (ret < 0) {
		rbd_aio_release(rbd_io->comp);
		goto err;
	}

	return;

err:
	bdev_rbd_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
}

static int bdev_rbd_library_init(void);
static void bdev_rbd_library_fini(void);

static int
bdev_rbd_get_ctx_size(void)
{
	return sizeof(struct bdev_rbd_io);
}

static struct spdk_bdev_module rbd_if = {
	.name = "rbd",
	.module_init = bdev_rbd_library_init,
	.module_fini = bdev_rbd_library_fini,
	.get_ctx_size = bdev_rbd_get_ctx_size,

};
SPDK_BDEV_MODULE_REGISTER(rbd, &rbd_if)

/* One-shot poller that completes a pending reset after a fixed delay. */
static int
bdev_rbd_reset_timer(void *arg)
{
	struct bdev_rbd *disk = arg;

	/*
	 * TODO: This should check if any I/O is still in flight before completing the reset.
	 * For now, just complete after the timer expires.
	 */
	bdev_rbd_io_complete(disk->reset_bdev_io, SPDK_BDEV_IO_STATUS_SUCCESS);
	spdk_poller_unregister(&disk->reset_timer);
	disk->reset_bdev_io = NULL;

	return SPDK_POLLER_BUSY;
}

static void
bdev_rbd_reset(struct bdev_rbd *disk, struct spdk_bdev_io *bdev_io)
{
	/*
	 * HACK: Since librbd doesn't provide any way to cancel outstanding aio, just kick off a
	 * timer to wait for in-flight I/O to complete.
	 */
	assert(disk->reset_bdev_io == NULL);
	disk->reset_bdev_io = bdev_io;
	/* 1 second (poller period is in microseconds). */
	disk->reset_timer = SPDK_POLLER_REGISTER(bdev_rbd_reset_timer, disk, 1 * 1000 * 1000);
}

/* Final destruct step, run on the thread that initiated the destruct. */
static void
_bdev_rbd_destruct_done(void *io_device)
{
	struct bdev_rbd *rbd = io_device;

	assert(rbd != NULL);
	assert(rbd->ch_count == 0);

	spdk_bdev_destruct_done(&rbd->disk, 0);
	bdev_rbd_free(rbd);
}

static void
bdev_rbd_free_cb(void *io_device)
{
	struct bdev_rbd *rbd = io_device;

	/* The io device has been unregistered. Send a message back to the
	 * original thread that started the destruct operation, so that the
	 * bdev unregister callback is invoked on the same thread that started
	 * this whole process.
	 */
	spdk_thread_send_msg(rbd->destruct_td, _bdev_rbd_destruct_done, rbd);
}

static void
_bdev_rbd_destruct(void *ctx)
{
	struct bdev_rbd *rbd = ctx;

	spdk_io_device_unregister(rbd, bdev_rbd_free_cb);
}

/* bdev fn_table destruct hook. Asynchronous (returns 1). */
static int
bdev_rbd_destruct(void *ctx)
{
	struct bdev_rbd *rbd = ctx;
	struct spdk_thread *td;

	if (rbd->main_td == NULL) {
		td = spdk_get_thread();
	} else {
		td = rbd->main_td;
	}

	/* Start the destruct operation on the rbd bdev's
	 * main thread. This guarantees it will only start
	 * executing after any messages related to channel
	 * deletions have finished completing. *Always*
	 * send a message, even if this function gets called
	 * from the main thread, in case there are pending
	 * channel delete messages in flight to this thread.
	 */
	assert(rbd->destruct_td == NULL);
	rbd->destruct_td = td;
	spdk_thread_send_msg(td, _bdev_rbd_destruct, rbd);

	/* Return 1 to indicate the destruct path is asynchronous. */
	return 1;
}

/* spdk_bdev_io_get_buf() callback for reads: start the AIO once a data
 * buffer is available. */
static void
bdev_rbd_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io,
		    bool success)
{
	struct bdev_rbd *disk = (struct bdev_rbd *)bdev_io->bdev->ctxt;

	if (!success) {
		bdev_rbd_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
		return;
	}

	bdev_rbd_start_aio(disk,
			   bdev_io,
			   bdev_io->u.bdev.iovs,
			   bdev_io->u.bdev.iovcnt,
			   bdev_io->u.bdev.offset_blocks * bdev_io->bdev->blocklen,
			   bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
}

/* Dispatch one bdev_io; always runs on disk->main_td. */
static void
_bdev_rbd_submit_request(void *ctx)
{
	struct spdk_bdev_io *bdev_io = ctx;
	struct bdev_rbd *disk = (struct bdev_rbd *)bdev_io->bdev->ctxt;

	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_READ:
		spdk_bdev_io_get_buf(bdev_io, bdev_rbd_get_buf_cb,
				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
		break;

	case SPDK_BDEV_IO_TYPE_WRITE:
	case SPDK_BDEV_IO_TYPE_UNMAP:
	case SPDK_BDEV_IO_TYPE_FLUSH:
	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
		bdev_rbd_start_aio(disk,
				   bdev_io,
				   bdev_io->u.bdev.iovs,
				   bdev_io->u.bdev.iovcnt,
				   bdev_io->u.bdev.offset_blocks * bdev_io->bdev->blocklen,
				   bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
		break;

	case SPDK_BDEV_IO_TYPE_RESET:
		bdev_rbd_reset((struct bdev_rbd *)bdev_io->bdev->ctxt,
			       bdev_io);
		break;

	default:
		SPDK_ERRLOG("Unsupported IO type =%d\n", bdev_io->type);
		bdev_rbd_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
		break;
	}
}

static void
bdev_rbd_submit_request(struct
spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
{
	struct spdk_thread *submit_td = spdk_io_channel_get_thread(ch);
	struct bdev_rbd_io *rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx;
	struct bdev_rbd *disk = (struct bdev_rbd *)bdev_io->bdev->ctxt;

	/* Remember where to complete, then funnel the I/O to the main thread
	 * that owns the rbd image handle. */
	rbd_io->submit_td = submit_td;
	if (disk->main_td != submit_td) {
		spdk_thread_send_msg(disk->main_td, _bdev_rbd_submit_request, bdev_io);
	} else {
		_bdev_rbd_submit_request(bdev_io);
	}
}

static bool
bdev_rbd_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type)
{
	switch (io_type) {
	case SPDK_BDEV_IO_TYPE_READ:
	case SPDK_BDEV_IO_TYPE_WRITE:
	case SPDK_BDEV_IO_TYPE_UNMAP:
	case SPDK_BDEV_IO_TYPE_FLUSH:
	case SPDK_BDEV_IO_TYPE_RESET:
	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
		return true;

	default:
		return false;
	}
}

/* Tear down the per-disk channel state; only valid on the main thread once
 * the last channel is gone. */
static void
bdev_rbd_free_channel_resources(struct bdev_rbd *disk)
{
	assert(disk != NULL);
	assert(disk->main_td == spdk_get_thread());
	assert(disk->ch_count == 0);

	spdk_put_io_channel(disk->group_ch);
	if (disk->image) {
		bdev_rbd_exit(disk->image);
	}

	disk->main_td = NULL;
	disk->group_ch = NULL;
}

/* spdk_call_unaffinitized() trampoline: open the long-lived image handle.
 * Returns arg on success, NULL on failure. */
static void *
bdev_rbd_handle(void *arg)
{
	struct bdev_rbd *disk = arg;
	void *ret = arg;

	if (rbd_open(disk->io_ctx, disk->rbd_name, &disk->image, NULL) < 0) {
		SPDK_ERRLOG("Failed to open specified rbd device\n");
		ret = NULL;
	}

	return ret;
}

/* First-channel setup: take the module group channel and open the image. */
static int
_bdev_rbd_create_cb(struct bdev_rbd *disk)
{
	disk->group_ch = spdk_get_io_channel(&rbd_if);
	assert(disk->group_ch != NULL);

	if (spdk_call_unaffinitized(bdev_rbd_handle, disk) == NULL) {
		bdev_rbd_free_channel_resources(disk);
		return -1;
	}

	return 0;
}

/* io_device channel constructor. The first channel created claims the
 * calling thread as the disk's main thread. */
static int
bdev_rbd_create_cb(void *io_device, void *ctx_buf)
{
	struct bdev_rbd_io_channel *ch = ctx_buf;
	struct bdev_rbd *disk = io_device;
	int rc;

	ch->disk = disk;
	pthread_mutex_lock(&disk->mutex);
	if (disk->ch_count == 0) {
		assert(disk->main_td == NULL);
		rc = _bdev_rbd_create_cb(disk);
		if (rc) {
			SPDK_ERRLOG("Cannot create channel for disk=%p\n", disk);
			pthread_mutex_unlock(&disk->mutex);
			return rc;
		}

		disk->main_td = spdk_get_thread();
	}

	disk->ch_count++;
	pthread_mutex_unlock(&disk->mutex);

	return 0;
}

/* Runs on the main thread to drop the reference forwarded from a destroy
 * that happened on another thread. */
static void
_bdev_rbd_destroy_cb(void *ctx)
{
	struct bdev_rbd *disk = ctx;

	pthread_mutex_lock(&disk->mutex);
	assert(disk->ch_count > 0);
	disk->ch_count--;

	if (disk->ch_count > 0) {
		/* A new channel was created between when message was sent and this function executed */
		pthread_mutex_unlock(&disk->mutex);
		return;
	}

	bdev_rbd_free_channel_resources(disk);
	pthread_mutex_unlock(&disk->mutex);
}

/* io_device channel destructor. Resource teardown must happen on the main
 * thread, so the last destroy may be re-sent there. */
static void
bdev_rbd_destroy_cb(void *io_device, void *ctx_buf)
{
	struct bdev_rbd *disk = io_device;
	struct spdk_thread *thread;

	pthread_mutex_lock(&disk->mutex);
	assert(disk->ch_count > 0);
	disk->ch_count--;
	if (disk->ch_count == 0) {
		assert(disk->main_td != NULL);
		if (disk->main_td != spdk_get_thread()) {
			/* The final channel was destroyed on a different thread
			 * than where the first channel was created. Pass a message
			 * to the main thread to unregister the poller. */
			disk->ch_count++;	/* Re-take the ref; the message drops it. */
			thread = disk->main_td;
			pthread_mutex_unlock(&disk->mutex);
			spdk_thread_send_msg(thread, _bdev_rbd_destroy_cb, disk);
			return;
		}

		bdev_rbd_free_channel_resources(disk);
	}
	pthread_mutex_unlock(&disk->mutex);
}

static struct spdk_io_channel *
bdev_rbd_get_io_channel(void *ctx)
{
	struct bdev_rbd *rbd_bdev = ctx;

	return spdk_get_io_channel(rbd_bdev);
}

/* Write the named cluster's settings into an open JSON object (no-op if
 * the name is not registered). */
static void
bdev_rbd_cluster_dump_entry(const char *cluster_name, struct spdk_json_write_ctx *w)
{
	struct bdev_rbd_cluster *entry;

	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);
	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
		if (strcmp(cluster_name, entry->name)) {
			continue;
		}
		if (entry->user_id) {
			spdk_json_write_named_string(w, "user_id", entry->user_id);
		}

		if (entry->config_param) {
			char **config_entry = entry->config_param;

			spdk_json_write_named_object_begin(w, "config_param");
			while (*config_entry) {
				spdk_json_write_named_string(w, config_entry[0], config_entry[1]);
				config_entry += 2;
			}
			spdk_json_write_object_end(w);
		}
		if (entry->config_file) {
			spdk_json_write_named_string(w, "config_file", entry->config_file);
		}
		if (entry->key_file) {
			spdk_json_write_named_string(w, "key_file", entry->key_file);
		}

		pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
		return;
	}

	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
}

/* fn_table dump_info_json hook: emit this bdev's rbd configuration. */
static int
bdev_rbd_dump_info_json(void *ctx, struct spdk_json_write_ctx *w)
{
	struct bdev_rbd *rbd_bdev = ctx;

	spdk_json_write_named_object_begin(w, "rbd");

	spdk_json_write_named_string(w, "pool_name", rbd_bdev->pool_name);

	spdk_json_write_named_string(w, "rbd_name", rbd_bdev->rbd_name);

	if (rbd_bdev->cluster_name) {
		bdev_rbd_cluster_dump_entry(rbd_bdev->cluster_name, w);
		goto end;
	}
816 if (rbd_bdev->user_id) { 817 spdk_json_write_named_string(w, "user_id", rbd_bdev->user_id); 818 } 819 820 if (rbd_bdev->config) { 821 char **entry = rbd_bdev->config; 822 823 spdk_json_write_named_object_begin(w, "config"); 824 while (*entry) { 825 spdk_json_write_named_string(w, entry[0], entry[1]); 826 entry += 2; 827 } 828 spdk_json_write_object_end(w); 829 } 830 831 end: 832 spdk_json_write_object_end(w); 833 834 return 0; 835 } 836 837 static void 838 bdev_rbd_write_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w) 839 { 840 struct bdev_rbd *rbd = bdev->ctxt; 841 char uuid_str[SPDK_UUID_STRING_LEN]; 842 843 spdk_json_write_object_begin(w); 844 845 spdk_json_write_named_string(w, "method", "bdev_rbd_create"); 846 847 spdk_json_write_named_object_begin(w, "params"); 848 spdk_json_write_named_string(w, "name", bdev->name); 849 spdk_json_write_named_string(w, "pool_name", rbd->pool_name); 850 spdk_json_write_named_string(w, "rbd_name", rbd->rbd_name); 851 spdk_json_write_named_uint32(w, "block_size", bdev->blocklen); 852 if (rbd->user_id) { 853 spdk_json_write_named_string(w, "user_id", rbd->user_id); 854 } 855 856 if (rbd->config) { 857 char **entry = rbd->config; 858 859 spdk_json_write_named_object_begin(w, "config"); 860 while (*entry) { 861 spdk_json_write_named_string(w, entry[0], entry[1]); 862 entry += 2; 863 } 864 spdk_json_write_object_end(w); 865 } 866 867 spdk_uuid_fmt_lower(uuid_str, sizeof(uuid_str), &bdev->uuid); 868 spdk_json_write_named_string(w, "uuid", uuid_str); 869 870 spdk_json_write_object_end(w); 871 872 spdk_json_write_object_end(w); 873 } 874 875 static void 876 dump_single_cluster_entry(struct bdev_rbd_cluster *entry, struct spdk_json_write_ctx *w) 877 { 878 assert(entry != NULL); 879 880 spdk_json_write_object_begin(w); 881 spdk_json_write_named_string(w, "cluster_name", entry->name); 882 883 if (entry->user_id) { 884 spdk_json_write_named_string(w, "user_id", entry->user_id); 885 } 886 887 if (entry->config_param) 
{ 888 char **config_entry = entry->config_param; 889 890 spdk_json_write_named_object_begin(w, "config_param"); 891 while (*config_entry) { 892 spdk_json_write_named_string(w, config_entry[0], config_entry[1]); 893 config_entry += 2; 894 } 895 spdk_json_write_object_end(w); 896 } 897 if (entry->config_file) { 898 spdk_json_write_named_string(w, "config_file", entry->config_file); 899 } 900 if (entry->key_file) { 901 spdk_json_write_named_string(w, "key_file", entry->key_file); 902 } 903 904 spdk_json_write_object_end(w); 905 } 906 907 int 908 bdev_rbd_get_clusters_info(struct spdk_jsonrpc_request *request, const char *name) 909 { 910 struct bdev_rbd_cluster *entry; 911 struct spdk_json_write_ctx *w; 912 913 pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex); 914 915 if (STAILQ_EMPTY(&g_map_bdev_rbd_cluster)) { 916 pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex); 917 return -ENOENT; 918 } 919 920 /* If cluster name is provided */ 921 if (name) { 922 STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) { 923 if (strcmp(name, entry->name) == 0) { 924 w = spdk_jsonrpc_begin_result(request); 925 dump_single_cluster_entry(entry, w); 926 spdk_jsonrpc_end_result(request, w); 927 928 pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex); 929 return 0; 930 } 931 } 932 933 pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex); 934 return -ENOENT; 935 } 936 937 w = spdk_jsonrpc_begin_result(request); 938 spdk_json_write_array_begin(w); 939 STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) { 940 dump_single_cluster_entry(entry, w); 941 } 942 spdk_json_write_array_end(w); 943 spdk_jsonrpc_end_result(request, w); 944 pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex); 945 946 return 0; 947 } 948 949 static const struct spdk_bdev_fn_table rbd_fn_table = { 950 .destruct = bdev_rbd_destruct, 951 .submit_request = bdev_rbd_submit_request, 952 .io_type_supported = bdev_rbd_io_type_supported, 953 .get_io_channel = bdev_rbd_get_io_channel, 954 .dump_info_json = 
bdev_rbd_dump_info_json, 955 .write_config_json = bdev_rbd_write_config_json, 956 }; 957 958 static int 959 rbd_register_cluster(const char *name, const char *user_id, const char *const *config_param, 960 const char *config_file, const char *key_file) 961 { 962 struct bdev_rbd_cluster *entry; 963 int rc; 964 965 pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex); 966 STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) { 967 if (strcmp(name, entry->name) == 0) { 968 SPDK_ERRLOG("Cluster name=%s already exists\n", name); 969 pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex); 970 return -1; 971 } 972 } 973 974 entry = calloc(1, sizeof(*entry)); 975 if (!entry) { 976 SPDK_ERRLOG("Cannot allocate an entry for name=%s\n", name); 977 pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex); 978 return -1; 979 } 980 981 entry->name = strdup(name); 982 if (entry->name == NULL) { 983 SPDK_ERRLOG("Failed to save the name =%s on entry =%p\n", name, entry); 984 goto err_handle; 985 } 986 987 if (user_id) { 988 entry->user_id = strdup(user_id); 989 if (entry->user_id == NULL) { 990 SPDK_ERRLOG("Failed to save the str =%s on entry =%p\n", user_id, entry); 991 goto err_handle; 992 } 993 } 994 995 /* Support specify config_param or config_file separately, or both of them. 
*/ 996 if (config_param) { 997 entry->config_param = bdev_rbd_dup_config(config_param); 998 if (entry->config_param == NULL) { 999 SPDK_ERRLOG("Failed to save the config_param=%p on entry = %p\n", config_param, entry); 1000 goto err_handle; 1001 } 1002 } 1003 1004 if (config_file) { 1005 entry->config_file = strdup(config_file); 1006 if (entry->config_file == NULL) { 1007 SPDK_ERRLOG("Failed to save the config_file=%s on entry = %p\n", config_file, entry); 1008 goto err_handle; 1009 } 1010 } 1011 1012 if (key_file) { 1013 entry->key_file = strdup(key_file); 1014 if (entry->key_file == NULL) { 1015 SPDK_ERRLOG("Failed to save the key_file=%s on entry = %p\n", key_file, entry); 1016 goto err_handle; 1017 } 1018 } 1019 1020 rc = rados_create(&entry->cluster, user_id); 1021 if (rc < 0) { 1022 SPDK_ERRLOG("Failed to create rados_t struct\n"); 1023 goto err_handle; 1024 } 1025 1026 /* Try default location when entry->config_file is NULL, but ignore failure when it is NULL */ 1027 rc = rados_conf_read_file(entry->cluster, entry->config_file); 1028 if (entry->config_file && rc < 0) { 1029 SPDK_ERRLOG("Failed to read conf file %s\n", entry->config_file); 1030 rados_shutdown(entry->cluster); 1031 goto err_handle; 1032 } 1033 1034 if (config_param) { 1035 const char *const *config_entry = config_param; 1036 while (*config_entry) { 1037 rc = rados_conf_set(entry->cluster, config_entry[0], config_entry[1]); 1038 if (rc < 0) { 1039 SPDK_ERRLOG("Failed to set %s = %s\n", config_entry[0], config_entry[1]); 1040 rados_shutdown(entry->cluster); 1041 goto err_handle; 1042 } 1043 config_entry += 2; 1044 } 1045 } 1046 1047 if (key_file) { 1048 rc = rados_conf_set(entry->cluster, "keyring", key_file); 1049 if (rc < 0) { 1050 SPDK_ERRLOG("Failed to set keyring = %s\n", key_file); 1051 rados_shutdown(entry->cluster); 1052 goto err_handle; 1053 } 1054 } 1055 1056 rc = rados_connect(entry->cluster); 1057 if (rc < 0) { 1058 SPDK_ERRLOG("Failed to connect to rbd_pool on cluster=%p\n", 
entry->cluster); 1059 rados_shutdown(entry->cluster); 1060 goto err_handle; 1061 } 1062 1063 STAILQ_INSERT_TAIL(&g_map_bdev_rbd_cluster, entry, link); 1064 pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex); 1065 1066 return 0; 1067 1068 err_handle: 1069 bdev_rbd_cluster_free(entry); 1070 pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex); 1071 return -1; 1072 } 1073 1074 int 1075 bdev_rbd_unregister_cluster(const char *name) 1076 { 1077 struct bdev_rbd_cluster *entry; 1078 int rc = 0; 1079 1080 if (name == NULL) { 1081 return -1; 1082 } 1083 1084 pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex); 1085 STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) { 1086 if (strcmp(name, entry->name) == 0) { 1087 if (entry->ref == 0) { 1088 STAILQ_REMOVE(&g_map_bdev_rbd_cluster, entry, bdev_rbd_cluster, link); 1089 rados_shutdown(entry->cluster); 1090 bdev_rbd_cluster_free(entry); 1091 } else { 1092 SPDK_ERRLOG("Cluster with name=%p is still used and we cannot delete it\n", 1093 entry->name); 1094 rc = -1; 1095 } 1096 1097 pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex); 1098 return rc; 1099 } 1100 } 1101 1102 pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex); 1103 1104 SPDK_ERRLOG("Could not find the cluster name =%p\n", name); 1105 1106 return -1; 1107 } 1108 1109 static void * 1110 _bdev_rbd_register_cluster(void *arg) 1111 { 1112 struct cluster_register_info *info = arg; 1113 void *ret = arg; 1114 int rc; 1115 1116 rc = rbd_register_cluster((const char *)info->name, (const char *)info->user_id, 1117 (const char *const *)info->config_param, (const char *)info->config_file, 1118 (const char *)info->key_file); 1119 if (rc) { 1120 ret = NULL; 1121 } 1122 1123 return ret; 1124 } 1125 1126 int 1127 bdev_rbd_register_cluster(struct cluster_register_info *info) 1128 { 1129 assert(info != NULL); 1130 1131 /* Rados cluster info need to be created in non SPDK-thread to avoid CPU 1132 * resource contention */ 1133 if (spdk_call_unaffinitized(_bdev_rbd_register_cluster, info) 
== NULL) { 1134 return -1; 1135 } 1136 1137 return 0; 1138 } 1139 1140 int 1141 bdev_rbd_create(struct spdk_bdev **bdev, const char *name, const char *user_id, 1142 const char *pool_name, 1143 const char *const *config, 1144 const char *rbd_name, 1145 uint32_t block_size, 1146 const char *cluster_name, 1147 const struct spdk_uuid *uuid) 1148 { 1149 struct bdev_rbd *rbd; 1150 int ret; 1151 1152 if ((pool_name == NULL) || (rbd_name == NULL)) { 1153 return -EINVAL; 1154 } 1155 1156 rbd = calloc(1, sizeof(struct bdev_rbd)); 1157 if (rbd == NULL) { 1158 SPDK_ERRLOG("Failed to allocate bdev_rbd struct\n"); 1159 return -ENOMEM; 1160 } 1161 1162 ret = pthread_mutex_init(&rbd->mutex, NULL); 1163 if (ret) { 1164 SPDK_ERRLOG("Cannot init mutex on rbd=%p\n", rbd->disk.name); 1165 free(rbd); 1166 return ret; 1167 } 1168 1169 rbd->rbd_name = strdup(rbd_name); 1170 if (!rbd->rbd_name) { 1171 bdev_rbd_free(rbd); 1172 return -ENOMEM; 1173 } 1174 1175 if (user_id) { 1176 rbd->user_id = strdup(user_id); 1177 if (!rbd->user_id) { 1178 bdev_rbd_free(rbd); 1179 return -ENOMEM; 1180 } 1181 } 1182 1183 if (cluster_name) { 1184 rbd->cluster_name = strdup(cluster_name); 1185 if (!rbd->cluster_name) { 1186 bdev_rbd_free(rbd); 1187 return -ENOMEM; 1188 } 1189 } 1190 rbd->pool_name = strdup(pool_name); 1191 if (!rbd->pool_name) { 1192 bdev_rbd_free(rbd); 1193 return -ENOMEM; 1194 } 1195 1196 if (config && !(rbd->config = bdev_rbd_dup_config(config))) { 1197 bdev_rbd_free(rbd); 1198 return -ENOMEM; 1199 } 1200 1201 ret = bdev_rbd_init(rbd); 1202 if (ret < 0) { 1203 bdev_rbd_free(rbd); 1204 SPDK_ERRLOG("Failed to init rbd device\n"); 1205 return ret; 1206 } 1207 1208 if (uuid) { 1209 rbd->disk.uuid = *uuid; 1210 } else { 1211 spdk_uuid_generate(&rbd->disk.uuid); 1212 } 1213 1214 if (name) { 1215 rbd->disk.name = strdup(name); 1216 } else { 1217 rbd->disk.name = spdk_sprintf_alloc("Ceph%d", bdev_rbd_count); 1218 } 1219 if (!rbd->disk.name) { 1220 bdev_rbd_free(rbd); 1221 return -ENOMEM; 1222 } 
1223 rbd->disk.product_name = "Ceph Rbd Disk"; 1224 bdev_rbd_count++; 1225 1226 rbd->disk.write_cache = 0; 1227 rbd->disk.blocklen = block_size; 1228 rbd->disk.blockcnt = rbd->info.size / rbd->disk.blocklen; 1229 rbd->disk.ctxt = rbd; 1230 rbd->disk.fn_table = &rbd_fn_table; 1231 rbd->disk.module = &rbd_if; 1232 1233 SPDK_NOTICELOG("Add %s rbd disk to lun\n", rbd->disk.name); 1234 1235 spdk_io_device_register(rbd, bdev_rbd_create_cb, 1236 bdev_rbd_destroy_cb, 1237 sizeof(struct bdev_rbd_io_channel), 1238 rbd_name); 1239 ret = spdk_bdev_register(&rbd->disk); 1240 if (ret) { 1241 spdk_io_device_unregister(rbd, NULL); 1242 bdev_rbd_free(rbd); 1243 return ret; 1244 } 1245 1246 *bdev = &(rbd->disk); 1247 1248 return ret; 1249 } 1250 1251 void 1252 bdev_rbd_delete(const char *name, spdk_delete_rbd_complete cb_fn, void *cb_arg) 1253 { 1254 int rc; 1255 1256 rc = spdk_bdev_unregister_by_name(name, &rbd_if, cb_fn, cb_arg); 1257 if (rc != 0) { 1258 cb_fn(cb_arg, rc); 1259 } 1260 } 1261 1262 static void 1263 dummy_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev, void *ctx) 1264 { 1265 } 1266 1267 int 1268 bdev_rbd_resize(const char *name, const uint64_t new_size_in_mb) 1269 { 1270 struct spdk_bdev_desc *desc; 1271 struct spdk_bdev *bdev; 1272 struct spdk_io_channel *ch; 1273 struct bdev_rbd_io_channel *rbd_io_ch; 1274 int rc = 0; 1275 uint64_t new_size_in_byte; 1276 uint64_t current_size_in_mb; 1277 1278 rc = spdk_bdev_open_ext(name, false, dummy_bdev_event_cb, NULL, &desc); 1279 if (rc != 0) { 1280 return rc; 1281 } 1282 1283 bdev = spdk_bdev_desc_get_bdev(desc); 1284 1285 if (bdev->module != &rbd_if) { 1286 rc = -EINVAL; 1287 goto exit; 1288 } 1289 1290 current_size_in_mb = bdev->blocklen * bdev->blockcnt / (1024 * 1024); 1291 if (current_size_in_mb > new_size_in_mb) { 1292 SPDK_ERRLOG("The new bdev size must be larger than current bdev size.\n"); 1293 rc = -EINVAL; 1294 goto exit; 1295 } 1296 1297 ch = bdev_rbd_get_io_channel(bdev); 1298 rbd_io_ch = 
spdk_io_channel_get_ctx(ch); 1299 new_size_in_byte = new_size_in_mb * 1024 * 1024; 1300 1301 rc = rbd_resize(rbd_io_ch->disk->image, new_size_in_byte); 1302 spdk_put_io_channel(ch); 1303 if (rc != 0) { 1304 SPDK_ERRLOG("failed to resize the ceph bdev.\n"); 1305 goto exit; 1306 } 1307 1308 rc = spdk_bdev_notify_blockcnt_change(bdev, new_size_in_byte / bdev->blocklen); 1309 if (rc != 0) { 1310 SPDK_ERRLOG("failed to notify block cnt change.\n"); 1311 } 1312 1313 exit: 1314 spdk_bdev_close(desc); 1315 return rc; 1316 } 1317 1318 static int 1319 bdev_rbd_group_create_cb(void *io_device, void *ctx_buf) 1320 { 1321 return 0; 1322 } 1323 1324 static void 1325 bdev_rbd_group_destroy_cb(void *io_device, void *ctx_buf) 1326 { 1327 } 1328 1329 static int 1330 bdev_rbd_library_init(void) 1331 { 1332 spdk_io_device_register(&rbd_if, bdev_rbd_group_create_cb, bdev_rbd_group_destroy_cb, 1333 0, "bdev_rbd_poll_groups"); 1334 return 0; 1335 } 1336 1337 static void 1338 bdev_rbd_library_fini(void) 1339 { 1340 spdk_io_device_unregister(&rbd_if, NULL); 1341 } 1342 1343 SPDK_LOG_REGISTER_COMPONENT(bdev_rbd) 1344