/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32 */ 33 34 #include "env_posix.cc" 35 36 extern "C" { 37 #include "spdk/env.h" 38 #include "spdk/event.h" 39 #include "spdk/blob.h" 40 #include "spdk/blobfs.h" 41 #include "spdk/blob_bdev.h" 42 #include "spdk/log.h" 43 #include "spdk/io_channel.h" 44 #include "spdk/bdev.h" 45 } 46 47 namespace rocksdb 48 { 49 50 struct spdk_filesystem *g_fs = NULL; 51 struct spdk_bs_dev *g_bs_dev; 52 std::string g_bdev_name; 53 volatile bool g_spdk_ready = false; 54 struct sync_args { 55 struct spdk_io_channel *channel; 56 }; 57 58 __thread struct sync_args g_sync_args; 59 60 static void 61 __call_fn(void *arg1, void *arg2) 62 { 63 fs_request_fn fn; 64 65 fn = (fs_request_fn)arg1; 66 fn(arg2); 67 } 68 69 static void 70 __send_request(fs_request_fn fn, void *arg) 71 { 72 struct spdk_event *event; 73 74 event = spdk_event_allocate(0, __call_fn, (void *)fn, arg); 75 spdk_event_call(event); 76 } 77 78 class SpdkSequentialFile : public SequentialFile 79 { 80 struct spdk_file *mFile; 81 uint64_t mOffset; 82 public: 83 SpdkSequentialFile(struct spdk_file *file) : mFile(file), mOffset(0) {} 84 virtual ~SpdkSequentialFile(); 85 86 virtual Status Read(size_t n, Slice *result, char *scratch) override; 87 virtual Status Skip(uint64_t n) override; 88 virtual Status InvalidateCache(size_t offset, size_t length) override; 89 }; 90 91 static std::string 92 basename(std::string full) 93 { 94 return full.substr(full.rfind("/") + 1); 95 } 96 97 SpdkSequentialFile::~SpdkSequentialFile(void) 98 { 99 spdk_file_close(mFile, g_sync_args.channel); 100 } 101 102 Status 103 SpdkSequentialFile::Read(size_t n, Slice *result, char *scratch) 104 { 105 uint64_t ret; 106 107 ret = spdk_file_read(mFile, g_sync_args.channel, scratch, mOffset, n); 108 mOffset += ret; 109 *result = Slice(scratch, ret); 110 return Status::OK(); 111 } 112 113 Status 114 SpdkSequentialFile::Skip(uint64_t n) 115 { 116 mOffset += n; 117 return Status::OK(); 118 } 119 120 Status 121 SpdkSequentialFile::InvalidateCache(size_t offset, size_t 
length) 122 { 123 return Status::OK(); 124 } 125 126 class SpdkRandomAccessFile : public RandomAccessFile 127 { 128 struct spdk_file *mFile; 129 public: 130 SpdkRandomAccessFile(const std::string &fname, const EnvOptions &options); 131 virtual ~SpdkRandomAccessFile(); 132 133 virtual Status Read(uint64_t offset, size_t n, Slice *result, char *scratch) const override; 134 virtual Status InvalidateCache(size_t offset, size_t length) override; 135 }; 136 137 SpdkRandomAccessFile::SpdkRandomAccessFile(const std::string &fname, const EnvOptions &options) 138 { 139 spdk_fs_open_file(g_fs, g_sync_args.channel, fname.c_str(), SPDK_BLOBFS_OPEN_CREATE, &mFile); 140 } 141 142 SpdkRandomAccessFile::~SpdkRandomAccessFile(void) 143 { 144 spdk_file_close(mFile, g_sync_args.channel); 145 } 146 147 Status 148 SpdkRandomAccessFile::Read(uint64_t offset, size_t n, Slice *result, char *scratch) const 149 { 150 spdk_file_read(mFile, g_sync_args.channel, scratch, offset, n); 151 *result = Slice(scratch, n); 152 return Status::OK(); 153 } 154 155 Status 156 SpdkRandomAccessFile::InvalidateCache(size_t offset, size_t length) 157 { 158 return Status::OK(); 159 } 160 161 class SpdkWritableFile : public WritableFile 162 { 163 struct spdk_file *mFile; 164 uint32_t mSize; 165 166 public: 167 SpdkWritableFile(const std::string &fname, const EnvOptions &options); 168 ~SpdkWritableFile() 169 { 170 if (mFile != NULL) { 171 Close(); 172 } 173 } 174 175 virtual void SetIOPriority(Env::IOPriority pri) 176 { 177 if (pri == Env::IO_HIGH) { 178 spdk_file_set_priority(mFile, SPDK_FILE_PRIORITY_HIGH); 179 } 180 } 181 182 virtual Status Truncate(uint64_t size) override 183 { 184 spdk_file_truncate(mFile, g_sync_args.channel, size); 185 mSize = size; 186 return Status::OK(); 187 } 188 virtual Status Close() override 189 { 190 spdk_file_close(mFile, g_sync_args.channel); 191 mFile = NULL; 192 return Status::OK(); 193 } 194 virtual Status Append(const Slice &data) override; 195 virtual Status Flush() override 
196 { 197 return Status::OK(); 198 } 199 virtual Status Sync() override 200 { 201 spdk_file_sync(mFile, g_sync_args.channel); 202 return Status::OK(); 203 } 204 virtual Status Fsync() override 205 { 206 spdk_file_sync(mFile, g_sync_args.channel); 207 return Status::OK(); 208 } 209 virtual bool IsSyncThreadSafe() const override 210 { 211 return true; 212 } 213 virtual uint64_t GetFileSize() override 214 { 215 return mSize; 216 } 217 virtual Status InvalidateCache(size_t offset, size_t length) override 218 { 219 return Status::OK(); 220 } 221 #ifdef ROCKSDB_FALLOCATE_PRESENT 222 virtual Status Allocate(uint64_t offset, uint64_t len) override 223 { 224 spdk_file_truncate(mFile, g_sync_args.channel, offset + len); 225 return Status::OK(); 226 } 227 virtual Status RangeSync(uint64_t offset, uint64_t nbytes) override 228 { 229 /* 230 * SPDK BlobFS does not have a range sync operation yet, so just sync 231 * the whole file. 232 */ 233 spdk_file_sync(mFile, g_sync_args.channel); 234 return Status::OK(); 235 } 236 virtual size_t GetUniqueId(char *id, size_t max_size) const override 237 { 238 return 0; 239 } 240 #endif 241 }; 242 243 SpdkWritableFile::SpdkWritableFile(const std::string &fname, const EnvOptions &options) : mSize(0) 244 { 245 spdk_fs_open_file(g_fs, g_sync_args.channel, fname.c_str(), SPDK_BLOBFS_OPEN_CREATE, &mFile); 246 } 247 248 Status 249 SpdkWritableFile::Append(const Slice &data) 250 { 251 spdk_file_write(mFile, g_sync_args.channel, (void *)data.data(), mSize, data.size()); 252 mSize += data.size(); 253 254 return Status::OK(); 255 } 256 257 class SpdkDirectory : public Directory 258 { 259 public: 260 SpdkDirectory() {} 261 ~SpdkDirectory() {} 262 Status Fsync() override 263 { 264 return Status::OK(); 265 } 266 }; 267 268 class SpdkEnv : public PosixEnv 269 { 270 private: 271 pthread_t mSpdkTid; 272 std::string mDirectory; 273 std::string mConfig; 274 std::string mBdev; 275 276 public: 277 SpdkEnv(const std::string &dir, const std::string &conf, 278 
const std::string &bdev, uint64_t cache_size_in_mb); 279 280 virtual ~SpdkEnv(); 281 282 virtual Status NewSequentialFile(const std::string &fname, 283 unique_ptr<SequentialFile> *result, 284 const EnvOptions &options) override 285 { 286 if (fname.compare(0, mDirectory.length(), mDirectory) == 0) { 287 struct spdk_file *file; 288 int rc; 289 290 rc = spdk_fs_open_file(g_fs, g_sync_args.channel, 291 basename(fname).c_str(), 0, &file); 292 if (rc == 0) { 293 result->reset(new SpdkSequentialFile(file)); 294 return Status::OK(); 295 } else { 296 /* Myrocks engine uses errno(ENOENT) as one 297 * special condition, for the purpose to 298 * support MySQL, set the errno to right value. 299 */ 300 errno = -rc; 301 return IOError(fname, errno); 302 } 303 } else { 304 return PosixEnv::NewSequentialFile(fname, result, options); 305 } 306 } 307 308 virtual Status NewRandomAccessFile(const std::string &fname, 309 unique_ptr<RandomAccessFile> *result, 310 const EnvOptions &options) override 311 { 312 if (fname.compare(0, mDirectory.length(), mDirectory) == 0) { 313 result->reset(new SpdkRandomAccessFile(basename(fname), options)); 314 return Status::OK(); 315 } else { 316 return PosixEnv::NewRandomAccessFile(fname, result, options); 317 } 318 } 319 320 virtual Status NewWritableFile(const std::string &fname, 321 unique_ptr<WritableFile> *result, 322 const EnvOptions &options) override 323 { 324 if (fname.compare(0, mDirectory.length(), mDirectory) == 0) { 325 result->reset(new SpdkWritableFile(basename(fname), options)); 326 return Status::OK(); 327 } else { 328 return PosixEnv::NewWritableFile(fname, result, options); 329 } 330 } 331 332 virtual Status ReuseWritableFile(const std::string &fname, 333 const std::string &old_fname, 334 unique_ptr<WritableFile> *result, 335 const EnvOptions &options) override 336 { 337 return PosixEnv::ReuseWritableFile(fname, old_fname, result, options); 338 } 339 340 virtual Status NewDirectory(const std::string &name, 341 unique_ptr<Directory> 
*result) override 342 { 343 result->reset(new SpdkDirectory()); 344 return Status::OK(); 345 } 346 virtual Status FileExists(const std::string &fname) override 347 { 348 struct spdk_file_stat stat; 349 std::string fname_base = basename(fname); 350 int rc; 351 352 rc = spdk_fs_file_stat(g_fs, g_sync_args.channel, fname_base.c_str(), &stat); 353 if (rc == 0) { 354 return Status::OK(); 355 } 356 return PosixEnv::FileExists(fname); 357 } 358 virtual Status RenameFile(const std::string &src, const std::string &target) override 359 { 360 std::string target_base = basename(target); 361 std::string src_base = basename(src); 362 int rc; 363 364 rc = spdk_fs_rename_file(g_fs, g_sync_args.channel, 365 src_base.c_str(), target_base.c_str()); 366 if (rc == -ENOENT) { 367 return PosixEnv::RenameFile(src, target); 368 } 369 return Status::OK(); 370 } 371 virtual Status LinkFile(const std::string &src, const std::string &target) override 372 { 373 return Status::NotSupported("SpdkEnv does not support LinkFile"); 374 } 375 virtual Status GetFileSize(const std::string &fname, uint64_t *size) override 376 { 377 struct spdk_file_stat stat; 378 std::string fname_base = basename(fname); 379 int rc; 380 381 rc = spdk_fs_file_stat(g_fs, g_sync_args.channel, fname_base.c_str(), &stat); 382 if (rc == -ENOENT) { 383 return PosixEnv::GetFileSize(fname, size); 384 } 385 *size = stat.size; 386 return Status::OK(); 387 } 388 virtual Status DeleteFile(const std::string &fname) override 389 { 390 int rc; 391 std::string fname_base = basename(fname); 392 rc = spdk_fs_delete_file(g_fs, g_sync_args.channel, fname_base.c_str()); 393 if (rc == -ENOENT) { 394 return PosixEnv::DeleteFile(fname); 395 } 396 return Status::OK(); 397 } 398 virtual void StartThread(void (*function)(void *arg), void *arg) override; 399 virtual Status LockFile(const std::string &fname, FileLock **lock) override 400 { 401 spdk_fs_open_file(g_fs, g_sync_args.channel, basename(fname).c_str(), 402 SPDK_BLOBFS_OPEN_CREATE, (struct 
spdk_file **)lock); 403 return Status::OK(); 404 } 405 virtual Status UnlockFile(FileLock *lock) override 406 { 407 spdk_file_close((struct spdk_file *)lock, g_sync_args.channel); 408 return Status::OK(); 409 } 410 virtual Status GetChildren(const std::string &dir, 411 std::vector<std::string> *result) override 412 { 413 if (dir.find("archive") != std::string::npos) { 414 return Status::OK(); 415 } 416 if (dir.compare(0, mDirectory.length(), mDirectory) == 0) { 417 spdk_fs_iter iter; 418 struct spdk_file *file; 419 420 iter = spdk_fs_iter_first(g_fs); 421 while (iter != NULL) { 422 file = spdk_fs_iter_get_file(iter); 423 result->push_back(std::string(spdk_file_get_name(file))); 424 iter = spdk_fs_iter_next(iter); 425 } 426 return Status::OK(); 427 } 428 return PosixEnv::GetChildren(dir, result); 429 } 430 }; 431 432 static void 433 _spdk_send_msg(spdk_thread_fn fn, void *ctx, void *thread_ctx) 434 { 435 /* Not supported */ 436 assert(false); 437 } 438 439 void SpdkInitializeThread(void) 440 { 441 if (g_fs != NULL) { 442 spdk_allocate_thread(_spdk_send_msg, NULL); 443 g_sync_args.channel = spdk_fs_alloc_io_channel_sync(g_fs); 444 } 445 } 446 447 static void SpdkStartThreadWrapper(void *arg) 448 { 449 StartThreadState *state = reinterpret_cast<StartThreadState *>(arg); 450 451 SpdkInitializeThread(); 452 StartThreadWrapper(state); 453 } 454 455 void SpdkEnv::StartThread(void (*function)(void *arg), void *arg) 456 { 457 StartThreadState *state = new StartThreadState; 458 state->user_function = function; 459 state->arg = arg; 460 PosixEnv::StartThread(SpdkStartThreadWrapper, state); 461 } 462 463 static void 464 fs_load_cb(void *ctx, struct spdk_filesystem *fs, int fserrno) 465 { 466 if (fserrno == 0) { 467 g_fs = fs; 468 } 469 g_spdk_ready = true; 470 } 471 472 static void 473 spdk_rocksdb_run(void *arg1, void *arg2) 474 { 475 struct spdk_bdev *bdev; 476 477 pthread_setname_np(pthread_self(), "spdk"); 478 bdev = spdk_bdev_get_by_name(g_bdev_name.c_str()); 479 480 if 
(bdev == NULL) { 481 SPDK_ERRLOG("bdev %s not found\n", g_bdev_name.c_str()); 482 exit(1); 483 } 484 485 g_bs_dev = spdk_bdev_create_bs_dev(bdev); 486 printf("using bdev %s\n", g_bdev_name.c_str()); 487 spdk_fs_load(g_bs_dev, __send_request, fs_load_cb, NULL); 488 } 489 490 static void 491 fs_unload_cb(void *ctx, int fserrno) 492 { 493 assert(fserrno == 0); 494 495 spdk_app_stop(0); 496 } 497 498 static void 499 spdk_rocksdb_shutdown(void) 500 { 501 if (g_fs != NULL) { 502 spdk_fs_unload(g_fs, fs_unload_cb, NULL); 503 } else { 504 fs_unload_cb(NULL, 0); 505 } 506 } 507 508 static void * 509 initialize_spdk(void *arg) 510 { 511 struct spdk_app_opts *opts = (struct spdk_app_opts *)arg; 512 513 spdk_app_start(opts, spdk_rocksdb_run, NULL, NULL); 514 spdk_app_fini(); 515 516 delete opts; 517 pthread_exit(NULL); 518 } 519 520 SpdkEnv::SpdkEnv(const std::string &dir, const std::string &conf, 521 const std::string &bdev, uint64_t cache_size_in_mb) 522 : PosixEnv(), mDirectory(dir), mConfig(conf), mBdev(bdev) 523 { 524 struct spdk_app_opts *opts = new struct spdk_app_opts; 525 526 spdk_app_opts_init(opts); 527 opts->name = "rocksdb"; 528 opts->config_file = mConfig.c_str(); 529 opts->reactor_mask = "0x1"; 530 opts->mem_size = 1024 + cache_size_in_mb; 531 opts->shutdown_cb = spdk_rocksdb_shutdown; 532 533 spdk_fs_set_cache_size(cache_size_in_mb); 534 g_bdev_name = mBdev; 535 536 pthread_create(&mSpdkTid, NULL, &initialize_spdk, opts); 537 while (!g_spdk_ready) 538 ; 539 540 SpdkInitializeThread(); 541 } 542 543 SpdkEnv::~SpdkEnv() 544 { 545 spdk_app_start_shutdown(); 546 pthread_join(mSpdkTid, NULL); 547 } 548 549 void NewSpdkEnv(Env **env, const std::string &dir, const std::string &conf, 550 const std::string &bdev, uint64_t cache_size_in_mb) 551 { 552 SpdkEnv *spdk_env = new SpdkEnv(dir, conf, bdev, cache_size_in_mb); 553 554 if (g_fs != NULL) { 555 *env = spdk_env; 556 } else { 557 *env = NULL; 558 delete spdk_env; 559 } 560 } 561 562 } // namespace rocksdb 563