1 /* SPDX-License-Identifier: BSD-3-Clause 2 * Copyright (C) 2017 Intel Corporation. 3 * All rights reserved. 4 */ 5 6 #include "rocksdb/env.h" 7 #include <set> 8 #include <iostream> 9 #include <stdexcept> 10 11 extern "C" { 12 #include "spdk/env.h" 13 #include "spdk/event.h" 14 #include "spdk/blob.h" 15 #include "spdk/blobfs.h" 16 #include "spdk/blob_bdev.h" 17 #include "spdk/log.h" 18 #include "spdk/thread.h" 19 #include "spdk/bdev.h" 20 } 21 22 namespace rocksdb 23 { 24 25 struct spdk_filesystem *g_fs = NULL; 26 struct spdk_bs_dev *g_bs_dev; 27 uint32_t g_lcore = 0; 28 std::string g_bdev_name; 29 volatile bool g_spdk_ready = false; 30 volatile bool g_spdk_start_failure = false; 31 32 void SpdkInitializeThread(void); 33 34 class SpdkThreadCtx 35 { 36 public: 37 struct spdk_fs_thread_ctx *channel; 38 39 SpdkThreadCtx(void) : channel(NULL) 40 { 41 SpdkInitializeThread(); 42 } 43 44 ~SpdkThreadCtx(void) 45 { 46 if (channel) { 47 spdk_fs_free_thread_ctx(channel); 48 channel = NULL; 49 } 50 } 51 52 private: 53 SpdkThreadCtx(const SpdkThreadCtx &); 54 SpdkThreadCtx &operator=(const SpdkThreadCtx &); 55 }; 56 57 thread_local SpdkThreadCtx g_sync_args; 58 59 static void 60 set_channel() 61 { 62 if (g_fs != NULL && g_sync_args.channel == NULL) { 63 g_sync_args.channel = spdk_fs_alloc_thread_ctx(g_fs); 64 } 65 } 66 67 static void 68 __call_fn(void *arg1, void *arg2) 69 { 70 fs_request_fn fn; 71 72 fn = (fs_request_fn)arg1; 73 fn(arg2); 74 } 75 76 static void 77 __send_request(fs_request_fn fn, void *arg) 78 { 79 struct spdk_event *event; 80 81 event = spdk_event_allocate(g_lcore, __call_fn, (void *)fn, arg); 82 spdk_event_call(event); 83 } 84 85 static std::string 86 sanitize_path(const std::string &input, const std::string &mount_directory) 87 { 88 int index = 0; 89 std::string name; 90 std::string input_tmp; 91 92 input_tmp = input.substr(mount_directory.length(), input.length()); 93 for (const char &c : input_tmp) { 94 if (index == 0) { 95 if (c != '/') { 96 name = name.insert(index, 1, '/'); 97 index++; 98 } 99 name = name.insert(index, 1, c); 100 index++; 101 } else { 102 if (name[index - 1] == '/' && c == '/') { 103 continue; 104 } else { 105 name = name.insert(index, 1, c); 106 index++; 107 } 108 } 109 } 110 111 if (name[name.size() - 1] == '/') { 112 name = name.erase(name.size() - 1, 1); 113 } 114 return name; 115 } 116 117 class SpdkSequentialFile : public SequentialFile 118 { 119 struct spdk_file *mFile; 120 uint64_t mOffset; 121 public: 122 SpdkSequentialFile(struct spdk_file *file) : mFile(file), mOffset(0) {} 123 virtual ~SpdkSequentialFile(); 124 125 virtual Status Read(size_t n, Slice *result, char *scratch) override; 126 virtual Status Skip(uint64_t n) override; 127 virtual Status InvalidateCache(size_t offset, size_t length) override; 128 }; 129 130 SpdkSequentialFile::~SpdkSequentialFile(void) 131 { 132 set_channel(); 133 spdk_file_close(mFile, g_sync_args.channel); 134 } 135 136 Status 137 SpdkSequentialFile::Read(size_t n, Slice *result, char *scratch) 138 { 139 int64_t ret; 140 141 set_channel(); 142 ret = spdk_file_read(mFile, g_sync_args.channel, scratch, mOffset, n); 143 if (ret >= 0) { 144 mOffset += ret; 145 *result = Slice(scratch, ret); 146 return Status::OK(); 147 } else { 148 errno = -ret; 149 return Status::IOError(spdk_file_get_name(mFile), strerror(errno)); 150 } 151 } 152 153 Status 154 SpdkSequentialFile::Skip(uint64_t n) 155 { 156 mOffset += n; 157 return Status::OK(); 158 } 159 160 Status 161 SpdkSequentialFile::InvalidateCache(__attribute__((unused)) size_t offset, 162 __attribute__((unused)) size_t length) 163 { 164 return Status::OK(); 165 } 166 167 class SpdkRandomAccessFile : public RandomAccessFile 168 { 169 struct spdk_file *mFile; 170 public: 171 SpdkRandomAccessFile(struct spdk_file *file) : mFile(file) {} 172 virtual ~SpdkRandomAccessFile(); 173 174 virtual Status Read(uint64_t offset, size_t n, Slice *result, char *scratch) const override; 175 virtual Status InvalidateCache(size_t offset, size_t length) override; 176 }; 177 178 SpdkRandomAccessFile::~SpdkRandomAccessFile(void) 179 { 180 set_channel(); 181 spdk_file_close(mFile, g_sync_args.channel); 182 } 183 184 Status 185 SpdkRandomAccessFile::Read(uint64_t offset, size_t n, Slice *result, char *scratch) const 186 { 187 int64_t rc; 188 189 set_channel(); 190 rc = spdk_file_read(mFile, g_sync_args.channel, scratch, offset, n); 191 if (rc >= 0) { 192 *result = Slice(scratch, n); 193 return Status::OK(); 194 } else { 195 errno = -rc; 196 return Status::IOError(spdk_file_get_name(mFile), strerror(errno)); 197 } 198 } 199 200 Status 201 SpdkRandomAccessFile::InvalidateCache(__attribute__((unused)) size_t offset, 202 __attribute__((unused)) size_t length) 203 { 204 return Status::OK(); 205 } 206 207 class SpdkWritableFile : public WritableFile 208 { 209 struct spdk_file *mFile; 210 uint64_t mSize; 211 212 public: 213 SpdkWritableFile(struct spdk_file *file) : mFile(file), mSize(0) {} 214 ~SpdkWritableFile() 215 { 216 if (mFile != NULL) { 217 Close(); 218 } 219 } 220 221 virtual void SetIOPriority(Env::IOPriority pri) 222 { 223 if (pri == Env::IO_HIGH) { 224 spdk_file_set_priority(mFile, SPDK_FILE_PRIORITY_HIGH); 225 } 226 } 227 228 virtual Status Truncate(uint64_t size) override 229 { 230 int rc; 231 232 set_channel(); 233 rc = spdk_file_truncate(mFile, g_sync_args.channel, size); 234 if (!rc) { 235 mSize = size; 236 return Status::OK(); 237 } else { 238 errno = -rc; 239 return Status::IOError(spdk_file_get_name(mFile), strerror(errno)); 240 } 241 } 242 virtual Status Close() override 243 { 244 set_channel(); 245 spdk_file_close(mFile, g_sync_args.channel); 246 mFile = NULL; 247 return Status::OK(); 248 } 249 virtual Status Append(const Slice &data) override; 250 virtual Status Flush() override 251 { 252 return Status::OK(); 253 } 254 virtual Status Sync() override 255 { 256 int rc; 257 258 set_channel(); 259 rc = spdk_file_sync(mFile, g_sync_args.channel); 260 if (!rc) { 261 return Status::OK(); 262 } else { 263 errno = -rc; 264 return Status::IOError(spdk_file_get_name(mFile), strerror(errno)); 265 } 266 } 267 virtual Status Fsync() override 268 { 269 int rc; 270 271 set_channel(); 272 rc = spdk_file_sync(mFile, g_sync_args.channel); 273 if (!rc) { 274 return Status::OK(); 275 } else { 276 errno = -rc; 277 return Status::IOError(spdk_file_get_name(mFile), strerror(errno)); 278 } 279 } 280 virtual bool IsSyncThreadSafe() const override 281 { 282 return true; 283 } 284 virtual uint64_t GetFileSize() override 285 { 286 return mSize; 287 } 288 virtual Status InvalidateCache(__attribute__((unused)) size_t offset, 289 __attribute__((unused)) size_t length) override 290 { 291 return Status::OK(); 292 } 293 virtual Status Allocate(uint64_t offset, uint64_t len) override 294 { 295 int rc; 296 297 set_channel(); 298 rc = spdk_file_truncate(mFile, g_sync_args.channel, offset + len); 299 if (!rc) { 300 return Status::OK(); 301 } else { 302 errno = -rc; 303 return Status::IOError(spdk_file_get_name(mFile), strerror(errno)); 304 } 305 } 306 virtual Status RangeSync(__attribute__((unused)) uint64_t offset, 307 __attribute__((unused)) uint64_t nbytes) override 308 { 309 int rc; 310 311 /* 312 * SPDK BlobFS does not have a range sync operation yet, so just sync 313 * the whole file. 314 */ 315 set_channel(); 316 rc = spdk_file_sync(mFile, g_sync_args.channel); 317 if (!rc) { 318 return Status::OK(); 319 } else { 320 errno = -rc; 321 return Status::IOError(spdk_file_get_name(mFile), strerror(errno)); 322 } 323 } 324 virtual size_t GetUniqueId(char *id, size_t max_size) const override 325 { 326 int rc; 327 328 rc = spdk_file_get_id(mFile, id, max_size); 329 if (rc < 0) { 330 return 0; 331 } else { 332 return rc; 333 } 334 } 335 }; 336 337 Status 338 SpdkWritableFile::Append(const Slice &data) 339 { 340 int64_t rc; 341 342 set_channel(); 343 rc = spdk_file_write(mFile, g_sync_args.channel, (void *)data.data(), mSize, data.size()); 344 if (rc >= 0) { 345 mSize += data.size(); 346 return Status::OK(); 347 } else { 348 errno = -rc; 349 return Status::IOError(spdk_file_get_name(mFile), strerror(errno)); 350 } 351 } 352 353 class SpdkDirectory : public Directory 354 { 355 public: 356 SpdkDirectory() {} 357 ~SpdkDirectory() {} 358 Status Fsync() override 359 { 360 return Status::OK(); 361 } 362 }; 363 364 class SpdkAppStartException : public std::runtime_error 365 { 366 public: 367 SpdkAppStartException(std::string mess): std::runtime_error(mess) {} 368 }; 369 370 class SpdkEnv : public EnvWrapper 371 { 372 private: 373 pthread_t mSpdkTid; 374 std::string mDirectory; 375 std::string mConfig; 376 std::string mBdev; 377 378 public: 379 SpdkEnv(Env *base_env, const std::string &dir, const std::string &conf, 380 const std::string &bdev, uint64_t cache_size_in_mb); 381 382 virtual ~SpdkEnv(); 383 384 virtual Status NewSequentialFile(const std::string &fname, 385 std::unique_ptr<SequentialFile> *result, 386 const EnvOptions &options) override 387 { 388 if (fname.compare(0, mDirectory.length(), mDirectory) == 0) { 389 struct spdk_file *file; 390 int rc; 391 392 std::string name = sanitize_path(fname, mDirectory); 393 set_channel(); 394 rc = spdk_fs_open_file(g_fs, g_sync_args.channel, 395 name.c_str(), 0, &file); 396 if (rc == 0) { 397 result->reset(new SpdkSequentialFile(file)); 398 return Status::OK(); 399 } else { 400 /* Myrocks engine uses errno(ENOENT) as one 401 * special condition, for the purpose to 402 * support MySQL, set the errno to right value. 403 */ 404 errno = -rc; 405 return Status::IOError(name, strerror(errno)); 406 } 407 } else { 408 return EnvWrapper::NewSequentialFile(fname, result, options); 409 } 410 } 411 412 virtual Status NewRandomAccessFile(const std::string &fname, 413 std::unique_ptr<RandomAccessFile> *result, 414 const EnvOptions &options) override 415 { 416 if (fname.compare(0, mDirectory.length(), mDirectory) == 0) { 417 std::string name = sanitize_path(fname, mDirectory); 418 struct spdk_file *file; 419 int rc; 420 421 set_channel(); 422 rc = spdk_fs_open_file(g_fs, g_sync_args.channel, 423 name.c_str(), 0, &file); 424 if (rc == 0) { 425 result->reset(new SpdkRandomAccessFile(file)); 426 return Status::OK(); 427 } else { 428 errno = -rc; 429 return Status::IOError(name, strerror(errno)); 430 } 431 } else { 432 return EnvWrapper::NewRandomAccessFile(fname, result, options); 433 } 434 } 435 436 virtual Status NewWritableFile(const std::string &fname, 437 std::unique_ptr<WritableFile> *result, 438 const EnvOptions &options) override 439 { 440 if (fname.compare(0, mDirectory.length(), mDirectory) == 0) { 441 std::string name = sanitize_path(fname, mDirectory); 442 struct spdk_file *file; 443 int rc; 444 445 set_channel(); 446 rc = spdk_fs_open_file(g_fs, g_sync_args.channel, name.c_str(), 447 SPDK_BLOBFS_OPEN_CREATE, &file); 448 if (rc == 0) { 449 result->reset(new SpdkWritableFile(file)); 450 return Status::OK(); 451 } else { 452 errno = -rc; 453 return Status::IOError(name, strerror(errno)); 454 } 455 } else { 456 return EnvWrapper::NewWritableFile(fname, result, options); 457 } 458 } 459 460 virtual Status ReuseWritableFile(const std::string &fname, 461 const std::string &old_fname, 462 std::unique_ptr<WritableFile> *result, 463 const EnvOptions &options) override 464 { 465 return EnvWrapper::ReuseWritableFile(fname, old_fname, result, options); 466 } 467 468 virtual Status NewDirectory(__attribute__((unused)) const std::string &name, 469 std::unique_ptr<Directory> *result) override 470 { 471 result->reset(new SpdkDirectory()); 472 return Status::OK(); 473 } 474 virtual Status FileExists(const std::string &fname) override 475 { 476 struct spdk_file_stat stat; 477 int rc; 478 std::string name = sanitize_path(fname, mDirectory); 479 480 set_channel(); 481 rc = spdk_fs_file_stat(g_fs, g_sync_args.channel, name.c_str(), &stat); 482 if (rc == 0) { 483 return Status::OK(); 484 } 485 return EnvWrapper::FileExists(fname); 486 } 487 virtual Status RenameFile(const std::string &src, const std::string &t) override 488 { 489 int rc; 490 std::string src_name = sanitize_path(src, mDirectory); 491 std::string target_name = sanitize_path(t, mDirectory); 492 493 set_channel(); 494 rc = spdk_fs_rename_file(g_fs, g_sync_args.channel, 495 src_name.c_str(), target_name.c_str()); 496 if (rc == -ENOENT) { 497 return EnvWrapper::RenameFile(src, t); 498 } 499 return Status::OK(); 500 } 501 virtual Status LinkFile(__attribute__((unused)) const std::string &src, 502 __attribute__((unused)) const std::string &t) override 503 { 504 return Status::NotSupported("SpdkEnv does not support LinkFile"); 505 } 506 virtual Status GetFileSize(const std::string &fname, uint64_t *size) override 507 { 508 struct spdk_file_stat stat; 509 int rc; 510 std::string name = sanitize_path(fname, mDirectory); 511 512 set_channel(); 513 rc = spdk_fs_file_stat(g_fs, g_sync_args.channel, name.c_str(), &stat); 514 if (rc == -ENOENT) { 515 return EnvWrapper::GetFileSize(fname, size); 516 } 517 *size = stat.size; 518 return Status::OK(); 519 } 520 virtual Status DeleteFile(const std::string &fname) override 521 { 522 int rc; 523 std::string name = sanitize_path(fname, mDirectory); 524 525 set_channel(); 526 rc = spdk_fs_delete_file(g_fs, g_sync_args.channel, name.c_str()); 527 if (rc == -ENOENT) { 528 return EnvWrapper::DeleteFile(fname); 529 } 530 return Status::OK(); 531 } 532 virtual Status LockFile(const std::string &fname, FileLock **lock) override 533 { 534 std::string name = sanitize_path(fname, mDirectory); 535 int64_t rc; 536 537 set_channel(); 538 rc = spdk_fs_open_file(g_fs, g_sync_args.channel, name.c_str(), 539 SPDK_BLOBFS_OPEN_CREATE, (struct spdk_file **)lock); 540 if (!rc) { 541 return Status::OK(); 542 } else { 543 errno = -rc; 544 return Status::IOError(name, strerror(errno)); 545 } 546 } 547 virtual Status UnlockFile(FileLock *lock) override 548 { 549 set_channel(); 550 spdk_file_close((struct spdk_file *)lock, g_sync_args.channel); 551 return Status::OK(); 552 } 553 virtual Status GetChildren(const std::string &dir, 554 std::vector<std::string> *result) override 555 { 556 std::string::size_type pos; 557 std::set<std::string> dir_and_file_set; 558 std::string full_path; 559 std::string filename; 560 std::string dir_name; 561 562 if (dir.find("archive") != std::string::npos) { 563 return Status::OK(); 564 } 565 if (dir.compare(0, mDirectory.length(), mDirectory) == 0) { 566 spdk_fs_iter iter; 567 struct spdk_file *file; 568 dir_name = sanitize_path(dir, mDirectory); 569 570 iter = spdk_fs_iter_first(g_fs); 571 while (iter != NULL) { 572 file = spdk_fs_iter_get_file(iter); 573 full_path = spdk_file_get_name(file); 574 if (strncmp(dir_name.c_str(), full_path.c_str(), dir_name.length())) { 575 iter = spdk_fs_iter_next(iter); 576 continue; 577 } 578 pos = full_path.find("/", dir_name.length() + 1); 579 580 if (pos != std::string::npos) { 581 filename = full_path.substr(dir_name.length() + 1, pos - dir_name.length() - 1); 582 } else { 583 filename = full_path.substr(dir_name.length() + 1); 584 } 585 dir_and_file_set.insert(filename); 586 iter = spdk_fs_iter_next(iter); 587 } 588 589 for (auto &s : dir_and_file_set) { 590 result->push_back(s); 591 } 592 593 result->push_back("."); 594 result->push_back(".."); 595 596 return Status::OK(); 597 } 598 return EnvWrapper::GetChildren(dir, result); 599 } 600 }; 601 602 /* The thread local constructor doesn't work for the main thread, since 603 * the filesystem hasn't been loaded yet. So we break out this 604 * SpdkInitializeThread function, so that the main thread can explicitly 605 * call it after the filesystem has been loaded. 606 */ 607 void SpdkInitializeThread(void) 608 { 609 if (g_fs != NULL) { 610 if (g_sync_args.channel) { 611 spdk_fs_free_thread_ctx(g_sync_args.channel); 612 } 613 g_sync_args.channel = spdk_fs_alloc_thread_ctx(g_fs); 614 } 615 } 616 617 static void 618 fs_load_cb(__attribute__((unused)) void *ctx, 619 struct spdk_filesystem *fs, int fserrno) 620 { 621 if (fserrno == 0) { 622 g_fs = fs; 623 } 624 g_spdk_ready = true; 625 } 626 627 static void 628 base_bdev_event_cb(enum spdk_bdev_event_type type, __attribute__((unused)) struct spdk_bdev *bdev, 629 __attribute__((unused)) void *event_ctx) 630 { 631 printf("Unsupported bdev event: type %d\n", type); 632 } 633 634 static void 635 rocksdb_run(__attribute__((unused)) void *arg1) 636 { 637 int rc; 638 639 rc = spdk_bdev_create_bs_dev_ext(g_bdev_name.c_str(), base_bdev_event_cb, NULL, 640 &g_bs_dev); 641 if (rc != 0) { 642 printf("Could not create blob bdev\n"); 643 spdk_app_stop(0); 644 exit(1); 645 } 646 647 g_lcore = spdk_env_get_first_core(); 648 649 printf("using bdev %s\n", g_bdev_name.c_str()); 650 spdk_fs_load(g_bs_dev, __send_request, fs_load_cb, NULL); 651 } 652 653 static void 654 fs_unload_cb(__attribute__((unused)) void *ctx, 655 __attribute__((unused)) int fserrno) 656 { 657 assert(fserrno == 0); 658 659 spdk_app_stop(0); 660 } 661 662 static void 663 rocksdb_shutdown(void) 664 { 665 if (g_fs != NULL) { 666 spdk_fs_unload(g_fs, fs_unload_cb, NULL); 667 } else { 668 fs_unload_cb(NULL, 0); 669 } 670 } 671 672 static void * 673 initialize_spdk(void *arg) 674 { 675 struct spdk_app_opts *opts = (struct spdk_app_opts *)arg; 676 int rc; 677 678 rc = spdk_app_start(opts, rocksdb_run, NULL); 679 /* 680 * TODO: Revisit for case of internal failure of 681 * spdk_app_start(), itself. At this time, it's known 682 * the only application's use of spdk_app_stop() passes 683 * a zero; i.e. no fail (non-zero) cases so here we 684 * assume there was an internal failure and flag it 685 * so we can throw an exception. 686 */ 687 if (rc) { 688 g_spdk_start_failure = true; 689 } else { 690 spdk_app_fini(); 691 delete opts; 692 } 693 pthread_exit(NULL); 694 695 } 696 697 SpdkEnv::SpdkEnv(Env *base_env, const std::string &dir, const std::string &conf, 698 const std::string &bdev, uint64_t cache_size_in_mb) 699 : EnvWrapper(base_env), mDirectory(dir), mConfig(conf), mBdev(bdev) 700 { 701 struct spdk_app_opts *opts = new struct spdk_app_opts; 702 703 spdk_app_opts_init(opts, sizeof(*opts)); 704 opts->name = "rocksdb"; 705 opts->json_config_file = mConfig.c_str(); 706 opts->shutdown_cb = rocksdb_shutdown; 707 opts->tpoint_group_mask = "0x80"; 708 709 spdk_fs_set_cache_size(cache_size_in_mb); 710 g_bdev_name = mBdev; 711 712 pthread_create(&mSpdkTid, NULL, &initialize_spdk, opts); 713 while (!g_spdk_ready && !g_spdk_start_failure) 714 ; 715 if (g_spdk_start_failure) { 716 delete opts; 717 throw SpdkAppStartException("spdk_app_start() unable to start rocksdb_run()"); 718 } 719 720 SpdkInitializeThread(); 721 } 722 723 SpdkEnv::~SpdkEnv() 724 { 725 /* This is a workaround for rocksdb test, we close the files if the rocksdb not 726 * do the work before the test quit. 727 */ 728 if (g_fs != NULL) { 729 spdk_fs_iter iter; 730 struct spdk_file *file; 731 732 if (!g_sync_args.channel) { 733 SpdkInitializeThread(); 734 } 735 736 iter = spdk_fs_iter_first(g_fs); 737 while (iter != NULL) { 738 file = spdk_fs_iter_get_file(iter); 739 spdk_file_close(file, g_sync_args.channel); 740 iter = spdk_fs_iter_next(iter); 741 } 742 } 743 744 spdk_app_start_shutdown(); 745 pthread_join(mSpdkTid, NULL); 746 } 747 748 Env *NewSpdkEnv(Env *base_env, const std::string &dir, const std::string &conf, 749 const std::string &bdev, uint64_t cache_size_in_mb) 750 { 751 try { 752 SpdkEnv *spdk_env = new SpdkEnv(base_env, dir, conf, bdev, cache_size_in_mb); 753 if (g_fs != NULL) { 754 return spdk_env; 755 } else { 756 delete spdk_env; 757 return NULL; 758 } 759 } catch (SpdkAppStartException &e) { 760 SPDK_ERRLOG("NewSpdkEnv: exception caught: %s", e.what()); 761 return NULL; 762 } catch (...) { 763 SPDK_ERRLOG("NewSpdkEnv: default exception caught"); 764 return NULL; 765 } 766 } 767 768 } // namespace rocksdb 769