xref: /spdk/lib/blobfs/blobfs.c (revision e967dcd245f096f102d811e5c6d8aeb96c172e3e)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 
36 #include "spdk/blobfs.h"
37 #include "spdk/conf.h"
38 #include "blobfs_internal.h"
39 
40 #include "spdk/queue.h"
41 #include "spdk/thread.h"
42 #include "spdk/assert.h"
43 #include "spdk/env.h"
44 #include "spdk/util.h"
45 #include "spdk_internal/log.h"
46 #include "spdk/trace.h"
47 
48 #define BLOBFS_TRACE(file, str, args...) \
49 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s " str, file->name, ##args)
50 
51 #define BLOBFS_TRACE_RW(file, str, args...) \
52 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS_RW, "file=%s " str, file->name, ##args)
53 
54 #define BLOBFS_DEFAULT_CACHE_SIZE (4ULL * 1024 * 1024 * 1024)
55 #define SPDK_BLOBFS_DEFAULT_OPTS_CLUSTER_SZ (1024 * 1024)
56 
57 static uint64_t g_fs_cache_size = BLOBFS_DEFAULT_CACHE_SIZE;
58 static struct spdk_mempool *g_cache_pool;
59 static TAILQ_HEAD(, spdk_file) g_caches;
60 static int g_fs_count = 0;
61 static pthread_mutex_t g_cache_init_lock = PTHREAD_MUTEX_INITIALIZER;
62 static pthread_spinlock_t g_caches_lock;
63 
64 #define TRACE_GROUP_BLOBFS	0x7
65 #define TRACE_BLOBFS_XATTR_START	SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x0)
66 #define TRACE_BLOBFS_XATTR_END		SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x1)
67 #define TRACE_BLOBFS_OPEN		SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x2)
68 #define TRACE_BLOBFS_CLOSE		SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x3)
69 #define TRACE_BLOBFS_DELETE_START	SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x4)
70 #define TRACE_BLOBFS_DELETE_DONE	SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x5)
71 
72 SPDK_TRACE_REGISTER_FN(blobfs_trace, "blobfs", TRACE_GROUP_BLOBFS)
73 {
74 	spdk_trace_register_description("BLOBFS_XATTR_START",
75 					TRACE_BLOBFS_XATTR_START,
76 					OWNER_NONE, OBJECT_NONE, 0,
77 					SPDK_TRACE_ARG_TYPE_STR,
78 					"file:    ");
79 	spdk_trace_register_description("BLOBFS_XATTR_END",
80 					TRACE_BLOBFS_XATTR_END,
81 					OWNER_NONE, OBJECT_NONE, 0,
82 					SPDK_TRACE_ARG_TYPE_STR,
83 					"file:    ");
84 	spdk_trace_register_description("BLOBFS_OPEN",
85 					TRACE_BLOBFS_OPEN,
86 					OWNER_NONE, OBJECT_NONE, 0,
87 					SPDK_TRACE_ARG_TYPE_STR,
88 					"file:    ");
89 	spdk_trace_register_description("BLOBFS_CLOSE",
90 					TRACE_BLOBFS_CLOSE,
91 					OWNER_NONE, OBJECT_NONE, 0,
92 					SPDK_TRACE_ARG_TYPE_STR,
93 					"file:    ");
94 	spdk_trace_register_description("BLOBFS_DELETE_START",
95 					TRACE_BLOBFS_DELETE_START,
96 					OWNER_NONE, OBJECT_NONE, 0,
97 					SPDK_TRACE_ARG_TYPE_STR,
98 					"file:    ");
99 	spdk_trace_register_description("BLOBFS_DELETE_DONE",
100 					TRACE_BLOBFS_DELETE_DONE,
101 					OWNER_NONE, OBJECT_NONE, 0,
102 					SPDK_TRACE_ARG_TYPE_STR,
103 					"file:    ");
104 }
105 
106 void
107 spdk_cache_buffer_free(struct cache_buffer *cache_buffer)
108 {
109 	spdk_mempool_put(g_cache_pool, cache_buffer->buf);
110 	free(cache_buffer);
111 }
112 
113 #define CACHE_READAHEAD_THRESHOLD	(128 * 1024)
114 
115 struct spdk_file {
116 	struct spdk_filesystem	*fs;
117 	struct spdk_blob	*blob;
118 	char			*name;
119 	uint64_t		trace_arg_name;
120 	uint64_t		length;
121 	bool                    is_deleted;
122 	bool			open_for_writing;
123 	uint64_t		length_flushed;
124 	uint64_t		length_xattr;
125 	uint64_t		append_pos;
126 	uint64_t		seq_byte_count;
127 	uint64_t		next_seq_offset;
128 	uint32_t		priority;
129 	TAILQ_ENTRY(spdk_file)	tailq;
130 	spdk_blob_id		blobid;
131 	uint32_t		ref_count;
132 	pthread_spinlock_t	lock;
133 	struct cache_buffer	*last;
134 	struct cache_tree	*tree;
135 	TAILQ_HEAD(open_requests_head, spdk_fs_request) open_requests;
136 	TAILQ_HEAD(sync_requests_head, spdk_fs_request) sync_requests;
137 	TAILQ_ENTRY(spdk_file)	cache_tailq;
138 };
139 
140 struct spdk_deleted_file {
141 	spdk_blob_id	id;
142 	TAILQ_ENTRY(spdk_deleted_file)	tailq;
143 };
144 
145 struct spdk_filesystem {
146 	struct spdk_blob_store	*bs;
147 	TAILQ_HEAD(, spdk_file)	files;
148 	struct spdk_bs_opts	bs_opts;
149 	struct spdk_bs_dev	*bdev;
150 	fs_send_request_fn	send_request;
151 
152 	struct {
153 		uint32_t		max_ops;
154 		struct spdk_io_channel	*sync_io_channel;
155 		struct spdk_fs_channel	*sync_fs_channel;
156 	} sync_target;
157 
158 	struct {
159 		uint32_t		max_ops;
160 		struct spdk_io_channel	*md_io_channel;
161 		struct spdk_fs_channel	*md_fs_channel;
162 	} md_target;
163 
164 	struct {
165 		uint32_t		max_ops;
166 	} io_target;
167 };
168 
169 struct spdk_fs_cb_args {
170 	union {
171 		spdk_fs_op_with_handle_complete		fs_op_with_handle;
172 		spdk_fs_op_complete			fs_op;
173 		spdk_file_op_with_handle_complete	file_op_with_handle;
174 		spdk_file_op_complete			file_op;
175 		spdk_file_stat_op_complete		stat_op;
176 	} fn;
177 	void *arg;
178 	sem_t *sem;
179 	struct spdk_filesystem *fs;
180 	struct spdk_file *file;
181 	int rc;
182 	struct iovec *iovs;
183 	uint32_t iovcnt;
184 	struct iovec iov;
185 	union {
186 		struct {
187 			TAILQ_HEAD(, spdk_deleted_file)	deleted_files;
188 		} fs_load;
189 		struct {
190 			uint64_t	length;
191 		} truncate;
192 		struct {
193 			struct spdk_io_channel	*channel;
194 			void		*pin_buf;
195 			int		is_read;
196 			off_t		offset;
197 			size_t		length;
198 			uint64_t	start_lba;
199 			uint64_t	num_lba;
200 			uint32_t	blocklen;
201 		} rw;
202 		struct {
203 			const char	*old_name;
204 			const char	*new_name;
205 		} rename;
206 		struct {
207 			struct cache_buffer	*cache_buffer;
208 			uint64_t		length;
209 		} flush;
210 		struct {
211 			struct cache_buffer	*cache_buffer;
212 			uint64_t		length;
213 			uint64_t		offset;
214 		} readahead;
215 		struct {
216 			/* offset of the file when the sync request was made */
217 			uint64_t			offset;
218 			TAILQ_ENTRY(spdk_fs_request)	tailq;
219 			bool				xattr_in_progress;
220 			/* length written to the xattr for this file - this should
221 			 * always be the same as the offset if only one thread is
222 			 * writing to the file, but could differ if multiple threads
223 			 * are appending
224 			 */
225 			uint64_t			length;
226 		} sync;
227 		struct {
228 			uint32_t			num_clusters;
229 		} resize;
230 		struct {
231 			const char	*name;
232 			uint32_t	flags;
233 			TAILQ_ENTRY(spdk_fs_request)	tailq;
234 		} open;
235 		struct {
236 			const char		*name;
237 			struct spdk_blob	*blob;
238 		} create;
239 		struct {
240 			const char	*name;
241 		} delete;
242 		struct {
243 			const char	*name;
244 		} stat;
245 	} op;
246 };
247 
/* Forward declarations for helpers that are used before they are defined. */
static void spdk_fs_free_io_channels(struct spdk_filesystem *fs);
static void spdk_fs_io_device_unregister(struct spdk_filesystem *fs);
static void cache_free_buffers(struct spdk_file *file);
251 
252 void
253 spdk_fs_opts_init(struct spdk_blobfs_opts *opts)
254 {
255 	opts->cluster_sz = SPDK_BLOBFS_DEFAULT_OPTS_CLUSTER_SZ;
256 }
257 
258 static void
259 __initialize_cache(void)
260 {
261 	assert(g_cache_pool == NULL);
262 
263 	g_cache_pool = spdk_mempool_create("spdk_fs_cache",
264 					   g_fs_cache_size / CACHE_BUFFER_SIZE,
265 					   CACHE_BUFFER_SIZE,
266 					   SPDK_MEMPOOL_DEFAULT_CACHE_SIZE,
267 					   SPDK_ENV_SOCKET_ID_ANY);
268 	if (!g_cache_pool) {
269 		SPDK_ERRLOG("Create mempool failed, you may "
270 			    "increase the memory and try again\n");
271 		assert(false);
272 	}
273 	TAILQ_INIT(&g_caches);
274 	pthread_spin_init(&g_caches_lock, 0);
275 }
276 
277 static void
278 __free_cache(void)
279 {
280 	assert(g_cache_pool != NULL);
281 
282 	spdk_mempool_free(g_cache_pool);
283 	g_cache_pool = NULL;
284 }
285 
286 static uint64_t
287 __file_get_blob_size(struct spdk_file *file)
288 {
289 	uint64_t cluster_sz;
290 
291 	cluster_sz = file->fs->bs_opts.cluster_sz;
292 	return cluster_sz * spdk_blob_get_num_clusters(file->blob);
293 }
294 
295 struct spdk_fs_request {
296 	struct spdk_fs_cb_args		args;
297 	TAILQ_ENTRY(spdk_fs_request)	link;
298 	struct spdk_fs_channel		*channel;
299 };
300 
301 struct spdk_fs_channel {
302 	struct spdk_fs_request		*req_mem;
303 	TAILQ_HEAD(, spdk_fs_request)	reqs;
304 	sem_t				sem;
305 	struct spdk_filesystem		*fs;
306 	struct spdk_io_channel		*bs_channel;
307 	fs_send_request_fn		send_request;
308 	bool				sync;
309 	uint32_t			outstanding_reqs;
310 	pthread_spinlock_t		lock;
311 };
312 
313 /* For now, this is effectively an alias. But eventually we'll shift
314  * some data members over. */
315 struct spdk_fs_thread_ctx {
316 	struct spdk_fs_channel	ch;
317 };
318 
319 static struct spdk_fs_request *
320 alloc_fs_request_with_iov(struct spdk_fs_channel *channel, uint32_t iovcnt)
321 {
322 	struct spdk_fs_request *req;
323 	struct iovec *iovs = NULL;
324 
325 	if (iovcnt > 1) {
326 		iovs = calloc(iovcnt, sizeof(struct iovec));
327 		if (!iovs) {
328 			return NULL;
329 		}
330 	}
331 
332 	if (channel->sync) {
333 		pthread_spin_lock(&channel->lock);
334 	}
335 
336 	req = TAILQ_FIRST(&channel->reqs);
337 	if (req) {
338 		channel->outstanding_reqs++;
339 		TAILQ_REMOVE(&channel->reqs, req, link);
340 	}
341 
342 	if (channel->sync) {
343 		pthread_spin_unlock(&channel->lock);
344 	}
345 
346 	if (req == NULL) {
347 		SPDK_ERRLOG("Cannot allocate req on spdk_fs_channel =%p\n", channel);
348 		free(iovs);
349 		return NULL;
350 	}
351 	memset(req, 0, sizeof(*req));
352 	req->channel = channel;
353 	if (iovcnt > 1) {
354 		req->args.iovs = iovs;
355 	} else {
356 		req->args.iovs = &req->args.iov;
357 	}
358 	req->args.iovcnt = iovcnt;
359 
360 	return req;
361 }
362 
/*
 * Allocate a request that carries no I/O vectors.  Returns NULL when the
 * channel's request pool is exhausted.
 */
static struct spdk_fs_request *
alloc_fs_request(struct spdk_fs_channel *channel)
{
	const uint32_t no_iovs = 0;

	return alloc_fs_request_with_iov(channel, no_iovs);
}
368 
369 static void
370 free_fs_request(struct spdk_fs_request *req)
371 {
372 	struct spdk_fs_channel *channel = req->channel;
373 
374 	if (req->args.iovcnt > 1) {
375 		free(req->args.iovs);
376 	}
377 
378 	if (channel->sync) {
379 		pthread_spin_lock(&channel->lock);
380 	}
381 
382 	TAILQ_INSERT_HEAD(&req->channel->reqs, req, link);
383 	channel->outstanding_reqs--;
384 
385 	if (channel->sync) {
386 		pthread_spin_unlock(&channel->lock);
387 	}
388 }
389 
390 static int
391 _spdk_fs_channel_create(struct spdk_filesystem *fs, struct spdk_fs_channel *channel,
392 			uint32_t max_ops)
393 {
394 	uint32_t i;
395 
396 	channel->req_mem = calloc(max_ops, sizeof(struct spdk_fs_request));
397 	if (!channel->req_mem) {
398 		return -1;
399 	}
400 
401 	channel->outstanding_reqs = 0;
402 	TAILQ_INIT(&channel->reqs);
403 	sem_init(&channel->sem, 0, 0);
404 
405 	for (i = 0; i < max_ops; i++) {
406 		TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link);
407 	}
408 
409 	channel->fs = fs;
410 
411 	return 0;
412 }
413 
414 static int
415 _spdk_fs_md_channel_create(void *io_device, void *ctx_buf)
416 {
417 	struct spdk_filesystem		*fs;
418 	struct spdk_fs_channel		*channel = ctx_buf;
419 
420 	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, md_target);
421 
422 	return _spdk_fs_channel_create(fs, channel, fs->md_target.max_ops);
423 }
424 
425 static int
426 _spdk_fs_sync_channel_create(void *io_device, void *ctx_buf)
427 {
428 	struct spdk_filesystem		*fs;
429 	struct spdk_fs_channel		*channel = ctx_buf;
430 
431 	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, sync_target);
432 
433 	return _spdk_fs_channel_create(fs, channel, fs->sync_target.max_ops);
434 }
435 
436 static int
437 _spdk_fs_io_channel_create(void *io_device, void *ctx_buf)
438 {
439 	struct spdk_filesystem		*fs;
440 	struct spdk_fs_channel		*channel = ctx_buf;
441 
442 	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, io_target);
443 
444 	return _spdk_fs_channel_create(fs, channel, fs->io_target.max_ops);
445 }
446 
447 static void
448 _spdk_fs_channel_destroy(void *io_device, void *ctx_buf)
449 {
450 	struct spdk_fs_channel *channel = ctx_buf;
451 
452 	if (channel->outstanding_reqs > 0) {
453 		SPDK_ERRLOG("channel freed with %" PRIu32 " outstanding requests!\n",
454 			    channel->outstanding_reqs);
455 	}
456 
457 	free(channel->req_mem);
458 	if (channel->bs_channel != NULL) {
459 		spdk_bs_free_io_channel(channel->bs_channel);
460 	}
461 }
462 
463 static void
464 __send_request_direct(fs_request_fn fn, void *arg)
465 {
466 	fn(arg);
467 }
468 
469 static void
470 common_fs_bs_init(struct spdk_filesystem *fs, struct spdk_blob_store *bs)
471 {
472 	fs->bs = bs;
473 	fs->bs_opts.cluster_sz = spdk_bs_get_cluster_size(bs);
474 	fs->md_target.md_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
475 	fs->md_target.md_fs_channel->send_request = __send_request_direct;
476 	fs->sync_target.sync_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
477 	fs->sync_target.sync_fs_channel->send_request = __send_request_direct;
478 
479 	pthread_mutex_lock(&g_cache_init_lock);
480 	if (g_fs_count == 0) {
481 		__initialize_cache();
482 	}
483 	g_fs_count++;
484 	pthread_mutex_unlock(&g_cache_init_lock);
485 }
486 
487 static void
488 init_cb(void *ctx, struct spdk_blob_store *bs, int bserrno)
489 {
490 	struct spdk_fs_request *req = ctx;
491 	struct spdk_fs_cb_args *args = &req->args;
492 	struct spdk_filesystem *fs = args->fs;
493 
494 	if (bserrno == 0) {
495 		common_fs_bs_init(fs, bs);
496 	} else {
497 		free(fs);
498 		fs = NULL;
499 	}
500 
501 	args->fn.fs_op_with_handle(args->arg, fs, bserrno);
502 	free_fs_request(req);
503 }
504 
505 static void
506 fs_conf_parse(void)
507 {
508 	struct spdk_conf_section *sp;
509 
510 	sp = spdk_conf_find_section(NULL, "Blobfs");
511 	if (sp == NULL) {
512 		g_fs_cache_buffer_shift = CACHE_BUFFER_SHIFT_DEFAULT;
513 		return;
514 	}
515 
516 	g_fs_cache_buffer_shift = spdk_conf_section_get_intval(sp, "CacheBufferShift");
517 	if (g_fs_cache_buffer_shift <= 0) {
518 		g_fs_cache_buffer_shift = CACHE_BUFFER_SHIFT_DEFAULT;
519 	}
520 }
521 
522 static struct spdk_filesystem *
523 fs_alloc(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn)
524 {
525 	struct spdk_filesystem *fs;
526 
527 	fs = calloc(1, sizeof(*fs));
528 	if (fs == NULL) {
529 		return NULL;
530 	}
531 
532 	fs->bdev = dev;
533 	fs->send_request = send_request_fn;
534 	TAILQ_INIT(&fs->files);
535 
536 	fs->md_target.max_ops = 512;
537 	spdk_io_device_register(&fs->md_target, _spdk_fs_md_channel_create, _spdk_fs_channel_destroy,
538 				sizeof(struct spdk_fs_channel), "blobfs_md");
539 	fs->md_target.md_io_channel = spdk_get_io_channel(&fs->md_target);
540 	fs->md_target.md_fs_channel = spdk_io_channel_get_ctx(fs->md_target.md_io_channel);
541 
542 	fs->sync_target.max_ops = 512;
543 	spdk_io_device_register(&fs->sync_target, _spdk_fs_sync_channel_create, _spdk_fs_channel_destroy,
544 				sizeof(struct spdk_fs_channel), "blobfs_sync");
545 	fs->sync_target.sync_io_channel = spdk_get_io_channel(&fs->sync_target);
546 	fs->sync_target.sync_fs_channel = spdk_io_channel_get_ctx(fs->sync_target.sync_io_channel);
547 
548 	fs->io_target.max_ops = 512;
549 	spdk_io_device_register(&fs->io_target, _spdk_fs_io_channel_create, _spdk_fs_channel_destroy,
550 				sizeof(struct spdk_fs_channel), "blobfs_io");
551 
552 	return fs;
553 }
554 
555 static void
556 __wake_caller(void *arg, int fserrno)
557 {
558 	struct spdk_fs_cb_args *args = arg;
559 
560 	args->rc = fserrno;
561 	sem_post(args->sem);
562 }
563 
564 void
565 spdk_fs_init(struct spdk_bs_dev *dev, struct spdk_blobfs_opts *opt,
566 	     fs_send_request_fn send_request_fn,
567 	     spdk_fs_op_with_handle_complete cb_fn, void *cb_arg)
568 {
569 	struct spdk_filesystem *fs;
570 	struct spdk_fs_request *req;
571 	struct spdk_fs_cb_args *args;
572 	struct spdk_bs_opts opts = {};
573 
574 	fs = fs_alloc(dev, send_request_fn);
575 	if (fs == NULL) {
576 		cb_fn(cb_arg, NULL, -ENOMEM);
577 		return;
578 	}
579 
580 	fs_conf_parse();
581 
582 	req = alloc_fs_request(fs->md_target.md_fs_channel);
583 	if (req == NULL) {
584 		spdk_fs_free_io_channels(fs);
585 		spdk_fs_io_device_unregister(fs);
586 		cb_fn(cb_arg, NULL, -ENOMEM);
587 		return;
588 	}
589 
590 	args = &req->args;
591 	args->fn.fs_op_with_handle = cb_fn;
592 	args->arg = cb_arg;
593 	args->fs = fs;
594 
595 	spdk_bs_opts_init(&opts);
596 	snprintf(opts.bstype.bstype, sizeof(opts.bstype.bstype), "BLOBFS");
597 	if (opt) {
598 		opts.cluster_sz = opt->cluster_sz;
599 	}
600 	spdk_bs_init(dev, &opts, init_cb, req);
601 }
602 
603 static struct spdk_file *
604 file_alloc(struct spdk_filesystem *fs)
605 {
606 	struct spdk_file *file;
607 
608 	file = calloc(1, sizeof(*file));
609 	if (file == NULL) {
610 		return NULL;
611 	}
612 
613 	file->tree = calloc(1, sizeof(*file->tree));
614 	if (file->tree == NULL) {
615 		free(file);
616 		return NULL;
617 	}
618 
619 	file->fs = fs;
620 	TAILQ_INIT(&file->open_requests);
621 	TAILQ_INIT(&file->sync_requests);
622 	pthread_spin_init(&file->lock, 0);
623 	TAILQ_INSERT_TAIL(&fs->files, file, tailq);
624 	file->priority = SPDK_FILE_PRIORITY_LOW;
625 	return file;
626 }
627 
628 static void fs_load_done(void *ctx, int bserrno);
629 
630 static int
631 _handle_deleted_files(struct spdk_fs_request *req)
632 {
633 	struct spdk_fs_cb_args *args = &req->args;
634 	struct spdk_filesystem *fs = args->fs;
635 
636 	if (!TAILQ_EMPTY(&args->op.fs_load.deleted_files)) {
637 		struct spdk_deleted_file *deleted_file;
638 
639 		deleted_file = TAILQ_FIRST(&args->op.fs_load.deleted_files);
640 		TAILQ_REMOVE(&args->op.fs_load.deleted_files, deleted_file, tailq);
641 		spdk_bs_delete_blob(fs->bs, deleted_file->id, fs_load_done, req);
642 		free(deleted_file);
643 		return 0;
644 	}
645 
646 	return 1;
647 }
648 
649 static void
650 fs_load_done(void *ctx, int bserrno)
651 {
652 	struct spdk_fs_request *req = ctx;
653 	struct spdk_fs_cb_args *args = &req->args;
654 	struct spdk_filesystem *fs = args->fs;
655 
656 	/* The filesystem has been loaded.  Now check if there are any files that
657 	 *  were marked for deletion before last unload.  Do not complete the
658 	 *  fs_load callback until all of them have been deleted on disk.
659 	 */
660 	if (_handle_deleted_files(req) == 0) {
661 		/* We found a file that's been marked for deleting but not actually
662 		 *  deleted yet.  This function will get called again once the delete
663 		 *  operation is completed.
664 		 */
665 		return;
666 	}
667 
668 	args->fn.fs_op_with_handle(args->arg, fs, 0);
669 	free_fs_request(req);
670 
671 }
672 
673 static void
674 _file_build_trace_arg_name(struct spdk_file *f)
675 {
676 	f->trace_arg_name = 0;
677 	memcpy(&f->trace_arg_name, f->name,
678 	       spdk_min(sizeof(f->trace_arg_name), strlen(f->name)));
679 }
680 
681 static void
682 iter_cb(void *ctx, struct spdk_blob *blob, int rc)
683 {
684 	struct spdk_fs_request *req = ctx;
685 	struct spdk_fs_cb_args *args = &req->args;
686 	struct spdk_filesystem *fs = args->fs;
687 	uint64_t *length;
688 	const char *name;
689 	uint32_t *is_deleted;
690 	size_t value_len;
691 
692 	if (rc < 0) {
693 		args->fn.fs_op_with_handle(args->arg, fs, rc);
694 		free_fs_request(req);
695 		return;
696 	}
697 
698 	rc = spdk_blob_get_xattr_value(blob, "name", (const void **)&name, &value_len);
699 	if (rc < 0) {
700 		args->fn.fs_op_with_handle(args->arg, fs, rc);
701 		free_fs_request(req);
702 		return;
703 	}
704 
705 	rc = spdk_blob_get_xattr_value(blob, "length", (const void **)&length, &value_len);
706 	if (rc < 0) {
707 		args->fn.fs_op_with_handle(args->arg, fs, rc);
708 		free_fs_request(req);
709 		return;
710 	}
711 
712 	assert(value_len == 8);
713 
714 	/* This file could be deleted last time without close it, then app crashed, so we delete it now */
715 	rc = spdk_blob_get_xattr_value(blob, "is_deleted", (const void **)&is_deleted, &value_len);
716 	if (rc < 0) {
717 		struct spdk_file *f;
718 
719 		f = file_alloc(fs);
720 		if (f == NULL) {
721 			args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM);
722 			free_fs_request(req);
723 			return;
724 		}
725 
726 		f->name = strdup(name);
727 		_file_build_trace_arg_name(f);
728 		f->blobid = spdk_blob_get_id(blob);
729 		f->length = *length;
730 		f->length_flushed = *length;
731 		f->length_xattr = *length;
732 		f->append_pos = *length;
733 		SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "added file %s length=%ju\n", f->name, f->length);
734 	} else {
735 		struct spdk_deleted_file *deleted_file;
736 
737 		deleted_file = calloc(1, sizeof(*deleted_file));
738 		if (deleted_file == NULL) {
739 			args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM);
740 			free_fs_request(req);
741 			return;
742 		}
743 		deleted_file->id = spdk_blob_get_id(blob);
744 		TAILQ_INSERT_TAIL(&args->op.fs_load.deleted_files, deleted_file, tailq);
745 	}
746 }
747 
748 static void
749 load_cb(void *ctx, struct spdk_blob_store *bs, int bserrno)
750 {
751 	struct spdk_fs_request *req = ctx;
752 	struct spdk_fs_cb_args *args = &req->args;
753 	struct spdk_filesystem *fs = args->fs;
754 	struct spdk_bs_type bstype;
755 	static const struct spdk_bs_type blobfs_type = {"BLOBFS"};
756 	static const struct spdk_bs_type zeros;
757 
758 	if (bserrno != 0) {
759 		args->fn.fs_op_with_handle(args->arg, NULL, bserrno);
760 		free_fs_request(req);
761 		free(fs);
762 		return;
763 	}
764 
765 	bstype = spdk_bs_get_bstype(bs);
766 
767 	if (!memcmp(&bstype, &zeros, sizeof(bstype))) {
768 		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "assigning bstype\n");
769 		spdk_bs_set_bstype(bs, blobfs_type);
770 	} else if (memcmp(&bstype, &blobfs_type, sizeof(bstype))) {
771 		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "not blobfs\n");
772 		SPDK_LOGDUMP(SPDK_LOG_BLOB, "bstype", &bstype, sizeof(bstype));
773 		args->fn.fs_op_with_handle(args->arg, NULL, bserrno);
774 		free_fs_request(req);
775 		free(fs);
776 		return;
777 	}
778 
779 	common_fs_bs_init(fs, bs);
780 	fs_load_done(req, 0);
781 }
782 
783 static void
784 spdk_fs_io_device_unregister(struct spdk_filesystem *fs)
785 {
786 	assert(fs != NULL);
787 	spdk_io_device_unregister(&fs->md_target, NULL);
788 	spdk_io_device_unregister(&fs->sync_target, NULL);
789 	spdk_io_device_unregister(&fs->io_target, NULL);
790 	free(fs);
791 }
792 
793 static void
794 spdk_fs_free_io_channels(struct spdk_filesystem *fs)
795 {
796 	assert(fs != NULL);
797 	spdk_fs_free_io_channel(fs->md_target.md_io_channel);
798 	spdk_fs_free_io_channel(fs->sync_target.sync_io_channel);
799 }
800 
801 void
802 spdk_fs_load(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn,
803 	     spdk_fs_op_with_handle_complete cb_fn, void *cb_arg)
804 {
805 	struct spdk_filesystem *fs;
806 	struct spdk_fs_cb_args *args;
807 	struct spdk_fs_request *req;
808 	struct spdk_bs_opts	bs_opts;
809 
810 	fs = fs_alloc(dev, send_request_fn);
811 	if (fs == NULL) {
812 		cb_fn(cb_arg, NULL, -ENOMEM);
813 		return;
814 	}
815 
816 	fs_conf_parse();
817 
818 	req = alloc_fs_request(fs->md_target.md_fs_channel);
819 	if (req == NULL) {
820 		spdk_fs_free_io_channels(fs);
821 		spdk_fs_io_device_unregister(fs);
822 		cb_fn(cb_arg, NULL, -ENOMEM);
823 		return;
824 	}
825 
826 	args = &req->args;
827 	args->fn.fs_op_with_handle = cb_fn;
828 	args->arg = cb_arg;
829 	args->fs = fs;
830 	TAILQ_INIT(&args->op.fs_load.deleted_files);
831 	spdk_bs_opts_init(&bs_opts);
832 	bs_opts.iter_cb_fn = iter_cb;
833 	bs_opts.iter_cb_arg = req;
834 	spdk_bs_load(dev, &bs_opts, load_cb, req);
835 }
836 
837 static void
838 unload_cb(void *ctx, int bserrno)
839 {
840 	struct spdk_fs_request *req = ctx;
841 	struct spdk_fs_cb_args *args = &req->args;
842 	struct spdk_filesystem *fs = args->fs;
843 	struct spdk_file *file, *tmp;
844 
845 	TAILQ_FOREACH_SAFE(file, &fs->files, tailq, tmp) {
846 		TAILQ_REMOVE(&fs->files, file, tailq);
847 		cache_free_buffers(file);
848 		free(file->name);
849 		free(file->tree);
850 		free(file);
851 	}
852 
853 	pthread_mutex_lock(&g_cache_init_lock);
854 	g_fs_count--;
855 	if (g_fs_count == 0) {
856 		__free_cache();
857 	}
858 	pthread_mutex_unlock(&g_cache_init_lock);
859 
860 	args->fn.fs_op(args->arg, bserrno);
861 	free(req);
862 
863 	spdk_fs_io_device_unregister(fs);
864 }
865 
866 void
867 spdk_fs_unload(struct spdk_filesystem *fs, spdk_fs_op_complete cb_fn, void *cb_arg)
868 {
869 	struct spdk_fs_request *req;
870 	struct spdk_fs_cb_args *args;
871 
872 	/*
873 	 * We must free the md_channel before unloading the blobstore, so just
874 	 *  allocate this request from the general heap.
875 	 */
876 	req = calloc(1, sizeof(*req));
877 	if (req == NULL) {
878 		cb_fn(cb_arg, -ENOMEM);
879 		return;
880 	}
881 
882 	args = &req->args;
883 	args->fn.fs_op = cb_fn;
884 	args->arg = cb_arg;
885 	args->fs = fs;
886 
887 	spdk_fs_free_io_channels(fs);
888 	spdk_bs_unload(fs->bs, unload_cb, req);
889 }
890 
891 static struct spdk_file *
892 fs_find_file(struct spdk_filesystem *fs, const char *name)
893 {
894 	struct spdk_file *file;
895 
896 	TAILQ_FOREACH(file, &fs->files, tailq) {
897 		if (!strncmp(name, file->name, SPDK_FILE_NAME_MAX)) {
898 			return file;
899 		}
900 	}
901 
902 	return NULL;
903 }
904 
905 void
906 spdk_fs_file_stat_async(struct spdk_filesystem *fs, const char *name,
907 			spdk_file_stat_op_complete cb_fn, void *cb_arg)
908 {
909 	struct spdk_file_stat stat;
910 	struct spdk_file *f = NULL;
911 
912 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
913 		cb_fn(cb_arg, NULL, -ENAMETOOLONG);
914 		return;
915 	}
916 
917 	f = fs_find_file(fs, name);
918 	if (f != NULL) {
919 		stat.blobid = f->blobid;
920 		stat.size = f->append_pos >= f->length ? f->append_pos : f->length;
921 		cb_fn(cb_arg, &stat, 0);
922 		return;
923 	}
924 
925 	cb_fn(cb_arg, NULL, -ENOENT);
926 }
927 
928 static void
929 __copy_stat(void *arg, struct spdk_file_stat *stat, int fserrno)
930 {
931 	struct spdk_fs_request *req = arg;
932 	struct spdk_fs_cb_args *args = &req->args;
933 
934 	args->rc = fserrno;
935 	if (fserrno == 0) {
936 		memcpy(args->arg, stat, sizeof(*stat));
937 	}
938 	sem_post(args->sem);
939 }
940 
941 static void
942 __file_stat(void *arg)
943 {
944 	struct spdk_fs_request *req = arg;
945 	struct spdk_fs_cb_args *args = &req->args;
946 
947 	spdk_fs_file_stat_async(args->fs, args->op.stat.name,
948 				args->fn.stat_op, req);
949 }
950 
951 int
952 spdk_fs_file_stat(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx,
953 		  const char *name, struct spdk_file_stat *stat)
954 {
955 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
956 	struct spdk_fs_request *req;
957 	int rc;
958 
959 	req = alloc_fs_request(channel);
960 	if (req == NULL) {
961 		return -ENOMEM;
962 	}
963 
964 	req->args.fs = fs;
965 	req->args.op.stat.name = name;
966 	req->args.fn.stat_op = __copy_stat;
967 	req->args.arg = stat;
968 	req->args.sem = &channel->sem;
969 	channel->send_request(__file_stat, req);
970 	sem_wait(&channel->sem);
971 
972 	rc = req->args.rc;
973 	free_fs_request(req);
974 
975 	return rc;
976 }
977 
978 static void
979 fs_create_blob_close_cb(void *ctx, int bserrno)
980 {
981 	int rc;
982 	struct spdk_fs_request *req = ctx;
983 	struct spdk_fs_cb_args *args = &req->args;
984 
985 	rc = args->rc ? args->rc : bserrno;
986 	args->fn.file_op(args->arg, rc);
987 	free_fs_request(req);
988 }
989 
990 static void
991 fs_create_blob_resize_cb(void *ctx, int bserrno)
992 {
993 	struct spdk_fs_request *req = ctx;
994 	struct spdk_fs_cb_args *args = &req->args;
995 	struct spdk_file *f = args->file;
996 	struct spdk_blob *blob = args->op.create.blob;
997 	uint64_t length = 0;
998 
999 	args->rc = bserrno;
1000 	if (bserrno) {
1001 		spdk_blob_close(blob, fs_create_blob_close_cb, args);
1002 		return;
1003 	}
1004 
1005 	spdk_blob_set_xattr(blob, "name", f->name, strlen(f->name) + 1);
1006 	spdk_blob_set_xattr(blob, "length", &length, sizeof(length));
1007 
1008 	spdk_blob_close(blob, fs_create_blob_close_cb, args);
1009 }
1010 
1011 static void
1012 fs_create_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno)
1013 {
1014 	struct spdk_fs_request *req = ctx;
1015 	struct spdk_fs_cb_args *args = &req->args;
1016 
1017 	if (bserrno) {
1018 		args->fn.file_op(args->arg, bserrno);
1019 		free_fs_request(req);
1020 		return;
1021 	}
1022 
1023 	args->op.create.blob = blob;
1024 	spdk_blob_resize(blob, 1, fs_create_blob_resize_cb, req);
1025 }
1026 
1027 static void
1028 fs_create_blob_create_cb(void *ctx, spdk_blob_id blobid, int bserrno)
1029 {
1030 	struct spdk_fs_request *req = ctx;
1031 	struct spdk_fs_cb_args *args = &req->args;
1032 	struct spdk_file *f = args->file;
1033 
1034 	if (bserrno) {
1035 		args->fn.file_op(args->arg, bserrno);
1036 		free_fs_request(req);
1037 		return;
1038 	}
1039 
1040 	f->blobid = blobid;
1041 	spdk_bs_open_blob(f->fs->bs, blobid, fs_create_blob_open_cb, req);
1042 }
1043 
1044 void
1045 spdk_fs_create_file_async(struct spdk_filesystem *fs, const char *name,
1046 			  spdk_file_op_complete cb_fn, void *cb_arg)
1047 {
1048 	struct spdk_file *file;
1049 	struct spdk_fs_request *req;
1050 	struct spdk_fs_cb_args *args;
1051 
1052 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
1053 		cb_fn(cb_arg, -ENAMETOOLONG);
1054 		return;
1055 	}
1056 
1057 	file = fs_find_file(fs, name);
1058 	if (file != NULL) {
1059 		cb_fn(cb_arg, -EEXIST);
1060 		return;
1061 	}
1062 
1063 	file = file_alloc(fs);
1064 	if (file == NULL) {
1065 		cb_fn(cb_arg, -ENOMEM);
1066 		return;
1067 	}
1068 
1069 	req = alloc_fs_request(fs->md_target.md_fs_channel);
1070 	if (req == NULL) {
1071 		cb_fn(cb_arg, -ENOMEM);
1072 		return;
1073 	}
1074 
1075 	args = &req->args;
1076 	args->file = file;
1077 	args->fn.file_op = cb_fn;
1078 	args->arg = cb_arg;
1079 
1080 	file->name = strdup(name);
1081 	_file_build_trace_arg_name(file);
1082 	spdk_bs_create_blob(fs->bs, fs_create_blob_create_cb, args);
1083 }
1084 
1085 static void
1086 __fs_create_file_done(void *arg, int fserrno)
1087 {
1088 	struct spdk_fs_request *req = arg;
1089 	struct spdk_fs_cb_args *args = &req->args;
1090 
1091 	args->rc = fserrno;
1092 	sem_post(args->sem);
1093 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.create.name);
1094 }
1095 
1096 static void
1097 __fs_create_file(void *arg)
1098 {
1099 	struct spdk_fs_request *req = arg;
1100 	struct spdk_fs_cb_args *args = &req->args;
1101 
1102 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.create.name);
1103 	spdk_fs_create_file_async(args->fs, args->op.create.name, __fs_create_file_done, req);
1104 }
1105 
1106 int
1107 spdk_fs_create_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx, const char *name)
1108 {
1109 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
1110 	struct spdk_fs_request *req;
1111 	struct spdk_fs_cb_args *args;
1112 	int rc;
1113 
1114 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name);
1115 
1116 	req = alloc_fs_request(channel);
1117 	if (req == NULL) {
1118 		return -ENOMEM;
1119 	}
1120 
1121 	args = &req->args;
1122 	args->fs = fs;
1123 	args->op.create.name = name;
1124 	args->sem = &channel->sem;
1125 	fs->send_request(__fs_create_file, req);
1126 	sem_wait(&channel->sem);
1127 	rc = args->rc;
1128 	free_fs_request(req);
1129 
1130 	return rc;
1131 }
1132 
1133 static void
1134 fs_open_blob_done(void *ctx, struct spdk_blob *blob, int bserrno)
1135 {
1136 	struct spdk_fs_request *req = ctx;
1137 	struct spdk_fs_cb_args *args = &req->args;
1138 	struct spdk_file *f = args->file;
1139 
1140 	f->blob = blob;
1141 	while (!TAILQ_EMPTY(&f->open_requests)) {
1142 		req = TAILQ_FIRST(&f->open_requests);
1143 		args = &req->args;
1144 		TAILQ_REMOVE(&f->open_requests, req, args.op.open.tailq);
1145 		spdk_trace_record(TRACE_BLOBFS_OPEN, 0, 0, 0, f->trace_arg_name);
1146 		args->fn.file_op_with_handle(args->arg, f, bserrno);
1147 		free_fs_request(req);
1148 	}
1149 }
1150 
1151 static void
1152 fs_open_blob_create_cb(void *ctx, int bserrno)
1153 {
1154 	struct spdk_fs_request *req = ctx;
1155 	struct spdk_fs_cb_args *args = &req->args;
1156 	struct spdk_file *file = args->file;
1157 	struct spdk_filesystem *fs = args->fs;
1158 
1159 	if (file == NULL) {
1160 		/*
1161 		 * This is from an open with CREATE flag - the file
1162 		 *  is now created so look it up in the file list for this
1163 		 *  filesystem.
1164 		 */
1165 		file = fs_find_file(fs, args->op.open.name);
1166 		assert(file != NULL);
1167 		args->file = file;
1168 	}
1169 
1170 	file->ref_count++;
1171 	TAILQ_INSERT_TAIL(&file->open_requests, req, args.op.open.tailq);
1172 	if (file->ref_count == 1) {
1173 		assert(file->blob == NULL);
1174 		spdk_bs_open_blob(fs->bs, file->blobid, fs_open_blob_done, req);
1175 	} else if (file->blob != NULL) {
1176 		fs_open_blob_done(req, file->blob, 0);
1177 	} else {
1178 		/*
1179 		 * The blob open for this file is in progress due to a previous
1180 		 *  open request.  When that open completes, it will invoke the
1181 		 *  open callback for this request.
1182 		 */
1183 	}
1184 }
1185 
1186 void
1187 spdk_fs_open_file_async(struct spdk_filesystem *fs, const char *name, uint32_t flags,
1188 			spdk_file_op_with_handle_complete cb_fn, void *cb_arg)
1189 {
1190 	struct spdk_file *f = NULL;
1191 	struct spdk_fs_request *req;
1192 	struct spdk_fs_cb_args *args;
1193 
1194 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
1195 		cb_fn(cb_arg, NULL, -ENAMETOOLONG);
1196 		return;
1197 	}
1198 
1199 	f = fs_find_file(fs, name);
1200 	if (f == NULL && !(flags & SPDK_BLOBFS_OPEN_CREATE)) {
1201 		cb_fn(cb_arg, NULL, -ENOENT);
1202 		return;
1203 	}
1204 
1205 	if (f != NULL && f->is_deleted == true) {
1206 		cb_fn(cb_arg, NULL, -ENOENT);
1207 		return;
1208 	}
1209 
1210 	req = alloc_fs_request(fs->md_target.md_fs_channel);
1211 	if (req == NULL) {
1212 		cb_fn(cb_arg, NULL, -ENOMEM);
1213 		return;
1214 	}
1215 
1216 	args = &req->args;
1217 	args->fn.file_op_with_handle = cb_fn;
1218 	args->arg = cb_arg;
1219 	args->file = f;
1220 	args->fs = fs;
1221 	args->op.open.name = name;
1222 
1223 	if (f == NULL) {
1224 		spdk_fs_create_file_async(fs, name, fs_open_blob_create_cb, req);
1225 	} else {
1226 		fs_open_blob_create_cb(req, 0);
1227 	}
1228 }
1229 
1230 static void
1231 __fs_open_file_done(void *arg, struct spdk_file *file, int bserrno)
1232 {
1233 	struct spdk_fs_request *req = arg;
1234 	struct spdk_fs_cb_args *args = &req->args;
1235 
1236 	args->file = file;
1237 	__wake_caller(args, bserrno);
1238 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.open.name);
1239 }
1240 
1241 static void
1242 __fs_open_file(void *arg)
1243 {
1244 	struct spdk_fs_request *req = arg;
1245 	struct spdk_fs_cb_args *args = &req->args;
1246 
1247 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.open.name);
1248 	spdk_fs_open_file_async(args->fs, args->op.open.name, args->op.open.flags,
1249 				__fs_open_file_done, req);
1250 }
1251 
1252 int
1253 spdk_fs_open_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx,
1254 		  const char *name, uint32_t flags, struct spdk_file **file)
1255 {
1256 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
1257 	struct spdk_fs_request *req;
1258 	struct spdk_fs_cb_args *args;
1259 	int rc;
1260 
1261 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name);
1262 
1263 	req = alloc_fs_request(channel);
1264 	if (req == NULL) {
1265 		return -ENOMEM;
1266 	}
1267 
1268 	args = &req->args;
1269 	args->fs = fs;
1270 	args->op.open.name = name;
1271 	args->op.open.flags = flags;
1272 	args->sem = &channel->sem;
1273 	fs->send_request(__fs_open_file, req);
1274 	sem_wait(&channel->sem);
1275 	rc = args->rc;
1276 	if (rc == 0) {
1277 		*file = args->file;
1278 	} else {
1279 		*file = NULL;
1280 	}
1281 	free_fs_request(req);
1282 
1283 	return rc;
1284 }
1285 
1286 static void
1287 fs_rename_blob_close_cb(void *ctx, int bserrno)
1288 {
1289 	struct spdk_fs_request *req = ctx;
1290 	struct spdk_fs_cb_args *args = &req->args;
1291 
1292 	args->fn.fs_op(args->arg, bserrno);
1293 	free_fs_request(req);
1294 }
1295 
1296 static void
1297 fs_rename_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno)
1298 {
1299 	struct spdk_fs_request *req = ctx;
1300 	struct spdk_fs_cb_args *args = &req->args;
1301 	const char *new_name = args->op.rename.new_name;
1302 
1303 	spdk_blob_set_xattr(blob, "name", new_name, strlen(new_name) + 1);
1304 	spdk_blob_close(blob, fs_rename_blob_close_cb, req);
1305 }
1306 
1307 static void
1308 __spdk_fs_md_rename_file(struct spdk_fs_request *req)
1309 {
1310 	struct spdk_fs_cb_args *args = &req->args;
1311 	struct spdk_file *f;
1312 
1313 	f = fs_find_file(args->fs, args->op.rename.old_name);
1314 	if (f == NULL) {
1315 		args->fn.fs_op(args->arg, -ENOENT);
1316 		free_fs_request(req);
1317 		return;
1318 	}
1319 
1320 	free(f->name);
1321 	f->name = strdup(args->op.rename.new_name);
1322 	_file_build_trace_arg_name(f);
1323 	args->file = f;
1324 	spdk_bs_open_blob(args->fs->bs, f->blobid, fs_rename_blob_open_cb, req);
1325 }
1326 
1327 static void
1328 fs_rename_delete_done(void *arg, int fserrno)
1329 {
1330 	__spdk_fs_md_rename_file(arg);
1331 }
1332 
1333 void
1334 spdk_fs_rename_file_async(struct spdk_filesystem *fs,
1335 			  const char *old_name, const char *new_name,
1336 			  spdk_file_op_complete cb_fn, void *cb_arg)
1337 {
1338 	struct spdk_file *f;
1339 	struct spdk_fs_request *req;
1340 	struct spdk_fs_cb_args *args;
1341 
1342 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "old=%s new=%s\n", old_name, new_name);
1343 	if (strnlen(new_name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
1344 		cb_fn(cb_arg, -ENAMETOOLONG);
1345 		return;
1346 	}
1347 
1348 	req = alloc_fs_request(fs->md_target.md_fs_channel);
1349 	if (req == NULL) {
1350 		cb_fn(cb_arg, -ENOMEM);
1351 		return;
1352 	}
1353 
1354 	args = &req->args;
1355 	args->fn.fs_op = cb_fn;
1356 	args->fs = fs;
1357 	args->arg = cb_arg;
1358 	args->op.rename.old_name = old_name;
1359 	args->op.rename.new_name = new_name;
1360 
1361 	f = fs_find_file(fs, new_name);
1362 	if (f == NULL) {
1363 		__spdk_fs_md_rename_file(req);
1364 		return;
1365 	}
1366 
1367 	/*
1368 	 * The rename overwrites an existing file.  So delete the existing file, then
1369 	 *  do the actual rename.
1370 	 */
1371 	spdk_fs_delete_file_async(fs, new_name, fs_rename_delete_done, req);
1372 }
1373 
1374 static void
1375 __fs_rename_file_done(void *arg, int fserrno)
1376 {
1377 	struct spdk_fs_request *req = arg;
1378 	struct spdk_fs_cb_args *args = &req->args;
1379 
1380 	__wake_caller(args, fserrno);
1381 }
1382 
1383 static void
1384 __fs_rename_file(void *arg)
1385 {
1386 	struct spdk_fs_request *req = arg;
1387 	struct spdk_fs_cb_args *args = &req->args;
1388 
1389 	spdk_fs_rename_file_async(args->fs, args->op.rename.old_name, args->op.rename.new_name,
1390 				  __fs_rename_file_done, req);
1391 }
1392 
1393 int
1394 spdk_fs_rename_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx,
1395 		    const char *old_name, const char *new_name)
1396 {
1397 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
1398 	struct spdk_fs_request *req;
1399 	struct spdk_fs_cb_args *args;
1400 	int rc;
1401 
1402 	req = alloc_fs_request(channel);
1403 	if (req == NULL) {
1404 		return -ENOMEM;
1405 	}
1406 
1407 	args = &req->args;
1408 
1409 	args->fs = fs;
1410 	args->op.rename.old_name = old_name;
1411 	args->op.rename.new_name = new_name;
1412 	args->sem = &channel->sem;
1413 	fs->send_request(__fs_rename_file, req);
1414 	sem_wait(&channel->sem);
1415 	rc = args->rc;
1416 	free_fs_request(req);
1417 	return rc;
1418 }
1419 
1420 static void
1421 blob_delete_cb(void *ctx, int bserrno)
1422 {
1423 	struct spdk_fs_request *req = ctx;
1424 	struct spdk_fs_cb_args *args = &req->args;
1425 
1426 	args->fn.file_op(args->arg, bserrno);
1427 	free_fs_request(req);
1428 }
1429 
1430 void
1431 spdk_fs_delete_file_async(struct spdk_filesystem *fs, const char *name,
1432 			  spdk_file_op_complete cb_fn, void *cb_arg)
1433 {
1434 	struct spdk_file *f;
1435 	spdk_blob_id blobid;
1436 	struct spdk_fs_request *req;
1437 	struct spdk_fs_cb_args *args;
1438 
1439 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name);
1440 
1441 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
1442 		cb_fn(cb_arg, -ENAMETOOLONG);
1443 		return;
1444 	}
1445 
1446 	f = fs_find_file(fs, name);
1447 	if (f == NULL) {
1448 		SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "Cannot find the file=%s to deleted\n", name);
1449 		cb_fn(cb_arg, -ENOENT);
1450 		return;
1451 	}
1452 
1453 	req = alloc_fs_request(fs->md_target.md_fs_channel);
1454 	if (req == NULL) {
1455 		SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "Cannot allocate the req for the file=%s to deleted\n", name);
1456 		cb_fn(cb_arg, -ENOMEM);
1457 		return;
1458 	}
1459 
1460 	args = &req->args;
1461 	args->fn.file_op = cb_fn;
1462 	args->arg = cb_arg;
1463 
1464 	if (f->ref_count > 0) {
1465 		/* If the ref > 0, we mark the file as deleted and delete it when we close it. */
1466 		f->is_deleted = true;
1467 		spdk_blob_set_xattr(f->blob, "is_deleted", &f->is_deleted, sizeof(bool));
1468 		spdk_blob_sync_md(f->blob, blob_delete_cb, req);
1469 		return;
1470 	}
1471 
1472 	TAILQ_REMOVE(&fs->files, f, tailq);
1473 
1474 	cache_free_buffers(f);
1475 
1476 	blobid = f->blobid;
1477 
1478 	free(f->name);
1479 	free(f->tree);
1480 	free(f);
1481 
1482 	spdk_bs_delete_blob(fs->bs, blobid, blob_delete_cb, req);
1483 }
1484 
/*
 * Pack up to the first sizeof(uint64_t) bytes of name into a uint64_t
 *  (remaining bytes are zero).  The result is used as a trace argument.
 *
 * Only the first eight bytes of name are examined, so long names are not
 *  scanned in full.
 */
static uint64_t
fs_name_to_uint64(const char *name)
{
	uint64_t result = 0;
	size_t len = 0;

	while (len < sizeof(result) && name[len] != '\0') {
		len++;
	}

	memcpy(&result, name, len);
	return result;
}
1492 
1493 static void
1494 __fs_delete_file_done(void *arg, int fserrno)
1495 {
1496 	struct spdk_fs_request *req = arg;
1497 	struct spdk_fs_cb_args *args = &req->args;
1498 
1499 	spdk_trace_record(TRACE_BLOBFS_DELETE_DONE, 0, 0, 0, fs_name_to_uint64(args->op.delete.name));
1500 	__wake_caller(args, fserrno);
1501 }
1502 
1503 static void
1504 __fs_delete_file(void *arg)
1505 {
1506 	struct spdk_fs_request *req = arg;
1507 	struct spdk_fs_cb_args *args = &req->args;
1508 
1509 	spdk_trace_record(TRACE_BLOBFS_DELETE_START, 0, 0, 0, fs_name_to_uint64(args->op.delete.name));
1510 	spdk_fs_delete_file_async(args->fs, args->op.delete.name, __fs_delete_file_done, req);
1511 }
1512 
1513 int
1514 spdk_fs_delete_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx,
1515 		    const char *name)
1516 {
1517 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
1518 	struct spdk_fs_request *req;
1519 	struct spdk_fs_cb_args *args;
1520 	int rc;
1521 
1522 	req = alloc_fs_request(channel);
1523 	if (req == NULL) {
1524 		SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "Cannot allocate req to delete file=%s\n", name);
1525 		return -ENOMEM;
1526 	}
1527 
1528 	args = &req->args;
1529 	args->fs = fs;
1530 	args->op.delete.name = name;
1531 	args->sem = &channel->sem;
1532 	fs->send_request(__fs_delete_file, req);
1533 	sem_wait(&channel->sem);
1534 	rc = args->rc;
1535 	free_fs_request(req);
1536 
1537 	return rc;
1538 }
1539 
1540 spdk_fs_iter
1541 spdk_fs_iter_first(struct spdk_filesystem *fs)
1542 {
1543 	struct spdk_file *f;
1544 
1545 	f = TAILQ_FIRST(&fs->files);
1546 	return f;
1547 }
1548 
1549 spdk_fs_iter
1550 spdk_fs_iter_next(spdk_fs_iter iter)
1551 {
1552 	struct spdk_file *f = iter;
1553 
1554 	if (f == NULL) {
1555 		return NULL;
1556 	}
1557 
1558 	f = TAILQ_NEXT(f, tailq);
1559 	return f;
1560 }
1561 
1562 const char *
1563 spdk_file_get_name(struct spdk_file *file)
1564 {
1565 	return file->name;
1566 }
1567 
1568 uint64_t
1569 spdk_file_get_length(struct spdk_file *file)
1570 {
1571 	uint64_t length;
1572 
1573 	assert(file != NULL);
1574 
1575 	length = file->append_pos >= file->length ? file->append_pos : file->length;
1576 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s length=0x%jx\n", file->name, length);
1577 	return length;
1578 }
1579 
1580 static void
1581 fs_truncate_complete_cb(void *ctx, int bserrno)
1582 {
1583 	struct spdk_fs_request *req = ctx;
1584 	struct spdk_fs_cb_args *args = &req->args;
1585 
1586 	args->fn.file_op(args->arg, bserrno);
1587 	free_fs_request(req);
1588 }
1589 
1590 static void
1591 fs_truncate_resize_cb(void *ctx, int bserrno)
1592 {
1593 	struct spdk_fs_request *req = ctx;
1594 	struct spdk_fs_cb_args *args = &req->args;
1595 	struct spdk_file *file = args->file;
1596 	uint64_t *length = &args->op.truncate.length;
1597 
1598 	if (bserrno) {
1599 		args->fn.file_op(args->arg, bserrno);
1600 		free_fs_request(req);
1601 		return;
1602 	}
1603 
1604 	spdk_blob_set_xattr(file->blob, "length", length, sizeof(*length));
1605 
1606 	file->length = *length;
1607 	if (file->append_pos > file->length) {
1608 		file->append_pos = file->length;
1609 	}
1610 
1611 	spdk_blob_sync_md(file->blob, fs_truncate_complete_cb, req);
1612 }
1613 
/*
 * Return the number of clusters of size cluster_sz needed to hold length
 *  bytes (i.e. length / cluster_sz rounded up).
 *
 * Computed as quotient plus a remainder check so that lengths close to
 *  UINT64_MAX do not wrap around, as "length + cluster_sz - 1" would.
 *  cluster_sz must be non-zero.
 */
static uint64_t
__bytes_to_clusters(uint64_t length, uint64_t cluster_sz)
{
	uint64_t clusters = length / cluster_sz;

	if (length % cluster_sz != 0) {
		clusters++;
	}

	return clusters;
}
1619 
1620 void
1621 spdk_file_truncate_async(struct spdk_file *file, uint64_t length,
1622 			 spdk_file_op_complete cb_fn, void *cb_arg)
1623 {
1624 	struct spdk_filesystem *fs;
1625 	size_t num_clusters;
1626 	struct spdk_fs_request *req;
1627 	struct spdk_fs_cb_args *args;
1628 
1629 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s old=0x%jx new=0x%jx\n", file->name, file->length, length);
1630 	if (length == file->length) {
1631 		cb_fn(cb_arg, 0);
1632 		return;
1633 	}
1634 
1635 	req = alloc_fs_request(file->fs->md_target.md_fs_channel);
1636 	if (req == NULL) {
1637 		cb_fn(cb_arg, -ENOMEM);
1638 		return;
1639 	}
1640 
1641 	args = &req->args;
1642 	args->fn.file_op = cb_fn;
1643 	args->arg = cb_arg;
1644 	args->file = file;
1645 	args->op.truncate.length = length;
1646 	fs = file->fs;
1647 
1648 	num_clusters = __bytes_to_clusters(length, fs->bs_opts.cluster_sz);
1649 
1650 	spdk_blob_resize(file->blob, num_clusters, fs_truncate_resize_cb, req);
1651 }
1652 
1653 static void
1654 __truncate(void *arg)
1655 {
1656 	struct spdk_fs_request *req = arg;
1657 	struct spdk_fs_cb_args *args = &req->args;
1658 
1659 	spdk_file_truncate_async(args->file, args->op.truncate.length,
1660 				 args->fn.file_op, args);
1661 }
1662 
1663 int
1664 spdk_file_truncate(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx,
1665 		   uint64_t length)
1666 {
1667 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
1668 	struct spdk_fs_request *req;
1669 	struct spdk_fs_cb_args *args;
1670 	int rc;
1671 
1672 	req = alloc_fs_request(channel);
1673 	if (req == NULL) {
1674 		return -ENOMEM;
1675 	}
1676 
1677 	args = &req->args;
1678 
1679 	args->file = file;
1680 	args->op.truncate.length = length;
1681 	args->fn.file_op = __wake_caller;
1682 	args->sem = &channel->sem;
1683 
1684 	channel->send_request(__truncate, req);
1685 	sem_wait(&channel->sem);
1686 	rc = args->rc;
1687 	free_fs_request(req);
1688 
1689 	return rc;
1690 }
1691 
1692 static void
1693 __rw_done(void *ctx, int bserrno)
1694 {
1695 	struct spdk_fs_request *req = ctx;
1696 	struct spdk_fs_cb_args *args = &req->args;
1697 
1698 	spdk_free(args->op.rw.pin_buf);
1699 	args->fn.file_op(args->arg, bserrno);
1700 	free_fs_request(req);
1701 }
1702 
1703 static void
1704 __read_done(void *ctx, int bserrno)
1705 {
1706 	struct spdk_fs_request *req = ctx;
1707 	struct spdk_fs_cb_args *args = &req->args;
1708 
1709 	assert(req != NULL);
1710 	if (args->op.rw.is_read) {
1711 		memcpy(args->iovs[0].iov_base,
1712 		       args->op.rw.pin_buf + (args->op.rw.offset & (args->op.rw.blocklen - 1)),
1713 		       args->iovs[0].iov_len);
1714 		__rw_done(req, 0);
1715 	} else {
1716 		memcpy(args->op.rw.pin_buf + (args->op.rw.offset & (args->op.rw.blocklen - 1)),
1717 		       args->iovs[0].iov_base,
1718 		       args->iovs[0].iov_len);
1719 		spdk_blob_io_write(args->file->blob, args->op.rw.channel,
1720 				   args->op.rw.pin_buf,
1721 				   args->op.rw.start_lba, args->op.rw.num_lba,
1722 				   __rw_done, req);
1723 	}
1724 }
1725 
1726 static void
1727 __do_blob_read(void *ctx, int fserrno)
1728 {
1729 	struct spdk_fs_request *req = ctx;
1730 	struct spdk_fs_cb_args *args = &req->args;
1731 
1732 	if (fserrno) {
1733 		__rw_done(req, fserrno);
1734 		return;
1735 	}
1736 	spdk_blob_io_read(args->file->blob, args->op.rw.channel,
1737 			  args->op.rw.pin_buf,
1738 			  args->op.rw.start_lba, args->op.rw.num_lba,
1739 			  __read_done, req);
1740 }
1741 
1742 static void
1743 __get_page_parameters(struct spdk_file *file, uint64_t offset, uint64_t length,
1744 		      uint64_t *start_lba, uint32_t *lba_size, uint64_t *num_lba)
1745 {
1746 	uint64_t end_lba;
1747 
1748 	*lba_size = spdk_bs_get_io_unit_size(file->fs->bs);
1749 	*start_lba = offset / *lba_size;
1750 	end_lba = (offset + length - 1) / *lba_size;
1751 	*num_lba = (end_lba - *start_lba + 1);
1752 }
1753 
1754 static void
1755 __readwrite(struct spdk_file *file, struct spdk_io_channel *_channel,
1756 	    void *payload, uint64_t offset, uint64_t length,
1757 	    spdk_file_op_complete cb_fn, void *cb_arg, int is_read)
1758 {
1759 	struct spdk_fs_request *req;
1760 	struct spdk_fs_cb_args *args;
1761 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
1762 	uint64_t start_lba, num_lba, pin_buf_length;
1763 	uint32_t lba_size;
1764 
1765 	if (is_read && offset + length > file->length) {
1766 		cb_fn(cb_arg, -EINVAL);
1767 		return;
1768 	}
1769 
1770 	req = alloc_fs_request_with_iov(channel, 1);
1771 	if (req == NULL) {
1772 		cb_fn(cb_arg, -ENOMEM);
1773 		return;
1774 	}
1775 
1776 	__get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba);
1777 
1778 	args = &req->args;
1779 	args->fn.file_op = cb_fn;
1780 	args->arg = cb_arg;
1781 	args->file = file;
1782 	args->op.rw.channel = channel->bs_channel;
1783 	args->iovs[0].iov_base = payload;
1784 	args->iovs[0].iov_len = (size_t)length;
1785 	args->op.rw.is_read = is_read;
1786 	args->op.rw.offset = offset;
1787 	args->op.rw.blocklen = lba_size;
1788 
1789 	pin_buf_length = num_lba * lba_size;
1790 	args->op.rw.pin_buf = spdk_malloc(pin_buf_length, lba_size, NULL,
1791 					  SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
1792 	if (args->op.rw.pin_buf == NULL) {
1793 		SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "Failed to allocate buf for: file=%s offset=%jx length=%jx\n",
1794 			      file->name, offset, length);
1795 		free_fs_request(req);
1796 		cb_fn(cb_arg, -ENOMEM);
1797 		return;
1798 	}
1799 
1800 	args->op.rw.start_lba = start_lba;
1801 	args->op.rw.num_lba = num_lba;
1802 
1803 	if (!is_read && file->length < offset + length) {
1804 		spdk_file_truncate_async(file, offset + length, __do_blob_read, req);
1805 	} else {
1806 		__do_blob_read(req, 0);
1807 	}
1808 }
1809 
1810 void
1811 spdk_file_write_async(struct spdk_file *file, struct spdk_io_channel *channel,
1812 		      void *payload, uint64_t offset, uint64_t length,
1813 		      spdk_file_op_complete cb_fn, void *cb_arg)
1814 {
1815 	__readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 0);
1816 }
1817 
1818 void
1819 spdk_file_read_async(struct spdk_file *file, struct spdk_io_channel *channel,
1820 		     void *payload, uint64_t offset, uint64_t length,
1821 		     spdk_file_op_complete cb_fn, void *cb_arg)
1822 {
1823 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s offset=%jx length=%jx\n",
1824 		      file->name, offset, length);
1825 	__readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 1);
1826 }
1827 
1828 struct spdk_io_channel *
1829 spdk_fs_alloc_io_channel(struct spdk_filesystem *fs)
1830 {
1831 	struct spdk_io_channel *io_channel;
1832 	struct spdk_fs_channel *fs_channel;
1833 
1834 	io_channel = spdk_get_io_channel(&fs->io_target);
1835 	fs_channel = spdk_io_channel_get_ctx(io_channel);
1836 	fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
1837 	fs_channel->send_request = __send_request_direct;
1838 
1839 	return io_channel;
1840 }
1841 
/*
 * Release an I/O channel obtained from spdk_fs_alloc_io_channel() by
 *  dropping the reference on it.
 */
void
spdk_fs_free_io_channel(struct spdk_io_channel *channel)
{
	spdk_put_io_channel(channel);
}
1847 
1848 struct spdk_fs_thread_ctx *
1849 spdk_fs_alloc_thread_ctx(struct spdk_filesystem *fs)
1850 {
1851 	struct spdk_fs_thread_ctx *ctx;
1852 
1853 	ctx = calloc(1, sizeof(*ctx));
1854 	if (!ctx) {
1855 		return NULL;
1856 	}
1857 
1858 	_spdk_fs_channel_create(fs, &ctx->ch, 512);
1859 
1860 	ctx->ch.send_request = fs->send_request;
1861 	ctx->ch.sync = 1;
1862 	pthread_spin_init(&ctx->ch.lock, 0);
1863 
1864 	return ctx;
1865 }
1866 
1867 
1868 void
1869 spdk_fs_free_thread_ctx(struct spdk_fs_thread_ctx *ctx)
1870 {
1871 	assert(ctx->ch.sync == 1);
1872 
1873 	while (true) {
1874 		pthread_spin_lock(&ctx->ch.lock);
1875 		if (ctx->ch.outstanding_reqs == 0) {
1876 			pthread_spin_unlock(&ctx->ch.lock);
1877 			break;
1878 		}
1879 		pthread_spin_unlock(&ctx->ch.lock);
1880 		usleep(1000);
1881 	}
1882 
1883 	_spdk_fs_channel_destroy(NULL, &ctx->ch);
1884 	free(ctx);
1885 }
1886 
1887 void
1888 spdk_fs_set_cache_size(uint64_t size_in_mb)
1889 {
1890 	g_fs_cache_size = size_in_mb * 1024 * 1024;
1891 }
1892 
1893 uint64_t
1894 spdk_fs_get_cache_size(void)
1895 {
1896 	return g_fs_cache_size / (1024 * 1024);
1897 }
1898 
1899 static void __file_flush(void *ctx);
1900 
1901 static void *
1902 alloc_cache_memory_buffer(struct spdk_file *context)
1903 {
1904 	struct spdk_file *file;
1905 	void *buf;
1906 
1907 	buf = spdk_mempool_get(g_cache_pool);
1908 	if (buf != NULL) {
1909 		return buf;
1910 	}
1911 
1912 	pthread_spin_lock(&g_caches_lock);
1913 	TAILQ_FOREACH(file, &g_caches, cache_tailq) {
1914 		if (!file->open_for_writing &&
1915 		    file->priority == SPDK_FILE_PRIORITY_LOW &&
1916 		    file != context) {
1917 			break;
1918 		}
1919 	}
1920 	pthread_spin_unlock(&g_caches_lock);
1921 	if (file != NULL) {
1922 		cache_free_buffers(file);
1923 		buf = spdk_mempool_get(g_cache_pool);
1924 		if (buf != NULL) {
1925 			return buf;
1926 		}
1927 	}
1928 
1929 	pthread_spin_lock(&g_caches_lock);
1930 	TAILQ_FOREACH(file, &g_caches, cache_tailq) {
1931 		if (!file->open_for_writing && file != context) {
1932 			break;
1933 		}
1934 	}
1935 	pthread_spin_unlock(&g_caches_lock);
1936 	if (file != NULL) {
1937 		cache_free_buffers(file);
1938 		buf = spdk_mempool_get(g_cache_pool);
1939 		if (buf != NULL) {
1940 			return buf;
1941 		}
1942 	}
1943 
1944 	pthread_spin_lock(&g_caches_lock);
1945 	TAILQ_FOREACH(file, &g_caches, cache_tailq) {
1946 		if (file != context) {
1947 			break;
1948 		}
1949 	}
1950 	pthread_spin_unlock(&g_caches_lock);
1951 	if (file != NULL) {
1952 		cache_free_buffers(file);
1953 		buf = spdk_mempool_get(g_cache_pool);
1954 		if (buf != NULL) {
1955 			return buf;
1956 		}
1957 	}
1958 
1959 	return NULL;
1960 }
1961 
1962 static struct cache_buffer *
1963 cache_insert_buffer(struct spdk_file *file, uint64_t offset)
1964 {
1965 	struct cache_buffer *buf;
1966 	int count = 0;
1967 
1968 	buf = calloc(1, sizeof(*buf));
1969 	if (buf == NULL) {
1970 		SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "calloc failed\n");
1971 		return NULL;
1972 	}
1973 
1974 	buf->buf = alloc_cache_memory_buffer(file);
1975 	while (buf->buf == NULL) {
1976 		/*
1977 		 * TODO: alloc_cache_memory_buffer() should eventually free
1978 		 *  some buffers.  Need a more sophisticated check here, instead
1979 		 *  of just bailing if 100 tries does not result in getting a
1980 		 *  free buffer.  This will involve using the sync channel's
1981 		 *  semaphore to block until a buffer becomes available.
1982 		 */
1983 		if (count++ == 100) {
1984 			SPDK_ERRLOG("Could not allocate cache buffer for file=%p on offset=%jx\n",
1985 				    file, offset);
1986 			free(buf);
1987 			return NULL;
1988 		}
1989 		buf->buf = alloc_cache_memory_buffer(file);
1990 	}
1991 
1992 	buf->buf_size = CACHE_BUFFER_SIZE;
1993 	buf->offset = offset;
1994 
1995 	pthread_spin_lock(&g_caches_lock);
1996 	if (file->tree->present_mask == 0) {
1997 		TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq);
1998 	}
1999 	file->tree = spdk_tree_insert_buffer(file->tree, buf);
2000 	pthread_spin_unlock(&g_caches_lock);
2001 
2002 	return buf;
2003 }
2004 
2005 static struct cache_buffer *
2006 cache_append_buffer(struct spdk_file *file)
2007 {
2008 	struct cache_buffer *last;
2009 
2010 	assert(file->last == NULL || file->last->bytes_filled == file->last->buf_size);
2011 	assert((file->append_pos % CACHE_BUFFER_SIZE) == 0);
2012 
2013 	last = cache_insert_buffer(file, file->append_pos);
2014 	if (last == NULL) {
2015 		SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "cache_insert_buffer failed\n");
2016 		return NULL;
2017 	}
2018 
2019 	file->last = last;
2020 
2021 	return last;
2022 }
2023 
2024 static void __check_sync_reqs(struct spdk_file *file);
2025 
2026 static void
2027 __file_cache_finish_sync(void *ctx, int bserrno)
2028 {
2029 	struct spdk_file *file;
2030 	struct spdk_fs_request *sync_req = ctx;
2031 	struct spdk_fs_cb_args *sync_args;
2032 
2033 	sync_args = &sync_req->args;
2034 	file = sync_args->file;
2035 	pthread_spin_lock(&file->lock);
2036 	file->length_xattr = sync_args->op.sync.length;
2037 	assert(sync_args->op.sync.offset <= file->length_flushed);
2038 	spdk_trace_record(TRACE_BLOBFS_XATTR_END, 0, sync_args->op.sync.offset,
2039 			  0, file->trace_arg_name);
2040 	BLOBFS_TRACE(file, "sync done offset=%jx\n", sync_args->op.sync.offset);
2041 	TAILQ_REMOVE(&file->sync_requests, sync_req, args.op.sync.tailq);
2042 	pthread_spin_unlock(&file->lock);
2043 
2044 	sync_args->fn.file_op(sync_args->arg, bserrno);
2045 	pthread_spin_lock(&file->lock);
2046 	free_fs_request(sync_req);
2047 	pthread_spin_unlock(&file->lock);
2048 
2049 	__check_sync_reqs(file);
2050 }
2051 
2052 static void
2053 __check_sync_reqs(struct spdk_file *file)
2054 {
2055 	struct spdk_fs_request *sync_req;
2056 
2057 	pthread_spin_lock(&file->lock);
2058 
2059 	TAILQ_FOREACH(sync_req, &file->sync_requests, args.op.sync.tailq) {
2060 		if (sync_req->args.op.sync.offset <= file->length_flushed) {
2061 			break;
2062 		}
2063 	}
2064 
2065 	if (sync_req != NULL && !sync_req->args.op.sync.xattr_in_progress) {
2066 		BLOBFS_TRACE(file, "set xattr length 0x%jx\n", file->length_flushed);
2067 		sync_req->args.op.sync.xattr_in_progress = true;
2068 		sync_req->args.op.sync.length = file->length_flushed;
2069 		spdk_blob_set_xattr(file->blob, "length", &file->length_flushed,
2070 				    sizeof(file->length_flushed));
2071 
2072 		pthread_spin_unlock(&file->lock);
2073 		spdk_trace_record(TRACE_BLOBFS_XATTR_START, 0, file->length_flushed,
2074 				  0, file->trace_arg_name);
2075 		spdk_blob_sync_md(file->blob, __file_cache_finish_sync, sync_req);
2076 	} else {
2077 		pthread_spin_unlock(&file->lock);
2078 	}
2079 }
2080 
2081 static void
2082 __file_flush_done(void *ctx, int bserrno)
2083 {
2084 	struct spdk_fs_request *req = ctx;
2085 	struct spdk_fs_cb_args *args = &req->args;
2086 	struct spdk_file *file = args->file;
2087 	struct cache_buffer *next = args->op.flush.cache_buffer;
2088 
2089 	BLOBFS_TRACE(file, "length=%jx\n", args->op.flush.length);
2090 
2091 	pthread_spin_lock(&file->lock);
2092 	next->in_progress = false;
2093 	next->bytes_flushed += args->op.flush.length;
2094 	file->length_flushed += args->op.flush.length;
2095 	if (file->length_flushed > file->length) {
2096 		file->length = file->length_flushed;
2097 	}
2098 	if (next->bytes_flushed == next->buf_size) {
2099 		BLOBFS_TRACE(file, "write buffer fully flushed 0x%jx\n", file->length_flushed);
2100 		next = spdk_tree_find_buffer(file->tree, file->length_flushed);
2101 	}
2102 
2103 	/*
2104 	 * Assert that there is no cached data that extends past the end of the underlying
2105 	 *  blob.
2106 	 */
2107 	assert(next == NULL || next->offset < __file_get_blob_size(file) ||
2108 	       next->bytes_filled == 0);
2109 
2110 	pthread_spin_unlock(&file->lock);
2111 
2112 	__check_sync_reqs(file);
2113 
2114 	__file_flush(req);
2115 }
2116 
2117 static void
2118 __file_flush(void *ctx)
2119 {
2120 	struct spdk_fs_request *req = ctx;
2121 	struct spdk_fs_cb_args *args = &req->args;
2122 	struct spdk_file *file = args->file;
2123 	struct cache_buffer *next;
2124 	uint64_t offset, length, start_lba, num_lba;
2125 	uint32_t lba_size;
2126 
2127 	pthread_spin_lock(&file->lock);
2128 	next = spdk_tree_find_buffer(file->tree, file->length_flushed);
2129 	if (next == NULL || next->in_progress) {
2130 		/*
2131 		 * There is either no data to flush, or a flush I/O is already in
2132 		 *  progress.  So return immediately - if a flush I/O is in
2133 		 *  progress we will flush more data after that is completed.
2134 		 */
2135 		free_fs_request(req);
2136 		if (next == NULL) {
2137 			/*
2138 			 * For cases where a file's cache was evicted, and then the
2139 			 *  file was later appended, we will write the data directly
2140 			 *  to disk and bypass cache.  So just update length_flushed
2141 			 *  here to reflect that all data was already written to disk.
2142 			 */
2143 			file->length_flushed = file->append_pos;
2144 		}
2145 		pthread_spin_unlock(&file->lock);
2146 		if (next == NULL) {
2147 			/*
2148 			 * There is no data to flush, but we still need to check for any
2149 			 *  outstanding sync requests to make sure metadata gets updated.
2150 			 */
2151 			__check_sync_reqs(file);
2152 		}
2153 		return;
2154 	}
2155 
2156 	offset = next->offset + next->bytes_flushed;
2157 	length = next->bytes_filled - next->bytes_flushed;
2158 	if (length == 0) {
2159 		free_fs_request(req);
2160 		pthread_spin_unlock(&file->lock);
2161 		/*
2162 		 * There is no data to flush, but we still need to check for any
2163 		 *  outstanding sync requests to make sure metadata gets updated.
2164 		 */
2165 		__check_sync_reqs(file);
2166 		return;
2167 	}
2168 	args->op.flush.length = length;
2169 	args->op.flush.cache_buffer = next;
2170 
2171 	__get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba);
2172 
2173 	next->in_progress = true;
2174 	BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n",
2175 		     offset, length, start_lba, num_lba);
2176 	pthread_spin_unlock(&file->lock);
2177 	spdk_blob_io_write(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel,
2178 			   next->buf + (start_lba * lba_size) - next->offset,
2179 			   start_lba, num_lba, __file_flush_done, req);
2180 }
2181 
/*
 * Final step of a blob extend: the metadata sync finished, so pass its
 *  status to __wake_caller().  The context is the callback-args structure
 *  itself, not the enclosing request.
 */
static void
__file_extend_done(void *arg, int bserrno)
{
	struct spdk_fs_cb_args *extend_args = arg;

	__wake_caller(extend_args, bserrno);
}
2189 
2190 static void
2191 __file_extend_resize_cb(void *_args, int bserrno)
2192 {
2193 	struct spdk_fs_cb_args *args = _args;
2194 	struct spdk_file *file = args->file;
2195 
2196 	if (bserrno) {
2197 		__wake_caller(args, bserrno);
2198 		return;
2199 	}
2200 
2201 	spdk_blob_sync_md(file->blob, __file_extend_done, args);
2202 }
2203 
2204 static void
2205 __file_extend_blob(void *_args)
2206 {
2207 	struct spdk_fs_cb_args *args = _args;
2208 	struct spdk_file *file = args->file;
2209 
2210 	spdk_blob_resize(file->blob, args->op.resize.num_clusters, __file_extend_resize_cb, args);
2211 }
2212 
2213 static void
2214 __rw_from_file_done(void *ctx, int bserrno)
2215 {
2216 	struct spdk_fs_request *req = ctx;
2217 
2218 	__wake_caller(&req->args, bserrno);
2219 	free_fs_request(req);
2220 }
2221 
2222 static void
2223 __rw_from_file(void *ctx)
2224 {
2225 	struct spdk_fs_request *req = ctx;
2226 	struct spdk_fs_cb_args *args = &req->args;
2227 	struct spdk_file *file = args->file;
2228 
2229 	if (args->op.rw.is_read) {
2230 		spdk_file_read_async(file, file->fs->sync_target.sync_io_channel, args->iovs[0].iov_base,
2231 				     args->op.rw.offset, (uint64_t)args->iovs[0].iov_len,
2232 				     __rw_from_file_done, req);
2233 	} else {
2234 		spdk_file_write_async(file, file->fs->sync_target.sync_io_channel, args->iovs[0].iov_base,
2235 				      args->op.rw.offset, (uint64_t)args->iovs[0].iov_len,
2236 				      __rw_from_file_done, req);
2237 	}
2238 }
2239 
2240 static int
2241 __send_rw_from_file(struct spdk_file *file, void *payload,
2242 		    uint64_t offset, uint64_t length, bool is_read,
2243 		    struct spdk_fs_channel *channel)
2244 {
2245 	struct spdk_fs_request *req;
2246 	struct spdk_fs_cb_args *args;
2247 
2248 	req = alloc_fs_request_with_iov(channel, 1);
2249 	if (req == NULL) {
2250 		sem_post(&channel->sem);
2251 		return -ENOMEM;
2252 	}
2253 
2254 	args = &req->args;
2255 	args->file = file;
2256 	args->sem = &channel->sem;
2257 	args->iovs[0].iov_base = payload;
2258 	args->iovs[0].iov_len = (size_t)length;
2259 	args->op.rw.offset = offset;
2260 	args->op.rw.is_read = is_read;
2261 	file->fs->send_request(__rw_from_file, req);
2262 	return 0;
2263 }
2264 
2265 int
2266 spdk_file_write(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx,
2267 		void *payload, uint64_t offset, uint64_t length)
2268 {
2269 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
2270 	struct spdk_fs_request *flush_req;
2271 	uint64_t rem_length, copy, blob_size, cluster_sz;
2272 	uint32_t cache_buffers_filled = 0;
2273 	uint8_t *cur_payload;
2274 	struct cache_buffer *last;
2275 
2276 	BLOBFS_TRACE_RW(file, "offset=%jx length=%jx\n", offset, length);
2277 
2278 	if (length == 0) {
2279 		return 0;
2280 	}
2281 
2282 	if (offset != file->append_pos) {
2283 		BLOBFS_TRACE(file, " error offset=%jx append_pos=%jx\n", offset, file->append_pos);
2284 		return -EINVAL;
2285 	}
2286 
2287 	pthread_spin_lock(&file->lock);
2288 	file->open_for_writing = true;
2289 
2290 	if ((file->last == NULL) && (file->append_pos % CACHE_BUFFER_SIZE == 0)) {
2291 		cache_append_buffer(file);
2292 	}
2293 
2294 	if (file->last == NULL) {
2295 		int rc;
2296 
2297 		file->append_pos += length;
2298 		pthread_spin_unlock(&file->lock);
2299 		rc = __send_rw_from_file(file, payload, offset, length, false, channel);
2300 		sem_wait(&channel->sem);
2301 		return rc;
2302 	}
2303 
2304 	blob_size = __file_get_blob_size(file);
2305 
2306 	if ((offset + length) > blob_size) {
2307 		struct spdk_fs_cb_args extend_args = {};
2308 
2309 		cluster_sz = file->fs->bs_opts.cluster_sz;
2310 		extend_args.sem = &channel->sem;
2311 		extend_args.op.resize.num_clusters = __bytes_to_clusters((offset + length), cluster_sz);
2312 		extend_args.file = file;
2313 		BLOBFS_TRACE(file, "start resize to %u clusters\n", extend_args.op.resize.num_clusters);
2314 		pthread_spin_unlock(&file->lock);
2315 		file->fs->send_request(__file_extend_blob, &extend_args);
2316 		sem_wait(&channel->sem);
2317 		if (extend_args.rc) {
2318 			return extend_args.rc;
2319 		}
2320 	}
2321 
2322 	flush_req = alloc_fs_request(channel);
2323 	if (flush_req == NULL) {
2324 		pthread_spin_unlock(&file->lock);
2325 		return -ENOMEM;
2326 	}
2327 
2328 	last = file->last;
2329 	rem_length = length;
2330 	cur_payload = payload;
2331 	while (rem_length > 0) {
2332 		copy = last->buf_size - last->bytes_filled;
2333 		if (copy > rem_length) {
2334 			copy = rem_length;
2335 		}
2336 		BLOBFS_TRACE_RW(file, "  fill offset=%jx length=%jx\n", file->append_pos, copy);
2337 		memcpy(&last->buf[last->bytes_filled], cur_payload, copy);
2338 		file->append_pos += copy;
2339 		if (file->length < file->append_pos) {
2340 			file->length = file->append_pos;
2341 		}
2342 		cur_payload += copy;
2343 		last->bytes_filled += copy;
2344 		rem_length -= copy;
2345 		if (last->bytes_filled == last->buf_size) {
2346 			cache_buffers_filled++;
2347 			last = cache_append_buffer(file);
2348 			if (last == NULL) {
2349 				BLOBFS_TRACE(file, "nomem\n");
2350 				free_fs_request(flush_req);
2351 				pthread_spin_unlock(&file->lock);
2352 				return -ENOMEM;
2353 			}
2354 		}
2355 	}
2356 
2357 	pthread_spin_unlock(&file->lock);
2358 
2359 	if (cache_buffers_filled == 0) {
2360 		free_fs_request(flush_req);
2361 		return 0;
2362 	}
2363 
2364 	flush_req->args.file = file;
2365 	file->fs->send_request(__file_flush, flush_req);
2366 	return 0;
2367 }
2368 
2369 static void
2370 __readahead_done(void *ctx, int bserrno)
2371 {
2372 	struct spdk_fs_request *req = ctx;
2373 	struct spdk_fs_cb_args *args = &req->args;
2374 	struct cache_buffer *cache_buffer = args->op.readahead.cache_buffer;
2375 	struct spdk_file *file = args->file;
2376 
2377 	BLOBFS_TRACE(file, "offset=%jx\n", cache_buffer->offset);
2378 
2379 	pthread_spin_lock(&file->lock);
2380 	cache_buffer->bytes_filled = args->op.readahead.length;
2381 	cache_buffer->bytes_flushed = args->op.readahead.length;
2382 	cache_buffer->in_progress = false;
2383 	pthread_spin_unlock(&file->lock);
2384 
2385 	free_fs_request(req);
2386 }
2387 
2388 static void
2389 __readahead(void *ctx)
2390 {
2391 	struct spdk_fs_request *req = ctx;
2392 	struct spdk_fs_cb_args *args = &req->args;
2393 	struct spdk_file *file = args->file;
2394 	uint64_t offset, length, start_lba, num_lba;
2395 	uint32_t lba_size;
2396 
2397 	offset = args->op.readahead.offset;
2398 	length = args->op.readahead.length;
2399 	assert(length > 0);
2400 
2401 	__get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba);
2402 
2403 	BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n",
2404 		     offset, length, start_lba, num_lba);
2405 	spdk_blob_io_read(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel,
2406 			  args->op.readahead.cache_buffer->buf,
2407 			  start_lba, num_lba, __readahead_done, req);
2408 }
2409 
2410 static uint64_t
2411 __next_cache_buffer_offset(uint64_t offset)
2412 {
2413 	return (offset + CACHE_BUFFER_SIZE) & ~(CACHE_TREE_LEVEL_MASK(0));
2414 }
2415 
2416 static void
2417 check_readahead(struct spdk_file *file, uint64_t offset,
2418 		struct spdk_fs_channel *channel)
2419 {
2420 	struct spdk_fs_request *req;
2421 	struct spdk_fs_cb_args *args;
2422 
2423 	offset = __next_cache_buffer_offset(offset);
2424 	if (spdk_tree_find_buffer(file->tree, offset) != NULL || file->length <= offset) {
2425 		return;
2426 	}
2427 
2428 	req = alloc_fs_request(channel);
2429 	if (req == NULL) {
2430 		return;
2431 	}
2432 	args = &req->args;
2433 
2434 	BLOBFS_TRACE(file, "offset=%jx\n", offset);
2435 
2436 	args->file = file;
2437 	args->op.readahead.offset = offset;
2438 	args->op.readahead.cache_buffer = cache_insert_buffer(file, offset);
2439 	if (!args->op.readahead.cache_buffer) {
2440 		BLOBFS_TRACE(file, "Cannot allocate buf for offset=%jx\n", offset);
2441 		free_fs_request(req);
2442 		return;
2443 	}
2444 
2445 	args->op.readahead.cache_buffer->in_progress = true;
2446 	if (file->length < (offset + CACHE_BUFFER_SIZE)) {
2447 		args->op.readahead.length = file->length & (CACHE_BUFFER_SIZE - 1);
2448 	} else {
2449 		args->op.readahead.length = CACHE_BUFFER_SIZE;
2450 	}
2451 	file->fs->send_request(__readahead, req);
2452 }
2453 
2454 static int
2455 __file_read(struct spdk_file *file, void *payload, uint64_t offset, uint64_t length,
2456 	    struct spdk_fs_channel *channel)
2457 {
2458 	struct cache_buffer *buf;
2459 	int rc;
2460 
2461 	buf = spdk_tree_find_filled_buffer(file->tree, offset);
2462 	if (buf == NULL) {
2463 		pthread_spin_unlock(&file->lock);
2464 		rc = __send_rw_from_file(file, payload, offset, length, true, channel);
2465 		pthread_spin_lock(&file->lock);
2466 		return rc;
2467 	}
2468 
2469 	if ((offset + length) > (buf->offset + buf->bytes_filled)) {
2470 		length = buf->offset + buf->bytes_filled - offset;
2471 	}
2472 	BLOBFS_TRACE(file, "read %p offset=%ju length=%ju\n", payload, offset, length);
2473 	memcpy(payload, &buf->buf[offset - buf->offset], length);
2474 	if ((offset + length) % CACHE_BUFFER_SIZE == 0) {
2475 		pthread_spin_lock(&g_caches_lock);
2476 		spdk_tree_remove_buffer(file->tree, buf);
2477 		if (file->tree->present_mask == 0) {
2478 			TAILQ_REMOVE(&g_caches, file, cache_tailq);
2479 		}
2480 		pthread_spin_unlock(&g_caches_lock);
2481 	}
2482 
2483 	sem_post(&channel->sem);
2484 	return 0;
2485 }
2486 
2487 int64_t
2488 spdk_file_read(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx,
2489 	       void *payload, uint64_t offset, uint64_t length)
2490 {
2491 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
2492 	uint64_t final_offset, final_length;
2493 	uint32_t sub_reads = 0;
2494 	int rc = 0;
2495 
2496 	pthread_spin_lock(&file->lock);
2497 
2498 	BLOBFS_TRACE_RW(file, "offset=%ju length=%ju\n", offset, length);
2499 
2500 	file->open_for_writing = false;
2501 
2502 	if (length == 0 || offset >= file->append_pos) {
2503 		pthread_spin_unlock(&file->lock);
2504 		return 0;
2505 	}
2506 
2507 	if (offset + length > file->append_pos) {
2508 		length = file->append_pos - offset;
2509 	}
2510 
2511 	if (offset != file->next_seq_offset) {
2512 		file->seq_byte_count = 0;
2513 	}
2514 	file->seq_byte_count += length;
2515 	file->next_seq_offset = offset + length;
2516 	if (file->seq_byte_count >= CACHE_READAHEAD_THRESHOLD) {
2517 		check_readahead(file, offset, channel);
2518 		check_readahead(file, offset + CACHE_BUFFER_SIZE, channel);
2519 	}
2520 
2521 	final_length = 0;
2522 	final_offset = offset + length;
2523 	while (offset < final_offset) {
2524 		length = NEXT_CACHE_BUFFER_OFFSET(offset) - offset;
2525 		if (length > (final_offset - offset)) {
2526 			length = final_offset - offset;
2527 		}
2528 		rc = __file_read(file, payload, offset, length, channel);
2529 		if (rc == 0) {
2530 			final_length += length;
2531 		} else {
2532 			break;
2533 		}
2534 		payload += length;
2535 		offset += length;
2536 		sub_reads++;
2537 	}
2538 	pthread_spin_unlock(&file->lock);
2539 	while (sub_reads-- > 0) {
2540 		sem_wait(&channel->sem);
2541 	}
2542 	if (rc == 0) {
2543 		return final_length;
2544 	} else {
2545 		return rc;
2546 	}
2547 }
2548 
2549 static void
2550 _file_sync(struct spdk_file *file, struct spdk_fs_channel *channel,
2551 	   spdk_file_op_complete cb_fn, void *cb_arg)
2552 {
2553 	struct spdk_fs_request *sync_req;
2554 	struct spdk_fs_request *flush_req;
2555 	struct spdk_fs_cb_args *sync_args;
2556 	struct spdk_fs_cb_args *flush_args;
2557 
2558 	BLOBFS_TRACE(file, "offset=%jx\n", file->append_pos);
2559 
2560 	pthread_spin_lock(&file->lock);
2561 	if (file->append_pos <= file->length_xattr) {
2562 		BLOBFS_TRACE(file, "done - file already synced\n");
2563 		pthread_spin_unlock(&file->lock);
2564 		cb_fn(cb_arg, 0);
2565 		return;
2566 	}
2567 
2568 	sync_req = alloc_fs_request(channel);
2569 	if (!sync_req) {
2570 		pthread_spin_unlock(&file->lock);
2571 		cb_fn(cb_arg, -ENOMEM);
2572 		return;
2573 	}
2574 	sync_args = &sync_req->args;
2575 
2576 	flush_req = alloc_fs_request(channel);
2577 	if (!flush_req) {
2578 		pthread_spin_unlock(&file->lock);
2579 		cb_fn(cb_arg, -ENOMEM);
2580 		return;
2581 	}
2582 	flush_args = &flush_req->args;
2583 
2584 	sync_args->file = file;
2585 	sync_args->fn.file_op = cb_fn;
2586 	sync_args->arg = cb_arg;
2587 	sync_args->op.sync.offset = file->append_pos;
2588 	sync_args->op.sync.xattr_in_progress = false;
2589 	TAILQ_INSERT_TAIL(&file->sync_requests, sync_req, args.op.sync.tailq);
2590 	pthread_spin_unlock(&file->lock);
2591 
2592 	flush_args->file = file;
2593 	channel->send_request(__file_flush, flush_req);
2594 }
2595 
2596 int
2597 spdk_file_sync(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx)
2598 {
2599 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
2600 	struct spdk_fs_cb_args args = {};
2601 
2602 	args.sem = &channel->sem;
2603 	_file_sync(file, channel, __wake_caller, &args);
2604 	sem_wait(&channel->sem);
2605 
2606 	return args.rc;
2607 }
2608 
2609 void
2610 spdk_file_sync_async(struct spdk_file *file, struct spdk_io_channel *_channel,
2611 		     spdk_file_op_complete cb_fn, void *cb_arg)
2612 {
2613 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
2614 
2615 	_file_sync(file, channel, cb_fn, cb_arg);
2616 }
2617 
2618 void
2619 spdk_file_set_priority(struct spdk_file *file, uint32_t priority)
2620 {
2621 	BLOBFS_TRACE(file, "priority=%u\n", priority);
2622 	file->priority = priority;
2623 
2624 }
2625 
2626 /*
2627  * Close routines
2628  */
2629 
2630 static void
2631 __file_close_async_done(void *ctx, int bserrno)
2632 {
2633 	struct spdk_fs_request *req = ctx;
2634 	struct spdk_fs_cb_args *args = &req->args;
2635 	struct spdk_file *file = args->file;
2636 
2637 	spdk_trace_record(TRACE_BLOBFS_CLOSE, 0, 0, 0, file->trace_arg_name);
2638 
2639 	if (file->is_deleted) {
2640 		spdk_fs_delete_file_async(file->fs, file->name, blob_delete_cb, ctx);
2641 		return;
2642 	}
2643 
2644 	args->fn.file_op(args->arg, bserrno);
2645 	free_fs_request(req);
2646 }
2647 
2648 static void
2649 __file_close_async(struct spdk_file *file, struct spdk_fs_request *req)
2650 {
2651 	struct spdk_blob *blob;
2652 
2653 	pthread_spin_lock(&file->lock);
2654 	if (file->ref_count == 0) {
2655 		pthread_spin_unlock(&file->lock);
2656 		__file_close_async_done(req, -EBADF);
2657 		return;
2658 	}
2659 
2660 	file->ref_count--;
2661 	if (file->ref_count > 0) {
2662 		pthread_spin_unlock(&file->lock);
2663 		req->args.fn.file_op(req->args.arg, 0);
2664 		free_fs_request(req);
2665 		return;
2666 	}
2667 
2668 	pthread_spin_unlock(&file->lock);
2669 
2670 	blob = file->blob;
2671 	file->blob = NULL;
2672 	spdk_blob_close(blob, __file_close_async_done, req);
2673 }
2674 
2675 static void
2676 __file_close_async__sync_done(void *arg, int fserrno)
2677 {
2678 	struct spdk_fs_request *req = arg;
2679 	struct spdk_fs_cb_args *args = &req->args;
2680 
2681 	__file_close_async(args->file, req);
2682 }
2683 
2684 void
2685 spdk_file_close_async(struct spdk_file *file, spdk_file_op_complete cb_fn, void *cb_arg)
2686 {
2687 	struct spdk_fs_request *req;
2688 	struct spdk_fs_cb_args *args;
2689 
2690 	req = alloc_fs_request(file->fs->md_target.md_fs_channel);
2691 	if (req == NULL) {
2692 		cb_fn(cb_arg, -ENOMEM);
2693 		return;
2694 	}
2695 
2696 	args = &req->args;
2697 	args->file = file;
2698 	args->fn.file_op = cb_fn;
2699 	args->arg = cb_arg;
2700 
2701 	spdk_file_sync_async(file, file->fs->md_target.md_io_channel, __file_close_async__sync_done, req);
2702 }
2703 
2704 static void
2705 __file_close(void *arg)
2706 {
2707 	struct spdk_fs_request *req = arg;
2708 	struct spdk_fs_cb_args *args = &req->args;
2709 	struct spdk_file *file = args->file;
2710 
2711 	__file_close_async(file, req);
2712 }
2713 
2714 int
2715 spdk_file_close(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx)
2716 {
2717 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
2718 	struct spdk_fs_request *req;
2719 	struct spdk_fs_cb_args *args;
2720 
2721 	req = alloc_fs_request(channel);
2722 	if (req == NULL) {
2723 		return -ENOMEM;
2724 	}
2725 
2726 	args = &req->args;
2727 
2728 	spdk_file_sync(file, ctx);
2729 	BLOBFS_TRACE(file, "name=%s\n", file->name);
2730 	args->file = file;
2731 	args->sem = &channel->sem;
2732 	args->fn.file_op = __wake_caller;
2733 	args->arg = args;
2734 	channel->send_request(__file_close, req);
2735 	sem_wait(&channel->sem);
2736 
2737 	return args->rc;
2738 }
2739 
2740 int
2741 spdk_file_get_id(struct spdk_file *file, void *id, size_t size)
2742 {
2743 	if (size < sizeof(spdk_blob_id)) {
2744 		return -EINVAL;
2745 	}
2746 
2747 	memcpy(id, &file->blobid, sizeof(spdk_blob_id));
2748 
2749 	return sizeof(spdk_blob_id);
2750 }
2751 
2752 static void
2753 cache_free_buffers(struct spdk_file *file)
2754 {
2755 	BLOBFS_TRACE(file, "free=%s\n", file->name);
2756 	pthread_spin_lock(&file->lock);
2757 	pthread_spin_lock(&g_caches_lock);
2758 	if (file->tree->present_mask == 0) {
2759 		pthread_spin_unlock(&g_caches_lock);
2760 		pthread_spin_unlock(&file->lock);
2761 		return;
2762 	}
2763 	spdk_tree_free_buffers(file->tree);
2764 
2765 	TAILQ_REMOVE(&g_caches, file, cache_tailq);
2766 	/* If not freed, put it in the end of the queue */
2767 	if (file->tree->present_mask != 0) {
2768 		TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq);
2769 	}
2770 	file->last = NULL;
2771 	pthread_spin_unlock(&g_caches_lock);
2772 	pthread_spin_unlock(&file->lock);
2773 }
2774 
/* Register this file's two log components: "blobfs" and "blobfs_rw". */
2775 SPDK_LOG_REGISTER_COMPONENT("blobfs", SPDK_LOG_BLOBFS)
2776 SPDK_LOG_REGISTER_COMPONENT("blobfs_rw", SPDK_LOG_BLOBFS_RW)
2777