xref: /spdk/lib/blobfs/blobfs.c (revision 9c48768aa0d6599ef68057efe547aa9adfb328d4)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 
36 #include "spdk/blobfs.h"
37 #include "spdk/conf.h"
38 #include "blobfs_internal.h"
39 
40 #include "spdk/queue.h"
41 #include "spdk/thread.h"
42 #include "spdk/assert.h"
43 #include "spdk/env.h"
44 #include "spdk/util.h"
45 #include "spdk_internal/log.h"
46 #include "spdk/trace.h"
47 
48 #define BLOBFS_TRACE(file, str, args...) \
49 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s " str, file->name, ##args)
50 
51 #define BLOBFS_TRACE_RW(file, str, args...) \
52 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS_RW, "file=%s " str, file->name, ##args)
53 
54 #define BLOBFS_DEFAULT_CACHE_SIZE (4ULL * 1024 * 1024 * 1024)
55 #define SPDK_BLOBFS_DEFAULT_OPTS_CLUSTER_SZ (1024 * 1024)
56 
57 static uint64_t g_fs_cache_size = BLOBFS_DEFAULT_CACHE_SIZE;
58 static struct spdk_mempool *g_cache_pool;
59 static TAILQ_HEAD(, spdk_file) g_caches;
60 static int g_fs_count = 0;
61 static pthread_mutex_t g_cache_init_lock = PTHREAD_MUTEX_INITIALIZER;
62 static pthread_spinlock_t g_caches_lock;
63 
/*
 * Trace group used by blobfs, followed by the ids of the individual
 * tracepoints that blobfs_trace() registers in that group.
 */
#define TRACE_GROUP_BLOBFS	0x7
#define TRACE_BLOBFS_XATTR_START	SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x0)
#define TRACE_BLOBFS_XATTR_END		SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x1)
#define TRACE_BLOBFS_OPEN		SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x2)
#define TRACE_BLOBFS_CLOSE		SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x3)
#define TRACE_BLOBFS_DELETE_START	SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x4)
#define TRACE_BLOBFS_DELETE_DONE	SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x5)
71 
72 SPDK_TRACE_REGISTER_FN(blobfs_trace, "blobfs", TRACE_GROUP_BLOBFS)
73 {
74 	spdk_trace_register_description("BLOBFS_XATTR_START",
75 					TRACE_BLOBFS_XATTR_START,
76 					OWNER_NONE, OBJECT_NONE, 0,
77 					SPDK_TRACE_ARG_TYPE_STR,
78 					"file:    ");
79 	spdk_trace_register_description("BLOBFS_XATTR_END",
80 					TRACE_BLOBFS_XATTR_END,
81 					OWNER_NONE, OBJECT_NONE, 0,
82 					SPDK_TRACE_ARG_TYPE_STR,
83 					"file:    ");
84 	spdk_trace_register_description("BLOBFS_OPEN",
85 					TRACE_BLOBFS_OPEN,
86 					OWNER_NONE, OBJECT_NONE, 0,
87 					SPDK_TRACE_ARG_TYPE_STR,
88 					"file:    ");
89 	spdk_trace_register_description("BLOBFS_CLOSE",
90 					TRACE_BLOBFS_CLOSE,
91 					OWNER_NONE, OBJECT_NONE, 0,
92 					SPDK_TRACE_ARG_TYPE_STR,
93 					"file:    ");
94 	spdk_trace_register_description("BLOBFS_DELETE_START",
95 					TRACE_BLOBFS_DELETE_START,
96 					OWNER_NONE, OBJECT_NONE, 0,
97 					SPDK_TRACE_ARG_TYPE_STR,
98 					"file:    ");
99 	spdk_trace_register_description("BLOBFS_DELETE_DONE",
100 					TRACE_BLOBFS_DELETE_DONE,
101 					OWNER_NONE, OBJECT_NONE, 0,
102 					SPDK_TRACE_ARG_TYPE_STR,
103 					"file:    ");
104 }
105 
106 void
107 spdk_cache_buffer_free(struct cache_buffer *cache_buffer)
108 {
109 	spdk_mempool_put(g_cache_pool, cache_buffer->buf);
110 	free(cache_buffer);
111 }
112 
113 #define CACHE_READAHEAD_THRESHOLD	(128 * 1024)
114 
115 struct spdk_file {
116 	struct spdk_filesystem	*fs;
117 	struct spdk_blob	*blob;
118 	char			*name;
119 	uint64_t		trace_arg_name;
120 	uint64_t		length;
121 	bool                    is_deleted;
122 	bool			open_for_writing;
123 	uint64_t		length_flushed;
124 	uint64_t		length_xattr;
125 	uint64_t		append_pos;
126 	uint64_t		seq_byte_count;
127 	uint64_t		next_seq_offset;
128 	uint32_t		priority;
129 	TAILQ_ENTRY(spdk_file)	tailq;
130 	spdk_blob_id		blobid;
131 	uint32_t		ref_count;
132 	pthread_spinlock_t	lock;
133 	struct cache_buffer	*last;
134 	struct cache_tree	*tree;
135 	TAILQ_HEAD(open_requests_head, spdk_fs_request) open_requests;
136 	TAILQ_HEAD(sync_requests_head, spdk_fs_request) sync_requests;
137 	TAILQ_ENTRY(spdk_file)	cache_tailq;
138 };
139 
140 struct spdk_deleted_file {
141 	spdk_blob_id	id;
142 	TAILQ_ENTRY(spdk_deleted_file)	tailq;
143 };
144 
145 struct spdk_filesystem {
146 	struct spdk_blob_store	*bs;
147 	TAILQ_HEAD(, spdk_file)	files;
148 	struct spdk_bs_opts	bs_opts;
149 	struct spdk_bs_dev	*bdev;
150 	fs_send_request_fn	send_request;
151 
152 	struct {
153 		uint32_t		max_ops;
154 		struct spdk_io_channel	*sync_io_channel;
155 		struct spdk_fs_channel	*sync_fs_channel;
156 	} sync_target;
157 
158 	struct {
159 		uint32_t		max_ops;
160 		struct spdk_io_channel	*md_io_channel;
161 		struct spdk_fs_channel	*md_fs_channel;
162 	} md_target;
163 
164 	struct {
165 		uint32_t		max_ops;
166 	} io_target;
167 };
168 
169 struct spdk_fs_cb_args {
170 	union {
171 		spdk_fs_op_with_handle_complete		fs_op_with_handle;
172 		spdk_fs_op_complete			fs_op;
173 		spdk_file_op_with_handle_complete	file_op_with_handle;
174 		spdk_file_op_complete			file_op;
175 		spdk_file_stat_op_complete		stat_op;
176 	} fn;
177 	void *arg;
178 	sem_t *sem;
179 	struct spdk_filesystem *fs;
180 	struct spdk_file *file;
181 	int rc;
182 	struct iovec *iovs;
183 	uint32_t iovcnt;
184 	struct iovec iov;
185 	union {
186 		struct {
187 			TAILQ_HEAD(, spdk_deleted_file)	deleted_files;
188 		} fs_load;
189 		struct {
190 			uint64_t	length;
191 		} truncate;
192 		struct {
193 			struct spdk_io_channel	*channel;
194 			void		*pin_buf;
195 			int		is_read;
196 			off_t		offset;
197 			size_t		length;
198 			uint64_t	start_lba;
199 			uint64_t	num_lba;
200 			uint32_t	blocklen;
201 		} rw;
202 		struct {
203 			const char	*old_name;
204 			const char	*new_name;
205 		} rename;
206 		struct {
207 			struct cache_buffer	*cache_buffer;
208 			uint64_t		length;
209 		} flush;
210 		struct {
211 			struct cache_buffer	*cache_buffer;
212 			uint64_t		length;
213 			uint64_t		offset;
214 		} readahead;
215 		struct {
216 			/* offset of the file when the sync request was made */
217 			uint64_t			offset;
218 			TAILQ_ENTRY(spdk_fs_request)	tailq;
219 			bool				xattr_in_progress;
220 			/* length written to the xattr for this file - this should
221 			 * always be the same as the offset if only one thread is
222 			 * writing to the file, but could differ if multiple threads
223 			 * are appending
224 			 */
225 			uint64_t			length;
226 		} sync;
227 		struct {
228 			uint32_t			num_clusters;
229 		} resize;
230 		struct {
231 			const char	*name;
232 			uint32_t	flags;
233 			TAILQ_ENTRY(spdk_fs_request)	tailq;
234 		} open;
235 		struct {
236 			const char		*name;
237 			struct spdk_blob	*blob;
238 		} create;
239 		struct {
240 			const char	*name;
241 		} delete;
242 		struct {
243 			const char	*name;
244 		} stat;
245 	} op;
246 };
247 
248 static void cache_free_buffers(struct spdk_file *file);
249 static void spdk_fs_io_device_unregister(struct spdk_filesystem *fs);
250 static void spdk_fs_free_io_channels(struct spdk_filesystem *fs);
251 
252 void
253 spdk_fs_opts_init(struct spdk_blobfs_opts *opts)
254 {
255 	opts->cluster_sz = SPDK_BLOBFS_DEFAULT_OPTS_CLUSTER_SZ;
256 }
257 
258 static void
259 __initialize_cache(void)
260 {
261 	assert(g_cache_pool == NULL);
262 
263 	g_cache_pool = spdk_mempool_create("spdk_fs_cache",
264 					   g_fs_cache_size / CACHE_BUFFER_SIZE,
265 					   CACHE_BUFFER_SIZE,
266 					   SPDK_MEMPOOL_DEFAULT_CACHE_SIZE,
267 					   SPDK_ENV_SOCKET_ID_ANY);
268 	if (!g_cache_pool) {
269 		SPDK_ERRLOG("Create mempool failed, you may "
270 			    "increase the memory and try again\n");
271 		assert(false);
272 	}
273 	TAILQ_INIT(&g_caches);
274 	pthread_spin_init(&g_caches_lock, 0);
275 }
276 
277 static void
278 __free_cache(void)
279 {
280 	assert(g_cache_pool != NULL);
281 
282 	spdk_mempool_free(g_cache_pool);
283 	g_cache_pool = NULL;
284 }
285 
286 static uint64_t
287 __file_get_blob_size(struct spdk_file *file)
288 {
289 	uint64_t cluster_sz;
290 
291 	cluster_sz = file->fs->bs_opts.cluster_sz;
292 	return cluster_sz * spdk_blob_get_num_clusters(file->blob);
293 }
294 
295 struct spdk_fs_request {
296 	struct spdk_fs_cb_args		args;
297 	TAILQ_ENTRY(spdk_fs_request)	link;
298 	struct spdk_fs_channel		*channel;
299 };
300 
301 struct spdk_fs_channel {
302 	struct spdk_fs_request		*req_mem;
303 	TAILQ_HEAD(, spdk_fs_request)	reqs;
304 	sem_t				sem;
305 	struct spdk_filesystem		*fs;
306 	struct spdk_io_channel		*bs_channel;
307 	fs_send_request_fn		send_request;
308 	bool				sync;
309 	uint32_t			outstanding_reqs;
310 	pthread_spinlock_t		lock;
311 };
312 
313 /* For now, this is effectively an alias. But eventually we'll shift
314  * some data members over. */
315 struct spdk_fs_thread_ctx {
316 	struct spdk_fs_channel	ch;
317 };
318 
319 static struct spdk_fs_request *
320 alloc_fs_request_with_iov(struct spdk_fs_channel *channel, uint32_t iovcnt)
321 {
322 	struct spdk_fs_request *req;
323 	struct iovec *iovs = NULL;
324 
325 	if (iovcnt > 1) {
326 		iovs = calloc(iovcnt, sizeof(struct iovec));
327 		if (!iovs) {
328 			return NULL;
329 		}
330 	}
331 
332 	if (channel->sync) {
333 		pthread_spin_lock(&channel->lock);
334 	}
335 
336 	req = TAILQ_FIRST(&channel->reqs);
337 	if (req) {
338 		channel->outstanding_reqs++;
339 		TAILQ_REMOVE(&channel->reqs, req, link);
340 	}
341 
342 	if (channel->sync) {
343 		pthread_spin_unlock(&channel->lock);
344 	}
345 
346 	if (req == NULL) {
347 		SPDK_ERRLOG("Cannot allocate req on spdk_fs_channel =%p\n", channel);
348 		free(iovs);
349 		return NULL;
350 	}
351 	memset(req, 0, sizeof(*req));
352 	req->channel = channel;
353 	if (iovcnt > 1) {
354 		req->args.iovs = iovs;
355 	} else {
356 		req->args.iovs = &req->args.iov;
357 	}
358 	req->args.iovcnt = iovcnt;
359 
360 	return req;
361 }
362 
/*
 * Take a request that needs no iovec array (iovcnt of 0) from the channel.
 * Returns NULL when the channel's pool is exhausted.
 */
static struct spdk_fs_request *
alloc_fs_request(struct spdk_fs_channel *channel)
{
	const uint32_t no_iovs = 0;

	return alloc_fs_request_with_iov(channel, no_iovs);
}
368 
369 static void
370 free_fs_request(struct spdk_fs_request *req)
371 {
372 	struct spdk_fs_channel *channel = req->channel;
373 
374 	if (req->args.iovcnt > 1) {
375 		free(req->args.iovs);
376 	}
377 
378 	if (channel->sync) {
379 		pthread_spin_lock(&channel->lock);
380 	}
381 
382 	TAILQ_INSERT_HEAD(&req->channel->reqs, req, link);
383 	channel->outstanding_reqs--;
384 
385 	if (channel->sync) {
386 		pthread_spin_unlock(&channel->lock);
387 	}
388 }
389 
390 static int
391 _spdk_fs_channel_create(struct spdk_filesystem *fs, struct spdk_fs_channel *channel,
392 			uint32_t max_ops)
393 {
394 	uint32_t i;
395 
396 	channel->req_mem = calloc(max_ops, sizeof(struct spdk_fs_request));
397 	if (!channel->req_mem) {
398 		return -1;
399 	}
400 
401 	channel->outstanding_reqs = 0;
402 	TAILQ_INIT(&channel->reqs);
403 	sem_init(&channel->sem, 0, 0);
404 
405 	for (i = 0; i < max_ops; i++) {
406 		TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link);
407 	}
408 
409 	channel->fs = fs;
410 
411 	return 0;
412 }
413 
414 static int
415 _spdk_fs_md_channel_create(void *io_device, void *ctx_buf)
416 {
417 	struct spdk_filesystem		*fs;
418 	struct spdk_fs_channel		*channel = ctx_buf;
419 
420 	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, md_target);
421 
422 	return _spdk_fs_channel_create(fs, channel, fs->md_target.max_ops);
423 }
424 
425 static int
426 _spdk_fs_sync_channel_create(void *io_device, void *ctx_buf)
427 {
428 	struct spdk_filesystem		*fs;
429 	struct spdk_fs_channel		*channel = ctx_buf;
430 
431 	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, sync_target);
432 
433 	return _spdk_fs_channel_create(fs, channel, fs->sync_target.max_ops);
434 }
435 
436 static int
437 _spdk_fs_io_channel_create(void *io_device, void *ctx_buf)
438 {
439 	struct spdk_filesystem		*fs;
440 	struct spdk_fs_channel		*channel = ctx_buf;
441 
442 	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, io_target);
443 
444 	return _spdk_fs_channel_create(fs, channel, fs->io_target.max_ops);
445 }
446 
447 static void
448 _spdk_fs_channel_destroy(void *io_device, void *ctx_buf)
449 {
450 	struct spdk_fs_channel *channel = ctx_buf;
451 
452 	if (channel->outstanding_reqs > 0) {
453 		SPDK_ERRLOG("channel freed with %" PRIu32 " outstanding requests!\n",
454 			    channel->outstanding_reqs);
455 	}
456 
457 	free(channel->req_mem);
458 	if (channel->bs_channel != NULL) {
459 		spdk_bs_free_io_channel(channel->bs_channel);
460 	}
461 }
462 
463 static void
464 __send_request_direct(fs_request_fn fn, void *arg)
465 {
466 	fn(arg);
467 }
468 
469 static void
470 common_fs_bs_init(struct spdk_filesystem *fs, struct spdk_blob_store *bs)
471 {
472 	fs->bs = bs;
473 	fs->bs_opts.cluster_sz = spdk_bs_get_cluster_size(bs);
474 	fs->md_target.md_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
475 	fs->md_target.md_fs_channel->send_request = __send_request_direct;
476 	fs->sync_target.sync_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
477 	fs->sync_target.sync_fs_channel->send_request = __send_request_direct;
478 
479 	pthread_mutex_lock(&g_cache_init_lock);
480 	if (g_fs_count == 0) {
481 		__initialize_cache();
482 	}
483 	g_fs_count++;
484 	pthread_mutex_unlock(&g_cache_init_lock);
485 }
486 
487 static void
488 init_cb(void *ctx, struct spdk_blob_store *bs, int bserrno)
489 {
490 	struct spdk_fs_request *req = ctx;
491 	struct spdk_fs_cb_args *args = &req->args;
492 	struct spdk_filesystem *fs = args->fs;
493 
494 	if (bserrno == 0) {
495 		common_fs_bs_init(fs, bs);
496 	} else {
497 		free(fs);
498 		fs = NULL;
499 	}
500 
501 	args->fn.fs_op_with_handle(args->arg, fs, bserrno);
502 	free_fs_request(req);
503 }
504 
505 static void
506 fs_conf_parse(void)
507 {
508 	struct spdk_conf_section *sp;
509 
510 	sp = spdk_conf_find_section(NULL, "Blobfs");
511 	if (sp == NULL) {
512 		g_fs_cache_buffer_shift = CACHE_BUFFER_SHIFT_DEFAULT;
513 		return;
514 	}
515 
516 	g_fs_cache_buffer_shift = spdk_conf_section_get_intval(sp, "CacheBufferShift");
517 	if (g_fs_cache_buffer_shift <= 0) {
518 		g_fs_cache_buffer_shift = CACHE_BUFFER_SHIFT_DEFAULT;
519 	}
520 }
521 
522 static struct spdk_filesystem *
523 fs_alloc(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn)
524 {
525 	struct spdk_filesystem *fs;
526 
527 	fs = calloc(1, sizeof(*fs));
528 	if (fs == NULL) {
529 		return NULL;
530 	}
531 
532 	fs->bdev = dev;
533 	fs->send_request = send_request_fn;
534 	TAILQ_INIT(&fs->files);
535 
536 	fs->md_target.max_ops = 512;
537 	spdk_io_device_register(&fs->md_target, _spdk_fs_md_channel_create, _spdk_fs_channel_destroy,
538 				sizeof(struct spdk_fs_channel), "blobfs_md");
539 	fs->md_target.md_io_channel = spdk_get_io_channel(&fs->md_target);
540 	fs->md_target.md_fs_channel = spdk_io_channel_get_ctx(fs->md_target.md_io_channel);
541 
542 	fs->sync_target.max_ops = 512;
543 	spdk_io_device_register(&fs->sync_target, _spdk_fs_sync_channel_create, _spdk_fs_channel_destroy,
544 				sizeof(struct spdk_fs_channel), "blobfs_sync");
545 	fs->sync_target.sync_io_channel = spdk_get_io_channel(&fs->sync_target);
546 	fs->sync_target.sync_fs_channel = spdk_io_channel_get_ctx(fs->sync_target.sync_io_channel);
547 
548 	fs->io_target.max_ops = 512;
549 	spdk_io_device_register(&fs->io_target, _spdk_fs_io_channel_create, _spdk_fs_channel_destroy,
550 				sizeof(struct spdk_fs_channel), "blobfs_io");
551 
552 	return fs;
553 }
554 
555 static void
556 __wake_caller(void *arg, int fserrno)
557 {
558 	struct spdk_fs_cb_args *args = arg;
559 
560 	args->rc = fserrno;
561 	sem_post(args->sem);
562 }
563 
564 void
565 spdk_fs_init(struct spdk_bs_dev *dev, struct spdk_blobfs_opts *opt,
566 	     fs_send_request_fn send_request_fn,
567 	     spdk_fs_op_with_handle_complete cb_fn, void *cb_arg)
568 {
569 	struct spdk_filesystem *fs;
570 	struct spdk_fs_request *req;
571 	struct spdk_fs_cb_args *args;
572 	struct spdk_bs_opts opts = {};
573 
574 	fs = fs_alloc(dev, send_request_fn);
575 	if (fs == NULL) {
576 		cb_fn(cb_arg, NULL, -ENOMEM);
577 		return;
578 	}
579 
580 	fs_conf_parse();
581 
582 	req = alloc_fs_request(fs->md_target.md_fs_channel);
583 	if (req == NULL) {
584 		spdk_fs_free_io_channels(fs);
585 		spdk_fs_io_device_unregister(fs);
586 		cb_fn(cb_arg, NULL, -ENOMEM);
587 		return;
588 	}
589 
590 	args = &req->args;
591 	args->fn.fs_op_with_handle = cb_fn;
592 	args->arg = cb_arg;
593 	args->fs = fs;
594 
595 	spdk_bs_opts_init(&opts);
596 	snprintf(opts.bstype.bstype, sizeof(opts.bstype.bstype), "BLOBFS");
597 	if (opt) {
598 		opts.cluster_sz = opt->cluster_sz;
599 	}
600 	spdk_bs_init(dev, &opts, init_cb, req);
601 }
602 
603 static struct spdk_file *
604 file_alloc(struct spdk_filesystem *fs)
605 {
606 	struct spdk_file *file;
607 
608 	file = calloc(1, sizeof(*file));
609 	if (file == NULL) {
610 		return NULL;
611 	}
612 
613 	file->tree = calloc(1, sizeof(*file->tree));
614 	if (file->tree == NULL) {
615 		free(file);
616 		return NULL;
617 	}
618 
619 	file->fs = fs;
620 	TAILQ_INIT(&file->open_requests);
621 	TAILQ_INIT(&file->sync_requests);
622 	pthread_spin_init(&file->lock, 0);
623 	TAILQ_INSERT_TAIL(&fs->files, file, tailq);
624 	file->priority = SPDK_FILE_PRIORITY_LOW;
625 	return file;
626 }
627 
628 static void fs_load_done(void *ctx, int bserrno);
629 
630 static int
631 _handle_deleted_files(struct spdk_fs_request *req)
632 {
633 	struct spdk_fs_cb_args *args = &req->args;
634 	struct spdk_filesystem *fs = args->fs;
635 
636 	if (!TAILQ_EMPTY(&args->op.fs_load.deleted_files)) {
637 		struct spdk_deleted_file *deleted_file;
638 
639 		deleted_file = TAILQ_FIRST(&args->op.fs_load.deleted_files);
640 		TAILQ_REMOVE(&args->op.fs_load.deleted_files, deleted_file, tailq);
641 		spdk_bs_delete_blob(fs->bs, deleted_file->id, fs_load_done, req);
642 		free(deleted_file);
643 		return 0;
644 	}
645 
646 	return 1;
647 }
648 
649 static void
650 fs_load_done(void *ctx, int bserrno)
651 {
652 	struct spdk_fs_request *req = ctx;
653 	struct spdk_fs_cb_args *args = &req->args;
654 	struct spdk_filesystem *fs = args->fs;
655 
656 	/* The filesystem has been loaded.  Now check if there are any files that
657 	 *  were marked for deletion before last unload.  Do not complete the
658 	 *  fs_load callback until all of them have been deleted on disk.
659 	 */
660 	if (_handle_deleted_files(req) == 0) {
661 		/* We found a file that's been marked for deleting but not actually
662 		 *  deleted yet.  This function will get called again once the delete
663 		 *  operation is completed.
664 		 */
665 		return;
666 	}
667 
668 	args->fn.fs_op_with_handle(args->arg, fs, 0);
669 	free_fs_request(req);
670 
671 }
672 
673 static void
674 _file_build_trace_arg_name(struct spdk_file *f)
675 {
676 	f->trace_arg_name = 0;
677 	memcpy(&f->trace_arg_name, f->name,
678 	       spdk_min(sizeof(f->trace_arg_name), strlen(f->name)));
679 }
680 
681 static void
682 iter_cb(void *ctx, struct spdk_blob *blob, int rc)
683 {
684 	struct spdk_fs_request *req = ctx;
685 	struct spdk_fs_cb_args *args = &req->args;
686 	struct spdk_filesystem *fs = args->fs;
687 	uint64_t *length;
688 	const char *name;
689 	uint32_t *is_deleted;
690 	size_t value_len;
691 
692 	if (rc < 0) {
693 		args->fn.fs_op_with_handle(args->arg, fs, rc);
694 		free_fs_request(req);
695 		return;
696 	}
697 
698 	rc = spdk_blob_get_xattr_value(blob, "name", (const void **)&name, &value_len);
699 	if (rc < 0) {
700 		args->fn.fs_op_with_handle(args->arg, fs, rc);
701 		free_fs_request(req);
702 		return;
703 	}
704 
705 	rc = spdk_blob_get_xattr_value(blob, "length", (const void **)&length, &value_len);
706 	if (rc < 0) {
707 		args->fn.fs_op_with_handle(args->arg, fs, rc);
708 		free_fs_request(req);
709 		return;
710 	}
711 
712 	assert(value_len == 8);
713 
714 	/* This file could be deleted last time without close it, then app crashed, so we delete it now */
715 	rc = spdk_blob_get_xattr_value(blob, "is_deleted", (const void **)&is_deleted, &value_len);
716 	if (rc < 0) {
717 		struct spdk_file *f;
718 
719 		f = file_alloc(fs);
720 		if (f == NULL) {
721 			SPDK_ERRLOG("Cannot allocate file to handle deleted file on disk\n");
722 			args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM);
723 			free_fs_request(req);
724 			return;
725 		}
726 
727 		f->name = strdup(name);
728 		_file_build_trace_arg_name(f);
729 		f->blobid = spdk_blob_get_id(blob);
730 		f->length = *length;
731 		f->length_flushed = *length;
732 		f->length_xattr = *length;
733 		f->append_pos = *length;
734 		SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "added file %s length=%ju\n", f->name, f->length);
735 	} else {
736 		struct spdk_deleted_file *deleted_file;
737 
738 		deleted_file = calloc(1, sizeof(*deleted_file));
739 		if (deleted_file == NULL) {
740 			args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM);
741 			free_fs_request(req);
742 			return;
743 		}
744 		deleted_file->id = spdk_blob_get_id(blob);
745 		TAILQ_INSERT_TAIL(&args->op.fs_load.deleted_files, deleted_file, tailq);
746 	}
747 }
748 
749 static void
750 load_cb(void *ctx, struct spdk_blob_store *bs, int bserrno)
751 {
752 	struct spdk_fs_request *req = ctx;
753 	struct spdk_fs_cb_args *args = &req->args;
754 	struct spdk_filesystem *fs = args->fs;
755 	struct spdk_bs_type bstype;
756 	static const struct spdk_bs_type blobfs_type = {"BLOBFS"};
757 	static const struct spdk_bs_type zeros;
758 
759 	if (bserrno != 0) {
760 		args->fn.fs_op_with_handle(args->arg, NULL, bserrno);
761 		free_fs_request(req);
762 		free(fs);
763 		return;
764 	}
765 
766 	bstype = spdk_bs_get_bstype(bs);
767 
768 	if (!memcmp(&bstype, &zeros, sizeof(bstype))) {
769 		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "assigning bstype\n");
770 		spdk_bs_set_bstype(bs, blobfs_type);
771 	} else if (memcmp(&bstype, &blobfs_type, sizeof(bstype))) {
772 		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "not blobfs\n");
773 		SPDK_LOGDUMP(SPDK_LOG_BLOB, "bstype", &bstype, sizeof(bstype));
774 		args->fn.fs_op_with_handle(args->arg, NULL, bserrno);
775 		free_fs_request(req);
776 		free(fs);
777 		return;
778 	}
779 
780 	common_fs_bs_init(fs, bs);
781 	fs_load_done(req, 0);
782 }
783 
784 static void
785 spdk_fs_io_device_unregister(struct spdk_filesystem *fs)
786 {
787 	assert(fs != NULL);
788 	spdk_io_device_unregister(&fs->md_target, NULL);
789 	spdk_io_device_unregister(&fs->sync_target, NULL);
790 	spdk_io_device_unregister(&fs->io_target, NULL);
791 	free(fs);
792 }
793 
794 static void
795 spdk_fs_free_io_channels(struct spdk_filesystem *fs)
796 {
797 	assert(fs != NULL);
798 	spdk_fs_free_io_channel(fs->md_target.md_io_channel);
799 	spdk_fs_free_io_channel(fs->sync_target.sync_io_channel);
800 }
801 
802 void
803 spdk_fs_load(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn,
804 	     spdk_fs_op_with_handle_complete cb_fn, void *cb_arg)
805 {
806 	struct spdk_filesystem *fs;
807 	struct spdk_fs_cb_args *args;
808 	struct spdk_fs_request *req;
809 	struct spdk_bs_opts	bs_opts;
810 
811 	fs = fs_alloc(dev, send_request_fn);
812 	if (fs == NULL) {
813 		cb_fn(cb_arg, NULL, -ENOMEM);
814 		return;
815 	}
816 
817 	fs_conf_parse();
818 
819 	req = alloc_fs_request(fs->md_target.md_fs_channel);
820 	if (req == NULL) {
821 		spdk_fs_free_io_channels(fs);
822 		spdk_fs_io_device_unregister(fs);
823 		cb_fn(cb_arg, NULL, -ENOMEM);
824 		return;
825 	}
826 
827 	args = &req->args;
828 	args->fn.fs_op_with_handle = cb_fn;
829 	args->arg = cb_arg;
830 	args->fs = fs;
831 	TAILQ_INIT(&args->op.fs_load.deleted_files);
832 	spdk_bs_opts_init(&bs_opts);
833 	bs_opts.iter_cb_fn = iter_cb;
834 	bs_opts.iter_cb_arg = req;
835 	spdk_bs_load(dev, &bs_opts, load_cb, req);
836 }
837 
838 static void
839 unload_cb(void *ctx, int bserrno)
840 {
841 	struct spdk_fs_request *req = ctx;
842 	struct spdk_fs_cb_args *args = &req->args;
843 	struct spdk_filesystem *fs = args->fs;
844 	struct spdk_file *file, *tmp;
845 
846 	TAILQ_FOREACH_SAFE(file, &fs->files, tailq, tmp) {
847 		TAILQ_REMOVE(&fs->files, file, tailq);
848 		cache_free_buffers(file);
849 		free(file->name);
850 		free(file->tree);
851 		free(file);
852 	}
853 
854 	pthread_mutex_lock(&g_cache_init_lock);
855 	g_fs_count--;
856 	if (g_fs_count == 0) {
857 		__free_cache();
858 	}
859 	pthread_mutex_unlock(&g_cache_init_lock);
860 
861 	args->fn.fs_op(args->arg, bserrno);
862 	free(req);
863 
864 	spdk_fs_io_device_unregister(fs);
865 }
866 
867 void
868 spdk_fs_unload(struct spdk_filesystem *fs, spdk_fs_op_complete cb_fn, void *cb_arg)
869 {
870 	struct spdk_fs_request *req;
871 	struct spdk_fs_cb_args *args;
872 
873 	/*
874 	 * We must free the md_channel before unloading the blobstore, so just
875 	 *  allocate this request from the general heap.
876 	 */
877 	req = calloc(1, sizeof(*req));
878 	if (req == NULL) {
879 		cb_fn(cb_arg, -ENOMEM);
880 		return;
881 	}
882 
883 	args = &req->args;
884 	args->fn.fs_op = cb_fn;
885 	args->arg = cb_arg;
886 	args->fs = fs;
887 
888 	spdk_fs_free_io_channels(fs);
889 	spdk_bs_unload(fs->bs, unload_cb, req);
890 }
891 
892 static struct spdk_file *
893 fs_find_file(struct spdk_filesystem *fs, const char *name)
894 {
895 	struct spdk_file *file;
896 
897 	TAILQ_FOREACH(file, &fs->files, tailq) {
898 		if (!strncmp(name, file->name, SPDK_FILE_NAME_MAX)) {
899 			return file;
900 		}
901 	}
902 
903 	return NULL;
904 }
905 
906 void
907 spdk_fs_file_stat_async(struct spdk_filesystem *fs, const char *name,
908 			spdk_file_stat_op_complete cb_fn, void *cb_arg)
909 {
910 	struct spdk_file_stat stat;
911 	struct spdk_file *f = NULL;
912 
913 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
914 		cb_fn(cb_arg, NULL, -ENAMETOOLONG);
915 		return;
916 	}
917 
918 	f = fs_find_file(fs, name);
919 	if (f != NULL) {
920 		stat.blobid = f->blobid;
921 		stat.size = f->append_pos >= f->length ? f->append_pos : f->length;
922 		cb_fn(cb_arg, &stat, 0);
923 		return;
924 	}
925 
926 	cb_fn(cb_arg, NULL, -ENOENT);
927 }
928 
929 static void
930 __copy_stat(void *arg, struct spdk_file_stat *stat, int fserrno)
931 {
932 	struct spdk_fs_request *req = arg;
933 	struct spdk_fs_cb_args *args = &req->args;
934 
935 	args->rc = fserrno;
936 	if (fserrno == 0) {
937 		memcpy(args->arg, stat, sizeof(*stat));
938 	}
939 	sem_post(args->sem);
940 }
941 
942 static void
943 __file_stat(void *arg)
944 {
945 	struct spdk_fs_request *req = arg;
946 	struct spdk_fs_cb_args *args = &req->args;
947 
948 	spdk_fs_file_stat_async(args->fs, args->op.stat.name,
949 				args->fn.stat_op, req);
950 }
951 
952 int
953 spdk_fs_file_stat(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx,
954 		  const char *name, struct spdk_file_stat *stat)
955 {
956 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
957 	struct spdk_fs_request *req;
958 	int rc;
959 
960 	req = alloc_fs_request(channel);
961 	if (req == NULL) {
962 		SPDK_ERRLOG("Cannot allocate stat req on file=%s\n", name);
963 		return -ENOMEM;
964 	}
965 
966 	req->args.fs = fs;
967 	req->args.op.stat.name = name;
968 	req->args.fn.stat_op = __copy_stat;
969 	req->args.arg = stat;
970 	req->args.sem = &channel->sem;
971 	channel->send_request(__file_stat, req);
972 	sem_wait(&channel->sem);
973 
974 	rc = req->args.rc;
975 	free_fs_request(req);
976 
977 	return rc;
978 }
979 
980 static void
981 fs_create_blob_close_cb(void *ctx, int bserrno)
982 {
983 	int rc;
984 	struct spdk_fs_request *req = ctx;
985 	struct spdk_fs_cb_args *args = &req->args;
986 
987 	rc = args->rc ? args->rc : bserrno;
988 	args->fn.file_op(args->arg, rc);
989 	free_fs_request(req);
990 }
991 
992 static void
993 fs_create_blob_resize_cb(void *ctx, int bserrno)
994 {
995 	struct spdk_fs_request *req = ctx;
996 	struct spdk_fs_cb_args *args = &req->args;
997 	struct spdk_file *f = args->file;
998 	struct spdk_blob *blob = args->op.create.blob;
999 	uint64_t length = 0;
1000 
1001 	args->rc = bserrno;
1002 	if (bserrno) {
1003 		spdk_blob_close(blob, fs_create_blob_close_cb, args);
1004 		return;
1005 	}
1006 
1007 	spdk_blob_set_xattr(blob, "name", f->name, strlen(f->name) + 1);
1008 	spdk_blob_set_xattr(blob, "length", &length, sizeof(length));
1009 
1010 	spdk_blob_close(blob, fs_create_blob_close_cb, args);
1011 }
1012 
1013 static void
1014 fs_create_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno)
1015 {
1016 	struct spdk_fs_request *req = ctx;
1017 	struct spdk_fs_cb_args *args = &req->args;
1018 
1019 	if (bserrno) {
1020 		args->fn.file_op(args->arg, bserrno);
1021 		free_fs_request(req);
1022 		return;
1023 	}
1024 
1025 	args->op.create.blob = blob;
1026 	spdk_blob_resize(blob, 1, fs_create_blob_resize_cb, req);
1027 }
1028 
1029 static void
1030 fs_create_blob_create_cb(void *ctx, spdk_blob_id blobid, int bserrno)
1031 {
1032 	struct spdk_fs_request *req = ctx;
1033 	struct spdk_fs_cb_args *args = &req->args;
1034 	struct spdk_file *f = args->file;
1035 
1036 	if (bserrno) {
1037 		args->fn.file_op(args->arg, bserrno);
1038 		free_fs_request(req);
1039 		return;
1040 	}
1041 
1042 	f->blobid = blobid;
1043 	spdk_bs_open_blob(f->fs->bs, blobid, fs_create_blob_open_cb, req);
1044 }
1045 
1046 void
1047 spdk_fs_create_file_async(struct spdk_filesystem *fs, const char *name,
1048 			  spdk_file_op_complete cb_fn, void *cb_arg)
1049 {
1050 	struct spdk_file *file;
1051 	struct spdk_fs_request *req;
1052 	struct spdk_fs_cb_args *args;
1053 
1054 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
1055 		cb_fn(cb_arg, -ENAMETOOLONG);
1056 		return;
1057 	}
1058 
1059 	file = fs_find_file(fs, name);
1060 	if (file != NULL) {
1061 		cb_fn(cb_arg, -EEXIST);
1062 		return;
1063 	}
1064 
1065 	file = file_alloc(fs);
1066 	if (file == NULL) {
1067 		SPDK_ERRLOG("Cannot allocate new file for creation\n");
1068 		cb_fn(cb_arg, -ENOMEM);
1069 		return;
1070 	}
1071 
1072 	req = alloc_fs_request(fs->md_target.md_fs_channel);
1073 	if (req == NULL) {
1074 		SPDK_ERRLOG("Cannot allocate create async req for file=%s\n", name);
1075 		cb_fn(cb_arg, -ENOMEM);
1076 		return;
1077 	}
1078 
1079 	args = &req->args;
1080 	args->file = file;
1081 	args->fn.file_op = cb_fn;
1082 	args->arg = cb_arg;
1083 
1084 	file->name = strdup(name);
1085 	_file_build_trace_arg_name(file);
1086 	spdk_bs_create_blob(fs->bs, fs_create_blob_create_cb, args);
1087 }
1088 
1089 static void
1090 __fs_create_file_done(void *arg, int fserrno)
1091 {
1092 	struct spdk_fs_request *req = arg;
1093 	struct spdk_fs_cb_args *args = &req->args;
1094 
1095 	args->rc = fserrno;
1096 	sem_post(args->sem);
1097 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.create.name);
1098 }
1099 
1100 static void
1101 __fs_create_file(void *arg)
1102 {
1103 	struct spdk_fs_request *req = arg;
1104 	struct spdk_fs_cb_args *args = &req->args;
1105 
1106 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.create.name);
1107 	spdk_fs_create_file_async(args->fs, args->op.create.name, __fs_create_file_done, req);
1108 }
1109 
1110 int
1111 spdk_fs_create_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx, const char *name)
1112 {
1113 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
1114 	struct spdk_fs_request *req;
1115 	struct spdk_fs_cb_args *args;
1116 	int rc;
1117 
1118 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name);
1119 
1120 	req = alloc_fs_request(channel);
1121 	if (req == NULL) {
1122 		SPDK_ERRLOG("Cannot allocate req to create file=%s\n", name);
1123 		return -ENOMEM;
1124 	}
1125 
1126 	args = &req->args;
1127 	args->fs = fs;
1128 	args->op.create.name = name;
1129 	args->sem = &channel->sem;
1130 	fs->send_request(__fs_create_file, req);
1131 	sem_wait(&channel->sem);
1132 	rc = args->rc;
1133 	free_fs_request(req);
1134 
1135 	return rc;
1136 }
1137 
1138 static void
1139 fs_open_blob_done(void *ctx, struct spdk_blob *blob, int bserrno)
1140 {
1141 	struct spdk_fs_request *req = ctx;
1142 	struct spdk_fs_cb_args *args = &req->args;
1143 	struct spdk_file *f = args->file;
1144 
1145 	f->blob = blob;
1146 	while (!TAILQ_EMPTY(&f->open_requests)) {
1147 		req = TAILQ_FIRST(&f->open_requests);
1148 		args = &req->args;
1149 		TAILQ_REMOVE(&f->open_requests, req, args.op.open.tailq);
1150 		spdk_trace_record(TRACE_BLOBFS_OPEN, 0, 0, 0, f->trace_arg_name);
1151 		args->fn.file_op_with_handle(args->arg, f, bserrno);
1152 		free_fs_request(req);
1153 	}
1154 }
1155 
1156 static void
1157 fs_open_blob_create_cb(void *ctx, int bserrno)
1158 {
1159 	struct spdk_fs_request *req = ctx;
1160 	struct spdk_fs_cb_args *args = &req->args;
1161 	struct spdk_file *file = args->file;
1162 	struct spdk_filesystem *fs = args->fs;
1163 
1164 	if (file == NULL) {
1165 		/*
1166 		 * This is from an open with CREATE flag - the file
1167 		 *  is now created so look it up in the file list for this
1168 		 *  filesystem.
1169 		 */
1170 		file = fs_find_file(fs, args->op.open.name);
1171 		assert(file != NULL);
1172 		args->file = file;
1173 	}
1174 
1175 	file->ref_count++;
1176 	TAILQ_INSERT_TAIL(&file->open_requests, req, args.op.open.tailq);
1177 	if (file->ref_count == 1) {
1178 		assert(file->blob == NULL);
1179 		spdk_bs_open_blob(fs->bs, file->blobid, fs_open_blob_done, req);
1180 	} else if (file->blob != NULL) {
1181 		fs_open_blob_done(req, file->blob, 0);
1182 	} else {
1183 		/*
1184 		 * The blob open for this file is in progress due to a previous
1185 		 *  open request.  When that open completes, it will invoke the
1186 		 *  open callback for this request.
1187 		 */
1188 	}
1189 }
1190 
1191 void
1192 spdk_fs_open_file_async(struct spdk_filesystem *fs, const char *name, uint32_t flags,
1193 			spdk_file_op_with_handle_complete cb_fn, void *cb_arg)
1194 {
1195 	struct spdk_file *f = NULL;
1196 	struct spdk_fs_request *req;
1197 	struct spdk_fs_cb_args *args;
1198 
1199 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
1200 		cb_fn(cb_arg, NULL, -ENAMETOOLONG);
1201 		return;
1202 	}
1203 
1204 	f = fs_find_file(fs, name);
1205 	if (f == NULL && !(flags & SPDK_BLOBFS_OPEN_CREATE)) {
1206 		cb_fn(cb_arg, NULL, -ENOENT);
1207 		return;
1208 	}
1209 
1210 	if (f != NULL && f->is_deleted == true) {
1211 		cb_fn(cb_arg, NULL, -ENOENT);
1212 		return;
1213 	}
1214 
1215 	req = alloc_fs_request(fs->md_target.md_fs_channel);
1216 	if (req == NULL) {
1217 		SPDK_ERRLOG("Cannot allocate async open req for file=%s\n", name);
1218 		cb_fn(cb_arg, NULL, -ENOMEM);
1219 		return;
1220 	}
1221 
1222 	args = &req->args;
1223 	args->fn.file_op_with_handle = cb_fn;
1224 	args->arg = cb_arg;
1225 	args->file = f;
1226 	args->fs = fs;
1227 	args->op.open.name = name;
1228 
1229 	if (f == NULL) {
1230 		spdk_fs_create_file_async(fs, name, fs_open_blob_create_cb, req);
1231 	} else {
1232 		fs_open_blob_create_cb(req, 0);
1233 	}
1234 }
1235 
1236 static void
1237 __fs_open_file_done(void *arg, struct spdk_file *file, int bserrno)
1238 {
1239 	struct spdk_fs_request *req = arg;
1240 	struct spdk_fs_cb_args *args = &req->args;
1241 
1242 	args->file = file;
1243 	__wake_caller(args, bserrno);
1244 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.open.name);
1245 }
1246 
1247 static void
1248 __fs_open_file(void *arg)
1249 {
1250 	struct spdk_fs_request *req = arg;
1251 	struct spdk_fs_cb_args *args = &req->args;
1252 
1253 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.open.name);
1254 	spdk_fs_open_file_async(args->fs, args->op.open.name, args->op.open.flags,
1255 				__fs_open_file_done, req);
1256 }
1257 
1258 int
1259 spdk_fs_open_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx,
1260 		  const char *name, uint32_t flags, struct spdk_file **file)
1261 {
1262 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
1263 	struct spdk_fs_request *req;
1264 	struct spdk_fs_cb_args *args;
1265 	int rc;
1266 
1267 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name);
1268 
1269 	req = alloc_fs_request(channel);
1270 	if (req == NULL) {
1271 		SPDK_ERRLOG("Cannot allocate req for opening file=%s\n", name);
1272 		return -ENOMEM;
1273 	}
1274 
1275 	args = &req->args;
1276 	args->fs = fs;
1277 	args->op.open.name = name;
1278 	args->op.open.flags = flags;
1279 	args->sem = &channel->sem;
1280 	fs->send_request(__fs_open_file, req);
1281 	sem_wait(&channel->sem);
1282 	rc = args->rc;
1283 	if (rc == 0) {
1284 		*file = args->file;
1285 	} else {
1286 		*file = NULL;
1287 	}
1288 	free_fs_request(req);
1289 
1290 	return rc;
1291 }
1292 
1293 static void
1294 fs_rename_blob_close_cb(void *ctx, int bserrno)
1295 {
1296 	struct spdk_fs_request *req = ctx;
1297 	struct spdk_fs_cb_args *args = &req->args;
1298 
1299 	args->fn.fs_op(args->arg, bserrno);
1300 	free_fs_request(req);
1301 }
1302 
1303 static void
1304 fs_rename_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno)
1305 {
1306 	struct spdk_fs_request *req = ctx;
1307 	struct spdk_fs_cb_args *args = &req->args;
1308 	const char *new_name = args->op.rename.new_name;
1309 
1310 	spdk_blob_set_xattr(blob, "name", new_name, strlen(new_name) + 1);
1311 	spdk_blob_close(blob, fs_rename_blob_close_cb, req);
1312 }
1313 
1314 static void
1315 __spdk_fs_md_rename_file(struct spdk_fs_request *req)
1316 {
1317 	struct spdk_fs_cb_args *args = &req->args;
1318 	struct spdk_file *f;
1319 
1320 	f = fs_find_file(args->fs, args->op.rename.old_name);
1321 	if (f == NULL) {
1322 		args->fn.fs_op(args->arg, -ENOENT);
1323 		free_fs_request(req);
1324 		return;
1325 	}
1326 
1327 	free(f->name);
1328 	f->name = strdup(args->op.rename.new_name);
1329 	_file_build_trace_arg_name(f);
1330 	args->file = f;
1331 	spdk_bs_open_blob(args->fs->bs, f->blobid, fs_rename_blob_open_cb, req);
1332 }
1333 
1334 static void
1335 fs_rename_delete_done(void *arg, int fserrno)
1336 {
1337 	__spdk_fs_md_rename_file(arg);
1338 }
1339 
1340 void
1341 spdk_fs_rename_file_async(struct spdk_filesystem *fs,
1342 			  const char *old_name, const char *new_name,
1343 			  spdk_file_op_complete cb_fn, void *cb_arg)
1344 {
1345 	struct spdk_file *f;
1346 	struct spdk_fs_request *req;
1347 	struct spdk_fs_cb_args *args;
1348 
1349 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "old=%s new=%s\n", old_name, new_name);
1350 	if (strnlen(new_name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
1351 		cb_fn(cb_arg, -ENAMETOOLONG);
1352 		return;
1353 	}
1354 
1355 	req = alloc_fs_request(fs->md_target.md_fs_channel);
1356 	if (req == NULL) {
1357 		SPDK_ERRLOG("Cannot allocate rename async req for renaming file from %s to %s\n", old_name,
1358 			    new_name);
1359 		cb_fn(cb_arg, -ENOMEM);
1360 		return;
1361 	}
1362 
1363 	args = &req->args;
1364 	args->fn.fs_op = cb_fn;
1365 	args->fs = fs;
1366 	args->arg = cb_arg;
1367 	args->op.rename.old_name = old_name;
1368 	args->op.rename.new_name = new_name;
1369 
1370 	f = fs_find_file(fs, new_name);
1371 	if (f == NULL) {
1372 		__spdk_fs_md_rename_file(req);
1373 		return;
1374 	}
1375 
1376 	/*
1377 	 * The rename overwrites an existing file.  So delete the existing file, then
1378 	 *  do the actual rename.
1379 	 */
1380 	spdk_fs_delete_file_async(fs, new_name, fs_rename_delete_done, req);
1381 }
1382 
1383 static void
1384 __fs_rename_file_done(void *arg, int fserrno)
1385 {
1386 	struct spdk_fs_request *req = arg;
1387 	struct spdk_fs_cb_args *args = &req->args;
1388 
1389 	__wake_caller(args, fserrno);
1390 }
1391 
1392 static void
1393 __fs_rename_file(void *arg)
1394 {
1395 	struct spdk_fs_request *req = arg;
1396 	struct spdk_fs_cb_args *args = &req->args;
1397 
1398 	spdk_fs_rename_file_async(args->fs, args->op.rename.old_name, args->op.rename.new_name,
1399 				  __fs_rename_file_done, req);
1400 }
1401 
1402 int
1403 spdk_fs_rename_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx,
1404 		    const char *old_name, const char *new_name)
1405 {
1406 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
1407 	struct spdk_fs_request *req;
1408 	struct spdk_fs_cb_args *args;
1409 	int rc;
1410 
1411 	req = alloc_fs_request(channel);
1412 	if (req == NULL) {
1413 		SPDK_ERRLOG("Cannot allocate rename req for file=%s\n", old_name);
1414 		return -ENOMEM;
1415 	}
1416 
1417 	args = &req->args;
1418 
1419 	args->fs = fs;
1420 	args->op.rename.old_name = old_name;
1421 	args->op.rename.new_name = new_name;
1422 	args->sem = &channel->sem;
1423 	fs->send_request(__fs_rename_file, req);
1424 	sem_wait(&channel->sem);
1425 	rc = args->rc;
1426 	free_fs_request(req);
1427 	return rc;
1428 }
1429 
1430 static void
1431 blob_delete_cb(void *ctx, int bserrno)
1432 {
1433 	struct spdk_fs_request *req = ctx;
1434 	struct spdk_fs_cb_args *args = &req->args;
1435 
1436 	args->fn.file_op(args->arg, bserrno);
1437 	free_fs_request(req);
1438 }
1439 
1440 void
1441 spdk_fs_delete_file_async(struct spdk_filesystem *fs, const char *name,
1442 			  spdk_file_op_complete cb_fn, void *cb_arg)
1443 {
1444 	struct spdk_file *f;
1445 	spdk_blob_id blobid;
1446 	struct spdk_fs_request *req;
1447 	struct spdk_fs_cb_args *args;
1448 
1449 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name);
1450 
1451 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
1452 		cb_fn(cb_arg, -ENAMETOOLONG);
1453 		return;
1454 	}
1455 
1456 	f = fs_find_file(fs, name);
1457 	if (f == NULL) {
1458 		SPDK_ERRLOG("Cannot find the file=%s to deleted\n", name);
1459 		cb_fn(cb_arg, -ENOENT);
1460 		return;
1461 	}
1462 
1463 	req = alloc_fs_request(fs->md_target.md_fs_channel);
1464 	if (req == NULL) {
1465 		SPDK_ERRLOG("Cannot allocate the req for the file=%s to deleted\n", name);
1466 		cb_fn(cb_arg, -ENOMEM);
1467 		return;
1468 	}
1469 
1470 	args = &req->args;
1471 	args->fn.file_op = cb_fn;
1472 	args->arg = cb_arg;
1473 
1474 	if (f->ref_count > 0) {
1475 		/* If the ref > 0, we mark the file as deleted and delete it when we close it. */
1476 		f->is_deleted = true;
1477 		spdk_blob_set_xattr(f->blob, "is_deleted", &f->is_deleted, sizeof(bool));
1478 		spdk_blob_sync_md(f->blob, blob_delete_cb, req);
1479 		return;
1480 	}
1481 
1482 	TAILQ_REMOVE(&fs->files, f, tailq);
1483 
1484 	cache_free_buffers(f);
1485 
1486 	blobid = f->blobid;
1487 
1488 	free(f->name);
1489 	free(f->tree);
1490 	free(f);
1491 
1492 	spdk_bs_delete_blob(fs->bs, blobid, blob_delete_cb, req);
1493 }
1494 
/*
 * Pack the leading bytes of `name` into a uint64_t for use as a trace
 * argument.  At most sizeof(uint64_t) bytes are copied and the terminating
 * NUL is never included; bytes not covered by the name stay zero.
 */
static uint64_t
fs_name_to_uint64(const char *name)
{
	uint64_t packed = 0;
	size_t nbytes = strlen(name);

	if (nbytes > sizeof(packed)) {
		nbytes = sizeof(packed);
	}
	memcpy(&packed, name, nbytes);

	return packed;
}
1502 
1503 static void
1504 __fs_delete_file_done(void *arg, int fserrno)
1505 {
1506 	struct spdk_fs_request *req = arg;
1507 	struct spdk_fs_cb_args *args = &req->args;
1508 
1509 	spdk_trace_record(TRACE_BLOBFS_DELETE_DONE, 0, 0, 0, fs_name_to_uint64(args->op.delete.name));
1510 	__wake_caller(args, fserrno);
1511 }
1512 
1513 static void
1514 __fs_delete_file(void *arg)
1515 {
1516 	struct spdk_fs_request *req = arg;
1517 	struct spdk_fs_cb_args *args = &req->args;
1518 
1519 	spdk_trace_record(TRACE_BLOBFS_DELETE_START, 0, 0, 0, fs_name_to_uint64(args->op.delete.name));
1520 	spdk_fs_delete_file_async(args->fs, args->op.delete.name, __fs_delete_file_done, req);
1521 }
1522 
1523 int
1524 spdk_fs_delete_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx,
1525 		    const char *name)
1526 {
1527 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
1528 	struct spdk_fs_request *req;
1529 	struct spdk_fs_cb_args *args;
1530 	int rc;
1531 
1532 	req = alloc_fs_request(channel);
1533 	if (req == NULL) {
1534 		SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "Cannot allocate req to delete file=%s\n", name);
1535 		return -ENOMEM;
1536 	}
1537 
1538 	args = &req->args;
1539 	args->fs = fs;
1540 	args->op.delete.name = name;
1541 	args->sem = &channel->sem;
1542 	fs->send_request(__fs_delete_file, req);
1543 	sem_wait(&channel->sem);
1544 	rc = args->rc;
1545 	free_fs_request(req);
1546 
1547 	return rc;
1548 }
1549 
1550 spdk_fs_iter
1551 spdk_fs_iter_first(struct spdk_filesystem *fs)
1552 {
1553 	struct spdk_file *f;
1554 
1555 	f = TAILQ_FIRST(&fs->files);
1556 	return f;
1557 }
1558 
1559 spdk_fs_iter
1560 spdk_fs_iter_next(spdk_fs_iter iter)
1561 {
1562 	struct spdk_file *f = iter;
1563 
1564 	if (f == NULL) {
1565 		return NULL;
1566 	}
1567 
1568 	f = TAILQ_NEXT(f, tailq);
1569 	return f;
1570 }
1571 
1572 const char *
1573 spdk_file_get_name(struct spdk_file *file)
1574 {
1575 	return file->name;
1576 }
1577 
1578 uint64_t
1579 spdk_file_get_length(struct spdk_file *file)
1580 {
1581 	uint64_t length;
1582 
1583 	assert(file != NULL);
1584 
1585 	length = file->append_pos >= file->length ? file->append_pos : file->length;
1586 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s length=0x%jx\n", file->name, length);
1587 	return length;
1588 }
1589 
1590 static void
1591 fs_truncate_complete_cb(void *ctx, int bserrno)
1592 {
1593 	struct spdk_fs_request *req = ctx;
1594 	struct spdk_fs_cb_args *args = &req->args;
1595 
1596 	args->fn.file_op(args->arg, bserrno);
1597 	free_fs_request(req);
1598 }
1599 
1600 static void
1601 fs_truncate_resize_cb(void *ctx, int bserrno)
1602 {
1603 	struct spdk_fs_request *req = ctx;
1604 	struct spdk_fs_cb_args *args = &req->args;
1605 	struct spdk_file *file = args->file;
1606 	uint64_t *length = &args->op.truncate.length;
1607 
1608 	if (bserrno) {
1609 		args->fn.file_op(args->arg, bserrno);
1610 		free_fs_request(req);
1611 		return;
1612 	}
1613 
1614 	spdk_blob_set_xattr(file->blob, "length", length, sizeof(*length));
1615 
1616 	file->length = *length;
1617 	if (file->append_pos > file->length) {
1618 		file->append_pos = file->length;
1619 	}
1620 
1621 	spdk_blob_sync_md(file->blob, fs_truncate_complete_cb, req);
1622 }
1623 
/*
 * Number of clusters of size cluster_sz needed to hold `length` bytes
 * (i.e. length / cluster_sz rounded up).
 *
 * Computed as quotient plus a remainder check rather than
 * (length + cluster_sz - 1) / cluster_sz, so the result is also correct for
 * lengths close to UINT64_MAX where the addition would wrap around.
 * cluster_sz must be non-zero.
 */
static uint64_t
__bytes_to_clusters(uint64_t length, uint64_t cluster_sz)
{
	uint64_t clusters = length / cluster_sz;

	if (length % cluster_sz != 0) {
		clusters++;
	}

	return clusters;
}
1629 
1630 void
1631 spdk_file_truncate_async(struct spdk_file *file, uint64_t length,
1632 			 spdk_file_op_complete cb_fn, void *cb_arg)
1633 {
1634 	struct spdk_filesystem *fs;
1635 	size_t num_clusters;
1636 	struct spdk_fs_request *req;
1637 	struct spdk_fs_cb_args *args;
1638 
1639 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s old=0x%jx new=0x%jx\n", file->name, file->length, length);
1640 	if (length == file->length) {
1641 		cb_fn(cb_arg, 0);
1642 		return;
1643 	}
1644 
1645 	req = alloc_fs_request(file->fs->md_target.md_fs_channel);
1646 	if (req == NULL) {
1647 		cb_fn(cb_arg, -ENOMEM);
1648 		return;
1649 	}
1650 
1651 	args = &req->args;
1652 	args->fn.file_op = cb_fn;
1653 	args->arg = cb_arg;
1654 	args->file = file;
1655 	args->op.truncate.length = length;
1656 	fs = file->fs;
1657 
1658 	num_clusters = __bytes_to_clusters(length, fs->bs_opts.cluster_sz);
1659 
1660 	spdk_blob_resize(file->blob, num_clusters, fs_truncate_resize_cb, req);
1661 }
1662 
1663 static void
1664 __truncate(void *arg)
1665 {
1666 	struct spdk_fs_request *req = arg;
1667 	struct spdk_fs_cb_args *args = &req->args;
1668 
1669 	spdk_file_truncate_async(args->file, args->op.truncate.length,
1670 				 args->fn.file_op, args);
1671 }
1672 
1673 int
1674 spdk_file_truncate(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx,
1675 		   uint64_t length)
1676 {
1677 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
1678 	struct spdk_fs_request *req;
1679 	struct spdk_fs_cb_args *args;
1680 	int rc;
1681 
1682 	req = alloc_fs_request(channel);
1683 	if (req == NULL) {
1684 		return -ENOMEM;
1685 	}
1686 
1687 	args = &req->args;
1688 
1689 	args->file = file;
1690 	args->op.truncate.length = length;
1691 	args->fn.file_op = __wake_caller;
1692 	args->sem = &channel->sem;
1693 
1694 	channel->send_request(__truncate, req);
1695 	sem_wait(&channel->sem);
1696 	rc = args->rc;
1697 	free_fs_request(req);
1698 
1699 	return rc;
1700 }
1701 
1702 static void
1703 __rw_done(void *ctx, int bserrno)
1704 {
1705 	struct spdk_fs_request *req = ctx;
1706 	struct spdk_fs_cb_args *args = &req->args;
1707 
1708 	spdk_free(args->op.rw.pin_buf);
1709 	args->fn.file_op(args->arg, bserrno);
1710 	free_fs_request(req);
1711 }
1712 
1713 static void
1714 __read_done(void *ctx, int bserrno)
1715 {
1716 	struct spdk_fs_request *req = ctx;
1717 	struct spdk_fs_cb_args *args = &req->args;
1718 
1719 	assert(req != NULL);
1720 	if (args->op.rw.is_read) {
1721 		memcpy(args->iovs[0].iov_base,
1722 		       args->op.rw.pin_buf + (args->op.rw.offset & (args->op.rw.blocklen - 1)),
1723 		       args->iovs[0].iov_len);
1724 		__rw_done(req, 0);
1725 	} else {
1726 		memcpy(args->op.rw.pin_buf + (args->op.rw.offset & (args->op.rw.blocklen - 1)),
1727 		       args->iovs[0].iov_base,
1728 		       args->iovs[0].iov_len);
1729 		spdk_blob_io_write(args->file->blob, args->op.rw.channel,
1730 				   args->op.rw.pin_buf,
1731 				   args->op.rw.start_lba, args->op.rw.num_lba,
1732 				   __rw_done, req);
1733 	}
1734 }
1735 
1736 static void
1737 __do_blob_read(void *ctx, int fserrno)
1738 {
1739 	struct spdk_fs_request *req = ctx;
1740 	struct spdk_fs_cb_args *args = &req->args;
1741 
1742 	if (fserrno) {
1743 		__rw_done(req, fserrno);
1744 		return;
1745 	}
1746 	spdk_blob_io_read(args->file->blob, args->op.rw.channel,
1747 			  args->op.rw.pin_buf,
1748 			  args->op.rw.start_lba, args->op.rw.num_lba,
1749 			  __read_done, req);
1750 }
1751 
1752 static void
1753 __get_page_parameters(struct spdk_file *file, uint64_t offset, uint64_t length,
1754 		      uint64_t *start_lba, uint32_t *lba_size, uint64_t *num_lba)
1755 {
1756 	uint64_t end_lba;
1757 
1758 	*lba_size = spdk_bs_get_io_unit_size(file->fs->bs);
1759 	*start_lba = offset / *lba_size;
1760 	end_lba = (offset + length - 1) / *lba_size;
1761 	*num_lba = (end_lba - *start_lba + 1);
1762 }
1763 
1764 static void
1765 __readwrite(struct spdk_file *file, struct spdk_io_channel *_channel,
1766 	    void *payload, uint64_t offset, uint64_t length,
1767 	    spdk_file_op_complete cb_fn, void *cb_arg, int is_read)
1768 {
1769 	struct spdk_fs_request *req;
1770 	struct spdk_fs_cb_args *args;
1771 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
1772 	uint64_t start_lba, num_lba, pin_buf_length;
1773 	uint32_t lba_size;
1774 
1775 	if (is_read && offset + length > file->length) {
1776 		cb_fn(cb_arg, -EINVAL);
1777 		return;
1778 	}
1779 
1780 	req = alloc_fs_request_with_iov(channel, 1);
1781 	if (req == NULL) {
1782 		cb_fn(cb_arg, -ENOMEM);
1783 		return;
1784 	}
1785 
1786 	__get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba);
1787 
1788 	args = &req->args;
1789 	args->fn.file_op = cb_fn;
1790 	args->arg = cb_arg;
1791 	args->file = file;
1792 	args->op.rw.channel = channel->bs_channel;
1793 	args->iovs[0].iov_base = payload;
1794 	args->iovs[0].iov_len = (size_t)length;
1795 	args->op.rw.is_read = is_read;
1796 	args->op.rw.offset = offset;
1797 	args->op.rw.blocklen = lba_size;
1798 
1799 	pin_buf_length = num_lba * lba_size;
1800 	args->op.rw.pin_buf = spdk_malloc(pin_buf_length, lba_size, NULL,
1801 					  SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
1802 	if (args->op.rw.pin_buf == NULL) {
1803 		SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "Failed to allocate buf for: file=%s offset=%jx length=%jx\n",
1804 			      file->name, offset, length);
1805 		free_fs_request(req);
1806 		cb_fn(cb_arg, -ENOMEM);
1807 		return;
1808 	}
1809 
1810 	args->op.rw.start_lba = start_lba;
1811 	args->op.rw.num_lba = num_lba;
1812 
1813 	if (!is_read && file->length < offset + length) {
1814 		spdk_file_truncate_async(file, offset + length, __do_blob_read, req);
1815 	} else {
1816 		__do_blob_read(req, 0);
1817 	}
1818 }
1819 
1820 void
1821 spdk_file_write_async(struct spdk_file *file, struct spdk_io_channel *channel,
1822 		      void *payload, uint64_t offset, uint64_t length,
1823 		      spdk_file_op_complete cb_fn, void *cb_arg)
1824 {
1825 	__readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 0);
1826 }
1827 
1828 void
1829 spdk_file_read_async(struct spdk_file *file, struct spdk_io_channel *channel,
1830 		     void *payload, uint64_t offset, uint64_t length,
1831 		     spdk_file_op_complete cb_fn, void *cb_arg)
1832 {
1833 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s offset=%jx length=%jx\n",
1834 		      file->name, offset, length);
1835 	__readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 1);
1836 }
1837 
1838 struct spdk_io_channel *
1839 spdk_fs_alloc_io_channel(struct spdk_filesystem *fs)
1840 {
1841 	struct spdk_io_channel *io_channel;
1842 	struct spdk_fs_channel *fs_channel;
1843 
1844 	io_channel = spdk_get_io_channel(&fs->io_target);
1845 	fs_channel = spdk_io_channel_get_ctx(io_channel);
1846 	fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
1847 	fs_channel->send_request = __send_request_direct;
1848 
1849 	return io_channel;
1850 }
1851 
/*
 * Release an I/O channel obtained from spdk_fs_alloc_io_channel() by
 * handing it back through spdk_put_io_channel().
 */
void
spdk_fs_free_io_channel(struct spdk_io_channel *channel)
{
	struct spdk_io_channel *io_ch = channel;

	spdk_put_io_channel(io_ch);
}
1857 
1858 struct spdk_fs_thread_ctx *
1859 spdk_fs_alloc_thread_ctx(struct spdk_filesystem *fs)
1860 {
1861 	struct spdk_fs_thread_ctx *ctx;
1862 
1863 	ctx = calloc(1, sizeof(*ctx));
1864 	if (!ctx) {
1865 		return NULL;
1866 	}
1867 
1868 	_spdk_fs_channel_create(fs, &ctx->ch, 512);
1869 
1870 	ctx->ch.send_request = fs->send_request;
1871 	ctx->ch.sync = 1;
1872 	pthread_spin_init(&ctx->ch.lock, 0);
1873 
1874 	return ctx;
1875 }
1876 
1877 
1878 void
1879 spdk_fs_free_thread_ctx(struct spdk_fs_thread_ctx *ctx)
1880 {
1881 	assert(ctx->ch.sync == 1);
1882 
1883 	while (true) {
1884 		pthread_spin_lock(&ctx->ch.lock);
1885 		if (ctx->ch.outstanding_reqs == 0) {
1886 			pthread_spin_unlock(&ctx->ch.lock);
1887 			break;
1888 		}
1889 		pthread_spin_unlock(&ctx->ch.lock);
1890 		usleep(1000);
1891 	}
1892 
1893 	_spdk_fs_channel_destroy(NULL, &ctx->ch);
1894 	free(ctx);
1895 }
1896 
1897 void
1898 spdk_fs_set_cache_size(uint64_t size_in_mb)
1899 {
1900 	g_fs_cache_size = size_in_mb * 1024 * 1024;
1901 }
1902 
1903 uint64_t
1904 spdk_fs_get_cache_size(void)
1905 {
1906 	return g_fs_cache_size / (1024 * 1024);
1907 }
1908 
1909 static void __file_flush(void *ctx);
1910 
1911 static void *
1912 alloc_cache_memory_buffer(struct spdk_file *context)
1913 {
1914 	struct spdk_file *file;
1915 	void *buf;
1916 
1917 	buf = spdk_mempool_get(g_cache_pool);
1918 	if (buf != NULL) {
1919 		return buf;
1920 	}
1921 
1922 	pthread_spin_lock(&g_caches_lock);
1923 	TAILQ_FOREACH(file, &g_caches, cache_tailq) {
1924 		if (!file->open_for_writing &&
1925 		    file->priority == SPDK_FILE_PRIORITY_LOW &&
1926 		    file != context) {
1927 			break;
1928 		}
1929 	}
1930 	pthread_spin_unlock(&g_caches_lock);
1931 	if (file != NULL) {
1932 		cache_free_buffers(file);
1933 		buf = spdk_mempool_get(g_cache_pool);
1934 		if (buf != NULL) {
1935 			return buf;
1936 		}
1937 	}
1938 
1939 	pthread_spin_lock(&g_caches_lock);
1940 	TAILQ_FOREACH(file, &g_caches, cache_tailq) {
1941 		if (!file->open_for_writing && file != context) {
1942 			break;
1943 		}
1944 	}
1945 	pthread_spin_unlock(&g_caches_lock);
1946 	if (file != NULL) {
1947 		cache_free_buffers(file);
1948 		buf = spdk_mempool_get(g_cache_pool);
1949 		if (buf != NULL) {
1950 			return buf;
1951 		}
1952 	}
1953 
1954 	pthread_spin_lock(&g_caches_lock);
1955 	TAILQ_FOREACH(file, &g_caches, cache_tailq) {
1956 		if (file != context) {
1957 			break;
1958 		}
1959 	}
1960 	pthread_spin_unlock(&g_caches_lock);
1961 	if (file != NULL) {
1962 		cache_free_buffers(file);
1963 		buf = spdk_mempool_get(g_cache_pool);
1964 		if (buf != NULL) {
1965 			return buf;
1966 		}
1967 	}
1968 
1969 	return NULL;
1970 }
1971 
1972 static struct cache_buffer *
1973 cache_insert_buffer(struct spdk_file *file, uint64_t offset)
1974 {
1975 	struct cache_buffer *buf;
1976 	int count = 0;
1977 
1978 	buf = calloc(1, sizeof(*buf));
1979 	if (buf == NULL) {
1980 		SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "calloc failed\n");
1981 		return NULL;
1982 	}
1983 
1984 	buf->buf = alloc_cache_memory_buffer(file);
1985 	while (buf->buf == NULL) {
1986 		/*
1987 		 * TODO: alloc_cache_memory_buffer() should eventually free
1988 		 *  some buffers.  Need a more sophisticated check here, instead
1989 		 *  of just bailing if 100 tries does not result in getting a
1990 		 *  free buffer.  This will involve using the sync channel's
1991 		 *  semaphore to block until a buffer becomes available.
1992 		 */
1993 		if (count++ == 100) {
1994 			SPDK_ERRLOG("Could not allocate cache buffer for file=%p on offset=%jx\n",
1995 				    file, offset);
1996 			free(buf);
1997 			return NULL;
1998 		}
1999 		buf->buf = alloc_cache_memory_buffer(file);
2000 	}
2001 
2002 	buf->buf_size = CACHE_BUFFER_SIZE;
2003 	buf->offset = offset;
2004 
2005 	pthread_spin_lock(&g_caches_lock);
2006 	if (file->tree->present_mask == 0) {
2007 		TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq);
2008 	}
2009 	file->tree = spdk_tree_insert_buffer(file->tree, buf);
2010 	pthread_spin_unlock(&g_caches_lock);
2011 
2012 	return buf;
2013 }
2014 
2015 static struct cache_buffer *
2016 cache_append_buffer(struct spdk_file *file)
2017 {
2018 	struct cache_buffer *last;
2019 
2020 	assert(file->last == NULL || file->last->bytes_filled == file->last->buf_size);
2021 	assert((file->append_pos % CACHE_BUFFER_SIZE) == 0);
2022 
2023 	last = cache_insert_buffer(file, file->append_pos);
2024 	if (last == NULL) {
2025 		SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "cache_insert_buffer failed\n");
2026 		return NULL;
2027 	}
2028 
2029 	file->last = last;
2030 
2031 	return last;
2032 }
2033 
2034 static void __check_sync_reqs(struct spdk_file *file);
2035 
2036 static void
2037 __file_cache_finish_sync(void *ctx, int bserrno)
2038 {
2039 	struct spdk_file *file;
2040 	struct spdk_fs_request *sync_req = ctx;
2041 	struct spdk_fs_cb_args *sync_args;
2042 
2043 	sync_args = &sync_req->args;
2044 	file = sync_args->file;
2045 	pthread_spin_lock(&file->lock);
2046 	file->length_xattr = sync_args->op.sync.length;
2047 	assert(sync_args->op.sync.offset <= file->length_flushed);
2048 	spdk_trace_record(TRACE_BLOBFS_XATTR_END, 0, sync_args->op.sync.offset,
2049 			  0, file->trace_arg_name);
2050 	BLOBFS_TRACE(file, "sync done offset=%jx\n", sync_args->op.sync.offset);
2051 	TAILQ_REMOVE(&file->sync_requests, sync_req, args.op.sync.tailq);
2052 	pthread_spin_unlock(&file->lock);
2053 
2054 	sync_args->fn.file_op(sync_args->arg, bserrno);
2055 	pthread_spin_lock(&file->lock);
2056 	free_fs_request(sync_req);
2057 	pthread_spin_unlock(&file->lock);
2058 
2059 	__check_sync_reqs(file);
2060 }
2061 
2062 static void
2063 __check_sync_reqs(struct spdk_file *file)
2064 {
2065 	struct spdk_fs_request *sync_req;
2066 
2067 	pthread_spin_lock(&file->lock);
2068 
2069 	TAILQ_FOREACH(sync_req, &file->sync_requests, args.op.sync.tailq) {
2070 		if (sync_req->args.op.sync.offset <= file->length_flushed) {
2071 			break;
2072 		}
2073 	}
2074 
2075 	if (sync_req != NULL && !sync_req->args.op.sync.xattr_in_progress) {
2076 		BLOBFS_TRACE(file, "set xattr length 0x%jx\n", file->length_flushed);
2077 		sync_req->args.op.sync.xattr_in_progress = true;
2078 		sync_req->args.op.sync.length = file->length_flushed;
2079 		spdk_blob_set_xattr(file->blob, "length", &file->length_flushed,
2080 				    sizeof(file->length_flushed));
2081 
2082 		pthread_spin_unlock(&file->lock);
2083 		spdk_trace_record(TRACE_BLOBFS_XATTR_START, 0, file->length_flushed,
2084 				  0, file->trace_arg_name);
2085 		spdk_blob_sync_md(file->blob, __file_cache_finish_sync, sync_req);
2086 	} else {
2087 		pthread_spin_unlock(&file->lock);
2088 	}
2089 }
2090 
2091 static void
2092 __file_flush_done(void *ctx, int bserrno)
2093 {
2094 	struct spdk_fs_request *req = ctx;
2095 	struct spdk_fs_cb_args *args = &req->args;
2096 	struct spdk_file *file = args->file;
2097 	struct cache_buffer *next = args->op.flush.cache_buffer;
2098 
2099 	BLOBFS_TRACE(file, "length=%jx\n", args->op.flush.length);
2100 
2101 	pthread_spin_lock(&file->lock);
2102 	next->in_progress = false;
2103 	next->bytes_flushed += args->op.flush.length;
2104 	file->length_flushed += args->op.flush.length;
2105 	if (file->length_flushed > file->length) {
2106 		file->length = file->length_flushed;
2107 	}
2108 	if (next->bytes_flushed == next->buf_size) {
2109 		BLOBFS_TRACE(file, "write buffer fully flushed 0x%jx\n", file->length_flushed);
2110 		next = spdk_tree_find_buffer(file->tree, file->length_flushed);
2111 	}
2112 
2113 	/*
2114 	 * Assert that there is no cached data that extends past the end of the underlying
2115 	 *  blob.
2116 	 */
2117 	assert(next == NULL || next->offset < __file_get_blob_size(file) ||
2118 	       next->bytes_filled == 0);
2119 
2120 	pthread_spin_unlock(&file->lock);
2121 
2122 	__check_sync_reqs(file);
2123 
2124 	__file_flush(req);
2125 }
2126 
2127 static void
2128 __file_flush(void *ctx)
2129 {
2130 	struct spdk_fs_request *req = ctx;
2131 	struct spdk_fs_cb_args *args = &req->args;
2132 	struct spdk_file *file = args->file;
2133 	struct cache_buffer *next;
2134 	uint64_t offset, length, start_lba, num_lba;
2135 	uint32_t lba_size;
2136 
2137 	pthread_spin_lock(&file->lock);
2138 	next = spdk_tree_find_buffer(file->tree, file->length_flushed);
2139 	if (next == NULL || next->in_progress ||
2140 	    ((next->bytes_filled < next->buf_size) && TAILQ_EMPTY(&file->sync_requests))) {
2141 		/*
2142 		 * There is either no data to flush, a flush I/O is already in
2143 		 *  progress, or the next buffer is partially filled but there's no
2144 		 *  outstanding request to sync it.
2145 		 * So return immediately - if a flush I/O is in progress we will flush
2146 		 *  more data after that is completed, or a partial buffer will get flushed
2147 		 *  when it is either filled or the file is synced.
2148 		 */
2149 		free_fs_request(req);
2150 		if (next == NULL) {
2151 			/*
2152 			 * For cases where a file's cache was evicted, and then the
2153 			 *  file was later appended, we will write the data directly
2154 			 *  to disk and bypass cache.  So just update length_flushed
2155 			 *  here to reflect that all data was already written to disk.
2156 			 */
2157 			file->length_flushed = file->append_pos;
2158 		}
2159 		pthread_spin_unlock(&file->lock);
2160 		if (next == NULL) {
2161 			/*
2162 			 * There is no data to flush, but we still need to check for any
2163 			 *  outstanding sync requests to make sure metadata gets updated.
2164 			 */
2165 			__check_sync_reqs(file);
2166 		}
2167 		return;
2168 	}
2169 
2170 	offset = next->offset + next->bytes_flushed;
2171 	length = next->bytes_filled - next->bytes_flushed;
2172 	if (length == 0) {
2173 		free_fs_request(req);
2174 		pthread_spin_unlock(&file->lock);
2175 		/*
2176 		 * There is no data to flush, but we still need to check for any
2177 		 *  outstanding sync requests to make sure metadata gets updated.
2178 		 */
2179 		__check_sync_reqs(file);
2180 		return;
2181 	}
2182 	args->op.flush.length = length;
2183 	args->op.flush.cache_buffer = next;
2184 
2185 	__get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba);
2186 
2187 	next->in_progress = true;
2188 	BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n",
2189 		     offset, length, start_lba, num_lba);
2190 	pthread_spin_unlock(&file->lock);
2191 	spdk_blob_io_write(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel,
2192 			   next->buf + (start_lba * lba_size) - next->offset,
2193 			   start_lba, num_lba, __file_flush_done, req);
2194 }
2195 
/*
 * Last step of the blob-extend chain: forward the status of the metadata
 *  sync to __wake_caller.  arg is the struct spdk_fs_cb_args of the request.
 */
static void
__file_extend_done(void *arg, int bserrno)
{
	struct spdk_fs_cb_args *extend_args = arg;

	__wake_caller(extend_args, bserrno);
}
2203 
2204 static void
2205 __file_extend_resize_cb(void *_args, int bserrno)
2206 {
2207 	struct spdk_fs_cb_args *args = _args;
2208 	struct spdk_file *file = args->file;
2209 
2210 	if (bserrno) {
2211 		__wake_caller(args, bserrno);
2212 		return;
2213 	}
2214 
2215 	spdk_blob_sync_md(file->blob, __file_extend_done, args);
2216 }
2217 
2218 static void
2219 __file_extend_blob(void *_args)
2220 {
2221 	struct spdk_fs_cb_args *args = _args;
2222 	struct spdk_file *file = args->file;
2223 
2224 	spdk_blob_resize(file->blob, args->op.resize.num_clusters, __file_extend_resize_cb, args);
2225 }
2226 
2227 static void
2228 __rw_from_file_done(void *ctx, int bserrno)
2229 {
2230 	struct spdk_fs_request *req = ctx;
2231 
2232 	__wake_caller(&req->args, bserrno);
2233 	free_fs_request(req);
2234 }
2235 
2236 static void
2237 __rw_from_file(void *ctx)
2238 {
2239 	struct spdk_fs_request *req = ctx;
2240 	struct spdk_fs_cb_args *args = &req->args;
2241 	struct spdk_file *file = args->file;
2242 
2243 	if (args->op.rw.is_read) {
2244 		spdk_file_read_async(file, file->fs->sync_target.sync_io_channel, args->iovs[0].iov_base,
2245 				     args->op.rw.offset, (uint64_t)args->iovs[0].iov_len,
2246 				     __rw_from_file_done, req);
2247 	} else {
2248 		spdk_file_write_async(file, file->fs->sync_target.sync_io_channel, args->iovs[0].iov_base,
2249 				      args->op.rw.offset, (uint64_t)args->iovs[0].iov_len,
2250 				      __rw_from_file_done, req);
2251 	}
2252 }
2253 
2254 static int
2255 __send_rw_from_file(struct spdk_file *file, void *payload,
2256 		    uint64_t offset, uint64_t length, bool is_read,
2257 		    struct spdk_fs_channel *channel)
2258 {
2259 	struct spdk_fs_request *req;
2260 	struct spdk_fs_cb_args *args;
2261 
2262 	req = alloc_fs_request_with_iov(channel, 1);
2263 	if (req == NULL) {
2264 		sem_post(&channel->sem);
2265 		return -ENOMEM;
2266 	}
2267 
2268 	args = &req->args;
2269 	args->file = file;
2270 	args->sem = &channel->sem;
2271 	args->iovs[0].iov_base = payload;
2272 	args->iovs[0].iov_len = (size_t)length;
2273 	args->op.rw.offset = offset;
2274 	args->op.rw.is_read = is_read;
2275 	file->fs->send_request(__rw_from_file, req);
2276 	return 0;
2277 }
2278 
2279 int
2280 spdk_file_write(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx,
2281 		void *payload, uint64_t offset, uint64_t length)
2282 {
2283 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
2284 	struct spdk_fs_request *flush_req;
2285 	uint64_t rem_length, copy, blob_size, cluster_sz;
2286 	uint32_t cache_buffers_filled = 0;
2287 	uint8_t *cur_payload;
2288 	struct cache_buffer *last;
2289 
2290 	BLOBFS_TRACE_RW(file, "offset=%jx length=%jx\n", offset, length);
2291 
2292 	if (length == 0) {
2293 		return 0;
2294 	}
2295 
2296 	if (offset != file->append_pos) {
2297 		BLOBFS_TRACE(file, " error offset=%jx append_pos=%jx\n", offset, file->append_pos);
2298 		return -EINVAL;
2299 	}
2300 
2301 	pthread_spin_lock(&file->lock);
2302 	file->open_for_writing = true;
2303 
2304 	if ((file->last == NULL) && (file->append_pos % CACHE_BUFFER_SIZE == 0)) {
2305 		cache_append_buffer(file);
2306 	}
2307 
2308 	if (file->last == NULL) {
2309 		int rc;
2310 
2311 		file->append_pos += length;
2312 		pthread_spin_unlock(&file->lock);
2313 		rc = __send_rw_from_file(file, payload, offset, length, false, channel);
2314 		sem_wait(&channel->sem);
2315 		return rc;
2316 	}
2317 
2318 	blob_size = __file_get_blob_size(file);
2319 
2320 	if ((offset + length) > blob_size) {
2321 		struct spdk_fs_cb_args extend_args = {};
2322 
2323 		cluster_sz = file->fs->bs_opts.cluster_sz;
2324 		extend_args.sem = &channel->sem;
2325 		extend_args.op.resize.num_clusters = __bytes_to_clusters((offset + length), cluster_sz);
2326 		extend_args.file = file;
2327 		BLOBFS_TRACE(file, "start resize to %u clusters\n", extend_args.op.resize.num_clusters);
2328 		pthread_spin_unlock(&file->lock);
2329 		file->fs->send_request(__file_extend_blob, &extend_args);
2330 		sem_wait(&channel->sem);
2331 		if (extend_args.rc) {
2332 			return extend_args.rc;
2333 		}
2334 	}
2335 
2336 	flush_req = alloc_fs_request(channel);
2337 	if (flush_req == NULL) {
2338 		pthread_spin_unlock(&file->lock);
2339 		return -ENOMEM;
2340 	}
2341 
2342 	last = file->last;
2343 	rem_length = length;
2344 	cur_payload = payload;
2345 	while (rem_length > 0) {
2346 		copy = last->buf_size - last->bytes_filled;
2347 		if (copy > rem_length) {
2348 			copy = rem_length;
2349 		}
2350 		BLOBFS_TRACE_RW(file, "  fill offset=%jx length=%jx\n", file->append_pos, copy);
2351 		memcpy(&last->buf[last->bytes_filled], cur_payload, copy);
2352 		file->append_pos += copy;
2353 		if (file->length < file->append_pos) {
2354 			file->length = file->append_pos;
2355 		}
2356 		cur_payload += copy;
2357 		last->bytes_filled += copy;
2358 		rem_length -= copy;
2359 		if (last->bytes_filled == last->buf_size) {
2360 			cache_buffers_filled++;
2361 			last = cache_append_buffer(file);
2362 			if (last == NULL) {
2363 				BLOBFS_TRACE(file, "nomem\n");
2364 				free_fs_request(flush_req);
2365 				pthread_spin_unlock(&file->lock);
2366 				return -ENOMEM;
2367 			}
2368 		}
2369 	}
2370 
2371 	pthread_spin_unlock(&file->lock);
2372 
2373 	if (cache_buffers_filled == 0) {
2374 		free_fs_request(flush_req);
2375 		return 0;
2376 	}
2377 
2378 	flush_req->args.file = file;
2379 	file->fs->send_request(__file_flush, flush_req);
2380 	return 0;
2381 }
2382 
2383 static void
2384 __readahead_done(void *ctx, int bserrno)
2385 {
2386 	struct spdk_fs_request *req = ctx;
2387 	struct spdk_fs_cb_args *args = &req->args;
2388 	struct cache_buffer *cache_buffer = args->op.readahead.cache_buffer;
2389 	struct spdk_file *file = args->file;
2390 
2391 	BLOBFS_TRACE(file, "offset=%jx\n", cache_buffer->offset);
2392 
2393 	pthread_spin_lock(&file->lock);
2394 	cache_buffer->bytes_filled = args->op.readahead.length;
2395 	cache_buffer->bytes_flushed = args->op.readahead.length;
2396 	cache_buffer->in_progress = false;
2397 	pthread_spin_unlock(&file->lock);
2398 
2399 	free_fs_request(req);
2400 }
2401 
2402 static void
2403 __readahead(void *ctx)
2404 {
2405 	struct spdk_fs_request *req = ctx;
2406 	struct spdk_fs_cb_args *args = &req->args;
2407 	struct spdk_file *file = args->file;
2408 	uint64_t offset, length, start_lba, num_lba;
2409 	uint32_t lba_size;
2410 
2411 	offset = args->op.readahead.offset;
2412 	length = args->op.readahead.length;
2413 	assert(length > 0);
2414 
2415 	__get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba);
2416 
2417 	BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n",
2418 		     offset, length, start_lba, num_lba);
2419 	spdk_blob_io_read(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel,
2420 			  args->op.readahead.cache_buffer->buf,
2421 			  start_lba, num_lba, __readahead_done, req);
2422 }
2423 
2424 static uint64_t
2425 __next_cache_buffer_offset(uint64_t offset)
2426 {
2427 	return (offset + CACHE_BUFFER_SIZE) & ~(CACHE_TREE_LEVEL_MASK(0));
2428 }
2429 
2430 static void
2431 check_readahead(struct spdk_file *file, uint64_t offset,
2432 		struct spdk_fs_channel *channel)
2433 {
2434 	struct spdk_fs_request *req;
2435 	struct spdk_fs_cb_args *args;
2436 
2437 	offset = __next_cache_buffer_offset(offset);
2438 	if (spdk_tree_find_buffer(file->tree, offset) != NULL || file->length <= offset) {
2439 		return;
2440 	}
2441 
2442 	req = alloc_fs_request(channel);
2443 	if (req == NULL) {
2444 		return;
2445 	}
2446 	args = &req->args;
2447 
2448 	BLOBFS_TRACE(file, "offset=%jx\n", offset);
2449 
2450 	args->file = file;
2451 	args->op.readahead.offset = offset;
2452 	args->op.readahead.cache_buffer = cache_insert_buffer(file, offset);
2453 	if (!args->op.readahead.cache_buffer) {
2454 		BLOBFS_TRACE(file, "Cannot allocate buf for offset=%jx\n", offset);
2455 		free_fs_request(req);
2456 		return;
2457 	}
2458 
2459 	args->op.readahead.cache_buffer->in_progress = true;
2460 	if (file->length < (offset + CACHE_BUFFER_SIZE)) {
2461 		args->op.readahead.length = file->length & (CACHE_BUFFER_SIZE - 1);
2462 	} else {
2463 		args->op.readahead.length = CACHE_BUFFER_SIZE;
2464 	}
2465 	file->fs->send_request(__readahead, req);
2466 }
2467 
2468 static int
2469 __file_read(struct spdk_file *file, void *payload, uint64_t offset, uint64_t length,
2470 	    struct spdk_fs_channel *channel)
2471 {
2472 	struct cache_buffer *buf;
2473 	int rc;
2474 
2475 	buf = spdk_tree_find_filled_buffer(file->tree, offset);
2476 	if (buf == NULL) {
2477 		pthread_spin_unlock(&file->lock);
2478 		rc = __send_rw_from_file(file, payload, offset, length, true, channel);
2479 		pthread_spin_lock(&file->lock);
2480 		return rc;
2481 	}
2482 
2483 	if ((offset + length) > (buf->offset + buf->bytes_filled)) {
2484 		length = buf->offset + buf->bytes_filled - offset;
2485 	}
2486 	BLOBFS_TRACE(file, "read %p offset=%ju length=%ju\n", payload, offset, length);
2487 	memcpy(payload, &buf->buf[offset - buf->offset], length);
2488 	if ((offset + length) % CACHE_BUFFER_SIZE == 0) {
2489 		pthread_spin_lock(&g_caches_lock);
2490 		spdk_tree_remove_buffer(file->tree, buf);
2491 		if (file->tree->present_mask == 0) {
2492 			TAILQ_REMOVE(&g_caches, file, cache_tailq);
2493 		}
2494 		pthread_spin_unlock(&g_caches_lock);
2495 	}
2496 
2497 	sem_post(&channel->sem);
2498 	return 0;
2499 }
2500 
2501 int64_t
2502 spdk_file_read(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx,
2503 	       void *payload, uint64_t offset, uint64_t length)
2504 {
2505 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
2506 	uint64_t final_offset, final_length;
2507 	uint32_t sub_reads = 0;
2508 	int rc = 0;
2509 
2510 	pthread_spin_lock(&file->lock);
2511 
2512 	BLOBFS_TRACE_RW(file, "offset=%ju length=%ju\n", offset, length);
2513 
2514 	file->open_for_writing = false;
2515 
2516 	if (length == 0 || offset >= file->append_pos) {
2517 		pthread_spin_unlock(&file->lock);
2518 		return 0;
2519 	}
2520 
2521 	if (offset + length > file->append_pos) {
2522 		length = file->append_pos - offset;
2523 	}
2524 
2525 	if (offset != file->next_seq_offset) {
2526 		file->seq_byte_count = 0;
2527 	}
2528 	file->seq_byte_count += length;
2529 	file->next_seq_offset = offset + length;
2530 	if (file->seq_byte_count >= CACHE_READAHEAD_THRESHOLD) {
2531 		check_readahead(file, offset, channel);
2532 		check_readahead(file, offset + CACHE_BUFFER_SIZE, channel);
2533 	}
2534 
2535 	final_length = 0;
2536 	final_offset = offset + length;
2537 	while (offset < final_offset) {
2538 		length = NEXT_CACHE_BUFFER_OFFSET(offset) - offset;
2539 		if (length > (final_offset - offset)) {
2540 			length = final_offset - offset;
2541 		}
2542 
2543 		sub_reads++;
2544 		rc = __file_read(file, payload, offset, length, channel);
2545 		if (rc == 0) {
2546 			final_length += length;
2547 		} else {
2548 			break;
2549 		}
2550 		payload += length;
2551 		offset += length;
2552 	}
2553 	pthread_spin_unlock(&file->lock);
2554 	while (sub_reads-- > 0) {
2555 		sem_wait(&channel->sem);
2556 	}
2557 	if (rc == 0) {
2558 		return final_length;
2559 	} else {
2560 		return rc;
2561 	}
2562 }
2563 
2564 static void
2565 _file_sync(struct spdk_file *file, struct spdk_fs_channel *channel,
2566 	   spdk_file_op_complete cb_fn, void *cb_arg)
2567 {
2568 	struct spdk_fs_request *sync_req;
2569 	struct spdk_fs_request *flush_req;
2570 	struct spdk_fs_cb_args *sync_args;
2571 	struct spdk_fs_cb_args *flush_args;
2572 
2573 	BLOBFS_TRACE(file, "offset=%jx\n", file->append_pos);
2574 
2575 	pthread_spin_lock(&file->lock);
2576 	if (file->append_pos <= file->length_xattr) {
2577 		BLOBFS_TRACE(file, "done - file already synced\n");
2578 		pthread_spin_unlock(&file->lock);
2579 		cb_fn(cb_arg, 0);
2580 		return;
2581 	}
2582 
2583 	sync_req = alloc_fs_request(channel);
2584 	if (!sync_req) {
2585 		SPDK_ERRLOG("Cannot allocate sync req for file=%s\n", file->name);
2586 		pthread_spin_unlock(&file->lock);
2587 		cb_fn(cb_arg, -ENOMEM);
2588 		return;
2589 	}
2590 	sync_args = &sync_req->args;
2591 
2592 	flush_req = alloc_fs_request(channel);
2593 	if (!flush_req) {
2594 		SPDK_ERRLOG("Cannot allocate flush req for file=%s\n", file->name);
2595 		pthread_spin_unlock(&file->lock);
2596 		cb_fn(cb_arg, -ENOMEM);
2597 		return;
2598 	}
2599 	flush_args = &flush_req->args;
2600 
2601 	sync_args->file = file;
2602 	sync_args->fn.file_op = cb_fn;
2603 	sync_args->arg = cb_arg;
2604 	sync_args->op.sync.offset = file->append_pos;
2605 	sync_args->op.sync.xattr_in_progress = false;
2606 	TAILQ_INSERT_TAIL(&file->sync_requests, sync_req, args.op.sync.tailq);
2607 	pthread_spin_unlock(&file->lock);
2608 
2609 	flush_args->file = file;
2610 	channel->send_request(__file_flush, flush_req);
2611 }
2612 
2613 int
2614 spdk_file_sync(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx)
2615 {
2616 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
2617 	struct spdk_fs_cb_args args = {};
2618 
2619 	args.sem = &channel->sem;
2620 	_file_sync(file, channel, __wake_caller, &args);
2621 	sem_wait(&channel->sem);
2622 
2623 	return args.rc;
2624 }
2625 
2626 void
2627 spdk_file_sync_async(struct spdk_file *file, struct spdk_io_channel *_channel,
2628 		     spdk_file_op_complete cb_fn, void *cb_arg)
2629 {
2630 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
2631 
2632 	_file_sync(file, channel, cb_fn, cb_arg);
2633 }
2634 
2635 void
2636 spdk_file_set_priority(struct spdk_file *file, uint32_t priority)
2637 {
2638 	BLOBFS_TRACE(file, "priority=%u\n", priority);
2639 	file->priority = priority;
2640 
2641 }
2642 
2643 /*
2644  * Close routines
2645  */
2646 
2647 static void
2648 __file_close_async_done(void *ctx, int bserrno)
2649 {
2650 	struct spdk_fs_request *req = ctx;
2651 	struct spdk_fs_cb_args *args = &req->args;
2652 	struct spdk_file *file = args->file;
2653 
2654 	spdk_trace_record(TRACE_BLOBFS_CLOSE, 0, 0, 0, file->trace_arg_name);
2655 
2656 	if (file->is_deleted) {
2657 		spdk_fs_delete_file_async(file->fs, file->name, blob_delete_cb, ctx);
2658 		return;
2659 	}
2660 
2661 	args->fn.file_op(args->arg, bserrno);
2662 	free_fs_request(req);
2663 }
2664 
2665 static void
2666 __file_close_async(struct spdk_file *file, struct spdk_fs_request *req)
2667 {
2668 	struct spdk_blob *blob;
2669 
2670 	pthread_spin_lock(&file->lock);
2671 	if (file->ref_count == 0) {
2672 		pthread_spin_unlock(&file->lock);
2673 		__file_close_async_done(req, -EBADF);
2674 		return;
2675 	}
2676 
2677 	file->ref_count--;
2678 	if (file->ref_count > 0) {
2679 		pthread_spin_unlock(&file->lock);
2680 		req->args.fn.file_op(req->args.arg, 0);
2681 		free_fs_request(req);
2682 		return;
2683 	}
2684 
2685 	pthread_spin_unlock(&file->lock);
2686 
2687 	blob = file->blob;
2688 	file->blob = NULL;
2689 	spdk_blob_close(blob, __file_close_async_done, req);
2690 }
2691 
2692 static void
2693 __file_close_async__sync_done(void *arg, int fserrno)
2694 {
2695 	struct spdk_fs_request *req = arg;
2696 	struct spdk_fs_cb_args *args = &req->args;
2697 
2698 	__file_close_async(args->file, req);
2699 }
2700 
2701 void
2702 spdk_file_close_async(struct spdk_file *file, spdk_file_op_complete cb_fn, void *cb_arg)
2703 {
2704 	struct spdk_fs_request *req;
2705 	struct spdk_fs_cb_args *args;
2706 
2707 	req = alloc_fs_request(file->fs->md_target.md_fs_channel);
2708 	if (req == NULL) {
2709 		SPDK_ERRLOG("Cannot allocate close async req for file=%s\n", file->name);
2710 		cb_fn(cb_arg, -ENOMEM);
2711 		return;
2712 	}
2713 
2714 	args = &req->args;
2715 	args->file = file;
2716 	args->fn.file_op = cb_fn;
2717 	args->arg = cb_arg;
2718 
2719 	spdk_file_sync_async(file, file->fs->md_target.md_io_channel, __file_close_async__sync_done, req);
2720 }
2721 
2722 static void
2723 __file_close(void *arg)
2724 {
2725 	struct spdk_fs_request *req = arg;
2726 	struct spdk_fs_cb_args *args = &req->args;
2727 	struct spdk_file *file = args->file;
2728 
2729 	__file_close_async(file, req);
2730 }
2731 
2732 int
2733 spdk_file_close(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx)
2734 {
2735 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
2736 	struct spdk_fs_request *req;
2737 	struct spdk_fs_cb_args *args;
2738 
2739 	req = alloc_fs_request(channel);
2740 	if (req == NULL) {
2741 		SPDK_ERRLOG("Cannot allocate close req for file=%s\n", file->name);
2742 		return -ENOMEM;
2743 	}
2744 
2745 	args = &req->args;
2746 
2747 	spdk_file_sync(file, ctx);
2748 	BLOBFS_TRACE(file, "name=%s\n", file->name);
2749 	args->file = file;
2750 	args->sem = &channel->sem;
2751 	args->fn.file_op = __wake_caller;
2752 	args->arg = args;
2753 	channel->send_request(__file_close, req);
2754 	sem_wait(&channel->sem);
2755 
2756 	return args->rc;
2757 }
2758 
2759 int
2760 spdk_file_get_id(struct spdk_file *file, void *id, size_t size)
2761 {
2762 	if (size < sizeof(spdk_blob_id)) {
2763 		return -EINVAL;
2764 	}
2765 
2766 	memcpy(id, &file->blobid, sizeof(spdk_blob_id));
2767 
2768 	return sizeof(spdk_blob_id);
2769 }
2770 
2771 static void
2772 cache_free_buffers(struct spdk_file *file)
2773 {
2774 	BLOBFS_TRACE(file, "free=%s\n", file->name);
2775 	pthread_spin_lock(&file->lock);
2776 	pthread_spin_lock(&g_caches_lock);
2777 	if (file->tree->present_mask == 0) {
2778 		pthread_spin_unlock(&g_caches_lock);
2779 		pthread_spin_unlock(&file->lock);
2780 		return;
2781 	}
2782 	spdk_tree_free_buffers(file->tree);
2783 
2784 	TAILQ_REMOVE(&g_caches, file, cache_tailq);
2785 	/* If not freed, put it in the end of the queue */
2786 	if (file->tree->present_mask != 0) {
2787 		TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq);
2788 	}
2789 	file->last = NULL;
2790 	pthread_spin_unlock(&g_caches_lock);
2791 	pthread_spin_unlock(&file->lock);
2792 }
2793 
2794 SPDK_LOG_REGISTER_COMPONENT("blobfs", SPDK_LOG_BLOBFS)
2795 SPDK_LOG_REGISTER_COMPONENT("blobfs_rw", SPDK_LOG_BLOBFS_RW)
2796