xref: /spdk/lib/blobfs/blobfs.c (revision 2db73782738848b03c58b17cb7b3b8578e4eb8ac)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 
36 #include "spdk/blobfs.h"
37 #include "spdk/conf.h"
38 #include "blobfs_internal.h"
39 
40 #include "spdk/queue.h"
41 #include "spdk/thread.h"
42 #include "spdk/assert.h"
43 #include "spdk/env.h"
44 #include "spdk/util.h"
45 #include "spdk_internal/log.h"
46 
47 #define BLOBFS_TRACE(file, str, args...) \
48 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s " str, file->name, ##args)
49 
50 #define BLOBFS_TRACE_RW(file, str, args...) \
51 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS_RW, "file=%s " str, file->name, ##args)
52 
53 #define BLOBFS_DEFAULT_CACHE_SIZE (4ULL * 1024 * 1024 * 1024)
54 #define SPDK_BLOBFS_DEFAULT_OPTS_CLUSTER_SZ (1024 * 1024)
55 
56 static uint64_t g_fs_cache_size = BLOBFS_DEFAULT_CACHE_SIZE;
57 static struct spdk_mempool *g_cache_pool;
58 static TAILQ_HEAD(, spdk_file) g_caches;
59 static int g_fs_count = 0;
60 static pthread_mutex_t g_cache_init_lock = PTHREAD_MUTEX_INITIALIZER;
61 static pthread_spinlock_t g_caches_lock;
62 
63 void
64 spdk_cache_buffer_free(struct cache_buffer *cache_buffer)
65 {
66 	spdk_mempool_put(g_cache_pool, cache_buffer->buf);
67 	free(cache_buffer);
68 }
69 
70 #define CACHE_READAHEAD_THRESHOLD	(128 * 1024)
71 
72 struct spdk_file {
73 	struct spdk_filesystem	*fs;
74 	struct spdk_blob	*blob;
75 	char			*name;
76 	uint64_t		length;
77 	bool                    is_deleted;
78 	bool			open_for_writing;
79 	uint64_t		length_flushed;
80 	uint64_t		append_pos;
81 	uint64_t		seq_byte_count;
82 	uint64_t		next_seq_offset;
83 	uint32_t		priority;
84 	TAILQ_ENTRY(spdk_file)	tailq;
85 	spdk_blob_id		blobid;
86 	uint32_t		ref_count;
87 	pthread_spinlock_t	lock;
88 	struct cache_buffer	*last;
89 	struct cache_tree	*tree;
90 	TAILQ_HEAD(open_requests_head, spdk_fs_request) open_requests;
91 	TAILQ_HEAD(sync_requests_head, spdk_fs_request) sync_requests;
92 	TAILQ_ENTRY(spdk_file)	cache_tailq;
93 };
94 
95 struct spdk_deleted_file {
96 	spdk_blob_id	id;
97 	TAILQ_ENTRY(spdk_deleted_file)	tailq;
98 };
99 
100 struct spdk_filesystem {
101 	struct spdk_blob_store	*bs;
102 	TAILQ_HEAD(, spdk_file)	files;
103 	struct spdk_bs_opts	bs_opts;
104 	struct spdk_bs_dev	*bdev;
105 	fs_send_request_fn	send_request;
106 
107 	struct {
108 		uint32_t		max_ops;
109 		struct spdk_io_channel	*sync_io_channel;
110 		struct spdk_fs_channel	*sync_fs_channel;
111 	} sync_target;
112 
113 	struct {
114 		uint32_t		max_ops;
115 		struct spdk_io_channel	*md_io_channel;
116 		struct spdk_fs_channel	*md_fs_channel;
117 	} md_target;
118 
119 	struct {
120 		uint32_t		max_ops;
121 	} io_target;
122 };
123 
124 struct spdk_fs_cb_args {
125 	union {
126 		spdk_fs_op_with_handle_complete		fs_op_with_handle;
127 		spdk_fs_op_complete			fs_op;
128 		spdk_file_op_with_handle_complete	file_op_with_handle;
129 		spdk_file_op_complete			file_op;
130 		spdk_file_stat_op_complete		stat_op;
131 	} fn;
132 	void *arg;
133 	sem_t *sem;
134 	struct spdk_filesystem *fs;
135 	struct spdk_file *file;
136 	int rc;
137 	struct iovec *iovs;
138 	uint32_t iovcnt;
139 	struct iovec iov;
140 	union {
141 		struct {
142 			TAILQ_HEAD(, spdk_deleted_file)	deleted_files;
143 		} fs_load;
144 		struct {
145 			uint64_t	length;
146 		} truncate;
147 		struct {
148 			struct spdk_io_channel	*channel;
149 			void		*pin_buf;
150 			int		is_read;
151 			off_t		offset;
152 			size_t		length;
153 			uint64_t	start_lba;
154 			uint64_t	num_lba;
155 			uint32_t	blocklen;
156 		} rw;
157 		struct {
158 			const char	*old_name;
159 			const char	*new_name;
160 		} rename;
161 		struct {
162 			struct cache_buffer	*cache_buffer;
163 			uint64_t		length;
164 		} flush;
165 		struct {
166 			struct cache_buffer	*cache_buffer;
167 			uint64_t		length;
168 			uint64_t		offset;
169 		} readahead;
170 		struct {
171 			uint64_t			offset;
172 			TAILQ_ENTRY(spdk_fs_request)	tailq;
173 			bool				xattr_in_progress;
174 		} sync;
175 		struct {
176 			uint32_t			num_clusters;
177 		} resize;
178 		struct {
179 			const char	*name;
180 			uint32_t	flags;
181 			TAILQ_ENTRY(spdk_fs_request)	tailq;
182 		} open;
183 		struct {
184 			const char		*name;
185 			struct spdk_blob	*blob;
186 		} create;
187 		struct {
188 			const char	*name;
189 		} delete;
190 		struct {
191 			const char	*name;
192 		} stat;
193 	} op;
194 };
195 
196 static void cache_free_buffers(struct spdk_file *file);
197 
198 void
199 spdk_fs_opts_init(struct spdk_blobfs_opts *opts)
200 {
201 	opts->cluster_sz = SPDK_BLOBFS_DEFAULT_OPTS_CLUSTER_SZ;
202 }
203 
204 static void
205 __initialize_cache(void)
206 {
207 	assert(g_cache_pool == NULL);
208 
209 	g_cache_pool = spdk_mempool_create("spdk_fs_cache",
210 					   g_fs_cache_size / CACHE_BUFFER_SIZE,
211 					   CACHE_BUFFER_SIZE,
212 					   SPDK_MEMPOOL_DEFAULT_CACHE_SIZE,
213 					   SPDK_ENV_SOCKET_ID_ANY);
214 	if (!g_cache_pool) {
215 		SPDK_ERRLOG("Create mempool failed, you may "
216 			    "increase the memory and try again\n");
217 		assert(false);
218 	}
219 	TAILQ_INIT(&g_caches);
220 	pthread_spin_init(&g_caches_lock, 0);
221 }
222 
223 static void
224 __free_cache(void)
225 {
226 	assert(g_cache_pool != NULL);
227 
228 	spdk_mempool_free(g_cache_pool);
229 	g_cache_pool = NULL;
230 }
231 
232 static uint64_t
233 __file_get_blob_size(struct spdk_file *file)
234 {
235 	uint64_t cluster_sz;
236 
237 	cluster_sz = file->fs->bs_opts.cluster_sz;
238 	return cluster_sz * spdk_blob_get_num_clusters(file->blob);
239 }
240 
241 struct spdk_fs_request {
242 	struct spdk_fs_cb_args		args;
243 	TAILQ_ENTRY(spdk_fs_request)	link;
244 	struct spdk_fs_channel		*channel;
245 };
246 
247 struct spdk_fs_channel {
248 	struct spdk_fs_request		*req_mem;
249 	TAILQ_HEAD(, spdk_fs_request)	reqs;
250 	sem_t				sem;
251 	struct spdk_filesystem		*fs;
252 	struct spdk_io_channel		*bs_channel;
253 	fs_send_request_fn		send_request;
254 	bool				sync;
255 	pthread_spinlock_t		lock;
256 };
257 
258 /* For now, this is effectively an alias. But eventually we'll shift
259  * some data members over. */
260 struct spdk_fs_thread_ctx {
261 	struct spdk_fs_channel	ch;
262 };
263 
264 static struct spdk_fs_request *
265 alloc_fs_request_with_iov(struct spdk_fs_channel *channel, uint32_t iovcnt)
266 {
267 	struct spdk_fs_request *req;
268 	struct iovec *iovs = NULL;
269 
270 	if (iovcnt > 1) {
271 		iovs = calloc(iovcnt, sizeof(struct iovec));
272 		if (!iovs) {
273 			return NULL;
274 		}
275 	}
276 
277 	if (channel->sync) {
278 		pthread_spin_lock(&channel->lock);
279 	}
280 
281 	req = TAILQ_FIRST(&channel->reqs);
282 	if (req) {
283 		TAILQ_REMOVE(&channel->reqs, req, link);
284 	}
285 
286 	if (channel->sync) {
287 		pthread_spin_unlock(&channel->lock);
288 	}
289 
290 	if (req == NULL) {
291 		SPDK_ERRLOG("Cannot allocate req on spdk_fs_channel =%p\n", channel);
292 		free(iovs);
293 		return NULL;
294 	}
295 	memset(req, 0, sizeof(*req));
296 	req->channel = channel;
297 	if (iovcnt > 1) {
298 		req->args.iovs = iovs;
299 	} else {
300 		req->args.iovs = &req->args.iov;
301 	}
302 	req->args.iovcnt = iovcnt;
303 
304 	return req;
305 }
306 
307 static struct spdk_fs_request *
308 alloc_fs_request(struct spdk_fs_channel *channel)
309 {
310 	return alloc_fs_request_with_iov(channel, 0);
311 }
312 
313 static void
314 free_fs_request(struct spdk_fs_request *req)
315 {
316 	struct spdk_fs_channel *channel = req->channel;
317 
318 	if (req->args.iovcnt > 1) {
319 		free(req->args.iovs);
320 	}
321 
322 	if (channel->sync) {
323 		pthread_spin_lock(&channel->lock);
324 	}
325 
326 	TAILQ_INSERT_HEAD(&req->channel->reqs, req, link);
327 
328 	if (channel->sync) {
329 		pthread_spin_unlock(&channel->lock);
330 	}
331 }
332 
333 static int
334 _spdk_fs_channel_create(struct spdk_filesystem *fs, struct spdk_fs_channel *channel,
335 			uint32_t max_ops)
336 {
337 	uint32_t i;
338 
339 	channel->req_mem = calloc(max_ops, sizeof(struct spdk_fs_request));
340 	if (!channel->req_mem) {
341 		return -1;
342 	}
343 
344 	TAILQ_INIT(&channel->reqs);
345 	sem_init(&channel->sem, 0, 0);
346 
347 	for (i = 0; i < max_ops; i++) {
348 		TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link);
349 	}
350 
351 	channel->fs = fs;
352 
353 	return 0;
354 }
355 
356 static int
357 _spdk_fs_md_channel_create(void *io_device, void *ctx_buf)
358 {
359 	struct spdk_filesystem		*fs;
360 	struct spdk_fs_channel		*channel = ctx_buf;
361 
362 	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, md_target);
363 
364 	return _spdk_fs_channel_create(fs, channel, fs->md_target.max_ops);
365 }
366 
367 static int
368 _spdk_fs_sync_channel_create(void *io_device, void *ctx_buf)
369 {
370 	struct spdk_filesystem		*fs;
371 	struct spdk_fs_channel		*channel = ctx_buf;
372 
373 	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, sync_target);
374 
375 	return _spdk_fs_channel_create(fs, channel, fs->sync_target.max_ops);
376 }
377 
378 static int
379 _spdk_fs_io_channel_create(void *io_device, void *ctx_buf)
380 {
381 	struct spdk_filesystem		*fs;
382 	struct spdk_fs_channel		*channel = ctx_buf;
383 
384 	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, io_target);
385 
386 	return _spdk_fs_channel_create(fs, channel, fs->io_target.max_ops);
387 }
388 
389 static void
390 _spdk_fs_channel_destroy(void *io_device, void *ctx_buf)
391 {
392 	struct spdk_fs_channel *channel = ctx_buf;
393 
394 	free(channel->req_mem);
395 	if (channel->bs_channel != NULL) {
396 		spdk_bs_free_io_channel(channel->bs_channel);
397 	}
398 }
399 
400 static void
401 __send_request_direct(fs_request_fn fn, void *arg)
402 {
403 	fn(arg);
404 }
405 
406 static void
407 common_fs_bs_init(struct spdk_filesystem *fs, struct spdk_blob_store *bs)
408 {
409 	fs->bs = bs;
410 	fs->bs_opts.cluster_sz = spdk_bs_get_cluster_size(bs);
411 	fs->md_target.md_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
412 	fs->md_target.md_fs_channel->send_request = __send_request_direct;
413 	fs->sync_target.sync_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
414 	fs->sync_target.sync_fs_channel->send_request = __send_request_direct;
415 
416 	pthread_mutex_lock(&g_cache_init_lock);
417 	if (g_fs_count == 0) {
418 		__initialize_cache();
419 	}
420 	g_fs_count++;
421 	pthread_mutex_unlock(&g_cache_init_lock);
422 }
423 
424 static void
425 init_cb(void *ctx, struct spdk_blob_store *bs, int bserrno)
426 {
427 	struct spdk_fs_request *req = ctx;
428 	struct spdk_fs_cb_args *args = &req->args;
429 	struct spdk_filesystem *fs = args->fs;
430 
431 	if (bserrno == 0) {
432 		common_fs_bs_init(fs, bs);
433 	} else {
434 		free(fs);
435 		fs = NULL;
436 	}
437 
438 	args->fn.fs_op_with_handle(args->arg, fs, bserrno);
439 	free_fs_request(req);
440 }
441 
442 static void
443 fs_conf_parse(void)
444 {
445 	struct spdk_conf_section *sp;
446 
447 	sp = spdk_conf_find_section(NULL, "Blobfs");
448 	if (sp == NULL) {
449 		g_fs_cache_buffer_shift = CACHE_BUFFER_SHIFT_DEFAULT;
450 		return;
451 	}
452 
453 	g_fs_cache_buffer_shift = spdk_conf_section_get_intval(sp, "CacheBufferShift");
454 	if (g_fs_cache_buffer_shift <= 0) {
455 		g_fs_cache_buffer_shift = CACHE_BUFFER_SHIFT_DEFAULT;
456 	}
457 }
458 
459 static struct spdk_filesystem *
460 fs_alloc(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn)
461 {
462 	struct spdk_filesystem *fs;
463 
464 	fs = calloc(1, sizeof(*fs));
465 	if (fs == NULL) {
466 		return NULL;
467 	}
468 
469 	fs->bdev = dev;
470 	fs->send_request = send_request_fn;
471 	TAILQ_INIT(&fs->files);
472 
473 	fs->md_target.max_ops = 512;
474 	spdk_io_device_register(&fs->md_target, _spdk_fs_md_channel_create, _spdk_fs_channel_destroy,
475 				sizeof(struct spdk_fs_channel), "blobfs_md");
476 	fs->md_target.md_io_channel = spdk_get_io_channel(&fs->md_target);
477 	fs->md_target.md_fs_channel = spdk_io_channel_get_ctx(fs->md_target.md_io_channel);
478 
479 	fs->sync_target.max_ops = 512;
480 	spdk_io_device_register(&fs->sync_target, _spdk_fs_sync_channel_create, _spdk_fs_channel_destroy,
481 				sizeof(struct spdk_fs_channel), "blobfs_sync");
482 	fs->sync_target.sync_io_channel = spdk_get_io_channel(&fs->sync_target);
483 	fs->sync_target.sync_fs_channel = spdk_io_channel_get_ctx(fs->sync_target.sync_io_channel);
484 
485 	fs->io_target.max_ops = 512;
486 	spdk_io_device_register(&fs->io_target, _spdk_fs_io_channel_create, _spdk_fs_channel_destroy,
487 				sizeof(struct spdk_fs_channel), "blobfs_io");
488 
489 	return fs;
490 }
491 
492 static void
493 __wake_caller(void *arg, int fserrno)
494 {
495 	struct spdk_fs_cb_args *args = arg;
496 
497 	args->rc = fserrno;
498 	sem_post(args->sem);
499 }
500 
501 void
502 spdk_fs_init(struct spdk_bs_dev *dev, struct spdk_blobfs_opts *opt,
503 	     fs_send_request_fn send_request_fn,
504 	     spdk_fs_op_with_handle_complete cb_fn, void *cb_arg)
505 {
506 	struct spdk_filesystem *fs;
507 	struct spdk_fs_request *req;
508 	struct spdk_fs_cb_args *args;
509 	struct spdk_bs_opts opts = {};
510 
511 	fs = fs_alloc(dev, send_request_fn);
512 	if (fs == NULL) {
513 		cb_fn(cb_arg, NULL, -ENOMEM);
514 		return;
515 	}
516 
517 	fs_conf_parse();
518 
519 	req = alloc_fs_request(fs->md_target.md_fs_channel);
520 	if (req == NULL) {
521 		spdk_put_io_channel(fs->md_target.md_io_channel);
522 		spdk_io_device_unregister(&fs->md_target, NULL);
523 		spdk_put_io_channel(fs->sync_target.sync_io_channel);
524 		spdk_io_device_unregister(&fs->sync_target, NULL);
525 		spdk_io_device_unregister(&fs->io_target, NULL);
526 		free(fs);
527 		cb_fn(cb_arg, NULL, -ENOMEM);
528 		return;
529 	}
530 
531 	args = &req->args;
532 	args->fn.fs_op_with_handle = cb_fn;
533 	args->arg = cb_arg;
534 	args->fs = fs;
535 
536 	spdk_bs_opts_init(&opts);
537 	snprintf(opts.bstype.bstype, sizeof(opts.bstype.bstype), "BLOBFS");
538 	if (opt) {
539 		opts.cluster_sz = opt->cluster_sz;
540 	}
541 	spdk_bs_init(dev, &opts, init_cb, req);
542 }
543 
544 static struct spdk_file *
545 file_alloc(struct spdk_filesystem *fs)
546 {
547 	struct spdk_file *file;
548 
549 	file = calloc(1, sizeof(*file));
550 	if (file == NULL) {
551 		return NULL;
552 	}
553 
554 	file->tree = calloc(1, sizeof(*file->tree));
555 	if (file->tree == NULL) {
556 		free(file);
557 		return NULL;
558 	}
559 
560 	file->fs = fs;
561 	TAILQ_INIT(&file->open_requests);
562 	TAILQ_INIT(&file->sync_requests);
563 	pthread_spin_init(&file->lock, 0);
564 	TAILQ_INSERT_TAIL(&fs->files, file, tailq);
565 	file->priority = SPDK_FILE_PRIORITY_LOW;
566 	return file;
567 }
568 
569 static void fs_load_done(void *ctx, int bserrno);
570 
571 static int
572 _handle_deleted_files(struct spdk_fs_request *req)
573 {
574 	struct spdk_fs_cb_args *args = &req->args;
575 	struct spdk_filesystem *fs = args->fs;
576 
577 	if (!TAILQ_EMPTY(&args->op.fs_load.deleted_files)) {
578 		struct spdk_deleted_file *deleted_file;
579 
580 		deleted_file = TAILQ_FIRST(&args->op.fs_load.deleted_files);
581 		TAILQ_REMOVE(&args->op.fs_load.deleted_files, deleted_file, tailq);
582 		spdk_bs_delete_blob(fs->bs, deleted_file->id, fs_load_done, req);
583 		free(deleted_file);
584 		return 0;
585 	}
586 
587 	return 1;
588 }
589 
590 static void
591 fs_load_done(void *ctx, int bserrno)
592 {
593 	struct spdk_fs_request *req = ctx;
594 	struct spdk_fs_cb_args *args = &req->args;
595 	struct spdk_filesystem *fs = args->fs;
596 
597 	/* The filesystem has been loaded.  Now check if there are any files that
598 	 *  were marked for deletion before last unload.  Do not complete the
599 	 *  fs_load callback until all of them have been deleted on disk.
600 	 */
601 	if (_handle_deleted_files(req) == 0) {
602 		/* We found a file that's been marked for deleting but not actually
603 		 *  deleted yet.  This function will get called again once the delete
604 		 *  operation is completed.
605 		 */
606 		return;
607 	}
608 
609 	args->fn.fs_op_with_handle(args->arg, fs, 0);
610 	free_fs_request(req);
611 
612 }
613 
614 static void
615 iter_cb(void *ctx, struct spdk_blob *blob, int rc)
616 {
617 	struct spdk_fs_request *req = ctx;
618 	struct spdk_fs_cb_args *args = &req->args;
619 	struct spdk_filesystem *fs = args->fs;
620 	uint64_t *length;
621 	const char *name;
622 	uint32_t *is_deleted;
623 	size_t value_len;
624 
625 	if (rc < 0) {
626 		args->fn.fs_op_with_handle(args->arg, fs, rc);
627 		free_fs_request(req);
628 		return;
629 	}
630 
631 	rc = spdk_blob_get_xattr_value(blob, "name", (const void **)&name, &value_len);
632 	if (rc < 0) {
633 		args->fn.fs_op_with_handle(args->arg, fs, rc);
634 		free_fs_request(req);
635 		return;
636 	}
637 
638 	rc = spdk_blob_get_xattr_value(blob, "length", (const void **)&length, &value_len);
639 	if (rc < 0) {
640 		args->fn.fs_op_with_handle(args->arg, fs, rc);
641 		free_fs_request(req);
642 		return;
643 	}
644 
645 	assert(value_len == 8);
646 
647 	/* This file could be deleted last time without close it, then app crashed, so we delete it now */
648 	rc = spdk_blob_get_xattr_value(blob, "is_deleted", (const void **)&is_deleted, &value_len);
649 	if (rc < 0) {
650 		struct spdk_file *f;
651 
652 		f = file_alloc(fs);
653 		if (f == NULL) {
654 			args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM);
655 			free_fs_request(req);
656 			return;
657 		}
658 
659 		f->name = strdup(name);
660 		f->blobid = spdk_blob_get_id(blob);
661 		f->length = *length;
662 		f->length_flushed = *length;
663 		f->append_pos = *length;
664 		SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "added file %s length=%ju\n", f->name, f->length);
665 	} else {
666 		struct spdk_deleted_file *deleted_file;
667 
668 		deleted_file = calloc(1, sizeof(*deleted_file));
669 		if (deleted_file == NULL) {
670 			args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM);
671 			free_fs_request(req);
672 			return;
673 		}
674 		deleted_file->id = spdk_blob_get_id(blob);
675 		TAILQ_INSERT_TAIL(&args->op.fs_load.deleted_files, deleted_file, tailq);
676 	}
677 }
678 
679 static void
680 load_cb(void *ctx, struct spdk_blob_store *bs, int bserrno)
681 {
682 	struct spdk_fs_request *req = ctx;
683 	struct spdk_fs_cb_args *args = &req->args;
684 	struct spdk_filesystem *fs = args->fs;
685 	struct spdk_bs_type bstype;
686 	static const struct spdk_bs_type blobfs_type = {"BLOBFS"};
687 	static const struct spdk_bs_type zeros;
688 
689 	if (bserrno != 0) {
690 		args->fn.fs_op_with_handle(args->arg, NULL, bserrno);
691 		free_fs_request(req);
692 		free(fs);
693 		return;
694 	}
695 
696 	bstype = spdk_bs_get_bstype(bs);
697 
698 	if (!memcmp(&bstype, &zeros, sizeof(bstype))) {
699 		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "assigning bstype\n");
700 		spdk_bs_set_bstype(bs, blobfs_type);
701 	} else if (memcmp(&bstype, &blobfs_type, sizeof(bstype))) {
702 		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "not blobfs\n");
703 		SPDK_LOGDUMP(SPDK_LOG_BLOB, "bstype", &bstype, sizeof(bstype));
704 		args->fn.fs_op_with_handle(args->arg, NULL, bserrno);
705 		free_fs_request(req);
706 		free(fs);
707 		return;
708 	}
709 
710 	common_fs_bs_init(fs, bs);
711 	fs_load_done(req, 0);
712 }
713 
714 static void
715 spdk_fs_io_device_unregister(struct spdk_filesystem *fs)
716 {
717 	assert(fs != NULL);
718 	spdk_io_device_unregister(&fs->md_target, NULL);
719 	spdk_io_device_unregister(&fs->sync_target, NULL);
720 	spdk_io_device_unregister(&fs->io_target, NULL);
721 	free(fs);
722 }
723 
724 static void
725 spdk_fs_free_io_channels(struct spdk_filesystem *fs)
726 {
727 	assert(fs != NULL);
728 	spdk_fs_free_io_channel(fs->md_target.md_io_channel);
729 	spdk_fs_free_io_channel(fs->sync_target.sync_io_channel);
730 }
731 
732 void
733 spdk_fs_load(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn,
734 	     spdk_fs_op_with_handle_complete cb_fn, void *cb_arg)
735 {
736 	struct spdk_filesystem *fs;
737 	struct spdk_fs_cb_args *args;
738 	struct spdk_fs_request *req;
739 	struct spdk_bs_opts	bs_opts;
740 
741 	fs = fs_alloc(dev, send_request_fn);
742 	if (fs == NULL) {
743 		cb_fn(cb_arg, NULL, -ENOMEM);
744 		return;
745 	}
746 
747 	fs_conf_parse();
748 
749 	req = alloc_fs_request(fs->md_target.md_fs_channel);
750 	if (req == NULL) {
751 		spdk_fs_free_io_channels(fs);
752 		spdk_fs_io_device_unregister(fs);
753 		cb_fn(cb_arg, NULL, -ENOMEM);
754 		return;
755 	}
756 
757 	args = &req->args;
758 	args->fn.fs_op_with_handle = cb_fn;
759 	args->arg = cb_arg;
760 	args->fs = fs;
761 	TAILQ_INIT(&args->op.fs_load.deleted_files);
762 	spdk_bs_opts_init(&bs_opts);
763 	bs_opts.iter_cb_fn = iter_cb;
764 	bs_opts.iter_cb_arg = req;
765 	spdk_bs_load(dev, &bs_opts, load_cb, req);
766 }
767 
768 static void
769 unload_cb(void *ctx, int bserrno)
770 {
771 	struct spdk_fs_request *req = ctx;
772 	struct spdk_fs_cb_args *args = &req->args;
773 	struct spdk_filesystem *fs = args->fs;
774 	struct spdk_file *file, *tmp;
775 
776 	TAILQ_FOREACH_SAFE(file, &fs->files, tailq, tmp) {
777 		TAILQ_REMOVE(&fs->files, file, tailq);
778 		cache_free_buffers(file);
779 		free(file->name);
780 		free(file->tree);
781 		free(file);
782 	}
783 
784 	pthread_mutex_lock(&g_cache_init_lock);
785 	g_fs_count--;
786 	if (g_fs_count == 0) {
787 		__free_cache();
788 	}
789 	pthread_mutex_unlock(&g_cache_init_lock);
790 
791 	args->fn.fs_op(args->arg, bserrno);
792 	free(req);
793 
794 	spdk_fs_io_device_unregister(fs);
795 }
796 
797 void
798 spdk_fs_unload(struct spdk_filesystem *fs, spdk_fs_op_complete cb_fn, void *cb_arg)
799 {
800 	struct spdk_fs_request *req;
801 	struct spdk_fs_cb_args *args;
802 
803 	/*
804 	 * We must free the md_channel before unloading the blobstore, so just
805 	 *  allocate this request from the general heap.
806 	 */
807 	req = calloc(1, sizeof(*req));
808 	if (req == NULL) {
809 		cb_fn(cb_arg, -ENOMEM);
810 		return;
811 	}
812 
813 	args = &req->args;
814 	args->fn.fs_op = cb_fn;
815 	args->arg = cb_arg;
816 	args->fs = fs;
817 
818 	spdk_fs_free_io_channels(fs);
819 	spdk_bs_unload(fs->bs, unload_cb, req);
820 }
821 
822 static struct spdk_file *
823 fs_find_file(struct spdk_filesystem *fs, const char *name)
824 {
825 	struct spdk_file *file;
826 
827 	TAILQ_FOREACH(file, &fs->files, tailq) {
828 		if (!strncmp(name, file->name, SPDK_FILE_NAME_MAX)) {
829 			return file;
830 		}
831 	}
832 
833 	return NULL;
834 }
835 
836 void
837 spdk_fs_file_stat_async(struct spdk_filesystem *fs, const char *name,
838 			spdk_file_stat_op_complete cb_fn, void *cb_arg)
839 {
840 	struct spdk_file_stat stat;
841 	struct spdk_file *f = NULL;
842 
843 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
844 		cb_fn(cb_arg, NULL, -ENAMETOOLONG);
845 		return;
846 	}
847 
848 	f = fs_find_file(fs, name);
849 	if (f != NULL) {
850 		stat.blobid = f->blobid;
851 		stat.size = f->append_pos >= f->length ? f->append_pos : f->length;
852 		cb_fn(cb_arg, &stat, 0);
853 		return;
854 	}
855 
856 	cb_fn(cb_arg, NULL, -ENOENT);
857 }
858 
859 static void
860 __copy_stat(void *arg, struct spdk_file_stat *stat, int fserrno)
861 {
862 	struct spdk_fs_request *req = arg;
863 	struct spdk_fs_cb_args *args = &req->args;
864 
865 	args->rc = fserrno;
866 	if (fserrno == 0) {
867 		memcpy(args->arg, stat, sizeof(*stat));
868 	}
869 	sem_post(args->sem);
870 }
871 
872 static void
873 __file_stat(void *arg)
874 {
875 	struct spdk_fs_request *req = arg;
876 	struct spdk_fs_cb_args *args = &req->args;
877 
878 	spdk_fs_file_stat_async(args->fs, args->op.stat.name,
879 				args->fn.stat_op, req);
880 }
881 
882 int
883 spdk_fs_file_stat(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx,
884 		  const char *name, struct spdk_file_stat *stat)
885 {
886 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
887 	struct spdk_fs_request *req;
888 	int rc;
889 
890 	req = alloc_fs_request(channel);
891 	if (req == NULL) {
892 		return -ENOMEM;
893 	}
894 
895 	req->args.fs = fs;
896 	req->args.op.stat.name = name;
897 	req->args.fn.stat_op = __copy_stat;
898 	req->args.arg = stat;
899 	req->args.sem = &channel->sem;
900 	channel->send_request(__file_stat, req);
901 	sem_wait(&channel->sem);
902 
903 	rc = req->args.rc;
904 	free_fs_request(req);
905 
906 	return rc;
907 }
908 
909 static void
910 fs_create_blob_close_cb(void *ctx, int bserrno)
911 {
912 	int rc;
913 	struct spdk_fs_request *req = ctx;
914 	struct spdk_fs_cb_args *args = &req->args;
915 
916 	rc = args->rc ? args->rc : bserrno;
917 	args->fn.file_op(args->arg, rc);
918 	free_fs_request(req);
919 }
920 
921 static void
922 fs_create_blob_resize_cb(void *ctx, int bserrno)
923 {
924 	struct spdk_fs_request *req = ctx;
925 	struct spdk_fs_cb_args *args = &req->args;
926 	struct spdk_file *f = args->file;
927 	struct spdk_blob *blob = args->op.create.blob;
928 	uint64_t length = 0;
929 
930 	args->rc = bserrno;
931 	if (bserrno) {
932 		spdk_blob_close(blob, fs_create_blob_close_cb, args);
933 		return;
934 	}
935 
936 	spdk_blob_set_xattr(blob, "name", f->name, strlen(f->name) + 1);
937 	spdk_blob_set_xattr(blob, "length", &length, sizeof(length));
938 
939 	spdk_blob_close(blob, fs_create_blob_close_cb, args);
940 }
941 
942 static void
943 fs_create_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno)
944 {
945 	struct spdk_fs_request *req = ctx;
946 	struct spdk_fs_cb_args *args = &req->args;
947 
948 	if (bserrno) {
949 		args->fn.file_op(args->arg, bserrno);
950 		free_fs_request(req);
951 		return;
952 	}
953 
954 	args->op.create.blob = blob;
955 	spdk_blob_resize(blob, 1, fs_create_blob_resize_cb, req);
956 }
957 
958 static void
959 fs_create_blob_create_cb(void *ctx, spdk_blob_id blobid, int bserrno)
960 {
961 	struct spdk_fs_request *req = ctx;
962 	struct spdk_fs_cb_args *args = &req->args;
963 	struct spdk_file *f = args->file;
964 
965 	if (bserrno) {
966 		args->fn.file_op(args->arg, bserrno);
967 		free_fs_request(req);
968 		return;
969 	}
970 
971 	f->blobid = blobid;
972 	spdk_bs_open_blob(f->fs->bs, blobid, fs_create_blob_open_cb, req);
973 }
974 
975 void
976 spdk_fs_create_file_async(struct spdk_filesystem *fs, const char *name,
977 			  spdk_file_op_complete cb_fn, void *cb_arg)
978 {
979 	struct spdk_file *file;
980 	struct spdk_fs_request *req;
981 	struct spdk_fs_cb_args *args;
982 
983 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
984 		cb_fn(cb_arg, -ENAMETOOLONG);
985 		return;
986 	}
987 
988 	file = fs_find_file(fs, name);
989 	if (file != NULL) {
990 		cb_fn(cb_arg, -EEXIST);
991 		return;
992 	}
993 
994 	file = file_alloc(fs);
995 	if (file == NULL) {
996 		cb_fn(cb_arg, -ENOMEM);
997 		return;
998 	}
999 
1000 	req = alloc_fs_request(fs->md_target.md_fs_channel);
1001 	if (req == NULL) {
1002 		cb_fn(cb_arg, -ENOMEM);
1003 		return;
1004 	}
1005 
1006 	args = &req->args;
1007 	args->file = file;
1008 	args->fn.file_op = cb_fn;
1009 	args->arg = cb_arg;
1010 
1011 	file->name = strdup(name);
1012 	spdk_bs_create_blob(fs->bs, fs_create_blob_create_cb, args);
1013 }
1014 
1015 static void
1016 __fs_create_file_done(void *arg, int fserrno)
1017 {
1018 	struct spdk_fs_request *req = arg;
1019 	struct spdk_fs_cb_args *args = &req->args;
1020 
1021 	args->rc = fserrno;
1022 	sem_post(args->sem);
1023 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.create.name);
1024 }
1025 
1026 static void
1027 __fs_create_file(void *arg)
1028 {
1029 	struct spdk_fs_request *req = arg;
1030 	struct spdk_fs_cb_args *args = &req->args;
1031 
1032 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.create.name);
1033 	spdk_fs_create_file_async(args->fs, args->op.create.name, __fs_create_file_done, req);
1034 }
1035 
1036 int
1037 spdk_fs_create_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx, const char *name)
1038 {
1039 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
1040 	struct spdk_fs_request *req;
1041 	struct spdk_fs_cb_args *args;
1042 	int rc;
1043 
1044 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name);
1045 
1046 	req = alloc_fs_request(channel);
1047 	if (req == NULL) {
1048 		return -ENOMEM;
1049 	}
1050 
1051 	args = &req->args;
1052 	args->fs = fs;
1053 	args->op.create.name = name;
1054 	args->sem = &channel->sem;
1055 	fs->send_request(__fs_create_file, req);
1056 	sem_wait(&channel->sem);
1057 	rc = args->rc;
1058 	free_fs_request(req);
1059 
1060 	return rc;
1061 }
1062 
1063 static void
1064 fs_open_blob_done(void *ctx, struct spdk_blob *blob, int bserrno)
1065 {
1066 	struct spdk_fs_request *req = ctx;
1067 	struct spdk_fs_cb_args *args = &req->args;
1068 	struct spdk_file *f = args->file;
1069 
1070 	f->blob = blob;
1071 	while (!TAILQ_EMPTY(&f->open_requests)) {
1072 		req = TAILQ_FIRST(&f->open_requests);
1073 		args = &req->args;
1074 		TAILQ_REMOVE(&f->open_requests, req, args.op.open.tailq);
1075 		args->fn.file_op_with_handle(args->arg, f, bserrno);
1076 		free_fs_request(req);
1077 	}
1078 }
1079 
1080 static void
1081 fs_open_blob_create_cb(void *ctx, int bserrno)
1082 {
1083 	struct spdk_fs_request *req = ctx;
1084 	struct spdk_fs_cb_args *args = &req->args;
1085 	struct spdk_file *file = args->file;
1086 	struct spdk_filesystem *fs = args->fs;
1087 
1088 	if (file == NULL) {
1089 		/*
1090 		 * This is from an open with CREATE flag - the file
1091 		 *  is now created so look it up in the file list for this
1092 		 *  filesystem.
1093 		 */
1094 		file = fs_find_file(fs, args->op.open.name);
1095 		assert(file != NULL);
1096 		args->file = file;
1097 	}
1098 
1099 	file->ref_count++;
1100 	TAILQ_INSERT_TAIL(&file->open_requests, req, args.op.open.tailq);
1101 	if (file->ref_count == 1) {
1102 		assert(file->blob == NULL);
1103 		spdk_bs_open_blob(fs->bs, file->blobid, fs_open_blob_done, req);
1104 	} else if (file->blob != NULL) {
1105 		fs_open_blob_done(req, file->blob, 0);
1106 	} else {
1107 		/*
1108 		 * The blob open for this file is in progress due to a previous
1109 		 *  open request.  When that open completes, it will invoke the
1110 		 *  open callback for this request.
1111 		 */
1112 	}
1113 }
1114 
1115 void
1116 spdk_fs_open_file_async(struct spdk_filesystem *fs, const char *name, uint32_t flags,
1117 			spdk_file_op_with_handle_complete cb_fn, void *cb_arg)
1118 {
1119 	struct spdk_file *f = NULL;
1120 	struct spdk_fs_request *req;
1121 	struct spdk_fs_cb_args *args;
1122 
1123 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
1124 		cb_fn(cb_arg, NULL, -ENAMETOOLONG);
1125 		return;
1126 	}
1127 
1128 	f = fs_find_file(fs, name);
1129 	if (f == NULL && !(flags & SPDK_BLOBFS_OPEN_CREATE)) {
1130 		cb_fn(cb_arg, NULL, -ENOENT);
1131 		return;
1132 	}
1133 
1134 	if (f != NULL && f->is_deleted == true) {
1135 		cb_fn(cb_arg, NULL, -ENOENT);
1136 		return;
1137 	}
1138 
1139 	req = alloc_fs_request(fs->md_target.md_fs_channel);
1140 	if (req == NULL) {
1141 		cb_fn(cb_arg, NULL, -ENOMEM);
1142 		return;
1143 	}
1144 
1145 	args = &req->args;
1146 	args->fn.file_op_with_handle = cb_fn;
1147 	args->arg = cb_arg;
1148 	args->file = f;
1149 	args->fs = fs;
1150 	args->op.open.name = name;
1151 
1152 	if (f == NULL) {
1153 		spdk_fs_create_file_async(fs, name, fs_open_blob_create_cb, req);
1154 	} else {
1155 		fs_open_blob_create_cb(req, 0);
1156 	}
1157 }
1158 
1159 static void
1160 __fs_open_file_done(void *arg, struct spdk_file *file, int bserrno)
1161 {
1162 	struct spdk_fs_request *req = arg;
1163 	struct spdk_fs_cb_args *args = &req->args;
1164 
1165 	args->file = file;
1166 	__wake_caller(args, bserrno);
1167 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.open.name);
1168 }
1169 
1170 static void
1171 __fs_open_file(void *arg)
1172 {
1173 	struct spdk_fs_request *req = arg;
1174 	struct spdk_fs_cb_args *args = &req->args;
1175 
1176 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.open.name);
1177 	spdk_fs_open_file_async(args->fs, args->op.open.name, args->op.open.flags,
1178 				__fs_open_file_done, req);
1179 }
1180 
1181 int
1182 spdk_fs_open_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx,
1183 		  const char *name, uint32_t flags, struct spdk_file **file)
1184 {
1185 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
1186 	struct spdk_fs_request *req;
1187 	struct spdk_fs_cb_args *args;
1188 	int rc;
1189 
1190 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name);
1191 
1192 	req = alloc_fs_request(channel);
1193 	if (req == NULL) {
1194 		return -ENOMEM;
1195 	}
1196 
1197 	args = &req->args;
1198 	args->fs = fs;
1199 	args->op.open.name = name;
1200 	args->op.open.flags = flags;
1201 	args->sem = &channel->sem;
1202 	fs->send_request(__fs_open_file, req);
1203 	sem_wait(&channel->sem);
1204 	rc = args->rc;
1205 	if (rc == 0) {
1206 		*file = args->file;
1207 	} else {
1208 		*file = NULL;
1209 	}
1210 	free_fs_request(req);
1211 
1212 	return rc;
1213 }
1214 
1215 static void
1216 fs_rename_blob_close_cb(void *ctx, int bserrno)
1217 {
1218 	struct spdk_fs_request *req = ctx;
1219 	struct spdk_fs_cb_args *args = &req->args;
1220 
1221 	args->fn.fs_op(args->arg, bserrno);
1222 	free_fs_request(req);
1223 }
1224 
1225 static void
1226 fs_rename_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno)
1227 {
1228 	struct spdk_fs_request *req = ctx;
1229 	struct spdk_fs_cb_args *args = &req->args;
1230 	const char *new_name = args->op.rename.new_name;
1231 
1232 	spdk_blob_set_xattr(blob, "name", new_name, strlen(new_name) + 1);
1233 	spdk_blob_close(blob, fs_rename_blob_close_cb, req);
1234 }
1235 
1236 static void
1237 __spdk_fs_md_rename_file(struct spdk_fs_request *req)
1238 {
1239 	struct spdk_fs_cb_args *args = &req->args;
1240 	struct spdk_file *f;
1241 
1242 	f = fs_find_file(args->fs, args->op.rename.old_name);
1243 	if (f == NULL) {
1244 		args->fn.fs_op(args->arg, -ENOENT);
1245 		free_fs_request(req);
1246 		return;
1247 	}
1248 
1249 	free(f->name);
1250 	f->name = strdup(args->op.rename.new_name);
1251 	args->file = f;
1252 	spdk_bs_open_blob(args->fs->bs, f->blobid, fs_rename_blob_open_cb, req);
1253 }
1254 
1255 static void
1256 fs_rename_delete_done(void *arg, int fserrno)
1257 {
1258 	__spdk_fs_md_rename_file(arg);
1259 }
1260 
1261 void
1262 spdk_fs_rename_file_async(struct spdk_filesystem *fs,
1263 			  const char *old_name, const char *new_name,
1264 			  spdk_file_op_complete cb_fn, void *cb_arg)
1265 {
1266 	struct spdk_file *f;
1267 	struct spdk_fs_request *req;
1268 	struct spdk_fs_cb_args *args;
1269 
1270 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "old=%s new=%s\n", old_name, new_name);
1271 	if (strnlen(new_name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
1272 		cb_fn(cb_arg, -ENAMETOOLONG);
1273 		return;
1274 	}
1275 
1276 	req = alloc_fs_request(fs->md_target.md_fs_channel);
1277 	if (req == NULL) {
1278 		cb_fn(cb_arg, -ENOMEM);
1279 		return;
1280 	}
1281 
1282 	args = &req->args;
1283 	args->fn.fs_op = cb_fn;
1284 	args->fs = fs;
1285 	args->arg = cb_arg;
1286 	args->op.rename.old_name = old_name;
1287 	args->op.rename.new_name = new_name;
1288 
1289 	f = fs_find_file(fs, new_name);
1290 	if (f == NULL) {
1291 		__spdk_fs_md_rename_file(req);
1292 		return;
1293 	}
1294 
1295 	/*
1296 	 * The rename overwrites an existing file.  So delete the existing file, then
1297 	 *  do the actual rename.
1298 	 */
1299 	spdk_fs_delete_file_async(fs, new_name, fs_rename_delete_done, req);
1300 }
1301 
1302 static void
1303 __fs_rename_file_done(void *arg, int fserrno)
1304 {
1305 	struct spdk_fs_request *req = arg;
1306 	struct spdk_fs_cb_args *args = &req->args;
1307 
1308 	__wake_caller(args, fserrno);
1309 }
1310 
1311 static void
1312 __fs_rename_file(void *arg)
1313 {
1314 	struct spdk_fs_request *req = arg;
1315 	struct spdk_fs_cb_args *args = &req->args;
1316 
1317 	spdk_fs_rename_file_async(args->fs, args->op.rename.old_name, args->op.rename.new_name,
1318 				  __fs_rename_file_done, req);
1319 }
1320 
1321 int
1322 spdk_fs_rename_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx,
1323 		    const char *old_name, const char *new_name)
1324 {
1325 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
1326 	struct spdk_fs_request *req;
1327 	struct spdk_fs_cb_args *args;
1328 	int rc;
1329 
1330 	req = alloc_fs_request(channel);
1331 	if (req == NULL) {
1332 		return -ENOMEM;
1333 	}
1334 
1335 	args = &req->args;
1336 
1337 	args->fs = fs;
1338 	args->op.rename.old_name = old_name;
1339 	args->op.rename.new_name = new_name;
1340 	args->sem = &channel->sem;
1341 	fs->send_request(__fs_rename_file, req);
1342 	sem_wait(&channel->sem);
1343 	rc = args->rc;
1344 	free_fs_request(req);
1345 	return rc;
1346 }
1347 
1348 static void
1349 blob_delete_cb(void *ctx, int bserrno)
1350 {
1351 	struct spdk_fs_request *req = ctx;
1352 	struct spdk_fs_cb_args *args = &req->args;
1353 
1354 	args->fn.file_op(args->arg, bserrno);
1355 	free_fs_request(req);
1356 }
1357 
1358 void
1359 spdk_fs_delete_file_async(struct spdk_filesystem *fs, const char *name,
1360 			  spdk_file_op_complete cb_fn, void *cb_arg)
1361 {
1362 	struct spdk_file *f;
1363 	spdk_blob_id blobid;
1364 	struct spdk_fs_request *req;
1365 	struct spdk_fs_cb_args *args;
1366 
1367 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name);
1368 
1369 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
1370 		cb_fn(cb_arg, -ENAMETOOLONG);
1371 		return;
1372 	}
1373 
1374 	f = fs_find_file(fs, name);
1375 	if (f == NULL) {
1376 		cb_fn(cb_arg, -ENOENT);
1377 		return;
1378 	}
1379 
1380 	req = alloc_fs_request(fs->md_target.md_fs_channel);
1381 	if (req == NULL) {
1382 		cb_fn(cb_arg, -ENOMEM);
1383 		return;
1384 	}
1385 
1386 	args = &req->args;
1387 	args->fn.file_op = cb_fn;
1388 	args->arg = cb_arg;
1389 
1390 	if (f->ref_count > 0) {
1391 		/* If the ref > 0, we mark the file as deleted and delete it when we close it. */
1392 		f->is_deleted = true;
1393 		spdk_blob_set_xattr(f->blob, "is_deleted", &f->is_deleted, sizeof(bool));
1394 		spdk_blob_sync_md(f->blob, blob_delete_cb, args);
1395 		return;
1396 	}
1397 
1398 	TAILQ_REMOVE(&fs->files, f, tailq);
1399 
1400 	cache_free_buffers(f);
1401 
1402 	blobid = f->blobid;
1403 
1404 	free(f->name);
1405 	free(f->tree);
1406 	free(f);
1407 
1408 	spdk_bs_delete_blob(fs->bs, blobid, blob_delete_cb, req);
1409 }
1410 
1411 static void
1412 __fs_delete_file_done(void *arg, int fserrno)
1413 {
1414 	struct spdk_fs_request *req = arg;
1415 	struct spdk_fs_cb_args *args = &req->args;
1416 
1417 	__wake_caller(args, fserrno);
1418 }
1419 
1420 static void
1421 __fs_delete_file(void *arg)
1422 {
1423 	struct spdk_fs_request *req = arg;
1424 	struct spdk_fs_cb_args *args = &req->args;
1425 
1426 	spdk_fs_delete_file_async(args->fs, args->op.delete.name, __fs_delete_file_done, req);
1427 }
1428 
1429 int
1430 spdk_fs_delete_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx,
1431 		    const char *name)
1432 {
1433 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
1434 	struct spdk_fs_request *req;
1435 	struct spdk_fs_cb_args *args;
1436 	int rc;
1437 
1438 	req = alloc_fs_request(channel);
1439 	if (req == NULL) {
1440 		return -ENOMEM;
1441 	}
1442 
1443 	args = &req->args;
1444 	args->fs = fs;
1445 	args->op.delete.name = name;
1446 	args->sem = &channel->sem;
1447 	fs->send_request(__fs_delete_file, req);
1448 	sem_wait(&channel->sem);
1449 	rc = args->rc;
1450 	free_fs_request(req);
1451 
1452 	return rc;
1453 }
1454 
1455 spdk_fs_iter
1456 spdk_fs_iter_first(struct spdk_filesystem *fs)
1457 {
1458 	struct spdk_file *f;
1459 
1460 	f = TAILQ_FIRST(&fs->files);
1461 	return f;
1462 }
1463 
1464 spdk_fs_iter
1465 spdk_fs_iter_next(spdk_fs_iter iter)
1466 {
1467 	struct spdk_file *f = iter;
1468 
1469 	if (f == NULL) {
1470 		return NULL;
1471 	}
1472 
1473 	f = TAILQ_NEXT(f, tailq);
1474 	return f;
1475 }
1476 
1477 const char *
1478 spdk_file_get_name(struct spdk_file *file)
1479 {
1480 	return file->name;
1481 }
1482 
1483 uint64_t
1484 spdk_file_get_length(struct spdk_file *file)
1485 {
1486 	uint64_t length;
1487 
1488 	assert(file != NULL);
1489 
1490 	length = file->append_pos >= file->length ? file->append_pos : file->length;
1491 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s length=0x%jx\n", file->name, length);
1492 	return length;
1493 }
1494 
1495 static void
1496 fs_truncate_complete_cb(void *ctx, int bserrno)
1497 {
1498 	struct spdk_fs_request *req = ctx;
1499 	struct spdk_fs_cb_args *args = &req->args;
1500 
1501 	args->fn.file_op(args->arg, bserrno);
1502 	free_fs_request(req);
1503 }
1504 
1505 static void
1506 fs_truncate_resize_cb(void *ctx, int bserrno)
1507 {
1508 	struct spdk_fs_request *req = ctx;
1509 	struct spdk_fs_cb_args *args = &req->args;
1510 	struct spdk_file *file = args->file;
1511 	uint64_t *length = &args->op.truncate.length;
1512 
1513 	if (bserrno) {
1514 		args->fn.file_op(args->arg, bserrno);
1515 		free_fs_request(req);
1516 		return;
1517 	}
1518 
1519 	spdk_blob_set_xattr(file->blob, "length", length, sizeof(*length));
1520 
1521 	file->length = *length;
1522 	if (file->append_pos > file->length) {
1523 		file->append_pos = file->length;
1524 	}
1525 
1526 	spdk_blob_sync_md(file->blob, fs_truncate_complete_cb, req);
1527 }
1528 
1529 static uint64_t
1530 __bytes_to_clusters(uint64_t length, uint64_t cluster_sz)
1531 {
1532 	return (length + cluster_sz - 1) / cluster_sz;
1533 }
1534 
1535 void
1536 spdk_file_truncate_async(struct spdk_file *file, uint64_t length,
1537 			 spdk_file_op_complete cb_fn, void *cb_arg)
1538 {
1539 	struct spdk_filesystem *fs;
1540 	size_t num_clusters;
1541 	struct spdk_fs_request *req;
1542 	struct spdk_fs_cb_args *args;
1543 
1544 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s old=0x%jx new=0x%jx\n", file->name, file->length, length);
1545 	if (length == file->length) {
1546 		cb_fn(cb_arg, 0);
1547 		return;
1548 	}
1549 
1550 	req = alloc_fs_request(file->fs->md_target.md_fs_channel);
1551 	if (req == NULL) {
1552 		cb_fn(cb_arg, -ENOMEM);
1553 		return;
1554 	}
1555 
1556 	args = &req->args;
1557 	args->fn.file_op = cb_fn;
1558 	args->arg = cb_arg;
1559 	args->file = file;
1560 	args->op.truncate.length = length;
1561 	fs = file->fs;
1562 
1563 	num_clusters = __bytes_to_clusters(length, fs->bs_opts.cluster_sz);
1564 
1565 	spdk_blob_resize(file->blob, num_clusters, fs_truncate_resize_cb, req);
1566 }
1567 
1568 static void
1569 __truncate(void *arg)
1570 {
1571 	struct spdk_fs_request *req = arg;
1572 	struct spdk_fs_cb_args *args = &req->args;
1573 
1574 	spdk_file_truncate_async(args->file, args->op.truncate.length,
1575 				 args->fn.file_op, args);
1576 }
1577 
1578 int
1579 spdk_file_truncate(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx,
1580 		   uint64_t length)
1581 {
1582 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
1583 	struct spdk_fs_request *req;
1584 	struct spdk_fs_cb_args *args;
1585 	int rc;
1586 
1587 	req = alloc_fs_request(channel);
1588 	if (req == NULL) {
1589 		return -ENOMEM;
1590 	}
1591 
1592 	args = &req->args;
1593 
1594 	args->file = file;
1595 	args->op.truncate.length = length;
1596 	args->fn.file_op = __wake_caller;
1597 	args->sem = &channel->sem;
1598 
1599 	channel->send_request(__truncate, req);
1600 	sem_wait(&channel->sem);
1601 	rc = args->rc;
1602 	free_fs_request(req);
1603 
1604 	return rc;
1605 }
1606 
1607 static void
1608 __rw_done(void *ctx, int bserrno)
1609 {
1610 	struct spdk_fs_request *req = ctx;
1611 	struct spdk_fs_cb_args *args = &req->args;
1612 
1613 	spdk_free(args->op.rw.pin_buf);
1614 	args->fn.file_op(args->arg, bserrno);
1615 	free_fs_request(req);
1616 }
1617 
1618 static void
1619 __read_done(void *ctx, int bserrno)
1620 {
1621 	struct spdk_fs_request *req = ctx;
1622 	struct spdk_fs_cb_args *args = &req->args;
1623 
1624 	assert(req != NULL);
1625 	if (args->op.rw.is_read) {
1626 		memcpy(args->iovs[0].iov_base,
1627 		       args->op.rw.pin_buf + (args->op.rw.offset & (args->op.rw.blocklen - 1)),
1628 		       args->iovs[0].iov_len);
1629 		__rw_done(req, 0);
1630 	} else {
1631 		memcpy(args->op.rw.pin_buf + (args->op.rw.offset & (args->op.rw.blocklen - 1)),
1632 		       args->iovs[0].iov_base,
1633 		       args->iovs[0].iov_len);
1634 		spdk_blob_io_write(args->file->blob, args->op.rw.channel,
1635 				   args->op.rw.pin_buf,
1636 				   args->op.rw.start_lba, args->op.rw.num_lba,
1637 				   __rw_done, req);
1638 	}
1639 }
1640 
1641 static void
1642 __do_blob_read(void *ctx, int fserrno)
1643 {
1644 	struct spdk_fs_request *req = ctx;
1645 	struct spdk_fs_cb_args *args = &req->args;
1646 
1647 	if (fserrno) {
1648 		__rw_done(req, fserrno);
1649 		return;
1650 	}
1651 	spdk_blob_io_read(args->file->blob, args->op.rw.channel,
1652 			  args->op.rw.pin_buf,
1653 			  args->op.rw.start_lba, args->op.rw.num_lba,
1654 			  __read_done, req);
1655 }
1656 
1657 static void
1658 __get_page_parameters(struct spdk_file *file, uint64_t offset, uint64_t length,
1659 		      uint64_t *start_lba, uint32_t *lba_size, uint64_t *num_lba)
1660 {
1661 	uint64_t end_lba;
1662 
1663 	*lba_size = spdk_bs_get_io_unit_size(file->fs->bs);
1664 	*start_lba = offset / *lba_size;
1665 	end_lba = (offset + length - 1) / *lba_size;
1666 	*num_lba = (end_lba - *start_lba + 1);
1667 }
1668 
1669 static void
1670 __readwrite(struct spdk_file *file, struct spdk_io_channel *_channel,
1671 	    void *payload, uint64_t offset, uint64_t length,
1672 	    spdk_file_op_complete cb_fn, void *cb_arg, int is_read)
1673 {
1674 	struct spdk_fs_request *req;
1675 	struct spdk_fs_cb_args *args;
1676 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
1677 	uint64_t start_lba, num_lba, pin_buf_length;
1678 	uint32_t lba_size;
1679 
1680 	if (is_read && offset + length > file->length) {
1681 		cb_fn(cb_arg, -EINVAL);
1682 		return;
1683 	}
1684 
1685 	req = alloc_fs_request_with_iov(channel, 1);
1686 	if (req == NULL) {
1687 		cb_fn(cb_arg, -ENOMEM);
1688 		return;
1689 	}
1690 
1691 	__get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba);
1692 
1693 	args = &req->args;
1694 	args->fn.file_op = cb_fn;
1695 	args->arg = cb_arg;
1696 	args->file = file;
1697 	args->op.rw.channel = channel->bs_channel;
1698 	args->iovs[0].iov_base = payload;
1699 	args->iovs[0].iov_len = (size_t)length;
1700 	args->op.rw.is_read = is_read;
1701 	args->op.rw.offset = offset;
1702 	args->op.rw.blocklen = lba_size;
1703 
1704 	pin_buf_length = num_lba * lba_size;
1705 	args->op.rw.pin_buf = spdk_malloc(pin_buf_length, lba_size, NULL,
1706 					  SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
1707 	if (args->op.rw.pin_buf == NULL) {
1708 		SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "Failed to allocate buf for: file=%s offset=%jx length=%jx\n",
1709 			      file->name, offset, length);
1710 		free_fs_request(req);
1711 		cb_fn(cb_arg, -ENOMEM);
1712 		return;
1713 	}
1714 
1715 	args->op.rw.start_lba = start_lba;
1716 	args->op.rw.num_lba = num_lba;
1717 
1718 	if (!is_read && file->length < offset + length) {
1719 		spdk_file_truncate_async(file, offset + length, __do_blob_read, req);
1720 	} else {
1721 		__do_blob_read(req, 0);
1722 	}
1723 }
1724 
1725 void
1726 spdk_file_write_async(struct spdk_file *file, struct spdk_io_channel *channel,
1727 		      void *payload, uint64_t offset, uint64_t length,
1728 		      spdk_file_op_complete cb_fn, void *cb_arg)
1729 {
1730 	__readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 0);
1731 }
1732 
1733 void
1734 spdk_file_read_async(struct spdk_file *file, struct spdk_io_channel *channel,
1735 		     void *payload, uint64_t offset, uint64_t length,
1736 		     spdk_file_op_complete cb_fn, void *cb_arg)
1737 {
1738 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s offset=%jx length=%jx\n",
1739 		      file->name, offset, length);
1740 	__readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 1);
1741 }
1742 
1743 struct spdk_io_channel *
1744 spdk_fs_alloc_io_channel(struct spdk_filesystem *fs)
1745 {
1746 	struct spdk_io_channel *io_channel;
1747 	struct spdk_fs_channel *fs_channel;
1748 
1749 	io_channel = spdk_get_io_channel(&fs->io_target);
1750 	fs_channel = spdk_io_channel_get_ctx(io_channel);
1751 	fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
1752 	fs_channel->send_request = __send_request_direct;
1753 
1754 	return io_channel;
1755 }
1756 
1757 void
1758 spdk_fs_free_io_channel(struct spdk_io_channel *channel)
1759 {
1760 	spdk_put_io_channel(channel);
1761 }
1762 
1763 struct spdk_fs_thread_ctx *
1764 spdk_fs_alloc_thread_ctx(struct spdk_filesystem *fs)
1765 {
1766 	struct spdk_fs_thread_ctx *ctx;
1767 
1768 	ctx = calloc(1, sizeof(*ctx));
1769 	if (!ctx) {
1770 		return NULL;
1771 	}
1772 
1773 	_spdk_fs_channel_create(fs, &ctx->ch, 512);
1774 
1775 	ctx->ch.send_request = fs->send_request;
1776 	ctx->ch.sync = 1;
1777 	pthread_spin_init(&ctx->ch.lock, 0);
1778 
1779 	return ctx;
1780 }
1781 
1782 
1783 void
1784 spdk_fs_free_thread_ctx(struct spdk_fs_thread_ctx *ctx)
1785 {
1786 	_spdk_fs_channel_destroy(NULL, &ctx->ch);
1787 	free(ctx);
1788 }
1789 
1790 void
1791 spdk_fs_set_cache_size(uint64_t size_in_mb)
1792 {
1793 	g_fs_cache_size = size_in_mb * 1024 * 1024;
1794 }
1795 
1796 uint64_t
1797 spdk_fs_get_cache_size(void)
1798 {
1799 	return g_fs_cache_size / (1024 * 1024);
1800 }
1801 
1802 static void __file_flush(void *ctx);
1803 
1804 static void *
1805 alloc_cache_memory_buffer(struct spdk_file *context)
1806 {
1807 	struct spdk_file *file;
1808 	void *buf;
1809 
1810 	buf = spdk_mempool_get(g_cache_pool);
1811 	if (buf != NULL) {
1812 		return buf;
1813 	}
1814 
1815 	pthread_spin_lock(&g_caches_lock);
1816 	TAILQ_FOREACH(file, &g_caches, cache_tailq) {
1817 		if (!file->open_for_writing &&
1818 		    file->priority == SPDK_FILE_PRIORITY_LOW &&
1819 		    file != context) {
1820 			break;
1821 		}
1822 	}
1823 	pthread_spin_unlock(&g_caches_lock);
1824 	if (file != NULL) {
1825 		cache_free_buffers(file);
1826 		buf = spdk_mempool_get(g_cache_pool);
1827 		if (buf != NULL) {
1828 			return buf;
1829 		}
1830 	}
1831 
1832 	pthread_spin_lock(&g_caches_lock);
1833 	TAILQ_FOREACH(file, &g_caches, cache_tailq) {
1834 		if (!file->open_for_writing && file != context) {
1835 			break;
1836 		}
1837 	}
1838 	pthread_spin_unlock(&g_caches_lock);
1839 	if (file != NULL) {
1840 		cache_free_buffers(file);
1841 		buf = spdk_mempool_get(g_cache_pool);
1842 		if (buf != NULL) {
1843 			return buf;
1844 		}
1845 	}
1846 
1847 	pthread_spin_lock(&g_caches_lock);
1848 	TAILQ_FOREACH(file, &g_caches, cache_tailq) {
1849 		if (file != context) {
1850 			break;
1851 		}
1852 	}
1853 	pthread_spin_unlock(&g_caches_lock);
1854 	if (file != NULL) {
1855 		cache_free_buffers(file);
1856 		buf = spdk_mempool_get(g_cache_pool);
1857 		if (buf != NULL) {
1858 			return buf;
1859 		}
1860 	}
1861 
1862 	return NULL;
1863 }
1864 
1865 static struct cache_buffer *
1866 cache_insert_buffer(struct spdk_file *file, uint64_t offset)
1867 {
1868 	struct cache_buffer *buf;
1869 	int count = 0;
1870 
1871 	buf = calloc(1, sizeof(*buf));
1872 	if (buf == NULL) {
1873 		SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "calloc failed\n");
1874 		return NULL;
1875 	}
1876 
1877 	buf->buf = alloc_cache_memory_buffer(file);
1878 	while (buf->buf == NULL) {
1879 		/*
1880 		 * TODO: alloc_cache_memory_buffer() should eventually free
1881 		 *  some buffers.  Need a more sophisticated check here, instead
1882 		 *  of just bailing if 100 tries does not result in getting a
1883 		 *  free buffer.  This will involve using the sync channel's
1884 		 *  semaphore to block until a buffer becomes available.
1885 		 */
1886 		if (count++ == 100) {
1887 			SPDK_ERRLOG("Could not allocate cache buffer for file=%p on offset=%jx\n",
1888 				    file, offset);
1889 			free(buf);
1890 			return NULL;
1891 		}
1892 		buf->buf = alloc_cache_memory_buffer(file);
1893 	}
1894 
1895 	buf->buf_size = CACHE_BUFFER_SIZE;
1896 	buf->offset = offset;
1897 
1898 	pthread_spin_lock(&g_caches_lock);
1899 	if (file->tree->present_mask == 0) {
1900 		TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq);
1901 	}
1902 	file->tree = spdk_tree_insert_buffer(file->tree, buf);
1903 	pthread_spin_unlock(&g_caches_lock);
1904 
1905 	return buf;
1906 }
1907 
1908 static struct cache_buffer *
1909 cache_append_buffer(struct spdk_file *file)
1910 {
1911 	struct cache_buffer *last;
1912 
1913 	assert(file->last == NULL || file->last->bytes_filled == file->last->buf_size);
1914 	assert((file->append_pos % CACHE_BUFFER_SIZE) == 0);
1915 
1916 	last = cache_insert_buffer(file, file->append_pos);
1917 	if (last == NULL) {
1918 		SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "cache_insert_buffer failed\n");
1919 		return NULL;
1920 	}
1921 
1922 	file->last = last;
1923 
1924 	return last;
1925 }
1926 
1927 static void __check_sync_reqs(struct spdk_file *file);
1928 
1929 static void
1930 __file_cache_finish_sync(void *ctx, int bserrno)
1931 {
1932 	struct spdk_file *file = ctx;
1933 	struct spdk_fs_request *sync_req;
1934 	struct spdk_fs_cb_args *sync_args;
1935 
1936 	pthread_spin_lock(&file->lock);
1937 	sync_req = TAILQ_FIRST(&file->sync_requests);
1938 	sync_args = &sync_req->args;
1939 	assert(sync_args->op.sync.offset <= file->length_flushed);
1940 	BLOBFS_TRACE(file, "sync done offset=%jx\n", sync_args->op.sync.offset);
1941 	TAILQ_REMOVE(&file->sync_requests, sync_req, args.op.sync.tailq);
1942 	pthread_spin_unlock(&file->lock);
1943 
1944 	sync_args->fn.file_op(sync_args->arg, bserrno);
1945 	__check_sync_reqs(file);
1946 
1947 	pthread_spin_lock(&file->lock);
1948 	free_fs_request(sync_req);
1949 	pthread_spin_unlock(&file->lock);
1950 }
1951 
1952 static void
1953 __check_sync_reqs(struct spdk_file *file)
1954 {
1955 	struct spdk_fs_request *sync_req;
1956 
1957 	pthread_spin_lock(&file->lock);
1958 
1959 	TAILQ_FOREACH(sync_req, &file->sync_requests, args.op.sync.tailq) {
1960 		if (sync_req->args.op.sync.offset <= file->length_flushed) {
1961 			break;
1962 		}
1963 	}
1964 
1965 	if (sync_req != NULL && !sync_req->args.op.sync.xattr_in_progress) {
1966 		BLOBFS_TRACE(file, "set xattr length 0x%jx\n", file->length_flushed);
1967 		sync_req->args.op.sync.xattr_in_progress = true;
1968 		spdk_blob_set_xattr(file->blob, "length", &file->length_flushed,
1969 				    sizeof(file->length_flushed));
1970 
1971 		pthread_spin_unlock(&file->lock);
1972 		spdk_blob_sync_md(file->blob, __file_cache_finish_sync, file);
1973 	} else {
1974 		pthread_spin_unlock(&file->lock);
1975 	}
1976 }
1977 
1978 static void
1979 __file_flush_done(void *ctx, int bserrno)
1980 {
1981 	struct spdk_fs_request *req = ctx;
1982 	struct spdk_fs_cb_args *args = &req->args;
1983 	struct spdk_file *file = args->file;
1984 	struct cache_buffer *next = args->op.flush.cache_buffer;
1985 
1986 	BLOBFS_TRACE(file, "length=%jx\n", args->op.flush.length);
1987 
1988 	pthread_spin_lock(&file->lock);
1989 	next->in_progress = false;
1990 	next->bytes_flushed += args->op.flush.length;
1991 	file->length_flushed += args->op.flush.length;
1992 	if (file->length_flushed > file->length) {
1993 		file->length = file->length_flushed;
1994 	}
1995 	if (next->bytes_flushed == next->buf_size) {
1996 		BLOBFS_TRACE(file, "write buffer fully flushed 0x%jx\n", file->length_flushed);
1997 		next = spdk_tree_find_buffer(file->tree, file->length_flushed);
1998 	}
1999 
2000 	/*
2001 	 * Assert that there is no cached data that extends past the end of the underlying
2002 	 *  blob.
2003 	 */
2004 	assert(next == NULL || next->offset < __file_get_blob_size(file) ||
2005 	       next->bytes_filled == 0);
2006 
2007 	pthread_spin_unlock(&file->lock);
2008 
2009 	__check_sync_reqs(file);
2010 
2011 	__file_flush(req);
2012 }
2013 
2014 static void
2015 __file_flush(void *ctx)
2016 {
2017 	struct spdk_fs_request *req = ctx;
2018 	struct spdk_fs_cb_args *args = &req->args;
2019 	struct spdk_file *file = args->file;
2020 	struct cache_buffer *next;
2021 	uint64_t offset, length, start_lba, num_lba;
2022 	uint32_t lba_size;
2023 
2024 	pthread_spin_lock(&file->lock);
2025 	next = spdk_tree_find_buffer(file->tree, file->length_flushed);
2026 	if (next == NULL || next->in_progress) {
2027 		/*
2028 		 * There is either no data to flush, or a flush I/O is already in
2029 		 *  progress.  So return immediately - if a flush I/O is in
2030 		 *  progress we will flush more data after that is completed.
2031 		 */
2032 		free_fs_request(req);
2033 		if (next == NULL) {
2034 			/*
2035 			 * For cases where a file's cache was evicted, and then the
2036 			 *  file was later appended, we will write the data directly
2037 			 *  to disk and bypass cache.  So just update length_flushed
2038 			 *  here to reflect that all data was already written to disk.
2039 			 */
2040 			file->length_flushed = file->append_pos;
2041 		}
2042 		pthread_spin_unlock(&file->lock);
2043 		if (next == NULL) {
2044 			/*
2045 			 * There is no data to flush, but we still need to check for any
2046 			 *  outstanding sync requests to make sure metadata gets updated.
2047 			 */
2048 			__check_sync_reqs(file);
2049 		}
2050 		return;
2051 	}
2052 
2053 	offset = next->offset + next->bytes_flushed;
2054 	length = next->bytes_filled - next->bytes_flushed;
2055 	if (length == 0) {
2056 		free_fs_request(req);
2057 		pthread_spin_unlock(&file->lock);
2058 		return;
2059 	}
2060 	args->op.flush.length = length;
2061 	args->op.flush.cache_buffer = next;
2062 
2063 	__get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba);
2064 
2065 	next->in_progress = true;
2066 	BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n",
2067 		     offset, length, start_lba, num_lba);
2068 	pthread_spin_unlock(&file->lock);
2069 	spdk_blob_io_write(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel,
2070 			   next->buf + (start_lba * lba_size) - next->offset,
2071 			   start_lba, num_lba, __file_flush_done, req);
2072 }
2073 
2074 static void
2075 __file_extend_done(void *arg, int bserrno)
2076 {
2077 	struct spdk_fs_cb_args *args = arg;
2078 
2079 	__wake_caller(args, bserrno);
2080 }
2081 
2082 static void
2083 __file_extend_resize_cb(void *_args, int bserrno)
2084 {
2085 	struct spdk_fs_cb_args *args = _args;
2086 	struct spdk_file *file = args->file;
2087 
2088 	if (bserrno) {
2089 		__wake_caller(args, bserrno);
2090 		return;
2091 	}
2092 
2093 	spdk_blob_sync_md(file->blob, __file_extend_done, args);
2094 }
2095 
2096 static void
2097 __file_extend_blob(void *_args)
2098 {
2099 	struct spdk_fs_cb_args *args = _args;
2100 	struct spdk_file *file = args->file;
2101 
2102 	spdk_blob_resize(file->blob, args->op.resize.num_clusters, __file_extend_resize_cb, args);
2103 }
2104 
2105 static void
2106 __rw_from_file_done(void *ctx, int bserrno)
2107 {
2108 	struct spdk_fs_request *req = ctx;
2109 
2110 	__wake_caller(&req->args, bserrno);
2111 	free_fs_request(req);
2112 }
2113 
2114 static void
2115 __rw_from_file(void *ctx)
2116 {
2117 	struct spdk_fs_request *req = ctx;
2118 	struct spdk_fs_cb_args *args = &req->args;
2119 	struct spdk_file *file = args->file;
2120 
2121 	if (args->op.rw.is_read) {
2122 		spdk_file_read_async(file, file->fs->sync_target.sync_io_channel, args->iovs[0].iov_base,
2123 				     args->op.rw.offset, (uint64_t)args->iovs[0].iov_len,
2124 				     __rw_from_file_done, req);
2125 	} else {
2126 		spdk_file_write_async(file, file->fs->sync_target.sync_io_channel, args->iovs[0].iov_base,
2127 				      args->op.rw.offset, (uint64_t)args->iovs[0].iov_len,
2128 				      __rw_from_file_done, req);
2129 	}
2130 }
2131 
2132 static int
2133 __send_rw_from_file(struct spdk_file *file, void *payload,
2134 		    uint64_t offset, uint64_t length, bool is_read,
2135 		    struct spdk_fs_channel *channel)
2136 {
2137 	struct spdk_fs_request *req;
2138 	struct spdk_fs_cb_args *args;
2139 
2140 	req = alloc_fs_request_with_iov(channel, 1);
2141 	if (req == NULL) {
2142 		sem_post(&channel->sem);
2143 		return -ENOMEM;
2144 	}
2145 
2146 	args = &req->args;
2147 	args->file = file;
2148 	args->sem = &channel->sem;
2149 	args->iovs[0].iov_base = payload;
2150 	args->iovs[0].iov_len = (size_t)length;
2151 	args->op.rw.offset = offset;
2152 	args->op.rw.is_read = is_read;
2153 	file->fs->send_request(__rw_from_file, req);
2154 	return 0;
2155 }
2156 
2157 int
2158 spdk_file_write(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx,
2159 		void *payload, uint64_t offset, uint64_t length)
2160 {
2161 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
2162 	struct spdk_fs_request *flush_req;
2163 	uint64_t rem_length, copy, blob_size, cluster_sz;
2164 	uint32_t cache_buffers_filled = 0;
2165 	uint8_t *cur_payload;
2166 	struct cache_buffer *last;
2167 
2168 	BLOBFS_TRACE_RW(file, "offset=%jx length=%jx\n", offset, length);
2169 
2170 	if (length == 0) {
2171 		return 0;
2172 	}
2173 
2174 	if (offset != file->append_pos) {
2175 		BLOBFS_TRACE(file, " error offset=%jx append_pos=%jx\n", offset, file->append_pos);
2176 		return -EINVAL;
2177 	}
2178 
2179 	pthread_spin_lock(&file->lock);
2180 	file->open_for_writing = true;
2181 
2182 	if ((file->last == NULL) && (file->append_pos % CACHE_BUFFER_SIZE == 0)) {
2183 		cache_append_buffer(file);
2184 	}
2185 
2186 	if (file->last == NULL) {
2187 		int rc;
2188 
2189 		file->append_pos += length;
2190 		pthread_spin_unlock(&file->lock);
2191 		rc = __send_rw_from_file(file, payload, offset, length, false, channel);
2192 		sem_wait(&channel->sem);
2193 		return rc;
2194 	}
2195 
2196 	blob_size = __file_get_blob_size(file);
2197 
2198 	if ((offset + length) > blob_size) {
2199 		struct spdk_fs_cb_args extend_args = {};
2200 
2201 		cluster_sz = file->fs->bs_opts.cluster_sz;
2202 		extend_args.sem = &channel->sem;
2203 		extend_args.op.resize.num_clusters = __bytes_to_clusters((offset + length), cluster_sz);
2204 		extend_args.file = file;
2205 		BLOBFS_TRACE(file, "start resize to %u clusters\n", extend_args.op.resize.num_clusters);
2206 		pthread_spin_unlock(&file->lock);
2207 		file->fs->send_request(__file_extend_blob, &extend_args);
2208 		sem_wait(&channel->sem);
2209 		if (extend_args.rc) {
2210 			return extend_args.rc;
2211 		}
2212 	}
2213 
2214 	flush_req = alloc_fs_request(channel);
2215 	if (flush_req == NULL) {
2216 		pthread_spin_unlock(&file->lock);
2217 		return -ENOMEM;
2218 	}
2219 
2220 	last = file->last;
2221 	rem_length = length;
2222 	cur_payload = payload;
2223 	while (rem_length > 0) {
2224 		copy = last->buf_size - last->bytes_filled;
2225 		if (copy > rem_length) {
2226 			copy = rem_length;
2227 		}
2228 		BLOBFS_TRACE_RW(file, "  fill offset=%jx length=%jx\n", file->append_pos, copy);
2229 		memcpy(&last->buf[last->bytes_filled], cur_payload, copy);
2230 		file->append_pos += copy;
2231 		if (file->length < file->append_pos) {
2232 			file->length = file->append_pos;
2233 		}
2234 		cur_payload += copy;
2235 		last->bytes_filled += copy;
2236 		rem_length -= copy;
2237 		if (last->bytes_filled == last->buf_size) {
2238 			cache_buffers_filled++;
2239 			last = cache_append_buffer(file);
2240 			if (last == NULL) {
2241 				BLOBFS_TRACE(file, "nomem\n");
2242 				free_fs_request(flush_req);
2243 				pthread_spin_unlock(&file->lock);
2244 				return -ENOMEM;
2245 			}
2246 		}
2247 	}
2248 
2249 	pthread_spin_unlock(&file->lock);
2250 
2251 	if (cache_buffers_filled == 0) {
2252 		free_fs_request(flush_req);
2253 		return 0;
2254 	}
2255 
2256 	flush_req->args.file = file;
2257 	file->fs->send_request(__file_flush, flush_req);
2258 	return 0;
2259 }
2260 
2261 static void
2262 __readahead_done(void *ctx, int bserrno)
2263 {
2264 	struct spdk_fs_request *req = ctx;
2265 	struct spdk_fs_cb_args *args = &req->args;
2266 	struct cache_buffer *cache_buffer = args->op.readahead.cache_buffer;
2267 	struct spdk_file *file = args->file;
2268 
2269 	BLOBFS_TRACE(file, "offset=%jx\n", cache_buffer->offset);
2270 
2271 	pthread_spin_lock(&file->lock);
2272 	cache_buffer->bytes_filled = args->op.readahead.length;
2273 	cache_buffer->bytes_flushed = args->op.readahead.length;
2274 	cache_buffer->in_progress = false;
2275 	pthread_spin_unlock(&file->lock);
2276 
2277 	free_fs_request(req);
2278 }
2279 
2280 static void
2281 __readahead(void *ctx)
2282 {
2283 	struct spdk_fs_request *req = ctx;
2284 	struct spdk_fs_cb_args *args = &req->args;
2285 	struct spdk_file *file = args->file;
2286 	uint64_t offset, length, start_lba, num_lba;
2287 	uint32_t lba_size;
2288 
2289 	offset = args->op.readahead.offset;
2290 	length = args->op.readahead.length;
2291 	assert(length > 0);
2292 
2293 	__get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba);
2294 
2295 	BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n",
2296 		     offset, length, start_lba, num_lba);
2297 	spdk_blob_io_read(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel,
2298 			  args->op.readahead.cache_buffer->buf,
2299 			  start_lba, num_lba, __readahead_done, req);
2300 }
2301 
2302 static uint64_t
2303 __next_cache_buffer_offset(uint64_t offset)
2304 {
2305 	return (offset + CACHE_BUFFER_SIZE) & ~(CACHE_TREE_LEVEL_MASK(0));
2306 }
2307 
2308 static void
2309 check_readahead(struct spdk_file *file, uint64_t offset,
2310 		struct spdk_fs_channel *channel)
2311 {
2312 	struct spdk_fs_request *req;
2313 	struct spdk_fs_cb_args *args;
2314 
2315 	offset = __next_cache_buffer_offset(offset);
2316 	if (spdk_tree_find_buffer(file->tree, offset) != NULL || file->length <= offset) {
2317 		return;
2318 	}
2319 
2320 	req = alloc_fs_request(channel);
2321 	if (req == NULL) {
2322 		return;
2323 	}
2324 	args = &req->args;
2325 
2326 	BLOBFS_TRACE(file, "offset=%jx\n", offset);
2327 
2328 	args->file = file;
2329 	args->op.readahead.offset = offset;
2330 	args->op.readahead.cache_buffer = cache_insert_buffer(file, offset);
2331 	if (!args->op.readahead.cache_buffer) {
2332 		BLOBFS_TRACE(file, "Cannot allocate buf for offset=%jx\n", offset);
2333 		free_fs_request(req);
2334 		return;
2335 	}
2336 
2337 	args->op.readahead.cache_buffer->in_progress = true;
2338 	if (file->length < (offset + CACHE_BUFFER_SIZE)) {
2339 		args->op.readahead.length = file->length & (CACHE_BUFFER_SIZE - 1);
2340 	} else {
2341 		args->op.readahead.length = CACHE_BUFFER_SIZE;
2342 	}
2343 	file->fs->send_request(__readahead, req);
2344 }
2345 
2346 static int
2347 __file_read(struct spdk_file *file, void *payload, uint64_t offset, uint64_t length,
2348 	    struct spdk_fs_channel *channel)
2349 {
2350 	struct cache_buffer *buf;
2351 	int rc;
2352 
2353 	buf = spdk_tree_find_filled_buffer(file->tree, offset);
2354 	if (buf == NULL) {
2355 		pthread_spin_unlock(&file->lock);
2356 		rc = __send_rw_from_file(file, payload, offset, length, true, channel);
2357 		pthread_spin_lock(&file->lock);
2358 		return rc;
2359 	}
2360 
2361 	if ((offset + length) > (buf->offset + buf->bytes_filled)) {
2362 		length = buf->offset + buf->bytes_filled - offset;
2363 	}
2364 	BLOBFS_TRACE(file, "read %p offset=%ju length=%ju\n", payload, offset, length);
2365 	memcpy(payload, &buf->buf[offset - buf->offset], length);
2366 	if ((offset + length) % CACHE_BUFFER_SIZE == 0) {
2367 		pthread_spin_lock(&g_caches_lock);
2368 		spdk_tree_remove_buffer(file->tree, buf);
2369 		if (file->tree->present_mask == 0) {
2370 			TAILQ_REMOVE(&g_caches, file, cache_tailq);
2371 		}
2372 		pthread_spin_unlock(&g_caches_lock);
2373 	}
2374 
2375 	sem_post(&channel->sem);
2376 	return 0;
2377 }
2378 
2379 int64_t
2380 spdk_file_read(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx,
2381 	       void *payload, uint64_t offset, uint64_t length)
2382 {
2383 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
2384 	uint64_t final_offset, final_length;
2385 	uint32_t sub_reads = 0;
2386 	int rc = 0;
2387 
2388 	pthread_spin_lock(&file->lock);
2389 
2390 	BLOBFS_TRACE_RW(file, "offset=%ju length=%ju\n", offset, length);
2391 
2392 	file->open_for_writing = false;
2393 
2394 	if (length == 0 || offset >= file->append_pos) {
2395 		pthread_spin_unlock(&file->lock);
2396 		return 0;
2397 	}
2398 
2399 	if (offset + length > file->append_pos) {
2400 		length = file->append_pos - offset;
2401 	}
2402 
2403 	if (offset != file->next_seq_offset) {
2404 		file->seq_byte_count = 0;
2405 	}
2406 	file->seq_byte_count += length;
2407 	file->next_seq_offset = offset + length;
2408 	if (file->seq_byte_count >= CACHE_READAHEAD_THRESHOLD) {
2409 		check_readahead(file, offset, channel);
2410 		check_readahead(file, offset + CACHE_BUFFER_SIZE, channel);
2411 	}
2412 
2413 	final_length = 0;
2414 	final_offset = offset + length;
2415 	while (offset < final_offset) {
2416 		length = NEXT_CACHE_BUFFER_OFFSET(offset) - offset;
2417 		if (length > (final_offset - offset)) {
2418 			length = final_offset - offset;
2419 		}
2420 		rc = __file_read(file, payload, offset, length, channel);
2421 		if (rc == 0) {
2422 			final_length += length;
2423 		} else {
2424 			break;
2425 		}
2426 		payload += length;
2427 		offset += length;
2428 		sub_reads++;
2429 	}
2430 	pthread_spin_unlock(&file->lock);
2431 	while (sub_reads-- > 0) {
2432 		sem_wait(&channel->sem);
2433 	}
2434 	if (rc == 0) {
2435 		return final_length;
2436 	} else {
2437 		return rc;
2438 	}
2439 }
2440 
2441 static void
2442 _file_sync(struct spdk_file *file, struct spdk_fs_channel *channel,
2443 	   spdk_file_op_complete cb_fn, void *cb_arg)
2444 {
2445 	struct spdk_fs_request *sync_req;
2446 	struct spdk_fs_request *flush_req;
2447 	struct spdk_fs_cb_args *sync_args;
2448 	struct spdk_fs_cb_args *flush_args;
2449 
2450 	BLOBFS_TRACE(file, "offset=%jx\n", file->append_pos);
2451 
2452 	pthread_spin_lock(&file->lock);
2453 	if (file->append_pos <= file->length_flushed) {
2454 		BLOBFS_TRACE(file, "done - no data to flush\n");
2455 		pthread_spin_unlock(&file->lock);
2456 		cb_fn(cb_arg, 0);
2457 		return;
2458 	}
2459 
2460 	sync_req = alloc_fs_request(channel);
2461 	if (!sync_req) {
2462 		pthread_spin_unlock(&file->lock);
2463 		cb_fn(cb_arg, -ENOMEM);
2464 		return;
2465 	}
2466 	sync_args = &sync_req->args;
2467 
2468 	flush_req = alloc_fs_request(channel);
2469 	if (!flush_req) {
2470 		pthread_spin_unlock(&file->lock);
2471 		cb_fn(cb_arg, -ENOMEM);
2472 		return;
2473 	}
2474 	flush_args = &flush_req->args;
2475 
2476 	sync_args->file = file;
2477 	sync_args->fn.file_op = cb_fn;
2478 	sync_args->arg = cb_arg;
2479 	sync_args->op.sync.offset = file->append_pos;
2480 	sync_args->op.sync.xattr_in_progress = false;
2481 	TAILQ_INSERT_TAIL(&file->sync_requests, sync_req, args.op.sync.tailq);
2482 	pthread_spin_unlock(&file->lock);
2483 
2484 	flush_args->file = file;
2485 	channel->send_request(__file_flush, flush_req);
2486 }
2487 
2488 int
2489 spdk_file_sync(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx)
2490 {
2491 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
2492 	struct spdk_fs_cb_args args = {};
2493 
2494 	args.sem = &channel->sem;
2495 	_file_sync(file, channel, __wake_caller, &args);
2496 	sem_wait(&channel->sem);
2497 
2498 	return args.rc;
2499 }
2500 
2501 void
2502 spdk_file_sync_async(struct spdk_file *file, struct spdk_io_channel *_channel,
2503 		     spdk_file_op_complete cb_fn, void *cb_arg)
2504 {
2505 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
2506 
2507 	_file_sync(file, channel, cb_fn, cb_arg);
2508 }
2509 
2510 void
2511 spdk_file_set_priority(struct spdk_file *file, uint32_t priority)
2512 {
2513 	BLOBFS_TRACE(file, "priority=%u\n", priority);
2514 	file->priority = priority;
2515 
2516 }
2517 
2518 /*
2519  * Close routines
2520  */
2521 
2522 static void
2523 __file_close_async_done(void *ctx, int bserrno)
2524 {
2525 	struct spdk_fs_request *req = ctx;
2526 	struct spdk_fs_cb_args *args = &req->args;
2527 	struct spdk_file *file = args->file;
2528 
2529 	if (file->is_deleted) {
2530 		spdk_fs_delete_file_async(file->fs, file->name, blob_delete_cb, ctx);
2531 		return;
2532 	}
2533 
2534 	args->fn.file_op(args->arg, bserrno);
2535 	free_fs_request(req);
2536 }
2537 
2538 static void
2539 __file_close_async(struct spdk_file *file, struct spdk_fs_request *req)
2540 {
2541 	struct spdk_blob *blob;
2542 
2543 	pthread_spin_lock(&file->lock);
2544 	if (file->ref_count == 0) {
2545 		pthread_spin_unlock(&file->lock);
2546 		__file_close_async_done(req, -EBADF);
2547 		return;
2548 	}
2549 
2550 	file->ref_count--;
2551 	if (file->ref_count > 0) {
2552 		pthread_spin_unlock(&file->lock);
2553 		req->args.fn.file_op(req->args.arg, 0);
2554 		free_fs_request(req);
2555 		return;
2556 	}
2557 
2558 	pthread_spin_unlock(&file->lock);
2559 
2560 	blob = file->blob;
2561 	file->blob = NULL;
2562 	spdk_blob_close(blob, __file_close_async_done, req);
2563 }
2564 
2565 static void
2566 __file_close_async__sync_done(void *arg, int fserrno)
2567 {
2568 	struct spdk_fs_request *req = arg;
2569 	struct spdk_fs_cb_args *args = &req->args;
2570 
2571 	__file_close_async(args->file, req);
2572 }
2573 
2574 void
2575 spdk_file_close_async(struct spdk_file *file, spdk_file_op_complete cb_fn, void *cb_arg)
2576 {
2577 	struct spdk_fs_request *req;
2578 	struct spdk_fs_cb_args *args;
2579 
2580 	req = alloc_fs_request(file->fs->md_target.md_fs_channel);
2581 	if (req == NULL) {
2582 		cb_fn(cb_arg, -ENOMEM);
2583 		return;
2584 	}
2585 
2586 	args = &req->args;
2587 	args->file = file;
2588 	args->fn.file_op = cb_fn;
2589 	args->arg = cb_arg;
2590 
2591 	spdk_file_sync_async(file, file->fs->md_target.md_io_channel, __file_close_async__sync_done, req);
2592 }
2593 
2594 static void
2595 __file_close(void *arg)
2596 {
2597 	struct spdk_fs_request *req = arg;
2598 	struct spdk_fs_cb_args *args = &req->args;
2599 	struct spdk_file *file = args->file;
2600 
2601 	__file_close_async(file, req);
2602 }
2603 
2604 int
2605 spdk_file_close(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx)
2606 {
2607 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
2608 	struct spdk_fs_request *req;
2609 	struct spdk_fs_cb_args *args;
2610 
2611 	req = alloc_fs_request(channel);
2612 	if (req == NULL) {
2613 		return -ENOMEM;
2614 	}
2615 
2616 	args = &req->args;
2617 
2618 	spdk_file_sync(file, ctx);
2619 	BLOBFS_TRACE(file, "name=%s\n", file->name);
2620 	args->file = file;
2621 	args->sem = &channel->sem;
2622 	args->fn.file_op = __wake_caller;
2623 	args->arg = req;
2624 	channel->send_request(__file_close, req);
2625 	sem_wait(&channel->sem);
2626 
2627 	return args->rc;
2628 }
2629 
2630 int
2631 spdk_file_get_id(struct spdk_file *file, void *id, size_t size)
2632 {
2633 	if (size < sizeof(spdk_blob_id)) {
2634 		return -EINVAL;
2635 	}
2636 
2637 	memcpy(id, &file->blobid, sizeof(spdk_blob_id));
2638 
2639 	return sizeof(spdk_blob_id);
2640 }
2641 
2642 static void
2643 cache_free_buffers(struct spdk_file *file)
2644 {
2645 	BLOBFS_TRACE(file, "free=%s\n", file->name);
2646 	pthread_spin_lock(&file->lock);
2647 	pthread_spin_lock(&g_caches_lock);
2648 	if (file->tree->present_mask == 0) {
2649 		pthread_spin_unlock(&g_caches_lock);
2650 		pthread_spin_unlock(&file->lock);
2651 		return;
2652 	}
2653 	spdk_tree_free_buffers(file->tree);
2654 
2655 	TAILQ_REMOVE(&g_caches, file, cache_tailq);
2656 	/* If not freed, put it in the end of the queue */
2657 	if (file->tree->present_mask != 0) {
2658 		TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq);
2659 	}
2660 	file->last = NULL;
2661 	pthread_spin_unlock(&g_caches_lock);
2662 	pthread_spin_unlock(&file->lock);
2663 }
2664 
2665 SPDK_LOG_REGISTER_COMPONENT("blobfs", SPDK_LOG_BLOBFS)
2666 SPDK_LOG_REGISTER_COMPONENT("blobfs_rw", SPDK_LOG_BLOBFS_RW)
2667