xref: /spdk/lib/blobfs/blobfs.c (revision d92f0f75caf311608f5f0e19d4b3db349609b4e8)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 
36 #include "spdk/blobfs.h"
37 #include "blobfs_internal.h"
38 
39 #include "spdk/queue.h"
40 #include "spdk/io_channel.h"
41 #include "spdk/assert.h"
42 #include "spdk/env.h"
43 #include "spdk/util.h"
44 #include "spdk_internal/log.h"
45 
/*
 * Debug trace helpers.  Both prepend "file=<name> " to the message, reading
 *  the name from file->name, and log through SPDK_DEBUGLOG under the
 *  SPDK_TRACE_BLOBFS and SPDK_TRACE_BLOBFS_RW flags respectively.
 */
#define BLOBFS_TRACE(file, str, args...) \
	SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "file=%s " str, file->name, ##args)

#define BLOBFS_TRACE_RW(file, str, args...) \
	SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS_RW, "file=%s " str, file->name, ##args)

/* Default size, in bytes, used to dimension the cache buffer pool (4 GiB). */
#define BLOBFS_CACHE_SIZE (4ULL * 1024 * 1024 * 1024)

/* Total cache size; divided by CACHE_BUFFER_SIZE to get the pool's element count. */
static uint64_t g_fs_cache_size = BLOBFS_CACHE_SIZE;
/* Pool of cache buffers; created by __initialize_cache(), released by __free_cache(). */
static struct spdk_mempool *g_cache_pool;
/* List of files, initialized in __initialize_cache().  Presumably linked via cache_tailq - TODO confirm. */
static TAILQ_HEAD(, spdk_file) g_caches;
/* Number of filesystems currently initialized/loaded; the cache exists while this is non-zero. */
static int g_fs_count = 0;
/* Serializes updates to g_fs_count and creation/teardown of the cache pool. */
static pthread_mutex_t g_cache_init_lock = PTHREAD_MUTEX_INITIALIZER;
/* Spinlock initialized alongside g_caches; presumably guards that list - TODO confirm. */
static pthread_spinlock_t g_caches_lock;
60 
/*
 * Completion callback that simply wakes a waiter.
 *
 * arg is the sem_t to post; bserrno is not examined.
 */
static void
__sem_post(void *arg, int bserrno)
{
	sem_post((sem_t *)arg);
}
68 
69 void
70 spdk_cache_buffer_free(struct cache_buffer *cache_buffer)
71 {
72 	spdk_mempool_put(g_cache_pool, cache_buffer->buf);
73 	free(cache_buffer);
74 }
75 
/*
 * 128 KiB.  Presumably the amount of sequential reading after which readahead
 *  is triggered; the use site is not in this part of the file - TODO confirm.
 */
#define CACHE_READAHEAD_THRESHOLD	(128 * 1024)
77 
/*
 * In-memory state for one file.  Instances are created by file_alloc(), which
 *  also links them onto the owning filesystem's file list.
 */
struct spdk_file {
	/* Filesystem this file belongs to. */
	struct spdk_filesystem	*fs;
	/* Blob handle set by the blob-open callbacks; expected to be NULL before the first open. */
	struct spdk_blob	*blob;
	/* Heap-allocated (strdup) file name; also stored in the blob's "name" xattr. */
	char			*name;
	/* File length in bytes; loaded from the blob's "length" xattr and reported by stat. */
	uint64_t		length;
	/* Not referenced in this part of the file. */
	bool			open_for_writing;
	/* Initialized to the on-disk length when the file is discovered at load time. */
	uint64_t		length_flushed;
	/* Initialized to the on-disk length when the file is discovered at load time. */
	uint64_t		append_pos;
	/* Presumably sequential-access tracking used for readahead - TODO confirm. */
	uint64_t		seq_byte_count;
	uint64_t		next_seq_offset;
	/* Starts out as SPDK_FILE_PRIORITY_LOW. */
	uint32_t		priority;
	/* Link on spdk_filesystem.files. */
	TAILQ_ENTRY(spdk_file)	tailq;
	/* Id of the blob backing this file. */
	spdk_blob_id		blobid;
	/* Incremented for every open request; deletion is refused with -EBUSY while non-zero. */
	uint32_t		ref_count;
	/* Initialized in file_alloc(); what it protects is not visible in this part of the file. */
	pthread_spinlock_t	lock;
	/* Cache state; the tree is allocated (zeroed) in file_alloc(). */
	struct cache_buffer	*last;
	struct cache_tree	*tree;
	/* Open requests waiting for the blob open to complete. */
	TAILQ_HEAD(open_requests_head, spdk_fs_request) open_requests;
	/* Presumably sync requests waiting on flushed data - TODO confirm. */
	TAILQ_HEAD(sync_requests_head, spdk_fs_request) sync_requests;
	/* Presumably the link on the global g_caches list - TODO confirm. */
	TAILQ_ENTRY(spdk_file)	cache_tailq;
};
99 
/*
 * One mounted blobfs instance.
 *
 * The addresses of the three *_target sub-structures are registered as
 *  io_devices, and the channel-create callbacks recover the filesystem from
 *  them with SPDK_CONTAINEROF.
 */
struct spdk_filesystem {
	/* Underlying blobstore; set once init/load completes successfully. */
	struct spdk_blob_store	*bs;
	/* All known files, linked through spdk_file.tailq. */
	TAILQ_HEAD(, spdk_file)	files;
	/* Only cluster_sz is filled in here, from spdk_bs_get_cluster_size(). */
	struct spdk_bs_opts	bs_opts;
	/* Block device handed to spdk_fs_init()/spdk_fs_load(). */
	struct spdk_bs_dev	*bdev;
	/* Used by the synchronous API wrappers to hand work to the thread that runs the async operation. */
	fs_send_request_fn	send_request;

	struct {
		/* Number of requests preallocated for each channel of this target. */
		uint32_t		max_ops;
		struct spdk_io_channel	*sync_io_channel;
		/* Context of sync_io_channel. */
		struct spdk_fs_channel	*sync_fs_channel;
	} sync_target;

	struct {
		/* Number of requests preallocated for each channel of this target. */
		uint32_t		max_ops;
		struct spdk_io_channel	*md_io_channel;
		/* Context of md_io_channel; metadata operations allocate their requests from it. */
		struct spdk_fs_channel	*md_fs_channel;
	} md_target;

	struct {
		/* Number of requests preallocated for each channel of this target. */
		uint32_t		max_ops;
	} io_target;
};
123 
/*
 * Per-operation context carried through asynchronous callbacks.  It is the
 *  first member of struct spdk_fs_request.
 */
struct spdk_fs_cb_args {
	/* Completion callback; which member is valid depends on the operation. */
	union {
		spdk_fs_op_with_handle_complete		fs_op_with_handle;
		spdk_fs_op_complete			fs_op;
		spdk_file_op_with_handle_complete	file_op_with_handle;
		spdk_file_op_complete			file_op;
		spdk_file_stat_op_complete		stat_op;
	} fn;
	/* Argument passed back to the completion callback. */
	void *arg;
	/* Semaphore posted by the synchronous wrappers' completion handlers. */
	sem_t *sem;
	struct spdk_filesystem *fs;
	struct spdk_file *file;
	/* Result code handed back to a synchronous caller. */
	int rc;
	/* Set to true by alloc_fs_request(). */
	bool from_request;
	/* Operation-specific parameters; only the member for the current operation is meaningful. */
	union {
		struct {
			uint64_t	length;
		} truncate;
		struct {
			struct spdk_io_channel	*channel;
			void		*user_buf;
			void		*pin_buf;
			int		is_read;
			off_t		offset;
			size_t		length;
			uint64_t	start_page;
			uint64_t	num_pages;
			uint32_t	blocklen;
		} rw;
		struct {
			/* Caller-owned strings; not copied. */
			const char	*old_name;
			const char	*new_name;
		} rename;
		struct {
			struct cache_buffer	*cache_buffer;
			uint64_t		length;
		} flush;
		struct {
			struct cache_buffer	*cache_buffer;
			uint64_t		length;
			uint64_t		offset;
		} readahead;
		struct {
			uint64_t			offset;
			TAILQ_ENTRY(spdk_fs_request)	tailq;
			bool				xattr_in_progress;
		} sync;
		struct {
			uint32_t			num_clusters;
		} resize;
		struct {
			/* Caller-owned string; not copied. */
			const char	*name;
			uint32_t	flags;
			/* Link on spdk_file.open_requests while the blob open is pending. */
			TAILQ_ENTRY(spdk_fs_request)	tailq;
		} open;
		struct {
			const char	*name;
		} create;
		struct {
			const char	*name;
		} delete;
		struct {
			const char	*name;
		} stat;
	} op;
};
190 
191 static void cache_free_buffers(struct spdk_file *file);
192 
193 static void
194 __initialize_cache(void)
195 {
196 	assert(g_cache_pool == NULL);
197 
198 	g_cache_pool = spdk_mempool_create("spdk_fs_cache",
199 					   g_fs_cache_size / CACHE_BUFFER_SIZE,
200 					   CACHE_BUFFER_SIZE,
201 					   SPDK_MEMPOOL_DEFAULT_CACHE_SIZE,
202 					   SPDK_ENV_SOCKET_ID_ANY);
203 	TAILQ_INIT(&g_caches);
204 	pthread_spin_init(&g_caches_lock, 0);
205 }
206 
207 static void
208 __free_cache(void)
209 {
210 	assert(g_cache_pool != NULL);
211 
212 	spdk_mempool_free(g_cache_pool);
213 	g_cache_pool = NULL;
214 }
215 
216 static uint64_t
217 __file_get_blob_size(struct spdk_file *file)
218 {
219 	uint64_t cluster_sz;
220 
221 	cluster_sz = file->fs->bs_opts.cluster_sz;
222 	return cluster_sz * spdk_blob_get_num_clusters(file->blob);
223 }
224 
/*
 * A preallocated request.  Requests live in a per-channel array and are
 *  handed out by alloc_fs_request() and returned by free_fs_request().
 */
struct spdk_fs_request {
	/* Operation context; kept as the first member. */
	struct spdk_fs_cb_args		args;
	/* Link on the owning channel's free list (spdk_fs_channel.reqs). */
	TAILQ_ENTRY(spdk_fs_request)	link;
	/* Channel this request was allocated from and must be returned to. */
	struct spdk_fs_channel		*channel;
};
230 
/*
 * Per-channel context: a pool of requests plus the resources needed to issue
 *  and wait for operations.
 */
struct spdk_fs_channel {
	/* Backing array for all requests of this channel; freed when the channel is destroyed. */
	struct spdk_fs_request		*req_mem;
	/* Free list of requests. */
	TAILQ_HEAD(, spdk_fs_request)	reqs;
	/* Semaphore the synchronous API wrappers block on until completion. */
	sem_t				sem;
	struct spdk_filesystem		*fs;
	/* Blobstore channel; may be NULL, in which case destroy skips releasing it. */
	struct spdk_io_channel		*bs_channel;
	fs_send_request_fn		send_request;
	/* When true, the free list is accessed under 'lock'. */
	bool				sync;
	/* Not initialized in this part of the file - NOTE(review): confirm where sync channels set it up. */
	pthread_spinlock_t		lock;
};
241 
242 static struct spdk_fs_request *
243 alloc_fs_request(struct spdk_fs_channel *channel)
244 {
245 	struct spdk_fs_request *req;
246 
247 	if (channel->sync) {
248 		pthread_spin_lock(&channel->lock);
249 	}
250 
251 	req = TAILQ_FIRST(&channel->reqs);
252 	if (req) {
253 		TAILQ_REMOVE(&channel->reqs, req, link);
254 	}
255 
256 	if (channel->sync) {
257 		pthread_spin_unlock(&channel->lock);
258 	}
259 
260 	if (req == NULL) {
261 		return NULL;
262 	}
263 	memset(req, 0, sizeof(*req));
264 	req->channel = channel;
265 	req->args.from_request = true;
266 
267 	return req;
268 }
269 
270 static void
271 free_fs_request(struct spdk_fs_request *req)
272 {
273 	struct spdk_fs_channel *channel = req->channel;
274 
275 	if (channel->sync) {
276 		pthread_spin_lock(&channel->lock);
277 	}
278 
279 	TAILQ_INSERT_HEAD(&req->channel->reqs, req, link);
280 
281 	if (channel->sync) {
282 		pthread_spin_unlock(&channel->lock);
283 	}
284 }
285 
286 static int
287 _spdk_fs_channel_create(struct spdk_filesystem *fs, struct spdk_fs_channel *channel,
288 			uint32_t max_ops)
289 {
290 	uint32_t i;
291 
292 	channel->req_mem = calloc(max_ops, sizeof(struct spdk_fs_request));
293 	if (!channel->req_mem) {
294 		return -1;
295 	}
296 
297 	TAILQ_INIT(&channel->reqs);
298 	sem_init(&channel->sem, 0, 0);
299 
300 	for (i = 0; i < max_ops; i++) {
301 		TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link);
302 	}
303 
304 	channel->fs = fs;
305 
306 	return 0;
307 }
308 
309 static int
310 _spdk_fs_md_channel_create(void *io_device, void *ctx_buf)
311 {
312 	struct spdk_filesystem		*fs;
313 	struct spdk_fs_channel		*channel = ctx_buf;
314 
315 	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, md_target);
316 
317 	return _spdk_fs_channel_create(fs, channel, fs->md_target.max_ops);
318 }
319 
320 static int
321 _spdk_fs_sync_channel_create(void *io_device, void *ctx_buf)
322 {
323 	struct spdk_filesystem		*fs;
324 	struct spdk_fs_channel		*channel = ctx_buf;
325 
326 	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, sync_target);
327 
328 	return _spdk_fs_channel_create(fs, channel, fs->sync_target.max_ops);
329 }
330 
331 static int
332 _spdk_fs_io_channel_create(void *io_device, void *ctx_buf)
333 {
334 	struct spdk_filesystem		*fs;
335 	struct spdk_fs_channel		*channel = ctx_buf;
336 
337 	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, io_target);
338 
339 	return _spdk_fs_channel_create(fs, channel, fs->io_target.max_ops);
340 }
341 
342 static void
343 _spdk_fs_channel_destroy(void *io_device, void *ctx_buf)
344 {
345 	struct spdk_fs_channel *channel = ctx_buf;
346 
347 	free(channel->req_mem);
348 	if (channel->bs_channel != NULL) {
349 		spdk_bs_free_io_channel(channel->bs_channel);
350 	}
351 }
352 
353 static void
354 __send_request_direct(fs_request_fn fn, void *arg)
355 {
356 	fn(arg);
357 }
358 
359 static void
360 common_fs_bs_init(struct spdk_filesystem *fs, struct spdk_blob_store *bs)
361 {
362 	fs->bs = bs;
363 	fs->bs_opts.cluster_sz = spdk_bs_get_cluster_size(bs);
364 	fs->md_target.md_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
365 	fs->md_target.md_fs_channel->send_request = __send_request_direct;
366 	fs->sync_target.sync_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
367 	fs->sync_target.sync_fs_channel->send_request = __send_request_direct;
368 
369 	pthread_mutex_lock(&g_cache_init_lock);
370 	if (g_fs_count == 0) {
371 		__initialize_cache();
372 	}
373 	g_fs_count++;
374 	pthread_mutex_unlock(&g_cache_init_lock);
375 }
376 
377 static void
378 init_cb(void *ctx, struct spdk_blob_store *bs, int bserrno)
379 {
380 	struct spdk_fs_request *req = ctx;
381 	struct spdk_fs_cb_args *args = &req->args;
382 	struct spdk_filesystem *fs = args->fs;
383 
384 	if (bserrno == 0) {
385 		common_fs_bs_init(fs, bs);
386 	} else {
387 		free(fs);
388 		fs = NULL;
389 	}
390 
391 	args->fn.fs_op_with_handle(args->arg, fs, bserrno);
392 	free_fs_request(req);
393 }
394 
395 static struct spdk_filesystem *
396 fs_alloc(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn)
397 {
398 	struct spdk_filesystem *fs;
399 
400 	fs = calloc(1, sizeof(*fs));
401 	if (fs == NULL) {
402 		return NULL;
403 	}
404 
405 	fs->bdev = dev;
406 	fs->send_request = send_request_fn;
407 	TAILQ_INIT(&fs->files);
408 
409 	fs->md_target.max_ops = 512;
410 	spdk_io_device_register(&fs->md_target, _spdk_fs_md_channel_create, _spdk_fs_channel_destroy,
411 				sizeof(struct spdk_fs_channel));
412 	fs->md_target.md_io_channel = spdk_get_io_channel(&fs->md_target);
413 	fs->md_target.md_fs_channel = spdk_io_channel_get_ctx(fs->md_target.md_io_channel);
414 
415 	fs->sync_target.max_ops = 512;
416 	spdk_io_device_register(&fs->sync_target, _spdk_fs_sync_channel_create, _spdk_fs_channel_destroy,
417 				sizeof(struct spdk_fs_channel));
418 	fs->sync_target.sync_io_channel = spdk_get_io_channel(&fs->sync_target);
419 	fs->sync_target.sync_fs_channel = spdk_io_channel_get_ctx(fs->sync_target.sync_io_channel);
420 
421 	fs->io_target.max_ops = 512;
422 	spdk_io_device_register(&fs->io_target, _spdk_fs_io_channel_create, _spdk_fs_channel_destroy,
423 				sizeof(struct spdk_fs_channel));
424 
425 	return fs;
426 }
427 
428 void
429 spdk_fs_init(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn,
430 	     spdk_fs_op_with_handle_complete cb_fn, void *cb_arg)
431 {
432 	struct spdk_filesystem *fs;
433 	struct spdk_fs_request *req;
434 	struct spdk_fs_cb_args *args;
435 
436 	fs = fs_alloc(dev, send_request_fn);
437 	if (fs == NULL) {
438 		cb_fn(cb_arg, NULL, -ENOMEM);
439 		return;
440 	}
441 
442 	req = alloc_fs_request(fs->md_target.md_fs_channel);
443 	if (req == NULL) {
444 		spdk_put_io_channel(fs->md_target.md_io_channel);
445 		spdk_io_device_unregister(&fs->md_target, NULL);
446 		spdk_put_io_channel(fs->sync_target.sync_io_channel);
447 		spdk_io_device_unregister(&fs->sync_target, NULL);
448 		spdk_io_device_unregister(&fs->io_target, NULL);
449 		free(fs);
450 		cb_fn(cb_arg, NULL, -ENOMEM);
451 		return;
452 	}
453 
454 	args = &req->args;
455 	args->fn.fs_op_with_handle = cb_fn;
456 	args->arg = cb_arg;
457 	args->fs = fs;
458 
459 	spdk_bs_init(dev, NULL, init_cb, req);
460 }
461 
462 static struct spdk_file *
463 file_alloc(struct spdk_filesystem *fs)
464 {
465 	struct spdk_file *file;
466 
467 	file = calloc(1, sizeof(*file));
468 	if (file == NULL) {
469 		return NULL;
470 	}
471 
472 	file->tree = calloc(1, sizeof(*file->tree));
473 	if (file->tree == NULL) {
474 		free(file);
475 		return NULL;
476 	}
477 
478 	file->fs = fs;
479 	TAILQ_INIT(&file->open_requests);
480 	TAILQ_INIT(&file->sync_requests);
481 	pthread_spin_init(&file->lock, 0);
482 	TAILQ_INSERT_TAIL(&fs->files, file, tailq);
483 	file->priority = SPDK_FILE_PRIORITY_LOW;
484 	return file;
485 }
486 
487 static void
488 iter_cb(void *ctx, struct spdk_blob *blob, int rc)
489 {
490 	struct spdk_fs_request *req = ctx;
491 	struct spdk_fs_cb_args *args = &req->args;
492 	struct spdk_filesystem *fs = args->fs;
493 	struct spdk_file *f;
494 	uint64_t *length;
495 	const char *name;
496 	size_t value_len;
497 
498 	if (rc == -ENOENT) {
499 		/* Finished iterating */
500 		args->fn.fs_op_with_handle(args->arg, fs, 0);
501 		free_fs_request(req);
502 		return;
503 	} else if (rc < 0) {
504 		args->fn.fs_op_with_handle(args->arg, fs, rc);
505 		free_fs_request(req);
506 		return;
507 	}
508 
509 	rc = spdk_bs_md_get_xattr_value(blob, "name", (const void **)&name, &value_len);
510 	if (rc < 0) {
511 		args->fn.fs_op_with_handle(args->arg, fs, rc);
512 		free_fs_request(req);
513 		return;
514 	}
515 
516 	rc = spdk_bs_md_get_xattr_value(blob, "length", (const void **)&length, &value_len);
517 	if (rc < 0) {
518 		args->fn.fs_op_with_handle(args->arg, fs, rc);
519 		free_fs_request(req);
520 		return;
521 	}
522 	assert(value_len == 8);
523 
524 	f = file_alloc(fs);
525 	if (f == NULL) {
526 		args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM);
527 		free_fs_request(req);
528 		return;
529 	}
530 
531 	f->name = strdup(name);
532 	f->blobid = spdk_blob_get_id(blob);
533 	f->length = *length;
534 	f->length_flushed = *length;
535 	f->append_pos = *length;
536 	SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "added file %s length=%ju\n", f->name, f->length);
537 
538 	spdk_bs_md_iter_next(fs->bs, &blob, iter_cb, req);
539 }
540 
541 static void
542 load_cb(void *ctx, struct spdk_blob_store *bs, int bserrno)
543 {
544 	struct spdk_fs_request *req = ctx;
545 	struct spdk_fs_cb_args *args = &req->args;
546 	struct spdk_filesystem *fs = args->fs;
547 
548 	if (bserrno != 0) {
549 		args->fn.fs_op_with_handle(args->arg, NULL, bserrno);
550 		free_fs_request(req);
551 		free(fs);
552 		return;
553 	}
554 
555 	common_fs_bs_init(fs, bs);
556 	spdk_bs_md_iter_first(fs->bs, iter_cb, req);
557 }
558 
559 void
560 spdk_fs_load(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn,
561 	     spdk_fs_op_with_handle_complete cb_fn, void *cb_arg)
562 {
563 	struct spdk_filesystem *fs;
564 	struct spdk_fs_cb_args *args;
565 	struct spdk_fs_request *req;
566 
567 	fs = fs_alloc(dev, send_request_fn);
568 	if (fs == NULL) {
569 		cb_fn(cb_arg, NULL, -ENOMEM);
570 		return;
571 	}
572 
573 	req = alloc_fs_request(fs->md_target.md_fs_channel);
574 	if (req == NULL) {
575 		spdk_put_io_channel(fs->md_target.md_io_channel);
576 		spdk_io_device_unregister(&fs->md_target, NULL);
577 		spdk_put_io_channel(fs->sync_target.sync_io_channel);
578 		spdk_io_device_unregister(&fs->sync_target, NULL);
579 		spdk_io_device_unregister(&fs->io_target, NULL);
580 		free(fs);
581 		cb_fn(cb_arg, NULL, -ENOMEM);
582 		return;
583 	}
584 
585 	args = &req->args;
586 	args->fn.fs_op_with_handle = cb_fn;
587 	args->arg = cb_arg;
588 	args->fs = fs;
589 
590 	spdk_bs_load(dev, load_cb, req);
591 }
592 
593 static void
594 unload_cb(void *ctx, int bserrno)
595 {
596 	struct spdk_fs_request *req = ctx;
597 	struct spdk_fs_cb_args *args = &req->args;
598 	struct spdk_filesystem *fs = args->fs;
599 
600 	pthread_mutex_lock(&g_cache_init_lock);
601 	g_fs_count--;
602 	if (g_fs_count == 0) {
603 		__free_cache();
604 	}
605 	pthread_mutex_unlock(&g_cache_init_lock);
606 
607 	args->fn.fs_op(args->arg, bserrno);
608 	free(req);
609 
610 	spdk_io_device_unregister(&fs->io_target, NULL);
611 	spdk_io_device_unregister(&fs->sync_target, NULL);
612 	spdk_io_device_unregister(&fs->md_target, NULL);
613 
614 	free(fs);
615 }
616 
617 void
618 spdk_fs_unload(struct spdk_filesystem *fs, spdk_fs_op_complete cb_fn, void *cb_arg)
619 {
620 	struct spdk_fs_request *req;
621 	struct spdk_fs_cb_args *args;
622 
623 	/*
624 	 * We must free the md_channel before unloading the blobstore, so just
625 	 *  allocate this request from the general heap.
626 	 */
627 	req = calloc(1, sizeof(*req));
628 	if (req == NULL) {
629 		cb_fn(cb_arg, -ENOMEM);
630 		return;
631 	}
632 
633 	args = &req->args;
634 	args->fn.fs_op = cb_fn;
635 	args->arg = cb_arg;
636 	args->fs = fs;
637 
638 	spdk_fs_free_io_channel(fs->md_target.md_io_channel);
639 	spdk_fs_free_io_channel(fs->sync_target.sync_io_channel);
640 	spdk_bs_unload(fs->bs, unload_cb, req);
641 }
642 
643 static struct spdk_file *
644 fs_find_file(struct spdk_filesystem *fs, const char *name)
645 {
646 	struct spdk_file *file;
647 
648 	TAILQ_FOREACH(file, &fs->files, tailq) {
649 		if (!strncmp(name, file->name, SPDK_FILE_NAME_MAX)) {
650 			return file;
651 		}
652 	}
653 
654 	return NULL;
655 }
656 
657 void
658 spdk_fs_file_stat_async(struct spdk_filesystem *fs, const char *name,
659 			spdk_file_stat_op_complete cb_fn, void *cb_arg)
660 {
661 	struct spdk_file_stat stat;
662 	struct spdk_file *f = NULL;
663 
664 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
665 		cb_fn(cb_arg, NULL, -ENAMETOOLONG);
666 		return;
667 	}
668 
669 	f = fs_find_file(fs, name);
670 	if (f != NULL) {
671 		stat.blobid = f->blobid;
672 		stat.size = f->length;
673 		cb_fn(cb_arg, &stat, 0);
674 		return;
675 	}
676 
677 	cb_fn(cb_arg, NULL, -ENOENT);
678 }
679 
680 static void
681 __copy_stat(void *arg, struct spdk_file_stat *stat, int fserrno)
682 {
683 	struct spdk_fs_request *req = arg;
684 	struct spdk_fs_cb_args *args = &req->args;
685 
686 	args->rc = fserrno;
687 	if (fserrno == 0) {
688 		memcpy(args->arg, stat, sizeof(*stat));
689 	}
690 	sem_post(args->sem);
691 }
692 
693 static void
694 __file_stat(void *arg)
695 {
696 	struct spdk_fs_request *req = arg;
697 	struct spdk_fs_cb_args *args = &req->args;
698 
699 	spdk_fs_file_stat_async(args->fs, args->op.stat.name,
700 				args->fn.stat_op, req);
701 }
702 
703 int
704 spdk_fs_file_stat(struct spdk_filesystem *fs, struct spdk_io_channel *_channel,
705 		  const char *name, struct spdk_file_stat *stat)
706 {
707 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
708 	struct spdk_fs_request *req;
709 	int rc;
710 
711 	req = alloc_fs_request(channel);
712 	assert(req != NULL);
713 
714 	req->args.fs = fs;
715 	req->args.op.stat.name = name;
716 	req->args.fn.stat_op = __copy_stat;
717 	req->args.arg = stat;
718 	req->args.sem = &channel->sem;
719 	channel->send_request(__file_stat, req);
720 	sem_wait(&channel->sem);
721 
722 	rc = req->args.rc;
723 	free_fs_request(req);
724 
725 	return rc;
726 }
727 
728 static void
729 fs_create_blob_close_cb(void *ctx, int bserrno)
730 {
731 	struct spdk_fs_request *req = ctx;
732 	struct spdk_fs_cb_args *args = &req->args;
733 
734 	args->fn.file_op(args->arg, bserrno);
735 	free_fs_request(req);
736 }
737 
738 static void
739 fs_create_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno)
740 {
741 	struct spdk_fs_request *req = ctx;
742 	struct spdk_fs_cb_args *args = &req->args;
743 	struct spdk_file *f = args->file;
744 	uint64_t length = 0;
745 
746 	f->blob = blob;
747 	spdk_bs_md_resize_blob(blob, 1);
748 	spdk_blob_md_set_xattr(blob, "name", f->name, strlen(f->name) + 1);
749 	spdk_blob_md_set_xattr(blob, "length", &length, sizeof(length));
750 
751 	spdk_bs_md_close_blob(&f->blob, fs_create_blob_close_cb, args);
752 }
753 
754 static void
755 fs_create_blob_create_cb(void *ctx, spdk_blob_id blobid, int bserrno)
756 {
757 	struct spdk_fs_request *req = ctx;
758 	struct spdk_fs_cb_args *args = &req->args;
759 	struct spdk_file *f = args->file;
760 
761 	f->blobid = blobid;
762 	spdk_bs_md_open_blob(f->fs->bs, blobid, fs_create_blob_open_cb, req);
763 }
764 
765 void
766 spdk_fs_create_file_async(struct spdk_filesystem *fs, const char *name,
767 			  spdk_file_op_complete cb_fn, void *cb_arg)
768 {
769 	struct spdk_file *file;
770 	struct spdk_fs_request *req;
771 	struct spdk_fs_cb_args *args;
772 
773 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
774 		cb_fn(cb_arg, -ENAMETOOLONG);
775 		return;
776 	}
777 
778 	file = fs_find_file(fs, name);
779 	if (file != NULL) {
780 		cb_fn(cb_arg, -EEXIST);
781 		return;
782 	}
783 
784 	file = file_alloc(fs);
785 	if (file == NULL) {
786 		cb_fn(cb_arg, -ENOMEM);
787 		return;
788 	}
789 
790 	req = alloc_fs_request(fs->md_target.md_fs_channel);
791 	if (req == NULL) {
792 		cb_fn(cb_arg, -ENOMEM);
793 		return;
794 	}
795 
796 	args = &req->args;
797 	args->file = file;
798 	args->fn.file_op = cb_fn;
799 	args->arg = cb_arg;
800 
801 	file->name = strdup(name);
802 	spdk_bs_md_create_blob(fs->bs, fs_create_blob_create_cb, args);
803 }
804 
805 static void
806 __fs_create_file_done(void *arg, int fserrno)
807 {
808 	struct spdk_fs_request *req = arg;
809 	struct spdk_fs_cb_args *args = &req->args;
810 
811 	args->rc = fserrno;
812 	sem_post(args->sem);
813 	SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "file=%s\n", args->op.create.name);
814 }
815 
816 static void
817 __fs_create_file(void *arg)
818 {
819 	struct spdk_fs_request *req = arg;
820 	struct spdk_fs_cb_args *args = &req->args;
821 
822 	SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "file=%s\n", args->op.create.name);
823 	spdk_fs_create_file_async(args->fs, args->op.create.name, __fs_create_file_done, req);
824 }
825 
826 int
827 spdk_fs_create_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, const char *name)
828 {
829 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
830 	struct spdk_fs_request *req;
831 	struct spdk_fs_cb_args *args;
832 	int rc;
833 
834 	SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "file=%s\n", name);
835 
836 	req = alloc_fs_request(channel);
837 	assert(req != NULL);
838 
839 	args = &req->args;
840 	args->fs = fs;
841 	args->op.create.name = name;
842 	args->sem = &channel->sem;
843 	fs->send_request(__fs_create_file, req);
844 	sem_wait(&channel->sem);
845 	rc = args->rc;
846 	free_fs_request(req);
847 
848 	return rc;
849 }
850 
851 static void
852 fs_open_blob_done(void *ctx, struct spdk_blob *blob, int bserrno)
853 {
854 	struct spdk_fs_request *req = ctx;
855 	struct spdk_fs_cb_args *args = &req->args;
856 	struct spdk_file *f = args->file;
857 
858 	f->blob = blob;
859 	while (!TAILQ_EMPTY(&f->open_requests)) {
860 		req = TAILQ_FIRST(&f->open_requests);
861 		args = &req->args;
862 		TAILQ_REMOVE(&f->open_requests, req, args.op.open.tailq);
863 		args->fn.file_op_with_handle(args->arg, f, bserrno);
864 		free_fs_request(req);
865 	}
866 }
867 
868 static void
869 fs_open_blob_create_cb(void *ctx, int bserrno)
870 {
871 	struct spdk_fs_request *req = ctx;
872 	struct spdk_fs_cb_args *args = &req->args;
873 	struct spdk_file *file = args->file;
874 	struct spdk_filesystem *fs = args->fs;
875 
876 	if (file == NULL) {
877 		/*
878 		 * This is from an open with CREATE flag - the file
879 		 *  is now created so look it up in the file list for this
880 		 *  filesystem.
881 		 */
882 		file = fs_find_file(fs, args->op.open.name);
883 		assert(file != NULL);
884 		args->file = file;
885 	}
886 
887 	file->ref_count++;
888 	TAILQ_INSERT_TAIL(&file->open_requests, req, args.op.open.tailq);
889 	if (file->ref_count == 1) {
890 		assert(file->blob == NULL);
891 		spdk_bs_md_open_blob(fs->bs, file->blobid, fs_open_blob_done, req);
892 	} else if (file->blob != NULL) {
893 		fs_open_blob_done(req, file->blob, 0);
894 	} else {
895 		/*
896 		 * The blob open for this file is in progress due to a previous
897 		 *  open request.  When that open completes, it will invoke the
898 		 *  open callback for this request.
899 		 */
900 	}
901 }
902 
903 void
904 spdk_fs_open_file_async(struct spdk_filesystem *fs, const char *name, uint32_t flags,
905 			spdk_file_op_with_handle_complete cb_fn, void *cb_arg)
906 {
907 	struct spdk_file *f = NULL;
908 	struct spdk_fs_request *req;
909 	struct spdk_fs_cb_args *args;
910 
911 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
912 		cb_fn(cb_arg, NULL, -ENAMETOOLONG);
913 		return;
914 	}
915 
916 	f = fs_find_file(fs, name);
917 	if (f == NULL && !(flags & SPDK_BLOBFS_OPEN_CREATE)) {
918 		cb_fn(cb_arg, NULL, -ENOENT);
919 		return;
920 	}
921 
922 	req = alloc_fs_request(fs->md_target.md_fs_channel);
923 	if (req == NULL) {
924 		cb_fn(cb_arg, NULL, -ENOMEM);
925 		return;
926 	}
927 
928 	args = &req->args;
929 	args->fn.file_op_with_handle = cb_fn;
930 	args->arg = cb_arg;
931 	args->file = f;
932 	args->fs = fs;
933 	args->op.open.name = name;
934 
935 	if (f == NULL) {
936 		spdk_fs_create_file_async(fs, name, fs_open_blob_create_cb, req);
937 	} else {
938 		fs_open_blob_create_cb(req, 0);
939 	}
940 }
941 
942 static void
943 __fs_open_file_done(void *arg, struct spdk_file *file, int bserrno)
944 {
945 	struct spdk_fs_request *req = arg;
946 	struct spdk_fs_cb_args *args = &req->args;
947 
948 	args->file = file;
949 	args->rc = bserrno;
950 	sem_post(args->sem);
951 	SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "file=%s\n", args->op.open.name);
952 }
953 
954 static void
955 __fs_open_file(void *arg)
956 {
957 	struct spdk_fs_request *req = arg;
958 	struct spdk_fs_cb_args *args = &req->args;
959 
960 	SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "file=%s\n", args->op.open.name);
961 	spdk_fs_open_file_async(args->fs, args->op.open.name, args->op.open.flags,
962 				__fs_open_file_done, req);
963 }
964 
965 int
966 spdk_fs_open_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel,
967 		  const char *name, uint32_t flags, struct spdk_file **file)
968 {
969 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
970 	struct spdk_fs_request *req;
971 	struct spdk_fs_cb_args *args;
972 	int rc;
973 
974 	SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "file=%s\n", name);
975 
976 	req = alloc_fs_request(channel);
977 	assert(req != NULL);
978 
979 	args = &req->args;
980 	args->fs = fs;
981 	args->op.open.name = name;
982 	args->op.open.flags = flags;
983 	args->sem = &channel->sem;
984 	fs->send_request(__fs_open_file, req);
985 	sem_wait(&channel->sem);
986 	rc = args->rc;
987 	if (rc == 0) {
988 		*file = args->file;
989 	} else {
990 		*file = NULL;
991 	}
992 	free_fs_request(req);
993 
994 	return rc;
995 }
996 
997 static void
998 fs_rename_blob_close_cb(void *ctx, int bserrno)
999 {
1000 	struct spdk_fs_request *req = ctx;
1001 	struct spdk_fs_cb_args *args = &req->args;
1002 
1003 	args->fn.fs_op(args->arg, bserrno);
1004 	free_fs_request(req);
1005 }
1006 
1007 static void
1008 fs_rename_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno)
1009 {
1010 	struct spdk_fs_request *req = ctx;
1011 	struct spdk_fs_cb_args *args = &req->args;
1012 	struct spdk_file *f = args->file;
1013 	const char *new_name = args->op.rename.new_name;
1014 
1015 	f->blob = blob;
1016 	spdk_blob_md_set_xattr(blob, "name", new_name, strlen(new_name) + 1);
1017 	spdk_bs_md_close_blob(&f->blob, fs_rename_blob_close_cb, req);
1018 }
1019 
1020 static void
1021 __spdk_fs_md_rename_file(struct spdk_fs_request *req)
1022 {
1023 	struct spdk_fs_cb_args *args = &req->args;
1024 	struct spdk_file *f;
1025 
1026 	f = fs_find_file(args->fs, args->op.rename.old_name);
1027 	if (f == NULL) {
1028 		args->fn.fs_op(args->arg, -ENOENT);
1029 		free_fs_request(req);
1030 		return;
1031 	}
1032 
1033 	free(f->name);
1034 	f->name = strdup(args->op.rename.new_name);
1035 	args->file = f;
1036 	spdk_bs_md_open_blob(args->fs->bs, f->blobid, fs_rename_blob_open_cb, req);
1037 }
1038 
1039 static void
1040 fs_rename_delete_done(void *arg, int fserrno)
1041 {
1042 	__spdk_fs_md_rename_file(arg);
1043 }
1044 
1045 void
1046 spdk_fs_rename_file_async(struct spdk_filesystem *fs,
1047 			  const char *old_name, const char *new_name,
1048 			  spdk_file_op_complete cb_fn, void *cb_arg)
1049 {
1050 	struct spdk_file *f;
1051 	struct spdk_fs_request *req;
1052 	struct spdk_fs_cb_args *args;
1053 
1054 	SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "old=%s new=%s\n", old_name, new_name);
1055 	if (strnlen(new_name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
1056 		cb_fn(cb_arg, -ENAMETOOLONG);
1057 		return;
1058 	}
1059 
1060 	req = alloc_fs_request(fs->md_target.md_fs_channel);
1061 	if (req == NULL) {
1062 		cb_fn(cb_arg, -ENOMEM);
1063 		return;
1064 	}
1065 
1066 	args = &req->args;
1067 	args->fn.fs_op = cb_fn;
1068 	args->fs = fs;
1069 	args->arg = cb_arg;
1070 	args->op.rename.old_name = old_name;
1071 	args->op.rename.new_name = new_name;
1072 
1073 	f = fs_find_file(fs, new_name);
1074 	if (f == NULL) {
1075 		__spdk_fs_md_rename_file(req);
1076 		return;
1077 	}
1078 
1079 	/*
1080 	 * The rename overwrites an existing file.  So delete the existing file, then
1081 	 *  do the actual rename.
1082 	 */
1083 	spdk_fs_delete_file_async(fs, new_name, fs_rename_delete_done, req);
1084 }
1085 
1086 static void
1087 __fs_rename_file_done(void *arg, int fserrno)
1088 {
1089 	struct spdk_fs_request *req = arg;
1090 	struct spdk_fs_cb_args *args = &req->args;
1091 
1092 	args->rc = fserrno;
1093 	sem_post(args->sem);
1094 }
1095 
1096 static void
1097 __fs_rename_file(void *arg)
1098 {
1099 	struct spdk_fs_request *req = arg;
1100 	struct spdk_fs_cb_args *args = &req->args;
1101 
1102 	spdk_fs_rename_file_async(args->fs, args->op.rename.old_name, args->op.rename.new_name,
1103 				  __fs_rename_file_done, req);
1104 }
1105 
1106 int
1107 spdk_fs_rename_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel,
1108 		    const char *old_name, const char *new_name)
1109 {
1110 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
1111 	struct spdk_fs_request *req;
1112 	struct spdk_fs_cb_args *args;
1113 	int rc;
1114 
1115 	req = alloc_fs_request(channel);
1116 	assert(req != NULL);
1117 
1118 	args = &req->args;
1119 
1120 	args->fs = fs;
1121 	args->op.rename.old_name = old_name;
1122 	args->op.rename.new_name = new_name;
1123 	args->sem = &channel->sem;
1124 	fs->send_request(__fs_rename_file, req);
1125 	sem_wait(&channel->sem);
1126 	rc = args->rc;
1127 	free_fs_request(req);
1128 	return rc;
1129 }
1130 
1131 static void
1132 blob_delete_cb(void *ctx, int bserrno)
1133 {
1134 	struct spdk_fs_request *req = ctx;
1135 	struct spdk_fs_cb_args *args = &req->args;
1136 
1137 	args->fn.file_op(args->arg, bserrno);
1138 	free_fs_request(req);
1139 }
1140 
1141 void
1142 spdk_fs_delete_file_async(struct spdk_filesystem *fs, const char *name,
1143 			  spdk_file_op_complete cb_fn, void *cb_arg)
1144 {
1145 	struct spdk_file *f;
1146 	spdk_blob_id blobid;
1147 	struct spdk_fs_request *req;
1148 	struct spdk_fs_cb_args *args;
1149 
1150 	SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "file=%s\n", name);
1151 
1152 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
1153 		cb_fn(cb_arg, -ENAMETOOLONG);
1154 		return;
1155 	}
1156 
1157 	f = fs_find_file(fs, name);
1158 	if (f == NULL) {
1159 		cb_fn(cb_arg, -ENOENT);
1160 		return;
1161 	}
1162 
1163 	if (f->ref_count > 0) {
1164 		/* For now, do not allow deleting files with open references. */
1165 		cb_fn(cb_arg, -EBUSY);
1166 		return;
1167 	}
1168 
1169 	req = alloc_fs_request(fs->md_target.md_fs_channel);
1170 	if (req == NULL) {
1171 		cb_fn(cb_arg, -ENOMEM);
1172 		return;
1173 	}
1174 
1175 	TAILQ_REMOVE(&fs->files, f, tailq);
1176 
1177 	cache_free_buffers(f);
1178 
1179 	blobid = f->blobid;
1180 
1181 	free(f->name);
1182 	free(f->tree);
1183 	free(f);
1184 
1185 	args = &req->args;
1186 	args->fn.file_op = cb_fn;
1187 	args->arg = cb_arg;
1188 	spdk_bs_md_delete_blob(fs->bs, blobid, blob_delete_cb, req);
1189 }
1190 
1191 static void
1192 __fs_delete_file_done(void *arg, int fserrno)
1193 {
1194 	struct spdk_fs_request *req = arg;
1195 	struct spdk_fs_cb_args *args = &req->args;
1196 
1197 	args->rc = fserrno;
1198 	sem_post(args->sem);
1199 }
1200 
1201 static void
1202 __fs_delete_file(void *arg)
1203 {
1204 	struct spdk_fs_request *req = arg;
1205 	struct spdk_fs_cb_args *args = &req->args;
1206 
1207 	spdk_fs_delete_file_async(args->fs, args->op.delete.name, __fs_delete_file_done, req);
1208 }
1209 
1210 int
1211 spdk_fs_delete_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel,
1212 		    const char *name)
1213 {
1214 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
1215 	struct spdk_fs_request *req;
1216 	struct spdk_fs_cb_args *args;
1217 	int rc;
1218 
1219 	req = alloc_fs_request(channel);
1220 	assert(req != NULL);
1221 
1222 	args = &req->args;
1223 	args->fs = fs;
1224 	args->op.delete.name = name;
1225 	args->sem = &channel->sem;
1226 	fs->send_request(__fs_delete_file, req);
1227 	sem_wait(&channel->sem);
1228 	rc = args->rc;
1229 	free_fs_request(req);
1230 
1231 	return rc;
1232 }
1233 
1234 spdk_fs_iter
1235 spdk_fs_iter_first(struct spdk_filesystem *fs)
1236 {
1237 	struct spdk_file *f;
1238 
1239 	f = TAILQ_FIRST(&fs->files);
1240 	return f;
1241 }
1242 
1243 spdk_fs_iter
1244 spdk_fs_iter_next(spdk_fs_iter iter)
1245 {
1246 	struct spdk_file *f = iter;
1247 
1248 	if (f == NULL) {
1249 		return NULL;
1250 	}
1251 
1252 	f = TAILQ_NEXT(f, tailq);
1253 	return f;
1254 }
1255 
1256 const char *
1257 spdk_file_get_name(struct spdk_file *file)
1258 {
1259 	return file->name;
1260 }
1261 
1262 uint64_t
1263 spdk_file_get_length(struct spdk_file *file)
1264 {
1265 	assert(file != NULL);
1266 	SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "file=%s length=0x%jx\n", file->name, file->length);
1267 	return file->length;
1268 }
1269 
1270 static void
1271 fs_truncate_complete_cb(void *ctx, int bserrno)
1272 {
1273 	struct spdk_fs_request *req = ctx;
1274 	struct spdk_fs_cb_args *args = &req->args;
1275 
1276 	args->fn.file_op(args->arg, bserrno);
1277 	free_fs_request(req);
1278 }
1279 
/*
 * Number of clusters of cluster_sz bytes needed to hold length bytes,
 * rounding up.  cluster_sz must be non-zero.
 *
 * Computed as quotient plus a remainder check rather than
 * (length + cluster_sz - 1) / cluster_sz, which wraps around for lengths
 * close to UINT64_MAX.
 */
static uint64_t
__bytes_to_clusters(uint64_t length, uint64_t cluster_sz)
{
	uint64_t clusters = length / cluster_sz;

	if (length % cluster_sz != 0) {
		clusters++;
	}

	return clusters;
}
1285 
1286 void
1287 spdk_file_truncate_async(struct spdk_file *file, uint64_t length,
1288 			 spdk_file_op_complete cb_fn, void *cb_arg)
1289 {
1290 	struct spdk_filesystem *fs;
1291 	size_t num_clusters;
1292 	struct spdk_fs_request *req;
1293 	struct spdk_fs_cb_args *args;
1294 
1295 	SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "file=%s old=0x%jx new=0x%jx\n", file->name, file->length, length);
1296 	if (length == file->length) {
1297 		cb_fn(cb_arg, 0);
1298 		return;
1299 	}
1300 
1301 	req = alloc_fs_request(file->fs->md_target.md_fs_channel);
1302 	if (req == NULL) {
1303 		cb_fn(cb_arg, -ENOMEM);
1304 		return;
1305 	}
1306 
1307 	args = &req->args;
1308 	args->fn.file_op = cb_fn;
1309 	args->arg = cb_arg;
1310 	args->file = file;
1311 	fs = file->fs;
1312 
1313 	num_clusters = __bytes_to_clusters(length, fs->bs_opts.cluster_sz);
1314 
1315 	spdk_bs_md_resize_blob(file->blob, num_clusters);
1316 	spdk_blob_md_set_xattr(file->blob, "length", &length, sizeof(length));
1317 
1318 	file->length = length;
1319 	if (file->append_pos > file->length) {
1320 		file->append_pos = file->length;
1321 	}
1322 
1323 	spdk_bs_md_sync_blob(file->blob, fs_truncate_complete_cb, args);
1324 }
1325 
1326 static void
1327 __truncate(void *arg)
1328 {
1329 	struct spdk_fs_request *req = arg;
1330 	struct spdk_fs_cb_args *args = &req->args;
1331 
1332 	spdk_file_truncate_async(args->file, args->op.truncate.length,
1333 				 args->fn.file_op, args->arg);
1334 }
1335 
1336 void
1337 spdk_file_truncate(struct spdk_file *file, struct spdk_io_channel *_channel,
1338 		   uint64_t length)
1339 {
1340 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
1341 	struct spdk_fs_request *req;
1342 	struct spdk_fs_cb_args *args;
1343 
1344 	req = alloc_fs_request(channel);
1345 	assert(req != NULL);
1346 
1347 	args = &req->args;
1348 
1349 	args->file = file;
1350 	args->op.truncate.length = length;
1351 	args->fn.file_op = __sem_post;
1352 	args->arg = &channel->sem;
1353 
1354 	channel->send_request(__truncate, req);
1355 	sem_wait(&channel->sem);
1356 	free_fs_request(req);
1357 }
1358 
1359 static void
1360 __rw_done(void *ctx, int bserrno)
1361 {
1362 	struct spdk_fs_request *req = ctx;
1363 	struct spdk_fs_cb_args *args = &req->args;
1364 
1365 	spdk_dma_free(args->op.rw.pin_buf);
1366 	args->fn.file_op(args->arg, bserrno);
1367 	free_fs_request(req);
1368 }
1369 
1370 static void
1371 __read_done(void *ctx, int bserrno)
1372 {
1373 	struct spdk_fs_request *req = ctx;
1374 	struct spdk_fs_cb_args *args = &req->args;
1375 
1376 	if (args->op.rw.is_read) {
1377 		memcpy(args->op.rw.user_buf,
1378 		       args->op.rw.pin_buf + (args->op.rw.offset & 0xFFF),
1379 		       args->op.rw.length);
1380 		__rw_done(req, 0);
1381 	} else {
1382 		memcpy(args->op.rw.pin_buf + (args->op.rw.offset & 0xFFF),
1383 		       args->op.rw.user_buf,
1384 		       args->op.rw.length);
1385 		spdk_bs_io_write_blob(args->file->blob, args->op.rw.channel,
1386 				      args->op.rw.pin_buf,
1387 				      args->op.rw.start_page, args->op.rw.num_pages,
1388 				      __rw_done, req);
1389 	}
1390 }
1391 
1392 static void
1393 __do_blob_read(void *ctx, int fserrno)
1394 {
1395 	struct spdk_fs_request *req = ctx;
1396 	struct spdk_fs_cb_args *args = &req->args;
1397 
1398 	spdk_bs_io_read_blob(args->file->blob, args->op.rw.channel,
1399 			     args->op.rw.pin_buf,
1400 			     args->op.rw.start_page, args->op.rw.num_pages,
1401 			     __read_done, req);
1402 }
1403 
1404 static void
1405 __get_page_parameters(struct spdk_file *file, uint64_t offset, uint64_t length,
1406 		      uint64_t *start_page, uint32_t *page_size, uint64_t *num_pages)
1407 {
1408 	uint64_t end_page;
1409 
1410 	*page_size = spdk_bs_get_page_size(file->fs->bs);
1411 	*start_page = offset / *page_size;
1412 	end_page = (offset + length - 1) / *page_size;
1413 	*num_pages = (end_page - *start_page + 1);
1414 }
1415 
1416 static void
1417 __readwrite(struct spdk_file *file, struct spdk_io_channel *_channel,
1418 	    void *payload, uint64_t offset, uint64_t length,
1419 	    spdk_file_op_complete cb_fn, void *cb_arg, int is_read)
1420 {
1421 	struct spdk_fs_request *req;
1422 	struct spdk_fs_cb_args *args;
1423 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
1424 	uint64_t start_page, num_pages, pin_buf_length;
1425 	uint32_t page_size;
1426 
1427 	if (is_read && offset + length > file->length) {
1428 		cb_fn(cb_arg, -EINVAL);
1429 		return;
1430 	}
1431 
1432 	req = alloc_fs_request(channel);
1433 	if (req == NULL) {
1434 		cb_fn(cb_arg, -ENOMEM);
1435 		return;
1436 	}
1437 
1438 	args = &req->args;
1439 	args->fn.file_op = cb_fn;
1440 	args->arg = cb_arg;
1441 	args->file = file;
1442 	args->op.rw.channel = channel->bs_channel;
1443 	args->op.rw.user_buf = payload;
1444 	args->op.rw.is_read = is_read;
1445 	args->op.rw.offset = offset;
1446 	args->op.rw.length = length;
1447 
1448 	__get_page_parameters(file, offset, length, &start_page, &page_size, &num_pages);
1449 	pin_buf_length = num_pages * page_size;
1450 	args->op.rw.pin_buf = spdk_dma_malloc(pin_buf_length, 4096, NULL);
1451 
1452 	args->op.rw.start_page = start_page;
1453 	args->op.rw.num_pages = num_pages;
1454 
1455 	if (!is_read && file->length < offset + length) {
1456 		spdk_file_truncate_async(file, offset + length, __do_blob_read, req);
1457 	} else {
1458 		__do_blob_read(req, 0);
1459 	}
1460 }
1461 
1462 void
1463 spdk_file_write_async(struct spdk_file *file, struct spdk_io_channel *channel,
1464 		      void *payload, uint64_t offset, uint64_t length,
1465 		      spdk_file_op_complete cb_fn, void *cb_arg)
1466 {
1467 	__readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 0);
1468 }
1469 
1470 void
1471 spdk_file_read_async(struct spdk_file *file, struct spdk_io_channel *channel,
1472 		     void *payload, uint64_t offset, uint64_t length,
1473 		     spdk_file_op_complete cb_fn, void *cb_arg)
1474 {
1475 	SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "file=%s offset=%jx length=%jx\n",
1476 		      file->name, offset, length);
1477 	__readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 1);
1478 }
1479 
1480 struct spdk_io_channel *
1481 spdk_fs_alloc_io_channel(struct spdk_filesystem *fs)
1482 {
1483 	struct spdk_io_channel *io_channel;
1484 	struct spdk_fs_channel *fs_channel;
1485 
1486 	io_channel = spdk_get_io_channel(&fs->io_target);
1487 	fs_channel = spdk_io_channel_get_ctx(io_channel);
1488 	fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
1489 	fs_channel->send_request = __send_request_direct;
1490 
1491 	return io_channel;
1492 }
1493 
1494 struct spdk_io_channel *
1495 spdk_fs_alloc_io_channel_sync(struct spdk_filesystem *fs)
1496 {
1497 	struct spdk_io_channel *io_channel;
1498 	struct spdk_fs_channel *fs_channel;
1499 
1500 	io_channel = spdk_get_io_channel(&fs->io_target);
1501 	fs_channel = spdk_io_channel_get_ctx(io_channel);
1502 	fs_channel->send_request = fs->send_request;
1503 	fs_channel->sync = 1;
1504 	pthread_spin_init(&fs_channel->lock, 0);
1505 
1506 	return io_channel;
1507 }
1508 
/* Release a filesystem I/O channel by handing it to spdk_put_io_channel(). */
void
spdk_fs_free_io_channel(struct spdk_io_channel *channel)
{
	spdk_put_io_channel(channel);
}
1514 
1515 void
1516 spdk_fs_set_cache_size(uint64_t size_in_mb)
1517 {
1518 	g_fs_cache_size = size_in_mb * 1024 * 1024;
1519 }
1520 
1521 uint64_t
1522 spdk_fs_get_cache_size(void)
1523 {
1524 	return g_fs_cache_size / (1024 * 1024);
1525 }
1526 
1527 static void __file_flush(void *_args);
1528 
1529 static void *
1530 alloc_cache_memory_buffer(struct spdk_file *context)
1531 {
1532 	struct spdk_file *file;
1533 	void *buf;
1534 
1535 	buf = spdk_mempool_get(g_cache_pool);
1536 	if (buf != NULL) {
1537 		return buf;
1538 	}
1539 
1540 	pthread_spin_lock(&g_caches_lock);
1541 	TAILQ_FOREACH(file, &g_caches, cache_tailq) {
1542 		if (!file->open_for_writing &&
1543 		    file->priority == SPDK_FILE_PRIORITY_LOW &&
1544 		    file != context) {
1545 			break;
1546 		}
1547 	}
1548 	pthread_spin_unlock(&g_caches_lock);
1549 	if (file != NULL) {
1550 		cache_free_buffers(file);
1551 		buf = spdk_mempool_get(g_cache_pool);
1552 		if (buf != NULL) {
1553 			return buf;
1554 		}
1555 	}
1556 
1557 	pthread_spin_lock(&g_caches_lock);
1558 	TAILQ_FOREACH(file, &g_caches, cache_tailq) {
1559 		if (!file->open_for_writing && file != context) {
1560 			break;
1561 		}
1562 	}
1563 	pthread_spin_unlock(&g_caches_lock);
1564 	if (file != NULL) {
1565 		cache_free_buffers(file);
1566 		buf = spdk_mempool_get(g_cache_pool);
1567 		if (buf != NULL) {
1568 			return buf;
1569 		}
1570 	}
1571 
1572 	pthread_spin_lock(&g_caches_lock);
1573 	TAILQ_FOREACH(file, &g_caches, cache_tailq) {
1574 		if (file != context) {
1575 			break;
1576 		}
1577 	}
1578 	pthread_spin_unlock(&g_caches_lock);
1579 	if (file != NULL) {
1580 		cache_free_buffers(file);
1581 		buf = spdk_mempool_get(g_cache_pool);
1582 		if (buf != NULL) {
1583 			return buf;
1584 		}
1585 	}
1586 
1587 	return NULL;
1588 }
1589 
1590 static struct cache_buffer *
1591 cache_insert_buffer(struct spdk_file *file, uint64_t offset)
1592 {
1593 	struct cache_buffer *buf;
1594 	int count = 0;
1595 
1596 	buf = calloc(1, sizeof(*buf));
1597 	if (buf == NULL) {
1598 		SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "calloc failed\n");
1599 		return NULL;
1600 	}
1601 
1602 	buf->buf = alloc_cache_memory_buffer(file);
1603 	while (buf->buf == NULL) {
1604 		/*
1605 		 * TODO: alloc_cache_memory_buffer() should eventually free
1606 		 *  some buffers.  Need a more sophisticated check here, instead
1607 		 *  of just bailing if 100 tries does not result in getting a
1608 		 *  free buffer.  This will involve using the sync channel's
1609 		 *  semaphore to block until a buffer becomes available.
1610 		 */
1611 		if (count++ == 100) {
1612 			SPDK_ERRLOG("could not allocate cache buffer\n");
1613 			assert(false);
1614 			free(buf);
1615 			return NULL;
1616 		}
1617 		buf->buf = alloc_cache_memory_buffer(file);
1618 	}
1619 
1620 	buf->buf_size = CACHE_BUFFER_SIZE;
1621 	buf->offset = offset;
1622 
1623 	pthread_spin_lock(&g_caches_lock);
1624 	if (file->tree->present_mask == 0) {
1625 		TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq);
1626 	}
1627 	file->tree = spdk_tree_insert_buffer(file->tree, buf);
1628 	pthread_spin_unlock(&g_caches_lock);
1629 
1630 	return buf;
1631 }
1632 
1633 static struct cache_buffer *
1634 cache_append_buffer(struct spdk_file *file)
1635 {
1636 	struct cache_buffer *last;
1637 
1638 	assert(file->last == NULL || file->last->bytes_filled == file->last->buf_size);
1639 	assert((file->append_pos % CACHE_BUFFER_SIZE) == 0);
1640 
1641 	last = cache_insert_buffer(file, file->append_pos);
1642 	if (last == NULL) {
1643 		SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "cache_insert_buffer failed\n");
1644 		return NULL;
1645 	}
1646 
1647 	file->last = last;
1648 
1649 	return last;
1650 }
1651 
1652 static void
1653 __wake_caller(struct spdk_fs_cb_args *args)
1654 {
1655 	sem_post(args->sem);
1656 }
1657 
1658 static void __check_sync_reqs(struct spdk_file *file);
1659 
1660 static void
1661 __file_cache_finish_sync(struct spdk_file *file)
1662 {
1663 	struct spdk_fs_request *sync_req;
1664 	struct spdk_fs_cb_args *sync_args;
1665 
1666 	pthread_spin_lock(&file->lock);
1667 	sync_req = TAILQ_FIRST(&file->sync_requests);
1668 	sync_args = &sync_req->args;
1669 	assert(sync_args->op.sync.offset <= file->length_flushed);
1670 	BLOBFS_TRACE(file, "sync done offset=%jx\n", sync_args->op.sync.offset);
1671 	TAILQ_REMOVE(&file->sync_requests, sync_req, args.op.sync.tailq);
1672 	pthread_spin_unlock(&file->lock);
1673 
1674 	sync_args->fn.file_op(sync_args->arg, 0);
1675 	__check_sync_reqs(file);
1676 
1677 	pthread_spin_lock(&file->lock);
1678 	free_fs_request(sync_req);
1679 	pthread_spin_unlock(&file->lock);
1680 }
1681 
/*
 * Blobstore-callback adapter for __file_cache_finish_sync(); ctx is the file
 * and bserrno is not examined.
 */
static void
__file_cache_finish_sync_bs_cb(void *ctx, int bserrno)
{
	__file_cache_finish_sync((struct spdk_file *)ctx);
}
1689 
1690 static void
1691 __free_args(struct spdk_fs_cb_args *args)
1692 {
1693 	struct spdk_fs_request *req;
1694 
1695 	if (!args->from_request) {
1696 		free(args);
1697 	} else {
1698 		/* Depends on args being at the start of the spdk_fs_request structure. */
1699 		req = (struct spdk_fs_request *)args;
1700 		free_fs_request(req);
1701 	}
1702 }
1703 
1704 static void
1705 __check_sync_reqs(struct spdk_file *file)
1706 {
1707 	struct spdk_fs_request *sync_req;
1708 
1709 	pthread_spin_lock(&file->lock);
1710 
1711 	TAILQ_FOREACH(sync_req, &file->sync_requests, args.op.sync.tailq) {
1712 		if (sync_req->args.op.sync.offset <= file->length_flushed) {
1713 			break;
1714 		}
1715 	}
1716 
1717 	if (sync_req != NULL && !sync_req->args.op.sync.xattr_in_progress) {
1718 		BLOBFS_TRACE(file, "set xattr length 0x%jx\n", file->length_flushed);
1719 		sync_req->args.op.sync.xattr_in_progress = true;
1720 		spdk_blob_md_set_xattr(file->blob, "length", &file->length_flushed,
1721 				       sizeof(file->length_flushed));
1722 
1723 		pthread_spin_unlock(&file->lock);
1724 		spdk_bs_md_sync_blob(file->blob, __file_cache_finish_sync_bs_cb, file);
1725 	} else {
1726 		pthread_spin_unlock(&file->lock);
1727 	}
1728 }
1729 
1730 static void
1731 __file_flush_done(void *arg, int bserrno)
1732 {
1733 	struct spdk_fs_cb_args *args = arg;
1734 	struct spdk_file *file = args->file;
1735 	struct cache_buffer *next = args->op.flush.cache_buffer;
1736 
1737 	BLOBFS_TRACE(file, "length=%jx\n", args->op.flush.length);
1738 
1739 	pthread_spin_lock(&file->lock);
1740 	next->in_progress = false;
1741 	next->bytes_flushed += args->op.flush.length;
1742 	file->length_flushed += args->op.flush.length;
1743 	if (file->length_flushed > file->length) {
1744 		file->length = file->length_flushed;
1745 	}
1746 	if (next->bytes_flushed == next->buf_size) {
1747 		BLOBFS_TRACE(file, "write buffer fully flushed 0x%jx\n", file->length_flushed);
1748 		next = spdk_tree_find_buffer(file->tree, file->length_flushed);
1749 	}
1750 
1751 	/*
1752 	 * Assert that there is no cached data that extends past the end of the underlying
1753 	 *  blob.
1754 	 */
1755 	assert(next == NULL || next->offset < __file_get_blob_size(file) ||
1756 	       next->bytes_filled == 0);
1757 
1758 	pthread_spin_unlock(&file->lock);
1759 
1760 	__check_sync_reqs(file);
1761 
1762 	__file_flush(args);
1763 }
1764 
1765 static void
1766 __file_flush(void *_args)
1767 {
1768 	struct spdk_fs_cb_args *args = _args;
1769 	struct spdk_file *file = args->file;
1770 	struct cache_buffer *next;
1771 	uint64_t offset, length, start_page, num_pages;
1772 	uint32_t page_size;
1773 
1774 	pthread_spin_lock(&file->lock);
1775 	next = spdk_tree_find_buffer(file->tree, file->length_flushed);
1776 	if (next == NULL || next->in_progress) {
1777 		/*
1778 		 * There is either no data to flush, or a flush I/O is already in
1779 		 *  progress.  So return immediately - if a flush I/O is in
1780 		 *  progress we will flush more data after that is completed.
1781 		 */
1782 		__free_args(args);
1783 		pthread_spin_unlock(&file->lock);
1784 		return;
1785 	}
1786 
1787 	offset = next->offset + next->bytes_flushed;
1788 	length = next->bytes_filled - next->bytes_flushed;
1789 	if (length == 0) {
1790 		__free_args(args);
1791 		pthread_spin_unlock(&file->lock);
1792 		return;
1793 	}
1794 	args->op.flush.length = length;
1795 	args->op.flush.cache_buffer = next;
1796 
1797 	__get_page_parameters(file, offset, length, &start_page, &page_size, &num_pages);
1798 
1799 	next->in_progress = true;
1800 	BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n",
1801 		     offset, length, start_page, num_pages);
1802 	pthread_spin_unlock(&file->lock);
1803 	spdk_bs_io_write_blob(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel,
1804 			      next->buf + (start_page * page_size) - next->offset,
1805 			      start_page, num_pages,
1806 			      __file_flush_done, args);
1807 }
1808 
/*
 * Completion of the metadata sync started by __file_extend_blob(); wakes the
 * thread waiting for the extension.  bserrno is not examined.
 */
static void
__file_extend_done(void *arg, int bserrno)
{
	__wake_caller((struct spdk_fs_cb_args *)arg);
}
1816 
1817 static void
1818 __file_extend_blob(void *_args)
1819 {
1820 	struct spdk_fs_cb_args *args = _args;
1821 	struct spdk_file *file = args->file;
1822 
1823 	spdk_bs_md_resize_blob(file->blob, args->op.resize.num_clusters);
1824 
1825 	spdk_bs_md_sync_blob(file->blob, __file_extend_done, args);
1826 }
1827 
/*
 * Completion for I/O started by __rw_from_file(): wake the waiting caller,
 * then release the args.  bserrno is not examined.
 */
static void
__rw_from_file_done(void *arg, int bserrno)
{
	struct spdk_fs_cb_args *rw_args = arg;

	__wake_caller(rw_args);
	__free_args(rw_args);
}
1836 
1837 static void
1838 __rw_from_file(void *_args)
1839 {
1840 	struct spdk_fs_cb_args *args = _args;
1841 	struct spdk_file *file = args->file;
1842 
1843 	if (args->op.rw.is_read) {
1844 		spdk_file_read_async(file, file->fs->sync_target.sync_io_channel, args->op.rw.user_buf,
1845 				     args->op.rw.offset, args->op.rw.length,
1846 				     __rw_from_file_done, args);
1847 	} else {
1848 		spdk_file_write_async(file, file->fs->sync_target.sync_io_channel, args->op.rw.user_buf,
1849 				      args->op.rw.offset, args->op.rw.length,
1850 				      __rw_from_file_done, args);
1851 	}
1852 }
1853 
1854 static int
1855 __send_rw_from_file(struct spdk_file *file, sem_t *sem, void *payload,
1856 		    uint64_t offset, uint64_t length, bool is_read)
1857 {
1858 	struct spdk_fs_cb_args *args;
1859 
1860 	args = calloc(1, sizeof(*args));
1861 	if (args == NULL) {
1862 		sem_post(sem);
1863 		return -ENOMEM;
1864 	}
1865 
1866 	args->file = file;
1867 	args->sem = sem;
1868 	args->op.rw.user_buf = payload;
1869 	args->op.rw.offset = offset;
1870 	args->op.rw.length = length;
1871 	args->op.rw.is_read = is_read;
1872 	file->fs->send_request(__rw_from_file, args);
1873 	return 0;
1874 }
1875 
1876 int
1877 spdk_file_write(struct spdk_file *file, struct spdk_io_channel *_channel,
1878 		void *payload, uint64_t offset, uint64_t length)
1879 {
1880 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
1881 	struct spdk_fs_cb_args *args;
1882 	uint64_t rem_length, copy, blob_size, cluster_sz;
1883 	uint32_t cache_buffers_filled = 0;
1884 	uint8_t *cur_payload;
1885 	struct cache_buffer *last;
1886 
1887 	BLOBFS_TRACE_RW(file, "offset=%jx length=%jx\n", offset, length);
1888 
1889 	if (length == 0) {
1890 		return 0;
1891 	}
1892 
1893 	if (offset != file->append_pos) {
1894 		BLOBFS_TRACE(file, " error offset=%jx append_pos=%jx\n", offset, file->append_pos);
1895 		return -EINVAL;
1896 	}
1897 
1898 	pthread_spin_lock(&file->lock);
1899 	file->open_for_writing = true;
1900 
1901 	if (file->last == NULL) {
1902 		if (file->append_pos % CACHE_BUFFER_SIZE == 0) {
1903 			cache_append_buffer(file);
1904 		} else {
1905 			int rc;
1906 
1907 			file->append_pos += length;
1908 			pthread_spin_unlock(&file->lock);
1909 			rc = __send_rw_from_file(file, &channel->sem, payload,
1910 						 offset, length, false);
1911 			sem_wait(&channel->sem);
1912 			return rc;
1913 		}
1914 	}
1915 
1916 	blob_size = __file_get_blob_size(file);
1917 
1918 	if ((offset + length) > blob_size) {
1919 		struct spdk_fs_cb_args extend_args = {};
1920 
1921 		cluster_sz = file->fs->bs_opts.cluster_sz;
1922 		extend_args.sem = &channel->sem;
1923 		extend_args.op.resize.num_clusters = __bytes_to_clusters((offset + length), cluster_sz);
1924 		extend_args.file = file;
1925 		BLOBFS_TRACE(file, "start resize to %u clusters\n", extend_args.op.resize.num_clusters);
1926 		pthread_spin_unlock(&file->lock);
1927 		file->fs->send_request(__file_extend_blob, &extend_args);
1928 		sem_wait(&channel->sem);
1929 	}
1930 
1931 	last = file->last;
1932 	rem_length = length;
1933 	cur_payload = payload;
1934 	while (rem_length > 0) {
1935 		copy = last->buf_size - last->bytes_filled;
1936 		if (copy > rem_length) {
1937 			copy = rem_length;
1938 		}
1939 		BLOBFS_TRACE_RW(file, "  fill offset=%jx length=%jx\n", file->append_pos, copy);
1940 		memcpy(&last->buf[last->bytes_filled], cur_payload, copy);
1941 		file->append_pos += copy;
1942 		if (file->length < file->append_pos) {
1943 			file->length = file->append_pos;
1944 		}
1945 		cur_payload += copy;
1946 		last->bytes_filled += copy;
1947 		rem_length -= copy;
1948 		if (last->bytes_filled == last->buf_size) {
1949 			cache_buffers_filled++;
1950 			last = cache_append_buffer(file);
1951 			if (last == NULL) {
1952 				BLOBFS_TRACE(file, "nomem\n");
1953 				pthread_spin_unlock(&file->lock);
1954 				return -ENOMEM;
1955 			}
1956 		}
1957 	}
1958 
1959 	if (cache_buffers_filled == 0) {
1960 		pthread_spin_unlock(&file->lock);
1961 		return 0;
1962 	}
1963 
1964 	args = calloc(1, sizeof(*args));
1965 	if (args == NULL) {
1966 		pthread_spin_unlock(&file->lock);
1967 		return -ENOMEM;
1968 	}
1969 
1970 	args->file = file;
1971 	file->fs->send_request(__file_flush, args);
1972 	pthread_spin_unlock(&file->lock);
1973 	return 0;
1974 }
1975 
1976 static void
1977 __readahead_done(void *arg, int bserrno)
1978 {
1979 	struct spdk_fs_cb_args *args = arg;
1980 	struct cache_buffer *cache_buffer = args->op.readahead.cache_buffer;
1981 	struct spdk_file *file = args->file;
1982 
1983 	BLOBFS_TRACE(file, "offset=%jx\n", cache_buffer->offset);
1984 
1985 	pthread_spin_lock(&file->lock);
1986 	cache_buffer->bytes_filled = args->op.readahead.length;
1987 	cache_buffer->bytes_flushed = args->op.readahead.length;
1988 	cache_buffer->in_progress = false;
1989 	pthread_spin_unlock(&file->lock);
1990 
1991 	__free_args(args);
1992 }
1993 
1994 static void
1995 __readahead(void *_args)
1996 {
1997 	struct spdk_fs_cb_args *args = _args;
1998 	struct spdk_file *file = args->file;
1999 	uint64_t offset, length, start_page, num_pages;
2000 	uint32_t page_size;
2001 
2002 	offset = args->op.readahead.offset;
2003 	length = args->op.readahead.length;
2004 	assert(length > 0);
2005 
2006 	__get_page_parameters(file, offset, length, &start_page, &page_size, &num_pages);
2007 
2008 	BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n",
2009 		     offset, length, start_page, num_pages);
2010 	spdk_bs_io_read_blob(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel,
2011 			     args->op.readahead.cache_buffer->buf,
2012 			     start_page, num_pages,
2013 			     __readahead_done, args);
2014 }
2015 
2016 static uint64_t
2017 __next_cache_buffer_offset(uint64_t offset)
2018 {
2019 	return (offset + CACHE_BUFFER_SIZE) & ~(CACHE_TREE_LEVEL_MASK(0));
2020 }
2021 
2022 static void
2023 check_readahead(struct spdk_file *file, uint64_t offset)
2024 {
2025 	struct spdk_fs_cb_args *args;
2026 
2027 	offset = __next_cache_buffer_offset(offset);
2028 	if (spdk_tree_find_buffer(file->tree, offset) != NULL || file->length <= offset) {
2029 		return;
2030 	}
2031 
2032 	args = calloc(1, sizeof(*args));
2033 	if (args == NULL) {
2034 		return;
2035 	}
2036 
2037 	BLOBFS_TRACE(file, "offset=%jx\n", offset);
2038 
2039 	args->file = file;
2040 	args->op.readahead.offset = offset;
2041 	args->op.readahead.cache_buffer = cache_insert_buffer(file, offset);
2042 	args->op.readahead.cache_buffer->in_progress = true;
2043 	if (file->length < (offset + CACHE_BUFFER_SIZE)) {
2044 		args->op.readahead.length = file->length & (CACHE_BUFFER_SIZE - 1);
2045 	} else {
2046 		args->op.readahead.length = CACHE_BUFFER_SIZE;
2047 	}
2048 	file->fs->send_request(__readahead, args);
2049 }
2050 
2051 static int
2052 __file_read(struct spdk_file *file, void *payload, uint64_t offset, uint64_t length, sem_t *sem)
2053 {
2054 	struct cache_buffer *buf;
2055 	int rc;
2056 
2057 	buf = spdk_tree_find_filled_buffer(file->tree, offset);
2058 	if (buf == NULL) {
2059 		pthread_spin_unlock(&file->lock);
2060 		rc = __send_rw_from_file(file, sem, payload, offset, length, true);
2061 		pthread_spin_lock(&file->lock);
2062 		return rc;
2063 	}
2064 
2065 	if ((offset + length) > (buf->offset + buf->bytes_filled)) {
2066 		length = buf->offset + buf->bytes_filled - offset;
2067 	}
2068 	BLOBFS_TRACE(file, "read %p offset=%ju length=%ju\n", payload, offset, length);
2069 	memcpy(payload, &buf->buf[offset - buf->offset], length);
2070 	if ((offset + length) % CACHE_BUFFER_SIZE == 0) {
2071 		pthread_spin_lock(&g_caches_lock);
2072 		spdk_tree_remove_buffer(file->tree, buf);
2073 		if (file->tree->present_mask == 0) {
2074 			TAILQ_REMOVE(&g_caches, file, cache_tailq);
2075 		}
2076 		pthread_spin_unlock(&g_caches_lock);
2077 	}
2078 
2079 	sem_post(sem);
2080 	return 0;
2081 }
2082 
2083 int64_t
2084 spdk_file_read(struct spdk_file *file, struct spdk_io_channel *_channel,
2085 	       void *payload, uint64_t offset, uint64_t length)
2086 {
2087 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
2088 	uint64_t final_offset, final_length;
2089 	uint32_t sub_reads = 0;
2090 	int rc = 0;
2091 
2092 	pthread_spin_lock(&file->lock);
2093 
2094 	BLOBFS_TRACE_RW(file, "offset=%ju length=%ju\n", offset, length);
2095 
2096 	file->open_for_writing = false;
2097 
2098 	if (length == 0 || offset >= file->append_pos) {
2099 		pthread_spin_unlock(&file->lock);
2100 		return 0;
2101 	}
2102 
2103 	if (offset + length > file->append_pos) {
2104 		length = file->append_pos - offset;
2105 	}
2106 
2107 	if (offset != file->next_seq_offset) {
2108 		file->seq_byte_count = 0;
2109 	}
2110 	file->seq_byte_count += length;
2111 	file->next_seq_offset = offset + length;
2112 	if (file->seq_byte_count >= CACHE_READAHEAD_THRESHOLD) {
2113 		check_readahead(file, offset);
2114 		check_readahead(file, offset + CACHE_BUFFER_SIZE);
2115 	}
2116 
2117 	final_length = 0;
2118 	final_offset = offset + length;
2119 	while (offset < final_offset) {
2120 		length = NEXT_CACHE_BUFFER_OFFSET(offset) - offset;
2121 		if (length > (final_offset - offset)) {
2122 			length = final_offset - offset;
2123 		}
2124 		rc = __file_read(file, payload, offset, length, &channel->sem);
2125 		if (rc == 0) {
2126 			final_length += length;
2127 		} else {
2128 			break;
2129 		}
2130 		payload += length;
2131 		offset += length;
2132 		sub_reads++;
2133 	}
2134 	pthread_spin_unlock(&file->lock);
2135 	while (sub_reads-- > 0) {
2136 		sem_wait(&channel->sem);
2137 	}
2138 	if (rc == 0) {
2139 		return final_length;
2140 	} else {
2141 		return rc;
2142 	}
2143 }
2144 
2145 static void
2146 _file_sync(struct spdk_file *file, struct spdk_fs_channel *channel,
2147 	   spdk_file_op_complete cb_fn, void *cb_arg)
2148 {
2149 	struct spdk_fs_request *sync_req;
2150 	struct spdk_fs_request *flush_req;
2151 	struct spdk_fs_cb_args *sync_args;
2152 	struct spdk_fs_cb_args *flush_args;
2153 
2154 	BLOBFS_TRACE(file, "offset=%jx\n", file->append_pos);
2155 
2156 	pthread_spin_lock(&file->lock);
2157 	if (file->append_pos <= file->length_flushed || file->last == NULL) {
2158 		BLOBFS_TRACE(file, "done - no data to flush\n");
2159 		pthread_spin_unlock(&file->lock);
2160 		cb_fn(cb_arg, 0);
2161 		return;
2162 	}
2163 
2164 	sync_req = alloc_fs_request(channel);
2165 	assert(sync_req != NULL);
2166 	sync_args = &sync_req->args;
2167 
2168 	flush_req = alloc_fs_request(channel);
2169 	assert(flush_req != NULL);
2170 	flush_args = &flush_req->args;
2171 
2172 	sync_args->file = file;
2173 	sync_args->fn.file_op = cb_fn;
2174 	sync_args->arg = cb_arg;
2175 	sync_args->op.sync.offset = file->append_pos;
2176 	sync_args->op.sync.xattr_in_progress = false;
2177 	TAILQ_INSERT_TAIL(&file->sync_requests, sync_req, args.op.sync.tailq);
2178 	pthread_spin_unlock(&file->lock);
2179 
2180 	flush_args->file = file;
2181 	channel->send_request(__file_flush, flush_args);
2182 }
2183 
2184 int
2185 spdk_file_sync(struct spdk_file *file, struct spdk_io_channel *_channel)
2186 {
2187 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
2188 
2189 	_file_sync(file, channel, __sem_post, &channel->sem);
2190 	sem_wait(&channel->sem);
2191 
2192 	return 0;
2193 }
2194 
2195 void
2196 spdk_file_sync_async(struct spdk_file *file, struct spdk_io_channel *_channel,
2197 		     spdk_file_op_complete cb_fn, void *cb_arg)
2198 {
2199 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
2200 
2201 	_file_sync(file, channel, cb_fn, cb_arg);
2202 }
2203 
2204 void
2205 spdk_file_set_priority(struct spdk_file *file, uint32_t priority)
2206 {
2207 	BLOBFS_TRACE(file, "priority=%u\n", priority);
2208 	file->priority = priority;
2209 
2210 }
2211 
2212 /*
2213  * Close routines
2214  */
2215 
2216 static void
2217 __file_close_async_done(void *ctx, int bserrno)
2218 {
2219 	struct spdk_fs_request *req = ctx;
2220 	struct spdk_fs_cb_args *args = &req->args;
2221 
2222 	args->fn.file_op(args->arg, bserrno);
2223 	free_fs_request(req);
2224 }
2225 
2226 static void
2227 __file_close_async(struct spdk_file *file, struct spdk_fs_request *req)
2228 {
2229 	pthread_spin_lock(&file->lock);
2230 	if (file->ref_count == 0) {
2231 		pthread_spin_unlock(&file->lock);
2232 		__file_close_async_done(req, -EBADF);
2233 		return;
2234 	}
2235 
2236 	file->ref_count--;
2237 	if (file->ref_count > 0) {
2238 		pthread_spin_unlock(&file->lock);
2239 		__file_close_async_done(req, 0);
2240 		return;
2241 	}
2242 
2243 	pthread_spin_unlock(&file->lock);
2244 
2245 	spdk_bs_md_close_blob(&file->blob, __file_close_async_done, req);
2246 }
2247 
2248 static void
2249 __file_close_async__sync_done(void *arg, int fserrno)
2250 {
2251 	struct spdk_fs_request *req = arg;
2252 	struct spdk_fs_cb_args *args = &req->args;
2253 
2254 	__file_close_async(args->file, req);
2255 }
2256 
2257 void
2258 spdk_file_close_async(struct spdk_file *file, spdk_file_op_complete cb_fn, void *cb_arg)
2259 {
2260 	struct spdk_fs_request *req;
2261 	struct spdk_fs_cb_args *args;
2262 
2263 	req = alloc_fs_request(file->fs->md_target.md_fs_channel);
2264 	if (req == NULL) {
2265 		cb_fn(cb_arg, -ENOMEM);
2266 		return;
2267 	}
2268 
2269 	args = &req->args;
2270 	args->file = file;
2271 	args->fn.file_op = cb_fn;
2272 	args->arg = cb_arg;
2273 
2274 	spdk_file_sync_async(file, file->fs->md_target.md_io_channel, __file_close_async__sync_done, req);
2275 }
2276 
2277 static void
2278 __file_close_done(void *arg, int fserrno)
2279 {
2280 	struct spdk_fs_cb_args *args = arg;
2281 
2282 	args->rc = fserrno;
2283 	sem_post(args->sem);
2284 }
2285 
2286 static void
2287 __file_close(void *arg)
2288 {
2289 	struct spdk_fs_request *req = arg;
2290 	struct spdk_fs_cb_args *args = &req->args;
2291 	struct spdk_file *file = args->file;
2292 
2293 	__file_close_async(file, req);
2294 }
2295 
2296 int
2297 spdk_file_close(struct spdk_file *file, struct spdk_io_channel *_channel)
2298 {
2299 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
2300 	struct spdk_fs_request *req;
2301 	struct spdk_fs_cb_args *args;
2302 
2303 	req = alloc_fs_request(channel);
2304 	assert(req != NULL);
2305 
2306 	args = &req->args;
2307 
2308 	spdk_file_sync(file, _channel);
2309 	BLOBFS_TRACE(file, "name=%s\n", file->name);
2310 	args->file = file;
2311 	args->sem = &channel->sem;
2312 	args->fn.file_op = __file_close_done;
2313 	args->arg = req;
2314 	channel->send_request(__file_close, req);
2315 	sem_wait(&channel->sem);
2316 
2317 	return args->rc;
2318 }
2319 
2320 static void
2321 cache_free_buffers(struct spdk_file *file)
2322 {
2323 	BLOBFS_TRACE(file, "free=%s\n", file->name);
2324 	pthread_spin_lock(&file->lock);
2325 	pthread_spin_lock(&g_caches_lock);
2326 	if (file->tree->present_mask == 0) {
2327 		pthread_spin_unlock(&g_caches_lock);
2328 		pthread_spin_unlock(&file->lock);
2329 		return;
2330 	}
2331 	spdk_tree_free_buffers(file->tree);
2332 
2333 	TAILQ_REMOVE(&g_caches, file, cache_tailq);
2334 	/* If not freed, put it in the end of the queue */
2335 	if (file->tree->present_mask != 0) {
2336 		TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq);
2337 	}
2338 	file->last = NULL;
2339 	pthread_spin_unlock(&g_caches_lock);
2340 	pthread_spin_unlock(&file->lock);
2341 }
2342 
/*
 * Trace flag registrations for this file: "blobfs" and "blobfs_rw".
 * NOTE(review): presumably these back the BLOBFS_TRACE-style logging used
 * above - confirm against the macro definitions.
 */
SPDK_LOG_REGISTER_TRACE_FLAG("blobfs", SPDK_TRACE_BLOBFS);
SPDK_LOG_REGISTER_TRACE_FLAG("blobfs_rw", SPDK_TRACE_BLOBFS_RW);
2345