xref: /spdk/lib/blobfs/blobfs.c (revision dd1c38cc680e4e8ca2642e93bf289072bff7fc3d)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 
36 #include "spdk/blobfs.h"
37 #include "spdk/conf.h"
38 #include "blobfs_internal.h"
39 
40 #include "spdk/queue.h"
41 #include "spdk/thread.h"
42 #include "spdk/assert.h"
43 #include "spdk/env.h"
44 #include "spdk/util.h"
45 #include "spdk_internal/log.h"
46 
47 #define BLOBFS_TRACE(file, str, args...) \
48 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s " str, file->name, ##args)
49 
50 #define BLOBFS_TRACE_RW(file, str, args...) \
51 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS_RW, "file=%s " str, file->name, ##args)
52 
53 #define BLOBFS_DEFAULT_CACHE_SIZE (4ULL * 1024 * 1024 * 1024)
54 #define SPDK_BLOBFS_DEFAULT_OPTS_CLUSTER_SZ (1024 * 1024)
55 
56 static uint64_t g_fs_cache_size = BLOBFS_DEFAULT_CACHE_SIZE;
57 static struct spdk_mempool *g_cache_pool;
58 static TAILQ_HEAD(, spdk_file) g_caches;
59 static int g_fs_count = 0;
60 static pthread_mutex_t g_cache_init_lock = PTHREAD_MUTEX_INITIALIZER;
61 static pthread_spinlock_t g_caches_lock;
62 
63 void
64 spdk_cache_buffer_free(struct cache_buffer *cache_buffer)
65 {
66 	spdk_mempool_put(g_cache_pool, cache_buffer->buf);
67 	free(cache_buffer);
68 }
69 
70 #define CACHE_READAHEAD_THRESHOLD	(128 * 1024)
71 
72 struct spdk_file {
73 	struct spdk_filesystem	*fs;
74 	struct spdk_blob	*blob;
75 	char			*name;
76 	uint64_t		length;
77 	bool                    is_deleted;
78 	bool			open_for_writing;
79 	uint64_t		length_flushed;
80 	uint64_t		append_pos;
81 	uint64_t		seq_byte_count;
82 	uint64_t		next_seq_offset;
83 	uint32_t		priority;
84 	TAILQ_ENTRY(spdk_file)	tailq;
85 	spdk_blob_id		blobid;
86 	uint32_t		ref_count;
87 	pthread_spinlock_t	lock;
88 	struct cache_buffer	*last;
89 	struct cache_tree	*tree;
90 	TAILQ_HEAD(open_requests_head, spdk_fs_request) open_requests;
91 	TAILQ_HEAD(sync_requests_head, spdk_fs_request) sync_requests;
92 	TAILQ_ENTRY(spdk_file)	cache_tailq;
93 };
94 
95 struct spdk_deleted_file {
96 	spdk_blob_id	id;
97 	TAILQ_ENTRY(spdk_deleted_file)	tailq;
98 };
99 
100 struct spdk_filesystem {
101 	struct spdk_blob_store	*bs;
102 	TAILQ_HEAD(, spdk_file)	files;
103 	struct spdk_bs_opts	bs_opts;
104 	struct spdk_bs_dev	*bdev;
105 	fs_send_request_fn	send_request;
106 
107 	struct {
108 		uint32_t		max_ops;
109 		struct spdk_io_channel	*sync_io_channel;
110 		struct spdk_fs_channel	*sync_fs_channel;
111 	} sync_target;
112 
113 	struct {
114 		uint32_t		max_ops;
115 		struct spdk_io_channel	*md_io_channel;
116 		struct spdk_fs_channel	*md_fs_channel;
117 	} md_target;
118 
119 	struct {
120 		uint32_t		max_ops;
121 	} io_target;
122 };
123 
124 struct spdk_fs_cb_args {
125 	union {
126 		spdk_fs_op_with_handle_complete		fs_op_with_handle;
127 		spdk_fs_op_complete			fs_op;
128 		spdk_file_op_with_handle_complete	file_op_with_handle;
129 		spdk_file_op_complete			file_op;
130 		spdk_file_stat_op_complete		stat_op;
131 	} fn;
132 	void *arg;
133 	sem_t *sem;
134 	struct spdk_filesystem *fs;
135 	struct spdk_file *file;
136 	int rc;
137 	struct iovec *iovs;
138 	uint32_t iovcnt;
139 	struct iovec iov;
140 	union {
141 		struct {
142 			TAILQ_HEAD(, spdk_deleted_file)	deleted_files;
143 		} fs_load;
144 		struct {
145 			uint64_t	length;
146 		} truncate;
147 		struct {
148 			struct spdk_io_channel	*channel;
149 			void		*pin_buf;
150 			int		is_read;
151 			off_t		offset;
152 			size_t		length;
153 			uint64_t	start_lba;
154 			uint64_t	num_lba;
155 			uint32_t	blocklen;
156 		} rw;
157 		struct {
158 			const char	*old_name;
159 			const char	*new_name;
160 		} rename;
161 		struct {
162 			struct cache_buffer	*cache_buffer;
163 			uint64_t		length;
164 		} flush;
165 		struct {
166 			struct cache_buffer	*cache_buffer;
167 			uint64_t		length;
168 			uint64_t		offset;
169 		} readahead;
170 		struct {
171 			uint64_t			offset;
172 			TAILQ_ENTRY(spdk_fs_request)	tailq;
173 			bool				xattr_in_progress;
174 		} sync;
175 		struct {
176 			uint32_t			num_clusters;
177 		} resize;
178 		struct {
179 			const char	*name;
180 			uint32_t	flags;
181 			TAILQ_ENTRY(spdk_fs_request)	tailq;
182 		} open;
183 		struct {
184 			const char		*name;
185 			struct spdk_blob	*blob;
186 		} create;
187 		struct {
188 			const char	*name;
189 		} delete;
190 		struct {
191 			const char	*name;
192 		} stat;
193 	} op;
194 };
195 
196 static void cache_free_buffers(struct spdk_file *file);
197 static void spdk_fs_io_device_unregister(struct spdk_filesystem *fs);
198 static void spdk_fs_free_io_channels(struct spdk_filesystem *fs);
199 
200 void
201 spdk_fs_opts_init(struct spdk_blobfs_opts *opts)
202 {
203 	opts->cluster_sz = SPDK_BLOBFS_DEFAULT_OPTS_CLUSTER_SZ;
204 }
205 
206 static void
207 __initialize_cache(void)
208 {
209 	assert(g_cache_pool == NULL);
210 
211 	g_cache_pool = spdk_mempool_create("spdk_fs_cache",
212 					   g_fs_cache_size / CACHE_BUFFER_SIZE,
213 					   CACHE_BUFFER_SIZE,
214 					   SPDK_MEMPOOL_DEFAULT_CACHE_SIZE,
215 					   SPDK_ENV_SOCKET_ID_ANY);
216 	if (!g_cache_pool) {
217 		SPDK_ERRLOG("Create mempool failed, you may "
218 			    "increase the memory and try again\n");
219 		assert(false);
220 	}
221 	TAILQ_INIT(&g_caches);
222 	pthread_spin_init(&g_caches_lock, 0);
223 }
224 
225 static void
226 __free_cache(void)
227 {
228 	assert(g_cache_pool != NULL);
229 
230 	spdk_mempool_free(g_cache_pool);
231 	g_cache_pool = NULL;
232 }
233 
234 static uint64_t
235 __file_get_blob_size(struct spdk_file *file)
236 {
237 	uint64_t cluster_sz;
238 
239 	cluster_sz = file->fs->bs_opts.cluster_sz;
240 	return cluster_sz * spdk_blob_get_num_clusters(file->blob);
241 }
242 
243 struct spdk_fs_request {
244 	struct spdk_fs_cb_args		args;
245 	TAILQ_ENTRY(spdk_fs_request)	link;
246 	struct spdk_fs_channel		*channel;
247 };
248 
249 struct spdk_fs_channel {
250 	struct spdk_fs_request		*req_mem;
251 	TAILQ_HEAD(, spdk_fs_request)	reqs;
252 	sem_t				sem;
253 	struct spdk_filesystem		*fs;
254 	struct spdk_io_channel		*bs_channel;
255 	fs_send_request_fn		send_request;
256 	bool				sync;
257 	uint32_t			outstanding_reqs;
258 	pthread_spinlock_t		lock;
259 };
260 
261 /* For now, this is effectively an alias. But eventually we'll shift
262  * some data members over. */
263 struct spdk_fs_thread_ctx {
264 	struct spdk_fs_channel	ch;
265 };
266 
267 static struct spdk_fs_request *
268 alloc_fs_request_with_iov(struct spdk_fs_channel *channel, uint32_t iovcnt)
269 {
270 	struct spdk_fs_request *req;
271 	struct iovec *iovs = NULL;
272 
273 	if (iovcnt > 1) {
274 		iovs = calloc(iovcnt, sizeof(struct iovec));
275 		if (!iovs) {
276 			return NULL;
277 		}
278 	}
279 
280 	if (channel->sync) {
281 		pthread_spin_lock(&channel->lock);
282 	}
283 
284 	req = TAILQ_FIRST(&channel->reqs);
285 	if (req) {
286 		channel->outstanding_reqs++;
287 		TAILQ_REMOVE(&channel->reqs, req, link);
288 	}
289 
290 	if (channel->sync) {
291 		pthread_spin_unlock(&channel->lock);
292 	}
293 
294 	if (req == NULL) {
295 		SPDK_ERRLOG("Cannot allocate req on spdk_fs_channel =%p\n", channel);
296 		free(iovs);
297 		return NULL;
298 	}
299 	memset(req, 0, sizeof(*req));
300 	req->channel = channel;
301 	if (iovcnt > 1) {
302 		req->args.iovs = iovs;
303 	} else {
304 		req->args.iovs = &req->args.iov;
305 	}
306 	req->args.iovcnt = iovcnt;
307 
308 	return req;
309 }
310 
311 static struct spdk_fs_request *
312 alloc_fs_request(struct spdk_fs_channel *channel)
313 {
314 	return alloc_fs_request_with_iov(channel, 0);
315 }
316 
317 static void
318 free_fs_request(struct spdk_fs_request *req)
319 {
320 	struct spdk_fs_channel *channel = req->channel;
321 
322 	if (req->args.iovcnt > 1) {
323 		free(req->args.iovs);
324 	}
325 
326 	if (channel->sync) {
327 		pthread_spin_lock(&channel->lock);
328 	}
329 
330 	TAILQ_INSERT_HEAD(&req->channel->reqs, req, link);
331 	channel->outstanding_reqs--;
332 
333 	if (channel->sync) {
334 		pthread_spin_unlock(&channel->lock);
335 	}
336 }
337 
338 static int
339 _spdk_fs_channel_create(struct spdk_filesystem *fs, struct spdk_fs_channel *channel,
340 			uint32_t max_ops)
341 {
342 	uint32_t i;
343 
344 	channel->req_mem = calloc(max_ops, sizeof(struct spdk_fs_request));
345 	if (!channel->req_mem) {
346 		return -1;
347 	}
348 
349 	channel->outstanding_reqs = 0;
350 	TAILQ_INIT(&channel->reqs);
351 	sem_init(&channel->sem, 0, 0);
352 
353 	for (i = 0; i < max_ops; i++) {
354 		TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link);
355 	}
356 
357 	channel->fs = fs;
358 
359 	return 0;
360 }
361 
362 static int
363 _spdk_fs_md_channel_create(void *io_device, void *ctx_buf)
364 {
365 	struct spdk_filesystem		*fs;
366 	struct spdk_fs_channel		*channel = ctx_buf;
367 
368 	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, md_target);
369 
370 	return _spdk_fs_channel_create(fs, channel, fs->md_target.max_ops);
371 }
372 
373 static int
374 _spdk_fs_sync_channel_create(void *io_device, void *ctx_buf)
375 {
376 	struct spdk_filesystem		*fs;
377 	struct spdk_fs_channel		*channel = ctx_buf;
378 
379 	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, sync_target);
380 
381 	return _spdk_fs_channel_create(fs, channel, fs->sync_target.max_ops);
382 }
383 
384 static int
385 _spdk_fs_io_channel_create(void *io_device, void *ctx_buf)
386 {
387 	struct spdk_filesystem		*fs;
388 	struct spdk_fs_channel		*channel = ctx_buf;
389 
390 	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, io_target);
391 
392 	return _spdk_fs_channel_create(fs, channel, fs->io_target.max_ops);
393 }
394 
395 static void
396 _spdk_fs_channel_destroy(void *io_device, void *ctx_buf)
397 {
398 	struct spdk_fs_channel *channel = ctx_buf;
399 
400 	if (channel->outstanding_reqs > 0) {
401 		SPDK_ERRLOG("channel freed with %" PRIu32 " outstanding requests!\n",
402 			    channel->outstanding_reqs);
403 	}
404 
405 	free(channel->req_mem);
406 	if (channel->bs_channel != NULL) {
407 		spdk_bs_free_io_channel(channel->bs_channel);
408 	}
409 }
410 
411 static void
412 __send_request_direct(fs_request_fn fn, void *arg)
413 {
414 	fn(arg);
415 }
416 
417 static void
418 common_fs_bs_init(struct spdk_filesystem *fs, struct spdk_blob_store *bs)
419 {
420 	fs->bs = bs;
421 	fs->bs_opts.cluster_sz = spdk_bs_get_cluster_size(bs);
422 	fs->md_target.md_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
423 	fs->md_target.md_fs_channel->send_request = __send_request_direct;
424 	fs->sync_target.sync_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
425 	fs->sync_target.sync_fs_channel->send_request = __send_request_direct;
426 
427 	pthread_mutex_lock(&g_cache_init_lock);
428 	if (g_fs_count == 0) {
429 		__initialize_cache();
430 	}
431 	g_fs_count++;
432 	pthread_mutex_unlock(&g_cache_init_lock);
433 }
434 
435 static void
436 init_cb(void *ctx, struct spdk_blob_store *bs, int bserrno)
437 {
438 	struct spdk_fs_request *req = ctx;
439 	struct spdk_fs_cb_args *args = &req->args;
440 	struct spdk_filesystem *fs = args->fs;
441 
442 	if (bserrno == 0) {
443 		common_fs_bs_init(fs, bs);
444 	} else {
445 		free(fs);
446 		fs = NULL;
447 	}
448 
449 	args->fn.fs_op_with_handle(args->arg, fs, bserrno);
450 	free_fs_request(req);
451 }
452 
453 static void
454 fs_conf_parse(void)
455 {
456 	struct spdk_conf_section *sp;
457 
458 	sp = spdk_conf_find_section(NULL, "Blobfs");
459 	if (sp == NULL) {
460 		g_fs_cache_buffer_shift = CACHE_BUFFER_SHIFT_DEFAULT;
461 		return;
462 	}
463 
464 	g_fs_cache_buffer_shift = spdk_conf_section_get_intval(sp, "CacheBufferShift");
465 	if (g_fs_cache_buffer_shift <= 0) {
466 		g_fs_cache_buffer_shift = CACHE_BUFFER_SHIFT_DEFAULT;
467 	}
468 }
469 
470 static struct spdk_filesystem *
471 fs_alloc(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn)
472 {
473 	struct spdk_filesystem *fs;
474 
475 	fs = calloc(1, sizeof(*fs));
476 	if (fs == NULL) {
477 		return NULL;
478 	}
479 
480 	fs->bdev = dev;
481 	fs->send_request = send_request_fn;
482 	TAILQ_INIT(&fs->files);
483 
484 	fs->md_target.max_ops = 512;
485 	spdk_io_device_register(&fs->md_target, _spdk_fs_md_channel_create, _spdk_fs_channel_destroy,
486 				sizeof(struct spdk_fs_channel), "blobfs_md");
487 	fs->md_target.md_io_channel = spdk_get_io_channel(&fs->md_target);
488 	fs->md_target.md_fs_channel = spdk_io_channel_get_ctx(fs->md_target.md_io_channel);
489 
490 	fs->sync_target.max_ops = 512;
491 	spdk_io_device_register(&fs->sync_target, _spdk_fs_sync_channel_create, _spdk_fs_channel_destroy,
492 				sizeof(struct spdk_fs_channel), "blobfs_sync");
493 	fs->sync_target.sync_io_channel = spdk_get_io_channel(&fs->sync_target);
494 	fs->sync_target.sync_fs_channel = spdk_io_channel_get_ctx(fs->sync_target.sync_io_channel);
495 
496 	fs->io_target.max_ops = 512;
497 	spdk_io_device_register(&fs->io_target, _spdk_fs_io_channel_create, _spdk_fs_channel_destroy,
498 				sizeof(struct spdk_fs_channel), "blobfs_io");
499 
500 	return fs;
501 }
502 
503 static void
504 __wake_caller(void *arg, int fserrno)
505 {
506 	struct spdk_fs_cb_args *args = arg;
507 
508 	args->rc = fserrno;
509 	sem_post(args->sem);
510 }
511 
512 void
513 spdk_fs_init(struct spdk_bs_dev *dev, struct spdk_blobfs_opts *opt,
514 	     fs_send_request_fn send_request_fn,
515 	     spdk_fs_op_with_handle_complete cb_fn, void *cb_arg)
516 {
517 	struct spdk_filesystem *fs;
518 	struct spdk_fs_request *req;
519 	struct spdk_fs_cb_args *args;
520 	struct spdk_bs_opts opts = {};
521 
522 	fs = fs_alloc(dev, send_request_fn);
523 	if (fs == NULL) {
524 		cb_fn(cb_arg, NULL, -ENOMEM);
525 		return;
526 	}
527 
528 	fs_conf_parse();
529 
530 	req = alloc_fs_request(fs->md_target.md_fs_channel);
531 	if (req == NULL) {
532 		spdk_fs_free_io_channels(fs);
533 		spdk_fs_io_device_unregister(fs);
534 		cb_fn(cb_arg, NULL, -ENOMEM);
535 		return;
536 	}
537 
538 	args = &req->args;
539 	args->fn.fs_op_with_handle = cb_fn;
540 	args->arg = cb_arg;
541 	args->fs = fs;
542 
543 	spdk_bs_opts_init(&opts);
544 	snprintf(opts.bstype.bstype, sizeof(opts.bstype.bstype), "BLOBFS");
545 	if (opt) {
546 		opts.cluster_sz = opt->cluster_sz;
547 	}
548 	spdk_bs_init(dev, &opts, init_cb, req);
549 }
550 
551 static struct spdk_file *
552 file_alloc(struct spdk_filesystem *fs)
553 {
554 	struct spdk_file *file;
555 
556 	file = calloc(1, sizeof(*file));
557 	if (file == NULL) {
558 		return NULL;
559 	}
560 
561 	file->tree = calloc(1, sizeof(*file->tree));
562 	if (file->tree == NULL) {
563 		free(file);
564 		return NULL;
565 	}
566 
567 	file->fs = fs;
568 	TAILQ_INIT(&file->open_requests);
569 	TAILQ_INIT(&file->sync_requests);
570 	pthread_spin_init(&file->lock, 0);
571 	TAILQ_INSERT_TAIL(&fs->files, file, tailq);
572 	file->priority = SPDK_FILE_PRIORITY_LOW;
573 	return file;
574 }
575 
576 static void fs_load_done(void *ctx, int bserrno);
577 
578 static int
579 _handle_deleted_files(struct spdk_fs_request *req)
580 {
581 	struct spdk_fs_cb_args *args = &req->args;
582 	struct spdk_filesystem *fs = args->fs;
583 
584 	if (!TAILQ_EMPTY(&args->op.fs_load.deleted_files)) {
585 		struct spdk_deleted_file *deleted_file;
586 
587 		deleted_file = TAILQ_FIRST(&args->op.fs_load.deleted_files);
588 		TAILQ_REMOVE(&args->op.fs_load.deleted_files, deleted_file, tailq);
589 		spdk_bs_delete_blob(fs->bs, deleted_file->id, fs_load_done, req);
590 		free(deleted_file);
591 		return 0;
592 	}
593 
594 	return 1;
595 }
596 
597 static void
598 fs_load_done(void *ctx, int bserrno)
599 {
600 	struct spdk_fs_request *req = ctx;
601 	struct spdk_fs_cb_args *args = &req->args;
602 	struct spdk_filesystem *fs = args->fs;
603 
604 	/* The filesystem has been loaded.  Now check if there are any files that
605 	 *  were marked for deletion before last unload.  Do not complete the
606 	 *  fs_load callback until all of them have been deleted on disk.
607 	 */
608 	if (_handle_deleted_files(req) == 0) {
609 		/* We found a file that's been marked for deleting but not actually
610 		 *  deleted yet.  This function will get called again once the delete
611 		 *  operation is completed.
612 		 */
613 		return;
614 	}
615 
616 	args->fn.fs_op_with_handle(args->arg, fs, 0);
617 	free_fs_request(req);
618 
619 }
620 
621 static void
622 iter_cb(void *ctx, struct spdk_blob *blob, int rc)
623 {
624 	struct spdk_fs_request *req = ctx;
625 	struct spdk_fs_cb_args *args = &req->args;
626 	struct spdk_filesystem *fs = args->fs;
627 	uint64_t *length;
628 	const char *name;
629 	uint32_t *is_deleted;
630 	size_t value_len;
631 
632 	if (rc < 0) {
633 		args->fn.fs_op_with_handle(args->arg, fs, rc);
634 		free_fs_request(req);
635 		return;
636 	}
637 
638 	rc = spdk_blob_get_xattr_value(blob, "name", (const void **)&name, &value_len);
639 	if (rc < 0) {
640 		args->fn.fs_op_with_handle(args->arg, fs, rc);
641 		free_fs_request(req);
642 		return;
643 	}
644 
645 	rc = spdk_blob_get_xattr_value(blob, "length", (const void **)&length, &value_len);
646 	if (rc < 0) {
647 		args->fn.fs_op_with_handle(args->arg, fs, rc);
648 		free_fs_request(req);
649 		return;
650 	}
651 
652 	assert(value_len == 8);
653 
654 	/* This file could be deleted last time without close it, then app crashed, so we delete it now */
655 	rc = spdk_blob_get_xattr_value(blob, "is_deleted", (const void **)&is_deleted, &value_len);
656 	if (rc < 0) {
657 		struct spdk_file *f;
658 
659 		f = file_alloc(fs);
660 		if (f == NULL) {
661 			args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM);
662 			free_fs_request(req);
663 			return;
664 		}
665 
666 		f->name = strdup(name);
667 		f->blobid = spdk_blob_get_id(blob);
668 		f->length = *length;
669 		f->length_flushed = *length;
670 		f->append_pos = *length;
671 		SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "added file %s length=%ju\n", f->name, f->length);
672 	} else {
673 		struct spdk_deleted_file *deleted_file;
674 
675 		deleted_file = calloc(1, sizeof(*deleted_file));
676 		if (deleted_file == NULL) {
677 			args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM);
678 			free_fs_request(req);
679 			return;
680 		}
681 		deleted_file->id = spdk_blob_get_id(blob);
682 		TAILQ_INSERT_TAIL(&args->op.fs_load.deleted_files, deleted_file, tailq);
683 	}
684 }
685 
686 static void
687 load_cb(void *ctx, struct spdk_blob_store *bs, int bserrno)
688 {
689 	struct spdk_fs_request *req = ctx;
690 	struct spdk_fs_cb_args *args = &req->args;
691 	struct spdk_filesystem *fs = args->fs;
692 	struct spdk_bs_type bstype;
693 	static const struct spdk_bs_type blobfs_type = {"BLOBFS"};
694 	static const struct spdk_bs_type zeros;
695 
696 	if (bserrno != 0) {
697 		args->fn.fs_op_with_handle(args->arg, NULL, bserrno);
698 		free_fs_request(req);
699 		free(fs);
700 		return;
701 	}
702 
703 	bstype = spdk_bs_get_bstype(bs);
704 
705 	if (!memcmp(&bstype, &zeros, sizeof(bstype))) {
706 		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "assigning bstype\n");
707 		spdk_bs_set_bstype(bs, blobfs_type);
708 	} else if (memcmp(&bstype, &blobfs_type, sizeof(bstype))) {
709 		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "not blobfs\n");
710 		SPDK_LOGDUMP(SPDK_LOG_BLOB, "bstype", &bstype, sizeof(bstype));
711 		args->fn.fs_op_with_handle(args->arg, NULL, bserrno);
712 		free_fs_request(req);
713 		free(fs);
714 		return;
715 	}
716 
717 	common_fs_bs_init(fs, bs);
718 	fs_load_done(req, 0);
719 }
720 
721 static void
722 spdk_fs_io_device_unregister(struct spdk_filesystem *fs)
723 {
724 	assert(fs != NULL);
725 	spdk_io_device_unregister(&fs->md_target, NULL);
726 	spdk_io_device_unregister(&fs->sync_target, NULL);
727 	spdk_io_device_unregister(&fs->io_target, NULL);
728 	free(fs);
729 }
730 
731 static void
732 spdk_fs_free_io_channels(struct spdk_filesystem *fs)
733 {
734 	assert(fs != NULL);
735 	spdk_fs_free_io_channel(fs->md_target.md_io_channel);
736 	spdk_fs_free_io_channel(fs->sync_target.sync_io_channel);
737 }
738 
739 void
740 spdk_fs_load(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn,
741 	     spdk_fs_op_with_handle_complete cb_fn, void *cb_arg)
742 {
743 	struct spdk_filesystem *fs;
744 	struct spdk_fs_cb_args *args;
745 	struct spdk_fs_request *req;
746 	struct spdk_bs_opts	bs_opts;
747 
748 	fs = fs_alloc(dev, send_request_fn);
749 	if (fs == NULL) {
750 		cb_fn(cb_arg, NULL, -ENOMEM);
751 		return;
752 	}
753 
754 	fs_conf_parse();
755 
756 	req = alloc_fs_request(fs->md_target.md_fs_channel);
757 	if (req == NULL) {
758 		spdk_fs_free_io_channels(fs);
759 		spdk_fs_io_device_unregister(fs);
760 		cb_fn(cb_arg, NULL, -ENOMEM);
761 		return;
762 	}
763 
764 	args = &req->args;
765 	args->fn.fs_op_with_handle = cb_fn;
766 	args->arg = cb_arg;
767 	args->fs = fs;
768 	TAILQ_INIT(&args->op.fs_load.deleted_files);
769 	spdk_bs_opts_init(&bs_opts);
770 	bs_opts.iter_cb_fn = iter_cb;
771 	bs_opts.iter_cb_arg = req;
772 	spdk_bs_load(dev, &bs_opts, load_cb, req);
773 }
774 
775 static void
776 unload_cb(void *ctx, int bserrno)
777 {
778 	struct spdk_fs_request *req = ctx;
779 	struct spdk_fs_cb_args *args = &req->args;
780 	struct spdk_filesystem *fs = args->fs;
781 	struct spdk_file *file, *tmp;
782 
783 	TAILQ_FOREACH_SAFE(file, &fs->files, tailq, tmp) {
784 		TAILQ_REMOVE(&fs->files, file, tailq);
785 		cache_free_buffers(file);
786 		free(file->name);
787 		free(file->tree);
788 		free(file);
789 	}
790 
791 	pthread_mutex_lock(&g_cache_init_lock);
792 	g_fs_count--;
793 	if (g_fs_count == 0) {
794 		__free_cache();
795 	}
796 	pthread_mutex_unlock(&g_cache_init_lock);
797 
798 	args->fn.fs_op(args->arg, bserrno);
799 	free(req);
800 
801 	spdk_fs_io_device_unregister(fs);
802 }
803 
804 void
805 spdk_fs_unload(struct spdk_filesystem *fs, spdk_fs_op_complete cb_fn, void *cb_arg)
806 {
807 	struct spdk_fs_request *req;
808 	struct spdk_fs_cb_args *args;
809 
810 	/*
811 	 * We must free the md_channel before unloading the blobstore, so just
812 	 *  allocate this request from the general heap.
813 	 */
814 	req = calloc(1, sizeof(*req));
815 	if (req == NULL) {
816 		cb_fn(cb_arg, -ENOMEM);
817 		return;
818 	}
819 
820 	args = &req->args;
821 	args->fn.fs_op = cb_fn;
822 	args->arg = cb_arg;
823 	args->fs = fs;
824 
825 	spdk_fs_free_io_channels(fs);
826 	spdk_bs_unload(fs->bs, unload_cb, req);
827 }
828 
829 static struct spdk_file *
830 fs_find_file(struct spdk_filesystem *fs, const char *name)
831 {
832 	struct spdk_file *file;
833 
834 	TAILQ_FOREACH(file, &fs->files, tailq) {
835 		if (!strncmp(name, file->name, SPDK_FILE_NAME_MAX)) {
836 			return file;
837 		}
838 	}
839 
840 	return NULL;
841 }
842 
843 void
844 spdk_fs_file_stat_async(struct spdk_filesystem *fs, const char *name,
845 			spdk_file_stat_op_complete cb_fn, void *cb_arg)
846 {
847 	struct spdk_file_stat stat;
848 	struct spdk_file *f = NULL;
849 
850 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
851 		cb_fn(cb_arg, NULL, -ENAMETOOLONG);
852 		return;
853 	}
854 
855 	f = fs_find_file(fs, name);
856 	if (f != NULL) {
857 		stat.blobid = f->blobid;
858 		stat.size = f->append_pos >= f->length ? f->append_pos : f->length;
859 		cb_fn(cb_arg, &stat, 0);
860 		return;
861 	}
862 
863 	cb_fn(cb_arg, NULL, -ENOENT);
864 }
865 
866 static void
867 __copy_stat(void *arg, struct spdk_file_stat *stat, int fserrno)
868 {
869 	struct spdk_fs_request *req = arg;
870 	struct spdk_fs_cb_args *args = &req->args;
871 
872 	args->rc = fserrno;
873 	if (fserrno == 0) {
874 		memcpy(args->arg, stat, sizeof(*stat));
875 	}
876 	sem_post(args->sem);
877 }
878 
879 static void
880 __file_stat(void *arg)
881 {
882 	struct spdk_fs_request *req = arg;
883 	struct spdk_fs_cb_args *args = &req->args;
884 
885 	spdk_fs_file_stat_async(args->fs, args->op.stat.name,
886 				args->fn.stat_op, req);
887 }
888 
889 int
890 spdk_fs_file_stat(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx,
891 		  const char *name, struct spdk_file_stat *stat)
892 {
893 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
894 	struct spdk_fs_request *req;
895 	int rc;
896 
897 	req = alloc_fs_request(channel);
898 	if (req == NULL) {
899 		return -ENOMEM;
900 	}
901 
902 	req->args.fs = fs;
903 	req->args.op.stat.name = name;
904 	req->args.fn.stat_op = __copy_stat;
905 	req->args.arg = stat;
906 	req->args.sem = &channel->sem;
907 	channel->send_request(__file_stat, req);
908 	sem_wait(&channel->sem);
909 
910 	rc = req->args.rc;
911 	free_fs_request(req);
912 
913 	return rc;
914 }
915 
916 static void
917 fs_create_blob_close_cb(void *ctx, int bserrno)
918 {
919 	int rc;
920 	struct spdk_fs_request *req = ctx;
921 	struct spdk_fs_cb_args *args = &req->args;
922 
923 	rc = args->rc ? args->rc : bserrno;
924 	args->fn.file_op(args->arg, rc);
925 	free_fs_request(req);
926 }
927 
928 static void
929 fs_create_blob_resize_cb(void *ctx, int bserrno)
930 {
931 	struct spdk_fs_request *req = ctx;
932 	struct spdk_fs_cb_args *args = &req->args;
933 	struct spdk_file *f = args->file;
934 	struct spdk_blob *blob = args->op.create.blob;
935 	uint64_t length = 0;
936 
937 	args->rc = bserrno;
938 	if (bserrno) {
939 		spdk_blob_close(blob, fs_create_blob_close_cb, args);
940 		return;
941 	}
942 
943 	spdk_blob_set_xattr(blob, "name", f->name, strlen(f->name) + 1);
944 	spdk_blob_set_xattr(blob, "length", &length, sizeof(length));
945 
946 	spdk_blob_close(blob, fs_create_blob_close_cb, args);
947 }
948 
949 static void
950 fs_create_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno)
951 {
952 	struct spdk_fs_request *req = ctx;
953 	struct spdk_fs_cb_args *args = &req->args;
954 
955 	if (bserrno) {
956 		args->fn.file_op(args->arg, bserrno);
957 		free_fs_request(req);
958 		return;
959 	}
960 
961 	args->op.create.blob = blob;
962 	spdk_blob_resize(blob, 1, fs_create_blob_resize_cb, req);
963 }
964 
965 static void
966 fs_create_blob_create_cb(void *ctx, spdk_blob_id blobid, int bserrno)
967 {
968 	struct spdk_fs_request *req = ctx;
969 	struct spdk_fs_cb_args *args = &req->args;
970 	struct spdk_file *f = args->file;
971 
972 	if (bserrno) {
973 		args->fn.file_op(args->arg, bserrno);
974 		free_fs_request(req);
975 		return;
976 	}
977 
978 	f->blobid = blobid;
979 	spdk_bs_open_blob(f->fs->bs, blobid, fs_create_blob_open_cb, req);
980 }
981 
982 void
983 spdk_fs_create_file_async(struct spdk_filesystem *fs, const char *name,
984 			  spdk_file_op_complete cb_fn, void *cb_arg)
985 {
986 	struct spdk_file *file;
987 	struct spdk_fs_request *req;
988 	struct spdk_fs_cb_args *args;
989 
990 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
991 		cb_fn(cb_arg, -ENAMETOOLONG);
992 		return;
993 	}
994 
995 	file = fs_find_file(fs, name);
996 	if (file != NULL) {
997 		cb_fn(cb_arg, -EEXIST);
998 		return;
999 	}
1000 
1001 	file = file_alloc(fs);
1002 	if (file == NULL) {
1003 		cb_fn(cb_arg, -ENOMEM);
1004 		return;
1005 	}
1006 
1007 	req = alloc_fs_request(fs->md_target.md_fs_channel);
1008 	if (req == NULL) {
1009 		cb_fn(cb_arg, -ENOMEM);
1010 		return;
1011 	}
1012 
1013 	args = &req->args;
1014 	args->file = file;
1015 	args->fn.file_op = cb_fn;
1016 	args->arg = cb_arg;
1017 
1018 	file->name = strdup(name);
1019 	spdk_bs_create_blob(fs->bs, fs_create_blob_create_cb, args);
1020 }
1021 
1022 static void
1023 __fs_create_file_done(void *arg, int fserrno)
1024 {
1025 	struct spdk_fs_request *req = arg;
1026 	struct spdk_fs_cb_args *args = &req->args;
1027 
1028 	args->rc = fserrno;
1029 	sem_post(args->sem);
1030 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.create.name);
1031 }
1032 
1033 static void
1034 __fs_create_file(void *arg)
1035 {
1036 	struct spdk_fs_request *req = arg;
1037 	struct spdk_fs_cb_args *args = &req->args;
1038 
1039 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.create.name);
1040 	spdk_fs_create_file_async(args->fs, args->op.create.name, __fs_create_file_done, req);
1041 }
1042 
1043 int
1044 spdk_fs_create_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx, const char *name)
1045 {
1046 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
1047 	struct spdk_fs_request *req;
1048 	struct spdk_fs_cb_args *args;
1049 	int rc;
1050 
1051 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name);
1052 
1053 	req = alloc_fs_request(channel);
1054 	if (req == NULL) {
1055 		return -ENOMEM;
1056 	}
1057 
1058 	args = &req->args;
1059 	args->fs = fs;
1060 	args->op.create.name = name;
1061 	args->sem = &channel->sem;
1062 	fs->send_request(__fs_create_file, req);
1063 	sem_wait(&channel->sem);
1064 	rc = args->rc;
1065 	free_fs_request(req);
1066 
1067 	return rc;
1068 }
1069 
1070 static void
1071 fs_open_blob_done(void *ctx, struct spdk_blob *blob, int bserrno)
1072 {
1073 	struct spdk_fs_request *req = ctx;
1074 	struct spdk_fs_cb_args *args = &req->args;
1075 	struct spdk_file *f = args->file;
1076 
1077 	f->blob = blob;
1078 	while (!TAILQ_EMPTY(&f->open_requests)) {
1079 		req = TAILQ_FIRST(&f->open_requests);
1080 		args = &req->args;
1081 		TAILQ_REMOVE(&f->open_requests, req, args.op.open.tailq);
1082 		args->fn.file_op_with_handle(args->arg, f, bserrno);
1083 		free_fs_request(req);
1084 	}
1085 }
1086 
1087 static void
1088 fs_open_blob_create_cb(void *ctx, int bserrno)
1089 {
1090 	struct spdk_fs_request *req = ctx;
1091 	struct spdk_fs_cb_args *args = &req->args;
1092 	struct spdk_file *file = args->file;
1093 	struct spdk_filesystem *fs = args->fs;
1094 
1095 	if (file == NULL) {
1096 		/*
1097 		 * This is from an open with CREATE flag - the file
1098 		 *  is now created so look it up in the file list for this
1099 		 *  filesystem.
1100 		 */
1101 		file = fs_find_file(fs, args->op.open.name);
1102 		assert(file != NULL);
1103 		args->file = file;
1104 	}
1105 
1106 	file->ref_count++;
1107 	TAILQ_INSERT_TAIL(&file->open_requests, req, args.op.open.tailq);
1108 	if (file->ref_count == 1) {
1109 		assert(file->blob == NULL);
1110 		spdk_bs_open_blob(fs->bs, file->blobid, fs_open_blob_done, req);
1111 	} else if (file->blob != NULL) {
1112 		fs_open_blob_done(req, file->blob, 0);
1113 	} else {
1114 		/*
1115 		 * The blob open for this file is in progress due to a previous
1116 		 *  open request.  When that open completes, it will invoke the
1117 		 *  open callback for this request.
1118 		 */
1119 	}
1120 }
1121 
1122 void
1123 spdk_fs_open_file_async(struct spdk_filesystem *fs, const char *name, uint32_t flags,
1124 			spdk_file_op_with_handle_complete cb_fn, void *cb_arg)
1125 {
1126 	struct spdk_file *f = NULL;
1127 	struct spdk_fs_request *req;
1128 	struct spdk_fs_cb_args *args;
1129 
1130 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
1131 		cb_fn(cb_arg, NULL, -ENAMETOOLONG);
1132 		return;
1133 	}
1134 
1135 	f = fs_find_file(fs, name);
1136 	if (f == NULL && !(flags & SPDK_BLOBFS_OPEN_CREATE)) {
1137 		cb_fn(cb_arg, NULL, -ENOENT);
1138 		return;
1139 	}
1140 
1141 	if (f != NULL && f->is_deleted == true) {
1142 		cb_fn(cb_arg, NULL, -ENOENT);
1143 		return;
1144 	}
1145 
1146 	req = alloc_fs_request(fs->md_target.md_fs_channel);
1147 	if (req == NULL) {
1148 		cb_fn(cb_arg, NULL, -ENOMEM);
1149 		return;
1150 	}
1151 
1152 	args = &req->args;
1153 	args->fn.file_op_with_handle = cb_fn;
1154 	args->arg = cb_arg;
1155 	args->file = f;
1156 	args->fs = fs;
1157 	args->op.open.name = name;
1158 
1159 	if (f == NULL) {
1160 		spdk_fs_create_file_async(fs, name, fs_open_blob_create_cb, req);
1161 	} else {
1162 		fs_open_blob_create_cb(req, 0);
1163 	}
1164 }
1165 
1166 static void
1167 __fs_open_file_done(void *arg, struct spdk_file *file, int bserrno)
1168 {
1169 	struct spdk_fs_request *req = arg;
1170 	struct spdk_fs_cb_args *args = &req->args;
1171 
1172 	args->file = file;
1173 	__wake_caller(args, bserrno);
1174 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.open.name);
1175 }
1176 
1177 static void
1178 __fs_open_file(void *arg)
1179 {
1180 	struct spdk_fs_request *req = arg;
1181 	struct spdk_fs_cb_args *args = &req->args;
1182 
1183 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.open.name);
1184 	spdk_fs_open_file_async(args->fs, args->op.open.name, args->op.open.flags,
1185 				__fs_open_file_done, req);
1186 }
1187 
1188 int
1189 spdk_fs_open_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx,
1190 		  const char *name, uint32_t flags, struct spdk_file **file)
1191 {
1192 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
1193 	struct spdk_fs_request *req;
1194 	struct spdk_fs_cb_args *args;
1195 	int rc;
1196 
1197 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name);
1198 
1199 	req = alloc_fs_request(channel);
1200 	if (req == NULL) {
1201 		return -ENOMEM;
1202 	}
1203 
1204 	args = &req->args;
1205 	args->fs = fs;
1206 	args->op.open.name = name;
1207 	args->op.open.flags = flags;
1208 	args->sem = &channel->sem;
1209 	fs->send_request(__fs_open_file, req);
1210 	sem_wait(&channel->sem);
1211 	rc = args->rc;
1212 	if (rc == 0) {
1213 		*file = args->file;
1214 	} else {
1215 		*file = NULL;
1216 	}
1217 	free_fs_request(req);
1218 
1219 	return rc;
1220 }
1221 
1222 static void
1223 fs_rename_blob_close_cb(void *ctx, int bserrno)
1224 {
1225 	struct spdk_fs_request *req = ctx;
1226 	struct spdk_fs_cb_args *args = &req->args;
1227 
1228 	args->fn.fs_op(args->arg, bserrno);
1229 	free_fs_request(req);
1230 }
1231 
1232 static void
1233 fs_rename_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno)
1234 {
1235 	struct spdk_fs_request *req = ctx;
1236 	struct spdk_fs_cb_args *args = &req->args;
1237 	const char *new_name = args->op.rename.new_name;
1238 
1239 	spdk_blob_set_xattr(blob, "name", new_name, strlen(new_name) + 1);
1240 	spdk_blob_close(blob, fs_rename_blob_close_cb, req);
1241 }
1242 
1243 static void
1244 __spdk_fs_md_rename_file(struct spdk_fs_request *req)
1245 {
1246 	struct spdk_fs_cb_args *args = &req->args;
1247 	struct spdk_file *f;
1248 
1249 	f = fs_find_file(args->fs, args->op.rename.old_name);
1250 	if (f == NULL) {
1251 		args->fn.fs_op(args->arg, -ENOENT);
1252 		free_fs_request(req);
1253 		return;
1254 	}
1255 
1256 	free(f->name);
1257 	f->name = strdup(args->op.rename.new_name);
1258 	args->file = f;
1259 	spdk_bs_open_blob(args->fs->bs, f->blobid, fs_rename_blob_open_cb, req);
1260 }
1261 
1262 static void
1263 fs_rename_delete_done(void *arg, int fserrno)
1264 {
1265 	__spdk_fs_md_rename_file(arg);
1266 }
1267 
1268 void
1269 spdk_fs_rename_file_async(struct spdk_filesystem *fs,
1270 			  const char *old_name, const char *new_name,
1271 			  spdk_file_op_complete cb_fn, void *cb_arg)
1272 {
1273 	struct spdk_file *f;
1274 	struct spdk_fs_request *req;
1275 	struct spdk_fs_cb_args *args;
1276 
1277 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "old=%s new=%s\n", old_name, new_name);
1278 	if (strnlen(new_name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
1279 		cb_fn(cb_arg, -ENAMETOOLONG);
1280 		return;
1281 	}
1282 
1283 	req = alloc_fs_request(fs->md_target.md_fs_channel);
1284 	if (req == NULL) {
1285 		cb_fn(cb_arg, -ENOMEM);
1286 		return;
1287 	}
1288 
1289 	args = &req->args;
1290 	args->fn.fs_op = cb_fn;
1291 	args->fs = fs;
1292 	args->arg = cb_arg;
1293 	args->op.rename.old_name = old_name;
1294 	args->op.rename.new_name = new_name;
1295 
1296 	f = fs_find_file(fs, new_name);
1297 	if (f == NULL) {
1298 		__spdk_fs_md_rename_file(req);
1299 		return;
1300 	}
1301 
1302 	/*
1303 	 * The rename overwrites an existing file.  So delete the existing file, then
1304 	 *  do the actual rename.
1305 	 */
1306 	spdk_fs_delete_file_async(fs, new_name, fs_rename_delete_done, req);
1307 }
1308 
1309 static void
1310 __fs_rename_file_done(void *arg, int fserrno)
1311 {
1312 	struct spdk_fs_request *req = arg;
1313 	struct spdk_fs_cb_args *args = &req->args;
1314 
1315 	__wake_caller(args, fserrno);
1316 }
1317 
1318 static void
1319 __fs_rename_file(void *arg)
1320 {
1321 	struct spdk_fs_request *req = arg;
1322 	struct spdk_fs_cb_args *args = &req->args;
1323 
1324 	spdk_fs_rename_file_async(args->fs, args->op.rename.old_name, args->op.rename.new_name,
1325 				  __fs_rename_file_done, req);
1326 }
1327 
1328 int
1329 spdk_fs_rename_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx,
1330 		    const char *old_name, const char *new_name)
1331 {
1332 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
1333 	struct spdk_fs_request *req;
1334 	struct spdk_fs_cb_args *args;
1335 	int rc;
1336 
1337 	req = alloc_fs_request(channel);
1338 	if (req == NULL) {
1339 		return -ENOMEM;
1340 	}
1341 
1342 	args = &req->args;
1343 
1344 	args->fs = fs;
1345 	args->op.rename.old_name = old_name;
1346 	args->op.rename.new_name = new_name;
1347 	args->sem = &channel->sem;
1348 	fs->send_request(__fs_rename_file, req);
1349 	sem_wait(&channel->sem);
1350 	rc = args->rc;
1351 	free_fs_request(req);
1352 	return rc;
1353 }
1354 
1355 static void
1356 blob_delete_cb(void *ctx, int bserrno)
1357 {
1358 	struct spdk_fs_request *req = ctx;
1359 	struct spdk_fs_cb_args *args = &req->args;
1360 
1361 	args->fn.file_op(args->arg, bserrno);
1362 	free_fs_request(req);
1363 }
1364 
1365 void
1366 spdk_fs_delete_file_async(struct spdk_filesystem *fs, const char *name,
1367 			  spdk_file_op_complete cb_fn, void *cb_arg)
1368 {
1369 	struct spdk_file *f;
1370 	spdk_blob_id blobid;
1371 	struct spdk_fs_request *req;
1372 	struct spdk_fs_cb_args *args;
1373 
1374 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name);
1375 
1376 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
1377 		cb_fn(cb_arg, -ENAMETOOLONG);
1378 		return;
1379 	}
1380 
1381 	f = fs_find_file(fs, name);
1382 	if (f == NULL) {
1383 		cb_fn(cb_arg, -ENOENT);
1384 		return;
1385 	}
1386 
1387 	req = alloc_fs_request(fs->md_target.md_fs_channel);
1388 	if (req == NULL) {
1389 		cb_fn(cb_arg, -ENOMEM);
1390 		return;
1391 	}
1392 
1393 	args = &req->args;
1394 	args->fn.file_op = cb_fn;
1395 	args->arg = cb_arg;
1396 
1397 	if (f->ref_count > 0) {
1398 		/* If the ref > 0, we mark the file as deleted and delete it when we close it. */
1399 		f->is_deleted = true;
1400 		spdk_blob_set_xattr(f->blob, "is_deleted", &f->is_deleted, sizeof(bool));
1401 		spdk_blob_sync_md(f->blob, blob_delete_cb, req);
1402 		return;
1403 	}
1404 
1405 	TAILQ_REMOVE(&fs->files, f, tailq);
1406 
1407 	cache_free_buffers(f);
1408 
1409 	blobid = f->blobid;
1410 
1411 	free(f->name);
1412 	free(f->tree);
1413 	free(f);
1414 
1415 	spdk_bs_delete_blob(fs->bs, blobid, blob_delete_cb, req);
1416 }
1417 
1418 static void
1419 __fs_delete_file_done(void *arg, int fserrno)
1420 {
1421 	struct spdk_fs_request *req = arg;
1422 	struct spdk_fs_cb_args *args = &req->args;
1423 
1424 	__wake_caller(args, fserrno);
1425 }
1426 
1427 static void
1428 __fs_delete_file(void *arg)
1429 {
1430 	struct spdk_fs_request *req = arg;
1431 	struct spdk_fs_cb_args *args = &req->args;
1432 
1433 	spdk_fs_delete_file_async(args->fs, args->op.delete.name, __fs_delete_file_done, req);
1434 }
1435 
1436 int
1437 spdk_fs_delete_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx,
1438 		    const char *name)
1439 {
1440 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
1441 	struct spdk_fs_request *req;
1442 	struct spdk_fs_cb_args *args;
1443 	int rc;
1444 
1445 	req = alloc_fs_request(channel);
1446 	if (req == NULL) {
1447 		return -ENOMEM;
1448 	}
1449 
1450 	args = &req->args;
1451 	args->fs = fs;
1452 	args->op.delete.name = name;
1453 	args->sem = &channel->sem;
1454 	fs->send_request(__fs_delete_file, req);
1455 	sem_wait(&channel->sem);
1456 	rc = args->rc;
1457 	free_fs_request(req);
1458 
1459 	return rc;
1460 }
1461 
1462 spdk_fs_iter
1463 spdk_fs_iter_first(struct spdk_filesystem *fs)
1464 {
1465 	struct spdk_file *f;
1466 
1467 	f = TAILQ_FIRST(&fs->files);
1468 	return f;
1469 }
1470 
1471 spdk_fs_iter
1472 spdk_fs_iter_next(spdk_fs_iter iter)
1473 {
1474 	struct spdk_file *f = iter;
1475 
1476 	if (f == NULL) {
1477 		return NULL;
1478 	}
1479 
1480 	f = TAILQ_NEXT(f, tailq);
1481 	return f;
1482 }
1483 
1484 const char *
1485 spdk_file_get_name(struct spdk_file *file)
1486 {
1487 	return file->name;
1488 }
1489 
1490 uint64_t
1491 spdk_file_get_length(struct spdk_file *file)
1492 {
1493 	uint64_t length;
1494 
1495 	assert(file != NULL);
1496 
1497 	length = file->append_pos >= file->length ? file->append_pos : file->length;
1498 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s length=0x%jx\n", file->name, length);
1499 	return length;
1500 }
1501 
1502 static void
1503 fs_truncate_complete_cb(void *ctx, int bserrno)
1504 {
1505 	struct spdk_fs_request *req = ctx;
1506 	struct spdk_fs_cb_args *args = &req->args;
1507 
1508 	args->fn.file_op(args->arg, bserrno);
1509 	free_fs_request(req);
1510 }
1511 
1512 static void
1513 fs_truncate_resize_cb(void *ctx, int bserrno)
1514 {
1515 	struct spdk_fs_request *req = ctx;
1516 	struct spdk_fs_cb_args *args = &req->args;
1517 	struct spdk_file *file = args->file;
1518 	uint64_t *length = &args->op.truncate.length;
1519 
1520 	if (bserrno) {
1521 		args->fn.file_op(args->arg, bserrno);
1522 		free_fs_request(req);
1523 		return;
1524 	}
1525 
1526 	spdk_blob_set_xattr(file->blob, "length", length, sizeof(*length));
1527 
1528 	file->length = *length;
1529 	if (file->append_pos > file->length) {
1530 		file->append_pos = file->length;
1531 	}
1532 
1533 	spdk_blob_sync_md(file->blob, fs_truncate_complete_cb, req);
1534 }
1535 
1536 static uint64_t
1537 __bytes_to_clusters(uint64_t length, uint64_t cluster_sz)
1538 {
1539 	return (length + cluster_sz - 1) / cluster_sz;
1540 }
1541 
1542 void
1543 spdk_file_truncate_async(struct spdk_file *file, uint64_t length,
1544 			 spdk_file_op_complete cb_fn, void *cb_arg)
1545 {
1546 	struct spdk_filesystem *fs;
1547 	size_t num_clusters;
1548 	struct spdk_fs_request *req;
1549 	struct spdk_fs_cb_args *args;
1550 
1551 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s old=0x%jx new=0x%jx\n", file->name, file->length, length);
1552 	if (length == file->length) {
1553 		cb_fn(cb_arg, 0);
1554 		return;
1555 	}
1556 
1557 	req = alloc_fs_request(file->fs->md_target.md_fs_channel);
1558 	if (req == NULL) {
1559 		cb_fn(cb_arg, -ENOMEM);
1560 		return;
1561 	}
1562 
1563 	args = &req->args;
1564 	args->fn.file_op = cb_fn;
1565 	args->arg = cb_arg;
1566 	args->file = file;
1567 	args->op.truncate.length = length;
1568 	fs = file->fs;
1569 
1570 	num_clusters = __bytes_to_clusters(length, fs->bs_opts.cluster_sz);
1571 
1572 	spdk_blob_resize(file->blob, num_clusters, fs_truncate_resize_cb, req);
1573 }
1574 
1575 static void
1576 __truncate(void *arg)
1577 {
1578 	struct spdk_fs_request *req = arg;
1579 	struct spdk_fs_cb_args *args = &req->args;
1580 
1581 	spdk_file_truncate_async(args->file, args->op.truncate.length,
1582 				 args->fn.file_op, args);
1583 }
1584 
1585 int
1586 spdk_file_truncate(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx,
1587 		   uint64_t length)
1588 {
1589 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
1590 	struct spdk_fs_request *req;
1591 	struct spdk_fs_cb_args *args;
1592 	int rc;
1593 
1594 	req = alloc_fs_request(channel);
1595 	if (req == NULL) {
1596 		return -ENOMEM;
1597 	}
1598 
1599 	args = &req->args;
1600 
1601 	args->file = file;
1602 	args->op.truncate.length = length;
1603 	args->fn.file_op = __wake_caller;
1604 	args->sem = &channel->sem;
1605 
1606 	channel->send_request(__truncate, req);
1607 	sem_wait(&channel->sem);
1608 	rc = args->rc;
1609 	free_fs_request(req);
1610 
1611 	return rc;
1612 }
1613 
1614 static void
1615 __rw_done(void *ctx, int bserrno)
1616 {
1617 	struct spdk_fs_request *req = ctx;
1618 	struct spdk_fs_cb_args *args = &req->args;
1619 
1620 	spdk_free(args->op.rw.pin_buf);
1621 	args->fn.file_op(args->arg, bserrno);
1622 	free_fs_request(req);
1623 }
1624 
1625 static void
1626 __read_done(void *ctx, int bserrno)
1627 {
1628 	struct spdk_fs_request *req = ctx;
1629 	struct spdk_fs_cb_args *args = &req->args;
1630 
1631 	assert(req != NULL);
1632 	if (args->op.rw.is_read) {
1633 		memcpy(args->iovs[0].iov_base,
1634 		       args->op.rw.pin_buf + (args->op.rw.offset & (args->op.rw.blocklen - 1)),
1635 		       args->iovs[0].iov_len);
1636 		__rw_done(req, 0);
1637 	} else {
1638 		memcpy(args->op.rw.pin_buf + (args->op.rw.offset & (args->op.rw.blocklen - 1)),
1639 		       args->iovs[0].iov_base,
1640 		       args->iovs[0].iov_len);
1641 		spdk_blob_io_write(args->file->blob, args->op.rw.channel,
1642 				   args->op.rw.pin_buf,
1643 				   args->op.rw.start_lba, args->op.rw.num_lba,
1644 				   __rw_done, req);
1645 	}
1646 }
1647 
1648 static void
1649 __do_blob_read(void *ctx, int fserrno)
1650 {
1651 	struct spdk_fs_request *req = ctx;
1652 	struct spdk_fs_cb_args *args = &req->args;
1653 
1654 	if (fserrno) {
1655 		__rw_done(req, fserrno);
1656 		return;
1657 	}
1658 	spdk_blob_io_read(args->file->blob, args->op.rw.channel,
1659 			  args->op.rw.pin_buf,
1660 			  args->op.rw.start_lba, args->op.rw.num_lba,
1661 			  __read_done, req);
1662 }
1663 
1664 static void
1665 __get_page_parameters(struct spdk_file *file, uint64_t offset, uint64_t length,
1666 		      uint64_t *start_lba, uint32_t *lba_size, uint64_t *num_lba)
1667 {
1668 	uint64_t end_lba;
1669 
1670 	*lba_size = spdk_bs_get_io_unit_size(file->fs->bs);
1671 	*start_lba = offset / *lba_size;
1672 	end_lba = (offset + length - 1) / *lba_size;
1673 	*num_lba = (end_lba - *start_lba + 1);
1674 }
1675 
1676 static void
1677 __readwrite(struct spdk_file *file, struct spdk_io_channel *_channel,
1678 	    void *payload, uint64_t offset, uint64_t length,
1679 	    spdk_file_op_complete cb_fn, void *cb_arg, int is_read)
1680 {
1681 	struct spdk_fs_request *req;
1682 	struct spdk_fs_cb_args *args;
1683 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
1684 	uint64_t start_lba, num_lba, pin_buf_length;
1685 	uint32_t lba_size;
1686 
1687 	if (is_read && offset + length > file->length) {
1688 		cb_fn(cb_arg, -EINVAL);
1689 		return;
1690 	}
1691 
1692 	req = alloc_fs_request_with_iov(channel, 1);
1693 	if (req == NULL) {
1694 		cb_fn(cb_arg, -ENOMEM);
1695 		return;
1696 	}
1697 
1698 	__get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba);
1699 
1700 	args = &req->args;
1701 	args->fn.file_op = cb_fn;
1702 	args->arg = cb_arg;
1703 	args->file = file;
1704 	args->op.rw.channel = channel->bs_channel;
1705 	args->iovs[0].iov_base = payload;
1706 	args->iovs[0].iov_len = (size_t)length;
1707 	args->op.rw.is_read = is_read;
1708 	args->op.rw.offset = offset;
1709 	args->op.rw.blocklen = lba_size;
1710 
1711 	pin_buf_length = num_lba * lba_size;
1712 	args->op.rw.pin_buf = spdk_malloc(pin_buf_length, lba_size, NULL,
1713 					  SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
1714 	if (args->op.rw.pin_buf == NULL) {
1715 		SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "Failed to allocate buf for: file=%s offset=%jx length=%jx\n",
1716 			      file->name, offset, length);
1717 		free_fs_request(req);
1718 		cb_fn(cb_arg, -ENOMEM);
1719 		return;
1720 	}
1721 
1722 	args->op.rw.start_lba = start_lba;
1723 	args->op.rw.num_lba = num_lba;
1724 
1725 	if (!is_read && file->length < offset + length) {
1726 		spdk_file_truncate_async(file, offset + length, __do_blob_read, req);
1727 	} else {
1728 		__do_blob_read(req, 0);
1729 	}
1730 }
1731 
1732 void
1733 spdk_file_write_async(struct spdk_file *file, struct spdk_io_channel *channel,
1734 		      void *payload, uint64_t offset, uint64_t length,
1735 		      spdk_file_op_complete cb_fn, void *cb_arg)
1736 {
1737 	__readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 0);
1738 }
1739 
1740 void
1741 spdk_file_read_async(struct spdk_file *file, struct spdk_io_channel *channel,
1742 		     void *payload, uint64_t offset, uint64_t length,
1743 		     spdk_file_op_complete cb_fn, void *cb_arg)
1744 {
1745 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s offset=%jx length=%jx\n",
1746 		      file->name, offset, length);
1747 	__readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 1);
1748 }
1749 
1750 struct spdk_io_channel *
1751 spdk_fs_alloc_io_channel(struct spdk_filesystem *fs)
1752 {
1753 	struct spdk_io_channel *io_channel;
1754 	struct spdk_fs_channel *fs_channel;
1755 
1756 	io_channel = spdk_get_io_channel(&fs->io_target);
1757 	fs_channel = spdk_io_channel_get_ctx(io_channel);
1758 	fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
1759 	fs_channel->send_request = __send_request_direct;
1760 
1761 	return io_channel;
1762 }
1763 
1764 void
1765 spdk_fs_free_io_channel(struct spdk_io_channel *channel)
1766 {
1767 	spdk_put_io_channel(channel);
1768 }
1769 
1770 struct spdk_fs_thread_ctx *
1771 spdk_fs_alloc_thread_ctx(struct spdk_filesystem *fs)
1772 {
1773 	struct spdk_fs_thread_ctx *ctx;
1774 
1775 	ctx = calloc(1, sizeof(*ctx));
1776 	if (!ctx) {
1777 		return NULL;
1778 	}
1779 
1780 	_spdk_fs_channel_create(fs, &ctx->ch, 512);
1781 
1782 	ctx->ch.send_request = fs->send_request;
1783 	ctx->ch.sync = 1;
1784 	pthread_spin_init(&ctx->ch.lock, 0);
1785 
1786 	return ctx;
1787 }
1788 
1789 
1790 void
1791 spdk_fs_free_thread_ctx(struct spdk_fs_thread_ctx *ctx)
1792 {
1793 	assert(ctx->ch.sync == 1);
1794 
1795 	while (true) {
1796 		pthread_spin_lock(&ctx->ch.lock);
1797 		if (ctx->ch.outstanding_reqs == 0) {
1798 			pthread_spin_unlock(&ctx->ch.lock);
1799 			break;
1800 		}
1801 		pthread_spin_unlock(&ctx->ch.lock);
1802 		usleep(1000);
1803 	}
1804 
1805 	_spdk_fs_channel_destroy(NULL, &ctx->ch);
1806 	free(ctx);
1807 }
1808 
1809 void
1810 spdk_fs_set_cache_size(uint64_t size_in_mb)
1811 {
1812 	g_fs_cache_size = size_in_mb * 1024 * 1024;
1813 }
1814 
1815 uint64_t
1816 spdk_fs_get_cache_size(void)
1817 {
1818 	return g_fs_cache_size / (1024 * 1024);
1819 }
1820 
1821 static void __file_flush(void *ctx);
1822 
1823 static void *
1824 alloc_cache_memory_buffer(struct spdk_file *context)
1825 {
1826 	struct spdk_file *file;
1827 	void *buf;
1828 
1829 	buf = spdk_mempool_get(g_cache_pool);
1830 	if (buf != NULL) {
1831 		return buf;
1832 	}
1833 
1834 	pthread_spin_lock(&g_caches_lock);
1835 	TAILQ_FOREACH(file, &g_caches, cache_tailq) {
1836 		if (!file->open_for_writing &&
1837 		    file->priority == SPDK_FILE_PRIORITY_LOW &&
1838 		    file != context) {
1839 			break;
1840 		}
1841 	}
1842 	pthread_spin_unlock(&g_caches_lock);
1843 	if (file != NULL) {
1844 		cache_free_buffers(file);
1845 		buf = spdk_mempool_get(g_cache_pool);
1846 		if (buf != NULL) {
1847 			return buf;
1848 		}
1849 	}
1850 
1851 	pthread_spin_lock(&g_caches_lock);
1852 	TAILQ_FOREACH(file, &g_caches, cache_tailq) {
1853 		if (!file->open_for_writing && file != context) {
1854 			break;
1855 		}
1856 	}
1857 	pthread_spin_unlock(&g_caches_lock);
1858 	if (file != NULL) {
1859 		cache_free_buffers(file);
1860 		buf = spdk_mempool_get(g_cache_pool);
1861 		if (buf != NULL) {
1862 			return buf;
1863 		}
1864 	}
1865 
1866 	pthread_spin_lock(&g_caches_lock);
1867 	TAILQ_FOREACH(file, &g_caches, cache_tailq) {
1868 		if (file != context) {
1869 			break;
1870 		}
1871 	}
1872 	pthread_spin_unlock(&g_caches_lock);
1873 	if (file != NULL) {
1874 		cache_free_buffers(file);
1875 		buf = spdk_mempool_get(g_cache_pool);
1876 		if (buf != NULL) {
1877 			return buf;
1878 		}
1879 	}
1880 
1881 	return NULL;
1882 }
1883 
1884 static struct cache_buffer *
1885 cache_insert_buffer(struct spdk_file *file, uint64_t offset)
1886 {
1887 	struct cache_buffer *buf;
1888 	int count = 0;
1889 
1890 	buf = calloc(1, sizeof(*buf));
1891 	if (buf == NULL) {
1892 		SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "calloc failed\n");
1893 		return NULL;
1894 	}
1895 
1896 	buf->buf = alloc_cache_memory_buffer(file);
1897 	while (buf->buf == NULL) {
1898 		/*
1899 		 * TODO: alloc_cache_memory_buffer() should eventually free
1900 		 *  some buffers.  Need a more sophisticated check here, instead
1901 		 *  of just bailing if 100 tries does not result in getting a
1902 		 *  free buffer.  This will involve using the sync channel's
1903 		 *  semaphore to block until a buffer becomes available.
1904 		 */
1905 		if (count++ == 100) {
1906 			SPDK_ERRLOG("Could not allocate cache buffer for file=%p on offset=%jx\n",
1907 				    file, offset);
1908 			free(buf);
1909 			return NULL;
1910 		}
1911 		buf->buf = alloc_cache_memory_buffer(file);
1912 	}
1913 
1914 	buf->buf_size = CACHE_BUFFER_SIZE;
1915 	buf->offset = offset;
1916 
1917 	pthread_spin_lock(&g_caches_lock);
1918 	if (file->tree->present_mask == 0) {
1919 		TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq);
1920 	}
1921 	file->tree = spdk_tree_insert_buffer(file->tree, buf);
1922 	pthread_spin_unlock(&g_caches_lock);
1923 
1924 	return buf;
1925 }
1926 
1927 static struct cache_buffer *
1928 cache_append_buffer(struct spdk_file *file)
1929 {
1930 	struct cache_buffer *last;
1931 
1932 	assert(file->last == NULL || file->last->bytes_filled == file->last->buf_size);
1933 	assert((file->append_pos % CACHE_BUFFER_SIZE) == 0);
1934 
1935 	last = cache_insert_buffer(file, file->append_pos);
1936 	if (last == NULL) {
1937 		SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "cache_insert_buffer failed\n");
1938 		return NULL;
1939 	}
1940 
1941 	file->last = last;
1942 
1943 	return last;
1944 }
1945 
1946 static void __check_sync_reqs(struct spdk_file *file);
1947 
1948 static void
1949 __file_cache_finish_sync(void *ctx, int bserrno)
1950 {
1951 	struct spdk_file *file = ctx;
1952 	struct spdk_fs_request *sync_req;
1953 	struct spdk_fs_cb_args *sync_args;
1954 
1955 	pthread_spin_lock(&file->lock);
1956 	sync_req = TAILQ_FIRST(&file->sync_requests);
1957 	sync_args = &sync_req->args;
1958 	assert(sync_args->op.sync.offset <= file->length_flushed);
1959 	BLOBFS_TRACE(file, "sync done offset=%jx\n", sync_args->op.sync.offset);
1960 	TAILQ_REMOVE(&file->sync_requests, sync_req, args.op.sync.tailq);
1961 	pthread_spin_unlock(&file->lock);
1962 
1963 	sync_args->fn.file_op(sync_args->arg, bserrno);
1964 	__check_sync_reqs(file);
1965 
1966 	pthread_spin_lock(&file->lock);
1967 	free_fs_request(sync_req);
1968 	pthread_spin_unlock(&file->lock);
1969 }
1970 
1971 static void
1972 __check_sync_reqs(struct spdk_file *file)
1973 {
1974 	struct spdk_fs_request *sync_req;
1975 
1976 	pthread_spin_lock(&file->lock);
1977 
1978 	TAILQ_FOREACH(sync_req, &file->sync_requests, args.op.sync.tailq) {
1979 		if (sync_req->args.op.sync.offset <= file->length_flushed) {
1980 			break;
1981 		}
1982 	}
1983 
1984 	if (sync_req != NULL && !sync_req->args.op.sync.xattr_in_progress) {
1985 		BLOBFS_TRACE(file, "set xattr length 0x%jx\n", file->length_flushed);
1986 		sync_req->args.op.sync.xattr_in_progress = true;
1987 		spdk_blob_set_xattr(file->blob, "length", &file->length_flushed,
1988 				    sizeof(file->length_flushed));
1989 
1990 		pthread_spin_unlock(&file->lock);
1991 		spdk_blob_sync_md(file->blob, __file_cache_finish_sync, file);
1992 	} else {
1993 		pthread_spin_unlock(&file->lock);
1994 	}
1995 }
1996 
1997 static void
1998 __file_flush_done(void *ctx, int bserrno)
1999 {
2000 	struct spdk_fs_request *req = ctx;
2001 	struct spdk_fs_cb_args *args = &req->args;
2002 	struct spdk_file *file = args->file;
2003 	struct cache_buffer *next = args->op.flush.cache_buffer;
2004 
2005 	BLOBFS_TRACE(file, "length=%jx\n", args->op.flush.length);
2006 
2007 	pthread_spin_lock(&file->lock);
2008 	next->in_progress = false;
2009 	next->bytes_flushed += args->op.flush.length;
2010 	file->length_flushed += args->op.flush.length;
2011 	if (file->length_flushed > file->length) {
2012 		file->length = file->length_flushed;
2013 	}
2014 	if (next->bytes_flushed == next->buf_size) {
2015 		BLOBFS_TRACE(file, "write buffer fully flushed 0x%jx\n", file->length_flushed);
2016 		next = spdk_tree_find_buffer(file->tree, file->length_flushed);
2017 	}
2018 
2019 	/*
2020 	 * Assert that there is no cached data that extends past the end of the underlying
2021 	 *  blob.
2022 	 */
2023 	assert(next == NULL || next->offset < __file_get_blob_size(file) ||
2024 	       next->bytes_filled == 0);
2025 
2026 	pthread_spin_unlock(&file->lock);
2027 
2028 	__check_sync_reqs(file);
2029 
2030 	__file_flush(req);
2031 }
2032 
2033 static void
2034 __file_flush(void *ctx)
2035 {
2036 	struct spdk_fs_request *req = ctx;
2037 	struct spdk_fs_cb_args *args = &req->args;
2038 	struct spdk_file *file = args->file;
2039 	struct cache_buffer *next;
2040 	uint64_t offset, length, start_lba, num_lba;
2041 	uint32_t lba_size;
2042 
2043 	pthread_spin_lock(&file->lock);
2044 	next = spdk_tree_find_buffer(file->tree, file->length_flushed);
2045 	if (next == NULL || next->in_progress) {
2046 		/*
2047 		 * There is either no data to flush, or a flush I/O is already in
2048 		 *  progress.  So return immediately - if a flush I/O is in
2049 		 *  progress we will flush more data after that is completed.
2050 		 */
2051 		free_fs_request(req);
2052 		if (next == NULL) {
2053 			/*
2054 			 * For cases where a file's cache was evicted, and then the
2055 			 *  file was later appended, we will write the data directly
2056 			 *  to disk and bypass cache.  So just update length_flushed
2057 			 *  here to reflect that all data was already written to disk.
2058 			 */
2059 			file->length_flushed = file->append_pos;
2060 		}
2061 		pthread_spin_unlock(&file->lock);
2062 		if (next == NULL) {
2063 			/*
2064 			 * There is no data to flush, but we still need to check for any
2065 			 *  outstanding sync requests to make sure metadata gets updated.
2066 			 */
2067 			__check_sync_reqs(file);
2068 		}
2069 		return;
2070 	}
2071 
2072 	offset = next->offset + next->bytes_flushed;
2073 	length = next->bytes_filled - next->bytes_flushed;
2074 	if (length == 0) {
2075 		free_fs_request(req);
2076 		pthread_spin_unlock(&file->lock);
2077 		return;
2078 	}
2079 	args->op.flush.length = length;
2080 	args->op.flush.cache_buffer = next;
2081 
2082 	__get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba);
2083 
2084 	next->in_progress = true;
2085 	BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n",
2086 		     offset, length, start_lba, num_lba);
2087 	pthread_spin_unlock(&file->lock);
2088 	spdk_blob_io_write(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel,
2089 			   next->buf + (start_lba * lba_size) - next->offset,
2090 			   start_lba, num_lba, __file_flush_done, req);
2091 }
2092 
2093 static void
2094 __file_extend_done(void *arg, int bserrno)
2095 {
2096 	struct spdk_fs_cb_args *args = arg;
2097 
2098 	__wake_caller(args, bserrno);
2099 }
2100 
2101 static void
2102 __file_extend_resize_cb(void *_args, int bserrno)
2103 {
2104 	struct spdk_fs_cb_args *args = _args;
2105 	struct spdk_file *file = args->file;
2106 
2107 	if (bserrno) {
2108 		__wake_caller(args, bserrno);
2109 		return;
2110 	}
2111 
2112 	spdk_blob_sync_md(file->blob, __file_extend_done, args);
2113 }
2114 
2115 static void
2116 __file_extend_blob(void *_args)
2117 {
2118 	struct spdk_fs_cb_args *args = _args;
2119 	struct spdk_file *file = args->file;
2120 
2121 	spdk_blob_resize(file->blob, args->op.resize.num_clusters, __file_extend_resize_cb, args);
2122 }
2123 
2124 static void
2125 __rw_from_file_done(void *ctx, int bserrno)
2126 {
2127 	struct spdk_fs_request *req = ctx;
2128 
2129 	__wake_caller(&req->args, bserrno);
2130 	free_fs_request(req);
2131 }
2132 
2133 static void
2134 __rw_from_file(void *ctx)
2135 {
2136 	struct spdk_fs_request *req = ctx;
2137 	struct spdk_fs_cb_args *args = &req->args;
2138 	struct spdk_file *file = args->file;
2139 
2140 	if (args->op.rw.is_read) {
2141 		spdk_file_read_async(file, file->fs->sync_target.sync_io_channel, args->iovs[0].iov_base,
2142 				     args->op.rw.offset, (uint64_t)args->iovs[0].iov_len,
2143 				     __rw_from_file_done, req);
2144 	} else {
2145 		spdk_file_write_async(file, file->fs->sync_target.sync_io_channel, args->iovs[0].iov_base,
2146 				      args->op.rw.offset, (uint64_t)args->iovs[0].iov_len,
2147 				      __rw_from_file_done, req);
2148 	}
2149 }
2150 
2151 static int
2152 __send_rw_from_file(struct spdk_file *file, void *payload,
2153 		    uint64_t offset, uint64_t length, bool is_read,
2154 		    struct spdk_fs_channel *channel)
2155 {
2156 	struct spdk_fs_request *req;
2157 	struct spdk_fs_cb_args *args;
2158 
2159 	req = alloc_fs_request_with_iov(channel, 1);
2160 	if (req == NULL) {
2161 		sem_post(&channel->sem);
2162 		return -ENOMEM;
2163 	}
2164 
2165 	args = &req->args;
2166 	args->file = file;
2167 	args->sem = &channel->sem;
2168 	args->iovs[0].iov_base = payload;
2169 	args->iovs[0].iov_len = (size_t)length;
2170 	args->op.rw.offset = offset;
2171 	args->op.rw.is_read = is_read;
2172 	file->fs->send_request(__rw_from_file, req);
2173 	return 0;
2174 }
2175 
2176 int
2177 spdk_file_write(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx,
2178 		void *payload, uint64_t offset, uint64_t length)
2179 {
2180 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
2181 	struct spdk_fs_request *flush_req;
2182 	uint64_t rem_length, copy, blob_size, cluster_sz;
2183 	uint32_t cache_buffers_filled = 0;
2184 	uint8_t *cur_payload;
2185 	struct cache_buffer *last;
2186 
2187 	BLOBFS_TRACE_RW(file, "offset=%jx length=%jx\n", offset, length);
2188 
2189 	if (length == 0) {
2190 		return 0;
2191 	}
2192 
2193 	if (offset != file->append_pos) {
2194 		BLOBFS_TRACE(file, " error offset=%jx append_pos=%jx\n", offset, file->append_pos);
2195 		return -EINVAL;
2196 	}
2197 
2198 	pthread_spin_lock(&file->lock);
2199 	file->open_for_writing = true;
2200 
2201 	if ((file->last == NULL) && (file->append_pos % CACHE_BUFFER_SIZE == 0)) {
2202 		cache_append_buffer(file);
2203 	}
2204 
2205 	if (file->last == NULL) {
2206 		int rc;
2207 
2208 		file->append_pos += length;
2209 		pthread_spin_unlock(&file->lock);
2210 		rc = __send_rw_from_file(file, payload, offset, length, false, channel);
2211 		sem_wait(&channel->sem);
2212 		return rc;
2213 	}
2214 
2215 	blob_size = __file_get_blob_size(file);
2216 
2217 	if ((offset + length) > blob_size) {
2218 		struct spdk_fs_cb_args extend_args = {};
2219 
2220 		cluster_sz = file->fs->bs_opts.cluster_sz;
2221 		extend_args.sem = &channel->sem;
2222 		extend_args.op.resize.num_clusters = __bytes_to_clusters((offset + length), cluster_sz);
2223 		extend_args.file = file;
2224 		BLOBFS_TRACE(file, "start resize to %u clusters\n", extend_args.op.resize.num_clusters);
2225 		pthread_spin_unlock(&file->lock);
2226 		file->fs->send_request(__file_extend_blob, &extend_args);
2227 		sem_wait(&channel->sem);
2228 		if (extend_args.rc) {
2229 			return extend_args.rc;
2230 		}
2231 	}
2232 
2233 	flush_req = alloc_fs_request(channel);
2234 	if (flush_req == NULL) {
2235 		pthread_spin_unlock(&file->lock);
2236 		return -ENOMEM;
2237 	}
2238 
2239 	last = file->last;
2240 	rem_length = length;
2241 	cur_payload = payload;
2242 	while (rem_length > 0) {
2243 		copy = last->buf_size - last->bytes_filled;
2244 		if (copy > rem_length) {
2245 			copy = rem_length;
2246 		}
2247 		BLOBFS_TRACE_RW(file, "  fill offset=%jx length=%jx\n", file->append_pos, copy);
2248 		memcpy(&last->buf[last->bytes_filled], cur_payload, copy);
2249 		file->append_pos += copy;
2250 		if (file->length < file->append_pos) {
2251 			file->length = file->append_pos;
2252 		}
2253 		cur_payload += copy;
2254 		last->bytes_filled += copy;
2255 		rem_length -= copy;
2256 		if (last->bytes_filled == last->buf_size) {
2257 			cache_buffers_filled++;
2258 			last = cache_append_buffer(file);
2259 			if (last == NULL) {
2260 				BLOBFS_TRACE(file, "nomem\n");
2261 				free_fs_request(flush_req);
2262 				pthread_spin_unlock(&file->lock);
2263 				return -ENOMEM;
2264 			}
2265 		}
2266 	}
2267 
2268 	pthread_spin_unlock(&file->lock);
2269 
2270 	if (cache_buffers_filled == 0) {
2271 		free_fs_request(flush_req);
2272 		return 0;
2273 	}
2274 
2275 	flush_req->args.file = file;
2276 	file->fs->send_request(__file_flush, flush_req);
2277 	return 0;
2278 }
2279 
2280 static void
2281 __readahead_done(void *ctx, int bserrno)
2282 {
2283 	struct spdk_fs_request *req = ctx;
2284 	struct spdk_fs_cb_args *args = &req->args;
2285 	struct cache_buffer *cache_buffer = args->op.readahead.cache_buffer;
2286 	struct spdk_file *file = args->file;
2287 
2288 	BLOBFS_TRACE(file, "offset=%jx\n", cache_buffer->offset);
2289 
2290 	pthread_spin_lock(&file->lock);
2291 	cache_buffer->bytes_filled = args->op.readahead.length;
2292 	cache_buffer->bytes_flushed = args->op.readahead.length;
2293 	cache_buffer->in_progress = false;
2294 	pthread_spin_unlock(&file->lock);
2295 
2296 	free_fs_request(req);
2297 }
2298 
2299 static void
2300 __readahead(void *ctx)
2301 {
2302 	struct spdk_fs_request *req = ctx;
2303 	struct spdk_fs_cb_args *args = &req->args;
2304 	struct spdk_file *file = args->file;
2305 	uint64_t offset, length, start_lba, num_lba;
2306 	uint32_t lba_size;
2307 
2308 	offset = args->op.readahead.offset;
2309 	length = args->op.readahead.length;
2310 	assert(length > 0);
2311 
2312 	__get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba);
2313 
2314 	BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n",
2315 		     offset, length, start_lba, num_lba);
2316 	spdk_blob_io_read(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel,
2317 			  args->op.readahead.cache_buffer->buf,
2318 			  start_lba, num_lba, __readahead_done, req);
2319 }
2320 
2321 static uint64_t
2322 __next_cache_buffer_offset(uint64_t offset)
2323 {
2324 	return (offset + CACHE_BUFFER_SIZE) & ~(CACHE_TREE_LEVEL_MASK(0));
2325 }
2326 
2327 static void
2328 check_readahead(struct spdk_file *file, uint64_t offset,
2329 		struct spdk_fs_channel *channel)
2330 {
2331 	struct spdk_fs_request *req;
2332 	struct spdk_fs_cb_args *args;
2333 
2334 	offset = __next_cache_buffer_offset(offset);
2335 	if (spdk_tree_find_buffer(file->tree, offset) != NULL || file->length <= offset) {
2336 		return;
2337 	}
2338 
2339 	req = alloc_fs_request(channel);
2340 	if (req == NULL) {
2341 		return;
2342 	}
2343 	args = &req->args;
2344 
2345 	BLOBFS_TRACE(file, "offset=%jx\n", offset);
2346 
2347 	args->file = file;
2348 	args->op.readahead.offset = offset;
2349 	args->op.readahead.cache_buffer = cache_insert_buffer(file, offset);
2350 	if (!args->op.readahead.cache_buffer) {
2351 		BLOBFS_TRACE(file, "Cannot allocate buf for offset=%jx\n", offset);
2352 		free_fs_request(req);
2353 		return;
2354 	}
2355 
2356 	args->op.readahead.cache_buffer->in_progress = true;
2357 	if (file->length < (offset + CACHE_BUFFER_SIZE)) {
2358 		args->op.readahead.length = file->length & (CACHE_BUFFER_SIZE - 1);
2359 	} else {
2360 		args->op.readahead.length = CACHE_BUFFER_SIZE;
2361 	}
2362 	file->fs->send_request(__readahead, req);
2363 }
2364 
2365 static int
2366 __file_read(struct spdk_file *file, void *payload, uint64_t offset, uint64_t length,
2367 	    struct spdk_fs_channel *channel)
2368 {
2369 	struct cache_buffer *buf;
2370 	int rc;
2371 
2372 	buf = spdk_tree_find_filled_buffer(file->tree, offset);
2373 	if (buf == NULL) {
2374 		pthread_spin_unlock(&file->lock);
2375 		rc = __send_rw_from_file(file, payload, offset, length, true, channel);
2376 		pthread_spin_lock(&file->lock);
2377 		return rc;
2378 	}
2379 
2380 	if ((offset + length) > (buf->offset + buf->bytes_filled)) {
2381 		length = buf->offset + buf->bytes_filled - offset;
2382 	}
2383 	BLOBFS_TRACE(file, "read %p offset=%ju length=%ju\n", payload, offset, length);
2384 	memcpy(payload, &buf->buf[offset - buf->offset], length);
2385 	if ((offset + length) % CACHE_BUFFER_SIZE == 0) {
2386 		pthread_spin_lock(&g_caches_lock);
2387 		spdk_tree_remove_buffer(file->tree, buf);
2388 		if (file->tree->present_mask == 0) {
2389 			TAILQ_REMOVE(&g_caches, file, cache_tailq);
2390 		}
2391 		pthread_spin_unlock(&g_caches_lock);
2392 	}
2393 
2394 	sem_post(&channel->sem);
2395 	return 0;
2396 }
2397 
2398 int64_t
2399 spdk_file_read(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx,
2400 	       void *payload, uint64_t offset, uint64_t length)
2401 {
2402 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
2403 	uint64_t final_offset, final_length;
2404 	uint32_t sub_reads = 0;
2405 	int rc = 0;
2406 
2407 	pthread_spin_lock(&file->lock);
2408 
2409 	BLOBFS_TRACE_RW(file, "offset=%ju length=%ju\n", offset, length);
2410 
2411 	file->open_for_writing = false;
2412 
2413 	if (length == 0 || offset >= file->append_pos) {
2414 		pthread_spin_unlock(&file->lock);
2415 		return 0;
2416 	}
2417 
2418 	if (offset + length > file->append_pos) {
2419 		length = file->append_pos - offset;
2420 	}
2421 
2422 	if (offset != file->next_seq_offset) {
2423 		file->seq_byte_count = 0;
2424 	}
2425 	file->seq_byte_count += length;
2426 	file->next_seq_offset = offset + length;
2427 	if (file->seq_byte_count >= CACHE_READAHEAD_THRESHOLD) {
2428 		check_readahead(file, offset, channel);
2429 		check_readahead(file, offset + CACHE_BUFFER_SIZE, channel);
2430 	}
2431 
2432 	final_length = 0;
2433 	final_offset = offset + length;
2434 	while (offset < final_offset) {
2435 		length = NEXT_CACHE_BUFFER_OFFSET(offset) - offset;
2436 		if (length > (final_offset - offset)) {
2437 			length = final_offset - offset;
2438 		}
2439 		rc = __file_read(file, payload, offset, length, channel);
2440 		if (rc == 0) {
2441 			final_length += length;
2442 		} else {
2443 			break;
2444 		}
2445 		payload += length;
2446 		offset += length;
2447 		sub_reads++;
2448 	}
2449 	pthread_spin_unlock(&file->lock);
2450 	while (sub_reads-- > 0) {
2451 		sem_wait(&channel->sem);
2452 	}
2453 	if (rc == 0) {
2454 		return final_length;
2455 	} else {
2456 		return rc;
2457 	}
2458 }
2459 
2460 static void
2461 _file_sync(struct spdk_file *file, struct spdk_fs_channel *channel,
2462 	   spdk_file_op_complete cb_fn, void *cb_arg)
2463 {
2464 	struct spdk_fs_request *sync_req;
2465 	struct spdk_fs_request *flush_req;
2466 	struct spdk_fs_cb_args *sync_args;
2467 	struct spdk_fs_cb_args *flush_args;
2468 
2469 	BLOBFS_TRACE(file, "offset=%jx\n", file->append_pos);
2470 
2471 	pthread_spin_lock(&file->lock);
2472 	if (file->append_pos <= file->length_flushed) {
2473 		BLOBFS_TRACE(file, "done - no data to flush\n");
2474 		pthread_spin_unlock(&file->lock);
2475 		cb_fn(cb_arg, 0);
2476 		return;
2477 	}
2478 
2479 	sync_req = alloc_fs_request(channel);
2480 	if (!sync_req) {
2481 		pthread_spin_unlock(&file->lock);
2482 		cb_fn(cb_arg, -ENOMEM);
2483 		return;
2484 	}
2485 	sync_args = &sync_req->args;
2486 
2487 	flush_req = alloc_fs_request(channel);
2488 	if (!flush_req) {
2489 		pthread_spin_unlock(&file->lock);
2490 		cb_fn(cb_arg, -ENOMEM);
2491 		return;
2492 	}
2493 	flush_args = &flush_req->args;
2494 
2495 	sync_args->file = file;
2496 	sync_args->fn.file_op = cb_fn;
2497 	sync_args->arg = cb_arg;
2498 	sync_args->op.sync.offset = file->append_pos;
2499 	sync_args->op.sync.xattr_in_progress = false;
2500 	TAILQ_INSERT_TAIL(&file->sync_requests, sync_req, args.op.sync.tailq);
2501 	pthread_spin_unlock(&file->lock);
2502 
2503 	flush_args->file = file;
2504 	channel->send_request(__file_flush, flush_req);
2505 }
2506 
2507 int
2508 spdk_file_sync(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx)
2509 {
2510 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
2511 	struct spdk_fs_cb_args args = {};
2512 
2513 	args.sem = &channel->sem;
2514 	_file_sync(file, channel, __wake_caller, &args);
2515 	sem_wait(&channel->sem);
2516 
2517 	return args.rc;
2518 }
2519 
2520 void
2521 spdk_file_sync_async(struct spdk_file *file, struct spdk_io_channel *_channel,
2522 		     spdk_file_op_complete cb_fn, void *cb_arg)
2523 {
2524 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
2525 
2526 	_file_sync(file, channel, cb_fn, cb_arg);
2527 }
2528 
2529 void
2530 spdk_file_set_priority(struct spdk_file *file, uint32_t priority)
2531 {
2532 	BLOBFS_TRACE(file, "priority=%u\n", priority);
2533 	file->priority = priority;
2534 
2535 }
2536 
2537 /*
2538  * Close routines
2539  */
2540 
2541 static void
2542 __file_close_async_done(void *ctx, int bserrno)
2543 {
2544 	struct spdk_fs_request *req = ctx;
2545 	struct spdk_fs_cb_args *args = &req->args;
2546 	struct spdk_file *file = args->file;
2547 
2548 	if (file->is_deleted) {
2549 		spdk_fs_delete_file_async(file->fs, file->name, blob_delete_cb, ctx);
2550 		return;
2551 	}
2552 
2553 	args->fn.file_op(args->arg, bserrno);
2554 	free_fs_request(req);
2555 }
2556 
2557 static void
2558 __file_close_async(struct spdk_file *file, struct spdk_fs_request *req)
2559 {
2560 	struct spdk_blob *blob;
2561 
2562 	pthread_spin_lock(&file->lock);
2563 	if (file->ref_count == 0) {
2564 		pthread_spin_unlock(&file->lock);
2565 		__file_close_async_done(req, -EBADF);
2566 		return;
2567 	}
2568 
2569 	file->ref_count--;
2570 	if (file->ref_count > 0) {
2571 		pthread_spin_unlock(&file->lock);
2572 		req->args.fn.file_op(req->args.arg, 0);
2573 		free_fs_request(req);
2574 		return;
2575 	}
2576 
2577 	pthread_spin_unlock(&file->lock);
2578 
2579 	blob = file->blob;
2580 	file->blob = NULL;
2581 	spdk_blob_close(blob, __file_close_async_done, req);
2582 }
2583 
2584 static void
2585 __file_close_async__sync_done(void *arg, int fserrno)
2586 {
2587 	struct spdk_fs_request *req = arg;
2588 	struct spdk_fs_cb_args *args = &req->args;
2589 
2590 	__file_close_async(args->file, req);
2591 }
2592 
2593 void
2594 spdk_file_close_async(struct spdk_file *file, spdk_file_op_complete cb_fn, void *cb_arg)
2595 {
2596 	struct spdk_fs_request *req;
2597 	struct spdk_fs_cb_args *args;
2598 
2599 	req = alloc_fs_request(file->fs->md_target.md_fs_channel);
2600 	if (req == NULL) {
2601 		cb_fn(cb_arg, -ENOMEM);
2602 		return;
2603 	}
2604 
2605 	args = &req->args;
2606 	args->file = file;
2607 	args->fn.file_op = cb_fn;
2608 	args->arg = cb_arg;
2609 
2610 	spdk_file_sync_async(file, file->fs->md_target.md_io_channel, __file_close_async__sync_done, req);
2611 }
2612 
2613 static void
2614 __file_close(void *arg)
2615 {
2616 	struct spdk_fs_request *req = arg;
2617 	struct spdk_fs_cb_args *args = &req->args;
2618 	struct spdk_file *file = args->file;
2619 
2620 	__file_close_async(file, req);
2621 }
2622 
2623 int
2624 spdk_file_close(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx)
2625 {
2626 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
2627 	struct spdk_fs_request *req;
2628 	struct spdk_fs_cb_args *args;
2629 
2630 	req = alloc_fs_request(channel);
2631 	if (req == NULL) {
2632 		return -ENOMEM;
2633 	}
2634 
2635 	args = &req->args;
2636 
2637 	spdk_file_sync(file, ctx);
2638 	BLOBFS_TRACE(file, "name=%s\n", file->name);
2639 	args->file = file;
2640 	args->sem = &channel->sem;
2641 	args->fn.file_op = __wake_caller;
2642 	args->arg = req;
2643 	channel->send_request(__file_close, req);
2644 	sem_wait(&channel->sem);
2645 
2646 	return args->rc;
2647 }
2648 
2649 int
2650 spdk_file_get_id(struct spdk_file *file, void *id, size_t size)
2651 {
2652 	if (size < sizeof(spdk_blob_id)) {
2653 		return -EINVAL;
2654 	}
2655 
2656 	memcpy(id, &file->blobid, sizeof(spdk_blob_id));
2657 
2658 	return sizeof(spdk_blob_id);
2659 }
2660 
2661 static void
2662 cache_free_buffers(struct spdk_file *file)
2663 {
2664 	BLOBFS_TRACE(file, "free=%s\n", file->name);
2665 	pthread_spin_lock(&file->lock);
2666 	pthread_spin_lock(&g_caches_lock);
2667 	if (file->tree->present_mask == 0) {
2668 		pthread_spin_unlock(&g_caches_lock);
2669 		pthread_spin_unlock(&file->lock);
2670 		return;
2671 	}
2672 	spdk_tree_free_buffers(file->tree);
2673 
2674 	TAILQ_REMOVE(&g_caches, file, cache_tailq);
2675 	/* If not freed, put it in the end of the queue */
2676 	if (file->tree->present_mask != 0) {
2677 		TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq);
2678 	}
2679 	file->last = NULL;
2680 	pthread_spin_unlock(&g_caches_lock);
2681 	pthread_spin_unlock(&file->lock);
2682 }
2683 
2684 SPDK_LOG_REGISTER_COMPONENT("blobfs", SPDK_LOG_BLOBFS)
2685 SPDK_LOG_REGISTER_COMPONENT("blobfs_rw", SPDK_LOG_BLOBFS_RW)
2686