xref: /spdk/lib/rocksdb/env_spdk.cc (revision a8d21b9b550dde7d3e7ffc0cd1171528a136165f)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2017 Intel Corporation.
3  *   All rights reserved.
4  */
5 
6 #include "rocksdb/env.h"
7 #include <set>
8 #include <iostream>
9 #include <stdexcept>
10 
11 extern "C" {
12 #include "spdk/env.h"
13 #include "spdk/event.h"
14 #include "spdk/blob.h"
15 #include "spdk/blobfs.h"
16 #include "spdk/blob_bdev.h"
17 #include "spdk/log.h"
18 #include "spdk/thread.h"
19 #include "spdk/bdev.h"
20 }
21 
22 namespace rocksdb
23 {
24 
/* Filesystem handle; assigned by fs_load_cb() when the load succeeds, NULL otherwise. */
struct spdk_filesystem *g_fs = NULL;
/* Blobstore device created by rocksdb_run() and handed to spdk_fs_load(). */
struct spdk_bs_dev *g_bs_dev = NULL;
/* Core that __send_request() directs its events to; set in rocksdb_run(). */
uint32_t g_lcore = 0;
/* Name of the bdev to open; assigned by the SpdkEnv constructor. */
std::string g_bdev_name;
/* Raised by fs_load_cb() once the load attempt has finished, whether or not it succeeded. */
volatile bool g_spdk_ready = false;
/* Raised by initialize_spdk() when spdk_app_start() returns non-zero. */
volatile bool g_spdk_start_failure = false;

/* Defined further down in this file; needed here by the SpdkThreadCtx constructor. */
void SpdkInitializeThread(void);
33 
34 class SpdkThreadCtx
35 {
36 public:
37 	struct spdk_fs_thread_ctx *channel;
38 
39 	SpdkThreadCtx(void) : channel(NULL)
40 	{
41 		SpdkInitializeThread();
42 	}
43 
44 	~SpdkThreadCtx(void)
45 	{
46 		if (channel) {
47 			spdk_fs_free_thread_ctx(channel);
48 			channel = NULL;
49 		}
50 	}
51 
52 private:
53 	SpdkThreadCtx(const SpdkThreadCtx &);
54 	SpdkThreadCtx &operator=(const SpdkThreadCtx &);
55 };
56 
57 thread_local SpdkThreadCtx g_sync_args;
58 
59 static void
60 set_channel()
61 {
62 	if (g_fs != NULL && g_sync_args.channel == NULL) {
63 		g_sync_args.channel = spdk_fs_alloc_thread_ctx(g_fs);
64 	}
65 }
66 
67 static void
68 __call_fn(void *arg1, void *arg2)
69 {
70 	fs_request_fn fn;
71 
72 	fn = (fs_request_fn)arg1;
73 	fn(arg2);
74 }
75 
76 static void
77 __send_request(fs_request_fn fn, void *arg)
78 {
79 	struct spdk_event *event;
80 
81 	event = spdk_event_allocate(g_lcore, __call_fn, (void *)fn, arg);
82 	spdk_event_call(event);
83 }
84 
/*
 * Turn a path handed to the Env into the name used inside BlobFS.
 *
 * The first mount_directory.length() characters of input are dropped
 * (callers are responsible for checking that they really are the mount
 * directory).  The remainder is normalized so that it
 *   - starts with exactly one '/',
 *   - contains no runs of consecutive '/',
 *   - does not end with '/'.
 *
 * An input that is not longer than mount_directory, or whose remainder
 * consists only of '/', yields an empty string.  The original version
 * threw std::out_of_range for a too-short input and indexed an empty
 * string when input was exactly the mount directory.
 */
static std::string
sanitize_path(const std::string &input, const std::string &mount_directory)
{
	std::string name;
	const size_t skip = mount_directory.length();

	if (input.length() <= skip) {
		/* Nothing left after the mount directory prefix. */
		return name;
	}

	name.reserve(input.length() - skip + 1);
	/* Always begin with a single slash, whether or not the remainder has one. */
	name.push_back('/');
	for (size_t i = skip; i < input.length(); i++) {
		const char c = input[i];

		if (c == '/' && name.back() == '/') {
			/* Collapse repeated separators. */
			continue;
		}
		name.push_back(c);
	}

	/* name holds at least the leading '/', so back() is always valid here. */
	if (name.back() == '/') {
		name.pop_back();
	}
	return name;
}
116 
117 class SpdkSequentialFile : public SequentialFile
118 {
119 	struct spdk_file *mFile;
120 	uint64_t mOffset;
121 public:
122 	SpdkSequentialFile(struct spdk_file *file) : mFile(file), mOffset(0) {}
123 	virtual ~SpdkSequentialFile();
124 
125 	virtual Status Read(size_t n, Slice *result, char *scratch) override;
126 	virtual Status Skip(uint64_t n) override;
127 	virtual Status InvalidateCache(size_t offset, size_t length) override;
128 };
129 
130 SpdkSequentialFile::~SpdkSequentialFile(void)
131 {
132 	set_channel();
133 	spdk_file_close(mFile, g_sync_args.channel);
134 }
135 
136 Status
137 SpdkSequentialFile::Read(size_t n, Slice *result, char *scratch)
138 {
139 	int64_t ret;
140 
141 	set_channel();
142 	ret = spdk_file_read(mFile, g_sync_args.channel, scratch, mOffset, n);
143 	if (ret >= 0) {
144 		mOffset += ret;
145 		*result = Slice(scratch, ret);
146 		return Status::OK();
147 	} else {
148 		errno = -ret;
149 		return Status::IOError(spdk_file_get_name(mFile), strerror(errno));
150 	}
151 }
152 
153 Status
154 SpdkSequentialFile::Skip(uint64_t n)
155 {
156 	mOffset += n;
157 	return Status::OK();
158 }
159 
160 Status
161 SpdkSequentialFile::InvalidateCache(__attribute__((unused)) size_t offset,
162 				    __attribute__((unused)) size_t length)
163 {
164 	return Status::OK();
165 }
166 
167 class SpdkRandomAccessFile : public RandomAccessFile
168 {
169 	struct spdk_file *mFile;
170 public:
171 	SpdkRandomAccessFile(struct spdk_file *file) : mFile(file) {}
172 	virtual ~SpdkRandomAccessFile();
173 
174 	virtual Status Read(uint64_t offset, size_t n, Slice *result, char *scratch) const override;
175 	virtual Status InvalidateCache(size_t offset, size_t length) override;
176 };
177 
178 SpdkRandomAccessFile::~SpdkRandomAccessFile(void)
179 {
180 	set_channel();
181 	spdk_file_close(mFile, g_sync_args.channel);
182 }
183 
184 Status
185 SpdkRandomAccessFile::Read(uint64_t offset, size_t n, Slice *result, char *scratch) const
186 {
187 	int64_t rc;
188 
189 	set_channel();
190 	rc = spdk_file_read(mFile, g_sync_args.channel, scratch, offset, n);
191 	if (rc >= 0) {
192 		*result = Slice(scratch, n);
193 		return Status::OK();
194 	} else {
195 		errno = -rc;
196 		return Status::IOError(spdk_file_get_name(mFile), strerror(errno));
197 	}
198 }
199 
200 Status
201 SpdkRandomAccessFile::InvalidateCache(__attribute__((unused)) size_t offset,
202 				      __attribute__((unused)) size_t length)
203 {
204 	return Status::OK();
205 }
206 
207 class SpdkWritableFile : public WritableFile
208 {
209 	struct spdk_file *mFile;
210 	uint64_t mSize;
211 
212 public:
213 	SpdkWritableFile(struct spdk_file *file) : mFile(file), mSize(0) {}
214 	~SpdkWritableFile()
215 	{
216 		if (mFile != NULL) {
217 			Close();
218 		}
219 	}
220 
221 	virtual void SetIOPriority(Env::IOPriority pri)
222 	{
223 		if (pri == Env::IO_HIGH) {
224 			spdk_file_set_priority(mFile, SPDK_FILE_PRIORITY_HIGH);
225 		}
226 	}
227 
228 	virtual Status Truncate(uint64_t size) override
229 	{
230 		int rc;
231 
232 		set_channel();
233 		rc = spdk_file_truncate(mFile, g_sync_args.channel, size);
234 		if (!rc) {
235 			mSize = size;
236 			return Status::OK();
237 		} else {
238 			errno = -rc;
239 			return Status::IOError(spdk_file_get_name(mFile), strerror(errno));
240 		}
241 	}
242 	virtual Status Close() override
243 	{
244 		set_channel();
245 		spdk_file_close(mFile, g_sync_args.channel);
246 		mFile = NULL;
247 		return Status::OK();
248 	}
249 	virtual Status Append(const Slice &data) override;
250 	virtual Status Flush() override
251 	{
252 		return Status::OK();
253 	}
254 	virtual Status Sync() override
255 	{
256 		int rc;
257 
258 		set_channel();
259 		rc = spdk_file_sync(mFile, g_sync_args.channel);
260 		if (!rc) {
261 			return Status::OK();
262 		} else {
263 			errno = -rc;
264 			return Status::IOError(spdk_file_get_name(mFile), strerror(errno));
265 		}
266 	}
267 	virtual Status Fsync() override
268 	{
269 		int rc;
270 
271 		set_channel();
272 		rc = spdk_file_sync(mFile, g_sync_args.channel);
273 		if (!rc) {
274 			return Status::OK();
275 		} else {
276 			errno = -rc;
277 			return Status::IOError(spdk_file_get_name(mFile), strerror(errno));
278 		}
279 	}
280 	virtual bool IsSyncThreadSafe() const override
281 	{
282 		return true;
283 	}
284 	virtual uint64_t GetFileSize() override
285 	{
286 		return mSize;
287 	}
288 	virtual Status InvalidateCache(__attribute__((unused)) size_t offset,
289 				       __attribute__((unused)) size_t length) override
290 	{
291 		return Status::OK();
292 	}
293 	virtual Status Allocate(uint64_t offset, uint64_t len) override
294 	{
295 		int rc;
296 
297 		set_channel();
298 		rc = spdk_file_truncate(mFile, g_sync_args.channel, offset + len);
299 		if (!rc) {
300 			return Status::OK();
301 		} else {
302 			errno = -rc;
303 			return Status::IOError(spdk_file_get_name(mFile), strerror(errno));
304 		}
305 	}
306 	virtual Status RangeSync(__attribute__((unused)) uint64_t offset,
307 				 __attribute__((unused)) uint64_t nbytes) override
308 	{
309 		int rc;
310 
311 		/*
312 		 * SPDK BlobFS does not have a range sync operation yet, so just sync
313 		 *  the whole file.
314 		 */
315 		set_channel();
316 		rc = spdk_file_sync(mFile, g_sync_args.channel);
317 		if (!rc) {
318 			return Status::OK();
319 		} else {
320 			errno = -rc;
321 			return Status::IOError(spdk_file_get_name(mFile), strerror(errno));
322 		}
323 	}
324 	virtual size_t GetUniqueId(char *id, size_t max_size) const override
325 	{
326 		int rc;
327 
328 		rc = spdk_file_get_id(mFile, id, max_size);
329 		if (rc < 0) {
330 			return 0;
331 		} else {
332 			return rc;
333 		}
334 	}
335 };
336 
337 Status
338 SpdkWritableFile::Append(const Slice &data)
339 {
340 	int64_t rc;
341 
342 	set_channel();
343 	rc = spdk_file_write(mFile, g_sync_args.channel, (void *)data.data(), mSize, data.size());
344 	if (rc >= 0) {
345 		mSize += data.size();
346 		return Status::OK();
347 	} else {
348 		errno = -rc;
349 		return Status::IOError(spdk_file_get_name(mFile), strerror(errno));
350 	}
351 }
352 
353 class SpdkDirectory : public Directory
354 {
355 public:
356 	SpdkDirectory() {}
357 	~SpdkDirectory() {}
358 	Status Fsync() override
359 	{
360 		return Status::OK();
361 	}
362 };
363 
/*
 * Thrown by the SpdkEnv constructor when the SPDK application fails to
 * start; what() returns the message given at construction.
 */
class SpdkAppStartException : public std::runtime_error
{
public:
	SpdkAppStartException(std::string message)
		: std::runtime_error(message)
	{
	}
};
369 
370 class SpdkEnv : public EnvWrapper
371 {
372 private:
373 	pthread_t mSpdkTid;
374 	std::string mDirectory;
375 	std::string mConfig;
376 	std::string mBdev;
377 
378 public:
379 	SpdkEnv(Env *base_env, const std::string &dir, const std::string &conf,
380 		const std::string &bdev, uint64_t cache_size_in_mb);
381 
382 	virtual ~SpdkEnv();
383 
384 	virtual Status NewSequentialFile(const std::string &fname,
385 					 std::unique_ptr<SequentialFile> *result,
386 					 const EnvOptions &options) override
387 	{
388 		if (fname.compare(0, mDirectory.length(), mDirectory) == 0) {
389 			struct spdk_file *file;
390 			int rc;
391 
392 			std::string name = sanitize_path(fname, mDirectory);
393 			set_channel();
394 			rc = spdk_fs_open_file(g_fs, g_sync_args.channel,
395 					       name.c_str(), 0, &file);
396 			if (rc == 0) {
397 				result->reset(new SpdkSequentialFile(file));
398 				return Status::OK();
399 			} else {
400 				/* Myrocks engine uses errno(ENOENT) as one
401 				 * special condition, for the purpose to
402 				 * support MySQL, set the errno to right value.
403 				 */
404 				errno = -rc;
405 				return Status::IOError(name, strerror(errno));
406 			}
407 		} else {
408 			return EnvWrapper::NewSequentialFile(fname, result, options);
409 		}
410 	}
411 
412 	virtual Status NewRandomAccessFile(const std::string &fname,
413 					   std::unique_ptr<RandomAccessFile> *result,
414 					   const EnvOptions &options) override
415 	{
416 		if (fname.compare(0, mDirectory.length(), mDirectory) == 0) {
417 			std::string name = sanitize_path(fname, mDirectory);
418 			struct spdk_file *file;
419 			int rc;
420 
421 			set_channel();
422 			rc = spdk_fs_open_file(g_fs, g_sync_args.channel,
423 					       name.c_str(), 0, &file);
424 			if (rc == 0) {
425 				result->reset(new SpdkRandomAccessFile(file));
426 				return Status::OK();
427 			} else {
428 				errno = -rc;
429 				return Status::IOError(name, strerror(errno));
430 			}
431 		} else {
432 			return EnvWrapper::NewRandomAccessFile(fname, result, options);
433 		}
434 	}
435 
436 	virtual Status NewWritableFile(const std::string &fname,
437 				       std::unique_ptr<WritableFile> *result,
438 				       const EnvOptions &options) override
439 	{
440 		if (fname.compare(0, mDirectory.length(), mDirectory) == 0) {
441 			std::string name = sanitize_path(fname, mDirectory);
442 			struct spdk_file *file;
443 			int rc;
444 
445 			set_channel();
446 			rc = spdk_fs_open_file(g_fs, g_sync_args.channel, name.c_str(),
447 					       SPDK_BLOBFS_OPEN_CREATE, &file);
448 			if (rc == 0) {
449 				result->reset(new SpdkWritableFile(file));
450 				return Status::OK();
451 			} else {
452 				errno = -rc;
453 				return Status::IOError(name, strerror(errno));
454 			}
455 		} else {
456 			return EnvWrapper::NewWritableFile(fname, result, options);
457 		}
458 	}
459 
460 	virtual Status ReuseWritableFile(const std::string &fname,
461 					 const std::string &old_fname,
462 					 std::unique_ptr<WritableFile> *result,
463 					 const EnvOptions &options) override
464 	{
465 		return EnvWrapper::ReuseWritableFile(fname, old_fname, result, options);
466 	}
467 
468 	virtual Status NewDirectory(__attribute__((unused)) const std::string &name,
469 				    std::unique_ptr<Directory> *result) override
470 	{
471 		result->reset(new SpdkDirectory());
472 		return Status::OK();
473 	}
474 	virtual Status FileExists(const std::string &fname) override
475 	{
476 		struct spdk_file_stat stat;
477 		int rc;
478 		std::string name = sanitize_path(fname, mDirectory);
479 
480 		set_channel();
481 		rc = spdk_fs_file_stat(g_fs, g_sync_args.channel, name.c_str(), &stat);
482 		if (rc == 0) {
483 			return Status::OK();
484 		}
485 		return EnvWrapper::FileExists(fname);
486 	}
487 	virtual Status RenameFile(const std::string &src, const std::string &t) override
488 	{
489 		int rc;
490 		std::string src_name = sanitize_path(src, mDirectory);
491 		std::string target_name = sanitize_path(t, mDirectory);
492 
493 		set_channel();
494 		rc = spdk_fs_rename_file(g_fs, g_sync_args.channel,
495 					 src_name.c_str(), target_name.c_str());
496 		if (rc == -ENOENT) {
497 			return EnvWrapper::RenameFile(src, t);
498 		}
499 		return Status::OK();
500 	}
501 	virtual Status LinkFile(__attribute__((unused)) const std::string &src,
502 				__attribute__((unused)) const std::string &t) override
503 	{
504 		return Status::NotSupported("SpdkEnv does not support LinkFile");
505 	}
506 	virtual Status GetFileSize(const std::string &fname, uint64_t *size) override
507 	{
508 		struct spdk_file_stat stat;
509 		int rc;
510 		std::string name = sanitize_path(fname, mDirectory);
511 
512 		set_channel();
513 		rc = spdk_fs_file_stat(g_fs, g_sync_args.channel, name.c_str(), &stat);
514 		if (rc == -ENOENT) {
515 			return EnvWrapper::GetFileSize(fname, size);
516 		}
517 		*size = stat.size;
518 		return Status::OK();
519 	}
520 	virtual Status DeleteFile(const std::string &fname) override
521 	{
522 		int rc;
523 		std::string name = sanitize_path(fname, mDirectory);
524 
525 		set_channel();
526 		rc = spdk_fs_delete_file(g_fs, g_sync_args.channel, name.c_str());
527 		if (rc == -ENOENT) {
528 			return EnvWrapper::DeleteFile(fname);
529 		}
530 		return Status::OK();
531 	}
532 	virtual Status LockFile(const std::string &fname, FileLock **lock) override
533 	{
534 		std::string name = sanitize_path(fname, mDirectory);
535 		int64_t rc;
536 
537 		set_channel();
538 		rc = spdk_fs_open_file(g_fs, g_sync_args.channel, name.c_str(),
539 				       SPDK_BLOBFS_OPEN_CREATE, (struct spdk_file **)lock);
540 		if (!rc) {
541 			return Status::OK();
542 		} else {
543 			errno = -rc;
544 			return Status::IOError(name, strerror(errno));
545 		}
546 	}
547 	virtual Status UnlockFile(FileLock *lock) override
548 	{
549 		set_channel();
550 		spdk_file_close((struct spdk_file *)lock, g_sync_args.channel);
551 		return Status::OK();
552 	}
553 	virtual Status GetChildren(const std::string &dir,
554 				   std::vector<std::string> *result) override
555 	{
556 		std::string::size_type pos;
557 		std::set<std::string> dir_and_file_set;
558 		std::string full_path;
559 		std::string filename;
560 		std::string dir_name;
561 
562 		if (dir.find("archive") != std::string::npos) {
563 			return Status::OK();
564 		}
565 		if (dir.compare(0, mDirectory.length(), mDirectory) == 0) {
566 			spdk_fs_iter iter;
567 			struct spdk_file *file;
568 			dir_name = sanitize_path(dir, mDirectory);
569 
570 			iter = spdk_fs_iter_first(g_fs);
571 			while (iter != NULL) {
572 				file = spdk_fs_iter_get_file(iter);
573 				full_path = spdk_file_get_name(file);
574 				if (strncmp(dir_name.c_str(), full_path.c_str(), dir_name.length())) {
575 					iter = spdk_fs_iter_next(iter);
576 					continue;
577 				}
578 				pos = full_path.find("/", dir_name.length() + 1);
579 
580 				if (pos != std::string::npos) {
581 					filename = full_path.substr(dir_name.length() + 1, pos - dir_name.length() - 1);
582 				} else {
583 					filename = full_path.substr(dir_name.length() + 1);
584 				}
585 				dir_and_file_set.insert(filename);
586 				iter = spdk_fs_iter_next(iter);
587 			}
588 
589 			for (auto &s : dir_and_file_set) {
590 				result->push_back(s);
591 			}
592 
593 			result->push_back(".");
594 			result->push_back("..");
595 
596 			return Status::OK();
597 		}
598 		return EnvWrapper::GetChildren(dir, result);
599 	}
600 };
601 
602 /* The thread local constructor doesn't work for the main thread, since
603  * the filesystem hasn't been loaded yet.  So we break out this
604  * SpdkInitializeThread function, so that the main thread can explicitly
605  * call it after the filesystem has been loaded.
606  */
607 void SpdkInitializeThread(void)
608 {
609 	if (g_fs != NULL) {
610 		if (g_sync_args.channel) {
611 			spdk_fs_free_thread_ctx(g_sync_args.channel);
612 		}
613 		g_sync_args.channel = spdk_fs_alloc_thread_ctx(g_fs);
614 	}
615 }
616 
617 static void
618 fs_load_cb(__attribute__((unused)) void *ctx,
619 	   struct spdk_filesystem *fs, int fserrno)
620 {
621 	if (fserrno == 0) {
622 		g_fs = fs;
623 	}
624 	g_spdk_ready = true;
625 }
626 
627 static void
628 base_bdev_event_cb(enum spdk_bdev_event_type type, __attribute__((unused)) struct spdk_bdev *bdev,
629 		   __attribute__((unused)) void *event_ctx)
630 {
631 	printf("Unsupported bdev event: type %d\n", type);
632 }
633 
634 static void
635 rocksdb_run(__attribute__((unused)) void *arg1)
636 {
637 	int rc;
638 
639 	rc = spdk_bdev_create_bs_dev_ext(g_bdev_name.c_str(), base_bdev_event_cb, NULL,
640 					 &g_bs_dev);
641 	if (rc != 0) {
642 		printf("Could not create blob bdev\n");
643 		spdk_app_stop(0);
644 		exit(1);
645 	}
646 
647 	g_lcore = spdk_env_get_first_core();
648 
649 	printf("using bdev %s\n", g_bdev_name.c_str());
650 	spdk_fs_load(g_bs_dev, __send_request, fs_load_cb, NULL);
651 }
652 
653 static void
654 fs_unload_cb(__attribute__((unused)) void *ctx,
655 	     __attribute__((unused)) int fserrno)
656 {
657 	assert(fserrno == 0);
658 
659 	spdk_app_stop(0);
660 }
661 
662 static void
663 rocksdb_shutdown(void)
664 {
665 	if (g_fs != NULL) {
666 		spdk_fs_unload(g_fs, fs_unload_cb, NULL);
667 	} else {
668 		fs_unload_cb(NULL, 0);
669 	}
670 }
671 
672 static void *
673 initialize_spdk(void *arg)
674 {
675 	struct spdk_app_opts *opts = (struct spdk_app_opts *)arg;
676 	int rc;
677 
678 	rc = spdk_app_start(opts, rocksdb_run, NULL);
679 	/*
680 	 * TODO:  Revisit for case of internal failure of
681 	 * spdk_app_start(), itself.  At this time, it's known
682 	 * the only application's use of spdk_app_stop() passes
683 	 * a zero; i.e. no fail (non-zero) cases so here we
684 	 * assume there was an internal failure and flag it
685 	 * so we can throw an exception.
686 	 */
687 	if (rc) {
688 		g_spdk_start_failure = true;
689 	} else {
690 		spdk_app_fini();
691 		delete opts;
692 	}
693 	pthread_exit(NULL);
694 
695 }
696 
697 SpdkEnv::SpdkEnv(Env *base_env, const std::string &dir, const std::string &conf,
698 		 const std::string &bdev, uint64_t cache_size_in_mb)
699 	: EnvWrapper(base_env), mDirectory(dir), mConfig(conf), mBdev(bdev)
700 {
701 	struct spdk_app_opts *opts = new struct spdk_app_opts;
702 
703 	spdk_app_opts_init(opts, sizeof(*opts));
704 	opts->name = "rocksdb";
705 	opts->json_config_file = mConfig.c_str();
706 	opts->shutdown_cb = rocksdb_shutdown;
707 	opts->tpoint_group_mask = "0x80";
708 
709 	spdk_fs_set_cache_size(cache_size_in_mb);
710 	g_bdev_name = mBdev;
711 
712 	pthread_create(&mSpdkTid, NULL, &initialize_spdk, opts);
713 	while (!g_spdk_ready && !g_spdk_start_failure)
714 		;
715 	if (g_spdk_start_failure) {
716 		delete opts;
717 		throw SpdkAppStartException("spdk_app_start() unable to start rocksdb_run()");
718 	}
719 
720 	SpdkInitializeThread();
721 }
722 
723 SpdkEnv::~SpdkEnv()
724 {
725 	/* This is a workaround for rocksdb test, we close the files if the rocksdb not
726 	 * do the work before the test quit.
727 	 */
728 	if (g_fs != NULL) {
729 		spdk_fs_iter iter;
730 		struct spdk_file *file;
731 
732 		if (!g_sync_args.channel) {
733 			SpdkInitializeThread();
734 		}
735 
736 		iter = spdk_fs_iter_first(g_fs);
737 		while (iter != NULL) {
738 			file = spdk_fs_iter_get_file(iter);
739 			spdk_file_close(file, g_sync_args.channel);
740 			iter = spdk_fs_iter_next(iter);
741 		}
742 	}
743 
744 	spdk_app_start_shutdown();
745 	pthread_join(mSpdkTid, NULL);
746 }
747 
748 Env *NewSpdkEnv(Env *base_env, const std::string &dir, const std::string &conf,
749 		const std::string &bdev, uint64_t cache_size_in_mb)
750 {
751 	try {
752 		SpdkEnv *spdk_env = new SpdkEnv(base_env, dir, conf, bdev, cache_size_in_mb);
753 		if (g_fs != NULL) {
754 			return spdk_env;
755 		} else {
756 			delete spdk_env;
757 			return NULL;
758 		}
759 	} catch (SpdkAppStartException &e) {
760 		SPDK_ERRLOG("NewSpdkEnv: exception caught: %s", e.what());
761 		return NULL;
762 	} catch (...) {
763 		SPDK_ERRLOG("NewSpdkEnv: default exception caught");
764 		return NULL;
765 	}
766 }
767 
768 } // namespace rocksdb
769