xref: /spdk/lib/blobfs/blobfs.c (revision 8a76c2484a2eae4014a1c22e985b20b2cef801df)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 
36 #include "spdk/blobfs.h"
37 #include "spdk/conf.h"
38 #include "tree.h"
39 
40 #include "spdk/queue.h"
41 #include "spdk/thread.h"
42 #include "spdk/assert.h"
43 #include "spdk/env.h"
44 #include "spdk/util.h"
45 #include "spdk_internal/log.h"
46 #include "spdk/trace.h"
47 
48 #define BLOBFS_TRACE(file, str, args...) \
49 	SPDK_DEBUGLOG(blobfs, "file=%s " str, file->name, ##args)
50 
51 #define BLOBFS_TRACE_RW(file, str, args...) \
52 	SPDK_DEBUGLOG(blobfs_rw, "file=%s " str, file->name, ##args)
53 
54 #define BLOBFS_DEFAULT_CACHE_SIZE (4ULL * 1024 * 1024 * 1024)
55 #define SPDK_BLOBFS_DEFAULT_OPTS_CLUSTER_SZ (1024 * 1024)
56 
57 #define SPDK_BLOBFS_SIGNATURE	"BLOBFS"
58 
/* Total bytes of the global cache buffer pool (tunable via spdk_fs_set_cache_size). */
static uint64_t g_fs_cache_size = BLOBFS_DEFAULT_CACHE_SIZE;
/* Mempool backing all cache buffers; created on the cache-pool thread. */
static struct spdk_mempool *g_cache_pool;
/* Files that currently hold cache buffers, for the reclaim poller to scan. */
static TAILQ_HEAD(, spdk_file) g_caches = TAILQ_HEAD_INITIALIZER(g_caches);
static struct spdk_poller *g_cache_pool_mgmt_poller;
/* Dedicated thread that owns the cache pool and its reclaim poller. */
static struct spdk_thread *g_cache_pool_thread;
#define BLOBFS_CACHE_POOL_POLL_PERIOD_IN_US 1000ULL
/* Number of loaded/initialized filesystems sharing the global cache. */
static int g_fs_count = 0;
static pthread_mutex_t g_cache_init_lock = PTHREAD_MUTEX_INITIALIZER;
67 
68 #define TRACE_GROUP_BLOBFS	0x7
69 #define TRACE_BLOBFS_XATTR_START	SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x0)
70 #define TRACE_BLOBFS_XATTR_END		SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x1)
71 #define TRACE_BLOBFS_OPEN		SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x2)
72 #define TRACE_BLOBFS_CLOSE		SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x3)
73 #define TRACE_BLOBFS_DELETE_START	SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x4)
74 #define TRACE_BLOBFS_DELETE_DONE	SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x5)
75 
/* Register human-readable descriptions for the blobfs tracepoints so captured
 * traces can be decoded by the SPDK trace tooling.  Every tracepoint carries a
 * single string argument holding (a truncated copy of) the file name.
 */
SPDK_TRACE_REGISTER_FN(blobfs_trace, "blobfs", TRACE_GROUP_BLOBFS)
{
	spdk_trace_register_description("BLOBFS_XATTR_START",
					TRACE_BLOBFS_XATTR_START,
					OWNER_NONE, OBJECT_NONE, 0,
					SPDK_TRACE_ARG_TYPE_STR,
					"file:    ");
	spdk_trace_register_description("BLOBFS_XATTR_END",
					TRACE_BLOBFS_XATTR_END,
					OWNER_NONE, OBJECT_NONE, 0,
					SPDK_TRACE_ARG_TYPE_STR,
					"file:    ");
	spdk_trace_register_description("BLOBFS_OPEN",
					TRACE_BLOBFS_OPEN,
					OWNER_NONE, OBJECT_NONE, 0,
					SPDK_TRACE_ARG_TYPE_STR,
					"file:    ");
	spdk_trace_register_description("BLOBFS_CLOSE",
					TRACE_BLOBFS_CLOSE,
					OWNER_NONE, OBJECT_NONE, 0,
					SPDK_TRACE_ARG_TYPE_STR,
					"file:    ");
	spdk_trace_register_description("BLOBFS_DELETE_START",
					TRACE_BLOBFS_DELETE_START,
					OWNER_NONE, OBJECT_NONE, 0,
					SPDK_TRACE_ARG_TYPE_STR,
					"file:    ");
	spdk_trace_register_description("BLOBFS_DELETE_DONE",
					TRACE_BLOBFS_DELETE_DONE,
					OWNER_NONE, OBJECT_NONE, 0,
					SPDK_TRACE_ARG_TYPE_STR,
					"file:    ");
}
109 
110 void
111 cache_buffer_free(struct cache_buffer *cache_buffer)
112 {
113 	spdk_mempool_put(g_cache_pool, cache_buffer->buf);
114 	free(cache_buffer);
115 }
116 
117 #define CACHE_READAHEAD_THRESHOLD	(128 * 1024)
118 
/* In-memory state for one blobfs file; each file is backed by exactly one blob. */
struct spdk_file {
	struct spdk_filesystem	*fs;
	struct spdk_blob	*blob;
	char			*name;
	/* Leading bytes of name packed into a u64 for trace records
	 * (see _file_build_trace_arg_name). */
	uint64_t		trace_arg_name;
	uint64_t		length;
	bool                    is_deleted;
	bool			open_for_writing;
	/* Bytes persisted to the blob so far (presumably <= length) — see iter_cb. */
	uint64_t		length_flushed;
	/* Length recorded in the blob's "length" xattr. */
	uint64_t		length_xattr;
	uint64_t		append_pos;
	uint64_t		seq_byte_count;
	uint64_t		next_seq_offset;
	uint32_t		priority;
	/* Link on the filesystem's files list. */
	TAILQ_ENTRY(spdk_file)	tailq;
	spdk_blob_id		blobid;
	uint32_t		ref_count;
	pthread_spinlock_t	lock;
	/* NOTE(review): presumably the most recently appended cache buffer — confirm. */
	struct cache_buffer	*last;
	struct cache_tree	*tree;
	TAILQ_HEAD(open_requests_head, spdk_fs_request) open_requests;
	TAILQ_HEAD(sync_requests_head, spdk_fs_request) sync_requests;
	/* Link on the global g_caches list. */
	TAILQ_ENTRY(spdk_file)	cache_tailq;
};
143 
/* A blob found marked "is_deleted" during load; queued so it can be deleted
 * on disk before the fs_load callback completes (see iter_cb/fs_load_done). */
struct spdk_deleted_file {
	spdk_blob_id	id;
	TAILQ_ENTRY(spdk_deleted_file)	tailq;
};
148 
/* Top-level blobfs instance: one blobstore plus the I/O channels used to
 * drive it.  md_target serves metadata operations, sync_target serves
 * synchronous (thread-ctx) operations, io_target serves per-thread I/O. */
struct spdk_filesystem {
	struct spdk_blob_store	*bs;
	TAILQ_HEAD(, spdk_file)	files;
	struct spdk_bs_opts	bs_opts;
	struct spdk_bs_dev	*bdev;
	/* Caller-provided hook used to run work on the metadata thread. */
	fs_send_request_fn	send_request;

	struct {
		uint32_t		max_ops;
		struct spdk_io_channel	*sync_io_channel;
		struct spdk_fs_channel	*sync_fs_channel;
	} sync_target;

	struct {
		uint32_t		max_ops;
		struct spdk_io_channel	*md_io_channel;
		struct spdk_fs_channel	*md_fs_channel;
	} md_target;

	struct {
		uint32_t		max_ops;
	} io_target;
};
172 
/* Per-request context carried by every spdk_fs_request.  `fn`/`arg` hold the
 * completion callback; `op` holds per-operation parameters, discriminated by
 * which operation allocated the request. */
struct spdk_fs_cb_args {
	/* Completion callback; which member is valid depends on the operation. */
	union {
		spdk_fs_op_with_handle_complete		fs_op_with_handle;
		spdk_fs_op_complete			fs_op;
		spdk_file_op_with_handle_complete	file_op_with_handle;
		spdk_file_op_complete			file_op;
		spdk_file_stat_op_complete		stat_op;
	} fn;
	void *arg;
	/* Posted by __wake_caller to unblock a synchronous caller. */
	sem_t *sem;
	struct spdk_filesystem *fs;
	struct spdk_file *file;
	int rc;
	struct iovec *iovs;
	uint32_t iovcnt;
	/* Inline single iovec used when iovcnt <= 1 (avoids an allocation). */
	struct iovec iov;
	/* Operation-specific parameters. */
	union {
		struct {
			TAILQ_HEAD(, spdk_deleted_file)	deleted_files;
		} fs_load;
		struct {
			uint64_t	length;
		} truncate;
		struct {
			struct spdk_io_channel	*channel;
			void		*pin_buf;
			int		is_read;
			off_t		offset;
			size_t		length;
			uint64_t	start_lba;
			uint64_t	num_lba;
			uint32_t	blocklen;
		} rw;
		struct {
			const char	*old_name;
			const char	*new_name;
		} rename;
		struct {
			struct cache_buffer	*cache_buffer;
			uint64_t		length;
		} flush;
		struct {
			struct cache_buffer	*cache_buffer;
			uint64_t		length;
			uint64_t		offset;
		} readahead;
		struct {
			/* offset of the file when the sync request was made */
			uint64_t			offset;
			TAILQ_ENTRY(spdk_fs_request)	tailq;
			bool				xattr_in_progress;
			/* length written to the xattr for this file - this should
			 * always be the same as the offset if only one thread is
			 * writing to the file, but could differ if multiple threads
			 * are appending
			 */
			uint64_t			length;
		} sync;
		struct {
			uint32_t			num_clusters;
		} resize;
		struct {
			const char	*name;
			uint32_t	flags;
			TAILQ_ENTRY(spdk_fs_request)	tailq;
		} open;
		struct {
			const char		*name;
			struct spdk_blob	*blob;
		} create;
		struct {
			const char	*name;
		} delete;
		struct {
			const char	*name;
		} stat;
	} op;
};
251 
252 static void file_free(struct spdk_file *file);
253 static void fs_io_device_unregister(struct spdk_filesystem *fs);
254 static void fs_free_io_channels(struct spdk_filesystem *fs);
255 
/* Initialize @opts with the blobfs defaults (currently just the cluster size). */
void
spdk_fs_opts_init(struct spdk_blobfs_opts *opts)
{
	opts->cluster_sz = SPDK_BLOBFS_DEFAULT_OPTS_CLUSTER_SZ;
}
261 
262 static int _blobfs_cache_pool_reclaim(void *arg);
263 
264 static bool
265 blobfs_cache_pool_need_reclaim(void)
266 {
267 	size_t count;
268 
269 	count = spdk_mempool_count(g_cache_pool);
270 	/* We define a aggressive policy here as the requirements from db_bench are batched, so start the poller
271 	 *  when the number of available cache buffer is less than 1/5 of total buffers.
272 	 */
273 	if (count > (size_t)g_fs_cache_size / CACHE_BUFFER_SIZE / 5) {
274 		return false;
275 	}
276 
277 	return true;
278 }
279 
/* Runs on the dedicated cache-pool thread: create the shared buffer mempool
 * and start the periodic reclaim poller.  Paired with __stop_cache_pool_mgmt. */
static void
__start_cache_pool_mgmt(void *ctx)
{
	assert(g_cache_pool == NULL);

	g_cache_pool = spdk_mempool_create("spdk_fs_cache",
					   g_fs_cache_size / CACHE_BUFFER_SIZE,
					   CACHE_BUFFER_SIZE,
					   SPDK_MEMPOOL_DEFAULT_CACHE_SIZE,
					   SPDK_ENV_SOCKET_ID_ANY);
	if (!g_cache_pool) {
		SPDK_ERRLOG("Create mempool failed, you may "
			    "increase the memory and try again\n");
		/* NOTE(review): in a release build (NDEBUG) this assert is a no-op
		 * and execution continues with g_cache_pool == NULL. */
		assert(false);
	}

	assert(g_cache_pool_mgmt_poller == NULL);
	g_cache_pool_mgmt_poller = SPDK_POLLER_REGISTER(_blobfs_cache_pool_reclaim, NULL,
				   BLOBFS_CACHE_POOL_POLL_PERIOD_IN_US);
}
300 
/* Runs on the cache-pool thread when the last filesystem goes away: stop the
 * reclaim poller, destroy the mempool, and exit the thread itself. */
static void
__stop_cache_pool_mgmt(void *ctx)
{
	spdk_poller_unregister(&g_cache_pool_mgmt_poller);

	assert(g_cache_pool != NULL);
	/* All buffers must be back in the pool; anything else is a leak. */
	assert(spdk_mempool_count(g_cache_pool) == g_fs_cache_size / CACHE_BUFFER_SIZE);
	spdk_mempool_free(g_cache_pool);
	g_cache_pool = NULL;

	/* The thread exits itself; it was created in initialize_global_cache. */
	spdk_thread_exit(g_cache_pool_thread);
}
313 
314 static void
315 initialize_global_cache(void)
316 {
317 	pthread_mutex_lock(&g_cache_init_lock);
318 	if (g_fs_count == 0) {
319 		g_cache_pool_thread = spdk_thread_create("cache_pool_mgmt", NULL);
320 		assert(g_cache_pool_thread != NULL);
321 		spdk_thread_send_msg(g_cache_pool_thread, __start_cache_pool_mgmt, NULL);
322 	}
323 	g_fs_count++;
324 	pthread_mutex_unlock(&g_cache_init_lock);
325 }
326 
327 static void
328 free_global_cache(void)
329 {
330 	pthread_mutex_lock(&g_cache_init_lock);
331 	g_fs_count--;
332 	if (g_fs_count == 0) {
333 		spdk_thread_send_msg(g_cache_pool_thread, __stop_cache_pool_mgmt, NULL);
334 	}
335 	pthread_mutex_unlock(&g_cache_init_lock);
336 }
337 
338 static uint64_t
339 __file_get_blob_size(struct spdk_file *file)
340 {
341 	uint64_t cluster_sz;
342 
343 	cluster_sz = file->fs->bs_opts.cluster_sz;
344 	return cluster_sz * spdk_blob_get_num_clusters(file->blob);
345 }
346 
/* A pooled request object; args must stay the first member so that a pointer
 * to args can double as a pointer to the request (some callbacks rely on it). */
struct spdk_fs_request {
	struct spdk_fs_cb_args		args;
	TAILQ_ENTRY(spdk_fs_request)	link;
	struct spdk_fs_channel		*channel;
};
352 
/* Per-channel state: a preallocated pool of requests plus the blobstore
 * channel used to submit I/O.  `sync` channels may be used from multiple
 * threads, so their request pool is protected by `lock`. */
struct spdk_fs_channel {
	struct spdk_fs_request		*req_mem;
	TAILQ_HEAD(, spdk_fs_request)	reqs;
	sem_t				sem;
	struct spdk_filesystem		*fs;
	struct spdk_io_channel		*bs_channel;
	fs_send_request_fn		send_request;
	/* True for channels used synchronously; enables the spinlock below. */
	bool				sync;
	uint32_t			outstanding_reqs;
	pthread_spinlock_t		lock;
};
364 
/* For now, this is effectively an alias. But eventually we'll shift
 * some data members over. */
struct spdk_fs_thread_ctx {
	struct spdk_fs_channel	ch;
};
370 
371 static struct spdk_fs_request *
372 alloc_fs_request_with_iov(struct spdk_fs_channel *channel, uint32_t iovcnt)
373 {
374 	struct spdk_fs_request *req;
375 	struct iovec *iovs = NULL;
376 
377 	if (iovcnt > 1) {
378 		iovs = calloc(iovcnt, sizeof(struct iovec));
379 		if (!iovs) {
380 			return NULL;
381 		}
382 	}
383 
384 	if (channel->sync) {
385 		pthread_spin_lock(&channel->lock);
386 	}
387 
388 	req = TAILQ_FIRST(&channel->reqs);
389 	if (req) {
390 		channel->outstanding_reqs++;
391 		TAILQ_REMOVE(&channel->reqs, req, link);
392 	}
393 
394 	if (channel->sync) {
395 		pthread_spin_unlock(&channel->lock);
396 	}
397 
398 	if (req == NULL) {
399 		SPDK_ERRLOG("Cannot allocate req on spdk_fs_channel =%p\n", channel);
400 		free(iovs);
401 		return NULL;
402 	}
403 	memset(req, 0, sizeof(*req));
404 	req->channel = channel;
405 	if (iovcnt > 1) {
406 		req->args.iovs = iovs;
407 	} else {
408 		req->args.iovs = &req->args.iov;
409 	}
410 	req->args.iovcnt = iovcnt;
411 
412 	return req;
413 }
414 
/* Convenience wrapper: allocate a request that uses the inline single iov. */
static struct spdk_fs_request *
alloc_fs_request(struct spdk_fs_channel *channel)
{
	return alloc_fs_request_with_iov(channel, 0);
}
420 
421 static void
422 free_fs_request(struct spdk_fs_request *req)
423 {
424 	struct spdk_fs_channel *channel = req->channel;
425 
426 	if (req->args.iovcnt > 1) {
427 		free(req->args.iovs);
428 	}
429 
430 	if (channel->sync) {
431 		pthread_spin_lock(&channel->lock);
432 	}
433 
434 	TAILQ_INSERT_HEAD(&req->channel->reqs, req, link);
435 	channel->outstanding_reqs--;
436 
437 	if (channel->sync) {
438 		pthread_spin_unlock(&channel->lock);
439 	}
440 }
441 
442 static int
443 fs_channel_create(struct spdk_filesystem *fs, struct spdk_fs_channel *channel,
444 		  uint32_t max_ops)
445 {
446 	uint32_t i;
447 
448 	channel->req_mem = calloc(max_ops, sizeof(struct spdk_fs_request));
449 	if (!channel->req_mem) {
450 		return -1;
451 	}
452 
453 	channel->outstanding_reqs = 0;
454 	TAILQ_INIT(&channel->reqs);
455 	sem_init(&channel->sem, 0, 0);
456 
457 	for (i = 0; i < max_ops; i++) {
458 		TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link);
459 	}
460 
461 	channel->fs = fs;
462 
463 	return 0;
464 }
465 
466 static int
467 fs_md_channel_create(void *io_device, void *ctx_buf)
468 {
469 	struct spdk_filesystem		*fs;
470 	struct spdk_fs_channel		*channel = ctx_buf;
471 
472 	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, md_target);
473 
474 	return fs_channel_create(fs, channel, fs->md_target.max_ops);
475 }
476 
477 static int
478 fs_sync_channel_create(void *io_device, void *ctx_buf)
479 {
480 	struct spdk_filesystem		*fs;
481 	struct spdk_fs_channel		*channel = ctx_buf;
482 
483 	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, sync_target);
484 
485 	return fs_channel_create(fs, channel, fs->sync_target.max_ops);
486 }
487 
488 static int
489 fs_io_channel_create(void *io_device, void *ctx_buf)
490 {
491 	struct spdk_filesystem		*fs;
492 	struct spdk_fs_channel		*channel = ctx_buf;
493 
494 	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, io_target);
495 
496 	return fs_channel_create(fs, channel, fs->io_target.max_ops);
497 }
498 
499 static void
500 fs_channel_destroy(void *io_device, void *ctx_buf)
501 {
502 	struct spdk_fs_channel *channel = ctx_buf;
503 
504 	if (channel->outstanding_reqs > 0) {
505 		SPDK_ERRLOG("channel freed with %" PRIu32 " outstanding requests!\n",
506 			    channel->outstanding_reqs);
507 	}
508 
509 	free(channel->req_mem);
510 	if (channel->bs_channel != NULL) {
511 		spdk_bs_free_io_channel(channel->bs_channel);
512 	}
513 }
514 
/* send_request hook used when the caller is already on the right thread:
 * just invoke the function inline. */
static void
__send_request_direct(fs_request_fn fn, void *arg)
{
	fn(arg);
}
520 
521 static void
522 common_fs_bs_init(struct spdk_filesystem *fs, struct spdk_blob_store *bs)
523 {
524 	fs->bs = bs;
525 	fs->bs_opts.cluster_sz = spdk_bs_get_cluster_size(bs);
526 	fs->md_target.md_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
527 	fs->md_target.md_fs_channel->send_request = __send_request_direct;
528 	fs->sync_target.sync_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
529 	fs->sync_target.sync_fs_channel->send_request = __send_request_direct;
530 
531 	initialize_global_cache();
532 }
533 
/* Completion callback for spdk_bs_init issued from spdk_fs_init. */
static void
init_cb(void *ctx, struct spdk_blob_store *bs, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_filesystem *fs = args->fs;

	if (bserrno == 0) {
		common_fs_bs_init(fs, bs);
	} else {
		/* NOTE(review): this frees fs but not the io devices/channels
		 * registered in fs_alloc — they appear to leak on this path,
		 * and free_fs_request below touches the md channel after fs is
		 * freed.  Compare with the cleanup order in load_cb; confirm. */
		free(fs);
		fs = NULL;
	}

	args->fn.fs_op_with_handle(args->arg, fs, bserrno);
	free_fs_request(req);
}
551 
552 static void
553 fs_conf_parse(void)
554 {
555 	struct spdk_conf_section *sp;
556 	int cache_buffer_shift;
557 
558 	sp = spdk_conf_find_section(NULL, "Blobfs");
559 	if (sp == NULL) {
560 		g_fs_cache_buffer_shift = CACHE_BUFFER_SHIFT_DEFAULT;
561 		return;
562 	}
563 
564 	cache_buffer_shift = spdk_conf_section_get_intval(sp, "CacheBufferShift");
565 	if (cache_buffer_shift <= 0) {
566 		g_fs_cache_buffer_shift = CACHE_BUFFER_SHIFT_DEFAULT;
567 	} else {
568 		g_fs_cache_buffer_shift = cache_buffer_shift;
569 	}
570 }
571 
/* Allocate a filesystem object and register its three io_devices (md, sync,
 * io).  The md and sync channels are acquired immediately; io channels are
 * created per-thread later.  Returns NULL on allocation failure. */
static struct spdk_filesystem *
fs_alloc(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn)
{
	struct spdk_filesystem *fs;

	fs = calloc(1, sizeof(*fs));
	if (fs == NULL) {
		return NULL;
	}

	fs->bdev = dev;
	fs->send_request = send_request_fn;
	TAILQ_INIT(&fs->files);

	fs->md_target.max_ops = 512;
	spdk_io_device_register(&fs->md_target, fs_md_channel_create, fs_channel_destroy,
				sizeof(struct spdk_fs_channel), "blobfs_md");
	fs->md_target.md_io_channel = spdk_get_io_channel(&fs->md_target);
	fs->md_target.md_fs_channel = spdk_io_channel_get_ctx(fs->md_target.md_io_channel);

	fs->sync_target.max_ops = 512;
	spdk_io_device_register(&fs->sync_target, fs_sync_channel_create, fs_channel_destroy,
				sizeof(struct spdk_fs_channel), "blobfs_sync");
	fs->sync_target.sync_io_channel = spdk_get_io_channel(&fs->sync_target);
	fs->sync_target.sync_fs_channel = spdk_io_channel_get_ctx(fs->sync_target.sync_io_channel);

	fs->io_target.max_ops = 512;
	spdk_io_device_register(&fs->io_target, fs_io_channel_create, fs_channel_destroy,
				sizeof(struct spdk_fs_channel), "blobfs_io");

	return fs;
}
604 
/* Store the result code and release the semaphore a synchronous caller is
 * blocked on.  After sem_post the caller may free the request immediately,
 * so callers of this helper must not touch args afterwards. */
static void
__wake_caller(void *arg, int fserrno)
{
	struct spdk_fs_cb_args *args = arg;

	args->rc = fserrno;
	sem_post(args->sem);
}
613 
/* Create a brand-new blobfs on @dev.  @opt may be NULL for defaults.
 * Completion (success or failure) is reported through cb_fn. */
void
spdk_fs_init(struct spdk_bs_dev *dev, struct spdk_blobfs_opts *opt,
	     fs_send_request_fn send_request_fn,
	     spdk_fs_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_filesystem *fs;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;
	struct spdk_bs_opts opts = {};

	fs = fs_alloc(dev, send_request_fn);
	if (fs == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	fs_conf_parse();

	req = alloc_fs_request(fs->md_target.md_fs_channel);
	if (req == NULL) {
		/* Undo fs_alloc: drop channels first, then the io devices. */
		fs_free_io_channels(fs);
		fs_io_device_unregister(fs);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	args = &req->args;
	args->fn.fs_op_with_handle = cb_fn;
	args->arg = cb_arg;
	args->fs = fs;

	/* Tag the blobstore so a later load can verify it holds a blobfs. */
	spdk_bs_opts_init(&opts);
	snprintf(opts.bstype.bstype, sizeof(opts.bstype.bstype), SPDK_BLOBFS_SIGNATURE);
	if (opt) {
		opts.cluster_sz = opt->cluster_sz;
	}
	spdk_bs_init(dev, &opts, init_cb, req);
}
652 
653 static struct spdk_file *
654 file_alloc(struct spdk_filesystem *fs)
655 {
656 	struct spdk_file *file;
657 
658 	file = calloc(1, sizeof(*file));
659 	if (file == NULL) {
660 		return NULL;
661 	}
662 
663 	file->tree = calloc(1, sizeof(*file->tree));
664 	if (file->tree == NULL) {
665 		free(file);
666 		return NULL;
667 	}
668 
669 	if (pthread_spin_init(&file->lock, 0)) {
670 		free(file->tree);
671 		free(file);
672 		return NULL;
673 	}
674 
675 	file->fs = fs;
676 	TAILQ_INIT(&file->open_requests);
677 	TAILQ_INIT(&file->sync_requests);
678 	TAILQ_INSERT_TAIL(&fs->files, file, tailq);
679 	file->priority = SPDK_FILE_PRIORITY_LOW;
680 	return file;
681 }
682 
683 static void fs_load_done(void *ctx, int bserrno);
684 
685 static int
686 _handle_deleted_files(struct spdk_fs_request *req)
687 {
688 	struct spdk_fs_cb_args *args = &req->args;
689 	struct spdk_filesystem *fs = args->fs;
690 
691 	if (!TAILQ_EMPTY(&args->op.fs_load.deleted_files)) {
692 		struct spdk_deleted_file *deleted_file;
693 
694 		deleted_file = TAILQ_FIRST(&args->op.fs_load.deleted_files);
695 		TAILQ_REMOVE(&args->op.fs_load.deleted_files, deleted_file, tailq);
696 		spdk_bs_delete_blob(fs->bs, deleted_file->id, fs_load_done, req);
697 		free(deleted_file);
698 		return 0;
699 	}
700 
701 	return 1;
702 }
703 
/* Called when the blobstore load completes and again after each pending
 * blob deletion finishes; completes the fs_load callback only once every
 * previously-marked-deleted blob has been removed from disk. */
static void
fs_load_done(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_filesystem *fs = args->fs;

	/* The filesystem has been loaded.  Now check if there are any files that
	 *  were marked for deletion before last unload.  Do not complete the
	 *  fs_load callback until all of them have been deleted on disk.
	 */
	if (_handle_deleted_files(req) == 0) {
		/* We found a file that's been marked for deleting but not actually
		 *  deleted yet.  This function will get called again once the delete
		 *  operation is completed.
		 */
		return;
	}

	args->fn.fs_op_with_handle(args->arg, fs, 0);
	free_fs_request(req);

}
727 
728 static void
729 _file_build_trace_arg_name(struct spdk_file *f)
730 {
731 	f->trace_arg_name = 0;
732 	memcpy(&f->trace_arg_name, f->name,
733 	       spdk_min(sizeof(f->trace_arg_name), strlen(f->name)));
734 }
735 
736 static void
737 iter_cb(void *ctx, struct spdk_blob *blob, int rc)
738 {
739 	struct spdk_fs_request *req = ctx;
740 	struct spdk_fs_cb_args *args = &req->args;
741 	struct spdk_filesystem *fs = args->fs;
742 	uint64_t *length;
743 	const char *name;
744 	uint32_t *is_deleted;
745 	size_t value_len;
746 
747 	if (rc < 0) {
748 		args->fn.fs_op_with_handle(args->arg, fs, rc);
749 		free_fs_request(req);
750 		return;
751 	}
752 
753 	rc = spdk_blob_get_xattr_value(blob, "name", (const void **)&name, &value_len);
754 	if (rc < 0) {
755 		args->fn.fs_op_with_handle(args->arg, fs, rc);
756 		free_fs_request(req);
757 		return;
758 	}
759 
760 	rc = spdk_blob_get_xattr_value(blob, "length", (const void **)&length, &value_len);
761 	if (rc < 0) {
762 		args->fn.fs_op_with_handle(args->arg, fs, rc);
763 		free_fs_request(req);
764 		return;
765 	}
766 
767 	assert(value_len == 8);
768 
769 	/* This file could be deleted last time without close it, then app crashed, so we delete it now */
770 	rc = spdk_blob_get_xattr_value(blob, "is_deleted", (const void **)&is_deleted, &value_len);
771 	if (rc < 0) {
772 		struct spdk_file *f;
773 
774 		f = file_alloc(fs);
775 		if (f == NULL) {
776 			SPDK_ERRLOG("Cannot allocate file to handle deleted file on disk\n");
777 			args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM);
778 			free_fs_request(req);
779 			return;
780 		}
781 
782 		f->name = strdup(name);
783 		_file_build_trace_arg_name(f);
784 		f->blobid = spdk_blob_get_id(blob);
785 		f->length = *length;
786 		f->length_flushed = *length;
787 		f->length_xattr = *length;
788 		f->append_pos = *length;
789 		SPDK_DEBUGLOG(blobfs, "added file %s length=%ju\n", f->name, f->length);
790 	} else {
791 		struct spdk_deleted_file *deleted_file;
792 
793 		deleted_file = calloc(1, sizeof(*deleted_file));
794 		if (deleted_file == NULL) {
795 			args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM);
796 			free_fs_request(req);
797 			return;
798 		}
799 		deleted_file->id = spdk_blob_get_id(blob);
800 		TAILQ_INSERT_TAIL(&args->op.fs_load.deleted_files, deleted_file, tailq);
801 	}
802 }
803 
/* Completion callback for spdk_bs_load: verify the blobstore really holds a
 * blobfs (or claim an untyped one), then finish via fs_load_done. */
static void
load_cb(void *ctx, struct spdk_blob_store *bs, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_filesystem *fs = args->fs;
	struct spdk_bs_type bstype;
	static const struct spdk_bs_type blobfs_type = {SPDK_BLOBFS_SIGNATURE};
	static const struct spdk_bs_type zeros;

	if (bserrno != 0) {
		/* Report the failure first; free_fs_request needs the md channel,
		 * so tear down channels/devices only afterwards. */
		args->fn.fs_op_with_handle(args->arg, NULL, bserrno);
		free_fs_request(req);
		fs_free_io_channels(fs);
		fs_io_device_unregister(fs);
		return;
	}

	bstype = spdk_bs_get_bstype(bs);

	if (!memcmp(&bstype, &zeros, sizeof(bstype))) {
		/* Untyped blobstore: claim it for blobfs. */
		SPDK_DEBUGLOG(blobfs, "assigning bstype\n");
		spdk_bs_set_bstype(bs, blobfs_type);
	} else if (memcmp(&bstype, &blobfs_type, sizeof(bstype))) {
		SPDK_ERRLOG("not blobfs\n");
		SPDK_LOGDUMP(blobfs, "bstype", &bstype, sizeof(bstype));
		args->fn.fs_op_with_handle(args->arg, NULL, -EINVAL);
		free_fs_request(req);
		fs_free_io_channels(fs);
		fs_io_device_unregister(fs);
		return;
	}

	common_fs_bs_init(fs, bs);
	fs_load_done(req, 0);
}
840 
/* Unregister all three io_devices and free the filesystem object itself.
 * Callers must have released the io channels first (fs_free_io_channels). */
static void
fs_io_device_unregister(struct spdk_filesystem *fs)
{
	assert(fs != NULL);
	spdk_io_device_unregister(&fs->md_target, NULL);
	spdk_io_device_unregister(&fs->sync_target, NULL);
	spdk_io_device_unregister(&fs->io_target, NULL);
	free(fs);
}
850 
/* Release the io channels acquired in fs_alloc for the md and sync targets. */
static void
fs_free_io_channels(struct spdk_filesystem *fs)
{
	assert(fs != NULL);
	spdk_fs_free_io_channel(fs->md_target.md_io_channel);
	spdk_fs_free_io_channel(fs->sync_target.sync_io_channel);
}
858 
/* Load an existing blobfs from @dev.  Files marked deleted before the last
 * unload are cleaned up (via iter_cb/fs_load_done) before cb_fn fires. */
void
spdk_fs_load(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn,
	     spdk_fs_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_filesystem *fs;
	struct spdk_fs_cb_args *args;
	struct spdk_fs_request *req;
	struct spdk_bs_opts	bs_opts;

	fs = fs_alloc(dev, send_request_fn);
	if (fs == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	fs_conf_parse();

	req = alloc_fs_request(fs->md_target.md_fs_channel);
	if (req == NULL) {
		/* Undo fs_alloc: drop channels first, then the io devices. */
		fs_free_io_channels(fs);
		fs_io_device_unregister(fs);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	args = &req->args;
	args->fn.fs_op_with_handle = cb_fn;
	args->arg = cb_arg;
	args->fs = fs;
	TAILQ_INIT(&args->op.fs_load.deleted_files);
	spdk_bs_opts_init(&bs_opts);
	/* iter_cb rebuilds the in-memory file list as blobs are iterated. */
	bs_opts.iter_cb_fn = iter_cb;
	bs_opts.iter_cb_arg = req;
	spdk_bs_load(dev, &bs_opts, load_cb, req);
}
894 
/* Completion callback for spdk_bs_unload: free all in-memory file objects,
 * drop the global cache reference, report completion, and tear down. */
static void
unload_cb(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_filesystem *fs = args->fs;
	struct spdk_file *file, *tmp;

	TAILQ_FOREACH_SAFE(file, &fs->files, tailq, tmp) {
		TAILQ_REMOVE(&fs->files, file, tailq);
		file_free(file);
	}

	free_global_cache();

	args->fn.fs_op(args->arg, bserrno);
	/* This req came from the heap (see spdk_fs_unload), not a channel pool,
	 * so plain free() — not free_fs_request() — is correct here. */
	free(req);

	fs_io_device_unregister(fs);
}
915 
/* Unload the filesystem: release io channels, then unload the blobstore.
 * Remaining cleanup happens in unload_cb. */
void
spdk_fs_unload(struct spdk_filesystem *fs, spdk_fs_op_complete cb_fn, void *cb_arg)
{
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	/*
	 * We must free the md_channel before unloading the blobstore, so just
	 *  allocate this request from the general heap.
	 */
	req = calloc(1, sizeof(*req));
	if (req == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	args = &req->args;
	args->fn.fs_op = cb_fn;
	args->arg = cb_arg;
	args->fs = fs;

	fs_free_io_channels(fs);
	spdk_bs_unload(fs->bs, unload_cb, req);
}
940 
941 static struct spdk_file *
942 fs_find_file(struct spdk_filesystem *fs, const char *name)
943 {
944 	struct spdk_file *file;
945 
946 	TAILQ_FOREACH(file, &fs->files, tailq) {
947 		if (!strncmp(name, file->name, SPDK_FILE_NAME_MAX)) {
948 			return file;
949 		}
950 	}
951 
952 	return NULL;
953 }
954 
955 void
956 spdk_fs_file_stat_async(struct spdk_filesystem *fs, const char *name,
957 			spdk_file_stat_op_complete cb_fn, void *cb_arg)
958 {
959 	struct spdk_file_stat stat;
960 	struct spdk_file *f = NULL;
961 
962 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
963 		cb_fn(cb_arg, NULL, -ENAMETOOLONG);
964 		return;
965 	}
966 
967 	f = fs_find_file(fs, name);
968 	if (f != NULL) {
969 		stat.blobid = f->blobid;
970 		stat.size = f->append_pos >= f->length ? f->append_pos : f->length;
971 		cb_fn(cb_arg, &stat, 0);
972 		return;
973 	}
974 
975 	cb_fn(cb_arg, NULL, -ENOENT);
976 }
977 
978 static void
979 __copy_stat(void *arg, struct spdk_file_stat *stat, int fserrno)
980 {
981 	struct spdk_fs_request *req = arg;
982 	struct spdk_fs_cb_args *args = &req->args;
983 
984 	args->rc = fserrno;
985 	if (fserrno == 0) {
986 		memcpy(args->arg, stat, sizeof(*stat));
987 	}
988 	sem_post(args->sem);
989 }
990 
/* Runs on the metadata thread on behalf of spdk_fs_file_stat. */
static void
__file_stat(void *arg)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	spdk_fs_file_stat_async(args->fs, args->op.stat.name,
				args->fn.stat_op, req);
}
1000 
/* Synchronous stat: dispatch the async stat to the metadata thread and block
 * on the channel semaphore until __copy_stat posts it.  Returns 0 on success
 * or a negative errno. */
int
spdk_fs_file_stat(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx,
		  const char *name, struct spdk_file_stat *stat)
{
	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
	struct spdk_fs_request *req;
	int rc;

	req = alloc_fs_request(channel);
	if (req == NULL) {
		SPDK_ERRLOG("Cannot allocate stat req on file=%s\n", name);
		return -ENOMEM;
	}

	req->args.fs = fs;
	req->args.op.stat.name = name;
	req->args.fn.stat_op = __copy_stat;
	/* __copy_stat copies the result directly into the caller's buffer. */
	req->args.arg = stat;
	req->args.sem = &channel->sem;
	channel->send_request(__file_stat, req);
	sem_wait(&channel->sem);

	rc = req->args.rc;
	free_fs_request(req);

	return rc;
}
1028 
1029 static void
1030 fs_create_blob_close_cb(void *ctx, int bserrno)
1031 {
1032 	int rc;
1033 	struct spdk_fs_request *req = ctx;
1034 	struct spdk_fs_cb_args *args = &req->args;
1035 
1036 	rc = args->rc ? args->rc : bserrno;
1037 	args->fn.file_op(args->arg, rc);
1038 	free_fs_request(req);
1039 }
1040 
1041 static void
1042 fs_create_blob_resize_cb(void *ctx, int bserrno)
1043 {
1044 	struct spdk_fs_request *req = ctx;
1045 	struct spdk_fs_cb_args *args = &req->args;
1046 	struct spdk_file *f = args->file;
1047 	struct spdk_blob *blob = args->op.create.blob;
1048 	uint64_t length = 0;
1049 
1050 	args->rc = bserrno;
1051 	if (bserrno) {
1052 		spdk_blob_close(blob, fs_create_blob_close_cb, args);
1053 		return;
1054 	}
1055 
1056 	spdk_blob_set_xattr(blob, "name", f->name, strlen(f->name) + 1);
1057 	spdk_blob_set_xattr(blob, "length", &length, sizeof(length));
1058 
1059 	spdk_blob_close(blob, fs_create_blob_close_cb, args);
1060 }
1061 
1062 static void
1063 fs_create_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno)
1064 {
1065 	struct spdk_fs_request *req = ctx;
1066 	struct spdk_fs_cb_args *args = &req->args;
1067 
1068 	if (bserrno) {
1069 		args->fn.file_op(args->arg, bserrno);
1070 		free_fs_request(req);
1071 		return;
1072 	}
1073 
1074 	args->op.create.blob = blob;
1075 	spdk_blob_resize(blob, 1, fs_create_blob_resize_cb, req);
1076 }
1077 
1078 static void
1079 fs_create_blob_create_cb(void *ctx, spdk_blob_id blobid, int bserrno)
1080 {
1081 	struct spdk_fs_request *req = ctx;
1082 	struct spdk_fs_cb_args *args = &req->args;
1083 	struct spdk_file *f = args->file;
1084 
1085 	if (bserrno) {
1086 		args->fn.file_op(args->arg, bserrno);
1087 		free_fs_request(req);
1088 		return;
1089 	}
1090 
1091 	f->blobid = blobid;
1092 	spdk_bs_open_blob(f->fs->bs, blobid, fs_create_blob_open_cb, req);
1093 }
1094 
1095 void
1096 spdk_fs_create_file_async(struct spdk_filesystem *fs, const char *name,
1097 			  spdk_file_op_complete cb_fn, void *cb_arg)
1098 {
1099 	struct spdk_file *file;
1100 	struct spdk_fs_request *req;
1101 	struct spdk_fs_cb_args *args;
1102 
1103 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
1104 		cb_fn(cb_arg, -ENAMETOOLONG);
1105 		return;
1106 	}
1107 
1108 	file = fs_find_file(fs, name);
1109 	if (file != NULL) {
1110 		cb_fn(cb_arg, -EEXIST);
1111 		return;
1112 	}
1113 
1114 	file = file_alloc(fs);
1115 	if (file == NULL) {
1116 		SPDK_ERRLOG("Cannot allocate new file for creation\n");
1117 		cb_fn(cb_arg, -ENOMEM);
1118 		return;
1119 	}
1120 
1121 	req = alloc_fs_request(fs->md_target.md_fs_channel);
1122 	if (req == NULL) {
1123 		SPDK_ERRLOG("Cannot allocate create async req for file=%s\n", name);
1124 		cb_fn(cb_arg, -ENOMEM);
1125 		return;
1126 	}
1127 
1128 	args = &req->args;
1129 	args->file = file;
1130 	args->fn.file_op = cb_fn;
1131 	args->arg = cb_arg;
1132 
1133 	file->name = strdup(name);
1134 	_file_build_trace_arg_name(file);
1135 	spdk_bs_create_blob(fs->bs, fs_create_blob_create_cb, args);
1136 }
1137 
1138 static void
1139 __fs_create_file_done(void *arg, int fserrno)
1140 {
1141 	struct spdk_fs_request *req = arg;
1142 	struct spdk_fs_cb_args *args = &req->args;
1143 
1144 	__wake_caller(args, fserrno);
1145 	SPDK_DEBUGLOG(blobfs, "file=%s\n", args->op.create.name);
1146 }
1147 
1148 static void
1149 __fs_create_file(void *arg)
1150 {
1151 	struct spdk_fs_request *req = arg;
1152 	struct spdk_fs_cb_args *args = &req->args;
1153 
1154 	SPDK_DEBUGLOG(blobfs, "file=%s\n", args->op.create.name);
1155 	spdk_fs_create_file_async(args->fs, args->op.create.name, __fs_create_file_done, req);
1156 }
1157 
1158 int
1159 spdk_fs_create_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx, const char *name)
1160 {
1161 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
1162 	struct spdk_fs_request *req;
1163 	struct spdk_fs_cb_args *args;
1164 	int rc;
1165 
1166 	SPDK_DEBUGLOG(blobfs, "file=%s\n", name);
1167 
1168 	req = alloc_fs_request(channel);
1169 	if (req == NULL) {
1170 		SPDK_ERRLOG("Cannot allocate req to create file=%s\n", name);
1171 		return -ENOMEM;
1172 	}
1173 
1174 	args = &req->args;
1175 	args->fs = fs;
1176 	args->op.create.name = name;
1177 	args->sem = &channel->sem;
1178 	fs->send_request(__fs_create_file, req);
1179 	sem_wait(&channel->sem);
1180 	rc = args->rc;
1181 	free_fs_request(req);
1182 
1183 	return rc;
1184 }
1185 
static void
fs_open_blob_done(void *ctx, struct spdk_blob *blob, int bserrno)
{
	/* Completion of the blob open for a file: record the blob handle and
	 * drain every open request that queued up on the file while the open
	 * was in flight.  Note that `req`/`args` are deliberately reused as
	 * loop cursors after their initial use. */
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *f = args->file;

	f->blob = blob;
	while (!TAILQ_EMPTY(&f->open_requests)) {
		req = TAILQ_FIRST(&f->open_requests);
		args = &req->args;
		TAILQ_REMOVE(&f->open_requests, req, args.op.open.tailq);
		spdk_trace_record(TRACE_BLOBFS_OPEN, 0, 0, 0, f->trace_arg_name);
		/* Every queued opener receives the same status from this open. */
		args->fn.file_op_with_handle(args->arg, f, bserrno);
		free_fs_request(req);
	}
}
1203 
static void
fs_open_blob_create_cb(void *ctx, int bserrno)
{
	/* Take an open reference on the file and ensure its blob is open.
	 * Reached either directly (file already existed) or as the completion
	 * of an implicit create (open with CREATE flag, args->file == NULL).
	 * Exactly one of three states applies, keyed off ref_count/blob. */
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *file = args->file;
	struct spdk_filesystem *fs = args->fs;

	if (file == NULL) {
		/*
		 * This is from an open with CREATE flag - the file
		 *  is now created so look it up in the file list for this
		 *  filesystem.
		 */
		file = fs_find_file(fs, args->op.open.name);
		assert(file != NULL);
		args->file = file;
	}

	file->ref_count++;
	/* Queue this request; fs_open_blob_done drains the queue once the
	 * blob handle is available. */
	TAILQ_INSERT_TAIL(&file->open_requests, req, args.op.open.tailq);
	if (file->ref_count == 1) {
		/* First opener: start the blob open. */
		assert(file->blob == NULL);
		spdk_bs_open_blob(fs->bs, file->blobid, fs_open_blob_done, req);
	} else if (file->blob != NULL) {
		/* Blob already open: complete immediately. */
		fs_open_blob_done(req, file->blob, 0);
	} else {
		/*
		 * The blob open for this file is in progress due to a previous
		 *  open request.  When that open completes, it will invoke the
		 *  open callback for this request.
		 */
	}
}
1238 
1239 void
1240 spdk_fs_open_file_async(struct spdk_filesystem *fs, const char *name, uint32_t flags,
1241 			spdk_file_op_with_handle_complete cb_fn, void *cb_arg)
1242 {
1243 	struct spdk_file *f = NULL;
1244 	struct spdk_fs_request *req;
1245 	struct spdk_fs_cb_args *args;
1246 
1247 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
1248 		cb_fn(cb_arg, NULL, -ENAMETOOLONG);
1249 		return;
1250 	}
1251 
1252 	f = fs_find_file(fs, name);
1253 	if (f == NULL && !(flags & SPDK_BLOBFS_OPEN_CREATE)) {
1254 		cb_fn(cb_arg, NULL, -ENOENT);
1255 		return;
1256 	}
1257 
1258 	if (f != NULL && f->is_deleted == true) {
1259 		cb_fn(cb_arg, NULL, -ENOENT);
1260 		return;
1261 	}
1262 
1263 	req = alloc_fs_request(fs->md_target.md_fs_channel);
1264 	if (req == NULL) {
1265 		SPDK_ERRLOG("Cannot allocate async open req for file=%s\n", name);
1266 		cb_fn(cb_arg, NULL, -ENOMEM);
1267 		return;
1268 	}
1269 
1270 	args = &req->args;
1271 	args->fn.file_op_with_handle = cb_fn;
1272 	args->arg = cb_arg;
1273 	args->file = f;
1274 	args->fs = fs;
1275 	args->op.open.name = name;
1276 
1277 	if (f == NULL) {
1278 		spdk_fs_create_file_async(fs, name, fs_open_blob_create_cb, req);
1279 	} else {
1280 		fs_open_blob_create_cb(req, 0);
1281 	}
1282 }
1283 
1284 static void
1285 __fs_open_file_done(void *arg, struct spdk_file *file, int bserrno)
1286 {
1287 	struct spdk_fs_request *req = arg;
1288 	struct spdk_fs_cb_args *args = &req->args;
1289 
1290 	args->file = file;
1291 	__wake_caller(args, bserrno);
1292 	SPDK_DEBUGLOG(blobfs, "file=%s\n", args->op.open.name);
1293 }
1294 
1295 static void
1296 __fs_open_file(void *arg)
1297 {
1298 	struct spdk_fs_request *req = arg;
1299 	struct spdk_fs_cb_args *args = &req->args;
1300 
1301 	SPDK_DEBUGLOG(blobfs, "file=%s\n", args->op.open.name);
1302 	spdk_fs_open_file_async(args->fs, args->op.open.name, args->op.open.flags,
1303 				__fs_open_file_done, req);
1304 }
1305 
1306 int
1307 spdk_fs_open_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx,
1308 		  const char *name, uint32_t flags, struct spdk_file **file)
1309 {
1310 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
1311 	struct spdk_fs_request *req;
1312 	struct spdk_fs_cb_args *args;
1313 	int rc;
1314 
1315 	SPDK_DEBUGLOG(blobfs, "file=%s\n", name);
1316 
1317 	req = alloc_fs_request(channel);
1318 	if (req == NULL) {
1319 		SPDK_ERRLOG("Cannot allocate req for opening file=%s\n", name);
1320 		return -ENOMEM;
1321 	}
1322 
1323 	args = &req->args;
1324 	args->fs = fs;
1325 	args->op.open.name = name;
1326 	args->op.open.flags = flags;
1327 	args->sem = &channel->sem;
1328 	fs->send_request(__fs_open_file, req);
1329 	sem_wait(&channel->sem);
1330 	rc = args->rc;
1331 	if (rc == 0) {
1332 		*file = args->file;
1333 	} else {
1334 		*file = NULL;
1335 	}
1336 	free_fs_request(req);
1337 
1338 	return rc;
1339 }
1340 
1341 static void
1342 fs_rename_blob_close_cb(void *ctx, int bserrno)
1343 {
1344 	struct spdk_fs_request *req = ctx;
1345 	struct spdk_fs_cb_args *args = &req->args;
1346 
1347 	args->fn.fs_op(args->arg, bserrno);
1348 	free_fs_request(req);
1349 }
1350 
1351 static void
1352 fs_rename_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno)
1353 {
1354 	struct spdk_fs_request *req = ctx;
1355 	struct spdk_fs_cb_args *args = &req->args;
1356 	const char *new_name = args->op.rename.new_name;
1357 
1358 	spdk_blob_set_xattr(blob, "name", new_name, strlen(new_name) + 1);
1359 	spdk_blob_close(blob, fs_rename_blob_close_cb, req);
1360 }
1361 
1362 static void
1363 _fs_md_rename_file(struct spdk_fs_request *req)
1364 {
1365 	struct spdk_fs_cb_args *args = &req->args;
1366 	struct spdk_file *f;
1367 
1368 	f = fs_find_file(args->fs, args->op.rename.old_name);
1369 	if (f == NULL) {
1370 		args->fn.fs_op(args->arg, -ENOENT);
1371 		free_fs_request(req);
1372 		return;
1373 	}
1374 
1375 	free(f->name);
1376 	f->name = strdup(args->op.rename.new_name);
1377 	_file_build_trace_arg_name(f);
1378 	args->file = f;
1379 	spdk_bs_open_blob(args->fs->bs, f->blobid, fs_rename_blob_open_cb, req);
1380 }
1381 
static void
fs_rename_delete_done(void *arg, int fserrno)
{
	/* Destination file has been deleted - proceed with the rename.
	 * NOTE(review): fserrno is ignored, so the rename proceeds even if
	 * the delete failed - confirm this is intentional. */
	struct spdk_fs_request *req = arg;

	_fs_md_rename_file(req);
}
1387 
1388 void
1389 spdk_fs_rename_file_async(struct spdk_filesystem *fs,
1390 			  const char *old_name, const char *new_name,
1391 			  spdk_file_op_complete cb_fn, void *cb_arg)
1392 {
1393 	struct spdk_file *f;
1394 	struct spdk_fs_request *req;
1395 	struct spdk_fs_cb_args *args;
1396 
1397 	SPDK_DEBUGLOG(blobfs, "old=%s new=%s\n", old_name, new_name);
1398 	if (strnlen(new_name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
1399 		cb_fn(cb_arg, -ENAMETOOLONG);
1400 		return;
1401 	}
1402 
1403 	req = alloc_fs_request(fs->md_target.md_fs_channel);
1404 	if (req == NULL) {
1405 		SPDK_ERRLOG("Cannot allocate rename async req for renaming file from %s to %s\n", old_name,
1406 			    new_name);
1407 		cb_fn(cb_arg, -ENOMEM);
1408 		return;
1409 	}
1410 
1411 	args = &req->args;
1412 	args->fn.fs_op = cb_fn;
1413 	args->fs = fs;
1414 	args->arg = cb_arg;
1415 	args->op.rename.old_name = old_name;
1416 	args->op.rename.new_name = new_name;
1417 
1418 	f = fs_find_file(fs, new_name);
1419 	if (f == NULL) {
1420 		_fs_md_rename_file(req);
1421 		return;
1422 	}
1423 
1424 	/*
1425 	 * The rename overwrites an existing file.  So delete the existing file, then
1426 	 *  do the actual rename.
1427 	 */
1428 	spdk_fs_delete_file_async(fs, new_name, fs_rename_delete_done, req);
1429 }
1430 
1431 static void
1432 __fs_rename_file_done(void *arg, int fserrno)
1433 {
1434 	struct spdk_fs_request *req = arg;
1435 	struct spdk_fs_cb_args *args = &req->args;
1436 
1437 	__wake_caller(args, fserrno);
1438 }
1439 
1440 static void
1441 __fs_rename_file(void *arg)
1442 {
1443 	struct spdk_fs_request *req = arg;
1444 	struct spdk_fs_cb_args *args = &req->args;
1445 
1446 	spdk_fs_rename_file_async(args->fs, args->op.rename.old_name, args->op.rename.new_name,
1447 				  __fs_rename_file_done, req);
1448 }
1449 
1450 int
1451 spdk_fs_rename_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx,
1452 		    const char *old_name, const char *new_name)
1453 {
1454 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
1455 	struct spdk_fs_request *req;
1456 	struct spdk_fs_cb_args *args;
1457 	int rc;
1458 
1459 	req = alloc_fs_request(channel);
1460 	if (req == NULL) {
1461 		SPDK_ERRLOG("Cannot allocate rename req for file=%s\n", old_name);
1462 		return -ENOMEM;
1463 	}
1464 
1465 	args = &req->args;
1466 
1467 	args->fs = fs;
1468 	args->op.rename.old_name = old_name;
1469 	args->op.rename.new_name = new_name;
1470 	args->sem = &channel->sem;
1471 	fs->send_request(__fs_rename_file, req);
1472 	sem_wait(&channel->sem);
1473 	rc = args->rc;
1474 	free_fs_request(req);
1475 	return rc;
1476 }
1477 
1478 static void
1479 blob_delete_cb(void *ctx, int bserrno)
1480 {
1481 	struct spdk_fs_request *req = ctx;
1482 	struct spdk_fs_cb_args *args = &req->args;
1483 
1484 	args->fn.file_op(args->arg, bserrno);
1485 	free_fs_request(req);
1486 }
1487 
1488 void
1489 spdk_fs_delete_file_async(struct spdk_filesystem *fs, const char *name,
1490 			  spdk_file_op_complete cb_fn, void *cb_arg)
1491 {
1492 	struct spdk_file *f;
1493 	spdk_blob_id blobid;
1494 	struct spdk_fs_request *req;
1495 	struct spdk_fs_cb_args *args;
1496 
1497 	SPDK_DEBUGLOG(blobfs, "file=%s\n", name);
1498 
1499 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
1500 		cb_fn(cb_arg, -ENAMETOOLONG);
1501 		return;
1502 	}
1503 
1504 	f = fs_find_file(fs, name);
1505 	if (f == NULL) {
1506 		SPDK_ERRLOG("Cannot find the file=%s to deleted\n", name);
1507 		cb_fn(cb_arg, -ENOENT);
1508 		return;
1509 	}
1510 
1511 	req = alloc_fs_request(fs->md_target.md_fs_channel);
1512 	if (req == NULL) {
1513 		SPDK_ERRLOG("Cannot allocate the req for the file=%s to deleted\n", name);
1514 		cb_fn(cb_arg, -ENOMEM);
1515 		return;
1516 	}
1517 
1518 	args = &req->args;
1519 	args->fn.file_op = cb_fn;
1520 	args->arg = cb_arg;
1521 
1522 	if (f->ref_count > 0) {
1523 		/* If the ref > 0, we mark the file as deleted and delete it when we close it. */
1524 		f->is_deleted = true;
1525 		spdk_blob_set_xattr(f->blob, "is_deleted", &f->is_deleted, sizeof(bool));
1526 		spdk_blob_sync_md(f->blob, blob_delete_cb, req);
1527 		return;
1528 	}
1529 
1530 	blobid = f->blobid;
1531 	TAILQ_REMOVE(&fs->files, f, tailq);
1532 
1533 	file_free(f);
1534 
1535 	spdk_bs_delete_blob(fs->bs, blobid, blob_delete_cb, req);
1536 }
1537 
static uint64_t
fs_name_to_uint64(const char *name)
{
	/* Pack up to the first 8 bytes of the file name into a u64 for the
	 * trace framework; byte order follows host endianness. */
	uint64_t result = 0;
	size_t len = strlen(name);

	if (len > sizeof(result)) {
		len = sizeof(result);
	}
	memcpy(&result, name, len);
	return result;
}
1545 
1546 static void
1547 __fs_delete_file_done(void *arg, int fserrno)
1548 {
1549 	struct spdk_fs_request *req = arg;
1550 	struct spdk_fs_cb_args *args = &req->args;
1551 
1552 	spdk_trace_record(TRACE_BLOBFS_DELETE_DONE, 0, 0, 0, fs_name_to_uint64(args->op.delete.name));
1553 	__wake_caller(args, fserrno);
1554 }
1555 
1556 static void
1557 __fs_delete_file(void *arg)
1558 {
1559 	struct spdk_fs_request *req = arg;
1560 	struct spdk_fs_cb_args *args = &req->args;
1561 
1562 	spdk_trace_record(TRACE_BLOBFS_DELETE_START, 0, 0, 0, fs_name_to_uint64(args->op.delete.name));
1563 	spdk_fs_delete_file_async(args->fs, args->op.delete.name, __fs_delete_file_done, req);
1564 }
1565 
1566 int
1567 spdk_fs_delete_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx,
1568 		    const char *name)
1569 {
1570 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
1571 	struct spdk_fs_request *req;
1572 	struct spdk_fs_cb_args *args;
1573 	int rc;
1574 
1575 	req = alloc_fs_request(channel);
1576 	if (req == NULL) {
1577 		SPDK_DEBUGLOG(blobfs, "Cannot allocate req to delete file=%s\n", name);
1578 		return -ENOMEM;
1579 	}
1580 
1581 	args = &req->args;
1582 	args->fs = fs;
1583 	args->op.delete.name = name;
1584 	args->sem = &channel->sem;
1585 	fs->send_request(__fs_delete_file, req);
1586 	sem_wait(&channel->sem);
1587 	rc = args->rc;
1588 	free_fs_request(req);
1589 
1590 	return rc;
1591 }
1592 
1593 spdk_fs_iter
1594 spdk_fs_iter_first(struct spdk_filesystem *fs)
1595 {
1596 	struct spdk_file *f;
1597 
1598 	f = TAILQ_FIRST(&fs->files);
1599 	return f;
1600 }
1601 
1602 spdk_fs_iter
1603 spdk_fs_iter_next(spdk_fs_iter iter)
1604 {
1605 	struct spdk_file *f = iter;
1606 
1607 	if (f == NULL) {
1608 		return NULL;
1609 	}
1610 
1611 	f = TAILQ_NEXT(f, tailq);
1612 	return f;
1613 }
1614 
const char *
spdk_file_get_name(struct spdk_file *file)
{
	/* Return the file's name.  The pointer is owned by the file and
	 * remains valid only until the file is renamed or freed. */
	return file->name;
}
1620 
1621 uint64_t
1622 spdk_file_get_length(struct spdk_file *file)
1623 {
1624 	uint64_t length;
1625 
1626 	assert(file != NULL);
1627 
1628 	length = file->append_pos >= file->length ? file->append_pos : file->length;
1629 	SPDK_DEBUGLOG(blobfs, "file=%s length=0x%jx\n", file->name, length);
1630 	return length;
1631 }
1632 
1633 static void
1634 fs_truncate_complete_cb(void *ctx, int bserrno)
1635 {
1636 	struct spdk_fs_request *req = ctx;
1637 	struct spdk_fs_cb_args *args = &req->args;
1638 
1639 	args->fn.file_op(args->arg, bserrno);
1640 	free_fs_request(req);
1641 }
1642 
static void
fs_truncate_resize_cb(void *ctx, int bserrno)
{
	/* Blob-resize step of truncate: record the new length in the blob's
	 * "length" xattr and in-memory state, clamp the append position, and
	 * sync the blob metadata. */
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *file = args->file;
	uint64_t *length = &args->op.truncate.length;

	if (bserrno) {
		/* Resize failed - report and release the request. */
		args->fn.file_op(args->arg, bserrno);
		free_fs_request(req);
		return;
	}

	spdk_blob_set_xattr(file->blob, "length", length, sizeof(*length));

	file->length = *length;
	/* Shrinking below the append position moves it back to the new end. */
	if (file->append_pos > file->length) {
		file->append_pos = file->length;
	}

	spdk_blob_sync_md(file->blob, fs_truncate_complete_cb, req);
}
1666 
static uint64_t
__bytes_to_clusters(uint64_t length, uint64_t cluster_sz)
{
	/* Number of clusters needed to hold `length` bytes (round up).
	 * Fix: the additive form (length + cluster_sz - 1) wrapped around
	 * for lengths near UINT64_MAX; the div/mod form cannot overflow. */
	return length / cluster_sz + (length % cluster_sz != 0);
}
1672 
1673 void
1674 spdk_file_truncate_async(struct spdk_file *file, uint64_t length,
1675 			 spdk_file_op_complete cb_fn, void *cb_arg)
1676 {
1677 	struct spdk_filesystem *fs;
1678 	size_t num_clusters;
1679 	struct spdk_fs_request *req;
1680 	struct spdk_fs_cb_args *args;
1681 
1682 	SPDK_DEBUGLOG(blobfs, "file=%s old=0x%jx new=0x%jx\n", file->name, file->length, length);
1683 	if (length == file->length) {
1684 		cb_fn(cb_arg, 0);
1685 		return;
1686 	}
1687 
1688 	req = alloc_fs_request(file->fs->md_target.md_fs_channel);
1689 	if (req == NULL) {
1690 		cb_fn(cb_arg, -ENOMEM);
1691 		return;
1692 	}
1693 
1694 	args = &req->args;
1695 	args->fn.file_op = cb_fn;
1696 	args->arg = cb_arg;
1697 	args->file = file;
1698 	args->op.truncate.length = length;
1699 	fs = file->fs;
1700 
1701 	num_clusters = __bytes_to_clusters(length, fs->bs_opts.cluster_sz);
1702 
1703 	spdk_blob_resize(file->blob, num_clusters, fs_truncate_resize_cb, req);
1704 }
1705 
1706 static void
1707 __truncate(void *arg)
1708 {
1709 	struct spdk_fs_request *req = arg;
1710 	struct spdk_fs_cb_args *args = &req->args;
1711 
1712 	spdk_file_truncate_async(args->file, args->op.truncate.length,
1713 				 args->fn.file_op, args);
1714 }
1715 
1716 int
1717 spdk_file_truncate(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx,
1718 		   uint64_t length)
1719 {
1720 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
1721 	struct spdk_fs_request *req;
1722 	struct spdk_fs_cb_args *args;
1723 	int rc;
1724 
1725 	req = alloc_fs_request(channel);
1726 	if (req == NULL) {
1727 		return -ENOMEM;
1728 	}
1729 
1730 	args = &req->args;
1731 
1732 	args->file = file;
1733 	args->op.truncate.length = length;
1734 	args->fn.file_op = __wake_caller;
1735 	args->sem = &channel->sem;
1736 
1737 	channel->send_request(__truncate, req);
1738 	sem_wait(&channel->sem);
1739 	rc = args->rc;
1740 	free_fs_request(req);
1741 
1742 	return rc;
1743 }
1744 
1745 static void
1746 __rw_done(void *ctx, int bserrno)
1747 {
1748 	struct spdk_fs_request *req = ctx;
1749 	struct spdk_fs_cb_args *args = &req->args;
1750 
1751 	spdk_free(args->op.rw.pin_buf);
1752 	args->fn.file_op(args->arg, bserrno);
1753 	free_fs_request(req);
1754 }
1755 
static void
_copy_iovs_to_buf(void *buf, size_t buf_len, struct iovec *iovs, int iovcnt)
{
	/* Gather: copy the iovec contents into one contiguous buffer,
	 * writing at most buf_len bytes in total. */
	uint8_t *dst = buf;
	int i;

	for (i = 0; i < iovcnt; i++) {
		size_t chunk = iovs[i].iov_len < buf_len ? iovs[i].iov_len : buf_len;

		memcpy(dst, iovs[i].iov_base, chunk);
		dst += chunk;
		assert(buf_len >= chunk);
		buf_len -= chunk;
	}
}
1770 
static void
_copy_buf_to_iovs(struct iovec *iovs, int iovcnt, void *buf, size_t buf_len)
{
	/* Scatter: distribute at most buf_len bytes of the contiguous buffer
	 * across the iovec entries, in order. */
	const uint8_t *src = buf;
	int i;

	for (i = 0; i < iovcnt; i++) {
		size_t chunk = iovs[i].iov_len < buf_len ? iovs[i].iov_len : buf_len;

		memcpy(iovs[i].iov_base, src, chunk);
		src += chunk;
		assert(buf_len >= chunk);
		buf_len -= chunk;
	}
}
1785 
1786 static void
1787 __read_done(void *ctx, int bserrno)
1788 {
1789 	struct spdk_fs_request *req = ctx;
1790 	struct spdk_fs_cb_args *args = &req->args;
1791 	void *buf;
1792 
1793 	assert(req != NULL);
1794 	buf = (void *)((uintptr_t)args->op.rw.pin_buf + (args->op.rw.offset & (args->op.rw.blocklen - 1)));
1795 	if (args->op.rw.is_read) {
1796 		_copy_buf_to_iovs(args->iovs, args->iovcnt, buf, args->op.rw.length);
1797 		__rw_done(req, 0);
1798 	} else {
1799 		_copy_iovs_to_buf(buf, args->op.rw.length, args->iovs, args->iovcnt);
1800 		spdk_blob_io_write(args->file->blob, args->op.rw.channel,
1801 				   args->op.rw.pin_buf,
1802 				   args->op.rw.start_lba, args->op.rw.num_lba,
1803 				   __rw_done, req);
1804 	}
1805 }
1806 
1807 static void
1808 __do_blob_read(void *ctx, int fserrno)
1809 {
1810 	struct spdk_fs_request *req = ctx;
1811 	struct spdk_fs_cb_args *args = &req->args;
1812 
1813 	if (fserrno) {
1814 		__rw_done(req, fserrno);
1815 		return;
1816 	}
1817 	spdk_blob_io_read(args->file->blob, args->op.rw.channel,
1818 			  args->op.rw.pin_buf,
1819 			  args->op.rw.start_lba, args->op.rw.num_lba,
1820 			  __read_done, req);
1821 }
1822 
static void
__get_page_parameters(struct spdk_file *file, uint64_t offset, uint64_t length,
		      uint64_t *start_lba, uint32_t *lba_size, uint64_t *num_lba)
{
	/* Translate a byte range into the covering LBA range of the
	 * blobstore's io units.
	 * NOTE(review): assumes offset + length > 0; with both zero,
	 * (offset + length - 1) wraps and num_lba is nonsense - confirm
	 * callers never pass a zero-length range. */
	uint64_t end_lba;

	*lba_size = spdk_bs_get_io_unit_size(file->fs->bs);
	*start_lba = offset / *lba_size;
	/* Last LBA touched by the range, inclusive. */
	end_lba = (offset + length - 1) / *lba_size;
	*num_lba = (end_lba - *start_lba + 1);
}
1834 
1835 static bool
1836 __is_lba_aligned(struct spdk_file *file, uint64_t offset, uint64_t length)
1837 {
1838 	uint32_t lba_size = spdk_bs_get_io_unit_size(file->fs->bs);
1839 
1840 	if ((offset % lba_size == 0) && (length % lba_size == 0)) {
1841 		return true;
1842 	}
1843 
1844 	return false;
1845 }
1846 
1847 static void
1848 _fs_request_setup_iovs(struct spdk_fs_request *req, struct iovec *iovs, uint32_t iovcnt)
1849 {
1850 	uint32_t i;
1851 
1852 	for (i = 0; i < iovcnt; i++) {
1853 		req->args.iovs[i].iov_base = iovs[i].iov_base;
1854 		req->args.iovs[i].iov_len = iovs[i].iov_len;
1855 	}
1856 }
1857 
static void
__readvwritev(struct spdk_file *file, struct spdk_io_channel *_channel,
	      struct iovec *iovs, uint32_t iovcnt, uint64_t offset, uint64_t length,
	      spdk_file_op_complete cb_fn, void *cb_arg, int is_read)
{
	/* Core vectored I/O path.  Allocates a DMA-able bounce buffer
	 * covering the LBA-aligned span of the request, then dispatches:
	 *  - unaligned or read requests go through a blob read first
	 *    (read-modify-write for writes),
	 *  - aligned writes go straight to the blob,
	 *  - writes past EOF first extend the file.
	 * Completion is reported via cb_fn(cb_arg, rc). */
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;
	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
	uint64_t start_lba, num_lba, pin_buf_length;
	uint32_t lba_size;

	/* Reads beyond the current file length are rejected outright. */
	if (is_read && offset + length > file->length) {
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	req = alloc_fs_request_with_iov(channel, iovcnt);
	if (req == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	__get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba);

	args = &req->args;
	args->fn.file_op = cb_fn;
	args->arg = cb_arg;
	args->file = file;
	args->op.rw.channel = channel->bs_channel;
	_fs_request_setup_iovs(req, iovs, iovcnt);
	args->op.rw.is_read = is_read;
	args->op.rw.offset = offset;
	args->op.rw.blocklen = lba_size;

	/* Bounce buffer spans whole io units so unaligned requests can be
	 * satisfied via read-modify-write. */
	pin_buf_length = num_lba * lba_size;
	args->op.rw.length = pin_buf_length;
	args->op.rw.pin_buf = spdk_malloc(pin_buf_length, lba_size, NULL,
					  SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
	if (args->op.rw.pin_buf == NULL) {
		SPDK_DEBUGLOG(blobfs, "Failed to allocate buf for: file=%s offset=%jx length=%jx\n",
			      file->name, offset, length);
		free_fs_request(req);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	args->op.rw.start_lba = start_lba;
	args->op.rw.num_lba = num_lba;

	if (!is_read && file->length < offset + length) {
		/* Write past EOF: grow the file first, then continue through
		 * the read-modify-write path. */
		spdk_file_truncate_async(file, offset + length, __do_blob_read, req);
	} else if (!is_read && __is_lba_aligned(file, offset, length)) {
		/* Fully aligned write: no read needed, write directly. */
		_copy_iovs_to_buf(args->op.rw.pin_buf, args->op.rw.length, args->iovs, args->iovcnt);
		spdk_blob_io_write(args->file->blob, args->op.rw.channel,
				   args->op.rw.pin_buf,
				   args->op.rw.start_lba, args->op.rw.num_lba,
				   __rw_done, req);
	} else {
		/* Read, or unaligned write: fetch the covering blocks first. */
		__do_blob_read(req, 0);
	}
}
1919 
1920 static void
1921 __readwrite(struct spdk_file *file, struct spdk_io_channel *channel,
1922 	    void *payload, uint64_t offset, uint64_t length,
1923 	    spdk_file_op_complete cb_fn, void *cb_arg, int is_read)
1924 {
1925 	struct iovec iov;
1926 
1927 	iov.iov_base = payload;
1928 	iov.iov_len = (size_t)length;
1929 
1930 	__readvwritev(file, channel, &iov, 1, offset, length, cb_fn, cb_arg, is_read);
1931 }
1932 
1933 void
1934 spdk_file_write_async(struct spdk_file *file, struct spdk_io_channel *channel,
1935 		      void *payload, uint64_t offset, uint64_t length,
1936 		      spdk_file_op_complete cb_fn, void *cb_arg)
1937 {
1938 	__readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 0);
1939 }
1940 
void
spdk_file_writev_async(struct spdk_file *file, struct spdk_io_channel *channel,
		       struct iovec *iovs, uint32_t iovcnt, uint64_t offset, uint64_t length,
		       spdk_file_op_complete cb_fn, void *cb_arg)
{
	/* Asynchronously write a vectored buffer at the given offset;
	 * completion is reported via cb_fn(cb_arg, rc). */
	SPDK_DEBUGLOG(blobfs, "file=%s offset=%jx length=%jx\n",
		      file->name, offset, length);

	__readvwritev(file, channel, iovs, iovcnt, offset, length, cb_fn, cb_arg, 0);
}
1951 
void
spdk_file_read_async(struct spdk_file *file, struct spdk_io_channel *channel,
		     void *payload, uint64_t offset, uint64_t length,
		     spdk_file_op_complete cb_fn, void *cb_arg)
{
	/* Asynchronously read into a flat buffer; the range must lie within
	 * the file's current length.  Completion via cb_fn(cb_arg, rc). */
	SPDK_DEBUGLOG(blobfs, "file=%s offset=%jx length=%jx\n",
		      file->name, offset, length);
	__readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 1);
}
1961 
void
spdk_file_readv_async(struct spdk_file *file, struct spdk_io_channel *channel,
		      struct iovec *iovs, uint32_t iovcnt, uint64_t offset, uint64_t length,
		      spdk_file_op_complete cb_fn, void *cb_arg)
{
	/* Asynchronously read into a vectored buffer; the range must lie
	 * within the file's current length.  Completion via cb_fn(cb_arg, rc). */
	SPDK_DEBUGLOG(blobfs, "file=%s offset=%jx length=%jx\n",
		      file->name, offset, length);

	__readvwritev(file, channel, iovs, iovcnt, offset, length, cb_fn, cb_arg, 1);
}
1972 
struct spdk_io_channel *
spdk_fs_alloc_io_channel(struct spdk_filesystem *fs)
{
	/* Allocate an I/O channel for data-path operations on this
	 * filesystem.  Requests on this channel are executed directly
	 * (no cross-thread handoff) via __send_request_direct. */
	struct spdk_io_channel *io_channel;
	struct spdk_fs_channel *fs_channel;

	io_channel = spdk_get_io_channel(&fs->io_target);
	fs_channel = spdk_io_channel_get_ctx(io_channel);
	/* NOTE(review): spdk_bs_alloc_io_channel() can fail; the result is
	 * stored unchecked - TODO confirm users tolerate a NULL bs_channel. */
	fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
	fs_channel->send_request = __send_request_direct;

	return io_channel;
}
1986 
void
spdk_fs_free_io_channel(struct spdk_io_channel *channel)
{
	/* Release a channel obtained from spdk_fs_alloc_io_channel(). */
	spdk_put_io_channel(channel);
}
1992 
1993 struct spdk_fs_thread_ctx *
1994 spdk_fs_alloc_thread_ctx(struct spdk_filesystem *fs)
1995 {
1996 	struct spdk_fs_thread_ctx *ctx;
1997 
1998 	ctx = calloc(1, sizeof(*ctx));
1999 	if (!ctx) {
2000 		return NULL;
2001 	}
2002 
2003 	if (pthread_spin_init(&ctx->ch.lock, 0)) {
2004 		free(ctx);
2005 		return NULL;
2006 	}
2007 
2008 	fs_channel_create(fs, &ctx->ch, 512);
2009 
2010 	ctx->ch.send_request = fs->send_request;
2011 	ctx->ch.sync = 1;
2012 
2013 	return ctx;
2014 }
2015 
2016 
void
spdk_fs_free_thread_ctx(struct spdk_fs_thread_ctx *ctx)
{
	/* Tear down a context from spdk_fs_alloc_thread_ctx().  Busy-waits
	 * (polling under the channel lock) until all outstanding requests
	 * have drained before destroying the channel. */
	assert(ctx->ch.sync == 1);

	while (true) {
		pthread_spin_lock(&ctx->ch.lock);
		if (ctx->ch.outstanding_reqs == 0) {
			pthread_spin_unlock(&ctx->ch.lock);
			break;
		}
		pthread_spin_unlock(&ctx->ch.lock);
		/* Back off briefly so in-flight requests can complete. */
		usleep(1000);
	}

	fs_channel_destroy(NULL, &ctx->ch);
	free(ctx);
}
2035 
2036 int
2037 spdk_fs_set_cache_size(uint64_t size_in_mb)
2038 {
2039 	/* setting g_fs_cache_size is only permitted if cache pool
2040 	 * is already freed or hasn't been initialized
2041 	 */
2042 	if (g_cache_pool != NULL) {
2043 		return -EPERM;
2044 	}
2045 
2046 	g_fs_cache_size = size_in_mb * 1024 * 1024;
2047 
2048 	return 0;
2049 }
2050 
uint64_t
spdk_fs_get_cache_size(void)
{
	/* Return the configured global cache size in MiB. */
	return g_fs_cache_size / (1024 * 1024);
}
2056 
2057 static void __file_flush(void *ctx);
2058 
2059 /* Try to free some cache buffers from this file.
2060  */
2061 static int
2062 reclaim_cache_buffers(struct spdk_file *file)
2063 {
2064 	int rc;
2065 
2066 	BLOBFS_TRACE(file, "free=%s\n", file->name);
2067 
2068 	/* The function is safe to be called with any threads, while the file
2069 	 * lock maybe locked by other thread for now, so try to get the file
2070 	 * lock here.
2071 	 */
2072 	rc = pthread_spin_trylock(&file->lock);
2073 	if (rc != 0) {
2074 		return -1;
2075 	}
2076 
2077 	if (file->tree->present_mask == 0) {
2078 		pthread_spin_unlock(&file->lock);
2079 		return -1;
2080 	}
2081 	tree_free_buffers(file->tree);
2082 
2083 	TAILQ_REMOVE(&g_caches, file, cache_tailq);
2084 	/* If not freed, put it in the end of the queue */
2085 	if (file->tree->present_mask != 0) {
2086 		TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq);
2087 	} else {
2088 		file->last = NULL;
2089 	}
2090 	pthread_spin_unlock(&file->lock);
2091 
2092 	return 0;
2093 }
2094 
static int
_blobfs_cache_pool_reclaim(void *arg)
{
	/* Poller run on the cache-pool thread.  When the pool is low, free
	 * buffers in three passes of increasing aggressiveness:
	 *   1. low-priority files not open for writing,
	 *   2. any file not open for writing,
	 *   3. any file at all.
	 * Each pass stops as soon as enough has been reclaimed. */
	struct spdk_file *file, *tmp;
	int rc;

	if (!blobfs_cache_pool_need_reclaim()) {
		return SPDK_POLLER_IDLE;
	}

	TAILQ_FOREACH_SAFE(file, &g_caches, cache_tailq, tmp) {
		if (!file->open_for_writing &&
		    file->priority == SPDK_FILE_PRIORITY_LOW) {
			rc = reclaim_cache_buffers(file);
			if (rc < 0) {
				continue;
			}
			if (!blobfs_cache_pool_need_reclaim()) {
				return SPDK_POLLER_BUSY;
			}
			break;
		}
	}

	TAILQ_FOREACH_SAFE(file, &g_caches, cache_tailq, tmp) {
		if (!file->open_for_writing) {
			rc = reclaim_cache_buffers(file);
			if (rc < 0) {
				continue;
			}
			if (!blobfs_cache_pool_need_reclaim()) {
				return SPDK_POLLER_BUSY;
			}
			break;
		}
	}

	TAILQ_FOREACH_SAFE(file, &g_caches, cache_tailq, tmp) {
		rc = reclaim_cache_buffers(file);
		if (rc < 0) {
			continue;
		}
		break;
	}

	return SPDK_POLLER_BUSY;
}
2142 
2143 static void
2144 _add_file_to_cache_pool(void *ctx)
2145 {
2146 	struct spdk_file *file = ctx;
2147 
2148 	TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq);
2149 }
2150 
2151 static void
2152 _remove_file_from_cache_pool(void *ctx)
2153 {
2154 	struct spdk_file *file = ctx;
2155 
2156 	TAILQ_REMOVE(&g_caches, file, cache_tailq);
2157 }
2158 
static struct cache_buffer *
cache_insert_buffer(struct spdk_file *file, uint64_t offset)
{
	/* Allocate a cache buffer for `offset` and insert it into the file's
	 * buffer tree.  Blocks (polling the mempool up to ~100 retries)
	 * while waiting for the reclaim poller to free buffers; returns NULL
	 * on allocation failure or if the pool stays exhausted. */
	struct cache_buffer *buf;
	int count = 0;
	bool need_update = false;

	buf = calloc(1, sizeof(*buf));
	if (buf == NULL) {
		SPDK_DEBUGLOG(blobfs, "calloc failed\n");
		return NULL;
	}

	do {
		buf->buf = spdk_mempool_get(g_cache_pool);
		if (buf->buf) {
			break;
		}
		if (count++ == 100) {
			SPDK_ERRLOG("Could not allocate cache buffer for file=%p on offset=%jx\n",
				    file, offset);
			free(buf);
			return NULL;
		}
		/* Give the reclaim poller a chance to release buffers. */
		usleep(BLOBFS_CACHE_POOL_POLL_PERIOD_IN_US);
	} while (true);

	buf->buf_size = CACHE_BUFFER_SIZE;
	buf->offset = offset;

	/* Check before inserting: a previously-empty tree means this file is
	 * not yet known to the cache-pool thread. */
	if (file->tree->present_mask == 0) {
		need_update = true;
	}
	file->tree = tree_insert_buffer(file->tree, buf);

	if (need_update) {
		spdk_thread_send_msg(g_cache_pool_thread, _add_file_to_cache_pool, file);
	}

	return buf;
}
2200 
2201 static struct cache_buffer *
2202 cache_append_buffer(struct spdk_file *file)
2203 {
2204 	struct cache_buffer *last;
2205 
2206 	assert(file->last == NULL || file->last->bytes_filled == file->last->buf_size);
2207 	assert((file->append_pos % CACHE_BUFFER_SIZE) == 0);
2208 
2209 	last = cache_insert_buffer(file, file->append_pos);
2210 	if (last == NULL) {
2211 		SPDK_DEBUGLOG(blobfs, "cache_insert_buffer failed\n");
2212 		return NULL;
2213 	}
2214 
2215 	file->last = last;
2216 
2217 	return last;
2218 }
2219 
2220 static void __check_sync_reqs(struct spdk_file *file);
2221 
/*
 * Completion callback for the blob metadata sync started in
 * __check_sync_reqs().  Records the persisted length, removes the sync
 * request from the pending list, completes the user's callback, and
 * re-checks for further satisfiable sync requests.
 */
static void
__file_cache_finish_sync(void *ctx, int bserrno)
{
	struct spdk_file *file;
	struct spdk_fs_request *sync_req = ctx;
	struct spdk_fs_cb_args *sync_args;

	sync_args = &sync_req->args;
	file = sync_args->file;
	pthread_spin_lock(&file->lock);
	/* The "length" xattr on disk now covers at least op.sync.length bytes. */
	file->length_xattr = sync_args->op.sync.length;
	assert(sync_args->op.sync.offset <= file->length_flushed);
	spdk_trace_record(TRACE_BLOBFS_XATTR_END, 0, sync_args->op.sync.offset,
			  0, file->trace_arg_name);
	BLOBFS_TRACE(file, "sync done offset=%jx\n", sync_args->op.sync.offset);
	TAILQ_REMOVE(&file->sync_requests, sync_req, args.op.sync.tailq);
	pthread_spin_unlock(&file->lock);

	/* Invoke the user's completion outside the spinlock. */
	sync_args->fn.file_op(sync_args->arg, bserrno);

	free_fs_request(sync_req);
	/* Another queued sync request may now be satisfiable. */
	__check_sync_reqs(file);
}
2245 
/*
 * Look for the first pending sync request whose data has been fully
 * flushed to the blob.  If one exists and is not already being handled,
 * persist the current flushed length in the blob's "length" xattr and
 * sync the blob metadata; completion continues in
 * __file_cache_finish_sync().
 */
static void
__check_sync_reqs(struct spdk_file *file)
{
	struct spdk_fs_request *sync_req;

	pthread_spin_lock(&file->lock);

	/* After this loop, sync_req is either the first satisfiable request
	 * (offset already flushed) or NULL if none qualifies. */
	TAILQ_FOREACH(sync_req, &file->sync_requests, args.op.sync.tailq) {
		if (sync_req->args.op.sync.offset <= file->length_flushed) {
			break;
		}
	}

	if (sync_req != NULL && !sync_req->args.op.sync.xattr_in_progress) {
		BLOBFS_TRACE(file, "set xattr length 0x%jx\n", file->length_flushed);
		sync_req->args.op.sync.xattr_in_progress = true;
		sync_req->args.op.sync.length = file->length_flushed;
		spdk_blob_set_xattr(file->blob, "length", &file->length_flushed,
				    sizeof(file->length_flushed));

		/* Drop the lock before issuing the metadata sync. */
		pthread_spin_unlock(&file->lock);
		spdk_trace_record(TRACE_BLOBFS_XATTR_START, 0, file->length_flushed,
				  0, file->trace_arg_name);
		spdk_blob_sync_md(file->blob, __file_cache_finish_sync, sync_req);
	} else {
		pthread_spin_unlock(&file->lock);
	}
}
2274 
/*
 * Completion callback for one cache-buffer write issued by __file_flush().
 * Advances the file's flushed length, wakes any sync requests that are
 * now satisfiable, and immediately attempts to flush the next buffer.
 * NOTE(review): bserrno is not examined here — verify whether write
 * errors are handled elsewhere in the flush path.
 */
static void
__file_flush_done(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *file = args->file;
	struct cache_buffer *next = args->op.flush.cache_buffer;

	BLOBFS_TRACE(file, "length=%jx\n", args->op.flush.length);

	pthread_spin_lock(&file->lock);
	next->in_progress = false;
	next->bytes_flushed += args->op.flush.length;
	file->length_flushed += args->op.flush.length;
	if (file->length_flushed > file->length) {
		file->length = file->length_flushed;
	}
	if (next->bytes_flushed == next->buf_size) {
		BLOBFS_TRACE(file, "write buffer fully flushed 0x%jx\n", file->length_flushed);
		/* This buffer is done; peek at the following one for the assert below. */
		next = tree_find_buffer(file->tree, file->length_flushed);
	}

	/*
	 * Assert that there is no cached data that extends past the end of the underlying
	 *  blob.
	 */
	assert(next == NULL || next->offset < __file_get_blob_size(file) ||
	       next->bytes_filled == 0);

	pthread_spin_unlock(&file->lock);

	/* Some queued sync request may now be satisfiable. */
	__check_sync_reqs(file);

	/* Reuse req to flush the next buffer (or clean up if nothing left). */
	__file_flush(req);
}
2310 
/*
 * Flush the next cache buffer (the one containing length_flushed) to the
 * blob.  Partially-filled buffers are only flushed when a sync request is
 * outstanding.  When there is nothing to flush, req is freed and any
 * pending sync requests are re-checked so metadata still gets updated.
 */
static void
__file_flush(void *ctx)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *file = args->file;
	struct cache_buffer *next;
	uint64_t offset, length, start_lba, num_lba;
	uint32_t lba_size;

	pthread_spin_lock(&file->lock);
	next = tree_find_buffer(file->tree, file->length_flushed);
	if (next == NULL || next->in_progress ||
	    ((next->bytes_filled < next->buf_size) && TAILQ_EMPTY(&file->sync_requests))) {
		/*
		 * There is either no data to flush, a flush I/O is already in
		 *  progress, or the next buffer is partially filled but there's no
		 *  outstanding request to sync it.
		 * So return immediately - if a flush I/O is in progress we will flush
		 *  more data after that is completed, or a partial buffer will get flushed
		 *  when it is either filled or the file is synced.
		 */
		free_fs_request(req);
		if (next == NULL) {
			/*
			 * For cases where a file's cache was evicted, and then the
			 *  file was later appended, we will write the data directly
			 *  to disk and bypass cache.  So just update length_flushed
			 *  here to reflect that all data was already written to disk.
			 */
			file->length_flushed = file->append_pos;
		}
		pthread_spin_unlock(&file->lock);
		if (next == NULL) {
			/*
			 * There is no data to flush, but we still need to check for any
			 *  outstanding sync requests to make sure metadata gets updated.
			 */
			__check_sync_reqs(file);
		}
		return;
	}

	/* Flush only the not-yet-flushed portion of this buffer. */
	offset = next->offset + next->bytes_flushed;
	length = next->bytes_filled - next->bytes_flushed;
	if (length == 0) {
		free_fs_request(req);
		pthread_spin_unlock(&file->lock);
		/*
		 * There is no data to flush, but we still need to check for any
		 *  outstanding sync requests to make sure metadata gets updated.
		 */
		__check_sync_reqs(file);
		return;
	}
	args->op.flush.length = length;
	args->op.flush.cache_buffer = next;

	__get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba);

	/* Mark in_progress while still holding the lock so concurrent flush
	 * attempts skip this buffer. */
	next->in_progress = true;
	BLOBFS_TRACE(file, "offset=0x%jx length=0x%jx page start=0x%jx num=0x%jx\n",
		     offset, length, start_lba, num_lba);
	pthread_spin_unlock(&file->lock);
	spdk_blob_io_write(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel,
			   next->buf + (start_lba * lba_size) - next->offset,
			   start_lba, num_lba, __file_flush_done, req);
}
2379 
/* Completion for the metadata sync issued after a blob resize; wakes the
 * thread blocked in the extend path (see __file_extend_blob()). */
static void
__file_extend_done(void *arg, int bserrno)
{
	__wake_caller((struct spdk_fs_cb_args *)arg, bserrno);
}
2387 
2388 static void
2389 __file_extend_resize_cb(void *_args, int bserrno)
2390 {
2391 	struct spdk_fs_cb_args *args = _args;
2392 	struct spdk_file *file = args->file;
2393 
2394 	if (bserrno) {
2395 		__wake_caller(args, bserrno);
2396 		return;
2397 	}
2398 
2399 	spdk_blob_sync_md(file->blob, __file_extend_done, args);
2400 }
2401 
2402 static void
2403 __file_extend_blob(void *_args)
2404 {
2405 	struct spdk_fs_cb_args *args = _args;
2406 	struct spdk_file *file = args->file;
2407 
2408 	spdk_blob_resize(file->blob, args->op.resize.num_clusters, __file_extend_resize_cb, args);
2409 }
2410 
2411 static void
2412 __rw_from_file_done(void *ctx, int bserrno)
2413 {
2414 	struct spdk_fs_request *req = ctx;
2415 
2416 	__wake_caller(&req->args, bserrno);
2417 	free_fs_request(req);
2418 }
2419 
2420 static void
2421 __rw_from_file(void *ctx)
2422 {
2423 	struct spdk_fs_request *req = ctx;
2424 	struct spdk_fs_cb_args *args = &req->args;
2425 	struct spdk_file *file = args->file;
2426 
2427 	if (args->op.rw.is_read) {
2428 		spdk_file_read_async(file, file->fs->sync_target.sync_io_channel, args->iovs[0].iov_base,
2429 				     args->op.rw.offset, (uint64_t)args->iovs[0].iov_len,
2430 				     __rw_from_file_done, req);
2431 	} else {
2432 		spdk_file_write_async(file, file->fs->sync_target.sync_io_channel, args->iovs[0].iov_base,
2433 				      args->op.rw.offset, (uint64_t)args->iovs[0].iov_len,
2434 				      __rw_from_file_done, req);
2435 	}
2436 }
2437 
2438 static int
2439 __send_rw_from_file(struct spdk_file *file, void *payload,
2440 		    uint64_t offset, uint64_t length, bool is_read,
2441 		    struct spdk_fs_channel *channel)
2442 {
2443 	struct spdk_fs_request *req;
2444 	struct spdk_fs_cb_args *args;
2445 
2446 	req = alloc_fs_request_with_iov(channel, 1);
2447 	if (req == NULL) {
2448 		sem_post(&channel->sem);
2449 		return -ENOMEM;
2450 	}
2451 
2452 	args = &req->args;
2453 	args->file = file;
2454 	args->sem = &channel->sem;
2455 	args->iovs[0].iov_base = payload;
2456 	args->iovs[0].iov_len = (size_t)length;
2457 	args->op.rw.offset = offset;
2458 	args->op.rw.is_read = is_read;
2459 	file->fs->send_request(__rw_from_file, req);
2460 	return 0;
2461 }
2462 
/*
 * Append-only synchronous write.  @offset must equal the file's current
 * append position (returns -EINVAL otherwise).  Data is copied into cache
 * buffers; when at least one buffer fills up, a flush request is sent to
 * the fs thread.  If the cache is unavailable (no tail buffer), the data
 * is written directly to the blob, blocking until completion.  Returns 0
 * on success or a negative errno.
 */
int
spdk_file_write(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx,
		void *payload, uint64_t offset, uint64_t length)
{
	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
	struct spdk_fs_request *flush_req;
	uint64_t rem_length, copy, blob_size, cluster_sz;
	uint32_t cache_buffers_filled = 0;
	uint8_t *cur_payload;
	struct cache_buffer *last;

	BLOBFS_TRACE_RW(file, "offset=%jx length=%jx\n", offset, length);

	if (length == 0) {
		return 0;
	}

	/* Only appends at the current end of file are supported. */
	if (offset != file->append_pos) {
		BLOBFS_TRACE(file, " error offset=%jx append_pos=%jx\n", offset, file->append_pos);
		return -EINVAL;
	}

	pthread_spin_lock(&file->lock);
	file->open_for_writing = true;

	/* Start a new cache buffer if we are exactly at a buffer boundary. */
	if ((file->last == NULL) && (file->append_pos % CACHE_BUFFER_SIZE == 0)) {
		cache_append_buffer(file);
	}

	if (file->last == NULL) {
		int rc;

		/* No cache buffer available (e.g. cache was evicted) - bypass the
		 * cache and write directly to the blob, blocking for completion. */
		file->append_pos += length;
		pthread_spin_unlock(&file->lock);
		rc = __send_rw_from_file(file, payload, offset, length, false, channel);
		sem_wait(&channel->sem);
		return rc;
	}

	blob_size = __file_get_blob_size(file);

	if ((offset + length) > blob_size) {
		/* Grow the blob first; blocks until the resize + md sync complete. */
		struct spdk_fs_cb_args extend_args = {};

		cluster_sz = file->fs->bs_opts.cluster_sz;
		extend_args.sem = &channel->sem;
		extend_args.op.resize.num_clusters = __bytes_to_clusters((offset + length), cluster_sz);
		extend_args.file = file;
		/* NOTE(review): %u here assumes num_clusters fits a 32-bit value —
		 * confirm the declared type of op.resize.num_clusters. */
		BLOBFS_TRACE(file, "start resize to %u clusters\n", extend_args.op.resize.num_clusters);
		pthread_spin_unlock(&file->lock);
		file->fs->send_request(__file_extend_blob, &extend_args);
		sem_wait(&channel->sem);
		if (extend_args.rc) {
			return extend_args.rc;
		}
	}

	/* Note: the lock was dropped during the resize above; the append-only
	 * contract means append_pos is still where we left it. */
	flush_req = alloc_fs_request(channel);
	if (flush_req == NULL) {
		pthread_spin_unlock(&file->lock);
		return -ENOMEM;
	}

	/* Copy the payload into cache buffers, allocating new ones as each fills. */
	last = file->last;
	rem_length = length;
	cur_payload = payload;
	while (rem_length > 0) {
		copy = last->buf_size - last->bytes_filled;
		if (copy > rem_length) {
			copy = rem_length;
		}
		BLOBFS_TRACE_RW(file, "  fill offset=%jx length=%jx\n", file->append_pos, copy);
		memcpy(&last->buf[last->bytes_filled], cur_payload, copy);
		file->append_pos += copy;
		if (file->length < file->append_pos) {
			file->length = file->append_pos;
		}
		cur_payload += copy;
		last->bytes_filled += copy;
		rem_length -= copy;
		if (last->bytes_filled == last->buf_size) {
			cache_buffers_filled++;
			last = cache_append_buffer(file);
			if (last == NULL) {
				BLOBFS_TRACE(file, "nomem\n");
				free_fs_request(flush_req);
				pthread_spin_unlock(&file->lock);
				return -ENOMEM;
			}
		}
	}

	pthread_spin_unlock(&file->lock);

	/* Only fully-filled buffers are flushed eagerly; a partial tail buffer
	 * is flushed later when it fills or the file is synced. */
	if (cache_buffers_filled == 0) {
		free_fs_request(flush_req);
		return 0;
	}

	flush_req->args.file = file;
	file->fs->send_request(__file_flush, flush_req);
	return 0;
}
2566 
2567 static void
2568 __readahead_done(void *ctx, int bserrno)
2569 {
2570 	struct spdk_fs_request *req = ctx;
2571 	struct spdk_fs_cb_args *args = &req->args;
2572 	struct cache_buffer *cache_buffer = args->op.readahead.cache_buffer;
2573 	struct spdk_file *file = args->file;
2574 
2575 	BLOBFS_TRACE(file, "offset=%jx\n", cache_buffer->offset);
2576 
2577 	pthread_spin_lock(&file->lock);
2578 	cache_buffer->bytes_filled = args->op.readahead.length;
2579 	cache_buffer->bytes_flushed = args->op.readahead.length;
2580 	cache_buffer->in_progress = false;
2581 	pthread_spin_unlock(&file->lock);
2582 
2583 	free_fs_request(req);
2584 }
2585 
2586 static void
2587 __readahead(void *ctx)
2588 {
2589 	struct spdk_fs_request *req = ctx;
2590 	struct spdk_fs_cb_args *args = &req->args;
2591 	struct spdk_file *file = args->file;
2592 	uint64_t offset, length, start_lba, num_lba;
2593 	uint32_t lba_size;
2594 
2595 	offset = args->op.readahead.offset;
2596 	length = args->op.readahead.length;
2597 	assert(length > 0);
2598 
2599 	__get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba);
2600 
2601 	BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n",
2602 		     offset, length, start_lba, num_lba);
2603 	spdk_blob_io_read(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel,
2604 			  args->op.readahead.cache_buffer->buf,
2605 			  start_lba, num_lba, __readahead_done, req);
2606 }
2607 
2608 static uint64_t
2609 __next_cache_buffer_offset(uint64_t offset)
2610 {
2611 	return (offset + CACHE_BUFFER_SIZE) & ~(CACHE_TREE_LEVEL_MASK(0));
2612 }
2613 
2614 static void
2615 check_readahead(struct spdk_file *file, uint64_t offset,
2616 		struct spdk_fs_channel *channel)
2617 {
2618 	struct spdk_fs_request *req;
2619 	struct spdk_fs_cb_args *args;
2620 
2621 	offset = __next_cache_buffer_offset(offset);
2622 	if (tree_find_buffer(file->tree, offset) != NULL || file->length <= offset) {
2623 		return;
2624 	}
2625 
2626 	req = alloc_fs_request(channel);
2627 	if (req == NULL) {
2628 		return;
2629 	}
2630 	args = &req->args;
2631 
2632 	BLOBFS_TRACE(file, "offset=%jx\n", offset);
2633 
2634 	args->file = file;
2635 	args->op.readahead.offset = offset;
2636 	args->op.readahead.cache_buffer = cache_insert_buffer(file, offset);
2637 	if (!args->op.readahead.cache_buffer) {
2638 		BLOBFS_TRACE(file, "Cannot allocate buf for offset=%jx\n", offset);
2639 		free_fs_request(req);
2640 		return;
2641 	}
2642 
2643 	args->op.readahead.cache_buffer->in_progress = true;
2644 	if (file->length < (offset + CACHE_BUFFER_SIZE)) {
2645 		args->op.readahead.length = file->length & (CACHE_BUFFER_SIZE - 1);
2646 	} else {
2647 		args->op.readahead.length = CACHE_BUFFER_SIZE;
2648 	}
2649 	file->fs->send_request(__readahead, req);
2650 }
2651 
/*
 * Synchronous read of up to @length bytes at @offset into @payload.
 * Reads are served from cache buffers when possible; cache misses are
 * forwarded to the fs thread and waited on at the end.  Sequential
 * access beyond CACHE_READAHEAD_THRESHOLD triggers background
 * readahead.  Returns the number of bytes read (possibly truncated at
 * the append position), or a negative errno.
 */
int64_t
spdk_file_read(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx,
	       void *payload, uint64_t offset, uint64_t length)
{
	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
	uint64_t final_offset, final_length;
	uint32_t sub_reads = 0;
	struct cache_buffer *buf;
	uint64_t read_len;
	int rc = 0;

	pthread_spin_lock(&file->lock);

	BLOBFS_TRACE_RW(file, "offset=%ju length=%ju\n", offset, length);

	file->open_for_writing = false;

	if (length == 0 || offset >= file->append_pos) {
		pthread_spin_unlock(&file->lock);
		return 0;
	}

	/* Clamp the read to the data actually appended so far. */
	if (offset + length > file->append_pos) {
		length = file->append_pos - offset;
	}

	/* Track sequential access to decide when readahead is worthwhile. */
	if (offset != file->next_seq_offset) {
		file->seq_byte_count = 0;
	}
	file->seq_byte_count += length;
	file->next_seq_offset = offset + length;
	if (file->seq_byte_count >= CACHE_READAHEAD_THRESHOLD) {
		check_readahead(file, offset, channel);
		check_readahead(file, offset + CACHE_BUFFER_SIZE, channel);
	}

	/* Walk the range one cache buffer at a time. */
	final_length = 0;
	final_offset = offset + length;
	while (offset < final_offset) {
		length = NEXT_CACHE_BUFFER_OFFSET(offset) - offset;
		if (length > (final_offset - offset)) {
			length = final_offset - offset;
		}

		buf = tree_find_filled_buffer(file->tree, offset);
		if (buf == NULL) {
			/* Cache miss: issue an async direct read (lock dropped while
			 * queuing); waited on via the semaphore after the loop. */
			pthread_spin_unlock(&file->lock);
			rc = __send_rw_from_file(file, payload, offset, length, true, channel);
			pthread_spin_lock(&file->lock);
			if (rc == 0) {
				sub_reads++;
			}
		} else {
			/* Cache hit: copy straight out of the buffer. */
			read_len = length;
			if ((offset + length) > (buf->offset + buf->bytes_filled)) {
				read_len = buf->offset + buf->bytes_filled - offset;
			}
			BLOBFS_TRACE(file, "read %p offset=%ju length=%ju\n", payload, offset, read_len);
			memcpy(payload, &buf->buf[offset - buf->offset], read_len);
			/* Once a buffer has been read through to its end, drop it. */
			if ((offset + read_len) % CACHE_BUFFER_SIZE == 0) {
				tree_remove_buffer(file->tree, buf);
				if (file->tree->present_mask == 0) {
					spdk_thread_send_msg(g_cache_pool_thread, _remove_file_from_cache_pool, file);
				}
			}
		}

		if (rc == 0) {
			final_length += length;
		} else {
			break;
		}
		payload += length;
		offset += length;
	}
	pthread_spin_unlock(&file->lock);
	/* Wait for all direct reads issued above to complete. */
	while (sub_reads > 0) {
		sem_wait(&channel->sem);
		sub_reads--;
	}
	if (rc == 0) {
		return final_length;
	} else {
		return rc;
	}
}
2738 
/*
 * Common sync implementation.  If all appended data has already been
 * recorded in the "length" xattr, complete immediately.  Otherwise queue
 * a sync request for the current append position and kick a flush; the
 * sync completes (via __check_sync_reqs) once the flush reaches the
 * requested offset and the metadata is persisted.
 */
static void
_file_sync(struct spdk_file *file, struct spdk_fs_channel *channel,
	   spdk_file_op_complete cb_fn, void *cb_arg)
{
	struct spdk_fs_request *sync_req;
	struct spdk_fs_request *flush_req;
	struct spdk_fs_cb_args *sync_args;
	struct spdk_fs_cb_args *flush_args;

	BLOBFS_TRACE(file, "offset=%jx\n", file->append_pos);

	pthread_spin_lock(&file->lock);
	if (file->append_pos <= file->length_xattr) {
		/* Metadata already covers every appended byte. */
		BLOBFS_TRACE(file, "done - file already synced\n");
		pthread_spin_unlock(&file->lock);
		cb_fn(cb_arg, 0);
		return;
	}

	sync_req = alloc_fs_request(channel);
	if (!sync_req) {
		SPDK_ERRLOG("Cannot allocate sync req for file=%s\n", file->name);
		pthread_spin_unlock(&file->lock);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}
	sync_args = &sync_req->args;

	flush_req = alloc_fs_request(channel);
	if (!flush_req) {
		SPDK_ERRLOG("Cannot allocate flush req for file=%s\n", file->name);
		free_fs_request(sync_req);
		pthread_spin_unlock(&file->lock);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}
	flush_args = &flush_req->args;

	/* Queue the sync request while still holding the lock so its offset
	 * matches the append position the flush will cover. */
	sync_args->file = file;
	sync_args->fn.file_op = cb_fn;
	sync_args->arg = cb_arg;
	sync_args->op.sync.offset = file->append_pos;
	sync_args->op.sync.xattr_in_progress = false;
	TAILQ_INSERT_TAIL(&file->sync_requests, sync_req, args.op.sync.tailq);
	pthread_spin_unlock(&file->lock);

	flush_args->file = file;
	channel->send_request(__file_flush, flush_req);
}
2788 
2789 int
2790 spdk_file_sync(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx)
2791 {
2792 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
2793 	struct spdk_fs_cb_args args = {};
2794 
2795 	args.sem = &channel->sem;
2796 	_file_sync(file, channel, __wake_caller, &args);
2797 	sem_wait(&channel->sem);
2798 
2799 	return args.rc;
2800 }
2801 
2802 void
2803 spdk_file_sync_async(struct spdk_file *file, struct spdk_io_channel *_channel,
2804 		     spdk_file_op_complete cb_fn, void *cb_arg)
2805 {
2806 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
2807 
2808 	_file_sync(file, channel, cb_fn, cb_arg);
2809 }
2810 
2811 void
2812 spdk_file_set_priority(struct spdk_file *file, uint32_t priority)
2813 {
2814 	BLOBFS_TRACE(file, "priority=%u\n", priority);
2815 	file->priority = priority;
2816 
2817 }
2818 
2819 /*
2820  * Close routines
2821  */
2822 
/*
 * Final step of an async close (after the blob is closed, or directly on
 * error/early-out).  If the file was deleted while open, start the
 * deferred blob deletion instead - in that case ownership of req passes
 * to blob_delete_cb and it is not freed here.
 */
static void
__file_close_async_done(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *file = args->file;

	spdk_trace_record(TRACE_BLOBFS_CLOSE, 0, 0, 0, file->trace_arg_name);

	if (file->is_deleted) {
		spdk_fs_delete_file_async(file->fs, file->name, blob_delete_cb, ctx);
		return;
	}

	args->fn.file_op(args->arg, bserrno);
	free_fs_request(req);
}
2840 
/*
 * Drop one reference to the file.  Closing an unopened file completes
 * with -EBADF; while other references remain the close completes
 * immediately; the last reference actually closes the underlying blob.
 */
static void
__file_close_async(struct spdk_file *file, struct spdk_fs_request *req)
{
	struct spdk_blob *blob;

	pthread_spin_lock(&file->lock);
	if (file->ref_count == 0) {
		pthread_spin_unlock(&file->lock);
		__file_close_async_done(req, -EBADF);
		return;
	}

	file->ref_count--;
	if (file->ref_count > 0) {
		/* Other openers remain - nothing more to do for this close. */
		pthread_spin_unlock(&file->lock);
		req->args.fn.file_op(req->args.arg, 0);
		free_fs_request(req);
		return;
	}

	pthread_spin_unlock(&file->lock);

	/* Last reference: detach and close the blob; completion continues in
	 * __file_close_async_done(). */
	blob = file->blob;
	file->blob = NULL;
	spdk_blob_close(blob, __file_close_async_done, req);
}
2867 
2868 static void
2869 __file_close_async__sync_done(void *arg, int fserrno)
2870 {
2871 	struct spdk_fs_request *req = arg;
2872 	struct spdk_fs_cb_args *args = &req->args;
2873 
2874 	__file_close_async(args->file, req);
2875 }
2876 
2877 void
2878 spdk_file_close_async(struct spdk_file *file, spdk_file_op_complete cb_fn, void *cb_arg)
2879 {
2880 	struct spdk_fs_request *req;
2881 	struct spdk_fs_cb_args *args;
2882 
2883 	req = alloc_fs_request(file->fs->md_target.md_fs_channel);
2884 	if (req == NULL) {
2885 		SPDK_ERRLOG("Cannot allocate close async req for file=%s\n", file->name);
2886 		cb_fn(cb_arg, -ENOMEM);
2887 		return;
2888 	}
2889 
2890 	args = &req->args;
2891 	args->file = file;
2892 	args->fn.file_op = cb_fn;
2893 	args->arg = cb_arg;
2894 
2895 	spdk_file_sync_async(file, file->fs->md_target.md_io_channel, __file_close_async__sync_done, req);
2896 }
2897 
2898 static void
2899 __file_close(void *arg)
2900 {
2901 	struct spdk_fs_request *req = arg;
2902 	struct spdk_fs_cb_args *args = &req->args;
2903 	struct spdk_file *file = args->file;
2904 
2905 	__file_close_async(file, req);
2906 }
2907 
/*
 * Blocking close: sync the file, then send a close request to the fs
 * thread and wait for it to finish.  Returns the close result code, or
 * -ENOMEM if a request could not be allocated.
 */
int
spdk_file_close(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx)
{
	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	req = alloc_fs_request(channel);
	if (req == NULL) {
		SPDK_ERRLOG("Cannot allocate close req for file=%s\n", file->name);
		return -ENOMEM;
	}

	args = &req->args;

	/* NOTE(review): the sync result is discarded; the close proceeds even
	 * if the sync failed - confirm this is intended. */
	spdk_file_sync(file, ctx);
	BLOBFS_TRACE(file, "name=%s\n", file->name);
	args->file = file;
	args->sem = &channel->sem;
	args->fn.file_op = __wake_caller;
	args->arg = args;
	channel->send_request(__file_close, req);
	sem_wait(&channel->sem);

	return args->rc;
}
2934 
2935 int
2936 spdk_file_get_id(struct spdk_file *file, void *id, size_t size)
2937 {
2938 	if (size < sizeof(spdk_blob_id)) {
2939 		return -EINVAL;
2940 	}
2941 
2942 	memcpy(id, &file->blobid, sizeof(spdk_blob_id));
2943 
2944 	return sizeof(spdk_blob_id);
2945 }
2946 
2947 static void
2948 _file_free(void *ctx)
2949 {
2950 	struct spdk_file *file = ctx;
2951 
2952 	TAILQ_REMOVE(&g_caches, file, cache_tailq);
2953 
2954 	free(file->name);
2955 	free(file->tree);
2956 	free(file);
2957 }
2958 
/*
 * Free a file object.  If the file owns no cache buffers it can be freed
 * directly; otherwise its buffers are released and the final free is
 * deferred to the cache pool thread (via _file_free) so the file can be
 * unlinked from g_caches on the thread that owns that list.
 */
static void
file_free(struct spdk_file *file)
{
	BLOBFS_TRACE(file, "free=%s\n", file->name);
	pthread_spin_lock(&file->lock);
	if (file->tree->present_mask == 0) {
		/* No cache buffers - the file is not on g_caches; free in place. */
		pthread_spin_unlock(&file->lock);
		free(file->name);
		free(file->tree);
		free(file);
		return;
	}

	tree_free_buffers(file->tree);
	assert(file->tree->present_mask == 0);
	/* Hand the final free to the cache pool thread, which owns g_caches. */
	spdk_thread_send_msg(g_cache_pool_thread, _file_free, file);
	pthread_spin_unlock(&file->lock);
}
2977 
2978 SPDK_LOG_REGISTER_COMPONENT(blobfs)
2979 SPDK_LOG_REGISTER_COMPONENT(blobfs_rw)
2980