1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "rocksdb/env.h" 35 #include <set> 36 #include <iostream> 37 #include <stdexcept> 38 39 extern "C" { 40 #include "spdk/env.h" 41 #include "spdk/event.h" 42 #include "spdk/blob.h" 43 #include "spdk/blobfs.h" 44 #include "spdk/blob_bdev.h" 45 #include "spdk/log.h" 46 #include "spdk/thread.h" 47 #include "spdk/bdev.h" 48 49 #include "spdk_internal/thread.h" 50 } 51 52 namespace rocksdb 53 { 54 55 struct spdk_filesystem *g_fs = NULL; 56 struct spdk_bs_dev *g_bs_dev; 57 uint32_t g_lcore = 0; 58 std::string g_bdev_name; 59 volatile bool g_spdk_ready = false; 60 volatile bool g_spdk_start_failure = false; 61 62 void SpdkInitializeThread(void); 63 64 class SpdkThreadCtx 65 { 66 public: 67 struct spdk_fs_thread_ctx *channel; 68 69 SpdkThreadCtx(void) : channel(NULL) 70 { 71 SpdkInitializeThread(); 72 } 73 74 ~SpdkThreadCtx(void) 75 { 76 if (channel) { 77 spdk_fs_free_thread_ctx(channel); 78 channel = NULL; 79 } 80 } 81 82 private: 83 SpdkThreadCtx(const SpdkThreadCtx &); 84 SpdkThreadCtx &operator=(const SpdkThreadCtx &); 85 }; 86 87 thread_local SpdkThreadCtx g_sync_args; 88 89 static void 90 set_channel() 91 { 92 struct spdk_thread *thread; 93 94 if (g_fs != NULL && g_sync_args.channel == NULL) { 95 thread = spdk_thread_create("spdK_rocksdb", NULL); 96 spdk_set_thread(thread); 97 g_sync_args.channel = spdk_fs_alloc_thread_ctx(g_fs); 98 } 99 } 100 101 static void 102 __call_fn(void *arg1, void *arg2) 103 { 104 fs_request_fn fn; 105 106 fn = (fs_request_fn)arg1; 107 fn(arg2); 108 } 109 110 static void 111 __send_request(fs_request_fn fn, void *arg) 112 { 113 struct spdk_event *event; 114 115 event = spdk_event_allocate(g_lcore, __call_fn, (void *)fn, arg); 116 spdk_event_call(event); 117 } 118 119 static std::string 120 sanitize_path(const std::string &input, const std::string &mount_directory) 121 { 122 int index = 0; 123 std::string name; 124 std::string input_tmp; 125 126 input_tmp = input.substr(mount_directory.length(), input.length()); 127 for (const char &c : input_tmp) { 128 if (index == 0) { 129 if (c != '/') { 130 name = name.insert(index, 1, '/'); 131 index++; 132 } 133 name = name.insert(index, 1, c); 134 index++; 135 } else { 136 if (name[index - 1] == '/' && c == '/') { 137 continue; 138 } else { 139 name = name.insert(index, 1, c); 140 index++; 141 } 142 } 143 } 144 145 if (name[name.size() - 1] == '/') { 146 name = name.erase(name.size() - 1, 1); 147 } 148 return name; 149 } 150 151 class SpdkSequentialFile : public SequentialFile 152 { 153 struct spdk_file *mFile; 154 uint64_t mOffset; 155 public: 156 SpdkSequentialFile(struct spdk_file *file) : mFile(file), mOffset(0) {} 157 virtual ~SpdkSequentialFile(); 158 159 virtual Status Read(size_t n, Slice *result, char *scratch) override; 160 virtual Status Skip(uint64_t n) override; 161 virtual Status InvalidateCache(size_t offset, size_t length) override; 162 }; 163 164 SpdkSequentialFile::~SpdkSequentialFile(void) 165 { 166 set_channel(); 167 spdk_file_close(mFile, g_sync_args.channel); 168 } 169 170 Status 171 SpdkSequentialFile::Read(size_t n, Slice *result, char *scratch) 172 { 173 int64_t ret; 174 175 set_channel(); 176 ret = spdk_file_read(mFile, g_sync_args.channel, scratch, mOffset, n); 177 if (ret >= 0) { 178 mOffset += ret; 179 *result = Slice(scratch, ret); 180 return Status::OK(); 181 } else { 182 errno = -ret; 183 return Status::IOError(spdk_file_get_name(mFile), strerror(errno)); 184 } 185 } 186 187 Status 188 SpdkSequentialFile::Skip(uint64_t n) 189 { 190 mOffset += n; 191 return Status::OK(); 192 } 193 194 Status 195 SpdkSequentialFile::InvalidateCache(__attribute__((unused)) size_t offset, 196 __attribute__((unused)) size_t length) 197 { 198 return Status::OK(); 199 } 200 201 class SpdkRandomAccessFile : public RandomAccessFile 202 { 203 struct spdk_file *mFile; 204 public: 205 SpdkRandomAccessFile(struct spdk_file *file) : mFile(file) {} 206 virtual ~SpdkRandomAccessFile(); 207 208 virtual Status Read(uint64_t offset, size_t n, Slice *result, char *scratch) const override; 209 virtual Status InvalidateCache(size_t offset, size_t length) override; 210 }; 211 212 SpdkRandomAccessFile::~SpdkRandomAccessFile(void) 213 { 214 set_channel(); 215 spdk_file_close(mFile, g_sync_args.channel); 216 } 217 218 Status 219 SpdkRandomAccessFile::Read(uint64_t offset, size_t n, Slice *result, char *scratch) const 220 { 221 int64_t rc; 222 223 set_channel(); 224 rc = spdk_file_read(mFile, g_sync_args.channel, scratch, offset, n); 225 if (rc >= 0) { 226 *result = Slice(scratch, n); 227 return Status::OK(); 228 } else { 229 errno = -rc; 230 return Status::IOError(spdk_file_get_name(mFile), strerror(errno)); 231 } 232 } 233 234 Status 235 SpdkRandomAccessFile::InvalidateCache(__attribute__((unused)) size_t offset, 236 __attribute__((unused)) size_t length) 237 { 238 return Status::OK(); 239 } 240 241 class SpdkWritableFile : public WritableFile 242 { 243 struct spdk_file *mFile; 244 uint64_t mSize; 245 246 public: 247 SpdkWritableFile(struct spdk_file *file) : mFile(file), mSize(0) {} 248 ~SpdkWritableFile() 249 { 250 if (mFile != NULL) { 251 Close(); 252 } 253 } 254 255 virtual void SetIOPriority(Env::IOPriority pri) 256 { 257 if (pri == Env::IO_HIGH) { 258 spdk_file_set_priority(mFile, SPDK_FILE_PRIORITY_HIGH); 259 } 260 } 261 262 virtual Status Truncate(uint64_t size) override 263 { 264 int rc; 265 266 set_channel(); 267 rc = spdk_file_truncate(mFile, g_sync_args.channel, size); 268 if (!rc) { 269 mSize = size; 270 return Status::OK(); 271 } else { 272 errno = -rc; 273 return Status::IOError(spdk_file_get_name(mFile), strerror(errno)); 274 } 275 } 276 virtual Status Close() override 277 { 278 set_channel(); 279 spdk_file_close(mFile, g_sync_args.channel); 280 mFile = NULL; 281 return Status::OK(); 282 } 283 virtual Status Append(const Slice &data) override; 284 virtual Status Flush() override 285 { 286 return Status::OK(); 287 } 288 virtual Status Sync() override 289 { 290 int rc; 291 292 set_channel(); 293 rc = spdk_file_sync(mFile, g_sync_args.channel); 294 if (!rc) { 295 return Status::OK(); 296 } else { 297 errno = -rc; 298 return Status::IOError(spdk_file_get_name(mFile), strerror(errno)); 299 } 300 } 301 virtual Status Fsync() override 302 { 303 int rc; 304 305 set_channel(); 306 rc = spdk_file_sync(mFile, g_sync_args.channel); 307 if (!rc) { 308 return Status::OK(); 309 } else { 310 errno = -rc; 311 return Status::IOError(spdk_file_get_name(mFile), strerror(errno)); 312 } 313 } 314 virtual bool IsSyncThreadSafe() const override 315 { 316 return true; 317 } 318 virtual uint64_t GetFileSize() override 319 { 320 return mSize; 321 } 322 virtual Status InvalidateCache(__attribute__((unused)) size_t offset, 323 __attribute__((unused)) size_t length) override 324 { 325 return Status::OK(); 326 } 327 virtual Status Allocate(uint64_t offset, uint64_t len) override 328 { 329 int rc; 330 331 set_channel(); 332 rc = spdk_file_truncate(mFile, g_sync_args.channel, offset + len); 333 if (!rc) { 334 return Status::OK(); 335 } else { 336 errno = -rc; 337 return Status::IOError(spdk_file_get_name(mFile), strerror(errno)); 338 } 339 } 340 virtual Status RangeSync(__attribute__((unused)) uint64_t offset, 341 __attribute__((unused)) uint64_t nbytes) override 342 { 343 int rc; 344 345 /* 346 * SPDK BlobFS does not have a range sync operation yet, so just sync 347 * the whole file. 348 */ 349 set_channel(); 350 rc = spdk_file_sync(mFile, g_sync_args.channel); 351 if (!rc) { 352 return Status::OK(); 353 } else { 354 errno = -rc; 355 return Status::IOError(spdk_file_get_name(mFile), strerror(errno)); 356 } 357 } 358 virtual size_t GetUniqueId(char *id, size_t max_size) const override 359 { 360 int rc; 361 362 rc = spdk_file_get_id(mFile, id, max_size); 363 if (rc < 0) { 364 return 0; 365 } else { 366 return rc; 367 } 368 } 369 }; 370 371 Status 372 SpdkWritableFile::Append(const Slice &data) 373 { 374 int64_t rc; 375 376 set_channel(); 377 rc = spdk_file_write(mFile, g_sync_args.channel, (void *)data.data(), mSize, data.size()); 378 if (rc >= 0) { 379 mSize += data.size(); 380 return Status::OK(); 381 } else { 382 errno = -rc; 383 return Status::IOError(spdk_file_get_name(mFile), strerror(errno)); 384 } 385 } 386 387 class SpdkDirectory : public Directory 388 { 389 public: 390 SpdkDirectory() {} 391 ~SpdkDirectory() {} 392 Status Fsync() override 393 { 394 return Status::OK(); 395 } 396 }; 397 398 class SpdkAppStartException : public std::runtime_error 399 { 400 public: 401 SpdkAppStartException(std::string mess): std::runtime_error(mess) {} 402 }; 403 404 class SpdkEnv : public EnvWrapper 405 { 406 private: 407 pthread_t mSpdkTid; 408 std::string mDirectory; 409 std::string mConfig; 410 std::string mBdev; 411 412 public: 413 SpdkEnv(Env *base_env, const std::string &dir, const std::string &conf, 414 const std::string &bdev, uint64_t cache_size_in_mb); 415 416 virtual ~SpdkEnv(); 417 418 virtual Status NewSequentialFile(const std::string &fname, 419 unique_ptr<SequentialFile> *result, 420 const EnvOptions &options) override 421 { 422 if (fname.compare(0, mDirectory.length(), mDirectory) == 0) { 423 struct spdk_file *file; 424 int rc; 425 426 std::string name = sanitize_path(fname, mDirectory); 427 set_channel(); 428 rc = spdk_fs_open_file(g_fs, g_sync_args.channel, 429 name.c_str(), 0, &file); 430 if (rc == 0) { 431 result->reset(new SpdkSequentialFile(file)); 432 return Status::OK(); 433 } else { 434 /* Myrocks engine uses errno(ENOENT) as one 435 * special condition, for the purpose to 436 * support MySQL, set the errno to right value. 437 */ 438 errno = -rc; 439 return Status::IOError(name, strerror(errno)); 440 } 441 } else { 442 return EnvWrapper::NewSequentialFile(fname, result, options); 443 } 444 } 445 446 virtual Status NewRandomAccessFile(const std::string &fname, 447 unique_ptr<RandomAccessFile> *result, 448 const EnvOptions &options) override 449 { 450 if (fname.compare(0, mDirectory.length(), mDirectory) == 0) { 451 std::string name = sanitize_path(fname, mDirectory); 452 struct spdk_file *file; 453 int rc; 454 455 set_channel(); 456 rc = spdk_fs_open_file(g_fs, g_sync_args.channel, 457 name.c_str(), 0, &file); 458 if (rc == 0) { 459 result->reset(new SpdkRandomAccessFile(file)); 460 return Status::OK(); 461 } else { 462 errno = -rc; 463 return Status::IOError(name, strerror(errno)); 464 } 465 } else { 466 return EnvWrapper::NewRandomAccessFile(fname, result, options); 467 } 468 } 469 470 virtual Status NewWritableFile(const std::string &fname, 471 unique_ptr<WritableFile> *result, 472 const EnvOptions &options) override 473 { 474 if (fname.compare(0, mDirectory.length(), mDirectory) == 0) { 475 std::string name = sanitize_path(fname, mDirectory); 476 struct spdk_file *file; 477 int rc; 478 479 set_channel(); 480 rc = spdk_fs_open_file(g_fs, g_sync_args.channel, name.c_str(), 481 SPDK_BLOBFS_OPEN_CREATE, &file); 482 if (rc == 0) { 483 result->reset(new SpdkWritableFile(file)); 484 return Status::OK(); 485 } else { 486 errno = -rc; 487 return Status::IOError(name, strerror(errno)); 488 } 489 } else { 490 return EnvWrapper::NewWritableFile(fname, result, options); 491 } 492 } 493 494 virtual Status ReuseWritableFile(const std::string &fname, 495 const std::string &old_fname, 496 unique_ptr<WritableFile> *result, 497 const EnvOptions &options) override 498 { 499 return EnvWrapper::ReuseWritableFile(fname, old_fname, result, options); 500 } 501 502 virtual Status NewDirectory(__attribute__((unused)) const std::string &name, 503 unique_ptr<Directory> *result) override 504 { 505 result->reset(new SpdkDirectory()); 506 return Status::OK(); 507 } 508 virtual Status FileExists(const std::string &fname) override 509 { 510 struct spdk_file_stat stat; 511 int rc; 512 std::string name = sanitize_path(fname, mDirectory); 513 514 set_channel(); 515 rc = spdk_fs_file_stat(g_fs, g_sync_args.channel, name.c_str(), &stat); 516 if (rc == 0) { 517 return Status::OK(); 518 } 519 return EnvWrapper::FileExists(fname); 520 } 521 virtual Status RenameFile(const std::string &src, const std::string &t) override 522 { 523 int rc; 524 std::string src_name = sanitize_path(src, mDirectory); 525 std::string target_name = sanitize_path(t, mDirectory); 526 527 set_channel(); 528 rc = spdk_fs_rename_file(g_fs, g_sync_args.channel, 529 src_name.c_str(), target_name.c_str()); 530 if (rc == -ENOENT) { 531 return EnvWrapper::RenameFile(src, t); 532 } 533 return Status::OK(); 534 } 535 virtual Status LinkFile(__attribute__((unused)) const std::string &src, 536 __attribute__((unused)) const std::string &t) override 537 { 538 return Status::NotSupported("SpdkEnv does not support LinkFile"); 539 } 540 virtual Status GetFileSize(const std::string &fname, uint64_t *size) override 541 { 542 struct spdk_file_stat stat; 543 int rc; 544 std::string name = sanitize_path(fname, mDirectory); 545 546 set_channel(); 547 rc = spdk_fs_file_stat(g_fs, g_sync_args.channel, name.c_str(), &stat); 548 if (rc == -ENOENT) { 549 return EnvWrapper::GetFileSize(fname, size); 550 } 551 *size = stat.size; 552 return Status::OK(); 553 } 554 virtual Status DeleteFile(const std::string &fname) override 555 { 556 int rc; 557 std::string name = sanitize_path(fname, mDirectory); 558 559 set_channel(); 560 rc = spdk_fs_delete_file(g_fs, g_sync_args.channel, name.c_str()); 561 if (rc == -ENOENT) { 562 return EnvWrapper::DeleteFile(fname); 563 } 564 return Status::OK(); 565 } 566 virtual Status LockFile(const std::string &fname, FileLock **lock) override 567 { 568 std::string name = sanitize_path(fname, mDirectory); 569 int64_t rc; 570 571 set_channel(); 572 rc = spdk_fs_open_file(g_fs, g_sync_args.channel, name.c_str(), 573 SPDK_BLOBFS_OPEN_CREATE, (struct spdk_file **)lock); 574 if (!rc) { 575 return Status::OK(); 576 } else { 577 errno = -rc; 578 return Status::IOError(name, strerror(errno)); 579 } 580 } 581 virtual Status UnlockFile(FileLock *lock) override 582 { 583 set_channel(); 584 spdk_file_close((struct spdk_file *)lock, g_sync_args.channel); 585 return Status::OK(); 586 } 587 virtual Status GetChildren(const std::string &dir, 588 std::vector<std::string> *result) override 589 { 590 std::string::size_type pos; 591 std::set<std::string> dir_and_file_set; 592 std::string full_path; 593 std::string filename; 594 std::string dir_name; 595 596 if (dir.find("archive") != std::string::npos) { 597 return Status::OK(); 598 } 599 if (dir.compare(0, mDirectory.length(), mDirectory) == 0) { 600 spdk_fs_iter iter; 601 struct spdk_file *file; 602 dir_name = sanitize_path(dir, mDirectory); 603 604 iter = spdk_fs_iter_first(g_fs); 605 while (iter != NULL) { 606 file = spdk_fs_iter_get_file(iter); 607 full_path = spdk_file_get_name(file); 608 if (strncmp(dir_name.c_str(), full_path.c_str(), dir_name.length())) { 609 iter = spdk_fs_iter_next(iter); 610 continue; 611 } 612 pos = full_path.find("/", dir_name.length() + 1); 613 614 if (pos != std::string::npos) { 615 filename = full_path.substr(dir_name.length() + 1, pos - dir_name.length() - 1); 616 } else { 617 filename = full_path.substr(dir_name.length() + 1); 618 } 619 dir_and_file_set.insert(filename); 620 iter = spdk_fs_iter_next(iter); 621 } 622 623 for (auto &s : dir_and_file_set) { 624 result->push_back(s); 625 } 626 627 result->push_back("."); 628 result->push_back(".."); 629 630 return Status::OK(); 631 } 632 return EnvWrapper::GetChildren(dir, result); 633 } 634 }; 635 636 /* The thread local constructor doesn't work for the main thread, since 637 * the filesystem hasn't been loaded yet. So we break out this 638 * SpdkInitializeThread function, so that the main thread can explicitly 639 * call it after the filesystem has been loaded. 640 */ 641 void SpdkInitializeThread(void) 642 { 643 struct spdk_thread *thread; 644 645 if (g_fs != NULL) { 646 if (g_sync_args.channel) { 647 spdk_fs_free_thread_ctx(g_sync_args.channel); 648 } 649 thread = spdk_thread_create("spdk_rocksdb", NULL); 650 spdk_set_thread(thread); 651 g_sync_args.channel = spdk_fs_alloc_thread_ctx(g_fs); 652 } 653 } 654 655 static void 656 fs_load_cb(__attribute__((unused)) void *ctx, 657 struct spdk_filesystem *fs, int fserrno) 658 { 659 if (fserrno == 0) { 660 g_fs = fs; 661 } 662 g_spdk_ready = true; 663 } 664 665 static void 666 rocksdb_run(__attribute__((unused)) void *arg1) 667 { 668 struct spdk_bdev *bdev; 669 670 bdev = spdk_bdev_get_by_name(g_bdev_name.c_str()); 671 672 if (bdev == NULL) { 673 SPDK_ERRLOG("bdev %s not found\n", g_bdev_name.c_str()); 674 exit(1); 675 } 676 677 g_lcore = spdk_env_get_first_core(); 678 679 g_bs_dev = spdk_bdev_create_bs_dev(bdev, NULL, NULL); 680 printf("using bdev %s\n", g_bdev_name.c_str()); 681 spdk_fs_load(g_bs_dev, __send_request, fs_load_cb, NULL); 682 } 683 684 static void 685 fs_unload_cb(__attribute__((unused)) void *ctx, 686 __attribute__((unused)) int fserrno) 687 { 688 assert(fserrno == 0); 689 690 spdk_app_stop(0); 691 } 692 693 static void 694 rocksdb_shutdown(void) 695 { 696 if (g_fs != NULL) { 697 spdk_fs_unload(g_fs, fs_unload_cb, NULL); 698 } else { 699 fs_unload_cb(NULL, 0); 700 } 701 } 702 703 static void * 704 initialize_spdk(void *arg) 705 { 706 struct spdk_app_opts *opts = (struct spdk_app_opts *)arg; 707 int rc; 708 709 rc = spdk_app_start(opts, rocksdb_run, NULL); 710 /* 711 * TODO: Revisit for case of internal failure of 712 * spdk_app_start(), itself. At this time, it's known 713 * the only application's use of spdk_app_stop() passes 714 * a zero; i.e. no fail (non-zero) cases so here we 715 * assume there was an internal failure and flag it 716 * so we can throw an exception. 717 */ 718 if (rc) { 719 g_spdk_start_failure = true; 720 } else { 721 spdk_app_fini(); 722 delete opts; 723 } 724 pthread_exit(NULL); 725 726 } 727 728 SpdkEnv::SpdkEnv(Env *base_env, const std::string &dir, const std::string &conf, 729 const std::string &bdev, uint64_t cache_size_in_mb) 730 : EnvWrapper(base_env), mDirectory(dir), mConfig(conf), mBdev(bdev) 731 { 732 struct spdk_app_opts *opts = new struct spdk_app_opts; 733 734 spdk_app_opts_init(opts); 735 opts->name = "rocksdb"; 736 opts->config_file = mConfig.c_str(); 737 opts->shutdown_cb = rocksdb_shutdown; 738 739 spdk_fs_set_cache_size(cache_size_in_mb); 740 g_bdev_name = mBdev; 741 742 pthread_create(&mSpdkTid, NULL, &initialize_spdk, opts); 743 while (!g_spdk_ready && !g_spdk_start_failure) 744 ; 745 if (g_spdk_start_failure) { 746 delete opts; 747 throw SpdkAppStartException("spdk_app_start() unable to start rocksdb_run()"); 748 } 749 750 SpdkInitializeThread(); 751 } 752 753 SpdkEnv::~SpdkEnv() 754 { 755 /* This is a workaround for rocksdb test, we close the files if the rocksdb not 756 * do the work before the test quit. 757 */ 758 if (g_fs != NULL) { 759 spdk_fs_iter iter; 760 struct spdk_file *file; 761 762 if (!g_sync_args.channel) { 763 SpdkInitializeThread(); 764 } 765 766 iter = spdk_fs_iter_first(g_fs); 767 while (iter != NULL) { 768 file = spdk_fs_iter_get_file(iter); 769 spdk_file_close(file, g_sync_args.channel); 770 iter = spdk_fs_iter_next(iter); 771 } 772 } 773 774 spdk_app_start_shutdown(); 775 pthread_join(mSpdkTid, NULL); 776 } 777 778 Env *NewSpdkEnv(Env *base_env, const std::string &dir, const std::string &conf, 779 const std::string &bdev, uint64_t cache_size_in_mb) 780 { 781 try { 782 SpdkEnv *spdk_env = new SpdkEnv(base_env, dir, conf, bdev, cache_size_in_mb); 783 if (g_fs != NULL) { 784 return spdk_env; 785 } else { 786 delete spdk_env; 787 return NULL; 788 } 789 } catch (SpdkAppStartException &e) { 790 SPDK_ERRLOG("NewSpdkEnv: exception caught: %s", e.what()); 791 return NULL; 792 } catch (...) { 793 SPDK_ERRLOG("NewSpdkEnv: default exception caught"); 794 return NULL; 795 } 796 } 797 798 } // namespace rocksdb 799