xref: /spdk/lib/blobfs/blobfs.c (revision bb488d2829a9b7863daab45917dd2174905cc0ae)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 
36 #include "spdk/blobfs.h"
37 #include "spdk/conf.h"
38 #include "blobfs_internal.h"
39 
40 #include "spdk/queue.h"
41 #include "spdk/thread.h"
42 #include "spdk/assert.h"
43 #include "spdk/env.h"
44 #include "spdk/util.h"
45 #include "spdk_internal/log.h"
46 
47 #define BLOBFS_TRACE(file, str, args...) \
48 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s " str, file->name, ##args)
49 
50 #define BLOBFS_TRACE_RW(file, str, args...) \
51 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS_RW, "file=%s " str, file->name, ##args)
52 
53 #define BLOBFS_DEFAULT_CACHE_SIZE (4ULL * 1024 * 1024 * 1024)
54 #define SPDK_BLOBFS_DEFAULT_OPTS_CLUSTER_SZ (1024 * 1024)
55 
56 static uint64_t g_fs_cache_size = BLOBFS_DEFAULT_CACHE_SIZE;
57 static struct spdk_mempool *g_cache_pool;
58 static TAILQ_HEAD(, spdk_file) g_caches;
59 static int g_fs_count = 0;
60 static pthread_mutex_t g_cache_init_lock = PTHREAD_MUTEX_INITIALIZER;
61 static pthread_spinlock_t g_caches_lock;
62 
63 void
64 spdk_cache_buffer_free(struct cache_buffer *cache_buffer)
65 {
66 	spdk_mempool_put(g_cache_pool, cache_buffer->buf);
67 	free(cache_buffer);
68 }
69 
70 #define CACHE_READAHEAD_THRESHOLD	(128 * 1024)
71 
72 struct spdk_file {
73 	struct spdk_filesystem	*fs;
74 	struct spdk_blob	*blob;
75 	char			*name;
76 	uint64_t		length;
77 	bool                    is_deleted;
78 	bool			open_for_writing;
79 	uint64_t		length_flushed;
80 	uint64_t		append_pos;
81 	uint64_t		seq_byte_count;
82 	uint64_t		next_seq_offset;
83 	uint32_t		priority;
84 	TAILQ_ENTRY(spdk_file)	tailq;
85 	spdk_blob_id		blobid;
86 	uint32_t		ref_count;
87 	pthread_spinlock_t	lock;
88 	struct cache_buffer	*last;
89 	struct cache_tree	*tree;
90 	TAILQ_HEAD(open_requests_head, spdk_fs_request) open_requests;
91 	TAILQ_HEAD(sync_requests_head, spdk_fs_request) sync_requests;
92 	TAILQ_ENTRY(spdk_file)	cache_tailq;
93 };
94 
95 struct spdk_deleted_file {
96 	spdk_blob_id	id;
97 	TAILQ_ENTRY(spdk_deleted_file)	tailq;
98 };
99 
100 struct spdk_filesystem {
101 	struct spdk_blob_store	*bs;
102 	TAILQ_HEAD(, spdk_file)	files;
103 	struct spdk_bs_opts	bs_opts;
104 	struct spdk_bs_dev	*bdev;
105 	fs_send_request_fn	send_request;
106 
107 	struct {
108 		uint32_t		max_ops;
109 		struct spdk_io_channel	*sync_io_channel;
110 		struct spdk_fs_channel	*sync_fs_channel;
111 	} sync_target;
112 
113 	struct {
114 		uint32_t		max_ops;
115 		struct spdk_io_channel	*md_io_channel;
116 		struct spdk_fs_channel	*md_fs_channel;
117 	} md_target;
118 
119 	struct {
120 		uint32_t		max_ops;
121 	} io_target;
122 };
123 
124 struct spdk_fs_cb_args {
125 	union {
126 		spdk_fs_op_with_handle_complete		fs_op_with_handle;
127 		spdk_fs_op_complete			fs_op;
128 		spdk_file_op_with_handle_complete	file_op_with_handle;
129 		spdk_file_op_complete			file_op;
130 		spdk_file_stat_op_complete		stat_op;
131 	} fn;
132 	void *arg;
133 	sem_t *sem;
134 	struct spdk_filesystem *fs;
135 	struct spdk_file *file;
136 	int rc;
137 	union {
138 		struct {
139 			TAILQ_HEAD(, spdk_deleted_file)	deleted_files;
140 		} fs_load;
141 		struct {
142 			uint64_t	length;
143 		} truncate;
144 		struct {
145 			struct spdk_io_channel	*channel;
146 			void		*user_buf;
147 			void		*pin_buf;
148 			int		is_read;
149 			off_t		offset;
150 			size_t		length;
151 			uint64_t	start_lba;
152 			uint64_t	num_lba;
153 			uint32_t	blocklen;
154 		} rw;
155 		struct {
156 			const char	*old_name;
157 			const char	*new_name;
158 		} rename;
159 		struct {
160 			struct cache_buffer	*cache_buffer;
161 			uint64_t		length;
162 		} flush;
163 		struct {
164 			struct cache_buffer	*cache_buffer;
165 			uint64_t		length;
166 			uint64_t		offset;
167 		} readahead;
168 		struct {
169 			uint64_t			offset;
170 			TAILQ_ENTRY(spdk_fs_request)	tailq;
171 			bool				xattr_in_progress;
172 		} sync;
173 		struct {
174 			uint32_t			num_clusters;
175 		} resize;
176 		struct {
177 			const char	*name;
178 			uint32_t	flags;
179 			TAILQ_ENTRY(spdk_fs_request)	tailq;
180 		} open;
181 		struct {
182 			const char		*name;
183 			struct spdk_blob	*blob;
184 		} create;
185 		struct {
186 			const char	*name;
187 		} delete;
188 		struct {
189 			const char	*name;
190 		} stat;
191 	} op;
192 };
193 
194 static void cache_free_buffers(struct spdk_file *file);
195 
196 void
197 spdk_fs_opts_init(struct spdk_blobfs_opts *opts)
198 {
199 	opts->cluster_sz = SPDK_BLOBFS_DEFAULT_OPTS_CLUSTER_SZ;
200 }
201 
202 static void
203 __initialize_cache(void)
204 {
205 	assert(g_cache_pool == NULL);
206 
207 	g_cache_pool = spdk_mempool_create("spdk_fs_cache",
208 					   g_fs_cache_size / CACHE_BUFFER_SIZE,
209 					   CACHE_BUFFER_SIZE,
210 					   SPDK_MEMPOOL_DEFAULT_CACHE_SIZE,
211 					   SPDK_ENV_SOCKET_ID_ANY);
212 	if (!g_cache_pool) {
213 		SPDK_ERRLOG("Create mempool failed, you may "
214 			    "increase the memory and try again\n");
215 		assert(false);
216 	}
217 	TAILQ_INIT(&g_caches);
218 	pthread_spin_init(&g_caches_lock, 0);
219 }
220 
221 static void
222 __free_cache(void)
223 {
224 	assert(g_cache_pool != NULL);
225 
226 	spdk_mempool_free(g_cache_pool);
227 	g_cache_pool = NULL;
228 }
229 
230 static uint64_t
231 __file_get_blob_size(struct spdk_file *file)
232 {
233 	uint64_t cluster_sz;
234 
235 	cluster_sz = file->fs->bs_opts.cluster_sz;
236 	return cluster_sz * spdk_blob_get_num_clusters(file->blob);
237 }
238 
239 struct spdk_fs_request {
240 	struct spdk_fs_cb_args		args;
241 	TAILQ_ENTRY(spdk_fs_request)	link;
242 	struct spdk_fs_channel		*channel;
243 };
244 
245 struct spdk_fs_channel {
246 	struct spdk_fs_request		*req_mem;
247 	TAILQ_HEAD(, spdk_fs_request)	reqs;
248 	sem_t				sem;
249 	struct spdk_filesystem		*fs;
250 	struct spdk_io_channel		*bs_channel;
251 	fs_send_request_fn		send_request;
252 	bool				sync;
253 	pthread_spinlock_t		lock;
254 };
255 
256 /* For now, this is effectively an alias. But eventually we'll shift
257  * some data members over. */
258 struct spdk_fs_thread_ctx {
259 	struct spdk_fs_channel	ch;
260 };
261 
262 static struct spdk_fs_request *
263 alloc_fs_request(struct spdk_fs_channel *channel)
264 {
265 	struct spdk_fs_request *req;
266 
267 	if (channel->sync) {
268 		pthread_spin_lock(&channel->lock);
269 	}
270 
271 	req = TAILQ_FIRST(&channel->reqs);
272 	if (req) {
273 		TAILQ_REMOVE(&channel->reqs, req, link);
274 	}
275 
276 	if (channel->sync) {
277 		pthread_spin_unlock(&channel->lock);
278 	}
279 
280 	if (req == NULL) {
281 		SPDK_ERRLOG("Cannot allocate req on spdk_fs_channel =%p\n", channel);
282 		return NULL;
283 	}
284 	memset(req, 0, sizeof(*req));
285 	req->channel = channel;
286 
287 	return req;
288 }
289 
290 static void
291 free_fs_request(struct spdk_fs_request *req)
292 {
293 	struct spdk_fs_channel *channel = req->channel;
294 
295 	if (channel->sync) {
296 		pthread_spin_lock(&channel->lock);
297 	}
298 
299 	TAILQ_INSERT_HEAD(&req->channel->reqs, req, link);
300 
301 	if (channel->sync) {
302 		pthread_spin_unlock(&channel->lock);
303 	}
304 }
305 
306 static int
307 _spdk_fs_channel_create(struct spdk_filesystem *fs, struct spdk_fs_channel *channel,
308 			uint32_t max_ops)
309 {
310 	uint32_t i;
311 
312 	channel->req_mem = calloc(max_ops, sizeof(struct spdk_fs_request));
313 	if (!channel->req_mem) {
314 		return -1;
315 	}
316 
317 	TAILQ_INIT(&channel->reqs);
318 	sem_init(&channel->sem, 0, 0);
319 
320 	for (i = 0; i < max_ops; i++) {
321 		TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link);
322 	}
323 
324 	channel->fs = fs;
325 
326 	return 0;
327 }
328 
329 static int
330 _spdk_fs_md_channel_create(void *io_device, void *ctx_buf)
331 {
332 	struct spdk_filesystem		*fs;
333 	struct spdk_fs_channel		*channel = ctx_buf;
334 
335 	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, md_target);
336 
337 	return _spdk_fs_channel_create(fs, channel, fs->md_target.max_ops);
338 }
339 
340 static int
341 _spdk_fs_sync_channel_create(void *io_device, void *ctx_buf)
342 {
343 	struct spdk_filesystem		*fs;
344 	struct spdk_fs_channel		*channel = ctx_buf;
345 
346 	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, sync_target);
347 
348 	return _spdk_fs_channel_create(fs, channel, fs->sync_target.max_ops);
349 }
350 
351 static int
352 _spdk_fs_io_channel_create(void *io_device, void *ctx_buf)
353 {
354 	struct spdk_filesystem		*fs;
355 	struct spdk_fs_channel		*channel = ctx_buf;
356 
357 	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, io_target);
358 
359 	return _spdk_fs_channel_create(fs, channel, fs->io_target.max_ops);
360 }
361 
362 static void
363 _spdk_fs_channel_destroy(void *io_device, void *ctx_buf)
364 {
365 	struct spdk_fs_channel *channel = ctx_buf;
366 
367 	free(channel->req_mem);
368 	if (channel->bs_channel != NULL) {
369 		spdk_bs_free_io_channel(channel->bs_channel);
370 	}
371 }
372 
373 static void
374 __send_request_direct(fs_request_fn fn, void *arg)
375 {
376 	fn(arg);
377 }
378 
379 static void
380 common_fs_bs_init(struct spdk_filesystem *fs, struct spdk_blob_store *bs)
381 {
382 	fs->bs = bs;
383 	fs->bs_opts.cluster_sz = spdk_bs_get_cluster_size(bs);
384 	fs->md_target.md_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
385 	fs->md_target.md_fs_channel->send_request = __send_request_direct;
386 	fs->sync_target.sync_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
387 	fs->sync_target.sync_fs_channel->send_request = __send_request_direct;
388 
389 	pthread_mutex_lock(&g_cache_init_lock);
390 	if (g_fs_count == 0) {
391 		__initialize_cache();
392 	}
393 	g_fs_count++;
394 	pthread_mutex_unlock(&g_cache_init_lock);
395 }
396 
397 static void
398 init_cb(void *ctx, struct spdk_blob_store *bs, int bserrno)
399 {
400 	struct spdk_fs_request *req = ctx;
401 	struct spdk_fs_cb_args *args = &req->args;
402 	struct spdk_filesystem *fs = args->fs;
403 
404 	if (bserrno == 0) {
405 		common_fs_bs_init(fs, bs);
406 	} else {
407 		free(fs);
408 		fs = NULL;
409 	}
410 
411 	args->fn.fs_op_with_handle(args->arg, fs, bserrno);
412 	free_fs_request(req);
413 }
414 
415 static void
416 fs_conf_parse(void)
417 {
418 	struct spdk_conf_section *sp;
419 
420 	sp = spdk_conf_find_section(NULL, "Blobfs");
421 	if (sp == NULL) {
422 		g_fs_cache_buffer_shift = CACHE_BUFFER_SHIFT_DEFAULT;
423 		return;
424 	}
425 
426 	g_fs_cache_buffer_shift = spdk_conf_section_get_intval(sp, "CacheBufferShift");
427 	if (g_fs_cache_buffer_shift <= 0) {
428 		g_fs_cache_buffer_shift = CACHE_BUFFER_SHIFT_DEFAULT;
429 	}
430 }
431 
432 static struct spdk_filesystem *
433 fs_alloc(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn)
434 {
435 	struct spdk_filesystem *fs;
436 
437 	fs = calloc(1, sizeof(*fs));
438 	if (fs == NULL) {
439 		return NULL;
440 	}
441 
442 	fs->bdev = dev;
443 	fs->send_request = send_request_fn;
444 	TAILQ_INIT(&fs->files);
445 
446 	fs->md_target.max_ops = 512;
447 	spdk_io_device_register(&fs->md_target, _spdk_fs_md_channel_create, _spdk_fs_channel_destroy,
448 				sizeof(struct spdk_fs_channel), "blobfs_md");
449 	fs->md_target.md_io_channel = spdk_get_io_channel(&fs->md_target);
450 	fs->md_target.md_fs_channel = spdk_io_channel_get_ctx(fs->md_target.md_io_channel);
451 
452 	fs->sync_target.max_ops = 512;
453 	spdk_io_device_register(&fs->sync_target, _spdk_fs_sync_channel_create, _spdk_fs_channel_destroy,
454 				sizeof(struct spdk_fs_channel), "blobfs_sync");
455 	fs->sync_target.sync_io_channel = spdk_get_io_channel(&fs->sync_target);
456 	fs->sync_target.sync_fs_channel = spdk_io_channel_get_ctx(fs->sync_target.sync_io_channel);
457 
458 	fs->io_target.max_ops = 512;
459 	spdk_io_device_register(&fs->io_target, _spdk_fs_io_channel_create, _spdk_fs_channel_destroy,
460 				sizeof(struct spdk_fs_channel), "blobfs_io");
461 
462 	return fs;
463 }
464 
465 static void
466 __wake_caller(void *arg, int fserrno)
467 {
468 	struct spdk_fs_cb_args *args = arg;
469 
470 	args->rc = fserrno;
471 	sem_post(args->sem);
472 }
473 
474 void
475 spdk_fs_init(struct spdk_bs_dev *dev, struct spdk_blobfs_opts *opt,
476 	     fs_send_request_fn send_request_fn,
477 	     spdk_fs_op_with_handle_complete cb_fn, void *cb_arg)
478 {
479 	struct spdk_filesystem *fs;
480 	struct spdk_fs_request *req;
481 	struct spdk_fs_cb_args *args;
482 	struct spdk_bs_opts opts = {};
483 
484 	fs = fs_alloc(dev, send_request_fn);
485 	if (fs == NULL) {
486 		cb_fn(cb_arg, NULL, -ENOMEM);
487 		return;
488 	}
489 
490 	fs_conf_parse();
491 
492 	req = alloc_fs_request(fs->md_target.md_fs_channel);
493 	if (req == NULL) {
494 		spdk_put_io_channel(fs->md_target.md_io_channel);
495 		spdk_io_device_unregister(&fs->md_target, NULL);
496 		spdk_put_io_channel(fs->sync_target.sync_io_channel);
497 		spdk_io_device_unregister(&fs->sync_target, NULL);
498 		spdk_io_device_unregister(&fs->io_target, NULL);
499 		free(fs);
500 		cb_fn(cb_arg, NULL, -ENOMEM);
501 		return;
502 	}
503 
504 	args = &req->args;
505 	args->fn.fs_op_with_handle = cb_fn;
506 	args->arg = cb_arg;
507 	args->fs = fs;
508 
509 	spdk_bs_opts_init(&opts);
510 	snprintf(opts.bstype.bstype, sizeof(opts.bstype.bstype), "BLOBFS");
511 	if (opt) {
512 		opts.cluster_sz = opt->cluster_sz;
513 	}
514 	spdk_bs_init(dev, &opts, init_cb, req);
515 }
516 
517 static struct spdk_file *
518 file_alloc(struct spdk_filesystem *fs)
519 {
520 	struct spdk_file *file;
521 
522 	file = calloc(1, sizeof(*file));
523 	if (file == NULL) {
524 		return NULL;
525 	}
526 
527 	file->tree = calloc(1, sizeof(*file->tree));
528 	if (file->tree == NULL) {
529 		free(file);
530 		return NULL;
531 	}
532 
533 	file->fs = fs;
534 	TAILQ_INIT(&file->open_requests);
535 	TAILQ_INIT(&file->sync_requests);
536 	pthread_spin_init(&file->lock, 0);
537 	TAILQ_INSERT_TAIL(&fs->files, file, tailq);
538 	file->priority = SPDK_FILE_PRIORITY_LOW;
539 	return file;
540 }
541 
542 static void fs_load_done(void *ctx, int bserrno);
543 
544 static int
545 _handle_deleted_files(struct spdk_fs_request *req)
546 {
547 	struct spdk_fs_cb_args *args = &req->args;
548 	struct spdk_filesystem *fs = args->fs;
549 
550 	if (!TAILQ_EMPTY(&args->op.fs_load.deleted_files)) {
551 		struct spdk_deleted_file *deleted_file;
552 
553 		deleted_file = TAILQ_FIRST(&args->op.fs_load.deleted_files);
554 		TAILQ_REMOVE(&args->op.fs_load.deleted_files, deleted_file, tailq);
555 		spdk_bs_delete_blob(fs->bs, deleted_file->id, fs_load_done, req);
556 		free(deleted_file);
557 		return 0;
558 	}
559 
560 	return 1;
561 }
562 
563 static void
564 fs_load_done(void *ctx, int bserrno)
565 {
566 	struct spdk_fs_request *req = ctx;
567 	struct spdk_fs_cb_args *args = &req->args;
568 	struct spdk_filesystem *fs = args->fs;
569 
570 	/* The filesystem has been loaded.  Now check if there are any files that
571 	 *  were marked for deletion before last unload.  Do not complete the
572 	 *  fs_load callback until all of them have been deleted on disk.
573 	 */
574 	if (_handle_deleted_files(req) == 0) {
575 		/* We found a file that's been marked for deleting but not actually
576 		 *  deleted yet.  This function will get called again once the delete
577 		 *  operation is completed.
578 		 */
579 		return;
580 	}
581 
582 	args->fn.fs_op_with_handle(args->arg, fs, 0);
583 	free_fs_request(req);
584 
585 }
586 
587 static void
588 iter_cb(void *ctx, struct spdk_blob *blob, int rc)
589 {
590 	struct spdk_fs_request *req = ctx;
591 	struct spdk_fs_cb_args *args = &req->args;
592 	struct spdk_filesystem *fs = args->fs;
593 	uint64_t *length;
594 	const char *name;
595 	uint32_t *is_deleted;
596 	size_t value_len;
597 
598 	if (rc < 0) {
599 		args->fn.fs_op_with_handle(args->arg, fs, rc);
600 		free_fs_request(req);
601 		return;
602 	}
603 
604 	rc = spdk_blob_get_xattr_value(blob, "name", (const void **)&name, &value_len);
605 	if (rc < 0) {
606 		args->fn.fs_op_with_handle(args->arg, fs, rc);
607 		free_fs_request(req);
608 		return;
609 	}
610 
611 	rc = spdk_blob_get_xattr_value(blob, "length", (const void **)&length, &value_len);
612 	if (rc < 0) {
613 		args->fn.fs_op_with_handle(args->arg, fs, rc);
614 		free_fs_request(req);
615 		return;
616 	}
617 
618 	assert(value_len == 8);
619 
620 	/* This file could be deleted last time without close it, then app crashed, so we delete it now */
621 	rc = spdk_blob_get_xattr_value(blob, "is_deleted", (const void **)&is_deleted, &value_len);
622 	if (rc < 0) {
623 		struct spdk_file *f;
624 
625 		f = file_alloc(fs);
626 		if (f == NULL) {
627 			args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM);
628 			free_fs_request(req);
629 			return;
630 		}
631 
632 		f->name = strdup(name);
633 		f->blobid = spdk_blob_get_id(blob);
634 		f->length = *length;
635 		f->length_flushed = *length;
636 		f->append_pos = *length;
637 		SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "added file %s length=%ju\n", f->name, f->length);
638 	} else {
639 		struct spdk_deleted_file *deleted_file;
640 
641 		deleted_file = calloc(1, sizeof(*deleted_file));
642 		if (deleted_file == NULL) {
643 			args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM);
644 			free_fs_request(req);
645 			return;
646 		}
647 		deleted_file->id = spdk_blob_get_id(blob);
648 		TAILQ_INSERT_TAIL(&args->op.fs_load.deleted_files, deleted_file, tailq);
649 	}
650 }
651 
652 static void
653 load_cb(void *ctx, struct spdk_blob_store *bs, int bserrno)
654 {
655 	struct spdk_fs_request *req = ctx;
656 	struct spdk_fs_cb_args *args = &req->args;
657 	struct spdk_filesystem *fs = args->fs;
658 	struct spdk_bs_type bstype;
659 	static const struct spdk_bs_type blobfs_type = {"BLOBFS"};
660 	static const struct spdk_bs_type zeros;
661 
662 	if (bserrno != 0) {
663 		args->fn.fs_op_with_handle(args->arg, NULL, bserrno);
664 		free_fs_request(req);
665 		free(fs);
666 		return;
667 	}
668 
669 	bstype = spdk_bs_get_bstype(bs);
670 
671 	if (!memcmp(&bstype, &zeros, sizeof(bstype))) {
672 		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "assigning bstype\n");
673 		spdk_bs_set_bstype(bs, blobfs_type);
674 	} else if (memcmp(&bstype, &blobfs_type, sizeof(bstype))) {
675 		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "not blobfs\n");
676 		SPDK_LOGDUMP(SPDK_LOG_BLOB, "bstype", &bstype, sizeof(bstype));
677 		args->fn.fs_op_with_handle(args->arg, NULL, bserrno);
678 		free_fs_request(req);
679 		free(fs);
680 		return;
681 	}
682 
683 	common_fs_bs_init(fs, bs);
684 	fs_load_done(req, 0);
685 }
686 
687 static void
688 spdk_fs_io_device_unregister(struct spdk_filesystem *fs)
689 {
690 	assert(fs != NULL);
691 	spdk_io_device_unregister(&fs->md_target, NULL);
692 	spdk_io_device_unregister(&fs->sync_target, NULL);
693 	spdk_io_device_unregister(&fs->io_target, NULL);
694 	free(fs);
695 }
696 
697 static void
698 spdk_fs_free_io_channels(struct spdk_filesystem *fs)
699 {
700 	assert(fs != NULL);
701 	spdk_fs_free_io_channel(fs->md_target.md_io_channel);
702 	spdk_fs_free_io_channel(fs->sync_target.sync_io_channel);
703 }
704 
705 void
706 spdk_fs_load(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn,
707 	     spdk_fs_op_with_handle_complete cb_fn, void *cb_arg)
708 {
709 	struct spdk_filesystem *fs;
710 	struct spdk_fs_cb_args *args;
711 	struct spdk_fs_request *req;
712 	struct spdk_bs_opts	bs_opts;
713 
714 	fs = fs_alloc(dev, send_request_fn);
715 	if (fs == NULL) {
716 		cb_fn(cb_arg, NULL, -ENOMEM);
717 		return;
718 	}
719 
720 	fs_conf_parse();
721 
722 	req = alloc_fs_request(fs->md_target.md_fs_channel);
723 	if (req == NULL) {
724 		spdk_fs_free_io_channels(fs);
725 		spdk_fs_io_device_unregister(fs);
726 		cb_fn(cb_arg, NULL, -ENOMEM);
727 		return;
728 	}
729 
730 	args = &req->args;
731 	args->fn.fs_op_with_handle = cb_fn;
732 	args->arg = cb_arg;
733 	args->fs = fs;
734 	TAILQ_INIT(&args->op.fs_load.deleted_files);
735 	spdk_bs_opts_init(&bs_opts);
736 	bs_opts.iter_cb_fn = iter_cb;
737 	bs_opts.iter_cb_arg = req;
738 	spdk_bs_load(dev, &bs_opts, load_cb, req);
739 }
740 
741 static void
742 unload_cb(void *ctx, int bserrno)
743 {
744 	struct spdk_fs_request *req = ctx;
745 	struct spdk_fs_cb_args *args = &req->args;
746 	struct spdk_filesystem *fs = args->fs;
747 	struct spdk_file *file, *tmp;
748 
749 	TAILQ_FOREACH_SAFE(file, &fs->files, tailq, tmp) {
750 		TAILQ_REMOVE(&fs->files, file, tailq);
751 		cache_free_buffers(file);
752 		free(file->name);
753 		free(file->tree);
754 		free(file);
755 	}
756 
757 	pthread_mutex_lock(&g_cache_init_lock);
758 	g_fs_count--;
759 	if (g_fs_count == 0) {
760 		__free_cache();
761 	}
762 	pthread_mutex_unlock(&g_cache_init_lock);
763 
764 	args->fn.fs_op(args->arg, bserrno);
765 	free(req);
766 
767 	spdk_fs_io_device_unregister(fs);
768 }
769 
770 void
771 spdk_fs_unload(struct spdk_filesystem *fs, spdk_fs_op_complete cb_fn, void *cb_arg)
772 {
773 	struct spdk_fs_request *req;
774 	struct spdk_fs_cb_args *args;
775 
776 	/*
777 	 * We must free the md_channel before unloading the blobstore, so just
778 	 *  allocate this request from the general heap.
779 	 */
780 	req = calloc(1, sizeof(*req));
781 	if (req == NULL) {
782 		cb_fn(cb_arg, -ENOMEM);
783 		return;
784 	}
785 
786 	args = &req->args;
787 	args->fn.fs_op = cb_fn;
788 	args->arg = cb_arg;
789 	args->fs = fs;
790 
791 	spdk_fs_free_io_channels(fs);
792 	spdk_bs_unload(fs->bs, unload_cb, req);
793 }
794 
795 static struct spdk_file *
796 fs_find_file(struct spdk_filesystem *fs, const char *name)
797 {
798 	struct spdk_file *file;
799 
800 	TAILQ_FOREACH(file, &fs->files, tailq) {
801 		if (!strncmp(name, file->name, SPDK_FILE_NAME_MAX)) {
802 			return file;
803 		}
804 	}
805 
806 	return NULL;
807 }
808 
809 void
810 spdk_fs_file_stat_async(struct spdk_filesystem *fs, const char *name,
811 			spdk_file_stat_op_complete cb_fn, void *cb_arg)
812 {
813 	struct spdk_file_stat stat;
814 	struct spdk_file *f = NULL;
815 
816 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
817 		cb_fn(cb_arg, NULL, -ENAMETOOLONG);
818 		return;
819 	}
820 
821 	f = fs_find_file(fs, name);
822 	if (f != NULL) {
823 		stat.blobid = f->blobid;
824 		stat.size = f->append_pos >= f->length ? f->append_pos : f->length;
825 		cb_fn(cb_arg, &stat, 0);
826 		return;
827 	}
828 
829 	cb_fn(cb_arg, NULL, -ENOENT);
830 }
831 
832 static void
833 __copy_stat(void *arg, struct spdk_file_stat *stat, int fserrno)
834 {
835 	struct spdk_fs_request *req = arg;
836 	struct spdk_fs_cb_args *args = &req->args;
837 
838 	args->rc = fserrno;
839 	if (fserrno == 0) {
840 		memcpy(args->arg, stat, sizeof(*stat));
841 	}
842 	sem_post(args->sem);
843 }
844 
845 static void
846 __file_stat(void *arg)
847 {
848 	struct spdk_fs_request *req = arg;
849 	struct spdk_fs_cb_args *args = &req->args;
850 
851 	spdk_fs_file_stat_async(args->fs, args->op.stat.name,
852 				args->fn.stat_op, req);
853 }
854 
855 int
856 spdk_fs_file_stat(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx,
857 		  const char *name, struct spdk_file_stat *stat)
858 {
859 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
860 	struct spdk_fs_request *req;
861 	int rc;
862 
863 	req = alloc_fs_request(channel);
864 	if (req == NULL) {
865 		return -ENOMEM;
866 	}
867 
868 	req->args.fs = fs;
869 	req->args.op.stat.name = name;
870 	req->args.fn.stat_op = __copy_stat;
871 	req->args.arg = stat;
872 	req->args.sem = &channel->sem;
873 	channel->send_request(__file_stat, req);
874 	sem_wait(&channel->sem);
875 
876 	rc = req->args.rc;
877 	free_fs_request(req);
878 
879 	return rc;
880 }
881 
882 static void
883 fs_create_blob_close_cb(void *ctx, int bserrno)
884 {
885 	int rc;
886 	struct spdk_fs_request *req = ctx;
887 	struct spdk_fs_cb_args *args = &req->args;
888 
889 	rc = args->rc ? args->rc : bserrno;
890 	args->fn.file_op(args->arg, rc);
891 	free_fs_request(req);
892 }
893 
894 static void
895 fs_create_blob_resize_cb(void *ctx, int bserrno)
896 {
897 	struct spdk_fs_request *req = ctx;
898 	struct spdk_fs_cb_args *args = &req->args;
899 	struct spdk_file *f = args->file;
900 	struct spdk_blob *blob = args->op.create.blob;
901 	uint64_t length = 0;
902 
903 	args->rc = bserrno;
904 	if (bserrno) {
905 		spdk_blob_close(blob, fs_create_blob_close_cb, args);
906 		return;
907 	}
908 
909 	spdk_blob_set_xattr(blob, "name", f->name, strlen(f->name) + 1);
910 	spdk_blob_set_xattr(blob, "length", &length, sizeof(length));
911 
912 	spdk_blob_close(blob, fs_create_blob_close_cb, args);
913 }
914 
915 static void
916 fs_create_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno)
917 {
918 	struct spdk_fs_request *req = ctx;
919 	struct spdk_fs_cb_args *args = &req->args;
920 
921 	if (bserrno) {
922 		args->fn.file_op(args->arg, bserrno);
923 		free_fs_request(req);
924 		return;
925 	}
926 
927 	args->op.create.blob = blob;
928 	spdk_blob_resize(blob, 1, fs_create_blob_resize_cb, req);
929 }
930 
931 static void
932 fs_create_blob_create_cb(void *ctx, spdk_blob_id blobid, int bserrno)
933 {
934 	struct spdk_fs_request *req = ctx;
935 	struct spdk_fs_cb_args *args = &req->args;
936 	struct spdk_file *f = args->file;
937 
938 	if (bserrno) {
939 		args->fn.file_op(args->arg, bserrno);
940 		free_fs_request(req);
941 		return;
942 	}
943 
944 	f->blobid = blobid;
945 	spdk_bs_open_blob(f->fs->bs, blobid, fs_create_blob_open_cb, req);
946 }
947 
948 void
949 spdk_fs_create_file_async(struct spdk_filesystem *fs, const char *name,
950 			  spdk_file_op_complete cb_fn, void *cb_arg)
951 {
952 	struct spdk_file *file;
953 	struct spdk_fs_request *req;
954 	struct spdk_fs_cb_args *args;
955 
956 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
957 		cb_fn(cb_arg, -ENAMETOOLONG);
958 		return;
959 	}
960 
961 	file = fs_find_file(fs, name);
962 	if (file != NULL) {
963 		cb_fn(cb_arg, -EEXIST);
964 		return;
965 	}
966 
967 	file = file_alloc(fs);
968 	if (file == NULL) {
969 		cb_fn(cb_arg, -ENOMEM);
970 		return;
971 	}
972 
973 	req = alloc_fs_request(fs->md_target.md_fs_channel);
974 	if (req == NULL) {
975 		cb_fn(cb_arg, -ENOMEM);
976 		return;
977 	}
978 
979 	args = &req->args;
980 	args->file = file;
981 	args->fn.file_op = cb_fn;
982 	args->arg = cb_arg;
983 
984 	file->name = strdup(name);
985 	spdk_bs_create_blob(fs->bs, fs_create_blob_create_cb, args);
986 }
987 
988 static void
989 __fs_create_file_done(void *arg, int fserrno)
990 {
991 	struct spdk_fs_request *req = arg;
992 	struct spdk_fs_cb_args *args = &req->args;
993 
994 	args->rc = fserrno;
995 	sem_post(args->sem);
996 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.create.name);
997 }
998 
999 static void
1000 __fs_create_file(void *arg)
1001 {
1002 	struct spdk_fs_request *req = arg;
1003 	struct spdk_fs_cb_args *args = &req->args;
1004 
1005 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.create.name);
1006 	spdk_fs_create_file_async(args->fs, args->op.create.name, __fs_create_file_done, req);
1007 }
1008 
1009 int
1010 spdk_fs_create_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx, const char *name)
1011 {
1012 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
1013 	struct spdk_fs_request *req;
1014 	struct spdk_fs_cb_args *args;
1015 	int rc;
1016 
1017 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name);
1018 
1019 	req = alloc_fs_request(channel);
1020 	if (req == NULL) {
1021 		return -ENOMEM;
1022 	}
1023 
1024 	args = &req->args;
1025 	args->fs = fs;
1026 	args->op.create.name = name;
1027 	args->sem = &channel->sem;
1028 	fs->send_request(__fs_create_file, req);
1029 	sem_wait(&channel->sem);
1030 	rc = args->rc;
1031 	free_fs_request(req);
1032 
1033 	return rc;
1034 }
1035 
1036 static void
1037 fs_open_blob_done(void *ctx, struct spdk_blob *blob, int bserrno)
1038 {
1039 	struct spdk_fs_request *req = ctx;
1040 	struct spdk_fs_cb_args *args = &req->args;
1041 	struct spdk_file *f = args->file;
1042 
1043 	f->blob = blob;
1044 	while (!TAILQ_EMPTY(&f->open_requests)) {
1045 		req = TAILQ_FIRST(&f->open_requests);
1046 		args = &req->args;
1047 		TAILQ_REMOVE(&f->open_requests, req, args.op.open.tailq);
1048 		args->fn.file_op_with_handle(args->arg, f, bserrno);
1049 		free_fs_request(req);
1050 	}
1051 }
1052 
1053 static void
1054 fs_open_blob_create_cb(void *ctx, int bserrno)
1055 {
1056 	struct spdk_fs_request *req = ctx;
1057 	struct spdk_fs_cb_args *args = &req->args;
1058 	struct spdk_file *file = args->file;
1059 	struct spdk_filesystem *fs = args->fs;
1060 
1061 	if (file == NULL) {
1062 		/*
1063 		 * This is from an open with CREATE flag - the file
1064 		 *  is now created so look it up in the file list for this
1065 		 *  filesystem.
1066 		 */
1067 		file = fs_find_file(fs, args->op.open.name);
1068 		assert(file != NULL);
1069 		args->file = file;
1070 	}
1071 
1072 	file->ref_count++;
1073 	TAILQ_INSERT_TAIL(&file->open_requests, req, args.op.open.tailq);
1074 	if (file->ref_count == 1) {
1075 		assert(file->blob == NULL);
1076 		spdk_bs_open_blob(fs->bs, file->blobid, fs_open_blob_done, req);
1077 	} else if (file->blob != NULL) {
1078 		fs_open_blob_done(req, file->blob, 0);
1079 	} else {
1080 		/*
1081 		 * The blob open for this file is in progress due to a previous
1082 		 *  open request.  When that open completes, it will invoke the
1083 		 *  open callback for this request.
1084 		 */
1085 	}
1086 }
1087 
1088 void
1089 spdk_fs_open_file_async(struct spdk_filesystem *fs, const char *name, uint32_t flags,
1090 			spdk_file_op_with_handle_complete cb_fn, void *cb_arg)
1091 {
1092 	struct spdk_file *f = NULL;
1093 	struct spdk_fs_request *req;
1094 	struct spdk_fs_cb_args *args;
1095 
1096 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
1097 		cb_fn(cb_arg, NULL, -ENAMETOOLONG);
1098 		return;
1099 	}
1100 
1101 	f = fs_find_file(fs, name);
1102 	if (f == NULL && !(flags & SPDK_BLOBFS_OPEN_CREATE)) {
1103 		cb_fn(cb_arg, NULL, -ENOENT);
1104 		return;
1105 	}
1106 
1107 	if (f != NULL && f->is_deleted == true) {
1108 		cb_fn(cb_arg, NULL, -ENOENT);
1109 		return;
1110 	}
1111 
1112 	req = alloc_fs_request(fs->md_target.md_fs_channel);
1113 	if (req == NULL) {
1114 		cb_fn(cb_arg, NULL, -ENOMEM);
1115 		return;
1116 	}
1117 
1118 	args = &req->args;
1119 	args->fn.file_op_with_handle = cb_fn;
1120 	args->arg = cb_arg;
1121 	args->file = f;
1122 	args->fs = fs;
1123 	args->op.open.name = name;
1124 
1125 	if (f == NULL) {
1126 		spdk_fs_create_file_async(fs, name, fs_open_blob_create_cb, req);
1127 	} else {
1128 		fs_open_blob_create_cb(req, 0);
1129 	}
1130 }
1131 
1132 static void
1133 __fs_open_file_done(void *arg, struct spdk_file *file, int bserrno)
1134 {
1135 	struct spdk_fs_request *req = arg;
1136 	struct spdk_fs_cb_args *args = &req->args;
1137 
1138 	args->file = file;
1139 	__wake_caller(args, bserrno);
1140 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.open.name);
1141 }
1142 
1143 static void
1144 __fs_open_file(void *arg)
1145 {
1146 	struct spdk_fs_request *req = arg;
1147 	struct spdk_fs_cb_args *args = &req->args;
1148 
1149 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.open.name);
1150 	spdk_fs_open_file_async(args->fs, args->op.open.name, args->op.open.flags,
1151 				__fs_open_file_done, req);
1152 }
1153 
1154 int
1155 spdk_fs_open_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx,
1156 		  const char *name, uint32_t flags, struct spdk_file **file)
1157 {
1158 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
1159 	struct spdk_fs_request *req;
1160 	struct spdk_fs_cb_args *args;
1161 	int rc;
1162 
1163 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name);
1164 
1165 	req = alloc_fs_request(channel);
1166 	if (req == NULL) {
1167 		return -ENOMEM;
1168 	}
1169 
1170 	args = &req->args;
1171 	args->fs = fs;
1172 	args->op.open.name = name;
1173 	args->op.open.flags = flags;
1174 	args->sem = &channel->sem;
1175 	fs->send_request(__fs_open_file, req);
1176 	sem_wait(&channel->sem);
1177 	rc = args->rc;
1178 	if (rc == 0) {
1179 		*file = args->file;
1180 	} else {
1181 		*file = NULL;
1182 	}
1183 	free_fs_request(req);
1184 
1185 	return rc;
1186 }
1187 
1188 static void
1189 fs_rename_blob_close_cb(void *ctx, int bserrno)
1190 {
1191 	struct spdk_fs_request *req = ctx;
1192 	struct spdk_fs_cb_args *args = &req->args;
1193 
1194 	args->fn.fs_op(args->arg, bserrno);
1195 	free_fs_request(req);
1196 }
1197 
1198 static void
1199 fs_rename_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno)
1200 {
1201 	struct spdk_fs_request *req = ctx;
1202 	struct spdk_fs_cb_args *args = &req->args;
1203 	const char *new_name = args->op.rename.new_name;
1204 
1205 	spdk_blob_set_xattr(blob, "name", new_name, strlen(new_name) + 1);
1206 	spdk_blob_close(blob, fs_rename_blob_close_cb, req);
1207 }
1208 
1209 static void
1210 __spdk_fs_md_rename_file(struct spdk_fs_request *req)
1211 {
1212 	struct spdk_fs_cb_args *args = &req->args;
1213 	struct spdk_file *f;
1214 
1215 	f = fs_find_file(args->fs, args->op.rename.old_name);
1216 	if (f == NULL) {
1217 		args->fn.fs_op(args->arg, -ENOENT);
1218 		free_fs_request(req);
1219 		return;
1220 	}
1221 
1222 	free(f->name);
1223 	f->name = strdup(args->op.rename.new_name);
1224 	args->file = f;
1225 	spdk_bs_open_blob(args->fs->bs, f->blobid, fs_rename_blob_open_cb, req);
1226 }
1227 
1228 static void
1229 fs_rename_delete_done(void *arg, int fserrno)
1230 {
1231 	__spdk_fs_md_rename_file(arg);
1232 }
1233 
1234 void
1235 spdk_fs_rename_file_async(struct spdk_filesystem *fs,
1236 			  const char *old_name, const char *new_name,
1237 			  spdk_file_op_complete cb_fn, void *cb_arg)
1238 {
1239 	struct spdk_file *f;
1240 	struct spdk_fs_request *req;
1241 	struct spdk_fs_cb_args *args;
1242 
1243 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "old=%s new=%s\n", old_name, new_name);
1244 	if (strnlen(new_name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
1245 		cb_fn(cb_arg, -ENAMETOOLONG);
1246 		return;
1247 	}
1248 
1249 	req = alloc_fs_request(fs->md_target.md_fs_channel);
1250 	if (req == NULL) {
1251 		cb_fn(cb_arg, -ENOMEM);
1252 		return;
1253 	}
1254 
1255 	args = &req->args;
1256 	args->fn.fs_op = cb_fn;
1257 	args->fs = fs;
1258 	args->arg = cb_arg;
1259 	args->op.rename.old_name = old_name;
1260 	args->op.rename.new_name = new_name;
1261 
1262 	f = fs_find_file(fs, new_name);
1263 	if (f == NULL) {
1264 		__spdk_fs_md_rename_file(req);
1265 		return;
1266 	}
1267 
1268 	/*
1269 	 * The rename overwrites an existing file.  So delete the existing file, then
1270 	 *  do the actual rename.
1271 	 */
1272 	spdk_fs_delete_file_async(fs, new_name, fs_rename_delete_done, req);
1273 }
1274 
1275 static void
1276 __fs_rename_file_done(void *arg, int fserrno)
1277 {
1278 	struct spdk_fs_request *req = arg;
1279 	struct spdk_fs_cb_args *args = &req->args;
1280 
1281 	__wake_caller(args, fserrno);
1282 }
1283 
1284 static void
1285 __fs_rename_file(void *arg)
1286 {
1287 	struct spdk_fs_request *req = arg;
1288 	struct spdk_fs_cb_args *args = &req->args;
1289 
1290 	spdk_fs_rename_file_async(args->fs, args->op.rename.old_name, args->op.rename.new_name,
1291 				  __fs_rename_file_done, req);
1292 }
1293 
1294 int
1295 spdk_fs_rename_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx,
1296 		    const char *old_name, const char *new_name)
1297 {
1298 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
1299 	struct spdk_fs_request *req;
1300 	struct spdk_fs_cb_args *args;
1301 	int rc;
1302 
1303 	req = alloc_fs_request(channel);
1304 	if (req == NULL) {
1305 		return -ENOMEM;
1306 	}
1307 
1308 	args = &req->args;
1309 
1310 	args->fs = fs;
1311 	args->op.rename.old_name = old_name;
1312 	args->op.rename.new_name = new_name;
1313 	args->sem = &channel->sem;
1314 	fs->send_request(__fs_rename_file, req);
1315 	sem_wait(&channel->sem);
1316 	rc = args->rc;
1317 	free_fs_request(req);
1318 	return rc;
1319 }
1320 
1321 static void
1322 blob_delete_cb(void *ctx, int bserrno)
1323 {
1324 	struct spdk_fs_request *req = ctx;
1325 	struct spdk_fs_cb_args *args = &req->args;
1326 
1327 	args->fn.file_op(args->arg, bserrno);
1328 	free_fs_request(req);
1329 }
1330 
1331 void
1332 spdk_fs_delete_file_async(struct spdk_filesystem *fs, const char *name,
1333 			  spdk_file_op_complete cb_fn, void *cb_arg)
1334 {
1335 	struct spdk_file *f;
1336 	spdk_blob_id blobid;
1337 	struct spdk_fs_request *req;
1338 	struct spdk_fs_cb_args *args;
1339 
1340 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name);
1341 
1342 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
1343 		cb_fn(cb_arg, -ENAMETOOLONG);
1344 		return;
1345 	}
1346 
1347 	f = fs_find_file(fs, name);
1348 	if (f == NULL) {
1349 		cb_fn(cb_arg, -ENOENT);
1350 		return;
1351 	}
1352 
1353 	req = alloc_fs_request(fs->md_target.md_fs_channel);
1354 	if (req == NULL) {
1355 		cb_fn(cb_arg, -ENOMEM);
1356 		return;
1357 	}
1358 
1359 	args = &req->args;
1360 	args->fn.file_op = cb_fn;
1361 	args->arg = cb_arg;
1362 
1363 	if (f->ref_count > 0) {
1364 		/* If the ref > 0, we mark the file as deleted and delete it when we close it. */
1365 		f->is_deleted = true;
1366 		spdk_blob_set_xattr(f->blob, "is_deleted", &f->is_deleted, sizeof(bool));
1367 		spdk_blob_sync_md(f->blob, blob_delete_cb, args);
1368 		return;
1369 	}
1370 
1371 	TAILQ_REMOVE(&fs->files, f, tailq);
1372 
1373 	cache_free_buffers(f);
1374 
1375 	blobid = f->blobid;
1376 
1377 	free(f->name);
1378 	free(f->tree);
1379 	free(f);
1380 
1381 	spdk_bs_delete_blob(fs->bs, blobid, blob_delete_cb, req);
1382 }
1383 
1384 static void
1385 __fs_delete_file_done(void *arg, int fserrno)
1386 {
1387 	struct spdk_fs_request *req = arg;
1388 	struct spdk_fs_cb_args *args = &req->args;
1389 
1390 	__wake_caller(args, fserrno);
1391 }
1392 
1393 static void
1394 __fs_delete_file(void *arg)
1395 {
1396 	struct spdk_fs_request *req = arg;
1397 	struct spdk_fs_cb_args *args = &req->args;
1398 
1399 	spdk_fs_delete_file_async(args->fs, args->op.delete.name, __fs_delete_file_done, req);
1400 }
1401 
1402 int
1403 spdk_fs_delete_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx,
1404 		    const char *name)
1405 {
1406 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
1407 	struct spdk_fs_request *req;
1408 	struct spdk_fs_cb_args *args;
1409 	int rc;
1410 
1411 	req = alloc_fs_request(channel);
1412 	if (req == NULL) {
1413 		return -ENOMEM;
1414 	}
1415 
1416 	args = &req->args;
1417 	args->fs = fs;
1418 	args->op.delete.name = name;
1419 	args->sem = &channel->sem;
1420 	fs->send_request(__fs_delete_file, req);
1421 	sem_wait(&channel->sem);
1422 	rc = args->rc;
1423 	free_fs_request(req);
1424 
1425 	return rc;
1426 }
1427 
1428 spdk_fs_iter
1429 spdk_fs_iter_first(struct spdk_filesystem *fs)
1430 {
1431 	struct spdk_file *f;
1432 
1433 	f = TAILQ_FIRST(&fs->files);
1434 	return f;
1435 }
1436 
1437 spdk_fs_iter
1438 spdk_fs_iter_next(spdk_fs_iter iter)
1439 {
1440 	struct spdk_file *f = iter;
1441 
1442 	if (f == NULL) {
1443 		return NULL;
1444 	}
1445 
1446 	f = TAILQ_NEXT(f, tailq);
1447 	return f;
1448 }
1449 
1450 const char *
1451 spdk_file_get_name(struct spdk_file *file)
1452 {
1453 	return file->name;
1454 }
1455 
1456 uint64_t
1457 spdk_file_get_length(struct spdk_file *file)
1458 {
1459 	uint64_t length;
1460 
1461 	assert(file != NULL);
1462 
1463 	length = file->append_pos >= file->length ? file->append_pos : file->length;
1464 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s length=0x%jx\n", file->name, length);
1465 	return length;
1466 }
1467 
1468 static void
1469 fs_truncate_complete_cb(void *ctx, int bserrno)
1470 {
1471 	struct spdk_fs_request *req = ctx;
1472 	struct spdk_fs_cb_args *args = &req->args;
1473 
1474 	args->fn.file_op(args->arg, bserrno);
1475 	free_fs_request(req);
1476 }
1477 
1478 static void
1479 fs_truncate_resize_cb(void *ctx, int bserrno)
1480 {
1481 	struct spdk_fs_request *req = ctx;
1482 	struct spdk_fs_cb_args *args = &req->args;
1483 	struct spdk_file *file = args->file;
1484 	uint64_t *length = &args->op.truncate.length;
1485 
1486 	if (bserrno) {
1487 		args->fn.file_op(args->arg, bserrno);
1488 		free_fs_request(req);
1489 		return;
1490 	}
1491 
1492 	spdk_blob_set_xattr(file->blob, "length", length, sizeof(*length));
1493 
1494 	file->length = *length;
1495 	if (file->append_pos > file->length) {
1496 		file->append_pos = file->length;
1497 	}
1498 
1499 	spdk_blob_sync_md(file->blob, fs_truncate_complete_cb, args);
1500 }
1501 
1502 static uint64_t
1503 __bytes_to_clusters(uint64_t length, uint64_t cluster_sz)
1504 {
1505 	return (length + cluster_sz - 1) / cluster_sz;
1506 }
1507 
1508 void
1509 spdk_file_truncate_async(struct spdk_file *file, uint64_t length,
1510 			 spdk_file_op_complete cb_fn, void *cb_arg)
1511 {
1512 	struct spdk_filesystem *fs;
1513 	size_t num_clusters;
1514 	struct spdk_fs_request *req;
1515 	struct spdk_fs_cb_args *args;
1516 
1517 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s old=0x%jx new=0x%jx\n", file->name, file->length, length);
1518 	if (length == file->length) {
1519 		cb_fn(cb_arg, 0);
1520 		return;
1521 	}
1522 
1523 	req = alloc_fs_request(file->fs->md_target.md_fs_channel);
1524 	if (req == NULL) {
1525 		cb_fn(cb_arg, -ENOMEM);
1526 		return;
1527 	}
1528 
1529 	args = &req->args;
1530 	args->fn.file_op = cb_fn;
1531 	args->arg = cb_arg;
1532 	args->file = file;
1533 	args->op.truncate.length = length;
1534 	fs = file->fs;
1535 
1536 	num_clusters = __bytes_to_clusters(length, fs->bs_opts.cluster_sz);
1537 
1538 	spdk_blob_resize(file->blob, num_clusters, fs_truncate_resize_cb, req);
1539 }
1540 
1541 static void
1542 __truncate(void *arg)
1543 {
1544 	struct spdk_fs_request *req = arg;
1545 	struct spdk_fs_cb_args *args = &req->args;
1546 
1547 	spdk_file_truncate_async(args->file, args->op.truncate.length,
1548 				 args->fn.file_op, args);
1549 }
1550 
1551 int
1552 spdk_file_truncate(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx,
1553 		   uint64_t length)
1554 {
1555 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
1556 	struct spdk_fs_request *req;
1557 	struct spdk_fs_cb_args *args;
1558 	int rc;
1559 
1560 	req = alloc_fs_request(channel);
1561 	if (req == NULL) {
1562 		return -ENOMEM;
1563 	}
1564 
1565 	args = &req->args;
1566 
1567 	args->file = file;
1568 	args->op.truncate.length = length;
1569 	args->fn.file_op = __wake_caller;
1570 	args->sem = &channel->sem;
1571 
1572 	channel->send_request(__truncate, req);
1573 	sem_wait(&channel->sem);
1574 	rc = args->rc;
1575 	free_fs_request(req);
1576 
1577 	return rc;
1578 }
1579 
1580 static void
1581 __rw_done(void *ctx, int bserrno)
1582 {
1583 	struct spdk_fs_request *req = ctx;
1584 	struct spdk_fs_cb_args *args = &req->args;
1585 
1586 	spdk_free(args->op.rw.pin_buf);
1587 	args->fn.file_op(args->arg, bserrno);
1588 	free_fs_request(req);
1589 }
1590 
1591 static void
1592 __read_done(void *ctx, int bserrno)
1593 {
1594 	struct spdk_fs_request *req = ctx;
1595 	struct spdk_fs_cb_args *args = &req->args;
1596 
1597 	assert(req != NULL);
1598 	if (args->op.rw.is_read) {
1599 		memcpy(args->op.rw.user_buf,
1600 		       args->op.rw.pin_buf + (args->op.rw.offset & (args->op.rw.blocklen - 1)),
1601 		       args->op.rw.length);
1602 		__rw_done(req, 0);
1603 	} else {
1604 		memcpy(args->op.rw.pin_buf + (args->op.rw.offset & (args->op.rw.blocklen - 1)),
1605 		       args->op.rw.user_buf,
1606 		       args->op.rw.length);
1607 		spdk_blob_io_write(args->file->blob, args->op.rw.channel,
1608 				   args->op.rw.pin_buf,
1609 				   args->op.rw.start_lba, args->op.rw.num_lba,
1610 				   __rw_done, req);
1611 	}
1612 }
1613 
1614 static void
1615 __do_blob_read(void *ctx, int fserrno)
1616 {
1617 	struct spdk_fs_request *req = ctx;
1618 	struct spdk_fs_cb_args *args = &req->args;
1619 
1620 	if (fserrno) {
1621 		__rw_done(req, fserrno);
1622 		return;
1623 	}
1624 	spdk_blob_io_read(args->file->blob, args->op.rw.channel,
1625 			  args->op.rw.pin_buf,
1626 			  args->op.rw.start_lba, args->op.rw.num_lba,
1627 			  __read_done, req);
1628 }
1629 
1630 static void
1631 __get_page_parameters(struct spdk_file *file, uint64_t offset, uint64_t length,
1632 		      uint64_t *start_lba, uint32_t *lba_size, uint64_t *num_lba)
1633 {
1634 	uint64_t end_lba;
1635 
1636 	*lba_size = spdk_bs_get_io_unit_size(file->fs->bs);
1637 	*start_lba = offset / *lba_size;
1638 	end_lba = (offset + length - 1) / *lba_size;
1639 	*num_lba = (end_lba - *start_lba + 1);
1640 }
1641 
1642 static void
1643 __readwrite(struct spdk_file *file, struct spdk_io_channel *_channel,
1644 	    void *payload, uint64_t offset, uint64_t length,
1645 	    spdk_file_op_complete cb_fn, void *cb_arg, int is_read)
1646 {
1647 	struct spdk_fs_request *req;
1648 	struct spdk_fs_cb_args *args;
1649 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
1650 	uint64_t start_lba, num_lba, pin_buf_length;
1651 	uint32_t lba_size;
1652 
1653 	if (is_read && offset + length > file->length) {
1654 		cb_fn(cb_arg, -EINVAL);
1655 		return;
1656 	}
1657 
1658 	req = alloc_fs_request(channel);
1659 	if (req == NULL) {
1660 		cb_fn(cb_arg, -ENOMEM);
1661 		return;
1662 	}
1663 
1664 	__get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba);
1665 
1666 	args = &req->args;
1667 	args->fn.file_op = cb_fn;
1668 	args->arg = cb_arg;
1669 	args->file = file;
1670 	args->op.rw.channel = channel->bs_channel;
1671 	args->op.rw.user_buf = payload;
1672 	args->op.rw.is_read = is_read;
1673 	args->op.rw.offset = offset;
1674 	args->op.rw.length = length;
1675 	args->op.rw.blocklen = lba_size;
1676 
1677 	pin_buf_length = num_lba * lba_size;
1678 	args->op.rw.pin_buf = spdk_malloc(pin_buf_length, lba_size, NULL,
1679 					  SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
1680 	if (args->op.rw.pin_buf == NULL) {
1681 		SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "Failed to allocate buf for: file=%s offset=%jx length=%jx\n",
1682 			      file->name, offset, length);
1683 		free_fs_request(req);
1684 		cb_fn(cb_arg, -ENOMEM);
1685 		return;
1686 	}
1687 
1688 	args->op.rw.start_lba = start_lba;
1689 	args->op.rw.num_lba = num_lba;
1690 
1691 	if (!is_read && file->length < offset + length) {
1692 		spdk_file_truncate_async(file, offset + length, __do_blob_read, req);
1693 	} else {
1694 		__do_blob_read(req, 0);
1695 	}
1696 }
1697 
1698 void
1699 spdk_file_write_async(struct spdk_file *file, struct spdk_io_channel *channel,
1700 		      void *payload, uint64_t offset, uint64_t length,
1701 		      spdk_file_op_complete cb_fn, void *cb_arg)
1702 {
1703 	__readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 0);
1704 }
1705 
1706 void
1707 spdk_file_read_async(struct spdk_file *file, struct spdk_io_channel *channel,
1708 		     void *payload, uint64_t offset, uint64_t length,
1709 		     spdk_file_op_complete cb_fn, void *cb_arg)
1710 {
1711 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s offset=%jx length=%jx\n",
1712 		      file->name, offset, length);
1713 	__readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 1);
1714 }
1715 
1716 struct spdk_io_channel *
1717 spdk_fs_alloc_io_channel(struct spdk_filesystem *fs)
1718 {
1719 	struct spdk_io_channel *io_channel;
1720 	struct spdk_fs_channel *fs_channel;
1721 
1722 	io_channel = spdk_get_io_channel(&fs->io_target);
1723 	fs_channel = spdk_io_channel_get_ctx(io_channel);
1724 	fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
1725 	fs_channel->send_request = __send_request_direct;
1726 
1727 	return io_channel;
1728 }
1729 
1730 void
1731 spdk_fs_free_io_channel(struct spdk_io_channel *channel)
1732 {
1733 	spdk_put_io_channel(channel);
1734 }
1735 
1736 struct spdk_fs_thread_ctx *
1737 spdk_fs_alloc_thread_ctx(struct spdk_filesystem *fs)
1738 {
1739 	struct spdk_fs_thread_ctx *ctx;
1740 
1741 	ctx = calloc(1, sizeof(*ctx));
1742 	if (!ctx) {
1743 		return NULL;
1744 	}
1745 
1746 	_spdk_fs_channel_create(fs, &ctx->ch, 512);
1747 
1748 	ctx->ch.send_request = fs->send_request;
1749 	ctx->ch.sync = 1;
1750 	pthread_spin_init(&ctx->ch.lock, 0);
1751 
1752 	return ctx;
1753 }
1754 
1755 
1756 void
1757 spdk_fs_free_thread_ctx(struct spdk_fs_thread_ctx *ctx)
1758 {
1759 	_spdk_fs_channel_destroy(NULL, &ctx->ch);
1760 	free(ctx);
1761 }
1762 
1763 void
1764 spdk_fs_set_cache_size(uint64_t size_in_mb)
1765 {
1766 	g_fs_cache_size = size_in_mb * 1024 * 1024;
1767 }
1768 
1769 uint64_t
1770 spdk_fs_get_cache_size(void)
1771 {
1772 	return g_fs_cache_size / (1024 * 1024);
1773 }
1774 
1775 static void __file_flush(void *ctx);
1776 
1777 static void *
1778 alloc_cache_memory_buffer(struct spdk_file *context)
1779 {
1780 	struct spdk_file *file;
1781 	void *buf;
1782 
1783 	buf = spdk_mempool_get(g_cache_pool);
1784 	if (buf != NULL) {
1785 		return buf;
1786 	}
1787 
1788 	pthread_spin_lock(&g_caches_lock);
1789 	TAILQ_FOREACH(file, &g_caches, cache_tailq) {
1790 		if (!file->open_for_writing &&
1791 		    file->priority == SPDK_FILE_PRIORITY_LOW &&
1792 		    file != context) {
1793 			break;
1794 		}
1795 	}
1796 	pthread_spin_unlock(&g_caches_lock);
1797 	if (file != NULL) {
1798 		cache_free_buffers(file);
1799 		buf = spdk_mempool_get(g_cache_pool);
1800 		if (buf != NULL) {
1801 			return buf;
1802 		}
1803 	}
1804 
1805 	pthread_spin_lock(&g_caches_lock);
1806 	TAILQ_FOREACH(file, &g_caches, cache_tailq) {
1807 		if (!file->open_for_writing && file != context) {
1808 			break;
1809 		}
1810 	}
1811 	pthread_spin_unlock(&g_caches_lock);
1812 	if (file != NULL) {
1813 		cache_free_buffers(file);
1814 		buf = spdk_mempool_get(g_cache_pool);
1815 		if (buf != NULL) {
1816 			return buf;
1817 		}
1818 	}
1819 
1820 	pthread_spin_lock(&g_caches_lock);
1821 	TAILQ_FOREACH(file, &g_caches, cache_tailq) {
1822 		if (file != context) {
1823 			break;
1824 		}
1825 	}
1826 	pthread_spin_unlock(&g_caches_lock);
1827 	if (file != NULL) {
1828 		cache_free_buffers(file);
1829 		buf = spdk_mempool_get(g_cache_pool);
1830 		if (buf != NULL) {
1831 			return buf;
1832 		}
1833 	}
1834 
1835 	return NULL;
1836 }
1837 
1838 static struct cache_buffer *
1839 cache_insert_buffer(struct spdk_file *file, uint64_t offset)
1840 {
1841 	struct cache_buffer *buf;
1842 	int count = 0;
1843 
1844 	buf = calloc(1, sizeof(*buf));
1845 	if (buf == NULL) {
1846 		SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "calloc failed\n");
1847 		return NULL;
1848 	}
1849 
1850 	buf->buf = alloc_cache_memory_buffer(file);
1851 	while (buf->buf == NULL) {
1852 		/*
1853 		 * TODO: alloc_cache_memory_buffer() should eventually free
1854 		 *  some buffers.  Need a more sophisticated check here, instead
1855 		 *  of just bailing if 100 tries does not result in getting a
1856 		 *  free buffer.  This will involve using the sync channel's
1857 		 *  semaphore to block until a buffer becomes available.
1858 		 */
1859 		if (count++ == 100) {
1860 			SPDK_ERRLOG("Could not allocate cache buffer for file=%p on offset=%jx\n",
1861 				    file, offset);
1862 			free(buf);
1863 			return NULL;
1864 		}
1865 		buf->buf = alloc_cache_memory_buffer(file);
1866 	}
1867 
1868 	buf->buf_size = CACHE_BUFFER_SIZE;
1869 	buf->offset = offset;
1870 
1871 	pthread_spin_lock(&g_caches_lock);
1872 	if (file->tree->present_mask == 0) {
1873 		TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq);
1874 	}
1875 	file->tree = spdk_tree_insert_buffer(file->tree, buf);
1876 	pthread_spin_unlock(&g_caches_lock);
1877 
1878 	return buf;
1879 }
1880 
1881 static struct cache_buffer *
1882 cache_append_buffer(struct spdk_file *file)
1883 {
1884 	struct cache_buffer *last;
1885 
1886 	assert(file->last == NULL || file->last->bytes_filled == file->last->buf_size);
1887 	assert((file->append_pos % CACHE_BUFFER_SIZE) == 0);
1888 
1889 	last = cache_insert_buffer(file, file->append_pos);
1890 	if (last == NULL) {
1891 		SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "cache_insert_buffer failed\n");
1892 		return NULL;
1893 	}
1894 
1895 	file->last = last;
1896 
1897 	return last;
1898 }
1899 
1900 static void __check_sync_reqs(struct spdk_file *file);
1901 
1902 static void
1903 __file_cache_finish_sync(void *ctx, int bserrno)
1904 {
1905 	struct spdk_file *file = ctx;
1906 	struct spdk_fs_request *sync_req;
1907 	struct spdk_fs_cb_args *sync_args;
1908 
1909 	pthread_spin_lock(&file->lock);
1910 	sync_req = TAILQ_FIRST(&file->sync_requests);
1911 	sync_args = &sync_req->args;
1912 	assert(sync_args->op.sync.offset <= file->length_flushed);
1913 	BLOBFS_TRACE(file, "sync done offset=%jx\n", sync_args->op.sync.offset);
1914 	TAILQ_REMOVE(&file->sync_requests, sync_req, args.op.sync.tailq);
1915 	pthread_spin_unlock(&file->lock);
1916 
1917 	sync_args->fn.file_op(sync_args->arg, bserrno);
1918 	__check_sync_reqs(file);
1919 
1920 	pthread_spin_lock(&file->lock);
1921 	free_fs_request(sync_req);
1922 	pthread_spin_unlock(&file->lock);
1923 }
1924 
1925 static void
1926 __check_sync_reqs(struct spdk_file *file)
1927 {
1928 	struct spdk_fs_request *sync_req;
1929 
1930 	pthread_spin_lock(&file->lock);
1931 
1932 	TAILQ_FOREACH(sync_req, &file->sync_requests, args.op.sync.tailq) {
1933 		if (sync_req->args.op.sync.offset <= file->length_flushed) {
1934 			break;
1935 		}
1936 	}
1937 
1938 	if (sync_req != NULL && !sync_req->args.op.sync.xattr_in_progress) {
1939 		BLOBFS_TRACE(file, "set xattr length 0x%jx\n", file->length_flushed);
1940 		sync_req->args.op.sync.xattr_in_progress = true;
1941 		spdk_blob_set_xattr(file->blob, "length", &file->length_flushed,
1942 				    sizeof(file->length_flushed));
1943 
1944 		pthread_spin_unlock(&file->lock);
1945 		spdk_blob_sync_md(file->blob, __file_cache_finish_sync, file);
1946 	} else {
1947 		pthread_spin_unlock(&file->lock);
1948 	}
1949 }
1950 
1951 static void
1952 __file_flush_done(void *ctx, int bserrno)
1953 {
1954 	struct spdk_fs_request *req = ctx;
1955 	struct spdk_fs_cb_args *args = &req->args;
1956 	struct spdk_file *file = args->file;
1957 	struct cache_buffer *next = args->op.flush.cache_buffer;
1958 
1959 	BLOBFS_TRACE(file, "length=%jx\n", args->op.flush.length);
1960 
1961 	pthread_spin_lock(&file->lock);
1962 	next->in_progress = false;
1963 	next->bytes_flushed += args->op.flush.length;
1964 	file->length_flushed += args->op.flush.length;
1965 	if (file->length_flushed > file->length) {
1966 		file->length = file->length_flushed;
1967 	}
1968 	if (next->bytes_flushed == next->buf_size) {
1969 		BLOBFS_TRACE(file, "write buffer fully flushed 0x%jx\n", file->length_flushed);
1970 		next = spdk_tree_find_buffer(file->tree, file->length_flushed);
1971 	}
1972 
1973 	/*
1974 	 * Assert that there is no cached data that extends past the end of the underlying
1975 	 *  blob.
1976 	 */
1977 	assert(next == NULL || next->offset < __file_get_blob_size(file) ||
1978 	       next->bytes_filled == 0);
1979 
1980 	pthread_spin_unlock(&file->lock);
1981 
1982 	__check_sync_reqs(file);
1983 
1984 	__file_flush(req);
1985 }
1986 
1987 static void
1988 __file_flush(void *ctx)
1989 {
1990 	struct spdk_fs_request *req = ctx;
1991 	struct spdk_fs_cb_args *args = &req->args;
1992 	struct spdk_file *file = args->file;
1993 	struct cache_buffer *next;
1994 	uint64_t offset, length, start_lba, num_lba;
1995 	uint32_t lba_size;
1996 
1997 	pthread_spin_lock(&file->lock);
1998 	next = spdk_tree_find_buffer(file->tree, file->length_flushed);
1999 	if (next == NULL || next->in_progress) {
2000 		/*
2001 		 * There is either no data to flush, or a flush I/O is already in
2002 		 *  progress.  So return immediately - if a flush I/O is in
2003 		 *  progress we will flush more data after that is completed.
2004 		 */
2005 		free_fs_request(req);
2006 		if (next == NULL) {
2007 			/*
2008 			 * For cases where a file's cache was evicted, and then the
2009 			 *  file was later appended, we will write the data directly
2010 			 *  to disk and bypass cache.  So just update length_flushed
2011 			 *  here to reflect that all data was already written to disk.
2012 			 */
2013 			file->length_flushed = file->append_pos;
2014 		}
2015 		pthread_spin_unlock(&file->lock);
2016 		if (next == NULL) {
2017 			/*
2018 			 * There is no data to flush, but we still need to check for any
2019 			 *  outstanding sync requests to make sure metadata gets updated.
2020 			 */
2021 			__check_sync_reqs(file);
2022 		}
2023 		return;
2024 	}
2025 
2026 	offset = next->offset + next->bytes_flushed;
2027 	length = next->bytes_filled - next->bytes_flushed;
2028 	if (length == 0) {
2029 		free_fs_request(req);
2030 		pthread_spin_unlock(&file->lock);
2031 		return;
2032 	}
2033 	args->op.flush.length = length;
2034 	args->op.flush.cache_buffer = next;
2035 
2036 	__get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba);
2037 
2038 	next->in_progress = true;
2039 	BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n",
2040 		     offset, length, start_lba, num_lba);
2041 	pthread_spin_unlock(&file->lock);
2042 	spdk_blob_io_write(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel,
2043 			   next->buf + (start_lba * lba_size) - next->offset,
2044 			   start_lba, num_lba, __file_flush_done, req);
2045 }
2046 
2047 static void
2048 __file_extend_done(void *arg, int bserrno)
2049 {
2050 	struct spdk_fs_cb_args *args = arg;
2051 
2052 	__wake_caller(args, bserrno);
2053 }
2054 
2055 static void
2056 __file_extend_resize_cb(void *_args, int bserrno)
2057 {
2058 	struct spdk_fs_cb_args *args = _args;
2059 	struct spdk_file *file = args->file;
2060 
2061 	if (bserrno) {
2062 		__wake_caller(args, bserrno);
2063 		return;
2064 	}
2065 
2066 	spdk_blob_sync_md(file->blob, __file_extend_done, args);
2067 }
2068 
2069 static void
2070 __file_extend_blob(void *_args)
2071 {
2072 	struct spdk_fs_cb_args *args = _args;
2073 	struct spdk_file *file = args->file;
2074 
2075 	spdk_blob_resize(file->blob, args->op.resize.num_clusters, __file_extend_resize_cb, args);
2076 }
2077 
2078 static void
2079 __rw_from_file_done(void *ctx, int bserrno)
2080 {
2081 	struct spdk_fs_request *req = ctx;
2082 
2083 	__wake_caller(&req->args, bserrno);
2084 	free_fs_request(req);
2085 }
2086 
2087 static void
2088 __rw_from_file(void *ctx)
2089 {
2090 	struct spdk_fs_request *req = ctx;
2091 	struct spdk_fs_cb_args *args = &req->args;
2092 	struct spdk_file *file = args->file;
2093 
2094 	if (args->op.rw.is_read) {
2095 		spdk_file_read_async(file, file->fs->sync_target.sync_io_channel, args->op.rw.user_buf,
2096 				     args->op.rw.offset, args->op.rw.length,
2097 				     __rw_from_file_done, req);
2098 	} else {
2099 		spdk_file_write_async(file, file->fs->sync_target.sync_io_channel, args->op.rw.user_buf,
2100 				      args->op.rw.offset, args->op.rw.length,
2101 				      __rw_from_file_done, req);
2102 	}
2103 }
2104 
2105 static int
2106 __send_rw_from_file(struct spdk_file *file, void *payload,
2107 		    uint64_t offset, uint64_t length, bool is_read,
2108 		    struct spdk_fs_channel *channel)
2109 {
2110 	struct spdk_fs_request *req;
2111 	struct spdk_fs_cb_args *args;
2112 
2113 	req = alloc_fs_request(channel);
2114 	if (req == NULL) {
2115 		sem_post(&channel->sem);
2116 		return -ENOMEM;
2117 	}
2118 
2119 	args = &req->args;
2120 	args->file = file;
2121 	args->sem = &channel->sem;
2122 	args->op.rw.user_buf = payload;
2123 	args->op.rw.offset = offset;
2124 	args->op.rw.length = length;
2125 	args->op.rw.is_read = is_read;
2126 	file->fs->send_request(__rw_from_file, req);
2127 	return 0;
2128 }
2129 
2130 int
2131 spdk_file_write(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx,
2132 		void *payload, uint64_t offset, uint64_t length)
2133 {
2134 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
2135 	struct spdk_fs_request *flush_req;
2136 	uint64_t rem_length, copy, blob_size, cluster_sz;
2137 	uint32_t cache_buffers_filled = 0;
2138 	uint8_t *cur_payload;
2139 	struct cache_buffer *last;
2140 
2141 	BLOBFS_TRACE_RW(file, "offset=%jx length=%jx\n", offset, length);
2142 
2143 	if (length == 0) {
2144 		return 0;
2145 	}
2146 
2147 	if (offset != file->append_pos) {
2148 		BLOBFS_TRACE(file, " error offset=%jx append_pos=%jx\n", offset, file->append_pos);
2149 		return -EINVAL;
2150 	}
2151 
2152 	pthread_spin_lock(&file->lock);
2153 	file->open_for_writing = true;
2154 
2155 	if ((file->last == NULL) && (file->append_pos % CACHE_BUFFER_SIZE == 0)) {
2156 		cache_append_buffer(file);
2157 	}
2158 
2159 	if (file->last == NULL) {
2160 		int rc;
2161 
2162 		file->append_pos += length;
2163 		pthread_spin_unlock(&file->lock);
2164 		rc = __send_rw_from_file(file, payload, offset, length, false, channel);
2165 		sem_wait(&channel->sem);
2166 		return rc;
2167 	}
2168 
2169 	blob_size = __file_get_blob_size(file);
2170 
2171 	if ((offset + length) > blob_size) {
2172 		struct spdk_fs_cb_args extend_args = {};
2173 
2174 		cluster_sz = file->fs->bs_opts.cluster_sz;
2175 		extend_args.sem = &channel->sem;
2176 		extend_args.op.resize.num_clusters = __bytes_to_clusters((offset + length), cluster_sz);
2177 		extend_args.file = file;
2178 		BLOBFS_TRACE(file, "start resize to %u clusters\n", extend_args.op.resize.num_clusters);
2179 		pthread_spin_unlock(&file->lock);
2180 		file->fs->send_request(__file_extend_blob, &extend_args);
2181 		sem_wait(&channel->sem);
2182 		if (extend_args.rc) {
2183 			return extend_args.rc;
2184 		}
2185 	}
2186 
2187 	flush_req = alloc_fs_request(channel);
2188 	if (flush_req == NULL) {
2189 		pthread_spin_unlock(&file->lock);
2190 		return -ENOMEM;
2191 	}
2192 
2193 	last = file->last;
2194 	rem_length = length;
2195 	cur_payload = payload;
2196 	while (rem_length > 0) {
2197 		copy = last->buf_size - last->bytes_filled;
2198 		if (copy > rem_length) {
2199 			copy = rem_length;
2200 		}
2201 		BLOBFS_TRACE_RW(file, "  fill offset=%jx length=%jx\n", file->append_pos, copy);
2202 		memcpy(&last->buf[last->bytes_filled], cur_payload, copy);
2203 		file->append_pos += copy;
2204 		if (file->length < file->append_pos) {
2205 			file->length = file->append_pos;
2206 		}
2207 		cur_payload += copy;
2208 		last->bytes_filled += copy;
2209 		rem_length -= copy;
2210 		if (last->bytes_filled == last->buf_size) {
2211 			cache_buffers_filled++;
2212 			last = cache_append_buffer(file);
2213 			if (last == NULL) {
2214 				BLOBFS_TRACE(file, "nomem\n");
2215 				free_fs_request(flush_req);
2216 				pthread_spin_unlock(&file->lock);
2217 				return -ENOMEM;
2218 			}
2219 		}
2220 	}
2221 
2222 	pthread_spin_unlock(&file->lock);
2223 
2224 	if (cache_buffers_filled == 0) {
2225 		free_fs_request(flush_req);
2226 		return 0;
2227 	}
2228 
2229 	flush_req->args.file = file;
2230 	file->fs->send_request(__file_flush, flush_req);
2231 	return 0;
2232 }
2233 
2234 static void
2235 __readahead_done(void *ctx, int bserrno)
2236 {
2237 	struct spdk_fs_request *req = ctx;
2238 	struct spdk_fs_cb_args *args = &req->args;
2239 	struct cache_buffer *cache_buffer = args->op.readahead.cache_buffer;
2240 	struct spdk_file *file = args->file;
2241 
2242 	BLOBFS_TRACE(file, "offset=%jx\n", cache_buffer->offset);
2243 
2244 	pthread_spin_lock(&file->lock);
2245 	cache_buffer->bytes_filled = args->op.readahead.length;
2246 	cache_buffer->bytes_flushed = args->op.readahead.length;
2247 	cache_buffer->in_progress = false;
2248 	pthread_spin_unlock(&file->lock);
2249 
2250 	free_fs_request(req);
2251 }
2252 
2253 static void
2254 __readahead(void *ctx)
2255 {
2256 	struct spdk_fs_request *req = ctx;
2257 	struct spdk_fs_cb_args *args = &req->args;
2258 	struct spdk_file *file = args->file;
2259 	uint64_t offset, length, start_lba, num_lba;
2260 	uint32_t lba_size;
2261 
2262 	offset = args->op.readahead.offset;
2263 	length = args->op.readahead.length;
2264 	assert(length > 0);
2265 
2266 	__get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba);
2267 
2268 	BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n",
2269 		     offset, length, start_lba, num_lba);
2270 	spdk_blob_io_read(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel,
2271 			  args->op.readahead.cache_buffer->buf,
2272 			  start_lba, num_lba, __readahead_done, req);
2273 }
2274 
2275 static uint64_t
2276 __next_cache_buffer_offset(uint64_t offset)
2277 {
2278 	return (offset + CACHE_BUFFER_SIZE) & ~(CACHE_TREE_LEVEL_MASK(0));
2279 }
2280 
2281 static void
2282 check_readahead(struct spdk_file *file, uint64_t offset,
2283 		struct spdk_fs_channel *channel)
2284 {
2285 	struct spdk_fs_request *req;
2286 	struct spdk_fs_cb_args *args;
2287 
2288 	offset = __next_cache_buffer_offset(offset);
2289 	if (spdk_tree_find_buffer(file->tree, offset) != NULL || file->length <= offset) {
2290 		return;
2291 	}
2292 
2293 	req = alloc_fs_request(channel);
2294 	if (req == NULL) {
2295 		return;
2296 	}
2297 	args = &req->args;
2298 
2299 	BLOBFS_TRACE(file, "offset=%jx\n", offset);
2300 
2301 	args->file = file;
2302 	args->op.readahead.offset = offset;
2303 	args->op.readahead.cache_buffer = cache_insert_buffer(file, offset);
2304 	if (!args->op.readahead.cache_buffer) {
2305 		BLOBFS_TRACE(file, "Cannot allocate buf for offset=%jx\n", offset);
2306 		free_fs_request(req);
2307 		return;
2308 	}
2309 
2310 	args->op.readahead.cache_buffer->in_progress = true;
2311 	if (file->length < (offset + CACHE_BUFFER_SIZE)) {
2312 		args->op.readahead.length = file->length & (CACHE_BUFFER_SIZE - 1);
2313 	} else {
2314 		args->op.readahead.length = CACHE_BUFFER_SIZE;
2315 	}
2316 	file->fs->send_request(__readahead, req);
2317 }
2318 
2319 static int
2320 __file_read(struct spdk_file *file, void *payload, uint64_t offset, uint64_t length,
2321 	    struct spdk_fs_channel *channel)
2322 {
2323 	struct cache_buffer *buf;
2324 	int rc;
2325 
2326 	buf = spdk_tree_find_filled_buffer(file->tree, offset);
2327 	if (buf == NULL) {
2328 		pthread_spin_unlock(&file->lock);
2329 		rc = __send_rw_from_file(file, payload, offset, length, true, channel);
2330 		pthread_spin_lock(&file->lock);
2331 		return rc;
2332 	}
2333 
2334 	if ((offset + length) > (buf->offset + buf->bytes_filled)) {
2335 		length = buf->offset + buf->bytes_filled - offset;
2336 	}
2337 	BLOBFS_TRACE(file, "read %p offset=%ju length=%ju\n", payload, offset, length);
2338 	memcpy(payload, &buf->buf[offset - buf->offset], length);
2339 	if ((offset + length) % CACHE_BUFFER_SIZE == 0) {
2340 		pthread_spin_lock(&g_caches_lock);
2341 		spdk_tree_remove_buffer(file->tree, buf);
2342 		if (file->tree->present_mask == 0) {
2343 			TAILQ_REMOVE(&g_caches, file, cache_tailq);
2344 		}
2345 		pthread_spin_unlock(&g_caches_lock);
2346 	}
2347 
2348 	sem_post(&channel->sem);
2349 	return 0;
2350 }
2351 
2352 int64_t
2353 spdk_file_read(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx,
2354 	       void *payload, uint64_t offset, uint64_t length)
2355 {
2356 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
2357 	uint64_t final_offset, final_length;
2358 	uint32_t sub_reads = 0;
2359 	int rc = 0;
2360 
2361 	pthread_spin_lock(&file->lock);
2362 
2363 	BLOBFS_TRACE_RW(file, "offset=%ju length=%ju\n", offset, length);
2364 
2365 	file->open_for_writing = false;
2366 
2367 	if (length == 0 || offset >= file->append_pos) {
2368 		pthread_spin_unlock(&file->lock);
2369 		return 0;
2370 	}
2371 
2372 	if (offset + length > file->append_pos) {
2373 		length = file->append_pos - offset;
2374 	}
2375 
2376 	if (offset != file->next_seq_offset) {
2377 		file->seq_byte_count = 0;
2378 	}
2379 	file->seq_byte_count += length;
2380 	file->next_seq_offset = offset + length;
2381 	if (file->seq_byte_count >= CACHE_READAHEAD_THRESHOLD) {
2382 		check_readahead(file, offset, channel);
2383 		check_readahead(file, offset + CACHE_BUFFER_SIZE, channel);
2384 	}
2385 
2386 	final_length = 0;
2387 	final_offset = offset + length;
2388 	while (offset < final_offset) {
2389 		length = NEXT_CACHE_BUFFER_OFFSET(offset) - offset;
2390 		if (length > (final_offset - offset)) {
2391 			length = final_offset - offset;
2392 		}
2393 		rc = __file_read(file, payload, offset, length, channel);
2394 		if (rc == 0) {
2395 			final_length += length;
2396 		} else {
2397 			break;
2398 		}
2399 		payload += length;
2400 		offset += length;
2401 		sub_reads++;
2402 	}
2403 	pthread_spin_unlock(&file->lock);
2404 	while (sub_reads-- > 0) {
2405 		sem_wait(&channel->sem);
2406 	}
2407 	if (rc == 0) {
2408 		return final_length;
2409 	} else {
2410 		return rc;
2411 	}
2412 }
2413 
2414 static void
2415 _file_sync(struct spdk_file *file, struct spdk_fs_channel *channel,
2416 	   spdk_file_op_complete cb_fn, void *cb_arg)
2417 {
2418 	struct spdk_fs_request *sync_req;
2419 	struct spdk_fs_request *flush_req;
2420 	struct spdk_fs_cb_args *sync_args;
2421 	struct spdk_fs_cb_args *flush_args;
2422 
2423 	BLOBFS_TRACE(file, "offset=%jx\n", file->append_pos);
2424 
2425 	pthread_spin_lock(&file->lock);
2426 	if (file->append_pos <= file->length_flushed) {
2427 		BLOBFS_TRACE(file, "done - no data to flush\n");
2428 		pthread_spin_unlock(&file->lock);
2429 		cb_fn(cb_arg, 0);
2430 		return;
2431 	}
2432 
2433 	sync_req = alloc_fs_request(channel);
2434 	if (!sync_req) {
2435 		pthread_spin_unlock(&file->lock);
2436 		cb_fn(cb_arg, -ENOMEM);
2437 		return;
2438 	}
2439 	sync_args = &sync_req->args;
2440 
2441 	flush_req = alloc_fs_request(channel);
2442 	if (!flush_req) {
2443 		pthread_spin_unlock(&file->lock);
2444 		cb_fn(cb_arg, -ENOMEM);
2445 		return;
2446 	}
2447 	flush_args = &flush_req->args;
2448 
2449 	sync_args->file = file;
2450 	sync_args->fn.file_op = cb_fn;
2451 	sync_args->arg = cb_arg;
2452 	sync_args->op.sync.offset = file->append_pos;
2453 	sync_args->op.sync.xattr_in_progress = false;
2454 	TAILQ_INSERT_TAIL(&file->sync_requests, sync_req, args.op.sync.tailq);
2455 	pthread_spin_unlock(&file->lock);
2456 
2457 	flush_args->file = file;
2458 	channel->send_request(__file_flush, flush_req);
2459 }
2460 
2461 int
2462 spdk_file_sync(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx)
2463 {
2464 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
2465 	struct spdk_fs_cb_args args = {};
2466 
2467 	args.sem = &channel->sem;
2468 	_file_sync(file, channel, __wake_caller, &args);
2469 	sem_wait(&channel->sem);
2470 
2471 	return args.rc;
2472 }
2473 
2474 void
2475 spdk_file_sync_async(struct spdk_file *file, struct spdk_io_channel *_channel,
2476 		     spdk_file_op_complete cb_fn, void *cb_arg)
2477 {
2478 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
2479 
2480 	_file_sync(file, channel, cb_fn, cb_arg);
2481 }
2482 
2483 void
2484 spdk_file_set_priority(struct spdk_file *file, uint32_t priority)
2485 {
2486 	BLOBFS_TRACE(file, "priority=%u\n", priority);
2487 	file->priority = priority;
2488 
2489 }
2490 
2491 /*
2492  * Close routines
2493  */
2494 
2495 static void
2496 __file_close_async_done(void *ctx, int bserrno)
2497 {
2498 	struct spdk_fs_request *req = ctx;
2499 	struct spdk_fs_cb_args *args = &req->args;
2500 	struct spdk_file *file = args->file;
2501 
2502 	if (file->is_deleted) {
2503 		spdk_fs_delete_file_async(file->fs, file->name, blob_delete_cb, ctx);
2504 		return;
2505 	}
2506 
2507 	args->fn.file_op(args->arg, bserrno);
2508 	free_fs_request(req);
2509 }
2510 
2511 static void
2512 __file_close_async(struct spdk_file *file, struct spdk_fs_request *req)
2513 {
2514 	struct spdk_blob *blob;
2515 
2516 	pthread_spin_lock(&file->lock);
2517 	if (file->ref_count == 0) {
2518 		pthread_spin_unlock(&file->lock);
2519 		__file_close_async_done(req, -EBADF);
2520 		return;
2521 	}
2522 
2523 	file->ref_count--;
2524 	if (file->ref_count > 0) {
2525 		pthread_spin_unlock(&file->lock);
2526 		req->args.fn.file_op(req->args.arg, 0);
2527 		free_fs_request(req);
2528 		return;
2529 	}
2530 
2531 	pthread_spin_unlock(&file->lock);
2532 
2533 	blob = file->blob;
2534 	file->blob = NULL;
2535 	spdk_blob_close(blob, __file_close_async_done, req);
2536 }
2537 
2538 static void
2539 __file_close_async__sync_done(void *arg, int fserrno)
2540 {
2541 	struct spdk_fs_request *req = arg;
2542 	struct spdk_fs_cb_args *args = &req->args;
2543 
2544 	__file_close_async(args->file, req);
2545 }
2546 
2547 void
2548 spdk_file_close_async(struct spdk_file *file, spdk_file_op_complete cb_fn, void *cb_arg)
2549 {
2550 	struct spdk_fs_request *req;
2551 	struct spdk_fs_cb_args *args;
2552 
2553 	req = alloc_fs_request(file->fs->md_target.md_fs_channel);
2554 	if (req == NULL) {
2555 		cb_fn(cb_arg, -ENOMEM);
2556 		return;
2557 	}
2558 
2559 	args = &req->args;
2560 	args->file = file;
2561 	args->fn.file_op = cb_fn;
2562 	args->arg = cb_arg;
2563 
2564 	spdk_file_sync_async(file, file->fs->md_target.md_io_channel, __file_close_async__sync_done, req);
2565 }
2566 
2567 static void
2568 __file_close(void *arg)
2569 {
2570 	struct spdk_fs_request *req = arg;
2571 	struct spdk_fs_cb_args *args = &req->args;
2572 	struct spdk_file *file = args->file;
2573 
2574 	__file_close_async(file, req);
2575 }
2576 
2577 int
2578 spdk_file_close(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx)
2579 {
2580 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
2581 	struct spdk_fs_request *req;
2582 	struct spdk_fs_cb_args *args;
2583 
2584 	req = alloc_fs_request(channel);
2585 	if (req == NULL) {
2586 		return -ENOMEM;
2587 	}
2588 
2589 	args = &req->args;
2590 
2591 	spdk_file_sync(file, ctx);
2592 	BLOBFS_TRACE(file, "name=%s\n", file->name);
2593 	args->file = file;
2594 	args->sem = &channel->sem;
2595 	args->fn.file_op = __wake_caller;
2596 	args->arg = req;
2597 	channel->send_request(__file_close, req);
2598 	sem_wait(&channel->sem);
2599 
2600 	return args->rc;
2601 }
2602 
2603 int
2604 spdk_file_get_id(struct spdk_file *file, void *id, size_t size)
2605 {
2606 	if (size < sizeof(spdk_blob_id)) {
2607 		return -EINVAL;
2608 	}
2609 
2610 	memcpy(id, &file->blobid, sizeof(spdk_blob_id));
2611 
2612 	return sizeof(spdk_blob_id);
2613 }
2614 
2615 static void
2616 cache_free_buffers(struct spdk_file *file)
2617 {
2618 	BLOBFS_TRACE(file, "free=%s\n", file->name);
2619 	pthread_spin_lock(&file->lock);
2620 	pthread_spin_lock(&g_caches_lock);
2621 	if (file->tree->present_mask == 0) {
2622 		pthread_spin_unlock(&g_caches_lock);
2623 		pthread_spin_unlock(&file->lock);
2624 		return;
2625 	}
2626 	spdk_tree_free_buffers(file->tree);
2627 
2628 	TAILQ_REMOVE(&g_caches, file, cache_tailq);
2629 	/* If not freed, put it in the end of the queue */
2630 	if (file->tree->present_mask != 0) {
2631 		TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq);
2632 	}
2633 	file->last = NULL;
2634 	pthread_spin_unlock(&g_caches_lock);
2635 	pthread_spin_unlock(&file->lock);
2636 }
2637 
2638 SPDK_LOG_REGISTER_COMPONENT("blobfs", SPDK_LOG_BLOBFS)
2639 SPDK_LOG_REGISTER_COMPONENT("blobfs_rw", SPDK_LOG_BLOBFS_RW)
2640