xref: /spdk/lib/rocksdb/env_spdk.cc (revision fa2d95b3fe66e7f5c543eaef89fa00d4eaa0e6e7)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
#include "rocksdb/env.h"
#include <atomic>
#include <iostream>
#include <set>
#include <stdexcept>
38 
39 extern "C" {
40 #include "spdk/env.h"
41 #include "spdk/event.h"
42 #include "spdk/blob.h"
43 #include "spdk/blobfs.h"
44 #include "spdk/blob_bdev.h"
45 #include "spdk/log.h"
46 #include "spdk/thread.h"
47 #include "spdk/bdev.h"
48 
49 #include "spdk_internal/thread.h"
50 }
51 
52 namespace rocksdb
53 {
54 
/* BlobFS handle; fs_load_cb() stores it when the load succeeds, otherwise it stays NULL. */
struct spdk_filesystem *g_fs = NULL;
/* Blobstore device created from the bdev in spdk_rocksdb_run(). */
struct spdk_bs_dev *g_bs_dev;
/* Core that __send_request() schedules events on; set in spdk_rocksdb_run(). */
uint32_t g_lcore = 0;
/* Name of the bdev to open; copied from the SpdkEnv constructor argument. */
std::string g_bdev_name;
/*
 * Start-up handshake between the SPDK application thread and the thread that
 * constructs SpdkEnv: the constructor spins until one of the two flags is
 * raised.  They are std::atomic rather than volatile because volatile neither
 * makes the cross-thread access race-free nor orders the earlier store to
 * g_fs before the flag becomes visible.
 */
std::atomic<bool> g_spdk_ready(false);
std::atomic<bool> g_spdk_start_failure(false);
struct sync_args {
	struct spdk_io_channel *channel;
};

/* Per-thread channel used for the synchronous BlobFS calls; filled in by SpdkInitializeThread(). */
__thread struct sync_args g_sync_args;
66 
67 static void
68 __call_fn(void *arg1, void *arg2)
69 {
70 	fs_request_fn fn;
71 
72 	fn = (fs_request_fn)arg1;
73 	fn(arg2);
74 }
75 
76 static void
77 __send_request(fs_request_fn fn, void *arg)
78 {
79 	struct spdk_event *event;
80 
81 	event = spdk_event_allocate(g_lcore, __call_fn, (void *)fn, arg);
82 	spdk_event_call(event);
83 }
84 
/*
 * Convert a path into the name used inside BlobFS.
 *
 * The first mount_directory.length() characters of input are dropped (the
 * prefix is not compared, only its length is used), the remainder is forced to
 * start with a single '/', runs of '/' are collapsed to one and a trailing '/'
 * is removed.
 *
 * Returns an empty string when nothing remains after stripping, i.e. when
 * input is no longer than mount_directory or the remainder consists only of
 * '/' characters.  Such inputs used to throw std::out_of_range from substr()
 * or index one element before the start of an empty string.
 */
static std::string
sanitize_path(const std::string &input, const std::string &mount_directory)
{
	std::string name;

	if (input.length() <= mount_directory.length()) {
		return name;
	}

	name.reserve(input.length() - mount_directory.length() + 1);
	name.push_back('/');
	for (std::string::size_type i = mount_directory.length(); i < input.length(); i++) {
		const char c = input[i];

		/* Collapse "//" (including a leading '/' that duplicates ours). */
		if (c == '/' && name.back() == '/') {
			continue;
		}
		name.push_back(c);
	}

	/* name always holds at least the leading '/', so back() is safe here. */
	if (name.back() == '/') {
		name.pop_back();
	}
	return name;
}
116 
117 class SpdkSequentialFile : public SequentialFile
118 {
119 	struct spdk_file *mFile;
120 	uint64_t mOffset;
121 public:
122 	SpdkSequentialFile(struct spdk_file *file) : mFile(file), mOffset(0) {}
123 	virtual ~SpdkSequentialFile();
124 
125 	virtual Status Read(size_t n, Slice *result, char *scratch) override;
126 	virtual Status Skip(uint64_t n) override;
127 	virtual Status InvalidateCache(size_t offset, size_t length) override;
128 };
129 
130 SpdkSequentialFile::~SpdkSequentialFile(void)
131 {
132 	spdk_file_close(mFile, g_sync_args.channel);
133 }
134 
135 Status
136 SpdkSequentialFile::Read(size_t n, Slice *result, char *scratch)
137 {
138 	int64_t ret;
139 
140 	ret = spdk_file_read(mFile, g_sync_args.channel, scratch, mOffset, n);
141 	if (ret >= 0) {
142 		mOffset += ret;
143 		*result = Slice(scratch, ret);
144 		return Status::OK();
145 	} else {
146 		errno = -ret;
147 		return Status::IOError(spdk_file_get_name(mFile), strerror(errno));
148 	}
149 }
150 
151 Status
152 SpdkSequentialFile::Skip(uint64_t n)
153 {
154 	mOffset += n;
155 	return Status::OK();
156 }
157 
158 Status
159 SpdkSequentialFile::InvalidateCache(__attribute__((unused)) size_t offset,
160 				    __attribute__((unused)) size_t length)
161 {
162 	return Status::OK();
163 }
164 
165 class SpdkRandomAccessFile : public RandomAccessFile
166 {
167 	struct spdk_file *mFile;
168 public:
169 	SpdkRandomAccessFile(struct spdk_file *file) : mFile(file) {}
170 	virtual ~SpdkRandomAccessFile();
171 
172 	virtual Status Read(uint64_t offset, size_t n, Slice *result, char *scratch) const override;
173 	virtual Status InvalidateCache(size_t offset, size_t length) override;
174 };
175 
176 SpdkRandomAccessFile::~SpdkRandomAccessFile(void)
177 {
178 	spdk_file_close(mFile, g_sync_args.channel);
179 }
180 
181 Status
182 SpdkRandomAccessFile::Read(uint64_t offset, size_t n, Slice *result, char *scratch) const
183 {
184 	int64_t rc;
185 
186 	rc = spdk_file_read(mFile, g_sync_args.channel, scratch, offset, n);
187 	if (rc >= 0) {
188 		*result = Slice(scratch, n);
189 		return Status::OK();
190 	} else {
191 		errno = -rc;
192 		return Status::IOError(spdk_file_get_name(mFile), strerror(errno));
193 	}
194 }
195 
196 Status
197 SpdkRandomAccessFile::InvalidateCache(__attribute__((unused)) size_t offset,
198 				      __attribute__((unused)) size_t length)
199 {
200 	return Status::OK();
201 }
202 
203 class SpdkWritableFile : public WritableFile
204 {
205 	struct spdk_file *mFile;
206 	uint64_t mSize;
207 
208 public:
209 	SpdkWritableFile(struct spdk_file *file) : mFile(file), mSize(0) {}
210 	~SpdkWritableFile()
211 	{
212 		if (mFile != NULL) {
213 			Close();
214 		}
215 	}
216 
217 	virtual void SetIOPriority(Env::IOPriority pri)
218 	{
219 		if (pri == Env::IO_HIGH) {
220 			spdk_file_set_priority(mFile, SPDK_FILE_PRIORITY_HIGH);
221 		}
222 	}
223 
224 	virtual Status Truncate(uint64_t size) override
225 	{
226 		int rc;
227 		rc = spdk_file_truncate(mFile, g_sync_args.channel, size);
228 		if (!rc) {
229 			mSize = size;
230 			return Status::OK();
231 		} else {
232 			errno = -rc;
233 			return Status::IOError(spdk_file_get_name(mFile), strerror(errno));
234 		}
235 	}
236 	virtual Status Close() override
237 	{
238 		spdk_file_close(mFile, g_sync_args.channel);
239 		mFile = NULL;
240 		return Status::OK();
241 	}
242 	virtual Status Append(const Slice &data) override;
243 	virtual Status Flush() override
244 	{
245 		return Status::OK();
246 	}
247 	virtual Status Sync() override
248 	{
249 		int rc;
250 
251 		rc = spdk_file_sync(mFile, g_sync_args.channel);
252 		if (!rc) {
253 			return Status::OK();
254 		} else {
255 			errno = -rc;
256 			return Status::IOError(spdk_file_get_name(mFile), strerror(errno));
257 		}
258 	}
259 	virtual Status Fsync() override
260 	{
261 		int rc;
262 
263 		rc = spdk_file_sync(mFile, g_sync_args.channel);
264 		if (!rc) {
265 			return Status::OK();
266 		} else {
267 			errno = -rc;
268 			return Status::IOError(spdk_file_get_name(mFile), strerror(errno));
269 		}
270 	}
271 	virtual bool IsSyncThreadSafe() const override
272 	{
273 		return true;
274 	}
275 	virtual uint64_t GetFileSize() override
276 	{
277 		return mSize;
278 	}
279 	virtual Status InvalidateCache(__attribute__((unused)) size_t offset,
280 				       __attribute__((unused)) size_t length) override
281 	{
282 		return Status::OK();
283 	}
284 	virtual Status Allocate(uint64_t offset, uint64_t len) override
285 	{
286 		int rc;
287 
288 		rc = spdk_file_truncate(mFile, g_sync_args.channel, offset + len);
289 		if (!rc) {
290 			return Status::OK();
291 		} else {
292 			errno = -rc;
293 			return Status::IOError(spdk_file_get_name(mFile), strerror(errno));
294 		}
295 	}
296 	virtual Status RangeSync(__attribute__((unused)) uint64_t offset,
297 				 __attribute__((unused)) uint64_t nbytes) override
298 	{
299 		int rc;
300 
301 		/*
302 		 * SPDK BlobFS does not have a range sync operation yet, so just sync
303 		 *  the whole file.
304 		 */
305 		rc = spdk_file_sync(mFile, g_sync_args.channel);
306 		if (!rc) {
307 			return Status::OK();
308 		} else {
309 			errno = -rc;
310 			return Status::IOError(spdk_file_get_name(mFile), strerror(errno));
311 		}
312 	}
313 	virtual size_t GetUniqueId(char *id, size_t max_size) const override
314 	{
315 		int rc;
316 
317 		rc = spdk_file_get_id(mFile, id, max_size);
318 		if (rc < 0) {
319 			return 0;
320 		} else {
321 			return rc;
322 		}
323 	}
324 };
325 
326 Status
327 SpdkWritableFile::Append(const Slice &data)
328 {
329 	int64_t rc;
330 
331 	rc = spdk_file_write(mFile, g_sync_args.channel, (void *)data.data(), mSize, data.size());
332 	if (rc >= 0) {
333 		mSize += data.size();
334 		return Status::OK();
335 	} else {
336 		errno = -rc;
337 		return Status::IOError(spdk_file_get_name(mFile), strerror(errno));
338 	}
339 }
340 
341 class SpdkDirectory : public Directory
342 {
343 public:
344 	SpdkDirectory() {}
345 	~SpdkDirectory() {}
346 	Status Fsync() override
347 	{
348 		return Status::OK();
349 	}
350 };
351 
/*
 * Thrown by the SpdkEnv constructor when the SPDK application fails to start;
 * NewSpdkEnv() catches it.
 */
class SpdkAppStartException : public std::runtime_error
{
public:
	SpdkAppStartException(std::string mess)
		: std::runtime_error(mess)
	{
	}
};
357 
358 class SpdkEnv : public EnvWrapper
359 {
360 private:
361 	pthread_t mSpdkTid;
362 	std::string mDirectory;
363 	std::string mConfig;
364 	std::string mBdev;
365 
366 public:
367 	SpdkEnv(Env *base_env, const std::string &dir, const std::string &conf,
368 		const std::string &bdev, uint64_t cache_size_in_mb);
369 
370 	virtual ~SpdkEnv();
371 
372 	virtual Status NewSequentialFile(const std::string &fname,
373 					 unique_ptr<SequentialFile> *result,
374 					 const EnvOptions &options) override
375 	{
376 		if (fname.compare(0, mDirectory.length(), mDirectory) == 0) {
377 			struct spdk_file *file;
378 			int rc;
379 
380 			std::string name = sanitize_path(fname, mDirectory);
381 			rc = spdk_fs_open_file(g_fs, g_sync_args.channel,
382 					       name.c_str(), 0, &file);
383 			if (rc == 0) {
384 				result->reset(new SpdkSequentialFile(file));
385 				return Status::OK();
386 			} else {
387 				/* Myrocks engine uses errno(ENOENT) as one
388 				 * special condition, for the purpose to
389 				 * support MySQL, set the errno to right value.
390 				 */
391 				errno = -rc;
392 				return Status::IOError(name, strerror(errno));
393 			}
394 		} else {
395 			return EnvWrapper::NewSequentialFile(fname, result, options);
396 		}
397 	}
398 
399 	virtual Status NewRandomAccessFile(const std::string &fname,
400 					   unique_ptr<RandomAccessFile> *result,
401 					   const EnvOptions &options) override
402 	{
403 		if (fname.compare(0, mDirectory.length(), mDirectory) == 0) {
404 			std::string name = sanitize_path(fname, mDirectory);
405 			struct spdk_file *file;
406 			int rc;
407 
408 			rc = spdk_fs_open_file(g_fs, g_sync_args.channel,
409 					       name.c_str(), 0, &file);
410 			if (rc == 0) {
411 				result->reset(new SpdkRandomAccessFile(file));
412 				return Status::OK();
413 			} else {
414 				errno = -rc;
415 				return Status::IOError(name, strerror(errno));
416 			}
417 		} else {
418 			return EnvWrapper::NewRandomAccessFile(fname, result, options);
419 		}
420 	}
421 
422 	virtual Status NewWritableFile(const std::string &fname,
423 				       unique_ptr<WritableFile> *result,
424 				       const EnvOptions &options) override
425 	{
426 		if (fname.compare(0, mDirectory.length(), mDirectory) == 0) {
427 			std::string name = sanitize_path(fname, mDirectory);
428 			struct spdk_file *file;
429 			int rc;
430 
431 			rc = spdk_fs_open_file(g_fs, g_sync_args.channel, name.c_str(),
432 					       SPDK_BLOBFS_OPEN_CREATE, &file);
433 			if (rc == 0) {
434 				result->reset(new SpdkWritableFile(file));
435 				return Status::OK();
436 			} else {
437 				errno = -rc;
438 				return Status::IOError(name, strerror(errno));
439 			}
440 		} else {
441 			return EnvWrapper::NewWritableFile(fname, result, options);
442 		}
443 	}
444 
445 	virtual Status ReuseWritableFile(const std::string &fname,
446 					 const std::string &old_fname,
447 					 unique_ptr<WritableFile> *result,
448 					 const EnvOptions &options) override
449 	{
450 		return EnvWrapper::ReuseWritableFile(fname, old_fname, result, options);
451 	}
452 
453 	virtual Status NewDirectory(__attribute__((unused)) const std::string &name,
454 				    unique_ptr<Directory> *result) override
455 	{
456 		result->reset(new SpdkDirectory());
457 		return Status::OK();
458 	}
459 	virtual Status FileExists(const std::string &fname) override
460 	{
461 		struct spdk_file_stat stat;
462 		int rc;
463 		std::string name = sanitize_path(fname, mDirectory);
464 
465 		rc = spdk_fs_file_stat(g_fs, g_sync_args.channel, name.c_str(), &stat);
466 		if (rc == 0) {
467 			return Status::OK();
468 		}
469 		return EnvWrapper::FileExists(fname);
470 	}
471 	virtual Status RenameFile(const std::string &src, const std::string &t) override
472 	{
473 		int rc;
474 		std::string src_name = sanitize_path(src, mDirectory);
475 		std::string target_name = sanitize_path(t, mDirectory);
476 
477 		rc = spdk_fs_rename_file(g_fs, g_sync_args.channel,
478 					 src_name.c_str(), target_name.c_str());
479 		if (rc == -ENOENT) {
480 			return EnvWrapper::RenameFile(src, t);
481 		}
482 		return Status::OK();
483 	}
484 	virtual Status LinkFile(__attribute__((unused)) const std::string &src,
485 				__attribute__((unused)) const std::string &t) override
486 	{
487 		return Status::NotSupported("SpdkEnv does not support LinkFile");
488 	}
489 	virtual Status GetFileSize(const std::string &fname, uint64_t *size) override
490 	{
491 		struct spdk_file_stat stat;
492 		int rc;
493 		std::string name = sanitize_path(fname, mDirectory);
494 
495 		rc = spdk_fs_file_stat(g_fs, g_sync_args.channel, name.c_str(), &stat);
496 		if (rc == -ENOENT) {
497 			return EnvWrapper::GetFileSize(fname, size);
498 		}
499 		*size = stat.size;
500 		return Status::OK();
501 	}
502 	virtual Status DeleteFile(const std::string &fname) override
503 	{
504 		int rc;
505 		std::string name = sanitize_path(fname, mDirectory);
506 
507 		rc = spdk_fs_delete_file(g_fs, g_sync_args.channel, name.c_str());
508 		if (rc == -ENOENT) {
509 			return EnvWrapper::DeleteFile(fname);
510 		}
511 		return Status::OK();
512 	}
513 	virtual void StartThread(void (*function)(void *arg), void *arg) override;
514 	virtual Status LockFile(const std::string &fname, FileLock **lock) override
515 	{
516 		std::string name = sanitize_path(fname, mDirectory);
517 		int64_t rc;
518 
519 		rc = spdk_fs_open_file(g_fs, g_sync_args.channel, name.c_str(),
520 				       SPDK_BLOBFS_OPEN_CREATE, (struct spdk_file **)lock);
521 		if (!rc) {
522 			return Status::OK();
523 		} else {
524 			errno = -rc;
525 			return Status::IOError(name, strerror(errno));
526 		}
527 	}
528 	virtual Status UnlockFile(FileLock *lock) override
529 	{
530 		spdk_file_close((struct spdk_file *)lock, g_sync_args.channel);
531 		return Status::OK();
532 	}
533 	virtual Status GetChildren(const std::string &dir,
534 				   std::vector<std::string> *result) override
535 	{
536 		std::string::size_type pos;
537 		std::set<std::string> dir_and_file_set;
538 		std::string full_path;
539 		std::string filename;
540 		std::string dir_name;
541 
542 		if (dir.find("archive") != std::string::npos) {
543 			return Status::OK();
544 		}
545 		if (dir.compare(0, mDirectory.length(), mDirectory) == 0) {
546 			spdk_fs_iter iter;
547 			struct spdk_file *file;
548 			dir_name = sanitize_path(dir, mDirectory);
549 
550 			iter = spdk_fs_iter_first(g_fs);
551 			while (iter != NULL) {
552 				file = spdk_fs_iter_get_file(iter);
553 				full_path = spdk_file_get_name(file);
554 				if (strncmp(dir_name.c_str(), full_path.c_str(), dir_name.length())) {
555 					iter = spdk_fs_iter_next(iter);
556 					continue;
557 				}
558 				pos = full_path.find("/", dir_name.length() + 1);
559 
560 				if (pos != std::string::npos) {
561 					filename = full_path.substr(dir_name.length() + 1, pos - dir_name.length() - 1);
562 				} else {
563 					filename = full_path.substr(dir_name.length() + 1);
564 				}
565 				dir_and_file_set.insert(filename);
566 				iter = spdk_fs_iter_next(iter);
567 			}
568 
569 			for (auto &s : dir_and_file_set) {
570 				result->push_back(s);
571 			}
572 
573 			result->push_back(".");
574 			result->push_back("..");
575 
576 			return Status::OK();
577 		}
578 		return EnvWrapper::GetChildren(dir, result);
579 	}
580 };
581 
582 void SpdkInitializeThread(void)
583 {
584 	struct spdk_thread *thread;
585 
586 	if (g_fs != NULL) {
587 		thread = spdk_thread_create("spdk_rocksdb");
588 		spdk_set_thread(thread);
589 		g_sync_args.channel = spdk_fs_alloc_io_channel_sync(g_fs);
590 	}
591 }
592 
/*
 * Heap-allocated hand-off from SpdkEnv::StartThread() to
 * SpdkStartThreadWrapper(), which deletes it after the user function returns.
 */
struct SpdkThreadState {
	/* Function the caller asked StartThread() to run. */
	void (*user_function)(void *);
	/* Opaque argument passed to user_function. */
	void *arg;
};
597 
598 static void SpdkStartThreadWrapper(void *arg)
599 {
600 	SpdkThreadState *state = reinterpret_cast<SpdkThreadState *>(arg);
601 
602 	SpdkInitializeThread();
603 	state->user_function(state->arg);
604 	delete state;
605 }
606 
607 void SpdkEnv::StartThread(void (*function)(void *arg), void *arg)
608 {
609 	SpdkThreadState *state = new SpdkThreadState;
610 	state->user_function = function;
611 	state->arg = arg;
612 	EnvWrapper::StartThread(SpdkStartThreadWrapper, state);
613 }
614 
615 static void
616 fs_load_cb(__attribute__((unused)) void *ctx,
617 	   struct spdk_filesystem *fs, int fserrno)
618 {
619 	if (fserrno == 0) {
620 		g_fs = fs;
621 	}
622 	g_spdk_ready = true;
623 }
624 
625 static void
626 spdk_rocksdb_run(__attribute__((unused)) void *arg1)
627 {
628 	struct spdk_bdev *bdev;
629 
630 	bdev = spdk_bdev_get_by_name(g_bdev_name.c_str());
631 
632 	if (bdev == NULL) {
633 		SPDK_ERRLOG("bdev %s not found\n", g_bdev_name.c_str());
634 		exit(1);
635 	}
636 
637 	g_lcore = spdk_env_get_first_core();
638 
639 	g_bs_dev = spdk_bdev_create_bs_dev(bdev, NULL, NULL);
640 	printf("using bdev %s\n", g_bdev_name.c_str());
641 	spdk_fs_load(g_bs_dev, __send_request, fs_load_cb, NULL);
642 }
643 
644 static void
645 fs_unload_cb(__attribute__((unused)) void *ctx,
646 	     __attribute__((unused)) int fserrno)
647 {
648 	assert(fserrno == 0);
649 
650 	spdk_app_stop(0);
651 }
652 
653 static void
654 spdk_rocksdb_shutdown(void)
655 {
656 	if (g_fs != NULL) {
657 		spdk_fs_unload(g_fs, fs_unload_cb, NULL);
658 	} else {
659 		fs_unload_cb(NULL, 0);
660 	}
661 }
662 
663 static void *
664 initialize_spdk(void *arg)
665 {
666 	struct spdk_app_opts *opts = (struct spdk_app_opts *)arg;
667 	int rc;
668 
669 	rc = spdk_app_start(opts, spdk_rocksdb_run, NULL);
670 	/*
671 	 * TODO:  Revisit for case of internal failure of
672 	 * spdk_app_start(), itself.  At this time, it's known
673 	 * the only application's use of spdk_app_stop() passes
674 	 * a zero; i.e. no fail (non-zero) cases so here we
675 	 * assume there was an internal failure and flag it
676 	 * so we can throw an exception.
677 	 */
678 	if (rc) {
679 		g_spdk_start_failure = true;
680 	} else {
681 		spdk_app_fini();
682 		delete opts;
683 	}
684 	pthread_exit(NULL);
685 
686 }
687 
688 SpdkEnv::SpdkEnv(Env *base_env, const std::string &dir, const std::string &conf,
689 		 const std::string &bdev, uint64_t cache_size_in_mb)
690 	: EnvWrapper(base_env), mDirectory(dir), mConfig(conf), mBdev(bdev)
691 {
692 	struct spdk_app_opts *opts = new struct spdk_app_opts;
693 
694 	spdk_app_opts_init(opts);
695 	opts->name = "rocksdb";
696 	opts->config_file = mConfig.c_str();
697 	opts->shutdown_cb = spdk_rocksdb_shutdown;
698 
699 	spdk_fs_set_cache_size(cache_size_in_mb);
700 	g_bdev_name = mBdev;
701 
702 	pthread_create(&mSpdkTid, NULL, &initialize_spdk, opts);
703 	while (!g_spdk_ready && !g_spdk_start_failure)
704 		;
705 	if (g_spdk_start_failure) {
706 		delete opts;
707 		throw SpdkAppStartException("spdk_app_start() unable to start spdk_rocksdb_run()");
708 	}
709 
710 	SpdkInitializeThread();
711 }
712 
713 SpdkEnv::~SpdkEnv()
714 {
715 	/* This is a workaround for rocksdb test, we close the files if the rocksdb not
716 	 * do the work before the test quit.
717 	 */
718 	if (g_fs != NULL) {
719 		spdk_fs_iter iter;
720 		struct spdk_file *file;
721 
722 		if (!g_sync_args.channel) {
723 			SpdkInitializeThread();
724 		}
725 		iter = spdk_fs_iter_first(g_fs);
726 		while (iter != NULL) {
727 			file = spdk_fs_iter_get_file(iter);
728 			spdk_file_close(file, g_sync_args.channel);
729 			iter = spdk_fs_iter_next(iter);
730 		}
731 	}
732 
733 	spdk_app_start_shutdown();
734 	pthread_join(mSpdkTid, NULL);
735 }
736 
737 Env *NewSpdkEnv(Env *base_env, const std::string &dir, const std::string &conf,
738 		const std::string &bdev, uint64_t cache_size_in_mb)
739 {
740 	try {
741 		SpdkEnv *spdk_env = new SpdkEnv(base_env, dir, conf, bdev, cache_size_in_mb);
742 		if (g_fs != NULL) {
743 			return spdk_env;
744 		} else {
745 			delete spdk_env;
746 			return NULL;
747 		}
748 	} catch (SpdkAppStartException &e) {
749 		SPDK_ERRLOG("NewSpdkEnv: exception caught: %s", e.what());
750 		return NULL;
751 	} catch (...) {
752 		SPDK_ERRLOG("NewSpdkEnv: default exception caught");
753 		return NULL;
754 	}
755 }
756 
757 } // namespace rocksdb
758