xref: /spdk/lib/blobfs/blobfs.c (revision 552e21cce6cccbf833ed9109827e08337377d7ce)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 
36 #include "spdk/blobfs.h"
37 #include "spdk/conf.h"
38 #include "blobfs_internal.h"
39 
40 #include "spdk/queue.h"
41 #include "spdk/thread.h"
42 #include "spdk/assert.h"
43 #include "spdk/env.h"
44 #include "spdk/util.h"
45 #include "spdk_internal/log.h"
46 
/* Debug trace helper: logs under the SPDK_LOG_BLOBFS flag and prefixes the
 *  message with file->name, so `file` must be an expression with a `name`
 *  member.
 */
#define BLOBFS_TRACE(file, str, args...) \
	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s " str, file->name, ##args)

/* Same as BLOBFS_TRACE, but logs under the SPDK_LOG_BLOBFS_RW flag. */
#define BLOBFS_TRACE_RW(file, str, args...) \
	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS_RW, "file=%s " str, file->name, ##args)

/* Initial value of g_fs_cache_size: 4 GiB. */
#define BLOBFS_DEFAULT_CACHE_SIZE (4ULL * 1024 * 1024 * 1024)
/* Cluster size (1 MiB) that spdk_fs_opts_init() writes into spdk_blobfs_opts. */
#define SPDK_BLOBFS_DEFAULT_OPTS_CLUSTER_SZ (1024 * 1024)

/* Total cache size in bytes; divided by CACHE_BUFFER_SIZE to size g_cache_pool. */
static uint64_t g_fs_cache_size = BLOBFS_DEFAULT_CACHE_SIZE;
/* Mempool of cache buffers; created by __initialize_cache(), released by __free_cache(). */
static struct spdk_mempool *g_cache_pool;
/* List of spdk_file objects, initialized together with the cache pool.
 *  Presumably linked through spdk_file.cache_tailq - confirm against the users.
 */
static TAILQ_HEAD(, spdk_file) g_caches;
/* Number of filesystems that went through common_fs_bs_init() and have not been
 *  unloaded yet; modified under g_cache_init_lock.
 */
static int g_fs_count = 0;
/* Serializes creation and destruction of the shared cache pool. */
static pthread_mutex_t g_cache_init_lock = PTHREAD_MUTEX_INITIALIZER;
/* Spinlock initialized together with g_caches; presumably guards that list. */
static pthread_spinlock_t g_caches_lock;
62 
63 void
64 spdk_cache_buffer_free(struct cache_buffer *cache_buffer)
65 {
66 	spdk_mempool_put(g_cache_pool, cache_buffer->buf);
67 	free(cache_buffer);
68 }
69 
/* 128 KiB.  Presumably the sequential-read byte count that triggers read-ahead;
 *  its users are not in this part of the file - confirm there.
 */
#define CACHE_READAHEAD_THRESHOLD	(128 * 1024)
71 
/* In-memory state of one blobfs file. */
struct spdk_file {
	/* Filesystem this file belongs to (set by file_alloc()). */
	struct spdk_filesystem	*fs;
	/* Blob handle stored by fs_open_blob_done(); NULL before the blob is opened. */
	struct spdk_blob	*blob;
	/* Heap-allocated copy of the file name (strdup). */
	char			*name;
	/* File length; loaded from the "length" xattr when the filesystem is loaded. */
	uint64_t		length;
	/* When true, spdk_fs_open_file_async() rejects opens with -ENOENT. */
	bool                    is_deleted;
	bool			open_for_writing;
	/* Initialized to the on-disk length at load time. */
	uint64_t		length_flushed;
	/* Initialized to the on-disk length at load time; stat reports
	 *  max(append_pos, length) as the file size.
	 */
	uint64_t		append_pos;
	uint64_t		seq_byte_count;
	uint64_t		next_seq_offset;
	/* Starts as SPDK_FILE_PRIORITY_LOW. */
	uint32_t		priority;
	/* Linkage in spdk_filesystem.files. */
	TAILQ_ENTRY(spdk_file)	tailq;
	/* Id of the blob backing this file. */
	spdk_blob_id		blobid;
	/* Incremented once per open request in fs_open_blob_create_cb(). */
	uint32_t		ref_count;
	/* Spinlock initialized in file_alloc(). */
	pthread_spinlock_t	lock;
	struct cache_buffer	*last;
	/* Cache tree root, allocated in file_alloc(). */
	struct cache_tree	*tree;
	/* Open requests waiting for the blob open to complete. */
	TAILQ_HEAD(open_requests_head, spdk_fs_request) open_requests;
	/* Presumably pending sync requests - users are outside this part of the file. */
	TAILQ_HEAD(sync_requests_head, spdk_fs_request) sync_requests;
	/* Presumably linkage in g_caches - confirm against the cache code. */
	TAILQ_ENTRY(spdk_file)	cache_tailq;
};
94 
/* Blob found at load time with an "is_deleted" xattr; it is queued so that
 *  fs_load_done() can delete it before the load completes.
 */
struct spdk_deleted_file {
	/* Id of the blob to delete. */
	spdk_blob_id	id;
	/* Linkage in spdk_fs_cb_args.op.fs_load.deleted_files. */
	TAILQ_ENTRY(spdk_deleted_file)	tailq;
};
99 
/* One blobfs instance layered on top of a blobstore. */
struct spdk_filesystem {
	/* Underlying blobstore; set by common_fs_bs_init(). */
	struct spdk_blob_store	*bs;
	/* All known files, linked through spdk_file.tailq. */
	TAILQ_HEAD(, spdk_file)	files;
	/* Only cluster_sz is filled in by the code in this part of the file. */
	struct spdk_bs_opts	bs_opts;
	/* Device handed to fs_alloc(). */
	struct spdk_bs_dev	*bdev;
	/* Dispatcher the synchronous API uses to hand work to the async implementation. */
	fs_send_request_fn	send_request;

	/* io_device used for the internal sync channel.  The address of this
	 *  member is the io_device key, see _spdk_fs_sync_channel_create().
	 */
	struct {
		/* Number of requests preallocated per channel. */
		uint32_t		max_ops;
		struct spdk_io_channel	*sync_io_channel;
		struct spdk_fs_channel	*sync_fs_channel;
	} sync_target;

	/* io_device used for metadata operations; requests for the async API are
	 *  taken from md_fs_channel.
	 */
	struct {
		/* Number of requests preallocated per channel. */
		uint32_t		max_ops;
		struct spdk_io_channel	*md_io_channel;
		struct spdk_fs_channel	*md_fs_channel;
	} md_target;

	/* io_device registered for I/O channels; fs_alloc() does not take a
	 *  channel on it.
	 */
	struct {
		/* Number of requests preallocated per channel. */
		uint32_t		max_ops;
	} io_target;
};
123 
/* Per-operation context carried through the asynchronous callback chains.
 *  It is the first member of struct spdk_fs_request.
 */
struct spdk_fs_cb_args {
	/* Completion callback; the valid member depends on the operation. */
	union {
		spdk_fs_op_with_handle_complete		fs_op_with_handle;
		spdk_fs_op_complete			fs_op;
		spdk_file_op_with_handle_complete	file_op_with_handle;
		spdk_file_op_complete			file_op;
		spdk_file_stat_op_complete		stat_op;
	} fn;
	/* First argument passed to the completion callback. */
	void *arg;
	/* Semaphore posted to wake a caller blocked in a synchronous API. */
	sem_t *sem;
	struct spdk_filesystem *fs;
	struct spdk_file *file;
	/* Result code handed back to a synchronous caller. */
	int rc;
	/* Set to true by alloc_fs_request(). */
	bool from_request;
	/* Operation-specific state; only the member matching the operation is valid. */
	union {
		struct {
			/* Blobs flagged "is_deleted" that were found while loading. */
			TAILQ_HEAD(, spdk_deleted_file)	deleted_files;
		} fs_load;
		struct {
			uint64_t	length;
		} truncate;
		struct {
			struct spdk_io_channel	*channel;
			void		*user_buf;
			void		*pin_buf;
			int		is_read;
			off_t		offset;
			size_t		length;
			uint64_t	start_lba;
			uint64_t	num_lba;
			uint32_t	blocklen;
		} rw;
		struct {
			const char	*old_name;
			const char	*new_name;
		} rename;
		struct {
			struct cache_buffer	*cache_buffer;
			uint64_t		length;
		} flush;
		struct {
			struct cache_buffer	*cache_buffer;
			uint64_t		length;
			uint64_t		offset;
		} readahead;
		struct {
			uint64_t			offset;
			TAILQ_ENTRY(spdk_fs_request)	tailq;
			bool				xattr_in_progress;
		} sync;
		struct {
			uint32_t			num_clusters;
		} resize;
		struct {
			/* Caller-owned name pointer; it is stored, not copied. */
			const char	*name;
			uint32_t	flags;
			/* Linkage in spdk_file.open_requests. */
			TAILQ_ENTRY(spdk_fs_request)	tailq;
		} open;
		struct {
			/* Caller-owned name pointer; it is stored, not copied. */
			const char		*name;
			/* Blob opened while a new file is being created. */
			struct spdk_blob	*blob;
		} create;
		struct {
			const char	*name;
		} delete;
		struct {
			/* Caller-owned name pointer; it is stored, not copied. */
			const char	*name;
		} stat;
	} op;
};
194 
/* Defined later in this file; unload_cb() calls it for every file before freeing it. */
static void cache_free_buffers(struct spdk_file *file);
196 
197 void
198 spdk_fs_opts_init(struct spdk_blobfs_opts *opts)
199 {
200 	opts->cluster_sz = SPDK_BLOBFS_DEFAULT_OPTS_CLUSTER_SZ;
201 }
202 
203 static void
204 __initialize_cache(void)
205 {
206 	assert(g_cache_pool == NULL);
207 
208 	g_cache_pool = spdk_mempool_create("spdk_fs_cache",
209 					   g_fs_cache_size / CACHE_BUFFER_SIZE,
210 					   CACHE_BUFFER_SIZE,
211 					   SPDK_MEMPOOL_DEFAULT_CACHE_SIZE,
212 					   SPDK_ENV_SOCKET_ID_ANY);
213 	if (!g_cache_pool) {
214 		SPDK_ERRLOG("Create mempool failed, you may "
215 			    "increase the memory and try again\n");
216 		assert(false);
217 	}
218 	TAILQ_INIT(&g_caches);
219 	pthread_spin_init(&g_caches_lock, 0);
220 }
221 
222 static void
223 __free_cache(void)
224 {
225 	assert(g_cache_pool != NULL);
226 
227 	spdk_mempool_free(g_cache_pool);
228 	g_cache_pool = NULL;
229 }
230 
231 static uint64_t
232 __file_get_blob_size(struct spdk_file *file)
233 {
234 	uint64_t cluster_sz;
235 
236 	cluster_sz = file->fs->bs_opts.cluster_sz;
237 	return cluster_sz * spdk_blob_get_num_clusters(file->blob);
238 }
239 
/* A preallocated request taken from a channel's free list. */
struct spdk_fs_request {
	/* Operation context.  Some callbacks in this file are handed &req->args
	 *  and cast it back to the request, so this must stay the first member.
	 */
	struct spdk_fs_cb_args		args;
	/* Linkage in spdk_fs_channel.reqs while the request is free. */
	TAILQ_ENTRY(spdk_fs_request)	link;
	/* Channel the request came from and is returned to by free_fs_request(). */
	struct spdk_fs_channel		*channel;
};
245 
/* Per-io_channel context for the md, sync and io targets. */
struct spdk_fs_channel {
	/* Backing array for all requests of this channel (max_ops entries). */
	struct spdk_fs_request		*req_mem;
	/* Free list of requests. */
	TAILQ_HEAD(, spdk_fs_request)	reqs;
	/* Semaphore synchronous callers block on until their request completes. */
	sem_t				sem;
	struct spdk_filesystem		*fs;
	/* Blobstore I/O channel; freed in _spdk_fs_channel_destroy() when non-NULL. */
	struct spdk_io_channel		*bs_channel;
	fs_send_request_fn		send_request;
	/* When true, the request free list is accessed under `lock`. */
	bool				sync;
	pthread_spinlock_t		lock;
};
256 
257 static struct spdk_fs_request *
258 alloc_fs_request(struct spdk_fs_channel *channel)
259 {
260 	struct spdk_fs_request *req;
261 
262 	if (channel->sync) {
263 		pthread_spin_lock(&channel->lock);
264 	}
265 
266 	req = TAILQ_FIRST(&channel->reqs);
267 	if (req) {
268 		TAILQ_REMOVE(&channel->reqs, req, link);
269 	}
270 
271 	if (channel->sync) {
272 		pthread_spin_unlock(&channel->lock);
273 	}
274 
275 	if (req == NULL) {
276 		return NULL;
277 	}
278 	memset(req, 0, sizeof(*req));
279 	req->channel = channel;
280 	req->args.from_request = true;
281 
282 	return req;
283 }
284 
285 static void
286 free_fs_request(struct spdk_fs_request *req)
287 {
288 	struct spdk_fs_channel *channel = req->channel;
289 
290 	if (channel->sync) {
291 		pthread_spin_lock(&channel->lock);
292 	}
293 
294 	TAILQ_INSERT_HEAD(&req->channel->reqs, req, link);
295 
296 	if (channel->sync) {
297 		pthread_spin_unlock(&channel->lock);
298 	}
299 }
300 
301 static int
302 _spdk_fs_channel_create(struct spdk_filesystem *fs, struct spdk_fs_channel *channel,
303 			uint32_t max_ops)
304 {
305 	uint32_t i;
306 
307 	channel->req_mem = calloc(max_ops, sizeof(struct spdk_fs_request));
308 	if (!channel->req_mem) {
309 		return -1;
310 	}
311 
312 	TAILQ_INIT(&channel->reqs);
313 	sem_init(&channel->sem, 0, 0);
314 
315 	for (i = 0; i < max_ops; i++) {
316 		TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link);
317 	}
318 
319 	channel->fs = fs;
320 
321 	return 0;
322 }
323 
324 static int
325 _spdk_fs_md_channel_create(void *io_device, void *ctx_buf)
326 {
327 	struct spdk_filesystem		*fs;
328 	struct spdk_fs_channel		*channel = ctx_buf;
329 
330 	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, md_target);
331 
332 	return _spdk_fs_channel_create(fs, channel, fs->md_target.max_ops);
333 }
334 
335 static int
336 _spdk_fs_sync_channel_create(void *io_device, void *ctx_buf)
337 {
338 	struct spdk_filesystem		*fs;
339 	struct spdk_fs_channel		*channel = ctx_buf;
340 
341 	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, sync_target);
342 
343 	return _spdk_fs_channel_create(fs, channel, fs->sync_target.max_ops);
344 }
345 
346 static int
347 _spdk_fs_io_channel_create(void *io_device, void *ctx_buf)
348 {
349 	struct spdk_filesystem		*fs;
350 	struct spdk_fs_channel		*channel = ctx_buf;
351 
352 	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, io_target);
353 
354 	return _spdk_fs_channel_create(fs, channel, fs->io_target.max_ops);
355 }
356 
357 static void
358 _spdk_fs_channel_destroy(void *io_device, void *ctx_buf)
359 {
360 	struct spdk_fs_channel *channel = ctx_buf;
361 
362 	free(channel->req_mem);
363 	if (channel->bs_channel != NULL) {
364 		spdk_bs_free_io_channel(channel->bs_channel);
365 	}
366 }
367 
368 static void
369 __send_request_direct(fs_request_fn fn, void *arg)
370 {
371 	fn(arg);
372 }
373 
374 static void
375 common_fs_bs_init(struct spdk_filesystem *fs, struct spdk_blob_store *bs)
376 {
377 	fs->bs = bs;
378 	fs->bs_opts.cluster_sz = spdk_bs_get_cluster_size(bs);
379 	fs->md_target.md_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
380 	fs->md_target.md_fs_channel->send_request = __send_request_direct;
381 	fs->sync_target.sync_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
382 	fs->sync_target.sync_fs_channel->send_request = __send_request_direct;
383 
384 	pthread_mutex_lock(&g_cache_init_lock);
385 	if (g_fs_count == 0) {
386 		__initialize_cache();
387 	}
388 	g_fs_count++;
389 	pthread_mutex_unlock(&g_cache_init_lock);
390 }
391 
392 static void
393 init_cb(void *ctx, struct spdk_blob_store *bs, int bserrno)
394 {
395 	struct spdk_fs_request *req = ctx;
396 	struct spdk_fs_cb_args *args = &req->args;
397 	struct spdk_filesystem *fs = args->fs;
398 
399 	if (bserrno == 0) {
400 		common_fs_bs_init(fs, bs);
401 	} else {
402 		free(fs);
403 		fs = NULL;
404 	}
405 
406 	args->fn.fs_op_with_handle(args->arg, fs, bserrno);
407 	free_fs_request(req);
408 }
409 
410 static void
411 fs_conf_parse(void)
412 {
413 	struct spdk_conf_section *sp;
414 
415 	sp = spdk_conf_find_section(NULL, "Blobfs");
416 	if (sp == NULL) {
417 		g_fs_cache_buffer_shift = CACHE_BUFFER_SHIFT_DEFAULT;
418 		return;
419 	}
420 
421 	g_fs_cache_buffer_shift = spdk_conf_section_get_intval(sp, "CacheBufferShift");
422 	if (g_fs_cache_buffer_shift <= 0) {
423 		g_fs_cache_buffer_shift = CACHE_BUFFER_SHIFT_DEFAULT;
424 	}
425 }
426 
427 static struct spdk_filesystem *
428 fs_alloc(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn)
429 {
430 	struct spdk_filesystem *fs;
431 
432 	fs = calloc(1, sizeof(*fs));
433 	if (fs == NULL) {
434 		return NULL;
435 	}
436 
437 	fs->bdev = dev;
438 	fs->send_request = send_request_fn;
439 	TAILQ_INIT(&fs->files);
440 
441 	fs->md_target.max_ops = 512;
442 	spdk_io_device_register(&fs->md_target, _spdk_fs_md_channel_create, _spdk_fs_channel_destroy,
443 				sizeof(struct spdk_fs_channel), "blobfs_md");
444 	fs->md_target.md_io_channel = spdk_get_io_channel(&fs->md_target);
445 	fs->md_target.md_fs_channel = spdk_io_channel_get_ctx(fs->md_target.md_io_channel);
446 
447 	fs->sync_target.max_ops = 512;
448 	spdk_io_device_register(&fs->sync_target, _spdk_fs_sync_channel_create, _spdk_fs_channel_destroy,
449 				sizeof(struct spdk_fs_channel), "blobfs_sync");
450 	fs->sync_target.sync_io_channel = spdk_get_io_channel(&fs->sync_target);
451 	fs->sync_target.sync_fs_channel = spdk_io_channel_get_ctx(fs->sync_target.sync_io_channel);
452 
453 	fs->io_target.max_ops = 512;
454 	spdk_io_device_register(&fs->io_target, _spdk_fs_io_channel_create, _spdk_fs_channel_destroy,
455 				sizeof(struct spdk_fs_channel), "blobfs_io");
456 
457 	return fs;
458 }
459 
460 static void
461 __wake_caller(void *arg, int fserrno)
462 {
463 	struct spdk_fs_cb_args *args = arg;
464 
465 	args->rc = fserrno;
466 	sem_post(args->sem);
467 }
468 
469 void
470 spdk_fs_init(struct spdk_bs_dev *dev, struct spdk_blobfs_opts *opt,
471 	     fs_send_request_fn send_request_fn,
472 	     spdk_fs_op_with_handle_complete cb_fn, void *cb_arg)
473 {
474 	struct spdk_filesystem *fs;
475 	struct spdk_fs_request *req;
476 	struct spdk_fs_cb_args *args;
477 	struct spdk_bs_opts opts = {};
478 
479 	fs = fs_alloc(dev, send_request_fn);
480 	if (fs == NULL) {
481 		cb_fn(cb_arg, NULL, -ENOMEM);
482 		return;
483 	}
484 
485 	fs_conf_parse();
486 
487 	req = alloc_fs_request(fs->md_target.md_fs_channel);
488 	if (req == NULL) {
489 		spdk_put_io_channel(fs->md_target.md_io_channel);
490 		spdk_io_device_unregister(&fs->md_target, NULL);
491 		spdk_put_io_channel(fs->sync_target.sync_io_channel);
492 		spdk_io_device_unregister(&fs->sync_target, NULL);
493 		spdk_io_device_unregister(&fs->io_target, NULL);
494 		free(fs);
495 		cb_fn(cb_arg, NULL, -ENOMEM);
496 		return;
497 	}
498 
499 	args = &req->args;
500 	args->fn.fs_op_with_handle = cb_fn;
501 	args->arg = cb_arg;
502 	args->fs = fs;
503 
504 	spdk_bs_opts_init(&opts);
505 	snprintf(opts.bstype.bstype, sizeof(opts.bstype.bstype), "BLOBFS");
506 	if (opt) {
507 		opts.cluster_sz = opt->cluster_sz;
508 	}
509 	spdk_bs_init(dev, &opts, init_cb, req);
510 }
511 
512 static struct spdk_file *
513 file_alloc(struct spdk_filesystem *fs)
514 {
515 	struct spdk_file *file;
516 
517 	file = calloc(1, sizeof(*file));
518 	if (file == NULL) {
519 		return NULL;
520 	}
521 
522 	file->tree = calloc(1, sizeof(*file->tree));
523 	if (file->tree == NULL) {
524 		free(file);
525 		return NULL;
526 	}
527 
528 	file->fs = fs;
529 	TAILQ_INIT(&file->open_requests);
530 	TAILQ_INIT(&file->sync_requests);
531 	pthread_spin_init(&file->lock, 0);
532 	TAILQ_INSERT_TAIL(&fs->files, file, tailq);
533 	file->priority = SPDK_FILE_PRIORITY_LOW;
534 	return file;
535 }
536 
537 static void fs_load_done(void *ctx, int bserrno);
538 
539 static int
540 _handle_deleted_files(struct spdk_fs_request *req)
541 {
542 	struct spdk_fs_cb_args *args = &req->args;
543 	struct spdk_filesystem *fs = args->fs;
544 
545 	if (!TAILQ_EMPTY(&args->op.fs_load.deleted_files)) {
546 		struct spdk_deleted_file *deleted_file;
547 
548 		deleted_file = TAILQ_FIRST(&args->op.fs_load.deleted_files);
549 		TAILQ_REMOVE(&args->op.fs_load.deleted_files, deleted_file, tailq);
550 		spdk_bs_delete_blob(fs->bs, deleted_file->id, fs_load_done, req);
551 		free(deleted_file);
552 		return 0;
553 	}
554 
555 	return 1;
556 }
557 
558 static void
559 fs_load_done(void *ctx, int bserrno)
560 {
561 	struct spdk_fs_request *req = ctx;
562 	struct spdk_fs_cb_args *args = &req->args;
563 	struct spdk_filesystem *fs = args->fs;
564 
565 	/* The filesystem has been loaded.  Now check if there are any files that
566 	 *  were marked for deletion before last unload.  Do not complete the
567 	 *  fs_load callback until all of them have been deleted on disk.
568 	 */
569 	if (_handle_deleted_files(req) == 0) {
570 		/* We found a file that's been marked for deleting but not actually
571 		 *  deleted yet.  This function will get called again once the delete
572 		 *  operation is completed.
573 		 */
574 		return;
575 	}
576 
577 	args->fn.fs_op_with_handle(args->arg, fs, 0);
578 	free_fs_request(req);
579 
580 }
581 
582 static void
583 iter_cb(void *ctx, struct spdk_blob *blob, int rc)
584 {
585 	struct spdk_fs_request *req = ctx;
586 	struct spdk_fs_cb_args *args = &req->args;
587 	struct spdk_filesystem *fs = args->fs;
588 	uint64_t *length;
589 	const char *name;
590 	uint32_t *is_deleted;
591 	size_t value_len;
592 
593 	if (rc < 0) {
594 		args->fn.fs_op_with_handle(args->arg, fs, rc);
595 		free_fs_request(req);
596 		return;
597 	}
598 
599 	rc = spdk_blob_get_xattr_value(blob, "name", (const void **)&name, &value_len);
600 	if (rc < 0) {
601 		args->fn.fs_op_with_handle(args->arg, fs, rc);
602 		free_fs_request(req);
603 		return;
604 	}
605 
606 	rc = spdk_blob_get_xattr_value(blob, "length", (const void **)&length, &value_len);
607 	if (rc < 0) {
608 		args->fn.fs_op_with_handle(args->arg, fs, rc);
609 		free_fs_request(req);
610 		return;
611 	}
612 
613 	assert(value_len == 8);
614 
615 	/* This file could be deleted last time without close it, then app crashed, so we delete it now */
616 	rc = spdk_blob_get_xattr_value(blob, "is_deleted", (const void **)&is_deleted, &value_len);
617 	if (rc < 0) {
618 		struct spdk_file *f;
619 
620 		f = file_alloc(fs);
621 		if (f == NULL) {
622 			args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM);
623 			free_fs_request(req);
624 			return;
625 		}
626 
627 		f->name = strdup(name);
628 		f->blobid = spdk_blob_get_id(blob);
629 		f->length = *length;
630 		f->length_flushed = *length;
631 		f->append_pos = *length;
632 		SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "added file %s length=%ju\n", f->name, f->length);
633 	} else {
634 		struct spdk_deleted_file *deleted_file;
635 
636 		deleted_file = calloc(1, sizeof(*deleted_file));
637 		if (deleted_file == NULL) {
638 			args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM);
639 			free_fs_request(req);
640 			return;
641 		}
642 		deleted_file->id = spdk_blob_get_id(blob);
643 		TAILQ_INSERT_TAIL(&args->op.fs_load.deleted_files, deleted_file, tailq);
644 	}
645 }
646 
647 static void
648 load_cb(void *ctx, struct spdk_blob_store *bs, int bserrno)
649 {
650 	struct spdk_fs_request *req = ctx;
651 	struct spdk_fs_cb_args *args = &req->args;
652 	struct spdk_filesystem *fs = args->fs;
653 	struct spdk_bs_type bstype;
654 	static const struct spdk_bs_type blobfs_type = {"BLOBFS"};
655 	static const struct spdk_bs_type zeros;
656 
657 	if (bserrno != 0) {
658 		args->fn.fs_op_with_handle(args->arg, NULL, bserrno);
659 		free_fs_request(req);
660 		free(fs);
661 		return;
662 	}
663 
664 	bstype = spdk_bs_get_bstype(bs);
665 
666 	if (!memcmp(&bstype, &zeros, sizeof(bstype))) {
667 		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "assigning bstype\n");
668 		spdk_bs_set_bstype(bs, blobfs_type);
669 	} else if (memcmp(&bstype, &blobfs_type, sizeof(bstype))) {
670 		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "not blobfs\n");
671 		SPDK_LOGDUMP(SPDK_LOG_BLOB, "bstype", &bstype, sizeof(bstype));
672 		args->fn.fs_op_with_handle(args->arg, NULL, bserrno);
673 		free_fs_request(req);
674 		free(fs);
675 		return;
676 	}
677 
678 	common_fs_bs_init(fs, bs);
679 	fs_load_done(req, 0);
680 }
681 
682 static void
683 spdk_fs_io_device_unregister(struct spdk_filesystem *fs)
684 {
685 	assert(fs != NULL);
686 	spdk_io_device_unregister(&fs->md_target, NULL);
687 	spdk_io_device_unregister(&fs->sync_target, NULL);
688 	spdk_io_device_unregister(&fs->io_target, NULL);
689 	free(fs);
690 }
691 
692 static void
693 spdk_fs_free_io_channels(struct spdk_filesystem *fs)
694 {
695 	assert(fs != NULL);
696 	spdk_fs_free_io_channel(fs->md_target.md_io_channel);
697 	spdk_fs_free_io_channel(fs->sync_target.sync_io_channel);
698 }
699 
700 void
701 spdk_fs_load(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn,
702 	     spdk_fs_op_with_handle_complete cb_fn, void *cb_arg)
703 {
704 	struct spdk_filesystem *fs;
705 	struct spdk_fs_cb_args *args;
706 	struct spdk_fs_request *req;
707 	struct spdk_bs_opts	bs_opts;
708 
709 	fs = fs_alloc(dev, send_request_fn);
710 	if (fs == NULL) {
711 		cb_fn(cb_arg, NULL, -ENOMEM);
712 		return;
713 	}
714 
715 	fs_conf_parse();
716 
717 	req = alloc_fs_request(fs->md_target.md_fs_channel);
718 	if (req == NULL) {
719 		spdk_fs_free_io_channels(fs);
720 		spdk_fs_io_device_unregister(fs);
721 		cb_fn(cb_arg, NULL, -ENOMEM);
722 		return;
723 	}
724 
725 	args = &req->args;
726 	args->fn.fs_op_with_handle = cb_fn;
727 	args->arg = cb_arg;
728 	args->fs = fs;
729 	TAILQ_INIT(&args->op.fs_load.deleted_files);
730 	spdk_bs_opts_init(&bs_opts);
731 	bs_opts.iter_cb_fn = iter_cb;
732 	bs_opts.iter_cb_arg = req;
733 	spdk_bs_load(dev, &bs_opts, load_cb, req);
734 }
735 
736 static void
737 unload_cb(void *ctx, int bserrno)
738 {
739 	struct spdk_fs_request *req = ctx;
740 	struct spdk_fs_cb_args *args = &req->args;
741 	struct spdk_filesystem *fs = args->fs;
742 	struct spdk_file *file, *tmp;
743 
744 	TAILQ_FOREACH_SAFE(file, &fs->files, tailq, tmp) {
745 		TAILQ_REMOVE(&fs->files, file, tailq);
746 		cache_free_buffers(file);
747 		free(file->name);
748 		free(file->tree);
749 		free(file);
750 	}
751 
752 	pthread_mutex_lock(&g_cache_init_lock);
753 	g_fs_count--;
754 	if (g_fs_count == 0) {
755 		__free_cache();
756 	}
757 	pthread_mutex_unlock(&g_cache_init_lock);
758 
759 	args->fn.fs_op(args->arg, bserrno);
760 	free(req);
761 
762 	spdk_fs_io_device_unregister(fs);
763 }
764 
765 void
766 spdk_fs_unload(struct spdk_filesystem *fs, spdk_fs_op_complete cb_fn, void *cb_arg)
767 {
768 	struct spdk_fs_request *req;
769 	struct spdk_fs_cb_args *args;
770 
771 	/*
772 	 * We must free the md_channel before unloading the blobstore, so just
773 	 *  allocate this request from the general heap.
774 	 */
775 	req = calloc(1, sizeof(*req));
776 	if (req == NULL) {
777 		cb_fn(cb_arg, -ENOMEM);
778 		return;
779 	}
780 
781 	args = &req->args;
782 	args->fn.fs_op = cb_fn;
783 	args->arg = cb_arg;
784 	args->fs = fs;
785 
786 	spdk_fs_free_io_channels(fs);
787 	spdk_bs_unload(fs->bs, unload_cb, req);
788 }
789 
790 static struct spdk_file *
791 fs_find_file(struct spdk_filesystem *fs, const char *name)
792 {
793 	struct spdk_file *file;
794 
795 	TAILQ_FOREACH(file, &fs->files, tailq) {
796 		if (!strncmp(name, file->name, SPDK_FILE_NAME_MAX)) {
797 			return file;
798 		}
799 	}
800 
801 	return NULL;
802 }
803 
804 void
805 spdk_fs_file_stat_async(struct spdk_filesystem *fs, const char *name,
806 			spdk_file_stat_op_complete cb_fn, void *cb_arg)
807 {
808 	struct spdk_file_stat stat;
809 	struct spdk_file *f = NULL;
810 
811 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
812 		cb_fn(cb_arg, NULL, -ENAMETOOLONG);
813 		return;
814 	}
815 
816 	f = fs_find_file(fs, name);
817 	if (f != NULL) {
818 		stat.blobid = f->blobid;
819 		stat.size = f->append_pos >= f->length ? f->append_pos : f->length;
820 		cb_fn(cb_arg, &stat, 0);
821 		return;
822 	}
823 
824 	cb_fn(cb_arg, NULL, -ENOENT);
825 }
826 
827 static void
828 __copy_stat(void *arg, struct spdk_file_stat *stat, int fserrno)
829 {
830 	struct spdk_fs_request *req = arg;
831 	struct spdk_fs_cb_args *args = &req->args;
832 
833 	args->rc = fserrno;
834 	if (fserrno == 0) {
835 		memcpy(args->arg, stat, sizeof(*stat));
836 	}
837 	sem_post(args->sem);
838 }
839 
840 static void
841 __file_stat(void *arg)
842 {
843 	struct spdk_fs_request *req = arg;
844 	struct spdk_fs_cb_args *args = &req->args;
845 
846 	spdk_fs_file_stat_async(args->fs, args->op.stat.name,
847 				args->fn.stat_op, req);
848 }
849 
850 int
851 spdk_fs_file_stat(struct spdk_filesystem *fs, struct spdk_io_channel *_channel,
852 		  const char *name, struct spdk_file_stat *stat)
853 {
854 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
855 	struct spdk_fs_request *req;
856 	int rc;
857 
858 	req = alloc_fs_request(channel);
859 	if (req == NULL) {
860 		return -ENOMEM;
861 	}
862 
863 	req->args.fs = fs;
864 	req->args.op.stat.name = name;
865 	req->args.fn.stat_op = __copy_stat;
866 	req->args.arg = stat;
867 	req->args.sem = &channel->sem;
868 	channel->send_request(__file_stat, req);
869 	sem_wait(&channel->sem);
870 
871 	rc = req->args.rc;
872 	free_fs_request(req);
873 
874 	return rc;
875 }
876 
877 static void
878 fs_create_blob_close_cb(void *ctx, int bserrno)
879 {
880 	int rc;
881 	struct spdk_fs_request *req = ctx;
882 	struct spdk_fs_cb_args *args = &req->args;
883 
884 	rc = args->rc ? args->rc : bserrno;
885 	args->fn.file_op(args->arg, rc);
886 	free_fs_request(req);
887 }
888 
889 static void
890 fs_create_blob_resize_cb(void *ctx, int bserrno)
891 {
892 	struct spdk_fs_request *req = ctx;
893 	struct spdk_fs_cb_args *args = &req->args;
894 	struct spdk_file *f = args->file;
895 	struct spdk_blob *blob = args->op.create.blob;
896 	uint64_t length = 0;
897 
898 	args->rc = bserrno;
899 	if (bserrno) {
900 		spdk_blob_close(blob, fs_create_blob_close_cb, args);
901 		return;
902 	}
903 
904 	spdk_blob_set_xattr(blob, "name", f->name, strlen(f->name) + 1);
905 	spdk_blob_set_xattr(blob, "length", &length, sizeof(length));
906 
907 	spdk_blob_close(blob, fs_create_blob_close_cb, args);
908 }
909 
910 static void
911 fs_create_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno)
912 {
913 	struct spdk_fs_request *req = ctx;
914 	struct spdk_fs_cb_args *args = &req->args;
915 
916 	if (bserrno) {
917 		args->fn.file_op(args->arg, bserrno);
918 		free_fs_request(req);
919 		return;
920 	}
921 
922 	args->op.create.blob = blob;
923 	spdk_blob_resize(blob, 1, fs_create_blob_resize_cb, req);
924 }
925 
926 static void
927 fs_create_blob_create_cb(void *ctx, spdk_blob_id blobid, int bserrno)
928 {
929 	struct spdk_fs_request *req = ctx;
930 	struct spdk_fs_cb_args *args = &req->args;
931 	struct spdk_file *f = args->file;
932 
933 	if (bserrno) {
934 		args->fn.file_op(args->arg, bserrno);
935 		free_fs_request(req);
936 		return;
937 	}
938 
939 	f->blobid = blobid;
940 	spdk_bs_open_blob(f->fs->bs, blobid, fs_create_blob_open_cb, req);
941 }
942 
943 void
944 spdk_fs_create_file_async(struct spdk_filesystem *fs, const char *name,
945 			  spdk_file_op_complete cb_fn, void *cb_arg)
946 {
947 	struct spdk_file *file;
948 	struct spdk_fs_request *req;
949 	struct spdk_fs_cb_args *args;
950 
951 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
952 		cb_fn(cb_arg, -ENAMETOOLONG);
953 		return;
954 	}
955 
956 	file = fs_find_file(fs, name);
957 	if (file != NULL) {
958 		cb_fn(cb_arg, -EEXIST);
959 		return;
960 	}
961 
962 	file = file_alloc(fs);
963 	if (file == NULL) {
964 		cb_fn(cb_arg, -ENOMEM);
965 		return;
966 	}
967 
968 	req = alloc_fs_request(fs->md_target.md_fs_channel);
969 	if (req == NULL) {
970 		cb_fn(cb_arg, -ENOMEM);
971 		return;
972 	}
973 
974 	args = &req->args;
975 	args->file = file;
976 	args->fn.file_op = cb_fn;
977 	args->arg = cb_arg;
978 
979 	file->name = strdup(name);
980 	spdk_bs_create_blob(fs->bs, fs_create_blob_create_cb, args);
981 }
982 
983 static void
984 __fs_create_file_done(void *arg, int fserrno)
985 {
986 	struct spdk_fs_request *req = arg;
987 	struct spdk_fs_cb_args *args = &req->args;
988 
989 	args->rc = fserrno;
990 	sem_post(args->sem);
991 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.create.name);
992 }
993 
994 static void
995 __fs_create_file(void *arg)
996 {
997 	struct spdk_fs_request *req = arg;
998 	struct spdk_fs_cb_args *args = &req->args;
999 
1000 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.create.name);
1001 	spdk_fs_create_file_async(args->fs, args->op.create.name, __fs_create_file_done, req);
1002 }
1003 
1004 int
1005 spdk_fs_create_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, const char *name)
1006 {
1007 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
1008 	struct spdk_fs_request *req;
1009 	struct spdk_fs_cb_args *args;
1010 	int rc;
1011 
1012 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name);
1013 
1014 	req = alloc_fs_request(channel);
1015 	if (req == NULL) {
1016 		return -ENOMEM;
1017 	}
1018 
1019 	args = &req->args;
1020 	args->fs = fs;
1021 	args->op.create.name = name;
1022 	args->sem = &channel->sem;
1023 	fs->send_request(__fs_create_file, req);
1024 	sem_wait(&channel->sem);
1025 	rc = args->rc;
1026 	free_fs_request(req);
1027 
1028 	return rc;
1029 }
1030 
1031 static void
1032 fs_open_blob_done(void *ctx, struct spdk_blob *blob, int bserrno)
1033 {
1034 	struct spdk_fs_request *req = ctx;
1035 	struct spdk_fs_cb_args *args = &req->args;
1036 	struct spdk_file *f = args->file;
1037 
1038 	f->blob = blob;
1039 	while (!TAILQ_EMPTY(&f->open_requests)) {
1040 		req = TAILQ_FIRST(&f->open_requests);
1041 		args = &req->args;
1042 		TAILQ_REMOVE(&f->open_requests, req, args.op.open.tailq);
1043 		args->fn.file_op_with_handle(args->arg, f, bserrno);
1044 		free_fs_request(req);
1045 	}
1046 }
1047 
1048 static void
1049 fs_open_blob_create_cb(void *ctx, int bserrno)
1050 {
1051 	struct spdk_fs_request *req = ctx;
1052 	struct spdk_fs_cb_args *args = &req->args;
1053 	struct spdk_file *file = args->file;
1054 	struct spdk_filesystem *fs = args->fs;
1055 
1056 	if (file == NULL) {
1057 		/*
1058 		 * This is from an open with CREATE flag - the file
1059 		 *  is now created so look it up in the file list for this
1060 		 *  filesystem.
1061 		 */
1062 		file = fs_find_file(fs, args->op.open.name);
1063 		assert(file != NULL);
1064 		args->file = file;
1065 	}
1066 
1067 	file->ref_count++;
1068 	TAILQ_INSERT_TAIL(&file->open_requests, req, args.op.open.tailq);
1069 	if (file->ref_count == 1) {
1070 		assert(file->blob == NULL);
1071 		spdk_bs_open_blob(fs->bs, file->blobid, fs_open_blob_done, req);
1072 	} else if (file->blob != NULL) {
1073 		fs_open_blob_done(req, file->blob, 0);
1074 	} else {
1075 		/*
1076 		 * The blob open for this file is in progress due to a previous
1077 		 *  open request.  When that open completes, it will invoke the
1078 		 *  open callback for this request.
1079 		 */
1080 	}
1081 }
1082 
1083 void
1084 spdk_fs_open_file_async(struct spdk_filesystem *fs, const char *name, uint32_t flags,
1085 			spdk_file_op_with_handle_complete cb_fn, void *cb_arg)
1086 {
1087 	struct spdk_file *f = NULL;
1088 	struct spdk_fs_request *req;
1089 	struct spdk_fs_cb_args *args;
1090 
1091 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
1092 		cb_fn(cb_arg, NULL, -ENAMETOOLONG);
1093 		return;
1094 	}
1095 
1096 	f = fs_find_file(fs, name);
1097 	if (f == NULL && !(flags & SPDK_BLOBFS_OPEN_CREATE)) {
1098 		cb_fn(cb_arg, NULL, -ENOENT);
1099 		return;
1100 	}
1101 
1102 	if (f != NULL && f->is_deleted == true) {
1103 		cb_fn(cb_arg, NULL, -ENOENT);
1104 		return;
1105 	}
1106 
1107 	req = alloc_fs_request(fs->md_target.md_fs_channel);
1108 	if (req == NULL) {
1109 		cb_fn(cb_arg, NULL, -ENOMEM);
1110 		return;
1111 	}
1112 
1113 	args = &req->args;
1114 	args->fn.file_op_with_handle = cb_fn;
1115 	args->arg = cb_arg;
1116 	args->file = f;
1117 	args->fs = fs;
1118 	args->op.open.name = name;
1119 
1120 	if (f == NULL) {
1121 		spdk_fs_create_file_async(fs, name, fs_open_blob_create_cb, req);
1122 	} else {
1123 		fs_open_blob_create_cb(req, 0);
1124 	}
1125 }
1126 
1127 static void
1128 __fs_open_file_done(void *arg, struct spdk_file *file, int bserrno)
1129 {
1130 	struct spdk_fs_request *req = arg;
1131 	struct spdk_fs_cb_args *args = &req->args;
1132 
1133 	args->file = file;
1134 	__wake_caller(args, bserrno);
1135 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.open.name);
1136 }
1137 
1138 static void
1139 __fs_open_file(void *arg)
1140 {
1141 	struct spdk_fs_request *req = arg;
1142 	struct spdk_fs_cb_args *args = &req->args;
1143 
1144 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.open.name);
1145 	spdk_fs_open_file_async(args->fs, args->op.open.name, args->op.open.flags,
1146 				__fs_open_file_done, req);
1147 }
1148 
1149 int
1150 spdk_fs_open_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel,
1151 		  const char *name, uint32_t flags, struct spdk_file **file)
1152 {
1153 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
1154 	struct spdk_fs_request *req;
1155 	struct spdk_fs_cb_args *args;
1156 	int rc;
1157 
1158 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name);
1159 
1160 	req = alloc_fs_request(channel);
1161 	if (req == NULL) {
1162 		return -ENOMEM;
1163 	}
1164 
1165 	args = &req->args;
1166 	args->fs = fs;
1167 	args->op.open.name = name;
1168 	args->op.open.flags = flags;
1169 	args->sem = &channel->sem;
1170 	fs->send_request(__fs_open_file, req);
1171 	sem_wait(&channel->sem);
1172 	rc = args->rc;
1173 	if (rc == 0) {
1174 		*file = args->file;
1175 	} else {
1176 		*file = NULL;
1177 	}
1178 	free_fs_request(req);
1179 
1180 	return rc;
1181 }
1182 
1183 static void
1184 fs_rename_blob_close_cb(void *ctx, int bserrno)
1185 {
1186 	struct spdk_fs_request *req = ctx;
1187 	struct spdk_fs_cb_args *args = &req->args;
1188 
1189 	args->fn.fs_op(args->arg, bserrno);
1190 	free_fs_request(req);
1191 }
1192 
1193 static void
1194 fs_rename_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno)
1195 {
1196 	struct spdk_fs_request *req = ctx;
1197 	struct spdk_fs_cb_args *args = &req->args;
1198 	const char *new_name = args->op.rename.new_name;
1199 
1200 	spdk_blob_set_xattr(blob, "name", new_name, strlen(new_name) + 1);
1201 	spdk_blob_close(blob, fs_rename_blob_close_cb, req);
1202 }
1203 
1204 static void
1205 __spdk_fs_md_rename_file(struct spdk_fs_request *req)
1206 {
1207 	struct spdk_fs_cb_args *args = &req->args;
1208 	struct spdk_file *f;
1209 
1210 	f = fs_find_file(args->fs, args->op.rename.old_name);
1211 	if (f == NULL) {
1212 		args->fn.fs_op(args->arg, -ENOENT);
1213 		free_fs_request(req);
1214 		return;
1215 	}
1216 
1217 	free(f->name);
1218 	f->name = strdup(args->op.rename.new_name);
1219 	args->file = f;
1220 	spdk_bs_open_blob(args->fs->bs, f->blobid, fs_rename_blob_open_cb, req);
1221 }
1222 
1223 static void
1224 fs_rename_delete_done(void *arg, int fserrno)
1225 {
1226 	__spdk_fs_md_rename_file(arg);
1227 }
1228 
1229 void
1230 spdk_fs_rename_file_async(struct spdk_filesystem *fs,
1231 			  const char *old_name, const char *new_name,
1232 			  spdk_file_op_complete cb_fn, void *cb_arg)
1233 {
1234 	struct spdk_file *f;
1235 	struct spdk_fs_request *req;
1236 	struct spdk_fs_cb_args *args;
1237 
1238 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "old=%s new=%s\n", old_name, new_name);
1239 	if (strnlen(new_name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
1240 		cb_fn(cb_arg, -ENAMETOOLONG);
1241 		return;
1242 	}
1243 
1244 	req = alloc_fs_request(fs->md_target.md_fs_channel);
1245 	if (req == NULL) {
1246 		cb_fn(cb_arg, -ENOMEM);
1247 		return;
1248 	}
1249 
1250 	args = &req->args;
1251 	args->fn.fs_op = cb_fn;
1252 	args->fs = fs;
1253 	args->arg = cb_arg;
1254 	args->op.rename.old_name = old_name;
1255 	args->op.rename.new_name = new_name;
1256 
1257 	f = fs_find_file(fs, new_name);
1258 	if (f == NULL) {
1259 		__spdk_fs_md_rename_file(req);
1260 		return;
1261 	}
1262 
1263 	/*
1264 	 * The rename overwrites an existing file.  So delete the existing file, then
1265 	 *  do the actual rename.
1266 	 */
1267 	spdk_fs_delete_file_async(fs, new_name, fs_rename_delete_done, req);
1268 }
1269 
1270 static void
1271 __fs_rename_file_done(void *arg, int fserrno)
1272 {
1273 	struct spdk_fs_request *req = arg;
1274 	struct spdk_fs_cb_args *args = &req->args;
1275 
1276 	__wake_caller(args, fserrno);
1277 }
1278 
1279 static void
1280 __fs_rename_file(void *arg)
1281 {
1282 	struct spdk_fs_request *req = arg;
1283 	struct spdk_fs_cb_args *args = &req->args;
1284 
1285 	spdk_fs_rename_file_async(args->fs, args->op.rename.old_name, args->op.rename.new_name,
1286 				  __fs_rename_file_done, req);
1287 }
1288 
1289 int
1290 spdk_fs_rename_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel,
1291 		    const char *old_name, const char *new_name)
1292 {
1293 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
1294 	struct spdk_fs_request *req;
1295 	struct spdk_fs_cb_args *args;
1296 	int rc;
1297 
1298 	req = alloc_fs_request(channel);
1299 	if (req == NULL) {
1300 		return -ENOMEM;
1301 	}
1302 
1303 	args = &req->args;
1304 
1305 	args->fs = fs;
1306 	args->op.rename.old_name = old_name;
1307 	args->op.rename.new_name = new_name;
1308 	args->sem = &channel->sem;
1309 	fs->send_request(__fs_rename_file, req);
1310 	sem_wait(&channel->sem);
1311 	rc = args->rc;
1312 	free_fs_request(req);
1313 	return rc;
1314 }
1315 
1316 static void
1317 blob_delete_cb(void *ctx, int bserrno)
1318 {
1319 	struct spdk_fs_request *req = ctx;
1320 	struct spdk_fs_cb_args *args = &req->args;
1321 
1322 	args->fn.file_op(args->arg, bserrno);
1323 	free_fs_request(req);
1324 }
1325 
1326 void
1327 spdk_fs_delete_file_async(struct spdk_filesystem *fs, const char *name,
1328 			  spdk_file_op_complete cb_fn, void *cb_arg)
1329 {
1330 	struct spdk_file *f;
1331 	spdk_blob_id blobid;
1332 	struct spdk_fs_request *req;
1333 	struct spdk_fs_cb_args *args;
1334 
1335 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name);
1336 
1337 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
1338 		cb_fn(cb_arg, -ENAMETOOLONG);
1339 		return;
1340 	}
1341 
1342 	f = fs_find_file(fs, name);
1343 	if (f == NULL) {
1344 		cb_fn(cb_arg, -ENOENT);
1345 		return;
1346 	}
1347 
1348 	req = alloc_fs_request(fs->md_target.md_fs_channel);
1349 	if (req == NULL) {
1350 		cb_fn(cb_arg, -ENOMEM);
1351 		return;
1352 	}
1353 
1354 	args = &req->args;
1355 	args->fn.file_op = cb_fn;
1356 	args->arg = cb_arg;
1357 
1358 	if (f->ref_count > 0) {
1359 		/* If the ref > 0, we mark the file as deleted and delete it when we close it. */
1360 		f->is_deleted = true;
1361 		spdk_blob_set_xattr(f->blob, "is_deleted", &f->is_deleted, sizeof(bool));
1362 		spdk_blob_sync_md(f->blob, blob_delete_cb, args);
1363 		return;
1364 	}
1365 
1366 	TAILQ_REMOVE(&fs->files, f, tailq);
1367 
1368 	cache_free_buffers(f);
1369 
1370 	blobid = f->blobid;
1371 
1372 	free(f->name);
1373 	free(f->tree);
1374 	free(f);
1375 
1376 	spdk_bs_delete_blob(fs->bs, blobid, blob_delete_cb, req);
1377 }
1378 
1379 static void
1380 __fs_delete_file_done(void *arg, int fserrno)
1381 {
1382 	struct spdk_fs_request *req = arg;
1383 	struct spdk_fs_cb_args *args = &req->args;
1384 
1385 	__wake_caller(args, fserrno);
1386 }
1387 
1388 static void
1389 __fs_delete_file(void *arg)
1390 {
1391 	struct spdk_fs_request *req = arg;
1392 	struct spdk_fs_cb_args *args = &req->args;
1393 
1394 	spdk_fs_delete_file_async(args->fs, args->op.delete.name, __fs_delete_file_done, req);
1395 }
1396 
1397 int
1398 spdk_fs_delete_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel,
1399 		    const char *name)
1400 {
1401 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
1402 	struct spdk_fs_request *req;
1403 	struct spdk_fs_cb_args *args;
1404 	int rc;
1405 
1406 	req = alloc_fs_request(channel);
1407 	if (req == NULL) {
1408 		return -ENOMEM;
1409 	}
1410 
1411 	args = &req->args;
1412 	args->fs = fs;
1413 	args->op.delete.name = name;
1414 	args->sem = &channel->sem;
1415 	fs->send_request(__fs_delete_file, req);
1416 	sem_wait(&channel->sem);
1417 	rc = args->rc;
1418 	free_fs_request(req);
1419 
1420 	return rc;
1421 }
1422 
1423 spdk_fs_iter
1424 spdk_fs_iter_first(struct spdk_filesystem *fs)
1425 {
1426 	struct spdk_file *f;
1427 
1428 	f = TAILQ_FIRST(&fs->files);
1429 	return f;
1430 }
1431 
1432 spdk_fs_iter
1433 spdk_fs_iter_next(spdk_fs_iter iter)
1434 {
1435 	struct spdk_file *f = iter;
1436 
1437 	if (f == NULL) {
1438 		return NULL;
1439 	}
1440 
1441 	f = TAILQ_NEXT(f, tailq);
1442 	return f;
1443 }
1444 
1445 const char *
1446 spdk_file_get_name(struct spdk_file *file)
1447 {
1448 	return file->name;
1449 }
1450 
1451 uint64_t
1452 spdk_file_get_length(struct spdk_file *file)
1453 {
1454 	uint64_t length;
1455 
1456 	assert(file != NULL);
1457 
1458 	length = file->append_pos >= file->length ? file->append_pos : file->length;
1459 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s length=0x%jx\n", file->name, length);
1460 	return length;
1461 }
1462 
1463 static void
1464 fs_truncate_complete_cb(void *ctx, int bserrno)
1465 {
1466 	struct spdk_fs_request *req = ctx;
1467 	struct spdk_fs_cb_args *args = &req->args;
1468 
1469 	args->fn.file_op(args->arg, bserrno);
1470 	free_fs_request(req);
1471 }
1472 
1473 static void
1474 fs_truncate_resize_cb(void *ctx, int bserrno)
1475 {
1476 	struct spdk_fs_request *req = ctx;
1477 	struct spdk_fs_cb_args *args = &req->args;
1478 	struct spdk_file *file = args->file;
1479 	uint64_t *length = &args->op.truncate.length;
1480 
1481 	if (bserrno) {
1482 		args->fn.file_op(args->arg, bserrno);
1483 		free_fs_request(req);
1484 		return;
1485 	}
1486 
1487 	spdk_blob_set_xattr(file->blob, "length", length, sizeof(*length));
1488 
1489 	file->length = *length;
1490 	if (file->append_pos > file->length) {
1491 		file->append_pos = file->length;
1492 	}
1493 
1494 	spdk_blob_sync_md(file->blob, fs_truncate_complete_cb, args);
1495 }
1496 
/*
 * Number of clusters of cluster_sz bytes needed to hold length bytes
 *  (length / cluster_sz, rounded up).  cluster_sz must be non-zero.
 *
 * Computed as quotient plus one for any remainder, so that lengths close to
 *  UINT64_MAX do not wrap around as they would with
 *  (length + cluster_sz - 1) / cluster_sz.
 */
static uint64_t
__bytes_to_clusters(uint64_t length, uint64_t cluster_sz)
{
	uint64_t clusters = length / cluster_sz;

	if (length % cluster_sz != 0) {
		clusters++;
	}

	return clusters;
}
1502 
1503 void
1504 spdk_file_truncate_async(struct spdk_file *file, uint64_t length,
1505 			 spdk_file_op_complete cb_fn, void *cb_arg)
1506 {
1507 	struct spdk_filesystem *fs;
1508 	size_t num_clusters;
1509 	struct spdk_fs_request *req;
1510 	struct spdk_fs_cb_args *args;
1511 
1512 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s old=0x%jx new=0x%jx\n", file->name, file->length, length);
1513 	if (length == file->length) {
1514 		cb_fn(cb_arg, 0);
1515 		return;
1516 	}
1517 
1518 	req = alloc_fs_request(file->fs->md_target.md_fs_channel);
1519 	if (req == NULL) {
1520 		cb_fn(cb_arg, -ENOMEM);
1521 		return;
1522 	}
1523 
1524 	args = &req->args;
1525 	args->fn.file_op = cb_fn;
1526 	args->arg = cb_arg;
1527 	args->file = file;
1528 	args->op.truncate.length = length;
1529 	fs = file->fs;
1530 
1531 	num_clusters = __bytes_to_clusters(length, fs->bs_opts.cluster_sz);
1532 
1533 	spdk_blob_resize(file->blob, num_clusters, fs_truncate_resize_cb, req);
1534 }
1535 
1536 static void
1537 __truncate(void *arg)
1538 {
1539 	struct spdk_fs_request *req = arg;
1540 	struct spdk_fs_cb_args *args = &req->args;
1541 
1542 	spdk_file_truncate_async(args->file, args->op.truncate.length,
1543 				 args->fn.file_op, args);
1544 }
1545 
1546 int
1547 spdk_file_truncate(struct spdk_file *file, struct spdk_io_channel *_channel,
1548 		   uint64_t length)
1549 {
1550 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
1551 	struct spdk_fs_request *req;
1552 	struct spdk_fs_cb_args *args;
1553 	int rc;
1554 
1555 	req = alloc_fs_request(channel);
1556 	if (req == NULL) {
1557 		return -ENOMEM;
1558 	}
1559 
1560 	args = &req->args;
1561 
1562 	args->file = file;
1563 	args->op.truncate.length = length;
1564 	args->fn.file_op = __wake_caller;
1565 	args->sem = &channel->sem;
1566 
1567 	channel->send_request(__truncate, req);
1568 	sem_wait(&channel->sem);
1569 	rc = args->rc;
1570 	free_fs_request(req);
1571 
1572 	return rc;
1573 }
1574 
1575 static void
1576 __rw_done(void *ctx, int bserrno)
1577 {
1578 	struct spdk_fs_request *req = ctx;
1579 	struct spdk_fs_cb_args *args = &req->args;
1580 
1581 	spdk_dma_free(args->op.rw.pin_buf);
1582 	args->fn.file_op(args->arg, bserrno);
1583 	free_fs_request(req);
1584 }
1585 
1586 static void
1587 __read_done(void *ctx, int bserrno)
1588 {
1589 	struct spdk_fs_request *req = ctx;
1590 	struct spdk_fs_cb_args *args = &req->args;
1591 
1592 	assert(req != NULL);
1593 	if (args->op.rw.is_read) {
1594 		memcpy(args->op.rw.user_buf,
1595 		       args->op.rw.pin_buf + (args->op.rw.offset & (args->op.rw.blocklen - 1)),
1596 		       args->op.rw.length);
1597 		__rw_done(req, 0);
1598 	} else {
1599 		memcpy(args->op.rw.pin_buf + (args->op.rw.offset & (args->op.rw.blocklen - 1)),
1600 		       args->op.rw.user_buf,
1601 		       args->op.rw.length);
1602 		spdk_blob_io_write(args->file->blob, args->op.rw.channel,
1603 				   args->op.rw.pin_buf,
1604 				   args->op.rw.start_lba, args->op.rw.num_lba,
1605 				   __rw_done, req);
1606 	}
1607 }
1608 
1609 static void
1610 __do_blob_read(void *ctx, int fserrno)
1611 {
1612 	struct spdk_fs_request *req = ctx;
1613 	struct spdk_fs_cb_args *args = &req->args;
1614 
1615 	if (fserrno) {
1616 		__rw_done(req, fserrno);
1617 		return;
1618 	}
1619 	spdk_blob_io_read(args->file->blob, args->op.rw.channel,
1620 			  args->op.rw.pin_buf,
1621 			  args->op.rw.start_lba, args->op.rw.num_lba,
1622 			  __read_done, req);
1623 }
1624 
1625 static void
1626 __get_page_parameters(struct spdk_file *file, uint64_t offset, uint64_t length,
1627 		      uint64_t *start_lba, uint32_t *lba_size, uint64_t *num_lba)
1628 {
1629 	uint64_t end_lba;
1630 
1631 	*lba_size = spdk_bs_get_io_unit_size(file->fs->bs);
1632 	*start_lba = offset / *lba_size;
1633 	end_lba = (offset + length - 1) / *lba_size;
1634 	*num_lba = (end_lba - *start_lba + 1);
1635 }
1636 
1637 static void
1638 __readwrite(struct spdk_file *file, struct spdk_io_channel *_channel,
1639 	    void *payload, uint64_t offset, uint64_t length,
1640 	    spdk_file_op_complete cb_fn, void *cb_arg, int is_read)
1641 {
1642 	struct spdk_fs_request *req;
1643 	struct spdk_fs_cb_args *args;
1644 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
1645 	uint64_t start_lba, num_lba, pin_buf_length;
1646 	uint32_t lba_size;
1647 
1648 	if (is_read && offset + length > file->length) {
1649 		cb_fn(cb_arg, -EINVAL);
1650 		return;
1651 	}
1652 
1653 	req = alloc_fs_request(channel);
1654 	if (req == NULL) {
1655 		cb_fn(cb_arg, -ENOMEM);
1656 		return;
1657 	}
1658 
1659 	__get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba);
1660 
1661 	args = &req->args;
1662 	args->fn.file_op = cb_fn;
1663 	args->arg = cb_arg;
1664 	args->file = file;
1665 	args->op.rw.channel = channel->bs_channel;
1666 	args->op.rw.user_buf = payload;
1667 	args->op.rw.is_read = is_read;
1668 	args->op.rw.offset = offset;
1669 	args->op.rw.length = length;
1670 	args->op.rw.blocklen = lba_size;
1671 
1672 	pin_buf_length = num_lba * lba_size;
1673 	args->op.rw.pin_buf = spdk_dma_malloc(pin_buf_length, lba_size, NULL);
1674 	if (args->op.rw.pin_buf == NULL) {
1675 		SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "Failed to allocate buf for: file=%s offset=%jx length=%jx\n",
1676 			      file->name, offset, length);
1677 		free_fs_request(req);
1678 		cb_fn(cb_arg, -ENOMEM);
1679 		return;
1680 	}
1681 
1682 	args->op.rw.start_lba = start_lba;
1683 	args->op.rw.num_lba = num_lba;
1684 
1685 	if (!is_read && file->length < offset + length) {
1686 		spdk_file_truncate_async(file, offset + length, __do_blob_read, req);
1687 	} else {
1688 		__do_blob_read(req, 0);
1689 	}
1690 }
1691 
1692 void
1693 spdk_file_write_async(struct spdk_file *file, struct spdk_io_channel *channel,
1694 		      void *payload, uint64_t offset, uint64_t length,
1695 		      spdk_file_op_complete cb_fn, void *cb_arg)
1696 {
1697 	__readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 0);
1698 }
1699 
1700 void
1701 spdk_file_read_async(struct spdk_file *file, struct spdk_io_channel *channel,
1702 		     void *payload, uint64_t offset, uint64_t length,
1703 		     spdk_file_op_complete cb_fn, void *cb_arg)
1704 {
1705 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s offset=%jx length=%jx\n",
1706 		      file->name, offset, length);
1707 	__readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 1);
1708 }
1709 
1710 struct spdk_io_channel *
1711 spdk_fs_alloc_io_channel(struct spdk_filesystem *fs)
1712 {
1713 	struct spdk_io_channel *io_channel;
1714 	struct spdk_fs_channel *fs_channel;
1715 
1716 	io_channel = spdk_get_io_channel(&fs->io_target);
1717 	fs_channel = spdk_io_channel_get_ctx(io_channel);
1718 	fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
1719 	fs_channel->send_request = __send_request_direct;
1720 
1721 	return io_channel;
1722 }
1723 
1724 struct spdk_io_channel *
1725 spdk_fs_alloc_io_channel_sync(struct spdk_filesystem *fs)
1726 {
1727 	struct spdk_io_channel *io_channel;
1728 	struct spdk_fs_channel *fs_channel;
1729 
1730 	io_channel = spdk_get_io_channel(&fs->io_target);
1731 	fs_channel = spdk_io_channel_get_ctx(io_channel);
1732 	fs_channel->send_request = fs->send_request;
1733 	fs_channel->sync = 1;
1734 	pthread_spin_init(&fs_channel->lock, 0);
1735 
1736 	return io_channel;
1737 }
1738 
/* Release an I/O channel by handing it to spdk_put_io_channel(). */
void
spdk_fs_free_io_channel(struct spdk_io_channel *channel)
{
	struct spdk_io_channel *released = channel;

	spdk_put_io_channel(released);
}
1744 
1745 void
1746 spdk_fs_set_cache_size(uint64_t size_in_mb)
1747 {
1748 	g_fs_cache_size = size_in_mb * 1024 * 1024;
1749 }
1750 
1751 uint64_t
1752 spdk_fs_get_cache_size(void)
1753 {
1754 	return g_fs_cache_size / (1024 * 1024);
1755 }
1756 
1757 static void __file_flush(void *_args);
1758 
1759 static void *
1760 alloc_cache_memory_buffer(struct spdk_file *context)
1761 {
1762 	struct spdk_file *file;
1763 	void *buf;
1764 
1765 	buf = spdk_mempool_get(g_cache_pool);
1766 	if (buf != NULL) {
1767 		return buf;
1768 	}
1769 
1770 	pthread_spin_lock(&g_caches_lock);
1771 	TAILQ_FOREACH(file, &g_caches, cache_tailq) {
1772 		if (!file->open_for_writing &&
1773 		    file->priority == SPDK_FILE_PRIORITY_LOW &&
1774 		    file != context) {
1775 			break;
1776 		}
1777 	}
1778 	pthread_spin_unlock(&g_caches_lock);
1779 	if (file != NULL) {
1780 		cache_free_buffers(file);
1781 		buf = spdk_mempool_get(g_cache_pool);
1782 		if (buf != NULL) {
1783 			return buf;
1784 		}
1785 	}
1786 
1787 	pthread_spin_lock(&g_caches_lock);
1788 	TAILQ_FOREACH(file, &g_caches, cache_tailq) {
1789 		if (!file->open_for_writing && file != context) {
1790 			break;
1791 		}
1792 	}
1793 	pthread_spin_unlock(&g_caches_lock);
1794 	if (file != NULL) {
1795 		cache_free_buffers(file);
1796 		buf = spdk_mempool_get(g_cache_pool);
1797 		if (buf != NULL) {
1798 			return buf;
1799 		}
1800 	}
1801 
1802 	pthread_spin_lock(&g_caches_lock);
1803 	TAILQ_FOREACH(file, &g_caches, cache_tailq) {
1804 		if (file != context) {
1805 			break;
1806 		}
1807 	}
1808 	pthread_spin_unlock(&g_caches_lock);
1809 	if (file != NULL) {
1810 		cache_free_buffers(file);
1811 		buf = spdk_mempool_get(g_cache_pool);
1812 		if (buf != NULL) {
1813 			return buf;
1814 		}
1815 	}
1816 
1817 	return NULL;
1818 }
1819 
1820 static struct cache_buffer *
1821 cache_insert_buffer(struct spdk_file *file, uint64_t offset)
1822 {
1823 	struct cache_buffer *buf;
1824 	int count = 0;
1825 
1826 	buf = calloc(1, sizeof(*buf));
1827 	if (buf == NULL) {
1828 		SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "calloc failed\n");
1829 		return NULL;
1830 	}
1831 
1832 	buf->buf = alloc_cache_memory_buffer(file);
1833 	while (buf->buf == NULL) {
1834 		/*
1835 		 * TODO: alloc_cache_memory_buffer() should eventually free
1836 		 *  some buffers.  Need a more sophisticated check here, instead
1837 		 *  of just bailing if 100 tries does not result in getting a
1838 		 *  free buffer.  This will involve using the sync channel's
1839 		 *  semaphore to block until a buffer becomes available.
1840 		 */
1841 		if (count++ == 100) {
1842 			SPDK_ERRLOG("could not allocate cache buffer\n");
1843 			assert(false);
1844 			free(buf);
1845 			return NULL;
1846 		}
1847 		buf->buf = alloc_cache_memory_buffer(file);
1848 	}
1849 
1850 	buf->buf_size = CACHE_BUFFER_SIZE;
1851 	buf->offset = offset;
1852 
1853 	pthread_spin_lock(&g_caches_lock);
1854 	if (file->tree->present_mask == 0) {
1855 		TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq);
1856 	}
1857 	file->tree = spdk_tree_insert_buffer(file->tree, buf);
1858 	pthread_spin_unlock(&g_caches_lock);
1859 
1860 	return buf;
1861 }
1862 
1863 static struct cache_buffer *
1864 cache_append_buffer(struct spdk_file *file)
1865 {
1866 	struct cache_buffer *last;
1867 
1868 	assert(file->last == NULL || file->last->bytes_filled == file->last->buf_size);
1869 	assert((file->append_pos % CACHE_BUFFER_SIZE) == 0);
1870 
1871 	last = cache_insert_buffer(file, file->append_pos);
1872 	if (last == NULL) {
1873 		SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "cache_insert_buffer failed\n");
1874 		return NULL;
1875 	}
1876 
1877 	file->last = last;
1878 
1879 	return last;
1880 }
1881 
1882 static void __check_sync_reqs(struct spdk_file *file);
1883 
1884 static void
1885 __file_cache_finish_sync(void *ctx, int bserrno)
1886 {
1887 	struct spdk_file *file = ctx;
1888 	struct spdk_fs_request *sync_req;
1889 	struct spdk_fs_cb_args *sync_args;
1890 
1891 	pthread_spin_lock(&file->lock);
1892 	sync_req = TAILQ_FIRST(&file->sync_requests);
1893 	sync_args = &sync_req->args;
1894 	assert(sync_args->op.sync.offset <= file->length_flushed);
1895 	BLOBFS_TRACE(file, "sync done offset=%jx\n", sync_args->op.sync.offset);
1896 	TAILQ_REMOVE(&file->sync_requests, sync_req, args.op.sync.tailq);
1897 	pthread_spin_unlock(&file->lock);
1898 
1899 	sync_args->fn.file_op(sync_args->arg, bserrno);
1900 	__check_sync_reqs(file);
1901 
1902 	pthread_spin_lock(&file->lock);
1903 	free_fs_request(sync_req);
1904 	pthread_spin_unlock(&file->lock);
1905 }
1906 
1907 static void
1908 __free_args(struct spdk_fs_cb_args *args)
1909 {
1910 	struct spdk_fs_request *req;
1911 
1912 	if (!args->from_request) {
1913 		free(args);
1914 	} else {
1915 		/* Depends on args being at the start of the spdk_fs_request structure. */
1916 		req = (struct spdk_fs_request *)args;
1917 		free_fs_request(req);
1918 	}
1919 }
1920 
1921 static void
1922 __check_sync_reqs(struct spdk_file *file)
1923 {
1924 	struct spdk_fs_request *sync_req;
1925 
1926 	pthread_spin_lock(&file->lock);
1927 
1928 	TAILQ_FOREACH(sync_req, &file->sync_requests, args.op.sync.tailq) {
1929 		if (sync_req->args.op.sync.offset <= file->length_flushed) {
1930 			break;
1931 		}
1932 	}
1933 
1934 	if (sync_req != NULL && !sync_req->args.op.sync.xattr_in_progress) {
1935 		BLOBFS_TRACE(file, "set xattr length 0x%jx\n", file->length_flushed);
1936 		sync_req->args.op.sync.xattr_in_progress = true;
1937 		spdk_blob_set_xattr(file->blob, "length", &file->length_flushed,
1938 				    sizeof(file->length_flushed));
1939 
1940 		pthread_spin_unlock(&file->lock);
1941 		spdk_blob_sync_md(file->blob, __file_cache_finish_sync, file);
1942 	} else {
1943 		pthread_spin_unlock(&file->lock);
1944 	}
1945 }
1946 
1947 static void
1948 __file_flush_done(void *arg, int bserrno)
1949 {
1950 	struct spdk_fs_cb_args *args = arg;
1951 	struct spdk_file *file = args->file;
1952 	struct cache_buffer *next = args->op.flush.cache_buffer;
1953 
1954 	BLOBFS_TRACE(file, "length=%jx\n", args->op.flush.length);
1955 
1956 	pthread_spin_lock(&file->lock);
1957 	next->in_progress = false;
1958 	next->bytes_flushed += args->op.flush.length;
1959 	file->length_flushed += args->op.flush.length;
1960 	if (file->length_flushed > file->length) {
1961 		file->length = file->length_flushed;
1962 	}
1963 	if (next->bytes_flushed == next->buf_size) {
1964 		BLOBFS_TRACE(file, "write buffer fully flushed 0x%jx\n", file->length_flushed);
1965 		next = spdk_tree_find_buffer(file->tree, file->length_flushed);
1966 	}
1967 
1968 	/*
1969 	 * Assert that there is no cached data that extends past the end of the underlying
1970 	 *  blob.
1971 	 */
1972 	assert(next == NULL || next->offset < __file_get_blob_size(file) ||
1973 	       next->bytes_filled == 0);
1974 
1975 	pthread_spin_unlock(&file->lock);
1976 
1977 	__check_sync_reqs(file);
1978 
1979 	__file_flush(args);
1980 }
1981 
1982 static void
1983 __file_flush(void *_args)
1984 {
1985 	struct spdk_fs_cb_args *args = _args;
1986 	struct spdk_file *file = args->file;
1987 	struct cache_buffer *next;
1988 	uint64_t offset, length, start_lba, num_lba;
1989 	uint32_t lba_size;
1990 
1991 	pthread_spin_lock(&file->lock);
1992 	next = spdk_tree_find_buffer(file->tree, file->length_flushed);
1993 	if (next == NULL || next->in_progress) {
1994 		/*
1995 		 * There is either no data to flush, or a flush I/O is already in
1996 		 *  progress.  So return immediately - if a flush I/O is in
1997 		 *  progress we will flush more data after that is completed.
1998 		 */
1999 		__free_args(args);
2000 		if (next == NULL) {
2001 			/*
2002 			 * For cases where a file's cache was evicted, and then the
2003 			 *  file was later appended, we will write the data directly
2004 			 *  to disk and bypass cache.  So just update length_flushed
2005 			 *  here to reflect that all data was already written to disk.
2006 			 */
2007 			file->length_flushed = file->append_pos;
2008 		}
2009 		pthread_spin_unlock(&file->lock);
2010 		if (next == NULL) {
2011 			/*
2012 			 * There is no data to flush, but we still need to check for any
2013 			 *  outstanding sync requests to make sure metadata gets updated.
2014 			 */
2015 			__check_sync_reqs(file);
2016 		}
2017 		return;
2018 	}
2019 
2020 	offset = next->offset + next->bytes_flushed;
2021 	length = next->bytes_filled - next->bytes_flushed;
2022 	if (length == 0) {
2023 		__free_args(args);
2024 		pthread_spin_unlock(&file->lock);
2025 		return;
2026 	}
2027 	args->op.flush.length = length;
2028 	args->op.flush.cache_buffer = next;
2029 
2030 	__get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba);
2031 
2032 	next->in_progress = true;
2033 	BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n",
2034 		     offset, length, start_lba, num_lba);
2035 	pthread_spin_unlock(&file->lock);
2036 	spdk_blob_io_write(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel,
2037 			   next->buf + (start_lba * lba_size) - next->offset,
2038 			   start_lba, num_lba, __file_flush_done, args);
2039 }
2040 
/*
 * Completion of the metadata sync that follows a blob extend: pass the
 *  status to __wake_caller() for the waiting writer.
 */
static void
__file_extend_done(void *arg, int bserrno)
{
	struct spdk_fs_cb_args *extend_args = arg;

	__wake_caller(extend_args, bserrno);
}
2048 
2049 static void
2050 __file_extend_resize_cb(void *_args, int bserrno)
2051 {
2052 	struct spdk_fs_cb_args *args = _args;
2053 	struct spdk_file *file = args->file;
2054 
2055 	if (bserrno) {
2056 		__wake_caller(args, bserrno);
2057 		return;
2058 	}
2059 
2060 	spdk_blob_sync_md(file->blob, __file_extend_done, args);
2061 }
2062 
2063 static void
2064 __file_extend_blob(void *_args)
2065 {
2066 	struct spdk_fs_cb_args *args = _args;
2067 	struct spdk_file *file = args->file;
2068 
2069 	spdk_blob_resize(file->blob, args->op.resize.num_clusters, __file_extend_resize_cb, args);
2070 }
2071 
/*
 * Completion of the direct I/O started by __rw_from_file(): pass the status
 *  to __wake_caller() for the waiting caller, then release the args.
 */
static void
__rw_from_file_done(void *arg, int bserrno)
{
	struct spdk_fs_cb_args *rw_args = arg;

	__wake_caller(rw_args, bserrno);
	__free_args(rw_args);
}
2080 
2081 static void
2082 __rw_from_file(void *_args)
2083 {
2084 	struct spdk_fs_cb_args *args = _args;
2085 	struct spdk_file *file = args->file;
2086 
2087 	if (args->op.rw.is_read) {
2088 		spdk_file_read_async(file, file->fs->sync_target.sync_io_channel, args->op.rw.user_buf,
2089 				     args->op.rw.offset, args->op.rw.length,
2090 				     __rw_from_file_done, args);
2091 	} else {
2092 		spdk_file_write_async(file, file->fs->sync_target.sync_io_channel, args->op.rw.user_buf,
2093 				      args->op.rw.offset, args->op.rw.length,
2094 				      __rw_from_file_done, args);
2095 	}
2096 }
2097 
2098 static int
2099 __send_rw_from_file(struct spdk_file *file, sem_t *sem, void *payload,
2100 		    uint64_t offset, uint64_t length, bool is_read)
2101 {
2102 	struct spdk_fs_cb_args *args;
2103 
2104 	args = calloc(1, sizeof(*args));
2105 	if (args == NULL) {
2106 		sem_post(sem);
2107 		return -ENOMEM;
2108 	}
2109 
2110 	args->file = file;
2111 	args->sem = sem;
2112 	args->op.rw.user_buf = payload;
2113 	args->op.rw.offset = offset;
2114 	args->op.rw.length = length;
2115 	args->op.rw.is_read = is_read;
2116 	file->fs->send_request(__rw_from_file, args);
2117 	return 0;
2118 }
2119 
2120 int
2121 spdk_file_write(struct spdk_file *file, struct spdk_io_channel *_channel,
2122 		void *payload, uint64_t offset, uint64_t length)
2123 {
2124 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
2125 	struct spdk_fs_cb_args *args;
2126 	uint64_t rem_length, copy, blob_size, cluster_sz;
2127 	uint32_t cache_buffers_filled = 0;
2128 	uint8_t *cur_payload;
2129 	struct cache_buffer *last;
2130 
2131 	BLOBFS_TRACE_RW(file, "offset=%jx length=%jx\n", offset, length);
2132 
2133 	if (length == 0) {
2134 		return 0;
2135 	}
2136 
2137 	if (offset != file->append_pos) {
2138 		BLOBFS_TRACE(file, " error offset=%jx append_pos=%jx\n", offset, file->append_pos);
2139 		return -EINVAL;
2140 	}
2141 
2142 	pthread_spin_lock(&file->lock);
2143 	file->open_for_writing = true;
2144 
2145 	if (file->last == NULL) {
2146 		if (file->append_pos % CACHE_BUFFER_SIZE == 0) {
2147 			cache_append_buffer(file);
2148 		} else {
2149 			int rc;
2150 
2151 			file->append_pos += length;
2152 			pthread_spin_unlock(&file->lock);
2153 			rc = __send_rw_from_file(file, &channel->sem, payload,
2154 						 offset, length, false);
2155 			sem_wait(&channel->sem);
2156 			return rc;
2157 		}
2158 	}
2159 
2160 	blob_size = __file_get_blob_size(file);
2161 
2162 	if ((offset + length) > blob_size) {
2163 		struct spdk_fs_cb_args extend_args = {};
2164 
2165 		cluster_sz = file->fs->bs_opts.cluster_sz;
2166 		extend_args.sem = &channel->sem;
2167 		extend_args.op.resize.num_clusters = __bytes_to_clusters((offset + length), cluster_sz);
2168 		extend_args.file = file;
2169 		BLOBFS_TRACE(file, "start resize to %u clusters\n", extend_args.op.resize.num_clusters);
2170 		pthread_spin_unlock(&file->lock);
2171 		file->fs->send_request(__file_extend_blob, &extend_args);
2172 		sem_wait(&channel->sem);
2173 		if (extend_args.rc) {
2174 			return extend_args.rc;
2175 		}
2176 	}
2177 
2178 	last = file->last;
2179 	rem_length = length;
2180 	cur_payload = payload;
2181 	while (rem_length > 0) {
2182 		copy = last->buf_size - last->bytes_filled;
2183 		if (copy > rem_length) {
2184 			copy = rem_length;
2185 		}
2186 		BLOBFS_TRACE_RW(file, "  fill offset=%jx length=%jx\n", file->append_pos, copy);
2187 		memcpy(&last->buf[last->bytes_filled], cur_payload, copy);
2188 		file->append_pos += copy;
2189 		if (file->length < file->append_pos) {
2190 			file->length = file->append_pos;
2191 		}
2192 		cur_payload += copy;
2193 		last->bytes_filled += copy;
2194 		rem_length -= copy;
2195 		if (last->bytes_filled == last->buf_size) {
2196 			cache_buffers_filled++;
2197 			last = cache_append_buffer(file);
2198 			if (last == NULL) {
2199 				BLOBFS_TRACE(file, "nomem\n");
2200 				pthread_spin_unlock(&file->lock);
2201 				return -ENOMEM;
2202 			}
2203 		}
2204 	}
2205 
2206 	pthread_spin_unlock(&file->lock);
2207 
2208 	if (cache_buffers_filled == 0) {
2209 		return 0;
2210 	}
2211 
2212 	args = calloc(1, sizeof(*args));
2213 	if (args == NULL) {
2214 		return -ENOMEM;
2215 	}
2216 
2217 	args->file = file;
2218 	file->fs->send_request(__file_flush, args);
2219 	return 0;
2220 }
2221 
2222 static void
2223 __readahead_done(void *arg, int bserrno)
2224 {
2225 	struct spdk_fs_cb_args *args = arg;
2226 	struct cache_buffer *cache_buffer = args->op.readahead.cache_buffer;
2227 	struct spdk_file *file = args->file;
2228 
2229 	BLOBFS_TRACE(file, "offset=%jx\n", cache_buffer->offset);
2230 
2231 	pthread_spin_lock(&file->lock);
2232 	cache_buffer->bytes_filled = args->op.readahead.length;
2233 	cache_buffer->bytes_flushed = args->op.readahead.length;
2234 	cache_buffer->in_progress = false;
2235 	pthread_spin_unlock(&file->lock);
2236 
2237 	__free_args(args);
2238 }
2239 
2240 static void
2241 __readahead(void *_args)
2242 {
2243 	struct spdk_fs_cb_args *args = _args;
2244 	struct spdk_file *file = args->file;
2245 	uint64_t offset, length, start_lba, num_lba;
2246 	uint32_t lba_size;
2247 
2248 	offset = args->op.readahead.offset;
2249 	length = args->op.readahead.length;
2250 	assert(length > 0);
2251 
2252 	__get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba);
2253 
2254 	BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n",
2255 		     offset, length, start_lba, num_lba);
2256 	spdk_blob_io_read(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel,
2257 			  args->op.readahead.cache_buffer->buf,
2258 			  start_lba, num_lba, __readahead_done, args);
2259 }
2260 
2261 static uint64_t
2262 __next_cache_buffer_offset(uint64_t offset)
2263 {
2264 	return (offset + CACHE_BUFFER_SIZE) & ~(CACHE_TREE_LEVEL_MASK(0));
2265 }
2266 
2267 static void
2268 check_readahead(struct spdk_file *file, uint64_t offset)
2269 {
2270 	struct spdk_fs_cb_args *args;
2271 
2272 	offset = __next_cache_buffer_offset(offset);
2273 	if (spdk_tree_find_buffer(file->tree, offset) != NULL || file->length <= offset) {
2274 		return;
2275 	}
2276 
2277 	args = calloc(1, sizeof(*args));
2278 	if (args == NULL) {
2279 		return;
2280 	}
2281 
2282 	BLOBFS_TRACE(file, "offset=%jx\n", offset);
2283 
2284 	args->file = file;
2285 	args->op.readahead.offset = offset;
2286 	args->op.readahead.cache_buffer = cache_insert_buffer(file, offset);
2287 	if (!args->op.readahead.cache_buffer) {
2288 		BLOBFS_TRACE(file, "Cannot allocate buf for offset=%jx\n", offset);
2289 		free(args);
2290 		return;
2291 	}
2292 
2293 	args->op.readahead.cache_buffer->in_progress = true;
2294 	if (file->length < (offset + CACHE_BUFFER_SIZE)) {
2295 		args->op.readahead.length = file->length & (CACHE_BUFFER_SIZE - 1);
2296 	} else {
2297 		args->op.readahead.length = CACHE_BUFFER_SIZE;
2298 	}
2299 	file->fs->send_request(__readahead, args);
2300 }
2301 
2302 static int
2303 __file_read(struct spdk_file *file, void *payload, uint64_t offset, uint64_t length, sem_t *sem)
2304 {
2305 	struct cache_buffer *buf;
2306 	int rc;
2307 
2308 	buf = spdk_tree_find_filled_buffer(file->tree, offset);
2309 	if (buf == NULL) {
2310 		pthread_spin_unlock(&file->lock);
2311 		rc = __send_rw_from_file(file, sem, payload, offset, length, true);
2312 		pthread_spin_lock(&file->lock);
2313 		return rc;
2314 	}
2315 
2316 	if ((offset + length) > (buf->offset + buf->bytes_filled)) {
2317 		length = buf->offset + buf->bytes_filled - offset;
2318 	}
2319 	BLOBFS_TRACE(file, "read %p offset=%ju length=%ju\n", payload, offset, length);
2320 	memcpy(payload, &buf->buf[offset - buf->offset], length);
2321 	if ((offset + length) % CACHE_BUFFER_SIZE == 0) {
2322 		pthread_spin_lock(&g_caches_lock);
2323 		spdk_tree_remove_buffer(file->tree, buf);
2324 		if (file->tree->present_mask == 0) {
2325 			TAILQ_REMOVE(&g_caches, file, cache_tailq);
2326 		}
2327 		pthread_spin_unlock(&g_caches_lock);
2328 	}
2329 
2330 	sem_post(sem);
2331 	return 0;
2332 }
2333 
2334 int64_t
2335 spdk_file_read(struct spdk_file *file, struct spdk_io_channel *_channel,
2336 	       void *payload, uint64_t offset, uint64_t length)
2337 {
2338 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
2339 	uint64_t final_offset, final_length;
2340 	uint32_t sub_reads = 0;
2341 	int rc = 0;
2342 
2343 	pthread_spin_lock(&file->lock);
2344 
2345 	BLOBFS_TRACE_RW(file, "offset=%ju length=%ju\n", offset, length);
2346 
2347 	file->open_for_writing = false;
2348 
2349 	if (length == 0 || offset >= file->append_pos) {
2350 		pthread_spin_unlock(&file->lock);
2351 		return 0;
2352 	}
2353 
2354 	if (offset + length > file->append_pos) {
2355 		length = file->append_pos - offset;
2356 	}
2357 
2358 	if (offset != file->next_seq_offset) {
2359 		file->seq_byte_count = 0;
2360 	}
2361 	file->seq_byte_count += length;
2362 	file->next_seq_offset = offset + length;
2363 	if (file->seq_byte_count >= CACHE_READAHEAD_THRESHOLD) {
2364 		check_readahead(file, offset);
2365 		check_readahead(file, offset + CACHE_BUFFER_SIZE);
2366 	}
2367 
2368 	final_length = 0;
2369 	final_offset = offset + length;
2370 	while (offset < final_offset) {
2371 		length = NEXT_CACHE_BUFFER_OFFSET(offset) - offset;
2372 		if (length > (final_offset - offset)) {
2373 			length = final_offset - offset;
2374 		}
2375 		rc = __file_read(file, payload, offset, length, &channel->sem);
2376 		if (rc == 0) {
2377 			final_length += length;
2378 		} else {
2379 			break;
2380 		}
2381 		payload += length;
2382 		offset += length;
2383 		sub_reads++;
2384 	}
2385 	pthread_spin_unlock(&file->lock);
2386 	while (sub_reads-- > 0) {
2387 		sem_wait(&channel->sem);
2388 	}
2389 	if (rc == 0) {
2390 		return final_length;
2391 	} else {
2392 		return rc;
2393 	}
2394 }
2395 
2396 static void
2397 _file_sync(struct spdk_file *file, struct spdk_fs_channel *channel,
2398 	   spdk_file_op_complete cb_fn, void *cb_arg)
2399 {
2400 	struct spdk_fs_request *sync_req;
2401 	struct spdk_fs_request *flush_req;
2402 	struct spdk_fs_cb_args *sync_args;
2403 	struct spdk_fs_cb_args *flush_args;
2404 
2405 	BLOBFS_TRACE(file, "offset=%jx\n", file->append_pos);
2406 
2407 	pthread_spin_lock(&file->lock);
2408 	if (file->append_pos <= file->length_flushed) {
2409 		BLOBFS_TRACE(file, "done - no data to flush\n");
2410 		pthread_spin_unlock(&file->lock);
2411 		cb_fn(cb_arg, 0);
2412 		return;
2413 	}
2414 
2415 	sync_req = alloc_fs_request(channel);
2416 	if (!sync_req) {
2417 		pthread_spin_unlock(&file->lock);
2418 		cb_fn(cb_arg, -ENOMEM);
2419 		return;
2420 	}
2421 	sync_args = &sync_req->args;
2422 
2423 	flush_req = alloc_fs_request(channel);
2424 	if (!flush_req) {
2425 		pthread_spin_unlock(&file->lock);
2426 		cb_fn(cb_arg, -ENOMEM);
2427 		return;
2428 	}
2429 	flush_args = &flush_req->args;
2430 
2431 	sync_args->file = file;
2432 	sync_args->fn.file_op = cb_fn;
2433 	sync_args->arg = cb_arg;
2434 	sync_args->op.sync.offset = file->append_pos;
2435 	sync_args->op.sync.xattr_in_progress = false;
2436 	TAILQ_INSERT_TAIL(&file->sync_requests, sync_req, args.op.sync.tailq);
2437 	pthread_spin_unlock(&file->lock);
2438 
2439 	flush_args->file = file;
2440 	channel->send_request(__file_flush, flush_args);
2441 }
2442 
2443 int
2444 spdk_file_sync(struct spdk_file *file, struct spdk_io_channel *_channel)
2445 {
2446 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
2447 	struct spdk_fs_cb_args args = {};
2448 
2449 	args.sem = &channel->sem;
2450 	_file_sync(file, channel, __wake_caller, &args);
2451 	sem_wait(&channel->sem);
2452 
2453 	return args.rc;
2454 }
2455 
2456 void
2457 spdk_file_sync_async(struct spdk_file *file, struct spdk_io_channel *_channel,
2458 		     spdk_file_op_complete cb_fn, void *cb_arg)
2459 {
2460 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
2461 
2462 	_file_sync(file, channel, cb_fn, cb_arg);
2463 }
2464 
2465 void
2466 spdk_file_set_priority(struct spdk_file *file, uint32_t priority)
2467 {
2468 	BLOBFS_TRACE(file, "priority=%u\n", priority);
2469 	file->priority = priority;
2470 
2471 }
2472 
2473 /*
2474  * Close routines
2475  */
2476 
2477 static void
2478 __file_close_async_done(void *ctx, int bserrno)
2479 {
2480 	struct spdk_fs_request *req = ctx;
2481 	struct spdk_fs_cb_args *args = &req->args;
2482 	struct spdk_file *file = args->file;
2483 
2484 	if (file->is_deleted) {
2485 		spdk_fs_delete_file_async(file->fs, file->name, blob_delete_cb, ctx);
2486 		return;
2487 	}
2488 
2489 	args->fn.file_op(args->arg, bserrno);
2490 	free_fs_request(req);
2491 }
2492 
2493 static void
2494 __file_close_async(struct spdk_file *file, struct spdk_fs_request *req)
2495 {
2496 	struct spdk_blob *blob;
2497 
2498 	pthread_spin_lock(&file->lock);
2499 	if (file->ref_count == 0) {
2500 		pthread_spin_unlock(&file->lock);
2501 		__file_close_async_done(req, -EBADF);
2502 		return;
2503 	}
2504 
2505 	file->ref_count--;
2506 	if (file->ref_count > 0) {
2507 		pthread_spin_unlock(&file->lock);
2508 		req->args.fn.file_op(req->args.arg, 0);
2509 		free_fs_request(req);
2510 		return;
2511 	}
2512 
2513 	pthread_spin_unlock(&file->lock);
2514 
2515 	blob = file->blob;
2516 	file->blob = NULL;
2517 	spdk_blob_close(blob, __file_close_async_done, req);
2518 }
2519 
2520 static void
2521 __file_close_async__sync_done(void *arg, int fserrno)
2522 {
2523 	struct spdk_fs_request *req = arg;
2524 	struct spdk_fs_cb_args *args = &req->args;
2525 
2526 	__file_close_async(args->file, req);
2527 }
2528 
2529 void
2530 spdk_file_close_async(struct spdk_file *file, spdk_file_op_complete cb_fn, void *cb_arg)
2531 {
2532 	struct spdk_fs_request *req;
2533 	struct spdk_fs_cb_args *args;
2534 
2535 	req = alloc_fs_request(file->fs->md_target.md_fs_channel);
2536 	if (req == NULL) {
2537 		cb_fn(cb_arg, -ENOMEM);
2538 		return;
2539 	}
2540 
2541 	args = &req->args;
2542 	args->file = file;
2543 	args->fn.file_op = cb_fn;
2544 	args->arg = cb_arg;
2545 
2546 	spdk_file_sync_async(file, file->fs->md_target.md_io_channel, __file_close_async__sync_done, req);
2547 }
2548 
2549 static void
2550 __file_close(void *arg)
2551 {
2552 	struct spdk_fs_request *req = arg;
2553 	struct spdk_fs_cb_args *args = &req->args;
2554 	struct spdk_file *file = args->file;
2555 
2556 	__file_close_async(file, req);
2557 }
2558 
2559 int
2560 spdk_file_close(struct spdk_file *file, struct spdk_io_channel *_channel)
2561 {
2562 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
2563 	struct spdk_fs_request *req;
2564 	struct spdk_fs_cb_args *args;
2565 
2566 	req = alloc_fs_request(channel);
2567 	if (req == NULL) {
2568 		return -ENOMEM;
2569 	}
2570 
2571 	args = &req->args;
2572 
2573 	spdk_file_sync(file, _channel);
2574 	BLOBFS_TRACE(file, "name=%s\n", file->name);
2575 	args->file = file;
2576 	args->sem = &channel->sem;
2577 	args->fn.file_op = __wake_caller;
2578 	args->arg = req;
2579 	channel->send_request(__file_close, req);
2580 	sem_wait(&channel->sem);
2581 
2582 	return args->rc;
2583 }
2584 
2585 int
2586 spdk_file_get_id(struct spdk_file *file, void *id, size_t size)
2587 {
2588 	if (size < sizeof(spdk_blob_id)) {
2589 		return -EINVAL;
2590 	}
2591 
2592 	memcpy(id, &file->blobid, sizeof(spdk_blob_id));
2593 
2594 	return sizeof(spdk_blob_id);
2595 }
2596 
2597 static void
2598 cache_free_buffers(struct spdk_file *file)
2599 {
2600 	BLOBFS_TRACE(file, "free=%s\n", file->name);
2601 	pthread_spin_lock(&file->lock);
2602 	pthread_spin_lock(&g_caches_lock);
2603 	if (file->tree->present_mask == 0) {
2604 		pthread_spin_unlock(&g_caches_lock);
2605 		pthread_spin_unlock(&file->lock);
2606 		return;
2607 	}
2608 	spdk_tree_free_buffers(file->tree);
2609 
2610 	TAILQ_REMOVE(&g_caches, file, cache_tailq);
2611 	/* If not freed, put it in the end of the queue */
2612 	if (file->tree->present_mask != 0) {
2613 		TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq);
2614 	}
2615 	file->last = NULL;
2616 	pthread_spin_unlock(&g_caches_lock);
2617 	pthread_spin_unlock(&file->lock);
2618 }
2619 
/*
 * Log component registrations for this file: "blobfs" is tied to
 * SPDK_LOG_BLOBFS and "blobfs_rw" to SPDK_LOG_BLOBFS_RW.
 * NOTE(review): presumably these are the flags behind the BLOBFS_TRACE and
 * BLOBFS_TRACE_RW macros used above -- confirm against the macro definitions.
 */
2620 SPDK_LOG_REGISTER_COMPONENT("blobfs", SPDK_LOG_BLOBFS)
2621 SPDK_LOG_REGISTER_COMPONENT("blobfs_rw", SPDK_LOG_BLOBFS_RW)
2622