xref: /spdk/lib/blobfs/blobfs.c (revision 5d0b5e2ce9b5e6e861daab837ca9f527ee64b07c)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 
36 #include "spdk/blobfs.h"
37 #include "spdk/conf.h"
38 #include "blobfs_internal.h"
39 
40 #include "spdk/queue.h"
41 #include "spdk/thread.h"
42 #include "spdk/assert.h"
43 #include "spdk/env.h"
44 #include "spdk/util.h"
45 #include "spdk_internal/log.h"
46 
/* Debug trace helper: logs under the SPDK_LOG_BLOBFS flag, prefixing the
 * message with the name of the given file (file->name). */
#define BLOBFS_TRACE(file, str, args...) \
	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s " str, file->name, ##args)

/* Same as BLOBFS_TRACE, but logs under the separate SPDK_LOG_BLOBFS_RW flag. */
#define BLOBFS_TRACE_RW(file, str, args...) \
	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS_RW, "file=%s " str, file->name, ##args)

/* Default value of g_fs_cache_size: 4 GiB, in bytes. */
#define BLOBFS_DEFAULT_CACHE_SIZE (4ULL * 1024 * 1024 * 1024)
/* Default cluster size (1 MiB, in bytes) written by spdk_fs_opts_init(). */
#define SPDK_BLOBFS_DEFAULT_OPTS_CLUSTER_SZ (1024 * 1024)

/* Total cache size in bytes; divided by CACHE_BUFFER_SIZE to size g_cache_pool. */
static uint64_t g_fs_cache_size = BLOBFS_DEFAULT_CACHE_SIZE;
/* Mempool of cache data buffers; created by __initialize_cache() and
 * released by __free_cache(). NULL while no filesystem is active. */
static struct spdk_mempool *g_cache_pool;
/* List of files, initialized in __initialize_cache().
 * NOTE(review): presumably the files that currently own cache buffers
 * (linked through spdk_file.cache_tailq) - confirm against the cache code. */
static TAILQ_HEAD(, spdk_file) g_caches;
/* Number of filesystems that have been initialized/loaded and not yet
 * unloaded; the cache pool exists while this is non-zero. */
static int g_fs_count = 0;
/* Serializes updates to g_fs_count and creation/teardown of the cache pool. */
static pthread_mutex_t g_cache_init_lock = PTHREAD_MUTEX_INITIALIZER;
/* Spinlock initialized in __initialize_cache().
 * NOTE(review): presumably guards g_caches - confirm against the cache code. */
static pthread_spinlock_t g_caches_lock;
62 
63 void
64 spdk_cache_buffer_free(struct cache_buffer *cache_buffer)
65 {
66 	spdk_mempool_put(g_cache_pool, cache_buffer->buf);
67 	free(cache_buffer);
68 }
69 
/* 128 KiB threshold used by the read-ahead logic.
 * NOTE(review): its consumer is not visible in this part of the file. */
#define CACHE_READAHEAD_THRESHOLD	(128 * 1024)

/* In-memory state of one blobfs file. */
struct spdk_file {
	/* Filesystem this file belongs to (set in file_alloc()). */
	struct spdk_filesystem	*fs;
	/* Open blob handle; assigned when the blob open completes. */
	struct spdk_blob	*blob;
	/* Heap-allocated (strdup) file name; freed when the fs is unloaded. */
	char			*name;
	/* File length; initialized from the blob's "length" xattr on load. */
	uint64_t		length;
	/* When true, opening the file fails with -ENOENT. */
	bool                    is_deleted;
	bool			open_for_writing;
	/* Initialized to the on-disk length on load.
	 * NOTE(review): presumably the length already persisted - confirm. */
	uint64_t		length_flushed;
	/* Initialized to the on-disk length on load; stat reports
	 * max(append_pos, length) as the file size. */
	uint64_t		append_pos;
	uint64_t		seq_byte_count;
	uint64_t		next_seq_offset;
	/* Initialized to SPDK_FILE_PRIORITY_LOW in file_alloc(). */
	uint32_t		priority;
	/* Linkage in spdk_filesystem.files. */
	TAILQ_ENTRY(spdk_file)	tailq;
	/* ID of the blob backing this file. */
	spdk_blob_id		blobid;
	/* Incremented for each open request; the first open triggers the blob open. */
	uint32_t		ref_count;
	/* Per-file spinlock, initialized in file_alloc(). */
	pthread_spinlock_t	lock;
	struct cache_buffer	*last;
	/* Cache tree root; allocated zeroed in file_alloc(). */
	struct cache_tree	*tree;
	/* Open requests waiting for the blob open to complete. */
	TAILQ_HEAD(open_requests_head, spdk_fs_request) open_requests;
	TAILQ_HEAD(sync_requests_head, spdk_fs_request) sync_requests;
	/* NOTE(review): presumably linkage in g_caches - confirm. */
	TAILQ_ENTRY(spdk_file)	cache_tailq;
};
94 
/*
 * Record of a blob that was found marked "is_deleted" while loading the
 * filesystem; queued on fs_load.deleted_files until it is deleted on disk.
 */
struct spdk_deleted_file {
	/* ID of the blob to delete. */
	spdk_blob_id	id;
	/* Linkage in spdk_fs_cb_args.op.fs_load.deleted_files. */
	TAILQ_ENTRY(spdk_deleted_file)	tailq;
};
99 
/*
 * One blobfs instance. The addresses of md_target, sync_target and io_target
 * are registered as SPDK io_devices (see fs_alloc()), and the owning
 * filesystem is recovered from them with SPDK_CONTAINEROF.
 */
struct spdk_filesystem {
	/* Underlying blobstore; set once init/load succeeds. */
	struct spdk_blob_store	*bs;
	/* All files known to this filesystem. */
	TAILQ_HEAD(, spdk_file)	files;
	/* Only cluster_sz is filled in by this part of the file (common_fs_bs_init()). */
	struct spdk_bs_opts	bs_opts;
	/* Block device the blobstore sits on. */
	struct spdk_bs_dev	*bdev;
	/* Caller-supplied function used to dispatch requests from synchronous API calls. */
	fs_send_request_fn	send_request;

	struct {
		/* Number of preallocated requests per channel on this target. */
		uint32_t		max_ops;
		struct spdk_io_channel	*sync_io_channel;
		/* Context of sync_io_channel. */
		struct spdk_fs_channel	*sync_fs_channel;
	} sync_target;

	struct {
		/* Number of preallocated requests per channel on this target. */
		uint32_t		max_ops;
		struct spdk_io_channel	*md_io_channel;
		/* Context of md_io_channel; metadata operations allocate their requests here. */
		struct spdk_fs_channel	*md_fs_channel;
	} md_target;

	struct {
		/* Number of preallocated requests per channel on this target. */
		uint32_t		max_ops;
	} io_target;
};
123 
/*
 * Per-request state: the completion callback, its argument, and scratch
 * space specific to the operation in flight.
 */
struct spdk_fs_cb_args {
	/* Completion callback; the member used depends on the operation. */
	union {
		spdk_fs_op_with_handle_complete		fs_op_with_handle;
		spdk_fs_op_complete			fs_op;
		spdk_file_op_with_handle_complete	file_op_with_handle;
		spdk_file_op_complete			file_op;
		spdk_file_stat_op_complete		stat_op;
	} fn;
	/* Argument passed back to the completion callback. */
	void *arg;
	/* Semaphore posted to wake a caller blocked in a synchronous API. */
	sem_t *sem;
	struct spdk_filesystem *fs;
	struct spdk_file *file;
	/* Result code handed back to a synchronous caller. */
	int rc;
	/* Points at 'iov' below when iovcnt <= 1, otherwise at a heap array
	 * of iovcnt entries (see alloc_fs_request_with_iov()). */
	struct iovec *iovs;
	uint32_t iovcnt;
	struct iovec iov;
	/* Operation-specific scratch data; only one member is live at a time. */
	union {
		struct {
			/* Blobs found marked deleted during load, pending on-disk deletion. */
			TAILQ_HEAD(, spdk_deleted_file)	deleted_files;
		} fs_load;
		struct {
			uint64_t	length;
		} truncate;
		struct {
			struct spdk_io_channel	*channel;
			void		*pin_buf;
			int		is_read;
			off_t		offset;
			size_t		length;
			uint64_t	start_lba;
			uint64_t	num_lba;
			uint32_t	blocklen;
		} rw;
		struct {
			const char	*old_name;
			const char	*new_name;
		} rename;
		struct {
			struct cache_buffer	*cache_buffer;
			uint64_t		length;
		} flush;
		struct {
			struct cache_buffer	*cache_buffer;
			uint64_t		length;
			uint64_t		offset;
		} readahead;
		struct {
			uint64_t			offset;
			TAILQ_ENTRY(spdk_fs_request)	tailq;
			bool				xattr_in_progress;
		} sync;
		struct {
			uint32_t			num_clusters;
		} resize;
		struct {
			/* Caller-owned name string; not copied. */
			const char	*name;
			uint32_t	flags;
			/* Linkage in spdk_file.open_requests. */
			TAILQ_ENTRY(spdk_fs_request)	tailq;
		} open;
		struct {
			/* Caller-owned name string; not copied. */
			const char		*name;
			/* Blob handle kept open while the new file's xattrs are written. */
			struct spdk_blob	*blob;
		} create;
		struct {
			const char	*name;
		} delete;
		struct {
			/* Caller-owned name string; not copied. */
			const char	*name;
		} stat;
	} op;
};
195 
/* Forward declarations for helpers that are used before they are defined. */
static void cache_free_buffers(struct spdk_file *file);
/* Unregisters the filesystem's three io_devices and frees the filesystem. */
static void spdk_fs_io_device_unregister(struct spdk_filesystem *fs);
/* Releases the filesystem's md and sync io channels. */
static void spdk_fs_free_io_channels(struct spdk_filesystem *fs);
199 
/*
 * Fill a blobfs options structure with defaults.
 *
 * Only cluster_sz is written; it is set to SPDK_BLOBFS_DEFAULT_OPTS_CLUSTER_SZ.
 * opts must be non-NULL.
 */
void
spdk_fs_opts_init(struct spdk_blobfs_opts *opts)
{
	opts->cluster_sz = SPDK_BLOBFS_DEFAULT_OPTS_CLUSTER_SZ;
}
205 
206 static void
207 __initialize_cache(void)
208 {
209 	assert(g_cache_pool == NULL);
210 
211 	g_cache_pool = spdk_mempool_create("spdk_fs_cache",
212 					   g_fs_cache_size / CACHE_BUFFER_SIZE,
213 					   CACHE_BUFFER_SIZE,
214 					   SPDK_MEMPOOL_DEFAULT_CACHE_SIZE,
215 					   SPDK_ENV_SOCKET_ID_ANY);
216 	if (!g_cache_pool) {
217 		SPDK_ERRLOG("Create mempool failed, you may "
218 			    "increase the memory and try again\n");
219 		assert(false);
220 	}
221 	TAILQ_INIT(&g_caches);
222 	pthread_spin_init(&g_caches_lock, 0);
223 }
224 
225 static void
226 __free_cache(void)
227 {
228 	assert(g_cache_pool != NULL);
229 
230 	spdk_mempool_free(g_cache_pool);
231 	g_cache_pool = NULL;
232 }
233 
234 static uint64_t
235 __file_get_blob_size(struct spdk_file *file)
236 {
237 	uint64_t cluster_sz;
238 
239 	cluster_sz = file->fs->bs_opts.cluster_sz;
240 	return cluster_sz * spdk_blob_get_num_clusters(file->blob);
241 }
242 
/*
 * One preallocated request slot. Slots live in spdk_fs_channel.req_mem and
 * sit on spdk_fs_channel.reqs while free.
 */
struct spdk_fs_request {
	/* Callback and per-operation state. This is the first member, so a
	 * pointer to args has the same address as the enclosing request. */
	struct spdk_fs_cb_args		args;
	/* Linkage in spdk_fs_channel.reqs while the request is free. */
	TAILQ_ENTRY(spdk_fs_request)	link;
	/* Channel the request was allocated from and is returned to. */
	struct spdk_fs_channel		*channel;
};
248 
/* Per-channel context: a fixed pool of requests plus dispatch state. */
struct spdk_fs_channel {
	/* Backing array for all requests of this channel (max_ops entries). */
	struct spdk_fs_request		*req_mem;
	/* Free list of requests. */
	TAILQ_HEAD(, spdk_fs_request)	reqs;
	/* Semaphore a synchronous caller waits on until its request completes. */
	sem_t				sem;
	struct spdk_filesystem		*fs;
	/* Blobstore io channel; freed in _spdk_fs_channel_destroy() when non-NULL. */
	struct spdk_io_channel		*bs_channel;
	/* Function used to dispatch a request function for execution. */
	fs_send_request_fn		send_request;
	/* When true, 'lock' is taken around free-list manipulation. */
	bool				sync;
	/* Protects 'reqs' when 'sync' is set.
	 * NOTE(review): its initialization is not in this part of the file. */
	pthread_spinlock_t		lock;
};
259 
/* For now, this is effectively an alias. But eventually we'll shift
 * some data members over.
 *
 * The synchronous APIs in this file cast a struct spdk_fs_thread_ctx pointer
 * directly to struct spdk_fs_channel, so 'ch' must remain the first member. */
struct spdk_fs_thread_ctx {
	struct spdk_fs_channel	ch;
};
265 
266 static struct spdk_fs_request *
267 alloc_fs_request_with_iov(struct spdk_fs_channel *channel, uint32_t iovcnt)
268 {
269 	struct spdk_fs_request *req;
270 	struct iovec *iovs = NULL;
271 
272 	if (iovcnt > 1) {
273 		iovs = calloc(iovcnt, sizeof(struct iovec));
274 		if (!iovs) {
275 			return NULL;
276 		}
277 	}
278 
279 	if (channel->sync) {
280 		pthread_spin_lock(&channel->lock);
281 	}
282 
283 	req = TAILQ_FIRST(&channel->reqs);
284 	if (req) {
285 		TAILQ_REMOVE(&channel->reqs, req, link);
286 	}
287 
288 	if (channel->sync) {
289 		pthread_spin_unlock(&channel->lock);
290 	}
291 
292 	if (req == NULL) {
293 		SPDK_ERRLOG("Cannot allocate req on spdk_fs_channel =%p\n", channel);
294 		free(iovs);
295 		return NULL;
296 	}
297 	memset(req, 0, sizeof(*req));
298 	req->channel = channel;
299 	if (iovcnt > 1) {
300 		req->args.iovs = iovs;
301 	} else {
302 		req->args.iovs = &req->args.iov;
303 	}
304 	req->args.iovcnt = iovcnt;
305 
306 	return req;
307 }
308 
/*
 * Take a request that carries no I/O vectors from the channel's free list.
 * Returns NULL when the channel has no free request.
 */
static struct spdk_fs_request *
alloc_fs_request(struct spdk_fs_channel *channel)
{
	const uint32_t no_iovs = 0;

	return alloc_fs_request_with_iov(channel, no_iovs);
}
314 
315 static void
316 free_fs_request(struct spdk_fs_request *req)
317 {
318 	struct spdk_fs_channel *channel = req->channel;
319 
320 	if (req->args.iovcnt > 1) {
321 		free(req->args.iovs);
322 	}
323 
324 	if (channel->sync) {
325 		pthread_spin_lock(&channel->lock);
326 	}
327 
328 	TAILQ_INSERT_HEAD(&req->channel->reqs, req, link);
329 
330 	if (channel->sync) {
331 		pthread_spin_unlock(&channel->lock);
332 	}
333 }
334 
335 static int
336 _spdk_fs_channel_create(struct spdk_filesystem *fs, struct spdk_fs_channel *channel,
337 			uint32_t max_ops)
338 {
339 	uint32_t i;
340 
341 	channel->req_mem = calloc(max_ops, sizeof(struct spdk_fs_request));
342 	if (!channel->req_mem) {
343 		return -1;
344 	}
345 
346 	TAILQ_INIT(&channel->reqs);
347 	sem_init(&channel->sem, 0, 0);
348 
349 	for (i = 0; i < max_ops; i++) {
350 		TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link);
351 	}
352 
353 	channel->fs = fs;
354 
355 	return 0;
356 }
357 
358 static int
359 _spdk_fs_md_channel_create(void *io_device, void *ctx_buf)
360 {
361 	struct spdk_filesystem		*fs;
362 	struct spdk_fs_channel		*channel = ctx_buf;
363 
364 	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, md_target);
365 
366 	return _spdk_fs_channel_create(fs, channel, fs->md_target.max_ops);
367 }
368 
369 static int
370 _spdk_fs_sync_channel_create(void *io_device, void *ctx_buf)
371 {
372 	struct spdk_filesystem		*fs;
373 	struct spdk_fs_channel		*channel = ctx_buf;
374 
375 	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, sync_target);
376 
377 	return _spdk_fs_channel_create(fs, channel, fs->sync_target.max_ops);
378 }
379 
380 static int
381 _spdk_fs_io_channel_create(void *io_device, void *ctx_buf)
382 {
383 	struct spdk_filesystem		*fs;
384 	struct spdk_fs_channel		*channel = ctx_buf;
385 
386 	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, io_target);
387 
388 	return _spdk_fs_channel_create(fs, channel, fs->io_target.max_ops);
389 }
390 
391 static void
392 _spdk_fs_channel_destroy(void *io_device, void *ctx_buf)
393 {
394 	struct spdk_fs_channel *channel = ctx_buf;
395 
396 	free(channel->req_mem);
397 	if (channel->bs_channel != NULL) {
398 		spdk_bs_free_io_channel(channel->bs_channel);
399 	}
400 }
401 
402 static void
403 __send_request_direct(fs_request_fn fn, void *arg)
404 {
405 	fn(arg);
406 }
407 
408 static void
409 common_fs_bs_init(struct spdk_filesystem *fs, struct spdk_blob_store *bs)
410 {
411 	fs->bs = bs;
412 	fs->bs_opts.cluster_sz = spdk_bs_get_cluster_size(bs);
413 	fs->md_target.md_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
414 	fs->md_target.md_fs_channel->send_request = __send_request_direct;
415 	fs->sync_target.sync_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
416 	fs->sync_target.sync_fs_channel->send_request = __send_request_direct;
417 
418 	pthread_mutex_lock(&g_cache_init_lock);
419 	if (g_fs_count == 0) {
420 		__initialize_cache();
421 	}
422 	g_fs_count++;
423 	pthread_mutex_unlock(&g_cache_init_lock);
424 }
425 
426 static void
427 init_cb(void *ctx, struct spdk_blob_store *bs, int bserrno)
428 {
429 	struct spdk_fs_request *req = ctx;
430 	struct spdk_fs_cb_args *args = &req->args;
431 	struct spdk_filesystem *fs = args->fs;
432 
433 	if (bserrno == 0) {
434 		common_fs_bs_init(fs, bs);
435 	} else {
436 		free(fs);
437 		fs = NULL;
438 	}
439 
440 	args->fn.fs_op_with_handle(args->arg, fs, bserrno);
441 	free_fs_request(req);
442 }
443 
444 static void
445 fs_conf_parse(void)
446 {
447 	struct spdk_conf_section *sp;
448 
449 	sp = spdk_conf_find_section(NULL, "Blobfs");
450 	if (sp == NULL) {
451 		g_fs_cache_buffer_shift = CACHE_BUFFER_SHIFT_DEFAULT;
452 		return;
453 	}
454 
455 	g_fs_cache_buffer_shift = spdk_conf_section_get_intval(sp, "CacheBufferShift");
456 	if (g_fs_cache_buffer_shift <= 0) {
457 		g_fs_cache_buffer_shift = CACHE_BUFFER_SHIFT_DEFAULT;
458 	}
459 }
460 
461 static struct spdk_filesystem *
462 fs_alloc(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn)
463 {
464 	struct spdk_filesystem *fs;
465 
466 	fs = calloc(1, sizeof(*fs));
467 	if (fs == NULL) {
468 		return NULL;
469 	}
470 
471 	fs->bdev = dev;
472 	fs->send_request = send_request_fn;
473 	TAILQ_INIT(&fs->files);
474 
475 	fs->md_target.max_ops = 512;
476 	spdk_io_device_register(&fs->md_target, _spdk_fs_md_channel_create, _spdk_fs_channel_destroy,
477 				sizeof(struct spdk_fs_channel), "blobfs_md");
478 	fs->md_target.md_io_channel = spdk_get_io_channel(&fs->md_target);
479 	fs->md_target.md_fs_channel = spdk_io_channel_get_ctx(fs->md_target.md_io_channel);
480 
481 	fs->sync_target.max_ops = 512;
482 	spdk_io_device_register(&fs->sync_target, _spdk_fs_sync_channel_create, _spdk_fs_channel_destroy,
483 				sizeof(struct spdk_fs_channel), "blobfs_sync");
484 	fs->sync_target.sync_io_channel = spdk_get_io_channel(&fs->sync_target);
485 	fs->sync_target.sync_fs_channel = spdk_io_channel_get_ctx(fs->sync_target.sync_io_channel);
486 
487 	fs->io_target.max_ops = 512;
488 	spdk_io_device_register(&fs->io_target, _spdk_fs_io_channel_create, _spdk_fs_channel_destroy,
489 				sizeof(struct spdk_fs_channel), "blobfs_io");
490 
491 	return fs;
492 }
493 
494 static void
495 __wake_caller(void *arg, int fserrno)
496 {
497 	struct spdk_fs_cb_args *args = arg;
498 
499 	args->rc = fserrno;
500 	sem_post(args->sem);
501 }
502 
503 void
504 spdk_fs_init(struct spdk_bs_dev *dev, struct spdk_blobfs_opts *opt,
505 	     fs_send_request_fn send_request_fn,
506 	     spdk_fs_op_with_handle_complete cb_fn, void *cb_arg)
507 {
508 	struct spdk_filesystem *fs;
509 	struct spdk_fs_request *req;
510 	struct spdk_fs_cb_args *args;
511 	struct spdk_bs_opts opts = {};
512 
513 	fs = fs_alloc(dev, send_request_fn);
514 	if (fs == NULL) {
515 		cb_fn(cb_arg, NULL, -ENOMEM);
516 		return;
517 	}
518 
519 	fs_conf_parse();
520 
521 	req = alloc_fs_request(fs->md_target.md_fs_channel);
522 	if (req == NULL) {
523 		spdk_fs_free_io_channels(fs);
524 		spdk_fs_io_device_unregister(fs);
525 		cb_fn(cb_arg, NULL, -ENOMEM);
526 		return;
527 	}
528 
529 	args = &req->args;
530 	args->fn.fs_op_with_handle = cb_fn;
531 	args->arg = cb_arg;
532 	args->fs = fs;
533 
534 	spdk_bs_opts_init(&opts);
535 	snprintf(opts.bstype.bstype, sizeof(opts.bstype.bstype), "BLOBFS");
536 	if (opt) {
537 		opts.cluster_sz = opt->cluster_sz;
538 	}
539 	spdk_bs_init(dev, &opts, init_cb, req);
540 }
541 
542 static struct spdk_file *
543 file_alloc(struct spdk_filesystem *fs)
544 {
545 	struct spdk_file *file;
546 
547 	file = calloc(1, sizeof(*file));
548 	if (file == NULL) {
549 		return NULL;
550 	}
551 
552 	file->tree = calloc(1, sizeof(*file->tree));
553 	if (file->tree == NULL) {
554 		free(file);
555 		return NULL;
556 	}
557 
558 	file->fs = fs;
559 	TAILQ_INIT(&file->open_requests);
560 	TAILQ_INIT(&file->sync_requests);
561 	pthread_spin_init(&file->lock, 0);
562 	TAILQ_INSERT_TAIL(&fs->files, file, tailq);
563 	file->priority = SPDK_FILE_PRIORITY_LOW;
564 	return file;
565 }
566 
567 static void fs_load_done(void *ctx, int bserrno);
568 
569 static int
570 _handle_deleted_files(struct spdk_fs_request *req)
571 {
572 	struct spdk_fs_cb_args *args = &req->args;
573 	struct spdk_filesystem *fs = args->fs;
574 
575 	if (!TAILQ_EMPTY(&args->op.fs_load.deleted_files)) {
576 		struct spdk_deleted_file *deleted_file;
577 
578 		deleted_file = TAILQ_FIRST(&args->op.fs_load.deleted_files);
579 		TAILQ_REMOVE(&args->op.fs_load.deleted_files, deleted_file, tailq);
580 		spdk_bs_delete_blob(fs->bs, deleted_file->id, fs_load_done, req);
581 		free(deleted_file);
582 		return 0;
583 	}
584 
585 	return 1;
586 }
587 
588 static void
589 fs_load_done(void *ctx, int bserrno)
590 {
591 	struct spdk_fs_request *req = ctx;
592 	struct spdk_fs_cb_args *args = &req->args;
593 	struct spdk_filesystem *fs = args->fs;
594 
595 	/* The filesystem has been loaded.  Now check if there are any files that
596 	 *  were marked for deletion before last unload.  Do not complete the
597 	 *  fs_load callback until all of them have been deleted on disk.
598 	 */
599 	if (_handle_deleted_files(req) == 0) {
600 		/* We found a file that's been marked for deleting but not actually
601 		 *  deleted yet.  This function will get called again once the delete
602 		 *  operation is completed.
603 		 */
604 		return;
605 	}
606 
607 	args->fn.fs_op_with_handle(args->arg, fs, 0);
608 	free_fs_request(req);
609 
610 }
611 
612 static void
613 iter_cb(void *ctx, struct spdk_blob *blob, int rc)
614 {
615 	struct spdk_fs_request *req = ctx;
616 	struct spdk_fs_cb_args *args = &req->args;
617 	struct spdk_filesystem *fs = args->fs;
618 	uint64_t *length;
619 	const char *name;
620 	uint32_t *is_deleted;
621 	size_t value_len;
622 
623 	if (rc < 0) {
624 		args->fn.fs_op_with_handle(args->arg, fs, rc);
625 		free_fs_request(req);
626 		return;
627 	}
628 
629 	rc = spdk_blob_get_xattr_value(blob, "name", (const void **)&name, &value_len);
630 	if (rc < 0) {
631 		args->fn.fs_op_with_handle(args->arg, fs, rc);
632 		free_fs_request(req);
633 		return;
634 	}
635 
636 	rc = spdk_blob_get_xattr_value(blob, "length", (const void **)&length, &value_len);
637 	if (rc < 0) {
638 		args->fn.fs_op_with_handle(args->arg, fs, rc);
639 		free_fs_request(req);
640 		return;
641 	}
642 
643 	assert(value_len == 8);
644 
645 	/* This file could be deleted last time without close it, then app crashed, so we delete it now */
646 	rc = spdk_blob_get_xattr_value(blob, "is_deleted", (const void **)&is_deleted, &value_len);
647 	if (rc < 0) {
648 		struct spdk_file *f;
649 
650 		f = file_alloc(fs);
651 		if (f == NULL) {
652 			args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM);
653 			free_fs_request(req);
654 			return;
655 		}
656 
657 		f->name = strdup(name);
658 		f->blobid = spdk_blob_get_id(blob);
659 		f->length = *length;
660 		f->length_flushed = *length;
661 		f->append_pos = *length;
662 		SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "added file %s length=%ju\n", f->name, f->length);
663 	} else {
664 		struct spdk_deleted_file *deleted_file;
665 
666 		deleted_file = calloc(1, sizeof(*deleted_file));
667 		if (deleted_file == NULL) {
668 			args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM);
669 			free_fs_request(req);
670 			return;
671 		}
672 		deleted_file->id = spdk_blob_get_id(blob);
673 		TAILQ_INSERT_TAIL(&args->op.fs_load.deleted_files, deleted_file, tailq);
674 	}
675 }
676 
677 static void
678 load_cb(void *ctx, struct spdk_blob_store *bs, int bserrno)
679 {
680 	struct spdk_fs_request *req = ctx;
681 	struct spdk_fs_cb_args *args = &req->args;
682 	struct spdk_filesystem *fs = args->fs;
683 	struct spdk_bs_type bstype;
684 	static const struct spdk_bs_type blobfs_type = {"BLOBFS"};
685 	static const struct spdk_bs_type zeros;
686 
687 	if (bserrno != 0) {
688 		args->fn.fs_op_with_handle(args->arg, NULL, bserrno);
689 		free_fs_request(req);
690 		free(fs);
691 		return;
692 	}
693 
694 	bstype = spdk_bs_get_bstype(bs);
695 
696 	if (!memcmp(&bstype, &zeros, sizeof(bstype))) {
697 		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "assigning bstype\n");
698 		spdk_bs_set_bstype(bs, blobfs_type);
699 	} else if (memcmp(&bstype, &blobfs_type, sizeof(bstype))) {
700 		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "not blobfs\n");
701 		SPDK_LOGDUMP(SPDK_LOG_BLOB, "bstype", &bstype, sizeof(bstype));
702 		args->fn.fs_op_with_handle(args->arg, NULL, bserrno);
703 		free_fs_request(req);
704 		free(fs);
705 		return;
706 	}
707 
708 	common_fs_bs_init(fs, bs);
709 	fs_load_done(req, 0);
710 }
711 
712 static void
713 spdk_fs_io_device_unregister(struct spdk_filesystem *fs)
714 {
715 	assert(fs != NULL);
716 	spdk_io_device_unregister(&fs->md_target, NULL);
717 	spdk_io_device_unregister(&fs->sync_target, NULL);
718 	spdk_io_device_unregister(&fs->io_target, NULL);
719 	free(fs);
720 }
721 
722 static void
723 spdk_fs_free_io_channels(struct spdk_filesystem *fs)
724 {
725 	assert(fs != NULL);
726 	spdk_fs_free_io_channel(fs->md_target.md_io_channel);
727 	spdk_fs_free_io_channel(fs->sync_target.sync_io_channel);
728 }
729 
730 void
731 spdk_fs_load(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn,
732 	     spdk_fs_op_with_handle_complete cb_fn, void *cb_arg)
733 {
734 	struct spdk_filesystem *fs;
735 	struct spdk_fs_cb_args *args;
736 	struct spdk_fs_request *req;
737 	struct spdk_bs_opts	bs_opts;
738 
739 	fs = fs_alloc(dev, send_request_fn);
740 	if (fs == NULL) {
741 		cb_fn(cb_arg, NULL, -ENOMEM);
742 		return;
743 	}
744 
745 	fs_conf_parse();
746 
747 	req = alloc_fs_request(fs->md_target.md_fs_channel);
748 	if (req == NULL) {
749 		spdk_fs_free_io_channels(fs);
750 		spdk_fs_io_device_unregister(fs);
751 		cb_fn(cb_arg, NULL, -ENOMEM);
752 		return;
753 	}
754 
755 	args = &req->args;
756 	args->fn.fs_op_with_handle = cb_fn;
757 	args->arg = cb_arg;
758 	args->fs = fs;
759 	TAILQ_INIT(&args->op.fs_load.deleted_files);
760 	spdk_bs_opts_init(&bs_opts);
761 	bs_opts.iter_cb_fn = iter_cb;
762 	bs_opts.iter_cb_arg = req;
763 	spdk_bs_load(dev, &bs_opts, load_cb, req);
764 }
765 
766 static void
767 unload_cb(void *ctx, int bserrno)
768 {
769 	struct spdk_fs_request *req = ctx;
770 	struct spdk_fs_cb_args *args = &req->args;
771 	struct spdk_filesystem *fs = args->fs;
772 	struct spdk_file *file, *tmp;
773 
774 	TAILQ_FOREACH_SAFE(file, &fs->files, tailq, tmp) {
775 		TAILQ_REMOVE(&fs->files, file, tailq);
776 		cache_free_buffers(file);
777 		free(file->name);
778 		free(file->tree);
779 		free(file);
780 	}
781 
782 	pthread_mutex_lock(&g_cache_init_lock);
783 	g_fs_count--;
784 	if (g_fs_count == 0) {
785 		__free_cache();
786 	}
787 	pthread_mutex_unlock(&g_cache_init_lock);
788 
789 	args->fn.fs_op(args->arg, bserrno);
790 	free(req);
791 
792 	spdk_fs_io_device_unregister(fs);
793 }
794 
795 void
796 spdk_fs_unload(struct spdk_filesystem *fs, spdk_fs_op_complete cb_fn, void *cb_arg)
797 {
798 	struct spdk_fs_request *req;
799 	struct spdk_fs_cb_args *args;
800 
801 	/*
802 	 * We must free the md_channel before unloading the blobstore, so just
803 	 *  allocate this request from the general heap.
804 	 */
805 	req = calloc(1, sizeof(*req));
806 	if (req == NULL) {
807 		cb_fn(cb_arg, -ENOMEM);
808 		return;
809 	}
810 
811 	args = &req->args;
812 	args->fn.fs_op = cb_fn;
813 	args->arg = cb_arg;
814 	args->fs = fs;
815 
816 	spdk_fs_free_io_channels(fs);
817 	spdk_bs_unload(fs->bs, unload_cb, req);
818 }
819 
820 static struct spdk_file *
821 fs_find_file(struct spdk_filesystem *fs, const char *name)
822 {
823 	struct spdk_file *file;
824 
825 	TAILQ_FOREACH(file, &fs->files, tailq) {
826 		if (!strncmp(name, file->name, SPDK_FILE_NAME_MAX)) {
827 			return file;
828 		}
829 	}
830 
831 	return NULL;
832 }
833 
834 void
835 spdk_fs_file_stat_async(struct spdk_filesystem *fs, const char *name,
836 			spdk_file_stat_op_complete cb_fn, void *cb_arg)
837 {
838 	struct spdk_file_stat stat;
839 	struct spdk_file *f = NULL;
840 
841 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
842 		cb_fn(cb_arg, NULL, -ENAMETOOLONG);
843 		return;
844 	}
845 
846 	f = fs_find_file(fs, name);
847 	if (f != NULL) {
848 		stat.blobid = f->blobid;
849 		stat.size = f->append_pos >= f->length ? f->append_pos : f->length;
850 		cb_fn(cb_arg, &stat, 0);
851 		return;
852 	}
853 
854 	cb_fn(cb_arg, NULL, -ENOENT);
855 }
856 
857 static void
858 __copy_stat(void *arg, struct spdk_file_stat *stat, int fserrno)
859 {
860 	struct spdk_fs_request *req = arg;
861 	struct spdk_fs_cb_args *args = &req->args;
862 
863 	args->rc = fserrno;
864 	if (fserrno == 0) {
865 		memcpy(args->arg, stat, sizeof(*stat));
866 	}
867 	sem_post(args->sem);
868 }
869 
870 static void
871 __file_stat(void *arg)
872 {
873 	struct spdk_fs_request *req = arg;
874 	struct spdk_fs_cb_args *args = &req->args;
875 
876 	spdk_fs_file_stat_async(args->fs, args->op.stat.name,
877 				args->fn.stat_op, req);
878 }
879 
880 int
881 spdk_fs_file_stat(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx,
882 		  const char *name, struct spdk_file_stat *stat)
883 {
884 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
885 	struct spdk_fs_request *req;
886 	int rc;
887 
888 	req = alloc_fs_request(channel);
889 	if (req == NULL) {
890 		return -ENOMEM;
891 	}
892 
893 	req->args.fs = fs;
894 	req->args.op.stat.name = name;
895 	req->args.fn.stat_op = __copy_stat;
896 	req->args.arg = stat;
897 	req->args.sem = &channel->sem;
898 	channel->send_request(__file_stat, req);
899 	sem_wait(&channel->sem);
900 
901 	rc = req->args.rc;
902 	free_fs_request(req);
903 
904 	return rc;
905 }
906 
907 static void
908 fs_create_blob_close_cb(void *ctx, int bserrno)
909 {
910 	int rc;
911 	struct spdk_fs_request *req = ctx;
912 	struct spdk_fs_cb_args *args = &req->args;
913 
914 	rc = args->rc ? args->rc : bserrno;
915 	args->fn.file_op(args->arg, rc);
916 	free_fs_request(req);
917 }
918 
919 static void
920 fs_create_blob_resize_cb(void *ctx, int bserrno)
921 {
922 	struct spdk_fs_request *req = ctx;
923 	struct spdk_fs_cb_args *args = &req->args;
924 	struct spdk_file *f = args->file;
925 	struct spdk_blob *blob = args->op.create.blob;
926 	uint64_t length = 0;
927 
928 	args->rc = bserrno;
929 	if (bserrno) {
930 		spdk_blob_close(blob, fs_create_blob_close_cb, args);
931 		return;
932 	}
933 
934 	spdk_blob_set_xattr(blob, "name", f->name, strlen(f->name) + 1);
935 	spdk_blob_set_xattr(blob, "length", &length, sizeof(length));
936 
937 	spdk_blob_close(blob, fs_create_blob_close_cb, args);
938 }
939 
940 static void
941 fs_create_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno)
942 {
943 	struct spdk_fs_request *req = ctx;
944 	struct spdk_fs_cb_args *args = &req->args;
945 
946 	if (bserrno) {
947 		args->fn.file_op(args->arg, bserrno);
948 		free_fs_request(req);
949 		return;
950 	}
951 
952 	args->op.create.blob = blob;
953 	spdk_blob_resize(blob, 1, fs_create_blob_resize_cb, req);
954 }
955 
956 static void
957 fs_create_blob_create_cb(void *ctx, spdk_blob_id blobid, int bserrno)
958 {
959 	struct spdk_fs_request *req = ctx;
960 	struct spdk_fs_cb_args *args = &req->args;
961 	struct spdk_file *f = args->file;
962 
963 	if (bserrno) {
964 		args->fn.file_op(args->arg, bserrno);
965 		free_fs_request(req);
966 		return;
967 	}
968 
969 	f->blobid = blobid;
970 	spdk_bs_open_blob(f->fs->bs, blobid, fs_create_blob_open_cb, req);
971 }
972 
973 void
974 spdk_fs_create_file_async(struct spdk_filesystem *fs, const char *name,
975 			  spdk_file_op_complete cb_fn, void *cb_arg)
976 {
977 	struct spdk_file *file;
978 	struct spdk_fs_request *req;
979 	struct spdk_fs_cb_args *args;
980 
981 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
982 		cb_fn(cb_arg, -ENAMETOOLONG);
983 		return;
984 	}
985 
986 	file = fs_find_file(fs, name);
987 	if (file != NULL) {
988 		cb_fn(cb_arg, -EEXIST);
989 		return;
990 	}
991 
992 	file = file_alloc(fs);
993 	if (file == NULL) {
994 		cb_fn(cb_arg, -ENOMEM);
995 		return;
996 	}
997 
998 	req = alloc_fs_request(fs->md_target.md_fs_channel);
999 	if (req == NULL) {
1000 		cb_fn(cb_arg, -ENOMEM);
1001 		return;
1002 	}
1003 
1004 	args = &req->args;
1005 	args->file = file;
1006 	args->fn.file_op = cb_fn;
1007 	args->arg = cb_arg;
1008 
1009 	file->name = strdup(name);
1010 	spdk_bs_create_blob(fs->bs, fs_create_blob_create_cb, args);
1011 }
1012 
1013 static void
1014 __fs_create_file_done(void *arg, int fserrno)
1015 {
1016 	struct spdk_fs_request *req = arg;
1017 	struct spdk_fs_cb_args *args = &req->args;
1018 
1019 	args->rc = fserrno;
1020 	sem_post(args->sem);
1021 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.create.name);
1022 }
1023 
1024 static void
1025 __fs_create_file(void *arg)
1026 {
1027 	struct spdk_fs_request *req = arg;
1028 	struct spdk_fs_cb_args *args = &req->args;
1029 
1030 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.create.name);
1031 	spdk_fs_create_file_async(args->fs, args->op.create.name, __fs_create_file_done, req);
1032 }
1033 
1034 int
1035 spdk_fs_create_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx, const char *name)
1036 {
1037 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
1038 	struct spdk_fs_request *req;
1039 	struct spdk_fs_cb_args *args;
1040 	int rc;
1041 
1042 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name);
1043 
1044 	req = alloc_fs_request(channel);
1045 	if (req == NULL) {
1046 		return -ENOMEM;
1047 	}
1048 
1049 	args = &req->args;
1050 	args->fs = fs;
1051 	args->op.create.name = name;
1052 	args->sem = &channel->sem;
1053 	fs->send_request(__fs_create_file, req);
1054 	sem_wait(&channel->sem);
1055 	rc = args->rc;
1056 	free_fs_request(req);
1057 
1058 	return rc;
1059 }
1060 
1061 static void
1062 fs_open_blob_done(void *ctx, struct spdk_blob *blob, int bserrno)
1063 {
1064 	struct spdk_fs_request *req = ctx;
1065 	struct spdk_fs_cb_args *args = &req->args;
1066 	struct spdk_file *f = args->file;
1067 
1068 	f->blob = blob;
1069 	while (!TAILQ_EMPTY(&f->open_requests)) {
1070 		req = TAILQ_FIRST(&f->open_requests);
1071 		args = &req->args;
1072 		TAILQ_REMOVE(&f->open_requests, req, args.op.open.tailq);
1073 		args->fn.file_op_with_handle(args->arg, f, bserrno);
1074 		free_fs_request(req);
1075 	}
1076 }
1077 
1078 static void
1079 fs_open_blob_create_cb(void *ctx, int bserrno)
1080 {
1081 	struct spdk_fs_request *req = ctx;
1082 	struct spdk_fs_cb_args *args = &req->args;
1083 	struct spdk_file *file = args->file;
1084 	struct spdk_filesystem *fs = args->fs;
1085 
1086 	if (file == NULL) {
1087 		/*
1088 		 * This is from an open with CREATE flag - the file
1089 		 *  is now created so look it up in the file list for this
1090 		 *  filesystem.
1091 		 */
1092 		file = fs_find_file(fs, args->op.open.name);
1093 		assert(file != NULL);
1094 		args->file = file;
1095 	}
1096 
1097 	file->ref_count++;
1098 	TAILQ_INSERT_TAIL(&file->open_requests, req, args.op.open.tailq);
1099 	if (file->ref_count == 1) {
1100 		assert(file->blob == NULL);
1101 		spdk_bs_open_blob(fs->bs, file->blobid, fs_open_blob_done, req);
1102 	} else if (file->blob != NULL) {
1103 		fs_open_blob_done(req, file->blob, 0);
1104 	} else {
1105 		/*
1106 		 * The blob open for this file is in progress due to a previous
1107 		 *  open request.  When that open completes, it will invoke the
1108 		 *  open callback for this request.
1109 		 */
1110 	}
1111 }
1112 
1113 void
1114 spdk_fs_open_file_async(struct spdk_filesystem *fs, const char *name, uint32_t flags,
1115 			spdk_file_op_with_handle_complete cb_fn, void *cb_arg)
1116 {
1117 	struct spdk_file *f = NULL;
1118 	struct spdk_fs_request *req;
1119 	struct spdk_fs_cb_args *args;
1120 
1121 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
1122 		cb_fn(cb_arg, NULL, -ENAMETOOLONG);
1123 		return;
1124 	}
1125 
1126 	f = fs_find_file(fs, name);
1127 	if (f == NULL && !(flags & SPDK_BLOBFS_OPEN_CREATE)) {
1128 		cb_fn(cb_arg, NULL, -ENOENT);
1129 		return;
1130 	}
1131 
1132 	if (f != NULL && f->is_deleted == true) {
1133 		cb_fn(cb_arg, NULL, -ENOENT);
1134 		return;
1135 	}
1136 
1137 	req = alloc_fs_request(fs->md_target.md_fs_channel);
1138 	if (req == NULL) {
1139 		cb_fn(cb_arg, NULL, -ENOMEM);
1140 		return;
1141 	}
1142 
1143 	args = &req->args;
1144 	args->fn.file_op_with_handle = cb_fn;
1145 	args->arg = cb_arg;
1146 	args->file = f;
1147 	args->fs = fs;
1148 	args->op.open.name = name;
1149 
1150 	if (f == NULL) {
1151 		spdk_fs_create_file_async(fs, name, fs_open_blob_create_cb, req);
1152 	} else {
1153 		fs_open_blob_create_cb(req, 0);
1154 	}
1155 }
1156 
1157 static void
1158 __fs_open_file_done(void *arg, struct spdk_file *file, int bserrno)
1159 {
1160 	struct spdk_fs_request *req = arg;
1161 	struct spdk_fs_cb_args *args = &req->args;
1162 
1163 	args->file = file;
1164 	__wake_caller(args, bserrno);
1165 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.open.name);
1166 }
1167 
1168 static void
1169 __fs_open_file(void *arg)
1170 {
1171 	struct spdk_fs_request *req = arg;
1172 	struct spdk_fs_cb_args *args = &req->args;
1173 
1174 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.open.name);
1175 	spdk_fs_open_file_async(args->fs, args->op.open.name, args->op.open.flags,
1176 				__fs_open_file_done, req);
1177 }
1178 
1179 int
1180 spdk_fs_open_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx,
1181 		  const char *name, uint32_t flags, struct spdk_file **file)
1182 {
1183 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
1184 	struct spdk_fs_request *req;
1185 	struct spdk_fs_cb_args *args;
1186 	int rc;
1187 
1188 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name);
1189 
1190 	req = alloc_fs_request(channel);
1191 	if (req == NULL) {
1192 		return -ENOMEM;
1193 	}
1194 
1195 	args = &req->args;
1196 	args->fs = fs;
1197 	args->op.open.name = name;
1198 	args->op.open.flags = flags;
1199 	args->sem = &channel->sem;
1200 	fs->send_request(__fs_open_file, req);
1201 	sem_wait(&channel->sem);
1202 	rc = args->rc;
1203 	if (rc == 0) {
1204 		*file = args->file;
1205 	} else {
1206 		*file = NULL;
1207 	}
1208 	free_fs_request(req);
1209 
1210 	return rc;
1211 }
1212 
1213 static void
1214 fs_rename_blob_close_cb(void *ctx, int bserrno)
1215 {
1216 	struct spdk_fs_request *req = ctx;
1217 	struct spdk_fs_cb_args *args = &req->args;
1218 
1219 	args->fn.fs_op(args->arg, bserrno);
1220 	free_fs_request(req);
1221 }
1222 
1223 static void
1224 fs_rename_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno)
1225 {
1226 	struct spdk_fs_request *req = ctx;
1227 	struct spdk_fs_cb_args *args = &req->args;
1228 	const char *new_name = args->op.rename.new_name;
1229 
1230 	spdk_blob_set_xattr(blob, "name", new_name, strlen(new_name) + 1);
1231 	spdk_blob_close(blob, fs_rename_blob_close_cb, req);
1232 }
1233 
1234 static void
1235 __spdk_fs_md_rename_file(struct spdk_fs_request *req)
1236 {
1237 	struct spdk_fs_cb_args *args = &req->args;
1238 	struct spdk_file *f;
1239 
1240 	f = fs_find_file(args->fs, args->op.rename.old_name);
1241 	if (f == NULL) {
1242 		args->fn.fs_op(args->arg, -ENOENT);
1243 		free_fs_request(req);
1244 		return;
1245 	}
1246 
1247 	free(f->name);
1248 	f->name = strdup(args->op.rename.new_name);
1249 	args->file = f;
1250 	spdk_bs_open_blob(args->fs->bs, f->blobid, fs_rename_blob_open_cb, req);
1251 }
1252 
1253 static void
1254 fs_rename_delete_done(void *arg, int fserrno)
1255 {
1256 	__spdk_fs_md_rename_file(arg);
1257 }
1258 
1259 void
1260 spdk_fs_rename_file_async(struct spdk_filesystem *fs,
1261 			  const char *old_name, const char *new_name,
1262 			  spdk_file_op_complete cb_fn, void *cb_arg)
1263 {
1264 	struct spdk_file *f;
1265 	struct spdk_fs_request *req;
1266 	struct spdk_fs_cb_args *args;
1267 
1268 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "old=%s new=%s\n", old_name, new_name);
1269 	if (strnlen(new_name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
1270 		cb_fn(cb_arg, -ENAMETOOLONG);
1271 		return;
1272 	}
1273 
1274 	req = alloc_fs_request(fs->md_target.md_fs_channel);
1275 	if (req == NULL) {
1276 		cb_fn(cb_arg, -ENOMEM);
1277 		return;
1278 	}
1279 
1280 	args = &req->args;
1281 	args->fn.fs_op = cb_fn;
1282 	args->fs = fs;
1283 	args->arg = cb_arg;
1284 	args->op.rename.old_name = old_name;
1285 	args->op.rename.new_name = new_name;
1286 
1287 	f = fs_find_file(fs, new_name);
1288 	if (f == NULL) {
1289 		__spdk_fs_md_rename_file(req);
1290 		return;
1291 	}
1292 
1293 	/*
1294 	 * The rename overwrites an existing file.  So delete the existing file, then
1295 	 *  do the actual rename.
1296 	 */
1297 	spdk_fs_delete_file_async(fs, new_name, fs_rename_delete_done, req);
1298 }
1299 
1300 static void
1301 __fs_rename_file_done(void *arg, int fserrno)
1302 {
1303 	struct spdk_fs_request *req = arg;
1304 	struct spdk_fs_cb_args *args = &req->args;
1305 
1306 	__wake_caller(args, fserrno);
1307 }
1308 
1309 static void
1310 __fs_rename_file(void *arg)
1311 {
1312 	struct spdk_fs_request *req = arg;
1313 	struct spdk_fs_cb_args *args = &req->args;
1314 
1315 	spdk_fs_rename_file_async(args->fs, args->op.rename.old_name, args->op.rename.new_name,
1316 				  __fs_rename_file_done, req);
1317 }
1318 
1319 int
1320 spdk_fs_rename_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx,
1321 		    const char *old_name, const char *new_name)
1322 {
1323 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
1324 	struct spdk_fs_request *req;
1325 	struct spdk_fs_cb_args *args;
1326 	int rc;
1327 
1328 	req = alloc_fs_request(channel);
1329 	if (req == NULL) {
1330 		return -ENOMEM;
1331 	}
1332 
1333 	args = &req->args;
1334 
1335 	args->fs = fs;
1336 	args->op.rename.old_name = old_name;
1337 	args->op.rename.new_name = new_name;
1338 	args->sem = &channel->sem;
1339 	fs->send_request(__fs_rename_file, req);
1340 	sem_wait(&channel->sem);
1341 	rc = args->rc;
1342 	free_fs_request(req);
1343 	return rc;
1344 }
1345 
1346 static void
1347 blob_delete_cb(void *ctx, int bserrno)
1348 {
1349 	struct spdk_fs_request *req = ctx;
1350 	struct spdk_fs_cb_args *args = &req->args;
1351 
1352 	args->fn.file_op(args->arg, bserrno);
1353 	free_fs_request(req);
1354 }
1355 
1356 void
1357 spdk_fs_delete_file_async(struct spdk_filesystem *fs, const char *name,
1358 			  spdk_file_op_complete cb_fn, void *cb_arg)
1359 {
1360 	struct spdk_file *f;
1361 	spdk_blob_id blobid;
1362 	struct spdk_fs_request *req;
1363 	struct spdk_fs_cb_args *args;
1364 
1365 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name);
1366 
1367 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
1368 		cb_fn(cb_arg, -ENAMETOOLONG);
1369 		return;
1370 	}
1371 
1372 	f = fs_find_file(fs, name);
1373 	if (f == NULL) {
1374 		cb_fn(cb_arg, -ENOENT);
1375 		return;
1376 	}
1377 
1378 	req = alloc_fs_request(fs->md_target.md_fs_channel);
1379 	if (req == NULL) {
1380 		cb_fn(cb_arg, -ENOMEM);
1381 		return;
1382 	}
1383 
1384 	args = &req->args;
1385 	args->fn.file_op = cb_fn;
1386 	args->arg = cb_arg;
1387 
1388 	if (f->ref_count > 0) {
1389 		/* If the ref > 0, we mark the file as deleted and delete it when we close it. */
1390 		f->is_deleted = true;
1391 		spdk_blob_set_xattr(f->blob, "is_deleted", &f->is_deleted, sizeof(bool));
1392 		spdk_blob_sync_md(f->blob, blob_delete_cb, req);
1393 		return;
1394 	}
1395 
1396 	TAILQ_REMOVE(&fs->files, f, tailq);
1397 
1398 	cache_free_buffers(f);
1399 
1400 	blobid = f->blobid;
1401 
1402 	free(f->name);
1403 	free(f->tree);
1404 	free(f);
1405 
1406 	spdk_bs_delete_blob(fs->bs, blobid, blob_delete_cb, req);
1407 }
1408 
1409 static void
1410 __fs_delete_file_done(void *arg, int fserrno)
1411 {
1412 	struct spdk_fs_request *req = arg;
1413 	struct spdk_fs_cb_args *args = &req->args;
1414 
1415 	__wake_caller(args, fserrno);
1416 }
1417 
1418 static void
1419 __fs_delete_file(void *arg)
1420 {
1421 	struct spdk_fs_request *req = arg;
1422 	struct spdk_fs_cb_args *args = &req->args;
1423 
1424 	spdk_fs_delete_file_async(args->fs, args->op.delete.name, __fs_delete_file_done, req);
1425 }
1426 
1427 int
1428 spdk_fs_delete_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx,
1429 		    const char *name)
1430 {
1431 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
1432 	struct spdk_fs_request *req;
1433 	struct spdk_fs_cb_args *args;
1434 	int rc;
1435 
1436 	req = alloc_fs_request(channel);
1437 	if (req == NULL) {
1438 		return -ENOMEM;
1439 	}
1440 
1441 	args = &req->args;
1442 	args->fs = fs;
1443 	args->op.delete.name = name;
1444 	args->sem = &channel->sem;
1445 	fs->send_request(__fs_delete_file, req);
1446 	sem_wait(&channel->sem);
1447 	rc = args->rc;
1448 	free_fs_request(req);
1449 
1450 	return rc;
1451 }
1452 
1453 spdk_fs_iter
1454 spdk_fs_iter_first(struct spdk_filesystem *fs)
1455 {
1456 	struct spdk_file *f;
1457 
1458 	f = TAILQ_FIRST(&fs->files);
1459 	return f;
1460 }
1461 
1462 spdk_fs_iter
1463 spdk_fs_iter_next(spdk_fs_iter iter)
1464 {
1465 	struct spdk_file *f = iter;
1466 
1467 	if (f == NULL) {
1468 		return NULL;
1469 	}
1470 
1471 	f = TAILQ_NEXT(f, tailq);
1472 	return f;
1473 }
1474 
1475 const char *
1476 spdk_file_get_name(struct spdk_file *file)
1477 {
1478 	return file->name;
1479 }
1480 
1481 uint64_t
1482 spdk_file_get_length(struct spdk_file *file)
1483 {
1484 	uint64_t length;
1485 
1486 	assert(file != NULL);
1487 
1488 	length = file->append_pos >= file->length ? file->append_pos : file->length;
1489 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s length=0x%jx\n", file->name, length);
1490 	return length;
1491 }
1492 
1493 static void
1494 fs_truncate_complete_cb(void *ctx, int bserrno)
1495 {
1496 	struct spdk_fs_request *req = ctx;
1497 	struct spdk_fs_cb_args *args = &req->args;
1498 
1499 	args->fn.file_op(args->arg, bserrno);
1500 	free_fs_request(req);
1501 }
1502 
1503 static void
1504 fs_truncate_resize_cb(void *ctx, int bserrno)
1505 {
1506 	struct spdk_fs_request *req = ctx;
1507 	struct spdk_fs_cb_args *args = &req->args;
1508 	struct spdk_file *file = args->file;
1509 	uint64_t *length = &args->op.truncate.length;
1510 
1511 	if (bserrno) {
1512 		args->fn.file_op(args->arg, bserrno);
1513 		free_fs_request(req);
1514 		return;
1515 	}
1516 
1517 	spdk_blob_set_xattr(file->blob, "length", length, sizeof(*length));
1518 
1519 	file->length = *length;
1520 	if (file->append_pos > file->length) {
1521 		file->append_pos = file->length;
1522 	}
1523 
1524 	spdk_blob_sync_md(file->blob, fs_truncate_complete_cb, req);
1525 }
1526 
/*
 * Return the number of clusters of cluster_sz bytes needed to hold length
 * bytes, i.e. length / cluster_sz rounded up.
 *
 * Computed as quotient plus a remainder check instead of
 * (length + cluster_sz - 1) / cluster_sz, so the result is also correct for
 * lengths close to UINT64_MAX where the addition would wrap around.
 * cluster_sz must be non-zero.
 */
static uint64_t
__bytes_to_clusters(uint64_t length, uint64_t cluster_sz)
{
	uint64_t clusters = length / cluster_sz;

	if (length % cluster_sz != 0) {
		clusters++;
	}

	return clusters;
}
1532 
1533 void
1534 spdk_file_truncate_async(struct spdk_file *file, uint64_t length,
1535 			 spdk_file_op_complete cb_fn, void *cb_arg)
1536 {
1537 	struct spdk_filesystem *fs;
1538 	size_t num_clusters;
1539 	struct spdk_fs_request *req;
1540 	struct spdk_fs_cb_args *args;
1541 
1542 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s old=0x%jx new=0x%jx\n", file->name, file->length, length);
1543 	if (length == file->length) {
1544 		cb_fn(cb_arg, 0);
1545 		return;
1546 	}
1547 
1548 	req = alloc_fs_request(file->fs->md_target.md_fs_channel);
1549 	if (req == NULL) {
1550 		cb_fn(cb_arg, -ENOMEM);
1551 		return;
1552 	}
1553 
1554 	args = &req->args;
1555 	args->fn.file_op = cb_fn;
1556 	args->arg = cb_arg;
1557 	args->file = file;
1558 	args->op.truncate.length = length;
1559 	fs = file->fs;
1560 
1561 	num_clusters = __bytes_to_clusters(length, fs->bs_opts.cluster_sz);
1562 
1563 	spdk_blob_resize(file->blob, num_clusters, fs_truncate_resize_cb, req);
1564 }
1565 
1566 static void
1567 __truncate(void *arg)
1568 {
1569 	struct spdk_fs_request *req = arg;
1570 	struct spdk_fs_cb_args *args = &req->args;
1571 
1572 	spdk_file_truncate_async(args->file, args->op.truncate.length,
1573 				 args->fn.file_op, args);
1574 }
1575 
1576 int
1577 spdk_file_truncate(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx,
1578 		   uint64_t length)
1579 {
1580 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
1581 	struct spdk_fs_request *req;
1582 	struct spdk_fs_cb_args *args;
1583 	int rc;
1584 
1585 	req = alloc_fs_request(channel);
1586 	if (req == NULL) {
1587 		return -ENOMEM;
1588 	}
1589 
1590 	args = &req->args;
1591 
1592 	args->file = file;
1593 	args->op.truncate.length = length;
1594 	args->fn.file_op = __wake_caller;
1595 	args->sem = &channel->sem;
1596 
1597 	channel->send_request(__truncate, req);
1598 	sem_wait(&channel->sem);
1599 	rc = args->rc;
1600 	free_fs_request(req);
1601 
1602 	return rc;
1603 }
1604 
1605 static void
1606 __rw_done(void *ctx, int bserrno)
1607 {
1608 	struct spdk_fs_request *req = ctx;
1609 	struct spdk_fs_cb_args *args = &req->args;
1610 
1611 	spdk_free(args->op.rw.pin_buf);
1612 	args->fn.file_op(args->arg, bserrno);
1613 	free_fs_request(req);
1614 }
1615 
1616 static void
1617 __read_done(void *ctx, int bserrno)
1618 {
1619 	struct spdk_fs_request *req = ctx;
1620 	struct spdk_fs_cb_args *args = &req->args;
1621 
1622 	assert(req != NULL);
1623 	if (args->op.rw.is_read) {
1624 		memcpy(args->iovs[0].iov_base,
1625 		       args->op.rw.pin_buf + (args->op.rw.offset & (args->op.rw.blocklen - 1)),
1626 		       args->iovs[0].iov_len);
1627 		__rw_done(req, 0);
1628 	} else {
1629 		memcpy(args->op.rw.pin_buf + (args->op.rw.offset & (args->op.rw.blocklen - 1)),
1630 		       args->iovs[0].iov_base,
1631 		       args->iovs[0].iov_len);
1632 		spdk_blob_io_write(args->file->blob, args->op.rw.channel,
1633 				   args->op.rw.pin_buf,
1634 				   args->op.rw.start_lba, args->op.rw.num_lba,
1635 				   __rw_done, req);
1636 	}
1637 }
1638 
1639 static void
1640 __do_blob_read(void *ctx, int fserrno)
1641 {
1642 	struct spdk_fs_request *req = ctx;
1643 	struct spdk_fs_cb_args *args = &req->args;
1644 
1645 	if (fserrno) {
1646 		__rw_done(req, fserrno);
1647 		return;
1648 	}
1649 	spdk_blob_io_read(args->file->blob, args->op.rw.channel,
1650 			  args->op.rw.pin_buf,
1651 			  args->op.rw.start_lba, args->op.rw.num_lba,
1652 			  __read_done, req);
1653 }
1654 
1655 static void
1656 __get_page_parameters(struct spdk_file *file, uint64_t offset, uint64_t length,
1657 		      uint64_t *start_lba, uint32_t *lba_size, uint64_t *num_lba)
1658 {
1659 	uint64_t end_lba;
1660 
1661 	*lba_size = spdk_bs_get_io_unit_size(file->fs->bs);
1662 	*start_lba = offset / *lba_size;
1663 	end_lba = (offset + length - 1) / *lba_size;
1664 	*num_lba = (end_lba - *start_lba + 1);
1665 }
1666 
1667 static void
1668 __readwrite(struct spdk_file *file, struct spdk_io_channel *_channel,
1669 	    void *payload, uint64_t offset, uint64_t length,
1670 	    spdk_file_op_complete cb_fn, void *cb_arg, int is_read)
1671 {
1672 	struct spdk_fs_request *req;
1673 	struct spdk_fs_cb_args *args;
1674 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
1675 	uint64_t start_lba, num_lba, pin_buf_length;
1676 	uint32_t lba_size;
1677 
1678 	if (is_read && offset + length > file->length) {
1679 		cb_fn(cb_arg, -EINVAL);
1680 		return;
1681 	}
1682 
1683 	req = alloc_fs_request_with_iov(channel, 1);
1684 	if (req == NULL) {
1685 		cb_fn(cb_arg, -ENOMEM);
1686 		return;
1687 	}
1688 
1689 	__get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba);
1690 
1691 	args = &req->args;
1692 	args->fn.file_op = cb_fn;
1693 	args->arg = cb_arg;
1694 	args->file = file;
1695 	args->op.rw.channel = channel->bs_channel;
1696 	args->iovs[0].iov_base = payload;
1697 	args->iovs[0].iov_len = (size_t)length;
1698 	args->op.rw.is_read = is_read;
1699 	args->op.rw.offset = offset;
1700 	args->op.rw.blocklen = lba_size;
1701 
1702 	pin_buf_length = num_lba * lba_size;
1703 	args->op.rw.pin_buf = spdk_malloc(pin_buf_length, lba_size, NULL,
1704 					  SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
1705 	if (args->op.rw.pin_buf == NULL) {
1706 		SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "Failed to allocate buf for: file=%s offset=%jx length=%jx\n",
1707 			      file->name, offset, length);
1708 		free_fs_request(req);
1709 		cb_fn(cb_arg, -ENOMEM);
1710 		return;
1711 	}
1712 
1713 	args->op.rw.start_lba = start_lba;
1714 	args->op.rw.num_lba = num_lba;
1715 
1716 	if (!is_read && file->length < offset + length) {
1717 		spdk_file_truncate_async(file, offset + length, __do_blob_read, req);
1718 	} else {
1719 		__do_blob_read(req, 0);
1720 	}
1721 }
1722 
1723 void
1724 spdk_file_write_async(struct spdk_file *file, struct spdk_io_channel *channel,
1725 		      void *payload, uint64_t offset, uint64_t length,
1726 		      spdk_file_op_complete cb_fn, void *cb_arg)
1727 {
1728 	__readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 0);
1729 }
1730 
1731 void
1732 spdk_file_read_async(struct spdk_file *file, struct spdk_io_channel *channel,
1733 		     void *payload, uint64_t offset, uint64_t length,
1734 		     spdk_file_op_complete cb_fn, void *cb_arg)
1735 {
1736 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s offset=%jx length=%jx\n",
1737 		      file->name, offset, length);
1738 	__readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 1);
1739 }
1740 
1741 struct spdk_io_channel *
1742 spdk_fs_alloc_io_channel(struct spdk_filesystem *fs)
1743 {
1744 	struct spdk_io_channel *io_channel;
1745 	struct spdk_fs_channel *fs_channel;
1746 
1747 	io_channel = spdk_get_io_channel(&fs->io_target);
1748 	fs_channel = spdk_io_channel_get_ctx(io_channel);
1749 	fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
1750 	fs_channel->send_request = __send_request_direct;
1751 
1752 	return io_channel;
1753 }
1754 
/*
 * Release an I/O channel by dropping the reference on it via
 * spdk_put_io_channel().
 */
void
spdk_fs_free_io_channel(struct spdk_io_channel *channel)
{
	struct spdk_io_channel *io_channel = channel;

	spdk_put_io_channel(io_channel);
}
1760 
1761 struct spdk_fs_thread_ctx *
1762 spdk_fs_alloc_thread_ctx(struct spdk_filesystem *fs)
1763 {
1764 	struct spdk_fs_thread_ctx *ctx;
1765 
1766 	ctx = calloc(1, sizeof(*ctx));
1767 	if (!ctx) {
1768 		return NULL;
1769 	}
1770 
1771 	_spdk_fs_channel_create(fs, &ctx->ch, 512);
1772 
1773 	ctx->ch.send_request = fs->send_request;
1774 	ctx->ch.sync = 1;
1775 	pthread_spin_init(&ctx->ch.lock, 0);
1776 
1777 	return ctx;
1778 }
1779 
1780 
1781 void
1782 spdk_fs_free_thread_ctx(struct spdk_fs_thread_ctx *ctx)
1783 {
1784 	_spdk_fs_channel_destroy(NULL, &ctx->ch);
1785 	free(ctx);
1786 }
1787 
1788 void
1789 spdk_fs_set_cache_size(uint64_t size_in_mb)
1790 {
1791 	g_fs_cache_size = size_in_mb * 1024 * 1024;
1792 }
1793 
1794 uint64_t
1795 spdk_fs_get_cache_size(void)
1796 {
1797 	return g_fs_cache_size / (1024 * 1024);
1798 }
1799 
1800 static void __file_flush(void *ctx);
1801 
1802 static void *
1803 alloc_cache_memory_buffer(struct spdk_file *context)
1804 {
1805 	struct spdk_file *file;
1806 	void *buf;
1807 
1808 	buf = spdk_mempool_get(g_cache_pool);
1809 	if (buf != NULL) {
1810 		return buf;
1811 	}
1812 
1813 	pthread_spin_lock(&g_caches_lock);
1814 	TAILQ_FOREACH(file, &g_caches, cache_tailq) {
1815 		if (!file->open_for_writing &&
1816 		    file->priority == SPDK_FILE_PRIORITY_LOW &&
1817 		    file != context) {
1818 			break;
1819 		}
1820 	}
1821 	pthread_spin_unlock(&g_caches_lock);
1822 	if (file != NULL) {
1823 		cache_free_buffers(file);
1824 		buf = spdk_mempool_get(g_cache_pool);
1825 		if (buf != NULL) {
1826 			return buf;
1827 		}
1828 	}
1829 
1830 	pthread_spin_lock(&g_caches_lock);
1831 	TAILQ_FOREACH(file, &g_caches, cache_tailq) {
1832 		if (!file->open_for_writing && file != context) {
1833 			break;
1834 		}
1835 	}
1836 	pthread_spin_unlock(&g_caches_lock);
1837 	if (file != NULL) {
1838 		cache_free_buffers(file);
1839 		buf = spdk_mempool_get(g_cache_pool);
1840 		if (buf != NULL) {
1841 			return buf;
1842 		}
1843 	}
1844 
1845 	pthread_spin_lock(&g_caches_lock);
1846 	TAILQ_FOREACH(file, &g_caches, cache_tailq) {
1847 		if (file != context) {
1848 			break;
1849 		}
1850 	}
1851 	pthread_spin_unlock(&g_caches_lock);
1852 	if (file != NULL) {
1853 		cache_free_buffers(file);
1854 		buf = spdk_mempool_get(g_cache_pool);
1855 		if (buf != NULL) {
1856 			return buf;
1857 		}
1858 	}
1859 
1860 	return NULL;
1861 }
1862 
1863 static struct cache_buffer *
1864 cache_insert_buffer(struct spdk_file *file, uint64_t offset)
1865 {
1866 	struct cache_buffer *buf;
1867 	int count = 0;
1868 
1869 	buf = calloc(1, sizeof(*buf));
1870 	if (buf == NULL) {
1871 		SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "calloc failed\n");
1872 		return NULL;
1873 	}
1874 
1875 	buf->buf = alloc_cache_memory_buffer(file);
1876 	while (buf->buf == NULL) {
1877 		/*
1878 		 * TODO: alloc_cache_memory_buffer() should eventually free
1879 		 *  some buffers.  Need a more sophisticated check here, instead
1880 		 *  of just bailing if 100 tries does not result in getting a
1881 		 *  free buffer.  This will involve using the sync channel's
1882 		 *  semaphore to block until a buffer becomes available.
1883 		 */
1884 		if (count++ == 100) {
1885 			SPDK_ERRLOG("Could not allocate cache buffer for file=%p on offset=%jx\n",
1886 				    file, offset);
1887 			free(buf);
1888 			return NULL;
1889 		}
1890 		buf->buf = alloc_cache_memory_buffer(file);
1891 	}
1892 
1893 	buf->buf_size = CACHE_BUFFER_SIZE;
1894 	buf->offset = offset;
1895 
1896 	pthread_spin_lock(&g_caches_lock);
1897 	if (file->tree->present_mask == 0) {
1898 		TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq);
1899 	}
1900 	file->tree = spdk_tree_insert_buffer(file->tree, buf);
1901 	pthread_spin_unlock(&g_caches_lock);
1902 
1903 	return buf;
1904 }
1905 
1906 static struct cache_buffer *
1907 cache_append_buffer(struct spdk_file *file)
1908 {
1909 	struct cache_buffer *last;
1910 
1911 	assert(file->last == NULL || file->last->bytes_filled == file->last->buf_size);
1912 	assert((file->append_pos % CACHE_BUFFER_SIZE) == 0);
1913 
1914 	last = cache_insert_buffer(file, file->append_pos);
1915 	if (last == NULL) {
1916 		SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "cache_insert_buffer failed\n");
1917 		return NULL;
1918 	}
1919 
1920 	file->last = last;
1921 
1922 	return last;
1923 }
1924 
1925 static void __check_sync_reqs(struct spdk_file *file);
1926 
1927 static void
1928 __file_cache_finish_sync(void *ctx, int bserrno)
1929 {
1930 	struct spdk_file *file = ctx;
1931 	struct spdk_fs_request *sync_req;
1932 	struct spdk_fs_cb_args *sync_args;
1933 
1934 	pthread_spin_lock(&file->lock);
1935 	sync_req = TAILQ_FIRST(&file->sync_requests);
1936 	sync_args = &sync_req->args;
1937 	assert(sync_args->op.sync.offset <= file->length_flushed);
1938 	BLOBFS_TRACE(file, "sync done offset=%jx\n", sync_args->op.sync.offset);
1939 	TAILQ_REMOVE(&file->sync_requests, sync_req, args.op.sync.tailq);
1940 	pthread_spin_unlock(&file->lock);
1941 
1942 	sync_args->fn.file_op(sync_args->arg, bserrno);
1943 	__check_sync_reqs(file);
1944 
1945 	pthread_spin_lock(&file->lock);
1946 	free_fs_request(sync_req);
1947 	pthread_spin_unlock(&file->lock);
1948 }
1949 
1950 static void
1951 __check_sync_reqs(struct spdk_file *file)
1952 {
1953 	struct spdk_fs_request *sync_req;
1954 
1955 	pthread_spin_lock(&file->lock);
1956 
1957 	TAILQ_FOREACH(sync_req, &file->sync_requests, args.op.sync.tailq) {
1958 		if (sync_req->args.op.sync.offset <= file->length_flushed) {
1959 			break;
1960 		}
1961 	}
1962 
1963 	if (sync_req != NULL && !sync_req->args.op.sync.xattr_in_progress) {
1964 		BLOBFS_TRACE(file, "set xattr length 0x%jx\n", file->length_flushed);
1965 		sync_req->args.op.sync.xattr_in_progress = true;
1966 		spdk_blob_set_xattr(file->blob, "length", &file->length_flushed,
1967 				    sizeof(file->length_flushed));
1968 
1969 		pthread_spin_unlock(&file->lock);
1970 		spdk_blob_sync_md(file->blob, __file_cache_finish_sync, file);
1971 	} else {
1972 		pthread_spin_unlock(&file->lock);
1973 	}
1974 }
1975 
1976 static void
1977 __file_flush_done(void *ctx, int bserrno)
1978 {
1979 	struct spdk_fs_request *req = ctx;
1980 	struct spdk_fs_cb_args *args = &req->args;
1981 	struct spdk_file *file = args->file;
1982 	struct cache_buffer *next = args->op.flush.cache_buffer;
1983 
1984 	BLOBFS_TRACE(file, "length=%jx\n", args->op.flush.length);
1985 
1986 	pthread_spin_lock(&file->lock);
1987 	next->in_progress = false;
1988 	next->bytes_flushed += args->op.flush.length;
1989 	file->length_flushed += args->op.flush.length;
1990 	if (file->length_flushed > file->length) {
1991 		file->length = file->length_flushed;
1992 	}
1993 	if (next->bytes_flushed == next->buf_size) {
1994 		BLOBFS_TRACE(file, "write buffer fully flushed 0x%jx\n", file->length_flushed);
1995 		next = spdk_tree_find_buffer(file->tree, file->length_flushed);
1996 	}
1997 
1998 	/*
1999 	 * Assert that there is no cached data that extends past the end of the underlying
2000 	 *  blob.
2001 	 */
2002 	assert(next == NULL || next->offset < __file_get_blob_size(file) ||
2003 	       next->bytes_filled == 0);
2004 
2005 	pthread_spin_unlock(&file->lock);
2006 
2007 	__check_sync_reqs(file);
2008 
2009 	__file_flush(req);
2010 }
2011 
2012 static void
2013 __file_flush(void *ctx)
2014 {
2015 	struct spdk_fs_request *req = ctx;
2016 	struct spdk_fs_cb_args *args = &req->args;
2017 	struct spdk_file *file = args->file;
2018 	struct cache_buffer *next;
2019 	uint64_t offset, length, start_lba, num_lba;
2020 	uint32_t lba_size;
2021 
2022 	pthread_spin_lock(&file->lock);
2023 	next = spdk_tree_find_buffer(file->tree, file->length_flushed);
2024 	if (next == NULL || next->in_progress) {
2025 		/*
2026 		 * There is either no data to flush, or a flush I/O is already in
2027 		 *  progress.  So return immediately - if a flush I/O is in
2028 		 *  progress we will flush more data after that is completed.
2029 		 */
2030 		free_fs_request(req);
2031 		if (next == NULL) {
2032 			/*
2033 			 * For cases where a file's cache was evicted, and then the
2034 			 *  file was later appended, we will write the data directly
2035 			 *  to disk and bypass cache.  So just update length_flushed
2036 			 *  here to reflect that all data was already written to disk.
2037 			 */
2038 			file->length_flushed = file->append_pos;
2039 		}
2040 		pthread_spin_unlock(&file->lock);
2041 		if (next == NULL) {
2042 			/*
2043 			 * There is no data to flush, but we still need to check for any
2044 			 *  outstanding sync requests to make sure metadata gets updated.
2045 			 */
2046 			__check_sync_reqs(file);
2047 		}
2048 		return;
2049 	}
2050 
2051 	offset = next->offset + next->bytes_flushed;
2052 	length = next->bytes_filled - next->bytes_flushed;
2053 	if (length == 0) {
2054 		free_fs_request(req);
2055 		pthread_spin_unlock(&file->lock);
2056 		return;
2057 	}
2058 	args->op.flush.length = length;
2059 	args->op.flush.cache_buffer = next;
2060 
2061 	__get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba);
2062 
2063 	next->in_progress = true;
2064 	BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n",
2065 		     offset, length, start_lba, num_lba);
2066 	pthread_spin_unlock(&file->lock);
2067 	spdk_blob_io_write(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel,
2068 			   next->buf + (start_lba * lba_size) - next->offset,
2069 			   start_lba, num_lba, __file_flush_done, req);
2070 }
2071 
/*
 * Last step of a blob extension: the metadata sync finished, so wake the
 * waiting writer with its status.  arg is the spdk_fs_cb_args of the extend
 * operation.
 */
static void
__file_extend_done(void *arg, int bserrno)
{
	struct spdk_fs_cb_args *extend_args = arg;

	__wake_caller(extend_args, bserrno);
}
2079 
2080 static void
2081 __file_extend_resize_cb(void *_args, int bserrno)
2082 {
2083 	struct spdk_fs_cb_args *args = _args;
2084 	struct spdk_file *file = args->file;
2085 
2086 	if (bserrno) {
2087 		__wake_caller(args, bserrno);
2088 		return;
2089 	}
2090 
2091 	spdk_blob_sync_md(file->blob, __file_extend_done, args);
2092 }
2093 
2094 static void
2095 __file_extend_blob(void *_args)
2096 {
2097 	struct spdk_fs_cb_args *args = _args;
2098 	struct spdk_file *file = args->file;
2099 
2100 	spdk_blob_resize(file->blob, args->op.resize.num_clusters, __file_extend_resize_cb, args);
2101 }
2102 
2103 static void
2104 __rw_from_file_done(void *ctx, int bserrno)
2105 {
2106 	struct spdk_fs_request *req = ctx;
2107 
2108 	__wake_caller(&req->args, bserrno);
2109 	free_fs_request(req);
2110 }
2111 
2112 static void
2113 __rw_from_file(void *ctx)
2114 {
2115 	struct spdk_fs_request *req = ctx;
2116 	struct spdk_fs_cb_args *args = &req->args;
2117 	struct spdk_file *file = args->file;
2118 
2119 	if (args->op.rw.is_read) {
2120 		spdk_file_read_async(file, file->fs->sync_target.sync_io_channel, args->iovs[0].iov_base,
2121 				     args->op.rw.offset, (uint64_t)args->iovs[0].iov_len,
2122 				     __rw_from_file_done, req);
2123 	} else {
2124 		spdk_file_write_async(file, file->fs->sync_target.sync_io_channel, args->iovs[0].iov_base,
2125 				      args->op.rw.offset, (uint64_t)args->iovs[0].iov_len,
2126 				      __rw_from_file_done, req);
2127 	}
2128 }
2129 
2130 static int
2131 __send_rw_from_file(struct spdk_file *file, void *payload,
2132 		    uint64_t offset, uint64_t length, bool is_read,
2133 		    struct spdk_fs_channel *channel)
2134 {
2135 	struct spdk_fs_request *req;
2136 	struct spdk_fs_cb_args *args;
2137 
2138 	req = alloc_fs_request_with_iov(channel, 1);
2139 	if (req == NULL) {
2140 		sem_post(&channel->sem);
2141 		return -ENOMEM;
2142 	}
2143 
2144 	args = &req->args;
2145 	args->file = file;
2146 	args->sem = &channel->sem;
2147 	args->iovs[0].iov_base = payload;
2148 	args->iovs[0].iov_len = (size_t)length;
2149 	args->op.rw.offset = offset;
2150 	args->op.rw.is_read = is_read;
2151 	file->fs->send_request(__rw_from_file, req);
2152 	return 0;
2153 }
2154 
2155 int
2156 spdk_file_write(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx,
2157 		void *payload, uint64_t offset, uint64_t length)
2158 {
2159 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
2160 	struct spdk_fs_request *flush_req;
2161 	uint64_t rem_length, copy, blob_size, cluster_sz;
2162 	uint32_t cache_buffers_filled = 0;
2163 	uint8_t *cur_payload;
2164 	struct cache_buffer *last;
2165 
2166 	BLOBFS_TRACE_RW(file, "offset=%jx length=%jx\n", offset, length);
2167 
2168 	if (length == 0) {
2169 		return 0;
2170 	}
2171 
2172 	if (offset != file->append_pos) {
2173 		BLOBFS_TRACE(file, " error offset=%jx append_pos=%jx\n", offset, file->append_pos);
2174 		return -EINVAL;
2175 	}
2176 
2177 	pthread_spin_lock(&file->lock);
2178 	file->open_for_writing = true;
2179 
2180 	if ((file->last == NULL) && (file->append_pos % CACHE_BUFFER_SIZE == 0)) {
2181 		cache_append_buffer(file);
2182 	}
2183 
2184 	if (file->last == NULL) {
2185 		int rc;
2186 
2187 		file->append_pos += length;
2188 		pthread_spin_unlock(&file->lock);
2189 		rc = __send_rw_from_file(file, payload, offset, length, false, channel);
2190 		sem_wait(&channel->sem);
2191 		return rc;
2192 	}
2193 
2194 	blob_size = __file_get_blob_size(file);
2195 
2196 	if ((offset + length) > blob_size) {
2197 		struct spdk_fs_cb_args extend_args = {};
2198 
2199 		cluster_sz = file->fs->bs_opts.cluster_sz;
2200 		extend_args.sem = &channel->sem;
2201 		extend_args.op.resize.num_clusters = __bytes_to_clusters((offset + length), cluster_sz);
2202 		extend_args.file = file;
2203 		BLOBFS_TRACE(file, "start resize to %u clusters\n", extend_args.op.resize.num_clusters);
2204 		pthread_spin_unlock(&file->lock);
2205 		file->fs->send_request(__file_extend_blob, &extend_args);
2206 		sem_wait(&channel->sem);
2207 		if (extend_args.rc) {
2208 			return extend_args.rc;
2209 		}
2210 	}
2211 
2212 	flush_req = alloc_fs_request(channel);
2213 	if (flush_req == NULL) {
2214 		pthread_spin_unlock(&file->lock);
2215 		return -ENOMEM;
2216 	}
2217 
2218 	last = file->last;
2219 	rem_length = length;
2220 	cur_payload = payload;
2221 	while (rem_length > 0) {
2222 		copy = last->buf_size - last->bytes_filled;
2223 		if (copy > rem_length) {
2224 			copy = rem_length;
2225 		}
2226 		BLOBFS_TRACE_RW(file, "  fill offset=%jx length=%jx\n", file->append_pos, copy);
2227 		memcpy(&last->buf[last->bytes_filled], cur_payload, copy);
2228 		file->append_pos += copy;
2229 		if (file->length < file->append_pos) {
2230 			file->length = file->append_pos;
2231 		}
2232 		cur_payload += copy;
2233 		last->bytes_filled += copy;
2234 		rem_length -= copy;
2235 		if (last->bytes_filled == last->buf_size) {
2236 			cache_buffers_filled++;
2237 			last = cache_append_buffer(file);
2238 			if (last == NULL) {
2239 				BLOBFS_TRACE(file, "nomem\n");
2240 				free_fs_request(flush_req);
2241 				pthread_spin_unlock(&file->lock);
2242 				return -ENOMEM;
2243 			}
2244 		}
2245 	}
2246 
2247 	pthread_spin_unlock(&file->lock);
2248 
2249 	if (cache_buffers_filled == 0) {
2250 		free_fs_request(flush_req);
2251 		return 0;
2252 	}
2253 
2254 	flush_req->args.file = file;
2255 	file->fs->send_request(__file_flush, flush_req);
2256 	return 0;
2257 }
2258 
2259 static void
2260 __readahead_done(void *ctx, int bserrno)
2261 {
2262 	struct spdk_fs_request *req = ctx;
2263 	struct spdk_fs_cb_args *args = &req->args;
2264 	struct cache_buffer *cache_buffer = args->op.readahead.cache_buffer;
2265 	struct spdk_file *file = args->file;
2266 
2267 	BLOBFS_TRACE(file, "offset=%jx\n", cache_buffer->offset);
2268 
2269 	pthread_spin_lock(&file->lock);
2270 	cache_buffer->bytes_filled = args->op.readahead.length;
2271 	cache_buffer->bytes_flushed = args->op.readahead.length;
2272 	cache_buffer->in_progress = false;
2273 	pthread_spin_unlock(&file->lock);
2274 
2275 	free_fs_request(req);
2276 }
2277 
2278 static void
2279 __readahead(void *ctx)
2280 {
2281 	struct spdk_fs_request *req = ctx;
2282 	struct spdk_fs_cb_args *args = &req->args;
2283 	struct spdk_file *file = args->file;
2284 	uint64_t offset, length, start_lba, num_lba;
2285 	uint32_t lba_size;
2286 
2287 	offset = args->op.readahead.offset;
2288 	length = args->op.readahead.length;
2289 	assert(length > 0);
2290 
2291 	__get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba);
2292 
2293 	BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n",
2294 		     offset, length, start_lba, num_lba);
2295 	spdk_blob_io_read(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel,
2296 			  args->op.readahead.cache_buffer->buf,
2297 			  start_lba, num_lba, __readahead_done, req);
2298 }
2299 
2300 static uint64_t
2301 __next_cache_buffer_offset(uint64_t offset)
2302 {
2303 	return (offset + CACHE_BUFFER_SIZE) & ~(CACHE_TREE_LEVEL_MASK(0));
2304 }
2305 
2306 static void
2307 check_readahead(struct spdk_file *file, uint64_t offset,
2308 		struct spdk_fs_channel *channel)
2309 {
2310 	struct spdk_fs_request *req;
2311 	struct spdk_fs_cb_args *args;
2312 
2313 	offset = __next_cache_buffer_offset(offset);
2314 	if (spdk_tree_find_buffer(file->tree, offset) != NULL || file->length <= offset) {
2315 		return;
2316 	}
2317 
2318 	req = alloc_fs_request(channel);
2319 	if (req == NULL) {
2320 		return;
2321 	}
2322 	args = &req->args;
2323 
2324 	BLOBFS_TRACE(file, "offset=%jx\n", offset);
2325 
2326 	args->file = file;
2327 	args->op.readahead.offset = offset;
2328 	args->op.readahead.cache_buffer = cache_insert_buffer(file, offset);
2329 	if (!args->op.readahead.cache_buffer) {
2330 		BLOBFS_TRACE(file, "Cannot allocate buf for offset=%jx\n", offset);
2331 		free_fs_request(req);
2332 		return;
2333 	}
2334 
2335 	args->op.readahead.cache_buffer->in_progress = true;
2336 	if (file->length < (offset + CACHE_BUFFER_SIZE)) {
2337 		args->op.readahead.length = file->length & (CACHE_BUFFER_SIZE - 1);
2338 	} else {
2339 		args->op.readahead.length = CACHE_BUFFER_SIZE;
2340 	}
2341 	file->fs->send_request(__readahead, req);
2342 }
2343 
2344 static int
2345 __file_read(struct spdk_file *file, void *payload, uint64_t offset, uint64_t length,
2346 	    struct spdk_fs_channel *channel)
2347 {
2348 	struct cache_buffer *buf;
2349 	int rc;
2350 
2351 	buf = spdk_tree_find_filled_buffer(file->tree, offset);
2352 	if (buf == NULL) {
2353 		pthread_spin_unlock(&file->lock);
2354 		rc = __send_rw_from_file(file, payload, offset, length, true, channel);
2355 		pthread_spin_lock(&file->lock);
2356 		return rc;
2357 	}
2358 
2359 	if ((offset + length) > (buf->offset + buf->bytes_filled)) {
2360 		length = buf->offset + buf->bytes_filled - offset;
2361 	}
2362 	BLOBFS_TRACE(file, "read %p offset=%ju length=%ju\n", payload, offset, length);
2363 	memcpy(payload, &buf->buf[offset - buf->offset], length);
2364 	if ((offset + length) % CACHE_BUFFER_SIZE == 0) {
2365 		pthread_spin_lock(&g_caches_lock);
2366 		spdk_tree_remove_buffer(file->tree, buf);
2367 		if (file->tree->present_mask == 0) {
2368 			TAILQ_REMOVE(&g_caches, file, cache_tailq);
2369 		}
2370 		pthread_spin_unlock(&g_caches_lock);
2371 	}
2372 
2373 	sem_post(&channel->sem);
2374 	return 0;
2375 }
2376 
2377 int64_t
2378 spdk_file_read(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx,
2379 	       void *payload, uint64_t offset, uint64_t length)
2380 {
2381 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
2382 	uint64_t final_offset, final_length;
2383 	uint32_t sub_reads = 0;
2384 	int rc = 0;
2385 
2386 	pthread_spin_lock(&file->lock);
2387 
2388 	BLOBFS_TRACE_RW(file, "offset=%ju length=%ju\n", offset, length);
2389 
2390 	file->open_for_writing = false;
2391 
2392 	if (length == 0 || offset >= file->append_pos) {
2393 		pthread_spin_unlock(&file->lock);
2394 		return 0;
2395 	}
2396 
2397 	if (offset + length > file->append_pos) {
2398 		length = file->append_pos - offset;
2399 	}
2400 
2401 	if (offset != file->next_seq_offset) {
2402 		file->seq_byte_count = 0;
2403 	}
2404 	file->seq_byte_count += length;
2405 	file->next_seq_offset = offset + length;
2406 	if (file->seq_byte_count >= CACHE_READAHEAD_THRESHOLD) {
2407 		check_readahead(file, offset, channel);
2408 		check_readahead(file, offset + CACHE_BUFFER_SIZE, channel);
2409 	}
2410 
2411 	final_length = 0;
2412 	final_offset = offset + length;
2413 	while (offset < final_offset) {
2414 		length = NEXT_CACHE_BUFFER_OFFSET(offset) - offset;
2415 		if (length > (final_offset - offset)) {
2416 			length = final_offset - offset;
2417 		}
2418 		rc = __file_read(file, payload, offset, length, channel);
2419 		if (rc == 0) {
2420 			final_length += length;
2421 		} else {
2422 			break;
2423 		}
2424 		payload += length;
2425 		offset += length;
2426 		sub_reads++;
2427 	}
2428 	pthread_spin_unlock(&file->lock);
2429 	while (sub_reads-- > 0) {
2430 		sem_wait(&channel->sem);
2431 	}
2432 	if (rc == 0) {
2433 		return final_length;
2434 	} else {
2435 		return rc;
2436 	}
2437 }
2438 
2439 static void
2440 _file_sync(struct spdk_file *file, struct spdk_fs_channel *channel,
2441 	   spdk_file_op_complete cb_fn, void *cb_arg)
2442 {
2443 	struct spdk_fs_request *sync_req;
2444 	struct spdk_fs_request *flush_req;
2445 	struct spdk_fs_cb_args *sync_args;
2446 	struct spdk_fs_cb_args *flush_args;
2447 
2448 	BLOBFS_TRACE(file, "offset=%jx\n", file->append_pos);
2449 
2450 	pthread_spin_lock(&file->lock);
2451 	if (file->append_pos <= file->length_flushed) {
2452 		BLOBFS_TRACE(file, "done - no data to flush\n");
2453 		pthread_spin_unlock(&file->lock);
2454 		cb_fn(cb_arg, 0);
2455 		return;
2456 	}
2457 
2458 	sync_req = alloc_fs_request(channel);
2459 	if (!sync_req) {
2460 		pthread_spin_unlock(&file->lock);
2461 		cb_fn(cb_arg, -ENOMEM);
2462 		return;
2463 	}
2464 	sync_args = &sync_req->args;
2465 
2466 	flush_req = alloc_fs_request(channel);
2467 	if (!flush_req) {
2468 		pthread_spin_unlock(&file->lock);
2469 		cb_fn(cb_arg, -ENOMEM);
2470 		return;
2471 	}
2472 	flush_args = &flush_req->args;
2473 
2474 	sync_args->file = file;
2475 	sync_args->fn.file_op = cb_fn;
2476 	sync_args->arg = cb_arg;
2477 	sync_args->op.sync.offset = file->append_pos;
2478 	sync_args->op.sync.xattr_in_progress = false;
2479 	TAILQ_INSERT_TAIL(&file->sync_requests, sync_req, args.op.sync.tailq);
2480 	pthread_spin_unlock(&file->lock);
2481 
2482 	flush_args->file = file;
2483 	channel->send_request(__file_flush, flush_req);
2484 }
2485 
2486 int
2487 spdk_file_sync(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx)
2488 {
2489 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
2490 	struct spdk_fs_cb_args args = {};
2491 
2492 	args.sem = &channel->sem;
2493 	_file_sync(file, channel, __wake_caller, &args);
2494 	sem_wait(&channel->sem);
2495 
2496 	return args.rc;
2497 }
2498 
2499 void
2500 spdk_file_sync_async(struct spdk_file *file, struct spdk_io_channel *_channel,
2501 		     spdk_file_op_complete cb_fn, void *cb_arg)
2502 {
2503 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
2504 
2505 	_file_sync(file, channel, cb_fn, cb_arg);
2506 }
2507 
2508 void
2509 spdk_file_set_priority(struct spdk_file *file, uint32_t priority)
2510 {
2511 	BLOBFS_TRACE(file, "priority=%u\n", priority);
2512 	file->priority = priority;
2513 
2514 }
2515 
2516 /*
2517  * Close routines
2518  */
2519 
2520 static void
2521 __file_close_async_done(void *ctx, int bserrno)
2522 {
2523 	struct spdk_fs_request *req = ctx;
2524 	struct spdk_fs_cb_args *args = &req->args;
2525 	struct spdk_file *file = args->file;
2526 
2527 	if (file->is_deleted) {
2528 		spdk_fs_delete_file_async(file->fs, file->name, blob_delete_cb, ctx);
2529 		return;
2530 	}
2531 
2532 	args->fn.file_op(args->arg, bserrno);
2533 	free_fs_request(req);
2534 }
2535 
2536 static void
2537 __file_close_async(struct spdk_file *file, struct spdk_fs_request *req)
2538 {
2539 	struct spdk_blob *blob;
2540 
2541 	pthread_spin_lock(&file->lock);
2542 	if (file->ref_count == 0) {
2543 		pthread_spin_unlock(&file->lock);
2544 		__file_close_async_done(req, -EBADF);
2545 		return;
2546 	}
2547 
2548 	file->ref_count--;
2549 	if (file->ref_count > 0) {
2550 		pthread_spin_unlock(&file->lock);
2551 		req->args.fn.file_op(req->args.arg, 0);
2552 		free_fs_request(req);
2553 		return;
2554 	}
2555 
2556 	pthread_spin_unlock(&file->lock);
2557 
2558 	blob = file->blob;
2559 	file->blob = NULL;
2560 	spdk_blob_close(blob, __file_close_async_done, req);
2561 }
2562 
2563 static void
2564 __file_close_async__sync_done(void *arg, int fserrno)
2565 {
2566 	struct spdk_fs_request *req = arg;
2567 	struct spdk_fs_cb_args *args = &req->args;
2568 
2569 	__file_close_async(args->file, req);
2570 }
2571 
2572 void
2573 spdk_file_close_async(struct spdk_file *file, spdk_file_op_complete cb_fn, void *cb_arg)
2574 {
2575 	struct spdk_fs_request *req;
2576 	struct spdk_fs_cb_args *args;
2577 
2578 	req = alloc_fs_request(file->fs->md_target.md_fs_channel);
2579 	if (req == NULL) {
2580 		cb_fn(cb_arg, -ENOMEM);
2581 		return;
2582 	}
2583 
2584 	args = &req->args;
2585 	args->file = file;
2586 	args->fn.file_op = cb_fn;
2587 	args->arg = cb_arg;
2588 
2589 	spdk_file_sync_async(file, file->fs->md_target.md_io_channel, __file_close_async__sync_done, req);
2590 }
2591 
2592 static void
2593 __file_close(void *arg)
2594 {
2595 	struct spdk_fs_request *req = arg;
2596 	struct spdk_fs_cb_args *args = &req->args;
2597 	struct spdk_file *file = args->file;
2598 
2599 	__file_close_async(file, req);
2600 }
2601 
2602 int
2603 spdk_file_close(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx)
2604 {
2605 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
2606 	struct spdk_fs_request *req;
2607 	struct spdk_fs_cb_args *args;
2608 
2609 	req = alloc_fs_request(channel);
2610 	if (req == NULL) {
2611 		return -ENOMEM;
2612 	}
2613 
2614 	args = &req->args;
2615 
2616 	spdk_file_sync(file, ctx);
2617 	BLOBFS_TRACE(file, "name=%s\n", file->name);
2618 	args->file = file;
2619 	args->sem = &channel->sem;
2620 	args->fn.file_op = __wake_caller;
2621 	args->arg = req;
2622 	channel->send_request(__file_close, req);
2623 	sem_wait(&channel->sem);
2624 
2625 	return args->rc;
2626 }
2627 
2628 int
2629 spdk_file_get_id(struct spdk_file *file, void *id, size_t size)
2630 {
2631 	if (size < sizeof(spdk_blob_id)) {
2632 		return -EINVAL;
2633 	}
2634 
2635 	memcpy(id, &file->blobid, sizeof(spdk_blob_id));
2636 
2637 	return sizeof(spdk_blob_id);
2638 }
2639 
2640 static void
2641 cache_free_buffers(struct spdk_file *file)
2642 {
2643 	BLOBFS_TRACE(file, "free=%s\n", file->name);
2644 	pthread_spin_lock(&file->lock);
2645 	pthread_spin_lock(&g_caches_lock);
2646 	if (file->tree->present_mask == 0) {
2647 		pthread_spin_unlock(&g_caches_lock);
2648 		pthread_spin_unlock(&file->lock);
2649 		return;
2650 	}
2651 	spdk_tree_free_buffers(file->tree);
2652 
2653 	TAILQ_REMOVE(&g_caches, file, cache_tailq);
2654 	/* If not freed, put it in the end of the queue */
2655 	if (file->tree->present_mask != 0) {
2656 		TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq);
2657 	}
2658 	file->last = NULL;
2659 	pthread_spin_unlock(&g_caches_lock);
2660 	pthread_spin_unlock(&file->lock);
2661 }
2662 
/*
 * Register the "blobfs" and "blobfs_rw" log components under the
 * SPDK_LOG_BLOBFS and SPDK_LOG_BLOBFS_RW symbols.
 * NOTE(review): presumably these are the flags behind the BLOBFS_TRACE and
 * BLOBFS_TRACE_RW macros used throughout this file - confirm.
 */
2663 SPDK_LOG_REGISTER_COMPONENT("blobfs", SPDK_LOG_BLOBFS)
2664 SPDK_LOG_REGISTER_COMPONENT("blobfs_rw", SPDK_LOG_BLOBFS_RW)
2665