xref: /spdk/lib/blobfs/blobfs.c (revision 70c171602a12549fd300b89a7b9dbb2a4e630fad)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 
36 #include "spdk/blobfs.h"
37 #include "tree.h"
38 
39 #include "spdk/queue.h"
40 #include "spdk/thread.h"
41 #include "spdk/assert.h"
42 #include "spdk/env.h"
43 #include "spdk/util.h"
44 #include "spdk/log.h"
45 #include "spdk/trace.h"
46 
47 #include "spdk_internal/trace_defs.h"
48 
49 #define BLOBFS_TRACE(file, str, args...) \
50 	SPDK_DEBUGLOG(blobfs, "file=%s " str, file->name, ##args)
51 
52 #define BLOBFS_TRACE_RW(file, str, args...) \
53 	SPDK_DEBUGLOG(blobfs_rw, "file=%s " str, file->name, ##args)
54 
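/* Defaults: a 4 GiB shared buffer cache and a 1 MiB blobstore cluster size for newly created filesystems. */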
55 #define BLOBFS_DEFAULT_CACHE_SIZE (4ULL * 1024 * 1024 * 1024)
56 #define SPDK_BLOBFS_DEFAULT_OPTS_CLUSTER_SZ (1024 * 1024)
57 
58 #define SPDK_BLOBFS_SIGNATURE	"BLOBFS"
59 
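/*
 * The buffer cache below is shared by every mounted blobfs instance.  It is
 * created on a dedicated "cache_pool_mgmt" thread when the first filesystem
 * comes up and torn down when the last one is unloaded; g_fs_count tracks the
 * number of users and is protected by g_cache_init_lock.
 */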
60 static uint64_t g_fs_cache_size = BLOBFS_DEFAULT_CACHE_SIZE;
61 static struct spdk_mempool *g_cache_pool;
62 static TAILQ_HEAD(, spdk_file) g_caches = TAILQ_HEAD_INITIALIZER(g_caches);
63 static struct spdk_poller *g_cache_pool_mgmt_poller;
64 static struct spdk_thread *g_cache_pool_thread;
65 #define BLOBFS_CACHE_POOL_POLL_PERIOD_IN_US 1000ULL
66 static int g_fs_count = 0;
67 static pthread_mutex_t g_cache_init_lock = PTHREAD_MUTEX_INITIALIZER;
68 
69 SPDK_TRACE_REGISTER_FN(blobfs_trace, "blobfs", TRACE_GROUP_BLOBFS)
70 {
71 	struct spdk_trace_tpoint_opts opts[] = {
72 		{
73 			"BLOBFS_XATTR_START", TRACE_BLOBFS_XATTR_START,
74 			OWNER_NONE, OBJECT_NONE, 0,
75 			{{ "file", SPDK_TRACE_ARG_TYPE_STR, 40 }},
76 		},
77 		{
78 			"BLOBFS_XATTR_END", TRACE_BLOBFS_XATTR_END,
79 			OWNER_NONE, OBJECT_NONE, 0,
80 			{{ "file", SPDK_TRACE_ARG_TYPE_STR, 40 }},
81 		},
82 		{
83 			"BLOBFS_OPEN", TRACE_BLOBFS_OPEN,
84 			OWNER_NONE, OBJECT_NONE, 0,
85 			{{ "file", SPDK_TRACE_ARG_TYPE_STR, 40 }},
86 		},
87 		{
88 			"BLOBFS_CLOSE", TRACE_BLOBFS_CLOSE,
89 			OWNER_NONE, OBJECT_NONE, 0,
90 			{{ "file", SPDK_TRACE_ARG_TYPE_STR, 40 }},
91 		},
92 		{
93 			"BLOBFS_DELETE_START", TRACE_BLOBFS_DELETE_START,
94 			OWNER_NONE, OBJECT_NONE, 0,
95 			{{ "file", SPDK_TRACE_ARG_TYPE_STR, 40 }},
96 		},
97 		{
98 			"BLOBFS_DELETE_DONE", TRACE_BLOBFS_DELETE_DONE,
99 			OWNER_NONE, OBJECT_NONE, 0,
100 			{{ "file", SPDK_TRACE_ARG_TYPE_STR, 40 }},
101 		}
102 	};
103 
104 	spdk_trace_register_description_ext(opts, SPDK_COUNTOF(opts));
105 }
106 
107 void
108 cache_buffer_free(struct cache_buffer *cache_buffer)
109 {
110 	spdk_mempool_put(g_cache_pool, cache_buffer->buf);
111 	free(cache_buffer);
112 }
113 
114 #define CACHE_READAHEAD_THRESHOLD	(128 * 1024)
115 
116 struct spdk_file {
117 	struct spdk_filesystem	*fs;
118 	struct spdk_blob	*blob;
119 	char			*name;
120 	uint64_t		length;
121 	bool                    is_deleted;
122 	bool			open_for_writing;
123 	uint64_t		length_flushed;
124 	uint64_t		length_xattr;
125 	uint64_t		append_pos;
126 	uint64_t		seq_byte_count;
127 	uint64_t		next_seq_offset;
128 	uint32_t		priority;
129 	TAILQ_ENTRY(spdk_file)	tailq;
130 	spdk_blob_id		blobid;
131 	uint32_t		ref_count;
132 	pthread_spinlock_t	lock;
133 	struct cache_buffer	*last;
134 	struct cache_tree	*tree;
135 	TAILQ_HEAD(open_requests_head, spdk_fs_request) open_requests;
136 	TAILQ_HEAD(sync_requests_head, spdk_fs_request) sync_requests;
137 	TAILQ_ENTRY(spdk_file)	cache_tailq;
138 };
139 
140 struct spdk_deleted_file {
141 	spdk_blob_id	id;
142 	TAILQ_ENTRY(spdk_deleted_file)	tailq;
143 };
144 
145 struct spdk_filesystem {
146 	struct spdk_blob_store	*bs;
147 	TAILQ_HEAD(, spdk_file)	files;
148 	struct spdk_bs_opts	bs_opts;
149 	struct spdk_bs_dev	*bdev;
150 	fs_send_request_fn	send_request;
151 
152 	struct {
153 		uint32_t		max_ops;
154 		struct spdk_io_channel	*sync_io_channel;
155 		struct spdk_fs_channel	*sync_fs_channel;
156 	} sync_target;
157 
158 	struct {
159 		uint32_t		max_ops;
160 		struct spdk_io_channel	*md_io_channel;
161 		struct spdk_fs_channel	*md_fs_channel;
162 	} md_target;
163 
164 	struct {
165 		uint32_t		max_ops;
166 	} io_target;
167 };
168 
169 struct spdk_fs_cb_args {
170 	union {
171 		spdk_fs_op_with_handle_complete		fs_op_with_handle;
172 		spdk_fs_op_complete			fs_op;
173 		spdk_file_op_with_handle_complete	file_op_with_handle;
174 		spdk_file_op_complete			file_op;
175 		spdk_file_stat_op_complete		stat_op;
176 	} fn;
177 	void *arg;
178 	sem_t *sem;
179 	struct spdk_filesystem *fs;
180 	struct spdk_file *file;
181 	int rc;
182 	int *rwerrno;
183 	struct iovec *iovs;
184 	uint32_t iovcnt;
185 	struct iovec iov;
186 	union {
187 		struct {
188 			TAILQ_HEAD(, spdk_deleted_file)	deleted_files;
189 		} fs_load;
190 		struct {
191 			uint64_t	length;
192 		} truncate;
193 		struct {
194 			struct spdk_io_channel	*channel;
195 			void		*pin_buf;
196 			int		is_read;
197 			off_t		offset;
198 			size_t		length;
199 			uint64_t	start_lba;
200 			uint64_t	num_lba;
201 			uint32_t	blocklen;
202 		} rw;
203 		struct {
204 			const char	*old_name;
205 			const char	*new_name;
206 		} rename;
207 		struct {
208 			struct cache_buffer	*cache_buffer;
209 			uint64_t		length;
210 		} flush;
211 		struct {
212 			struct cache_buffer	*cache_buffer;
213 			uint64_t		length;
214 			uint64_t		offset;
215 		} readahead;
216 		struct {
217 			/* offset of the file when the sync request was made */
218 			uint64_t			offset;
219 			TAILQ_ENTRY(spdk_fs_request)	tailq;
220 			bool				xattr_in_progress;
221 			/* length written to the xattr for this file - this should
222 			 * always be the same as the offset if only one thread is
223 			 * writing to the file, but could differ if multiple threads
224 			 * are appending
225 			 */
226 			uint64_t			length;
227 		} sync;
228 		struct {
229 			uint32_t			num_clusters;
230 		} resize;
231 		struct {
232 			const char	*name;
233 			uint32_t	flags;
234 			TAILQ_ENTRY(spdk_fs_request)	tailq;
235 		} open;
236 		struct {
237 			const char		*name;
238 			struct spdk_blob	*blob;
239 		} create;
240 		struct {
241 			const char	*name;
242 		} delete;
243 		struct {
244 			const char	*name;
245 		} stat;
246 	} op;
247 };
248 
249 static void file_free(struct spdk_file *file);
250 static void fs_io_device_unregister(struct spdk_filesystem *fs);
251 static void fs_free_io_channels(struct spdk_filesystem *fs);
252 
253 void
254 spdk_fs_opts_init(struct spdk_blobfs_opts *opts)
255 {
256 	opts->cluster_sz = SPDK_BLOBFS_DEFAULT_OPTS_CLUSTER_SZ;
257 }
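/*
 * Illustrative sketch of typical initialization (names other than the blobfs
 * API calls are placeholders supplied by the application):
 *
 *   struct spdk_blobfs_opts opts;
 *
 *   spdk_fs_opts_init(&opts);
 *   opts.cluster_sz = 4 * 1024 * 1024;
 *   spdk_fs_init(bs_dev, &opts, app_send_request, app_init_done_cb, app_ctx);
 */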
258 
259 static int _blobfs_cache_pool_reclaim(void *arg);
260 
261 static bool
262 blobfs_cache_pool_need_reclaim(void)
263 {
264 	size_t count;
265 
266 	count = spdk_mempool_count(g_cache_pool);
267 	/* We use an aggressive policy here because workloads such as db_bench issue requests
268 	 *  in batches, so start reclaiming when fewer than 1/5 of the cache buffers remain available.
269 	 */
270 	if (count > (size_t)g_fs_cache_size / CACHE_BUFFER_SIZE / 5) {
271 		return false;
272 	}
273 
274 	return true;
275 }
276 
277 static void
278 __start_cache_pool_mgmt(void *ctx)
279 {
280 	assert(g_cache_pool == NULL);
281 
282 	g_cache_pool = spdk_mempool_create("spdk_fs_cache",
283 					   g_fs_cache_size / CACHE_BUFFER_SIZE,
284 					   CACHE_BUFFER_SIZE,
285 					   SPDK_MEMPOOL_DEFAULT_CACHE_SIZE,
286 					   SPDK_ENV_SOCKET_ID_ANY);
287 	if (!g_cache_pool) {
288 		SPDK_ERRLOG("Failed to create mempool; you may need to "
289 			    "increase available memory and try again\n");
290 		assert(false);
291 	}
292 
293 	assert(g_cache_pool_mgmt_poller == NULL);
294 	g_cache_pool_mgmt_poller = SPDK_POLLER_REGISTER(_blobfs_cache_pool_reclaim, NULL,
295 				   BLOBFS_CACHE_POOL_POLL_PERIOD_IN_US);
296 }
297 
298 static void
299 __stop_cache_pool_mgmt(void *ctx)
300 {
301 	spdk_poller_unregister(&g_cache_pool_mgmt_poller);
302 
303 	assert(g_cache_pool != NULL);
304 	assert(spdk_mempool_count(g_cache_pool) == g_fs_cache_size / CACHE_BUFFER_SIZE);
305 	spdk_mempool_free(g_cache_pool);
306 	g_cache_pool = NULL;
307 
308 	spdk_thread_exit(g_cache_pool_thread);
309 }
310 
311 static void
312 initialize_global_cache(void)
313 {
314 	pthread_mutex_lock(&g_cache_init_lock);
315 	if (g_fs_count == 0) {
316 		g_cache_pool_thread = spdk_thread_create("cache_pool_mgmt", NULL);
317 		assert(g_cache_pool_thread != NULL);
318 		spdk_thread_send_msg(g_cache_pool_thread, __start_cache_pool_mgmt, NULL);
319 	}
320 	g_fs_count++;
321 	pthread_mutex_unlock(&g_cache_init_lock);
322 }
323 
324 static void
325 free_global_cache(void)
326 {
327 	pthread_mutex_lock(&g_cache_init_lock);
328 	g_fs_count--;
329 	if (g_fs_count == 0) {
330 		spdk_thread_send_msg(g_cache_pool_thread, __stop_cache_pool_mgmt, NULL);
331 	}
332 	pthread_mutex_unlock(&g_cache_init_lock);
333 }
334 
335 static uint64_t
336 __file_get_blob_size(struct spdk_file *file)
337 {
338 	uint64_t cluster_sz;
339 
340 	cluster_sz = file->fs->bs_opts.cluster_sz;
341 	return cluster_sz * spdk_blob_get_num_clusters(file->blob);
342 }
343 
344 struct spdk_fs_request {
345 	struct spdk_fs_cb_args		args;
346 	TAILQ_ENTRY(spdk_fs_request)	link;
347 	struct spdk_fs_channel		*channel;
348 };
349 
350 struct spdk_fs_channel {
351 	struct spdk_fs_request		*req_mem;
352 	TAILQ_HEAD(, spdk_fs_request)	reqs;
353 	sem_t				sem;
354 	struct spdk_filesystem		*fs;
355 	struct spdk_io_channel		*bs_channel;
356 	fs_send_request_fn		send_request;
357 	bool				sync;
358 	uint32_t			outstanding_reqs;
359 	pthread_spinlock_t		lock;
360 };
361 
362 /* For now, this is effectively an alias. But eventually we'll shift
363  * some data members over. */
364 struct spdk_fs_thread_ctx {
365 	struct spdk_fs_channel	ch;
366 };
367 
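/*
 * Pop a pre-allocated request from the channel's free list.  For vectored I/O
 * (iovcnt > 1) a separate iovec array is allocated; otherwise the request's
 * embedded iov is used.  Returns NULL if no request is available.
 */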
368 static struct spdk_fs_request *
369 alloc_fs_request_with_iov(struct spdk_fs_channel *channel, uint32_t iovcnt)
370 {
371 	struct spdk_fs_request *req;
372 	struct iovec *iovs = NULL;
373 
374 	if (iovcnt > 1) {
375 		iovs = calloc(iovcnt, sizeof(struct iovec));
376 		if (!iovs) {
377 			return NULL;
378 		}
379 	}
380 
381 	if (channel->sync) {
382 		pthread_spin_lock(&channel->lock);
383 	}
384 
385 	req = TAILQ_FIRST(&channel->reqs);
386 	if (req) {
387 		channel->outstanding_reqs++;
388 		TAILQ_REMOVE(&channel->reqs, req, link);
389 	}
390 
391 	if (channel->sync) {
392 		pthread_spin_unlock(&channel->lock);
393 	}
394 
395 	if (req == NULL) {
396 		SPDK_ERRLOG("Cannot allocate req on spdk_fs_channel=%p\n", channel);
397 		free(iovs);
398 		return NULL;
399 	}
400 	memset(req, 0, sizeof(*req));
401 	req->channel = channel;
402 	if (iovcnt > 1) {
403 		req->args.iovs = iovs;
404 	} else {
405 		req->args.iovs = &req->args.iov;
406 	}
407 	req->args.iovcnt = iovcnt;
408 
409 	return req;
410 }
411 
412 static struct spdk_fs_request *
413 alloc_fs_request(struct spdk_fs_channel *channel)
414 {
415 	return alloc_fs_request_with_iov(channel, 0);
416 }
417 
418 static void
419 free_fs_request(struct spdk_fs_request *req)
420 {
421 	struct spdk_fs_channel *channel = req->channel;
422 
423 	if (req->args.iovcnt > 1) {
424 		free(req->args.iovs);
425 	}
426 
427 	if (channel->sync) {
428 		pthread_spin_lock(&channel->lock);
429 	}
430 
431 	TAILQ_INSERT_HEAD(&req->channel->reqs, req, link);
432 	channel->outstanding_reqs--;
433 
434 	if (channel->sync) {
435 		pthread_spin_unlock(&channel->lock);
436 	}
437 }
438 
439 static int
440 fs_channel_create(struct spdk_filesystem *fs, struct spdk_fs_channel *channel,
441 		  uint32_t max_ops)
442 {
443 	uint32_t i;
444 
445 	channel->req_mem = calloc(max_ops, sizeof(struct spdk_fs_request));
446 	if (!channel->req_mem) {
447 		return -1;
448 	}
449 
450 	channel->outstanding_reqs = 0;
451 	TAILQ_INIT(&channel->reqs);
452 	sem_init(&channel->sem, 0, 0);
453 
454 	for (i = 0; i < max_ops; i++) {
455 		TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link);
456 	}
457 
458 	channel->fs = fs;
459 
460 	return 0;
461 }
462 
463 static int
464 fs_md_channel_create(void *io_device, void *ctx_buf)
465 {
466 	struct spdk_filesystem		*fs;
467 	struct spdk_fs_channel		*channel = ctx_buf;
468 
469 	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, md_target);
470 
471 	return fs_channel_create(fs, channel, fs->md_target.max_ops);
472 }
473 
474 static int
475 fs_sync_channel_create(void *io_device, void *ctx_buf)
476 {
477 	struct spdk_filesystem		*fs;
478 	struct spdk_fs_channel		*channel = ctx_buf;
479 
480 	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, sync_target);
481 
482 	return fs_channel_create(fs, channel, fs->sync_target.max_ops);
483 }
484 
485 static int
486 fs_io_channel_create(void *io_device, void *ctx_buf)
487 {
488 	struct spdk_filesystem		*fs;
489 	struct spdk_fs_channel		*channel = ctx_buf;
490 
491 	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, io_target);
492 
493 	return fs_channel_create(fs, channel, fs->io_target.max_ops);
494 }
495 
496 static void
497 fs_channel_destroy(void *io_device, void *ctx_buf)
498 {
499 	struct spdk_fs_channel *channel = ctx_buf;
500 
501 	if (channel->outstanding_reqs > 0) {
502 		SPDK_ERRLOG("channel freed with %" PRIu32 " outstanding requests!\n",
503 			    channel->outstanding_reqs);
504 	}
505 
506 	free(channel->req_mem);
507 	if (channel->bs_channel != NULL) {
508 		spdk_bs_free_io_channel(channel->bs_channel);
509 	}
510 }
511 
512 static void
513 __send_request_direct(fs_request_fn fn, void *arg)
514 {
515 	fn(arg);
516 }
517 
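/*
 * Attach a newly initialized/loaded blobstore to the filesystem: record the
 * cluster size, allocate blobstore io channels for the md and sync targets,
 * and take a reference on the global buffer cache.
 */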
518 static void
519 common_fs_bs_init(struct spdk_filesystem *fs, struct spdk_blob_store *bs)
520 {
521 	fs->bs = bs;
522 	fs->bs_opts.cluster_sz = spdk_bs_get_cluster_size(bs);
523 	fs->md_target.md_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
524 	fs->md_target.md_fs_channel->send_request = __send_request_direct;
525 	fs->sync_target.sync_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
526 	fs->sync_target.sync_fs_channel->send_request = __send_request_direct;
527 
528 	initialize_global_cache();
529 }
530 
531 static void
532 init_cb(void *ctx, struct spdk_blob_store *bs, int bserrno)
533 {
534 	struct spdk_fs_request *req = ctx;
535 	struct spdk_fs_cb_args *args = &req->args;
536 	struct spdk_filesystem *fs = args->fs;
537 
538 	if (bserrno == 0) {
539 		common_fs_bs_init(fs, bs);
540 	} else {
541 		free(fs);
542 		fs = NULL;
543 	}
544 
545 	args->fn.fs_op_with_handle(args->arg, fs, bserrno);
546 	free_fs_request(req);
547 }
548 
549 static struct spdk_filesystem *
550 fs_alloc(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn)
551 {
552 	struct spdk_filesystem *fs;
553 
554 	fs = calloc(1, sizeof(*fs));
555 	if (fs == NULL) {
556 		return NULL;
557 	}
558 
559 	fs->bdev = dev;
560 	fs->send_request = send_request_fn;
561 	TAILQ_INIT(&fs->files);
562 
563 	fs->md_target.max_ops = 512;
564 	spdk_io_device_register(&fs->md_target, fs_md_channel_create, fs_channel_destroy,
565 				sizeof(struct spdk_fs_channel), "blobfs_md");
566 	fs->md_target.md_io_channel = spdk_get_io_channel(&fs->md_target);
567 	fs->md_target.md_fs_channel = spdk_io_channel_get_ctx(fs->md_target.md_io_channel);
568 
569 	fs->sync_target.max_ops = 512;
570 	spdk_io_device_register(&fs->sync_target, fs_sync_channel_create, fs_channel_destroy,
571 				sizeof(struct spdk_fs_channel), "blobfs_sync");
572 	fs->sync_target.sync_io_channel = spdk_get_io_channel(&fs->sync_target);
573 	fs->sync_target.sync_fs_channel = spdk_io_channel_get_ctx(fs->sync_target.sync_io_channel);
574 
575 	fs->io_target.max_ops = 512;
576 	spdk_io_device_register(&fs->io_target, fs_io_channel_create, fs_channel_destroy,
577 				sizeof(struct spdk_fs_channel), "blobfs_io");
578 
579 	return fs;
580 }
581 
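/*
 * Completion helper for the synchronous wrappers: record the first error seen
 * (when a shared rwerrno pointer is provided), store the return code and wake
 * the caller blocked in sem_wait().
 */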
582 static void
583 __wake_caller(void *arg, int fserrno)
584 {
585 	struct spdk_fs_cb_args *args = arg;
586 
587 	if ((args->rwerrno != NULL) && (*(args->rwerrno) == 0) && fserrno) {
588 		*(args->rwerrno) = fserrno;
589 	}
590 	args->rc = fserrno;
591 	sem_post(args->sem);
592 }
593 
594 void
595 spdk_fs_init(struct spdk_bs_dev *dev, struct spdk_blobfs_opts *opt,
596 	     fs_send_request_fn send_request_fn,
597 	     spdk_fs_op_with_handle_complete cb_fn, void *cb_arg)
598 {
599 	struct spdk_filesystem *fs;
600 	struct spdk_fs_request *req;
601 	struct spdk_fs_cb_args *args;
602 	struct spdk_bs_opts opts = {};
603 
604 	fs = fs_alloc(dev, send_request_fn);
605 	if (fs == NULL) {
606 		cb_fn(cb_arg, NULL, -ENOMEM);
607 		return;
608 	}
609 
610 	req = alloc_fs_request(fs->md_target.md_fs_channel);
611 	if (req == NULL) {
612 		fs_free_io_channels(fs);
613 		fs_io_device_unregister(fs);
614 		cb_fn(cb_arg, NULL, -ENOMEM);
615 		return;
616 	}
617 
618 	args = &req->args;
619 	args->fn.fs_op_with_handle = cb_fn;
620 	args->arg = cb_arg;
621 	args->fs = fs;
622 
623 	spdk_bs_opts_init(&opts, sizeof(opts));
624 	snprintf(opts.bstype.bstype, sizeof(opts.bstype.bstype), SPDK_BLOBFS_SIGNATURE);
625 	if (opt) {
626 		opts.cluster_sz = opt->cluster_sz;
627 	}
628 	spdk_bs_init(dev, &opts, init_cb, req);
629 }
630 
631 static struct spdk_file *
632 file_alloc(struct spdk_filesystem *fs)
633 {
634 	struct spdk_file *file;
635 
636 	file = calloc(1, sizeof(*file));
637 	if (file == NULL) {
638 		return NULL;
639 	}
640 
641 	file->tree = calloc(1, sizeof(*file->tree));
642 	if (file->tree == NULL) {
643 		free(file);
644 		return NULL;
645 	}
646 
647 	if (pthread_spin_init(&file->lock, 0)) {
648 		free(file->tree);
649 		free(file);
650 		return NULL;
651 	}
652 
653 	file->fs = fs;
654 	TAILQ_INIT(&file->open_requests);
655 	TAILQ_INIT(&file->sync_requests);
656 	TAILQ_INSERT_TAIL(&fs->files, file, tailq);
657 	file->priority = SPDK_FILE_PRIORITY_LOW;
658 	return file;
659 }
660 
661 static void fs_load_done(void *ctx, int bserrno);
662 
663 static int
664 _handle_deleted_files(struct spdk_fs_request *req)
665 {
666 	struct spdk_fs_cb_args *args = &req->args;
667 	struct spdk_filesystem *fs = args->fs;
668 
669 	if (!TAILQ_EMPTY(&args->op.fs_load.deleted_files)) {
670 		struct spdk_deleted_file *deleted_file;
671 
672 		deleted_file = TAILQ_FIRST(&args->op.fs_load.deleted_files);
673 		TAILQ_REMOVE(&args->op.fs_load.deleted_files, deleted_file, tailq);
674 		spdk_bs_delete_blob(fs->bs, deleted_file->id, fs_load_done, req);
675 		free(deleted_file);
676 		return 0;
677 	}
678 
679 	return 1;
680 }
681 
682 static void
683 fs_load_done(void *ctx, int bserrno)
684 {
685 	struct spdk_fs_request *req = ctx;
686 	struct spdk_fs_cb_args *args = &req->args;
687 	struct spdk_filesystem *fs = args->fs;
688 
689 	/* The filesystem has been loaded.  Now check if there are any files that
690 	 *  were marked for deletion before the last unload.  Do not complete the
691 	 *  fs_load callback until all of them have been deleted on disk.
692 	 */
693 	if (_handle_deleted_files(req) == 0) {
694 		/* We found a file that was marked for deletion but not actually
695 		 *  deleted yet.  This function will be called again once the delete
696 		 *  operation completes.
697 		 */
698 		return;
699 	}
700 
701 	args->fn.fs_op_with_handle(args->arg, fs, 0);
702 	free_fs_request(req);
703 
704 }
705 
706 static void
707 iter_cb(void *ctx, struct spdk_blob *blob, int rc)
708 {
709 	struct spdk_fs_request *req = ctx;
710 	struct spdk_fs_cb_args *args = &req->args;
711 	struct spdk_filesystem *fs = args->fs;
712 	uint64_t *length;
713 	const char *name;
714 	uint32_t *is_deleted;
715 	size_t value_len;
716 
717 	if (rc < 0) {
718 		args->fn.fs_op_with_handle(args->arg, fs, rc);
719 		free_fs_request(req);
720 		return;
721 	}
722 
723 	rc = spdk_blob_get_xattr_value(blob, "name", (const void **)&name, &value_len);
724 	if (rc < 0) {
725 		args->fn.fs_op_with_handle(args->arg, fs, rc);
726 		free_fs_request(req);
727 		return;
728 	}
729 
730 	rc = spdk_blob_get_xattr_value(blob, "length", (const void **)&length, &value_len);
731 	if (rc < 0) {
732 		args->fn.fs_op_with_handle(args->arg, fs, rc);
733 		free_fs_request(req);
734 		return;
735 	}
736 
737 	assert(value_len == 8);
738 
739 	/* If the file was marked as deleted but never removed (e.g. the app crashed before closing it), delete it now during load */
740 	rc = spdk_blob_get_xattr_value(blob, "is_deleted", (const void **)&is_deleted, &value_len);
741 	if (rc < 0) {
742 		struct spdk_file *f;
743 
744 		f = file_alloc(fs);
745 		if (f == NULL) {
746 			SPDK_ERRLOG("Cannot allocate file to handle deleted file on disk\n");
747 			args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM);
748 			free_fs_request(req);
749 			return;
750 		}
751 
752 		f->name = strdup(name);
753 		f->blobid = spdk_blob_get_id(blob);
754 		f->length = *length;
755 		f->length_flushed = *length;
756 		f->length_xattr = *length;
757 		f->append_pos = *length;
758 		SPDK_DEBUGLOG(blobfs, "added file %s length=%ju\n", f->name, f->length);
759 	} else {
760 		struct spdk_deleted_file *deleted_file;
761 
762 		deleted_file = calloc(1, sizeof(*deleted_file));
763 		if (deleted_file == NULL) {
764 			args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM);
765 			free_fs_request(req);
766 			return;
767 		}
768 		deleted_file->id = spdk_blob_get_id(blob);
769 		TAILQ_INSERT_TAIL(&args->op.fs_load.deleted_files, deleted_file, tailq);
770 	}
771 }
772 
773 static void
774 load_cb(void *ctx, struct spdk_blob_store *bs, int bserrno)
775 {
776 	struct spdk_fs_request *req = ctx;
777 	struct spdk_fs_cb_args *args = &req->args;
778 	struct spdk_filesystem *fs = args->fs;
779 	struct spdk_bs_type bstype;
780 	static const struct spdk_bs_type blobfs_type = {SPDK_BLOBFS_SIGNATURE};
781 	static const struct spdk_bs_type zeros;
782 
783 	if (bserrno != 0) {
784 		args->fn.fs_op_with_handle(args->arg, NULL, bserrno);
785 		free_fs_request(req);
786 		fs_free_io_channels(fs);
787 		fs_io_device_unregister(fs);
788 		return;
789 	}
790 
791 	bstype = spdk_bs_get_bstype(bs);
792 
793 	if (!memcmp(&bstype, &zeros, sizeof(bstype))) {
794 		SPDK_DEBUGLOG(blobfs, "assigning bstype\n");
795 		spdk_bs_set_bstype(bs, blobfs_type);
796 	} else if (memcmp(&bstype, &blobfs_type, sizeof(bstype))) {
797 		SPDK_ERRLOG("not blobfs\n");
798 		SPDK_LOGDUMP(blobfs, "bstype", &bstype, sizeof(bstype));
799 		args->fn.fs_op_with_handle(args->arg, NULL, -EINVAL);
800 		free_fs_request(req);
801 		fs_free_io_channels(fs);
802 		fs_io_device_unregister(fs);
803 		return;
804 	}
805 
806 	common_fs_bs_init(fs, bs);
807 	fs_load_done(req, 0);
808 }
809 
810 static void
811 fs_io_device_unregister(struct spdk_filesystem *fs)
812 {
813 	assert(fs != NULL);
814 	spdk_io_device_unregister(&fs->md_target, NULL);
815 	spdk_io_device_unregister(&fs->sync_target, NULL);
816 	spdk_io_device_unregister(&fs->io_target, NULL);
817 	free(fs);
818 }
819 
820 static void
821 fs_free_io_channels(struct spdk_filesystem *fs)
822 {
823 	assert(fs != NULL);
824 	spdk_fs_free_io_channel(fs->md_target.md_io_channel);
825 	spdk_fs_free_io_channel(fs->sync_target.sync_io_channel);
826 }
827 
828 void
829 spdk_fs_load(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn,
830 	     spdk_fs_op_with_handle_complete cb_fn, void *cb_arg)
831 {
832 	struct spdk_filesystem *fs;
833 	struct spdk_fs_cb_args *args;
834 	struct spdk_fs_request *req;
835 	struct spdk_bs_opts	bs_opts;
836 
837 	fs = fs_alloc(dev, send_request_fn);
838 	if (fs == NULL) {
839 		cb_fn(cb_arg, NULL, -ENOMEM);
840 		return;
841 	}
842 
843 	req = alloc_fs_request(fs->md_target.md_fs_channel);
844 	if (req == NULL) {
845 		fs_free_io_channels(fs);
846 		fs_io_device_unregister(fs);
847 		cb_fn(cb_arg, NULL, -ENOMEM);
848 		return;
849 	}
850 
851 	args = &req->args;
852 	args->fn.fs_op_with_handle = cb_fn;
853 	args->arg = cb_arg;
854 	args->fs = fs;
855 	TAILQ_INIT(&args->op.fs_load.deleted_files);
856 	spdk_bs_opts_init(&bs_opts, sizeof(bs_opts));
857 	bs_opts.iter_cb_fn = iter_cb;
858 	bs_opts.iter_cb_arg = req;
859 	spdk_bs_load(dev, &bs_opts, load_cb, req);
860 }
861 
862 static void
863 unload_cb(void *ctx, int bserrno)
864 {
865 	struct spdk_fs_request *req = ctx;
866 	struct spdk_fs_cb_args *args = &req->args;
867 	struct spdk_filesystem *fs = args->fs;
868 	struct spdk_file *file, *tmp;
869 
870 	TAILQ_FOREACH_SAFE(file, &fs->files, tailq, tmp) {
871 		TAILQ_REMOVE(&fs->files, file, tailq);
872 		file_free(file);
873 	}
874 
875 	free_global_cache();
876 
877 	args->fn.fs_op(args->arg, bserrno);
878 	free(req);
879 
880 	fs_io_device_unregister(fs);
881 }
882 
883 void
884 spdk_fs_unload(struct spdk_filesystem *fs, spdk_fs_op_complete cb_fn, void *cb_arg)
885 {
886 	struct spdk_fs_request *req;
887 	struct spdk_fs_cb_args *args;
888 
889 	/*
890 	 * We must free the md_channel before unloading the blobstore, so just
891 	 *  allocate this request from the general heap.
892 	 */
893 	req = calloc(1, sizeof(*req));
894 	if (req == NULL) {
895 		cb_fn(cb_arg, -ENOMEM);
896 		return;
897 	}
898 
899 	args = &req->args;
900 	args->fn.fs_op = cb_fn;
901 	args->arg = cb_arg;
902 	args->fs = fs;
903 
904 	fs_free_io_channels(fs);
905 	spdk_bs_unload(fs->bs, unload_cb, req);
906 }
907 
908 static struct spdk_file *
909 fs_find_file(struct spdk_filesystem *fs, const char *name)
910 {
911 	struct spdk_file *file;
912 
913 	TAILQ_FOREACH(file, &fs->files, tailq) {
914 		if (!strncmp(name, file->name, SPDK_FILE_NAME_MAX)) {
915 			return file;
916 		}
917 	}
918 
919 	return NULL;
920 }
921 
922 void
923 spdk_fs_file_stat_async(struct spdk_filesystem *fs, const char *name,
924 			spdk_file_stat_op_complete cb_fn, void *cb_arg)
925 {
926 	struct spdk_file_stat stat;
927 	struct spdk_file *f = NULL;
928 
929 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
930 		cb_fn(cb_arg, NULL, -ENAMETOOLONG);
931 		return;
932 	}
933 
934 	f = fs_find_file(fs, name);
935 	if (f != NULL) {
936 		stat.blobid = f->blobid;
937 		stat.size = f->append_pos >= f->length ? f->append_pos : f->length;
938 		cb_fn(cb_arg, &stat, 0);
939 		return;
940 	}
941 
942 	cb_fn(cb_arg, NULL, -ENOENT);
943 }
944 
945 static void
946 __copy_stat(void *arg, struct spdk_file_stat *stat, int fserrno)
947 {
948 	struct spdk_fs_request *req = arg;
949 	struct spdk_fs_cb_args *args = &req->args;
950 
951 	args->rc = fserrno;
952 	if (fserrno == 0) {
953 		memcpy(args->arg, stat, sizeof(*stat));
954 	}
955 	sem_post(args->sem);
956 }
957 
958 static void
959 __file_stat(void *arg)
960 {
961 	struct spdk_fs_request *req = arg;
962 	struct spdk_fs_cb_args *args = &req->args;
963 
964 	spdk_fs_file_stat_async(args->fs, args->op.stat.name,
965 				args->fn.stat_op, req);
966 }
967 
968 int
969 spdk_fs_file_stat(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx,
970 		  const char *name, struct spdk_file_stat *stat)
971 {
972 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
973 	struct spdk_fs_request *req;
974 	int rc;
975 
976 	req = alloc_fs_request(channel);
977 	if (req == NULL) {
978 		SPDK_ERRLOG("Cannot allocate stat req for file=%s\n", name);
979 		return -ENOMEM;
980 	}
981 
982 	req->args.fs = fs;
983 	req->args.op.stat.name = name;
984 	req->args.fn.stat_op = __copy_stat;
985 	req->args.arg = stat;
986 	req->args.sem = &channel->sem;
987 	channel->send_request(__file_stat, req);
988 	sem_wait(&channel->sem);
989 
990 	rc = req->args.rc;
991 	free_fs_request(req);
992 
993 	return rc;
994 }
995 
996 static void
997 fs_create_blob_close_cb(void *ctx, int bserrno)
998 {
999 	int rc;
1000 	struct spdk_fs_request *req = ctx;
1001 	struct spdk_fs_cb_args *args = &req->args;
1002 
1003 	rc = args->rc ? args->rc : bserrno;
1004 	args->fn.file_op(args->arg, rc);
1005 	free_fs_request(req);
1006 }
1007 
1008 static void
1009 fs_create_blob_resize_cb(void *ctx, int bserrno)
1010 {
1011 	struct spdk_fs_request *req = ctx;
1012 	struct spdk_fs_cb_args *args = &req->args;
1013 	struct spdk_file *f = args->file;
1014 	struct spdk_blob *blob = args->op.create.blob;
1015 	uint64_t length = 0;
1016 
1017 	args->rc = bserrno;
1018 	if (bserrno) {
1019 		spdk_blob_close(blob, fs_create_blob_close_cb, args);
1020 		return;
1021 	}
1022 
1023 	spdk_blob_set_xattr(blob, "name", f->name, strlen(f->name) + 1);
1024 	spdk_blob_set_xattr(blob, "length", &length, sizeof(length));
1025 
1026 	spdk_blob_close(blob, fs_create_blob_close_cb, args);
1027 }
1028 
1029 static void
1030 fs_create_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno)
1031 {
1032 	struct spdk_fs_request *req = ctx;
1033 	struct spdk_fs_cb_args *args = &req->args;
1034 
1035 	if (bserrno) {
1036 		args->fn.file_op(args->arg, bserrno);
1037 		free_fs_request(req);
1038 		return;
1039 	}
1040 
1041 	args->op.create.blob = blob;
1042 	spdk_blob_resize(blob, 1, fs_create_blob_resize_cb, req);
1043 }
1044 
1045 static void
1046 fs_create_blob_create_cb(void *ctx, spdk_blob_id blobid, int bserrno)
1047 {
1048 	struct spdk_fs_request *req = ctx;
1049 	struct spdk_fs_cb_args *args = &req->args;
1050 	struct spdk_file *f = args->file;
1051 
1052 	if (bserrno) {
1053 		args->fn.file_op(args->arg, bserrno);
1054 		free_fs_request(req);
1055 		return;
1056 	}
1057 
1058 	f->blobid = blobid;
1059 	spdk_bs_open_blob(f->fs->bs, blobid, fs_create_blob_open_cb, req);
1060 }
1061 
1062 void
1063 spdk_fs_create_file_async(struct spdk_filesystem *fs, const char *name,
1064 			  spdk_file_op_complete cb_fn, void *cb_arg)
1065 {
1066 	struct spdk_file *file;
1067 	struct spdk_fs_request *req;
1068 	struct spdk_fs_cb_args *args;
1069 
1070 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
1071 		cb_fn(cb_arg, -ENAMETOOLONG);
1072 		return;
1073 	}
1074 
1075 	file = fs_find_file(fs, name);
1076 	if (file != NULL) {
1077 		cb_fn(cb_arg, -EEXIST);
1078 		return;
1079 	}
1080 
1081 	file = file_alloc(fs);
1082 	if (file == NULL) {
1083 		SPDK_ERRLOG("Cannot allocate new file for creation\n");
1084 		cb_fn(cb_arg, -ENOMEM);
1085 		return;
1086 	}
1087 
1088 	req = alloc_fs_request(fs->md_target.md_fs_channel);
1089 	if (req == NULL) {
1090 		SPDK_ERRLOG("Cannot allocate create async req for file=%s\n", name);
1091 		TAILQ_REMOVE(&fs->files, file, tailq);
1092 		file_free(file);
1093 		cb_fn(cb_arg, -ENOMEM);
1094 		return;
1095 	}
1096 
1097 	args = &req->args;
1098 	args->file = file;
1099 	args->fn.file_op = cb_fn;
1100 	args->arg = cb_arg;
1101 
1102 	file->name = strdup(name);
1103 	if (!file->name) {
1104 		SPDK_ERRLOG("Cannot allocate file->name for file=%s\n", name);
1105 		free_fs_request(req);
1106 		TAILQ_REMOVE(&fs->files, file, tailq);
1107 		file_free(file);
1108 		cb_fn(cb_arg, -ENOMEM);
1109 		return;
1110 	}
1111 	spdk_bs_create_blob(fs->bs, fs_create_blob_create_cb, args);
1112 }
1113 
1114 static void
1115 __fs_create_file_done(void *arg, int fserrno)
1116 {
1117 	struct spdk_fs_request *req = arg;
1118 	struct spdk_fs_cb_args *args = &req->args;
1119 
1120 	__wake_caller(args, fserrno);
1121 	SPDK_DEBUGLOG(blobfs, "file=%s\n", args->op.create.name);
1122 }
1123 
1124 static void
1125 __fs_create_file(void *arg)
1126 {
1127 	struct spdk_fs_request *req = arg;
1128 	struct spdk_fs_cb_args *args = &req->args;
1129 
1130 	SPDK_DEBUGLOG(blobfs, "file=%s\n", args->op.create.name);
1131 	spdk_fs_create_file_async(args->fs, args->op.create.name, __fs_create_file_done, req);
1132 }
1133 
1134 int
1135 spdk_fs_create_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx, const char *name)
1136 {
1137 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
1138 	struct spdk_fs_request *req;
1139 	struct spdk_fs_cb_args *args;
1140 	int rc;
1141 
1142 	SPDK_DEBUGLOG(blobfs, "file=%s\n", name);
1143 
1144 	req = alloc_fs_request(channel);
1145 	if (req == NULL) {
1146 		SPDK_ERRLOG("Cannot allocate req to create file=%s\n", name);
1147 		return -ENOMEM;
1148 	}
1149 
1150 	args = &req->args;
1151 	args->fs = fs;
1152 	args->op.create.name = name;
1153 	args->sem = &channel->sem;
1154 	fs->send_request(__fs_create_file, req);
1155 	sem_wait(&channel->sem);
1156 	rc = args->rc;
1157 	free_fs_request(req);
1158 
1159 	return rc;
1160 }
1161 
1162 static void
1163 fs_open_blob_done(void *ctx, struct spdk_blob *blob, int bserrno)
1164 {
1165 	struct spdk_fs_request *req = ctx;
1166 	struct spdk_fs_cb_args *args = &req->args;
1167 	struct spdk_file *f = args->file;
1168 
1169 	f->blob = blob;
1170 	while (!TAILQ_EMPTY(&f->open_requests)) {
1171 		req = TAILQ_FIRST(&f->open_requests);
1172 		args = &req->args;
1173 		TAILQ_REMOVE(&f->open_requests, req, args.op.open.tailq);
1174 		spdk_trace_record(TRACE_BLOBFS_OPEN, 0, 0, 0, f->name);
1175 		args->fn.file_op_with_handle(args->arg, f, bserrno);
1176 		free_fs_request(req);
1177 	}
1178 }
1179 
1180 static void
1181 fs_open_blob_create_cb(void *ctx, int bserrno)
1182 {
1183 	struct spdk_fs_request *req = ctx;
1184 	struct spdk_fs_cb_args *args = &req->args;
1185 	struct spdk_file *file = args->file;
1186 	struct spdk_filesystem *fs = args->fs;
1187 
1188 	if (file == NULL) {
1189 		/*
1190 		 * This is from an open with CREATE flag - the file
1191 		 *  is now created so look it up in the file list for this
1192 		 *  filesystem.
1193 		 */
1194 		file = fs_find_file(fs, args->op.open.name);
1195 		assert(file != NULL);
1196 		args->file = file;
1197 	}
1198 
1199 	file->ref_count++;
1200 	TAILQ_INSERT_TAIL(&file->open_requests, req, args.op.open.tailq);
1201 	if (file->ref_count == 1) {
1202 		assert(file->blob == NULL);
1203 		spdk_bs_open_blob(fs->bs, file->blobid, fs_open_blob_done, req);
1204 	} else if (file->blob != NULL) {
1205 		fs_open_blob_done(req, file->blob, 0);
1206 	} else {
1207 		/*
1208 		 * The blob open for this file is in progress due to a previous
1209 		 *  open request.  When that open completes, it will invoke the
1210 		 *  open callback for this request.
1211 		 */
1212 	}
1213 }
1214 
1215 void
1216 spdk_fs_open_file_async(struct spdk_filesystem *fs, const char *name, uint32_t flags,
1217 			spdk_file_op_with_handle_complete cb_fn, void *cb_arg)
1218 {
1219 	struct spdk_file *f = NULL;
1220 	struct spdk_fs_request *req;
1221 	struct spdk_fs_cb_args *args;
1222 
1223 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
1224 		cb_fn(cb_arg, NULL, -ENAMETOOLONG);
1225 		return;
1226 	}
1227 
1228 	f = fs_find_file(fs, name);
1229 	if (f == NULL && !(flags & SPDK_BLOBFS_OPEN_CREATE)) {
1230 		cb_fn(cb_arg, NULL, -ENOENT);
1231 		return;
1232 	}
1233 
1234 	if (f != NULL && f->is_deleted == true) {
1235 		cb_fn(cb_arg, NULL, -ENOENT);
1236 		return;
1237 	}
1238 
1239 	req = alloc_fs_request(fs->md_target.md_fs_channel);
1240 	if (req == NULL) {
1241 		SPDK_ERRLOG("Cannot allocate async open req for file=%s\n", name);
1242 		cb_fn(cb_arg, NULL, -ENOMEM);
1243 		return;
1244 	}
1245 
1246 	args = &req->args;
1247 	args->fn.file_op_with_handle = cb_fn;
1248 	args->arg = cb_arg;
1249 	args->file = f;
1250 	args->fs = fs;
1251 	args->op.open.name = name;
1252 
1253 	if (f == NULL) {
1254 		spdk_fs_create_file_async(fs, name, fs_open_blob_create_cb, req);
1255 	} else {
1256 		fs_open_blob_create_cb(req, 0);
1257 	}
1258 }
1259 
1260 static void
1261 __fs_open_file_done(void *arg, struct spdk_file *file, int bserrno)
1262 {
1263 	struct spdk_fs_request *req = arg;
1264 	struct spdk_fs_cb_args *args = &req->args;
1265 
1266 	args->file = file;
1267 	__wake_caller(args, bserrno);
1268 	SPDK_DEBUGLOG(blobfs, "file=%s\n", args->op.open.name);
1269 }
1270 
1271 static void
1272 __fs_open_file(void *arg)
1273 {
1274 	struct spdk_fs_request *req = arg;
1275 	struct spdk_fs_cb_args *args = &req->args;
1276 
1277 	SPDK_DEBUGLOG(blobfs, "file=%s\n", args->op.open.name);
1278 	spdk_fs_open_file_async(args->fs, args->op.open.name, args->op.open.flags,
1279 				__fs_open_file_done, req);
1280 }
1281 
1282 int
1283 spdk_fs_open_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx,
1284 		  const char *name, uint32_t flags, struct spdk_file **file)
1285 {
1286 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
1287 	struct spdk_fs_request *req;
1288 	struct spdk_fs_cb_args *args;
1289 	int rc;
1290 
1291 	SPDK_DEBUGLOG(blobfs, "file=%s\n", name);
1292 
1293 	req = alloc_fs_request(channel);
1294 	if (req == NULL) {
1295 		SPDK_ERRLOG("Cannot allocate req for opening file=%s\n", name);
1296 		return -ENOMEM;
1297 	}
1298 
1299 	args = &req->args;
1300 	args->fs = fs;
1301 	args->op.open.name = name;
1302 	args->op.open.flags = flags;
1303 	args->sem = &channel->sem;
1304 	fs->send_request(__fs_open_file, req);
1305 	sem_wait(&channel->sem);
1306 	rc = args->rc;
1307 	if (rc == 0) {
1308 		*file = args->file;
1309 	} else {
1310 		*file = NULL;
1311 	}
1312 	free_fs_request(req);
1313 
1314 	return rc;
1315 }
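/*
 * Illustrative sketch of the synchronous open path from a thread that owns an
 * spdk_fs_thread_ctx (the file name and surrounding code are placeholders):
 *
 *   struct spdk_fs_thread_ctx *ctx = spdk_fs_alloc_thread_ctx(fs);
 *   struct spdk_file *file;
 *
 *   if (spdk_fs_open_file(fs, ctx, "example.dat", SPDK_BLOBFS_OPEN_CREATE, &file) == 0) {
 *           ... I/O on the file, then close it ...
 *   }
 *   spdk_fs_free_thread_ctx(ctx);
 */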
1316 
1317 static void
1318 fs_rename_blob_close_cb(void *ctx, int bserrno)
1319 {
1320 	struct spdk_fs_request *req = ctx;
1321 	struct spdk_fs_cb_args *args = &req->args;
1322 
1323 	args->fn.fs_op(args->arg, bserrno);
1324 	free_fs_request(req);
1325 }
1326 
1327 static void
1328 fs_rename_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno)
1329 {
1330 	struct spdk_fs_request *req = ctx;
1331 	struct spdk_fs_cb_args *args = &req->args;
1332 	const char *new_name = args->op.rename.new_name;
1333 
1334 	spdk_blob_set_xattr(blob, "name", new_name, strlen(new_name) + 1);
1335 	spdk_blob_close(blob, fs_rename_blob_close_cb, req);
1336 }
1337 
1338 static void
1339 _fs_md_rename_file(struct spdk_fs_request *req)
1340 {
1341 	struct spdk_fs_cb_args *args = &req->args;
1342 	struct spdk_file *f;
1343 
1344 	f = fs_find_file(args->fs, args->op.rename.old_name);
1345 	if (f == NULL) {
1346 		args->fn.fs_op(args->arg, -ENOENT);
1347 		free_fs_request(req);
1348 		return;
1349 	}
1350 
1351 	free(f->name);
1352 	f->name = strdup(args->op.rename.new_name);
1353 	args->file = f;
1354 	spdk_bs_open_blob(args->fs->bs, f->blobid, fs_rename_blob_open_cb, req);
1355 }
1356 
1357 static void
1358 fs_rename_delete_done(void *arg, int fserrno)
1359 {
1360 	_fs_md_rename_file(arg);
1361 }
1362 
1363 void
1364 spdk_fs_rename_file_async(struct spdk_filesystem *fs,
1365 			  const char *old_name, const char *new_name,
1366 			  spdk_file_op_complete cb_fn, void *cb_arg)
1367 {
1368 	struct spdk_file *f;
1369 	struct spdk_fs_request *req;
1370 	struct spdk_fs_cb_args *args;
1371 
1372 	SPDK_DEBUGLOG(blobfs, "old=%s new=%s\n", old_name, new_name);
1373 	if (strnlen(new_name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
1374 		cb_fn(cb_arg, -ENAMETOOLONG);
1375 		return;
1376 	}
1377 
1378 	req = alloc_fs_request(fs->md_target.md_fs_channel);
1379 	if (req == NULL) {
1380 		SPDK_ERRLOG("Cannot allocate rename async req for renaming file from %s to %s\n", old_name,
1381 			    new_name);
1382 		cb_fn(cb_arg, -ENOMEM);
1383 		return;
1384 	}
1385 
1386 	args = &req->args;
1387 	args->fn.fs_op = cb_fn;
1388 	args->fs = fs;
1389 	args->arg = cb_arg;
1390 	args->op.rename.old_name = old_name;
1391 	args->op.rename.new_name = new_name;
1392 
1393 	f = fs_find_file(fs, new_name);
1394 	if (f == NULL) {
1395 		_fs_md_rename_file(req);
1396 		return;
1397 	}
1398 
1399 	/*
1400 	 * The rename overwrites an existing file.  So delete the existing file, then
1401 	 *  do the actual rename.
1402 	 */
1403 	spdk_fs_delete_file_async(fs, new_name, fs_rename_delete_done, req);
1404 }
1405 
1406 static void
1407 __fs_rename_file_done(void *arg, int fserrno)
1408 {
1409 	struct spdk_fs_request *req = arg;
1410 	struct spdk_fs_cb_args *args = &req->args;
1411 
1412 	__wake_caller(args, fserrno);
1413 }
1414 
1415 static void
1416 __fs_rename_file(void *arg)
1417 {
1418 	struct spdk_fs_request *req = arg;
1419 	struct spdk_fs_cb_args *args = &req->args;
1420 
1421 	spdk_fs_rename_file_async(args->fs, args->op.rename.old_name, args->op.rename.new_name,
1422 				  __fs_rename_file_done, req);
1423 }
1424 
1425 int
1426 spdk_fs_rename_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx,
1427 		    const char *old_name, const char *new_name)
1428 {
1429 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
1430 	struct spdk_fs_request *req;
1431 	struct spdk_fs_cb_args *args;
1432 	int rc;
1433 
1434 	req = alloc_fs_request(channel);
1435 	if (req == NULL) {
1436 		SPDK_ERRLOG("Cannot allocate rename req for file=%s\n", old_name);
1437 		return -ENOMEM;
1438 	}
1439 
1440 	args = &req->args;
1441 
1442 	args->fs = fs;
1443 	args->op.rename.old_name = old_name;
1444 	args->op.rename.new_name = new_name;
1445 	args->sem = &channel->sem;
1446 	fs->send_request(__fs_rename_file, req);
1447 	sem_wait(&channel->sem);
1448 	rc = args->rc;
1449 	free_fs_request(req);
1450 	return rc;
1451 }
1452 
1453 static void
1454 blob_delete_cb(void *ctx, int bserrno)
1455 {
1456 	struct spdk_fs_request *req = ctx;
1457 	struct spdk_fs_cb_args *args = &req->args;
1458 
1459 	args->fn.file_op(args->arg, bserrno);
1460 	free_fs_request(req);
1461 }
1462 
1463 void
1464 spdk_fs_delete_file_async(struct spdk_filesystem *fs, const char *name,
1465 			  spdk_file_op_complete cb_fn, void *cb_arg)
1466 {
1467 	struct spdk_file *f;
1468 	spdk_blob_id blobid;
1469 	struct spdk_fs_request *req;
1470 	struct spdk_fs_cb_args *args;
1471 
1472 	SPDK_DEBUGLOG(blobfs, "file=%s\n", name);
1473 
1474 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
1475 		cb_fn(cb_arg, -ENAMETOOLONG);
1476 		return;
1477 	}
1478 
1479 	f = fs_find_file(fs, name);
1480 	if (f == NULL) {
1481 		SPDK_ERRLOG("Cannot find file=%s to delete\n", name);
1482 		cb_fn(cb_arg, -ENOENT);
1483 		return;
1484 	}
1485 
1486 	req = alloc_fs_request(fs->md_target.md_fs_channel);
1487 	if (req == NULL) {
1488 		SPDK_ERRLOG("Cannot allocate req to delete file=%s\n", name);
1489 		cb_fn(cb_arg, -ENOMEM);
1490 		return;
1491 	}
1492 
1493 	args = &req->args;
1494 	args->fn.file_op = cb_fn;
1495 	args->arg = cb_arg;
1496 
1497 	if (f->ref_count > 0) {
1498 		/* If the ref count is > 0, mark the file as deleted and delete it when it is finally closed. */
1499 		f->is_deleted = true;
1500 		spdk_blob_set_xattr(f->blob, "is_deleted", &f->is_deleted, sizeof(bool));
1501 		spdk_blob_sync_md(f->blob, blob_delete_cb, req);
1502 		return;
1503 	}
1504 
1505 	blobid = f->blobid;
1506 	TAILQ_REMOVE(&fs->files, f, tailq);
1507 
1508 	file_free(f);
1509 
1510 	spdk_bs_delete_blob(fs->bs, blobid, blob_delete_cb, req);
1511 }
1512 
1513 static void
1514 __fs_delete_file_done(void *arg, int fserrno)
1515 {
1516 	struct spdk_fs_request *req = arg;
1517 	struct spdk_fs_cb_args *args = &req->args;
1518 
1519 	spdk_trace_record(TRACE_BLOBFS_DELETE_DONE, 0, 0, 0, args->op.delete.name);
1520 	__wake_caller(args, fserrno);
1521 }
1522 
1523 static void
1524 __fs_delete_file(void *arg)
1525 {
1526 	struct spdk_fs_request *req = arg;
1527 	struct spdk_fs_cb_args *args = &req->args;
1528 
1529 	spdk_trace_record(TRACE_BLOBFS_DELETE_START, 0, 0, 0, args->op.delete.name);
1530 	spdk_fs_delete_file_async(args->fs, args->op.delete.name, __fs_delete_file_done, req);
1531 }
1532 
1533 int
1534 spdk_fs_delete_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx,
1535 		    const char *name)
1536 {
1537 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
1538 	struct spdk_fs_request *req;
1539 	struct spdk_fs_cb_args *args;
1540 	int rc;
1541 
1542 	req = alloc_fs_request(channel);
1543 	if (req == NULL) {
1544 		SPDK_DEBUGLOG(blobfs, "Cannot allocate req to delete file=%s\n", name);
1545 		return -ENOMEM;
1546 	}
1547 
1548 	args = &req->args;
1549 	args->fs = fs;
1550 	args->op.delete.name = name;
1551 	args->sem = &channel->sem;
1552 	fs->send_request(__fs_delete_file, req);
1553 	sem_wait(&channel->sem);
1554 	rc = args->rc;
1555 	free_fs_request(req);
1556 
1557 	return rc;
1558 }
1559 
1560 spdk_fs_iter
1561 spdk_fs_iter_first(struct spdk_filesystem *fs)
1562 {
1563 	struct spdk_file *f;
1564 
1565 	f = TAILQ_FIRST(&fs->files);
1566 	return f;
1567 }
1568 
1569 spdk_fs_iter
1570 spdk_fs_iter_next(spdk_fs_iter iter)
1571 {
1572 	struct spdk_file *f = iter;
1573 
1574 	if (f == NULL) {
1575 		return NULL;
1576 	}
1577 
1578 	f = TAILQ_NEXT(f, tailq);
1579 	return f;
1580 }
1581 
1582 const char *
1583 spdk_file_get_name(struct spdk_file *file)
1584 {
1585 	return file->name;
1586 }
1587 
1588 uint64_t
1589 spdk_file_get_length(struct spdk_file *file)
1590 {
1591 	uint64_t length;
1592 
1593 	assert(file != NULL);
1594 
1595 	length = file->append_pos >= file->length ? file->append_pos : file->length;
1596 	SPDK_DEBUGLOG(blobfs, "file=%s length=0x%jx\n", file->name, length);
1597 	return length;
1598 }
1599 
1600 static void
1601 fs_truncate_complete_cb(void *ctx, int bserrno)
1602 {
1603 	struct spdk_fs_request *req = ctx;
1604 	struct spdk_fs_cb_args *args = &req->args;
1605 
1606 	args->fn.file_op(args->arg, bserrno);
1607 	free_fs_request(req);
1608 }
1609 
1610 static void
1611 fs_truncate_resize_cb(void *ctx, int bserrno)
1612 {
1613 	struct spdk_fs_request *req = ctx;
1614 	struct spdk_fs_cb_args *args = &req->args;
1615 	struct spdk_file *file = args->file;
1616 	uint64_t *length = &args->op.truncate.length;
1617 
1618 	if (bserrno) {
1619 		args->fn.file_op(args->arg, bserrno);
1620 		free_fs_request(req);
1621 		return;
1622 	}
1623 
1624 	spdk_blob_set_xattr(file->blob, "length", length, sizeof(*length));
1625 
1626 	file->length = *length;
1627 	if (file->append_pos > file->length) {
1628 		file->append_pos = file->length;
1629 	}
1630 
1631 	spdk_blob_sync_md(file->blob, fs_truncate_complete_cb, req);
1632 }
1633 
1634 static uint64_t
1635 __bytes_to_clusters(uint64_t length, uint64_t cluster_sz)
1636 {
1637 	return (length + cluster_sz - 1) / cluster_sz;
1638 }
1639 
1640 void
1641 spdk_file_truncate_async(struct spdk_file *file, uint64_t length,
1642 			 spdk_file_op_complete cb_fn, void *cb_arg)
1643 {
1644 	struct spdk_filesystem *fs;
1645 	size_t num_clusters;
1646 	struct spdk_fs_request *req;
1647 	struct spdk_fs_cb_args *args;
1648 
1649 	SPDK_DEBUGLOG(blobfs, "file=%s old=0x%jx new=0x%jx\n", file->name, file->length, length);
1650 	if (length == file->length) {
1651 		cb_fn(cb_arg, 0);
1652 		return;
1653 	}
1654 
1655 	req = alloc_fs_request(file->fs->md_target.md_fs_channel);
1656 	if (req == NULL) {
1657 		cb_fn(cb_arg, -ENOMEM);
1658 		return;
1659 	}
1660 
1661 	args = &req->args;
1662 	args->fn.file_op = cb_fn;
1663 	args->arg = cb_arg;
1664 	args->file = file;
1665 	args->op.truncate.length = length;
1666 	fs = file->fs;
1667 
1668 	num_clusters = __bytes_to_clusters(length, fs->bs_opts.cluster_sz);
1669 
1670 	spdk_blob_resize(file->blob, num_clusters, fs_truncate_resize_cb, req);
1671 }
1672 
1673 static void
1674 __truncate(void *arg)
1675 {
1676 	struct spdk_fs_request *req = arg;
1677 	struct spdk_fs_cb_args *args = &req->args;
1678 
1679 	spdk_file_truncate_async(args->file, args->op.truncate.length,
1680 				 args->fn.file_op, args);
1681 }
1682 
1683 int
1684 spdk_file_truncate(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx,
1685 		   uint64_t length)
1686 {
1687 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
1688 	struct spdk_fs_request *req;
1689 	struct spdk_fs_cb_args *args;
1690 	int rc;
1691 
1692 	req = alloc_fs_request(channel);
1693 	if (req == NULL) {
1694 		return -ENOMEM;
1695 	}
1696 
1697 	args = &req->args;
1698 
1699 	args->file = file;
1700 	args->op.truncate.length = length;
1701 	args->fn.file_op = __wake_caller;
1702 	args->sem = &channel->sem;
1703 
1704 	channel->send_request(__truncate, req);
1705 	sem_wait(&channel->sem);
1706 	rc = args->rc;
1707 	free_fs_request(req);
1708 
1709 	return rc;
1710 }
1711 
1712 static void
1713 __rw_done(void *ctx, int bserrno)
1714 {
1715 	struct spdk_fs_request *req = ctx;
1716 	struct spdk_fs_cb_args *args = &req->args;
1717 
1718 	spdk_free(args->op.rw.pin_buf);
1719 	args->fn.file_op(args->arg, bserrno);
1720 	free_fs_request(req);
1721 }
1722 
1723 static void
1724 _copy_iovs_to_buf(void *buf, size_t buf_len, struct iovec *iovs, int iovcnt)
1725 {
1726 	int i;
1727 	size_t len;
1728 
1729 	for (i = 0; i < iovcnt; i++) {
1730 		len = spdk_min(iovs[i].iov_len, buf_len);
1731 		memcpy(buf, iovs[i].iov_base, len);
1732 		buf += len;
1733 		assert(buf_len >= len);
1734 		buf_len -= len;
1735 	}
1736 }
1737 
1738 static void
1739 _copy_buf_to_iovs(struct iovec *iovs, int iovcnt, void *buf, size_t buf_len)
1740 {
1741 	int i;
1742 	size_t len;
1743 
1744 	for (i = 0; i < iovcnt; i++) {
1745 		len = spdk_min(iovs[i].iov_len, buf_len);
1746 		memcpy(iovs[i].iov_base, buf, len);
1747 		buf += len;
1748 		assert(buf_len >= len);
1749 		buf_len -= len;
1750 	}
1751 }
1752 
1753 static void
1754 __read_done(void *ctx, int bserrno)
1755 {
1756 	struct spdk_fs_request *req = ctx;
1757 	struct spdk_fs_cb_args *args = &req->args;
1758 	void *buf;
1759 
1760 	assert(req != NULL);
1761 	buf = (void *)((uintptr_t)args->op.rw.pin_buf + (args->op.rw.offset & (args->op.rw.blocklen - 1)));
1762 	if (args->op.rw.is_read) {
1763 		_copy_buf_to_iovs(args->iovs, args->iovcnt, buf, args->op.rw.length);
1764 		__rw_done(req, 0);
1765 	} else {
1766 		_copy_iovs_to_buf(buf, args->op.rw.length, args->iovs, args->iovcnt);
1767 		spdk_blob_io_write(args->file->blob, args->op.rw.channel,
1768 				   args->op.rw.pin_buf,
1769 				   args->op.rw.start_lba, args->op.rw.num_lba,
1770 				   __rw_done, req);
1771 	}
1772 }
1773 
1774 static void
1775 __do_blob_read(void *ctx, int fserrno)
1776 {
1777 	struct spdk_fs_request *req = ctx;
1778 	struct spdk_fs_cb_args *args = &req->args;
1779 
1780 	if (fserrno) {
1781 		__rw_done(req, fserrno);
1782 		return;
1783 	}
1784 	spdk_blob_io_read(args->file->blob, args->op.rw.channel,
1785 			  args->op.rw.pin_buf,
1786 			  args->op.rw.start_lba, args->op.rw.num_lba,
1787 			  __read_done, req);
1788 }
1789 
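/*
 * Translate a byte offset/length pair into the blobstore io-unit (LBA) range
 * that fully covers it.
 */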
1790 static void
1791 __get_page_parameters(struct spdk_file *file, uint64_t offset, uint64_t length,
1792 		      uint64_t *start_lba, uint32_t *lba_size, uint64_t *num_lba)
1793 {
1794 	uint64_t end_lba;
1795 
1796 	*lba_size = spdk_bs_get_io_unit_size(file->fs->bs);
1797 	*start_lba = offset / *lba_size;
1798 	end_lba = (offset + length - 1) / *lba_size;
1799 	*num_lba = (end_lba - *start_lba + 1);
1800 }
1801 
1802 static bool
1803 __is_lba_aligned(struct spdk_file *file, uint64_t offset, uint64_t length)
1804 {
1805 	uint32_t lba_size = spdk_bs_get_io_unit_size(file->fs->bs);
1806 
1807 	if ((offset % lba_size == 0) && (length % lba_size == 0)) {
1808 		return true;
1809 	}
1810 
1811 	return false;
1812 }
1813 
1814 static void
1815 _fs_request_setup_iovs(struct spdk_fs_request *req, struct iovec *iovs, uint32_t iovcnt)
1816 {
1817 	uint32_t i;
1818 
1819 	for (i = 0; i < iovcnt; i++) {
1820 		req->args.iovs[i].iov_base = iovs[i].iov_base;
1821 		req->args.iovs[i].iov_len = iovs[i].iov_len;
1822 	}
1823 }
1824 
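/*
 * Common path for the read/readv/write/writev wrappers: allocate a DMA-able
 * bounce buffer covering the LBA-aligned range, then either read into it and
 * copy out to the caller's iovecs, or copy the iovecs in and write it back.
 * Unaligned writes are handled as read-modify-write; writes that extend the
 * file first grow it with spdk_file_truncate_async().
 */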
1825 static void
1826 __readvwritev(struct spdk_file *file, struct spdk_io_channel *_channel,
1827 	      struct iovec *iovs, uint32_t iovcnt, uint64_t offset, uint64_t length,
1828 	      spdk_file_op_complete cb_fn, void *cb_arg, int is_read)
1829 {
1830 	struct spdk_fs_request *req;
1831 	struct spdk_fs_cb_args *args;
1832 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
1833 	uint64_t start_lba, num_lba, pin_buf_length;
1834 	uint32_t lba_size;
1835 
1836 	if (is_read && offset + length > file->length) {
1837 		cb_fn(cb_arg, -EINVAL);
1838 		return;
1839 	}
1840 
1841 	req = alloc_fs_request_with_iov(channel, iovcnt);
1842 	if (req == NULL) {
1843 		cb_fn(cb_arg, -ENOMEM);
1844 		return;
1845 	}
1846 
1847 	__get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba);
1848 
1849 	args = &req->args;
1850 	args->fn.file_op = cb_fn;
1851 	args->arg = cb_arg;
1852 	args->file = file;
1853 	args->op.rw.channel = channel->bs_channel;
1854 	_fs_request_setup_iovs(req, iovs, iovcnt);
1855 	args->op.rw.is_read = is_read;
1856 	args->op.rw.offset = offset;
1857 	args->op.rw.blocklen = lba_size;
1858 
1859 	pin_buf_length = num_lba * lba_size;
1860 	args->op.rw.length = pin_buf_length;
1861 	args->op.rw.pin_buf = spdk_malloc(pin_buf_length, lba_size, NULL,
1862 					  SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
1863 	if (args->op.rw.pin_buf == NULL) {
1864 		SPDK_DEBUGLOG(blobfs, "Failed to allocate buf for: file=%s offset=%jx length=%jx\n",
1865 			      file->name, offset, length);
1866 		free_fs_request(req);
1867 		cb_fn(cb_arg, -ENOMEM);
1868 		return;
1869 	}
1870 
1871 	args->op.rw.start_lba = start_lba;
1872 	args->op.rw.num_lba = num_lba;
1873 
1874 	if (!is_read && file->length < offset + length) {
1875 		spdk_file_truncate_async(file, offset + length, __do_blob_read, req);
1876 	} else if (!is_read && __is_lba_aligned(file, offset, length)) {
1877 		_copy_iovs_to_buf(args->op.rw.pin_buf, args->op.rw.length, args->iovs, args->iovcnt);
1878 		spdk_blob_io_write(args->file->blob, args->op.rw.channel,
1879 				   args->op.rw.pin_buf,
1880 				   args->op.rw.start_lba, args->op.rw.num_lba,
1881 				   __rw_done, req);
1882 	} else {
1883 		__do_blob_read(req, 0);
1884 	}
1885 }
1886 
1887 static void
1888 __readwrite(struct spdk_file *file, struct spdk_io_channel *channel,
1889 	    void *payload, uint64_t offset, uint64_t length,
1890 	    spdk_file_op_complete cb_fn, void *cb_arg, int is_read)
1891 {
1892 	struct iovec iov;
1893 
1894 	iov.iov_base = payload;
1895 	iov.iov_len = (size_t)length;
1896 
1897 	__readvwritev(file, channel, &iov, 1, offset, length, cb_fn, cb_arg, is_read);
1898 }
1899 
1900 void
1901 spdk_file_write_async(struct spdk_file *file, struct spdk_io_channel *channel,
1902 		      void *payload, uint64_t offset, uint64_t length,
1903 		      spdk_file_op_complete cb_fn, void *cb_arg)
1904 {
1905 	__readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 0);
1906 }
1907 
1908 void
1909 spdk_file_writev_async(struct spdk_file *file, struct spdk_io_channel *channel,
1910 		       struct iovec *iovs, uint32_t iovcnt, uint64_t offset, uint64_t length,
1911 		       spdk_file_op_complete cb_fn, void *cb_arg)
1912 {
1913 	SPDK_DEBUGLOG(blobfs, "file=%s offset=%jx length=%jx\n",
1914 		      file->name, offset, length);
1915 
1916 	__readvwritev(file, channel, iovs, iovcnt, offset, length, cb_fn, cb_arg, 0);
1917 }
1918 
1919 void
1920 spdk_file_read_async(struct spdk_file *file, struct spdk_io_channel *channel,
1921 		     void *payload, uint64_t offset, uint64_t length,
1922 		     spdk_file_op_complete cb_fn, void *cb_arg)
1923 {
1924 	SPDK_DEBUGLOG(blobfs, "file=%s offset=%jx length=%jx\n",
1925 		      file->name, offset, length);
1926 	__readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 1);
1927 }
1928 
1929 void
1930 spdk_file_readv_async(struct spdk_file *file, struct spdk_io_channel *channel,
1931 		      struct iovec *iovs, uint32_t iovcnt, uint64_t offset, uint64_t length,
1932 		      spdk_file_op_complete cb_fn, void *cb_arg)
1933 {
1934 	SPDK_DEBUGLOG(blobfs, "file=%s offset=%jx length=%jx\n",
1935 		      file->name, offset, length);
1936 
1937 	__readvwritev(file, channel, iovs, iovcnt, offset, length, cb_fn, cb_arg, 1);
1938 }
1939 
1940 struct spdk_io_channel *
1941 spdk_fs_alloc_io_channel(struct spdk_filesystem *fs)
1942 {
1943 	struct spdk_io_channel *io_channel;
1944 	struct spdk_fs_channel *fs_channel;
1945 
1946 	io_channel = spdk_get_io_channel(&fs->io_target);
1947 	fs_channel = spdk_io_channel_get_ctx(io_channel);
1948 	fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
1949 	fs_channel->send_request = __send_request_direct;
1950 
1951 	return io_channel;
1952 }
1953 
1954 void
1955 spdk_fs_free_io_channel(struct spdk_io_channel *channel)
1956 {
1957 	spdk_put_io_channel(channel);
1958 }
1959 
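/*
 * Context for the synchronous (blocking) API.  A minimal usage sketch from an
 * application (non-reactor) thread, assuming 'fs' was loaded elsewhere and
 * 'buf'/'buf_len' describe a caller-owned buffer (all names illustrative only):
 *
 *	struct spdk_fs_thread_ctx *ctx = spdk_fs_alloc_thread_ctx(fs);
 *	struct spdk_file *file;
 *
 *	if (ctx == NULL) {
 *		return;
 *	}
 *	if (spdk_fs_open_file(fs, ctx, "foo", SPDK_BLOBFS_OPEN_CREATE, &file) == 0) {
 *		spdk_file_write(file, ctx, buf, 0, buf_len);
 *		spdk_file_sync(file, ctx);
 *		spdk_file_close(file, ctx);
 *	}
 *	spdk_fs_free_thread_ctx(ctx);
 */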
1960 struct spdk_fs_thread_ctx *
1961 spdk_fs_alloc_thread_ctx(struct spdk_filesystem *fs)
1962 {
1963 	struct spdk_fs_thread_ctx *ctx;
1964 
1965 	ctx = calloc(1, sizeof(*ctx));
1966 	if (!ctx) {
1967 		return NULL;
1968 	}
1969 
1970 	if (pthread_spin_init(&ctx->ch.lock, 0)) {
1971 		free(ctx);
1972 		return NULL;
1973 	}
1974 
1975 	fs_channel_create(fs, &ctx->ch, 512);
1976 
1977 	ctx->ch.send_request = fs->send_request;
1978 	ctx->ch.sync = 1;
1979 
1980 	return ctx;
1981 }
1982 
1983 
1984 void
1985 spdk_fs_free_thread_ctx(struct spdk_fs_thread_ctx *ctx)
1986 {
1987 	assert(ctx->ch.sync == 1);
1988 
1989 	while (true) {
1990 		pthread_spin_lock(&ctx->ch.lock);
1991 		if (ctx->ch.outstanding_reqs == 0) {
1992 			pthread_spin_unlock(&ctx->ch.lock);
1993 			break;
1994 		}
1995 		pthread_spin_unlock(&ctx->ch.lock);
1996 		usleep(1000);
1997 	}
1998 
1999 	fs_channel_destroy(NULL, &ctx->ch);
2000 	free(ctx);
2001 }
2002 
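/*
 * Adjust the size of the global cache pool, in megabytes.  This only succeeds
 * while the pool is unallocated, i.e. before the first filesystem is loaded or
 * after the last one has been unloaded.  A minimal sketch (the 512 MB value is
 * illustrative only):
 *
 *	if (spdk_fs_set_cache_size(512) != 0) {
 *		SPDK_ERRLOG("cache pool is already allocated\n");
 *	}
 */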
2003 int
2004 spdk_fs_set_cache_size(uint64_t size_in_mb)
2005 {
2006 	/* Setting g_fs_cache_size is only permitted while the cache pool
2007 	 * has not been initialized yet or has already been freed.
2008 	 */
2009 	if (g_cache_pool != NULL) {
2010 		return -EPERM;
2011 	}
2012 
2013 	g_fs_cache_size = size_in_mb * 1024 * 1024;
2014 
2015 	return 0;
2016 }
2017 
2018 uint64_t
2019 spdk_fs_get_cache_size(void)
2020 {
2021 	return g_fs_cache_size / (1024 * 1024);
2022 }
2023 
2024 static void __file_flush(void *ctx);
2025 
2026 /* Try to free some cache buffers from this file.
2027  */
2028 static int
2029 reclaim_cache_buffers(struct spdk_file *file)
2030 {
2031 	int rc;
2032 
2033 	BLOBFS_TRACE(file, "free=%s\n", file->name);
2034 
2035 	/* This function may be called from any thread, and the file lock may
2036 	 * currently be held by another thread, so only try to acquire it here
2037 	 * and give up if it is contended.
2038 	 */
2039 	rc = pthread_spin_trylock(&file->lock);
2040 	if (rc != 0) {
2041 		return -1;
2042 	}
2043 
2044 	if (file->tree->present_mask == 0) {
2045 		pthread_spin_unlock(&file->lock);
2046 		return -1;
2047 	}
2048 	tree_free_buffers(file->tree);
2049 
2050 	TAILQ_REMOVE(&g_caches, file, cache_tailq);
2051 	/* If not all buffers were freed, put the file at the end of the queue */
2052 	if (file->tree->present_mask != 0) {
2053 		TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq);
2054 	} else {
2055 		file->last = NULL;
2056 	}
2057 	pthread_spin_unlock(&file->lock);
2058 
2059 	return 0;
2060 }
2061 
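/*
 * Poller, run on the cache pool thread, that reclaims buffers when the pool is
 * running low.  Files are scanned in three passes of decreasing preference:
 * first low-priority files that are not open for writing, then any file not
 * open for writing, and finally any file at all.
 */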
2062 static int
2063 _blobfs_cache_pool_reclaim(void *arg)
2064 {
2065 	struct spdk_file *file, *tmp;
2066 	int rc;
2067 
2068 	if (!blobfs_cache_pool_need_reclaim()) {
2069 		return SPDK_POLLER_IDLE;
2070 	}
2071 
2072 	TAILQ_FOREACH_SAFE(file, &g_caches, cache_tailq, tmp) {
2073 		if (!file->open_for_writing &&
2074 		    file->priority == SPDK_FILE_PRIORITY_LOW) {
2075 			rc = reclaim_cache_buffers(file);
2076 			if (rc < 0) {
2077 				continue;
2078 			}
2079 			if (!blobfs_cache_pool_need_reclaim()) {
2080 				return SPDK_POLLER_BUSY;
2081 			}
2082 			break;
2083 		}
2084 	}
2085 
2086 	TAILQ_FOREACH_SAFE(file, &g_caches, cache_tailq, tmp) {
2087 		if (!file->open_for_writing) {
2088 			rc = reclaim_cache_buffers(file);
2089 			if (rc < 0) {
2090 				continue;
2091 			}
2092 			if (!blobfs_cache_pool_need_reclaim()) {
2093 				return SPDK_POLLER_BUSY;
2094 			}
2095 			break;
2096 		}
2097 	}
2098 
2099 	TAILQ_FOREACH_SAFE(file, &g_caches, cache_tailq, tmp) {
2100 		rc = reclaim_cache_buffers(file);
2101 		if (rc < 0) {
2102 			continue;
2103 		}
2104 		break;
2105 	}
2106 
2107 	return SPDK_POLLER_BUSY;
2108 }
2109 
2110 static void
2111 _add_file_to_cache_pool(void *ctx)
2112 {
2113 	struct spdk_file *file = ctx;
2114 
2115 	TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq);
2116 }
2117 
2118 static void
2119 _remove_file_from_cache_pool(void *ctx)
2120 {
2121 	struct spdk_file *file = ctx;
2122 
2123 	TAILQ_REMOVE(&g_caches, file, cache_tailq);
2124 }
2125 
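/*
 * Allocate a cache buffer for the given file offset.  The mempool get is
 * retried, sleeping one poll period between attempts so the cache pool poller
 * can reclaim buffers; after roughly 100 attempts the allocation fails.  If
 * this is the file's first cached buffer, the file is also registered with the
 * cache pool thread so it becomes eligible for reclaim.
 */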
2126 static struct cache_buffer *
2127 cache_insert_buffer(struct spdk_file *file, uint64_t offset)
2128 {
2129 	struct cache_buffer *buf;
2130 	int count = 0;
2131 	bool need_update = false;
2132 
2133 	buf = calloc(1, sizeof(*buf));
2134 	if (buf == NULL) {
2135 		SPDK_DEBUGLOG(blobfs, "calloc failed\n");
2136 		return NULL;
2137 	}
2138 
2139 	do {
2140 		buf->buf = spdk_mempool_get(g_cache_pool);
2141 		if (buf->buf) {
2142 			break;
2143 		}
2144 		if (count++ == 100) {
2145 			SPDK_ERRLOG("Could not allocate cache buffer for file=%p on offset=%jx\n",
2146 				    file, offset);
2147 			free(buf);
2148 			return NULL;
2149 		}
2150 		usleep(BLOBFS_CACHE_POOL_POLL_PERIOD_IN_US);
2151 	} while (true);
2152 
2153 	buf->buf_size = CACHE_BUFFER_SIZE;
2154 	buf->offset = offset;
2155 
2156 	if (file->tree->present_mask == 0) {
2157 		need_update = true;
2158 	}
2159 	file->tree = tree_insert_buffer(file->tree, buf);
2160 
2161 	if (need_update) {
2162 		spdk_thread_send_msg(g_cache_pool_thread, _add_file_to_cache_pool, file);
2163 	}
2164 
2165 	return buf;
2166 }
2167 
2168 static struct cache_buffer *
2169 cache_append_buffer(struct spdk_file *file)
2170 {
2171 	struct cache_buffer *last;
2172 
2173 	assert(file->last == NULL || file->last->bytes_filled == file->last->buf_size);
2174 	assert((file->append_pos % CACHE_BUFFER_SIZE) == 0);
2175 
2176 	last = cache_insert_buffer(file, file->append_pos);
2177 	if (last == NULL) {
2178 		SPDK_DEBUGLOG(blobfs, "cache_insert_buffer failed\n");
2179 		return NULL;
2180 	}
2181 
2182 	file->last = last;
2183 
2184 	return last;
2185 }
2186 
2187 static void __check_sync_reqs(struct spdk_file *file);
2188 
2189 static void
2190 __file_cache_finish_sync(void *ctx, int bserrno)
2191 {
2192 	struct spdk_file *file;
2193 	struct spdk_fs_request *sync_req = ctx;
2194 	struct spdk_fs_cb_args *sync_args;
2195 
2196 	sync_args = &sync_req->args;
2197 	file = sync_args->file;
2198 	pthread_spin_lock(&file->lock);
2199 	file->length_xattr = sync_args->op.sync.length;
2200 	assert(sync_args->op.sync.offset <= file->length_flushed);
2201 	spdk_trace_record(TRACE_BLOBFS_XATTR_END, 0, sync_args->op.sync.offset,
2202 			  0, file->name);
2203 	BLOBFS_TRACE(file, "sync done offset=%jx\n", sync_args->op.sync.offset);
2204 	TAILQ_REMOVE(&file->sync_requests, sync_req, args.op.sync.tailq);
2205 	pthread_spin_unlock(&file->lock);
2206 
2207 	sync_args->fn.file_op(sync_args->arg, bserrno);
2208 
2209 	free_fs_request(sync_req);
2210 	__check_sync_reqs(file);
2211 }
2212 
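/*
 * Walk the file's pending sync requests and, once flushed data covers a
 * request's offset, persist the flushed length in the "length" xattr and sync
 * the blob metadata.  Completion continues in __file_cache_finish_sync(),
 * which re-checks for further satisfiable requests.
 */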
2213 static void
2214 __check_sync_reqs(struct spdk_file *file)
2215 {
2216 	struct spdk_fs_request *sync_req;
2217 
2218 	pthread_spin_lock(&file->lock);
2219 
2220 	TAILQ_FOREACH(sync_req, &file->sync_requests, args.op.sync.tailq) {
2221 		if (sync_req->args.op.sync.offset <= file->length_flushed) {
2222 			break;
2223 		}
2224 	}
2225 
2226 	if (sync_req != NULL && !sync_req->args.op.sync.xattr_in_progress) {
2227 		BLOBFS_TRACE(file, "set xattr length 0x%jx\n", file->length_flushed);
2228 		sync_req->args.op.sync.xattr_in_progress = true;
2229 		sync_req->args.op.sync.length = file->length_flushed;
2230 		spdk_blob_set_xattr(file->blob, "length", &file->length_flushed,
2231 				    sizeof(file->length_flushed));
2232 
2233 		pthread_spin_unlock(&file->lock);
2234 		spdk_trace_record(TRACE_BLOBFS_XATTR_START, 0, file->length_flushed,
2235 				  0, file->name);
2236 		spdk_blob_sync_md(file->blob, __file_cache_finish_sync, sync_req);
2237 	} else {
2238 		pthread_spin_unlock(&file->lock);
2239 	}
2240 }
2241 
2242 static void
2243 __file_flush_done(void *ctx, int bserrno)
2244 {
2245 	struct spdk_fs_request *req = ctx;
2246 	struct spdk_fs_cb_args *args = &req->args;
2247 	struct spdk_file *file = args->file;
2248 	struct cache_buffer *next = args->op.flush.cache_buffer;
2249 
2250 	BLOBFS_TRACE(file, "length=%jx\n", args->op.flush.length);
2251 
2252 	pthread_spin_lock(&file->lock);
2253 	next->in_progress = false;
2254 	next->bytes_flushed += args->op.flush.length;
2255 	file->length_flushed += args->op.flush.length;
2256 	if (file->length_flushed > file->length) {
2257 		file->length = file->length_flushed;
2258 	}
2259 	if (next->bytes_flushed == next->buf_size) {
2260 		BLOBFS_TRACE(file, "write buffer fully flushed 0x%jx\n", file->length_flushed);
2261 		next = tree_find_buffer(file->tree, file->length_flushed);
2262 	}
2263 
2264 	/*
2265 	 * Assert that there is no cached data that extends past the end of the underlying
2266 	 *  blob.
2267 	 */
2268 	assert(next == NULL || next->offset < __file_get_blob_size(file) ||
2269 	       next->bytes_filled == 0);
2270 
2271 	pthread_spin_unlock(&file->lock);
2272 
2273 	__check_sync_reqs(file);
2274 
2275 	__file_flush(req);
2276 }
2277 
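/*
 * Flush the next cache buffer at file->length_flushed to the blob.  A
 * partially filled buffer is only written out while a sync request is
 * outstanding; otherwise flushing is deferred until the buffer fills up.
 * __file_flush_done() calls back into this function, so flushing continues
 * until no more flushable data remains.
 */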
2278 static void
2279 __file_flush(void *ctx)
2280 {
2281 	struct spdk_fs_request *req = ctx;
2282 	struct spdk_fs_cb_args *args = &req->args;
2283 	struct spdk_file *file = args->file;
2284 	struct cache_buffer *next;
2285 	uint64_t offset, length, start_lba, num_lba;
2286 	uint32_t lba_size;
2287 
2288 	pthread_spin_lock(&file->lock);
2289 	next = tree_find_buffer(file->tree, file->length_flushed);
2290 	if (next == NULL || next->in_progress ||
2291 	    ((next->bytes_filled < next->buf_size) && TAILQ_EMPTY(&file->sync_requests))) {
2292 		/*
2293 		 * There is either no data to flush, a flush I/O is already in
2294 		 *  progress, or the next buffer is partially filled but there's no
2295 		 *  outstanding request to sync it.
2296 		 * So return immediately - if a flush I/O is in progress we will flush
2297 		 *  more data after that is completed, or a partial buffer will get flushed
2298 		 *  when it is either filled or the file is synced.
2299 		 */
2300 		free_fs_request(req);
2301 		if (next == NULL) {
2302 			/*
2303 			 * For cases where a file's cache was evicted, and then the
2304 			 *  file was later appended to, we write the data directly
2305 			 *  to disk and bypass the cache.  So just update length_flushed
2306 			 *  here to reflect that all data was already written to disk.
2307 			 */
2308 			file->length_flushed = file->append_pos;
2309 		}
2310 		pthread_spin_unlock(&file->lock);
2311 		if (next == NULL) {
2312 			/*
2313 			 * There is no data to flush, but we still need to check for any
2314 			 *  outstanding sync requests to make sure metadata gets updated.
2315 			 */
2316 			__check_sync_reqs(file);
2317 		}
2318 		return;
2319 	}
2320 
2321 	offset = next->offset + next->bytes_flushed;
2322 	length = next->bytes_filled - next->bytes_flushed;
2323 	if (length == 0) {
2324 		free_fs_request(req);
2325 		pthread_spin_unlock(&file->lock);
2326 		/*
2327 		 * There is no data to flush, but we still need to check for any
2328 		 *  outstanding sync requests to make sure metadata gets updated.
2329 		 */
2330 		__check_sync_reqs(file);
2331 		return;
2332 	}
2333 	args->op.flush.length = length;
2334 	args->op.flush.cache_buffer = next;
2335 
2336 	__get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba);
2337 
2338 	next->in_progress = true;
2339 	BLOBFS_TRACE(file, "offset=0x%jx length=0x%jx page start=0x%jx num=0x%jx\n",
2340 		     offset, length, start_lba, num_lba);
2341 	pthread_spin_unlock(&file->lock);
2342 	spdk_blob_io_write(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel,
2343 			   next->buf + (start_lba * lba_size) - next->offset,
2344 			   start_lba, num_lba, __file_flush_done, req);
2345 }
2346 
2347 static void
2348 __file_extend_done(void *arg, int bserrno)
2349 {
2350 	struct spdk_fs_cb_args *args = arg;
2351 
2352 	__wake_caller(args, bserrno);
2353 }
2354 
2355 static void
2356 __file_extend_resize_cb(void *_args, int bserrno)
2357 {
2358 	struct spdk_fs_cb_args *args = _args;
2359 	struct spdk_file *file = args->file;
2360 
2361 	if (bserrno) {
2362 		__wake_caller(args, bserrno);
2363 		return;
2364 	}
2365 
2366 	spdk_blob_sync_md(file->blob, __file_extend_done, args);
2367 }
2368 
2369 static void
2370 __file_extend_blob(void *_args)
2371 {
2372 	struct spdk_fs_cb_args *args = _args;
2373 	struct spdk_file *file = args->file;
2374 
2375 	spdk_blob_resize(file->blob, args->op.resize.num_clusters, __file_extend_resize_cb, args);
2376 }
2377 
2378 static void
2379 __rw_from_file_done(void *ctx, int bserrno)
2380 {
2381 	struct spdk_fs_request *req = ctx;
2382 
2383 	__wake_caller(&req->args, bserrno);
2384 	free_fs_request(req);
2385 }
2386 
2387 static void
2388 __rw_from_file(void *ctx)
2389 {
2390 	struct spdk_fs_request *req = ctx;
2391 	struct spdk_fs_cb_args *args = &req->args;
2392 	struct spdk_file *file = args->file;
2393 
2394 	if (args->op.rw.is_read) {
2395 		spdk_file_read_async(file, file->fs->sync_target.sync_io_channel, args->iovs[0].iov_base,
2396 				     args->op.rw.offset, (uint64_t)args->iovs[0].iov_len,
2397 				     __rw_from_file_done, req);
2398 	} else {
2399 		spdk_file_write_async(file, file->fs->sync_target.sync_io_channel, args->iovs[0].iov_base,
2400 				      args->op.rw.offset, (uint64_t)args->iovs[0].iov_len,
2401 				      __rw_from_file_done, req);
2402 	}
2403 }
2404 
2405 struct rw_from_file_arg {
2406 	struct spdk_fs_channel *channel;
2407 	int rwerrno;
2408 };
2409 
2410 static int
2411 __send_rw_from_file(struct spdk_file *file, void *payload,
2412 		    uint64_t offset, uint64_t length, bool is_read,
2413 		    struct rw_from_file_arg *arg)
2414 {
2415 	struct spdk_fs_request *req;
2416 	struct spdk_fs_cb_args *args;
2417 
2418 	req = alloc_fs_request_with_iov(arg->channel, 1);
2419 	if (req == NULL) {
2420 		sem_post(&arg->channel->sem);
2421 		return -ENOMEM;
2422 	}
2423 
2424 	args = &req->args;
2425 	args->file = file;
2426 	args->sem = &arg->channel->sem;
2427 	args->iovs[0].iov_base = payload;
2428 	args->iovs[0].iov_len = (size_t)length;
2429 	args->op.rw.offset = offset;
2430 	args->op.rw.is_read = is_read;
2431 	args->rwerrno = &arg->rwerrno;
2432 	file->fs->send_request(__rw_from_file, req);
2433 	return 0;
2434 }
2435 
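/*
 * Blocking, append-only write for the synchronous API: offset must equal the
 * file's current append position.  Data is normally staged in cache buffers
 * and flushed to the blob in the background; when no cache buffer is
 * available, the write falls back to the asynchronous path and blocks on the
 * channel semaphore until it completes.  Returns 0 on success or a negative
 * errno value.
 */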
2436 int
2437 spdk_file_write(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx,
2438 		void *payload, uint64_t offset, uint64_t length)
2439 {
2440 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
2441 	struct spdk_fs_request *flush_req;
2442 	uint64_t rem_length, copy, blob_size, cluster_sz;
2443 	uint32_t cache_buffers_filled = 0;
2444 	uint8_t *cur_payload;
2445 	struct cache_buffer *last;
2446 
2447 	BLOBFS_TRACE_RW(file, "offset=%jx length=%jx\n", offset, length);
2448 
2449 	if (length == 0) {
2450 		return 0;
2451 	}
2452 
2453 	if (offset != file->append_pos) {
2454 		BLOBFS_TRACE(file, " error offset=%jx append_pos=%jx\n", offset, file->append_pos);
2455 		return -EINVAL;
2456 	}
2457 
2458 	pthread_spin_lock(&file->lock);
2459 	file->open_for_writing = true;
2460 
2461 	if ((file->last == NULL) && (file->append_pos % CACHE_BUFFER_SIZE == 0)) {
2462 		cache_append_buffer(file);
2463 	}
2464 
2465 	if (file->last == NULL) {
2466 		struct rw_from_file_arg arg = {};
2467 		int rc;
2468 
2469 		arg.channel = channel;
2470 		arg.rwerrno = 0;
2471 		file->append_pos += length;
2472 		pthread_spin_unlock(&file->lock);
2473 		rc = __send_rw_from_file(file, payload, offset, length, false, &arg);
2474 		if (rc != 0) {
2475 			return rc;
2476 		}
2477 		sem_wait(&channel->sem);
2478 		return arg.rwerrno;
2479 	}
2480 
2481 	blob_size = __file_get_blob_size(file);
2482 
2483 	if ((offset + length) > blob_size) {
2484 		struct spdk_fs_cb_args extend_args = {};
2485 
2486 		cluster_sz = file->fs->bs_opts.cluster_sz;
2487 		extend_args.sem = &channel->sem;
2488 		extend_args.op.resize.num_clusters = __bytes_to_clusters((offset + length), cluster_sz);
2489 		extend_args.file = file;
2490 		BLOBFS_TRACE(file, "start resize to %ju clusters\n", extend_args.op.resize.num_clusters);
2491 		pthread_spin_unlock(&file->lock);
2492 		file->fs->send_request(__file_extend_blob, &extend_args);
2493 		sem_wait(&channel->sem);
2494 		if (extend_args.rc) {
2495 			return extend_args.rc;
2496 		}
2497 	}
2498 
2499 	flush_req = alloc_fs_request(channel);
2500 	if (flush_req == NULL) {
2501 		pthread_spin_unlock(&file->lock);
2502 		return -ENOMEM;
2503 	}
2504 
2505 	last = file->last;
2506 	rem_length = length;
2507 	cur_payload = payload;
2508 	while (rem_length > 0) {
2509 		copy = last->buf_size - last->bytes_filled;
2510 		if (copy > rem_length) {
2511 			copy = rem_length;
2512 		}
2513 		BLOBFS_TRACE_RW(file, "  fill offset=%jx length=%jx\n", file->append_pos, copy);
2514 		memcpy(&last->buf[last->bytes_filled], cur_payload, copy);
2515 		file->append_pos += copy;
2516 		if (file->length < file->append_pos) {
2517 			file->length = file->append_pos;
2518 		}
2519 		cur_payload += copy;
2520 		last->bytes_filled += copy;
2521 		rem_length -= copy;
2522 		if (last->bytes_filled == last->buf_size) {
2523 			cache_buffers_filled++;
2524 			last = cache_append_buffer(file);
2525 			if (last == NULL) {
2526 				BLOBFS_TRACE(file, "nomem\n");
2527 				free_fs_request(flush_req);
2528 				pthread_spin_unlock(&file->lock);
2529 				return -ENOMEM;
2530 			}
2531 		}
2532 	}
2533 
2534 	pthread_spin_unlock(&file->lock);
2535 
2536 	if (cache_buffers_filled == 0) {
2537 		free_fs_request(flush_req);
2538 		return 0;
2539 	}
2540 
2541 	flush_req->args.file = file;
2542 	file->fs->send_request(__file_flush, flush_req);
2543 	return 0;
2544 }
2545 
2546 static void
2547 __readahead_done(void *ctx, int bserrno)
2548 {
2549 	struct spdk_fs_request *req = ctx;
2550 	struct spdk_fs_cb_args *args = &req->args;
2551 	struct cache_buffer *cache_buffer = args->op.readahead.cache_buffer;
2552 	struct spdk_file *file = args->file;
2553 
2554 	BLOBFS_TRACE(file, "offset=%jx\n", cache_buffer->offset);
2555 
2556 	pthread_spin_lock(&file->lock);
2557 	cache_buffer->bytes_filled = args->op.readahead.length;
2558 	cache_buffer->bytes_flushed = args->op.readahead.length;
2559 	cache_buffer->in_progress = false;
2560 	pthread_spin_unlock(&file->lock);
2561 
2562 	free_fs_request(req);
2563 }
2564 
2565 static void
2566 __readahead(void *ctx)
2567 {
2568 	struct spdk_fs_request *req = ctx;
2569 	struct spdk_fs_cb_args *args = &req->args;
2570 	struct spdk_file *file = args->file;
2571 	uint64_t offset, length, start_lba, num_lba;
2572 	uint32_t lba_size;
2573 
2574 	offset = args->op.readahead.offset;
2575 	length = args->op.readahead.length;
2576 	assert(length > 0);
2577 
2578 	__get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba);
2579 
2580 	BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n",
2581 		     offset, length, start_lba, num_lba);
2582 	spdk_blob_io_read(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel,
2583 			  args->op.readahead.cache_buffer->buf,
2584 			  start_lba, num_lba, __readahead_done, req);
2585 }
2586 
2587 static uint64_t
2588 __next_cache_buffer_offset(uint64_t offset)
2589 {
2590 	return (offset + CACHE_BUFFER_SIZE) & ~(CACHE_TREE_LEVEL_MASK(0));
2591 }
2592 
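/*
 * If the next cache-buffer-aligned offset lies within the file but is not
 * cached yet, start a background read into a freshly inserted cache buffer so
 * a sequential reader finds the data already in cache.
 */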
2593 static void
2594 check_readahead(struct spdk_file *file, uint64_t offset,
2595 		struct spdk_fs_channel *channel)
2596 {
2597 	struct spdk_fs_request *req;
2598 	struct spdk_fs_cb_args *args;
2599 
2600 	offset = __next_cache_buffer_offset(offset);
2601 	if (tree_find_buffer(file->tree, offset) != NULL || file->length <= offset) {
2602 		return;
2603 	}
2604 
2605 	req = alloc_fs_request(channel);
2606 	if (req == NULL) {
2607 		return;
2608 	}
2609 	args = &req->args;
2610 
2611 	BLOBFS_TRACE(file, "offset=%jx\n", offset);
2612 
2613 	args->file = file;
2614 	args->op.readahead.offset = offset;
2615 	args->op.readahead.cache_buffer = cache_insert_buffer(file, offset);
2616 	if (!args->op.readahead.cache_buffer) {
2617 		BLOBFS_TRACE(file, "Cannot allocate buf for offset=%jx\n", offset);
2618 		free_fs_request(req);
2619 		return;
2620 	}
2621 
2622 	args->op.readahead.cache_buffer->in_progress = true;
2623 	if (file->length < (offset + CACHE_BUFFER_SIZE)) {
2624 		args->op.readahead.length = file->length & (CACHE_BUFFER_SIZE - 1);
2625 	} else {
2626 		args->op.readahead.length = CACHE_BUFFER_SIZE;
2627 	}
2628 	file->fs->send_request(__readahead, req);
2629 }
2630 
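/*
 * Blocking read for the synchronous API.  The request is split on cache buffer
 * boundaries; pieces already present in the cache are copied directly, while
 * the rest are dispatched through the asynchronous path and waited for on the
 * channel semaphore.  Sufficiently long sequential reads also trigger
 * readahead.  Returns the number of bytes read, or a negative errno value.
 */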
2631 int64_t
2632 spdk_file_read(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx,
2633 	       void *payload, uint64_t offset, uint64_t length)
2634 {
2635 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
2636 	uint64_t final_offset, final_length;
2637 	uint32_t sub_reads = 0;
2638 	struct cache_buffer *buf;
2639 	uint64_t read_len;
2640 	struct rw_from_file_arg arg = {};
2641 
2642 	pthread_spin_lock(&file->lock);
2643 
2644 	BLOBFS_TRACE_RW(file, "offset=%ju length=%ju\n", offset, length);
2645 
2646 	file->open_for_writing = false;
2647 
2648 	if (length == 0 || offset >= file->append_pos) {
2649 		pthread_spin_unlock(&file->lock);
2650 		return 0;
2651 	}
2652 
2653 	if (offset + length > file->append_pos) {
2654 		length = file->append_pos - offset;
2655 	}
2656 
2657 	if (offset != file->next_seq_offset) {
2658 		file->seq_byte_count = 0;
2659 	}
2660 	file->seq_byte_count += length;
2661 	file->next_seq_offset = offset + length;
2662 	if (file->seq_byte_count >= CACHE_READAHEAD_THRESHOLD) {
2663 		check_readahead(file, offset, channel);
2664 		check_readahead(file, offset + CACHE_BUFFER_SIZE, channel);
2665 	}
2666 
2667 	arg.channel = channel;
2668 	arg.rwerrno = 0;
2669 	final_length = 0;
2670 	final_offset = offset + length;
2671 	while (offset < final_offset) {
2672 		int ret = 0;
2673 		length = NEXT_CACHE_BUFFER_OFFSET(offset) - offset;
2674 		if (length > (final_offset - offset)) {
2675 			length = final_offset - offset;
2676 		}
2677 
2678 		buf = tree_find_filled_buffer(file->tree, offset);
2679 		if (buf == NULL) {
2680 			pthread_spin_unlock(&file->lock);
2681 			ret = __send_rw_from_file(file, payload, offset, length, true, &arg);
2682 			pthread_spin_lock(&file->lock);
2683 			if (ret == 0) {
2684 				sub_reads++;
2685 			}
2686 		} else {
2687 			read_len = length;
2688 			if ((offset + length) > (buf->offset + buf->bytes_filled)) {
2689 				read_len = buf->offset + buf->bytes_filled - offset;
2690 			}
2691 			BLOBFS_TRACE(file, "read %p offset=%ju length=%ju\n", payload, offset, read_len);
2692 			memcpy(payload, &buf->buf[offset - buf->offset], read_len);
2693 			if ((offset + read_len) % CACHE_BUFFER_SIZE == 0) {
2694 				tree_remove_buffer(file->tree, buf);
2695 				if (file->tree->present_mask == 0) {
2696 					spdk_thread_send_msg(g_cache_pool_thread, _remove_file_from_cache_pool, file);
2697 				}
2698 			}
2699 		}
2700 
2701 		if (ret == 0) {
2702 			final_length += length;
2703 		} else {
2704 			arg.rwerrno = ret;
2705 			break;
2706 		}
2707 		payload += length;
2708 		offset += length;
2709 	}
2710 	pthread_spin_unlock(&file->lock);
2711 	while (sub_reads > 0) {
2712 		sem_wait(&channel->sem);
2713 		sub_reads--;
2714 	}
2715 	if (arg.rwerrno == 0) {
2716 		return final_length;
2717 	} else {
2718 		return arg.rwerrno;
2719 	}
2720 }
2721 
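/*
 * Queue a sync request recording the current append position, then kick
 * __file_flush().  The request completes, via __check_sync_reqs(), once all
 * data up to that position has been flushed and the "length" xattr has been
 * written to the blob metadata.
 */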
2722 static void
2723 _file_sync(struct spdk_file *file, struct spdk_fs_channel *channel,
2724 	   spdk_file_op_complete cb_fn, void *cb_arg)
2725 {
2726 	struct spdk_fs_request *sync_req;
2727 	struct spdk_fs_request *flush_req;
2728 	struct spdk_fs_cb_args *sync_args;
2729 	struct spdk_fs_cb_args *flush_args;
2730 
2731 	BLOBFS_TRACE(file, "offset=%jx\n", file->append_pos);
2732 
2733 	pthread_spin_lock(&file->lock);
2734 	if (file->append_pos <= file->length_xattr) {
2735 		BLOBFS_TRACE(file, "done - file already synced\n");
2736 		pthread_spin_unlock(&file->lock);
2737 		cb_fn(cb_arg, 0);
2738 		return;
2739 	}
2740 
2741 	sync_req = alloc_fs_request(channel);
2742 	if (!sync_req) {
2743 		SPDK_ERRLOG("Cannot allocate sync req for file=%s\n", file->name);
2744 		pthread_spin_unlock(&file->lock);
2745 		cb_fn(cb_arg, -ENOMEM);
2746 		return;
2747 	}
2748 	sync_args = &sync_req->args;
2749 
2750 	flush_req = alloc_fs_request(channel);
2751 	if (!flush_req) {
2752 		SPDK_ERRLOG("Cannot allocate flush req for file=%s\n", file->name);
2753 		free_fs_request(sync_req);
2754 		pthread_spin_unlock(&file->lock);
2755 		cb_fn(cb_arg, -ENOMEM);
2756 		return;
2757 	}
2758 	flush_args = &flush_req->args;
2759 
2760 	sync_args->file = file;
2761 	sync_args->fn.file_op = cb_fn;
2762 	sync_args->arg = cb_arg;
2763 	sync_args->op.sync.offset = file->append_pos;
2764 	sync_args->op.sync.xattr_in_progress = false;
2765 	TAILQ_INSERT_TAIL(&file->sync_requests, sync_req, args.op.sync.tailq);
2766 	pthread_spin_unlock(&file->lock);
2767 
2768 	flush_args->file = file;
2769 	channel->send_request(__file_flush, flush_req);
2770 }
2771 
2772 int
2773 spdk_file_sync(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx)
2774 {
2775 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
2776 	struct spdk_fs_cb_args args = {};
2777 
2778 	args.sem = &channel->sem;
2779 	_file_sync(file, channel, __wake_caller, &args);
2780 	sem_wait(&channel->sem);
2781 
2782 	return args.rc;
2783 }
2784 
2785 void
2786 spdk_file_sync_async(struct spdk_file *file, struct spdk_io_channel *_channel,
2787 		     spdk_file_op_complete cb_fn, void *cb_arg)
2788 {
2789 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
2790 
2791 	_file_sync(file, channel, cb_fn, cb_arg);
2792 }
2793 
2794 void
2795 spdk_file_set_priority(struct spdk_file *file, uint32_t priority)
2796 {
2797 	BLOBFS_TRACE(file, "priority=%u\n", priority);
2798 	file->priority = priority;
2799 
2800 }
2801 
2802 /*
2803  * Close routines
2804  */
2805 
2806 static void
2807 __file_close_async_done(void *ctx, int bserrno)
2808 {
2809 	struct spdk_fs_request *req = ctx;
2810 	struct spdk_fs_cb_args *args = &req->args;
2811 	struct spdk_file *file = args->file;
2812 
2813 	spdk_trace_record(TRACE_BLOBFS_CLOSE, 0, 0, 0, file->name);
2814 
2815 	if (file->is_deleted) {
2816 		spdk_fs_delete_file_async(file->fs, file->name, blob_delete_cb, ctx);
2817 		return;
2818 	}
2819 
2820 	args->fn.file_op(args->arg, bserrno);
2821 	free_fs_request(req);
2822 }
2823 
2824 static void
2825 __file_close_async(struct spdk_file *file, struct spdk_fs_request *req)
2826 {
2827 	struct spdk_blob *blob;
2828 
2829 	pthread_spin_lock(&file->lock);
2830 	if (file->ref_count == 0) {
2831 		pthread_spin_unlock(&file->lock);
2832 		__file_close_async_done(req, -EBADF);
2833 		return;
2834 	}
2835 
2836 	file->ref_count--;
2837 	if (file->ref_count > 0) {
2838 		pthread_spin_unlock(&file->lock);
2839 		req->args.fn.file_op(req->args.arg, 0);
2840 		free_fs_request(req);
2841 		return;
2842 	}
2843 
2844 	pthread_spin_unlock(&file->lock);
2845 
2846 	blob = file->blob;
2847 	file->blob = NULL;
2848 	spdk_blob_close(blob, __file_close_async_done, req);
2849 }
2850 
2851 static void
2852 __file_close_async__sync_done(void *arg, int fserrno)
2853 {
2854 	struct spdk_fs_request *req = arg;
2855 	struct spdk_fs_cb_args *args = &req->args;
2856 
2857 	__file_close_async(args->file, req);
2858 }
2859 
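/*
 * Asynchronous close: the file is synced first, then its reference count is
 * dropped.  When the last reference goes away the underlying blob is closed,
 * and a file marked is_deleted is also deleted from the filesystem.
 */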
2860 void
2861 spdk_file_close_async(struct spdk_file *file, spdk_file_op_complete cb_fn, void *cb_arg)
2862 {
2863 	struct spdk_fs_request *req;
2864 	struct spdk_fs_cb_args *args;
2865 
2866 	req = alloc_fs_request(file->fs->md_target.md_fs_channel);
2867 	if (req == NULL) {
2868 		SPDK_ERRLOG("Cannot allocate close async req for file=%s\n", file->name);
2869 		cb_fn(cb_arg, -ENOMEM);
2870 		return;
2871 	}
2872 
2873 	args = &req->args;
2874 	args->file = file;
2875 	args->fn.file_op = cb_fn;
2876 	args->arg = cb_arg;
2877 
2878 	spdk_file_sync_async(file, file->fs->md_target.md_io_channel, __file_close_async__sync_done, req);
2879 }
2880 
2881 static void
2882 __file_close(void *arg)
2883 {
2884 	struct spdk_fs_request *req = arg;
2885 	struct spdk_fs_cb_args *args = &req->args;
2886 	struct spdk_file *file = args->file;
2887 
2888 	__file_close_async(file, req);
2889 }
2890 
2891 int
2892 spdk_file_close(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx)
2893 {
2894 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
2895 	struct spdk_fs_request *req;
2896 	struct spdk_fs_cb_args *args;
2897 
2898 	req = alloc_fs_request(channel);
2899 	if (req == NULL) {
2900 		SPDK_ERRLOG("Cannot allocate close req for file=%s\n", file->name);
2901 		return -ENOMEM;
2902 	}
2903 
2904 	args = &req->args;
2905 
2906 	spdk_file_sync(file, ctx);
2907 	BLOBFS_TRACE(file, "name=%s\n", file->name);
2908 	args->file = file;
2909 	args->sem = &channel->sem;
2910 	args->fn.file_op = __wake_caller;
2911 	args->arg = args;
2912 	channel->send_request(__file_close, req);
2913 	sem_wait(&channel->sem);
2914 
2915 	return args->rc;
2916 }
2917 
2918 int
2919 spdk_file_get_id(struct spdk_file *file, void *id, size_t size)
2920 {
2921 	if (size < sizeof(spdk_blob_id)) {
2922 		return -EINVAL;
2923 	}
2924 
2925 	memcpy(id, &file->blobid, sizeof(spdk_blob_id));
2926 
2927 	return sizeof(spdk_blob_id);
2928 }
2929 
2930 static void
2931 _file_free(void *ctx)
2932 {
2933 	struct spdk_file *file = ctx;
2934 
2935 	TAILQ_REMOVE(&g_caches, file, cache_tailq);
2936 
2937 	free(file->name);
2938 	free(file->tree);
2939 	free(file);
2940 }
2941 
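/*
 * Release a file structure.  If the file still holds cache buffers they are
 * freed here, and the final free is handed off to the cache pool thread, which
 * owns the g_caches list the file must be removed from; otherwise the file is
 * freed directly.
 */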
2942 static void
2943 file_free(struct spdk_file *file)
2944 {
2945 	BLOBFS_TRACE(file, "free=%s\n", file->name);
2946 	pthread_spin_lock(&file->lock);
2947 	if (file->tree->present_mask == 0) {
2948 		pthread_spin_unlock(&file->lock);
2949 		free(file->name);
2950 		free(file->tree);
2951 		free(file);
2952 		return;
2953 	}
2954 
2955 	tree_free_buffers(file->tree);
2956 	assert(file->tree->present_mask == 0);
2957 	spdk_thread_send_msg(g_cache_pool_thread, _file_free, file);
2958 	pthread_spin_unlock(&file->lock);
2959 }
2960 
2961 SPDK_LOG_REGISTER_COMPONENT(blobfs)
2962 SPDK_LOG_REGISTER_COMPONENT(blobfs_rw)
2963