xref: /spdk/lib/rocksdb/env_spdk.cc (revision b30d57cdad6d2bc75cc1e4e2ebbcebcb0d98dcfa)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
#include "rocksdb/env.h"
#include <atomic>
#include <iostream>
#include <set>
#include <stdexcept>

extern "C" {
#include "spdk/env.h"
#include "spdk/event.h"
#include "spdk/blob.h"
#include "spdk/blobfs.h"
#include "spdk/blob_bdev.h"
#include "spdk/log.h"
#include "spdk/thread.h"
#include "spdk/bdev.h"
}
49 
50 namespace rocksdb
51 {
52 
/* Loaded BlobFS instance; stays NULL until fs_load_cb() reports success. */
struct spdk_filesystem *g_fs = NULL;
/* Blobstore device created on top of the bdev named by g_bdev_name. */
struct spdk_bs_dev *g_bs_dev = NULL;
/* Core that receives the events allocated by __send_request(). */
uint32_t g_lcore = 0;
/* Name of the bdev that holds the filesystem. */
std::string g_bdev_name;
/*
 * Start-up handshake flags.  They are written by the SPDK application thread
 * and polled by the thread constructing SpdkEnv, so they must be atomic:
 * "volatile" gives neither atomicity nor ordering between threads.
 */
std::atomic<bool> g_spdk_ready{false};
std::atomic<bool> g_spdk_start_failure{false};

void SpdkInitializeThread(void);
61 
62 class SpdkThreadCtx
63 {
64 public:
65 	struct spdk_fs_thread_ctx *channel;
66 
67 	SpdkThreadCtx(void) : channel(NULL)
68 	{
69 		SpdkInitializeThread();
70 	}
71 
72 	~SpdkThreadCtx(void)
73 	{
74 		if (channel) {
75 			spdk_fs_free_thread_ctx(channel);
76 			channel = NULL;
77 		}
78 	}
79 
80 private:
81 	SpdkThreadCtx(const SpdkThreadCtx &);
82 	SpdkThreadCtx &operator=(const SpdkThreadCtx &);
83 };
84 
85 thread_local SpdkThreadCtx g_sync_args;
86 
87 static void
88 set_channel()
89 {
90 	struct spdk_thread *thread;
91 
92 	if (g_fs != NULL && g_sync_args.channel == NULL) {
93 		thread = spdk_thread_create("spdK_rocksdb", NULL);
94 		spdk_set_thread(thread);
95 		g_sync_args.channel = spdk_fs_alloc_thread_ctx(g_fs);
96 	}
97 }
98 
99 static void
100 __call_fn(void *arg1, void *arg2)
101 {
102 	fs_request_fn fn;
103 
104 	fn = (fs_request_fn)arg1;
105 	fn(arg2);
106 }
107 
108 static void
109 __send_request(fs_request_fn fn, void *arg)
110 {
111 	struct spdk_event *event;
112 
113 	event = spdk_event_allocate(g_lcore, __call_fn, (void *)fn, arg);
114 	spdk_event_call(event);
115 }
116 
/*
 * Convert a path handed in by RocksDB into the name used inside BlobFS.
 *
 * The first mount_directory.length() characters of input are dropped (the
 * caller is expected to have checked that input starts with
 * mount_directory).  The remainder is normalized so that it
 *   - starts with exactly one '/',
 *   - contains no runs of consecutive '/', and
 *   - has no trailing '/'.
 *
 * An empty string is returned when nothing (or only slashes) remains,
 * including when input is not longer than mount_directory.
 */
static std::string
sanitize_path(const std::string &input, const std::string &mount_directory)
{
	std::string name;

	if (input.length() > mount_directory.length()) {
		name.reserve(input.length() - mount_directory.length() + 1);
		/* The result always begins with a single slash. */
		name.push_back('/');
		for (std::string::size_type i = mount_directory.length(); i < input.length(); i++) {
			const char c = input[i];

			/* Collapse "//" (this also absorbs a leading slash of the remainder). */
			if (c == '/' && name.back() == '/') {
				continue;
			}
			name.push_back(c);
		}
	}

	/* Slashes are already collapsed, so at most one can be trailing. */
	if (!name.empty() && name.back() == '/') {
		name.pop_back();
	}
	return name;
}
148 
149 class SpdkSequentialFile : public SequentialFile
150 {
151 	struct spdk_file *mFile;
152 	uint64_t mOffset;
153 public:
154 	SpdkSequentialFile(struct spdk_file *file) : mFile(file), mOffset(0) {}
155 	virtual ~SpdkSequentialFile();
156 
157 	virtual Status Read(size_t n, Slice *result, char *scratch) override;
158 	virtual Status Skip(uint64_t n) override;
159 	virtual Status InvalidateCache(size_t offset, size_t length) override;
160 };
161 
162 SpdkSequentialFile::~SpdkSequentialFile(void)
163 {
164 	set_channel();
165 	spdk_file_close(mFile, g_sync_args.channel);
166 }
167 
168 Status
169 SpdkSequentialFile::Read(size_t n, Slice *result, char *scratch)
170 {
171 	int64_t ret;
172 
173 	set_channel();
174 	ret = spdk_file_read(mFile, g_sync_args.channel, scratch, mOffset, n);
175 	if (ret >= 0) {
176 		mOffset += ret;
177 		*result = Slice(scratch, ret);
178 		return Status::OK();
179 	} else {
180 		errno = -ret;
181 		return Status::IOError(spdk_file_get_name(mFile), strerror(errno));
182 	}
183 }
184 
185 Status
186 SpdkSequentialFile::Skip(uint64_t n)
187 {
188 	mOffset += n;
189 	return Status::OK();
190 }
191 
192 Status
193 SpdkSequentialFile::InvalidateCache(__attribute__((unused)) size_t offset,
194 				    __attribute__((unused)) size_t length)
195 {
196 	return Status::OK();
197 }
198 
199 class SpdkRandomAccessFile : public RandomAccessFile
200 {
201 	struct spdk_file *mFile;
202 public:
203 	SpdkRandomAccessFile(struct spdk_file *file) : mFile(file) {}
204 	virtual ~SpdkRandomAccessFile();
205 
206 	virtual Status Read(uint64_t offset, size_t n, Slice *result, char *scratch) const override;
207 	virtual Status InvalidateCache(size_t offset, size_t length) override;
208 };
209 
210 SpdkRandomAccessFile::~SpdkRandomAccessFile(void)
211 {
212 	set_channel();
213 	spdk_file_close(mFile, g_sync_args.channel);
214 }
215 
216 Status
217 SpdkRandomAccessFile::Read(uint64_t offset, size_t n, Slice *result, char *scratch) const
218 {
219 	int64_t rc;
220 
221 	set_channel();
222 	rc = spdk_file_read(mFile, g_sync_args.channel, scratch, offset, n);
223 	if (rc >= 0) {
224 		*result = Slice(scratch, n);
225 		return Status::OK();
226 	} else {
227 		errno = -rc;
228 		return Status::IOError(spdk_file_get_name(mFile), strerror(errno));
229 	}
230 }
231 
232 Status
233 SpdkRandomAccessFile::InvalidateCache(__attribute__((unused)) size_t offset,
234 				      __attribute__((unused)) size_t length)
235 {
236 	return Status::OK();
237 }
238 
239 class SpdkWritableFile : public WritableFile
240 {
241 	struct spdk_file *mFile;
242 	uint64_t mSize;
243 
244 public:
245 	SpdkWritableFile(struct spdk_file *file) : mFile(file), mSize(0) {}
246 	~SpdkWritableFile()
247 	{
248 		if (mFile != NULL) {
249 			Close();
250 		}
251 	}
252 
253 	virtual void SetIOPriority(Env::IOPriority pri)
254 	{
255 		if (pri == Env::IO_HIGH) {
256 			spdk_file_set_priority(mFile, SPDK_FILE_PRIORITY_HIGH);
257 		}
258 	}
259 
260 	virtual Status Truncate(uint64_t size) override
261 	{
262 		int rc;
263 
264 		set_channel();
265 		rc = spdk_file_truncate(mFile, g_sync_args.channel, size);
266 		if (!rc) {
267 			mSize = size;
268 			return Status::OK();
269 		} else {
270 			errno = -rc;
271 			return Status::IOError(spdk_file_get_name(mFile), strerror(errno));
272 		}
273 	}
274 	virtual Status Close() override
275 	{
276 		set_channel();
277 		spdk_file_close(mFile, g_sync_args.channel);
278 		mFile = NULL;
279 		return Status::OK();
280 	}
281 	virtual Status Append(const Slice &data) override;
282 	virtual Status Flush() override
283 	{
284 		return Status::OK();
285 	}
286 	virtual Status Sync() override
287 	{
288 		int rc;
289 
290 		set_channel();
291 		rc = spdk_file_sync(mFile, g_sync_args.channel);
292 		if (!rc) {
293 			return Status::OK();
294 		} else {
295 			errno = -rc;
296 			return Status::IOError(spdk_file_get_name(mFile), strerror(errno));
297 		}
298 	}
299 	virtual Status Fsync() override
300 	{
301 		int rc;
302 
303 		set_channel();
304 		rc = spdk_file_sync(mFile, g_sync_args.channel);
305 		if (!rc) {
306 			return Status::OK();
307 		} else {
308 			errno = -rc;
309 			return Status::IOError(spdk_file_get_name(mFile), strerror(errno));
310 		}
311 	}
312 	virtual bool IsSyncThreadSafe() const override
313 	{
314 		return true;
315 	}
316 	virtual uint64_t GetFileSize() override
317 	{
318 		return mSize;
319 	}
320 	virtual Status InvalidateCache(__attribute__((unused)) size_t offset,
321 				       __attribute__((unused)) size_t length) override
322 	{
323 		return Status::OK();
324 	}
325 	virtual Status Allocate(uint64_t offset, uint64_t len) override
326 	{
327 		int rc;
328 
329 		set_channel();
330 		rc = spdk_file_truncate(mFile, g_sync_args.channel, offset + len);
331 		if (!rc) {
332 			return Status::OK();
333 		} else {
334 			errno = -rc;
335 			return Status::IOError(spdk_file_get_name(mFile), strerror(errno));
336 		}
337 	}
338 	virtual Status RangeSync(__attribute__((unused)) uint64_t offset,
339 				 __attribute__((unused)) uint64_t nbytes) override
340 	{
341 		int rc;
342 
343 		/*
344 		 * SPDK BlobFS does not have a range sync operation yet, so just sync
345 		 *  the whole file.
346 		 */
347 		set_channel();
348 		rc = spdk_file_sync(mFile, g_sync_args.channel);
349 		if (!rc) {
350 			return Status::OK();
351 		} else {
352 			errno = -rc;
353 			return Status::IOError(spdk_file_get_name(mFile), strerror(errno));
354 		}
355 	}
356 	virtual size_t GetUniqueId(char *id, size_t max_size) const override
357 	{
358 		int rc;
359 
360 		rc = spdk_file_get_id(mFile, id, max_size);
361 		if (rc < 0) {
362 			return 0;
363 		} else {
364 			return rc;
365 		}
366 	}
367 };
368 
369 Status
370 SpdkWritableFile::Append(const Slice &data)
371 {
372 	int64_t rc;
373 
374 	set_channel();
375 	rc = spdk_file_write(mFile, g_sync_args.channel, (void *)data.data(), mSize, data.size());
376 	if (rc >= 0) {
377 		mSize += data.size();
378 		return Status::OK();
379 	} else {
380 		errno = -rc;
381 		return Status::IOError(spdk_file_get_name(mFile), strerror(errno));
382 	}
383 }
384 
385 class SpdkDirectory : public Directory
386 {
387 public:
388 	SpdkDirectory() {}
389 	~SpdkDirectory() {}
390 	Status Fsync() override
391 	{
392 		return Status::OK();
393 	}
394 };
395 
/* Thrown by the SpdkEnv constructor when the SPDK application cannot be started. */
class SpdkAppStartException : public std::runtime_error
{
public:
	/* mess is the text later returned by what(). */
	explicit SpdkAppStartException(const std::string &mess) : std::runtime_error(mess) {}
};
401 
402 class SpdkEnv : public EnvWrapper
403 {
404 private:
405 	pthread_t mSpdkTid;
406 	std::string mDirectory;
407 	std::string mConfig;
408 	std::string mBdev;
409 
410 public:
411 	SpdkEnv(Env *base_env, const std::string &dir, const std::string &conf,
412 		const std::string &bdev, uint64_t cache_size_in_mb);
413 
414 	virtual ~SpdkEnv();
415 
416 	virtual Status NewSequentialFile(const std::string &fname,
417 					 std::unique_ptr<SequentialFile> *result,
418 					 const EnvOptions &options) override
419 	{
420 		if (fname.compare(0, mDirectory.length(), mDirectory) == 0) {
421 			struct spdk_file *file;
422 			int rc;
423 
424 			std::string name = sanitize_path(fname, mDirectory);
425 			set_channel();
426 			rc = spdk_fs_open_file(g_fs, g_sync_args.channel,
427 					       name.c_str(), 0, &file);
428 			if (rc == 0) {
429 				result->reset(new SpdkSequentialFile(file));
430 				return Status::OK();
431 			} else {
432 				/* Myrocks engine uses errno(ENOENT) as one
433 				 * special condition, for the purpose to
434 				 * support MySQL, set the errno to right value.
435 				 */
436 				errno = -rc;
437 				return Status::IOError(name, strerror(errno));
438 			}
439 		} else {
440 			return EnvWrapper::NewSequentialFile(fname, result, options);
441 		}
442 	}
443 
444 	virtual Status NewRandomAccessFile(const std::string &fname,
445 					   std::unique_ptr<RandomAccessFile> *result,
446 					   const EnvOptions &options) override
447 	{
448 		if (fname.compare(0, mDirectory.length(), mDirectory) == 0) {
449 			std::string name = sanitize_path(fname, mDirectory);
450 			struct spdk_file *file;
451 			int rc;
452 
453 			set_channel();
454 			rc = spdk_fs_open_file(g_fs, g_sync_args.channel,
455 					       name.c_str(), 0, &file);
456 			if (rc == 0) {
457 				result->reset(new SpdkRandomAccessFile(file));
458 				return Status::OK();
459 			} else {
460 				errno = -rc;
461 				return Status::IOError(name, strerror(errno));
462 			}
463 		} else {
464 			return EnvWrapper::NewRandomAccessFile(fname, result, options);
465 		}
466 	}
467 
468 	virtual Status NewWritableFile(const std::string &fname,
469 				       std::unique_ptr<WritableFile> *result,
470 				       const EnvOptions &options) override
471 	{
472 		if (fname.compare(0, mDirectory.length(), mDirectory) == 0) {
473 			std::string name = sanitize_path(fname, mDirectory);
474 			struct spdk_file *file;
475 			int rc;
476 
477 			set_channel();
478 			rc = spdk_fs_open_file(g_fs, g_sync_args.channel, name.c_str(),
479 					       SPDK_BLOBFS_OPEN_CREATE, &file);
480 			if (rc == 0) {
481 				result->reset(new SpdkWritableFile(file));
482 				return Status::OK();
483 			} else {
484 				errno = -rc;
485 				return Status::IOError(name, strerror(errno));
486 			}
487 		} else {
488 			return EnvWrapper::NewWritableFile(fname, result, options);
489 		}
490 	}
491 
492 	virtual Status ReuseWritableFile(const std::string &fname,
493 					 const std::string &old_fname,
494 					 std::unique_ptr<WritableFile> *result,
495 					 const EnvOptions &options) override
496 	{
497 		return EnvWrapper::ReuseWritableFile(fname, old_fname, result, options);
498 	}
499 
500 	virtual Status NewDirectory(__attribute__((unused)) const std::string &name,
501 				    std::unique_ptr<Directory> *result) override
502 	{
503 		result->reset(new SpdkDirectory());
504 		return Status::OK();
505 	}
506 	virtual Status FileExists(const std::string &fname) override
507 	{
508 		struct spdk_file_stat stat;
509 		int rc;
510 		std::string name = sanitize_path(fname, mDirectory);
511 
512 		set_channel();
513 		rc = spdk_fs_file_stat(g_fs, g_sync_args.channel, name.c_str(), &stat);
514 		if (rc == 0) {
515 			return Status::OK();
516 		}
517 		return EnvWrapper::FileExists(fname);
518 	}
519 	virtual Status RenameFile(const std::string &src, const std::string &t) override
520 	{
521 		int rc;
522 		std::string src_name = sanitize_path(src, mDirectory);
523 		std::string target_name = sanitize_path(t, mDirectory);
524 
525 		set_channel();
526 		rc = spdk_fs_rename_file(g_fs, g_sync_args.channel,
527 					 src_name.c_str(), target_name.c_str());
528 		if (rc == -ENOENT) {
529 			return EnvWrapper::RenameFile(src, t);
530 		}
531 		return Status::OK();
532 	}
533 	virtual Status LinkFile(__attribute__((unused)) const std::string &src,
534 				__attribute__((unused)) const std::string &t) override
535 	{
536 		return Status::NotSupported("SpdkEnv does not support LinkFile");
537 	}
538 	virtual Status GetFileSize(const std::string &fname, uint64_t *size) override
539 	{
540 		struct spdk_file_stat stat;
541 		int rc;
542 		std::string name = sanitize_path(fname, mDirectory);
543 
544 		set_channel();
545 		rc = spdk_fs_file_stat(g_fs, g_sync_args.channel, name.c_str(), &stat);
546 		if (rc == -ENOENT) {
547 			return EnvWrapper::GetFileSize(fname, size);
548 		}
549 		*size = stat.size;
550 		return Status::OK();
551 	}
552 	virtual Status DeleteFile(const std::string &fname) override
553 	{
554 		int rc;
555 		std::string name = sanitize_path(fname, mDirectory);
556 
557 		set_channel();
558 		rc = spdk_fs_delete_file(g_fs, g_sync_args.channel, name.c_str());
559 		if (rc == -ENOENT) {
560 			return EnvWrapper::DeleteFile(fname);
561 		}
562 		return Status::OK();
563 	}
564 	virtual Status LockFile(const std::string &fname, FileLock **lock) override
565 	{
566 		std::string name = sanitize_path(fname, mDirectory);
567 		int64_t rc;
568 
569 		set_channel();
570 		rc = spdk_fs_open_file(g_fs, g_sync_args.channel, name.c_str(),
571 				       SPDK_BLOBFS_OPEN_CREATE, (struct spdk_file **)lock);
572 		if (!rc) {
573 			return Status::OK();
574 		} else {
575 			errno = -rc;
576 			return Status::IOError(name, strerror(errno));
577 		}
578 	}
579 	virtual Status UnlockFile(FileLock *lock) override
580 	{
581 		set_channel();
582 		spdk_file_close((struct spdk_file *)lock, g_sync_args.channel);
583 		return Status::OK();
584 	}
585 	virtual Status GetChildren(const std::string &dir,
586 				   std::vector<std::string> *result) override
587 	{
588 		std::string::size_type pos;
589 		std::set<std::string> dir_and_file_set;
590 		std::string full_path;
591 		std::string filename;
592 		std::string dir_name;
593 
594 		if (dir.find("archive") != std::string::npos) {
595 			return Status::OK();
596 		}
597 		if (dir.compare(0, mDirectory.length(), mDirectory) == 0) {
598 			spdk_fs_iter iter;
599 			struct spdk_file *file;
600 			dir_name = sanitize_path(dir, mDirectory);
601 
602 			iter = spdk_fs_iter_first(g_fs);
603 			while (iter != NULL) {
604 				file = spdk_fs_iter_get_file(iter);
605 				full_path = spdk_file_get_name(file);
606 				if (strncmp(dir_name.c_str(), full_path.c_str(), dir_name.length())) {
607 					iter = spdk_fs_iter_next(iter);
608 					continue;
609 				}
610 				pos = full_path.find("/", dir_name.length() + 1);
611 
612 				if (pos != std::string::npos) {
613 					filename = full_path.substr(dir_name.length() + 1, pos - dir_name.length() - 1);
614 				} else {
615 					filename = full_path.substr(dir_name.length() + 1);
616 				}
617 				dir_and_file_set.insert(filename);
618 				iter = spdk_fs_iter_next(iter);
619 			}
620 
621 			for (auto &s : dir_and_file_set) {
622 				result->push_back(s);
623 			}
624 
625 			result->push_back(".");
626 			result->push_back("..");
627 
628 			return Status::OK();
629 		}
630 		return EnvWrapper::GetChildren(dir, result);
631 	}
632 };
633 
634 /* The thread local constructor doesn't work for the main thread, since
635  * the filesystem hasn't been loaded yet.  So we break out this
636  * SpdkInitializeThread function, so that the main thread can explicitly
637  * call it after the filesystem has been loaded.
638  */
639 void SpdkInitializeThread(void)
640 {
641 	struct spdk_thread *thread;
642 
643 	if (g_fs != NULL) {
644 		if (g_sync_args.channel) {
645 			spdk_fs_free_thread_ctx(g_sync_args.channel);
646 		}
647 		thread = spdk_thread_create("spdk_rocksdb", NULL);
648 		spdk_set_thread(thread);
649 		g_sync_args.channel = spdk_fs_alloc_thread_ctx(g_fs);
650 	}
651 }
652 
653 static void
654 fs_load_cb(__attribute__((unused)) void *ctx,
655 	   struct spdk_filesystem *fs, int fserrno)
656 {
657 	if (fserrno == 0) {
658 		g_fs = fs;
659 	}
660 	g_spdk_ready = true;
661 }
662 
663 static void
664 base_bdev_event_cb(enum spdk_bdev_event_type type, __attribute__((unused)) struct spdk_bdev *bdev,
665 		   __attribute__((unused)) void *event_ctx)
666 {
667 	printf("Unsupported bdev event: type %d\n", type);
668 }
669 
670 static void
671 rocksdb_run(__attribute__((unused)) void *arg1)
672 {
673 	int rc;
674 
675 	rc = spdk_bdev_create_bs_dev_ext(g_bdev_name.c_str(), base_bdev_event_cb, NULL,
676 					 &g_bs_dev);
677 	if (rc != 0) {
678 		printf("Could not create blob bdev\n");
679 		spdk_app_stop(0);
680 		exit(1);
681 	}
682 
683 	g_lcore = spdk_env_get_first_core();
684 
685 	printf("using bdev %s\n", g_bdev_name.c_str());
686 	spdk_fs_load(g_bs_dev, __send_request, fs_load_cb, NULL);
687 }
688 
689 static void
690 fs_unload_cb(__attribute__((unused)) void *ctx,
691 	     __attribute__((unused)) int fserrno)
692 {
693 	assert(fserrno == 0);
694 
695 	spdk_app_stop(0);
696 }
697 
698 static void
699 rocksdb_shutdown(void)
700 {
701 	if (g_fs != NULL) {
702 		spdk_fs_unload(g_fs, fs_unload_cb, NULL);
703 	} else {
704 		fs_unload_cb(NULL, 0);
705 	}
706 }
707 
708 static void *
709 initialize_spdk(void *arg)
710 {
711 	struct spdk_app_opts *opts = (struct spdk_app_opts *)arg;
712 	int rc;
713 
714 	rc = spdk_app_start(opts, rocksdb_run, NULL);
715 	/*
716 	 * TODO:  Revisit for case of internal failure of
717 	 * spdk_app_start(), itself.  At this time, it's known
718 	 * the only application's use of spdk_app_stop() passes
719 	 * a zero; i.e. no fail (non-zero) cases so here we
720 	 * assume there was an internal failure and flag it
721 	 * so we can throw an exception.
722 	 */
723 	if (rc) {
724 		g_spdk_start_failure = true;
725 	} else {
726 		spdk_app_fini();
727 		delete opts;
728 	}
729 	pthread_exit(NULL);
730 
731 }
732 
733 SpdkEnv::SpdkEnv(Env *base_env, const std::string &dir, const std::string &conf,
734 		 const std::string &bdev, uint64_t cache_size_in_mb)
735 	: EnvWrapper(base_env), mDirectory(dir), mConfig(conf), mBdev(bdev)
736 {
737 	struct spdk_app_opts *opts = new struct spdk_app_opts;
738 
739 	spdk_app_opts_init(opts, sizeof(*opts));
740 	opts->name = "rocksdb";
741 	opts->json_config_file = mConfig.c_str();
742 	opts->shutdown_cb = rocksdb_shutdown;
743 	opts->tpoint_group_mask = "0x80";
744 
745 	spdk_fs_set_cache_size(cache_size_in_mb);
746 	g_bdev_name = mBdev;
747 
748 	pthread_create(&mSpdkTid, NULL, &initialize_spdk, opts);
749 	while (!g_spdk_ready && !g_spdk_start_failure)
750 		;
751 	if (g_spdk_start_failure) {
752 		delete opts;
753 		throw SpdkAppStartException("spdk_app_start() unable to start rocksdb_run()");
754 	}
755 
756 	SpdkInitializeThread();
757 }
758 
759 SpdkEnv::~SpdkEnv()
760 {
761 	/* This is a workaround for rocksdb test, we close the files if the rocksdb not
762 	 * do the work before the test quit.
763 	 */
764 	if (g_fs != NULL) {
765 		spdk_fs_iter iter;
766 		struct spdk_file *file;
767 
768 		if (!g_sync_args.channel) {
769 			SpdkInitializeThread();
770 		}
771 
772 		iter = spdk_fs_iter_first(g_fs);
773 		while (iter != NULL) {
774 			file = spdk_fs_iter_get_file(iter);
775 			spdk_file_close(file, g_sync_args.channel);
776 			iter = spdk_fs_iter_next(iter);
777 		}
778 	}
779 
780 	spdk_app_start_shutdown();
781 	pthread_join(mSpdkTid, NULL);
782 }
783 
784 Env *NewSpdkEnv(Env *base_env, const std::string &dir, const std::string &conf,
785 		const std::string &bdev, uint64_t cache_size_in_mb)
786 {
787 	try {
788 		SpdkEnv *spdk_env = new SpdkEnv(base_env, dir, conf, bdev, cache_size_in_mb);
789 		if (g_fs != NULL) {
790 			return spdk_env;
791 		} else {
792 			delete spdk_env;
793 			return NULL;
794 		}
795 	} catch (SpdkAppStartException &e) {
796 		SPDK_ERRLOG("NewSpdkEnv: exception caught: %s", e.what());
797 		return NULL;
798 	} catch (...) {
799 		SPDK_ERRLOG("NewSpdkEnv: default exception caught");
800 		return NULL;
801 	}
802 }
803 
804 } // namespace rocksdb
805