xref: /spdk/lib/blobfs/blobfs.c (revision 03e3fc4f5835983a4e6602b4e770922e798ce263)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 
36 #include "spdk/blobfs.h"
37 #include "spdk/conf.h"
38 #include "tree.h"
39 
40 #include "spdk/queue.h"
41 #include "spdk/thread.h"
42 #include "spdk/assert.h"
43 #include "spdk/env.h"
44 #include "spdk/util.h"
45 #include "spdk_internal/log.h"
46 #include "spdk/trace.h"
47 
48 #define BLOBFS_TRACE(file, str, args...) \
49 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s " str, file->name, ##args)
50 
51 #define BLOBFS_TRACE_RW(file, str, args...) \
52 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS_RW, "file=%s " str, file->name, ##args)
53 
54 #define BLOBFS_DEFAULT_CACHE_SIZE (4ULL * 1024 * 1024 * 1024)
55 #define SPDK_BLOBFS_DEFAULT_OPTS_CLUSTER_SZ (1024 * 1024)
56 
57 #define SPDK_BLOBFS_SIGNATURE	"BLOBFS"
58 
59 static uint64_t g_fs_cache_size = BLOBFS_DEFAULT_CACHE_SIZE;
60 static struct spdk_mempool *g_cache_pool;
61 static TAILQ_HEAD(, spdk_file) g_caches;
62 static struct spdk_poller *g_cache_pool_mgmt_poller;
63 static struct spdk_thread *g_cache_pool_thread;
64 #define BLOBFS_CACHE_POOL_POLL_PERIOD_IN_US 1000ULL
65 static int g_fs_count = 0;
66 static pthread_mutex_t g_cache_init_lock = PTHREAD_MUTEX_INITIALIZER;
67 
68 #define TRACE_GROUP_BLOBFS	0x7
69 #define TRACE_BLOBFS_XATTR_START	SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x0)
70 #define TRACE_BLOBFS_XATTR_END		SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x1)
71 #define TRACE_BLOBFS_OPEN		SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x2)
72 #define TRACE_BLOBFS_CLOSE		SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x3)
73 #define TRACE_BLOBFS_DELETE_START	SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x4)
74 #define TRACE_BLOBFS_DELETE_DONE	SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x5)
75 
76 SPDK_TRACE_REGISTER_FN(blobfs_trace, "blobfs", TRACE_GROUP_BLOBFS)
77 {
78 	spdk_trace_register_description("BLOBFS_XATTR_START",
79 					TRACE_BLOBFS_XATTR_START,
80 					OWNER_NONE, OBJECT_NONE, 0,
81 					SPDK_TRACE_ARG_TYPE_STR,
82 					"file:    ");
83 	spdk_trace_register_description("BLOBFS_XATTR_END",
84 					TRACE_BLOBFS_XATTR_END,
85 					OWNER_NONE, OBJECT_NONE, 0,
86 					SPDK_TRACE_ARG_TYPE_STR,
87 					"file:    ");
88 	spdk_trace_register_description("BLOBFS_OPEN",
89 					TRACE_BLOBFS_OPEN,
90 					OWNER_NONE, OBJECT_NONE, 0,
91 					SPDK_TRACE_ARG_TYPE_STR,
92 					"file:    ");
93 	spdk_trace_register_description("BLOBFS_CLOSE",
94 					TRACE_BLOBFS_CLOSE,
95 					OWNER_NONE, OBJECT_NONE, 0,
96 					SPDK_TRACE_ARG_TYPE_STR,
97 					"file:    ");
98 	spdk_trace_register_description("BLOBFS_DELETE_START",
99 					TRACE_BLOBFS_DELETE_START,
100 					OWNER_NONE, OBJECT_NONE, 0,
101 					SPDK_TRACE_ARG_TYPE_STR,
102 					"file:    ");
103 	spdk_trace_register_description("BLOBFS_DELETE_DONE",
104 					TRACE_BLOBFS_DELETE_DONE,
105 					OWNER_NONE, OBJECT_NONE, 0,
106 					SPDK_TRACE_ARG_TYPE_STR,
107 					"file:    ");
108 }
109 
110 void
111 cache_buffer_free(struct cache_buffer *cache_buffer)
112 {
113 	spdk_mempool_put(g_cache_pool, cache_buffer->buf);
114 	free(cache_buffer);
115 }
116 
117 #define CACHE_READAHEAD_THRESHOLD	(128 * 1024)
118 
/*
 * In-memory state for one blobfs file.  Linked on the owning filesystem's
 * file list via tailq; also linked on the global cache list via cache_tailq
 * (managed by the cache pool thread).
 */
struct spdk_file {
	struct spdk_filesystem	*fs;		/* owning filesystem */
	struct spdk_blob	*blob;		/* backing blob handle */
	char			*name;		/* heap-allocated file name */
	/* First bytes of name packed into an integer for tracepoint args;
	 * see _file_build_trace_arg_name(). */
	uint64_t		trace_arg_name;
	uint64_t		length;
	bool                    is_deleted;
	bool			open_for_writing;
	uint64_t		length_flushed;
	/* Length recorded in the blob's "length" xattr (see iter_cb). */
	uint64_t		length_xattr;
	uint64_t		append_pos;
	/* seq_byte_count/next_seq_offset: presumably sequential-read tracking
	 * for readahead (see CACHE_READAHEAD_THRESHOLD) — confirm in I/O paths. */
	uint64_t		seq_byte_count;
	uint64_t		next_seq_offset;
	uint32_t		priority;
	TAILQ_ENTRY(spdk_file)	tailq;		/* entry on fs->files */
	spdk_blob_id		blobid;
	uint32_t		ref_count;
	pthread_spinlock_t	lock;
	struct cache_buffer	*last;		/* most recently touched cache buffer */
	struct cache_tree	*tree;		/* cache buffers indexed by offset */
	TAILQ_HEAD(open_requests_head, spdk_fs_request) open_requests;
	TAILQ_HEAD(sync_requests_head, spdk_fs_request) sync_requests;
	TAILQ_ENTRY(spdk_file)	cache_tailq;	/* entry on global g_caches list */
};
143 
/* A blob found at load time with the "is_deleted" xattr set; queued so the
 * load path can finish deleting it before completing (see iter_cb and
 * _handle_deleted_files). */
struct spdk_deleted_file {
	spdk_blob_id	id;
	TAILQ_ENTRY(spdk_deleted_file)	tailq;
};
148 
/*
 * One mounted blobfs instance on top of a blobstore.  Three io_devices are
 * registered per filesystem (see fs_alloc): md for metadata operations,
 * sync for synchronous-API requests, and io for data-path channels.
 */
struct spdk_filesystem {
	struct spdk_blob_store	*bs;		/* underlying blobstore */
	TAILQ_HEAD(, spdk_file)	files;		/* all known files */
	struct spdk_bs_opts	bs_opts;
	struct spdk_bs_dev	*bdev;		/* block device backing the blobstore */
	fs_send_request_fn	send_request;	/* routes work to the md thread */

	struct {
		uint32_t		max_ops;
		struct spdk_io_channel	*sync_io_channel;
		struct spdk_fs_channel	*sync_fs_channel;
	} sync_target;

	struct {
		uint32_t		max_ops;
		struct spdk_io_channel	*md_io_channel;
		struct spdk_fs_channel	*md_fs_channel;
	} md_target;

	struct {
		uint32_t		max_ops;
	} io_target;
};
172 
/*
 * Per-request context embedded in every spdk_fs_request.  fn/arg hold the
 * user completion callback; sem and rc are used by the synchronous wrappers
 * (posted via __wake_caller).  The op union carries operation-specific
 * state; the active arm is implied by the code path that allocated the
 * request.
 */
struct spdk_fs_cb_args {
	union {
		spdk_fs_op_with_handle_complete		fs_op_with_handle;
		spdk_fs_op_complete			fs_op;
		spdk_file_op_with_handle_complete	file_op_with_handle;
		spdk_file_op_complete			file_op;
		spdk_file_stat_op_complete		stat_op;
	} fn;
	void *arg;	/* caller context passed back to fn */
	sem_t *sem;	/* semaphore the synchronous caller blocks on */
	struct spdk_filesystem *fs;
	struct spdk_file *file;
	int rc;		/* result reported back to synchronous callers */
	/* iovs points at the single inline iov below unless iovcnt > 1, in
	 * which case it is a separately allocated array (see
	 * alloc_fs_request_with_iov / free_fs_request). */
	struct iovec *iovs;
	uint32_t iovcnt;
	struct iovec iov;
	union {
		struct {
			/* blobs marked deleted, discovered during load */
			TAILQ_HEAD(, spdk_deleted_file)	deleted_files;
		} fs_load;
		struct {
			uint64_t	length;
		} truncate;
		struct {
			struct spdk_io_channel	*channel;
			void		*pin_buf;
			int		is_read;
			off_t		offset;
			size_t		length;
			uint64_t	start_lba;
			uint64_t	num_lba;
			uint32_t	blocklen;
		} rw;
		struct {
			const char	*old_name;
			const char	*new_name;
		} rename;
		struct {
			struct cache_buffer	*cache_buffer;
			uint64_t		length;
		} flush;
		struct {
			struct cache_buffer	*cache_buffer;
			uint64_t		length;
			uint64_t		offset;
		} readahead;
		struct {
			/* offset of the file when the sync request was made */
			uint64_t			offset;
			TAILQ_ENTRY(spdk_fs_request)	tailq;
			bool				xattr_in_progress;
			/* length written to the xattr for this file - this should
			 * always be the same as the offset if only one thread is
			 * writing to the file, but could differ if multiple threads
			 * are appending
			 */
			uint64_t			length;
		} sync;
		struct {
			uint32_t			num_clusters;
		} resize;
		struct {
			const char	*name;
			uint32_t	flags;
			TAILQ_ENTRY(spdk_fs_request)	tailq;
		} open;
		struct {
			const char		*name;
			struct spdk_blob	*blob;
		} create;
		struct {
			const char	*name;
		} delete;
		struct {
			const char	*name;
		} stat;
	} op;
};
251 
252 static void file_free(struct spdk_file *file);
253 static void fs_io_device_unregister(struct spdk_filesystem *fs);
254 static void fs_free_io_channels(struct spdk_filesystem *fs);
255 
/*
 * Initialize a blobfs options structure to defaults.  The only option today
 * is the blobstore cluster size (1 MiB by default).
 */
void
spdk_fs_opts_init(struct spdk_blobfs_opts *opts)
{
	opts->cluster_sz = SPDK_BLOBFS_DEFAULT_OPTS_CLUSTER_SZ;
}
261 
262 static int _blobfs_cache_pool_reclaim(void *arg);
263 
264 static bool
265 blobfs_cache_pool_need_reclaim(void)
266 {
267 	size_t count;
268 
269 	count = spdk_mempool_count(g_cache_pool);
270 	/* We define a aggressive policy here as the requirements from db_bench are batched, so start the poller
271 	 *  when the number of available cache buffer is less than 1/5 of total buffers.
272 	 */
273 	if (count > (size_t)g_fs_cache_size / CACHE_BUFFER_SIZE / 5) {
274 		return false;
275 	}
276 
277 	return true;
278 }
279 
/*
 * Runs on the dedicated cache-pool thread (see initialize_global_cache):
 * creates the global cache buffer mempool and starts the periodic reclaim
 * poller.  Called only when the first filesystem is created.
 */
static void
__start_cache_pool_mgmt(void *ctx)
{
	assert(g_cache_pool == NULL);

	g_cache_pool = spdk_mempool_create("spdk_fs_cache",
					   g_fs_cache_size / CACHE_BUFFER_SIZE,
					   CACHE_BUFFER_SIZE,
					   SPDK_MEMPOOL_DEFAULT_CACHE_SIZE,
					   SPDK_ENV_SOCKET_ID_ANY);
	if (!g_cache_pool) {
		/* No recovery path here: blobfs cannot operate without the
		 * cache pool, so this is treated as fatal. */
		SPDK_ERRLOG("Create mempool failed, you may "
			    "increase the memory and try again\n");
		assert(false);
	}
	TAILQ_INIT(&g_caches);

	assert(g_cache_pool_mgmt_poller == NULL);
	g_cache_pool_mgmt_poller = SPDK_POLLER_REGISTER(_blobfs_cache_pool_reclaim, NULL,
				   BLOBFS_CACHE_POOL_POLL_PERIOD_IN_US);
}
301 
/*
 * Counterpart of __start_cache_pool_mgmt, run on the cache-pool thread when
 * the last filesystem goes away: stops the reclaim poller, frees the global
 * mempool, and exits the thread.
 */
static void
__stop_cache_pool_mgmt(void *ctx)
{
	spdk_poller_unregister(&g_cache_pool_mgmt_poller);

	assert(g_cache_pool != NULL);
	/* Every cache buffer must have been returned by now; anything still
	 * outstanding would be leaked by the free below. */
	assert(spdk_mempool_count(g_cache_pool) == g_fs_cache_size / CACHE_BUFFER_SIZE);
	spdk_mempool_free(g_cache_pool);
	g_cache_pool = NULL;

	spdk_thread_exit(g_cache_pool_thread);
}
314 
/*
 * Reference-count the global cache: the first filesystem to come up spawns
 * the dedicated cache-pool management thread and kicks off pool creation
 * asynchronously.  Guarded by g_cache_init_lock since filesystems can be
 * created from multiple threads.
 */
static void
initialize_global_cache(void)
{
	pthread_mutex_lock(&g_cache_init_lock);
	if (g_fs_count == 0) {
		g_cache_pool_thread = spdk_thread_create("cache_pool_mgmt", NULL);
		assert(g_cache_pool_thread != NULL);
		spdk_thread_send_msg(g_cache_pool_thread, __start_cache_pool_mgmt, NULL);
	}
	g_fs_count++;
	pthread_mutex_unlock(&g_cache_init_lock);
}
327 
/*
 * Drop one reference on the global cache; when the last filesystem unloads,
 * ask the cache-pool thread to tear itself down (see __stop_cache_pool_mgmt).
 */
static void
free_global_cache(void)
{
	pthread_mutex_lock(&g_cache_init_lock);
	g_fs_count--;
	if (g_fs_count == 0) {
		spdk_thread_send_msg(g_cache_pool_thread, __stop_cache_pool_mgmt, NULL);
	}
	pthread_mutex_unlock(&g_cache_init_lock);
}
338 
339 static uint64_t
340 __file_get_blob_size(struct spdk_file *file)
341 {
342 	uint64_t cluster_sz;
343 
344 	cluster_sz = file->fs->bs_opts.cluster_sz;
345 	return cluster_sz * spdk_blob_get_num_clusters(file->blob);
346 }
347 
/* A pooled request object; preallocated per channel in fs_channel_create and
 * recycled through channel->reqs. */
struct spdk_fs_request {
	struct spdk_fs_cb_args		args;	/* must stay first: some callbacks cast args* back to request* */
	TAILQ_ENTRY(spdk_fs_request)	link;	/* entry on the channel free list */
	struct spdk_fs_channel		*channel;
};
353 
/* Per-channel state: a fixed pool of requests plus the blobstore channel
 * used for I/O.  When sync is true the channel is shared across threads and
 * the request free list is protected by lock. */
struct spdk_fs_channel {
	struct spdk_fs_request		*req_mem;	/* backing array for reqs */
	TAILQ_HEAD(, spdk_fs_request)	reqs;		/* free request list */
	sem_t				sem;		/* wakes blocking sync callers */
	struct spdk_filesystem		*fs;
	struct spdk_io_channel		*bs_channel;
	fs_send_request_fn		send_request;
	bool				sync;		/* take lock around req alloc/free */
	uint32_t			outstanding_reqs;
	pthread_spinlock_t		lock;
};
365 
/* For now, this is effectively an alias. But eventually we'll shift
 * some data members over. */
/* Per-thread context for the synchronous API; callers' spdk_fs_thread_ctx
 * pointers are cast directly to spdk_fs_channel (e.g. spdk_fs_file_stat). */
struct spdk_fs_thread_ctx {
	struct spdk_fs_channel	ch;
};
371 
372 static struct spdk_fs_request *
373 alloc_fs_request_with_iov(struct spdk_fs_channel *channel, uint32_t iovcnt)
374 {
375 	struct spdk_fs_request *req;
376 	struct iovec *iovs = NULL;
377 
378 	if (iovcnt > 1) {
379 		iovs = calloc(iovcnt, sizeof(struct iovec));
380 		if (!iovs) {
381 			return NULL;
382 		}
383 	}
384 
385 	if (channel->sync) {
386 		pthread_spin_lock(&channel->lock);
387 	}
388 
389 	req = TAILQ_FIRST(&channel->reqs);
390 	if (req) {
391 		channel->outstanding_reqs++;
392 		TAILQ_REMOVE(&channel->reqs, req, link);
393 	}
394 
395 	if (channel->sync) {
396 		pthread_spin_unlock(&channel->lock);
397 	}
398 
399 	if (req == NULL) {
400 		SPDK_ERRLOG("Cannot allocate req on spdk_fs_channel =%p\n", channel);
401 		free(iovs);
402 		return NULL;
403 	}
404 	memset(req, 0, sizeof(*req));
405 	req->channel = channel;
406 	if (iovcnt > 1) {
407 		req->args.iovs = iovs;
408 	} else {
409 		req->args.iovs = &req->args.iov;
410 	}
411 	req->args.iovcnt = iovcnt;
412 
413 	return req;
414 }
415 
/* Convenience wrapper: allocate a request with no separate iovec array. */
static struct spdk_fs_request *
alloc_fs_request(struct spdk_fs_channel *channel)
{
	return alloc_fs_request_with_iov(channel, 0);
}
421 
422 static void
423 free_fs_request(struct spdk_fs_request *req)
424 {
425 	struct spdk_fs_channel *channel = req->channel;
426 
427 	if (req->args.iovcnt > 1) {
428 		free(req->args.iovs);
429 	}
430 
431 	if (channel->sync) {
432 		pthread_spin_lock(&channel->lock);
433 	}
434 
435 	TAILQ_INSERT_HEAD(&req->channel->reqs, req, link);
436 	channel->outstanding_reqs--;
437 
438 	if (channel->sync) {
439 		pthread_spin_unlock(&channel->lock);
440 	}
441 }
442 
443 static int
444 fs_channel_create(struct spdk_filesystem *fs, struct spdk_fs_channel *channel,
445 		  uint32_t max_ops)
446 {
447 	uint32_t i;
448 
449 	channel->req_mem = calloc(max_ops, sizeof(struct spdk_fs_request));
450 	if (!channel->req_mem) {
451 		return -1;
452 	}
453 
454 	channel->outstanding_reqs = 0;
455 	TAILQ_INIT(&channel->reqs);
456 	sem_init(&channel->sem, 0, 0);
457 
458 	for (i = 0; i < max_ops; i++) {
459 		TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link);
460 	}
461 
462 	channel->fs = fs;
463 
464 	return 0;
465 }
466 
467 static int
468 fs_md_channel_create(void *io_device, void *ctx_buf)
469 {
470 	struct spdk_filesystem		*fs;
471 	struct spdk_fs_channel		*channel = ctx_buf;
472 
473 	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, md_target);
474 
475 	return fs_channel_create(fs, channel, fs->md_target.max_ops);
476 }
477 
478 static int
479 fs_sync_channel_create(void *io_device, void *ctx_buf)
480 {
481 	struct spdk_filesystem		*fs;
482 	struct spdk_fs_channel		*channel = ctx_buf;
483 
484 	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, sync_target);
485 
486 	return fs_channel_create(fs, channel, fs->sync_target.max_ops);
487 }
488 
489 static int
490 fs_io_channel_create(void *io_device, void *ctx_buf)
491 {
492 	struct spdk_filesystem		*fs;
493 	struct spdk_fs_channel		*channel = ctx_buf;
494 
495 	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, io_target);
496 
497 	return fs_channel_create(fs, channel, fs->io_target.max_ops);
498 }
499 
500 static void
501 fs_channel_destroy(void *io_device, void *ctx_buf)
502 {
503 	struct spdk_fs_channel *channel = ctx_buf;
504 
505 	if (channel->outstanding_reqs > 0) {
506 		SPDK_ERRLOG("channel freed with %" PRIu32 " outstanding requests!\n",
507 			    channel->outstanding_reqs);
508 	}
509 
510 	free(channel->req_mem);
511 	if (channel->bs_channel != NULL) {
512 		spdk_bs_free_io_channel(channel->bs_channel);
513 	}
514 }
515 
/* send_request implementation for md/sync channels: the caller is already on
 * the right thread, so just invoke the function inline. */
static void
__send_request_direct(fs_request_fn fn, void *arg)
{
	fn(arg);
}
521 
522 static void
523 common_fs_bs_init(struct spdk_filesystem *fs, struct spdk_blob_store *bs)
524 {
525 	fs->bs = bs;
526 	fs->bs_opts.cluster_sz = spdk_bs_get_cluster_size(bs);
527 	fs->md_target.md_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
528 	fs->md_target.md_fs_channel->send_request = __send_request_direct;
529 	fs->sync_target.sync_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
530 	fs->sync_target.sync_fs_channel->send_request = __send_request_direct;
531 
532 	initialize_global_cache();
533 }
534 
/* Completion callback for spdk_bs_init() issued from spdk_fs_init(). */
static void
init_cb(void *ctx, struct spdk_blob_store *bs, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_filesystem *fs = args->fs;

	if (bserrno == 0) {
		common_fs_bs_init(fs, bs);
	} else {
		/* NOTE(review): on failure fs is freed directly, without the
		 * fs_free_io_channels()/fs_io_device_unregister() teardown used
		 * by other error paths (e.g. load_cb) — verify the io devices
		 * registered in fs_alloc() are not leaked here. */
		free(fs);
		fs = NULL;
	}

	/* fs is NULL on error, matching the callback's contract. */
	args->fn.fs_op_with_handle(args->arg, fs, bserrno);
	free_fs_request(req);
}
552 
553 static void
554 fs_conf_parse(void)
555 {
556 	struct spdk_conf_section *sp;
557 
558 	sp = spdk_conf_find_section(NULL, "Blobfs");
559 	if (sp == NULL) {
560 		g_fs_cache_buffer_shift = CACHE_BUFFER_SHIFT_DEFAULT;
561 		return;
562 	}
563 
564 	g_fs_cache_buffer_shift = spdk_conf_section_get_intval(sp, "CacheBufferShift");
565 	if (g_fs_cache_buffer_shift <= 0) {
566 		g_fs_cache_buffer_shift = CACHE_BUFFER_SHIFT_DEFAULT;
567 	}
568 }
569 
570 static struct spdk_filesystem *
571 fs_alloc(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn)
572 {
573 	struct spdk_filesystem *fs;
574 
575 	fs = calloc(1, sizeof(*fs));
576 	if (fs == NULL) {
577 		return NULL;
578 	}
579 
580 	fs->bdev = dev;
581 	fs->send_request = send_request_fn;
582 	TAILQ_INIT(&fs->files);
583 
584 	fs->md_target.max_ops = 512;
585 	spdk_io_device_register(&fs->md_target, fs_md_channel_create, fs_channel_destroy,
586 				sizeof(struct spdk_fs_channel), "blobfs_md");
587 	fs->md_target.md_io_channel = spdk_get_io_channel(&fs->md_target);
588 	fs->md_target.md_fs_channel = spdk_io_channel_get_ctx(fs->md_target.md_io_channel);
589 
590 	fs->sync_target.max_ops = 512;
591 	spdk_io_device_register(&fs->sync_target, fs_sync_channel_create, fs_channel_destroy,
592 				sizeof(struct spdk_fs_channel), "blobfs_sync");
593 	fs->sync_target.sync_io_channel = spdk_get_io_channel(&fs->sync_target);
594 	fs->sync_target.sync_fs_channel = spdk_io_channel_get_ctx(fs->sync_target.sync_io_channel);
595 
596 	fs->io_target.max_ops = 512;
597 	spdk_io_device_register(&fs->io_target, fs_io_channel_create, fs_channel_destroy,
598 				sizeof(struct spdk_fs_channel), "blobfs_io");
599 
600 	return fs;
601 }
602 
/*
 * Complete a synchronous operation: record the result, then post the
 * caller's semaphore.  rc must be written before sem_post() — the blocked
 * thread reads args->rc as soon as it wakes.
 */
static void
__wake_caller(void *arg, int fserrno)
{
	struct spdk_fs_cb_args *args = arg;

	args->rc = fserrno;
	sem_post(args->sem);
}
611 
/*
 * Create a new blobfs on dev.  Allocates the filesystem and its io devices,
 * then initializes a fresh blobstore tagged with the blobfs signature;
 * init_cb() delivers the result to cb_fn.  opt may be NULL, in which case
 * the default cluster size is used.
 */
void
spdk_fs_init(struct spdk_bs_dev *dev, struct spdk_blobfs_opts *opt,
	     fs_send_request_fn send_request_fn,
	     spdk_fs_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_filesystem *fs;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;
	struct spdk_bs_opts opts = {};

	fs = fs_alloc(dev, send_request_fn);
	if (fs == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	fs_conf_parse();

	req = alloc_fs_request(fs->md_target.md_fs_channel);
	if (req == NULL) {
		/* Tear down everything fs_alloc() set up. */
		fs_free_io_channels(fs);
		fs_io_device_unregister(fs);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	args = &req->args;
	args->fn.fs_op_with_handle = cb_fn;
	args->arg = cb_arg;
	args->fs = fs;

	spdk_bs_opts_init(&opts);
	/* Stamp the blobstore so spdk_fs_load() can recognize it as blobfs. */
	snprintf(opts.bstype.bstype, sizeof(opts.bstype.bstype), SPDK_BLOBFS_SIGNATURE);
	if (opt) {
		opts.cluster_sz = opt->cluster_sz;
	}
	spdk_bs_init(dev, &opts, init_cb, req);
}
650 
651 static struct spdk_file *
652 file_alloc(struct spdk_filesystem *fs)
653 {
654 	struct spdk_file *file;
655 
656 	file = calloc(1, sizeof(*file));
657 	if (file == NULL) {
658 		return NULL;
659 	}
660 
661 	file->tree = calloc(1, sizeof(*file->tree));
662 	if (file->tree == NULL) {
663 		free(file);
664 		return NULL;
665 	}
666 
667 	file->fs = fs;
668 	TAILQ_INIT(&file->open_requests);
669 	TAILQ_INIT(&file->sync_requests);
670 	pthread_spin_init(&file->lock, 0);
671 	TAILQ_INSERT_TAIL(&fs->files, file, tailq);
672 	file->priority = SPDK_FILE_PRIORITY_LOW;
673 	return file;
674 }
675 
676 static void fs_load_done(void *ctx, int bserrno);
677 
678 static int
679 _handle_deleted_files(struct spdk_fs_request *req)
680 {
681 	struct spdk_fs_cb_args *args = &req->args;
682 	struct spdk_filesystem *fs = args->fs;
683 
684 	if (!TAILQ_EMPTY(&args->op.fs_load.deleted_files)) {
685 		struct spdk_deleted_file *deleted_file;
686 
687 		deleted_file = TAILQ_FIRST(&args->op.fs_load.deleted_files);
688 		TAILQ_REMOVE(&args->op.fs_load.deleted_files, deleted_file, tailq);
689 		spdk_bs_delete_blob(fs->bs, deleted_file->id, fs_load_done, req);
690 		free(deleted_file);
691 		return 0;
692 	}
693 
694 	return 1;
695 }
696 
/*
 * Final stage of spdk_fs_load(): drains the deleted-files list one blob at a
 * time (this function is also the completion callback for each deletion, so
 * it re-enters until the list is empty), then completes the user callback.
 */
static void
fs_load_done(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_filesystem *fs = args->fs;

	/* The filesystem has been loaded.  Now check if there are any files that
	 *  were marked for deletion before last unload.  Do not complete the
	 *  fs_load callback until all of them have been deleted on disk.
	 */
	if (_handle_deleted_files(req) == 0) {
		/* We found a file that's been marked for deleting but not actually
		 *  deleted yet.  This function will get called again once the delete
		 *  operation is completed.
		 */
		return;
	}

	args->fn.fs_op_with_handle(args->arg, fs, 0);
	free_fs_request(req);

}
720 
721 static void
722 _file_build_trace_arg_name(struct spdk_file *f)
723 {
724 	f->trace_arg_name = 0;
725 	memcpy(&f->trace_arg_name, f->name,
726 	       spdk_min(sizeof(f->trace_arg_name), strlen(f->name)));
727 }
728 
729 static void
730 iter_cb(void *ctx, struct spdk_blob *blob, int rc)
731 {
732 	struct spdk_fs_request *req = ctx;
733 	struct spdk_fs_cb_args *args = &req->args;
734 	struct spdk_filesystem *fs = args->fs;
735 	uint64_t *length;
736 	const char *name;
737 	uint32_t *is_deleted;
738 	size_t value_len;
739 
740 	if (rc < 0) {
741 		args->fn.fs_op_with_handle(args->arg, fs, rc);
742 		free_fs_request(req);
743 		return;
744 	}
745 
746 	rc = spdk_blob_get_xattr_value(blob, "name", (const void **)&name, &value_len);
747 	if (rc < 0) {
748 		args->fn.fs_op_with_handle(args->arg, fs, rc);
749 		free_fs_request(req);
750 		return;
751 	}
752 
753 	rc = spdk_blob_get_xattr_value(blob, "length", (const void **)&length, &value_len);
754 	if (rc < 0) {
755 		args->fn.fs_op_with_handle(args->arg, fs, rc);
756 		free_fs_request(req);
757 		return;
758 	}
759 
760 	assert(value_len == 8);
761 
762 	/* This file could be deleted last time without close it, then app crashed, so we delete it now */
763 	rc = spdk_blob_get_xattr_value(blob, "is_deleted", (const void **)&is_deleted, &value_len);
764 	if (rc < 0) {
765 		struct spdk_file *f;
766 
767 		f = file_alloc(fs);
768 		if (f == NULL) {
769 			SPDK_ERRLOG("Cannot allocate file to handle deleted file on disk\n");
770 			args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM);
771 			free_fs_request(req);
772 			return;
773 		}
774 
775 		f->name = strdup(name);
776 		_file_build_trace_arg_name(f);
777 		f->blobid = spdk_blob_get_id(blob);
778 		f->length = *length;
779 		f->length_flushed = *length;
780 		f->length_xattr = *length;
781 		f->append_pos = *length;
782 		SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "added file %s length=%ju\n", f->name, f->length);
783 	} else {
784 		struct spdk_deleted_file *deleted_file;
785 
786 		deleted_file = calloc(1, sizeof(*deleted_file));
787 		if (deleted_file == NULL) {
788 			args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM);
789 			free_fs_request(req);
790 			return;
791 		}
792 		deleted_file->id = spdk_blob_get_id(blob);
793 		TAILQ_INSERT_TAIL(&args->op.fs_load.deleted_files, deleted_file, tailq);
794 	}
795 }
796 
/*
 * Completion callback for spdk_bs_load() issued from spdk_fs_load().
 * Verifies the blobstore's bstype: a zeroed type (older blobstore) is
 * claimed for blobfs; anything other than the blobfs signature is rejected
 * with -EINVAL.  On success, finishes setup and drains pending deletions via
 * fs_load_done().
 */
static void
load_cb(void *ctx, struct spdk_blob_store *bs, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_filesystem *fs = args->fs;
	struct spdk_bs_type bstype;
	static const struct spdk_bs_type blobfs_type = {SPDK_BLOBFS_SIGNATURE};
	static const struct spdk_bs_type zeros;

	if (bserrno != 0) {
		/* Free the request before tearing down the channels that own it. */
		args->fn.fs_op_with_handle(args->arg, NULL, bserrno);
		free_fs_request(req);
		fs_free_io_channels(fs);
		fs_io_device_unregister(fs);
		return;
	}

	bstype = spdk_bs_get_bstype(bs);

	if (!memcmp(&bstype, &zeros, sizeof(bstype))) {
		SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "assigning bstype\n");
		spdk_bs_set_bstype(bs, blobfs_type);
	} else if (memcmp(&bstype, &blobfs_type, sizeof(bstype))) {
		SPDK_ERRLOG("not blobfs\n");
		SPDK_LOGDUMP(SPDK_LOG_BLOBFS, "bstype", &bstype, sizeof(bstype));
		args->fn.fs_op_with_handle(args->arg, NULL, -EINVAL);
		free_fs_request(req);
		fs_free_io_channels(fs);
		fs_io_device_unregister(fs);
		return;
	}

	common_fs_bs_init(fs, bs);
	fs_load_done(req, 0);
}
833 
/* Unregister all three io_devices registered in fs_alloc() and free the
 * filesystem object itself. */
static void
fs_io_device_unregister(struct spdk_filesystem *fs)
{
	assert(fs != NULL);
	spdk_io_device_unregister(&fs->md_target, NULL);
	spdk_io_device_unregister(&fs->sync_target, NULL);
	spdk_io_device_unregister(&fs->io_target, NULL);
	free(fs);
}
843 
/* Release the md and sync io channels taken in fs_alloc(); must precede
 * fs_io_device_unregister(). */
static void
fs_free_io_channels(struct spdk_filesystem *fs)
{
	assert(fs != NULL);
	spdk_fs_free_io_channel(fs->md_target.md_io_channel);
	spdk_fs_free_io_channel(fs->sync_target.sync_io_channel);
}
851 
/*
 * Load an existing blobfs from dev.  iter_cb() is invoked for every blob to
 * rebuild the file list; load_cb() validates the bstype and completes the
 * user callback (via fs_load_done) once any pending deletions are drained.
 */
void
spdk_fs_load(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn,
	     spdk_fs_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_filesystem *fs;
	struct spdk_fs_cb_args *args;
	struct spdk_fs_request *req;
	struct spdk_bs_opts	bs_opts;

	fs = fs_alloc(dev, send_request_fn);
	if (fs == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	fs_conf_parse();

	req = alloc_fs_request(fs->md_target.md_fs_channel);
	if (req == NULL) {
		/* Tear down everything fs_alloc() set up. */
		fs_free_io_channels(fs);
		fs_io_device_unregister(fs);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	args = &req->args;
	args->fn.fs_op_with_handle = cb_fn;
	args->arg = cb_arg;
	args->fs = fs;
	TAILQ_INIT(&args->op.fs_load.deleted_files);
	spdk_bs_opts_init(&bs_opts);
	bs_opts.iter_cb_fn = iter_cb;
	bs_opts.iter_cb_arg = req;
	spdk_bs_load(dev, &bs_opts, load_cb, req);
}
887 
/*
 * Completion callback for spdk_bs_unload(): frees all remaining file
 * objects, drops the global cache reference, completes the user callback,
 * and finally unregisters the io devices (which frees fs).
 */
static void
unload_cb(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_filesystem *fs = args->fs;
	struct spdk_file *file, *tmp;

	TAILQ_FOREACH_SAFE(file, &fs->files, tailq, tmp) {
		TAILQ_REMOVE(&fs->files, file, tailq);
		file_free(file);
	}

	free_global_cache();

	args->fn.fs_op(args->arg, bserrno);
	/* This request came from the heap (see spdk_fs_unload), not a channel
	 * pool, so plain free() — not free_fs_request() — is correct here. */
	free(req);

	fs_io_device_unregister(fs);
}
908 
909 void
910 spdk_fs_unload(struct spdk_filesystem *fs, spdk_fs_op_complete cb_fn, void *cb_arg)
911 {
912 	struct spdk_fs_request *req;
913 	struct spdk_fs_cb_args *args;
914 
915 	/*
916 	 * We must free the md_channel before unloading the blobstore, so just
917 	 *  allocate this request from the general heap.
918 	 */
919 	req = calloc(1, sizeof(*req));
920 	if (req == NULL) {
921 		cb_fn(cb_arg, -ENOMEM);
922 		return;
923 	}
924 
925 	args = &req->args;
926 	args->fn.fs_op = cb_fn;
927 	args->arg = cb_arg;
928 	args->fs = fs;
929 
930 	fs_free_io_channels(fs);
931 	spdk_bs_unload(fs->bs, unload_cb, req);
932 }
933 
934 static struct spdk_file *
935 fs_find_file(struct spdk_filesystem *fs, const char *name)
936 {
937 	struct spdk_file *file;
938 
939 	TAILQ_FOREACH(file, &fs->files, tailq) {
940 		if (!strncmp(name, file->name, SPDK_FILE_NAME_MAX)) {
941 			return file;
942 		}
943 	}
944 
945 	return NULL;
946 }
947 
948 void
949 spdk_fs_file_stat_async(struct spdk_filesystem *fs, const char *name,
950 			spdk_file_stat_op_complete cb_fn, void *cb_arg)
951 {
952 	struct spdk_file_stat stat;
953 	struct spdk_file *f = NULL;
954 
955 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
956 		cb_fn(cb_arg, NULL, -ENAMETOOLONG);
957 		return;
958 	}
959 
960 	f = fs_find_file(fs, name);
961 	if (f != NULL) {
962 		stat.blobid = f->blobid;
963 		stat.size = f->append_pos >= f->length ? f->append_pos : f->length;
964 		cb_fn(cb_arg, &stat, 0);
965 		return;
966 	}
967 
968 	cb_fn(cb_arg, NULL, -ENOENT);
969 }
970 
/*
 * stat_op callback for the synchronous wrapper: copies the result into the
 * caller's stat buffer (stashed in args->arg by spdk_fs_file_stat) and wakes
 * the blocked caller.
 */
static void
__copy_stat(void *arg, struct spdk_file_stat *stat, int fserrno)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	args->rc = fserrno;
	if (fserrno == 0) {
		/* args->arg points at the caller-provided spdk_file_stat. */
		memcpy(args->arg, stat, sizeof(*stat));
	}
	sem_post(args->sem);
}
983 
/* Trampoline run on the filesystem's md thread: issues the async stat with
 * the request itself as callback context (see __copy_stat). */
static void
__file_stat(void *arg)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	spdk_fs_file_stat_async(args->fs, args->op.stat.name,
				args->fn.stat_op, req);
}
993 
/*
 * Synchronous stat: dispatches __file_stat to the md thread via the
 * channel's send_request and blocks on the channel semaphore until
 * __copy_stat posts it.  On success *stat is filled in; returns 0 or a
 * negative errno.
 */
int
spdk_fs_file_stat(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx,
		  const char *name, struct spdk_file_stat *stat)
{
	/* spdk_fs_thread_ctx is currently a thin wrapper around a channel. */
	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
	struct spdk_fs_request *req;
	int rc;

	req = alloc_fs_request(channel);
	if (req == NULL) {
		SPDK_ERRLOG("Cannot allocate stat req on file=%s\n", name);
		return -ENOMEM;
	}

	req->args.fs = fs;
	req->args.op.stat.name = name;
	req->args.fn.stat_op = __copy_stat;
	req->args.arg = stat;	/* __copy_stat writes the result here */
	req->args.sem = &channel->sem;
	channel->send_request(__file_stat, req);
	sem_wait(&channel->sem);

	rc = req->args.rc;
	free_fs_request(req);

	return rc;
}
1021 
1022 static void
1023 fs_create_blob_close_cb(void *ctx, int bserrno)
1024 {
1025 	int rc;
1026 	struct spdk_fs_request *req = ctx;
1027 	struct spdk_fs_cb_args *args = &req->args;
1028 
1029 	rc = args->rc ? args->rc : bserrno;
1030 	args->fn.file_op(args->arg, rc);
1031 	free_fs_request(req);
1032 }
1033 
1034 static void
1035 fs_create_blob_resize_cb(void *ctx, int bserrno)
1036 {
1037 	struct spdk_fs_request *req = ctx;
1038 	struct spdk_fs_cb_args *args = &req->args;
1039 	struct spdk_file *f = args->file;
1040 	struct spdk_blob *blob = args->op.create.blob;
1041 	uint64_t length = 0;
1042 
1043 	args->rc = bserrno;
1044 	if (bserrno) {
1045 		spdk_blob_close(blob, fs_create_blob_close_cb, args);
1046 		return;
1047 	}
1048 
1049 	spdk_blob_set_xattr(blob, "name", f->name, strlen(f->name) + 1);
1050 	spdk_blob_set_xattr(blob, "length", &length, sizeof(length));
1051 
1052 	spdk_blob_close(blob, fs_create_blob_close_cb, args);
1053 }
1054 
1055 static void
1056 fs_create_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno)
1057 {
1058 	struct spdk_fs_request *req = ctx;
1059 	struct spdk_fs_cb_args *args = &req->args;
1060 
1061 	if (bserrno) {
1062 		args->fn.file_op(args->arg, bserrno);
1063 		free_fs_request(req);
1064 		return;
1065 	}
1066 
1067 	args->op.create.blob = blob;
1068 	spdk_blob_resize(blob, 1, fs_create_blob_resize_cb, req);
1069 }
1070 
1071 static void
1072 fs_create_blob_create_cb(void *ctx, spdk_blob_id blobid, int bserrno)
1073 {
1074 	struct spdk_fs_request *req = ctx;
1075 	struct spdk_fs_cb_args *args = &req->args;
1076 	struct spdk_file *f = args->file;
1077 
1078 	if (bserrno) {
1079 		args->fn.file_op(args->arg, bserrno);
1080 		free_fs_request(req);
1081 		return;
1082 	}
1083 
1084 	f->blobid = blobid;
1085 	spdk_bs_open_blob(f->fs->bs, blobid, fs_create_blob_open_cb, req);
1086 }
1087 
1088 void
1089 spdk_fs_create_file_async(struct spdk_filesystem *fs, const char *name,
1090 			  spdk_file_op_complete cb_fn, void *cb_arg)
1091 {
1092 	struct spdk_file *file;
1093 	struct spdk_fs_request *req;
1094 	struct spdk_fs_cb_args *args;
1095 
1096 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
1097 		cb_fn(cb_arg, -ENAMETOOLONG);
1098 		return;
1099 	}
1100 
1101 	file = fs_find_file(fs, name);
1102 	if (file != NULL) {
1103 		cb_fn(cb_arg, -EEXIST);
1104 		return;
1105 	}
1106 
1107 	file = file_alloc(fs);
1108 	if (file == NULL) {
1109 		SPDK_ERRLOG("Cannot allocate new file for creation\n");
1110 		cb_fn(cb_arg, -ENOMEM);
1111 		return;
1112 	}
1113 
1114 	req = alloc_fs_request(fs->md_target.md_fs_channel);
1115 	if (req == NULL) {
1116 		SPDK_ERRLOG("Cannot allocate create async req for file=%s\n", name);
1117 		cb_fn(cb_arg, -ENOMEM);
1118 		return;
1119 	}
1120 
1121 	args = &req->args;
1122 	args->file = file;
1123 	args->fn.file_op = cb_fn;
1124 	args->arg = cb_arg;
1125 
1126 	file->name = strdup(name);
1127 	_file_build_trace_arg_name(file);
1128 	spdk_bs_create_blob(fs->bs, fs_create_blob_create_cb, args);
1129 }
1130 
1131 static void
1132 __fs_create_file_done(void *arg, int fserrno)
1133 {
1134 	struct spdk_fs_request *req = arg;
1135 	struct spdk_fs_cb_args *args = &req->args;
1136 
1137 	__wake_caller(args, fserrno);
1138 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.create.name);
1139 }
1140 
1141 static void
1142 __fs_create_file(void *arg)
1143 {
1144 	struct spdk_fs_request *req = arg;
1145 	struct spdk_fs_cb_args *args = &req->args;
1146 
1147 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.create.name);
1148 	spdk_fs_create_file_async(args->fs, args->op.create.name, __fs_create_file_done, req);
1149 }
1150 
1151 int
1152 spdk_fs_create_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx, const char *name)
1153 {
1154 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
1155 	struct spdk_fs_request *req;
1156 	struct spdk_fs_cb_args *args;
1157 	int rc;
1158 
1159 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name);
1160 
1161 	req = alloc_fs_request(channel);
1162 	if (req == NULL) {
1163 		SPDK_ERRLOG("Cannot allocate req to create file=%s\n", name);
1164 		return -ENOMEM;
1165 	}
1166 
1167 	args = &req->args;
1168 	args->fs = fs;
1169 	args->op.create.name = name;
1170 	args->sem = &channel->sem;
1171 	fs->send_request(__fs_create_file, req);
1172 	sem_wait(&channel->sem);
1173 	rc = args->rc;
1174 	free_fs_request(req);
1175 
1176 	return rc;
1177 }
1178 
1179 static void
1180 fs_open_blob_done(void *ctx, struct spdk_blob *blob, int bserrno)
1181 {
1182 	struct spdk_fs_request *req = ctx;
1183 	struct spdk_fs_cb_args *args = &req->args;
1184 	struct spdk_file *f = args->file;
1185 
1186 	f->blob = blob;
1187 	while (!TAILQ_EMPTY(&f->open_requests)) {
1188 		req = TAILQ_FIRST(&f->open_requests);
1189 		args = &req->args;
1190 		TAILQ_REMOVE(&f->open_requests, req, args.op.open.tailq);
1191 		spdk_trace_record(TRACE_BLOBFS_OPEN, 0, 0, 0, f->trace_arg_name);
1192 		args->fn.file_op_with_handle(args->arg, f, bserrno);
1193 		free_fs_request(req);
1194 	}
1195 }
1196 
/* Second stage of spdk_fs_open_file_async(): invoked directly when the file
 * already exists, or as the completion of an async create (CREATE flag).
 * Takes a reference on the file and ensures its blob gets opened exactly
 * once, queuing this request to be completed when the blob open finishes.
 * NOTE(review): bserrno is not checked here, so a failed create would still
 * proceed into the lookup - confirm whether that path is reachable. */
static void
fs_open_blob_create_cb(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *file = args->file;
	struct spdk_filesystem *fs = args->fs;

	if (file == NULL) {
		/*
		 * This is from an open with CREATE flag - the file
		 *  is now created so look it up in the file list for this
		 *  filesystem.
		 */
		file = fs_find_file(fs, args->op.open.name);
		assert(file != NULL);
		args->file = file;
	}

	file->ref_count++;
	/* Queue this request; fs_open_blob_done() completes everything queued
	 * here once the blob handle is available. */
	TAILQ_INSERT_TAIL(&file->open_requests, req, args.op.open.tailq);
	if (file->ref_count == 1) {
		/* First opener issues the actual blob open. */
		assert(file->blob == NULL);
		spdk_bs_open_blob(fs->bs, file->blobid, fs_open_blob_done, req);
	} else if (file->blob != NULL) {
		/* Blob already open - complete this request immediately. */
		fs_open_blob_done(req, file->blob, 0);
	} else {
		/*
		 * The blob open for this file is in progress due to a previous
		 *  open request.  When that open completes, it will invoke the
		 *  open callback for this request.
		 */
	}
}
1231 
1232 void
1233 spdk_fs_open_file_async(struct spdk_filesystem *fs, const char *name, uint32_t flags,
1234 			spdk_file_op_with_handle_complete cb_fn, void *cb_arg)
1235 {
1236 	struct spdk_file *f = NULL;
1237 	struct spdk_fs_request *req;
1238 	struct spdk_fs_cb_args *args;
1239 
1240 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
1241 		cb_fn(cb_arg, NULL, -ENAMETOOLONG);
1242 		return;
1243 	}
1244 
1245 	f = fs_find_file(fs, name);
1246 	if (f == NULL && !(flags & SPDK_BLOBFS_OPEN_CREATE)) {
1247 		cb_fn(cb_arg, NULL, -ENOENT);
1248 		return;
1249 	}
1250 
1251 	if (f != NULL && f->is_deleted == true) {
1252 		cb_fn(cb_arg, NULL, -ENOENT);
1253 		return;
1254 	}
1255 
1256 	req = alloc_fs_request(fs->md_target.md_fs_channel);
1257 	if (req == NULL) {
1258 		SPDK_ERRLOG("Cannot allocate async open req for file=%s\n", name);
1259 		cb_fn(cb_arg, NULL, -ENOMEM);
1260 		return;
1261 	}
1262 
1263 	args = &req->args;
1264 	args->fn.file_op_with_handle = cb_fn;
1265 	args->arg = cb_arg;
1266 	args->file = f;
1267 	args->fs = fs;
1268 	args->op.open.name = name;
1269 
1270 	if (f == NULL) {
1271 		spdk_fs_create_file_async(fs, name, fs_open_blob_create_cb, req);
1272 	} else {
1273 		fs_open_blob_create_cb(req, 0);
1274 	}
1275 }
1276 
1277 static void
1278 __fs_open_file_done(void *arg, struct spdk_file *file, int bserrno)
1279 {
1280 	struct spdk_fs_request *req = arg;
1281 	struct spdk_fs_cb_args *args = &req->args;
1282 
1283 	args->file = file;
1284 	__wake_caller(args, bserrno);
1285 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.open.name);
1286 }
1287 
1288 static void
1289 __fs_open_file(void *arg)
1290 {
1291 	struct spdk_fs_request *req = arg;
1292 	struct spdk_fs_cb_args *args = &req->args;
1293 
1294 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.open.name);
1295 	spdk_fs_open_file_async(args->fs, args->op.open.name, args->op.open.flags,
1296 				__fs_open_file_done, req);
1297 }
1298 
1299 int
1300 spdk_fs_open_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx,
1301 		  const char *name, uint32_t flags, struct spdk_file **file)
1302 {
1303 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
1304 	struct spdk_fs_request *req;
1305 	struct spdk_fs_cb_args *args;
1306 	int rc;
1307 
1308 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name);
1309 
1310 	req = alloc_fs_request(channel);
1311 	if (req == NULL) {
1312 		SPDK_ERRLOG("Cannot allocate req for opening file=%s\n", name);
1313 		return -ENOMEM;
1314 	}
1315 
1316 	args = &req->args;
1317 	args->fs = fs;
1318 	args->op.open.name = name;
1319 	args->op.open.flags = flags;
1320 	args->sem = &channel->sem;
1321 	fs->send_request(__fs_open_file, req);
1322 	sem_wait(&channel->sem);
1323 	rc = args->rc;
1324 	if (rc == 0) {
1325 		*file = args->file;
1326 	} else {
1327 		*file = NULL;
1328 	}
1329 	free_fs_request(req);
1330 
1331 	return rc;
1332 }
1333 
1334 static void
1335 fs_rename_blob_close_cb(void *ctx, int bserrno)
1336 {
1337 	struct spdk_fs_request *req = ctx;
1338 	struct spdk_fs_cb_args *args = &req->args;
1339 
1340 	args->fn.fs_op(args->arg, bserrno);
1341 	free_fs_request(req);
1342 }
1343 
1344 static void
1345 fs_rename_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno)
1346 {
1347 	struct spdk_fs_request *req = ctx;
1348 	struct spdk_fs_cb_args *args = &req->args;
1349 	const char *new_name = args->op.rename.new_name;
1350 
1351 	spdk_blob_set_xattr(blob, "name", new_name, strlen(new_name) + 1);
1352 	spdk_blob_close(blob, fs_rename_blob_close_cb, req);
1353 }
1354 
1355 static void
1356 _fs_md_rename_file(struct spdk_fs_request *req)
1357 {
1358 	struct spdk_fs_cb_args *args = &req->args;
1359 	struct spdk_file *f;
1360 
1361 	f = fs_find_file(args->fs, args->op.rename.old_name);
1362 	if (f == NULL) {
1363 		args->fn.fs_op(args->arg, -ENOENT);
1364 		free_fs_request(req);
1365 		return;
1366 	}
1367 
1368 	free(f->name);
1369 	f->name = strdup(args->op.rename.new_name);
1370 	_file_build_trace_arg_name(f);
1371 	args->file = f;
1372 	spdk_bs_open_blob(args->fs->bs, f->blobid, fs_rename_blob_open_cb, req);
1373 }
1374 
/* Completion for the delete issued when a rename overwrites an existing
 * file; proceeds with the metadata rename.
 * NOTE(review): fserrno is ignored here - a failed delete still triggers
 * the rename; confirm that is intentional. */
static void
fs_rename_delete_done(void *arg, int fserrno)
{
	_fs_md_rename_file(arg);
}
1380 
1381 void
1382 spdk_fs_rename_file_async(struct spdk_filesystem *fs,
1383 			  const char *old_name, const char *new_name,
1384 			  spdk_file_op_complete cb_fn, void *cb_arg)
1385 {
1386 	struct spdk_file *f;
1387 	struct spdk_fs_request *req;
1388 	struct spdk_fs_cb_args *args;
1389 
1390 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "old=%s new=%s\n", old_name, new_name);
1391 	if (strnlen(new_name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
1392 		cb_fn(cb_arg, -ENAMETOOLONG);
1393 		return;
1394 	}
1395 
1396 	req = alloc_fs_request(fs->md_target.md_fs_channel);
1397 	if (req == NULL) {
1398 		SPDK_ERRLOG("Cannot allocate rename async req for renaming file from %s to %s\n", old_name,
1399 			    new_name);
1400 		cb_fn(cb_arg, -ENOMEM);
1401 		return;
1402 	}
1403 
1404 	args = &req->args;
1405 	args->fn.fs_op = cb_fn;
1406 	args->fs = fs;
1407 	args->arg = cb_arg;
1408 	args->op.rename.old_name = old_name;
1409 	args->op.rename.new_name = new_name;
1410 
1411 	f = fs_find_file(fs, new_name);
1412 	if (f == NULL) {
1413 		_fs_md_rename_file(req);
1414 		return;
1415 	}
1416 
1417 	/*
1418 	 * The rename overwrites an existing file.  So delete the existing file, then
1419 	 *  do the actual rename.
1420 	 */
1421 	spdk_fs_delete_file_async(fs, new_name, fs_rename_delete_done, req);
1422 }
1423 
1424 static void
1425 __fs_rename_file_done(void *arg, int fserrno)
1426 {
1427 	struct spdk_fs_request *req = arg;
1428 	struct spdk_fs_cb_args *args = &req->args;
1429 
1430 	__wake_caller(args, fserrno);
1431 }
1432 
1433 static void
1434 __fs_rename_file(void *arg)
1435 {
1436 	struct spdk_fs_request *req = arg;
1437 	struct spdk_fs_cb_args *args = &req->args;
1438 
1439 	spdk_fs_rename_file_async(args->fs, args->op.rename.old_name, args->op.rename.new_name,
1440 				  __fs_rename_file_done, req);
1441 }
1442 
1443 int
1444 spdk_fs_rename_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx,
1445 		    const char *old_name, const char *new_name)
1446 {
1447 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
1448 	struct spdk_fs_request *req;
1449 	struct spdk_fs_cb_args *args;
1450 	int rc;
1451 
1452 	req = alloc_fs_request(channel);
1453 	if (req == NULL) {
1454 		SPDK_ERRLOG("Cannot allocate rename req for file=%s\n", old_name);
1455 		return -ENOMEM;
1456 	}
1457 
1458 	args = &req->args;
1459 
1460 	args->fs = fs;
1461 	args->op.rename.old_name = old_name;
1462 	args->op.rename.new_name = new_name;
1463 	args->sem = &channel->sem;
1464 	fs->send_request(__fs_rename_file, req);
1465 	sem_wait(&channel->sem);
1466 	rc = args->rc;
1467 	free_fs_request(req);
1468 	return rc;
1469 }
1470 
1471 static void
1472 blob_delete_cb(void *ctx, int bserrno)
1473 {
1474 	struct spdk_fs_request *req = ctx;
1475 	struct spdk_fs_cb_args *args = &req->args;
1476 
1477 	args->fn.file_op(args->arg, bserrno);
1478 	free_fs_request(req);
1479 }
1480 
/* Asynchronously delete the named file.  If the file is still open
 * (ref_count > 0), it is only marked as deleted here and the blob deletion
 * is deferred until the last close; otherwise the in-memory file is torn
 * down immediately and the backing blob is deleted. */
void
spdk_fs_delete_file_async(struct spdk_filesystem *fs, const char *name,
			  spdk_file_op_complete cb_fn, void *cb_arg)
{
	struct spdk_file *f;
	spdk_blob_id blobid;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name);

	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
		cb_fn(cb_arg, -ENAMETOOLONG);
		return;
	}

	f = fs_find_file(fs, name);
	if (f == NULL) {
		SPDK_ERRLOG("Cannot find the file=%s to deleted\n", name);
		cb_fn(cb_arg, -ENOENT);
		return;
	}

	req = alloc_fs_request(fs->md_target.md_fs_channel);
	if (req == NULL) {
		SPDK_ERRLOG("Cannot allocate the req for the file=%s to deleted\n", name);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	args = &req->args;
	args->fn.file_op = cb_fn;
	args->arg = cb_arg;

	if (f->ref_count > 0) {
		/* If the ref > 0, we mark the file as deleted and delete it when we close it. */
		f->is_deleted = true;
		/* Persist the flag so a filesystem reload also sees the
		 * pending delete. */
		spdk_blob_set_xattr(f->blob, "is_deleted", &f->is_deleted, sizeof(bool));
		spdk_blob_sync_md(f->blob, blob_delete_cb, req);
		return;
	}

	blobid = f->blobid;
	/* Unlink and free the in-memory file before deleting the blob. */
	TAILQ_REMOVE(&fs->files, f, tailq);

	file_free(f);

	spdk_bs_delete_blob(fs->bs, blobid, blob_delete_cb, req);
}
1530 
/* Pack up to the first 8 bytes of the file name into an integer so it can
 * be carried in a fixed-size trace argument. */
static uint64_t
fs_name_to_uint64(const char *name)
{
	uint64_t result = 0;
	size_t copy_len = strlen(name);

	if (copy_len > sizeof(result)) {
		copy_len = sizeof(result);
	}
	memcpy(&result, name, copy_len);
	return result;
}
1538 
1539 static void
1540 __fs_delete_file_done(void *arg, int fserrno)
1541 {
1542 	struct spdk_fs_request *req = arg;
1543 	struct spdk_fs_cb_args *args = &req->args;
1544 
1545 	spdk_trace_record(TRACE_BLOBFS_DELETE_DONE, 0, 0, 0, fs_name_to_uint64(args->op.delete.name));
1546 	__wake_caller(args, fserrno);
1547 }
1548 
1549 static void
1550 __fs_delete_file(void *arg)
1551 {
1552 	struct spdk_fs_request *req = arg;
1553 	struct spdk_fs_cb_args *args = &req->args;
1554 
1555 	spdk_trace_record(TRACE_BLOBFS_DELETE_START, 0, 0, 0, fs_name_to_uint64(args->op.delete.name));
1556 	spdk_fs_delete_file_async(args->fs, args->op.delete.name, __fs_delete_file_done, req);
1557 }
1558 
1559 int
1560 spdk_fs_delete_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx,
1561 		    const char *name)
1562 {
1563 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
1564 	struct spdk_fs_request *req;
1565 	struct spdk_fs_cb_args *args;
1566 	int rc;
1567 
1568 	req = alloc_fs_request(channel);
1569 	if (req == NULL) {
1570 		SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "Cannot allocate req to delete file=%s\n", name);
1571 		return -ENOMEM;
1572 	}
1573 
1574 	args = &req->args;
1575 	args->fs = fs;
1576 	args->op.delete.name = name;
1577 	args->sem = &channel->sem;
1578 	fs->send_request(__fs_delete_file, req);
1579 	sem_wait(&channel->sem);
1580 	rc = args->rc;
1581 	free_fs_request(req);
1582 
1583 	return rc;
1584 }
1585 
1586 spdk_fs_iter
1587 spdk_fs_iter_first(struct spdk_filesystem *fs)
1588 {
1589 	struct spdk_file *f;
1590 
1591 	f = TAILQ_FIRST(&fs->files);
1592 	return f;
1593 }
1594 
1595 spdk_fs_iter
1596 spdk_fs_iter_next(spdk_fs_iter iter)
1597 {
1598 	struct spdk_file *f = iter;
1599 
1600 	if (f == NULL) {
1601 		return NULL;
1602 	}
1603 
1604 	f = TAILQ_NEXT(f, tailq);
1605 	return f;
1606 }
1607 
1608 const char *
1609 spdk_file_get_name(struct spdk_file *file)
1610 {
1611 	return file->name;
1612 }
1613 
1614 uint64_t
1615 spdk_file_get_length(struct spdk_file *file)
1616 {
1617 	uint64_t length;
1618 
1619 	assert(file != NULL);
1620 
1621 	length = file->append_pos >= file->length ? file->append_pos : file->length;
1622 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s length=0x%jx\n", file->name, length);
1623 	return length;
1624 }
1625 
1626 static void
1627 fs_truncate_complete_cb(void *ctx, int bserrno)
1628 {
1629 	struct spdk_fs_request *req = ctx;
1630 	struct spdk_fs_cb_args *args = &req->args;
1631 
1632 	args->fn.file_op(args->arg, bserrno);
1633 	free_fs_request(req);
1634 }
1635 
1636 static void
1637 fs_truncate_resize_cb(void *ctx, int bserrno)
1638 {
1639 	struct spdk_fs_request *req = ctx;
1640 	struct spdk_fs_cb_args *args = &req->args;
1641 	struct spdk_file *file = args->file;
1642 	uint64_t *length = &args->op.truncate.length;
1643 
1644 	if (bserrno) {
1645 		args->fn.file_op(args->arg, bserrno);
1646 		free_fs_request(req);
1647 		return;
1648 	}
1649 
1650 	spdk_blob_set_xattr(file->blob, "length", length, sizeof(*length));
1651 
1652 	file->length = *length;
1653 	if (file->append_pos > file->length) {
1654 		file->append_pos = file->length;
1655 	}
1656 
1657 	spdk_blob_sync_md(file->blob, fs_truncate_complete_cb, req);
1658 }
1659 
/* Number of whole clusters needed to hold `length` bytes (round up).
 * Computed without the (length + cluster_sz - 1) form, which wraps around
 * for lengths within cluster_sz-1 of UINT64_MAX. */
static uint64_t
__bytes_to_clusters(uint64_t length, uint64_t cluster_sz)
{
	uint64_t clusters = length / cluster_sz;

	if (length % cluster_sz != 0) {
		clusters++;
	}
	return clusters;
}
1665 
1666 void
1667 spdk_file_truncate_async(struct spdk_file *file, uint64_t length,
1668 			 spdk_file_op_complete cb_fn, void *cb_arg)
1669 {
1670 	struct spdk_filesystem *fs;
1671 	size_t num_clusters;
1672 	struct spdk_fs_request *req;
1673 	struct spdk_fs_cb_args *args;
1674 
1675 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s old=0x%jx new=0x%jx\n", file->name, file->length, length);
1676 	if (length == file->length) {
1677 		cb_fn(cb_arg, 0);
1678 		return;
1679 	}
1680 
1681 	req = alloc_fs_request(file->fs->md_target.md_fs_channel);
1682 	if (req == NULL) {
1683 		cb_fn(cb_arg, -ENOMEM);
1684 		return;
1685 	}
1686 
1687 	args = &req->args;
1688 	args->fn.file_op = cb_fn;
1689 	args->arg = cb_arg;
1690 	args->file = file;
1691 	args->op.truncate.length = length;
1692 	fs = file->fs;
1693 
1694 	num_clusters = __bytes_to_clusters(length, fs->bs_opts.cluster_sz);
1695 
1696 	spdk_blob_resize(file->blob, num_clusters, fs_truncate_resize_cb, req);
1697 }
1698 
/* Metadata-thread trampoline for spdk_file_truncate().
 * Note: the completion context passed here is args (not req) because the
 * completion fn stored in args->fn.file_op is __wake_caller, which expects
 * the cb_args pointer. */
static void
__truncate(void *arg)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	spdk_file_truncate_async(args->file, args->op.truncate.length,
				 args->fn.file_op, args);
}
1708 
1709 int
1710 spdk_file_truncate(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx,
1711 		   uint64_t length)
1712 {
1713 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
1714 	struct spdk_fs_request *req;
1715 	struct spdk_fs_cb_args *args;
1716 	int rc;
1717 
1718 	req = alloc_fs_request(channel);
1719 	if (req == NULL) {
1720 		return -ENOMEM;
1721 	}
1722 
1723 	args = &req->args;
1724 
1725 	args->file = file;
1726 	args->op.truncate.length = length;
1727 	args->fn.file_op = __wake_caller;
1728 	args->sem = &channel->sem;
1729 
1730 	channel->send_request(__truncate, req);
1731 	sem_wait(&channel->sem);
1732 	rc = args->rc;
1733 	free_fs_request(req);
1734 
1735 	return rc;
1736 }
1737 
1738 static void
1739 __rw_done(void *ctx, int bserrno)
1740 {
1741 	struct spdk_fs_request *req = ctx;
1742 	struct spdk_fs_cb_args *args = &req->args;
1743 
1744 	spdk_free(args->op.rw.pin_buf);
1745 	args->fn.file_op(args->arg, bserrno);
1746 	free_fs_request(req);
1747 }
1748 
/* Gather the iovec contents into the flat buffer, writing at most buf_len
 * bytes in total.  Uses unsigned char * for the cursor: arithmetic on
 * void * is a GCC extension and undefined in ISO C. */
static void
_copy_iovs_to_buf(void *buf, size_t buf_len, struct iovec *iovs, int iovcnt)
{
	unsigned char *dst = buf;
	size_t len;
	int i;

	for (i = 0; i < iovcnt && buf_len > 0; i++) {
		len = iovs[i].iov_len < buf_len ? iovs[i].iov_len : buf_len;
		memcpy(dst, iovs[i].iov_base, len);
		dst += len;
		buf_len -= len;
	}
}
1763 
/* Scatter up to buf_len bytes of the flat buffer into the iovecs.  Uses
 * unsigned char * for the cursor: arithmetic on void * is a GCC extension
 * and undefined in ISO C. */
static void
_copy_buf_to_iovs(struct iovec *iovs, int iovcnt, void *buf, size_t buf_len)
{
	unsigned char *src = buf;
	size_t len;
	int i;

	for (i = 0; i < iovcnt && buf_len > 0; i++) {
		len = iovs[i].iov_len < buf_len ? iovs[i].iov_len : buf_len;
		memcpy(iovs[i].iov_base, src, len);
		src += len;
		buf_len -= len;
	}
}
1778 
/* Completion for the covering-range blob read issued by __do_blob_read().
 * For reads, copy the requested bytes out of the bounce buffer and finish.
 * For unaligned writes, merge the caller's data into the read-back buffer
 * and write the whole range out (read-modify-write).
 * NOTE(review): bserrno is not checked - a failed read still copies/writes;
 * confirm whether that is intentional. */
static void
__read_done(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	void *buf;

	assert(req != NULL);
	/* Locate the caller's data inside the LBA-aligned bounce buffer.
	 * NOTE(review): the mask arithmetic assumes blocklen is a power of
	 * two - confirm the blobstore io unit size guarantees that. */
	buf = (void *)((uintptr_t)args->op.rw.pin_buf + (args->op.rw.offset & (args->op.rw.blocklen - 1)));
	if (args->op.rw.is_read) {
		_copy_buf_to_iovs(args->iovs, args->iovcnt, buf, args->op.rw.length);
		__rw_done(req, 0);
	} else {
		_copy_iovs_to_buf(buf, args->op.rw.length, args->iovs, args->iovcnt);
		spdk_blob_io_write(args->file->blob, args->op.rw.channel,
				   args->op.rw.pin_buf,
				   args->op.rw.start_lba, args->op.rw.num_lba,
				   __rw_done, req);
	}
}
1799 
1800 static void
1801 __do_blob_read(void *ctx, int fserrno)
1802 {
1803 	struct spdk_fs_request *req = ctx;
1804 	struct spdk_fs_cb_args *args = &req->args;
1805 
1806 	if (fserrno) {
1807 		__rw_done(req, fserrno);
1808 		return;
1809 	}
1810 	spdk_blob_io_read(args->file->blob, args->op.rw.channel,
1811 			  args->op.rw.pin_buf,
1812 			  args->op.rw.start_lba, args->op.rw.num_lba,
1813 			  __read_done, req);
1814 }
1815 
1816 static void
1817 __get_page_parameters(struct spdk_file *file, uint64_t offset, uint64_t length,
1818 		      uint64_t *start_lba, uint32_t *lba_size, uint64_t *num_lba)
1819 {
1820 	uint64_t end_lba;
1821 
1822 	*lba_size = spdk_bs_get_io_unit_size(file->fs->bs);
1823 	*start_lba = offset / *lba_size;
1824 	end_lba = (offset + length - 1) / *lba_size;
1825 	*num_lba = (end_lba - *start_lba + 1);
1826 }
1827 
1828 static bool
1829 __is_lba_aligned(struct spdk_file *file, uint64_t offset, uint64_t length)
1830 {
1831 	uint32_t lba_size = spdk_bs_get_io_unit_size(file->fs->bs);
1832 
1833 	if ((offset % lba_size == 0) && (length % lba_size == 0)) {
1834 		return true;
1835 	}
1836 
1837 	return false;
1838 }
1839 
1840 static void
1841 _fs_request_setup_iovs(struct spdk_fs_request *req, struct iovec *iovs, uint32_t iovcnt)
1842 {
1843 	uint32_t i;
1844 
1845 	for (i = 0; i < iovcnt; i++) {
1846 		req->args.iovs[i].iov_base = iovs[i].iov_base;
1847 		req->args.iovs[i].iov_len = iovs[i].iov_len;
1848 	}
1849 }
1850 
/* Common implementation for all read/write entry points.  Computes the
 * LBA-aligned range covering [offset, offset+length), allocates a DMA-able
 * bounce buffer for it, and dispatches:
 *   - writes extending the file: truncate first, then read-modify-write;
 *   - LBA-aligned writes: copy and write directly;
 *   - everything else: read the covering range, then copy out (reads) or
 *     merge and write back (unaligned writes) in __read_done(). */
static void
__readvwritev(struct spdk_file *file, struct spdk_io_channel *_channel,
	      struct iovec *iovs, uint32_t iovcnt, uint64_t offset, uint64_t length,
	      spdk_file_op_complete cb_fn, void *cb_arg, int is_read)
{
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;
	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
	uint64_t start_lba, num_lba, pin_buf_length;
	uint32_t lba_size;

	/* Reads past EOF are rejected rather than short-read. */
	if (is_read && offset + length > file->length) {
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	req = alloc_fs_request_with_iov(channel, iovcnt);
	if (req == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	__get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba);

	args = &req->args;
	args->fn.file_op = cb_fn;
	args->arg = cb_arg;
	args->file = file;
	args->op.rw.channel = channel->bs_channel;
	_fs_request_setup_iovs(req, iovs, iovcnt);
	args->op.rw.is_read = is_read;
	args->op.rw.offset = offset;
	args->op.rw.blocklen = lba_size;

	/* The bounce buffer spans the whole covering LBA range. */
	pin_buf_length = num_lba * lba_size;
	args->op.rw.length = pin_buf_length;
	args->op.rw.pin_buf = spdk_malloc(pin_buf_length, lba_size, NULL,
					  SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
	if (args->op.rw.pin_buf == NULL) {
		SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "Failed to allocate buf for: file=%s offset=%jx length=%jx\n",
			      file->name, offset, length);
		free_fs_request(req);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	args->op.rw.start_lba = start_lba;
	args->op.rw.num_lba = num_lba;

	if (!is_read && file->length < offset + length) {
		/* Grow the file first; __do_blob_read continues afterwards. */
		spdk_file_truncate_async(file, offset + length, __do_blob_read, req);
	} else if (!is_read && __is_lba_aligned(file, offset, length)) {
		/* Fully aligned write: no read-modify-write needed. */
		_copy_iovs_to_buf(args->op.rw.pin_buf, args->op.rw.length, args->iovs, args->iovcnt);
		spdk_blob_io_write(args->file->blob, args->op.rw.channel,
				   args->op.rw.pin_buf,
				   args->op.rw.start_lba, args->op.rw.num_lba,
				   __rw_done, req);
	} else {
		__do_blob_read(req, 0);
	}
}
1912 
1913 static void
1914 __readwrite(struct spdk_file *file, struct spdk_io_channel *channel,
1915 	    void *payload, uint64_t offset, uint64_t length,
1916 	    spdk_file_op_complete cb_fn, void *cb_arg, int is_read)
1917 {
1918 	struct iovec iov;
1919 
1920 	iov.iov_base = payload;
1921 	iov.iov_len = (size_t)length;
1922 
1923 	__readvwritev(file, channel, &iov, 1, offset, length, cb_fn, cb_arg, is_read);
1924 }
1925 
1926 void
1927 spdk_file_write_async(struct spdk_file *file, struct spdk_io_channel *channel,
1928 		      void *payload, uint64_t offset, uint64_t length,
1929 		      spdk_file_op_complete cb_fn, void *cb_arg)
1930 {
1931 	__readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 0);
1932 }
1933 
1934 void
1935 spdk_file_writev_async(struct spdk_file *file, struct spdk_io_channel *channel,
1936 		       struct iovec *iovs, uint32_t iovcnt, uint64_t offset, uint64_t length,
1937 		       spdk_file_op_complete cb_fn, void *cb_arg)
1938 {
1939 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s offset=%jx length=%jx\n",
1940 		      file->name, offset, length);
1941 
1942 	__readvwritev(file, channel, iovs, iovcnt, offset, length, cb_fn, cb_arg, 0);
1943 }
1944 
1945 void
1946 spdk_file_read_async(struct spdk_file *file, struct spdk_io_channel *channel,
1947 		     void *payload, uint64_t offset, uint64_t length,
1948 		     spdk_file_op_complete cb_fn, void *cb_arg)
1949 {
1950 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s offset=%jx length=%jx\n",
1951 		      file->name, offset, length);
1952 	__readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 1);
1953 }
1954 
1955 void
1956 spdk_file_readv_async(struct spdk_file *file, struct spdk_io_channel *channel,
1957 		      struct iovec *iovs, uint32_t iovcnt, uint64_t offset, uint64_t length,
1958 		      spdk_file_op_complete cb_fn, void *cb_arg)
1959 {
1960 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s offset=%jx length=%jx\n",
1961 		      file->name, offset, length);
1962 
1963 	__readvwritev(file, channel, iovs, iovcnt, offset, length, cb_fn, cb_arg, 1);
1964 }
1965 
1966 struct spdk_io_channel *
1967 spdk_fs_alloc_io_channel(struct spdk_filesystem *fs)
1968 {
1969 	struct spdk_io_channel *io_channel;
1970 	struct spdk_fs_channel *fs_channel;
1971 
1972 	io_channel = spdk_get_io_channel(&fs->io_target);
1973 	fs_channel = spdk_io_channel_get_ctx(io_channel);
1974 	fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
1975 	fs_channel->send_request = __send_request_direct;
1976 
1977 	return io_channel;
1978 }
1979 
void
spdk_fs_free_io_channel(struct spdk_io_channel *channel)
{
	/* Drop the reference taken by spdk_fs_alloc_io_channel(). */
	spdk_put_io_channel(channel);
}
1985 
1986 struct spdk_fs_thread_ctx *
1987 spdk_fs_alloc_thread_ctx(struct spdk_filesystem *fs)
1988 {
1989 	struct spdk_fs_thread_ctx *ctx;
1990 
1991 	ctx = calloc(1, sizeof(*ctx));
1992 	if (!ctx) {
1993 		return NULL;
1994 	}
1995 
1996 	fs_channel_create(fs, &ctx->ch, 512);
1997 
1998 	ctx->ch.send_request = fs->send_request;
1999 	ctx->ch.sync = 1;
2000 	pthread_spin_init(&ctx->ch.lock, 0);
2001 
2002 	return ctx;
2003 }
2004 
2005 
2006 void
2007 spdk_fs_free_thread_ctx(struct spdk_fs_thread_ctx *ctx)
2008 {
2009 	assert(ctx->ch.sync == 1);
2010 
2011 	while (true) {
2012 		pthread_spin_lock(&ctx->ch.lock);
2013 		if (ctx->ch.outstanding_reqs == 0) {
2014 			pthread_spin_unlock(&ctx->ch.lock);
2015 			break;
2016 		}
2017 		pthread_spin_unlock(&ctx->ch.lock);
2018 		usleep(1000);
2019 	}
2020 
2021 	fs_channel_destroy(NULL, &ctx->ch);
2022 	free(ctx);
2023 }
2024 
2025 int
2026 spdk_fs_set_cache_size(uint64_t size_in_mb)
2027 {
2028 	/* setting g_fs_cache_size is only permitted if cache pool
2029 	 * is already freed or hasn't been initialized
2030 	 */
2031 	if (g_cache_pool != NULL) {
2032 		return -EPERM;
2033 	}
2034 
2035 	g_fs_cache_size = size_in_mb * 1024 * 1024;
2036 
2037 	return 0;
2038 }
2039 
2040 uint64_t
2041 spdk_fs_get_cache_size(void)
2042 {
2043 	return g_fs_cache_size / (1024 * 1024);
2044 }
2045 
2046 static void __file_flush(void *ctx);
2047 
/* Try to free some cache buffers from this file.
 * Returns 0 if buffers were reclaimed, -1 if the file lock was contended or
 * the file had nothing cached.
 */
static int
reclaim_cache_buffers(struct spdk_file *file)
{
	int rc;

	BLOBFS_TRACE(file, "free=%s\n", file->name);

	/* The function is safe to be called with any threads, while the file
	 * lock maybe locked by other thread for now, so try to get the file
	 * lock here.
	 */
	rc = pthread_spin_trylock(&file->lock);
	if (rc != 0) {
		return -1;
	}

	/* Nothing cached for this file - report failure so the caller moves
	 * on to the next candidate. */
	if (file->tree->present_mask == 0) {
		pthread_spin_unlock(&file->lock);
		return -1;
	}
	tree_free_buffers(file->tree);

	TAILQ_REMOVE(&g_caches, file, cache_tailq);
	/* If not freed, put it in the end of the queue */
	if (file->tree->present_mask != 0) {
		TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq);
	} else {
		/* Fully reclaimed: the file no longer has a last buffer. */
		file->last = NULL;
	}
	pthread_spin_unlock(&file->lock);

	return 0;
}
2083 
/* Poller on the cache pool thread: when the buffer pool runs low, scan the
 * cached-file list in three passes of increasing aggressiveness -
 * (1) low-priority files not open for writing, (2) any file not open for
 * writing, (3) any file at all.  Each pass stops after one successful
 * reclaim (or returns early once pressure subsides). */
static int
_blobfs_cache_pool_reclaim(void *arg)
{
	struct spdk_file *file, *tmp;
	int rc;

	if (!blobfs_cache_pool_need_reclaim()) {
		return 0;
	}

	/* Pass 1: low-priority files not being written. */
	TAILQ_FOREACH_SAFE(file, &g_caches, cache_tailq, tmp) {
		if (!file->open_for_writing &&
		    file->priority == SPDK_FILE_PRIORITY_LOW) {
			rc = reclaim_cache_buffers(file);
			if (rc < 0) {
				continue;
			}
			if (!blobfs_cache_pool_need_reclaim()) {
				return 1;
			}
			break;
		}
	}

	/* Pass 2: any file not being written. */
	TAILQ_FOREACH_SAFE(file, &g_caches, cache_tailq, tmp) {
		if (!file->open_for_writing) {
			rc = reclaim_cache_buffers(file);
			if (rc < 0) {
				continue;
			}
			if (!blobfs_cache_pool_need_reclaim()) {
				return 1;
			}
			break;
		}
	}

	/* Pass 3: last resort - reclaim from any file. */
	TAILQ_FOREACH_SAFE(file, &g_caches, cache_tailq, tmp) {
		rc = reclaim_cache_buffers(file);
		if (rc < 0) {
			continue;
		}
		break;
	}

	return 1;
}
2131 
2132 static void
2133 _add_file_to_cache_pool(void *ctx)
2134 {
2135 	struct spdk_file *file = ctx;
2136 
2137 	TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq);
2138 }
2139 
2140 static void
2141 _remove_file_from_cache_pool(void *ctx)
2142 {
2143 	struct spdk_file *file = ctx;
2144 
2145 	TAILQ_REMOVE(&g_caches, file, cache_tailq);
2146 }
2147 
/* Allocate a cache buffer backed by the global mempool and insert it into
 * the file's buffer tree at the given offset.  Polls the pool for up to
 * ~100 retries while reclaim runs on the cache pool thread; returns NULL
 * on allocation failure. */
static struct cache_buffer *
cache_insert_buffer(struct spdk_file *file, uint64_t offset)
{
	struct cache_buffer *buf;
	int count = 0;
	bool need_update = false;

	buf = calloc(1, sizeof(*buf));
	if (buf == NULL) {
		SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "calloc failed\n");
		return NULL;
	}

	do {
		buf->buf = spdk_mempool_get(g_cache_pool);
		if (buf->buf) {
			break;
		}
		/* Give reclaim a chance to free buffers; give up after 100
		 * sleep periods. */
		if (count++ == 100) {
			SPDK_ERRLOG("Could not allocate cache buffer for file=%p on offset=%jx\n",
				    file, offset);
			free(buf);
			return NULL;
		}
		usleep(BLOBFS_CACHE_POOL_POLL_PERIOD_IN_US);
	} while (true);

	buf->buf_size = CACHE_BUFFER_SIZE;
	buf->offset = offset;

	/* First buffer for this file: the file must also join the global
	 * reclaim list, which is done on the cache pool thread since that
	 * thread owns g_caches. */
	if (file->tree->present_mask == 0) {
		need_update = true;
	}
	file->tree = tree_insert_buffer(file->tree, buf);

	if (need_update) {
		spdk_thread_send_msg(g_cache_pool_thread, _add_file_to_cache_pool, file);
	}

	return buf;
}
2189 
2190 static struct cache_buffer *
2191 cache_append_buffer(struct spdk_file *file)
2192 {
2193 	struct cache_buffer *last;
2194 
2195 	assert(file->last == NULL || file->last->bytes_filled == file->last->buf_size);
2196 	assert((file->append_pos % CACHE_BUFFER_SIZE) == 0);
2197 
2198 	last = cache_insert_buffer(file, file->append_pos);
2199 	if (last == NULL) {
2200 		SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "cache_insert_buffer failed\n");
2201 		return NULL;
2202 	}
2203 
2204 	file->last = last;
2205 
2206 	return last;
2207 }
2208 
2209 static void __check_sync_reqs(struct spdk_file *file);
2210 
/*
 * Completion callback for the spdk_blob_sync_md() issued by
 * __check_sync_reqs().  Records the length now persisted in the "length"
 * xattr, completes the user's sync request, and re-checks the queue in
 * case another pending sync request became satisfiable.
 */
static void
__file_cache_finish_sync(void *ctx, int bserrno)
{
	struct spdk_file *file;
	struct spdk_fs_request *sync_req = ctx;
	struct spdk_fs_cb_args *sync_args;

	sync_args = &sync_req->args;
	file = sync_args->file;
	pthread_spin_lock(&file->lock);
	/* The length written to the xattr by __check_sync_reqs() is now durable. */
	file->length_xattr = sync_args->op.sync.length;
	assert(sync_args->op.sync.offset <= file->length_flushed);
	spdk_trace_record(TRACE_BLOBFS_XATTR_END, 0, sync_args->op.sync.offset,
			  0, file->trace_arg_name);
	BLOBFS_TRACE(file, "sync done offset=%jx\n", sync_args->op.sync.offset);
	TAILQ_REMOVE(&file->sync_requests, sync_req, args.op.sync.tailq);
	pthread_spin_unlock(&file->lock);

	/* Invoke the caller's completion outside of file->lock. */
	sync_args->fn.file_op(sync_args->arg, bserrno);

	free_fs_request(sync_req);
	__check_sync_reqs(file);
}
2234 
/*
 * Look for a pending sync request whose offset has already been flushed to
 * the blob.  If one is found and no xattr update is already in flight for
 * it, persist the flushed length in the "length" xattr and sync the blob
 * metadata; completion continues in __file_cache_finish_sync().
 */
static void
__check_sync_reqs(struct spdk_file *file)
{
	struct spdk_fs_request *sync_req;

	pthread_spin_lock(&file->lock);

	/* Pick the first request already covered by length_flushed.
	 * NOTE(review): requests are appended with the then-current append_pos,
	 * which presumably only grows — confirm offsets are non-decreasing. */
	TAILQ_FOREACH(sync_req, &file->sync_requests, args.op.sync.tailq) {
		if (sync_req->args.op.sync.offset <= file->length_flushed) {
			break;
		}
	}

	if (sync_req != NULL && !sync_req->args.op.sync.xattr_in_progress) {
		BLOBFS_TRACE(file, "set xattr length 0x%jx\n", file->length_flushed);
		/* Guard against issuing a second md sync for the same request. */
		sync_req->args.op.sync.xattr_in_progress = true;
		sync_req->args.op.sync.length = file->length_flushed;
		spdk_blob_set_xattr(file->blob, "length", &file->length_flushed,
				    sizeof(file->length_flushed));

		pthread_spin_unlock(&file->lock);
		spdk_trace_record(TRACE_BLOBFS_XATTR_START, 0, file->length_flushed,
				  0, file->trace_arg_name);
		spdk_blob_sync_md(file->blob, __file_cache_finish_sync, sync_req);
	} else {
		pthread_spin_unlock(&file->lock);
	}
}
2263 
/*
 * Blobstore write completion for one flush I/O.  Updates the flush
 * accounting on the cache buffer and the file, checks whether any pending
 * sync request can now complete, and re-arms __file_flush() to push the
 * next chunk (the request is reused; __file_flush() frees it when there is
 * nothing left to do).
 */
static void
__file_flush_done(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *file = args->file;
	struct cache_buffer *next = args->op.flush.cache_buffer;

	BLOBFS_TRACE(file, "length=%jx\n", args->op.flush.length);

	pthread_spin_lock(&file->lock);
	next->in_progress = false;
	next->bytes_flushed += args->op.flush.length;
	file->length_flushed += args->op.flush.length;
	if (file->length_flushed > file->length) {
		file->length = file->length_flushed;
	}
	if (next->bytes_flushed == next->buf_size) {
		/* This buffer is done; peek at the buffer covering the new
		 * flush frontier for the assertion below. */
		BLOBFS_TRACE(file, "write buffer fully flushed 0x%jx\n", file->length_flushed);
		next = tree_find_buffer(file->tree, file->length_flushed);
	}

	/*
	 * Assert that there is no cached data that extends past the end of the underlying
	 *  blob.
	 */
	assert(next == NULL || next->offset < __file_get_blob_size(file) ||
	       next->bytes_filled == 0);

	pthread_spin_unlock(&file->lock);

	__check_sync_reqs(file);

	__file_flush(req);
}
2299 
/*
 * Issue (or re-issue) a flush of the cache buffer at the file's current
 * flush frontier.  Frees @req and returns without I/O when there is nothing
 * flushable: no buffer at the frontier, a flush already in progress, or a
 * partially-filled buffer with no sync request forcing it out.  Completion
 * continues in __file_flush_done(), which calls back here to chain flushes.
 */
static void
__file_flush(void *ctx)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *file = args->file;
	struct cache_buffer *next;
	uint64_t offset, length, start_lba, num_lba;
	uint32_t lba_size;

	pthread_spin_lock(&file->lock);
	next = tree_find_buffer(file->tree, file->length_flushed);
	if (next == NULL || next->in_progress ||
	    ((next->bytes_filled < next->buf_size) && TAILQ_EMPTY(&file->sync_requests))) {
		/*
		 * There is either no data to flush, a flush I/O is already in
		 *  progress, or the next buffer is partially filled but there's no
		 *  outstanding request to sync it.
		 * So return immediately - if a flush I/O is in progress we will flush
		 *  more data after that is completed, or a partial buffer will get flushed
		 *  when it is either filled or the file is synced.
		 */
		free_fs_request(req);
		if (next == NULL) {
			/*
			 * For cases where a file's cache was evicted, and then the
			 *  file was later appended, we will write the data directly
			 *  to disk and bypass cache.  So just update length_flushed
			 *  here to reflect that all data was already written to disk.
			 */
			file->length_flushed = file->append_pos;
		}
		pthread_spin_unlock(&file->lock);
		if (next == NULL) {
			/*
			 * There is no data to flush, but we still need to check for any
			 *  outstanding sync requests to make sure metadata gets updated.
			 */
			__check_sync_reqs(file);
		}
		return;
	}

	/* Flush only the portion of the buffer that has been filled but not
	 * yet written out. */
	offset = next->offset + next->bytes_flushed;
	length = next->bytes_filled - next->bytes_flushed;
	if (length == 0) {
		free_fs_request(req);
		pthread_spin_unlock(&file->lock);
		/*
		 * There is no data to flush, but we still need to check for any
		 *  outstanding sync requests to make sure metadata gets updated.
		 */
		__check_sync_reqs(file);
		return;
	}
	args->op.flush.length = length;
	args->op.flush.cache_buffer = next;

	/* Round the byte range to LBA boundaries for the blobstore I/O. */
	__get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba);

	next->in_progress = true;
	BLOBFS_TRACE(file, "offset=0x%jx length=0x%jx page start=0x%jx num=0x%jx\n",
		     offset, length, start_lba, num_lba);
	pthread_spin_unlock(&file->lock);
	spdk_blob_io_write(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel,
			   next->buf + (start_lba * lba_size) - next->offset,
			   start_lba, num_lba, __file_flush_done, req);
}
2368 
/*
 * Completion for the metadata sync issued after a blob resize: wake the
 * caller blocked on the extend operation with the final status.
 */
static void
__file_extend_done(void *arg, int bserrno)
{
	__wake_caller(arg, bserrno);
}
2376 
2377 static void
2378 __file_extend_resize_cb(void *_args, int bserrno)
2379 {
2380 	struct spdk_fs_cb_args *args = _args;
2381 	struct spdk_file *file = args->file;
2382 
2383 	if (bserrno) {
2384 		__wake_caller(args, bserrno);
2385 		return;
2386 	}
2387 
2388 	spdk_blob_sync_md(file->blob, __file_extend_done, args);
2389 }
2390 
2391 static void
2392 __file_extend_blob(void *_args)
2393 {
2394 	struct spdk_fs_cb_args *args = _args;
2395 	struct spdk_file *file = args->file;
2396 
2397 	spdk_blob_resize(file->blob, args->op.resize.num_clusters, __file_extend_resize_cb, args);
2398 }
2399 
2400 static void
2401 __rw_from_file_done(void *ctx, int bserrno)
2402 {
2403 	struct spdk_fs_request *req = ctx;
2404 
2405 	__wake_caller(&req->args, bserrno);
2406 	free_fs_request(req);
2407 }
2408 
2409 static void
2410 __rw_from_file(void *ctx)
2411 {
2412 	struct spdk_fs_request *req = ctx;
2413 	struct spdk_fs_cb_args *args = &req->args;
2414 	struct spdk_file *file = args->file;
2415 
2416 	if (args->op.rw.is_read) {
2417 		spdk_file_read_async(file, file->fs->sync_target.sync_io_channel, args->iovs[0].iov_base,
2418 				     args->op.rw.offset, (uint64_t)args->iovs[0].iov_len,
2419 				     __rw_from_file_done, req);
2420 	} else {
2421 		spdk_file_write_async(file, file->fs->sync_target.sync_io_channel, args->iovs[0].iov_base,
2422 				      args->op.rw.offset, (uint64_t)args->iovs[0].iov_len,
2423 				      __rw_from_file_done, req);
2424 	}
2425 }
2426 
2427 static int
2428 __send_rw_from_file(struct spdk_file *file, void *payload,
2429 		    uint64_t offset, uint64_t length, bool is_read,
2430 		    struct spdk_fs_channel *channel)
2431 {
2432 	struct spdk_fs_request *req;
2433 	struct spdk_fs_cb_args *args;
2434 
2435 	req = alloc_fs_request_with_iov(channel, 1);
2436 	if (req == NULL) {
2437 		sem_post(&channel->sem);
2438 		return -ENOMEM;
2439 	}
2440 
2441 	args = &req->args;
2442 	args->file = file;
2443 	args->sem = &channel->sem;
2444 	args->iovs[0].iov_base = payload;
2445 	args->iovs[0].iov_len = (size_t)length;
2446 	args->op.rw.offset = offset;
2447 	args->op.rw.is_read = is_read;
2448 	file->fs->send_request(__rw_from_file, req);
2449 	return 0;
2450 }
2451 
2452 int
2453 spdk_file_write(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx,
2454 		void *payload, uint64_t offset, uint64_t length)
2455 {
2456 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
2457 	struct spdk_fs_request *flush_req;
2458 	uint64_t rem_length, copy, blob_size, cluster_sz;
2459 	uint32_t cache_buffers_filled = 0;
2460 	uint8_t *cur_payload;
2461 	struct cache_buffer *last;
2462 
2463 	BLOBFS_TRACE_RW(file, "offset=%jx length=%jx\n", offset, length);
2464 
2465 	if (length == 0) {
2466 		return 0;
2467 	}
2468 
2469 	if (offset != file->append_pos) {
2470 		BLOBFS_TRACE(file, " error offset=%jx append_pos=%jx\n", offset, file->append_pos);
2471 		return -EINVAL;
2472 	}
2473 
2474 	pthread_spin_lock(&file->lock);
2475 	file->open_for_writing = true;
2476 
2477 	if ((file->last == NULL) && (file->append_pos % CACHE_BUFFER_SIZE == 0)) {
2478 		cache_append_buffer(file);
2479 	}
2480 
2481 	if (file->last == NULL) {
2482 		int rc;
2483 
2484 		file->append_pos += length;
2485 		pthread_spin_unlock(&file->lock);
2486 		rc = __send_rw_from_file(file, payload, offset, length, false, channel);
2487 		sem_wait(&channel->sem);
2488 		return rc;
2489 	}
2490 
2491 	blob_size = __file_get_blob_size(file);
2492 
2493 	if ((offset + length) > blob_size) {
2494 		struct spdk_fs_cb_args extend_args = {};
2495 
2496 		cluster_sz = file->fs->bs_opts.cluster_sz;
2497 		extend_args.sem = &channel->sem;
2498 		extend_args.op.resize.num_clusters = __bytes_to_clusters((offset + length), cluster_sz);
2499 		extend_args.file = file;
2500 		BLOBFS_TRACE(file, "start resize to %u clusters\n", extend_args.op.resize.num_clusters);
2501 		pthread_spin_unlock(&file->lock);
2502 		file->fs->send_request(__file_extend_blob, &extend_args);
2503 		sem_wait(&channel->sem);
2504 		if (extend_args.rc) {
2505 			return extend_args.rc;
2506 		}
2507 	}
2508 
2509 	flush_req = alloc_fs_request(channel);
2510 	if (flush_req == NULL) {
2511 		pthread_spin_unlock(&file->lock);
2512 		return -ENOMEM;
2513 	}
2514 
2515 	last = file->last;
2516 	rem_length = length;
2517 	cur_payload = payload;
2518 	while (rem_length > 0) {
2519 		copy = last->buf_size - last->bytes_filled;
2520 		if (copy > rem_length) {
2521 			copy = rem_length;
2522 		}
2523 		BLOBFS_TRACE_RW(file, "  fill offset=%jx length=%jx\n", file->append_pos, copy);
2524 		memcpy(&last->buf[last->bytes_filled], cur_payload, copy);
2525 		file->append_pos += copy;
2526 		if (file->length < file->append_pos) {
2527 			file->length = file->append_pos;
2528 		}
2529 		cur_payload += copy;
2530 		last->bytes_filled += copy;
2531 		rem_length -= copy;
2532 		if (last->bytes_filled == last->buf_size) {
2533 			cache_buffers_filled++;
2534 			last = cache_append_buffer(file);
2535 			if (last == NULL) {
2536 				BLOBFS_TRACE(file, "nomem\n");
2537 				free_fs_request(flush_req);
2538 				pthread_spin_unlock(&file->lock);
2539 				return -ENOMEM;
2540 			}
2541 		}
2542 	}
2543 
2544 	pthread_spin_unlock(&file->lock);
2545 
2546 	if (cache_buffers_filled == 0) {
2547 		free_fs_request(flush_req);
2548 		return 0;
2549 	}
2550 
2551 	flush_req->args.file = file;
2552 	file->fs->send_request(__file_flush, flush_req);
2553 	return 0;
2554 }
2555 
2556 static void
2557 __readahead_done(void *ctx, int bserrno)
2558 {
2559 	struct spdk_fs_request *req = ctx;
2560 	struct spdk_fs_cb_args *args = &req->args;
2561 	struct cache_buffer *cache_buffer = args->op.readahead.cache_buffer;
2562 	struct spdk_file *file = args->file;
2563 
2564 	BLOBFS_TRACE(file, "offset=%jx\n", cache_buffer->offset);
2565 
2566 	pthread_spin_lock(&file->lock);
2567 	cache_buffer->bytes_filled = args->op.readahead.length;
2568 	cache_buffer->bytes_flushed = args->op.readahead.length;
2569 	cache_buffer->in_progress = false;
2570 	pthread_spin_unlock(&file->lock);
2571 
2572 	free_fs_request(req);
2573 }
2574 
2575 static void
2576 __readahead(void *ctx)
2577 {
2578 	struct spdk_fs_request *req = ctx;
2579 	struct spdk_fs_cb_args *args = &req->args;
2580 	struct spdk_file *file = args->file;
2581 	uint64_t offset, length, start_lba, num_lba;
2582 	uint32_t lba_size;
2583 
2584 	offset = args->op.readahead.offset;
2585 	length = args->op.readahead.length;
2586 	assert(length > 0);
2587 
2588 	__get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba);
2589 
2590 	BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n",
2591 		     offset, length, start_lba, num_lba);
2592 	spdk_blob_io_read(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel,
2593 			  args->op.readahead.cache_buffer->buf,
2594 			  start_lba, num_lba, __readahead_done, req);
2595 }
2596 
2597 static uint64_t
2598 __next_cache_buffer_offset(uint64_t offset)
2599 {
2600 	return (offset + CACHE_BUFFER_SIZE) & ~(CACHE_TREE_LEVEL_MASK(0));
2601 }
2602 
/*
 * Start a readahead of the cache buffer following @offset, unless that
 * buffer is already cached or lies past end-of-file.  Best effort: silently
 * skips on request or buffer allocation failure.
 *
 * NOTE(review): the visible caller (spdk_file_read) holds file->lock here —
 * confirm that is required before the tree_find_buffer/cache_insert_buffer
 * calls below.
 */
static void
check_readahead(struct spdk_file *file, uint64_t offset,
		struct spdk_fs_channel *channel)
{
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	offset = __next_cache_buffer_offset(offset);
	if (tree_find_buffer(file->tree, offset) != NULL || file->length <= offset) {
		return;
	}

	req = alloc_fs_request(channel);
	if (req == NULL) {
		return;
	}
	args = &req->args;

	BLOBFS_TRACE(file, "offset=%jx\n", offset);

	args->file = file;
	args->op.readahead.offset = offset;
	args->op.readahead.cache_buffer = cache_insert_buffer(file, offset);
	if (!args->op.readahead.cache_buffer) {
		BLOBFS_TRACE(file, "Cannot allocate buf for offset=%jx\n", offset);
		free_fs_request(req);
		return;
	}

	/* Mark in-progress so readers/flushers skip this buffer until the
	 * readahead I/O completes. */
	args->op.readahead.cache_buffer->in_progress = true;
	/* The final buffer of the file may be only partially valid. */
	if (file->length < (offset + CACHE_BUFFER_SIZE)) {
		args->op.readahead.length = file->length & (CACHE_BUFFER_SIZE - 1);
	} else {
		args->op.readahead.length = CACHE_BUFFER_SIZE;
	}
	file->fs->send_request(__readahead, req);
}
2640 
/*
 * Synchronous read of up to @length bytes at @offset.  Reads are served
 * from cache buffers when possible; cache misses fall back to direct reads
 * dispatched via __send_rw_from_file() and waited on at the end.  Detects
 * sequential access and triggers readahead.  Returns the number of bytes
 * read (possibly short at end-of-data) or a negative errno.
 */
int64_t
spdk_file_read(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx,
	       void *payload, uint64_t offset, uint64_t length)
{
	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
	uint64_t final_offset, final_length;
	uint32_t sub_reads = 0;
	struct cache_buffer *buf;
	uint64_t read_len;
	int rc = 0;

	pthread_spin_lock(&file->lock);

	BLOBFS_TRACE_RW(file, "offset=%ju length=%ju\n", offset, length);

	file->open_for_writing = false;

	if (length == 0 || offset >= file->append_pos) {
		pthread_spin_unlock(&file->lock);
		return 0;
	}

	/* Clamp the read to the data actually appended so far. */
	if (offset + length > file->append_pos) {
		length = file->append_pos - offset;
	}

	/* Track sequential access; once enough consecutive bytes have been
	 * read, prefetch the next two cache buffers. */
	if (offset != file->next_seq_offset) {
		file->seq_byte_count = 0;
	}
	file->seq_byte_count += length;
	file->next_seq_offset = offset + length;
	if (file->seq_byte_count >= CACHE_READAHEAD_THRESHOLD) {
		check_readahead(file, offset, channel);
		check_readahead(file, offset + CACHE_BUFFER_SIZE, channel);
	}

	/* Walk the range one cache-buffer-sized chunk at a time. */
	final_length = 0;
	final_offset = offset + length;
	while (offset < final_offset) {
		length = NEXT_CACHE_BUFFER_OFFSET(offset) - offset;
		if (length > (final_offset - offset)) {
			length = final_offset - offset;
		}

		buf = tree_find_filled_buffer(file->tree, offset);
		if (buf == NULL) {
			/* Cache miss: issue a direct read.  The lock is dropped
			 * while dispatching; completion is awaited below via
			 * the channel semaphore (one wait per sub_read). */
			pthread_spin_unlock(&file->lock);
			rc = __send_rw_from_file(file, payload, offset, length, true, channel);
			pthread_spin_lock(&file->lock);
			if (rc == 0) {
				sub_reads++;
			}
		} else {
			/* Cache hit: copy from the buffer, trimming to the
			 * portion actually filled. */
			read_len = length;
			if ((offset + length) > (buf->offset + buf->bytes_filled)) {
				read_len = buf->offset + buf->bytes_filled - offset;
			}
			BLOBFS_TRACE(file, "read %p offset=%ju length=%ju\n", payload, offset, read_len);
			memcpy(payload, &buf->buf[offset - buf->offset], read_len);
			/* Once a buffer has been fully consumed, release it; if
			 * it was the file's last buffer, deregister the file
			 * from the cache pool thread. */
			if ((offset + read_len) % CACHE_BUFFER_SIZE == 0) {
				tree_remove_buffer(file->tree, buf);
				if (file->tree->present_mask == 0) {
					spdk_thread_send_msg(g_cache_pool_thread, _remove_file_from_cache_pool, file);
				}
			}
		}

		if (rc == 0) {
			final_length += length;
		} else {
			break;
		}
		/* NOTE: pointer arithmetic on void* relies on the GNU C extension
		 * (byte-sized void*), consistent with the rest of this file. */
		payload += length;
		offset += length;
	}
	pthread_spin_unlock(&file->lock);
	/* Wait for every direct read dispatched above to complete. */
	while (sub_reads > 0) {
		sem_wait(&channel->sem);
		sub_reads--;
	}
	if (rc == 0) {
		return final_length;
	} else {
		return rc;
	}
}
2727 
/*
 * Common sync implementation.  Queues a sync request recording the current
 * append position and kicks a flush; once the flush catches up to that
 * offset, __check_sync_reqs() persists the length xattr and the sync
 * completes via cb_fn.  Completes immediately with 0 if the append position
 * is already covered by the persisted length xattr.
 */
static void
_file_sync(struct spdk_file *file, struct spdk_fs_channel *channel,
	   spdk_file_op_complete cb_fn, void *cb_arg)
{
	struct spdk_fs_request *sync_req;
	struct spdk_fs_request *flush_req;
	struct spdk_fs_cb_args *sync_args;
	struct spdk_fs_cb_args *flush_args;

	BLOBFS_TRACE(file, "offset=%jx\n", file->append_pos);

	pthread_spin_lock(&file->lock);
	if (file->append_pos <= file->length_xattr) {
		/* Everything up to append_pos is already durable. */
		BLOBFS_TRACE(file, "done - file already synced\n");
		pthread_spin_unlock(&file->lock);
		cb_fn(cb_arg, 0);
		return;
	}

	sync_req = alloc_fs_request(channel);
	if (!sync_req) {
		SPDK_ERRLOG("Cannot allocate sync req for file=%s\n", file->name);
		pthread_spin_unlock(&file->lock);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}
	sync_args = &sync_req->args;

	flush_req = alloc_fs_request(channel);
	if (!flush_req) {
		SPDK_ERRLOG("Cannot allocate flush req for file=%s\n", file->name);
		free_fs_request(sync_req);
		pthread_spin_unlock(&file->lock);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}
	flush_args = &flush_req->args;

	/* The sync is satisfied once length_flushed reaches this offset;
	 * see __check_sync_reqs(). */
	sync_args->file = file;
	sync_args->fn.file_op = cb_fn;
	sync_args->arg = cb_arg;
	sync_args->op.sync.offset = file->append_pos;
	sync_args->op.sync.xattr_in_progress = false;
	TAILQ_INSERT_TAIL(&file->sync_requests, sync_req, args.op.sync.tailq);
	pthread_spin_unlock(&file->lock);

	/* Kick a flush so partially-filled buffers get pushed out. */
	flush_args->file = file;
	channel->send_request(__file_flush, flush_req);
}
2777 
2778 int
2779 spdk_file_sync(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx)
2780 {
2781 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
2782 	struct spdk_fs_cb_args args = {};
2783 
2784 	args.sem = &channel->sem;
2785 	_file_sync(file, channel, __wake_caller, &args);
2786 	sem_wait(&channel->sem);
2787 
2788 	return args.rc;
2789 }
2790 
2791 void
2792 spdk_file_sync_async(struct spdk_file *file, struct spdk_io_channel *_channel,
2793 		     spdk_file_op_complete cb_fn, void *cb_arg)
2794 {
2795 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
2796 
2797 	_file_sync(file, channel, cb_fn, cb_arg);
2798 }
2799 
2800 void
2801 spdk_file_set_priority(struct spdk_file *file, uint32_t priority)
2802 {
2803 	BLOBFS_TRACE(file, "priority=%u\n", priority);
2804 	file->priority = priority;
2805 
2806 }
2807 
2808 /*
2809  * Close routines
2810  */
2811 
/*
 * Final stage of an async close (after the blob is closed).  If the file
 * was marked deleted while open, start the deferred delete instead of
 * completing here; in that case blob_delete_cb takes ownership of @ctx and
 * is responsible for the completion.
 */
static void
__file_close_async_done(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *file = args->file;

	spdk_trace_record(TRACE_BLOBFS_CLOSE, 0, 0, 0, file->trace_arg_name);

	if (file->is_deleted) {
		/* Deferred delete: the request is handed off, not freed here. */
		spdk_fs_delete_file_async(file->fs, file->name, blob_delete_cb, ctx);
		return;
	}

	args->fn.file_op(args->arg, bserrno);
	free_fs_request(req);
}
2829 
/*
 * Drop one reference on @file.  Fails with -EBADF if the file is not open;
 * completes immediately while other references remain; on the last
 * reference, closes the underlying blob and finishes in
 * __file_close_async_done().
 */
static void
__file_close_async(struct spdk_file *file, struct spdk_fs_request *req)
{
	struct spdk_blob *blob;

	pthread_spin_lock(&file->lock);
	if (file->ref_count == 0) {
		pthread_spin_unlock(&file->lock);
		__file_close_async_done(req, -EBADF);
		return;
	}

	file->ref_count--;
	if (file->ref_count > 0) {
		/* Other openers remain; nothing to tear down yet. */
		pthread_spin_unlock(&file->lock);
		req->args.fn.file_op(req->args.arg, 0);
		free_fs_request(req);
		return;
	}

	pthread_spin_unlock(&file->lock);

	/* Last reference: detach and close the blob. */
	blob = file->blob;
	file->blob = NULL;
	spdk_blob_close(blob, __file_close_async_done, req);
}
2856 
2857 static void
2858 __file_close_async__sync_done(void *arg, int fserrno)
2859 {
2860 	struct spdk_fs_request *req = arg;
2861 	struct spdk_fs_cb_args *args = &req->args;
2862 
2863 	__file_close_async(args->file, req);
2864 }
2865 
2866 void
2867 spdk_file_close_async(struct spdk_file *file, spdk_file_op_complete cb_fn, void *cb_arg)
2868 {
2869 	struct spdk_fs_request *req;
2870 	struct spdk_fs_cb_args *args;
2871 
2872 	req = alloc_fs_request(file->fs->md_target.md_fs_channel);
2873 	if (req == NULL) {
2874 		SPDK_ERRLOG("Cannot allocate close async req for file=%s\n", file->name);
2875 		cb_fn(cb_arg, -ENOMEM);
2876 		return;
2877 	}
2878 
2879 	args = &req->args;
2880 	args->file = file;
2881 	args->fn.file_op = cb_fn;
2882 	args->arg = cb_arg;
2883 
2884 	spdk_file_sync_async(file, file->fs->md_target.md_io_channel, __file_close_async__sync_done, req);
2885 }
2886 
2887 static void
2888 __file_close(void *arg)
2889 {
2890 	struct spdk_fs_request *req = arg;
2891 	struct spdk_fs_cb_args *args = &req->args;
2892 	struct spdk_file *file = args->file;
2893 
2894 	__file_close_async(file, req);
2895 }
2896 
/*
 * Synchronous close: sync the file, dispatch the close to the request
 * thread, and block on the channel semaphore until it completes.  Returns
 * the close status.
 *
 * NOTE(review): the return value of spdk_file_sync() is ignored here, and
 * the close proceeds regardless — confirm this best-effort behavior is
 * intended.
 */
int
spdk_file_close(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx)
{
	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	req = alloc_fs_request(channel);
	if (req == NULL) {
		SPDK_ERRLOG("Cannot allocate close req for file=%s\n", file->name);
		return -ENOMEM;
	}

	args = &req->args;

	spdk_file_sync(file, ctx);
	BLOBFS_TRACE(file, "name=%s\n", file->name);
	args->file = file;
	args->sem = &channel->sem;
	args->fn.file_op = __wake_caller;
	args->arg = args;
	channel->send_request(__file_close, req);
	sem_wait(&channel->sem);

	return args->rc;
}
2923 
2924 int
2925 spdk_file_get_id(struct spdk_file *file, void *id, size_t size)
2926 {
2927 	if (size < sizeof(spdk_blob_id)) {
2928 		return -EINVAL;
2929 	}
2930 
2931 	memcpy(id, &file->blobid, sizeof(spdk_blob_id));
2932 
2933 	return sizeof(spdk_blob_id);
2934 }
2935 
2936 static void
2937 _file_free(void *ctx)
2938 {
2939 	struct spdk_file *file = ctx;
2940 
2941 	TAILQ_REMOVE(&g_caches, file, cache_tailq);
2942 
2943 	free(file->name);
2944 	free(file->tree);
2945 	free(file);
2946 }
2947 
/*
 * Free a file object.  If the file holds no cache buffers it is freed
 * inline; otherwise its buffers are released and the final free (including
 * removal from the cache pool's file list) is forwarded to the cache pool
 * thread via _file_free().
 */
static void
file_free(struct spdk_file *file)
{
	BLOBFS_TRACE(file, "free=%s\n", file->name);
	pthread_spin_lock(&file->lock);
	if (file->tree->present_mask == 0) {
		/* No cached buffers: the file was never registered with the
		 * cache pool thread, so it can be freed directly. */
		pthread_spin_unlock(&file->lock);
		free(file->name);
		free(file->tree);
		free(file);
		return;
	}

	tree_free_buffers(file->tree);
	assert(file->tree->present_mask == 0);
	/* Final free must run on the cache pool thread (g_caches removal). */
	spdk_thread_send_msg(g_cache_pool_thread, _file_free, file);
	pthread_spin_unlock(&file->lock);
}
2966 
2967 SPDK_LOG_REGISTER_COMPONENT("blobfs", SPDK_LOG_BLOBFS)
2968 SPDK_LOG_REGISTER_COMPONENT("blobfs_rw", SPDK_LOG_BLOBFS_RW)
2969