xref: /spdk/lib/blobfs/blobfs.c (revision 617184be3b3bce00e06598f778e6d831de7a00a7)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 
36 #include "spdk/blobfs.h"
37 #include "spdk/conf.h"
38 #include "blobfs_internal.h"
39 
40 #include "spdk/queue.h"
41 #include "spdk/thread.h"
42 #include "spdk/assert.h"
43 #include "spdk/env.h"
44 #include "spdk/util.h"
45 #include "spdk_internal/log.h"
46 #include "spdk/trace.h"
47 
/* Debug-log helper: prefixes the message with "file=<name>" taken from file->name. */
#define BLOBFS_TRACE(file, str, args...) \
	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s " str, file->name, ##args)

/* Same as BLOBFS_TRACE, but emitted under the SPDK_LOG_BLOBFS_RW log flag. */
#define BLOBFS_TRACE_RW(file, str, args...) \
	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS_RW, "file=%s " str, file->name, ##args)

/* Default value of g_fs_cache_size: 4 GiB. */
#define BLOBFS_DEFAULT_CACHE_SIZE (4ULL * 1024 * 1024 * 1024)
/* Cluster size (1 MiB) that spdk_fs_opts_init() stores into the options. */
#define SPDK_BLOBFS_DEFAULT_OPTS_CLUSTER_SZ (1024 * 1024)

/* Total cache size in bytes; __initialize_cache() divides it by CACHE_BUFFER_SIZE
 * to get the number of elements in g_cache_pool. */
static uint64_t g_fs_cache_size = BLOBFS_DEFAULT_CACHE_SIZE;
/* Mempool of cache buffers; created by __initialize_cache(), released by __free_cache(). */
static struct spdk_mempool *g_cache_pool;
/* List of spdk_file objects, initialized in __initialize_cache().
 * NOTE(review): presumably linked through spdk_file.cache_tailq - users are outside this view. */
static TAILQ_HEAD(, spdk_file) g_caches;
/* Number of filesystems that completed common_fs_bs_init() and were not yet unloaded.
 * The cache is created on the 0 -> 1 transition and freed when it drops back to 0. */
static int g_fs_count = 0;
/* Held around every g_fs_count update and the cache create/free it triggers. */
static pthread_mutex_t g_cache_init_lock = PTHREAD_MUTEX_INITIALIZER;
/* Spinlock initialized together with g_caches. NOTE(review): presumably guards g_caches - confirm. */
static pthread_spinlock_t g_caches_lock;
63 
/* Trace group id and the four tracepoint ids recorded by blobfs. */
#define TRACE_GROUP_BLOBFS	0x7
#define TRACE_BLOBFS_XATTR_START	SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x0)
#define TRACE_BLOBFS_XATTR_END		SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x1)
#define TRACE_BLOBFS_OPEN		SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x2)
#define TRACE_BLOBFS_CLOSE		SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x3)

/*
 * Registers a description for each blobfs tracepoint.  All four are registered
 * the same way: no owner, no object, and a single string-typed argument
 * labelled "file:".
 */
SPDK_TRACE_REGISTER_FN(blobfs_trace, "blobfs", TRACE_GROUP_BLOBFS)
{
	spdk_trace_register_description("BLOBFS_XATTR_START", "",
					TRACE_BLOBFS_XATTR_START,
					OWNER_NONE, OBJECT_NONE, 0,
					SPDK_TRACE_ARG_TYPE_STR,
					"file:    ");
	spdk_trace_register_description("BLOBFS_XATTR_END", "",
					TRACE_BLOBFS_XATTR_END,
					OWNER_NONE, OBJECT_NONE, 0,
					SPDK_TRACE_ARG_TYPE_STR,
					"file:    ");
	spdk_trace_register_description("BLOBFS_OPEN", "",
					TRACE_BLOBFS_OPEN,
					OWNER_NONE, OBJECT_NONE, 0,
					SPDK_TRACE_ARG_TYPE_STR,
					"file:    ");
	spdk_trace_register_description("BLOBFS_CLOSE", "",
					TRACE_BLOBFS_CLOSE,
					OWNER_NONE, OBJECT_NONE, 0,
					SPDK_TRACE_ARG_TYPE_STR,
					"file:    ");
}
93 
94 void
95 spdk_cache_buffer_free(struct cache_buffer *cache_buffer)
96 {
97 	spdk_mempool_put(g_cache_pool, cache_buffer->buf);
98 	free(cache_buffer);
99 }
100 
/* NOTE(review): presumably the sequential-read byte count that triggers readahead;
 * the code using it is outside this view. */
#define CACHE_READAHEAD_THRESHOLD	(128 * 1024)

/* In-memory state of one blobfs file. Allocated by file_alloc(). */
struct spdk_file {
	/* Owning filesystem, set in file_alloc(). */
	struct spdk_filesystem	*fs;
	/* Open blob handle; assigned in fs_open_blob_done(). */
	struct spdk_blob	*blob;
	/* Heap copy (strdup) of the file name; freed in unload_cb(). */
	char			*name;
	/* Leading bytes of `name` packed into an integer by _file_build_trace_arg_name(). */
	uint64_t		trace_arg_name;
	/* Loaded from the blob's "length" xattr in iter_cb(). */
	uint64_t		length;
	bool                    is_deleted;
	bool			open_for_writing;
	/* Initialized to the same value as `length` in iter_cb(). */
	uint64_t		length_flushed;
	/* Initialized to the same value as `length` in iter_cb(). */
	uint64_t		append_pos;
	uint64_t		seq_byte_count;
	uint64_t		next_seq_offset;
	/* Set to SPDK_FILE_PRIORITY_LOW in file_alloc(). */
	uint32_t		priority;
	/* Link in spdk_filesystem.files. */
	TAILQ_ENTRY(spdk_file)	tailq;
	spdk_blob_id		blobid;
	/* Incremented once per open in fs_open_blob_create_cb(). */
	uint32_t		ref_count;
	/* Initialized in file_alloc(). */
	pthread_spinlock_t	lock;
	struct cache_buffer	*last;
	/* Zero-initialized cache tree allocated in file_alloc(); freed in unload_cb(). */
	struct cache_tree	*tree;
	/* Open requests waiting for the blob open; drained by fs_open_blob_done(). */
	TAILQ_HEAD(open_requests_head, spdk_fs_request) open_requests;
	TAILQ_HEAD(sync_requests_head, spdk_fs_request) sync_requests;
	TAILQ_ENTRY(spdk_file)	cache_tailq;
};
126 
/*
 * Records a blob that carried an "is_deleted" xattr at load time.  Entries are
 * queued by iter_cb() and consumed one at a time by _handle_deleted_files().
 */
struct spdk_deleted_file {
	/* Id of the blob to delete. */
	spdk_blob_id	id;
	/* Link in spdk_fs_cb_args.op.fs_load.deleted_files. */
	TAILQ_ENTRY(spdk_deleted_file)	tailq;
};
131 
/*
 * One blobfs instance.  The addresses of md_target, sync_target and io_target
 * are each registered as an SPDK io_device in fs_alloc(); the channel-create
 * callbacks recover the filesystem from those addresses with SPDK_CONTAINEROF.
 */
struct spdk_filesystem {
	/* Backing blobstore; set in common_fs_bs_init(). */
	struct spdk_blob_store	*bs;
	/* All files of this filesystem, linked via spdk_file.tailq. */
	TAILQ_HEAD(, spdk_file)	files;
	/* Only cluster_sz is filled in within this view (common_fs_bs_init()). */
	struct spdk_bs_opts	bs_opts;
	struct spdk_bs_dev	*bdev;
	/* Caller-supplied dispatcher used by the synchronous API to run a function. */
	fs_send_request_fn	send_request;

	struct {
		/* Number of requests preallocated per channel. */
		uint32_t		max_ops;
		struct spdk_io_channel	*sync_io_channel;
		/* Context of sync_io_channel. */
		struct spdk_fs_channel	*sync_fs_channel;
	} sync_target;

	struct {
		/* Number of requests preallocated per channel. */
		uint32_t		max_ops;
		struct spdk_io_channel	*md_io_channel;
		/* Context of md_io_channel; source of requests for metadata operations. */
		struct spdk_fs_channel	*md_fs_channel;
	} md_target;

	struct {
		/* Number of requests preallocated per channel. */
		uint32_t		max_ops;
	} io_target;
};
155 
/*
 * Per-request argument block.  It is the first member of struct spdk_fs_request.
 * Holds the completion callback, the common context, and a union of
 * operation-specific state.
 */
struct spdk_fs_cb_args {
	/* Completion callback; which member is valid depends on the operation. */
	union {
		spdk_fs_op_with_handle_complete		fs_op_with_handle;
		spdk_fs_op_complete			fs_op;
		spdk_file_op_with_handle_complete	file_op_with_handle;
		spdk_file_op_complete			file_op;
		spdk_file_stat_op_complete		stat_op;
	} fn;
	/* Opaque argument handed back to the completion callback. */
	void *arg;
	/* Semaphore posted to wake a caller blocked in a synchronous API. */
	sem_t *sem;
	struct spdk_filesystem *fs;
	struct spdk_file *file;
	/* Result code stored for a synchronous caller to read after it wakes. */
	int rc;
	/* Points at `iov` below when iovcnt <= 1, otherwise at a heap array
	 * (see alloc_fs_request_with_iov() / free_fs_request()). */
	struct iovec *iovs;
	uint32_t iovcnt;
	struct iovec iov;
	/* Operation-specific state; only the member for the current operation is valid. */
	union {
		struct {
			/* Blobs found with "is_deleted" set while loading. */
			TAILQ_HEAD(, spdk_deleted_file)	deleted_files;
		} fs_load;
		struct {
			uint64_t	length;
		} truncate;
		struct {
			struct spdk_io_channel	*channel;
			void		*pin_buf;
			int		is_read;
			off_t		offset;
			size_t		length;
			uint64_t	start_lba;
			uint64_t	num_lba;
			uint32_t	blocklen;
		} rw;
		struct {
			const char	*old_name;
			const char	*new_name;
		} rename;
		struct {
			struct cache_buffer	*cache_buffer;
			uint64_t		length;
		} flush;
		struct {
			struct cache_buffer	*cache_buffer;
			uint64_t		length;
			uint64_t		offset;
		} readahead;
		struct {
			uint64_t			offset;
			TAILQ_ENTRY(spdk_fs_request)	tailq;
			bool				xattr_in_progress;
		} sync;
		struct {
			uint32_t			num_clusters;
		} resize;
		struct {
			const char	*name;
			uint32_t	flags;
			/* Link in spdk_file.open_requests. */
			TAILQ_ENTRY(spdk_fs_request)	tailq;
		} open;
		struct {
			/* Caller-owned name; not copied by the request. */
			const char		*name;
			struct spdk_blob	*blob;
		} create;
		struct {
			const char	*name;
		} delete;
		struct {
			/* Caller-owned name; not copied by the request. */
			const char	*name;
		} stat;
	} op;
};
227 
/* Forward declarations of static helpers that are called before their definitions. */
static void cache_free_buffers(struct spdk_file *file);
static void spdk_fs_io_device_unregister(struct spdk_filesystem *fs);
static void spdk_fs_free_io_channels(struct spdk_filesystem *fs);
231 
232 void
233 spdk_fs_opts_init(struct spdk_blobfs_opts *opts)
234 {
235 	opts->cluster_sz = SPDK_BLOBFS_DEFAULT_OPTS_CLUSTER_SZ;
236 }
237 
238 static void
239 __initialize_cache(void)
240 {
241 	assert(g_cache_pool == NULL);
242 
243 	g_cache_pool = spdk_mempool_create("spdk_fs_cache",
244 					   g_fs_cache_size / CACHE_BUFFER_SIZE,
245 					   CACHE_BUFFER_SIZE,
246 					   SPDK_MEMPOOL_DEFAULT_CACHE_SIZE,
247 					   SPDK_ENV_SOCKET_ID_ANY);
248 	if (!g_cache_pool) {
249 		SPDK_ERRLOG("Create mempool failed, you may "
250 			    "increase the memory and try again\n");
251 		assert(false);
252 	}
253 	TAILQ_INIT(&g_caches);
254 	pthread_spin_init(&g_caches_lock, 0);
255 }
256 
257 static void
258 __free_cache(void)
259 {
260 	assert(g_cache_pool != NULL);
261 
262 	spdk_mempool_free(g_cache_pool);
263 	g_cache_pool = NULL;
264 }
265 
266 static uint64_t
267 __file_get_blob_size(struct spdk_file *file)
268 {
269 	uint64_t cluster_sz;
270 
271 	cluster_sz = file->fs->bs_opts.cluster_sz;
272 	return cluster_sz * spdk_blob_get_num_clusters(file->blob);
273 }
274 
/*
 * A request slot.  Normally preallocated in a channel's req_mem array and
 * handed out by alloc_fs_request*(); spdk_fs_unload() instead takes one
 * directly from the heap.
 */
struct spdk_fs_request {
	/* Must stay the first member: some callers pass &req->args where the
	 * callback casts the context back to a struct spdk_fs_request pointer. */
	struct spdk_fs_cb_args		args;
	/* Link in the owning channel's free list (spdk_fs_channel.reqs). */
	TAILQ_ENTRY(spdk_fs_request)	link;
	/* Channel the request was taken from; free_fs_request() returns it there. */
	struct spdk_fs_channel		*channel;
};
280 
/* Per-channel context: a fixed pool of request slots plus dispatch state. */
struct spdk_fs_channel {
	/* Backing array for the request pool; allocated in _spdk_fs_channel_create(). */
	struct spdk_fs_request		*req_mem;
	/* Free list of request slots. */
	TAILQ_HEAD(, spdk_fs_request)	reqs;
	/* Semaphore the synchronous API blocks on until the operation completes. */
	sem_t				sem;
	struct spdk_filesystem		*fs;
	/* Blobstore I/O channel; released in _spdk_fs_channel_destroy() when non-NULL. */
	struct spdk_io_channel		*bs_channel;
	fs_send_request_fn		send_request;
	/* When true, `lock` is taken around free-list and counter updates. */
	bool				sync;
	/* Number of request slots currently handed out. */
	uint32_t			outstanding_reqs;
	pthread_spinlock_t		lock;
};
292 
/* For now, this is effectively an alias. But eventually we'll shift
 * some data members over.
 *
 * The synchronous API casts a struct spdk_fs_thread_ctx pointer directly to
 * a struct spdk_fs_channel pointer, so `ch` has to remain the first member.
 */
struct spdk_fs_thread_ctx {
	struct spdk_fs_channel	ch;
};
298 
299 static struct spdk_fs_request *
300 alloc_fs_request_with_iov(struct spdk_fs_channel *channel, uint32_t iovcnt)
301 {
302 	struct spdk_fs_request *req;
303 	struct iovec *iovs = NULL;
304 
305 	if (iovcnt > 1) {
306 		iovs = calloc(iovcnt, sizeof(struct iovec));
307 		if (!iovs) {
308 			return NULL;
309 		}
310 	}
311 
312 	if (channel->sync) {
313 		pthread_spin_lock(&channel->lock);
314 	}
315 
316 	req = TAILQ_FIRST(&channel->reqs);
317 	if (req) {
318 		channel->outstanding_reqs++;
319 		TAILQ_REMOVE(&channel->reqs, req, link);
320 	}
321 
322 	if (channel->sync) {
323 		pthread_spin_unlock(&channel->lock);
324 	}
325 
326 	if (req == NULL) {
327 		SPDK_ERRLOG("Cannot allocate req on spdk_fs_channel =%p\n", channel);
328 		free(iovs);
329 		return NULL;
330 	}
331 	memset(req, 0, sizeof(*req));
332 	req->channel = channel;
333 	if (iovcnt > 1) {
334 		req->args.iovs = iovs;
335 	} else {
336 		req->args.iovs = &req->args.iov;
337 	}
338 	req->args.iovcnt = iovcnt;
339 
340 	return req;
341 }
342 
/* Take a request slot that carries no iovecs. Returns NULL if none is free. */
static struct spdk_fs_request *
alloc_fs_request(struct spdk_fs_channel *channel)
{
	struct spdk_fs_request *slot = alloc_fs_request_with_iov(channel, 0);

	return slot;
}
348 
349 static void
350 free_fs_request(struct spdk_fs_request *req)
351 {
352 	struct spdk_fs_channel *channel = req->channel;
353 
354 	if (req->args.iovcnt > 1) {
355 		free(req->args.iovs);
356 	}
357 
358 	if (channel->sync) {
359 		pthread_spin_lock(&channel->lock);
360 	}
361 
362 	TAILQ_INSERT_HEAD(&req->channel->reqs, req, link);
363 	channel->outstanding_reqs--;
364 
365 	if (channel->sync) {
366 		pthread_spin_unlock(&channel->lock);
367 	}
368 }
369 
370 static int
371 _spdk_fs_channel_create(struct spdk_filesystem *fs, struct spdk_fs_channel *channel,
372 			uint32_t max_ops)
373 {
374 	uint32_t i;
375 
376 	channel->req_mem = calloc(max_ops, sizeof(struct spdk_fs_request));
377 	if (!channel->req_mem) {
378 		return -1;
379 	}
380 
381 	channel->outstanding_reqs = 0;
382 	TAILQ_INIT(&channel->reqs);
383 	sem_init(&channel->sem, 0, 0);
384 
385 	for (i = 0; i < max_ops; i++) {
386 		TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link);
387 	}
388 
389 	channel->fs = fs;
390 
391 	return 0;
392 }
393 
394 static int
395 _spdk_fs_md_channel_create(void *io_device, void *ctx_buf)
396 {
397 	struct spdk_filesystem		*fs;
398 	struct spdk_fs_channel		*channel = ctx_buf;
399 
400 	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, md_target);
401 
402 	return _spdk_fs_channel_create(fs, channel, fs->md_target.max_ops);
403 }
404 
405 static int
406 _spdk_fs_sync_channel_create(void *io_device, void *ctx_buf)
407 {
408 	struct spdk_filesystem		*fs;
409 	struct spdk_fs_channel		*channel = ctx_buf;
410 
411 	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, sync_target);
412 
413 	return _spdk_fs_channel_create(fs, channel, fs->sync_target.max_ops);
414 }
415 
416 static int
417 _spdk_fs_io_channel_create(void *io_device, void *ctx_buf)
418 {
419 	struct spdk_filesystem		*fs;
420 	struct spdk_fs_channel		*channel = ctx_buf;
421 
422 	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, io_target);
423 
424 	return _spdk_fs_channel_create(fs, channel, fs->io_target.max_ops);
425 }
426 
427 static void
428 _spdk_fs_channel_destroy(void *io_device, void *ctx_buf)
429 {
430 	struct spdk_fs_channel *channel = ctx_buf;
431 
432 	if (channel->outstanding_reqs > 0) {
433 		SPDK_ERRLOG("channel freed with %" PRIu32 " outstanding requests!\n",
434 			    channel->outstanding_reqs);
435 	}
436 
437 	free(channel->req_mem);
438 	if (channel->bs_channel != NULL) {
439 		spdk_bs_free_io_channel(channel->bs_channel);
440 	}
441 }
442 
443 static void
444 __send_request_direct(fs_request_fn fn, void *arg)
445 {
446 	fn(arg);
447 }
448 
449 static void
450 common_fs_bs_init(struct spdk_filesystem *fs, struct spdk_blob_store *bs)
451 {
452 	fs->bs = bs;
453 	fs->bs_opts.cluster_sz = spdk_bs_get_cluster_size(bs);
454 	fs->md_target.md_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
455 	fs->md_target.md_fs_channel->send_request = __send_request_direct;
456 	fs->sync_target.sync_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
457 	fs->sync_target.sync_fs_channel->send_request = __send_request_direct;
458 
459 	pthread_mutex_lock(&g_cache_init_lock);
460 	if (g_fs_count == 0) {
461 		__initialize_cache();
462 	}
463 	g_fs_count++;
464 	pthread_mutex_unlock(&g_cache_init_lock);
465 }
466 
467 static void
468 init_cb(void *ctx, struct spdk_blob_store *bs, int bserrno)
469 {
470 	struct spdk_fs_request *req = ctx;
471 	struct spdk_fs_cb_args *args = &req->args;
472 	struct spdk_filesystem *fs = args->fs;
473 
474 	if (bserrno == 0) {
475 		common_fs_bs_init(fs, bs);
476 	} else {
477 		free(fs);
478 		fs = NULL;
479 	}
480 
481 	args->fn.fs_op_with_handle(args->arg, fs, bserrno);
482 	free_fs_request(req);
483 }
484 
485 static void
486 fs_conf_parse(void)
487 {
488 	struct spdk_conf_section *sp;
489 
490 	sp = spdk_conf_find_section(NULL, "Blobfs");
491 	if (sp == NULL) {
492 		g_fs_cache_buffer_shift = CACHE_BUFFER_SHIFT_DEFAULT;
493 		return;
494 	}
495 
496 	g_fs_cache_buffer_shift = spdk_conf_section_get_intval(sp, "CacheBufferShift");
497 	if (g_fs_cache_buffer_shift <= 0) {
498 		g_fs_cache_buffer_shift = CACHE_BUFFER_SHIFT_DEFAULT;
499 	}
500 }
501 
502 static struct spdk_filesystem *
503 fs_alloc(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn)
504 {
505 	struct spdk_filesystem *fs;
506 
507 	fs = calloc(1, sizeof(*fs));
508 	if (fs == NULL) {
509 		return NULL;
510 	}
511 
512 	fs->bdev = dev;
513 	fs->send_request = send_request_fn;
514 	TAILQ_INIT(&fs->files);
515 
516 	fs->md_target.max_ops = 512;
517 	spdk_io_device_register(&fs->md_target, _spdk_fs_md_channel_create, _spdk_fs_channel_destroy,
518 				sizeof(struct spdk_fs_channel), "blobfs_md");
519 	fs->md_target.md_io_channel = spdk_get_io_channel(&fs->md_target);
520 	fs->md_target.md_fs_channel = spdk_io_channel_get_ctx(fs->md_target.md_io_channel);
521 
522 	fs->sync_target.max_ops = 512;
523 	spdk_io_device_register(&fs->sync_target, _spdk_fs_sync_channel_create, _spdk_fs_channel_destroy,
524 				sizeof(struct spdk_fs_channel), "blobfs_sync");
525 	fs->sync_target.sync_io_channel = spdk_get_io_channel(&fs->sync_target);
526 	fs->sync_target.sync_fs_channel = spdk_io_channel_get_ctx(fs->sync_target.sync_io_channel);
527 
528 	fs->io_target.max_ops = 512;
529 	spdk_io_device_register(&fs->io_target, _spdk_fs_io_channel_create, _spdk_fs_channel_destroy,
530 				sizeof(struct spdk_fs_channel), "blobfs_io");
531 
532 	return fs;
533 }
534 
535 static void
536 __wake_caller(void *arg, int fserrno)
537 {
538 	struct spdk_fs_cb_args *args = arg;
539 
540 	args->rc = fserrno;
541 	sem_post(args->sem);
542 }
543 
544 void
545 spdk_fs_init(struct spdk_bs_dev *dev, struct spdk_blobfs_opts *opt,
546 	     fs_send_request_fn send_request_fn,
547 	     spdk_fs_op_with_handle_complete cb_fn, void *cb_arg)
548 {
549 	struct spdk_filesystem *fs;
550 	struct spdk_fs_request *req;
551 	struct spdk_fs_cb_args *args;
552 	struct spdk_bs_opts opts = {};
553 
554 	fs = fs_alloc(dev, send_request_fn);
555 	if (fs == NULL) {
556 		cb_fn(cb_arg, NULL, -ENOMEM);
557 		return;
558 	}
559 
560 	fs_conf_parse();
561 
562 	req = alloc_fs_request(fs->md_target.md_fs_channel);
563 	if (req == NULL) {
564 		spdk_fs_free_io_channels(fs);
565 		spdk_fs_io_device_unregister(fs);
566 		cb_fn(cb_arg, NULL, -ENOMEM);
567 		return;
568 	}
569 
570 	args = &req->args;
571 	args->fn.fs_op_with_handle = cb_fn;
572 	args->arg = cb_arg;
573 	args->fs = fs;
574 
575 	spdk_bs_opts_init(&opts);
576 	snprintf(opts.bstype.bstype, sizeof(opts.bstype.bstype), "BLOBFS");
577 	if (opt) {
578 		opts.cluster_sz = opt->cluster_sz;
579 	}
580 	spdk_bs_init(dev, &opts, init_cb, req);
581 }
582 
583 static struct spdk_file *
584 file_alloc(struct spdk_filesystem *fs)
585 {
586 	struct spdk_file *file;
587 
588 	file = calloc(1, sizeof(*file));
589 	if (file == NULL) {
590 		return NULL;
591 	}
592 
593 	file->tree = calloc(1, sizeof(*file->tree));
594 	if (file->tree == NULL) {
595 		free(file);
596 		return NULL;
597 	}
598 
599 	file->fs = fs;
600 	TAILQ_INIT(&file->open_requests);
601 	TAILQ_INIT(&file->sync_requests);
602 	pthread_spin_init(&file->lock, 0);
603 	TAILQ_INSERT_TAIL(&fs->files, file, tailq);
604 	file->priority = SPDK_FILE_PRIORITY_LOW;
605 	return file;
606 }
607 
608 static void fs_load_done(void *ctx, int bserrno);
609 
610 static int
611 _handle_deleted_files(struct spdk_fs_request *req)
612 {
613 	struct spdk_fs_cb_args *args = &req->args;
614 	struct spdk_filesystem *fs = args->fs;
615 
616 	if (!TAILQ_EMPTY(&args->op.fs_load.deleted_files)) {
617 		struct spdk_deleted_file *deleted_file;
618 
619 		deleted_file = TAILQ_FIRST(&args->op.fs_load.deleted_files);
620 		TAILQ_REMOVE(&args->op.fs_load.deleted_files, deleted_file, tailq);
621 		spdk_bs_delete_blob(fs->bs, deleted_file->id, fs_load_done, req);
622 		free(deleted_file);
623 		return 0;
624 	}
625 
626 	return 1;
627 }
628 
629 static void
630 fs_load_done(void *ctx, int bserrno)
631 {
632 	struct spdk_fs_request *req = ctx;
633 	struct spdk_fs_cb_args *args = &req->args;
634 	struct spdk_filesystem *fs = args->fs;
635 
636 	/* The filesystem has been loaded.  Now check if there are any files that
637 	 *  were marked for deletion before last unload.  Do not complete the
638 	 *  fs_load callback until all of them have been deleted on disk.
639 	 */
640 	if (_handle_deleted_files(req) == 0) {
641 		/* We found a file that's been marked for deleting but not actually
642 		 *  deleted yet.  This function will get called again once the delete
643 		 *  operation is completed.
644 		 */
645 		return;
646 	}
647 
648 	args->fn.fs_op_with_handle(args->arg, fs, 0);
649 	free_fs_request(req);
650 
651 }
652 
653 static void
654 _file_build_trace_arg_name(struct spdk_file *f)
655 {
656 	f->trace_arg_name = 0;
657 	memcpy(&f->trace_arg_name, f->name,
658 	       spdk_min(sizeof(f->trace_arg_name), strlen(f->name)));
659 }
660 
661 static void
662 iter_cb(void *ctx, struct spdk_blob *blob, int rc)
663 {
664 	struct spdk_fs_request *req = ctx;
665 	struct spdk_fs_cb_args *args = &req->args;
666 	struct spdk_filesystem *fs = args->fs;
667 	uint64_t *length;
668 	const char *name;
669 	uint32_t *is_deleted;
670 	size_t value_len;
671 
672 	if (rc < 0) {
673 		args->fn.fs_op_with_handle(args->arg, fs, rc);
674 		free_fs_request(req);
675 		return;
676 	}
677 
678 	rc = spdk_blob_get_xattr_value(blob, "name", (const void **)&name, &value_len);
679 	if (rc < 0) {
680 		args->fn.fs_op_with_handle(args->arg, fs, rc);
681 		free_fs_request(req);
682 		return;
683 	}
684 
685 	rc = spdk_blob_get_xattr_value(blob, "length", (const void **)&length, &value_len);
686 	if (rc < 0) {
687 		args->fn.fs_op_with_handle(args->arg, fs, rc);
688 		free_fs_request(req);
689 		return;
690 	}
691 
692 	assert(value_len == 8);
693 
694 	/* This file could be deleted last time without close it, then app crashed, so we delete it now */
695 	rc = spdk_blob_get_xattr_value(blob, "is_deleted", (const void **)&is_deleted, &value_len);
696 	if (rc < 0) {
697 		struct spdk_file *f;
698 
699 		f = file_alloc(fs);
700 		if (f == NULL) {
701 			args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM);
702 			free_fs_request(req);
703 			return;
704 		}
705 
706 		f->name = strdup(name);
707 		_file_build_trace_arg_name(f);
708 		f->blobid = spdk_blob_get_id(blob);
709 		f->length = *length;
710 		f->length_flushed = *length;
711 		f->append_pos = *length;
712 		SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "added file %s length=%ju\n", f->name, f->length);
713 	} else {
714 		struct spdk_deleted_file *deleted_file;
715 
716 		deleted_file = calloc(1, sizeof(*deleted_file));
717 		if (deleted_file == NULL) {
718 			args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM);
719 			free_fs_request(req);
720 			return;
721 		}
722 		deleted_file->id = spdk_blob_get_id(blob);
723 		TAILQ_INSERT_TAIL(&args->op.fs_load.deleted_files, deleted_file, tailq);
724 	}
725 }
726 
727 static void
728 load_cb(void *ctx, struct spdk_blob_store *bs, int bserrno)
729 {
730 	struct spdk_fs_request *req = ctx;
731 	struct spdk_fs_cb_args *args = &req->args;
732 	struct spdk_filesystem *fs = args->fs;
733 	struct spdk_bs_type bstype;
734 	static const struct spdk_bs_type blobfs_type = {"BLOBFS"};
735 	static const struct spdk_bs_type zeros;
736 
737 	if (bserrno != 0) {
738 		args->fn.fs_op_with_handle(args->arg, NULL, bserrno);
739 		free_fs_request(req);
740 		free(fs);
741 		return;
742 	}
743 
744 	bstype = spdk_bs_get_bstype(bs);
745 
746 	if (!memcmp(&bstype, &zeros, sizeof(bstype))) {
747 		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "assigning bstype\n");
748 		spdk_bs_set_bstype(bs, blobfs_type);
749 	} else if (memcmp(&bstype, &blobfs_type, sizeof(bstype))) {
750 		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "not blobfs\n");
751 		SPDK_LOGDUMP(SPDK_LOG_BLOB, "bstype", &bstype, sizeof(bstype));
752 		args->fn.fs_op_with_handle(args->arg, NULL, bserrno);
753 		free_fs_request(req);
754 		free(fs);
755 		return;
756 	}
757 
758 	common_fs_bs_init(fs, bs);
759 	fs_load_done(req, 0);
760 }
761 
762 static void
763 spdk_fs_io_device_unregister(struct spdk_filesystem *fs)
764 {
765 	assert(fs != NULL);
766 	spdk_io_device_unregister(&fs->md_target, NULL);
767 	spdk_io_device_unregister(&fs->sync_target, NULL);
768 	spdk_io_device_unregister(&fs->io_target, NULL);
769 	free(fs);
770 }
771 
772 static void
773 spdk_fs_free_io_channels(struct spdk_filesystem *fs)
774 {
775 	assert(fs != NULL);
776 	spdk_fs_free_io_channel(fs->md_target.md_io_channel);
777 	spdk_fs_free_io_channel(fs->sync_target.sync_io_channel);
778 }
779 
780 void
781 spdk_fs_load(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn,
782 	     spdk_fs_op_with_handle_complete cb_fn, void *cb_arg)
783 {
784 	struct spdk_filesystem *fs;
785 	struct spdk_fs_cb_args *args;
786 	struct spdk_fs_request *req;
787 	struct spdk_bs_opts	bs_opts;
788 
789 	fs = fs_alloc(dev, send_request_fn);
790 	if (fs == NULL) {
791 		cb_fn(cb_arg, NULL, -ENOMEM);
792 		return;
793 	}
794 
795 	fs_conf_parse();
796 
797 	req = alloc_fs_request(fs->md_target.md_fs_channel);
798 	if (req == NULL) {
799 		spdk_fs_free_io_channels(fs);
800 		spdk_fs_io_device_unregister(fs);
801 		cb_fn(cb_arg, NULL, -ENOMEM);
802 		return;
803 	}
804 
805 	args = &req->args;
806 	args->fn.fs_op_with_handle = cb_fn;
807 	args->arg = cb_arg;
808 	args->fs = fs;
809 	TAILQ_INIT(&args->op.fs_load.deleted_files);
810 	spdk_bs_opts_init(&bs_opts);
811 	bs_opts.iter_cb_fn = iter_cb;
812 	bs_opts.iter_cb_arg = req;
813 	spdk_bs_load(dev, &bs_opts, load_cb, req);
814 }
815 
816 static void
817 unload_cb(void *ctx, int bserrno)
818 {
819 	struct spdk_fs_request *req = ctx;
820 	struct spdk_fs_cb_args *args = &req->args;
821 	struct spdk_filesystem *fs = args->fs;
822 	struct spdk_file *file, *tmp;
823 
824 	TAILQ_FOREACH_SAFE(file, &fs->files, tailq, tmp) {
825 		TAILQ_REMOVE(&fs->files, file, tailq);
826 		cache_free_buffers(file);
827 		free(file->name);
828 		free(file->tree);
829 		free(file);
830 	}
831 
832 	pthread_mutex_lock(&g_cache_init_lock);
833 	g_fs_count--;
834 	if (g_fs_count == 0) {
835 		__free_cache();
836 	}
837 	pthread_mutex_unlock(&g_cache_init_lock);
838 
839 	args->fn.fs_op(args->arg, bserrno);
840 	free(req);
841 
842 	spdk_fs_io_device_unregister(fs);
843 }
844 
845 void
846 spdk_fs_unload(struct spdk_filesystem *fs, spdk_fs_op_complete cb_fn, void *cb_arg)
847 {
848 	struct spdk_fs_request *req;
849 	struct spdk_fs_cb_args *args;
850 
851 	/*
852 	 * We must free the md_channel before unloading the blobstore, so just
853 	 *  allocate this request from the general heap.
854 	 */
855 	req = calloc(1, sizeof(*req));
856 	if (req == NULL) {
857 		cb_fn(cb_arg, -ENOMEM);
858 		return;
859 	}
860 
861 	args = &req->args;
862 	args->fn.fs_op = cb_fn;
863 	args->arg = cb_arg;
864 	args->fs = fs;
865 
866 	spdk_fs_free_io_channels(fs);
867 	spdk_bs_unload(fs->bs, unload_cb, req);
868 }
869 
870 static struct spdk_file *
871 fs_find_file(struct spdk_filesystem *fs, const char *name)
872 {
873 	struct spdk_file *file;
874 
875 	TAILQ_FOREACH(file, &fs->files, tailq) {
876 		if (!strncmp(name, file->name, SPDK_FILE_NAME_MAX)) {
877 			return file;
878 		}
879 	}
880 
881 	return NULL;
882 }
883 
884 void
885 spdk_fs_file_stat_async(struct spdk_filesystem *fs, const char *name,
886 			spdk_file_stat_op_complete cb_fn, void *cb_arg)
887 {
888 	struct spdk_file_stat stat;
889 	struct spdk_file *f = NULL;
890 
891 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
892 		cb_fn(cb_arg, NULL, -ENAMETOOLONG);
893 		return;
894 	}
895 
896 	f = fs_find_file(fs, name);
897 	if (f != NULL) {
898 		stat.blobid = f->blobid;
899 		stat.size = f->append_pos >= f->length ? f->append_pos : f->length;
900 		cb_fn(cb_arg, &stat, 0);
901 		return;
902 	}
903 
904 	cb_fn(cb_arg, NULL, -ENOENT);
905 }
906 
907 static void
908 __copy_stat(void *arg, struct spdk_file_stat *stat, int fserrno)
909 {
910 	struct spdk_fs_request *req = arg;
911 	struct spdk_fs_cb_args *args = &req->args;
912 
913 	args->rc = fserrno;
914 	if (fserrno == 0) {
915 		memcpy(args->arg, stat, sizeof(*stat));
916 	}
917 	sem_post(args->sem);
918 }
919 
920 static void
921 __file_stat(void *arg)
922 {
923 	struct spdk_fs_request *req = arg;
924 	struct spdk_fs_cb_args *args = &req->args;
925 
926 	spdk_fs_file_stat_async(args->fs, args->op.stat.name,
927 				args->fn.stat_op, req);
928 }
929 
930 int
931 spdk_fs_file_stat(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx,
932 		  const char *name, struct spdk_file_stat *stat)
933 {
934 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
935 	struct spdk_fs_request *req;
936 	int rc;
937 
938 	req = alloc_fs_request(channel);
939 	if (req == NULL) {
940 		return -ENOMEM;
941 	}
942 
943 	req->args.fs = fs;
944 	req->args.op.stat.name = name;
945 	req->args.fn.stat_op = __copy_stat;
946 	req->args.arg = stat;
947 	req->args.sem = &channel->sem;
948 	channel->send_request(__file_stat, req);
949 	sem_wait(&channel->sem);
950 
951 	rc = req->args.rc;
952 	free_fs_request(req);
953 
954 	return rc;
955 }
956 
957 static void
958 fs_create_blob_close_cb(void *ctx, int bserrno)
959 {
960 	int rc;
961 	struct spdk_fs_request *req = ctx;
962 	struct spdk_fs_cb_args *args = &req->args;
963 
964 	rc = args->rc ? args->rc : bserrno;
965 	args->fn.file_op(args->arg, rc);
966 	free_fs_request(req);
967 }
968 
969 static void
970 fs_create_blob_resize_cb(void *ctx, int bserrno)
971 {
972 	struct spdk_fs_request *req = ctx;
973 	struct spdk_fs_cb_args *args = &req->args;
974 	struct spdk_file *f = args->file;
975 	struct spdk_blob *blob = args->op.create.blob;
976 	uint64_t length = 0;
977 
978 	args->rc = bserrno;
979 	if (bserrno) {
980 		spdk_blob_close(blob, fs_create_blob_close_cb, args);
981 		return;
982 	}
983 
984 	spdk_blob_set_xattr(blob, "name", f->name, strlen(f->name) + 1);
985 	spdk_blob_set_xattr(blob, "length", &length, sizeof(length));
986 
987 	spdk_blob_close(blob, fs_create_blob_close_cb, args);
988 }
989 
990 static void
991 fs_create_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno)
992 {
993 	struct spdk_fs_request *req = ctx;
994 	struct spdk_fs_cb_args *args = &req->args;
995 
996 	if (bserrno) {
997 		args->fn.file_op(args->arg, bserrno);
998 		free_fs_request(req);
999 		return;
1000 	}
1001 
1002 	args->op.create.blob = blob;
1003 	spdk_blob_resize(blob, 1, fs_create_blob_resize_cb, req);
1004 }
1005 
1006 static void
1007 fs_create_blob_create_cb(void *ctx, spdk_blob_id blobid, int bserrno)
1008 {
1009 	struct spdk_fs_request *req = ctx;
1010 	struct spdk_fs_cb_args *args = &req->args;
1011 	struct spdk_file *f = args->file;
1012 
1013 	if (bserrno) {
1014 		args->fn.file_op(args->arg, bserrno);
1015 		free_fs_request(req);
1016 		return;
1017 	}
1018 
1019 	f->blobid = blobid;
1020 	spdk_bs_open_blob(f->fs->bs, blobid, fs_create_blob_open_cb, req);
1021 }
1022 
1023 void
1024 spdk_fs_create_file_async(struct spdk_filesystem *fs, const char *name,
1025 			  spdk_file_op_complete cb_fn, void *cb_arg)
1026 {
1027 	struct spdk_file *file;
1028 	struct spdk_fs_request *req;
1029 	struct spdk_fs_cb_args *args;
1030 
1031 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
1032 		cb_fn(cb_arg, -ENAMETOOLONG);
1033 		return;
1034 	}
1035 
1036 	file = fs_find_file(fs, name);
1037 	if (file != NULL) {
1038 		cb_fn(cb_arg, -EEXIST);
1039 		return;
1040 	}
1041 
1042 	file = file_alloc(fs);
1043 	if (file == NULL) {
1044 		cb_fn(cb_arg, -ENOMEM);
1045 		return;
1046 	}
1047 
1048 	req = alloc_fs_request(fs->md_target.md_fs_channel);
1049 	if (req == NULL) {
1050 		cb_fn(cb_arg, -ENOMEM);
1051 		return;
1052 	}
1053 
1054 	args = &req->args;
1055 	args->file = file;
1056 	args->fn.file_op = cb_fn;
1057 	args->arg = cb_arg;
1058 
1059 	file->name = strdup(name);
1060 	_file_build_trace_arg_name(file);
1061 	spdk_bs_create_blob(fs->bs, fs_create_blob_create_cb, args);
1062 }
1063 
1064 static void
1065 __fs_create_file_done(void *arg, int fserrno)
1066 {
1067 	struct spdk_fs_request *req = arg;
1068 	struct spdk_fs_cb_args *args = &req->args;
1069 
1070 	args->rc = fserrno;
1071 	sem_post(args->sem);
1072 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.create.name);
1073 }
1074 
1075 static void
1076 __fs_create_file(void *arg)
1077 {
1078 	struct spdk_fs_request *req = arg;
1079 	struct spdk_fs_cb_args *args = &req->args;
1080 
1081 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.create.name);
1082 	spdk_fs_create_file_async(args->fs, args->op.create.name, __fs_create_file_done, req);
1083 }
1084 
1085 int
1086 spdk_fs_create_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx, const char *name)
1087 {
1088 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
1089 	struct spdk_fs_request *req;
1090 	struct spdk_fs_cb_args *args;
1091 	int rc;
1092 
1093 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name);
1094 
1095 	req = alloc_fs_request(channel);
1096 	if (req == NULL) {
1097 		return -ENOMEM;
1098 	}
1099 
1100 	args = &req->args;
1101 	args->fs = fs;
1102 	args->op.create.name = name;
1103 	args->sem = &channel->sem;
1104 	fs->send_request(__fs_create_file, req);
1105 	sem_wait(&channel->sem);
1106 	rc = args->rc;
1107 	free_fs_request(req);
1108 
1109 	return rc;
1110 }
1111 
1112 static void
1113 fs_open_blob_done(void *ctx, struct spdk_blob *blob, int bserrno)
1114 {
1115 	struct spdk_fs_request *req = ctx;
1116 	struct spdk_fs_cb_args *args = &req->args;
1117 	struct spdk_file *f = args->file;
1118 
1119 	f->blob = blob;
1120 	while (!TAILQ_EMPTY(&f->open_requests)) {
1121 		req = TAILQ_FIRST(&f->open_requests);
1122 		args = &req->args;
1123 		TAILQ_REMOVE(&f->open_requests, req, args.op.open.tailq);
1124 		spdk_trace_record(TRACE_BLOBFS_OPEN, 0, 0, 0, f->trace_arg_name);
1125 		args->fn.file_op_with_handle(args->arg, f, bserrno);
1126 		free_fs_request(req);
1127 	}
1128 }
1129 
1130 static void
1131 fs_open_blob_create_cb(void *ctx, int bserrno)
1132 {
1133 	struct spdk_fs_request *req = ctx;
1134 	struct spdk_fs_cb_args *args = &req->args;
1135 	struct spdk_file *file = args->file;
1136 	struct spdk_filesystem *fs = args->fs;
1137 
1138 	if (file == NULL) {
1139 		/*
1140 		 * This is from an open with CREATE flag - the file
1141 		 *  is now created so look it up in the file list for this
1142 		 *  filesystem.
1143 		 */
1144 		file = fs_find_file(fs, args->op.open.name);
1145 		assert(file != NULL);
1146 		args->file = file;
1147 	}
1148 
1149 	file->ref_count++;
1150 	TAILQ_INSERT_TAIL(&file->open_requests, req, args.op.open.tailq);
1151 	if (file->ref_count == 1) {
1152 		assert(file->blob == NULL);
1153 		spdk_bs_open_blob(fs->bs, file->blobid, fs_open_blob_done, req);
1154 	} else if (file->blob != NULL) {
1155 		fs_open_blob_done(req, file->blob, 0);
1156 	} else {
1157 		/*
1158 		 * The blob open for this file is in progress due to a previous
1159 		 *  open request.  When that open completes, it will invoke the
1160 		 *  open callback for this request.
1161 		 */
1162 	}
1163 }
1164 
1165 void
1166 spdk_fs_open_file_async(struct spdk_filesystem *fs, const char *name, uint32_t flags,
1167 			spdk_file_op_with_handle_complete cb_fn, void *cb_arg)
1168 {
1169 	struct spdk_file *f = NULL;
1170 	struct spdk_fs_request *req;
1171 	struct spdk_fs_cb_args *args;
1172 
1173 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
1174 		cb_fn(cb_arg, NULL, -ENAMETOOLONG);
1175 		return;
1176 	}
1177 
1178 	f = fs_find_file(fs, name);
1179 	if (f == NULL && !(flags & SPDK_BLOBFS_OPEN_CREATE)) {
1180 		cb_fn(cb_arg, NULL, -ENOENT);
1181 		return;
1182 	}
1183 
1184 	if (f != NULL && f->is_deleted == true) {
1185 		cb_fn(cb_arg, NULL, -ENOENT);
1186 		return;
1187 	}
1188 
1189 	req = alloc_fs_request(fs->md_target.md_fs_channel);
1190 	if (req == NULL) {
1191 		cb_fn(cb_arg, NULL, -ENOMEM);
1192 		return;
1193 	}
1194 
1195 	args = &req->args;
1196 	args->fn.file_op_with_handle = cb_fn;
1197 	args->arg = cb_arg;
1198 	args->file = f;
1199 	args->fs = fs;
1200 	args->op.open.name = name;
1201 
1202 	if (f == NULL) {
1203 		spdk_fs_create_file_async(fs, name, fs_open_blob_create_cb, req);
1204 	} else {
1205 		fs_open_blob_create_cb(req, 0);
1206 	}
1207 }
1208 
1209 static void
1210 __fs_open_file_done(void *arg, struct spdk_file *file, int bserrno)
1211 {
1212 	struct spdk_fs_request *req = arg;
1213 	struct spdk_fs_cb_args *args = &req->args;
1214 
1215 	args->file = file;
1216 	__wake_caller(args, bserrno);
1217 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.open.name);
1218 }
1219 
1220 static void
1221 __fs_open_file(void *arg)
1222 {
1223 	struct spdk_fs_request *req = arg;
1224 	struct spdk_fs_cb_args *args = &req->args;
1225 
1226 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.open.name);
1227 	spdk_fs_open_file_async(args->fs, args->op.open.name, args->op.open.flags,
1228 				__fs_open_file_done, req);
1229 }
1230 
1231 int
1232 spdk_fs_open_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx,
1233 		  const char *name, uint32_t flags, struct spdk_file **file)
1234 {
1235 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
1236 	struct spdk_fs_request *req;
1237 	struct spdk_fs_cb_args *args;
1238 	int rc;
1239 
1240 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name);
1241 
1242 	req = alloc_fs_request(channel);
1243 	if (req == NULL) {
1244 		return -ENOMEM;
1245 	}
1246 
1247 	args = &req->args;
1248 	args->fs = fs;
1249 	args->op.open.name = name;
1250 	args->op.open.flags = flags;
1251 	args->sem = &channel->sem;
1252 	fs->send_request(__fs_open_file, req);
1253 	sem_wait(&channel->sem);
1254 	rc = args->rc;
1255 	if (rc == 0) {
1256 		*file = args->file;
1257 	} else {
1258 		*file = NULL;
1259 	}
1260 	free_fs_request(req);
1261 
1262 	return rc;
1263 }
1264 
1265 static void
1266 fs_rename_blob_close_cb(void *ctx, int bserrno)
1267 {
1268 	struct spdk_fs_request *req = ctx;
1269 	struct spdk_fs_cb_args *args = &req->args;
1270 
1271 	args->fn.fs_op(args->arg, bserrno);
1272 	free_fs_request(req);
1273 }
1274 
1275 static void
1276 fs_rename_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno)
1277 {
1278 	struct spdk_fs_request *req = ctx;
1279 	struct spdk_fs_cb_args *args = &req->args;
1280 	const char *new_name = args->op.rename.new_name;
1281 
1282 	spdk_blob_set_xattr(blob, "name", new_name, strlen(new_name) + 1);
1283 	spdk_blob_close(blob, fs_rename_blob_close_cb, req);
1284 }
1285 
1286 static void
1287 __spdk_fs_md_rename_file(struct spdk_fs_request *req)
1288 {
1289 	struct spdk_fs_cb_args *args = &req->args;
1290 	struct spdk_file *f;
1291 
1292 	f = fs_find_file(args->fs, args->op.rename.old_name);
1293 	if (f == NULL) {
1294 		args->fn.fs_op(args->arg, -ENOENT);
1295 		free_fs_request(req);
1296 		return;
1297 	}
1298 
1299 	free(f->name);
1300 	f->name = strdup(args->op.rename.new_name);
1301 	_file_build_trace_arg_name(f);
1302 	args->file = f;
1303 	spdk_bs_open_blob(args->fs->bs, f->blobid, fs_rename_blob_open_cb, req);
1304 }
1305 
1306 static void
1307 fs_rename_delete_done(void *arg, int fserrno)
1308 {
1309 	__spdk_fs_md_rename_file(arg);
1310 }
1311 
1312 void
1313 spdk_fs_rename_file_async(struct spdk_filesystem *fs,
1314 			  const char *old_name, const char *new_name,
1315 			  spdk_file_op_complete cb_fn, void *cb_arg)
1316 {
1317 	struct spdk_file *f;
1318 	struct spdk_fs_request *req;
1319 	struct spdk_fs_cb_args *args;
1320 
1321 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "old=%s new=%s\n", old_name, new_name);
1322 	if (strnlen(new_name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
1323 		cb_fn(cb_arg, -ENAMETOOLONG);
1324 		return;
1325 	}
1326 
1327 	req = alloc_fs_request(fs->md_target.md_fs_channel);
1328 	if (req == NULL) {
1329 		cb_fn(cb_arg, -ENOMEM);
1330 		return;
1331 	}
1332 
1333 	args = &req->args;
1334 	args->fn.fs_op = cb_fn;
1335 	args->fs = fs;
1336 	args->arg = cb_arg;
1337 	args->op.rename.old_name = old_name;
1338 	args->op.rename.new_name = new_name;
1339 
1340 	f = fs_find_file(fs, new_name);
1341 	if (f == NULL) {
1342 		__spdk_fs_md_rename_file(req);
1343 		return;
1344 	}
1345 
1346 	/*
1347 	 * The rename overwrites an existing file.  So delete the existing file, then
1348 	 *  do the actual rename.
1349 	 */
1350 	spdk_fs_delete_file_async(fs, new_name, fs_rename_delete_done, req);
1351 }
1352 
1353 static void
1354 __fs_rename_file_done(void *arg, int fserrno)
1355 {
1356 	struct spdk_fs_request *req = arg;
1357 	struct spdk_fs_cb_args *args = &req->args;
1358 
1359 	__wake_caller(args, fserrno);
1360 }
1361 
1362 static void
1363 __fs_rename_file(void *arg)
1364 {
1365 	struct spdk_fs_request *req = arg;
1366 	struct spdk_fs_cb_args *args = &req->args;
1367 
1368 	spdk_fs_rename_file_async(args->fs, args->op.rename.old_name, args->op.rename.new_name,
1369 				  __fs_rename_file_done, req);
1370 }
1371 
1372 int
1373 spdk_fs_rename_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx,
1374 		    const char *old_name, const char *new_name)
1375 {
1376 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
1377 	struct spdk_fs_request *req;
1378 	struct spdk_fs_cb_args *args;
1379 	int rc;
1380 
1381 	req = alloc_fs_request(channel);
1382 	if (req == NULL) {
1383 		return -ENOMEM;
1384 	}
1385 
1386 	args = &req->args;
1387 
1388 	args->fs = fs;
1389 	args->op.rename.old_name = old_name;
1390 	args->op.rename.new_name = new_name;
1391 	args->sem = &channel->sem;
1392 	fs->send_request(__fs_rename_file, req);
1393 	sem_wait(&channel->sem);
1394 	rc = args->rc;
1395 	free_fs_request(req);
1396 	return rc;
1397 }
1398 
1399 static void
1400 blob_delete_cb(void *ctx, int bserrno)
1401 {
1402 	struct spdk_fs_request *req = ctx;
1403 	struct spdk_fs_cb_args *args = &req->args;
1404 
1405 	args->fn.file_op(args->arg, bserrno);
1406 	free_fs_request(req);
1407 }
1408 
1409 void
1410 spdk_fs_delete_file_async(struct spdk_filesystem *fs, const char *name,
1411 			  spdk_file_op_complete cb_fn, void *cb_arg)
1412 {
1413 	struct spdk_file *f;
1414 	spdk_blob_id blobid;
1415 	struct spdk_fs_request *req;
1416 	struct spdk_fs_cb_args *args;
1417 
1418 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name);
1419 
1420 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
1421 		cb_fn(cb_arg, -ENAMETOOLONG);
1422 		return;
1423 	}
1424 
1425 	f = fs_find_file(fs, name);
1426 	if (f == NULL) {
1427 		cb_fn(cb_arg, -ENOENT);
1428 		return;
1429 	}
1430 
1431 	req = alloc_fs_request(fs->md_target.md_fs_channel);
1432 	if (req == NULL) {
1433 		cb_fn(cb_arg, -ENOMEM);
1434 		return;
1435 	}
1436 
1437 	args = &req->args;
1438 	args->fn.file_op = cb_fn;
1439 	args->arg = cb_arg;
1440 
1441 	if (f->ref_count > 0) {
1442 		/* If the ref > 0, we mark the file as deleted and delete it when we close it. */
1443 		f->is_deleted = true;
1444 		spdk_blob_set_xattr(f->blob, "is_deleted", &f->is_deleted, sizeof(bool));
1445 		spdk_blob_sync_md(f->blob, blob_delete_cb, req);
1446 		return;
1447 	}
1448 
1449 	TAILQ_REMOVE(&fs->files, f, tailq);
1450 
1451 	cache_free_buffers(f);
1452 
1453 	blobid = f->blobid;
1454 
1455 	free(f->name);
1456 	free(f->tree);
1457 	free(f);
1458 
1459 	spdk_bs_delete_blob(fs->bs, blobid, blob_delete_cb, req);
1460 }
1461 
1462 static void
1463 __fs_delete_file_done(void *arg, int fserrno)
1464 {
1465 	struct spdk_fs_request *req = arg;
1466 	struct spdk_fs_cb_args *args = &req->args;
1467 
1468 	__wake_caller(args, fserrno);
1469 }
1470 
1471 static void
1472 __fs_delete_file(void *arg)
1473 {
1474 	struct spdk_fs_request *req = arg;
1475 	struct spdk_fs_cb_args *args = &req->args;
1476 
1477 	spdk_fs_delete_file_async(args->fs, args->op.delete.name, __fs_delete_file_done, req);
1478 }
1479 
1480 int
1481 spdk_fs_delete_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx,
1482 		    const char *name)
1483 {
1484 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
1485 	struct spdk_fs_request *req;
1486 	struct spdk_fs_cb_args *args;
1487 	int rc;
1488 
1489 	req = alloc_fs_request(channel);
1490 	if (req == NULL) {
1491 		return -ENOMEM;
1492 	}
1493 
1494 	args = &req->args;
1495 	args->fs = fs;
1496 	args->op.delete.name = name;
1497 	args->sem = &channel->sem;
1498 	fs->send_request(__fs_delete_file, req);
1499 	sem_wait(&channel->sem);
1500 	rc = args->rc;
1501 	free_fs_request(req);
1502 
1503 	return rc;
1504 }
1505 
1506 spdk_fs_iter
1507 spdk_fs_iter_first(struct spdk_filesystem *fs)
1508 {
1509 	struct spdk_file *f;
1510 
1511 	f = TAILQ_FIRST(&fs->files);
1512 	return f;
1513 }
1514 
1515 spdk_fs_iter
1516 spdk_fs_iter_next(spdk_fs_iter iter)
1517 {
1518 	struct spdk_file *f = iter;
1519 
1520 	if (f == NULL) {
1521 		return NULL;
1522 	}
1523 
1524 	f = TAILQ_NEXT(f, tailq);
1525 	return f;
1526 }
1527 
1528 const char *
1529 spdk_file_get_name(struct spdk_file *file)
1530 {
1531 	return file->name;
1532 }
1533 
1534 uint64_t
1535 spdk_file_get_length(struct spdk_file *file)
1536 {
1537 	uint64_t length;
1538 
1539 	assert(file != NULL);
1540 
1541 	length = file->append_pos >= file->length ? file->append_pos : file->length;
1542 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s length=0x%jx\n", file->name, length);
1543 	return length;
1544 }
1545 
1546 static void
1547 fs_truncate_complete_cb(void *ctx, int bserrno)
1548 {
1549 	struct spdk_fs_request *req = ctx;
1550 	struct spdk_fs_cb_args *args = &req->args;
1551 
1552 	args->fn.file_op(args->arg, bserrno);
1553 	free_fs_request(req);
1554 }
1555 
1556 static void
1557 fs_truncate_resize_cb(void *ctx, int bserrno)
1558 {
1559 	struct spdk_fs_request *req = ctx;
1560 	struct spdk_fs_cb_args *args = &req->args;
1561 	struct spdk_file *file = args->file;
1562 	uint64_t *length = &args->op.truncate.length;
1563 
1564 	if (bserrno) {
1565 		args->fn.file_op(args->arg, bserrno);
1566 		free_fs_request(req);
1567 		return;
1568 	}
1569 
1570 	spdk_blob_set_xattr(file->blob, "length", length, sizeof(*length));
1571 
1572 	file->length = *length;
1573 	if (file->append_pos > file->length) {
1574 		file->append_pos = file->length;
1575 	}
1576 
1577 	spdk_blob_sync_md(file->blob, fs_truncate_complete_cb, req);
1578 }
1579 
/*
 * Number of clusters of size cluster_sz needed to hold length bytes,
 *  rounding up.  cluster_sz must be non-zero.
 *
 * Computed as quotient plus one for any remainder so that lengths close to
 *  UINT64_MAX do not wrap around, as length + cluster_sz - 1 would.
 */
static uint64_t
__bytes_to_clusters(uint64_t length, uint64_t cluster_sz)
{
	return length / cluster_sz + ((length % cluster_sz) != 0 ? 1 : 0);
}
1585 
1586 void
1587 spdk_file_truncate_async(struct spdk_file *file, uint64_t length,
1588 			 spdk_file_op_complete cb_fn, void *cb_arg)
1589 {
1590 	struct spdk_filesystem *fs;
1591 	size_t num_clusters;
1592 	struct spdk_fs_request *req;
1593 	struct spdk_fs_cb_args *args;
1594 
1595 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s old=0x%jx new=0x%jx\n", file->name, file->length, length);
1596 	if (length == file->length) {
1597 		cb_fn(cb_arg, 0);
1598 		return;
1599 	}
1600 
1601 	req = alloc_fs_request(file->fs->md_target.md_fs_channel);
1602 	if (req == NULL) {
1603 		cb_fn(cb_arg, -ENOMEM);
1604 		return;
1605 	}
1606 
1607 	args = &req->args;
1608 	args->fn.file_op = cb_fn;
1609 	args->arg = cb_arg;
1610 	args->file = file;
1611 	args->op.truncate.length = length;
1612 	fs = file->fs;
1613 
1614 	num_clusters = __bytes_to_clusters(length, fs->bs_opts.cluster_sz);
1615 
1616 	spdk_blob_resize(file->blob, num_clusters, fs_truncate_resize_cb, req);
1617 }
1618 
1619 static void
1620 __truncate(void *arg)
1621 {
1622 	struct spdk_fs_request *req = arg;
1623 	struct spdk_fs_cb_args *args = &req->args;
1624 
1625 	spdk_file_truncate_async(args->file, args->op.truncate.length,
1626 				 args->fn.file_op, args);
1627 }
1628 
1629 int
1630 spdk_file_truncate(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx,
1631 		   uint64_t length)
1632 {
1633 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
1634 	struct spdk_fs_request *req;
1635 	struct spdk_fs_cb_args *args;
1636 	int rc;
1637 
1638 	req = alloc_fs_request(channel);
1639 	if (req == NULL) {
1640 		return -ENOMEM;
1641 	}
1642 
1643 	args = &req->args;
1644 
1645 	args->file = file;
1646 	args->op.truncate.length = length;
1647 	args->fn.file_op = __wake_caller;
1648 	args->sem = &channel->sem;
1649 
1650 	channel->send_request(__truncate, req);
1651 	sem_wait(&channel->sem);
1652 	rc = args->rc;
1653 	free_fs_request(req);
1654 
1655 	return rc;
1656 }
1657 
1658 static void
1659 __rw_done(void *ctx, int bserrno)
1660 {
1661 	struct spdk_fs_request *req = ctx;
1662 	struct spdk_fs_cb_args *args = &req->args;
1663 
1664 	spdk_free(args->op.rw.pin_buf);
1665 	args->fn.file_op(args->arg, bserrno);
1666 	free_fs_request(req);
1667 }
1668 
1669 static void
1670 __read_done(void *ctx, int bserrno)
1671 {
1672 	struct spdk_fs_request *req = ctx;
1673 	struct spdk_fs_cb_args *args = &req->args;
1674 
1675 	assert(req != NULL);
1676 	if (args->op.rw.is_read) {
1677 		memcpy(args->iovs[0].iov_base,
1678 		       args->op.rw.pin_buf + (args->op.rw.offset & (args->op.rw.blocklen - 1)),
1679 		       args->iovs[0].iov_len);
1680 		__rw_done(req, 0);
1681 	} else {
1682 		memcpy(args->op.rw.pin_buf + (args->op.rw.offset & (args->op.rw.blocklen - 1)),
1683 		       args->iovs[0].iov_base,
1684 		       args->iovs[0].iov_len);
1685 		spdk_blob_io_write(args->file->blob, args->op.rw.channel,
1686 				   args->op.rw.pin_buf,
1687 				   args->op.rw.start_lba, args->op.rw.num_lba,
1688 				   __rw_done, req);
1689 	}
1690 }
1691 
1692 static void
1693 __do_blob_read(void *ctx, int fserrno)
1694 {
1695 	struct spdk_fs_request *req = ctx;
1696 	struct spdk_fs_cb_args *args = &req->args;
1697 
1698 	if (fserrno) {
1699 		__rw_done(req, fserrno);
1700 		return;
1701 	}
1702 	spdk_blob_io_read(args->file->blob, args->op.rw.channel,
1703 			  args->op.rw.pin_buf,
1704 			  args->op.rw.start_lba, args->op.rw.num_lba,
1705 			  __read_done, req);
1706 }
1707 
1708 static void
1709 __get_page_parameters(struct spdk_file *file, uint64_t offset, uint64_t length,
1710 		      uint64_t *start_lba, uint32_t *lba_size, uint64_t *num_lba)
1711 {
1712 	uint64_t end_lba;
1713 
1714 	*lba_size = spdk_bs_get_io_unit_size(file->fs->bs);
1715 	*start_lba = offset / *lba_size;
1716 	end_lba = (offset + length - 1) / *lba_size;
1717 	*num_lba = (end_lba - *start_lba + 1);
1718 }
1719 
1720 static void
1721 __readwrite(struct spdk_file *file, struct spdk_io_channel *_channel,
1722 	    void *payload, uint64_t offset, uint64_t length,
1723 	    spdk_file_op_complete cb_fn, void *cb_arg, int is_read)
1724 {
1725 	struct spdk_fs_request *req;
1726 	struct spdk_fs_cb_args *args;
1727 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
1728 	uint64_t start_lba, num_lba, pin_buf_length;
1729 	uint32_t lba_size;
1730 
1731 	if (is_read && offset + length > file->length) {
1732 		cb_fn(cb_arg, -EINVAL);
1733 		return;
1734 	}
1735 
1736 	req = alloc_fs_request_with_iov(channel, 1);
1737 	if (req == NULL) {
1738 		cb_fn(cb_arg, -ENOMEM);
1739 		return;
1740 	}
1741 
1742 	__get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba);
1743 
1744 	args = &req->args;
1745 	args->fn.file_op = cb_fn;
1746 	args->arg = cb_arg;
1747 	args->file = file;
1748 	args->op.rw.channel = channel->bs_channel;
1749 	args->iovs[0].iov_base = payload;
1750 	args->iovs[0].iov_len = (size_t)length;
1751 	args->op.rw.is_read = is_read;
1752 	args->op.rw.offset = offset;
1753 	args->op.rw.blocklen = lba_size;
1754 
1755 	pin_buf_length = num_lba * lba_size;
1756 	args->op.rw.pin_buf = spdk_malloc(pin_buf_length, lba_size, NULL,
1757 					  SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
1758 	if (args->op.rw.pin_buf == NULL) {
1759 		SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "Failed to allocate buf for: file=%s offset=%jx length=%jx\n",
1760 			      file->name, offset, length);
1761 		free_fs_request(req);
1762 		cb_fn(cb_arg, -ENOMEM);
1763 		return;
1764 	}
1765 
1766 	args->op.rw.start_lba = start_lba;
1767 	args->op.rw.num_lba = num_lba;
1768 
1769 	if (!is_read && file->length < offset + length) {
1770 		spdk_file_truncate_async(file, offset + length, __do_blob_read, req);
1771 	} else {
1772 		__do_blob_read(req, 0);
1773 	}
1774 }
1775 
1776 void
1777 spdk_file_write_async(struct spdk_file *file, struct spdk_io_channel *channel,
1778 		      void *payload, uint64_t offset, uint64_t length,
1779 		      spdk_file_op_complete cb_fn, void *cb_arg)
1780 {
1781 	__readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 0);
1782 }
1783 
1784 void
1785 spdk_file_read_async(struct spdk_file *file, struct spdk_io_channel *channel,
1786 		     void *payload, uint64_t offset, uint64_t length,
1787 		     spdk_file_op_complete cb_fn, void *cb_arg)
1788 {
1789 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s offset=%jx length=%jx\n",
1790 		      file->name, offset, length);
1791 	__readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 1);
1792 }
1793 
1794 struct spdk_io_channel *
1795 spdk_fs_alloc_io_channel(struct spdk_filesystem *fs)
1796 {
1797 	struct spdk_io_channel *io_channel;
1798 	struct spdk_fs_channel *fs_channel;
1799 
1800 	io_channel = spdk_get_io_channel(&fs->io_target);
1801 	fs_channel = spdk_io_channel_get_ctx(io_channel);
1802 	fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
1803 	fs_channel->send_request = __send_request_direct;
1804 
1805 	return io_channel;
1806 }
1807 
/*
 * Release an I/O channel obtained from spdk_fs_alloc_io_channel() by handing
 *  it back with spdk_put_io_channel().
 */
void
spdk_fs_free_io_channel(struct spdk_io_channel *channel)
{
	spdk_put_io_channel(channel);
}
1813 
1814 struct spdk_fs_thread_ctx *
1815 spdk_fs_alloc_thread_ctx(struct spdk_filesystem *fs)
1816 {
1817 	struct spdk_fs_thread_ctx *ctx;
1818 
1819 	ctx = calloc(1, sizeof(*ctx));
1820 	if (!ctx) {
1821 		return NULL;
1822 	}
1823 
1824 	_spdk_fs_channel_create(fs, &ctx->ch, 512);
1825 
1826 	ctx->ch.send_request = fs->send_request;
1827 	ctx->ch.sync = 1;
1828 	pthread_spin_init(&ctx->ch.lock, 0);
1829 
1830 	return ctx;
1831 }
1832 
1833 
1834 void
1835 spdk_fs_free_thread_ctx(struct spdk_fs_thread_ctx *ctx)
1836 {
1837 	assert(ctx->ch.sync == 1);
1838 
1839 	while (true) {
1840 		pthread_spin_lock(&ctx->ch.lock);
1841 		if (ctx->ch.outstanding_reqs == 0) {
1842 			pthread_spin_unlock(&ctx->ch.lock);
1843 			break;
1844 		}
1845 		pthread_spin_unlock(&ctx->ch.lock);
1846 		usleep(1000);
1847 	}
1848 
1849 	_spdk_fs_channel_destroy(NULL, &ctx->ch);
1850 	free(ctx);
1851 }
1852 
1853 void
1854 spdk_fs_set_cache_size(uint64_t size_in_mb)
1855 {
1856 	g_fs_cache_size = size_in_mb * 1024 * 1024;
1857 }
1858 
1859 uint64_t
1860 spdk_fs_get_cache_size(void)
1861 {
1862 	return g_fs_cache_size / (1024 * 1024);
1863 }
1864 
1865 static void __file_flush(void *ctx);
1866 
1867 static void *
1868 alloc_cache_memory_buffer(struct spdk_file *context)
1869 {
1870 	struct spdk_file *file;
1871 	void *buf;
1872 
1873 	buf = spdk_mempool_get(g_cache_pool);
1874 	if (buf != NULL) {
1875 		return buf;
1876 	}
1877 
1878 	pthread_spin_lock(&g_caches_lock);
1879 	TAILQ_FOREACH(file, &g_caches, cache_tailq) {
1880 		if (!file->open_for_writing &&
1881 		    file->priority == SPDK_FILE_PRIORITY_LOW &&
1882 		    file != context) {
1883 			break;
1884 		}
1885 	}
1886 	pthread_spin_unlock(&g_caches_lock);
1887 	if (file != NULL) {
1888 		cache_free_buffers(file);
1889 		buf = spdk_mempool_get(g_cache_pool);
1890 		if (buf != NULL) {
1891 			return buf;
1892 		}
1893 	}
1894 
1895 	pthread_spin_lock(&g_caches_lock);
1896 	TAILQ_FOREACH(file, &g_caches, cache_tailq) {
1897 		if (!file->open_for_writing && file != context) {
1898 			break;
1899 		}
1900 	}
1901 	pthread_spin_unlock(&g_caches_lock);
1902 	if (file != NULL) {
1903 		cache_free_buffers(file);
1904 		buf = spdk_mempool_get(g_cache_pool);
1905 		if (buf != NULL) {
1906 			return buf;
1907 		}
1908 	}
1909 
1910 	pthread_spin_lock(&g_caches_lock);
1911 	TAILQ_FOREACH(file, &g_caches, cache_tailq) {
1912 		if (file != context) {
1913 			break;
1914 		}
1915 	}
1916 	pthread_spin_unlock(&g_caches_lock);
1917 	if (file != NULL) {
1918 		cache_free_buffers(file);
1919 		buf = spdk_mempool_get(g_cache_pool);
1920 		if (buf != NULL) {
1921 			return buf;
1922 		}
1923 	}
1924 
1925 	return NULL;
1926 }
1927 
1928 static struct cache_buffer *
1929 cache_insert_buffer(struct spdk_file *file, uint64_t offset)
1930 {
1931 	struct cache_buffer *buf;
1932 	int count = 0;
1933 
1934 	buf = calloc(1, sizeof(*buf));
1935 	if (buf == NULL) {
1936 		SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "calloc failed\n");
1937 		return NULL;
1938 	}
1939 
1940 	buf->buf = alloc_cache_memory_buffer(file);
1941 	while (buf->buf == NULL) {
1942 		/*
1943 		 * TODO: alloc_cache_memory_buffer() should eventually free
1944 		 *  some buffers.  Need a more sophisticated check here, instead
1945 		 *  of just bailing if 100 tries does not result in getting a
1946 		 *  free buffer.  This will involve using the sync channel's
1947 		 *  semaphore to block until a buffer becomes available.
1948 		 */
1949 		if (count++ == 100) {
1950 			SPDK_ERRLOG("Could not allocate cache buffer for file=%p on offset=%jx\n",
1951 				    file, offset);
1952 			free(buf);
1953 			return NULL;
1954 		}
1955 		buf->buf = alloc_cache_memory_buffer(file);
1956 	}
1957 
1958 	buf->buf_size = CACHE_BUFFER_SIZE;
1959 	buf->offset = offset;
1960 
1961 	pthread_spin_lock(&g_caches_lock);
1962 	if (file->tree->present_mask == 0) {
1963 		TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq);
1964 	}
1965 	file->tree = spdk_tree_insert_buffer(file->tree, buf);
1966 	pthread_spin_unlock(&g_caches_lock);
1967 
1968 	return buf;
1969 }
1970 
1971 static struct cache_buffer *
1972 cache_append_buffer(struct spdk_file *file)
1973 {
1974 	struct cache_buffer *last;
1975 
1976 	assert(file->last == NULL || file->last->bytes_filled == file->last->buf_size);
1977 	assert((file->append_pos % CACHE_BUFFER_SIZE) == 0);
1978 
1979 	last = cache_insert_buffer(file, file->append_pos);
1980 	if (last == NULL) {
1981 		SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "cache_insert_buffer failed\n");
1982 		return NULL;
1983 	}
1984 
1985 	file->last = last;
1986 
1987 	return last;
1988 }
1989 
1990 static void __check_sync_reqs(struct spdk_file *file);
1991 
1992 static void
1993 __file_cache_finish_sync(void *ctx, int bserrno)
1994 {
1995 	struct spdk_file *file = ctx;
1996 	struct spdk_fs_request *sync_req;
1997 	struct spdk_fs_cb_args *sync_args;
1998 
1999 	pthread_spin_lock(&file->lock);
2000 	sync_req = TAILQ_FIRST(&file->sync_requests);
2001 	sync_args = &sync_req->args;
2002 	assert(sync_args->op.sync.offset <= file->length_flushed);
2003 	spdk_trace_record(TRACE_BLOBFS_XATTR_END, 0, sync_args->op.sync.offset,
2004 			  0, file->trace_arg_name);
2005 	BLOBFS_TRACE(file, "sync done offset=%jx\n", sync_args->op.sync.offset);
2006 	TAILQ_REMOVE(&file->sync_requests, sync_req, args.op.sync.tailq);
2007 	pthread_spin_unlock(&file->lock);
2008 
2009 	sync_args->fn.file_op(sync_args->arg, bserrno);
2010 	__check_sync_reqs(file);
2011 
2012 	pthread_spin_lock(&file->lock);
2013 	free_fs_request(sync_req);
2014 	pthread_spin_unlock(&file->lock);
2015 }
2016 
2017 static void
2018 __check_sync_reqs(struct spdk_file *file)
2019 {
2020 	struct spdk_fs_request *sync_req;
2021 
2022 	pthread_spin_lock(&file->lock);
2023 
2024 	TAILQ_FOREACH(sync_req, &file->sync_requests, args.op.sync.tailq) {
2025 		if (sync_req->args.op.sync.offset <= file->length_flushed) {
2026 			break;
2027 		}
2028 	}
2029 
2030 	if (sync_req != NULL && !sync_req->args.op.sync.xattr_in_progress) {
2031 		BLOBFS_TRACE(file, "set xattr length 0x%jx\n", file->length_flushed);
2032 		sync_req->args.op.sync.xattr_in_progress = true;
2033 		spdk_blob_set_xattr(file->blob, "length", &file->length_flushed,
2034 				    sizeof(file->length_flushed));
2035 
2036 		pthread_spin_unlock(&file->lock);
2037 		spdk_trace_record(TRACE_BLOBFS_XATTR_START, 0, file->length_flushed,
2038 				  0, file->trace_arg_name);
2039 		spdk_blob_sync_md(file->blob, __file_cache_finish_sync, file);
2040 	} else {
2041 		pthread_spin_unlock(&file->lock);
2042 	}
2043 }
2044 
2045 static void
2046 __file_flush_done(void *ctx, int bserrno)
2047 {
2048 	struct spdk_fs_request *req = ctx;
2049 	struct spdk_fs_cb_args *args = &req->args;
2050 	struct spdk_file *file = args->file;
2051 	struct cache_buffer *next = args->op.flush.cache_buffer;
2052 
2053 	BLOBFS_TRACE(file, "length=%jx\n", args->op.flush.length);
2054 
2055 	pthread_spin_lock(&file->lock);
2056 	next->in_progress = false;
2057 	next->bytes_flushed += args->op.flush.length;
2058 	file->length_flushed += args->op.flush.length;
2059 	if (file->length_flushed > file->length) {
2060 		file->length = file->length_flushed;
2061 	}
2062 	if (next->bytes_flushed == next->buf_size) {
2063 		BLOBFS_TRACE(file, "write buffer fully flushed 0x%jx\n", file->length_flushed);
2064 		next = spdk_tree_find_buffer(file->tree, file->length_flushed);
2065 	}
2066 
2067 	/*
2068 	 * Assert that there is no cached data that extends past the end of the underlying
2069 	 *  blob.
2070 	 */
2071 	assert(next == NULL || next->offset < __file_get_blob_size(file) ||
2072 	       next->bytes_filled == 0);
2073 
2074 	pthread_spin_unlock(&file->lock);
2075 
2076 	__check_sync_reqs(file);
2077 
2078 	__file_flush(req);
2079 }
2080 
2081 static void
2082 __file_flush(void *ctx)
2083 {
2084 	struct spdk_fs_request *req = ctx;
2085 	struct spdk_fs_cb_args *args = &req->args;
2086 	struct spdk_file *file = args->file;
2087 	struct cache_buffer *next;
2088 	uint64_t offset, length, start_lba, num_lba;
2089 	uint32_t lba_size;
2090 
2091 	pthread_spin_lock(&file->lock);
2092 	next = spdk_tree_find_buffer(file->tree, file->length_flushed);
2093 	if (next == NULL || next->in_progress) {
2094 		/*
2095 		 * There is either no data to flush, or a flush I/O is already in
2096 		 *  progress.  So return immediately - if a flush I/O is in
2097 		 *  progress we will flush more data after that is completed.
2098 		 */
2099 		free_fs_request(req);
2100 		if (next == NULL) {
2101 			/*
2102 			 * For cases where a file's cache was evicted, and then the
2103 			 *  file was later appended, we will write the data directly
2104 			 *  to disk and bypass cache.  So just update length_flushed
2105 			 *  here to reflect that all data was already written to disk.
2106 			 */
2107 			file->length_flushed = file->append_pos;
2108 		}
2109 		pthread_spin_unlock(&file->lock);
2110 		if (next == NULL) {
2111 			/*
2112 			 * There is no data to flush, but we still need to check for any
2113 			 *  outstanding sync requests to make sure metadata gets updated.
2114 			 */
2115 			__check_sync_reqs(file);
2116 		}
2117 		return;
2118 	}
2119 
2120 	offset = next->offset + next->bytes_flushed;
2121 	length = next->bytes_filled - next->bytes_flushed;
2122 	if (length == 0) {
2123 		free_fs_request(req);
2124 		pthread_spin_unlock(&file->lock);
2125 		return;
2126 	}
2127 	args->op.flush.length = length;
2128 	args->op.flush.cache_buffer = next;
2129 
2130 	__get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba);
2131 
2132 	next->in_progress = true;
2133 	BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n",
2134 		     offset, length, start_lba, num_lba);
2135 	pthread_spin_unlock(&file->lock);
2136 	spdk_blob_io_write(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel,
2137 			   next->buf + (start_lba * lba_size) - next->offset,
2138 			   start_lba, num_lba, __file_flush_done, req);
2139 }
2140 
/*
 * Completion of the metadata sync that follows a blob extend; arg is the
 *  spdk_fs_cb_args of the operation and is passed on to __wake_caller()
 *  together with bserrno.
 */
static void
__file_extend_done(void *arg, int bserrno)
{
	struct spdk_fs_cb_args *extend = arg;

	__wake_caller(extend, bserrno);
}
2148 
2149 static void
2150 __file_extend_resize_cb(void *_args, int bserrno)
2151 {
2152 	struct spdk_fs_cb_args *args = _args;
2153 	struct spdk_file *file = args->file;
2154 
2155 	if (bserrno) {
2156 		__wake_caller(args, bserrno);
2157 		return;
2158 	}
2159 
2160 	spdk_blob_sync_md(file->blob, __file_extend_done, args);
2161 }
2162 
2163 static void
2164 __file_extend_blob(void *_args)
2165 {
2166 	struct spdk_fs_cb_args *args = _args;
2167 	struct spdk_file *file = args->file;
2168 
2169 	spdk_blob_resize(file->blob, args->op.resize.num_clusters, __file_extend_resize_cb, args);
2170 }
2171 
2172 static void
2173 __rw_from_file_done(void *ctx, int bserrno)
2174 {
2175 	struct spdk_fs_request *req = ctx;
2176 
2177 	__wake_caller(&req->args, bserrno);
2178 	free_fs_request(req);
2179 }
2180 
2181 static void
2182 __rw_from_file(void *ctx)
2183 {
2184 	struct spdk_fs_request *req = ctx;
2185 	struct spdk_fs_cb_args *args = &req->args;
2186 	struct spdk_file *file = args->file;
2187 
2188 	if (args->op.rw.is_read) {
2189 		spdk_file_read_async(file, file->fs->sync_target.sync_io_channel, args->iovs[0].iov_base,
2190 				     args->op.rw.offset, (uint64_t)args->iovs[0].iov_len,
2191 				     __rw_from_file_done, req);
2192 	} else {
2193 		spdk_file_write_async(file, file->fs->sync_target.sync_io_channel, args->iovs[0].iov_base,
2194 				      args->op.rw.offset, (uint64_t)args->iovs[0].iov_len,
2195 				      __rw_from_file_done, req);
2196 	}
2197 }
2198 
2199 static int
2200 __send_rw_from_file(struct spdk_file *file, void *payload,
2201 		    uint64_t offset, uint64_t length, bool is_read,
2202 		    struct spdk_fs_channel *channel)
2203 {
2204 	struct spdk_fs_request *req;
2205 	struct spdk_fs_cb_args *args;
2206 
2207 	req = alloc_fs_request_with_iov(channel, 1);
2208 	if (req == NULL) {
2209 		sem_post(&channel->sem);
2210 		return -ENOMEM;
2211 	}
2212 
2213 	args = &req->args;
2214 	args->file = file;
2215 	args->sem = &channel->sem;
2216 	args->iovs[0].iov_base = payload;
2217 	args->iovs[0].iov_len = (size_t)length;
2218 	args->op.rw.offset = offset;
2219 	args->op.rw.is_read = is_read;
2220 	file->fs->send_request(__rw_from_file, req);
2221 	return 0;
2222 }
2223 
2224 int
2225 spdk_file_write(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx,
2226 		void *payload, uint64_t offset, uint64_t length)
2227 {
2228 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
2229 	struct spdk_fs_request *flush_req;
2230 	uint64_t rem_length, copy, blob_size, cluster_sz;
2231 	uint32_t cache_buffers_filled = 0;
2232 	uint8_t *cur_payload;
2233 	struct cache_buffer *last;
2234 
2235 	BLOBFS_TRACE_RW(file, "offset=%jx length=%jx\n", offset, length);
2236 
2237 	if (length == 0) {
2238 		return 0;
2239 	}
2240 
2241 	if (offset != file->append_pos) {
2242 		BLOBFS_TRACE(file, " error offset=%jx append_pos=%jx\n", offset, file->append_pos);
2243 		return -EINVAL;
2244 	}
2245 
2246 	pthread_spin_lock(&file->lock);
2247 	file->open_for_writing = true;
2248 
2249 	if ((file->last == NULL) && (file->append_pos % CACHE_BUFFER_SIZE == 0)) {
2250 		cache_append_buffer(file);
2251 	}
2252 
2253 	if (file->last == NULL) {
2254 		int rc;
2255 
2256 		file->append_pos += length;
2257 		pthread_spin_unlock(&file->lock);
2258 		rc = __send_rw_from_file(file, payload, offset, length, false, channel);
2259 		sem_wait(&channel->sem);
2260 		return rc;
2261 	}
2262 
2263 	blob_size = __file_get_blob_size(file);
2264 
2265 	if ((offset + length) > blob_size) {
2266 		struct spdk_fs_cb_args extend_args = {};
2267 
2268 		cluster_sz = file->fs->bs_opts.cluster_sz;
2269 		extend_args.sem = &channel->sem;
2270 		extend_args.op.resize.num_clusters = __bytes_to_clusters((offset + length), cluster_sz);
2271 		extend_args.file = file;
2272 		BLOBFS_TRACE(file, "start resize to %u clusters\n", extend_args.op.resize.num_clusters);
2273 		pthread_spin_unlock(&file->lock);
2274 		file->fs->send_request(__file_extend_blob, &extend_args);
2275 		sem_wait(&channel->sem);
2276 		if (extend_args.rc) {
2277 			return extend_args.rc;
2278 		}
2279 	}
2280 
2281 	flush_req = alloc_fs_request(channel);
2282 	if (flush_req == NULL) {
2283 		pthread_spin_unlock(&file->lock);
2284 		return -ENOMEM;
2285 	}
2286 
2287 	last = file->last;
2288 	rem_length = length;
2289 	cur_payload = payload;
2290 	while (rem_length > 0) {
2291 		copy = last->buf_size - last->bytes_filled;
2292 		if (copy > rem_length) {
2293 			copy = rem_length;
2294 		}
2295 		BLOBFS_TRACE_RW(file, "  fill offset=%jx length=%jx\n", file->append_pos, copy);
2296 		memcpy(&last->buf[last->bytes_filled], cur_payload, copy);
2297 		file->append_pos += copy;
2298 		if (file->length < file->append_pos) {
2299 			file->length = file->append_pos;
2300 		}
2301 		cur_payload += copy;
2302 		last->bytes_filled += copy;
2303 		rem_length -= copy;
2304 		if (last->bytes_filled == last->buf_size) {
2305 			cache_buffers_filled++;
2306 			last = cache_append_buffer(file);
2307 			if (last == NULL) {
2308 				BLOBFS_TRACE(file, "nomem\n");
2309 				free_fs_request(flush_req);
2310 				pthread_spin_unlock(&file->lock);
2311 				return -ENOMEM;
2312 			}
2313 		}
2314 	}
2315 
2316 	pthread_spin_unlock(&file->lock);
2317 
2318 	if (cache_buffers_filled == 0) {
2319 		free_fs_request(flush_req);
2320 		return 0;
2321 	}
2322 
2323 	flush_req->args.file = file;
2324 	file->fs->send_request(__file_flush, flush_req);
2325 	return 0;
2326 }
2327 
2328 static void
2329 __readahead_done(void *ctx, int bserrno)
2330 {
2331 	struct spdk_fs_request *req = ctx;
2332 	struct spdk_fs_cb_args *args = &req->args;
2333 	struct cache_buffer *cache_buffer = args->op.readahead.cache_buffer;
2334 	struct spdk_file *file = args->file;
2335 
2336 	BLOBFS_TRACE(file, "offset=%jx\n", cache_buffer->offset);
2337 
2338 	pthread_spin_lock(&file->lock);
2339 	cache_buffer->bytes_filled = args->op.readahead.length;
2340 	cache_buffer->bytes_flushed = args->op.readahead.length;
2341 	cache_buffer->in_progress = false;
2342 	pthread_spin_unlock(&file->lock);
2343 
2344 	free_fs_request(req);
2345 }
2346 
2347 static void
2348 __readahead(void *ctx)
2349 {
2350 	struct spdk_fs_request *req = ctx;
2351 	struct spdk_fs_cb_args *args = &req->args;
2352 	struct spdk_file *file = args->file;
2353 	uint64_t offset, length, start_lba, num_lba;
2354 	uint32_t lba_size;
2355 
2356 	offset = args->op.readahead.offset;
2357 	length = args->op.readahead.length;
2358 	assert(length > 0);
2359 
2360 	__get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba);
2361 
2362 	BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n",
2363 		     offset, length, start_lba, num_lba);
2364 	spdk_blob_io_read(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel,
2365 			  args->op.readahead.cache_buffer->buf,
2366 			  start_lba, num_lba, __readahead_done, req);
2367 }
2368 
2369 static uint64_t
2370 __next_cache_buffer_offset(uint64_t offset)
2371 {
2372 	return (offset + CACHE_BUFFER_SIZE) & ~(CACHE_TREE_LEVEL_MASK(0));
2373 }
2374 
2375 static void
2376 check_readahead(struct spdk_file *file, uint64_t offset,
2377 		struct spdk_fs_channel *channel)
2378 {
2379 	struct spdk_fs_request *req;
2380 	struct spdk_fs_cb_args *args;
2381 
2382 	offset = __next_cache_buffer_offset(offset);
2383 	if (spdk_tree_find_buffer(file->tree, offset) != NULL || file->length <= offset) {
2384 		return;
2385 	}
2386 
2387 	req = alloc_fs_request(channel);
2388 	if (req == NULL) {
2389 		return;
2390 	}
2391 	args = &req->args;
2392 
2393 	BLOBFS_TRACE(file, "offset=%jx\n", offset);
2394 
2395 	args->file = file;
2396 	args->op.readahead.offset = offset;
2397 	args->op.readahead.cache_buffer = cache_insert_buffer(file, offset);
2398 	if (!args->op.readahead.cache_buffer) {
2399 		BLOBFS_TRACE(file, "Cannot allocate buf for offset=%jx\n", offset);
2400 		free_fs_request(req);
2401 		return;
2402 	}
2403 
2404 	args->op.readahead.cache_buffer->in_progress = true;
2405 	if (file->length < (offset + CACHE_BUFFER_SIZE)) {
2406 		args->op.readahead.length = file->length & (CACHE_BUFFER_SIZE - 1);
2407 	} else {
2408 		args->op.readahead.length = CACHE_BUFFER_SIZE;
2409 	}
2410 	file->fs->send_request(__readahead, req);
2411 }
2412 
2413 static int
2414 __file_read(struct spdk_file *file, void *payload, uint64_t offset, uint64_t length,
2415 	    struct spdk_fs_channel *channel)
2416 {
2417 	struct cache_buffer *buf;
2418 	int rc;
2419 
2420 	buf = spdk_tree_find_filled_buffer(file->tree, offset);
2421 	if (buf == NULL) {
2422 		pthread_spin_unlock(&file->lock);
2423 		rc = __send_rw_from_file(file, payload, offset, length, true, channel);
2424 		pthread_spin_lock(&file->lock);
2425 		return rc;
2426 	}
2427 
2428 	if ((offset + length) > (buf->offset + buf->bytes_filled)) {
2429 		length = buf->offset + buf->bytes_filled - offset;
2430 	}
2431 	BLOBFS_TRACE(file, "read %p offset=%ju length=%ju\n", payload, offset, length);
2432 	memcpy(payload, &buf->buf[offset - buf->offset], length);
2433 	if ((offset + length) % CACHE_BUFFER_SIZE == 0) {
2434 		pthread_spin_lock(&g_caches_lock);
2435 		spdk_tree_remove_buffer(file->tree, buf);
2436 		if (file->tree->present_mask == 0) {
2437 			TAILQ_REMOVE(&g_caches, file, cache_tailq);
2438 		}
2439 		pthread_spin_unlock(&g_caches_lock);
2440 	}
2441 
2442 	sem_post(&channel->sem);
2443 	return 0;
2444 }
2445 
2446 int64_t
2447 spdk_file_read(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx,
2448 	       void *payload, uint64_t offset, uint64_t length)
2449 {
2450 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
2451 	uint64_t final_offset, final_length;
2452 	uint32_t sub_reads = 0;
2453 	int rc = 0;
2454 
2455 	pthread_spin_lock(&file->lock);
2456 
2457 	BLOBFS_TRACE_RW(file, "offset=%ju length=%ju\n", offset, length);
2458 
2459 	file->open_for_writing = false;
2460 
2461 	if (length == 0 || offset >= file->append_pos) {
2462 		pthread_spin_unlock(&file->lock);
2463 		return 0;
2464 	}
2465 
2466 	if (offset + length > file->append_pos) {
2467 		length = file->append_pos - offset;
2468 	}
2469 
2470 	if (offset != file->next_seq_offset) {
2471 		file->seq_byte_count = 0;
2472 	}
2473 	file->seq_byte_count += length;
2474 	file->next_seq_offset = offset + length;
2475 	if (file->seq_byte_count >= CACHE_READAHEAD_THRESHOLD) {
2476 		check_readahead(file, offset, channel);
2477 		check_readahead(file, offset + CACHE_BUFFER_SIZE, channel);
2478 	}
2479 
2480 	final_length = 0;
2481 	final_offset = offset + length;
2482 	while (offset < final_offset) {
2483 		length = NEXT_CACHE_BUFFER_OFFSET(offset) - offset;
2484 		if (length > (final_offset - offset)) {
2485 			length = final_offset - offset;
2486 		}
2487 		rc = __file_read(file, payload, offset, length, channel);
2488 		if (rc == 0) {
2489 			final_length += length;
2490 		} else {
2491 			break;
2492 		}
2493 		payload += length;
2494 		offset += length;
2495 		sub_reads++;
2496 	}
2497 	pthread_spin_unlock(&file->lock);
2498 	while (sub_reads-- > 0) {
2499 		sem_wait(&channel->sem);
2500 	}
2501 	if (rc == 0) {
2502 		return final_length;
2503 	} else {
2504 		return rc;
2505 	}
2506 }
2507 
2508 static void
2509 _file_sync(struct spdk_file *file, struct spdk_fs_channel *channel,
2510 	   spdk_file_op_complete cb_fn, void *cb_arg)
2511 {
2512 	struct spdk_fs_request *sync_req;
2513 	struct spdk_fs_request *flush_req;
2514 	struct spdk_fs_cb_args *sync_args;
2515 	struct spdk_fs_cb_args *flush_args;
2516 
2517 	BLOBFS_TRACE(file, "offset=%jx\n", file->append_pos);
2518 
2519 	pthread_spin_lock(&file->lock);
2520 	if (file->append_pos <= file->length_flushed) {
2521 		BLOBFS_TRACE(file, "done - no data to flush\n");
2522 		pthread_spin_unlock(&file->lock);
2523 		cb_fn(cb_arg, 0);
2524 		return;
2525 	}
2526 
2527 	sync_req = alloc_fs_request(channel);
2528 	if (!sync_req) {
2529 		pthread_spin_unlock(&file->lock);
2530 		cb_fn(cb_arg, -ENOMEM);
2531 		return;
2532 	}
2533 	sync_args = &sync_req->args;
2534 
2535 	flush_req = alloc_fs_request(channel);
2536 	if (!flush_req) {
2537 		pthread_spin_unlock(&file->lock);
2538 		cb_fn(cb_arg, -ENOMEM);
2539 		return;
2540 	}
2541 	flush_args = &flush_req->args;
2542 
2543 	sync_args->file = file;
2544 	sync_args->fn.file_op = cb_fn;
2545 	sync_args->arg = cb_arg;
2546 	sync_args->op.sync.offset = file->append_pos;
2547 	sync_args->op.sync.xattr_in_progress = false;
2548 	TAILQ_INSERT_TAIL(&file->sync_requests, sync_req, args.op.sync.tailq);
2549 	pthread_spin_unlock(&file->lock);
2550 
2551 	flush_args->file = file;
2552 	channel->send_request(__file_flush, flush_req);
2553 }
2554 
2555 int
2556 spdk_file_sync(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx)
2557 {
2558 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
2559 	struct spdk_fs_cb_args args = {};
2560 
2561 	args.sem = &channel->sem;
2562 	_file_sync(file, channel, __wake_caller, &args);
2563 	sem_wait(&channel->sem);
2564 
2565 	return args.rc;
2566 }
2567 
2568 void
2569 spdk_file_sync_async(struct spdk_file *file, struct spdk_io_channel *_channel,
2570 		     spdk_file_op_complete cb_fn, void *cb_arg)
2571 {
2572 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
2573 
2574 	_file_sync(file, channel, cb_fn, cb_arg);
2575 }
2576 
2577 void
2578 spdk_file_set_priority(struct spdk_file *file, uint32_t priority)
2579 {
2580 	BLOBFS_TRACE(file, "priority=%u\n", priority);
2581 	file->priority = priority;
2582 
2583 }
2584 
2585 /*
2586  * Close routines
2587  */
2588 
2589 static void
2590 __file_close_async_done(void *ctx, int bserrno)
2591 {
2592 	struct spdk_fs_request *req = ctx;
2593 	struct spdk_fs_cb_args *args = &req->args;
2594 	struct spdk_file *file = args->file;
2595 
2596 	spdk_trace_record(TRACE_BLOBFS_CLOSE, 0, 0, 0, file->trace_arg_name);
2597 
2598 	if (file->is_deleted) {
2599 		spdk_fs_delete_file_async(file->fs, file->name, blob_delete_cb, ctx);
2600 		return;
2601 	}
2602 
2603 	args->fn.file_op(args->arg, bserrno);
2604 	free_fs_request(req);
2605 }
2606 
2607 static void
2608 __file_close_async(struct spdk_file *file, struct spdk_fs_request *req)
2609 {
2610 	struct spdk_blob *blob;
2611 
2612 	pthread_spin_lock(&file->lock);
2613 	if (file->ref_count == 0) {
2614 		pthread_spin_unlock(&file->lock);
2615 		__file_close_async_done(req, -EBADF);
2616 		return;
2617 	}
2618 
2619 	file->ref_count--;
2620 	if (file->ref_count > 0) {
2621 		pthread_spin_unlock(&file->lock);
2622 		req->args.fn.file_op(req->args.arg, 0);
2623 		free_fs_request(req);
2624 		return;
2625 	}
2626 
2627 	pthread_spin_unlock(&file->lock);
2628 
2629 	blob = file->blob;
2630 	file->blob = NULL;
2631 	spdk_blob_close(blob, __file_close_async_done, req);
2632 }
2633 
2634 static void
2635 __file_close_async__sync_done(void *arg, int fserrno)
2636 {
2637 	struct spdk_fs_request *req = arg;
2638 	struct spdk_fs_cb_args *args = &req->args;
2639 
2640 	__file_close_async(args->file, req);
2641 }
2642 
2643 void
2644 spdk_file_close_async(struct spdk_file *file, spdk_file_op_complete cb_fn, void *cb_arg)
2645 {
2646 	struct spdk_fs_request *req;
2647 	struct spdk_fs_cb_args *args;
2648 
2649 	req = alloc_fs_request(file->fs->md_target.md_fs_channel);
2650 	if (req == NULL) {
2651 		cb_fn(cb_arg, -ENOMEM);
2652 		return;
2653 	}
2654 
2655 	args = &req->args;
2656 	args->file = file;
2657 	args->fn.file_op = cb_fn;
2658 	args->arg = cb_arg;
2659 
2660 	spdk_file_sync_async(file, file->fs->md_target.md_io_channel, __file_close_async__sync_done, req);
2661 }
2662 
2663 static void
2664 __file_close(void *arg)
2665 {
2666 	struct spdk_fs_request *req = arg;
2667 	struct spdk_fs_cb_args *args = &req->args;
2668 	struct spdk_file *file = args->file;
2669 
2670 	__file_close_async(file, req);
2671 }
2672 
2673 int
2674 spdk_file_close(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx)
2675 {
2676 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
2677 	struct spdk_fs_request *req;
2678 	struct spdk_fs_cb_args *args;
2679 
2680 	req = alloc_fs_request(channel);
2681 	if (req == NULL) {
2682 		return -ENOMEM;
2683 	}
2684 
2685 	args = &req->args;
2686 
2687 	spdk_file_sync(file, ctx);
2688 	BLOBFS_TRACE(file, "name=%s\n", file->name);
2689 	args->file = file;
2690 	args->sem = &channel->sem;
2691 	args->fn.file_op = __wake_caller;
2692 	args->arg = req;
2693 	channel->send_request(__file_close, req);
2694 	sem_wait(&channel->sem);
2695 
2696 	return args->rc;
2697 }
2698 
2699 int
2700 spdk_file_get_id(struct spdk_file *file, void *id, size_t size)
2701 {
2702 	if (size < sizeof(spdk_blob_id)) {
2703 		return -EINVAL;
2704 	}
2705 
2706 	memcpy(id, &file->blobid, sizeof(spdk_blob_id));
2707 
2708 	return sizeof(spdk_blob_id);
2709 }
2710 
2711 static void
2712 cache_free_buffers(struct spdk_file *file)
2713 {
2714 	BLOBFS_TRACE(file, "free=%s\n", file->name);
2715 	pthread_spin_lock(&file->lock);
2716 	pthread_spin_lock(&g_caches_lock);
2717 	if (file->tree->present_mask == 0) {
2718 		pthread_spin_unlock(&g_caches_lock);
2719 		pthread_spin_unlock(&file->lock);
2720 		return;
2721 	}
2722 	spdk_tree_free_buffers(file->tree);
2723 
2724 	TAILQ_REMOVE(&g_caches, file, cache_tailq);
2725 	/* If not freed, put it in the end of the queue */
2726 	if (file->tree->present_mask != 0) {
2727 		TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq);
2728 	}
2729 	file->last = NULL;
2730 	pthread_spin_unlock(&g_caches_lock);
2731 	pthread_spin_unlock(&file->lock);
2732 }
2733 
2734 SPDK_LOG_REGISTER_COMPONENT("blobfs", SPDK_LOG_BLOBFS)
2735 SPDK_LOG_REGISTER_COMPONENT("blobfs_rw", SPDK_LOG_BLOBFS_RW)
2736