1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "rocksdb/env.h" 35 #include <set> 36 #include <iostream> 37 #include <stdexcept> 38 39 extern "C" { 40 #include "spdk/env.h" 41 #include "spdk/event.h" 42 #include "spdk/blob.h" 43 #include "spdk/blobfs.h" 44 #include "spdk/blob_bdev.h" 45 #include "spdk/log.h" 46 #include "spdk/thread.h" 47 #include "spdk/bdev.h" 48 } 49 50 namespace rocksdb 51 { 52 53 struct spdk_filesystem *g_fs = NULL; 54 struct spdk_bs_dev *g_bs_dev; 55 uint32_t g_lcore = 0; 56 std::string g_bdev_name; 57 volatile bool g_spdk_ready = false; 58 volatile bool g_spdk_start_failure = false; 59 struct sync_args { 60 struct spdk_io_channel *channel; 61 }; 62 63 __thread struct sync_args g_sync_args; 64 65 static void 66 __call_fn(void *arg1, void *arg2) 67 { 68 fs_request_fn fn; 69 70 fn = (fs_request_fn)arg1; 71 fn(arg2); 72 } 73 74 static void 75 __send_request(fs_request_fn fn, void *arg) 76 { 77 struct spdk_event *event; 78 79 event = spdk_event_allocate(g_lcore, __call_fn, (void *)fn, arg); 80 spdk_event_call(event); 81 } 82 83 static std::string 84 sanitize_path(const std::string &input, const std::string &mount_directory) 85 { 86 int index = 0; 87 std::string name; 88 std::string input_tmp; 89 90 input_tmp = input.substr(mount_directory.length(), input.length()); 91 for (const char &c : input_tmp) { 92 if (index == 0) { 93 if (c != '/') { 94 name = name.insert(index, 1, '/'); 95 index++; 96 } 97 name = name.insert(index, 1, c); 98 index++; 99 } else { 100 if (name[index - 1] == '/' && c == '/') { 101 continue; 102 } else { 103 name = name.insert(index, 1, c); 104 index++; 105 } 106 } 107 } 108 109 if (name[name.size() - 1] == '/') { 110 name = name.erase(name.size() - 1, 1); 111 } 112 return name; 113 } 114 115 class SpdkSequentialFile : public SequentialFile 116 { 117 struct spdk_file *mFile; 118 uint64_t mOffset; 119 public: 120 SpdkSequentialFile(struct spdk_file *file) : mFile(file), mOffset(0) {} 121 virtual ~SpdkSequentialFile(); 122 123 virtual Status Read(size_t n, Slice *result, char *scratch) override; 124 virtual Status Skip(uint64_t n) override; 125 virtual Status InvalidateCache(size_t offset, size_t length) override; 126 }; 127 128 SpdkSequentialFile::~SpdkSequentialFile(void) 129 { 130 spdk_file_close(mFile, g_sync_args.channel); 131 } 132 133 Status 134 SpdkSequentialFile::Read(size_t n, Slice *result, char *scratch) 135 { 136 uint64_t ret; 137 138 ret = spdk_file_read(mFile, g_sync_args.channel, scratch, mOffset, n); 139 mOffset += ret; 140 *result = Slice(scratch, ret); 141 return Status::OK(); 142 } 143 144 Status 145 SpdkSequentialFile::Skip(uint64_t n) 146 { 147 mOffset += n; 148 return Status::OK(); 149 } 150 151 Status 152 SpdkSequentialFile::InvalidateCache(size_t offset, size_t length) 153 { 154 return Status::OK(); 155 } 156 157 class SpdkRandomAccessFile : public RandomAccessFile 158 { 159 struct spdk_file *mFile; 160 public: 161 SpdkRandomAccessFile(struct spdk_file *file) : mFile(file) {} 162 virtual ~SpdkRandomAccessFile(); 163 164 virtual Status Read(uint64_t offset, size_t n, Slice *result, char *scratch) const override; 165 virtual Status InvalidateCache(size_t offset, size_t length) override; 166 }; 167 168 SpdkRandomAccessFile::~SpdkRandomAccessFile(void) 169 { 170 spdk_file_close(mFile, g_sync_args.channel); 171 } 172 173 Status 174 SpdkRandomAccessFile::Read(uint64_t offset, size_t n, Slice *result, char *scratch) const 175 { 176 spdk_file_read(mFile, g_sync_args.channel, scratch, offset, n); 177 *result = Slice(scratch, n); 178 return Status::OK(); 179 } 180 181 Status 182 SpdkRandomAccessFile::InvalidateCache(size_t offset, size_t length) 183 { 184 return Status::OK(); 185 } 186 187 class SpdkWritableFile : public WritableFile 188 { 189 struct spdk_file *mFile; 190 uint64_t mSize; 191 192 public: 193 SpdkWritableFile(struct spdk_file *file) : mFile(file), mSize(0) {} 194 ~SpdkWritableFile() 195 { 196 if (mFile != NULL) { 197 Close(); 198 } 199 } 200 201 virtual void SetIOPriority(Env::IOPriority pri) 202 { 203 if (pri == Env::IO_HIGH) { 204 spdk_file_set_priority(mFile, SPDK_FILE_PRIORITY_HIGH); 205 } 206 } 207 208 virtual Status Truncate(uint64_t size) override 209 { 210 spdk_file_truncate(mFile, g_sync_args.channel, size); 211 mSize = size; 212 return Status::OK(); 213 } 214 virtual Status Close() override 215 { 216 spdk_file_close(mFile, g_sync_args.channel); 217 mFile = NULL; 218 return Status::OK(); 219 } 220 virtual Status Append(const Slice &data) override; 221 virtual Status Flush() override 222 { 223 return Status::OK(); 224 } 225 virtual Status Sync() override 226 { 227 spdk_file_sync(mFile, g_sync_args.channel); 228 return Status::OK(); 229 } 230 virtual Status Fsync() override 231 { 232 spdk_file_sync(mFile, g_sync_args.channel); 233 return Status::OK(); 234 } 235 virtual bool IsSyncThreadSafe() const override 236 { 237 return true; 238 } 239 virtual uint64_t GetFileSize() override 240 { 241 return mSize; 242 } 243 virtual Status InvalidateCache(size_t offset, size_t length) override 244 { 245 return Status::OK(); 246 } 247 virtual Status Allocate(uint64_t offset, uint64_t len) override 248 { 249 spdk_file_truncate(mFile, g_sync_args.channel, offset + len); 250 return Status::OK(); 251 } 252 virtual Status RangeSync(uint64_t offset, uint64_t nbytes) override 253 { 254 /* 255 * SPDK BlobFS does not have a range sync operation yet, so just sync 256 * the whole file. 257 */ 258 spdk_file_sync(mFile, g_sync_args.channel); 259 return Status::OK(); 260 } 261 virtual size_t GetUniqueId(char *id, size_t max_size) const override 262 { 263 return 0; 264 } 265 }; 266 267 Status 268 SpdkWritableFile::Append(const Slice &data) 269 { 270 spdk_file_write(mFile, g_sync_args.channel, (void *)data.data(), mSize, data.size()); 271 mSize += data.size(); 272 273 return Status::OK(); 274 } 275 276 class SpdkDirectory : public Directory 277 { 278 public: 279 SpdkDirectory() {} 280 ~SpdkDirectory() {} 281 Status Fsync() override 282 { 283 return Status::OK(); 284 } 285 }; 286 287 class SpdkAppStartException : public std::runtime_error 288 { 289 public: 290 SpdkAppStartException(std::string mess): std::runtime_error(mess) {} 291 }; 292 293 class SpdkEnv : public EnvWrapper 294 { 295 private: 296 pthread_t mSpdkTid; 297 std::string mDirectory; 298 std::string mConfig; 299 std::string mBdev; 300 301 public: 302 SpdkEnv(Env *base_env, const std::string &dir, const std::string &conf, 303 const std::string &bdev, uint64_t cache_size_in_mb); 304 305 virtual ~SpdkEnv(); 306 307 virtual Status NewSequentialFile(const std::string &fname, 308 unique_ptr<SequentialFile> *result, 309 const EnvOptions &options) override 310 { 311 if (fname.compare(0, mDirectory.length(), mDirectory) == 0) { 312 struct spdk_file *file; 313 int rc; 314 315 std::string name = sanitize_path(fname, mDirectory); 316 rc = spdk_fs_open_file(g_fs, g_sync_args.channel, 317 name.c_str(), 0, &file); 318 if (rc == 0) { 319 result->reset(new SpdkSequentialFile(file)); 320 return Status::OK(); 321 } else { 322 /* Myrocks engine uses errno(ENOENT) as one 323 * special condition, for the purpose to 324 * support MySQL, set the errno to right value. 325 */ 326 errno = -rc; 327 return Status::IOError(name, strerror(errno)); 328 } 329 } else { 330 return EnvWrapper::NewSequentialFile(fname, result, options); 331 } 332 } 333 334 virtual Status NewRandomAccessFile(const std::string &fname, 335 unique_ptr<RandomAccessFile> *result, 336 const EnvOptions &options) override 337 { 338 if (fname.compare(0, mDirectory.length(), mDirectory) == 0) { 339 std::string name = sanitize_path(fname, mDirectory); 340 struct spdk_file *file; 341 int rc; 342 343 rc = spdk_fs_open_file(g_fs, g_sync_args.channel, 344 name.c_str(), 0, &file); 345 if (rc == 0) { 346 result->reset(new SpdkRandomAccessFile(file)); 347 return Status::OK(); 348 } else { 349 errno = -rc; 350 return Status::IOError(name, strerror(errno)); 351 } 352 } else { 353 return EnvWrapper::NewRandomAccessFile(fname, result, options); 354 } 355 } 356 357 virtual Status NewWritableFile(const std::string &fname, 358 unique_ptr<WritableFile> *result, 359 const EnvOptions &options) override 360 { 361 if (fname.compare(0, mDirectory.length(), mDirectory) == 0) { 362 std::string name = sanitize_path(fname, mDirectory); 363 struct spdk_file *file; 364 int rc; 365 366 rc = spdk_fs_open_file(g_fs, g_sync_args.channel, name.c_str(), 367 SPDK_BLOBFS_OPEN_CREATE, &file); 368 if (rc == 0) { 369 result->reset(new SpdkWritableFile(file)); 370 return Status::OK(); 371 } else { 372 errno = -rc; 373 return Status::IOError(name, strerror(errno)); 374 } 375 } else { 376 return EnvWrapper::NewWritableFile(fname, result, options); 377 } 378 } 379 380 virtual Status ReuseWritableFile(const std::string &fname, 381 const std::string &old_fname, 382 unique_ptr<WritableFile> *result, 383 const EnvOptions &options) override 384 { 385 return EnvWrapper::ReuseWritableFile(fname, old_fname, result, options); 386 } 387 388 virtual Status NewDirectory(const std::string &name, 389 unique_ptr<Directory> *result) override 390 { 391 result->reset(new SpdkDirectory()); 392 return Status::OK(); 393 } 394 virtual Status FileExists(const std::string &fname) override 395 { 396 struct spdk_file_stat stat; 397 int rc; 398 std::string name = sanitize_path(fname, mDirectory); 399 400 rc = spdk_fs_file_stat(g_fs, g_sync_args.channel, name.c_str(), &stat); 401 if (rc == 0) { 402 return Status::OK(); 403 } 404 return EnvWrapper::FileExists(fname); 405 } 406 virtual Status RenameFile(const std::string &src, const std::string &t) override 407 { 408 int rc; 409 std::string src_name = sanitize_path(src, mDirectory); 410 std::string target_name = sanitize_path(t, mDirectory); 411 412 rc = spdk_fs_rename_file(g_fs, g_sync_args.channel, 413 src_name.c_str(), target_name.c_str()); 414 if (rc == -ENOENT) { 415 return EnvWrapper::RenameFile(src, t); 416 } 417 return Status::OK(); 418 } 419 virtual Status LinkFile(const std::string &src, const std::string &t) override 420 { 421 return Status::NotSupported("SpdkEnv does not support LinkFile"); 422 } 423 virtual Status GetFileSize(const std::string &fname, uint64_t *size) override 424 { 425 struct spdk_file_stat stat; 426 int rc; 427 std::string name = sanitize_path(fname, mDirectory); 428 429 rc = spdk_fs_file_stat(g_fs, g_sync_args.channel, name.c_str(), &stat); 430 if (rc == -ENOENT) { 431 return EnvWrapper::GetFileSize(fname, size); 432 } 433 *size = stat.size; 434 return Status::OK(); 435 } 436 virtual Status DeleteFile(const std::string &fname) override 437 { 438 int rc; 439 std::string name = sanitize_path(fname, mDirectory); 440 441 rc = spdk_fs_delete_file(g_fs, g_sync_args.channel, name.c_str()); 442 if (rc == -ENOENT) { 443 return EnvWrapper::DeleteFile(fname); 444 } 445 return Status::OK(); 446 } 447 virtual void StartThread(void (*function)(void *arg), void *arg) override; 448 virtual Status LockFile(const std::string &fname, FileLock **lock) override 449 { 450 std::string name = sanitize_path(fname, mDirectory); 451 452 spdk_fs_open_file(g_fs, g_sync_args.channel, name.c_str(), 453 SPDK_BLOBFS_OPEN_CREATE, (struct spdk_file **)lock); 454 return Status::OK(); 455 } 456 virtual Status UnlockFile(FileLock *lock) override 457 { 458 spdk_file_close((struct spdk_file *)lock, g_sync_args.channel); 459 return Status::OK(); 460 } 461 virtual Status GetChildren(const std::string &dir, 462 std::vector<std::string> *result) override 463 { 464 std::string::size_type pos; 465 std::set<std::string> dir_and_file_set; 466 std::string full_path; 467 std::string filename; 468 std::string dir_name; 469 470 if (dir.find("archive") != std::string::npos) { 471 return Status::OK(); 472 } 473 if (dir.compare(0, mDirectory.length(), mDirectory) == 0) { 474 spdk_fs_iter iter; 475 struct spdk_file *file; 476 dir_name = sanitize_path(dir, mDirectory); 477 478 iter = spdk_fs_iter_first(g_fs); 479 while (iter != NULL) { 480 file = spdk_fs_iter_get_file(iter); 481 full_path = spdk_file_get_name(file); 482 if (strncmp(dir_name.c_str(), full_path.c_str(), dir_name.length())) { 483 iter = spdk_fs_iter_next(iter); 484 continue; 485 } 486 pos = full_path.find("/", dir_name.length() + 1); 487 488 if (pos != std::string::npos) { 489 filename = full_path.substr(dir_name.length() + 1, pos - dir_name.length() - 1); 490 } else { 491 filename = full_path.substr(dir_name.length() + 1); 492 } 493 dir_and_file_set.insert(filename); 494 iter = spdk_fs_iter_next(iter); 495 } 496 497 for (auto &s : dir_and_file_set) { 498 result->push_back(s); 499 } 500 501 result->push_back("."); 502 result->push_back(".."); 503 504 return Status::OK(); 505 } 506 return EnvWrapper::GetChildren(dir, result); 507 } 508 }; 509 510 static void 511 _spdk_send_msg(spdk_thread_fn fn, void *ctx, void *thread_ctx) 512 { 513 /* Not supported */ 514 assert(false); 515 } 516 517 void SpdkInitializeThread(void) 518 { 519 if (g_fs != NULL) { 520 /* TODO: Add an event lib call to dynamically register a thread */ 521 spdk_allocate_thread(_spdk_send_msg, NULL, NULL, NULL, "spdk_rocksdb"); 522 g_sync_args.channel = spdk_fs_alloc_io_channel_sync(g_fs); 523 } 524 } 525 526 struct SpdkThreadState { 527 void (*user_function)(void *); 528 void *arg; 529 }; 530 531 static void SpdkStartThreadWrapper(void *arg) 532 { 533 SpdkThreadState *state = reinterpret_cast<SpdkThreadState *>(arg); 534 535 SpdkInitializeThread(); 536 state->user_function(state->arg); 537 delete state; 538 } 539 540 void SpdkEnv::StartThread(void (*function)(void *arg), void *arg) 541 { 542 SpdkThreadState *state = new SpdkThreadState; 543 state->user_function = function; 544 state->arg = arg; 545 EnvWrapper::StartThread(SpdkStartThreadWrapper, state); 546 } 547 548 static void 549 fs_load_cb(void *ctx, struct spdk_filesystem *fs, int fserrno) 550 { 551 if (fserrno == 0) { 552 g_fs = fs; 553 } 554 g_spdk_ready = true; 555 } 556 557 static void 558 spdk_rocksdb_run(void *arg1, void *arg2) 559 { 560 struct spdk_bdev *bdev; 561 562 bdev = spdk_bdev_get_by_name(g_bdev_name.c_str()); 563 564 if (bdev == NULL) { 565 SPDK_ERRLOG("bdev %s not found\n", g_bdev_name.c_str()); 566 exit(1); 567 } 568 569 g_lcore = spdk_env_get_first_core(); 570 571 g_bs_dev = spdk_bdev_create_bs_dev(bdev, NULL, NULL); 572 printf("using bdev %s\n", g_bdev_name.c_str()); 573 spdk_fs_load(g_bs_dev, __send_request, fs_load_cb, NULL); 574 } 575 576 static void 577 fs_unload_cb(void *ctx, int fserrno) 578 { 579 assert(fserrno == 0); 580 581 spdk_app_stop(0); 582 } 583 584 static void 585 spdk_rocksdb_shutdown(void) 586 { 587 if (g_fs != NULL) { 588 spdk_fs_unload(g_fs, fs_unload_cb, NULL); 589 } else { 590 fs_unload_cb(NULL, 0); 591 } 592 } 593 594 static void * 595 initialize_spdk(void *arg) 596 { 597 struct spdk_app_opts *opts = (struct spdk_app_opts *)arg; 598 int rc; 599 600 rc = spdk_app_start(opts, spdk_rocksdb_run, NULL, NULL); 601 /* 602 * TODO: Revisit for case of internal failure of 603 * spdk_app_start(), itself. At this time, it's known 604 * the only application's use of spdk_app_stop() passes 605 * a zero; i.e. no fail (non-zero) cases so here we 606 * assume there was an internal failure and flag it 607 * so we can throw an exception. 608 */ 609 if (rc) { 610 g_spdk_start_failure = true; 611 } else { 612 spdk_app_fini(); 613 delete opts; 614 } 615 pthread_exit(NULL); 616 617 } 618 619 SpdkEnv::SpdkEnv(Env *base_env, const std::string &dir, const std::string &conf, 620 const std::string &bdev, uint64_t cache_size_in_mb) 621 : EnvWrapper(base_env), mDirectory(dir), mConfig(conf), mBdev(bdev) 622 { 623 struct spdk_app_opts *opts = new struct spdk_app_opts; 624 625 spdk_app_opts_init(opts); 626 opts->name = "rocksdb"; 627 opts->config_file = mConfig.c_str(); 628 opts->mem_size = 1024 + cache_size_in_mb; 629 opts->shutdown_cb = spdk_rocksdb_shutdown; 630 631 spdk_fs_set_cache_size(cache_size_in_mb); 632 g_bdev_name = mBdev; 633 634 pthread_create(&mSpdkTid, NULL, &initialize_spdk, opts); 635 while (!g_spdk_ready && !g_spdk_start_failure) 636 ; 637 if (g_spdk_start_failure) { 638 delete opts; 639 throw SpdkAppStartException("spdk_app_start() unable to start spdk_rocksdb_run()"); 640 } 641 642 SpdkInitializeThread(); 643 } 644 645 SpdkEnv::~SpdkEnv() 646 { 647 /* This is a workaround for rocksdb test, we close the files if the rocksdb not 648 * do the work before the test quit. 649 */ 650 if (g_fs != NULL) { 651 spdk_fs_iter iter; 652 struct spdk_file *file; 653 654 if (!g_sync_args.channel) { 655 SpdkInitializeThread(); 656 } 657 iter = spdk_fs_iter_first(g_fs); 658 while (iter != NULL) { 659 file = spdk_fs_iter_get_file(iter); 660 spdk_file_close(file, g_sync_args.channel); 661 iter = spdk_fs_iter_next(iter); 662 } 663 } 664 665 spdk_app_start_shutdown(); 666 pthread_join(mSpdkTid, NULL); 667 } 668 669 Env *NewSpdkEnv(Env *base_env, const std::string &dir, const std::string &conf, 670 const std::string &bdev, uint64_t cache_size_in_mb) 671 { 672 try { 673 SpdkEnv *spdk_env = new SpdkEnv(base_env, dir, conf, bdev, cache_size_in_mb); 674 if (g_fs != NULL) { 675 return spdk_env; 676 } else { 677 delete spdk_env; 678 return NULL; 679 } 680 } catch (SpdkAppStartException &e) { 681 SPDK_ERRLOG("NewSpdkEnv: exception caught: %s", e.what()); 682 return NULL; 683 } catch (...) { 684 SPDK_ERRLOG("NewSpdkEnv: default exception caught"); 685 return NULL; 686 } 687 } 688 689 } // namespace rocksdb 690