xref: /spdk/lib/rocksdb/env_spdk.cc (revision 12fbe739a31b09aff0d05f354d4f3bbef99afc55)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2017 Intel Corporation.
3  *   All rights reserved.
4  */
5 
6 #include "rocksdb/env.h"
7 #include <set>
8 #include <iostream>
9 #include <stdexcept>
10 
11 extern "C" {
12 #include "spdk/env.h"
13 #include "spdk/event.h"
14 #include "spdk/blob.h"
15 #include "spdk/blobfs.h"
16 #include "spdk/blob_bdev.h"
17 #include "spdk/log.h"
18 #include "spdk/thread.h"
19 #include "spdk/bdev.h"
20 }
21 
22 namespace rocksdb
23 {
24 
25 struct spdk_filesystem *g_fs = NULL;
26 struct spdk_bs_dev *g_bs_dev;
27 uint32_t g_lcore = 0;
28 std::string g_bdev_name;
29 volatile bool g_spdk_ready = false;
30 volatile bool g_spdk_start_failure = false;
31 
32 void SpdkInitializeThread(void);
33 
34 class SpdkThreadCtx
35 {
36 public:
37 	struct spdk_fs_thread_ctx *channel;
38 
39 	SpdkThreadCtx(void) : channel(NULL)
40 	{
41 		SpdkInitializeThread();
42 	}
43 
44 	~SpdkThreadCtx(void)
45 	{
46 		if (channel) {
47 			spdk_fs_free_thread_ctx(channel);
48 			channel = NULL;
49 		}
50 	}
51 
52 private:
53 	SpdkThreadCtx(const SpdkThreadCtx &);
54 	SpdkThreadCtx &operator=(const SpdkThreadCtx &);
55 };
56 
57 thread_local SpdkThreadCtx g_sync_args;
58 
59 static void
60 set_channel()
61 {
62 	if (g_fs != NULL && g_sync_args.channel == NULL) {
63 		g_sync_args.channel = spdk_fs_alloc_thread_ctx(g_fs);
64 	}
65 }
66 
67 static void
68 __call_fn(void *arg1, void *arg2)
69 {
70 	fs_request_fn fn;
71 
72 	fn = (fs_request_fn)arg1;
73 	fn(arg2);
74 }
75 
76 static void
77 __send_request(fs_request_fn fn, void *arg)
78 {
79 	struct spdk_event *event;
80 
81 	event = spdk_event_allocate(g_lcore, __call_fn, (void *)fn, arg);
82 	spdk_event_call(event);
83 }
84 
85 static std::string
86 sanitize_path(const std::string &input, const std::string &mount_directory)
87 {
88 	int index = 0;
89 	std::string name;
90 	std::string input_tmp;
91 
92 	input_tmp = input.substr(mount_directory.length(), input.length());
93 	for (const char &c : input_tmp) {
94 		if (index == 0) {
95 			if (c != '/') {
96 				name = name.insert(index, 1, '/');
97 				index++;
98 			}
99 			name = name.insert(index, 1, c);
100 			index++;
101 		} else {
102 			if (name[index - 1] == '/' && c == '/') {
103 				continue;
104 			} else {
105 				name = name.insert(index, 1, c);
106 				index++;
107 			}
108 		}
109 	}
110 
111 	if (name[name.size() - 1] == '/') {
112 		name = name.erase(name.size() - 1, 1);
113 	}
114 	return name;
115 }
116 
117 class SpdkSequentialFile : public SequentialFile
118 {
119 	struct spdk_file *mFile;
120 	uint64_t mOffset;
121 public:
122 	SpdkSequentialFile(struct spdk_file *file) : mFile(file), mOffset(0) {}
123 	virtual ~SpdkSequentialFile();
124 
125 	virtual Status Read(size_t n, Slice *result, char *scratch) override;
126 	virtual Status Skip(uint64_t n) override;
127 	virtual Status InvalidateCache(size_t offset, size_t length) override;
128 };
129 
130 SpdkSequentialFile::~SpdkSequentialFile(void)
131 {
132 	set_channel();
133 	spdk_file_close(mFile, g_sync_args.channel);
134 }
135 
136 Status
137 SpdkSequentialFile::Read(size_t n, Slice *result, char *scratch)
138 {
139 	int64_t ret;
140 
141 	set_channel();
142 	ret = spdk_file_read(mFile, g_sync_args.channel, scratch, mOffset, n);
143 	if (ret >= 0) {
144 		mOffset += ret;
145 		*result = Slice(scratch, ret);
146 		return Status::OK();
147 	} else {
148 		errno = -ret;
149 		return Status::IOError(spdk_file_get_name(mFile), strerror(errno));
150 	}
151 }
152 
153 Status
154 SpdkSequentialFile::Skip(uint64_t n)
155 {
156 	mOffset += n;
157 	return Status::OK();
158 }
159 
160 Status
161 SpdkSequentialFile::InvalidateCache(__attribute__((unused)) size_t offset,
162 				    __attribute__((unused)) size_t length)
163 {
164 	return Status::OK();
165 }
166 
167 class SpdkRandomAccessFile : public RandomAccessFile
168 {
169 	struct spdk_file *mFile;
170 public:
171 	SpdkRandomAccessFile(struct spdk_file *file) : mFile(file) {}
172 	virtual ~SpdkRandomAccessFile();
173 
174 	virtual Status Read(uint64_t offset, size_t n, Slice *result, char *scratch) const override;
175 	virtual Status InvalidateCache(size_t offset, size_t length) override;
176 };
177 
178 SpdkRandomAccessFile::~SpdkRandomAccessFile(void)
179 {
180 	set_channel();
181 	spdk_file_close(mFile, g_sync_args.channel);
182 }
183 
184 Status
185 SpdkRandomAccessFile::Read(uint64_t offset, size_t n, Slice *result, char *scratch) const
186 {
187 	int64_t rc;
188 
189 	set_channel();
190 	rc = spdk_file_read(mFile, g_sync_args.channel, scratch, offset, n);
191 	if (rc >= 0) {
192 		*result = Slice(scratch, rc);
193 		return Status::OK();
194 	} else {
195 		errno = -rc;
196 		return Status::IOError(spdk_file_get_name(mFile), strerror(errno));
197 	}
198 }
199 
200 Status
201 SpdkRandomAccessFile::InvalidateCache(__attribute__((unused)) size_t offset,
202 				      __attribute__((unused)) size_t length)
203 {
204 	return Status::OK();
205 }
206 
207 class SpdkWritableFile : public WritableFile
208 {
209 	struct spdk_file *mFile;
210 	uint64_t mSize;
211 
212 public:
213 	SpdkWritableFile(struct spdk_file *file) : mFile(file), mSize(0) {}
214 	~SpdkWritableFile()
215 	{
216 		if (mFile != NULL) {
217 			Close();
218 		}
219 	}
220 
221 	virtual void SetIOPriority(Env::IOPriority pri)
222 	{
223 		if (pri == Env::IO_HIGH) {
224 			spdk_file_set_priority(mFile, SPDK_FILE_PRIORITY_HIGH);
225 		}
226 	}
227 
228 	virtual Status Truncate(uint64_t size) override
229 	{
230 		int rc;
231 
232 		set_channel();
233 		rc = spdk_file_truncate(mFile, g_sync_args.channel, size);
234 		if (!rc) {
235 			mSize = size;
236 			return Status::OK();
237 		} else {
238 			errno = -rc;
239 			return Status::IOError(spdk_file_get_name(mFile), strerror(errno));
240 		}
241 	}
242 	virtual Status Close() override
243 	{
244 		set_channel();
245 		spdk_file_close(mFile, g_sync_args.channel);
246 		mFile = NULL;
247 		return Status::OK();
248 	}
249 	using WritableFile::Append;
250 	virtual Status Append(const Slice &data) override;
251 	virtual Status Flush() override
252 	{
253 		return Status::OK();
254 	}
255 	virtual Status Sync() override
256 	{
257 		int rc;
258 
259 		set_channel();
260 		rc = spdk_file_sync(mFile, g_sync_args.channel);
261 		if (!rc) {
262 			return Status::OK();
263 		} else {
264 			errno = -rc;
265 			return Status::IOError(spdk_file_get_name(mFile), strerror(errno));
266 		}
267 	}
268 	virtual Status Fsync() override
269 	{
270 		int rc;
271 
272 		set_channel();
273 		rc = spdk_file_sync(mFile, g_sync_args.channel);
274 		if (!rc) {
275 			return Status::OK();
276 		} else {
277 			errno = -rc;
278 			return Status::IOError(spdk_file_get_name(mFile), strerror(errno));
279 		}
280 	}
281 	virtual bool IsSyncThreadSafe() const override
282 	{
283 		return true;
284 	}
285 	virtual uint64_t GetFileSize() override
286 	{
287 		return mSize;
288 	}
289 	virtual Status InvalidateCache(__attribute__((unused)) size_t offset,
290 				       __attribute__((unused)) size_t length) override
291 	{
292 		return Status::OK();
293 	}
294 	virtual Status Allocate(uint64_t offset, uint64_t len) override
295 	{
296 		int rc;
297 
298 		set_channel();
299 		rc = spdk_file_truncate(mFile, g_sync_args.channel, offset + len);
300 		if (!rc) {
301 			return Status::OK();
302 		} else {
303 			errno = -rc;
304 			return Status::IOError(spdk_file_get_name(mFile), strerror(errno));
305 		}
306 	}
307 	virtual Status RangeSync(__attribute__((unused)) uint64_t offset,
308 				 __attribute__((unused)) uint64_t nbytes) override
309 	{
310 		int rc;
311 
312 		/*
313 		 * SPDK BlobFS does not have a range sync operation yet, so just sync
314 		 *  the whole file.
315 		 */
316 		set_channel();
317 		rc = spdk_file_sync(mFile, g_sync_args.channel);
318 		if (!rc) {
319 			return Status::OK();
320 		} else {
321 			errno = -rc;
322 			return Status::IOError(spdk_file_get_name(mFile), strerror(errno));
323 		}
324 	}
325 	virtual size_t GetUniqueId(char *id, size_t max_size) const override
326 	{
327 		int rc;
328 
329 		rc = spdk_file_get_id(mFile, id, max_size);
330 		if (rc < 0) {
331 			return 0;
332 		} else {
333 			return rc;
334 		}
335 	}
336 };
337 
338 Status
339 SpdkWritableFile::Append(const Slice &data)
340 {
341 	int64_t rc;
342 
343 	set_channel();
344 	rc = spdk_file_write(mFile, g_sync_args.channel, (void *)data.data(), mSize, data.size());
345 	if (rc >= 0) {
346 		mSize += data.size();
347 		return Status::OK();
348 	} else {
349 		errno = -rc;
350 		return Status::IOError(spdk_file_get_name(mFile), strerror(errno));
351 	}
352 }
353 
354 class SpdkDirectory : public Directory
355 {
356 public:
357 	SpdkDirectory() {}
358 	~SpdkDirectory() {}
359 	Status Fsync() override
360 	{
361 		return Status::OK();
362 	}
363 };
364 
365 class SpdkAppStartException : public std::runtime_error
366 {
367 public:
368 	SpdkAppStartException(std::string mess): std::runtime_error(mess) {}
369 };
370 
371 class SpdkEnv : public EnvWrapper
372 {
373 private:
374 	pthread_t mSpdkTid;
375 	std::string mDirectory;
376 	std::string mConfig;
377 	std::string mBdev;
378 
379 public:
380 	SpdkEnv(Env *base_env, const std::string &dir, const std::string &conf,
381 		const std::string &bdev, uint64_t cache_size_in_mb);
382 
383 	virtual ~SpdkEnv();
384 
385 	virtual Status NewSequentialFile(const std::string &fname,
386 					 std::unique_ptr<SequentialFile> *result,
387 					 const EnvOptions &options) override
388 	{
389 		if (fname.compare(0, mDirectory.length(), mDirectory) == 0) {
390 			struct spdk_file *file;
391 			int rc;
392 
393 			std::string name = sanitize_path(fname, mDirectory);
394 			set_channel();
395 			rc = spdk_fs_open_file(g_fs, g_sync_args.channel,
396 					       name.c_str(), 0, &file);
397 			if (rc == 0) {
398 				result->reset(new SpdkSequentialFile(file));
399 				return Status::OK();
400 			} else {
401 				/* Myrocks engine uses errno(ENOENT) as one
402 				 * special condition, for the purpose to
403 				 * support MySQL, set the errno to right value.
404 				 */
405 				errno = -rc;
406 				return Status::IOError(name, strerror(errno));
407 			}
408 		} else {
409 			return EnvWrapper::NewSequentialFile(fname, result, options);
410 		}
411 	}
412 
413 	virtual Status NewRandomAccessFile(const std::string &fname,
414 					   std::unique_ptr<RandomAccessFile> *result,
415 					   const EnvOptions &options) override
416 	{
417 		if (fname.compare(0, mDirectory.length(), mDirectory) == 0) {
418 			std::string name = sanitize_path(fname, mDirectory);
419 			struct spdk_file *file;
420 			int rc;
421 
422 			set_channel();
423 			rc = spdk_fs_open_file(g_fs, g_sync_args.channel,
424 					       name.c_str(), 0, &file);
425 			if (rc == 0) {
426 				result->reset(new SpdkRandomAccessFile(file));
427 				return Status::OK();
428 			} else {
429 				errno = -rc;
430 				return Status::IOError(name, strerror(errno));
431 			}
432 		} else {
433 			return EnvWrapper::NewRandomAccessFile(fname, result, options);
434 		}
435 	}
436 
437 	virtual Status NewWritableFile(const std::string &fname,
438 				       std::unique_ptr<WritableFile> *result,
439 				       const EnvOptions &options) override
440 	{
441 		if (fname.compare(0, mDirectory.length(), mDirectory) == 0) {
442 			std::string name = sanitize_path(fname, mDirectory);
443 			struct spdk_file *file;
444 			int rc;
445 
446 			set_channel();
447 			rc = spdk_fs_open_file(g_fs, g_sync_args.channel, name.c_str(),
448 					       SPDK_BLOBFS_OPEN_CREATE, &file);
449 			if (rc == 0) {
450 				result->reset(new SpdkWritableFile(file));
451 				return Status::OK();
452 			} else {
453 				errno = -rc;
454 				return Status::IOError(name, strerror(errno));
455 			}
456 		} else {
457 			return EnvWrapper::NewWritableFile(fname, result, options);
458 		}
459 	}
460 
461 	virtual Status ReuseWritableFile(const std::string &fname,
462 					 const std::string &old_fname,
463 					 std::unique_ptr<WritableFile> *result,
464 					 const EnvOptions &options) override
465 	{
466 		return EnvWrapper::ReuseWritableFile(fname, old_fname, result, options);
467 	}
468 
469 	virtual Status NewDirectory(__attribute__((unused)) const std::string &name,
470 				    std::unique_ptr<Directory> *result) override
471 	{
472 		result->reset(new SpdkDirectory());
473 		return Status::OK();
474 	}
475 	virtual Status FileExists(const std::string &fname) override
476 	{
477 		struct spdk_file_stat stat;
478 		int rc;
479 		std::string name = sanitize_path(fname, mDirectory);
480 
481 		set_channel();
482 		rc = spdk_fs_file_stat(g_fs, g_sync_args.channel, name.c_str(), &stat);
483 		if (rc == 0) {
484 			return Status::OK();
485 		}
486 		return EnvWrapper::FileExists(fname);
487 	}
488 	virtual Status RenameFile(const std::string &src, const std::string &t) override
489 	{
490 		int rc;
491 		std::string src_name = sanitize_path(src, mDirectory);
492 		std::string target_name = sanitize_path(t, mDirectory);
493 
494 		set_channel();
495 		rc = spdk_fs_rename_file(g_fs, g_sync_args.channel,
496 					 src_name.c_str(), target_name.c_str());
497 		if (rc == -ENOENT) {
498 			return EnvWrapper::RenameFile(src, t);
499 		}
500 		return Status::OK();
501 	}
502 	virtual Status LinkFile(__attribute__((unused)) const std::string &src,
503 				__attribute__((unused)) const std::string &t) override
504 	{
505 		return Status::NotSupported("SpdkEnv does not support LinkFile");
506 	}
507 	virtual Status GetFileSize(const std::string &fname, uint64_t *size) override
508 	{
509 		struct spdk_file_stat stat;
510 		int rc;
511 		std::string name = sanitize_path(fname, mDirectory);
512 
513 		set_channel();
514 		rc = spdk_fs_file_stat(g_fs, g_sync_args.channel, name.c_str(), &stat);
515 		if (rc == -ENOENT) {
516 			return EnvWrapper::GetFileSize(fname, size);
517 		}
518 		*size = stat.size;
519 		return Status::OK();
520 	}
521 	virtual Status DeleteFile(const std::string &fname) override
522 	{
523 		int rc;
524 		std::string name = sanitize_path(fname, mDirectory);
525 
526 		set_channel();
527 		rc = spdk_fs_delete_file(g_fs, g_sync_args.channel, name.c_str());
528 		if (rc == -ENOENT) {
529 			return EnvWrapper::DeleteFile(fname);
530 		}
531 		return Status::OK();
532 	}
533 	virtual Status LockFile(const std::string &fname, FileLock **lock) override
534 	{
535 		std::string name = sanitize_path(fname, mDirectory);
536 		int64_t rc;
537 
538 		set_channel();
539 		rc = spdk_fs_open_file(g_fs, g_sync_args.channel, name.c_str(),
540 				       SPDK_BLOBFS_OPEN_CREATE, (struct spdk_file **)lock);
541 		if (!rc) {
542 			return Status::OK();
543 		} else {
544 			errno = -rc;
545 			return Status::IOError(name, strerror(errno));
546 		}
547 	}
548 	virtual Status UnlockFile(FileLock *lock) override
549 	{
550 		set_channel();
551 		spdk_file_close((struct spdk_file *)lock, g_sync_args.channel);
552 		return Status::OK();
553 	}
554 	virtual Status GetChildren(const std::string &dir,
555 				   std::vector<std::string> *result) override
556 	{
557 		std::string::size_type pos;
558 		std::set<std::string> dir_and_file_set;
559 		std::string full_path;
560 		std::string filename;
561 		std::string dir_name;
562 
563 		if (dir.find("archive") != std::string::npos) {
564 			return Status::OK();
565 		}
566 		if (dir.compare(0, mDirectory.length(), mDirectory) == 0) {
567 			spdk_fs_iter iter;
568 			struct spdk_file *file;
569 			dir_name = sanitize_path(dir, mDirectory);
570 
571 			iter = spdk_fs_iter_first(g_fs);
572 			while (iter != NULL) {
573 				file = spdk_fs_iter_get_file(iter);
574 				full_path = spdk_file_get_name(file);
575 				if (strncmp(dir_name.c_str(), full_path.c_str(), dir_name.length())) {
576 					iter = spdk_fs_iter_next(iter);
577 					continue;
578 				}
579 				pos = full_path.find("/", dir_name.length() + 1);
580 
581 				if (pos != std::string::npos) {
582 					filename = full_path.substr(dir_name.length() + 1, pos - dir_name.length() - 1);
583 				} else {
584 					filename = full_path.substr(dir_name.length() + 1);
585 				}
586 				dir_and_file_set.insert(filename);
587 				iter = spdk_fs_iter_next(iter);
588 			}
589 
590 			for (auto &s : dir_and_file_set) {
591 				result->push_back(s);
592 			}
593 
594 			result->push_back(".");
595 			result->push_back("..");
596 
597 			return Status::OK();
598 		}
599 		return EnvWrapper::GetChildren(dir, result);
600 	}
601 };
602 
603 /* The thread local constructor doesn't work for the main thread, since
604  * the filesystem hasn't been loaded yet.  So we break out this
605  * SpdkInitializeThread function, so that the main thread can explicitly
606  * call it after the filesystem has been loaded.
607  */
608 void SpdkInitializeThread(void)
609 {
610 	if (g_fs != NULL) {
611 		if (g_sync_args.channel) {
612 			spdk_fs_free_thread_ctx(g_sync_args.channel);
613 		}
614 		g_sync_args.channel = spdk_fs_alloc_thread_ctx(g_fs);
615 	}
616 }
617 
618 static void
619 fs_load_cb(__attribute__((unused)) void *ctx,
620 	   struct spdk_filesystem *fs, int fserrno)
621 {
622 	if (fserrno == 0) {
623 		g_fs = fs;
624 	}
625 	g_spdk_ready = true;
626 }
627 
628 static void
629 base_bdev_event_cb(enum spdk_bdev_event_type type, __attribute__((unused)) struct spdk_bdev *bdev,
630 		   __attribute__((unused)) void *event_ctx)
631 {
632 	printf("Unsupported bdev event: type %d\n", type);
633 }
634 
635 static void
636 rocksdb_run(__attribute__((unused)) void *arg1)
637 {
638 	int rc;
639 
640 	rc = spdk_bdev_create_bs_dev_ext(g_bdev_name.c_str(), base_bdev_event_cb, NULL,
641 					 &g_bs_dev);
642 	if (rc != 0) {
643 		printf("Could not create blob bdev\n");
644 		spdk_app_stop(0);
645 		exit(1);
646 	}
647 
648 	g_lcore = spdk_env_get_first_core();
649 
650 	printf("using bdev %s\n", g_bdev_name.c_str());
651 	spdk_fs_load(g_bs_dev, __send_request, fs_load_cb, NULL);
652 }
653 
654 static void
655 fs_unload_cb(__attribute__((unused)) void *ctx,
656 	     __attribute__((unused)) int fserrno)
657 {
658 	assert(fserrno == 0);
659 
660 	spdk_app_stop(0);
661 }
662 
663 static void
664 rocksdb_shutdown(void)
665 {
666 	if (g_fs != NULL) {
667 		spdk_fs_unload(g_fs, fs_unload_cb, NULL);
668 	} else {
669 		fs_unload_cb(NULL, 0);
670 	}
671 }
672 
673 static void *
674 initialize_spdk(void *arg)
675 {
676 	struct spdk_app_opts *opts = (struct spdk_app_opts *)arg;
677 	int rc;
678 
679 	rc = spdk_app_start(opts, rocksdb_run, NULL);
680 	/*
681 	 * TODO:  Revisit for case of internal failure of
682 	 * spdk_app_start(), itself.  At this time, it's known
683 	 * the only application's use of spdk_app_stop() passes
684 	 * a zero; i.e. no fail (non-zero) cases so here we
685 	 * assume there was an internal failure and flag it
686 	 * so we can throw an exception.
687 	 */
688 	if (rc) {
689 		g_spdk_start_failure = true;
690 	} else {
691 		spdk_app_fini();
692 		delete opts;
693 	}
694 	pthread_exit(NULL);
695 
696 }
697 
698 SpdkEnv::SpdkEnv(Env *base_env, const std::string &dir, const std::string &conf,
699 		 const std::string &bdev, uint64_t cache_size_in_mb)
700 	: EnvWrapper(base_env), mDirectory(dir), mConfig(conf), mBdev(bdev)
701 {
702 	struct spdk_app_opts *opts = new struct spdk_app_opts;
703 
704 	spdk_app_opts_init(opts, sizeof(*opts));
705 	opts->name = "rocksdb";
706 	opts->json_config_file = mConfig.c_str();
707 	opts->shutdown_cb = rocksdb_shutdown;
708 	opts->tpoint_group_mask = "0x80";
709 
710 	spdk_fs_set_cache_size(cache_size_in_mb);
711 	g_bdev_name = mBdev;
712 
713 	pthread_create(&mSpdkTid, NULL, &initialize_spdk, opts);
714 	while (!g_spdk_ready && !g_spdk_start_failure)
715 		;
716 	if (g_spdk_start_failure) {
717 		delete opts;
718 		throw SpdkAppStartException("spdk_app_start() unable to start rocksdb_run()");
719 	}
720 
721 	SpdkInitializeThread();
722 }
723 
724 SpdkEnv::~SpdkEnv()
725 {
726 	/* This is a workaround for rocksdb test, we close the files if the rocksdb not
727 	 * do the work before the test quit.
728 	 */
729 	if (g_fs != NULL) {
730 		spdk_fs_iter iter;
731 		struct spdk_file *file;
732 
733 		if (!g_sync_args.channel) {
734 			SpdkInitializeThread();
735 		}
736 
737 		iter = spdk_fs_iter_first(g_fs);
738 		while (iter != NULL) {
739 			file = spdk_fs_iter_get_file(iter);
740 			spdk_file_close(file, g_sync_args.channel);
741 			iter = spdk_fs_iter_next(iter);
742 		}
743 	}
744 
745 	spdk_app_start_shutdown();
746 	pthread_join(mSpdkTid, NULL);
747 }
748 
749 Env *NewSpdkEnv(Env *base_env, const std::string &dir, const std::string &conf,
750 		const std::string &bdev, uint64_t cache_size_in_mb)
751 {
752 	try {
753 		SpdkEnv *spdk_env = new SpdkEnv(base_env, dir, conf, bdev, cache_size_in_mb);
754 		if (g_fs != NULL) {
755 			return spdk_env;
756 		} else {
757 			delete spdk_env;
758 			return NULL;
759 		}
760 	} catch (SpdkAppStartException &e) {
761 		SPDK_ERRLOG("NewSpdkEnv: exception caught: %s", e.what());
762 		return NULL;
763 	} catch (...) {
764 		SPDK_ERRLOG("NewSpdkEnv: default exception caught");
765 		return NULL;
766 	}
767 }
768 
769 } // namespace rocksdb
770