xref: /spdk/lib/rocksdb/env_spdk.cc (revision 8a0a98d35e21f282088edf28b9e8da66ec390e3a)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "rocksdb/env.h"
35 #include <set>
36 #include <iostream>
37 #include <stdexcept>
38 
39 extern "C" {
40 #include "spdk/env.h"
41 #include "spdk/event.h"
42 #include "spdk/blob.h"
43 #include "spdk/blobfs.h"
44 #include "spdk/blob_bdev.h"
45 #include "spdk/log.h"
46 #include "spdk/thread.h"
47 #include "spdk/bdev.h"
48 }
49 
50 namespace rocksdb
51 {
52 
/* BlobFS instance used by SpdkEnv; stays NULL unless fs_load_cb() sees a successful load. */
struct spdk_filesystem *g_fs = NULL;
/* Blobstore device created on top of the configured bdev in spdk_rocksdb_run(). */
struct spdk_bs_dev *g_bs_dev;
/* Core that __send_request() dispatches BlobFS requests to (set in spdk_rocksdb_run()). */
uint32_t g_lcore = 0;
/* Name of the bdev that hosts BlobFS, copied from the SpdkEnv constructor argument. */
std::string g_bdev_name;
/*
 * Start-up handshake: the SpdkEnv constructor spins on these flags while the
 * SPDK application thread sets them.
 * NOTE(review): volatile provides no inter-thread ordering guarantees;
 * std::atomic<bool> would be the portable choice.
 */
volatile bool g_spdk_ready = false;
volatile bool g_spdk_start_failure = false;
/* Per-thread context for the synchronous BlobFS API. */
struct sync_args {
	struct spdk_io_channel *channel;
};

/* Thread-local; the channel is filled in by SpdkInitializeThread() on each thread that calls it. */
__thread struct sync_args g_sync_args;
64 
65 static void
66 __call_fn(void *arg1, void *arg2)
67 {
68 	fs_request_fn fn;
69 
70 	fn = (fs_request_fn)arg1;
71 	fn(arg2);
72 }
73 
74 static void
75 __send_request(fs_request_fn fn, void *arg)
76 {
77 	struct spdk_event *event;
78 
79 	event = spdk_event_allocate(g_lcore, __call_fn, (void *)fn, arg);
80 	spdk_event_call(event);
81 }
82 
/*
 * Map a host path under mount_directory to a BlobFS file name.
 *
 * The first mount_directory.length() characters of input are dropped (the
 * caller is expected to have verified the prefix). The remainder is forced to
 * start with exactly one '/', runs of '/' collapse to one, and a trailing '/'
 * is removed. With mount_directory "/mnt/db":
 *   "/mnt/db/foo"     -> "/foo"
 *   "/mnt/db//a//b/"  -> "/a/b"
 *   "/mnt/db"         -> ""
 * An input that is not longer than mount_directory maps to "" instead of
 * throwing std::out_of_range.
 */
static std::string
sanitize_path(const std::string &input, const std::string &mount_directory)
{
	std::string name;

	/* Starting past the prefix (and stopping at input's end) never reads out of range. */
	for (std::string::size_type i = mount_directory.length(); i < input.length(); i++) {
		char c = input[i];

		if (name.empty()) {
			/* Every BlobFS name starts with a single '/'. */
			if (c != '/') {
				name.push_back('/');
			}
			name.push_back(c);
		} else if (c != '/' || name.back() != '/') {
			/* Only a '/' directly following another '/' is skipped. */
			name.push_back(c);
		}
	}

	/* Guarding on empty() avoids indexing name[-1] when nothing followed the prefix. */
	if (!name.empty() && name.back() == '/') {
		name.pop_back();
	}
	return name;
}
114 
115 class SpdkSequentialFile : public SequentialFile
116 {
117 	struct spdk_file *mFile;
118 	uint64_t mOffset;
119 public:
120 	SpdkSequentialFile(struct spdk_file *file) : mFile(file), mOffset(0) {}
121 	virtual ~SpdkSequentialFile();
122 
123 	virtual Status Read(size_t n, Slice *result, char *scratch) override;
124 	virtual Status Skip(uint64_t n) override;
125 	virtual Status InvalidateCache(size_t offset, size_t length) override;
126 };
127 
128 SpdkSequentialFile::~SpdkSequentialFile(void)
129 {
130 	spdk_file_close(mFile, g_sync_args.channel);
131 }
132 
133 Status
134 SpdkSequentialFile::Read(size_t n, Slice *result, char *scratch)
135 {
136 	uint64_t ret;
137 
138 	ret = spdk_file_read(mFile, g_sync_args.channel, scratch, mOffset, n);
139 	mOffset += ret;
140 	*result = Slice(scratch, ret);
141 	return Status::OK();
142 }
143 
144 Status
145 SpdkSequentialFile::Skip(uint64_t n)
146 {
147 	mOffset += n;
148 	return Status::OK();
149 }
150 
151 Status
152 SpdkSequentialFile::InvalidateCache(size_t offset, size_t length)
153 {
154 	return Status::OK();
155 }
156 
157 class SpdkRandomAccessFile : public RandomAccessFile
158 {
159 	struct spdk_file *mFile;
160 public:
161 	SpdkRandomAccessFile(struct spdk_file *file) : mFile(file) {}
162 	virtual ~SpdkRandomAccessFile();
163 
164 	virtual Status Read(uint64_t offset, size_t n, Slice *result, char *scratch) const override;
165 	virtual Status InvalidateCache(size_t offset, size_t length) override;
166 };
167 
168 SpdkRandomAccessFile::~SpdkRandomAccessFile(void)
169 {
170 	spdk_file_close(mFile, g_sync_args.channel);
171 }
172 
173 Status
174 SpdkRandomAccessFile::Read(uint64_t offset, size_t n, Slice *result, char *scratch) const
175 {
176 	spdk_file_read(mFile, g_sync_args.channel, scratch, offset, n);
177 	*result = Slice(scratch, n);
178 	return Status::OK();
179 }
180 
181 Status
182 SpdkRandomAccessFile::InvalidateCache(size_t offset, size_t length)
183 {
184 	return Status::OK();
185 }
186 
187 class SpdkWritableFile : public WritableFile
188 {
189 	struct spdk_file *mFile;
190 	uint64_t mSize;
191 
192 public:
193 	SpdkWritableFile(struct spdk_file *file) : mFile(file), mSize(0) {}
194 	~SpdkWritableFile()
195 	{
196 		if (mFile != NULL) {
197 			Close();
198 		}
199 	}
200 
201 	virtual void SetIOPriority(Env::IOPriority pri)
202 	{
203 		if (pri == Env::IO_HIGH) {
204 			spdk_file_set_priority(mFile, SPDK_FILE_PRIORITY_HIGH);
205 		}
206 	}
207 
208 	virtual Status Truncate(uint64_t size) override
209 	{
210 		spdk_file_truncate(mFile, g_sync_args.channel, size);
211 		mSize = size;
212 		return Status::OK();
213 	}
214 	virtual Status Close() override
215 	{
216 		spdk_file_close(mFile, g_sync_args.channel);
217 		mFile = NULL;
218 		return Status::OK();
219 	}
220 	virtual Status Append(const Slice &data) override;
221 	virtual Status Flush() override
222 	{
223 		return Status::OK();
224 	}
225 	virtual Status Sync() override
226 	{
227 		spdk_file_sync(mFile, g_sync_args.channel);
228 		return Status::OK();
229 	}
230 	virtual Status Fsync() override
231 	{
232 		spdk_file_sync(mFile, g_sync_args.channel);
233 		return Status::OK();
234 	}
235 	virtual bool IsSyncThreadSafe() const override
236 	{
237 		return true;
238 	}
239 	virtual uint64_t GetFileSize() override
240 	{
241 		return mSize;
242 	}
243 	virtual Status InvalidateCache(size_t offset, size_t length) override
244 	{
245 		return Status::OK();
246 	}
247 	virtual Status Allocate(uint64_t offset, uint64_t len) override
248 	{
249 		spdk_file_truncate(mFile, g_sync_args.channel, offset + len);
250 		return Status::OK();
251 	}
252 	virtual Status RangeSync(uint64_t offset, uint64_t nbytes) override
253 	{
254 		/*
255 		 * SPDK BlobFS does not have a range sync operation yet, so just sync
256 		 *  the whole file.
257 		 */
258 		spdk_file_sync(mFile, g_sync_args.channel);
259 		return Status::OK();
260 	}
261 	virtual size_t GetUniqueId(char *id, size_t max_size) const override
262 	{
263 		return 0;
264 	}
265 };
266 
267 Status
268 SpdkWritableFile::Append(const Slice &data)
269 {
270 	spdk_file_write(mFile, g_sync_args.channel, (void *)data.data(), mSize, data.size());
271 	mSize += data.size();
272 
273 	return Status::OK();
274 }
275 
276 class SpdkDirectory : public Directory
277 {
278 public:
279 	SpdkDirectory() {}
280 	~SpdkDirectory() {}
281 	Status Fsync() override
282 	{
283 		return Status::OK();
284 	}
285 };
286 
/* Raised when the SPDK application thread could not be brought up. */
class SpdkAppStartException : public std::runtime_error
{
public:
	SpdkAppStartException(std::string mess)
		: runtime_error(mess)
	{
	}
};
292 
293 class SpdkEnv : public EnvWrapper
294 {
295 private:
296 	pthread_t mSpdkTid;
297 	std::string mDirectory;
298 	std::string mConfig;
299 	std::string mBdev;
300 
301 public:
302 	SpdkEnv(Env *base_env, const std::string &dir, const std::string &conf,
303 		const std::string &bdev, uint64_t cache_size_in_mb);
304 
305 	virtual ~SpdkEnv();
306 
307 	virtual Status NewSequentialFile(const std::string &fname,
308 					 unique_ptr<SequentialFile> *result,
309 					 const EnvOptions &options) override
310 	{
311 		if (fname.compare(0, mDirectory.length(), mDirectory) == 0) {
312 			struct spdk_file *file;
313 			int rc;
314 
315 			std::string name = sanitize_path(fname, mDirectory);
316 			rc = spdk_fs_open_file(g_fs, g_sync_args.channel,
317 					       name.c_str(), 0, &file);
318 			if (rc == 0) {
319 				result->reset(new SpdkSequentialFile(file));
320 				return Status::OK();
321 			} else {
322 				/* Myrocks engine uses errno(ENOENT) as one
323 				 * special condition, for the purpose to
324 				 * support MySQL, set the errno to right value.
325 				 */
326 				errno = -rc;
327 				return Status::IOError(name, strerror(errno));
328 			}
329 		} else {
330 			return EnvWrapper::NewSequentialFile(fname, result, options);
331 		}
332 	}
333 
334 	virtual Status NewRandomAccessFile(const std::string &fname,
335 					   unique_ptr<RandomAccessFile> *result,
336 					   const EnvOptions &options) override
337 	{
338 		if (fname.compare(0, mDirectory.length(), mDirectory) == 0) {
339 			std::string name = sanitize_path(fname, mDirectory);
340 			struct spdk_file *file;
341 			int rc;
342 
343 			rc = spdk_fs_open_file(g_fs, g_sync_args.channel,
344 					       name.c_str(), 0, &file);
345 			if (rc == 0) {
346 				result->reset(new SpdkRandomAccessFile(file));
347 				return Status::OK();
348 			} else {
349 				errno = -rc;
350 				return Status::IOError(name, strerror(errno));
351 			}
352 		} else {
353 			return EnvWrapper::NewRandomAccessFile(fname, result, options);
354 		}
355 	}
356 
357 	virtual Status NewWritableFile(const std::string &fname,
358 				       unique_ptr<WritableFile> *result,
359 				       const EnvOptions &options) override
360 	{
361 		if (fname.compare(0, mDirectory.length(), mDirectory) == 0) {
362 			std::string name = sanitize_path(fname, mDirectory);
363 			struct spdk_file *file;
364 			int rc;
365 
366 			rc = spdk_fs_open_file(g_fs, g_sync_args.channel, name.c_str(),
367 					       SPDK_BLOBFS_OPEN_CREATE, &file);
368 			if (rc == 0) {
369 				result->reset(new SpdkWritableFile(file));
370 				return Status::OK();
371 			} else {
372 				errno = -rc;
373 				return Status::IOError(name, strerror(errno));
374 			}
375 		} else {
376 			return EnvWrapper::NewWritableFile(fname, result, options);
377 		}
378 	}
379 
380 	virtual Status ReuseWritableFile(const std::string &fname,
381 					 const std::string &old_fname,
382 					 unique_ptr<WritableFile> *result,
383 					 const EnvOptions &options) override
384 	{
385 		return EnvWrapper::ReuseWritableFile(fname, old_fname, result, options);
386 	}
387 
388 	virtual Status NewDirectory(const std::string &name,
389 				    unique_ptr<Directory> *result) override
390 	{
391 		result->reset(new SpdkDirectory());
392 		return Status::OK();
393 	}
394 	virtual Status FileExists(const std::string &fname) override
395 	{
396 		struct spdk_file_stat stat;
397 		int rc;
398 		std::string name = sanitize_path(fname, mDirectory);
399 
400 		rc = spdk_fs_file_stat(g_fs, g_sync_args.channel, name.c_str(), &stat);
401 		if (rc == 0) {
402 			return Status::OK();
403 		}
404 		return EnvWrapper::FileExists(fname);
405 	}
406 	virtual Status RenameFile(const std::string &src, const std::string &t) override
407 	{
408 		int rc;
409 		std::string src_name = sanitize_path(src, mDirectory);
410 		std::string target_name = sanitize_path(t, mDirectory);
411 
412 		rc = spdk_fs_rename_file(g_fs, g_sync_args.channel,
413 					 src_name.c_str(), target_name.c_str());
414 		if (rc == -ENOENT) {
415 			return EnvWrapper::RenameFile(src, t);
416 		}
417 		return Status::OK();
418 	}
419 	virtual Status LinkFile(const std::string &src, const std::string &t) override
420 	{
421 		return Status::NotSupported("SpdkEnv does not support LinkFile");
422 	}
423 	virtual Status GetFileSize(const std::string &fname, uint64_t *size) override
424 	{
425 		struct spdk_file_stat stat;
426 		int rc;
427 		std::string name = sanitize_path(fname, mDirectory);
428 
429 		rc = spdk_fs_file_stat(g_fs, g_sync_args.channel, name.c_str(), &stat);
430 		if (rc == -ENOENT) {
431 			return EnvWrapper::GetFileSize(fname, size);
432 		}
433 		*size = stat.size;
434 		return Status::OK();
435 	}
436 	virtual Status DeleteFile(const std::string &fname) override
437 	{
438 		int rc;
439 		std::string name = sanitize_path(fname, mDirectory);
440 
441 		rc = spdk_fs_delete_file(g_fs, g_sync_args.channel, name.c_str());
442 		if (rc == -ENOENT) {
443 			return EnvWrapper::DeleteFile(fname);
444 		}
445 		return Status::OK();
446 	}
447 	virtual void StartThread(void (*function)(void *arg), void *arg) override;
448 	virtual Status LockFile(const std::string &fname, FileLock **lock) override
449 	{
450 		std::string name = sanitize_path(fname, mDirectory);
451 
452 		spdk_fs_open_file(g_fs, g_sync_args.channel, name.c_str(),
453 				  SPDK_BLOBFS_OPEN_CREATE, (struct spdk_file **)lock);
454 		return Status::OK();
455 	}
456 	virtual Status UnlockFile(FileLock *lock) override
457 	{
458 		spdk_file_close((struct spdk_file *)lock, g_sync_args.channel);
459 		return Status::OK();
460 	}
461 	virtual Status GetChildren(const std::string &dir,
462 				   std::vector<std::string> *result) override
463 	{
464 		std::string::size_type pos;
465 		std::set<std::string> dir_and_file_set;
466 		std::string full_path;
467 		std::string filename;
468 		std::string dir_name;
469 
470 		if (dir.find("archive") != std::string::npos) {
471 			return Status::OK();
472 		}
473 		if (dir.compare(0, mDirectory.length(), mDirectory) == 0) {
474 			spdk_fs_iter iter;
475 			struct spdk_file *file;
476 			dir_name = sanitize_path(dir, mDirectory);
477 
478 			iter = spdk_fs_iter_first(g_fs);
479 			while (iter != NULL) {
480 				file = spdk_fs_iter_get_file(iter);
481 				full_path = spdk_file_get_name(file);
482 				if (strncmp(dir_name.c_str(), full_path.c_str(), dir_name.length())) {
483 					iter = spdk_fs_iter_next(iter);
484 					continue;
485 				}
486 				pos = full_path.find("/", dir_name.length() + 1);
487 
488 				if (pos != std::string::npos) {
489 					filename = full_path.substr(dir_name.length() + 1, pos - dir_name.length() - 1);
490 				} else {
491 					filename = full_path.substr(dir_name.length() + 1);
492 				}
493 				dir_and_file_set.insert(filename);
494 				iter = spdk_fs_iter_next(iter);
495 			}
496 
497 			for (auto &s : dir_and_file_set) {
498 				result->push_back(s);
499 			}
500 
501 			result->push_back(".");
502 			result->push_back("..");
503 
504 			return Status::OK();
505 		}
506 		return EnvWrapper::GetChildren(dir, result);
507 	}
508 };
509 
510 static void
511 _spdk_send_msg(spdk_thread_fn fn, void *ctx, void *thread_ctx)
512 {
513 	/* Not supported */
514 	assert(false);
515 }
516 
517 void SpdkInitializeThread(void)
518 {
519 	if (g_fs != NULL) {
520 		/* TODO: Add an event lib call to dynamically register a thread */
521 		spdk_allocate_thread(_spdk_send_msg, NULL, NULL, NULL, "spdk_rocksdb");
522 		g_sync_args.channel = spdk_fs_alloc_io_channel_sync(g_fs);
523 	}
524 }
525 
526 struct SpdkThreadState {
527 	void (*user_function)(void *);
528 	void *arg;
529 };
530 
531 static void SpdkStartThreadWrapper(void *arg)
532 {
533 	SpdkThreadState *state = reinterpret_cast<SpdkThreadState *>(arg);
534 
535 	SpdkInitializeThread();
536 	state->user_function(state->arg);
537 	delete state;
538 }
539 
540 void SpdkEnv::StartThread(void (*function)(void *arg), void *arg)
541 {
542 	SpdkThreadState *state = new SpdkThreadState;
543 	state->user_function = function;
544 	state->arg = arg;
545 	EnvWrapper::StartThread(SpdkStartThreadWrapper, state);
546 }
547 
548 static void
549 fs_load_cb(void *ctx, struct spdk_filesystem *fs, int fserrno)
550 {
551 	if (fserrno == 0) {
552 		g_fs = fs;
553 	}
554 	g_spdk_ready = true;
555 }
556 
557 static void
558 spdk_rocksdb_run(void *arg1, void *arg2)
559 {
560 	struct spdk_bdev *bdev;
561 
562 	bdev = spdk_bdev_get_by_name(g_bdev_name.c_str());
563 
564 	if (bdev == NULL) {
565 		SPDK_ERRLOG("bdev %s not found\n", g_bdev_name.c_str());
566 		exit(1);
567 	}
568 
569 	g_lcore = spdk_env_get_first_core();
570 
571 	g_bs_dev = spdk_bdev_create_bs_dev(bdev, NULL, NULL);
572 	printf("using bdev %s\n", g_bdev_name.c_str());
573 	spdk_fs_load(g_bs_dev, __send_request, fs_load_cb, NULL);
574 }
575 
576 static void
577 fs_unload_cb(void *ctx, int fserrno)
578 {
579 	assert(fserrno == 0);
580 
581 	spdk_app_stop(0);
582 }
583 
584 static void
585 spdk_rocksdb_shutdown(void)
586 {
587 	if (g_fs != NULL) {
588 		spdk_fs_unload(g_fs, fs_unload_cb, NULL);
589 	} else {
590 		fs_unload_cb(NULL, 0);
591 	}
592 }
593 
594 static void *
595 initialize_spdk(void *arg)
596 {
597 	struct spdk_app_opts *opts = (struct spdk_app_opts *)arg;
598 	int rc;
599 
600 	rc = spdk_app_start(opts, spdk_rocksdb_run, NULL, NULL);
601 	/*
602 	 * TODO:  Revisit for case of internal failure of
603 	 * spdk_app_start(), itself.  At this time, it's known
604 	 * the only application's use of spdk_app_stop() passes
605 	 * a zero; i.e. no fail (non-zero) cases so here we
606 	 * assume there was an internal failure and flag it
607 	 * so we can throw an exception.
608 	 */
609 	if (rc) {
610 		g_spdk_start_failure = true;
611 	} else {
612 		spdk_app_fini();
613 		delete opts;
614 	}
615 	pthread_exit(NULL);
616 
617 }
618 
619 SpdkEnv::SpdkEnv(Env *base_env, const std::string &dir, const std::string &conf,
620 		 const std::string &bdev, uint64_t cache_size_in_mb)
621 	: EnvWrapper(base_env), mDirectory(dir), mConfig(conf), mBdev(bdev)
622 {
623 	struct spdk_app_opts *opts = new struct spdk_app_opts;
624 
625 	spdk_app_opts_init(opts);
626 	opts->name = "rocksdb";
627 	opts->config_file = mConfig.c_str();
628 	opts->mem_size = 1024 + cache_size_in_mb;
629 	opts->shutdown_cb = spdk_rocksdb_shutdown;
630 
631 	spdk_fs_set_cache_size(cache_size_in_mb);
632 	g_bdev_name = mBdev;
633 
634 	pthread_create(&mSpdkTid, NULL, &initialize_spdk, opts);
635 	while (!g_spdk_ready && !g_spdk_start_failure)
636 		;
637 	if (g_spdk_start_failure) {
638 		delete opts;
639 		throw SpdkAppStartException("spdk_app_start() unable to start spdk_rocksdb_run()");
640 	}
641 
642 	SpdkInitializeThread();
643 }
644 
645 SpdkEnv::~SpdkEnv()
646 {
647 	/* This is a workaround for rocksdb test, we close the files if the rocksdb not
648 	 * do the work before the test quit.
649 	 */
650 	if (g_fs != NULL) {
651 		spdk_fs_iter iter;
652 		struct spdk_file *file;
653 
654 		if (!g_sync_args.channel) {
655 			SpdkInitializeThread();
656 		}
657 		iter = spdk_fs_iter_first(g_fs);
658 		while (iter != NULL) {
659 			file = spdk_fs_iter_get_file(iter);
660 			spdk_file_close(file, g_sync_args.channel);
661 			iter = spdk_fs_iter_next(iter);
662 		}
663 	}
664 
665 	spdk_app_start_shutdown();
666 	pthread_join(mSpdkTid, NULL);
667 }
668 
669 Env *NewSpdkEnv(Env *base_env, const std::string &dir, const std::string &conf,
670 		const std::string &bdev, uint64_t cache_size_in_mb)
671 {
672 	try {
673 		SpdkEnv *spdk_env = new SpdkEnv(base_env, dir, conf, bdev, cache_size_in_mb);
674 		if (g_fs != NULL) {
675 			return spdk_env;
676 		} else {
677 			delete spdk_env;
678 			return NULL;
679 		}
680 	} catch (SpdkAppStartException &e) {
681 		SPDK_ERRLOG("NewSpdkEnv: exception caught: %s", e.what());
682 		return NULL;
683 	} catch (...) {
684 		SPDK_ERRLOG("NewSpdkEnv: default exception caught");
685 		return NULL;
686 	}
687 }
688 
689 } // namespace rocksdb
690