1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 36 #include "bdev_rbd.h" 37 38 #include <rbd/librbd.h> 39 #include <rados/librados.h> 40 #include <sys/eventfd.h> 41 #include <sys/epoll.h> 42 43 #include "spdk/env.h" 44 #include "spdk/bdev.h" 45 #include "spdk/thread.h" 46 #include "spdk/json.h" 47 #include "spdk/string.h" 48 #include "spdk/util.h" 49 #include "spdk/likely.h" 50 51 #include "spdk/bdev_module.h" 52 #include "spdk/log.h" 53 54 #define SPDK_RBD_QUEUE_DEPTH 128 55 #define MAX_EVENTS_PER_POLL 128 56 57 static int bdev_rbd_count = 0; 58 59 struct bdev_rbd { 60 struct spdk_bdev disk; 61 char *rbd_name; 62 char *user_id; 63 char *pool_name; 64 char **config; 65 66 rados_t cluster; 67 rados_t *cluster_p; 68 char *cluster_name; 69 70 rados_ioctx_t io_ctx; 71 rbd_image_t image; 72 int pfd; 73 74 rbd_image_info_t info; 75 pthread_mutex_t mutex; 76 struct spdk_thread *main_td; 77 uint32_t ch_count; 78 struct bdev_rbd_group_channel *group_ch; 79 80 bool deferred_free; 81 TAILQ_ENTRY(bdev_rbd) tailq; 82 struct spdk_poller *reset_timer; 83 struct spdk_bdev_io *reset_bdev_io; 84 }; 85 86 struct bdev_rbd_group_channel { 87 struct spdk_poller *poller; 88 int epoll_fd; 89 }; 90 91 struct bdev_rbd_io_channel { 92 struct bdev_rbd *disk; 93 }; 94 95 struct bdev_rbd_io { 96 struct spdk_thread *submit_td; 97 enum spdk_bdev_io_status status; 98 size_t total_len; 99 }; 100 101 struct bdev_rbd_cluster { 102 char *name; 103 char *user_id; 104 char **config_param; 105 char *config_file; 106 rados_t cluster; 107 uint32_t ref; 108 STAILQ_ENTRY(bdev_rbd_cluster) link; 109 }; 110 111 static STAILQ_HEAD(, bdev_rbd_cluster) g_map_bdev_rbd_cluster = STAILQ_HEAD_INITIALIZER( 112 g_map_bdev_rbd_cluster); 113 static pthread_mutex_t g_map_bdev_rbd_cluster_mutex = PTHREAD_MUTEX_INITIALIZER; 114 115 static void 116 bdev_rbd_cluster_free(struct bdev_rbd_cluster *entry) 117 { 118 assert(entry != NULL); 119 120 bdev_rbd_free_config(entry->config_param); 121 free(entry->config_file); 122 free(entry->user_id); 123 free(entry->name); 124 free(entry); 125 } 126 127 static void 128 bdev_rbd_put_cluster(rados_t **cluster) 129 { 130 struct bdev_rbd_cluster *entry; 131 132 assert(cluster != NULL); 133 134 /* No need go through the map if *cluster equals to NULL */ 135 if (*cluster == NULL) { 136 return; 137 } 138 139 pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex); 140 STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) { 141 if (*cluster != &entry->cluster) { 142 continue; 143 } 144 145 assert(entry->ref > 0); 146 entry->ref--; 147 *cluster = NULL; 148 pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex); 149 return; 150 } 151 152 pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex); 153 SPDK_ERRLOG("Cannot find the entry for cluster=%p\n", cluster); 154 } 155 156 static void 157 bdev_rbd_free(struct bdev_rbd *rbd) 158 { 159 if (!rbd) { 160 return; 161 } 162 163 free(rbd->disk.name); 164 free(rbd->rbd_name); 165 free(rbd->user_id); 166 free(rbd->pool_name); 167 bdev_rbd_free_config(rbd->config); 168 169 if (rbd->io_ctx) { 170 rados_ioctx_destroy(rbd->io_ctx); 171 } 172 173 if (rbd->cluster_name) { 174 bdev_rbd_put_cluster(&rbd->cluster_p); 175 free(rbd->cluster_name); 176 } else if (rbd->cluster) { 177 rados_shutdown(rbd->cluster); 178 } 179 180 pthread_mutex_destroy(&rbd->mutex); 181 free(rbd); 182 } 183 184 void 185 bdev_rbd_free_config(char **config) 186 { 187 char **entry; 188 189 if (config) { 190 for (entry = config; *entry; entry++) { 191 free(*entry); 192 } 193 free(config); 194 } 195 } 196 197 char ** 198 bdev_rbd_dup_config(const char *const *config) 199 { 200 size_t count; 201 char **copy; 202 203 if (!config) { 204 return NULL; 205 } 206 for (count = 0; config[count]; count++) {} 207 copy = calloc(count + 1, sizeof(*copy)); 208 if (!copy) { 209 return NULL; 210 } 211 for (count = 0; config[count]; count++) { 212 if (!(copy[count] = strdup(config[count]))) { 213 bdev_rbd_free_config(copy); 214 return NULL; 215 } 216 } 217 return copy; 218 } 219 220 static int 221 bdev_rados_cluster_init(const char *user_id, const char *const *config, 222 rados_t *cluster) 223 { 224 int ret; 225 226 ret = rados_create(cluster, user_id); 227 if (ret < 0) { 228 SPDK_ERRLOG("Failed to create rados_t struct\n"); 229 return -1; 230 } 231 232 if (config) { 233 const char *const *entry = config; 234 while (*entry) { 235 ret = rados_conf_set(*cluster, entry[0], entry[1]); 236 if (ret < 0) { 237 SPDK_ERRLOG("Failed to set %s = %s\n", entry[0], entry[1]); 238 rados_shutdown(*cluster); 239 return -1; 240 } 241 entry += 2; 242 } 243 } else { 244 ret = rados_conf_read_file(*cluster, NULL); 245 if (ret < 0) { 246 SPDK_ERRLOG("Failed to read conf file\n"); 247 rados_shutdown(*cluster); 248 return -1; 249 } 250 } 251 252 ret = rados_connect(*cluster); 253 if (ret < 0) { 254 SPDK_ERRLOG("Failed to connect to rbd_pool\n"); 255 rados_shutdown(*cluster); 256 return -1; 257 } 258 259 return 0; 260 } 261 262 static int 263 bdev_rbd_get_cluster(const char *cluster_name, rados_t **cluster) 264 { 265 struct bdev_rbd_cluster *entry; 266 267 if (cluster == NULL) { 268 SPDK_ERRLOG("cluster should not be NULL\n"); 269 return -1; 270 } 271 272 pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex); 273 STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) { 274 if (strcmp(cluster_name, entry->name) == 0) { 275 entry->ref++; 276 *cluster = &entry->cluster; 277 pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex); 278 return 0; 279 } 280 } 281 282 pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex); 283 return -1; 284 } 285 286 static int 287 bdev_rbd_shared_cluster_init(const char *cluster_name, rados_t **cluster) 288 { 289 int ret; 290 291 ret = bdev_rbd_get_cluster(cluster_name, cluster); 292 if (ret < 0) { 293 SPDK_ERRLOG("Failed to create rados_t struct\n"); 294 return -1; 295 } 296 297 return ret; 298 } 299 300 static void * 301 bdev_rbd_cluster_handle(void *arg) 302 { 303 void *ret = arg; 304 struct bdev_rbd *rbd = arg; 305 int rc; 306 307 rc = bdev_rados_cluster_init(rbd->user_id, (const char *const *)rbd->config, 308 &rbd->cluster); 309 if (rc < 0) { 310 SPDK_ERRLOG("Failed to create rados cluster for user_id=%s and rbd_pool=%s\n", 311 rbd->user_id ? rbd->user_id : "admin (the default)", rbd->pool_name); 312 ret = NULL; 313 } 314 315 return ret; 316 } 317 318 static void * 319 bdev_rbd_init_context(void *arg) 320 { 321 struct bdev_rbd *rbd = arg; 322 int rc; 323 324 if (rados_ioctx_create(*(rbd->cluster_p), rbd->pool_name, &rbd->io_ctx) < 0) { 325 SPDK_ERRLOG("Failed to create ioctx on rbd=%p\n", rbd); 326 return NULL; 327 } 328 329 rc = rbd_open(rbd->io_ctx, rbd->rbd_name, &rbd->image, NULL); 330 if (rc < 0) { 331 SPDK_ERRLOG("Failed to open specified rbd device\n"); 332 return NULL; 333 } 334 335 rc = rbd_stat(rbd->image, &rbd->info, sizeof(rbd->info)); 336 rbd_close(rbd->image); 337 if (rc < 0) { 338 SPDK_ERRLOG("Failed to stat specified rbd device\n"); 339 return NULL; 340 } 341 342 return arg; 343 } 344 345 static int 346 bdev_rbd_init(struct bdev_rbd *rbd) 347 { 348 int ret = 0; 349 350 if (!rbd->cluster_name) { 351 rbd->cluster_p = &rbd->cluster; 352 /* Cluster should be created in non-SPDK thread to avoid conflict between 353 * Rados and SPDK thread */ 354 if (spdk_call_unaffinitized(bdev_rbd_cluster_handle, rbd) == NULL) { 355 SPDK_ERRLOG("Cannot create the rados object on rbd=%p\n", rbd); 356 return -1; 357 } 358 } else { 359 ret = bdev_rbd_shared_cluster_init(rbd->cluster_name, &rbd->cluster_p); 360 if (ret < 0) { 361 SPDK_ERRLOG("Failed to create rados object for rbd =%p on cluster_name=%s\n", 362 rbd, rbd->cluster_name); 363 return -1; 364 } 365 } 366 367 if (spdk_call_unaffinitized(bdev_rbd_init_context, rbd) == NULL) { 368 SPDK_ERRLOG("Cannot init rbd context for rbd=%p\n", rbd); 369 } 370 371 return ret; 372 } 373 374 static void 375 bdev_rbd_exit(rbd_image_t image) 376 { 377 rbd_flush(image); 378 rbd_close(image); 379 } 380 381 static void 382 bdev_rbd_finish_aiocb(rbd_completion_t cb, void *arg) 383 { 384 /* Doing nothing here */ 385 } 386 387 static void 388 _bdev_rbd_io_complete(void *_rbd_io) 389 { 390 struct bdev_rbd_io *rbd_io = _rbd_io; 391 392 spdk_bdev_io_complete(spdk_bdev_io_from_ctx(rbd_io), rbd_io->status); 393 } 394 395 static void 396 bdev_rbd_io_complete(struct spdk_bdev_io *bdev_io, enum spdk_bdev_io_status status) 397 { 398 struct bdev_rbd_io *rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx; 399 400 rbd_io->status = status; 401 if (rbd_io->submit_td != NULL) { 402 spdk_thread_send_msg(rbd_io->submit_td, _bdev_rbd_io_complete, rbd_io); 403 } else { 404 _bdev_rbd_io_complete(rbd_io); 405 } 406 } 407 408 static void 409 bdev_rbd_start_aio(struct bdev_rbd *disk, struct spdk_bdev_io *bdev_io, 410 struct iovec *iov, int iovcnt, uint64_t offset, size_t len) 411 { 412 int ret; 413 rbd_completion_t comp; 414 struct bdev_rbd_io *rbd_io; 415 rbd_image_t image = disk->image; 416 417 ret = rbd_aio_create_completion(bdev_io, bdev_rbd_finish_aiocb, 418 &comp); 419 if (ret < 0) { 420 goto err; 421 } 422 423 if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) { 424 rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx; 425 rbd_io->total_len = len; 426 if (spdk_likely(iovcnt == 1)) { 427 ret = rbd_aio_read(image, offset, iov[0].iov_len, iov[0].iov_base, comp); 428 } else { 429 ret = rbd_aio_readv(image, iov, iovcnt, offset, comp); 430 } 431 } else if (bdev_io->type == SPDK_BDEV_IO_TYPE_WRITE) { 432 if (spdk_likely(iovcnt == 1)) { 433 ret = rbd_aio_write(image, offset, iov[0].iov_len, iov[0].iov_base, comp); 434 } else { 435 ret = rbd_aio_writev(image, iov, iovcnt, offset, comp); 436 } 437 } else if (bdev_io->type == SPDK_BDEV_IO_TYPE_FLUSH) { 438 ret = rbd_aio_flush(image, comp); 439 } 440 441 if (ret < 0) { 442 rbd_aio_release(comp); 443 goto err; 444 } 445 446 return; 447 448 err: 449 bdev_rbd_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 450 } 451 452 static int bdev_rbd_library_init(void); 453 454 static void bdev_rbd_library_fini(void); 455 456 static int 457 bdev_rbd_get_ctx_size(void) 458 { 459 return sizeof(struct bdev_rbd_io); 460 } 461 462 static struct spdk_bdev_module rbd_if = { 463 .name = "rbd", 464 .module_init = bdev_rbd_library_init, 465 .module_fini = bdev_rbd_library_fini, 466 .get_ctx_size = bdev_rbd_get_ctx_size, 467 468 }; 469 SPDK_BDEV_MODULE_REGISTER(rbd, &rbd_if) 470 471 static int 472 bdev_rbd_reset_timer(void *arg) 473 { 474 struct bdev_rbd *disk = arg; 475 476 /* 477 * TODO: This should check if any I/O is still in flight before completing the reset. 478 * For now, just complete after the timer expires. 479 */ 480 bdev_rbd_io_complete(disk->reset_bdev_io, SPDK_BDEV_IO_STATUS_SUCCESS); 481 spdk_poller_unregister(&disk->reset_timer); 482 disk->reset_bdev_io = NULL; 483 484 return SPDK_POLLER_BUSY; 485 } 486 487 static void 488 bdev_rbd_reset(struct bdev_rbd *disk, struct spdk_bdev_io *bdev_io) 489 { 490 /* 491 * HACK: Since librbd doesn't provide any way to cancel outstanding aio, just kick off a 492 * timer to wait for in-flight I/O to complete. 493 */ 494 assert(disk->reset_bdev_io == NULL); 495 disk->reset_bdev_io = bdev_io; 496 disk->reset_timer = SPDK_POLLER_REGISTER(bdev_rbd_reset_timer, disk, 1 * 1000 * 1000); 497 } 498 499 static void 500 bdev_rbd_free_cb(void *io_device) 501 { 502 struct bdev_rbd *rbd = io_device; 503 504 assert(rbd != NULL); 505 506 pthread_mutex_lock(&rbd->mutex); 507 508 if (rbd->ch_count != 0) { 509 rbd->deferred_free = true; 510 pthread_mutex_unlock(&rbd->mutex); 511 } else { 512 pthread_mutex_unlock(&rbd->mutex); 513 bdev_rbd_free((struct bdev_rbd *)rbd); 514 } 515 } 516 517 static int 518 bdev_rbd_destruct(void *ctx) 519 { 520 struct bdev_rbd *rbd = ctx; 521 522 spdk_io_device_unregister(rbd, bdev_rbd_free_cb); 523 524 return 0; 525 } 526 527 static void 528 bdev_rbd_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io, 529 bool success) 530 { 531 struct bdev_rbd *disk = (struct bdev_rbd *)bdev_io->bdev->ctxt; 532 533 if (!success) { 534 bdev_rbd_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 535 return; 536 } 537 538 bdev_rbd_start_aio(disk, 539 bdev_io, 540 bdev_io->u.bdev.iovs, 541 bdev_io->u.bdev.iovcnt, 542 bdev_io->u.bdev.offset_blocks * bdev_io->bdev->blocklen, 543 bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen); 544 } 545 546 static void 547 _bdev_rbd_submit_request(void *ctx) 548 { 549 struct spdk_bdev_io *bdev_io = ctx; 550 struct bdev_rbd *disk = (struct bdev_rbd *)bdev_io->bdev->ctxt; 551 552 switch (bdev_io->type) { 553 case SPDK_BDEV_IO_TYPE_READ: 554 spdk_bdev_io_get_buf(bdev_io, bdev_rbd_get_buf_cb, 555 bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen); 556 break; 557 558 case SPDK_BDEV_IO_TYPE_WRITE: 559 case SPDK_BDEV_IO_TYPE_FLUSH: 560 bdev_rbd_start_aio(disk, 561 bdev_io, 562 bdev_io->u.bdev.iovs, 563 bdev_io->u.bdev.iovcnt, 564 bdev_io->u.bdev.offset_blocks * bdev_io->bdev->blocklen, 565 bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen); 566 break; 567 568 case SPDK_BDEV_IO_TYPE_RESET: 569 bdev_rbd_reset((struct bdev_rbd *)bdev_io->bdev->ctxt, 570 bdev_io); 571 break; 572 573 default: 574 SPDK_ERRLOG("Unsupported IO type =%d\n", bdev_io->type); 575 bdev_rbd_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 576 break; 577 } 578 } 579 580 static void 581 bdev_rbd_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io) 582 { 583 struct spdk_thread *submit_td = spdk_io_channel_get_thread(ch); 584 struct bdev_rbd_io *rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx; 585 struct bdev_rbd *disk = (struct bdev_rbd *)bdev_io->bdev->ctxt; 586 587 if (disk->main_td != submit_td) { 588 rbd_io->submit_td = submit_td; 589 spdk_thread_send_msg(disk->main_td, _bdev_rbd_submit_request, bdev_io); 590 } else { 591 rbd_io->submit_td = NULL; 592 _bdev_rbd_submit_request(bdev_io); 593 } 594 } 595 596 static bool 597 bdev_rbd_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type) 598 { 599 switch (io_type) { 600 case SPDK_BDEV_IO_TYPE_READ: 601 case SPDK_BDEV_IO_TYPE_WRITE: 602 case SPDK_BDEV_IO_TYPE_FLUSH: 603 case SPDK_BDEV_IO_TYPE_RESET: 604 return true; 605 606 default: 607 return false; 608 } 609 } 610 611 static void 612 bdev_rbd_io_poll(struct bdev_rbd *disk) 613 { 614 int i, io_status, rc; 615 rbd_completion_t comps[SPDK_RBD_QUEUE_DEPTH]; 616 struct spdk_bdev_io *bdev_io; 617 struct bdev_rbd_io *rbd_io; 618 enum spdk_bdev_io_status bio_status; 619 620 rc = rbd_poll_io_events(disk->image, comps, SPDK_RBD_QUEUE_DEPTH); 621 for (i = 0; i < rc; i++) { 622 bdev_io = rbd_aio_get_arg(comps[i]); 623 rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx; 624 io_status = rbd_aio_get_return_value(comps[i]); 625 bio_status = SPDK_BDEV_IO_STATUS_SUCCESS; 626 627 if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) { 628 if ((int)rbd_io->total_len != io_status) { 629 bio_status = SPDK_BDEV_IO_STATUS_FAILED; 630 } 631 } else { 632 /* For others, 0 means success */ 633 if (io_status != 0) { 634 bio_status = SPDK_BDEV_IO_STATUS_FAILED; 635 } 636 } 637 638 rbd_aio_release(comps[i]); 639 640 bdev_rbd_io_complete(bdev_io, bio_status); 641 } 642 } 643 644 static void 645 bdev_rbd_free_channel_resources(struct bdev_rbd *disk) 646 { 647 int rc; 648 649 assert(disk != NULL); 650 assert(disk->main_td == spdk_get_thread()); 651 assert(disk->ch_count == 0); 652 assert(disk->group_ch != NULL); 653 rc = epoll_ctl(disk->group_ch->epoll_fd, EPOLL_CTL_DEL, 654 disk->pfd, NULL); 655 if (rc < 0) { 656 SPDK_ERRLOG("Failed to remove fd on disk=%p from the polling group=%p\n", 657 disk, disk->group_ch); 658 } 659 spdk_put_io_channel(spdk_io_channel_from_ctx(disk->group_ch)); 660 661 if (disk->image) { 662 bdev_rbd_exit(disk->image); 663 } 664 665 if (disk->pfd >= 0) { 666 close(disk->pfd); 667 } 668 669 disk->main_td = NULL; 670 disk->group_ch = NULL; 671 } 672 673 static void * 674 bdev_rbd_handle(void *arg) 675 { 676 struct bdev_rbd *disk = arg; 677 void *ret = arg; 678 679 if (rbd_open(disk->io_ctx, disk->rbd_name, &disk->image, NULL) < 0) { 680 SPDK_ERRLOG("Failed to open specified rbd device\n"); 681 ret = NULL; 682 } 683 684 return ret; 685 } 686 687 static int 688 _bdev_rbd_create_cb(struct bdev_rbd *disk) 689 { 690 int ret; 691 struct epoll_event event = {}; 692 693 disk->group_ch = spdk_io_channel_get_ctx(spdk_get_io_channel(&rbd_if)); 694 assert(disk->group_ch != NULL); 695 event.events = EPOLLIN; 696 event.data.ptr = disk; 697 698 if (spdk_call_unaffinitized(bdev_rbd_handle, disk) == NULL) { 699 goto err; 700 } 701 702 disk->pfd = eventfd(0, EFD_NONBLOCK); 703 if (disk->pfd < 0) { 704 SPDK_ERRLOG("Failed to get eventfd\n"); 705 goto err; 706 } 707 708 ret = rbd_set_image_notification(disk->image, disk->pfd, EVENT_TYPE_EVENTFD); 709 if (ret < 0) { 710 SPDK_ERRLOG("Failed to set rbd image notification\n"); 711 goto err; 712 } 713 714 ret = epoll_ctl(disk->group_ch->epoll_fd, EPOLL_CTL_ADD, disk->pfd, &event); 715 if (ret < 0) { 716 SPDK_ERRLOG("Failed to add the fd of disk=%p to the epoll group from group_ch=%p\n", disk, 717 disk->group_ch); 718 goto err; 719 } 720 721 return 0; 722 723 err: 724 bdev_rbd_free_channel_resources(disk); 725 return -1; 726 } 727 728 static int 729 bdev_rbd_create_cb(void *io_device, void *ctx_buf) 730 { 731 struct bdev_rbd_io_channel *ch = ctx_buf; 732 struct bdev_rbd *disk = io_device; 733 int rc; 734 735 ch->disk = disk; 736 pthread_mutex_lock(&disk->mutex); 737 if (disk->ch_count == 0) { 738 assert(disk->main_td == NULL); 739 rc = _bdev_rbd_create_cb(disk); 740 if (rc) { 741 SPDK_ERRLOG("Cannot create channel for disk=%p\n", disk); 742 pthread_mutex_unlock(&disk->mutex); 743 return rc; 744 } 745 746 disk->main_td = spdk_get_thread(); 747 } 748 749 disk->ch_count++; 750 pthread_mutex_unlock(&disk->mutex); 751 752 return 0; 753 } 754 755 static void 756 _bdev_rbd_destroy_cb(void *ctx) 757 { 758 struct bdev_rbd *disk = ctx; 759 bool deferred_free; 760 761 pthread_mutex_lock(&disk->mutex); 762 assert(disk->ch_count > 0); 763 disk->ch_count--; 764 765 if (disk->ch_count > 0) { 766 /* A new channel was created between when message was sent and this function executed */ 767 pthread_mutex_unlock(&disk->mutex); 768 return; 769 } 770 771 bdev_rbd_free_channel_resources(disk); 772 773 deferred_free = disk->deferred_free; 774 pthread_mutex_unlock(&disk->mutex); 775 776 /* Need to free rbd structure if there is deferred_free case 777 * by the bdev_rbd_destruct function */ 778 if (deferred_free) { 779 bdev_rbd_free(disk); 780 } 781 } 782 783 static void 784 bdev_rbd_destroy_cb(void *io_device, void *ctx_buf) 785 { 786 struct bdev_rbd *disk = io_device; 787 struct spdk_thread *thread; 788 789 pthread_mutex_lock(&disk->mutex); 790 assert(disk->ch_count > 0); 791 disk->ch_count--; 792 if (disk->ch_count == 0) { 793 assert(disk->main_td != NULL); 794 if (disk->main_td != spdk_get_thread()) { 795 /* The final channel was destroyed on a different thread 796 * than where the first channel was created. Pass a message 797 * to the main thread to unregister the poller. */ 798 disk->ch_count++; 799 thread = disk->main_td; 800 pthread_mutex_unlock(&disk->mutex); 801 spdk_thread_send_msg(thread, _bdev_rbd_destroy_cb, disk); 802 return; 803 } 804 805 bdev_rbd_free_channel_resources(disk); 806 } 807 pthread_mutex_unlock(&disk->mutex); 808 } 809 810 static struct spdk_io_channel * 811 bdev_rbd_get_io_channel(void *ctx) 812 { 813 struct bdev_rbd *rbd_bdev = ctx; 814 815 return spdk_get_io_channel(rbd_bdev); 816 } 817 818 static void 819 bdev_rbd_cluster_dump_entry(const char *cluster_name, struct spdk_json_write_ctx *w) 820 { 821 struct bdev_rbd_cluster *entry; 822 823 pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex); 824 STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) { 825 if (strcmp(cluster_name, entry->name)) { 826 continue; 827 } 828 if (entry->user_id) { 829 spdk_json_write_named_string(w, "user_id", entry->user_id); 830 } 831 832 if (entry->config_param) { 833 char **config_entry = entry->config_param; 834 835 spdk_json_write_named_object_begin(w, "config_param"); 836 while (*config_entry) { 837 spdk_json_write_named_string(w, config_entry[0], config_entry[1]); 838 config_entry += 2; 839 } 840 spdk_json_write_object_end(w); 841 } else if (entry->config_file) { 842 spdk_json_write_named_string(w, "config_file", entry->config_file); 843 } 844 845 pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex); 846 return; 847 } 848 849 pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex); 850 } 851 852 static int 853 bdev_rbd_dump_info_json(void *ctx, struct spdk_json_write_ctx *w) 854 { 855 struct bdev_rbd *rbd_bdev = ctx; 856 857 spdk_json_write_named_object_begin(w, "rbd"); 858 859 spdk_json_write_named_string(w, "pool_name", rbd_bdev->pool_name); 860 861 spdk_json_write_named_string(w, "rbd_name", rbd_bdev->rbd_name); 862 863 if (rbd_bdev->cluster_name) { 864 bdev_rbd_cluster_dump_entry(rbd_bdev->cluster_name, w); 865 goto end; 866 } 867 868 if (rbd_bdev->user_id) { 869 spdk_json_write_named_string(w, "user_id", rbd_bdev->user_id); 870 } 871 872 if (rbd_bdev->config) { 873 char **entry = rbd_bdev->config; 874 875 spdk_json_write_named_object_begin(w, "config"); 876 while (*entry) { 877 spdk_json_write_named_string(w, entry[0], entry[1]); 878 entry += 2; 879 } 880 spdk_json_write_object_end(w); 881 } 882 883 end: 884 spdk_json_write_object_end(w); 885 886 return 0; 887 } 888 889 static void 890 bdev_rbd_write_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w) 891 { 892 struct bdev_rbd *rbd = bdev->ctxt; 893 894 spdk_json_write_object_begin(w); 895 896 spdk_json_write_named_string(w, "method", "bdev_rbd_create"); 897 898 spdk_json_write_named_object_begin(w, "params"); 899 spdk_json_write_named_string(w, "name", bdev->name); 900 spdk_json_write_named_string(w, "pool_name", rbd->pool_name); 901 spdk_json_write_named_string(w, "rbd_name", rbd->rbd_name); 902 spdk_json_write_named_uint32(w, "block_size", bdev->blocklen); 903 if (rbd->user_id) { 904 spdk_json_write_named_string(w, "user_id", rbd->user_id); 905 } 906 907 if (rbd->config) { 908 char **entry = rbd->config; 909 910 spdk_json_write_named_object_begin(w, "config"); 911 while (*entry) { 912 spdk_json_write_named_string(w, entry[0], entry[1]); 913 entry += 2; 914 } 915 spdk_json_write_object_end(w); 916 } 917 918 spdk_json_write_object_end(w); 919 920 spdk_json_write_object_end(w); 921 } 922 923 static void 924 dump_single_cluster_entry(struct bdev_rbd_cluster *entry, struct spdk_json_write_ctx *w) 925 { 926 assert(entry != NULL); 927 928 spdk_json_write_object_begin(w); 929 spdk_json_write_named_string(w, "cluster_name", entry->name); 930 931 if (entry->user_id) { 932 spdk_json_write_named_string(w, "user_id", entry->user_id); 933 } 934 935 if (entry->config_param) { 936 char **config_entry = entry->config_param; 937 938 spdk_json_write_named_object_begin(w, "config_param"); 939 while (*config_entry) { 940 spdk_json_write_named_string(w, config_entry[0], config_entry[1]); 941 config_entry += 2; 942 } 943 spdk_json_write_object_end(w); 944 } else if (entry->config_file) { 945 spdk_json_write_named_string(w, "config_file", entry->config_file); 946 } 947 948 spdk_json_write_object_end(w); 949 } 950 951 int 952 bdev_rbd_get_clusters_info(struct spdk_jsonrpc_request *request, const char *name) 953 { 954 struct bdev_rbd_cluster *entry; 955 struct spdk_json_write_ctx *w; 956 957 pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex); 958 959 if (STAILQ_EMPTY(&g_map_bdev_rbd_cluster)) { 960 pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex); 961 return -ENOENT; 962 } 963 964 /* If cluster name is provided */ 965 if (name) { 966 STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) { 967 if (strcmp(name, entry->name) == 0) { 968 w = spdk_jsonrpc_begin_result(request); 969 dump_single_cluster_entry(entry, w); 970 spdk_jsonrpc_end_result(request, w); 971 972 pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex); 973 return 0; 974 } 975 } 976 977 pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex); 978 return -ENOENT; 979 } 980 981 w = spdk_jsonrpc_begin_result(request); 982 spdk_json_write_array_begin(w); 983 STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) { 984 dump_single_cluster_entry(entry, w); 985 } 986 spdk_json_write_array_end(w); 987 spdk_jsonrpc_end_result(request, w); 988 pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex); 989 990 return 0; 991 } 992 993 static const struct spdk_bdev_fn_table rbd_fn_table = { 994 .destruct = bdev_rbd_destruct, 995 .submit_request = bdev_rbd_submit_request, 996 .io_type_supported = bdev_rbd_io_type_supported, 997 .get_io_channel = bdev_rbd_get_io_channel, 998 .dump_info_json = bdev_rbd_dump_info_json, 999 .write_config_json = bdev_rbd_write_config_json, 1000 }; 1001 1002 static int 1003 rbd_register_cluster(const char *name, const char *user_id, const char *const *config_param, 1004 const char *config_file) 1005 { 1006 struct bdev_rbd_cluster *entry; 1007 int rc; 1008 1009 pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex); 1010 STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) { 1011 if (strcmp(name, entry->name) == 0) { 1012 SPDK_ERRLOG("Cluster name=%s already exists\n", name); 1013 pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex); 1014 return -1; 1015 } 1016 } 1017 1018 entry = calloc(1, sizeof(*entry)); 1019 if (!entry) { 1020 SPDK_ERRLOG("Cannot allocate an entry for name=%s\n", name); 1021 pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex); 1022 return -1; 1023 } 1024 1025 entry->name = strdup(name); 1026 if (entry->name == NULL) { 1027 SPDK_ERRLOG("Failed to save the name =%s on entry =%p\n", name, entry); 1028 goto err_handle; 1029 } 1030 1031 if (user_id) { 1032 entry->user_id = strdup(user_id); 1033 if (entry->user_id == NULL) { 1034 SPDK_ERRLOG("Failed to save the str =%s on entry =%p\n", user_id, entry); 1035 goto err_handle; 1036 } 1037 } 1038 1039 /* The first priority is the config_param, then we use the config_file */ 1040 if (config_param) { 1041 entry->config_param = bdev_rbd_dup_config(config_param); 1042 if (entry->config_param == NULL) { 1043 SPDK_ERRLOG("Failed to save the config_param=%p on entry = %p\n", config_param, entry); 1044 goto err_handle; 1045 } 1046 } else if (config_file) { 1047 entry->config_file = strdup(config_file); 1048 if (entry->config_file == NULL) { 1049 SPDK_ERRLOG("Failed to save the config_file=%s on entry = %p\n", config_file, entry); 1050 goto err_handle; 1051 } 1052 } 1053 1054 rc = rados_create(&entry->cluster, user_id); 1055 if (rc < 0) { 1056 SPDK_ERRLOG("Failed to create rados_t struct\n"); 1057 goto err_handle; 1058 } 1059 1060 if (config_param) { 1061 const char *const *config_entry = config_param; 1062 while (*config_entry) { 1063 rc = rados_conf_set(entry->cluster, config_entry[0], config_entry[1]); 1064 if (rc < 0) { 1065 SPDK_ERRLOG("Failed to set %s = %s\n", config_entry[0], config_entry[1]); 1066 rados_shutdown(entry->cluster); 1067 goto err_handle; 1068 } 1069 config_entry += 2; 1070 } 1071 } else { 1072 rc = rados_conf_read_file(entry->cluster, entry->config_file); 1073 if (rc < 0) { 1074 SPDK_ERRLOG("Failed to read conf file\n"); 1075 rados_shutdown(entry->cluster); 1076 goto err_handle; 1077 } 1078 } 1079 1080 rc = rados_connect(entry->cluster); 1081 if (rc < 0) { 1082 SPDK_ERRLOG("Failed to connect to rbd_pool on cluster=%p\n", entry->cluster); 1083 rados_shutdown(entry->cluster); 1084 goto err_handle; 1085 } 1086 1087 STAILQ_INSERT_TAIL(&g_map_bdev_rbd_cluster, entry, link); 1088 pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex); 1089 1090 return 0; 1091 1092 err_handle: 1093 bdev_rbd_cluster_free(entry); 1094 pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex); 1095 return -1; 1096 } 1097 1098 int 1099 bdev_rbd_unregister_cluster(const char *name) 1100 { 1101 struct bdev_rbd_cluster *entry; 1102 int rc = 0; 1103 1104 if (name == NULL) { 1105 return -1; 1106 } 1107 1108 pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex); 1109 STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) { 1110 if (strcmp(name, entry->name) == 0) { 1111 if (entry->ref == 0) { 1112 STAILQ_REMOVE(&g_map_bdev_rbd_cluster, entry, bdev_rbd_cluster, link); 1113 rados_shutdown(entry->cluster); 1114 bdev_rbd_cluster_free(entry); 1115 } else { 1116 SPDK_ERRLOG("Cluster with name=%p is still used and we cannot delete it\n", 1117 entry->name); 1118 rc = -1; 1119 } 1120 1121 pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex); 1122 return rc; 1123 } 1124 } 1125 1126 pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex); 1127 1128 SPDK_ERRLOG("Could not find the cluster name =%p\n", name); 1129 1130 return -1; 1131 } 1132 1133 static void * 1134 _bdev_rbd_register_cluster(void *arg) 1135 { 1136 struct cluster_register_info *info = arg; 1137 void *ret = arg; 1138 int rc; 1139 1140 rc = rbd_register_cluster((const char *)info->name, (const char *)info->user_id, 1141 (const char *const *)info->config_param, (const char *)info->config_file); 1142 if (rc) { 1143 ret = NULL; 1144 } 1145 1146 return ret; 1147 } 1148 1149 int 1150 bdev_rbd_register_cluster(struct cluster_register_info *info) 1151 { 1152 assert(info != NULL); 1153 1154 /* Rados cluster info need to be created in non SPDK-thread to avoid CPU 1155 * resource contention */ 1156 if (spdk_call_unaffinitized(_bdev_rbd_register_cluster, info) == NULL) { 1157 return -1; 1158 } 1159 1160 return 0; 1161 } 1162 1163 int 1164 bdev_rbd_create(struct spdk_bdev **bdev, const char *name, const char *user_id, 1165 const char *pool_name, 1166 const char *const *config, 1167 const char *rbd_name, 1168 uint32_t block_size, 1169 const char *cluster_name) 1170 { 1171 struct bdev_rbd *rbd; 1172 int ret; 1173 1174 if ((pool_name == NULL) || (rbd_name == NULL)) { 1175 return -EINVAL; 1176 } 1177 1178 rbd = calloc(1, sizeof(struct bdev_rbd)); 1179 if (rbd == NULL) { 1180 SPDK_ERRLOG("Failed to allocate bdev_rbd struct\n"); 1181 return -ENOMEM; 1182 } 1183 1184 ret = pthread_mutex_init(&rbd->mutex, NULL); 1185 if (ret) { 1186 SPDK_ERRLOG("Cannot init mutex on rbd=%p\n", rbd->disk.name); 1187 free(rbd); 1188 return ret; 1189 } 1190 1191 rbd->pfd = -1; 1192 rbd->rbd_name = strdup(rbd_name); 1193 if (!rbd->rbd_name) { 1194 bdev_rbd_free(rbd); 1195 return -ENOMEM; 1196 } 1197 1198 if (user_id) { 1199 rbd->user_id = strdup(user_id); 1200 if (!rbd->user_id) { 1201 bdev_rbd_free(rbd); 1202 return -ENOMEM; 1203 } 1204 } 1205 1206 if (cluster_name) { 1207 rbd->cluster_name = strdup(cluster_name); 1208 if (!rbd->cluster_name) { 1209 bdev_rbd_free(rbd); 1210 return -ENOMEM; 1211 } 1212 } 1213 rbd->pool_name = strdup(pool_name); 1214 if (!rbd->pool_name) { 1215 bdev_rbd_free(rbd); 1216 return -ENOMEM; 1217 } 1218 1219 if (config && !(rbd->config = bdev_rbd_dup_config(config))) { 1220 bdev_rbd_free(rbd); 1221 return -ENOMEM; 1222 } 1223 1224 ret = bdev_rbd_init(rbd); 1225 if (ret < 0) { 1226 bdev_rbd_free(rbd); 1227 SPDK_ERRLOG("Failed to init rbd device\n"); 1228 return ret; 1229 } 1230 1231 if (name) { 1232 rbd->disk.name = strdup(name); 1233 } else { 1234 rbd->disk.name = spdk_sprintf_alloc("Ceph%d", bdev_rbd_count); 1235 } 1236 if (!rbd->disk.name) { 1237 bdev_rbd_free(rbd); 1238 return -ENOMEM; 1239 } 1240 rbd->disk.product_name = "Ceph Rbd Disk"; 1241 bdev_rbd_count++; 1242 1243 rbd->disk.write_cache = 0; 1244 rbd->disk.blocklen = block_size; 1245 rbd->disk.blockcnt = rbd->info.size / rbd->disk.blocklen; 1246 rbd->disk.ctxt = rbd; 1247 rbd->disk.fn_table = &rbd_fn_table; 1248 rbd->disk.module = &rbd_if; 1249 1250 SPDK_NOTICELOG("Add %s rbd disk to lun\n", rbd->disk.name); 1251 1252 spdk_io_device_register(rbd, bdev_rbd_create_cb, 1253 bdev_rbd_destroy_cb, 1254 sizeof(struct bdev_rbd_io_channel), 1255 rbd_name); 1256 ret = spdk_bdev_register(&rbd->disk); 1257 if (ret) { 1258 spdk_io_device_unregister(rbd, NULL); 1259 bdev_rbd_free(rbd); 1260 return ret; 1261 } 1262 1263 *bdev = &(rbd->disk); 1264 1265 return ret; 1266 } 1267 1268 void 1269 bdev_rbd_delete(struct spdk_bdev *bdev, spdk_delete_rbd_complete cb_fn, void *cb_arg) 1270 { 1271 if (!bdev || bdev->module != &rbd_if) { 1272 cb_fn(cb_arg, -ENODEV); 1273 return; 1274 } 1275 1276 spdk_bdev_unregister(bdev, cb_fn, cb_arg); 1277 } 1278 1279 int 1280 bdev_rbd_resize(struct spdk_bdev *bdev, const uint64_t new_size_in_mb) 1281 { 1282 struct spdk_io_channel *ch; 1283 struct bdev_rbd_io_channel *rbd_io_ch; 1284 int rc; 1285 uint64_t new_size_in_byte; 1286 uint64_t current_size_in_mb; 1287 1288 if (bdev->module != &rbd_if) { 1289 return -EINVAL; 1290 } 1291 1292 current_size_in_mb = bdev->blocklen * bdev->blockcnt / (1024 * 1024); 1293 if (current_size_in_mb > new_size_in_mb) { 1294 SPDK_ERRLOG("The new bdev size must be lager than current bdev size.\n"); 1295 return -EINVAL; 1296 } 1297 1298 ch = bdev_rbd_get_io_channel(bdev); 1299 rbd_io_ch = spdk_io_channel_get_ctx(ch); 1300 new_size_in_byte = new_size_in_mb * 1024 * 1024; 1301 1302 rc = rbd_resize(rbd_io_ch->disk->image, new_size_in_byte); 1303 spdk_put_io_channel(ch); 1304 if (rc != 0) { 1305 SPDK_ERRLOG("failed to resize the ceph bdev.\n"); 1306 return rc; 1307 } 1308 1309 rc = spdk_bdev_notify_blockcnt_change(bdev, new_size_in_byte / bdev->blocklen); 1310 if (rc != 0) { 1311 SPDK_ERRLOG("failed to notify block cnt change.\n"); 1312 return rc; 1313 } 1314 1315 return rc; 1316 } 1317 1318 static int 1319 bdev_rbd_group_poll(void *arg) 1320 { 1321 struct bdev_rbd_group_channel *group_ch = arg; 1322 struct epoll_event events[MAX_EVENTS_PER_POLL]; 1323 int num_events, i; 1324 1325 num_events = epoll_wait(group_ch->epoll_fd, events, MAX_EVENTS_PER_POLL, 0); 1326 1327 if (num_events <= 0) { 1328 return SPDK_POLLER_IDLE; 1329 } 1330 1331 for (i = 0; i < num_events; i++) { 1332 bdev_rbd_io_poll((struct bdev_rbd *)events[i].data.ptr); 1333 } 1334 1335 return SPDK_POLLER_BUSY; 1336 } 1337 1338 static int 1339 bdev_rbd_group_create_cb(void *io_device, void *ctx_buf) 1340 { 1341 struct bdev_rbd_group_channel *ch = ctx_buf; 1342 1343 ch->epoll_fd = epoll_create1(0); 1344 if (ch->epoll_fd < 0) { 1345 SPDK_ERRLOG("Could not create epoll fd on io device=%p\n", io_device); 1346 return -1; 1347 } 1348 1349 ch->poller = SPDK_POLLER_REGISTER(bdev_rbd_group_poll, ch, 0); 1350 1351 return 0; 1352 } 1353 1354 static void 1355 bdev_rbd_group_destroy_cb(void *io_device, void *ctx_buf) 1356 { 1357 struct bdev_rbd_group_channel *ch = ctx_buf; 1358 1359 if (ch->epoll_fd >= 0) { 1360 close(ch->epoll_fd); 1361 } 1362 1363 spdk_poller_unregister(&ch->poller); 1364 } 1365 1366 static int 1367 bdev_rbd_library_init(void) 1368 { 1369 spdk_io_device_register(&rbd_if, bdev_rbd_group_create_cb, bdev_rbd_group_destroy_cb, 1370 sizeof(struct bdev_rbd_group_channel), "bdev_rbd_poll_groups"); 1371 1372 return 0; 1373 } 1374 1375 static void 1376 bdev_rbd_library_fini(void) 1377 { 1378 spdk_io_device_unregister(&rbd_if, NULL); 1379 } 1380 1381 SPDK_LOG_REGISTER_COMPONENT(bdev_rbd) 1382