xref: /spdk/lib/blobfs/blobfs.c (revision 267a4e1ebdd729c9dd8dfb40efb3b4069e38022c)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 
36 #include "spdk/blobfs.h"
37 #include "blobfs_internal.h"
38 
39 #include "spdk/queue.h"
40 #include "spdk/io_channel.h"
41 #include "spdk/assert.h"
42 #include "spdk/env.h"
43 #include "spdk/util.h"
44 #include "spdk_internal/log.h"
45 
/*
 * Trace helpers that prefix every message with the file's name.
 *
 * `file` must be an expression yielding a pointer to an object with a `name`
 * member; it is parenthesized so that arbitrary pointer expressions expand
 * safely.  `str` must be a string literal because it is concatenated with the
 * "file=%s " prefix.  Any additional arguments are forwarded to the format.
 */
#define BLOBFS_TRACE(file, str, ...) \
	SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s " str, (file)->name, ##__VA_ARGS__)

#define BLOBFS_TRACE_RW(file, str, ...) \
	SPDK_TRACELOG(SPDK_TRACE_BLOBFS_RW, "file=%s " str, (file)->name, ##__VA_ARGS__)
51 
52 #define BLOBFS_CACHE_SIZE (4ULL * 1024 * 1024 * 1024)
53 
54 static uint64_t g_fs_cache_size = BLOBFS_CACHE_SIZE;
55 static struct spdk_mempool *g_cache_pool;
56 static TAILQ_HEAD(, spdk_file) g_caches;
57 static pthread_spinlock_t g_caches_lock;
58 
/*
 * Completion callback that wakes a waiter.
 *
 * arg:     the sem_t to post.
 * bserrno: completion status; ignored.
 */
static void
__sem_post(void *arg, int bserrno)
{
	sem_post((sem_t *)arg);
}
66 
67 void
68 spdk_cache_buffer_free(struct cache_buffer *cache_buffer)
69 {
70 	spdk_mempool_put(g_cache_pool, cache_buffer->buf);
71 	free(cache_buffer);
72 }
73 
/* 128 KiB; presumably the sequential-read byte count that triggers readahead -- TODO confirm. */
#define CACHE_READAHEAD_THRESHOLD	(128 * 1024)
75 
76 struct spdk_file {
77 	struct spdk_filesystem	*fs;
78 	struct spdk_blob	*blob;
79 	char			*name;
80 	uint64_t		length;
81 	bool			open_for_writing;
82 	uint64_t		length_flushed;
83 	uint64_t		append_pos;
84 	uint64_t		seq_byte_count;
85 	uint64_t		next_seq_offset;
86 	uint32_t		priority;
87 	TAILQ_ENTRY(spdk_file)	tailq;
88 	spdk_blob_id		blobid;
89 	uint32_t		ref_count;
90 	pthread_spinlock_t	lock;
91 	struct cache_buffer	*last;
92 	struct cache_tree	*tree;
93 	TAILQ_HEAD(open_requests_head, spdk_fs_request) open_requests;
94 	TAILQ_HEAD(sync_requests_head, spdk_fs_request) sync_requests;
95 	TAILQ_ENTRY(spdk_file)	cache_tailq;
96 };
97 
98 struct spdk_filesystem {
99 	struct spdk_blob_store	*bs;
100 	TAILQ_HEAD(, spdk_file)	files;
101 	struct spdk_bs_opts	bs_opts;
102 	struct spdk_bs_dev	*bdev;
103 	fs_send_request_fn	send_request;
104 
105 	struct {
106 		uint32_t		max_ops;
107 		struct spdk_io_channel	*sync_io_channel;
108 		struct spdk_fs_channel	*sync_fs_channel;
109 	} sync_target;
110 
111 	struct {
112 		uint32_t		max_ops;
113 		struct spdk_io_channel	*md_io_channel;
114 		struct spdk_fs_channel	*md_fs_channel;
115 	} md_target;
116 
117 	struct {
118 		uint32_t		max_ops;
119 	} io_target;
120 };
121 
122 struct spdk_fs_cb_args {
123 	union {
124 		spdk_fs_op_with_handle_complete		fs_op_with_handle;
125 		spdk_fs_op_complete			fs_op;
126 		spdk_file_op_with_handle_complete	file_op_with_handle;
127 		spdk_file_op_complete			file_op;
128 		spdk_file_stat_op_complete		stat_op;
129 	} fn;
130 	void *arg;
131 	sem_t *sem;
132 	struct spdk_filesystem *fs;
133 	struct spdk_file *file;
134 	int rc;
135 	bool from_request;
136 	union {
137 		struct {
138 			uint64_t	length;
139 		} truncate;
140 		struct {
141 			struct spdk_io_channel	*channel;
142 			void		*user_buf;
143 			void		*pin_buf;
144 			int		is_read;
145 			off_t		offset;
146 			size_t		length;
147 			uint64_t	start_page;
148 			uint64_t	num_pages;
149 			uint32_t	blocklen;
150 		} rw;
151 		struct {
152 			const char	*old_name;
153 			const char	*new_name;
154 		} rename;
155 		struct {
156 			struct cache_buffer	*cache_buffer;
157 			uint64_t		length;
158 		} flush;
159 		struct {
160 			struct cache_buffer	*cache_buffer;
161 			uint64_t		length;
162 			uint64_t		offset;
163 		} readahead;
164 		struct {
165 			uint64_t			offset;
166 			TAILQ_ENTRY(spdk_fs_request)	tailq;
167 		} sync;
168 		struct {
169 			uint32_t			num_clusters;
170 		} resize;
171 		struct {
172 			const char	*name;
173 			uint32_t	flags;
174 			TAILQ_ENTRY(spdk_fs_request)	tailq;
175 		} open;
176 		struct {
177 			const char	*name;
178 		} create;
179 		struct {
180 			const char	*name;
181 		} delete;
182 		struct {
183 			const char	*name;
184 		} stat;
185 	} op;
186 };
187 
188 static void cache_free_buffers(struct spdk_file *file);
189 
190 static void
191 __initialize_cache(void)
192 {
193 	if (g_cache_pool != NULL) {
194 		return;
195 	}
196 
197 	g_cache_pool = spdk_mempool_create("spdk_fs_cache",
198 					   g_fs_cache_size / CACHE_BUFFER_SIZE,
199 					   CACHE_BUFFER_SIZE, -1, SPDK_ENV_SOCKET_ID_ANY);
200 	TAILQ_INIT(&g_caches);
201 	pthread_spin_init(&g_caches_lock, 0);
202 }
203 
204 static uint64_t
205 __file_get_blob_size(struct spdk_file *file)
206 {
207 	uint64_t cluster_sz;
208 
209 	cluster_sz = file->fs->bs_opts.cluster_sz;
210 	return cluster_sz * spdk_blob_get_num_clusters(file->blob);
211 }
212 
213 struct spdk_fs_request {
214 	struct spdk_fs_cb_args		args;
215 	TAILQ_ENTRY(spdk_fs_request)	link;
216 	struct spdk_fs_channel		*channel;
217 };
218 
219 struct spdk_fs_channel {
220 	struct spdk_fs_request		*req_mem;
221 	TAILQ_HEAD(, spdk_fs_request)	reqs;
222 	sem_t				sem;
223 	struct spdk_filesystem		*fs;
224 	struct spdk_io_channel		*bs_channel;
225 	fs_send_request_fn		send_request;
226 };
227 
228 static struct spdk_fs_request *
229 alloc_fs_request(struct spdk_fs_channel *channel)
230 {
231 	struct spdk_fs_request *req;
232 
233 	req = TAILQ_FIRST(&channel->reqs);
234 	if (!req) {
235 		return NULL;
236 	}
237 	TAILQ_REMOVE(&channel->reqs, req, link);
238 	memset(req, 0, sizeof(*req));
239 	req->channel = channel;
240 	req->args.from_request = true;
241 
242 	return req;
243 }
244 
245 static void
246 free_fs_request(struct spdk_fs_request *req)
247 {
248 	TAILQ_INSERT_HEAD(&req->channel->reqs, req, link);
249 }
250 
251 static int
252 _spdk_fs_channel_create(struct spdk_filesystem *fs, struct spdk_fs_channel *channel,
253 			uint32_t max_ops)
254 {
255 	uint32_t i;
256 
257 	channel->req_mem = calloc(max_ops, sizeof(struct spdk_fs_request));
258 	if (!channel->req_mem) {
259 		return -1;
260 	}
261 
262 	TAILQ_INIT(&channel->reqs);
263 	sem_init(&channel->sem, 0, 0);
264 
265 	for (i = 0; i < max_ops; i++) {
266 		TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link);
267 	}
268 
269 	channel->fs = fs;
270 
271 	return 0;
272 }
273 
274 static int
275 _spdk_fs_md_channel_create(void *io_device, uint32_t priority, void *ctx_buf, void *unique_ctx)
276 {
277 	struct spdk_filesystem		*fs;
278 	struct spdk_fs_channel		*channel = ctx_buf;
279 
280 	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, md_target);
281 
282 	return _spdk_fs_channel_create(fs, channel, fs->md_target.max_ops);
283 }
284 
285 static int
286 _spdk_fs_sync_channel_create(void *io_device, uint32_t priority, void *ctx_buf, void *unique_ctx)
287 {
288 	struct spdk_filesystem		*fs;
289 	struct spdk_fs_channel		*channel = ctx_buf;
290 
291 	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, sync_target);
292 
293 	return _spdk_fs_channel_create(fs, channel, fs->sync_target.max_ops);
294 }
295 
296 static int
297 _spdk_fs_io_channel_create(void *io_device, uint32_t priority, void *ctx_buf, void *unique_ctx)
298 {
299 	struct spdk_filesystem		*fs;
300 	struct spdk_fs_channel		*channel = ctx_buf;
301 
302 	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, io_target);
303 
304 	return _spdk_fs_channel_create(fs, channel, fs->io_target.max_ops);
305 }
306 
307 static void
308 _spdk_fs_channel_destroy(void *io_device, void *ctx_buf)
309 {
310 	struct spdk_fs_channel *channel = ctx_buf;
311 
312 	free(channel->req_mem);
313 	if (channel->bs_channel != NULL) {
314 		spdk_bs_free_io_channel(channel->bs_channel);
315 	}
316 }
317 
318 static void
319 __send_request_direct(fs_request_fn fn, void *arg)
320 {
321 	fn(arg);
322 }
323 
324 static void
325 common_fs_bs_init(struct spdk_filesystem *fs, struct spdk_blob_store *bs)
326 {
327 	fs->bs = bs;
328 	fs->bs_opts.cluster_sz = spdk_bs_get_cluster_size(bs);
329 	fs->md_target.md_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs,
330 			SPDK_IO_PRIORITY_DEFAULT);
331 	fs->md_target.md_fs_channel->send_request = __send_request_direct;
332 	fs->sync_target.sync_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs,
333 			SPDK_IO_PRIORITY_DEFAULT);
334 	fs->sync_target.sync_fs_channel->send_request = __send_request_direct;
335 }
336 
337 static void
338 init_cb(void *ctx, struct spdk_blob_store *bs, int bserrno)
339 {
340 	struct spdk_fs_request *req = ctx;
341 	struct spdk_fs_cb_args *args = &req->args;
342 	struct spdk_filesystem *fs = args->fs;
343 
344 	if (bserrno == 0) {
345 		common_fs_bs_init(fs, bs);
346 	} else {
347 		free(fs);
348 		fs = NULL;
349 	}
350 
351 	args->fn.fs_op_with_handle(args->arg, fs, bserrno);
352 	free_fs_request(req);
353 }
354 
355 static struct spdk_filesystem *
356 fs_alloc(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn)
357 {
358 	struct spdk_filesystem *fs;
359 
360 	fs = calloc(1, sizeof(*fs));
361 	if (fs == NULL) {
362 		return NULL;
363 	}
364 
365 	fs->bdev = dev;
366 	fs->send_request = send_request_fn;
367 	TAILQ_INIT(&fs->files);
368 
369 	fs->md_target.max_ops = 512;
370 	spdk_io_device_register(&fs->md_target, _spdk_fs_md_channel_create, _spdk_fs_channel_destroy,
371 				sizeof(struct spdk_fs_channel));
372 	fs->md_target.md_io_channel = spdk_get_io_channel(&fs->md_target, SPDK_IO_PRIORITY_DEFAULT, false,
373 				      NULL);
374 	fs->md_target.md_fs_channel = spdk_io_channel_get_ctx(fs->md_target.md_io_channel);
375 
376 	fs->sync_target.max_ops = 512;
377 	spdk_io_device_register(&fs->sync_target, _spdk_fs_sync_channel_create, _spdk_fs_channel_destroy,
378 				sizeof(struct spdk_fs_channel));
379 	fs->sync_target.sync_io_channel = spdk_get_io_channel(&fs->sync_target, SPDK_IO_PRIORITY_DEFAULT,
380 					  false, NULL);
381 	fs->sync_target.sync_fs_channel = spdk_io_channel_get_ctx(fs->sync_target.sync_io_channel);
382 
383 	fs->io_target.max_ops = 512;
384 	spdk_io_device_register(&fs->io_target, _spdk_fs_io_channel_create, _spdk_fs_channel_destroy,
385 				sizeof(struct spdk_fs_channel));
386 
387 	__initialize_cache();
388 
389 	return fs;
390 }
391 
392 void
393 spdk_fs_init(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn,
394 	     spdk_fs_op_with_handle_complete cb_fn, void *cb_arg)
395 {
396 	struct spdk_filesystem *fs;
397 	struct spdk_fs_request *req;
398 	struct spdk_fs_cb_args *args;
399 
400 	fs = fs_alloc(dev, send_request_fn);
401 	if (fs == NULL) {
402 		cb_fn(cb_arg, NULL, -ENOMEM);
403 		return;
404 	}
405 
406 	req = alloc_fs_request(fs->md_target.md_fs_channel);
407 	if (req == NULL) {
408 		cb_fn(cb_arg, NULL, -ENOMEM);
409 		return;
410 	}
411 
412 	args = &req->args;
413 	args->fn.fs_op_with_handle = cb_fn;
414 	args->arg = cb_arg;
415 	args->fs = fs;
416 
417 	spdk_bs_init(dev, NULL, init_cb, req);
418 }
419 
420 static struct spdk_file *
421 file_alloc(struct spdk_filesystem *fs)
422 {
423 	struct spdk_file *file;
424 
425 	file = calloc(1, sizeof(*file));
426 	if (file == NULL) {
427 		return NULL;
428 	}
429 
430 	file->fs = fs;
431 	TAILQ_INIT(&file->open_requests);
432 	TAILQ_INIT(&file->sync_requests);
433 	pthread_spin_init(&file->lock, 0);
434 	file->tree = calloc(1, sizeof(*file->tree));
435 	TAILQ_INSERT_TAIL(&fs->files, file, tailq);
436 	file->priority = SPDK_FILE_PRIORITY_LOW;
437 	return file;
438 }
439 
440 static void
441 iter_cb(void *ctx, struct spdk_blob *blob, int rc)
442 {
443 	struct spdk_fs_request *req = ctx;
444 	struct spdk_fs_cb_args *args = &req->args;
445 	struct spdk_filesystem *fs = args->fs;
446 	struct spdk_file *f;
447 	uint64_t *length;
448 	const char *name;
449 	size_t value_len;
450 
451 	if (rc == -ENOENT) {
452 		/* Finished iterating */
453 		args->fn.fs_op_with_handle(args->arg, fs, 0);
454 		free_fs_request(req);
455 		return;
456 	} else if (rc < 0) {
457 		args->fn.fs_op_with_handle(args->arg, fs, rc);
458 		free_fs_request(req);
459 		return;
460 	}
461 
462 	rc = spdk_bs_md_get_xattr_value(blob, "name", (const void **)&name, &value_len);
463 	if (rc < 0) {
464 		args->fn.fs_op_with_handle(args->arg, fs, rc);
465 		free_fs_request(req);
466 		return;
467 	}
468 
469 	rc = spdk_bs_md_get_xattr_value(blob, "length", (const void **)&length, &value_len);
470 	if (rc < 0) {
471 		args->fn.fs_op_with_handle(args->arg, fs, rc);
472 		free_fs_request(req);
473 		return;
474 	}
475 	assert(value_len == 8);
476 
477 	f = file_alloc(fs);
478 	if (f == NULL) {
479 		args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM);
480 		free_fs_request(req);
481 		return;
482 	}
483 
484 	f->name = strdup(name);
485 	f->blobid = spdk_blob_get_id(blob);
486 	f->length = *length;
487 	f->length_flushed = *length;
488 	f->append_pos = *length;
489 	SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "added file %s length=%ju\n", f->name, f->length);
490 
491 	spdk_bs_md_iter_next(fs->bs, &blob, iter_cb, req);
492 }
493 
494 static void
495 load_cb(void *ctx, struct spdk_blob_store *bs, int bserrno)
496 {
497 	struct spdk_fs_request *req = ctx;
498 	struct spdk_fs_cb_args *args = &req->args;
499 	struct spdk_filesystem *fs = args->fs;
500 
501 	if (bserrno != 0) {
502 		args->fn.fs_op_with_handle(args->arg, NULL, bserrno);
503 		free_fs_request(req);
504 		free(fs);
505 		return;
506 	}
507 
508 	common_fs_bs_init(fs, bs);
509 	spdk_bs_md_iter_first(fs->bs, iter_cb, req);
510 }
511 
512 void
513 spdk_fs_load(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn,
514 	     spdk_fs_op_with_handle_complete cb_fn, void *cb_arg)
515 {
516 	struct spdk_filesystem *fs;
517 	struct spdk_fs_cb_args *args;
518 	struct spdk_fs_request *req;
519 
520 	fs = fs_alloc(dev, send_request_fn);
521 	if (fs == NULL) {
522 		cb_fn(cb_arg, NULL, -ENOMEM);
523 		return;
524 	}
525 
526 	req = alloc_fs_request(fs->md_target.md_fs_channel);
527 	if (req == NULL) {
528 		cb_fn(cb_arg, NULL, -ENOMEM);
529 		return;
530 	}
531 
532 	args = &req->args;
533 	args->fn.fs_op_with_handle = cb_fn;
534 	args->arg = cb_arg;
535 	args->fs = fs;
536 
537 	spdk_bs_load(dev, load_cb, req);
538 }
539 
540 static void
541 unload_cb(void *ctx, int bserrno)
542 {
543 	struct spdk_fs_request *req = ctx;
544 	struct spdk_fs_cb_args *args = &req->args;
545 	struct spdk_filesystem *fs = args->fs;
546 
547 	args->fn.fs_op(args->arg, bserrno);
548 	free(req);
549 
550 	spdk_io_device_unregister(&fs->io_target);
551 	spdk_io_device_unregister(&fs->sync_target);
552 	spdk_io_device_unregister(&fs->md_target);
553 
554 	free(fs);
555 }
556 
557 void
558 spdk_fs_unload(struct spdk_filesystem *fs, spdk_fs_op_complete cb_fn, void *cb_arg)
559 {
560 	struct spdk_fs_request *req;
561 	struct spdk_fs_cb_args *args;
562 
563 	/*
564 	 * We must free the md_channel before unloading the blobstore, so just
565 	 *  allocate this request from the general heap.
566 	 */
567 	req = calloc(1, sizeof(*req));
568 	if (req == NULL) {
569 		cb_fn(cb_arg, -ENOMEM);
570 		return;
571 	}
572 
573 	args = &req->args;
574 	args->fn.fs_op = cb_fn;
575 	args->arg = cb_arg;
576 	args->fs = fs;
577 
578 	spdk_fs_free_io_channel(fs->md_target.md_io_channel);
579 	spdk_fs_free_io_channel(fs->sync_target.sync_io_channel);
580 	spdk_bs_unload(fs->bs, unload_cb, req);
581 }
582 
583 static struct spdk_file *
584 fs_find_file(struct spdk_filesystem *fs, const char *name)
585 {
586 	struct spdk_file *file;
587 
588 	TAILQ_FOREACH(file, &fs->files, tailq) {
589 		if (!strncmp(name, file->name, SPDK_FILE_NAME_MAX)) {
590 			return file;
591 		}
592 	}
593 
594 	return NULL;
595 }
596 
597 void
598 spdk_fs_file_stat_async(struct spdk_filesystem *fs, const char *name,
599 			spdk_file_stat_op_complete cb_fn, void *cb_arg)
600 {
601 	struct spdk_file_stat stat;
602 	struct spdk_file *f = NULL;
603 
604 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
605 		cb_fn(cb_arg, NULL, -ENAMETOOLONG);
606 		return;
607 	}
608 
609 	f = fs_find_file(fs, name);
610 	if (f != NULL) {
611 		stat.blobid = f->blobid;
612 		stat.size = f->length;
613 		cb_fn(cb_arg, &stat, 0);
614 		return;
615 	}
616 
617 	cb_fn(cb_arg, NULL, -ENOENT);
618 }
619 
620 static void
621 __copy_stat(void *arg, struct spdk_file_stat *stat, int fserrno)
622 {
623 	struct spdk_fs_request *req = arg;
624 	struct spdk_fs_cb_args *args = &req->args;
625 
626 	args->rc = fserrno;
627 	if (fserrno == 0) {
628 		memcpy(args->arg, stat, sizeof(*stat));
629 	}
630 	sem_post(args->sem);
631 }
632 
633 static void
634 __file_stat(void *arg)
635 {
636 	struct spdk_fs_request *req = arg;
637 	struct spdk_fs_cb_args *args = &req->args;
638 
639 	spdk_fs_file_stat_async(args->fs, args->op.stat.name,
640 				args->fn.stat_op, req);
641 }
642 
643 int
644 spdk_fs_file_stat(struct spdk_filesystem *fs, struct spdk_io_channel *_channel,
645 		  const char *name, struct spdk_file_stat *stat)
646 {
647 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
648 	struct spdk_fs_request *req;
649 	int rc;
650 
651 	req = alloc_fs_request(channel);
652 	assert(req != NULL);
653 
654 	req->args.fs = fs;
655 	req->args.op.stat.name = name;
656 	req->args.fn.stat_op = __copy_stat;
657 	req->args.arg = stat;
658 	req->args.sem = &channel->sem;
659 	channel->send_request(__file_stat, req);
660 	sem_wait(&channel->sem);
661 
662 	rc = req->args.rc;
663 	free_fs_request(req);
664 
665 	return rc;
666 }
667 
668 static void
669 fs_create_blob_close_cb(void *ctx, int bserrno)
670 {
671 	struct spdk_fs_request *req = ctx;
672 	struct spdk_fs_cb_args *args = &req->args;
673 
674 	args->fn.file_op(args->arg, bserrno);
675 	free_fs_request(req);
676 }
677 
678 static void
679 fs_create_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno)
680 {
681 	struct spdk_fs_request *req = ctx;
682 	struct spdk_fs_cb_args *args = &req->args;
683 	struct spdk_file *f = args->file;
684 	uint64_t length = 0;
685 
686 	f->blob = blob;
687 	spdk_bs_md_resize_blob(blob, 1);
688 	spdk_blob_md_set_xattr(blob, "name", f->name, strlen(f->name) + 1);
689 	spdk_blob_md_set_xattr(blob, "length", &length, sizeof(length));
690 
691 	spdk_bs_md_close_blob(&f->blob, fs_create_blob_close_cb, args);
692 }
693 
694 static void
695 fs_create_blob_create_cb(void *ctx, spdk_blob_id blobid, int bserrno)
696 {
697 	struct spdk_fs_request *req = ctx;
698 	struct spdk_fs_cb_args *args = &req->args;
699 	struct spdk_file *f = args->file;
700 
701 	f->blobid = blobid;
702 	spdk_bs_md_open_blob(f->fs->bs, blobid, fs_create_blob_open_cb, req);
703 }
704 
705 void
706 spdk_fs_create_file_async(struct spdk_filesystem *fs, const char *name,
707 			  spdk_file_op_complete cb_fn, void *cb_arg)
708 {
709 	struct spdk_file *file;
710 	struct spdk_fs_request *req;
711 	struct spdk_fs_cb_args *args;
712 
713 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
714 		cb_fn(cb_arg, -ENAMETOOLONG);
715 		return;
716 	}
717 
718 	file = fs_find_file(fs, name);
719 	if (file != NULL) {
720 		cb_fn(cb_arg, -EEXIST);
721 		return;
722 	}
723 
724 	file = file_alloc(fs);
725 	if (file == NULL) {
726 		cb_fn(cb_arg, -ENOMEM);
727 		return;
728 	}
729 
730 	req = alloc_fs_request(fs->md_target.md_fs_channel);
731 	if (req == NULL) {
732 		cb_fn(cb_arg, -ENOMEM);
733 		return;
734 	}
735 
736 	args = &req->args;
737 	args->file = file;
738 	args->fn.file_op = cb_fn;
739 	args->arg = cb_arg;
740 
741 	file->name = strdup(name);
742 	spdk_bs_md_create_blob(fs->bs, fs_create_blob_create_cb, args);
743 }
744 
745 static void
746 __fs_create_file_done(void *arg, int fserrno)
747 {
748 	struct spdk_fs_request *req = arg;
749 	struct spdk_fs_cb_args *args = &req->args;
750 
751 	args->rc = fserrno;
752 	sem_post(args->sem);
753 	SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s\n", args->op.create.name);
754 }
755 
756 static void
757 __fs_create_file(void *arg)
758 {
759 	struct spdk_fs_request *req = arg;
760 	struct spdk_fs_cb_args *args = &req->args;
761 
762 	SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s\n", args->op.create.name);
763 	spdk_fs_create_file_async(args->fs, args->op.create.name, __fs_create_file_done, req);
764 }
765 
766 int
767 spdk_fs_create_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, const char *name)
768 {
769 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
770 	struct spdk_fs_request *req;
771 	struct spdk_fs_cb_args *args;
772 	int rc;
773 
774 	SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s\n", name);
775 
776 	req = alloc_fs_request(channel);
777 	assert(req != NULL);
778 
779 	args = &req->args;
780 	args->fs = fs;
781 	args->op.create.name = name;
782 	args->sem = &channel->sem;
783 	fs->send_request(__fs_create_file, req);
784 	sem_wait(&channel->sem);
785 	rc = args->rc;
786 	free_fs_request(req);
787 
788 	return rc;
789 }
790 
791 static void
792 fs_open_blob_done(void *ctx, struct spdk_blob *blob, int bserrno)
793 {
794 	struct spdk_fs_request *req = ctx;
795 	struct spdk_fs_cb_args *args = &req->args;
796 	struct spdk_file *f = args->file;
797 
798 	f->blob = blob;
799 	while (!TAILQ_EMPTY(&f->open_requests)) {
800 		req = TAILQ_FIRST(&f->open_requests);
801 		args = &req->args;
802 		TAILQ_REMOVE(&f->open_requests, req, args.op.open.tailq);
803 		args->fn.file_op_with_handle(args->arg, f, bserrno);
804 		free_fs_request(req);
805 	}
806 }
807 
808 static void
809 fs_open_blob_create_cb(void *ctx, int bserrno)
810 {
811 	struct spdk_fs_request *req = ctx;
812 	struct spdk_fs_cb_args *args = &req->args;
813 	struct spdk_file *file = args->file;
814 	struct spdk_filesystem *fs = args->fs;
815 
816 	if (file == NULL) {
817 		/*
818 		 * This is from an open with CREATE flag - the file
819 		 *  is now created so look it up in the file list for this
820 		 *  filesystem.
821 		 */
822 		file = fs_find_file(fs, args->op.open.name);
823 		assert(file != NULL);
824 		args->file = file;
825 	}
826 
827 	file->ref_count++;
828 	TAILQ_INSERT_TAIL(&file->open_requests, req, args.op.open.tailq);
829 	if (file->ref_count == 1) {
830 		assert(file->blob == NULL);
831 		spdk_bs_md_open_blob(fs->bs, file->blobid, fs_open_blob_done, req);
832 	} else if (file->blob != NULL) {
833 		fs_open_blob_done(req, file->blob, 0);
834 	} else {
835 		/*
836 		 * The blob open for this file is in progress due to a previous
837 		 *  open request.  When that open completes, it will invoke the
838 		 *  open callback for this request.
839 		 */
840 	}
841 }
842 
843 void
844 spdk_fs_open_file_async(struct spdk_filesystem *fs, const char *name, uint32_t flags,
845 			spdk_file_op_with_handle_complete cb_fn, void *cb_arg)
846 {
847 	struct spdk_file *f = NULL;
848 	struct spdk_fs_request *req;
849 	struct spdk_fs_cb_args *args;
850 
851 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
852 		cb_fn(cb_arg, NULL, -ENAMETOOLONG);
853 		return;
854 	}
855 
856 	f = fs_find_file(fs, name);
857 	if (f == NULL && !(flags & SPDK_BLOBFS_OPEN_CREATE)) {
858 		cb_fn(cb_arg, NULL, -ENOENT);
859 		return;
860 	}
861 
862 	req = alloc_fs_request(fs->md_target.md_fs_channel);
863 	if (req == NULL) {
864 		cb_fn(cb_arg, NULL, -ENOMEM);
865 		return;
866 	}
867 
868 	args = &req->args;
869 	args->fn.file_op_with_handle = cb_fn;
870 	args->arg = cb_arg;
871 	args->file = f;
872 	args->fs = fs;
873 	args->op.open.name = name;
874 
875 	if (f == NULL) {
876 		spdk_fs_create_file_async(fs, name, fs_open_blob_create_cb, req);
877 	} else {
878 		fs_open_blob_create_cb(req, 0);
879 	}
880 }
881 
882 static void
883 __fs_open_file_done(void *arg, struct spdk_file *file, int bserrno)
884 {
885 	struct spdk_fs_request *req = arg;
886 	struct spdk_fs_cb_args *args = &req->args;
887 
888 	args->file = file;
889 	args->rc = bserrno;
890 	sem_post(args->sem);
891 	SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s\n", args->op.open.name);
892 }
893 
894 static void
895 __fs_open_file(void *arg)
896 {
897 	struct spdk_fs_request *req = arg;
898 	struct spdk_fs_cb_args *args = &req->args;
899 
900 	SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s\n", args->op.open.name);
901 	spdk_fs_open_file_async(args->fs, args->op.open.name, args->op.open.flags,
902 				__fs_open_file_done, req);
903 }
904 
905 int
906 spdk_fs_open_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel,
907 		  const char *name, uint32_t flags, struct spdk_file **file)
908 {
909 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
910 	struct spdk_fs_request *req;
911 	struct spdk_fs_cb_args *args;
912 	int rc;
913 
914 	SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s\n", name);
915 
916 	req = alloc_fs_request(channel);
917 	assert(req != NULL);
918 
919 	args = &req->args;
920 	args->fs = fs;
921 	args->op.open.name = name;
922 	args->op.open.flags = flags;
923 	args->sem = &channel->sem;
924 	fs->send_request(__fs_open_file, req);
925 	sem_wait(&channel->sem);
926 	rc = args->rc;
927 	if (rc == 0) {
928 		*file = args->file;
929 	} else {
930 		*file = NULL;
931 	}
932 	free_fs_request(req);
933 
934 	return rc;
935 }
936 
937 static void
938 fs_rename_blob_close_cb(void *ctx, int bserrno)
939 {
940 	struct spdk_fs_request *req = ctx;
941 	struct spdk_fs_cb_args *args = &req->args;
942 
943 	args->fn.fs_op(args->arg, bserrno);
944 	free_fs_request(req);
945 }
946 
947 static void
948 fs_rename_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno)
949 {
950 	struct spdk_fs_request *req = ctx;
951 	struct spdk_fs_cb_args *args = &req->args;
952 	struct spdk_file *f = args->file;
953 	const char *new_name = args->op.rename.new_name;
954 
955 	f->blob = blob;
956 	spdk_blob_md_set_xattr(blob, "name", new_name, strlen(new_name) + 1);
957 	spdk_bs_md_close_blob(&f->blob, fs_rename_blob_close_cb, req);
958 }
959 
960 static void
961 __spdk_fs_md_rename_file(struct spdk_fs_request *req)
962 {
963 	struct spdk_fs_cb_args *args = &req->args;
964 	struct spdk_file *f;
965 
966 	f = fs_find_file(args->fs, args->op.rename.old_name);
967 	if (f == NULL) {
968 		args->fn.fs_op(args->arg, -ENOENT);
969 		free_fs_request(req);
970 		return;
971 	}
972 
973 	free(f->name);
974 	f->name = strdup(args->op.rename.new_name);
975 	args->file = f;
976 	spdk_bs_md_open_blob(args->fs->bs, f->blobid, fs_rename_blob_open_cb, req);
977 }
978 
979 static void
980 fs_rename_delete_done(void *arg, int fserrno)
981 {
982 	__spdk_fs_md_rename_file(arg);
983 }
984 
985 void
986 spdk_fs_rename_file_async(struct spdk_filesystem *fs,
987 			  const char *old_name, const char *new_name,
988 			  spdk_file_op_complete cb_fn, void *cb_arg)
989 {
990 	struct spdk_file *f;
991 	struct spdk_fs_request *req;
992 	struct spdk_fs_cb_args *args;
993 
994 	SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "old=%s new=%s\n", old_name, new_name);
995 	if (strnlen(new_name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
996 		cb_fn(cb_arg, -ENAMETOOLONG);
997 		return;
998 	}
999 
1000 	req = alloc_fs_request(fs->md_target.md_fs_channel);
1001 	if (req == NULL) {
1002 		cb_fn(cb_arg, -ENOMEM);
1003 		return;
1004 	}
1005 
1006 	args = &req->args;
1007 	args->fn.fs_op = cb_fn;
1008 	args->fs = fs;
1009 	args->arg = cb_arg;
1010 	args->op.rename.old_name = old_name;
1011 	args->op.rename.new_name = new_name;
1012 
1013 	f = fs_find_file(fs, new_name);
1014 	if (f == NULL) {
1015 		__spdk_fs_md_rename_file(req);
1016 		return;
1017 	}
1018 
1019 	/*
1020 	 * The rename overwrites an existing file.  So delete the existing file, then
1021 	 *  do the actual rename.
1022 	 */
1023 	spdk_fs_delete_file_async(fs, new_name, fs_rename_delete_done, req);
1024 }
1025 
1026 static void
1027 __fs_rename_file_done(void *arg, int fserrno)
1028 {
1029 	struct spdk_fs_request *req = arg;
1030 	struct spdk_fs_cb_args *args = &req->args;
1031 
1032 	args->rc = fserrno;
1033 	sem_post(args->sem);
1034 }
1035 
1036 static void
1037 __fs_rename_file(void *arg)
1038 {
1039 	struct spdk_fs_request *req = arg;
1040 	struct spdk_fs_cb_args *args = &req->args;
1041 
1042 	spdk_fs_rename_file_async(args->fs, args->op.rename.old_name, args->op.rename.new_name,
1043 				  __fs_rename_file_done, req);
1044 }
1045 
1046 int
1047 spdk_fs_rename_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel,
1048 		    const char *old_name, const char *new_name)
1049 {
1050 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
1051 	struct spdk_fs_request *req;
1052 	struct spdk_fs_cb_args *args;
1053 	int rc;
1054 
1055 	req = alloc_fs_request(channel);
1056 	assert(req != NULL);
1057 
1058 	args = &req->args;
1059 
1060 	args->fs = fs;
1061 	args->op.rename.old_name = old_name;
1062 	args->op.rename.new_name = new_name;
1063 	args->sem = &channel->sem;
1064 	fs->send_request(__fs_rename_file, req);
1065 	sem_wait(&channel->sem);
1066 	rc = args->rc;
1067 	free_fs_request(req);
1068 	return rc;
1069 }
1070 
1071 static void
1072 blob_delete_cb(void *ctx, int bserrno)
1073 {
1074 	struct spdk_fs_request *req = ctx;
1075 	struct spdk_fs_cb_args *args = &req->args;
1076 
1077 	args->fn.file_op(args->arg, bserrno);
1078 	free_fs_request(req);
1079 }
1080 
1081 void
1082 spdk_fs_delete_file_async(struct spdk_filesystem *fs, const char *name,
1083 			  spdk_file_op_complete cb_fn, void *cb_arg)
1084 {
1085 	struct spdk_file *f;
1086 	spdk_blob_id blobid;
1087 	struct spdk_fs_request *req;
1088 	struct spdk_fs_cb_args *args;
1089 
1090 	SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s\n", name);
1091 
1092 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
1093 		cb_fn(cb_arg, -ENAMETOOLONG);
1094 		return;
1095 	}
1096 
1097 	f = fs_find_file(fs, name);
1098 	if (f == NULL) {
1099 		cb_fn(cb_arg, -ENOENT);
1100 		return;
1101 	}
1102 
1103 	if (f->ref_count > 0) {
1104 		/* For now, do not allow deleting files with open references. */
1105 		cb_fn(cb_arg, -EBUSY);
1106 		return;
1107 	}
1108 
1109 	req = alloc_fs_request(fs->md_target.md_fs_channel);
1110 	if (req == NULL) {
1111 		cb_fn(cb_arg, -ENOMEM);
1112 		return;
1113 	}
1114 
1115 	TAILQ_REMOVE(&fs->files, f, tailq);
1116 
1117 	cache_free_buffers(f);
1118 
1119 	blobid = f->blobid;
1120 
1121 	free(f->name);
1122 	free(f->tree);
1123 	free(f);
1124 
1125 	args = &req->args;
1126 	args->fn.file_op = cb_fn;
1127 	args->arg = cb_arg;
1128 	spdk_bs_md_delete_blob(fs->bs, blobid, blob_delete_cb, req);
1129 }
1130 
1131 static void
1132 __fs_delete_file_done(void *arg, int fserrno)
1133 {
1134 	struct spdk_fs_request *req = arg;
1135 	struct spdk_fs_cb_args *args = &req->args;
1136 
1137 	args->rc = fserrno;
1138 	sem_post(args->sem);
1139 }
1140 
1141 static void
1142 __fs_delete_file(void *arg)
1143 {
1144 	struct spdk_fs_request *req = arg;
1145 	struct spdk_fs_cb_args *args = &req->args;
1146 
1147 	spdk_fs_delete_file_async(args->fs, args->op.delete.name, __fs_delete_file_done, req);
1148 }
1149 
1150 int
1151 spdk_fs_delete_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel,
1152 		    const char *name)
1153 {
1154 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
1155 	struct spdk_fs_request *req;
1156 	struct spdk_fs_cb_args *args;
1157 	int rc;
1158 
1159 	req = alloc_fs_request(channel);
1160 	assert(req != NULL);
1161 
1162 	args = &req->args;
1163 	args->fs = fs;
1164 	args->op.delete.name = name;
1165 	args->sem = &channel->sem;
1166 	fs->send_request(__fs_delete_file, req);
1167 	sem_wait(&channel->sem);
1168 	rc = args->rc;
1169 	free_fs_request(req);
1170 
1171 	return rc;
1172 }
1173 
1174 spdk_fs_iter
1175 spdk_fs_iter_first(struct spdk_filesystem *fs)
1176 {
1177 	struct spdk_file *f;
1178 
1179 	f = TAILQ_FIRST(&fs->files);
1180 	return f;
1181 }
1182 
1183 spdk_fs_iter
1184 spdk_fs_iter_next(spdk_fs_iter iter)
1185 {
1186 	struct spdk_file *f = iter;
1187 
1188 	if (f == NULL) {
1189 		return NULL;
1190 	}
1191 
1192 	f = TAILQ_NEXT(f, tailq);
1193 	return f;
1194 }
1195 
1196 const char *
1197 spdk_file_get_name(struct spdk_file *file)
1198 {
1199 	return file->name;
1200 }
1201 
1202 uint64_t
1203 spdk_file_get_length(struct spdk_file *file)
1204 {
1205 	assert(file != NULL);
1206 	SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s length=0x%jx\n", file->name, file->length);
1207 	return file->length;
1208 }
1209 
1210 static void
1211 fs_truncate_complete_cb(void *ctx, int bserrno)
1212 {
1213 	struct spdk_fs_request *req = ctx;
1214 	struct spdk_fs_cb_args *args = &req->args;
1215 
1216 	args->fn.file_op(args->arg, bserrno);
1217 	free_fs_request(req);
1218 }
1219 
/*
 * Number of clusters of size cluster_sz needed to hold `length` bytes
 *  (i.e. length / cluster_sz rounded up).
 *
 * Computed as quotient plus "is there a remainder" rather than
 *  (length + cluster_sz - 1) / cluster_sz, because the latter wraps around
 *  for lengths close to UINT64_MAX and would report too few clusters.
 *
 * cluster_sz must be non-zero.
 */
static uint64_t
__bytes_to_clusters(uint64_t length, uint64_t cluster_sz)
{
	uint64_t clusters = length / cluster_sz;

	if (length % cluster_sz != 0) {
		clusters++;
	}

	return clusters;
}
1225 
1226 void
1227 spdk_file_truncate_async(struct spdk_file *file, uint64_t length,
1228 			 spdk_file_op_complete cb_fn, void *cb_arg)
1229 {
1230 	struct spdk_filesystem *fs;
1231 	size_t num_clusters;
1232 	struct spdk_fs_request *req;
1233 	struct spdk_fs_cb_args *args;
1234 
1235 	SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s old=0x%jx new=0x%jx\n", file->name, file->length, length);
1236 	if (length == file->length) {
1237 		cb_fn(cb_arg, 0);
1238 		return;
1239 	}
1240 
1241 	req = alloc_fs_request(file->fs->md_target.md_fs_channel);
1242 	if (req == NULL) {
1243 		cb_fn(cb_arg, -ENOMEM);
1244 		return;
1245 	}
1246 
1247 	args = &req->args;
1248 	args->fn.file_op = cb_fn;
1249 	args->arg = cb_arg;
1250 	args->file = file;
1251 	fs = file->fs;
1252 
1253 	num_clusters = __bytes_to_clusters(length, fs->bs_opts.cluster_sz);
1254 
1255 	spdk_bs_md_resize_blob(file->blob, num_clusters);
1256 	spdk_blob_md_set_xattr(file->blob, "length", &length, sizeof(length));
1257 
1258 	file->length = length;
1259 	if (file->append_pos > file->length) {
1260 		file->append_pos = file->length;
1261 	}
1262 
1263 	spdk_bs_md_sync_blob(file->blob, fs_truncate_complete_cb, args);
1264 }
1265 
1266 static void
1267 __truncate(void *arg)
1268 {
1269 	struct spdk_fs_request *req = arg;
1270 	struct spdk_fs_cb_args *args = &req->args;
1271 
1272 	spdk_file_truncate_async(args->file, args->op.truncate.length,
1273 				 args->fn.file_op, args->arg);
1274 }
1275 
1276 void
1277 spdk_file_truncate(struct spdk_file *file, struct spdk_io_channel *_channel,
1278 		   uint64_t length)
1279 {
1280 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
1281 	struct spdk_fs_request *req;
1282 	struct spdk_fs_cb_args *args;
1283 
1284 	req = alloc_fs_request(channel);
1285 	assert(req != NULL);
1286 
1287 	args = &req->args;
1288 
1289 	args->file = file;
1290 	args->op.truncate.length = length;
1291 	args->fn.file_op = __sem_post;
1292 	args->arg = &channel->sem;
1293 
1294 	channel->send_request(__truncate, req);
1295 	sem_wait(&channel->sem);
1296 	free_fs_request(req);
1297 }
1298 
1299 static void
1300 __rw_done(void *ctx, int bserrno)
1301 {
1302 	struct spdk_fs_request *req = ctx;
1303 	struct spdk_fs_cb_args *args = &req->args;
1304 
1305 	spdk_free(args->op.rw.pin_buf);
1306 	args->fn.file_op(args->arg, bserrno);
1307 	free_fs_request(req);
1308 }
1309 
1310 static void
1311 __read_done(void *ctx, int bserrno)
1312 {
1313 	struct spdk_fs_request *req = ctx;
1314 	struct spdk_fs_cb_args *args = &req->args;
1315 
1316 	if (args->op.rw.is_read) {
1317 		memcpy(args->op.rw.user_buf,
1318 		       args->op.rw.pin_buf + (args->op.rw.offset & 0xFFF),
1319 		       args->op.rw.length);
1320 		__rw_done(req, 0);
1321 	} else {
1322 		memcpy(args->op.rw.pin_buf + (args->op.rw.offset & 0xFFF),
1323 		       args->op.rw.user_buf,
1324 		       args->op.rw.length);
1325 		spdk_bs_io_write_blob(args->file->blob, args->op.rw.channel,
1326 				      args->op.rw.pin_buf,
1327 				      args->op.rw.start_page, args->op.rw.num_pages,
1328 				      __rw_done, req);
1329 	}
1330 }
1331 
1332 static void
1333 __do_blob_read(void *ctx, int fserrno)
1334 {
1335 	struct spdk_fs_request *req = ctx;
1336 	struct spdk_fs_cb_args *args = &req->args;
1337 
1338 	spdk_bs_io_read_blob(args->file->blob, args->op.rw.channel,
1339 			     args->op.rw.pin_buf,
1340 			     args->op.rw.start_page, args->op.rw.num_pages,
1341 			     __read_done, req);
1342 }
1343 
1344 static void
1345 __get_page_parameters(struct spdk_file *file, uint64_t offset, uint64_t length,
1346 		      uint64_t *start_page, uint32_t *page_size, uint64_t *num_pages)
1347 {
1348 	uint64_t end_page;
1349 
1350 	*page_size = spdk_bs_get_page_size(file->fs->bs);
1351 	*start_page = offset / *page_size;
1352 	end_page = (offset + length - 1) / *page_size;
1353 	*num_pages = (end_page - *start_page + 1);
1354 }
1355 
1356 static void
1357 __readwrite(struct spdk_file *file, struct spdk_io_channel *_channel,
1358 	    void *payload, uint64_t offset, uint64_t length,
1359 	    spdk_file_op_complete cb_fn, void *cb_arg, int is_read)
1360 {
1361 	struct spdk_fs_request *req;
1362 	struct spdk_fs_cb_args *args;
1363 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
1364 	uint64_t start_page, num_pages, pin_buf_length;
1365 	uint32_t page_size;
1366 
1367 	if (is_read && offset + length > file->length) {
1368 		cb_fn(cb_arg, -EINVAL);
1369 		return;
1370 	}
1371 
1372 	req = alloc_fs_request(channel);
1373 	if (req == NULL) {
1374 		cb_fn(cb_arg, -ENOMEM);
1375 		return;
1376 	}
1377 
1378 	args = &req->args;
1379 	args->fn.file_op = cb_fn;
1380 	args->arg = cb_arg;
1381 	args->file = file;
1382 	args->op.rw.channel = channel->bs_channel;
1383 	args->op.rw.user_buf = payload;
1384 	args->op.rw.is_read = is_read;
1385 	args->op.rw.offset = offset;
1386 	args->op.rw.length = length;
1387 
1388 	__get_page_parameters(file, offset, length, &start_page, &page_size, &num_pages);
1389 	pin_buf_length = num_pages * page_size;
1390 	args->op.rw.pin_buf = spdk_malloc(pin_buf_length, 4096, NULL);
1391 
1392 	args->op.rw.start_page = start_page;
1393 	args->op.rw.num_pages = num_pages;
1394 
1395 	if (!is_read && file->length < offset + length) {
1396 		spdk_file_truncate_async(file, offset + length, __do_blob_read, req);
1397 	} else {
1398 		__do_blob_read(req, 0);
1399 	}
1400 }
1401 
1402 void
1403 spdk_file_write_async(struct spdk_file *file, struct spdk_io_channel *channel,
1404 		      void *payload, uint64_t offset, uint64_t length,
1405 		      spdk_file_op_complete cb_fn, void *cb_arg)
1406 {
1407 	__readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 0);
1408 }
1409 
1410 void
1411 spdk_file_read_async(struct spdk_file *file, struct spdk_io_channel *channel,
1412 		     void *payload, uint64_t offset, uint64_t length,
1413 		     spdk_file_op_complete cb_fn, void *cb_arg)
1414 {
1415 	SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s offset=%jx length=%jx\n",
1416 		      file->name, offset, length);
1417 	__readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 1);
1418 }
1419 
1420 struct spdk_io_channel *
1421 spdk_fs_alloc_io_channel(struct spdk_filesystem *fs, uint32_t priority)
1422 {
1423 	struct spdk_io_channel *io_channel;
1424 	struct spdk_fs_channel *fs_channel;
1425 
1426 	io_channel = spdk_get_io_channel(&fs->io_target, priority, false, NULL);
1427 	fs_channel = spdk_io_channel_get_ctx(io_channel);
1428 	fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs, SPDK_IO_PRIORITY_DEFAULT);
1429 	fs_channel->send_request = __send_request_direct;
1430 
1431 	return io_channel;
1432 }
1433 
1434 struct spdk_io_channel *
1435 spdk_fs_alloc_io_channel_sync(struct spdk_filesystem *fs, uint32_t priority)
1436 {
1437 	struct spdk_io_channel *io_channel;
1438 	struct spdk_fs_channel *fs_channel;
1439 
1440 	io_channel = spdk_get_io_channel(&fs->io_target, priority, false, NULL);
1441 	fs_channel = spdk_io_channel_get_ctx(io_channel);
1442 	fs_channel->send_request = fs->send_request;
1443 
1444 	return io_channel;
1445 }
1446 
/*
 * Release a channel obtained from spdk_fs_alloc_io_channel() or
 *  spdk_fs_alloc_io_channel_sync() by handing it to spdk_put_io_channel().
 */
void
spdk_fs_free_io_channel(struct spdk_io_channel *channel)
{
	spdk_put_io_channel(channel);
}
1452 
1453 void
1454 spdk_fs_set_cache_size(uint64_t size_in_mb)
1455 {
1456 	g_fs_cache_size = size_in_mb * 1024 * 1024;
1457 }
1458 
1459 uint64_t
1460 spdk_fs_get_cache_size(void)
1461 {
1462 	return g_fs_cache_size / (1024 * 1024);
1463 }
1464 
/*
 * Forward declaration: __file_flush() is defined further down, but
 *  __file_flush_done() needs to call it to continue flushing.
 */
static void __file_flush(void *_args);
1466 
1467 static void *
1468 alloc_cache_memory_buffer(struct spdk_file *context)
1469 {
1470 	struct spdk_file *file;
1471 	void *buf;
1472 
1473 	buf = spdk_mempool_get(g_cache_pool);
1474 	if (buf != NULL) {
1475 		return buf;
1476 	}
1477 
1478 	pthread_spin_lock(&g_caches_lock);
1479 	TAILQ_FOREACH(file, &g_caches, cache_tailq) {
1480 		if (!file->open_for_writing &&
1481 		    file->priority == SPDK_FILE_PRIORITY_LOW &&
1482 		    file != context) {
1483 			TAILQ_REMOVE(&g_caches, file, cache_tailq);
1484 			TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq);
1485 			break;
1486 		}
1487 	}
1488 	pthread_spin_unlock(&g_caches_lock);
1489 	if (file != NULL) {
1490 		cache_free_buffers(file);
1491 		buf = spdk_mempool_get(g_cache_pool);
1492 		if (buf != NULL) {
1493 			return buf;
1494 		}
1495 	}
1496 
1497 	pthread_spin_lock(&g_caches_lock);
1498 	TAILQ_FOREACH(file, &g_caches, cache_tailq) {
1499 		if (!file->open_for_writing && file != context) {
1500 			TAILQ_REMOVE(&g_caches, file, cache_tailq);
1501 			TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq);
1502 			break;
1503 		}
1504 	}
1505 	pthread_spin_unlock(&g_caches_lock);
1506 	if (file != NULL) {
1507 		cache_free_buffers(file);
1508 		buf = spdk_mempool_get(g_cache_pool);
1509 		if (buf != NULL) {
1510 			return buf;
1511 		}
1512 	}
1513 
1514 	pthread_spin_lock(&g_caches_lock);
1515 	TAILQ_FOREACH(file, &g_caches, cache_tailq) {
1516 		if (file != context) {
1517 			TAILQ_REMOVE(&g_caches, file, cache_tailq);
1518 			TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq);
1519 			break;
1520 		}
1521 	}
1522 	pthread_spin_unlock(&g_caches_lock);
1523 	if (file != NULL) {
1524 		cache_free_buffers(file);
1525 		buf = spdk_mempool_get(g_cache_pool);
1526 		if (buf != NULL) {
1527 			return buf;
1528 		}
1529 	}
1530 
1531 	assert(false);
1532 	return NULL;
1533 }
1534 
1535 static struct cache_buffer *
1536 cache_insert_buffer(struct spdk_file *file, uint64_t offset)
1537 {
1538 	struct cache_buffer *buf;
1539 	int count = 0;
1540 
1541 	buf = calloc(1, sizeof(*buf));
1542 	if (buf == NULL) {
1543 		SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "calloc failed\n");
1544 		return NULL;
1545 	}
1546 
1547 	buf->buf = alloc_cache_memory_buffer(file);
1548 	if (buf->buf == NULL) {
1549 		while (buf->buf == NULL) {
1550 			/*
1551 			 * TODO: alloc_cache_memory_buffer() should eventually free
1552 			 *  some buffers.  Need a more sophisticated check here, instead
1553 			 *  of just bailing if 100 tries does not result in getting a
1554 			 *  free buffer.  This will involve using the sync channel's
1555 			 *  semaphore to block until a buffer becomes available.
1556 			 */
1557 			if (count++ == 100) {
1558 				SPDK_ERRLOG("could not allocate cache buffer\n");
1559 				assert(false);
1560 				free(buf);
1561 				return NULL;
1562 			}
1563 			buf->buf = alloc_cache_memory_buffer(file);
1564 		}
1565 	}
1566 
1567 	buf->buf_size = CACHE_BUFFER_SIZE;
1568 	buf->offset = offset;
1569 
1570 	pthread_spin_lock(&g_caches_lock);
1571 	if (file->tree->present_mask == 0) {
1572 		TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq);
1573 	}
1574 	file->tree = spdk_tree_insert_buffer(file->tree, buf);
1575 	pthread_spin_unlock(&g_caches_lock);
1576 
1577 	return buf;
1578 }
1579 
1580 static struct cache_buffer *
1581 cache_append_buffer(struct spdk_file *file)
1582 {
1583 	struct cache_buffer *last;
1584 
1585 	assert(file->last == NULL || file->last->bytes_filled == file->last->buf_size);
1586 	assert((file->append_pos % CACHE_BUFFER_SIZE) == 0);
1587 
1588 	last = cache_insert_buffer(file, file->append_pos);
1589 	if (last == NULL) {
1590 		SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "cache_insert_buffer failed\n");
1591 		return NULL;
1592 	}
1593 
1594 	if (file->last != NULL) {
1595 		file->last->next = last;
1596 	}
1597 	file->last = last;
1598 
1599 	return last;
1600 }
1601 
1602 static void
1603 __wake_caller(struct spdk_fs_cb_args *args)
1604 {
1605 	sem_post(args->sem);
1606 }
1607 
1608 static void
1609 __file_cache_finish_sync(struct spdk_file *file)
1610 {
1611 	struct spdk_fs_request *sync_req;
1612 	struct spdk_fs_cb_args *sync_args;
1613 
1614 	pthread_spin_lock(&file->lock);
1615 	while (!TAILQ_EMPTY(&file->sync_requests)) {
1616 		sync_req = TAILQ_FIRST(&file->sync_requests);
1617 		sync_args = &sync_req->args;
1618 		if (sync_args->op.sync.offset > file->length_flushed) {
1619 			break;
1620 		}
1621 		BLOBFS_TRACE(file, "sync done offset=%jx\n", sync_args->op.sync.offset);
1622 		TAILQ_REMOVE(&file->sync_requests, sync_req, args.op.sync.tailq);
1623 		pthread_spin_unlock(&file->lock);
1624 		sync_args->fn.file_op(sync_args->arg, 0);
1625 		pthread_spin_lock(&file->lock);
1626 		free_fs_request(sync_req);
1627 	}
1628 	pthread_spin_unlock(&file->lock);
1629 }
1630 
/*
 * Blobstore completion adapter: the context is the file, and the status is
 *  not examined.  Simply runs __file_cache_finish_sync() on the file.
 */
static void
__file_cache_finish_sync_bs_cb(void *ctx, int bserrno)
{
	__file_cache_finish_sync((struct spdk_file *)ctx);
}
1638 
1639 static void
1640 __free_args(struct spdk_fs_cb_args *args)
1641 {
1642 	struct spdk_fs_request *req;
1643 
1644 	if (!args->from_request) {
1645 		free(args);
1646 	} else {
1647 		/* Depends on args being at the start of the spdk_fs_request structure. */
1648 		req = (struct spdk_fs_request *)args;
1649 		free_fs_request(req);
1650 	}
1651 }
1652 
1653 static void
1654 __file_flush_done(void *arg, int bserrno)
1655 {
1656 	struct spdk_fs_cb_args *args = arg;
1657 	struct spdk_fs_request *sync_req;
1658 	struct spdk_file *file = args->file;
1659 	struct cache_buffer *next = args->op.flush.cache_buffer;
1660 
1661 	BLOBFS_TRACE(file, "length=%jx\n", args->op.flush.length);
1662 
1663 	pthread_spin_lock(&file->lock);
1664 	next->in_progress = false;
1665 	next->bytes_flushed += args->op.flush.length;
1666 	file->length_flushed += args->op.flush.length;
1667 	if (file->length_flushed > file->length) {
1668 		file->length = file->length_flushed;
1669 	}
1670 	if (next->bytes_flushed == next->buf_size) {
1671 		BLOBFS_TRACE(file, "write buffer fully flushed 0x%jx\n", file->length_flushed);
1672 		next = spdk_tree_find_buffer(file->tree, file->length_flushed);
1673 	}
1674 
1675 	TAILQ_FOREACH_REVERSE(sync_req, &file->sync_requests, sync_requests_head, args.op.sync.tailq) {
1676 		if (sync_req->args.op.sync.offset <= file->length_flushed) {
1677 			break;
1678 		}
1679 	}
1680 
1681 	/*
1682 	 * Assert that there is no cached data that extends past the end of the underlying
1683 	 *  blob.
1684 	 */
1685 	assert(next == NULL || next->offset < __file_get_blob_size(file) ||
1686 	       next->bytes_filled == 0);
1687 
1688 	if (sync_req != NULL) {
1689 		BLOBFS_TRACE(file, "set xattr length 0x%jx\n", file->length_flushed);
1690 		spdk_blob_md_set_xattr(file->blob, "length", &file->length_flushed,
1691 				       sizeof(file->length_flushed));
1692 
1693 		pthread_spin_unlock(&file->lock);
1694 		spdk_bs_md_sync_blob(file->blob, __file_cache_finish_sync_bs_cb, file);
1695 	} else {
1696 		pthread_spin_unlock(&file->lock);
1697 		__file_cache_finish_sync(file);
1698 	}
1699 
1700 	__file_flush(args);
1701 }
1702 
1703 static void
1704 __file_flush(void *_args)
1705 {
1706 	struct spdk_fs_cb_args *args = _args;
1707 	struct spdk_file *file = args->file;
1708 	struct cache_buffer *next;
1709 	uint64_t offset, length, start_page, num_pages;
1710 	uint32_t page_size;
1711 
1712 	pthread_spin_lock(&file->lock);
1713 	next = spdk_tree_find_buffer(file->tree, file->length_flushed);
1714 	if (next == NULL || next->in_progress) {
1715 		/*
1716 		 * There is either no data to flush, or a flush I/O is already in
1717 		 *  progress.  So return immediately - if a flush I/O is in
1718 		 *  progress we will flush more data after that is completed.
1719 		 */
1720 		__free_args(args);
1721 		pthread_spin_unlock(&file->lock);
1722 		return;
1723 	}
1724 
1725 	offset = next->offset + next->bytes_flushed;
1726 	length = next->bytes_filled - next->bytes_flushed;
1727 	if (length == 0) {
1728 		__free_args(args);
1729 		pthread_spin_unlock(&file->lock);
1730 		return;
1731 	}
1732 	args->op.flush.length = length;
1733 	args->op.flush.cache_buffer = next;
1734 
1735 	__get_page_parameters(file, offset, length, &start_page, &page_size, &num_pages);
1736 
1737 	next->in_progress = true;
1738 	BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n",
1739 		     offset, length, start_page, num_pages);
1740 	pthread_spin_unlock(&file->lock);
1741 	spdk_bs_io_write_blob(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel,
1742 			      next->buf + (start_page * page_size) - next->offset,
1743 			      start_page, num_pages,
1744 			      __file_flush_done, args);
1745 }
1746 
/*
 * Completion of the metadata sync started by __file_extend_blob(): wakes the
 *  caller waiting on the semaphore in args.  The status is not examined.
 */
static void
__file_extend_done(void *arg, int bserrno)
{
	__wake_caller((struct spdk_fs_cb_args *)arg);
}
1754 
1755 static void
1756 __file_extend_blob(void *_args)
1757 {
1758 	struct spdk_fs_cb_args *args = _args;
1759 	struct spdk_file *file = args->file;
1760 
1761 	spdk_bs_md_resize_blob(file->blob, args->op.resize.num_clusters);
1762 
1763 	spdk_bs_md_sync_blob(file->blob, __file_extend_done, args);
1764 }
1765 
/*
 * Completion of the asynchronous read/write started by __rw_from_file():
 *  wakes the waiting caller and then releases the args.  The status is not
 *  examined.
 */
static void
__rw_from_file_done(void *arg, int bserrno)
{
	struct spdk_fs_cb_args *rw_args = arg;

	__wake_caller(rw_args);
	__free_args(rw_args);
}
1774 
1775 static void
1776 __rw_from_file(void *_args)
1777 {
1778 	struct spdk_fs_cb_args *args = _args;
1779 	struct spdk_file *file = args->file;
1780 
1781 	if (args->op.rw.is_read) {
1782 		spdk_file_read_async(file, file->fs->sync_target.sync_io_channel, args->op.rw.user_buf,
1783 				     args->op.rw.offset, args->op.rw.length,
1784 				     __rw_from_file_done, args);
1785 	} else {
1786 		spdk_file_write_async(file, file->fs->sync_target.sync_io_channel, args->op.rw.user_buf,
1787 				      args->op.rw.offset, args->op.rw.length,
1788 				      __rw_from_file_done, args);
1789 	}
1790 }
1791 
1792 static int
1793 __send_rw_from_file(struct spdk_file *file, sem_t *sem, void *payload,
1794 		    uint64_t offset, uint64_t length, bool is_read)
1795 {
1796 	struct spdk_fs_cb_args *args;
1797 
1798 	args = calloc(1, sizeof(*args));
1799 	if (args == NULL) {
1800 		sem_post(sem);
1801 		return -ENOMEM;
1802 	}
1803 
1804 	args->file = file;
1805 	args->sem = sem;
1806 	args->op.rw.user_buf = payload;
1807 	args->op.rw.offset = offset;
1808 	args->op.rw.length = length;
1809 	args->op.rw.is_read = is_read;
1810 	file->fs->send_request(__rw_from_file, args);
1811 	return 0;
1812 }
1813 
1814 int
1815 spdk_file_write(struct spdk_file *file, struct spdk_io_channel *_channel,
1816 		void *payload, uint64_t offset, uint64_t length)
1817 {
1818 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
1819 	struct spdk_fs_cb_args *args;
1820 	uint64_t rem_length, copy, blob_size, cluster_sz;
1821 	uint32_t cache_buffers_filled = 0;
1822 	uint8_t *cur_payload;
1823 	struct cache_buffer *last;
1824 
1825 	BLOBFS_TRACE_RW(file, "offset=%jx length=%jx\n", offset, length);
1826 
1827 	if (length == 0) {
1828 		return 0;
1829 	}
1830 
1831 	if (offset != file->append_pos) {
1832 		BLOBFS_TRACE(file, " error offset=%jx append_pos=%jx\n", offset, file->append_pos);
1833 		return -EINVAL;
1834 	}
1835 
1836 	pthread_spin_lock(&file->lock);
1837 	file->open_for_writing = true;
1838 
1839 	if (file->last == NULL) {
1840 		if (file->append_pos % CACHE_BUFFER_SIZE == 0) {
1841 			cache_append_buffer(file);
1842 		} else {
1843 			int rc;
1844 
1845 			file->append_pos += length;
1846 			rc = __send_rw_from_file(file, &channel->sem, payload,
1847 						 offset, length, false);
1848 			pthread_spin_unlock(&file->lock);
1849 			sem_wait(&channel->sem);
1850 			return rc;
1851 		}
1852 	}
1853 
1854 	blob_size = __file_get_blob_size(file);
1855 
1856 	if ((offset + length) > blob_size) {
1857 		struct spdk_fs_cb_args extend_args = {};
1858 
1859 		cluster_sz = file->fs->bs_opts.cluster_sz;
1860 		extend_args.sem = &channel->sem;
1861 		extend_args.op.resize.num_clusters = __bytes_to_clusters((offset + length), cluster_sz);
1862 		extend_args.file = file;
1863 		BLOBFS_TRACE(file, "start resize to %u clusters\n", extend_args.op.resize.num_clusters);
1864 		pthread_spin_unlock(&file->lock);
1865 		file->fs->send_request(__file_extend_blob, &extend_args);
1866 		sem_wait(&channel->sem);
1867 	}
1868 
1869 	last = file->last;
1870 	rem_length = length;
1871 	cur_payload = payload;
1872 	while (rem_length > 0) {
1873 		copy = last->buf_size - last->bytes_filled;
1874 		if (copy > rem_length) {
1875 			copy = rem_length;
1876 		}
1877 		BLOBFS_TRACE_RW(file, "  fill offset=%jx length=%jx\n", file->append_pos, copy);
1878 		memcpy(&last->buf[last->bytes_filled], cur_payload, copy);
1879 		file->append_pos += copy;
1880 		if (file->length < file->append_pos) {
1881 			file->length = file->append_pos;
1882 		}
1883 		cur_payload += copy;
1884 		last->bytes_filled += copy;
1885 		rem_length -= copy;
1886 		if (last->bytes_filled == last->buf_size) {
1887 			cache_buffers_filled++;
1888 			last = cache_append_buffer(file);
1889 			if (last == NULL) {
1890 				BLOBFS_TRACE(file, "nomem\n");
1891 				pthread_spin_unlock(&file->lock);
1892 				return -ENOMEM;
1893 			}
1894 		}
1895 	}
1896 
1897 	if (cache_buffers_filled == 0) {
1898 		pthread_spin_unlock(&file->lock);
1899 		return 0;
1900 	}
1901 
1902 	args = calloc(1, sizeof(*args));
1903 	if (args == NULL) {
1904 		pthread_spin_unlock(&file->lock);
1905 		return -ENOMEM;
1906 	}
1907 
1908 	args->file = file;
1909 	file->fs->send_request(__file_flush, args);
1910 	pthread_spin_unlock(&file->lock);
1911 	return 0;
1912 }
1913 
1914 static void
1915 __readahead_done(void *arg, int bserrno)
1916 {
1917 	struct spdk_fs_cb_args *args = arg;
1918 	struct cache_buffer *cache_buffer = args->op.readahead.cache_buffer;
1919 	struct spdk_file *file = args->file;
1920 
1921 	BLOBFS_TRACE(file, "offset=%jx\n", cache_buffer->offset);
1922 
1923 	pthread_spin_lock(&file->lock);
1924 	cache_buffer->bytes_filled = args->op.readahead.length;
1925 	cache_buffer->bytes_flushed = args->op.readahead.length;
1926 	cache_buffer->in_progress = false;
1927 	pthread_spin_unlock(&file->lock);
1928 
1929 	__free_args(args);
1930 }
1931 
1932 static void
1933 __readahead(void *_args)
1934 {
1935 	struct spdk_fs_cb_args *args = _args;
1936 	struct spdk_file *file = args->file;
1937 	uint64_t offset, length, start_page, num_pages;
1938 	uint32_t page_size;
1939 
1940 	offset = args->op.readahead.offset;
1941 	length = args->op.readahead.length;
1942 	assert(length > 0);
1943 
1944 	__get_page_parameters(file, offset, length, &start_page, &page_size, &num_pages);
1945 
1946 	BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n",
1947 		     offset, length, start_page, num_pages);
1948 	spdk_bs_io_read_blob(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel,
1949 			     args->op.readahead.cache_buffer->buf,
1950 			     start_page, num_pages,
1951 			     __readahead_done, args);
1952 }
1953 
1954 static uint64_t
1955 __next_cache_buffer_offset(uint64_t offset)
1956 {
1957 	return (offset + CACHE_BUFFER_SIZE) & ~(CACHE_TREE_LEVEL_MASK(0));
1958 }
1959 
1960 static void
1961 check_readahead(struct spdk_file *file, uint64_t offset)
1962 {
1963 	struct spdk_fs_cb_args *args;
1964 
1965 	offset = __next_cache_buffer_offset(offset);
1966 	if (spdk_tree_find_buffer(file->tree, offset) != NULL || file->length <= offset) {
1967 		return;
1968 	}
1969 
1970 	args = calloc(1, sizeof(*args));
1971 	if (args == NULL) {
1972 		return;
1973 	}
1974 
1975 	BLOBFS_TRACE(file, "offset=%jx\n", offset);
1976 
1977 	args->file = file;
1978 	args->op.readahead.offset = offset;
1979 	args->op.readahead.cache_buffer = cache_insert_buffer(file, offset);
1980 	args->op.readahead.cache_buffer->in_progress = true;
1981 	if (file->length < (offset + CACHE_BUFFER_SIZE)) {
1982 		args->op.readahead.length = file->length & (CACHE_BUFFER_SIZE - 1);
1983 	} else {
1984 		args->op.readahead.length = CACHE_BUFFER_SIZE;
1985 	}
1986 	file->fs->send_request(__readahead, args);
1987 }
1988 
1989 static int
1990 __file_read(struct spdk_file *file, void *payload, uint64_t offset, uint64_t length, sem_t *sem)
1991 {
1992 	struct cache_buffer *buf;
1993 
1994 	buf = spdk_tree_find_filled_buffer(file->tree, offset);
1995 	if (buf == NULL) {
1996 		return __send_rw_from_file(file, sem, payload, offset, length, true);
1997 	}
1998 
1999 	if ((offset + length) > (buf->offset + buf->bytes_filled)) {
2000 		length = buf->offset + buf->bytes_filled - offset;
2001 	}
2002 	BLOBFS_TRACE(file, "read %p offset=%ju length=%ju\n", payload, offset, length);
2003 	memcpy(payload, &buf->buf[offset - buf->offset], length);
2004 	if ((offset + length) % CACHE_BUFFER_SIZE == 0) {
2005 		pthread_spin_lock(&g_caches_lock);
2006 		spdk_tree_remove_buffer(file->tree, buf);
2007 		if (file->tree->present_mask == 0) {
2008 			TAILQ_REMOVE(&g_caches, file, cache_tailq);
2009 		}
2010 		pthread_spin_unlock(&g_caches_lock);
2011 	}
2012 
2013 	sem_post(sem);
2014 	return 0;
2015 }
2016 
2017 int64_t
2018 spdk_file_read(struct spdk_file *file, struct spdk_io_channel *_channel,
2019 	       void *payload, uint64_t offset, uint64_t length)
2020 {
2021 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
2022 	uint64_t final_offset, final_length;
2023 	uint32_t sub_reads = 0;
2024 	int rc = 0;
2025 
2026 	pthread_spin_lock(&file->lock);
2027 
2028 	BLOBFS_TRACE_RW(file, "offset=%ju length=%ju\n", offset, length);
2029 
2030 	file->open_for_writing = false;
2031 
2032 	if (length == 0 || offset >= file->length) {
2033 		pthread_spin_unlock(&file->lock);
2034 		return 0;
2035 	}
2036 
2037 	if (offset + length > file->length) {
2038 		length = file->length - offset;
2039 	}
2040 
2041 	if (offset != file->next_seq_offset) {
2042 		file->seq_byte_count = 0;
2043 	}
2044 	file->seq_byte_count += length;
2045 	file->next_seq_offset = offset + length;
2046 	if (file->seq_byte_count >= CACHE_READAHEAD_THRESHOLD) {
2047 		check_readahead(file, offset);
2048 		check_readahead(file, offset + CACHE_BUFFER_SIZE);
2049 	}
2050 
2051 	final_length = 0;
2052 	final_offset = offset + length;
2053 	while (offset < final_offset) {
2054 		length = NEXT_CACHE_BUFFER_OFFSET(offset) - offset;
2055 		if (length > (final_offset - offset)) {
2056 			length = final_offset - offset;
2057 		}
2058 		rc = __file_read(file, payload, offset, length, &channel->sem);
2059 		if (rc == 0) {
2060 			final_length += length;
2061 		} else {
2062 			break;
2063 		}
2064 		payload += length;
2065 		offset += length;
2066 		sub_reads++;
2067 	}
2068 	pthread_spin_unlock(&file->lock);
2069 	while (sub_reads-- > 0) {
2070 		sem_wait(&channel->sem);
2071 	}
2072 	if (rc == 0) {
2073 		return final_length;
2074 	} else {
2075 		return rc;
2076 	}
2077 }
2078 
2079 static void
2080 _file_sync(struct spdk_file *file, struct spdk_fs_channel *channel,
2081 	   spdk_file_op_complete cb_fn, void *cb_arg)
2082 {
2083 	struct spdk_fs_request *sync_req;
2084 	struct spdk_fs_request *flush_req;
2085 	struct spdk_fs_cb_args *sync_args;
2086 	struct spdk_fs_cb_args *flush_args;
2087 
2088 	BLOBFS_TRACE(file, "offset=%jx\n", file->append_pos);
2089 
2090 	pthread_spin_lock(&file->lock);
2091 	if (file->append_pos <= file->length_flushed || file->last == NULL) {
2092 		BLOBFS_TRACE(file, "done - no data to flush\n");
2093 		pthread_spin_unlock(&file->lock);
2094 		cb_fn(cb_arg, 0);
2095 		return;
2096 	}
2097 
2098 	sync_req = alloc_fs_request(channel);
2099 	assert(sync_req != NULL);
2100 	sync_args = &sync_req->args;
2101 
2102 	flush_req = alloc_fs_request(channel);
2103 	assert(flush_req != NULL);
2104 	flush_args = &flush_req->args;
2105 
2106 	sync_args->file = file;
2107 	sync_args->fn.file_op = cb_fn;
2108 	sync_args->arg = cb_arg;
2109 	sync_args->op.sync.offset = file->append_pos;
2110 	TAILQ_INSERT_TAIL(&file->sync_requests, sync_req, args.op.sync.tailq);
2111 	pthread_spin_unlock(&file->lock);
2112 
2113 	flush_args->file = file;
2114 	channel->send_request(__file_flush, flush_args);
2115 }
2116 
2117 int
2118 spdk_file_sync(struct spdk_file *file, struct spdk_io_channel *_channel)
2119 {
2120 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
2121 
2122 	_file_sync(file, channel, __sem_post, &channel->sem);
2123 	sem_wait(&channel->sem);
2124 
2125 	return 0;
2126 }
2127 
2128 void
2129 spdk_file_sync_async(struct spdk_file *file, struct spdk_io_channel *_channel,
2130 		     spdk_file_op_complete cb_fn, void *cb_arg)
2131 {
2132 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
2133 
2134 	_file_sync(file, channel, cb_fn, cb_arg);
2135 }
2136 
2137 void
2138 spdk_file_set_priority(struct spdk_file *file, uint32_t priority)
2139 {
2140 	BLOBFS_TRACE(file, "priority=%u\n", priority);
2141 	file->priority = priority;
2142 
2143 }
2144 
2145 /*
2146  * Close routines
2147  */
2148 
2149 static void
2150 __file_close_async_done(void *ctx, int bserrno)
2151 {
2152 	struct spdk_fs_request *req = ctx;
2153 	struct spdk_fs_cb_args *args = &req->args;
2154 
2155 	args->fn.file_op(args->arg, bserrno);
2156 	free_fs_request(req);
2157 }
2158 
2159 static void
2160 __file_close_async(struct spdk_file *file, struct spdk_fs_request *req)
2161 {
2162 	pthread_spin_lock(&file->lock);
2163 	if (file->ref_count == 0) {
2164 		pthread_spin_unlock(&file->lock);
2165 		__file_close_async_done(req, -EBADF);
2166 		return;
2167 	}
2168 
2169 	file->ref_count--;
2170 	if (file->ref_count > 0) {
2171 		pthread_spin_unlock(&file->lock);
2172 		__file_close_async_done(req, 0);
2173 		return;
2174 	}
2175 
2176 	pthread_spin_unlock(&file->lock);
2177 
2178 	spdk_bs_md_close_blob(&file->blob, __file_close_async_done, req);
2179 }
2180 
2181 static void
2182 __file_close_async__sync_done(void *arg, int fserrno)
2183 {
2184 	struct spdk_fs_request *req = arg;
2185 	struct spdk_fs_cb_args *args = &req->args;
2186 
2187 	__file_close_async(args->file, req);
2188 }
2189 
2190 void
2191 spdk_file_close_async(struct spdk_file *file, spdk_file_op_complete cb_fn, void *cb_arg)
2192 {
2193 	struct spdk_fs_request *req;
2194 	struct spdk_fs_cb_args *args;
2195 
2196 	req = alloc_fs_request(file->fs->md_target.md_fs_channel);
2197 	if (req == NULL) {
2198 		cb_fn(cb_arg, -ENOMEM);
2199 		return;
2200 	}
2201 
2202 	args = &req->args;
2203 	args->file = file;
2204 	args->fn.file_op = cb_fn;
2205 	args->arg = cb_arg;
2206 
2207 	spdk_file_sync_async(file, file->fs->md_target.md_io_channel, __file_close_async__sync_done, req);
2208 }
2209 
2210 static void
2211 __file_close_done(void *arg, int fserrno)
2212 {
2213 	struct spdk_fs_cb_args *args = arg;
2214 
2215 	args->rc = fserrno;
2216 	sem_post(args->sem);
2217 }
2218 
2219 static void
2220 __file_close(void *arg)
2221 {
2222 	struct spdk_fs_request *req = arg;
2223 	struct spdk_fs_cb_args *args = &req->args;
2224 	struct spdk_file *file = args->file;
2225 
2226 	__file_close_async(file, req);
2227 }
2228 
2229 int
2230 spdk_file_close(struct spdk_file *file, struct spdk_io_channel *_channel)
2231 {
2232 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
2233 	struct spdk_fs_request *req;
2234 	struct spdk_fs_cb_args *args;
2235 
2236 	req = alloc_fs_request(channel);
2237 	assert(req != NULL);
2238 
2239 	args = &req->args;
2240 
2241 	spdk_file_sync(file, _channel);
2242 	BLOBFS_TRACE(file, "name=%s\n", file->name);
2243 	args->file = file;
2244 	args->sem = &channel->sem;
2245 	args->fn.file_op = __file_close_done;
2246 	args->arg = req;
2247 	channel->send_request(__file_close, req);
2248 	sem_wait(&channel->sem);
2249 
2250 	return args->rc;
2251 }
2252 
2253 static void
2254 cache_free_buffers(struct spdk_file *file)
2255 {
2256 	BLOBFS_TRACE(file, "free=%s\n", file->name);
2257 	pthread_spin_lock(&file->lock);
2258 	pthread_spin_lock(&g_caches_lock);
2259 	if (file->tree->present_mask == 0) {
2260 		pthread_spin_unlock(&g_caches_lock);
2261 		pthread_spin_unlock(&file->lock);
2262 		return;
2263 	}
2264 	spdk_tree_free_buffers(file->tree);
2265 	if (file->tree->present_mask == 0) {
2266 		TAILQ_REMOVE(&g_caches, file, cache_tailq);
2267 	}
2268 	file->last = NULL;
2269 	pthread_spin_unlock(&g_caches_lock);
2270 	pthread_spin_unlock(&file->lock);
2271 }
2272 
/*
 * Register two log trace flags, named "blobfs" and "blobfs_rw", tied to
 * SPDK_TRACE_BLOBFS and SPDK_TRACE_BLOBFS_RW respectively.
 */
SPDK_LOG_REGISTER_TRACE_FLAG("blobfs", SPDK_TRACE_BLOBFS);
SPDK_LOG_REGISTER_TRACE_FLAG("blobfs_rw", SPDK_TRACE_BLOBFS_RW);
2275