1 /* SPDX-License-Identifier: BSD-3-Clause 2 * Copyright (C) 2017 Intel Corporation. 3 * All rights reserved. 4 */ 5 6 #include "spdk/stdinc.h" 7 8 #include "bdev_rbd.h" 9 10 #include <rbd/librbd.h> 11 #include <rados/librados.h> 12 13 #include "spdk/env.h" 14 #include "spdk/bdev.h" 15 #include "spdk/thread.h" 16 #include "spdk/json.h" 17 #include "spdk/string.h" 18 #include "spdk/util.h" 19 #include "spdk/likely.h" 20 21 #include "spdk/bdev_module.h" 22 #include "spdk/log.h" 23 24 static int bdev_rbd_count = 0; 25 26 struct bdev_rbd { 27 struct spdk_bdev disk; 28 char *rbd_name; 29 char *user_id; 30 char *pool_name; 31 char **config; 32 33 rados_t cluster; 34 rados_t *cluster_p; 35 char *cluster_name; 36 37 rados_ioctx_t io_ctx; 38 rbd_image_t image; 39 40 rbd_image_info_t info; 41 struct spdk_thread *main_td; 42 struct spdk_thread *destruct_td; 43 44 TAILQ_ENTRY(bdev_rbd) tailq; 45 struct spdk_poller *reset_timer; 46 struct spdk_bdev_io *reset_bdev_io; 47 }; 48 49 struct bdev_rbd_io_channel { 50 struct bdev_rbd *disk; 51 struct spdk_io_channel *group_ch; 52 }; 53 54 struct bdev_rbd_io { 55 struct spdk_thread *submit_td; 56 enum spdk_bdev_io_status status; 57 rbd_completion_t comp; 58 size_t total_len; 59 }; 60 61 struct bdev_rbd_cluster { 62 char *name; 63 char *user_id; 64 char **config_param; 65 char *config_file; 66 char *key_file; 67 rados_t cluster; 68 uint32_t ref; 69 STAILQ_ENTRY(bdev_rbd_cluster) link; 70 }; 71 72 static STAILQ_HEAD(, bdev_rbd_cluster) g_map_bdev_rbd_cluster = STAILQ_HEAD_INITIALIZER( 73 g_map_bdev_rbd_cluster); 74 static pthread_mutex_t g_map_bdev_rbd_cluster_mutex = PTHREAD_MUTEX_INITIALIZER; 75 76 static void 77 bdev_rbd_cluster_free(struct bdev_rbd_cluster *entry) 78 { 79 assert(entry != NULL); 80 81 bdev_rbd_free_config(entry->config_param); 82 free(entry->config_file); 83 free(entry->key_file); 84 free(entry->user_id); 85 free(entry->name); 86 free(entry); 87 } 88 89 static void 90 bdev_rbd_put_cluster(rados_t **cluster) 91 { 92 struct bdev_rbd_cluster *entry; 93 94 assert(cluster != NULL); 95 96 /* No need go through the map if *cluster equals to NULL */ 97 if (*cluster == NULL) { 98 return; 99 } 100 101 pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex); 102 STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) { 103 if (*cluster != &entry->cluster) { 104 continue; 105 } 106 107 assert(entry->ref > 0); 108 entry->ref--; 109 *cluster = NULL; 110 pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex); 111 return; 112 } 113 114 pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex); 115 SPDK_ERRLOG("Cannot find the entry for cluster=%p\n", cluster); 116 } 117 118 static void 119 bdev_rbd_free(struct bdev_rbd *rbd) 120 { 121 if (!rbd) { 122 return; 123 } 124 125 if (rbd->image) { 126 rbd_flush(rbd->image); 127 rbd_close(rbd->image); 128 } 129 130 free(rbd->disk.name); 131 free(rbd->rbd_name); 132 free(rbd->user_id); 133 free(rbd->pool_name); 134 bdev_rbd_free_config(rbd->config); 135 136 if (rbd->io_ctx) { 137 rados_ioctx_destroy(rbd->io_ctx); 138 } 139 140 if (rbd->cluster_name) { 141 bdev_rbd_put_cluster(&rbd->cluster_p); 142 free(rbd->cluster_name); 143 } else if (rbd->cluster) { 144 rados_shutdown(rbd->cluster); 145 } 146 147 free(rbd); 148 } 149 150 void 151 bdev_rbd_free_config(char **config) 152 { 153 char **entry; 154 155 if (config) { 156 for (entry = config; *entry; entry++) { 157 free(*entry); 158 } 159 free(config); 160 } 161 } 162 163 char ** 164 bdev_rbd_dup_config(const char *const *config) 165 { 166 size_t count; 167 char **copy; 168 169 if (!config) { 170 return NULL; 171 } 172 for (count = 0; config[count]; count++) {} 173 copy = calloc(count + 1, sizeof(*copy)); 174 if (!copy) { 175 return NULL; 176 } 177 for (count = 0; config[count]; count++) { 178 if (!(copy[count] = strdup(config[count]))) { 179 bdev_rbd_free_config(copy); 180 return NULL; 181 } 182 } 183 return copy; 184 } 185 186 static int 187 bdev_rados_cluster_init(const char *user_id, const char *const *config, 188 rados_t *cluster) 189 { 190 int ret; 191 192 ret = rados_create(cluster, user_id); 193 if (ret < 0) { 194 SPDK_ERRLOG("Failed to create rados_t struct\n"); 195 return -1; 196 } 197 198 if (config) { 199 const char *const *entry = config; 200 while (*entry) { 201 ret = rados_conf_set(*cluster, entry[0], entry[1]); 202 if (ret < 0) { 203 SPDK_ERRLOG("Failed to set %s = %s\n", entry[0], entry[1]); 204 rados_shutdown(*cluster); 205 *cluster = NULL; 206 return -1; 207 } 208 entry += 2; 209 } 210 } else { 211 ret = rados_conf_read_file(*cluster, NULL); 212 if (ret < 0) { 213 SPDK_ERRLOG("Failed to read conf file\n"); 214 rados_shutdown(*cluster); 215 *cluster = NULL; 216 return -1; 217 } 218 } 219 220 ret = rados_connect(*cluster); 221 if (ret < 0) { 222 SPDK_ERRLOG("Failed to connect to rbd_pool\n"); 223 rados_shutdown(*cluster); 224 *cluster = NULL; 225 return -1; 226 } 227 228 return 0; 229 } 230 231 static int 232 bdev_rbd_get_cluster(const char *cluster_name, rados_t **cluster) 233 { 234 struct bdev_rbd_cluster *entry; 235 236 if (cluster == NULL) { 237 SPDK_ERRLOG("cluster should not be NULL\n"); 238 return -1; 239 } 240 241 pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex); 242 STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) { 243 if (strcmp(cluster_name, entry->name) == 0) { 244 entry->ref++; 245 *cluster = &entry->cluster; 246 pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex); 247 return 0; 248 } 249 } 250 251 pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex); 252 return -1; 253 } 254 255 static int 256 bdev_rbd_shared_cluster_init(const char *cluster_name, rados_t **cluster) 257 { 258 int ret; 259 260 ret = bdev_rbd_get_cluster(cluster_name, cluster); 261 if (ret < 0) { 262 SPDK_ERRLOG("Failed to create rados_t struct\n"); 263 return -1; 264 } 265 266 return ret; 267 } 268 269 static void * 270 bdev_rbd_cluster_handle(void *arg) 271 { 272 void *ret = arg; 273 struct bdev_rbd *rbd = arg; 274 int rc; 275 276 rc = bdev_rados_cluster_init(rbd->user_id, (const char *const *)rbd->config, 277 &rbd->cluster); 278 if (rc < 0) { 279 SPDK_ERRLOG("Failed to create rados cluster for user_id=%s and rbd_pool=%s\n", 280 rbd->user_id ? rbd->user_id : "admin (the default)", rbd->pool_name); 281 ret = NULL; 282 } 283 284 return ret; 285 } 286 287 static void * 288 bdev_rbd_init_context(void *arg) 289 { 290 struct bdev_rbd *rbd = arg; 291 int rc; 292 293 if (rados_ioctx_create(*(rbd->cluster_p), rbd->pool_name, &rbd->io_ctx) < 0) { 294 SPDK_ERRLOG("Failed to create ioctx on rbd=%p\n", rbd); 295 return NULL; 296 } 297 298 rc = rbd_open(rbd->io_ctx, rbd->rbd_name, &rbd->image, NULL); 299 if (rc < 0) { 300 SPDK_ERRLOG("Failed to open specified rbd device\n"); 301 return NULL; 302 } 303 304 rc = rbd_stat(rbd->image, &rbd->info, sizeof(rbd->info)); 305 if (rc < 0) { 306 SPDK_ERRLOG("Failed to stat specified rbd device\n"); 307 return NULL; 308 } 309 310 return arg; 311 } 312 313 static int 314 bdev_rbd_init(struct bdev_rbd *rbd) 315 { 316 int ret = 0; 317 318 if (!rbd->cluster_name) { 319 rbd->cluster_p = &rbd->cluster; 320 /* Cluster should be created in non-SPDK thread to avoid conflict between 321 * Rados and SPDK thread */ 322 if (spdk_call_unaffinitized(bdev_rbd_cluster_handle, rbd) == NULL) { 323 SPDK_ERRLOG("Cannot create the rados object on rbd=%p\n", rbd); 324 return -1; 325 } 326 } else { 327 ret = bdev_rbd_shared_cluster_init(rbd->cluster_name, &rbd->cluster_p); 328 if (ret < 0) { 329 SPDK_ERRLOG("Failed to create rados object for rbd =%p on cluster_name=%s\n", 330 rbd, rbd->cluster_name); 331 return -1; 332 } 333 } 334 335 if (spdk_call_unaffinitized(bdev_rbd_init_context, rbd) == NULL) { 336 SPDK_ERRLOG("Cannot init rbd context for rbd=%p\n", rbd); 337 return -1; 338 } 339 340 rbd->main_td = spdk_get_thread(); 341 342 return ret; 343 } 344 345 static void 346 _bdev_rbd_io_complete(void *_rbd_io) 347 { 348 struct bdev_rbd_io *rbd_io = _rbd_io; 349 350 spdk_bdev_io_complete(spdk_bdev_io_from_ctx(rbd_io), rbd_io->status); 351 } 352 353 static void 354 bdev_rbd_io_complete(struct spdk_bdev_io *bdev_io, enum spdk_bdev_io_status status) 355 { 356 struct bdev_rbd_io *rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx; 357 struct spdk_thread *current_thread = spdk_get_thread(); 358 359 rbd_io->status = status; 360 assert(rbd_io->submit_td != NULL); 361 if (rbd_io->submit_td != current_thread) { 362 spdk_thread_send_msg(rbd_io->submit_td, _bdev_rbd_io_complete, rbd_io); 363 } else { 364 _bdev_rbd_io_complete(rbd_io); 365 } 366 } 367 368 static void 369 bdev_rbd_finish_aiocb(rbd_completion_t cb, void *arg) 370 { 371 int io_status; 372 struct spdk_bdev_io *bdev_io; 373 struct bdev_rbd_io *rbd_io; 374 enum spdk_bdev_io_status bio_status; 375 376 bdev_io = rbd_aio_get_arg(cb); 377 rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx; 378 io_status = rbd_aio_get_return_value(cb); 379 bio_status = SPDK_BDEV_IO_STATUS_SUCCESS; 380 381 if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) { 382 if ((int)rbd_io->total_len != io_status) { 383 bio_status = SPDK_BDEV_IO_STATUS_FAILED; 384 } 385 } else { 386 /* For others, 0 means success */ 387 if (io_status != 0) { 388 bio_status = SPDK_BDEV_IO_STATUS_FAILED; 389 } 390 } 391 392 rbd_aio_release(cb); 393 394 bdev_rbd_io_complete(bdev_io, bio_status); 395 } 396 397 static void 398 _bdev_rbd_start_aio(struct bdev_rbd *disk, struct spdk_bdev_io *bdev_io, 399 struct iovec *iov, int iovcnt, uint64_t offset, size_t len) 400 { 401 int ret; 402 struct bdev_rbd_io *rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx; 403 rbd_image_t image = disk->image; 404 405 ret = rbd_aio_create_completion(bdev_io, bdev_rbd_finish_aiocb, 406 &rbd_io->comp); 407 if (ret < 0) { 408 goto err; 409 } 410 411 if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) { 412 rbd_io->total_len = len; 413 if (spdk_likely(iovcnt == 1)) { 414 ret = rbd_aio_read(image, offset, iov[0].iov_len, iov[0].iov_base, rbd_io->comp); 415 } else { 416 ret = rbd_aio_readv(image, iov, iovcnt, offset, rbd_io->comp); 417 } 418 } else if (bdev_io->type == SPDK_BDEV_IO_TYPE_WRITE) { 419 if (spdk_likely(iovcnt == 1)) { 420 ret = rbd_aio_write(image, offset, iov[0].iov_len, iov[0].iov_base, rbd_io->comp); 421 } else { 422 ret = rbd_aio_writev(image, iov, iovcnt, offset, rbd_io->comp); 423 } 424 } else if (bdev_io->type == SPDK_BDEV_IO_TYPE_UNMAP) { 425 ret = rbd_aio_discard(image, offset, len, rbd_io->comp); 426 } else if (bdev_io->type == SPDK_BDEV_IO_TYPE_FLUSH) { 427 ret = rbd_aio_flush(image, rbd_io->comp); 428 } else if (bdev_io->type == SPDK_BDEV_IO_TYPE_WRITE_ZEROES) { 429 ret = rbd_aio_write_zeroes(image, offset, len, rbd_io->comp, /* zero_flags */ 0, /* op_flags */ 0); 430 } 431 432 if (ret < 0) { 433 rbd_aio_release(rbd_io->comp); 434 goto err; 435 } 436 437 return; 438 439 err: 440 bdev_rbd_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 441 } 442 443 static void 444 bdev_rbd_start_aio(void *ctx) 445 { 446 struct spdk_bdev_io *bdev_io = ctx; 447 struct bdev_rbd *disk = (struct bdev_rbd *)bdev_io->bdev->ctxt; 448 449 _bdev_rbd_start_aio(disk, 450 bdev_io, 451 bdev_io->u.bdev.iovs, 452 bdev_io->u.bdev.iovcnt, 453 bdev_io->u.bdev.offset_blocks * bdev_io->bdev->blocklen, 454 bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen); 455 } 456 457 static int bdev_rbd_library_init(void); 458 static void bdev_rbd_library_fini(void); 459 460 static int 461 bdev_rbd_get_ctx_size(void) 462 { 463 return sizeof(struct bdev_rbd_io); 464 } 465 466 static struct spdk_bdev_module rbd_if = { 467 .name = "rbd", 468 .module_init = bdev_rbd_library_init, 469 .module_fini = bdev_rbd_library_fini, 470 .get_ctx_size = bdev_rbd_get_ctx_size, 471 472 }; 473 SPDK_BDEV_MODULE_REGISTER(rbd, &rbd_if) 474 475 static int bdev_rbd_reset_timer(void *arg); 476 477 static void 478 bdev_rbd_check_outstanding_ios(struct spdk_bdev *bdev, uint64_t current_qd, 479 void *cb_arg, int rc) 480 { 481 struct bdev_rbd *disk = cb_arg; 482 enum spdk_bdev_io_status bio_status; 483 484 if (rc == 0 && current_qd > 0) { 485 disk->reset_timer = SPDK_POLLER_REGISTER(bdev_rbd_reset_timer, disk, 1000); 486 return; 487 } 488 489 if (rc != 0) { 490 bio_status = SPDK_BDEV_IO_STATUS_FAILED; 491 } else { 492 bio_status = SPDK_BDEV_IO_STATUS_SUCCESS; 493 } 494 495 bdev_rbd_io_complete(disk->reset_bdev_io, bio_status); 496 disk->reset_bdev_io = NULL; 497 } 498 499 static int 500 bdev_rbd_reset_timer(void *arg) 501 { 502 struct bdev_rbd *disk = arg; 503 504 spdk_poller_unregister(&disk->reset_timer); 505 506 spdk_bdev_get_current_qd(&disk->disk, bdev_rbd_check_outstanding_ios, disk); 507 508 return SPDK_POLLER_BUSY; 509 } 510 511 static void 512 bdev_rbd_reset(void *ctx) 513 { 514 struct spdk_bdev_io *bdev_io = ctx; 515 struct bdev_rbd *disk = (struct bdev_rbd *)bdev_io->bdev->ctxt; 516 517 /* 518 * HACK: Since librbd doesn't provide any way to cancel outstanding aio, just kick off a 519 * poller to wait for in-flight I/O to complete. 520 */ 521 assert(disk->reset_bdev_io == NULL); 522 disk->reset_bdev_io = bdev_io; 523 524 bdev_rbd_reset_timer(disk); 525 } 526 527 static void 528 _bdev_rbd_destruct_done(void *io_device) 529 { 530 struct bdev_rbd *rbd = io_device; 531 532 assert(rbd != NULL); 533 534 spdk_bdev_destruct_done(&rbd->disk, 0); 535 bdev_rbd_free(rbd); 536 } 537 538 static void 539 bdev_rbd_free_cb(void *io_device) 540 { 541 struct bdev_rbd *rbd = io_device; 542 543 /* The io device has been unregistered. Send a message back to the 544 * original thread that started the destruct operation, so that the 545 * bdev unregister callback is invoked on the same thread that started 546 * this whole process. 547 */ 548 spdk_thread_send_msg(rbd->destruct_td, _bdev_rbd_destruct_done, rbd); 549 } 550 551 static void 552 _bdev_rbd_destruct(void *ctx) 553 { 554 struct bdev_rbd *rbd = ctx; 555 556 spdk_io_device_unregister(rbd, bdev_rbd_free_cb); 557 } 558 559 static int 560 bdev_rbd_destruct(void *ctx) 561 { 562 struct bdev_rbd *rbd = ctx; 563 struct spdk_thread *td; 564 565 if (rbd->main_td == NULL) { 566 td = spdk_get_thread(); 567 } else { 568 td = rbd->main_td; 569 } 570 571 /* Start the destruct operation on the rbd bdev's 572 * main thread. This guarantees it will only start 573 * executing after any messages related to channel 574 * deletions have finished completing. *Always* 575 * send a message, even if this function gets called 576 * from the main thread, in case there are pending 577 * channel delete messages in flight to this thread. 578 */ 579 assert(rbd->destruct_td == NULL); 580 rbd->destruct_td = td; 581 spdk_thread_send_msg(td, _bdev_rbd_destruct, rbd); 582 583 /* Return 1 to indicate the destruct path is asynchronous. */ 584 return 1; 585 } 586 587 static void 588 bdev_rbd_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io, 589 bool success) 590 { 591 if (!success) { 592 bdev_rbd_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 593 return; 594 } 595 596 bdev_rbd_start_aio(bdev_io); 597 } 598 599 static void 600 bdev_rbd_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io) 601 { 602 struct spdk_thread *submit_td = spdk_io_channel_get_thread(ch); 603 struct bdev_rbd_io *rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx; 604 struct bdev_rbd *disk = (struct bdev_rbd *)bdev_io->bdev->ctxt; 605 606 rbd_io->submit_td = submit_td; 607 switch (bdev_io->type) { 608 case SPDK_BDEV_IO_TYPE_READ: 609 spdk_bdev_io_get_buf(bdev_io, bdev_rbd_get_buf_cb, 610 bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen); 611 break; 612 613 case SPDK_BDEV_IO_TYPE_WRITE: 614 case SPDK_BDEV_IO_TYPE_UNMAP: 615 case SPDK_BDEV_IO_TYPE_FLUSH: 616 case SPDK_BDEV_IO_TYPE_WRITE_ZEROES: 617 bdev_rbd_start_aio(bdev_io); 618 break; 619 620 case SPDK_BDEV_IO_TYPE_RESET: 621 spdk_thread_exec_msg(disk->main_td, bdev_rbd_reset, bdev_io); 622 break; 623 624 default: 625 SPDK_ERRLOG("Unsupported IO type =%d\n", bdev_io->type); 626 bdev_rbd_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 627 break; 628 } 629 } 630 631 static bool 632 bdev_rbd_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type) 633 { 634 switch (io_type) { 635 case SPDK_BDEV_IO_TYPE_READ: 636 case SPDK_BDEV_IO_TYPE_WRITE: 637 case SPDK_BDEV_IO_TYPE_UNMAP: 638 case SPDK_BDEV_IO_TYPE_FLUSH: 639 case SPDK_BDEV_IO_TYPE_RESET: 640 case SPDK_BDEV_IO_TYPE_WRITE_ZEROES: 641 return true; 642 643 default: 644 return false; 645 } 646 } 647 648 static int 649 bdev_rbd_create_cb(void *io_device, void *ctx_buf) 650 { 651 struct bdev_rbd_io_channel *ch = ctx_buf; 652 struct bdev_rbd *disk = io_device; 653 654 ch->disk = disk; 655 ch->group_ch = spdk_get_io_channel(&rbd_if); 656 assert(ch->group_ch != NULL); 657 658 return 0; 659 } 660 661 static void 662 bdev_rbd_destroy_cb(void *io_device, void *ctx_buf) 663 { 664 struct bdev_rbd_io_channel *ch = ctx_buf; 665 666 spdk_put_io_channel(ch->group_ch); 667 } 668 669 static struct spdk_io_channel * 670 bdev_rbd_get_io_channel(void *ctx) 671 { 672 struct bdev_rbd *rbd_bdev = ctx; 673 674 return spdk_get_io_channel(rbd_bdev); 675 } 676 677 static void 678 bdev_rbd_cluster_dump_entry(const char *cluster_name, struct spdk_json_write_ctx *w) 679 { 680 struct bdev_rbd_cluster *entry; 681 682 pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex); 683 STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) { 684 if (strcmp(cluster_name, entry->name)) { 685 continue; 686 } 687 if (entry->user_id) { 688 spdk_json_write_named_string(w, "user_id", entry->user_id); 689 } 690 691 if (entry->config_param) { 692 char **config_entry = entry->config_param; 693 694 spdk_json_write_named_object_begin(w, "config_param"); 695 while (*config_entry) { 696 spdk_json_write_named_string(w, config_entry[0], config_entry[1]); 697 config_entry += 2; 698 } 699 spdk_json_write_object_end(w); 700 } 701 if (entry->config_file) { 702 spdk_json_write_named_string(w, "config_file", entry->config_file); 703 } 704 if (entry->key_file) { 705 spdk_json_write_named_string(w, "key_file", entry->key_file); 706 } 707 708 pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex); 709 return; 710 } 711 712 pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex); 713 } 714 715 static int 716 bdev_rbd_dump_info_json(void *ctx, struct spdk_json_write_ctx *w) 717 { 718 struct bdev_rbd *rbd_bdev = ctx; 719 720 spdk_json_write_named_object_begin(w, "rbd"); 721 722 spdk_json_write_named_string(w, "pool_name", rbd_bdev->pool_name); 723 724 spdk_json_write_named_string(w, "rbd_name", rbd_bdev->rbd_name); 725 726 if (rbd_bdev->cluster_name) { 727 bdev_rbd_cluster_dump_entry(rbd_bdev->cluster_name, w); 728 goto end; 729 } 730 731 if (rbd_bdev->user_id) { 732 spdk_json_write_named_string(w, "user_id", rbd_bdev->user_id); 733 } 734 735 if (rbd_bdev->config) { 736 char **entry = rbd_bdev->config; 737 738 spdk_json_write_named_object_begin(w, "config"); 739 while (*entry) { 740 spdk_json_write_named_string(w, entry[0], entry[1]); 741 entry += 2; 742 } 743 spdk_json_write_object_end(w); 744 } 745 746 end: 747 spdk_json_write_object_end(w); 748 749 return 0; 750 } 751 752 static void 753 bdev_rbd_write_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w) 754 { 755 struct bdev_rbd *rbd = bdev->ctxt; 756 char uuid_str[SPDK_UUID_STRING_LEN]; 757 758 spdk_json_write_object_begin(w); 759 760 spdk_json_write_named_string(w, "method", "bdev_rbd_create"); 761 762 spdk_json_write_named_object_begin(w, "params"); 763 spdk_json_write_named_string(w, "name", bdev->name); 764 spdk_json_write_named_string(w, "pool_name", rbd->pool_name); 765 spdk_json_write_named_string(w, "rbd_name", rbd->rbd_name); 766 spdk_json_write_named_uint32(w, "block_size", bdev->blocklen); 767 if (rbd->user_id) { 768 spdk_json_write_named_string(w, "user_id", rbd->user_id); 769 } 770 771 if (rbd->config) { 772 char **entry = rbd->config; 773 774 spdk_json_write_named_object_begin(w, "config"); 775 while (*entry) { 776 spdk_json_write_named_string(w, entry[0], entry[1]); 777 entry += 2; 778 } 779 spdk_json_write_object_end(w); 780 } 781 782 spdk_uuid_fmt_lower(uuid_str, sizeof(uuid_str), &bdev->uuid); 783 spdk_json_write_named_string(w, "uuid", uuid_str); 784 785 spdk_json_write_object_end(w); 786 787 spdk_json_write_object_end(w); 788 } 789 790 static void 791 dump_single_cluster_entry(struct bdev_rbd_cluster *entry, struct spdk_json_write_ctx *w) 792 { 793 assert(entry != NULL); 794 795 spdk_json_write_object_begin(w); 796 spdk_json_write_named_string(w, "cluster_name", entry->name); 797 798 if (entry->user_id) { 799 spdk_json_write_named_string(w, "user_id", entry->user_id); 800 } 801 802 if (entry->config_param) { 803 char **config_entry = entry->config_param; 804 805 spdk_json_write_named_object_begin(w, "config_param"); 806 while (*config_entry) { 807 spdk_json_write_named_string(w, config_entry[0], config_entry[1]); 808 config_entry += 2; 809 } 810 spdk_json_write_object_end(w); 811 } 812 if (entry->config_file) { 813 spdk_json_write_named_string(w, "config_file", entry->config_file); 814 } 815 if (entry->key_file) { 816 spdk_json_write_named_string(w, "key_file", entry->key_file); 817 } 818 819 spdk_json_write_object_end(w); 820 } 821 822 int 823 bdev_rbd_get_clusters_info(struct spdk_jsonrpc_request *request, const char *name) 824 { 825 struct bdev_rbd_cluster *entry; 826 struct spdk_json_write_ctx *w; 827 828 pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex); 829 830 if (STAILQ_EMPTY(&g_map_bdev_rbd_cluster)) { 831 pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex); 832 return -ENOENT; 833 } 834 835 /* If cluster name is provided */ 836 if (name) { 837 STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) { 838 if (strcmp(name, entry->name) == 0) { 839 w = spdk_jsonrpc_begin_result(request); 840 dump_single_cluster_entry(entry, w); 841 spdk_jsonrpc_end_result(request, w); 842 843 pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex); 844 return 0; 845 } 846 } 847 848 pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex); 849 return -ENOENT; 850 } 851 852 w = spdk_jsonrpc_begin_result(request); 853 spdk_json_write_array_begin(w); 854 STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) { 855 dump_single_cluster_entry(entry, w); 856 } 857 spdk_json_write_array_end(w); 858 spdk_jsonrpc_end_result(request, w); 859 pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex); 860 861 return 0; 862 } 863 864 static const struct spdk_bdev_fn_table rbd_fn_table = { 865 .destruct = bdev_rbd_destruct, 866 .submit_request = bdev_rbd_submit_request, 867 .io_type_supported = bdev_rbd_io_type_supported, 868 .get_io_channel = bdev_rbd_get_io_channel, 869 .dump_info_json = bdev_rbd_dump_info_json, 870 .write_config_json = bdev_rbd_write_config_json, 871 }; 872 873 static int 874 rbd_register_cluster(const char *name, const char *user_id, const char *const *config_param, 875 const char *config_file, const char *key_file) 876 { 877 struct bdev_rbd_cluster *entry; 878 int rc; 879 880 pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex); 881 STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) { 882 if (strcmp(name, entry->name) == 0) { 883 SPDK_ERRLOG("Cluster name=%s already exists\n", name); 884 pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex); 885 return -1; 886 } 887 } 888 889 entry = calloc(1, sizeof(*entry)); 890 if (!entry) { 891 SPDK_ERRLOG("Cannot allocate an entry for name=%s\n", name); 892 pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex); 893 return -1; 894 } 895 896 entry->name = strdup(name); 897 if (entry->name == NULL) { 898 SPDK_ERRLOG("Failed to save the name =%s on entry =%p\n", name, entry); 899 goto err_handle; 900 } 901 902 if (user_id) { 903 entry->user_id = strdup(user_id); 904 if (entry->user_id == NULL) { 905 SPDK_ERRLOG("Failed to save the str =%s on entry =%p\n", user_id, entry); 906 goto err_handle; 907 } 908 } 909 910 /* Support specify config_param or config_file separately, or both of them. */ 911 if (config_param) { 912 entry->config_param = bdev_rbd_dup_config(config_param); 913 if (entry->config_param == NULL) { 914 SPDK_ERRLOG("Failed to save the config_param=%p on entry = %p\n", config_param, entry); 915 goto err_handle; 916 } 917 } 918 919 if (config_file) { 920 entry->config_file = strdup(config_file); 921 if (entry->config_file == NULL) { 922 SPDK_ERRLOG("Failed to save the config_file=%s on entry = %p\n", config_file, entry); 923 goto err_handle; 924 } 925 } 926 927 if (key_file) { 928 entry->key_file = strdup(key_file); 929 if (entry->key_file == NULL) { 930 SPDK_ERRLOG("Failed to save the key_file=%s on entry = %p\n", key_file, entry); 931 goto err_handle; 932 } 933 } 934 935 rc = rados_create(&entry->cluster, user_id); 936 if (rc < 0) { 937 SPDK_ERRLOG("Failed to create rados_t struct\n"); 938 goto err_handle; 939 } 940 941 /* Try default location when entry->config_file is NULL, but ignore failure when it is NULL */ 942 rc = rados_conf_read_file(entry->cluster, entry->config_file); 943 if (entry->config_file && rc < 0) { 944 SPDK_ERRLOG("Failed to read conf file %s\n", entry->config_file); 945 rados_shutdown(entry->cluster); 946 goto err_handle; 947 } 948 949 if (config_param) { 950 const char *const *config_entry = config_param; 951 while (*config_entry) { 952 rc = rados_conf_set(entry->cluster, config_entry[0], config_entry[1]); 953 if (rc < 0) { 954 SPDK_ERRLOG("Failed to set %s = %s\n", config_entry[0], config_entry[1]); 955 rados_shutdown(entry->cluster); 956 goto err_handle; 957 } 958 config_entry += 2; 959 } 960 } 961 962 if (key_file) { 963 rc = rados_conf_set(entry->cluster, "keyring", key_file); 964 if (rc < 0) { 965 SPDK_ERRLOG("Failed to set keyring = %s\n", key_file); 966 rados_shutdown(entry->cluster); 967 goto err_handle; 968 } 969 } 970 971 rc = rados_connect(entry->cluster); 972 if (rc < 0) { 973 SPDK_ERRLOG("Failed to connect to rbd_pool on cluster=%p\n", entry->cluster); 974 rados_shutdown(entry->cluster); 975 goto err_handle; 976 } 977 978 STAILQ_INSERT_TAIL(&g_map_bdev_rbd_cluster, entry, link); 979 pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex); 980 981 return 0; 982 983 err_handle: 984 bdev_rbd_cluster_free(entry); 985 pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex); 986 return -1; 987 } 988 989 int 990 bdev_rbd_unregister_cluster(const char *name) 991 { 992 struct bdev_rbd_cluster *entry; 993 int rc = 0; 994 995 if (name == NULL) { 996 return -1; 997 } 998 999 pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex); 1000 STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) { 1001 if (strcmp(name, entry->name) == 0) { 1002 if (entry->ref == 0) { 1003 STAILQ_REMOVE(&g_map_bdev_rbd_cluster, entry, bdev_rbd_cluster, link); 1004 rados_shutdown(entry->cluster); 1005 bdev_rbd_cluster_free(entry); 1006 } else { 1007 SPDK_ERRLOG("Cluster with name=%p is still used and we cannot delete it\n", 1008 entry->name); 1009 rc = -1; 1010 } 1011 1012 pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex); 1013 return rc; 1014 } 1015 } 1016 1017 pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex); 1018 1019 SPDK_ERRLOG("Could not find the cluster name =%p\n", name); 1020 1021 return -1; 1022 } 1023 1024 static void * 1025 _bdev_rbd_register_cluster(void *arg) 1026 { 1027 struct cluster_register_info *info = arg; 1028 void *ret = arg; 1029 int rc; 1030 1031 rc = rbd_register_cluster((const char *)info->name, (const char *)info->user_id, 1032 (const char *const *)info->config_param, (const char *)info->config_file, 1033 (const char *)info->key_file); 1034 if (rc) { 1035 ret = NULL; 1036 } 1037 1038 return ret; 1039 } 1040 1041 int 1042 bdev_rbd_register_cluster(struct cluster_register_info *info) 1043 { 1044 assert(info != NULL); 1045 1046 /* Rados cluster info need to be created in non SPDK-thread to avoid CPU 1047 * resource contention */ 1048 if (spdk_call_unaffinitized(_bdev_rbd_register_cluster, info) == NULL) { 1049 return -1; 1050 } 1051 1052 return 0; 1053 } 1054 1055 int 1056 bdev_rbd_create(struct spdk_bdev **bdev, const char *name, const char *user_id, 1057 const char *pool_name, 1058 const char *const *config, 1059 const char *rbd_name, 1060 uint32_t block_size, 1061 const char *cluster_name, 1062 const struct spdk_uuid *uuid) 1063 { 1064 struct bdev_rbd *rbd; 1065 int ret; 1066 1067 if ((pool_name == NULL) || (rbd_name == NULL)) { 1068 return -EINVAL; 1069 } 1070 1071 rbd = calloc(1, sizeof(struct bdev_rbd)); 1072 if (rbd == NULL) { 1073 SPDK_ERRLOG("Failed to allocate bdev_rbd struct\n"); 1074 return -ENOMEM; 1075 } 1076 1077 rbd->rbd_name = strdup(rbd_name); 1078 if (!rbd->rbd_name) { 1079 bdev_rbd_free(rbd); 1080 return -ENOMEM; 1081 } 1082 1083 if (user_id) { 1084 rbd->user_id = strdup(user_id); 1085 if (!rbd->user_id) { 1086 bdev_rbd_free(rbd); 1087 return -ENOMEM; 1088 } 1089 } 1090 1091 if (cluster_name) { 1092 rbd->cluster_name = strdup(cluster_name); 1093 if (!rbd->cluster_name) { 1094 bdev_rbd_free(rbd); 1095 return -ENOMEM; 1096 } 1097 } 1098 rbd->pool_name = strdup(pool_name); 1099 if (!rbd->pool_name) { 1100 bdev_rbd_free(rbd); 1101 return -ENOMEM; 1102 } 1103 1104 if (config && !(rbd->config = bdev_rbd_dup_config(config))) { 1105 bdev_rbd_free(rbd); 1106 return -ENOMEM; 1107 } 1108 1109 ret = bdev_rbd_init(rbd); 1110 if (ret < 0) { 1111 bdev_rbd_free(rbd); 1112 SPDK_ERRLOG("Failed to init rbd device\n"); 1113 return ret; 1114 } 1115 1116 if (uuid) { 1117 rbd->disk.uuid = *uuid; 1118 } 1119 1120 if (name) { 1121 rbd->disk.name = strdup(name); 1122 } else { 1123 rbd->disk.name = spdk_sprintf_alloc("Ceph%d", bdev_rbd_count); 1124 } 1125 if (!rbd->disk.name) { 1126 bdev_rbd_free(rbd); 1127 return -ENOMEM; 1128 } 1129 rbd->disk.product_name = "Ceph Rbd Disk"; 1130 bdev_rbd_count++; 1131 1132 rbd->disk.write_cache = 0; 1133 rbd->disk.blocklen = block_size; 1134 rbd->disk.blockcnt = rbd->info.size / rbd->disk.blocklen; 1135 rbd->disk.ctxt = rbd; 1136 rbd->disk.fn_table = &rbd_fn_table; 1137 rbd->disk.module = &rbd_if; 1138 1139 SPDK_NOTICELOG("Add %s rbd disk to lun\n", rbd->disk.name); 1140 1141 spdk_io_device_register(rbd, bdev_rbd_create_cb, 1142 bdev_rbd_destroy_cb, 1143 sizeof(struct bdev_rbd_io_channel), 1144 rbd_name); 1145 ret = spdk_bdev_register(&rbd->disk); 1146 if (ret) { 1147 spdk_io_device_unregister(rbd, NULL); 1148 bdev_rbd_free(rbd); 1149 return ret; 1150 } 1151 1152 *bdev = &(rbd->disk); 1153 1154 return ret; 1155 } 1156 1157 void 1158 bdev_rbd_delete(const char *name, spdk_delete_rbd_complete cb_fn, void *cb_arg) 1159 { 1160 int rc; 1161 1162 rc = spdk_bdev_unregister_by_name(name, &rbd_if, cb_fn, cb_arg); 1163 if (rc != 0) { 1164 cb_fn(cb_arg, rc); 1165 } 1166 } 1167 1168 static void 1169 dummy_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev, void *ctx) 1170 { 1171 } 1172 1173 int 1174 bdev_rbd_resize(const char *name, const uint64_t new_size_in_mb) 1175 { 1176 struct spdk_bdev_desc *desc; 1177 struct spdk_bdev *bdev; 1178 struct spdk_io_channel *ch; 1179 struct bdev_rbd_io_channel *rbd_io_ch; 1180 int rc = 0; 1181 uint64_t new_size_in_byte; 1182 uint64_t current_size_in_mb; 1183 1184 rc = spdk_bdev_open_ext(name, false, dummy_bdev_event_cb, NULL, &desc); 1185 if (rc != 0) { 1186 return rc; 1187 } 1188 1189 bdev = spdk_bdev_desc_get_bdev(desc); 1190 1191 if (bdev->module != &rbd_if) { 1192 rc = -EINVAL; 1193 goto exit; 1194 } 1195 1196 current_size_in_mb = bdev->blocklen * bdev->blockcnt / (1024 * 1024); 1197 if (current_size_in_mb > new_size_in_mb) { 1198 SPDK_ERRLOG("The new bdev size must be larger than current bdev size.\n"); 1199 rc = -EINVAL; 1200 goto exit; 1201 } 1202 1203 ch = bdev_rbd_get_io_channel(bdev); 1204 rbd_io_ch = spdk_io_channel_get_ctx(ch); 1205 new_size_in_byte = new_size_in_mb * 1024 * 1024; 1206 1207 rc = rbd_resize(rbd_io_ch->disk->image, new_size_in_byte); 1208 spdk_put_io_channel(ch); 1209 if (rc != 0) { 1210 SPDK_ERRLOG("failed to resize the ceph bdev.\n"); 1211 goto exit; 1212 } 1213 1214 rc = spdk_bdev_notify_blockcnt_change(bdev, new_size_in_byte / bdev->blocklen); 1215 if (rc != 0) { 1216 SPDK_ERRLOG("failed to notify block cnt change.\n"); 1217 } 1218 1219 exit: 1220 spdk_bdev_close(desc); 1221 return rc; 1222 } 1223 1224 static int 1225 bdev_rbd_group_create_cb(void *io_device, void *ctx_buf) 1226 { 1227 return 0; 1228 } 1229 1230 static void 1231 bdev_rbd_group_destroy_cb(void *io_device, void *ctx_buf) 1232 { 1233 } 1234 1235 static int 1236 bdev_rbd_library_init(void) 1237 { 1238 spdk_io_device_register(&rbd_if, bdev_rbd_group_create_cb, bdev_rbd_group_destroy_cb, 1239 0, "bdev_rbd_poll_groups"); 1240 return 0; 1241 } 1242 1243 static void 1244 bdev_rbd_library_fini(void) 1245 { 1246 spdk_io_device_unregister(&rbd_if, NULL); 1247 } 1248 1249 SPDK_LOG_REGISTER_COMPONENT(bdev_rbd) 1250