1 /* SPDX-License-Identifier: BSD-3-Clause 2 * Copyright (C) 2017 Intel Corporation. 3 * All rights reserved. 4 */ 5 6 #include "rocksdb/env.h" 7 #include <set> 8 #include <iostream> 9 #include <stdexcept> 10 11 extern "C" { 12 #include "spdk/env.h" 13 #include "spdk/event.h" 14 #include "spdk/blob.h" 15 #include "spdk/blobfs.h" 16 #include "spdk/blob_bdev.h" 17 #include "spdk/log.h" 18 #include "spdk/thread.h" 19 #include "spdk/bdev.h" 20 } 21 22 namespace rocksdb 23 { 24 25 struct spdk_filesystem *g_fs = NULL; 26 struct spdk_bs_dev *g_bs_dev; 27 uint32_t g_lcore = 0; 28 std::string g_bdev_name; 29 volatile bool g_spdk_ready = false; 30 volatile bool g_spdk_start_failure = false; 31 32 void SpdkInitializeThread(void); 33 34 class SpdkThreadCtx 35 { 36 public: 37 struct spdk_fs_thread_ctx *channel; 38 39 SpdkThreadCtx(void) : channel(NULL) 40 { 41 SpdkInitializeThread(); 42 } 43 44 ~SpdkThreadCtx(void) 45 { 46 if (channel) { 47 spdk_fs_free_thread_ctx(channel); 48 channel = NULL; 49 } 50 } 51 52 private: 53 SpdkThreadCtx(const SpdkThreadCtx &); 54 SpdkThreadCtx &operator=(const SpdkThreadCtx &); 55 }; 56 57 thread_local SpdkThreadCtx g_sync_args; 58 59 static void 60 set_channel() 61 { 62 if (g_fs != NULL && g_sync_args.channel == NULL) { 63 g_sync_args.channel = spdk_fs_alloc_thread_ctx(g_fs); 64 } 65 } 66 67 static void 68 __call_fn(void *arg1, void *arg2) 69 { 70 fs_request_fn fn; 71 72 fn = (fs_request_fn)arg1; 73 fn(arg2); 74 } 75 76 static void 77 __send_request(fs_request_fn fn, void *arg) 78 { 79 struct spdk_event *event; 80 81 event = spdk_event_allocate(g_lcore, __call_fn, (void *)fn, arg); 82 spdk_event_call(event); 83 } 84 85 static std::string 86 sanitize_path(const std::string &input, const std::string &mount_directory) 87 { 88 int index = 0; 89 std::string name; 90 std::string input_tmp; 91 92 input_tmp = input.substr(mount_directory.length(), input.length()); 93 for (const char &c : input_tmp) { 94 if (index == 0) { 95 if (c != '/') { 96 name = name.insert(index, 1, '/'); 97 index++; 98 } 99 name = name.insert(index, 1, c); 100 index++; 101 } else { 102 if (name[index - 1] == '/' && c == '/') { 103 continue; 104 } else { 105 name = name.insert(index, 1, c); 106 index++; 107 } 108 } 109 } 110 111 if (name[name.size() - 1] == '/') { 112 name = name.erase(name.size() - 1, 1); 113 } 114 return name; 115 } 116 117 class SpdkSequentialFile : public SequentialFile 118 { 119 struct spdk_file *mFile; 120 uint64_t mOffset; 121 public: 122 SpdkSequentialFile(struct spdk_file *file) : mFile(file), mOffset(0) {} 123 virtual ~SpdkSequentialFile(); 124 125 virtual Status Read(size_t n, Slice *result, char *scratch) override; 126 virtual Status Skip(uint64_t n) override; 127 virtual Status InvalidateCache(size_t offset, size_t length) override; 128 }; 129 130 SpdkSequentialFile::~SpdkSequentialFile(void) 131 { 132 set_channel(); 133 spdk_file_close(mFile, g_sync_args.channel); 134 } 135 136 Status 137 SpdkSequentialFile::Read(size_t n, Slice *result, char *scratch) 138 { 139 int64_t ret; 140 141 set_channel(); 142 ret = spdk_file_read(mFile, g_sync_args.channel, scratch, mOffset, n); 143 if (ret >= 0) { 144 mOffset += ret; 145 *result = Slice(scratch, ret); 146 return Status::OK(); 147 } else { 148 errno = -ret; 149 return Status::IOError(spdk_file_get_name(mFile), strerror(errno)); 150 } 151 } 152 153 Status 154 SpdkSequentialFile::Skip(uint64_t n) 155 { 156 mOffset += n; 157 return Status::OK(); 158 } 159 160 Status 161 SpdkSequentialFile::InvalidateCache(__attribute__((unused)) size_t offset, 162 __attribute__((unused)) size_t length) 163 { 164 return Status::OK(); 165 } 166 167 class SpdkRandomAccessFile : public RandomAccessFile 168 { 169 struct spdk_file *mFile; 170 public: 171 SpdkRandomAccessFile(struct spdk_file *file) : mFile(file) {} 172 virtual ~SpdkRandomAccessFile(); 173 174 virtual Status Read(uint64_t offset, size_t n, Slice *result, char *scratch) const override; 175 virtual Status InvalidateCache(size_t offset, size_t length) override; 176 }; 177 178 SpdkRandomAccessFile::~SpdkRandomAccessFile(void) 179 { 180 set_channel(); 181 spdk_file_close(mFile, g_sync_args.channel); 182 } 183 184 Status 185 SpdkRandomAccessFile::Read(uint64_t offset, size_t n, Slice *result, char *scratch) const 186 { 187 int64_t rc; 188 189 set_channel(); 190 rc = spdk_file_read(mFile, g_sync_args.channel, scratch, offset, n); 191 if (rc >= 0) { 192 *result = Slice(scratch, rc); 193 return Status::OK(); 194 } else { 195 errno = -rc; 196 return Status::IOError(spdk_file_get_name(mFile), strerror(errno)); 197 } 198 } 199 200 Status 201 SpdkRandomAccessFile::InvalidateCache(__attribute__((unused)) size_t offset, 202 __attribute__((unused)) size_t length) 203 { 204 return Status::OK(); 205 } 206 207 class SpdkWritableFile : public WritableFile 208 { 209 struct spdk_file *mFile; 210 uint64_t mSize; 211 212 public: 213 SpdkWritableFile(struct spdk_file *file) : mFile(file), mSize(0) {} 214 ~SpdkWritableFile() 215 { 216 if (mFile != NULL) { 217 Close(); 218 } 219 } 220 221 virtual void SetIOPriority(Env::IOPriority pri) 222 { 223 if (pri == Env::IO_HIGH) { 224 spdk_file_set_priority(mFile, SPDK_FILE_PRIORITY_HIGH); 225 } 226 } 227 228 virtual Status Truncate(uint64_t size) override 229 { 230 int rc; 231 232 set_channel(); 233 rc = spdk_file_truncate(mFile, g_sync_args.channel, size); 234 if (!rc) { 235 mSize = size; 236 return Status::OK(); 237 } else { 238 errno = -rc; 239 return Status::IOError(spdk_file_get_name(mFile), strerror(errno)); 240 } 241 } 242 virtual Status Close() override 243 { 244 set_channel(); 245 spdk_file_close(mFile, g_sync_args.channel); 246 mFile = NULL; 247 return Status::OK(); 248 } 249 using WritableFile::Append; 250 virtual Status Append(const Slice &data) override; 251 virtual Status Flush() override 252 { 253 return Status::OK(); 254 } 255 virtual Status Sync() override 256 { 257 int rc; 258 259 set_channel(); 260 rc = spdk_file_sync(mFile, g_sync_args.channel); 261 if (!rc) { 262 return Status::OK(); 263 } else { 264 errno = -rc; 265 return Status::IOError(spdk_file_get_name(mFile), strerror(errno)); 266 } 267 } 268 virtual Status Fsync() override 269 { 270 int rc; 271 272 set_channel(); 273 rc = spdk_file_sync(mFile, g_sync_args.channel); 274 if (!rc) { 275 return Status::OK(); 276 } else { 277 errno = -rc; 278 return Status::IOError(spdk_file_get_name(mFile), strerror(errno)); 279 } 280 } 281 virtual bool IsSyncThreadSafe() const override 282 { 283 return true; 284 } 285 virtual uint64_t GetFileSize() override 286 { 287 return mSize; 288 } 289 virtual Status InvalidateCache(__attribute__((unused)) size_t offset, 290 __attribute__((unused)) size_t length) override 291 { 292 return Status::OK(); 293 } 294 virtual Status Allocate(uint64_t offset, uint64_t len) override 295 { 296 int rc; 297 298 set_channel(); 299 rc = spdk_file_truncate(mFile, g_sync_args.channel, offset + len); 300 if (!rc) { 301 return Status::OK(); 302 } else { 303 errno = -rc; 304 return Status::IOError(spdk_file_get_name(mFile), strerror(errno)); 305 } 306 } 307 virtual Status RangeSync(__attribute__((unused)) uint64_t offset, 308 __attribute__((unused)) uint64_t nbytes) override 309 { 310 int rc; 311 312 /* 313 * SPDK BlobFS does not have a range sync operation yet, so just sync 314 * the whole file. 315 */ 316 set_channel(); 317 rc = spdk_file_sync(mFile, g_sync_args.channel); 318 if (!rc) { 319 return Status::OK(); 320 } else { 321 errno = -rc; 322 return Status::IOError(spdk_file_get_name(mFile), strerror(errno)); 323 } 324 } 325 virtual size_t GetUniqueId(char *id, size_t max_size) const override 326 { 327 int rc; 328 329 rc = spdk_file_get_id(mFile, id, max_size); 330 if (rc < 0) { 331 return 0; 332 } else { 333 return rc; 334 } 335 } 336 }; 337 338 Status 339 SpdkWritableFile::Append(const Slice &data) 340 { 341 int64_t rc; 342 343 set_channel(); 344 rc = spdk_file_write(mFile, g_sync_args.channel, (void *)data.data(), mSize, data.size()); 345 if (rc >= 0) { 346 mSize += data.size(); 347 return Status::OK(); 348 } else { 349 errno = -rc; 350 return Status::IOError(spdk_file_get_name(mFile), strerror(errno)); 351 } 352 } 353 354 class SpdkDirectory : public Directory 355 { 356 public: 357 SpdkDirectory() {} 358 ~SpdkDirectory() {} 359 Status Fsync() override 360 { 361 return Status::OK(); 362 } 363 }; 364 365 class SpdkAppStartException : public std::runtime_error 366 { 367 public: 368 SpdkAppStartException(std::string mess): std::runtime_error(mess) {} 369 }; 370 371 class SpdkEnv : public EnvWrapper 372 { 373 private: 374 pthread_t mSpdkTid; 375 std::string mDirectory; 376 std::string mConfig; 377 std::string mBdev; 378 379 public: 380 SpdkEnv(Env *base_env, const std::string &dir, const std::string &conf, 381 const std::string &bdev, uint64_t cache_size_in_mb); 382 383 virtual ~SpdkEnv(); 384 385 virtual Status NewSequentialFile(const std::string &fname, 386 std::unique_ptr<SequentialFile> *result, 387 const EnvOptions &options) override 388 { 389 if (fname.compare(0, mDirectory.length(), mDirectory) == 0) { 390 struct spdk_file *file; 391 int rc; 392 393 std::string name = sanitize_path(fname, mDirectory); 394 set_channel(); 395 rc = spdk_fs_open_file(g_fs, g_sync_args.channel, 396 name.c_str(), 0, &file); 397 if (rc == 0) { 398 result->reset(new SpdkSequentialFile(file)); 399 return Status::OK(); 400 } else { 401 /* Myrocks engine uses errno(ENOENT) as one 402 * special condition, for the purpose to 403 * support MySQL, set the errno to right value. 404 */ 405 errno = -rc; 406 return Status::IOError(name, strerror(errno)); 407 } 408 } else { 409 return EnvWrapper::NewSequentialFile(fname, result, options); 410 } 411 } 412 413 virtual Status NewRandomAccessFile(const std::string &fname, 414 std::unique_ptr<RandomAccessFile> *result, 415 const EnvOptions &options) override 416 { 417 if (fname.compare(0, mDirectory.length(), mDirectory) == 0) { 418 std::string name = sanitize_path(fname, mDirectory); 419 struct spdk_file *file; 420 int rc; 421 422 set_channel(); 423 rc = spdk_fs_open_file(g_fs, g_sync_args.channel, 424 name.c_str(), 0, &file); 425 if (rc == 0) { 426 result->reset(new SpdkRandomAccessFile(file)); 427 return Status::OK(); 428 } else { 429 errno = -rc; 430 return Status::IOError(name, strerror(errno)); 431 } 432 } else { 433 return EnvWrapper::NewRandomAccessFile(fname, result, options); 434 } 435 } 436 437 virtual Status NewWritableFile(const std::string &fname, 438 std::unique_ptr<WritableFile> *result, 439 const EnvOptions &options) override 440 { 441 if (fname.compare(0, mDirectory.length(), mDirectory) == 0) { 442 std::string name = sanitize_path(fname, mDirectory); 443 struct spdk_file *file; 444 int rc; 445 446 set_channel(); 447 rc = spdk_fs_open_file(g_fs, g_sync_args.channel, name.c_str(), 448 SPDK_BLOBFS_OPEN_CREATE, &file); 449 if (rc == 0) { 450 result->reset(new SpdkWritableFile(file)); 451 return Status::OK(); 452 } else { 453 errno = -rc; 454 return Status::IOError(name, strerror(errno)); 455 } 456 } else { 457 return EnvWrapper::NewWritableFile(fname, result, options); 458 } 459 } 460 461 virtual Status ReuseWritableFile(const std::string &fname, 462 const std::string &old_fname, 463 std::unique_ptr<WritableFile> *result, 464 const EnvOptions &options) override 465 { 466 return EnvWrapper::ReuseWritableFile(fname, old_fname, result, options); 467 } 468 469 virtual Status NewDirectory(__attribute__((unused)) const std::string &name, 470 std::unique_ptr<Directory> *result) override 471 { 472 result->reset(new SpdkDirectory()); 473 return Status::OK(); 474 } 475 virtual Status FileExists(const std::string &fname) override 476 { 477 struct spdk_file_stat stat; 478 int rc; 479 std::string name = sanitize_path(fname, mDirectory); 480 481 set_channel(); 482 rc = spdk_fs_file_stat(g_fs, g_sync_args.channel, name.c_str(), &stat); 483 if (rc == 0) { 484 return Status::OK(); 485 } 486 return EnvWrapper::FileExists(fname); 487 } 488 virtual Status RenameFile(const std::string &src, const std::string &t) override 489 { 490 int rc; 491 std::string src_name = sanitize_path(src, mDirectory); 492 std::string target_name = sanitize_path(t, mDirectory); 493 494 set_channel(); 495 rc = spdk_fs_rename_file(g_fs, g_sync_args.channel, 496 src_name.c_str(), target_name.c_str()); 497 if (rc == -ENOENT) { 498 return EnvWrapper::RenameFile(src, t); 499 } 500 return Status::OK(); 501 } 502 virtual Status LinkFile(__attribute__((unused)) const std::string &src, 503 __attribute__((unused)) const std::string &t) override 504 { 505 return Status::NotSupported("SpdkEnv does not support LinkFile"); 506 } 507 virtual Status GetFileSize(const std::string &fname, uint64_t *size) override 508 { 509 struct spdk_file_stat stat; 510 int rc; 511 std::string name = sanitize_path(fname, mDirectory); 512 513 set_channel(); 514 rc = spdk_fs_file_stat(g_fs, g_sync_args.channel, name.c_str(), &stat); 515 if (rc == -ENOENT) { 516 return EnvWrapper::GetFileSize(fname, size); 517 } 518 *size = stat.size; 519 return Status::OK(); 520 } 521 virtual Status DeleteFile(const std::string &fname) override 522 { 523 int rc; 524 std::string name = sanitize_path(fname, mDirectory); 525 526 set_channel(); 527 rc = spdk_fs_delete_file(g_fs, g_sync_args.channel, name.c_str()); 528 if (rc == -ENOENT) { 529 return EnvWrapper::DeleteFile(fname); 530 } 531 return Status::OK(); 532 } 533 virtual Status LockFile(const std::string &fname, FileLock **lock) override 534 { 535 std::string name = sanitize_path(fname, mDirectory); 536 int64_t rc; 537 538 set_channel(); 539 rc = spdk_fs_open_file(g_fs, g_sync_args.channel, name.c_str(), 540 SPDK_BLOBFS_OPEN_CREATE, (struct spdk_file **)lock); 541 if (!rc) { 542 return Status::OK(); 543 } else { 544 errno = -rc; 545 return Status::IOError(name, strerror(errno)); 546 } 547 } 548 virtual Status UnlockFile(FileLock *lock) override 549 { 550 set_channel(); 551 spdk_file_close((struct spdk_file *)lock, g_sync_args.channel); 552 return Status::OK(); 553 } 554 virtual Status GetChildren(const std::string &dir, 555 std::vector<std::string> *result) override 556 { 557 std::string::size_type pos; 558 std::set<std::string> dir_and_file_set; 559 std::string full_path; 560 std::string filename; 561 std::string dir_name; 562 563 if (dir.find("archive") != std::string::npos) { 564 return Status::OK(); 565 } 566 if (dir.compare(0, mDirectory.length(), mDirectory) == 0) { 567 spdk_fs_iter iter; 568 struct spdk_file *file; 569 dir_name = sanitize_path(dir, mDirectory); 570 571 iter = spdk_fs_iter_first(g_fs); 572 while (iter != NULL) { 573 file = spdk_fs_iter_get_file(iter); 574 full_path = spdk_file_get_name(file); 575 if (strncmp(dir_name.c_str(), full_path.c_str(), dir_name.length())) { 576 iter = spdk_fs_iter_next(iter); 577 continue; 578 } 579 pos = full_path.find("/", dir_name.length() + 1); 580 581 if (pos != std::string::npos) { 582 filename = full_path.substr(dir_name.length() + 1, pos - dir_name.length() - 1); 583 } else { 584 filename = full_path.substr(dir_name.length() + 1); 585 } 586 dir_and_file_set.insert(filename); 587 iter = spdk_fs_iter_next(iter); 588 } 589 590 for (auto &s : dir_and_file_set) { 591 result->push_back(s); 592 } 593 594 result->push_back("."); 595 result->push_back(".."); 596 597 return Status::OK(); 598 } 599 return EnvWrapper::GetChildren(dir, result); 600 } 601 }; 602 603 /* The thread local constructor doesn't work for the main thread, since 604 * the filesystem hasn't been loaded yet. So we break out this 605 * SpdkInitializeThread function, so that the main thread can explicitly 606 * call it after the filesystem has been loaded. 607 */ 608 void SpdkInitializeThread(void) 609 { 610 if (g_fs != NULL) { 611 if (g_sync_args.channel) { 612 spdk_fs_free_thread_ctx(g_sync_args.channel); 613 } 614 g_sync_args.channel = spdk_fs_alloc_thread_ctx(g_fs); 615 } 616 } 617 618 static void 619 fs_load_cb(__attribute__((unused)) void *ctx, 620 struct spdk_filesystem *fs, int fserrno) 621 { 622 if (fserrno == 0) { 623 g_fs = fs; 624 } 625 g_spdk_ready = true; 626 } 627 628 static void 629 base_bdev_event_cb(enum spdk_bdev_event_type type, __attribute__((unused)) struct spdk_bdev *bdev, 630 __attribute__((unused)) void *event_ctx) 631 { 632 printf("Unsupported bdev event: type %d\n", type); 633 } 634 635 static void 636 rocksdb_run(__attribute__((unused)) void *arg1) 637 { 638 int rc; 639 640 rc = spdk_bdev_create_bs_dev_ext(g_bdev_name.c_str(), base_bdev_event_cb, NULL, 641 &g_bs_dev); 642 if (rc != 0) { 643 printf("Could not create blob bdev\n"); 644 spdk_app_stop(0); 645 exit(1); 646 } 647 648 g_lcore = spdk_env_get_first_core(); 649 650 printf("using bdev %s\n", g_bdev_name.c_str()); 651 spdk_fs_load(g_bs_dev, __send_request, fs_load_cb, NULL); 652 } 653 654 static void 655 fs_unload_cb(__attribute__((unused)) void *ctx, 656 __attribute__((unused)) int fserrno) 657 { 658 assert(fserrno == 0); 659 660 spdk_app_stop(0); 661 } 662 663 static void 664 rocksdb_shutdown(void) 665 { 666 if (g_fs != NULL) { 667 spdk_fs_unload(g_fs, fs_unload_cb, NULL); 668 } else { 669 fs_unload_cb(NULL, 0); 670 } 671 } 672 673 static void * 674 initialize_spdk(void *arg) 675 { 676 struct spdk_app_opts *opts = (struct spdk_app_opts *)arg; 677 int rc; 678 679 rc = spdk_app_start(opts, rocksdb_run, NULL); 680 /* 681 * TODO: Revisit for case of internal failure of 682 * spdk_app_start(), itself. At this time, it's known 683 * the only application's use of spdk_app_stop() passes 684 * a zero; i.e. no fail (non-zero) cases so here we 685 * assume there was an internal failure and flag it 686 * so we can throw an exception. 687 */ 688 if (rc) { 689 g_spdk_start_failure = true; 690 } else { 691 spdk_app_fini(); 692 delete opts; 693 } 694 pthread_exit(NULL); 695 696 } 697 698 SpdkEnv::SpdkEnv(Env *base_env, const std::string &dir, const std::string &conf, 699 const std::string &bdev, uint64_t cache_size_in_mb) 700 : EnvWrapper(base_env), mDirectory(dir), mConfig(conf), mBdev(bdev) 701 { 702 struct spdk_app_opts *opts = new struct spdk_app_opts; 703 704 spdk_app_opts_init(opts, sizeof(*opts)); 705 opts->name = "rocksdb"; 706 opts->json_config_file = mConfig.c_str(); 707 opts->shutdown_cb = rocksdb_shutdown; 708 opts->tpoint_group_mask = "0x80"; 709 710 spdk_fs_set_cache_size(cache_size_in_mb); 711 g_bdev_name = mBdev; 712 713 pthread_create(&mSpdkTid, NULL, &initialize_spdk, opts); 714 while (!g_spdk_ready && !g_spdk_start_failure) 715 ; 716 if (g_spdk_start_failure) { 717 delete opts; 718 throw SpdkAppStartException("spdk_app_start() unable to start rocksdb_run()"); 719 } 720 721 SpdkInitializeThread(); 722 } 723 724 SpdkEnv::~SpdkEnv() 725 { 726 /* This is a workaround for rocksdb test, we close the files if the rocksdb not 727 * do the work before the test quit. 728 */ 729 if (g_fs != NULL) { 730 spdk_fs_iter iter; 731 struct spdk_file *file; 732 733 if (!g_sync_args.channel) { 734 SpdkInitializeThread(); 735 } 736 737 iter = spdk_fs_iter_first(g_fs); 738 while (iter != NULL) { 739 file = spdk_fs_iter_get_file(iter); 740 spdk_file_close(file, g_sync_args.channel); 741 iter = spdk_fs_iter_next(iter); 742 } 743 } 744 745 spdk_app_start_shutdown(); 746 pthread_join(mSpdkTid, NULL); 747 } 748 749 Env *NewSpdkEnv(Env *base_env, const std::string &dir, const std::string &conf, 750 const std::string &bdev, uint64_t cache_size_in_mb) 751 { 752 try { 753 SpdkEnv *spdk_env = new SpdkEnv(base_env, dir, conf, bdev, cache_size_in_mb); 754 if (g_fs != NULL) { 755 return spdk_env; 756 } else { 757 delete spdk_env; 758 return NULL; 759 } 760 } catch (SpdkAppStartException &e) { 761 SPDK_ERRLOG("NewSpdkEnv: exception caught: %s", e.what()); 762 return NULL; 763 } catch (...) { 764 SPDK_ERRLOG("NewSpdkEnv: default exception caught"); 765 return NULL; 766 } 767 } 768 769 } // namespace rocksdb 770