xref: /spdk/lib/blobfs/blobfs.c (revision 305cb239d25bcbf6070eb280eb47588fca3e7b36)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 
36 #include "spdk/blobfs.h"
37 #include "blobfs_internal.h"
38 
39 #include "spdk/queue.h"
40 #include "spdk/io_channel.h"
41 #include "spdk/assert.h"
42 #include "spdk/env.h"
43 #include "spdk/util.h"
44 #include "spdk_internal/log.h"
45 
/*
 * Trace helpers.  Both prepend "file=<name> " to the message, taking the name
 *  from file->name, and differ only in the trace flag they log under.
 */
#define BLOBFS_TRACE(file, str, args...) \
	SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s " str, file->name, ##args)

#define BLOBFS_TRACE_RW(file, str, args...) \
	SPDK_TRACELOG(SPDK_TRACE_BLOBFS_RW, "file=%s " str, file->name, ##args)

/* Default value of g_fs_cache_size: 4 GiB. */
#define BLOBFS_CACHE_SIZE (4ULL * 1024 * 1024 * 1024)

/* Cache size in bytes; divided by CACHE_BUFFER_SIZE to size g_cache_pool. */
static uint64_t g_fs_cache_size = BLOBFS_CACHE_SIZE;
/* Pool of cache buffers, created on first use by __initialize_cache(). */
static struct spdk_mempool *g_cache_pool;
/* List of files; NOTE(review): presumably the files that currently hold cache buffers - confirm. */
static TAILQ_HEAD(, spdk_file) g_caches;
/* Initialized together with g_caches; NOTE(review): presumably protects g_caches - confirm. */
static pthread_spinlock_t g_caches_lock;
58 
/*
 * Completion callback that wakes a waiter: posts the semaphore passed as the
 *  callback argument.  The status code is intentionally not used.
 */
static void
__sem_post(void *arg, int bserrno)
{
	(void)bserrno;
	sem_post((sem_t *)arg);
}
66 
67 void
68 spdk_cache_buffer_free(struct cache_buffer *cache_buffer)
69 {
70 	spdk_mempool_put(g_cache_pool, cache_buffer->buf);
71 	free(cache_buffer);
72 }
73 
/* 128 KiB.  NOTE(review): used outside this view - presumably the sequential-read amount that triggers readahead; confirm. */
#define CACHE_READAHEAD_THRESHOLD	(128 * 1024)

/* In-memory state for one file of a blobfs filesystem. */
struct spdk_file {
	/* Owning filesystem; set by file_alloc(). */
	struct spdk_filesystem	*fs;
	/* Open blob handle; assigned when a blob open completes. */
	struct spdk_blob	*blob;
	/* Heap-allocated (strdup) file name; freed when the file is deleted or renamed. */
	char			*name;
	/* File length; loaded from the blob's "length" xattr. */
	uint64_t		length;
	bool			open_for_writing;
	/* Initialized to the stored length at load time. */
	uint64_t		length_flushed;
	/* Initialized to the stored length at load time. */
	uint64_t		append_pos;
	uint64_t		seq_byte_count;
	uint64_t		next_seq_offset;
	/* Defaults to SPDK_FILE_PRIORITY_LOW in file_alloc(). */
	uint32_t		priority;
	/* Link in spdk_filesystem.files. */
	TAILQ_ENTRY(spdk_file)	tailq;
	/* Id of the blob backing this file. */
	spdk_blob_id		blobid;
	/* Incremented for every open request; deletion is refused while non-zero. */
	uint32_t		ref_count;
	/* Spin lock initialized in file_alloc(). */
	pthread_spinlock_t	lock;
	struct cache_buffer	*last;
	/* Allocated zeroed in file_alloc(); freed when the file is deleted. */
	struct cache_tree	*tree;
	/* Open requests waiting for the blob open to complete. */
	TAILQ_HEAD(open_requests_head, spdk_fs_request) open_requests;
	TAILQ_HEAD(sync_requests_head, spdk_fs_request) sync_requests;
	/* NOTE(review): presumably the link in g_caches - confirm. */
	TAILQ_ENTRY(spdk_file)	cache_tailq;
};
97 
/*
 * One blobfs filesystem instance.
 *
 * The addresses of sync_target, md_target and io_target are registered as
 *  io_devices; the channel create callbacks recover the filesystem from them
 *  with SPDK_CONTAINEROF, so these members must stay embedded here.
 */
struct spdk_filesystem {
	/* Blobstore backing the filesystem; set once init/load succeeds. */
	struct spdk_blob_store	*bs;
	/* All files known to the filesystem. */
	TAILQ_HEAD(, spdk_file)	files;
	/* Only cluster_sz is filled in here (from spdk_bs_get_cluster_size()). */
	struct spdk_bs_opts	bs_opts;
	/* Device passed to spdk_fs_init()/spdk_fs_load(). */
	struct spdk_bs_dev	*bdev;
	/* Used by the synchronous API to dispatch a request function. */
	fs_send_request_fn	send_request;

	struct {
		/* Number of requests preallocated for a channel on this target. */
		uint32_t		max_ops;
		struct spdk_io_channel	*sync_io_channel;
		struct spdk_fs_channel	*sync_fs_channel;
	} sync_target;

	struct {
		/* Number of requests preallocated for a channel on this target. */
		uint32_t		max_ops;
		struct spdk_io_channel	*md_io_channel;
		/* Channel from which the metadata operations in this file allocate requests. */
		struct spdk_fs_channel	*md_fs_channel;
	} md_target;

	struct {
		/* Number of requests preallocated for a channel on this target. */
		uint32_t		max_ops;
	} io_target;
};
121 
/*
 * Per-request callback state: the completion callback and its argument, plus
 *  the parameters of the operation in flight.
 */
struct spdk_fs_cb_args {
	/* Completion callback; which member is valid depends on the operation. */
	union {
		spdk_fs_op_with_handle_complete		fs_op_with_handle;
		spdk_fs_op_complete			fs_op;
		spdk_file_op_with_handle_complete	file_op_with_handle;
		spdk_file_op_complete			file_op;
		spdk_file_stat_op_complete		stat_op;
	} fn;
	/* Argument handed back to the completion callback. */
	void *arg;
	/* Semaphore posted by the synchronous wrappers' completion callbacks. */
	sem_t *sem;
	struct spdk_filesystem *fs;
	struct spdk_file *file;
	/* Result code reported back to a synchronous caller. */
	int rc;
	/* Set to true by alloc_fs_request(). */
	bool from_request;
	/* Operation-specific parameters; only the member for the current operation is meaningful. */
	union {
		struct {
			uint64_t	length;
		} truncate;
		struct {
			struct spdk_io_channel	*channel;
			void		*user_buf;
			void		*pin_buf;
			int		is_read;
			off_t		offset;
			size_t		length;
			uint64_t	start_page;
			uint64_t	num_pages;
			uint32_t	blocklen;
		} rw;
		struct {
			/* Both names are borrowed from the caller, not copied. */
			const char	*old_name;
			const char	*new_name;
		} rename;
		struct {
			struct cache_buffer	*cache_buffer;
			uint64_t		length;
		} flush;
		struct {
			struct cache_buffer	*cache_buffer;
			uint64_t		length;
			uint64_t		offset;
		} readahead;
		struct {
			uint64_t			offset;
			TAILQ_ENTRY(spdk_fs_request)	tailq;
		} sync;
		struct {
			uint32_t			num_clusters;
		} resize;
		struct {
			/* Borrowed from the caller, not copied. */
			const char	*name;
			uint32_t	flags;
			/* Link in spdk_file.open_requests. */
			TAILQ_ENTRY(spdk_fs_request)	tailq;
		} open;
		struct {
			const char	*name;
		} create;
		struct {
			const char	*name;
		} delete;
		struct {
			const char	*name;
		} stat;
	} op;
};
187 
188 static void cache_free_buffers(struct spdk_file *file);
189 
190 static void
191 __initialize_cache(void)
192 {
193 	if (g_cache_pool != NULL) {
194 		return;
195 	}
196 
197 	g_cache_pool = spdk_mempool_create("spdk_fs_cache",
198 					   g_fs_cache_size / CACHE_BUFFER_SIZE,
199 					   CACHE_BUFFER_SIZE, -1, SPDK_ENV_SOCKET_ID_ANY);
200 	TAILQ_INIT(&g_caches);
201 	pthread_spin_init(&g_caches_lock, 0);
202 }
203 
204 static uint64_t
205 __file_get_blob_size(struct spdk_file *file)
206 {
207 	uint64_t cluster_sz;
208 
209 	cluster_sz = file->fs->bs_opts.cluster_sz;
210 	return cluster_sz * spdk_blob_get_num_clusters(file->blob);
211 }
212 
/*
 * One preallocated request.  args is the first member, so a pointer to the
 *  request and a pointer to its args refer to the same address.
 */
struct spdk_fs_request {
	struct spdk_fs_cb_args		args;
	/* Link in the owning channel's free list (spdk_fs_channel.reqs). */
	TAILQ_ENTRY(spdk_fs_request)	link;
	/* Channel the request was allocated from and is returned to. */
	struct spdk_fs_channel		*channel;
};
218 
/* Per-channel context: a fixed pool of requests plus the blobstore channel used for I/O. */
struct spdk_fs_channel {
	/* Backing array for every request of this channel; freed on channel destroy. */
	struct spdk_fs_request		*req_mem;
	/* Free list of requests currently available for allocation. */
	TAILQ_HEAD(, spdk_fs_request)	reqs;
	/* Semaphore the synchronous API waits on until a request completes. */
	sem_t				sem;
	struct spdk_filesystem		*fs;
	/* Blobstore I/O channel; released on channel destroy when non-NULL. */
	struct spdk_io_channel		*bs_channel;
	fs_send_request_fn		send_request;
};
227 
228 static struct spdk_fs_request *
229 alloc_fs_request(struct spdk_fs_channel *channel)
230 {
231 	struct spdk_fs_request *req;
232 
233 	req = TAILQ_FIRST(&channel->reqs);
234 	if (!req) {
235 		return NULL;
236 	}
237 	TAILQ_REMOVE(&channel->reqs, req, link);
238 	memset(req, 0, sizeof(*req));
239 	req->channel = channel;
240 	req->args.from_request = true;
241 
242 	return req;
243 }
244 
245 static void
246 free_fs_request(struct spdk_fs_request *req)
247 {
248 	TAILQ_INSERT_HEAD(&req->channel->reqs, req, link);
249 }
250 
251 static int
252 _spdk_fs_channel_create(struct spdk_filesystem *fs, struct spdk_fs_channel *channel,
253 			uint32_t max_ops)
254 {
255 	uint32_t i;
256 
257 	channel->req_mem = calloc(max_ops, sizeof(struct spdk_fs_request));
258 	if (!channel->req_mem) {
259 		return -1;
260 	}
261 
262 	TAILQ_INIT(&channel->reqs);
263 	sem_init(&channel->sem, 0, 0);
264 
265 	for (i = 0; i < max_ops; i++) {
266 		TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link);
267 	}
268 
269 	channel->fs = fs;
270 
271 	return 0;
272 }
273 
274 static int
275 _spdk_fs_md_channel_create(void *io_device, uint32_t priority, void *ctx_buf)
276 {
277 	struct spdk_filesystem		*fs;
278 	struct spdk_fs_channel		*channel = ctx_buf;
279 
280 	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, md_target);
281 
282 	return _spdk_fs_channel_create(fs, channel, fs->md_target.max_ops);
283 }
284 
285 static int
286 _spdk_fs_sync_channel_create(void *io_device, uint32_t priority, void *ctx_buf)
287 {
288 	struct spdk_filesystem		*fs;
289 	struct spdk_fs_channel		*channel = ctx_buf;
290 
291 	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, sync_target);
292 
293 	return _spdk_fs_channel_create(fs, channel, fs->sync_target.max_ops);
294 }
295 
296 static int
297 _spdk_fs_io_channel_create(void *io_device, uint32_t priority, void *ctx_buf)
298 {
299 	struct spdk_filesystem		*fs;
300 	struct spdk_fs_channel		*channel = ctx_buf;
301 
302 	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, io_target);
303 
304 	return _spdk_fs_channel_create(fs, channel, fs->io_target.max_ops);
305 }
306 
307 static void
308 _spdk_fs_channel_destroy(void *io_device, void *ctx_buf)
309 {
310 	struct spdk_fs_channel *channel = ctx_buf;
311 
312 	free(channel->req_mem);
313 	if (channel->bs_channel != NULL) {
314 		spdk_bs_free_io_channel(channel->bs_channel);
315 	}
316 }
317 
318 static void
319 __send_request_direct(fs_request_fn fn, void *arg)
320 {
321 	fn(arg);
322 }
323 
324 static void
325 common_fs_bs_init(struct spdk_filesystem *fs, struct spdk_blob_store *bs)
326 {
327 	fs->bs = bs;
328 	fs->bs_opts.cluster_sz = spdk_bs_get_cluster_size(bs);
329 	fs->md_target.md_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs,
330 			SPDK_IO_PRIORITY_DEFAULT);
331 	fs->md_target.md_fs_channel->send_request = __send_request_direct;
332 	fs->sync_target.sync_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs,
333 			SPDK_IO_PRIORITY_DEFAULT);
334 	fs->sync_target.sync_fs_channel->send_request = __send_request_direct;
335 }
336 
337 static void
338 init_cb(void *ctx, struct spdk_blob_store *bs, int bserrno)
339 {
340 	struct spdk_fs_request *req = ctx;
341 	struct spdk_fs_cb_args *args = &req->args;
342 	struct spdk_filesystem *fs = args->fs;
343 
344 	if (bserrno == 0) {
345 		common_fs_bs_init(fs, bs);
346 	} else {
347 		free(fs);
348 		fs = NULL;
349 	}
350 
351 	args->fn.fs_op_with_handle(args->arg, fs, bserrno);
352 	free_fs_request(req);
353 }
354 
355 static struct spdk_filesystem *
356 fs_alloc(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn)
357 {
358 	struct spdk_filesystem *fs;
359 
360 	fs = calloc(1, sizeof(*fs));
361 	if (fs == NULL) {
362 		return NULL;
363 	}
364 
365 	fs->bdev = dev;
366 	fs->send_request = send_request_fn;
367 	TAILQ_INIT(&fs->files);
368 
369 	fs->md_target.max_ops = 512;
370 	spdk_io_device_register(&fs->md_target, _spdk_fs_md_channel_create, _spdk_fs_channel_destroy,
371 				sizeof(struct spdk_fs_channel));
372 	fs->md_target.md_io_channel = spdk_get_io_channel(&fs->md_target, SPDK_IO_PRIORITY_DEFAULT);
373 	fs->md_target.md_fs_channel = spdk_io_channel_get_ctx(fs->md_target.md_io_channel);
374 
375 	fs->sync_target.max_ops = 512;
376 	spdk_io_device_register(&fs->sync_target, _spdk_fs_sync_channel_create, _spdk_fs_channel_destroy,
377 				sizeof(struct spdk_fs_channel));
378 	fs->sync_target.sync_io_channel = spdk_get_io_channel(&fs->sync_target, SPDK_IO_PRIORITY_DEFAULT);
379 	fs->sync_target.sync_fs_channel = spdk_io_channel_get_ctx(fs->sync_target.sync_io_channel);
380 
381 	fs->io_target.max_ops = 512;
382 	spdk_io_device_register(&fs->io_target, _spdk_fs_io_channel_create, _spdk_fs_channel_destroy,
383 				sizeof(struct spdk_fs_channel));
384 
385 	__initialize_cache();
386 
387 	return fs;
388 }
389 
390 void
391 spdk_fs_init(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn,
392 	     spdk_fs_op_with_handle_complete cb_fn, void *cb_arg)
393 {
394 	struct spdk_filesystem *fs;
395 	struct spdk_fs_request *req;
396 	struct spdk_fs_cb_args *args;
397 
398 	fs = fs_alloc(dev, send_request_fn);
399 	if (fs == NULL) {
400 		cb_fn(cb_arg, NULL, -ENOMEM);
401 		return;
402 	}
403 
404 	req = alloc_fs_request(fs->md_target.md_fs_channel);
405 	if (req == NULL) {
406 		cb_fn(cb_arg, NULL, -ENOMEM);
407 		return;
408 	}
409 
410 	args = &req->args;
411 	args->fn.fs_op_with_handle = cb_fn;
412 	args->arg = cb_arg;
413 	args->fs = fs;
414 
415 	spdk_bs_init(dev, NULL, init_cb, req);
416 }
417 
418 static struct spdk_file *
419 file_alloc(struct spdk_filesystem *fs)
420 {
421 	struct spdk_file *file;
422 
423 	file = calloc(1, sizeof(*file));
424 	if (file == NULL) {
425 		return NULL;
426 	}
427 
428 	file->fs = fs;
429 	TAILQ_INIT(&file->open_requests);
430 	TAILQ_INIT(&file->sync_requests);
431 	pthread_spin_init(&file->lock, 0);
432 	file->tree = calloc(1, sizeof(*file->tree));
433 	TAILQ_INSERT_TAIL(&fs->files, file, tailq);
434 	file->priority = SPDK_FILE_PRIORITY_LOW;
435 	return file;
436 }
437 
438 static void
439 iter_cb(void *ctx, struct spdk_blob *blob, int rc)
440 {
441 	struct spdk_fs_request *req = ctx;
442 	struct spdk_fs_cb_args *args = &req->args;
443 	struct spdk_filesystem *fs = args->fs;
444 	struct spdk_file *f;
445 	uint64_t *length;
446 	const char *name;
447 	size_t value_len;
448 
449 	if (rc == -ENOENT) {
450 		/* Finished iterating */
451 		args->fn.fs_op_with_handle(args->arg, fs, 0);
452 		free_fs_request(req);
453 		return;
454 	} else if (rc < 0) {
455 		args->fn.fs_op_with_handle(args->arg, fs, rc);
456 		free_fs_request(req);
457 		return;
458 	}
459 
460 	rc = spdk_bs_md_get_xattr_value(blob, "name", (const void **)&name, &value_len);
461 	if (rc < 0) {
462 		args->fn.fs_op_with_handle(args->arg, fs, rc);
463 		free_fs_request(req);
464 		return;
465 	}
466 
467 	rc = spdk_bs_md_get_xattr_value(blob, "length", (const void **)&length, &value_len);
468 	if (rc < 0) {
469 		args->fn.fs_op_with_handle(args->arg, fs, rc);
470 		free_fs_request(req);
471 		return;
472 	}
473 	assert(value_len == 8);
474 
475 	f = file_alloc(fs);
476 	if (f == NULL) {
477 		args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM);
478 		free_fs_request(req);
479 		return;
480 	}
481 
482 	f->name = strdup(name);
483 	f->blobid = spdk_blob_get_id(blob);
484 	f->length = *length;
485 	f->length_flushed = *length;
486 	f->append_pos = *length;
487 	SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "added file %s length=%ju\n", f->name, f->length);
488 
489 	spdk_bs_md_iter_next(fs->bs, &blob, iter_cb, req);
490 }
491 
492 static void
493 load_cb(void *ctx, struct spdk_blob_store *bs, int bserrno)
494 {
495 	struct spdk_fs_request *req = ctx;
496 	struct spdk_fs_cb_args *args = &req->args;
497 	struct spdk_filesystem *fs = args->fs;
498 
499 	if (bserrno != 0) {
500 		args->fn.fs_op_with_handle(args->arg, NULL, bserrno);
501 		free_fs_request(req);
502 		free(fs);
503 		return;
504 	}
505 
506 	common_fs_bs_init(fs, bs);
507 	spdk_bs_md_iter_first(fs->bs, iter_cb, req);
508 }
509 
510 void
511 spdk_fs_load(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn,
512 	     spdk_fs_op_with_handle_complete cb_fn, void *cb_arg)
513 {
514 	struct spdk_filesystem *fs;
515 	struct spdk_fs_cb_args *args;
516 	struct spdk_fs_request *req;
517 
518 	fs = fs_alloc(dev, send_request_fn);
519 	if (fs == NULL) {
520 		cb_fn(cb_arg, NULL, -ENOMEM);
521 		return;
522 	}
523 
524 	req = alloc_fs_request(fs->md_target.md_fs_channel);
525 	if (req == NULL) {
526 		cb_fn(cb_arg, NULL, -ENOMEM);
527 		return;
528 	}
529 
530 	args = &req->args;
531 	args->fn.fs_op_with_handle = cb_fn;
532 	args->arg = cb_arg;
533 	args->fs = fs;
534 
535 	spdk_bs_load(dev, load_cb, req);
536 }
537 
538 static void
539 unload_cb(void *ctx, int bserrno)
540 {
541 	struct spdk_fs_request *req = ctx;
542 	struct spdk_fs_cb_args *args = &req->args;
543 	struct spdk_filesystem *fs = args->fs;
544 
545 	args->fn.fs_op(args->arg, bserrno);
546 	free(req);
547 
548 	spdk_io_device_unregister(&fs->io_target);
549 	spdk_io_device_unregister(&fs->sync_target);
550 	spdk_io_device_unregister(&fs->md_target);
551 
552 	free(fs);
553 }
554 
555 void
556 spdk_fs_unload(struct spdk_filesystem *fs, spdk_fs_op_complete cb_fn, void *cb_arg)
557 {
558 	struct spdk_fs_request *req;
559 	struct spdk_fs_cb_args *args;
560 
561 	/*
562 	 * We must free the md_channel before unloading the blobstore, so just
563 	 *  allocate this request from the general heap.
564 	 */
565 	req = calloc(1, sizeof(*req));
566 	if (req == NULL) {
567 		cb_fn(cb_arg, -ENOMEM);
568 		return;
569 	}
570 
571 	args = &req->args;
572 	args->fn.fs_op = cb_fn;
573 	args->arg = cb_arg;
574 	args->fs = fs;
575 
576 	spdk_fs_free_io_channel(fs->md_target.md_io_channel);
577 	spdk_fs_free_io_channel(fs->sync_target.sync_io_channel);
578 	spdk_bs_unload(fs->bs, unload_cb, req);
579 }
580 
581 static struct spdk_file *
582 fs_find_file(struct spdk_filesystem *fs, const char *name)
583 {
584 	struct spdk_file *file;
585 
586 	TAILQ_FOREACH(file, &fs->files, tailq) {
587 		if (!strncmp(name, file->name, SPDK_FILE_NAME_MAX)) {
588 			return file;
589 		}
590 	}
591 
592 	return NULL;
593 }
594 
595 void
596 spdk_fs_file_stat_async(struct spdk_filesystem *fs, const char *name,
597 			spdk_file_stat_op_complete cb_fn, void *cb_arg)
598 {
599 	struct spdk_file_stat stat;
600 	struct spdk_file *f = NULL;
601 
602 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
603 		cb_fn(cb_arg, NULL, -ENAMETOOLONG);
604 		return;
605 	}
606 
607 	f = fs_find_file(fs, name);
608 	if (f != NULL) {
609 		stat.blobid = f->blobid;
610 		stat.size = f->length;
611 		cb_fn(cb_arg, &stat, 0);
612 		return;
613 	}
614 
615 	cb_fn(cb_arg, NULL, -ENOENT);
616 }
617 
618 static void
619 __copy_stat(void *arg, struct spdk_file_stat *stat, int fserrno)
620 {
621 	struct spdk_fs_request *req = arg;
622 	struct spdk_fs_cb_args *args = &req->args;
623 
624 	args->rc = fserrno;
625 	if (fserrno == 0) {
626 		memcpy(args->arg, stat, sizeof(*stat));
627 	}
628 	sem_post(args->sem);
629 }
630 
631 static void
632 __file_stat(void *arg)
633 {
634 	struct spdk_fs_request *req = arg;
635 	struct spdk_fs_cb_args *args = &req->args;
636 
637 	spdk_fs_file_stat_async(args->fs, args->op.stat.name,
638 				args->fn.stat_op, req);
639 }
640 
641 int
642 spdk_fs_file_stat(struct spdk_filesystem *fs, struct spdk_io_channel *_channel,
643 		  const char *name, struct spdk_file_stat *stat)
644 {
645 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
646 	struct spdk_fs_request *req;
647 	int rc;
648 
649 	req = alloc_fs_request(channel);
650 	assert(req != NULL);
651 
652 	req->args.fs = fs;
653 	req->args.op.stat.name = name;
654 	req->args.fn.stat_op = __copy_stat;
655 	req->args.arg = stat;
656 	req->args.sem = &channel->sem;
657 	channel->send_request(__file_stat, req);
658 	sem_wait(&channel->sem);
659 
660 	rc = req->args.rc;
661 	free_fs_request(req);
662 
663 	return rc;
664 }
665 
666 static void
667 fs_create_blob_close_cb(void *ctx, int bserrno)
668 {
669 	struct spdk_fs_request *req = ctx;
670 	struct spdk_fs_cb_args *args = &req->args;
671 
672 	args->fn.file_op(args->arg, bserrno);
673 	free_fs_request(req);
674 }
675 
676 static void
677 fs_create_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno)
678 {
679 	struct spdk_fs_request *req = ctx;
680 	struct spdk_fs_cb_args *args = &req->args;
681 	struct spdk_file *f = args->file;
682 	uint64_t length = 0;
683 
684 	f->blob = blob;
685 	spdk_bs_md_resize_blob(blob, 1);
686 	spdk_blob_md_set_xattr(blob, "name", f->name, strlen(f->name) + 1);
687 	spdk_blob_md_set_xattr(blob, "length", &length, sizeof(length));
688 
689 	spdk_bs_md_close_blob(&f->blob, fs_create_blob_close_cb, args);
690 }
691 
692 static void
693 fs_create_blob_create_cb(void *ctx, spdk_blob_id blobid, int bserrno)
694 {
695 	struct spdk_fs_request *req = ctx;
696 	struct spdk_fs_cb_args *args = &req->args;
697 	struct spdk_file *f = args->file;
698 
699 	f->blobid = blobid;
700 	spdk_bs_md_open_blob(f->fs->bs, blobid, fs_create_blob_open_cb, req);
701 }
702 
703 void
704 spdk_fs_create_file_async(struct spdk_filesystem *fs, const char *name,
705 			  spdk_file_op_complete cb_fn, void *cb_arg)
706 {
707 	struct spdk_file *file;
708 	struct spdk_fs_request *req;
709 	struct spdk_fs_cb_args *args;
710 
711 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
712 		cb_fn(cb_arg, -ENAMETOOLONG);
713 		return;
714 	}
715 
716 	file = fs_find_file(fs, name);
717 	if (file != NULL) {
718 		cb_fn(cb_arg, -EEXIST);
719 		return;
720 	}
721 
722 	file = file_alloc(fs);
723 	if (file == NULL) {
724 		cb_fn(cb_arg, -ENOMEM);
725 		return;
726 	}
727 
728 	req = alloc_fs_request(fs->md_target.md_fs_channel);
729 	if (req == NULL) {
730 		cb_fn(cb_arg, -ENOMEM);
731 		return;
732 	}
733 
734 	args = &req->args;
735 	args->file = file;
736 	args->fn.file_op = cb_fn;
737 	args->arg = cb_arg;
738 
739 	file->name = strdup(name);
740 	spdk_bs_md_create_blob(fs->bs, fs_create_blob_create_cb, args);
741 }
742 
743 static void
744 __fs_create_file_done(void *arg, int fserrno)
745 {
746 	struct spdk_fs_request *req = arg;
747 	struct spdk_fs_cb_args *args = &req->args;
748 
749 	args->rc = fserrno;
750 	sem_post(args->sem);
751 	SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s\n", args->op.create.name);
752 }
753 
754 static void
755 __fs_create_file(void *arg)
756 {
757 	struct spdk_fs_request *req = arg;
758 	struct spdk_fs_cb_args *args = &req->args;
759 
760 	SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s\n", args->op.create.name);
761 	spdk_fs_create_file_async(args->fs, args->op.create.name, __fs_create_file_done, req);
762 }
763 
764 int
765 spdk_fs_create_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, const char *name)
766 {
767 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
768 	struct spdk_fs_request *req;
769 	struct spdk_fs_cb_args *args;
770 	int rc;
771 
772 	SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s\n", name);
773 
774 	req = alloc_fs_request(channel);
775 	assert(req != NULL);
776 
777 	args = &req->args;
778 	args->fs = fs;
779 	args->op.create.name = name;
780 	args->sem = &channel->sem;
781 	fs->send_request(__fs_create_file, req);
782 	sem_wait(&channel->sem);
783 	rc = args->rc;
784 	free_fs_request(req);
785 
786 	return rc;
787 }
788 
789 static void
790 fs_open_blob_done(void *ctx, struct spdk_blob *blob, int bserrno)
791 {
792 	struct spdk_fs_request *req = ctx;
793 	struct spdk_fs_cb_args *args = &req->args;
794 	struct spdk_file *f = args->file;
795 
796 	f->blob = blob;
797 	while (!TAILQ_EMPTY(&f->open_requests)) {
798 		req = TAILQ_FIRST(&f->open_requests);
799 		args = &req->args;
800 		TAILQ_REMOVE(&f->open_requests, req, args.op.open.tailq);
801 		args->fn.file_op_with_handle(args->arg, f, bserrno);
802 		free_fs_request(req);
803 	}
804 }
805 
806 static void
807 fs_open_blob_create_cb(void *ctx, int bserrno)
808 {
809 	struct spdk_fs_request *req = ctx;
810 	struct spdk_fs_cb_args *args = &req->args;
811 	struct spdk_file *file = args->file;
812 	struct spdk_filesystem *fs = args->fs;
813 
814 	if (file == NULL) {
815 		/*
816 		 * This is from an open with CREATE flag - the file
817 		 *  is now created so look it up in the file list for this
818 		 *  filesystem.
819 		 */
820 		file = fs_find_file(fs, args->op.open.name);
821 		assert(file != NULL);
822 		args->file = file;
823 	}
824 
825 	file->ref_count++;
826 	TAILQ_INSERT_TAIL(&file->open_requests, req, args.op.open.tailq);
827 	if (file->ref_count == 1) {
828 		assert(file->blob == NULL);
829 		spdk_bs_md_open_blob(fs->bs, file->blobid, fs_open_blob_done, req);
830 	} else if (file->blob != NULL) {
831 		fs_open_blob_done(req, file->blob, 0);
832 	} else {
833 		/*
834 		 * The blob open for this file is in progress due to a previous
835 		 *  open request.  When that open completes, it will invoke the
836 		 *  open callback for this request.
837 		 */
838 	}
839 }
840 
841 void
842 spdk_fs_open_file_async(struct spdk_filesystem *fs, const char *name, uint32_t flags,
843 			spdk_file_op_with_handle_complete cb_fn, void *cb_arg)
844 {
845 	struct spdk_file *f = NULL;
846 	struct spdk_fs_request *req;
847 	struct spdk_fs_cb_args *args;
848 
849 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
850 		cb_fn(cb_arg, NULL, -ENAMETOOLONG);
851 		return;
852 	}
853 
854 	f = fs_find_file(fs, name);
855 	if (f == NULL && !(flags & SPDK_BLOBFS_OPEN_CREATE)) {
856 		cb_fn(cb_arg, NULL, -ENOENT);
857 		return;
858 	}
859 
860 	req = alloc_fs_request(fs->md_target.md_fs_channel);
861 	if (req == NULL) {
862 		cb_fn(cb_arg, NULL, -ENOMEM);
863 		return;
864 	}
865 
866 	args = &req->args;
867 	args->fn.file_op_with_handle = cb_fn;
868 	args->arg = cb_arg;
869 	args->file = f;
870 	args->fs = fs;
871 	args->op.open.name = name;
872 
873 	if (f == NULL) {
874 		spdk_fs_create_file_async(fs, name, fs_open_blob_create_cb, req);
875 	} else {
876 		fs_open_blob_create_cb(req, 0);
877 	}
878 }
879 
880 static void
881 __fs_open_file_done(void *arg, struct spdk_file *file, int bserrno)
882 {
883 	struct spdk_fs_request *req = arg;
884 	struct spdk_fs_cb_args *args = &req->args;
885 
886 	args->file = file;
887 	args->rc = bserrno;
888 	sem_post(args->sem);
889 	SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s\n", args->op.open.name);
890 }
891 
892 static void
893 __fs_open_file(void *arg)
894 {
895 	struct spdk_fs_request *req = arg;
896 	struct spdk_fs_cb_args *args = &req->args;
897 
898 	SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s\n", args->op.open.name);
899 	spdk_fs_open_file_async(args->fs, args->op.open.name, args->op.open.flags,
900 				__fs_open_file_done, req);
901 }
902 
903 int
904 spdk_fs_open_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel,
905 		  const char *name, uint32_t flags, struct spdk_file **file)
906 {
907 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
908 	struct spdk_fs_request *req;
909 	struct spdk_fs_cb_args *args;
910 	int rc;
911 
912 	SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s\n", name);
913 
914 	req = alloc_fs_request(channel);
915 	assert(req != NULL);
916 
917 	args = &req->args;
918 	args->fs = fs;
919 	args->op.open.name = name;
920 	args->op.open.flags = flags;
921 	args->sem = &channel->sem;
922 	fs->send_request(__fs_open_file, req);
923 	sem_wait(&channel->sem);
924 	rc = args->rc;
925 	if (rc == 0) {
926 		*file = args->file;
927 	} else {
928 		*file = NULL;
929 	}
930 	free_fs_request(req);
931 
932 	return rc;
933 }
934 
935 static void
936 fs_rename_blob_close_cb(void *ctx, int bserrno)
937 {
938 	struct spdk_fs_request *req = ctx;
939 	struct spdk_fs_cb_args *args = &req->args;
940 
941 	args->fn.fs_op(args->arg, bserrno);
942 	free_fs_request(req);
943 }
944 
945 static void
946 fs_rename_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno)
947 {
948 	struct spdk_fs_request *req = ctx;
949 	struct spdk_fs_cb_args *args = &req->args;
950 	struct spdk_file *f = args->file;
951 	const char *new_name = args->op.rename.new_name;
952 
953 	f->blob = blob;
954 	spdk_blob_md_set_xattr(blob, "name", new_name, strlen(new_name) + 1);
955 	spdk_bs_md_close_blob(&f->blob, fs_rename_blob_close_cb, req);
956 }
957 
958 static void
959 __spdk_fs_md_rename_file(struct spdk_fs_request *req)
960 {
961 	struct spdk_fs_cb_args *args = &req->args;
962 	struct spdk_file *f;
963 
964 	f = fs_find_file(args->fs, args->op.rename.old_name);
965 	if (f == NULL) {
966 		args->fn.fs_op(args->arg, -ENOENT);
967 		free_fs_request(req);
968 		return;
969 	}
970 
971 	free(f->name);
972 	f->name = strdup(args->op.rename.new_name);
973 	args->file = f;
974 	spdk_bs_md_open_blob(args->fs->bs, f->blobid, fs_rename_blob_open_cb, req);
975 }
976 
977 static void
978 fs_rename_delete_done(void *arg, int fserrno)
979 {
980 	__spdk_fs_md_rename_file(arg);
981 }
982 
983 void
984 spdk_fs_rename_file_async(struct spdk_filesystem *fs,
985 			  const char *old_name, const char *new_name,
986 			  spdk_file_op_complete cb_fn, void *cb_arg)
987 {
988 	struct spdk_file *f;
989 	struct spdk_fs_request *req;
990 	struct spdk_fs_cb_args *args;
991 
992 	SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "old=%s new=%s\n", old_name, new_name);
993 	if (strnlen(new_name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
994 		cb_fn(cb_arg, -ENAMETOOLONG);
995 		return;
996 	}
997 
998 	req = alloc_fs_request(fs->md_target.md_fs_channel);
999 	if (req == NULL) {
1000 		cb_fn(cb_arg, -ENOMEM);
1001 		return;
1002 	}
1003 
1004 	args = &req->args;
1005 	args->fn.fs_op = cb_fn;
1006 	args->fs = fs;
1007 	args->arg = cb_arg;
1008 	args->op.rename.old_name = old_name;
1009 	args->op.rename.new_name = new_name;
1010 
1011 	f = fs_find_file(fs, new_name);
1012 	if (f == NULL) {
1013 		__spdk_fs_md_rename_file(req);
1014 		return;
1015 	}
1016 
1017 	/*
1018 	 * The rename overwrites an existing file.  So delete the existing file, then
1019 	 *  do the actual rename.
1020 	 */
1021 	spdk_fs_delete_file_async(fs, new_name, fs_rename_delete_done, req);
1022 }
1023 
1024 static void
1025 __fs_rename_file_done(void *arg, int fserrno)
1026 {
1027 	struct spdk_fs_request *req = arg;
1028 	struct spdk_fs_cb_args *args = &req->args;
1029 
1030 	args->rc = fserrno;
1031 	sem_post(args->sem);
1032 }
1033 
1034 static void
1035 __fs_rename_file(void *arg)
1036 {
1037 	struct spdk_fs_request *req = arg;
1038 	struct spdk_fs_cb_args *args = &req->args;
1039 
1040 	spdk_fs_rename_file_async(args->fs, args->op.rename.old_name, args->op.rename.new_name,
1041 				  __fs_rename_file_done, req);
1042 }
1043 
1044 int
1045 spdk_fs_rename_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel,
1046 		    const char *old_name, const char *new_name)
1047 {
1048 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
1049 	struct spdk_fs_request *req;
1050 	struct spdk_fs_cb_args *args;
1051 	int rc;
1052 
1053 	req = alloc_fs_request(channel);
1054 	assert(req != NULL);
1055 
1056 	args = &req->args;
1057 
1058 	args->fs = fs;
1059 	args->op.rename.old_name = old_name;
1060 	args->op.rename.new_name = new_name;
1061 	args->sem = &channel->sem;
1062 	fs->send_request(__fs_rename_file, req);
1063 	sem_wait(&channel->sem);
1064 	rc = args->rc;
1065 	free_fs_request(req);
1066 	return rc;
1067 }
1068 
1069 static void
1070 blob_delete_cb(void *ctx, int bserrno)
1071 {
1072 	struct spdk_fs_request *req = ctx;
1073 	struct spdk_fs_cb_args *args = &req->args;
1074 
1075 	args->fn.file_op(args->arg, bserrno);
1076 	free_fs_request(req);
1077 }
1078 
1079 void
1080 spdk_fs_delete_file_async(struct spdk_filesystem *fs, const char *name,
1081 			  spdk_file_op_complete cb_fn, void *cb_arg)
1082 {
1083 	struct spdk_file *f;
1084 	spdk_blob_id blobid;
1085 	struct spdk_fs_request *req;
1086 	struct spdk_fs_cb_args *args;
1087 
1088 	SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s\n", name);
1089 
1090 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
1091 		cb_fn(cb_arg, -ENAMETOOLONG);
1092 		return;
1093 	}
1094 
1095 	f = fs_find_file(fs, name);
1096 	if (f == NULL) {
1097 		cb_fn(cb_arg, -ENOENT);
1098 		return;
1099 	}
1100 
1101 	if (f->ref_count > 0) {
1102 		/* For now, do not allow deleting files with open references. */
1103 		cb_fn(cb_arg, -EBUSY);
1104 		return;
1105 	}
1106 
1107 	req = alloc_fs_request(fs->md_target.md_fs_channel);
1108 	if (req == NULL) {
1109 		cb_fn(cb_arg, -ENOMEM);
1110 		return;
1111 	}
1112 
1113 	TAILQ_REMOVE(&fs->files, f, tailq);
1114 
1115 	cache_free_buffers(f);
1116 
1117 	blobid = f->blobid;
1118 
1119 	free(f->name);
1120 	free(f->tree);
1121 	free(f);
1122 
1123 	args = &req->args;
1124 	args->fn.file_op = cb_fn;
1125 	args->arg = cb_arg;
1126 	spdk_bs_md_delete_blob(fs->bs, blobid, blob_delete_cb, req);
1127 }
1128 
1129 static void
1130 __fs_delete_file_done(void *arg, int fserrno)
1131 {
1132 	struct spdk_fs_request *req = arg;
1133 	struct spdk_fs_cb_args *args = &req->args;
1134 
1135 	args->rc = fserrno;
1136 	sem_post(args->sem);
1137 }
1138 
1139 static void
1140 __fs_delete_file(void *arg)
1141 {
1142 	struct spdk_fs_request *req = arg;
1143 	struct spdk_fs_cb_args *args = &req->args;
1144 
1145 	spdk_fs_delete_file_async(args->fs, args->op.delete.name, __fs_delete_file_done, req);
1146 }
1147 
1148 int
1149 spdk_fs_delete_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel,
1150 		    const char *name)
1151 {
1152 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
1153 	struct spdk_fs_request *req;
1154 	struct spdk_fs_cb_args *args;
1155 	int rc;
1156 
1157 	req = alloc_fs_request(channel);
1158 	assert(req != NULL);
1159 
1160 	args = &req->args;
1161 	args->fs = fs;
1162 	args->op.delete.name = name;
1163 	args->sem = &channel->sem;
1164 	fs->send_request(__fs_delete_file, req);
1165 	sem_wait(&channel->sem);
1166 	rc = args->rc;
1167 	free_fs_request(req);
1168 
1169 	return rc;
1170 }
1171 
1172 spdk_fs_iter
1173 spdk_fs_iter_first(struct spdk_filesystem *fs)
1174 {
1175 	struct spdk_file *f;
1176 
1177 	f = TAILQ_FIRST(&fs->files);
1178 	return f;
1179 }
1180 
1181 spdk_fs_iter
1182 spdk_fs_iter_next(spdk_fs_iter iter)
1183 {
1184 	struct spdk_file *f = iter;
1185 
1186 	if (f == NULL) {
1187 		return NULL;
1188 	}
1189 
1190 	f = TAILQ_NEXT(f, tailq);
1191 	return f;
1192 }
1193 
1194 const char *
1195 spdk_file_get_name(struct spdk_file *file)
1196 {
1197 	return file->name;
1198 }
1199 
1200 uint64_t
1201 spdk_file_get_length(struct spdk_file *file)
1202 {
1203 	assert(file != NULL);
1204 	SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s length=0x%jx\n", file->name, file->length);
1205 	return file->length;
1206 }
1207 
1208 static void
1209 fs_truncate_complete_cb(void *ctx, int bserrno)
1210 {
1211 	struct spdk_fs_request *req = ctx;
1212 	struct spdk_fs_cb_args *args = &req->args;
1213 
1214 	args->fn.file_op(args->arg, bserrno);
1215 	free_fs_request(req);
1216 }
1217 
/*
 * Number of clusters of size cluster_sz needed to hold length bytes, rounding
 *  up.
 *
 * Uses quotient plus remainder instead of (length + cluster_sz - 1) so that
 *  lengths close to UINT64_MAX do not wrap around to a tiny cluster count.
 *  cluster_sz must be non-zero.
 */
static uint64_t
__bytes_to_clusters(uint64_t length, uint64_t cluster_sz)
{
	uint64_t clusters = length / cluster_sz;

	if (length % cluster_sz != 0) {
		clusters++;
	}

	return clusters;
}
1223 
1224 void
1225 spdk_file_truncate_async(struct spdk_file *file, uint64_t length,
1226 			 spdk_file_op_complete cb_fn, void *cb_arg)
1227 {
1228 	struct spdk_filesystem *fs;
1229 	size_t num_clusters;
1230 	struct spdk_fs_request *req;
1231 	struct spdk_fs_cb_args *args;
1232 
1233 	SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s old=0x%jx new=0x%jx\n", file->name, file->length, length);
1234 	if (length == file->length) {
1235 		cb_fn(cb_arg, 0);
1236 		return;
1237 	}
1238 
1239 	req = alloc_fs_request(file->fs->md_target.md_fs_channel);
1240 	if (req == NULL) {
1241 		cb_fn(cb_arg, -ENOMEM);
1242 		return;
1243 	}
1244 
1245 	args = &req->args;
1246 	args->fn.file_op = cb_fn;
1247 	args->arg = cb_arg;
1248 	args->file = file;
1249 	fs = file->fs;
1250 
1251 	num_clusters = __bytes_to_clusters(length, fs->bs_opts.cluster_sz);
1252 
1253 	spdk_bs_md_resize_blob(file->blob, num_clusters);
1254 	spdk_blob_md_set_xattr(file->blob, "length", &length, sizeof(length));
1255 
1256 	file->length = length;
1257 	if (file->append_pos > file->length) {
1258 		file->append_pos = file->length;
1259 	}
1260 
1261 	spdk_bs_md_sync_blob(file->blob, fs_truncate_complete_cb, args);
1262 }
1263 
1264 static void
1265 __truncate(void *arg)
1266 {
1267 	struct spdk_fs_request *req = arg;
1268 	struct spdk_fs_cb_args *args = &req->args;
1269 
1270 	spdk_file_truncate_async(args->file, args->op.truncate.length,
1271 				 args->fn.file_op, args->arg);
1272 }
1273 
1274 void
1275 spdk_file_truncate(struct spdk_file *file, struct spdk_io_channel *_channel,
1276 		   uint64_t length)
1277 {
1278 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
1279 	struct spdk_fs_request *req;
1280 	struct spdk_fs_cb_args *args;
1281 
1282 	req = alloc_fs_request(channel);
1283 	assert(req != NULL);
1284 
1285 	args = &req->args;
1286 
1287 	args->file = file;
1288 	args->op.truncate.length = length;
1289 	args->fn.file_op = __sem_post;
1290 	args->arg = &channel->sem;
1291 
1292 	channel->send_request(__truncate, req);
1293 	sem_wait(&channel->sem);
1294 	free_fs_request(req);
1295 }
1296 
1297 static void
1298 __rw_done(void *ctx, int bserrno)
1299 {
1300 	struct spdk_fs_request *req = ctx;
1301 	struct spdk_fs_cb_args *args = &req->args;
1302 
1303 	spdk_free(args->op.rw.pin_buf);
1304 	args->fn.file_op(args->arg, bserrno);
1305 	free_fs_request(req);
1306 }
1307 
1308 static void
1309 __read_done(void *ctx, int bserrno)
1310 {
1311 	struct spdk_fs_request *req = ctx;
1312 	struct spdk_fs_cb_args *args = &req->args;
1313 
1314 	if (args->op.rw.is_read) {
1315 		memcpy(args->op.rw.user_buf,
1316 		       args->op.rw.pin_buf + (args->op.rw.offset & 0xFFF),
1317 		       args->op.rw.length);
1318 		__rw_done(req, 0);
1319 	} else {
1320 		memcpy(args->op.rw.pin_buf + (args->op.rw.offset & 0xFFF),
1321 		       args->op.rw.user_buf,
1322 		       args->op.rw.length);
1323 		spdk_bs_io_write_blob(args->file->blob, args->op.rw.channel,
1324 				      args->op.rw.pin_buf,
1325 				      args->op.rw.start_page, args->op.rw.num_pages,
1326 				      __rw_done, req);
1327 	}
1328 }
1329 
1330 static void
1331 __do_blob_read(void *ctx, int fserrno)
1332 {
1333 	struct spdk_fs_request *req = ctx;
1334 	struct spdk_fs_cb_args *args = &req->args;
1335 
1336 	spdk_bs_io_read_blob(args->file->blob, args->op.rw.channel,
1337 			     args->op.rw.pin_buf,
1338 			     args->op.rw.start_page, args->op.rw.num_pages,
1339 			     __read_done, req);
1340 }
1341 
1342 static void
1343 __get_page_parameters(struct spdk_file *file, uint64_t offset, uint64_t length,
1344 		      uint64_t *start_page, uint32_t *page_size, uint64_t *num_pages)
1345 {
1346 	uint64_t end_page;
1347 
1348 	*page_size = spdk_bs_get_page_size(file->fs->bs);
1349 	*start_page = offset / *page_size;
1350 	end_page = (offset + length - 1) / *page_size;
1351 	*num_pages = (end_page - *start_page + 1);
1352 }
1353 
1354 static void
1355 __readwrite(struct spdk_file *file, struct spdk_io_channel *_channel,
1356 	    void *payload, uint64_t offset, uint64_t length,
1357 	    spdk_file_op_complete cb_fn, void *cb_arg, int is_read)
1358 {
1359 	struct spdk_fs_request *req;
1360 	struct spdk_fs_cb_args *args;
1361 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
1362 	uint64_t start_page, num_pages, pin_buf_length;
1363 	uint32_t page_size;
1364 
1365 	if (is_read && offset + length > file->length) {
1366 		cb_fn(cb_arg, -EINVAL);
1367 		return;
1368 	}
1369 
1370 	req = alloc_fs_request(channel);
1371 	if (req == NULL) {
1372 		cb_fn(cb_arg, -ENOMEM);
1373 		return;
1374 	}
1375 
1376 	args = &req->args;
1377 	args->fn.file_op = cb_fn;
1378 	args->arg = cb_arg;
1379 	args->file = file;
1380 	args->op.rw.channel = channel->bs_channel;
1381 	args->op.rw.user_buf = payload;
1382 	args->op.rw.is_read = is_read;
1383 	args->op.rw.offset = offset;
1384 	args->op.rw.length = length;
1385 
1386 	__get_page_parameters(file, offset, length, &start_page, &page_size, &num_pages);
1387 	pin_buf_length = num_pages * page_size;
1388 	args->op.rw.pin_buf = spdk_malloc(pin_buf_length, 4096, NULL);
1389 
1390 	args->op.rw.start_page = start_page;
1391 	args->op.rw.num_pages = num_pages;
1392 
1393 	if (!is_read && file->length < offset + length) {
1394 		spdk_file_truncate_async(file, offset + length, __do_blob_read, req);
1395 	} else {
1396 		__do_blob_read(req, 0);
1397 	}
1398 }
1399 
1400 void
1401 spdk_file_write_async(struct spdk_file *file, struct spdk_io_channel *channel,
1402 		      void *payload, uint64_t offset, uint64_t length,
1403 		      spdk_file_op_complete cb_fn, void *cb_arg)
1404 {
1405 	__readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 0);
1406 }
1407 
1408 void
1409 spdk_file_read_async(struct spdk_file *file, struct spdk_io_channel *channel,
1410 		     void *payload, uint64_t offset, uint64_t length,
1411 		     spdk_file_op_complete cb_fn, void *cb_arg)
1412 {
1413 	SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s offset=%jx length=%jx\n",
1414 		      file->name, offset, length);
1415 	__readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 1);
1416 }
1417 
1418 struct spdk_io_channel *
1419 spdk_fs_alloc_io_channel(struct spdk_filesystem *fs, uint32_t priority)
1420 {
1421 	struct spdk_io_channel *io_channel;
1422 	struct spdk_fs_channel *fs_channel;
1423 
1424 	io_channel = spdk_get_io_channel(&fs->io_target, priority);
1425 	fs_channel = spdk_io_channel_get_ctx(io_channel);
1426 	fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs, SPDK_IO_PRIORITY_DEFAULT);
1427 	fs_channel->send_request = __send_request_direct;
1428 
1429 	return io_channel;
1430 }
1431 
1432 struct spdk_io_channel *
1433 spdk_fs_alloc_io_channel_sync(struct spdk_filesystem *fs, uint32_t priority)
1434 {
1435 	struct spdk_io_channel *io_channel;
1436 	struct spdk_fs_channel *fs_channel;
1437 
1438 	io_channel = spdk_get_io_channel(&fs->io_target, priority);
1439 	fs_channel = spdk_io_channel_get_ctx(io_channel);
1440 	fs_channel->send_request = fs->send_request;
1441 
1442 	return io_channel;
1443 }
1444 
/* Release a channel obtained from one of the spdk_fs_alloc_io_channel*() calls. */
void
spdk_fs_free_io_channel(struct spdk_io_channel *channel)
{
	struct spdk_io_channel *released = channel;

	spdk_put_io_channel(released);
}
1450 
1451 void
1452 spdk_fs_set_cache_size(uint64_t size_in_mb)
1453 {
1454 	g_fs_cache_size = size_in_mb * 1024 * 1024;
1455 }
1456 
1457 uint64_t
1458 spdk_fs_get_cache_size(void)
1459 {
1460 	return g_fs_cache_size / (1024 * 1024);
1461 }
1462 
/*
 * Forward declaration: __file_flush_done() calls __file_flush(), which in turn
 *  registers __file_flush_done() as its write completion.
 */
static void __file_flush(void *_args);
1464 
1465 static void *
1466 alloc_cache_memory_buffer(struct spdk_file *context)
1467 {
1468 	struct spdk_file *file;
1469 	void *buf;
1470 
1471 	buf = spdk_mempool_get(g_cache_pool);
1472 	if (buf != NULL) {
1473 		return buf;
1474 	}
1475 
1476 	pthread_spin_lock(&g_caches_lock);
1477 	TAILQ_FOREACH(file, &g_caches, cache_tailq) {
1478 		if (!file->open_for_writing &&
1479 		    file->priority == SPDK_FILE_PRIORITY_LOW &&
1480 		    file != context) {
1481 			TAILQ_REMOVE(&g_caches, file, cache_tailq);
1482 			TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq);
1483 			break;
1484 		}
1485 	}
1486 	pthread_spin_unlock(&g_caches_lock);
1487 	if (file != NULL) {
1488 		cache_free_buffers(file);
1489 		buf = spdk_mempool_get(g_cache_pool);
1490 		if (buf != NULL) {
1491 			return buf;
1492 		}
1493 	}
1494 
1495 	pthread_spin_lock(&g_caches_lock);
1496 	TAILQ_FOREACH(file, &g_caches, cache_tailq) {
1497 		if (!file->open_for_writing && file != context) {
1498 			TAILQ_REMOVE(&g_caches, file, cache_tailq);
1499 			TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq);
1500 			break;
1501 		}
1502 	}
1503 	pthread_spin_unlock(&g_caches_lock);
1504 	if (file != NULL) {
1505 		cache_free_buffers(file);
1506 		buf = spdk_mempool_get(g_cache_pool);
1507 		if (buf != NULL) {
1508 			return buf;
1509 		}
1510 	}
1511 
1512 	pthread_spin_lock(&g_caches_lock);
1513 	TAILQ_FOREACH(file, &g_caches, cache_tailq) {
1514 		if (file != context) {
1515 			TAILQ_REMOVE(&g_caches, file, cache_tailq);
1516 			TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq);
1517 			break;
1518 		}
1519 	}
1520 	pthread_spin_unlock(&g_caches_lock);
1521 	if (file != NULL) {
1522 		cache_free_buffers(file);
1523 		buf = spdk_mempool_get(g_cache_pool);
1524 		if (buf != NULL) {
1525 			return buf;
1526 		}
1527 	}
1528 
1529 	assert(false);
1530 	return NULL;
1531 }
1532 
1533 static struct cache_buffer *
1534 cache_insert_buffer(struct spdk_file *file, uint64_t offset)
1535 {
1536 	struct cache_buffer *buf;
1537 	int count = 0;
1538 
1539 	buf = calloc(1, sizeof(*buf));
1540 	if (buf == NULL) {
1541 		SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "calloc failed\n");
1542 		return NULL;
1543 	}
1544 
1545 	buf->buf = alloc_cache_memory_buffer(file);
1546 	if (buf->buf == NULL) {
1547 		while (buf->buf == NULL) {
1548 			/*
1549 			 * TODO: alloc_cache_memory_buffer() should eventually free
1550 			 *  some buffers.  Need a more sophisticated check here, instead
1551 			 *  of just bailing if 100 tries does not result in getting a
1552 			 *  free buffer.  This will involve using the sync channel's
1553 			 *  semaphore to block until a buffer becomes available.
1554 			 */
1555 			if (count++ == 100) {
1556 				SPDK_ERRLOG("could not allocate cache buffer\n");
1557 				assert(false);
1558 				free(buf);
1559 				return NULL;
1560 			}
1561 			buf->buf = alloc_cache_memory_buffer(file);
1562 		}
1563 	}
1564 
1565 	buf->buf_size = CACHE_BUFFER_SIZE;
1566 	buf->offset = offset;
1567 
1568 	pthread_spin_lock(&g_caches_lock);
1569 	if (file->tree->present_mask == 0) {
1570 		TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq);
1571 	}
1572 	file->tree = spdk_tree_insert_buffer(file->tree, buf);
1573 	pthread_spin_unlock(&g_caches_lock);
1574 
1575 	return buf;
1576 }
1577 
1578 static struct cache_buffer *
1579 cache_append_buffer(struct spdk_file *file)
1580 {
1581 	struct cache_buffer *last;
1582 
1583 	assert(file->last == NULL || file->last->bytes_filled == file->last->buf_size);
1584 	assert((file->append_pos % CACHE_BUFFER_SIZE) == 0);
1585 
1586 	last = cache_insert_buffer(file, file->append_pos);
1587 	if (last == NULL) {
1588 		SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "cache_insert_buffer failed\n");
1589 		return NULL;
1590 	}
1591 
1592 	if (file->last != NULL) {
1593 		file->last->next = last;
1594 	}
1595 	file->last = last;
1596 
1597 	return last;
1598 }
1599 
1600 static void
1601 __wake_caller(struct spdk_fs_cb_args *args)
1602 {
1603 	sem_post(args->sem);
1604 }
1605 
/*
 * Complete every queued sync request whose target offset has been flushed.
 *
 * Requests are taken from the head of file->sync_requests; processing stops
 *  at the first request whose offset is still beyond file->length_flushed.
 *  Each completed request has its callback invoked with status 0 and is then
 *  released.
 */
static void
__file_cache_finish_sync(struct spdk_file *file)
{
	struct spdk_fs_request *sync_req;
	struct spdk_fs_cb_args *sync_args;

	pthread_spin_lock(&file->lock);
	while (!TAILQ_EMPTY(&file->sync_requests)) {
		sync_req = TAILQ_FIRST(&file->sync_requests);
		sync_args = &sync_req->args;
		if (sync_args->op.sync.offset > file->length_flushed) {
			break;
		}
		BLOBFS_TRACE(file, "sync done offset=%jx\n", sync_args->op.sync.offset);
		TAILQ_REMOVE(&file->sync_requests, sync_req, args.op.sync.tailq);
		/* The file lock is not held while the completion callback runs. */
		pthread_spin_unlock(&file->lock);
		sync_args->fn.file_op(sync_args->arg, 0);
		pthread_spin_lock(&file->lock);
		free_fs_request(sync_req);
	}
	pthread_spin_unlock(&file->lock);
}
1628 
/*
 * Blobstore completion adapter: the context is the file whose pending sync
 *  requests should be completed.  The blobstore status is not examined.
 */
static void
__file_cache_finish_sync_bs_cb(void *ctx, int bserrno)
{
	__file_cache_finish_sync((struct spdk_file *)ctx);
}
1636 
1637 static void
1638 __free_args(struct spdk_fs_cb_args *args)
1639 {
1640 	struct spdk_fs_request *req;
1641 
1642 	if (!args->from_request) {
1643 		free(args);
1644 	} else {
1645 		/* Depends on args being at the start of the spdk_fs_request structure. */
1646 		req = (struct spdk_fs_request *)args;
1647 		free_fs_request(req);
1648 	}
1649 }
1650 
/*
 * Completion callback for the blob write issued by __file_flush().
 *
 * Accounts the flushed bytes against the cache buffer and the file, completes
 *  (directly, or after a metadata sync that persists the "length" xattr) any
 *  sync requests that are now satisfied, and then calls __file_flush() again
 *  to continue with remaining data.  The bserrno argument is not examined.
 */
static void
__file_flush_done(void *arg, int bserrno)
{
	struct spdk_fs_cb_args *args = arg;
	struct spdk_fs_request *sync_req;
	struct spdk_file *file = args->file;
	struct cache_buffer *next = args->op.flush.cache_buffer;

	BLOBFS_TRACE(file, "length=%jx\n", args->op.flush.length);

	pthread_spin_lock(&file->lock);
	next->in_progress = false;
	next->bytes_flushed += args->op.flush.length;
	file->length_flushed += args->op.flush.length;
	if (file->length_flushed > file->length) {
		file->length = file->length_flushed;
	}
	if (next->bytes_flushed == next->buf_size) {
		BLOBFS_TRACE(file, "write buffer fully flushed 0x%jx\n", file->length_flushed);
		next = spdk_tree_find_buffer(file->tree, file->length_flushed);
	}

	/*
	 * Walk the sync requests from the tail looking for one whose offset has
	 *  been reached; sync_req is left NULL when none qualifies.
	 */
	TAILQ_FOREACH_REVERSE(sync_req, &file->sync_requests, sync_requests_head, args.op.sync.tailq) {
		if (sync_req->args.op.sync.offset <= file->length_flushed) {
			break;
		}
	}

	/*
	 * Assert that there is no cached data that extends past the end of the underlying
	 *  blob.
	 */
	assert(next == NULL || next->offset < __file_get_blob_size(file) ||
	       next->bytes_filled == 0);

	if (sync_req != NULL) {
		BLOBFS_TRACE(file, "set xattr length 0x%jx\n", file->length_flushed);
		spdk_blob_md_set_xattr(file->blob, "length", &file->length_flushed,
				       sizeof(file->length_flushed));

		pthread_spin_unlock(&file->lock);
		/* The satisfied sync requests are completed once the metadata sync finishes. */
		spdk_bs_md_sync_blob(file->blob, __file_cache_finish_sync_bs_cb, file);
	} else {
		pthread_spin_unlock(&file->lock);
		__file_cache_finish_sync(file);
	}

	/* Reuse the same args to flush whatever is still pending. */
	__file_flush(args);
}
1700 
/*
 * Write the not-yet-flushed part of the cache buffer located at
 *  file->length_flushed to the blob.
 *
 * _args is a struct spdk_fs_cb_args whose file member selects the file.  The
 *  args are released here when there is nothing to do; otherwise they are
 *  passed on to __file_flush_done() as the write completion context.
 */
static void
__file_flush(void *_args)
{
	struct spdk_fs_cb_args *args = _args;
	struct spdk_file *file = args->file;
	struct cache_buffer *next;
	uint64_t offset, length, start_page, num_pages;
	uint32_t page_size;

	pthread_spin_lock(&file->lock);
	next = spdk_tree_find_buffer(file->tree, file->length_flushed);
	if (next == NULL || next->in_progress) {
		/*
		 * There is either no data to flush, or a flush I/O is already in
		 *  progress.  So return immediately - if a flush I/O is in
		 *  progress we will flush more data after that is completed.
		 */
		__free_args(args);
		pthread_spin_unlock(&file->lock);
		return;
	}

	offset = next->offset + next->bytes_flushed;
	length = next->bytes_filled - next->bytes_flushed;
	if (length == 0) {
		/* Everything filled so far has already been flushed. */
		__free_args(args);
		pthread_spin_unlock(&file->lock);
		return;
	}
	/* Record what is being written so __file_flush_done() can account for it. */
	args->op.flush.length = length;
	args->op.flush.cache_buffer = next;

	__get_page_parameters(file, offset, length, &start_page, &page_size, &num_pages);

	next->in_progress = true;
	BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n",
		     offset, length, start_page, num_pages);
	pthread_spin_unlock(&file->lock);
	/* The source address is the start of the first page touched, relative to the buffer's file offset. */
	spdk_bs_io_write_blob(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel,
			      next->buf + (start_page * page_size) - next->offset,
			      start_page, num_pages,
			      __file_flush_done, args);
}
1744 
/*
 * Completion of the metadata sync started by __file_extend_blob(): posts the
 *  semaphore stored in the args.  The status is not examined.
 */
static void
__file_extend_done(void *arg, int bserrno)
{
	struct spdk_fs_cb_args *extend_args = arg;

	__wake_caller(extend_args);
}
1752 
1753 static void
1754 __file_extend_blob(void *_args)
1755 {
1756 	struct spdk_fs_cb_args *args = _args;
1757 	struct spdk_file *file = args->file;
1758 
1759 	spdk_bs_md_resize_blob(file->blob, args->op.resize.num_clusters);
1760 
1761 	spdk_bs_md_sync_blob(file->blob, __file_extend_done, args);
1762 }
1763 
/*
 * Completion of the read or write started by __rw_from_file(): posts the
 *  semaphore stored in the args and then releases the args.  The status is
 *  not examined.
 */
static void
__rw_from_file_done(void *arg, int bserrno)
{
	struct spdk_fs_cb_args *rw_args = arg;

	__wake_caller(rw_args);
	__free_args(rw_args);
}
1772 
1773 static void
1774 __rw_from_file(void *_args)
1775 {
1776 	struct spdk_fs_cb_args *args = _args;
1777 	struct spdk_file *file = args->file;
1778 
1779 	if (args->op.rw.is_read) {
1780 		spdk_file_read_async(file, file->fs->sync_target.sync_io_channel, args->op.rw.user_buf,
1781 				     args->op.rw.offset, args->op.rw.length,
1782 				     __rw_from_file_done, args);
1783 	} else {
1784 		spdk_file_write_async(file, file->fs->sync_target.sync_io_channel, args->op.rw.user_buf,
1785 				      args->op.rw.offset, args->op.rw.length,
1786 				      __rw_from_file_done, args);
1787 	}
1788 }
1789 
1790 static int
1791 __send_rw_from_file(struct spdk_file *file, sem_t *sem, void *payload,
1792 		    uint64_t offset, uint64_t length, bool is_read)
1793 {
1794 	struct spdk_fs_cb_args *args;
1795 
1796 	args = calloc(1, sizeof(*args));
1797 	if (args == NULL) {
1798 		sem_post(sem);
1799 		return -ENOMEM;
1800 	}
1801 
1802 	args->file = file;
1803 	args->sem = sem;
1804 	args->op.rw.user_buf = payload;
1805 	args->op.rw.offset = offset;
1806 	args->op.rw.length = length;
1807 	args->op.rw.is_read = is_read;
1808 	file->fs->send_request(__rw_from_file, args);
1809 	return 0;
1810 }
1811 
1812 int
1813 spdk_file_write(struct spdk_file *file, struct spdk_io_channel *_channel,
1814 		void *payload, uint64_t offset, uint64_t length)
1815 {
1816 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
1817 	struct spdk_fs_cb_args *args;
1818 	uint64_t rem_length, copy, blob_size, cluster_sz;
1819 	uint32_t cache_buffers_filled = 0;
1820 	uint8_t *cur_payload;
1821 	struct cache_buffer *last;
1822 
1823 	BLOBFS_TRACE_RW(file, "offset=%jx length=%jx\n", offset, length);
1824 
1825 	if (length == 0) {
1826 		return 0;
1827 	}
1828 
1829 	if (offset != file->append_pos) {
1830 		BLOBFS_TRACE(file, " error offset=%jx append_pos=%jx\n", offset, file->append_pos);
1831 		return -EINVAL;
1832 	}
1833 
1834 	pthread_spin_lock(&file->lock);
1835 	file->open_for_writing = true;
1836 
1837 	if (file->last == NULL) {
1838 		if (file->append_pos % CACHE_BUFFER_SIZE == 0) {
1839 			cache_append_buffer(file);
1840 		} else {
1841 			int rc;
1842 
1843 			file->append_pos += length;
1844 			rc = __send_rw_from_file(file, &channel->sem, payload,
1845 						 offset, length, false);
1846 			pthread_spin_unlock(&file->lock);
1847 			sem_wait(&channel->sem);
1848 			return rc;
1849 		}
1850 	}
1851 
1852 	blob_size = __file_get_blob_size(file);
1853 
1854 	if ((offset + length) > blob_size) {
1855 		struct spdk_fs_cb_args extend_args = {};
1856 
1857 		cluster_sz = file->fs->bs_opts.cluster_sz;
1858 		extend_args.sem = &channel->sem;
1859 		extend_args.op.resize.num_clusters = __bytes_to_clusters((offset + length), cluster_sz);
1860 		extend_args.file = file;
1861 		BLOBFS_TRACE(file, "start resize to %u clusters\n", extend_args.op.resize.num_clusters);
1862 		pthread_spin_unlock(&file->lock);
1863 		file->fs->send_request(__file_extend_blob, &extend_args);
1864 		sem_wait(&channel->sem);
1865 	}
1866 
1867 	last = file->last;
1868 	rem_length = length;
1869 	cur_payload = payload;
1870 	while (rem_length > 0) {
1871 		copy = last->buf_size - last->bytes_filled;
1872 		if (copy > rem_length) {
1873 			copy = rem_length;
1874 		}
1875 		BLOBFS_TRACE_RW(file, "  fill offset=%jx length=%jx\n", file->append_pos, copy);
1876 		memcpy(&last->buf[last->bytes_filled], cur_payload, copy);
1877 		file->append_pos += copy;
1878 		if (file->length < file->append_pos) {
1879 			file->length = file->append_pos;
1880 		}
1881 		cur_payload += copy;
1882 		last->bytes_filled += copy;
1883 		rem_length -= copy;
1884 		if (last->bytes_filled == last->buf_size) {
1885 			cache_buffers_filled++;
1886 			last = cache_append_buffer(file);
1887 			if (last == NULL) {
1888 				BLOBFS_TRACE(file, "nomem\n");
1889 				pthread_spin_unlock(&file->lock);
1890 				return -ENOMEM;
1891 			}
1892 		}
1893 	}
1894 
1895 	if (cache_buffers_filled == 0) {
1896 		pthread_spin_unlock(&file->lock);
1897 		return 0;
1898 	}
1899 
1900 	args = calloc(1, sizeof(*args));
1901 	if (args == NULL) {
1902 		pthread_spin_unlock(&file->lock);
1903 		return -ENOMEM;
1904 	}
1905 
1906 	args->file = file;
1907 	file->fs->send_request(__file_flush, args);
1908 	pthread_spin_unlock(&file->lock);
1909 	return 0;
1910 }
1911 
1912 static void
1913 __readahead_done(void *arg, int bserrno)
1914 {
1915 	struct spdk_fs_cb_args *args = arg;
1916 	struct cache_buffer *cache_buffer = args->op.readahead.cache_buffer;
1917 	struct spdk_file *file = args->file;
1918 
1919 	BLOBFS_TRACE(file, "offset=%jx\n", cache_buffer->offset);
1920 
1921 	pthread_spin_lock(&file->lock);
1922 	cache_buffer->bytes_filled = args->op.readahead.length;
1923 	cache_buffer->bytes_flushed = args->op.readahead.length;
1924 	cache_buffer->in_progress = false;
1925 	pthread_spin_unlock(&file->lock);
1926 
1927 	__free_args(args);
1928 }
1929 
1930 static void
1931 __readahead(void *_args)
1932 {
1933 	struct spdk_fs_cb_args *args = _args;
1934 	struct spdk_file *file = args->file;
1935 	uint64_t offset, length, start_page, num_pages;
1936 	uint32_t page_size;
1937 
1938 	offset = args->op.readahead.offset;
1939 	length = args->op.readahead.length;
1940 	assert(length > 0);
1941 
1942 	__get_page_parameters(file, offset, length, &start_page, &page_size, &num_pages);
1943 
1944 	BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n",
1945 		     offset, length, start_page, num_pages);
1946 	spdk_bs_io_read_blob(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel,
1947 			     args->op.readahead.cache_buffer->buf,
1948 			     start_page, num_pages,
1949 			     __readahead_done, args);
1950 }
1951 
1952 static uint64_t
1953 __next_cache_buffer_offset(uint64_t offset)
1954 {
1955 	return (offset + CACHE_BUFFER_SIZE) & ~(CACHE_TREE_LEVEL_MASK(0));
1956 }
1957 
1958 static void
1959 check_readahead(struct spdk_file *file, uint64_t offset)
1960 {
1961 	struct spdk_fs_cb_args *args;
1962 
1963 	offset = __next_cache_buffer_offset(offset);
1964 	if (spdk_tree_find_buffer(file->tree, offset) != NULL || file->length <= offset) {
1965 		return;
1966 	}
1967 
1968 	args = calloc(1, sizeof(*args));
1969 	if (args == NULL) {
1970 		return;
1971 	}
1972 
1973 	BLOBFS_TRACE(file, "offset=%jx\n", offset);
1974 
1975 	args->file = file;
1976 	args->op.readahead.offset = offset;
1977 	args->op.readahead.cache_buffer = cache_insert_buffer(file, offset);
1978 	args->op.readahead.cache_buffer->in_progress = true;
1979 	if (file->length < (offset + CACHE_BUFFER_SIZE)) {
1980 		args->op.readahead.length = file->length & (CACHE_BUFFER_SIZE - 1);
1981 	} else {
1982 		args->op.readahead.length = CACHE_BUFFER_SIZE;
1983 	}
1984 	file->fs->send_request(__readahead, args);
1985 }
1986 
1987 static int
1988 __file_read(struct spdk_file *file, void *payload, uint64_t offset, uint64_t length, sem_t *sem)
1989 {
1990 	struct cache_buffer *buf;
1991 
1992 	buf = spdk_tree_find_filled_buffer(file->tree, offset);
1993 	if (buf == NULL) {
1994 		return __send_rw_from_file(file, sem, payload, offset, length, true);
1995 	}
1996 
1997 	if ((offset + length) > (buf->offset + buf->bytes_filled)) {
1998 		length = buf->offset + buf->bytes_filled - offset;
1999 	}
2000 	BLOBFS_TRACE(file, "read %p offset=%ju length=%ju\n", payload, offset, length);
2001 	memcpy(payload, &buf->buf[offset - buf->offset], length);
2002 	if ((offset + length) % CACHE_BUFFER_SIZE == 0) {
2003 		pthread_spin_lock(&g_caches_lock);
2004 		spdk_tree_remove_buffer(file->tree, buf);
2005 		if (file->tree->present_mask == 0) {
2006 			TAILQ_REMOVE(&g_caches, file, cache_tailq);
2007 		}
2008 		pthread_spin_unlock(&g_caches_lock);
2009 	}
2010 
2011 	sem_post(sem);
2012 	return 0;
2013 }
2014 
2015 int64_t
2016 spdk_file_read(struct spdk_file *file, struct spdk_io_channel *_channel,
2017 	       void *payload, uint64_t offset, uint64_t length)
2018 {
2019 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
2020 	uint64_t final_offset, final_length;
2021 	uint32_t sub_reads = 0;
2022 	int rc = 0;
2023 
2024 	pthread_spin_lock(&file->lock);
2025 
2026 	BLOBFS_TRACE_RW(file, "offset=%ju length=%ju\n", offset, length);
2027 
2028 	file->open_for_writing = false;
2029 
2030 	if (length == 0 || offset >= file->length) {
2031 		pthread_spin_unlock(&file->lock);
2032 		return 0;
2033 	}
2034 
2035 	if (offset + length > file->length) {
2036 		length = file->length - offset;
2037 	}
2038 
2039 	if (offset != file->next_seq_offset) {
2040 		file->seq_byte_count = 0;
2041 	}
2042 	file->seq_byte_count += length;
2043 	file->next_seq_offset = offset + length;
2044 	if (file->seq_byte_count >= CACHE_READAHEAD_THRESHOLD) {
2045 		check_readahead(file, offset);
2046 		check_readahead(file, offset + CACHE_BUFFER_SIZE);
2047 	}
2048 
2049 	final_length = 0;
2050 	final_offset = offset + length;
2051 	while (offset < final_offset) {
2052 		length = NEXT_CACHE_BUFFER_OFFSET(offset) - offset;
2053 		if (length > (final_offset - offset)) {
2054 			length = final_offset - offset;
2055 		}
2056 		rc = __file_read(file, payload, offset, length, &channel->sem);
2057 		if (rc == 0) {
2058 			final_length += length;
2059 		} else {
2060 			break;
2061 		}
2062 		payload += length;
2063 		offset += length;
2064 		sub_reads++;
2065 	}
2066 	pthread_spin_unlock(&file->lock);
2067 	while (sub_reads-- > 0) {
2068 		sem_wait(&channel->sem);
2069 	}
2070 	if (rc == 0) {
2071 		return final_length;
2072 	} else {
2073 		return rc;
2074 	}
2075 }
2076 
2077 static void
2078 _file_sync(struct spdk_file *file, struct spdk_fs_channel *channel,
2079 	   spdk_file_op_complete cb_fn, void *cb_arg)
2080 {
2081 	struct spdk_fs_request *sync_req;
2082 	struct spdk_fs_request *flush_req;
2083 	struct spdk_fs_cb_args *sync_args;
2084 	struct spdk_fs_cb_args *flush_args;
2085 
2086 	BLOBFS_TRACE(file, "offset=%jx\n", file->append_pos);
2087 
2088 	pthread_spin_lock(&file->lock);
2089 	if (file->append_pos <= file->length_flushed || file->last == NULL) {
2090 		BLOBFS_TRACE(file, "done - no data to flush\n");
2091 		pthread_spin_unlock(&file->lock);
2092 		cb_fn(cb_arg, 0);
2093 		return;
2094 	}
2095 
2096 	sync_req = alloc_fs_request(channel);
2097 	assert(sync_req != NULL);
2098 	sync_args = &sync_req->args;
2099 
2100 	flush_req = alloc_fs_request(channel);
2101 	assert(flush_req != NULL);
2102 	flush_args = &flush_req->args;
2103 
2104 	sync_args->file = file;
2105 	sync_args->fn.file_op = cb_fn;
2106 	sync_args->arg = cb_arg;
2107 	sync_args->op.sync.offset = file->append_pos;
2108 	TAILQ_INSERT_TAIL(&file->sync_requests, sync_req, args.op.sync.tailq);
2109 	pthread_spin_unlock(&file->lock);
2110 
2111 	flush_args->file = file;
2112 	channel->send_request(__file_flush, flush_args);
2113 }
2114 
2115 int
2116 spdk_file_sync(struct spdk_file *file, struct spdk_io_channel *_channel)
2117 {
2118 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
2119 
2120 	_file_sync(file, channel, __sem_post, &channel->sem);
2121 	sem_wait(&channel->sem);
2122 
2123 	return 0;
2124 }
2125 
2126 void
2127 spdk_file_sync_async(struct spdk_file *file, struct spdk_io_channel *_channel,
2128 		     spdk_file_op_complete cb_fn, void *cb_arg)
2129 {
2130 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
2131 
2132 	_file_sync(file, channel, cb_fn, cb_arg);
2133 }
2134 
2135 void
2136 spdk_file_set_priority(struct spdk_file *file, uint32_t priority)
2137 {
2138 	BLOBFS_TRACE(file, "priority=%u\n", priority);
2139 	file->priority = priority;
2140 
2141 }
2142 
2143 /*
2144  * Close routines
2145  */
2146 
2147 static void
2148 __file_close_async_done(void *ctx, int bserrno)
2149 {
2150 	struct spdk_fs_request *req = ctx;
2151 	struct spdk_fs_cb_args *args = &req->args;
2152 
2153 	args->fn.file_op(args->arg, bserrno);
2154 	free_fs_request(req);
2155 }
2156 
2157 static void
2158 __file_close_async(struct spdk_file *file, struct spdk_fs_request *req)
2159 {
2160 	pthread_spin_lock(&file->lock);
2161 	if (file->ref_count == 0) {
2162 		pthread_spin_unlock(&file->lock);
2163 		__file_close_async_done(req, -EBADF);
2164 		return;
2165 	}
2166 
2167 	file->ref_count--;
2168 	if (file->ref_count > 0) {
2169 		pthread_spin_unlock(&file->lock);
2170 		__file_close_async_done(req, 0);
2171 		return;
2172 	}
2173 
2174 	pthread_spin_unlock(&file->lock);
2175 
2176 	spdk_bs_md_close_blob(&file->blob, __file_close_async_done, req);
2177 }
2178 
2179 static void
2180 __file_close_async__sync_done(void *arg, int fserrno)
2181 {
2182 	struct spdk_fs_request *req = arg;
2183 	struct spdk_fs_cb_args *args = &req->args;
2184 
2185 	__file_close_async(args->file, req);
2186 }
2187 
2188 void
2189 spdk_file_close_async(struct spdk_file *file, spdk_file_op_complete cb_fn, void *cb_arg)
2190 {
2191 	struct spdk_fs_request *req;
2192 	struct spdk_fs_cb_args *args;
2193 
2194 	req = alloc_fs_request(file->fs->md_target.md_fs_channel);
2195 	if (req == NULL) {
2196 		cb_fn(cb_arg, -ENOMEM);
2197 		return;
2198 	}
2199 
2200 	args = &req->args;
2201 	args->file = file;
2202 	args->fn.file_op = cb_fn;
2203 	args->arg = cb_arg;
2204 
2205 	spdk_file_sync_async(file, file->fs->md_target.md_io_channel, __file_close_async__sync_done, req);
2206 }
2207 
2208 static void
2209 __file_close_done(void *arg, int fserrno)
2210 {
2211 	struct spdk_fs_cb_args *args = arg;
2212 
2213 	args->rc = fserrno;
2214 	sem_post(args->sem);
2215 }
2216 
2217 static void
2218 __file_close(void *arg)
2219 {
2220 	struct spdk_fs_request *req = arg;
2221 	struct spdk_fs_cb_args *args = &req->args;
2222 	struct spdk_file *file = args->file;
2223 
2224 	__file_close_async(file, req);
2225 }
2226 
2227 int
2228 spdk_file_close(struct spdk_file *file, struct spdk_io_channel *_channel)
2229 {
2230 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
2231 	struct spdk_fs_request *req;
2232 	struct spdk_fs_cb_args *args;
2233 
2234 	req = alloc_fs_request(channel);
2235 	assert(req != NULL);
2236 
2237 	args = &req->args;
2238 
2239 	spdk_file_sync(file, _channel);
2240 	BLOBFS_TRACE(file, "name=%s\n", file->name);
2241 	args->file = file;
2242 	args->sem = &channel->sem;
2243 	args->fn.file_op = __file_close_done;
2244 	args->arg = req;
2245 	channel->send_request(__file_close, req);
2246 	sem_wait(&channel->sem);
2247 
2248 	return args->rc;
2249 }
2250 
2251 static void
2252 cache_free_buffers(struct spdk_file *file)
2253 {
2254 	BLOBFS_TRACE(file, "free=%s\n", file->name);
2255 	pthread_spin_lock(&file->lock);
2256 	pthread_spin_lock(&g_caches_lock);
2257 	if (file->tree->present_mask == 0) {
2258 		pthread_spin_unlock(&g_caches_lock);
2259 		pthread_spin_unlock(&file->lock);
2260 		return;
2261 	}
2262 	spdk_tree_free_buffers(file->tree);
2263 	if (file->tree->present_mask == 0) {
2264 		TAILQ_REMOVE(&g_caches, file, cache_tailq);
2265 	}
2266 	file->last = NULL;
2267 	pthread_spin_unlock(&g_caches_lock);
2268 	pthread_spin_unlock(&file->lock);
2269 }
2270 
/*
 * File-scope trace flag registrations: each invocation pairs a flag name
 * string ("blobfs", "blobfs_rw") with its identifier (SPDK_TRACE_BLOBFS,
 * SPDK_TRACE_BLOBFS_RW).  The macro itself is defined outside this file
 * section.
 */
2271 SPDK_LOG_REGISTER_TRACE_FLAG("blobfs", SPDK_TRACE_BLOBFS);
2272 SPDK_LOG_REGISTER_TRACE_FLAG("blobfs_rw", SPDK_TRACE_BLOBFS_RW);
2273