1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "rocksdb/env.h" 35 #include <set> 36 #include <iostream> 37 #include <stdexcept> 38 39 extern "C" { 40 #include "spdk/env.h" 41 #include "spdk/event.h" 42 #include "spdk/blob.h" 43 #include "spdk/blobfs.h" 44 #include "spdk/blob_bdev.h" 45 #include "spdk/log.h" 46 #include "spdk/thread.h" 47 #include "spdk/bdev.h" 48 } 49 50 namespace rocksdb 51 { 52 53 struct spdk_filesystem *g_fs = NULL; 54 struct spdk_bs_dev *g_bs_dev; 55 uint32_t g_lcore = 0; 56 std::string g_bdev_name; 57 volatile bool g_spdk_ready = false; 58 volatile bool g_spdk_start_failure = false; 59 60 void SpdkInitializeThread(void); 61 62 class SpdkThreadCtx 63 { 64 public: 65 struct spdk_fs_thread_ctx *channel; 66 67 SpdkThreadCtx(void) : channel(NULL) 68 { 69 SpdkInitializeThread(); 70 } 71 72 ~SpdkThreadCtx(void) 73 { 74 if (channel) { 75 spdk_fs_free_thread_ctx(channel); 76 channel = NULL; 77 } 78 } 79 80 private: 81 SpdkThreadCtx(const SpdkThreadCtx &); 82 SpdkThreadCtx &operator=(const SpdkThreadCtx &); 83 }; 84 85 thread_local SpdkThreadCtx g_sync_args; 86 87 static void 88 set_channel() 89 { 90 struct spdk_thread *thread; 91 92 if (g_fs != NULL && g_sync_args.channel == NULL) { 93 thread = spdk_thread_create("spdK_rocksdb", NULL); 94 spdk_set_thread(thread); 95 g_sync_args.channel = spdk_fs_alloc_thread_ctx(g_fs); 96 } 97 } 98 99 static void 100 __call_fn(void *arg1, void *arg2) 101 { 102 fs_request_fn fn; 103 104 fn = (fs_request_fn)arg1; 105 fn(arg2); 106 } 107 108 static void 109 __send_request(fs_request_fn fn, void *arg) 110 { 111 struct spdk_event *event; 112 113 event = spdk_event_allocate(g_lcore, __call_fn, (void *)fn, arg); 114 spdk_event_call(event); 115 } 116 117 static std::string 118 sanitize_path(const std::string &input, const std::string &mount_directory) 119 { 120 int index = 0; 121 std::string name; 122 std::string input_tmp; 123 124 input_tmp = input.substr(mount_directory.length(), input.length()); 125 for (const char &c : input_tmp) { 126 if (index == 0) { 127 if (c != '/') { 128 name = name.insert(index, 1, '/'); 129 index++; 130 } 131 name = name.insert(index, 1, c); 132 index++; 133 } else { 134 if (name[index - 1] == '/' && c == '/') { 135 continue; 136 } else { 137 name = name.insert(index, 1, c); 138 index++; 139 } 140 } 141 } 142 143 if (name[name.size() - 1] == '/') { 144 name = name.erase(name.size() - 1, 1); 145 } 146 return name; 147 } 148 149 class SpdkSequentialFile : public SequentialFile 150 { 151 struct spdk_file *mFile; 152 uint64_t mOffset; 153 public: 154 SpdkSequentialFile(struct spdk_file *file) : mFile(file), mOffset(0) {} 155 virtual ~SpdkSequentialFile(); 156 157 virtual Status Read(size_t n, Slice *result, char *scratch) override; 158 virtual Status Skip(uint64_t n) override; 159 virtual Status InvalidateCache(size_t offset, size_t length) override; 160 }; 161 162 SpdkSequentialFile::~SpdkSequentialFile(void) 163 { 164 set_channel(); 165 spdk_file_close(mFile, g_sync_args.channel); 166 } 167 168 Status 169 SpdkSequentialFile::Read(size_t n, Slice *result, char *scratch) 170 { 171 int64_t ret; 172 173 set_channel(); 174 ret = spdk_file_read(mFile, g_sync_args.channel, scratch, mOffset, n); 175 if (ret >= 0) { 176 mOffset += ret; 177 *result = Slice(scratch, ret); 178 return Status::OK(); 179 } else { 180 errno = -ret; 181 return Status::IOError(spdk_file_get_name(mFile), strerror(errno)); 182 } 183 } 184 185 Status 186 SpdkSequentialFile::Skip(uint64_t n) 187 { 188 mOffset += n; 189 return Status::OK(); 190 } 191 192 Status 193 SpdkSequentialFile::InvalidateCache(__attribute__((unused)) size_t offset, 194 __attribute__((unused)) size_t length) 195 { 196 return Status::OK(); 197 } 198 199 class SpdkRandomAccessFile : public RandomAccessFile 200 { 201 struct spdk_file *mFile; 202 public: 203 SpdkRandomAccessFile(struct spdk_file *file) : mFile(file) {} 204 virtual ~SpdkRandomAccessFile(); 205 206 virtual Status Read(uint64_t offset, size_t n, Slice *result, char *scratch) const override; 207 virtual Status InvalidateCache(size_t offset, size_t length) override; 208 }; 209 210 SpdkRandomAccessFile::~SpdkRandomAccessFile(void) 211 { 212 set_channel(); 213 spdk_file_close(mFile, g_sync_args.channel); 214 } 215 216 Status 217 SpdkRandomAccessFile::Read(uint64_t offset, size_t n, Slice *result, char *scratch) const 218 { 219 int64_t rc; 220 221 set_channel(); 222 rc = spdk_file_read(mFile, g_sync_args.channel, scratch, offset, n); 223 if (rc >= 0) { 224 *result = Slice(scratch, n); 225 return Status::OK(); 226 } else { 227 errno = -rc; 228 return Status::IOError(spdk_file_get_name(mFile), strerror(errno)); 229 } 230 } 231 232 Status 233 SpdkRandomAccessFile::InvalidateCache(__attribute__((unused)) size_t offset, 234 __attribute__((unused)) size_t length) 235 { 236 return Status::OK(); 237 } 238 239 class SpdkWritableFile : public WritableFile 240 { 241 struct spdk_file *mFile; 242 uint64_t mSize; 243 244 public: 245 SpdkWritableFile(struct spdk_file *file) : mFile(file), mSize(0) {} 246 ~SpdkWritableFile() 247 { 248 if (mFile != NULL) { 249 Close(); 250 } 251 } 252 253 virtual void SetIOPriority(Env::IOPriority pri) 254 { 255 if (pri == Env::IO_HIGH) { 256 spdk_file_set_priority(mFile, SPDK_FILE_PRIORITY_HIGH); 257 } 258 } 259 260 virtual Status Truncate(uint64_t size) override 261 { 262 int rc; 263 264 set_channel(); 265 rc = spdk_file_truncate(mFile, g_sync_args.channel, size); 266 if (!rc) { 267 mSize = size; 268 return Status::OK(); 269 } else { 270 errno = -rc; 271 return Status::IOError(spdk_file_get_name(mFile), strerror(errno)); 272 } 273 } 274 virtual Status Close() override 275 { 276 set_channel(); 277 spdk_file_close(mFile, g_sync_args.channel); 278 mFile = NULL; 279 return Status::OK(); 280 } 281 virtual Status Append(const Slice &data) override; 282 virtual Status Flush() override 283 { 284 return Status::OK(); 285 } 286 virtual Status Sync() override 287 { 288 int rc; 289 290 set_channel(); 291 rc = spdk_file_sync(mFile, g_sync_args.channel); 292 if (!rc) { 293 return Status::OK(); 294 } else { 295 errno = -rc; 296 return Status::IOError(spdk_file_get_name(mFile), strerror(errno)); 297 } 298 } 299 virtual Status Fsync() override 300 { 301 int rc; 302 303 set_channel(); 304 rc = spdk_file_sync(mFile, g_sync_args.channel); 305 if (!rc) { 306 return Status::OK(); 307 } else { 308 errno = -rc; 309 return Status::IOError(spdk_file_get_name(mFile), strerror(errno)); 310 } 311 } 312 virtual bool IsSyncThreadSafe() const override 313 { 314 return true; 315 } 316 virtual uint64_t GetFileSize() override 317 { 318 return mSize; 319 } 320 virtual Status InvalidateCache(__attribute__((unused)) size_t offset, 321 __attribute__((unused)) size_t length) override 322 { 323 return Status::OK(); 324 } 325 virtual Status Allocate(uint64_t offset, uint64_t len) override 326 { 327 int rc; 328 329 set_channel(); 330 rc = spdk_file_truncate(mFile, g_sync_args.channel, offset + len); 331 if (!rc) { 332 return Status::OK(); 333 } else { 334 errno = -rc; 335 return Status::IOError(spdk_file_get_name(mFile), strerror(errno)); 336 } 337 } 338 virtual Status RangeSync(__attribute__((unused)) uint64_t offset, 339 __attribute__((unused)) uint64_t nbytes) override 340 { 341 int rc; 342 343 /* 344 * SPDK BlobFS does not have a range sync operation yet, so just sync 345 * the whole file. 346 */ 347 set_channel(); 348 rc = spdk_file_sync(mFile, g_sync_args.channel); 349 if (!rc) { 350 return Status::OK(); 351 } else { 352 errno = -rc; 353 return Status::IOError(spdk_file_get_name(mFile), strerror(errno)); 354 } 355 } 356 virtual size_t GetUniqueId(char *id, size_t max_size) const override 357 { 358 int rc; 359 360 rc = spdk_file_get_id(mFile, id, max_size); 361 if (rc < 0) { 362 return 0; 363 } else { 364 return rc; 365 } 366 } 367 }; 368 369 Status 370 SpdkWritableFile::Append(const Slice &data) 371 { 372 int64_t rc; 373 374 set_channel(); 375 rc = spdk_file_write(mFile, g_sync_args.channel, (void *)data.data(), mSize, data.size()); 376 if (rc >= 0) { 377 mSize += data.size(); 378 return Status::OK(); 379 } else { 380 errno = -rc; 381 return Status::IOError(spdk_file_get_name(mFile), strerror(errno)); 382 } 383 } 384 385 class SpdkDirectory : public Directory 386 { 387 public: 388 SpdkDirectory() {} 389 ~SpdkDirectory() {} 390 Status Fsync() override 391 { 392 return Status::OK(); 393 } 394 }; 395 396 class SpdkAppStartException : public std::runtime_error 397 { 398 public: 399 SpdkAppStartException(std::string mess): std::runtime_error(mess) {} 400 }; 401 402 class SpdkEnv : public EnvWrapper 403 { 404 private: 405 pthread_t mSpdkTid; 406 std::string mDirectory; 407 std::string mConfig; 408 std::string mBdev; 409 410 public: 411 SpdkEnv(Env *base_env, const std::string &dir, const std::string &conf, 412 const std::string &bdev, uint64_t cache_size_in_mb); 413 414 virtual ~SpdkEnv(); 415 416 virtual Status NewSequentialFile(const std::string &fname, 417 std::unique_ptr<SequentialFile> *result, 418 const EnvOptions &options) override 419 { 420 if (fname.compare(0, mDirectory.length(), mDirectory) == 0) { 421 struct spdk_file *file; 422 int rc; 423 424 std::string name = sanitize_path(fname, mDirectory); 425 set_channel(); 426 rc = spdk_fs_open_file(g_fs, g_sync_args.channel, 427 name.c_str(), 0, &file); 428 if (rc == 0) { 429 result->reset(new SpdkSequentialFile(file)); 430 return Status::OK(); 431 } else { 432 /* Myrocks engine uses errno(ENOENT) as one 433 * special condition, for the purpose to 434 * support MySQL, set the errno to right value. 435 */ 436 errno = -rc; 437 return Status::IOError(name, strerror(errno)); 438 } 439 } else { 440 return EnvWrapper::NewSequentialFile(fname, result, options); 441 } 442 } 443 444 virtual Status NewRandomAccessFile(const std::string &fname, 445 std::unique_ptr<RandomAccessFile> *result, 446 const EnvOptions &options) override 447 { 448 if (fname.compare(0, mDirectory.length(), mDirectory) == 0) { 449 std::string name = sanitize_path(fname, mDirectory); 450 struct spdk_file *file; 451 int rc; 452 453 set_channel(); 454 rc = spdk_fs_open_file(g_fs, g_sync_args.channel, 455 name.c_str(), 0, &file); 456 if (rc == 0) { 457 result->reset(new SpdkRandomAccessFile(file)); 458 return Status::OK(); 459 } else { 460 errno = -rc; 461 return Status::IOError(name, strerror(errno)); 462 } 463 } else { 464 return EnvWrapper::NewRandomAccessFile(fname, result, options); 465 } 466 } 467 468 virtual Status NewWritableFile(const std::string &fname, 469 std::unique_ptr<WritableFile> *result, 470 const EnvOptions &options) override 471 { 472 if (fname.compare(0, mDirectory.length(), mDirectory) == 0) { 473 std::string name = sanitize_path(fname, mDirectory); 474 struct spdk_file *file; 475 int rc; 476 477 set_channel(); 478 rc = spdk_fs_open_file(g_fs, g_sync_args.channel, name.c_str(), 479 SPDK_BLOBFS_OPEN_CREATE, &file); 480 if (rc == 0) { 481 result->reset(new SpdkWritableFile(file)); 482 return Status::OK(); 483 } else { 484 errno = -rc; 485 return Status::IOError(name, strerror(errno)); 486 } 487 } else { 488 return EnvWrapper::NewWritableFile(fname, result, options); 489 } 490 } 491 492 virtual Status ReuseWritableFile(const std::string &fname, 493 const std::string &old_fname, 494 std::unique_ptr<WritableFile> *result, 495 const EnvOptions &options) override 496 { 497 return EnvWrapper::ReuseWritableFile(fname, old_fname, result, options); 498 } 499 500 virtual Status NewDirectory(__attribute__((unused)) const std::string &name, 501 std::unique_ptr<Directory> *result) override 502 { 503 result->reset(new SpdkDirectory()); 504 return Status::OK(); 505 } 506 virtual Status FileExists(const std::string &fname) override 507 { 508 struct spdk_file_stat stat; 509 int rc; 510 std::string name = sanitize_path(fname, mDirectory); 511 512 set_channel(); 513 rc = spdk_fs_file_stat(g_fs, g_sync_args.channel, name.c_str(), &stat); 514 if (rc == 0) { 515 return Status::OK(); 516 } 517 return EnvWrapper::FileExists(fname); 518 } 519 virtual Status RenameFile(const std::string &src, const std::string &t) override 520 { 521 int rc; 522 std::string src_name = sanitize_path(src, mDirectory); 523 std::string target_name = sanitize_path(t, mDirectory); 524 525 set_channel(); 526 rc = spdk_fs_rename_file(g_fs, g_sync_args.channel, 527 src_name.c_str(), target_name.c_str()); 528 if (rc == -ENOENT) { 529 return EnvWrapper::RenameFile(src, t); 530 } 531 return Status::OK(); 532 } 533 virtual Status LinkFile(__attribute__((unused)) const std::string &src, 534 __attribute__((unused)) const std::string &t) override 535 { 536 return Status::NotSupported("SpdkEnv does not support LinkFile"); 537 } 538 virtual Status GetFileSize(const std::string &fname, uint64_t *size) override 539 { 540 struct spdk_file_stat stat; 541 int rc; 542 std::string name = sanitize_path(fname, mDirectory); 543 544 set_channel(); 545 rc = spdk_fs_file_stat(g_fs, g_sync_args.channel, name.c_str(), &stat); 546 if (rc == -ENOENT) { 547 return EnvWrapper::GetFileSize(fname, size); 548 } 549 *size = stat.size; 550 return Status::OK(); 551 } 552 virtual Status DeleteFile(const std::string &fname) override 553 { 554 int rc; 555 std::string name = sanitize_path(fname, mDirectory); 556 557 set_channel(); 558 rc = spdk_fs_delete_file(g_fs, g_sync_args.channel, name.c_str()); 559 if (rc == -ENOENT) { 560 return EnvWrapper::DeleteFile(fname); 561 } 562 return Status::OK(); 563 } 564 virtual Status LockFile(const std::string &fname, FileLock **lock) override 565 { 566 std::string name = sanitize_path(fname, mDirectory); 567 int64_t rc; 568 569 set_channel(); 570 rc = spdk_fs_open_file(g_fs, g_sync_args.channel, name.c_str(), 571 SPDK_BLOBFS_OPEN_CREATE, (struct spdk_file **)lock); 572 if (!rc) { 573 return Status::OK(); 574 } else { 575 errno = -rc; 576 return Status::IOError(name, strerror(errno)); 577 } 578 } 579 virtual Status UnlockFile(FileLock *lock) override 580 { 581 set_channel(); 582 spdk_file_close((struct spdk_file *)lock, g_sync_args.channel); 583 return Status::OK(); 584 } 585 virtual Status GetChildren(const std::string &dir, 586 std::vector<std::string> *result) override 587 { 588 std::string::size_type pos; 589 std::set<std::string> dir_and_file_set; 590 std::string full_path; 591 std::string filename; 592 std::string dir_name; 593 594 if (dir.find("archive") != std::string::npos) { 595 return Status::OK(); 596 } 597 if (dir.compare(0, mDirectory.length(), mDirectory) == 0) { 598 spdk_fs_iter iter; 599 struct spdk_file *file; 600 dir_name = sanitize_path(dir, mDirectory); 601 602 iter = spdk_fs_iter_first(g_fs); 603 while (iter != NULL) { 604 file = spdk_fs_iter_get_file(iter); 605 full_path = spdk_file_get_name(file); 606 if (strncmp(dir_name.c_str(), full_path.c_str(), dir_name.length())) { 607 iter = spdk_fs_iter_next(iter); 608 continue; 609 } 610 pos = full_path.find("/", dir_name.length() + 1); 611 612 if (pos != std::string::npos) { 613 filename = full_path.substr(dir_name.length() + 1, pos - dir_name.length() - 1); 614 } else { 615 filename = full_path.substr(dir_name.length() + 1); 616 } 617 dir_and_file_set.insert(filename); 618 iter = spdk_fs_iter_next(iter); 619 } 620 621 for (auto &s : dir_and_file_set) { 622 result->push_back(s); 623 } 624 625 result->push_back("."); 626 result->push_back(".."); 627 628 return Status::OK(); 629 } 630 return EnvWrapper::GetChildren(dir, result); 631 } 632 }; 633 634 /* The thread local constructor doesn't work for the main thread, since 635 * the filesystem hasn't been loaded yet. So we break out this 636 * SpdkInitializeThread function, so that the main thread can explicitly 637 * call it after the filesystem has been loaded. 638 */ 639 void SpdkInitializeThread(void) 640 { 641 struct spdk_thread *thread; 642 643 if (g_fs != NULL) { 644 if (g_sync_args.channel) { 645 spdk_fs_free_thread_ctx(g_sync_args.channel); 646 } 647 thread = spdk_thread_create("spdk_rocksdb", NULL); 648 spdk_set_thread(thread); 649 g_sync_args.channel = spdk_fs_alloc_thread_ctx(g_fs); 650 } 651 } 652 653 static void 654 fs_load_cb(__attribute__((unused)) void *ctx, 655 struct spdk_filesystem *fs, int fserrno) 656 { 657 if (fserrno == 0) { 658 g_fs = fs; 659 } 660 g_spdk_ready = true; 661 } 662 663 static void 664 base_bdev_event_cb(enum spdk_bdev_event_type type, __attribute__((unused)) struct spdk_bdev *bdev, 665 __attribute__((unused)) void *event_ctx) 666 { 667 printf("Unsupported bdev event: type %d\n", type); 668 } 669 670 static void 671 rocksdb_run(__attribute__((unused)) void *arg1) 672 { 673 int rc; 674 675 rc = spdk_bdev_create_bs_dev_ext(g_bdev_name.c_str(), base_bdev_event_cb, NULL, 676 &g_bs_dev); 677 if (rc != 0) { 678 printf("Could not create blob bdev\n"); 679 spdk_app_stop(0); 680 exit(1); 681 } 682 683 g_lcore = spdk_env_get_first_core(); 684 685 printf("using bdev %s\n", g_bdev_name.c_str()); 686 spdk_fs_load(g_bs_dev, __send_request, fs_load_cb, NULL); 687 } 688 689 static void 690 fs_unload_cb(__attribute__((unused)) void *ctx, 691 __attribute__((unused)) int fserrno) 692 { 693 assert(fserrno == 0); 694 695 spdk_app_stop(0); 696 } 697 698 static void 699 rocksdb_shutdown(void) 700 { 701 if (g_fs != NULL) { 702 spdk_fs_unload(g_fs, fs_unload_cb, NULL); 703 } else { 704 fs_unload_cb(NULL, 0); 705 } 706 } 707 708 static void * 709 initialize_spdk(void *arg) 710 { 711 struct spdk_app_opts *opts = (struct spdk_app_opts *)arg; 712 int rc; 713 714 rc = spdk_app_start(opts, rocksdb_run, NULL); 715 /* 716 * TODO: Revisit for case of internal failure of 717 * spdk_app_start(), itself. At this time, it's known 718 * the only application's use of spdk_app_stop() passes 719 * a zero; i.e. no fail (non-zero) cases so here we 720 * assume there was an internal failure and flag it 721 * so we can throw an exception. 722 */ 723 if (rc) { 724 g_spdk_start_failure = true; 725 } else { 726 spdk_app_fini(); 727 delete opts; 728 } 729 pthread_exit(NULL); 730 731 } 732 733 SpdkEnv::SpdkEnv(Env *base_env, const std::string &dir, const std::string &conf, 734 const std::string &bdev, uint64_t cache_size_in_mb) 735 : EnvWrapper(base_env), mDirectory(dir), mConfig(conf), mBdev(bdev) 736 { 737 struct spdk_app_opts *opts = new struct spdk_app_opts; 738 739 spdk_app_opts_init(opts, sizeof(*opts)); 740 opts->name = "rocksdb"; 741 opts->json_config_file = mConfig.c_str(); 742 opts->shutdown_cb = rocksdb_shutdown; 743 opts->tpoint_group_mask = "0x80"; 744 745 spdk_fs_set_cache_size(cache_size_in_mb); 746 g_bdev_name = mBdev; 747 748 pthread_create(&mSpdkTid, NULL, &initialize_spdk, opts); 749 while (!g_spdk_ready && !g_spdk_start_failure) 750 ; 751 if (g_spdk_start_failure) { 752 delete opts; 753 throw SpdkAppStartException("spdk_app_start() unable to start rocksdb_run()"); 754 } 755 756 SpdkInitializeThread(); 757 } 758 759 SpdkEnv::~SpdkEnv() 760 { 761 /* This is a workaround for rocksdb test, we close the files if the rocksdb not 762 * do the work before the test quit. 763 */ 764 if (g_fs != NULL) { 765 spdk_fs_iter iter; 766 struct spdk_file *file; 767 768 if (!g_sync_args.channel) { 769 SpdkInitializeThread(); 770 } 771 772 iter = spdk_fs_iter_first(g_fs); 773 while (iter != NULL) { 774 file = spdk_fs_iter_get_file(iter); 775 spdk_file_close(file, g_sync_args.channel); 776 iter = spdk_fs_iter_next(iter); 777 } 778 } 779 780 spdk_app_start_shutdown(); 781 pthread_join(mSpdkTid, NULL); 782 } 783 784 Env *NewSpdkEnv(Env *base_env, const std::string &dir, const std::string &conf, 785 const std::string &bdev, uint64_t cache_size_in_mb) 786 { 787 try { 788 SpdkEnv *spdk_env = new SpdkEnv(base_env, dir, conf, bdev, cache_size_in_mb); 789 if (g_fs != NULL) { 790 return spdk_env; 791 } else { 792 delete spdk_env; 793 return NULL; 794 } 795 } catch (SpdkAppStartException &e) { 796 SPDK_ERRLOG("NewSpdkEnv: exception caught: %s", e.what()); 797 return NULL; 798 } catch (...) { 799 SPDK_ERRLOG("NewSpdkEnv: default exception caught"); 800 return NULL; 801 } 802 } 803 804 } // namespace rocksdb 805