xref: /spdk/lib/rocksdb/env_spdk.cc (revision 2172c432cfdaecc5a279d64e37c6b51e794683c1)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
#include "rocksdb/env.h"
#include <atomic>
#include <iostream>
#include <set>
#include <stdexcept>

extern "C" {
#include "spdk/env.h"
#include "spdk/event.h"
#include "spdk/blob.h"
#include "spdk/blobfs.h"
#include "spdk/blob_bdev.h"
#include "spdk/log.h"
#include "spdk/thread.h"
#include "spdk/bdev.h"

#include "spdk_internal/thread.h"
}
51 
52 namespace rocksdb
53 {
54 
/* Filesystem handle; assigned by fs_load_cb() when the load succeeds, NULL otherwise. */
struct spdk_filesystem *g_fs = NULL;
/* Blobstore device created on top of the configured bdev in rocksdb_run(). */
struct spdk_bs_dev *g_bs_dev;
/* Core that __send_request() posts filesystem requests to; chosen in rocksdb_run(). */
uint32_t g_lcore = 0;
/* Name of the bdev to load the filesystem from; set by the SpdkEnv constructor. */
std::string g_bdev_name;
/*
 * Start-up handshake flags.  They are set from the SPDK start-up path
 * (fs_load_cb() / initialize_spdk()) and polled by the thread constructing
 * SpdkEnv, so they are atomics: volatile alone provides neither atomicity nor
 * ordering between threads.
 */
std::atomic<bool> g_spdk_ready(false);
std::atomic<bool> g_spdk_start_failure(false);

void SpdkInitializeThread(void);
63 
64 class SpdkThreadCtx
65 {
66 public:
67 	struct spdk_fs_thread_ctx *channel;
68 
69 	SpdkThreadCtx(void) : channel(NULL)
70 	{
71 		SpdkInitializeThread();
72 	}
73 
74 	~SpdkThreadCtx(void)
75 	{
76 		if (channel) {
77 			spdk_fs_free_thread_ctx(channel);
78 			channel = NULL;
79 		}
80 	}
81 
82 private:
83 	SpdkThreadCtx(const SpdkThreadCtx &);
84 	SpdkThreadCtx &operator=(const SpdkThreadCtx &);
85 };
86 
87 thread_local SpdkThreadCtx g_sync_args;
88 
89 static void
90 set_channel()
91 {
92 	struct spdk_thread *thread;
93 
94 	if (g_fs != NULL && g_sync_args.channel == NULL) {
95 		thread = spdk_thread_create("spdK_rocksdb", NULL);
96 		spdk_set_thread(thread);
97 		g_sync_args.channel = spdk_fs_alloc_thread_ctx(g_fs);
98 	}
99 }
100 
101 static void
102 __call_fn(void *arg1, void *arg2)
103 {
104 	fs_request_fn fn;
105 
106 	fn = (fs_request_fn)arg1;
107 	fn(arg2);
108 }
109 
110 static void
111 __send_request(fs_request_fn fn, void *arg)
112 {
113 	struct spdk_event *event;
114 
115 	event = spdk_event_allocate(g_lcore, __call_fn, (void *)fn, arg);
116 	spdk_event_call(event);
117 }
118 
/*
 * Turn a RocksDB path into the name used inside the BlobFS filesystem.
 *
 * The first mount_directory.length() characters of `input` are dropped (by
 * length only; the caller is expected to have checked the prefix), and the
 * remainder is normalised so that it starts with exactly one '/', contains no
 * runs of '/', and has no trailing '/'.
 *
 * Returns an empty string when nothing remains after the prefix, including
 * when `input` is shorter than `mount_directory`.
 */
static std::string
sanitize_path(const std::string &input, const std::string &mount_directory)
{
	std::string name;

	/* Nothing after the prefix: avoid substr() throwing and indexing an empty string. */
	if (input.length() <= mount_directory.length()) {
		return name;
	}

	name.reserve(input.length() - mount_directory.length() + 1);
	/* Seeding with '/' both guarantees the leading slash and lets the
	 * duplicate-slash check below swallow any slashes the remainder starts with.
	 */
	name.push_back('/');
	for (std::string::size_type i = mount_directory.length(); i < input.length(); i++) {
		const char c = input[i];

		if (c == '/' && name.back() == '/') {
			continue;
		}
		name.push_back(c);
	}

	/* Slashes are already collapsed, so at most one trailing '/' can exist. */
	if (name.back() == '/') {
		name.pop_back();
	}
	return name;
}
150 
151 class SpdkSequentialFile : public SequentialFile
152 {
153 	struct spdk_file *mFile;
154 	uint64_t mOffset;
155 public:
156 	SpdkSequentialFile(struct spdk_file *file) : mFile(file), mOffset(0) {}
157 	virtual ~SpdkSequentialFile();
158 
159 	virtual Status Read(size_t n, Slice *result, char *scratch) override;
160 	virtual Status Skip(uint64_t n) override;
161 	virtual Status InvalidateCache(size_t offset, size_t length) override;
162 };
163 
164 SpdkSequentialFile::~SpdkSequentialFile(void)
165 {
166 	set_channel();
167 	spdk_file_close(mFile, g_sync_args.channel);
168 }
169 
170 Status
171 SpdkSequentialFile::Read(size_t n, Slice *result, char *scratch)
172 {
173 	int64_t ret;
174 
175 	set_channel();
176 	ret = spdk_file_read(mFile, g_sync_args.channel, scratch, mOffset, n);
177 	if (ret >= 0) {
178 		mOffset += ret;
179 		*result = Slice(scratch, ret);
180 		return Status::OK();
181 	} else {
182 		errno = -ret;
183 		return Status::IOError(spdk_file_get_name(mFile), strerror(errno));
184 	}
185 }
186 
187 Status
188 SpdkSequentialFile::Skip(uint64_t n)
189 {
190 	mOffset += n;
191 	return Status::OK();
192 }
193 
194 Status
195 SpdkSequentialFile::InvalidateCache(__attribute__((unused)) size_t offset,
196 				    __attribute__((unused)) size_t length)
197 {
198 	return Status::OK();
199 }
200 
201 class SpdkRandomAccessFile : public RandomAccessFile
202 {
203 	struct spdk_file *mFile;
204 public:
205 	SpdkRandomAccessFile(struct spdk_file *file) : mFile(file) {}
206 	virtual ~SpdkRandomAccessFile();
207 
208 	virtual Status Read(uint64_t offset, size_t n, Slice *result, char *scratch) const override;
209 	virtual Status InvalidateCache(size_t offset, size_t length) override;
210 };
211 
212 SpdkRandomAccessFile::~SpdkRandomAccessFile(void)
213 {
214 	set_channel();
215 	spdk_file_close(mFile, g_sync_args.channel);
216 }
217 
218 Status
219 SpdkRandomAccessFile::Read(uint64_t offset, size_t n, Slice *result, char *scratch) const
220 {
221 	int64_t rc;
222 
223 	set_channel();
224 	rc = spdk_file_read(mFile, g_sync_args.channel, scratch, offset, n);
225 	if (rc >= 0) {
226 		*result = Slice(scratch, n);
227 		return Status::OK();
228 	} else {
229 		errno = -rc;
230 		return Status::IOError(spdk_file_get_name(mFile), strerror(errno));
231 	}
232 }
233 
234 Status
235 SpdkRandomAccessFile::InvalidateCache(__attribute__((unused)) size_t offset,
236 				      __attribute__((unused)) size_t length)
237 {
238 	return Status::OK();
239 }
240 
241 class SpdkWritableFile : public WritableFile
242 {
243 	struct spdk_file *mFile;
244 	uint64_t mSize;
245 
246 public:
247 	SpdkWritableFile(struct spdk_file *file) : mFile(file), mSize(0) {}
248 	~SpdkWritableFile()
249 	{
250 		if (mFile != NULL) {
251 			Close();
252 		}
253 	}
254 
255 	virtual void SetIOPriority(Env::IOPriority pri)
256 	{
257 		if (pri == Env::IO_HIGH) {
258 			spdk_file_set_priority(mFile, SPDK_FILE_PRIORITY_HIGH);
259 		}
260 	}
261 
262 	virtual Status Truncate(uint64_t size) override
263 	{
264 		int rc;
265 
266 		set_channel();
267 		rc = spdk_file_truncate(mFile, g_sync_args.channel, size);
268 		if (!rc) {
269 			mSize = size;
270 			return Status::OK();
271 		} else {
272 			errno = -rc;
273 			return Status::IOError(spdk_file_get_name(mFile), strerror(errno));
274 		}
275 	}
276 	virtual Status Close() override
277 	{
278 		set_channel();
279 		spdk_file_close(mFile, g_sync_args.channel);
280 		mFile = NULL;
281 		return Status::OK();
282 	}
283 	virtual Status Append(const Slice &data) override;
284 	virtual Status Flush() override
285 	{
286 		return Status::OK();
287 	}
288 	virtual Status Sync() override
289 	{
290 		int rc;
291 
292 		set_channel();
293 		rc = spdk_file_sync(mFile, g_sync_args.channel);
294 		if (!rc) {
295 			return Status::OK();
296 		} else {
297 			errno = -rc;
298 			return Status::IOError(spdk_file_get_name(mFile), strerror(errno));
299 		}
300 	}
301 	virtual Status Fsync() override
302 	{
303 		int rc;
304 
305 		set_channel();
306 		rc = spdk_file_sync(mFile, g_sync_args.channel);
307 		if (!rc) {
308 			return Status::OK();
309 		} else {
310 			errno = -rc;
311 			return Status::IOError(spdk_file_get_name(mFile), strerror(errno));
312 		}
313 	}
314 	virtual bool IsSyncThreadSafe() const override
315 	{
316 		return true;
317 	}
318 	virtual uint64_t GetFileSize() override
319 	{
320 		return mSize;
321 	}
322 	virtual Status InvalidateCache(__attribute__((unused)) size_t offset,
323 				       __attribute__((unused)) size_t length) override
324 	{
325 		return Status::OK();
326 	}
327 	virtual Status Allocate(uint64_t offset, uint64_t len) override
328 	{
329 		int rc;
330 
331 		set_channel();
332 		rc = spdk_file_truncate(mFile, g_sync_args.channel, offset + len);
333 		if (!rc) {
334 			return Status::OK();
335 		} else {
336 			errno = -rc;
337 			return Status::IOError(spdk_file_get_name(mFile), strerror(errno));
338 		}
339 	}
340 	virtual Status RangeSync(__attribute__((unused)) uint64_t offset,
341 				 __attribute__((unused)) uint64_t nbytes) override
342 	{
343 		int rc;
344 
345 		/*
346 		 * SPDK BlobFS does not have a range sync operation yet, so just sync
347 		 *  the whole file.
348 		 */
349 		set_channel();
350 		rc = spdk_file_sync(mFile, g_sync_args.channel);
351 		if (!rc) {
352 			return Status::OK();
353 		} else {
354 			errno = -rc;
355 			return Status::IOError(spdk_file_get_name(mFile), strerror(errno));
356 		}
357 	}
358 	virtual size_t GetUniqueId(char *id, size_t max_size) const override
359 	{
360 		int rc;
361 
362 		rc = spdk_file_get_id(mFile, id, max_size);
363 		if (rc < 0) {
364 			return 0;
365 		} else {
366 			return rc;
367 		}
368 	}
369 };
370 
371 Status
372 SpdkWritableFile::Append(const Slice &data)
373 {
374 	int64_t rc;
375 
376 	set_channel();
377 	rc = spdk_file_write(mFile, g_sync_args.channel, (void *)data.data(), mSize, data.size());
378 	if (rc >= 0) {
379 		mSize += data.size();
380 		return Status::OK();
381 	} else {
382 		errno = -rc;
383 		return Status::IOError(spdk_file_get_name(mFile), strerror(errno));
384 	}
385 }
386 
387 class SpdkDirectory : public Directory
388 {
389 public:
390 	SpdkDirectory() {}
391 	~SpdkDirectory() {}
392 	Status Fsync() override
393 	{
394 		return Status::OK();
395 	}
396 };
397 
/* Thrown by the SpdkEnv constructor when the SPDK application cannot be started. */
class SpdkAppStartException : public std::runtime_error
{
public:
	/* `mess` becomes the text returned by what(). */
	explicit SpdkAppStartException(const std::string &mess) : std::runtime_error(mess) {}
};
403 
404 class SpdkEnv : public EnvWrapper
405 {
406 private:
407 	pthread_t mSpdkTid;
408 	std::string mDirectory;
409 	std::string mConfig;
410 	std::string mBdev;
411 
412 public:
413 	SpdkEnv(Env *base_env, const std::string &dir, const std::string &conf,
414 		const std::string &bdev, uint64_t cache_size_in_mb);
415 
416 	virtual ~SpdkEnv();
417 
418 	virtual Status NewSequentialFile(const std::string &fname,
419 					 std::unique_ptr<SequentialFile> *result,
420 					 const EnvOptions &options) override
421 	{
422 		if (fname.compare(0, mDirectory.length(), mDirectory) == 0) {
423 			struct spdk_file *file;
424 			int rc;
425 
426 			std::string name = sanitize_path(fname, mDirectory);
427 			set_channel();
428 			rc = spdk_fs_open_file(g_fs, g_sync_args.channel,
429 					       name.c_str(), 0, &file);
430 			if (rc == 0) {
431 				result->reset(new SpdkSequentialFile(file));
432 				return Status::OK();
433 			} else {
434 				/* Myrocks engine uses errno(ENOENT) as one
435 				 * special condition, for the purpose to
436 				 * support MySQL, set the errno to right value.
437 				 */
438 				errno = -rc;
439 				return Status::IOError(name, strerror(errno));
440 			}
441 		} else {
442 			return EnvWrapper::NewSequentialFile(fname, result, options);
443 		}
444 	}
445 
446 	virtual Status NewRandomAccessFile(const std::string &fname,
447 					   std::unique_ptr<RandomAccessFile> *result,
448 					   const EnvOptions &options) override
449 	{
450 		if (fname.compare(0, mDirectory.length(), mDirectory) == 0) {
451 			std::string name = sanitize_path(fname, mDirectory);
452 			struct spdk_file *file;
453 			int rc;
454 
455 			set_channel();
456 			rc = spdk_fs_open_file(g_fs, g_sync_args.channel,
457 					       name.c_str(), 0, &file);
458 			if (rc == 0) {
459 				result->reset(new SpdkRandomAccessFile(file));
460 				return Status::OK();
461 			} else {
462 				errno = -rc;
463 				return Status::IOError(name, strerror(errno));
464 			}
465 		} else {
466 			return EnvWrapper::NewRandomAccessFile(fname, result, options);
467 		}
468 	}
469 
470 	virtual Status NewWritableFile(const std::string &fname,
471 				       std::unique_ptr<WritableFile> *result,
472 				       const EnvOptions &options) override
473 	{
474 		if (fname.compare(0, mDirectory.length(), mDirectory) == 0) {
475 			std::string name = sanitize_path(fname, mDirectory);
476 			struct spdk_file *file;
477 			int rc;
478 
479 			set_channel();
480 			rc = spdk_fs_open_file(g_fs, g_sync_args.channel, name.c_str(),
481 					       SPDK_BLOBFS_OPEN_CREATE, &file);
482 			if (rc == 0) {
483 				result->reset(new SpdkWritableFile(file));
484 				return Status::OK();
485 			} else {
486 				errno = -rc;
487 				return Status::IOError(name, strerror(errno));
488 			}
489 		} else {
490 			return EnvWrapper::NewWritableFile(fname, result, options);
491 		}
492 	}
493 
494 	virtual Status ReuseWritableFile(const std::string &fname,
495 					 const std::string &old_fname,
496 					 std::unique_ptr<WritableFile> *result,
497 					 const EnvOptions &options) override
498 	{
499 		return EnvWrapper::ReuseWritableFile(fname, old_fname, result, options);
500 	}
501 
502 	virtual Status NewDirectory(__attribute__((unused)) const std::string &name,
503 				    std::unique_ptr<Directory> *result) override
504 	{
505 		result->reset(new SpdkDirectory());
506 		return Status::OK();
507 	}
508 	virtual Status FileExists(const std::string &fname) override
509 	{
510 		struct spdk_file_stat stat;
511 		int rc;
512 		std::string name = sanitize_path(fname, mDirectory);
513 
514 		set_channel();
515 		rc = spdk_fs_file_stat(g_fs, g_sync_args.channel, name.c_str(), &stat);
516 		if (rc == 0) {
517 			return Status::OK();
518 		}
519 		return EnvWrapper::FileExists(fname);
520 	}
521 	virtual Status RenameFile(const std::string &src, const std::string &t) override
522 	{
523 		int rc;
524 		std::string src_name = sanitize_path(src, mDirectory);
525 		std::string target_name = sanitize_path(t, mDirectory);
526 
527 		set_channel();
528 		rc = spdk_fs_rename_file(g_fs, g_sync_args.channel,
529 					 src_name.c_str(), target_name.c_str());
530 		if (rc == -ENOENT) {
531 			return EnvWrapper::RenameFile(src, t);
532 		}
533 		return Status::OK();
534 	}
535 	virtual Status LinkFile(__attribute__((unused)) const std::string &src,
536 				__attribute__((unused)) const std::string &t) override
537 	{
538 		return Status::NotSupported("SpdkEnv does not support LinkFile");
539 	}
540 	virtual Status GetFileSize(const std::string &fname, uint64_t *size) override
541 	{
542 		struct spdk_file_stat stat;
543 		int rc;
544 		std::string name = sanitize_path(fname, mDirectory);
545 
546 		set_channel();
547 		rc = spdk_fs_file_stat(g_fs, g_sync_args.channel, name.c_str(), &stat);
548 		if (rc == -ENOENT) {
549 			return EnvWrapper::GetFileSize(fname, size);
550 		}
551 		*size = stat.size;
552 		return Status::OK();
553 	}
554 	virtual Status DeleteFile(const std::string &fname) override
555 	{
556 		int rc;
557 		std::string name = sanitize_path(fname, mDirectory);
558 
559 		set_channel();
560 		rc = spdk_fs_delete_file(g_fs, g_sync_args.channel, name.c_str());
561 		if (rc == -ENOENT) {
562 			return EnvWrapper::DeleteFile(fname);
563 		}
564 		return Status::OK();
565 	}
566 	virtual Status LockFile(const std::string &fname, FileLock **lock) override
567 	{
568 		std::string name = sanitize_path(fname, mDirectory);
569 		int64_t rc;
570 
571 		set_channel();
572 		rc = spdk_fs_open_file(g_fs, g_sync_args.channel, name.c_str(),
573 				       SPDK_BLOBFS_OPEN_CREATE, (struct spdk_file **)lock);
574 		if (!rc) {
575 			return Status::OK();
576 		} else {
577 			errno = -rc;
578 			return Status::IOError(name, strerror(errno));
579 		}
580 	}
581 	virtual Status UnlockFile(FileLock *lock) override
582 	{
583 		set_channel();
584 		spdk_file_close((struct spdk_file *)lock, g_sync_args.channel);
585 		return Status::OK();
586 	}
587 	virtual Status GetChildren(const std::string &dir,
588 				   std::vector<std::string> *result) override
589 	{
590 		std::string::size_type pos;
591 		std::set<std::string> dir_and_file_set;
592 		std::string full_path;
593 		std::string filename;
594 		std::string dir_name;
595 
596 		if (dir.find("archive") != std::string::npos) {
597 			return Status::OK();
598 		}
599 		if (dir.compare(0, mDirectory.length(), mDirectory) == 0) {
600 			spdk_fs_iter iter;
601 			struct spdk_file *file;
602 			dir_name = sanitize_path(dir, mDirectory);
603 
604 			iter = spdk_fs_iter_first(g_fs);
605 			while (iter != NULL) {
606 				file = spdk_fs_iter_get_file(iter);
607 				full_path = spdk_file_get_name(file);
608 				if (strncmp(dir_name.c_str(), full_path.c_str(), dir_name.length())) {
609 					iter = spdk_fs_iter_next(iter);
610 					continue;
611 				}
612 				pos = full_path.find("/", dir_name.length() + 1);
613 
614 				if (pos != std::string::npos) {
615 					filename = full_path.substr(dir_name.length() + 1, pos - dir_name.length() - 1);
616 				} else {
617 					filename = full_path.substr(dir_name.length() + 1);
618 				}
619 				dir_and_file_set.insert(filename);
620 				iter = spdk_fs_iter_next(iter);
621 			}
622 
623 			for (auto &s : dir_and_file_set) {
624 				result->push_back(s);
625 			}
626 
627 			result->push_back(".");
628 			result->push_back("..");
629 
630 			return Status::OK();
631 		}
632 		return EnvWrapper::GetChildren(dir, result);
633 	}
634 };
635 
636 /* The thread local constructor doesn't work for the main thread, since
637  * the filesystem hasn't been loaded yet.  So we break out this
638  * SpdkInitializeThread function, so that the main thread can explicitly
639  * call it after the filesystem has been loaded.
640  */
641 void SpdkInitializeThread(void)
642 {
643 	struct spdk_thread *thread;
644 
645 	if (g_fs != NULL) {
646 		if (g_sync_args.channel) {
647 			spdk_fs_free_thread_ctx(g_sync_args.channel);
648 		}
649 		thread = spdk_thread_create("spdk_rocksdb", NULL);
650 		spdk_set_thread(thread);
651 		g_sync_args.channel = spdk_fs_alloc_thread_ctx(g_fs);
652 	}
653 }
654 
655 static void
656 fs_load_cb(__attribute__((unused)) void *ctx,
657 	   struct spdk_filesystem *fs, int fserrno)
658 {
659 	if (fserrno == 0) {
660 		g_fs = fs;
661 	}
662 	g_spdk_ready = true;
663 }
664 
665 static void
666 rocksdb_run(__attribute__((unused)) void *arg1)
667 {
668 	struct spdk_bdev *bdev;
669 
670 	bdev = spdk_bdev_get_by_name(g_bdev_name.c_str());
671 
672 	if (bdev == NULL) {
673 		SPDK_ERRLOG("bdev %s not found\n", g_bdev_name.c_str());
674 		spdk_app_stop(0);
675 		exit(1);
676 	}
677 
678 	g_lcore = spdk_env_get_first_core();
679 
680 	g_bs_dev = spdk_bdev_create_bs_dev(bdev, NULL, NULL);
681 	printf("using bdev %s\n", g_bdev_name.c_str());
682 	spdk_fs_load(g_bs_dev, __send_request, fs_load_cb, NULL);
683 }
684 
685 static void
686 fs_unload_cb(__attribute__((unused)) void *ctx,
687 	     __attribute__((unused)) int fserrno)
688 {
689 	assert(fserrno == 0);
690 
691 	spdk_app_stop(0);
692 }
693 
694 static void
695 rocksdb_shutdown(void)
696 {
697 	if (g_fs != NULL) {
698 		spdk_fs_unload(g_fs, fs_unload_cb, NULL);
699 	} else {
700 		fs_unload_cb(NULL, 0);
701 	}
702 }
703 
704 static void *
705 initialize_spdk(void *arg)
706 {
707 	struct spdk_app_opts *opts = (struct spdk_app_opts *)arg;
708 	int rc;
709 
710 	rc = spdk_app_start(opts, rocksdb_run, NULL);
711 	/*
712 	 * TODO:  Revisit for case of internal failure of
713 	 * spdk_app_start(), itself.  At this time, it's known
714 	 * the only application's use of spdk_app_stop() passes
715 	 * a zero; i.e. no fail (non-zero) cases so here we
716 	 * assume there was an internal failure and flag it
717 	 * so we can throw an exception.
718 	 */
719 	if (rc) {
720 		g_spdk_start_failure = true;
721 	} else {
722 		spdk_app_fini();
723 		delete opts;
724 	}
725 	pthread_exit(NULL);
726 
727 }
728 
729 SpdkEnv::SpdkEnv(Env *base_env, const std::string &dir, const std::string &conf,
730 		 const std::string &bdev, uint64_t cache_size_in_mb)
731 	: EnvWrapper(base_env), mDirectory(dir), mConfig(conf), mBdev(bdev)
732 {
733 	struct spdk_app_opts *opts = new struct spdk_app_opts;
734 
735 	spdk_app_opts_init(opts);
736 	opts->name = "rocksdb";
737 	opts->json_config_file = mConfig.c_str();
738 	opts->shutdown_cb = rocksdb_shutdown;
739 	opts->tpoint_group_mask = "0x80";
740 
741 	spdk_fs_set_cache_size(cache_size_in_mb);
742 	g_bdev_name = mBdev;
743 
744 	pthread_create(&mSpdkTid, NULL, &initialize_spdk, opts);
745 	while (!g_spdk_ready && !g_spdk_start_failure)
746 		;
747 	if (g_spdk_start_failure) {
748 		delete opts;
749 		throw SpdkAppStartException("spdk_app_start() unable to start rocksdb_run()");
750 	}
751 
752 	SpdkInitializeThread();
753 }
754 
755 SpdkEnv::~SpdkEnv()
756 {
757 	/* This is a workaround for rocksdb test, we close the files if the rocksdb not
758 	 * do the work before the test quit.
759 	 */
760 	if (g_fs != NULL) {
761 		spdk_fs_iter iter;
762 		struct spdk_file *file;
763 
764 		if (!g_sync_args.channel) {
765 			SpdkInitializeThread();
766 		}
767 
768 		iter = spdk_fs_iter_first(g_fs);
769 		while (iter != NULL) {
770 			file = spdk_fs_iter_get_file(iter);
771 			spdk_file_close(file, g_sync_args.channel);
772 			iter = spdk_fs_iter_next(iter);
773 		}
774 	}
775 
776 	spdk_app_start_shutdown();
777 	pthread_join(mSpdkTid, NULL);
778 }
779 
780 Env *NewSpdkEnv(Env *base_env, const std::string &dir, const std::string &conf,
781 		const std::string &bdev, uint64_t cache_size_in_mb)
782 {
783 	try {
784 		SpdkEnv *spdk_env = new SpdkEnv(base_env, dir, conf, bdev, cache_size_in_mb);
785 		if (g_fs != NULL) {
786 			return spdk_env;
787 		} else {
788 			delete spdk_env;
789 			return NULL;
790 		}
791 	} catch (SpdkAppStartException &e) {
792 		SPDK_ERRLOG("NewSpdkEnv: exception caught: %s", e.what());
793 		return NULL;
794 	} catch (...) {
795 		SPDK_ERRLOG("NewSpdkEnv: default exception caught");
796 		return NULL;
797 	}
798 }
799 
800 } // namespace rocksdb
801