xref: /spdk/lib/blobfs/blobfs.c (revision 5fc0475c140b86802ab8759a43845dcc34e7329d)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 
36 #include "spdk/blobfs.h"
37 #include "tree.h"
38 
39 #include "spdk/queue.h"
40 #include "spdk/thread.h"
41 #include "spdk/assert.h"
42 #include "spdk/env.h"
43 #include "spdk/util.h"
44 #include "spdk/log.h"
45 #include "spdk/trace.h"
46 
/* Debug-log helper for the "blobfs" log flag; every message is prefixed with the file's name. */
#define BLOBFS_TRACE(file, str, ...) \
	SPDK_DEBUGLOG(blobfs, "file=%s " str, file->name, ##__VA_ARGS__)

/* Same as BLOBFS_TRACE, but emitted under the separate "blobfs_rw" log flag. */
#define BLOBFS_TRACE_RW(file, str, ...) \
	SPDK_DEBUGLOG(blobfs_rw, "file=%s " str, file->name, ##__VA_ARGS__)

/* Default total size in bytes of the global cache buffer pool (4 GiB). */
#define BLOBFS_DEFAULT_CACHE_SIZE (4ULL * 1024 * 1024 * 1024)
/* Default blobstore cluster size in bytes (1 MiB) written by spdk_fs_opts_init(). */
#define SPDK_BLOBFS_DEFAULT_OPTS_CLUSTER_SZ (1024 * 1024)

/* Blobstore type string that marks a blobstore as holding a blobfs filesystem. */
#define SPDK_BLOBFS_SIGNATURE	"BLOBFS"
57 
58 static uint64_t g_fs_cache_size = BLOBFS_DEFAULT_CACHE_SIZE;
59 static struct spdk_mempool *g_cache_pool;
60 static TAILQ_HEAD(, spdk_file) g_caches = TAILQ_HEAD_INITIALIZER(g_caches);
61 static struct spdk_poller *g_cache_pool_mgmt_poller;
62 static struct spdk_thread *g_cache_pool_thread;
63 #define BLOBFS_CACHE_POOL_POLL_PERIOD_IN_US 1000ULL
64 static int g_fs_count = 0;
65 static pthread_mutex_t g_cache_init_lock = PTHREAD_MUTEX_INITIALIZER;
66 
/* Trace group owned by blobfs and the tracepoint ids registered within it. */
#define TRACE_GROUP_BLOBFS		0x7
#define TRACE_BLOBFS_XATTR_START	SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x0)
#define TRACE_BLOBFS_XATTR_END		SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x1)
#define TRACE_BLOBFS_OPEN		SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x2)
#define TRACE_BLOBFS_CLOSE		SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x3)
#define TRACE_BLOBFS_DELETE_START	SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x4)
#define TRACE_BLOBFS_DELETE_DONE	SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x5)
74 
75 SPDK_TRACE_REGISTER_FN(blobfs_trace, "blobfs", TRACE_GROUP_BLOBFS)
76 {
77 	spdk_trace_register_description("BLOBFS_XATTR_START",
78 					TRACE_BLOBFS_XATTR_START,
79 					OWNER_NONE, OBJECT_NONE, 0,
80 					SPDK_TRACE_ARG_TYPE_STR,
81 					"file");
82 	spdk_trace_register_description("BLOBFS_XATTR_END",
83 					TRACE_BLOBFS_XATTR_END,
84 					OWNER_NONE, OBJECT_NONE, 0,
85 					SPDK_TRACE_ARG_TYPE_STR,
86 					"file");
87 	spdk_trace_register_description("BLOBFS_OPEN",
88 					TRACE_BLOBFS_OPEN,
89 					OWNER_NONE, OBJECT_NONE, 0,
90 					SPDK_TRACE_ARG_TYPE_STR,
91 					"file");
92 	spdk_trace_register_description("BLOBFS_CLOSE",
93 					TRACE_BLOBFS_CLOSE,
94 					OWNER_NONE, OBJECT_NONE, 0,
95 					SPDK_TRACE_ARG_TYPE_STR,
96 					"file");
97 	spdk_trace_register_description("BLOBFS_DELETE_START",
98 					TRACE_BLOBFS_DELETE_START,
99 					OWNER_NONE, OBJECT_NONE, 0,
100 					SPDK_TRACE_ARG_TYPE_STR,
101 					"file");
102 	spdk_trace_register_description("BLOBFS_DELETE_DONE",
103 					TRACE_BLOBFS_DELETE_DONE,
104 					OWNER_NONE, OBJECT_NONE, 0,
105 					SPDK_TRACE_ARG_TYPE_STR,
106 					"file");
107 }
108 
109 void
110 cache_buffer_free(struct cache_buffer *cache_buffer)
111 {
112 	spdk_mempool_put(g_cache_pool, cache_buffer->buf);
113 	free(cache_buffer);
114 }
115 
116 #define CACHE_READAHEAD_THRESHOLD	(128 * 1024)
117 
118 struct spdk_file {
119 	struct spdk_filesystem	*fs;
120 	struct spdk_blob	*blob;
121 	char			*name;
122 	uint64_t		trace_arg_name;
123 	uint64_t		length;
124 	bool                    is_deleted;
125 	bool			open_for_writing;
126 	uint64_t		length_flushed;
127 	uint64_t		length_xattr;
128 	uint64_t		append_pos;
129 	uint64_t		seq_byte_count;
130 	uint64_t		next_seq_offset;
131 	uint32_t		priority;
132 	TAILQ_ENTRY(spdk_file)	tailq;
133 	spdk_blob_id		blobid;
134 	uint32_t		ref_count;
135 	pthread_spinlock_t	lock;
136 	struct cache_buffer	*last;
137 	struct cache_tree	*tree;
138 	TAILQ_HEAD(open_requests_head, spdk_fs_request) open_requests;
139 	TAILQ_HEAD(sync_requests_head, spdk_fs_request) sync_requests;
140 	TAILQ_ENTRY(spdk_file)	cache_tailq;
141 };
142 
143 struct spdk_deleted_file {
144 	spdk_blob_id	id;
145 	TAILQ_ENTRY(spdk_deleted_file)	tailq;
146 };
147 
148 struct spdk_filesystem {
149 	struct spdk_blob_store	*bs;
150 	TAILQ_HEAD(, spdk_file)	files;
151 	struct spdk_bs_opts	bs_opts;
152 	struct spdk_bs_dev	*bdev;
153 	fs_send_request_fn	send_request;
154 
155 	struct {
156 		uint32_t		max_ops;
157 		struct spdk_io_channel	*sync_io_channel;
158 		struct spdk_fs_channel	*sync_fs_channel;
159 	} sync_target;
160 
161 	struct {
162 		uint32_t		max_ops;
163 		struct spdk_io_channel	*md_io_channel;
164 		struct spdk_fs_channel	*md_fs_channel;
165 	} md_target;
166 
167 	struct {
168 		uint32_t		max_ops;
169 	} io_target;
170 };
171 
172 struct spdk_fs_cb_args {
173 	union {
174 		spdk_fs_op_with_handle_complete		fs_op_with_handle;
175 		spdk_fs_op_complete			fs_op;
176 		spdk_file_op_with_handle_complete	file_op_with_handle;
177 		spdk_file_op_complete			file_op;
178 		spdk_file_stat_op_complete		stat_op;
179 	} fn;
180 	void *arg;
181 	sem_t *sem;
182 	struct spdk_filesystem *fs;
183 	struct spdk_file *file;
184 	int rc;
185 	int *rwerrno;
186 	struct iovec *iovs;
187 	uint32_t iovcnt;
188 	struct iovec iov;
189 	union {
190 		struct {
191 			TAILQ_HEAD(, spdk_deleted_file)	deleted_files;
192 		} fs_load;
193 		struct {
194 			uint64_t	length;
195 		} truncate;
196 		struct {
197 			struct spdk_io_channel	*channel;
198 			void		*pin_buf;
199 			int		is_read;
200 			off_t		offset;
201 			size_t		length;
202 			uint64_t	start_lba;
203 			uint64_t	num_lba;
204 			uint32_t	blocklen;
205 		} rw;
206 		struct {
207 			const char	*old_name;
208 			const char	*new_name;
209 		} rename;
210 		struct {
211 			struct cache_buffer	*cache_buffer;
212 			uint64_t		length;
213 		} flush;
214 		struct {
215 			struct cache_buffer	*cache_buffer;
216 			uint64_t		length;
217 			uint64_t		offset;
218 		} readahead;
219 		struct {
220 			/* offset of the file when the sync request was made */
221 			uint64_t			offset;
222 			TAILQ_ENTRY(spdk_fs_request)	tailq;
223 			bool				xattr_in_progress;
224 			/* length written to the xattr for this file - this should
225 			 * always be the same as the offset if only one thread is
226 			 * writing to the file, but could differ if multiple threads
227 			 * are appending
228 			 */
229 			uint64_t			length;
230 		} sync;
231 		struct {
232 			uint32_t			num_clusters;
233 		} resize;
234 		struct {
235 			const char	*name;
236 			uint32_t	flags;
237 			TAILQ_ENTRY(spdk_fs_request)	tailq;
238 		} open;
239 		struct {
240 			const char		*name;
241 			struct spdk_blob	*blob;
242 		} create;
243 		struct {
244 			const char	*name;
245 		} delete;
246 		struct {
247 			const char	*name;
248 		} stat;
249 	} op;
250 };
251 
252 static void file_free(struct spdk_file *file);
253 static void fs_io_device_unregister(struct spdk_filesystem *fs);
254 static void fs_free_io_channels(struct spdk_filesystem *fs);
255 
256 void
257 spdk_fs_opts_init(struct spdk_blobfs_opts *opts)
258 {
259 	opts->cluster_sz = SPDK_BLOBFS_DEFAULT_OPTS_CLUSTER_SZ;
260 }
261 
262 static int _blobfs_cache_pool_reclaim(void *arg);
263 
264 static bool
265 blobfs_cache_pool_need_reclaim(void)
266 {
267 	size_t count;
268 
269 	count = spdk_mempool_count(g_cache_pool);
270 	/* We define a aggressive policy here as the requirements from db_bench are batched, so start the poller
271 	 *  when the number of available cache buffer is less than 1/5 of total buffers.
272 	 */
273 	if (count > (size_t)g_fs_cache_size / CACHE_BUFFER_SIZE / 5) {
274 		return false;
275 	}
276 
277 	return true;
278 }
279 
280 static void
281 __start_cache_pool_mgmt(void *ctx)
282 {
283 	assert(g_cache_pool == NULL);
284 
285 	g_cache_pool = spdk_mempool_create("spdk_fs_cache",
286 					   g_fs_cache_size / CACHE_BUFFER_SIZE,
287 					   CACHE_BUFFER_SIZE,
288 					   SPDK_MEMPOOL_DEFAULT_CACHE_SIZE,
289 					   SPDK_ENV_SOCKET_ID_ANY);
290 	if (!g_cache_pool) {
291 		SPDK_ERRLOG("Create mempool failed, you may "
292 			    "increase the memory and try again\n");
293 		assert(false);
294 	}
295 
296 	assert(g_cache_pool_mgmt_poller == NULL);
297 	g_cache_pool_mgmt_poller = SPDK_POLLER_REGISTER(_blobfs_cache_pool_reclaim, NULL,
298 				   BLOBFS_CACHE_POOL_POLL_PERIOD_IN_US);
299 }
300 
301 static void
302 __stop_cache_pool_mgmt(void *ctx)
303 {
304 	spdk_poller_unregister(&g_cache_pool_mgmt_poller);
305 
306 	assert(g_cache_pool != NULL);
307 	assert(spdk_mempool_count(g_cache_pool) == g_fs_cache_size / CACHE_BUFFER_SIZE);
308 	spdk_mempool_free(g_cache_pool);
309 	g_cache_pool = NULL;
310 
311 	spdk_thread_exit(g_cache_pool_thread);
312 }
313 
314 static void
315 initialize_global_cache(void)
316 {
317 	pthread_mutex_lock(&g_cache_init_lock);
318 	if (g_fs_count == 0) {
319 		g_cache_pool_thread = spdk_thread_create("cache_pool_mgmt", NULL);
320 		assert(g_cache_pool_thread != NULL);
321 		spdk_thread_send_msg(g_cache_pool_thread, __start_cache_pool_mgmt, NULL);
322 	}
323 	g_fs_count++;
324 	pthread_mutex_unlock(&g_cache_init_lock);
325 }
326 
327 static void
328 free_global_cache(void)
329 {
330 	pthread_mutex_lock(&g_cache_init_lock);
331 	g_fs_count--;
332 	if (g_fs_count == 0) {
333 		spdk_thread_send_msg(g_cache_pool_thread, __stop_cache_pool_mgmt, NULL);
334 	}
335 	pthread_mutex_unlock(&g_cache_init_lock);
336 }
337 
338 static uint64_t
339 __file_get_blob_size(struct spdk_file *file)
340 {
341 	uint64_t cluster_sz;
342 
343 	cluster_sz = file->fs->bs_opts.cluster_sz;
344 	return cluster_sz * spdk_blob_get_num_clusters(file->blob);
345 }
346 
347 struct spdk_fs_request {
348 	struct spdk_fs_cb_args		args;
349 	TAILQ_ENTRY(spdk_fs_request)	link;
350 	struct spdk_fs_channel		*channel;
351 };
352 
353 struct spdk_fs_channel {
354 	struct spdk_fs_request		*req_mem;
355 	TAILQ_HEAD(, spdk_fs_request)	reqs;
356 	sem_t				sem;
357 	struct spdk_filesystem		*fs;
358 	struct spdk_io_channel		*bs_channel;
359 	fs_send_request_fn		send_request;
360 	bool				sync;
361 	uint32_t			outstanding_reqs;
362 	pthread_spinlock_t		lock;
363 };
364 
365 /* For now, this is effectively an alias. But eventually we'll shift
366  * some data members over. */
367 struct spdk_fs_thread_ctx {
368 	struct spdk_fs_channel	ch;
369 };
370 
371 static struct spdk_fs_request *
372 alloc_fs_request_with_iov(struct spdk_fs_channel *channel, uint32_t iovcnt)
373 {
374 	struct spdk_fs_request *req;
375 	struct iovec *iovs = NULL;
376 
377 	if (iovcnt > 1) {
378 		iovs = calloc(iovcnt, sizeof(struct iovec));
379 		if (!iovs) {
380 			return NULL;
381 		}
382 	}
383 
384 	if (channel->sync) {
385 		pthread_spin_lock(&channel->lock);
386 	}
387 
388 	req = TAILQ_FIRST(&channel->reqs);
389 	if (req) {
390 		channel->outstanding_reqs++;
391 		TAILQ_REMOVE(&channel->reqs, req, link);
392 	}
393 
394 	if (channel->sync) {
395 		pthread_spin_unlock(&channel->lock);
396 	}
397 
398 	if (req == NULL) {
399 		SPDK_ERRLOG("Cannot allocate req on spdk_fs_channel =%p\n", channel);
400 		free(iovs);
401 		return NULL;
402 	}
403 	memset(req, 0, sizeof(*req));
404 	req->channel = channel;
405 	if (iovcnt > 1) {
406 		req->args.iovs = iovs;
407 	} else {
408 		req->args.iovs = &req->args.iov;
409 	}
410 	req->args.iovcnt = iovcnt;
411 
412 	return req;
413 }
414 
/* Take a request that needs no I/O vector from the channel; NULL if none is free. */
static struct spdk_fs_request *
alloc_fs_request(struct spdk_fs_channel *channel)
{
	const uint32_t no_iovs = 0;

	return alloc_fs_request_with_iov(channel, no_iovs);
}
420 
421 static void
422 free_fs_request(struct spdk_fs_request *req)
423 {
424 	struct spdk_fs_channel *channel = req->channel;
425 
426 	if (req->args.iovcnt > 1) {
427 		free(req->args.iovs);
428 	}
429 
430 	if (channel->sync) {
431 		pthread_spin_lock(&channel->lock);
432 	}
433 
434 	TAILQ_INSERT_HEAD(&req->channel->reqs, req, link);
435 	channel->outstanding_reqs--;
436 
437 	if (channel->sync) {
438 		pthread_spin_unlock(&channel->lock);
439 	}
440 }
441 
442 static int
443 fs_channel_create(struct spdk_filesystem *fs, struct spdk_fs_channel *channel,
444 		  uint32_t max_ops)
445 {
446 	uint32_t i;
447 
448 	channel->req_mem = calloc(max_ops, sizeof(struct spdk_fs_request));
449 	if (!channel->req_mem) {
450 		return -1;
451 	}
452 
453 	channel->outstanding_reqs = 0;
454 	TAILQ_INIT(&channel->reqs);
455 	sem_init(&channel->sem, 0, 0);
456 
457 	for (i = 0; i < max_ops; i++) {
458 		TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link);
459 	}
460 
461 	channel->fs = fs;
462 
463 	return 0;
464 }
465 
466 static int
467 fs_md_channel_create(void *io_device, void *ctx_buf)
468 {
469 	struct spdk_filesystem		*fs;
470 	struct spdk_fs_channel		*channel = ctx_buf;
471 
472 	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, md_target);
473 
474 	return fs_channel_create(fs, channel, fs->md_target.max_ops);
475 }
476 
477 static int
478 fs_sync_channel_create(void *io_device, void *ctx_buf)
479 {
480 	struct spdk_filesystem		*fs;
481 	struct spdk_fs_channel		*channel = ctx_buf;
482 
483 	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, sync_target);
484 
485 	return fs_channel_create(fs, channel, fs->sync_target.max_ops);
486 }
487 
488 static int
489 fs_io_channel_create(void *io_device, void *ctx_buf)
490 {
491 	struct spdk_filesystem		*fs;
492 	struct spdk_fs_channel		*channel = ctx_buf;
493 
494 	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, io_target);
495 
496 	return fs_channel_create(fs, channel, fs->io_target.max_ops);
497 }
498 
499 static void
500 fs_channel_destroy(void *io_device, void *ctx_buf)
501 {
502 	struct spdk_fs_channel *channel = ctx_buf;
503 
504 	if (channel->outstanding_reqs > 0) {
505 		SPDK_ERRLOG("channel freed with %" PRIu32 " outstanding requests!\n",
506 			    channel->outstanding_reqs);
507 	}
508 
509 	free(channel->req_mem);
510 	if (channel->bs_channel != NULL) {
511 		spdk_bs_free_io_channel(channel->bs_channel);
512 	}
513 }
514 
515 static void
516 __send_request_direct(fs_request_fn fn, void *arg)
517 {
518 	fn(arg);
519 }
520 
521 static void
522 common_fs_bs_init(struct spdk_filesystem *fs, struct spdk_blob_store *bs)
523 {
524 	fs->bs = bs;
525 	fs->bs_opts.cluster_sz = spdk_bs_get_cluster_size(bs);
526 	fs->md_target.md_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
527 	fs->md_target.md_fs_channel->send_request = __send_request_direct;
528 	fs->sync_target.sync_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
529 	fs->sync_target.sync_fs_channel->send_request = __send_request_direct;
530 
531 	initialize_global_cache();
532 }
533 
534 static void
535 init_cb(void *ctx, struct spdk_blob_store *bs, int bserrno)
536 {
537 	struct spdk_fs_request *req = ctx;
538 	struct spdk_fs_cb_args *args = &req->args;
539 	struct spdk_filesystem *fs = args->fs;
540 
541 	if (bserrno == 0) {
542 		common_fs_bs_init(fs, bs);
543 	} else {
544 		free(fs);
545 		fs = NULL;
546 	}
547 
548 	args->fn.fs_op_with_handle(args->arg, fs, bserrno);
549 	free_fs_request(req);
550 }
551 
552 static struct spdk_filesystem *
553 fs_alloc(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn)
554 {
555 	struct spdk_filesystem *fs;
556 
557 	fs = calloc(1, sizeof(*fs));
558 	if (fs == NULL) {
559 		return NULL;
560 	}
561 
562 	fs->bdev = dev;
563 	fs->send_request = send_request_fn;
564 	TAILQ_INIT(&fs->files);
565 
566 	fs->md_target.max_ops = 512;
567 	spdk_io_device_register(&fs->md_target, fs_md_channel_create, fs_channel_destroy,
568 				sizeof(struct spdk_fs_channel), "blobfs_md");
569 	fs->md_target.md_io_channel = spdk_get_io_channel(&fs->md_target);
570 	fs->md_target.md_fs_channel = spdk_io_channel_get_ctx(fs->md_target.md_io_channel);
571 
572 	fs->sync_target.max_ops = 512;
573 	spdk_io_device_register(&fs->sync_target, fs_sync_channel_create, fs_channel_destroy,
574 				sizeof(struct spdk_fs_channel), "blobfs_sync");
575 	fs->sync_target.sync_io_channel = spdk_get_io_channel(&fs->sync_target);
576 	fs->sync_target.sync_fs_channel = spdk_io_channel_get_ctx(fs->sync_target.sync_io_channel);
577 
578 	fs->io_target.max_ops = 512;
579 	spdk_io_device_register(&fs->io_target, fs_io_channel_create, fs_channel_destroy,
580 				sizeof(struct spdk_fs_channel), "blobfs_io");
581 
582 	return fs;
583 }
584 
585 static void
586 __wake_caller(void *arg, int fserrno)
587 {
588 	struct spdk_fs_cb_args *args = arg;
589 
590 	if ((args->rwerrno != NULL) && (*(args->rwerrno) == 0) && fserrno) {
591 		*(args->rwerrno) = fserrno;
592 	}
593 	args->rc = fserrno;
594 	sem_post(args->sem);
595 }
596 
597 void
598 spdk_fs_init(struct spdk_bs_dev *dev, struct spdk_blobfs_opts *opt,
599 	     fs_send_request_fn send_request_fn,
600 	     spdk_fs_op_with_handle_complete cb_fn, void *cb_arg)
601 {
602 	struct spdk_filesystem *fs;
603 	struct spdk_fs_request *req;
604 	struct spdk_fs_cb_args *args;
605 	struct spdk_bs_opts opts = {};
606 
607 	fs = fs_alloc(dev, send_request_fn);
608 	if (fs == NULL) {
609 		cb_fn(cb_arg, NULL, -ENOMEM);
610 		return;
611 	}
612 
613 	req = alloc_fs_request(fs->md_target.md_fs_channel);
614 	if (req == NULL) {
615 		fs_free_io_channels(fs);
616 		fs_io_device_unregister(fs);
617 		cb_fn(cb_arg, NULL, -ENOMEM);
618 		return;
619 	}
620 
621 	args = &req->args;
622 	args->fn.fs_op_with_handle = cb_fn;
623 	args->arg = cb_arg;
624 	args->fs = fs;
625 
626 	spdk_bs_opts_init(&opts, sizeof(opts));
627 	snprintf(opts.bstype.bstype, sizeof(opts.bstype.bstype), SPDK_BLOBFS_SIGNATURE);
628 	if (opt) {
629 		opts.cluster_sz = opt->cluster_sz;
630 	}
631 	spdk_bs_init(dev, &opts, init_cb, req);
632 }
633 
634 static struct spdk_file *
635 file_alloc(struct spdk_filesystem *fs)
636 {
637 	struct spdk_file *file;
638 
639 	file = calloc(1, sizeof(*file));
640 	if (file == NULL) {
641 		return NULL;
642 	}
643 
644 	file->tree = calloc(1, sizeof(*file->tree));
645 	if (file->tree == NULL) {
646 		free(file);
647 		return NULL;
648 	}
649 
650 	if (pthread_spin_init(&file->lock, 0)) {
651 		free(file->tree);
652 		free(file);
653 		return NULL;
654 	}
655 
656 	file->fs = fs;
657 	TAILQ_INIT(&file->open_requests);
658 	TAILQ_INIT(&file->sync_requests);
659 	TAILQ_INSERT_TAIL(&fs->files, file, tailq);
660 	file->priority = SPDK_FILE_PRIORITY_LOW;
661 	return file;
662 }
663 
664 static void fs_load_done(void *ctx, int bserrno);
665 
666 static int
667 _handle_deleted_files(struct spdk_fs_request *req)
668 {
669 	struct spdk_fs_cb_args *args = &req->args;
670 	struct spdk_filesystem *fs = args->fs;
671 
672 	if (!TAILQ_EMPTY(&args->op.fs_load.deleted_files)) {
673 		struct spdk_deleted_file *deleted_file;
674 
675 		deleted_file = TAILQ_FIRST(&args->op.fs_load.deleted_files);
676 		TAILQ_REMOVE(&args->op.fs_load.deleted_files, deleted_file, tailq);
677 		spdk_bs_delete_blob(fs->bs, deleted_file->id, fs_load_done, req);
678 		free(deleted_file);
679 		return 0;
680 	}
681 
682 	return 1;
683 }
684 
685 static void
686 fs_load_done(void *ctx, int bserrno)
687 {
688 	struct spdk_fs_request *req = ctx;
689 	struct spdk_fs_cb_args *args = &req->args;
690 	struct spdk_filesystem *fs = args->fs;
691 
692 	/* The filesystem has been loaded.  Now check if there are any files that
693 	 *  were marked for deletion before last unload.  Do not complete the
694 	 *  fs_load callback until all of them have been deleted on disk.
695 	 */
696 	if (_handle_deleted_files(req) == 0) {
697 		/* We found a file that's been marked for deleting but not actually
698 		 *  deleted yet.  This function will get called again once the delete
699 		 *  operation is completed.
700 		 */
701 		return;
702 	}
703 
704 	args->fn.fs_op_with_handle(args->arg, fs, 0);
705 	free_fs_request(req);
706 
707 }
708 
709 static void
710 _file_build_trace_arg_name(struct spdk_file *f)
711 {
712 	f->trace_arg_name = 0;
713 	memcpy(&f->trace_arg_name, f->name,
714 	       spdk_min(sizeof(f->trace_arg_name), strlen(f->name)));
715 }
716 
717 static void
718 iter_cb(void *ctx, struct spdk_blob *blob, int rc)
719 {
720 	struct spdk_fs_request *req = ctx;
721 	struct spdk_fs_cb_args *args = &req->args;
722 	struct spdk_filesystem *fs = args->fs;
723 	uint64_t *length;
724 	const char *name;
725 	uint32_t *is_deleted;
726 	size_t value_len;
727 
728 	if (rc < 0) {
729 		args->fn.fs_op_with_handle(args->arg, fs, rc);
730 		free_fs_request(req);
731 		return;
732 	}
733 
734 	rc = spdk_blob_get_xattr_value(blob, "name", (const void **)&name, &value_len);
735 	if (rc < 0) {
736 		args->fn.fs_op_with_handle(args->arg, fs, rc);
737 		free_fs_request(req);
738 		return;
739 	}
740 
741 	rc = spdk_blob_get_xattr_value(blob, "length", (const void **)&length, &value_len);
742 	if (rc < 0) {
743 		args->fn.fs_op_with_handle(args->arg, fs, rc);
744 		free_fs_request(req);
745 		return;
746 	}
747 
748 	assert(value_len == 8);
749 
750 	/* This file could be deleted last time without close it, then app crashed, so we delete it now */
751 	rc = spdk_blob_get_xattr_value(blob, "is_deleted", (const void **)&is_deleted, &value_len);
752 	if (rc < 0) {
753 		struct spdk_file *f;
754 
755 		f = file_alloc(fs);
756 		if (f == NULL) {
757 			SPDK_ERRLOG("Cannot allocate file to handle deleted file on disk\n");
758 			args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM);
759 			free_fs_request(req);
760 			return;
761 		}
762 
763 		f->name = strdup(name);
764 		_file_build_trace_arg_name(f);
765 		f->blobid = spdk_blob_get_id(blob);
766 		f->length = *length;
767 		f->length_flushed = *length;
768 		f->length_xattr = *length;
769 		f->append_pos = *length;
770 		SPDK_DEBUGLOG(blobfs, "added file %s length=%ju\n", f->name, f->length);
771 	} else {
772 		struct spdk_deleted_file *deleted_file;
773 
774 		deleted_file = calloc(1, sizeof(*deleted_file));
775 		if (deleted_file == NULL) {
776 			args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM);
777 			free_fs_request(req);
778 			return;
779 		}
780 		deleted_file->id = spdk_blob_get_id(blob);
781 		TAILQ_INSERT_TAIL(&args->op.fs_load.deleted_files, deleted_file, tailq);
782 	}
783 }
784 
785 static void
786 load_cb(void *ctx, struct spdk_blob_store *bs, int bserrno)
787 {
788 	struct spdk_fs_request *req = ctx;
789 	struct spdk_fs_cb_args *args = &req->args;
790 	struct spdk_filesystem *fs = args->fs;
791 	struct spdk_bs_type bstype;
792 	static const struct spdk_bs_type blobfs_type = {SPDK_BLOBFS_SIGNATURE};
793 	static const struct spdk_bs_type zeros;
794 
795 	if (bserrno != 0) {
796 		args->fn.fs_op_with_handle(args->arg, NULL, bserrno);
797 		free_fs_request(req);
798 		fs_free_io_channels(fs);
799 		fs_io_device_unregister(fs);
800 		return;
801 	}
802 
803 	bstype = spdk_bs_get_bstype(bs);
804 
805 	if (!memcmp(&bstype, &zeros, sizeof(bstype))) {
806 		SPDK_DEBUGLOG(blobfs, "assigning bstype\n");
807 		spdk_bs_set_bstype(bs, blobfs_type);
808 	} else if (memcmp(&bstype, &blobfs_type, sizeof(bstype))) {
809 		SPDK_ERRLOG("not blobfs\n");
810 		SPDK_LOGDUMP(blobfs, "bstype", &bstype, sizeof(bstype));
811 		args->fn.fs_op_with_handle(args->arg, NULL, -EINVAL);
812 		free_fs_request(req);
813 		fs_free_io_channels(fs);
814 		fs_io_device_unregister(fs);
815 		return;
816 	}
817 
818 	common_fs_bs_init(fs, bs);
819 	fs_load_done(req, 0);
820 }
821 
822 static void
823 fs_io_device_unregister(struct spdk_filesystem *fs)
824 {
825 	assert(fs != NULL);
826 	spdk_io_device_unregister(&fs->md_target, NULL);
827 	spdk_io_device_unregister(&fs->sync_target, NULL);
828 	spdk_io_device_unregister(&fs->io_target, NULL);
829 	free(fs);
830 }
831 
832 static void
833 fs_free_io_channels(struct spdk_filesystem *fs)
834 {
835 	assert(fs != NULL);
836 	spdk_fs_free_io_channel(fs->md_target.md_io_channel);
837 	spdk_fs_free_io_channel(fs->sync_target.sync_io_channel);
838 }
839 
840 void
841 spdk_fs_load(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn,
842 	     spdk_fs_op_with_handle_complete cb_fn, void *cb_arg)
843 {
844 	struct spdk_filesystem *fs;
845 	struct spdk_fs_cb_args *args;
846 	struct spdk_fs_request *req;
847 	struct spdk_bs_opts	bs_opts;
848 
849 	fs = fs_alloc(dev, send_request_fn);
850 	if (fs == NULL) {
851 		cb_fn(cb_arg, NULL, -ENOMEM);
852 		return;
853 	}
854 
855 	req = alloc_fs_request(fs->md_target.md_fs_channel);
856 	if (req == NULL) {
857 		fs_free_io_channels(fs);
858 		fs_io_device_unregister(fs);
859 		cb_fn(cb_arg, NULL, -ENOMEM);
860 		return;
861 	}
862 
863 	args = &req->args;
864 	args->fn.fs_op_with_handle = cb_fn;
865 	args->arg = cb_arg;
866 	args->fs = fs;
867 	TAILQ_INIT(&args->op.fs_load.deleted_files);
868 	spdk_bs_opts_init(&bs_opts, sizeof(bs_opts));
869 	bs_opts.iter_cb_fn = iter_cb;
870 	bs_opts.iter_cb_arg = req;
871 	spdk_bs_load(dev, &bs_opts, load_cb, req);
872 }
873 
874 static void
875 unload_cb(void *ctx, int bserrno)
876 {
877 	struct spdk_fs_request *req = ctx;
878 	struct spdk_fs_cb_args *args = &req->args;
879 	struct spdk_filesystem *fs = args->fs;
880 	struct spdk_file *file, *tmp;
881 
882 	TAILQ_FOREACH_SAFE(file, &fs->files, tailq, tmp) {
883 		TAILQ_REMOVE(&fs->files, file, tailq);
884 		file_free(file);
885 	}
886 
887 	free_global_cache();
888 
889 	args->fn.fs_op(args->arg, bserrno);
890 	free(req);
891 
892 	fs_io_device_unregister(fs);
893 }
894 
895 void
896 spdk_fs_unload(struct spdk_filesystem *fs, spdk_fs_op_complete cb_fn, void *cb_arg)
897 {
898 	struct spdk_fs_request *req;
899 	struct spdk_fs_cb_args *args;
900 
901 	/*
902 	 * We must free the md_channel before unloading the blobstore, so just
903 	 *  allocate this request from the general heap.
904 	 */
905 	req = calloc(1, sizeof(*req));
906 	if (req == NULL) {
907 		cb_fn(cb_arg, -ENOMEM);
908 		return;
909 	}
910 
911 	args = &req->args;
912 	args->fn.fs_op = cb_fn;
913 	args->arg = cb_arg;
914 	args->fs = fs;
915 
916 	fs_free_io_channels(fs);
917 	spdk_bs_unload(fs->bs, unload_cb, req);
918 }
919 
920 static struct spdk_file *
921 fs_find_file(struct spdk_filesystem *fs, const char *name)
922 {
923 	struct spdk_file *file;
924 
925 	TAILQ_FOREACH(file, &fs->files, tailq) {
926 		if (!strncmp(name, file->name, SPDK_FILE_NAME_MAX)) {
927 			return file;
928 		}
929 	}
930 
931 	return NULL;
932 }
933 
934 void
935 spdk_fs_file_stat_async(struct spdk_filesystem *fs, const char *name,
936 			spdk_file_stat_op_complete cb_fn, void *cb_arg)
937 {
938 	struct spdk_file_stat stat;
939 	struct spdk_file *f = NULL;
940 
941 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
942 		cb_fn(cb_arg, NULL, -ENAMETOOLONG);
943 		return;
944 	}
945 
946 	f = fs_find_file(fs, name);
947 	if (f != NULL) {
948 		stat.blobid = f->blobid;
949 		stat.size = f->append_pos >= f->length ? f->append_pos : f->length;
950 		cb_fn(cb_arg, &stat, 0);
951 		return;
952 	}
953 
954 	cb_fn(cb_arg, NULL, -ENOENT);
955 }
956 
957 static void
958 __copy_stat(void *arg, struct spdk_file_stat *stat, int fserrno)
959 {
960 	struct spdk_fs_request *req = arg;
961 	struct spdk_fs_cb_args *args = &req->args;
962 
963 	args->rc = fserrno;
964 	if (fserrno == 0) {
965 		memcpy(args->arg, stat, sizeof(*stat));
966 	}
967 	sem_post(args->sem);
968 }
969 
970 static void
971 __file_stat(void *arg)
972 {
973 	struct spdk_fs_request *req = arg;
974 	struct spdk_fs_cb_args *args = &req->args;
975 
976 	spdk_fs_file_stat_async(args->fs, args->op.stat.name,
977 				args->fn.stat_op, req);
978 }
979 
980 int
981 spdk_fs_file_stat(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx,
982 		  const char *name, struct spdk_file_stat *stat)
983 {
984 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
985 	struct spdk_fs_request *req;
986 	int rc;
987 
988 	req = alloc_fs_request(channel);
989 	if (req == NULL) {
990 		SPDK_ERRLOG("Cannot allocate stat req on file=%s\n", name);
991 		return -ENOMEM;
992 	}
993 
994 	req->args.fs = fs;
995 	req->args.op.stat.name = name;
996 	req->args.fn.stat_op = __copy_stat;
997 	req->args.arg = stat;
998 	req->args.sem = &channel->sem;
999 	channel->send_request(__file_stat, req);
1000 	sem_wait(&channel->sem);
1001 
1002 	rc = req->args.rc;
1003 	free_fs_request(req);
1004 
1005 	return rc;
1006 }
1007 
1008 static void
1009 fs_create_blob_close_cb(void *ctx, int bserrno)
1010 {
1011 	int rc;
1012 	struct spdk_fs_request *req = ctx;
1013 	struct spdk_fs_cb_args *args = &req->args;
1014 
1015 	rc = args->rc ? args->rc : bserrno;
1016 	args->fn.file_op(args->arg, rc);
1017 	free_fs_request(req);
1018 }
1019 
1020 static void
1021 fs_create_blob_resize_cb(void *ctx, int bserrno)
1022 {
1023 	struct spdk_fs_request *req = ctx;
1024 	struct spdk_fs_cb_args *args = &req->args;
1025 	struct spdk_file *f = args->file;
1026 	struct spdk_blob *blob = args->op.create.blob;
1027 	uint64_t length = 0;
1028 
1029 	args->rc = bserrno;
1030 	if (bserrno) {
1031 		spdk_blob_close(blob, fs_create_blob_close_cb, args);
1032 		return;
1033 	}
1034 
1035 	spdk_blob_set_xattr(blob, "name", f->name, strlen(f->name) + 1);
1036 	spdk_blob_set_xattr(blob, "length", &length, sizeof(length));
1037 
1038 	spdk_blob_close(blob, fs_create_blob_close_cb, args);
1039 }
1040 
1041 static void
1042 fs_create_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno)
1043 {
1044 	struct spdk_fs_request *req = ctx;
1045 	struct spdk_fs_cb_args *args = &req->args;
1046 
1047 	if (bserrno) {
1048 		args->fn.file_op(args->arg, bserrno);
1049 		free_fs_request(req);
1050 		return;
1051 	}
1052 
1053 	args->op.create.blob = blob;
1054 	spdk_blob_resize(blob, 1, fs_create_blob_resize_cb, req);
1055 }
1056 
1057 static void
1058 fs_create_blob_create_cb(void *ctx, spdk_blob_id blobid, int bserrno)
1059 {
1060 	struct spdk_fs_request *req = ctx;
1061 	struct spdk_fs_cb_args *args = &req->args;
1062 	struct spdk_file *f = args->file;
1063 
1064 	if (bserrno) {
1065 		args->fn.file_op(args->arg, bserrno);
1066 		free_fs_request(req);
1067 		return;
1068 	}
1069 
1070 	f->blobid = blobid;
1071 	spdk_bs_open_blob(f->fs->bs, blobid, fs_create_blob_open_cb, req);
1072 }
1073 
1074 void
1075 spdk_fs_create_file_async(struct spdk_filesystem *fs, const char *name,
1076 			  spdk_file_op_complete cb_fn, void *cb_arg)
1077 {
1078 	struct spdk_file *file;
1079 	struct spdk_fs_request *req;
1080 	struct spdk_fs_cb_args *args;
1081 
1082 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
1083 		cb_fn(cb_arg, -ENAMETOOLONG);
1084 		return;
1085 	}
1086 
1087 	file = fs_find_file(fs, name);
1088 	if (file != NULL) {
1089 		cb_fn(cb_arg, -EEXIST);
1090 		return;
1091 	}
1092 
1093 	file = file_alloc(fs);
1094 	if (file == NULL) {
1095 		SPDK_ERRLOG("Cannot allocate new file for creation\n");
1096 		cb_fn(cb_arg, -ENOMEM);
1097 		return;
1098 	}
1099 
1100 	req = alloc_fs_request(fs->md_target.md_fs_channel);
1101 	if (req == NULL) {
1102 		SPDK_ERRLOG("Cannot allocate create async req for file=%s\n", name);
1103 		TAILQ_REMOVE(&fs->files, file, tailq);
1104 		file_free(file);
1105 		cb_fn(cb_arg, -ENOMEM);
1106 		return;
1107 	}
1108 
1109 	args = &req->args;
1110 	args->file = file;
1111 	args->fn.file_op = cb_fn;
1112 	args->arg = cb_arg;
1113 
1114 	file->name = strdup(name);
1115 	if (!file->name) {
1116 		SPDK_ERRLOG("Cannot allocate file->name for file=%s\n", name);
1117 		free_fs_request(req);
1118 		TAILQ_REMOVE(&fs->files, file, tailq);
1119 		file_free(file);
1120 		cb_fn(cb_arg, -ENOMEM);
1121 		return;
1122 	}
1123 	_file_build_trace_arg_name(file);
1124 	spdk_bs_create_blob(fs->bs, fs_create_blob_create_cb, args);
1125 }
1126 
1127 static void
1128 __fs_create_file_done(void *arg, int fserrno)
1129 {
1130 	struct spdk_fs_request *req = arg;
1131 	struct spdk_fs_cb_args *args = &req->args;
1132 
1133 	__wake_caller(args, fserrno);
1134 	SPDK_DEBUGLOG(blobfs, "file=%s\n", args->op.create.name);
1135 }
1136 
1137 static void
1138 __fs_create_file(void *arg)
1139 {
1140 	struct spdk_fs_request *req = arg;
1141 	struct spdk_fs_cb_args *args = &req->args;
1142 
1143 	SPDK_DEBUGLOG(blobfs, "file=%s\n", args->op.create.name);
1144 	spdk_fs_create_file_async(args->fs, args->op.create.name, __fs_create_file_done, req);
1145 }
1146 
1147 int
1148 spdk_fs_create_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx, const char *name)
1149 {
1150 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
1151 	struct spdk_fs_request *req;
1152 	struct spdk_fs_cb_args *args;
1153 	int rc;
1154 
1155 	SPDK_DEBUGLOG(blobfs, "file=%s\n", name);
1156 
1157 	req = alloc_fs_request(channel);
1158 	if (req == NULL) {
1159 		SPDK_ERRLOG("Cannot allocate req to create file=%s\n", name);
1160 		return -ENOMEM;
1161 	}
1162 
1163 	args = &req->args;
1164 	args->fs = fs;
1165 	args->op.create.name = name;
1166 	args->sem = &channel->sem;
1167 	fs->send_request(__fs_create_file, req);
1168 	sem_wait(&channel->sem);
1169 	rc = args->rc;
1170 	free_fs_request(req);
1171 
1172 	return rc;
1173 }
1174 
1175 static void
1176 fs_open_blob_done(void *ctx, struct spdk_blob *blob, int bserrno)
1177 {
1178 	struct spdk_fs_request *req = ctx;
1179 	struct spdk_fs_cb_args *args = &req->args;
1180 	struct spdk_file *f = args->file;
1181 
1182 	f->blob = blob;
1183 	while (!TAILQ_EMPTY(&f->open_requests)) {
1184 		req = TAILQ_FIRST(&f->open_requests);
1185 		args = &req->args;
1186 		TAILQ_REMOVE(&f->open_requests, req, args.op.open.tailq);
1187 		spdk_trace_record(TRACE_BLOBFS_OPEN, 0, 0, 0, f->trace_arg_name);
1188 		args->fn.file_op_with_handle(args->arg, f, bserrno);
1189 		free_fs_request(req);
1190 	}
1191 }
1192 
1193 static void
1194 fs_open_blob_create_cb(void *ctx, int bserrno)
1195 {
1196 	struct spdk_fs_request *req = ctx;
1197 	struct spdk_fs_cb_args *args = &req->args;
1198 	struct spdk_file *file = args->file;
1199 	struct spdk_filesystem *fs = args->fs;
1200 
1201 	if (file == NULL) {
1202 		/*
1203 		 * This is from an open with CREATE flag - the file
1204 		 *  is now created so look it up in the file list for this
1205 		 *  filesystem.
1206 		 */
1207 		file = fs_find_file(fs, args->op.open.name);
1208 		assert(file != NULL);
1209 		args->file = file;
1210 	}
1211 
1212 	file->ref_count++;
1213 	TAILQ_INSERT_TAIL(&file->open_requests, req, args.op.open.tailq);
1214 	if (file->ref_count == 1) {
1215 		assert(file->blob == NULL);
1216 		spdk_bs_open_blob(fs->bs, file->blobid, fs_open_blob_done, req);
1217 	} else if (file->blob != NULL) {
1218 		fs_open_blob_done(req, file->blob, 0);
1219 	} else {
1220 		/*
1221 		 * The blob open for this file is in progress due to a previous
1222 		 *  open request.  When that open completes, it will invoke the
1223 		 *  open callback for this request.
1224 		 */
1225 	}
1226 }
1227 
1228 void
1229 spdk_fs_open_file_async(struct spdk_filesystem *fs, const char *name, uint32_t flags,
1230 			spdk_file_op_with_handle_complete cb_fn, void *cb_arg)
1231 {
1232 	struct spdk_file *f = NULL;
1233 	struct spdk_fs_request *req;
1234 	struct spdk_fs_cb_args *args;
1235 
1236 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
1237 		cb_fn(cb_arg, NULL, -ENAMETOOLONG);
1238 		return;
1239 	}
1240 
1241 	f = fs_find_file(fs, name);
1242 	if (f == NULL && !(flags & SPDK_BLOBFS_OPEN_CREATE)) {
1243 		cb_fn(cb_arg, NULL, -ENOENT);
1244 		return;
1245 	}
1246 
1247 	if (f != NULL && f->is_deleted == true) {
1248 		cb_fn(cb_arg, NULL, -ENOENT);
1249 		return;
1250 	}
1251 
1252 	req = alloc_fs_request(fs->md_target.md_fs_channel);
1253 	if (req == NULL) {
1254 		SPDK_ERRLOG("Cannot allocate async open req for file=%s\n", name);
1255 		cb_fn(cb_arg, NULL, -ENOMEM);
1256 		return;
1257 	}
1258 
1259 	args = &req->args;
1260 	args->fn.file_op_with_handle = cb_fn;
1261 	args->arg = cb_arg;
1262 	args->file = f;
1263 	args->fs = fs;
1264 	args->op.open.name = name;
1265 
1266 	if (f == NULL) {
1267 		spdk_fs_create_file_async(fs, name, fs_open_blob_create_cb, req);
1268 	} else {
1269 		fs_open_blob_create_cb(req, 0);
1270 	}
1271 }
1272 
1273 static void
1274 __fs_open_file_done(void *arg, struct spdk_file *file, int bserrno)
1275 {
1276 	struct spdk_fs_request *req = arg;
1277 	struct spdk_fs_cb_args *args = &req->args;
1278 
1279 	args->file = file;
1280 	__wake_caller(args, bserrno);
1281 	SPDK_DEBUGLOG(blobfs, "file=%s\n", args->op.open.name);
1282 }
1283 
1284 static void
1285 __fs_open_file(void *arg)
1286 {
1287 	struct spdk_fs_request *req = arg;
1288 	struct spdk_fs_cb_args *args = &req->args;
1289 
1290 	SPDK_DEBUGLOG(blobfs, "file=%s\n", args->op.open.name);
1291 	spdk_fs_open_file_async(args->fs, args->op.open.name, args->op.open.flags,
1292 				__fs_open_file_done, req);
1293 }
1294 
1295 int
1296 spdk_fs_open_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx,
1297 		  const char *name, uint32_t flags, struct spdk_file **file)
1298 {
1299 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
1300 	struct spdk_fs_request *req;
1301 	struct spdk_fs_cb_args *args;
1302 	int rc;
1303 
1304 	SPDK_DEBUGLOG(blobfs, "file=%s\n", name);
1305 
1306 	req = alloc_fs_request(channel);
1307 	if (req == NULL) {
1308 		SPDK_ERRLOG("Cannot allocate req for opening file=%s\n", name);
1309 		return -ENOMEM;
1310 	}
1311 
1312 	args = &req->args;
1313 	args->fs = fs;
1314 	args->op.open.name = name;
1315 	args->op.open.flags = flags;
1316 	args->sem = &channel->sem;
1317 	fs->send_request(__fs_open_file, req);
1318 	sem_wait(&channel->sem);
1319 	rc = args->rc;
1320 	if (rc == 0) {
1321 		*file = args->file;
1322 	} else {
1323 		*file = NULL;
1324 	}
1325 	free_fs_request(req);
1326 
1327 	return rc;
1328 }
1329 
1330 static void
1331 fs_rename_blob_close_cb(void *ctx, int bserrno)
1332 {
1333 	struct spdk_fs_request *req = ctx;
1334 	struct spdk_fs_cb_args *args = &req->args;
1335 
1336 	args->fn.fs_op(args->arg, bserrno);
1337 	free_fs_request(req);
1338 }
1339 
1340 static void
1341 fs_rename_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno)
1342 {
1343 	struct spdk_fs_request *req = ctx;
1344 	struct spdk_fs_cb_args *args = &req->args;
1345 	const char *new_name = args->op.rename.new_name;
1346 
1347 	spdk_blob_set_xattr(blob, "name", new_name, strlen(new_name) + 1);
1348 	spdk_blob_close(blob, fs_rename_blob_close_cb, req);
1349 }
1350 
1351 static void
1352 _fs_md_rename_file(struct spdk_fs_request *req)
1353 {
1354 	struct spdk_fs_cb_args *args = &req->args;
1355 	struct spdk_file *f;
1356 
1357 	f = fs_find_file(args->fs, args->op.rename.old_name);
1358 	if (f == NULL) {
1359 		args->fn.fs_op(args->arg, -ENOENT);
1360 		free_fs_request(req);
1361 		return;
1362 	}
1363 
1364 	free(f->name);
1365 	f->name = strdup(args->op.rename.new_name);
1366 	_file_build_trace_arg_name(f);
1367 	args->file = f;
1368 	spdk_bs_open_blob(args->fs->bs, f->blobid, fs_rename_blob_open_cb, req);
1369 }
1370 
/*
 * Completion of the delete that precedes an overwriting rename; arg is the
 * rename request, which now proceeds with the actual rename.
 *
 * NOTE(review): fserrno is not examined here, so the rename continues even
 * when the delete reported an error - confirm this is intended.
 */
static void
fs_rename_delete_done(void *arg, int fserrno)
{
	struct spdk_fs_request *rename_req = arg;

	_fs_md_rename_file(rename_req);
}
1376 
1377 void
1378 spdk_fs_rename_file_async(struct spdk_filesystem *fs,
1379 			  const char *old_name, const char *new_name,
1380 			  spdk_file_op_complete cb_fn, void *cb_arg)
1381 {
1382 	struct spdk_file *f;
1383 	struct spdk_fs_request *req;
1384 	struct spdk_fs_cb_args *args;
1385 
1386 	SPDK_DEBUGLOG(blobfs, "old=%s new=%s\n", old_name, new_name);
1387 	if (strnlen(new_name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
1388 		cb_fn(cb_arg, -ENAMETOOLONG);
1389 		return;
1390 	}
1391 
1392 	req = alloc_fs_request(fs->md_target.md_fs_channel);
1393 	if (req == NULL) {
1394 		SPDK_ERRLOG("Cannot allocate rename async req for renaming file from %s to %s\n", old_name,
1395 			    new_name);
1396 		cb_fn(cb_arg, -ENOMEM);
1397 		return;
1398 	}
1399 
1400 	args = &req->args;
1401 	args->fn.fs_op = cb_fn;
1402 	args->fs = fs;
1403 	args->arg = cb_arg;
1404 	args->op.rename.old_name = old_name;
1405 	args->op.rename.new_name = new_name;
1406 
1407 	f = fs_find_file(fs, new_name);
1408 	if (f == NULL) {
1409 		_fs_md_rename_file(req);
1410 		return;
1411 	}
1412 
1413 	/*
1414 	 * The rename overwrites an existing file.  So delete the existing file, then
1415 	 *  do the actual rename.
1416 	 */
1417 	spdk_fs_delete_file_async(fs, new_name, fs_rename_delete_done, req);
1418 }
1419 
1420 static void
1421 __fs_rename_file_done(void *arg, int fserrno)
1422 {
1423 	struct spdk_fs_request *req = arg;
1424 	struct spdk_fs_cb_args *args = &req->args;
1425 
1426 	__wake_caller(args, fserrno);
1427 }
1428 
1429 static void
1430 __fs_rename_file(void *arg)
1431 {
1432 	struct spdk_fs_request *req = arg;
1433 	struct spdk_fs_cb_args *args = &req->args;
1434 
1435 	spdk_fs_rename_file_async(args->fs, args->op.rename.old_name, args->op.rename.new_name,
1436 				  __fs_rename_file_done, req);
1437 }
1438 
1439 int
1440 spdk_fs_rename_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx,
1441 		    const char *old_name, const char *new_name)
1442 {
1443 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
1444 	struct spdk_fs_request *req;
1445 	struct spdk_fs_cb_args *args;
1446 	int rc;
1447 
1448 	req = alloc_fs_request(channel);
1449 	if (req == NULL) {
1450 		SPDK_ERRLOG("Cannot allocate rename req for file=%s\n", old_name);
1451 		return -ENOMEM;
1452 	}
1453 
1454 	args = &req->args;
1455 
1456 	args->fs = fs;
1457 	args->op.rename.old_name = old_name;
1458 	args->op.rename.new_name = new_name;
1459 	args->sem = &channel->sem;
1460 	fs->send_request(__fs_rename_file, req);
1461 	sem_wait(&channel->sem);
1462 	rc = args->rc;
1463 	free_fs_request(req);
1464 	return rc;
1465 }
1466 
1467 static void
1468 blob_delete_cb(void *ctx, int bserrno)
1469 {
1470 	struct spdk_fs_request *req = ctx;
1471 	struct spdk_fs_cb_args *args = &req->args;
1472 
1473 	args->fn.file_op(args->arg, bserrno);
1474 	free_fs_request(req);
1475 }
1476 
1477 void
1478 spdk_fs_delete_file_async(struct spdk_filesystem *fs, const char *name,
1479 			  spdk_file_op_complete cb_fn, void *cb_arg)
1480 {
1481 	struct spdk_file *f;
1482 	spdk_blob_id blobid;
1483 	struct spdk_fs_request *req;
1484 	struct spdk_fs_cb_args *args;
1485 
1486 	SPDK_DEBUGLOG(blobfs, "file=%s\n", name);
1487 
1488 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
1489 		cb_fn(cb_arg, -ENAMETOOLONG);
1490 		return;
1491 	}
1492 
1493 	f = fs_find_file(fs, name);
1494 	if (f == NULL) {
1495 		SPDK_ERRLOG("Cannot find the file=%s to deleted\n", name);
1496 		cb_fn(cb_arg, -ENOENT);
1497 		return;
1498 	}
1499 
1500 	req = alloc_fs_request(fs->md_target.md_fs_channel);
1501 	if (req == NULL) {
1502 		SPDK_ERRLOG("Cannot allocate the req for the file=%s to deleted\n", name);
1503 		cb_fn(cb_arg, -ENOMEM);
1504 		return;
1505 	}
1506 
1507 	args = &req->args;
1508 	args->fn.file_op = cb_fn;
1509 	args->arg = cb_arg;
1510 
1511 	if (f->ref_count > 0) {
1512 		/* If the ref > 0, we mark the file as deleted and delete it when we close it. */
1513 		f->is_deleted = true;
1514 		spdk_blob_set_xattr(f->blob, "is_deleted", &f->is_deleted, sizeof(bool));
1515 		spdk_blob_sync_md(f->blob, blob_delete_cb, req);
1516 		return;
1517 	}
1518 
1519 	blobid = f->blobid;
1520 	TAILQ_REMOVE(&fs->files, f, tailq);
1521 
1522 	file_free(f);
1523 
1524 	spdk_bs_delete_blob(fs->bs, blobid, blob_delete_cb, req);
1525 }
1526 
/*
 * Pack the leading bytes of name into a uint64_t.
 *
 * At most sizeof(uint64_t) bytes are copied; shorter names leave the
 * remaining bytes zero.  The value is a raw byte copy, so its numeric value
 * depends on the host byte order.
 */
static uint64_t
fs_name_to_uint64(const char *name)
{
	uint64_t packed = 0;
	size_t copy_len = strlen(name);

	if (copy_len > sizeof(packed)) {
		copy_len = sizeof(packed);
	}
	memcpy(&packed, name, copy_len);

	return packed;
}
1534 
1535 static void
1536 __fs_delete_file_done(void *arg, int fserrno)
1537 {
1538 	struct spdk_fs_request *req = arg;
1539 	struct spdk_fs_cb_args *args = &req->args;
1540 
1541 	spdk_trace_record(TRACE_BLOBFS_DELETE_DONE, 0, 0, 0, fs_name_to_uint64(args->op.delete.name));
1542 	__wake_caller(args, fserrno);
1543 }
1544 
1545 static void
1546 __fs_delete_file(void *arg)
1547 {
1548 	struct spdk_fs_request *req = arg;
1549 	struct spdk_fs_cb_args *args = &req->args;
1550 
1551 	spdk_trace_record(TRACE_BLOBFS_DELETE_START, 0, 0, 0, fs_name_to_uint64(args->op.delete.name));
1552 	spdk_fs_delete_file_async(args->fs, args->op.delete.name, __fs_delete_file_done, req);
1553 }
1554 
1555 int
1556 spdk_fs_delete_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx,
1557 		    const char *name)
1558 {
1559 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
1560 	struct spdk_fs_request *req;
1561 	struct spdk_fs_cb_args *args;
1562 	int rc;
1563 
1564 	req = alloc_fs_request(channel);
1565 	if (req == NULL) {
1566 		SPDK_DEBUGLOG(blobfs, "Cannot allocate req to delete file=%s\n", name);
1567 		return -ENOMEM;
1568 	}
1569 
1570 	args = &req->args;
1571 	args->fs = fs;
1572 	args->op.delete.name = name;
1573 	args->sem = &channel->sem;
1574 	fs->send_request(__fs_delete_file, req);
1575 	sem_wait(&channel->sem);
1576 	rc = args->rc;
1577 	free_fs_request(req);
1578 
1579 	return rc;
1580 }
1581 
1582 spdk_fs_iter
1583 spdk_fs_iter_first(struct spdk_filesystem *fs)
1584 {
1585 	struct spdk_file *f;
1586 
1587 	f = TAILQ_FIRST(&fs->files);
1588 	return f;
1589 }
1590 
1591 spdk_fs_iter
1592 spdk_fs_iter_next(spdk_fs_iter iter)
1593 {
1594 	struct spdk_file *f = iter;
1595 
1596 	if (f == NULL) {
1597 		return NULL;
1598 	}
1599 
1600 	f = TAILQ_NEXT(f, tailq);
1601 	return f;
1602 }
1603 
1604 const char *
1605 spdk_file_get_name(struct spdk_file *file)
1606 {
1607 	return file->name;
1608 }
1609 
1610 uint64_t
1611 spdk_file_get_length(struct spdk_file *file)
1612 {
1613 	uint64_t length;
1614 
1615 	assert(file != NULL);
1616 
1617 	length = file->append_pos >= file->length ? file->append_pos : file->length;
1618 	SPDK_DEBUGLOG(blobfs, "file=%s length=0x%jx\n", file->name, length);
1619 	return length;
1620 }
1621 
1622 static void
1623 fs_truncate_complete_cb(void *ctx, int bserrno)
1624 {
1625 	struct spdk_fs_request *req = ctx;
1626 	struct spdk_fs_cb_args *args = &req->args;
1627 
1628 	args->fn.file_op(args->arg, bserrno);
1629 	free_fs_request(req);
1630 }
1631 
1632 static void
1633 fs_truncate_resize_cb(void *ctx, int bserrno)
1634 {
1635 	struct spdk_fs_request *req = ctx;
1636 	struct spdk_fs_cb_args *args = &req->args;
1637 	struct spdk_file *file = args->file;
1638 	uint64_t *length = &args->op.truncate.length;
1639 
1640 	if (bserrno) {
1641 		args->fn.file_op(args->arg, bserrno);
1642 		free_fs_request(req);
1643 		return;
1644 	}
1645 
1646 	spdk_blob_set_xattr(file->blob, "length", length, sizeof(*length));
1647 
1648 	file->length = *length;
1649 	if (file->append_pos > file->length) {
1650 		file->append_pos = file->length;
1651 	}
1652 
1653 	spdk_blob_sync_md(file->blob, fs_truncate_complete_cb, req);
1654 }
1655 
/*
 * Number of clusters of cluster_sz bytes needed to hold length bytes
 * (length divided by cluster_sz, rounded up).
 *
 * Computed as quotient plus a remainder correction so that lengths close to
 * UINT64_MAX do not wrap around, which "length + cluster_sz - 1" would.
 * cluster_sz must be non-zero.
 */
static uint64_t
__bytes_to_clusters(uint64_t length, uint64_t cluster_sz)
{
	uint64_t clusters = length / cluster_sz;

	if (length % cluster_sz != 0) {
		clusters++;
	}

	return clusters;
}
1661 
1662 void
1663 spdk_file_truncate_async(struct spdk_file *file, uint64_t length,
1664 			 spdk_file_op_complete cb_fn, void *cb_arg)
1665 {
1666 	struct spdk_filesystem *fs;
1667 	size_t num_clusters;
1668 	struct spdk_fs_request *req;
1669 	struct spdk_fs_cb_args *args;
1670 
1671 	SPDK_DEBUGLOG(blobfs, "file=%s old=0x%jx new=0x%jx\n", file->name, file->length, length);
1672 	if (length == file->length) {
1673 		cb_fn(cb_arg, 0);
1674 		return;
1675 	}
1676 
1677 	req = alloc_fs_request(file->fs->md_target.md_fs_channel);
1678 	if (req == NULL) {
1679 		cb_fn(cb_arg, -ENOMEM);
1680 		return;
1681 	}
1682 
1683 	args = &req->args;
1684 	args->fn.file_op = cb_fn;
1685 	args->arg = cb_arg;
1686 	args->file = file;
1687 	args->op.truncate.length = length;
1688 	fs = file->fs;
1689 
1690 	num_clusters = __bytes_to_clusters(length, fs->bs_opts.cluster_sz);
1691 
1692 	spdk_blob_resize(file->blob, num_clusters, fs_truncate_resize_cb, req);
1693 }
1694 
1695 static void
1696 __truncate(void *arg)
1697 {
1698 	struct spdk_fs_request *req = arg;
1699 	struct spdk_fs_cb_args *args = &req->args;
1700 
1701 	spdk_file_truncate_async(args->file, args->op.truncate.length,
1702 				 args->fn.file_op, args);
1703 }
1704 
1705 int
1706 spdk_file_truncate(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx,
1707 		   uint64_t length)
1708 {
1709 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
1710 	struct spdk_fs_request *req;
1711 	struct spdk_fs_cb_args *args;
1712 	int rc;
1713 
1714 	req = alloc_fs_request(channel);
1715 	if (req == NULL) {
1716 		return -ENOMEM;
1717 	}
1718 
1719 	args = &req->args;
1720 
1721 	args->file = file;
1722 	args->op.truncate.length = length;
1723 	args->fn.file_op = __wake_caller;
1724 	args->sem = &channel->sem;
1725 
1726 	channel->send_request(__truncate, req);
1727 	sem_wait(&channel->sem);
1728 	rc = args->rc;
1729 	free_fs_request(req);
1730 
1731 	return rc;
1732 }
1733 
1734 static void
1735 __rw_done(void *ctx, int bserrno)
1736 {
1737 	struct spdk_fs_request *req = ctx;
1738 	struct spdk_fs_cb_args *args = &req->args;
1739 
1740 	spdk_free(args->op.rw.pin_buf);
1741 	args->fn.file_op(args->arg, bserrno);
1742 	free_fs_request(req);
1743 }
1744 
/*
 * Gather the contents of iovs[0..iovcnt) into the contiguous buffer buf.
 *
 * At most buf_len bytes are written in total; each vector contributes at
 * most its iov_len.  Copying stops once buf is full.
 */
static void
_copy_iovs_to_buf(void *buf, size_t buf_len, struct iovec *iovs, int iovcnt)
{
	/* Byte cursor: pointer arithmetic on void * is not standard C. */
	uint8_t *dst = buf;
	size_t len;
	int i;

	for (i = 0; i < iovcnt && buf_len > 0; i++) {
		len = iovs[i].iov_len < buf_len ? iovs[i].iov_len : buf_len;
		if (len == 0) {
			/* Nothing to copy; also avoids memcpy() on a possibly NULL base. */
			continue;
		}
		memcpy(dst, iovs[i].iov_base, len);
		dst += len;
		buf_len -= len;
	}
}
1759 
/*
 * Scatter the contiguous buffer buf into iovs[0..iovcnt).
 *
 * At most buf_len bytes are consumed in total; each vector receives at most
 * its iov_len.  Copying stops once buf is exhausted.
 */
static void
_copy_buf_to_iovs(struct iovec *iovs, int iovcnt, void *buf, size_t buf_len)
{
	/* Byte cursor: pointer arithmetic on void * is not standard C. */
	const uint8_t *src = buf;
	size_t len;
	int i;

	for (i = 0; i < iovcnt && buf_len > 0; i++) {
		len = iovs[i].iov_len < buf_len ? iovs[i].iov_len : buf_len;
		if (len == 0) {
			/* Nothing to copy; also avoids memcpy() on a possibly NULL base. */
			continue;
		}
		memcpy(iovs[i].iov_base, src, len);
		src += len;
		buf_len -= len;
	}
}
1774 
1775 static void
1776 __read_done(void *ctx, int bserrno)
1777 {
1778 	struct spdk_fs_request *req = ctx;
1779 	struct spdk_fs_cb_args *args = &req->args;
1780 	void *buf;
1781 
1782 	assert(req != NULL);
1783 	buf = (void *)((uintptr_t)args->op.rw.pin_buf + (args->op.rw.offset & (args->op.rw.blocklen - 1)));
1784 	if (args->op.rw.is_read) {
1785 		_copy_buf_to_iovs(args->iovs, args->iovcnt, buf, args->op.rw.length);
1786 		__rw_done(req, 0);
1787 	} else {
1788 		_copy_iovs_to_buf(buf, args->op.rw.length, args->iovs, args->iovcnt);
1789 		spdk_blob_io_write(args->file->blob, args->op.rw.channel,
1790 				   args->op.rw.pin_buf,
1791 				   args->op.rw.start_lba, args->op.rw.num_lba,
1792 				   __rw_done, req);
1793 	}
1794 }
1795 
1796 static void
1797 __do_blob_read(void *ctx, int fserrno)
1798 {
1799 	struct spdk_fs_request *req = ctx;
1800 	struct spdk_fs_cb_args *args = &req->args;
1801 
1802 	if (fserrno) {
1803 		__rw_done(req, fserrno);
1804 		return;
1805 	}
1806 	spdk_blob_io_read(args->file->blob, args->op.rw.channel,
1807 			  args->op.rw.pin_buf,
1808 			  args->op.rw.start_lba, args->op.rw.num_lba,
1809 			  __read_done, req);
1810 }
1811 
1812 static void
1813 __get_page_parameters(struct spdk_file *file, uint64_t offset, uint64_t length,
1814 		      uint64_t *start_lba, uint32_t *lba_size, uint64_t *num_lba)
1815 {
1816 	uint64_t end_lba;
1817 
1818 	*lba_size = spdk_bs_get_io_unit_size(file->fs->bs);
1819 	*start_lba = offset / *lba_size;
1820 	end_lba = (offset + length - 1) / *lba_size;
1821 	*num_lba = (end_lba - *start_lba + 1);
1822 }
1823 
1824 static bool
1825 __is_lba_aligned(struct spdk_file *file, uint64_t offset, uint64_t length)
1826 {
1827 	uint32_t lba_size = spdk_bs_get_io_unit_size(file->fs->bs);
1828 
1829 	if ((offset % lba_size == 0) && (length % lba_size == 0)) {
1830 		return true;
1831 	}
1832 
1833 	return false;
1834 }
1835 
1836 static void
1837 _fs_request_setup_iovs(struct spdk_fs_request *req, struct iovec *iovs, uint32_t iovcnt)
1838 {
1839 	uint32_t i;
1840 
1841 	for (i = 0; i < iovcnt; i++) {
1842 		req->args.iovs[i].iov_base = iovs[i].iov_base;
1843 		req->args.iovs[i].iov_len = iovs[i].iov_len;
1844 	}
1845 }
1846 
1847 static void
1848 __readvwritev(struct spdk_file *file, struct spdk_io_channel *_channel,
1849 	      struct iovec *iovs, uint32_t iovcnt, uint64_t offset, uint64_t length,
1850 	      spdk_file_op_complete cb_fn, void *cb_arg, int is_read)
1851 {
1852 	struct spdk_fs_request *req;
1853 	struct spdk_fs_cb_args *args;
1854 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
1855 	uint64_t start_lba, num_lba, pin_buf_length;
1856 	uint32_t lba_size;
1857 
1858 	if (is_read && offset + length > file->length) {
1859 		cb_fn(cb_arg, -EINVAL);
1860 		return;
1861 	}
1862 
1863 	req = alloc_fs_request_with_iov(channel, iovcnt);
1864 	if (req == NULL) {
1865 		cb_fn(cb_arg, -ENOMEM);
1866 		return;
1867 	}
1868 
1869 	__get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba);
1870 
1871 	args = &req->args;
1872 	args->fn.file_op = cb_fn;
1873 	args->arg = cb_arg;
1874 	args->file = file;
1875 	args->op.rw.channel = channel->bs_channel;
1876 	_fs_request_setup_iovs(req, iovs, iovcnt);
1877 	args->op.rw.is_read = is_read;
1878 	args->op.rw.offset = offset;
1879 	args->op.rw.blocklen = lba_size;
1880 
1881 	pin_buf_length = num_lba * lba_size;
1882 	args->op.rw.length = pin_buf_length;
1883 	args->op.rw.pin_buf = spdk_malloc(pin_buf_length, lba_size, NULL,
1884 					  SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
1885 	if (args->op.rw.pin_buf == NULL) {
1886 		SPDK_DEBUGLOG(blobfs, "Failed to allocate buf for: file=%s offset=%jx length=%jx\n",
1887 			      file->name, offset, length);
1888 		free_fs_request(req);
1889 		cb_fn(cb_arg, -ENOMEM);
1890 		return;
1891 	}
1892 
1893 	args->op.rw.start_lba = start_lba;
1894 	args->op.rw.num_lba = num_lba;
1895 
1896 	if (!is_read && file->length < offset + length) {
1897 		spdk_file_truncate_async(file, offset + length, __do_blob_read, req);
1898 	} else if (!is_read && __is_lba_aligned(file, offset, length)) {
1899 		_copy_iovs_to_buf(args->op.rw.pin_buf, args->op.rw.length, args->iovs, args->iovcnt);
1900 		spdk_blob_io_write(args->file->blob, args->op.rw.channel,
1901 				   args->op.rw.pin_buf,
1902 				   args->op.rw.start_lba, args->op.rw.num_lba,
1903 				   __rw_done, req);
1904 	} else {
1905 		__do_blob_read(req, 0);
1906 	}
1907 }
1908 
1909 static void
1910 __readwrite(struct spdk_file *file, struct spdk_io_channel *channel,
1911 	    void *payload, uint64_t offset, uint64_t length,
1912 	    spdk_file_op_complete cb_fn, void *cb_arg, int is_read)
1913 {
1914 	struct iovec iov;
1915 
1916 	iov.iov_base = payload;
1917 	iov.iov_len = (size_t)length;
1918 
1919 	__readvwritev(file, channel, &iov, 1, offset, length, cb_fn, cb_arg, is_read);
1920 }
1921 
1922 void
1923 spdk_file_write_async(struct spdk_file *file, struct spdk_io_channel *channel,
1924 		      void *payload, uint64_t offset, uint64_t length,
1925 		      spdk_file_op_complete cb_fn, void *cb_arg)
1926 {
1927 	__readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 0);
1928 }
1929 
1930 void
1931 spdk_file_writev_async(struct spdk_file *file, struct spdk_io_channel *channel,
1932 		       struct iovec *iovs, uint32_t iovcnt, uint64_t offset, uint64_t length,
1933 		       spdk_file_op_complete cb_fn, void *cb_arg)
1934 {
1935 	SPDK_DEBUGLOG(blobfs, "file=%s offset=%jx length=%jx\n",
1936 		      file->name, offset, length);
1937 
1938 	__readvwritev(file, channel, iovs, iovcnt, offset, length, cb_fn, cb_arg, 0);
1939 }
1940 
1941 void
1942 spdk_file_read_async(struct spdk_file *file, struct spdk_io_channel *channel,
1943 		     void *payload, uint64_t offset, uint64_t length,
1944 		     spdk_file_op_complete cb_fn, void *cb_arg)
1945 {
1946 	SPDK_DEBUGLOG(blobfs, "file=%s offset=%jx length=%jx\n",
1947 		      file->name, offset, length);
1948 	__readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 1);
1949 }
1950 
1951 void
1952 spdk_file_readv_async(struct spdk_file *file, struct spdk_io_channel *channel,
1953 		      struct iovec *iovs, uint32_t iovcnt, uint64_t offset, uint64_t length,
1954 		      spdk_file_op_complete cb_fn, void *cb_arg)
1955 {
1956 	SPDK_DEBUGLOG(blobfs, "file=%s offset=%jx length=%jx\n",
1957 		      file->name, offset, length);
1958 
1959 	__readvwritev(file, channel, iovs, iovcnt, offset, length, cb_fn, cb_arg, 1);
1960 }
1961 
1962 struct spdk_io_channel *
1963 spdk_fs_alloc_io_channel(struct spdk_filesystem *fs)
1964 {
1965 	struct spdk_io_channel *io_channel;
1966 	struct spdk_fs_channel *fs_channel;
1967 
1968 	io_channel = spdk_get_io_channel(&fs->io_target);
1969 	fs_channel = spdk_io_channel_get_ctx(io_channel);
1970 	fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
1971 	fs_channel->send_request = __send_request_direct;
1972 
1973 	return io_channel;
1974 }
1975 
/*
 * Release an I/O channel obtained from spdk_fs_alloc_io_channel() by handing
 * it to spdk_put_io_channel().
 */
void
spdk_fs_free_io_channel(struct spdk_io_channel *channel)
{
	spdk_put_io_channel(channel);
}
1981 
1982 struct spdk_fs_thread_ctx *
1983 spdk_fs_alloc_thread_ctx(struct spdk_filesystem *fs)
1984 {
1985 	struct spdk_fs_thread_ctx *ctx;
1986 
1987 	ctx = calloc(1, sizeof(*ctx));
1988 	if (!ctx) {
1989 		return NULL;
1990 	}
1991 
1992 	if (pthread_spin_init(&ctx->ch.lock, 0)) {
1993 		free(ctx);
1994 		return NULL;
1995 	}
1996 
1997 	fs_channel_create(fs, &ctx->ch, 512);
1998 
1999 	ctx->ch.send_request = fs->send_request;
2000 	ctx->ch.sync = 1;
2001 
2002 	return ctx;
2003 }
2004 
2005 
2006 void
2007 spdk_fs_free_thread_ctx(struct spdk_fs_thread_ctx *ctx)
2008 {
2009 	assert(ctx->ch.sync == 1);
2010 
2011 	while (true) {
2012 		pthread_spin_lock(&ctx->ch.lock);
2013 		if (ctx->ch.outstanding_reqs == 0) {
2014 			pthread_spin_unlock(&ctx->ch.lock);
2015 			break;
2016 		}
2017 		pthread_spin_unlock(&ctx->ch.lock);
2018 		usleep(1000);
2019 	}
2020 
2021 	fs_channel_destroy(NULL, &ctx->ch);
2022 	free(ctx);
2023 }
2024 
2025 int
2026 spdk_fs_set_cache_size(uint64_t size_in_mb)
2027 {
2028 	/* setting g_fs_cache_size is only permitted if cache pool
2029 	 * is already freed or hasn't been initialized
2030 	 */
2031 	if (g_cache_pool != NULL) {
2032 		return -EPERM;
2033 	}
2034 
2035 	g_fs_cache_size = size_in_mb * 1024 * 1024;
2036 
2037 	return 0;
2038 }
2039 
2040 uint64_t
2041 spdk_fs_get_cache_size(void)
2042 {
2043 	return g_fs_cache_size / (1024 * 1024);
2044 }
2045 
2046 static void __file_flush(void *ctx);
2047 
2048 /* Try to free some cache buffers from this file.
2049  */
2050 static int
2051 reclaim_cache_buffers(struct spdk_file *file)
2052 {
2053 	int rc;
2054 
2055 	BLOBFS_TRACE(file, "free=%s\n", file->name);
2056 
2057 	/* The function is safe to be called with any threads, while the file
2058 	 * lock maybe locked by other thread for now, so try to get the file
2059 	 * lock here.
2060 	 */
2061 	rc = pthread_spin_trylock(&file->lock);
2062 	if (rc != 0) {
2063 		return -1;
2064 	}
2065 
2066 	if (file->tree->present_mask == 0) {
2067 		pthread_spin_unlock(&file->lock);
2068 		return -1;
2069 	}
2070 	tree_free_buffers(file->tree);
2071 
2072 	TAILQ_REMOVE(&g_caches, file, cache_tailq);
2073 	/* If not freed, put it in the end of the queue */
2074 	if (file->tree->present_mask != 0) {
2075 		TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq);
2076 	} else {
2077 		file->last = NULL;
2078 	}
2079 	pthread_spin_unlock(&file->lock);
2080 
2081 	return 0;
2082 }
2083 
2084 static int
2085 _blobfs_cache_pool_reclaim(void *arg)
2086 {
2087 	struct spdk_file *file, *tmp;
2088 	int rc;
2089 
2090 	if (!blobfs_cache_pool_need_reclaim()) {
2091 		return SPDK_POLLER_IDLE;
2092 	}
2093 
2094 	TAILQ_FOREACH_SAFE(file, &g_caches, cache_tailq, tmp) {
2095 		if (!file->open_for_writing &&
2096 		    file->priority == SPDK_FILE_PRIORITY_LOW) {
2097 			rc = reclaim_cache_buffers(file);
2098 			if (rc < 0) {
2099 				continue;
2100 			}
2101 			if (!blobfs_cache_pool_need_reclaim()) {
2102 				return SPDK_POLLER_BUSY;
2103 			}
2104 			break;
2105 		}
2106 	}
2107 
2108 	TAILQ_FOREACH_SAFE(file, &g_caches, cache_tailq, tmp) {
2109 		if (!file->open_for_writing) {
2110 			rc = reclaim_cache_buffers(file);
2111 			if (rc < 0) {
2112 				continue;
2113 			}
2114 			if (!blobfs_cache_pool_need_reclaim()) {
2115 				return SPDK_POLLER_BUSY;
2116 			}
2117 			break;
2118 		}
2119 	}
2120 
2121 	TAILQ_FOREACH_SAFE(file, &g_caches, cache_tailq, tmp) {
2122 		rc = reclaim_cache_buffers(file);
2123 		if (rc < 0) {
2124 			continue;
2125 		}
2126 		break;
2127 	}
2128 
2129 	return SPDK_POLLER_BUSY;
2130 }
2131 
2132 static void
2133 _add_file_to_cache_pool(void *ctx)
2134 {
2135 	struct spdk_file *file = ctx;
2136 
2137 	TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq);
2138 }
2139 
2140 static void
2141 _remove_file_from_cache_pool(void *ctx)
2142 {
2143 	struct spdk_file *file = ctx;
2144 
2145 	TAILQ_REMOVE(&g_caches, file, cache_tailq);
2146 }
2147 
2148 static struct cache_buffer *
2149 cache_insert_buffer(struct spdk_file *file, uint64_t offset)
2150 {
2151 	struct cache_buffer *buf;
2152 	int count = 0;
2153 	bool need_update = false;
2154 
2155 	buf = calloc(1, sizeof(*buf));
2156 	if (buf == NULL) {
2157 		SPDK_DEBUGLOG(blobfs, "calloc failed\n");
2158 		return NULL;
2159 	}
2160 
2161 	do {
2162 		buf->buf = spdk_mempool_get(g_cache_pool);
2163 		if (buf->buf) {
2164 			break;
2165 		}
2166 		if (count++ == 100) {
2167 			SPDK_ERRLOG("Could not allocate cache buffer for file=%p on offset=%jx\n",
2168 				    file, offset);
2169 			free(buf);
2170 			return NULL;
2171 		}
2172 		usleep(BLOBFS_CACHE_POOL_POLL_PERIOD_IN_US);
2173 	} while (true);
2174 
2175 	buf->buf_size = CACHE_BUFFER_SIZE;
2176 	buf->offset = offset;
2177 
2178 	if (file->tree->present_mask == 0) {
2179 		need_update = true;
2180 	}
2181 	file->tree = tree_insert_buffer(file->tree, buf);
2182 
2183 	if (need_update) {
2184 		spdk_thread_send_msg(g_cache_pool_thread, _add_file_to_cache_pool, file);
2185 	}
2186 
2187 	return buf;
2188 }
2189 
2190 static struct cache_buffer *
2191 cache_append_buffer(struct spdk_file *file)
2192 {
2193 	struct cache_buffer *last;
2194 
2195 	assert(file->last == NULL || file->last->bytes_filled == file->last->buf_size);
2196 	assert((file->append_pos % CACHE_BUFFER_SIZE) == 0);
2197 
2198 	last = cache_insert_buffer(file, file->append_pos);
2199 	if (last == NULL) {
2200 		SPDK_DEBUGLOG(blobfs, "cache_insert_buffer failed\n");
2201 		return NULL;
2202 	}
2203 
2204 	file->last = last;
2205 
2206 	return last;
2207 }
2208 
2209 static void __check_sync_reqs(struct spdk_file *file);
2210 
2211 static void
2212 __file_cache_finish_sync(void *ctx, int bserrno)
2213 {
2214 	struct spdk_file *file;
2215 	struct spdk_fs_request *sync_req = ctx;
2216 	struct spdk_fs_cb_args *sync_args;
2217 
2218 	sync_args = &sync_req->args;
2219 	file = sync_args->file;
2220 	pthread_spin_lock(&file->lock);
2221 	file->length_xattr = sync_args->op.sync.length;
2222 	assert(sync_args->op.sync.offset <= file->length_flushed);
2223 	spdk_trace_record(TRACE_BLOBFS_XATTR_END, 0, sync_args->op.sync.offset,
2224 			  0, file->trace_arg_name);
2225 	BLOBFS_TRACE(file, "sync done offset=%jx\n", sync_args->op.sync.offset);
2226 	TAILQ_REMOVE(&file->sync_requests, sync_req, args.op.sync.tailq);
2227 	pthread_spin_unlock(&file->lock);
2228 
2229 	sync_args->fn.file_op(sync_args->arg, bserrno);
2230 
2231 	free_fs_request(sync_req);
2232 	__check_sync_reqs(file);
2233 }
2234 
2235 static void
2236 __check_sync_reqs(struct spdk_file *file)
2237 {
2238 	struct spdk_fs_request *sync_req;
2239 
2240 	pthread_spin_lock(&file->lock);
2241 
2242 	TAILQ_FOREACH(sync_req, &file->sync_requests, args.op.sync.tailq) {
2243 		if (sync_req->args.op.sync.offset <= file->length_flushed) {
2244 			break;
2245 		}
2246 	}
2247 
2248 	if (sync_req != NULL && !sync_req->args.op.sync.xattr_in_progress) {
2249 		BLOBFS_TRACE(file, "set xattr length 0x%jx\n", file->length_flushed);
2250 		sync_req->args.op.sync.xattr_in_progress = true;
2251 		sync_req->args.op.sync.length = file->length_flushed;
2252 		spdk_blob_set_xattr(file->blob, "length", &file->length_flushed,
2253 				    sizeof(file->length_flushed));
2254 
2255 		pthread_spin_unlock(&file->lock);
2256 		spdk_trace_record(TRACE_BLOBFS_XATTR_START, 0, file->length_flushed,
2257 				  0, file->trace_arg_name);
2258 		spdk_blob_sync_md(file->blob, __file_cache_finish_sync, sync_req);
2259 	} else {
2260 		pthread_spin_unlock(&file->lock);
2261 	}
2262 }
2263 
2264 static void
2265 __file_flush_done(void *ctx, int bserrno)
2266 {
2267 	struct spdk_fs_request *req = ctx;
2268 	struct spdk_fs_cb_args *args = &req->args;
2269 	struct spdk_file *file = args->file;
2270 	struct cache_buffer *next = args->op.flush.cache_buffer;
2271 
2272 	BLOBFS_TRACE(file, "length=%jx\n", args->op.flush.length);
2273 
2274 	pthread_spin_lock(&file->lock);
2275 	next->in_progress = false;
2276 	next->bytes_flushed += args->op.flush.length;
2277 	file->length_flushed += args->op.flush.length;
2278 	if (file->length_flushed > file->length) {
2279 		file->length = file->length_flushed;
2280 	}
2281 	if (next->bytes_flushed == next->buf_size) {
2282 		BLOBFS_TRACE(file, "write buffer fully flushed 0x%jx\n", file->length_flushed);
2283 		next = tree_find_buffer(file->tree, file->length_flushed);
2284 	}
2285 
2286 	/*
2287 	 * Assert that there is no cached data that extends past the end of the underlying
2288 	 *  blob.
2289 	 */
2290 	assert(next == NULL || next->offset < __file_get_blob_size(file) ||
2291 	       next->bytes_filled == 0);
2292 
2293 	pthread_spin_unlock(&file->lock);
2294 
2295 	__check_sync_reqs(file);
2296 
2297 	__file_flush(req);
2298 }
2299 
2300 static void
2301 __file_flush(void *ctx)
2302 {
2303 	struct spdk_fs_request *req = ctx;
2304 	struct spdk_fs_cb_args *args = &req->args;
2305 	struct spdk_file *file = args->file;
2306 	struct cache_buffer *next;
2307 	uint64_t offset, length, start_lba, num_lba;
2308 	uint32_t lba_size;
2309 
2310 	pthread_spin_lock(&file->lock);
2311 	next = tree_find_buffer(file->tree, file->length_flushed);
2312 	if (next == NULL || next->in_progress ||
2313 	    ((next->bytes_filled < next->buf_size) && TAILQ_EMPTY(&file->sync_requests))) {
2314 		/*
2315 		 * There is either no data to flush, a flush I/O is already in
2316 		 *  progress, or the next buffer is partially filled but there's no
2317 		 *  outstanding request to sync it.
2318 		 * So return immediately - if a flush I/O is in progress we will flush
2319 		 *  more data after that is completed, or a partial buffer will get flushed
2320 		 *  when it is either filled or the file is synced.
2321 		 */
2322 		free_fs_request(req);
2323 		if (next == NULL) {
2324 			/*
2325 			 * For cases where a file's cache was evicted, and then the
2326 			 *  file was later appended, we will write the data directly
2327 			 *  to disk and bypass cache.  So just update length_flushed
2328 			 *  here to reflect that all data was already written to disk.
2329 			 */
2330 			file->length_flushed = file->append_pos;
2331 		}
2332 		pthread_spin_unlock(&file->lock);
2333 		if (next == NULL) {
2334 			/*
2335 			 * There is no data to flush, but we still need to check for any
2336 			 *  outstanding sync requests to make sure metadata gets updated.
2337 			 */
2338 			__check_sync_reqs(file);
2339 		}
2340 		return;
2341 	}
2342 
2343 	offset = next->offset + next->bytes_flushed;
2344 	length = next->bytes_filled - next->bytes_flushed;
2345 	if (length == 0) {
2346 		free_fs_request(req);
2347 		pthread_spin_unlock(&file->lock);
2348 		/*
2349 		 * There is no data to flush, but we still need to check for any
2350 		 *  outstanding sync requests to make sure metadata gets updated.
2351 		 */
2352 		__check_sync_reqs(file);
2353 		return;
2354 	}
2355 	args->op.flush.length = length;
2356 	args->op.flush.cache_buffer = next;
2357 
2358 	__get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba);
2359 
2360 	next->in_progress = true;
2361 	BLOBFS_TRACE(file, "offset=0x%jx length=0x%jx page start=0x%jx num=0x%jx\n",
2362 		     offset, length, start_lba, num_lba);
2363 	pthread_spin_unlock(&file->lock);
2364 	spdk_blob_io_write(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel,
2365 			   next->buf + (start_lba * lba_size) - next->offset,
2366 			   start_lba, num_lba, __file_flush_done, req);
2367 }
2368 
/*
 * Completion callback for the metadata sync that follows a blob resize.
 *  arg is the spdk_fs_cb_args of the waiting caller, which is woken with
 *  bserrno.
 */
static void
__file_extend_done(void *arg, int bserrno)
{
	struct spdk_fs_cb_args *waiter_args = (struct spdk_fs_cb_args *)arg;

	__wake_caller(waiter_args, bserrno);
}
2376 
2377 static void
2378 __file_extend_resize_cb(void *_args, int bserrno)
2379 {
2380 	struct spdk_fs_cb_args *args = _args;
2381 	struct spdk_file *file = args->file;
2382 
2383 	if (bserrno) {
2384 		__wake_caller(args, bserrno);
2385 		return;
2386 	}
2387 
2388 	spdk_blob_sync_md(file->blob, __file_extend_done, args);
2389 }
2390 
2391 static void
2392 __file_extend_blob(void *_args)
2393 {
2394 	struct spdk_fs_cb_args *args = _args;
2395 	struct spdk_file *file = args->file;
2396 
2397 	spdk_blob_resize(file->blob, args->op.resize.num_clusters, __file_extend_resize_cb, args);
2398 }
2399 
2400 static void
2401 __rw_from_file_done(void *ctx, int bserrno)
2402 {
2403 	struct spdk_fs_request *req = ctx;
2404 
2405 	__wake_caller(&req->args, bserrno);
2406 	free_fs_request(req);
2407 }
2408 
2409 static void
2410 __rw_from_file(void *ctx)
2411 {
2412 	struct spdk_fs_request *req = ctx;
2413 	struct spdk_fs_cb_args *args = &req->args;
2414 	struct spdk_file *file = args->file;
2415 
2416 	if (args->op.rw.is_read) {
2417 		spdk_file_read_async(file, file->fs->sync_target.sync_io_channel, args->iovs[0].iov_base,
2418 				     args->op.rw.offset, (uint64_t)args->iovs[0].iov_len,
2419 				     __rw_from_file_done, req);
2420 	} else {
2421 		spdk_file_write_async(file, file->fs->sync_target.sync_io_channel, args->iovs[0].iov_base,
2422 				      args->op.rw.offset, (uint64_t)args->iovs[0].iov_len,
2423 				      __rw_from_file_done, req);
2424 	}
2425 }
2426 
/*
 * Caller-side context for __send_rw_from_file().
 */
struct rw_from_file_arg {
	/* Channel used to allocate the request; its semaphore signals completion. */
	struct spdk_fs_channel *channel;
	/* Error slot handed to the request through args->rwerrno. */
	int rwerrno;
};
2431 
2432 static int
2433 __send_rw_from_file(struct spdk_file *file, void *payload,
2434 		    uint64_t offset, uint64_t length, bool is_read,
2435 		    struct rw_from_file_arg *arg)
2436 {
2437 	struct spdk_fs_request *req;
2438 	struct spdk_fs_cb_args *args;
2439 
2440 	req = alloc_fs_request_with_iov(arg->channel, 1);
2441 	if (req == NULL) {
2442 		sem_post(&arg->channel->sem);
2443 		return -ENOMEM;
2444 	}
2445 
2446 	args = &req->args;
2447 	args->file = file;
2448 	args->sem = &arg->channel->sem;
2449 	args->iovs[0].iov_base = payload;
2450 	args->iovs[0].iov_len = (size_t)length;
2451 	args->op.rw.offset = offset;
2452 	args->op.rw.is_read = is_read;
2453 	args->rwerrno = &arg->rwerrno;
2454 	file->fs->send_request(__rw_from_file, req);
2455 	return 0;
2456 }
2457 
2458 int
2459 spdk_file_write(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx,
2460 		void *payload, uint64_t offset, uint64_t length)
2461 {
2462 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
2463 	struct spdk_fs_request *flush_req;
2464 	uint64_t rem_length, copy, blob_size, cluster_sz;
2465 	uint32_t cache_buffers_filled = 0;
2466 	uint8_t *cur_payload;
2467 	struct cache_buffer *last;
2468 
2469 	BLOBFS_TRACE_RW(file, "offset=%jx length=%jx\n", offset, length);
2470 
2471 	if (length == 0) {
2472 		return 0;
2473 	}
2474 
2475 	if (offset != file->append_pos) {
2476 		BLOBFS_TRACE(file, " error offset=%jx append_pos=%jx\n", offset, file->append_pos);
2477 		return -EINVAL;
2478 	}
2479 
2480 	pthread_spin_lock(&file->lock);
2481 	file->open_for_writing = true;
2482 
2483 	if ((file->last == NULL) && (file->append_pos % CACHE_BUFFER_SIZE == 0)) {
2484 		cache_append_buffer(file);
2485 	}
2486 
2487 	if (file->last == NULL) {
2488 		struct rw_from_file_arg arg = {};
2489 		int rc;
2490 
2491 		arg.channel = channel;
2492 		arg.rwerrno = 0;
2493 		file->append_pos += length;
2494 		pthread_spin_unlock(&file->lock);
2495 		rc = __send_rw_from_file(file, payload, offset, length, false, &arg);
2496 		if (rc != 0) {
2497 			return rc;
2498 		}
2499 		sem_wait(&channel->sem);
2500 		return arg.rwerrno;
2501 	}
2502 
2503 	blob_size = __file_get_blob_size(file);
2504 
2505 	if ((offset + length) > blob_size) {
2506 		struct spdk_fs_cb_args extend_args = {};
2507 
2508 		cluster_sz = file->fs->bs_opts.cluster_sz;
2509 		extend_args.sem = &channel->sem;
2510 		extend_args.op.resize.num_clusters = __bytes_to_clusters((offset + length), cluster_sz);
2511 		extend_args.file = file;
2512 		BLOBFS_TRACE(file, "start resize to %u clusters\n", extend_args.op.resize.num_clusters);
2513 		pthread_spin_unlock(&file->lock);
2514 		file->fs->send_request(__file_extend_blob, &extend_args);
2515 		sem_wait(&channel->sem);
2516 		if (extend_args.rc) {
2517 			return extend_args.rc;
2518 		}
2519 	}
2520 
2521 	flush_req = alloc_fs_request(channel);
2522 	if (flush_req == NULL) {
2523 		pthread_spin_unlock(&file->lock);
2524 		return -ENOMEM;
2525 	}
2526 
2527 	last = file->last;
2528 	rem_length = length;
2529 	cur_payload = payload;
2530 	while (rem_length > 0) {
2531 		copy = last->buf_size - last->bytes_filled;
2532 		if (copy > rem_length) {
2533 			copy = rem_length;
2534 		}
2535 		BLOBFS_TRACE_RW(file, "  fill offset=%jx length=%jx\n", file->append_pos, copy);
2536 		memcpy(&last->buf[last->bytes_filled], cur_payload, copy);
2537 		file->append_pos += copy;
2538 		if (file->length < file->append_pos) {
2539 			file->length = file->append_pos;
2540 		}
2541 		cur_payload += copy;
2542 		last->bytes_filled += copy;
2543 		rem_length -= copy;
2544 		if (last->bytes_filled == last->buf_size) {
2545 			cache_buffers_filled++;
2546 			last = cache_append_buffer(file);
2547 			if (last == NULL) {
2548 				BLOBFS_TRACE(file, "nomem\n");
2549 				free_fs_request(flush_req);
2550 				pthread_spin_unlock(&file->lock);
2551 				return -ENOMEM;
2552 			}
2553 		}
2554 	}
2555 
2556 	pthread_spin_unlock(&file->lock);
2557 
2558 	if (cache_buffers_filled == 0) {
2559 		free_fs_request(flush_req);
2560 		return 0;
2561 	}
2562 
2563 	flush_req->args.file = file;
2564 	file->fs->send_request(__file_flush, flush_req);
2565 	return 0;
2566 }
2567 
2568 static void
2569 __readahead_done(void *ctx, int bserrno)
2570 {
2571 	struct spdk_fs_request *req = ctx;
2572 	struct spdk_fs_cb_args *args = &req->args;
2573 	struct cache_buffer *cache_buffer = args->op.readahead.cache_buffer;
2574 	struct spdk_file *file = args->file;
2575 
2576 	BLOBFS_TRACE(file, "offset=%jx\n", cache_buffer->offset);
2577 
2578 	pthread_spin_lock(&file->lock);
2579 	cache_buffer->bytes_filled = args->op.readahead.length;
2580 	cache_buffer->bytes_flushed = args->op.readahead.length;
2581 	cache_buffer->in_progress = false;
2582 	pthread_spin_unlock(&file->lock);
2583 
2584 	free_fs_request(req);
2585 }
2586 
2587 static void
2588 __readahead(void *ctx)
2589 {
2590 	struct spdk_fs_request *req = ctx;
2591 	struct spdk_fs_cb_args *args = &req->args;
2592 	struct spdk_file *file = args->file;
2593 	uint64_t offset, length, start_lba, num_lba;
2594 	uint32_t lba_size;
2595 
2596 	offset = args->op.readahead.offset;
2597 	length = args->op.readahead.length;
2598 	assert(length > 0);
2599 
2600 	__get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba);
2601 
2602 	BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n",
2603 		     offset, length, start_lba, num_lba);
2604 	spdk_blob_io_read(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel,
2605 			  args->op.readahead.cache_buffer->buf,
2606 			  start_lba, num_lba, __readahead_done, req);
2607 }
2608 
2609 static uint64_t
2610 __next_cache_buffer_offset(uint64_t offset)
2611 {
2612 	return (offset + CACHE_BUFFER_SIZE) & ~(CACHE_TREE_LEVEL_MASK(0));
2613 }
2614 
2615 static void
2616 check_readahead(struct spdk_file *file, uint64_t offset,
2617 		struct spdk_fs_channel *channel)
2618 {
2619 	struct spdk_fs_request *req;
2620 	struct spdk_fs_cb_args *args;
2621 
2622 	offset = __next_cache_buffer_offset(offset);
2623 	if (tree_find_buffer(file->tree, offset) != NULL || file->length <= offset) {
2624 		return;
2625 	}
2626 
2627 	req = alloc_fs_request(channel);
2628 	if (req == NULL) {
2629 		return;
2630 	}
2631 	args = &req->args;
2632 
2633 	BLOBFS_TRACE(file, "offset=%jx\n", offset);
2634 
2635 	args->file = file;
2636 	args->op.readahead.offset = offset;
2637 	args->op.readahead.cache_buffer = cache_insert_buffer(file, offset);
2638 	if (!args->op.readahead.cache_buffer) {
2639 		BLOBFS_TRACE(file, "Cannot allocate buf for offset=%jx\n", offset);
2640 		free_fs_request(req);
2641 		return;
2642 	}
2643 
2644 	args->op.readahead.cache_buffer->in_progress = true;
2645 	if (file->length < (offset + CACHE_BUFFER_SIZE)) {
2646 		args->op.readahead.length = file->length & (CACHE_BUFFER_SIZE - 1);
2647 	} else {
2648 		args->op.readahead.length = CACHE_BUFFER_SIZE;
2649 	}
2650 	file->fs->send_request(__readahead, req);
2651 }
2652 
2653 int64_t
2654 spdk_file_read(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx,
2655 	       void *payload, uint64_t offset, uint64_t length)
2656 {
2657 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
2658 	uint64_t final_offset, final_length;
2659 	uint32_t sub_reads = 0;
2660 	struct cache_buffer *buf;
2661 	uint64_t read_len;
2662 	struct rw_from_file_arg arg = {};
2663 
2664 	pthread_spin_lock(&file->lock);
2665 
2666 	BLOBFS_TRACE_RW(file, "offset=%ju length=%ju\n", offset, length);
2667 
2668 	file->open_for_writing = false;
2669 
2670 	if (length == 0 || offset >= file->append_pos) {
2671 		pthread_spin_unlock(&file->lock);
2672 		return 0;
2673 	}
2674 
2675 	if (offset + length > file->append_pos) {
2676 		length = file->append_pos - offset;
2677 	}
2678 
2679 	if (offset != file->next_seq_offset) {
2680 		file->seq_byte_count = 0;
2681 	}
2682 	file->seq_byte_count += length;
2683 	file->next_seq_offset = offset + length;
2684 	if (file->seq_byte_count >= CACHE_READAHEAD_THRESHOLD) {
2685 		check_readahead(file, offset, channel);
2686 		check_readahead(file, offset + CACHE_BUFFER_SIZE, channel);
2687 	}
2688 
2689 	arg.channel = channel;
2690 	arg.rwerrno = 0;
2691 	final_length = 0;
2692 	final_offset = offset + length;
2693 	while (offset < final_offset) {
2694 		int ret = 0;
2695 		length = NEXT_CACHE_BUFFER_OFFSET(offset) - offset;
2696 		if (length > (final_offset - offset)) {
2697 			length = final_offset - offset;
2698 		}
2699 
2700 		buf = tree_find_filled_buffer(file->tree, offset);
2701 		if (buf == NULL) {
2702 			pthread_spin_unlock(&file->lock);
2703 			ret = __send_rw_from_file(file, payload, offset, length, true, &arg);
2704 			pthread_spin_lock(&file->lock);
2705 			if (ret == 0) {
2706 				sub_reads++;
2707 			}
2708 		} else {
2709 			read_len = length;
2710 			if ((offset + length) > (buf->offset + buf->bytes_filled)) {
2711 				read_len = buf->offset + buf->bytes_filled - offset;
2712 			}
2713 			BLOBFS_TRACE(file, "read %p offset=%ju length=%ju\n", payload, offset, read_len);
2714 			memcpy(payload, &buf->buf[offset - buf->offset], read_len);
2715 			if ((offset + read_len) % CACHE_BUFFER_SIZE == 0) {
2716 				tree_remove_buffer(file->tree, buf);
2717 				if (file->tree->present_mask == 0) {
2718 					spdk_thread_send_msg(g_cache_pool_thread, _remove_file_from_cache_pool, file);
2719 				}
2720 			}
2721 		}
2722 
2723 		if (ret == 0) {
2724 			final_length += length;
2725 		} else {
2726 			arg.rwerrno = ret;
2727 			break;
2728 		}
2729 		payload += length;
2730 		offset += length;
2731 	}
2732 	pthread_spin_unlock(&file->lock);
2733 	while (sub_reads > 0) {
2734 		sem_wait(&channel->sem);
2735 		sub_reads--;
2736 	}
2737 	if (arg.rwerrno == 0) {
2738 		return final_length;
2739 	} else {
2740 		return arg.rwerrno;
2741 	}
2742 }
2743 
2744 static void
2745 _file_sync(struct spdk_file *file, struct spdk_fs_channel *channel,
2746 	   spdk_file_op_complete cb_fn, void *cb_arg)
2747 {
2748 	struct spdk_fs_request *sync_req;
2749 	struct spdk_fs_request *flush_req;
2750 	struct spdk_fs_cb_args *sync_args;
2751 	struct spdk_fs_cb_args *flush_args;
2752 
2753 	BLOBFS_TRACE(file, "offset=%jx\n", file->append_pos);
2754 
2755 	pthread_spin_lock(&file->lock);
2756 	if (file->append_pos <= file->length_xattr) {
2757 		BLOBFS_TRACE(file, "done - file already synced\n");
2758 		pthread_spin_unlock(&file->lock);
2759 		cb_fn(cb_arg, 0);
2760 		return;
2761 	}
2762 
2763 	sync_req = alloc_fs_request(channel);
2764 	if (!sync_req) {
2765 		SPDK_ERRLOG("Cannot allocate sync req for file=%s\n", file->name);
2766 		pthread_spin_unlock(&file->lock);
2767 		cb_fn(cb_arg, -ENOMEM);
2768 		return;
2769 	}
2770 	sync_args = &sync_req->args;
2771 
2772 	flush_req = alloc_fs_request(channel);
2773 	if (!flush_req) {
2774 		SPDK_ERRLOG("Cannot allocate flush req for file=%s\n", file->name);
2775 		free_fs_request(sync_req);
2776 		pthread_spin_unlock(&file->lock);
2777 		cb_fn(cb_arg, -ENOMEM);
2778 		return;
2779 	}
2780 	flush_args = &flush_req->args;
2781 
2782 	sync_args->file = file;
2783 	sync_args->fn.file_op = cb_fn;
2784 	sync_args->arg = cb_arg;
2785 	sync_args->op.sync.offset = file->append_pos;
2786 	sync_args->op.sync.xattr_in_progress = false;
2787 	TAILQ_INSERT_TAIL(&file->sync_requests, sync_req, args.op.sync.tailq);
2788 	pthread_spin_unlock(&file->lock);
2789 
2790 	flush_args->file = file;
2791 	channel->send_request(__file_flush, flush_req);
2792 }
2793 
2794 int
2795 spdk_file_sync(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx)
2796 {
2797 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
2798 	struct spdk_fs_cb_args args = {};
2799 
2800 	args.sem = &channel->sem;
2801 	_file_sync(file, channel, __wake_caller, &args);
2802 	sem_wait(&channel->sem);
2803 
2804 	return args.rc;
2805 }
2806 
2807 void
2808 spdk_file_sync_async(struct spdk_file *file, struct spdk_io_channel *_channel,
2809 		     spdk_file_op_complete cb_fn, void *cb_arg)
2810 {
2811 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
2812 
2813 	_file_sync(file, channel, cb_fn, cb_arg);
2814 }
2815 
2816 void
2817 spdk_file_set_priority(struct spdk_file *file, uint32_t priority)
2818 {
2819 	BLOBFS_TRACE(file, "priority=%u\n", priority);
2820 	file->priority = priority;
2821 
2822 }
2823 
2824 /*
2825  * Close routines
2826  */
2827 
2828 static void
2829 __file_close_async_done(void *ctx, int bserrno)
2830 {
2831 	struct spdk_fs_request *req = ctx;
2832 	struct spdk_fs_cb_args *args = &req->args;
2833 	struct spdk_file *file = args->file;
2834 
2835 	spdk_trace_record(TRACE_BLOBFS_CLOSE, 0, 0, 0, file->trace_arg_name);
2836 
2837 	if (file->is_deleted) {
2838 		spdk_fs_delete_file_async(file->fs, file->name, blob_delete_cb, ctx);
2839 		return;
2840 	}
2841 
2842 	args->fn.file_op(args->arg, bserrno);
2843 	free_fs_request(req);
2844 }
2845 
2846 static void
2847 __file_close_async(struct spdk_file *file, struct spdk_fs_request *req)
2848 {
2849 	struct spdk_blob *blob;
2850 
2851 	pthread_spin_lock(&file->lock);
2852 	if (file->ref_count == 0) {
2853 		pthread_spin_unlock(&file->lock);
2854 		__file_close_async_done(req, -EBADF);
2855 		return;
2856 	}
2857 
2858 	file->ref_count--;
2859 	if (file->ref_count > 0) {
2860 		pthread_spin_unlock(&file->lock);
2861 		req->args.fn.file_op(req->args.arg, 0);
2862 		free_fs_request(req);
2863 		return;
2864 	}
2865 
2866 	pthread_spin_unlock(&file->lock);
2867 
2868 	blob = file->blob;
2869 	file->blob = NULL;
2870 	spdk_blob_close(blob, __file_close_async_done, req);
2871 }
2872 
2873 static void
2874 __file_close_async__sync_done(void *arg, int fserrno)
2875 {
2876 	struct spdk_fs_request *req = arg;
2877 	struct spdk_fs_cb_args *args = &req->args;
2878 
2879 	__file_close_async(args->file, req);
2880 }
2881 
2882 void
2883 spdk_file_close_async(struct spdk_file *file, spdk_file_op_complete cb_fn, void *cb_arg)
2884 {
2885 	struct spdk_fs_request *req;
2886 	struct spdk_fs_cb_args *args;
2887 
2888 	req = alloc_fs_request(file->fs->md_target.md_fs_channel);
2889 	if (req == NULL) {
2890 		SPDK_ERRLOG("Cannot allocate close async req for file=%s\n", file->name);
2891 		cb_fn(cb_arg, -ENOMEM);
2892 		return;
2893 	}
2894 
2895 	args = &req->args;
2896 	args->file = file;
2897 	args->fn.file_op = cb_fn;
2898 	args->arg = cb_arg;
2899 
2900 	spdk_file_sync_async(file, file->fs->md_target.md_io_channel, __file_close_async__sync_done, req);
2901 }
2902 
2903 static void
2904 __file_close(void *arg)
2905 {
2906 	struct spdk_fs_request *req = arg;
2907 	struct spdk_fs_cb_args *args = &req->args;
2908 	struct spdk_file *file = args->file;
2909 
2910 	__file_close_async(file, req);
2911 }
2912 
2913 int
2914 spdk_file_close(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx)
2915 {
2916 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
2917 	struct spdk_fs_request *req;
2918 	struct spdk_fs_cb_args *args;
2919 
2920 	req = alloc_fs_request(channel);
2921 	if (req == NULL) {
2922 		SPDK_ERRLOG("Cannot allocate close req for file=%s\n", file->name);
2923 		return -ENOMEM;
2924 	}
2925 
2926 	args = &req->args;
2927 
2928 	spdk_file_sync(file, ctx);
2929 	BLOBFS_TRACE(file, "name=%s\n", file->name);
2930 	args->file = file;
2931 	args->sem = &channel->sem;
2932 	args->fn.file_op = __wake_caller;
2933 	args->arg = args;
2934 	channel->send_request(__file_close, req);
2935 	sem_wait(&channel->sem);
2936 
2937 	return args->rc;
2938 }
2939 
2940 int
2941 spdk_file_get_id(struct spdk_file *file, void *id, size_t size)
2942 {
2943 	if (size < sizeof(spdk_blob_id)) {
2944 		return -EINVAL;
2945 	}
2946 
2947 	memcpy(id, &file->blobid, sizeof(spdk_blob_id));
2948 
2949 	return sizeof(spdk_blob_id);
2950 }
2951 
2952 static void
2953 _file_free(void *ctx)
2954 {
2955 	struct spdk_file *file = ctx;
2956 
2957 	TAILQ_REMOVE(&g_caches, file, cache_tailq);
2958 
2959 	free(file->name);
2960 	free(file->tree);
2961 	free(file);
2962 }
2963 
2964 static void
2965 file_free(struct spdk_file *file)
2966 {
2967 	BLOBFS_TRACE(file, "free=%s\n", file->name);
2968 	pthread_spin_lock(&file->lock);
2969 	if (file->tree->present_mask == 0) {
2970 		pthread_spin_unlock(&file->lock);
2971 		free(file->name);
2972 		free(file->tree);
2973 		free(file);
2974 		return;
2975 	}
2976 
2977 	tree_free_buffers(file->tree);
2978 	assert(file->tree->present_mask == 0);
2979 	spdk_thread_send_msg(g_cache_pool_thread, _file_free, file);
2980 	pthread_spin_unlock(&file->lock);
2981 }
2982 
2983 SPDK_LOG_REGISTER_COMPONENT(blobfs)
2984 SPDK_LOG_REGISTER_COMPONENT(blobfs_rw)
2985