xref: /spdk/lib/blobfs/blobfs.c (revision eb8b1e20a9c8a6bc79f32fde8693d2791a74c34d)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 
36 #include "spdk/blobfs.h"
37 #include "blobfs_internal.h"
38 
39 #include "spdk/queue.h"
40 #include "spdk/io_channel.h"
41 #include "spdk/assert.h"
42 #include "spdk/env.h"
43 #include "spdk/util.h"
44 #include "spdk_internal/log.h"
45 
/*
 * Debug trace for a file-level operation.  Expands to SPDK_DEBUGLOG under the
 *  SPDK_TRACE_BLOBFS flag and prefixes the message with the file's name.
 *  `file` is expanded unparenthesized as `file->name`, so pass a plain
 *  pointer expression.
 */
#define BLOBFS_TRACE(file, str, args...) \
	SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "file=%s " str, file->name, ##args)

/* Same as BLOBFS_TRACE, but logged under the SPDK_TRACE_BLOBFS_RW flag. */
#define BLOBFS_TRACE_RW(file, str, args...) \
	SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS_RW, "file=%s " str, file->name, ##args)

/* Initial value of g_fs_cache_size: 4 GiB. */
#define BLOBFS_CACHE_SIZE (4ULL * 1024 * 1024 * 1024)

/* Total cache size in bytes; divided by CACHE_BUFFER_SIZE to size g_cache_pool. */
static uint64_t g_fs_cache_size = BLOBFS_CACHE_SIZE;
/* Pool of cache buffers; created by __initialize_cache(), released by __free_cache(). */
static struct spdk_mempool *g_cache_pool;
/* List of files; presumably linked through spdk_file.cache_tailq - confirm against users. */
static TAILQ_HEAD(, spdk_file) g_caches;
/* Number of filesystems set up via common_fs_bs_init() and not yet unloaded. */
static int g_fs_count = 0;
/* Held while g_fs_count is updated and the cache pool is created or freed. */
static pthread_mutex_t g_cache_init_lock = PTHREAD_MUTEX_INITIALIZER;
/* Initialized in __initialize_cache(); presumably protects g_caches - confirm. */
static pthread_spinlock_t g_caches_lock;
60 
/*
 * Completion callback that wakes a waiter: `arg` is a sem_t * which is posted
 *  once.  The status code is ignored.
 */
static void
__sem_post(void *arg, int bserrno)
{
	sem_post((sem_t *)arg);
}
68 
69 void
70 spdk_cache_buffer_free(struct cache_buffer *cache_buffer)
71 {
72 	spdk_mempool_put(g_cache_pool, cache_buffer->buf);
73 	free(cache_buffer);
74 }
75 
/*
 * 128 KiB.  Not referenced in this part of the file; presumably the amount of
 *  sequential access after which readahead is started - confirm against users.
 */
#define CACHE_READAHEAD_THRESHOLD	(128 * 1024)
77 
/*
 * In-memory state for one file.  Allocated by file_alloc(), which also links
 *  it into the owning filesystem's file list.
 */
struct spdk_file {
	/* Owning filesystem. */
	struct spdk_filesystem	*fs;
	/* Blob handle; assigned when a blob open completes. */
	struct spdk_blob	*blob;
	/* Heap-allocated (strdup) file name; compared by fs_find_file(). */
	char			*name;
	/* File length; initialized from the blob's "length" xattr on load. */
	uint64_t		length;
	/* When true, opening the file fails with -ENOENT. */
	bool                    is_deleted;
	bool			open_for_writing;
	/* Initialized to the same value as length on load. */
	uint64_t		length_flushed;
	/* Initialized to the same value as length on load. */
	uint64_t		append_pos;
	uint64_t		seq_byte_count;
	uint64_t		next_seq_offset;
	/* Defaults to SPDK_FILE_PRIORITY_LOW. */
	uint32_t		priority;
	/* Link in spdk_filesystem.files. */
	TAILQ_ENTRY(spdk_file)	tailq;
	/* Id of the blob backing this file. */
	spdk_blob_id		blobid;
	/* Incremented for every open request; the first one triggers the blob open. */
	uint32_t		ref_count;
	pthread_spinlock_t	lock;
	struct cache_buffer	*last;
	/* Allocated zeroed in file_alloc(). */
	struct cache_tree	*tree;
	/* Open requests waiting for the blob open to complete. */
	TAILQ_HEAD(open_requests_head, spdk_fs_request) open_requests;
	TAILQ_HEAD(sync_requests_head, spdk_fs_request) sync_requests;
	TAILQ_ENTRY(spdk_file)	cache_tailq;
};
100 
/*
 * Records a blob that carried an "is_deleted" xattr when the filesystem was
 *  loaded.  Entries are queued during iteration and their blobs are deleted
 *  one at a time once iteration has finished.
 */
struct spdk_deleted_file {
	/* Id of the blob to delete. */
	spdk_blob_id	id;
	/* Link in spdk_fs_cb_args.op.fs_load.deleted_files. */
	TAILQ_ENTRY(spdk_deleted_file)	tailq;
};
105 
/*
 * One blobfs instance layered on a blobstore.
 *
 * The addresses of md_target, sync_target and io_target are registered as
 *  three separate io_devices in fs_alloc(); the channel-create callbacks map
 *  such an address back to the filesystem with SPDK_CONTAINEROF.
 */
struct spdk_filesystem {
	/* Underlying blobstore; set once init/load of the blobstore succeeds. */
	struct spdk_blob_store	*bs;
	/* All files known to this filesystem (linked via spdk_file.tailq). */
	TAILQ_HEAD(, spdk_file)	files;
	/* Only cluster_sz is filled in by this part of the file. */
	struct spdk_bs_opts	bs_opts;
	/* Device handed to spdk_fs_init()/spdk_fs_load(). */
	struct spdk_bs_dev	*bdev;
	/* Caller-supplied function used by the blocking API to dispatch work. */
	fs_send_request_fn	send_request;

	struct {
		/* Number of requests preallocated per channel of this target. */
		uint32_t		max_ops;
		struct spdk_io_channel	*sync_io_channel;
		/* Context of sync_io_channel. */
		struct spdk_fs_channel	*sync_fs_channel;
	} sync_target;

	struct {
		/* Number of requests preallocated per channel of this target. */
		uint32_t		max_ops;
		struct spdk_io_channel	*md_io_channel;
		/* Context of md_io_channel; metadata requests are allocated from it. */
		struct spdk_fs_channel	*md_fs_channel;
	} md_target;

	struct {
		/* Number of requests preallocated per channel of this target. */
		uint32_t		max_ops;
	} io_target;
};
129 
/*
 * Per-request state carried through an asynchronous operation: the
 *  completion callback and its argument, the objects operated on, and a
 *  union of operation-specific fields.
 */
struct spdk_fs_cb_args {
	/* Completion callback; which member is valid depends on the operation. */
	union {
		spdk_fs_op_with_handle_complete		fs_op_with_handle;
		spdk_fs_op_complete			fs_op;
		spdk_file_op_with_handle_complete	file_op_with_handle;
		spdk_file_op_complete			file_op;
		spdk_file_stat_op_complete		stat_op;
	} fn;
	/* Argument handed to the completion callback. */
	void *arg;
	/* Posted by the blocking wrappers' completion callbacks to wake the caller. */
	sem_t *sem;
	struct spdk_filesystem *fs;
	struct spdk_file *file;
	/* Result code stored for a blocking caller to read after it is woken. */
	int rc;
	/* Set to true by alloc_fs_request(). */
	bool from_request;
	/* Operation-specific state; only the member for the current operation is valid. */
	union {
		struct {
			/* Blobs found with an "is_deleted" xattr while loading. */
			TAILQ_HEAD(, spdk_deleted_file)	deleted_files;
		} fs_load;
		struct {
			uint64_t	length;
		} truncate;
		struct {
			struct spdk_io_channel	*channel;
			void		*user_buf;
			void		*pin_buf;
			int		is_read;
			off_t		offset;
			size_t		length;
			uint64_t	start_page;
			uint64_t	num_pages;
			uint32_t	blocklen;
		} rw;
		struct {
			/* Both pointers are borrowed from the caller, not copied. */
			const char	*old_name;
			const char	*new_name;
		} rename;
		struct {
			struct cache_buffer	*cache_buffer;
			uint64_t		length;
		} flush;
		struct {
			struct cache_buffer	*cache_buffer;
			uint64_t		length;
			uint64_t		offset;
		} readahead;
		struct {
			uint64_t			offset;
			TAILQ_ENTRY(spdk_fs_request)	tailq;
			bool				xattr_in_progress;
		} sync;
		struct {
			uint32_t			num_clusters;
		} resize;
		struct {
			/* Borrowed from the caller, not copied. */
			const char	*name;
			uint32_t	flags;
			/* Link in spdk_file.open_requests. */
			TAILQ_ENTRY(spdk_fs_request)	tailq;
		} open;
		struct {
			/* Borrowed from the caller, not copied. */
			const char	*name;
		} create;
		struct {
			const char	*name;
		} delete;
		struct {
			/* Borrowed from the caller, not copied. */
			const char	*name;
		} stat;
	} op;
};
199 
200 static void cache_free_buffers(struct spdk_file *file);
201 
202 static void
203 __initialize_cache(void)
204 {
205 	assert(g_cache_pool == NULL);
206 
207 	g_cache_pool = spdk_mempool_create("spdk_fs_cache",
208 					   g_fs_cache_size / CACHE_BUFFER_SIZE,
209 					   CACHE_BUFFER_SIZE,
210 					   SPDK_MEMPOOL_DEFAULT_CACHE_SIZE,
211 					   SPDK_ENV_SOCKET_ID_ANY);
212 	TAILQ_INIT(&g_caches);
213 	pthread_spin_init(&g_caches_lock, 0);
214 }
215 
216 static void
217 __free_cache(void)
218 {
219 	assert(g_cache_pool != NULL);
220 
221 	spdk_mempool_free(g_cache_pool);
222 	g_cache_pool = NULL;
223 }
224 
225 static uint64_t
226 __file_get_blob_size(struct spdk_file *file)
227 {
228 	uint64_t cluster_sz;
229 
230 	cluster_sz = file->fs->bs_opts.cluster_sz;
231 	return cluster_sz * spdk_blob_get_num_clusters(file->blob);
232 }
233 
/*
 * One preallocated request slot of a channel.  Slots are handed out by
 *  alloc_fs_request() and returned with free_fs_request().
 */
struct spdk_fs_request {
	/*
	 * Callback state for the operation.  NOTE(review): keep this as the first
	 *  member - a pointer to args and a pointer to the request are numerically
	 *  equal only in that case; confirm no caller relies on it before moving.
	 */
	struct spdk_fs_cb_args		args;
	/* Link in spdk_fs_channel.reqs while the slot is free. */
	TAILQ_ENTRY(spdk_fs_request)	link;
	/* Channel this slot belongs to; free_fs_request() returns it there. */
	struct spdk_fs_channel		*channel;
};
239 
/* Per-io_channel context: a fixed pool of request slots plus wake-up state. */
struct spdk_fs_channel {
	/* Backing array for all request slots of this channel (calloc'ed). */
	struct spdk_fs_request		*req_mem;
	/* Free list of request slots. */
	TAILQ_HEAD(, spdk_fs_request)	reqs;
	/* Semaphore the blocking API waits on until an operation completes. */
	sem_t				sem;
	struct spdk_filesystem		*fs;
	/* Blobstore channel; released in _spdk_fs_channel_destroy() when non-NULL. */
	struct spdk_io_channel		*bs_channel;
	fs_send_request_fn		send_request;
	/* When true, the free list is accessed under `lock`. */
	bool				sync;
	/* Only taken when `sync` is true; not initialized in this part of the file. */
	pthread_spinlock_t		lock;
};
250 
251 static struct spdk_fs_request *
252 alloc_fs_request(struct spdk_fs_channel *channel)
253 {
254 	struct spdk_fs_request *req;
255 
256 	if (channel->sync) {
257 		pthread_spin_lock(&channel->lock);
258 	}
259 
260 	req = TAILQ_FIRST(&channel->reqs);
261 	if (req) {
262 		TAILQ_REMOVE(&channel->reqs, req, link);
263 	}
264 
265 	if (channel->sync) {
266 		pthread_spin_unlock(&channel->lock);
267 	}
268 
269 	if (req == NULL) {
270 		return NULL;
271 	}
272 	memset(req, 0, sizeof(*req));
273 	req->channel = channel;
274 	req->args.from_request = true;
275 
276 	return req;
277 }
278 
279 static void
280 free_fs_request(struct spdk_fs_request *req)
281 {
282 	struct spdk_fs_channel *channel = req->channel;
283 
284 	if (channel->sync) {
285 		pthread_spin_lock(&channel->lock);
286 	}
287 
288 	TAILQ_INSERT_HEAD(&req->channel->reqs, req, link);
289 
290 	if (channel->sync) {
291 		pthread_spin_unlock(&channel->lock);
292 	}
293 }
294 
295 static int
296 _spdk_fs_channel_create(struct spdk_filesystem *fs, struct spdk_fs_channel *channel,
297 			uint32_t max_ops)
298 {
299 	uint32_t i;
300 
301 	channel->req_mem = calloc(max_ops, sizeof(struct spdk_fs_request));
302 	if (!channel->req_mem) {
303 		return -1;
304 	}
305 
306 	TAILQ_INIT(&channel->reqs);
307 	sem_init(&channel->sem, 0, 0);
308 
309 	for (i = 0; i < max_ops; i++) {
310 		TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link);
311 	}
312 
313 	channel->fs = fs;
314 
315 	return 0;
316 }
317 
318 static int
319 _spdk_fs_md_channel_create(void *io_device, void *ctx_buf)
320 {
321 	struct spdk_filesystem		*fs;
322 	struct spdk_fs_channel		*channel = ctx_buf;
323 
324 	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, md_target);
325 
326 	return _spdk_fs_channel_create(fs, channel, fs->md_target.max_ops);
327 }
328 
329 static int
330 _spdk_fs_sync_channel_create(void *io_device, void *ctx_buf)
331 {
332 	struct spdk_filesystem		*fs;
333 	struct spdk_fs_channel		*channel = ctx_buf;
334 
335 	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, sync_target);
336 
337 	return _spdk_fs_channel_create(fs, channel, fs->sync_target.max_ops);
338 }
339 
340 static int
341 _spdk_fs_io_channel_create(void *io_device, void *ctx_buf)
342 {
343 	struct spdk_filesystem		*fs;
344 	struct spdk_fs_channel		*channel = ctx_buf;
345 
346 	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, io_target);
347 
348 	return _spdk_fs_channel_create(fs, channel, fs->io_target.max_ops);
349 }
350 
351 static void
352 _spdk_fs_channel_destroy(void *io_device, void *ctx_buf)
353 {
354 	struct spdk_fs_channel *channel = ctx_buf;
355 
356 	free(channel->req_mem);
357 	if (channel->bs_channel != NULL) {
358 		spdk_bs_free_io_channel(channel->bs_channel);
359 	}
360 }
361 
362 static void
363 __send_request_direct(fs_request_fn fn, void *arg)
364 {
365 	fn(arg);
366 }
367 
368 static void
369 common_fs_bs_init(struct spdk_filesystem *fs, struct spdk_blob_store *bs)
370 {
371 	fs->bs = bs;
372 	fs->bs_opts.cluster_sz = spdk_bs_get_cluster_size(bs);
373 	fs->md_target.md_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
374 	fs->md_target.md_fs_channel->send_request = __send_request_direct;
375 	fs->sync_target.sync_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
376 	fs->sync_target.sync_fs_channel->send_request = __send_request_direct;
377 
378 	pthread_mutex_lock(&g_cache_init_lock);
379 	if (g_fs_count == 0) {
380 		__initialize_cache();
381 	}
382 	g_fs_count++;
383 	pthread_mutex_unlock(&g_cache_init_lock);
384 }
385 
386 static void
387 init_cb(void *ctx, struct spdk_blob_store *bs, int bserrno)
388 {
389 	struct spdk_fs_request *req = ctx;
390 	struct spdk_fs_cb_args *args = &req->args;
391 	struct spdk_filesystem *fs = args->fs;
392 
393 	if (bserrno == 0) {
394 		common_fs_bs_init(fs, bs);
395 	} else {
396 		free(fs);
397 		fs = NULL;
398 	}
399 
400 	args->fn.fs_op_with_handle(args->arg, fs, bserrno);
401 	free_fs_request(req);
402 }
403 
404 static struct spdk_filesystem *
405 fs_alloc(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn)
406 {
407 	struct spdk_filesystem *fs;
408 
409 	fs = calloc(1, sizeof(*fs));
410 	if (fs == NULL) {
411 		return NULL;
412 	}
413 
414 	fs->bdev = dev;
415 	fs->send_request = send_request_fn;
416 	TAILQ_INIT(&fs->files);
417 
418 	fs->md_target.max_ops = 512;
419 	spdk_io_device_register(&fs->md_target, _spdk_fs_md_channel_create, _spdk_fs_channel_destroy,
420 				sizeof(struct spdk_fs_channel));
421 	fs->md_target.md_io_channel = spdk_get_io_channel(&fs->md_target);
422 	fs->md_target.md_fs_channel = spdk_io_channel_get_ctx(fs->md_target.md_io_channel);
423 
424 	fs->sync_target.max_ops = 512;
425 	spdk_io_device_register(&fs->sync_target, _spdk_fs_sync_channel_create, _spdk_fs_channel_destroy,
426 				sizeof(struct spdk_fs_channel));
427 	fs->sync_target.sync_io_channel = spdk_get_io_channel(&fs->sync_target);
428 	fs->sync_target.sync_fs_channel = spdk_io_channel_get_ctx(fs->sync_target.sync_io_channel);
429 
430 	fs->io_target.max_ops = 512;
431 	spdk_io_device_register(&fs->io_target, _spdk_fs_io_channel_create, _spdk_fs_channel_destroy,
432 				sizeof(struct spdk_fs_channel));
433 
434 	return fs;
435 }
436 
437 void
438 spdk_fs_init(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn,
439 	     spdk_fs_op_with_handle_complete cb_fn, void *cb_arg)
440 {
441 	struct spdk_filesystem *fs;
442 	struct spdk_fs_request *req;
443 	struct spdk_fs_cb_args *args;
444 	struct spdk_bs_opts opts = {};
445 
446 	fs = fs_alloc(dev, send_request_fn);
447 	if (fs == NULL) {
448 		cb_fn(cb_arg, NULL, -ENOMEM);
449 		return;
450 	}
451 
452 	req = alloc_fs_request(fs->md_target.md_fs_channel);
453 	if (req == NULL) {
454 		spdk_put_io_channel(fs->md_target.md_io_channel);
455 		spdk_io_device_unregister(&fs->md_target, NULL);
456 		spdk_put_io_channel(fs->sync_target.sync_io_channel);
457 		spdk_io_device_unregister(&fs->sync_target, NULL);
458 		spdk_io_device_unregister(&fs->io_target, NULL);
459 		free(fs);
460 		cb_fn(cb_arg, NULL, -ENOMEM);
461 		return;
462 	}
463 
464 	args = &req->args;
465 	args->fn.fs_op_with_handle = cb_fn;
466 	args->arg = cb_arg;
467 	args->fs = fs;
468 
469 	spdk_bs_opts_init(&opts);
470 	strncpy(opts.bstype.bstype, "BLOBFS", SPDK_BLOBSTORE_TYPE_LENGTH);
471 
472 	spdk_bs_init(dev, &opts, init_cb, req);
473 }
474 
475 static struct spdk_file *
476 file_alloc(struct spdk_filesystem *fs)
477 {
478 	struct spdk_file *file;
479 
480 	file = calloc(1, sizeof(*file));
481 	if (file == NULL) {
482 		return NULL;
483 	}
484 
485 	file->tree = calloc(1, sizeof(*file->tree));
486 	if (file->tree == NULL) {
487 		free(file);
488 		return NULL;
489 	}
490 
491 	file->fs = fs;
492 	TAILQ_INIT(&file->open_requests);
493 	TAILQ_INIT(&file->sync_requests);
494 	pthread_spin_init(&file->lock, 0);
495 	TAILQ_INSERT_TAIL(&fs->files, file, tailq);
496 	file->priority = SPDK_FILE_PRIORITY_LOW;
497 	return file;
498 }
499 
500 static void iter_delete_cb(void *ctx, int bserrno);
501 
502 static int
503 _handle_deleted_files(struct spdk_fs_request *req)
504 {
505 	struct spdk_fs_cb_args *args = &req->args;
506 	struct spdk_filesystem *fs = args->fs;
507 
508 	if (!TAILQ_EMPTY(&args->op.fs_load.deleted_files)) {
509 		struct spdk_deleted_file *deleted_file;
510 
511 		deleted_file = TAILQ_FIRST(&args->op.fs_load.deleted_files);
512 		TAILQ_REMOVE(&args->op.fs_load.deleted_files, deleted_file, tailq);
513 		spdk_bs_md_delete_blob(fs->bs, deleted_file->id, iter_delete_cb, req);
514 		free(deleted_file);
515 		return 0;
516 	}
517 
518 	return 1;
519 }
520 
521 static void
522 iter_delete_cb(void *ctx, int bserrno)
523 {
524 	struct spdk_fs_request *req = ctx;
525 	struct spdk_fs_cb_args *args = &req->args;
526 	struct spdk_filesystem *fs = args->fs;
527 
528 	if (_handle_deleted_files(req) == 0)
529 		return;
530 
531 	args->fn.fs_op_with_handle(args->arg, fs, 0);
532 	free_fs_request(req);
533 
534 }
535 
536 static void
537 iter_cb(void *ctx, struct spdk_blob *blob, int rc)
538 {
539 	struct spdk_fs_request *req = ctx;
540 	struct spdk_fs_cb_args *args = &req->args;
541 	struct spdk_filesystem *fs = args->fs;
542 	uint64_t *length;
543 	const char *name;
544 	uint32_t *is_deleted;
545 	size_t value_len;
546 
547 	if (rc == -ENOENT) {
548 		/* Finished iterating */
549 		if (_handle_deleted_files(req) == 0)
550 			return;
551 		args->fn.fs_op_with_handle(args->arg, fs, 0);
552 		free_fs_request(req);
553 		return;
554 	} else if (rc < 0) {
555 		args->fn.fs_op_with_handle(args->arg, fs, rc);
556 		free_fs_request(req);
557 		return;
558 	}
559 
560 	rc = spdk_bs_md_get_xattr_value(blob, "name", (const void **)&name, &value_len);
561 	if (rc < 0) {
562 		args->fn.fs_op_with_handle(args->arg, fs, rc);
563 		free_fs_request(req);
564 		return;
565 	}
566 
567 	rc = spdk_bs_md_get_xattr_value(blob, "length", (const void **)&length, &value_len);
568 	if (rc < 0) {
569 		args->fn.fs_op_with_handle(args->arg, fs, rc);
570 		free_fs_request(req);
571 		return;
572 	}
573 
574 	assert(value_len == 8);
575 
576 	/* This file could be deleted last time without close it, then app crashed, so we delete it now */
577 	rc = spdk_bs_md_get_xattr_value(blob, "is_deleted", (const void **)&is_deleted, &value_len);
578 	if (rc < 0) {
579 		struct spdk_file *f;
580 
581 		f = file_alloc(fs);
582 		if (f == NULL) {
583 			args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM);
584 			free_fs_request(req);
585 			return;
586 		}
587 
588 		f->name = strdup(name);
589 		f->blobid = spdk_blob_get_id(blob);
590 		f->length = *length;
591 		f->length_flushed = *length;
592 		f->append_pos = *length;
593 		SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "added file %s length=%ju\n", f->name, f->length);
594 	} else {
595 		struct spdk_deleted_file *deleted_file;
596 
597 		deleted_file = calloc(1, sizeof(*deleted_file));
598 		if (deleted_file == NULL) {
599 			args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM);
600 			free_fs_request(req);
601 			return;
602 		}
603 		deleted_file->id = spdk_blob_get_id(blob);
604 		TAILQ_INSERT_TAIL(&args->op.fs_load.deleted_files, deleted_file, tailq);
605 	}
606 
607 	spdk_bs_md_iter_next(fs->bs, &blob, iter_cb, req);
608 }
609 
610 static void
611 load_cb(void *ctx, struct spdk_blob_store *bs, int bserrno)
612 {
613 	struct spdk_fs_request *req = ctx;
614 	struct spdk_fs_cb_args *args = &req->args;
615 	struct spdk_filesystem *fs = args->fs;
616 	struct spdk_bs_type bstype;
617 	static const char blobfs_type[SPDK_BLOBSTORE_TYPE_LENGTH] = {"BLOBFS"};
618 	static const char zeros[SPDK_BLOBSTORE_TYPE_LENGTH];
619 
620 	if (bserrno != 0) {
621 		args->fn.fs_op_with_handle(args->arg, NULL, bserrno);
622 		free_fs_request(req);
623 		free(fs);
624 		return;
625 	}
626 
627 	bstype = spdk_bs_get_bstype(bs);
628 
629 	if (!memcmp(&bstype, zeros, SPDK_BLOBSTORE_TYPE_LENGTH)) {
630 		SPDK_DEBUGLOG(SPDK_TRACE_BLOB, "assigning bstype");
631 		snprintf(bstype.bstype, SPDK_BLOBSTORE_TYPE_LENGTH, blobfs_type);
632 		spdk_bs_set_bstype(bs, bstype);
633 	} else if (strncmp(bstype.bstype, blobfs_type, SPDK_BLOBSTORE_TYPE_LENGTH)) {
634 		SPDK_DEBUGLOG(SPDK_TRACE_BLOB, "not blobfs: %s", bstype.bstype);
635 		args->fn.fs_op_with_handle(args->arg, NULL, bserrno);
636 		free_fs_request(req);
637 		free(fs);
638 		return;
639 	}
640 
641 	common_fs_bs_init(fs, bs);
642 	spdk_bs_md_iter_first(fs->bs, iter_cb, req);
643 }
644 
645 void
646 spdk_fs_load(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn,
647 	     spdk_fs_op_with_handle_complete cb_fn, void *cb_arg)
648 {
649 	struct spdk_filesystem *fs;
650 	struct spdk_fs_cb_args *args;
651 	struct spdk_fs_request *req;
652 	struct spdk_bs_opts opts = {};
653 
654 	fs = fs_alloc(dev, send_request_fn);
655 	if (fs == NULL) {
656 		cb_fn(cb_arg, NULL, -ENOMEM);
657 		return;
658 	}
659 
660 	req = alloc_fs_request(fs->md_target.md_fs_channel);
661 	if (req == NULL) {
662 		spdk_put_io_channel(fs->md_target.md_io_channel);
663 		spdk_io_device_unregister(&fs->md_target, NULL);
664 		spdk_put_io_channel(fs->sync_target.sync_io_channel);
665 		spdk_io_device_unregister(&fs->sync_target, NULL);
666 		spdk_io_device_unregister(&fs->io_target, NULL);
667 		free(fs);
668 		cb_fn(cb_arg, NULL, -ENOMEM);
669 		return;
670 	}
671 
672 	args = &req->args;
673 	args->fn.fs_op_with_handle = cb_fn;
674 	args->arg = cb_arg;
675 	args->fs = fs;
676 	TAILQ_INIT(&args->op.fs_load.deleted_files);
677 
678 	spdk_bs_opts_init(&opts);
679 
680 	spdk_bs_load(dev, &opts, load_cb, req);
681 }
682 
683 static void
684 unload_cb(void *ctx, int bserrno)
685 {
686 	struct spdk_fs_request *req = ctx;
687 	struct spdk_fs_cb_args *args = &req->args;
688 	struct spdk_filesystem *fs = args->fs;
689 
690 	pthread_mutex_lock(&g_cache_init_lock);
691 	g_fs_count--;
692 	if (g_fs_count == 0) {
693 		__free_cache();
694 	}
695 	pthread_mutex_unlock(&g_cache_init_lock);
696 
697 	args->fn.fs_op(args->arg, bserrno);
698 	free(req);
699 
700 	spdk_io_device_unregister(&fs->io_target, NULL);
701 	spdk_io_device_unregister(&fs->sync_target, NULL);
702 	spdk_io_device_unregister(&fs->md_target, NULL);
703 
704 	free(fs);
705 }
706 
707 void
708 spdk_fs_unload(struct spdk_filesystem *fs, spdk_fs_op_complete cb_fn, void *cb_arg)
709 {
710 	struct spdk_fs_request *req;
711 	struct spdk_fs_cb_args *args;
712 
713 	/*
714 	 * We must free the md_channel before unloading the blobstore, so just
715 	 *  allocate this request from the general heap.
716 	 */
717 	req = calloc(1, sizeof(*req));
718 	if (req == NULL) {
719 		cb_fn(cb_arg, -ENOMEM);
720 		return;
721 	}
722 
723 	args = &req->args;
724 	args->fn.fs_op = cb_fn;
725 	args->arg = cb_arg;
726 	args->fs = fs;
727 
728 	spdk_fs_free_io_channel(fs->md_target.md_io_channel);
729 	spdk_fs_free_io_channel(fs->sync_target.sync_io_channel);
730 	spdk_bs_unload(fs->bs, unload_cb, req);
731 }
732 
733 static struct spdk_file *
734 fs_find_file(struct spdk_filesystem *fs, const char *name)
735 {
736 	struct spdk_file *file;
737 
738 	TAILQ_FOREACH(file, &fs->files, tailq) {
739 		if (!strncmp(name, file->name, SPDK_FILE_NAME_MAX)) {
740 			return file;
741 		}
742 	}
743 
744 	return NULL;
745 }
746 
747 void
748 spdk_fs_file_stat_async(struct spdk_filesystem *fs, const char *name,
749 			spdk_file_stat_op_complete cb_fn, void *cb_arg)
750 {
751 	struct spdk_file_stat stat;
752 	struct spdk_file *f = NULL;
753 
754 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
755 		cb_fn(cb_arg, NULL, -ENAMETOOLONG);
756 		return;
757 	}
758 
759 	f = fs_find_file(fs, name);
760 	if (f != NULL) {
761 		stat.blobid = f->blobid;
762 		stat.size = f->length;
763 		cb_fn(cb_arg, &stat, 0);
764 		return;
765 	}
766 
767 	cb_fn(cb_arg, NULL, -ENOENT);
768 }
769 
770 static void
771 __copy_stat(void *arg, struct spdk_file_stat *stat, int fserrno)
772 {
773 	struct spdk_fs_request *req = arg;
774 	struct spdk_fs_cb_args *args = &req->args;
775 
776 	args->rc = fserrno;
777 	if (fserrno == 0) {
778 		memcpy(args->arg, stat, sizeof(*stat));
779 	}
780 	sem_post(args->sem);
781 }
782 
783 static void
784 __file_stat(void *arg)
785 {
786 	struct spdk_fs_request *req = arg;
787 	struct spdk_fs_cb_args *args = &req->args;
788 
789 	spdk_fs_file_stat_async(args->fs, args->op.stat.name,
790 				args->fn.stat_op, req);
791 }
792 
793 int
794 spdk_fs_file_stat(struct spdk_filesystem *fs, struct spdk_io_channel *_channel,
795 		  const char *name, struct spdk_file_stat *stat)
796 {
797 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
798 	struct spdk_fs_request *req;
799 	int rc;
800 
801 	req = alloc_fs_request(channel);
802 	assert(req != NULL);
803 
804 	req->args.fs = fs;
805 	req->args.op.stat.name = name;
806 	req->args.fn.stat_op = __copy_stat;
807 	req->args.arg = stat;
808 	req->args.sem = &channel->sem;
809 	channel->send_request(__file_stat, req);
810 	sem_wait(&channel->sem);
811 
812 	rc = req->args.rc;
813 	free_fs_request(req);
814 
815 	return rc;
816 }
817 
818 static void
819 fs_create_blob_close_cb(void *ctx, int bserrno)
820 {
821 	struct spdk_fs_request *req = ctx;
822 	struct spdk_fs_cb_args *args = &req->args;
823 
824 	args->fn.file_op(args->arg, bserrno);
825 	free_fs_request(req);
826 }
827 
828 static void
829 fs_create_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno)
830 {
831 	struct spdk_fs_request *req = ctx;
832 	struct spdk_fs_cb_args *args = &req->args;
833 	struct spdk_file *f = args->file;
834 	uint64_t length = 0;
835 
836 	f->blob = blob;
837 	spdk_bs_md_resize_blob(blob, 1);
838 	spdk_blob_md_set_xattr(blob, "name", f->name, strlen(f->name) + 1);
839 	spdk_blob_md_set_xattr(blob, "length", &length, sizeof(length));
840 
841 	spdk_bs_md_close_blob(&f->blob, fs_create_blob_close_cb, args);
842 }
843 
844 static void
845 fs_create_blob_create_cb(void *ctx, spdk_blob_id blobid, int bserrno)
846 {
847 	struct spdk_fs_request *req = ctx;
848 	struct spdk_fs_cb_args *args = &req->args;
849 	struct spdk_file *f = args->file;
850 
851 	f->blobid = blobid;
852 	spdk_bs_md_open_blob(f->fs->bs, blobid, fs_create_blob_open_cb, req);
853 }
854 
855 void
856 spdk_fs_create_file_async(struct spdk_filesystem *fs, const char *name,
857 			  spdk_file_op_complete cb_fn, void *cb_arg)
858 {
859 	struct spdk_file *file;
860 	struct spdk_fs_request *req;
861 	struct spdk_fs_cb_args *args;
862 
863 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
864 		cb_fn(cb_arg, -ENAMETOOLONG);
865 		return;
866 	}
867 
868 	file = fs_find_file(fs, name);
869 	if (file != NULL) {
870 		cb_fn(cb_arg, -EEXIST);
871 		return;
872 	}
873 
874 	file = file_alloc(fs);
875 	if (file == NULL) {
876 		cb_fn(cb_arg, -ENOMEM);
877 		return;
878 	}
879 
880 	req = alloc_fs_request(fs->md_target.md_fs_channel);
881 	if (req == NULL) {
882 		cb_fn(cb_arg, -ENOMEM);
883 		return;
884 	}
885 
886 	args = &req->args;
887 	args->file = file;
888 	args->fn.file_op = cb_fn;
889 	args->arg = cb_arg;
890 
891 	file->name = strdup(name);
892 	spdk_bs_md_create_blob(fs->bs, fs_create_blob_create_cb, args);
893 }
894 
895 static void
896 __fs_create_file_done(void *arg, int fserrno)
897 {
898 	struct spdk_fs_request *req = arg;
899 	struct spdk_fs_cb_args *args = &req->args;
900 
901 	args->rc = fserrno;
902 	sem_post(args->sem);
903 	SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "file=%s\n", args->op.create.name);
904 }
905 
906 static void
907 __fs_create_file(void *arg)
908 {
909 	struct spdk_fs_request *req = arg;
910 	struct spdk_fs_cb_args *args = &req->args;
911 
912 	SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "file=%s\n", args->op.create.name);
913 	spdk_fs_create_file_async(args->fs, args->op.create.name, __fs_create_file_done, req);
914 }
915 
916 int
917 spdk_fs_create_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, const char *name)
918 {
919 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
920 	struct spdk_fs_request *req;
921 	struct spdk_fs_cb_args *args;
922 	int rc;
923 
924 	SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "file=%s\n", name);
925 
926 	req = alloc_fs_request(channel);
927 	assert(req != NULL);
928 
929 	args = &req->args;
930 	args->fs = fs;
931 	args->op.create.name = name;
932 	args->sem = &channel->sem;
933 	fs->send_request(__fs_create_file, req);
934 	sem_wait(&channel->sem);
935 	rc = args->rc;
936 	free_fs_request(req);
937 
938 	return rc;
939 }
940 
941 static void
942 fs_open_blob_done(void *ctx, struct spdk_blob *blob, int bserrno)
943 {
944 	struct spdk_fs_request *req = ctx;
945 	struct spdk_fs_cb_args *args = &req->args;
946 	struct spdk_file *f = args->file;
947 
948 	f->blob = blob;
949 	while (!TAILQ_EMPTY(&f->open_requests)) {
950 		req = TAILQ_FIRST(&f->open_requests);
951 		args = &req->args;
952 		TAILQ_REMOVE(&f->open_requests, req, args.op.open.tailq);
953 		args->fn.file_op_with_handle(args->arg, f, bserrno);
954 		free_fs_request(req);
955 	}
956 }
957 
958 static void
959 fs_open_blob_create_cb(void *ctx, int bserrno)
960 {
961 	struct spdk_fs_request *req = ctx;
962 	struct spdk_fs_cb_args *args = &req->args;
963 	struct spdk_file *file = args->file;
964 	struct spdk_filesystem *fs = args->fs;
965 
966 	if (file == NULL) {
967 		/*
968 		 * This is from an open with CREATE flag - the file
969 		 *  is now created so look it up in the file list for this
970 		 *  filesystem.
971 		 */
972 		file = fs_find_file(fs, args->op.open.name);
973 		assert(file != NULL);
974 		args->file = file;
975 	}
976 
977 	file->ref_count++;
978 	TAILQ_INSERT_TAIL(&file->open_requests, req, args.op.open.tailq);
979 	if (file->ref_count == 1) {
980 		assert(file->blob == NULL);
981 		spdk_bs_md_open_blob(fs->bs, file->blobid, fs_open_blob_done, req);
982 	} else if (file->blob != NULL) {
983 		fs_open_blob_done(req, file->blob, 0);
984 	} else {
985 		/*
986 		 * The blob open for this file is in progress due to a previous
987 		 *  open request.  When that open completes, it will invoke the
988 		 *  open callback for this request.
989 		 */
990 	}
991 }
992 
993 void
994 spdk_fs_open_file_async(struct spdk_filesystem *fs, const char *name, uint32_t flags,
995 			spdk_file_op_with_handle_complete cb_fn, void *cb_arg)
996 {
997 	struct spdk_file *f = NULL;
998 	struct spdk_fs_request *req;
999 	struct spdk_fs_cb_args *args;
1000 
1001 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
1002 		cb_fn(cb_arg, NULL, -ENAMETOOLONG);
1003 		return;
1004 	}
1005 
1006 	f = fs_find_file(fs, name);
1007 	if (f == NULL && !(flags & SPDK_BLOBFS_OPEN_CREATE)) {
1008 		cb_fn(cb_arg, NULL, -ENOENT);
1009 		return;
1010 	}
1011 
1012 	if (f != NULL && f->is_deleted == true) {
1013 		cb_fn(cb_arg, NULL, -ENOENT);
1014 		return;
1015 	}
1016 
1017 	req = alloc_fs_request(fs->md_target.md_fs_channel);
1018 	if (req == NULL) {
1019 		cb_fn(cb_arg, NULL, -ENOMEM);
1020 		return;
1021 	}
1022 
1023 	args = &req->args;
1024 	args->fn.file_op_with_handle = cb_fn;
1025 	args->arg = cb_arg;
1026 	args->file = f;
1027 	args->fs = fs;
1028 	args->op.open.name = name;
1029 
1030 	if (f == NULL) {
1031 		spdk_fs_create_file_async(fs, name, fs_open_blob_create_cb, req);
1032 	} else {
1033 		fs_open_blob_create_cb(req, 0);
1034 	}
1035 }
1036 
1037 static void
1038 __fs_open_file_done(void *arg, struct spdk_file *file, int bserrno)
1039 {
1040 	struct spdk_fs_request *req = arg;
1041 	struct spdk_fs_cb_args *args = &req->args;
1042 
1043 	args->file = file;
1044 	args->rc = bserrno;
1045 	sem_post(args->sem);
1046 	SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "file=%s\n", args->op.open.name);
1047 }
1048 
1049 static void
1050 __fs_open_file(void *arg)
1051 {
1052 	struct spdk_fs_request *req = arg;
1053 	struct spdk_fs_cb_args *args = &req->args;
1054 
1055 	SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "file=%s\n", args->op.open.name);
1056 	spdk_fs_open_file_async(args->fs, args->op.open.name, args->op.open.flags,
1057 				__fs_open_file_done, req);
1058 }
1059 
1060 int
1061 spdk_fs_open_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel,
1062 		  const char *name, uint32_t flags, struct spdk_file **file)
1063 {
1064 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
1065 	struct spdk_fs_request *req;
1066 	struct spdk_fs_cb_args *args;
1067 	int rc;
1068 
1069 	SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "file=%s\n", name);
1070 
1071 	req = alloc_fs_request(channel);
1072 	assert(req != NULL);
1073 
1074 	args = &req->args;
1075 	args->fs = fs;
1076 	args->op.open.name = name;
1077 	args->op.open.flags = flags;
1078 	args->sem = &channel->sem;
1079 	fs->send_request(__fs_open_file, req);
1080 	sem_wait(&channel->sem);
1081 	rc = args->rc;
1082 	if (rc == 0) {
1083 		*file = args->file;
1084 	} else {
1085 		*file = NULL;
1086 	}
1087 	free_fs_request(req);
1088 
1089 	return rc;
1090 }
1091 
1092 static void
1093 fs_rename_blob_close_cb(void *ctx, int bserrno)
1094 {
1095 	struct spdk_fs_request *req = ctx;
1096 	struct spdk_fs_cb_args *args = &req->args;
1097 
1098 	args->fn.fs_op(args->arg, bserrno);
1099 	free_fs_request(req);
1100 }
1101 
1102 static void
1103 fs_rename_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno)
1104 {
1105 	struct spdk_fs_request *req = ctx;
1106 	struct spdk_fs_cb_args *args = &req->args;
1107 	struct spdk_file *f = args->file;
1108 	const char *new_name = args->op.rename.new_name;
1109 
1110 	f->blob = blob;
1111 	spdk_blob_md_set_xattr(blob, "name", new_name, strlen(new_name) + 1);
1112 	spdk_bs_md_close_blob(&f->blob, fs_rename_blob_close_cb, req);
1113 }
1114 
1115 static void
1116 __spdk_fs_md_rename_file(struct spdk_fs_request *req)
1117 {
1118 	struct spdk_fs_cb_args *args = &req->args;
1119 	struct spdk_file *f;
1120 
1121 	f = fs_find_file(args->fs, args->op.rename.old_name);
1122 	if (f == NULL) {
1123 		args->fn.fs_op(args->arg, -ENOENT);
1124 		free_fs_request(req);
1125 		return;
1126 	}
1127 
1128 	free(f->name);
1129 	f->name = strdup(args->op.rename.new_name);
1130 	args->file = f;
1131 	spdk_bs_md_open_blob(args->fs->bs, f->blobid, fs_rename_blob_open_cb, req);
1132 }
1133 
1134 static void
1135 fs_rename_delete_done(void *arg, int fserrno)
1136 {
1137 	__spdk_fs_md_rename_file(arg);
1138 }
1139 
1140 void
1141 spdk_fs_rename_file_async(struct spdk_filesystem *fs,
1142 			  const char *old_name, const char *new_name,
1143 			  spdk_file_op_complete cb_fn, void *cb_arg)
1144 {
1145 	struct spdk_file *f;
1146 	struct spdk_fs_request *req;
1147 	struct spdk_fs_cb_args *args;
1148 
1149 	SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "old=%s new=%s\n", old_name, new_name);
1150 	if (strnlen(new_name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
1151 		cb_fn(cb_arg, -ENAMETOOLONG);
1152 		return;
1153 	}
1154 
1155 	req = alloc_fs_request(fs->md_target.md_fs_channel);
1156 	if (req == NULL) {
1157 		cb_fn(cb_arg, -ENOMEM);
1158 		return;
1159 	}
1160 
1161 	args = &req->args;
1162 	args->fn.fs_op = cb_fn;
1163 	args->fs = fs;
1164 	args->arg = cb_arg;
1165 	args->op.rename.old_name = old_name;
1166 	args->op.rename.new_name = new_name;
1167 
1168 	f = fs_find_file(fs, new_name);
1169 	if (f == NULL) {
1170 		__spdk_fs_md_rename_file(req);
1171 		return;
1172 	}
1173 
1174 	/*
1175 	 * The rename overwrites an existing file.  So delete the existing file, then
1176 	 *  do the actual rename.
1177 	 */
1178 	spdk_fs_delete_file_async(fs, new_name, fs_rename_delete_done, req);
1179 }
1180 
1181 static void
1182 __fs_rename_file_done(void *arg, int fserrno)
1183 {
1184 	struct spdk_fs_request *req = arg;
1185 	struct spdk_fs_cb_args *args = &req->args;
1186 
1187 	args->rc = fserrno;
1188 	sem_post(args->sem);
1189 }
1190 
1191 static void
1192 __fs_rename_file(void *arg)
1193 {
1194 	struct spdk_fs_request *req = arg;
1195 	struct spdk_fs_cb_args *args = &req->args;
1196 
1197 	spdk_fs_rename_file_async(args->fs, args->op.rename.old_name, args->op.rename.new_name,
1198 				  __fs_rename_file_done, req);
1199 }
1200 
1201 int
1202 spdk_fs_rename_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel,
1203 		    const char *old_name, const char *new_name)
1204 {
1205 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
1206 	struct spdk_fs_request *req;
1207 	struct spdk_fs_cb_args *args;
1208 	int rc;
1209 
1210 	req = alloc_fs_request(channel);
1211 	assert(req != NULL);
1212 
1213 	args = &req->args;
1214 
1215 	args->fs = fs;
1216 	args->op.rename.old_name = old_name;
1217 	args->op.rename.new_name = new_name;
1218 	args->sem = &channel->sem;
1219 	fs->send_request(__fs_rename_file, req);
1220 	sem_wait(&channel->sem);
1221 	rc = args->rc;
1222 	free_fs_request(req);
1223 	return rc;
1224 }
1225 
1226 static void
1227 blob_delete_cb(void *ctx, int bserrno)
1228 {
1229 	struct spdk_fs_request *req = ctx;
1230 	struct spdk_fs_cb_args *args = &req->args;
1231 
1232 	args->fn.file_op(args->arg, bserrno);
1233 	free_fs_request(req);
1234 }
1235 
1236 void
1237 spdk_fs_delete_file_async(struct spdk_filesystem *fs, const char *name,
1238 			  spdk_file_op_complete cb_fn, void *cb_arg)
1239 {
1240 	struct spdk_file *f;
1241 	spdk_blob_id blobid;
1242 	struct spdk_fs_request *req;
1243 	struct spdk_fs_cb_args *args;
1244 
1245 	SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "file=%s\n", name);
1246 
1247 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
1248 		cb_fn(cb_arg, -ENAMETOOLONG);
1249 		return;
1250 	}
1251 
1252 	f = fs_find_file(fs, name);
1253 	if (f == NULL) {
1254 		cb_fn(cb_arg, -ENOENT);
1255 		return;
1256 	}
1257 
1258 	req = alloc_fs_request(fs->md_target.md_fs_channel);
1259 	if (req == NULL) {
1260 		cb_fn(cb_arg, -ENOMEM);
1261 		return;
1262 	}
1263 
1264 	args = &req->args;
1265 	args->fn.file_op = cb_fn;
1266 	args->arg = cb_arg;
1267 
1268 	if (f->ref_count > 0) {
1269 		/* If the ref > 0, we mark the file as deleted and delete it when we close it. */
1270 		f->is_deleted = true;
1271 		spdk_blob_md_set_xattr(f->blob, "is_deleted", &f->is_deleted, sizeof(bool));
1272 		spdk_bs_md_sync_blob(f->blob, blob_delete_cb, args);
1273 		return;
1274 	}
1275 
1276 	TAILQ_REMOVE(&fs->files, f, tailq);
1277 
1278 	cache_free_buffers(f);
1279 
1280 	blobid = f->blobid;
1281 
1282 	free(f->name);
1283 	free(f->tree);
1284 	free(f);
1285 
1286 	spdk_bs_md_delete_blob(fs->bs, blobid, blob_delete_cb, req);
1287 }
1288 
1289 static void
1290 __fs_delete_file_done(void *arg, int fserrno)
1291 {
1292 	struct spdk_fs_request *req = arg;
1293 	struct spdk_fs_cb_args *args = &req->args;
1294 
1295 	args->rc = fserrno;
1296 	sem_post(args->sem);
1297 }
1298 
1299 static void
1300 __fs_delete_file(void *arg)
1301 {
1302 	struct spdk_fs_request *req = arg;
1303 	struct spdk_fs_cb_args *args = &req->args;
1304 
1305 	spdk_fs_delete_file_async(args->fs, args->op.delete.name, __fs_delete_file_done, req);
1306 }
1307 
1308 int
1309 spdk_fs_delete_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel,
1310 		    const char *name)
1311 {
1312 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
1313 	struct spdk_fs_request *req;
1314 	struct spdk_fs_cb_args *args;
1315 	int rc;
1316 
1317 	req = alloc_fs_request(channel);
1318 	assert(req != NULL);
1319 
1320 	args = &req->args;
1321 	args->fs = fs;
1322 	args->op.delete.name = name;
1323 	args->sem = &channel->sem;
1324 	fs->send_request(__fs_delete_file, req);
1325 	sem_wait(&channel->sem);
1326 	rc = args->rc;
1327 	free_fs_request(req);
1328 
1329 	return rc;
1330 }
1331 
1332 spdk_fs_iter
1333 spdk_fs_iter_first(struct spdk_filesystem *fs)
1334 {
1335 	struct spdk_file *f;
1336 
1337 	f = TAILQ_FIRST(&fs->files);
1338 	return f;
1339 }
1340 
1341 spdk_fs_iter
1342 spdk_fs_iter_next(spdk_fs_iter iter)
1343 {
1344 	struct spdk_file *f = iter;
1345 
1346 	if (f == NULL) {
1347 		return NULL;
1348 	}
1349 
1350 	f = TAILQ_NEXT(f, tailq);
1351 	return f;
1352 }
1353 
1354 const char *
1355 spdk_file_get_name(struct spdk_file *file)
1356 {
1357 	return file->name;
1358 }
1359 
1360 uint64_t
1361 spdk_file_get_length(struct spdk_file *file)
1362 {
1363 	assert(file != NULL);
1364 	SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "file=%s length=0x%jx\n", file->name, file->length);
1365 	return file->length;
1366 }
1367 
1368 static void
1369 fs_truncate_complete_cb(void *ctx, int bserrno)
1370 {
1371 	struct spdk_fs_request *req = ctx;
1372 	struct spdk_fs_cb_args *args = &req->args;
1373 
1374 	args->fn.file_op(args->arg, bserrno);
1375 	free_fs_request(req);
1376 }
1377 
/*
 * Number of clusters of cluster_sz bytes needed to hold length bytes, i.e.
 *  length / cluster_sz rounded up.  cluster_sz must be non-zero.
 *
 * Computed as quotient plus a remainder check so that lengths close to
 *  UINT64_MAX do not wrap, which "length + cluster_sz - 1" would.
 */
static uint64_t
__bytes_to_clusters(uint64_t length, uint64_t cluster_sz)
{
	uint64_t whole = length / cluster_sz;

	if (length % cluster_sz != 0) {
		whole++;
	}

	return whole;
}
1383 
1384 void
1385 spdk_file_truncate_async(struct spdk_file *file, uint64_t length,
1386 			 spdk_file_op_complete cb_fn, void *cb_arg)
1387 {
1388 	struct spdk_filesystem *fs;
1389 	size_t num_clusters;
1390 	struct spdk_fs_request *req;
1391 	struct spdk_fs_cb_args *args;
1392 
1393 	SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "file=%s old=0x%jx new=0x%jx\n", file->name, file->length, length);
1394 	if (length == file->length) {
1395 		cb_fn(cb_arg, 0);
1396 		return;
1397 	}
1398 
1399 	req = alloc_fs_request(file->fs->md_target.md_fs_channel);
1400 	if (req == NULL) {
1401 		cb_fn(cb_arg, -ENOMEM);
1402 		return;
1403 	}
1404 
1405 	args = &req->args;
1406 	args->fn.file_op = cb_fn;
1407 	args->arg = cb_arg;
1408 	args->file = file;
1409 	fs = file->fs;
1410 
1411 	num_clusters = __bytes_to_clusters(length, fs->bs_opts.cluster_sz);
1412 
1413 	spdk_bs_md_resize_blob(file->blob, num_clusters);
1414 	spdk_blob_md_set_xattr(file->blob, "length", &length, sizeof(length));
1415 
1416 	file->length = length;
1417 	if (file->append_pos > file->length) {
1418 		file->append_pos = file->length;
1419 	}
1420 
1421 	spdk_bs_md_sync_blob(file->blob, fs_truncate_complete_cb, args);
1422 }
1423 
1424 static void
1425 __truncate(void *arg)
1426 {
1427 	struct spdk_fs_request *req = arg;
1428 	struct spdk_fs_cb_args *args = &req->args;
1429 
1430 	spdk_file_truncate_async(args->file, args->op.truncate.length,
1431 				 args->fn.file_op, args->arg);
1432 }
1433 
1434 void
1435 spdk_file_truncate(struct spdk_file *file, struct spdk_io_channel *_channel,
1436 		   uint64_t length)
1437 {
1438 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
1439 	struct spdk_fs_request *req;
1440 	struct spdk_fs_cb_args *args;
1441 
1442 	req = alloc_fs_request(channel);
1443 	assert(req != NULL);
1444 
1445 	args = &req->args;
1446 
1447 	args->file = file;
1448 	args->op.truncate.length = length;
1449 	args->fn.file_op = __sem_post;
1450 	args->arg = &channel->sem;
1451 
1452 	channel->send_request(__truncate, req);
1453 	sem_wait(&channel->sem);
1454 	free_fs_request(req);
1455 }
1456 
1457 static void
1458 __rw_done(void *ctx, int bserrno)
1459 {
1460 	struct spdk_fs_request *req = ctx;
1461 	struct spdk_fs_cb_args *args = &req->args;
1462 
1463 	spdk_dma_free(args->op.rw.pin_buf);
1464 	args->fn.file_op(args->arg, bserrno);
1465 	free_fs_request(req);
1466 }
1467 
1468 static void
1469 __read_done(void *ctx, int bserrno)
1470 {
1471 	struct spdk_fs_request *req = ctx;
1472 	struct spdk_fs_cb_args *args = &req->args;
1473 
1474 	if (args->op.rw.is_read) {
1475 		memcpy(args->op.rw.user_buf,
1476 		       args->op.rw.pin_buf + (args->op.rw.offset & 0xFFF),
1477 		       args->op.rw.length);
1478 		__rw_done(req, 0);
1479 	} else {
1480 		memcpy(args->op.rw.pin_buf + (args->op.rw.offset & 0xFFF),
1481 		       args->op.rw.user_buf,
1482 		       args->op.rw.length);
1483 		spdk_bs_io_write_blob(args->file->blob, args->op.rw.channel,
1484 				      args->op.rw.pin_buf,
1485 				      args->op.rw.start_page, args->op.rw.num_pages,
1486 				      __rw_done, req);
1487 	}
1488 }
1489 
1490 static void
1491 __do_blob_read(void *ctx, int fserrno)
1492 {
1493 	struct spdk_fs_request *req = ctx;
1494 	struct spdk_fs_cb_args *args = &req->args;
1495 
1496 	spdk_bs_io_read_blob(args->file->blob, args->op.rw.channel,
1497 			     args->op.rw.pin_buf,
1498 			     args->op.rw.start_page, args->op.rw.num_pages,
1499 			     __read_done, req);
1500 }
1501 
1502 static void
1503 __get_page_parameters(struct spdk_file *file, uint64_t offset, uint64_t length,
1504 		      uint64_t *start_page, uint32_t *page_size, uint64_t *num_pages)
1505 {
1506 	uint64_t end_page;
1507 
1508 	*page_size = spdk_bs_get_page_size(file->fs->bs);
1509 	*start_page = offset / *page_size;
1510 	end_page = (offset + length - 1) / *page_size;
1511 	*num_pages = (end_page - *start_page + 1);
1512 }
1513 
1514 static void
1515 __readwrite(struct spdk_file *file, struct spdk_io_channel *_channel,
1516 	    void *payload, uint64_t offset, uint64_t length,
1517 	    spdk_file_op_complete cb_fn, void *cb_arg, int is_read)
1518 {
1519 	struct spdk_fs_request *req;
1520 	struct spdk_fs_cb_args *args;
1521 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
1522 	uint64_t start_page, num_pages, pin_buf_length;
1523 	uint32_t page_size;
1524 
1525 	if (is_read && offset + length > file->length) {
1526 		cb_fn(cb_arg, -EINVAL);
1527 		return;
1528 	}
1529 
1530 	req = alloc_fs_request(channel);
1531 	if (req == NULL) {
1532 		cb_fn(cb_arg, -ENOMEM);
1533 		return;
1534 	}
1535 
1536 	args = &req->args;
1537 	args->fn.file_op = cb_fn;
1538 	args->arg = cb_arg;
1539 	args->file = file;
1540 	args->op.rw.channel = channel->bs_channel;
1541 	args->op.rw.user_buf = payload;
1542 	args->op.rw.is_read = is_read;
1543 	args->op.rw.offset = offset;
1544 	args->op.rw.length = length;
1545 
1546 	__get_page_parameters(file, offset, length, &start_page, &page_size, &num_pages);
1547 	pin_buf_length = num_pages * page_size;
1548 	args->op.rw.pin_buf = spdk_dma_malloc(pin_buf_length, 4096, NULL);
1549 
1550 	args->op.rw.start_page = start_page;
1551 	args->op.rw.num_pages = num_pages;
1552 
1553 	if (!is_read && file->length < offset + length) {
1554 		spdk_file_truncate_async(file, offset + length, __do_blob_read, req);
1555 	} else {
1556 		__do_blob_read(req, 0);
1557 	}
1558 }
1559 
1560 void
1561 spdk_file_write_async(struct spdk_file *file, struct spdk_io_channel *channel,
1562 		      void *payload, uint64_t offset, uint64_t length,
1563 		      spdk_file_op_complete cb_fn, void *cb_arg)
1564 {
1565 	__readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 0);
1566 }
1567 
1568 void
1569 spdk_file_read_async(struct spdk_file *file, struct spdk_io_channel *channel,
1570 		     void *payload, uint64_t offset, uint64_t length,
1571 		     spdk_file_op_complete cb_fn, void *cb_arg)
1572 {
1573 	SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "file=%s offset=%jx length=%jx\n",
1574 		      file->name, offset, length);
1575 	__readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 1);
1576 }
1577 
1578 struct spdk_io_channel *
1579 spdk_fs_alloc_io_channel(struct spdk_filesystem *fs)
1580 {
1581 	struct spdk_io_channel *io_channel;
1582 	struct spdk_fs_channel *fs_channel;
1583 
1584 	io_channel = spdk_get_io_channel(&fs->io_target);
1585 	fs_channel = spdk_io_channel_get_ctx(io_channel);
1586 	fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
1587 	fs_channel->send_request = __send_request_direct;
1588 
1589 	return io_channel;
1590 }
1591 
1592 struct spdk_io_channel *
1593 spdk_fs_alloc_io_channel_sync(struct spdk_filesystem *fs)
1594 {
1595 	struct spdk_io_channel *io_channel;
1596 	struct spdk_fs_channel *fs_channel;
1597 
1598 	io_channel = spdk_get_io_channel(&fs->io_target);
1599 	fs_channel = spdk_io_channel_get_ctx(io_channel);
1600 	fs_channel->send_request = fs->send_request;
1601 	fs_channel->sync = 1;
1602 	pthread_spin_init(&fs_channel->lock, 0);
1603 
1604 	return io_channel;
1605 }
1606 
/*
 * Release a channel obtained from one of the spdk_fs_alloc_io_channel*()
 *  functions by handing it to spdk_put_io_channel().
 */
void
spdk_fs_free_io_channel(struct spdk_io_channel *channel)
{
	struct spdk_io_channel *to_release = channel;

	spdk_put_io_channel(to_release);
}
1612 
1613 void
1614 spdk_fs_set_cache_size(uint64_t size_in_mb)
1615 {
1616 	g_fs_cache_size = size_in_mb * 1024 * 1024;
1617 }
1618 
1619 uint64_t
1620 spdk_fs_get_cache_size(void)
1621 {
1622 	return g_fs_cache_size / (1024 * 1024);
1623 }
1624 
1625 static void __file_flush(void *_args);
1626 
1627 static void *
1628 alloc_cache_memory_buffer(struct spdk_file *context)
1629 {
1630 	struct spdk_file *file;
1631 	void *buf;
1632 
1633 	buf = spdk_mempool_get(g_cache_pool);
1634 	if (buf != NULL) {
1635 		return buf;
1636 	}
1637 
1638 	pthread_spin_lock(&g_caches_lock);
1639 	TAILQ_FOREACH(file, &g_caches, cache_tailq) {
1640 		if (!file->open_for_writing &&
1641 		    file->priority == SPDK_FILE_PRIORITY_LOW &&
1642 		    file != context) {
1643 			break;
1644 		}
1645 	}
1646 	pthread_spin_unlock(&g_caches_lock);
1647 	if (file != NULL) {
1648 		cache_free_buffers(file);
1649 		buf = spdk_mempool_get(g_cache_pool);
1650 		if (buf != NULL) {
1651 			return buf;
1652 		}
1653 	}
1654 
1655 	pthread_spin_lock(&g_caches_lock);
1656 	TAILQ_FOREACH(file, &g_caches, cache_tailq) {
1657 		if (!file->open_for_writing && file != context) {
1658 			break;
1659 		}
1660 	}
1661 	pthread_spin_unlock(&g_caches_lock);
1662 	if (file != NULL) {
1663 		cache_free_buffers(file);
1664 		buf = spdk_mempool_get(g_cache_pool);
1665 		if (buf != NULL) {
1666 			return buf;
1667 		}
1668 	}
1669 
1670 	pthread_spin_lock(&g_caches_lock);
1671 	TAILQ_FOREACH(file, &g_caches, cache_tailq) {
1672 		if (file != context) {
1673 			break;
1674 		}
1675 	}
1676 	pthread_spin_unlock(&g_caches_lock);
1677 	if (file != NULL) {
1678 		cache_free_buffers(file);
1679 		buf = spdk_mempool_get(g_cache_pool);
1680 		if (buf != NULL) {
1681 			return buf;
1682 		}
1683 	}
1684 
1685 	return NULL;
1686 }
1687 
1688 static struct cache_buffer *
1689 cache_insert_buffer(struct spdk_file *file, uint64_t offset)
1690 {
1691 	struct cache_buffer *buf;
1692 	int count = 0;
1693 
1694 	buf = calloc(1, sizeof(*buf));
1695 	if (buf == NULL) {
1696 		SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "calloc failed\n");
1697 		return NULL;
1698 	}
1699 
1700 	buf->buf = alloc_cache_memory_buffer(file);
1701 	while (buf->buf == NULL) {
1702 		/*
1703 		 * TODO: alloc_cache_memory_buffer() should eventually free
1704 		 *  some buffers.  Need a more sophisticated check here, instead
1705 		 *  of just bailing if 100 tries does not result in getting a
1706 		 *  free buffer.  This will involve using the sync channel's
1707 		 *  semaphore to block until a buffer becomes available.
1708 		 */
1709 		if (count++ == 100) {
1710 			SPDK_ERRLOG("could not allocate cache buffer\n");
1711 			assert(false);
1712 			free(buf);
1713 			return NULL;
1714 		}
1715 		buf->buf = alloc_cache_memory_buffer(file);
1716 	}
1717 
1718 	buf->buf_size = CACHE_BUFFER_SIZE;
1719 	buf->offset = offset;
1720 
1721 	pthread_spin_lock(&g_caches_lock);
1722 	if (file->tree->present_mask == 0) {
1723 		TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq);
1724 	}
1725 	file->tree = spdk_tree_insert_buffer(file->tree, buf);
1726 	pthread_spin_unlock(&g_caches_lock);
1727 
1728 	return buf;
1729 }
1730 
1731 static struct cache_buffer *
1732 cache_append_buffer(struct spdk_file *file)
1733 {
1734 	struct cache_buffer *last;
1735 
1736 	assert(file->last == NULL || file->last->bytes_filled == file->last->buf_size);
1737 	assert((file->append_pos % CACHE_BUFFER_SIZE) == 0);
1738 
1739 	last = cache_insert_buffer(file, file->append_pos);
1740 	if (last == NULL) {
1741 		SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "cache_insert_buffer failed\n");
1742 		return NULL;
1743 	}
1744 
1745 	file->last = last;
1746 
1747 	return last;
1748 }
1749 
1750 static void
1751 __wake_caller(struct spdk_fs_cb_args *args)
1752 {
1753 	sem_post(args->sem);
1754 }
1755 
1756 static void __check_sync_reqs(struct spdk_file *file);
1757 
1758 static void
1759 __file_cache_finish_sync(struct spdk_file *file)
1760 {
1761 	struct spdk_fs_request *sync_req;
1762 	struct spdk_fs_cb_args *sync_args;
1763 
1764 	pthread_spin_lock(&file->lock);
1765 	sync_req = TAILQ_FIRST(&file->sync_requests);
1766 	sync_args = &sync_req->args;
1767 	assert(sync_args->op.sync.offset <= file->length_flushed);
1768 	BLOBFS_TRACE(file, "sync done offset=%jx\n", sync_args->op.sync.offset);
1769 	TAILQ_REMOVE(&file->sync_requests, sync_req, args.op.sync.tailq);
1770 	pthread_spin_unlock(&file->lock);
1771 
1772 	sync_args->fn.file_op(sync_args->arg, 0);
1773 	__check_sync_reqs(file);
1774 
1775 	pthread_spin_lock(&file->lock);
1776 	free_fs_request(sync_req);
1777 	pthread_spin_unlock(&file->lock);
1778 }
1779 
/*
 * Blobstore completion adapter: the context is the file; the status is not
 *  examined before finishing the sync.
 */
static void
__file_cache_finish_sync_bs_cb(void *ctx, int bserrno)
{
	struct spdk_file *synced_file = ctx;

	__file_cache_finish_sync(synced_file);
}
1787 
1788 static void
1789 __free_args(struct spdk_fs_cb_args *args)
1790 {
1791 	struct spdk_fs_request *req;
1792 
1793 	if (!args->from_request) {
1794 		free(args);
1795 	} else {
1796 		/* Depends on args being at the start of the spdk_fs_request structure. */
1797 		req = (struct spdk_fs_request *)args;
1798 		free_fs_request(req);
1799 	}
1800 }
1801 
1802 static void
1803 __check_sync_reqs(struct spdk_file *file)
1804 {
1805 	struct spdk_fs_request *sync_req;
1806 
1807 	pthread_spin_lock(&file->lock);
1808 
1809 	TAILQ_FOREACH(sync_req, &file->sync_requests, args.op.sync.tailq) {
1810 		if (sync_req->args.op.sync.offset <= file->length_flushed) {
1811 			break;
1812 		}
1813 	}
1814 
1815 	if (sync_req != NULL && !sync_req->args.op.sync.xattr_in_progress) {
1816 		BLOBFS_TRACE(file, "set xattr length 0x%jx\n", file->length_flushed);
1817 		sync_req->args.op.sync.xattr_in_progress = true;
1818 		spdk_blob_md_set_xattr(file->blob, "length", &file->length_flushed,
1819 				       sizeof(file->length_flushed));
1820 
1821 		pthread_spin_unlock(&file->lock);
1822 		spdk_bs_md_sync_blob(file->blob, __file_cache_finish_sync_bs_cb, file);
1823 	} else {
1824 		pthread_spin_unlock(&file->lock);
1825 	}
1826 }
1827 
1828 static void
1829 __file_flush_done(void *arg, int bserrno)
1830 {
1831 	struct spdk_fs_cb_args *args = arg;
1832 	struct spdk_file *file = args->file;
1833 	struct cache_buffer *next = args->op.flush.cache_buffer;
1834 
1835 	BLOBFS_TRACE(file, "length=%jx\n", args->op.flush.length);
1836 
1837 	pthread_spin_lock(&file->lock);
1838 	next->in_progress = false;
1839 	next->bytes_flushed += args->op.flush.length;
1840 	file->length_flushed += args->op.flush.length;
1841 	if (file->length_flushed > file->length) {
1842 		file->length = file->length_flushed;
1843 	}
1844 	if (next->bytes_flushed == next->buf_size) {
1845 		BLOBFS_TRACE(file, "write buffer fully flushed 0x%jx\n", file->length_flushed);
1846 		next = spdk_tree_find_buffer(file->tree, file->length_flushed);
1847 	}
1848 
1849 	/*
1850 	 * Assert that there is no cached data that extends past the end of the underlying
1851 	 *  blob.
1852 	 */
1853 	assert(next == NULL || next->offset < __file_get_blob_size(file) ||
1854 	       next->bytes_filled == 0);
1855 
1856 	pthread_spin_unlock(&file->lock);
1857 
1858 	__check_sync_reqs(file);
1859 
1860 	__file_flush(args);
1861 }
1862 
1863 static void
1864 __file_flush(void *_args)
1865 {
1866 	struct spdk_fs_cb_args *args = _args;
1867 	struct spdk_file *file = args->file;
1868 	struct cache_buffer *next;
1869 	uint64_t offset, length, start_page, num_pages;
1870 	uint32_t page_size;
1871 
1872 	pthread_spin_lock(&file->lock);
1873 	next = spdk_tree_find_buffer(file->tree, file->length_flushed);
1874 	if (next == NULL || next->in_progress) {
1875 		/*
1876 		 * There is either no data to flush, or a flush I/O is already in
1877 		 *  progress.  So return immediately - if a flush I/O is in
1878 		 *  progress we will flush more data after that is completed.
1879 		 */
1880 		__free_args(args);
1881 		pthread_spin_unlock(&file->lock);
1882 		return;
1883 	}
1884 
1885 	offset = next->offset + next->bytes_flushed;
1886 	length = next->bytes_filled - next->bytes_flushed;
1887 	if (length == 0) {
1888 		__free_args(args);
1889 		pthread_spin_unlock(&file->lock);
1890 		return;
1891 	}
1892 	args->op.flush.length = length;
1893 	args->op.flush.cache_buffer = next;
1894 
1895 	__get_page_parameters(file, offset, length, &start_page, &page_size, &num_pages);
1896 
1897 	next->in_progress = true;
1898 	BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n",
1899 		     offset, length, start_page, num_pages);
1900 	pthread_spin_unlock(&file->lock);
1901 	spdk_bs_io_write_blob(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel,
1902 			      next->buf + (start_page * page_size) - next->offset,
1903 			      start_page, num_pages,
1904 			      __file_flush_done, args);
1905 }
1906 
/*
 * Completion for the metadata sync after a blob resize: wake the thread
 *  waiting on the semaphore in args.  The status is not examined.
 */
static void
__file_extend_done(void *arg, int bserrno)
{
	struct spdk_fs_cb_args *extend_args = arg;

	__wake_caller(extend_args);
}
1914 
1915 static void
1916 __file_extend_blob(void *_args)
1917 {
1918 	struct spdk_fs_cb_args *args = _args;
1919 	struct spdk_file *file = args->file;
1920 
1921 	spdk_bs_md_resize_blob(file->blob, args->op.resize.num_clusters);
1922 
1923 	spdk_bs_md_sync_blob(file->blob, __file_extend_done, args);
1924 }
1925 
/*
 * Completion for an uncached read or write issued on behalf of a synchronous
 *  caller: post the caller's semaphore, then release the args.  The status is
 *  not examined.
 */
static void
__rw_from_file_done(void *arg, int bserrno)
{
	struct spdk_fs_cb_args *rw_args = arg;

	__wake_caller(rw_args);
	__free_args(rw_args);
}
1934 
1935 static void
1936 __rw_from_file(void *_args)
1937 {
1938 	struct spdk_fs_cb_args *args = _args;
1939 	struct spdk_file *file = args->file;
1940 
1941 	if (args->op.rw.is_read) {
1942 		spdk_file_read_async(file, file->fs->sync_target.sync_io_channel, args->op.rw.user_buf,
1943 				     args->op.rw.offset, args->op.rw.length,
1944 				     __rw_from_file_done, args);
1945 	} else {
1946 		spdk_file_write_async(file, file->fs->sync_target.sync_io_channel, args->op.rw.user_buf,
1947 				      args->op.rw.offset, args->op.rw.length,
1948 				      __rw_from_file_done, args);
1949 	}
1950 }
1951 
1952 static int
1953 __send_rw_from_file(struct spdk_file *file, sem_t *sem, void *payload,
1954 		    uint64_t offset, uint64_t length, bool is_read)
1955 {
1956 	struct spdk_fs_cb_args *args;
1957 
1958 	args = calloc(1, sizeof(*args));
1959 	if (args == NULL) {
1960 		sem_post(sem);
1961 		return -ENOMEM;
1962 	}
1963 
1964 	args->file = file;
1965 	args->sem = sem;
1966 	args->op.rw.user_buf = payload;
1967 	args->op.rw.offset = offset;
1968 	args->op.rw.length = length;
1969 	args->op.rw.is_read = is_read;
1970 	file->fs->send_request(__rw_from_file, args);
1971 	return 0;
1972 }
1973 
1974 int
1975 spdk_file_write(struct spdk_file *file, struct spdk_io_channel *_channel,
1976 		void *payload, uint64_t offset, uint64_t length)
1977 {
1978 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
1979 	struct spdk_fs_cb_args *args;
1980 	uint64_t rem_length, copy, blob_size, cluster_sz;
1981 	uint32_t cache_buffers_filled = 0;
1982 	uint8_t *cur_payload;
1983 	struct cache_buffer *last;
1984 
1985 	BLOBFS_TRACE_RW(file, "offset=%jx length=%jx\n", offset, length);
1986 
1987 	if (length == 0) {
1988 		return 0;
1989 	}
1990 
1991 	if (offset != file->append_pos) {
1992 		BLOBFS_TRACE(file, " error offset=%jx append_pos=%jx\n", offset, file->append_pos);
1993 		return -EINVAL;
1994 	}
1995 
1996 	pthread_spin_lock(&file->lock);
1997 	file->open_for_writing = true;
1998 
1999 	if (file->last == NULL) {
2000 		if (file->append_pos % CACHE_BUFFER_SIZE == 0) {
2001 			cache_append_buffer(file);
2002 		} else {
2003 			int rc;
2004 
2005 			file->append_pos += length;
2006 			pthread_spin_unlock(&file->lock);
2007 			rc = __send_rw_from_file(file, &channel->sem, payload,
2008 						 offset, length, false);
2009 			sem_wait(&channel->sem);
2010 			return rc;
2011 		}
2012 	}
2013 
2014 	blob_size = __file_get_blob_size(file);
2015 
2016 	if ((offset + length) > blob_size) {
2017 		struct spdk_fs_cb_args extend_args = {};
2018 
2019 		cluster_sz = file->fs->bs_opts.cluster_sz;
2020 		extend_args.sem = &channel->sem;
2021 		extend_args.op.resize.num_clusters = __bytes_to_clusters((offset + length), cluster_sz);
2022 		extend_args.file = file;
2023 		BLOBFS_TRACE(file, "start resize to %u clusters\n", extend_args.op.resize.num_clusters);
2024 		pthread_spin_unlock(&file->lock);
2025 		file->fs->send_request(__file_extend_blob, &extend_args);
2026 		sem_wait(&channel->sem);
2027 	}
2028 
2029 	last = file->last;
2030 	rem_length = length;
2031 	cur_payload = payload;
2032 	while (rem_length > 0) {
2033 		copy = last->buf_size - last->bytes_filled;
2034 		if (copy > rem_length) {
2035 			copy = rem_length;
2036 		}
2037 		BLOBFS_TRACE_RW(file, "  fill offset=%jx length=%jx\n", file->append_pos, copy);
2038 		memcpy(&last->buf[last->bytes_filled], cur_payload, copy);
2039 		file->append_pos += copy;
2040 		if (file->length < file->append_pos) {
2041 			file->length = file->append_pos;
2042 		}
2043 		cur_payload += copy;
2044 		last->bytes_filled += copy;
2045 		rem_length -= copy;
2046 		if (last->bytes_filled == last->buf_size) {
2047 			cache_buffers_filled++;
2048 			last = cache_append_buffer(file);
2049 			if (last == NULL) {
2050 				BLOBFS_TRACE(file, "nomem\n");
2051 				pthread_spin_unlock(&file->lock);
2052 				return -ENOMEM;
2053 			}
2054 		}
2055 	}
2056 
2057 	if (cache_buffers_filled == 0) {
2058 		pthread_spin_unlock(&file->lock);
2059 		return 0;
2060 	}
2061 
2062 	args = calloc(1, sizeof(*args));
2063 	if (args == NULL) {
2064 		pthread_spin_unlock(&file->lock);
2065 		return -ENOMEM;
2066 	}
2067 
2068 	args->file = file;
2069 	file->fs->send_request(__file_flush, args);
2070 	pthread_spin_unlock(&file->lock);
2071 	return 0;
2072 }
2073 
2074 static void
2075 __readahead_done(void *arg, int bserrno)
2076 {
2077 	struct spdk_fs_cb_args *args = arg;
2078 	struct cache_buffer *cache_buffer = args->op.readahead.cache_buffer;
2079 	struct spdk_file *file = args->file;
2080 
2081 	BLOBFS_TRACE(file, "offset=%jx\n", cache_buffer->offset);
2082 
2083 	pthread_spin_lock(&file->lock);
2084 	cache_buffer->bytes_filled = args->op.readahead.length;
2085 	cache_buffer->bytes_flushed = args->op.readahead.length;
2086 	cache_buffer->in_progress = false;
2087 	pthread_spin_unlock(&file->lock);
2088 
2089 	__free_args(args);
2090 }
2091 
2092 static void
2093 __readahead(void *_args)
2094 {
2095 	struct spdk_fs_cb_args *args = _args;
2096 	struct spdk_file *file = args->file;
2097 	uint64_t offset, length, start_page, num_pages;
2098 	uint32_t page_size;
2099 
2100 	offset = args->op.readahead.offset;
2101 	length = args->op.readahead.length;
2102 	assert(length > 0);
2103 
2104 	__get_page_parameters(file, offset, length, &start_page, &page_size, &num_pages);
2105 
2106 	BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n",
2107 		     offset, length, start_page, num_pages);
2108 	spdk_bs_io_read_blob(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel,
2109 			     args->op.readahead.cache_buffer->buf,
2110 			     start_page, num_pages,
2111 			     __readahead_done, args);
2112 }
2113 
2114 static uint64_t
2115 __next_cache_buffer_offset(uint64_t offset)
2116 {
2117 	return (offset + CACHE_BUFFER_SIZE) & ~(CACHE_TREE_LEVEL_MASK(0));
2118 }
2119 
2120 static void
2121 check_readahead(struct spdk_file *file, uint64_t offset)
2122 {
2123 	struct spdk_fs_cb_args *args;
2124 
2125 	offset = __next_cache_buffer_offset(offset);
2126 	if (spdk_tree_find_buffer(file->tree, offset) != NULL || file->length <= offset) {
2127 		return;
2128 	}
2129 
2130 	args = calloc(1, sizeof(*args));
2131 	if (args == NULL) {
2132 		return;
2133 	}
2134 
2135 	BLOBFS_TRACE(file, "offset=%jx\n", offset);
2136 
2137 	args->file = file;
2138 	args->op.readahead.offset = offset;
2139 	args->op.readahead.cache_buffer = cache_insert_buffer(file, offset);
2140 	args->op.readahead.cache_buffer->in_progress = true;
2141 	if (file->length < (offset + CACHE_BUFFER_SIZE)) {
2142 		args->op.readahead.length = file->length & (CACHE_BUFFER_SIZE - 1);
2143 	} else {
2144 		args->op.readahead.length = CACHE_BUFFER_SIZE;
2145 	}
2146 	file->fs->send_request(__readahead, args);
2147 }
2148 
2149 static int
2150 __file_read(struct spdk_file *file, void *payload, uint64_t offset, uint64_t length, sem_t *sem)
2151 {
2152 	struct cache_buffer *buf;
2153 	int rc;
2154 
2155 	buf = spdk_tree_find_filled_buffer(file->tree, offset);
2156 	if (buf == NULL) {
2157 		pthread_spin_unlock(&file->lock);
2158 		rc = __send_rw_from_file(file, sem, payload, offset, length, true);
2159 		pthread_spin_lock(&file->lock);
2160 		return rc;
2161 	}
2162 
2163 	if ((offset + length) > (buf->offset + buf->bytes_filled)) {
2164 		length = buf->offset + buf->bytes_filled - offset;
2165 	}
2166 	BLOBFS_TRACE(file, "read %p offset=%ju length=%ju\n", payload, offset, length);
2167 	memcpy(payload, &buf->buf[offset - buf->offset], length);
2168 	if ((offset + length) % CACHE_BUFFER_SIZE == 0) {
2169 		pthread_spin_lock(&g_caches_lock);
2170 		spdk_tree_remove_buffer(file->tree, buf);
2171 		if (file->tree->present_mask == 0) {
2172 			TAILQ_REMOVE(&g_caches, file, cache_tailq);
2173 		}
2174 		pthread_spin_unlock(&g_caches_lock);
2175 	}
2176 
2177 	sem_post(sem);
2178 	return 0;
2179 }
2180 
2181 int64_t
2182 spdk_file_read(struct spdk_file *file, struct spdk_io_channel *_channel,
2183 	       void *payload, uint64_t offset, uint64_t length)
2184 {
2185 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
2186 	uint64_t final_offset, final_length;
2187 	uint32_t sub_reads = 0;
2188 	int rc = 0;
2189 
2190 	pthread_spin_lock(&file->lock);
2191 
2192 	BLOBFS_TRACE_RW(file, "offset=%ju length=%ju\n", offset, length);
2193 
2194 	file->open_for_writing = false;
2195 
2196 	if (length == 0 || offset >= file->append_pos) {
2197 		pthread_spin_unlock(&file->lock);
2198 		return 0;
2199 	}
2200 
2201 	if (offset + length > file->append_pos) {
2202 		length = file->append_pos - offset;
2203 	}
2204 
2205 	if (offset != file->next_seq_offset) {
2206 		file->seq_byte_count = 0;
2207 	}
2208 	file->seq_byte_count += length;
2209 	file->next_seq_offset = offset + length;
2210 	if (file->seq_byte_count >= CACHE_READAHEAD_THRESHOLD) {
2211 		check_readahead(file, offset);
2212 		check_readahead(file, offset + CACHE_BUFFER_SIZE);
2213 	}
2214 
2215 	final_length = 0;
2216 	final_offset = offset + length;
2217 	while (offset < final_offset) {
2218 		length = NEXT_CACHE_BUFFER_OFFSET(offset) - offset;
2219 		if (length > (final_offset - offset)) {
2220 			length = final_offset - offset;
2221 		}
2222 		rc = __file_read(file, payload, offset, length, &channel->sem);
2223 		if (rc == 0) {
2224 			final_length += length;
2225 		} else {
2226 			break;
2227 		}
2228 		payload += length;
2229 		offset += length;
2230 		sub_reads++;
2231 	}
2232 	pthread_spin_unlock(&file->lock);
2233 	while (sub_reads-- > 0) {
2234 		sem_wait(&channel->sem);
2235 	}
2236 	if (rc == 0) {
2237 		return final_length;
2238 	} else {
2239 		return rc;
2240 	}
2241 }
2242 
2243 static void
2244 _file_sync(struct spdk_file *file, struct spdk_fs_channel *channel,
2245 	   spdk_file_op_complete cb_fn, void *cb_arg)
2246 {
2247 	struct spdk_fs_request *sync_req;
2248 	struct spdk_fs_request *flush_req;
2249 	struct spdk_fs_cb_args *sync_args;
2250 	struct spdk_fs_cb_args *flush_args;
2251 
2252 	BLOBFS_TRACE(file, "offset=%jx\n", file->append_pos);
2253 
2254 	pthread_spin_lock(&file->lock);
2255 	if (file->append_pos <= file->length_flushed || file->last == NULL) {
2256 		BLOBFS_TRACE(file, "done - no data to flush\n");
2257 		pthread_spin_unlock(&file->lock);
2258 		cb_fn(cb_arg, 0);
2259 		return;
2260 	}
2261 
2262 	sync_req = alloc_fs_request(channel);
2263 	assert(sync_req != NULL);
2264 	sync_args = &sync_req->args;
2265 
2266 	flush_req = alloc_fs_request(channel);
2267 	assert(flush_req != NULL);
2268 	flush_args = &flush_req->args;
2269 
2270 	sync_args->file = file;
2271 	sync_args->fn.file_op = cb_fn;
2272 	sync_args->arg = cb_arg;
2273 	sync_args->op.sync.offset = file->append_pos;
2274 	sync_args->op.sync.xattr_in_progress = false;
2275 	TAILQ_INSERT_TAIL(&file->sync_requests, sync_req, args.op.sync.tailq);
2276 	pthread_spin_unlock(&file->lock);
2277 
2278 	flush_args->file = file;
2279 	channel->send_request(__file_flush, flush_args);
2280 }
2281 
2282 int
2283 spdk_file_sync(struct spdk_file *file, struct spdk_io_channel *_channel)
2284 {
2285 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
2286 
2287 	_file_sync(file, channel, __sem_post, &channel->sem);
2288 	sem_wait(&channel->sem);
2289 
2290 	return 0;
2291 }
2292 
2293 void
2294 spdk_file_sync_async(struct spdk_file *file, struct spdk_io_channel *_channel,
2295 		     spdk_file_op_complete cb_fn, void *cb_arg)
2296 {
2297 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
2298 
2299 	_file_sync(file, channel, cb_fn, cb_arg);
2300 }
2301 
2302 void
2303 spdk_file_set_priority(struct spdk_file *file, uint32_t priority)
2304 {
2305 	BLOBFS_TRACE(file, "priority=%u\n", priority);
2306 	file->priority = priority;
2307 
2308 }
2309 
2310 /*
2311  * Close routines
2312  */
2313 
2314 static void
2315 __file_close_async_done(void *ctx, int bserrno)
2316 {
2317 	struct spdk_fs_request *req = ctx;
2318 	struct spdk_fs_cb_args *args = &req->args;
2319 	struct spdk_file *file = args->file;
2320 
2321 	if (file->is_deleted) {
2322 		spdk_fs_delete_file_async(file->fs, file->name, blob_delete_cb, ctx);
2323 		return;
2324 	}
2325 	args->fn.file_op(args->arg, bserrno);
2326 	free_fs_request(req);
2327 }
2328 
2329 static void
2330 __file_close_async(struct spdk_file *file, struct spdk_fs_request *req)
2331 {
2332 	pthread_spin_lock(&file->lock);
2333 	if (file->ref_count == 0) {
2334 		pthread_spin_unlock(&file->lock);
2335 		__file_close_async_done(req, -EBADF);
2336 		return;
2337 	}
2338 
2339 	file->ref_count--;
2340 	if (file->ref_count > 0) {
2341 		pthread_spin_unlock(&file->lock);
2342 		__file_close_async_done(req, 0);
2343 		return;
2344 	}
2345 
2346 	pthread_spin_unlock(&file->lock);
2347 
2348 	spdk_bs_md_close_blob(&file->blob, __file_close_async_done, req);
2349 }
2350 
2351 static void
2352 __file_close_async__sync_done(void *arg, int fserrno)
2353 {
2354 	struct spdk_fs_request *req = arg;
2355 	struct spdk_fs_cb_args *args = &req->args;
2356 
2357 	__file_close_async(args->file, req);
2358 }
2359 
2360 void
2361 spdk_file_close_async(struct spdk_file *file, spdk_file_op_complete cb_fn, void *cb_arg)
2362 {
2363 	struct spdk_fs_request *req;
2364 	struct spdk_fs_cb_args *args;
2365 
2366 	req = alloc_fs_request(file->fs->md_target.md_fs_channel);
2367 	if (req == NULL) {
2368 		cb_fn(cb_arg, -ENOMEM);
2369 		return;
2370 	}
2371 
2372 	args = &req->args;
2373 	args->file = file;
2374 	args->fn.file_op = cb_fn;
2375 	args->arg = cb_arg;
2376 
2377 	spdk_file_sync_async(file, file->fs->md_target.md_io_channel, __file_close_async__sync_done, req);
2378 }
2379 
2380 static void
2381 __file_close_done(void *arg, int fserrno)
2382 {
2383 	struct spdk_fs_cb_args *args = arg;
2384 
2385 	args->rc = fserrno;
2386 	sem_post(args->sem);
2387 }
2388 
2389 static void
2390 __file_close(void *arg)
2391 {
2392 	struct spdk_fs_request *req = arg;
2393 	struct spdk_fs_cb_args *args = &req->args;
2394 	struct spdk_file *file = args->file;
2395 
2396 	__file_close_async(file, req);
2397 }
2398 
2399 int
2400 spdk_file_close(struct spdk_file *file, struct spdk_io_channel *_channel)
2401 {
2402 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
2403 	struct spdk_fs_request *req;
2404 	struct spdk_fs_cb_args *args;
2405 
2406 	req = alloc_fs_request(channel);
2407 	assert(req != NULL);
2408 
2409 	args = &req->args;
2410 
2411 	spdk_file_sync(file, _channel);
2412 	BLOBFS_TRACE(file, "name=%s\n", file->name);
2413 	args->file = file;
2414 	args->sem = &channel->sem;
2415 	args->fn.file_op = __file_close_done;
2416 	args->arg = req;
2417 	channel->send_request(__file_close, req);
2418 	sem_wait(&channel->sem);
2419 
2420 	return args->rc;
2421 }
2422 
2423 static void
2424 cache_free_buffers(struct spdk_file *file)
2425 {
2426 	BLOBFS_TRACE(file, "free=%s\n", file->name);
2427 	pthread_spin_lock(&file->lock);
2428 	pthread_spin_lock(&g_caches_lock);
2429 	if (file->tree->present_mask == 0) {
2430 		pthread_spin_unlock(&g_caches_lock);
2431 		pthread_spin_unlock(&file->lock);
2432 		return;
2433 	}
2434 	spdk_tree_free_buffers(file->tree);
2435 
2436 	TAILQ_REMOVE(&g_caches, file, cache_tailq);
2437 	/* If not freed, put it in the end of the queue */
2438 	if (file->tree->present_mask != 0) {
2439 		TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq);
2440 	}
2441 	file->last = NULL;
2442 	pthread_spin_unlock(&g_caches_lock);
2443 	pthread_spin_unlock(&file->lock);
2444 }
2445 
/*
 * Register the "blobfs" and "blobfs_rw" trace flag names.
 * NOTE(review): presumably these are the flags consulted by BLOBFS_TRACE and
 * BLOBFS_TRACE_RW respectively - confirm against the macro definitions.
 */
SPDK_LOG_REGISTER_TRACE_FLAG("blobfs", SPDK_TRACE_BLOBFS);
SPDK_LOG_REGISTER_TRACE_FLAG("blobfs_rw", SPDK_TRACE_BLOBFS_RW);
2448