xref: /spdk/lib/rocksdb/env_spdk.cc (revision 60982c759db49b4f4579f16e3b24df0725ba4b94)
1  /*   SPDX-License-Identifier: BSD-3-Clause
2   *   Copyright (C) 2017 Intel Corporation.
3   *   All rights reserved.
4   */
5  
6  #include "rocksdb/env.h"
7  #include <set>
8  #include <iostream>
9  #include <stdexcept>
10  
11  extern "C" {
12  #include "spdk/env.h"
13  #include "spdk/event.h"
14  #include "spdk/blob.h"
15  #include "spdk/blobfs.h"
16  #include "spdk/blob_bdev.h"
17  #include "spdk/log.h"
18  #include "spdk/thread.h"
19  #include "spdk/bdev.h"
20  }
21  
22  namespace rocksdb
23  {
24  
25  struct spdk_filesystem *g_fs = NULL;
26  struct spdk_bs_dev *g_bs_dev;
27  uint32_t g_lcore = 0;
28  std::string g_bdev_name;
29  volatile bool g_spdk_ready = false;
30  volatile bool g_spdk_start_failure = false;
31  
32  void SpdkInitializeThread(void);
33  
34  class SpdkThreadCtx
35  {
36  public:
37  	struct spdk_fs_thread_ctx *channel;
38  
39  	SpdkThreadCtx(void) : channel(NULL)
40  	{
41  		SpdkInitializeThread();
42  	}
43  
44  	~SpdkThreadCtx(void)
45  	{
46  		if (channel) {
47  			spdk_fs_free_thread_ctx(channel);
48  			channel = NULL;
49  		}
50  	}
51  
52  private:
53  	SpdkThreadCtx(const SpdkThreadCtx &);
54  	SpdkThreadCtx &operator=(const SpdkThreadCtx &);
55  };
56  
57  thread_local SpdkThreadCtx g_sync_args;
58  
59  static void
60  set_channel()
61  {
62  	if (g_fs != NULL && g_sync_args.channel == NULL) {
63  		g_sync_args.channel = spdk_fs_alloc_thread_ctx(g_fs);
64  	}
65  }
66  
67  static void
68  __call_fn(void *arg1, void *arg2)
69  {
70  	fs_request_fn fn;
71  
72  	fn = (fs_request_fn)arg1;
73  	fn(arg2);
74  }
75  
76  static void
77  __send_request(fs_request_fn fn, void *arg)
78  {
79  	struct spdk_event *event;
80  
81  	event = spdk_event_allocate(g_lcore, __call_fn, (void *)fn, arg);
82  	spdk_event_call(event);
83  }
84  
85  static std::string
86  sanitize_path(const std::string &input, const std::string &mount_directory)
87  {
88  	int index = 0;
89  	std::string name;
90  	std::string input_tmp;
91  
92  	input_tmp = input.substr(mount_directory.length(), input.length());
93  	for (const char &c : input_tmp) {
94  		if (index == 0) {
95  			if (c != '/') {
96  				name = name.insert(index, 1, '/');
97  				index++;
98  			}
99  			name = name.insert(index, 1, c);
100  			index++;
101  		} else {
102  			if (name[index - 1] == '/' && c == '/') {
103  				continue;
104  			} else {
105  				name = name.insert(index, 1, c);
106  				index++;
107  			}
108  		}
109  	}
110  
111  	if (name[name.size() - 1] == '/') {
112  		name = name.erase(name.size() - 1, 1);
113  	}
114  	return name;
115  }
116  
117  class SpdkSequentialFile : public SequentialFile
118  {
119  	struct spdk_file *mFile;
120  	uint64_t mOffset;
121  public:
122  	SpdkSequentialFile(struct spdk_file *file) : mFile(file), mOffset(0) {}
123  	virtual ~SpdkSequentialFile();
124  
125  	virtual Status Read(size_t n, Slice *result, char *scratch) override;
126  	virtual Status Skip(uint64_t n) override;
127  	virtual Status InvalidateCache(size_t offset, size_t length) override;
128  };
129  
130  SpdkSequentialFile::~SpdkSequentialFile(void)
131  {
132  	set_channel();
133  	spdk_file_close(mFile, g_sync_args.channel);
134  }
135  
136  Status
137  SpdkSequentialFile::Read(size_t n, Slice *result, char *scratch)
138  {
139  	int64_t ret;
140  
141  	set_channel();
142  	ret = spdk_file_read(mFile, g_sync_args.channel, scratch, mOffset, n);
143  	if (ret >= 0) {
144  		mOffset += ret;
145  		*result = Slice(scratch, ret);
146  		return Status::OK();
147  	} else {
148  		errno = -ret;
149  		return Status::IOError(spdk_file_get_name(mFile), strerror(errno));
150  	}
151  }
152  
153  Status
154  SpdkSequentialFile::Skip(uint64_t n)
155  {
156  	mOffset += n;
157  	return Status::OK();
158  }
159  
160  Status
161  SpdkSequentialFile::InvalidateCache(__attribute__((unused)) size_t offset,
162  				    __attribute__((unused)) size_t length)
163  {
164  	return Status::OK();
165  }
166  
167  class SpdkRandomAccessFile : public RandomAccessFile
168  {
169  	struct spdk_file *mFile;
170  public:
171  	SpdkRandomAccessFile(struct spdk_file *file) : mFile(file) {}
172  	virtual ~SpdkRandomAccessFile();
173  
174  	virtual Status Read(uint64_t offset, size_t n, Slice *result, char *scratch) const override;
175  	virtual Status InvalidateCache(size_t offset, size_t length) override;
176  };
177  
178  SpdkRandomAccessFile::~SpdkRandomAccessFile(void)
179  {
180  	set_channel();
181  	spdk_file_close(mFile, g_sync_args.channel);
182  }
183  
184  Status
185  SpdkRandomAccessFile::Read(uint64_t offset, size_t n, Slice *result, char *scratch) const
186  {
187  	int64_t rc;
188  
189  	set_channel();
190  	rc = spdk_file_read(mFile, g_sync_args.channel, scratch, offset, n);
191  	if (rc >= 0) {
192  		*result = Slice(scratch, rc);
193  		return Status::OK();
194  	} else {
195  		errno = -rc;
196  		return Status::IOError(spdk_file_get_name(mFile), strerror(errno));
197  	}
198  }
199  
200  Status
201  SpdkRandomAccessFile::InvalidateCache(__attribute__((unused)) size_t offset,
202  				      __attribute__((unused)) size_t length)
203  {
204  	return Status::OK();
205  }
206  
207  class SpdkWritableFile : public WritableFile
208  {
209  	struct spdk_file *mFile;
210  	uint64_t mSize;
211  
212  public:
213  	SpdkWritableFile(struct spdk_file *file) : mFile(file), mSize(0) {}
214  	~SpdkWritableFile()
215  	{
216  		if (mFile != NULL) {
217  			Close();
218  		}
219  	}
220  
221  	virtual void SetIOPriority(Env::IOPriority pri)
222  	{
223  		if (pri == Env::IO_HIGH) {
224  			spdk_file_set_priority(mFile, SPDK_FILE_PRIORITY_HIGH);
225  		}
226  	}
227  
228  	virtual Status Truncate(uint64_t size) override
229  	{
230  		int rc;
231  
232  		set_channel();
233  		rc = spdk_file_truncate(mFile, g_sync_args.channel, size);
234  		if (!rc) {
235  			mSize = size;
236  			return Status::OK();
237  		} else {
238  			errno = -rc;
239  			return Status::IOError(spdk_file_get_name(mFile), strerror(errno));
240  		}
241  	}
242  	virtual Status Close() override
243  	{
244  		set_channel();
245  		spdk_file_close(mFile, g_sync_args.channel);
246  		mFile = NULL;
247  		return Status::OK();
248  	}
249  	using WritableFile::Append;
250  	virtual Status Append(const Slice &data) override;
251  	virtual Status Flush() override
252  	{
253  		return Status::OK();
254  	}
255  	virtual Status Sync() override
256  	{
257  		int rc;
258  
259  		set_channel();
260  		rc = spdk_file_sync(mFile, g_sync_args.channel);
261  		if (!rc) {
262  			return Status::OK();
263  		} else {
264  			errno = -rc;
265  			return Status::IOError(spdk_file_get_name(mFile), strerror(errno));
266  		}
267  	}
268  	virtual Status Fsync() override
269  	{
270  		int rc;
271  
272  		set_channel();
273  		rc = spdk_file_sync(mFile, g_sync_args.channel);
274  		if (!rc) {
275  			return Status::OK();
276  		} else {
277  			errno = -rc;
278  			return Status::IOError(spdk_file_get_name(mFile), strerror(errno));
279  		}
280  	}
281  	virtual bool IsSyncThreadSafe() const override
282  	{
283  		return true;
284  	}
285  	virtual uint64_t GetFileSize() override
286  	{
287  		return mSize;
288  	}
289  	virtual Status InvalidateCache(__attribute__((unused)) size_t offset,
290  				       __attribute__((unused)) size_t length) override
291  	{
292  		return Status::OK();
293  	}
294  	virtual Status Allocate(uint64_t offset, uint64_t len) override
295  	{
296  		int rc;
297  
298  		set_channel();
299  		rc = spdk_file_truncate(mFile, g_sync_args.channel, offset + len);
300  		if (!rc) {
301  			return Status::OK();
302  		} else {
303  			errno = -rc;
304  			return Status::IOError(spdk_file_get_name(mFile), strerror(errno));
305  		}
306  	}
307  	virtual Status RangeSync(__attribute__((unused)) uint64_t offset,
308  				 __attribute__((unused)) uint64_t nbytes) override
309  	{
310  		int rc;
311  
312  		/*
313  		 * SPDK BlobFS does not have a range sync operation yet, so just sync
314  		 *  the whole file.
315  		 */
316  		set_channel();
317  		rc = spdk_file_sync(mFile, g_sync_args.channel);
318  		if (!rc) {
319  			return Status::OK();
320  		} else {
321  			errno = -rc;
322  			return Status::IOError(spdk_file_get_name(mFile), strerror(errno));
323  		}
324  	}
325  	virtual size_t GetUniqueId(char *id, size_t max_size) const override
326  	{
327  		int rc;
328  
329  		rc = spdk_file_get_id(mFile, id, max_size);
330  		if (rc < 0) {
331  			return 0;
332  		} else {
333  			return rc;
334  		}
335  	}
336  };
337  
338  Status
339  SpdkWritableFile::Append(const Slice &data)
340  {
341  	int64_t rc;
342  
343  	set_channel();
344  	rc = spdk_file_write(mFile, g_sync_args.channel, (void *)data.data(), mSize, data.size());
345  	if (rc >= 0) {
346  		mSize += data.size();
347  		return Status::OK();
348  	} else {
349  		errno = -rc;
350  		return Status::IOError(spdk_file_get_name(mFile), strerror(errno));
351  	}
352  }
353  
354  class SpdkDirectory : public Directory
355  {
356  public:
357  	SpdkDirectory() {}
358  	~SpdkDirectory() {}
359  	Status Fsync() override
360  	{
361  		return Status::OK();
362  	}
363  };
364  
365  class SpdkAppStartException : public std::runtime_error
366  {
367  public:
368  	SpdkAppStartException(std::string mess): std::runtime_error(mess) {}
369  };
370  
371  class SpdkEnv : public EnvWrapper
372  {
373  private:
374  	pthread_t mSpdkTid;
375  	std::string mDirectory;
376  	std::string mConfig;
377  	std::string mBdev;
378  
379  public:
380  	SpdkEnv(Env *base_env, const std::string &dir, const std::string &conf,
381  		const std::string &bdev, uint64_t cache_size_in_mb);
382  
383  	virtual ~SpdkEnv();
384  
385  	virtual Status NewSequentialFile(const std::string &fname,
386  					 std::unique_ptr<SequentialFile> *result,
387  					 const EnvOptions &options) override
388  	{
389  		if (fname.compare(0, mDirectory.length(), mDirectory) == 0) {
390  			struct spdk_file *file;
391  			int rc;
392  
393  			std::string name = sanitize_path(fname, mDirectory);
394  			set_channel();
395  			rc = spdk_fs_open_file(g_fs, g_sync_args.channel,
396  					       name.c_str(), 0, &file);
397  			if (rc == 0) {
398  				result->reset(new SpdkSequentialFile(file));
399  				return Status::OK();
400  			} else {
401  				/* Myrocks engine uses errno(ENOENT) as one
402  				 * special condition, for the purpose to
403  				 * support MySQL, set the errno to right value.
404  				 */
405  				errno = -rc;
406  				return Status::IOError(name, strerror(errno));
407  			}
408  		} else {
409  			return EnvWrapper::NewSequentialFile(fname, result, options);
410  		}
411  	}
412  
413  	virtual Status NewRandomAccessFile(const std::string &fname,
414  					   std::unique_ptr<RandomAccessFile> *result,
415  					   const EnvOptions &options) override
416  	{
417  		if (fname.compare(0, mDirectory.length(), mDirectory) == 0) {
418  			std::string name = sanitize_path(fname, mDirectory);
419  			struct spdk_file *file;
420  			int rc;
421  
422  			set_channel();
423  			rc = spdk_fs_open_file(g_fs, g_sync_args.channel,
424  					       name.c_str(), 0, &file);
425  			if (rc == 0) {
426  				result->reset(new SpdkRandomAccessFile(file));
427  				return Status::OK();
428  			} else {
429  				errno = -rc;
430  				return Status::IOError(name, strerror(errno));
431  			}
432  		} else {
433  			return EnvWrapper::NewRandomAccessFile(fname, result, options);
434  		}
435  	}
436  
437  	virtual Status NewWritableFile(const std::string &fname,
438  				       std::unique_ptr<WritableFile> *result,
439  				       const EnvOptions &options) override
440  	{
441  		if (fname.compare(0, mDirectory.length(), mDirectory) == 0) {
442  			std::string name = sanitize_path(fname, mDirectory);
443  			struct spdk_file *file;
444  			int rc;
445  
446  			set_channel();
447  			rc = spdk_fs_open_file(g_fs, g_sync_args.channel, name.c_str(),
448  					       SPDK_BLOBFS_OPEN_CREATE, &file);
449  			if (rc == 0) {
450  				result->reset(new SpdkWritableFile(file));
451  				return Status::OK();
452  			} else {
453  				errno = -rc;
454  				return Status::IOError(name, strerror(errno));
455  			}
456  		} else {
457  			return EnvWrapper::NewWritableFile(fname, result, options);
458  		}
459  	}
460  
461  	virtual Status ReuseWritableFile(const std::string &fname,
462  					 const std::string &old_fname,
463  					 std::unique_ptr<WritableFile> *result,
464  					 const EnvOptions &options) override
465  	{
466  		return EnvWrapper::ReuseWritableFile(fname, old_fname, result, options);
467  	}
468  
469  	virtual Status NewDirectory(__attribute__((unused)) const std::string &name,
470  				    std::unique_ptr<Directory> *result) override
471  	{
472  		result->reset(new SpdkDirectory());
473  		return Status::OK();
474  	}
475  	virtual Status FileExists(const std::string &fname) override
476  	{
477  		struct spdk_file_stat stat;
478  		int rc;
479  		std::string name = sanitize_path(fname, mDirectory);
480  
481  		set_channel();
482  		rc = spdk_fs_file_stat(g_fs, g_sync_args.channel, name.c_str(), &stat);
483  		if (rc == 0) {
484  			return Status::OK();
485  		}
486  		return EnvWrapper::FileExists(fname);
487  	}
488  	virtual Status RenameFile(const std::string &src, const std::string &t) override
489  	{
490  		int rc;
491  		std::string src_name = sanitize_path(src, mDirectory);
492  		std::string target_name = sanitize_path(t, mDirectory);
493  
494  		set_channel();
495  		rc = spdk_fs_rename_file(g_fs, g_sync_args.channel,
496  					 src_name.c_str(), target_name.c_str());
497  		if (rc == -ENOENT) {
498  			return EnvWrapper::RenameFile(src, t);
499  		}
500  		return Status::OK();
501  	}
502  	virtual Status LinkFile(__attribute__((unused)) const std::string &src,
503  				__attribute__((unused)) const std::string &t) override
504  	{
505  		return Status::NotSupported("SpdkEnv does not support LinkFile");
506  	}
507  	virtual Status GetFileSize(const std::string &fname, uint64_t *size) override
508  	{
509  		struct spdk_file_stat stat;
510  		int rc;
511  		std::string name = sanitize_path(fname, mDirectory);
512  
513  		set_channel();
514  		rc = spdk_fs_file_stat(g_fs, g_sync_args.channel, name.c_str(), &stat);
515  		if (rc == -ENOENT) {
516  			return EnvWrapper::GetFileSize(fname, size);
517  		}
518  		*size = stat.size;
519  		return Status::OK();
520  	}
521  	virtual Status DeleteFile(const std::string &fname) override
522  	{
523  		int rc;
524  		std::string name = sanitize_path(fname, mDirectory);
525  
526  		set_channel();
527  		rc = spdk_fs_delete_file(g_fs, g_sync_args.channel, name.c_str());
528  		if (rc == -ENOENT) {
529  			return EnvWrapper::DeleteFile(fname);
530  		}
531  		return Status::OK();
532  	}
533  	virtual Status LockFile(const std::string &fname, FileLock **lock) override
534  	{
535  		std::string name = sanitize_path(fname, mDirectory);
536  		int64_t rc;
537  
538  		set_channel();
539  		rc = spdk_fs_open_file(g_fs, g_sync_args.channel, name.c_str(),
540  				       SPDK_BLOBFS_OPEN_CREATE, (struct spdk_file **)lock);
541  		if (!rc) {
542  			return Status::OK();
543  		} else {
544  			errno = -rc;
545  			return Status::IOError(name, strerror(errno));
546  		}
547  	}
548  	virtual Status UnlockFile(FileLock *lock) override
549  	{
550  		set_channel();
551  		spdk_file_close((struct spdk_file *)lock, g_sync_args.channel);
552  		return Status::OK();
553  	}
554  	virtual Status GetChildren(const std::string &dir,
555  				   std::vector<std::string> *result) override
556  	{
557  		std::string::size_type pos;
558  		std::set<std::string> dir_and_file_set;
559  		std::string full_path;
560  		std::string filename;
561  		std::string dir_name;
562  
563  		if (dir.find("archive") != std::string::npos) {
564  			return Status::OK();
565  		}
566  		if (dir.compare(0, mDirectory.length(), mDirectory) == 0) {
567  			spdk_fs_iter iter;
568  			struct spdk_file *file;
569  			dir_name = sanitize_path(dir, mDirectory);
570  
571  			iter = spdk_fs_iter_first(g_fs);
572  			while (iter != NULL) {
573  				file = spdk_fs_iter_get_file(iter);
574  				full_path = spdk_file_get_name(file);
575  				if (strncmp(dir_name.c_str(), full_path.c_str(), dir_name.length())) {
576  					iter = spdk_fs_iter_next(iter);
577  					continue;
578  				}
579  				pos = full_path.find("/", dir_name.length() + 1);
580  
581  				if (pos != std::string::npos) {
582  					filename = full_path.substr(dir_name.length() + 1, pos - dir_name.length() - 1);
583  				} else {
584  					filename = full_path.substr(dir_name.length() + 1);
585  				}
586  				dir_and_file_set.insert(filename);
587  				iter = spdk_fs_iter_next(iter);
588  			}
589  
590  			for (auto &s : dir_and_file_set) {
591  				result->push_back(s);
592  			}
593  
594  			result->push_back(".");
595  			result->push_back("..");
596  
597  			return Status::OK();
598  		}
599  		return EnvWrapper::GetChildren(dir, result);
600  	}
601  };
602  
603  /* The thread local constructor doesn't work for the main thread, since
604   * the filesystem hasn't been loaded yet.  So we break out this
605   * SpdkInitializeThread function, so that the main thread can explicitly
606   * call it after the filesystem has been loaded.
607   */
608  void SpdkInitializeThread(void)
609  {
610  	if (g_fs != NULL) {
611  		if (g_sync_args.channel) {
612  			spdk_fs_free_thread_ctx(g_sync_args.channel);
613  		}
614  		g_sync_args.channel = spdk_fs_alloc_thread_ctx(g_fs);
615  	}
616  }
617  
618  static void
619  fs_load_cb(__attribute__((unused)) void *ctx,
620  	   struct spdk_filesystem *fs, int fserrno)
621  {
622  	if (fserrno == 0) {
623  		g_fs = fs;
624  	}
625  	g_spdk_ready = true;
626  }
627  
628  static void
629  base_bdev_event_cb(enum spdk_bdev_event_type type, __attribute__((unused)) struct spdk_bdev *bdev,
630  		   __attribute__((unused)) void *event_ctx)
631  {
632  	printf("Unsupported bdev event: type %d\n", type);
633  }
634  
635  static void
636  rocksdb_run(__attribute__((unused)) void *arg1)
637  {
638  	int rc;
639  
640  	rc = spdk_bdev_create_bs_dev_ext(g_bdev_name.c_str(), base_bdev_event_cb, NULL,
641  					 &g_bs_dev);
642  	if (rc != 0) {
643  		printf("Could not create blob bdev\n");
644  		spdk_app_stop(0);
645  		exit(1);
646  	}
647  
648  	g_lcore = spdk_env_get_first_core();
649  
650  	printf("using bdev %s\n", g_bdev_name.c_str());
651  	spdk_fs_load(g_bs_dev, __send_request, fs_load_cb, NULL);
652  }
653  
654  static void
655  fs_unload_cb(__attribute__((unused)) void *ctx,
656  	     __attribute__((unused)) int fserrno)
657  {
658  	assert(fserrno == 0);
659  
660  	spdk_app_stop(0);
661  }
662  
663  static void
664  rocksdb_shutdown(void)
665  {
666  	if (g_fs != NULL) {
667  		spdk_fs_unload(g_fs, fs_unload_cb, NULL);
668  	} else {
669  		fs_unload_cb(NULL, 0);
670  	}
671  }
672  
673  static void *
674  initialize_spdk(void *arg)
675  {
676  	struct spdk_app_opts *opts = (struct spdk_app_opts *)arg;
677  	int rc;
678  
679  	rc = spdk_app_start(opts, rocksdb_run, NULL);
680  	/*
681  	 * TODO:  Revisit for case of internal failure of
682  	 * spdk_app_start(), itself.  At this time, it's known
683  	 * the only application's use of spdk_app_stop() passes
684  	 * a zero; i.e. no fail (non-zero) cases so here we
685  	 * assume there was an internal failure and flag it
686  	 * so we can throw an exception.
687  	 */
688  	if (rc) {
689  		g_spdk_start_failure = true;
690  	} else {
691  		spdk_app_fini();
692  		delete opts;
693  	}
694  	pthread_exit(NULL);
695  
696  }
697  
698  SpdkEnv::SpdkEnv(Env *base_env, const std::string &dir, const std::string &conf,
699  		 const std::string &bdev, uint64_t cache_size_in_mb)
700  	: EnvWrapper(base_env), mDirectory(dir), mConfig(conf), mBdev(bdev)
701  {
702  	struct spdk_app_opts *opts = new struct spdk_app_opts;
703  
704  	spdk_app_opts_init(opts, sizeof(*opts));
705  	opts->name = "rocksdb";
706  	opts->json_config_file = mConfig.c_str();
707  	opts->shutdown_cb = rocksdb_shutdown;
708  	opts->tpoint_group_mask = "0x80";
709  
710  	spdk_fs_set_cache_size(cache_size_in_mb);
711  	g_bdev_name = mBdev;
712  
713  	pthread_create(&mSpdkTid, NULL, &initialize_spdk, opts);
714  	while (!g_spdk_ready && !g_spdk_start_failure)
715  		;
716  	if (g_spdk_start_failure) {
717  		delete opts;
718  		throw SpdkAppStartException("spdk_app_start() unable to start rocksdb_run()");
719  	}
720  
721  	SpdkInitializeThread();
722  }
723  
724  SpdkEnv::~SpdkEnv()
725  {
726  	/* This is a workaround for rocksdb test, we close the files if the rocksdb not
727  	 * do the work before the test quit.
728  	 */
729  	if (g_fs != NULL) {
730  		spdk_fs_iter iter;
731  		struct spdk_file *file;
732  
733  		if (!g_sync_args.channel) {
734  			SpdkInitializeThread();
735  		}
736  
737  		iter = spdk_fs_iter_first(g_fs);
738  		while (iter != NULL) {
739  			file = spdk_fs_iter_get_file(iter);
740  			spdk_file_close(file, g_sync_args.channel);
741  			iter = spdk_fs_iter_next(iter);
742  		}
743  	}
744  
745  	spdk_app_start_shutdown();
746  	pthread_join(mSpdkTid, NULL);
747  }
748  
749  Env *NewSpdkEnv(Env *base_env, const std::string &dir, const std::string &conf,
750  		const std::string &bdev, uint64_t cache_size_in_mb)
751  {
752  	try {
753  		SpdkEnv *spdk_env = new SpdkEnv(base_env, dir, conf, bdev, cache_size_in_mb);
754  		if (g_fs != NULL) {
755  			return spdk_env;
756  		} else {
757  			delete spdk_env;
758  			return NULL;
759  		}
760  	} catch (SpdkAppStartException &e) {
761  		SPDK_ERRLOG("NewSpdkEnv: exception caught: %s", e.what());
762  		return NULL;
763  	} catch (...) {
764  		SPDK_ERRLOG("NewSpdkEnv: default exception caught");
765  		return NULL;
766  	}
767  }
768  
769  } // namespace rocksdb
770