xref: /spdk/lib/blobfs/blobfs.c (revision 282463f53cad9b2aec79245008078a4990018863)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 
36 #include "spdk/blobfs.h"
37 #include "spdk/conf.h"
38 #include "blobfs_internal.h"
39 
40 #include "spdk/queue.h"
41 #include "spdk/thread.h"
42 #include "spdk/assert.h"
43 #include "spdk/env.h"
44 #include "spdk/util.h"
45 #include "spdk_internal/log.h"
46 
/* Debug log under the SPDK_LOG_BLOBFS flag, prefixed with "file=<file->name> ". */
#define BLOBFS_TRACE(file, str, args...) \
	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s " str, file->name, ##args)

/* Same as BLOBFS_TRACE, but logged under the SPDK_LOG_BLOBFS_RW flag. */
#define BLOBFS_TRACE_RW(file, str, args...) \
	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS_RW, "file=%s " str, file->name, ##args)

/* Default value of g_fs_cache_size: 4 GiB. */
#define BLOBFS_DEFAULT_CACHE_SIZE (4ULL * 1024 * 1024 * 1024)
/* Default cluster size (1 MiB) stored by spdk_fs_opts_init(). */
#define SPDK_BLOBFS_DEFAULT_OPTS_CLUSTER_SZ (1024 * 1024)

/* Total cache size; __initialize_cache() divides it by CACHE_BUFFER_SIZE to size the pool. */
static uint64_t g_fs_cache_size = BLOBFS_DEFAULT_CACHE_SIZE;
/* Mempool of cache buffers; created by __initialize_cache(), released by __free_cache(). */
static struct spdk_mempool *g_cache_pool;
/* List of files, initialized in __initialize_cache(); presumably linked via spdk_file.cache_tailq. */
static TAILQ_HEAD(, spdk_file) g_caches;
/* Number of filesystems using the cache: incremented in common_fs_bs_init(), decremented in unload_cb(). */
static int g_fs_count = 0;
/* Held while g_fs_count is changed and the cache is created or freed. */
static pthread_mutex_t g_cache_init_lock = PTHREAD_MUTEX_INITIALIZER;
/* Initialized in __initialize_cache(); presumably guards g_caches (users are outside this excerpt). */
static pthread_spinlock_t g_caches_lock;
62 
/*
 * Completion callback that wakes a waiter.
 *
 * arg is the sem_t to post; the status code is ignored.
 */
static void
__sem_post(void *arg, int bserrno)
{
	sem_post((sem_t *)arg);
}
70 
71 void
72 spdk_cache_buffer_free(struct cache_buffer *cache_buffer)
73 {
74 	spdk_mempool_put(g_cache_pool, cache_buffer->buf);
75 	free(cache_buffer);
76 }
77 
/* 128 KiB. NOTE(review): presumably the sequential-read byte count that triggers readahead; users are outside this excerpt. */
#define CACHE_READAHEAD_THRESHOLD	(128 * 1024)
79 
/* In-memory state of one blobfs file. Allocated by file_alloc(). */
struct spdk_file {
	/* Owning filesystem (set in file_alloc). */
	struct spdk_filesystem	*fs;
	/* Blob handle; assigned in fs_open_blob_done(), expected NULL before the first open. */
	struct spdk_blob	*blob;
	/* Heap-allocated (strdup) file name; compared by fs_find_file(). */
	char			*name;
	/* File length; loaded from the blob's "length" xattr in iter_cb(). */
	uint64_t		length;
	/* When true, spdk_fs_open_file_async() fails with -ENOENT. */
	bool                    is_deleted;
	bool			open_for_writing;
	/* Set equal to length when the file is loaded from disk. */
	uint64_t		length_flushed;
	/* Set equal to length at load; spdk_fs_file_stat_async() reports max(append_pos, length). */
	uint64_t		append_pos;
	uint64_t		seq_byte_count;
	uint64_t		next_seq_offset;
	/* Initialized to SPDK_FILE_PRIORITY_LOW in file_alloc(). */
	uint32_t		priority;
	/* Link in spdk_filesystem.files. */
	TAILQ_ENTRY(spdk_file)	tailq;
	/* Id of the backing blob. */
	spdk_blob_id		blobid;
	/* Incremented per open request in fs_open_blob_create_cb(); the first open opens the blob. */
	uint32_t		ref_count;
	/* Initialized in file_alloc(); what it guards is not visible in this excerpt. */
	pthread_spinlock_t	lock;
	struct cache_buffer	*last;
	/* Allocated zeroed in file_alloc(). */
	struct cache_tree	*tree;
	/* Open requests waiting for the blob open to finish; drained by fs_open_blob_done(). */
	TAILQ_HEAD(open_requests_head, spdk_fs_request) open_requests;
	TAILQ_HEAD(sync_requests_head, spdk_fs_request) sync_requests;
	/* NOTE(review): presumably the link used by the global g_caches list - confirm. */
	TAILQ_ENTRY(spdk_file)	cache_tailq;
};
102 
/*
 * Record of a blob that carried the "is_deleted" xattr at load time.
 * Queued by iter_cb() and consumed by _handle_deleted_files(), which
 * deletes the blob.
 */
struct spdk_deleted_file {
	/* Id of the blob to delete. */
	spdk_blob_id	id;
	/* Link in spdk_fs_cb_args.op.fs_load.deleted_files. */
	TAILQ_ENTRY(spdk_deleted_file)	tailq;
};
107 
/*
 * One blobfs instance on top of a blobstore.
 *
 * The addresses of md_target, sync_target and io_target are registered as
 * io_devices in fs_alloc(); the channel-create callbacks use
 * SPDK_CONTAINEROF on those addresses to get back to this structure.
 */
struct spdk_filesystem {
	/* Underlying blobstore; set in common_fs_bs_init(). */
	struct spdk_blob_store	*bs;
	/* All files known to this filesystem (linked through spdk_file.tailq). */
	TAILQ_HEAD(, spdk_file)	files;
	/* Only cluster_sz is filled in within this excerpt (common_fs_bs_init). */
	struct spdk_bs_opts	bs_opts;
	/* Block device passed to spdk_fs_init()/spdk_fs_load(). */
	struct spdk_bs_dev	*bdev;
	/* Caller-supplied function used by the synchronous API to dispatch work. */
	fs_send_request_fn	send_request;

	struct {
		/* Number of requests preallocated per channel (512, set in fs_alloc). */
		uint32_t		max_ops;
		struct spdk_io_channel	*sync_io_channel;
		/* Context of sync_io_channel. */
		struct spdk_fs_channel	*sync_fs_channel;
	} sync_target;

	struct {
		/* Number of requests preallocated per channel (512, set in fs_alloc). */
		uint32_t		max_ops;
		struct spdk_io_channel	*md_io_channel;
		/* Context of md_io_channel; metadata requests are allocated from it. */
		struct spdk_fs_channel	*md_fs_channel;
	} md_target;

	struct {
		/* Number of requests preallocated per channel (512, set in fs_alloc). */
		uint32_t		max_ops;
	} io_target;
};
131 
/*
 * Per-request state carried through asynchronous operations.
 *
 * fn holds the completion callback for the operation in flight; op holds
 * the operation-specific arguments. Only one member of each union is
 * meaningful for a given request.
 */
struct spdk_fs_cb_args {
	union {
		spdk_fs_op_with_handle_complete		fs_op_with_handle;
		spdk_fs_op_complete			fs_op;
		spdk_file_op_with_handle_complete	file_op_with_handle;
		spdk_file_op_complete			file_op;
		spdk_file_stat_op_complete		stat_op;
	} fn;
	/* Argument passed to the callback in fn. */
	void *arg;
	/* Semaphore posted on completion when a synchronous wrapper is waiting. */
	sem_t *sem;
	struct spdk_filesystem *fs;
	struct spdk_file *file;
	/* Result code handed back to the synchronous caller. */
	int rc;
	/* Set to true by alloc_fs_request(). */
	bool from_request;
	union {
		struct {
			/* Blobs found with the "is_deleted" xattr during load. */
			TAILQ_HEAD(, spdk_deleted_file)	deleted_files;
		} fs_load;
		struct {
			uint64_t	length;
		} truncate;
		struct {
			struct spdk_io_channel	*channel;
			void		*user_buf;
			void		*pin_buf;
			int		is_read;
			off_t		offset;
			size_t		length;
			uint64_t	start_page;
			uint64_t	num_pages;
			uint32_t	blocklen;
		} rw;
		struct {
			const char	*old_name;
			const char	*new_name;
		} rename;
		struct {
			struct cache_buffer	*cache_buffer;
			uint64_t		length;
		} flush;
		struct {
			struct cache_buffer	*cache_buffer;
			uint64_t		length;
			uint64_t		offset;
		} readahead;
		struct {
			uint64_t			offset;
			/* NOTE(review): presumably links into spdk_file.sync_requests - confirm. */
			TAILQ_ENTRY(spdk_fs_request)	tailq;
			bool				xattr_in_progress;
		} sync;
		struct {
			uint32_t			num_clusters;
		} resize;
		struct {
			/* Caller-owned name string; not copied. */
			const char	*name;
			uint32_t	flags;
			/* Link in spdk_file.open_requests. */
			TAILQ_ENTRY(spdk_fs_request)	tailq;
		} open;
		struct {
			/* Caller-owned name string; not copied. */
			const char		*name;
			/* Blob opened during file creation. */
			struct spdk_blob	*blob;
		} create;
		struct {
			const char	*name;
		} delete;
		struct {
			const char	*name;
		} stat;
	} op;
};
202 
/* Forward declaration; the definition is outside this excerpt. */
static void cache_free_buffers(struct spdk_file *file);
204 
205 void
206 spdk_fs_opts_init(struct spdk_blobfs_opts *opts)
207 {
208 	opts->cluster_sz = SPDK_BLOBFS_DEFAULT_OPTS_CLUSTER_SZ;
209 }
210 
211 static void
212 __initialize_cache(void)
213 {
214 	assert(g_cache_pool == NULL);
215 
216 	g_cache_pool = spdk_mempool_create("spdk_fs_cache",
217 					   g_fs_cache_size / CACHE_BUFFER_SIZE,
218 					   CACHE_BUFFER_SIZE,
219 					   SPDK_MEMPOOL_DEFAULT_CACHE_SIZE,
220 					   SPDK_ENV_SOCKET_ID_ANY);
221 	if (!g_cache_pool) {
222 		SPDK_ERRLOG("Create mempool failed, you may "
223 			    "increase the memory and try again\n");
224 		assert(false);
225 	}
226 	TAILQ_INIT(&g_caches);
227 	pthread_spin_init(&g_caches_lock, 0);
228 }
229 
230 static void
231 __free_cache(void)
232 {
233 	assert(g_cache_pool != NULL);
234 
235 	spdk_mempool_free(g_cache_pool);
236 	g_cache_pool = NULL;
237 }
238 
239 static uint64_t
240 __file_get_blob_size(struct spdk_file *file)
241 {
242 	uint64_t cluster_sz;
243 
244 	cluster_sz = file->fs->bs_opts.cluster_sz;
245 	return cluster_sz * spdk_blob_get_num_clusters(file->blob);
246 }
247 
/* One preallocated request slot belonging to a spdk_fs_channel. */
struct spdk_fs_request {
	/* Operation state; kept as the first member. */
	struct spdk_fs_cb_args		args;
	/* Link in the owning channel's free list (spdk_fs_channel.reqs). */
	TAILQ_ENTRY(spdk_fs_request)	link;
	/* Channel the request was allocated from; free_fs_request() returns it there. */
	struct spdk_fs_channel		*channel;
};
253 
/* Per-channel context: a fixed pool of requests plus what is needed to dispatch them. */
struct spdk_fs_channel {
	/* Backing array for all requests of this channel (calloc'ed in _spdk_fs_channel_create). */
	struct spdk_fs_request		*req_mem;
	/* Free list of requests. */
	TAILQ_HEAD(, spdk_fs_request)	reqs;
	/* Semaphore the synchronous API waits on until the operation completes. */
	sem_t				sem;
	struct spdk_filesystem		*fs;
	/* Blobstore I/O channel; released in _spdk_fs_channel_destroy() when non-NULL. */
	struct spdk_io_channel		*bs_channel;
	fs_send_request_fn		send_request;
	/* When true, alloc/free of requests take 'lock'. */
	bool				sync;
	/* Protects 'reqs' when 'sync' is set; its initialization is not visible in this excerpt. */
	pthread_spinlock_t		lock;
};
264 
265 static struct spdk_fs_request *
266 alloc_fs_request(struct spdk_fs_channel *channel)
267 {
268 	struct spdk_fs_request *req;
269 
270 	if (channel->sync) {
271 		pthread_spin_lock(&channel->lock);
272 	}
273 
274 	req = TAILQ_FIRST(&channel->reqs);
275 	if (req) {
276 		TAILQ_REMOVE(&channel->reqs, req, link);
277 	}
278 
279 	if (channel->sync) {
280 		pthread_spin_unlock(&channel->lock);
281 	}
282 
283 	if (req == NULL) {
284 		return NULL;
285 	}
286 	memset(req, 0, sizeof(*req));
287 	req->channel = channel;
288 	req->args.from_request = true;
289 
290 	return req;
291 }
292 
293 static void
294 free_fs_request(struct spdk_fs_request *req)
295 {
296 	struct spdk_fs_channel *channel = req->channel;
297 
298 	if (channel->sync) {
299 		pthread_spin_lock(&channel->lock);
300 	}
301 
302 	TAILQ_INSERT_HEAD(&req->channel->reqs, req, link);
303 
304 	if (channel->sync) {
305 		pthread_spin_unlock(&channel->lock);
306 	}
307 }
308 
309 static int
310 _spdk_fs_channel_create(struct spdk_filesystem *fs, struct spdk_fs_channel *channel,
311 			uint32_t max_ops)
312 {
313 	uint32_t i;
314 
315 	channel->req_mem = calloc(max_ops, sizeof(struct spdk_fs_request));
316 	if (!channel->req_mem) {
317 		return -1;
318 	}
319 
320 	TAILQ_INIT(&channel->reqs);
321 	sem_init(&channel->sem, 0, 0);
322 
323 	for (i = 0; i < max_ops; i++) {
324 		TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link);
325 	}
326 
327 	channel->fs = fs;
328 
329 	return 0;
330 }
331 
332 static int
333 _spdk_fs_md_channel_create(void *io_device, void *ctx_buf)
334 {
335 	struct spdk_filesystem		*fs;
336 	struct spdk_fs_channel		*channel = ctx_buf;
337 
338 	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, md_target);
339 
340 	return _spdk_fs_channel_create(fs, channel, fs->md_target.max_ops);
341 }
342 
343 static int
344 _spdk_fs_sync_channel_create(void *io_device, void *ctx_buf)
345 {
346 	struct spdk_filesystem		*fs;
347 	struct spdk_fs_channel		*channel = ctx_buf;
348 
349 	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, sync_target);
350 
351 	return _spdk_fs_channel_create(fs, channel, fs->sync_target.max_ops);
352 }
353 
354 static int
355 _spdk_fs_io_channel_create(void *io_device, void *ctx_buf)
356 {
357 	struct spdk_filesystem		*fs;
358 	struct spdk_fs_channel		*channel = ctx_buf;
359 
360 	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, io_target);
361 
362 	return _spdk_fs_channel_create(fs, channel, fs->io_target.max_ops);
363 }
364 
365 static void
366 _spdk_fs_channel_destroy(void *io_device, void *ctx_buf)
367 {
368 	struct spdk_fs_channel *channel = ctx_buf;
369 
370 	free(channel->req_mem);
371 	if (channel->bs_channel != NULL) {
372 		spdk_bs_free_io_channel(channel->bs_channel);
373 	}
374 }
375 
376 static void
377 __send_request_direct(fs_request_fn fn, void *arg)
378 {
379 	fn(arg);
380 }
381 
382 static void
383 common_fs_bs_init(struct spdk_filesystem *fs, struct spdk_blob_store *bs)
384 {
385 	fs->bs = bs;
386 	fs->bs_opts.cluster_sz = spdk_bs_get_cluster_size(bs);
387 	fs->md_target.md_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
388 	fs->md_target.md_fs_channel->send_request = __send_request_direct;
389 	fs->sync_target.sync_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
390 	fs->sync_target.sync_fs_channel->send_request = __send_request_direct;
391 
392 	pthread_mutex_lock(&g_cache_init_lock);
393 	if (g_fs_count == 0) {
394 		__initialize_cache();
395 	}
396 	g_fs_count++;
397 	pthread_mutex_unlock(&g_cache_init_lock);
398 }
399 
400 static void
401 init_cb(void *ctx, struct spdk_blob_store *bs, int bserrno)
402 {
403 	struct spdk_fs_request *req = ctx;
404 	struct spdk_fs_cb_args *args = &req->args;
405 	struct spdk_filesystem *fs = args->fs;
406 
407 	if (bserrno == 0) {
408 		common_fs_bs_init(fs, bs);
409 	} else {
410 		free(fs);
411 		fs = NULL;
412 	}
413 
414 	args->fn.fs_op_with_handle(args->arg, fs, bserrno);
415 	free_fs_request(req);
416 }
417 
418 static void
419 fs_conf_parse(void)
420 {
421 	struct spdk_conf_section *sp;
422 
423 	sp = spdk_conf_find_section(NULL, "Blobfs");
424 	if (sp == NULL) {
425 		g_fs_cache_buffer_shift = CACHE_BUFFER_SHIFT_DEFAULT;
426 		return;
427 	}
428 
429 	g_fs_cache_buffer_shift = spdk_conf_section_get_intval(sp, "CacheBufferShift");
430 	if (g_fs_cache_buffer_shift <= 0) {
431 		g_fs_cache_buffer_shift = CACHE_BUFFER_SHIFT_DEFAULT;
432 	}
433 }
434 
435 static struct spdk_filesystem *
436 fs_alloc(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn)
437 {
438 	struct spdk_filesystem *fs;
439 
440 	fs = calloc(1, sizeof(*fs));
441 	if (fs == NULL) {
442 		return NULL;
443 	}
444 
445 	fs->bdev = dev;
446 	fs->send_request = send_request_fn;
447 	TAILQ_INIT(&fs->files);
448 
449 	fs->md_target.max_ops = 512;
450 	spdk_io_device_register(&fs->md_target, _spdk_fs_md_channel_create, _spdk_fs_channel_destroy,
451 				sizeof(struct spdk_fs_channel));
452 	fs->md_target.md_io_channel = spdk_get_io_channel(&fs->md_target);
453 	fs->md_target.md_fs_channel = spdk_io_channel_get_ctx(fs->md_target.md_io_channel);
454 
455 	fs->sync_target.max_ops = 512;
456 	spdk_io_device_register(&fs->sync_target, _spdk_fs_sync_channel_create, _spdk_fs_channel_destroy,
457 				sizeof(struct spdk_fs_channel));
458 	fs->sync_target.sync_io_channel = spdk_get_io_channel(&fs->sync_target);
459 	fs->sync_target.sync_fs_channel = spdk_io_channel_get_ctx(fs->sync_target.sync_io_channel);
460 
461 	fs->io_target.max_ops = 512;
462 	spdk_io_device_register(&fs->io_target, _spdk_fs_io_channel_create, _spdk_fs_channel_destroy,
463 				sizeof(struct spdk_fs_channel));
464 
465 	return fs;
466 }
467 
468 void
469 spdk_fs_init(struct spdk_bs_dev *dev, struct spdk_blobfs_opts *opt,
470 	     fs_send_request_fn send_request_fn,
471 	     spdk_fs_op_with_handle_complete cb_fn, void *cb_arg)
472 {
473 	struct spdk_filesystem *fs;
474 	struct spdk_fs_request *req;
475 	struct spdk_fs_cb_args *args;
476 	struct spdk_bs_opts opts = {};
477 
478 	fs = fs_alloc(dev, send_request_fn);
479 	if (fs == NULL) {
480 		cb_fn(cb_arg, NULL, -ENOMEM);
481 		return;
482 	}
483 
484 	fs_conf_parse();
485 
486 	req = alloc_fs_request(fs->md_target.md_fs_channel);
487 	if (req == NULL) {
488 		spdk_put_io_channel(fs->md_target.md_io_channel);
489 		spdk_io_device_unregister(&fs->md_target, NULL);
490 		spdk_put_io_channel(fs->sync_target.sync_io_channel);
491 		spdk_io_device_unregister(&fs->sync_target, NULL);
492 		spdk_io_device_unregister(&fs->io_target, NULL);
493 		free(fs);
494 		cb_fn(cb_arg, NULL, -ENOMEM);
495 		return;
496 	}
497 
498 	args = &req->args;
499 	args->fn.fs_op_with_handle = cb_fn;
500 	args->arg = cb_arg;
501 	args->fs = fs;
502 
503 	spdk_bs_opts_init(&opts);
504 	snprintf(opts.bstype.bstype, sizeof(opts.bstype.bstype), "BLOBFS");
505 	if (opt) {
506 		opts.cluster_sz = opt->cluster_sz;
507 	}
508 	spdk_bs_init(dev, &opts, init_cb, req);
509 }
510 
511 static struct spdk_file *
512 file_alloc(struct spdk_filesystem *fs)
513 {
514 	struct spdk_file *file;
515 
516 	file = calloc(1, sizeof(*file));
517 	if (file == NULL) {
518 		return NULL;
519 	}
520 
521 	file->tree = calloc(1, sizeof(*file->tree));
522 	if (file->tree == NULL) {
523 		free(file);
524 		return NULL;
525 	}
526 
527 	file->fs = fs;
528 	TAILQ_INIT(&file->open_requests);
529 	TAILQ_INIT(&file->sync_requests);
530 	pthread_spin_init(&file->lock, 0);
531 	TAILQ_INSERT_TAIL(&fs->files, file, tailq);
532 	file->priority = SPDK_FILE_PRIORITY_LOW;
533 	return file;
534 }
535 
536 static void fs_load_done(void *ctx, int bserrno);
537 
538 static int
539 _handle_deleted_files(struct spdk_fs_request *req)
540 {
541 	struct spdk_fs_cb_args *args = &req->args;
542 	struct spdk_filesystem *fs = args->fs;
543 
544 	if (!TAILQ_EMPTY(&args->op.fs_load.deleted_files)) {
545 		struct spdk_deleted_file *deleted_file;
546 
547 		deleted_file = TAILQ_FIRST(&args->op.fs_load.deleted_files);
548 		TAILQ_REMOVE(&args->op.fs_load.deleted_files, deleted_file, tailq);
549 		spdk_bs_delete_blob(fs->bs, deleted_file->id, fs_load_done, req);
550 		free(deleted_file);
551 		return 0;
552 	}
553 
554 	return 1;
555 }
556 
557 static void
558 fs_load_done(void *ctx, int bserrno)
559 {
560 	struct spdk_fs_request *req = ctx;
561 	struct spdk_fs_cb_args *args = &req->args;
562 	struct spdk_filesystem *fs = args->fs;
563 
564 	/* The filesystem has been loaded.  Now check if there are any files that
565 	 *  were marked for deletion before last unload.  Do not complete the
566 	 *  fs_load callback until all of them have been deleted on disk.
567 	 */
568 	if (_handle_deleted_files(req) == 0) {
569 		/* We found a file that's been marked for deleting but not actually
570 		 *  deleted yet.  This function will get called again once the delete
571 		 *  operation is completed.
572 		 */
573 		return;
574 	}
575 
576 	args->fn.fs_op_with_handle(args->arg, fs, 0);
577 	free_fs_request(req);
578 
579 }
580 
581 static void
582 iter_cb(void *ctx, struct spdk_blob *blob, int rc)
583 {
584 	struct spdk_fs_request *req = ctx;
585 	struct spdk_fs_cb_args *args = &req->args;
586 	struct spdk_filesystem *fs = args->fs;
587 	uint64_t *length;
588 	const char *name;
589 	uint32_t *is_deleted;
590 	size_t value_len;
591 
592 	if (rc < 0) {
593 		args->fn.fs_op_with_handle(args->arg, fs, rc);
594 		free_fs_request(req);
595 		return;
596 	}
597 
598 	rc = spdk_blob_get_xattr_value(blob, "name", (const void **)&name, &value_len);
599 	if (rc < 0) {
600 		args->fn.fs_op_with_handle(args->arg, fs, rc);
601 		free_fs_request(req);
602 		return;
603 	}
604 
605 	rc = spdk_blob_get_xattr_value(blob, "length", (const void **)&length, &value_len);
606 	if (rc < 0) {
607 		args->fn.fs_op_with_handle(args->arg, fs, rc);
608 		free_fs_request(req);
609 		return;
610 	}
611 
612 	assert(value_len == 8);
613 
614 	/* This file could be deleted last time without close it, then app crashed, so we delete it now */
615 	rc = spdk_blob_get_xattr_value(blob, "is_deleted", (const void **)&is_deleted, &value_len);
616 	if (rc < 0) {
617 		struct spdk_file *f;
618 
619 		f = file_alloc(fs);
620 		if (f == NULL) {
621 			args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM);
622 			free_fs_request(req);
623 			return;
624 		}
625 
626 		f->name = strdup(name);
627 		f->blobid = spdk_blob_get_id(blob);
628 		f->length = *length;
629 		f->length_flushed = *length;
630 		f->append_pos = *length;
631 		SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "added file %s length=%ju\n", f->name, f->length);
632 	} else {
633 		struct spdk_deleted_file *deleted_file;
634 
635 		deleted_file = calloc(1, sizeof(*deleted_file));
636 		if (deleted_file == NULL) {
637 			args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM);
638 			free_fs_request(req);
639 			return;
640 		}
641 		deleted_file->id = spdk_blob_get_id(blob);
642 		TAILQ_INSERT_TAIL(&args->op.fs_load.deleted_files, deleted_file, tailq);
643 	}
644 }
645 
646 static void
647 load_cb(void *ctx, struct spdk_blob_store *bs, int bserrno)
648 {
649 	struct spdk_fs_request *req = ctx;
650 	struct spdk_fs_cb_args *args = &req->args;
651 	struct spdk_filesystem *fs = args->fs;
652 	struct spdk_bs_type bstype;
653 	static const struct spdk_bs_type blobfs_type = {"BLOBFS"};
654 	static const struct spdk_bs_type zeros;
655 
656 	if (bserrno != 0) {
657 		args->fn.fs_op_with_handle(args->arg, NULL, bserrno);
658 		free_fs_request(req);
659 		free(fs);
660 		return;
661 	}
662 
663 	bstype = spdk_bs_get_bstype(bs);
664 
665 	if (!memcmp(&bstype, &zeros, sizeof(bstype))) {
666 		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "assigning bstype\n");
667 		spdk_bs_set_bstype(bs, blobfs_type);
668 	} else if (memcmp(&bstype, &blobfs_type, sizeof(bstype))) {
669 		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "not blobfs\n");
670 		SPDK_TRACEDUMP(SPDK_LOG_BLOB, "bstype", &bstype, sizeof(bstype));
671 		args->fn.fs_op_with_handle(args->arg, NULL, bserrno);
672 		free_fs_request(req);
673 		free(fs);
674 		return;
675 	}
676 
677 	common_fs_bs_init(fs, bs);
678 	fs_load_done(req, 0);
679 }
680 
681 static void
682 spdk_fs_io_device_unregister(struct spdk_filesystem *fs)
683 {
684 	assert(fs != NULL);
685 	spdk_io_device_unregister(&fs->md_target, NULL);
686 	spdk_io_device_unregister(&fs->sync_target, NULL);
687 	spdk_io_device_unregister(&fs->io_target, NULL);
688 	free(fs);
689 }
690 
691 static void
692 spdk_fs_free_io_channels(struct spdk_filesystem *fs)
693 {
694 	assert(fs != NULL);
695 	spdk_fs_free_io_channel(fs->md_target.md_io_channel);
696 	spdk_fs_free_io_channel(fs->sync_target.sync_io_channel);
697 }
698 
699 void
700 spdk_fs_load(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn,
701 	     spdk_fs_op_with_handle_complete cb_fn, void *cb_arg)
702 {
703 	struct spdk_filesystem *fs;
704 	struct spdk_fs_cb_args *args;
705 	struct spdk_fs_request *req;
706 	struct spdk_bs_opts	bs_opts;
707 
708 	fs = fs_alloc(dev, send_request_fn);
709 	if (fs == NULL) {
710 		cb_fn(cb_arg, NULL, -ENOMEM);
711 		return;
712 	}
713 
714 	fs_conf_parse();
715 
716 	req = alloc_fs_request(fs->md_target.md_fs_channel);
717 	if (req == NULL) {
718 		spdk_fs_free_io_channels(fs);
719 		spdk_fs_io_device_unregister(fs);
720 		cb_fn(cb_arg, NULL, -ENOMEM);
721 		return;
722 	}
723 
724 	args = &req->args;
725 	args->fn.fs_op_with_handle = cb_fn;
726 	args->arg = cb_arg;
727 	args->fs = fs;
728 	TAILQ_INIT(&args->op.fs_load.deleted_files);
729 	spdk_bs_opts_init(&bs_opts);
730 	bs_opts.iter_cb_fn = iter_cb;
731 	bs_opts.iter_cb_arg = req;
732 	spdk_bs_load(dev, &bs_opts, load_cb, req);
733 }
734 
735 static void
736 unload_cb(void *ctx, int bserrno)
737 {
738 	struct spdk_fs_request *req = ctx;
739 	struct spdk_fs_cb_args *args = &req->args;
740 	struct spdk_filesystem *fs = args->fs;
741 
742 	pthread_mutex_lock(&g_cache_init_lock);
743 	g_fs_count--;
744 	if (g_fs_count == 0) {
745 		__free_cache();
746 	}
747 	pthread_mutex_unlock(&g_cache_init_lock);
748 
749 	args->fn.fs_op(args->arg, bserrno);
750 	free(req);
751 
752 	spdk_fs_io_device_unregister(fs);
753 }
754 
755 void
756 spdk_fs_unload(struct spdk_filesystem *fs, spdk_fs_op_complete cb_fn, void *cb_arg)
757 {
758 	struct spdk_fs_request *req;
759 	struct spdk_fs_cb_args *args;
760 
761 	/*
762 	 * We must free the md_channel before unloading the blobstore, so just
763 	 *  allocate this request from the general heap.
764 	 */
765 	req = calloc(1, sizeof(*req));
766 	if (req == NULL) {
767 		cb_fn(cb_arg, -ENOMEM);
768 		return;
769 	}
770 
771 	args = &req->args;
772 	args->fn.fs_op = cb_fn;
773 	args->arg = cb_arg;
774 	args->fs = fs;
775 
776 	spdk_fs_free_io_channels(fs);
777 	spdk_bs_unload(fs->bs, unload_cb, req);
778 }
779 
780 static struct spdk_file *
781 fs_find_file(struct spdk_filesystem *fs, const char *name)
782 {
783 	struct spdk_file *file;
784 
785 	TAILQ_FOREACH(file, &fs->files, tailq) {
786 		if (!strncmp(name, file->name, SPDK_FILE_NAME_MAX)) {
787 			return file;
788 		}
789 	}
790 
791 	return NULL;
792 }
793 
794 void
795 spdk_fs_file_stat_async(struct spdk_filesystem *fs, const char *name,
796 			spdk_file_stat_op_complete cb_fn, void *cb_arg)
797 {
798 	struct spdk_file_stat stat;
799 	struct spdk_file *f = NULL;
800 
801 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
802 		cb_fn(cb_arg, NULL, -ENAMETOOLONG);
803 		return;
804 	}
805 
806 	f = fs_find_file(fs, name);
807 	if (f != NULL) {
808 		stat.blobid = f->blobid;
809 		stat.size = f->append_pos >= f->length ? f->append_pos : f->length;
810 		cb_fn(cb_arg, &stat, 0);
811 		return;
812 	}
813 
814 	cb_fn(cb_arg, NULL, -ENOENT);
815 }
816 
817 static void
818 __copy_stat(void *arg, struct spdk_file_stat *stat, int fserrno)
819 {
820 	struct spdk_fs_request *req = arg;
821 	struct spdk_fs_cb_args *args = &req->args;
822 
823 	args->rc = fserrno;
824 	if (fserrno == 0) {
825 		memcpy(args->arg, stat, sizeof(*stat));
826 	}
827 	sem_post(args->sem);
828 }
829 
830 static void
831 __file_stat(void *arg)
832 {
833 	struct spdk_fs_request *req = arg;
834 	struct spdk_fs_cb_args *args = &req->args;
835 
836 	spdk_fs_file_stat_async(args->fs, args->op.stat.name,
837 				args->fn.stat_op, req);
838 }
839 
840 int
841 spdk_fs_file_stat(struct spdk_filesystem *fs, struct spdk_io_channel *_channel,
842 		  const char *name, struct spdk_file_stat *stat)
843 {
844 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
845 	struct spdk_fs_request *req;
846 	int rc;
847 
848 	req = alloc_fs_request(channel);
849 	if (req == NULL) {
850 		return -ENOMEM;
851 	}
852 
853 	req->args.fs = fs;
854 	req->args.op.stat.name = name;
855 	req->args.fn.stat_op = __copy_stat;
856 	req->args.arg = stat;
857 	req->args.sem = &channel->sem;
858 	channel->send_request(__file_stat, req);
859 	sem_wait(&channel->sem);
860 
861 	rc = req->args.rc;
862 	free_fs_request(req);
863 
864 	return rc;
865 }
866 
867 static void
868 fs_create_blob_close_cb(void *ctx, int bserrno)
869 {
870 	struct spdk_fs_request *req = ctx;
871 	struct spdk_fs_cb_args *args = &req->args;
872 
873 	args->fn.file_op(args->arg, bserrno);
874 	free_fs_request(req);
875 }
876 
877 static void
878 fs_create_blob_resize_cb(void *ctx, int bserrno)
879 {
880 	struct spdk_fs_request *req = ctx;
881 	struct spdk_fs_cb_args *args = &req->args;
882 	struct spdk_file *f = args->file;
883 	struct spdk_blob *blob = args->op.create.blob;
884 	uint64_t length = 0;
885 
886 	spdk_blob_set_xattr(blob, "name", f->name, strlen(f->name) + 1);
887 	spdk_blob_set_xattr(blob, "length", &length, sizeof(length));
888 
889 	spdk_blob_close(blob, fs_create_blob_close_cb, args);
890 }
891 
892 static void
893 fs_create_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno)
894 {
895 	struct spdk_fs_request *req = ctx;
896 	struct spdk_fs_cb_args *args = &req->args;
897 
898 	args->op.create.blob = blob;
899 	spdk_blob_resize(blob, 1, fs_create_blob_resize_cb, req);
900 }
901 
902 static void
903 fs_create_blob_create_cb(void *ctx, spdk_blob_id blobid, int bserrno)
904 {
905 	struct spdk_fs_request *req = ctx;
906 	struct spdk_fs_cb_args *args = &req->args;
907 	struct spdk_file *f = args->file;
908 
909 	f->blobid = blobid;
910 	spdk_bs_open_blob(f->fs->bs, blobid, fs_create_blob_open_cb, req);
911 }
912 
913 void
914 spdk_fs_create_file_async(struct spdk_filesystem *fs, const char *name,
915 			  spdk_file_op_complete cb_fn, void *cb_arg)
916 {
917 	struct spdk_file *file;
918 	struct spdk_fs_request *req;
919 	struct spdk_fs_cb_args *args;
920 
921 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
922 		cb_fn(cb_arg, -ENAMETOOLONG);
923 		return;
924 	}
925 
926 	file = fs_find_file(fs, name);
927 	if (file != NULL) {
928 		cb_fn(cb_arg, -EEXIST);
929 		return;
930 	}
931 
932 	file = file_alloc(fs);
933 	if (file == NULL) {
934 		cb_fn(cb_arg, -ENOMEM);
935 		return;
936 	}
937 
938 	req = alloc_fs_request(fs->md_target.md_fs_channel);
939 	if (req == NULL) {
940 		cb_fn(cb_arg, -ENOMEM);
941 		return;
942 	}
943 
944 	args = &req->args;
945 	args->file = file;
946 	args->fn.file_op = cb_fn;
947 	args->arg = cb_arg;
948 
949 	file->name = strdup(name);
950 	spdk_bs_create_blob(fs->bs, fs_create_blob_create_cb, args);
951 }
952 
953 static void
954 __fs_create_file_done(void *arg, int fserrno)
955 {
956 	struct spdk_fs_request *req = arg;
957 	struct spdk_fs_cb_args *args = &req->args;
958 
959 	args->rc = fserrno;
960 	sem_post(args->sem);
961 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.create.name);
962 }
963 
964 static void
965 __fs_create_file(void *arg)
966 {
967 	struct spdk_fs_request *req = arg;
968 	struct spdk_fs_cb_args *args = &req->args;
969 
970 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.create.name);
971 	spdk_fs_create_file_async(args->fs, args->op.create.name, __fs_create_file_done, req);
972 }
973 
974 int
975 spdk_fs_create_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, const char *name)
976 {
977 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
978 	struct spdk_fs_request *req;
979 	struct spdk_fs_cb_args *args;
980 	int rc;
981 
982 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name);
983 
984 	req = alloc_fs_request(channel);
985 	if (req == NULL) {
986 		return -ENOMEM;
987 	}
988 
989 	args = &req->args;
990 	args->fs = fs;
991 	args->op.create.name = name;
992 	args->sem = &channel->sem;
993 	fs->send_request(__fs_create_file, req);
994 	sem_wait(&channel->sem);
995 	rc = args->rc;
996 	free_fs_request(req);
997 
998 	return rc;
999 }
1000 
1001 static void
1002 fs_open_blob_done(void *ctx, struct spdk_blob *blob, int bserrno)
1003 {
1004 	struct spdk_fs_request *req = ctx;
1005 	struct spdk_fs_cb_args *args = &req->args;
1006 	struct spdk_file *f = args->file;
1007 
1008 	f->blob = blob;
1009 	while (!TAILQ_EMPTY(&f->open_requests)) {
1010 		req = TAILQ_FIRST(&f->open_requests);
1011 		args = &req->args;
1012 		TAILQ_REMOVE(&f->open_requests, req, args.op.open.tailq);
1013 		args->fn.file_op_with_handle(args->arg, f, bserrno);
1014 		free_fs_request(req);
1015 	}
1016 }
1017 
1018 static void
1019 fs_open_blob_create_cb(void *ctx, int bserrno)
1020 {
1021 	struct spdk_fs_request *req = ctx;
1022 	struct spdk_fs_cb_args *args = &req->args;
1023 	struct spdk_file *file = args->file;
1024 	struct spdk_filesystem *fs = args->fs;
1025 
1026 	if (file == NULL) {
1027 		/*
1028 		 * This is from an open with CREATE flag - the file
1029 		 *  is now created so look it up in the file list for this
1030 		 *  filesystem.
1031 		 */
1032 		file = fs_find_file(fs, args->op.open.name);
1033 		assert(file != NULL);
1034 		args->file = file;
1035 	}
1036 
1037 	file->ref_count++;
1038 	TAILQ_INSERT_TAIL(&file->open_requests, req, args.op.open.tailq);
1039 	if (file->ref_count == 1) {
1040 		assert(file->blob == NULL);
1041 		spdk_bs_open_blob(fs->bs, file->blobid, fs_open_blob_done, req);
1042 	} else if (file->blob != NULL) {
1043 		fs_open_blob_done(req, file->blob, 0);
1044 	} else {
1045 		/*
1046 		 * The blob open for this file is in progress due to a previous
1047 		 *  open request.  When that open completes, it will invoke the
1048 		 *  open callback for this request.
1049 		 */
1050 	}
1051 }
1052 
1053 void
1054 spdk_fs_open_file_async(struct spdk_filesystem *fs, const char *name, uint32_t flags,
1055 			spdk_file_op_with_handle_complete cb_fn, void *cb_arg)
1056 {
1057 	struct spdk_file *f = NULL;
1058 	struct spdk_fs_request *req;
1059 	struct spdk_fs_cb_args *args;
1060 
1061 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
1062 		cb_fn(cb_arg, NULL, -ENAMETOOLONG);
1063 		return;
1064 	}
1065 
1066 	f = fs_find_file(fs, name);
1067 	if (f == NULL && !(flags & SPDK_BLOBFS_OPEN_CREATE)) {
1068 		cb_fn(cb_arg, NULL, -ENOENT);
1069 		return;
1070 	}
1071 
1072 	if (f != NULL && f->is_deleted == true) {
1073 		cb_fn(cb_arg, NULL, -ENOENT);
1074 		return;
1075 	}
1076 
1077 	req = alloc_fs_request(fs->md_target.md_fs_channel);
1078 	if (req == NULL) {
1079 		cb_fn(cb_arg, NULL, -ENOMEM);
1080 		return;
1081 	}
1082 
1083 	args = &req->args;
1084 	args->fn.file_op_with_handle = cb_fn;
1085 	args->arg = cb_arg;
1086 	args->file = f;
1087 	args->fs = fs;
1088 	args->op.open.name = name;
1089 
1090 	if (f == NULL) {
1091 		spdk_fs_create_file_async(fs, name, fs_open_blob_create_cb, req);
1092 	} else {
1093 		fs_open_blob_create_cb(req, 0);
1094 	}
1095 }
1096 
1097 static void
1098 __fs_open_file_done(void *arg, struct spdk_file *file, int bserrno)
1099 {
1100 	struct spdk_fs_request *req = arg;
1101 	struct spdk_fs_cb_args *args = &req->args;
1102 
1103 	args->file = file;
1104 	args->rc = bserrno;
1105 	sem_post(args->sem);
1106 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.open.name);
1107 }
1108 
1109 static void
1110 __fs_open_file(void *arg)
1111 {
1112 	struct spdk_fs_request *req = arg;
1113 	struct spdk_fs_cb_args *args = &req->args;
1114 
1115 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.open.name);
1116 	spdk_fs_open_file_async(args->fs, args->op.open.name, args->op.open.flags,
1117 				__fs_open_file_done, req);
1118 }
1119 
1120 int
1121 spdk_fs_open_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel,
1122 		  const char *name, uint32_t flags, struct spdk_file **file)
1123 {
1124 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
1125 	struct spdk_fs_request *req;
1126 	struct spdk_fs_cb_args *args;
1127 	int rc;
1128 
1129 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name);
1130 
1131 	req = alloc_fs_request(channel);
1132 	if (req == NULL) {
1133 		return -ENOMEM;
1134 	}
1135 
1136 	args = &req->args;
1137 	args->fs = fs;
1138 	args->op.open.name = name;
1139 	args->op.open.flags = flags;
1140 	args->sem = &channel->sem;
1141 	fs->send_request(__fs_open_file, req);
1142 	sem_wait(&channel->sem);
1143 	rc = args->rc;
1144 	if (rc == 0) {
1145 		*file = args->file;
1146 	} else {
1147 		*file = NULL;
1148 	}
1149 	free_fs_request(req);
1150 
1151 	return rc;
1152 }
1153 
1154 static void
1155 fs_rename_blob_close_cb(void *ctx, int bserrno)
1156 {
1157 	struct spdk_fs_request *req = ctx;
1158 	struct spdk_fs_cb_args *args = &req->args;
1159 
1160 	args->fn.fs_op(args->arg, bserrno);
1161 	free_fs_request(req);
1162 }
1163 
1164 static void
1165 fs_rename_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno)
1166 {
1167 	struct spdk_fs_request *req = ctx;
1168 	struct spdk_fs_cb_args *args = &req->args;
1169 	const char *new_name = args->op.rename.new_name;
1170 
1171 	spdk_blob_set_xattr(blob, "name", new_name, strlen(new_name) + 1);
1172 	spdk_blob_close(blob, fs_rename_blob_close_cb, req);
1173 }
1174 
1175 static void
1176 __spdk_fs_md_rename_file(struct spdk_fs_request *req)
1177 {
1178 	struct spdk_fs_cb_args *args = &req->args;
1179 	struct spdk_file *f;
1180 
1181 	f = fs_find_file(args->fs, args->op.rename.old_name);
1182 	if (f == NULL) {
1183 		args->fn.fs_op(args->arg, -ENOENT);
1184 		free_fs_request(req);
1185 		return;
1186 	}
1187 
1188 	free(f->name);
1189 	f->name = strdup(args->op.rename.new_name);
1190 	args->file = f;
1191 	spdk_bs_open_blob(args->fs->bs, f->blobid, fs_rename_blob_open_cb, req);
1192 }
1193 
1194 static void
1195 fs_rename_delete_done(void *arg, int fserrno)
1196 {
1197 	__spdk_fs_md_rename_file(arg);
1198 }
1199 
1200 void
1201 spdk_fs_rename_file_async(struct spdk_filesystem *fs,
1202 			  const char *old_name, const char *new_name,
1203 			  spdk_file_op_complete cb_fn, void *cb_arg)
1204 {
1205 	struct spdk_file *f;
1206 	struct spdk_fs_request *req;
1207 	struct spdk_fs_cb_args *args;
1208 
1209 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "old=%s new=%s\n", old_name, new_name);
1210 	if (strnlen(new_name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
1211 		cb_fn(cb_arg, -ENAMETOOLONG);
1212 		return;
1213 	}
1214 
1215 	req = alloc_fs_request(fs->md_target.md_fs_channel);
1216 	if (req == NULL) {
1217 		cb_fn(cb_arg, -ENOMEM);
1218 		return;
1219 	}
1220 
1221 	args = &req->args;
1222 	args->fn.fs_op = cb_fn;
1223 	args->fs = fs;
1224 	args->arg = cb_arg;
1225 	args->op.rename.old_name = old_name;
1226 	args->op.rename.new_name = new_name;
1227 
1228 	f = fs_find_file(fs, new_name);
1229 	if (f == NULL) {
1230 		__spdk_fs_md_rename_file(req);
1231 		return;
1232 	}
1233 
1234 	/*
1235 	 * The rename overwrites an existing file.  So delete the existing file, then
1236 	 *  do the actual rename.
1237 	 */
1238 	spdk_fs_delete_file_async(fs, new_name, fs_rename_delete_done, req);
1239 }
1240 
1241 static void
1242 __fs_rename_file_done(void *arg, int fserrno)
1243 {
1244 	struct spdk_fs_request *req = arg;
1245 	struct spdk_fs_cb_args *args = &req->args;
1246 
1247 	args->rc = fserrno;
1248 	sem_post(args->sem);
1249 }
1250 
1251 static void
1252 __fs_rename_file(void *arg)
1253 {
1254 	struct spdk_fs_request *req = arg;
1255 	struct spdk_fs_cb_args *args = &req->args;
1256 
1257 	spdk_fs_rename_file_async(args->fs, args->op.rename.old_name, args->op.rename.new_name,
1258 				  __fs_rename_file_done, req);
1259 }
1260 
1261 int
1262 spdk_fs_rename_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel,
1263 		    const char *old_name, const char *new_name)
1264 {
1265 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
1266 	struct spdk_fs_request *req;
1267 	struct spdk_fs_cb_args *args;
1268 	int rc;
1269 
1270 	req = alloc_fs_request(channel);
1271 	if (req == NULL) {
1272 		return -ENOMEM;
1273 	}
1274 
1275 	args = &req->args;
1276 
1277 	args->fs = fs;
1278 	args->op.rename.old_name = old_name;
1279 	args->op.rename.new_name = new_name;
1280 	args->sem = &channel->sem;
1281 	fs->send_request(__fs_rename_file, req);
1282 	sem_wait(&channel->sem);
1283 	rc = args->rc;
1284 	free_fs_request(req);
1285 	return rc;
1286 }
1287 
1288 static void
1289 blob_delete_cb(void *ctx, int bserrno)
1290 {
1291 	struct spdk_fs_request *req = ctx;
1292 	struct spdk_fs_cb_args *args = &req->args;
1293 
1294 	args->fn.file_op(args->arg, bserrno);
1295 	free_fs_request(req);
1296 }
1297 
1298 void
1299 spdk_fs_delete_file_async(struct spdk_filesystem *fs, const char *name,
1300 			  spdk_file_op_complete cb_fn, void *cb_arg)
1301 {
1302 	struct spdk_file *f;
1303 	spdk_blob_id blobid;
1304 	struct spdk_fs_request *req;
1305 	struct spdk_fs_cb_args *args;
1306 
1307 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name);
1308 
1309 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
1310 		cb_fn(cb_arg, -ENAMETOOLONG);
1311 		return;
1312 	}
1313 
1314 	f = fs_find_file(fs, name);
1315 	if (f == NULL) {
1316 		cb_fn(cb_arg, -ENOENT);
1317 		return;
1318 	}
1319 
1320 	req = alloc_fs_request(fs->md_target.md_fs_channel);
1321 	if (req == NULL) {
1322 		cb_fn(cb_arg, -ENOMEM);
1323 		return;
1324 	}
1325 
1326 	args = &req->args;
1327 	args->fn.file_op = cb_fn;
1328 	args->arg = cb_arg;
1329 
1330 	if (f->ref_count > 0) {
1331 		/* If the ref > 0, we mark the file as deleted and delete it when we close it. */
1332 		f->is_deleted = true;
1333 		spdk_blob_set_xattr(f->blob, "is_deleted", &f->is_deleted, sizeof(bool));
1334 		spdk_blob_sync_md(f->blob, blob_delete_cb, args);
1335 		return;
1336 	}
1337 
1338 	TAILQ_REMOVE(&fs->files, f, tailq);
1339 
1340 	cache_free_buffers(f);
1341 
1342 	blobid = f->blobid;
1343 
1344 	free(f->name);
1345 	free(f->tree);
1346 	free(f);
1347 
1348 	spdk_bs_delete_blob(fs->bs, blobid, blob_delete_cb, req);
1349 }
1350 
1351 static void
1352 __fs_delete_file_done(void *arg, int fserrno)
1353 {
1354 	struct spdk_fs_request *req = arg;
1355 	struct spdk_fs_cb_args *args = &req->args;
1356 
1357 	args->rc = fserrno;
1358 	sem_post(args->sem);
1359 }
1360 
1361 static void
1362 __fs_delete_file(void *arg)
1363 {
1364 	struct spdk_fs_request *req = arg;
1365 	struct spdk_fs_cb_args *args = &req->args;
1366 
1367 	spdk_fs_delete_file_async(args->fs, args->op.delete.name, __fs_delete_file_done, req);
1368 }
1369 
1370 int
1371 spdk_fs_delete_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel,
1372 		    const char *name)
1373 {
1374 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
1375 	struct spdk_fs_request *req;
1376 	struct spdk_fs_cb_args *args;
1377 	int rc;
1378 
1379 	req = alloc_fs_request(channel);
1380 	if (req == NULL) {
1381 		return -ENOMEM;
1382 	}
1383 
1384 	args = &req->args;
1385 	args->fs = fs;
1386 	args->op.delete.name = name;
1387 	args->sem = &channel->sem;
1388 	fs->send_request(__fs_delete_file, req);
1389 	sem_wait(&channel->sem);
1390 	rc = args->rc;
1391 	free_fs_request(req);
1392 
1393 	return rc;
1394 }
1395 
1396 spdk_fs_iter
1397 spdk_fs_iter_first(struct spdk_filesystem *fs)
1398 {
1399 	struct spdk_file *f;
1400 
1401 	f = TAILQ_FIRST(&fs->files);
1402 	return f;
1403 }
1404 
1405 spdk_fs_iter
1406 spdk_fs_iter_next(spdk_fs_iter iter)
1407 {
1408 	struct spdk_file *f = iter;
1409 
1410 	if (f == NULL) {
1411 		return NULL;
1412 	}
1413 
1414 	f = TAILQ_NEXT(f, tailq);
1415 	return f;
1416 }
1417 
1418 const char *
1419 spdk_file_get_name(struct spdk_file *file)
1420 {
1421 	return file->name;
1422 }
1423 
1424 uint64_t
1425 spdk_file_get_length(struct spdk_file *file)
1426 {
1427 	assert(file != NULL);
1428 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s length=0x%jx\n", file->name, file->length);
1429 	return file->length;
1430 }
1431 
1432 static void
1433 fs_truncate_complete_cb(void *ctx, int bserrno)
1434 {
1435 	struct spdk_fs_request *req = ctx;
1436 	struct spdk_fs_cb_args *args = &req->args;
1437 
1438 	args->fn.file_op(args->arg, bserrno);
1439 	free_fs_request(req);
1440 }
1441 
1442 static void
1443 fs_truncate_resize_cb(void *ctx, int bserrno)
1444 {
1445 	struct spdk_fs_request *req = ctx;
1446 	struct spdk_fs_cb_args *args = &req->args;
1447 	struct spdk_file *file = args->file;
1448 	uint64_t *length = &args->op.truncate.length;
1449 
1450 	spdk_blob_set_xattr(file->blob, "length", length, sizeof(*length));
1451 
1452 	file->length = *length;
1453 	if (file->append_pos > file->length) {
1454 		file->append_pos = file->length;
1455 	}
1456 
1457 	spdk_blob_sync_md(file->blob, fs_truncate_complete_cb, args);
1458 }
1459 
/*
 * Number of clusters of cluster_sz bytes needed to hold length bytes, i.e.
 *  length / cluster_sz rounded up.
 *
 * Computed from quotient and remainder so that lengths close to UINT64_MAX
 *  do not wrap around.  cluster_sz must be non-zero.
 */
static uint64_t
__bytes_to_clusters(uint64_t length, uint64_t cluster_sz)
{
	uint64_t clusters = length / cluster_sz;

	if (length % cluster_sz != 0) {
		clusters++;
	}

	return clusters;
}
1465 
1466 void
1467 spdk_file_truncate_async(struct spdk_file *file, uint64_t length,
1468 			 spdk_file_op_complete cb_fn, void *cb_arg)
1469 {
1470 	struct spdk_filesystem *fs;
1471 	size_t num_clusters;
1472 	struct spdk_fs_request *req;
1473 	struct spdk_fs_cb_args *args;
1474 
1475 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s old=0x%jx new=0x%jx\n", file->name, file->length, length);
1476 	if (length == file->length) {
1477 		cb_fn(cb_arg, 0);
1478 		return;
1479 	}
1480 
1481 	req = alloc_fs_request(file->fs->md_target.md_fs_channel);
1482 	if (req == NULL) {
1483 		cb_fn(cb_arg, -ENOMEM);
1484 		return;
1485 	}
1486 
1487 	args = &req->args;
1488 	args->fn.file_op = cb_fn;
1489 	args->arg = cb_arg;
1490 	args->file = file;
1491 	args->op.truncate.length = length;
1492 	fs = file->fs;
1493 
1494 	num_clusters = __bytes_to_clusters(length, fs->bs_opts.cluster_sz);
1495 
1496 	spdk_blob_resize(file->blob, num_clusters, fs_truncate_resize_cb, req);
1497 }
1498 
1499 static void
1500 __truncate(void *arg)
1501 {
1502 	struct spdk_fs_request *req = arg;
1503 	struct spdk_fs_cb_args *args = &req->args;
1504 
1505 	spdk_file_truncate_async(args->file, args->op.truncate.length,
1506 				 args->fn.file_op, args->arg);
1507 }
1508 
1509 int
1510 spdk_file_truncate(struct spdk_file *file, struct spdk_io_channel *_channel,
1511 		   uint64_t length)
1512 {
1513 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
1514 	struct spdk_fs_request *req;
1515 	struct spdk_fs_cb_args *args;
1516 
1517 	req = alloc_fs_request(channel);
1518 	if (req == NULL) {
1519 		return -ENOMEM;
1520 	}
1521 
1522 	args = &req->args;
1523 
1524 	args->file = file;
1525 	args->op.truncate.length = length;
1526 	args->fn.file_op = __sem_post;
1527 	args->arg = &channel->sem;
1528 
1529 	channel->send_request(__truncate, req);
1530 	sem_wait(&channel->sem);
1531 	free_fs_request(req);
1532 
1533 	return 0;
1534 }
1535 
1536 static void
1537 __rw_done(void *ctx, int bserrno)
1538 {
1539 	struct spdk_fs_request *req = ctx;
1540 	struct spdk_fs_cb_args *args = &req->args;
1541 
1542 	spdk_dma_free(args->op.rw.pin_buf);
1543 	args->fn.file_op(args->arg, bserrno);
1544 	free_fs_request(req);
1545 }
1546 
1547 static void
1548 __read_done(void *ctx, int bserrno)
1549 {
1550 	struct spdk_fs_request *req = ctx;
1551 	struct spdk_fs_cb_args *args = &req->args;
1552 
1553 	assert(req != NULL);
1554 	if (args->op.rw.is_read) {
1555 		memcpy(args->op.rw.user_buf,
1556 		       args->op.rw.pin_buf + (args->op.rw.offset & 0xFFF),
1557 		       args->op.rw.length);
1558 		__rw_done(req, 0);
1559 	} else {
1560 		memcpy(args->op.rw.pin_buf + (args->op.rw.offset & 0xFFF),
1561 		       args->op.rw.user_buf,
1562 		       args->op.rw.length);
1563 		spdk_blob_io_write(args->file->blob, args->op.rw.channel,
1564 				   args->op.rw.pin_buf,
1565 				   args->op.rw.start_page, args->op.rw.num_pages,
1566 				   __rw_done, req);
1567 	}
1568 }
1569 
1570 static void
1571 __do_blob_read(void *ctx, int fserrno)
1572 {
1573 	struct spdk_fs_request *req = ctx;
1574 	struct spdk_fs_cb_args *args = &req->args;
1575 
1576 	if (fserrno) {
1577 		__rw_done(req, fserrno);
1578 		return;
1579 	}
1580 	spdk_blob_io_read(args->file->blob, args->op.rw.channel,
1581 			  args->op.rw.pin_buf,
1582 			  args->op.rw.start_page, args->op.rw.num_pages,
1583 			  __read_done, req);
1584 }
1585 
1586 static void
1587 __get_page_parameters(struct spdk_file *file, uint64_t offset, uint64_t length,
1588 		      uint64_t *start_page, uint32_t *page_size, uint64_t *num_pages)
1589 {
1590 	uint64_t end_page;
1591 
1592 	*page_size = spdk_bs_get_page_size(file->fs->bs);
1593 	*start_page = offset / *page_size;
1594 	end_page = (offset + length - 1) / *page_size;
1595 	*num_pages = (end_page - *start_page + 1);
1596 }
1597 
1598 static void
1599 __readwrite(struct spdk_file *file, struct spdk_io_channel *_channel,
1600 	    void *payload, uint64_t offset, uint64_t length,
1601 	    spdk_file_op_complete cb_fn, void *cb_arg, int is_read)
1602 {
1603 	struct spdk_fs_request *req;
1604 	struct spdk_fs_cb_args *args;
1605 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
1606 	uint64_t start_page, num_pages, pin_buf_length;
1607 	uint32_t page_size;
1608 
1609 	if (is_read && offset + length > file->length) {
1610 		cb_fn(cb_arg, -EINVAL);
1611 		return;
1612 	}
1613 
1614 	req = alloc_fs_request(channel);
1615 	if (req == NULL) {
1616 		cb_fn(cb_arg, -ENOMEM);
1617 		return;
1618 	}
1619 
1620 	args = &req->args;
1621 	args->fn.file_op = cb_fn;
1622 	args->arg = cb_arg;
1623 	args->file = file;
1624 	args->op.rw.channel = channel->bs_channel;
1625 	args->op.rw.user_buf = payload;
1626 	args->op.rw.is_read = is_read;
1627 	args->op.rw.offset = offset;
1628 	args->op.rw.length = length;
1629 
1630 	__get_page_parameters(file, offset, length, &start_page, &page_size, &num_pages);
1631 	pin_buf_length = num_pages * page_size;
1632 	args->op.rw.pin_buf = spdk_dma_malloc(pin_buf_length, 4096, NULL);
1633 	if (args->op.rw.pin_buf == NULL) {
1634 		SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "Failed to allocate buf for: file=%s offset=%jx length=%jx\n",
1635 			      file->name, offset, length);
1636 		free_fs_request(req);
1637 		cb_fn(cb_arg, -ENOMEM);
1638 		return;
1639 	}
1640 
1641 	args->op.rw.start_page = start_page;
1642 	args->op.rw.num_pages = num_pages;
1643 
1644 	if (!is_read && file->length < offset + length) {
1645 		spdk_file_truncate_async(file, offset + length, __do_blob_read, req);
1646 	} else {
1647 		__do_blob_read(req, 0);
1648 	}
1649 }
1650 
1651 void
1652 spdk_file_write_async(struct spdk_file *file, struct spdk_io_channel *channel,
1653 		      void *payload, uint64_t offset, uint64_t length,
1654 		      spdk_file_op_complete cb_fn, void *cb_arg)
1655 {
1656 	__readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 0);
1657 }
1658 
1659 void
1660 spdk_file_read_async(struct spdk_file *file, struct spdk_io_channel *channel,
1661 		     void *payload, uint64_t offset, uint64_t length,
1662 		     spdk_file_op_complete cb_fn, void *cb_arg)
1663 {
1664 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s offset=%jx length=%jx\n",
1665 		      file->name, offset, length);
1666 	__readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 1);
1667 }
1668 
1669 struct spdk_io_channel *
1670 spdk_fs_alloc_io_channel(struct spdk_filesystem *fs)
1671 {
1672 	struct spdk_io_channel *io_channel;
1673 	struct spdk_fs_channel *fs_channel;
1674 
1675 	io_channel = spdk_get_io_channel(&fs->io_target);
1676 	fs_channel = spdk_io_channel_get_ctx(io_channel);
1677 	fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
1678 	fs_channel->send_request = __send_request_direct;
1679 
1680 	return io_channel;
1681 }
1682 
1683 struct spdk_io_channel *
1684 spdk_fs_alloc_io_channel_sync(struct spdk_filesystem *fs)
1685 {
1686 	struct spdk_io_channel *io_channel;
1687 	struct spdk_fs_channel *fs_channel;
1688 
1689 	io_channel = spdk_get_io_channel(&fs->io_target);
1690 	fs_channel = spdk_io_channel_get_ctx(io_channel);
1691 	fs_channel->send_request = fs->send_request;
1692 	fs_channel->sync = 1;
1693 	pthread_spin_init(&fs_channel->lock, 0);
1694 
1695 	return io_channel;
1696 }
1697 
/*
 * Release an I/O channel by handing it to spdk_put_io_channel().
 */
void
spdk_fs_free_io_channel(struct spdk_io_channel *channel)
{
	struct spdk_io_channel *released = channel;

	spdk_put_io_channel(released);
}
1703 
1704 void
1705 spdk_fs_set_cache_size(uint64_t size_in_mb)
1706 {
1707 	g_fs_cache_size = size_in_mb * 1024 * 1024;
1708 }
1709 
1710 uint64_t
1711 spdk_fs_get_cache_size(void)
1712 {
1713 	return g_fs_cache_size / (1024 * 1024);
1714 }
1715 
1716 static void __file_flush(void *_args);
1717 
1718 static void *
1719 alloc_cache_memory_buffer(struct spdk_file *context)
1720 {
1721 	struct spdk_file *file;
1722 	void *buf;
1723 
1724 	buf = spdk_mempool_get(g_cache_pool);
1725 	if (buf != NULL) {
1726 		return buf;
1727 	}
1728 
1729 	pthread_spin_lock(&g_caches_lock);
1730 	TAILQ_FOREACH(file, &g_caches, cache_tailq) {
1731 		if (!file->open_for_writing &&
1732 		    file->priority == SPDK_FILE_PRIORITY_LOW &&
1733 		    file != context) {
1734 			break;
1735 		}
1736 	}
1737 	pthread_spin_unlock(&g_caches_lock);
1738 	if (file != NULL) {
1739 		cache_free_buffers(file);
1740 		buf = spdk_mempool_get(g_cache_pool);
1741 		if (buf != NULL) {
1742 			return buf;
1743 		}
1744 	}
1745 
1746 	pthread_spin_lock(&g_caches_lock);
1747 	TAILQ_FOREACH(file, &g_caches, cache_tailq) {
1748 		if (!file->open_for_writing && file != context) {
1749 			break;
1750 		}
1751 	}
1752 	pthread_spin_unlock(&g_caches_lock);
1753 	if (file != NULL) {
1754 		cache_free_buffers(file);
1755 		buf = spdk_mempool_get(g_cache_pool);
1756 		if (buf != NULL) {
1757 			return buf;
1758 		}
1759 	}
1760 
1761 	pthread_spin_lock(&g_caches_lock);
1762 	TAILQ_FOREACH(file, &g_caches, cache_tailq) {
1763 		if (file != context) {
1764 			break;
1765 		}
1766 	}
1767 	pthread_spin_unlock(&g_caches_lock);
1768 	if (file != NULL) {
1769 		cache_free_buffers(file);
1770 		buf = spdk_mempool_get(g_cache_pool);
1771 		if (buf != NULL) {
1772 			return buf;
1773 		}
1774 	}
1775 
1776 	return NULL;
1777 }
1778 
1779 static struct cache_buffer *
1780 cache_insert_buffer(struct spdk_file *file, uint64_t offset)
1781 {
1782 	struct cache_buffer *buf;
1783 	int count = 0;
1784 
1785 	buf = calloc(1, sizeof(*buf));
1786 	if (buf == NULL) {
1787 		SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "calloc failed\n");
1788 		return NULL;
1789 	}
1790 
1791 	buf->buf = alloc_cache_memory_buffer(file);
1792 	while (buf->buf == NULL) {
1793 		/*
1794 		 * TODO: alloc_cache_memory_buffer() should eventually free
1795 		 *  some buffers.  Need a more sophisticated check here, instead
1796 		 *  of just bailing if 100 tries does not result in getting a
1797 		 *  free buffer.  This will involve using the sync channel's
1798 		 *  semaphore to block until a buffer becomes available.
1799 		 */
1800 		if (count++ == 100) {
1801 			SPDK_ERRLOG("could not allocate cache buffer\n");
1802 			assert(false);
1803 			free(buf);
1804 			return NULL;
1805 		}
1806 		buf->buf = alloc_cache_memory_buffer(file);
1807 	}
1808 
1809 	buf->buf_size = CACHE_BUFFER_SIZE;
1810 	buf->offset = offset;
1811 
1812 	pthread_spin_lock(&g_caches_lock);
1813 	if (file->tree->present_mask == 0) {
1814 		TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq);
1815 	}
1816 	file->tree = spdk_tree_insert_buffer(file->tree, buf);
1817 	pthread_spin_unlock(&g_caches_lock);
1818 
1819 	return buf;
1820 }
1821 
1822 static struct cache_buffer *
1823 cache_append_buffer(struct spdk_file *file)
1824 {
1825 	struct cache_buffer *last;
1826 
1827 	assert(file->last == NULL || file->last->bytes_filled == file->last->buf_size);
1828 	assert((file->append_pos % CACHE_BUFFER_SIZE) == 0);
1829 
1830 	last = cache_insert_buffer(file, file->append_pos);
1831 	if (last == NULL) {
1832 		SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "cache_insert_buffer failed\n");
1833 		return NULL;
1834 	}
1835 
1836 	file->last = last;
1837 
1838 	return last;
1839 }
1840 
1841 static void
1842 __wake_caller(struct spdk_fs_cb_args *args)
1843 {
1844 	sem_post(args->sem);
1845 }
1846 
1847 static void __check_sync_reqs(struct spdk_file *file);
1848 
1849 static void
1850 __file_cache_finish_sync(struct spdk_file *file)
1851 {
1852 	struct spdk_fs_request *sync_req;
1853 	struct spdk_fs_cb_args *sync_args;
1854 
1855 	pthread_spin_lock(&file->lock);
1856 	sync_req = TAILQ_FIRST(&file->sync_requests);
1857 	sync_args = &sync_req->args;
1858 	assert(sync_args->op.sync.offset <= file->length_flushed);
1859 	BLOBFS_TRACE(file, "sync done offset=%jx\n", sync_args->op.sync.offset);
1860 	TAILQ_REMOVE(&file->sync_requests, sync_req, args.op.sync.tailq);
1861 	pthread_spin_unlock(&file->lock);
1862 
1863 	sync_args->fn.file_op(sync_args->arg, 0);
1864 	__check_sync_reqs(file);
1865 
1866 	pthread_spin_lock(&file->lock);
1867 	free_fs_request(sync_req);
1868 	pthread_spin_unlock(&file->lock);
1869 }
1870 
/*
 * Blobstore completion adapter: ctx is the file whose pending sync request
 *  should be finished.  bserrno is not inspected.
 */
static void
__file_cache_finish_sync_bs_cb(void *ctx, int bserrno)
{
	__file_cache_finish_sync((struct spdk_file *)ctx);
}
1878 
1879 static void
1880 __free_args(struct spdk_fs_cb_args *args)
1881 {
1882 	struct spdk_fs_request *req;
1883 
1884 	if (!args->from_request) {
1885 		free(args);
1886 	} else {
1887 		/* Depends on args being at the start of the spdk_fs_request structure. */
1888 		req = (struct spdk_fs_request *)args;
1889 		free_fs_request(req);
1890 	}
1891 }
1892 
1893 static void
1894 __check_sync_reqs(struct spdk_file *file)
1895 {
1896 	struct spdk_fs_request *sync_req;
1897 
1898 	pthread_spin_lock(&file->lock);
1899 
1900 	TAILQ_FOREACH(sync_req, &file->sync_requests, args.op.sync.tailq) {
1901 		if (sync_req->args.op.sync.offset <= file->length_flushed) {
1902 			break;
1903 		}
1904 	}
1905 
1906 	if (sync_req != NULL && !sync_req->args.op.sync.xattr_in_progress) {
1907 		BLOBFS_TRACE(file, "set xattr length 0x%jx\n", file->length_flushed);
1908 		sync_req->args.op.sync.xattr_in_progress = true;
1909 		spdk_blob_set_xattr(file->blob, "length", &file->length_flushed,
1910 				    sizeof(file->length_flushed));
1911 
1912 		pthread_spin_unlock(&file->lock);
1913 		spdk_blob_sync_md(file->blob, __file_cache_finish_sync_bs_cb, file);
1914 	} else {
1915 		pthread_spin_unlock(&file->lock);
1916 	}
1917 }
1918 
1919 static void
1920 __file_flush_done(void *arg, int bserrno)
1921 {
1922 	struct spdk_fs_cb_args *args = arg;
1923 	struct spdk_file *file = args->file;
1924 	struct cache_buffer *next = args->op.flush.cache_buffer;
1925 
1926 	BLOBFS_TRACE(file, "length=%jx\n", args->op.flush.length);
1927 
1928 	pthread_spin_lock(&file->lock);
1929 	next->in_progress = false;
1930 	next->bytes_flushed += args->op.flush.length;
1931 	file->length_flushed += args->op.flush.length;
1932 	if (file->length_flushed > file->length) {
1933 		file->length = file->length_flushed;
1934 	}
1935 	if (next->bytes_flushed == next->buf_size) {
1936 		BLOBFS_TRACE(file, "write buffer fully flushed 0x%jx\n", file->length_flushed);
1937 		next = spdk_tree_find_buffer(file->tree, file->length_flushed);
1938 	}
1939 
1940 	/*
1941 	 * Assert that there is no cached data that extends past the end of the underlying
1942 	 *  blob.
1943 	 */
1944 	assert(next == NULL || next->offset < __file_get_blob_size(file) ||
1945 	       next->bytes_filled == 0);
1946 
1947 	pthread_spin_unlock(&file->lock);
1948 
1949 	__check_sync_reqs(file);
1950 
1951 	__file_flush(args);
1952 }
1953 
1954 static void
1955 __file_flush(void *_args)
1956 {
1957 	struct spdk_fs_cb_args *args = _args;
1958 	struct spdk_file *file = args->file;
1959 	struct cache_buffer *next;
1960 	uint64_t offset, length, start_page, num_pages;
1961 	uint32_t page_size;
1962 
1963 	pthread_spin_lock(&file->lock);
1964 	next = spdk_tree_find_buffer(file->tree, file->length_flushed);
1965 	if (next == NULL || next->in_progress) {
1966 		/*
1967 		 * There is either no data to flush, or a flush I/O is already in
1968 		 *  progress.  So return immediately - if a flush I/O is in
1969 		 *  progress we will flush more data after that is completed.
1970 		 */
1971 		__free_args(args);
1972 		if (next == NULL) {
1973 			/*
1974 			 * For cases where a file's cache was evicted, and then the
1975 			 *  file was later appended, we will write the data directly
1976 			 *  to disk and bypass cache.  So just update length_flushed
1977 			 *  here to reflect that all data was already written to disk.
1978 			 */
1979 			file->length_flushed = file->append_pos;
1980 		}
1981 		pthread_spin_unlock(&file->lock);
1982 		if (next == NULL) {
1983 			/*
1984 			 * There is no data to flush, but we still need to check for any
1985 			 *  outstanding sync requests to make sure metadata gets updated.
1986 			 */
1987 			__check_sync_reqs(file);
1988 		}
1989 		return;
1990 	}
1991 
1992 	offset = next->offset + next->bytes_flushed;
1993 	length = next->bytes_filled - next->bytes_flushed;
1994 	if (length == 0) {
1995 		__free_args(args);
1996 		pthread_spin_unlock(&file->lock);
1997 		return;
1998 	}
1999 	args->op.flush.length = length;
2000 	args->op.flush.cache_buffer = next;
2001 
2002 	__get_page_parameters(file, offset, length, &start_page, &page_size, &num_pages);
2003 
2004 	next->in_progress = true;
2005 	BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n",
2006 		     offset, length, start_page, num_pages);
2007 	pthread_spin_unlock(&file->lock);
2008 	spdk_blob_io_write(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel,
2009 			   next->buf + (start_page * page_size) - next->offset,
2010 			   start_page, num_pages, __file_flush_done, args);
2011 }
2012 
/*
 * Last step of a blob extend: wake the thread waiting on the args'
 *  semaphore.  bserrno is not inspected.
 */
static void
__file_extend_done(void *arg, int bserrno)
{
	__wake_caller((struct spdk_fs_cb_args *)arg);
}
2020 
2021 static void
2022 __file_extend_resize_cb(void *_args, int bserrno)
2023 {
2024 	struct spdk_fs_cb_args *args = _args;
2025 	struct spdk_file *file = args->file;
2026 
2027 	spdk_blob_sync_md(file->blob, __file_extend_done, args);
2028 }
2029 
2030 static void
2031 __file_extend_blob(void *_args)
2032 {
2033 	struct spdk_fs_cb_args *args = _args;
2034 	struct spdk_file *file = args->file;
2035 
2036 	spdk_blob_resize(file->blob, args->op.resize.num_clusters, __file_extend_resize_cb, args);
2037 }
2038 
/*
 * Completion for __rw_from_file(): wake the waiting thread, then release the
 *  args.  bserrno is not inspected.
 */
static void
__rw_from_file_done(void *arg, int bserrno)
{
	struct spdk_fs_cb_args *rw_args = arg;

	__wake_caller(rw_args);
	__free_args(rw_args);
}
2047 
2048 static void
2049 __rw_from_file(void *_args)
2050 {
2051 	struct spdk_fs_cb_args *args = _args;
2052 	struct spdk_file *file = args->file;
2053 
2054 	if (args->op.rw.is_read) {
2055 		spdk_file_read_async(file, file->fs->sync_target.sync_io_channel, args->op.rw.user_buf,
2056 				     args->op.rw.offset, args->op.rw.length,
2057 				     __rw_from_file_done, args);
2058 	} else {
2059 		spdk_file_write_async(file, file->fs->sync_target.sync_io_channel, args->op.rw.user_buf,
2060 				      args->op.rw.offset, args->op.rw.length,
2061 				      __rw_from_file_done, args);
2062 	}
2063 }
2064 
2065 static int
2066 __send_rw_from_file(struct spdk_file *file, sem_t *sem, void *payload,
2067 		    uint64_t offset, uint64_t length, bool is_read)
2068 {
2069 	struct spdk_fs_cb_args *args;
2070 
2071 	args = calloc(1, sizeof(*args));
2072 	if (args == NULL) {
2073 		sem_post(sem);
2074 		return -ENOMEM;
2075 	}
2076 
2077 	args->file = file;
2078 	args->sem = sem;
2079 	args->op.rw.user_buf = payload;
2080 	args->op.rw.offset = offset;
2081 	args->op.rw.length = length;
2082 	args->op.rw.is_read = is_read;
2083 	file->fs->send_request(__rw_from_file, args);
2084 	return 0;
2085 }
2086 
2087 int
2088 spdk_file_write(struct spdk_file *file, struct spdk_io_channel *_channel,
2089 		void *payload, uint64_t offset, uint64_t length)
2090 {
2091 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
2092 	struct spdk_fs_cb_args *args;
2093 	uint64_t rem_length, copy, blob_size, cluster_sz;
2094 	uint32_t cache_buffers_filled = 0;
2095 	uint8_t *cur_payload;
2096 	struct cache_buffer *last;
2097 
2098 	BLOBFS_TRACE_RW(file, "offset=%jx length=%jx\n", offset, length);
2099 
2100 	if (length == 0) {
2101 		return 0;
2102 	}
2103 
2104 	if (offset != file->append_pos) {
2105 		BLOBFS_TRACE(file, " error offset=%jx append_pos=%jx\n", offset, file->append_pos);
2106 		return -EINVAL;
2107 	}
2108 
2109 	pthread_spin_lock(&file->lock);
2110 	file->open_for_writing = true;
2111 
2112 	if (file->last == NULL) {
2113 		if (file->append_pos % CACHE_BUFFER_SIZE == 0) {
2114 			cache_append_buffer(file);
2115 		} else {
2116 			int rc;
2117 
2118 			file->append_pos += length;
2119 			pthread_spin_unlock(&file->lock);
2120 			rc = __send_rw_from_file(file, &channel->sem, payload,
2121 						 offset, length, false);
2122 			sem_wait(&channel->sem);
2123 			return rc;
2124 		}
2125 	}
2126 
2127 	blob_size = __file_get_blob_size(file);
2128 
2129 	if ((offset + length) > blob_size) {
2130 		struct spdk_fs_cb_args extend_args = {};
2131 
2132 		cluster_sz = file->fs->bs_opts.cluster_sz;
2133 		extend_args.sem = &channel->sem;
2134 		extend_args.op.resize.num_clusters = __bytes_to_clusters((offset + length), cluster_sz);
2135 		extend_args.file = file;
2136 		BLOBFS_TRACE(file, "start resize to %u clusters\n", extend_args.op.resize.num_clusters);
2137 		pthread_spin_unlock(&file->lock);
2138 		file->fs->send_request(__file_extend_blob, &extend_args);
2139 		sem_wait(&channel->sem);
2140 	}
2141 
2142 	last = file->last;
2143 	rem_length = length;
2144 	cur_payload = payload;
2145 	while (rem_length > 0) {
2146 		copy = last->buf_size - last->bytes_filled;
2147 		if (copy > rem_length) {
2148 			copy = rem_length;
2149 		}
2150 		BLOBFS_TRACE_RW(file, "  fill offset=%jx length=%jx\n", file->append_pos, copy);
2151 		memcpy(&last->buf[last->bytes_filled], cur_payload, copy);
2152 		file->append_pos += copy;
2153 		if (file->length < file->append_pos) {
2154 			file->length = file->append_pos;
2155 		}
2156 		cur_payload += copy;
2157 		last->bytes_filled += copy;
2158 		rem_length -= copy;
2159 		if (last->bytes_filled == last->buf_size) {
2160 			cache_buffers_filled++;
2161 			last = cache_append_buffer(file);
2162 			if (last == NULL) {
2163 				BLOBFS_TRACE(file, "nomem\n");
2164 				pthread_spin_unlock(&file->lock);
2165 				return -ENOMEM;
2166 			}
2167 		}
2168 	}
2169 
2170 	pthread_spin_unlock(&file->lock);
2171 
2172 	if (cache_buffers_filled == 0) {
2173 		return 0;
2174 	}
2175 
2176 	args = calloc(1, sizeof(*args));
2177 	if (args == NULL) {
2178 		return -ENOMEM;
2179 	}
2180 
2181 	args->file = file;
2182 	file->fs->send_request(__file_flush, args);
2183 	return 0;
2184 }
2185 
2186 static void
2187 __readahead_done(void *arg, int bserrno)
2188 {
2189 	struct spdk_fs_cb_args *args = arg;
2190 	struct cache_buffer *cache_buffer = args->op.readahead.cache_buffer;
2191 	struct spdk_file *file = args->file;
2192 
2193 	BLOBFS_TRACE(file, "offset=%jx\n", cache_buffer->offset);
2194 
2195 	pthread_spin_lock(&file->lock);
2196 	cache_buffer->bytes_filled = args->op.readahead.length;
2197 	cache_buffer->bytes_flushed = args->op.readahead.length;
2198 	cache_buffer->in_progress = false;
2199 	pthread_spin_unlock(&file->lock);
2200 
2201 	__free_args(args);
2202 }
2203 
2204 static void
2205 __readahead(void *_args)
2206 {
2207 	struct spdk_fs_cb_args *args = _args;
2208 	struct spdk_file *file = args->file;
2209 	uint64_t offset, length, start_page, num_pages;
2210 	uint32_t page_size;
2211 
2212 	offset = args->op.readahead.offset;
2213 	length = args->op.readahead.length;
2214 	assert(length > 0);
2215 
2216 	__get_page_parameters(file, offset, length, &start_page, &page_size, &num_pages);
2217 
2218 	BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n",
2219 		     offset, length, start_page, num_pages);
2220 	spdk_blob_io_read(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel,
2221 			  args->op.readahead.cache_buffer->buf,
2222 			  start_page, num_pages, __readahead_done, args);
2223 }
2224 
2225 static uint64_t
2226 __next_cache_buffer_offset(uint64_t offset)
2227 {
2228 	return (offset + CACHE_BUFFER_SIZE) & ~(CACHE_TREE_LEVEL_MASK(0));
2229 }
2230 
2231 static void
2232 check_readahead(struct spdk_file *file, uint64_t offset)
2233 {
2234 	struct spdk_fs_cb_args *args;
2235 
2236 	offset = __next_cache_buffer_offset(offset);
2237 	if (spdk_tree_find_buffer(file->tree, offset) != NULL || file->length <= offset) {
2238 		return;
2239 	}
2240 
2241 	args = calloc(1, sizeof(*args));
2242 	if (args == NULL) {
2243 		return;
2244 	}
2245 
2246 	BLOBFS_TRACE(file, "offset=%jx\n", offset);
2247 
2248 	args->file = file;
2249 	args->op.readahead.offset = offset;
2250 	args->op.readahead.cache_buffer = cache_insert_buffer(file, offset);
2251 	if (!args->op.readahead.cache_buffer) {
2252 		BLOBFS_TRACE(file, "Cannot allocate buf for offset=%jx\n", offset);
2253 		free(args);
2254 		return;
2255 	}
2256 
2257 	args->op.readahead.cache_buffer->in_progress = true;
2258 	if (file->length < (offset + CACHE_BUFFER_SIZE)) {
2259 		args->op.readahead.length = file->length & (CACHE_BUFFER_SIZE - 1);
2260 	} else {
2261 		args->op.readahead.length = CACHE_BUFFER_SIZE;
2262 	}
2263 	file->fs->send_request(__readahead, args);
2264 }
2265 
2266 static int
2267 __file_read(struct spdk_file *file, void *payload, uint64_t offset, uint64_t length, sem_t *sem)
2268 {
2269 	struct cache_buffer *buf;
2270 	int rc;
2271 
2272 	buf = spdk_tree_find_filled_buffer(file->tree, offset);
2273 	if (buf == NULL) {
2274 		pthread_spin_unlock(&file->lock);
2275 		rc = __send_rw_from_file(file, sem, payload, offset, length, true);
2276 		pthread_spin_lock(&file->lock);
2277 		return rc;
2278 	}
2279 
2280 	if ((offset + length) > (buf->offset + buf->bytes_filled)) {
2281 		length = buf->offset + buf->bytes_filled - offset;
2282 	}
2283 	BLOBFS_TRACE(file, "read %p offset=%ju length=%ju\n", payload, offset, length);
2284 	memcpy(payload, &buf->buf[offset - buf->offset], length);
2285 	if ((offset + length) % CACHE_BUFFER_SIZE == 0) {
2286 		pthread_spin_lock(&g_caches_lock);
2287 		spdk_tree_remove_buffer(file->tree, buf);
2288 		if (file->tree->present_mask == 0) {
2289 			TAILQ_REMOVE(&g_caches, file, cache_tailq);
2290 		}
2291 		pthread_spin_unlock(&g_caches_lock);
2292 	}
2293 
2294 	sem_post(sem);
2295 	return 0;
2296 }
2297 
2298 int64_t
2299 spdk_file_read(struct spdk_file *file, struct spdk_io_channel *_channel,
2300 	       void *payload, uint64_t offset, uint64_t length)
2301 {
2302 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
2303 	uint64_t final_offset, final_length;
2304 	uint32_t sub_reads = 0;
2305 	int rc = 0;
2306 
2307 	pthread_spin_lock(&file->lock);
2308 
2309 	BLOBFS_TRACE_RW(file, "offset=%ju length=%ju\n", offset, length);
2310 
2311 	file->open_for_writing = false;
2312 
2313 	if (length == 0 || offset >= file->append_pos) {
2314 		pthread_spin_unlock(&file->lock);
2315 		return 0;
2316 	}
2317 
2318 	if (offset + length > file->append_pos) {
2319 		length = file->append_pos - offset;
2320 	}
2321 
2322 	if (offset != file->next_seq_offset) {
2323 		file->seq_byte_count = 0;
2324 	}
2325 	file->seq_byte_count += length;
2326 	file->next_seq_offset = offset + length;
2327 	if (file->seq_byte_count >= CACHE_READAHEAD_THRESHOLD) {
2328 		check_readahead(file, offset);
2329 		check_readahead(file, offset + CACHE_BUFFER_SIZE);
2330 	}
2331 
2332 	final_length = 0;
2333 	final_offset = offset + length;
2334 	while (offset < final_offset) {
2335 		length = NEXT_CACHE_BUFFER_OFFSET(offset) - offset;
2336 		if (length > (final_offset - offset)) {
2337 			length = final_offset - offset;
2338 		}
2339 		rc = __file_read(file, payload, offset, length, &channel->sem);
2340 		if (rc == 0) {
2341 			final_length += length;
2342 		} else {
2343 			break;
2344 		}
2345 		payload += length;
2346 		offset += length;
2347 		sub_reads++;
2348 	}
2349 	pthread_spin_unlock(&file->lock);
2350 	while (sub_reads-- > 0) {
2351 		sem_wait(&channel->sem);
2352 	}
2353 	if (rc == 0) {
2354 		return final_length;
2355 	} else {
2356 		return rc;
2357 	}
2358 }
2359 
2360 static void
2361 _file_sync(struct spdk_file *file, struct spdk_fs_channel *channel,
2362 	   spdk_file_op_complete cb_fn, void *cb_arg)
2363 {
2364 	struct spdk_fs_request *sync_req;
2365 	struct spdk_fs_request *flush_req;
2366 	struct spdk_fs_cb_args *sync_args;
2367 	struct spdk_fs_cb_args *flush_args;
2368 
2369 	BLOBFS_TRACE(file, "offset=%jx\n", file->append_pos);
2370 
2371 	pthread_spin_lock(&file->lock);
2372 	if (file->append_pos <= file->length_flushed) {
2373 		BLOBFS_TRACE(file, "done - no data to flush\n");
2374 		pthread_spin_unlock(&file->lock);
2375 		cb_fn(cb_arg, 0);
2376 		return;
2377 	}
2378 
2379 	sync_req = alloc_fs_request(channel);
2380 	if (!sync_req) {
2381 		pthread_spin_unlock(&file->lock);
2382 		cb_fn(cb_arg, -ENOMEM);
2383 		return;
2384 	}
2385 	sync_args = &sync_req->args;
2386 
2387 	flush_req = alloc_fs_request(channel);
2388 	if (!flush_req) {
2389 		pthread_spin_unlock(&file->lock);
2390 		cb_fn(cb_arg, -ENOMEM);
2391 		return;
2392 	}
2393 	flush_args = &flush_req->args;
2394 
2395 	sync_args->file = file;
2396 	sync_args->fn.file_op = cb_fn;
2397 	sync_args->arg = cb_arg;
2398 	sync_args->op.sync.offset = file->append_pos;
2399 	sync_args->op.sync.xattr_in_progress = false;
2400 	TAILQ_INSERT_TAIL(&file->sync_requests, sync_req, args.op.sync.tailq);
2401 	pthread_spin_unlock(&file->lock);
2402 
2403 	flush_args->file = file;
2404 	channel->send_request(__file_flush, flush_args);
2405 }
2406 
2407 int
2408 spdk_file_sync(struct spdk_file *file, struct spdk_io_channel *_channel)
2409 {
2410 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
2411 
2412 	_file_sync(file, channel, __sem_post, &channel->sem);
2413 	sem_wait(&channel->sem);
2414 
2415 	return 0;
2416 }
2417 
2418 void
2419 spdk_file_sync_async(struct spdk_file *file, struct spdk_io_channel *_channel,
2420 		     spdk_file_op_complete cb_fn, void *cb_arg)
2421 {
2422 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
2423 
2424 	_file_sync(file, channel, cb_fn, cb_arg);
2425 }
2426 
2427 void
2428 spdk_file_set_priority(struct spdk_file *file, uint32_t priority)
2429 {
2430 	BLOBFS_TRACE(file, "priority=%u\n", priority);
2431 	file->priority = priority;
2432 
2433 }
2434 
2435 /*
2436  * Close routines
2437  */
2438 
2439 static void
2440 __file_close_async_done(void *ctx, int bserrno)
2441 {
2442 	struct spdk_fs_request *req = ctx;
2443 	struct spdk_fs_cb_args *args = &req->args;
2444 	struct spdk_file *file = args->file;
2445 
2446 	if (file->is_deleted) {
2447 		spdk_fs_delete_file_async(file->fs, file->name, blob_delete_cb, ctx);
2448 		return;
2449 	}
2450 
2451 	args->fn.file_op(args->arg, bserrno);
2452 	free_fs_request(req);
2453 }
2454 
2455 static void
2456 __file_close_async(struct spdk_file *file, struct spdk_fs_request *req)
2457 {
2458 	struct spdk_blob *blob;
2459 
2460 	pthread_spin_lock(&file->lock);
2461 	if (file->ref_count == 0) {
2462 		pthread_spin_unlock(&file->lock);
2463 		__file_close_async_done(req, -EBADF);
2464 		return;
2465 	}
2466 
2467 	file->ref_count--;
2468 	if (file->ref_count > 0) {
2469 		pthread_spin_unlock(&file->lock);
2470 		req->args.fn.file_op(req->args.arg, 0);
2471 		free_fs_request(req);
2472 		return;
2473 	}
2474 
2475 	pthread_spin_unlock(&file->lock);
2476 
2477 	blob = file->blob;
2478 	file->blob = NULL;
2479 	spdk_blob_close(blob, __file_close_async_done, req);
2480 }
2481 
2482 static void
2483 __file_close_async__sync_done(void *arg, int fserrno)
2484 {
2485 	struct spdk_fs_request *req = arg;
2486 	struct spdk_fs_cb_args *args = &req->args;
2487 
2488 	__file_close_async(args->file, req);
2489 }
2490 
2491 void
2492 spdk_file_close_async(struct spdk_file *file, spdk_file_op_complete cb_fn, void *cb_arg)
2493 {
2494 	struct spdk_fs_request *req;
2495 	struct spdk_fs_cb_args *args;
2496 
2497 	req = alloc_fs_request(file->fs->md_target.md_fs_channel);
2498 	if (req == NULL) {
2499 		cb_fn(cb_arg, -ENOMEM);
2500 		return;
2501 	}
2502 
2503 	args = &req->args;
2504 	args->file = file;
2505 	args->fn.file_op = cb_fn;
2506 	args->arg = cb_arg;
2507 
2508 	spdk_file_sync_async(file, file->fs->md_target.md_io_channel, __file_close_async__sync_done, req);
2509 }
2510 
2511 static void
2512 __file_close_done(void *arg, int fserrno)
2513 {
2514 	struct spdk_fs_cb_args *args = arg;
2515 
2516 	args->rc = fserrno;
2517 	sem_post(args->sem);
2518 }
2519 
2520 static void
2521 __file_close(void *arg)
2522 {
2523 	struct spdk_fs_request *req = arg;
2524 	struct spdk_fs_cb_args *args = &req->args;
2525 	struct spdk_file *file = args->file;
2526 
2527 	__file_close_async(file, req);
2528 }
2529 
2530 int
2531 spdk_file_close(struct spdk_file *file, struct spdk_io_channel *_channel)
2532 {
2533 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
2534 	struct spdk_fs_request *req;
2535 	struct spdk_fs_cb_args *args;
2536 
2537 	req = alloc_fs_request(channel);
2538 	if (req == NULL) {
2539 		return -ENOMEM;
2540 	}
2541 
2542 	args = &req->args;
2543 
2544 	spdk_file_sync(file, _channel);
2545 	BLOBFS_TRACE(file, "name=%s\n", file->name);
2546 	args->file = file;
2547 	args->sem = &channel->sem;
2548 	args->fn.file_op = __file_close_done;
2549 	args->arg = req;
2550 	channel->send_request(__file_close, req);
2551 	sem_wait(&channel->sem);
2552 
2553 	return args->rc;
2554 }
2555 
2556 static void
2557 cache_free_buffers(struct spdk_file *file)
2558 {
2559 	BLOBFS_TRACE(file, "free=%s\n", file->name);
2560 	pthread_spin_lock(&file->lock);
2561 	pthread_spin_lock(&g_caches_lock);
2562 	if (file->tree->present_mask == 0) {
2563 		pthread_spin_unlock(&g_caches_lock);
2564 		pthread_spin_unlock(&file->lock);
2565 		return;
2566 	}
2567 	spdk_tree_free_buffers(file->tree);
2568 
2569 	TAILQ_REMOVE(&g_caches, file, cache_tailq);
2570 	/* If not freed, put it in the end of the queue */
2571 	if (file->tree->present_mask != 0) {
2572 		TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq);
2573 	}
2574 	file->last = NULL;
2575 	pthread_spin_unlock(&g_caches_lock);
2576 	pthread_spin_unlock(&file->lock);
2577 }
2578 
/* Register the "blobfs" and "blobfs_rw" log components with their SPDK_LOG_* flags. */
2579 SPDK_LOG_REGISTER_COMPONENT("blobfs", SPDK_LOG_BLOBFS)
2580 SPDK_LOG_REGISTER_COMPONENT("blobfs_rw", SPDK_LOG_BLOBFS_RW)
2581