1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 36 #include "bdev_rbd.h" 37 38 #include <rbd/librbd.h> 39 #include <rados/librados.h> 40 41 #include "spdk/env.h" 42 #include "spdk/bdev.h" 43 #include "spdk/thread.h" 44 #include "spdk/json.h" 45 #include "spdk/string.h" 46 #include "spdk/util.h" 47 #include "spdk/likely.h" 48 49 #include "spdk/bdev_module.h" 50 #include "spdk/log.h" 51 52 static int bdev_rbd_count = 0; 53 54 struct bdev_rbd { 55 struct spdk_bdev disk; 56 char *rbd_name; 57 char *user_id; 58 char *pool_name; 59 char **config; 60 61 rados_t cluster; 62 rados_t *cluster_p; 63 char *cluster_name; 64 65 rados_ioctx_t io_ctx; 66 rbd_image_t image; 67 68 rbd_image_info_t info; 69 pthread_mutex_t mutex; 70 struct spdk_thread *main_td; 71 struct spdk_thread *destruct_td; 72 uint32_t ch_count; 73 struct spdk_io_channel *group_ch; 74 75 TAILQ_ENTRY(bdev_rbd) tailq; 76 struct spdk_poller *reset_timer; 77 struct spdk_bdev_io *reset_bdev_io; 78 }; 79 80 struct bdev_rbd_io_channel { 81 struct bdev_rbd *disk; 82 }; 83 84 struct bdev_rbd_io { 85 struct spdk_thread *submit_td; 86 enum spdk_bdev_io_status status; 87 rbd_completion_t comp; 88 size_t total_len; 89 }; 90 91 struct bdev_rbd_cluster { 92 char *name; 93 char *user_id; 94 char **config_param; 95 char *config_file; 96 char *key_file; 97 rados_t cluster; 98 uint32_t ref; 99 STAILQ_ENTRY(bdev_rbd_cluster) link; 100 }; 101 102 static STAILQ_HEAD(, bdev_rbd_cluster) g_map_bdev_rbd_cluster = STAILQ_HEAD_INITIALIZER( 103 g_map_bdev_rbd_cluster); 104 static pthread_mutex_t g_map_bdev_rbd_cluster_mutex = PTHREAD_MUTEX_INITIALIZER; 105 106 static void 107 bdev_rbd_cluster_free(struct bdev_rbd_cluster *entry) 108 { 109 assert(entry != NULL); 110 111 bdev_rbd_free_config(entry->config_param); 112 free(entry->config_file); 113 free(entry->key_file); 114 free(entry->user_id); 115 free(entry->name); 116 free(entry); 117 } 118 119 static void 120 bdev_rbd_put_cluster(rados_t **cluster) 121 { 122 struct bdev_rbd_cluster *entry; 123 124 assert(cluster != NULL); 125 126 /* No need go through the map if *cluster equals to NULL */ 127 if (*cluster == NULL) { 128 return; 129 } 130 131 pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex); 132 STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) { 133 if (*cluster != &entry->cluster) { 134 continue; 135 } 136 137 assert(entry->ref > 0); 138 entry->ref--; 139 *cluster = NULL; 140 pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex); 141 return; 142 } 143 144 pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex); 145 SPDK_ERRLOG("Cannot find the entry for cluster=%p\n", cluster); 146 } 147 148 static void 149 bdev_rbd_free(struct bdev_rbd *rbd) 150 { 151 if (!rbd) { 152 return; 153 } 154 155 free(rbd->disk.name); 156 free(rbd->rbd_name); 157 free(rbd->user_id); 158 free(rbd->pool_name); 159 bdev_rbd_free_config(rbd->config); 160 161 if (rbd->io_ctx) { 162 rados_ioctx_destroy(rbd->io_ctx); 163 } 164 165 if (rbd->cluster_name) { 166 bdev_rbd_put_cluster(&rbd->cluster_p); 167 free(rbd->cluster_name); 168 } else if (rbd->cluster) { 169 rados_shutdown(rbd->cluster); 170 } 171 172 pthread_mutex_destroy(&rbd->mutex); 173 free(rbd); 174 } 175 176 void 177 bdev_rbd_free_config(char **config) 178 { 179 char **entry; 180 181 if (config) { 182 for (entry = config; *entry; entry++) { 183 free(*entry); 184 } 185 free(config); 186 } 187 } 188 189 char ** 190 bdev_rbd_dup_config(const char *const *config) 191 { 192 size_t count; 193 char **copy; 194 195 if (!config) { 196 return NULL; 197 } 198 for (count = 0; config[count]; count++) {} 199 copy = calloc(count + 1, sizeof(*copy)); 200 if (!copy) { 201 return NULL; 202 } 203 for (count = 0; config[count]; count++) { 204 if (!(copy[count] = strdup(config[count]))) { 205 bdev_rbd_free_config(copy); 206 return NULL; 207 } 208 } 209 return copy; 210 } 211 212 static int 213 bdev_rados_cluster_init(const char *user_id, const char *const *config, 214 rados_t *cluster) 215 { 216 int ret; 217 218 ret = rados_create(cluster, user_id); 219 if (ret < 0) { 220 SPDK_ERRLOG("Failed to create rados_t struct\n"); 221 return -1; 222 } 223 224 if (config) { 225 const char *const *entry = config; 226 while (*entry) { 227 ret = rados_conf_set(*cluster, entry[0], entry[1]); 228 if (ret < 0) { 229 SPDK_ERRLOG("Failed to set %s = %s\n", entry[0], entry[1]); 230 rados_shutdown(*cluster); 231 return -1; 232 } 233 entry += 2; 234 } 235 } else { 236 ret = rados_conf_read_file(*cluster, NULL); 237 if (ret < 0) { 238 SPDK_ERRLOG("Failed to read conf file\n"); 239 rados_shutdown(*cluster); 240 return -1; 241 } 242 } 243 244 ret = rados_connect(*cluster); 245 if (ret < 0) { 246 SPDK_ERRLOG("Failed to connect to rbd_pool\n"); 247 rados_shutdown(*cluster); 248 return -1; 249 } 250 251 return 0; 252 } 253 254 static int 255 bdev_rbd_get_cluster(const char *cluster_name, rados_t **cluster) 256 { 257 struct bdev_rbd_cluster *entry; 258 259 if (cluster == NULL) { 260 SPDK_ERRLOG("cluster should not be NULL\n"); 261 return -1; 262 } 263 264 pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex); 265 STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) { 266 if (strcmp(cluster_name, entry->name) == 0) { 267 entry->ref++; 268 *cluster = &entry->cluster; 269 pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex); 270 return 0; 271 } 272 } 273 274 pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex); 275 return -1; 276 } 277 278 static int 279 bdev_rbd_shared_cluster_init(const char *cluster_name, rados_t **cluster) 280 { 281 int ret; 282 283 ret = bdev_rbd_get_cluster(cluster_name, cluster); 284 if (ret < 0) { 285 SPDK_ERRLOG("Failed to create rados_t struct\n"); 286 return -1; 287 } 288 289 return ret; 290 } 291 292 static void * 293 bdev_rbd_cluster_handle(void *arg) 294 { 295 void *ret = arg; 296 struct bdev_rbd *rbd = arg; 297 int rc; 298 299 rc = bdev_rados_cluster_init(rbd->user_id, (const char *const *)rbd->config, 300 &rbd->cluster); 301 if (rc < 0) { 302 SPDK_ERRLOG("Failed to create rados cluster for user_id=%s and rbd_pool=%s\n", 303 rbd->user_id ? rbd->user_id : "admin (the default)", rbd->pool_name); 304 ret = NULL; 305 } 306 307 return ret; 308 } 309 310 static void * 311 bdev_rbd_init_context(void *arg) 312 { 313 struct bdev_rbd *rbd = arg; 314 int rc; 315 316 if (rados_ioctx_create(*(rbd->cluster_p), rbd->pool_name, &rbd->io_ctx) < 0) { 317 SPDK_ERRLOG("Failed to create ioctx on rbd=%p\n", rbd); 318 return NULL; 319 } 320 321 rc = rbd_open(rbd->io_ctx, rbd->rbd_name, &rbd->image, NULL); 322 if (rc < 0) { 323 SPDK_ERRLOG("Failed to open specified rbd device\n"); 324 return NULL; 325 } 326 327 rc = rbd_stat(rbd->image, &rbd->info, sizeof(rbd->info)); 328 rbd_close(rbd->image); 329 if (rc < 0) { 330 SPDK_ERRLOG("Failed to stat specified rbd device\n"); 331 return NULL; 332 } 333 334 return arg; 335 } 336 337 static int 338 bdev_rbd_init(struct bdev_rbd *rbd) 339 { 340 int ret = 0; 341 342 if (!rbd->cluster_name) { 343 rbd->cluster_p = &rbd->cluster; 344 /* Cluster should be created in non-SPDK thread to avoid conflict between 345 * Rados and SPDK thread */ 346 if (spdk_call_unaffinitized(bdev_rbd_cluster_handle, rbd) == NULL) { 347 SPDK_ERRLOG("Cannot create the rados object on rbd=%p\n", rbd); 348 return -1; 349 } 350 } else { 351 ret = bdev_rbd_shared_cluster_init(rbd->cluster_name, &rbd->cluster_p); 352 if (ret < 0) { 353 SPDK_ERRLOG("Failed to create rados object for rbd =%p on cluster_name=%s\n", 354 rbd, rbd->cluster_name); 355 return -1; 356 } 357 } 358 359 if (spdk_call_unaffinitized(bdev_rbd_init_context, rbd) == NULL) { 360 SPDK_ERRLOG("Cannot init rbd context for rbd=%p\n", rbd); 361 } 362 363 return ret; 364 } 365 366 static void 367 bdev_rbd_exit(rbd_image_t image) 368 { 369 rbd_flush(image); 370 rbd_close(image); 371 } 372 373 static void 374 _bdev_rbd_io_complete(void *_rbd_io) 375 { 376 struct bdev_rbd_io *rbd_io = _rbd_io; 377 378 spdk_bdev_io_complete(spdk_bdev_io_from_ctx(rbd_io), rbd_io->status); 379 } 380 381 static void 382 bdev_rbd_io_complete(struct spdk_bdev_io *bdev_io, enum spdk_bdev_io_status status) 383 { 384 struct bdev_rbd_io *rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx; 385 struct spdk_thread *current_thread = spdk_get_thread(); 386 387 rbd_io->status = status; 388 assert(rbd_io->submit_td != NULL); 389 if (rbd_io->submit_td != current_thread) { 390 spdk_thread_send_msg(rbd_io->submit_td, _bdev_rbd_io_complete, rbd_io); 391 } else { 392 _bdev_rbd_io_complete(rbd_io); 393 } 394 } 395 396 static void 397 bdev_rbd_finish_aiocb(rbd_completion_t cb, void *arg) 398 { 399 int io_status; 400 struct spdk_bdev_io *bdev_io; 401 struct bdev_rbd_io *rbd_io; 402 enum spdk_bdev_io_status bio_status; 403 404 bdev_io = rbd_aio_get_arg(cb); 405 rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx; 406 io_status = rbd_aio_get_return_value(cb); 407 bio_status = SPDK_BDEV_IO_STATUS_SUCCESS; 408 409 if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) { 410 if ((int)rbd_io->total_len != io_status) { 411 bio_status = SPDK_BDEV_IO_STATUS_FAILED; 412 } 413 } else { 414 /* For others, 0 means success */ 415 if (io_status != 0) { 416 bio_status = SPDK_BDEV_IO_STATUS_FAILED; 417 } 418 } 419 420 rbd_aio_release(cb); 421 422 bdev_rbd_io_complete(bdev_io, bio_status); 423 } 424 425 static void 426 bdev_rbd_start_aio(struct bdev_rbd *disk, struct spdk_bdev_io *bdev_io, 427 struct iovec *iov, int iovcnt, uint64_t offset, size_t len) 428 { 429 int ret; 430 struct bdev_rbd_io *rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx; 431 rbd_image_t image = disk->image; 432 433 ret = rbd_aio_create_completion(bdev_io, bdev_rbd_finish_aiocb, 434 &rbd_io->comp); 435 if (ret < 0) { 436 goto err; 437 } 438 439 if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) { 440 rbd_io->total_len = len; 441 if (spdk_likely(iovcnt == 1)) { 442 ret = rbd_aio_read(image, offset, iov[0].iov_len, iov[0].iov_base, rbd_io->comp); 443 } else { 444 ret = rbd_aio_readv(image, iov, iovcnt, offset, rbd_io->comp); 445 } 446 } else if (bdev_io->type == SPDK_BDEV_IO_TYPE_WRITE) { 447 if (spdk_likely(iovcnt == 1)) { 448 ret = rbd_aio_write(image, offset, iov[0].iov_len, iov[0].iov_base, rbd_io->comp); 449 } else { 450 ret = rbd_aio_writev(image, iov, iovcnt, offset, rbd_io->comp); 451 } 452 } else if (bdev_io->type == SPDK_BDEV_IO_TYPE_UNMAP) { 453 ret = rbd_aio_discard(image, offset, len, rbd_io->comp); 454 } else if (bdev_io->type == SPDK_BDEV_IO_TYPE_FLUSH) { 455 ret = rbd_aio_flush(image, rbd_io->comp); 456 } else if (bdev_io->type == SPDK_BDEV_IO_TYPE_WRITE_ZEROES) { 457 ret = rbd_aio_write_zeroes(image, offset, len, rbd_io->comp, /* zero_flags */ 0, /* op_flags */ 0); 458 } 459 460 if (ret < 0) { 461 rbd_aio_release(rbd_io->comp); 462 goto err; 463 } 464 465 return; 466 467 err: 468 bdev_rbd_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 469 } 470 471 static int bdev_rbd_library_init(void); 472 static void bdev_rbd_library_fini(void); 473 474 static int 475 bdev_rbd_get_ctx_size(void) 476 { 477 return sizeof(struct bdev_rbd_io); 478 } 479 480 static struct spdk_bdev_module rbd_if = { 481 .name = "rbd", 482 .module_init = bdev_rbd_library_init, 483 .module_fini = bdev_rbd_library_fini, 484 .get_ctx_size = bdev_rbd_get_ctx_size, 485 486 }; 487 SPDK_BDEV_MODULE_REGISTER(rbd, &rbd_if) 488 489 static int 490 bdev_rbd_reset_timer(void *arg) 491 { 492 struct bdev_rbd *disk = arg; 493 494 /* 495 * TODO: This should check if any I/O is still in flight before completing the reset. 496 * For now, just complete after the timer expires. 497 */ 498 bdev_rbd_io_complete(disk->reset_bdev_io, SPDK_BDEV_IO_STATUS_SUCCESS); 499 spdk_poller_unregister(&disk->reset_timer); 500 disk->reset_bdev_io = NULL; 501 502 return SPDK_POLLER_BUSY; 503 } 504 505 static void 506 bdev_rbd_reset(struct bdev_rbd *disk, struct spdk_bdev_io *bdev_io) 507 { 508 /* 509 * HACK: Since librbd doesn't provide any way to cancel outstanding aio, just kick off a 510 * timer to wait for in-flight I/O to complete. 511 */ 512 assert(disk->reset_bdev_io == NULL); 513 disk->reset_bdev_io = bdev_io; 514 disk->reset_timer = SPDK_POLLER_REGISTER(bdev_rbd_reset_timer, disk, 1 * 1000 * 1000); 515 } 516 517 static void 518 _bdev_rbd_destruct_done(void *io_device) 519 { 520 struct bdev_rbd *rbd = io_device; 521 522 assert(rbd != NULL); 523 assert(rbd->ch_count == 0); 524 525 spdk_bdev_destruct_done(&rbd->disk, 0); 526 bdev_rbd_free(rbd); 527 } 528 529 static void 530 bdev_rbd_free_cb(void *io_device) 531 { 532 struct bdev_rbd *rbd = io_device; 533 534 /* The io device has been unregistered. Send a message back to the 535 * original thread that started the destruct operation, so that the 536 * bdev unregister callback is invoked on the same thread that started 537 * this whole process. 538 */ 539 spdk_thread_send_msg(rbd->destruct_td, _bdev_rbd_destruct_done, rbd); 540 } 541 542 static void 543 _bdev_rbd_destruct(void *ctx) 544 { 545 struct bdev_rbd *rbd = ctx; 546 547 spdk_io_device_unregister(rbd, bdev_rbd_free_cb); 548 } 549 550 static int 551 bdev_rbd_destruct(void *ctx) 552 { 553 struct bdev_rbd *rbd = ctx; 554 struct spdk_thread *td; 555 556 if (rbd->main_td == NULL) { 557 td = spdk_get_thread(); 558 } else { 559 td = rbd->main_td; 560 } 561 562 /* Start the destruct operation on the rbd bdev's 563 * main thread. This guarantees it will only start 564 * executing after any messages related to channel 565 * deletions have finished completing. *Always* 566 * send a message, even if this function gets called 567 * from the main thread, in case there are pending 568 * channel delete messages in flight to this thread. 569 */ 570 assert(rbd->destruct_td == NULL); 571 rbd->destruct_td = td; 572 spdk_thread_send_msg(td, _bdev_rbd_destruct, rbd); 573 574 /* Return 1 to indicate the destruct path is asynchronous. */ 575 return 1; 576 } 577 578 static void 579 bdev_rbd_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io, 580 bool success) 581 { 582 struct bdev_rbd *disk = (struct bdev_rbd *)bdev_io->bdev->ctxt; 583 584 if (!success) { 585 bdev_rbd_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 586 return; 587 } 588 589 bdev_rbd_start_aio(disk, 590 bdev_io, 591 bdev_io->u.bdev.iovs, 592 bdev_io->u.bdev.iovcnt, 593 bdev_io->u.bdev.offset_blocks * bdev_io->bdev->blocklen, 594 bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen); 595 } 596 597 static void 598 _bdev_rbd_submit_request(void *ctx) 599 { 600 struct spdk_bdev_io *bdev_io = ctx; 601 struct bdev_rbd *disk = (struct bdev_rbd *)bdev_io->bdev->ctxt; 602 603 switch (bdev_io->type) { 604 case SPDK_BDEV_IO_TYPE_READ: 605 spdk_bdev_io_get_buf(bdev_io, bdev_rbd_get_buf_cb, 606 bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen); 607 break; 608 609 case SPDK_BDEV_IO_TYPE_WRITE: 610 case SPDK_BDEV_IO_TYPE_UNMAP: 611 case SPDK_BDEV_IO_TYPE_FLUSH: 612 case SPDK_BDEV_IO_TYPE_WRITE_ZEROES: 613 bdev_rbd_start_aio(disk, 614 bdev_io, 615 bdev_io->u.bdev.iovs, 616 bdev_io->u.bdev.iovcnt, 617 bdev_io->u.bdev.offset_blocks * bdev_io->bdev->blocklen, 618 bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen); 619 break; 620 621 case SPDK_BDEV_IO_TYPE_RESET: 622 bdev_rbd_reset((struct bdev_rbd *)bdev_io->bdev->ctxt, 623 bdev_io); 624 break; 625 626 default: 627 SPDK_ERRLOG("Unsupported IO type =%d\n", bdev_io->type); 628 bdev_rbd_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 629 break; 630 } 631 } 632 633 static void 634 bdev_rbd_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io) 635 { 636 struct spdk_thread *submit_td = spdk_io_channel_get_thread(ch); 637 struct bdev_rbd_io *rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx; 638 struct bdev_rbd *disk = (struct bdev_rbd *)bdev_io->bdev->ctxt; 639 640 rbd_io->submit_td = submit_td; 641 if (disk->main_td != submit_td) { 642 spdk_thread_send_msg(disk->main_td, _bdev_rbd_submit_request, bdev_io); 643 } else { 644 _bdev_rbd_submit_request(bdev_io); 645 } 646 } 647 648 static bool 649 bdev_rbd_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type) 650 { 651 switch (io_type) { 652 case SPDK_BDEV_IO_TYPE_READ: 653 case SPDK_BDEV_IO_TYPE_WRITE: 654 case SPDK_BDEV_IO_TYPE_UNMAP: 655 case SPDK_BDEV_IO_TYPE_FLUSH: 656 case SPDK_BDEV_IO_TYPE_RESET: 657 case SPDK_BDEV_IO_TYPE_WRITE_ZEROES: 658 return true; 659 660 default: 661 return false; 662 } 663 } 664 665 static void 666 bdev_rbd_free_channel_resources(struct bdev_rbd *disk) 667 { 668 assert(disk != NULL); 669 assert(disk->main_td == spdk_get_thread()); 670 assert(disk->ch_count == 0); 671 672 spdk_put_io_channel(disk->group_ch); 673 if (disk->image) { 674 bdev_rbd_exit(disk->image); 675 } 676 677 disk->main_td = NULL; 678 disk->group_ch = NULL; 679 } 680 681 static void * 682 bdev_rbd_handle(void *arg) 683 { 684 struct bdev_rbd *disk = arg; 685 void *ret = arg; 686 687 if (rbd_open(disk->io_ctx, disk->rbd_name, &disk->image, NULL) < 0) { 688 SPDK_ERRLOG("Failed to open specified rbd device\n"); 689 ret = NULL; 690 } 691 692 return ret; 693 } 694 695 static int 696 _bdev_rbd_create_cb(struct bdev_rbd *disk) 697 { 698 disk->group_ch = spdk_get_io_channel(&rbd_if); 699 assert(disk->group_ch != NULL); 700 701 if (spdk_call_unaffinitized(bdev_rbd_handle, disk) == NULL) { 702 bdev_rbd_free_channel_resources(disk); 703 return -1; 704 } 705 706 return 0; 707 } 708 709 static int 710 bdev_rbd_create_cb(void *io_device, void *ctx_buf) 711 { 712 struct bdev_rbd_io_channel *ch = ctx_buf; 713 struct bdev_rbd *disk = io_device; 714 int rc; 715 716 ch->disk = disk; 717 pthread_mutex_lock(&disk->mutex); 718 if (disk->ch_count == 0) { 719 assert(disk->main_td == NULL); 720 rc = _bdev_rbd_create_cb(disk); 721 if (rc) { 722 SPDK_ERRLOG("Cannot create channel for disk=%p\n", disk); 723 pthread_mutex_unlock(&disk->mutex); 724 return rc; 725 } 726 727 disk->main_td = spdk_get_thread(); 728 } 729 730 disk->ch_count++; 731 pthread_mutex_unlock(&disk->mutex); 732 733 return 0; 734 } 735 736 static void 737 _bdev_rbd_destroy_cb(void *ctx) 738 { 739 struct bdev_rbd *disk = ctx; 740 741 pthread_mutex_lock(&disk->mutex); 742 assert(disk->ch_count > 0); 743 disk->ch_count--; 744 745 if (disk->ch_count > 0) { 746 /* A new channel was created between when message was sent and this function executed */ 747 pthread_mutex_unlock(&disk->mutex); 748 return; 749 } 750 751 bdev_rbd_free_channel_resources(disk); 752 pthread_mutex_unlock(&disk->mutex); 753 } 754 755 static void 756 bdev_rbd_destroy_cb(void *io_device, void *ctx_buf) 757 { 758 struct bdev_rbd *disk = io_device; 759 struct spdk_thread *thread; 760 761 pthread_mutex_lock(&disk->mutex); 762 assert(disk->ch_count > 0); 763 disk->ch_count--; 764 if (disk->ch_count == 0) { 765 assert(disk->main_td != NULL); 766 if (disk->main_td != spdk_get_thread()) { 767 /* The final channel was destroyed on a different thread 768 * than where the first channel was created. Pass a message 769 * to the main thread to unregister the poller. */ 770 disk->ch_count++; 771 thread = disk->main_td; 772 pthread_mutex_unlock(&disk->mutex); 773 spdk_thread_send_msg(thread, _bdev_rbd_destroy_cb, disk); 774 return; 775 } 776 777 bdev_rbd_free_channel_resources(disk); 778 } 779 pthread_mutex_unlock(&disk->mutex); 780 } 781 782 static struct spdk_io_channel * 783 bdev_rbd_get_io_channel(void *ctx) 784 { 785 struct bdev_rbd *rbd_bdev = ctx; 786 787 return spdk_get_io_channel(rbd_bdev); 788 } 789 790 static void 791 bdev_rbd_cluster_dump_entry(const char *cluster_name, struct spdk_json_write_ctx *w) 792 { 793 struct bdev_rbd_cluster *entry; 794 795 pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex); 796 STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) { 797 if (strcmp(cluster_name, entry->name)) { 798 continue; 799 } 800 if (entry->user_id) { 801 spdk_json_write_named_string(w, "user_id", entry->user_id); 802 } 803 804 if (entry->config_param) { 805 char **config_entry = entry->config_param; 806 807 spdk_json_write_named_object_begin(w, "config_param"); 808 while (*config_entry) { 809 spdk_json_write_named_string(w, config_entry[0], config_entry[1]); 810 config_entry += 2; 811 } 812 spdk_json_write_object_end(w); 813 } 814 if (entry->config_file) { 815 spdk_json_write_named_string(w, "config_file", entry->config_file); 816 } 817 if (entry->key_file) { 818 spdk_json_write_named_string(w, "key_file", entry->key_file); 819 } 820 821 pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex); 822 return; 823 } 824 825 pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex); 826 } 827 828 static int 829 bdev_rbd_dump_info_json(void *ctx, struct spdk_json_write_ctx *w) 830 { 831 struct bdev_rbd *rbd_bdev = ctx; 832 833 spdk_json_write_named_object_begin(w, "rbd"); 834 835 spdk_json_write_named_string(w, "pool_name", rbd_bdev->pool_name); 836 837 spdk_json_write_named_string(w, "rbd_name", rbd_bdev->rbd_name); 838 839 if (rbd_bdev->cluster_name) { 840 bdev_rbd_cluster_dump_entry(rbd_bdev->cluster_name, w); 841 goto end; 842 } 843 844 if (rbd_bdev->user_id) { 845 spdk_json_write_named_string(w, "user_id", rbd_bdev->user_id); 846 } 847 848 if (rbd_bdev->config) { 849 char **entry = rbd_bdev->config; 850 851 spdk_json_write_named_object_begin(w, "config"); 852 while (*entry) { 853 spdk_json_write_named_string(w, entry[0], entry[1]); 854 entry += 2; 855 } 856 spdk_json_write_object_end(w); 857 } 858 859 end: 860 spdk_json_write_object_end(w); 861 862 return 0; 863 } 864 865 static void 866 bdev_rbd_write_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w) 867 { 868 struct bdev_rbd *rbd = bdev->ctxt; 869 char uuid_str[SPDK_UUID_STRING_LEN]; 870 871 spdk_json_write_object_begin(w); 872 873 spdk_json_write_named_string(w, "method", "bdev_rbd_create"); 874 875 spdk_json_write_named_object_begin(w, "params"); 876 spdk_json_write_named_string(w, "name", bdev->name); 877 spdk_json_write_named_string(w, "pool_name", rbd->pool_name); 878 spdk_json_write_named_string(w, "rbd_name", rbd->rbd_name); 879 spdk_json_write_named_uint32(w, "block_size", bdev->blocklen); 880 if (rbd->user_id) { 881 spdk_json_write_named_string(w, "user_id", rbd->user_id); 882 } 883 884 if (rbd->config) { 885 char **entry = rbd->config; 886 887 spdk_json_write_named_object_begin(w, "config"); 888 while (*entry) { 889 spdk_json_write_named_string(w, entry[0], entry[1]); 890 entry += 2; 891 } 892 spdk_json_write_object_end(w); 893 } 894 895 spdk_uuid_fmt_lower(uuid_str, sizeof(uuid_str), &bdev->uuid); 896 spdk_json_write_named_string(w, "uuid", uuid_str); 897 898 spdk_json_write_object_end(w); 899 900 spdk_json_write_object_end(w); 901 } 902 903 static void 904 dump_single_cluster_entry(struct bdev_rbd_cluster *entry, struct spdk_json_write_ctx *w) 905 { 906 assert(entry != NULL); 907 908 spdk_json_write_object_begin(w); 909 spdk_json_write_named_string(w, "cluster_name", entry->name); 910 911 if (entry->user_id) { 912 spdk_json_write_named_string(w, "user_id", entry->user_id); 913 } 914 915 if (entry->config_param) { 916 char **config_entry = entry->config_param; 917 918 spdk_json_write_named_object_begin(w, "config_param"); 919 while (*config_entry) { 920 spdk_json_write_named_string(w, config_entry[0], config_entry[1]); 921 config_entry += 2; 922 } 923 spdk_json_write_object_end(w); 924 } 925 if (entry->config_file) { 926 spdk_json_write_named_string(w, "config_file", entry->config_file); 927 } 928 if (entry->key_file) { 929 spdk_json_write_named_string(w, "key_file", entry->key_file); 930 } 931 932 spdk_json_write_object_end(w); 933 } 934 935 int 936 bdev_rbd_get_clusters_info(struct spdk_jsonrpc_request *request, const char *name) 937 { 938 struct bdev_rbd_cluster *entry; 939 struct spdk_json_write_ctx *w; 940 941 pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex); 942 943 if (STAILQ_EMPTY(&g_map_bdev_rbd_cluster)) { 944 pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex); 945 return -ENOENT; 946 } 947 948 /* If cluster name is provided */ 949 if (name) { 950 STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) { 951 if (strcmp(name, entry->name) == 0) { 952 w = spdk_jsonrpc_begin_result(request); 953 dump_single_cluster_entry(entry, w); 954 spdk_jsonrpc_end_result(request, w); 955 956 pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex); 957 return 0; 958 } 959 } 960 961 pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex); 962 return -ENOENT; 963 } 964 965 w = spdk_jsonrpc_begin_result(request); 966 spdk_json_write_array_begin(w); 967 STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) { 968 dump_single_cluster_entry(entry, w); 969 } 970 spdk_json_write_array_end(w); 971 spdk_jsonrpc_end_result(request, w); 972 pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex); 973 974 return 0; 975 } 976 977 static const struct spdk_bdev_fn_table rbd_fn_table = { 978 .destruct = bdev_rbd_destruct, 979 .submit_request = bdev_rbd_submit_request, 980 .io_type_supported = bdev_rbd_io_type_supported, 981 .get_io_channel = bdev_rbd_get_io_channel, 982 .dump_info_json = bdev_rbd_dump_info_json, 983 .write_config_json = bdev_rbd_write_config_json, 984 }; 985 986 static int 987 rbd_register_cluster(const char *name, const char *user_id, const char *const *config_param, 988 const char *config_file, const char *key_file) 989 { 990 struct bdev_rbd_cluster *entry; 991 int rc; 992 993 pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex); 994 STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) { 995 if (strcmp(name, entry->name) == 0) { 996 SPDK_ERRLOG("Cluster name=%s already exists\n", name); 997 pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex); 998 return -1; 999 } 1000 } 1001 1002 entry = calloc(1, sizeof(*entry)); 1003 if (!entry) { 1004 SPDK_ERRLOG("Cannot allocate an entry for name=%s\n", name); 1005 pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex); 1006 return -1; 1007 } 1008 1009 entry->name = strdup(name); 1010 if (entry->name == NULL) { 1011 SPDK_ERRLOG("Failed to save the name =%s on entry =%p\n", name, entry); 1012 goto err_handle; 1013 } 1014 1015 if (user_id) { 1016 entry->user_id = strdup(user_id); 1017 if (entry->user_id == NULL) { 1018 SPDK_ERRLOG("Failed to save the str =%s on entry =%p\n", user_id, entry); 1019 goto err_handle; 1020 } 1021 } 1022 1023 /* Support specify config_param or config_file separately, or both of them. */ 1024 if (config_param) { 1025 entry->config_param = bdev_rbd_dup_config(config_param); 1026 if (entry->config_param == NULL) { 1027 SPDK_ERRLOG("Failed to save the config_param=%p on entry = %p\n", config_param, entry); 1028 goto err_handle; 1029 } 1030 } 1031 1032 if (config_file) { 1033 entry->config_file = strdup(config_file); 1034 if (entry->config_file == NULL) { 1035 SPDK_ERRLOG("Failed to save the config_file=%s on entry = %p\n", config_file, entry); 1036 goto err_handle; 1037 } 1038 } 1039 1040 if (key_file) { 1041 entry->key_file = strdup(key_file); 1042 if (entry->key_file == NULL) { 1043 SPDK_ERRLOG("Failed to save the key_file=%s on entry = %p\n", key_file, entry); 1044 goto err_handle; 1045 } 1046 } 1047 1048 rc = rados_create(&entry->cluster, user_id); 1049 if (rc < 0) { 1050 SPDK_ERRLOG("Failed to create rados_t struct\n"); 1051 goto err_handle; 1052 } 1053 1054 /* Try default location when entry->config_file is NULL, but ignore failure when it is NULL */ 1055 rc = rados_conf_read_file(entry->cluster, entry->config_file); 1056 if (entry->config_file && rc < 0) { 1057 SPDK_ERRLOG("Failed to read conf file %s\n", entry->config_file); 1058 rados_shutdown(entry->cluster); 1059 goto err_handle; 1060 } 1061 1062 if (config_param) { 1063 const char *const *config_entry = config_param; 1064 while (*config_entry) { 1065 rc = rados_conf_set(entry->cluster, config_entry[0], config_entry[1]); 1066 if (rc < 0) { 1067 SPDK_ERRLOG("Failed to set %s = %s\n", config_entry[0], config_entry[1]); 1068 rados_shutdown(entry->cluster); 1069 goto err_handle; 1070 } 1071 config_entry += 2; 1072 } 1073 } 1074 1075 if (key_file) { 1076 rc = rados_conf_set(entry->cluster, "keyring", key_file); 1077 if (rc < 0) { 1078 SPDK_ERRLOG("Failed to set keyring = %s\n", key_file); 1079 rados_shutdown(entry->cluster); 1080 goto err_handle; 1081 } 1082 } 1083 1084 rc = rados_connect(entry->cluster); 1085 if (rc < 0) { 1086 SPDK_ERRLOG("Failed to connect to rbd_pool on cluster=%p\n", entry->cluster); 1087 rados_shutdown(entry->cluster); 1088 goto err_handle; 1089 } 1090 1091 STAILQ_INSERT_TAIL(&g_map_bdev_rbd_cluster, entry, link); 1092 pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex); 1093 1094 return 0; 1095 1096 err_handle: 1097 bdev_rbd_cluster_free(entry); 1098 pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex); 1099 return -1; 1100 } 1101 1102 int 1103 bdev_rbd_unregister_cluster(const char *name) 1104 { 1105 struct bdev_rbd_cluster *entry; 1106 int rc = 0; 1107 1108 if (name == NULL) { 1109 return -1; 1110 } 1111 1112 pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex); 1113 STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) { 1114 if (strcmp(name, entry->name) == 0) { 1115 if (entry->ref == 0) { 1116 STAILQ_REMOVE(&g_map_bdev_rbd_cluster, entry, bdev_rbd_cluster, link); 1117 rados_shutdown(entry->cluster); 1118 bdev_rbd_cluster_free(entry); 1119 } else { 1120 SPDK_ERRLOG("Cluster with name=%p is still used and we cannot delete it\n", 1121 entry->name); 1122 rc = -1; 1123 } 1124 1125 pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex); 1126 return rc; 1127 } 1128 } 1129 1130 pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex); 1131 1132 SPDK_ERRLOG("Could not find the cluster name =%p\n", name); 1133 1134 return -1; 1135 } 1136 1137 static void * 1138 _bdev_rbd_register_cluster(void *arg) 1139 { 1140 struct cluster_register_info *info = arg; 1141 void *ret = arg; 1142 int rc; 1143 1144 rc = rbd_register_cluster((const char *)info->name, (const char *)info->user_id, 1145 (const char *const *)info->config_param, (const char *)info->config_file, 1146 (const char *)info->key_file); 1147 if (rc) { 1148 ret = NULL; 1149 } 1150 1151 return ret; 1152 } 1153 1154 int 1155 bdev_rbd_register_cluster(struct cluster_register_info *info) 1156 { 1157 assert(info != NULL); 1158 1159 /* Rados cluster info need to be created in non SPDK-thread to avoid CPU 1160 * resource contention */ 1161 if (spdk_call_unaffinitized(_bdev_rbd_register_cluster, info) == NULL) { 1162 return -1; 1163 } 1164 1165 return 0; 1166 } 1167 1168 int 1169 bdev_rbd_create(struct spdk_bdev **bdev, const char *name, const char *user_id, 1170 const char *pool_name, 1171 const char *const *config, 1172 const char *rbd_name, 1173 uint32_t block_size, 1174 const char *cluster_name, 1175 const struct spdk_uuid *uuid) 1176 { 1177 struct bdev_rbd *rbd; 1178 int ret; 1179 1180 if ((pool_name == NULL) || (rbd_name == NULL)) { 1181 return -EINVAL; 1182 } 1183 1184 rbd = calloc(1, sizeof(struct bdev_rbd)); 1185 if (rbd == NULL) { 1186 SPDK_ERRLOG("Failed to allocate bdev_rbd struct\n"); 1187 return -ENOMEM; 1188 } 1189 1190 ret = pthread_mutex_init(&rbd->mutex, NULL); 1191 if (ret) { 1192 SPDK_ERRLOG("Cannot init mutex on rbd=%p\n", rbd->disk.name); 1193 free(rbd); 1194 return ret; 1195 } 1196 1197 rbd->rbd_name = strdup(rbd_name); 1198 if (!rbd->rbd_name) { 1199 bdev_rbd_free(rbd); 1200 return -ENOMEM; 1201 } 1202 1203 if (user_id) { 1204 rbd->user_id = strdup(user_id); 1205 if (!rbd->user_id) { 1206 bdev_rbd_free(rbd); 1207 return -ENOMEM; 1208 } 1209 } 1210 1211 if (cluster_name) { 1212 rbd->cluster_name = strdup(cluster_name); 1213 if (!rbd->cluster_name) { 1214 bdev_rbd_free(rbd); 1215 return -ENOMEM; 1216 } 1217 } 1218 rbd->pool_name = strdup(pool_name); 1219 if (!rbd->pool_name) { 1220 bdev_rbd_free(rbd); 1221 return -ENOMEM; 1222 } 1223 1224 if (config && !(rbd->config = bdev_rbd_dup_config(config))) { 1225 bdev_rbd_free(rbd); 1226 return -ENOMEM; 1227 } 1228 1229 ret = bdev_rbd_init(rbd); 1230 if (ret < 0) { 1231 bdev_rbd_free(rbd); 1232 SPDK_ERRLOG("Failed to init rbd device\n"); 1233 return ret; 1234 } 1235 1236 if (uuid) { 1237 rbd->disk.uuid = *uuid; 1238 } else { 1239 spdk_uuid_generate(&rbd->disk.uuid); 1240 } 1241 1242 if (name) { 1243 rbd->disk.name = strdup(name); 1244 } else { 1245 rbd->disk.name = spdk_sprintf_alloc("Ceph%d", bdev_rbd_count); 1246 } 1247 if (!rbd->disk.name) { 1248 bdev_rbd_free(rbd); 1249 return -ENOMEM; 1250 } 1251 rbd->disk.product_name = "Ceph Rbd Disk"; 1252 bdev_rbd_count++; 1253 1254 rbd->disk.write_cache = 0; 1255 rbd->disk.blocklen = block_size; 1256 rbd->disk.blockcnt = rbd->info.size / rbd->disk.blocklen; 1257 rbd->disk.ctxt = rbd; 1258 rbd->disk.fn_table = &rbd_fn_table; 1259 rbd->disk.module = &rbd_if; 1260 1261 SPDK_NOTICELOG("Add %s rbd disk to lun\n", rbd->disk.name); 1262 1263 spdk_io_device_register(rbd, bdev_rbd_create_cb, 1264 bdev_rbd_destroy_cb, 1265 sizeof(struct bdev_rbd_io_channel), 1266 rbd_name); 1267 ret = spdk_bdev_register(&rbd->disk); 1268 if (ret) { 1269 spdk_io_device_unregister(rbd, NULL); 1270 bdev_rbd_free(rbd); 1271 return ret; 1272 } 1273 1274 *bdev = &(rbd->disk); 1275 1276 return ret; 1277 } 1278 1279 void 1280 bdev_rbd_delete(const char *name, spdk_delete_rbd_complete cb_fn, void *cb_arg) 1281 { 1282 int rc; 1283 1284 rc = spdk_bdev_unregister_by_name(name, &rbd_if, cb_fn, cb_arg); 1285 if (rc != 0) { 1286 cb_fn(cb_arg, rc); 1287 } 1288 } 1289 1290 static void 1291 dummy_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev, void *ctx) 1292 { 1293 } 1294 1295 int 1296 bdev_rbd_resize(const char *name, const uint64_t new_size_in_mb) 1297 { 1298 struct spdk_bdev_desc *desc; 1299 struct spdk_bdev *bdev; 1300 struct spdk_io_channel *ch; 1301 struct bdev_rbd_io_channel *rbd_io_ch; 1302 int rc = 0; 1303 uint64_t new_size_in_byte; 1304 uint64_t current_size_in_mb; 1305 1306 rc = spdk_bdev_open_ext(name, false, dummy_bdev_event_cb, NULL, &desc); 1307 if (rc != 0) { 1308 return rc; 1309 } 1310 1311 bdev = spdk_bdev_desc_get_bdev(desc); 1312 1313 if (bdev->module != &rbd_if) { 1314 rc = -EINVAL; 1315 goto exit; 1316 } 1317 1318 current_size_in_mb = bdev->blocklen * bdev->blockcnt / (1024 * 1024); 1319 if (current_size_in_mb > new_size_in_mb) { 1320 SPDK_ERRLOG("The new bdev size must be larger than current bdev size.\n"); 1321 rc = -EINVAL; 1322 goto exit; 1323 } 1324 1325 ch = bdev_rbd_get_io_channel(bdev); 1326 rbd_io_ch = spdk_io_channel_get_ctx(ch); 1327 new_size_in_byte = new_size_in_mb * 1024 * 1024; 1328 1329 rc = rbd_resize(rbd_io_ch->disk->image, new_size_in_byte); 1330 spdk_put_io_channel(ch); 1331 if (rc != 0) { 1332 SPDK_ERRLOG("failed to resize the ceph bdev.\n"); 1333 goto exit; 1334 } 1335 1336 rc = spdk_bdev_notify_blockcnt_change(bdev, new_size_in_byte / bdev->blocklen); 1337 if (rc != 0) { 1338 SPDK_ERRLOG("failed to notify block cnt change.\n"); 1339 } 1340 1341 exit: 1342 spdk_bdev_close(desc); 1343 return rc; 1344 } 1345 1346 static int 1347 bdev_rbd_group_create_cb(void *io_device, void *ctx_buf) 1348 { 1349 return 0; 1350 } 1351 1352 static void 1353 bdev_rbd_group_destroy_cb(void *io_device, void *ctx_buf) 1354 { 1355 } 1356 1357 static int 1358 bdev_rbd_library_init(void) 1359 { 1360 spdk_io_device_register(&rbd_if, bdev_rbd_group_create_cb, bdev_rbd_group_destroy_cb, 1361 0, "bdev_rbd_poll_groups"); 1362 return 0; 1363 } 1364 1365 static void 1366 bdev_rbd_library_fini(void) 1367 { 1368 spdk_io_device_unregister(&rbd_if, NULL); 1369 } 1370 1371 SPDK_LOG_REGISTER_COMPONENT(bdev_rbd) 1372