xref: /spdk/lib/rocksdb/env_spdk.cc (revision dd1c38cc680e4e8ca2642e93bf289072bff7fc3d)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
#include "rocksdb/env.h"
#include <atomic>
#include <iostream>
#include <set>
#include <stdexcept>
38 
39 extern "C" {
40 #include "spdk/env.h"
41 #include "spdk/event.h"
42 #include "spdk/blob.h"
43 #include "spdk/blobfs.h"
44 #include "spdk/blob_bdev.h"
45 #include "spdk/log.h"
46 #include "spdk/thread.h"
47 #include "spdk/bdev.h"
48 
49 #include "spdk_internal/thread.h"
50 }
51 
52 namespace rocksdb
53 {
54 
/* BlobFS handle published by fs_load_cb(); stays NULL until a load succeeds. */
struct spdk_filesystem *g_fs = NULL;
/* Blobstore device created in spdk_rocksdb_run() for the configured bdev. */
struct spdk_bs_dev *g_bs_dev;
/* Core that receives the events queued by __send_request(). */
uint32_t g_lcore = 0;
/* Name of the bdev looked up in spdk_rocksdb_run(). */
std::string g_bdev_name;
/*
 * Startup handshake flags.  They are written on the SPDK application thread
 * and polled by the SpdkEnv constructor on another thread, so they must be
 * atomic: volatile provides no inter-thread ordering, whereas the default
 * sequentially-consistent atomic store of g_spdk_ready also makes the
 * preceding write of g_fs visible to the thread that observes the flag.
 */
std::atomic<bool> g_spdk_ready(false);
std::atomic<bool> g_spdk_start_failure(false);

/* Defined after SpdkEnv; declared here because SpdkThreadCtx calls them. */
void SpdkInitializeThread(void);
void SpdkFinalizeThread(void);
64 
65 class SpdkThreadCtx
66 {
67 public:
68 	struct spdk_fs_thread_ctx *channel;
69 
70 	SpdkThreadCtx(void) : channel(NULL)
71 	{
72 		SpdkInitializeThread();
73 	}
74 
75 	~SpdkThreadCtx(void)
76 	{
77 		SpdkFinalizeThread();
78 	}
79 
80 private:
81 	SpdkThreadCtx(const SpdkThreadCtx &);
82 	SpdkThreadCtx &operator=(const SpdkThreadCtx &);
83 };
84 
85 thread_local SpdkThreadCtx g_sync_args;
86 
87 static void
88 __call_fn(void *arg1, void *arg2)
89 {
90 	fs_request_fn fn;
91 
92 	fn = (fs_request_fn)arg1;
93 	fn(arg2);
94 }
95 
96 static void
97 __send_request(fs_request_fn fn, void *arg)
98 {
99 	struct spdk_event *event;
100 
101 	event = spdk_event_allocate(g_lcore, __call_fn, (void *)fn, arg);
102 	spdk_event_call(event);
103 }
104 
/*
 * Convert a path below the mount directory into a BlobFS file name.
 *
 * The first mount_directory.length() characters of input are dropped (the
 * caller is expected to have checked that input starts with the mount
 * directory), a leading '/' is added if missing, runs of '/' are collapsed
 * into one and a trailing '/' is removed.
 *
 * An input that is equal to, or shorter than, the mount directory yields an
 * empty string instead of indexing an empty string or throwing
 * std::out_of_range.
 */
static std::string
sanitize_path(const std::string &input, const std::string &mount_directory)
{
	std::string name;
	std::string::size_type start = mount_directory.length();

	/* Clamp so that a short input simply has nothing left to sanitize. */
	if (start > input.length()) {
		start = input.length();
	}

	name.reserve(input.length() - start + 1);
	for (std::string::size_type i = start; i < input.length(); i++) {
		const char c = input[i];

		if (name.empty()) {
			/* The result always starts with exactly one '/'. */
			if (c != '/') {
				name.push_back('/');
			}
		} else if (c == '/' && name[name.size() - 1] == '/') {
			/* Collapse consecutive separators. */
			continue;
		}
		name.push_back(c);
	}

	/* Separators were collapsed above, so at most one trailing '/' remains. */
	if (!name.empty() && name[name.size() - 1] == '/') {
		name.erase(name.size() - 1, 1);
	}
	return name;
}
136 
137 class SpdkSequentialFile : public SequentialFile
138 {
139 	struct spdk_file *mFile;
140 	uint64_t mOffset;
141 public:
142 	SpdkSequentialFile(struct spdk_file *file) : mFile(file), mOffset(0) {}
143 	virtual ~SpdkSequentialFile();
144 
145 	virtual Status Read(size_t n, Slice *result, char *scratch) override;
146 	virtual Status Skip(uint64_t n) override;
147 	virtual Status InvalidateCache(size_t offset, size_t length) override;
148 };
149 
150 SpdkSequentialFile::~SpdkSequentialFile(void)
151 {
152 	spdk_file_close(mFile, g_sync_args.channel);
153 }
154 
155 Status
156 SpdkSequentialFile::Read(size_t n, Slice *result, char *scratch)
157 {
158 	int64_t ret;
159 
160 	ret = spdk_file_read(mFile, g_sync_args.channel, scratch, mOffset, n);
161 	if (ret >= 0) {
162 		mOffset += ret;
163 		*result = Slice(scratch, ret);
164 		return Status::OK();
165 	} else {
166 		errno = -ret;
167 		return Status::IOError(spdk_file_get_name(mFile), strerror(errno));
168 	}
169 }
170 
171 Status
172 SpdkSequentialFile::Skip(uint64_t n)
173 {
174 	mOffset += n;
175 	return Status::OK();
176 }
177 
178 Status
179 SpdkSequentialFile::InvalidateCache(__attribute__((unused)) size_t offset,
180 				    __attribute__((unused)) size_t length)
181 {
182 	return Status::OK();
183 }
184 
185 class SpdkRandomAccessFile : public RandomAccessFile
186 {
187 	struct spdk_file *mFile;
188 public:
189 	SpdkRandomAccessFile(struct spdk_file *file) : mFile(file) {}
190 	virtual ~SpdkRandomAccessFile();
191 
192 	virtual Status Read(uint64_t offset, size_t n, Slice *result, char *scratch) const override;
193 	virtual Status InvalidateCache(size_t offset, size_t length) override;
194 };
195 
196 SpdkRandomAccessFile::~SpdkRandomAccessFile(void)
197 {
198 	spdk_file_close(mFile, g_sync_args.channel);
199 }
200 
201 Status
202 SpdkRandomAccessFile::Read(uint64_t offset, size_t n, Slice *result, char *scratch) const
203 {
204 	int64_t rc;
205 
206 	rc = spdk_file_read(mFile, g_sync_args.channel, scratch, offset, n);
207 	if (rc >= 0) {
208 		*result = Slice(scratch, n);
209 		return Status::OK();
210 	} else {
211 		errno = -rc;
212 		return Status::IOError(spdk_file_get_name(mFile), strerror(errno));
213 	}
214 }
215 
216 Status
217 SpdkRandomAccessFile::InvalidateCache(__attribute__((unused)) size_t offset,
218 				      __attribute__((unused)) size_t length)
219 {
220 	return Status::OK();
221 }
222 
223 class SpdkWritableFile : public WritableFile
224 {
225 	struct spdk_file *mFile;
226 	uint64_t mSize;
227 
228 public:
229 	SpdkWritableFile(struct spdk_file *file) : mFile(file), mSize(0) {}
230 	~SpdkWritableFile()
231 	{
232 		if (mFile != NULL) {
233 			Close();
234 		}
235 	}
236 
237 	virtual void SetIOPriority(Env::IOPriority pri)
238 	{
239 		if (pri == Env::IO_HIGH) {
240 			spdk_file_set_priority(mFile, SPDK_FILE_PRIORITY_HIGH);
241 		}
242 	}
243 
244 	virtual Status Truncate(uint64_t size) override
245 	{
246 		int rc;
247 		rc = spdk_file_truncate(mFile, g_sync_args.channel, size);
248 		if (!rc) {
249 			mSize = size;
250 			return Status::OK();
251 		} else {
252 			errno = -rc;
253 			return Status::IOError(spdk_file_get_name(mFile), strerror(errno));
254 		}
255 	}
256 	virtual Status Close() override
257 	{
258 		spdk_file_close(mFile, g_sync_args.channel);
259 		mFile = NULL;
260 		return Status::OK();
261 	}
262 	virtual Status Append(const Slice &data) override;
263 	virtual Status Flush() override
264 	{
265 		return Status::OK();
266 	}
267 	virtual Status Sync() override
268 	{
269 		int rc;
270 
271 		rc = spdk_file_sync(mFile, g_sync_args.channel);
272 		if (!rc) {
273 			return Status::OK();
274 		} else {
275 			errno = -rc;
276 			return Status::IOError(spdk_file_get_name(mFile), strerror(errno));
277 		}
278 	}
279 	virtual Status Fsync() override
280 	{
281 		int rc;
282 
283 		rc = spdk_file_sync(mFile, g_sync_args.channel);
284 		if (!rc) {
285 			return Status::OK();
286 		} else {
287 			errno = -rc;
288 			return Status::IOError(spdk_file_get_name(mFile), strerror(errno));
289 		}
290 	}
291 	virtual bool IsSyncThreadSafe() const override
292 	{
293 		return true;
294 	}
295 	virtual uint64_t GetFileSize() override
296 	{
297 		return mSize;
298 	}
299 	virtual Status InvalidateCache(__attribute__((unused)) size_t offset,
300 				       __attribute__((unused)) size_t length) override
301 	{
302 		return Status::OK();
303 	}
304 	virtual Status Allocate(uint64_t offset, uint64_t len) override
305 	{
306 		int rc;
307 
308 		rc = spdk_file_truncate(mFile, g_sync_args.channel, offset + len);
309 		if (!rc) {
310 			return Status::OK();
311 		} else {
312 			errno = -rc;
313 			return Status::IOError(spdk_file_get_name(mFile), strerror(errno));
314 		}
315 	}
316 	virtual Status RangeSync(__attribute__((unused)) uint64_t offset,
317 				 __attribute__((unused)) uint64_t nbytes) override
318 	{
319 		int rc;
320 
321 		/*
322 		 * SPDK BlobFS does not have a range sync operation yet, so just sync
323 		 *  the whole file.
324 		 */
325 		rc = spdk_file_sync(mFile, g_sync_args.channel);
326 		if (!rc) {
327 			return Status::OK();
328 		} else {
329 			errno = -rc;
330 			return Status::IOError(spdk_file_get_name(mFile), strerror(errno));
331 		}
332 	}
333 	virtual size_t GetUniqueId(char *id, size_t max_size) const override
334 	{
335 		int rc;
336 
337 		rc = spdk_file_get_id(mFile, id, max_size);
338 		if (rc < 0) {
339 			return 0;
340 		} else {
341 			return rc;
342 		}
343 	}
344 };
345 
346 Status
347 SpdkWritableFile::Append(const Slice &data)
348 {
349 	int64_t rc;
350 
351 	rc = spdk_file_write(mFile, g_sync_args.channel, (void *)data.data(), mSize, data.size());
352 	if (rc >= 0) {
353 		mSize += data.size();
354 		return Status::OK();
355 	} else {
356 		errno = -rc;
357 		return Status::IOError(spdk_file_get_name(mFile), strerror(errno));
358 	}
359 }
360 
361 class SpdkDirectory : public Directory
362 {
363 public:
364 	SpdkDirectory() {}
365 	~SpdkDirectory() {}
366 	Status Fsync() override
367 	{
368 		return Status::OK();
369 	}
370 };
371 
/*
 * Thrown by the SpdkEnv constructor when the SPDK application could not be
 * started; what() returns the message passed to the constructor.
 */
class SpdkAppStartException : public std::runtime_error
{
public:
	SpdkAppStartException(std::string message)
		: std::runtime_error(message)
	{
	}
};
377 
378 class SpdkEnv : public EnvWrapper
379 {
380 private:
381 	pthread_t mSpdkTid;
382 	std::string mDirectory;
383 	std::string mConfig;
384 	std::string mBdev;
385 
386 public:
387 	SpdkEnv(Env *base_env, const std::string &dir, const std::string &conf,
388 		const std::string &bdev, uint64_t cache_size_in_mb);
389 
390 	virtual ~SpdkEnv();
391 
392 	virtual Status NewSequentialFile(const std::string &fname,
393 					 unique_ptr<SequentialFile> *result,
394 					 const EnvOptions &options) override
395 	{
396 		if (fname.compare(0, mDirectory.length(), mDirectory) == 0) {
397 			struct spdk_file *file;
398 			int rc;
399 
400 			std::string name = sanitize_path(fname, mDirectory);
401 			rc = spdk_fs_open_file(g_fs, g_sync_args.channel,
402 					       name.c_str(), 0, &file);
403 			if (rc == 0) {
404 				result->reset(new SpdkSequentialFile(file));
405 				return Status::OK();
406 			} else {
407 				/* Myrocks engine uses errno(ENOENT) as one
408 				 * special condition, for the purpose to
409 				 * support MySQL, set the errno to right value.
410 				 */
411 				errno = -rc;
412 				return Status::IOError(name, strerror(errno));
413 			}
414 		} else {
415 			return EnvWrapper::NewSequentialFile(fname, result, options);
416 		}
417 	}
418 
419 	virtual Status NewRandomAccessFile(const std::string &fname,
420 					   unique_ptr<RandomAccessFile> *result,
421 					   const EnvOptions &options) override
422 	{
423 		if (fname.compare(0, mDirectory.length(), mDirectory) == 0) {
424 			std::string name = sanitize_path(fname, mDirectory);
425 			struct spdk_file *file;
426 			int rc;
427 
428 			rc = spdk_fs_open_file(g_fs, g_sync_args.channel,
429 					       name.c_str(), 0, &file);
430 			if (rc == 0) {
431 				result->reset(new SpdkRandomAccessFile(file));
432 				return Status::OK();
433 			} else {
434 				errno = -rc;
435 				return Status::IOError(name, strerror(errno));
436 			}
437 		} else {
438 			return EnvWrapper::NewRandomAccessFile(fname, result, options);
439 		}
440 	}
441 
442 	virtual Status NewWritableFile(const std::string &fname,
443 				       unique_ptr<WritableFile> *result,
444 				       const EnvOptions &options) override
445 	{
446 		if (fname.compare(0, mDirectory.length(), mDirectory) == 0) {
447 			std::string name = sanitize_path(fname, mDirectory);
448 			struct spdk_file *file;
449 			int rc;
450 
451 			rc = spdk_fs_open_file(g_fs, g_sync_args.channel, name.c_str(),
452 					       SPDK_BLOBFS_OPEN_CREATE, &file);
453 			if (rc == 0) {
454 				result->reset(new SpdkWritableFile(file));
455 				return Status::OK();
456 			} else {
457 				errno = -rc;
458 				return Status::IOError(name, strerror(errno));
459 			}
460 		} else {
461 			return EnvWrapper::NewWritableFile(fname, result, options);
462 		}
463 	}
464 
465 	virtual Status ReuseWritableFile(const std::string &fname,
466 					 const std::string &old_fname,
467 					 unique_ptr<WritableFile> *result,
468 					 const EnvOptions &options) override
469 	{
470 		return EnvWrapper::ReuseWritableFile(fname, old_fname, result, options);
471 	}
472 
473 	virtual Status NewDirectory(__attribute__((unused)) const std::string &name,
474 				    unique_ptr<Directory> *result) override
475 	{
476 		result->reset(new SpdkDirectory());
477 		return Status::OK();
478 	}
479 	virtual Status FileExists(const std::string &fname) override
480 	{
481 		struct spdk_file_stat stat;
482 		int rc;
483 		std::string name = sanitize_path(fname, mDirectory);
484 
485 		rc = spdk_fs_file_stat(g_fs, g_sync_args.channel, name.c_str(), &stat);
486 		if (rc == 0) {
487 			return Status::OK();
488 		}
489 		return EnvWrapper::FileExists(fname);
490 	}
491 	virtual Status RenameFile(const std::string &src, const std::string &t) override
492 	{
493 		int rc;
494 		std::string src_name = sanitize_path(src, mDirectory);
495 		std::string target_name = sanitize_path(t, mDirectory);
496 
497 		rc = spdk_fs_rename_file(g_fs, g_sync_args.channel,
498 					 src_name.c_str(), target_name.c_str());
499 		if (rc == -ENOENT) {
500 			return EnvWrapper::RenameFile(src, t);
501 		}
502 		return Status::OK();
503 	}
504 	virtual Status LinkFile(__attribute__((unused)) const std::string &src,
505 				__attribute__((unused)) const std::string &t) override
506 	{
507 		return Status::NotSupported("SpdkEnv does not support LinkFile");
508 	}
509 	virtual Status GetFileSize(const std::string &fname, uint64_t *size) override
510 	{
511 		struct spdk_file_stat stat;
512 		int rc;
513 		std::string name = sanitize_path(fname, mDirectory);
514 
515 		rc = spdk_fs_file_stat(g_fs, g_sync_args.channel, name.c_str(), &stat);
516 		if (rc == -ENOENT) {
517 			return EnvWrapper::GetFileSize(fname, size);
518 		}
519 		*size = stat.size;
520 		return Status::OK();
521 	}
522 	virtual Status DeleteFile(const std::string &fname) override
523 	{
524 		int rc;
525 		std::string name = sanitize_path(fname, mDirectory);
526 
527 		rc = spdk_fs_delete_file(g_fs, g_sync_args.channel, name.c_str());
528 		if (rc == -ENOENT) {
529 			return EnvWrapper::DeleteFile(fname);
530 		}
531 		return Status::OK();
532 	}
533 	virtual Status LockFile(const std::string &fname, FileLock **lock) override
534 	{
535 		std::string name = sanitize_path(fname, mDirectory);
536 		int64_t rc;
537 
538 		rc = spdk_fs_open_file(g_fs, g_sync_args.channel, name.c_str(),
539 				       SPDK_BLOBFS_OPEN_CREATE, (struct spdk_file **)lock);
540 		if (!rc) {
541 			return Status::OK();
542 		} else {
543 			errno = -rc;
544 			return Status::IOError(name, strerror(errno));
545 		}
546 	}
547 	virtual Status UnlockFile(FileLock *lock) override
548 	{
549 		spdk_file_close((struct spdk_file *)lock, g_sync_args.channel);
550 		return Status::OK();
551 	}
552 	virtual Status GetChildren(const std::string &dir,
553 				   std::vector<std::string> *result) override
554 	{
555 		std::string::size_type pos;
556 		std::set<std::string> dir_and_file_set;
557 		std::string full_path;
558 		std::string filename;
559 		std::string dir_name;
560 
561 		if (dir.find("archive") != std::string::npos) {
562 			return Status::OK();
563 		}
564 		if (dir.compare(0, mDirectory.length(), mDirectory) == 0) {
565 			spdk_fs_iter iter;
566 			struct spdk_file *file;
567 			dir_name = sanitize_path(dir, mDirectory);
568 
569 			iter = spdk_fs_iter_first(g_fs);
570 			while (iter != NULL) {
571 				file = spdk_fs_iter_get_file(iter);
572 				full_path = spdk_file_get_name(file);
573 				if (strncmp(dir_name.c_str(), full_path.c_str(), dir_name.length())) {
574 					iter = spdk_fs_iter_next(iter);
575 					continue;
576 				}
577 				pos = full_path.find("/", dir_name.length() + 1);
578 
579 				if (pos != std::string::npos) {
580 					filename = full_path.substr(dir_name.length() + 1, pos - dir_name.length() - 1);
581 				} else {
582 					filename = full_path.substr(dir_name.length() + 1);
583 				}
584 				dir_and_file_set.insert(filename);
585 				iter = spdk_fs_iter_next(iter);
586 			}
587 
588 			for (auto &s : dir_and_file_set) {
589 				result->push_back(s);
590 			}
591 
592 			result->push_back(".");
593 			result->push_back("..");
594 
595 			return Status::OK();
596 		}
597 		return EnvWrapper::GetChildren(dir, result);
598 	}
599 };
600 
601 void SpdkInitializeThread(void)
602 {
603 	struct spdk_thread *thread;
604 
605 	if (g_fs != NULL && g_sync_args.channel == NULL) {
606 		thread = spdk_thread_create("spdk_rocksdb", NULL);
607 		spdk_set_thread(thread);
608 		g_sync_args.channel = spdk_fs_alloc_thread_ctx(g_fs);
609 	}
610 }
611 
612 void SpdkFinalizeThread(void)
613 {
614 	if (g_sync_args.channel) {
615 		spdk_fs_free_thread_ctx(g_sync_args.channel);
616 		g_sync_args.channel = NULL;
617 	}
618 }
619 
620 static void
621 fs_load_cb(__attribute__((unused)) void *ctx,
622 	   struct spdk_filesystem *fs, int fserrno)
623 {
624 	if (fserrno == 0) {
625 		g_fs = fs;
626 	}
627 	g_spdk_ready = true;
628 }
629 
630 static void
631 spdk_rocksdb_run(__attribute__((unused)) void *arg1)
632 {
633 	struct spdk_bdev *bdev;
634 
635 	bdev = spdk_bdev_get_by_name(g_bdev_name.c_str());
636 
637 	if (bdev == NULL) {
638 		SPDK_ERRLOG("bdev %s not found\n", g_bdev_name.c_str());
639 		exit(1);
640 	}
641 
642 	g_lcore = spdk_env_get_first_core();
643 
644 	g_bs_dev = spdk_bdev_create_bs_dev(bdev, NULL, NULL);
645 	printf("using bdev %s\n", g_bdev_name.c_str());
646 	spdk_fs_load(g_bs_dev, __send_request, fs_load_cb, NULL);
647 }
648 
649 static void
650 fs_unload_cb(__attribute__((unused)) void *ctx,
651 	     __attribute__((unused)) int fserrno)
652 {
653 	assert(fserrno == 0);
654 
655 	spdk_app_stop(0);
656 }
657 
658 static void
659 spdk_rocksdb_shutdown(void)
660 {
661 	if (g_fs != NULL) {
662 		spdk_fs_unload(g_fs, fs_unload_cb, NULL);
663 	} else {
664 		fs_unload_cb(NULL, 0);
665 	}
666 }
667 
668 static void *
669 initialize_spdk(void *arg)
670 {
671 	struct spdk_app_opts *opts = (struct spdk_app_opts *)arg;
672 	int rc;
673 
674 	rc = spdk_app_start(opts, spdk_rocksdb_run, NULL);
675 	/*
676 	 * TODO:  Revisit for case of internal failure of
677 	 * spdk_app_start(), itself.  At this time, it's known
678 	 * the only application's use of spdk_app_stop() passes
679 	 * a zero; i.e. no fail (non-zero) cases so here we
680 	 * assume there was an internal failure and flag it
681 	 * so we can throw an exception.
682 	 */
683 	if (rc) {
684 		g_spdk_start_failure = true;
685 	} else {
686 		spdk_app_fini();
687 		delete opts;
688 	}
689 	pthread_exit(NULL);
690 
691 }
692 
693 SpdkEnv::SpdkEnv(Env *base_env, const std::string &dir, const std::string &conf,
694 		 const std::string &bdev, uint64_t cache_size_in_mb)
695 	: EnvWrapper(base_env), mDirectory(dir), mConfig(conf), mBdev(bdev)
696 {
697 	struct spdk_app_opts *opts = new struct spdk_app_opts;
698 
699 	spdk_app_opts_init(opts);
700 	opts->name = "rocksdb";
701 	opts->config_file = mConfig.c_str();
702 	opts->shutdown_cb = spdk_rocksdb_shutdown;
703 
704 	spdk_fs_set_cache_size(cache_size_in_mb);
705 	g_bdev_name = mBdev;
706 
707 	pthread_create(&mSpdkTid, NULL, &initialize_spdk, opts);
708 	while (!g_spdk_ready && !g_spdk_start_failure)
709 		;
710 	if (g_spdk_start_failure) {
711 		delete opts;
712 		throw SpdkAppStartException("spdk_app_start() unable to start spdk_rocksdb_run()");
713 	}
714 
715 	SpdkInitializeThread();
716 }
717 
718 SpdkEnv::~SpdkEnv()
719 {
720 	/* This is a workaround for rocksdb test, we close the files if the rocksdb not
721 	 * do the work before the test quit.
722 	 */
723 	if (g_fs != NULL) {
724 		spdk_fs_iter iter;
725 		struct spdk_file *file;
726 
727 		if (!g_sync_args.channel) {
728 			SpdkInitializeThread();
729 		}
730 
731 		iter = spdk_fs_iter_first(g_fs);
732 		while (iter != NULL) {
733 			file = spdk_fs_iter_get_file(iter);
734 			spdk_file_close(file, g_sync_args.channel);
735 			iter = spdk_fs_iter_next(iter);
736 		}
737 	}
738 
739 	spdk_app_start_shutdown();
740 	pthread_join(mSpdkTid, NULL);
741 }
742 
743 Env *NewSpdkEnv(Env *base_env, const std::string &dir, const std::string &conf,
744 		const std::string &bdev, uint64_t cache_size_in_mb)
745 {
746 	try {
747 		SpdkEnv *spdk_env = new SpdkEnv(base_env, dir, conf, bdev, cache_size_in_mb);
748 		if (g_fs != NULL) {
749 			return spdk_env;
750 		} else {
751 			delete spdk_env;
752 			return NULL;
753 		}
754 	} catch (SpdkAppStartException &e) {
755 		SPDK_ERRLOG("NewSpdkEnv: exception caught: %s", e.what());
756 		return NULL;
757 	} catch (...) {
758 		SPDK_ERRLOG("NewSpdkEnv: default exception caught");
759 		return NULL;
760 	}
761 }
762 
763 } // namespace rocksdb
764