1 /* SPDX-License-Identifier: BSD-3-Clause
2 * Copyright (C) 2017 Intel Corporation.
3 * All rights reserved.
4 */
5
#include "rocksdb/env.h"

#include <atomic>
#include <iostream>
#include <set>
#include <stdexcept>
10
11 extern "C" {
12 #include "spdk/env.h"
13 #include "spdk/event.h"
14 #include "spdk/blob.h"
15 #include "spdk/blobfs.h"
16 #include "spdk/blob_bdev.h"
17 #include "spdk/log.h"
18 #include "spdk/thread.h"
19 #include "spdk/bdev.h"
20 }
21
22 namespace rocksdb
23 {
24
/* Filesystem handle stored by fs_load_cb(); remains NULL if the load failed. */
struct spdk_filesystem *g_fs = NULL;
/* Blobstore device that rocksdb_run() creates on top of g_bdev_name. */
struct spdk_bs_dev *g_bs_dev;
/* SPDK thread recorded by rocksdb_run(); __send_request() posts messages to it. */
struct spdk_thread *g_app_thread;
/* Name of the bdev to open, set by the SpdkEnv constructor. */
std::string g_bdev_name;
/*
 * Start-up handshake flags. They are set by fs_load_cb() / initialize_spdk()
 * and polled by the SpdkEnv constructor, which runs on a different thread
 * than initialize_spdk(). volatile provides neither atomicity nor ordering
 * between threads, so the flags are atomics; the default sequentially
 * consistent store/load also makes the preceding write to g_fs visible to
 * the thread that observes g_spdk_ready == true.
 */
std::atomic<bool> g_spdk_ready(false);
std::atomic<bool> g_spdk_start_failure(false);
31
32 void SpdkInitializeThread(void);
33
34 class SpdkThreadCtx
35 {
36 public:
37 struct spdk_fs_thread_ctx *channel;
38
SpdkThreadCtx(void)39 SpdkThreadCtx(void) : channel(NULL)
40 {
41 SpdkInitializeThread();
42 }
43
~SpdkThreadCtx(void)44 ~SpdkThreadCtx(void)
45 {
46 if (channel) {
47 spdk_fs_free_thread_ctx(channel);
48 channel = NULL;
49 }
50 }
51
52 private:
53 SpdkThreadCtx(const SpdkThreadCtx &);
54 SpdkThreadCtx &operator=(const SpdkThreadCtx &);
55 };
56
57 thread_local SpdkThreadCtx g_sync_args;
58
59 static void
set_channel()60 set_channel()
61 {
62 if (g_fs != NULL && g_sync_args.channel == NULL) {
63 g_sync_args.channel = spdk_fs_alloc_thread_ctx(g_fs);
64 }
65 }
66
67 static void
__send_request(fs_request_fn fn,void * arg)68 __send_request(fs_request_fn fn, void *arg)
69 {
70 spdk_thread_send_msg(g_app_thread, fn, arg);
71 }
72
/*
 * Convert a path into the name used inside BlobFS.
 *
 * The first mount_directory.length() characters of `input` are dropped (the
 * caller is expected to have verified the prefix). The remainder is
 * normalized so that it starts with exactly one '/', contains no runs of
 * consecutive '/', and has no trailing '/'.
 *
 * Returns an empty string when nothing remains after the prefix, including
 * when `input` is shorter than `mount_directory`.
 */
static std::string
sanitize_path(const std::string &input, const std::string &mount_directory)
{
	std::string name;

	/*
	 * Clamp the start offset: std::string::substr() throws std::out_of_range
	 * when the position is past the end of the string.
	 */
	const std::string::size_type start =
		mount_directory.length() < input.length() ? mount_directory.length() : input.length();

	name.reserve(input.length() - start + 1);
	for (std::string::size_type i = start; i < input.length(); i++) {
		const char c = input[i];

		if (name.empty()) {
			/* Guarantee a leading '/'. */
			if (c != '/') {
				name.push_back('/');
			}
		} else if (c == '/' && name[name.size() - 1] == '/') {
			/* Collapse repeated separators. */
			continue;
		}
		name.push_back(c);
	}

	/* Strip a trailing '/'; the emptiness check avoids indexing name[-1]. */
	if (!name.empty() && name[name.size() - 1] == '/') {
		name.erase(name.size() - 1, 1);
	}
	return name;
}
104
105 class SpdkSequentialFile : public SequentialFile
106 {
107 struct spdk_file *mFile;
108 uint64_t mOffset;
109 public:
SpdkSequentialFile(struct spdk_file * file)110 SpdkSequentialFile(struct spdk_file *file) : mFile(file), mOffset(0) {}
111 virtual ~SpdkSequentialFile();
112
113 virtual Status Read(size_t n, Slice *result, char *scratch) override;
114 virtual Status Skip(uint64_t n) override;
115 virtual Status InvalidateCache(size_t offset, size_t length) override;
116 };
117
~SpdkSequentialFile(void)118 SpdkSequentialFile::~SpdkSequentialFile(void)
119 {
120 set_channel();
121 spdk_file_close(mFile, g_sync_args.channel);
122 }
123
124 Status
Read(size_t n,Slice * result,char * scratch)125 SpdkSequentialFile::Read(size_t n, Slice *result, char *scratch)
126 {
127 int64_t ret;
128
129 set_channel();
130 ret = spdk_file_read(mFile, g_sync_args.channel, scratch, mOffset, n);
131 if (ret >= 0) {
132 mOffset += ret;
133 *result = Slice(scratch, ret);
134 return Status::OK();
135 } else {
136 errno = -ret;
137 return Status::IOError(spdk_file_get_name(mFile), strerror(errno));
138 }
139 }
140
141 Status
Skip(uint64_t n)142 SpdkSequentialFile::Skip(uint64_t n)
143 {
144 mOffset += n;
145 return Status::OK();
146 }
147
148 Status
InvalidateCache(size_t offset,size_t length)149 SpdkSequentialFile::InvalidateCache(__attribute__((unused)) size_t offset,
150 __attribute__((unused)) size_t length)
151 {
152 return Status::OK();
153 }
154
155 class SpdkRandomAccessFile : public RandomAccessFile
156 {
157 struct spdk_file *mFile;
158 public:
SpdkRandomAccessFile(struct spdk_file * file)159 SpdkRandomAccessFile(struct spdk_file *file) : mFile(file) {}
160 virtual ~SpdkRandomAccessFile();
161
162 virtual Status Read(uint64_t offset, size_t n, Slice *result, char *scratch) const override;
163 virtual Status InvalidateCache(size_t offset, size_t length) override;
164 };
165
~SpdkRandomAccessFile(void)166 SpdkRandomAccessFile::~SpdkRandomAccessFile(void)
167 {
168 set_channel();
169 spdk_file_close(mFile, g_sync_args.channel);
170 }
171
172 Status
Read(uint64_t offset,size_t n,Slice * result,char * scratch) const173 SpdkRandomAccessFile::Read(uint64_t offset, size_t n, Slice *result, char *scratch) const
174 {
175 int64_t rc;
176
177 set_channel();
178 rc = spdk_file_read(mFile, g_sync_args.channel, scratch, offset, n);
179 if (rc >= 0) {
180 *result = Slice(scratch, rc);
181 return Status::OK();
182 } else {
183 errno = -rc;
184 return Status::IOError(spdk_file_get_name(mFile), strerror(errno));
185 }
186 }
187
188 Status
InvalidateCache(size_t offset,size_t length)189 SpdkRandomAccessFile::InvalidateCache(__attribute__((unused)) size_t offset,
190 __attribute__((unused)) size_t length)
191 {
192 return Status::OK();
193 }
194
195 class SpdkWritableFile : public WritableFile
196 {
197 struct spdk_file *mFile;
198 uint64_t mSize;
199
200 public:
SpdkWritableFile(struct spdk_file * file)201 SpdkWritableFile(struct spdk_file *file) : mFile(file), mSize(0) {}
~SpdkWritableFile()202 ~SpdkWritableFile()
203 {
204 if (mFile != NULL) {
205 Close();
206 }
207 }
208
SetIOPriority(Env::IOPriority pri)209 virtual void SetIOPriority(Env::IOPriority pri)
210 {
211 if (pri == Env::IO_HIGH) {
212 spdk_file_set_priority(mFile, SPDK_FILE_PRIORITY_HIGH);
213 }
214 }
215
Truncate(uint64_t size)216 virtual Status Truncate(uint64_t size) override
217 {
218 int rc;
219
220 set_channel();
221 rc = spdk_file_truncate(mFile, g_sync_args.channel, size);
222 if (!rc) {
223 mSize = size;
224 return Status::OK();
225 } else {
226 errno = -rc;
227 return Status::IOError(spdk_file_get_name(mFile), strerror(errno));
228 }
229 }
Close()230 virtual Status Close() override
231 {
232 set_channel();
233 spdk_file_close(mFile, g_sync_args.channel);
234 mFile = NULL;
235 return Status::OK();
236 }
237 using WritableFile::Append;
238 virtual Status Append(const Slice &data) override;
Flush()239 virtual Status Flush() override
240 {
241 return Status::OK();
242 }
Sync()243 virtual Status Sync() override
244 {
245 int rc;
246
247 set_channel();
248 rc = spdk_file_sync(mFile, g_sync_args.channel);
249 if (!rc) {
250 return Status::OK();
251 } else {
252 errno = -rc;
253 return Status::IOError(spdk_file_get_name(mFile), strerror(errno));
254 }
255 }
Fsync()256 virtual Status Fsync() override
257 {
258 int rc;
259
260 set_channel();
261 rc = spdk_file_sync(mFile, g_sync_args.channel);
262 if (!rc) {
263 return Status::OK();
264 } else {
265 errno = -rc;
266 return Status::IOError(spdk_file_get_name(mFile), strerror(errno));
267 }
268 }
IsSyncThreadSafe() const269 virtual bool IsSyncThreadSafe() const override
270 {
271 return true;
272 }
GetFileSize()273 virtual uint64_t GetFileSize() override
274 {
275 return mSize;
276 }
InvalidateCache(size_t offset,size_t length)277 virtual Status InvalidateCache(__attribute__((unused)) size_t offset,
278 __attribute__((unused)) size_t length) override
279 {
280 return Status::OK();
281 }
Allocate(uint64_t offset,uint64_t len)282 virtual Status Allocate(uint64_t offset, uint64_t len) override
283 {
284 int rc;
285
286 set_channel();
287 rc = spdk_file_truncate(mFile, g_sync_args.channel, offset + len);
288 if (!rc) {
289 return Status::OK();
290 } else {
291 errno = -rc;
292 return Status::IOError(spdk_file_get_name(mFile), strerror(errno));
293 }
294 }
RangeSync(uint64_t offset,uint64_t nbytes)295 virtual Status RangeSync(__attribute__((unused)) uint64_t offset,
296 __attribute__((unused)) uint64_t nbytes) override
297 {
298 int rc;
299
300 /*
301 * SPDK BlobFS does not have a range sync operation yet, so just sync
302 * the whole file.
303 */
304 set_channel();
305 rc = spdk_file_sync(mFile, g_sync_args.channel);
306 if (!rc) {
307 return Status::OK();
308 } else {
309 errno = -rc;
310 return Status::IOError(spdk_file_get_name(mFile), strerror(errno));
311 }
312 }
GetUniqueId(char * id,size_t max_size) const313 virtual size_t GetUniqueId(char *id, size_t max_size) const override
314 {
315 int rc;
316
317 rc = spdk_file_get_id(mFile, id, max_size);
318 if (rc < 0) {
319 return 0;
320 } else {
321 return rc;
322 }
323 }
324 };
325
326 Status
Append(const Slice & data)327 SpdkWritableFile::Append(const Slice &data)
328 {
329 int64_t rc;
330
331 set_channel();
332 rc = spdk_file_write(mFile, g_sync_args.channel, (void *)data.data(), mSize, data.size());
333 if (rc >= 0) {
334 mSize += data.size();
335 return Status::OK();
336 } else {
337 errno = -rc;
338 return Status::IOError(spdk_file_get_name(mFile), strerror(errno));
339 }
340 }
341
342 class SpdkDirectory : public Directory
343 {
344 public:
SpdkDirectory()345 SpdkDirectory() {}
~SpdkDirectory()346 ~SpdkDirectory() {}
Fsync()347 Status Fsync() override
348 {
349 return Status::OK();
350 }
351 };
352
/*
 * Thrown by the SpdkEnv constructor when the SPDK application could not be
 * started; NewSpdkEnv() catches it and returns NULL.
 */
class SpdkAppStartException : public std::runtime_error
{
public:
	/*
	 * explicit: a string must never convert to an exception implicitly.
	 * The message is taken by const reference to avoid an extra copy.
	 */
	explicit SpdkAppStartException(const std::string &mess) : std::runtime_error(mess) {}
};
358
359 class SpdkEnv : public EnvWrapper
360 {
361 private:
362 pthread_t mSpdkTid;
363 std::string mDirectory;
364 std::string mConfig;
365 std::string mBdev;
366
367 public:
368 SpdkEnv(Env *base_env, const std::string &dir, const std::string &conf,
369 const std::string &bdev, uint64_t cache_size_in_mb);
370
371 virtual ~SpdkEnv();
372
NewSequentialFile(const std::string & fname,std::unique_ptr<SequentialFile> * result,const EnvOptions & options)373 virtual Status NewSequentialFile(const std::string &fname,
374 std::unique_ptr<SequentialFile> *result,
375 const EnvOptions &options) override
376 {
377 if (fname.compare(0, mDirectory.length(), mDirectory) == 0) {
378 struct spdk_file *file;
379 int rc;
380
381 std::string name = sanitize_path(fname, mDirectory);
382 set_channel();
383 rc = spdk_fs_open_file(g_fs, g_sync_args.channel,
384 name.c_str(), 0, &file);
385 if (rc == 0) {
386 result->reset(new SpdkSequentialFile(file));
387 return Status::OK();
388 } else {
389 /* Myrocks engine uses errno(ENOENT) as one
390 * special condition, for the purpose to
391 * support MySQL, set the errno to right value.
392 */
393 errno = -rc;
394 return Status::IOError(name, strerror(errno));
395 }
396 } else {
397 return EnvWrapper::NewSequentialFile(fname, result, options);
398 }
399 }
400
NewRandomAccessFile(const std::string & fname,std::unique_ptr<RandomAccessFile> * result,const EnvOptions & options)401 virtual Status NewRandomAccessFile(const std::string &fname,
402 std::unique_ptr<RandomAccessFile> *result,
403 const EnvOptions &options) override
404 {
405 if (fname.compare(0, mDirectory.length(), mDirectory) == 0) {
406 std::string name = sanitize_path(fname, mDirectory);
407 struct spdk_file *file;
408 int rc;
409
410 set_channel();
411 rc = spdk_fs_open_file(g_fs, g_sync_args.channel,
412 name.c_str(), 0, &file);
413 if (rc == 0) {
414 result->reset(new SpdkRandomAccessFile(file));
415 return Status::OK();
416 } else {
417 errno = -rc;
418 return Status::IOError(name, strerror(errno));
419 }
420 } else {
421 return EnvWrapper::NewRandomAccessFile(fname, result, options);
422 }
423 }
424
NewWritableFile(const std::string & fname,std::unique_ptr<WritableFile> * result,const EnvOptions & options)425 virtual Status NewWritableFile(const std::string &fname,
426 std::unique_ptr<WritableFile> *result,
427 const EnvOptions &options) override
428 {
429 if (fname.compare(0, mDirectory.length(), mDirectory) == 0) {
430 std::string name = sanitize_path(fname, mDirectory);
431 struct spdk_file *file;
432 int rc;
433
434 set_channel();
435 rc = spdk_fs_open_file(g_fs, g_sync_args.channel, name.c_str(),
436 SPDK_BLOBFS_OPEN_CREATE, &file);
437 if (rc == 0) {
438 result->reset(new SpdkWritableFile(file));
439 return Status::OK();
440 } else {
441 errno = -rc;
442 return Status::IOError(name, strerror(errno));
443 }
444 } else {
445 return EnvWrapper::NewWritableFile(fname, result, options);
446 }
447 }
448
ReuseWritableFile(const std::string & fname,const std::string & old_fname,std::unique_ptr<WritableFile> * result,const EnvOptions & options)449 virtual Status ReuseWritableFile(const std::string &fname,
450 const std::string &old_fname,
451 std::unique_ptr<WritableFile> *result,
452 const EnvOptions &options) override
453 {
454 return EnvWrapper::ReuseWritableFile(fname, old_fname, result, options);
455 }
456
NewDirectory(const std::string & name,std::unique_ptr<Directory> * result)457 virtual Status NewDirectory(__attribute__((unused)) const std::string &name,
458 std::unique_ptr<Directory> *result) override
459 {
460 result->reset(new SpdkDirectory());
461 return Status::OK();
462 }
FileExists(const std::string & fname)463 virtual Status FileExists(const std::string &fname) override
464 {
465 struct spdk_file_stat stat;
466 int rc;
467 std::string name = sanitize_path(fname, mDirectory);
468
469 set_channel();
470 rc = spdk_fs_file_stat(g_fs, g_sync_args.channel, name.c_str(), &stat);
471 if (rc == 0) {
472 return Status::OK();
473 }
474 return EnvWrapper::FileExists(fname);
475 }
RenameFile(const std::string & src,const std::string & t)476 virtual Status RenameFile(const std::string &src, const std::string &t) override
477 {
478 int rc;
479 std::string src_name = sanitize_path(src, mDirectory);
480 std::string target_name = sanitize_path(t, mDirectory);
481
482 set_channel();
483 rc = spdk_fs_rename_file(g_fs, g_sync_args.channel,
484 src_name.c_str(), target_name.c_str());
485 if (rc == -ENOENT) {
486 return EnvWrapper::RenameFile(src, t);
487 }
488 return Status::OK();
489 }
LinkFile(const std::string & src,const std::string & t)490 virtual Status LinkFile(__attribute__((unused)) const std::string &src,
491 __attribute__((unused)) const std::string &t) override
492 {
493 return Status::NotSupported("SpdkEnv does not support LinkFile");
494 }
GetFileSize(const std::string & fname,uint64_t * size)495 virtual Status GetFileSize(const std::string &fname, uint64_t *size) override
496 {
497 struct spdk_file_stat stat;
498 int rc;
499 std::string name = sanitize_path(fname, mDirectory);
500
501 set_channel();
502 rc = spdk_fs_file_stat(g_fs, g_sync_args.channel, name.c_str(), &stat);
503 if (rc == -ENOENT) {
504 return EnvWrapper::GetFileSize(fname, size);
505 }
506 *size = stat.size;
507 return Status::OK();
508 }
DeleteFile(const std::string & fname)509 virtual Status DeleteFile(const std::string &fname) override
510 {
511 int rc;
512 std::string name = sanitize_path(fname, mDirectory);
513
514 set_channel();
515 rc = spdk_fs_delete_file(g_fs, g_sync_args.channel, name.c_str());
516 if (rc == -ENOENT) {
517 return EnvWrapper::DeleteFile(fname);
518 }
519 return Status::OK();
520 }
LockFile(const std::string & fname,FileLock ** lock)521 virtual Status LockFile(const std::string &fname, FileLock **lock) override
522 {
523 std::string name = sanitize_path(fname, mDirectory);
524 int64_t rc;
525
526 set_channel();
527 rc = spdk_fs_open_file(g_fs, g_sync_args.channel, name.c_str(),
528 SPDK_BLOBFS_OPEN_CREATE, (struct spdk_file **)lock);
529 if (!rc) {
530 return Status::OK();
531 } else {
532 errno = -rc;
533 return Status::IOError(name, strerror(errno));
534 }
535 }
UnlockFile(FileLock * lock)536 virtual Status UnlockFile(FileLock *lock) override
537 {
538 set_channel();
539 spdk_file_close((struct spdk_file *)lock, g_sync_args.channel);
540 return Status::OK();
541 }
GetChildren(const std::string & dir,std::vector<std::string> * result)542 virtual Status GetChildren(const std::string &dir,
543 std::vector<std::string> *result) override
544 {
545 std::string::size_type pos;
546 std::set<std::string> dir_and_file_set;
547 std::string full_path;
548 std::string filename;
549 std::string dir_name;
550
551 if (dir.find("archive") != std::string::npos) {
552 return Status::OK();
553 }
554 if (dir.compare(0, mDirectory.length(), mDirectory) == 0) {
555 spdk_fs_iter iter;
556 struct spdk_file *file;
557 dir_name = sanitize_path(dir, mDirectory);
558
559 iter = spdk_fs_iter_first(g_fs);
560 while (iter != NULL) {
561 file = spdk_fs_iter_get_file(iter);
562 full_path = spdk_file_get_name(file);
563 if (strncmp(dir_name.c_str(), full_path.c_str(), dir_name.length())) {
564 iter = spdk_fs_iter_next(iter);
565 continue;
566 }
567 pos = full_path.find("/", dir_name.length() + 1);
568
569 if (pos != std::string::npos) {
570 filename = full_path.substr(dir_name.length() + 1, pos - dir_name.length() - 1);
571 } else {
572 filename = full_path.substr(dir_name.length() + 1);
573 }
574 dir_and_file_set.insert(filename);
575 iter = spdk_fs_iter_next(iter);
576 }
577
578 for (auto &s : dir_and_file_set) {
579 result->push_back(s);
580 }
581
582 result->push_back(".");
583 result->push_back("..");
584
585 return Status::OK();
586 }
587 return EnvWrapper::GetChildren(dir, result);
588 }
589 };
590
591 /* The thread local constructor doesn't work for the main thread, since
592 * the filesystem hasn't been loaded yet. So we break out this
593 * SpdkInitializeThread function, so that the main thread can explicitly
594 * call it after the filesystem has been loaded.
595 */
SpdkInitializeThread(void)596 void SpdkInitializeThread(void)
597 {
598 if (g_fs != NULL) {
599 if (g_sync_args.channel) {
600 spdk_fs_free_thread_ctx(g_sync_args.channel);
601 }
602 g_sync_args.channel = spdk_fs_alloc_thread_ctx(g_fs);
603 }
604 }
605
606 static void
fs_load_cb(void * ctx,struct spdk_filesystem * fs,int fserrno)607 fs_load_cb(__attribute__((unused)) void *ctx,
608 struct spdk_filesystem *fs, int fserrno)
609 {
610 if (fserrno == 0) {
611 g_fs = fs;
612 }
613 g_spdk_ready = true;
614 }
615
616 static void
base_bdev_event_cb(enum spdk_bdev_event_type type,struct spdk_bdev * bdev,void * event_ctx)617 base_bdev_event_cb(enum spdk_bdev_event_type type, __attribute__((unused)) struct spdk_bdev *bdev,
618 __attribute__((unused)) void *event_ctx)
619 {
620 printf("Unsupported bdev event: type %d\n", type);
621 }
622
623 static void
rocksdb_run(void * arg1)624 rocksdb_run(__attribute__((unused)) void *arg1)
625 {
626 int rc;
627
628 rc = spdk_bdev_create_bs_dev_ext(g_bdev_name.c_str(), base_bdev_event_cb, NULL,
629 &g_bs_dev);
630 if (rc != 0) {
631 printf("Could not create blob bdev\n");
632 spdk_app_stop(0);
633 exit(1);
634 }
635
636 g_app_thread = spdk_get_thread();
637
638 printf("using bdev %s\n", g_bdev_name.c_str());
639 spdk_fs_load(g_bs_dev, __send_request, fs_load_cb, NULL);
640 }
641
642 static void
fs_unload_cb(void * ctx,int fserrno)643 fs_unload_cb(__attribute__((unused)) void *ctx,
644 __attribute__((unused)) int fserrno)
645 {
646 assert(fserrno == 0);
647
648 spdk_app_stop(0);
649 }
650
651 static void
rocksdb_shutdown(void)652 rocksdb_shutdown(void)
653 {
654 if (g_fs != NULL) {
655 spdk_fs_unload(g_fs, fs_unload_cb, NULL);
656 } else {
657 fs_unload_cb(NULL, 0);
658 }
659 }
660
661 static void *
initialize_spdk(void * arg)662 initialize_spdk(void *arg)
663 {
664 struct spdk_app_opts *opts = (struct spdk_app_opts *)arg;
665 int rc;
666
667 rc = spdk_app_start(opts, rocksdb_run, NULL);
668 /*
669 * TODO: Revisit for case of internal failure of
670 * spdk_app_start(), itself. At this time, it's known
671 * the only application's use of spdk_app_stop() passes
672 * a zero; i.e. no fail (non-zero) cases so here we
673 * assume there was an internal failure and flag it
674 * so we can throw an exception.
675 */
676 if (rc) {
677 g_spdk_start_failure = true;
678 } else {
679 spdk_app_fini();
680 delete opts;
681 }
682 pthread_exit(NULL);
683
684 }
685
SpdkEnv(Env * base_env,const std::string & dir,const std::string & conf,const std::string & bdev,uint64_t cache_size_in_mb)686 SpdkEnv::SpdkEnv(Env *base_env, const std::string &dir, const std::string &conf,
687 const std::string &bdev, uint64_t cache_size_in_mb)
688 : EnvWrapper(base_env), mDirectory(dir), mConfig(conf), mBdev(bdev)
689 {
690 struct spdk_app_opts *opts = new struct spdk_app_opts;
691
692 spdk_app_opts_init(opts, sizeof(*opts));
693 opts->name = "rocksdb";
694 opts->json_config_file = mConfig.c_str();
695 opts->shutdown_cb = rocksdb_shutdown;
696 opts->tpoint_group_mask = "0x80";
697
698 spdk_fs_set_cache_size(cache_size_in_mb);
699 g_bdev_name = mBdev;
700
701 pthread_create(&mSpdkTid, NULL, &initialize_spdk, opts);
702 while (!g_spdk_ready && !g_spdk_start_failure)
703 ;
704 if (g_spdk_start_failure) {
705 delete opts;
706 throw SpdkAppStartException("spdk_app_start() unable to start rocksdb_run()");
707 }
708
709 SpdkInitializeThread();
710 }
711
~SpdkEnv()712 SpdkEnv::~SpdkEnv()
713 {
714 /* This is a workaround for rocksdb test, we close the files if the rocksdb not
715 * do the work before the test quit.
716 */
717 if (g_fs != NULL) {
718 spdk_fs_iter iter;
719 struct spdk_file *file;
720
721 if (!g_sync_args.channel) {
722 SpdkInitializeThread();
723 }
724
725 iter = spdk_fs_iter_first(g_fs);
726 while (iter != NULL) {
727 file = spdk_fs_iter_get_file(iter);
728 spdk_file_close(file, g_sync_args.channel);
729 iter = spdk_fs_iter_next(iter);
730 }
731 }
732
733 spdk_app_start_shutdown();
734 pthread_join(mSpdkTid, NULL);
735 }
736
NewSpdkEnv(Env * base_env,const std::string & dir,const std::string & conf,const std::string & bdev,uint64_t cache_size_in_mb)737 Env *NewSpdkEnv(Env *base_env, const std::string &dir, const std::string &conf,
738 const std::string &bdev, uint64_t cache_size_in_mb)
739 {
740 try {
741 SpdkEnv *spdk_env = new SpdkEnv(base_env, dir, conf, bdev, cache_size_in_mb);
742 if (g_fs != NULL) {
743 return spdk_env;
744 } else {
745 delete spdk_env;
746 return NULL;
747 }
748 } catch (SpdkAppStartException &e) {
749 SPDK_ERRLOG("NewSpdkEnv: exception caught: %s", e.what());
750 return NULL;
751 } catch (...) {
752 SPDK_ERRLOG("NewSpdkEnv: default exception caught");
753 return NULL;
754 }
755 }
756
757 } // namespace rocksdb
758