xref: /spdk/lib/rocksdb/env_spdk.cc (revision 2bcabb20043c52f45a3168b1e1e30c25242e4855)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
#include "env_posix.cc"

#include <atomic>
#include <cerrno>
#include <cstdlib>
#include <cstring>
#include <thread>

extern "C" {
#include "spdk/env.h"
#include "spdk/event.h"
#include "spdk/blob.h"
#include "spdk/blobfs.h"
#include "spdk/blob_bdev.h"
#include "spdk/log.h"
#include "spdk/io_channel.h"
#include "spdk/bdev.h"
}
46 
47 namespace rocksdb
48 {
49 
/* BlobFS instance; assigned by fs_load_cb() only when the load succeeded. */
struct spdk_filesystem *g_fs = NULL;
/* Blobstore device created on top of the configured bdev. */
struct spdk_bs_dev *g_bs_dev = NULL;
/* Name of the bdev that backs the filesystem. */
std::string g_bdev_name;
/*
 * Set on the SPDK thread once filesystem loading has finished (successfully
 * or not) and polled by the SpdkEnv constructor on another thread. It must be
 * an atomic: a volatile flag gives no inter-thread ordering guarantees and the
 * unsynchronized access is a data race.
 */
std::atomic<bool> g_spdk_ready(false);

/* Per-thread state used for synchronous BlobFS calls. */
struct sync_args {
	struct spdk_io_channel *channel;
};

/* Thread-local: each thread gets its own channel in SpdkInitializeThread(). */
__thread struct sync_args g_sync_args;
59 
60 static void
61 __call_fn(void *arg1, void *arg2)
62 {
63 	fs_request_fn fn;
64 
65 	fn = (fs_request_fn)arg1;
66 	fn(arg2);
67 }
68 
69 static void
70 __send_request(fs_request_fn fn, void *arg)
71 {
72 	struct spdk_event *event;
73 
74 	event = spdk_event_allocate(0, __call_fn, (void *)fn, arg);
75 	spdk_event_call(event);
76 }
77 
78 class SpdkSequentialFile : public SequentialFile
79 {
80 	struct spdk_file *mFile;
81 	uint64_t mOffset;
82 public:
83 	SpdkSequentialFile(struct spdk_file *file) : mFile(file), mOffset(0) {}
84 	virtual ~SpdkSequentialFile();
85 
86 	virtual Status Read(size_t n, Slice *result, char *scratch) override;
87 	virtual Status Skip(uint64_t n) override;
88 	virtual Status InvalidateCache(size_t offset, size_t length) override;
89 };
90 
/*
 * Return the part of `full` after the last '/', or the whole string when it
 * contains no '/'. BlobFS has a flat namespace, so this is the name used for
 * BlobFS lookups. Takes a const reference to avoid copying the path.
 */
static std::string
basename(const std::string &full)
{
	std::string::size_type slash = full.rfind('/');

	if (slash == std::string::npos) {
		return full;
	}
	return full.substr(slash + 1);
}
96 
97 SpdkSequentialFile::~SpdkSequentialFile(void)
98 {
99 	spdk_file_close(mFile, g_sync_args.channel);
100 }
101 
102 Status
103 SpdkSequentialFile::Read(size_t n, Slice *result, char *scratch)
104 {
105 	uint64_t ret;
106 
107 	ret = spdk_file_read(mFile, g_sync_args.channel, scratch, mOffset, n);
108 	mOffset += ret;
109 	*result = Slice(scratch, ret);
110 	return Status::OK();
111 }
112 
113 Status
114 SpdkSequentialFile::Skip(uint64_t n)
115 {
116 	mOffset += n;
117 	return Status::OK();
118 }
119 
120 Status
121 SpdkSequentialFile::InvalidateCache(size_t offset, size_t length)
122 {
123 	return Status::OK();
124 }
125 
126 class SpdkRandomAccessFile : public RandomAccessFile
127 {
128 	struct spdk_file *mFile;
129 public:
130 	SpdkRandomAccessFile(const std::string &fname, const EnvOptions &options);
131 	virtual ~SpdkRandomAccessFile();
132 
133 	virtual Status Read(uint64_t offset, size_t n, Slice *result, char *scratch) const override;
134 	virtual Status InvalidateCache(size_t offset, size_t length) override;
135 };
136 
137 SpdkRandomAccessFile::SpdkRandomAccessFile(const std::string &fname, const EnvOptions &options)
138 {
139 	spdk_fs_open_file(g_fs, g_sync_args.channel, fname.c_str(), SPDK_BLOBFS_OPEN_CREATE, &mFile);
140 }
141 
142 SpdkRandomAccessFile::~SpdkRandomAccessFile(void)
143 {
144 	spdk_file_close(mFile, g_sync_args.channel);
145 }
146 
147 Status
148 SpdkRandomAccessFile::Read(uint64_t offset, size_t n, Slice *result, char *scratch) const
149 {
150 	spdk_file_read(mFile, g_sync_args.channel, scratch, offset, n);
151 	*result = Slice(scratch, n);
152 	return Status::OK();
153 }
154 
155 Status
156 SpdkRandomAccessFile::InvalidateCache(size_t offset, size_t length)
157 {
158 	return Status::OK();
159 }
160 
161 class SpdkWritableFile : public WritableFile
162 {
163 	struct spdk_file *mFile;
164 	uint32_t mSize;
165 
166 public:
167 	SpdkWritableFile(const std::string &fname, const EnvOptions &options);
168 	~SpdkWritableFile()
169 	{
170 		if (mFile != NULL) {
171 			Close();
172 		}
173 	}
174 
175 	virtual void SetIOPriority(Env::IOPriority pri)
176 	{
177 		if (pri == Env::IO_HIGH) {
178 			spdk_file_set_priority(mFile, SPDK_FILE_PRIORITY_HIGH);
179 		}
180 	}
181 
182 	virtual Status Truncate(uint64_t size) override
183 	{
184 		spdk_file_truncate(mFile, g_sync_args.channel, size);
185 		mSize = size;
186 		return Status::OK();
187 	}
188 	virtual Status Close() override
189 	{
190 		spdk_file_close(mFile, g_sync_args.channel);
191 		mFile = NULL;
192 		return Status::OK();
193 	}
194 	virtual Status Append(const Slice &data) override;
195 	virtual Status Flush() override
196 	{
197 		return Status::OK();
198 	}
199 	virtual Status Sync() override
200 	{
201 		spdk_file_sync(mFile, g_sync_args.channel);
202 		return Status::OK();
203 	}
204 	virtual Status Fsync() override
205 	{
206 		spdk_file_sync(mFile, g_sync_args.channel);
207 		return Status::OK();
208 	}
209 	virtual bool IsSyncThreadSafe() const override
210 	{
211 		return true;
212 	}
213 	virtual uint64_t GetFileSize() override
214 	{
215 		return mSize;
216 	}
217 	virtual Status InvalidateCache(size_t offset, size_t length) override
218 	{
219 		return Status::OK();
220 	}
221 #ifdef ROCKSDB_FALLOCATE_PRESENT
222 	virtual Status Allocate(uint64_t offset, uint64_t len) override
223 	{
224 		spdk_file_truncate(mFile, g_sync_args.channel, offset + len);
225 		return Status::OK();
226 	}
227 	virtual Status RangeSync(uint64_t offset, uint64_t nbytes) override
228 	{
229 		/*
230 		 * SPDK BlobFS does not have a range sync operation yet, so just sync
231 		 *  the whole file.
232 		 */
233 		spdk_file_sync(mFile, g_sync_args.channel);
234 		return Status::OK();
235 	}
236 	virtual size_t GetUniqueId(char *id, size_t max_size) const override
237 	{
238 		return 0;
239 	}
240 #endif
241 };
242 
243 SpdkWritableFile::SpdkWritableFile(const std::string &fname, const EnvOptions &options) : mSize(0)
244 {
245 	spdk_fs_open_file(g_fs, g_sync_args.channel, fname.c_str(), SPDK_BLOBFS_OPEN_CREATE, &mFile);
246 }
247 
248 Status
249 SpdkWritableFile::Append(const Slice &data)
250 {
251 	spdk_file_write(mFile, g_sync_args.channel, (void *)data.data(), mSize, data.size());
252 	mSize += data.size();
253 
254 	return Status::OK();
255 }
256 
257 class SpdkDirectory : public Directory
258 {
259 public:
260 	SpdkDirectory() {}
261 	~SpdkDirectory() {}
262 	Status Fsync() override
263 	{
264 		return Status::OK();
265 	}
266 };
267 
268 class SpdkEnv : public PosixEnv
269 {
270 private:
271 	pthread_t mSpdkTid;
272 	std::string mDirectory;
273 	std::string mConfig;
274 	std::string mBdev;
275 
276 public:
277 	SpdkEnv(const std::string &dir, const std::string &conf,
278 		const std::string &bdev, uint64_t cache_size_in_mb);
279 
280 	virtual ~SpdkEnv();
281 
282 	virtual Status NewSequentialFile(const std::string &fname,
283 					 unique_ptr<SequentialFile> *result,
284 					 const EnvOptions &options) override
285 	{
286 		if (fname.compare(0, mDirectory.length(), mDirectory) == 0) {
287 			struct spdk_file *file;
288 			int rc;
289 
290 			rc = spdk_fs_open_file(g_fs, g_sync_args.channel,
291 					       basename(fname).c_str(), 0, &file);
292 			if (rc == 0) {
293 				result->reset(new SpdkSequentialFile(file));
294 				return Status::OK();
295 			} else {
296 				/* Myrocks engine uses errno(ENOENT) as one
297 				 * special condition, for the purpose to
298 				 * support MySQL, set the errno to right value.
299 				 */
300 				errno = -rc;
301 				return IOError(fname, errno);
302 			}
303 		} else {
304 			return PosixEnv::NewSequentialFile(fname, result, options);
305 		}
306 	}
307 
308 	virtual Status NewRandomAccessFile(const std::string &fname,
309 					   unique_ptr<RandomAccessFile> *result,
310 					   const EnvOptions &options) override
311 	{
312 		if (fname.compare(0, mDirectory.length(), mDirectory) == 0) {
313 			result->reset(new SpdkRandomAccessFile(basename(fname), options));
314 			return Status::OK();
315 		} else {
316 			return PosixEnv::NewRandomAccessFile(fname, result, options);
317 		}
318 	}
319 
320 	virtual Status NewWritableFile(const std::string &fname,
321 				       unique_ptr<WritableFile> *result,
322 				       const EnvOptions &options) override
323 	{
324 		if (fname.compare(0, mDirectory.length(), mDirectory) == 0) {
325 			result->reset(new SpdkWritableFile(basename(fname), options));
326 			return Status::OK();
327 		} else {
328 			return PosixEnv::NewWritableFile(fname, result, options);
329 		}
330 	}
331 
332 	virtual Status ReuseWritableFile(const std::string &fname,
333 					 const std::string &old_fname,
334 					 unique_ptr<WritableFile> *result,
335 					 const EnvOptions &options) override
336 	{
337 		return PosixEnv::ReuseWritableFile(fname, old_fname, result, options);
338 	}
339 
340 	virtual Status NewDirectory(const std::string &name,
341 				    unique_ptr<Directory> *result) override
342 	{
343 		result->reset(new SpdkDirectory());
344 		return Status::OK();
345 	}
346 	virtual Status FileExists(const std::string &fname) override
347 	{
348 		struct spdk_file_stat stat;
349 		std::string fname_base = basename(fname);
350 		int rc;
351 
352 		rc = spdk_fs_file_stat(g_fs, g_sync_args.channel, fname_base.c_str(), &stat);
353 		if (rc == 0) {
354 			return Status::OK();
355 		}
356 		return PosixEnv::FileExists(fname);
357 	}
358 	virtual Status RenameFile(const std::string &src, const std::string &target) override
359 	{
360 		std::string target_base = basename(target);
361 		std::string src_base = basename(src);
362 		int rc;
363 
364 		rc = spdk_fs_rename_file(g_fs, g_sync_args.channel,
365 					 src_base.c_str(), target_base.c_str());
366 		if (rc == -ENOENT) {
367 			return PosixEnv::RenameFile(src, target);
368 		}
369 		return Status::OK();
370 	}
371 	virtual Status LinkFile(const std::string &src, const std::string &target) override
372 	{
373 		return Status::NotSupported("SpdkEnv does not support LinkFile");
374 	}
375 	virtual Status GetFileSize(const std::string &fname, uint64_t *size) override
376 	{
377 		struct spdk_file_stat stat;
378 		std::string fname_base = basename(fname);
379 		int rc;
380 
381 		rc = spdk_fs_file_stat(g_fs, g_sync_args.channel, fname_base.c_str(), &stat);
382 		if (rc == -ENOENT) {
383 			return PosixEnv::GetFileSize(fname, size);
384 		}
385 		*size = stat.size;
386 		return Status::OK();
387 	}
388 	virtual Status DeleteFile(const std::string &fname) override
389 	{
390 		int rc;
391 		std::string fname_base = basename(fname);
392 		rc = spdk_fs_delete_file(g_fs, g_sync_args.channel, fname_base.c_str());
393 		if (rc == -ENOENT) {
394 			return PosixEnv::DeleteFile(fname);
395 		}
396 		return Status::OK();
397 	}
398 	virtual void StartThread(void (*function)(void *arg), void *arg) override;
399 	virtual Status LockFile(const std::string &fname, FileLock **lock) override
400 	{
401 		spdk_fs_open_file(g_fs, g_sync_args.channel, basename(fname).c_str(),
402 				  SPDK_BLOBFS_OPEN_CREATE, (struct spdk_file **)lock);
403 		return Status::OK();
404 	}
405 	virtual Status UnlockFile(FileLock *lock) override
406 	{
407 		spdk_file_close((struct spdk_file *)lock, g_sync_args.channel);
408 		return Status::OK();
409 	}
410 	virtual Status GetChildren(const std::string &dir,
411 				   std::vector<std::string> *result) override
412 	{
413 		if (dir.find("archive") != std::string::npos) {
414 			return Status::OK();
415 		}
416 		if (dir.compare(0, mDirectory.length(), mDirectory) == 0) {
417 			spdk_fs_iter iter;
418 			struct spdk_file *file;
419 
420 			iter = spdk_fs_iter_first(g_fs);
421 			while (iter != NULL) {
422 				file = spdk_fs_iter_get_file(iter);
423 				result->push_back(std::string(spdk_file_get_name(file)));
424 				iter = spdk_fs_iter_next(iter);
425 			}
426 			return Status::OK();
427 		}
428 		return PosixEnv::GetChildren(dir, result);
429 	}
430 };
431 
432 static void
433 _spdk_send_msg(spdk_thread_fn fn, void *ctx, void *thread_ctx)
434 {
435 	/* Not supported */
436 	assert(false);
437 }
438 
439 void SpdkInitializeThread(void)
440 {
441 	if (g_fs != NULL) {
442 		spdk_allocate_thread(_spdk_send_msg, NULL);
443 		g_sync_args.channel = spdk_fs_alloc_io_channel_sync(g_fs);
444 	}
445 }
446 
447 static void SpdkStartThreadWrapper(void *arg)
448 {
449 	StartThreadState *state = reinterpret_cast<StartThreadState *>(arg);
450 
451 	SpdkInitializeThread();
452 	StartThreadWrapper(state);
453 }
454 
455 void SpdkEnv::StartThread(void (*function)(void *arg), void *arg)
456 {
457 	StartThreadState *state = new StartThreadState;
458 	state->user_function = function;
459 	state->arg = arg;
460 	PosixEnv::StartThread(SpdkStartThreadWrapper, state);
461 }
462 
463 static void
464 fs_load_cb(void *ctx, struct spdk_filesystem *fs, int fserrno)
465 {
466 	if (fserrno == 0) {
467 		g_fs = fs;
468 	}
469 	g_spdk_ready = true;
470 }
471 
472 static void
473 spdk_rocksdb_run(void *arg1, void *arg2)
474 {
475 	struct spdk_bdev *bdev;
476 
477 	pthread_setname_np(pthread_self(), "spdk");
478 	bdev = spdk_bdev_get_by_name(g_bdev_name.c_str());
479 
480 	if (bdev == NULL) {
481 		SPDK_ERRLOG("bdev %s not found\n", g_bdev_name.c_str());
482 		exit(1);
483 	}
484 
485 	g_bs_dev = spdk_bdev_create_bs_dev(bdev);
486 	printf("using bdev %s\n", g_bdev_name.c_str());
487 	spdk_fs_load(g_bs_dev, __send_request, fs_load_cb, NULL);
488 }
489 
490 static void
491 fs_unload_cb(void *ctx, int fserrno)
492 {
493 	assert(fserrno == 0);
494 
495 	spdk_app_stop(0);
496 }
497 
498 static void
499 spdk_rocksdb_shutdown(void)
500 {
501 	if (g_fs != NULL) {
502 		spdk_fs_unload(g_fs, fs_unload_cb, NULL);
503 	} else {
504 		fs_unload_cb(NULL, 0);
505 	}
506 }
507 
508 static void *
509 initialize_spdk(void *arg)
510 {
511 	struct spdk_app_opts *opts = (struct spdk_app_opts *)arg;
512 
513 	spdk_app_start(opts, spdk_rocksdb_run, NULL, NULL);
514 	spdk_app_fini();
515 
516 	delete opts;
517 	pthread_exit(NULL);
518 }
519 
520 SpdkEnv::SpdkEnv(const std::string &dir, const std::string &conf,
521 		 const std::string &bdev, uint64_t cache_size_in_mb)
522 	: PosixEnv(), mDirectory(dir), mConfig(conf), mBdev(bdev)
523 {
524 	struct spdk_app_opts *opts = new struct spdk_app_opts;
525 
526 	spdk_app_opts_init(opts);
527 	opts->name = "rocksdb";
528 	opts->config_file = mConfig.c_str();
529 	opts->reactor_mask = "0x1";
530 	opts->mem_size = 1024 + cache_size_in_mb;
531 	opts->shutdown_cb = spdk_rocksdb_shutdown;
532 
533 	spdk_fs_set_cache_size(cache_size_in_mb);
534 	g_bdev_name = mBdev;
535 
536 	pthread_create(&mSpdkTid, NULL, &initialize_spdk, opts);
537 	while (!g_spdk_ready)
538 		;
539 
540 	SpdkInitializeThread();
541 }
542 
543 SpdkEnv::~SpdkEnv()
544 {
545 	spdk_app_start_shutdown();
546 	pthread_join(mSpdkTid, NULL);
547 }
548 
549 void NewSpdkEnv(Env **env, const std::string &dir, const std::string &conf,
550 		const std::string &bdev, uint64_t cache_size_in_mb)
551 {
552 	SpdkEnv *spdk_env = new SpdkEnv(dir, conf, bdev, cache_size_in_mb);
553 
554 	if (g_fs != NULL) {
555 		*env = spdk_env;
556 	} else {
557 		*env = NULL;
558 		delete spdk_env;
559 	}
560 }
561 
562 } // namespace rocksdb
563