xref: /spdk/lib/blobfs/blobfs.c (revision c4d9daeb7bf491bc0eb6e8d417b75d44773cb009)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 
36 #include "spdk/blobfs.h"
37 #include "spdk/conf.h"
38 #include "blobfs_internal.h"
39 
40 #include "spdk/queue.h"
41 #include "spdk/thread.h"
42 #include "spdk/assert.h"
43 #include "spdk/env.h"
44 #include "spdk/util.h"
45 #include "spdk_internal/log.h"
46 #include "spdk/trace.h"
47 
/*
 * Debug-log helper for the SPDK_LOG_BLOBFS flag. The message is prefixed with
 * "file=<name> ", where <name> is taken from file->name, so `file` must be a
 * pointer expression with a printable `name` member.
 */
#define BLOBFS_TRACE(file, str, args...) \
	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s " str, file->name, ##args)

/* Same as BLOBFS_TRACE, but emitted under the SPDK_LOG_BLOBFS_RW flag. */
#define BLOBFS_TRACE_RW(file, str, args...) \
	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS_RW, "file=%s " str, file->name, ##args)
53 
/* Default total size, in bytes, of the cache buffer pool (4 GiB). */
#define BLOBFS_DEFAULT_CACHE_SIZE (4ULL * 1024 * 1024 * 1024)
/* Default cluster size, in bytes (1 MiB), written by spdk_fs_opts_init(). */
#define SPDK_BLOBFS_DEFAULT_OPTS_CLUSTER_SZ (1024 * 1024)

/* Total cache size in bytes; divided by CACHE_BUFFER_SIZE to size g_cache_pool. */
static uint64_t g_fs_cache_size = BLOBFS_DEFAULT_CACHE_SIZE;
/* Pool of cache buffers; created in __initialize_cache(), released in __free_cache(). */
static struct spdk_mempool *g_cache_pool;
/* List of files; initialized in __initialize_cache().
 * NOTE(review): presumably linked through spdk_file.cache_tailq - confirm against users. */
static TAILQ_HEAD(, spdk_file) g_caches;
/* Number of filesystems currently initialized/loaded; protected by g_cache_init_lock.
 * The cache pool is created on the 0 -> 1 transition and freed on 1 -> 0. */
static int g_fs_count = 0;
/* Serializes updates of g_fs_count and creation/destruction of the cache pool. */
static pthread_mutex_t g_cache_init_lock = PTHREAD_MUTEX_INITIALIZER;
/* Spinlock initialized in __initialize_cache().
 * NOTE(review): presumably guards g_caches - its users are outside this excerpt. */
static pthread_spinlock_t g_caches_lock;
63 
/* Trace group used by blobfs and the tracepoint ids defined within that group. */
#define TRACE_GROUP_BLOBFS	0x7
#define TRACE_BLOBFS_XATTR_START	SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x0)
#define TRACE_BLOBFS_XATTR_END		SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x1)
#define TRACE_BLOBFS_OPEN		SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x2)
#define TRACE_BLOBFS_CLOSE		SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x3)
#define TRACE_BLOBFS_DELETE_START	SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x4)
#define TRACE_BLOBFS_DELETE_DONE	SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x5)
71 
/*
 * Trace registration hook for the "blobfs" trace group. Registers a description
 * for each blobfs tracepoint; every one uses no owner, no object and a
 * string-typed argument labelled "file:    ".
 */
SPDK_TRACE_REGISTER_FN(blobfs_trace, "blobfs", TRACE_GROUP_BLOBFS)
{
	spdk_trace_register_description("BLOBFS_XATTR_START",
					TRACE_BLOBFS_XATTR_START,
					OWNER_NONE, OBJECT_NONE, 0,
					SPDK_TRACE_ARG_TYPE_STR,
					"file:    ");
	spdk_trace_register_description("BLOBFS_XATTR_END",
					TRACE_BLOBFS_XATTR_END,
					OWNER_NONE, OBJECT_NONE, 0,
					SPDK_TRACE_ARG_TYPE_STR,
					"file:    ");
	spdk_trace_register_description("BLOBFS_OPEN",
					TRACE_BLOBFS_OPEN,
					OWNER_NONE, OBJECT_NONE, 0,
					SPDK_TRACE_ARG_TYPE_STR,
					"file:    ");
	spdk_trace_register_description("BLOBFS_CLOSE",
					TRACE_BLOBFS_CLOSE,
					OWNER_NONE, OBJECT_NONE, 0,
					SPDK_TRACE_ARG_TYPE_STR,
					"file:    ");
	spdk_trace_register_description("BLOBFS_DELETE_START",
					TRACE_BLOBFS_DELETE_START,
					OWNER_NONE, OBJECT_NONE, 0,
					SPDK_TRACE_ARG_TYPE_STR,
					"file:    ");
	spdk_trace_register_description("BLOBFS_DELETE_DONE",
					TRACE_BLOBFS_DELETE_DONE,
					OWNER_NONE, OBJECT_NONE, 0,
					SPDK_TRACE_ARG_TYPE_STR,
					"file:    ");
}
105 
106 void
107 spdk_cache_buffer_free(struct cache_buffer *cache_buffer)
108 {
109 	spdk_mempool_put(g_cache_pool, cache_buffer->buf);
110 	free(cache_buffer);
111 }
112 
/* Byte threshold (128 KiB) related to cache readahead.
 * NOTE(review): its users are outside this excerpt - confirm exact semantics. */
#define CACHE_READAHEAD_THRESHOLD	(128 * 1024)

/* In-memory representation of one blobfs file. */
struct spdk_file {
	/* Filesystem this file belongs to (set in file_alloc()). */
	struct spdk_filesystem	*fs;
	/* Open blob handle; stored by fs_open_blob_done(). */
	struct spdk_blob	*blob;
	/* Heap-allocated (strdup) file name; freed when the filesystem is unloaded. */
	char			*name;
	/* Leading bytes of `name` packed into an integer for trace records. */
	uint64_t		trace_arg_name;
	/* File length; initialized from the blob's "length" xattr on load. */
	uint64_t		length;
	bool                    is_deleted;
	bool			open_for_writing;
	/* Initialized to the persisted length on load. */
	uint64_t		length_flushed;
	/* Initialized to the persisted length on load; stat reports max(append_pos, length). */
	uint64_t		append_pos;
	uint64_t		seq_byte_count;
	uint64_t		next_seq_offset;
	/* Starts as SPDK_FILE_PRIORITY_LOW. */
	uint32_t		priority;
	/* Link in spdk_filesystem.files. */
	TAILQ_ENTRY(spdk_file)	tailq;
	/* Id of the blob backing this file. */
	spdk_blob_id		blobid;
	/* Incremented for every open request. */
	uint32_t		ref_count;
	/* Spinlock initialized in file_alloc(). */
	pthread_spinlock_t	lock;
	struct cache_buffer	*last;
	/* Cache tree root; allocated in file_alloc(), freed on unload. */
	struct cache_tree	*tree;
	/* Open requests waiting for the blob open to complete. */
	TAILQ_HEAD(open_requests_head, spdk_fs_request) open_requests;
	TAILQ_HEAD(sync_requests_head, spdk_fs_request) sync_requests;
	/* NOTE(review): presumably the link used for g_caches - confirm. */
	TAILQ_ENTRY(spdk_file)	cache_tailq;
};
138 
/*
 * Records a blob that was found marked with the "is_deleted" xattr while the
 * filesystem was loading; such blobs are deleted before the load completes.
 */
struct spdk_deleted_file {
	/* Id of the blob to delete. */
	spdk_blob_id	id;
	/* Link in spdk_fs_cb_args.op.fs_load.deleted_files. */
	TAILQ_ENTRY(spdk_deleted_file)	tailq;
};
143 
/*
 * One blobfs instance layered on a blobstore.
 *
 * The addresses of md_target, sync_target and io_target are registered as SPDK
 * io_devices, and the channel-create callbacks recover the filesystem from
 * them with SPDK_CONTAINEROF, so these members must stay embedded here.
 */
struct spdk_filesystem {
	/* Underlying blobstore; set once init/load has succeeded. */
	struct spdk_blob_store	*bs;
	/* All known files, linked through spdk_file.tailq. */
	TAILQ_HEAD(, spdk_file)	files;
	/* Only cluster_sz is filled in by the code in this excerpt (from the blobstore). */
	struct spdk_bs_opts	bs_opts;
	/* Block device handed to spdk_fs_init()/spdk_fs_load(). */
	struct spdk_bs_dev	*bdev;
	/* Caller-provided function used to dispatch requests from the blocking API. */
	fs_send_request_fn	send_request;

	struct {
		/* Number of requests preallocated for a channel of this target. */
		uint32_t		max_ops;
		struct spdk_io_channel	*sync_io_channel;
		/* Context of sync_io_channel. */
		struct spdk_fs_channel	*sync_fs_channel;
	} sync_target;

	struct {
		/* Number of requests preallocated for a channel of this target. */
		uint32_t		max_ops;
		struct spdk_io_channel	*md_io_channel;
		/* Context of md_io_channel; requests for metadata operations come from here. */
		struct spdk_fs_channel	*md_fs_channel;
	} md_target;

	struct {
		/* Number of requests preallocated for a channel of this target. */
		uint32_t		max_ops;
	} io_target;
};
167 
/*
 * Per-request state carried through asynchronous blobfs operations: the
 * completion callback, its argument, and operation-specific scratch data.
 */
struct spdk_fs_cb_args {
	/* Completion callback; which member is valid depends on the operation. */
	union {
		spdk_fs_op_with_handle_complete		fs_op_with_handle;
		spdk_fs_op_complete			fs_op;
		spdk_file_op_with_handle_complete	file_op_with_handle;
		spdk_file_op_complete			file_op;
		spdk_file_stat_op_complete		stat_op;
	} fn;
	/* Argument handed to the completion callback. */
	void *arg;
	/* Semaphore posted to wake a caller blocked in the synchronous API. */
	sem_t *sem;
	struct spdk_filesystem *fs;
	struct spdk_file *file;
	/* Result code handed back to a blocked caller, or remembered between steps. */
	int rc;
	/* Points at `iov` below when iovcnt <= 1, otherwise at a heap-allocated array. */
	struct iovec *iovs;
	uint32_t iovcnt;
	struct iovec iov;
	/* Operation-specific data; only the member matching the operation is valid. */
	union {
		struct {
			/* Blobs found marked deleted during load, pending deletion. */
			TAILQ_HEAD(, spdk_deleted_file)	deleted_files;
		} fs_load;
		struct {
			uint64_t	length;
		} truncate;
		struct {
			struct spdk_io_channel	*channel;
			void		*pin_buf;
			int		is_read;
			off_t		offset;
			size_t		length;
			uint64_t	start_lba;
			uint64_t	num_lba;
			uint32_t	blocklen;
		} rw;
		struct {
			const char	*old_name;
			const char	*new_name;
		} rename;
		struct {
			struct cache_buffer	*cache_buffer;
			uint64_t		length;
		} flush;
		struct {
			struct cache_buffer	*cache_buffer;
			uint64_t		length;
			uint64_t		offset;
		} readahead;
		struct {
			uint64_t			offset;
			/* NOTE(review): presumably the link for spdk_file.sync_requests - confirm. */
			TAILQ_ENTRY(spdk_fs_request)	tailq;
			bool				xattr_in_progress;
		} sync;
		struct {
			uint32_t			num_clusters;
		} resize;
		struct {
			const char	*name;
			uint32_t	flags;
			/* Link in spdk_file.open_requests. */
			TAILQ_ENTRY(spdk_fs_request)	tailq;
		} open;
		struct {
			/* Name supplied by the blocking create API. */
			const char		*name;
			/* Blob opened while the new file's metadata is being written. */
			struct spdk_blob	*blob;
		} create;
		struct {
			const char	*name;
		} delete;
		struct {
			const char	*name;
		} stat;
	} op;
};
239 
/* Forward declarations for helpers that are used before their definitions. */
static void cache_free_buffers(struct spdk_file *file);
static void spdk_fs_io_device_unregister(struct spdk_filesystem *fs);
static void spdk_fs_free_io_channels(struct spdk_filesystem *fs);
243 
244 void
245 spdk_fs_opts_init(struct spdk_blobfs_opts *opts)
246 {
247 	opts->cluster_sz = SPDK_BLOBFS_DEFAULT_OPTS_CLUSTER_SZ;
248 }
249 
250 static void
251 __initialize_cache(void)
252 {
253 	assert(g_cache_pool == NULL);
254 
255 	g_cache_pool = spdk_mempool_create("spdk_fs_cache",
256 					   g_fs_cache_size / CACHE_BUFFER_SIZE,
257 					   CACHE_BUFFER_SIZE,
258 					   SPDK_MEMPOOL_DEFAULT_CACHE_SIZE,
259 					   SPDK_ENV_SOCKET_ID_ANY);
260 	if (!g_cache_pool) {
261 		SPDK_ERRLOG("Create mempool failed, you may "
262 			    "increase the memory and try again\n");
263 		assert(false);
264 	}
265 	TAILQ_INIT(&g_caches);
266 	pthread_spin_init(&g_caches_lock, 0);
267 }
268 
269 static void
270 __free_cache(void)
271 {
272 	assert(g_cache_pool != NULL);
273 
274 	spdk_mempool_free(g_cache_pool);
275 	g_cache_pool = NULL;
276 }
277 
278 static uint64_t
279 __file_get_blob_size(struct spdk_file *file)
280 {
281 	uint64_t cluster_sz;
282 
283 	cluster_sz = file->fs->bs_opts.cluster_sz;
284 	return cluster_sz * spdk_blob_get_num_clusters(file->blob);
285 }
286 
/*
 * A preallocated request object owned by a spdk_fs_channel.
 *
 * Keep `args` as the first member: a pointer to it is then interchangeable
 * with a pointer to the request, which callback plumbing in this file has
 * relied on.
 */
struct spdk_fs_request {
	/* Callback and per-operation state. */
	struct spdk_fs_cb_args		args;
	/* Link in the owning channel's free list (spdk_fs_channel.reqs). */
	TAILQ_ENTRY(spdk_fs_request)	link;
	/* Channel the request was taken from and is returned to. */
	struct spdk_fs_channel		*channel;
};
292 
/* Per-channel state: a fixed pool of requests plus the resources needed to issue them. */
struct spdk_fs_channel {
	/* Backing array for all requests of this channel. */
	struct spdk_fs_request		*req_mem;
	/* Free list of requests. */
	TAILQ_HEAD(, spdk_fs_request)	reqs;
	/* Semaphore the blocking API waits on until the operation completes. */
	sem_t				sem;
	struct spdk_filesystem		*fs;
	/* Blobstore I/O channel; released on channel destruction when non-NULL. */
	struct spdk_io_channel		*bs_channel;
	fs_send_request_fn		send_request;
	/* When true, `lock` is taken around free-list and counter updates. */
	bool				sync;
	/* Number of requests currently handed out. */
	uint32_t			outstanding_reqs;
	pthread_spinlock_t		lock;
};
304 
/* For now, this is effectively an alias for spdk_fs_channel (the blocking API
 * casts a thread context pointer directly to a channel pointer), but eventually
 * some data members will be moved over here. */
struct spdk_fs_thread_ctx {
	struct spdk_fs_channel	ch;
};
310 
311 static struct spdk_fs_request *
312 alloc_fs_request_with_iov(struct spdk_fs_channel *channel, uint32_t iovcnt)
313 {
314 	struct spdk_fs_request *req;
315 	struct iovec *iovs = NULL;
316 
317 	if (iovcnt > 1) {
318 		iovs = calloc(iovcnt, sizeof(struct iovec));
319 		if (!iovs) {
320 			return NULL;
321 		}
322 	}
323 
324 	if (channel->sync) {
325 		pthread_spin_lock(&channel->lock);
326 	}
327 
328 	req = TAILQ_FIRST(&channel->reqs);
329 	if (req) {
330 		channel->outstanding_reqs++;
331 		TAILQ_REMOVE(&channel->reqs, req, link);
332 	}
333 
334 	if (channel->sync) {
335 		pthread_spin_unlock(&channel->lock);
336 	}
337 
338 	if (req == NULL) {
339 		SPDK_ERRLOG("Cannot allocate req on spdk_fs_channel =%p\n", channel);
340 		free(iovs);
341 		return NULL;
342 	}
343 	memset(req, 0, sizeof(*req));
344 	req->channel = channel;
345 	if (iovcnt > 1) {
346 		req->args.iovs = iovs;
347 	} else {
348 		req->args.iovs = &req->args.iov;
349 	}
350 	req->args.iovcnt = iovcnt;
351 
352 	return req;
353 }
354 
/* Take a request from `channel` that needs no separate iovec array. */
static struct spdk_fs_request *
alloc_fs_request(struct spdk_fs_channel *channel)
{
	const uint32_t no_iovs = 0;

	return alloc_fs_request_with_iov(channel, no_iovs);
}
360 
361 static void
362 free_fs_request(struct spdk_fs_request *req)
363 {
364 	struct spdk_fs_channel *channel = req->channel;
365 
366 	if (req->args.iovcnt > 1) {
367 		free(req->args.iovs);
368 	}
369 
370 	if (channel->sync) {
371 		pthread_spin_lock(&channel->lock);
372 	}
373 
374 	TAILQ_INSERT_HEAD(&req->channel->reqs, req, link);
375 	channel->outstanding_reqs--;
376 
377 	if (channel->sync) {
378 		pthread_spin_unlock(&channel->lock);
379 	}
380 }
381 
382 static int
383 _spdk_fs_channel_create(struct spdk_filesystem *fs, struct spdk_fs_channel *channel,
384 			uint32_t max_ops)
385 {
386 	uint32_t i;
387 
388 	channel->req_mem = calloc(max_ops, sizeof(struct spdk_fs_request));
389 	if (!channel->req_mem) {
390 		return -1;
391 	}
392 
393 	channel->outstanding_reqs = 0;
394 	TAILQ_INIT(&channel->reqs);
395 	sem_init(&channel->sem, 0, 0);
396 
397 	for (i = 0; i < max_ops; i++) {
398 		TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link);
399 	}
400 
401 	channel->fs = fs;
402 
403 	return 0;
404 }
405 
406 static int
407 _spdk_fs_md_channel_create(void *io_device, void *ctx_buf)
408 {
409 	struct spdk_filesystem		*fs;
410 	struct spdk_fs_channel		*channel = ctx_buf;
411 
412 	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, md_target);
413 
414 	return _spdk_fs_channel_create(fs, channel, fs->md_target.max_ops);
415 }
416 
417 static int
418 _spdk_fs_sync_channel_create(void *io_device, void *ctx_buf)
419 {
420 	struct spdk_filesystem		*fs;
421 	struct spdk_fs_channel		*channel = ctx_buf;
422 
423 	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, sync_target);
424 
425 	return _spdk_fs_channel_create(fs, channel, fs->sync_target.max_ops);
426 }
427 
428 static int
429 _spdk_fs_io_channel_create(void *io_device, void *ctx_buf)
430 {
431 	struct spdk_filesystem		*fs;
432 	struct spdk_fs_channel		*channel = ctx_buf;
433 
434 	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, io_target);
435 
436 	return _spdk_fs_channel_create(fs, channel, fs->io_target.max_ops);
437 }
438 
439 static void
440 _spdk_fs_channel_destroy(void *io_device, void *ctx_buf)
441 {
442 	struct spdk_fs_channel *channel = ctx_buf;
443 
444 	if (channel->outstanding_reqs > 0) {
445 		SPDK_ERRLOG("channel freed with %" PRIu32 " outstanding requests!\n",
446 			    channel->outstanding_reqs);
447 	}
448 
449 	free(channel->req_mem);
450 	if (channel->bs_channel != NULL) {
451 		spdk_bs_free_io_channel(channel->bs_channel);
452 	}
453 }
454 
455 static void
456 __send_request_direct(fs_request_fn fn, void *arg)
457 {
458 	fn(arg);
459 }
460 
461 static void
462 common_fs_bs_init(struct spdk_filesystem *fs, struct spdk_blob_store *bs)
463 {
464 	fs->bs = bs;
465 	fs->bs_opts.cluster_sz = spdk_bs_get_cluster_size(bs);
466 	fs->md_target.md_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
467 	fs->md_target.md_fs_channel->send_request = __send_request_direct;
468 	fs->sync_target.sync_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
469 	fs->sync_target.sync_fs_channel->send_request = __send_request_direct;
470 
471 	pthread_mutex_lock(&g_cache_init_lock);
472 	if (g_fs_count == 0) {
473 		__initialize_cache();
474 	}
475 	g_fs_count++;
476 	pthread_mutex_unlock(&g_cache_init_lock);
477 }
478 
479 static void
480 init_cb(void *ctx, struct spdk_blob_store *bs, int bserrno)
481 {
482 	struct spdk_fs_request *req = ctx;
483 	struct spdk_fs_cb_args *args = &req->args;
484 	struct spdk_filesystem *fs = args->fs;
485 
486 	if (bserrno == 0) {
487 		common_fs_bs_init(fs, bs);
488 	} else {
489 		free(fs);
490 		fs = NULL;
491 	}
492 
493 	args->fn.fs_op_with_handle(args->arg, fs, bserrno);
494 	free_fs_request(req);
495 }
496 
497 static void
498 fs_conf_parse(void)
499 {
500 	struct spdk_conf_section *sp;
501 
502 	sp = spdk_conf_find_section(NULL, "Blobfs");
503 	if (sp == NULL) {
504 		g_fs_cache_buffer_shift = CACHE_BUFFER_SHIFT_DEFAULT;
505 		return;
506 	}
507 
508 	g_fs_cache_buffer_shift = spdk_conf_section_get_intval(sp, "CacheBufferShift");
509 	if (g_fs_cache_buffer_shift <= 0) {
510 		g_fs_cache_buffer_shift = CACHE_BUFFER_SHIFT_DEFAULT;
511 	}
512 }
513 
514 static struct spdk_filesystem *
515 fs_alloc(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn)
516 {
517 	struct spdk_filesystem *fs;
518 
519 	fs = calloc(1, sizeof(*fs));
520 	if (fs == NULL) {
521 		return NULL;
522 	}
523 
524 	fs->bdev = dev;
525 	fs->send_request = send_request_fn;
526 	TAILQ_INIT(&fs->files);
527 
528 	fs->md_target.max_ops = 512;
529 	spdk_io_device_register(&fs->md_target, _spdk_fs_md_channel_create, _spdk_fs_channel_destroy,
530 				sizeof(struct spdk_fs_channel), "blobfs_md");
531 	fs->md_target.md_io_channel = spdk_get_io_channel(&fs->md_target);
532 	fs->md_target.md_fs_channel = spdk_io_channel_get_ctx(fs->md_target.md_io_channel);
533 
534 	fs->sync_target.max_ops = 512;
535 	spdk_io_device_register(&fs->sync_target, _spdk_fs_sync_channel_create, _spdk_fs_channel_destroy,
536 				sizeof(struct spdk_fs_channel), "blobfs_sync");
537 	fs->sync_target.sync_io_channel = spdk_get_io_channel(&fs->sync_target);
538 	fs->sync_target.sync_fs_channel = spdk_io_channel_get_ctx(fs->sync_target.sync_io_channel);
539 
540 	fs->io_target.max_ops = 512;
541 	spdk_io_device_register(&fs->io_target, _spdk_fs_io_channel_create, _spdk_fs_channel_destroy,
542 				sizeof(struct spdk_fs_channel), "blobfs_io");
543 
544 	return fs;
545 }
546 
547 static void
548 __wake_caller(void *arg, int fserrno)
549 {
550 	struct spdk_fs_cb_args *args = arg;
551 
552 	args->rc = fserrno;
553 	sem_post(args->sem);
554 }
555 
556 void
557 spdk_fs_init(struct spdk_bs_dev *dev, struct spdk_blobfs_opts *opt,
558 	     fs_send_request_fn send_request_fn,
559 	     spdk_fs_op_with_handle_complete cb_fn, void *cb_arg)
560 {
561 	struct spdk_filesystem *fs;
562 	struct spdk_fs_request *req;
563 	struct spdk_fs_cb_args *args;
564 	struct spdk_bs_opts opts = {};
565 
566 	fs = fs_alloc(dev, send_request_fn);
567 	if (fs == NULL) {
568 		cb_fn(cb_arg, NULL, -ENOMEM);
569 		return;
570 	}
571 
572 	fs_conf_parse();
573 
574 	req = alloc_fs_request(fs->md_target.md_fs_channel);
575 	if (req == NULL) {
576 		spdk_fs_free_io_channels(fs);
577 		spdk_fs_io_device_unregister(fs);
578 		cb_fn(cb_arg, NULL, -ENOMEM);
579 		return;
580 	}
581 
582 	args = &req->args;
583 	args->fn.fs_op_with_handle = cb_fn;
584 	args->arg = cb_arg;
585 	args->fs = fs;
586 
587 	spdk_bs_opts_init(&opts);
588 	snprintf(opts.bstype.bstype, sizeof(opts.bstype.bstype), "BLOBFS");
589 	if (opt) {
590 		opts.cluster_sz = opt->cluster_sz;
591 	}
592 	spdk_bs_init(dev, &opts, init_cb, req);
593 }
594 
595 static struct spdk_file *
596 file_alloc(struct spdk_filesystem *fs)
597 {
598 	struct spdk_file *file;
599 
600 	file = calloc(1, sizeof(*file));
601 	if (file == NULL) {
602 		return NULL;
603 	}
604 
605 	file->tree = calloc(1, sizeof(*file->tree));
606 	if (file->tree == NULL) {
607 		free(file);
608 		return NULL;
609 	}
610 
611 	file->fs = fs;
612 	TAILQ_INIT(&file->open_requests);
613 	TAILQ_INIT(&file->sync_requests);
614 	pthread_spin_init(&file->lock, 0);
615 	TAILQ_INSERT_TAIL(&fs->files, file, tailq);
616 	file->priority = SPDK_FILE_PRIORITY_LOW;
617 	return file;
618 }
619 
620 static void fs_load_done(void *ctx, int bserrno);
621 
622 static int
623 _handle_deleted_files(struct spdk_fs_request *req)
624 {
625 	struct spdk_fs_cb_args *args = &req->args;
626 	struct spdk_filesystem *fs = args->fs;
627 
628 	if (!TAILQ_EMPTY(&args->op.fs_load.deleted_files)) {
629 		struct spdk_deleted_file *deleted_file;
630 
631 		deleted_file = TAILQ_FIRST(&args->op.fs_load.deleted_files);
632 		TAILQ_REMOVE(&args->op.fs_load.deleted_files, deleted_file, tailq);
633 		spdk_bs_delete_blob(fs->bs, deleted_file->id, fs_load_done, req);
634 		free(deleted_file);
635 		return 0;
636 	}
637 
638 	return 1;
639 }
640 
641 static void
642 fs_load_done(void *ctx, int bserrno)
643 {
644 	struct spdk_fs_request *req = ctx;
645 	struct spdk_fs_cb_args *args = &req->args;
646 	struct spdk_filesystem *fs = args->fs;
647 
648 	/* The filesystem has been loaded.  Now check if there are any files that
649 	 *  were marked for deletion before last unload.  Do not complete the
650 	 *  fs_load callback until all of them have been deleted on disk.
651 	 */
652 	if (_handle_deleted_files(req) == 0) {
653 		/* We found a file that's been marked for deleting but not actually
654 		 *  deleted yet.  This function will get called again once the delete
655 		 *  operation is completed.
656 		 */
657 		return;
658 	}
659 
660 	args->fn.fs_op_with_handle(args->arg, fs, 0);
661 	free_fs_request(req);
662 
663 }
664 
665 static void
666 _file_build_trace_arg_name(struct spdk_file *f)
667 {
668 	f->trace_arg_name = 0;
669 	memcpy(&f->trace_arg_name, f->name,
670 	       spdk_min(sizeof(f->trace_arg_name), strlen(f->name)));
671 }
672 
673 static void
674 iter_cb(void *ctx, struct spdk_blob *blob, int rc)
675 {
676 	struct spdk_fs_request *req = ctx;
677 	struct spdk_fs_cb_args *args = &req->args;
678 	struct spdk_filesystem *fs = args->fs;
679 	uint64_t *length;
680 	const char *name;
681 	uint32_t *is_deleted;
682 	size_t value_len;
683 
684 	if (rc < 0) {
685 		args->fn.fs_op_with_handle(args->arg, fs, rc);
686 		free_fs_request(req);
687 		return;
688 	}
689 
690 	rc = spdk_blob_get_xattr_value(blob, "name", (const void **)&name, &value_len);
691 	if (rc < 0) {
692 		args->fn.fs_op_with_handle(args->arg, fs, rc);
693 		free_fs_request(req);
694 		return;
695 	}
696 
697 	rc = spdk_blob_get_xattr_value(blob, "length", (const void **)&length, &value_len);
698 	if (rc < 0) {
699 		args->fn.fs_op_with_handle(args->arg, fs, rc);
700 		free_fs_request(req);
701 		return;
702 	}
703 
704 	assert(value_len == 8);
705 
706 	/* This file could be deleted last time without close it, then app crashed, so we delete it now */
707 	rc = spdk_blob_get_xattr_value(blob, "is_deleted", (const void **)&is_deleted, &value_len);
708 	if (rc < 0) {
709 		struct spdk_file *f;
710 
711 		f = file_alloc(fs);
712 		if (f == NULL) {
713 			args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM);
714 			free_fs_request(req);
715 			return;
716 		}
717 
718 		f->name = strdup(name);
719 		_file_build_trace_arg_name(f);
720 		f->blobid = spdk_blob_get_id(blob);
721 		f->length = *length;
722 		f->length_flushed = *length;
723 		f->append_pos = *length;
724 		SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "added file %s length=%ju\n", f->name, f->length);
725 	} else {
726 		struct spdk_deleted_file *deleted_file;
727 
728 		deleted_file = calloc(1, sizeof(*deleted_file));
729 		if (deleted_file == NULL) {
730 			args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM);
731 			free_fs_request(req);
732 			return;
733 		}
734 		deleted_file->id = spdk_blob_get_id(blob);
735 		TAILQ_INSERT_TAIL(&args->op.fs_load.deleted_files, deleted_file, tailq);
736 	}
737 }
738 
739 static void
740 load_cb(void *ctx, struct spdk_blob_store *bs, int bserrno)
741 {
742 	struct spdk_fs_request *req = ctx;
743 	struct spdk_fs_cb_args *args = &req->args;
744 	struct spdk_filesystem *fs = args->fs;
745 	struct spdk_bs_type bstype;
746 	static const struct spdk_bs_type blobfs_type = {"BLOBFS"};
747 	static const struct spdk_bs_type zeros;
748 
749 	if (bserrno != 0) {
750 		args->fn.fs_op_with_handle(args->arg, NULL, bserrno);
751 		free_fs_request(req);
752 		free(fs);
753 		return;
754 	}
755 
756 	bstype = spdk_bs_get_bstype(bs);
757 
758 	if (!memcmp(&bstype, &zeros, sizeof(bstype))) {
759 		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "assigning bstype\n");
760 		spdk_bs_set_bstype(bs, blobfs_type);
761 	} else if (memcmp(&bstype, &blobfs_type, sizeof(bstype))) {
762 		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "not blobfs\n");
763 		SPDK_LOGDUMP(SPDK_LOG_BLOB, "bstype", &bstype, sizeof(bstype));
764 		args->fn.fs_op_with_handle(args->arg, NULL, bserrno);
765 		free_fs_request(req);
766 		free(fs);
767 		return;
768 	}
769 
770 	common_fs_bs_init(fs, bs);
771 	fs_load_done(req, 0);
772 }
773 
774 static void
775 spdk_fs_io_device_unregister(struct spdk_filesystem *fs)
776 {
777 	assert(fs != NULL);
778 	spdk_io_device_unregister(&fs->md_target, NULL);
779 	spdk_io_device_unregister(&fs->sync_target, NULL);
780 	spdk_io_device_unregister(&fs->io_target, NULL);
781 	free(fs);
782 }
783 
784 static void
785 spdk_fs_free_io_channels(struct spdk_filesystem *fs)
786 {
787 	assert(fs != NULL);
788 	spdk_fs_free_io_channel(fs->md_target.md_io_channel);
789 	spdk_fs_free_io_channel(fs->sync_target.sync_io_channel);
790 }
791 
792 void
793 spdk_fs_load(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn,
794 	     spdk_fs_op_with_handle_complete cb_fn, void *cb_arg)
795 {
796 	struct spdk_filesystem *fs;
797 	struct spdk_fs_cb_args *args;
798 	struct spdk_fs_request *req;
799 	struct spdk_bs_opts	bs_opts;
800 
801 	fs = fs_alloc(dev, send_request_fn);
802 	if (fs == NULL) {
803 		cb_fn(cb_arg, NULL, -ENOMEM);
804 		return;
805 	}
806 
807 	fs_conf_parse();
808 
809 	req = alloc_fs_request(fs->md_target.md_fs_channel);
810 	if (req == NULL) {
811 		spdk_fs_free_io_channels(fs);
812 		spdk_fs_io_device_unregister(fs);
813 		cb_fn(cb_arg, NULL, -ENOMEM);
814 		return;
815 	}
816 
817 	args = &req->args;
818 	args->fn.fs_op_with_handle = cb_fn;
819 	args->arg = cb_arg;
820 	args->fs = fs;
821 	TAILQ_INIT(&args->op.fs_load.deleted_files);
822 	spdk_bs_opts_init(&bs_opts);
823 	bs_opts.iter_cb_fn = iter_cb;
824 	bs_opts.iter_cb_arg = req;
825 	spdk_bs_load(dev, &bs_opts, load_cb, req);
826 }
827 
828 static void
829 unload_cb(void *ctx, int bserrno)
830 {
831 	struct spdk_fs_request *req = ctx;
832 	struct spdk_fs_cb_args *args = &req->args;
833 	struct spdk_filesystem *fs = args->fs;
834 	struct spdk_file *file, *tmp;
835 
836 	TAILQ_FOREACH_SAFE(file, &fs->files, tailq, tmp) {
837 		TAILQ_REMOVE(&fs->files, file, tailq);
838 		cache_free_buffers(file);
839 		free(file->name);
840 		free(file->tree);
841 		free(file);
842 	}
843 
844 	pthread_mutex_lock(&g_cache_init_lock);
845 	g_fs_count--;
846 	if (g_fs_count == 0) {
847 		__free_cache();
848 	}
849 	pthread_mutex_unlock(&g_cache_init_lock);
850 
851 	args->fn.fs_op(args->arg, bserrno);
852 	free(req);
853 
854 	spdk_fs_io_device_unregister(fs);
855 }
856 
857 void
858 spdk_fs_unload(struct spdk_filesystem *fs, spdk_fs_op_complete cb_fn, void *cb_arg)
859 {
860 	struct spdk_fs_request *req;
861 	struct spdk_fs_cb_args *args;
862 
863 	/*
864 	 * We must free the md_channel before unloading the blobstore, so just
865 	 *  allocate this request from the general heap.
866 	 */
867 	req = calloc(1, sizeof(*req));
868 	if (req == NULL) {
869 		cb_fn(cb_arg, -ENOMEM);
870 		return;
871 	}
872 
873 	args = &req->args;
874 	args->fn.fs_op = cb_fn;
875 	args->arg = cb_arg;
876 	args->fs = fs;
877 
878 	spdk_fs_free_io_channels(fs);
879 	spdk_bs_unload(fs->bs, unload_cb, req);
880 }
881 
882 static struct spdk_file *
883 fs_find_file(struct spdk_filesystem *fs, const char *name)
884 {
885 	struct spdk_file *file;
886 
887 	TAILQ_FOREACH(file, &fs->files, tailq) {
888 		if (!strncmp(name, file->name, SPDK_FILE_NAME_MAX)) {
889 			return file;
890 		}
891 	}
892 
893 	return NULL;
894 }
895 
896 void
897 spdk_fs_file_stat_async(struct spdk_filesystem *fs, const char *name,
898 			spdk_file_stat_op_complete cb_fn, void *cb_arg)
899 {
900 	struct spdk_file_stat stat;
901 	struct spdk_file *f = NULL;
902 
903 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
904 		cb_fn(cb_arg, NULL, -ENAMETOOLONG);
905 		return;
906 	}
907 
908 	f = fs_find_file(fs, name);
909 	if (f != NULL) {
910 		stat.blobid = f->blobid;
911 		stat.size = f->append_pos >= f->length ? f->append_pos : f->length;
912 		cb_fn(cb_arg, &stat, 0);
913 		return;
914 	}
915 
916 	cb_fn(cb_arg, NULL, -ENOENT);
917 }
918 
919 static void
920 __copy_stat(void *arg, struct spdk_file_stat *stat, int fserrno)
921 {
922 	struct spdk_fs_request *req = arg;
923 	struct spdk_fs_cb_args *args = &req->args;
924 
925 	args->rc = fserrno;
926 	if (fserrno == 0) {
927 		memcpy(args->arg, stat, sizeof(*stat));
928 	}
929 	sem_post(args->sem);
930 }
931 
932 static void
933 __file_stat(void *arg)
934 {
935 	struct spdk_fs_request *req = arg;
936 	struct spdk_fs_cb_args *args = &req->args;
937 
938 	spdk_fs_file_stat_async(args->fs, args->op.stat.name,
939 				args->fn.stat_op, req);
940 }
941 
942 int
943 spdk_fs_file_stat(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx,
944 		  const char *name, struct spdk_file_stat *stat)
945 {
946 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
947 	struct spdk_fs_request *req;
948 	int rc;
949 
950 	req = alloc_fs_request(channel);
951 	if (req == NULL) {
952 		return -ENOMEM;
953 	}
954 
955 	req->args.fs = fs;
956 	req->args.op.stat.name = name;
957 	req->args.fn.stat_op = __copy_stat;
958 	req->args.arg = stat;
959 	req->args.sem = &channel->sem;
960 	channel->send_request(__file_stat, req);
961 	sem_wait(&channel->sem);
962 
963 	rc = req->args.rc;
964 	free_fs_request(req);
965 
966 	return rc;
967 }
968 
969 static void
970 fs_create_blob_close_cb(void *ctx, int bserrno)
971 {
972 	int rc;
973 	struct spdk_fs_request *req = ctx;
974 	struct spdk_fs_cb_args *args = &req->args;
975 
976 	rc = args->rc ? args->rc : bserrno;
977 	args->fn.file_op(args->arg, rc);
978 	free_fs_request(req);
979 }
980 
981 static void
982 fs_create_blob_resize_cb(void *ctx, int bserrno)
983 {
984 	struct spdk_fs_request *req = ctx;
985 	struct spdk_fs_cb_args *args = &req->args;
986 	struct spdk_file *f = args->file;
987 	struct spdk_blob *blob = args->op.create.blob;
988 	uint64_t length = 0;
989 
990 	args->rc = bserrno;
991 	if (bserrno) {
992 		spdk_blob_close(blob, fs_create_blob_close_cb, args);
993 		return;
994 	}
995 
996 	spdk_blob_set_xattr(blob, "name", f->name, strlen(f->name) + 1);
997 	spdk_blob_set_xattr(blob, "length", &length, sizeof(length));
998 
999 	spdk_blob_close(blob, fs_create_blob_close_cb, args);
1000 }
1001 
1002 static void
1003 fs_create_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno)
1004 {
1005 	struct spdk_fs_request *req = ctx;
1006 	struct spdk_fs_cb_args *args = &req->args;
1007 
1008 	if (bserrno) {
1009 		args->fn.file_op(args->arg, bserrno);
1010 		free_fs_request(req);
1011 		return;
1012 	}
1013 
1014 	args->op.create.blob = blob;
1015 	spdk_blob_resize(blob, 1, fs_create_blob_resize_cb, req);
1016 }
1017 
1018 static void
1019 fs_create_blob_create_cb(void *ctx, spdk_blob_id blobid, int bserrno)
1020 {
1021 	struct spdk_fs_request *req = ctx;
1022 	struct spdk_fs_cb_args *args = &req->args;
1023 	struct spdk_file *f = args->file;
1024 
1025 	if (bserrno) {
1026 		args->fn.file_op(args->arg, bserrno);
1027 		free_fs_request(req);
1028 		return;
1029 	}
1030 
1031 	f->blobid = blobid;
1032 	spdk_bs_open_blob(f->fs->bs, blobid, fs_create_blob_open_cb, req);
1033 }
1034 
1035 void
1036 spdk_fs_create_file_async(struct spdk_filesystem *fs, const char *name,
1037 			  spdk_file_op_complete cb_fn, void *cb_arg)
1038 {
1039 	struct spdk_file *file;
1040 	struct spdk_fs_request *req;
1041 	struct spdk_fs_cb_args *args;
1042 
1043 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
1044 		cb_fn(cb_arg, -ENAMETOOLONG);
1045 		return;
1046 	}
1047 
1048 	file = fs_find_file(fs, name);
1049 	if (file != NULL) {
1050 		cb_fn(cb_arg, -EEXIST);
1051 		return;
1052 	}
1053 
1054 	file = file_alloc(fs);
1055 	if (file == NULL) {
1056 		cb_fn(cb_arg, -ENOMEM);
1057 		return;
1058 	}
1059 
1060 	req = alloc_fs_request(fs->md_target.md_fs_channel);
1061 	if (req == NULL) {
1062 		cb_fn(cb_arg, -ENOMEM);
1063 		return;
1064 	}
1065 
1066 	args = &req->args;
1067 	args->file = file;
1068 	args->fn.file_op = cb_fn;
1069 	args->arg = cb_arg;
1070 
1071 	file->name = strdup(name);
1072 	_file_build_trace_arg_name(file);
1073 	spdk_bs_create_blob(fs->bs, fs_create_blob_create_cb, args);
1074 }
1075 
1076 static void
1077 __fs_create_file_done(void *arg, int fserrno)
1078 {
1079 	struct spdk_fs_request *req = arg;
1080 	struct spdk_fs_cb_args *args = &req->args;
1081 
1082 	args->rc = fserrno;
1083 	sem_post(args->sem);
1084 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.create.name);
1085 }
1086 
1087 static void
1088 __fs_create_file(void *arg)
1089 {
1090 	struct spdk_fs_request *req = arg;
1091 	struct spdk_fs_cb_args *args = &req->args;
1092 
1093 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.create.name);
1094 	spdk_fs_create_file_async(args->fs, args->op.create.name, __fs_create_file_done, req);
1095 }
1096 
1097 int
1098 spdk_fs_create_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx, const char *name)
1099 {
1100 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
1101 	struct spdk_fs_request *req;
1102 	struct spdk_fs_cb_args *args;
1103 	int rc;
1104 
1105 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name);
1106 
1107 	req = alloc_fs_request(channel);
1108 	if (req == NULL) {
1109 		return -ENOMEM;
1110 	}
1111 
1112 	args = &req->args;
1113 	args->fs = fs;
1114 	args->op.create.name = name;
1115 	args->sem = &channel->sem;
1116 	fs->send_request(__fs_create_file, req);
1117 	sem_wait(&channel->sem);
1118 	rc = args->rc;
1119 	free_fs_request(req);
1120 
1121 	return rc;
1122 }
1123 
1124 static void
1125 fs_open_blob_done(void *ctx, struct spdk_blob *blob, int bserrno)
1126 {
1127 	struct spdk_fs_request *req = ctx;
1128 	struct spdk_fs_cb_args *args = &req->args;
1129 	struct spdk_file *f = args->file;
1130 
1131 	f->blob = blob;
1132 	while (!TAILQ_EMPTY(&f->open_requests)) {
1133 		req = TAILQ_FIRST(&f->open_requests);
1134 		args = &req->args;
1135 		TAILQ_REMOVE(&f->open_requests, req, args.op.open.tailq);
1136 		spdk_trace_record(TRACE_BLOBFS_OPEN, 0, 0, 0, f->trace_arg_name);
1137 		args->fn.file_op_with_handle(args->arg, f, bserrno);
1138 		free_fs_request(req);
1139 	}
1140 }
1141 
1142 static void
1143 fs_open_blob_create_cb(void *ctx, int bserrno)
1144 {
1145 	struct spdk_fs_request *req = ctx;
1146 	struct spdk_fs_cb_args *args = &req->args;
1147 	struct spdk_file *file = args->file;
1148 	struct spdk_filesystem *fs = args->fs;
1149 
1150 	if (file == NULL) {
1151 		/*
1152 		 * This is from an open with CREATE flag - the file
1153 		 *  is now created so look it up in the file list for this
1154 		 *  filesystem.
1155 		 */
1156 		file = fs_find_file(fs, args->op.open.name);
1157 		assert(file != NULL);
1158 		args->file = file;
1159 	}
1160 
1161 	file->ref_count++;
1162 	TAILQ_INSERT_TAIL(&file->open_requests, req, args.op.open.tailq);
1163 	if (file->ref_count == 1) {
1164 		assert(file->blob == NULL);
1165 		spdk_bs_open_blob(fs->bs, file->blobid, fs_open_blob_done, req);
1166 	} else if (file->blob != NULL) {
1167 		fs_open_blob_done(req, file->blob, 0);
1168 	} else {
1169 		/*
1170 		 * The blob open for this file is in progress due to a previous
1171 		 *  open request.  When that open completes, it will invoke the
1172 		 *  open callback for this request.
1173 		 */
1174 	}
1175 }
1176 
1177 void
1178 spdk_fs_open_file_async(struct spdk_filesystem *fs, const char *name, uint32_t flags,
1179 			spdk_file_op_with_handle_complete cb_fn, void *cb_arg)
1180 {
1181 	struct spdk_file *f = NULL;
1182 	struct spdk_fs_request *req;
1183 	struct spdk_fs_cb_args *args;
1184 
1185 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
1186 		cb_fn(cb_arg, NULL, -ENAMETOOLONG);
1187 		return;
1188 	}
1189 
1190 	f = fs_find_file(fs, name);
1191 	if (f == NULL && !(flags & SPDK_BLOBFS_OPEN_CREATE)) {
1192 		cb_fn(cb_arg, NULL, -ENOENT);
1193 		return;
1194 	}
1195 
1196 	if (f != NULL && f->is_deleted == true) {
1197 		cb_fn(cb_arg, NULL, -ENOENT);
1198 		return;
1199 	}
1200 
1201 	req = alloc_fs_request(fs->md_target.md_fs_channel);
1202 	if (req == NULL) {
1203 		cb_fn(cb_arg, NULL, -ENOMEM);
1204 		return;
1205 	}
1206 
1207 	args = &req->args;
1208 	args->fn.file_op_with_handle = cb_fn;
1209 	args->arg = cb_arg;
1210 	args->file = f;
1211 	args->fs = fs;
1212 	args->op.open.name = name;
1213 
1214 	if (f == NULL) {
1215 		spdk_fs_create_file_async(fs, name, fs_open_blob_create_cb, req);
1216 	} else {
1217 		fs_open_blob_create_cb(req, 0);
1218 	}
1219 }
1220 
1221 static void
1222 __fs_open_file_done(void *arg, struct spdk_file *file, int bserrno)
1223 {
1224 	struct spdk_fs_request *req = arg;
1225 	struct spdk_fs_cb_args *args = &req->args;
1226 
1227 	args->file = file;
1228 	__wake_caller(args, bserrno);
1229 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.open.name);
1230 }
1231 
1232 static void
1233 __fs_open_file(void *arg)
1234 {
1235 	struct spdk_fs_request *req = arg;
1236 	struct spdk_fs_cb_args *args = &req->args;
1237 
1238 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.open.name);
1239 	spdk_fs_open_file_async(args->fs, args->op.open.name, args->op.open.flags,
1240 				__fs_open_file_done, req);
1241 }
1242 
1243 int
1244 spdk_fs_open_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx,
1245 		  const char *name, uint32_t flags, struct spdk_file **file)
1246 {
1247 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
1248 	struct spdk_fs_request *req;
1249 	struct spdk_fs_cb_args *args;
1250 	int rc;
1251 
1252 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name);
1253 
1254 	req = alloc_fs_request(channel);
1255 	if (req == NULL) {
1256 		return -ENOMEM;
1257 	}
1258 
1259 	args = &req->args;
1260 	args->fs = fs;
1261 	args->op.open.name = name;
1262 	args->op.open.flags = flags;
1263 	args->sem = &channel->sem;
1264 	fs->send_request(__fs_open_file, req);
1265 	sem_wait(&channel->sem);
1266 	rc = args->rc;
1267 	if (rc == 0) {
1268 		*file = args->file;
1269 	} else {
1270 		*file = NULL;
1271 	}
1272 	free_fs_request(req);
1273 
1274 	return rc;
1275 }
1276 
1277 static void
1278 fs_rename_blob_close_cb(void *ctx, int bserrno)
1279 {
1280 	struct spdk_fs_request *req = ctx;
1281 	struct spdk_fs_cb_args *args = &req->args;
1282 
1283 	args->fn.fs_op(args->arg, bserrno);
1284 	free_fs_request(req);
1285 }
1286 
1287 static void
1288 fs_rename_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno)
1289 {
1290 	struct spdk_fs_request *req = ctx;
1291 	struct spdk_fs_cb_args *args = &req->args;
1292 	const char *new_name = args->op.rename.new_name;
1293 
1294 	spdk_blob_set_xattr(blob, "name", new_name, strlen(new_name) + 1);
1295 	spdk_blob_close(blob, fs_rename_blob_close_cb, req);
1296 }
1297 
1298 static void
1299 __spdk_fs_md_rename_file(struct spdk_fs_request *req)
1300 {
1301 	struct spdk_fs_cb_args *args = &req->args;
1302 	struct spdk_file *f;
1303 
1304 	f = fs_find_file(args->fs, args->op.rename.old_name);
1305 	if (f == NULL) {
1306 		args->fn.fs_op(args->arg, -ENOENT);
1307 		free_fs_request(req);
1308 		return;
1309 	}
1310 
1311 	free(f->name);
1312 	f->name = strdup(args->op.rename.new_name);
1313 	_file_build_trace_arg_name(f);
1314 	args->file = f;
1315 	spdk_bs_open_blob(args->fs->bs, f->blobid, fs_rename_blob_open_cb, req);
1316 }
1317 
1318 static void
1319 fs_rename_delete_done(void *arg, int fserrno)
1320 {
1321 	__spdk_fs_md_rename_file(arg);
1322 }
1323 
1324 void
1325 spdk_fs_rename_file_async(struct spdk_filesystem *fs,
1326 			  const char *old_name, const char *new_name,
1327 			  spdk_file_op_complete cb_fn, void *cb_arg)
1328 {
1329 	struct spdk_file *f;
1330 	struct spdk_fs_request *req;
1331 	struct spdk_fs_cb_args *args;
1332 
1333 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "old=%s new=%s\n", old_name, new_name);
1334 	if (strnlen(new_name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
1335 		cb_fn(cb_arg, -ENAMETOOLONG);
1336 		return;
1337 	}
1338 
1339 	req = alloc_fs_request(fs->md_target.md_fs_channel);
1340 	if (req == NULL) {
1341 		cb_fn(cb_arg, -ENOMEM);
1342 		return;
1343 	}
1344 
1345 	args = &req->args;
1346 	args->fn.fs_op = cb_fn;
1347 	args->fs = fs;
1348 	args->arg = cb_arg;
1349 	args->op.rename.old_name = old_name;
1350 	args->op.rename.new_name = new_name;
1351 
1352 	f = fs_find_file(fs, new_name);
1353 	if (f == NULL) {
1354 		__spdk_fs_md_rename_file(req);
1355 		return;
1356 	}
1357 
1358 	/*
1359 	 * The rename overwrites an existing file.  So delete the existing file, then
1360 	 *  do the actual rename.
1361 	 */
1362 	spdk_fs_delete_file_async(fs, new_name, fs_rename_delete_done, req);
1363 }
1364 
1365 static void
1366 __fs_rename_file_done(void *arg, int fserrno)
1367 {
1368 	struct spdk_fs_request *req = arg;
1369 	struct spdk_fs_cb_args *args = &req->args;
1370 
1371 	__wake_caller(args, fserrno);
1372 }
1373 
1374 static void
1375 __fs_rename_file(void *arg)
1376 {
1377 	struct spdk_fs_request *req = arg;
1378 	struct spdk_fs_cb_args *args = &req->args;
1379 
1380 	spdk_fs_rename_file_async(args->fs, args->op.rename.old_name, args->op.rename.new_name,
1381 				  __fs_rename_file_done, req);
1382 }
1383 
1384 int
1385 spdk_fs_rename_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx,
1386 		    const char *old_name, const char *new_name)
1387 {
1388 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
1389 	struct spdk_fs_request *req;
1390 	struct spdk_fs_cb_args *args;
1391 	int rc;
1392 
1393 	req = alloc_fs_request(channel);
1394 	if (req == NULL) {
1395 		return -ENOMEM;
1396 	}
1397 
1398 	args = &req->args;
1399 
1400 	args->fs = fs;
1401 	args->op.rename.old_name = old_name;
1402 	args->op.rename.new_name = new_name;
1403 	args->sem = &channel->sem;
1404 	fs->send_request(__fs_rename_file, req);
1405 	sem_wait(&channel->sem);
1406 	rc = args->rc;
1407 	free_fs_request(req);
1408 	return rc;
1409 }
1410 
1411 static void
1412 blob_delete_cb(void *ctx, int bserrno)
1413 {
1414 	struct spdk_fs_request *req = ctx;
1415 	struct spdk_fs_cb_args *args = &req->args;
1416 
1417 	args->fn.file_op(args->arg, bserrno);
1418 	free_fs_request(req);
1419 }
1420 
1421 void
1422 spdk_fs_delete_file_async(struct spdk_filesystem *fs, const char *name,
1423 			  spdk_file_op_complete cb_fn, void *cb_arg)
1424 {
1425 	struct spdk_file *f;
1426 	spdk_blob_id blobid;
1427 	struct spdk_fs_request *req;
1428 	struct spdk_fs_cb_args *args;
1429 
1430 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name);
1431 
1432 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
1433 		cb_fn(cb_arg, -ENAMETOOLONG);
1434 		return;
1435 	}
1436 
1437 	f = fs_find_file(fs, name);
1438 	if (f == NULL) {
1439 		SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "Cannot find the file=%s to deleted\n", name);
1440 		cb_fn(cb_arg, -ENOENT);
1441 		return;
1442 	}
1443 
1444 	req = alloc_fs_request(fs->md_target.md_fs_channel);
1445 	if (req == NULL) {
1446 		SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "Cannot allocate the req for the file=%s to deleted\n", name);
1447 		cb_fn(cb_arg, -ENOMEM);
1448 		return;
1449 	}
1450 
1451 	args = &req->args;
1452 	args->fn.file_op = cb_fn;
1453 	args->arg = cb_arg;
1454 
1455 	if (f->ref_count > 0) {
1456 		/* If the ref > 0, we mark the file as deleted and delete it when we close it. */
1457 		f->is_deleted = true;
1458 		spdk_blob_set_xattr(f->blob, "is_deleted", &f->is_deleted, sizeof(bool));
1459 		spdk_blob_sync_md(f->blob, blob_delete_cb, req);
1460 		return;
1461 	}
1462 
1463 	TAILQ_REMOVE(&fs->files, f, tailq);
1464 
1465 	cache_free_buffers(f);
1466 
1467 	blobid = f->blobid;
1468 
1469 	free(f->name);
1470 	free(f->tree);
1471 	free(f);
1472 
1473 	spdk_bs_delete_blob(fs->bs, blobid, blob_delete_cb, req);
1474 }
1475 
1476 static void
1477 __fs_delete_file_done(void *arg, int fserrno)
1478 {
1479 	struct spdk_fs_request *req = arg;
1480 	struct spdk_fs_cb_args *args = &req->args;
1481 
1482 	spdk_trace_record(TRACE_BLOBFS_DELETE_DONE, 0, 0, 0, *((uint64_t *)args->op.delete.name));
1483 	__wake_caller(args, fserrno);
1484 }
1485 
1486 static void
1487 __fs_delete_file(void *arg)
1488 {
1489 	struct spdk_fs_request *req = arg;
1490 	struct spdk_fs_cb_args *args = &req->args;
1491 
1492 	spdk_trace_record(TRACE_BLOBFS_DELETE_START, 0, 0, 0, *((uint64_t *)args->op.delete.name));
1493 	spdk_fs_delete_file_async(args->fs, args->op.delete.name, __fs_delete_file_done, req);
1494 }
1495 
1496 int
1497 spdk_fs_delete_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx,
1498 		    const char *name)
1499 {
1500 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
1501 	struct spdk_fs_request *req;
1502 	struct spdk_fs_cb_args *args;
1503 	int rc;
1504 
1505 	req = alloc_fs_request(channel);
1506 	if (req == NULL) {
1507 		SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "Cannot allocate req to delete file=%s\n", name);
1508 		return -ENOMEM;
1509 	}
1510 
1511 	args = &req->args;
1512 	args->fs = fs;
1513 	args->op.delete.name = name;
1514 	args->sem = &channel->sem;
1515 	fs->send_request(__fs_delete_file, req);
1516 	sem_wait(&channel->sem);
1517 	rc = args->rc;
1518 	free_fs_request(req);
1519 
1520 	return rc;
1521 }
1522 
1523 spdk_fs_iter
1524 spdk_fs_iter_first(struct spdk_filesystem *fs)
1525 {
1526 	struct spdk_file *f;
1527 
1528 	f = TAILQ_FIRST(&fs->files);
1529 	return f;
1530 }
1531 
1532 spdk_fs_iter
1533 spdk_fs_iter_next(spdk_fs_iter iter)
1534 {
1535 	struct spdk_file *f = iter;
1536 
1537 	if (f == NULL) {
1538 		return NULL;
1539 	}
1540 
1541 	f = TAILQ_NEXT(f, tailq);
1542 	return f;
1543 }
1544 
1545 const char *
1546 spdk_file_get_name(struct spdk_file *file)
1547 {
1548 	return file->name;
1549 }
1550 
1551 uint64_t
1552 spdk_file_get_length(struct spdk_file *file)
1553 {
1554 	uint64_t length;
1555 
1556 	assert(file != NULL);
1557 
1558 	length = file->append_pos >= file->length ? file->append_pos : file->length;
1559 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s length=0x%jx\n", file->name, length);
1560 	return length;
1561 }
1562 
1563 static void
1564 fs_truncate_complete_cb(void *ctx, int bserrno)
1565 {
1566 	struct spdk_fs_request *req = ctx;
1567 	struct spdk_fs_cb_args *args = &req->args;
1568 
1569 	args->fn.file_op(args->arg, bserrno);
1570 	free_fs_request(req);
1571 }
1572 
1573 static void
1574 fs_truncate_resize_cb(void *ctx, int bserrno)
1575 {
1576 	struct spdk_fs_request *req = ctx;
1577 	struct spdk_fs_cb_args *args = &req->args;
1578 	struct spdk_file *file = args->file;
1579 	uint64_t *length = &args->op.truncate.length;
1580 
1581 	if (bserrno) {
1582 		args->fn.file_op(args->arg, bserrno);
1583 		free_fs_request(req);
1584 		return;
1585 	}
1586 
1587 	spdk_blob_set_xattr(file->blob, "length", length, sizeof(*length));
1588 
1589 	file->length = *length;
1590 	if (file->append_pos > file->length) {
1591 		file->append_pos = file->length;
1592 	}
1593 
1594 	spdk_blob_sync_md(file->blob, fs_truncate_complete_cb, req);
1595 }
1596 
/*
 * Number of clusters of cluster_sz bytes needed to hold length bytes
 * (length / cluster_sz, rounded up).
 *
 * Written as quotient plus remainder test so that lengths close to
 * UINT64_MAX do not wrap, as "length + cluster_sz - 1" would.
 * cluster_sz must be non-zero.
 */
static uint64_t
__bytes_to_clusters(uint64_t length, uint64_t cluster_sz)
{
	uint64_t clusters = length / cluster_sz;

	if (length % cluster_sz != 0) {
		clusters++;
	}

	return clusters;
}
1602 
1603 void
1604 spdk_file_truncate_async(struct spdk_file *file, uint64_t length,
1605 			 spdk_file_op_complete cb_fn, void *cb_arg)
1606 {
1607 	struct spdk_filesystem *fs;
1608 	size_t num_clusters;
1609 	struct spdk_fs_request *req;
1610 	struct spdk_fs_cb_args *args;
1611 
1612 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s old=0x%jx new=0x%jx\n", file->name, file->length, length);
1613 	if (length == file->length) {
1614 		cb_fn(cb_arg, 0);
1615 		return;
1616 	}
1617 
1618 	req = alloc_fs_request(file->fs->md_target.md_fs_channel);
1619 	if (req == NULL) {
1620 		cb_fn(cb_arg, -ENOMEM);
1621 		return;
1622 	}
1623 
1624 	args = &req->args;
1625 	args->fn.file_op = cb_fn;
1626 	args->arg = cb_arg;
1627 	args->file = file;
1628 	args->op.truncate.length = length;
1629 	fs = file->fs;
1630 
1631 	num_clusters = __bytes_to_clusters(length, fs->bs_opts.cluster_sz);
1632 
1633 	spdk_blob_resize(file->blob, num_clusters, fs_truncate_resize_cb, req);
1634 }
1635 
1636 static void
1637 __truncate(void *arg)
1638 {
1639 	struct spdk_fs_request *req = arg;
1640 	struct spdk_fs_cb_args *args = &req->args;
1641 
1642 	spdk_file_truncate_async(args->file, args->op.truncate.length,
1643 				 args->fn.file_op, args);
1644 }
1645 
1646 int
1647 spdk_file_truncate(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx,
1648 		   uint64_t length)
1649 {
1650 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
1651 	struct spdk_fs_request *req;
1652 	struct spdk_fs_cb_args *args;
1653 	int rc;
1654 
1655 	req = alloc_fs_request(channel);
1656 	if (req == NULL) {
1657 		return -ENOMEM;
1658 	}
1659 
1660 	args = &req->args;
1661 
1662 	args->file = file;
1663 	args->op.truncate.length = length;
1664 	args->fn.file_op = __wake_caller;
1665 	args->sem = &channel->sem;
1666 
1667 	channel->send_request(__truncate, req);
1668 	sem_wait(&channel->sem);
1669 	rc = args->rc;
1670 	free_fs_request(req);
1671 
1672 	return rc;
1673 }
1674 
1675 static void
1676 __rw_done(void *ctx, int bserrno)
1677 {
1678 	struct spdk_fs_request *req = ctx;
1679 	struct spdk_fs_cb_args *args = &req->args;
1680 
1681 	spdk_free(args->op.rw.pin_buf);
1682 	args->fn.file_op(args->arg, bserrno);
1683 	free_fs_request(req);
1684 }
1685 
1686 static void
1687 __read_done(void *ctx, int bserrno)
1688 {
1689 	struct spdk_fs_request *req = ctx;
1690 	struct spdk_fs_cb_args *args = &req->args;
1691 
1692 	assert(req != NULL);
1693 	if (args->op.rw.is_read) {
1694 		memcpy(args->iovs[0].iov_base,
1695 		       args->op.rw.pin_buf + (args->op.rw.offset & (args->op.rw.blocklen - 1)),
1696 		       args->iovs[0].iov_len);
1697 		__rw_done(req, 0);
1698 	} else {
1699 		memcpy(args->op.rw.pin_buf + (args->op.rw.offset & (args->op.rw.blocklen - 1)),
1700 		       args->iovs[0].iov_base,
1701 		       args->iovs[0].iov_len);
1702 		spdk_blob_io_write(args->file->blob, args->op.rw.channel,
1703 				   args->op.rw.pin_buf,
1704 				   args->op.rw.start_lba, args->op.rw.num_lba,
1705 				   __rw_done, req);
1706 	}
1707 }
1708 
1709 static void
1710 __do_blob_read(void *ctx, int fserrno)
1711 {
1712 	struct spdk_fs_request *req = ctx;
1713 	struct spdk_fs_cb_args *args = &req->args;
1714 
1715 	if (fserrno) {
1716 		__rw_done(req, fserrno);
1717 		return;
1718 	}
1719 	spdk_blob_io_read(args->file->blob, args->op.rw.channel,
1720 			  args->op.rw.pin_buf,
1721 			  args->op.rw.start_lba, args->op.rw.num_lba,
1722 			  __read_done, req);
1723 }
1724 
1725 static void
1726 __get_page_parameters(struct spdk_file *file, uint64_t offset, uint64_t length,
1727 		      uint64_t *start_lba, uint32_t *lba_size, uint64_t *num_lba)
1728 {
1729 	uint64_t end_lba;
1730 
1731 	*lba_size = spdk_bs_get_io_unit_size(file->fs->bs);
1732 	*start_lba = offset / *lba_size;
1733 	end_lba = (offset + length - 1) / *lba_size;
1734 	*num_lba = (end_lba - *start_lba + 1);
1735 }
1736 
1737 static void
1738 __readwrite(struct spdk_file *file, struct spdk_io_channel *_channel,
1739 	    void *payload, uint64_t offset, uint64_t length,
1740 	    spdk_file_op_complete cb_fn, void *cb_arg, int is_read)
1741 {
1742 	struct spdk_fs_request *req;
1743 	struct spdk_fs_cb_args *args;
1744 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
1745 	uint64_t start_lba, num_lba, pin_buf_length;
1746 	uint32_t lba_size;
1747 
1748 	if (is_read && offset + length > file->length) {
1749 		cb_fn(cb_arg, -EINVAL);
1750 		return;
1751 	}
1752 
1753 	req = alloc_fs_request_with_iov(channel, 1);
1754 	if (req == NULL) {
1755 		cb_fn(cb_arg, -ENOMEM);
1756 		return;
1757 	}
1758 
1759 	__get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba);
1760 
1761 	args = &req->args;
1762 	args->fn.file_op = cb_fn;
1763 	args->arg = cb_arg;
1764 	args->file = file;
1765 	args->op.rw.channel = channel->bs_channel;
1766 	args->iovs[0].iov_base = payload;
1767 	args->iovs[0].iov_len = (size_t)length;
1768 	args->op.rw.is_read = is_read;
1769 	args->op.rw.offset = offset;
1770 	args->op.rw.blocklen = lba_size;
1771 
1772 	pin_buf_length = num_lba * lba_size;
1773 	args->op.rw.pin_buf = spdk_malloc(pin_buf_length, lba_size, NULL,
1774 					  SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
1775 	if (args->op.rw.pin_buf == NULL) {
1776 		SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "Failed to allocate buf for: file=%s offset=%jx length=%jx\n",
1777 			      file->name, offset, length);
1778 		free_fs_request(req);
1779 		cb_fn(cb_arg, -ENOMEM);
1780 		return;
1781 	}
1782 
1783 	args->op.rw.start_lba = start_lba;
1784 	args->op.rw.num_lba = num_lba;
1785 
1786 	if (!is_read && file->length < offset + length) {
1787 		spdk_file_truncate_async(file, offset + length, __do_blob_read, req);
1788 	} else {
1789 		__do_blob_read(req, 0);
1790 	}
1791 }
1792 
1793 void
1794 spdk_file_write_async(struct spdk_file *file, struct spdk_io_channel *channel,
1795 		      void *payload, uint64_t offset, uint64_t length,
1796 		      spdk_file_op_complete cb_fn, void *cb_arg)
1797 {
1798 	__readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 0);
1799 }
1800 
1801 void
1802 spdk_file_read_async(struct spdk_file *file, struct spdk_io_channel *channel,
1803 		     void *payload, uint64_t offset, uint64_t length,
1804 		     spdk_file_op_complete cb_fn, void *cb_arg)
1805 {
1806 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s offset=%jx length=%jx\n",
1807 		      file->name, offset, length);
1808 	__readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 1);
1809 }
1810 
1811 struct spdk_io_channel *
1812 spdk_fs_alloc_io_channel(struct spdk_filesystem *fs)
1813 {
1814 	struct spdk_io_channel *io_channel;
1815 	struct spdk_fs_channel *fs_channel;
1816 
1817 	io_channel = spdk_get_io_channel(&fs->io_target);
1818 	fs_channel = spdk_io_channel_get_ctx(io_channel);
1819 	fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
1820 	fs_channel->send_request = __send_request_direct;
1821 
1822 	return io_channel;
1823 }
1824 
/*
 * Release an I/O channel obtained from spdk_fs_alloc_io_channel() by handing
 * it back with spdk_put_io_channel().
 */
void
spdk_fs_free_io_channel(struct spdk_io_channel *channel)
{
	struct spdk_io_channel *io_channel = channel;

	spdk_put_io_channel(io_channel);
}
1830 
1831 struct spdk_fs_thread_ctx *
1832 spdk_fs_alloc_thread_ctx(struct spdk_filesystem *fs)
1833 {
1834 	struct spdk_fs_thread_ctx *ctx;
1835 
1836 	ctx = calloc(1, sizeof(*ctx));
1837 	if (!ctx) {
1838 		return NULL;
1839 	}
1840 
1841 	_spdk_fs_channel_create(fs, &ctx->ch, 512);
1842 
1843 	ctx->ch.send_request = fs->send_request;
1844 	ctx->ch.sync = 1;
1845 	pthread_spin_init(&ctx->ch.lock, 0);
1846 
1847 	return ctx;
1848 }
1849 
1850 
1851 void
1852 spdk_fs_free_thread_ctx(struct spdk_fs_thread_ctx *ctx)
1853 {
1854 	assert(ctx->ch.sync == 1);
1855 
1856 	while (true) {
1857 		pthread_spin_lock(&ctx->ch.lock);
1858 		if (ctx->ch.outstanding_reqs == 0) {
1859 			pthread_spin_unlock(&ctx->ch.lock);
1860 			break;
1861 		}
1862 		pthread_spin_unlock(&ctx->ch.lock);
1863 		usleep(1000);
1864 	}
1865 
1866 	_spdk_fs_channel_destroy(NULL, &ctx->ch);
1867 	free(ctx);
1868 }
1869 
1870 void
1871 spdk_fs_set_cache_size(uint64_t size_in_mb)
1872 {
1873 	g_fs_cache_size = size_in_mb * 1024 * 1024;
1874 }
1875 
1876 uint64_t
1877 spdk_fs_get_cache_size(void)
1878 {
1879 	return g_fs_cache_size / (1024 * 1024);
1880 }
1881 
1882 static void __file_flush(void *ctx);
1883 
1884 static void *
1885 alloc_cache_memory_buffer(struct spdk_file *context)
1886 {
1887 	struct spdk_file *file;
1888 	void *buf;
1889 
1890 	buf = spdk_mempool_get(g_cache_pool);
1891 	if (buf != NULL) {
1892 		return buf;
1893 	}
1894 
1895 	pthread_spin_lock(&g_caches_lock);
1896 	TAILQ_FOREACH(file, &g_caches, cache_tailq) {
1897 		if (!file->open_for_writing &&
1898 		    file->priority == SPDK_FILE_PRIORITY_LOW &&
1899 		    file != context) {
1900 			break;
1901 		}
1902 	}
1903 	pthread_spin_unlock(&g_caches_lock);
1904 	if (file != NULL) {
1905 		cache_free_buffers(file);
1906 		buf = spdk_mempool_get(g_cache_pool);
1907 		if (buf != NULL) {
1908 			return buf;
1909 		}
1910 	}
1911 
1912 	pthread_spin_lock(&g_caches_lock);
1913 	TAILQ_FOREACH(file, &g_caches, cache_tailq) {
1914 		if (!file->open_for_writing && file != context) {
1915 			break;
1916 		}
1917 	}
1918 	pthread_spin_unlock(&g_caches_lock);
1919 	if (file != NULL) {
1920 		cache_free_buffers(file);
1921 		buf = spdk_mempool_get(g_cache_pool);
1922 		if (buf != NULL) {
1923 			return buf;
1924 		}
1925 	}
1926 
1927 	pthread_spin_lock(&g_caches_lock);
1928 	TAILQ_FOREACH(file, &g_caches, cache_tailq) {
1929 		if (file != context) {
1930 			break;
1931 		}
1932 	}
1933 	pthread_spin_unlock(&g_caches_lock);
1934 	if (file != NULL) {
1935 		cache_free_buffers(file);
1936 		buf = spdk_mempool_get(g_cache_pool);
1937 		if (buf != NULL) {
1938 			return buf;
1939 		}
1940 	}
1941 
1942 	return NULL;
1943 }
1944 
1945 static struct cache_buffer *
1946 cache_insert_buffer(struct spdk_file *file, uint64_t offset)
1947 {
1948 	struct cache_buffer *buf;
1949 	int count = 0;
1950 
1951 	buf = calloc(1, sizeof(*buf));
1952 	if (buf == NULL) {
1953 		SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "calloc failed\n");
1954 		return NULL;
1955 	}
1956 
1957 	buf->buf = alloc_cache_memory_buffer(file);
1958 	while (buf->buf == NULL) {
1959 		/*
1960 		 * TODO: alloc_cache_memory_buffer() should eventually free
1961 		 *  some buffers.  Need a more sophisticated check here, instead
1962 		 *  of just bailing if 100 tries does not result in getting a
1963 		 *  free buffer.  This will involve using the sync channel's
1964 		 *  semaphore to block until a buffer becomes available.
1965 		 */
1966 		if (count++ == 100) {
1967 			SPDK_ERRLOG("Could not allocate cache buffer for file=%p on offset=%jx\n",
1968 				    file, offset);
1969 			free(buf);
1970 			return NULL;
1971 		}
1972 		buf->buf = alloc_cache_memory_buffer(file);
1973 	}
1974 
1975 	buf->buf_size = CACHE_BUFFER_SIZE;
1976 	buf->offset = offset;
1977 
1978 	pthread_spin_lock(&g_caches_lock);
1979 	if (file->tree->present_mask == 0) {
1980 		TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq);
1981 	}
1982 	file->tree = spdk_tree_insert_buffer(file->tree, buf);
1983 	pthread_spin_unlock(&g_caches_lock);
1984 
1985 	return buf;
1986 }
1987 
1988 static struct cache_buffer *
1989 cache_append_buffer(struct spdk_file *file)
1990 {
1991 	struct cache_buffer *last;
1992 
1993 	assert(file->last == NULL || file->last->bytes_filled == file->last->buf_size);
1994 	assert((file->append_pos % CACHE_BUFFER_SIZE) == 0);
1995 
1996 	last = cache_insert_buffer(file, file->append_pos);
1997 	if (last == NULL) {
1998 		SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "cache_insert_buffer failed\n");
1999 		return NULL;
2000 	}
2001 
2002 	file->last = last;
2003 
2004 	return last;
2005 }
2006 
2007 static void __check_sync_reqs(struct spdk_file *file);
2008 
2009 static void
2010 __file_cache_finish_sync(void *ctx, int bserrno)
2011 {
2012 	struct spdk_file *file;
2013 	struct spdk_fs_request *sync_req = ctx;
2014 	struct spdk_fs_cb_args *sync_args;
2015 
2016 	sync_args = &sync_req->args;
2017 	file = sync_args->file;
2018 	pthread_spin_lock(&file->lock);
2019 	assert(sync_args->op.sync.offset <= file->length_flushed);
2020 	spdk_trace_record(TRACE_BLOBFS_XATTR_END, 0, sync_args->op.sync.offset,
2021 			  0, file->trace_arg_name);
2022 	BLOBFS_TRACE(file, "sync done offset=%jx\n", sync_args->op.sync.offset);
2023 	TAILQ_REMOVE(&file->sync_requests, sync_req, args.op.sync.tailq);
2024 	pthread_spin_unlock(&file->lock);
2025 
2026 	sync_args->fn.file_op(sync_args->arg, bserrno);
2027 	pthread_spin_lock(&file->lock);
2028 	free_fs_request(sync_req);
2029 	pthread_spin_unlock(&file->lock);
2030 
2031 	__check_sync_reqs(file);
2032 }
2033 
2034 static void
2035 __check_sync_reqs(struct spdk_file *file)
2036 {
2037 	struct spdk_fs_request *sync_req;
2038 
2039 	pthread_spin_lock(&file->lock);
2040 
2041 	TAILQ_FOREACH(sync_req, &file->sync_requests, args.op.sync.tailq) {
2042 		if (sync_req->args.op.sync.offset <= file->length_flushed) {
2043 			break;
2044 		}
2045 	}
2046 
2047 	if (sync_req != NULL && !sync_req->args.op.sync.xattr_in_progress) {
2048 		BLOBFS_TRACE(file, "set xattr length 0x%jx\n", file->length_flushed);
2049 		sync_req->args.op.sync.xattr_in_progress = true;
2050 		spdk_blob_set_xattr(file->blob, "length", &file->length_flushed,
2051 				    sizeof(file->length_flushed));
2052 
2053 		pthread_spin_unlock(&file->lock);
2054 		spdk_trace_record(TRACE_BLOBFS_XATTR_START, 0, file->length_flushed,
2055 				  0, file->trace_arg_name);
2056 		spdk_blob_sync_md(file->blob, __file_cache_finish_sync, sync_req);
2057 	} else {
2058 		pthread_spin_unlock(&file->lock);
2059 	}
2060 }
2061 
2062 static void
2063 __file_flush_done(void *ctx, int bserrno)
2064 {
2065 	struct spdk_fs_request *req = ctx;
2066 	struct spdk_fs_cb_args *args = &req->args;
2067 	struct spdk_file *file = args->file;
2068 	struct cache_buffer *next = args->op.flush.cache_buffer;
2069 
2070 	BLOBFS_TRACE(file, "length=%jx\n", args->op.flush.length);
2071 
2072 	pthread_spin_lock(&file->lock);
2073 	next->in_progress = false;
2074 	next->bytes_flushed += args->op.flush.length;
2075 	file->length_flushed += args->op.flush.length;
2076 	if (file->length_flushed > file->length) {
2077 		file->length = file->length_flushed;
2078 	}
2079 	if (next->bytes_flushed == next->buf_size) {
2080 		BLOBFS_TRACE(file, "write buffer fully flushed 0x%jx\n", file->length_flushed);
2081 		next = spdk_tree_find_buffer(file->tree, file->length_flushed);
2082 	}
2083 
2084 	/*
2085 	 * Assert that there is no cached data that extends past the end of the underlying
2086 	 *  blob.
2087 	 */
2088 	assert(next == NULL || next->offset < __file_get_blob_size(file) ||
2089 	       next->bytes_filled == 0);
2090 
2091 	pthread_spin_unlock(&file->lock);
2092 
2093 	__check_sync_reqs(file);
2094 
2095 	__file_flush(req);
2096 }
2097 
2098 static void
2099 __file_flush(void *ctx)
2100 {
2101 	struct spdk_fs_request *req = ctx;
2102 	struct spdk_fs_cb_args *args = &req->args;
2103 	struct spdk_file *file = args->file;
2104 	struct cache_buffer *next;
2105 	uint64_t offset, length, start_lba, num_lba;
2106 	uint32_t lba_size;
2107 
2108 	pthread_spin_lock(&file->lock);
2109 	next = spdk_tree_find_buffer(file->tree, file->length_flushed);
2110 	if (next == NULL || next->in_progress) {
2111 		/*
2112 		 * There is either no data to flush, or a flush I/O is already in
2113 		 *  progress.  So return immediately - if a flush I/O is in
2114 		 *  progress we will flush more data after that is completed.
2115 		 */
2116 		free_fs_request(req);
2117 		if (next == NULL) {
2118 			/*
2119 			 * For cases where a file's cache was evicted, and then the
2120 			 *  file was later appended, we will write the data directly
2121 			 *  to disk and bypass cache.  So just update length_flushed
2122 			 *  here to reflect that all data was already written to disk.
2123 			 */
2124 			file->length_flushed = file->append_pos;
2125 		}
2126 		pthread_spin_unlock(&file->lock);
2127 		if (next == NULL) {
2128 			/*
2129 			 * There is no data to flush, but we still need to check for any
2130 			 *  outstanding sync requests to make sure metadata gets updated.
2131 			 */
2132 			__check_sync_reqs(file);
2133 		}
2134 		return;
2135 	}
2136 
2137 	offset = next->offset + next->bytes_flushed;
2138 	length = next->bytes_filled - next->bytes_flushed;
2139 	if (length == 0) {
2140 		free_fs_request(req);
2141 		pthread_spin_unlock(&file->lock);
2142 		return;
2143 	}
2144 	args->op.flush.length = length;
2145 	args->op.flush.cache_buffer = next;
2146 
2147 	__get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba);
2148 
2149 	next->in_progress = true;
2150 	BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n",
2151 		     offset, length, start_lba, num_lba);
2152 	pthread_spin_unlock(&file->lock);
2153 	spdk_blob_io_write(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel,
2154 			   next->buf + (start_lba * lba_size) - next->offset,
2155 			   start_lba, num_lba, __file_flush_done, req);
2156 }
2157 
/*
 * Completion of the metadata sync that ends a blob extension: passes the
 * status to __wake_caller() for the argument block given as context.
 */
static void
__file_extend_done(void *arg, int bserrno)
{
	struct spdk_fs_cb_args *extend_args = arg;

	__wake_caller(extend_args, bserrno);
}
2165 
2166 static void
2167 __file_extend_resize_cb(void *_args, int bserrno)
2168 {
2169 	struct spdk_fs_cb_args *args = _args;
2170 	struct spdk_file *file = args->file;
2171 
2172 	if (bserrno) {
2173 		__wake_caller(args, bserrno);
2174 		return;
2175 	}
2176 
2177 	spdk_blob_sync_md(file->blob, __file_extend_done, args);
2178 }
2179 
2180 static void
2181 __file_extend_blob(void *_args)
2182 {
2183 	struct spdk_fs_cb_args *args = _args;
2184 	struct spdk_file *file = args->file;
2185 
2186 	spdk_blob_resize(file->blob, args->op.resize.num_clusters, __file_extend_resize_cb, args);
2187 }
2188 
2189 static void
2190 __rw_from_file_done(void *ctx, int bserrno)
2191 {
2192 	struct spdk_fs_request *req = ctx;
2193 
2194 	__wake_caller(&req->args, bserrno);
2195 	free_fs_request(req);
2196 }
2197 
2198 static void
2199 __rw_from_file(void *ctx)
2200 {
2201 	struct spdk_fs_request *req = ctx;
2202 	struct spdk_fs_cb_args *args = &req->args;
2203 	struct spdk_file *file = args->file;
2204 
2205 	if (args->op.rw.is_read) {
2206 		spdk_file_read_async(file, file->fs->sync_target.sync_io_channel, args->iovs[0].iov_base,
2207 				     args->op.rw.offset, (uint64_t)args->iovs[0].iov_len,
2208 				     __rw_from_file_done, req);
2209 	} else {
2210 		spdk_file_write_async(file, file->fs->sync_target.sync_io_channel, args->iovs[0].iov_base,
2211 				      args->op.rw.offset, (uint64_t)args->iovs[0].iov_len,
2212 				      __rw_from_file_done, req);
2213 	}
2214 }
2215 
2216 static int
2217 __send_rw_from_file(struct spdk_file *file, void *payload,
2218 		    uint64_t offset, uint64_t length, bool is_read,
2219 		    struct spdk_fs_channel *channel)
2220 {
2221 	struct spdk_fs_request *req;
2222 	struct spdk_fs_cb_args *args;
2223 
2224 	req = alloc_fs_request_with_iov(channel, 1);
2225 	if (req == NULL) {
2226 		sem_post(&channel->sem);
2227 		return -ENOMEM;
2228 	}
2229 
2230 	args = &req->args;
2231 	args->file = file;
2232 	args->sem = &channel->sem;
2233 	args->iovs[0].iov_base = payload;
2234 	args->iovs[0].iov_len = (size_t)length;
2235 	args->op.rw.offset = offset;
2236 	args->op.rw.is_read = is_read;
2237 	file->fs->send_request(__rw_from_file, req);
2238 	return 0;
2239 }
2240 
2241 int
2242 spdk_file_write(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx,
2243 		void *payload, uint64_t offset, uint64_t length)
2244 {
2245 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
2246 	struct spdk_fs_request *flush_req;
2247 	uint64_t rem_length, copy, blob_size, cluster_sz;
2248 	uint32_t cache_buffers_filled = 0;
2249 	uint8_t *cur_payload;
2250 	struct cache_buffer *last;
2251 
2252 	BLOBFS_TRACE_RW(file, "offset=%jx length=%jx\n", offset, length);
2253 
2254 	if (length == 0) {
2255 		return 0;
2256 	}
2257 
2258 	if (offset != file->append_pos) {
2259 		BLOBFS_TRACE(file, " error offset=%jx append_pos=%jx\n", offset, file->append_pos);
2260 		return -EINVAL;
2261 	}
2262 
2263 	pthread_spin_lock(&file->lock);
2264 	file->open_for_writing = true;
2265 
2266 	if ((file->last == NULL) && (file->append_pos % CACHE_BUFFER_SIZE == 0)) {
2267 		cache_append_buffer(file);
2268 	}
2269 
2270 	if (file->last == NULL) {
2271 		int rc;
2272 
2273 		file->append_pos += length;
2274 		pthread_spin_unlock(&file->lock);
2275 		rc = __send_rw_from_file(file, payload, offset, length, false, channel);
2276 		sem_wait(&channel->sem);
2277 		return rc;
2278 	}
2279 
2280 	blob_size = __file_get_blob_size(file);
2281 
2282 	if ((offset + length) > blob_size) {
2283 		struct spdk_fs_cb_args extend_args = {};
2284 
2285 		cluster_sz = file->fs->bs_opts.cluster_sz;
2286 		extend_args.sem = &channel->sem;
2287 		extend_args.op.resize.num_clusters = __bytes_to_clusters((offset + length), cluster_sz);
2288 		extend_args.file = file;
2289 		BLOBFS_TRACE(file, "start resize to %u clusters\n", extend_args.op.resize.num_clusters);
2290 		pthread_spin_unlock(&file->lock);
2291 		file->fs->send_request(__file_extend_blob, &extend_args);
2292 		sem_wait(&channel->sem);
2293 		if (extend_args.rc) {
2294 			return extend_args.rc;
2295 		}
2296 	}
2297 
2298 	flush_req = alloc_fs_request(channel);
2299 	if (flush_req == NULL) {
2300 		pthread_spin_unlock(&file->lock);
2301 		return -ENOMEM;
2302 	}
2303 
2304 	last = file->last;
2305 	rem_length = length;
2306 	cur_payload = payload;
2307 	while (rem_length > 0) {
2308 		copy = last->buf_size - last->bytes_filled;
2309 		if (copy > rem_length) {
2310 			copy = rem_length;
2311 		}
2312 		BLOBFS_TRACE_RW(file, "  fill offset=%jx length=%jx\n", file->append_pos, copy);
2313 		memcpy(&last->buf[last->bytes_filled], cur_payload, copy);
2314 		file->append_pos += copy;
2315 		if (file->length < file->append_pos) {
2316 			file->length = file->append_pos;
2317 		}
2318 		cur_payload += copy;
2319 		last->bytes_filled += copy;
2320 		rem_length -= copy;
2321 		if (last->bytes_filled == last->buf_size) {
2322 			cache_buffers_filled++;
2323 			last = cache_append_buffer(file);
2324 			if (last == NULL) {
2325 				BLOBFS_TRACE(file, "nomem\n");
2326 				free_fs_request(flush_req);
2327 				pthread_spin_unlock(&file->lock);
2328 				return -ENOMEM;
2329 			}
2330 		}
2331 	}
2332 
2333 	pthread_spin_unlock(&file->lock);
2334 
2335 	if (cache_buffers_filled == 0) {
2336 		free_fs_request(flush_req);
2337 		return 0;
2338 	}
2339 
2340 	flush_req->args.file = file;
2341 	file->fs->send_request(__file_flush, flush_req);
2342 	return 0;
2343 }
2344 
2345 static void
2346 __readahead_done(void *ctx, int bserrno)
2347 {
2348 	struct spdk_fs_request *req = ctx;
2349 	struct spdk_fs_cb_args *args = &req->args;
2350 	struct cache_buffer *cache_buffer = args->op.readahead.cache_buffer;
2351 	struct spdk_file *file = args->file;
2352 
2353 	BLOBFS_TRACE(file, "offset=%jx\n", cache_buffer->offset);
2354 
2355 	pthread_spin_lock(&file->lock);
2356 	cache_buffer->bytes_filled = args->op.readahead.length;
2357 	cache_buffer->bytes_flushed = args->op.readahead.length;
2358 	cache_buffer->in_progress = false;
2359 	pthread_spin_unlock(&file->lock);
2360 
2361 	free_fs_request(req);
2362 }
2363 
2364 static void
2365 __readahead(void *ctx)
2366 {
2367 	struct spdk_fs_request *req = ctx;
2368 	struct spdk_fs_cb_args *args = &req->args;
2369 	struct spdk_file *file = args->file;
2370 	uint64_t offset, length, start_lba, num_lba;
2371 	uint32_t lba_size;
2372 
2373 	offset = args->op.readahead.offset;
2374 	length = args->op.readahead.length;
2375 	assert(length > 0);
2376 
2377 	__get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba);
2378 
2379 	BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n",
2380 		     offset, length, start_lba, num_lba);
2381 	spdk_blob_io_read(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel,
2382 			  args->op.readahead.cache_buffer->buf,
2383 			  start_lba, num_lba, __readahead_done, req);
2384 }
2385 
2386 static uint64_t
2387 __next_cache_buffer_offset(uint64_t offset)
2388 {
2389 	return (offset + CACHE_BUFFER_SIZE) & ~(CACHE_TREE_LEVEL_MASK(0));
2390 }
2391 
2392 static void
2393 check_readahead(struct spdk_file *file, uint64_t offset,
2394 		struct spdk_fs_channel *channel)
2395 {
2396 	struct spdk_fs_request *req;
2397 	struct spdk_fs_cb_args *args;
2398 
2399 	offset = __next_cache_buffer_offset(offset);
2400 	if (spdk_tree_find_buffer(file->tree, offset) != NULL || file->length <= offset) {
2401 		return;
2402 	}
2403 
2404 	req = alloc_fs_request(channel);
2405 	if (req == NULL) {
2406 		return;
2407 	}
2408 	args = &req->args;
2409 
2410 	BLOBFS_TRACE(file, "offset=%jx\n", offset);
2411 
2412 	args->file = file;
2413 	args->op.readahead.offset = offset;
2414 	args->op.readahead.cache_buffer = cache_insert_buffer(file, offset);
2415 	if (!args->op.readahead.cache_buffer) {
2416 		BLOBFS_TRACE(file, "Cannot allocate buf for offset=%jx\n", offset);
2417 		free_fs_request(req);
2418 		return;
2419 	}
2420 
2421 	args->op.readahead.cache_buffer->in_progress = true;
2422 	if (file->length < (offset + CACHE_BUFFER_SIZE)) {
2423 		args->op.readahead.length = file->length & (CACHE_BUFFER_SIZE - 1);
2424 	} else {
2425 		args->op.readahead.length = CACHE_BUFFER_SIZE;
2426 	}
2427 	file->fs->send_request(__readahead, req);
2428 }
2429 
2430 static int
2431 __file_read(struct spdk_file *file, void *payload, uint64_t offset, uint64_t length,
2432 	    struct spdk_fs_channel *channel)
2433 {
2434 	struct cache_buffer *buf;
2435 	int rc;
2436 
2437 	buf = spdk_tree_find_filled_buffer(file->tree, offset);
2438 	if (buf == NULL) {
2439 		pthread_spin_unlock(&file->lock);
2440 		rc = __send_rw_from_file(file, payload, offset, length, true, channel);
2441 		pthread_spin_lock(&file->lock);
2442 		return rc;
2443 	}
2444 
2445 	if ((offset + length) > (buf->offset + buf->bytes_filled)) {
2446 		length = buf->offset + buf->bytes_filled - offset;
2447 	}
2448 	BLOBFS_TRACE(file, "read %p offset=%ju length=%ju\n", payload, offset, length);
2449 	memcpy(payload, &buf->buf[offset - buf->offset], length);
2450 	if ((offset + length) % CACHE_BUFFER_SIZE == 0) {
2451 		pthread_spin_lock(&g_caches_lock);
2452 		spdk_tree_remove_buffer(file->tree, buf);
2453 		if (file->tree->present_mask == 0) {
2454 			TAILQ_REMOVE(&g_caches, file, cache_tailq);
2455 		}
2456 		pthread_spin_unlock(&g_caches_lock);
2457 	}
2458 
2459 	sem_post(&channel->sem);
2460 	return 0;
2461 }
2462 
2463 int64_t
2464 spdk_file_read(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx,
2465 	       void *payload, uint64_t offset, uint64_t length)
2466 {
2467 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
2468 	uint64_t final_offset, final_length;
2469 	uint32_t sub_reads = 0;
2470 	int rc = 0;
2471 
2472 	pthread_spin_lock(&file->lock);
2473 
2474 	BLOBFS_TRACE_RW(file, "offset=%ju length=%ju\n", offset, length);
2475 
2476 	file->open_for_writing = false;
2477 
2478 	if (length == 0 || offset >= file->append_pos) {
2479 		pthread_spin_unlock(&file->lock);
2480 		return 0;
2481 	}
2482 
2483 	if (offset + length > file->append_pos) {
2484 		length = file->append_pos - offset;
2485 	}
2486 
2487 	if (offset != file->next_seq_offset) {
2488 		file->seq_byte_count = 0;
2489 	}
2490 	file->seq_byte_count += length;
2491 	file->next_seq_offset = offset + length;
2492 	if (file->seq_byte_count >= CACHE_READAHEAD_THRESHOLD) {
2493 		check_readahead(file, offset, channel);
2494 		check_readahead(file, offset + CACHE_BUFFER_SIZE, channel);
2495 	}
2496 
2497 	final_length = 0;
2498 	final_offset = offset + length;
2499 	while (offset < final_offset) {
2500 		length = NEXT_CACHE_BUFFER_OFFSET(offset) - offset;
2501 		if (length > (final_offset - offset)) {
2502 			length = final_offset - offset;
2503 		}
2504 		rc = __file_read(file, payload, offset, length, channel);
2505 		if (rc == 0) {
2506 			final_length += length;
2507 		} else {
2508 			break;
2509 		}
2510 		payload += length;
2511 		offset += length;
2512 		sub_reads++;
2513 	}
2514 	pthread_spin_unlock(&file->lock);
2515 	while (sub_reads-- > 0) {
2516 		sem_wait(&channel->sem);
2517 	}
2518 	if (rc == 0) {
2519 		return final_length;
2520 	} else {
2521 		return rc;
2522 	}
2523 }
2524 
2525 static void
2526 _file_sync(struct spdk_file *file, struct spdk_fs_channel *channel,
2527 	   spdk_file_op_complete cb_fn, void *cb_arg)
2528 {
2529 	struct spdk_fs_request *sync_req;
2530 	struct spdk_fs_request *flush_req;
2531 	struct spdk_fs_cb_args *sync_args;
2532 	struct spdk_fs_cb_args *flush_args;
2533 
2534 	BLOBFS_TRACE(file, "offset=%jx\n", file->append_pos);
2535 
2536 	pthread_spin_lock(&file->lock);
2537 	if (file->append_pos <= file->length_flushed) {
2538 		BLOBFS_TRACE(file, "done - no data to flush\n");
2539 		pthread_spin_unlock(&file->lock);
2540 		cb_fn(cb_arg, 0);
2541 		return;
2542 	}
2543 
2544 	sync_req = alloc_fs_request(channel);
2545 	if (!sync_req) {
2546 		pthread_spin_unlock(&file->lock);
2547 		cb_fn(cb_arg, -ENOMEM);
2548 		return;
2549 	}
2550 	sync_args = &sync_req->args;
2551 
2552 	flush_req = alloc_fs_request(channel);
2553 	if (!flush_req) {
2554 		pthread_spin_unlock(&file->lock);
2555 		cb_fn(cb_arg, -ENOMEM);
2556 		return;
2557 	}
2558 	flush_args = &flush_req->args;
2559 
2560 	sync_args->file = file;
2561 	sync_args->fn.file_op = cb_fn;
2562 	sync_args->arg = cb_arg;
2563 	sync_args->op.sync.offset = file->append_pos;
2564 	sync_args->op.sync.xattr_in_progress = false;
2565 	TAILQ_INSERT_TAIL(&file->sync_requests, sync_req, args.op.sync.tailq);
2566 	pthread_spin_unlock(&file->lock);
2567 
2568 	flush_args->file = file;
2569 	channel->send_request(__file_flush, flush_req);
2570 }
2571 
2572 int
2573 spdk_file_sync(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx)
2574 {
2575 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
2576 	struct spdk_fs_cb_args args = {};
2577 
2578 	args.sem = &channel->sem;
2579 	_file_sync(file, channel, __wake_caller, &args);
2580 	sem_wait(&channel->sem);
2581 
2582 	return args.rc;
2583 }
2584 
2585 void
2586 spdk_file_sync_async(struct spdk_file *file, struct spdk_io_channel *_channel,
2587 		     spdk_file_op_complete cb_fn, void *cb_arg)
2588 {
2589 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
2590 
2591 	_file_sync(file, channel, cb_fn, cb_arg);
2592 }
2593 
2594 void
2595 spdk_file_set_priority(struct spdk_file *file, uint32_t priority)
2596 {
2597 	BLOBFS_TRACE(file, "priority=%u\n", priority);
2598 	file->priority = priority;
2599 
2600 }
2601 
2602 /*
2603  * Close routines
2604  */
2605 
2606 static void
2607 __file_close_async_done(void *ctx, int bserrno)
2608 {
2609 	struct spdk_fs_request *req = ctx;
2610 	struct spdk_fs_cb_args *args = &req->args;
2611 	struct spdk_file *file = args->file;
2612 
2613 	spdk_trace_record(TRACE_BLOBFS_CLOSE, 0, 0, 0, file->trace_arg_name);
2614 
2615 	if (file->is_deleted) {
2616 		spdk_fs_delete_file_async(file->fs, file->name, blob_delete_cb, ctx);
2617 		return;
2618 	}
2619 
2620 	args->fn.file_op(args->arg, bserrno);
2621 	free_fs_request(req);
2622 }
2623 
2624 static void
2625 __file_close_async(struct spdk_file *file, struct spdk_fs_request *req)
2626 {
2627 	struct spdk_blob *blob;
2628 
2629 	pthread_spin_lock(&file->lock);
2630 	if (file->ref_count == 0) {
2631 		pthread_spin_unlock(&file->lock);
2632 		__file_close_async_done(req, -EBADF);
2633 		return;
2634 	}
2635 
2636 	file->ref_count--;
2637 	if (file->ref_count > 0) {
2638 		pthread_spin_unlock(&file->lock);
2639 		req->args.fn.file_op(req->args.arg, 0);
2640 		free_fs_request(req);
2641 		return;
2642 	}
2643 
2644 	pthread_spin_unlock(&file->lock);
2645 
2646 	blob = file->blob;
2647 	file->blob = NULL;
2648 	spdk_blob_close(blob, __file_close_async_done, req);
2649 }
2650 
2651 static void
2652 __file_close_async__sync_done(void *arg, int fserrno)
2653 {
2654 	struct spdk_fs_request *req = arg;
2655 	struct spdk_fs_cb_args *args = &req->args;
2656 
2657 	__file_close_async(args->file, req);
2658 }
2659 
2660 void
2661 spdk_file_close_async(struct spdk_file *file, spdk_file_op_complete cb_fn, void *cb_arg)
2662 {
2663 	struct spdk_fs_request *req;
2664 	struct spdk_fs_cb_args *args;
2665 
2666 	req = alloc_fs_request(file->fs->md_target.md_fs_channel);
2667 	if (req == NULL) {
2668 		cb_fn(cb_arg, -ENOMEM);
2669 		return;
2670 	}
2671 
2672 	args = &req->args;
2673 	args->file = file;
2674 	args->fn.file_op = cb_fn;
2675 	args->arg = cb_arg;
2676 
2677 	spdk_file_sync_async(file, file->fs->md_target.md_io_channel, __file_close_async__sync_done, req);
2678 }
2679 
2680 static void
2681 __file_close(void *arg)
2682 {
2683 	struct spdk_fs_request *req = arg;
2684 	struct spdk_fs_cb_args *args = &req->args;
2685 	struct spdk_file *file = args->file;
2686 
2687 	__file_close_async(file, req);
2688 }
2689 
2690 int
2691 spdk_file_close(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx)
2692 {
2693 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
2694 	struct spdk_fs_request *req;
2695 	struct spdk_fs_cb_args *args;
2696 
2697 	req = alloc_fs_request(channel);
2698 	if (req == NULL) {
2699 		return -ENOMEM;
2700 	}
2701 
2702 	args = &req->args;
2703 
2704 	spdk_file_sync(file, ctx);
2705 	BLOBFS_TRACE(file, "name=%s\n", file->name);
2706 	args->file = file;
2707 	args->sem = &channel->sem;
2708 	args->fn.file_op = __wake_caller;
2709 	args->arg = args;
2710 	channel->send_request(__file_close, req);
2711 	sem_wait(&channel->sem);
2712 
2713 	return args->rc;
2714 }
2715 
2716 int
2717 spdk_file_get_id(struct spdk_file *file, void *id, size_t size)
2718 {
2719 	if (size < sizeof(spdk_blob_id)) {
2720 		return -EINVAL;
2721 	}
2722 
2723 	memcpy(id, &file->blobid, sizeof(spdk_blob_id));
2724 
2725 	return sizeof(spdk_blob_id);
2726 }
2727 
2728 static void
2729 cache_free_buffers(struct spdk_file *file)
2730 {
2731 	BLOBFS_TRACE(file, "free=%s\n", file->name);
2732 	pthread_spin_lock(&file->lock);
2733 	pthread_spin_lock(&g_caches_lock);
2734 	if (file->tree->present_mask == 0) {
2735 		pthread_spin_unlock(&g_caches_lock);
2736 		pthread_spin_unlock(&file->lock);
2737 		return;
2738 	}
2739 	spdk_tree_free_buffers(file->tree);
2740 
2741 	TAILQ_REMOVE(&g_caches, file, cache_tailq);
2742 	/* If not freed, put it in the end of the queue */
2743 	if (file->tree->present_mask != 0) {
2744 		TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq);
2745 	}
2746 	file->last = NULL;
2747 	pthread_spin_unlock(&g_caches_lock);
2748 	pthread_spin_unlock(&file->lock);
2749 }
2750 
/*
 * Register the "blobfs" and "blobfs_rw" log components under the
 * SPDK_LOG_BLOBFS and SPDK_LOG_BLOBFS_RW flags.
 */
SPDK_LOG_REGISTER_COMPONENT("blobfs", SPDK_LOG_BLOBFS)
SPDK_LOG_REGISTER_COMPONENT("blobfs_rw", SPDK_LOG_BLOBFS_RW)
2753