1 /* SPDX-License-Identifier: BSD-3-Clause 2 * Copyright (C) 2017 Intel Corporation. 3 * All rights reserved. 4 */ 5 6 #include "rocksdb/env.h" 7 #include <set> 8 #include <iostream> 9 #include <stdexcept> 10 11 extern "C" { 12 #include "spdk/env.h" 13 #include "spdk/event.h" 14 #include "spdk/blob.h" 15 #include "spdk/blobfs.h" 16 #include "spdk/blob_bdev.h" 17 #include "spdk/log.h" 18 #include "spdk/thread.h" 19 #include "spdk/bdev.h" 20 } 21 22 namespace rocksdb 23 { 24 25 struct spdk_filesystem *g_fs = NULL; 26 struct spdk_bs_dev *g_bs_dev; 27 uint32_t g_lcore = 0; 28 std::string g_bdev_name; 29 volatile bool g_spdk_ready = false; 30 volatile bool g_spdk_start_failure = false; 31 32 void SpdkInitializeThread(void); 33 34 class SpdkThreadCtx 35 { 36 public: 37 struct spdk_fs_thread_ctx *channel; 38 39 SpdkThreadCtx(void) : channel(NULL) 40 { 41 SpdkInitializeThread(); 42 } 43 44 ~SpdkThreadCtx(void) 45 { 46 if (channel) { 47 spdk_fs_free_thread_ctx(channel); 48 channel = NULL; 49 } 50 } 51 52 private: 53 SpdkThreadCtx(const SpdkThreadCtx &); 54 SpdkThreadCtx &operator=(const SpdkThreadCtx &); 55 }; 56 57 thread_local SpdkThreadCtx g_sync_args; 58 59 static void 60 set_channel() 61 { 62 struct spdk_thread *thread; 63 64 if (g_fs != NULL && g_sync_args.channel == NULL) { 65 thread = spdk_thread_create("spdk_rocksdb", NULL); 66 spdk_set_thread(thread); 67 g_sync_args.channel = spdk_fs_alloc_thread_ctx(g_fs); 68 } 69 } 70 71 static void 72 __call_fn(void *arg1, void *arg2) 73 { 74 fs_request_fn fn; 75 76 fn = (fs_request_fn)arg1; 77 fn(arg2); 78 } 79 80 static void 81 __send_request(fs_request_fn fn, void *arg) 82 { 83 struct spdk_event *event; 84 85 event = spdk_event_allocate(g_lcore, __call_fn, (void *)fn, arg); 86 spdk_event_call(event); 87 } 88 89 static std::string 90 sanitize_path(const std::string &input, const std::string &mount_directory) 91 { 92 int index = 0; 93 std::string name; 94 std::string input_tmp; 95 96 input_tmp = input.substr(mount_directory.length(), input.length()); 97 for (const char &c : input_tmp) { 98 if (index == 0) { 99 if (c != '/') { 100 name = name.insert(index, 1, '/'); 101 index++; 102 } 103 name = name.insert(index, 1, c); 104 index++; 105 } else { 106 if (name[index - 1] == '/' && c == '/') { 107 continue; 108 } else { 109 name = name.insert(index, 1, c); 110 index++; 111 } 112 } 113 } 114 115 if (name[name.size() - 1] == '/') { 116 name = name.erase(name.size() - 1, 1); 117 } 118 return name; 119 } 120 121 class SpdkSequentialFile : public SequentialFile 122 { 123 struct spdk_file *mFile; 124 uint64_t mOffset; 125 public: 126 SpdkSequentialFile(struct spdk_file *file) : mFile(file), mOffset(0) {} 127 virtual ~SpdkSequentialFile(); 128 129 virtual Status Read(size_t n, Slice *result, char *scratch) override; 130 virtual Status Skip(uint64_t n) override; 131 virtual Status InvalidateCache(size_t offset, size_t length) override; 132 }; 133 134 SpdkSequentialFile::~SpdkSequentialFile(void) 135 { 136 set_channel(); 137 spdk_file_close(mFile, g_sync_args.channel); 138 } 139 140 Status 141 SpdkSequentialFile::Read(size_t n, Slice *result, char *scratch) 142 { 143 int64_t ret; 144 145 set_channel(); 146 ret = spdk_file_read(mFile, g_sync_args.channel, scratch, mOffset, n); 147 if (ret >= 0) { 148 mOffset += ret; 149 *result = Slice(scratch, ret); 150 return Status::OK(); 151 } else { 152 errno = -ret; 153 return Status::IOError(spdk_file_get_name(mFile), strerror(errno)); 154 } 155 } 156 157 Status 158 SpdkSequentialFile::Skip(uint64_t n) 159 { 160 mOffset += n; 161 return Status::OK(); 162 } 163 164 Status 165 SpdkSequentialFile::InvalidateCache(__attribute__((unused)) size_t offset, 166 __attribute__((unused)) size_t length) 167 { 168 return Status::OK(); 169 } 170 171 class SpdkRandomAccessFile : public RandomAccessFile 172 { 173 struct spdk_file *mFile; 174 public: 175 SpdkRandomAccessFile(struct spdk_file *file) : mFile(file) {} 176 virtual ~SpdkRandomAccessFile(); 177 178 virtual Status Read(uint64_t offset, size_t n, Slice *result, char *scratch) const override; 179 virtual Status InvalidateCache(size_t offset, size_t length) override; 180 }; 181 182 SpdkRandomAccessFile::~SpdkRandomAccessFile(void) 183 { 184 set_channel(); 185 spdk_file_close(mFile, g_sync_args.channel); 186 } 187 188 Status 189 SpdkRandomAccessFile::Read(uint64_t offset, size_t n, Slice *result, char *scratch) const 190 { 191 int64_t rc; 192 193 set_channel(); 194 rc = spdk_file_read(mFile, g_sync_args.channel, scratch, offset, n); 195 if (rc >= 0) { 196 *result = Slice(scratch, n); 197 return Status::OK(); 198 } else { 199 errno = -rc; 200 return Status::IOError(spdk_file_get_name(mFile), strerror(errno)); 201 } 202 } 203 204 Status 205 SpdkRandomAccessFile::InvalidateCache(__attribute__((unused)) size_t offset, 206 __attribute__((unused)) size_t length) 207 { 208 return Status::OK(); 209 } 210 211 class SpdkWritableFile : public WritableFile 212 { 213 struct spdk_file *mFile; 214 uint64_t mSize; 215 216 public: 217 SpdkWritableFile(struct spdk_file *file) : mFile(file), mSize(0) {} 218 ~SpdkWritableFile() 219 { 220 if (mFile != NULL) { 221 Close(); 222 } 223 } 224 225 virtual void SetIOPriority(Env::IOPriority pri) 226 { 227 if (pri == Env::IO_HIGH) { 228 spdk_file_set_priority(mFile, SPDK_FILE_PRIORITY_HIGH); 229 } 230 } 231 232 virtual Status Truncate(uint64_t size) override 233 { 234 int rc; 235 236 set_channel(); 237 rc = spdk_file_truncate(mFile, g_sync_args.channel, size); 238 if (!rc) { 239 mSize = size; 240 return Status::OK(); 241 } else { 242 errno = -rc; 243 return Status::IOError(spdk_file_get_name(mFile), strerror(errno)); 244 } 245 } 246 virtual Status Close() override 247 { 248 set_channel(); 249 spdk_file_close(mFile, g_sync_args.channel); 250 mFile = NULL; 251 return Status::OK(); 252 } 253 virtual Status Append(const Slice &data) override; 254 virtual Status Flush() override 255 { 256 return Status::OK(); 257 } 258 virtual Status Sync() override 259 { 260 int rc; 261 262 set_channel(); 263 rc = spdk_file_sync(mFile, g_sync_args.channel); 264 if (!rc) { 265 return Status::OK(); 266 } else { 267 errno = -rc; 268 return Status::IOError(spdk_file_get_name(mFile), strerror(errno)); 269 } 270 } 271 virtual Status Fsync() override 272 { 273 int rc; 274 275 set_channel(); 276 rc = spdk_file_sync(mFile, g_sync_args.channel); 277 if (!rc) { 278 return Status::OK(); 279 } else { 280 errno = -rc; 281 return Status::IOError(spdk_file_get_name(mFile), strerror(errno)); 282 } 283 } 284 virtual bool IsSyncThreadSafe() const override 285 { 286 return true; 287 } 288 virtual uint64_t GetFileSize() override 289 { 290 return mSize; 291 } 292 virtual Status InvalidateCache(__attribute__((unused)) size_t offset, 293 __attribute__((unused)) size_t length) override 294 { 295 return Status::OK(); 296 } 297 virtual Status Allocate(uint64_t offset, uint64_t len) override 298 { 299 int rc; 300 301 set_channel(); 302 rc = spdk_file_truncate(mFile, g_sync_args.channel, offset + len); 303 if (!rc) { 304 return Status::OK(); 305 } else { 306 errno = -rc; 307 return Status::IOError(spdk_file_get_name(mFile), strerror(errno)); 308 } 309 } 310 virtual Status RangeSync(__attribute__((unused)) uint64_t offset, 311 __attribute__((unused)) uint64_t nbytes) override 312 { 313 int rc; 314 315 /* 316 * SPDK BlobFS does not have a range sync operation yet, so just sync 317 * the whole file. 318 */ 319 set_channel(); 320 rc = spdk_file_sync(mFile, g_sync_args.channel); 321 if (!rc) { 322 return Status::OK(); 323 } else { 324 errno = -rc; 325 return Status::IOError(spdk_file_get_name(mFile), strerror(errno)); 326 } 327 } 328 virtual size_t GetUniqueId(char *id, size_t max_size) const override 329 { 330 int rc; 331 332 rc = spdk_file_get_id(mFile, id, max_size); 333 if (rc < 0) { 334 return 0; 335 } else { 336 return rc; 337 } 338 } 339 }; 340 341 Status 342 SpdkWritableFile::Append(const Slice &data) 343 { 344 int64_t rc; 345 346 set_channel(); 347 rc = spdk_file_write(mFile, g_sync_args.channel, (void *)data.data(), mSize, data.size()); 348 if (rc >= 0) { 349 mSize += data.size(); 350 return Status::OK(); 351 } else { 352 errno = -rc; 353 return Status::IOError(spdk_file_get_name(mFile), strerror(errno)); 354 } 355 } 356 357 class SpdkDirectory : public Directory 358 { 359 public: 360 SpdkDirectory() {} 361 ~SpdkDirectory() {} 362 Status Fsync() override 363 { 364 return Status::OK(); 365 } 366 }; 367 368 class SpdkAppStartException : public std::runtime_error 369 { 370 public: 371 SpdkAppStartException(std::string mess): std::runtime_error(mess) {} 372 }; 373 374 class SpdkEnv : public EnvWrapper 375 { 376 private: 377 pthread_t mSpdkTid; 378 std::string mDirectory; 379 std::string mConfig; 380 std::string mBdev; 381 382 public: 383 SpdkEnv(Env *base_env, const std::string &dir, const std::string &conf, 384 const std::string &bdev, uint64_t cache_size_in_mb); 385 386 virtual ~SpdkEnv(); 387 388 virtual Status NewSequentialFile(const std::string &fname, 389 std::unique_ptr<SequentialFile> *result, 390 const EnvOptions &options) override 391 { 392 if (fname.compare(0, mDirectory.length(), mDirectory) == 0) { 393 struct spdk_file *file; 394 int rc; 395 396 std::string name = sanitize_path(fname, mDirectory); 397 set_channel(); 398 rc = spdk_fs_open_file(g_fs, g_sync_args.channel, 399 name.c_str(), 0, &file); 400 if (rc == 0) { 401 result->reset(new SpdkSequentialFile(file)); 402 return Status::OK(); 403 } else { 404 /* Myrocks engine uses errno(ENOENT) as one 405 * special condition, for the purpose to 406 * support MySQL, set the errno to right value. 407 */ 408 errno = -rc; 409 return Status::IOError(name, strerror(errno)); 410 } 411 } else { 412 return EnvWrapper::NewSequentialFile(fname, result, options); 413 } 414 } 415 416 virtual Status NewRandomAccessFile(const std::string &fname, 417 std::unique_ptr<RandomAccessFile> *result, 418 const EnvOptions &options) override 419 { 420 if (fname.compare(0, mDirectory.length(), mDirectory) == 0) { 421 std::string name = sanitize_path(fname, mDirectory); 422 struct spdk_file *file; 423 int rc; 424 425 set_channel(); 426 rc = spdk_fs_open_file(g_fs, g_sync_args.channel, 427 name.c_str(), 0, &file); 428 if (rc == 0) { 429 result->reset(new SpdkRandomAccessFile(file)); 430 return Status::OK(); 431 } else { 432 errno = -rc; 433 return Status::IOError(name, strerror(errno)); 434 } 435 } else { 436 return EnvWrapper::NewRandomAccessFile(fname, result, options); 437 } 438 } 439 440 virtual Status NewWritableFile(const std::string &fname, 441 std::unique_ptr<WritableFile> *result, 442 const EnvOptions &options) override 443 { 444 if (fname.compare(0, mDirectory.length(), mDirectory) == 0) { 445 std::string name = sanitize_path(fname, mDirectory); 446 struct spdk_file *file; 447 int rc; 448 449 set_channel(); 450 rc = spdk_fs_open_file(g_fs, g_sync_args.channel, name.c_str(), 451 SPDK_BLOBFS_OPEN_CREATE, &file); 452 if (rc == 0) { 453 result->reset(new SpdkWritableFile(file)); 454 return Status::OK(); 455 } else { 456 errno = -rc; 457 return Status::IOError(name, strerror(errno)); 458 } 459 } else { 460 return EnvWrapper::NewWritableFile(fname, result, options); 461 } 462 } 463 464 virtual Status ReuseWritableFile(const std::string &fname, 465 const std::string &old_fname, 466 std::unique_ptr<WritableFile> *result, 467 const EnvOptions &options) override 468 { 469 return EnvWrapper::ReuseWritableFile(fname, old_fname, result, options); 470 } 471 472 virtual Status NewDirectory(__attribute__((unused)) const std::string &name, 473 std::unique_ptr<Directory> *result) override 474 { 475 result->reset(new SpdkDirectory()); 476 return Status::OK(); 477 } 478 virtual Status FileExists(const std::string &fname) override 479 { 480 struct spdk_file_stat stat; 481 int rc; 482 std::string name = sanitize_path(fname, mDirectory); 483 484 set_channel(); 485 rc = spdk_fs_file_stat(g_fs, g_sync_args.channel, name.c_str(), &stat); 486 if (rc == 0) { 487 return Status::OK(); 488 } 489 return EnvWrapper::FileExists(fname); 490 } 491 virtual Status RenameFile(const std::string &src, const std::string &t) override 492 { 493 int rc; 494 std::string src_name = sanitize_path(src, mDirectory); 495 std::string target_name = sanitize_path(t, mDirectory); 496 497 set_channel(); 498 rc = spdk_fs_rename_file(g_fs, g_sync_args.channel, 499 src_name.c_str(), target_name.c_str()); 500 if (rc == -ENOENT) { 501 return EnvWrapper::RenameFile(src, t); 502 } 503 return Status::OK(); 504 } 505 virtual Status LinkFile(__attribute__((unused)) const std::string &src, 506 __attribute__((unused)) const std::string &t) override 507 { 508 return Status::NotSupported("SpdkEnv does not support LinkFile"); 509 } 510 virtual Status GetFileSize(const std::string &fname, uint64_t *size) override 511 { 512 struct spdk_file_stat stat; 513 int rc; 514 std::string name = sanitize_path(fname, mDirectory); 515 516 set_channel(); 517 rc = spdk_fs_file_stat(g_fs, g_sync_args.channel, name.c_str(), &stat); 518 if (rc == -ENOENT) { 519 return EnvWrapper::GetFileSize(fname, size); 520 } 521 *size = stat.size; 522 return Status::OK(); 523 } 524 virtual Status DeleteFile(const std::string &fname) override 525 { 526 int rc; 527 std::string name = sanitize_path(fname, mDirectory); 528 529 set_channel(); 530 rc = spdk_fs_delete_file(g_fs, g_sync_args.channel, name.c_str()); 531 if (rc == -ENOENT) { 532 return EnvWrapper::DeleteFile(fname); 533 } 534 return Status::OK(); 535 } 536 virtual Status LockFile(const std::string &fname, FileLock **lock) override 537 { 538 std::string name = sanitize_path(fname, mDirectory); 539 int64_t rc; 540 541 set_channel(); 542 rc = spdk_fs_open_file(g_fs, g_sync_args.channel, name.c_str(), 543 SPDK_BLOBFS_OPEN_CREATE, (struct spdk_file **)lock); 544 if (!rc) { 545 return Status::OK(); 546 } else { 547 errno = -rc; 548 return Status::IOError(name, strerror(errno)); 549 } 550 } 551 virtual Status UnlockFile(FileLock *lock) override 552 { 553 set_channel(); 554 spdk_file_close((struct spdk_file *)lock, g_sync_args.channel); 555 return Status::OK(); 556 } 557 virtual Status GetChildren(const std::string &dir, 558 std::vector<std::string> *result) override 559 { 560 std::string::size_type pos; 561 std::set<std::string> dir_and_file_set; 562 std::string full_path; 563 std::string filename; 564 std::string dir_name; 565 566 if (dir.find("archive") != std::string::npos) { 567 return Status::OK(); 568 } 569 if (dir.compare(0, mDirectory.length(), mDirectory) == 0) { 570 spdk_fs_iter iter; 571 struct spdk_file *file; 572 dir_name = sanitize_path(dir, mDirectory); 573 574 iter = spdk_fs_iter_first(g_fs); 575 while (iter != NULL) { 576 file = spdk_fs_iter_get_file(iter); 577 full_path = spdk_file_get_name(file); 578 if (strncmp(dir_name.c_str(), full_path.c_str(), dir_name.length())) { 579 iter = spdk_fs_iter_next(iter); 580 continue; 581 } 582 pos = full_path.find("/", dir_name.length() + 1); 583 584 if (pos != std::string::npos) { 585 filename = full_path.substr(dir_name.length() + 1, pos - dir_name.length() - 1); 586 } else { 587 filename = full_path.substr(dir_name.length() + 1); 588 } 589 dir_and_file_set.insert(filename); 590 iter = spdk_fs_iter_next(iter); 591 } 592 593 for (auto &s : dir_and_file_set) { 594 result->push_back(s); 595 } 596 597 result->push_back("."); 598 result->push_back(".."); 599 600 return Status::OK(); 601 } 602 return EnvWrapper::GetChildren(dir, result); 603 } 604 }; 605 606 /* The thread local constructor doesn't work for the main thread, since 607 * the filesystem hasn't been loaded yet. So we break out this 608 * SpdkInitializeThread function, so that the main thread can explicitly 609 * call it after the filesystem has been loaded. 610 */ 611 void SpdkInitializeThread(void) 612 { 613 struct spdk_thread *thread; 614 615 if (g_fs != NULL) { 616 if (g_sync_args.channel) { 617 spdk_fs_free_thread_ctx(g_sync_args.channel); 618 } 619 thread = spdk_thread_create("spdk_rocksdb", NULL); 620 spdk_set_thread(thread); 621 g_sync_args.channel = spdk_fs_alloc_thread_ctx(g_fs); 622 } 623 } 624 625 static void 626 fs_load_cb(__attribute__((unused)) void *ctx, 627 struct spdk_filesystem *fs, int fserrno) 628 { 629 if (fserrno == 0) { 630 g_fs = fs; 631 } 632 g_spdk_ready = true; 633 } 634 635 static void 636 base_bdev_event_cb(enum spdk_bdev_event_type type, __attribute__((unused)) struct spdk_bdev *bdev, 637 __attribute__((unused)) void *event_ctx) 638 { 639 printf("Unsupported bdev event: type %d\n", type); 640 } 641 642 static void 643 rocksdb_run(__attribute__((unused)) void *arg1) 644 { 645 int rc; 646 647 rc = spdk_bdev_create_bs_dev_ext(g_bdev_name.c_str(), base_bdev_event_cb, NULL, 648 &g_bs_dev); 649 if (rc != 0) { 650 printf("Could not create blob bdev\n"); 651 spdk_app_stop(0); 652 exit(1); 653 } 654 655 g_lcore = spdk_env_get_first_core(); 656 657 printf("using bdev %s\n", g_bdev_name.c_str()); 658 spdk_fs_load(g_bs_dev, __send_request, fs_load_cb, NULL); 659 } 660 661 static void 662 fs_unload_cb(__attribute__((unused)) void *ctx, 663 __attribute__((unused)) int fserrno) 664 { 665 assert(fserrno == 0); 666 667 spdk_app_stop(0); 668 } 669 670 static void 671 rocksdb_shutdown(void) 672 { 673 if (g_fs != NULL) { 674 spdk_fs_unload(g_fs, fs_unload_cb, NULL); 675 } else { 676 fs_unload_cb(NULL, 0); 677 } 678 } 679 680 static void * 681 initialize_spdk(void *arg) 682 { 683 struct spdk_app_opts *opts = (struct spdk_app_opts *)arg; 684 int rc; 685 686 rc = spdk_app_start(opts, rocksdb_run, NULL); 687 /* 688 * TODO: Revisit for case of internal failure of 689 * spdk_app_start(), itself. At this time, it's known 690 * the only application's use of spdk_app_stop() passes 691 * a zero; i.e. no fail (non-zero) cases so here we 692 * assume there was an internal failure and flag it 693 * so we can throw an exception. 694 */ 695 if (rc) { 696 g_spdk_start_failure = true; 697 } else { 698 spdk_app_fini(); 699 delete opts; 700 } 701 pthread_exit(NULL); 702 703 } 704 705 SpdkEnv::SpdkEnv(Env *base_env, const std::string &dir, const std::string &conf, 706 const std::string &bdev, uint64_t cache_size_in_mb) 707 : EnvWrapper(base_env), mDirectory(dir), mConfig(conf), mBdev(bdev) 708 { 709 struct spdk_app_opts *opts = new struct spdk_app_opts; 710 711 spdk_app_opts_init(opts, sizeof(*opts)); 712 opts->name = "rocksdb"; 713 opts->json_config_file = mConfig.c_str(); 714 opts->shutdown_cb = rocksdb_shutdown; 715 opts->tpoint_group_mask = "0x80"; 716 717 spdk_fs_set_cache_size(cache_size_in_mb); 718 g_bdev_name = mBdev; 719 720 pthread_create(&mSpdkTid, NULL, &initialize_spdk, opts); 721 while (!g_spdk_ready && !g_spdk_start_failure) 722 ; 723 if (g_spdk_start_failure) { 724 delete opts; 725 throw SpdkAppStartException("spdk_app_start() unable to start rocksdb_run()"); 726 } 727 728 SpdkInitializeThread(); 729 } 730 731 SpdkEnv::~SpdkEnv() 732 { 733 /* This is a workaround for rocksdb test, we close the files if the rocksdb not 734 * do the work before the test quit. 735 */ 736 if (g_fs != NULL) { 737 spdk_fs_iter iter; 738 struct spdk_file *file; 739 740 if (!g_sync_args.channel) { 741 SpdkInitializeThread(); 742 } 743 744 iter = spdk_fs_iter_first(g_fs); 745 while (iter != NULL) { 746 file = spdk_fs_iter_get_file(iter); 747 spdk_file_close(file, g_sync_args.channel); 748 iter = spdk_fs_iter_next(iter); 749 } 750 } 751 752 spdk_app_start_shutdown(); 753 pthread_join(mSpdkTid, NULL); 754 } 755 756 Env *NewSpdkEnv(Env *base_env, const std::string &dir, const std::string &conf, 757 const std::string &bdev, uint64_t cache_size_in_mb) 758 { 759 try { 760 SpdkEnv *spdk_env = new SpdkEnv(base_env, dir, conf, bdev, cache_size_in_mb); 761 if (g_fs != NULL) { 762 return spdk_env; 763 } else { 764 delete spdk_env; 765 return NULL; 766 } 767 } catch (SpdkAppStartException &e) { 768 SPDK_ERRLOG("NewSpdkEnv: exception caught: %s", e.what()); 769 return NULL; 770 } catch (...) { 771 SPDK_ERRLOG("NewSpdkEnv: default exception caught"); 772 return NULL; 773 } 774 } 775 776 } // namespace rocksdb 777