xref: /spdk/lib/blobfs/blobfs.c (revision 19de08066d6259f6358e89ad2fde4f6704010f05)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 
36 #include "spdk/blobfs.h"
37 #include "blobfs_internal.h"
38 
39 #include "spdk/queue.h"
40 #include "spdk/io_channel.h"
41 #include "spdk/assert.h"
42 #include "spdk/env.h"
43 #include "spdk/util.h"
44 #include "spdk_internal/log.h"
45 
/*
 * Debug-log helpers.  Both prepend "file=<name> " (taken from file->name) to
 * the caller's format string and forward any further printf-style arguments
 * to SPDK_DEBUGLOG; they differ only in the trace flag they log under.
 */
#define BLOBFS_TRACE(file, str, ...) \
	SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "file=%s " str, file->name, ##__VA_ARGS__)

#define BLOBFS_TRACE_RW(file, str, ...) \
	SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS_RW, "file=%s " str, file->name, ##__VA_ARGS__)
51 
52 #define BLOBFS_CACHE_SIZE (4ULL * 1024 * 1024 * 1024)
53 
54 static uint64_t g_fs_cache_size = BLOBFS_CACHE_SIZE;
55 static struct spdk_mempool *g_cache_pool;
56 static TAILQ_HEAD(, spdk_file) g_caches;
57 static int g_fs_count = 0;
58 static pthread_mutex_t g_cache_init_lock = PTHREAD_MUTEX_INITIALIZER;
59 static pthread_spinlock_t g_caches_lock;
60 
/*
 * Completion callback that wakes a waiter.
 *
 * arg     - the sem_t to post.
 * bserrno - completion status; not examined here.
 */
static void
__sem_post(void *arg, int bserrno)
{
	sem_post((sem_t *)arg);
}
68 
69 void
70 spdk_cache_buffer_free(struct cache_buffer *cache_buffer)
71 {
72 	spdk_mempool_put(g_cache_pool, cache_buffer->buf);
73 	free(cache_buffer);
74 }
75 
/* 128 KiB.  NOTE(review): presumably the sequential-read byte count that triggers readahead - confirm at the use site. */
#define CACHE_READAHEAD_THRESHOLD	(128 * 1024)
77 
78 struct spdk_file {
79 	struct spdk_filesystem	*fs;
80 	struct spdk_blob	*blob;
81 	char			*name;
82 	uint64_t		length;
83 	bool                    is_deleted;
84 	bool			open_for_writing;
85 	uint64_t		length_flushed;
86 	uint64_t		append_pos;
87 	uint64_t		seq_byte_count;
88 	uint64_t		next_seq_offset;
89 	uint32_t		priority;
90 	TAILQ_ENTRY(spdk_file)	tailq;
91 	spdk_blob_id		blobid;
92 	uint32_t		ref_count;
93 	pthread_spinlock_t	lock;
94 	struct cache_buffer	*last;
95 	struct cache_tree	*tree;
96 	TAILQ_HEAD(open_requests_head, spdk_fs_request) open_requests;
97 	TAILQ_HEAD(sync_requests_head, spdk_fs_request) sync_requests;
98 	TAILQ_ENTRY(spdk_file)	cache_tailq;
99 };
100 
101 struct spdk_deleted_file {
102 	spdk_blob_id	id;
103 	TAILQ_ENTRY(spdk_deleted_file)	tailq;
104 };
105 
106 struct spdk_filesystem {
107 	struct spdk_blob_store	*bs;
108 	TAILQ_HEAD(, spdk_file)	files;
109 	struct spdk_bs_opts	bs_opts;
110 	struct spdk_bs_dev	*bdev;
111 	fs_send_request_fn	send_request;
112 
113 	struct {
114 		uint32_t		max_ops;
115 		struct spdk_io_channel	*sync_io_channel;
116 		struct spdk_fs_channel	*sync_fs_channel;
117 	} sync_target;
118 
119 	struct {
120 		uint32_t		max_ops;
121 		struct spdk_io_channel	*md_io_channel;
122 		struct spdk_fs_channel	*md_fs_channel;
123 	} md_target;
124 
125 	struct {
126 		uint32_t		max_ops;
127 	} io_target;
128 };
129 
130 struct spdk_fs_cb_args {
131 	union {
132 		spdk_fs_op_with_handle_complete		fs_op_with_handle;
133 		spdk_fs_op_complete			fs_op;
134 		spdk_file_op_with_handle_complete	file_op_with_handle;
135 		spdk_file_op_complete			file_op;
136 		spdk_file_stat_op_complete		stat_op;
137 	} fn;
138 	void *arg;
139 	sem_t *sem;
140 	struct spdk_filesystem *fs;
141 	struct spdk_file *file;
142 	int rc;
143 	bool from_request;
144 	union {
145 		struct {
146 			TAILQ_HEAD(, spdk_deleted_file)	deleted_files;
147 		} fs_load;
148 		struct {
149 			uint64_t	length;
150 		} truncate;
151 		struct {
152 			struct spdk_io_channel	*channel;
153 			void		*user_buf;
154 			void		*pin_buf;
155 			int		is_read;
156 			off_t		offset;
157 			size_t		length;
158 			uint64_t	start_page;
159 			uint64_t	num_pages;
160 			uint32_t	blocklen;
161 		} rw;
162 		struct {
163 			const char	*old_name;
164 			const char	*new_name;
165 		} rename;
166 		struct {
167 			struct cache_buffer	*cache_buffer;
168 			uint64_t		length;
169 		} flush;
170 		struct {
171 			struct cache_buffer	*cache_buffer;
172 			uint64_t		length;
173 			uint64_t		offset;
174 		} readahead;
175 		struct {
176 			uint64_t			offset;
177 			TAILQ_ENTRY(spdk_fs_request)	tailq;
178 			bool				xattr_in_progress;
179 		} sync;
180 		struct {
181 			uint32_t			num_clusters;
182 		} resize;
183 		struct {
184 			const char	*name;
185 			uint32_t	flags;
186 			TAILQ_ENTRY(spdk_fs_request)	tailq;
187 		} open;
188 		struct {
189 			const char	*name;
190 		} create;
191 		struct {
192 			const char	*name;
193 		} delete;
194 		struct {
195 			const char	*name;
196 		} stat;
197 	} op;
198 };
199 
/* Forward declaration; the definition is not part of this section of the file. */
static void cache_free_buffers(struct spdk_file *file);
201 
202 static void
203 __initialize_cache(void)
204 {
205 	assert(g_cache_pool == NULL);
206 
207 	g_cache_pool = spdk_mempool_create("spdk_fs_cache",
208 					   g_fs_cache_size / CACHE_BUFFER_SIZE,
209 					   CACHE_BUFFER_SIZE,
210 					   SPDK_MEMPOOL_DEFAULT_CACHE_SIZE,
211 					   SPDK_ENV_SOCKET_ID_ANY);
212 	TAILQ_INIT(&g_caches);
213 	pthread_spin_init(&g_caches_lock, 0);
214 }
215 
216 static void
217 __free_cache(void)
218 {
219 	assert(g_cache_pool != NULL);
220 
221 	spdk_mempool_free(g_cache_pool);
222 	g_cache_pool = NULL;
223 }
224 
225 static uint64_t
226 __file_get_blob_size(struct spdk_file *file)
227 {
228 	uint64_t cluster_sz;
229 
230 	cluster_sz = file->fs->bs_opts.cluster_sz;
231 	return cluster_sz * spdk_blob_get_num_clusters(file->blob);
232 }
233 
234 struct spdk_fs_request {
235 	struct spdk_fs_cb_args		args;
236 	TAILQ_ENTRY(spdk_fs_request)	link;
237 	struct spdk_fs_channel		*channel;
238 };
239 
240 struct spdk_fs_channel {
241 	struct spdk_fs_request		*req_mem;
242 	TAILQ_HEAD(, spdk_fs_request)	reqs;
243 	sem_t				sem;
244 	struct spdk_filesystem		*fs;
245 	struct spdk_io_channel		*bs_channel;
246 	fs_send_request_fn		send_request;
247 	bool				sync;
248 	pthread_spinlock_t		lock;
249 };
250 
251 static struct spdk_fs_request *
252 alloc_fs_request(struct spdk_fs_channel *channel)
253 {
254 	struct spdk_fs_request *req;
255 
256 	if (channel->sync) {
257 		pthread_spin_lock(&channel->lock);
258 	}
259 
260 	req = TAILQ_FIRST(&channel->reqs);
261 	if (req) {
262 		TAILQ_REMOVE(&channel->reqs, req, link);
263 	}
264 
265 	if (channel->sync) {
266 		pthread_spin_unlock(&channel->lock);
267 	}
268 
269 	if (req == NULL) {
270 		return NULL;
271 	}
272 	memset(req, 0, sizeof(*req));
273 	req->channel = channel;
274 	req->args.from_request = true;
275 
276 	return req;
277 }
278 
279 static void
280 free_fs_request(struct spdk_fs_request *req)
281 {
282 	struct spdk_fs_channel *channel = req->channel;
283 
284 	if (channel->sync) {
285 		pthread_spin_lock(&channel->lock);
286 	}
287 
288 	TAILQ_INSERT_HEAD(&req->channel->reqs, req, link);
289 
290 	if (channel->sync) {
291 		pthread_spin_unlock(&channel->lock);
292 	}
293 }
294 
295 static int
296 _spdk_fs_channel_create(struct spdk_filesystem *fs, struct spdk_fs_channel *channel,
297 			uint32_t max_ops)
298 {
299 	uint32_t i;
300 
301 	channel->req_mem = calloc(max_ops, sizeof(struct spdk_fs_request));
302 	if (!channel->req_mem) {
303 		return -1;
304 	}
305 
306 	TAILQ_INIT(&channel->reqs);
307 	sem_init(&channel->sem, 0, 0);
308 
309 	for (i = 0; i < max_ops; i++) {
310 		TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link);
311 	}
312 
313 	channel->fs = fs;
314 
315 	return 0;
316 }
317 
318 static int
319 _spdk_fs_md_channel_create(void *io_device, void *ctx_buf)
320 {
321 	struct spdk_filesystem		*fs;
322 	struct spdk_fs_channel		*channel = ctx_buf;
323 
324 	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, md_target);
325 
326 	return _spdk_fs_channel_create(fs, channel, fs->md_target.max_ops);
327 }
328 
329 static int
330 _spdk_fs_sync_channel_create(void *io_device, void *ctx_buf)
331 {
332 	struct spdk_filesystem		*fs;
333 	struct spdk_fs_channel		*channel = ctx_buf;
334 
335 	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, sync_target);
336 
337 	return _spdk_fs_channel_create(fs, channel, fs->sync_target.max_ops);
338 }
339 
340 static int
341 _spdk_fs_io_channel_create(void *io_device, void *ctx_buf)
342 {
343 	struct spdk_filesystem		*fs;
344 	struct spdk_fs_channel		*channel = ctx_buf;
345 
346 	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, io_target);
347 
348 	return _spdk_fs_channel_create(fs, channel, fs->io_target.max_ops);
349 }
350 
351 static void
352 _spdk_fs_channel_destroy(void *io_device, void *ctx_buf)
353 {
354 	struct spdk_fs_channel *channel = ctx_buf;
355 
356 	free(channel->req_mem);
357 	if (channel->bs_channel != NULL) {
358 		spdk_bs_free_io_channel(channel->bs_channel);
359 	}
360 }
361 
362 static void
363 __send_request_direct(fs_request_fn fn, void *arg)
364 {
365 	fn(arg);
366 }
367 
368 static void
369 common_fs_bs_init(struct spdk_filesystem *fs, struct spdk_blob_store *bs)
370 {
371 	fs->bs = bs;
372 	fs->bs_opts.cluster_sz = spdk_bs_get_cluster_size(bs);
373 	fs->md_target.md_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
374 	fs->md_target.md_fs_channel->send_request = __send_request_direct;
375 	fs->sync_target.sync_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
376 	fs->sync_target.sync_fs_channel->send_request = __send_request_direct;
377 
378 	pthread_mutex_lock(&g_cache_init_lock);
379 	if (g_fs_count == 0) {
380 		__initialize_cache();
381 	}
382 	g_fs_count++;
383 	pthread_mutex_unlock(&g_cache_init_lock);
384 }
385 
386 static void
387 init_cb(void *ctx, struct spdk_blob_store *bs, int bserrno)
388 {
389 	struct spdk_fs_request *req = ctx;
390 	struct spdk_fs_cb_args *args = &req->args;
391 	struct spdk_filesystem *fs = args->fs;
392 
393 	if (bserrno == 0) {
394 		common_fs_bs_init(fs, bs);
395 	} else {
396 		free(fs);
397 		fs = NULL;
398 	}
399 
400 	args->fn.fs_op_with_handle(args->arg, fs, bserrno);
401 	free_fs_request(req);
402 }
403 
404 static struct spdk_filesystem *
405 fs_alloc(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn)
406 {
407 	struct spdk_filesystem *fs;
408 
409 	fs = calloc(1, sizeof(*fs));
410 	if (fs == NULL) {
411 		return NULL;
412 	}
413 
414 	fs->bdev = dev;
415 	fs->send_request = send_request_fn;
416 	TAILQ_INIT(&fs->files);
417 
418 	fs->md_target.max_ops = 512;
419 	spdk_io_device_register(&fs->md_target, _spdk_fs_md_channel_create, _spdk_fs_channel_destroy,
420 				sizeof(struct spdk_fs_channel));
421 	fs->md_target.md_io_channel = spdk_get_io_channel(&fs->md_target);
422 	fs->md_target.md_fs_channel = spdk_io_channel_get_ctx(fs->md_target.md_io_channel);
423 
424 	fs->sync_target.max_ops = 512;
425 	spdk_io_device_register(&fs->sync_target, _spdk_fs_sync_channel_create, _spdk_fs_channel_destroy,
426 				sizeof(struct spdk_fs_channel));
427 	fs->sync_target.sync_io_channel = spdk_get_io_channel(&fs->sync_target);
428 	fs->sync_target.sync_fs_channel = spdk_io_channel_get_ctx(fs->sync_target.sync_io_channel);
429 
430 	fs->io_target.max_ops = 512;
431 	spdk_io_device_register(&fs->io_target, _spdk_fs_io_channel_create, _spdk_fs_channel_destroy,
432 				sizeof(struct spdk_fs_channel));
433 
434 	return fs;
435 }
436 
437 void
438 spdk_fs_init(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn,
439 	     spdk_fs_op_with_handle_complete cb_fn, void *cb_arg)
440 {
441 	struct spdk_filesystem *fs;
442 	struct spdk_fs_request *req;
443 	struct spdk_fs_cb_args *args;
444 
445 	fs = fs_alloc(dev, send_request_fn);
446 	if (fs == NULL) {
447 		cb_fn(cb_arg, NULL, -ENOMEM);
448 		return;
449 	}
450 
451 	req = alloc_fs_request(fs->md_target.md_fs_channel);
452 	if (req == NULL) {
453 		spdk_put_io_channel(fs->md_target.md_io_channel);
454 		spdk_io_device_unregister(&fs->md_target, NULL);
455 		spdk_put_io_channel(fs->sync_target.sync_io_channel);
456 		spdk_io_device_unregister(&fs->sync_target, NULL);
457 		spdk_io_device_unregister(&fs->io_target, NULL);
458 		free(fs);
459 		cb_fn(cb_arg, NULL, -ENOMEM);
460 		return;
461 	}
462 
463 	args = &req->args;
464 	args->fn.fs_op_with_handle = cb_fn;
465 	args->arg = cb_arg;
466 	args->fs = fs;
467 
468 	spdk_bs_init(dev, NULL, init_cb, req);
469 }
470 
471 static struct spdk_file *
472 file_alloc(struct spdk_filesystem *fs)
473 {
474 	struct spdk_file *file;
475 
476 	file = calloc(1, sizeof(*file));
477 	if (file == NULL) {
478 		return NULL;
479 	}
480 
481 	file->tree = calloc(1, sizeof(*file->tree));
482 	if (file->tree == NULL) {
483 		free(file);
484 		return NULL;
485 	}
486 
487 	file->fs = fs;
488 	TAILQ_INIT(&file->open_requests);
489 	TAILQ_INIT(&file->sync_requests);
490 	pthread_spin_init(&file->lock, 0);
491 	TAILQ_INSERT_TAIL(&fs->files, file, tailq);
492 	file->priority = SPDK_FILE_PRIORITY_LOW;
493 	return file;
494 }
495 
496 static void iter_delete_cb(void *ctx, int bserrno);
497 
498 static int
499 _handle_deleted_files(struct spdk_fs_request *req)
500 {
501 	struct spdk_fs_cb_args *args = &req->args;
502 	struct spdk_filesystem *fs = args->fs;
503 
504 	if (!TAILQ_EMPTY(&args->op.fs_load.deleted_files)) {
505 		struct spdk_deleted_file *deleted_file;
506 
507 		deleted_file = TAILQ_FIRST(&args->op.fs_load.deleted_files);
508 		TAILQ_REMOVE(&args->op.fs_load.deleted_files, deleted_file, tailq);
509 		spdk_bs_md_delete_blob(fs->bs, deleted_file->id, iter_delete_cb, req);
510 		free(deleted_file);
511 		return 0;
512 	}
513 
514 	return 1;
515 }
516 
517 static void
518 iter_delete_cb(void *ctx, int bserrno)
519 {
520 	struct spdk_fs_request *req = ctx;
521 	struct spdk_fs_cb_args *args = &req->args;
522 	struct spdk_filesystem *fs = args->fs;
523 
524 	if (_handle_deleted_files(req) == 0)
525 		return;
526 
527 	args->fn.fs_op_with_handle(args->arg, fs, 0);
528 	free_fs_request(req);
529 
530 }
531 
532 static void
533 iter_cb(void *ctx, struct spdk_blob *blob, int rc)
534 {
535 	struct spdk_fs_request *req = ctx;
536 	struct spdk_fs_cb_args *args = &req->args;
537 	struct spdk_filesystem *fs = args->fs;
538 	uint64_t *length;
539 	const char *name;
540 	uint32_t *is_deleted;
541 	size_t value_len;
542 
543 	if (rc == -ENOENT) {
544 		/* Finished iterating */
545 		if (_handle_deleted_files(req) == 0)
546 			return;
547 		args->fn.fs_op_with_handle(args->arg, fs, 0);
548 		free_fs_request(req);
549 		return;
550 	} else if (rc < 0) {
551 		args->fn.fs_op_with_handle(args->arg, fs, rc);
552 		free_fs_request(req);
553 		return;
554 	}
555 
556 	rc = spdk_bs_md_get_xattr_value(blob, "name", (const void **)&name, &value_len);
557 	if (rc < 0) {
558 		args->fn.fs_op_with_handle(args->arg, fs, rc);
559 		free_fs_request(req);
560 		return;
561 	}
562 
563 	rc = spdk_bs_md_get_xattr_value(blob, "length", (const void **)&length, &value_len);
564 	if (rc < 0) {
565 		args->fn.fs_op_with_handle(args->arg, fs, rc);
566 		free_fs_request(req);
567 		return;
568 	}
569 
570 	assert(value_len == 8);
571 
572 	/* This file could be deleted last time without close it, then app crashed, so we delete it now */
573 	rc = spdk_bs_md_get_xattr_value(blob, "is_deleted", (const void **)&is_deleted, &value_len);
574 	if (rc < 0) {
575 		struct spdk_file *f;
576 
577 		f = file_alloc(fs);
578 		if (f == NULL) {
579 			args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM);
580 			free_fs_request(req);
581 			return;
582 		}
583 
584 		f->name = strdup(name);
585 		f->blobid = spdk_blob_get_id(blob);
586 		f->length = *length;
587 		f->length_flushed = *length;
588 		f->append_pos = *length;
589 		SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "added file %s length=%ju\n", f->name, f->length);
590 	} else {
591 		struct spdk_deleted_file *deleted_file;
592 
593 		deleted_file = calloc(1, sizeof(*deleted_file));
594 		if (deleted_file == NULL) {
595 			args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM);
596 			free_fs_request(req);
597 			return;
598 		}
599 		deleted_file->id = spdk_blob_get_id(blob);
600 		TAILQ_INSERT_TAIL(&args->op.fs_load.deleted_files, deleted_file, tailq);
601 	}
602 
603 	spdk_bs_md_iter_next(fs->bs, &blob, iter_cb, req);
604 }
605 
606 static void
607 load_cb(void *ctx, struct spdk_blob_store *bs, int bserrno)
608 {
609 	struct spdk_fs_request *req = ctx;
610 	struct spdk_fs_cb_args *args = &req->args;
611 	struct spdk_filesystem *fs = args->fs;
612 
613 	if (bserrno != 0) {
614 		args->fn.fs_op_with_handle(args->arg, NULL, bserrno);
615 		free_fs_request(req);
616 		free(fs);
617 		return;
618 	}
619 
620 	common_fs_bs_init(fs, bs);
621 	spdk_bs_md_iter_first(fs->bs, iter_cb, req);
622 }
623 
624 void
625 spdk_fs_load(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn,
626 	     spdk_fs_op_with_handle_complete cb_fn, void *cb_arg)
627 {
628 	struct spdk_filesystem *fs;
629 	struct spdk_fs_cb_args *args;
630 	struct spdk_fs_request *req;
631 
632 	fs = fs_alloc(dev, send_request_fn);
633 	if (fs == NULL) {
634 		cb_fn(cb_arg, NULL, -ENOMEM);
635 		return;
636 	}
637 
638 	req = alloc_fs_request(fs->md_target.md_fs_channel);
639 	if (req == NULL) {
640 		spdk_put_io_channel(fs->md_target.md_io_channel);
641 		spdk_io_device_unregister(&fs->md_target, NULL);
642 		spdk_put_io_channel(fs->sync_target.sync_io_channel);
643 		spdk_io_device_unregister(&fs->sync_target, NULL);
644 		spdk_io_device_unregister(&fs->io_target, NULL);
645 		free(fs);
646 		cb_fn(cb_arg, NULL, -ENOMEM);
647 		return;
648 	}
649 
650 	args = &req->args;
651 	args->fn.fs_op_with_handle = cb_fn;
652 	args->arg = cb_arg;
653 	args->fs = fs;
654 	TAILQ_INIT(&args->op.fs_load.deleted_files);
655 	spdk_bs_load(dev, load_cb, req);
656 }
657 
658 static void
659 unload_cb(void *ctx, int bserrno)
660 {
661 	struct spdk_fs_request *req = ctx;
662 	struct spdk_fs_cb_args *args = &req->args;
663 	struct spdk_filesystem *fs = args->fs;
664 
665 	pthread_mutex_lock(&g_cache_init_lock);
666 	g_fs_count--;
667 	if (g_fs_count == 0) {
668 		__free_cache();
669 	}
670 	pthread_mutex_unlock(&g_cache_init_lock);
671 
672 	args->fn.fs_op(args->arg, bserrno);
673 	free(req);
674 
675 	spdk_io_device_unregister(&fs->io_target, NULL);
676 	spdk_io_device_unregister(&fs->sync_target, NULL);
677 	spdk_io_device_unregister(&fs->md_target, NULL);
678 
679 	free(fs);
680 }
681 
682 void
683 spdk_fs_unload(struct spdk_filesystem *fs, spdk_fs_op_complete cb_fn, void *cb_arg)
684 {
685 	struct spdk_fs_request *req;
686 	struct spdk_fs_cb_args *args;
687 
688 	/*
689 	 * We must free the md_channel before unloading the blobstore, so just
690 	 *  allocate this request from the general heap.
691 	 */
692 	req = calloc(1, sizeof(*req));
693 	if (req == NULL) {
694 		cb_fn(cb_arg, -ENOMEM);
695 		return;
696 	}
697 
698 	args = &req->args;
699 	args->fn.fs_op = cb_fn;
700 	args->arg = cb_arg;
701 	args->fs = fs;
702 
703 	spdk_fs_free_io_channel(fs->md_target.md_io_channel);
704 	spdk_fs_free_io_channel(fs->sync_target.sync_io_channel);
705 	spdk_bs_unload(fs->bs, unload_cb, req);
706 }
707 
708 static struct spdk_file *
709 fs_find_file(struct spdk_filesystem *fs, const char *name)
710 {
711 	struct spdk_file *file;
712 
713 	TAILQ_FOREACH(file, &fs->files, tailq) {
714 		if (!strncmp(name, file->name, SPDK_FILE_NAME_MAX)) {
715 			return file;
716 		}
717 	}
718 
719 	return NULL;
720 }
721 
722 void
723 spdk_fs_file_stat_async(struct spdk_filesystem *fs, const char *name,
724 			spdk_file_stat_op_complete cb_fn, void *cb_arg)
725 {
726 	struct spdk_file_stat stat;
727 	struct spdk_file *f = NULL;
728 
729 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
730 		cb_fn(cb_arg, NULL, -ENAMETOOLONG);
731 		return;
732 	}
733 
734 	f = fs_find_file(fs, name);
735 	if (f != NULL) {
736 		stat.blobid = f->blobid;
737 		stat.size = f->length;
738 		cb_fn(cb_arg, &stat, 0);
739 		return;
740 	}
741 
742 	cb_fn(cb_arg, NULL, -ENOENT);
743 }
744 
745 static void
746 __copy_stat(void *arg, struct spdk_file_stat *stat, int fserrno)
747 {
748 	struct spdk_fs_request *req = arg;
749 	struct spdk_fs_cb_args *args = &req->args;
750 
751 	args->rc = fserrno;
752 	if (fserrno == 0) {
753 		memcpy(args->arg, stat, sizeof(*stat));
754 	}
755 	sem_post(args->sem);
756 }
757 
758 static void
759 __file_stat(void *arg)
760 {
761 	struct spdk_fs_request *req = arg;
762 	struct spdk_fs_cb_args *args = &req->args;
763 
764 	spdk_fs_file_stat_async(args->fs, args->op.stat.name,
765 				args->fn.stat_op, req);
766 }
767 
768 int
769 spdk_fs_file_stat(struct spdk_filesystem *fs, struct spdk_io_channel *_channel,
770 		  const char *name, struct spdk_file_stat *stat)
771 {
772 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
773 	struct spdk_fs_request *req;
774 	int rc;
775 
776 	req = alloc_fs_request(channel);
777 	assert(req != NULL);
778 
779 	req->args.fs = fs;
780 	req->args.op.stat.name = name;
781 	req->args.fn.stat_op = __copy_stat;
782 	req->args.arg = stat;
783 	req->args.sem = &channel->sem;
784 	channel->send_request(__file_stat, req);
785 	sem_wait(&channel->sem);
786 
787 	rc = req->args.rc;
788 	free_fs_request(req);
789 
790 	return rc;
791 }
792 
793 static void
794 fs_create_blob_close_cb(void *ctx, int bserrno)
795 {
796 	struct spdk_fs_request *req = ctx;
797 	struct spdk_fs_cb_args *args = &req->args;
798 
799 	args->fn.file_op(args->arg, bserrno);
800 	free_fs_request(req);
801 }
802 
803 static void
804 fs_create_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno)
805 {
806 	struct spdk_fs_request *req = ctx;
807 	struct spdk_fs_cb_args *args = &req->args;
808 	struct spdk_file *f = args->file;
809 	uint64_t length = 0;
810 
811 	f->blob = blob;
812 	spdk_bs_md_resize_blob(blob, 1);
813 	spdk_blob_md_set_xattr(blob, "name", f->name, strlen(f->name) + 1);
814 	spdk_blob_md_set_xattr(blob, "length", &length, sizeof(length));
815 
816 	spdk_bs_md_close_blob(&f->blob, fs_create_blob_close_cb, args);
817 }
818 
819 static void
820 fs_create_blob_create_cb(void *ctx, spdk_blob_id blobid, int bserrno)
821 {
822 	struct spdk_fs_request *req = ctx;
823 	struct spdk_fs_cb_args *args = &req->args;
824 	struct spdk_file *f = args->file;
825 
826 	f->blobid = blobid;
827 	spdk_bs_md_open_blob(f->fs->bs, blobid, fs_create_blob_open_cb, req);
828 }
829 
830 void
831 spdk_fs_create_file_async(struct spdk_filesystem *fs, const char *name,
832 			  spdk_file_op_complete cb_fn, void *cb_arg)
833 {
834 	struct spdk_file *file;
835 	struct spdk_fs_request *req;
836 	struct spdk_fs_cb_args *args;
837 
838 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
839 		cb_fn(cb_arg, -ENAMETOOLONG);
840 		return;
841 	}
842 
843 	file = fs_find_file(fs, name);
844 	if (file != NULL) {
845 		cb_fn(cb_arg, -EEXIST);
846 		return;
847 	}
848 
849 	file = file_alloc(fs);
850 	if (file == NULL) {
851 		cb_fn(cb_arg, -ENOMEM);
852 		return;
853 	}
854 
855 	req = alloc_fs_request(fs->md_target.md_fs_channel);
856 	if (req == NULL) {
857 		cb_fn(cb_arg, -ENOMEM);
858 		return;
859 	}
860 
861 	args = &req->args;
862 	args->file = file;
863 	args->fn.file_op = cb_fn;
864 	args->arg = cb_arg;
865 
866 	file->name = strdup(name);
867 	spdk_bs_md_create_blob(fs->bs, fs_create_blob_create_cb, args);
868 }
869 
870 static void
871 __fs_create_file_done(void *arg, int fserrno)
872 {
873 	struct spdk_fs_request *req = arg;
874 	struct spdk_fs_cb_args *args = &req->args;
875 
876 	args->rc = fserrno;
877 	sem_post(args->sem);
878 	SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "file=%s\n", args->op.create.name);
879 }
880 
881 static void
882 __fs_create_file(void *arg)
883 {
884 	struct spdk_fs_request *req = arg;
885 	struct spdk_fs_cb_args *args = &req->args;
886 
887 	SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "file=%s\n", args->op.create.name);
888 	spdk_fs_create_file_async(args->fs, args->op.create.name, __fs_create_file_done, req);
889 }
890 
891 int
892 spdk_fs_create_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, const char *name)
893 {
894 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
895 	struct spdk_fs_request *req;
896 	struct spdk_fs_cb_args *args;
897 	int rc;
898 
899 	SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "file=%s\n", name);
900 
901 	req = alloc_fs_request(channel);
902 	assert(req != NULL);
903 
904 	args = &req->args;
905 	args->fs = fs;
906 	args->op.create.name = name;
907 	args->sem = &channel->sem;
908 	fs->send_request(__fs_create_file, req);
909 	sem_wait(&channel->sem);
910 	rc = args->rc;
911 	free_fs_request(req);
912 
913 	return rc;
914 }
915 
916 static void
917 fs_open_blob_done(void *ctx, struct spdk_blob *blob, int bserrno)
918 {
919 	struct spdk_fs_request *req = ctx;
920 	struct spdk_fs_cb_args *args = &req->args;
921 	struct spdk_file *f = args->file;
922 
923 	f->blob = blob;
924 	while (!TAILQ_EMPTY(&f->open_requests)) {
925 		req = TAILQ_FIRST(&f->open_requests);
926 		args = &req->args;
927 		TAILQ_REMOVE(&f->open_requests, req, args.op.open.tailq);
928 		args->fn.file_op_with_handle(args->arg, f, bserrno);
929 		free_fs_request(req);
930 	}
931 }
932 
933 static void
934 fs_open_blob_create_cb(void *ctx, int bserrno)
935 {
936 	struct spdk_fs_request *req = ctx;
937 	struct spdk_fs_cb_args *args = &req->args;
938 	struct spdk_file *file = args->file;
939 	struct spdk_filesystem *fs = args->fs;
940 
941 	if (file == NULL) {
942 		/*
943 		 * This is from an open with CREATE flag - the file
944 		 *  is now created so look it up in the file list for this
945 		 *  filesystem.
946 		 */
947 		file = fs_find_file(fs, args->op.open.name);
948 		assert(file != NULL);
949 		args->file = file;
950 	}
951 
952 	file->ref_count++;
953 	TAILQ_INSERT_TAIL(&file->open_requests, req, args.op.open.tailq);
954 	if (file->ref_count == 1) {
955 		assert(file->blob == NULL);
956 		spdk_bs_md_open_blob(fs->bs, file->blobid, fs_open_blob_done, req);
957 	} else if (file->blob != NULL) {
958 		fs_open_blob_done(req, file->blob, 0);
959 	} else {
960 		/*
961 		 * The blob open for this file is in progress due to a previous
962 		 *  open request.  When that open completes, it will invoke the
963 		 *  open callback for this request.
964 		 */
965 	}
966 }
967 
968 void
969 spdk_fs_open_file_async(struct spdk_filesystem *fs, const char *name, uint32_t flags,
970 			spdk_file_op_with_handle_complete cb_fn, void *cb_arg)
971 {
972 	struct spdk_file *f = NULL;
973 	struct spdk_fs_request *req;
974 	struct spdk_fs_cb_args *args;
975 
976 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
977 		cb_fn(cb_arg, NULL, -ENAMETOOLONG);
978 		return;
979 	}
980 
981 	f = fs_find_file(fs, name);
982 	if (f == NULL && !(flags & SPDK_BLOBFS_OPEN_CREATE)) {
983 		cb_fn(cb_arg, NULL, -ENOENT);
984 		return;
985 	}
986 
987 	if (f != NULL && f->is_deleted == true) {
988 		cb_fn(cb_arg, NULL, -ENOENT);
989 		return;
990 	}
991 
992 	req = alloc_fs_request(fs->md_target.md_fs_channel);
993 	if (req == NULL) {
994 		cb_fn(cb_arg, NULL, -ENOMEM);
995 		return;
996 	}
997 
998 	args = &req->args;
999 	args->fn.file_op_with_handle = cb_fn;
1000 	args->arg = cb_arg;
1001 	args->file = f;
1002 	args->fs = fs;
1003 	args->op.open.name = name;
1004 
1005 	if (f == NULL) {
1006 		spdk_fs_create_file_async(fs, name, fs_open_blob_create_cb, req);
1007 	} else {
1008 		fs_open_blob_create_cb(req, 0);
1009 	}
1010 }
1011 
1012 static void
1013 __fs_open_file_done(void *arg, struct spdk_file *file, int bserrno)
1014 {
1015 	struct spdk_fs_request *req = arg;
1016 	struct spdk_fs_cb_args *args = &req->args;
1017 
1018 	args->file = file;
1019 	args->rc = bserrno;
1020 	sem_post(args->sem);
1021 	SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "file=%s\n", args->op.open.name);
1022 }
1023 
1024 static void
1025 __fs_open_file(void *arg)
1026 {
1027 	struct spdk_fs_request *req = arg;
1028 	struct spdk_fs_cb_args *args = &req->args;
1029 
1030 	SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "file=%s\n", args->op.open.name);
1031 	spdk_fs_open_file_async(args->fs, args->op.open.name, args->op.open.flags,
1032 				__fs_open_file_done, req);
1033 }
1034 
1035 int
1036 spdk_fs_open_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel,
1037 		  const char *name, uint32_t flags, struct spdk_file **file)
1038 {
1039 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
1040 	struct spdk_fs_request *req;
1041 	struct spdk_fs_cb_args *args;
1042 	int rc;
1043 
1044 	SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "file=%s\n", name);
1045 
1046 	req = alloc_fs_request(channel);
1047 	assert(req != NULL);
1048 
1049 	args = &req->args;
1050 	args->fs = fs;
1051 	args->op.open.name = name;
1052 	args->op.open.flags = flags;
1053 	args->sem = &channel->sem;
1054 	fs->send_request(__fs_open_file, req);
1055 	sem_wait(&channel->sem);
1056 	rc = args->rc;
1057 	if (rc == 0) {
1058 		*file = args->file;
1059 	} else {
1060 		*file = NULL;
1061 	}
1062 	free_fs_request(req);
1063 
1064 	return rc;
1065 }
1066 
1067 static void
1068 fs_rename_blob_close_cb(void *ctx, int bserrno)
1069 {
1070 	struct spdk_fs_request *req = ctx;
1071 	struct spdk_fs_cb_args *args = &req->args;
1072 
1073 	args->fn.fs_op(args->arg, bserrno);
1074 	free_fs_request(req);
1075 }
1076 
1077 static void
1078 fs_rename_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno)
1079 {
1080 	struct spdk_fs_request *req = ctx;
1081 	struct spdk_fs_cb_args *args = &req->args;
1082 	struct spdk_file *f = args->file;
1083 	const char *new_name = args->op.rename.new_name;
1084 
1085 	f->blob = blob;
1086 	spdk_blob_md_set_xattr(blob, "name", new_name, strlen(new_name) + 1);
1087 	spdk_bs_md_close_blob(&f->blob, fs_rename_blob_close_cb, req);
1088 }
1089 
1090 static void
1091 __spdk_fs_md_rename_file(struct spdk_fs_request *req)
1092 {
1093 	struct spdk_fs_cb_args *args = &req->args;
1094 	struct spdk_file *f;
1095 
1096 	f = fs_find_file(args->fs, args->op.rename.old_name);
1097 	if (f == NULL) {
1098 		args->fn.fs_op(args->arg, -ENOENT);
1099 		free_fs_request(req);
1100 		return;
1101 	}
1102 
1103 	free(f->name);
1104 	f->name = strdup(args->op.rename.new_name);
1105 	args->file = f;
1106 	spdk_bs_md_open_blob(args->fs->bs, f->blobid, fs_rename_blob_open_cb, req);
1107 }
1108 
1109 static void
1110 fs_rename_delete_done(void *arg, int fserrno)
1111 {
1112 	__spdk_fs_md_rename_file(arg);
1113 }
1114 
1115 void
1116 spdk_fs_rename_file_async(struct spdk_filesystem *fs,
1117 			  const char *old_name, const char *new_name,
1118 			  spdk_file_op_complete cb_fn, void *cb_arg)
1119 {
1120 	struct spdk_file *f;
1121 	struct spdk_fs_request *req;
1122 	struct spdk_fs_cb_args *args;
1123 
1124 	SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "old=%s new=%s\n", old_name, new_name);
1125 	if (strnlen(new_name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
1126 		cb_fn(cb_arg, -ENAMETOOLONG);
1127 		return;
1128 	}
1129 
1130 	req = alloc_fs_request(fs->md_target.md_fs_channel);
1131 	if (req == NULL) {
1132 		cb_fn(cb_arg, -ENOMEM);
1133 		return;
1134 	}
1135 
1136 	args = &req->args;
1137 	args->fn.fs_op = cb_fn;
1138 	args->fs = fs;
1139 	args->arg = cb_arg;
1140 	args->op.rename.old_name = old_name;
1141 	args->op.rename.new_name = new_name;
1142 
1143 	f = fs_find_file(fs, new_name);
1144 	if (f == NULL) {
1145 		__spdk_fs_md_rename_file(req);
1146 		return;
1147 	}
1148 
1149 	/*
1150 	 * The rename overwrites an existing file.  So delete the existing file, then
1151 	 *  do the actual rename.
1152 	 */
1153 	spdk_fs_delete_file_async(fs, new_name, fs_rename_delete_done, req);
1154 }
1155 
1156 static void
1157 __fs_rename_file_done(void *arg, int fserrno)
1158 {
1159 	struct spdk_fs_request *req = arg;
1160 	struct spdk_fs_cb_args *args = &req->args;
1161 
1162 	args->rc = fserrno;
1163 	sem_post(args->sem);
1164 }
1165 
1166 static void
1167 __fs_rename_file(void *arg)
1168 {
1169 	struct spdk_fs_request *req = arg;
1170 	struct spdk_fs_cb_args *args = &req->args;
1171 
1172 	spdk_fs_rename_file_async(args->fs, args->op.rename.old_name, args->op.rename.new_name,
1173 				  __fs_rename_file_done, req);
1174 }
1175 
1176 int
1177 spdk_fs_rename_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel,
1178 		    const char *old_name, const char *new_name)
1179 {
1180 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
1181 	struct spdk_fs_request *req;
1182 	struct spdk_fs_cb_args *args;
1183 	int rc;
1184 
1185 	req = alloc_fs_request(channel);
1186 	assert(req != NULL);
1187 
1188 	args = &req->args;
1189 
1190 	args->fs = fs;
1191 	args->op.rename.old_name = old_name;
1192 	args->op.rename.new_name = new_name;
1193 	args->sem = &channel->sem;
1194 	fs->send_request(__fs_rename_file, req);
1195 	sem_wait(&channel->sem);
1196 	rc = args->rc;
1197 	free_fs_request(req);
1198 	return rc;
1199 }
1200 
1201 static void
1202 blob_delete_cb(void *ctx, int bserrno)
1203 {
1204 	struct spdk_fs_request *req = ctx;
1205 	struct spdk_fs_cb_args *args = &req->args;
1206 
1207 	args->fn.file_op(args->arg, bserrno);
1208 	free_fs_request(req);
1209 }
1210 
1211 void
1212 spdk_fs_delete_file_async(struct spdk_filesystem *fs, const char *name,
1213 			  spdk_file_op_complete cb_fn, void *cb_arg)
1214 {
1215 	struct spdk_file *f;
1216 	spdk_blob_id blobid;
1217 	struct spdk_fs_request *req;
1218 	struct spdk_fs_cb_args *args;
1219 
1220 	SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "file=%s\n", name);
1221 
1222 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
1223 		cb_fn(cb_arg, -ENAMETOOLONG);
1224 		return;
1225 	}
1226 
1227 	f = fs_find_file(fs, name);
1228 	if (f == NULL) {
1229 		cb_fn(cb_arg, -ENOENT);
1230 		return;
1231 	}
1232 
1233 	req = alloc_fs_request(fs->md_target.md_fs_channel);
1234 	if (req == NULL) {
1235 		cb_fn(cb_arg, -ENOMEM);
1236 		return;
1237 	}
1238 
1239 	args = &req->args;
1240 	args->fn.file_op = cb_fn;
1241 	args->arg = cb_arg;
1242 
1243 	if (f->ref_count > 0) {
1244 		/* If the ref > 0, we mark the file as deleted and delete it when we close it. */
1245 		f->is_deleted = true;
1246 		spdk_blob_md_set_xattr(f->blob, "is_deleted", &f->is_deleted, sizeof(bool));
1247 		spdk_bs_md_sync_blob(f->blob, blob_delete_cb, args);
1248 		return;
1249 	}
1250 
1251 	TAILQ_REMOVE(&fs->files, f, tailq);
1252 
1253 	cache_free_buffers(f);
1254 
1255 	blobid = f->blobid;
1256 
1257 	free(f->name);
1258 	free(f->tree);
1259 	free(f);
1260 
1261 	spdk_bs_md_delete_blob(fs->bs, blobid, blob_delete_cb, req);
1262 }
1263 
1264 static void
1265 __fs_delete_file_done(void *arg, int fserrno)
1266 {
1267 	struct spdk_fs_request *req = arg;
1268 	struct spdk_fs_cb_args *args = &req->args;
1269 
1270 	args->rc = fserrno;
1271 	sem_post(args->sem);
1272 }
1273 
1274 static void
1275 __fs_delete_file(void *arg)
1276 {
1277 	struct spdk_fs_request *req = arg;
1278 	struct spdk_fs_cb_args *args = &req->args;
1279 
1280 	spdk_fs_delete_file_async(args->fs, args->op.delete.name, __fs_delete_file_done, req);
1281 }
1282 
1283 int
1284 spdk_fs_delete_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel,
1285 		    const char *name)
1286 {
1287 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
1288 	struct spdk_fs_request *req;
1289 	struct spdk_fs_cb_args *args;
1290 	int rc;
1291 
1292 	req = alloc_fs_request(channel);
1293 	assert(req != NULL);
1294 
1295 	args = &req->args;
1296 	args->fs = fs;
1297 	args->op.delete.name = name;
1298 	args->sem = &channel->sem;
1299 	fs->send_request(__fs_delete_file, req);
1300 	sem_wait(&channel->sem);
1301 	rc = args->rc;
1302 	free_fs_request(req);
1303 
1304 	return rc;
1305 }
1306 
1307 spdk_fs_iter
1308 spdk_fs_iter_first(struct spdk_filesystem *fs)
1309 {
1310 	struct spdk_file *f;
1311 
1312 	f = TAILQ_FIRST(&fs->files);
1313 	return f;
1314 }
1315 
1316 spdk_fs_iter
1317 spdk_fs_iter_next(spdk_fs_iter iter)
1318 {
1319 	struct spdk_file *f = iter;
1320 
1321 	if (f == NULL) {
1322 		return NULL;
1323 	}
1324 
1325 	f = TAILQ_NEXT(f, tailq);
1326 	return f;
1327 }
1328 
/*
 * Return the file's name.  The returned pointer is the file's own name
 *  member, not a copy.  file is dereferenced without a NULL check.
 */
const char *
spdk_file_get_name(struct spdk_file *file)
{
	return file->name;
}
1334 
1335 uint64_t
1336 spdk_file_get_length(struct spdk_file *file)
1337 {
1338 	assert(file != NULL);
1339 	SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "file=%s length=0x%jx\n", file->name, file->length);
1340 	return file->length;
1341 }
1342 
1343 static void
1344 fs_truncate_complete_cb(void *ctx, int bserrno)
1345 {
1346 	struct spdk_fs_request *req = ctx;
1347 	struct spdk_fs_cb_args *args = &req->args;
1348 
1349 	args->fn.file_op(args->arg, bserrno);
1350 	free_fs_request(req);
1351 }
1352 
/*
 * Number of clusters of cluster_sz bytes needed to hold length bytes,
 *  rounding up.  cluster_sz must be non-zero.
 *
 * Computed from quotient and remainder rather than by adding
 *  (cluster_sz - 1) first, so lengths close to UINT64_MAX do not wrap.
 */
static uint64_t
__bytes_to_clusters(uint64_t length, uint64_t cluster_sz)
{
	uint64_t clusters = length / cluster_sz;

	if (length % cluster_sz != 0) {
		clusters++;
	}

	return clusters;
}
1358 
1359 void
1360 spdk_file_truncate_async(struct spdk_file *file, uint64_t length,
1361 			 spdk_file_op_complete cb_fn, void *cb_arg)
1362 {
1363 	struct spdk_filesystem *fs;
1364 	size_t num_clusters;
1365 	struct spdk_fs_request *req;
1366 	struct spdk_fs_cb_args *args;
1367 
1368 	SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "file=%s old=0x%jx new=0x%jx\n", file->name, file->length, length);
1369 	if (length == file->length) {
1370 		cb_fn(cb_arg, 0);
1371 		return;
1372 	}
1373 
1374 	req = alloc_fs_request(file->fs->md_target.md_fs_channel);
1375 	if (req == NULL) {
1376 		cb_fn(cb_arg, -ENOMEM);
1377 		return;
1378 	}
1379 
1380 	args = &req->args;
1381 	args->fn.file_op = cb_fn;
1382 	args->arg = cb_arg;
1383 	args->file = file;
1384 	fs = file->fs;
1385 
1386 	num_clusters = __bytes_to_clusters(length, fs->bs_opts.cluster_sz);
1387 
1388 	spdk_bs_md_resize_blob(file->blob, num_clusters);
1389 	spdk_blob_md_set_xattr(file->blob, "length", &length, sizeof(length));
1390 
1391 	file->length = length;
1392 	if (file->append_pos > file->length) {
1393 		file->append_pos = file->length;
1394 	}
1395 
1396 	spdk_bs_md_sync_blob(file->blob, fs_truncate_complete_cb, args);
1397 }
1398 
1399 static void
1400 __truncate(void *arg)
1401 {
1402 	struct spdk_fs_request *req = arg;
1403 	struct spdk_fs_cb_args *args = &req->args;
1404 
1405 	spdk_file_truncate_async(args->file, args->op.truncate.length,
1406 				 args->fn.file_op, args->arg);
1407 }
1408 
1409 void
1410 spdk_file_truncate(struct spdk_file *file, struct spdk_io_channel *_channel,
1411 		   uint64_t length)
1412 {
1413 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
1414 	struct spdk_fs_request *req;
1415 	struct spdk_fs_cb_args *args;
1416 
1417 	req = alloc_fs_request(channel);
1418 	assert(req != NULL);
1419 
1420 	args = &req->args;
1421 
1422 	args->file = file;
1423 	args->op.truncate.length = length;
1424 	args->fn.file_op = __sem_post;
1425 	args->arg = &channel->sem;
1426 
1427 	channel->send_request(__truncate, req);
1428 	sem_wait(&channel->sem);
1429 	free_fs_request(req);
1430 }
1431 
1432 static void
1433 __rw_done(void *ctx, int bserrno)
1434 {
1435 	struct spdk_fs_request *req = ctx;
1436 	struct spdk_fs_cb_args *args = &req->args;
1437 
1438 	spdk_dma_free(args->op.rw.pin_buf);
1439 	args->fn.file_op(args->arg, bserrno);
1440 	free_fs_request(req);
1441 }
1442 
1443 static void
1444 __read_done(void *ctx, int bserrno)
1445 {
1446 	struct spdk_fs_request *req = ctx;
1447 	struct spdk_fs_cb_args *args = &req->args;
1448 
1449 	if (args->op.rw.is_read) {
1450 		memcpy(args->op.rw.user_buf,
1451 		       args->op.rw.pin_buf + (args->op.rw.offset & 0xFFF),
1452 		       args->op.rw.length);
1453 		__rw_done(req, 0);
1454 	} else {
1455 		memcpy(args->op.rw.pin_buf + (args->op.rw.offset & 0xFFF),
1456 		       args->op.rw.user_buf,
1457 		       args->op.rw.length);
1458 		spdk_bs_io_write_blob(args->file->blob, args->op.rw.channel,
1459 				      args->op.rw.pin_buf,
1460 				      args->op.rw.start_page, args->op.rw.num_pages,
1461 				      __rw_done, req);
1462 	}
1463 }
1464 
1465 static void
1466 __do_blob_read(void *ctx, int fserrno)
1467 {
1468 	struct spdk_fs_request *req = ctx;
1469 	struct spdk_fs_cb_args *args = &req->args;
1470 
1471 	spdk_bs_io_read_blob(args->file->blob, args->op.rw.channel,
1472 			     args->op.rw.pin_buf,
1473 			     args->op.rw.start_page, args->op.rw.num_pages,
1474 			     __read_done, req);
1475 }
1476 
1477 static void
1478 __get_page_parameters(struct spdk_file *file, uint64_t offset, uint64_t length,
1479 		      uint64_t *start_page, uint32_t *page_size, uint64_t *num_pages)
1480 {
1481 	uint64_t end_page;
1482 
1483 	*page_size = spdk_bs_get_page_size(file->fs->bs);
1484 	*start_page = offset / *page_size;
1485 	end_page = (offset + length - 1) / *page_size;
1486 	*num_pages = (end_page - *start_page + 1);
1487 }
1488 
1489 static void
1490 __readwrite(struct spdk_file *file, struct spdk_io_channel *_channel,
1491 	    void *payload, uint64_t offset, uint64_t length,
1492 	    spdk_file_op_complete cb_fn, void *cb_arg, int is_read)
1493 {
1494 	struct spdk_fs_request *req;
1495 	struct spdk_fs_cb_args *args;
1496 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
1497 	uint64_t start_page, num_pages, pin_buf_length;
1498 	uint32_t page_size;
1499 
1500 	if (is_read && offset + length > file->length) {
1501 		cb_fn(cb_arg, -EINVAL);
1502 		return;
1503 	}
1504 
1505 	req = alloc_fs_request(channel);
1506 	if (req == NULL) {
1507 		cb_fn(cb_arg, -ENOMEM);
1508 		return;
1509 	}
1510 
1511 	args = &req->args;
1512 	args->fn.file_op = cb_fn;
1513 	args->arg = cb_arg;
1514 	args->file = file;
1515 	args->op.rw.channel = channel->bs_channel;
1516 	args->op.rw.user_buf = payload;
1517 	args->op.rw.is_read = is_read;
1518 	args->op.rw.offset = offset;
1519 	args->op.rw.length = length;
1520 
1521 	__get_page_parameters(file, offset, length, &start_page, &page_size, &num_pages);
1522 	pin_buf_length = num_pages * page_size;
1523 	args->op.rw.pin_buf = spdk_dma_malloc(pin_buf_length, 4096, NULL);
1524 
1525 	args->op.rw.start_page = start_page;
1526 	args->op.rw.num_pages = num_pages;
1527 
1528 	if (!is_read && file->length < offset + length) {
1529 		spdk_file_truncate_async(file, offset + length, __do_blob_read, req);
1530 	} else {
1531 		__do_blob_read(req, 0);
1532 	}
1533 }
1534 
1535 void
1536 spdk_file_write_async(struct spdk_file *file, struct spdk_io_channel *channel,
1537 		      void *payload, uint64_t offset, uint64_t length,
1538 		      spdk_file_op_complete cb_fn, void *cb_arg)
1539 {
1540 	__readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 0);
1541 }
1542 
1543 void
1544 spdk_file_read_async(struct spdk_file *file, struct spdk_io_channel *channel,
1545 		     void *payload, uint64_t offset, uint64_t length,
1546 		     spdk_file_op_complete cb_fn, void *cb_arg)
1547 {
1548 	SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "file=%s offset=%jx length=%jx\n",
1549 		      file->name, offset, length);
1550 	__readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 1);
1551 }
1552 
1553 struct spdk_io_channel *
1554 spdk_fs_alloc_io_channel(struct spdk_filesystem *fs)
1555 {
1556 	struct spdk_io_channel *io_channel;
1557 	struct spdk_fs_channel *fs_channel;
1558 
1559 	io_channel = spdk_get_io_channel(&fs->io_target);
1560 	fs_channel = spdk_io_channel_get_ctx(io_channel);
1561 	fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
1562 	fs_channel->send_request = __send_request_direct;
1563 
1564 	return io_channel;
1565 }
1566 
1567 struct spdk_io_channel *
1568 spdk_fs_alloc_io_channel_sync(struct spdk_filesystem *fs)
1569 {
1570 	struct spdk_io_channel *io_channel;
1571 	struct spdk_fs_channel *fs_channel;
1572 
1573 	io_channel = spdk_get_io_channel(&fs->io_target);
1574 	fs_channel = spdk_io_channel_get_ctx(io_channel);
1575 	fs_channel->send_request = fs->send_request;
1576 	fs_channel->sync = 1;
1577 	pthread_spin_init(&fs_channel->lock, 0);
1578 
1579 	return io_channel;
1580 }
1581 
/*
 * Release an I/O channel obtained from spdk_fs_alloc_io_channel() or
 *  spdk_fs_alloc_io_channel_sync() by handing it to spdk_put_io_channel().
 */
void
spdk_fs_free_io_channel(struct spdk_io_channel *channel)
{
	spdk_put_io_channel(channel);
}
1587 
1588 void
1589 spdk_fs_set_cache_size(uint64_t size_in_mb)
1590 {
1591 	g_fs_cache_size = size_in_mb * 1024 * 1024;
1592 }
1593 
1594 uint64_t
1595 spdk_fs_get_cache_size(void)
1596 {
1597 	return g_fs_cache_size / (1024 * 1024);
1598 }
1599 
1600 static void __file_flush(void *_args);
1601 
1602 static void *
1603 alloc_cache_memory_buffer(struct spdk_file *context)
1604 {
1605 	struct spdk_file *file;
1606 	void *buf;
1607 
1608 	buf = spdk_mempool_get(g_cache_pool);
1609 	if (buf != NULL) {
1610 		return buf;
1611 	}
1612 
1613 	pthread_spin_lock(&g_caches_lock);
1614 	TAILQ_FOREACH(file, &g_caches, cache_tailq) {
1615 		if (!file->open_for_writing &&
1616 		    file->priority == SPDK_FILE_PRIORITY_LOW &&
1617 		    file != context) {
1618 			break;
1619 		}
1620 	}
1621 	pthread_spin_unlock(&g_caches_lock);
1622 	if (file != NULL) {
1623 		cache_free_buffers(file);
1624 		buf = spdk_mempool_get(g_cache_pool);
1625 		if (buf != NULL) {
1626 			return buf;
1627 		}
1628 	}
1629 
1630 	pthread_spin_lock(&g_caches_lock);
1631 	TAILQ_FOREACH(file, &g_caches, cache_tailq) {
1632 		if (!file->open_for_writing && file != context) {
1633 			break;
1634 		}
1635 	}
1636 	pthread_spin_unlock(&g_caches_lock);
1637 	if (file != NULL) {
1638 		cache_free_buffers(file);
1639 		buf = spdk_mempool_get(g_cache_pool);
1640 		if (buf != NULL) {
1641 			return buf;
1642 		}
1643 	}
1644 
1645 	pthread_spin_lock(&g_caches_lock);
1646 	TAILQ_FOREACH(file, &g_caches, cache_tailq) {
1647 		if (file != context) {
1648 			break;
1649 		}
1650 	}
1651 	pthread_spin_unlock(&g_caches_lock);
1652 	if (file != NULL) {
1653 		cache_free_buffers(file);
1654 		buf = spdk_mempool_get(g_cache_pool);
1655 		if (buf != NULL) {
1656 			return buf;
1657 		}
1658 	}
1659 
1660 	return NULL;
1661 }
1662 
1663 static struct cache_buffer *
1664 cache_insert_buffer(struct spdk_file *file, uint64_t offset)
1665 {
1666 	struct cache_buffer *buf;
1667 	int count = 0;
1668 
1669 	buf = calloc(1, sizeof(*buf));
1670 	if (buf == NULL) {
1671 		SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "calloc failed\n");
1672 		return NULL;
1673 	}
1674 
1675 	buf->buf = alloc_cache_memory_buffer(file);
1676 	while (buf->buf == NULL) {
1677 		/*
1678 		 * TODO: alloc_cache_memory_buffer() should eventually free
1679 		 *  some buffers.  Need a more sophisticated check here, instead
1680 		 *  of just bailing if 100 tries does not result in getting a
1681 		 *  free buffer.  This will involve using the sync channel's
1682 		 *  semaphore to block until a buffer becomes available.
1683 		 */
1684 		if (count++ == 100) {
1685 			SPDK_ERRLOG("could not allocate cache buffer\n");
1686 			assert(false);
1687 			free(buf);
1688 			return NULL;
1689 		}
1690 		buf->buf = alloc_cache_memory_buffer(file);
1691 	}
1692 
1693 	buf->buf_size = CACHE_BUFFER_SIZE;
1694 	buf->offset = offset;
1695 
1696 	pthread_spin_lock(&g_caches_lock);
1697 	if (file->tree->present_mask == 0) {
1698 		TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq);
1699 	}
1700 	file->tree = spdk_tree_insert_buffer(file->tree, buf);
1701 	pthread_spin_unlock(&g_caches_lock);
1702 
1703 	return buf;
1704 }
1705 
1706 static struct cache_buffer *
1707 cache_append_buffer(struct spdk_file *file)
1708 {
1709 	struct cache_buffer *last;
1710 
1711 	assert(file->last == NULL || file->last->bytes_filled == file->last->buf_size);
1712 	assert((file->append_pos % CACHE_BUFFER_SIZE) == 0);
1713 
1714 	last = cache_insert_buffer(file, file->append_pos);
1715 	if (last == NULL) {
1716 		SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "cache_insert_buffer failed\n");
1717 		return NULL;
1718 	}
1719 
1720 	file->last = last;
1721 
1722 	return last;
1723 }
1724 
/*
 * Post the semaphore stored in args, releasing whoever is blocked in
 *  sem_wait() on it.
 */
static void
__wake_caller(struct spdk_fs_cb_args *args)
{
	sem_post(args->sem);
}
1730 
1731 static void __check_sync_reqs(struct spdk_file *file);
1732 
1733 static void
1734 __file_cache_finish_sync(struct spdk_file *file)
1735 {
1736 	struct spdk_fs_request *sync_req;
1737 	struct spdk_fs_cb_args *sync_args;
1738 
1739 	pthread_spin_lock(&file->lock);
1740 	sync_req = TAILQ_FIRST(&file->sync_requests);
1741 	sync_args = &sync_req->args;
1742 	assert(sync_args->op.sync.offset <= file->length_flushed);
1743 	BLOBFS_TRACE(file, "sync done offset=%jx\n", sync_args->op.sync.offset);
1744 	TAILQ_REMOVE(&file->sync_requests, sync_req, args.op.sync.tailq);
1745 	pthread_spin_unlock(&file->lock);
1746 
1747 	sync_args->fn.file_op(sync_args->arg, 0);
1748 	__check_sync_reqs(file);
1749 
1750 	pthread_spin_lock(&file->lock);
1751 	free_fs_request(sync_req);
1752 	pthread_spin_unlock(&file->lock);
1753 }
1754 
/*
 * Blobstore completion adapter: ctx is the struct spdk_file whose sync just
 *  finished.  bserrno is not inspected.
 */
static void
__file_cache_finish_sync_bs_cb(void *ctx, int bserrno)
{
	__file_cache_finish_sync(ctx);
}
1762 
1763 static void
1764 __free_args(struct spdk_fs_cb_args *args)
1765 {
1766 	struct spdk_fs_request *req;
1767 
1768 	if (!args->from_request) {
1769 		free(args);
1770 	} else {
1771 		/* Depends on args being at the start of the spdk_fs_request structure. */
1772 		req = (struct spdk_fs_request *)args;
1773 		free_fs_request(req);
1774 	}
1775 }
1776 
1777 static void
1778 __check_sync_reqs(struct spdk_file *file)
1779 {
1780 	struct spdk_fs_request *sync_req;
1781 
1782 	pthread_spin_lock(&file->lock);
1783 
1784 	TAILQ_FOREACH(sync_req, &file->sync_requests, args.op.sync.tailq) {
1785 		if (sync_req->args.op.sync.offset <= file->length_flushed) {
1786 			break;
1787 		}
1788 	}
1789 
1790 	if (sync_req != NULL && !sync_req->args.op.sync.xattr_in_progress) {
1791 		BLOBFS_TRACE(file, "set xattr length 0x%jx\n", file->length_flushed);
1792 		sync_req->args.op.sync.xattr_in_progress = true;
1793 		spdk_blob_md_set_xattr(file->blob, "length", &file->length_flushed,
1794 				       sizeof(file->length_flushed));
1795 
1796 		pthread_spin_unlock(&file->lock);
1797 		spdk_bs_md_sync_blob(file->blob, __file_cache_finish_sync_bs_cb, file);
1798 	} else {
1799 		pthread_spin_unlock(&file->lock);
1800 	}
1801 }
1802 
1803 static void
1804 __file_flush_done(void *arg, int bserrno)
1805 {
1806 	struct spdk_fs_cb_args *args = arg;
1807 	struct spdk_file *file = args->file;
1808 	struct cache_buffer *next = args->op.flush.cache_buffer;
1809 
1810 	BLOBFS_TRACE(file, "length=%jx\n", args->op.flush.length);
1811 
1812 	pthread_spin_lock(&file->lock);
1813 	next->in_progress = false;
1814 	next->bytes_flushed += args->op.flush.length;
1815 	file->length_flushed += args->op.flush.length;
1816 	if (file->length_flushed > file->length) {
1817 		file->length = file->length_flushed;
1818 	}
1819 	if (next->bytes_flushed == next->buf_size) {
1820 		BLOBFS_TRACE(file, "write buffer fully flushed 0x%jx\n", file->length_flushed);
1821 		next = spdk_tree_find_buffer(file->tree, file->length_flushed);
1822 	}
1823 
1824 	/*
1825 	 * Assert that there is no cached data that extends past the end of the underlying
1826 	 *  blob.
1827 	 */
1828 	assert(next == NULL || next->offset < __file_get_blob_size(file) ||
1829 	       next->bytes_filled == 0);
1830 
1831 	pthread_spin_unlock(&file->lock);
1832 
1833 	__check_sync_reqs(file);
1834 
1835 	__file_flush(args);
1836 }
1837 
1838 static void
1839 __file_flush(void *_args)
1840 {
1841 	struct spdk_fs_cb_args *args = _args;
1842 	struct spdk_file *file = args->file;
1843 	struct cache_buffer *next;
1844 	uint64_t offset, length, start_page, num_pages;
1845 	uint32_t page_size;
1846 
1847 	pthread_spin_lock(&file->lock);
1848 	next = spdk_tree_find_buffer(file->tree, file->length_flushed);
1849 	if (next == NULL || next->in_progress) {
1850 		/*
1851 		 * There is either no data to flush, or a flush I/O is already in
1852 		 *  progress.  So return immediately - if a flush I/O is in
1853 		 *  progress we will flush more data after that is completed.
1854 		 */
1855 		__free_args(args);
1856 		pthread_spin_unlock(&file->lock);
1857 		return;
1858 	}
1859 
1860 	offset = next->offset + next->bytes_flushed;
1861 	length = next->bytes_filled - next->bytes_flushed;
1862 	if (length == 0) {
1863 		__free_args(args);
1864 		pthread_spin_unlock(&file->lock);
1865 		return;
1866 	}
1867 	args->op.flush.length = length;
1868 	args->op.flush.cache_buffer = next;
1869 
1870 	__get_page_parameters(file, offset, length, &start_page, &page_size, &num_pages);
1871 
1872 	next->in_progress = true;
1873 	BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n",
1874 		     offset, length, start_page, num_pages);
1875 	pthread_spin_unlock(&file->lock);
1876 	spdk_bs_io_write_blob(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel,
1877 			      next->buf + (start_page * page_size) - next->offset,
1878 			      start_page, num_pages,
1879 			      __file_flush_done, args);
1880 }
1881 
/*
 * Completion of the metadata sync issued by __file_extend_blob(): wakes the
 *  thread waiting on the semaphore in the args.  bserrno is not inspected.
 */
static void
__file_extend_done(void *arg, int bserrno)
{
	__wake_caller(arg);
}
1889 
1890 static void
1891 __file_extend_blob(void *_args)
1892 {
1893 	struct spdk_fs_cb_args *args = _args;
1894 	struct spdk_file *file = args->file;
1895 
1896 	spdk_bs_md_resize_blob(file->blob, args->op.resize.num_clusters);
1897 
1898 	spdk_bs_md_sync_blob(file->blob, __file_extend_done, args);
1899 }
1900 
/*
 * Completion of a read or write started through __rw_from_file(): wakes the
 *  waiting caller, then releases the args.  bserrno is not inspected.
 */
static void
__rw_from_file_done(void *arg, int bserrno)
{
	struct spdk_fs_cb_args *rw_args = arg;

	__wake_caller(rw_args);
	__free_args(rw_args);
}
1909 
1910 static void
1911 __rw_from_file(void *_args)
1912 {
1913 	struct spdk_fs_cb_args *args = _args;
1914 	struct spdk_file *file = args->file;
1915 
1916 	if (args->op.rw.is_read) {
1917 		spdk_file_read_async(file, file->fs->sync_target.sync_io_channel, args->op.rw.user_buf,
1918 				     args->op.rw.offset, args->op.rw.length,
1919 				     __rw_from_file_done, args);
1920 	} else {
1921 		spdk_file_write_async(file, file->fs->sync_target.sync_io_channel, args->op.rw.user_buf,
1922 				      args->op.rw.offset, args->op.rw.length,
1923 				      __rw_from_file_done, args);
1924 	}
1925 }
1926 
1927 static int
1928 __send_rw_from_file(struct spdk_file *file, sem_t *sem, void *payload,
1929 		    uint64_t offset, uint64_t length, bool is_read)
1930 {
1931 	struct spdk_fs_cb_args *args;
1932 
1933 	args = calloc(1, sizeof(*args));
1934 	if (args == NULL) {
1935 		sem_post(sem);
1936 		return -ENOMEM;
1937 	}
1938 
1939 	args->file = file;
1940 	args->sem = sem;
1941 	args->op.rw.user_buf = payload;
1942 	args->op.rw.offset = offset;
1943 	args->op.rw.length = length;
1944 	args->op.rw.is_read = is_read;
1945 	file->fs->send_request(__rw_from_file, args);
1946 	return 0;
1947 }
1948 
1949 int
1950 spdk_file_write(struct spdk_file *file, struct spdk_io_channel *_channel,
1951 		void *payload, uint64_t offset, uint64_t length)
1952 {
1953 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
1954 	struct spdk_fs_cb_args *args;
1955 	uint64_t rem_length, copy, blob_size, cluster_sz;
1956 	uint32_t cache_buffers_filled = 0;
1957 	uint8_t *cur_payload;
1958 	struct cache_buffer *last;
1959 
1960 	BLOBFS_TRACE_RW(file, "offset=%jx length=%jx\n", offset, length);
1961 
1962 	if (length == 0) {
1963 		return 0;
1964 	}
1965 
1966 	if (offset != file->append_pos) {
1967 		BLOBFS_TRACE(file, " error offset=%jx append_pos=%jx\n", offset, file->append_pos);
1968 		return -EINVAL;
1969 	}
1970 
1971 	pthread_spin_lock(&file->lock);
1972 	file->open_for_writing = true;
1973 
1974 	if (file->last == NULL) {
1975 		if (file->append_pos % CACHE_BUFFER_SIZE == 0) {
1976 			cache_append_buffer(file);
1977 		} else {
1978 			int rc;
1979 
1980 			file->append_pos += length;
1981 			pthread_spin_unlock(&file->lock);
1982 			rc = __send_rw_from_file(file, &channel->sem, payload,
1983 						 offset, length, false);
1984 			sem_wait(&channel->sem);
1985 			return rc;
1986 		}
1987 	}
1988 
1989 	blob_size = __file_get_blob_size(file);
1990 
1991 	if ((offset + length) > blob_size) {
1992 		struct spdk_fs_cb_args extend_args = {};
1993 
1994 		cluster_sz = file->fs->bs_opts.cluster_sz;
1995 		extend_args.sem = &channel->sem;
1996 		extend_args.op.resize.num_clusters = __bytes_to_clusters((offset + length), cluster_sz);
1997 		extend_args.file = file;
1998 		BLOBFS_TRACE(file, "start resize to %u clusters\n", extend_args.op.resize.num_clusters);
1999 		pthread_spin_unlock(&file->lock);
2000 		file->fs->send_request(__file_extend_blob, &extend_args);
2001 		sem_wait(&channel->sem);
2002 	}
2003 
2004 	last = file->last;
2005 	rem_length = length;
2006 	cur_payload = payload;
2007 	while (rem_length > 0) {
2008 		copy = last->buf_size - last->bytes_filled;
2009 		if (copy > rem_length) {
2010 			copy = rem_length;
2011 		}
2012 		BLOBFS_TRACE_RW(file, "  fill offset=%jx length=%jx\n", file->append_pos, copy);
2013 		memcpy(&last->buf[last->bytes_filled], cur_payload, copy);
2014 		file->append_pos += copy;
2015 		if (file->length < file->append_pos) {
2016 			file->length = file->append_pos;
2017 		}
2018 		cur_payload += copy;
2019 		last->bytes_filled += copy;
2020 		rem_length -= copy;
2021 		if (last->bytes_filled == last->buf_size) {
2022 			cache_buffers_filled++;
2023 			last = cache_append_buffer(file);
2024 			if (last == NULL) {
2025 				BLOBFS_TRACE(file, "nomem\n");
2026 				pthread_spin_unlock(&file->lock);
2027 				return -ENOMEM;
2028 			}
2029 		}
2030 	}
2031 
2032 	if (cache_buffers_filled == 0) {
2033 		pthread_spin_unlock(&file->lock);
2034 		return 0;
2035 	}
2036 
2037 	args = calloc(1, sizeof(*args));
2038 	if (args == NULL) {
2039 		pthread_spin_unlock(&file->lock);
2040 		return -ENOMEM;
2041 	}
2042 
2043 	args->file = file;
2044 	file->fs->send_request(__file_flush, args);
2045 	pthread_spin_unlock(&file->lock);
2046 	return 0;
2047 }
2048 
2049 static void
2050 __readahead_done(void *arg, int bserrno)
2051 {
2052 	struct spdk_fs_cb_args *args = arg;
2053 	struct cache_buffer *cache_buffer = args->op.readahead.cache_buffer;
2054 	struct spdk_file *file = args->file;
2055 
2056 	BLOBFS_TRACE(file, "offset=%jx\n", cache_buffer->offset);
2057 
2058 	pthread_spin_lock(&file->lock);
2059 	cache_buffer->bytes_filled = args->op.readahead.length;
2060 	cache_buffer->bytes_flushed = args->op.readahead.length;
2061 	cache_buffer->in_progress = false;
2062 	pthread_spin_unlock(&file->lock);
2063 
2064 	__free_args(args);
2065 }
2066 
2067 static void
2068 __readahead(void *_args)
2069 {
2070 	struct spdk_fs_cb_args *args = _args;
2071 	struct spdk_file *file = args->file;
2072 	uint64_t offset, length, start_page, num_pages;
2073 	uint32_t page_size;
2074 
2075 	offset = args->op.readahead.offset;
2076 	length = args->op.readahead.length;
2077 	assert(length > 0);
2078 
2079 	__get_page_parameters(file, offset, length, &start_page, &page_size, &num_pages);
2080 
2081 	BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n",
2082 		     offset, length, start_page, num_pages);
2083 	spdk_bs_io_read_blob(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel,
2084 			     args->op.readahead.cache_buffer->buf,
2085 			     start_page, num_pages,
2086 			     __readahead_done, args);
2087 }
2088 
2089 static uint64_t
2090 __next_cache_buffer_offset(uint64_t offset)
2091 {
2092 	return (offset + CACHE_BUFFER_SIZE) & ~(CACHE_TREE_LEVEL_MASK(0));
2093 }
2094 
2095 static void
2096 check_readahead(struct spdk_file *file, uint64_t offset)
2097 {
2098 	struct spdk_fs_cb_args *args;
2099 
2100 	offset = __next_cache_buffer_offset(offset);
2101 	if (spdk_tree_find_buffer(file->tree, offset) != NULL || file->length <= offset) {
2102 		return;
2103 	}
2104 
2105 	args = calloc(1, sizeof(*args));
2106 	if (args == NULL) {
2107 		return;
2108 	}
2109 
2110 	BLOBFS_TRACE(file, "offset=%jx\n", offset);
2111 
2112 	args->file = file;
2113 	args->op.readahead.offset = offset;
2114 	args->op.readahead.cache_buffer = cache_insert_buffer(file, offset);
2115 	args->op.readahead.cache_buffer->in_progress = true;
2116 	if (file->length < (offset + CACHE_BUFFER_SIZE)) {
2117 		args->op.readahead.length = file->length & (CACHE_BUFFER_SIZE - 1);
2118 	} else {
2119 		args->op.readahead.length = CACHE_BUFFER_SIZE;
2120 	}
2121 	file->fs->send_request(__readahead, args);
2122 }
2123 
2124 static int
2125 __file_read(struct spdk_file *file, void *payload, uint64_t offset, uint64_t length, sem_t *sem)
2126 {
2127 	struct cache_buffer *buf;
2128 	int rc;
2129 
2130 	buf = spdk_tree_find_filled_buffer(file->tree, offset);
2131 	if (buf == NULL) {
2132 		pthread_spin_unlock(&file->lock);
2133 		rc = __send_rw_from_file(file, sem, payload, offset, length, true);
2134 		pthread_spin_lock(&file->lock);
2135 		return rc;
2136 	}
2137 
2138 	if ((offset + length) > (buf->offset + buf->bytes_filled)) {
2139 		length = buf->offset + buf->bytes_filled - offset;
2140 	}
2141 	BLOBFS_TRACE(file, "read %p offset=%ju length=%ju\n", payload, offset, length);
2142 	memcpy(payload, &buf->buf[offset - buf->offset], length);
2143 	if ((offset + length) % CACHE_BUFFER_SIZE == 0) {
2144 		pthread_spin_lock(&g_caches_lock);
2145 		spdk_tree_remove_buffer(file->tree, buf);
2146 		if (file->tree->present_mask == 0) {
2147 			TAILQ_REMOVE(&g_caches, file, cache_tailq);
2148 		}
2149 		pthread_spin_unlock(&g_caches_lock);
2150 	}
2151 
2152 	sem_post(sem);
2153 	return 0;
2154 }
2155 
2156 int64_t
2157 spdk_file_read(struct spdk_file *file, struct spdk_io_channel *_channel,
2158 	       void *payload, uint64_t offset, uint64_t length)
2159 {
2160 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
2161 	uint64_t final_offset, final_length;
2162 	uint32_t sub_reads = 0;
2163 	int rc = 0;
2164 
2165 	pthread_spin_lock(&file->lock);
2166 
2167 	BLOBFS_TRACE_RW(file, "offset=%ju length=%ju\n", offset, length);
2168 
2169 	file->open_for_writing = false;
2170 
2171 	if (length == 0 || offset >= file->append_pos) {
2172 		pthread_spin_unlock(&file->lock);
2173 		return 0;
2174 	}
2175 
2176 	if (offset + length > file->append_pos) {
2177 		length = file->append_pos - offset;
2178 	}
2179 
2180 	if (offset != file->next_seq_offset) {
2181 		file->seq_byte_count = 0;
2182 	}
2183 	file->seq_byte_count += length;
2184 	file->next_seq_offset = offset + length;
2185 	if (file->seq_byte_count >= CACHE_READAHEAD_THRESHOLD) {
2186 		check_readahead(file, offset);
2187 		check_readahead(file, offset + CACHE_BUFFER_SIZE);
2188 	}
2189 
2190 	final_length = 0;
2191 	final_offset = offset + length;
2192 	while (offset < final_offset) {
2193 		length = NEXT_CACHE_BUFFER_OFFSET(offset) - offset;
2194 		if (length > (final_offset - offset)) {
2195 			length = final_offset - offset;
2196 		}
2197 		rc = __file_read(file, payload, offset, length, &channel->sem);
2198 		if (rc == 0) {
2199 			final_length += length;
2200 		} else {
2201 			break;
2202 		}
2203 		payload += length;
2204 		offset += length;
2205 		sub_reads++;
2206 	}
2207 	pthread_spin_unlock(&file->lock);
2208 	while (sub_reads-- > 0) {
2209 		sem_wait(&channel->sem);
2210 	}
2211 	if (rc == 0) {
2212 		return final_length;
2213 	} else {
2214 		return rc;
2215 	}
2216 }
2217 
2218 static void
2219 _file_sync(struct spdk_file *file, struct spdk_fs_channel *channel,
2220 	   spdk_file_op_complete cb_fn, void *cb_arg)
2221 {
2222 	struct spdk_fs_request *sync_req;
2223 	struct spdk_fs_request *flush_req;
2224 	struct spdk_fs_cb_args *sync_args;
2225 	struct spdk_fs_cb_args *flush_args;
2226 
2227 	BLOBFS_TRACE(file, "offset=%jx\n", file->append_pos);
2228 
2229 	pthread_spin_lock(&file->lock);
2230 	if (file->append_pos <= file->length_flushed || file->last == NULL) {
2231 		BLOBFS_TRACE(file, "done - no data to flush\n");
2232 		pthread_spin_unlock(&file->lock);
2233 		cb_fn(cb_arg, 0);
2234 		return;
2235 	}
2236 
2237 	sync_req = alloc_fs_request(channel);
2238 	assert(sync_req != NULL);
2239 	sync_args = &sync_req->args;
2240 
2241 	flush_req = alloc_fs_request(channel);
2242 	assert(flush_req != NULL);
2243 	flush_args = &flush_req->args;
2244 
2245 	sync_args->file = file;
2246 	sync_args->fn.file_op = cb_fn;
2247 	sync_args->arg = cb_arg;
2248 	sync_args->op.sync.offset = file->append_pos;
2249 	sync_args->op.sync.xattr_in_progress = false;
2250 	TAILQ_INSERT_TAIL(&file->sync_requests, sync_req, args.op.sync.tailq);
2251 	pthread_spin_unlock(&file->lock);
2252 
2253 	flush_args->file = file;
2254 	channel->send_request(__file_flush, flush_args);
2255 }
2256 
2257 int
2258 spdk_file_sync(struct spdk_file *file, struct spdk_io_channel *_channel)
2259 {
2260 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
2261 
2262 	_file_sync(file, channel, __sem_post, &channel->sem);
2263 	sem_wait(&channel->sem);
2264 
2265 	return 0;
2266 }
2267 
2268 void
2269 spdk_file_sync_async(struct spdk_file *file, struct spdk_io_channel *_channel,
2270 		     spdk_file_op_complete cb_fn, void *cb_arg)
2271 {
2272 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
2273 
2274 	_file_sync(file, channel, cb_fn, cb_arg);
2275 }
2276 
2277 void
2278 spdk_file_set_priority(struct spdk_file *file, uint32_t priority)
2279 {
2280 	BLOBFS_TRACE(file, "priority=%u\n", priority);
2281 	file->priority = priority;
2282 
2283 }
2284 
2285 /*
2286  * Close routines
2287  */
2288 
2289 static void
2290 __file_close_async_done(void *ctx, int bserrno)
2291 {
2292 	struct spdk_fs_request *req = ctx;
2293 	struct spdk_fs_cb_args *args = &req->args;
2294 	struct spdk_file *file = args->file;
2295 
2296 	if (file->is_deleted) {
2297 		spdk_fs_delete_file_async(file->fs, file->name, blob_delete_cb, ctx);
2298 		return;
2299 	}
2300 	args->fn.file_op(args->arg, bserrno);
2301 	free_fs_request(req);
2302 }
2303 
2304 static void
2305 __file_close_async(struct spdk_file *file, struct spdk_fs_request *req)
2306 {
2307 	pthread_spin_lock(&file->lock);
2308 	if (file->ref_count == 0) {
2309 		pthread_spin_unlock(&file->lock);
2310 		__file_close_async_done(req, -EBADF);
2311 		return;
2312 	}
2313 
2314 	file->ref_count--;
2315 	if (file->ref_count > 0) {
2316 		pthread_spin_unlock(&file->lock);
2317 		__file_close_async_done(req, 0);
2318 		return;
2319 	}
2320 
2321 	pthread_spin_unlock(&file->lock);
2322 
2323 	spdk_bs_md_close_blob(&file->blob, __file_close_async_done, req);
2324 }
2325 
2326 static void
2327 __file_close_async__sync_done(void *arg, int fserrno)
2328 {
2329 	struct spdk_fs_request *req = arg;
2330 	struct spdk_fs_cb_args *args = &req->args;
2331 
2332 	__file_close_async(args->file, req);
2333 }
2334 
2335 void
2336 spdk_file_close_async(struct spdk_file *file, spdk_file_op_complete cb_fn, void *cb_arg)
2337 {
2338 	struct spdk_fs_request *req;
2339 	struct spdk_fs_cb_args *args;
2340 
2341 	req = alloc_fs_request(file->fs->md_target.md_fs_channel);
2342 	if (req == NULL) {
2343 		cb_fn(cb_arg, -ENOMEM);
2344 		return;
2345 	}
2346 
2347 	args = &req->args;
2348 	args->file = file;
2349 	args->fn.file_op = cb_fn;
2350 	args->arg = cb_arg;
2351 
2352 	spdk_file_sync_async(file, file->fs->md_target.md_io_channel, __file_close_async__sync_done, req);
2353 }
2354 
2355 static void
2356 __file_close_done(void *arg, int fserrno)
2357 {
2358 	struct spdk_fs_cb_args *args = arg;
2359 
2360 	args->rc = fserrno;
2361 	sem_post(args->sem);
2362 }
2363 
2364 static void
2365 __file_close(void *arg)
2366 {
2367 	struct spdk_fs_request *req = arg;
2368 	struct spdk_fs_cb_args *args = &req->args;
2369 	struct spdk_file *file = args->file;
2370 
2371 	__file_close_async(file, req);
2372 }
2373 
2374 int
2375 spdk_file_close(struct spdk_file *file, struct spdk_io_channel *_channel)
2376 {
2377 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
2378 	struct spdk_fs_request *req;
2379 	struct spdk_fs_cb_args *args;
2380 
2381 	req = alloc_fs_request(channel);
2382 	assert(req != NULL);
2383 
2384 	args = &req->args;
2385 
2386 	spdk_file_sync(file, _channel);
2387 	BLOBFS_TRACE(file, "name=%s\n", file->name);
2388 	args->file = file;
2389 	args->sem = &channel->sem;
2390 	args->fn.file_op = __file_close_done;
2391 	args->arg = req;
2392 	channel->send_request(__file_close, req);
2393 	sem_wait(&channel->sem);
2394 
2395 	return args->rc;
2396 }
2397 
2398 static void
2399 cache_free_buffers(struct spdk_file *file)
2400 {
2401 	BLOBFS_TRACE(file, "free=%s\n", file->name);
2402 	pthread_spin_lock(&file->lock);
2403 	pthread_spin_lock(&g_caches_lock);
2404 	if (file->tree->present_mask == 0) {
2405 		pthread_spin_unlock(&g_caches_lock);
2406 		pthread_spin_unlock(&file->lock);
2407 		return;
2408 	}
2409 	spdk_tree_free_buffers(file->tree);
2410 
2411 	TAILQ_REMOVE(&g_caches, file, cache_tailq);
2412 	/* If not freed, put it in the end of the queue */
2413 	if (file->tree->present_mask != 0) {
2414 		TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq);
2415 	}
2416 	file->last = NULL;
2417 	pthread_spin_unlock(&g_caches_lock);
2418 	pthread_spin_unlock(&file->lock);
2419 }
2420 
/*
 * Register the "blobfs" and "blobfs_rw" trace flags.
 * NOTE(review): presumably these gate the BLOBFS_TRACE / BLOBFS_TRACE_RW
 * output used throughout this file - confirm against the macro definitions.
 */
SPDK_LOG_REGISTER_TRACE_FLAG("blobfs", SPDK_TRACE_BLOBFS);
SPDK_LOG_REGISTER_TRACE_FLAG("blobfs_rw", SPDK_TRACE_BLOBFS_RW);
2423