xref: /spdk/lib/blobfs/blobfs.c (revision 88e3ffd7b6c5ec1ea1a660354d25f02c766092e1)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 
36 #include "spdk/blobfs.h"
37 #include "tree.h"
38 
39 #include "spdk/queue.h"
40 #include "spdk/thread.h"
41 #include "spdk/assert.h"
42 #include "spdk/env.h"
43 #include "spdk/util.h"
44 #include "spdk/log.h"
45 #include "spdk/trace.h"
46 
/* Debug-log helpers that prefix every message with the file name. */
#define BLOBFS_TRACE(file, str, args...) \
	SPDK_DEBUGLOG(blobfs, "file=%s " str, file->name, ##args)

#define BLOBFS_TRACE_RW(file, str, args...) \
	SPDK_DEBUGLOG(blobfs_rw, "file=%s " str, file->name, ##args)

/* Default global cache size (4 GiB) and default blobstore cluster size (1 MiB). */
#define BLOBFS_DEFAULT_CACHE_SIZE (4ULL * 1024 * 1024 * 1024)
#define SPDK_BLOBFS_DEFAULT_OPTS_CLUSTER_SZ (1024 * 1024)

/* bstype signature stamped on blobstores created or claimed by blobfs. */
#define SPDK_BLOBFS_SIGNATURE	"BLOBFS"

/* Global buffer-cache state shared by all mounted filesystems.  The pool and
 * its reclaim poller live on the dedicated g_cache_pool_thread.
 */
static uint64_t g_fs_cache_size = BLOBFS_DEFAULT_CACHE_SIZE;
static struct spdk_mempool *g_cache_pool;
static TAILQ_HEAD(, spdk_file) g_caches = TAILQ_HEAD_INITIALIZER(g_caches);
static struct spdk_poller *g_cache_pool_mgmt_poller;
static struct spdk_thread *g_cache_pool_thread;
#define BLOBFS_CACHE_POOL_POLL_PERIOD_IN_US 1000ULL
/* Number of mounted filesystems; protected by g_cache_init_lock. */
static int g_fs_count = 0;
static pthread_mutex_t g_cache_init_lock = PTHREAD_MUTEX_INITIALIZER;

/* Tracepoint IDs for the blobfs trace group. */
#define TRACE_GROUP_BLOBFS	0x7
#define TRACE_BLOBFS_XATTR_START	SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x0)
#define TRACE_BLOBFS_XATTR_END		SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x1)
#define TRACE_BLOBFS_OPEN		SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x2)
#define TRACE_BLOBFS_CLOSE		SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x3)
#define TRACE_BLOBFS_DELETE_START	SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x4)
#define TRACE_BLOBFS_DELETE_DONE	SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x5)
74 
75 SPDK_TRACE_REGISTER_FN(blobfs_trace, "blobfs", TRACE_GROUP_BLOBFS)
76 {
77 	spdk_trace_register_description("BLOBFS_XATTR_START",
78 					TRACE_BLOBFS_XATTR_START,
79 					OWNER_NONE, OBJECT_NONE, 0,
80 					SPDK_TRACE_ARG_TYPE_STR,
81 					"file:    ");
82 	spdk_trace_register_description("BLOBFS_XATTR_END",
83 					TRACE_BLOBFS_XATTR_END,
84 					OWNER_NONE, OBJECT_NONE, 0,
85 					SPDK_TRACE_ARG_TYPE_STR,
86 					"file:    ");
87 	spdk_trace_register_description("BLOBFS_OPEN",
88 					TRACE_BLOBFS_OPEN,
89 					OWNER_NONE, OBJECT_NONE, 0,
90 					SPDK_TRACE_ARG_TYPE_STR,
91 					"file:    ");
92 	spdk_trace_register_description("BLOBFS_CLOSE",
93 					TRACE_BLOBFS_CLOSE,
94 					OWNER_NONE, OBJECT_NONE, 0,
95 					SPDK_TRACE_ARG_TYPE_STR,
96 					"file:    ");
97 	spdk_trace_register_description("BLOBFS_DELETE_START",
98 					TRACE_BLOBFS_DELETE_START,
99 					OWNER_NONE, OBJECT_NONE, 0,
100 					SPDK_TRACE_ARG_TYPE_STR,
101 					"file:    ");
102 	spdk_trace_register_description("BLOBFS_DELETE_DONE",
103 					TRACE_BLOBFS_DELETE_DONE,
104 					OWNER_NONE, OBJECT_NONE, 0,
105 					SPDK_TRACE_ARG_TYPE_STR,
106 					"file:    ");
107 }
108 
/* Return a cache buffer's data area to the global mempool and free its
 * tracking structure.
 */
void
cache_buffer_free(struct cache_buffer *cache_buffer)
{
	spdk_mempool_put(g_cache_pool, cache_buffer->buf);
	free(cache_buffer);
}
115 
/* Byte threshold used by the readahead heuristic (consumers not in view here). */
#define CACHE_READAHEAD_THRESHOLD	(128 * 1024)

/* In-memory state for one blobfs file, backed by a single blob. */
struct spdk_file {
	struct spdk_filesystem	*fs;
	struct spdk_blob	*blob;
	char			*name;
	/* First bytes of name packed into a u64 for tracepoint arguments
	 * (see _file_build_trace_arg_name).
	 */
	uint64_t		trace_arg_name;
	uint64_t		length;
	bool                    is_deleted;
	bool			open_for_writing;
	/* Set to the persisted length at load time; presumably tracks bytes
	 * flushed to the blob thereafter — write path not in view.
	 */
	uint64_t		length_flushed;
	/* Length recorded in the blob's "length" xattr. */
	uint64_t		length_xattr;
	uint64_t		append_pos;
	/* Sequential-access tracking, used with CACHE_READAHEAD_THRESHOLD. */
	uint64_t		seq_byte_count;
	uint64_t		next_seq_offset;
	uint32_t		priority;
	/* Linkage on spdk_filesystem.files. */
	TAILQ_ENTRY(spdk_file)	tailq;
	spdk_blob_id		blobid;
	uint32_t		ref_count;
	pthread_spinlock_t	lock;
	struct cache_buffer	*last;
	struct cache_tree	*tree;
	TAILQ_HEAD(open_requests_head, spdk_fs_request) open_requests;
	TAILQ_HEAD(sync_requests_head, spdk_fs_request) sync_requests;
	/* Linkage on the global g_caches list. */
	TAILQ_ENTRY(spdk_file)	cache_tailq;
};
142 
/* Record of a blob found marked "is_deleted" during load; queued so the blob
 * can be deleted before the load callback fires.
 */
struct spdk_deleted_file {
	spdk_blob_id	id;
	TAILQ_ENTRY(spdk_deleted_file)	tailq;
};

/* One mounted blobfs instance. */
struct spdk_filesystem {
	struct spdk_blob_store	*bs;
	TAILQ_HEAD(, spdk_file)	files;
	struct spdk_bs_opts	bs_opts;
	struct spdk_bs_dev	*bdev;
	/* Dispatches a function to the filesystem's service thread. */
	fs_send_request_fn	send_request;

	/* Channel used by the synchronous API wrappers. */
	struct {
		uint32_t		max_ops;
		struct spdk_io_channel	*sync_io_channel;
		struct spdk_fs_channel	*sync_fs_channel;
	} sync_target;

	/* Channel used for metadata operations (create/load/stat/...). */
	struct {
		uint32_t		max_ops;
		struct spdk_io_channel	*md_io_channel;
		struct spdk_fs_channel	*md_fs_channel;
	} md_target;

	/* io_device for per-thread I/O channels; channels created on demand. */
	struct {
		uint32_t		max_ops;
	} io_target;
};
171 
/* Per-request callback context.  The fn member selects the completion
 * callback by operation type; the op union holds per-operation state, with
 * only one member live at a time.
 */
struct spdk_fs_cb_args {
	union {
		spdk_fs_op_with_handle_complete		fs_op_with_handle;
		spdk_fs_op_complete			fs_op;
		spdk_file_op_with_handle_complete	file_op_with_handle;
		spdk_file_op_complete			file_op;
		spdk_file_stat_op_complete		stat_op;
	} fn;
	void *arg;
	/* Posted by __wake_caller() to unblock a synchronous API wrapper. */
	sem_t *sem;
	struct spdk_filesystem *fs;
	struct spdk_file *file;
	int rc;
	/* Shared error slot for multi-part operations; first error wins
	 * (see __wake_caller).
	 */
	int *rwerrno;
	struct iovec *iovs;
	uint32_t iovcnt;
	/* Inline iovec used when iovcnt <= 1 (see alloc_fs_request_with_iov). */
	struct iovec iov;
	union {
		struct {
			TAILQ_HEAD(, spdk_deleted_file)	deleted_files;
		} fs_load;
		struct {
			uint64_t	length;
		} truncate;
		struct {
			struct spdk_io_channel	*channel;
			void		*pin_buf;
			int		is_read;
			off_t		offset;
			size_t		length;
			uint64_t	start_lba;
			uint64_t	num_lba;
			uint32_t	blocklen;
		} rw;
		struct {
			const char	*old_name;
			const char	*new_name;
		} rename;
		struct {
			struct cache_buffer	*cache_buffer;
			uint64_t		length;
		} flush;
		struct {
			struct cache_buffer	*cache_buffer;
			uint64_t		length;
			uint64_t		offset;
		} readahead;
		struct {
			/* offset of the file when the sync request was made */
			uint64_t			offset;
			TAILQ_ENTRY(spdk_fs_request)	tailq;
			bool				xattr_in_progress;
			/* length written to the xattr for this file - this should
			 * always be the same as the offset if only one thread is
			 * writing to the file, but could differ if multiple threads
			 * are appending
			 */
			uint64_t			length;
		} sync;
		struct {
			uint32_t			num_clusters;
		} resize;
		struct {
			const char	*name;
			uint32_t	flags;
			TAILQ_ENTRY(spdk_fs_request)	tailq;
		} open;
		struct {
			const char		*name;
			struct spdk_blob	*blob;
		} create;
		struct {
			const char	*name;
		} delete;
		struct {
			const char	*name;
		} stat;
	} op;
};
251 
252 static void file_free(struct spdk_file *file);
253 static void fs_io_device_unregister(struct spdk_filesystem *fs);
254 static void fs_free_io_channels(struct spdk_filesystem *fs);
255 
/* Initialize opts to blobfs defaults (1 MiB cluster size). */
void
spdk_fs_opts_init(struct spdk_blobfs_opts *opts)
{
	opts->cluster_sz = SPDK_BLOBFS_DEFAULT_OPTS_CLUSTER_SZ;
}
261 
262 static int _blobfs_cache_pool_reclaim(void *arg);
263 
264 static bool
265 blobfs_cache_pool_need_reclaim(void)
266 {
267 	size_t count;
268 
269 	count = spdk_mempool_count(g_cache_pool);
270 	/* We define a aggressive policy here as the requirements from db_bench are batched, so start the poller
271 	 *  when the number of available cache buffer is less than 1/5 of total buffers.
272 	 */
273 	if (count > (size_t)g_fs_cache_size / CACHE_BUFFER_SIZE / 5) {
274 		return false;
275 	}
276 
277 	return true;
278 }
279 
/* Runs on g_cache_pool_thread: create the shared buffer mempool and start the
 * periodic reclaim poller.  Allocation failure is treated as fatal (assert).
 */
static void
__start_cache_pool_mgmt(void *ctx)
{
	assert(g_cache_pool == NULL);

	g_cache_pool = spdk_mempool_create("spdk_fs_cache",
					   g_fs_cache_size / CACHE_BUFFER_SIZE,
					   CACHE_BUFFER_SIZE,
					   SPDK_MEMPOOL_DEFAULT_CACHE_SIZE,
					   SPDK_ENV_SOCKET_ID_ANY);
	if (!g_cache_pool) {
		SPDK_ERRLOG("Create mempool failed, you may "
			    "increase the memory and try again\n");
		assert(false);
	}

	assert(g_cache_pool_mgmt_poller == NULL);
	g_cache_pool_mgmt_poller = SPDK_POLLER_REGISTER(_blobfs_cache_pool_reclaim, NULL,
				   BLOBFS_CACHE_POOL_POLL_PERIOD_IN_US);
}
300 
/* Runs on g_cache_pool_thread when the last filesystem unmounts: stop the
 * reclaim poller, free the buffer pool and exit the management thread.
 */
static void
__stop_cache_pool_mgmt(void *ctx)
{
	spdk_poller_unregister(&g_cache_pool_mgmt_poller);

	assert(g_cache_pool != NULL);
	/* Every buffer must have been returned before the final unmount. */
	assert(spdk_mempool_count(g_cache_pool) == g_fs_cache_size / CACHE_BUFFER_SIZE);
	spdk_mempool_free(g_cache_pool);
	g_cache_pool = NULL;

	spdk_thread_exit(g_cache_pool_thread);
}
313 
/* Take a reference on the global cache.  The first caller spins up the
 * dedicated cache-management thread; later callers only bump the refcount.
 */
static void
initialize_global_cache(void)
{
	pthread_mutex_lock(&g_cache_init_lock);
	if (g_fs_count == 0) {
		g_cache_pool_thread = spdk_thread_create("cache_pool_mgmt", NULL);
		assert(g_cache_pool_thread != NULL);
		spdk_thread_send_msg(g_cache_pool_thread, __start_cache_pool_mgmt, NULL);
	}
	g_fs_count++;
	pthread_mutex_unlock(&g_cache_init_lock);
}
326 
/* Drop a reference on the global cache; the last caller tears down the
 * mempool and the management thread via __stop_cache_pool_mgmt.
 */
static void
free_global_cache(void)
{
	pthread_mutex_lock(&g_cache_init_lock);
	g_fs_count--;
	if (g_fs_count == 0) {
		spdk_thread_send_msg(g_cache_pool_thread, __stop_cache_pool_mgmt, NULL);
	}
	pthread_mutex_unlock(&g_cache_init_lock);
}
337 
338 static uint64_t
339 __file_get_blob_size(struct spdk_file *file)
340 {
341 	uint64_t cluster_sz;
342 
343 	cluster_sz = file->fs->bs_opts.cluster_sz;
344 	return cluster_sz * spdk_blob_get_num_clusters(file->blob);
345 }
346 
/* One queued operation, carved from the per-channel preallocated pool. */
struct spdk_fs_request {
	struct spdk_fs_cb_args		args;
	/* Free-list linkage on spdk_fs_channel.reqs. */
	TAILQ_ENTRY(spdk_fs_request)	link;
	struct spdk_fs_channel		*channel;
};

/* Per-thread channel: a preallocated request pool plus a blobstore channel. */
struct spdk_fs_channel {
	/* Backing array of max_ops requests (see fs_channel_create). */
	struct spdk_fs_request		*req_mem;
	/* Free requests. */
	TAILQ_HEAD(, spdk_fs_request)	reqs;
	/* Wakes blocked callers of the synchronous API. */
	sem_t				sem;
	struct spdk_filesystem		*fs;
	struct spdk_io_channel		*bs_channel;
	fs_send_request_fn		send_request;
	/* When true, the free list is guarded with the spinlock below. */
	bool				sync;
	uint32_t			outstanding_reqs;
	pthread_spinlock_t		lock;
};

/* For now, this is effectively an alias. But eventually we'll shift
 * some data members over. */
struct spdk_fs_thread_ctx {
	struct spdk_fs_channel	ch;
};
370 
371 static struct spdk_fs_request *
372 alloc_fs_request_with_iov(struct spdk_fs_channel *channel, uint32_t iovcnt)
373 {
374 	struct spdk_fs_request *req;
375 	struct iovec *iovs = NULL;
376 
377 	if (iovcnt > 1) {
378 		iovs = calloc(iovcnt, sizeof(struct iovec));
379 		if (!iovs) {
380 			return NULL;
381 		}
382 	}
383 
384 	if (channel->sync) {
385 		pthread_spin_lock(&channel->lock);
386 	}
387 
388 	req = TAILQ_FIRST(&channel->reqs);
389 	if (req) {
390 		channel->outstanding_reqs++;
391 		TAILQ_REMOVE(&channel->reqs, req, link);
392 	}
393 
394 	if (channel->sync) {
395 		pthread_spin_unlock(&channel->lock);
396 	}
397 
398 	if (req == NULL) {
399 		SPDK_ERRLOG("Cannot allocate req on spdk_fs_channel =%p\n", channel);
400 		free(iovs);
401 		return NULL;
402 	}
403 	memset(req, 0, sizeof(*req));
404 	req->channel = channel;
405 	if (iovcnt > 1) {
406 		req->args.iovs = iovs;
407 	} else {
408 		req->args.iovs = &req->args.iov;
409 	}
410 	req->args.iovcnt = iovcnt;
411 
412 	return req;
413 }
414 
/* Allocate a request with no separately allocated iovec array. */
static struct spdk_fs_request *
alloc_fs_request(struct spdk_fs_channel *channel)
{
	return alloc_fs_request_with_iov(channel, 0);
}
420 
421 static void
422 free_fs_request(struct spdk_fs_request *req)
423 {
424 	struct spdk_fs_channel *channel = req->channel;
425 
426 	if (req->args.iovcnt > 1) {
427 		free(req->args.iovs);
428 	}
429 
430 	if (channel->sync) {
431 		pthread_spin_lock(&channel->lock);
432 	}
433 
434 	TAILQ_INSERT_HEAD(&req->channel->reqs, req, link);
435 	channel->outstanding_reqs--;
436 
437 	if (channel->sync) {
438 		pthread_spin_unlock(&channel->lock);
439 	}
440 }
441 
442 static int
443 fs_channel_create(struct spdk_filesystem *fs, struct spdk_fs_channel *channel,
444 		  uint32_t max_ops)
445 {
446 	uint32_t i;
447 
448 	channel->req_mem = calloc(max_ops, sizeof(struct spdk_fs_request));
449 	if (!channel->req_mem) {
450 		return -1;
451 	}
452 
453 	channel->outstanding_reqs = 0;
454 	TAILQ_INIT(&channel->reqs);
455 	sem_init(&channel->sem, 0, 0);
456 
457 	for (i = 0; i < max_ops; i++) {
458 		TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link);
459 	}
460 
461 	channel->fs = fs;
462 
463 	return 0;
464 }
465 
/* io_device create callback for the metadata target.  io_device is the
 * address of fs->md_target, not of fs itself.
 */
static int
fs_md_channel_create(void *io_device, void *ctx_buf)
{
	struct spdk_filesystem		*fs;
	struct spdk_fs_channel		*channel = ctx_buf;

	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, md_target);

	return fs_channel_create(fs, channel, fs->md_target.max_ops);
}
476 
/* io_device create callback for the sync target (address of fs->sync_target). */
static int
fs_sync_channel_create(void *io_device, void *ctx_buf)
{
	struct spdk_filesystem		*fs;
	struct spdk_fs_channel		*channel = ctx_buf;

	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, sync_target);

	return fs_channel_create(fs, channel, fs->sync_target.max_ops);
}
487 
/* io_device create callback for per-thread I/O channels (address of fs->io_target). */
static int
fs_io_channel_create(void *io_device, void *ctx_buf)
{
	struct spdk_filesystem		*fs;
	struct spdk_fs_channel		*channel = ctx_buf;

	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, io_target);

	return fs_channel_create(fs, channel, fs->io_target.max_ops);
}
498 
/* io_device destroy callback shared by all three targets: release the
 * request pool and the blobstore channel (if one was allocated).
 */
static void
fs_channel_destroy(void *io_device, void *ctx_buf)
{
	struct spdk_fs_channel *channel = ctx_buf;

	/* Outstanding requests point into req_mem; destroying now indicates a
	 * caller bug, so log loudly before freeing anyway.
	 */
	if (channel->outstanding_reqs > 0) {
		SPDK_ERRLOG("channel freed with %" PRIu32 " outstanding requests!\n",
			    channel->outstanding_reqs);
	}

	free(channel->req_mem);
	if (channel->bs_channel != NULL) {
		spdk_bs_free_io_channel(channel->bs_channel);
	}
}
514 
/* send_request implementation that simply invokes the function inline on the
 * calling thread (used for the md and sync channels).
 */
static void
__send_request_direct(fs_request_fn fn, void *arg)
{
	fn(arg);
}
520 
/* Wire a freshly created or loaded blobstore into the filesystem: record the
 * cluster size, allocate blobstore channels for the md and sync targets, and
 * take a reference on the global buffer cache.
 */
static void
common_fs_bs_init(struct spdk_filesystem *fs, struct spdk_blob_store *bs)
{
	fs->bs = bs;
	fs->bs_opts.cluster_sz = spdk_bs_get_cluster_size(bs);
	fs->md_target.md_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
	fs->md_target.md_fs_channel->send_request = __send_request_direct;
	fs->sync_target.sync_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
	fs->sync_target.sync_fs_channel->send_request = __send_request_direct;

	initialize_global_cache();
}
533 
/* Completion of spdk_bs_init() during spdk_fs_init(): hand the filesystem
 * (or NULL on error) to the user callback.
 */
static void
init_cb(void *ctx, struct spdk_blob_store *bs, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_filesystem *fs = args->fs;

	if (bserrno == 0) {
		common_fs_bs_init(fs, bs);
	} else {
		/* NOTE(review): only the fs struct is freed here; the io_devices
		 * and channels set up by fs_alloc() appear to be leaked on this
		 * path — confirm against fs_io_device_unregister usage elsewhere.
		 */
		free(fs);
		fs = NULL;
	}

	args->fn.fs_op_with_handle(args->arg, fs, bserrno);
	free_fs_request(req);
}
551 
/* Allocate a filesystem object and register its three io_devices (md, sync,
 * io).  md and sync channels are created immediately; io channels are
 * created on demand by callers.  Returns NULL on allocation failure.
 */
static struct spdk_filesystem *
fs_alloc(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn)
{
	struct spdk_filesystem *fs;

	fs = calloc(1, sizeof(*fs));
	if (fs == NULL) {
		return NULL;
	}

	fs->bdev = dev;
	fs->send_request = send_request_fn;
	TAILQ_INIT(&fs->files);

	fs->md_target.max_ops = 512;
	spdk_io_device_register(&fs->md_target, fs_md_channel_create, fs_channel_destroy,
				sizeof(struct spdk_fs_channel), "blobfs_md");
	fs->md_target.md_io_channel = spdk_get_io_channel(&fs->md_target);
	fs->md_target.md_fs_channel = spdk_io_channel_get_ctx(fs->md_target.md_io_channel);

	fs->sync_target.max_ops = 512;
	spdk_io_device_register(&fs->sync_target, fs_sync_channel_create, fs_channel_destroy,
				sizeof(struct spdk_fs_channel), "blobfs_sync");
	fs->sync_target.sync_io_channel = spdk_get_io_channel(&fs->sync_target);
	fs->sync_target.sync_fs_channel = spdk_io_channel_get_ctx(fs->sync_target.sync_io_channel);

	fs->io_target.max_ops = 512;
	spdk_io_device_register(&fs->io_target, fs_io_channel_create, fs_channel_destroy,
				sizeof(struct spdk_fs_channel), "blobfs_io");

	return fs;
}
584 
/* Complete a synchronous operation: record fserrno in args->rc (and in the
 * shared rwerrno slot if it is still clear — first error wins), then post
 * the semaphore the caller is blocked on.
 */
static void
__wake_caller(void *arg, int fserrno)
{
	struct spdk_fs_cb_args *args = arg;

	if ((args->rwerrno != NULL) && (*(args->rwerrno) == 0) && fserrno) {
		*(args->rwerrno) = fserrno;
	}
	args->rc = fserrno;
	sem_post(args->sem);
}
596 
/* Initialize a brand-new blobfs on dev.  The blobstore is stamped with the
 * "BLOBFS" bstype; opt, if given, overrides the cluster size.  cb_fn receives
 * the new filesystem, or NULL with -ENOMEM (or the blobstore's error code).
 */
void
spdk_fs_init(struct spdk_bs_dev *dev, struct spdk_blobfs_opts *opt,
	     fs_send_request_fn send_request_fn,
	     spdk_fs_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_filesystem *fs;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;
	struct spdk_bs_opts opts = {};

	fs = fs_alloc(dev, send_request_fn);
	if (fs == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	req = alloc_fs_request(fs->md_target.md_fs_channel);
	if (req == NULL) {
		/* Unwind everything fs_alloc() set up. */
		fs_free_io_channels(fs);
		fs_io_device_unregister(fs);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	args = &req->args;
	args->fn.fs_op_with_handle = cb_fn;
	args->arg = cb_arg;
	args->fs = fs;

	spdk_bs_opts_init(&opts, sizeof(opts));
	snprintf(opts.bstype.bstype, sizeof(opts.bstype.bstype), SPDK_BLOBFS_SIGNATURE);
	if (opt) {
		opts.cluster_sz = opt->cluster_sz;
	}
	spdk_bs_init(dev, &opts, init_cb, req);
}
633 
634 static struct spdk_file *
635 file_alloc(struct spdk_filesystem *fs)
636 {
637 	struct spdk_file *file;
638 
639 	file = calloc(1, sizeof(*file));
640 	if (file == NULL) {
641 		return NULL;
642 	}
643 
644 	file->tree = calloc(1, sizeof(*file->tree));
645 	if (file->tree == NULL) {
646 		free(file);
647 		return NULL;
648 	}
649 
650 	if (pthread_spin_init(&file->lock, 0)) {
651 		free(file->tree);
652 		free(file);
653 		return NULL;
654 	}
655 
656 	file->fs = fs;
657 	TAILQ_INIT(&file->open_requests);
658 	TAILQ_INIT(&file->sync_requests);
659 	TAILQ_INSERT_TAIL(&fs->files, file, tailq);
660 	file->priority = SPDK_FILE_PRIORITY_LOW;
661 	return file;
662 }
663 
664 static void fs_load_done(void *ctx, int bserrno);
665 
666 static int
667 _handle_deleted_files(struct spdk_fs_request *req)
668 {
669 	struct spdk_fs_cb_args *args = &req->args;
670 	struct spdk_filesystem *fs = args->fs;
671 
672 	if (!TAILQ_EMPTY(&args->op.fs_load.deleted_files)) {
673 		struct spdk_deleted_file *deleted_file;
674 
675 		deleted_file = TAILQ_FIRST(&args->op.fs_load.deleted_files);
676 		TAILQ_REMOVE(&args->op.fs_load.deleted_files, deleted_file, tailq);
677 		spdk_bs_delete_blob(fs->bs, deleted_file->id, fs_load_done, req);
678 		free(deleted_file);
679 		return 0;
680 	}
681 
682 	return 1;
683 }
684 
/* Called after the blobstore load and again after each deferred blob
 * deletion: completes the user's load callback once no deleted files remain.
 */
static void
fs_load_done(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_filesystem *fs = args->fs;

	/* The filesystem has been loaded.  Now check if there are any files that
	 *  were marked for deletion before last unload.  Do not complete the
	 *  fs_load callback until all of them have been deleted on disk.
	 */
	if (_handle_deleted_files(req) == 0) {
		/* We found a file that's been marked for deleting but not actually
		 *  deleted yet.  This function will get called again once the delete
		 *  operation is completed.
		 */
		return;
	}

	args->fn.fs_op_with_handle(args->arg, fs, 0);
	free_fs_request(req);

}
708 
709 static void
710 _file_build_trace_arg_name(struct spdk_file *f)
711 {
712 	f->trace_arg_name = 0;
713 	memcpy(&f->trace_arg_name, f->name,
714 	       spdk_min(sizeof(f->trace_arg_name), strlen(f->name)));
715 }
716 
717 static void
718 iter_cb(void *ctx, struct spdk_blob *blob, int rc)
719 {
720 	struct spdk_fs_request *req = ctx;
721 	struct spdk_fs_cb_args *args = &req->args;
722 	struct spdk_filesystem *fs = args->fs;
723 	uint64_t *length;
724 	const char *name;
725 	uint32_t *is_deleted;
726 	size_t value_len;
727 
728 	if (rc < 0) {
729 		args->fn.fs_op_with_handle(args->arg, fs, rc);
730 		free_fs_request(req);
731 		return;
732 	}
733 
734 	rc = spdk_blob_get_xattr_value(blob, "name", (const void **)&name, &value_len);
735 	if (rc < 0) {
736 		args->fn.fs_op_with_handle(args->arg, fs, rc);
737 		free_fs_request(req);
738 		return;
739 	}
740 
741 	rc = spdk_blob_get_xattr_value(blob, "length", (const void **)&length, &value_len);
742 	if (rc < 0) {
743 		args->fn.fs_op_with_handle(args->arg, fs, rc);
744 		free_fs_request(req);
745 		return;
746 	}
747 
748 	assert(value_len == 8);
749 
750 	/* This file could be deleted last time without close it, then app crashed, so we delete it now */
751 	rc = spdk_blob_get_xattr_value(blob, "is_deleted", (const void **)&is_deleted, &value_len);
752 	if (rc < 0) {
753 		struct spdk_file *f;
754 
755 		f = file_alloc(fs);
756 		if (f == NULL) {
757 			SPDK_ERRLOG("Cannot allocate file to handle deleted file on disk\n");
758 			args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM);
759 			free_fs_request(req);
760 			return;
761 		}
762 
763 		f->name = strdup(name);
764 		_file_build_trace_arg_name(f);
765 		f->blobid = spdk_blob_get_id(blob);
766 		f->length = *length;
767 		f->length_flushed = *length;
768 		f->length_xattr = *length;
769 		f->append_pos = *length;
770 		SPDK_DEBUGLOG(blobfs, "added file %s length=%ju\n", f->name, f->length);
771 	} else {
772 		struct spdk_deleted_file *deleted_file;
773 
774 		deleted_file = calloc(1, sizeof(*deleted_file));
775 		if (deleted_file == NULL) {
776 			args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM);
777 			free_fs_request(req);
778 			return;
779 		}
780 		deleted_file->id = spdk_blob_get_id(blob);
781 		TAILQ_INSERT_TAIL(&args->op.fs_load.deleted_files, deleted_file, tailq);
782 	}
783 }
784 
/* Completion of spdk_bs_load(): verify the blobstore carries the "BLOBFS"
 * bstype signature (claiming an all-zero bstype for blobfs), then finish the
 * load via fs_load_done().
 */
static void
load_cb(void *ctx, struct spdk_blob_store *bs, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_filesystem *fs = args->fs;
	struct spdk_bs_type bstype;
	static const struct spdk_bs_type blobfs_type = {SPDK_BLOBFS_SIGNATURE};
	static const struct spdk_bs_type zeros;

	if (bserrno != 0) {
		args->fn.fs_op_with_handle(args->arg, NULL, bserrno);
		free_fs_request(req);
		fs_free_io_channels(fs);
		fs_io_device_unregister(fs);
		return;
	}

	bstype = spdk_bs_get_bstype(bs);

	if (!memcmp(&bstype, &zeros, sizeof(bstype))) {
		/* Unstamped blobstore: claim it for blobfs. */
		SPDK_DEBUGLOG(blobfs, "assigning bstype\n");
		spdk_bs_set_bstype(bs, blobfs_type);
	} else if (memcmp(&bstype, &blobfs_type, sizeof(bstype))) {
		SPDK_ERRLOG("not blobfs\n");
		SPDK_LOGDUMP(blobfs, "bstype", &bstype, sizeof(bstype));
		args->fn.fs_op_with_handle(args->arg, NULL, -EINVAL);
		free_fs_request(req);
		fs_free_io_channels(fs);
		fs_io_device_unregister(fs);
		return;
	}

	common_fs_bs_init(fs, bs);
	fs_load_done(req, 0);
}
821 
/* Unregister the filesystem's three io_devices and free the fs struct. */
static void
fs_io_device_unregister(struct spdk_filesystem *fs)
{
	assert(fs != NULL);
	spdk_io_device_unregister(&fs->md_target, NULL);
	spdk_io_device_unregister(&fs->sync_target, NULL);
	spdk_io_device_unregister(&fs->io_target, NULL);
	free(fs);
}
831 
/* Drop the md and sync io_channel references taken in fs_alloc(). */
static void
fs_free_io_channels(struct spdk_filesystem *fs)
{
	assert(fs != NULL);
	spdk_fs_free_io_channel(fs->md_target.md_io_channel);
	spdk_fs_free_io_channel(fs->sync_target.sync_io_channel);
}
839 
/* Load an existing blobfs from dev.  Every blob is visited via iter_cb to
 * rebuild the file list; blobs marked is_deleted are queued and deleted
 * before cb_fn is invoked with the filesystem (or NULL on error).
 */
void
spdk_fs_load(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn,
	     spdk_fs_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_filesystem *fs;
	struct spdk_fs_cb_args *args;
	struct spdk_fs_request *req;
	struct spdk_bs_opts	bs_opts;

	fs = fs_alloc(dev, send_request_fn);
	if (fs == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	req = alloc_fs_request(fs->md_target.md_fs_channel);
	if (req == NULL) {
		/* Unwind everything fs_alloc() set up. */
		fs_free_io_channels(fs);
		fs_io_device_unregister(fs);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	args = &req->args;
	args->fn.fs_op_with_handle = cb_fn;
	args->arg = cb_arg;
	args->fs = fs;
	TAILQ_INIT(&args->op.fs_load.deleted_files);
	spdk_bs_opts_init(&bs_opts, sizeof(bs_opts));
	bs_opts.iter_cb_fn = iter_cb;
	bs_opts.iter_cb_arg = req;
	spdk_bs_load(dev, &bs_opts, load_cb, req);
}
873 
/* Completion of spdk_bs_unload(): free all in-memory file objects, drop the
 * global cache reference and tear down the io_devices.  req was allocated
 * with calloc() in spdk_fs_unload() (not from a channel pool), so plain
 * free() is correct here.
 */
static void
unload_cb(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_filesystem *fs = args->fs;
	struct spdk_file *file, *tmp;

	TAILQ_FOREACH_SAFE(file, &fs->files, tailq, tmp) {
		TAILQ_REMOVE(&fs->files, file, tailq);
		file_free(file);
	}

	free_global_cache();

	args->fn.fs_op(args->arg, bserrno);
	free(req);

	fs_io_device_unregister(fs);
}
894 
/* Unload the filesystem: release the io channels, unload the blobstore and
 * finish teardown in unload_cb.  cb_fn receives 0 or a negative errno.
 */
void
spdk_fs_unload(struct spdk_filesystem *fs, spdk_fs_op_complete cb_fn, void *cb_arg)
{
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	/*
	 * We must free the md_channel before unloading the blobstore, so just
	 *  allocate this request from the general heap.
	 */
	req = calloc(1, sizeof(*req));
	if (req == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	args = &req->args;
	args->fn.fs_op = cb_fn;
	args->arg = cb_arg;
	args->fs = fs;

	fs_free_io_channels(fs);
	spdk_bs_unload(fs->bs, unload_cb, req);
}
919 
920 static struct spdk_file *
921 fs_find_file(struct spdk_filesystem *fs, const char *name)
922 {
923 	struct spdk_file *file;
924 
925 	TAILQ_FOREACH(file, &fs->files, tailq) {
926 		if (!strncmp(name, file->name, SPDK_FILE_NAME_MAX)) {
927 			return file;
928 		}
929 	}
930 
931 	return NULL;
932 }
933 
934 void
935 spdk_fs_file_stat_async(struct spdk_filesystem *fs, const char *name,
936 			spdk_file_stat_op_complete cb_fn, void *cb_arg)
937 {
938 	struct spdk_file_stat stat;
939 	struct spdk_file *f = NULL;
940 
941 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
942 		cb_fn(cb_arg, NULL, -ENAMETOOLONG);
943 		return;
944 	}
945 
946 	f = fs_find_file(fs, name);
947 	if (f != NULL) {
948 		stat.blobid = f->blobid;
949 		stat.size = f->append_pos >= f->length ? f->append_pos : f->length;
950 		cb_fn(cb_arg, &stat, 0);
951 		return;
952 	}
953 
954 	cb_fn(cb_arg, NULL, -ENOENT);
955 }
956 
/* stat_op callback for the synchronous wrapper: copy the result into the
 * caller's buffer (args->arg) and wake the blocked caller.
 */
static void
__copy_stat(void *arg, struct spdk_file_stat *stat, int fserrno)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	args->rc = fserrno;
	if (fserrno == 0) {
		memcpy(args->arg, stat, sizeof(*stat));
	}
	sem_post(args->sem);
}
969 
/* Runs on the filesystem's dispatch thread on behalf of spdk_fs_file_stat(). */
static void
__file_stat(void *arg)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	spdk_fs_file_stat_async(args->fs, args->op.stat.name,
				args->fn.stat_op, req);
}
979 
/* Synchronous stat: dispatch __file_stat to the filesystem's service thread
 * and block on the channel semaphore until __copy_stat posts it.
 * Returns 0 on success or a negative errno (-ENOMEM, -ENOENT, ...).
 */
int
spdk_fs_file_stat(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx,
		  const char *name, struct spdk_file_stat *stat)
{
	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
	struct spdk_fs_request *req;
	int rc;

	req = alloc_fs_request(channel);
	if (req == NULL) {
		SPDK_ERRLOG("Cannot allocate stat req on file=%s\n", name);
		return -ENOMEM;
	}

	req->args.fs = fs;
	req->args.op.stat.name = name;
	req->args.fn.stat_op = __copy_stat;
	req->args.arg = stat;
	req->args.sem = &channel->sem;
	channel->send_request(__file_stat, req);
	sem_wait(&channel->sem);

	rc = req->args.rc;
	free_fs_request(req);

	return rc;
}
1007 
/* Final step of file creation: the blob has been closed.  Report the first
 * error seen — a resize error stashed in args->rc takes precedence over the
 * close error.
 */
static void
fs_create_blob_close_cb(void *ctx, int bserrno)
{
	int rc;
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;

	rc = args->rc ? args->rc : bserrno;
	args->fn.file_op(args->arg, rc);
	free_fs_request(req);
}
1019 
1020 static void
1021 fs_create_blob_resize_cb(void *ctx, int bserrno)
1022 {
1023 	struct spdk_fs_request *req = ctx;
1024 	struct spdk_fs_cb_args *args = &req->args;
1025 	struct spdk_file *f = args->file;
1026 	struct spdk_blob *blob = args->op.create.blob;
1027 	uint64_t length = 0;
1028 
1029 	args->rc = bserrno;
1030 	if (bserrno) {
1031 		spdk_blob_close(blob, fs_create_blob_close_cb, args);
1032 		return;
1033 	}
1034 
1035 	spdk_blob_set_xattr(blob, "name", f->name, strlen(f->name) + 1);
1036 	spdk_blob_set_xattr(blob, "length", &length, sizeof(length));
1037 
1038 	spdk_blob_close(blob, fs_create_blob_close_cb, args);
1039 }
1040 
/* Blob opened during file creation: stash it in the request and resize it to
 * one cluster before setting the initial xattrs.
 */
static void
fs_create_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;

	if (bserrno) {
		args->fn.file_op(args->arg, bserrno);
		free_fs_request(req);
		return;
	}

	args->op.create.blob = blob;
	spdk_blob_resize(blob, 1, fs_create_blob_resize_cb, req);
}
1056 
/* Blob created during file creation: record its id on the file and open it
 * so the initial xattrs can be written.
 */
static void
fs_create_blob_create_cb(void *ctx, spdk_blob_id blobid, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *f = args->file;

	if (bserrno) {
		args->fn.file_op(args->arg, bserrno);
		free_fs_request(req);
		return;
	}

	f->blobid = blobid;
	spdk_bs_open_blob(f->fs->bs, blobid, fs_create_blob_open_cb, req);
}
1073 
1074 void
1075 spdk_fs_create_file_async(struct spdk_filesystem *fs, const char *name,
1076 			  spdk_file_op_complete cb_fn, void *cb_arg)
1077 {
1078 	struct spdk_file *file;
1079 	struct spdk_fs_request *req;
1080 	struct spdk_fs_cb_args *args;
1081 
1082 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
1083 		cb_fn(cb_arg, -ENAMETOOLONG);
1084 		return;
1085 	}
1086 
1087 	file = fs_find_file(fs, name);
1088 	if (file != NULL) {
1089 		cb_fn(cb_arg, -EEXIST);
1090 		return;
1091 	}
1092 
1093 	file = file_alloc(fs);
1094 	if (file == NULL) {
1095 		SPDK_ERRLOG("Cannot allocate new file for creation\n");
1096 		cb_fn(cb_arg, -ENOMEM);
1097 		return;
1098 	}
1099 
1100 	req = alloc_fs_request(fs->md_target.md_fs_channel);
1101 	if (req == NULL) {
1102 		SPDK_ERRLOG("Cannot allocate create async req for file=%s\n", name);
1103 		cb_fn(cb_arg, -ENOMEM);
1104 		return;
1105 	}
1106 
1107 	args = &req->args;
1108 	args->file = file;
1109 	args->fn.file_op = cb_fn;
1110 	args->arg = cb_arg;
1111 
1112 	file->name = strdup(name);
1113 	_file_build_trace_arg_name(file);
1114 	spdk_bs_create_blob(fs->bs, fs_create_blob_create_cb, args);
1115 }
1116 
1117 static void
1118 __fs_create_file_done(void *arg, int fserrno)
1119 {
1120 	struct spdk_fs_request *req = arg;
1121 	struct spdk_fs_cb_args *args = &req->args;
1122 
1123 	__wake_caller(args, fserrno);
1124 	SPDK_DEBUGLOG(blobfs, "file=%s\n", args->op.create.name);
1125 }
1126 
1127 static void
1128 __fs_create_file(void *arg)
1129 {
1130 	struct spdk_fs_request *req = arg;
1131 	struct spdk_fs_cb_args *args = &req->args;
1132 
1133 	SPDK_DEBUGLOG(blobfs, "file=%s\n", args->op.create.name);
1134 	spdk_fs_create_file_async(args->fs, args->op.create.name, __fs_create_file_done, req);
1135 }
1136 
1137 int
1138 spdk_fs_create_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx, const char *name)
1139 {
1140 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
1141 	struct spdk_fs_request *req;
1142 	struct spdk_fs_cb_args *args;
1143 	int rc;
1144 
1145 	SPDK_DEBUGLOG(blobfs, "file=%s\n", name);
1146 
1147 	req = alloc_fs_request(channel);
1148 	if (req == NULL) {
1149 		SPDK_ERRLOG("Cannot allocate req to create file=%s\n", name);
1150 		return -ENOMEM;
1151 	}
1152 
1153 	args = &req->args;
1154 	args->fs = fs;
1155 	args->op.create.name = name;
1156 	args->sem = &channel->sem;
1157 	fs->send_request(__fs_create_file, req);
1158 	sem_wait(&channel->sem);
1159 	rc = args->rc;
1160 	free_fs_request(req);
1161 
1162 	return rc;
1163 }
1164 
/* Completion of spdk_bs_open_blob() for a file.
 *
 * Stores the blob handle on the file, then drains every request queued on
 * f->open_requests while the open was in flight, completing each with the
 * same bserrno.  Note that req/args are rebound inside the loop: the ctx
 * request itself is completed as part of the drain, not separately.
 * NOTE(review): on open failure, f->blob is still assigned (the failed
 * handle) and waiters complete with bserrno — confirm callers treat a
 * nonzero bserrno as "no usable handle".
 */
static void
fs_open_blob_done(void *ctx, struct spdk_blob *blob, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *f = args->file;

	f->blob = blob;
	while (!TAILQ_EMPTY(&f->open_requests)) {
		req = TAILQ_FIRST(&f->open_requests);
		args = &req->args;
		TAILQ_REMOVE(&f->open_requests, req, args.op.open.tailq);
		spdk_trace_record(TRACE_BLOBFS_OPEN, 0, 0, 0, f->trace_arg_name);
		args->fn.file_op_with_handle(args->arg, f, bserrno);
		free_fs_request(req);
	}
}
1182 
/* Open-path step run either directly (file already exists) or as the
 * completion of an implicit create (SPDK_BLOBFS_OPEN_CREATE).
 *
 * Accounts one open reference and queues the request; the first opener
 * starts the blob open, later openers either complete immediately (blob
 * already open) or wait for the in-flight open to drain the queue.
 * NOTE(review): bserrno is ignored here — a failed implicit create would
 * leave file == NULL and trip the assert below; confirm the create path
 * only invokes this callback on success.
 */
static void
fs_open_blob_create_cb(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *file = args->file;
	struct spdk_filesystem *fs = args->fs;

	if (file == NULL) {
		/*
		 * This is from an open with CREATE flag - the file
		 *  is now created so look it up in the file list for this
		 *  filesystem.
		 */
		file = fs_find_file(fs, args->op.open.name);
		assert(file != NULL);
		args->file = file;
	}

	file->ref_count++;
	/* Queue this request; fs_open_blob_done() completes everything queued. */
	TAILQ_INSERT_TAIL(&file->open_requests, req, args.op.open.tailq);
	if (file->ref_count == 1) {
		/* First opener: start the actual blob open. */
		assert(file->blob == NULL);
		spdk_bs_open_blob(fs->bs, file->blobid, fs_open_blob_done, req);
	} else if (file->blob != NULL) {
		/* Blob already open: complete this request right away. */
		fs_open_blob_done(req, file->blob, 0);
	} else {
		/*
		 * The blob open for this file is in progress due to a previous
		 *  open request.  When that open completes, it will invoke the
		 *  open callback for this request.
		 */
	}
}
1217 
/* Asynchronously open a file by name, creating it first when
 * SPDK_BLOBFS_OPEN_CREATE is set in flags.
 *
 * cb_fn receives the file handle on success, or NULL and a negative errno:
 * -ENAMETOOLONG (name too long), -ENOENT (missing without CREATE, or the
 * file is pending deferred deletion), -ENOMEM (request allocation failed).
 */
void
spdk_fs_open_file_async(struct spdk_filesystem *fs, const char *name, uint32_t flags,
			spdk_file_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_file *f = NULL;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
		cb_fn(cb_arg, NULL, -ENAMETOOLONG);
		return;
	}

	f = fs_find_file(fs, name);
	if (f == NULL && !(flags & SPDK_BLOBFS_OPEN_CREATE)) {
		cb_fn(cb_arg, NULL, -ENOENT);
		return;
	}

	/* A file awaiting deferred deletion cannot be reopened. */
	if (f != NULL && f->is_deleted == true) {
		cb_fn(cb_arg, NULL, -ENOENT);
		return;
	}

	req = alloc_fs_request(fs->md_target.md_fs_channel);
	if (req == NULL) {
		SPDK_ERRLOG("Cannot allocate async open req for file=%s\n", name);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	args = &req->args;
	args->fn.file_op_with_handle = cb_fn;
	args->arg = cb_arg;
	args->file = f;
	args->fs = fs;
	args->op.open.name = name;

	/* Missing file with CREATE flag: create first, then continue the open
	 * from the create completion; otherwise go straight to the open step. */
	if (f == NULL) {
		spdk_fs_create_file_async(fs, name, fs_open_blob_create_cb, req);
	} else {
		fs_open_blob_create_cb(req, 0);
	}
}
1262 
1263 static void
1264 __fs_open_file_done(void *arg, struct spdk_file *file, int bserrno)
1265 {
1266 	struct spdk_fs_request *req = arg;
1267 	struct spdk_fs_cb_args *args = &req->args;
1268 
1269 	args->file = file;
1270 	__wake_caller(args, bserrno);
1271 	SPDK_DEBUGLOG(blobfs, "file=%s\n", args->op.open.name);
1272 }
1273 
1274 static void
1275 __fs_open_file(void *arg)
1276 {
1277 	struct spdk_fs_request *req = arg;
1278 	struct spdk_fs_cb_args *args = &req->args;
1279 
1280 	SPDK_DEBUGLOG(blobfs, "file=%s\n", args->op.open.name);
1281 	spdk_fs_open_file_async(args->fs, args->op.open.name, args->op.open.flags,
1282 				__fs_open_file_done, req);
1283 }
1284 
1285 int
1286 spdk_fs_open_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx,
1287 		  const char *name, uint32_t flags, struct spdk_file **file)
1288 {
1289 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
1290 	struct spdk_fs_request *req;
1291 	struct spdk_fs_cb_args *args;
1292 	int rc;
1293 
1294 	SPDK_DEBUGLOG(blobfs, "file=%s\n", name);
1295 
1296 	req = alloc_fs_request(channel);
1297 	if (req == NULL) {
1298 		SPDK_ERRLOG("Cannot allocate req for opening file=%s\n", name);
1299 		return -ENOMEM;
1300 	}
1301 
1302 	args = &req->args;
1303 	args->fs = fs;
1304 	args->op.open.name = name;
1305 	args->op.open.flags = flags;
1306 	args->sem = &channel->sem;
1307 	fs->send_request(__fs_open_file, req);
1308 	sem_wait(&channel->sem);
1309 	rc = args->rc;
1310 	if (rc == 0) {
1311 		*file = args->file;
1312 	} else {
1313 		*file = NULL;
1314 	}
1315 	free_fs_request(req);
1316 
1317 	return rc;
1318 }
1319 
1320 static void
1321 fs_rename_blob_close_cb(void *ctx, int bserrno)
1322 {
1323 	struct spdk_fs_request *req = ctx;
1324 	struct spdk_fs_cb_args *args = &req->args;
1325 
1326 	args->fn.fs_op(args->arg, bserrno);
1327 	free_fs_request(req);
1328 }
1329 
1330 static void
1331 fs_rename_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno)
1332 {
1333 	struct spdk_fs_request *req = ctx;
1334 	struct spdk_fs_cb_args *args = &req->args;
1335 	const char *new_name = args->op.rename.new_name;
1336 
1337 	spdk_blob_set_xattr(blob, "name", new_name, strlen(new_name) + 1);
1338 	spdk_blob_close(blob, fs_rename_blob_close_cb, req);
1339 }
1340 
1341 static void
1342 _fs_md_rename_file(struct spdk_fs_request *req)
1343 {
1344 	struct spdk_fs_cb_args *args = &req->args;
1345 	struct spdk_file *f;
1346 
1347 	f = fs_find_file(args->fs, args->op.rename.old_name);
1348 	if (f == NULL) {
1349 		args->fn.fs_op(args->arg, -ENOENT);
1350 		free_fs_request(req);
1351 		return;
1352 	}
1353 
1354 	free(f->name);
1355 	f->name = strdup(args->op.rename.new_name);
1356 	_file_build_trace_arg_name(f);
1357 	args->file = f;
1358 	spdk_bs_open_blob(args->fs->bs, f->blobid, fs_rename_blob_open_cb, req);
1359 }
1360 
/* Completion for deleting the destination file that a rename overwrites.
 * NOTE(review): fserrno is ignored, so the rename proceeds even if the
 * delete of the old destination failed — confirm this best-effort behavior
 * is intended.
 */
static void
fs_rename_delete_done(void *arg, int fserrno)
{
	_fs_md_rename_file(arg);
}
1366 
1367 void
1368 spdk_fs_rename_file_async(struct spdk_filesystem *fs,
1369 			  const char *old_name, const char *new_name,
1370 			  spdk_file_op_complete cb_fn, void *cb_arg)
1371 {
1372 	struct spdk_file *f;
1373 	struct spdk_fs_request *req;
1374 	struct spdk_fs_cb_args *args;
1375 
1376 	SPDK_DEBUGLOG(blobfs, "old=%s new=%s\n", old_name, new_name);
1377 	if (strnlen(new_name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
1378 		cb_fn(cb_arg, -ENAMETOOLONG);
1379 		return;
1380 	}
1381 
1382 	req = alloc_fs_request(fs->md_target.md_fs_channel);
1383 	if (req == NULL) {
1384 		SPDK_ERRLOG("Cannot allocate rename async req for renaming file from %s to %s\n", old_name,
1385 			    new_name);
1386 		cb_fn(cb_arg, -ENOMEM);
1387 		return;
1388 	}
1389 
1390 	args = &req->args;
1391 	args->fn.fs_op = cb_fn;
1392 	args->fs = fs;
1393 	args->arg = cb_arg;
1394 	args->op.rename.old_name = old_name;
1395 	args->op.rename.new_name = new_name;
1396 
1397 	f = fs_find_file(fs, new_name);
1398 	if (f == NULL) {
1399 		_fs_md_rename_file(req);
1400 		return;
1401 	}
1402 
1403 	/*
1404 	 * The rename overwrites an existing file.  So delete the existing file, then
1405 	 *  do the actual rename.
1406 	 */
1407 	spdk_fs_delete_file_async(fs, new_name, fs_rename_delete_done, req);
1408 }
1409 
1410 static void
1411 __fs_rename_file_done(void *arg, int fserrno)
1412 {
1413 	struct spdk_fs_request *req = arg;
1414 	struct spdk_fs_cb_args *args = &req->args;
1415 
1416 	__wake_caller(args, fserrno);
1417 }
1418 
1419 static void
1420 __fs_rename_file(void *arg)
1421 {
1422 	struct spdk_fs_request *req = arg;
1423 	struct spdk_fs_cb_args *args = &req->args;
1424 
1425 	spdk_fs_rename_file_async(args->fs, args->op.rename.old_name, args->op.rename.new_name,
1426 				  __fs_rename_file_done, req);
1427 }
1428 
1429 int
1430 spdk_fs_rename_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx,
1431 		    const char *old_name, const char *new_name)
1432 {
1433 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
1434 	struct spdk_fs_request *req;
1435 	struct spdk_fs_cb_args *args;
1436 	int rc;
1437 
1438 	req = alloc_fs_request(channel);
1439 	if (req == NULL) {
1440 		SPDK_ERRLOG("Cannot allocate rename req for file=%s\n", old_name);
1441 		return -ENOMEM;
1442 	}
1443 
1444 	args = &req->args;
1445 
1446 	args->fs = fs;
1447 	args->op.rename.old_name = old_name;
1448 	args->op.rename.new_name = new_name;
1449 	args->sem = &channel->sem;
1450 	fs->send_request(__fs_rename_file, req);
1451 	sem_wait(&channel->sem);
1452 	rc = args->rc;
1453 	free_fs_request(req);
1454 	return rc;
1455 }
1456 
1457 static void
1458 blob_delete_cb(void *ctx, int bserrno)
1459 {
1460 	struct spdk_fs_request *req = ctx;
1461 	struct spdk_fs_cb_args *args = &req->args;
1462 
1463 	args->fn.file_op(args->arg, bserrno);
1464 	free_fs_request(req);
1465 }
1466 
/* Asynchronously delete the file named `name`.
 *
 * If the file is still open (ref_count > 0), deletion is deferred: the file
 * is only marked is_deleted and the flag is persisted as a blob xattr, so
 * the actual delete happens on close.  Otherwise the in-memory file object
 * is freed immediately and the backing blob is deleted.
 * NOTE(review): the deferred path touches f->blob — this assumes the blob
 * handle is present whenever ref_count > 0; confirm an open still in flight
 * (blob == NULL) cannot reach here.
 */
void
spdk_fs_delete_file_async(struct spdk_filesystem *fs, const char *name,
			  spdk_file_op_complete cb_fn, void *cb_arg)
{
	struct spdk_file *f;
	spdk_blob_id blobid;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	SPDK_DEBUGLOG(blobfs, "file=%s\n", name);

	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
		cb_fn(cb_arg, -ENAMETOOLONG);
		return;
	}

	f = fs_find_file(fs, name);
	if (f == NULL) {
		SPDK_ERRLOG("Cannot find the file=%s to deleted\n", name);
		cb_fn(cb_arg, -ENOENT);
		return;
	}

	req = alloc_fs_request(fs->md_target.md_fs_channel);
	if (req == NULL) {
		SPDK_ERRLOG("Cannot allocate the req for the file=%s to deleted\n", name);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	args = &req->args;
	args->fn.file_op = cb_fn;
	args->arg = cb_arg;

	if (f->ref_count > 0) {
		/* If the ref > 0, we mark the file as deleted and delete it when we close it. */
		f->is_deleted = true;
		spdk_blob_set_xattr(f->blob, "is_deleted", &f->is_deleted, sizeof(bool));
		spdk_blob_sync_md(f->blob, blob_delete_cb, req);
		return;
	}

	/* No open references: tear down the in-memory object now and delete
	 * the blob; blob_delete_cb() completes the user callback. */
	blobid = f->blobid;
	TAILQ_REMOVE(&fs->files, f, tailq);

	file_free(f);

	spdk_bs_delete_blob(fs->bs, blobid, blob_delete_cb, req);
}
1516 
/* Pack up to the first 8 bytes of a file name into a uint64_t so it can be
 * stored in a trace record argument.
 */
static uint64_t
fs_name_to_uint64(const char *name)
{
	uint64_t packed = 0;

	memcpy(&packed, name, spdk_min(sizeof(packed), strlen(name)));
	return packed;
}
1524 
1525 static void
1526 __fs_delete_file_done(void *arg, int fserrno)
1527 {
1528 	struct spdk_fs_request *req = arg;
1529 	struct spdk_fs_cb_args *args = &req->args;
1530 
1531 	spdk_trace_record(TRACE_BLOBFS_DELETE_DONE, 0, 0, 0, fs_name_to_uint64(args->op.delete.name));
1532 	__wake_caller(args, fserrno);
1533 }
1534 
1535 static void
1536 __fs_delete_file(void *arg)
1537 {
1538 	struct spdk_fs_request *req = arg;
1539 	struct spdk_fs_cb_args *args = &req->args;
1540 
1541 	spdk_trace_record(TRACE_BLOBFS_DELETE_START, 0, 0, 0, fs_name_to_uint64(args->op.delete.name));
1542 	spdk_fs_delete_file_async(args->fs, args->op.delete.name, __fs_delete_file_done, req);
1543 }
1544 
1545 int
1546 spdk_fs_delete_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx,
1547 		    const char *name)
1548 {
1549 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
1550 	struct spdk_fs_request *req;
1551 	struct spdk_fs_cb_args *args;
1552 	int rc;
1553 
1554 	req = alloc_fs_request(channel);
1555 	if (req == NULL) {
1556 		SPDK_DEBUGLOG(blobfs, "Cannot allocate req to delete file=%s\n", name);
1557 		return -ENOMEM;
1558 	}
1559 
1560 	args = &req->args;
1561 	args->fs = fs;
1562 	args->op.delete.name = name;
1563 	args->sem = &channel->sem;
1564 	fs->send_request(__fs_delete_file, req);
1565 	sem_wait(&channel->sem);
1566 	rc = args->rc;
1567 	free_fs_request(req);
1568 
1569 	return rc;
1570 }
1571 
1572 spdk_fs_iter
1573 spdk_fs_iter_first(struct spdk_filesystem *fs)
1574 {
1575 	struct spdk_file *f;
1576 
1577 	f = TAILQ_FIRST(&fs->files);
1578 	return f;
1579 }
1580 
1581 spdk_fs_iter
1582 spdk_fs_iter_next(spdk_fs_iter iter)
1583 {
1584 	struct spdk_file *f = iter;
1585 
1586 	if (f == NULL) {
1587 		return NULL;
1588 	}
1589 
1590 	f = TAILQ_NEXT(f, tailq);
1591 	return f;
1592 }
1593 
/* Return the file's name.  The string is owned by the file object; note
 * that a rename frees and replaces it, so callers must not cache it across
 * rename or file teardown.
 */
const char *
spdk_file_get_name(struct spdk_file *file)
{
	return file->name;
}
1599 
1600 uint64_t
1601 spdk_file_get_length(struct spdk_file *file)
1602 {
1603 	uint64_t length;
1604 
1605 	assert(file != NULL);
1606 
1607 	length = file->append_pos >= file->length ? file->append_pos : file->length;
1608 	SPDK_DEBUGLOG(blobfs, "file=%s length=0x%jx\n", file->name, length);
1609 	return length;
1610 }
1611 
1612 static void
1613 fs_truncate_complete_cb(void *ctx, int bserrno)
1614 {
1615 	struct spdk_fs_request *req = ctx;
1616 	struct spdk_fs_cb_args *args = &req->args;
1617 
1618 	args->fn.file_op(args->arg, bserrno);
1619 	free_fs_request(req);
1620 }
1621 
/* Completion of the spdk_blob_resize() issued by spdk_file_truncate_async().
 *
 * On success, persists the new length as the "length" xattr, updates the
 * in-memory length (clamping append_pos down when the file shrank), and
 * syncs blob metadata; fs_truncate_complete_cb() finishes the operation.
 */
static void
fs_truncate_resize_cb(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *file = args->file;
	uint64_t *length = &args->op.truncate.length;

	if (bserrno) {
		args->fn.file_op(args->arg, bserrno);
		free_fs_request(req);
		return;
	}

	spdk_blob_set_xattr(file->blob, "length", length, sizeof(*length));

	file->length = *length;
	/* Shrinking below the current append position pulls it back. */
	if (file->append_pos > file->length) {
		file->append_pos = file->length;
	}

	spdk_blob_sync_md(file->blob, fs_truncate_complete_cb, req);
}
1645 
/* Round a byte count up to a whole number of clusters.
 *
 * Uses div/mod instead of (length + cluster_sz - 1) / cluster_sz; the
 * additive form wraps around when length is within cluster_sz - 1 of
 * UINT64_MAX and silently returns a tiny cluster count.
 */
static uint64_t
__bytes_to_clusters(uint64_t length, uint64_t cluster_sz)
{
	return length / cluster_sz + (length % cluster_sz != 0);
}
1651 
1652 void
1653 spdk_file_truncate_async(struct spdk_file *file, uint64_t length,
1654 			 spdk_file_op_complete cb_fn, void *cb_arg)
1655 {
1656 	struct spdk_filesystem *fs;
1657 	size_t num_clusters;
1658 	struct spdk_fs_request *req;
1659 	struct spdk_fs_cb_args *args;
1660 
1661 	SPDK_DEBUGLOG(blobfs, "file=%s old=0x%jx new=0x%jx\n", file->name, file->length, length);
1662 	if (length == file->length) {
1663 		cb_fn(cb_arg, 0);
1664 		return;
1665 	}
1666 
1667 	req = alloc_fs_request(file->fs->md_target.md_fs_channel);
1668 	if (req == NULL) {
1669 		cb_fn(cb_arg, -ENOMEM);
1670 		return;
1671 	}
1672 
1673 	args = &req->args;
1674 	args->fn.file_op = cb_fn;
1675 	args->arg = cb_arg;
1676 	args->file = file;
1677 	args->op.truncate.length = length;
1678 	fs = file->fs;
1679 
1680 	num_clusters = __bytes_to_clusters(length, fs->bs_opts.cluster_sz);
1681 
1682 	spdk_blob_resize(file->blob, num_clusters, fs_truncate_resize_cb, req);
1683 }
1684 
1685 static void
1686 __truncate(void *arg)
1687 {
1688 	struct spdk_fs_request *req = arg;
1689 	struct spdk_fs_cb_args *args = &req->args;
1690 
1691 	spdk_file_truncate_async(args->file, args->op.truncate.length,
1692 				 args->fn.file_op, args);
1693 }
1694 
1695 int
1696 spdk_file_truncate(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx,
1697 		   uint64_t length)
1698 {
1699 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
1700 	struct spdk_fs_request *req;
1701 	struct spdk_fs_cb_args *args;
1702 	int rc;
1703 
1704 	req = alloc_fs_request(channel);
1705 	if (req == NULL) {
1706 		return -ENOMEM;
1707 	}
1708 
1709 	args = &req->args;
1710 
1711 	args->file = file;
1712 	args->op.truncate.length = length;
1713 	args->fn.file_op = __wake_caller;
1714 	args->sem = &channel->sem;
1715 
1716 	channel->send_request(__truncate, req);
1717 	sem_wait(&channel->sem);
1718 	rc = args->rc;
1719 	free_fs_request(req);
1720 
1721 	return rc;
1722 }
1723 
1724 static void
1725 __rw_done(void *ctx, int bserrno)
1726 {
1727 	struct spdk_fs_request *req = ctx;
1728 	struct spdk_fs_cb_args *args = &req->args;
1729 
1730 	spdk_free(args->op.rw.pin_buf);
1731 	args->fn.file_op(args->arg, bserrno);
1732 	free_fs_request(req);
1733 }
1734 
/* Gather: copy the iovec contents into a flat buffer, never writing more
 * than the remaining buffer space.
 */
static void
_copy_iovs_to_buf(void *buf, size_t buf_len, struct iovec *iovs, int iovcnt)
{
	int i;

	for (i = 0; i < iovcnt; i++) {
		size_t chunk = (iovs[i].iov_len < buf_len) ? iovs[i].iov_len : buf_len;

		memcpy(buf, iovs[i].iov_base, chunk);
		buf += chunk;
		assert(buf_len >= chunk);
		buf_len -= chunk;
	}
}
1749 
/* Scatter: copy a flat buffer out into the iovec entries, never reading
 * more than the remaining buffer space.
 */
static void
_copy_buf_to_iovs(struct iovec *iovs, int iovcnt, void *buf, size_t buf_len)
{
	int i;

	for (i = 0; i < iovcnt; i++) {
		size_t chunk = (iovs[i].iov_len < buf_len) ? iovs[i].iov_len : buf_len;

		memcpy(iovs[i].iov_base, buf, chunk);
		buf += chunk;
		assert(buf_len >= chunk);
		buf_len -= chunk;
	}
}
1764 
/* Completion of the covering-LBA-range blob read issued by __do_blob_read().
 *
 * For a read, the requested bytes are scattered out of the bounce buffer
 * into the user's iovecs and the operation completes.  For a write, this is
 * the read half of a read-modify-write: the user's data is merged into the
 * bounce buffer, which is then written back in full.
 * Note: the offset-within-buffer math masks with (blocklen - 1), which
 * assumes op.rw.blocklen is a power of two.
 */
static void
__read_done(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	void *buf;

	assert(req != NULL);
	/* Point at the requested byte offset within the aligned bounce buffer. */
	buf = (void *)((uintptr_t)args->op.rw.pin_buf + (args->op.rw.offset & (args->op.rw.blocklen - 1)));
	if (args->op.rw.is_read) {
		_copy_buf_to_iovs(args->iovs, args->iovcnt, buf, args->op.rw.length);
		__rw_done(req, 0);
	} else {
		_copy_iovs_to_buf(buf, args->op.rw.length, args->iovs, args->iovcnt);
		spdk_blob_io_write(args->file->blob, args->op.rw.channel,
				   args->op.rw.pin_buf,
				   args->op.rw.start_lba, args->op.rw.num_lba,
				   __rw_done, req);
	}
}
1785 
1786 static void
1787 __do_blob_read(void *ctx, int fserrno)
1788 {
1789 	struct spdk_fs_request *req = ctx;
1790 	struct spdk_fs_cb_args *args = &req->args;
1791 
1792 	if (fserrno) {
1793 		__rw_done(req, fserrno);
1794 		return;
1795 	}
1796 	spdk_blob_io_read(args->file->blob, args->op.rw.channel,
1797 			  args->op.rw.pin_buf,
1798 			  args->op.rw.start_lba, args->op.rw.num_lba,
1799 			  __read_done, req);
1800 }
1801 
/* Translate the byte range [offset, offset + length) into the inclusive
 * LBA range that covers it, using the blobstore's io unit size as the LBA
 * size.  Note: a zero length would underflow end_lba; callers pass
 * length > 0.
 */
static void
__get_page_parameters(struct spdk_file *file, uint64_t offset, uint64_t length,
		      uint64_t *start_lba, uint32_t *lba_size, uint64_t *num_lba)
{
	uint64_t end_lba;

	*lba_size = spdk_bs_get_io_unit_size(file->fs->bs);
	*start_lba = offset / *lba_size;
	end_lba = (offset + length - 1) / *lba_size;
	*num_lba = (end_lba - *start_lba + 1);
}
1813 
1814 static bool
1815 __is_lba_aligned(struct spdk_file *file, uint64_t offset, uint64_t length)
1816 {
1817 	uint32_t lba_size = spdk_bs_get_io_unit_size(file->fs->bs);
1818 
1819 	if ((offset % lba_size == 0) && (length % lba_size == 0)) {
1820 		return true;
1821 	}
1822 
1823 	return false;
1824 }
1825 
1826 static void
1827 _fs_request_setup_iovs(struct spdk_fs_request *req, struct iovec *iovs, uint32_t iovcnt)
1828 {
1829 	uint32_t i;
1830 
1831 	for (i = 0; i < iovcnt; i++) {
1832 		req->args.iovs[i].iov_base = iovs[i].iov_base;
1833 		req->args.iovs[i].iov_len = iovs[i].iov_len;
1834 	}
1835 }
1836 
/* Common implementation for vectored reads and writes.
 *
 * The byte range is mapped to the LBA range covering it and a DMA-capable
 * bounce buffer of that size is allocated.  Then:
 *  - a write extending past EOF first truncates the file up, then performs
 *    a read-modify-write via __do_blob_read()/__read_done();
 *  - an LBA-aligned write skips the read and writes the buffer directly;
 *  - all other cases (reads, unaligned writes) read the covering range
 *    first via __do_blob_read().
 * Completion and buffer cleanup always funnel through __rw_done().
 */
static void
__readvwritev(struct spdk_file *file, struct spdk_io_channel *_channel,
	      struct iovec *iovs, uint32_t iovcnt, uint64_t offset, uint64_t length,
	      spdk_file_op_complete cb_fn, void *cb_arg, int is_read)
{
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;
	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
	uint64_t start_lba, num_lba, pin_buf_length;
	uint32_t lba_size;

	/* Reads past the current EOF are rejected outright. */
	if (is_read && offset + length > file->length) {
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	req = alloc_fs_request_with_iov(channel, iovcnt);
	if (req == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	__get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba);

	args = &req->args;
	args->fn.file_op = cb_fn;
	args->arg = cb_arg;
	args->file = file;
	args->op.rw.channel = channel->bs_channel;
	_fs_request_setup_iovs(req, iovs, iovcnt);
	args->op.rw.is_read = is_read;
	args->op.rw.offset = offset;
	args->op.rw.blocklen = lba_size;

	/* The bounce buffer spans the whole covering LBA range. */
	pin_buf_length = num_lba * lba_size;
	args->op.rw.length = pin_buf_length;
	args->op.rw.pin_buf = spdk_malloc(pin_buf_length, lba_size, NULL,
					  SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
	if (args->op.rw.pin_buf == NULL) {
		SPDK_DEBUGLOG(blobfs, "Failed to allocate buf for: file=%s offset=%jx length=%jx\n",
			      file->name, offset, length);
		free_fs_request(req);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	args->op.rw.start_lba = start_lba;
	args->op.rw.num_lba = num_lba;

	if (!is_read && file->length < offset + length) {
		/* Extending write: grow the file first, then read-modify-write. */
		spdk_file_truncate_async(file, offset + length, __do_blob_read, req);
	} else if (!is_read && __is_lba_aligned(file, offset, length)) {
		/* Fully aligned write: no prior read needed. */
		_copy_iovs_to_buf(args->op.rw.pin_buf, args->op.rw.length, args->iovs, args->iovcnt);
		spdk_blob_io_write(args->file->blob, args->op.rw.channel,
				   args->op.rw.pin_buf,
				   args->op.rw.start_lba, args->op.rw.num_lba,
				   __rw_done, req);
	} else {
		__do_blob_read(req, 0);
	}
}
1898 
1899 static void
1900 __readwrite(struct spdk_file *file, struct spdk_io_channel *channel,
1901 	    void *payload, uint64_t offset, uint64_t length,
1902 	    spdk_file_op_complete cb_fn, void *cb_arg, int is_read)
1903 {
1904 	struct iovec iov;
1905 
1906 	iov.iov_base = payload;
1907 	iov.iov_len = (size_t)length;
1908 
1909 	__readvwritev(file, channel, &iov, 1, offset, length, cb_fn, cb_arg, is_read);
1910 }
1911 
1912 void
1913 spdk_file_write_async(struct spdk_file *file, struct spdk_io_channel *channel,
1914 		      void *payload, uint64_t offset, uint64_t length,
1915 		      spdk_file_op_complete cb_fn, void *cb_arg)
1916 {
1917 	__readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 0);
1918 }
1919 
/* Asynchronously write the iovec contents to [offset, offset + length). */
void
spdk_file_writev_async(struct spdk_file *file, struct spdk_io_channel *channel,
		       struct iovec *iovs, uint32_t iovcnt, uint64_t offset, uint64_t length,
		       spdk_file_op_complete cb_fn, void *cb_arg)
{
	SPDK_DEBUGLOG(blobfs, "file=%s offset=%jx length=%jx\n",
		      file->name, offset, length);

	__readvwritev(file, channel, iovs, iovcnt, offset, length, cb_fn, cb_arg, 0);
}
1930 
/* Asynchronously read `length` bytes at `offset` into payload.  Fails with
 * -EINVAL if the range extends past the file's length (see __readvwritev).
 */
void
spdk_file_read_async(struct spdk_file *file, struct spdk_io_channel *channel,
		     void *payload, uint64_t offset, uint64_t length,
		     spdk_file_op_complete cb_fn, void *cb_arg)
{
	SPDK_DEBUGLOG(blobfs, "file=%s offset=%jx length=%jx\n",
		      file->name, offset, length);
	__readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 1);
}
1940 
/* Asynchronously read [offset, offset + length) into the iovec entries. */
void
spdk_file_readv_async(struct spdk_file *file, struct spdk_io_channel *channel,
		      struct iovec *iovs, uint32_t iovcnt, uint64_t offset, uint64_t length,
		      spdk_file_op_complete cb_fn, void *cb_arg)
{
	SPDK_DEBUGLOG(blobfs, "file=%s offset=%jx length=%jx\n",
		      file->name, offset, length);

	__readvwritev(file, channel, iovs, iovcnt, offset, length, cb_fn, cb_arg, 1);
}
1951 
1952 struct spdk_io_channel *
1953 spdk_fs_alloc_io_channel(struct spdk_filesystem *fs)
1954 {
1955 	struct spdk_io_channel *io_channel;
1956 	struct spdk_fs_channel *fs_channel;
1957 
1958 	io_channel = spdk_get_io_channel(&fs->io_target);
1959 	fs_channel = spdk_io_channel_get_ctx(io_channel);
1960 	fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
1961 	fs_channel->send_request = __send_request_direct;
1962 
1963 	return io_channel;
1964 }
1965 
/* Release an I/O channel obtained from spdk_fs_alloc_io_channel().
 * NOTE(review): the blobstore channel attached in the alloc path is not
 * explicitly freed here — confirm the channel destroy callback releases it.
 */
void
spdk_fs_free_io_channel(struct spdk_io_channel *channel)
{
	spdk_put_io_channel(channel);
}
1971 
/* Allocate a per-thread context wrapping a private synchronous channel
 * (512 requests) whose requests are forwarded via the filesystem's
 * send_request function.  Returns NULL on allocation or lock-init failure.
 * NOTE(review): the result of fs_channel_create() is not checked — confirm
 * it cannot fail, or that a failed channel is safe to use.
 */
struct spdk_fs_thread_ctx *
spdk_fs_alloc_thread_ctx(struct spdk_filesystem *fs)
{
	struct spdk_fs_thread_ctx *ctx;

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		return NULL;
	}

	if (pthread_spin_init(&ctx->ch.lock, 0)) {
		free(ctx);
		return NULL;
	}

	fs_channel_create(fs, &ctx->ch, 512);

	/* Mark the channel synchronous: requests block on its semaphore. */
	ctx->ch.send_request = fs->send_request;
	ctx->ch.sync = 1;

	return ctx;
}
1994 
1995 
/* Tear down a thread context created by spdk_fs_alloc_thread_ctx().
 *
 * Busy-waits (1 ms polls under the channel lock) until every outstanding
 * request on the channel has drained, then destroys the channel and frees
 * the context.  Only valid for synchronous channels.
 */
void
spdk_fs_free_thread_ctx(struct spdk_fs_thread_ctx *ctx)
{
	assert(ctx->ch.sync == 1);

	while (true) {
		pthread_spin_lock(&ctx->ch.lock);
		if (ctx->ch.outstanding_reqs == 0) {
			pthread_spin_unlock(&ctx->ch.lock);
			break;
		}
		pthread_spin_unlock(&ctx->ch.lock);
		usleep(1000);
	}

	fs_channel_destroy(NULL, &ctx->ch);
	free(ctx);
}
2014 
2015 int
2016 spdk_fs_set_cache_size(uint64_t size_in_mb)
2017 {
2018 	/* setting g_fs_cache_size is only permitted if cache pool
2019 	 * is already freed or hasn't been initialized
2020 	 */
2021 	if (g_cache_pool != NULL) {
2022 		return -EPERM;
2023 	}
2024 
2025 	g_fs_cache_size = size_in_mb * 1024 * 1024;
2026 
2027 	return 0;
2028 }
2029 
/* Return the configured global cache size, converted back to MiB. */
uint64_t
spdk_fs_get_cache_size(void)
{
	return g_fs_cache_size / (1024 * 1024);
}
2035 
2036 static void __file_flush(void *ctx);
2037 
/* Try to free some cache buffers from this file.
 *
 * Returns 0 when buffers were reclaimed, -1 when the file lock was
 * contended or the file holds no cached buffers.  Files that still hold
 * buffers after the sweep are rotated to the tail of g_caches so other
 * files get reclaimed first next time.
 */
static int
reclaim_cache_buffers(struct spdk_file *file)
{
	int rc;

	BLOBFS_TRACE(file, "free=%s\n", file->name);

	/* The function is safe to be called with any threads, while the file
	 * lock maybe locked by other thread for now, so try to get the file
	 * lock here.
	 */
	rc = pthread_spin_trylock(&file->lock);
	if (rc != 0) {
		return -1;
	}

	if (file->tree->present_mask == 0) {
		pthread_spin_unlock(&file->lock);
		return -1;
	}
	tree_free_buffers(file->tree);

	TAILQ_REMOVE(&g_caches, file, cache_tailq);
	/* If not freed, put it in the end of the queue */
	if (file->tree->present_mask != 0) {
		TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq);
	} else {
		file->last = NULL;
	}
	pthread_spin_unlock(&file->lock);

	return 0;
}
2073 
/* Poller that reclaims cache buffers when the pool runs low.
 *
 * Victims are tried in three priority passes: (1) files not open for
 * writing with low cache priority, (2) any file not open for writing,
 * (3) any file at all.  Each pass stops after the first successful
 * reclaim; if the pool is no longer low the poller returns early.
 */
static int
_blobfs_cache_pool_reclaim(void *arg)
{
	struct spdk_file *file, *tmp;
	int rc;

	if (!blobfs_cache_pool_need_reclaim()) {
		return SPDK_POLLER_IDLE;
	}

	/* Pass 1: low-priority files that are not being written. */
	TAILQ_FOREACH_SAFE(file, &g_caches, cache_tailq, tmp) {
		if (!file->open_for_writing &&
		    file->priority == SPDK_FILE_PRIORITY_LOW) {
			rc = reclaim_cache_buffers(file);
			if (rc < 0) {
				continue;
			}
			if (!blobfs_cache_pool_need_reclaim()) {
				return SPDK_POLLER_BUSY;
			}
			break;
		}
	}

	/* Pass 2: any file that is not being written. */
	TAILQ_FOREACH_SAFE(file, &g_caches, cache_tailq, tmp) {
		if (!file->open_for_writing) {
			rc = reclaim_cache_buffers(file);
			if (rc < 0) {
				continue;
			}
			if (!blobfs_cache_pool_need_reclaim()) {
				return SPDK_POLLER_BUSY;
			}
			break;
		}
	}

	/* Pass 3: last resort — reclaim from any file. */
	TAILQ_FOREACH_SAFE(file, &g_caches, cache_tailq, tmp) {
		rc = reclaim_cache_buffers(file);
		if (rc < 0) {
			continue;
		}
		break;
	}

	return SPDK_POLLER_BUSY;
}
2121 
/* Runs on the cache pool thread (via spdk_thread_send_msg): register a file
 * that just gained its first cached buffer with the reclaim list.
 */
static void
_add_file_to_cache_pool(void *ctx)
{
	struct spdk_file *file = ctx;

	TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq);
}
2129 
/* Counterpart of _add_file_to_cache_pool(): drop a file from the reclaim
 * list.  Presumably also dispatched to the cache pool thread — verify at
 * the call site.
 */
static void
_remove_file_from_cache_pool(void *ctx)
{
	struct spdk_file *file = ctx;

	TAILQ_REMOVE(&g_caches, file, cache_tailq);
}
2137 
/* Allocate a cache buffer for `offset` and insert it into the file's
 * buffer tree.
 *
 * The backing memory comes from the global cache mempool; if the pool is
 * exhausted this retries up to 100 times, sleeping one poll period between
 * attempts to let the reclaim poller free buffers.  When this is the
 * file's first cached buffer, the file is (asynchronously) registered with
 * the cache pool thread's reclaim list.  Returns NULL on failure.
 */
static struct cache_buffer *
cache_insert_buffer(struct spdk_file *file, uint64_t offset)
{
	struct cache_buffer *buf;
	int count = 0;
	bool need_update = false;

	buf = calloc(1, sizeof(*buf));
	if (buf == NULL) {
		SPDK_DEBUGLOG(blobfs, "calloc failed\n");
		return NULL;
	}

	/* Bounded retry: wait for the reclaim poller to release buffers. */
	do {
		buf->buf = spdk_mempool_get(g_cache_pool);
		if (buf->buf) {
			break;
		}
		if (count++ == 100) {
			SPDK_ERRLOG("Could not allocate cache buffer for file=%p on offset=%jx\n",
				    file, offset);
			free(buf);
			return NULL;
		}
		usleep(BLOBFS_CACHE_POOL_POLL_PERIOD_IN_US);
	} while (true);

	buf->buf_size = CACHE_BUFFER_SIZE;
	buf->offset = offset;

	/* First buffer for this file: it must join the reclaim list. */
	if (file->tree->present_mask == 0) {
		need_update = true;
	}
	file->tree = tree_insert_buffer(file->tree, buf);

	if (need_update) {
		spdk_thread_send_msg(g_cache_pool_thread, _add_file_to_cache_pool, file);
	}

	return buf;
}
2179 
2180 static struct cache_buffer *
2181 cache_append_buffer(struct spdk_file *file)
2182 {
2183 	struct cache_buffer *last;
2184 
2185 	assert(file->last == NULL || file->last->bytes_filled == file->last->buf_size);
2186 	assert((file->append_pos % CACHE_BUFFER_SIZE) == 0);
2187 
2188 	last = cache_insert_buffer(file, file->append_pos);
2189 	if (last == NULL) {
2190 		SPDK_DEBUGLOG(blobfs, "cache_insert_buffer failed\n");
2191 		return NULL;
2192 	}
2193 
2194 	file->last = last;
2195 
2196 	return last;
2197 }
2198 
2199 static void __check_sync_reqs(struct spdk_file *file);
2200 
/* Completion of the spdk_blob_sync_md() issued by __check_sync_reqs().
 *
 * Records the length that is now durable in the xattr, dequeues and
 * completes this sync request, then re-runs __check_sync_reqs() in case
 * further sync requests became satisfiable.  The user callback is invoked
 * outside the file lock.
 */
static void
__file_cache_finish_sync(void *ctx, int bserrno)
{
	struct spdk_file *file;
	struct spdk_fs_request *sync_req = ctx;
	struct spdk_fs_cb_args *sync_args;

	sync_args = &sync_req->args;
	file = sync_args->file;
	pthread_spin_lock(&file->lock);
	file->length_xattr = sync_args->op.sync.length;
	assert(sync_args->op.sync.offset <= file->length_flushed);
	spdk_trace_record(TRACE_BLOBFS_XATTR_END, 0, sync_args->op.sync.offset,
			  0, file->trace_arg_name);
	BLOBFS_TRACE(file, "sync done offset=%jx\n", sync_args->op.sync.offset);
	TAILQ_REMOVE(&file->sync_requests, sync_req, args.op.sync.tailq);
	pthread_spin_unlock(&file->lock);

	sync_args->fn.file_op(sync_args->arg, bserrno);

	free_fs_request(sync_req);
	__check_sync_reqs(file);
}
2224 
static void
__check_sync_reqs(struct spdk_file *file)
{
	/* Look for the first pending sync request whose offset is already
	 * covered by flushed data.  If one exists and no xattr update is in
	 * flight for it, persist the flushed length to the blob's "length"
	 * xattr and kick off a metadata sync. */
	struct spdk_fs_request *sync_req;

	pthread_spin_lock(&file->lock);

	/* sync_req is NULL after the loop when no request is satisfiable yet. */
	TAILQ_FOREACH(sync_req, &file->sync_requests, args.op.sync.tailq) {
		if (sync_req->args.op.sync.offset <= file->length_flushed) {
			break;
		}
	}

	if (sync_req != NULL && !sync_req->args.op.sync.xattr_in_progress) {
		BLOBFS_TRACE(file, "set xattr length 0x%jx\n", file->length_flushed);
		sync_req->args.op.sync.xattr_in_progress = true;
		sync_req->args.op.sync.length = file->length_flushed;
		spdk_blob_set_xattr(file->blob, "length", &file->length_flushed,
				    sizeof(file->length_flushed));

		pthread_spin_unlock(&file->lock);
		spdk_trace_record(TRACE_BLOBFS_XATTR_START, 0, file->length_flushed,
				  0, file->trace_arg_name);
		/* The completion re-enters __check_sync_reqs() to drain the queue. */
		spdk_blob_sync_md(file->blob, __file_cache_finish_sync, sync_req);
	} else {
		pthread_spin_unlock(&file->lock);
	}
}
2253 
static void
__file_flush_done(void *ctx, int bserrno)
{
	/* Completion of one cache-buffer write issued by __file_flush().
	 * Updates the flush accounting, lets pending sync requests make
	 * progress, and restarts the flush loop for the next buffer.
	 * NOTE(review): bserrno is not examined - a failed write still
	 * advances length_flushed; confirm this is intended. */
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *file = args->file;
	struct cache_buffer *next = args->op.flush.cache_buffer;

	BLOBFS_TRACE(file, "length=%jx\n", args->op.flush.length);

	pthread_spin_lock(&file->lock);
	next->in_progress = false;
	next->bytes_flushed += args->op.flush.length;
	file->length_flushed += args->op.flush.length;
	if (file->length_flushed > file->length) {
		file->length = file->length_flushed;
	}
	if (next->bytes_flushed == next->buf_size) {
		/* This buffer is fully on disk; locate the buffer that starts
		 * at the new flush frontier (may be NULL). */
		BLOBFS_TRACE(file, "write buffer fully flushed 0x%jx\n", file->length_flushed);
		next = tree_find_buffer(file->tree, file->length_flushed);
	}

	/*
	 * Assert that there is no cached data that extends past the end of the underlying
	 *  blob.
	 */
	assert(next == NULL || next->offset < __file_get_blob_size(file) ||
	       next->bytes_filled == 0);

	pthread_spin_unlock(&file->lock);

	__check_sync_reqs(file);

	/* Reuse this request to flush the next buffer, if any. */
	__file_flush(req);
}
2289 
static void
__file_flush(void *ctx)
{
	/* Flush the next unflushed cache buffer of a file to the blobstore.
	 * Runs on the fs sync thread; chains itself via __file_flush_done()
	 * until there is nothing left to write. */
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *file = args->file;
	struct cache_buffer *next;
	uint64_t offset, length, start_lba, num_lba;
	uint32_t lba_size;

	pthread_spin_lock(&file->lock);
	next = tree_find_buffer(file->tree, file->length_flushed);
	if (next == NULL || next->in_progress ||
	    ((next->bytes_filled < next->buf_size) && TAILQ_EMPTY(&file->sync_requests))) {
		/*
		 * There is either no data to flush, a flush I/O is already in
		 *  progress, or the next buffer is partially filled but there's no
		 *  outstanding request to sync it.
		 * So return immediately - if a flush I/O is in progress we will flush
		 *  more data after that is completed, or a partial buffer will get flushed
		 *  when it is either filled or the file is synced.
		 */
		free_fs_request(req);
		if (next == NULL) {
			/*
			 * For cases where a file's cache was evicted, and then the
			 *  file was later appended, we will write the data directly
			 *  to disk and bypass cache.  So just update length_flushed
			 *  here to reflect that all data was already written to disk.
			 */
			file->length_flushed = file->append_pos;
		}
		pthread_spin_unlock(&file->lock);
		if (next == NULL) {
			/*
			 * There is no data to flush, but we still need to check for any
			 *  outstanding sync requests to make sure metadata gets updated.
			 */
			__check_sync_reqs(file);
		}
		return;
	}

	/* Flush only the portion of the buffer not yet written out. */
	offset = next->offset + next->bytes_flushed;
	length = next->bytes_filled - next->bytes_flushed;
	if (length == 0) {
		free_fs_request(req);
		pthread_spin_unlock(&file->lock);
		/*
		 * There is no data to flush, but we still need to check for any
		 *  outstanding sync requests to make sure metadata gets updated.
		 */
		__check_sync_reqs(file);
		return;
	}
	args->op.flush.length = length;
	args->op.flush.cache_buffer = next;

	/* Convert the byte range into LBA terms for the blobstore write. */
	__get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba);

	next->in_progress = true;
	BLOBFS_TRACE(file, "offset=0x%jx length=0x%jx page start=0x%jx num=0x%jx\n",
		     offset, length, start_lba, num_lba);
	/* Issue the write outside the lock; completion continues the chain. */
	pthread_spin_unlock(&file->lock);
	spdk_blob_io_write(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel,
			   next->buf + (start_lba * lba_size) - next->offset,
			   start_lba, num_lba, __file_flush_done, req);
}
2358 
static void
__file_extend_done(void *arg, int bserrno)
{
	/* Final step of a blob extension: the metadata sync has completed,
	 * so wake the thread blocked in spdk_file_write(). */
	struct spdk_fs_cb_args *cb_args = arg;

	__wake_caller(cb_args, bserrno);
}
2366 
2367 static void
2368 __file_extend_resize_cb(void *_args, int bserrno)
2369 {
2370 	struct spdk_fs_cb_args *args = _args;
2371 	struct spdk_file *file = args->file;
2372 
2373 	if (bserrno) {
2374 		__wake_caller(args, bserrno);
2375 		return;
2376 	}
2377 
2378 	spdk_blob_sync_md(file->blob, __file_extend_done, args);
2379 }
2380 
2381 static void
2382 __file_extend_blob(void *_args)
2383 {
2384 	struct spdk_fs_cb_args *args = _args;
2385 	struct spdk_file *file = args->file;
2386 
2387 	spdk_blob_resize(file->blob, args->op.resize.num_clusters, __file_extend_resize_cb, args);
2388 }
2389 
2390 static void
2391 __rw_from_file_done(void *ctx, int bserrno)
2392 {
2393 	struct spdk_fs_request *req = ctx;
2394 
2395 	__wake_caller(&req->args, bserrno);
2396 	free_fs_request(req);
2397 }
2398 
2399 static void
2400 __rw_from_file(void *ctx)
2401 {
2402 	struct spdk_fs_request *req = ctx;
2403 	struct spdk_fs_cb_args *args = &req->args;
2404 	struct spdk_file *file = args->file;
2405 
2406 	if (args->op.rw.is_read) {
2407 		spdk_file_read_async(file, file->fs->sync_target.sync_io_channel, args->iovs[0].iov_base,
2408 				     args->op.rw.offset, (uint64_t)args->iovs[0].iov_len,
2409 				     __rw_from_file_done, req);
2410 	} else {
2411 		spdk_file_write_async(file, file->fs->sync_target.sync_io_channel, args->iovs[0].iov_base,
2412 				      args->op.rw.offset, (uint64_t)args->iovs[0].iov_len,
2413 				      __rw_from_file_done, req);
2414 	}
2415 }
2416 
/* Context shared between a synchronous caller and the fs thread for an
 * operation forwarded through __send_rw_from_file(). */
struct rw_from_file_arg {
	struct spdk_fs_channel *channel;	/* channel whose semaphore the caller waits on */
	int rwerrno;				/* result of the forwarded operation */
};
2421 
2422 static int
2423 __send_rw_from_file(struct spdk_file *file, void *payload,
2424 		    uint64_t offset, uint64_t length, bool is_read,
2425 		    struct rw_from_file_arg *arg)
2426 {
2427 	struct spdk_fs_request *req;
2428 	struct spdk_fs_cb_args *args;
2429 
2430 	req = alloc_fs_request_with_iov(arg->channel, 1);
2431 	if (req == NULL) {
2432 		sem_post(&arg->channel->sem);
2433 		return -ENOMEM;
2434 	}
2435 
2436 	args = &req->args;
2437 	args->file = file;
2438 	args->sem = &arg->channel->sem;
2439 	args->iovs[0].iov_base = payload;
2440 	args->iovs[0].iov_len = (size_t)length;
2441 	args->op.rw.offset = offset;
2442 	args->op.rw.is_read = is_read;
2443 	args->rwerrno = &arg->rwerrno;
2444 	file->fs->send_request(__rw_from_file, req);
2445 	return 0;
2446 }
2447 
/* Synchronous, append-only write.  Data is staged in cache buffers when
 * possible and flushed in the background; when the file has no active cache
 * buffer the write goes to disk directly via the fs thread.
 *
 * \param file   File to append to.
 * \param ctx    Per-thread context (actually a struct spdk_fs_channel).
 * \param payload Data to write.
 * \param offset Must equal the file's current append position.
 * \param length Number of bytes to write.
 * \return 0 on success, negative errno on failure.
 */
int
spdk_file_write(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx,
		void *payload, uint64_t offset, uint64_t length)
{
	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
	struct spdk_fs_request *flush_req;
	uint64_t rem_length, copy, blob_size, cluster_sz;
	uint32_t cache_buffers_filled = 0;
	uint8_t *cur_payload;
	struct cache_buffer *last;

	BLOBFS_TRACE_RW(file, "offset=%jx length=%jx\n", offset, length);

	if (length == 0) {
		return 0;
	}

	/* Only appends are supported: the offset must match append_pos. */
	if (offset != file->append_pos) {
		BLOBFS_TRACE(file, " error offset=%jx append_pos=%jx\n", offset, file->append_pos);
		return -EINVAL;
	}

	pthread_spin_lock(&file->lock);
	file->open_for_writing = true;

	/* Try to (re)establish a cache buffer at the aligned append position. */
	if ((file->last == NULL) && (file->append_pos % CACHE_BUFFER_SIZE == 0)) {
		cache_append_buffer(file);
	}

	if (file->last == NULL) {
		/* No cache buffer available (e.g. cache evicted or pool
		 * exhausted) - bypass the cache and write straight to disk
		 * through the fs thread, blocking until it completes. */
		struct rw_from_file_arg arg = {};
		int rc;

		arg.channel = channel;
		arg.rwerrno = 0;
		file->append_pos += length;
		pthread_spin_unlock(&file->lock);
		rc = __send_rw_from_file(file, payload, offset, length, false, &arg);
		if (rc != 0) {
			return rc;
		}
		sem_wait(&channel->sem);
		return arg.rwerrno;
	}

	blob_size = __file_get_blob_size(file);

	if ((offset + length) > blob_size) {
		/* The write extends past the blob - resize it synchronously
		 * on the metadata thread before staging any data. */
		struct spdk_fs_cb_args extend_args = {};

		cluster_sz = file->fs->bs_opts.cluster_sz;
		extend_args.sem = &channel->sem;
		extend_args.op.resize.num_clusters = __bytes_to_clusters((offset + length), cluster_sz);
		extend_args.file = file;
		BLOBFS_TRACE(file, "start resize to %u clusters\n", extend_args.op.resize.num_clusters);
		pthread_spin_unlock(&file->lock);
		file->fs->send_request(__file_extend_blob, &extend_args);
		sem_wait(&channel->sem);
		if (extend_args.rc) {
			return extend_args.rc;
		}
	}

	/* Allocate the flush request up front so the copy loop cannot fail
	 * after partially staging data for lack of a request. */
	flush_req = alloc_fs_request(channel);
	if (flush_req == NULL) {
		pthread_spin_unlock(&file->lock);
		return -ENOMEM;
	}

	/* Copy the payload into cache buffers, appending new buffers as each
	 * one fills up. */
	last = file->last;
	rem_length = length;
	cur_payload = payload;
	while (rem_length > 0) {
		copy = last->buf_size - last->bytes_filled;
		if (copy > rem_length) {
			copy = rem_length;
		}
		BLOBFS_TRACE_RW(file, "  fill offset=%jx length=%jx\n", file->append_pos, copy);
		memcpy(&last->buf[last->bytes_filled], cur_payload, copy);
		file->append_pos += copy;
		if (file->length < file->append_pos) {
			file->length = file->append_pos;
		}
		cur_payload += copy;
		last->bytes_filled += copy;
		rem_length -= copy;
		if (last->bytes_filled == last->buf_size) {
			cache_buffers_filled++;
			last = cache_append_buffer(file);
			if (last == NULL) {
				BLOBFS_TRACE(file, "nomem\n");
				free_fs_request(flush_req);
				pthread_spin_unlock(&file->lock);
				return -ENOMEM;
			}
		}
	}

	pthread_spin_unlock(&file->lock);

	/* Only completely filled buffers are flushed here; a partial buffer
	 * is flushed later, when it fills or the file is synced. */
	if (cache_buffers_filled == 0) {
		free_fs_request(flush_req);
		return 0;
	}

	flush_req->args.file = file;
	file->fs->send_request(__file_flush, flush_req);
	return 0;
}
2557 
2558 static void
2559 __readahead_done(void *ctx, int bserrno)
2560 {
2561 	struct spdk_fs_request *req = ctx;
2562 	struct spdk_fs_cb_args *args = &req->args;
2563 	struct cache_buffer *cache_buffer = args->op.readahead.cache_buffer;
2564 	struct spdk_file *file = args->file;
2565 
2566 	BLOBFS_TRACE(file, "offset=%jx\n", cache_buffer->offset);
2567 
2568 	pthread_spin_lock(&file->lock);
2569 	cache_buffer->bytes_filled = args->op.readahead.length;
2570 	cache_buffer->bytes_flushed = args->op.readahead.length;
2571 	cache_buffer->in_progress = false;
2572 	pthread_spin_unlock(&file->lock);
2573 
2574 	free_fs_request(req);
2575 }
2576 
2577 static void
2578 __readahead(void *ctx)
2579 {
2580 	struct spdk_fs_request *req = ctx;
2581 	struct spdk_fs_cb_args *args = &req->args;
2582 	struct spdk_file *file = args->file;
2583 	uint64_t offset, length, start_lba, num_lba;
2584 	uint32_t lba_size;
2585 
2586 	offset = args->op.readahead.offset;
2587 	length = args->op.readahead.length;
2588 	assert(length > 0);
2589 
2590 	__get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba);
2591 
2592 	BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n",
2593 		     offset, length, start_lba, num_lba);
2594 	spdk_blob_io_read(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel,
2595 			  args->op.readahead.cache_buffer->buf,
2596 			  start_lba, num_lba, __readahead_done, req);
2597 }
2598 
2599 static uint64_t
2600 __next_cache_buffer_offset(uint64_t offset)
2601 {
2602 	return (offset + CACHE_BUFFER_SIZE) & ~(CACHE_TREE_LEVEL_MASK(0));
2603 }
2604 
/* Kick off a readahead for the cache buffer following \p offset, unless that
 * buffer is already cached or lies beyond the end of the file.  Failures are
 * silent: readahead is purely an optimization. */
static void
check_readahead(struct spdk_file *file, uint64_t offset,
		struct spdk_fs_channel *channel)
{
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	offset = __next_cache_buffer_offset(offset);
	if (tree_find_buffer(file->tree, offset) != NULL || file->length <= offset) {
		return;
	}

	req = alloc_fs_request(channel);
	if (req == NULL) {
		return;
	}
	args = &req->args;

	BLOBFS_TRACE(file, "offset=%jx\n", offset);

	args->file = file;
	args->op.readahead.offset = offset;
	args->op.readahead.cache_buffer = cache_insert_buffer(file, offset);
	if (!args->op.readahead.cache_buffer) {
		BLOBFS_TRACE(file, "Cannot allocate buf for offset=%jx\n", offset);
		free_fs_request(req);
		return;
	}

	/* Mark the buffer busy so readers wait for the I/O to complete. */
	args->op.readahead.cache_buffer->in_progress = true;
	if (file->length < (offset + CACHE_BUFFER_SIZE)) {
		/* Last buffer of the file: read only the valid tail bytes. */
		args->op.readahead.length = file->length & (CACHE_BUFFER_SIZE - 1);
	} else {
		args->op.readahead.length = CACHE_BUFFER_SIZE;
	}
	file->fs->send_request(__readahead, req);
}
2642 
/* Synchronous read.  Serves data from cache buffers when possible and falls
 * back to direct reads through the fs thread otherwise; sequential access
 * triggers readahead.
 *
 * \param file   File to read from.
 * \param ctx    Per-thread context (actually a struct spdk_fs_channel).
 * \param payload Destination buffer.
 * \param offset Starting byte offset.
 * \param length Number of bytes requested (clamped to the file's append_pos).
 * \return Number of bytes read, or a negative errno on failure.
 */
int64_t
spdk_file_read(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx,
	       void *payload, uint64_t offset, uint64_t length)
{
	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
	uint64_t final_offset, final_length;
	uint32_t sub_reads = 0;
	struct cache_buffer *buf;
	uint64_t read_len;
	struct rw_from_file_arg arg = {};

	pthread_spin_lock(&file->lock);

	BLOBFS_TRACE_RW(file, "offset=%ju length=%ju\n", offset, length);

	file->open_for_writing = false;

	if (length == 0 || offset >= file->append_pos) {
		pthread_spin_unlock(&file->lock);
		return 0;
	}

	/* Clamp the read to the data that has actually been appended. */
	if (offset + length > file->append_pos) {
		length = file->append_pos - offset;
	}

	/* Track sequential access; enough consecutive bytes trigger
	 * readahead of the next two cache buffers. */
	if (offset != file->next_seq_offset) {
		file->seq_byte_count = 0;
	}
	file->seq_byte_count += length;
	file->next_seq_offset = offset + length;
	if (file->seq_byte_count >= CACHE_READAHEAD_THRESHOLD) {
		check_readahead(file, offset, channel);
		check_readahead(file, offset + CACHE_BUFFER_SIZE, channel);
	}

	arg.channel = channel;
	arg.rwerrno = 0;
	final_length = 0;
	final_offset = offset + length;
	/* Process the request one cache-buffer-sized chunk at a time. */
	while (offset < final_offset) {
		int ret = 0;
		length = NEXT_CACHE_BUFFER_OFFSET(offset) - offset;
		if (length > (final_offset - offset)) {
			length = final_offset - offset;
		}

		buf = tree_find_filled_buffer(file->tree, offset);
		if (buf == NULL) {
			/* Not cached: issue a direct read via the fs thread.
			 * The lock is dropped around the submission. */
			pthread_spin_unlock(&file->lock);
			ret = __send_rw_from_file(file, payload, offset, length, true, &arg);
			pthread_spin_lock(&file->lock);
			if (ret == 0) {
				sub_reads++;
			}
		} else {
			/* Serve from cache.  read_len may be shorter than
			 * length when the buffer's tail is not yet filled. */
			read_len = length;
			if ((offset + length) > (buf->offset + buf->bytes_filled)) {
				read_len = buf->offset + buf->bytes_filled - offset;
			}
			BLOBFS_TRACE(file, "read %p offset=%ju length=%ju\n", payload, offset, read_len);
			memcpy(payload, &buf->buf[offset - buf->offset], read_len);
			/* Once a buffer has been read through to its end,
			 * retire it from the cache tree. */
			if ((offset + read_len) % CACHE_BUFFER_SIZE == 0) {
				tree_remove_buffer(file->tree, buf);
				if (file->tree->present_mask == 0) {
					spdk_thread_send_msg(g_cache_pool_thread, _remove_file_from_cache_pool, file);
				}
			}
		}

		if (ret == 0) {
			final_length += length;
		} else {
			arg.rwerrno = ret;
			break;
		}
		payload += length;
		offset += length;
	}
	pthread_spin_unlock(&file->lock);
	/* Wait for every direct read submitted above to complete. */
	while (sub_reads > 0) {
		sem_wait(&channel->sem);
		sub_reads--;
	}
	if (arg.rwerrno == 0) {
		return final_length;
	} else {
		return arg.rwerrno;
	}
}
2733 
/* Common implementation for spdk_file_sync() and spdk_file_sync_async().
 * Queues a sync request for the current append position and kicks a flush;
 * the sync completes (via __check_sync_reqs()) once the flushed length
 * reaches that position and the "length" xattr has been persisted. */
static void
_file_sync(struct spdk_file *file, struct spdk_fs_channel *channel,
	   spdk_file_op_complete cb_fn, void *cb_arg)
{
	struct spdk_fs_request *sync_req;
	struct spdk_fs_request *flush_req;
	struct spdk_fs_cb_args *sync_args;
	struct spdk_fs_cb_args *flush_args;

	BLOBFS_TRACE(file, "offset=%jx\n", file->append_pos);

	pthread_spin_lock(&file->lock);
	/* Fast path: everything up to append_pos is already persisted. */
	if (file->append_pos <= file->length_xattr) {
		BLOBFS_TRACE(file, "done - file already synced\n");
		pthread_spin_unlock(&file->lock);
		cb_fn(cb_arg, 0);
		return;
	}

	sync_req = alloc_fs_request(channel);
	if (!sync_req) {
		SPDK_ERRLOG("Cannot allocate sync req for file=%s\n", file->name);
		pthread_spin_unlock(&file->lock);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}
	sync_args = &sync_req->args;

	flush_req = alloc_fs_request(channel);
	if (!flush_req) {
		SPDK_ERRLOG("Cannot allocate flush req for file=%s\n", file->name);
		free_fs_request(sync_req);
		pthread_spin_unlock(&file->lock);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}
	flush_args = &flush_req->args;

	sync_args->file = file;
	sync_args->fn.file_op = cb_fn;
	sync_args->arg = cb_arg;
	sync_args->op.sync.offset = file->append_pos;
	sync_args->op.sync.xattr_in_progress = false;
	TAILQ_INSERT_TAIL(&file->sync_requests, sync_req, args.op.sync.tailq);
	pthread_spin_unlock(&file->lock);

	/* The flush drives the sync request to completion. */
	flush_args->file = file;
	channel->send_request(__file_flush, flush_req);
}
2783 
/* Synchronous wrapper around _file_sync(): blocks the calling thread on the
 * channel semaphore until the sync completes.
 * \return 0 on success, negative errno on failure. */
int
spdk_file_sync(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx)
{
	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
	struct spdk_fs_cb_args args = {};

	args.sem = &channel->sem;
	_file_sync(file, channel, __wake_caller, &args);
	sem_wait(&channel->sem);

	return args.rc;
}
2796 
/* Asynchronous sync: cb_fn is invoked with the sync status once all data up
 * to the current append position has been flushed and its length persisted. */
void
spdk_file_sync_async(struct spdk_file *file, struct spdk_io_channel *_channel,
		     spdk_file_op_complete cb_fn, void *cb_arg)
{
	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);

	_file_sync(file, channel, cb_fn, cb_arg);
}
2805 
2806 void
2807 spdk_file_set_priority(struct spdk_file *file, uint32_t priority)
2808 {
2809 	BLOBFS_TRACE(file, "priority=%u\n", priority);
2810 	file->priority = priority;
2811 
2812 }
2813 
2814 /*
2815  * Close routines
2816  */
2817 
static void
__file_close_async_done(void *ctx, int bserrno)
{
	/* Final stage of an async close (after the blob has been closed, or
	 * directly on error paths).  If the file was deleted while open, the
	 * deferred deletion is performed now; in that case ownership of the
	 * request passes to blob_delete_cb, which completes and frees it. */
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *file = args->file;

	spdk_trace_record(TRACE_BLOBFS_CLOSE, 0, 0, 0, file->trace_arg_name);

	if (file->is_deleted) {
		spdk_fs_delete_file_async(file->fs, file->name, blob_delete_cb, ctx);
		return;
	}

	args->fn.file_op(args->arg, bserrno);
	free_fs_request(req);
}
2835 
/* Drop one reference to the file.  Only the last reference actually closes
 * the underlying blob; earlier releases complete immediately. */
static void
__file_close_async(struct spdk_file *file, struct spdk_fs_request *req)
{
	struct spdk_blob *blob;

	pthread_spin_lock(&file->lock);
	/* Closing a file that is not open is an error. */
	if (file->ref_count == 0) {
		pthread_spin_unlock(&file->lock);
		__file_close_async_done(req, -EBADF);
		return;
	}

	file->ref_count--;
	if (file->ref_count > 0) {
		/* Other openers remain - nothing further to do. */
		pthread_spin_unlock(&file->lock);
		req->args.fn.file_op(req->args.arg, 0);
		free_fs_request(req);
		return;
	}

	pthread_spin_unlock(&file->lock);

	/* Last reference: detach and close the blob asynchronously. */
	blob = file->blob;
	file->blob = NULL;
	spdk_blob_close(blob, __file_close_async_done, req);
}
2862 
2863 static void
2864 __file_close_async__sync_done(void *arg, int fserrno)
2865 {
2866 	struct spdk_fs_request *req = arg;
2867 	struct spdk_fs_cb_args *args = &req->args;
2868 
2869 	__file_close_async(args->file, req);
2870 }
2871 
2872 void
2873 spdk_file_close_async(struct spdk_file *file, spdk_file_op_complete cb_fn, void *cb_arg)
2874 {
2875 	struct spdk_fs_request *req;
2876 	struct spdk_fs_cb_args *args;
2877 
2878 	req = alloc_fs_request(file->fs->md_target.md_fs_channel);
2879 	if (req == NULL) {
2880 		SPDK_ERRLOG("Cannot allocate close async req for file=%s\n", file->name);
2881 		cb_fn(cb_arg, -ENOMEM);
2882 		return;
2883 	}
2884 
2885 	args = &req->args;
2886 	args->file = file;
2887 	args->fn.file_op = cb_fn;
2888 	args->arg = cb_arg;
2889 
2890 	spdk_file_sync_async(file, file->fs->md_target.md_io_channel, __file_close_async__sync_done, req);
2891 }
2892 
2893 static void
2894 __file_close(void *arg)
2895 {
2896 	struct spdk_fs_request *req = arg;
2897 	struct spdk_fs_cb_args *args = &req->args;
2898 	struct spdk_file *file = args->file;
2899 
2900 	__file_close_async(file, req);
2901 }
2902 
/* Synchronously close a file: sync it, then release the reference and block
 * until the close completes.
 * \return 0 on success, negative errno on failure. */
int
spdk_file_close(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx)
{
	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	req = alloc_fs_request(channel);
	if (req == NULL) {
		SPDK_ERRLOG("Cannot allocate close req for file=%s\n", file->name);
		return -ENOMEM;
	}

	args = &req->args;

	/* NOTE(review): the sync's return value is ignored here - the close
	 * proceeds even if the sync failed; confirm this is intended. */
	spdk_file_sync(file, ctx);
	BLOBFS_TRACE(file, "name=%s\n", file->name);
	args->file = file;
	args->sem = &channel->sem;
	args->fn.file_op = __wake_caller;
	args->arg = args;
	channel->send_request(__file_close, req);
	sem_wait(&channel->sem);

	return args->rc;
}
2929 
/* Copy the file's unique identifier (its blob ID) into \p id.
 * \param size Size of the \p id buffer; must be at least sizeof(spdk_blob_id).
 * \return Number of bytes written on success, -EINVAL if the buffer is too small. */
int
spdk_file_get_id(struct spdk_file *file, void *id, size_t size)
{
	if (size < sizeof(spdk_blob_id)) {
		return -EINVAL;
	}

	memcpy(id, &file->blobid, sizeof(spdk_blob_id));

	return sizeof(spdk_blob_id);
}
2941 
static void
_file_free(void *ctx)
{
	/* Runs on the cache pool thread: remove the file from the global
	 * cache list there (the list is only manipulated on that thread),
	 * then release its memory. */
	struct spdk_file *file = ctx;

	TAILQ_REMOVE(&g_caches, file, cache_tailq);

	free(file->name);
	free(file->tree);
	free(file);
}
2953 
/* Free a file object.  If it holds no cache buffers it is released directly;
 * otherwise its buffers are freed and the final teardown is deferred to the
 * cache pool thread, which also removes the file from the cache list. */
static void
file_free(struct spdk_file *file)
{
	BLOBFS_TRACE(file, "free=%s\n", file->name);
	pthread_spin_lock(&file->lock);
	if (file->tree->present_mask == 0) {
		/* No cached buffers - the file is not on the cache pool
		 * thread's list, so it can be freed right here. */
		pthread_spin_unlock(&file->lock);
		free(file->name);
		free(file->tree);
		free(file);
		return;
	}

	tree_free_buffers(file->tree);
	assert(file->tree->present_mask == 0);
	spdk_thread_send_msg(g_cache_pool_thread, _file_free, file);
	pthread_spin_unlock(&file->lock);
}
2972 
/* Register the "blobfs" (metadata) and "blobfs_rw" (data path) debug log flags. */
SPDK_LOG_REGISTER_COMPONENT(blobfs)
SPDK_LOG_REGISTER_COMPONENT(blobfs_rw)
2975