xref: /spdk/lib/blobfs/blobfs.c (revision 546d8ab2bd809bd56543bff2c14d0503032c2526)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 
36 #include "spdk/blobfs.h"
37 #include "spdk/conf.h"
38 #include "blobfs_internal.h"
39 
40 #include "spdk/queue.h"
41 #include "spdk/thread.h"
42 #include "spdk/assert.h"
43 #include "spdk/env.h"
44 #include "spdk/util.h"
45 #include "spdk_internal/log.h"
46 
/* Debug-log helper: emits a SPDK_LOG_BLOBFS message prefixed with the file's name. */
#define BLOBFS_TRACE(file, str, args...) \
	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s " str, file->name, ##args)

/* Same as BLOBFS_TRACE, but logged under the SPDK_LOG_BLOBFS_RW flag. */
#define BLOBFS_TRACE_RW(file, str, args...) \
	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS_RW, "file=%s " str, file->name, ##args)

/* Default total size, in bytes, of the shared buffer cache (4 GiB). */
#define BLOBFS_DEFAULT_CACHE_SIZE (4ULL * 1024 * 1024 * 1024)
/* Default cluster size (1 MiB) written into the options by spdk_fs_opts_init(). */
#define SPDK_BLOBFS_DEFAULT_OPTS_CLUSTER_SZ (1024 * 1024)

/* Total cache size; __initialize_cache() divides it by CACHE_BUFFER_SIZE to size the pool. */
static uint64_t g_fs_cache_size = BLOBFS_DEFAULT_CACHE_SIZE;
/* Mempool of cache buffers; created by __initialize_cache(), released by __free_cache(). */
static struct spdk_mempool *g_cache_pool;
/* List of files, initialized in __initialize_cache().
 * NOTE(review): presumably the files that currently hold cache buffers, linked
 * through spdk_file.cache_tailq -- confirm against the cache code.
 */
static TAILQ_HEAD(, spdk_file) g_caches;
/* Number of loaded filesystems; the cache exists while this is non-zero. */
static int g_fs_count = 0;
/* Serializes updates to g_fs_count and creation/destruction of the cache pool. */
static pthread_mutex_t g_cache_init_lock = PTHREAD_MUTEX_INITIALIZER;
/* Spinlock initialized together with g_caches (presumably protects that list -- TODO confirm). */
static pthread_spinlock_t g_caches_lock;
62 
63 void
64 spdk_cache_buffer_free(struct cache_buffer *cache_buffer)
65 {
66 	spdk_mempool_put(g_cache_pool, cache_buffer->buf);
67 	free(cache_buffer);
68 }
69 
/* Byte threshold (128 KiB) used by the read-ahead logic; the consumer is outside this chunk. */
#define CACHE_READAHEAD_THRESHOLD	(128 * 1024)

/*
 * In-memory representation of one blobfs file.  Instances are created by
 * file_alloc() and linked into their filesystem's `files` list.
 */
struct spdk_file {
	/* Owning filesystem, set by file_alloc(). */
	struct spdk_filesystem	*fs;
	/* Open blob handle; stored by fs_open_blob_done(), NULL before the first open. */
	struct spdk_blob	*blob;
	/* Heap-allocated (strdup) file name; compared by fs_find_file(). */
	char			*name;
	/* File length; loaded from the blob's "length" xattr in iter_cb(). */
	uint64_t		length;
	/* When true, spdk_fs_open_file_async() reports -ENOENT for this file. */
	bool                    is_deleted;
	bool			open_for_writing;
	/* Initialized to the "length" xattr on load, like `length` and `append_pos`. */
	uint64_t		length_flushed;
	uint64_t		append_pos;
	uint64_t		seq_byte_count;
	uint64_t		next_seq_offset;
	/* Starts as SPDK_FILE_PRIORITY_LOW (see file_alloc()). */
	uint32_t		priority;
	/* Linkage in spdk_filesystem.files. */
	TAILQ_ENTRY(spdk_file)	tailq;
	/* Identifier of the blob backing this file. */
	spdk_blob_id		blobid;
	/* Incremented once per open request in fs_open_blob_create_cb(). */
	uint32_t		ref_count;
	/* Spinlock initialized in file_alloc(). */
	pthread_spinlock_t	lock;
	struct cache_buffer	*last;
	/* Cache tree root, allocated zeroed in file_alloc(). */
	struct cache_tree	*tree;
	/* Open requests waiting for the blob open to complete. */
	TAILQ_HEAD(open_requests_head, spdk_fs_request) open_requests;
	/* Sync request list, initialized empty in file_alloc(). */
	TAILQ_HEAD(sync_requests_head, spdk_fs_request) sync_requests;
	/* NOTE(review): presumably the linkage in g_caches -- confirm. */
	TAILQ_ENTRY(spdk_file)	cache_tailq;
};
94 
/*
 * Record of a blob that carries the "is_deleted" xattr at load time.
 * iter_cb() queues these; _handle_deleted_files() deletes the blobs one by one.
 */
struct spdk_deleted_file {
	/* Id of the blob to delete. */
	spdk_blob_id	id;
	/* Linkage in spdk_fs_cb_args.op.fs_load.deleted_files. */
	TAILQ_ENTRY(spdk_deleted_file)	tailq;
};
99 
/*
 * One blobfs instance layered on a blobstore.  The three *_target members
 * double as io_device handles: their addresses are registered with
 * spdk_io_device_register() and mapped back to the filesystem with
 * SPDK_CONTAINEROF() in the channel-create callbacks.
 */
struct spdk_filesystem {
	/* Underlying blobstore, set by common_fs_bs_init(). */
	struct spdk_blob_store	*bs;
	/* All files known to this filesystem. */
	TAILQ_HEAD(, spdk_file)	files;
	/* Only cluster_sz is filled in within this chunk (from spdk_bs_get_cluster_size()). */
	struct spdk_bs_opts	bs_opts;
	/* Block device the blobstore lives on. */
	struct spdk_bs_dev	*bdev;
	/* Caller-supplied function used by the blocking API to dispatch work. */
	fs_send_request_fn	send_request;

	struct {
		/* Number of requests preallocated per channel. */
		uint32_t		max_ops;
		struct spdk_io_channel	*sync_io_channel;
		struct spdk_fs_channel	*sync_fs_channel;
	} sync_target;

	struct {
		uint32_t		max_ops;
		struct spdk_io_channel	*md_io_channel;
		/* Channel whose request pool serves the asynchronous metadata operations. */
		struct spdk_fs_channel	*md_fs_channel;
	} md_target;

	struct {
		uint32_t		max_ops;
	} io_target;
};
123 
/*
 * Per-request callback context.  `fn` holds the completion callback in the
 * flavor matching the operation and `op` holds the operation-specific state;
 * only one member of each union is meaningful for a given request.
 */
struct spdk_fs_cb_args {
	union {
		spdk_fs_op_with_handle_complete		fs_op_with_handle;
		spdk_fs_op_complete			fs_op;
		spdk_file_op_with_handle_complete	file_op_with_handle;
		spdk_file_op_complete			file_op;
		spdk_file_stat_op_complete		stat_op;
	} fn;
	/* Opaque argument handed back to `fn`. */
	void *arg;
	/* Semaphore posted to wake a caller blocked in the synchronous API. */
	sem_t *sem;
	struct spdk_filesystem *fs;
	struct spdk_file *file;
	/* Result code carried back to the blocked caller or to a later callback stage. */
	int rc;
	/* Set to true by alloc_fs_request(). */
	bool from_request;
	union {
		struct {
			/* Blobs found marked deleted during load; drained by _handle_deleted_files(). */
			TAILQ_HEAD(, spdk_deleted_file)	deleted_files;
		} fs_load;
		struct {
			uint64_t	length;
		} truncate;
		struct {
			struct spdk_io_channel	*channel;
			void		*user_buf;
			void		*pin_buf;
			int		is_read;
			off_t		offset;
			size_t		length;
			uint64_t	start_page;
			uint64_t	num_pages;
			uint32_t	blocklen;
		} rw;
		struct {
			const char	*old_name;
			const char	*new_name;
		} rename;
		struct {
			struct cache_buffer	*cache_buffer;
			uint64_t		length;
		} flush;
		struct {
			struct cache_buffer	*cache_buffer;
			uint64_t		length;
			uint64_t		offset;
		} readahead;
		struct {
			uint64_t			offset;
			TAILQ_ENTRY(spdk_fs_request)	tailq;
			bool				xattr_in_progress;
		} sync;
		struct {
			uint32_t			num_clusters;
		} resize;
		struct {
			/* Caller-owned name; not copied. */
			const char	*name;
			uint32_t	flags;
			/* Linkage in spdk_file.open_requests while the blob open is pending. */
			TAILQ_ENTRY(spdk_fs_request)	tailq;
		} open;
		struct {
			const char		*name;
			/* Blob handle kept open between the open and close steps of file creation. */
			struct spdk_blob	*blob;
		} create;
		struct {
			const char	*name;
		} delete;
		struct {
			const char	*name;
		} stat;
	} op;
};
194 
195 static void cache_free_buffers(struct spdk_file *file);
196 
197 void
198 spdk_fs_opts_init(struct spdk_blobfs_opts *opts)
199 {
200 	opts->cluster_sz = SPDK_BLOBFS_DEFAULT_OPTS_CLUSTER_SZ;
201 }
202 
203 static void
204 __initialize_cache(void)
205 {
206 	assert(g_cache_pool == NULL);
207 
208 	g_cache_pool = spdk_mempool_create("spdk_fs_cache",
209 					   g_fs_cache_size / CACHE_BUFFER_SIZE,
210 					   CACHE_BUFFER_SIZE,
211 					   SPDK_MEMPOOL_DEFAULT_CACHE_SIZE,
212 					   SPDK_ENV_SOCKET_ID_ANY);
213 	if (!g_cache_pool) {
214 		SPDK_ERRLOG("Create mempool failed, you may "
215 			    "increase the memory and try again\n");
216 		assert(false);
217 	}
218 	TAILQ_INIT(&g_caches);
219 	pthread_spin_init(&g_caches_lock, 0);
220 }
221 
222 static void
223 __free_cache(void)
224 {
225 	assert(g_cache_pool != NULL);
226 
227 	spdk_mempool_free(g_cache_pool);
228 	g_cache_pool = NULL;
229 }
230 
231 static uint64_t
232 __file_get_blob_size(struct spdk_file *file)
233 {
234 	uint64_t cluster_sz;
235 
236 	cluster_sz = file->fs->bs_opts.cluster_sz;
237 	return cluster_sz * spdk_blob_get_num_clusters(file->blob);
238 }
239 
/*
 * A pooled request object.  Requests are carved out of a channel's req_mem
 * array and recycled through alloc_fs_request()/free_fs_request().
 *
 * `args` must remain the first member: several callbacks in this file are
 * handed `&req->args` as their context and cast it back to the request.
 */
struct spdk_fs_request {
	struct spdk_fs_cb_args		args;
	/* Linkage in the owning channel's free list. */
	TAILQ_ENTRY(spdk_fs_request)	link;
	/* Channel this request was allocated from and is returned to. */
	struct spdk_fs_channel		*channel;
};
245 
/* Per-io_channel context: a preallocated request pool plus dispatch state. */
struct spdk_fs_channel {
	/* Backing array for every request of this channel; freed on channel destroy. */
	struct spdk_fs_request		*req_mem;
	/* Free list of requests. */
	TAILQ_HEAD(, spdk_fs_request)	reqs;
	/* Semaphore the blocking API waits on until the operation completes. */
	sem_t				sem;
	struct spdk_filesystem		*fs;
	/* Blobstore I/O channel; released in _spdk_fs_channel_destroy() when non-NULL. */
	struct spdk_io_channel		*bs_channel;
	fs_send_request_fn		send_request;
	/* When true, `lock` is taken around accesses to `reqs`. */
	bool				sync;
	pthread_spinlock_t		lock;
};
256 
257 static struct spdk_fs_request *
258 alloc_fs_request(struct spdk_fs_channel *channel)
259 {
260 	struct spdk_fs_request *req;
261 
262 	if (channel->sync) {
263 		pthread_spin_lock(&channel->lock);
264 	}
265 
266 	req = TAILQ_FIRST(&channel->reqs);
267 	if (req) {
268 		TAILQ_REMOVE(&channel->reqs, req, link);
269 	}
270 
271 	if (channel->sync) {
272 		pthread_spin_unlock(&channel->lock);
273 	}
274 
275 	if (req == NULL) {
276 		return NULL;
277 	}
278 	memset(req, 0, sizeof(*req));
279 	req->channel = channel;
280 	req->args.from_request = true;
281 
282 	return req;
283 }
284 
285 static void
286 free_fs_request(struct spdk_fs_request *req)
287 {
288 	struct spdk_fs_channel *channel = req->channel;
289 
290 	if (channel->sync) {
291 		pthread_spin_lock(&channel->lock);
292 	}
293 
294 	TAILQ_INSERT_HEAD(&req->channel->reqs, req, link);
295 
296 	if (channel->sync) {
297 		pthread_spin_unlock(&channel->lock);
298 	}
299 }
300 
301 static int
302 _spdk_fs_channel_create(struct spdk_filesystem *fs, struct spdk_fs_channel *channel,
303 			uint32_t max_ops)
304 {
305 	uint32_t i;
306 
307 	channel->req_mem = calloc(max_ops, sizeof(struct spdk_fs_request));
308 	if (!channel->req_mem) {
309 		return -1;
310 	}
311 
312 	TAILQ_INIT(&channel->reqs);
313 	sem_init(&channel->sem, 0, 0);
314 
315 	for (i = 0; i < max_ops; i++) {
316 		TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link);
317 	}
318 
319 	channel->fs = fs;
320 
321 	return 0;
322 }
323 
324 static int
325 _spdk_fs_md_channel_create(void *io_device, void *ctx_buf)
326 {
327 	struct spdk_filesystem		*fs;
328 	struct spdk_fs_channel		*channel = ctx_buf;
329 
330 	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, md_target);
331 
332 	return _spdk_fs_channel_create(fs, channel, fs->md_target.max_ops);
333 }
334 
335 static int
336 _spdk_fs_sync_channel_create(void *io_device, void *ctx_buf)
337 {
338 	struct spdk_filesystem		*fs;
339 	struct spdk_fs_channel		*channel = ctx_buf;
340 
341 	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, sync_target);
342 
343 	return _spdk_fs_channel_create(fs, channel, fs->sync_target.max_ops);
344 }
345 
346 static int
347 _spdk_fs_io_channel_create(void *io_device, void *ctx_buf)
348 {
349 	struct spdk_filesystem		*fs;
350 	struct spdk_fs_channel		*channel = ctx_buf;
351 
352 	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, io_target);
353 
354 	return _spdk_fs_channel_create(fs, channel, fs->io_target.max_ops);
355 }
356 
357 static void
358 _spdk_fs_channel_destroy(void *io_device, void *ctx_buf)
359 {
360 	struct spdk_fs_channel *channel = ctx_buf;
361 
362 	free(channel->req_mem);
363 	if (channel->bs_channel != NULL) {
364 		spdk_bs_free_io_channel(channel->bs_channel);
365 	}
366 }
367 
368 static void
369 __send_request_direct(fs_request_fn fn, void *arg)
370 {
371 	fn(arg);
372 }
373 
374 static void
375 common_fs_bs_init(struct spdk_filesystem *fs, struct spdk_blob_store *bs)
376 {
377 	fs->bs = bs;
378 	fs->bs_opts.cluster_sz = spdk_bs_get_cluster_size(bs);
379 	fs->md_target.md_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
380 	fs->md_target.md_fs_channel->send_request = __send_request_direct;
381 	fs->sync_target.sync_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
382 	fs->sync_target.sync_fs_channel->send_request = __send_request_direct;
383 
384 	pthread_mutex_lock(&g_cache_init_lock);
385 	if (g_fs_count == 0) {
386 		__initialize_cache();
387 	}
388 	g_fs_count++;
389 	pthread_mutex_unlock(&g_cache_init_lock);
390 }
391 
392 static void
393 init_cb(void *ctx, struct spdk_blob_store *bs, int bserrno)
394 {
395 	struct spdk_fs_request *req = ctx;
396 	struct spdk_fs_cb_args *args = &req->args;
397 	struct spdk_filesystem *fs = args->fs;
398 
399 	if (bserrno == 0) {
400 		common_fs_bs_init(fs, bs);
401 	} else {
402 		free(fs);
403 		fs = NULL;
404 	}
405 
406 	args->fn.fs_op_with_handle(args->arg, fs, bserrno);
407 	free_fs_request(req);
408 }
409 
410 static void
411 fs_conf_parse(void)
412 {
413 	struct spdk_conf_section *sp;
414 
415 	sp = spdk_conf_find_section(NULL, "Blobfs");
416 	if (sp == NULL) {
417 		g_fs_cache_buffer_shift = CACHE_BUFFER_SHIFT_DEFAULT;
418 		return;
419 	}
420 
421 	g_fs_cache_buffer_shift = spdk_conf_section_get_intval(sp, "CacheBufferShift");
422 	if (g_fs_cache_buffer_shift <= 0) {
423 		g_fs_cache_buffer_shift = CACHE_BUFFER_SHIFT_DEFAULT;
424 	}
425 }
426 
427 static struct spdk_filesystem *
428 fs_alloc(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn)
429 {
430 	struct spdk_filesystem *fs;
431 
432 	fs = calloc(1, sizeof(*fs));
433 	if (fs == NULL) {
434 		return NULL;
435 	}
436 
437 	fs->bdev = dev;
438 	fs->send_request = send_request_fn;
439 	TAILQ_INIT(&fs->files);
440 
441 	fs->md_target.max_ops = 512;
442 	spdk_io_device_register(&fs->md_target, _spdk_fs_md_channel_create, _spdk_fs_channel_destroy,
443 				sizeof(struct spdk_fs_channel));
444 	fs->md_target.md_io_channel = spdk_get_io_channel(&fs->md_target);
445 	fs->md_target.md_fs_channel = spdk_io_channel_get_ctx(fs->md_target.md_io_channel);
446 
447 	fs->sync_target.max_ops = 512;
448 	spdk_io_device_register(&fs->sync_target, _spdk_fs_sync_channel_create, _spdk_fs_channel_destroy,
449 				sizeof(struct spdk_fs_channel));
450 	fs->sync_target.sync_io_channel = spdk_get_io_channel(&fs->sync_target);
451 	fs->sync_target.sync_fs_channel = spdk_io_channel_get_ctx(fs->sync_target.sync_io_channel);
452 
453 	fs->io_target.max_ops = 512;
454 	spdk_io_device_register(&fs->io_target, _spdk_fs_io_channel_create, _spdk_fs_channel_destroy,
455 				sizeof(struct spdk_fs_channel));
456 
457 	return fs;
458 }
459 
460 static void
461 __wake_caller(void *arg, int fserrno)
462 {
463 	struct spdk_fs_cb_args *args = arg;
464 
465 	args->rc = fserrno;
466 	sem_post(args->sem);
467 }
468 
469 void
470 spdk_fs_init(struct spdk_bs_dev *dev, struct spdk_blobfs_opts *opt,
471 	     fs_send_request_fn send_request_fn,
472 	     spdk_fs_op_with_handle_complete cb_fn, void *cb_arg)
473 {
474 	struct spdk_filesystem *fs;
475 	struct spdk_fs_request *req;
476 	struct spdk_fs_cb_args *args;
477 	struct spdk_bs_opts opts = {};
478 
479 	fs = fs_alloc(dev, send_request_fn);
480 	if (fs == NULL) {
481 		cb_fn(cb_arg, NULL, -ENOMEM);
482 		return;
483 	}
484 
485 	fs_conf_parse();
486 
487 	req = alloc_fs_request(fs->md_target.md_fs_channel);
488 	if (req == NULL) {
489 		spdk_put_io_channel(fs->md_target.md_io_channel);
490 		spdk_io_device_unregister(&fs->md_target, NULL);
491 		spdk_put_io_channel(fs->sync_target.sync_io_channel);
492 		spdk_io_device_unregister(&fs->sync_target, NULL);
493 		spdk_io_device_unregister(&fs->io_target, NULL);
494 		free(fs);
495 		cb_fn(cb_arg, NULL, -ENOMEM);
496 		return;
497 	}
498 
499 	args = &req->args;
500 	args->fn.fs_op_with_handle = cb_fn;
501 	args->arg = cb_arg;
502 	args->fs = fs;
503 
504 	spdk_bs_opts_init(&opts);
505 	snprintf(opts.bstype.bstype, sizeof(opts.bstype.bstype), "BLOBFS");
506 	if (opt) {
507 		opts.cluster_sz = opt->cluster_sz;
508 	}
509 	spdk_bs_init(dev, &opts, init_cb, req);
510 }
511 
512 static struct spdk_file *
513 file_alloc(struct spdk_filesystem *fs)
514 {
515 	struct spdk_file *file;
516 
517 	file = calloc(1, sizeof(*file));
518 	if (file == NULL) {
519 		return NULL;
520 	}
521 
522 	file->tree = calloc(1, sizeof(*file->tree));
523 	if (file->tree == NULL) {
524 		free(file);
525 		return NULL;
526 	}
527 
528 	file->fs = fs;
529 	TAILQ_INIT(&file->open_requests);
530 	TAILQ_INIT(&file->sync_requests);
531 	pthread_spin_init(&file->lock, 0);
532 	TAILQ_INSERT_TAIL(&fs->files, file, tailq);
533 	file->priority = SPDK_FILE_PRIORITY_LOW;
534 	return file;
535 }
536 
537 static void fs_load_done(void *ctx, int bserrno);
538 
539 static int
540 _handle_deleted_files(struct spdk_fs_request *req)
541 {
542 	struct spdk_fs_cb_args *args = &req->args;
543 	struct spdk_filesystem *fs = args->fs;
544 
545 	if (!TAILQ_EMPTY(&args->op.fs_load.deleted_files)) {
546 		struct spdk_deleted_file *deleted_file;
547 
548 		deleted_file = TAILQ_FIRST(&args->op.fs_load.deleted_files);
549 		TAILQ_REMOVE(&args->op.fs_load.deleted_files, deleted_file, tailq);
550 		spdk_bs_delete_blob(fs->bs, deleted_file->id, fs_load_done, req);
551 		free(deleted_file);
552 		return 0;
553 	}
554 
555 	return 1;
556 }
557 
558 static void
559 fs_load_done(void *ctx, int bserrno)
560 {
561 	struct spdk_fs_request *req = ctx;
562 	struct spdk_fs_cb_args *args = &req->args;
563 	struct spdk_filesystem *fs = args->fs;
564 
565 	/* The filesystem has been loaded.  Now check if there are any files that
566 	 *  were marked for deletion before last unload.  Do not complete the
567 	 *  fs_load callback until all of them have been deleted on disk.
568 	 */
569 	if (_handle_deleted_files(req) == 0) {
570 		/* We found a file that's been marked for deleting but not actually
571 		 *  deleted yet.  This function will get called again once the delete
572 		 *  operation is completed.
573 		 */
574 		return;
575 	}
576 
577 	args->fn.fs_op_with_handle(args->arg, fs, 0);
578 	free_fs_request(req);
579 
580 }
581 
582 static void
583 iter_cb(void *ctx, struct spdk_blob *blob, int rc)
584 {
585 	struct spdk_fs_request *req = ctx;
586 	struct spdk_fs_cb_args *args = &req->args;
587 	struct spdk_filesystem *fs = args->fs;
588 	uint64_t *length;
589 	const char *name;
590 	uint32_t *is_deleted;
591 	size_t value_len;
592 
593 	if (rc < 0) {
594 		args->fn.fs_op_with_handle(args->arg, fs, rc);
595 		free_fs_request(req);
596 		return;
597 	}
598 
599 	rc = spdk_blob_get_xattr_value(blob, "name", (const void **)&name, &value_len);
600 	if (rc < 0) {
601 		args->fn.fs_op_with_handle(args->arg, fs, rc);
602 		free_fs_request(req);
603 		return;
604 	}
605 
606 	rc = spdk_blob_get_xattr_value(blob, "length", (const void **)&length, &value_len);
607 	if (rc < 0) {
608 		args->fn.fs_op_with_handle(args->arg, fs, rc);
609 		free_fs_request(req);
610 		return;
611 	}
612 
613 	assert(value_len == 8);
614 
615 	/* This file could be deleted last time without close it, then app crashed, so we delete it now */
616 	rc = spdk_blob_get_xattr_value(blob, "is_deleted", (const void **)&is_deleted, &value_len);
617 	if (rc < 0) {
618 		struct spdk_file *f;
619 
620 		f = file_alloc(fs);
621 		if (f == NULL) {
622 			args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM);
623 			free_fs_request(req);
624 			return;
625 		}
626 
627 		f->name = strdup(name);
628 		f->blobid = spdk_blob_get_id(blob);
629 		f->length = *length;
630 		f->length_flushed = *length;
631 		f->append_pos = *length;
632 		SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "added file %s length=%ju\n", f->name, f->length);
633 	} else {
634 		struct spdk_deleted_file *deleted_file;
635 
636 		deleted_file = calloc(1, sizeof(*deleted_file));
637 		if (deleted_file == NULL) {
638 			args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM);
639 			free_fs_request(req);
640 			return;
641 		}
642 		deleted_file->id = spdk_blob_get_id(blob);
643 		TAILQ_INSERT_TAIL(&args->op.fs_load.deleted_files, deleted_file, tailq);
644 	}
645 }
646 
647 static void
648 load_cb(void *ctx, struct spdk_blob_store *bs, int bserrno)
649 {
650 	struct spdk_fs_request *req = ctx;
651 	struct spdk_fs_cb_args *args = &req->args;
652 	struct spdk_filesystem *fs = args->fs;
653 	struct spdk_bs_type bstype;
654 	static const struct spdk_bs_type blobfs_type = {"BLOBFS"};
655 	static const struct spdk_bs_type zeros;
656 
657 	if (bserrno != 0) {
658 		args->fn.fs_op_with_handle(args->arg, NULL, bserrno);
659 		free_fs_request(req);
660 		free(fs);
661 		return;
662 	}
663 
664 	bstype = spdk_bs_get_bstype(bs);
665 
666 	if (!memcmp(&bstype, &zeros, sizeof(bstype))) {
667 		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "assigning bstype\n");
668 		spdk_bs_set_bstype(bs, blobfs_type);
669 	} else if (memcmp(&bstype, &blobfs_type, sizeof(bstype))) {
670 		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "not blobfs\n");
671 		SPDK_TRACEDUMP(SPDK_LOG_BLOB, "bstype", &bstype, sizeof(bstype));
672 		args->fn.fs_op_with_handle(args->arg, NULL, bserrno);
673 		free_fs_request(req);
674 		free(fs);
675 		return;
676 	}
677 
678 	common_fs_bs_init(fs, bs);
679 	fs_load_done(req, 0);
680 }
681 
682 static void
683 spdk_fs_io_device_unregister(struct spdk_filesystem *fs)
684 {
685 	assert(fs != NULL);
686 	spdk_io_device_unregister(&fs->md_target, NULL);
687 	spdk_io_device_unregister(&fs->sync_target, NULL);
688 	spdk_io_device_unregister(&fs->io_target, NULL);
689 	free(fs);
690 }
691 
692 static void
693 spdk_fs_free_io_channels(struct spdk_filesystem *fs)
694 {
695 	assert(fs != NULL);
696 	spdk_fs_free_io_channel(fs->md_target.md_io_channel);
697 	spdk_fs_free_io_channel(fs->sync_target.sync_io_channel);
698 }
699 
700 void
701 spdk_fs_load(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn,
702 	     spdk_fs_op_with_handle_complete cb_fn, void *cb_arg)
703 {
704 	struct spdk_filesystem *fs;
705 	struct spdk_fs_cb_args *args;
706 	struct spdk_fs_request *req;
707 	struct spdk_bs_opts	bs_opts;
708 
709 	fs = fs_alloc(dev, send_request_fn);
710 	if (fs == NULL) {
711 		cb_fn(cb_arg, NULL, -ENOMEM);
712 		return;
713 	}
714 
715 	fs_conf_parse();
716 
717 	req = alloc_fs_request(fs->md_target.md_fs_channel);
718 	if (req == NULL) {
719 		spdk_fs_free_io_channels(fs);
720 		spdk_fs_io_device_unregister(fs);
721 		cb_fn(cb_arg, NULL, -ENOMEM);
722 		return;
723 	}
724 
725 	args = &req->args;
726 	args->fn.fs_op_with_handle = cb_fn;
727 	args->arg = cb_arg;
728 	args->fs = fs;
729 	TAILQ_INIT(&args->op.fs_load.deleted_files);
730 	spdk_bs_opts_init(&bs_opts);
731 	bs_opts.iter_cb_fn = iter_cb;
732 	bs_opts.iter_cb_arg = req;
733 	spdk_bs_load(dev, &bs_opts, load_cb, req);
734 }
735 
736 static void
737 unload_cb(void *ctx, int bserrno)
738 {
739 	struct spdk_fs_request *req = ctx;
740 	struct spdk_fs_cb_args *args = &req->args;
741 	struct spdk_filesystem *fs = args->fs;
742 	struct spdk_file *file, *tmp;
743 
744 	TAILQ_FOREACH_SAFE(file, &fs->files, tailq, tmp) {
745 		TAILQ_REMOVE(&fs->files, file, tailq);
746 		cache_free_buffers(file);
747 		free(file->name);
748 		free(file->tree);
749 		free(file);
750 	}
751 
752 	pthread_mutex_lock(&g_cache_init_lock);
753 	g_fs_count--;
754 	if (g_fs_count == 0) {
755 		__free_cache();
756 	}
757 	pthread_mutex_unlock(&g_cache_init_lock);
758 
759 	args->fn.fs_op(args->arg, bserrno);
760 	free(req);
761 
762 	spdk_fs_io_device_unregister(fs);
763 }
764 
765 void
766 spdk_fs_unload(struct spdk_filesystem *fs, spdk_fs_op_complete cb_fn, void *cb_arg)
767 {
768 	struct spdk_fs_request *req;
769 	struct spdk_fs_cb_args *args;
770 
771 	/*
772 	 * We must free the md_channel before unloading the blobstore, so just
773 	 *  allocate this request from the general heap.
774 	 */
775 	req = calloc(1, sizeof(*req));
776 	if (req == NULL) {
777 		cb_fn(cb_arg, -ENOMEM);
778 		return;
779 	}
780 
781 	args = &req->args;
782 	args->fn.fs_op = cb_fn;
783 	args->arg = cb_arg;
784 	args->fs = fs;
785 
786 	spdk_fs_free_io_channels(fs);
787 	spdk_bs_unload(fs->bs, unload_cb, req);
788 }
789 
790 static struct spdk_file *
791 fs_find_file(struct spdk_filesystem *fs, const char *name)
792 {
793 	struct spdk_file *file;
794 
795 	TAILQ_FOREACH(file, &fs->files, tailq) {
796 		if (!strncmp(name, file->name, SPDK_FILE_NAME_MAX)) {
797 			return file;
798 		}
799 	}
800 
801 	return NULL;
802 }
803 
804 void
805 spdk_fs_file_stat_async(struct spdk_filesystem *fs, const char *name,
806 			spdk_file_stat_op_complete cb_fn, void *cb_arg)
807 {
808 	struct spdk_file_stat stat;
809 	struct spdk_file *f = NULL;
810 
811 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
812 		cb_fn(cb_arg, NULL, -ENAMETOOLONG);
813 		return;
814 	}
815 
816 	f = fs_find_file(fs, name);
817 	if (f != NULL) {
818 		stat.blobid = f->blobid;
819 		stat.size = f->append_pos >= f->length ? f->append_pos : f->length;
820 		cb_fn(cb_arg, &stat, 0);
821 		return;
822 	}
823 
824 	cb_fn(cb_arg, NULL, -ENOENT);
825 }
826 
827 static void
828 __copy_stat(void *arg, struct spdk_file_stat *stat, int fserrno)
829 {
830 	struct spdk_fs_request *req = arg;
831 	struct spdk_fs_cb_args *args = &req->args;
832 
833 	args->rc = fserrno;
834 	if (fserrno == 0) {
835 		memcpy(args->arg, stat, sizeof(*stat));
836 	}
837 	sem_post(args->sem);
838 }
839 
840 static void
841 __file_stat(void *arg)
842 {
843 	struct spdk_fs_request *req = arg;
844 	struct spdk_fs_cb_args *args = &req->args;
845 
846 	spdk_fs_file_stat_async(args->fs, args->op.stat.name,
847 				args->fn.stat_op, req);
848 }
849 
850 int
851 spdk_fs_file_stat(struct spdk_filesystem *fs, struct spdk_io_channel *_channel,
852 		  const char *name, struct spdk_file_stat *stat)
853 {
854 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
855 	struct spdk_fs_request *req;
856 	int rc;
857 
858 	req = alloc_fs_request(channel);
859 	if (req == NULL) {
860 		return -ENOMEM;
861 	}
862 
863 	req->args.fs = fs;
864 	req->args.op.stat.name = name;
865 	req->args.fn.stat_op = __copy_stat;
866 	req->args.arg = stat;
867 	req->args.sem = &channel->sem;
868 	channel->send_request(__file_stat, req);
869 	sem_wait(&channel->sem);
870 
871 	rc = req->args.rc;
872 	free_fs_request(req);
873 
874 	return rc;
875 }
876 
877 static void
878 fs_create_blob_close_cb(void *ctx, int bserrno)
879 {
880 	int rc;
881 	struct spdk_fs_request *req = ctx;
882 	struct spdk_fs_cb_args *args = &req->args;
883 
884 	rc = args->rc ? args->rc : bserrno;
885 	args->fn.file_op(args->arg, rc);
886 	free_fs_request(req);
887 }
888 
889 static void
890 fs_create_blob_resize_cb(void *ctx, int bserrno)
891 {
892 	struct spdk_fs_request *req = ctx;
893 	struct spdk_fs_cb_args *args = &req->args;
894 	struct spdk_file *f = args->file;
895 	struct spdk_blob *blob = args->op.create.blob;
896 	uint64_t length = 0;
897 
898 	args->rc = bserrno;
899 	if (bserrno) {
900 		spdk_blob_close(blob, fs_create_blob_close_cb, args);
901 		return;
902 	}
903 
904 	spdk_blob_set_xattr(blob, "name", f->name, strlen(f->name) + 1);
905 	spdk_blob_set_xattr(blob, "length", &length, sizeof(length));
906 
907 	spdk_blob_close(blob, fs_create_blob_close_cb, args);
908 }
909 
910 static void
911 fs_create_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno)
912 {
913 	struct spdk_fs_request *req = ctx;
914 	struct spdk_fs_cb_args *args = &req->args;
915 
916 	if (bserrno) {
917 		args->fn.file_op(args->arg, bserrno);
918 		free_fs_request(req);
919 		return;
920 	}
921 
922 	args->op.create.blob = blob;
923 	spdk_blob_resize(blob, 1, fs_create_blob_resize_cb, req);
924 }
925 
926 static void
927 fs_create_blob_create_cb(void *ctx, spdk_blob_id blobid, int bserrno)
928 {
929 	struct spdk_fs_request *req = ctx;
930 	struct spdk_fs_cb_args *args = &req->args;
931 	struct spdk_file *f = args->file;
932 
933 	if (bserrno) {
934 		args->fn.file_op(args->arg, bserrno);
935 		free_fs_request(req);
936 		return;
937 	}
938 
939 	f->blobid = blobid;
940 	spdk_bs_open_blob(f->fs->bs, blobid, fs_create_blob_open_cb, req);
941 }
942 
943 void
944 spdk_fs_create_file_async(struct spdk_filesystem *fs, const char *name,
945 			  spdk_file_op_complete cb_fn, void *cb_arg)
946 {
947 	struct spdk_file *file;
948 	struct spdk_fs_request *req;
949 	struct spdk_fs_cb_args *args;
950 
951 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
952 		cb_fn(cb_arg, -ENAMETOOLONG);
953 		return;
954 	}
955 
956 	file = fs_find_file(fs, name);
957 	if (file != NULL) {
958 		cb_fn(cb_arg, -EEXIST);
959 		return;
960 	}
961 
962 	file = file_alloc(fs);
963 	if (file == NULL) {
964 		cb_fn(cb_arg, -ENOMEM);
965 		return;
966 	}
967 
968 	req = alloc_fs_request(fs->md_target.md_fs_channel);
969 	if (req == NULL) {
970 		cb_fn(cb_arg, -ENOMEM);
971 		return;
972 	}
973 
974 	args = &req->args;
975 	args->file = file;
976 	args->fn.file_op = cb_fn;
977 	args->arg = cb_arg;
978 
979 	file->name = strdup(name);
980 	spdk_bs_create_blob(fs->bs, fs_create_blob_create_cb, args);
981 }
982 
983 static void
984 __fs_create_file_done(void *arg, int fserrno)
985 {
986 	struct spdk_fs_request *req = arg;
987 	struct spdk_fs_cb_args *args = &req->args;
988 
989 	args->rc = fserrno;
990 	sem_post(args->sem);
991 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.create.name);
992 }
993 
994 static void
995 __fs_create_file(void *arg)
996 {
997 	struct spdk_fs_request *req = arg;
998 	struct spdk_fs_cb_args *args = &req->args;
999 
1000 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.create.name);
1001 	spdk_fs_create_file_async(args->fs, args->op.create.name, __fs_create_file_done, req);
1002 }
1003 
1004 int
1005 spdk_fs_create_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, const char *name)
1006 {
1007 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
1008 	struct spdk_fs_request *req;
1009 	struct spdk_fs_cb_args *args;
1010 	int rc;
1011 
1012 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name);
1013 
1014 	req = alloc_fs_request(channel);
1015 	if (req == NULL) {
1016 		return -ENOMEM;
1017 	}
1018 
1019 	args = &req->args;
1020 	args->fs = fs;
1021 	args->op.create.name = name;
1022 	args->sem = &channel->sem;
1023 	fs->send_request(__fs_create_file, req);
1024 	sem_wait(&channel->sem);
1025 	rc = args->rc;
1026 	free_fs_request(req);
1027 
1028 	return rc;
1029 }
1030 
1031 static void
1032 fs_open_blob_done(void *ctx, struct spdk_blob *blob, int bserrno)
1033 {
1034 	struct spdk_fs_request *req = ctx;
1035 	struct spdk_fs_cb_args *args = &req->args;
1036 	struct spdk_file *f = args->file;
1037 
1038 	f->blob = blob;
1039 	while (!TAILQ_EMPTY(&f->open_requests)) {
1040 		req = TAILQ_FIRST(&f->open_requests);
1041 		args = &req->args;
1042 		TAILQ_REMOVE(&f->open_requests, req, args.op.open.tailq);
1043 		args->fn.file_op_with_handle(args->arg, f, bserrno);
1044 		free_fs_request(req);
1045 	}
1046 }
1047 
1048 static void
1049 fs_open_blob_create_cb(void *ctx, int bserrno)
1050 {
1051 	struct spdk_fs_request *req = ctx;
1052 	struct spdk_fs_cb_args *args = &req->args;
1053 	struct spdk_file *file = args->file;
1054 	struct spdk_filesystem *fs = args->fs;
1055 
1056 	if (file == NULL) {
1057 		/*
1058 		 * This is from an open with CREATE flag - the file
1059 		 *  is now created so look it up in the file list for this
1060 		 *  filesystem.
1061 		 */
1062 		file = fs_find_file(fs, args->op.open.name);
1063 		assert(file != NULL);
1064 		args->file = file;
1065 	}
1066 
1067 	file->ref_count++;
1068 	TAILQ_INSERT_TAIL(&file->open_requests, req, args.op.open.tailq);
1069 	if (file->ref_count == 1) {
1070 		assert(file->blob == NULL);
1071 		spdk_bs_open_blob(fs->bs, file->blobid, fs_open_blob_done, req);
1072 	} else if (file->blob != NULL) {
1073 		fs_open_blob_done(req, file->blob, 0);
1074 	} else {
1075 		/*
1076 		 * The blob open for this file is in progress due to a previous
1077 		 *  open request.  When that open completes, it will invoke the
1078 		 *  open callback for this request.
1079 		 */
1080 	}
1081 }
1082 
1083 void
1084 spdk_fs_open_file_async(struct spdk_filesystem *fs, const char *name, uint32_t flags,
1085 			spdk_file_op_with_handle_complete cb_fn, void *cb_arg)
1086 {
1087 	struct spdk_file *f = NULL;
1088 	struct spdk_fs_request *req;
1089 	struct spdk_fs_cb_args *args;
1090 
1091 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
1092 		cb_fn(cb_arg, NULL, -ENAMETOOLONG);
1093 		return;
1094 	}
1095 
1096 	f = fs_find_file(fs, name);
1097 	if (f == NULL && !(flags & SPDK_BLOBFS_OPEN_CREATE)) {
1098 		cb_fn(cb_arg, NULL, -ENOENT);
1099 		return;
1100 	}
1101 
1102 	if (f != NULL && f->is_deleted == true) {
1103 		cb_fn(cb_arg, NULL, -ENOENT);
1104 		return;
1105 	}
1106 
1107 	req = alloc_fs_request(fs->md_target.md_fs_channel);
1108 	if (req == NULL) {
1109 		cb_fn(cb_arg, NULL, -ENOMEM);
1110 		return;
1111 	}
1112 
1113 	args = &req->args;
1114 	args->fn.file_op_with_handle = cb_fn;
1115 	args->arg = cb_arg;
1116 	args->file = f;
1117 	args->fs = fs;
1118 	args->op.open.name = name;
1119 
1120 	if (f == NULL) {
1121 		spdk_fs_create_file_async(fs, name, fs_open_blob_create_cb, req);
1122 	} else {
1123 		fs_open_blob_create_cb(req, 0);
1124 	}
1125 }
1126 
1127 static void
1128 __fs_open_file_done(void *arg, struct spdk_file *file, int bserrno)
1129 {
1130 	struct spdk_fs_request *req = arg;
1131 	struct spdk_fs_cb_args *args = &req->args;
1132 
1133 	args->file = file;
1134 	__wake_caller(args, bserrno);
1135 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.open.name);
1136 }
1137 
1138 static void
1139 __fs_open_file(void *arg)
1140 {
1141 	struct spdk_fs_request *req = arg;
1142 	struct spdk_fs_cb_args *args = &req->args;
1143 
1144 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.open.name);
1145 	spdk_fs_open_file_async(args->fs, args->op.open.name, args->op.open.flags,
1146 				__fs_open_file_done, req);
1147 }
1148 
1149 int
1150 spdk_fs_open_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel,
1151 		  const char *name, uint32_t flags, struct spdk_file **file)
1152 {
1153 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
1154 	struct spdk_fs_request *req;
1155 	struct spdk_fs_cb_args *args;
1156 	int rc;
1157 
1158 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name);
1159 
1160 	req = alloc_fs_request(channel);
1161 	if (req == NULL) {
1162 		return -ENOMEM;
1163 	}
1164 
1165 	args = &req->args;
1166 	args->fs = fs;
1167 	args->op.open.name = name;
1168 	args->op.open.flags = flags;
1169 	args->sem = &channel->sem;
1170 	fs->send_request(__fs_open_file, req);
1171 	sem_wait(&channel->sem);
1172 	rc = args->rc;
1173 	if (rc == 0) {
1174 		*file = args->file;
1175 	} else {
1176 		*file = NULL;
1177 	}
1178 	free_fs_request(req);
1179 
1180 	return rc;
1181 }
1182 
1183 static void
1184 fs_rename_blob_close_cb(void *ctx, int bserrno)
1185 {
1186 	struct spdk_fs_request *req = ctx;
1187 	struct spdk_fs_cb_args *args = &req->args;
1188 
1189 	args->fn.fs_op(args->arg, bserrno);
1190 	free_fs_request(req);
1191 }
1192 
1193 static void
1194 fs_rename_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno)
1195 {
1196 	struct spdk_fs_request *req = ctx;
1197 	struct spdk_fs_cb_args *args = &req->args;
1198 	const char *new_name = args->op.rename.new_name;
1199 
1200 	spdk_blob_set_xattr(blob, "name", new_name, strlen(new_name) + 1);
1201 	spdk_blob_close(blob, fs_rename_blob_close_cb, req);
1202 }
1203 
1204 static void
1205 __spdk_fs_md_rename_file(struct spdk_fs_request *req)
1206 {
1207 	struct spdk_fs_cb_args *args = &req->args;
1208 	struct spdk_file *f;
1209 
1210 	f = fs_find_file(args->fs, args->op.rename.old_name);
1211 	if (f == NULL) {
1212 		args->fn.fs_op(args->arg, -ENOENT);
1213 		free_fs_request(req);
1214 		return;
1215 	}
1216 
1217 	free(f->name);
1218 	f->name = strdup(args->op.rename.new_name);
1219 	args->file = f;
1220 	spdk_bs_open_blob(args->fs->bs, f->blobid, fs_rename_blob_open_cb, req);
1221 }
1222 
1223 static void
1224 fs_rename_delete_done(void *arg, int fserrno)
1225 {
1226 	__spdk_fs_md_rename_file(arg);
1227 }
1228 
1229 void
1230 spdk_fs_rename_file_async(struct spdk_filesystem *fs,
1231 			  const char *old_name, const char *new_name,
1232 			  spdk_file_op_complete cb_fn, void *cb_arg)
1233 {
1234 	struct spdk_file *f;
1235 	struct spdk_fs_request *req;
1236 	struct spdk_fs_cb_args *args;
1237 
1238 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "old=%s new=%s\n", old_name, new_name);
1239 	if (strnlen(new_name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
1240 		cb_fn(cb_arg, -ENAMETOOLONG);
1241 		return;
1242 	}
1243 
1244 	req = alloc_fs_request(fs->md_target.md_fs_channel);
1245 	if (req == NULL) {
1246 		cb_fn(cb_arg, -ENOMEM);
1247 		return;
1248 	}
1249 
1250 	args = &req->args;
1251 	args->fn.fs_op = cb_fn;
1252 	args->fs = fs;
1253 	args->arg = cb_arg;
1254 	args->op.rename.old_name = old_name;
1255 	args->op.rename.new_name = new_name;
1256 
1257 	f = fs_find_file(fs, new_name);
1258 	if (f == NULL) {
1259 		__spdk_fs_md_rename_file(req);
1260 		return;
1261 	}
1262 
1263 	/*
1264 	 * The rename overwrites an existing file.  So delete the existing file, then
1265 	 *  do the actual rename.
1266 	 */
1267 	spdk_fs_delete_file_async(fs, new_name, fs_rename_delete_done, req);
1268 }
1269 
1270 static void
1271 __fs_rename_file_done(void *arg, int fserrno)
1272 {
1273 	struct spdk_fs_request *req = arg;
1274 	struct spdk_fs_cb_args *args = &req->args;
1275 
1276 	__wake_caller(args, fserrno);
1277 }
1278 
1279 static void
1280 __fs_rename_file(void *arg)
1281 {
1282 	struct spdk_fs_request *req = arg;
1283 	struct spdk_fs_cb_args *args = &req->args;
1284 
1285 	spdk_fs_rename_file_async(args->fs, args->op.rename.old_name, args->op.rename.new_name,
1286 				  __fs_rename_file_done, req);
1287 }
1288 
1289 int
1290 spdk_fs_rename_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel,
1291 		    const char *old_name, const char *new_name)
1292 {
1293 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
1294 	struct spdk_fs_request *req;
1295 	struct spdk_fs_cb_args *args;
1296 	int rc;
1297 
1298 	req = alloc_fs_request(channel);
1299 	if (req == NULL) {
1300 		return -ENOMEM;
1301 	}
1302 
1303 	args = &req->args;
1304 
1305 	args->fs = fs;
1306 	args->op.rename.old_name = old_name;
1307 	args->op.rename.new_name = new_name;
1308 	args->sem = &channel->sem;
1309 	fs->send_request(__fs_rename_file, req);
1310 	sem_wait(&channel->sem);
1311 	rc = args->rc;
1312 	free_fs_request(req);
1313 	return rc;
1314 }
1315 
1316 static void
1317 blob_delete_cb(void *ctx, int bserrno)
1318 {
1319 	struct spdk_fs_request *req = ctx;
1320 	struct spdk_fs_cb_args *args = &req->args;
1321 
1322 	args->fn.file_op(args->arg, bserrno);
1323 	free_fs_request(req);
1324 }
1325 
1326 void
1327 spdk_fs_delete_file_async(struct spdk_filesystem *fs, const char *name,
1328 			  spdk_file_op_complete cb_fn, void *cb_arg)
1329 {
1330 	struct spdk_file *f;
1331 	spdk_blob_id blobid;
1332 	struct spdk_fs_request *req;
1333 	struct spdk_fs_cb_args *args;
1334 
1335 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name);
1336 
1337 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
1338 		cb_fn(cb_arg, -ENAMETOOLONG);
1339 		return;
1340 	}
1341 
1342 	f = fs_find_file(fs, name);
1343 	if (f == NULL) {
1344 		cb_fn(cb_arg, -ENOENT);
1345 		return;
1346 	}
1347 
1348 	req = alloc_fs_request(fs->md_target.md_fs_channel);
1349 	if (req == NULL) {
1350 		cb_fn(cb_arg, -ENOMEM);
1351 		return;
1352 	}
1353 
1354 	args = &req->args;
1355 	args->fn.file_op = cb_fn;
1356 	args->arg = cb_arg;
1357 
1358 	if (f->ref_count > 0) {
1359 		/* If the ref > 0, we mark the file as deleted and delete it when we close it. */
1360 		f->is_deleted = true;
1361 		spdk_blob_set_xattr(f->blob, "is_deleted", &f->is_deleted, sizeof(bool));
1362 		spdk_blob_sync_md(f->blob, blob_delete_cb, args);
1363 		return;
1364 	}
1365 
1366 	TAILQ_REMOVE(&fs->files, f, tailq);
1367 
1368 	cache_free_buffers(f);
1369 
1370 	blobid = f->blobid;
1371 
1372 	free(f->name);
1373 	free(f->tree);
1374 	free(f);
1375 
1376 	spdk_bs_delete_blob(fs->bs, blobid, blob_delete_cb, req);
1377 }
1378 
1379 static void
1380 __fs_delete_file_done(void *arg, int fserrno)
1381 {
1382 	struct spdk_fs_request *req = arg;
1383 	struct spdk_fs_cb_args *args = &req->args;
1384 
1385 	__wake_caller(args, fserrno);
1386 }
1387 
1388 static void
1389 __fs_delete_file(void *arg)
1390 {
1391 	struct spdk_fs_request *req = arg;
1392 	struct spdk_fs_cb_args *args = &req->args;
1393 
1394 	spdk_fs_delete_file_async(args->fs, args->op.delete.name, __fs_delete_file_done, req);
1395 }
1396 
1397 int
1398 spdk_fs_delete_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel,
1399 		    const char *name)
1400 {
1401 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
1402 	struct spdk_fs_request *req;
1403 	struct spdk_fs_cb_args *args;
1404 	int rc;
1405 
1406 	req = alloc_fs_request(channel);
1407 	if (req == NULL) {
1408 		return -ENOMEM;
1409 	}
1410 
1411 	args = &req->args;
1412 	args->fs = fs;
1413 	args->op.delete.name = name;
1414 	args->sem = &channel->sem;
1415 	fs->send_request(__fs_delete_file, req);
1416 	sem_wait(&channel->sem);
1417 	rc = args->rc;
1418 	free_fs_request(req);
1419 
1420 	return rc;
1421 }
1422 
1423 spdk_fs_iter
1424 spdk_fs_iter_first(struct spdk_filesystem *fs)
1425 {
1426 	struct spdk_file *f;
1427 
1428 	f = TAILQ_FIRST(&fs->files);
1429 	return f;
1430 }
1431 
1432 spdk_fs_iter
1433 spdk_fs_iter_next(spdk_fs_iter iter)
1434 {
1435 	struct spdk_file *f = iter;
1436 
1437 	if (f == NULL) {
1438 		return NULL;
1439 	}
1440 
1441 	f = TAILQ_NEXT(f, tailq);
1442 	return f;
1443 }
1444 
1445 const char *
1446 spdk_file_get_name(struct spdk_file *file)
1447 {
1448 	return file->name;
1449 }
1450 
1451 uint64_t
1452 spdk_file_get_length(struct spdk_file *file)
1453 {
1454 	assert(file != NULL);
1455 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s length=0x%jx\n", file->name, file->length);
1456 	return file->length;
1457 }
1458 
1459 static void
1460 fs_truncate_complete_cb(void *ctx, int bserrno)
1461 {
1462 	struct spdk_fs_request *req = ctx;
1463 	struct spdk_fs_cb_args *args = &req->args;
1464 
1465 	args->fn.file_op(args->arg, bserrno);
1466 	free_fs_request(req);
1467 }
1468 
1469 static void
1470 fs_truncate_resize_cb(void *ctx, int bserrno)
1471 {
1472 	struct spdk_fs_request *req = ctx;
1473 	struct spdk_fs_cb_args *args = &req->args;
1474 	struct spdk_file *file = args->file;
1475 	uint64_t *length = &args->op.truncate.length;
1476 
1477 	if (bserrno) {
1478 		args->fn.file_op(args->arg, bserrno);
1479 		free_fs_request(req);
1480 		return;
1481 	}
1482 
1483 	spdk_blob_set_xattr(file->blob, "length", length, sizeof(*length));
1484 
1485 	file->length = *length;
1486 	if (file->append_pos > file->length) {
1487 		file->append_pos = file->length;
1488 	}
1489 
1490 	spdk_blob_sync_md(file->blob, fs_truncate_complete_cb, args);
1491 }
1492 
/*
 * Number of clusters of cluster_sz bytes needed to hold length bytes, i.e.
 *  length / cluster_sz rounded up.  cluster_sz must be non-zero.
 *
 * Computed as quotient plus "is there a remainder" so that lengths close to
 *  UINT64_MAX do not wrap the way (length + cluster_sz - 1) would.
 */
static uint64_t
__bytes_to_clusters(uint64_t length, uint64_t cluster_sz)
{
	uint64_t full_clusters = length / cluster_sz;

	if (length % cluster_sz != 0) {
		/* A partially used trailing cluster still counts as one. */
		full_clusters++;
	}

	return full_clusters;
}
1498 
1499 void
1500 spdk_file_truncate_async(struct spdk_file *file, uint64_t length,
1501 			 spdk_file_op_complete cb_fn, void *cb_arg)
1502 {
1503 	struct spdk_filesystem *fs;
1504 	size_t num_clusters;
1505 	struct spdk_fs_request *req;
1506 	struct spdk_fs_cb_args *args;
1507 
1508 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s old=0x%jx new=0x%jx\n", file->name, file->length, length);
1509 	if (length == file->length) {
1510 		cb_fn(cb_arg, 0);
1511 		return;
1512 	}
1513 
1514 	req = alloc_fs_request(file->fs->md_target.md_fs_channel);
1515 	if (req == NULL) {
1516 		cb_fn(cb_arg, -ENOMEM);
1517 		return;
1518 	}
1519 
1520 	args = &req->args;
1521 	args->fn.file_op = cb_fn;
1522 	args->arg = cb_arg;
1523 	args->file = file;
1524 	args->op.truncate.length = length;
1525 	fs = file->fs;
1526 
1527 	num_clusters = __bytes_to_clusters(length, fs->bs_opts.cluster_sz);
1528 
1529 	spdk_blob_resize(file->blob, num_clusters, fs_truncate_resize_cb, req);
1530 }
1531 
1532 static void
1533 __truncate(void *arg)
1534 {
1535 	struct spdk_fs_request *req = arg;
1536 	struct spdk_fs_cb_args *args = &req->args;
1537 
1538 	spdk_file_truncate_async(args->file, args->op.truncate.length,
1539 				 args->fn.file_op, args);
1540 }
1541 
1542 int
1543 spdk_file_truncate(struct spdk_file *file, struct spdk_io_channel *_channel,
1544 		   uint64_t length)
1545 {
1546 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
1547 	struct spdk_fs_request *req;
1548 	struct spdk_fs_cb_args *args;
1549 	int rc;
1550 
1551 	req = alloc_fs_request(channel);
1552 	if (req == NULL) {
1553 		return -ENOMEM;
1554 	}
1555 
1556 	args = &req->args;
1557 
1558 	args->file = file;
1559 	args->op.truncate.length = length;
1560 	args->fn.file_op = __wake_caller;
1561 	args->sem = &channel->sem;
1562 
1563 	channel->send_request(__truncate, req);
1564 	sem_wait(&channel->sem);
1565 	rc = args->rc;
1566 	free_fs_request(req);
1567 
1568 	return rc;
1569 }
1570 
1571 static void
1572 __rw_done(void *ctx, int bserrno)
1573 {
1574 	struct spdk_fs_request *req = ctx;
1575 	struct spdk_fs_cb_args *args = &req->args;
1576 
1577 	spdk_dma_free(args->op.rw.pin_buf);
1578 	args->fn.file_op(args->arg, bserrno);
1579 	free_fs_request(req);
1580 }
1581 
1582 static void
1583 __read_done(void *ctx, int bserrno)
1584 {
1585 	struct spdk_fs_request *req = ctx;
1586 	struct spdk_fs_cb_args *args = &req->args;
1587 
1588 	assert(req != NULL);
1589 	if (args->op.rw.is_read) {
1590 		memcpy(args->op.rw.user_buf,
1591 		       args->op.rw.pin_buf + (args->op.rw.offset & 0xFFF),
1592 		       args->op.rw.length);
1593 		__rw_done(req, 0);
1594 	} else {
1595 		memcpy(args->op.rw.pin_buf + (args->op.rw.offset & 0xFFF),
1596 		       args->op.rw.user_buf,
1597 		       args->op.rw.length);
1598 		spdk_blob_io_write(args->file->blob, args->op.rw.channel,
1599 				   args->op.rw.pin_buf,
1600 				   args->op.rw.start_page, args->op.rw.num_pages,
1601 				   __rw_done, req);
1602 	}
1603 }
1604 
1605 static void
1606 __do_blob_read(void *ctx, int fserrno)
1607 {
1608 	struct spdk_fs_request *req = ctx;
1609 	struct spdk_fs_cb_args *args = &req->args;
1610 
1611 	if (fserrno) {
1612 		__rw_done(req, fserrno);
1613 		return;
1614 	}
1615 	spdk_blob_io_read(args->file->blob, args->op.rw.channel,
1616 			  args->op.rw.pin_buf,
1617 			  args->op.rw.start_page, args->op.rw.num_pages,
1618 			  __read_done, req);
1619 }
1620 
1621 static void
1622 __get_page_parameters(struct spdk_file *file, uint64_t offset, uint64_t length,
1623 		      uint64_t *start_page, uint32_t *page_size, uint64_t *num_pages)
1624 {
1625 	uint64_t end_page;
1626 
1627 	*page_size = spdk_bs_get_page_size(file->fs->bs);
1628 	*start_page = offset / *page_size;
1629 	end_page = (offset + length - 1) / *page_size;
1630 	*num_pages = (end_page - *start_page + 1);
1631 }
1632 
1633 static void
1634 __readwrite(struct spdk_file *file, struct spdk_io_channel *_channel,
1635 	    void *payload, uint64_t offset, uint64_t length,
1636 	    spdk_file_op_complete cb_fn, void *cb_arg, int is_read)
1637 {
1638 	struct spdk_fs_request *req;
1639 	struct spdk_fs_cb_args *args;
1640 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
1641 	uint64_t start_page, num_pages, pin_buf_length;
1642 	uint32_t page_size;
1643 
1644 	if (is_read && offset + length > file->length) {
1645 		cb_fn(cb_arg, -EINVAL);
1646 		return;
1647 	}
1648 
1649 	req = alloc_fs_request(channel);
1650 	if (req == NULL) {
1651 		cb_fn(cb_arg, -ENOMEM);
1652 		return;
1653 	}
1654 
1655 	args = &req->args;
1656 	args->fn.file_op = cb_fn;
1657 	args->arg = cb_arg;
1658 	args->file = file;
1659 	args->op.rw.channel = channel->bs_channel;
1660 	args->op.rw.user_buf = payload;
1661 	args->op.rw.is_read = is_read;
1662 	args->op.rw.offset = offset;
1663 	args->op.rw.length = length;
1664 
1665 	__get_page_parameters(file, offset, length, &start_page, &page_size, &num_pages);
1666 	pin_buf_length = num_pages * page_size;
1667 	args->op.rw.pin_buf = spdk_dma_malloc(pin_buf_length, 4096, NULL);
1668 	if (args->op.rw.pin_buf == NULL) {
1669 		SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "Failed to allocate buf for: file=%s offset=%jx length=%jx\n",
1670 			      file->name, offset, length);
1671 		free_fs_request(req);
1672 		cb_fn(cb_arg, -ENOMEM);
1673 		return;
1674 	}
1675 
1676 	args->op.rw.start_page = start_page;
1677 	args->op.rw.num_pages = num_pages;
1678 
1679 	if (!is_read && file->length < offset + length) {
1680 		spdk_file_truncate_async(file, offset + length, __do_blob_read, req);
1681 	} else {
1682 		__do_blob_read(req, 0);
1683 	}
1684 }
1685 
1686 void
1687 spdk_file_write_async(struct spdk_file *file, struct spdk_io_channel *channel,
1688 		      void *payload, uint64_t offset, uint64_t length,
1689 		      spdk_file_op_complete cb_fn, void *cb_arg)
1690 {
1691 	__readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 0);
1692 }
1693 
1694 void
1695 spdk_file_read_async(struct spdk_file *file, struct spdk_io_channel *channel,
1696 		     void *payload, uint64_t offset, uint64_t length,
1697 		     spdk_file_op_complete cb_fn, void *cb_arg)
1698 {
1699 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s offset=%jx length=%jx\n",
1700 		      file->name, offset, length);
1701 	__readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 1);
1702 }
1703 
1704 struct spdk_io_channel *
1705 spdk_fs_alloc_io_channel(struct spdk_filesystem *fs)
1706 {
1707 	struct spdk_io_channel *io_channel;
1708 	struct spdk_fs_channel *fs_channel;
1709 
1710 	io_channel = spdk_get_io_channel(&fs->io_target);
1711 	fs_channel = spdk_io_channel_get_ctx(io_channel);
1712 	fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
1713 	fs_channel->send_request = __send_request_direct;
1714 
1715 	return io_channel;
1716 }
1717 
1718 struct spdk_io_channel *
1719 spdk_fs_alloc_io_channel_sync(struct spdk_filesystem *fs)
1720 {
1721 	struct spdk_io_channel *io_channel;
1722 	struct spdk_fs_channel *fs_channel;
1723 
1724 	io_channel = spdk_get_io_channel(&fs->io_target);
1725 	fs_channel = spdk_io_channel_get_ctx(io_channel);
1726 	fs_channel->send_request = fs->send_request;
1727 	fs_channel->sync = 1;
1728 	pthread_spin_init(&fs_channel->lock, 0);
1729 
1730 	return io_channel;
1731 }
1732 
/*
 * Release an I/O channel by handing it back through spdk_put_io_channel().
 */
void
spdk_fs_free_io_channel(struct spdk_io_channel *channel)
{
	spdk_put_io_channel(channel);
}
1738 
1739 void
1740 spdk_fs_set_cache_size(uint64_t size_in_mb)
1741 {
1742 	g_fs_cache_size = size_in_mb * 1024 * 1024;
1743 }
1744 
1745 uint64_t
1746 spdk_fs_get_cache_size(void)
1747 {
1748 	return g_fs_cache_size / (1024 * 1024);
1749 }
1750 
1751 static void __file_flush(void *_args);
1752 
1753 static void *
1754 alloc_cache_memory_buffer(struct spdk_file *context)
1755 {
1756 	struct spdk_file *file;
1757 	void *buf;
1758 
1759 	buf = spdk_mempool_get(g_cache_pool);
1760 	if (buf != NULL) {
1761 		return buf;
1762 	}
1763 
1764 	pthread_spin_lock(&g_caches_lock);
1765 	TAILQ_FOREACH(file, &g_caches, cache_tailq) {
1766 		if (!file->open_for_writing &&
1767 		    file->priority == SPDK_FILE_PRIORITY_LOW &&
1768 		    file != context) {
1769 			break;
1770 		}
1771 	}
1772 	pthread_spin_unlock(&g_caches_lock);
1773 	if (file != NULL) {
1774 		cache_free_buffers(file);
1775 		buf = spdk_mempool_get(g_cache_pool);
1776 		if (buf != NULL) {
1777 			return buf;
1778 		}
1779 	}
1780 
1781 	pthread_spin_lock(&g_caches_lock);
1782 	TAILQ_FOREACH(file, &g_caches, cache_tailq) {
1783 		if (!file->open_for_writing && file != context) {
1784 			break;
1785 		}
1786 	}
1787 	pthread_spin_unlock(&g_caches_lock);
1788 	if (file != NULL) {
1789 		cache_free_buffers(file);
1790 		buf = spdk_mempool_get(g_cache_pool);
1791 		if (buf != NULL) {
1792 			return buf;
1793 		}
1794 	}
1795 
1796 	pthread_spin_lock(&g_caches_lock);
1797 	TAILQ_FOREACH(file, &g_caches, cache_tailq) {
1798 		if (file != context) {
1799 			break;
1800 		}
1801 	}
1802 	pthread_spin_unlock(&g_caches_lock);
1803 	if (file != NULL) {
1804 		cache_free_buffers(file);
1805 		buf = spdk_mempool_get(g_cache_pool);
1806 		if (buf != NULL) {
1807 			return buf;
1808 		}
1809 	}
1810 
1811 	return NULL;
1812 }
1813 
1814 static struct cache_buffer *
1815 cache_insert_buffer(struct spdk_file *file, uint64_t offset)
1816 {
1817 	struct cache_buffer *buf;
1818 	int count = 0;
1819 
1820 	buf = calloc(1, sizeof(*buf));
1821 	if (buf == NULL) {
1822 		SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "calloc failed\n");
1823 		return NULL;
1824 	}
1825 
1826 	buf->buf = alloc_cache_memory_buffer(file);
1827 	while (buf->buf == NULL) {
1828 		/*
1829 		 * TODO: alloc_cache_memory_buffer() should eventually free
1830 		 *  some buffers.  Need a more sophisticated check here, instead
1831 		 *  of just bailing if 100 tries does not result in getting a
1832 		 *  free buffer.  This will involve using the sync channel's
1833 		 *  semaphore to block until a buffer becomes available.
1834 		 */
1835 		if (count++ == 100) {
1836 			SPDK_ERRLOG("could not allocate cache buffer\n");
1837 			assert(false);
1838 			free(buf);
1839 			return NULL;
1840 		}
1841 		buf->buf = alloc_cache_memory_buffer(file);
1842 	}
1843 
1844 	buf->buf_size = CACHE_BUFFER_SIZE;
1845 	buf->offset = offset;
1846 
1847 	pthread_spin_lock(&g_caches_lock);
1848 	if (file->tree->present_mask == 0) {
1849 		TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq);
1850 	}
1851 	file->tree = spdk_tree_insert_buffer(file->tree, buf);
1852 	pthread_spin_unlock(&g_caches_lock);
1853 
1854 	return buf;
1855 }
1856 
1857 static struct cache_buffer *
1858 cache_append_buffer(struct spdk_file *file)
1859 {
1860 	struct cache_buffer *last;
1861 
1862 	assert(file->last == NULL || file->last->bytes_filled == file->last->buf_size);
1863 	assert((file->append_pos % CACHE_BUFFER_SIZE) == 0);
1864 
1865 	last = cache_insert_buffer(file, file->append_pos);
1866 	if (last == NULL) {
1867 		SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "cache_insert_buffer failed\n");
1868 		return NULL;
1869 	}
1870 
1871 	file->last = last;
1872 
1873 	return last;
1874 }
1875 
1876 static void __check_sync_reqs(struct spdk_file *file);
1877 
1878 static void
1879 __file_cache_finish_sync(void *ctx, int bserrno)
1880 {
1881 	struct spdk_file *file = ctx;
1882 	struct spdk_fs_request *sync_req;
1883 	struct spdk_fs_cb_args *sync_args;
1884 
1885 	pthread_spin_lock(&file->lock);
1886 	sync_req = TAILQ_FIRST(&file->sync_requests);
1887 	sync_args = &sync_req->args;
1888 	assert(sync_args->op.sync.offset <= file->length_flushed);
1889 	BLOBFS_TRACE(file, "sync done offset=%jx\n", sync_args->op.sync.offset);
1890 	TAILQ_REMOVE(&file->sync_requests, sync_req, args.op.sync.tailq);
1891 	pthread_spin_unlock(&file->lock);
1892 
1893 	sync_args->fn.file_op(sync_args->arg, bserrno);
1894 	__check_sync_reqs(file);
1895 
1896 	pthread_spin_lock(&file->lock);
1897 	free_fs_request(sync_req);
1898 	pthread_spin_unlock(&file->lock);
1899 }
1900 
1901 static void
1902 __free_args(struct spdk_fs_cb_args *args)
1903 {
1904 	struct spdk_fs_request *req;
1905 
1906 	if (!args->from_request) {
1907 		free(args);
1908 	} else {
1909 		/* Depends on args being at the start of the spdk_fs_request structure. */
1910 		req = (struct spdk_fs_request *)args;
1911 		free_fs_request(req);
1912 	}
1913 }
1914 
1915 static void
1916 __check_sync_reqs(struct spdk_file *file)
1917 {
1918 	struct spdk_fs_request *sync_req;
1919 
1920 	pthread_spin_lock(&file->lock);
1921 
1922 	TAILQ_FOREACH(sync_req, &file->sync_requests, args.op.sync.tailq) {
1923 		if (sync_req->args.op.sync.offset <= file->length_flushed) {
1924 			break;
1925 		}
1926 	}
1927 
1928 	if (sync_req != NULL && !sync_req->args.op.sync.xattr_in_progress) {
1929 		BLOBFS_TRACE(file, "set xattr length 0x%jx\n", file->length_flushed);
1930 		sync_req->args.op.sync.xattr_in_progress = true;
1931 		spdk_blob_set_xattr(file->blob, "length", &file->length_flushed,
1932 				    sizeof(file->length_flushed));
1933 
1934 		pthread_spin_unlock(&file->lock);
1935 		spdk_blob_sync_md(file->blob, __file_cache_finish_sync, file);
1936 	} else {
1937 		pthread_spin_unlock(&file->lock);
1938 	}
1939 }
1940 
1941 static void
1942 __file_flush_done(void *arg, int bserrno)
1943 {
1944 	struct spdk_fs_cb_args *args = arg;
1945 	struct spdk_file *file = args->file;
1946 	struct cache_buffer *next = args->op.flush.cache_buffer;
1947 
1948 	BLOBFS_TRACE(file, "length=%jx\n", args->op.flush.length);
1949 
1950 	pthread_spin_lock(&file->lock);
1951 	next->in_progress = false;
1952 	next->bytes_flushed += args->op.flush.length;
1953 	file->length_flushed += args->op.flush.length;
1954 	if (file->length_flushed > file->length) {
1955 		file->length = file->length_flushed;
1956 	}
1957 	if (next->bytes_flushed == next->buf_size) {
1958 		BLOBFS_TRACE(file, "write buffer fully flushed 0x%jx\n", file->length_flushed);
1959 		next = spdk_tree_find_buffer(file->tree, file->length_flushed);
1960 	}
1961 
1962 	/*
1963 	 * Assert that there is no cached data that extends past the end of the underlying
1964 	 *  blob.
1965 	 */
1966 	assert(next == NULL || next->offset < __file_get_blob_size(file) ||
1967 	       next->bytes_filled == 0);
1968 
1969 	pthread_spin_unlock(&file->lock);
1970 
1971 	__check_sync_reqs(file);
1972 
1973 	__file_flush(args);
1974 }
1975 
1976 static void
1977 __file_flush(void *_args)
1978 {
1979 	struct spdk_fs_cb_args *args = _args;
1980 	struct spdk_file *file = args->file;
1981 	struct cache_buffer *next;
1982 	uint64_t offset, length, start_page, num_pages;
1983 	uint32_t page_size;
1984 
1985 	pthread_spin_lock(&file->lock);
1986 	next = spdk_tree_find_buffer(file->tree, file->length_flushed);
1987 	if (next == NULL || next->in_progress) {
1988 		/*
1989 		 * There is either no data to flush, or a flush I/O is already in
1990 		 *  progress.  So return immediately - if a flush I/O is in
1991 		 *  progress we will flush more data after that is completed.
1992 		 */
1993 		__free_args(args);
1994 		if (next == NULL) {
1995 			/*
1996 			 * For cases where a file's cache was evicted, and then the
1997 			 *  file was later appended, we will write the data directly
1998 			 *  to disk and bypass cache.  So just update length_flushed
1999 			 *  here to reflect that all data was already written to disk.
2000 			 */
2001 			file->length_flushed = file->append_pos;
2002 		}
2003 		pthread_spin_unlock(&file->lock);
2004 		if (next == NULL) {
2005 			/*
2006 			 * There is no data to flush, but we still need to check for any
2007 			 *  outstanding sync requests to make sure metadata gets updated.
2008 			 */
2009 			__check_sync_reqs(file);
2010 		}
2011 		return;
2012 	}
2013 
2014 	offset = next->offset + next->bytes_flushed;
2015 	length = next->bytes_filled - next->bytes_flushed;
2016 	if (length == 0) {
2017 		__free_args(args);
2018 		pthread_spin_unlock(&file->lock);
2019 		return;
2020 	}
2021 	args->op.flush.length = length;
2022 	args->op.flush.cache_buffer = next;
2023 
2024 	__get_page_parameters(file, offset, length, &start_page, &page_size, &num_pages);
2025 
2026 	next->in_progress = true;
2027 	BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n",
2028 		     offset, length, start_page, num_pages);
2029 	pthread_spin_unlock(&file->lock);
2030 	spdk_blob_io_write(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel,
2031 			   next->buf + (start_page * page_size) - next->offset,
2032 			   start_page, num_pages, __file_flush_done, args);
2033 }
2034 
/*
 * Metadata-sync completion for a blob extension: passes the result to
 *  __wake_caller() for the thread waiting on the extension.
 */
static void
__file_extend_done(void *arg, int bserrno)
{
	struct spdk_fs_cb_args *extend_args = arg;

	__wake_caller(extend_args, bserrno);
}
2042 
2043 static void
2044 __file_extend_resize_cb(void *_args, int bserrno)
2045 {
2046 	struct spdk_fs_cb_args *args = _args;
2047 	struct spdk_file *file = args->file;
2048 
2049 	if (bserrno) {
2050 		__wake_caller(args, bserrno);
2051 		return;
2052 	}
2053 
2054 	spdk_blob_sync_md(file->blob, __file_extend_done, args);
2055 }
2056 
2057 static void
2058 __file_extend_blob(void *_args)
2059 {
2060 	struct spdk_fs_cb_args *args = _args;
2061 	struct spdk_file *file = args->file;
2062 
2063 	spdk_blob_resize(file->blob, args->op.resize.num_clusters, __file_extend_resize_cb, args);
2064 }
2065 
/*
 * Completion for a direct read/write started by __rw_from_file(): wakes the
 *  waiting caller with the result and then releases the args.
 */
static void
__rw_from_file_done(void *arg, int bserrno)
{
	struct spdk_fs_cb_args *rw_args = arg;

	__wake_caller(rw_args, bserrno);
	__free_args(rw_args);
}
2074 
2075 static void
2076 __rw_from_file(void *_args)
2077 {
2078 	struct spdk_fs_cb_args *args = _args;
2079 	struct spdk_file *file = args->file;
2080 
2081 	if (args->op.rw.is_read) {
2082 		spdk_file_read_async(file, file->fs->sync_target.sync_io_channel, args->op.rw.user_buf,
2083 				     args->op.rw.offset, args->op.rw.length,
2084 				     __rw_from_file_done, args);
2085 	} else {
2086 		spdk_file_write_async(file, file->fs->sync_target.sync_io_channel, args->op.rw.user_buf,
2087 				      args->op.rw.offset, args->op.rw.length,
2088 				      __rw_from_file_done, args);
2089 	}
2090 }
2091 
2092 static int
2093 __send_rw_from_file(struct spdk_file *file, sem_t *sem, void *payload,
2094 		    uint64_t offset, uint64_t length, bool is_read)
2095 {
2096 	struct spdk_fs_cb_args *args;
2097 
2098 	args = calloc(1, sizeof(*args));
2099 	if (args == NULL) {
2100 		sem_post(sem);
2101 		return -ENOMEM;
2102 	}
2103 
2104 	args->file = file;
2105 	args->sem = sem;
2106 	args->op.rw.user_buf = payload;
2107 	args->op.rw.offset = offset;
2108 	args->op.rw.length = length;
2109 	args->op.rw.is_read = is_read;
2110 	file->fs->send_request(__rw_from_file, args);
2111 	return 0;
2112 }
2113 
2114 int
2115 spdk_file_write(struct spdk_file *file, struct spdk_io_channel *_channel,
2116 		void *payload, uint64_t offset, uint64_t length)
2117 {
2118 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
2119 	struct spdk_fs_cb_args *args;
2120 	uint64_t rem_length, copy, blob_size, cluster_sz;
2121 	uint32_t cache_buffers_filled = 0;
2122 	uint8_t *cur_payload;
2123 	struct cache_buffer *last;
2124 
2125 	BLOBFS_TRACE_RW(file, "offset=%jx length=%jx\n", offset, length);
2126 
2127 	if (length == 0) {
2128 		return 0;
2129 	}
2130 
2131 	if (offset != file->append_pos) {
2132 		BLOBFS_TRACE(file, " error offset=%jx append_pos=%jx\n", offset, file->append_pos);
2133 		return -EINVAL;
2134 	}
2135 
2136 	pthread_spin_lock(&file->lock);
2137 	file->open_for_writing = true;
2138 
2139 	if (file->last == NULL) {
2140 		if (file->append_pos % CACHE_BUFFER_SIZE == 0) {
2141 			cache_append_buffer(file);
2142 		} else {
2143 			int rc;
2144 
2145 			file->append_pos += length;
2146 			pthread_spin_unlock(&file->lock);
2147 			rc = __send_rw_from_file(file, &channel->sem, payload,
2148 						 offset, length, false);
2149 			sem_wait(&channel->sem);
2150 			return rc;
2151 		}
2152 	}
2153 
2154 	blob_size = __file_get_blob_size(file);
2155 
2156 	if ((offset + length) > blob_size) {
2157 		struct spdk_fs_cb_args extend_args = {};
2158 
2159 		cluster_sz = file->fs->bs_opts.cluster_sz;
2160 		extend_args.sem = &channel->sem;
2161 		extend_args.op.resize.num_clusters = __bytes_to_clusters((offset + length), cluster_sz);
2162 		extend_args.file = file;
2163 		BLOBFS_TRACE(file, "start resize to %u clusters\n", extend_args.op.resize.num_clusters);
2164 		pthread_spin_unlock(&file->lock);
2165 		file->fs->send_request(__file_extend_blob, &extend_args);
2166 		sem_wait(&channel->sem);
2167 		if (extend_args.rc) {
2168 			return extend_args.rc;
2169 		}
2170 	}
2171 
2172 	last = file->last;
2173 	rem_length = length;
2174 	cur_payload = payload;
2175 	while (rem_length > 0) {
2176 		copy = last->buf_size - last->bytes_filled;
2177 		if (copy > rem_length) {
2178 			copy = rem_length;
2179 		}
2180 		BLOBFS_TRACE_RW(file, "  fill offset=%jx length=%jx\n", file->append_pos, copy);
2181 		memcpy(&last->buf[last->bytes_filled], cur_payload, copy);
2182 		file->append_pos += copy;
2183 		if (file->length < file->append_pos) {
2184 			file->length = file->append_pos;
2185 		}
2186 		cur_payload += copy;
2187 		last->bytes_filled += copy;
2188 		rem_length -= copy;
2189 		if (last->bytes_filled == last->buf_size) {
2190 			cache_buffers_filled++;
2191 			last = cache_append_buffer(file);
2192 			if (last == NULL) {
2193 				BLOBFS_TRACE(file, "nomem\n");
2194 				pthread_spin_unlock(&file->lock);
2195 				return -ENOMEM;
2196 			}
2197 		}
2198 	}
2199 
2200 	pthread_spin_unlock(&file->lock);
2201 
2202 	if (cache_buffers_filled == 0) {
2203 		return 0;
2204 	}
2205 
2206 	args = calloc(1, sizeof(*args));
2207 	if (args == NULL) {
2208 		return -ENOMEM;
2209 	}
2210 
2211 	args->file = file;
2212 	file->fs->send_request(__file_flush, args);
2213 	return 0;
2214 }
2215 
2216 static void
2217 __readahead_done(void *arg, int bserrno)
2218 {
2219 	struct spdk_fs_cb_args *args = arg;
2220 	struct cache_buffer *cache_buffer = args->op.readahead.cache_buffer;
2221 	struct spdk_file *file = args->file;
2222 
2223 	BLOBFS_TRACE(file, "offset=%jx\n", cache_buffer->offset);
2224 
2225 	pthread_spin_lock(&file->lock);
2226 	cache_buffer->bytes_filled = args->op.readahead.length;
2227 	cache_buffer->bytes_flushed = args->op.readahead.length;
2228 	cache_buffer->in_progress = false;
2229 	pthread_spin_unlock(&file->lock);
2230 
2231 	__free_args(args);
2232 }
2233 
2234 static void
2235 __readahead(void *_args)
2236 {
2237 	struct spdk_fs_cb_args *args = _args;
2238 	struct spdk_file *file = args->file;
2239 	uint64_t offset, length, start_page, num_pages;
2240 	uint32_t page_size;
2241 
2242 	offset = args->op.readahead.offset;
2243 	length = args->op.readahead.length;
2244 	assert(length > 0);
2245 
2246 	__get_page_parameters(file, offset, length, &start_page, &page_size, &num_pages);
2247 
2248 	BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n",
2249 		     offset, length, start_page, num_pages);
2250 	spdk_blob_io_read(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel,
2251 			  args->op.readahead.cache_buffer->buf,
2252 			  start_page, num_pages, __readahead_done, args);
2253 }
2254 
2255 static uint64_t
2256 __next_cache_buffer_offset(uint64_t offset)
2257 {
2258 	return (offset + CACHE_BUFFER_SIZE) & ~(CACHE_TREE_LEVEL_MASK(0));
2259 }
2260 
2261 static void
2262 check_readahead(struct spdk_file *file, uint64_t offset)
2263 {
2264 	struct spdk_fs_cb_args *args;
2265 
2266 	offset = __next_cache_buffer_offset(offset);
2267 	if (spdk_tree_find_buffer(file->tree, offset) != NULL || file->length <= offset) {
2268 		return;
2269 	}
2270 
2271 	args = calloc(1, sizeof(*args));
2272 	if (args == NULL) {
2273 		return;
2274 	}
2275 
2276 	BLOBFS_TRACE(file, "offset=%jx\n", offset);
2277 
2278 	args->file = file;
2279 	args->op.readahead.offset = offset;
2280 	args->op.readahead.cache_buffer = cache_insert_buffer(file, offset);
2281 	if (!args->op.readahead.cache_buffer) {
2282 		BLOBFS_TRACE(file, "Cannot allocate buf for offset=%jx\n", offset);
2283 		free(args);
2284 		return;
2285 	}
2286 
2287 	args->op.readahead.cache_buffer->in_progress = true;
2288 	if (file->length < (offset + CACHE_BUFFER_SIZE)) {
2289 		args->op.readahead.length = file->length & (CACHE_BUFFER_SIZE - 1);
2290 	} else {
2291 		args->op.readahead.length = CACHE_BUFFER_SIZE;
2292 	}
2293 	file->fs->send_request(__readahead, args);
2294 }
2295 
2296 static int
2297 __file_read(struct spdk_file *file, void *payload, uint64_t offset, uint64_t length, sem_t *sem)
2298 {
2299 	struct cache_buffer *buf;
2300 	int rc;
2301 
2302 	buf = spdk_tree_find_filled_buffer(file->tree, offset);
2303 	if (buf == NULL) {
2304 		pthread_spin_unlock(&file->lock);
2305 		rc = __send_rw_from_file(file, sem, payload, offset, length, true);
2306 		pthread_spin_lock(&file->lock);
2307 		return rc;
2308 	}
2309 
2310 	if ((offset + length) > (buf->offset + buf->bytes_filled)) {
2311 		length = buf->offset + buf->bytes_filled - offset;
2312 	}
2313 	BLOBFS_TRACE(file, "read %p offset=%ju length=%ju\n", payload, offset, length);
2314 	memcpy(payload, &buf->buf[offset - buf->offset], length);
2315 	if ((offset + length) % CACHE_BUFFER_SIZE == 0) {
2316 		pthread_spin_lock(&g_caches_lock);
2317 		spdk_tree_remove_buffer(file->tree, buf);
2318 		if (file->tree->present_mask == 0) {
2319 			TAILQ_REMOVE(&g_caches, file, cache_tailq);
2320 		}
2321 		pthread_spin_unlock(&g_caches_lock);
2322 	}
2323 
2324 	sem_post(sem);
2325 	return 0;
2326 }
2327 
2328 int64_t
2329 spdk_file_read(struct spdk_file *file, struct spdk_io_channel *_channel,
2330 	       void *payload, uint64_t offset, uint64_t length)
2331 {
2332 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
2333 	uint64_t final_offset, final_length;
2334 	uint32_t sub_reads = 0;
2335 	int rc = 0;
2336 
2337 	pthread_spin_lock(&file->lock);
2338 
2339 	BLOBFS_TRACE_RW(file, "offset=%ju length=%ju\n", offset, length);
2340 
2341 	file->open_for_writing = false;
2342 
2343 	if (length == 0 || offset >= file->append_pos) {
2344 		pthread_spin_unlock(&file->lock);
2345 		return 0;
2346 	}
2347 
2348 	if (offset + length > file->append_pos) {
2349 		length = file->append_pos - offset;
2350 	}
2351 
2352 	if (offset != file->next_seq_offset) {
2353 		file->seq_byte_count = 0;
2354 	}
2355 	file->seq_byte_count += length;
2356 	file->next_seq_offset = offset + length;
2357 	if (file->seq_byte_count >= CACHE_READAHEAD_THRESHOLD) {
2358 		check_readahead(file, offset);
2359 		check_readahead(file, offset + CACHE_BUFFER_SIZE);
2360 	}
2361 
2362 	final_length = 0;
2363 	final_offset = offset + length;
2364 	while (offset < final_offset) {
2365 		length = NEXT_CACHE_BUFFER_OFFSET(offset) - offset;
2366 		if (length > (final_offset - offset)) {
2367 			length = final_offset - offset;
2368 		}
2369 		rc = __file_read(file, payload, offset, length, &channel->sem);
2370 		if (rc == 0) {
2371 			final_length += length;
2372 		} else {
2373 			break;
2374 		}
2375 		payload += length;
2376 		offset += length;
2377 		sub_reads++;
2378 	}
2379 	pthread_spin_unlock(&file->lock);
2380 	while (sub_reads-- > 0) {
2381 		sem_wait(&channel->sem);
2382 	}
2383 	if (rc == 0) {
2384 		return final_length;
2385 	} else {
2386 		return rc;
2387 	}
2388 }
2389 
2390 static void
2391 _file_sync(struct spdk_file *file, struct spdk_fs_channel *channel,
2392 	   spdk_file_op_complete cb_fn, void *cb_arg)
2393 {
2394 	struct spdk_fs_request *sync_req;
2395 	struct spdk_fs_request *flush_req;
2396 	struct spdk_fs_cb_args *sync_args;
2397 	struct spdk_fs_cb_args *flush_args;
2398 
2399 	BLOBFS_TRACE(file, "offset=%jx\n", file->append_pos);
2400 
2401 	pthread_spin_lock(&file->lock);
2402 	if (file->append_pos <= file->length_flushed) {
2403 		BLOBFS_TRACE(file, "done - no data to flush\n");
2404 		pthread_spin_unlock(&file->lock);
2405 		cb_fn(cb_arg, 0);
2406 		return;
2407 	}
2408 
2409 	sync_req = alloc_fs_request(channel);
2410 	if (!sync_req) {
2411 		pthread_spin_unlock(&file->lock);
2412 		cb_fn(cb_arg, -ENOMEM);
2413 		return;
2414 	}
2415 	sync_args = &sync_req->args;
2416 
2417 	flush_req = alloc_fs_request(channel);
2418 	if (!flush_req) {
2419 		pthread_spin_unlock(&file->lock);
2420 		cb_fn(cb_arg, -ENOMEM);
2421 		return;
2422 	}
2423 	flush_args = &flush_req->args;
2424 
2425 	sync_args->file = file;
2426 	sync_args->fn.file_op = cb_fn;
2427 	sync_args->arg = cb_arg;
2428 	sync_args->op.sync.offset = file->append_pos;
2429 	sync_args->op.sync.xattr_in_progress = false;
2430 	TAILQ_INSERT_TAIL(&file->sync_requests, sync_req, args.op.sync.tailq);
2431 	pthread_spin_unlock(&file->lock);
2432 
2433 	flush_args->file = file;
2434 	channel->send_request(__file_flush, flush_args);
2435 }
2436 
2437 int
2438 spdk_file_sync(struct spdk_file *file, struct spdk_io_channel *_channel)
2439 {
2440 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
2441 	struct spdk_fs_cb_args args = {};
2442 
2443 	args.sem = &channel->sem;
2444 	_file_sync(file, channel, __wake_caller, &args);
2445 	sem_wait(&channel->sem);
2446 
2447 	return args.rc;
2448 }
2449 
2450 void
2451 spdk_file_sync_async(struct spdk_file *file, struct spdk_io_channel *_channel,
2452 		     spdk_file_op_complete cb_fn, void *cb_arg)
2453 {
2454 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
2455 
2456 	_file_sync(file, channel, cb_fn, cb_arg);
2457 }
2458 
2459 void
2460 spdk_file_set_priority(struct spdk_file *file, uint32_t priority)
2461 {
2462 	BLOBFS_TRACE(file, "priority=%u\n", priority);
2463 	file->priority = priority;
2464 
2465 }
2466 
2467 /*
2468  * Close routines
2469  */
2470 
2471 static void
2472 __file_close_async_done(void *ctx, int bserrno)
2473 {
2474 	struct spdk_fs_request *req = ctx;
2475 	struct spdk_fs_cb_args *args = &req->args;
2476 	struct spdk_file *file = args->file;
2477 
2478 	if (file->is_deleted) {
2479 		spdk_fs_delete_file_async(file->fs, file->name, blob_delete_cb, ctx);
2480 		return;
2481 	}
2482 
2483 	args->fn.file_op(args->arg, bserrno);
2484 	free_fs_request(req);
2485 }
2486 
2487 static void
2488 __file_close_async(struct spdk_file *file, struct spdk_fs_request *req)
2489 {
2490 	struct spdk_blob *blob;
2491 
2492 	pthread_spin_lock(&file->lock);
2493 	if (file->ref_count == 0) {
2494 		pthread_spin_unlock(&file->lock);
2495 		__file_close_async_done(req, -EBADF);
2496 		return;
2497 	}
2498 
2499 	file->ref_count--;
2500 	if (file->ref_count > 0) {
2501 		pthread_spin_unlock(&file->lock);
2502 		req->args.fn.file_op(req->args.arg, 0);
2503 		free_fs_request(req);
2504 		return;
2505 	}
2506 
2507 	pthread_spin_unlock(&file->lock);
2508 
2509 	blob = file->blob;
2510 	file->blob = NULL;
2511 	spdk_blob_close(blob, __file_close_async_done, req);
2512 }
2513 
2514 static void
2515 __file_close_async__sync_done(void *arg, int fserrno)
2516 {
2517 	struct spdk_fs_request *req = arg;
2518 	struct spdk_fs_cb_args *args = &req->args;
2519 
2520 	__file_close_async(args->file, req);
2521 }
2522 
2523 void
2524 spdk_file_close_async(struct spdk_file *file, spdk_file_op_complete cb_fn, void *cb_arg)
2525 {
2526 	struct spdk_fs_request *req;
2527 	struct spdk_fs_cb_args *args;
2528 
2529 	req = alloc_fs_request(file->fs->md_target.md_fs_channel);
2530 	if (req == NULL) {
2531 		cb_fn(cb_arg, -ENOMEM);
2532 		return;
2533 	}
2534 
2535 	args = &req->args;
2536 	args->file = file;
2537 	args->fn.file_op = cb_fn;
2538 	args->arg = cb_arg;
2539 
2540 	spdk_file_sync_async(file, file->fs->md_target.md_io_channel, __file_close_async__sync_done, req);
2541 }
2542 
2543 static void
2544 __file_close(void *arg)
2545 {
2546 	struct spdk_fs_request *req = arg;
2547 	struct spdk_fs_cb_args *args = &req->args;
2548 	struct spdk_file *file = args->file;
2549 
2550 	__file_close_async(file, req);
2551 }
2552 
2553 int
2554 spdk_file_close(struct spdk_file *file, struct spdk_io_channel *_channel)
2555 {
2556 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
2557 	struct spdk_fs_request *req;
2558 	struct spdk_fs_cb_args *args;
2559 
2560 	req = alloc_fs_request(channel);
2561 	if (req == NULL) {
2562 		return -ENOMEM;
2563 	}
2564 
2565 	args = &req->args;
2566 
2567 	spdk_file_sync(file, _channel);
2568 	BLOBFS_TRACE(file, "name=%s\n", file->name);
2569 	args->file = file;
2570 	args->sem = &channel->sem;
2571 	args->fn.file_op = __wake_caller;
2572 	args->arg = req;
2573 	channel->send_request(__file_close, req);
2574 	sem_wait(&channel->sem);
2575 
2576 	return args->rc;
2577 }
2578 
2579 int
2580 spdk_file_get_id(struct spdk_file *file, void *id, size_t size)
2581 {
2582 	if (size < sizeof(spdk_blob_id)) {
2583 		return -EINVAL;
2584 	}
2585 
2586 	memcpy(id, &file->blobid, sizeof(spdk_blob_id));
2587 
2588 	return sizeof(spdk_blob_id);
2589 }
2590 
2591 static void
2592 cache_free_buffers(struct spdk_file *file)
2593 {
2594 	BLOBFS_TRACE(file, "free=%s\n", file->name);
2595 	pthread_spin_lock(&file->lock);
2596 	pthread_spin_lock(&g_caches_lock);
2597 	if (file->tree->present_mask == 0) {
2598 		pthread_spin_unlock(&g_caches_lock);
2599 		pthread_spin_unlock(&file->lock);
2600 		return;
2601 	}
2602 	spdk_tree_free_buffers(file->tree);
2603 
2604 	TAILQ_REMOVE(&g_caches, file, cache_tailq);
2605 	/* If not freed, put it in the end of the queue */
2606 	if (file->tree->present_mask != 0) {
2607 		TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq);
2608 	}
2609 	file->last = NULL;
2610 	pthread_spin_unlock(&g_caches_lock);
2611 	pthread_spin_unlock(&file->lock);
2612 }
2613 
/*
 * Register the "blobfs" and "blobfs_rw" log components under the flags
 * SPDK_LOG_BLOBFS and SPDK_LOG_BLOBFS_RW.
 * NOTE(review): presumably these are the flags behind the BLOBFS_TRACE and
 * BLOBFS_TRACE_RW macros used throughout this file - confirm at the macro
 * definitions.
 */
2614 SPDK_LOG_REGISTER_COMPONENT("blobfs", SPDK_LOG_BLOBFS)
2615 SPDK_LOG_REGISTER_COMPONENT("blobfs_rw", SPDK_LOG_BLOBFS_RW)
2616