/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "spdk/stdinc.h"

#include "bdev_rbd.h"

#include <rbd/librbd.h>
#include <rados/librados.h>
#include <sys/eventfd.h>
#include <sys/epoll.h>

#include "spdk/env.h"
#include "spdk/bdev.h"
#include "spdk/thread.h"
#include "spdk/json.h"
#include "spdk/string.h"
#include "spdk/util.h"
#include "spdk/likely.h"

#include "spdk/bdev_module.h"
#include "spdk/log.h"

#define SPDK_RBD_QUEUE_DEPTH 128
#define MAX_EVENTS_PER_POLL 128

static int bdev_rbd_count = 0;

struct bdev_rbd {
	struct spdk_bdev disk;
	char *rbd_name;
	char *user_id;
	char *pool_name;
	char **config;

	rados_t cluster;
	rados_t *cluster_p;
	char *cluster_name;

	rbd_image_info_t info;
	TAILQ_ENTRY(bdev_rbd) tailq;
	struct spdk_poller *reset_timer;
	struct spdk_bdev_io *reset_bdev_io;
};

struct bdev_rbd_group_channel {
	struct spdk_poller *poller;
	int epoll_fd;
};

struct bdev_rbd_io_channel {
	rados_ioctx_t io_ctx;
	int pfd;
	rbd_image_t image;
	struct bdev_rbd *disk;
	struct bdev_rbd_group_channel *group_ch;
};

struct bdev_rbd_io {
	size_t total_len;
};

struct bdev_rbd_cluster {
	char *name;
	char *user_id;
	char **config_param;
	char *config_file;
	rados_t cluster;
	uint32_t ref;
	STAILQ_ENTRY(bdev_rbd_cluster) link;
};

static STAILQ_HEAD(, bdev_rbd_cluster) g_map_bdev_rbd_cluster = STAILQ_HEAD_INITIALIZER(
			g_map_bdev_rbd_cluster);
static pthread_mutex_t g_map_bdev_rbd_cluster_mutex = PTHREAD_MUTEX_INITIALIZER;

static void
bdev_rbd_cluster_free(struct bdev_rbd_cluster *entry)
{
	assert(entry != NULL);

	bdev_rbd_free_config(entry->config_param);
	free(entry->config_file);
	free(entry->user_id);
	free(entry->name);
	free(entry);
}

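/*
 * Registered Rados clusters are kept in g_map_bdev_rbd_cluster, keyed by name and
 * protected by g_map_bdev_rbd_cluster_mutex.  Each bdev that is bound to a named
 * cluster holds a reference (entry->ref); bdev_rbd_put_cluster() drops that
 * reference when the bdev is freed, and bdev_rbd_unregister_cluster() refuses to
 * tear a cluster down while references remain.
 */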
static void
bdev_rbd_put_cluster(rados_t **cluster)
{
	struct bdev_rbd_cluster *entry;

	assert(cluster != NULL);

	/* No need to go through the map if *cluster is NULL */
	if (*cluster == NULL) {
		return;
	}

	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);
	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
		if (*cluster != &entry->cluster) {
			continue;
		}

		assert(entry->ref > 0);
		entry->ref--;
		*cluster = NULL;
		pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
		return;
	}

	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
	SPDK_ERRLOG("Cannot find the entry for cluster=%p\n", cluster);
}

static void
bdev_rbd_free(struct bdev_rbd *rbd)
{
	if (!rbd) {
		return;
	}

	free(rbd->disk.name);
	free(rbd->rbd_name);
	free(rbd->user_id);
	free(rbd->pool_name);
	bdev_rbd_free_config(rbd->config);

	if (rbd->cluster_name) {
		bdev_rbd_put_cluster(&rbd->cluster_p);
		free(rbd->cluster_name);
	} else if (rbd->cluster) {
		rados_shutdown(rbd->cluster);
	}

	free(rbd);
}

void
bdev_rbd_free_config(char **config)
{
	char **entry;

	if (config) {
		for (entry = config; *entry; entry++) {
			free(*entry);
		}
		free(config);
	}
}

char **
bdev_rbd_dup_config(const char *const *config)
{
	size_t count;
	char **copy;

	if (!config) {
		return NULL;
	}
	for (count = 0; config[count]; count++) {}
	copy = calloc(count + 1, sizeof(*copy));
	if (!copy) {
		return NULL;
	}
	for (count = 0; config[count]; count++) {
		if (!(copy[count] = strdup(config[count]))) {
			bdev_rbd_free_config(copy);
			return NULL;
		}
	}
	return copy;
}

static int
bdev_rados_cluster_init(const char *user_id, const char *const *config,
			rados_t *cluster)
{
	int ret;

	ret = rados_create(cluster, user_id);
	if (ret < 0) {
		SPDK_ERRLOG("Failed to create rados_t struct\n");
		return -1;
	}

	if (config) {
		const char *const *entry = config;

		while (*entry) {
			ret = rados_conf_set(*cluster, entry[0], entry[1]);
			if (ret < 0) {
				SPDK_ERRLOG("Failed to set %s = %s\n", entry[0], entry[1]);
				rados_shutdown(*cluster);
				return -1;
			}
			entry += 2;
		}
	} else {
		ret = rados_conf_read_file(*cluster, NULL);
		if (ret < 0) {
			SPDK_ERRLOG("Failed to read conf file\n");
			rados_shutdown(*cluster);
			return -1;
		}
	}

	ret = rados_connect(*cluster);
	if (ret < 0) {
		SPDK_ERRLOG("Failed to connect to rbd_pool\n");
		rados_shutdown(*cluster);
		return -1;
	}

	return 0;
}

static int
bdev_rbd_get_cluster(const char *cluster_name, rados_t **cluster)
{
	struct bdev_rbd_cluster *entry;

	if (cluster == NULL) {
		SPDK_ERRLOG("cluster should not be NULL\n");
		return -1;
	}

	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);
	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
		if (strcmp(cluster_name, entry->name) == 0) {
			entry->ref++;
			*cluster = &entry->cluster;
			pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
			return 0;
		}
	}

	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
	return -1;
}

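/*
 * For a named cluster, the Rados connection was already established by
 * bdev_rbd_register_cluster(); here we only look the cluster up by name and take
 * a reference on it, instead of creating a new rados_t per bdev.
 */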
static int
bdev_rbd_shared_cluster_init(const char *cluster_name, rados_t **cluster)
{
	int ret;

	ret = bdev_rbd_get_cluster(cluster_name, cluster);
	if (ret < 0) {
		SPDK_ERRLOG("Failed to get the registered cluster with name=%s\n", cluster_name);
		return -1;
	}

	return ret;
}

static void *
bdev_rbd_cluster_handle(void *arg)
{
	void *ret = arg;
	struct bdev_rbd *rbd = arg;
	int rc;

	rc = bdev_rados_cluster_init(rbd->user_id, (const char *const *)rbd->config,
				     &rbd->cluster);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to create rados cluster for user_id=%s and rbd_pool=%s\n",
			    rbd->user_id ? rbd->user_id : "admin (the default)", rbd->pool_name);
		ret = NULL;
	}

	return ret;
}

static int
bdev_rbd_init(struct bdev_rbd *rbd)
{
	int ret = 0;
	rados_ioctx_t io_ctx = NULL;
	rbd_image_t image = NULL;

	if (!rbd->cluster_name) {
		rbd->cluster_p = &rbd->cluster;
		/* The cluster should be created in a non-SPDK thread to avoid conflicts
		 * between the Rados and SPDK threads */
		if (spdk_call_unaffinitized(bdev_rbd_cluster_handle, rbd) == NULL) {
			SPDK_ERRLOG("Cannot create the rados object on rbd=%p\n", rbd);
			return -1;
		}
	} else {
		ret = bdev_rbd_shared_cluster_init(rbd->cluster_name, &rbd->cluster_p);
		if (ret < 0) {
			SPDK_ERRLOG("Failed to create rados object for rbd=%p on cluster_name=%s\n",
				    rbd, rbd->cluster_name);
			return -1;
		}
	}

	ret = rados_ioctx_create(*(rbd->cluster_p), rbd->pool_name, &io_ctx);
	if (ret < 0) {
		SPDK_ERRLOG("Failed to create ioctx\n");
		return -1;
	}

	ret = rbd_open(io_ctx, rbd->rbd_name, &image, NULL);
	if (ret < 0) {
		SPDK_ERRLOG("Failed to open specified rbd device\n");
		goto end;
	}

	ret = rbd_stat(image, &rbd->info, sizeof(rbd->info));
	rbd_close(image);
	if (ret < 0) {
		SPDK_ERRLOG("Failed to stat specified rbd device\n");
	}

end:
	rados_ioctx_destroy(io_ctx);
	return ret;
}

static void
bdev_rbd_exit(rbd_image_t image)
{
	rbd_flush(image);
	rbd_close(image);
}

static void
bdev_rbd_finish_aiocb(rbd_completion_t cb, void *arg)
{
	/* Doing nothing here */
}

static void
bdev_rbd_start_aio(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io,
		   struct iovec *iov, int iovcnt, uint64_t offset, size_t len)
{
	struct bdev_rbd_io_channel *rbdio_ch = spdk_io_channel_get_ctx(ch);
	int ret;
	rbd_completion_t comp;
	struct bdev_rbd_io *rbd_io;
	rbd_image_t image = rbdio_ch->image;

	ret = rbd_aio_create_completion(bdev_io, bdev_rbd_finish_aiocb,
					&comp);
	if (ret < 0) {
		goto err;
	}

	if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) {
		rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx;
		rbd_io->total_len = len;
		if (spdk_likely(iovcnt == 1)) {
			ret = rbd_aio_read(image, offset, iov[0].iov_len, iov[0].iov_base, comp);
		} else {
			ret = rbd_aio_readv(image, iov, iovcnt, offset, comp);
		}
	} else if (bdev_io->type == SPDK_BDEV_IO_TYPE_WRITE) {
		if (spdk_likely(iovcnt == 1)) {
			ret = rbd_aio_write(image, offset, iov[0].iov_len, iov[0].iov_base, comp);
		} else {
			ret = rbd_aio_writev(image, iov, iovcnt, offset, comp);
		}
	} else if (bdev_io->type == SPDK_BDEV_IO_TYPE_FLUSH) {
		ret = rbd_aio_flush(image, comp);
	}

	if (ret < 0) {
		rbd_aio_release(comp);
		goto err;
	}

	return;

err:
	spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
}

static int bdev_rbd_library_init(void);

static void bdev_rbd_library_fini(void);

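/*
 * The bdev layer allocates this many bytes of per-I/O driver context
 * (bdev_io->driver_ctx) behind every struct spdk_bdev_io submitted to this
 * module; it is used here to remember the expected read length so completions
 * can detect short reads.
 */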
static int
bdev_rbd_get_ctx_size(void)
{
	return sizeof(struct bdev_rbd_io);
}

static struct spdk_bdev_module rbd_if = {
	.name = "rbd",
	.module_init = bdev_rbd_library_init,
	.module_fini = bdev_rbd_library_fini,
	.get_ctx_size = bdev_rbd_get_ctx_size,
};
SPDK_BDEV_MODULE_REGISTER(rbd, &rbd_if)

static int
bdev_rbd_reset_timer(void *arg)
{
	struct bdev_rbd *disk = arg;

	/*
	 * TODO: This should check if any I/O is still in flight before completing the reset.
	 * For now, just complete after the timer expires.
	 */
	spdk_bdev_io_complete(disk->reset_bdev_io, SPDK_BDEV_IO_STATUS_SUCCESS);
	spdk_poller_unregister(&disk->reset_timer);
	disk->reset_bdev_io = NULL;

	return SPDK_POLLER_BUSY;
}

static void
bdev_rbd_reset(struct bdev_rbd *disk, struct spdk_bdev_io *bdev_io)
{
	/*
	 * HACK: Since librbd doesn't provide any way to cancel outstanding aio, just kick off a
	 * timer to wait for in-flight I/O to complete.
	 */
	assert(disk->reset_bdev_io == NULL);
	disk->reset_bdev_io = bdev_io;
	disk->reset_timer = SPDK_POLLER_REGISTER(bdev_rbd_reset_timer, disk, 1 * 1000 * 1000);
}

static void
bdev_rbd_free_cb(void *io_device)
{
	struct bdev_rbd *rbd = io_device;

	assert(rbd != NULL);

	bdev_rbd_free(rbd);
}

static int
bdev_rbd_destruct(void *ctx)
{
	struct bdev_rbd *rbd = ctx;

	spdk_io_device_unregister(rbd, bdev_rbd_free_cb);

	return 0;
}

static void
bdev_rbd_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io,
		    bool success)
{
	if (!success) {
		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
		return;
	}

	bdev_rbd_start_aio(ch,
			   bdev_io,
			   bdev_io->u.bdev.iovs,
			   bdev_io->u.bdev.iovcnt,
			   bdev_io->u.bdev.offset_blocks * bdev_io->bdev->blocklen,
			   bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
}

static void
bdev_rbd_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
{
	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_READ:
		spdk_bdev_io_get_buf(bdev_io, bdev_rbd_get_buf_cb,
				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
		break;

	case SPDK_BDEV_IO_TYPE_WRITE:
	case SPDK_BDEV_IO_TYPE_FLUSH:
		bdev_rbd_start_aio(ch,
				   bdev_io,
				   bdev_io->u.bdev.iovs,
				   bdev_io->u.bdev.iovcnt,
				   bdev_io->u.bdev.offset_blocks * bdev_io->bdev->blocklen,
				   bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
		break;

	case SPDK_BDEV_IO_TYPE_RESET:
		bdev_rbd_reset((struct bdev_rbd *)bdev_io->bdev->ctxt,
			       bdev_io);
		break;

	default:
		SPDK_ERRLOG("Unsupported IO type=%d\n", bdev_io->type);
		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
		break;
	}
}

static bool
bdev_rbd_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type)
{
	switch (io_type) {
	case SPDK_BDEV_IO_TYPE_READ:
	case SPDK_BDEV_IO_TYPE_WRITE:
	case SPDK_BDEV_IO_TYPE_FLUSH:
	case SPDK_BDEV_IO_TYPE_RESET:
		return true;

	default:
		return false;
	}
}

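/*
 * Completion path: each per-thread I/O channel registers an eventfd with
 * rbd_set_image_notification(), and the group channel's poller watches those fds
 * via epoll.  When an fd fires, this function drains up to SPDK_RBD_QUEUE_DEPTH
 * completions with rbd_poll_io_events().  For reads, rbd_aio_get_return_value()
 * reports the number of bytes read, so it is compared against the requested
 * length; for writes and flushes, 0 means success.
 */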
static void
bdev_rbd_io_poll(struct bdev_rbd_io_channel *ch)
{
	int i, io_status, rc;
	rbd_completion_t comps[SPDK_RBD_QUEUE_DEPTH];
	struct spdk_bdev_io *bdev_io;
	struct bdev_rbd_io *rbd_io;
	enum spdk_bdev_io_status bio_status;

	rc = rbd_poll_io_events(ch->image, comps, SPDK_RBD_QUEUE_DEPTH);
	for (i = 0; i < rc; i++) {
		bdev_io = rbd_aio_get_arg(comps[i]);
		rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx;
		io_status = rbd_aio_get_return_value(comps[i]);
		bio_status = SPDK_BDEV_IO_STATUS_SUCCESS;

		if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) {
			if ((int)rbd_io->total_len != io_status) {
				bio_status = SPDK_BDEV_IO_STATUS_FAILED;
			}
		} else {
			/* For others, 0 means success */
			if (io_status != 0) {
				bio_status = SPDK_BDEV_IO_STATUS_FAILED;
			}
		}

		rbd_aio_release(comps[i]);

		spdk_bdev_io_complete(bdev_io, bio_status);
	}
}

static void
bdev_rbd_free_channel(struct bdev_rbd_io_channel *ch)
{
	if (!ch) {
		return;
	}

	if (ch->image) {
		bdev_rbd_exit(ch->image);
	}

	if (ch->io_ctx) {
		rados_ioctx_destroy(ch->io_ctx);
	}

	if (ch->pfd >= 0) {
		close(ch->pfd);
	}

	if (ch->group_ch) {
		spdk_put_io_channel(spdk_io_channel_from_ctx(ch->group_ch));
	}
}

static void *
bdev_rbd_handle(void *arg)
{
	struct bdev_rbd_io_channel *ch = arg;
	void *ret = arg;

	assert(ch->disk->cluster_p != NULL);

	if (rados_ioctx_create(*(ch->disk->cluster_p), ch->disk->pool_name, &ch->io_ctx) < 0) {
		SPDK_ERRLOG("Failed to create ioctx\n");
		ret = NULL;
		return ret;
	}

	if (rbd_open(ch->io_ctx, ch->disk->rbd_name, &ch->image, NULL) < 0) {
		SPDK_ERRLOG("Failed to open specified rbd device\n");
		ret = NULL;
	}

	return ret;
}

static int
bdev_rbd_create_cb(void *io_device, void *ctx_buf)
{
	struct bdev_rbd_io_channel *ch = ctx_buf;
	int ret;
	struct epoll_event event;

	ch->disk = io_device;
	ch->image = NULL;
	ch->io_ctx = NULL;
	ch->pfd = -1;

	if (spdk_call_unaffinitized(bdev_rbd_handle, ch) == NULL) {
		goto err;
	}

	ch->pfd = eventfd(0, EFD_NONBLOCK);
	if (ch->pfd < 0) {
		SPDK_ERRLOG("Failed to get eventfd\n");
		goto err;
	}

	ret = rbd_set_image_notification(ch->image, ch->pfd, EVENT_TYPE_EVENTFD);
	if (ret < 0) {
		SPDK_ERRLOG("Failed to set rbd image notification\n");
		goto err;
	}

	ch->group_ch = spdk_io_channel_get_ctx(spdk_get_io_channel(&rbd_if));
	assert(ch->group_ch != NULL);
	memset(&event, 0, sizeof(event));
	event.events = EPOLLIN;
	event.data.ptr = ch;

	ret = epoll_ctl(ch->group_ch->epoll_fd, EPOLL_CTL_ADD, ch->pfd, &event);
	if (ret < 0) {
		SPDK_ERRLOG("Failed to add the fd of ch(%p) to the epoll group from group_ch=%p\n", ch,
			    ch->group_ch);
		goto err;
	}

	return 0;

err:
	bdev_rbd_free_channel(ch);
	return -1;
}

static void
bdev_rbd_destroy_cb(void *io_device, void *ctx_buf)
{
	struct bdev_rbd_io_channel *io_channel = ctx_buf;
	int rc;

	rc = epoll_ctl(io_channel->group_ch->epoll_fd, EPOLL_CTL_DEL,
		       io_channel->pfd, NULL);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to remove fd on io_channel=%p from the polling group=%p\n",
			    io_channel, io_channel->group_ch);
	}

	bdev_rbd_free_channel(io_channel);
}

static struct spdk_io_channel *
bdev_rbd_get_io_channel(void *ctx)
{
	struct bdev_rbd *rbd_bdev = ctx;

	return spdk_get_io_channel(rbd_bdev);
}

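/*
 * The helpers below produce the "rbd" section of a bdev's info JSON
 * (bdev_rbd_dump_info_json) and the replay parameters for "bdev_rbd_create"
 * (bdev_rbd_write_config_json).  For a bdev bound to a registered cluster, an
 * illustrative dump looks roughly like:
 *
 *   "rbd": {
 *     "pool_name": "rbd",
 *     "rbd_name": "image0",
 *     "user_id": "admin",
 *     "config_file": "/etc/ceph/ceph.conf"
 *   }
 *
 * The field names match the writers below; the values are examples only.
 */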
static void
bdev_rbd_cluster_dump_entry(const char *cluster_name, struct spdk_json_write_ctx *w)
{
	struct bdev_rbd_cluster *entry;

	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);
	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
		if (strcmp(cluster_name, entry->name)) {
			continue;
		}
		if (entry->user_id) {
			spdk_json_write_named_string(w, "user_id", entry->user_id);
		}

		if (entry->config_param) {
			char **config_entry = entry->config_param;

			spdk_json_write_named_object_begin(w, "config_param");
			while (*config_entry) {
				spdk_json_write_named_string(w, config_entry[0], config_entry[1]);
				config_entry += 2;
			}
			spdk_json_write_object_end(w);
		} else if (entry->config_file) {
			spdk_json_write_named_string(w, "config_file", entry->config_file);
		}

		pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
		return;
	}

	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
}

static int
bdev_rbd_dump_info_json(void *ctx, struct spdk_json_write_ctx *w)
{
	struct bdev_rbd *rbd_bdev = ctx;

	spdk_json_write_named_object_begin(w, "rbd");

	spdk_json_write_named_string(w, "pool_name", rbd_bdev->pool_name);

	spdk_json_write_named_string(w, "rbd_name", rbd_bdev->rbd_name);

	if (rbd_bdev->cluster_name) {
		bdev_rbd_cluster_dump_entry(rbd_bdev->cluster_name, w);
		goto end;
	}

	if (rbd_bdev->user_id) {
		spdk_json_write_named_string(w, "user_id", rbd_bdev->user_id);
	}

	if (rbd_bdev->config) {
		char **entry = rbd_bdev->config;

		spdk_json_write_named_object_begin(w, "config");
		while (*entry) {
			spdk_json_write_named_string(w, entry[0], entry[1]);
			entry += 2;
		}
		spdk_json_write_object_end(w);
	}

end:
	spdk_json_write_object_end(w);

	return 0;
}

static void
bdev_rbd_write_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
{
	struct bdev_rbd *rbd = bdev->ctxt;

	spdk_json_write_object_begin(w);

	spdk_json_write_named_string(w, "method", "bdev_rbd_create");

	spdk_json_write_named_object_begin(w, "params");
	spdk_json_write_named_string(w, "name", bdev->name);
	spdk_json_write_named_string(w, "pool_name", rbd->pool_name);
	spdk_json_write_named_string(w, "rbd_name", rbd->rbd_name);
	spdk_json_write_named_uint32(w, "block_size", bdev->blocklen);
	if (rbd->user_id) {
		spdk_json_write_named_string(w, "user_id", rbd->user_id);
	}

	if (rbd->config) {
		char **entry = rbd->config;

		spdk_json_write_named_object_begin(w, "config");
		while (*entry) {
			spdk_json_write_named_string(w, entry[0], entry[1]);
			entry += 2;
		}
		spdk_json_write_object_end(w);
	}

	spdk_json_write_object_end(w);

	spdk_json_write_object_end(w);
}

static void
dump_single_cluster_entry(struct bdev_rbd_cluster *entry, struct spdk_json_write_ctx *w)
{
	assert(entry != NULL);

	spdk_json_write_object_begin(w);
	spdk_json_write_named_string(w, "cluster_name", entry->name);

	if (entry->user_id) {
		spdk_json_write_named_string(w, "user_id", entry->user_id);
	}

	if (entry->config_param) {
		char **config_entry = entry->config_param;

		spdk_json_write_named_object_begin(w, "config_param");
		while (*config_entry) {
			spdk_json_write_named_string(w, config_entry[0], config_entry[1]);
			config_entry += 2;
		}
		spdk_json_write_object_end(w);
	} else if (entry->config_file) {
		spdk_json_write_named_string(w, "config_file", entry->config_file);
	}

	spdk_json_write_object_end(w);
}

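/*
 * RPC helper: returns either a single registered cluster (when "name" is given)
 * or an array of all registered clusters.  A sketch of the result for the
 * no-name case, with example values:
 *
 *   [
 *     {
 *       "cluster_name": "cluster0",
 *       "user_id": "admin",
 *       "config_file": "/etc/ceph/ceph.conf"
 *     }
 *   ]
 */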
int
bdev_rbd_get_clusters_info(struct spdk_jsonrpc_request *request, const char *name)
{
	struct bdev_rbd_cluster *entry;
	struct spdk_json_write_ctx *w;

	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);

	if (STAILQ_EMPTY(&g_map_bdev_rbd_cluster)) {
		pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
		return -ENOENT;
	}

	/* If a cluster name is provided, dump only that entry */
	if (name) {
		STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
			if (strcmp(name, entry->name) == 0) {
				w = spdk_jsonrpc_begin_result(request);
				dump_single_cluster_entry(entry, w);
				spdk_jsonrpc_end_result(request, w);

				pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
				return 0;
			}
		}

		pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
		return -ENOENT;
	}

	w = spdk_jsonrpc_begin_result(request);
	spdk_json_write_array_begin(w);
	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
		dump_single_cluster_entry(entry, w);
	}
	spdk_json_write_array_end(w);
	spdk_jsonrpc_end_result(request, w);
	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);

	return 0;
}

static const struct spdk_bdev_fn_table rbd_fn_table = {
	.destruct = bdev_rbd_destruct,
	.submit_request = bdev_rbd_submit_request,
	.io_type_supported = bdev_rbd_io_type_supported,
	.get_io_channel = bdev_rbd_get_io_channel,
	.dump_info_json = bdev_rbd_dump_info_json,
	.write_config_json = bdev_rbd_write_config_json,
};

static int
rbd_register_cluster(const char *name, const char *user_id, const char *const *config_param,
		     const char *config_file)
{
	struct bdev_rbd_cluster *entry;
	int rc;

	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);
	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
		if (strcmp(name, entry->name) == 0) {
			SPDK_ERRLOG("Cluster name=%s already exists\n", name);
			pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
			return -1;
		}
	}

	entry = calloc(1, sizeof(*entry));
	if (!entry) {
		SPDK_ERRLOG("Cannot allocate an entry for name=%s\n", name);
		pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
		return -1;
	}

	entry->name = strdup(name);
	if (entry->name == NULL) {
		SPDK_ERRLOG("Failed to save the name=%s on entry=%p\n", name, entry);
		goto err_handle;
	}

	if (user_id) {
		entry->user_id = strdup(user_id);
		if (entry->user_id == NULL) {
			SPDK_ERRLOG("Failed to save the user_id=%s on entry=%p\n", user_id, entry);
			goto err_handle;
		}
	}

	/* The config_param takes priority; fall back to the config_file otherwise */
	if (config_param) {
		entry->config_param = bdev_rbd_dup_config(config_param);
		if (entry->config_param == NULL) {
			SPDK_ERRLOG("Failed to save the config_param=%p on entry=%p\n", config_param, entry);
			goto err_handle;
		}
	} else if (config_file) {
		entry->config_file = strdup(config_file);
		if (entry->config_file == NULL) {
			SPDK_ERRLOG("Failed to save the config_file=%s on entry=%p\n", config_file, entry);
			goto err_handle;
		}
	}

	rc = rados_create(&entry->cluster, user_id);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to create rados_t struct\n");
		goto err_handle;
	}

	if (config_param) {
		const char *const *config_entry = config_param;

		while (*config_entry) {
			rc = rados_conf_set(entry->cluster, config_entry[0], config_entry[1]);
			if (rc < 0) {
				SPDK_ERRLOG("Failed to set %s = %s\n", config_entry[0], config_entry[1]);
				rados_shutdown(entry->cluster);
				goto err_handle;
			}
			config_entry += 2;
		}
	} else {
		rc = rados_conf_read_file(entry->cluster, entry->config_file);
		if (rc < 0) {
			SPDK_ERRLOG("Failed to read conf file\n");
			rados_shutdown(entry->cluster);
			goto err_handle;
		}
	}

	rc = rados_connect(entry->cluster);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to connect to rbd_pool on cluster=%p\n", entry->cluster);
		rados_shutdown(entry->cluster);
		goto err_handle;
	}

	STAILQ_INSERT_TAIL(&g_map_bdev_rbd_cluster, entry, link);
	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);

	return 0;

err_handle:
	bdev_rbd_cluster_free(entry);
	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
	return -1;
}

int
bdev_rbd_unregister_cluster(const char *name)
{
	struct bdev_rbd_cluster *entry;
	int rc = 0;

	if (name == NULL) {
		return -1;
	}

	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);
	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
		if (strcmp(name, entry->name) == 0) {
			if (entry->ref == 0) {
				STAILQ_REMOVE(&g_map_bdev_rbd_cluster, entry, bdev_rbd_cluster, link);
				rados_shutdown(entry->cluster);
				bdev_rbd_cluster_free(entry);
			} else {
				SPDK_ERRLOG("Cluster with name=%s is still in use and cannot be deleted\n",
					    entry->name);
				rc = -1;
			}

			pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
			return rc;
		}
	}

	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);

	SPDK_ERRLOG("Could not find the cluster name=%s\n", name);

	return -1;
}

static void *
_bdev_rbd_register_cluster(void *arg)
{
	struct cluster_register_info *info = arg;
	void *ret = arg;
	int rc;

	rc = rbd_register_cluster((const char *)info->name, (const char *)info->user_id,
				  (const char *const *)info->config_param, (const char *)info->config_file);
	if (rc) {
		ret = NULL;
	}

	return ret;
}

int
bdev_rbd_register_cluster(struct cluster_register_info *info)
{
	assert(info != NULL);

	/* The rados cluster info needs to be created in a non-SPDK thread to avoid CPU
	 * resource contention */
	if (spdk_call_unaffinitized(_bdev_rbd_register_cluster, info) == NULL) {
		return -1;
	}

	return 0;
}

int
bdev_rbd_create(struct spdk_bdev **bdev, const char *name, const char *user_id,
		const char *pool_name,
		const char *const *config,
		const char *rbd_name,
		uint32_t block_size,
		const char *cluster_name)
{
	struct bdev_rbd *rbd;
	int ret;

	if ((pool_name == NULL) || (rbd_name == NULL)) {
		return -EINVAL;
	}

	rbd = calloc(1, sizeof(struct bdev_rbd));
	if (rbd == NULL) {
		SPDK_ERRLOG("Failed to allocate bdev_rbd struct\n");
		return -ENOMEM;
	}

	rbd->rbd_name = strdup(rbd_name);
	if (!rbd->rbd_name) {
		bdev_rbd_free(rbd);
		return -ENOMEM;
	}

	if (user_id) {
		rbd->user_id = strdup(user_id);
		if (!rbd->user_id) {
			bdev_rbd_free(rbd);
			return -ENOMEM;
		}
	}

	if (cluster_name) {
		rbd->cluster_name = strdup(cluster_name);
		if (!rbd->cluster_name) {
			bdev_rbd_free(rbd);
			return -ENOMEM;
		}
	}

	rbd->pool_name = strdup(pool_name);
	if (!rbd->pool_name) {
		bdev_rbd_free(rbd);
		return -ENOMEM;
	}

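	/* Deep-copy the optional key/value config pairs so the bdev owns its own
	 * copy; it is released again via bdev_rbd_free_config() in bdev_rbd_free().
	 */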
	if (config && !(rbd->config = bdev_rbd_dup_config(config))) {
		bdev_rbd_free(rbd);
		return -ENOMEM;
	}

	ret = bdev_rbd_init(rbd);
	if (ret < 0) {
		bdev_rbd_free(rbd);
		SPDK_ERRLOG("Failed to init rbd device\n");
		return ret;
	}

	if (name) {
		rbd->disk.name = strdup(name);
	} else {
		rbd->disk.name = spdk_sprintf_alloc("Ceph%d", bdev_rbd_count);
	}
	if (!rbd->disk.name) {
		bdev_rbd_free(rbd);
		return -ENOMEM;
	}
	rbd->disk.product_name = "Ceph Rbd Disk";
	bdev_rbd_count++;

	rbd->disk.write_cache = 0;
	rbd->disk.blocklen = block_size;
	rbd->disk.blockcnt = rbd->info.size / rbd->disk.blocklen;
	rbd->disk.ctxt = rbd;
	rbd->disk.fn_table = &rbd_fn_table;
	rbd->disk.module = &rbd_if;

	SPDK_NOTICELOG("Add %s rbd disk to lun\n", rbd->disk.name);

	spdk_io_device_register(rbd, bdev_rbd_create_cb,
				bdev_rbd_destroy_cb,
				sizeof(struct bdev_rbd_io_channel),
				rbd_name);
	ret = spdk_bdev_register(&rbd->disk);
	if (ret) {
		spdk_io_device_unregister(rbd, NULL);
		bdev_rbd_free(rbd);
		return ret;
	}

	*bdev = &(rbd->disk);

	return ret;
}

void
bdev_rbd_delete(struct spdk_bdev *bdev, spdk_delete_rbd_complete cb_fn, void *cb_arg)
{
	if (!bdev || bdev->module != &rbd_if) {
		cb_fn(cb_arg, -ENODEV);
		return;
	}

	spdk_bdev_unregister(bdev, cb_fn, cb_arg);
}

int
bdev_rbd_resize(struct spdk_bdev *bdev, const uint64_t new_size_in_mb)
{
	struct spdk_io_channel *ch;
	struct bdev_rbd_io_channel *rbd_io_ch;
	int rc;
	uint64_t new_size_in_byte;
	uint64_t current_size_in_mb;

	if (bdev->module != &rbd_if) {
		return -EINVAL;
	}

	current_size_in_mb = bdev->blocklen * bdev->blockcnt / (1024 * 1024);
	if (current_size_in_mb > new_size_in_mb) {
		SPDK_ERRLOG("The new bdev size must not be smaller than the current bdev size.\n");
		return -EINVAL;
	}

	ch = bdev_rbd_get_io_channel(bdev);
	rbd_io_ch = spdk_io_channel_get_ctx(ch);
	new_size_in_byte = new_size_in_mb * 1024 * 1024;

	rc = rbd_resize(rbd_io_ch->image, new_size_in_byte);
	spdk_put_io_channel(ch);
	if (rc != 0) {
		SPDK_ERRLOG("Failed to resize the ceph bdev.\n");
		return rc;
	}

	rc = spdk_bdev_notify_blockcnt_change(bdev, new_size_in_byte / bdev->blocklen);
	if (rc != 0) {
		SPDK_ERRLOG("Failed to notify block cnt change.\n");
		return rc;
	}

	return rc;
}

static int
bdev_rbd_group_poll(void *arg)
{
	struct bdev_rbd_group_channel *group_ch = arg;
	struct epoll_event events[MAX_EVENTS_PER_POLL];
	int num_events, i;

	num_events = epoll_wait(group_ch->epoll_fd, events, MAX_EVENTS_PER_POLL, 0);

	if (num_events <= 0) {
		return SPDK_POLLER_IDLE;
	}

	for (i = 0; i < num_events; i++) {
		bdev_rbd_io_poll((struct bdev_rbd_io_channel *)events[i].data.ptr);
	}

	return SPDK_POLLER_BUSY;
}

static int
bdev_rbd_group_create_cb(void *io_device, void *ctx_buf)
{
	struct bdev_rbd_group_channel *ch = ctx_buf;

	ch->epoll_fd = epoll_create1(0);
	if (ch->epoll_fd < 0) {
		SPDK_ERRLOG("Could not create epoll fd on io device=%p\n", io_device);
		return -1;
	}

	ch->poller = SPDK_POLLER_REGISTER(bdev_rbd_group_poll, ch, 0);

	return 0;
}

static void
bdev_rbd_group_destroy_cb(void *io_device, void *ctx_buf)
{
	struct bdev_rbd_group_channel *ch = ctx_buf;

	if (ch->epoll_fd >= 0) {
		close(ch->epoll_fd);
	}

	spdk_poller_unregister(&ch->poller);
}

static int
bdev_rbd_library_init(void)
{
	spdk_io_device_register(&rbd_if, bdev_rbd_group_create_cb, bdev_rbd_group_destroy_cb,
				sizeof(struct bdev_rbd_group_channel), "bdev_rbd_poll_groups");

	return 0;
}

static void
bdev_rbd_library_fini(void)
{
	spdk_io_device_unregister(&rbd_if, NULL);
}

SPDK_LOG_REGISTER_COMPONENT(bdev_rbd)