1 /* SPDX-License-Identifier: BSD-3-Clause 2 * Copyright (C) 2017 Intel Corporation. 3 * All rights reserved. 4 */ 5 6 #include "rocksdb/env.h" 7 #include <set> 8 #include <iostream> 9 #include <stdexcept> 10 11 extern "C" { 12 #include "spdk/env.h" 13 #include "spdk/event.h" 14 #include "spdk/blob.h" 15 #include "spdk/blobfs.h" 16 #include "spdk/blob_bdev.h" 17 #include "spdk/log.h" 18 #include "spdk/thread.h" 19 #include "spdk/bdev.h" 20 } 21 22 namespace rocksdb 23 { 24 25 struct spdk_filesystem *g_fs = NULL; 26 struct spdk_bs_dev *g_bs_dev; 27 struct spdk_thread *g_app_thread; 28 std::string g_bdev_name; 29 volatile bool g_spdk_ready = false; 30 volatile bool g_spdk_start_failure = false; 31 32 void SpdkInitializeThread(void); 33 34 class SpdkThreadCtx 35 { 36 public: 37 struct spdk_fs_thread_ctx *channel; 38 39 SpdkThreadCtx(void) : channel(NULL) 40 { 41 SpdkInitializeThread(); 42 } 43 44 ~SpdkThreadCtx(void) 45 { 46 if (channel) { 47 spdk_fs_free_thread_ctx(channel); 48 channel = NULL; 49 } 50 } 51 52 private: 53 SpdkThreadCtx(const SpdkThreadCtx &); 54 SpdkThreadCtx &operator=(const SpdkThreadCtx &); 55 }; 56 57 thread_local SpdkThreadCtx g_sync_args; 58 59 static void 60 set_channel() 61 { 62 if (g_fs != NULL && g_sync_args.channel == NULL) { 63 g_sync_args.channel = spdk_fs_alloc_thread_ctx(g_fs); 64 } 65 } 66 67 static void 68 __send_request(fs_request_fn fn, void *arg) 69 { 70 spdk_thread_send_msg(g_app_thread, fn, arg); 71 } 72 73 static std::string 74 sanitize_path(const std::string &input, const std::string &mount_directory) 75 { 76 int index = 0; 77 std::string name; 78 std::string input_tmp; 79 80 input_tmp = input.substr(mount_directory.length(), input.length()); 81 for (const char &c : input_tmp) { 82 if (index == 0) { 83 if (c != '/') { 84 name = name.insert(index, 1, '/'); 85 index++; 86 } 87 name = name.insert(index, 1, c); 88 index++; 89 } else { 90 if (name[index - 1] == '/' && c == '/') { 91 continue; 92 } else { 93 name = name.insert(index, 1, c); 94 index++; 95 } 96 } 97 } 98 99 if (name[name.size() - 1] == '/') { 100 name = name.erase(name.size() - 1, 1); 101 } 102 return name; 103 } 104 105 class SpdkSequentialFile : public SequentialFile 106 { 107 struct spdk_file *mFile; 108 uint64_t mOffset; 109 public: 110 SpdkSequentialFile(struct spdk_file *file) : mFile(file), mOffset(0) {} 111 virtual ~SpdkSequentialFile(); 112 113 virtual Status Read(size_t n, Slice *result, char *scratch) override; 114 virtual Status Skip(uint64_t n) override; 115 virtual Status InvalidateCache(size_t offset, size_t length) override; 116 }; 117 118 SpdkSequentialFile::~SpdkSequentialFile(void) 119 { 120 set_channel(); 121 spdk_file_close(mFile, g_sync_args.channel); 122 } 123 124 Status 125 SpdkSequentialFile::Read(size_t n, Slice *result, char *scratch) 126 { 127 int64_t ret; 128 129 set_channel(); 130 ret = spdk_file_read(mFile, g_sync_args.channel, scratch, mOffset, n); 131 if (ret >= 0) { 132 mOffset += ret; 133 *result = Slice(scratch, ret); 134 return Status::OK(); 135 } else { 136 errno = -ret; 137 return Status::IOError(spdk_file_get_name(mFile), strerror(errno)); 138 } 139 } 140 141 Status 142 SpdkSequentialFile::Skip(uint64_t n) 143 { 144 mOffset += n; 145 return Status::OK(); 146 } 147 148 Status 149 SpdkSequentialFile::InvalidateCache(__attribute__((unused)) size_t offset, 150 __attribute__((unused)) size_t length) 151 { 152 return Status::OK(); 153 } 154 155 class SpdkRandomAccessFile : public RandomAccessFile 156 { 157 struct spdk_file *mFile; 158 public: 159 SpdkRandomAccessFile(struct spdk_file *file) : mFile(file) {} 160 virtual ~SpdkRandomAccessFile(); 161 162 virtual Status Read(uint64_t offset, size_t n, Slice *result, char *scratch) const override; 163 virtual Status InvalidateCache(size_t offset, size_t length) override; 164 }; 165 166 SpdkRandomAccessFile::~SpdkRandomAccessFile(void) 167 { 168 set_channel(); 169 spdk_file_close(mFile, g_sync_args.channel); 170 } 171 172 Status 173 SpdkRandomAccessFile::Read(uint64_t offset, size_t n, Slice *result, char *scratch) const 174 { 175 int64_t rc; 176 177 set_channel(); 178 rc = spdk_file_read(mFile, g_sync_args.channel, scratch, offset, n); 179 if (rc >= 0) { 180 *result = Slice(scratch, rc); 181 return Status::OK(); 182 } else { 183 errno = -rc; 184 return Status::IOError(spdk_file_get_name(mFile), strerror(errno)); 185 } 186 } 187 188 Status 189 SpdkRandomAccessFile::InvalidateCache(__attribute__((unused)) size_t offset, 190 __attribute__((unused)) size_t length) 191 { 192 return Status::OK(); 193 } 194 195 class SpdkWritableFile : public WritableFile 196 { 197 struct spdk_file *mFile; 198 uint64_t mSize; 199 200 public: 201 SpdkWritableFile(struct spdk_file *file) : mFile(file), mSize(0) {} 202 ~SpdkWritableFile() 203 { 204 if (mFile != NULL) { 205 Close(); 206 } 207 } 208 209 virtual void SetIOPriority(Env::IOPriority pri) 210 { 211 if (pri == Env::IO_HIGH) { 212 spdk_file_set_priority(mFile, SPDK_FILE_PRIORITY_HIGH); 213 } 214 } 215 216 virtual Status Truncate(uint64_t size) override 217 { 218 int rc; 219 220 set_channel(); 221 rc = spdk_file_truncate(mFile, g_sync_args.channel, size); 222 if (!rc) { 223 mSize = size; 224 return Status::OK(); 225 } else { 226 errno = -rc; 227 return Status::IOError(spdk_file_get_name(mFile), strerror(errno)); 228 } 229 } 230 virtual Status Close() override 231 { 232 set_channel(); 233 spdk_file_close(mFile, g_sync_args.channel); 234 mFile = NULL; 235 return Status::OK(); 236 } 237 using WritableFile::Append; 238 virtual Status Append(const Slice &data) override; 239 virtual Status Flush() override 240 { 241 return Status::OK(); 242 } 243 virtual Status Sync() override 244 { 245 int rc; 246 247 set_channel(); 248 rc = spdk_file_sync(mFile, g_sync_args.channel); 249 if (!rc) { 250 return Status::OK(); 251 } else { 252 errno = -rc; 253 return Status::IOError(spdk_file_get_name(mFile), strerror(errno)); 254 } 255 } 256 virtual Status Fsync() override 257 { 258 int rc; 259 260 set_channel(); 261 rc = spdk_file_sync(mFile, g_sync_args.channel); 262 if (!rc) { 263 return Status::OK(); 264 } else { 265 errno = -rc; 266 return Status::IOError(spdk_file_get_name(mFile), strerror(errno)); 267 } 268 } 269 virtual bool IsSyncThreadSafe() const override 270 { 271 return true; 272 } 273 virtual uint64_t GetFileSize() override 274 { 275 return mSize; 276 } 277 virtual Status InvalidateCache(__attribute__((unused)) size_t offset, 278 __attribute__((unused)) size_t length) override 279 { 280 return Status::OK(); 281 } 282 virtual Status Allocate(uint64_t offset, uint64_t len) override 283 { 284 int rc; 285 286 set_channel(); 287 rc = spdk_file_truncate(mFile, g_sync_args.channel, offset + len); 288 if (!rc) { 289 return Status::OK(); 290 } else { 291 errno = -rc; 292 return Status::IOError(spdk_file_get_name(mFile), strerror(errno)); 293 } 294 } 295 virtual Status RangeSync(__attribute__((unused)) uint64_t offset, 296 __attribute__((unused)) uint64_t nbytes) override 297 { 298 int rc; 299 300 /* 301 * SPDK BlobFS does not have a range sync operation yet, so just sync 302 * the whole file. 303 */ 304 set_channel(); 305 rc = spdk_file_sync(mFile, g_sync_args.channel); 306 if (!rc) { 307 return Status::OK(); 308 } else { 309 errno = -rc; 310 return Status::IOError(spdk_file_get_name(mFile), strerror(errno)); 311 } 312 } 313 virtual size_t GetUniqueId(char *id, size_t max_size) const override 314 { 315 int rc; 316 317 rc = spdk_file_get_id(mFile, id, max_size); 318 if (rc < 0) { 319 return 0; 320 } else { 321 return rc; 322 } 323 } 324 }; 325 326 Status 327 SpdkWritableFile::Append(const Slice &data) 328 { 329 int64_t rc; 330 331 set_channel(); 332 rc = spdk_file_write(mFile, g_sync_args.channel, (void *)data.data(), mSize, data.size()); 333 if (rc >= 0) { 334 mSize += data.size(); 335 return Status::OK(); 336 } else { 337 errno = -rc; 338 return Status::IOError(spdk_file_get_name(mFile), strerror(errno)); 339 } 340 } 341 342 class SpdkDirectory : public Directory 343 { 344 public: 345 SpdkDirectory() {} 346 ~SpdkDirectory() {} 347 Status Fsync() override 348 { 349 return Status::OK(); 350 } 351 }; 352 353 class SpdkAppStartException : public std::runtime_error 354 { 355 public: 356 SpdkAppStartException(std::string mess): std::runtime_error(mess) {} 357 }; 358 359 class SpdkEnv : public EnvWrapper 360 { 361 private: 362 pthread_t mSpdkTid; 363 std::string mDirectory; 364 std::string mConfig; 365 std::string mBdev; 366 367 public: 368 SpdkEnv(Env *base_env, const std::string &dir, const std::string &conf, 369 const std::string &bdev, uint64_t cache_size_in_mb); 370 371 virtual ~SpdkEnv(); 372 373 virtual Status NewSequentialFile(const std::string &fname, 374 std::unique_ptr<SequentialFile> *result, 375 const EnvOptions &options) override 376 { 377 if (fname.compare(0, mDirectory.length(), mDirectory) == 0) { 378 struct spdk_file *file; 379 int rc; 380 381 std::string name = sanitize_path(fname, mDirectory); 382 set_channel(); 383 rc = spdk_fs_open_file(g_fs, g_sync_args.channel, 384 name.c_str(), 0, &file); 385 if (rc == 0) { 386 result->reset(new SpdkSequentialFile(file)); 387 return Status::OK(); 388 } else { 389 /* Myrocks engine uses errno(ENOENT) as one 390 * special condition, for the purpose to 391 * support MySQL, set the errno to right value. 392 */ 393 errno = -rc; 394 return Status::IOError(name, strerror(errno)); 395 } 396 } else { 397 return EnvWrapper::NewSequentialFile(fname, result, options); 398 } 399 } 400 401 virtual Status NewRandomAccessFile(const std::string &fname, 402 std::unique_ptr<RandomAccessFile> *result, 403 const EnvOptions &options) override 404 { 405 if (fname.compare(0, mDirectory.length(), mDirectory) == 0) { 406 std::string name = sanitize_path(fname, mDirectory); 407 struct spdk_file *file; 408 int rc; 409 410 set_channel(); 411 rc = spdk_fs_open_file(g_fs, g_sync_args.channel, 412 name.c_str(), 0, &file); 413 if (rc == 0) { 414 result->reset(new SpdkRandomAccessFile(file)); 415 return Status::OK(); 416 } else { 417 errno = -rc; 418 return Status::IOError(name, strerror(errno)); 419 } 420 } else { 421 return EnvWrapper::NewRandomAccessFile(fname, result, options); 422 } 423 } 424 425 virtual Status NewWritableFile(const std::string &fname, 426 std::unique_ptr<WritableFile> *result, 427 const EnvOptions &options) override 428 { 429 if (fname.compare(0, mDirectory.length(), mDirectory) == 0) { 430 std::string name = sanitize_path(fname, mDirectory); 431 struct spdk_file *file; 432 int rc; 433 434 set_channel(); 435 rc = spdk_fs_open_file(g_fs, g_sync_args.channel, name.c_str(), 436 SPDK_BLOBFS_OPEN_CREATE, &file); 437 if (rc == 0) { 438 result->reset(new SpdkWritableFile(file)); 439 return Status::OK(); 440 } else { 441 errno = -rc; 442 return Status::IOError(name, strerror(errno)); 443 } 444 } else { 445 return EnvWrapper::NewWritableFile(fname, result, options); 446 } 447 } 448 449 virtual Status ReuseWritableFile(const std::string &fname, 450 const std::string &old_fname, 451 std::unique_ptr<WritableFile> *result, 452 const EnvOptions &options) override 453 { 454 return EnvWrapper::ReuseWritableFile(fname, old_fname, result, options); 455 } 456 457 virtual Status NewDirectory(__attribute__((unused)) const std::string &name, 458 std::unique_ptr<Directory> *result) override 459 { 460 result->reset(new SpdkDirectory()); 461 return Status::OK(); 462 } 463 virtual Status FileExists(const std::string &fname) override 464 { 465 struct spdk_file_stat stat; 466 int rc; 467 std::string name = sanitize_path(fname, mDirectory); 468 469 set_channel(); 470 rc = spdk_fs_file_stat(g_fs, g_sync_args.channel, name.c_str(), &stat); 471 if (rc == 0) { 472 return Status::OK(); 473 } 474 return EnvWrapper::FileExists(fname); 475 } 476 virtual Status RenameFile(const std::string &src, const std::string &t) override 477 { 478 int rc; 479 std::string src_name = sanitize_path(src, mDirectory); 480 std::string target_name = sanitize_path(t, mDirectory); 481 482 set_channel(); 483 rc = spdk_fs_rename_file(g_fs, g_sync_args.channel, 484 src_name.c_str(), target_name.c_str()); 485 if (rc == -ENOENT) { 486 return EnvWrapper::RenameFile(src, t); 487 } 488 return Status::OK(); 489 } 490 virtual Status LinkFile(__attribute__((unused)) const std::string &src, 491 __attribute__((unused)) const std::string &t) override 492 { 493 return Status::NotSupported("SpdkEnv does not support LinkFile"); 494 } 495 virtual Status GetFileSize(const std::string &fname, uint64_t *size) override 496 { 497 struct spdk_file_stat stat; 498 int rc; 499 std::string name = sanitize_path(fname, mDirectory); 500 501 set_channel(); 502 rc = spdk_fs_file_stat(g_fs, g_sync_args.channel, name.c_str(), &stat); 503 if (rc == -ENOENT) { 504 return EnvWrapper::GetFileSize(fname, size); 505 } 506 *size = stat.size; 507 return Status::OK(); 508 } 509 virtual Status DeleteFile(const std::string &fname) override 510 { 511 int rc; 512 std::string name = sanitize_path(fname, mDirectory); 513 514 set_channel(); 515 rc = spdk_fs_delete_file(g_fs, g_sync_args.channel, name.c_str()); 516 if (rc == -ENOENT) { 517 return EnvWrapper::DeleteFile(fname); 518 } 519 return Status::OK(); 520 } 521 virtual Status LockFile(const std::string &fname, FileLock **lock) override 522 { 523 std::string name = sanitize_path(fname, mDirectory); 524 int64_t rc; 525 526 set_channel(); 527 rc = spdk_fs_open_file(g_fs, g_sync_args.channel, name.c_str(), 528 SPDK_BLOBFS_OPEN_CREATE, (struct spdk_file **)lock); 529 if (!rc) { 530 return Status::OK(); 531 } else { 532 errno = -rc; 533 return Status::IOError(name, strerror(errno)); 534 } 535 } 536 virtual Status UnlockFile(FileLock *lock) override 537 { 538 set_channel(); 539 spdk_file_close((struct spdk_file *)lock, g_sync_args.channel); 540 return Status::OK(); 541 } 542 virtual Status GetChildren(const std::string &dir, 543 std::vector<std::string> *result) override 544 { 545 std::string::size_type pos; 546 std::set<std::string> dir_and_file_set; 547 std::string full_path; 548 std::string filename; 549 std::string dir_name; 550 551 if (dir.find("archive") != std::string::npos) { 552 return Status::OK(); 553 } 554 if (dir.compare(0, mDirectory.length(), mDirectory) == 0) { 555 spdk_fs_iter iter; 556 struct spdk_file *file; 557 dir_name = sanitize_path(dir, mDirectory); 558 559 iter = spdk_fs_iter_first(g_fs); 560 while (iter != NULL) { 561 file = spdk_fs_iter_get_file(iter); 562 full_path = spdk_file_get_name(file); 563 if (strncmp(dir_name.c_str(), full_path.c_str(), dir_name.length())) { 564 iter = spdk_fs_iter_next(iter); 565 continue; 566 } 567 pos = full_path.find("/", dir_name.length() + 1); 568 569 if (pos != std::string::npos) { 570 filename = full_path.substr(dir_name.length() + 1, pos - dir_name.length() - 1); 571 } else { 572 filename = full_path.substr(dir_name.length() + 1); 573 } 574 dir_and_file_set.insert(filename); 575 iter = spdk_fs_iter_next(iter); 576 } 577 578 for (auto &s : dir_and_file_set) { 579 result->push_back(s); 580 } 581 582 result->push_back("."); 583 result->push_back(".."); 584 585 return Status::OK(); 586 } 587 return EnvWrapper::GetChildren(dir, result); 588 } 589 }; 590 591 /* The thread local constructor doesn't work for the main thread, since 592 * the filesystem hasn't been loaded yet. So we break out this 593 * SpdkInitializeThread function, so that the main thread can explicitly 594 * call it after the filesystem has been loaded. 595 */ 596 void SpdkInitializeThread(void) 597 { 598 if (g_fs != NULL) { 599 if (g_sync_args.channel) { 600 spdk_fs_free_thread_ctx(g_sync_args.channel); 601 } 602 g_sync_args.channel = spdk_fs_alloc_thread_ctx(g_fs); 603 } 604 } 605 606 static void 607 fs_load_cb(__attribute__((unused)) void *ctx, 608 struct spdk_filesystem *fs, int fserrno) 609 { 610 if (fserrno == 0) { 611 g_fs = fs; 612 } 613 g_spdk_ready = true; 614 } 615 616 static void 617 base_bdev_event_cb(enum spdk_bdev_event_type type, __attribute__((unused)) struct spdk_bdev *bdev, 618 __attribute__((unused)) void *event_ctx) 619 { 620 printf("Unsupported bdev event: type %d\n", type); 621 } 622 623 static void 624 rocksdb_run(__attribute__((unused)) void *arg1) 625 { 626 int rc; 627 628 rc = spdk_bdev_create_bs_dev_ext(g_bdev_name.c_str(), base_bdev_event_cb, NULL, 629 &g_bs_dev); 630 if (rc != 0) { 631 printf("Could not create blob bdev\n"); 632 spdk_app_stop(0); 633 exit(1); 634 } 635 636 g_app_thread = spdk_get_thread(); 637 638 printf("using bdev %s\n", g_bdev_name.c_str()); 639 spdk_fs_load(g_bs_dev, __send_request, fs_load_cb, NULL); 640 } 641 642 static void 643 fs_unload_cb(__attribute__((unused)) void *ctx, 644 __attribute__((unused)) int fserrno) 645 { 646 assert(fserrno == 0); 647 648 spdk_app_stop(0); 649 } 650 651 static void 652 rocksdb_shutdown(void) 653 { 654 if (g_fs != NULL) { 655 spdk_fs_unload(g_fs, fs_unload_cb, NULL); 656 } else { 657 fs_unload_cb(NULL, 0); 658 } 659 } 660 661 static void * 662 initialize_spdk(void *arg) 663 { 664 struct spdk_app_opts *opts = (struct spdk_app_opts *)arg; 665 int rc; 666 667 rc = spdk_app_start(opts, rocksdb_run, NULL); 668 /* 669 * TODO: Revisit for case of internal failure of 670 * spdk_app_start(), itself. At this time, it's known 671 * the only application's use of spdk_app_stop() passes 672 * a zero; i.e. no fail (non-zero) cases so here we 673 * assume there was an internal failure and flag it 674 * so we can throw an exception. 675 */ 676 if (rc) { 677 g_spdk_start_failure = true; 678 } else { 679 spdk_app_fini(); 680 delete opts; 681 } 682 pthread_exit(NULL); 683 684 } 685 686 SpdkEnv::SpdkEnv(Env *base_env, const std::string &dir, const std::string &conf, 687 const std::string &bdev, uint64_t cache_size_in_mb) 688 : EnvWrapper(base_env), mDirectory(dir), mConfig(conf), mBdev(bdev) 689 { 690 struct spdk_app_opts *opts = new struct spdk_app_opts; 691 692 spdk_app_opts_init(opts, sizeof(*opts)); 693 opts->name = "rocksdb"; 694 opts->json_config_file = mConfig.c_str(); 695 opts->shutdown_cb = rocksdb_shutdown; 696 opts->tpoint_group_mask = "0x80"; 697 698 spdk_fs_set_cache_size(cache_size_in_mb); 699 g_bdev_name = mBdev; 700 701 pthread_create(&mSpdkTid, NULL, &initialize_spdk, opts); 702 while (!g_spdk_ready && !g_spdk_start_failure) 703 ; 704 if (g_spdk_start_failure) { 705 delete opts; 706 throw SpdkAppStartException("spdk_app_start() unable to start rocksdb_run()"); 707 } 708 709 SpdkInitializeThread(); 710 } 711 712 SpdkEnv::~SpdkEnv() 713 { 714 /* This is a workaround for rocksdb test, we close the files if the rocksdb not 715 * do the work before the test quit. 716 */ 717 if (g_fs != NULL) { 718 spdk_fs_iter iter; 719 struct spdk_file *file; 720 721 if (!g_sync_args.channel) { 722 SpdkInitializeThread(); 723 } 724 725 iter = spdk_fs_iter_first(g_fs); 726 while (iter != NULL) { 727 file = spdk_fs_iter_get_file(iter); 728 spdk_file_close(file, g_sync_args.channel); 729 iter = spdk_fs_iter_next(iter); 730 } 731 } 732 733 spdk_app_start_shutdown(); 734 pthread_join(mSpdkTid, NULL); 735 } 736 737 Env *NewSpdkEnv(Env *base_env, const std::string &dir, const std::string &conf, 738 const std::string &bdev, uint64_t cache_size_in_mb) 739 { 740 try { 741 SpdkEnv *spdk_env = new SpdkEnv(base_env, dir, conf, bdev, cache_size_in_mb); 742 if (g_fs != NULL) { 743 return spdk_env; 744 } else { 745 delete spdk_env; 746 return NULL; 747 } 748 } catch (SpdkAppStartException &e) { 749 SPDK_ERRLOG("NewSpdkEnv: exception caught: %s", e.what()); 750 return NULL; 751 } catch (...) { 752 SPDK_ERRLOG("NewSpdkEnv: default exception caught"); 753 return NULL; 754 } 755 } 756 757 } // namespace rocksdb 758