xref: /spdk/lib/rocksdb/env_spdk.cc (revision 1fc4165fe9bf8512483356ad8e6d27f793f2e3db)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "rocksdb/env.h"
35 #include <set>
36 #include <iostream>
37 #include <stdexcept>
38 
39 extern "C" {
40 #include "spdk/env.h"
41 #include "spdk/event.h"
42 #include "spdk/blob.h"
43 #include "spdk/blobfs.h"
44 #include "spdk/blob_bdev.h"
45 #include "spdk/log.h"
46 #include "spdk/thread.h"
47 #include "spdk/bdev.h"
48 
49 #include "spdk_internal/thread.h"
50 }
51 
52 namespace rocksdb
53 {
54 
55 struct spdk_filesystem *g_fs = NULL;
56 struct spdk_bs_dev *g_bs_dev;
57 uint32_t g_lcore = 0;
58 std::string g_bdev_name;
59 volatile bool g_spdk_ready = false;
60 volatile bool g_spdk_start_failure = false;
61 struct sync_args {
62 	struct spdk_io_channel *channel;
63 };
64 
65 __thread struct sync_args g_sync_args;
66 
67 static void
68 __call_fn(void *arg1, void *arg2)
69 {
70 	fs_request_fn fn;
71 
72 	fn = (fs_request_fn)arg1;
73 	fn(arg2);
74 }
75 
76 static void
77 __send_request(fs_request_fn fn, void *arg)
78 {
79 	struct spdk_event *event;
80 
81 	event = spdk_event_allocate(g_lcore, __call_fn, (void *)fn, arg);
82 	spdk_event_call(event);
83 }
84 
85 static std::string
86 sanitize_path(const std::string &input, const std::string &mount_directory)
87 {
88 	int index = 0;
89 	std::string name;
90 	std::string input_tmp;
91 
92 	input_tmp = input.substr(mount_directory.length(), input.length());
93 	for (const char &c : input_tmp) {
94 		if (index == 0) {
95 			if (c != '/') {
96 				name = name.insert(index, 1, '/');
97 				index++;
98 			}
99 			name = name.insert(index, 1, c);
100 			index++;
101 		} else {
102 			if (name[index - 1] == '/' && c == '/') {
103 				continue;
104 			} else {
105 				name = name.insert(index, 1, c);
106 				index++;
107 			}
108 		}
109 	}
110 
111 	if (name[name.size() - 1] == '/') {
112 		name = name.erase(name.size() - 1, 1);
113 	}
114 	return name;
115 }
116 
117 class SpdkSequentialFile : public SequentialFile
118 {
119 	struct spdk_file *mFile;
120 	uint64_t mOffset;
121 public:
122 	SpdkSequentialFile(struct spdk_file *file) : mFile(file), mOffset(0) {}
123 	virtual ~SpdkSequentialFile();
124 
125 	virtual Status Read(size_t n, Slice *result, char *scratch) override;
126 	virtual Status Skip(uint64_t n) override;
127 	virtual Status InvalidateCache(size_t offset, size_t length) override;
128 };
129 
130 SpdkSequentialFile::~SpdkSequentialFile(void)
131 {
132 	spdk_file_close(mFile, g_sync_args.channel);
133 }
134 
135 Status
136 SpdkSequentialFile::Read(size_t n, Slice *result, char *scratch)
137 {
138 	int64_t ret;
139 
140 	ret = spdk_file_read(mFile, g_sync_args.channel, scratch, mOffset, n);
141 	if (ret >= 0) {
142 		mOffset += ret;
143 		*result = Slice(scratch, ret);
144 		return Status::OK();
145 	} else {
146 		errno = -ret;
147 		return Status::IOError(spdk_file_get_name(mFile), strerror(errno));
148 	}
149 }
150 
151 Status
152 SpdkSequentialFile::Skip(uint64_t n)
153 {
154 	mOffset += n;
155 	return Status::OK();
156 }
157 
158 Status
159 SpdkSequentialFile::InvalidateCache(__attribute__((unused)) size_t offset,
160 				    __attribute__((unused)) size_t length)
161 {
162 	return Status::OK();
163 }
164 
165 class SpdkRandomAccessFile : public RandomAccessFile
166 {
167 	struct spdk_file *mFile;
168 public:
169 	SpdkRandomAccessFile(struct spdk_file *file) : mFile(file) {}
170 	virtual ~SpdkRandomAccessFile();
171 
172 	virtual Status Read(uint64_t offset, size_t n, Slice *result, char *scratch) const override;
173 	virtual Status InvalidateCache(size_t offset, size_t length) override;
174 };
175 
176 SpdkRandomAccessFile::~SpdkRandomAccessFile(void)
177 {
178 	spdk_file_close(mFile, g_sync_args.channel);
179 }
180 
181 Status
182 SpdkRandomAccessFile::Read(uint64_t offset, size_t n, Slice *result, char *scratch) const
183 {
184 	int64_t rc;
185 
186 	rc = spdk_file_read(mFile, g_sync_args.channel, scratch, offset, n);
187 	if (rc >= 0) {
188 		*result = Slice(scratch, n);
189 		return Status::OK();
190 	} else {
191 		errno = -rc;
192 		return Status::IOError(spdk_file_get_name(mFile), strerror(errno));
193 	}
194 }
195 
196 Status
197 SpdkRandomAccessFile::InvalidateCache(__attribute__((unused)) size_t offset,
198 				      __attribute__((unused)) size_t length)
199 {
200 	return Status::OK();
201 }
202 
203 class SpdkWritableFile : public WritableFile
204 {
205 	struct spdk_file *mFile;
206 	uint64_t mSize;
207 
208 public:
209 	SpdkWritableFile(struct spdk_file *file) : mFile(file), mSize(0) {}
210 	~SpdkWritableFile()
211 	{
212 		if (mFile != NULL) {
213 			Close();
214 		}
215 	}
216 
217 	virtual void SetIOPriority(Env::IOPriority pri)
218 	{
219 		if (pri == Env::IO_HIGH) {
220 			spdk_file_set_priority(mFile, SPDK_FILE_PRIORITY_HIGH);
221 		}
222 	}
223 
224 	virtual Status Truncate(uint64_t size) override
225 	{
226 		int rc;
227 		rc = spdk_file_truncate(mFile, g_sync_args.channel, size);
228 		if (!rc) {
229 			mSize = size;
230 			return Status::OK();
231 		} else {
232 			errno = -rc;
233 			return Status::IOError(spdk_file_get_name(mFile), strerror(errno));
234 		}
235 	}
236 	virtual Status Close() override
237 	{
238 		spdk_file_close(mFile, g_sync_args.channel);
239 		mFile = NULL;
240 		return Status::OK();
241 	}
242 	virtual Status Append(const Slice &data) override;
243 	virtual Status Flush() override
244 	{
245 		return Status::OK();
246 	}
247 	virtual Status Sync() override
248 	{
249 		int rc;
250 
251 		rc = spdk_file_sync(mFile, g_sync_args.channel);
252 		if (!rc) {
253 			return Status::OK();
254 		} else {
255 			errno = -rc;
256 			return Status::IOError(spdk_file_get_name(mFile), strerror(errno));
257 		}
258 	}
259 	virtual Status Fsync() override
260 	{
261 		int rc;
262 
263 		rc = spdk_file_sync(mFile, g_sync_args.channel);
264 		if (!rc) {
265 			return Status::OK();
266 		} else {
267 			errno = -rc;
268 			return Status::IOError(spdk_file_get_name(mFile), strerror(errno));
269 		}
270 	}
271 	virtual bool IsSyncThreadSafe() const override
272 	{
273 		return true;
274 	}
275 	virtual uint64_t GetFileSize() override
276 	{
277 		return mSize;
278 	}
279 	virtual Status InvalidateCache(__attribute__((unused)) size_t offset,
280 				       __attribute__((unused)) size_t length) override
281 	{
282 		return Status::OK();
283 	}
284 	virtual Status Allocate(uint64_t offset, uint64_t len) override
285 	{
286 		int rc;
287 
288 		rc = spdk_file_truncate(mFile, g_sync_args.channel, offset + len);
289 		if (!rc) {
290 			return Status::OK();
291 		} else {
292 			errno = -rc;
293 			return Status::IOError(spdk_file_get_name(mFile), strerror(errno));
294 		}
295 	}
296 	virtual Status RangeSync(__attribute__((unused)) uint64_t offset,
297 				 __attribute__((unused)) uint64_t nbytes) override
298 	{
299 		int rc;
300 
301 		/*
302 		 * SPDK BlobFS does not have a range sync operation yet, so just sync
303 		 *  the whole file.
304 		 */
305 		rc = spdk_file_sync(mFile, g_sync_args.channel);
306 		if (!rc) {
307 			return Status::OK();
308 		} else {
309 			errno = -rc;
310 			return Status::IOError(spdk_file_get_name(mFile), strerror(errno));
311 		}
312 	}
313 	virtual size_t GetUniqueId(char *id, size_t max_size) const override
314 	{
315 		int rc;
316 
317 		rc = spdk_file_get_id(mFile, id, max_size);
318 		if (rc < 0) {
319 			return 0;
320 		} else {
321 			return rc;
322 		}
323 	}
324 };
325 
326 Status
327 SpdkWritableFile::Append(const Slice &data)
328 {
329 	int64_t rc;
330 
331 	rc = spdk_file_write(mFile, g_sync_args.channel, (void *)data.data(), mSize, data.size());
332 	if (rc >= 0) {
333 		mSize += data.size();
334 		return Status::OK();
335 	} else {
336 		errno = -rc;
337 		return Status::IOError(spdk_file_get_name(mFile), strerror(errno));
338 	}
339 }
340 
341 class SpdkDirectory : public Directory
342 {
343 public:
344 	SpdkDirectory() {}
345 	~SpdkDirectory() {}
346 	Status Fsync() override
347 	{
348 		return Status::OK();
349 	}
350 };
351 
352 class SpdkAppStartException : public std::runtime_error
353 {
354 public:
355 	SpdkAppStartException(std::string mess): std::runtime_error(mess) {}
356 };
357 
358 class SpdkEnv : public EnvWrapper
359 {
360 private:
361 	pthread_t mSpdkTid;
362 	std::string mDirectory;
363 	std::string mConfig;
364 	std::string mBdev;
365 
366 public:
367 	SpdkEnv(Env *base_env, const std::string &dir, const std::string &conf,
368 		const std::string &bdev, uint64_t cache_size_in_mb);
369 
370 	virtual ~SpdkEnv();
371 
372 	virtual Status NewSequentialFile(const std::string &fname,
373 					 unique_ptr<SequentialFile> *result,
374 					 const EnvOptions &options) override
375 	{
376 		if (fname.compare(0, mDirectory.length(), mDirectory) == 0) {
377 			struct spdk_file *file;
378 			int rc;
379 
380 			std::string name = sanitize_path(fname, mDirectory);
381 			rc = spdk_fs_open_file(g_fs, g_sync_args.channel,
382 					       name.c_str(), 0, &file);
383 			if (rc == 0) {
384 				result->reset(new SpdkSequentialFile(file));
385 				return Status::OK();
386 			} else {
387 				/* Myrocks engine uses errno(ENOENT) as one
388 				 * special condition, for the purpose to
389 				 * support MySQL, set the errno to right value.
390 				 */
391 				errno = -rc;
392 				return Status::IOError(name, strerror(errno));
393 			}
394 		} else {
395 			return EnvWrapper::NewSequentialFile(fname, result, options);
396 		}
397 	}
398 
399 	virtual Status NewRandomAccessFile(const std::string &fname,
400 					   unique_ptr<RandomAccessFile> *result,
401 					   const EnvOptions &options) override
402 	{
403 		if (fname.compare(0, mDirectory.length(), mDirectory) == 0) {
404 			std::string name = sanitize_path(fname, mDirectory);
405 			struct spdk_file *file;
406 			int rc;
407 
408 			rc = spdk_fs_open_file(g_fs, g_sync_args.channel,
409 					       name.c_str(), 0, &file);
410 			if (rc == 0) {
411 				result->reset(new SpdkRandomAccessFile(file));
412 				return Status::OK();
413 			} else {
414 				errno = -rc;
415 				return Status::IOError(name, strerror(errno));
416 			}
417 		} else {
418 			return EnvWrapper::NewRandomAccessFile(fname, result, options);
419 		}
420 	}
421 
422 	virtual Status NewWritableFile(const std::string &fname,
423 				       unique_ptr<WritableFile> *result,
424 				       const EnvOptions &options) override
425 	{
426 		if (fname.compare(0, mDirectory.length(), mDirectory) == 0) {
427 			std::string name = sanitize_path(fname, mDirectory);
428 			struct spdk_file *file;
429 			int rc;
430 
431 			rc = spdk_fs_open_file(g_fs, g_sync_args.channel, name.c_str(),
432 					       SPDK_BLOBFS_OPEN_CREATE, &file);
433 			if (rc == 0) {
434 				result->reset(new SpdkWritableFile(file));
435 				return Status::OK();
436 			} else {
437 				errno = -rc;
438 				return Status::IOError(name, strerror(errno));
439 			}
440 		} else {
441 			return EnvWrapper::NewWritableFile(fname, result, options);
442 		}
443 	}
444 
445 	virtual Status ReuseWritableFile(const std::string &fname,
446 					 const std::string &old_fname,
447 					 unique_ptr<WritableFile> *result,
448 					 const EnvOptions &options) override
449 	{
450 		return EnvWrapper::ReuseWritableFile(fname, old_fname, result, options);
451 	}
452 
453 	virtual Status NewDirectory(__attribute__((unused)) const std::string &name,
454 				    unique_ptr<Directory> *result) override
455 	{
456 		result->reset(new SpdkDirectory());
457 		return Status::OK();
458 	}
459 	virtual Status FileExists(const std::string &fname) override
460 	{
461 		struct spdk_file_stat stat;
462 		int rc;
463 		std::string name = sanitize_path(fname, mDirectory);
464 
465 		rc = spdk_fs_file_stat(g_fs, g_sync_args.channel, name.c_str(), &stat);
466 		if (rc == 0) {
467 			return Status::OK();
468 		}
469 		return EnvWrapper::FileExists(fname);
470 	}
471 	virtual Status RenameFile(const std::string &src, const std::string &t) override
472 	{
473 		int rc;
474 		std::string src_name = sanitize_path(src, mDirectory);
475 		std::string target_name = sanitize_path(t, mDirectory);
476 
477 		rc = spdk_fs_rename_file(g_fs, g_sync_args.channel,
478 					 src_name.c_str(), target_name.c_str());
479 		if (rc == -ENOENT) {
480 			return EnvWrapper::RenameFile(src, t);
481 		}
482 		return Status::OK();
483 	}
484 	virtual Status LinkFile(__attribute__((unused)) const std::string &src,
485 				__attribute__((unused)) const std::string &t) override
486 	{
487 		return Status::NotSupported("SpdkEnv does not support LinkFile");
488 	}
489 	virtual Status GetFileSize(const std::string &fname, uint64_t *size) override
490 	{
491 		struct spdk_file_stat stat;
492 		int rc;
493 		std::string name = sanitize_path(fname, mDirectory);
494 
495 		rc = spdk_fs_file_stat(g_fs, g_sync_args.channel, name.c_str(), &stat);
496 		if (rc == -ENOENT) {
497 			return EnvWrapper::GetFileSize(fname, size);
498 		}
499 		*size = stat.size;
500 		return Status::OK();
501 	}
502 	virtual Status DeleteFile(const std::string &fname) override
503 	{
504 		int rc;
505 		std::string name = sanitize_path(fname, mDirectory);
506 
507 		rc = spdk_fs_delete_file(g_fs, g_sync_args.channel, name.c_str());
508 		if (rc == -ENOENT) {
509 			return EnvWrapper::DeleteFile(fname);
510 		}
511 		return Status::OK();
512 	}
513 	virtual void StartThread(void (*function)(void *arg), void *arg) override;
514 	virtual Status LockFile(const std::string &fname, FileLock **lock) override
515 	{
516 		std::string name = sanitize_path(fname, mDirectory);
517 		int64_t rc;
518 
519 		rc = spdk_fs_open_file(g_fs, g_sync_args.channel, name.c_str(),
520 				       SPDK_BLOBFS_OPEN_CREATE, (struct spdk_file **)lock);
521 		if (!rc) {
522 			return Status::OK();
523 		} else {
524 			errno = -rc;
525 			return Status::IOError(name, strerror(errno));
526 		}
527 	}
528 	virtual Status UnlockFile(FileLock *lock) override
529 	{
530 		spdk_file_close((struct spdk_file *)lock, g_sync_args.channel);
531 		return Status::OK();
532 	}
533 	virtual Status GetChildren(const std::string &dir,
534 				   std::vector<std::string> *result) override
535 	{
536 		std::string::size_type pos;
537 		std::set<std::string> dir_and_file_set;
538 		std::string full_path;
539 		std::string filename;
540 		std::string dir_name;
541 
542 		if (dir.find("archive") != std::string::npos) {
543 			return Status::OK();
544 		}
545 		if (dir.compare(0, mDirectory.length(), mDirectory) == 0) {
546 			spdk_fs_iter iter;
547 			struct spdk_file *file;
548 			dir_name = sanitize_path(dir, mDirectory);
549 
550 			iter = spdk_fs_iter_first(g_fs);
551 			while (iter != NULL) {
552 				file = spdk_fs_iter_get_file(iter);
553 				full_path = spdk_file_get_name(file);
554 				if (strncmp(dir_name.c_str(), full_path.c_str(), dir_name.length())) {
555 					iter = spdk_fs_iter_next(iter);
556 					continue;
557 				}
558 				pos = full_path.find("/", dir_name.length() + 1);
559 
560 				if (pos != std::string::npos) {
561 					filename = full_path.substr(dir_name.length() + 1, pos - dir_name.length() - 1);
562 				} else {
563 					filename = full_path.substr(dir_name.length() + 1);
564 				}
565 				dir_and_file_set.insert(filename);
566 				iter = spdk_fs_iter_next(iter);
567 			}
568 
569 			for (auto &s : dir_and_file_set) {
570 				result->push_back(s);
571 			}
572 
573 			result->push_back(".");
574 			result->push_back("..");
575 
576 			return Status::OK();
577 		}
578 		return EnvWrapper::GetChildren(dir, result);
579 	}
580 };
581 
582 void SpdkInitializeThread(void)
583 {
584 	struct spdk_thread *thread;
585 
586 	if (g_fs != NULL) {
587 		thread = spdk_thread_create("spdk_rocksdb");
588 		spdk_set_thread(thread);
589 		g_sync_args.channel = spdk_fs_alloc_io_channel_sync(g_fs);
590 	}
591 }
592 
593 struct SpdkThreadState {
594 	void (*user_function)(void *);
595 	void *arg;
596 };
597 
598 static void SpdkStartThreadWrapper(void *arg)
599 {
600 	SpdkThreadState *state = reinterpret_cast<SpdkThreadState *>(arg);
601 
602 	SpdkInitializeThread();
603 	state->user_function(state->arg);
604 	delete state;
605 }
606 
607 void SpdkEnv::StartThread(void (*function)(void *arg), void *arg)
608 {
609 	SpdkThreadState *state = new SpdkThreadState;
610 	state->user_function = function;
611 	state->arg = arg;
612 	EnvWrapper::StartThread(SpdkStartThreadWrapper, state);
613 }
614 
615 static void
616 fs_load_cb(__attribute__((unused)) void *ctx,
617 	   struct spdk_filesystem *fs, int fserrno)
618 {
619 	if (fserrno == 0) {
620 		g_fs = fs;
621 	}
622 	g_spdk_ready = true;
623 }
624 
625 static void
626 spdk_rocksdb_run(__attribute__((unused)) void *arg1,
627 		 __attribute__((unused)) void *arg2)
628 {
629 	struct spdk_bdev *bdev;
630 
631 	bdev = spdk_bdev_get_by_name(g_bdev_name.c_str());
632 
633 	if (bdev == NULL) {
634 		SPDK_ERRLOG("bdev %s not found\n", g_bdev_name.c_str());
635 		exit(1);
636 	}
637 
638 	g_lcore = spdk_env_get_first_core();
639 
640 	g_bs_dev = spdk_bdev_create_bs_dev(bdev, NULL, NULL);
641 	printf("using bdev %s\n", g_bdev_name.c_str());
642 	spdk_fs_load(g_bs_dev, __send_request, fs_load_cb, NULL);
643 }
644 
645 static void
646 fs_unload_cb(__attribute__((unused)) void *ctx,
647 	     __attribute__((unused)) int fserrno)
648 {
649 	assert(fserrno == 0);
650 
651 	spdk_app_stop(0);
652 }
653 
654 static void
655 spdk_rocksdb_shutdown(void)
656 {
657 	if (g_fs != NULL) {
658 		spdk_fs_unload(g_fs, fs_unload_cb, NULL);
659 	} else {
660 		fs_unload_cb(NULL, 0);
661 	}
662 }
663 
664 static void *
665 initialize_spdk(void *arg)
666 {
667 	struct spdk_app_opts *opts = (struct spdk_app_opts *)arg;
668 	int rc;
669 
670 	rc = spdk_app_start(opts, spdk_rocksdb_run, NULL);
671 	/*
672 	 * TODO:  Revisit for case of internal failure of
673 	 * spdk_app_start(), itself.  At this time, it's known
674 	 * the only application's use of spdk_app_stop() passes
675 	 * a zero; i.e. no fail (non-zero) cases so here we
676 	 * assume there was an internal failure and flag it
677 	 * so we can throw an exception.
678 	 */
679 	if (rc) {
680 		g_spdk_start_failure = true;
681 	} else {
682 		spdk_app_fini();
683 		delete opts;
684 	}
685 	pthread_exit(NULL);
686 
687 }
688 
689 SpdkEnv::SpdkEnv(Env *base_env, const std::string &dir, const std::string &conf,
690 		 const std::string &bdev, uint64_t cache_size_in_mb)
691 	: EnvWrapper(base_env), mDirectory(dir), mConfig(conf), mBdev(bdev)
692 {
693 	struct spdk_app_opts *opts = new struct spdk_app_opts;
694 
695 	spdk_app_opts_init(opts);
696 	opts->name = "rocksdb";
697 	opts->config_file = mConfig.c_str();
698 	opts->shutdown_cb = spdk_rocksdb_shutdown;
699 
700 	spdk_fs_set_cache_size(cache_size_in_mb);
701 	g_bdev_name = mBdev;
702 
703 	pthread_create(&mSpdkTid, NULL, &initialize_spdk, opts);
704 	while (!g_spdk_ready && !g_spdk_start_failure)
705 		;
706 	if (g_spdk_start_failure) {
707 		delete opts;
708 		throw SpdkAppStartException("spdk_app_start() unable to start spdk_rocksdb_run()");
709 	}
710 
711 	SpdkInitializeThread();
712 }
713 
714 SpdkEnv::~SpdkEnv()
715 {
716 	/* This is a workaround for rocksdb test, we close the files if the rocksdb not
717 	 * do the work before the test quit.
718 	 */
719 	if (g_fs != NULL) {
720 		spdk_fs_iter iter;
721 		struct spdk_file *file;
722 
723 		if (!g_sync_args.channel) {
724 			SpdkInitializeThread();
725 		}
726 		iter = spdk_fs_iter_first(g_fs);
727 		while (iter != NULL) {
728 			file = spdk_fs_iter_get_file(iter);
729 			spdk_file_close(file, g_sync_args.channel);
730 			iter = spdk_fs_iter_next(iter);
731 		}
732 	}
733 
734 	spdk_app_start_shutdown();
735 	pthread_join(mSpdkTid, NULL);
736 }
737 
738 Env *NewSpdkEnv(Env *base_env, const std::string &dir, const std::string &conf,
739 		const std::string &bdev, uint64_t cache_size_in_mb)
740 {
741 	try {
742 		SpdkEnv *spdk_env = new SpdkEnv(base_env, dir, conf, bdev, cache_size_in_mb);
743 		if (g_fs != NULL) {
744 			return spdk_env;
745 		} else {
746 			delete spdk_env;
747 			return NULL;
748 		}
749 	} catch (SpdkAppStartException &e) {
750 		SPDK_ERRLOG("NewSpdkEnv: exception caught: %s", e.what());
751 		return NULL;
752 	} catch (...) {
753 		SPDK_ERRLOG("NewSpdkEnv: default exception caught");
754 		return NULL;
755 	}
756 }
757 
758 } // namespace rocksdb
759