xref: /spdk/lib/blobfs/blobfs.c (revision 9c2aea2ad581e78982146835ecab3b12107d25d6)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 
36 #include "spdk/blobfs.h"
37 #include "spdk/conf.h"
38 #include "blobfs_internal.h"
39 
40 #include "spdk/queue.h"
41 #include "spdk/thread.h"
42 #include "spdk/assert.h"
43 #include "spdk/env.h"
44 #include "spdk/util.h"
45 #include "spdk_internal/log.h"
46 
47 #define BLOBFS_TRACE(file, str, args...) \
48 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s " str, file->name, ##args)
49 
50 #define BLOBFS_TRACE_RW(file, str, args...) \
51 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS_RW, "file=%s " str, file->name, ##args)
52 
53 #define BLOBFS_DEFAULT_CACHE_SIZE (4ULL * 1024 * 1024 * 1024)
54 #define SPDK_BLOBFS_DEFAULT_OPTS_CLUSTER_SZ (1024 * 1024)
55 
56 static uint64_t g_fs_cache_size = BLOBFS_DEFAULT_CACHE_SIZE;
57 static struct spdk_mempool *g_cache_pool;
58 static TAILQ_HEAD(, spdk_file) g_caches;
59 static int g_fs_count = 0;
60 static pthread_mutex_t g_cache_init_lock = PTHREAD_MUTEX_INITIALIZER;
61 static pthread_spinlock_t g_caches_lock;
62 
63 void
64 spdk_cache_buffer_free(struct cache_buffer *cache_buffer)
65 {
66 	spdk_mempool_put(g_cache_pool, cache_buffer->buf);
67 	free(cache_buffer);
68 }
69 
70 #define CACHE_READAHEAD_THRESHOLD	(128 * 1024)
71 
72 struct spdk_file {
73 	struct spdk_filesystem	*fs;
74 	struct spdk_blob	*blob;
75 	char			*name;
76 	uint64_t		length;
77 	bool                    is_deleted;
78 	bool			open_for_writing;
79 	uint64_t		length_flushed;
80 	uint64_t		append_pos;
81 	uint64_t		seq_byte_count;
82 	uint64_t		next_seq_offset;
83 	uint32_t		priority;
84 	TAILQ_ENTRY(spdk_file)	tailq;
85 	spdk_blob_id		blobid;
86 	uint32_t		ref_count;
87 	pthread_spinlock_t	lock;
88 	struct cache_buffer	*last;
89 	struct cache_tree	*tree;
90 	TAILQ_HEAD(open_requests_head, spdk_fs_request) open_requests;
91 	TAILQ_HEAD(sync_requests_head, spdk_fs_request) sync_requests;
92 	TAILQ_ENTRY(spdk_file)	cache_tailq;
93 };
94 
95 struct spdk_deleted_file {
96 	spdk_blob_id	id;
97 	TAILQ_ENTRY(spdk_deleted_file)	tailq;
98 };
99 
100 struct spdk_filesystem {
101 	struct spdk_blob_store	*bs;
102 	TAILQ_HEAD(, spdk_file)	files;
103 	struct spdk_bs_opts	bs_opts;
104 	struct spdk_bs_dev	*bdev;
105 	fs_send_request_fn	send_request;
106 
107 	struct {
108 		uint32_t		max_ops;
109 		struct spdk_io_channel	*sync_io_channel;
110 		struct spdk_fs_channel	*sync_fs_channel;
111 	} sync_target;
112 
113 	struct {
114 		uint32_t		max_ops;
115 		struct spdk_io_channel	*md_io_channel;
116 		struct spdk_fs_channel	*md_fs_channel;
117 	} md_target;
118 
119 	struct {
120 		uint32_t		max_ops;
121 	} io_target;
122 };
123 
124 struct spdk_fs_cb_args {
125 	union {
126 		spdk_fs_op_with_handle_complete		fs_op_with_handle;
127 		spdk_fs_op_complete			fs_op;
128 		spdk_file_op_with_handle_complete	file_op_with_handle;
129 		spdk_file_op_complete			file_op;
130 		spdk_file_stat_op_complete		stat_op;
131 	} fn;
132 	void *arg;
133 	sem_t *sem;
134 	struct spdk_filesystem *fs;
135 	struct spdk_file *file;
136 	int rc;
137 	bool from_request;
138 	union {
139 		struct {
140 			TAILQ_HEAD(, spdk_deleted_file)	deleted_files;
141 		} fs_load;
142 		struct {
143 			uint64_t	length;
144 		} truncate;
145 		struct {
146 			struct spdk_io_channel	*channel;
147 			void		*user_buf;
148 			void		*pin_buf;
149 			int		is_read;
150 			off_t		offset;
151 			size_t		length;
152 			uint64_t	start_lba;
153 			uint64_t	num_lba;
154 			uint32_t	blocklen;
155 		} rw;
156 		struct {
157 			const char	*old_name;
158 			const char	*new_name;
159 		} rename;
160 		struct {
161 			struct cache_buffer	*cache_buffer;
162 			uint64_t		length;
163 		} flush;
164 		struct {
165 			struct cache_buffer	*cache_buffer;
166 			uint64_t		length;
167 			uint64_t		offset;
168 		} readahead;
169 		struct {
170 			uint64_t			offset;
171 			TAILQ_ENTRY(spdk_fs_request)	tailq;
172 			bool				xattr_in_progress;
173 		} sync;
174 		struct {
175 			uint32_t			num_clusters;
176 		} resize;
177 		struct {
178 			const char	*name;
179 			uint32_t	flags;
180 			TAILQ_ENTRY(spdk_fs_request)	tailq;
181 		} open;
182 		struct {
183 			const char		*name;
184 			struct spdk_blob	*blob;
185 		} create;
186 		struct {
187 			const char	*name;
188 		} delete;
189 		struct {
190 			const char	*name;
191 		} stat;
192 	} op;
193 };
194 
195 static void cache_free_buffers(struct spdk_file *file);
196 
197 void
198 spdk_fs_opts_init(struct spdk_blobfs_opts *opts)
199 {
200 	opts->cluster_sz = SPDK_BLOBFS_DEFAULT_OPTS_CLUSTER_SZ;
201 }
202 
203 static void
204 __initialize_cache(void)
205 {
206 	assert(g_cache_pool == NULL);
207 
208 	g_cache_pool = spdk_mempool_create("spdk_fs_cache",
209 					   g_fs_cache_size / CACHE_BUFFER_SIZE,
210 					   CACHE_BUFFER_SIZE,
211 					   SPDK_MEMPOOL_DEFAULT_CACHE_SIZE,
212 					   SPDK_ENV_SOCKET_ID_ANY);
213 	if (!g_cache_pool) {
214 		SPDK_ERRLOG("Create mempool failed, you may "
215 			    "increase the memory and try again\n");
216 		assert(false);
217 	}
218 	TAILQ_INIT(&g_caches);
219 	pthread_spin_init(&g_caches_lock, 0);
220 }
221 
222 static void
223 __free_cache(void)
224 {
225 	assert(g_cache_pool != NULL);
226 
227 	spdk_mempool_free(g_cache_pool);
228 	g_cache_pool = NULL;
229 }
230 
231 static uint64_t
232 __file_get_blob_size(struct spdk_file *file)
233 {
234 	uint64_t cluster_sz;
235 
236 	cluster_sz = file->fs->bs_opts.cluster_sz;
237 	return cluster_sz * spdk_blob_get_num_clusters(file->blob);
238 }
239 
240 struct spdk_fs_request {
241 	struct spdk_fs_cb_args		args;
242 	TAILQ_ENTRY(spdk_fs_request)	link;
243 	struct spdk_fs_channel		*channel;
244 };
245 
246 struct spdk_fs_channel {
247 	struct spdk_fs_request		*req_mem;
248 	TAILQ_HEAD(, spdk_fs_request)	reqs;
249 	sem_t				sem;
250 	struct spdk_filesystem		*fs;
251 	struct spdk_io_channel		*bs_channel;
252 	fs_send_request_fn		send_request;
253 	bool				sync;
254 	pthread_spinlock_t		lock;
255 };
256 
257 static struct spdk_fs_request *
258 alloc_fs_request(struct spdk_fs_channel *channel)
259 {
260 	struct spdk_fs_request *req;
261 
262 	if (channel->sync) {
263 		pthread_spin_lock(&channel->lock);
264 	}
265 
266 	req = TAILQ_FIRST(&channel->reqs);
267 	if (req) {
268 		TAILQ_REMOVE(&channel->reqs, req, link);
269 	}
270 
271 	if (channel->sync) {
272 		pthread_spin_unlock(&channel->lock);
273 	}
274 
275 	if (req == NULL) {
276 		return NULL;
277 	}
278 	memset(req, 0, sizeof(*req));
279 	req->channel = channel;
280 	req->args.from_request = true;
281 
282 	return req;
283 }
284 
285 static void
286 free_fs_request(struct spdk_fs_request *req)
287 {
288 	struct spdk_fs_channel *channel = req->channel;
289 
290 	if (channel->sync) {
291 		pthread_spin_lock(&channel->lock);
292 	}
293 
294 	TAILQ_INSERT_HEAD(&req->channel->reqs, req, link);
295 
296 	if (channel->sync) {
297 		pthread_spin_unlock(&channel->lock);
298 	}
299 }
300 
301 static int
302 _spdk_fs_channel_create(struct spdk_filesystem *fs, struct spdk_fs_channel *channel,
303 			uint32_t max_ops)
304 {
305 	uint32_t i;
306 
307 	channel->req_mem = calloc(max_ops, sizeof(struct spdk_fs_request));
308 	if (!channel->req_mem) {
309 		return -1;
310 	}
311 
312 	TAILQ_INIT(&channel->reqs);
313 	sem_init(&channel->sem, 0, 0);
314 
315 	for (i = 0; i < max_ops; i++) {
316 		TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link);
317 	}
318 
319 	channel->fs = fs;
320 
321 	return 0;
322 }
323 
324 static int
325 _spdk_fs_md_channel_create(void *io_device, void *ctx_buf)
326 {
327 	struct spdk_filesystem		*fs;
328 	struct spdk_fs_channel		*channel = ctx_buf;
329 
330 	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, md_target);
331 
332 	return _spdk_fs_channel_create(fs, channel, fs->md_target.max_ops);
333 }
334 
335 static int
336 _spdk_fs_sync_channel_create(void *io_device, void *ctx_buf)
337 {
338 	struct spdk_filesystem		*fs;
339 	struct spdk_fs_channel		*channel = ctx_buf;
340 
341 	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, sync_target);
342 
343 	return _spdk_fs_channel_create(fs, channel, fs->sync_target.max_ops);
344 }
345 
346 static int
347 _spdk_fs_io_channel_create(void *io_device, void *ctx_buf)
348 {
349 	struct spdk_filesystem		*fs;
350 	struct spdk_fs_channel		*channel = ctx_buf;
351 
352 	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, io_target);
353 
354 	return _spdk_fs_channel_create(fs, channel, fs->io_target.max_ops);
355 }
356 
357 static void
358 _spdk_fs_channel_destroy(void *io_device, void *ctx_buf)
359 {
360 	struct spdk_fs_channel *channel = ctx_buf;
361 
362 	free(channel->req_mem);
363 	if (channel->bs_channel != NULL) {
364 		spdk_bs_free_io_channel(channel->bs_channel);
365 	}
366 }
367 
368 static void
369 __send_request_direct(fs_request_fn fn, void *arg)
370 {
371 	fn(arg);
372 }
373 
374 static void
375 common_fs_bs_init(struct spdk_filesystem *fs, struct spdk_blob_store *bs)
376 {
377 	fs->bs = bs;
378 	fs->bs_opts.cluster_sz = spdk_bs_get_cluster_size(bs);
379 	fs->md_target.md_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
380 	fs->md_target.md_fs_channel->send_request = __send_request_direct;
381 	fs->sync_target.sync_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
382 	fs->sync_target.sync_fs_channel->send_request = __send_request_direct;
383 
384 	pthread_mutex_lock(&g_cache_init_lock);
385 	if (g_fs_count == 0) {
386 		__initialize_cache();
387 	}
388 	g_fs_count++;
389 	pthread_mutex_unlock(&g_cache_init_lock);
390 }
391 
392 static void
393 init_cb(void *ctx, struct spdk_blob_store *bs, int bserrno)
394 {
395 	struct spdk_fs_request *req = ctx;
396 	struct spdk_fs_cb_args *args = &req->args;
397 	struct spdk_filesystem *fs = args->fs;
398 
399 	if (bserrno == 0) {
400 		common_fs_bs_init(fs, bs);
401 	} else {
402 		free(fs);
403 		fs = NULL;
404 	}
405 
406 	args->fn.fs_op_with_handle(args->arg, fs, bserrno);
407 	free_fs_request(req);
408 }
409 
410 static void
411 fs_conf_parse(void)
412 {
413 	struct spdk_conf_section *sp;
414 
415 	sp = spdk_conf_find_section(NULL, "Blobfs");
416 	if (sp == NULL) {
417 		g_fs_cache_buffer_shift = CACHE_BUFFER_SHIFT_DEFAULT;
418 		return;
419 	}
420 
421 	g_fs_cache_buffer_shift = spdk_conf_section_get_intval(sp, "CacheBufferShift");
422 	if (g_fs_cache_buffer_shift <= 0) {
423 		g_fs_cache_buffer_shift = CACHE_BUFFER_SHIFT_DEFAULT;
424 	}
425 }
426 
427 static struct spdk_filesystem *
428 fs_alloc(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn)
429 {
430 	struct spdk_filesystem *fs;
431 
432 	fs = calloc(1, sizeof(*fs));
433 	if (fs == NULL) {
434 		return NULL;
435 	}
436 
437 	fs->bdev = dev;
438 	fs->send_request = send_request_fn;
439 	TAILQ_INIT(&fs->files);
440 
441 	fs->md_target.max_ops = 512;
442 	spdk_io_device_register(&fs->md_target, _spdk_fs_md_channel_create, _spdk_fs_channel_destroy,
443 				sizeof(struct spdk_fs_channel), "blobfs_md");
444 	fs->md_target.md_io_channel = spdk_get_io_channel(&fs->md_target);
445 	fs->md_target.md_fs_channel = spdk_io_channel_get_ctx(fs->md_target.md_io_channel);
446 
447 	fs->sync_target.max_ops = 512;
448 	spdk_io_device_register(&fs->sync_target, _spdk_fs_sync_channel_create, _spdk_fs_channel_destroy,
449 				sizeof(struct spdk_fs_channel), "blobfs_sync");
450 	fs->sync_target.sync_io_channel = spdk_get_io_channel(&fs->sync_target);
451 	fs->sync_target.sync_fs_channel = spdk_io_channel_get_ctx(fs->sync_target.sync_io_channel);
452 
453 	fs->io_target.max_ops = 512;
454 	spdk_io_device_register(&fs->io_target, _spdk_fs_io_channel_create, _spdk_fs_channel_destroy,
455 				sizeof(struct spdk_fs_channel), "blobfs_io");
456 
457 	return fs;
458 }
459 
460 static void
461 __wake_caller(void *arg, int fserrno)
462 {
463 	struct spdk_fs_cb_args *args = arg;
464 
465 	args->rc = fserrno;
466 	sem_post(args->sem);
467 }
468 
469 void
470 spdk_fs_init(struct spdk_bs_dev *dev, struct spdk_blobfs_opts *opt,
471 	     fs_send_request_fn send_request_fn,
472 	     spdk_fs_op_with_handle_complete cb_fn, void *cb_arg)
473 {
474 	struct spdk_filesystem *fs;
475 	struct spdk_fs_request *req;
476 	struct spdk_fs_cb_args *args;
477 	struct spdk_bs_opts opts = {};
478 
479 	fs = fs_alloc(dev, send_request_fn);
480 	if (fs == NULL) {
481 		cb_fn(cb_arg, NULL, -ENOMEM);
482 		return;
483 	}
484 
485 	fs_conf_parse();
486 
487 	req = alloc_fs_request(fs->md_target.md_fs_channel);
488 	if (req == NULL) {
489 		spdk_put_io_channel(fs->md_target.md_io_channel);
490 		spdk_io_device_unregister(&fs->md_target, NULL);
491 		spdk_put_io_channel(fs->sync_target.sync_io_channel);
492 		spdk_io_device_unregister(&fs->sync_target, NULL);
493 		spdk_io_device_unregister(&fs->io_target, NULL);
494 		free(fs);
495 		cb_fn(cb_arg, NULL, -ENOMEM);
496 		return;
497 	}
498 
499 	args = &req->args;
500 	args->fn.fs_op_with_handle = cb_fn;
501 	args->arg = cb_arg;
502 	args->fs = fs;
503 
504 	spdk_bs_opts_init(&opts);
505 	snprintf(opts.bstype.bstype, sizeof(opts.bstype.bstype), "BLOBFS");
506 	if (opt) {
507 		opts.cluster_sz = opt->cluster_sz;
508 	}
509 	spdk_bs_init(dev, &opts, init_cb, req);
510 }
511 
512 static struct spdk_file *
513 file_alloc(struct spdk_filesystem *fs)
514 {
515 	struct spdk_file *file;
516 
517 	file = calloc(1, sizeof(*file));
518 	if (file == NULL) {
519 		return NULL;
520 	}
521 
522 	file->tree = calloc(1, sizeof(*file->tree));
523 	if (file->tree == NULL) {
524 		free(file);
525 		return NULL;
526 	}
527 
528 	file->fs = fs;
529 	TAILQ_INIT(&file->open_requests);
530 	TAILQ_INIT(&file->sync_requests);
531 	pthread_spin_init(&file->lock, 0);
532 	TAILQ_INSERT_TAIL(&fs->files, file, tailq);
533 	file->priority = SPDK_FILE_PRIORITY_LOW;
534 	return file;
535 }
536 
537 static void fs_load_done(void *ctx, int bserrno);
538 
539 static int
540 _handle_deleted_files(struct spdk_fs_request *req)
541 {
542 	struct spdk_fs_cb_args *args = &req->args;
543 	struct spdk_filesystem *fs = args->fs;
544 
545 	if (!TAILQ_EMPTY(&args->op.fs_load.deleted_files)) {
546 		struct spdk_deleted_file *deleted_file;
547 
548 		deleted_file = TAILQ_FIRST(&args->op.fs_load.deleted_files);
549 		TAILQ_REMOVE(&args->op.fs_load.deleted_files, deleted_file, tailq);
550 		spdk_bs_delete_blob(fs->bs, deleted_file->id, fs_load_done, req);
551 		free(deleted_file);
552 		return 0;
553 	}
554 
555 	return 1;
556 }
557 
558 static void
559 fs_load_done(void *ctx, int bserrno)
560 {
561 	struct spdk_fs_request *req = ctx;
562 	struct spdk_fs_cb_args *args = &req->args;
563 	struct spdk_filesystem *fs = args->fs;
564 
565 	/* The filesystem has been loaded.  Now check if there are any files that
566 	 *  were marked for deletion before last unload.  Do not complete the
567 	 *  fs_load callback until all of them have been deleted on disk.
568 	 */
569 	if (_handle_deleted_files(req) == 0) {
570 		/* We found a file that's been marked for deleting but not actually
571 		 *  deleted yet.  This function will get called again once the delete
572 		 *  operation is completed.
573 		 */
574 		return;
575 	}
576 
577 	args->fn.fs_op_with_handle(args->arg, fs, 0);
578 	free_fs_request(req);
579 
580 }
581 
582 static void
583 iter_cb(void *ctx, struct spdk_blob *blob, int rc)
584 {
585 	struct spdk_fs_request *req = ctx;
586 	struct spdk_fs_cb_args *args = &req->args;
587 	struct spdk_filesystem *fs = args->fs;
588 	uint64_t *length;
589 	const char *name;
590 	uint32_t *is_deleted;
591 	size_t value_len;
592 
593 	if (rc < 0) {
594 		args->fn.fs_op_with_handle(args->arg, fs, rc);
595 		free_fs_request(req);
596 		return;
597 	}
598 
599 	rc = spdk_blob_get_xattr_value(blob, "name", (const void **)&name, &value_len);
600 	if (rc < 0) {
601 		args->fn.fs_op_with_handle(args->arg, fs, rc);
602 		free_fs_request(req);
603 		return;
604 	}
605 
606 	rc = spdk_blob_get_xattr_value(blob, "length", (const void **)&length, &value_len);
607 	if (rc < 0) {
608 		args->fn.fs_op_with_handle(args->arg, fs, rc);
609 		free_fs_request(req);
610 		return;
611 	}
612 
613 	assert(value_len == 8);
614 
615 	/* This file could be deleted last time without close it, then app crashed, so we delete it now */
616 	rc = spdk_blob_get_xattr_value(blob, "is_deleted", (const void **)&is_deleted, &value_len);
617 	if (rc < 0) {
618 		struct spdk_file *f;
619 
620 		f = file_alloc(fs);
621 		if (f == NULL) {
622 			args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM);
623 			free_fs_request(req);
624 			return;
625 		}
626 
627 		f->name = strdup(name);
628 		f->blobid = spdk_blob_get_id(blob);
629 		f->length = *length;
630 		f->length_flushed = *length;
631 		f->append_pos = *length;
632 		SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "added file %s length=%ju\n", f->name, f->length);
633 	} else {
634 		struct spdk_deleted_file *deleted_file;
635 
636 		deleted_file = calloc(1, sizeof(*deleted_file));
637 		if (deleted_file == NULL) {
638 			args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM);
639 			free_fs_request(req);
640 			return;
641 		}
642 		deleted_file->id = spdk_blob_get_id(blob);
643 		TAILQ_INSERT_TAIL(&args->op.fs_load.deleted_files, deleted_file, tailq);
644 	}
645 }
646 
647 static void
648 load_cb(void *ctx, struct spdk_blob_store *bs, int bserrno)
649 {
650 	struct spdk_fs_request *req = ctx;
651 	struct spdk_fs_cb_args *args = &req->args;
652 	struct spdk_filesystem *fs = args->fs;
653 	struct spdk_bs_type bstype;
654 	static const struct spdk_bs_type blobfs_type = {"BLOBFS"};
655 	static const struct spdk_bs_type zeros;
656 
657 	if (bserrno != 0) {
658 		args->fn.fs_op_with_handle(args->arg, NULL, bserrno);
659 		free_fs_request(req);
660 		free(fs);
661 		return;
662 	}
663 
664 	bstype = spdk_bs_get_bstype(bs);
665 
666 	if (!memcmp(&bstype, &zeros, sizeof(bstype))) {
667 		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "assigning bstype\n");
668 		spdk_bs_set_bstype(bs, blobfs_type);
669 	} else if (memcmp(&bstype, &blobfs_type, sizeof(bstype))) {
670 		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "not blobfs\n");
671 		SPDK_LOGDUMP(SPDK_LOG_BLOB, "bstype", &bstype, sizeof(bstype));
672 		args->fn.fs_op_with_handle(args->arg, NULL, bserrno);
673 		free_fs_request(req);
674 		free(fs);
675 		return;
676 	}
677 
678 	common_fs_bs_init(fs, bs);
679 	fs_load_done(req, 0);
680 }
681 
682 static void
683 spdk_fs_io_device_unregister(struct spdk_filesystem *fs)
684 {
685 	assert(fs != NULL);
686 	spdk_io_device_unregister(&fs->md_target, NULL);
687 	spdk_io_device_unregister(&fs->sync_target, NULL);
688 	spdk_io_device_unregister(&fs->io_target, NULL);
689 	free(fs);
690 }
691 
692 static void
693 spdk_fs_free_io_channels(struct spdk_filesystem *fs)
694 {
695 	assert(fs != NULL);
696 	spdk_fs_free_io_channel(fs->md_target.md_io_channel);
697 	spdk_fs_free_io_channel(fs->sync_target.sync_io_channel);
698 }
699 
700 void
701 spdk_fs_load(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn,
702 	     spdk_fs_op_with_handle_complete cb_fn, void *cb_arg)
703 {
704 	struct spdk_filesystem *fs;
705 	struct spdk_fs_cb_args *args;
706 	struct spdk_fs_request *req;
707 	struct spdk_bs_opts	bs_opts;
708 
709 	fs = fs_alloc(dev, send_request_fn);
710 	if (fs == NULL) {
711 		cb_fn(cb_arg, NULL, -ENOMEM);
712 		return;
713 	}
714 
715 	fs_conf_parse();
716 
717 	req = alloc_fs_request(fs->md_target.md_fs_channel);
718 	if (req == NULL) {
719 		spdk_fs_free_io_channels(fs);
720 		spdk_fs_io_device_unregister(fs);
721 		cb_fn(cb_arg, NULL, -ENOMEM);
722 		return;
723 	}
724 
725 	args = &req->args;
726 	args->fn.fs_op_with_handle = cb_fn;
727 	args->arg = cb_arg;
728 	args->fs = fs;
729 	TAILQ_INIT(&args->op.fs_load.deleted_files);
730 	spdk_bs_opts_init(&bs_opts);
731 	bs_opts.iter_cb_fn = iter_cb;
732 	bs_opts.iter_cb_arg = req;
733 	spdk_bs_load(dev, &bs_opts, load_cb, req);
734 }
735 
736 static void
737 unload_cb(void *ctx, int bserrno)
738 {
739 	struct spdk_fs_request *req = ctx;
740 	struct spdk_fs_cb_args *args = &req->args;
741 	struct spdk_filesystem *fs = args->fs;
742 	struct spdk_file *file, *tmp;
743 
744 	TAILQ_FOREACH_SAFE(file, &fs->files, tailq, tmp) {
745 		TAILQ_REMOVE(&fs->files, file, tailq);
746 		cache_free_buffers(file);
747 		free(file->name);
748 		free(file->tree);
749 		free(file);
750 	}
751 
752 	pthread_mutex_lock(&g_cache_init_lock);
753 	g_fs_count--;
754 	if (g_fs_count == 0) {
755 		__free_cache();
756 	}
757 	pthread_mutex_unlock(&g_cache_init_lock);
758 
759 	args->fn.fs_op(args->arg, bserrno);
760 	free(req);
761 
762 	spdk_fs_io_device_unregister(fs);
763 }
764 
765 void
766 spdk_fs_unload(struct spdk_filesystem *fs, spdk_fs_op_complete cb_fn, void *cb_arg)
767 {
768 	struct spdk_fs_request *req;
769 	struct spdk_fs_cb_args *args;
770 
771 	/*
772 	 * We must free the md_channel before unloading the blobstore, so just
773 	 *  allocate this request from the general heap.
774 	 */
775 	req = calloc(1, sizeof(*req));
776 	if (req == NULL) {
777 		cb_fn(cb_arg, -ENOMEM);
778 		return;
779 	}
780 
781 	args = &req->args;
782 	args->fn.fs_op = cb_fn;
783 	args->arg = cb_arg;
784 	args->fs = fs;
785 
786 	spdk_fs_free_io_channels(fs);
787 	spdk_bs_unload(fs->bs, unload_cb, req);
788 }
789 
790 static struct spdk_file *
791 fs_find_file(struct spdk_filesystem *fs, const char *name)
792 {
793 	struct spdk_file *file;
794 
795 	TAILQ_FOREACH(file, &fs->files, tailq) {
796 		if (!strncmp(name, file->name, SPDK_FILE_NAME_MAX)) {
797 			return file;
798 		}
799 	}
800 
801 	return NULL;
802 }
803 
804 void
805 spdk_fs_file_stat_async(struct spdk_filesystem *fs, const char *name,
806 			spdk_file_stat_op_complete cb_fn, void *cb_arg)
807 {
808 	struct spdk_file_stat stat;
809 	struct spdk_file *f = NULL;
810 
811 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
812 		cb_fn(cb_arg, NULL, -ENAMETOOLONG);
813 		return;
814 	}
815 
816 	f = fs_find_file(fs, name);
817 	if (f != NULL) {
818 		stat.blobid = f->blobid;
819 		stat.size = f->append_pos >= f->length ? f->append_pos : f->length;
820 		cb_fn(cb_arg, &stat, 0);
821 		return;
822 	}
823 
824 	cb_fn(cb_arg, NULL, -ENOENT);
825 }
826 
827 static void
828 __copy_stat(void *arg, struct spdk_file_stat *stat, int fserrno)
829 {
830 	struct spdk_fs_request *req = arg;
831 	struct spdk_fs_cb_args *args = &req->args;
832 
833 	args->rc = fserrno;
834 	if (fserrno == 0) {
835 		memcpy(args->arg, stat, sizeof(*stat));
836 	}
837 	sem_post(args->sem);
838 }
839 
840 static void
841 __file_stat(void *arg)
842 {
843 	struct spdk_fs_request *req = arg;
844 	struct spdk_fs_cb_args *args = &req->args;
845 
846 	spdk_fs_file_stat_async(args->fs, args->op.stat.name,
847 				args->fn.stat_op, req);
848 }
849 
850 int
851 spdk_fs_file_stat(struct spdk_filesystem *fs, struct spdk_io_channel *_channel,
852 		  const char *name, struct spdk_file_stat *stat)
853 {
854 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
855 	struct spdk_fs_request *req;
856 	int rc;
857 
858 	req = alloc_fs_request(channel);
859 	if (req == NULL) {
860 		return -ENOMEM;
861 	}
862 
863 	req->args.fs = fs;
864 	req->args.op.stat.name = name;
865 	req->args.fn.stat_op = __copy_stat;
866 	req->args.arg = stat;
867 	req->args.sem = &channel->sem;
868 	channel->send_request(__file_stat, req);
869 	sem_wait(&channel->sem);
870 
871 	rc = req->args.rc;
872 	free_fs_request(req);
873 
874 	return rc;
875 }
876 
877 static void
878 fs_create_blob_close_cb(void *ctx, int bserrno)
879 {
880 	int rc;
881 	struct spdk_fs_request *req = ctx;
882 	struct spdk_fs_cb_args *args = &req->args;
883 
884 	rc = args->rc ? args->rc : bserrno;
885 	args->fn.file_op(args->arg, rc);
886 	free_fs_request(req);
887 }
888 
889 static void
890 fs_create_blob_resize_cb(void *ctx, int bserrno)
891 {
892 	struct spdk_fs_request *req = ctx;
893 	struct spdk_fs_cb_args *args = &req->args;
894 	struct spdk_file *f = args->file;
895 	struct spdk_blob *blob = args->op.create.blob;
896 	uint64_t length = 0;
897 
898 	args->rc = bserrno;
899 	if (bserrno) {
900 		spdk_blob_close(blob, fs_create_blob_close_cb, args);
901 		return;
902 	}
903 
904 	spdk_blob_set_xattr(blob, "name", f->name, strlen(f->name) + 1);
905 	spdk_blob_set_xattr(blob, "length", &length, sizeof(length));
906 
907 	spdk_blob_close(blob, fs_create_blob_close_cb, args);
908 }
909 
910 static void
911 fs_create_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno)
912 {
913 	struct spdk_fs_request *req = ctx;
914 	struct spdk_fs_cb_args *args = &req->args;
915 
916 	if (bserrno) {
917 		args->fn.file_op(args->arg, bserrno);
918 		free_fs_request(req);
919 		return;
920 	}
921 
922 	args->op.create.blob = blob;
923 	spdk_blob_resize(blob, 1, fs_create_blob_resize_cb, req);
924 }
925 
926 static void
927 fs_create_blob_create_cb(void *ctx, spdk_blob_id blobid, int bserrno)
928 {
929 	struct spdk_fs_request *req = ctx;
930 	struct spdk_fs_cb_args *args = &req->args;
931 	struct spdk_file *f = args->file;
932 
933 	if (bserrno) {
934 		args->fn.file_op(args->arg, bserrno);
935 		free_fs_request(req);
936 		return;
937 	}
938 
939 	f->blobid = blobid;
940 	spdk_bs_open_blob(f->fs->bs, blobid, fs_create_blob_open_cb, req);
941 }
942 
943 void
944 spdk_fs_create_file_async(struct spdk_filesystem *fs, const char *name,
945 			  spdk_file_op_complete cb_fn, void *cb_arg)
946 {
947 	struct spdk_file *file;
948 	struct spdk_fs_request *req;
949 	struct spdk_fs_cb_args *args;
950 
951 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
952 		cb_fn(cb_arg, -ENAMETOOLONG);
953 		return;
954 	}
955 
956 	file = fs_find_file(fs, name);
957 	if (file != NULL) {
958 		cb_fn(cb_arg, -EEXIST);
959 		return;
960 	}
961 
962 	file = file_alloc(fs);
963 	if (file == NULL) {
964 		cb_fn(cb_arg, -ENOMEM);
965 		return;
966 	}
967 
968 	req = alloc_fs_request(fs->md_target.md_fs_channel);
969 	if (req == NULL) {
970 		cb_fn(cb_arg, -ENOMEM);
971 		return;
972 	}
973 
974 	args = &req->args;
975 	args->file = file;
976 	args->fn.file_op = cb_fn;
977 	args->arg = cb_arg;
978 
979 	file->name = strdup(name);
980 	spdk_bs_create_blob(fs->bs, fs_create_blob_create_cb, args);
981 }
982 
983 static void
984 __fs_create_file_done(void *arg, int fserrno)
985 {
986 	struct spdk_fs_request *req = arg;
987 	struct spdk_fs_cb_args *args = &req->args;
988 
989 	args->rc = fserrno;
990 	sem_post(args->sem);
991 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.create.name);
992 }
993 
994 static void
995 __fs_create_file(void *arg)
996 {
997 	struct spdk_fs_request *req = arg;
998 	struct spdk_fs_cb_args *args = &req->args;
999 
1000 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.create.name);
1001 	spdk_fs_create_file_async(args->fs, args->op.create.name, __fs_create_file_done, req);
1002 }
1003 
1004 int
1005 spdk_fs_create_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, const char *name)
1006 {
1007 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
1008 	struct spdk_fs_request *req;
1009 	struct spdk_fs_cb_args *args;
1010 	int rc;
1011 
1012 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name);
1013 
1014 	req = alloc_fs_request(channel);
1015 	if (req == NULL) {
1016 		return -ENOMEM;
1017 	}
1018 
1019 	args = &req->args;
1020 	args->fs = fs;
1021 	args->op.create.name = name;
1022 	args->sem = &channel->sem;
1023 	fs->send_request(__fs_create_file, req);
1024 	sem_wait(&channel->sem);
1025 	rc = args->rc;
1026 	free_fs_request(req);
1027 
1028 	return rc;
1029 }
1030 
1031 static void
1032 fs_open_blob_done(void *ctx, struct spdk_blob *blob, int bserrno)
1033 {
1034 	struct spdk_fs_request *req = ctx;
1035 	struct spdk_fs_cb_args *args = &req->args;
1036 	struct spdk_file *f = args->file;
1037 
1038 	f->blob = blob;
1039 	while (!TAILQ_EMPTY(&f->open_requests)) {
1040 		req = TAILQ_FIRST(&f->open_requests);
1041 		args = &req->args;
1042 		TAILQ_REMOVE(&f->open_requests, req, args.op.open.tailq);
1043 		args->fn.file_op_with_handle(args->arg, f, bserrno);
1044 		free_fs_request(req);
1045 	}
1046 }
1047 
1048 static void
1049 fs_open_blob_create_cb(void *ctx, int bserrno)
1050 {
1051 	struct spdk_fs_request *req = ctx;
1052 	struct spdk_fs_cb_args *args = &req->args;
1053 	struct spdk_file *file = args->file;
1054 	struct spdk_filesystem *fs = args->fs;
1055 
1056 	if (file == NULL) {
1057 		/*
1058 		 * This is from an open with CREATE flag - the file
1059 		 *  is now created so look it up in the file list for this
1060 		 *  filesystem.
1061 		 */
1062 		file = fs_find_file(fs, args->op.open.name);
1063 		assert(file != NULL);
1064 		args->file = file;
1065 	}
1066 
1067 	file->ref_count++;
1068 	TAILQ_INSERT_TAIL(&file->open_requests, req, args.op.open.tailq);
1069 	if (file->ref_count == 1) {
1070 		assert(file->blob == NULL);
1071 		spdk_bs_open_blob(fs->bs, file->blobid, fs_open_blob_done, req);
1072 	} else if (file->blob != NULL) {
1073 		fs_open_blob_done(req, file->blob, 0);
1074 	} else {
1075 		/*
1076 		 * The blob open for this file is in progress due to a previous
1077 		 *  open request.  When that open completes, it will invoke the
1078 		 *  open callback for this request.
1079 		 */
1080 	}
1081 }
1082 
1083 void
1084 spdk_fs_open_file_async(struct spdk_filesystem *fs, const char *name, uint32_t flags,
1085 			spdk_file_op_with_handle_complete cb_fn, void *cb_arg)
1086 {
1087 	struct spdk_file *f = NULL;
1088 	struct spdk_fs_request *req;
1089 	struct spdk_fs_cb_args *args;
1090 
1091 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
1092 		cb_fn(cb_arg, NULL, -ENAMETOOLONG);
1093 		return;
1094 	}
1095 
1096 	f = fs_find_file(fs, name);
1097 	if (f == NULL && !(flags & SPDK_BLOBFS_OPEN_CREATE)) {
1098 		cb_fn(cb_arg, NULL, -ENOENT);
1099 		return;
1100 	}
1101 
1102 	if (f != NULL && f->is_deleted == true) {
1103 		cb_fn(cb_arg, NULL, -ENOENT);
1104 		return;
1105 	}
1106 
1107 	req = alloc_fs_request(fs->md_target.md_fs_channel);
1108 	if (req == NULL) {
1109 		cb_fn(cb_arg, NULL, -ENOMEM);
1110 		return;
1111 	}
1112 
1113 	args = &req->args;
1114 	args->fn.file_op_with_handle = cb_fn;
1115 	args->arg = cb_arg;
1116 	args->file = f;
1117 	args->fs = fs;
1118 	args->op.open.name = name;
1119 
1120 	if (f == NULL) {
1121 		spdk_fs_create_file_async(fs, name, fs_open_blob_create_cb, req);
1122 	} else {
1123 		fs_open_blob_create_cb(req, 0);
1124 	}
1125 }
1126 
1127 static void
1128 __fs_open_file_done(void *arg, struct spdk_file *file, int bserrno)
1129 {
1130 	struct spdk_fs_request *req = arg;
1131 	struct spdk_fs_cb_args *args = &req->args;
1132 
1133 	args->file = file;
1134 	__wake_caller(args, bserrno);
1135 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.open.name);
1136 }
1137 
1138 static void
1139 __fs_open_file(void *arg)
1140 {
1141 	struct spdk_fs_request *req = arg;
1142 	struct spdk_fs_cb_args *args = &req->args;
1143 
1144 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.open.name);
1145 	spdk_fs_open_file_async(args->fs, args->op.open.name, args->op.open.flags,
1146 				__fs_open_file_done, req);
1147 }
1148 
1149 int
1150 spdk_fs_open_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel,
1151 		  const char *name, uint32_t flags, struct spdk_file **file)
1152 {
1153 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
1154 	struct spdk_fs_request *req;
1155 	struct spdk_fs_cb_args *args;
1156 	int rc;
1157 
1158 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name);
1159 
1160 	req = alloc_fs_request(channel);
1161 	if (req == NULL) {
1162 		return -ENOMEM;
1163 	}
1164 
1165 	args = &req->args;
1166 	args->fs = fs;
1167 	args->op.open.name = name;
1168 	args->op.open.flags = flags;
1169 	args->sem = &channel->sem;
1170 	fs->send_request(__fs_open_file, req);
1171 	sem_wait(&channel->sem);
1172 	rc = args->rc;
1173 	if (rc == 0) {
1174 		*file = args->file;
1175 	} else {
1176 		*file = NULL;
1177 	}
1178 	free_fs_request(req);
1179 
1180 	return rc;
1181 }
1182 
1183 static void
1184 fs_rename_blob_close_cb(void *ctx, int bserrno)
1185 {
1186 	struct spdk_fs_request *req = ctx;
1187 	struct spdk_fs_cb_args *args = &req->args;
1188 
1189 	args->fn.fs_op(args->arg, bserrno);
1190 	free_fs_request(req);
1191 }
1192 
1193 static void
1194 fs_rename_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno)
1195 {
1196 	struct spdk_fs_request *req = ctx;
1197 	struct spdk_fs_cb_args *args = &req->args;
1198 	const char *new_name = args->op.rename.new_name;
1199 
1200 	spdk_blob_set_xattr(blob, "name", new_name, strlen(new_name) + 1);
1201 	spdk_blob_close(blob, fs_rename_blob_close_cb, req);
1202 }
1203 
1204 static void
1205 __spdk_fs_md_rename_file(struct spdk_fs_request *req)
1206 {
1207 	struct spdk_fs_cb_args *args = &req->args;
1208 	struct spdk_file *f;
1209 
1210 	f = fs_find_file(args->fs, args->op.rename.old_name);
1211 	if (f == NULL) {
1212 		args->fn.fs_op(args->arg, -ENOENT);
1213 		free_fs_request(req);
1214 		return;
1215 	}
1216 
1217 	free(f->name);
1218 	f->name = strdup(args->op.rename.new_name);
1219 	args->file = f;
1220 	spdk_bs_open_blob(args->fs->bs, f->blobid, fs_rename_blob_open_cb, req);
1221 }
1222 
1223 static void
1224 fs_rename_delete_done(void *arg, int fserrno)
1225 {
1226 	__spdk_fs_md_rename_file(arg);
1227 }
1228 
1229 void
1230 spdk_fs_rename_file_async(struct spdk_filesystem *fs,
1231 			  const char *old_name, const char *new_name,
1232 			  spdk_file_op_complete cb_fn, void *cb_arg)
1233 {
1234 	struct spdk_file *f;
1235 	struct spdk_fs_request *req;
1236 	struct spdk_fs_cb_args *args;
1237 
1238 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "old=%s new=%s\n", old_name, new_name);
1239 	if (strnlen(new_name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
1240 		cb_fn(cb_arg, -ENAMETOOLONG);
1241 		return;
1242 	}
1243 
1244 	req = alloc_fs_request(fs->md_target.md_fs_channel);
1245 	if (req == NULL) {
1246 		cb_fn(cb_arg, -ENOMEM);
1247 		return;
1248 	}
1249 
1250 	args = &req->args;
1251 	args->fn.fs_op = cb_fn;
1252 	args->fs = fs;
1253 	args->arg = cb_arg;
1254 	args->op.rename.old_name = old_name;
1255 	args->op.rename.new_name = new_name;
1256 
1257 	f = fs_find_file(fs, new_name);
1258 	if (f == NULL) {
1259 		__spdk_fs_md_rename_file(req);
1260 		return;
1261 	}
1262 
1263 	/*
1264 	 * The rename overwrites an existing file.  So delete the existing file, then
1265 	 *  do the actual rename.
1266 	 */
1267 	spdk_fs_delete_file_async(fs, new_name, fs_rename_delete_done, req);
1268 }
1269 
1270 static void
1271 __fs_rename_file_done(void *arg, int fserrno)
1272 {
1273 	struct spdk_fs_request *req = arg;
1274 	struct spdk_fs_cb_args *args = &req->args;
1275 
1276 	__wake_caller(args, fserrno);
1277 }
1278 
1279 static void
1280 __fs_rename_file(void *arg)
1281 {
1282 	struct spdk_fs_request *req = arg;
1283 	struct spdk_fs_cb_args *args = &req->args;
1284 
1285 	spdk_fs_rename_file_async(args->fs, args->op.rename.old_name, args->op.rename.new_name,
1286 				  __fs_rename_file_done, req);
1287 }
1288 
1289 int
1290 spdk_fs_rename_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel,
1291 		    const char *old_name, const char *new_name)
1292 {
1293 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
1294 	struct spdk_fs_request *req;
1295 	struct spdk_fs_cb_args *args;
1296 	int rc;
1297 
1298 	req = alloc_fs_request(channel);
1299 	if (req == NULL) {
1300 		return -ENOMEM;
1301 	}
1302 
1303 	args = &req->args;
1304 
1305 	args->fs = fs;
1306 	args->op.rename.old_name = old_name;
1307 	args->op.rename.new_name = new_name;
1308 	args->sem = &channel->sem;
1309 	fs->send_request(__fs_rename_file, req);
1310 	sem_wait(&channel->sem);
1311 	rc = args->rc;
1312 	free_fs_request(req);
1313 	return rc;
1314 }
1315 
1316 static void
1317 blob_delete_cb(void *ctx, int bserrno)
1318 {
1319 	struct spdk_fs_request *req = ctx;
1320 	struct spdk_fs_cb_args *args = &req->args;
1321 
1322 	args->fn.file_op(args->arg, bserrno);
1323 	free_fs_request(req);
1324 }
1325 
1326 void
1327 spdk_fs_delete_file_async(struct spdk_filesystem *fs, const char *name,
1328 			  spdk_file_op_complete cb_fn, void *cb_arg)
1329 {
1330 	struct spdk_file *f;
1331 	spdk_blob_id blobid;
1332 	struct spdk_fs_request *req;
1333 	struct spdk_fs_cb_args *args;
1334 
1335 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name);
1336 
1337 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
1338 		cb_fn(cb_arg, -ENAMETOOLONG);
1339 		return;
1340 	}
1341 
1342 	f = fs_find_file(fs, name);
1343 	if (f == NULL) {
1344 		cb_fn(cb_arg, -ENOENT);
1345 		return;
1346 	}
1347 
1348 	req = alloc_fs_request(fs->md_target.md_fs_channel);
1349 	if (req == NULL) {
1350 		cb_fn(cb_arg, -ENOMEM);
1351 		return;
1352 	}
1353 
1354 	args = &req->args;
1355 	args->fn.file_op = cb_fn;
1356 	args->arg = cb_arg;
1357 
1358 	if (f->ref_count > 0) {
1359 		/* If the ref > 0, we mark the file as deleted and delete it when we close it. */
1360 		f->is_deleted = true;
1361 		spdk_blob_set_xattr(f->blob, "is_deleted", &f->is_deleted, sizeof(bool));
1362 		spdk_blob_sync_md(f->blob, blob_delete_cb, args);
1363 		return;
1364 	}
1365 
1366 	TAILQ_REMOVE(&fs->files, f, tailq);
1367 
1368 	cache_free_buffers(f);
1369 
1370 	blobid = f->blobid;
1371 
1372 	free(f->name);
1373 	free(f->tree);
1374 	free(f);
1375 
1376 	spdk_bs_delete_blob(fs->bs, blobid, blob_delete_cb, req);
1377 }
1378 
1379 static void
1380 __fs_delete_file_done(void *arg, int fserrno)
1381 {
1382 	struct spdk_fs_request *req = arg;
1383 	struct spdk_fs_cb_args *args = &req->args;
1384 
1385 	__wake_caller(args, fserrno);
1386 }
1387 
1388 static void
1389 __fs_delete_file(void *arg)
1390 {
1391 	struct spdk_fs_request *req = arg;
1392 	struct spdk_fs_cb_args *args = &req->args;
1393 
1394 	spdk_fs_delete_file_async(args->fs, args->op.delete.name, __fs_delete_file_done, req);
1395 }
1396 
1397 int
1398 spdk_fs_delete_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel,
1399 		    const char *name)
1400 {
1401 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
1402 	struct spdk_fs_request *req;
1403 	struct spdk_fs_cb_args *args;
1404 	int rc;
1405 
1406 	req = alloc_fs_request(channel);
1407 	if (req == NULL) {
1408 		return -ENOMEM;
1409 	}
1410 
1411 	args = &req->args;
1412 	args->fs = fs;
1413 	args->op.delete.name = name;
1414 	args->sem = &channel->sem;
1415 	fs->send_request(__fs_delete_file, req);
1416 	sem_wait(&channel->sem);
1417 	rc = args->rc;
1418 	free_fs_request(req);
1419 
1420 	return rc;
1421 }
1422 
1423 spdk_fs_iter
1424 spdk_fs_iter_first(struct spdk_filesystem *fs)
1425 {
1426 	struct spdk_file *f;
1427 
1428 	f = TAILQ_FIRST(&fs->files);
1429 	return f;
1430 }
1431 
1432 spdk_fs_iter
1433 spdk_fs_iter_next(spdk_fs_iter iter)
1434 {
1435 	struct spdk_file *f = iter;
1436 
1437 	if (f == NULL) {
1438 		return NULL;
1439 	}
1440 
1441 	f = TAILQ_NEXT(f, tailq);
1442 	return f;
1443 }
1444 
1445 const char *
1446 spdk_file_get_name(struct spdk_file *file)
1447 {
1448 	return file->name;
1449 }
1450 
1451 uint64_t
1452 spdk_file_get_length(struct spdk_file *file)
1453 {
1454 	assert(file != NULL);
1455 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s length=0x%jx\n", file->name, file->length);
1456 	return file->length;
1457 }
1458 
1459 static void
1460 fs_truncate_complete_cb(void *ctx, int bserrno)
1461 {
1462 	struct spdk_fs_request *req = ctx;
1463 	struct spdk_fs_cb_args *args = &req->args;
1464 
1465 	args->fn.file_op(args->arg, bserrno);
1466 	free_fs_request(req);
1467 }
1468 
1469 static void
1470 fs_truncate_resize_cb(void *ctx, int bserrno)
1471 {
1472 	struct spdk_fs_request *req = ctx;
1473 	struct spdk_fs_cb_args *args = &req->args;
1474 	struct spdk_file *file = args->file;
1475 	uint64_t *length = &args->op.truncate.length;
1476 
1477 	if (bserrno) {
1478 		args->fn.file_op(args->arg, bserrno);
1479 		free_fs_request(req);
1480 		return;
1481 	}
1482 
1483 	spdk_blob_set_xattr(file->blob, "length", length, sizeof(*length));
1484 
1485 	file->length = *length;
1486 	if (file->append_pos > file->length) {
1487 		file->append_pos = file->length;
1488 	}
1489 
1490 	spdk_blob_sync_md(file->blob, fs_truncate_complete_cb, args);
1491 }
1492 
1493 static uint64_t
1494 __bytes_to_clusters(uint64_t length, uint64_t cluster_sz)
1495 {
1496 	return (length + cluster_sz - 1) / cluster_sz;
1497 }
1498 
1499 void
1500 spdk_file_truncate_async(struct spdk_file *file, uint64_t length,
1501 			 spdk_file_op_complete cb_fn, void *cb_arg)
1502 {
1503 	struct spdk_filesystem *fs;
1504 	size_t num_clusters;
1505 	struct spdk_fs_request *req;
1506 	struct spdk_fs_cb_args *args;
1507 
1508 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s old=0x%jx new=0x%jx\n", file->name, file->length, length);
1509 	if (length == file->length) {
1510 		cb_fn(cb_arg, 0);
1511 		return;
1512 	}
1513 
1514 	req = alloc_fs_request(file->fs->md_target.md_fs_channel);
1515 	if (req == NULL) {
1516 		cb_fn(cb_arg, -ENOMEM);
1517 		return;
1518 	}
1519 
1520 	args = &req->args;
1521 	args->fn.file_op = cb_fn;
1522 	args->arg = cb_arg;
1523 	args->file = file;
1524 	args->op.truncate.length = length;
1525 	fs = file->fs;
1526 
1527 	num_clusters = __bytes_to_clusters(length, fs->bs_opts.cluster_sz);
1528 
1529 	spdk_blob_resize(file->blob, num_clusters, fs_truncate_resize_cb, req);
1530 }
1531 
1532 static void
1533 __truncate(void *arg)
1534 {
1535 	struct spdk_fs_request *req = arg;
1536 	struct spdk_fs_cb_args *args = &req->args;
1537 
1538 	spdk_file_truncate_async(args->file, args->op.truncate.length,
1539 				 args->fn.file_op, args);
1540 }
1541 
1542 int
1543 spdk_file_truncate(struct spdk_file *file, struct spdk_io_channel *_channel,
1544 		   uint64_t length)
1545 {
1546 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
1547 	struct spdk_fs_request *req;
1548 	struct spdk_fs_cb_args *args;
1549 	int rc;
1550 
1551 	req = alloc_fs_request(channel);
1552 	if (req == NULL) {
1553 		return -ENOMEM;
1554 	}
1555 
1556 	args = &req->args;
1557 
1558 	args->file = file;
1559 	args->op.truncate.length = length;
1560 	args->fn.file_op = __wake_caller;
1561 	args->sem = &channel->sem;
1562 
1563 	channel->send_request(__truncate, req);
1564 	sem_wait(&channel->sem);
1565 	rc = args->rc;
1566 	free_fs_request(req);
1567 
1568 	return rc;
1569 }
1570 
1571 static void
1572 __rw_done(void *ctx, int bserrno)
1573 {
1574 	struct spdk_fs_request *req = ctx;
1575 	struct spdk_fs_cb_args *args = &req->args;
1576 
1577 	spdk_dma_free(args->op.rw.pin_buf);
1578 	args->fn.file_op(args->arg, bserrno);
1579 	free_fs_request(req);
1580 }
1581 
1582 static void
1583 __read_done(void *ctx, int bserrno)
1584 {
1585 	struct spdk_fs_request *req = ctx;
1586 	struct spdk_fs_cb_args *args = &req->args;
1587 
1588 	assert(req != NULL);
1589 	if (args->op.rw.is_read) {
1590 		memcpy(args->op.rw.user_buf,
1591 		       args->op.rw.pin_buf + (args->op.rw.offset & (args->op.rw.blocklen - 1)),
1592 		       args->op.rw.length);
1593 		__rw_done(req, 0);
1594 	} else {
1595 		memcpy(args->op.rw.pin_buf + (args->op.rw.offset & (args->op.rw.blocklen - 1)),
1596 		       args->op.rw.user_buf,
1597 		       args->op.rw.length);
1598 		spdk_blob_io_write(args->file->blob, args->op.rw.channel,
1599 				   args->op.rw.pin_buf,
1600 				   args->op.rw.start_lba, args->op.rw.num_lba,
1601 				   __rw_done, req);
1602 	}
1603 }
1604 
1605 static void
1606 __do_blob_read(void *ctx, int fserrno)
1607 {
1608 	struct spdk_fs_request *req = ctx;
1609 	struct spdk_fs_cb_args *args = &req->args;
1610 
1611 	if (fserrno) {
1612 		__rw_done(req, fserrno);
1613 		return;
1614 	}
1615 	spdk_blob_io_read(args->file->blob, args->op.rw.channel,
1616 			  args->op.rw.pin_buf,
1617 			  args->op.rw.start_lba, args->op.rw.num_lba,
1618 			  __read_done, req);
1619 }
1620 
1621 static void
1622 __get_page_parameters(struct spdk_file *file, uint64_t offset, uint64_t length,
1623 		      uint64_t *start_lba, uint32_t *lba_size, uint64_t *num_lba)
1624 {
1625 	uint64_t end_lba;
1626 
1627 	*lba_size = spdk_bs_get_io_unit_size(file->fs->bs);
1628 	*start_lba = offset / *lba_size;
1629 	end_lba = (offset + length - 1) / *lba_size;
1630 	*num_lba = (end_lba - *start_lba + 1);
1631 }
1632 
1633 static void
1634 __readwrite(struct spdk_file *file, struct spdk_io_channel *_channel,
1635 	    void *payload, uint64_t offset, uint64_t length,
1636 	    spdk_file_op_complete cb_fn, void *cb_arg, int is_read)
1637 {
1638 	struct spdk_fs_request *req;
1639 	struct spdk_fs_cb_args *args;
1640 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
1641 	uint64_t start_lba, num_lba, pin_buf_length;
1642 	uint32_t lba_size;
1643 
1644 	if (is_read && offset + length > file->length) {
1645 		cb_fn(cb_arg, -EINVAL);
1646 		return;
1647 	}
1648 
1649 	req = alloc_fs_request(channel);
1650 	if (req == NULL) {
1651 		cb_fn(cb_arg, -ENOMEM);
1652 		return;
1653 	}
1654 
1655 	__get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba);
1656 
1657 	args = &req->args;
1658 	args->fn.file_op = cb_fn;
1659 	args->arg = cb_arg;
1660 	args->file = file;
1661 	args->op.rw.channel = channel->bs_channel;
1662 	args->op.rw.user_buf = payload;
1663 	args->op.rw.is_read = is_read;
1664 	args->op.rw.offset = offset;
1665 	args->op.rw.length = length;
1666 	args->op.rw.blocklen = lba_size;
1667 
1668 	pin_buf_length = num_lba * lba_size;
1669 	args->op.rw.pin_buf = spdk_dma_malloc(pin_buf_length, lba_size, NULL);
1670 	if (args->op.rw.pin_buf == NULL) {
1671 		SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "Failed to allocate buf for: file=%s offset=%jx length=%jx\n",
1672 			      file->name, offset, length);
1673 		free_fs_request(req);
1674 		cb_fn(cb_arg, -ENOMEM);
1675 		return;
1676 	}
1677 
1678 	args->op.rw.start_lba = start_lba;
1679 	args->op.rw.num_lba = num_lba;
1680 
1681 	if (!is_read && file->length < offset + length) {
1682 		spdk_file_truncate_async(file, offset + length, __do_blob_read, req);
1683 	} else {
1684 		__do_blob_read(req, 0);
1685 	}
1686 }
1687 
1688 void
1689 spdk_file_write_async(struct spdk_file *file, struct spdk_io_channel *channel,
1690 		      void *payload, uint64_t offset, uint64_t length,
1691 		      spdk_file_op_complete cb_fn, void *cb_arg)
1692 {
1693 	__readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 0);
1694 }
1695 
1696 void
1697 spdk_file_read_async(struct spdk_file *file, struct spdk_io_channel *channel,
1698 		     void *payload, uint64_t offset, uint64_t length,
1699 		     spdk_file_op_complete cb_fn, void *cb_arg)
1700 {
1701 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s offset=%jx length=%jx\n",
1702 		      file->name, offset, length);
1703 	__readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 1);
1704 }
1705 
1706 struct spdk_io_channel *
1707 spdk_fs_alloc_io_channel(struct spdk_filesystem *fs)
1708 {
1709 	struct spdk_io_channel *io_channel;
1710 	struct spdk_fs_channel *fs_channel;
1711 
1712 	io_channel = spdk_get_io_channel(&fs->io_target);
1713 	fs_channel = spdk_io_channel_get_ctx(io_channel);
1714 	fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
1715 	fs_channel->send_request = __send_request_direct;
1716 
1717 	return io_channel;
1718 }
1719 
1720 struct spdk_io_channel *
1721 spdk_fs_alloc_io_channel_sync(struct spdk_filesystem *fs)
1722 {
1723 	struct spdk_io_channel *io_channel;
1724 	struct spdk_fs_channel *fs_channel;
1725 
1726 	io_channel = spdk_get_io_channel(&fs->io_target);
1727 	fs_channel = spdk_io_channel_get_ctx(io_channel);
1728 	fs_channel->send_request = fs->send_request;
1729 	fs_channel->sync = 1;
1730 	pthread_spin_init(&fs_channel->lock, 0);
1731 
1732 	return io_channel;
1733 }
1734 
1735 void
1736 spdk_fs_free_io_channel(struct spdk_io_channel *channel)
1737 {
1738 	spdk_put_io_channel(channel);
1739 }
1740 
1741 void
1742 spdk_fs_set_cache_size(uint64_t size_in_mb)
1743 {
1744 	g_fs_cache_size = size_in_mb * 1024 * 1024;
1745 }
1746 
1747 uint64_t
1748 spdk_fs_get_cache_size(void)
1749 {
1750 	return g_fs_cache_size / (1024 * 1024);
1751 }
1752 
1753 static void __file_flush(void *_args);
1754 
1755 static void *
1756 alloc_cache_memory_buffer(struct spdk_file *context)
1757 {
1758 	struct spdk_file *file;
1759 	void *buf;
1760 
1761 	buf = spdk_mempool_get(g_cache_pool);
1762 	if (buf != NULL) {
1763 		return buf;
1764 	}
1765 
1766 	pthread_spin_lock(&g_caches_lock);
1767 	TAILQ_FOREACH(file, &g_caches, cache_tailq) {
1768 		if (!file->open_for_writing &&
1769 		    file->priority == SPDK_FILE_PRIORITY_LOW &&
1770 		    file != context) {
1771 			break;
1772 		}
1773 	}
1774 	pthread_spin_unlock(&g_caches_lock);
1775 	if (file != NULL) {
1776 		cache_free_buffers(file);
1777 		buf = spdk_mempool_get(g_cache_pool);
1778 		if (buf != NULL) {
1779 			return buf;
1780 		}
1781 	}
1782 
1783 	pthread_spin_lock(&g_caches_lock);
1784 	TAILQ_FOREACH(file, &g_caches, cache_tailq) {
1785 		if (!file->open_for_writing && file != context) {
1786 			break;
1787 		}
1788 	}
1789 	pthread_spin_unlock(&g_caches_lock);
1790 	if (file != NULL) {
1791 		cache_free_buffers(file);
1792 		buf = spdk_mempool_get(g_cache_pool);
1793 		if (buf != NULL) {
1794 			return buf;
1795 		}
1796 	}
1797 
1798 	pthread_spin_lock(&g_caches_lock);
1799 	TAILQ_FOREACH(file, &g_caches, cache_tailq) {
1800 		if (file != context) {
1801 			break;
1802 		}
1803 	}
1804 	pthread_spin_unlock(&g_caches_lock);
1805 	if (file != NULL) {
1806 		cache_free_buffers(file);
1807 		buf = spdk_mempool_get(g_cache_pool);
1808 		if (buf != NULL) {
1809 			return buf;
1810 		}
1811 	}
1812 
1813 	return NULL;
1814 }
1815 
1816 static struct cache_buffer *
1817 cache_insert_buffer(struct spdk_file *file, uint64_t offset)
1818 {
1819 	struct cache_buffer *buf;
1820 	int count = 0;
1821 
1822 	buf = calloc(1, sizeof(*buf));
1823 	if (buf == NULL) {
1824 		SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "calloc failed\n");
1825 		return NULL;
1826 	}
1827 
1828 	buf->buf = alloc_cache_memory_buffer(file);
1829 	while (buf->buf == NULL) {
1830 		/*
1831 		 * TODO: alloc_cache_memory_buffer() should eventually free
1832 		 *  some buffers.  Need a more sophisticated check here, instead
1833 		 *  of just bailing if 100 tries does not result in getting a
1834 		 *  free buffer.  This will involve using the sync channel's
1835 		 *  semaphore to block until a buffer becomes available.
1836 		 */
1837 		if (count++ == 100) {
1838 			SPDK_ERRLOG("could not allocate cache buffer\n");
1839 			assert(false);
1840 			free(buf);
1841 			return NULL;
1842 		}
1843 		buf->buf = alloc_cache_memory_buffer(file);
1844 	}
1845 
1846 	buf->buf_size = CACHE_BUFFER_SIZE;
1847 	buf->offset = offset;
1848 
1849 	pthread_spin_lock(&g_caches_lock);
1850 	if (file->tree->present_mask == 0) {
1851 		TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq);
1852 	}
1853 	file->tree = spdk_tree_insert_buffer(file->tree, buf);
1854 	pthread_spin_unlock(&g_caches_lock);
1855 
1856 	return buf;
1857 }
1858 
1859 static struct cache_buffer *
1860 cache_append_buffer(struct spdk_file *file)
1861 {
1862 	struct cache_buffer *last;
1863 
1864 	assert(file->last == NULL || file->last->bytes_filled == file->last->buf_size);
1865 	assert((file->append_pos % CACHE_BUFFER_SIZE) == 0);
1866 
1867 	last = cache_insert_buffer(file, file->append_pos);
1868 	if (last == NULL) {
1869 		SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "cache_insert_buffer failed\n");
1870 		return NULL;
1871 	}
1872 
1873 	file->last = last;
1874 
1875 	return last;
1876 }
1877 
1878 static void __check_sync_reqs(struct spdk_file *file);
1879 
1880 static void
1881 __file_cache_finish_sync(void *ctx, int bserrno)
1882 {
1883 	struct spdk_file *file = ctx;
1884 	struct spdk_fs_request *sync_req;
1885 	struct spdk_fs_cb_args *sync_args;
1886 
1887 	pthread_spin_lock(&file->lock);
1888 	sync_req = TAILQ_FIRST(&file->sync_requests);
1889 	sync_args = &sync_req->args;
1890 	assert(sync_args->op.sync.offset <= file->length_flushed);
1891 	BLOBFS_TRACE(file, "sync done offset=%jx\n", sync_args->op.sync.offset);
1892 	TAILQ_REMOVE(&file->sync_requests, sync_req, args.op.sync.tailq);
1893 	pthread_spin_unlock(&file->lock);
1894 
1895 	sync_args->fn.file_op(sync_args->arg, bserrno);
1896 	__check_sync_reqs(file);
1897 
1898 	pthread_spin_lock(&file->lock);
1899 	free_fs_request(sync_req);
1900 	pthread_spin_unlock(&file->lock);
1901 }
1902 
1903 static void
1904 __free_args(struct spdk_fs_cb_args *args)
1905 {
1906 	struct spdk_fs_request *req;
1907 
1908 	if (!args->from_request) {
1909 		free(args);
1910 	} else {
1911 		/* Depends on args being at the start of the spdk_fs_request structure. */
1912 		req = (struct spdk_fs_request *)args;
1913 		free_fs_request(req);
1914 	}
1915 }
1916 
1917 static void
1918 __check_sync_reqs(struct spdk_file *file)
1919 {
1920 	struct spdk_fs_request *sync_req;
1921 
1922 	pthread_spin_lock(&file->lock);
1923 
1924 	TAILQ_FOREACH(sync_req, &file->sync_requests, args.op.sync.tailq) {
1925 		if (sync_req->args.op.sync.offset <= file->length_flushed) {
1926 			break;
1927 		}
1928 	}
1929 
1930 	if (sync_req != NULL && !sync_req->args.op.sync.xattr_in_progress) {
1931 		BLOBFS_TRACE(file, "set xattr length 0x%jx\n", file->length_flushed);
1932 		sync_req->args.op.sync.xattr_in_progress = true;
1933 		spdk_blob_set_xattr(file->blob, "length", &file->length_flushed,
1934 				    sizeof(file->length_flushed));
1935 
1936 		pthread_spin_unlock(&file->lock);
1937 		spdk_blob_sync_md(file->blob, __file_cache_finish_sync, file);
1938 	} else {
1939 		pthread_spin_unlock(&file->lock);
1940 	}
1941 }
1942 
1943 static void
1944 __file_flush_done(void *arg, int bserrno)
1945 {
1946 	struct spdk_fs_cb_args *args = arg;
1947 	struct spdk_file *file = args->file;
1948 	struct cache_buffer *next = args->op.flush.cache_buffer;
1949 
1950 	BLOBFS_TRACE(file, "length=%jx\n", args->op.flush.length);
1951 
1952 	pthread_spin_lock(&file->lock);
1953 	next->in_progress = false;
1954 	next->bytes_flushed += args->op.flush.length;
1955 	file->length_flushed += args->op.flush.length;
1956 	if (file->length_flushed > file->length) {
1957 		file->length = file->length_flushed;
1958 	}
1959 	if (next->bytes_flushed == next->buf_size) {
1960 		BLOBFS_TRACE(file, "write buffer fully flushed 0x%jx\n", file->length_flushed);
1961 		next = spdk_tree_find_buffer(file->tree, file->length_flushed);
1962 	}
1963 
1964 	/*
1965 	 * Assert that there is no cached data that extends past the end of the underlying
1966 	 *  blob.
1967 	 */
1968 	assert(next == NULL || next->offset < __file_get_blob_size(file) ||
1969 	       next->bytes_filled == 0);
1970 
1971 	pthread_spin_unlock(&file->lock);
1972 
1973 	__check_sync_reqs(file);
1974 
1975 	__file_flush(args);
1976 }
1977 
1978 static void
1979 __file_flush(void *_args)
1980 {
1981 	struct spdk_fs_cb_args *args = _args;
1982 	struct spdk_file *file = args->file;
1983 	struct cache_buffer *next;
1984 	uint64_t offset, length, start_lba, num_lba;
1985 	uint32_t lba_size;
1986 
1987 	pthread_spin_lock(&file->lock);
1988 	next = spdk_tree_find_buffer(file->tree, file->length_flushed);
1989 	if (next == NULL || next->in_progress) {
1990 		/*
1991 		 * There is either no data to flush, or a flush I/O is already in
1992 		 *  progress.  So return immediately - if a flush I/O is in
1993 		 *  progress we will flush more data after that is completed.
1994 		 */
1995 		__free_args(args);
1996 		if (next == NULL) {
1997 			/*
1998 			 * For cases where a file's cache was evicted, and then the
1999 			 *  file was later appended, we will write the data directly
2000 			 *  to disk and bypass cache.  So just update length_flushed
2001 			 *  here to reflect that all data was already written to disk.
2002 			 */
2003 			file->length_flushed = file->append_pos;
2004 		}
2005 		pthread_spin_unlock(&file->lock);
2006 		if (next == NULL) {
2007 			/*
2008 			 * There is no data to flush, but we still need to check for any
2009 			 *  outstanding sync requests to make sure metadata gets updated.
2010 			 */
2011 			__check_sync_reqs(file);
2012 		}
2013 		return;
2014 	}
2015 
2016 	offset = next->offset + next->bytes_flushed;
2017 	length = next->bytes_filled - next->bytes_flushed;
2018 	if (length == 0) {
2019 		__free_args(args);
2020 		pthread_spin_unlock(&file->lock);
2021 		return;
2022 	}
2023 	args->op.flush.length = length;
2024 	args->op.flush.cache_buffer = next;
2025 
2026 	__get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba);
2027 
2028 	next->in_progress = true;
2029 	BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n",
2030 		     offset, length, start_lba, num_lba);
2031 	pthread_spin_unlock(&file->lock);
2032 	spdk_blob_io_write(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel,
2033 			   next->buf + (start_lba * lba_size) - next->offset,
2034 			   start_lba, num_lba, __file_flush_done, args);
2035 }
2036 
2037 static void
2038 __file_extend_done(void *arg, int bserrno)
2039 {
2040 	struct spdk_fs_cb_args *args = arg;
2041 
2042 	__wake_caller(args, bserrno);
2043 }
2044 
2045 static void
2046 __file_extend_resize_cb(void *_args, int bserrno)
2047 {
2048 	struct spdk_fs_cb_args *args = _args;
2049 	struct spdk_file *file = args->file;
2050 
2051 	if (bserrno) {
2052 		__wake_caller(args, bserrno);
2053 		return;
2054 	}
2055 
2056 	spdk_blob_sync_md(file->blob, __file_extend_done, args);
2057 }
2058 
2059 static void
2060 __file_extend_blob(void *_args)
2061 {
2062 	struct spdk_fs_cb_args *args = _args;
2063 	struct spdk_file *file = args->file;
2064 
2065 	spdk_blob_resize(file->blob, args->op.resize.num_clusters, __file_extend_resize_cb, args);
2066 }
2067 
2068 static void
2069 __rw_from_file_done(void *arg, int bserrno)
2070 {
2071 	struct spdk_fs_cb_args *args = arg;
2072 
2073 	__wake_caller(args, bserrno);
2074 	__free_args(args);
2075 }
2076 
2077 static void
2078 __rw_from_file(void *_args)
2079 {
2080 	struct spdk_fs_cb_args *args = _args;
2081 	struct spdk_file *file = args->file;
2082 
2083 	if (args->op.rw.is_read) {
2084 		spdk_file_read_async(file, file->fs->sync_target.sync_io_channel, args->op.rw.user_buf,
2085 				     args->op.rw.offset, args->op.rw.length,
2086 				     __rw_from_file_done, args);
2087 	} else {
2088 		spdk_file_write_async(file, file->fs->sync_target.sync_io_channel, args->op.rw.user_buf,
2089 				      args->op.rw.offset, args->op.rw.length,
2090 				      __rw_from_file_done, args);
2091 	}
2092 }
2093 
2094 static int
2095 __send_rw_from_file(struct spdk_file *file, sem_t *sem, void *payload,
2096 		    uint64_t offset, uint64_t length, bool is_read)
2097 {
2098 	struct spdk_fs_cb_args *args;
2099 
2100 	args = calloc(1, sizeof(*args));
2101 	if (args == NULL) {
2102 		sem_post(sem);
2103 		return -ENOMEM;
2104 	}
2105 
2106 	args->file = file;
2107 	args->sem = sem;
2108 	args->op.rw.user_buf = payload;
2109 	args->op.rw.offset = offset;
2110 	args->op.rw.length = length;
2111 	args->op.rw.is_read = is_read;
2112 	file->fs->send_request(__rw_from_file, args);
2113 	return 0;
2114 }
2115 
2116 int
2117 spdk_file_write(struct spdk_file *file, struct spdk_io_channel *_channel,
2118 		void *payload, uint64_t offset, uint64_t length)
2119 {
2120 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
2121 	struct spdk_fs_cb_args *args;
2122 	uint64_t rem_length, copy, blob_size, cluster_sz;
2123 	uint32_t cache_buffers_filled = 0;
2124 	uint8_t *cur_payload;
2125 	struct cache_buffer *last;
2126 
2127 	BLOBFS_TRACE_RW(file, "offset=%jx length=%jx\n", offset, length);
2128 
2129 	if (length == 0) {
2130 		return 0;
2131 	}
2132 
2133 	if (offset != file->append_pos) {
2134 		BLOBFS_TRACE(file, " error offset=%jx append_pos=%jx\n", offset, file->append_pos);
2135 		return -EINVAL;
2136 	}
2137 
2138 	pthread_spin_lock(&file->lock);
2139 	file->open_for_writing = true;
2140 
2141 	if (file->last == NULL) {
2142 		if (file->append_pos % CACHE_BUFFER_SIZE == 0) {
2143 			cache_append_buffer(file);
2144 		} else {
2145 			int rc;
2146 
2147 			file->append_pos += length;
2148 			pthread_spin_unlock(&file->lock);
2149 			rc = __send_rw_from_file(file, &channel->sem, payload,
2150 						 offset, length, false);
2151 			sem_wait(&channel->sem);
2152 			return rc;
2153 		}
2154 	}
2155 
2156 	blob_size = __file_get_blob_size(file);
2157 
2158 	if ((offset + length) > blob_size) {
2159 		struct spdk_fs_cb_args extend_args = {};
2160 
2161 		cluster_sz = file->fs->bs_opts.cluster_sz;
2162 		extend_args.sem = &channel->sem;
2163 		extend_args.op.resize.num_clusters = __bytes_to_clusters((offset + length), cluster_sz);
2164 		extend_args.file = file;
2165 		BLOBFS_TRACE(file, "start resize to %u clusters\n", extend_args.op.resize.num_clusters);
2166 		pthread_spin_unlock(&file->lock);
2167 		file->fs->send_request(__file_extend_blob, &extend_args);
2168 		sem_wait(&channel->sem);
2169 		if (extend_args.rc) {
2170 			return extend_args.rc;
2171 		}
2172 	}
2173 
2174 	last = file->last;
2175 	rem_length = length;
2176 	cur_payload = payload;
2177 	while (rem_length > 0) {
2178 		copy = last->buf_size - last->bytes_filled;
2179 		if (copy > rem_length) {
2180 			copy = rem_length;
2181 		}
2182 		BLOBFS_TRACE_RW(file, "  fill offset=%jx length=%jx\n", file->append_pos, copy);
2183 		memcpy(&last->buf[last->bytes_filled], cur_payload, copy);
2184 		file->append_pos += copy;
2185 		if (file->length < file->append_pos) {
2186 			file->length = file->append_pos;
2187 		}
2188 		cur_payload += copy;
2189 		last->bytes_filled += copy;
2190 		rem_length -= copy;
2191 		if (last->bytes_filled == last->buf_size) {
2192 			cache_buffers_filled++;
2193 			last = cache_append_buffer(file);
2194 			if (last == NULL) {
2195 				BLOBFS_TRACE(file, "nomem\n");
2196 				pthread_spin_unlock(&file->lock);
2197 				return -ENOMEM;
2198 			}
2199 		}
2200 	}
2201 
2202 	pthread_spin_unlock(&file->lock);
2203 
2204 	if (cache_buffers_filled == 0) {
2205 		return 0;
2206 	}
2207 
2208 	args = calloc(1, sizeof(*args));
2209 	if (args == NULL) {
2210 		return -ENOMEM;
2211 	}
2212 
2213 	args->file = file;
2214 	file->fs->send_request(__file_flush, args);
2215 	return 0;
2216 }
2217 
2218 static void
2219 __readahead_done(void *arg, int bserrno)
2220 {
2221 	struct spdk_fs_cb_args *args = arg;
2222 	struct cache_buffer *cache_buffer = args->op.readahead.cache_buffer;
2223 	struct spdk_file *file = args->file;
2224 
2225 	BLOBFS_TRACE(file, "offset=%jx\n", cache_buffer->offset);
2226 
2227 	pthread_spin_lock(&file->lock);
2228 	cache_buffer->bytes_filled = args->op.readahead.length;
2229 	cache_buffer->bytes_flushed = args->op.readahead.length;
2230 	cache_buffer->in_progress = false;
2231 	pthread_spin_unlock(&file->lock);
2232 
2233 	__free_args(args);
2234 }
2235 
2236 static void
2237 __readahead(void *_args)
2238 {
2239 	struct spdk_fs_cb_args *args = _args;
2240 	struct spdk_file *file = args->file;
2241 	uint64_t offset, length, start_lba, num_lba;
2242 	uint32_t lba_size;
2243 
2244 	offset = args->op.readahead.offset;
2245 	length = args->op.readahead.length;
2246 	assert(length > 0);
2247 
2248 	__get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba);
2249 
2250 	BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n",
2251 		     offset, length, start_lba, num_lba);
2252 	spdk_blob_io_read(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel,
2253 			  args->op.readahead.cache_buffer->buf,
2254 			  start_lba, num_lba, __readahead_done, args);
2255 }
2256 
2257 static uint64_t
2258 __next_cache_buffer_offset(uint64_t offset)
2259 {
2260 	return (offset + CACHE_BUFFER_SIZE) & ~(CACHE_TREE_LEVEL_MASK(0));
2261 }
2262 
2263 static void
2264 check_readahead(struct spdk_file *file, uint64_t offset)
2265 {
2266 	struct spdk_fs_cb_args *args;
2267 
2268 	offset = __next_cache_buffer_offset(offset);
2269 	if (spdk_tree_find_buffer(file->tree, offset) != NULL || file->length <= offset) {
2270 		return;
2271 	}
2272 
2273 	args = calloc(1, sizeof(*args));
2274 	if (args == NULL) {
2275 		return;
2276 	}
2277 
2278 	BLOBFS_TRACE(file, "offset=%jx\n", offset);
2279 
2280 	args->file = file;
2281 	args->op.readahead.offset = offset;
2282 	args->op.readahead.cache_buffer = cache_insert_buffer(file, offset);
2283 	if (!args->op.readahead.cache_buffer) {
2284 		BLOBFS_TRACE(file, "Cannot allocate buf for offset=%jx\n", offset);
2285 		free(args);
2286 		return;
2287 	}
2288 
2289 	args->op.readahead.cache_buffer->in_progress = true;
2290 	if (file->length < (offset + CACHE_BUFFER_SIZE)) {
2291 		args->op.readahead.length = file->length & (CACHE_BUFFER_SIZE - 1);
2292 	} else {
2293 		args->op.readahead.length = CACHE_BUFFER_SIZE;
2294 	}
2295 	file->fs->send_request(__readahead, args);
2296 }
2297 
2298 static int
2299 __file_read(struct spdk_file *file, void *payload, uint64_t offset, uint64_t length, sem_t *sem)
2300 {
2301 	struct cache_buffer *buf;
2302 	int rc;
2303 
2304 	buf = spdk_tree_find_filled_buffer(file->tree, offset);
2305 	if (buf == NULL) {
2306 		pthread_spin_unlock(&file->lock);
2307 		rc = __send_rw_from_file(file, sem, payload, offset, length, true);
2308 		pthread_spin_lock(&file->lock);
2309 		return rc;
2310 	}
2311 
2312 	if ((offset + length) > (buf->offset + buf->bytes_filled)) {
2313 		length = buf->offset + buf->bytes_filled - offset;
2314 	}
2315 	BLOBFS_TRACE(file, "read %p offset=%ju length=%ju\n", payload, offset, length);
2316 	memcpy(payload, &buf->buf[offset - buf->offset], length);
2317 	if ((offset + length) % CACHE_BUFFER_SIZE == 0) {
2318 		pthread_spin_lock(&g_caches_lock);
2319 		spdk_tree_remove_buffer(file->tree, buf);
2320 		if (file->tree->present_mask == 0) {
2321 			TAILQ_REMOVE(&g_caches, file, cache_tailq);
2322 		}
2323 		pthread_spin_unlock(&g_caches_lock);
2324 	}
2325 
2326 	sem_post(sem);
2327 	return 0;
2328 }
2329 
2330 int64_t
2331 spdk_file_read(struct spdk_file *file, struct spdk_io_channel *_channel,
2332 	       void *payload, uint64_t offset, uint64_t length)
2333 {
2334 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
2335 	uint64_t final_offset, final_length;
2336 	uint32_t sub_reads = 0;
2337 	int rc = 0;
2338 
2339 	pthread_spin_lock(&file->lock);
2340 
2341 	BLOBFS_TRACE_RW(file, "offset=%ju length=%ju\n", offset, length);
2342 
2343 	file->open_for_writing = false;
2344 
2345 	if (length == 0 || offset >= file->append_pos) {
2346 		pthread_spin_unlock(&file->lock);
2347 		return 0;
2348 	}
2349 
2350 	if (offset + length > file->append_pos) {
2351 		length = file->append_pos - offset;
2352 	}
2353 
2354 	if (offset != file->next_seq_offset) {
2355 		file->seq_byte_count = 0;
2356 	}
2357 	file->seq_byte_count += length;
2358 	file->next_seq_offset = offset + length;
2359 	if (file->seq_byte_count >= CACHE_READAHEAD_THRESHOLD) {
2360 		check_readahead(file, offset);
2361 		check_readahead(file, offset + CACHE_BUFFER_SIZE);
2362 	}
2363 
2364 	final_length = 0;
2365 	final_offset = offset + length;
2366 	while (offset < final_offset) {
2367 		length = NEXT_CACHE_BUFFER_OFFSET(offset) - offset;
2368 		if (length > (final_offset - offset)) {
2369 			length = final_offset - offset;
2370 		}
2371 		rc = __file_read(file, payload, offset, length, &channel->sem);
2372 		if (rc == 0) {
2373 			final_length += length;
2374 		} else {
2375 			break;
2376 		}
2377 		payload += length;
2378 		offset += length;
2379 		sub_reads++;
2380 	}
2381 	pthread_spin_unlock(&file->lock);
2382 	while (sub_reads-- > 0) {
2383 		sem_wait(&channel->sem);
2384 	}
2385 	if (rc == 0) {
2386 		return final_length;
2387 	} else {
2388 		return rc;
2389 	}
2390 }
2391 
2392 static void
2393 _file_sync(struct spdk_file *file, struct spdk_fs_channel *channel,
2394 	   spdk_file_op_complete cb_fn, void *cb_arg)
2395 {
2396 	struct spdk_fs_request *sync_req;
2397 	struct spdk_fs_request *flush_req;
2398 	struct spdk_fs_cb_args *sync_args;
2399 	struct spdk_fs_cb_args *flush_args;
2400 
2401 	BLOBFS_TRACE(file, "offset=%jx\n", file->append_pos);
2402 
2403 	pthread_spin_lock(&file->lock);
2404 	if (file->append_pos <= file->length_flushed) {
2405 		BLOBFS_TRACE(file, "done - no data to flush\n");
2406 		pthread_spin_unlock(&file->lock);
2407 		cb_fn(cb_arg, 0);
2408 		return;
2409 	}
2410 
2411 	sync_req = alloc_fs_request(channel);
2412 	if (!sync_req) {
2413 		pthread_spin_unlock(&file->lock);
2414 		cb_fn(cb_arg, -ENOMEM);
2415 		return;
2416 	}
2417 	sync_args = &sync_req->args;
2418 
2419 	flush_req = alloc_fs_request(channel);
2420 	if (!flush_req) {
2421 		pthread_spin_unlock(&file->lock);
2422 		cb_fn(cb_arg, -ENOMEM);
2423 		return;
2424 	}
2425 	flush_args = &flush_req->args;
2426 
2427 	sync_args->file = file;
2428 	sync_args->fn.file_op = cb_fn;
2429 	sync_args->arg = cb_arg;
2430 	sync_args->op.sync.offset = file->append_pos;
2431 	sync_args->op.sync.xattr_in_progress = false;
2432 	TAILQ_INSERT_TAIL(&file->sync_requests, sync_req, args.op.sync.tailq);
2433 	pthread_spin_unlock(&file->lock);
2434 
2435 	flush_args->file = file;
2436 	channel->send_request(__file_flush, flush_args);
2437 }
2438 
2439 int
2440 spdk_file_sync(struct spdk_file *file, struct spdk_io_channel *_channel)
2441 {
2442 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
2443 	struct spdk_fs_cb_args args = {};
2444 
2445 	args.sem = &channel->sem;
2446 	_file_sync(file, channel, __wake_caller, &args);
2447 	sem_wait(&channel->sem);
2448 
2449 	return args.rc;
2450 }
2451 
2452 void
2453 spdk_file_sync_async(struct spdk_file *file, struct spdk_io_channel *_channel,
2454 		     spdk_file_op_complete cb_fn, void *cb_arg)
2455 {
2456 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
2457 
2458 	_file_sync(file, channel, cb_fn, cb_arg);
2459 }
2460 
2461 void
2462 spdk_file_set_priority(struct spdk_file *file, uint32_t priority)
2463 {
2464 	BLOBFS_TRACE(file, "priority=%u\n", priority);
2465 	file->priority = priority;
2466 
2467 }
2468 
2469 /*
2470  * Close routines
2471  */
2472 
2473 static void
2474 __file_close_async_done(void *ctx, int bserrno)
2475 {
2476 	struct spdk_fs_request *req = ctx;
2477 	struct spdk_fs_cb_args *args = &req->args;
2478 	struct spdk_file *file = args->file;
2479 
2480 	if (file->is_deleted) {
2481 		spdk_fs_delete_file_async(file->fs, file->name, blob_delete_cb, ctx);
2482 		return;
2483 	}
2484 
2485 	args->fn.file_op(args->arg, bserrno);
2486 	free_fs_request(req);
2487 }
2488 
2489 static void
2490 __file_close_async(struct spdk_file *file, struct spdk_fs_request *req)
2491 {
2492 	struct spdk_blob *blob;
2493 
2494 	pthread_spin_lock(&file->lock);
2495 	if (file->ref_count == 0) {
2496 		pthread_spin_unlock(&file->lock);
2497 		__file_close_async_done(req, -EBADF);
2498 		return;
2499 	}
2500 
2501 	file->ref_count--;
2502 	if (file->ref_count > 0) {
2503 		pthread_spin_unlock(&file->lock);
2504 		req->args.fn.file_op(req->args.arg, 0);
2505 		free_fs_request(req);
2506 		return;
2507 	}
2508 
2509 	pthread_spin_unlock(&file->lock);
2510 
2511 	blob = file->blob;
2512 	file->blob = NULL;
2513 	spdk_blob_close(blob, __file_close_async_done, req);
2514 }
2515 
2516 static void
2517 __file_close_async__sync_done(void *arg, int fserrno)
2518 {
2519 	struct spdk_fs_request *req = arg;
2520 	struct spdk_fs_cb_args *args = &req->args;
2521 
2522 	__file_close_async(args->file, req);
2523 }
2524 
2525 void
2526 spdk_file_close_async(struct spdk_file *file, spdk_file_op_complete cb_fn, void *cb_arg)
2527 {
2528 	struct spdk_fs_request *req;
2529 	struct spdk_fs_cb_args *args;
2530 
2531 	req = alloc_fs_request(file->fs->md_target.md_fs_channel);
2532 	if (req == NULL) {
2533 		cb_fn(cb_arg, -ENOMEM);
2534 		return;
2535 	}
2536 
2537 	args = &req->args;
2538 	args->file = file;
2539 	args->fn.file_op = cb_fn;
2540 	args->arg = cb_arg;
2541 
2542 	spdk_file_sync_async(file, file->fs->md_target.md_io_channel, __file_close_async__sync_done, req);
2543 }
2544 
2545 static void
2546 __file_close(void *arg)
2547 {
2548 	struct spdk_fs_request *req = arg;
2549 	struct spdk_fs_cb_args *args = &req->args;
2550 	struct spdk_file *file = args->file;
2551 
2552 	__file_close_async(file, req);
2553 }
2554 
2555 int
2556 spdk_file_close(struct spdk_file *file, struct spdk_io_channel *_channel)
2557 {
2558 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
2559 	struct spdk_fs_request *req;
2560 	struct spdk_fs_cb_args *args;
2561 
2562 	req = alloc_fs_request(channel);
2563 	if (req == NULL) {
2564 		return -ENOMEM;
2565 	}
2566 
2567 	args = &req->args;
2568 
2569 	spdk_file_sync(file, _channel);
2570 	BLOBFS_TRACE(file, "name=%s\n", file->name);
2571 	args->file = file;
2572 	args->sem = &channel->sem;
2573 	args->fn.file_op = __wake_caller;
2574 	args->arg = req;
2575 	channel->send_request(__file_close, req);
2576 	sem_wait(&channel->sem);
2577 
2578 	return args->rc;
2579 }
2580 
2581 int
2582 spdk_file_get_id(struct spdk_file *file, void *id, size_t size)
2583 {
2584 	if (size < sizeof(spdk_blob_id)) {
2585 		return -EINVAL;
2586 	}
2587 
2588 	memcpy(id, &file->blobid, sizeof(spdk_blob_id));
2589 
2590 	return sizeof(spdk_blob_id);
2591 }
2592 
2593 static void
2594 cache_free_buffers(struct spdk_file *file)
2595 {
2596 	BLOBFS_TRACE(file, "free=%s\n", file->name);
2597 	pthread_spin_lock(&file->lock);
2598 	pthread_spin_lock(&g_caches_lock);
2599 	if (file->tree->present_mask == 0) {
2600 		pthread_spin_unlock(&g_caches_lock);
2601 		pthread_spin_unlock(&file->lock);
2602 		return;
2603 	}
2604 	spdk_tree_free_buffers(file->tree);
2605 
2606 	TAILQ_REMOVE(&g_caches, file, cache_tailq);
2607 	/* If not freed, put it in the end of the queue */
2608 	if (file->tree->present_mask != 0) {
2609 		TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq);
2610 	}
2611 	file->last = NULL;
2612 	pthread_spin_unlock(&g_caches_lock);
2613 	pthread_spin_unlock(&file->lock);
2614 }
2615 
2616 SPDK_LOG_REGISTER_COMPONENT("blobfs", SPDK_LOG_BLOBFS)
2617 SPDK_LOG_REGISTER_COMPONENT("blobfs_rw", SPDK_LOG_BLOBFS_RW)
2618