1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 36 #include "bdev_rbd.h" 37 38 #include <rbd/librbd.h> 39 #include <rados/librados.h> 40 41 #include "spdk/env.h" 42 #include "spdk/bdev.h" 43 #include "spdk/thread.h" 44 #include "spdk/json.h" 45 #include "spdk/string.h" 46 #include "spdk/util.h" 47 #include "spdk/likely.h" 48 49 #include "spdk/bdev_module.h" 50 #include "spdk/log.h" 51 52 static int bdev_rbd_count = 0; 53 54 struct bdev_rbd { 55 struct spdk_bdev disk; 56 char *rbd_name; 57 char *user_id; 58 char *pool_name; 59 char **config; 60 61 rados_t cluster; 62 rados_t *cluster_p; 63 char *cluster_name; 64 65 rados_ioctx_t io_ctx; 66 rbd_image_t image; 67 68 rbd_image_info_t info; 69 pthread_mutex_t mutex; 70 struct spdk_thread *main_td; 71 struct spdk_thread *destruct_td; 72 uint32_t ch_count; 73 struct spdk_io_channel *group_ch; 74 75 TAILQ_ENTRY(bdev_rbd) tailq; 76 struct spdk_poller *reset_timer; 77 struct spdk_bdev_io *reset_bdev_io; 78 }; 79 80 struct bdev_rbd_io_channel { 81 struct bdev_rbd *disk; 82 }; 83 84 struct bdev_rbd_io { 85 struct spdk_thread *submit_td; 86 enum spdk_bdev_io_status status; 87 rbd_completion_t comp; 88 size_t total_len; 89 }; 90 91 struct bdev_rbd_cluster { 92 char *name; 93 char *user_id; 94 char **config_param; 95 char *config_file; 96 rados_t cluster; 97 uint32_t ref; 98 STAILQ_ENTRY(bdev_rbd_cluster) link; 99 }; 100 101 static STAILQ_HEAD(, bdev_rbd_cluster) g_map_bdev_rbd_cluster = STAILQ_HEAD_INITIALIZER( 102 g_map_bdev_rbd_cluster); 103 static pthread_mutex_t g_map_bdev_rbd_cluster_mutex = PTHREAD_MUTEX_INITIALIZER; 104 105 static void 106 bdev_rbd_cluster_free(struct bdev_rbd_cluster *entry) 107 { 108 assert(entry != NULL); 109 110 bdev_rbd_free_config(entry->config_param); 111 free(entry->config_file); 112 free(entry->user_id); 113 free(entry->name); 114 free(entry); 115 } 116 117 static void 118 bdev_rbd_put_cluster(rados_t **cluster) 119 { 120 struct bdev_rbd_cluster *entry; 121 122 assert(cluster != NULL); 123 124 /* No need go through the map if *cluster equals to NULL */ 125 if (*cluster == NULL) { 126 return; 127 } 128 129 pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex); 130 STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) { 131 if (*cluster != &entry->cluster) { 132 continue; 133 } 134 135 assert(entry->ref > 0); 136 entry->ref--; 137 *cluster = NULL; 138 pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex); 139 return; 140 } 141 142 pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex); 143 SPDK_ERRLOG("Cannot find the entry for cluster=%p\n", cluster); 144 } 145 146 static void 147 bdev_rbd_free(struct bdev_rbd *rbd) 148 { 149 if (!rbd) { 150 return; 151 } 152 153 free(rbd->disk.name); 154 free(rbd->rbd_name); 155 free(rbd->user_id); 156 free(rbd->pool_name); 157 bdev_rbd_free_config(rbd->config); 158 159 if (rbd->io_ctx) { 160 rados_ioctx_destroy(rbd->io_ctx); 161 } 162 163 if (rbd->cluster_name) { 164 bdev_rbd_put_cluster(&rbd->cluster_p); 165 free(rbd->cluster_name); 166 } else if (rbd->cluster) { 167 rados_shutdown(rbd->cluster); 168 } 169 170 pthread_mutex_destroy(&rbd->mutex); 171 free(rbd); 172 } 173 174 void 175 bdev_rbd_free_config(char **config) 176 { 177 char **entry; 178 179 if (config) { 180 for (entry = config; *entry; entry++) { 181 free(*entry); 182 } 183 free(config); 184 } 185 } 186 187 char ** 188 bdev_rbd_dup_config(const char *const *config) 189 { 190 size_t count; 191 char **copy; 192 193 if (!config) { 194 return NULL; 195 } 196 for (count = 0; config[count]; count++) {} 197 copy = calloc(count + 1, sizeof(*copy)); 198 if (!copy) { 199 return NULL; 200 } 201 for (count = 0; config[count]; count++) { 202 if (!(copy[count] = strdup(config[count]))) { 203 bdev_rbd_free_config(copy); 204 return NULL; 205 } 206 } 207 return copy; 208 } 209 210 static int 211 bdev_rados_cluster_init(const char *user_id, const char *const *config, 212 rados_t *cluster) 213 { 214 int ret; 215 216 ret = rados_create(cluster, user_id); 217 if (ret < 0) { 218 SPDK_ERRLOG("Failed to create rados_t struct\n"); 219 return -1; 220 } 221 222 if (config) { 223 const char *const *entry = config; 224 while (*entry) { 225 ret = rados_conf_set(*cluster, entry[0], entry[1]); 226 if (ret < 0) { 227 SPDK_ERRLOG("Failed to set %s = %s\n", entry[0], entry[1]); 228 rados_shutdown(*cluster); 229 return -1; 230 } 231 entry += 2; 232 } 233 } else { 234 ret = rados_conf_read_file(*cluster, NULL); 235 if (ret < 0) { 236 SPDK_ERRLOG("Failed to read conf file\n"); 237 rados_shutdown(*cluster); 238 return -1; 239 } 240 } 241 242 ret = rados_connect(*cluster); 243 if (ret < 0) { 244 SPDK_ERRLOG("Failed to connect to rbd_pool\n"); 245 rados_shutdown(*cluster); 246 return -1; 247 } 248 249 return 0; 250 } 251 252 static int 253 bdev_rbd_get_cluster(const char *cluster_name, rados_t **cluster) 254 { 255 struct bdev_rbd_cluster *entry; 256 257 if (cluster == NULL) { 258 SPDK_ERRLOG("cluster should not be NULL\n"); 259 return -1; 260 } 261 262 pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex); 263 STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) { 264 if (strcmp(cluster_name, entry->name) == 0) { 265 entry->ref++; 266 *cluster = &entry->cluster; 267 pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex); 268 return 0; 269 } 270 } 271 272 pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex); 273 return -1; 274 } 275 276 static int 277 bdev_rbd_shared_cluster_init(const char *cluster_name, rados_t **cluster) 278 { 279 int ret; 280 281 ret = bdev_rbd_get_cluster(cluster_name, cluster); 282 if (ret < 0) { 283 SPDK_ERRLOG("Failed to create rados_t struct\n"); 284 return -1; 285 } 286 287 return ret; 288 } 289 290 static void * 291 bdev_rbd_cluster_handle(void *arg) 292 { 293 void *ret = arg; 294 struct bdev_rbd *rbd = arg; 295 int rc; 296 297 rc = bdev_rados_cluster_init(rbd->user_id, (const char *const *)rbd->config, 298 &rbd->cluster); 299 if (rc < 0) { 300 SPDK_ERRLOG("Failed to create rados cluster for user_id=%s and rbd_pool=%s\n", 301 rbd->user_id ? rbd->user_id : "admin (the default)", rbd->pool_name); 302 ret = NULL; 303 } 304 305 return ret; 306 } 307 308 static void * 309 bdev_rbd_init_context(void *arg) 310 { 311 struct bdev_rbd *rbd = arg; 312 int rc; 313 314 if (rados_ioctx_create(*(rbd->cluster_p), rbd->pool_name, &rbd->io_ctx) < 0) { 315 SPDK_ERRLOG("Failed to create ioctx on rbd=%p\n", rbd); 316 return NULL; 317 } 318 319 rc = rbd_open(rbd->io_ctx, rbd->rbd_name, &rbd->image, NULL); 320 if (rc < 0) { 321 SPDK_ERRLOG("Failed to open specified rbd device\n"); 322 return NULL; 323 } 324 325 rc = rbd_stat(rbd->image, &rbd->info, sizeof(rbd->info)); 326 rbd_close(rbd->image); 327 if (rc < 0) { 328 SPDK_ERRLOG("Failed to stat specified rbd device\n"); 329 return NULL; 330 } 331 332 return arg; 333 } 334 335 static int 336 bdev_rbd_init(struct bdev_rbd *rbd) 337 { 338 int ret = 0; 339 340 if (!rbd->cluster_name) { 341 rbd->cluster_p = &rbd->cluster; 342 /* Cluster should be created in non-SPDK thread to avoid conflict between 343 * Rados and SPDK thread */ 344 if (spdk_call_unaffinitized(bdev_rbd_cluster_handle, rbd) == NULL) { 345 SPDK_ERRLOG("Cannot create the rados object on rbd=%p\n", rbd); 346 return -1; 347 } 348 } else { 349 ret = bdev_rbd_shared_cluster_init(rbd->cluster_name, &rbd->cluster_p); 350 if (ret < 0) { 351 SPDK_ERRLOG("Failed to create rados object for rbd =%p on cluster_name=%s\n", 352 rbd, rbd->cluster_name); 353 return -1; 354 } 355 } 356 357 if (spdk_call_unaffinitized(bdev_rbd_init_context, rbd) == NULL) { 358 SPDK_ERRLOG("Cannot init rbd context for rbd=%p\n", rbd); 359 } 360 361 return ret; 362 } 363 364 static void 365 bdev_rbd_exit(rbd_image_t image) 366 { 367 rbd_flush(image); 368 rbd_close(image); 369 } 370 371 static void 372 _bdev_rbd_io_complete(void *_rbd_io) 373 { 374 struct bdev_rbd_io *rbd_io = _rbd_io; 375 376 spdk_bdev_io_complete(spdk_bdev_io_from_ctx(rbd_io), rbd_io->status); 377 } 378 379 static void 380 bdev_rbd_io_complete(struct spdk_bdev_io *bdev_io, enum spdk_bdev_io_status status) 381 { 382 struct bdev_rbd_io *rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx; 383 struct spdk_thread *current_thread = spdk_get_thread(); 384 385 rbd_io->status = status; 386 assert(rbd_io->submit_td != NULL); 387 if (rbd_io->submit_td != current_thread) { 388 spdk_thread_send_msg(rbd_io->submit_td, _bdev_rbd_io_complete, rbd_io); 389 } else { 390 _bdev_rbd_io_complete(rbd_io); 391 } 392 } 393 394 static void 395 bdev_rbd_finish_aiocb(rbd_completion_t cb, void *arg) 396 { 397 int io_status; 398 struct spdk_bdev_io *bdev_io; 399 struct bdev_rbd_io *rbd_io; 400 enum spdk_bdev_io_status bio_status; 401 402 bdev_io = rbd_aio_get_arg(cb); 403 rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx; 404 io_status = rbd_aio_get_return_value(cb); 405 bio_status = SPDK_BDEV_IO_STATUS_SUCCESS; 406 407 if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) { 408 if ((int)rbd_io->total_len != io_status) { 409 bio_status = SPDK_BDEV_IO_STATUS_FAILED; 410 } 411 } else { 412 /* For others, 0 means success */ 413 if (io_status != 0) { 414 bio_status = SPDK_BDEV_IO_STATUS_FAILED; 415 } 416 } 417 418 rbd_aio_release(cb); 419 420 bdev_rbd_io_complete(bdev_io, bio_status); 421 } 422 423 static void 424 bdev_rbd_start_aio(struct bdev_rbd *disk, struct spdk_bdev_io *bdev_io, 425 struct iovec *iov, int iovcnt, uint64_t offset, size_t len) 426 { 427 int ret; 428 struct bdev_rbd_io *rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx; 429 rbd_image_t image = disk->image; 430 431 ret = rbd_aio_create_completion(bdev_io, bdev_rbd_finish_aiocb, 432 &rbd_io->comp); 433 if (ret < 0) { 434 goto err; 435 } 436 437 if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) { 438 rbd_io->total_len = len; 439 if (spdk_likely(iovcnt == 1)) { 440 ret = rbd_aio_read(image, offset, iov[0].iov_len, iov[0].iov_base, rbd_io->comp); 441 } else { 442 ret = rbd_aio_readv(image, iov, iovcnt, offset, rbd_io->comp); 443 } 444 } else if (bdev_io->type == SPDK_BDEV_IO_TYPE_WRITE) { 445 if (spdk_likely(iovcnt == 1)) { 446 ret = rbd_aio_write(image, offset, iov[0].iov_len, iov[0].iov_base, rbd_io->comp); 447 } else { 448 ret = rbd_aio_writev(image, iov, iovcnt, offset, rbd_io->comp); 449 } 450 } else if (bdev_io->type == SPDK_BDEV_IO_TYPE_FLUSH) { 451 ret = rbd_aio_flush(image, rbd_io->comp); 452 } 453 454 if (ret < 0) { 455 rbd_aio_release(rbd_io->comp); 456 goto err; 457 } 458 459 return; 460 461 err: 462 bdev_rbd_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 463 } 464 465 static int bdev_rbd_library_init(void); 466 static void bdev_rbd_library_fini(void); 467 468 static int 469 bdev_rbd_get_ctx_size(void) 470 { 471 return sizeof(struct bdev_rbd_io); 472 } 473 474 static struct spdk_bdev_module rbd_if = { 475 .name = "rbd", 476 .module_init = bdev_rbd_library_init, 477 .module_fini = bdev_rbd_library_fini, 478 .get_ctx_size = bdev_rbd_get_ctx_size, 479 480 }; 481 SPDK_BDEV_MODULE_REGISTER(rbd, &rbd_if) 482 483 static int 484 bdev_rbd_reset_timer(void *arg) 485 { 486 struct bdev_rbd *disk = arg; 487 488 /* 489 * TODO: This should check if any I/O is still in flight before completing the reset. 490 * For now, just complete after the timer expires. 491 */ 492 bdev_rbd_io_complete(disk->reset_bdev_io, SPDK_BDEV_IO_STATUS_SUCCESS); 493 spdk_poller_unregister(&disk->reset_timer); 494 disk->reset_bdev_io = NULL; 495 496 return SPDK_POLLER_BUSY; 497 } 498 499 static void 500 bdev_rbd_reset(struct bdev_rbd *disk, struct spdk_bdev_io *bdev_io) 501 { 502 /* 503 * HACK: Since librbd doesn't provide any way to cancel outstanding aio, just kick off a 504 * timer to wait for in-flight I/O to complete. 505 */ 506 assert(disk->reset_bdev_io == NULL); 507 disk->reset_bdev_io = bdev_io; 508 disk->reset_timer = SPDK_POLLER_REGISTER(bdev_rbd_reset_timer, disk, 1 * 1000 * 1000); 509 } 510 511 static void 512 _bdev_rbd_destruct_done(void *io_device) 513 { 514 struct bdev_rbd *rbd = io_device; 515 516 assert(rbd != NULL); 517 assert(rbd->ch_count == 0); 518 519 spdk_bdev_destruct_done(&rbd->disk, 0); 520 bdev_rbd_free(rbd); 521 } 522 523 static void 524 bdev_rbd_free_cb(void *io_device) 525 { 526 struct bdev_rbd *rbd = io_device; 527 528 /* The io device has been unregistered. Send a message back to the 529 * original thread that started the destruct operation, so that the 530 * bdev unregister callback is invoked on the same thread that started 531 * this whole process. 532 */ 533 spdk_thread_send_msg(rbd->destruct_td, _bdev_rbd_destruct_done, rbd); 534 } 535 536 static void 537 _bdev_rbd_destruct(void *ctx) 538 { 539 struct bdev_rbd *rbd = ctx; 540 541 spdk_io_device_unregister(rbd, bdev_rbd_free_cb); 542 } 543 544 static int 545 bdev_rbd_destruct(void *ctx) 546 { 547 struct bdev_rbd *rbd = ctx; 548 struct spdk_thread *td; 549 550 if (rbd->main_td == NULL) { 551 td = spdk_get_thread(); 552 } else { 553 td = rbd->main_td; 554 } 555 556 /* Start the destruct operation on the rbd bdev's 557 * main thread. This guarantees it will only start 558 * executing after any messages related to channel 559 * deletions have finished completing. *Always* 560 * send a message, even if this function gets called 561 * from the main thread, in case there are pending 562 * channel delete messages in flight to this thread. 563 */ 564 assert(rbd->destruct_td == NULL); 565 rbd->destruct_td = td; 566 spdk_thread_send_msg(td, _bdev_rbd_destruct, rbd); 567 568 /* Return 1 to indicate the destruct path is asynchronous. */ 569 return 1; 570 } 571 572 static void 573 bdev_rbd_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io, 574 bool success) 575 { 576 struct bdev_rbd *disk = (struct bdev_rbd *)bdev_io->bdev->ctxt; 577 578 if (!success) { 579 bdev_rbd_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 580 return; 581 } 582 583 bdev_rbd_start_aio(disk, 584 bdev_io, 585 bdev_io->u.bdev.iovs, 586 bdev_io->u.bdev.iovcnt, 587 bdev_io->u.bdev.offset_blocks * bdev_io->bdev->blocklen, 588 bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen); 589 } 590 591 static void 592 _bdev_rbd_submit_request(void *ctx) 593 { 594 struct spdk_bdev_io *bdev_io = ctx; 595 struct bdev_rbd *disk = (struct bdev_rbd *)bdev_io->bdev->ctxt; 596 597 switch (bdev_io->type) { 598 case SPDK_BDEV_IO_TYPE_READ: 599 spdk_bdev_io_get_buf(bdev_io, bdev_rbd_get_buf_cb, 600 bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen); 601 break; 602 603 case SPDK_BDEV_IO_TYPE_WRITE: 604 case SPDK_BDEV_IO_TYPE_FLUSH: 605 bdev_rbd_start_aio(disk, 606 bdev_io, 607 bdev_io->u.bdev.iovs, 608 bdev_io->u.bdev.iovcnt, 609 bdev_io->u.bdev.offset_blocks * bdev_io->bdev->blocklen, 610 bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen); 611 break; 612 613 case SPDK_BDEV_IO_TYPE_RESET: 614 bdev_rbd_reset((struct bdev_rbd *)bdev_io->bdev->ctxt, 615 bdev_io); 616 break; 617 618 default: 619 SPDK_ERRLOG("Unsupported IO type =%d\n", bdev_io->type); 620 bdev_rbd_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 621 break; 622 } 623 } 624 625 static void 626 bdev_rbd_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io) 627 { 628 struct spdk_thread *submit_td = spdk_io_channel_get_thread(ch); 629 struct bdev_rbd_io *rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx; 630 struct bdev_rbd *disk = (struct bdev_rbd *)bdev_io->bdev->ctxt; 631 632 rbd_io->submit_td = submit_td; 633 if (disk->main_td != submit_td) { 634 spdk_thread_send_msg(disk->main_td, _bdev_rbd_submit_request, bdev_io); 635 } else { 636 _bdev_rbd_submit_request(bdev_io); 637 } 638 } 639 640 static bool 641 bdev_rbd_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type) 642 { 643 switch (io_type) { 644 case SPDK_BDEV_IO_TYPE_READ: 645 case SPDK_BDEV_IO_TYPE_WRITE: 646 case SPDK_BDEV_IO_TYPE_FLUSH: 647 case SPDK_BDEV_IO_TYPE_RESET: 648 return true; 649 650 default: 651 return false; 652 } 653 } 654 655 static void 656 bdev_rbd_free_channel_resources(struct bdev_rbd *disk) 657 { 658 assert(disk != NULL); 659 assert(disk->main_td == spdk_get_thread()); 660 assert(disk->ch_count == 0); 661 662 spdk_put_io_channel(disk->group_ch); 663 if (disk->image) { 664 bdev_rbd_exit(disk->image); 665 } 666 667 disk->main_td = NULL; 668 disk->group_ch = NULL; 669 } 670 671 static void * 672 bdev_rbd_handle(void *arg) 673 { 674 struct bdev_rbd *disk = arg; 675 void *ret = arg; 676 677 if (rbd_open(disk->io_ctx, disk->rbd_name, &disk->image, NULL) < 0) { 678 SPDK_ERRLOG("Failed to open specified rbd device\n"); 679 ret = NULL; 680 } 681 682 return ret; 683 } 684 685 static int 686 _bdev_rbd_create_cb(struct bdev_rbd *disk) 687 { 688 disk->group_ch = spdk_get_io_channel(&rbd_if); 689 assert(disk->group_ch != NULL); 690 691 if (spdk_call_unaffinitized(bdev_rbd_handle, disk) == NULL) { 692 bdev_rbd_free_channel_resources(disk); 693 return -1; 694 } 695 696 return 0; 697 } 698 699 static int 700 bdev_rbd_create_cb(void *io_device, void *ctx_buf) 701 { 702 struct bdev_rbd_io_channel *ch = ctx_buf; 703 struct bdev_rbd *disk = io_device; 704 int rc; 705 706 ch->disk = disk; 707 pthread_mutex_lock(&disk->mutex); 708 if (disk->ch_count == 0) { 709 assert(disk->main_td == NULL); 710 rc = _bdev_rbd_create_cb(disk); 711 if (rc) { 712 SPDK_ERRLOG("Cannot create channel for disk=%p\n", disk); 713 pthread_mutex_unlock(&disk->mutex); 714 return rc; 715 } 716 717 disk->main_td = spdk_get_thread(); 718 } 719 720 disk->ch_count++; 721 pthread_mutex_unlock(&disk->mutex); 722 723 return 0; 724 } 725 726 static void 727 _bdev_rbd_destroy_cb(void *ctx) 728 { 729 struct bdev_rbd *disk = ctx; 730 731 pthread_mutex_lock(&disk->mutex); 732 assert(disk->ch_count > 0); 733 disk->ch_count--; 734 735 if (disk->ch_count > 0) { 736 /* A new channel was created between when message was sent and this function executed */ 737 pthread_mutex_unlock(&disk->mutex); 738 return; 739 } 740 741 bdev_rbd_free_channel_resources(disk); 742 pthread_mutex_unlock(&disk->mutex); 743 } 744 745 static void 746 bdev_rbd_destroy_cb(void *io_device, void *ctx_buf) 747 { 748 struct bdev_rbd *disk = io_device; 749 struct spdk_thread *thread; 750 751 pthread_mutex_lock(&disk->mutex); 752 assert(disk->ch_count > 0); 753 disk->ch_count--; 754 if (disk->ch_count == 0) { 755 assert(disk->main_td != NULL); 756 if (disk->main_td != spdk_get_thread()) { 757 /* The final channel was destroyed on a different thread 758 * than where the first channel was created. Pass a message 759 * to the main thread to unregister the poller. */ 760 disk->ch_count++; 761 thread = disk->main_td; 762 pthread_mutex_unlock(&disk->mutex); 763 spdk_thread_send_msg(thread, _bdev_rbd_destroy_cb, disk); 764 return; 765 } 766 767 bdev_rbd_free_channel_resources(disk); 768 } 769 pthread_mutex_unlock(&disk->mutex); 770 } 771 772 static struct spdk_io_channel * 773 bdev_rbd_get_io_channel(void *ctx) 774 { 775 struct bdev_rbd *rbd_bdev = ctx; 776 777 return spdk_get_io_channel(rbd_bdev); 778 } 779 780 static void 781 bdev_rbd_cluster_dump_entry(const char *cluster_name, struct spdk_json_write_ctx *w) 782 { 783 struct bdev_rbd_cluster *entry; 784 785 pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex); 786 STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) { 787 if (strcmp(cluster_name, entry->name)) { 788 continue; 789 } 790 if (entry->user_id) { 791 spdk_json_write_named_string(w, "user_id", entry->user_id); 792 } 793 794 if (entry->config_param) { 795 char **config_entry = entry->config_param; 796 797 spdk_json_write_named_object_begin(w, "config_param"); 798 while (*config_entry) { 799 spdk_json_write_named_string(w, config_entry[0], config_entry[1]); 800 config_entry += 2; 801 } 802 spdk_json_write_object_end(w); 803 } else if (entry->config_file) { 804 spdk_json_write_named_string(w, "config_file", entry->config_file); 805 } 806 807 pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex); 808 return; 809 } 810 811 pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex); 812 } 813 814 static int 815 bdev_rbd_dump_info_json(void *ctx, struct spdk_json_write_ctx *w) 816 { 817 struct bdev_rbd *rbd_bdev = ctx; 818 819 spdk_json_write_named_object_begin(w, "rbd"); 820 821 spdk_json_write_named_string(w, "pool_name", rbd_bdev->pool_name); 822 823 spdk_json_write_named_string(w, "rbd_name", rbd_bdev->rbd_name); 824 825 if (rbd_bdev->cluster_name) { 826 bdev_rbd_cluster_dump_entry(rbd_bdev->cluster_name, w); 827 goto end; 828 } 829 830 if (rbd_bdev->user_id) { 831 spdk_json_write_named_string(w, "user_id", rbd_bdev->user_id); 832 } 833 834 if (rbd_bdev->config) { 835 char **entry = rbd_bdev->config; 836 837 spdk_json_write_named_object_begin(w, "config"); 838 while (*entry) { 839 spdk_json_write_named_string(w, entry[0], entry[1]); 840 entry += 2; 841 } 842 spdk_json_write_object_end(w); 843 } 844 845 end: 846 spdk_json_write_object_end(w); 847 848 return 0; 849 } 850 851 static void 852 bdev_rbd_write_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w) 853 { 854 struct bdev_rbd *rbd = bdev->ctxt; 855 char uuid_str[SPDK_UUID_STRING_LEN]; 856 857 spdk_json_write_object_begin(w); 858 859 spdk_json_write_named_string(w, "method", "bdev_rbd_create"); 860 861 spdk_json_write_named_object_begin(w, "params"); 862 spdk_json_write_named_string(w, "name", bdev->name); 863 spdk_json_write_named_string(w, "pool_name", rbd->pool_name); 864 spdk_json_write_named_string(w, "rbd_name", rbd->rbd_name); 865 spdk_json_write_named_uint32(w, "block_size", bdev->blocklen); 866 if (rbd->user_id) { 867 spdk_json_write_named_string(w, "user_id", rbd->user_id); 868 } 869 870 if (rbd->config) { 871 char **entry = rbd->config; 872 873 spdk_json_write_named_object_begin(w, "config"); 874 while (*entry) { 875 spdk_json_write_named_string(w, entry[0], entry[1]); 876 entry += 2; 877 } 878 spdk_json_write_object_end(w); 879 } 880 881 spdk_uuid_fmt_lower(uuid_str, sizeof(uuid_str), &bdev->uuid); 882 spdk_json_write_named_string(w, "uuid", uuid_str); 883 884 spdk_json_write_object_end(w); 885 886 spdk_json_write_object_end(w); 887 } 888 889 static void 890 dump_single_cluster_entry(struct bdev_rbd_cluster *entry, struct spdk_json_write_ctx *w) 891 { 892 assert(entry != NULL); 893 894 spdk_json_write_object_begin(w); 895 spdk_json_write_named_string(w, "cluster_name", entry->name); 896 897 if (entry->user_id) { 898 spdk_json_write_named_string(w, "user_id", entry->user_id); 899 } 900 901 if (entry->config_param) { 902 char **config_entry = entry->config_param; 903 904 spdk_json_write_named_object_begin(w, "config_param"); 905 while (*config_entry) { 906 spdk_json_write_named_string(w, config_entry[0], config_entry[1]); 907 config_entry += 2; 908 } 909 spdk_json_write_object_end(w); 910 } else if (entry->config_file) { 911 spdk_json_write_named_string(w, "config_file", entry->config_file); 912 } 913 914 spdk_json_write_object_end(w); 915 } 916 917 int 918 bdev_rbd_get_clusters_info(struct spdk_jsonrpc_request *request, const char *name) 919 { 920 struct bdev_rbd_cluster *entry; 921 struct spdk_json_write_ctx *w; 922 923 pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex); 924 925 if (STAILQ_EMPTY(&g_map_bdev_rbd_cluster)) { 926 pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex); 927 return -ENOENT; 928 } 929 930 /* If cluster name is provided */ 931 if (name) { 932 STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) { 933 if (strcmp(name, entry->name) == 0) { 934 w = spdk_jsonrpc_begin_result(request); 935 dump_single_cluster_entry(entry, w); 936 spdk_jsonrpc_end_result(request, w); 937 938 pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex); 939 return 0; 940 } 941 } 942 943 pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex); 944 return -ENOENT; 945 } 946 947 w = spdk_jsonrpc_begin_result(request); 948 spdk_json_write_array_begin(w); 949 STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) { 950 dump_single_cluster_entry(entry, w); 951 } 952 spdk_json_write_array_end(w); 953 spdk_jsonrpc_end_result(request, w); 954 pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex); 955 956 return 0; 957 } 958 959 static const struct spdk_bdev_fn_table rbd_fn_table = { 960 .destruct = bdev_rbd_destruct, 961 .submit_request = bdev_rbd_submit_request, 962 .io_type_supported = bdev_rbd_io_type_supported, 963 .get_io_channel = bdev_rbd_get_io_channel, 964 .dump_info_json = bdev_rbd_dump_info_json, 965 .write_config_json = bdev_rbd_write_config_json, 966 }; 967 968 static int 969 rbd_register_cluster(const char *name, const char *user_id, const char *const *config_param, 970 const char *config_file) 971 { 972 struct bdev_rbd_cluster *entry; 973 int rc; 974 975 pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex); 976 STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) { 977 if (strcmp(name, entry->name) == 0) { 978 SPDK_ERRLOG("Cluster name=%s already exists\n", name); 979 pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex); 980 return -1; 981 } 982 } 983 984 entry = calloc(1, sizeof(*entry)); 985 if (!entry) { 986 SPDK_ERRLOG("Cannot allocate an entry for name=%s\n", name); 987 pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex); 988 return -1; 989 } 990 991 entry->name = strdup(name); 992 if (entry->name == NULL) { 993 SPDK_ERRLOG("Failed to save the name =%s on entry =%p\n", name, entry); 994 goto err_handle; 995 } 996 997 if (user_id) { 998 entry->user_id = strdup(user_id); 999 if (entry->user_id == NULL) { 1000 SPDK_ERRLOG("Failed to save the str =%s on entry =%p\n", user_id, entry); 1001 goto err_handle; 1002 } 1003 } 1004 1005 /* The first priority is the config_param, then we use the config_file */ 1006 if (config_param) { 1007 entry->config_param = bdev_rbd_dup_config(config_param); 1008 if (entry->config_param == NULL) { 1009 SPDK_ERRLOG("Failed to save the config_param=%p on entry = %p\n", config_param, entry); 1010 goto err_handle; 1011 } 1012 } else if (config_file) { 1013 entry->config_file = strdup(config_file); 1014 if (entry->config_file == NULL) { 1015 SPDK_ERRLOG("Failed to save the config_file=%s on entry = %p\n", config_file, entry); 1016 goto err_handle; 1017 } 1018 } 1019 1020 rc = rados_create(&entry->cluster, user_id); 1021 if (rc < 0) { 1022 SPDK_ERRLOG("Failed to create rados_t struct\n"); 1023 goto err_handle; 1024 } 1025 1026 if (config_param) { 1027 const char *const *config_entry = config_param; 1028 while (*config_entry) { 1029 rc = rados_conf_set(entry->cluster, config_entry[0], config_entry[1]); 1030 if (rc < 0) { 1031 SPDK_ERRLOG("Failed to set %s = %s\n", config_entry[0], config_entry[1]); 1032 rados_shutdown(entry->cluster); 1033 goto err_handle; 1034 } 1035 config_entry += 2; 1036 } 1037 } else { 1038 rc = rados_conf_read_file(entry->cluster, entry->config_file); 1039 if (rc < 0) { 1040 SPDK_ERRLOG("Failed to read conf file\n"); 1041 rados_shutdown(entry->cluster); 1042 goto err_handle; 1043 } 1044 } 1045 1046 rc = rados_connect(entry->cluster); 1047 if (rc < 0) { 1048 SPDK_ERRLOG("Failed to connect to rbd_pool on cluster=%p\n", entry->cluster); 1049 rados_shutdown(entry->cluster); 1050 goto err_handle; 1051 } 1052 1053 STAILQ_INSERT_TAIL(&g_map_bdev_rbd_cluster, entry, link); 1054 pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex); 1055 1056 return 0; 1057 1058 err_handle: 1059 bdev_rbd_cluster_free(entry); 1060 pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex); 1061 return -1; 1062 } 1063 1064 int 1065 bdev_rbd_unregister_cluster(const char *name) 1066 { 1067 struct bdev_rbd_cluster *entry; 1068 int rc = 0; 1069 1070 if (name == NULL) { 1071 return -1; 1072 } 1073 1074 pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex); 1075 STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) { 1076 if (strcmp(name, entry->name) == 0) { 1077 if (entry->ref == 0) { 1078 STAILQ_REMOVE(&g_map_bdev_rbd_cluster, entry, bdev_rbd_cluster, link); 1079 rados_shutdown(entry->cluster); 1080 bdev_rbd_cluster_free(entry); 1081 } else { 1082 SPDK_ERRLOG("Cluster with name=%p is still used and we cannot delete it\n", 1083 entry->name); 1084 rc = -1; 1085 } 1086 1087 pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex); 1088 return rc; 1089 } 1090 } 1091 1092 pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex); 1093 1094 SPDK_ERRLOG("Could not find the cluster name =%p\n", name); 1095 1096 return -1; 1097 } 1098 1099 static void * 1100 _bdev_rbd_register_cluster(void *arg) 1101 { 1102 struct cluster_register_info *info = arg; 1103 void *ret = arg; 1104 int rc; 1105 1106 rc = rbd_register_cluster((const char *)info->name, (const char *)info->user_id, 1107 (const char *const *)info->config_param, (const char *)info->config_file); 1108 if (rc) { 1109 ret = NULL; 1110 } 1111 1112 return ret; 1113 } 1114 1115 int 1116 bdev_rbd_register_cluster(struct cluster_register_info *info) 1117 { 1118 assert(info != NULL); 1119 1120 /* Rados cluster info need to be created in non SPDK-thread to avoid CPU 1121 * resource contention */ 1122 if (spdk_call_unaffinitized(_bdev_rbd_register_cluster, info) == NULL) { 1123 return -1; 1124 } 1125 1126 return 0; 1127 } 1128 1129 int 1130 bdev_rbd_create(struct spdk_bdev **bdev, const char *name, const char *user_id, 1131 const char *pool_name, 1132 const char *const *config, 1133 const char *rbd_name, 1134 uint32_t block_size, 1135 const char *cluster_name, 1136 const struct spdk_uuid *uuid) 1137 { 1138 struct bdev_rbd *rbd; 1139 int ret; 1140 1141 if ((pool_name == NULL) || (rbd_name == NULL)) { 1142 return -EINVAL; 1143 } 1144 1145 rbd = calloc(1, sizeof(struct bdev_rbd)); 1146 if (rbd == NULL) { 1147 SPDK_ERRLOG("Failed to allocate bdev_rbd struct\n"); 1148 return -ENOMEM; 1149 } 1150 1151 ret = pthread_mutex_init(&rbd->mutex, NULL); 1152 if (ret) { 1153 SPDK_ERRLOG("Cannot init mutex on rbd=%p\n", rbd->disk.name); 1154 free(rbd); 1155 return ret; 1156 } 1157 1158 rbd->rbd_name = strdup(rbd_name); 1159 if (!rbd->rbd_name) { 1160 bdev_rbd_free(rbd); 1161 return -ENOMEM; 1162 } 1163 1164 if (user_id) { 1165 rbd->user_id = strdup(user_id); 1166 if (!rbd->user_id) { 1167 bdev_rbd_free(rbd); 1168 return -ENOMEM; 1169 } 1170 } 1171 1172 if (cluster_name) { 1173 rbd->cluster_name = strdup(cluster_name); 1174 if (!rbd->cluster_name) { 1175 bdev_rbd_free(rbd); 1176 return -ENOMEM; 1177 } 1178 } 1179 rbd->pool_name = strdup(pool_name); 1180 if (!rbd->pool_name) { 1181 bdev_rbd_free(rbd); 1182 return -ENOMEM; 1183 } 1184 1185 if (config && !(rbd->config = bdev_rbd_dup_config(config))) { 1186 bdev_rbd_free(rbd); 1187 return -ENOMEM; 1188 } 1189 1190 ret = bdev_rbd_init(rbd); 1191 if (ret < 0) { 1192 bdev_rbd_free(rbd); 1193 SPDK_ERRLOG("Failed to init rbd device\n"); 1194 return ret; 1195 } 1196 1197 if (uuid) { 1198 rbd->disk.uuid = *uuid; 1199 } else { 1200 spdk_uuid_generate(&rbd->disk.uuid); 1201 } 1202 1203 if (name) { 1204 rbd->disk.name = strdup(name); 1205 } else { 1206 rbd->disk.name = spdk_sprintf_alloc("Ceph%d", bdev_rbd_count); 1207 } 1208 if (!rbd->disk.name) { 1209 bdev_rbd_free(rbd); 1210 return -ENOMEM; 1211 } 1212 rbd->disk.product_name = "Ceph Rbd Disk"; 1213 bdev_rbd_count++; 1214 1215 rbd->disk.write_cache = 0; 1216 rbd->disk.blocklen = block_size; 1217 rbd->disk.blockcnt = rbd->info.size / rbd->disk.blocklen; 1218 rbd->disk.ctxt = rbd; 1219 rbd->disk.fn_table = &rbd_fn_table; 1220 rbd->disk.module = &rbd_if; 1221 1222 SPDK_NOTICELOG("Add %s rbd disk to lun\n", rbd->disk.name); 1223 1224 spdk_io_device_register(rbd, bdev_rbd_create_cb, 1225 bdev_rbd_destroy_cb, 1226 sizeof(struct bdev_rbd_io_channel), 1227 rbd_name); 1228 ret = spdk_bdev_register(&rbd->disk); 1229 if (ret) { 1230 spdk_io_device_unregister(rbd, NULL); 1231 bdev_rbd_free(rbd); 1232 return ret; 1233 } 1234 1235 *bdev = &(rbd->disk); 1236 1237 return ret; 1238 } 1239 1240 void 1241 bdev_rbd_delete(struct spdk_bdev *bdev, spdk_delete_rbd_complete cb_fn, void *cb_arg) 1242 { 1243 if (!bdev || bdev->module != &rbd_if) { 1244 cb_fn(cb_arg, -ENODEV); 1245 return; 1246 } 1247 1248 spdk_bdev_unregister(bdev, cb_fn, cb_arg); 1249 } 1250 1251 int 1252 bdev_rbd_resize(struct spdk_bdev *bdev, const uint64_t new_size_in_mb) 1253 { 1254 struct spdk_io_channel *ch; 1255 struct bdev_rbd_io_channel *rbd_io_ch; 1256 int rc; 1257 uint64_t new_size_in_byte; 1258 uint64_t current_size_in_mb; 1259 1260 if (bdev->module != &rbd_if) { 1261 return -EINVAL; 1262 } 1263 1264 current_size_in_mb = bdev->blocklen * bdev->blockcnt / (1024 * 1024); 1265 if (current_size_in_mb > new_size_in_mb) { 1266 SPDK_ERRLOG("The new bdev size must be lager than current bdev size.\n"); 1267 return -EINVAL; 1268 } 1269 1270 ch = bdev_rbd_get_io_channel(bdev); 1271 rbd_io_ch = spdk_io_channel_get_ctx(ch); 1272 new_size_in_byte = new_size_in_mb * 1024 * 1024; 1273 1274 rc = rbd_resize(rbd_io_ch->disk->image, new_size_in_byte); 1275 spdk_put_io_channel(ch); 1276 if (rc != 0) { 1277 SPDK_ERRLOG("failed to resize the ceph bdev.\n"); 1278 return rc; 1279 } 1280 1281 rc = spdk_bdev_notify_blockcnt_change(bdev, new_size_in_byte / bdev->blocklen); 1282 if (rc != 0) { 1283 SPDK_ERRLOG("failed to notify block cnt change.\n"); 1284 return rc; 1285 } 1286 1287 return rc; 1288 } 1289 1290 static int 1291 bdev_rbd_group_create_cb(void *io_device, void *ctx_buf) 1292 { 1293 return 0; 1294 } 1295 1296 static void 1297 bdev_rbd_group_destroy_cb(void *io_device, void *ctx_buf) 1298 { 1299 } 1300 1301 static int 1302 bdev_rbd_library_init(void) 1303 { 1304 spdk_io_device_register(&rbd_if, bdev_rbd_group_create_cb, bdev_rbd_group_destroy_cb, 1305 0, "bdev_rbd_poll_groups"); 1306 return 0; 1307 } 1308 1309 static void 1310 bdev_rbd_library_fini(void) 1311 { 1312 spdk_io_device_unregister(&rbd_if, NULL); 1313 } 1314 1315 SPDK_LOG_REGISTER_COMPONENT(bdev_rbd) 1316