xref: /spdk/lib/rocksdb/env_spdk.cc (revision a6dbe3721eb3b5990707fc3e378c95e505dd8ab5)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2017 Intel Corporation.
3  *   All rights reserved.
4  */
5 
6 #include "rocksdb/env.h"
7 #include <set>
8 #include <iostream>
9 #include <stdexcept>
10 
11 extern "C" {
12 #include "spdk/env.h"
13 #include "spdk/event.h"
14 #include "spdk/blob.h"
15 #include "spdk/blobfs.h"
16 #include "spdk/blob_bdev.h"
17 #include "spdk/log.h"
18 #include "spdk/thread.h"
19 #include "spdk/bdev.h"
20 }
21 
22 namespace rocksdb
23 {
24 
25 struct spdk_filesystem *g_fs = NULL;
26 struct spdk_bs_dev *g_bs_dev;
27 uint32_t g_lcore = 0;
28 std::string g_bdev_name;
29 volatile bool g_spdk_ready = false;
30 volatile bool g_spdk_start_failure = false;
31 
32 void SpdkInitializeThread(void);
33 
34 class SpdkThreadCtx
35 {
36 public:
37 	struct spdk_fs_thread_ctx *channel;
38 
39 	SpdkThreadCtx(void) : channel(NULL)
40 	{
41 		SpdkInitializeThread();
42 	}
43 
44 	~SpdkThreadCtx(void)
45 	{
46 		if (channel) {
47 			spdk_fs_free_thread_ctx(channel);
48 			channel = NULL;
49 		}
50 	}
51 
52 private:
53 	SpdkThreadCtx(const SpdkThreadCtx &);
54 	SpdkThreadCtx &operator=(const SpdkThreadCtx &);
55 };
56 
57 thread_local SpdkThreadCtx g_sync_args;
58 
59 static void
60 set_channel()
61 {
62 	struct spdk_thread *thread;
63 
64 	if (g_fs != NULL && g_sync_args.channel == NULL) {
65 		thread = spdk_thread_create("spdk_rocksdb", NULL);
66 		spdk_set_thread(thread);
67 		g_sync_args.channel = spdk_fs_alloc_thread_ctx(g_fs);
68 	}
69 }
70 
71 static void
72 __call_fn(void *arg1, void *arg2)
73 {
74 	fs_request_fn fn;
75 
76 	fn = (fs_request_fn)arg1;
77 	fn(arg2);
78 }
79 
80 static void
81 __send_request(fs_request_fn fn, void *arg)
82 {
83 	struct spdk_event *event;
84 
85 	event = spdk_event_allocate(g_lcore, __call_fn, (void *)fn, arg);
86 	spdk_event_call(event);
87 }
88 
89 static std::string
90 sanitize_path(const std::string &input, const std::string &mount_directory)
91 {
92 	int index = 0;
93 	std::string name;
94 	std::string input_tmp;
95 
96 	input_tmp = input.substr(mount_directory.length(), input.length());
97 	for (const char &c : input_tmp) {
98 		if (index == 0) {
99 			if (c != '/') {
100 				name = name.insert(index, 1, '/');
101 				index++;
102 			}
103 			name = name.insert(index, 1, c);
104 			index++;
105 		} else {
106 			if (name[index - 1] == '/' && c == '/') {
107 				continue;
108 			} else {
109 				name = name.insert(index, 1, c);
110 				index++;
111 			}
112 		}
113 	}
114 
115 	if (name[name.size() - 1] == '/') {
116 		name = name.erase(name.size() - 1, 1);
117 	}
118 	return name;
119 }
120 
121 class SpdkSequentialFile : public SequentialFile
122 {
123 	struct spdk_file *mFile;
124 	uint64_t mOffset;
125 public:
126 	SpdkSequentialFile(struct spdk_file *file) : mFile(file), mOffset(0) {}
127 	virtual ~SpdkSequentialFile();
128 
129 	virtual Status Read(size_t n, Slice *result, char *scratch) override;
130 	virtual Status Skip(uint64_t n) override;
131 	virtual Status InvalidateCache(size_t offset, size_t length) override;
132 };
133 
134 SpdkSequentialFile::~SpdkSequentialFile(void)
135 {
136 	set_channel();
137 	spdk_file_close(mFile, g_sync_args.channel);
138 }
139 
140 Status
141 SpdkSequentialFile::Read(size_t n, Slice *result, char *scratch)
142 {
143 	int64_t ret;
144 
145 	set_channel();
146 	ret = spdk_file_read(mFile, g_sync_args.channel, scratch, mOffset, n);
147 	if (ret >= 0) {
148 		mOffset += ret;
149 		*result = Slice(scratch, ret);
150 		return Status::OK();
151 	} else {
152 		errno = -ret;
153 		return Status::IOError(spdk_file_get_name(mFile), strerror(errno));
154 	}
155 }
156 
157 Status
158 SpdkSequentialFile::Skip(uint64_t n)
159 {
160 	mOffset += n;
161 	return Status::OK();
162 }
163 
164 Status
165 SpdkSequentialFile::InvalidateCache(__attribute__((unused)) size_t offset,
166 				    __attribute__((unused)) size_t length)
167 {
168 	return Status::OK();
169 }
170 
171 class SpdkRandomAccessFile : public RandomAccessFile
172 {
173 	struct spdk_file *mFile;
174 public:
175 	SpdkRandomAccessFile(struct spdk_file *file) : mFile(file) {}
176 	virtual ~SpdkRandomAccessFile();
177 
178 	virtual Status Read(uint64_t offset, size_t n, Slice *result, char *scratch) const override;
179 	virtual Status InvalidateCache(size_t offset, size_t length) override;
180 };
181 
182 SpdkRandomAccessFile::~SpdkRandomAccessFile(void)
183 {
184 	set_channel();
185 	spdk_file_close(mFile, g_sync_args.channel);
186 }
187 
188 Status
189 SpdkRandomAccessFile::Read(uint64_t offset, size_t n, Slice *result, char *scratch) const
190 {
191 	int64_t rc;
192 
193 	set_channel();
194 	rc = spdk_file_read(mFile, g_sync_args.channel, scratch, offset, n);
195 	if (rc >= 0) {
196 		*result = Slice(scratch, n);
197 		return Status::OK();
198 	} else {
199 		errno = -rc;
200 		return Status::IOError(spdk_file_get_name(mFile), strerror(errno));
201 	}
202 }
203 
204 Status
205 SpdkRandomAccessFile::InvalidateCache(__attribute__((unused)) size_t offset,
206 				      __attribute__((unused)) size_t length)
207 {
208 	return Status::OK();
209 }
210 
211 class SpdkWritableFile : public WritableFile
212 {
213 	struct spdk_file *mFile;
214 	uint64_t mSize;
215 
216 public:
217 	SpdkWritableFile(struct spdk_file *file) : mFile(file), mSize(0) {}
218 	~SpdkWritableFile()
219 	{
220 		if (mFile != NULL) {
221 			Close();
222 		}
223 	}
224 
225 	virtual void SetIOPriority(Env::IOPriority pri)
226 	{
227 		if (pri == Env::IO_HIGH) {
228 			spdk_file_set_priority(mFile, SPDK_FILE_PRIORITY_HIGH);
229 		}
230 	}
231 
232 	virtual Status Truncate(uint64_t size) override
233 	{
234 		int rc;
235 
236 		set_channel();
237 		rc = spdk_file_truncate(mFile, g_sync_args.channel, size);
238 		if (!rc) {
239 			mSize = size;
240 			return Status::OK();
241 		} else {
242 			errno = -rc;
243 			return Status::IOError(spdk_file_get_name(mFile), strerror(errno));
244 		}
245 	}
246 	virtual Status Close() override
247 	{
248 		set_channel();
249 		spdk_file_close(mFile, g_sync_args.channel);
250 		mFile = NULL;
251 		return Status::OK();
252 	}
253 	virtual Status Append(const Slice &data) override;
254 	virtual Status Flush() override
255 	{
256 		return Status::OK();
257 	}
258 	virtual Status Sync() override
259 	{
260 		int rc;
261 
262 		set_channel();
263 		rc = spdk_file_sync(mFile, g_sync_args.channel);
264 		if (!rc) {
265 			return Status::OK();
266 		} else {
267 			errno = -rc;
268 			return Status::IOError(spdk_file_get_name(mFile), strerror(errno));
269 		}
270 	}
271 	virtual Status Fsync() override
272 	{
273 		int rc;
274 
275 		set_channel();
276 		rc = spdk_file_sync(mFile, g_sync_args.channel);
277 		if (!rc) {
278 			return Status::OK();
279 		} else {
280 			errno = -rc;
281 			return Status::IOError(spdk_file_get_name(mFile), strerror(errno));
282 		}
283 	}
284 	virtual bool IsSyncThreadSafe() const override
285 	{
286 		return true;
287 	}
288 	virtual uint64_t GetFileSize() override
289 	{
290 		return mSize;
291 	}
292 	virtual Status InvalidateCache(__attribute__((unused)) size_t offset,
293 				       __attribute__((unused)) size_t length) override
294 	{
295 		return Status::OK();
296 	}
297 	virtual Status Allocate(uint64_t offset, uint64_t len) override
298 	{
299 		int rc;
300 
301 		set_channel();
302 		rc = spdk_file_truncate(mFile, g_sync_args.channel, offset + len);
303 		if (!rc) {
304 			return Status::OK();
305 		} else {
306 			errno = -rc;
307 			return Status::IOError(spdk_file_get_name(mFile), strerror(errno));
308 		}
309 	}
310 	virtual Status RangeSync(__attribute__((unused)) uint64_t offset,
311 				 __attribute__((unused)) uint64_t nbytes) override
312 	{
313 		int rc;
314 
315 		/*
316 		 * SPDK BlobFS does not have a range sync operation yet, so just sync
317 		 *  the whole file.
318 		 */
319 		set_channel();
320 		rc = spdk_file_sync(mFile, g_sync_args.channel);
321 		if (!rc) {
322 			return Status::OK();
323 		} else {
324 			errno = -rc;
325 			return Status::IOError(spdk_file_get_name(mFile), strerror(errno));
326 		}
327 	}
328 	virtual size_t GetUniqueId(char *id, size_t max_size) const override
329 	{
330 		int rc;
331 
332 		rc = spdk_file_get_id(mFile, id, max_size);
333 		if (rc < 0) {
334 			return 0;
335 		} else {
336 			return rc;
337 		}
338 	}
339 };
340 
341 Status
342 SpdkWritableFile::Append(const Slice &data)
343 {
344 	int64_t rc;
345 
346 	set_channel();
347 	rc = spdk_file_write(mFile, g_sync_args.channel, (void *)data.data(), mSize, data.size());
348 	if (rc >= 0) {
349 		mSize += data.size();
350 		return Status::OK();
351 	} else {
352 		errno = -rc;
353 		return Status::IOError(spdk_file_get_name(mFile), strerror(errno));
354 	}
355 }
356 
357 class SpdkDirectory : public Directory
358 {
359 public:
360 	SpdkDirectory() {}
361 	~SpdkDirectory() {}
362 	Status Fsync() override
363 	{
364 		return Status::OK();
365 	}
366 };
367 
368 class SpdkAppStartException : public std::runtime_error
369 {
370 public:
371 	SpdkAppStartException(std::string mess): std::runtime_error(mess) {}
372 };
373 
374 class SpdkEnv : public EnvWrapper
375 {
376 private:
377 	pthread_t mSpdkTid;
378 	std::string mDirectory;
379 	std::string mConfig;
380 	std::string mBdev;
381 
382 public:
383 	SpdkEnv(Env *base_env, const std::string &dir, const std::string &conf,
384 		const std::string &bdev, uint64_t cache_size_in_mb);
385 
386 	virtual ~SpdkEnv();
387 
388 	virtual Status NewSequentialFile(const std::string &fname,
389 					 std::unique_ptr<SequentialFile> *result,
390 					 const EnvOptions &options) override
391 	{
392 		if (fname.compare(0, mDirectory.length(), mDirectory) == 0) {
393 			struct spdk_file *file;
394 			int rc;
395 
396 			std::string name = sanitize_path(fname, mDirectory);
397 			set_channel();
398 			rc = spdk_fs_open_file(g_fs, g_sync_args.channel,
399 					       name.c_str(), 0, &file);
400 			if (rc == 0) {
401 				result->reset(new SpdkSequentialFile(file));
402 				return Status::OK();
403 			} else {
404 				/* Myrocks engine uses errno(ENOENT) as one
405 				 * special condition, for the purpose to
406 				 * support MySQL, set the errno to right value.
407 				 */
408 				errno = -rc;
409 				return Status::IOError(name, strerror(errno));
410 			}
411 		} else {
412 			return EnvWrapper::NewSequentialFile(fname, result, options);
413 		}
414 	}
415 
416 	virtual Status NewRandomAccessFile(const std::string &fname,
417 					   std::unique_ptr<RandomAccessFile> *result,
418 					   const EnvOptions &options) override
419 	{
420 		if (fname.compare(0, mDirectory.length(), mDirectory) == 0) {
421 			std::string name = sanitize_path(fname, mDirectory);
422 			struct spdk_file *file;
423 			int rc;
424 
425 			set_channel();
426 			rc = spdk_fs_open_file(g_fs, g_sync_args.channel,
427 					       name.c_str(), 0, &file);
428 			if (rc == 0) {
429 				result->reset(new SpdkRandomAccessFile(file));
430 				return Status::OK();
431 			} else {
432 				errno = -rc;
433 				return Status::IOError(name, strerror(errno));
434 			}
435 		} else {
436 			return EnvWrapper::NewRandomAccessFile(fname, result, options);
437 		}
438 	}
439 
440 	virtual Status NewWritableFile(const std::string &fname,
441 				       std::unique_ptr<WritableFile> *result,
442 				       const EnvOptions &options) override
443 	{
444 		if (fname.compare(0, mDirectory.length(), mDirectory) == 0) {
445 			std::string name = sanitize_path(fname, mDirectory);
446 			struct spdk_file *file;
447 			int rc;
448 
449 			set_channel();
450 			rc = spdk_fs_open_file(g_fs, g_sync_args.channel, name.c_str(),
451 					       SPDK_BLOBFS_OPEN_CREATE, &file);
452 			if (rc == 0) {
453 				result->reset(new SpdkWritableFile(file));
454 				return Status::OK();
455 			} else {
456 				errno = -rc;
457 				return Status::IOError(name, strerror(errno));
458 			}
459 		} else {
460 			return EnvWrapper::NewWritableFile(fname, result, options);
461 		}
462 	}
463 
464 	virtual Status ReuseWritableFile(const std::string &fname,
465 					 const std::string &old_fname,
466 					 std::unique_ptr<WritableFile> *result,
467 					 const EnvOptions &options) override
468 	{
469 		return EnvWrapper::ReuseWritableFile(fname, old_fname, result, options);
470 	}
471 
472 	virtual Status NewDirectory(__attribute__((unused)) const std::string &name,
473 				    std::unique_ptr<Directory> *result) override
474 	{
475 		result->reset(new SpdkDirectory());
476 		return Status::OK();
477 	}
478 	virtual Status FileExists(const std::string &fname) override
479 	{
480 		struct spdk_file_stat stat;
481 		int rc;
482 		std::string name = sanitize_path(fname, mDirectory);
483 
484 		set_channel();
485 		rc = spdk_fs_file_stat(g_fs, g_sync_args.channel, name.c_str(), &stat);
486 		if (rc == 0) {
487 			return Status::OK();
488 		}
489 		return EnvWrapper::FileExists(fname);
490 	}
491 	virtual Status RenameFile(const std::string &src, const std::string &t) override
492 	{
493 		int rc;
494 		std::string src_name = sanitize_path(src, mDirectory);
495 		std::string target_name = sanitize_path(t, mDirectory);
496 
497 		set_channel();
498 		rc = spdk_fs_rename_file(g_fs, g_sync_args.channel,
499 					 src_name.c_str(), target_name.c_str());
500 		if (rc == -ENOENT) {
501 			return EnvWrapper::RenameFile(src, t);
502 		}
503 		return Status::OK();
504 	}
505 	virtual Status LinkFile(__attribute__((unused)) const std::string &src,
506 				__attribute__((unused)) const std::string &t) override
507 	{
508 		return Status::NotSupported("SpdkEnv does not support LinkFile");
509 	}
510 	virtual Status GetFileSize(const std::string &fname, uint64_t *size) override
511 	{
512 		struct spdk_file_stat stat;
513 		int rc;
514 		std::string name = sanitize_path(fname, mDirectory);
515 
516 		set_channel();
517 		rc = spdk_fs_file_stat(g_fs, g_sync_args.channel, name.c_str(), &stat);
518 		if (rc == -ENOENT) {
519 			return EnvWrapper::GetFileSize(fname, size);
520 		}
521 		*size = stat.size;
522 		return Status::OK();
523 	}
524 	virtual Status DeleteFile(const std::string &fname) override
525 	{
526 		int rc;
527 		std::string name = sanitize_path(fname, mDirectory);
528 
529 		set_channel();
530 		rc = spdk_fs_delete_file(g_fs, g_sync_args.channel, name.c_str());
531 		if (rc == -ENOENT) {
532 			return EnvWrapper::DeleteFile(fname);
533 		}
534 		return Status::OK();
535 	}
536 	virtual Status LockFile(const std::string &fname, FileLock **lock) override
537 	{
538 		std::string name = sanitize_path(fname, mDirectory);
539 		int64_t rc;
540 
541 		set_channel();
542 		rc = spdk_fs_open_file(g_fs, g_sync_args.channel, name.c_str(),
543 				       SPDK_BLOBFS_OPEN_CREATE, (struct spdk_file **)lock);
544 		if (!rc) {
545 			return Status::OK();
546 		} else {
547 			errno = -rc;
548 			return Status::IOError(name, strerror(errno));
549 		}
550 	}
551 	virtual Status UnlockFile(FileLock *lock) override
552 	{
553 		set_channel();
554 		spdk_file_close((struct spdk_file *)lock, g_sync_args.channel);
555 		return Status::OK();
556 	}
557 	virtual Status GetChildren(const std::string &dir,
558 				   std::vector<std::string> *result) override
559 	{
560 		std::string::size_type pos;
561 		std::set<std::string> dir_and_file_set;
562 		std::string full_path;
563 		std::string filename;
564 		std::string dir_name;
565 
566 		if (dir.find("archive") != std::string::npos) {
567 			return Status::OK();
568 		}
569 		if (dir.compare(0, mDirectory.length(), mDirectory) == 0) {
570 			spdk_fs_iter iter;
571 			struct spdk_file *file;
572 			dir_name = sanitize_path(dir, mDirectory);
573 
574 			iter = spdk_fs_iter_first(g_fs);
575 			while (iter != NULL) {
576 				file = spdk_fs_iter_get_file(iter);
577 				full_path = spdk_file_get_name(file);
578 				if (strncmp(dir_name.c_str(), full_path.c_str(), dir_name.length())) {
579 					iter = spdk_fs_iter_next(iter);
580 					continue;
581 				}
582 				pos = full_path.find("/", dir_name.length() + 1);
583 
584 				if (pos != std::string::npos) {
585 					filename = full_path.substr(dir_name.length() + 1, pos - dir_name.length() - 1);
586 				} else {
587 					filename = full_path.substr(dir_name.length() + 1);
588 				}
589 				dir_and_file_set.insert(filename);
590 				iter = spdk_fs_iter_next(iter);
591 			}
592 
593 			for (auto &s : dir_and_file_set) {
594 				result->push_back(s);
595 			}
596 
597 			result->push_back(".");
598 			result->push_back("..");
599 
600 			return Status::OK();
601 		}
602 		return EnvWrapper::GetChildren(dir, result);
603 	}
604 };
605 
606 /* The thread local constructor doesn't work for the main thread, since
607  * the filesystem hasn't been loaded yet.  So we break out this
608  * SpdkInitializeThread function, so that the main thread can explicitly
609  * call it after the filesystem has been loaded.
610  */
611 void SpdkInitializeThread(void)
612 {
613 	struct spdk_thread *thread;
614 
615 	if (g_fs != NULL) {
616 		if (g_sync_args.channel) {
617 			spdk_fs_free_thread_ctx(g_sync_args.channel);
618 		}
619 		thread = spdk_thread_create("spdk_rocksdb", NULL);
620 		spdk_set_thread(thread);
621 		g_sync_args.channel = spdk_fs_alloc_thread_ctx(g_fs);
622 	}
623 }
624 
625 static void
626 fs_load_cb(__attribute__((unused)) void *ctx,
627 	   struct spdk_filesystem *fs, int fserrno)
628 {
629 	if (fserrno == 0) {
630 		g_fs = fs;
631 	}
632 	g_spdk_ready = true;
633 }
634 
635 static void
636 base_bdev_event_cb(enum spdk_bdev_event_type type, __attribute__((unused)) struct spdk_bdev *bdev,
637 		   __attribute__((unused)) void *event_ctx)
638 {
639 	printf("Unsupported bdev event: type %d\n", type);
640 }
641 
642 static void
643 rocksdb_run(__attribute__((unused)) void *arg1)
644 {
645 	int rc;
646 
647 	rc = spdk_bdev_create_bs_dev_ext(g_bdev_name.c_str(), base_bdev_event_cb, NULL,
648 					 &g_bs_dev);
649 	if (rc != 0) {
650 		printf("Could not create blob bdev\n");
651 		spdk_app_stop(0);
652 		exit(1);
653 	}
654 
655 	g_lcore = spdk_env_get_first_core();
656 
657 	printf("using bdev %s\n", g_bdev_name.c_str());
658 	spdk_fs_load(g_bs_dev, __send_request, fs_load_cb, NULL);
659 }
660 
661 static void
662 fs_unload_cb(__attribute__((unused)) void *ctx,
663 	     __attribute__((unused)) int fserrno)
664 {
665 	assert(fserrno == 0);
666 
667 	spdk_app_stop(0);
668 }
669 
670 static void
671 rocksdb_shutdown(void)
672 {
673 	if (g_fs != NULL) {
674 		spdk_fs_unload(g_fs, fs_unload_cb, NULL);
675 	} else {
676 		fs_unload_cb(NULL, 0);
677 	}
678 }
679 
680 static void *
681 initialize_spdk(void *arg)
682 {
683 	struct spdk_app_opts *opts = (struct spdk_app_opts *)arg;
684 	int rc;
685 
686 	rc = spdk_app_start(opts, rocksdb_run, NULL);
687 	/*
688 	 * TODO:  Revisit for case of internal failure of
689 	 * spdk_app_start(), itself.  At this time, it's known
690 	 * the only application's use of spdk_app_stop() passes
691 	 * a zero; i.e. no fail (non-zero) cases so here we
692 	 * assume there was an internal failure and flag it
693 	 * so we can throw an exception.
694 	 */
695 	if (rc) {
696 		g_spdk_start_failure = true;
697 	} else {
698 		spdk_app_fini();
699 		delete opts;
700 	}
701 	pthread_exit(NULL);
702 
703 }
704 
705 SpdkEnv::SpdkEnv(Env *base_env, const std::string &dir, const std::string &conf,
706 		 const std::string &bdev, uint64_t cache_size_in_mb)
707 	: EnvWrapper(base_env), mDirectory(dir), mConfig(conf), mBdev(bdev)
708 {
709 	struct spdk_app_opts *opts = new struct spdk_app_opts;
710 
711 	spdk_app_opts_init(opts, sizeof(*opts));
712 	opts->name = "rocksdb";
713 	opts->json_config_file = mConfig.c_str();
714 	opts->shutdown_cb = rocksdb_shutdown;
715 	opts->tpoint_group_mask = "0x80";
716 
717 	spdk_fs_set_cache_size(cache_size_in_mb);
718 	g_bdev_name = mBdev;
719 
720 	pthread_create(&mSpdkTid, NULL, &initialize_spdk, opts);
721 	while (!g_spdk_ready && !g_spdk_start_failure)
722 		;
723 	if (g_spdk_start_failure) {
724 		delete opts;
725 		throw SpdkAppStartException("spdk_app_start() unable to start rocksdb_run()");
726 	}
727 
728 	SpdkInitializeThread();
729 }
730 
731 SpdkEnv::~SpdkEnv()
732 {
733 	/* This is a workaround for rocksdb test, we close the files if the rocksdb not
734 	 * do the work before the test quit.
735 	 */
736 	if (g_fs != NULL) {
737 		spdk_fs_iter iter;
738 		struct spdk_file *file;
739 
740 		if (!g_sync_args.channel) {
741 			SpdkInitializeThread();
742 		}
743 
744 		iter = spdk_fs_iter_first(g_fs);
745 		while (iter != NULL) {
746 			file = spdk_fs_iter_get_file(iter);
747 			spdk_file_close(file, g_sync_args.channel);
748 			iter = spdk_fs_iter_next(iter);
749 		}
750 	}
751 
752 	spdk_app_start_shutdown();
753 	pthread_join(mSpdkTid, NULL);
754 }
755 
756 Env *NewSpdkEnv(Env *base_env, const std::string &dir, const std::string &conf,
757 		const std::string &bdev, uint64_t cache_size_in_mb)
758 {
759 	try {
760 		SpdkEnv *spdk_env = new SpdkEnv(base_env, dir, conf, bdev, cache_size_in_mb);
761 		if (g_fs != NULL) {
762 			return spdk_env;
763 		} else {
764 			delete spdk_env;
765 			return NULL;
766 		}
767 	} catch (SpdkAppStartException &e) {
768 		SPDK_ERRLOG("NewSpdkEnv: exception caught: %s", e.what());
769 		return NULL;
770 	} catch (...) {
771 		SPDK_ERRLOG("NewSpdkEnv: default exception caught");
772 		return NULL;
773 	}
774 }
775 
776 } // namespace rocksdb
777