xref: /spdk/lib/blobfs/blobfs.c (revision 27a23a33f93c98f134efcf782e25bb754a8e71a5)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 
36 #include "spdk/blobfs.h"
37 #include "spdk/conf.h"
38 #include "blobfs_internal.h"
39 
40 #include "spdk/queue.h"
41 #include "spdk/thread.h"
42 #include "spdk/assert.h"
43 #include "spdk/env.h"
44 #include "spdk/util.h"
45 #include "spdk_internal/log.h"
46 
/*
 * Debug-log helpers that prefix every message with the name of the file
 * being operated on.  `file` is parenthesized so that any pointer
 * expression (e.g. `files + 1`) may be passed safely.
 */
#define BLOBFS_TRACE(file, str, args...) \
	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s " str, (file)->name, ##args)

#define BLOBFS_TRACE_RW(file, str, args...) \
	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS_RW, "file=%s " str, (file)->name, ##args)
52 
53 #define BLOBFS_DEFAULT_CACHE_SIZE (4ULL * 1024 * 1024 * 1024)
54 #define SPDK_BLOBFS_DEFAULT_OPTS_CLUSTER_SZ (1024 * 1024)
55 
56 static uint64_t g_fs_cache_size = BLOBFS_DEFAULT_CACHE_SIZE;
57 static struct spdk_mempool *g_cache_pool;
58 static TAILQ_HEAD(, spdk_file) g_caches;
59 static int g_fs_count = 0;
60 static pthread_mutex_t g_cache_init_lock = PTHREAD_MUTEX_INITIALIZER;
61 static pthread_spinlock_t g_caches_lock;
62 
63 void
64 spdk_cache_buffer_free(struct cache_buffer *cache_buffer)
65 {
66 	spdk_mempool_put(g_cache_pool, cache_buffer->buf);
67 	free(cache_buffer);
68 }
69 
70 #define CACHE_READAHEAD_THRESHOLD	(128 * 1024)
71 
72 struct spdk_file {
73 	struct spdk_filesystem	*fs;
74 	struct spdk_blob	*blob;
75 	char			*name;
76 	uint64_t		length;
77 	bool                    is_deleted;
78 	bool			open_for_writing;
79 	uint64_t		length_flushed;
80 	uint64_t		append_pos;
81 	uint64_t		seq_byte_count;
82 	uint64_t		next_seq_offset;
83 	uint32_t		priority;
84 	TAILQ_ENTRY(spdk_file)	tailq;
85 	spdk_blob_id		blobid;
86 	uint32_t		ref_count;
87 	pthread_spinlock_t	lock;
88 	struct cache_buffer	*last;
89 	struct cache_tree	*tree;
90 	TAILQ_HEAD(open_requests_head, spdk_fs_request) open_requests;
91 	TAILQ_HEAD(sync_requests_head, spdk_fs_request) sync_requests;
92 	TAILQ_ENTRY(spdk_file)	cache_tailq;
93 };
94 
95 struct spdk_deleted_file {
96 	spdk_blob_id	id;
97 	TAILQ_ENTRY(spdk_deleted_file)	tailq;
98 };
99 
100 struct spdk_filesystem {
101 	struct spdk_blob_store	*bs;
102 	TAILQ_HEAD(, spdk_file)	files;
103 	struct spdk_bs_opts	bs_opts;
104 	struct spdk_bs_dev	*bdev;
105 	fs_send_request_fn	send_request;
106 
107 	struct {
108 		uint32_t		max_ops;
109 		struct spdk_io_channel	*sync_io_channel;
110 		struct spdk_fs_channel	*sync_fs_channel;
111 	} sync_target;
112 
113 	struct {
114 		uint32_t		max_ops;
115 		struct spdk_io_channel	*md_io_channel;
116 		struct spdk_fs_channel	*md_fs_channel;
117 	} md_target;
118 
119 	struct {
120 		uint32_t		max_ops;
121 	} io_target;
122 };
123 
124 struct spdk_fs_cb_args {
125 	union {
126 		spdk_fs_op_with_handle_complete		fs_op_with_handle;
127 		spdk_fs_op_complete			fs_op;
128 		spdk_file_op_with_handle_complete	file_op_with_handle;
129 		spdk_file_op_complete			file_op;
130 		spdk_file_stat_op_complete		stat_op;
131 	} fn;
132 	void *arg;
133 	sem_t *sem;
134 	struct spdk_filesystem *fs;
135 	struct spdk_file *file;
136 	int rc;
137 	bool from_request;
138 	union {
139 		struct {
140 			TAILQ_HEAD(, spdk_deleted_file)	deleted_files;
141 		} fs_load;
142 		struct {
143 			uint64_t	length;
144 		} truncate;
145 		struct {
146 			struct spdk_io_channel	*channel;
147 			void		*user_buf;
148 			void		*pin_buf;
149 			int		is_read;
150 			off_t		offset;
151 			size_t		length;
152 			uint64_t	start_lba;
153 			uint64_t	num_lba;
154 			uint32_t	blocklen;
155 		} rw;
156 		struct {
157 			const char	*old_name;
158 			const char	*new_name;
159 		} rename;
160 		struct {
161 			struct cache_buffer	*cache_buffer;
162 			uint64_t		length;
163 		} flush;
164 		struct {
165 			struct cache_buffer	*cache_buffer;
166 			uint64_t		length;
167 			uint64_t		offset;
168 		} readahead;
169 		struct {
170 			uint64_t			offset;
171 			TAILQ_ENTRY(spdk_fs_request)	tailq;
172 			bool				xattr_in_progress;
173 		} sync;
174 		struct {
175 			uint32_t			num_clusters;
176 		} resize;
177 		struct {
178 			const char	*name;
179 			uint32_t	flags;
180 			TAILQ_ENTRY(spdk_fs_request)	tailq;
181 		} open;
182 		struct {
183 			const char		*name;
184 			struct spdk_blob	*blob;
185 		} create;
186 		struct {
187 			const char	*name;
188 		} delete;
189 		struct {
190 			const char	*name;
191 		} stat;
192 	} op;
193 };
194 
195 static void cache_free_buffers(struct spdk_file *file);
196 
197 void
198 spdk_fs_opts_init(struct spdk_blobfs_opts *opts)
199 {
200 	opts->cluster_sz = SPDK_BLOBFS_DEFAULT_OPTS_CLUSTER_SZ;
201 }
202 
203 static void
204 __initialize_cache(void)
205 {
206 	assert(g_cache_pool == NULL);
207 
208 	g_cache_pool = spdk_mempool_create("spdk_fs_cache",
209 					   g_fs_cache_size / CACHE_BUFFER_SIZE,
210 					   CACHE_BUFFER_SIZE,
211 					   SPDK_MEMPOOL_DEFAULT_CACHE_SIZE,
212 					   SPDK_ENV_SOCKET_ID_ANY);
213 	if (!g_cache_pool) {
214 		SPDK_ERRLOG("Create mempool failed, you may "
215 			    "increase the memory and try again\n");
216 		assert(false);
217 	}
218 	TAILQ_INIT(&g_caches);
219 	pthread_spin_init(&g_caches_lock, 0);
220 }
221 
222 static void
223 __free_cache(void)
224 {
225 	assert(g_cache_pool != NULL);
226 
227 	spdk_mempool_free(g_cache_pool);
228 	g_cache_pool = NULL;
229 }
230 
231 static uint64_t
232 __file_get_blob_size(struct spdk_file *file)
233 {
234 	uint64_t cluster_sz;
235 
236 	cluster_sz = file->fs->bs_opts.cluster_sz;
237 	return cluster_sz * spdk_blob_get_num_clusters(file->blob);
238 }
239 
240 struct spdk_fs_request {
241 	struct spdk_fs_cb_args		args;
242 	TAILQ_ENTRY(spdk_fs_request)	link;
243 	struct spdk_fs_channel		*channel;
244 };
245 
246 struct spdk_fs_channel {
247 	struct spdk_fs_request		*req_mem;
248 	TAILQ_HEAD(, spdk_fs_request)	reqs;
249 	sem_t				sem;
250 	struct spdk_filesystem		*fs;
251 	struct spdk_io_channel		*bs_channel;
252 	fs_send_request_fn		send_request;
253 	bool				sync;
254 	pthread_spinlock_t		lock;
255 };
256 
257 static struct spdk_fs_request *
258 alloc_fs_request(struct spdk_fs_channel *channel)
259 {
260 	struct spdk_fs_request *req;
261 
262 	if (channel->sync) {
263 		pthread_spin_lock(&channel->lock);
264 	}
265 
266 	req = TAILQ_FIRST(&channel->reqs);
267 	if (req) {
268 		TAILQ_REMOVE(&channel->reqs, req, link);
269 	}
270 
271 	if (channel->sync) {
272 		pthread_spin_unlock(&channel->lock);
273 	}
274 
275 	if (req == NULL) {
276 		SPDK_ERRLOG("Cannot allocate req on spdk_fs_channel =%p\n", channel);
277 		return NULL;
278 	}
279 	memset(req, 0, sizeof(*req));
280 	req->channel = channel;
281 	req->args.from_request = true;
282 
283 	return req;
284 }
285 
286 static void
287 free_fs_request(struct spdk_fs_request *req)
288 {
289 	struct spdk_fs_channel *channel = req->channel;
290 
291 	if (channel->sync) {
292 		pthread_spin_lock(&channel->lock);
293 	}
294 
295 	TAILQ_INSERT_HEAD(&req->channel->reqs, req, link);
296 
297 	if (channel->sync) {
298 		pthread_spin_unlock(&channel->lock);
299 	}
300 }
301 
302 static int
303 _spdk_fs_channel_create(struct spdk_filesystem *fs, struct spdk_fs_channel *channel,
304 			uint32_t max_ops)
305 {
306 	uint32_t i;
307 
308 	channel->req_mem = calloc(max_ops, sizeof(struct spdk_fs_request));
309 	if (!channel->req_mem) {
310 		return -1;
311 	}
312 
313 	TAILQ_INIT(&channel->reqs);
314 	sem_init(&channel->sem, 0, 0);
315 
316 	for (i = 0; i < max_ops; i++) {
317 		TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link);
318 	}
319 
320 	channel->fs = fs;
321 
322 	return 0;
323 }
324 
325 static int
326 _spdk_fs_md_channel_create(void *io_device, void *ctx_buf)
327 {
328 	struct spdk_filesystem		*fs;
329 	struct spdk_fs_channel		*channel = ctx_buf;
330 
331 	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, md_target);
332 
333 	return _spdk_fs_channel_create(fs, channel, fs->md_target.max_ops);
334 }
335 
336 static int
337 _spdk_fs_sync_channel_create(void *io_device, void *ctx_buf)
338 {
339 	struct spdk_filesystem		*fs;
340 	struct spdk_fs_channel		*channel = ctx_buf;
341 
342 	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, sync_target);
343 
344 	return _spdk_fs_channel_create(fs, channel, fs->sync_target.max_ops);
345 }
346 
347 static int
348 _spdk_fs_io_channel_create(void *io_device, void *ctx_buf)
349 {
350 	struct spdk_filesystem		*fs;
351 	struct spdk_fs_channel		*channel = ctx_buf;
352 
353 	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, io_target);
354 
355 	return _spdk_fs_channel_create(fs, channel, fs->io_target.max_ops);
356 }
357 
358 static void
359 _spdk_fs_channel_destroy(void *io_device, void *ctx_buf)
360 {
361 	struct spdk_fs_channel *channel = ctx_buf;
362 
363 	free(channel->req_mem);
364 	if (channel->bs_channel != NULL) {
365 		spdk_bs_free_io_channel(channel->bs_channel);
366 	}
367 }
368 
369 static void
370 __send_request_direct(fs_request_fn fn, void *arg)
371 {
372 	fn(arg);
373 }
374 
375 static void
376 common_fs_bs_init(struct spdk_filesystem *fs, struct spdk_blob_store *bs)
377 {
378 	fs->bs = bs;
379 	fs->bs_opts.cluster_sz = spdk_bs_get_cluster_size(bs);
380 	fs->md_target.md_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
381 	fs->md_target.md_fs_channel->send_request = __send_request_direct;
382 	fs->sync_target.sync_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
383 	fs->sync_target.sync_fs_channel->send_request = __send_request_direct;
384 
385 	pthread_mutex_lock(&g_cache_init_lock);
386 	if (g_fs_count == 0) {
387 		__initialize_cache();
388 	}
389 	g_fs_count++;
390 	pthread_mutex_unlock(&g_cache_init_lock);
391 }
392 
393 static void
394 init_cb(void *ctx, struct spdk_blob_store *bs, int bserrno)
395 {
396 	struct spdk_fs_request *req = ctx;
397 	struct spdk_fs_cb_args *args = &req->args;
398 	struct spdk_filesystem *fs = args->fs;
399 
400 	if (bserrno == 0) {
401 		common_fs_bs_init(fs, bs);
402 	} else {
403 		free(fs);
404 		fs = NULL;
405 	}
406 
407 	args->fn.fs_op_with_handle(args->arg, fs, bserrno);
408 	free_fs_request(req);
409 }
410 
411 static void
412 fs_conf_parse(void)
413 {
414 	struct spdk_conf_section *sp;
415 
416 	sp = spdk_conf_find_section(NULL, "Blobfs");
417 	if (sp == NULL) {
418 		g_fs_cache_buffer_shift = CACHE_BUFFER_SHIFT_DEFAULT;
419 		return;
420 	}
421 
422 	g_fs_cache_buffer_shift = spdk_conf_section_get_intval(sp, "CacheBufferShift");
423 	if (g_fs_cache_buffer_shift <= 0) {
424 		g_fs_cache_buffer_shift = CACHE_BUFFER_SHIFT_DEFAULT;
425 	}
426 }
427 
428 static struct spdk_filesystem *
429 fs_alloc(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn)
430 {
431 	struct spdk_filesystem *fs;
432 
433 	fs = calloc(1, sizeof(*fs));
434 	if (fs == NULL) {
435 		return NULL;
436 	}
437 
438 	fs->bdev = dev;
439 	fs->send_request = send_request_fn;
440 	TAILQ_INIT(&fs->files);
441 
442 	fs->md_target.max_ops = 512;
443 	spdk_io_device_register(&fs->md_target, _spdk_fs_md_channel_create, _spdk_fs_channel_destroy,
444 				sizeof(struct spdk_fs_channel), "blobfs_md");
445 	fs->md_target.md_io_channel = spdk_get_io_channel(&fs->md_target);
446 	fs->md_target.md_fs_channel = spdk_io_channel_get_ctx(fs->md_target.md_io_channel);
447 
448 	fs->sync_target.max_ops = 512;
449 	spdk_io_device_register(&fs->sync_target, _spdk_fs_sync_channel_create, _spdk_fs_channel_destroy,
450 				sizeof(struct spdk_fs_channel), "blobfs_sync");
451 	fs->sync_target.sync_io_channel = spdk_get_io_channel(&fs->sync_target);
452 	fs->sync_target.sync_fs_channel = spdk_io_channel_get_ctx(fs->sync_target.sync_io_channel);
453 
454 	fs->io_target.max_ops = 512;
455 	spdk_io_device_register(&fs->io_target, _spdk_fs_io_channel_create, _spdk_fs_channel_destroy,
456 				sizeof(struct spdk_fs_channel), "blobfs_io");
457 
458 	return fs;
459 }
460 
461 static void
462 __wake_caller(void *arg, int fserrno)
463 {
464 	struct spdk_fs_cb_args *args = arg;
465 
466 	args->rc = fserrno;
467 	sem_post(args->sem);
468 }
469 
470 void
471 spdk_fs_init(struct spdk_bs_dev *dev, struct spdk_blobfs_opts *opt,
472 	     fs_send_request_fn send_request_fn,
473 	     spdk_fs_op_with_handle_complete cb_fn, void *cb_arg)
474 {
475 	struct spdk_filesystem *fs;
476 	struct spdk_fs_request *req;
477 	struct spdk_fs_cb_args *args;
478 	struct spdk_bs_opts opts = {};
479 
480 	fs = fs_alloc(dev, send_request_fn);
481 	if (fs == NULL) {
482 		cb_fn(cb_arg, NULL, -ENOMEM);
483 		return;
484 	}
485 
486 	fs_conf_parse();
487 
488 	req = alloc_fs_request(fs->md_target.md_fs_channel);
489 	if (req == NULL) {
490 		spdk_put_io_channel(fs->md_target.md_io_channel);
491 		spdk_io_device_unregister(&fs->md_target, NULL);
492 		spdk_put_io_channel(fs->sync_target.sync_io_channel);
493 		spdk_io_device_unregister(&fs->sync_target, NULL);
494 		spdk_io_device_unregister(&fs->io_target, NULL);
495 		free(fs);
496 		cb_fn(cb_arg, NULL, -ENOMEM);
497 		return;
498 	}
499 
500 	args = &req->args;
501 	args->fn.fs_op_with_handle = cb_fn;
502 	args->arg = cb_arg;
503 	args->fs = fs;
504 
505 	spdk_bs_opts_init(&opts);
506 	snprintf(opts.bstype.bstype, sizeof(opts.bstype.bstype), "BLOBFS");
507 	if (opt) {
508 		opts.cluster_sz = opt->cluster_sz;
509 	}
510 	spdk_bs_init(dev, &opts, init_cb, req);
511 }
512 
513 static struct spdk_file *
514 file_alloc(struct spdk_filesystem *fs)
515 {
516 	struct spdk_file *file;
517 
518 	file = calloc(1, sizeof(*file));
519 	if (file == NULL) {
520 		return NULL;
521 	}
522 
523 	file->tree = calloc(1, sizeof(*file->tree));
524 	if (file->tree == NULL) {
525 		free(file);
526 		return NULL;
527 	}
528 
529 	file->fs = fs;
530 	TAILQ_INIT(&file->open_requests);
531 	TAILQ_INIT(&file->sync_requests);
532 	pthread_spin_init(&file->lock, 0);
533 	TAILQ_INSERT_TAIL(&fs->files, file, tailq);
534 	file->priority = SPDK_FILE_PRIORITY_LOW;
535 	return file;
536 }
537 
538 static void fs_load_done(void *ctx, int bserrno);
539 
540 static int
541 _handle_deleted_files(struct spdk_fs_request *req)
542 {
543 	struct spdk_fs_cb_args *args = &req->args;
544 	struct spdk_filesystem *fs = args->fs;
545 
546 	if (!TAILQ_EMPTY(&args->op.fs_load.deleted_files)) {
547 		struct spdk_deleted_file *deleted_file;
548 
549 		deleted_file = TAILQ_FIRST(&args->op.fs_load.deleted_files);
550 		TAILQ_REMOVE(&args->op.fs_load.deleted_files, deleted_file, tailq);
551 		spdk_bs_delete_blob(fs->bs, deleted_file->id, fs_load_done, req);
552 		free(deleted_file);
553 		return 0;
554 	}
555 
556 	return 1;
557 }
558 
559 static void
560 fs_load_done(void *ctx, int bserrno)
561 {
562 	struct spdk_fs_request *req = ctx;
563 	struct spdk_fs_cb_args *args = &req->args;
564 	struct spdk_filesystem *fs = args->fs;
565 
566 	/* The filesystem has been loaded.  Now check if there are any files that
567 	 *  were marked for deletion before last unload.  Do not complete the
568 	 *  fs_load callback until all of them have been deleted on disk.
569 	 */
570 	if (_handle_deleted_files(req) == 0) {
571 		/* We found a file that's been marked for deleting but not actually
572 		 *  deleted yet.  This function will get called again once the delete
573 		 *  operation is completed.
574 		 */
575 		return;
576 	}
577 
578 	args->fn.fs_op_with_handle(args->arg, fs, 0);
579 	free_fs_request(req);
580 
581 }
582 
583 static void
584 iter_cb(void *ctx, struct spdk_blob *blob, int rc)
585 {
586 	struct spdk_fs_request *req = ctx;
587 	struct spdk_fs_cb_args *args = &req->args;
588 	struct spdk_filesystem *fs = args->fs;
589 	uint64_t *length;
590 	const char *name;
591 	uint32_t *is_deleted;
592 	size_t value_len;
593 
594 	if (rc < 0) {
595 		args->fn.fs_op_with_handle(args->arg, fs, rc);
596 		free_fs_request(req);
597 		return;
598 	}
599 
600 	rc = spdk_blob_get_xattr_value(blob, "name", (const void **)&name, &value_len);
601 	if (rc < 0) {
602 		args->fn.fs_op_with_handle(args->arg, fs, rc);
603 		free_fs_request(req);
604 		return;
605 	}
606 
607 	rc = spdk_blob_get_xattr_value(blob, "length", (const void **)&length, &value_len);
608 	if (rc < 0) {
609 		args->fn.fs_op_with_handle(args->arg, fs, rc);
610 		free_fs_request(req);
611 		return;
612 	}
613 
614 	assert(value_len == 8);
615 
616 	/* This file could be deleted last time without close it, then app crashed, so we delete it now */
617 	rc = spdk_blob_get_xattr_value(blob, "is_deleted", (const void **)&is_deleted, &value_len);
618 	if (rc < 0) {
619 		struct spdk_file *f;
620 
621 		f = file_alloc(fs);
622 		if (f == NULL) {
623 			args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM);
624 			free_fs_request(req);
625 			return;
626 		}
627 
628 		f->name = strdup(name);
629 		f->blobid = spdk_blob_get_id(blob);
630 		f->length = *length;
631 		f->length_flushed = *length;
632 		f->append_pos = *length;
633 		SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "added file %s length=%ju\n", f->name, f->length);
634 	} else {
635 		struct spdk_deleted_file *deleted_file;
636 
637 		deleted_file = calloc(1, sizeof(*deleted_file));
638 		if (deleted_file == NULL) {
639 			args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM);
640 			free_fs_request(req);
641 			return;
642 		}
643 		deleted_file->id = spdk_blob_get_id(blob);
644 		TAILQ_INSERT_TAIL(&args->op.fs_load.deleted_files, deleted_file, tailq);
645 	}
646 }
647 
648 static void
649 load_cb(void *ctx, struct spdk_blob_store *bs, int bserrno)
650 {
651 	struct spdk_fs_request *req = ctx;
652 	struct spdk_fs_cb_args *args = &req->args;
653 	struct spdk_filesystem *fs = args->fs;
654 	struct spdk_bs_type bstype;
655 	static const struct spdk_bs_type blobfs_type = {"BLOBFS"};
656 	static const struct spdk_bs_type zeros;
657 
658 	if (bserrno != 0) {
659 		args->fn.fs_op_with_handle(args->arg, NULL, bserrno);
660 		free_fs_request(req);
661 		free(fs);
662 		return;
663 	}
664 
665 	bstype = spdk_bs_get_bstype(bs);
666 
667 	if (!memcmp(&bstype, &zeros, sizeof(bstype))) {
668 		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "assigning bstype\n");
669 		spdk_bs_set_bstype(bs, blobfs_type);
670 	} else if (memcmp(&bstype, &blobfs_type, sizeof(bstype))) {
671 		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "not blobfs\n");
672 		SPDK_LOGDUMP(SPDK_LOG_BLOB, "bstype", &bstype, sizeof(bstype));
673 		args->fn.fs_op_with_handle(args->arg, NULL, bserrno);
674 		free_fs_request(req);
675 		free(fs);
676 		return;
677 	}
678 
679 	common_fs_bs_init(fs, bs);
680 	fs_load_done(req, 0);
681 }
682 
683 static void
684 spdk_fs_io_device_unregister(struct spdk_filesystem *fs)
685 {
686 	assert(fs != NULL);
687 	spdk_io_device_unregister(&fs->md_target, NULL);
688 	spdk_io_device_unregister(&fs->sync_target, NULL);
689 	spdk_io_device_unregister(&fs->io_target, NULL);
690 	free(fs);
691 }
692 
693 static void
694 spdk_fs_free_io_channels(struct spdk_filesystem *fs)
695 {
696 	assert(fs != NULL);
697 	spdk_fs_free_io_channel(fs->md_target.md_io_channel);
698 	spdk_fs_free_io_channel(fs->sync_target.sync_io_channel);
699 }
700 
701 void
702 spdk_fs_load(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn,
703 	     spdk_fs_op_with_handle_complete cb_fn, void *cb_arg)
704 {
705 	struct spdk_filesystem *fs;
706 	struct spdk_fs_cb_args *args;
707 	struct spdk_fs_request *req;
708 	struct spdk_bs_opts	bs_opts;
709 
710 	fs = fs_alloc(dev, send_request_fn);
711 	if (fs == NULL) {
712 		cb_fn(cb_arg, NULL, -ENOMEM);
713 		return;
714 	}
715 
716 	fs_conf_parse();
717 
718 	req = alloc_fs_request(fs->md_target.md_fs_channel);
719 	if (req == NULL) {
720 		spdk_fs_free_io_channels(fs);
721 		spdk_fs_io_device_unregister(fs);
722 		cb_fn(cb_arg, NULL, -ENOMEM);
723 		return;
724 	}
725 
726 	args = &req->args;
727 	args->fn.fs_op_with_handle = cb_fn;
728 	args->arg = cb_arg;
729 	args->fs = fs;
730 	TAILQ_INIT(&args->op.fs_load.deleted_files);
731 	spdk_bs_opts_init(&bs_opts);
732 	bs_opts.iter_cb_fn = iter_cb;
733 	bs_opts.iter_cb_arg = req;
734 	spdk_bs_load(dev, &bs_opts, load_cb, req);
735 }
736 
737 static void
738 unload_cb(void *ctx, int bserrno)
739 {
740 	struct spdk_fs_request *req = ctx;
741 	struct spdk_fs_cb_args *args = &req->args;
742 	struct spdk_filesystem *fs = args->fs;
743 	struct spdk_file *file, *tmp;
744 
745 	TAILQ_FOREACH_SAFE(file, &fs->files, tailq, tmp) {
746 		TAILQ_REMOVE(&fs->files, file, tailq);
747 		cache_free_buffers(file);
748 		free(file->name);
749 		free(file->tree);
750 		free(file);
751 	}
752 
753 	pthread_mutex_lock(&g_cache_init_lock);
754 	g_fs_count--;
755 	if (g_fs_count == 0) {
756 		__free_cache();
757 	}
758 	pthread_mutex_unlock(&g_cache_init_lock);
759 
760 	args->fn.fs_op(args->arg, bserrno);
761 	free(req);
762 
763 	spdk_fs_io_device_unregister(fs);
764 }
765 
766 void
767 spdk_fs_unload(struct spdk_filesystem *fs, spdk_fs_op_complete cb_fn, void *cb_arg)
768 {
769 	struct spdk_fs_request *req;
770 	struct spdk_fs_cb_args *args;
771 
772 	/*
773 	 * We must free the md_channel before unloading the blobstore, so just
774 	 *  allocate this request from the general heap.
775 	 */
776 	req = calloc(1, sizeof(*req));
777 	if (req == NULL) {
778 		cb_fn(cb_arg, -ENOMEM);
779 		return;
780 	}
781 
782 	args = &req->args;
783 	args->fn.fs_op = cb_fn;
784 	args->arg = cb_arg;
785 	args->fs = fs;
786 
787 	spdk_fs_free_io_channels(fs);
788 	spdk_bs_unload(fs->bs, unload_cb, req);
789 }
790 
791 static struct spdk_file *
792 fs_find_file(struct spdk_filesystem *fs, const char *name)
793 {
794 	struct spdk_file *file;
795 
796 	TAILQ_FOREACH(file, &fs->files, tailq) {
797 		if (!strncmp(name, file->name, SPDK_FILE_NAME_MAX)) {
798 			return file;
799 		}
800 	}
801 
802 	return NULL;
803 }
804 
805 void
806 spdk_fs_file_stat_async(struct spdk_filesystem *fs, const char *name,
807 			spdk_file_stat_op_complete cb_fn, void *cb_arg)
808 {
809 	struct spdk_file_stat stat;
810 	struct spdk_file *f = NULL;
811 
812 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
813 		cb_fn(cb_arg, NULL, -ENAMETOOLONG);
814 		return;
815 	}
816 
817 	f = fs_find_file(fs, name);
818 	if (f != NULL) {
819 		stat.blobid = f->blobid;
820 		stat.size = f->append_pos >= f->length ? f->append_pos : f->length;
821 		cb_fn(cb_arg, &stat, 0);
822 		return;
823 	}
824 
825 	cb_fn(cb_arg, NULL, -ENOENT);
826 }
827 
828 static void
829 __copy_stat(void *arg, struct spdk_file_stat *stat, int fserrno)
830 {
831 	struct spdk_fs_request *req = arg;
832 	struct spdk_fs_cb_args *args = &req->args;
833 
834 	args->rc = fserrno;
835 	if (fserrno == 0) {
836 		memcpy(args->arg, stat, sizeof(*stat));
837 	}
838 	sem_post(args->sem);
839 }
840 
841 static void
842 __file_stat(void *arg)
843 {
844 	struct spdk_fs_request *req = arg;
845 	struct spdk_fs_cb_args *args = &req->args;
846 
847 	spdk_fs_file_stat_async(args->fs, args->op.stat.name,
848 				args->fn.stat_op, req);
849 }
850 
851 int
852 spdk_fs_file_stat(struct spdk_filesystem *fs, struct spdk_io_channel *_channel,
853 		  const char *name, struct spdk_file_stat *stat)
854 {
855 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
856 	struct spdk_fs_request *req;
857 	int rc;
858 
859 	req = alloc_fs_request(channel);
860 	if (req == NULL) {
861 		return -ENOMEM;
862 	}
863 
864 	req->args.fs = fs;
865 	req->args.op.stat.name = name;
866 	req->args.fn.stat_op = __copy_stat;
867 	req->args.arg = stat;
868 	req->args.sem = &channel->sem;
869 	channel->send_request(__file_stat, req);
870 	sem_wait(&channel->sem);
871 
872 	rc = req->args.rc;
873 	free_fs_request(req);
874 
875 	return rc;
876 }
877 
878 static void
879 fs_create_blob_close_cb(void *ctx, int bserrno)
880 {
881 	int rc;
882 	struct spdk_fs_request *req = ctx;
883 	struct spdk_fs_cb_args *args = &req->args;
884 
885 	rc = args->rc ? args->rc : bserrno;
886 	args->fn.file_op(args->arg, rc);
887 	free_fs_request(req);
888 }
889 
890 static void
891 fs_create_blob_resize_cb(void *ctx, int bserrno)
892 {
893 	struct spdk_fs_request *req = ctx;
894 	struct spdk_fs_cb_args *args = &req->args;
895 	struct spdk_file *f = args->file;
896 	struct spdk_blob *blob = args->op.create.blob;
897 	uint64_t length = 0;
898 
899 	args->rc = bserrno;
900 	if (bserrno) {
901 		spdk_blob_close(blob, fs_create_blob_close_cb, args);
902 		return;
903 	}
904 
905 	spdk_blob_set_xattr(blob, "name", f->name, strlen(f->name) + 1);
906 	spdk_blob_set_xattr(blob, "length", &length, sizeof(length));
907 
908 	spdk_blob_close(blob, fs_create_blob_close_cb, args);
909 }
910 
911 static void
912 fs_create_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno)
913 {
914 	struct spdk_fs_request *req = ctx;
915 	struct spdk_fs_cb_args *args = &req->args;
916 
917 	if (bserrno) {
918 		args->fn.file_op(args->arg, bserrno);
919 		free_fs_request(req);
920 		return;
921 	}
922 
923 	args->op.create.blob = blob;
924 	spdk_blob_resize(blob, 1, fs_create_blob_resize_cb, req);
925 }
926 
927 static void
928 fs_create_blob_create_cb(void *ctx, spdk_blob_id blobid, int bserrno)
929 {
930 	struct spdk_fs_request *req = ctx;
931 	struct spdk_fs_cb_args *args = &req->args;
932 	struct spdk_file *f = args->file;
933 
934 	if (bserrno) {
935 		args->fn.file_op(args->arg, bserrno);
936 		free_fs_request(req);
937 		return;
938 	}
939 
940 	f->blobid = blobid;
941 	spdk_bs_open_blob(f->fs->bs, blobid, fs_create_blob_open_cb, req);
942 }
943 
944 void
945 spdk_fs_create_file_async(struct spdk_filesystem *fs, const char *name,
946 			  spdk_file_op_complete cb_fn, void *cb_arg)
947 {
948 	struct spdk_file *file;
949 	struct spdk_fs_request *req;
950 	struct spdk_fs_cb_args *args;
951 
952 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
953 		cb_fn(cb_arg, -ENAMETOOLONG);
954 		return;
955 	}
956 
957 	file = fs_find_file(fs, name);
958 	if (file != NULL) {
959 		cb_fn(cb_arg, -EEXIST);
960 		return;
961 	}
962 
963 	file = file_alloc(fs);
964 	if (file == NULL) {
965 		cb_fn(cb_arg, -ENOMEM);
966 		return;
967 	}
968 
969 	req = alloc_fs_request(fs->md_target.md_fs_channel);
970 	if (req == NULL) {
971 		cb_fn(cb_arg, -ENOMEM);
972 		return;
973 	}
974 
975 	args = &req->args;
976 	args->file = file;
977 	args->fn.file_op = cb_fn;
978 	args->arg = cb_arg;
979 
980 	file->name = strdup(name);
981 	spdk_bs_create_blob(fs->bs, fs_create_blob_create_cb, args);
982 }
983 
984 static void
985 __fs_create_file_done(void *arg, int fserrno)
986 {
987 	struct spdk_fs_request *req = arg;
988 	struct spdk_fs_cb_args *args = &req->args;
989 
990 	args->rc = fserrno;
991 	sem_post(args->sem);
992 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.create.name);
993 }
994 
995 static void
996 __fs_create_file(void *arg)
997 {
998 	struct spdk_fs_request *req = arg;
999 	struct spdk_fs_cb_args *args = &req->args;
1000 
1001 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.create.name);
1002 	spdk_fs_create_file_async(args->fs, args->op.create.name, __fs_create_file_done, req);
1003 }
1004 
1005 int
1006 spdk_fs_create_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, const char *name)
1007 {
1008 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
1009 	struct spdk_fs_request *req;
1010 	struct spdk_fs_cb_args *args;
1011 	int rc;
1012 
1013 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name);
1014 
1015 	req = alloc_fs_request(channel);
1016 	if (req == NULL) {
1017 		return -ENOMEM;
1018 	}
1019 
1020 	args = &req->args;
1021 	args->fs = fs;
1022 	args->op.create.name = name;
1023 	args->sem = &channel->sem;
1024 	fs->send_request(__fs_create_file, req);
1025 	sem_wait(&channel->sem);
1026 	rc = args->rc;
1027 	free_fs_request(req);
1028 
1029 	return rc;
1030 }
1031 
1032 static void
1033 fs_open_blob_done(void *ctx, struct spdk_blob *blob, int bserrno)
1034 {
1035 	struct spdk_fs_request *req = ctx;
1036 	struct spdk_fs_cb_args *args = &req->args;
1037 	struct spdk_file *f = args->file;
1038 
1039 	f->blob = blob;
1040 	while (!TAILQ_EMPTY(&f->open_requests)) {
1041 		req = TAILQ_FIRST(&f->open_requests);
1042 		args = &req->args;
1043 		TAILQ_REMOVE(&f->open_requests, req, args.op.open.tailq);
1044 		args->fn.file_op_with_handle(args->arg, f, bserrno);
1045 		free_fs_request(req);
1046 	}
1047 }
1048 
1049 static void
1050 fs_open_blob_create_cb(void *ctx, int bserrno)
1051 {
1052 	struct spdk_fs_request *req = ctx;
1053 	struct spdk_fs_cb_args *args = &req->args;
1054 	struct spdk_file *file = args->file;
1055 	struct spdk_filesystem *fs = args->fs;
1056 
1057 	if (file == NULL) {
1058 		/*
1059 		 * This is from an open with CREATE flag - the file
1060 		 *  is now created so look it up in the file list for this
1061 		 *  filesystem.
1062 		 */
1063 		file = fs_find_file(fs, args->op.open.name);
1064 		assert(file != NULL);
1065 		args->file = file;
1066 	}
1067 
1068 	file->ref_count++;
1069 	TAILQ_INSERT_TAIL(&file->open_requests, req, args.op.open.tailq);
1070 	if (file->ref_count == 1) {
1071 		assert(file->blob == NULL);
1072 		spdk_bs_open_blob(fs->bs, file->blobid, fs_open_blob_done, req);
1073 	} else if (file->blob != NULL) {
1074 		fs_open_blob_done(req, file->blob, 0);
1075 	} else {
1076 		/*
1077 		 * The blob open for this file is in progress due to a previous
1078 		 *  open request.  When that open completes, it will invoke the
1079 		 *  open callback for this request.
1080 		 */
1081 	}
1082 }
1083 
1084 void
1085 spdk_fs_open_file_async(struct spdk_filesystem *fs, const char *name, uint32_t flags,
1086 			spdk_file_op_with_handle_complete cb_fn, void *cb_arg)
1087 {
1088 	struct spdk_file *f = NULL;
1089 	struct spdk_fs_request *req;
1090 	struct spdk_fs_cb_args *args;
1091 
1092 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
1093 		cb_fn(cb_arg, NULL, -ENAMETOOLONG);
1094 		return;
1095 	}
1096 
1097 	f = fs_find_file(fs, name);
1098 	if (f == NULL && !(flags & SPDK_BLOBFS_OPEN_CREATE)) {
1099 		cb_fn(cb_arg, NULL, -ENOENT);
1100 		return;
1101 	}
1102 
1103 	if (f != NULL && f->is_deleted == true) {
1104 		cb_fn(cb_arg, NULL, -ENOENT);
1105 		return;
1106 	}
1107 
1108 	req = alloc_fs_request(fs->md_target.md_fs_channel);
1109 	if (req == NULL) {
1110 		cb_fn(cb_arg, NULL, -ENOMEM);
1111 		return;
1112 	}
1113 
1114 	args = &req->args;
1115 	args->fn.file_op_with_handle = cb_fn;
1116 	args->arg = cb_arg;
1117 	args->file = f;
1118 	args->fs = fs;
1119 	args->op.open.name = name;
1120 
1121 	if (f == NULL) {
1122 		spdk_fs_create_file_async(fs, name, fs_open_blob_create_cb, req);
1123 	} else {
1124 		fs_open_blob_create_cb(req, 0);
1125 	}
1126 }
1127 
1128 static void
1129 __fs_open_file_done(void *arg, struct spdk_file *file, int bserrno)
1130 {
1131 	struct spdk_fs_request *req = arg;
1132 	struct spdk_fs_cb_args *args = &req->args;
1133 
1134 	args->file = file;
1135 	__wake_caller(args, bserrno);
1136 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.open.name);
1137 }
1138 
1139 static void
1140 __fs_open_file(void *arg)
1141 {
1142 	struct spdk_fs_request *req = arg;
1143 	struct spdk_fs_cb_args *args = &req->args;
1144 
1145 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.open.name);
1146 	spdk_fs_open_file_async(args->fs, args->op.open.name, args->op.open.flags,
1147 				__fs_open_file_done, req);
1148 }
1149 
1150 int
1151 spdk_fs_open_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel,
1152 		  const char *name, uint32_t flags, struct spdk_file **file)
1153 {
1154 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
1155 	struct spdk_fs_request *req;
1156 	struct spdk_fs_cb_args *args;
1157 	int rc;
1158 
1159 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name);
1160 
1161 	req = alloc_fs_request(channel);
1162 	if (req == NULL) {
1163 		return -ENOMEM;
1164 	}
1165 
1166 	args = &req->args;
1167 	args->fs = fs;
1168 	args->op.open.name = name;
1169 	args->op.open.flags = flags;
1170 	args->sem = &channel->sem;
1171 	fs->send_request(__fs_open_file, req);
1172 	sem_wait(&channel->sem);
1173 	rc = args->rc;
1174 	if (rc == 0) {
1175 		*file = args->file;
1176 	} else {
1177 		*file = NULL;
1178 	}
1179 	free_fs_request(req);
1180 
1181 	return rc;
1182 }
1183 
1184 static void
1185 fs_rename_blob_close_cb(void *ctx, int bserrno)
1186 {
1187 	struct spdk_fs_request *req = ctx;
1188 	struct spdk_fs_cb_args *args = &req->args;
1189 
1190 	args->fn.fs_op(args->arg, bserrno);
1191 	free_fs_request(req);
1192 }
1193 
1194 static void
1195 fs_rename_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno)
1196 {
1197 	struct spdk_fs_request *req = ctx;
1198 	struct spdk_fs_cb_args *args = &req->args;
1199 	const char *new_name = args->op.rename.new_name;
1200 
1201 	spdk_blob_set_xattr(blob, "name", new_name, strlen(new_name) + 1);
1202 	spdk_blob_close(blob, fs_rename_blob_close_cb, req);
1203 }
1204 
1205 static void
1206 __spdk_fs_md_rename_file(struct spdk_fs_request *req)
1207 {
1208 	struct spdk_fs_cb_args *args = &req->args;
1209 	struct spdk_file *f;
1210 
1211 	f = fs_find_file(args->fs, args->op.rename.old_name);
1212 	if (f == NULL) {
1213 		args->fn.fs_op(args->arg, -ENOENT);
1214 		free_fs_request(req);
1215 		return;
1216 	}
1217 
1218 	free(f->name);
1219 	f->name = strdup(args->op.rename.new_name);
1220 	args->file = f;
1221 	spdk_bs_open_blob(args->fs->bs, f->blobid, fs_rename_blob_open_cb, req);
1222 }
1223 
1224 static void
1225 fs_rename_delete_done(void *arg, int fserrno)
1226 {
1227 	__spdk_fs_md_rename_file(arg);
1228 }
1229 
1230 void
1231 spdk_fs_rename_file_async(struct spdk_filesystem *fs,
1232 			  const char *old_name, const char *new_name,
1233 			  spdk_file_op_complete cb_fn, void *cb_arg)
1234 {
1235 	struct spdk_file *f;
1236 	struct spdk_fs_request *req;
1237 	struct spdk_fs_cb_args *args;
1238 
1239 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "old=%s new=%s\n", old_name, new_name);
1240 	if (strnlen(new_name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
1241 		cb_fn(cb_arg, -ENAMETOOLONG);
1242 		return;
1243 	}
1244 
1245 	req = alloc_fs_request(fs->md_target.md_fs_channel);
1246 	if (req == NULL) {
1247 		cb_fn(cb_arg, -ENOMEM);
1248 		return;
1249 	}
1250 
1251 	args = &req->args;
1252 	args->fn.fs_op = cb_fn;
1253 	args->fs = fs;
1254 	args->arg = cb_arg;
1255 	args->op.rename.old_name = old_name;
1256 	args->op.rename.new_name = new_name;
1257 
1258 	f = fs_find_file(fs, new_name);
1259 	if (f == NULL) {
1260 		__spdk_fs_md_rename_file(req);
1261 		return;
1262 	}
1263 
1264 	/*
1265 	 * The rename overwrites an existing file.  So delete the existing file, then
1266 	 *  do the actual rename.
1267 	 */
1268 	spdk_fs_delete_file_async(fs, new_name, fs_rename_delete_done, req);
1269 }
1270 
1271 static void
1272 __fs_rename_file_done(void *arg, int fserrno)
1273 {
1274 	struct spdk_fs_request *req = arg;
1275 	struct spdk_fs_cb_args *args = &req->args;
1276 
1277 	__wake_caller(args, fserrno);
1278 }
1279 
1280 static void
1281 __fs_rename_file(void *arg)
1282 {
1283 	struct spdk_fs_request *req = arg;
1284 	struct spdk_fs_cb_args *args = &req->args;
1285 
1286 	spdk_fs_rename_file_async(args->fs, args->op.rename.old_name, args->op.rename.new_name,
1287 				  __fs_rename_file_done, req);
1288 }
1289 
1290 int
1291 spdk_fs_rename_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel,
1292 		    const char *old_name, const char *new_name)
1293 {
1294 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
1295 	struct spdk_fs_request *req;
1296 	struct spdk_fs_cb_args *args;
1297 	int rc;
1298 
1299 	req = alloc_fs_request(channel);
1300 	if (req == NULL) {
1301 		return -ENOMEM;
1302 	}
1303 
1304 	args = &req->args;
1305 
1306 	args->fs = fs;
1307 	args->op.rename.old_name = old_name;
1308 	args->op.rename.new_name = new_name;
1309 	args->sem = &channel->sem;
1310 	fs->send_request(__fs_rename_file, req);
1311 	sem_wait(&channel->sem);
1312 	rc = args->rc;
1313 	free_fs_request(req);
1314 	return rc;
1315 }
1316 
1317 static void
1318 blob_delete_cb(void *ctx, int bserrno)
1319 {
1320 	struct spdk_fs_request *req = ctx;
1321 	struct spdk_fs_cb_args *args = &req->args;
1322 
1323 	args->fn.file_op(args->arg, bserrno);
1324 	free_fs_request(req);
1325 }
1326 
1327 void
1328 spdk_fs_delete_file_async(struct spdk_filesystem *fs, const char *name,
1329 			  spdk_file_op_complete cb_fn, void *cb_arg)
1330 {
1331 	struct spdk_file *f;
1332 	spdk_blob_id blobid;
1333 	struct spdk_fs_request *req;
1334 	struct spdk_fs_cb_args *args;
1335 
1336 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name);
1337 
1338 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
1339 		cb_fn(cb_arg, -ENAMETOOLONG);
1340 		return;
1341 	}
1342 
1343 	f = fs_find_file(fs, name);
1344 	if (f == NULL) {
1345 		cb_fn(cb_arg, -ENOENT);
1346 		return;
1347 	}
1348 
1349 	req = alloc_fs_request(fs->md_target.md_fs_channel);
1350 	if (req == NULL) {
1351 		cb_fn(cb_arg, -ENOMEM);
1352 		return;
1353 	}
1354 
1355 	args = &req->args;
1356 	args->fn.file_op = cb_fn;
1357 	args->arg = cb_arg;
1358 
1359 	if (f->ref_count > 0) {
1360 		/* If the ref > 0, we mark the file as deleted and delete it when we close it. */
1361 		f->is_deleted = true;
1362 		spdk_blob_set_xattr(f->blob, "is_deleted", &f->is_deleted, sizeof(bool));
1363 		spdk_blob_sync_md(f->blob, blob_delete_cb, args);
1364 		return;
1365 	}
1366 
1367 	TAILQ_REMOVE(&fs->files, f, tailq);
1368 
1369 	cache_free_buffers(f);
1370 
1371 	blobid = f->blobid;
1372 
1373 	free(f->name);
1374 	free(f->tree);
1375 	free(f);
1376 
1377 	spdk_bs_delete_blob(fs->bs, blobid, blob_delete_cb, req);
1378 }
1379 
1380 static void
1381 __fs_delete_file_done(void *arg, int fserrno)
1382 {
1383 	struct spdk_fs_request *req = arg;
1384 	struct spdk_fs_cb_args *args = &req->args;
1385 
1386 	__wake_caller(args, fserrno);
1387 }
1388 
1389 static void
1390 __fs_delete_file(void *arg)
1391 {
1392 	struct spdk_fs_request *req = arg;
1393 	struct spdk_fs_cb_args *args = &req->args;
1394 
1395 	spdk_fs_delete_file_async(args->fs, args->op.delete.name, __fs_delete_file_done, req);
1396 }
1397 
1398 int
1399 spdk_fs_delete_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel,
1400 		    const char *name)
1401 {
1402 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
1403 	struct spdk_fs_request *req;
1404 	struct spdk_fs_cb_args *args;
1405 	int rc;
1406 
1407 	req = alloc_fs_request(channel);
1408 	if (req == NULL) {
1409 		return -ENOMEM;
1410 	}
1411 
1412 	args = &req->args;
1413 	args->fs = fs;
1414 	args->op.delete.name = name;
1415 	args->sem = &channel->sem;
1416 	fs->send_request(__fs_delete_file, req);
1417 	sem_wait(&channel->sem);
1418 	rc = args->rc;
1419 	free_fs_request(req);
1420 
1421 	return rc;
1422 }
1423 
1424 spdk_fs_iter
1425 spdk_fs_iter_first(struct spdk_filesystem *fs)
1426 {
1427 	struct spdk_file *f;
1428 
1429 	f = TAILQ_FIRST(&fs->files);
1430 	return f;
1431 }
1432 
1433 spdk_fs_iter
1434 spdk_fs_iter_next(spdk_fs_iter iter)
1435 {
1436 	struct spdk_file *f = iter;
1437 
1438 	if (f == NULL) {
1439 		return NULL;
1440 	}
1441 
1442 	f = TAILQ_NEXT(f, tailq);
1443 	return f;
1444 }
1445 
1446 const char *
1447 spdk_file_get_name(struct spdk_file *file)
1448 {
1449 	return file->name;
1450 }
1451 
1452 uint64_t
1453 spdk_file_get_length(struct spdk_file *file)
1454 {
1455 	uint64_t length;
1456 
1457 	assert(file != NULL);
1458 
1459 	length = file->append_pos >= file->length ? file->append_pos : file->length;
1460 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s length=0x%jx\n", file->name, length);
1461 	return length;
1462 }
1463 
1464 static void
1465 fs_truncate_complete_cb(void *ctx, int bserrno)
1466 {
1467 	struct spdk_fs_request *req = ctx;
1468 	struct spdk_fs_cb_args *args = &req->args;
1469 
1470 	args->fn.file_op(args->arg, bserrno);
1471 	free_fs_request(req);
1472 }
1473 
1474 static void
1475 fs_truncate_resize_cb(void *ctx, int bserrno)
1476 {
1477 	struct spdk_fs_request *req = ctx;
1478 	struct spdk_fs_cb_args *args = &req->args;
1479 	struct spdk_file *file = args->file;
1480 	uint64_t *length = &args->op.truncate.length;
1481 
1482 	if (bserrno) {
1483 		args->fn.file_op(args->arg, bserrno);
1484 		free_fs_request(req);
1485 		return;
1486 	}
1487 
1488 	spdk_blob_set_xattr(file->blob, "length", length, sizeof(*length));
1489 
1490 	file->length = *length;
1491 	if (file->append_pos > file->length) {
1492 		file->append_pos = file->length;
1493 	}
1494 
1495 	spdk_blob_sync_md(file->blob, fs_truncate_complete_cb, args);
1496 }
1497 
/*
 * Number of clusters of cluster_sz bytes needed to hold length bytes
 *  (rounded up).  Computed with a division and remainder so that lengths
 *  close to UINT64_MAX do not wrap around.  cluster_sz must be non-zero.
 */
static uint64_t
__bytes_to_clusters(uint64_t length, uint64_t cluster_sz)
{
	uint64_t clusters = length / cluster_sz;

	if (length % cluster_sz != 0) {
		clusters++;
	}

	return clusters;
}
1503 
1504 void
1505 spdk_file_truncate_async(struct spdk_file *file, uint64_t length,
1506 			 spdk_file_op_complete cb_fn, void *cb_arg)
1507 {
1508 	struct spdk_filesystem *fs;
1509 	size_t num_clusters;
1510 	struct spdk_fs_request *req;
1511 	struct spdk_fs_cb_args *args;
1512 
1513 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s old=0x%jx new=0x%jx\n", file->name, file->length, length);
1514 	if (length == file->length) {
1515 		cb_fn(cb_arg, 0);
1516 		return;
1517 	}
1518 
1519 	req = alloc_fs_request(file->fs->md_target.md_fs_channel);
1520 	if (req == NULL) {
1521 		cb_fn(cb_arg, -ENOMEM);
1522 		return;
1523 	}
1524 
1525 	args = &req->args;
1526 	args->fn.file_op = cb_fn;
1527 	args->arg = cb_arg;
1528 	args->file = file;
1529 	args->op.truncate.length = length;
1530 	fs = file->fs;
1531 
1532 	num_clusters = __bytes_to_clusters(length, fs->bs_opts.cluster_sz);
1533 
1534 	spdk_blob_resize(file->blob, num_clusters, fs_truncate_resize_cb, req);
1535 }
1536 
1537 static void
1538 __truncate(void *arg)
1539 {
1540 	struct spdk_fs_request *req = arg;
1541 	struct spdk_fs_cb_args *args = &req->args;
1542 
1543 	spdk_file_truncate_async(args->file, args->op.truncate.length,
1544 				 args->fn.file_op, args);
1545 }
1546 
1547 int
1548 spdk_file_truncate(struct spdk_file *file, struct spdk_io_channel *_channel,
1549 		   uint64_t length)
1550 {
1551 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
1552 	struct spdk_fs_request *req;
1553 	struct spdk_fs_cb_args *args;
1554 	int rc;
1555 
1556 	req = alloc_fs_request(channel);
1557 	if (req == NULL) {
1558 		return -ENOMEM;
1559 	}
1560 
1561 	args = &req->args;
1562 
1563 	args->file = file;
1564 	args->op.truncate.length = length;
1565 	args->fn.file_op = __wake_caller;
1566 	args->sem = &channel->sem;
1567 
1568 	channel->send_request(__truncate, req);
1569 	sem_wait(&channel->sem);
1570 	rc = args->rc;
1571 	free_fs_request(req);
1572 
1573 	return rc;
1574 }
1575 
1576 static void
1577 __rw_done(void *ctx, int bserrno)
1578 {
1579 	struct spdk_fs_request *req = ctx;
1580 	struct spdk_fs_cb_args *args = &req->args;
1581 
1582 	spdk_free(args->op.rw.pin_buf);
1583 	args->fn.file_op(args->arg, bserrno);
1584 	free_fs_request(req);
1585 }
1586 
1587 static void
1588 __read_done(void *ctx, int bserrno)
1589 {
1590 	struct spdk_fs_request *req = ctx;
1591 	struct spdk_fs_cb_args *args = &req->args;
1592 
1593 	assert(req != NULL);
1594 	if (args->op.rw.is_read) {
1595 		memcpy(args->op.rw.user_buf,
1596 		       args->op.rw.pin_buf + (args->op.rw.offset & (args->op.rw.blocklen - 1)),
1597 		       args->op.rw.length);
1598 		__rw_done(req, 0);
1599 	} else {
1600 		memcpy(args->op.rw.pin_buf + (args->op.rw.offset & (args->op.rw.blocklen - 1)),
1601 		       args->op.rw.user_buf,
1602 		       args->op.rw.length);
1603 		spdk_blob_io_write(args->file->blob, args->op.rw.channel,
1604 				   args->op.rw.pin_buf,
1605 				   args->op.rw.start_lba, args->op.rw.num_lba,
1606 				   __rw_done, req);
1607 	}
1608 }
1609 
1610 static void
1611 __do_blob_read(void *ctx, int fserrno)
1612 {
1613 	struct spdk_fs_request *req = ctx;
1614 	struct spdk_fs_cb_args *args = &req->args;
1615 
1616 	if (fserrno) {
1617 		__rw_done(req, fserrno);
1618 		return;
1619 	}
1620 	spdk_blob_io_read(args->file->blob, args->op.rw.channel,
1621 			  args->op.rw.pin_buf,
1622 			  args->op.rw.start_lba, args->op.rw.num_lba,
1623 			  __read_done, req);
1624 }
1625 
1626 static void
1627 __get_page_parameters(struct spdk_file *file, uint64_t offset, uint64_t length,
1628 		      uint64_t *start_lba, uint32_t *lba_size, uint64_t *num_lba)
1629 {
1630 	uint64_t end_lba;
1631 
1632 	*lba_size = spdk_bs_get_io_unit_size(file->fs->bs);
1633 	*start_lba = offset / *lba_size;
1634 	end_lba = (offset + length - 1) / *lba_size;
1635 	*num_lba = (end_lba - *start_lba + 1);
1636 }
1637 
1638 static void
1639 __readwrite(struct spdk_file *file, struct spdk_io_channel *_channel,
1640 	    void *payload, uint64_t offset, uint64_t length,
1641 	    spdk_file_op_complete cb_fn, void *cb_arg, int is_read)
1642 {
1643 	struct spdk_fs_request *req;
1644 	struct spdk_fs_cb_args *args;
1645 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
1646 	uint64_t start_lba, num_lba, pin_buf_length;
1647 	uint32_t lba_size;
1648 
1649 	if (is_read && offset + length > file->length) {
1650 		cb_fn(cb_arg, -EINVAL);
1651 		return;
1652 	}
1653 
1654 	req = alloc_fs_request(channel);
1655 	if (req == NULL) {
1656 		cb_fn(cb_arg, -ENOMEM);
1657 		return;
1658 	}
1659 
1660 	__get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba);
1661 
1662 	args = &req->args;
1663 	args->fn.file_op = cb_fn;
1664 	args->arg = cb_arg;
1665 	args->file = file;
1666 	args->op.rw.channel = channel->bs_channel;
1667 	args->op.rw.user_buf = payload;
1668 	args->op.rw.is_read = is_read;
1669 	args->op.rw.offset = offset;
1670 	args->op.rw.length = length;
1671 	args->op.rw.blocklen = lba_size;
1672 
1673 	pin_buf_length = num_lba * lba_size;
1674 	args->op.rw.pin_buf = spdk_malloc(pin_buf_length, lba_size, NULL,
1675 					  SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
1676 	if (args->op.rw.pin_buf == NULL) {
1677 		SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "Failed to allocate buf for: file=%s offset=%jx length=%jx\n",
1678 			      file->name, offset, length);
1679 		free_fs_request(req);
1680 		cb_fn(cb_arg, -ENOMEM);
1681 		return;
1682 	}
1683 
1684 	args->op.rw.start_lba = start_lba;
1685 	args->op.rw.num_lba = num_lba;
1686 
1687 	if (!is_read && file->length < offset + length) {
1688 		spdk_file_truncate_async(file, offset + length, __do_blob_read, req);
1689 	} else {
1690 		__do_blob_read(req, 0);
1691 	}
1692 }
1693 
1694 void
1695 spdk_file_write_async(struct spdk_file *file, struct spdk_io_channel *channel,
1696 		      void *payload, uint64_t offset, uint64_t length,
1697 		      spdk_file_op_complete cb_fn, void *cb_arg)
1698 {
1699 	__readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 0);
1700 }
1701 
1702 void
1703 spdk_file_read_async(struct spdk_file *file, struct spdk_io_channel *channel,
1704 		     void *payload, uint64_t offset, uint64_t length,
1705 		     spdk_file_op_complete cb_fn, void *cb_arg)
1706 {
1707 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s offset=%jx length=%jx\n",
1708 		      file->name, offset, length);
1709 	__readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 1);
1710 }
1711 
1712 struct spdk_io_channel *
1713 spdk_fs_alloc_io_channel(struct spdk_filesystem *fs)
1714 {
1715 	struct spdk_io_channel *io_channel;
1716 	struct spdk_fs_channel *fs_channel;
1717 
1718 	io_channel = spdk_get_io_channel(&fs->io_target);
1719 	fs_channel = spdk_io_channel_get_ctx(io_channel);
1720 	fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
1721 	fs_channel->send_request = __send_request_direct;
1722 
1723 	return io_channel;
1724 }
1725 
1726 struct spdk_io_channel *
1727 spdk_fs_alloc_io_channel_sync(struct spdk_filesystem *fs)
1728 {
1729 	struct spdk_io_channel *io_channel;
1730 	struct spdk_fs_channel *fs_channel;
1731 
1732 	io_channel = spdk_get_io_channel(&fs->io_target);
1733 	fs_channel = spdk_io_channel_get_ctx(io_channel);
1734 	fs_channel->send_request = fs->send_request;
1735 	fs_channel->sync = 1;
1736 	pthread_spin_init(&fs_channel->lock, 0);
1737 
1738 	return io_channel;
1739 }
1740 
/*
 * Release an I/O channel obtained from one of the spdk_fs_alloc_io_channel*
 *  functions by dropping the reference on it.
 */
void
spdk_fs_free_io_channel(struct spdk_io_channel *channel)
{
	struct spdk_io_channel *to_release = channel;

	spdk_put_io_channel(to_release);
}
1746 
1747 void
1748 spdk_fs_set_cache_size(uint64_t size_in_mb)
1749 {
1750 	g_fs_cache_size = size_in_mb * 1024 * 1024;
1751 }
1752 
1753 uint64_t
1754 spdk_fs_get_cache_size(void)
1755 {
1756 	return g_fs_cache_size / (1024 * 1024);
1757 }
1758 
1759 static void __file_flush(void *_args);
1760 
1761 static void *
1762 alloc_cache_memory_buffer(struct spdk_file *context)
1763 {
1764 	struct spdk_file *file;
1765 	void *buf;
1766 
1767 	buf = spdk_mempool_get(g_cache_pool);
1768 	if (buf != NULL) {
1769 		return buf;
1770 	}
1771 
1772 	pthread_spin_lock(&g_caches_lock);
1773 	TAILQ_FOREACH(file, &g_caches, cache_tailq) {
1774 		if (!file->open_for_writing &&
1775 		    file->priority == SPDK_FILE_PRIORITY_LOW &&
1776 		    file != context) {
1777 			break;
1778 		}
1779 	}
1780 	pthread_spin_unlock(&g_caches_lock);
1781 	if (file != NULL) {
1782 		cache_free_buffers(file);
1783 		buf = spdk_mempool_get(g_cache_pool);
1784 		if (buf != NULL) {
1785 			return buf;
1786 		}
1787 	}
1788 
1789 	pthread_spin_lock(&g_caches_lock);
1790 	TAILQ_FOREACH(file, &g_caches, cache_tailq) {
1791 		if (!file->open_for_writing && file != context) {
1792 			break;
1793 		}
1794 	}
1795 	pthread_spin_unlock(&g_caches_lock);
1796 	if (file != NULL) {
1797 		cache_free_buffers(file);
1798 		buf = spdk_mempool_get(g_cache_pool);
1799 		if (buf != NULL) {
1800 			return buf;
1801 		}
1802 	}
1803 
1804 	pthread_spin_lock(&g_caches_lock);
1805 	TAILQ_FOREACH(file, &g_caches, cache_tailq) {
1806 		if (file != context) {
1807 			break;
1808 		}
1809 	}
1810 	pthread_spin_unlock(&g_caches_lock);
1811 	if (file != NULL) {
1812 		cache_free_buffers(file);
1813 		buf = spdk_mempool_get(g_cache_pool);
1814 		if (buf != NULL) {
1815 			return buf;
1816 		}
1817 	}
1818 
1819 	return NULL;
1820 }
1821 
1822 static struct cache_buffer *
1823 cache_insert_buffer(struct spdk_file *file, uint64_t offset)
1824 {
1825 	struct cache_buffer *buf;
1826 	int count = 0;
1827 
1828 	buf = calloc(1, sizeof(*buf));
1829 	if (buf == NULL) {
1830 		SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "calloc failed\n");
1831 		return NULL;
1832 	}
1833 
1834 	buf->buf = alloc_cache_memory_buffer(file);
1835 	while (buf->buf == NULL) {
1836 		/*
1837 		 * TODO: alloc_cache_memory_buffer() should eventually free
1838 		 *  some buffers.  Need a more sophisticated check here, instead
1839 		 *  of just bailing if 100 tries does not result in getting a
1840 		 *  free buffer.  This will involve using the sync channel's
1841 		 *  semaphore to block until a buffer becomes available.
1842 		 */
1843 		if (count++ == 100) {
1844 			SPDK_ERRLOG("Could not allocate cache buffer for file=%p on offset=%jx\n",
1845 				    file, offset);
1846 			free(buf);
1847 			return NULL;
1848 		}
1849 		buf->buf = alloc_cache_memory_buffer(file);
1850 	}
1851 
1852 	buf->buf_size = CACHE_BUFFER_SIZE;
1853 	buf->offset = offset;
1854 
1855 	pthread_spin_lock(&g_caches_lock);
1856 	if (file->tree->present_mask == 0) {
1857 		TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq);
1858 	}
1859 	file->tree = spdk_tree_insert_buffer(file->tree, buf);
1860 	pthread_spin_unlock(&g_caches_lock);
1861 
1862 	return buf;
1863 }
1864 
1865 static struct cache_buffer *
1866 cache_append_buffer(struct spdk_file *file)
1867 {
1868 	struct cache_buffer *last;
1869 
1870 	assert(file->last == NULL || file->last->bytes_filled == file->last->buf_size);
1871 	assert((file->append_pos % CACHE_BUFFER_SIZE) == 0);
1872 
1873 	last = cache_insert_buffer(file, file->append_pos);
1874 	if (last == NULL) {
1875 		SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "cache_insert_buffer failed\n");
1876 		return NULL;
1877 	}
1878 
1879 	file->last = last;
1880 
1881 	return last;
1882 }
1883 
1884 static void __check_sync_reqs(struct spdk_file *file);
1885 
1886 static void
1887 __file_cache_finish_sync(void *ctx, int bserrno)
1888 {
1889 	struct spdk_file *file = ctx;
1890 	struct spdk_fs_request *sync_req;
1891 	struct spdk_fs_cb_args *sync_args;
1892 
1893 	pthread_spin_lock(&file->lock);
1894 	sync_req = TAILQ_FIRST(&file->sync_requests);
1895 	sync_args = &sync_req->args;
1896 	assert(sync_args->op.sync.offset <= file->length_flushed);
1897 	BLOBFS_TRACE(file, "sync done offset=%jx\n", sync_args->op.sync.offset);
1898 	TAILQ_REMOVE(&file->sync_requests, sync_req, args.op.sync.tailq);
1899 	pthread_spin_unlock(&file->lock);
1900 
1901 	sync_args->fn.file_op(sync_args->arg, bserrno);
1902 	__check_sync_reqs(file);
1903 
1904 	pthread_spin_lock(&file->lock);
1905 	free_fs_request(sync_req);
1906 	pthread_spin_unlock(&file->lock);
1907 }
1908 
1909 static void
1910 __free_args(struct spdk_fs_cb_args *args)
1911 {
1912 	struct spdk_fs_request *req;
1913 
1914 	if (!args->from_request) {
1915 		free(args);
1916 	} else {
1917 		/* Depends on args being at the start of the spdk_fs_request structure. */
1918 		req = (struct spdk_fs_request *)args;
1919 		free_fs_request(req);
1920 	}
1921 }
1922 
1923 static void
1924 __check_sync_reqs(struct spdk_file *file)
1925 {
1926 	struct spdk_fs_request *sync_req;
1927 
1928 	pthread_spin_lock(&file->lock);
1929 
1930 	TAILQ_FOREACH(sync_req, &file->sync_requests, args.op.sync.tailq) {
1931 		if (sync_req->args.op.sync.offset <= file->length_flushed) {
1932 			break;
1933 		}
1934 	}
1935 
1936 	if (sync_req != NULL && !sync_req->args.op.sync.xattr_in_progress) {
1937 		BLOBFS_TRACE(file, "set xattr length 0x%jx\n", file->length_flushed);
1938 		sync_req->args.op.sync.xattr_in_progress = true;
1939 		spdk_blob_set_xattr(file->blob, "length", &file->length_flushed,
1940 				    sizeof(file->length_flushed));
1941 
1942 		pthread_spin_unlock(&file->lock);
1943 		spdk_blob_sync_md(file->blob, __file_cache_finish_sync, file);
1944 	} else {
1945 		pthread_spin_unlock(&file->lock);
1946 	}
1947 }
1948 
1949 static void
1950 __file_flush_done(void *arg, int bserrno)
1951 {
1952 	struct spdk_fs_cb_args *args = arg;
1953 	struct spdk_file *file = args->file;
1954 	struct cache_buffer *next = args->op.flush.cache_buffer;
1955 
1956 	BLOBFS_TRACE(file, "length=%jx\n", args->op.flush.length);
1957 
1958 	pthread_spin_lock(&file->lock);
1959 	next->in_progress = false;
1960 	next->bytes_flushed += args->op.flush.length;
1961 	file->length_flushed += args->op.flush.length;
1962 	if (file->length_flushed > file->length) {
1963 		file->length = file->length_flushed;
1964 	}
1965 	if (next->bytes_flushed == next->buf_size) {
1966 		BLOBFS_TRACE(file, "write buffer fully flushed 0x%jx\n", file->length_flushed);
1967 		next = spdk_tree_find_buffer(file->tree, file->length_flushed);
1968 	}
1969 
1970 	/*
1971 	 * Assert that there is no cached data that extends past the end of the underlying
1972 	 *  blob.
1973 	 */
1974 	assert(next == NULL || next->offset < __file_get_blob_size(file) ||
1975 	       next->bytes_filled == 0);
1976 
1977 	pthread_spin_unlock(&file->lock);
1978 
1979 	__check_sync_reqs(file);
1980 
1981 	__file_flush(args);
1982 }
1983 
1984 static void
1985 __file_flush(void *_args)
1986 {
1987 	struct spdk_fs_cb_args *args = _args;
1988 	struct spdk_file *file = args->file;
1989 	struct cache_buffer *next;
1990 	uint64_t offset, length, start_lba, num_lba;
1991 	uint32_t lba_size;
1992 
1993 	pthread_spin_lock(&file->lock);
1994 	next = spdk_tree_find_buffer(file->tree, file->length_flushed);
1995 	if (next == NULL || next->in_progress) {
1996 		/*
1997 		 * There is either no data to flush, or a flush I/O is already in
1998 		 *  progress.  So return immediately - if a flush I/O is in
1999 		 *  progress we will flush more data after that is completed.
2000 		 */
2001 		__free_args(args);
2002 		if (next == NULL) {
2003 			/*
2004 			 * For cases where a file's cache was evicted, and then the
2005 			 *  file was later appended, we will write the data directly
2006 			 *  to disk and bypass cache.  So just update length_flushed
2007 			 *  here to reflect that all data was already written to disk.
2008 			 */
2009 			file->length_flushed = file->append_pos;
2010 		}
2011 		pthread_spin_unlock(&file->lock);
2012 		if (next == NULL) {
2013 			/*
2014 			 * There is no data to flush, but we still need to check for any
2015 			 *  outstanding sync requests to make sure metadata gets updated.
2016 			 */
2017 			__check_sync_reqs(file);
2018 		}
2019 		return;
2020 	}
2021 
2022 	offset = next->offset + next->bytes_flushed;
2023 	length = next->bytes_filled - next->bytes_flushed;
2024 	if (length == 0) {
2025 		__free_args(args);
2026 		pthread_spin_unlock(&file->lock);
2027 		return;
2028 	}
2029 	args->op.flush.length = length;
2030 	args->op.flush.cache_buffer = next;
2031 
2032 	__get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba);
2033 
2034 	next->in_progress = true;
2035 	BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n",
2036 		     offset, length, start_lba, num_lba);
2037 	pthread_spin_unlock(&file->lock);
2038 	spdk_blob_io_write(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel,
2039 			   next->buf + (start_lba * lba_size) - next->offset,
2040 			   start_lba, num_lba, __file_flush_done, args);
2041 }
2042 
/*
 * Metadata sync completion for a blob extend: wakes the waiting writer with
 *  the result.
 */
static void
__file_extend_done(void *arg, int bserrno)
{
	struct spdk_fs_cb_args *extend_args = arg;

	__wake_caller(extend_args, bserrno);
}
2050 
2051 static void
2052 __file_extend_resize_cb(void *_args, int bserrno)
2053 {
2054 	struct spdk_fs_cb_args *args = _args;
2055 	struct spdk_file *file = args->file;
2056 
2057 	if (bserrno) {
2058 		__wake_caller(args, bserrno);
2059 		return;
2060 	}
2061 
2062 	spdk_blob_sync_md(file->blob, __file_extend_done, args);
2063 }
2064 
2065 static void
2066 __file_extend_blob(void *_args)
2067 {
2068 	struct spdk_fs_cb_args *args = _args;
2069 	struct spdk_file *file = args->file;
2070 
2071 	spdk_blob_resize(file->blob, args->op.resize.num_clusters, __file_extend_resize_cb, args);
2072 }
2073 
/*
 * Completion of a read or write issued by __rw_from_file(): wakes the waiting
 *  caller with the result, then releases the arguments.
 */
static void
__rw_from_file_done(void *arg, int bserrno)
{
	struct spdk_fs_cb_args *rw_args = arg;

	__wake_caller(rw_args, bserrno);
	__free_args(rw_args);
}
2082 
2083 static void
2084 __rw_from_file(void *_args)
2085 {
2086 	struct spdk_fs_cb_args *args = _args;
2087 	struct spdk_file *file = args->file;
2088 
2089 	if (args->op.rw.is_read) {
2090 		spdk_file_read_async(file, file->fs->sync_target.sync_io_channel, args->op.rw.user_buf,
2091 				     args->op.rw.offset, args->op.rw.length,
2092 				     __rw_from_file_done, args);
2093 	} else {
2094 		spdk_file_write_async(file, file->fs->sync_target.sync_io_channel, args->op.rw.user_buf,
2095 				      args->op.rw.offset, args->op.rw.length,
2096 				      __rw_from_file_done, args);
2097 	}
2098 }
2099 
2100 static int
2101 __send_rw_from_file(struct spdk_file *file, sem_t *sem, void *payload,
2102 		    uint64_t offset, uint64_t length, bool is_read)
2103 {
2104 	struct spdk_fs_cb_args *args;
2105 
2106 	args = calloc(1, sizeof(*args));
2107 	if (args == NULL) {
2108 		sem_post(sem);
2109 		return -ENOMEM;
2110 	}
2111 
2112 	args->file = file;
2113 	args->sem = sem;
2114 	args->op.rw.user_buf = payload;
2115 	args->op.rw.offset = offset;
2116 	args->op.rw.length = length;
2117 	args->op.rw.is_read = is_read;
2118 	file->fs->send_request(__rw_from_file, args);
2119 	return 0;
2120 }
2121 
2122 int
2123 spdk_file_write(struct spdk_file *file, struct spdk_io_channel *_channel,
2124 		void *payload, uint64_t offset, uint64_t length)
2125 {
2126 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
2127 	struct spdk_fs_cb_args *args;
2128 	uint64_t rem_length, copy, blob_size, cluster_sz;
2129 	uint32_t cache_buffers_filled = 0;
2130 	uint8_t *cur_payload;
2131 	struct cache_buffer *last;
2132 
2133 	BLOBFS_TRACE_RW(file, "offset=%jx length=%jx\n", offset, length);
2134 
2135 	if (length == 0) {
2136 		return 0;
2137 	}
2138 
2139 	if (offset != file->append_pos) {
2140 		BLOBFS_TRACE(file, " error offset=%jx append_pos=%jx\n", offset, file->append_pos);
2141 		return -EINVAL;
2142 	}
2143 
2144 	pthread_spin_lock(&file->lock);
2145 	file->open_for_writing = true;
2146 
2147 	if ((file->last == NULL) && (file->append_pos % CACHE_BUFFER_SIZE == 0)) {
2148 		cache_append_buffer(file);
2149 	}
2150 
2151 	if (file->last == NULL) {
2152 		int rc;
2153 
2154 		file->append_pos += length;
2155 		pthread_spin_unlock(&file->lock);
2156 		rc = __send_rw_from_file(file, &channel->sem, payload,
2157 					 offset, length, false);
2158 		sem_wait(&channel->sem);
2159 		return rc;
2160 	}
2161 
2162 	blob_size = __file_get_blob_size(file);
2163 
2164 	if ((offset + length) > blob_size) {
2165 		struct spdk_fs_cb_args extend_args = {};
2166 
2167 		cluster_sz = file->fs->bs_opts.cluster_sz;
2168 		extend_args.sem = &channel->sem;
2169 		extend_args.op.resize.num_clusters = __bytes_to_clusters((offset + length), cluster_sz);
2170 		extend_args.file = file;
2171 		BLOBFS_TRACE(file, "start resize to %u clusters\n", extend_args.op.resize.num_clusters);
2172 		pthread_spin_unlock(&file->lock);
2173 		file->fs->send_request(__file_extend_blob, &extend_args);
2174 		sem_wait(&channel->sem);
2175 		if (extend_args.rc) {
2176 			return extend_args.rc;
2177 		}
2178 	}
2179 
2180 	last = file->last;
2181 	rem_length = length;
2182 	cur_payload = payload;
2183 	while (rem_length > 0) {
2184 		copy = last->buf_size - last->bytes_filled;
2185 		if (copy > rem_length) {
2186 			copy = rem_length;
2187 		}
2188 		BLOBFS_TRACE_RW(file, "  fill offset=%jx length=%jx\n", file->append_pos, copy);
2189 		memcpy(&last->buf[last->bytes_filled], cur_payload, copy);
2190 		file->append_pos += copy;
2191 		if (file->length < file->append_pos) {
2192 			file->length = file->append_pos;
2193 		}
2194 		cur_payload += copy;
2195 		last->bytes_filled += copy;
2196 		rem_length -= copy;
2197 		if (last->bytes_filled == last->buf_size) {
2198 			cache_buffers_filled++;
2199 			last = cache_append_buffer(file);
2200 			if (last == NULL) {
2201 				BLOBFS_TRACE(file, "nomem\n");
2202 				pthread_spin_unlock(&file->lock);
2203 				return -ENOMEM;
2204 			}
2205 		}
2206 	}
2207 
2208 	pthread_spin_unlock(&file->lock);
2209 
2210 	if (cache_buffers_filled == 0) {
2211 		return 0;
2212 	}
2213 
2214 	args = calloc(1, sizeof(*args));
2215 	if (args == NULL) {
2216 		return -ENOMEM;
2217 	}
2218 
2219 	args->file = file;
2220 	file->fs->send_request(__file_flush, args);
2221 	return 0;
2222 }
2223 
2224 static void
2225 __readahead_done(void *arg, int bserrno)
2226 {
2227 	struct spdk_fs_cb_args *args = arg;
2228 	struct cache_buffer *cache_buffer = args->op.readahead.cache_buffer;
2229 	struct spdk_file *file = args->file;
2230 
2231 	BLOBFS_TRACE(file, "offset=%jx\n", cache_buffer->offset);
2232 
2233 	pthread_spin_lock(&file->lock);
2234 	cache_buffer->bytes_filled = args->op.readahead.length;
2235 	cache_buffer->bytes_flushed = args->op.readahead.length;
2236 	cache_buffer->in_progress = false;
2237 	pthread_spin_unlock(&file->lock);
2238 
2239 	__free_args(args);
2240 }
2241 
2242 static void
2243 __readahead(void *_args)
2244 {
2245 	struct spdk_fs_cb_args *args = _args;
2246 	struct spdk_file *file = args->file;
2247 	uint64_t offset, length, start_lba, num_lba;
2248 	uint32_t lba_size;
2249 
2250 	offset = args->op.readahead.offset;
2251 	length = args->op.readahead.length;
2252 	assert(length > 0);
2253 
2254 	__get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba);
2255 
2256 	BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n",
2257 		     offset, length, start_lba, num_lba);
2258 	spdk_blob_io_read(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel,
2259 			  args->op.readahead.cache_buffer->buf,
2260 			  start_lba, num_lba, __readahead_done, args);
2261 }
2262 
2263 static uint64_t
2264 __next_cache_buffer_offset(uint64_t offset)
2265 {
2266 	return (offset + CACHE_BUFFER_SIZE) & ~(CACHE_TREE_LEVEL_MASK(0));
2267 }
2268 
2269 static void
2270 check_readahead(struct spdk_file *file, uint64_t offset)
2271 {
2272 	struct spdk_fs_cb_args *args;
2273 
2274 	offset = __next_cache_buffer_offset(offset);
2275 	if (spdk_tree_find_buffer(file->tree, offset) != NULL || file->length <= offset) {
2276 		return;
2277 	}
2278 
2279 	args = calloc(1, sizeof(*args));
2280 	if (args == NULL) {
2281 		return;
2282 	}
2283 
2284 	BLOBFS_TRACE(file, "offset=%jx\n", offset);
2285 
2286 	args->file = file;
2287 	args->op.readahead.offset = offset;
2288 	args->op.readahead.cache_buffer = cache_insert_buffer(file, offset);
2289 	if (!args->op.readahead.cache_buffer) {
2290 		BLOBFS_TRACE(file, "Cannot allocate buf for offset=%jx\n", offset);
2291 		free(args);
2292 		return;
2293 	}
2294 
2295 	args->op.readahead.cache_buffer->in_progress = true;
2296 	if (file->length < (offset + CACHE_BUFFER_SIZE)) {
2297 		args->op.readahead.length = file->length & (CACHE_BUFFER_SIZE - 1);
2298 	} else {
2299 		args->op.readahead.length = CACHE_BUFFER_SIZE;
2300 	}
2301 	file->fs->send_request(__readahead, args);
2302 }
2303 
2304 static int
2305 __file_read(struct spdk_file *file, void *payload, uint64_t offset, uint64_t length, sem_t *sem)
2306 {
2307 	struct cache_buffer *buf;
2308 	int rc;
2309 
2310 	buf = spdk_tree_find_filled_buffer(file->tree, offset);
2311 	if (buf == NULL) {
2312 		pthread_spin_unlock(&file->lock);
2313 		rc = __send_rw_from_file(file, sem, payload, offset, length, true);
2314 		pthread_spin_lock(&file->lock);
2315 		return rc;
2316 	}
2317 
2318 	if ((offset + length) > (buf->offset + buf->bytes_filled)) {
2319 		length = buf->offset + buf->bytes_filled - offset;
2320 	}
2321 	BLOBFS_TRACE(file, "read %p offset=%ju length=%ju\n", payload, offset, length);
2322 	memcpy(payload, &buf->buf[offset - buf->offset], length);
2323 	if ((offset + length) % CACHE_BUFFER_SIZE == 0) {
2324 		pthread_spin_lock(&g_caches_lock);
2325 		spdk_tree_remove_buffer(file->tree, buf);
2326 		if (file->tree->present_mask == 0) {
2327 			TAILQ_REMOVE(&g_caches, file, cache_tailq);
2328 		}
2329 		pthread_spin_unlock(&g_caches_lock);
2330 	}
2331 
2332 	sem_post(sem);
2333 	return 0;
2334 }
2335 
2336 int64_t
2337 spdk_file_read(struct spdk_file *file, struct spdk_io_channel *_channel,
2338 	       void *payload, uint64_t offset, uint64_t length)
2339 {
2340 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
2341 	uint64_t final_offset, final_length;
2342 	uint32_t sub_reads = 0;
2343 	int rc = 0;
2344 
2345 	pthread_spin_lock(&file->lock);
2346 
2347 	BLOBFS_TRACE_RW(file, "offset=%ju length=%ju\n", offset, length);
2348 
2349 	file->open_for_writing = false;
2350 
2351 	if (length == 0 || offset >= file->append_pos) {
2352 		pthread_spin_unlock(&file->lock);
2353 		return 0;
2354 	}
2355 
2356 	if (offset + length > file->append_pos) {
2357 		length = file->append_pos - offset;
2358 	}
2359 
2360 	if (offset != file->next_seq_offset) {
2361 		file->seq_byte_count = 0;
2362 	}
2363 	file->seq_byte_count += length;
2364 	file->next_seq_offset = offset + length;
2365 	if (file->seq_byte_count >= CACHE_READAHEAD_THRESHOLD) {
2366 		check_readahead(file, offset);
2367 		check_readahead(file, offset + CACHE_BUFFER_SIZE);
2368 	}
2369 
2370 	final_length = 0;
2371 	final_offset = offset + length;
2372 	while (offset < final_offset) {
2373 		length = NEXT_CACHE_BUFFER_OFFSET(offset) - offset;
2374 		if (length > (final_offset - offset)) {
2375 			length = final_offset - offset;
2376 		}
2377 		rc = __file_read(file, payload, offset, length, &channel->sem);
2378 		if (rc == 0) {
2379 			final_length += length;
2380 		} else {
2381 			break;
2382 		}
2383 		payload += length;
2384 		offset += length;
2385 		sub_reads++;
2386 	}
2387 	pthread_spin_unlock(&file->lock);
2388 	while (sub_reads-- > 0) {
2389 		sem_wait(&channel->sem);
2390 	}
2391 	if (rc == 0) {
2392 		return final_length;
2393 	} else {
2394 		return rc;
2395 	}
2396 }
2397 
2398 static void
2399 _file_sync(struct spdk_file *file, struct spdk_fs_channel *channel,
2400 	   spdk_file_op_complete cb_fn, void *cb_arg)
2401 {
2402 	struct spdk_fs_request *sync_req;
2403 	struct spdk_fs_request *flush_req;
2404 	struct spdk_fs_cb_args *sync_args;
2405 	struct spdk_fs_cb_args *flush_args;
2406 
2407 	BLOBFS_TRACE(file, "offset=%jx\n", file->append_pos);
2408 
2409 	pthread_spin_lock(&file->lock);
2410 	if (file->append_pos <= file->length_flushed) {
2411 		BLOBFS_TRACE(file, "done - no data to flush\n");
2412 		pthread_spin_unlock(&file->lock);
2413 		cb_fn(cb_arg, 0);
2414 		return;
2415 	}
2416 
2417 	sync_req = alloc_fs_request(channel);
2418 	if (!sync_req) {
2419 		pthread_spin_unlock(&file->lock);
2420 		cb_fn(cb_arg, -ENOMEM);
2421 		return;
2422 	}
2423 	sync_args = &sync_req->args;
2424 
2425 	flush_req = alloc_fs_request(channel);
2426 	if (!flush_req) {
2427 		pthread_spin_unlock(&file->lock);
2428 		cb_fn(cb_arg, -ENOMEM);
2429 		return;
2430 	}
2431 	flush_args = &flush_req->args;
2432 
2433 	sync_args->file = file;
2434 	sync_args->fn.file_op = cb_fn;
2435 	sync_args->arg = cb_arg;
2436 	sync_args->op.sync.offset = file->append_pos;
2437 	sync_args->op.sync.xattr_in_progress = false;
2438 	TAILQ_INSERT_TAIL(&file->sync_requests, sync_req, args.op.sync.tailq);
2439 	pthread_spin_unlock(&file->lock);
2440 
2441 	flush_args->file = file;
2442 	channel->send_request(__file_flush, flush_args);
2443 }
2444 
2445 int
2446 spdk_file_sync(struct spdk_file *file, struct spdk_io_channel *_channel)
2447 {
2448 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
2449 	struct spdk_fs_cb_args args = {};
2450 
2451 	args.sem = &channel->sem;
2452 	_file_sync(file, channel, __wake_caller, &args);
2453 	sem_wait(&channel->sem);
2454 
2455 	return args.rc;
2456 }
2457 
2458 void
2459 spdk_file_sync_async(struct spdk_file *file, struct spdk_io_channel *_channel,
2460 		     spdk_file_op_complete cb_fn, void *cb_arg)
2461 {
2462 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
2463 
2464 	_file_sync(file, channel, cb_fn, cb_arg);
2465 }
2466 
2467 void
2468 spdk_file_set_priority(struct spdk_file *file, uint32_t priority)
2469 {
2470 	BLOBFS_TRACE(file, "priority=%u\n", priority);
2471 	file->priority = priority;
2472 
2473 }
2474 
2475 /*
2476  * Close routines
2477  */
2478 
2479 static void
2480 __file_close_async_done(void *ctx, int bserrno)
2481 {
2482 	struct spdk_fs_request *req = ctx;
2483 	struct spdk_fs_cb_args *args = &req->args;
2484 	struct spdk_file *file = args->file;
2485 
2486 	if (file->is_deleted) {
2487 		spdk_fs_delete_file_async(file->fs, file->name, blob_delete_cb, ctx);
2488 		return;
2489 	}
2490 
2491 	args->fn.file_op(args->arg, bserrno);
2492 	free_fs_request(req);
2493 }
2494 
2495 static void
2496 __file_close_async(struct spdk_file *file, struct spdk_fs_request *req)
2497 {
2498 	struct spdk_blob *blob;
2499 
2500 	pthread_spin_lock(&file->lock);
2501 	if (file->ref_count == 0) {
2502 		pthread_spin_unlock(&file->lock);
2503 		__file_close_async_done(req, -EBADF);
2504 		return;
2505 	}
2506 
2507 	file->ref_count--;
2508 	if (file->ref_count > 0) {
2509 		pthread_spin_unlock(&file->lock);
2510 		req->args.fn.file_op(req->args.arg, 0);
2511 		free_fs_request(req);
2512 		return;
2513 	}
2514 
2515 	pthread_spin_unlock(&file->lock);
2516 
2517 	blob = file->blob;
2518 	file->blob = NULL;
2519 	spdk_blob_close(blob, __file_close_async_done, req);
2520 }
2521 
2522 static void
2523 __file_close_async__sync_done(void *arg, int fserrno)
2524 {
2525 	struct spdk_fs_request *req = arg;
2526 	struct spdk_fs_cb_args *args = &req->args;
2527 
2528 	__file_close_async(args->file, req);
2529 }
2530 
2531 void
2532 spdk_file_close_async(struct spdk_file *file, spdk_file_op_complete cb_fn, void *cb_arg)
2533 {
2534 	struct spdk_fs_request *req;
2535 	struct spdk_fs_cb_args *args;
2536 
2537 	req = alloc_fs_request(file->fs->md_target.md_fs_channel);
2538 	if (req == NULL) {
2539 		cb_fn(cb_arg, -ENOMEM);
2540 		return;
2541 	}
2542 
2543 	args = &req->args;
2544 	args->file = file;
2545 	args->fn.file_op = cb_fn;
2546 	args->arg = cb_arg;
2547 
2548 	spdk_file_sync_async(file, file->fs->md_target.md_io_channel, __file_close_async__sync_done, req);
2549 }
2550 
2551 static void
2552 __file_close(void *arg)
2553 {
2554 	struct spdk_fs_request *req = arg;
2555 	struct spdk_fs_cb_args *args = &req->args;
2556 	struct spdk_file *file = args->file;
2557 
2558 	__file_close_async(file, req);
2559 }
2560 
2561 int
2562 spdk_file_close(struct spdk_file *file, struct spdk_io_channel *_channel)
2563 {
2564 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
2565 	struct spdk_fs_request *req;
2566 	struct spdk_fs_cb_args *args;
2567 
2568 	req = alloc_fs_request(channel);
2569 	if (req == NULL) {
2570 		return -ENOMEM;
2571 	}
2572 
2573 	args = &req->args;
2574 
2575 	spdk_file_sync(file, _channel);
2576 	BLOBFS_TRACE(file, "name=%s\n", file->name);
2577 	args->file = file;
2578 	args->sem = &channel->sem;
2579 	args->fn.file_op = __wake_caller;
2580 	args->arg = req;
2581 	channel->send_request(__file_close, req);
2582 	sem_wait(&channel->sem);
2583 
2584 	return args->rc;
2585 }
2586 
2587 int
2588 spdk_file_get_id(struct spdk_file *file, void *id, size_t size)
2589 {
2590 	if (size < sizeof(spdk_blob_id)) {
2591 		return -EINVAL;
2592 	}
2593 
2594 	memcpy(id, &file->blobid, sizeof(spdk_blob_id));
2595 
2596 	return sizeof(spdk_blob_id);
2597 }
2598 
2599 static void
2600 cache_free_buffers(struct spdk_file *file)
2601 {
2602 	BLOBFS_TRACE(file, "free=%s\n", file->name);
2603 	pthread_spin_lock(&file->lock);
2604 	pthread_spin_lock(&g_caches_lock);
2605 	if (file->tree->present_mask == 0) {
2606 		pthread_spin_unlock(&g_caches_lock);
2607 		pthread_spin_unlock(&file->lock);
2608 		return;
2609 	}
2610 	spdk_tree_free_buffers(file->tree);
2611 
2612 	TAILQ_REMOVE(&g_caches, file, cache_tailq);
2613 	/* If not freed, put it in the end of the queue */
2614 	if (file->tree->present_mask != 0) {
2615 		TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq);
2616 	}
2617 	file->last = NULL;
2618 	pthread_spin_unlock(&g_caches_lock);
2619 	pthread_spin_unlock(&file->lock);
2620 }
2621 
/*
 * Register the "blobfs" and "blobfs_rw" log components.
 * NOTE(review): presumably these back the BLOBFS_TRACE and BLOBFS_TRACE_RW
 * macros used throughout this file - confirm against their definitions.
 */
SPDK_LOG_REGISTER_COMPONENT("blobfs", SPDK_LOG_BLOBFS)
SPDK_LOG_REGISTER_COMPONENT("blobfs_rw", SPDK_LOG_BLOBFS_RW)
2624