xref: /spdk/lib/blobfs/blobfs.c (revision bdb1d5713fb8b1ded692ac9b537d7346ffee36be)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 
36 #include "spdk/blobfs.h"
37 #include "spdk/conf.h"
38 #include "blobfs_internal.h"
39 
40 #include "spdk/queue.h"
41 #include "spdk/thread.h"
42 #include "spdk/assert.h"
43 #include "spdk/env.h"
44 #include "spdk/util.h"
45 #include "spdk_internal/log.h"
46 
/*
 * Debug-log helpers that prefix every message with "file=<name> ".
 * `file` must be an expression with a `name` member (it is dereferenced as
 * file->name); `str` is a printf-style format and `args` its arguments.
 * BLOBFS_TRACE logs under SPDK_LOG_BLOBFS.
 */
#define BLOBFS_TRACE(file, str, args...) \
	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s " str, file->name, ##args)

/* Same as BLOBFS_TRACE, but logs under the SPDK_LOG_BLOBFS_RW flag. */
#define BLOBFS_TRACE_RW(file, str, args...) \
	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS_RW, "file=%s " str, file->name, ##args)
52 
/* Default total size of the buffer cache, in bytes (4 GiB). */
#define BLOBFS_DEFAULT_CACHE_SIZE (4ULL * 1024 * 1024 * 1024)
/* Default cluster size, in bytes (1 MiB), written by spdk_fs_opts_init(). */
#define SPDK_BLOBFS_DEFAULT_OPTS_CLUSTER_SZ (1024 * 1024)

/* Total cache size in bytes; divided by CACHE_BUFFER_SIZE to size g_cache_pool. */
static uint64_t g_fs_cache_size = BLOBFS_DEFAULT_CACHE_SIZE;
/* Mempool of cache data buffers; created by __initialize_cache(), NULL when no cache exists. */
static struct spdk_mempool *g_cache_pool;
/* List of files; initialized in __initialize_cache().  Presumably linked via spdk_file.cache_tailq - TODO confirm. */
static TAILQ_HEAD(, spdk_file) g_caches;
/* Number of filesystems currently initialized/loaded; guarded by g_cache_init_lock. */
static int g_fs_count = 0;
/* Serializes cache creation/destruction and updates of g_fs_count. */
static pthread_mutex_t g_cache_init_lock = PTHREAD_MUTEX_INITIALIZER;
/* Spinlock initialized in __initialize_cache(); presumably protects g_caches - TODO confirm. */
static pthread_spinlock_t g_caches_lock;
62 
63 void
64 spdk_cache_buffer_free(struct cache_buffer *cache_buffer)
65 {
66 	spdk_mempool_put(g_cache_pool, cache_buffer->buf);
67 	free(cache_buffer);
68 }
69 
/* 128 KiB.  Presumably the sequential-read byte count that triggers readahead - TODO confirm against its users. */
#define CACHE_READAHEAD_THRESHOLD	(128 * 1024)
71 
/*
 * In-memory state of one blobfs file.  Instances are created by file_alloc(),
 * which also links them into the owning filesystem's `files` list.
 */
struct spdk_file {
	struct spdk_filesystem	*fs;		/* owning filesystem */
	struct spdk_blob	*blob;		/* set by fs_open_blob_done(); NULL until the blob is opened */
	char			*name;		/* heap-allocated copy of the file name (strdup) */
	uint64_t		length;		/* initialized from the "length" xattr on load */
	bool                    is_deleted;	/* when true, opens fail with -ENOENT */
	bool			open_for_writing;
	uint64_t		length_flushed;	/* initialized from the "length" xattr on load */
	uint64_t		append_pos;	/* initialized from the "length" xattr on load */
	uint64_t		seq_byte_count;
	uint64_t		next_seq_offset;
	uint32_t		priority;	/* starts as SPDK_FILE_PRIORITY_LOW */
	TAILQ_ENTRY(spdk_file)	tailq;		/* linkage in spdk_filesystem.files */
	spdk_blob_id		blobid;		/* id of the backing blob */
	uint32_t		ref_count;	/* incremented for every open request */
	pthread_spinlock_t	lock;		/* initialized in file_alloc() */
	struct cache_buffer	*last;
	struct cache_tree	*tree;		/* allocated in file_alloc(), freed on unload */
	/* Open requests waiting for the blob open to complete. */
	TAILQ_HEAD(open_requests_head, spdk_fs_request) open_requests;
	TAILQ_HEAD(sync_requests_head, spdk_fs_request) sync_requests;
	TAILQ_ENTRY(spdk_file)	cache_tailq;	/* presumably linkage in g_caches - TODO confirm */
};
94 
/*
 * Records a blob that was found marked "is_deleted" while loading the
 * filesystem; fs_load_done() deletes these blobs one at a time.
 */
struct spdk_deleted_file {
	spdk_blob_id	id;				/* blob to delete */
	TAILQ_ENTRY(spdk_deleted_file)	tailq;		/* linkage in fs_load.deleted_files */
};
99 
/*
 * One blobfs instance layered on a blobstore.  The three *_target members
 * are used (by address) as io_device handles for spdk_io_device_register().
 */
struct spdk_filesystem {
	struct spdk_blob_store	*bs;		/* set in common_fs_bs_init() */
	TAILQ_HEAD(, spdk_file)	files;		/* all known files */
	struct spdk_bs_opts	bs_opts;	/* cluster_sz is cached from the blobstore */
	struct spdk_bs_dev	*bdev;		/* device passed to spdk_fs_init()/spdk_fs_load() */
	fs_send_request_fn	send_request;	/* caller-supplied function used to dispatch requests */

	/* Target for synchronous requests. */
	struct {
		uint32_t		max_ops;	/* number of preallocated requests per channel */
		struct spdk_io_channel	*sync_io_channel;
		struct spdk_fs_channel	*sync_fs_channel;
	} sync_target;

	/* Target for metadata requests. */
	struct {
		uint32_t		max_ops;	/* number of preallocated requests per channel */
		struct spdk_io_channel	*md_io_channel;
		struct spdk_fs_channel	*md_fs_channel;
	} md_target;

	/* Target for I/O channels. */
	struct {
		uint32_t		max_ops;	/* number of preallocated requests per channel */
	} io_target;
};
123 
/*
 * Per-request callback state.  It is the first member of struct
 * spdk_fs_request.  `fn` holds the completion callback for the operation in
 * flight and `op` holds operation-specific scratch data; only the union
 * members matching the current operation are meaningful.
 */
struct spdk_fs_cb_args {
	union {
		spdk_fs_op_with_handle_complete		fs_op_with_handle;
		spdk_fs_op_complete			fs_op;
		spdk_file_op_with_handle_complete	file_op_with_handle;
		spdk_file_op_complete			file_op;
		spdk_file_stat_op_complete		stat_op;
	} fn;
	void *arg;			/* argument passed back to the completion callback */
	sem_t *sem;			/* posted to wake a caller blocked in a synchronous API */
	struct spdk_filesystem *fs;
	struct spdk_file *file;
	int rc;				/* result handed back to a synchronous caller */
	bool from_request;		/* set to true by alloc_fs_request() */
	union {
		struct {
			/* Blobs found marked as deleted during load. */
			TAILQ_HEAD(, spdk_deleted_file)	deleted_files;
		} fs_load;
		struct {
			uint64_t	length;
		} truncate;
		struct {
			struct spdk_io_channel	*channel;
			void		*user_buf;
			void		*pin_buf;
			int		is_read;
			off_t		offset;
			size_t		length;
			uint64_t	start_lba;
			uint64_t	num_lba;
			uint32_t	blocklen;
		} rw;
		struct {
			const char	*old_name;
			const char	*new_name;
		} rename;
		struct {
			struct cache_buffer	*cache_buffer;
			uint64_t		length;
		} flush;
		struct {
			struct cache_buffer	*cache_buffer;
			uint64_t		length;
			uint64_t		offset;
		} readahead;
		struct {
			uint64_t			offset;
			TAILQ_ENTRY(spdk_fs_request)	tailq;
			bool				xattr_in_progress;
		} sync;
		struct {
			uint32_t			num_clusters;
		} resize;
		struct {
			const char	*name;		/* caller-owned; not copied */
			uint32_t	flags;
			TAILQ_ENTRY(spdk_fs_request)	tailq;	/* linkage in spdk_file.open_requests */
		} open;
		struct {
			const char		*name;	/* caller-owned; not copied */
			struct spdk_blob	*blob;	/* blob opened while creating the file */
		} create;
		struct {
			const char	*name;
		} delete;
		struct {
			const char	*name;
		} stat;
	} op;
};
194 
/* Forward declaration; called from unload_cb() for every file before it is freed. */
static void cache_free_buffers(struct spdk_file *file);
196 
197 void
198 spdk_fs_opts_init(struct spdk_blobfs_opts *opts)
199 {
200 	opts->cluster_sz = SPDK_BLOBFS_DEFAULT_OPTS_CLUSTER_SZ;
201 }
202 
203 static void
204 __initialize_cache(void)
205 {
206 	assert(g_cache_pool == NULL);
207 
208 	g_cache_pool = spdk_mempool_create("spdk_fs_cache",
209 					   g_fs_cache_size / CACHE_BUFFER_SIZE,
210 					   CACHE_BUFFER_SIZE,
211 					   SPDK_MEMPOOL_DEFAULT_CACHE_SIZE,
212 					   SPDK_ENV_SOCKET_ID_ANY);
213 	if (!g_cache_pool) {
214 		SPDK_ERRLOG("Create mempool failed, you may "
215 			    "increase the memory and try again\n");
216 		assert(false);
217 	}
218 	TAILQ_INIT(&g_caches);
219 	pthread_spin_init(&g_caches_lock, 0);
220 }
221 
222 static void
223 __free_cache(void)
224 {
225 	assert(g_cache_pool != NULL);
226 
227 	spdk_mempool_free(g_cache_pool);
228 	g_cache_pool = NULL;
229 }
230 
231 static uint64_t
232 __file_get_blob_size(struct spdk_file *file)
233 {
234 	uint64_t cluster_sz;
235 
236 	cluster_sz = file->fs->bs_opts.cluster_sz;
237 	return cluster_sz * spdk_blob_get_num_clusters(file->blob);
238 }
239 
/*
 * A preallocated request object.  Requests live in a channel's req_mem array
 * and are handed out by alloc_fs_request() / returned by free_fs_request().
 */
struct spdk_fs_request {
	struct spdk_fs_cb_args		args;		/* callback state; must stay first (some callers pass &args as the request) */
	TAILQ_ENTRY(spdk_fs_request)	link;		/* linkage in spdk_fs_channel.reqs */
	struct spdk_fs_channel		*channel;	/* channel this request was allocated from */
};
245 
/* Per-io_channel context for blobfs: a pool of requests plus dispatch state. */
struct spdk_fs_channel {
	struct spdk_fs_request		*req_mem;	/* backing array for all requests of this channel */
	TAILQ_HEAD(, spdk_fs_request)	reqs;		/* free list of requests */
	sem_t				sem;		/* synchronous callers block on this until completion */
	struct spdk_filesystem		*fs;		/* owning filesystem */
	struct spdk_io_channel		*bs_channel;	/* blobstore channel; released on channel destroy if set */
	fs_send_request_fn		send_request;	/* dispatch function for requests on this channel */
	bool				sync;		/* when true, `lock` guards the free list */
	pthread_spinlock_t		lock;		/* only taken when `sync` is true */
};
256 
257 static struct spdk_fs_request *
258 alloc_fs_request(struct spdk_fs_channel *channel)
259 {
260 	struct spdk_fs_request *req;
261 
262 	if (channel->sync) {
263 		pthread_spin_lock(&channel->lock);
264 	}
265 
266 	req = TAILQ_FIRST(&channel->reqs);
267 	if (req) {
268 		TAILQ_REMOVE(&channel->reqs, req, link);
269 	}
270 
271 	if (channel->sync) {
272 		pthread_spin_unlock(&channel->lock);
273 	}
274 
275 	if (req == NULL) {
276 		SPDK_ERRLOG("Cannot allocate req on spdk_fs_channel =%p\n", channel);
277 		return NULL;
278 	}
279 	memset(req, 0, sizeof(*req));
280 	req->channel = channel;
281 	req->args.from_request = true;
282 
283 	return req;
284 }
285 
286 static void
287 free_fs_request(struct spdk_fs_request *req)
288 {
289 	struct spdk_fs_channel *channel = req->channel;
290 
291 	if (channel->sync) {
292 		pthread_spin_lock(&channel->lock);
293 	}
294 
295 	TAILQ_INSERT_HEAD(&req->channel->reqs, req, link);
296 
297 	if (channel->sync) {
298 		pthread_spin_unlock(&channel->lock);
299 	}
300 }
301 
302 static int
303 _spdk_fs_channel_create(struct spdk_filesystem *fs, struct spdk_fs_channel *channel,
304 			uint32_t max_ops)
305 {
306 	uint32_t i;
307 
308 	channel->req_mem = calloc(max_ops, sizeof(struct spdk_fs_request));
309 	if (!channel->req_mem) {
310 		return -1;
311 	}
312 
313 	TAILQ_INIT(&channel->reqs);
314 	sem_init(&channel->sem, 0, 0);
315 
316 	for (i = 0; i < max_ops; i++) {
317 		TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link);
318 	}
319 
320 	channel->fs = fs;
321 
322 	return 0;
323 }
324 
325 static int
326 _spdk_fs_md_channel_create(void *io_device, void *ctx_buf)
327 {
328 	struct spdk_filesystem		*fs;
329 	struct spdk_fs_channel		*channel = ctx_buf;
330 
331 	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, md_target);
332 
333 	return _spdk_fs_channel_create(fs, channel, fs->md_target.max_ops);
334 }
335 
336 static int
337 _spdk_fs_sync_channel_create(void *io_device, void *ctx_buf)
338 {
339 	struct spdk_filesystem		*fs;
340 	struct spdk_fs_channel		*channel = ctx_buf;
341 
342 	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, sync_target);
343 
344 	return _spdk_fs_channel_create(fs, channel, fs->sync_target.max_ops);
345 }
346 
347 static int
348 _spdk_fs_io_channel_create(void *io_device, void *ctx_buf)
349 {
350 	struct spdk_filesystem		*fs;
351 	struct spdk_fs_channel		*channel = ctx_buf;
352 
353 	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, io_target);
354 
355 	return _spdk_fs_channel_create(fs, channel, fs->io_target.max_ops);
356 }
357 
358 static void
359 _spdk_fs_channel_destroy(void *io_device, void *ctx_buf)
360 {
361 	struct spdk_fs_channel *channel = ctx_buf;
362 
363 	free(channel->req_mem);
364 	if (channel->bs_channel != NULL) {
365 		spdk_bs_free_io_channel(channel->bs_channel);
366 	}
367 }
368 
369 static void
370 __send_request_direct(fs_request_fn fn, void *arg)
371 {
372 	fn(arg);
373 }
374 
375 static void
376 common_fs_bs_init(struct spdk_filesystem *fs, struct spdk_blob_store *bs)
377 {
378 	fs->bs = bs;
379 	fs->bs_opts.cluster_sz = spdk_bs_get_cluster_size(bs);
380 	fs->md_target.md_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
381 	fs->md_target.md_fs_channel->send_request = __send_request_direct;
382 	fs->sync_target.sync_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
383 	fs->sync_target.sync_fs_channel->send_request = __send_request_direct;
384 
385 	pthread_mutex_lock(&g_cache_init_lock);
386 	if (g_fs_count == 0) {
387 		__initialize_cache();
388 	}
389 	g_fs_count++;
390 	pthread_mutex_unlock(&g_cache_init_lock);
391 }
392 
393 static void
394 init_cb(void *ctx, struct spdk_blob_store *bs, int bserrno)
395 {
396 	struct spdk_fs_request *req = ctx;
397 	struct spdk_fs_cb_args *args = &req->args;
398 	struct spdk_filesystem *fs = args->fs;
399 
400 	if (bserrno == 0) {
401 		common_fs_bs_init(fs, bs);
402 	} else {
403 		free(fs);
404 		fs = NULL;
405 	}
406 
407 	args->fn.fs_op_with_handle(args->arg, fs, bserrno);
408 	free_fs_request(req);
409 }
410 
411 static void
412 fs_conf_parse(void)
413 {
414 	struct spdk_conf_section *sp;
415 
416 	sp = spdk_conf_find_section(NULL, "Blobfs");
417 	if (sp == NULL) {
418 		g_fs_cache_buffer_shift = CACHE_BUFFER_SHIFT_DEFAULT;
419 		return;
420 	}
421 
422 	g_fs_cache_buffer_shift = spdk_conf_section_get_intval(sp, "CacheBufferShift");
423 	if (g_fs_cache_buffer_shift <= 0) {
424 		g_fs_cache_buffer_shift = CACHE_BUFFER_SHIFT_DEFAULT;
425 	}
426 }
427 
428 static struct spdk_filesystem *
429 fs_alloc(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn)
430 {
431 	struct spdk_filesystem *fs;
432 
433 	fs = calloc(1, sizeof(*fs));
434 	if (fs == NULL) {
435 		return NULL;
436 	}
437 
438 	fs->bdev = dev;
439 	fs->send_request = send_request_fn;
440 	TAILQ_INIT(&fs->files);
441 
442 	fs->md_target.max_ops = 512;
443 	spdk_io_device_register(&fs->md_target, _spdk_fs_md_channel_create, _spdk_fs_channel_destroy,
444 				sizeof(struct spdk_fs_channel), "blobfs_md");
445 	fs->md_target.md_io_channel = spdk_get_io_channel(&fs->md_target);
446 	fs->md_target.md_fs_channel = spdk_io_channel_get_ctx(fs->md_target.md_io_channel);
447 
448 	fs->sync_target.max_ops = 512;
449 	spdk_io_device_register(&fs->sync_target, _spdk_fs_sync_channel_create, _spdk_fs_channel_destroy,
450 				sizeof(struct spdk_fs_channel), "blobfs_sync");
451 	fs->sync_target.sync_io_channel = spdk_get_io_channel(&fs->sync_target);
452 	fs->sync_target.sync_fs_channel = spdk_io_channel_get_ctx(fs->sync_target.sync_io_channel);
453 
454 	fs->io_target.max_ops = 512;
455 	spdk_io_device_register(&fs->io_target, _spdk_fs_io_channel_create, _spdk_fs_channel_destroy,
456 				sizeof(struct spdk_fs_channel), "blobfs_io");
457 
458 	return fs;
459 }
460 
461 static void
462 __wake_caller(void *arg, int fserrno)
463 {
464 	struct spdk_fs_cb_args *args = arg;
465 
466 	args->rc = fserrno;
467 	sem_post(args->sem);
468 }
469 
470 void
471 spdk_fs_init(struct spdk_bs_dev *dev, struct spdk_blobfs_opts *opt,
472 	     fs_send_request_fn send_request_fn,
473 	     spdk_fs_op_with_handle_complete cb_fn, void *cb_arg)
474 {
475 	struct spdk_filesystem *fs;
476 	struct spdk_fs_request *req;
477 	struct spdk_fs_cb_args *args;
478 	struct spdk_bs_opts opts = {};
479 
480 	fs = fs_alloc(dev, send_request_fn);
481 	if (fs == NULL) {
482 		cb_fn(cb_arg, NULL, -ENOMEM);
483 		return;
484 	}
485 
486 	fs_conf_parse();
487 
488 	req = alloc_fs_request(fs->md_target.md_fs_channel);
489 	if (req == NULL) {
490 		spdk_put_io_channel(fs->md_target.md_io_channel);
491 		spdk_io_device_unregister(&fs->md_target, NULL);
492 		spdk_put_io_channel(fs->sync_target.sync_io_channel);
493 		spdk_io_device_unregister(&fs->sync_target, NULL);
494 		spdk_io_device_unregister(&fs->io_target, NULL);
495 		free(fs);
496 		cb_fn(cb_arg, NULL, -ENOMEM);
497 		return;
498 	}
499 
500 	args = &req->args;
501 	args->fn.fs_op_with_handle = cb_fn;
502 	args->arg = cb_arg;
503 	args->fs = fs;
504 
505 	spdk_bs_opts_init(&opts);
506 	snprintf(opts.bstype.bstype, sizeof(opts.bstype.bstype), "BLOBFS");
507 	if (opt) {
508 		opts.cluster_sz = opt->cluster_sz;
509 	}
510 	spdk_bs_init(dev, &opts, init_cb, req);
511 }
512 
513 static struct spdk_file *
514 file_alloc(struct spdk_filesystem *fs)
515 {
516 	struct spdk_file *file;
517 
518 	file = calloc(1, sizeof(*file));
519 	if (file == NULL) {
520 		return NULL;
521 	}
522 
523 	file->tree = calloc(1, sizeof(*file->tree));
524 	if (file->tree == NULL) {
525 		free(file);
526 		return NULL;
527 	}
528 
529 	file->fs = fs;
530 	TAILQ_INIT(&file->open_requests);
531 	TAILQ_INIT(&file->sync_requests);
532 	pthread_spin_init(&file->lock, 0);
533 	TAILQ_INSERT_TAIL(&fs->files, file, tailq);
534 	file->priority = SPDK_FILE_PRIORITY_LOW;
535 	return file;
536 }
537 
/* Forward declaration: _handle_deleted_files() passes this as the blob-delete completion callback. */
static void fs_load_done(void *ctx, int bserrno);
539 
540 static int
541 _handle_deleted_files(struct spdk_fs_request *req)
542 {
543 	struct spdk_fs_cb_args *args = &req->args;
544 	struct spdk_filesystem *fs = args->fs;
545 
546 	if (!TAILQ_EMPTY(&args->op.fs_load.deleted_files)) {
547 		struct spdk_deleted_file *deleted_file;
548 
549 		deleted_file = TAILQ_FIRST(&args->op.fs_load.deleted_files);
550 		TAILQ_REMOVE(&args->op.fs_load.deleted_files, deleted_file, tailq);
551 		spdk_bs_delete_blob(fs->bs, deleted_file->id, fs_load_done, req);
552 		free(deleted_file);
553 		return 0;
554 	}
555 
556 	return 1;
557 }
558 
559 static void
560 fs_load_done(void *ctx, int bserrno)
561 {
562 	struct spdk_fs_request *req = ctx;
563 	struct spdk_fs_cb_args *args = &req->args;
564 	struct spdk_filesystem *fs = args->fs;
565 
566 	/* The filesystem has been loaded.  Now check if there are any files that
567 	 *  were marked for deletion before last unload.  Do not complete the
568 	 *  fs_load callback until all of them have been deleted on disk.
569 	 */
570 	if (_handle_deleted_files(req) == 0) {
571 		/* We found a file that's been marked for deleting but not actually
572 		 *  deleted yet.  This function will get called again once the delete
573 		 *  operation is completed.
574 		 */
575 		return;
576 	}
577 
578 	args->fn.fs_op_with_handle(args->arg, fs, 0);
579 	free_fs_request(req);
580 
581 }
582 
583 static void
584 iter_cb(void *ctx, struct spdk_blob *blob, int rc)
585 {
586 	struct spdk_fs_request *req = ctx;
587 	struct spdk_fs_cb_args *args = &req->args;
588 	struct spdk_filesystem *fs = args->fs;
589 	uint64_t *length;
590 	const char *name;
591 	uint32_t *is_deleted;
592 	size_t value_len;
593 
594 	if (rc < 0) {
595 		args->fn.fs_op_with_handle(args->arg, fs, rc);
596 		free_fs_request(req);
597 		return;
598 	}
599 
600 	rc = spdk_blob_get_xattr_value(blob, "name", (const void **)&name, &value_len);
601 	if (rc < 0) {
602 		args->fn.fs_op_with_handle(args->arg, fs, rc);
603 		free_fs_request(req);
604 		return;
605 	}
606 
607 	rc = spdk_blob_get_xattr_value(blob, "length", (const void **)&length, &value_len);
608 	if (rc < 0) {
609 		args->fn.fs_op_with_handle(args->arg, fs, rc);
610 		free_fs_request(req);
611 		return;
612 	}
613 
614 	assert(value_len == 8);
615 
616 	/* This file could be deleted last time without close it, then app crashed, so we delete it now */
617 	rc = spdk_blob_get_xattr_value(blob, "is_deleted", (const void **)&is_deleted, &value_len);
618 	if (rc < 0) {
619 		struct spdk_file *f;
620 
621 		f = file_alloc(fs);
622 		if (f == NULL) {
623 			args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM);
624 			free_fs_request(req);
625 			return;
626 		}
627 
628 		f->name = strdup(name);
629 		f->blobid = spdk_blob_get_id(blob);
630 		f->length = *length;
631 		f->length_flushed = *length;
632 		f->append_pos = *length;
633 		SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "added file %s length=%ju\n", f->name, f->length);
634 	} else {
635 		struct spdk_deleted_file *deleted_file;
636 
637 		deleted_file = calloc(1, sizeof(*deleted_file));
638 		if (deleted_file == NULL) {
639 			args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM);
640 			free_fs_request(req);
641 			return;
642 		}
643 		deleted_file->id = spdk_blob_get_id(blob);
644 		TAILQ_INSERT_TAIL(&args->op.fs_load.deleted_files, deleted_file, tailq);
645 	}
646 }
647 
648 static void
649 load_cb(void *ctx, struct spdk_blob_store *bs, int bserrno)
650 {
651 	struct spdk_fs_request *req = ctx;
652 	struct spdk_fs_cb_args *args = &req->args;
653 	struct spdk_filesystem *fs = args->fs;
654 	struct spdk_bs_type bstype;
655 	static const struct spdk_bs_type blobfs_type = {"BLOBFS"};
656 	static const struct spdk_bs_type zeros;
657 
658 	if (bserrno != 0) {
659 		args->fn.fs_op_with_handle(args->arg, NULL, bserrno);
660 		free_fs_request(req);
661 		free(fs);
662 		return;
663 	}
664 
665 	bstype = spdk_bs_get_bstype(bs);
666 
667 	if (!memcmp(&bstype, &zeros, sizeof(bstype))) {
668 		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "assigning bstype\n");
669 		spdk_bs_set_bstype(bs, blobfs_type);
670 	} else if (memcmp(&bstype, &blobfs_type, sizeof(bstype))) {
671 		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "not blobfs\n");
672 		SPDK_LOGDUMP(SPDK_LOG_BLOB, "bstype", &bstype, sizeof(bstype));
673 		args->fn.fs_op_with_handle(args->arg, NULL, bserrno);
674 		free_fs_request(req);
675 		free(fs);
676 		return;
677 	}
678 
679 	common_fs_bs_init(fs, bs);
680 	fs_load_done(req, 0);
681 }
682 
683 static void
684 spdk_fs_io_device_unregister(struct spdk_filesystem *fs)
685 {
686 	assert(fs != NULL);
687 	spdk_io_device_unregister(&fs->md_target, NULL);
688 	spdk_io_device_unregister(&fs->sync_target, NULL);
689 	spdk_io_device_unregister(&fs->io_target, NULL);
690 	free(fs);
691 }
692 
693 static void
694 spdk_fs_free_io_channels(struct spdk_filesystem *fs)
695 {
696 	assert(fs != NULL);
697 	spdk_fs_free_io_channel(fs->md_target.md_io_channel);
698 	spdk_fs_free_io_channel(fs->sync_target.sync_io_channel);
699 }
700 
701 void
702 spdk_fs_load(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn,
703 	     spdk_fs_op_with_handle_complete cb_fn, void *cb_arg)
704 {
705 	struct spdk_filesystem *fs;
706 	struct spdk_fs_cb_args *args;
707 	struct spdk_fs_request *req;
708 	struct spdk_bs_opts	bs_opts;
709 
710 	fs = fs_alloc(dev, send_request_fn);
711 	if (fs == NULL) {
712 		cb_fn(cb_arg, NULL, -ENOMEM);
713 		return;
714 	}
715 
716 	fs_conf_parse();
717 
718 	req = alloc_fs_request(fs->md_target.md_fs_channel);
719 	if (req == NULL) {
720 		spdk_fs_free_io_channels(fs);
721 		spdk_fs_io_device_unregister(fs);
722 		cb_fn(cb_arg, NULL, -ENOMEM);
723 		return;
724 	}
725 
726 	args = &req->args;
727 	args->fn.fs_op_with_handle = cb_fn;
728 	args->arg = cb_arg;
729 	args->fs = fs;
730 	TAILQ_INIT(&args->op.fs_load.deleted_files);
731 	spdk_bs_opts_init(&bs_opts);
732 	bs_opts.iter_cb_fn = iter_cb;
733 	bs_opts.iter_cb_arg = req;
734 	spdk_bs_load(dev, &bs_opts, load_cb, req);
735 }
736 
737 static void
738 unload_cb(void *ctx, int bserrno)
739 {
740 	struct spdk_fs_request *req = ctx;
741 	struct spdk_fs_cb_args *args = &req->args;
742 	struct spdk_filesystem *fs = args->fs;
743 	struct spdk_file *file, *tmp;
744 
745 	TAILQ_FOREACH_SAFE(file, &fs->files, tailq, tmp) {
746 		TAILQ_REMOVE(&fs->files, file, tailq);
747 		cache_free_buffers(file);
748 		free(file->name);
749 		free(file->tree);
750 		free(file);
751 	}
752 
753 	pthread_mutex_lock(&g_cache_init_lock);
754 	g_fs_count--;
755 	if (g_fs_count == 0) {
756 		__free_cache();
757 	}
758 	pthread_mutex_unlock(&g_cache_init_lock);
759 
760 	args->fn.fs_op(args->arg, bserrno);
761 	free(req);
762 
763 	spdk_fs_io_device_unregister(fs);
764 }
765 
766 void
767 spdk_fs_unload(struct spdk_filesystem *fs, spdk_fs_op_complete cb_fn, void *cb_arg)
768 {
769 	struct spdk_fs_request *req;
770 	struct spdk_fs_cb_args *args;
771 
772 	/*
773 	 * We must free the md_channel before unloading the blobstore, so just
774 	 *  allocate this request from the general heap.
775 	 */
776 	req = calloc(1, sizeof(*req));
777 	if (req == NULL) {
778 		cb_fn(cb_arg, -ENOMEM);
779 		return;
780 	}
781 
782 	args = &req->args;
783 	args->fn.fs_op = cb_fn;
784 	args->arg = cb_arg;
785 	args->fs = fs;
786 
787 	spdk_fs_free_io_channels(fs);
788 	spdk_bs_unload(fs->bs, unload_cb, req);
789 }
790 
791 static struct spdk_file *
792 fs_find_file(struct spdk_filesystem *fs, const char *name)
793 {
794 	struct spdk_file *file;
795 
796 	TAILQ_FOREACH(file, &fs->files, tailq) {
797 		if (!strncmp(name, file->name, SPDK_FILE_NAME_MAX)) {
798 			return file;
799 		}
800 	}
801 
802 	return NULL;
803 }
804 
805 void
806 spdk_fs_file_stat_async(struct spdk_filesystem *fs, const char *name,
807 			spdk_file_stat_op_complete cb_fn, void *cb_arg)
808 {
809 	struct spdk_file_stat stat;
810 	struct spdk_file *f = NULL;
811 
812 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
813 		cb_fn(cb_arg, NULL, -ENAMETOOLONG);
814 		return;
815 	}
816 
817 	f = fs_find_file(fs, name);
818 	if (f != NULL) {
819 		stat.blobid = f->blobid;
820 		stat.size = f->append_pos >= f->length ? f->append_pos : f->length;
821 		cb_fn(cb_arg, &stat, 0);
822 		return;
823 	}
824 
825 	cb_fn(cb_arg, NULL, -ENOENT);
826 }
827 
828 static void
829 __copy_stat(void *arg, struct spdk_file_stat *stat, int fserrno)
830 {
831 	struct spdk_fs_request *req = arg;
832 	struct spdk_fs_cb_args *args = &req->args;
833 
834 	args->rc = fserrno;
835 	if (fserrno == 0) {
836 		memcpy(args->arg, stat, sizeof(*stat));
837 	}
838 	sem_post(args->sem);
839 }
840 
841 static void
842 __file_stat(void *arg)
843 {
844 	struct spdk_fs_request *req = arg;
845 	struct spdk_fs_cb_args *args = &req->args;
846 
847 	spdk_fs_file_stat_async(args->fs, args->op.stat.name,
848 				args->fn.stat_op, req);
849 }
850 
851 int
852 spdk_fs_file_stat(struct spdk_filesystem *fs, struct spdk_io_channel *_channel,
853 		  const char *name, struct spdk_file_stat *stat)
854 {
855 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
856 	struct spdk_fs_request *req;
857 	int rc;
858 
859 	req = alloc_fs_request(channel);
860 	if (req == NULL) {
861 		return -ENOMEM;
862 	}
863 
864 	req->args.fs = fs;
865 	req->args.op.stat.name = name;
866 	req->args.fn.stat_op = __copy_stat;
867 	req->args.arg = stat;
868 	req->args.sem = &channel->sem;
869 	channel->send_request(__file_stat, req);
870 	sem_wait(&channel->sem);
871 
872 	rc = req->args.rc;
873 	free_fs_request(req);
874 
875 	return rc;
876 }
877 
878 static void
879 fs_create_blob_close_cb(void *ctx, int bserrno)
880 {
881 	int rc;
882 	struct spdk_fs_request *req = ctx;
883 	struct spdk_fs_cb_args *args = &req->args;
884 
885 	rc = args->rc ? args->rc : bserrno;
886 	args->fn.file_op(args->arg, rc);
887 	free_fs_request(req);
888 }
889 
890 static void
891 fs_create_blob_resize_cb(void *ctx, int bserrno)
892 {
893 	struct spdk_fs_request *req = ctx;
894 	struct spdk_fs_cb_args *args = &req->args;
895 	struct spdk_file *f = args->file;
896 	struct spdk_blob *blob = args->op.create.blob;
897 	uint64_t length = 0;
898 
899 	args->rc = bserrno;
900 	if (bserrno) {
901 		spdk_blob_close(blob, fs_create_blob_close_cb, args);
902 		return;
903 	}
904 
905 	spdk_blob_set_xattr(blob, "name", f->name, strlen(f->name) + 1);
906 	spdk_blob_set_xattr(blob, "length", &length, sizeof(length));
907 
908 	spdk_blob_close(blob, fs_create_blob_close_cb, args);
909 }
910 
911 static void
912 fs_create_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno)
913 {
914 	struct spdk_fs_request *req = ctx;
915 	struct spdk_fs_cb_args *args = &req->args;
916 
917 	if (bserrno) {
918 		args->fn.file_op(args->arg, bserrno);
919 		free_fs_request(req);
920 		return;
921 	}
922 
923 	args->op.create.blob = blob;
924 	spdk_blob_resize(blob, 1, fs_create_blob_resize_cb, req);
925 }
926 
927 static void
928 fs_create_blob_create_cb(void *ctx, spdk_blob_id blobid, int bserrno)
929 {
930 	struct spdk_fs_request *req = ctx;
931 	struct spdk_fs_cb_args *args = &req->args;
932 	struct spdk_file *f = args->file;
933 
934 	if (bserrno) {
935 		args->fn.file_op(args->arg, bserrno);
936 		free_fs_request(req);
937 		return;
938 	}
939 
940 	f->blobid = blobid;
941 	spdk_bs_open_blob(f->fs->bs, blobid, fs_create_blob_open_cb, req);
942 }
943 
944 void
945 spdk_fs_create_file_async(struct spdk_filesystem *fs, const char *name,
946 			  spdk_file_op_complete cb_fn, void *cb_arg)
947 {
948 	struct spdk_file *file;
949 	struct spdk_fs_request *req;
950 	struct spdk_fs_cb_args *args;
951 
952 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
953 		cb_fn(cb_arg, -ENAMETOOLONG);
954 		return;
955 	}
956 
957 	file = fs_find_file(fs, name);
958 	if (file != NULL) {
959 		cb_fn(cb_arg, -EEXIST);
960 		return;
961 	}
962 
963 	file = file_alloc(fs);
964 	if (file == NULL) {
965 		cb_fn(cb_arg, -ENOMEM);
966 		return;
967 	}
968 
969 	req = alloc_fs_request(fs->md_target.md_fs_channel);
970 	if (req == NULL) {
971 		cb_fn(cb_arg, -ENOMEM);
972 		return;
973 	}
974 
975 	args = &req->args;
976 	args->file = file;
977 	args->fn.file_op = cb_fn;
978 	args->arg = cb_arg;
979 
980 	file->name = strdup(name);
981 	spdk_bs_create_blob(fs->bs, fs_create_blob_create_cb, args);
982 }
983 
984 static void
985 __fs_create_file_done(void *arg, int fserrno)
986 {
987 	struct spdk_fs_request *req = arg;
988 	struct spdk_fs_cb_args *args = &req->args;
989 
990 	args->rc = fserrno;
991 	sem_post(args->sem);
992 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.create.name);
993 }
994 
995 static void
996 __fs_create_file(void *arg)
997 {
998 	struct spdk_fs_request *req = arg;
999 	struct spdk_fs_cb_args *args = &req->args;
1000 
1001 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.create.name);
1002 	spdk_fs_create_file_async(args->fs, args->op.create.name, __fs_create_file_done, req);
1003 }
1004 
1005 int
1006 spdk_fs_create_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, const char *name)
1007 {
1008 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
1009 	struct spdk_fs_request *req;
1010 	struct spdk_fs_cb_args *args;
1011 	int rc;
1012 
1013 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name);
1014 
1015 	req = alloc_fs_request(channel);
1016 	if (req == NULL) {
1017 		return -ENOMEM;
1018 	}
1019 
1020 	args = &req->args;
1021 	args->fs = fs;
1022 	args->op.create.name = name;
1023 	args->sem = &channel->sem;
1024 	fs->send_request(__fs_create_file, req);
1025 	sem_wait(&channel->sem);
1026 	rc = args->rc;
1027 	free_fs_request(req);
1028 
1029 	return rc;
1030 }
1031 
1032 static void
1033 fs_open_blob_done(void *ctx, struct spdk_blob *blob, int bserrno)
1034 {
1035 	struct spdk_fs_request *req = ctx;
1036 	struct spdk_fs_cb_args *args = &req->args;
1037 	struct spdk_file *f = args->file;
1038 
1039 	f->blob = blob;
1040 	while (!TAILQ_EMPTY(&f->open_requests)) {
1041 		req = TAILQ_FIRST(&f->open_requests);
1042 		args = &req->args;
1043 		TAILQ_REMOVE(&f->open_requests, req, args.op.open.tailq);
1044 		args->fn.file_op_with_handle(args->arg, f, bserrno);
1045 		free_fs_request(req);
1046 	}
1047 }
1048 
1049 static void
1050 fs_open_blob_create_cb(void *ctx, int bserrno)
1051 {
1052 	struct spdk_fs_request *req = ctx;
1053 	struct spdk_fs_cb_args *args = &req->args;
1054 	struct spdk_file *file = args->file;
1055 	struct spdk_filesystem *fs = args->fs;
1056 
1057 	if (file == NULL) {
1058 		/*
1059 		 * This is from an open with CREATE flag - the file
1060 		 *  is now created so look it up in the file list for this
1061 		 *  filesystem.
1062 		 */
1063 		file = fs_find_file(fs, args->op.open.name);
1064 		assert(file != NULL);
1065 		args->file = file;
1066 	}
1067 
1068 	file->ref_count++;
1069 	TAILQ_INSERT_TAIL(&file->open_requests, req, args.op.open.tailq);
1070 	if (file->ref_count == 1) {
1071 		assert(file->blob == NULL);
1072 		spdk_bs_open_blob(fs->bs, file->blobid, fs_open_blob_done, req);
1073 	} else if (file->blob != NULL) {
1074 		fs_open_blob_done(req, file->blob, 0);
1075 	} else {
1076 		/*
1077 		 * The blob open for this file is in progress due to a previous
1078 		 *  open request.  When that open completes, it will invoke the
1079 		 *  open callback for this request.
1080 		 */
1081 	}
1082 }
1083 
1084 void
1085 spdk_fs_open_file_async(struct spdk_filesystem *fs, const char *name, uint32_t flags,
1086 			spdk_file_op_with_handle_complete cb_fn, void *cb_arg)
1087 {
1088 	struct spdk_file *f = NULL;
1089 	struct spdk_fs_request *req;
1090 	struct spdk_fs_cb_args *args;
1091 
1092 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
1093 		cb_fn(cb_arg, NULL, -ENAMETOOLONG);
1094 		return;
1095 	}
1096 
1097 	f = fs_find_file(fs, name);
1098 	if (f == NULL && !(flags & SPDK_BLOBFS_OPEN_CREATE)) {
1099 		cb_fn(cb_arg, NULL, -ENOENT);
1100 		return;
1101 	}
1102 
1103 	if (f != NULL && f->is_deleted == true) {
1104 		cb_fn(cb_arg, NULL, -ENOENT);
1105 		return;
1106 	}
1107 
1108 	req = alloc_fs_request(fs->md_target.md_fs_channel);
1109 	if (req == NULL) {
1110 		cb_fn(cb_arg, NULL, -ENOMEM);
1111 		return;
1112 	}
1113 
1114 	args = &req->args;
1115 	args->fn.file_op_with_handle = cb_fn;
1116 	args->arg = cb_arg;
1117 	args->file = f;
1118 	args->fs = fs;
1119 	args->op.open.name = name;
1120 
1121 	if (f == NULL) {
1122 		spdk_fs_create_file_async(fs, name, fs_open_blob_create_cb, req);
1123 	} else {
1124 		fs_open_blob_create_cb(req, 0);
1125 	}
1126 }
1127 
1128 static void
1129 __fs_open_file_done(void *arg, struct spdk_file *file, int bserrno)
1130 {
1131 	struct spdk_fs_request *req = arg;
1132 	struct spdk_fs_cb_args *args = &req->args;
1133 
1134 	args->file = file;
1135 	__wake_caller(args, bserrno);
1136 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.open.name);
1137 }
1138 
1139 static void
1140 __fs_open_file(void *arg)
1141 {
1142 	struct spdk_fs_request *req = arg;
1143 	struct spdk_fs_cb_args *args = &req->args;
1144 
1145 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.open.name);
1146 	spdk_fs_open_file_async(args->fs, args->op.open.name, args->op.open.flags,
1147 				__fs_open_file_done, req);
1148 }
1149 
1150 int
1151 spdk_fs_open_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel,
1152 		  const char *name, uint32_t flags, struct spdk_file **file)
1153 {
1154 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
1155 	struct spdk_fs_request *req;
1156 	struct spdk_fs_cb_args *args;
1157 	int rc;
1158 
1159 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name);
1160 
1161 	req = alloc_fs_request(channel);
1162 	if (req == NULL) {
1163 		return -ENOMEM;
1164 	}
1165 
1166 	args = &req->args;
1167 	args->fs = fs;
1168 	args->op.open.name = name;
1169 	args->op.open.flags = flags;
1170 	args->sem = &channel->sem;
1171 	fs->send_request(__fs_open_file, req);
1172 	sem_wait(&channel->sem);
1173 	rc = args->rc;
1174 	if (rc == 0) {
1175 		*file = args->file;
1176 	} else {
1177 		*file = NULL;
1178 	}
1179 	free_fs_request(req);
1180 
1181 	return rc;
1182 }
1183 
1184 static void
1185 fs_rename_blob_close_cb(void *ctx, int bserrno)
1186 {
1187 	struct spdk_fs_request *req = ctx;
1188 	struct spdk_fs_cb_args *args = &req->args;
1189 
1190 	args->fn.fs_op(args->arg, bserrno);
1191 	free_fs_request(req);
1192 }
1193 
1194 static void
1195 fs_rename_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno)
1196 {
1197 	struct spdk_fs_request *req = ctx;
1198 	struct spdk_fs_cb_args *args = &req->args;
1199 	const char *new_name = args->op.rename.new_name;
1200 
1201 	spdk_blob_set_xattr(blob, "name", new_name, strlen(new_name) + 1);
1202 	spdk_blob_close(blob, fs_rename_blob_close_cb, req);
1203 }
1204 
1205 static void
1206 __spdk_fs_md_rename_file(struct spdk_fs_request *req)
1207 {
1208 	struct spdk_fs_cb_args *args = &req->args;
1209 	struct spdk_file *f;
1210 
1211 	f = fs_find_file(args->fs, args->op.rename.old_name);
1212 	if (f == NULL) {
1213 		args->fn.fs_op(args->arg, -ENOENT);
1214 		free_fs_request(req);
1215 		return;
1216 	}
1217 
1218 	free(f->name);
1219 	f->name = strdup(args->op.rename.new_name);
1220 	args->file = f;
1221 	spdk_bs_open_blob(args->fs->bs, f->blobid, fs_rename_blob_open_cb, req);
1222 }
1223 
/*
 * Completion of the delete that precedes an overwriting rename: continue
 *  with the rename itself.
 *
 * NOTE(review): fserrno is not inspected, so the rename proceeds even when
 *  the delete reported an error - confirm this is intended.
 */
static void
fs_rename_delete_done(void *arg, int fserrno)
{
	struct spdk_fs_request *rename_req = arg;

	__spdk_fs_md_rename_file(rename_req);
}
1229 
1230 void
1231 spdk_fs_rename_file_async(struct spdk_filesystem *fs,
1232 			  const char *old_name, const char *new_name,
1233 			  spdk_file_op_complete cb_fn, void *cb_arg)
1234 {
1235 	struct spdk_file *f;
1236 	struct spdk_fs_request *req;
1237 	struct spdk_fs_cb_args *args;
1238 
1239 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "old=%s new=%s\n", old_name, new_name);
1240 	if (strnlen(new_name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
1241 		cb_fn(cb_arg, -ENAMETOOLONG);
1242 		return;
1243 	}
1244 
1245 	req = alloc_fs_request(fs->md_target.md_fs_channel);
1246 	if (req == NULL) {
1247 		cb_fn(cb_arg, -ENOMEM);
1248 		return;
1249 	}
1250 
1251 	args = &req->args;
1252 	args->fn.fs_op = cb_fn;
1253 	args->fs = fs;
1254 	args->arg = cb_arg;
1255 	args->op.rename.old_name = old_name;
1256 	args->op.rename.new_name = new_name;
1257 
1258 	f = fs_find_file(fs, new_name);
1259 	if (f == NULL) {
1260 		__spdk_fs_md_rename_file(req);
1261 		return;
1262 	}
1263 
1264 	/*
1265 	 * The rename overwrites an existing file.  So delete the existing file, then
1266 	 *  do the actual rename.
1267 	 */
1268 	spdk_fs_delete_file_async(fs, new_name, fs_rename_delete_done, req);
1269 }
1270 
1271 static void
1272 __fs_rename_file_done(void *arg, int fserrno)
1273 {
1274 	struct spdk_fs_request *req = arg;
1275 	struct spdk_fs_cb_args *args = &req->args;
1276 
1277 	__wake_caller(args, fserrno);
1278 }
1279 
1280 static void
1281 __fs_rename_file(void *arg)
1282 {
1283 	struct spdk_fs_request *req = arg;
1284 	struct spdk_fs_cb_args *args = &req->args;
1285 
1286 	spdk_fs_rename_file_async(args->fs, args->op.rename.old_name, args->op.rename.new_name,
1287 				  __fs_rename_file_done, req);
1288 }
1289 
1290 int
1291 spdk_fs_rename_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel,
1292 		    const char *old_name, const char *new_name)
1293 {
1294 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
1295 	struct spdk_fs_request *req;
1296 	struct spdk_fs_cb_args *args;
1297 	int rc;
1298 
1299 	req = alloc_fs_request(channel);
1300 	if (req == NULL) {
1301 		return -ENOMEM;
1302 	}
1303 
1304 	args = &req->args;
1305 
1306 	args->fs = fs;
1307 	args->op.rename.old_name = old_name;
1308 	args->op.rename.new_name = new_name;
1309 	args->sem = &channel->sem;
1310 	fs->send_request(__fs_rename_file, req);
1311 	sem_wait(&channel->sem);
1312 	rc = args->rc;
1313 	free_fs_request(req);
1314 	return rc;
1315 }
1316 
1317 static void
1318 blob_delete_cb(void *ctx, int bserrno)
1319 {
1320 	struct spdk_fs_request *req = ctx;
1321 	struct spdk_fs_cb_args *args = &req->args;
1322 
1323 	args->fn.file_op(args->arg, bserrno);
1324 	free_fs_request(req);
1325 }
1326 
1327 void
1328 spdk_fs_delete_file_async(struct spdk_filesystem *fs, const char *name,
1329 			  spdk_file_op_complete cb_fn, void *cb_arg)
1330 {
1331 	struct spdk_file *f;
1332 	spdk_blob_id blobid;
1333 	struct spdk_fs_request *req;
1334 	struct spdk_fs_cb_args *args;
1335 
1336 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name);
1337 
1338 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
1339 		cb_fn(cb_arg, -ENAMETOOLONG);
1340 		return;
1341 	}
1342 
1343 	f = fs_find_file(fs, name);
1344 	if (f == NULL) {
1345 		cb_fn(cb_arg, -ENOENT);
1346 		return;
1347 	}
1348 
1349 	req = alloc_fs_request(fs->md_target.md_fs_channel);
1350 	if (req == NULL) {
1351 		cb_fn(cb_arg, -ENOMEM);
1352 		return;
1353 	}
1354 
1355 	args = &req->args;
1356 	args->fn.file_op = cb_fn;
1357 	args->arg = cb_arg;
1358 
1359 	if (f->ref_count > 0) {
1360 		/* If the ref > 0, we mark the file as deleted and delete it when we close it. */
1361 		f->is_deleted = true;
1362 		spdk_blob_set_xattr(f->blob, "is_deleted", &f->is_deleted, sizeof(bool));
1363 		spdk_blob_sync_md(f->blob, blob_delete_cb, args);
1364 		return;
1365 	}
1366 
1367 	TAILQ_REMOVE(&fs->files, f, tailq);
1368 
1369 	cache_free_buffers(f);
1370 
1371 	blobid = f->blobid;
1372 
1373 	free(f->name);
1374 	free(f->tree);
1375 	free(f);
1376 
1377 	spdk_bs_delete_blob(fs->bs, blobid, blob_delete_cb, req);
1378 }
1379 
1380 static void
1381 __fs_delete_file_done(void *arg, int fserrno)
1382 {
1383 	struct spdk_fs_request *req = arg;
1384 	struct spdk_fs_cb_args *args = &req->args;
1385 
1386 	__wake_caller(args, fserrno);
1387 }
1388 
1389 static void
1390 __fs_delete_file(void *arg)
1391 {
1392 	struct spdk_fs_request *req = arg;
1393 	struct spdk_fs_cb_args *args = &req->args;
1394 
1395 	spdk_fs_delete_file_async(args->fs, args->op.delete.name, __fs_delete_file_done, req);
1396 }
1397 
1398 int
1399 spdk_fs_delete_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel,
1400 		    const char *name)
1401 {
1402 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
1403 	struct spdk_fs_request *req;
1404 	struct spdk_fs_cb_args *args;
1405 	int rc;
1406 
1407 	req = alloc_fs_request(channel);
1408 	if (req == NULL) {
1409 		return -ENOMEM;
1410 	}
1411 
1412 	args = &req->args;
1413 	args->fs = fs;
1414 	args->op.delete.name = name;
1415 	args->sem = &channel->sem;
1416 	fs->send_request(__fs_delete_file, req);
1417 	sem_wait(&channel->sem);
1418 	rc = args->rc;
1419 	free_fs_request(req);
1420 
1421 	return rc;
1422 }
1423 
1424 spdk_fs_iter
1425 spdk_fs_iter_first(struct spdk_filesystem *fs)
1426 {
1427 	struct spdk_file *f;
1428 
1429 	f = TAILQ_FIRST(&fs->files);
1430 	return f;
1431 }
1432 
1433 spdk_fs_iter
1434 spdk_fs_iter_next(spdk_fs_iter iter)
1435 {
1436 	struct spdk_file *f = iter;
1437 
1438 	if (f == NULL) {
1439 		return NULL;
1440 	}
1441 
1442 	f = TAILQ_NEXT(f, tailq);
1443 	return f;
1444 }
1445 
1446 const char *
1447 spdk_file_get_name(struct spdk_file *file)
1448 {
1449 	return file->name;
1450 }
1451 
1452 uint64_t
1453 spdk_file_get_length(struct spdk_file *file)
1454 {
1455 	uint64_t length;
1456 
1457 	assert(file != NULL);
1458 
1459 	length = file->append_pos >= file->length ? file->append_pos : file->length;
1460 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s length=0x%jx\n", file->name, length);
1461 	return length;
1462 }
1463 
1464 static void
1465 fs_truncate_complete_cb(void *ctx, int bserrno)
1466 {
1467 	struct spdk_fs_request *req = ctx;
1468 	struct spdk_fs_cb_args *args = &req->args;
1469 
1470 	args->fn.file_op(args->arg, bserrno);
1471 	free_fs_request(req);
1472 }
1473 
1474 static void
1475 fs_truncate_resize_cb(void *ctx, int bserrno)
1476 {
1477 	struct spdk_fs_request *req = ctx;
1478 	struct spdk_fs_cb_args *args = &req->args;
1479 	struct spdk_file *file = args->file;
1480 	uint64_t *length = &args->op.truncate.length;
1481 
1482 	if (bserrno) {
1483 		args->fn.file_op(args->arg, bserrno);
1484 		free_fs_request(req);
1485 		return;
1486 	}
1487 
1488 	spdk_blob_set_xattr(file->blob, "length", length, sizeof(*length));
1489 
1490 	file->length = *length;
1491 	if (file->append_pos > file->length) {
1492 		file->append_pos = file->length;
1493 	}
1494 
1495 	spdk_blob_sync_md(file->blob, fs_truncate_complete_cb, args);
1496 }
1497 
/*
 * Return the number of clusters of cluster_sz bytes needed to hold length
 *  bytes, i.e. length / cluster_sz rounded up.
 *
 * The result is computed from the quotient and remainder instead of
 *  (length + cluster_sz - 1) / cluster_sz so that lengths close to
 *  UINT64_MAX do not wrap around.  cluster_sz must be non-zero.
 */
static uint64_t
__bytes_to_clusters(uint64_t length, uint64_t cluster_sz)
{
	uint64_t clusters = length / cluster_sz;

	if (length % cluster_sz != 0) {
		clusters++;
	}

	return clusters;
}
1503 
1504 void
1505 spdk_file_truncate_async(struct spdk_file *file, uint64_t length,
1506 			 spdk_file_op_complete cb_fn, void *cb_arg)
1507 {
1508 	struct spdk_filesystem *fs;
1509 	size_t num_clusters;
1510 	struct spdk_fs_request *req;
1511 	struct spdk_fs_cb_args *args;
1512 
1513 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s old=0x%jx new=0x%jx\n", file->name, file->length, length);
1514 	if (length == file->length) {
1515 		cb_fn(cb_arg, 0);
1516 		return;
1517 	}
1518 
1519 	req = alloc_fs_request(file->fs->md_target.md_fs_channel);
1520 	if (req == NULL) {
1521 		cb_fn(cb_arg, -ENOMEM);
1522 		return;
1523 	}
1524 
1525 	args = &req->args;
1526 	args->fn.file_op = cb_fn;
1527 	args->arg = cb_arg;
1528 	args->file = file;
1529 	args->op.truncate.length = length;
1530 	fs = file->fs;
1531 
1532 	num_clusters = __bytes_to_clusters(length, fs->bs_opts.cluster_sz);
1533 
1534 	spdk_blob_resize(file->blob, num_clusters, fs_truncate_resize_cb, req);
1535 }
1536 
1537 static void
1538 __truncate(void *arg)
1539 {
1540 	struct spdk_fs_request *req = arg;
1541 	struct spdk_fs_cb_args *args = &req->args;
1542 
1543 	spdk_file_truncate_async(args->file, args->op.truncate.length,
1544 				 args->fn.file_op, args);
1545 }
1546 
1547 int
1548 spdk_file_truncate(struct spdk_file *file, struct spdk_io_channel *_channel,
1549 		   uint64_t length)
1550 {
1551 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
1552 	struct spdk_fs_request *req;
1553 	struct spdk_fs_cb_args *args;
1554 	int rc;
1555 
1556 	req = alloc_fs_request(channel);
1557 	if (req == NULL) {
1558 		return -ENOMEM;
1559 	}
1560 
1561 	args = &req->args;
1562 
1563 	args->file = file;
1564 	args->op.truncate.length = length;
1565 	args->fn.file_op = __wake_caller;
1566 	args->sem = &channel->sem;
1567 
1568 	channel->send_request(__truncate, req);
1569 	sem_wait(&channel->sem);
1570 	rc = args->rc;
1571 	free_fs_request(req);
1572 
1573 	return rc;
1574 }
1575 
1576 static void
1577 __rw_done(void *ctx, int bserrno)
1578 {
1579 	struct spdk_fs_request *req = ctx;
1580 	struct spdk_fs_cb_args *args = &req->args;
1581 
1582 	spdk_dma_free(args->op.rw.pin_buf);
1583 	args->fn.file_op(args->arg, bserrno);
1584 	free_fs_request(req);
1585 }
1586 
1587 static void
1588 __read_done(void *ctx, int bserrno)
1589 {
1590 	struct spdk_fs_request *req = ctx;
1591 	struct spdk_fs_cb_args *args = &req->args;
1592 
1593 	assert(req != NULL);
1594 	if (args->op.rw.is_read) {
1595 		memcpy(args->op.rw.user_buf,
1596 		       args->op.rw.pin_buf + (args->op.rw.offset & (args->op.rw.blocklen - 1)),
1597 		       args->op.rw.length);
1598 		__rw_done(req, 0);
1599 	} else {
1600 		memcpy(args->op.rw.pin_buf + (args->op.rw.offset & (args->op.rw.blocklen - 1)),
1601 		       args->op.rw.user_buf,
1602 		       args->op.rw.length);
1603 		spdk_blob_io_write(args->file->blob, args->op.rw.channel,
1604 				   args->op.rw.pin_buf,
1605 				   args->op.rw.start_lba, args->op.rw.num_lba,
1606 				   __rw_done, req);
1607 	}
1608 }
1609 
1610 static void
1611 __do_blob_read(void *ctx, int fserrno)
1612 {
1613 	struct spdk_fs_request *req = ctx;
1614 	struct spdk_fs_cb_args *args = &req->args;
1615 
1616 	if (fserrno) {
1617 		__rw_done(req, fserrno);
1618 		return;
1619 	}
1620 	spdk_blob_io_read(args->file->blob, args->op.rw.channel,
1621 			  args->op.rw.pin_buf,
1622 			  args->op.rw.start_lba, args->op.rw.num_lba,
1623 			  __read_done, req);
1624 }
1625 
1626 static void
1627 __get_page_parameters(struct spdk_file *file, uint64_t offset, uint64_t length,
1628 		      uint64_t *start_lba, uint32_t *lba_size, uint64_t *num_lba)
1629 {
1630 	uint64_t end_lba;
1631 
1632 	*lba_size = spdk_bs_get_io_unit_size(file->fs->bs);
1633 	*start_lba = offset / *lba_size;
1634 	end_lba = (offset + length - 1) / *lba_size;
1635 	*num_lba = (end_lba - *start_lba + 1);
1636 }
1637 
1638 static void
1639 __readwrite(struct spdk_file *file, struct spdk_io_channel *_channel,
1640 	    void *payload, uint64_t offset, uint64_t length,
1641 	    spdk_file_op_complete cb_fn, void *cb_arg, int is_read)
1642 {
1643 	struct spdk_fs_request *req;
1644 	struct spdk_fs_cb_args *args;
1645 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
1646 	uint64_t start_lba, num_lba, pin_buf_length;
1647 	uint32_t lba_size;
1648 
1649 	if (is_read && offset + length > file->length) {
1650 		cb_fn(cb_arg, -EINVAL);
1651 		return;
1652 	}
1653 
1654 	req = alloc_fs_request(channel);
1655 	if (req == NULL) {
1656 		cb_fn(cb_arg, -ENOMEM);
1657 		return;
1658 	}
1659 
1660 	__get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba);
1661 
1662 	args = &req->args;
1663 	args->fn.file_op = cb_fn;
1664 	args->arg = cb_arg;
1665 	args->file = file;
1666 	args->op.rw.channel = channel->bs_channel;
1667 	args->op.rw.user_buf = payload;
1668 	args->op.rw.is_read = is_read;
1669 	args->op.rw.offset = offset;
1670 	args->op.rw.length = length;
1671 	args->op.rw.blocklen = lba_size;
1672 
1673 	pin_buf_length = num_lba * lba_size;
1674 	args->op.rw.pin_buf = spdk_dma_malloc(pin_buf_length, lba_size, NULL);
1675 	if (args->op.rw.pin_buf == NULL) {
1676 		SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "Failed to allocate buf for: file=%s offset=%jx length=%jx\n",
1677 			      file->name, offset, length);
1678 		free_fs_request(req);
1679 		cb_fn(cb_arg, -ENOMEM);
1680 		return;
1681 	}
1682 
1683 	args->op.rw.start_lba = start_lba;
1684 	args->op.rw.num_lba = num_lba;
1685 
1686 	if (!is_read && file->length < offset + length) {
1687 		spdk_file_truncate_async(file, offset + length, __do_blob_read, req);
1688 	} else {
1689 		__do_blob_read(req, 0);
1690 	}
1691 }
1692 
1693 void
1694 spdk_file_write_async(struct spdk_file *file, struct spdk_io_channel *channel,
1695 		      void *payload, uint64_t offset, uint64_t length,
1696 		      spdk_file_op_complete cb_fn, void *cb_arg)
1697 {
1698 	__readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 0);
1699 }
1700 
1701 void
1702 spdk_file_read_async(struct spdk_file *file, struct spdk_io_channel *channel,
1703 		     void *payload, uint64_t offset, uint64_t length,
1704 		     spdk_file_op_complete cb_fn, void *cb_arg)
1705 {
1706 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s offset=%jx length=%jx\n",
1707 		      file->name, offset, length);
1708 	__readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 1);
1709 }
1710 
1711 struct spdk_io_channel *
1712 spdk_fs_alloc_io_channel(struct spdk_filesystem *fs)
1713 {
1714 	struct spdk_io_channel *io_channel;
1715 	struct spdk_fs_channel *fs_channel;
1716 
1717 	io_channel = spdk_get_io_channel(&fs->io_target);
1718 	fs_channel = spdk_io_channel_get_ctx(io_channel);
1719 	fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
1720 	fs_channel->send_request = __send_request_direct;
1721 
1722 	return io_channel;
1723 }
1724 
1725 struct spdk_io_channel *
1726 spdk_fs_alloc_io_channel_sync(struct spdk_filesystem *fs)
1727 {
1728 	struct spdk_io_channel *io_channel;
1729 	struct spdk_fs_channel *fs_channel;
1730 
1731 	io_channel = spdk_get_io_channel(&fs->io_target);
1732 	fs_channel = spdk_io_channel_get_ctx(io_channel);
1733 	fs_channel->send_request = fs->send_request;
1734 	fs_channel->sync = 1;
1735 	pthread_spin_init(&fs_channel->lock, 0);
1736 
1737 	return io_channel;
1738 }
1739 
/*
 * Release an I/O channel by handing it to spdk_put_io_channel().
 */
void
spdk_fs_free_io_channel(struct spdk_io_channel *channel)
{
	struct spdk_io_channel *io_channel = channel;

	spdk_put_io_channel(io_channel);
}
1745 
1746 void
1747 spdk_fs_set_cache_size(uint64_t size_in_mb)
1748 {
1749 	g_fs_cache_size = size_in_mb * 1024 * 1024;
1750 }
1751 
1752 uint64_t
1753 spdk_fs_get_cache_size(void)
1754 {
1755 	return g_fs_cache_size / (1024 * 1024);
1756 }
1757 
1758 static void __file_flush(void *_args);
1759 
1760 static void *
1761 alloc_cache_memory_buffer(struct spdk_file *context)
1762 {
1763 	struct spdk_file *file;
1764 	void *buf;
1765 
1766 	buf = spdk_mempool_get(g_cache_pool);
1767 	if (buf != NULL) {
1768 		return buf;
1769 	}
1770 
1771 	pthread_spin_lock(&g_caches_lock);
1772 	TAILQ_FOREACH(file, &g_caches, cache_tailq) {
1773 		if (!file->open_for_writing &&
1774 		    file->priority == SPDK_FILE_PRIORITY_LOW &&
1775 		    file != context) {
1776 			break;
1777 		}
1778 	}
1779 	pthread_spin_unlock(&g_caches_lock);
1780 	if (file != NULL) {
1781 		cache_free_buffers(file);
1782 		buf = spdk_mempool_get(g_cache_pool);
1783 		if (buf != NULL) {
1784 			return buf;
1785 		}
1786 	}
1787 
1788 	pthread_spin_lock(&g_caches_lock);
1789 	TAILQ_FOREACH(file, &g_caches, cache_tailq) {
1790 		if (!file->open_for_writing && file != context) {
1791 			break;
1792 		}
1793 	}
1794 	pthread_spin_unlock(&g_caches_lock);
1795 	if (file != NULL) {
1796 		cache_free_buffers(file);
1797 		buf = spdk_mempool_get(g_cache_pool);
1798 		if (buf != NULL) {
1799 			return buf;
1800 		}
1801 	}
1802 
1803 	pthread_spin_lock(&g_caches_lock);
1804 	TAILQ_FOREACH(file, &g_caches, cache_tailq) {
1805 		if (file != context) {
1806 			break;
1807 		}
1808 	}
1809 	pthread_spin_unlock(&g_caches_lock);
1810 	if (file != NULL) {
1811 		cache_free_buffers(file);
1812 		buf = spdk_mempool_get(g_cache_pool);
1813 		if (buf != NULL) {
1814 			return buf;
1815 		}
1816 	}
1817 
1818 	return NULL;
1819 }
1820 
1821 static struct cache_buffer *
1822 cache_insert_buffer(struct spdk_file *file, uint64_t offset)
1823 {
1824 	struct cache_buffer *buf;
1825 	int count = 0;
1826 
1827 	buf = calloc(1, sizeof(*buf));
1828 	if (buf == NULL) {
1829 		SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "calloc failed\n");
1830 		return NULL;
1831 	}
1832 
1833 	buf->buf = alloc_cache_memory_buffer(file);
1834 	while (buf->buf == NULL) {
1835 		/*
1836 		 * TODO: alloc_cache_memory_buffer() should eventually free
1837 		 *  some buffers.  Need a more sophisticated check here, instead
1838 		 *  of just bailing if 100 tries does not result in getting a
1839 		 *  free buffer.  This will involve using the sync channel's
1840 		 *  semaphore to block until a buffer becomes available.
1841 		 */
1842 		if (count++ == 100) {
1843 			SPDK_ERRLOG("Could not allocate cache buffer for file=%p on offset=%jx\n",
1844 				    file, offset);
1845 			free(buf);
1846 			return NULL;
1847 		}
1848 		buf->buf = alloc_cache_memory_buffer(file);
1849 	}
1850 
1851 	buf->buf_size = CACHE_BUFFER_SIZE;
1852 	buf->offset = offset;
1853 
1854 	pthread_spin_lock(&g_caches_lock);
1855 	if (file->tree->present_mask == 0) {
1856 		TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq);
1857 	}
1858 	file->tree = spdk_tree_insert_buffer(file->tree, buf);
1859 	pthread_spin_unlock(&g_caches_lock);
1860 
1861 	return buf;
1862 }
1863 
1864 static struct cache_buffer *
1865 cache_append_buffer(struct spdk_file *file)
1866 {
1867 	struct cache_buffer *last;
1868 
1869 	assert(file->last == NULL || file->last->bytes_filled == file->last->buf_size);
1870 	assert((file->append_pos % CACHE_BUFFER_SIZE) == 0);
1871 
1872 	last = cache_insert_buffer(file, file->append_pos);
1873 	if (last == NULL) {
1874 		SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "cache_insert_buffer failed\n");
1875 		return NULL;
1876 	}
1877 
1878 	file->last = last;
1879 
1880 	return last;
1881 }
1882 
1883 static void __check_sync_reqs(struct spdk_file *file);
1884 
1885 static void
1886 __file_cache_finish_sync(void *ctx, int bserrno)
1887 {
1888 	struct spdk_file *file = ctx;
1889 	struct spdk_fs_request *sync_req;
1890 	struct spdk_fs_cb_args *sync_args;
1891 
1892 	pthread_spin_lock(&file->lock);
1893 	sync_req = TAILQ_FIRST(&file->sync_requests);
1894 	sync_args = &sync_req->args;
1895 	assert(sync_args->op.sync.offset <= file->length_flushed);
1896 	BLOBFS_TRACE(file, "sync done offset=%jx\n", sync_args->op.sync.offset);
1897 	TAILQ_REMOVE(&file->sync_requests, sync_req, args.op.sync.tailq);
1898 	pthread_spin_unlock(&file->lock);
1899 
1900 	sync_args->fn.file_op(sync_args->arg, bserrno);
1901 	__check_sync_reqs(file);
1902 
1903 	pthread_spin_lock(&file->lock);
1904 	free_fs_request(sync_req);
1905 	pthread_spin_unlock(&file->lock);
1906 }
1907 
1908 static void
1909 __free_args(struct spdk_fs_cb_args *args)
1910 {
1911 	struct spdk_fs_request *req;
1912 
1913 	if (!args->from_request) {
1914 		free(args);
1915 	} else {
1916 		/* Depends on args being at the start of the spdk_fs_request structure. */
1917 		req = (struct spdk_fs_request *)args;
1918 		free_fs_request(req);
1919 	}
1920 }
1921 
1922 static void
1923 __check_sync_reqs(struct spdk_file *file)
1924 {
1925 	struct spdk_fs_request *sync_req;
1926 
1927 	pthread_spin_lock(&file->lock);
1928 
1929 	TAILQ_FOREACH(sync_req, &file->sync_requests, args.op.sync.tailq) {
1930 		if (sync_req->args.op.sync.offset <= file->length_flushed) {
1931 			break;
1932 		}
1933 	}
1934 
1935 	if (sync_req != NULL && !sync_req->args.op.sync.xattr_in_progress) {
1936 		BLOBFS_TRACE(file, "set xattr length 0x%jx\n", file->length_flushed);
1937 		sync_req->args.op.sync.xattr_in_progress = true;
1938 		spdk_blob_set_xattr(file->blob, "length", &file->length_flushed,
1939 				    sizeof(file->length_flushed));
1940 
1941 		pthread_spin_unlock(&file->lock);
1942 		spdk_blob_sync_md(file->blob, __file_cache_finish_sync, file);
1943 	} else {
1944 		pthread_spin_unlock(&file->lock);
1945 	}
1946 }
1947 
1948 static void
1949 __file_flush_done(void *arg, int bserrno)
1950 {
1951 	struct spdk_fs_cb_args *args = arg;
1952 	struct spdk_file *file = args->file;
1953 	struct cache_buffer *next = args->op.flush.cache_buffer;
1954 
1955 	BLOBFS_TRACE(file, "length=%jx\n", args->op.flush.length);
1956 
1957 	pthread_spin_lock(&file->lock);
1958 	next->in_progress = false;
1959 	next->bytes_flushed += args->op.flush.length;
1960 	file->length_flushed += args->op.flush.length;
1961 	if (file->length_flushed > file->length) {
1962 		file->length = file->length_flushed;
1963 	}
1964 	if (next->bytes_flushed == next->buf_size) {
1965 		BLOBFS_TRACE(file, "write buffer fully flushed 0x%jx\n", file->length_flushed);
1966 		next = spdk_tree_find_buffer(file->tree, file->length_flushed);
1967 	}
1968 
1969 	/*
1970 	 * Assert that there is no cached data that extends past the end of the underlying
1971 	 *  blob.
1972 	 */
1973 	assert(next == NULL || next->offset < __file_get_blob_size(file) ||
1974 	       next->bytes_filled == 0);
1975 
1976 	pthread_spin_unlock(&file->lock);
1977 
1978 	__check_sync_reqs(file);
1979 
1980 	__file_flush(args);
1981 }
1982 
1983 static void
1984 __file_flush(void *_args)
1985 {
1986 	struct spdk_fs_cb_args *args = _args;
1987 	struct spdk_file *file = args->file;
1988 	struct cache_buffer *next;
1989 	uint64_t offset, length, start_lba, num_lba;
1990 	uint32_t lba_size;
1991 
1992 	pthread_spin_lock(&file->lock);
1993 	next = spdk_tree_find_buffer(file->tree, file->length_flushed);
1994 	if (next == NULL || next->in_progress) {
1995 		/*
1996 		 * There is either no data to flush, or a flush I/O is already in
1997 		 *  progress.  So return immediately - if a flush I/O is in
1998 		 *  progress we will flush more data after that is completed.
1999 		 */
2000 		__free_args(args);
2001 		if (next == NULL) {
2002 			/*
2003 			 * For cases where a file's cache was evicted, and then the
2004 			 *  file was later appended, we will write the data directly
2005 			 *  to disk and bypass cache.  So just update length_flushed
2006 			 *  here to reflect that all data was already written to disk.
2007 			 */
2008 			file->length_flushed = file->append_pos;
2009 		}
2010 		pthread_spin_unlock(&file->lock);
2011 		if (next == NULL) {
2012 			/*
2013 			 * There is no data to flush, but we still need to check for any
2014 			 *  outstanding sync requests to make sure metadata gets updated.
2015 			 */
2016 			__check_sync_reqs(file);
2017 		}
2018 		return;
2019 	}
2020 
2021 	offset = next->offset + next->bytes_flushed;
2022 	length = next->bytes_filled - next->bytes_flushed;
2023 	if (length == 0) {
2024 		__free_args(args);
2025 		pthread_spin_unlock(&file->lock);
2026 		return;
2027 	}
2028 	args->op.flush.length = length;
2029 	args->op.flush.cache_buffer = next;
2030 
2031 	__get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba);
2032 
2033 	next->in_progress = true;
2034 	BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n",
2035 		     offset, length, start_lba, num_lba);
2036 	pthread_spin_unlock(&file->lock);
2037 	spdk_blob_io_write(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel,
2038 			   next->buf + (start_lba * lba_size) - next->offset,
2039 			   start_lba, num_lba, __file_flush_done, args);
2040 }
2041 
/*
 * Final step of a blob extension: wake the waiting caller with the status
 *  of the metadata sync.
 */
static void
__file_extend_done(void *arg, int bserrno)
{
	struct spdk_fs_cb_args *extend_args = arg;

	__wake_caller(extend_args, bserrno);
}
2049 
2050 static void
2051 __file_extend_resize_cb(void *_args, int bserrno)
2052 {
2053 	struct spdk_fs_cb_args *args = _args;
2054 	struct spdk_file *file = args->file;
2055 
2056 	if (bserrno) {
2057 		__wake_caller(args, bserrno);
2058 		return;
2059 	}
2060 
2061 	spdk_blob_sync_md(file->blob, __file_extend_done, args);
2062 }
2063 
2064 static void
2065 __file_extend_blob(void *_args)
2066 {
2067 	struct spdk_fs_cb_args *args = _args;
2068 	struct spdk_file *file = args->file;
2069 
2070 	spdk_blob_resize(file->blob, args->op.resize.num_clusters, __file_extend_resize_cb, args);
2071 }
2072 
/*
 * Completion of a direct read/write started by __rw_from_file(): wake the
 *  waiting caller with the status, then release the args.
 */
static void
__rw_from_file_done(void *arg, int bserrno)
{
	struct spdk_fs_cb_args *rw_args = arg;

	__wake_caller(rw_args, bserrno);
	__free_args(rw_args);
}
2081 
2082 static void
2083 __rw_from_file(void *_args)
2084 {
2085 	struct spdk_fs_cb_args *args = _args;
2086 	struct spdk_file *file = args->file;
2087 
2088 	if (args->op.rw.is_read) {
2089 		spdk_file_read_async(file, file->fs->sync_target.sync_io_channel, args->op.rw.user_buf,
2090 				     args->op.rw.offset, args->op.rw.length,
2091 				     __rw_from_file_done, args);
2092 	} else {
2093 		spdk_file_write_async(file, file->fs->sync_target.sync_io_channel, args->op.rw.user_buf,
2094 				      args->op.rw.offset, args->op.rw.length,
2095 				      __rw_from_file_done, args);
2096 	}
2097 }
2098 
2099 static int
2100 __send_rw_from_file(struct spdk_file *file, sem_t *sem, void *payload,
2101 		    uint64_t offset, uint64_t length, bool is_read)
2102 {
2103 	struct spdk_fs_cb_args *args;
2104 
2105 	args = calloc(1, sizeof(*args));
2106 	if (args == NULL) {
2107 		sem_post(sem);
2108 		return -ENOMEM;
2109 	}
2110 
2111 	args->file = file;
2112 	args->sem = sem;
2113 	args->op.rw.user_buf = payload;
2114 	args->op.rw.offset = offset;
2115 	args->op.rw.length = length;
2116 	args->op.rw.is_read = is_read;
2117 	file->fs->send_request(__rw_from_file, args);
2118 	return 0;
2119 }
2120 
2121 int
2122 spdk_file_write(struct spdk_file *file, struct spdk_io_channel *_channel,
2123 		void *payload, uint64_t offset, uint64_t length)
2124 {
2125 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
2126 	struct spdk_fs_cb_args *args;
2127 	uint64_t rem_length, copy, blob_size, cluster_sz;
2128 	uint32_t cache_buffers_filled = 0;
2129 	uint8_t *cur_payload;
2130 	struct cache_buffer *last;
2131 
2132 	BLOBFS_TRACE_RW(file, "offset=%jx length=%jx\n", offset, length);
2133 
2134 	if (length == 0) {
2135 		return 0;
2136 	}
2137 
2138 	if (offset != file->append_pos) {
2139 		BLOBFS_TRACE(file, " error offset=%jx append_pos=%jx\n", offset, file->append_pos);
2140 		return -EINVAL;
2141 	}
2142 
2143 	pthread_spin_lock(&file->lock);
2144 	file->open_for_writing = true;
2145 
2146 	if ((file->last == NULL) && (file->append_pos % CACHE_BUFFER_SIZE == 0)) {
2147 		cache_append_buffer(file);
2148 	}
2149 
2150 	if (file->last == NULL) {
2151 		int rc;
2152 
2153 		file->append_pos += length;
2154 		pthread_spin_unlock(&file->lock);
2155 		rc = __send_rw_from_file(file, &channel->sem, payload,
2156 					 offset, length, false);
2157 		sem_wait(&channel->sem);
2158 		return rc;
2159 	}
2160 
2161 	blob_size = __file_get_blob_size(file);
2162 
2163 	if ((offset + length) > blob_size) {
2164 		struct spdk_fs_cb_args extend_args = {};
2165 
2166 		cluster_sz = file->fs->bs_opts.cluster_sz;
2167 		extend_args.sem = &channel->sem;
2168 		extend_args.op.resize.num_clusters = __bytes_to_clusters((offset + length), cluster_sz);
2169 		extend_args.file = file;
2170 		BLOBFS_TRACE(file, "start resize to %u clusters\n", extend_args.op.resize.num_clusters);
2171 		pthread_spin_unlock(&file->lock);
2172 		file->fs->send_request(__file_extend_blob, &extend_args);
2173 		sem_wait(&channel->sem);
2174 		if (extend_args.rc) {
2175 			return extend_args.rc;
2176 		}
2177 	}
2178 
2179 	last = file->last;
2180 	rem_length = length;
2181 	cur_payload = payload;
2182 	while (rem_length > 0) {
2183 		copy = last->buf_size - last->bytes_filled;
2184 		if (copy > rem_length) {
2185 			copy = rem_length;
2186 		}
2187 		BLOBFS_TRACE_RW(file, "  fill offset=%jx length=%jx\n", file->append_pos, copy);
2188 		memcpy(&last->buf[last->bytes_filled], cur_payload, copy);
2189 		file->append_pos += copy;
2190 		if (file->length < file->append_pos) {
2191 			file->length = file->append_pos;
2192 		}
2193 		cur_payload += copy;
2194 		last->bytes_filled += copy;
2195 		rem_length -= copy;
2196 		if (last->bytes_filled == last->buf_size) {
2197 			cache_buffers_filled++;
2198 			last = cache_append_buffer(file);
2199 			if (last == NULL) {
2200 				BLOBFS_TRACE(file, "nomem\n");
2201 				pthread_spin_unlock(&file->lock);
2202 				return -ENOMEM;
2203 			}
2204 		}
2205 	}
2206 
2207 	pthread_spin_unlock(&file->lock);
2208 
2209 	if (cache_buffers_filled == 0) {
2210 		return 0;
2211 	}
2212 
2213 	args = calloc(1, sizeof(*args));
2214 	if (args == NULL) {
2215 		return -ENOMEM;
2216 	}
2217 
2218 	args->file = file;
2219 	file->fs->send_request(__file_flush, args);
2220 	return 0;
2221 }
2222 
2223 static void
2224 __readahead_done(void *arg, int bserrno)
2225 {
2226 	struct spdk_fs_cb_args *args = arg;
2227 	struct cache_buffer *cache_buffer = args->op.readahead.cache_buffer;
2228 	struct spdk_file *file = args->file;
2229 
2230 	BLOBFS_TRACE(file, "offset=%jx\n", cache_buffer->offset);
2231 
2232 	pthread_spin_lock(&file->lock);
2233 	cache_buffer->bytes_filled = args->op.readahead.length;
2234 	cache_buffer->bytes_flushed = args->op.readahead.length;
2235 	cache_buffer->in_progress = false;
2236 	pthread_spin_unlock(&file->lock);
2237 
2238 	__free_args(args);
2239 }
2240 
2241 static void
2242 __readahead(void *_args)
2243 {
2244 	struct spdk_fs_cb_args *args = _args;
2245 	struct spdk_file *file = args->file;
2246 	uint64_t offset, length, start_lba, num_lba;
2247 	uint32_t lba_size;
2248 
2249 	offset = args->op.readahead.offset;
2250 	length = args->op.readahead.length;
2251 	assert(length > 0);
2252 
2253 	__get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba);
2254 
2255 	BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n",
2256 		     offset, length, start_lba, num_lba);
2257 	spdk_blob_io_read(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel,
2258 			  args->op.readahead.cache_buffer->buf,
2259 			  start_lba, num_lba, __readahead_done, args);
2260 }
2261 
2262 static uint64_t
2263 __next_cache_buffer_offset(uint64_t offset)
2264 {
2265 	return (offset + CACHE_BUFFER_SIZE) & ~(CACHE_TREE_LEVEL_MASK(0));
2266 }
2267 
2268 static void
2269 check_readahead(struct spdk_file *file, uint64_t offset)
2270 {
2271 	struct spdk_fs_cb_args *args;
2272 
2273 	offset = __next_cache_buffer_offset(offset);
2274 	if (spdk_tree_find_buffer(file->tree, offset) != NULL || file->length <= offset) {
2275 		return;
2276 	}
2277 
2278 	args = calloc(1, sizeof(*args));
2279 	if (args == NULL) {
2280 		return;
2281 	}
2282 
2283 	BLOBFS_TRACE(file, "offset=%jx\n", offset);
2284 
2285 	args->file = file;
2286 	args->op.readahead.offset = offset;
2287 	args->op.readahead.cache_buffer = cache_insert_buffer(file, offset);
2288 	if (!args->op.readahead.cache_buffer) {
2289 		BLOBFS_TRACE(file, "Cannot allocate buf for offset=%jx\n", offset);
2290 		free(args);
2291 		return;
2292 	}
2293 
2294 	args->op.readahead.cache_buffer->in_progress = true;
2295 	if (file->length < (offset + CACHE_BUFFER_SIZE)) {
2296 		args->op.readahead.length = file->length & (CACHE_BUFFER_SIZE - 1);
2297 	} else {
2298 		args->op.readahead.length = CACHE_BUFFER_SIZE;
2299 	}
2300 	file->fs->send_request(__readahead, args);
2301 }
2302 
2303 static int
2304 __file_read(struct spdk_file *file, void *payload, uint64_t offset, uint64_t length, sem_t *sem)
2305 {
2306 	struct cache_buffer *buf;
2307 	int rc;
2308 
2309 	buf = spdk_tree_find_filled_buffer(file->tree, offset);
2310 	if (buf == NULL) {
2311 		pthread_spin_unlock(&file->lock);
2312 		rc = __send_rw_from_file(file, sem, payload, offset, length, true);
2313 		pthread_spin_lock(&file->lock);
2314 		return rc;
2315 	}
2316 
2317 	if ((offset + length) > (buf->offset + buf->bytes_filled)) {
2318 		length = buf->offset + buf->bytes_filled - offset;
2319 	}
2320 	BLOBFS_TRACE(file, "read %p offset=%ju length=%ju\n", payload, offset, length);
2321 	memcpy(payload, &buf->buf[offset - buf->offset], length);
2322 	if ((offset + length) % CACHE_BUFFER_SIZE == 0) {
2323 		pthread_spin_lock(&g_caches_lock);
2324 		spdk_tree_remove_buffer(file->tree, buf);
2325 		if (file->tree->present_mask == 0) {
2326 			TAILQ_REMOVE(&g_caches, file, cache_tailq);
2327 		}
2328 		pthread_spin_unlock(&g_caches_lock);
2329 	}
2330 
2331 	sem_post(sem);
2332 	return 0;
2333 }
2334 
2335 int64_t
2336 spdk_file_read(struct spdk_file *file, struct spdk_io_channel *_channel,
2337 	       void *payload, uint64_t offset, uint64_t length)
2338 {
2339 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
2340 	uint64_t final_offset, final_length;
2341 	uint32_t sub_reads = 0;
2342 	int rc = 0;
2343 
2344 	pthread_spin_lock(&file->lock);
2345 
2346 	BLOBFS_TRACE_RW(file, "offset=%ju length=%ju\n", offset, length);
2347 
2348 	file->open_for_writing = false;
2349 
2350 	if (length == 0 || offset >= file->append_pos) {
2351 		pthread_spin_unlock(&file->lock);
2352 		return 0;
2353 	}
2354 
2355 	if (offset + length > file->append_pos) {
2356 		length = file->append_pos - offset;
2357 	}
2358 
2359 	if (offset != file->next_seq_offset) {
2360 		file->seq_byte_count = 0;
2361 	}
2362 	file->seq_byte_count += length;
2363 	file->next_seq_offset = offset + length;
2364 	if (file->seq_byte_count >= CACHE_READAHEAD_THRESHOLD) {
2365 		check_readahead(file, offset);
2366 		check_readahead(file, offset + CACHE_BUFFER_SIZE);
2367 	}
2368 
2369 	final_length = 0;
2370 	final_offset = offset + length;
2371 	while (offset < final_offset) {
2372 		length = NEXT_CACHE_BUFFER_OFFSET(offset) - offset;
2373 		if (length > (final_offset - offset)) {
2374 			length = final_offset - offset;
2375 		}
2376 		rc = __file_read(file, payload, offset, length, &channel->sem);
2377 		if (rc == 0) {
2378 			final_length += length;
2379 		} else {
2380 			break;
2381 		}
2382 		payload += length;
2383 		offset += length;
2384 		sub_reads++;
2385 	}
2386 	pthread_spin_unlock(&file->lock);
2387 	while (sub_reads-- > 0) {
2388 		sem_wait(&channel->sem);
2389 	}
2390 	if (rc == 0) {
2391 		return final_length;
2392 	} else {
2393 		return rc;
2394 	}
2395 }
2396 
2397 static void
2398 _file_sync(struct spdk_file *file, struct spdk_fs_channel *channel,
2399 	   spdk_file_op_complete cb_fn, void *cb_arg)
2400 {
2401 	struct spdk_fs_request *sync_req;
2402 	struct spdk_fs_request *flush_req;
2403 	struct spdk_fs_cb_args *sync_args;
2404 	struct spdk_fs_cb_args *flush_args;
2405 
2406 	BLOBFS_TRACE(file, "offset=%jx\n", file->append_pos);
2407 
2408 	pthread_spin_lock(&file->lock);
2409 	if (file->append_pos <= file->length_flushed) {
2410 		BLOBFS_TRACE(file, "done - no data to flush\n");
2411 		pthread_spin_unlock(&file->lock);
2412 		cb_fn(cb_arg, 0);
2413 		return;
2414 	}
2415 
2416 	sync_req = alloc_fs_request(channel);
2417 	if (!sync_req) {
2418 		pthread_spin_unlock(&file->lock);
2419 		cb_fn(cb_arg, -ENOMEM);
2420 		return;
2421 	}
2422 	sync_args = &sync_req->args;
2423 
2424 	flush_req = alloc_fs_request(channel);
2425 	if (!flush_req) {
2426 		pthread_spin_unlock(&file->lock);
2427 		cb_fn(cb_arg, -ENOMEM);
2428 		return;
2429 	}
2430 	flush_args = &flush_req->args;
2431 
2432 	sync_args->file = file;
2433 	sync_args->fn.file_op = cb_fn;
2434 	sync_args->arg = cb_arg;
2435 	sync_args->op.sync.offset = file->append_pos;
2436 	sync_args->op.sync.xattr_in_progress = false;
2437 	TAILQ_INSERT_TAIL(&file->sync_requests, sync_req, args.op.sync.tailq);
2438 	pthread_spin_unlock(&file->lock);
2439 
2440 	flush_args->file = file;
2441 	channel->send_request(__file_flush, flush_args);
2442 }
2443 
2444 int
2445 spdk_file_sync(struct spdk_file *file, struct spdk_io_channel *_channel)
2446 {
2447 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
2448 	struct spdk_fs_cb_args args = {};
2449 
2450 	args.sem = &channel->sem;
2451 	_file_sync(file, channel, __wake_caller, &args);
2452 	sem_wait(&channel->sem);
2453 
2454 	return args.rc;
2455 }
2456 
2457 void
2458 spdk_file_sync_async(struct spdk_file *file, struct spdk_io_channel *_channel,
2459 		     spdk_file_op_complete cb_fn, void *cb_arg)
2460 {
2461 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
2462 
2463 	_file_sync(file, channel, cb_fn, cb_arg);
2464 }
2465 
2466 void
2467 spdk_file_set_priority(struct spdk_file *file, uint32_t priority)
2468 {
2469 	BLOBFS_TRACE(file, "priority=%u\n", priority);
2470 	file->priority = priority;
2471 
2472 }
2473 
2474 /*
2475  * Close routines
2476  */
2477 
2478 static void
2479 __file_close_async_done(void *ctx, int bserrno)
2480 {
2481 	struct spdk_fs_request *req = ctx;
2482 	struct spdk_fs_cb_args *args = &req->args;
2483 	struct spdk_file *file = args->file;
2484 
2485 	if (file->is_deleted) {
2486 		spdk_fs_delete_file_async(file->fs, file->name, blob_delete_cb, ctx);
2487 		return;
2488 	}
2489 
2490 	args->fn.file_op(args->arg, bserrno);
2491 	free_fs_request(req);
2492 }
2493 
2494 static void
2495 __file_close_async(struct spdk_file *file, struct spdk_fs_request *req)
2496 {
2497 	struct spdk_blob *blob;
2498 
2499 	pthread_spin_lock(&file->lock);
2500 	if (file->ref_count == 0) {
2501 		pthread_spin_unlock(&file->lock);
2502 		__file_close_async_done(req, -EBADF);
2503 		return;
2504 	}
2505 
2506 	file->ref_count--;
2507 	if (file->ref_count > 0) {
2508 		pthread_spin_unlock(&file->lock);
2509 		req->args.fn.file_op(req->args.arg, 0);
2510 		free_fs_request(req);
2511 		return;
2512 	}
2513 
2514 	pthread_spin_unlock(&file->lock);
2515 
2516 	blob = file->blob;
2517 	file->blob = NULL;
2518 	spdk_blob_close(blob, __file_close_async_done, req);
2519 }
2520 
2521 static void
2522 __file_close_async__sync_done(void *arg, int fserrno)
2523 {
2524 	struct spdk_fs_request *req = arg;
2525 	struct spdk_fs_cb_args *args = &req->args;
2526 
2527 	__file_close_async(args->file, req);
2528 }
2529 
2530 void
2531 spdk_file_close_async(struct spdk_file *file, spdk_file_op_complete cb_fn, void *cb_arg)
2532 {
2533 	struct spdk_fs_request *req;
2534 	struct spdk_fs_cb_args *args;
2535 
2536 	req = alloc_fs_request(file->fs->md_target.md_fs_channel);
2537 	if (req == NULL) {
2538 		cb_fn(cb_arg, -ENOMEM);
2539 		return;
2540 	}
2541 
2542 	args = &req->args;
2543 	args->file = file;
2544 	args->fn.file_op = cb_fn;
2545 	args->arg = cb_arg;
2546 
2547 	spdk_file_sync_async(file, file->fs->md_target.md_io_channel, __file_close_async__sync_done, req);
2548 }
2549 
2550 static void
2551 __file_close(void *arg)
2552 {
2553 	struct spdk_fs_request *req = arg;
2554 	struct spdk_fs_cb_args *args = &req->args;
2555 	struct spdk_file *file = args->file;
2556 
2557 	__file_close_async(file, req);
2558 }
2559 
2560 int
2561 spdk_file_close(struct spdk_file *file, struct spdk_io_channel *_channel)
2562 {
2563 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
2564 	struct spdk_fs_request *req;
2565 	struct spdk_fs_cb_args *args;
2566 
2567 	req = alloc_fs_request(channel);
2568 	if (req == NULL) {
2569 		return -ENOMEM;
2570 	}
2571 
2572 	args = &req->args;
2573 
2574 	spdk_file_sync(file, _channel);
2575 	BLOBFS_TRACE(file, "name=%s\n", file->name);
2576 	args->file = file;
2577 	args->sem = &channel->sem;
2578 	args->fn.file_op = __wake_caller;
2579 	args->arg = req;
2580 	channel->send_request(__file_close, req);
2581 	sem_wait(&channel->sem);
2582 
2583 	return args->rc;
2584 }
2585 
2586 int
2587 spdk_file_get_id(struct spdk_file *file, void *id, size_t size)
2588 {
2589 	if (size < sizeof(spdk_blob_id)) {
2590 		return -EINVAL;
2591 	}
2592 
2593 	memcpy(id, &file->blobid, sizeof(spdk_blob_id));
2594 
2595 	return sizeof(spdk_blob_id);
2596 }
2597 
2598 static void
2599 cache_free_buffers(struct spdk_file *file)
2600 {
2601 	BLOBFS_TRACE(file, "free=%s\n", file->name);
2602 	pthread_spin_lock(&file->lock);
2603 	pthread_spin_lock(&g_caches_lock);
2604 	if (file->tree->present_mask == 0) {
2605 		pthread_spin_unlock(&g_caches_lock);
2606 		pthread_spin_unlock(&file->lock);
2607 		return;
2608 	}
2609 	spdk_tree_free_buffers(file->tree);
2610 
2611 	TAILQ_REMOVE(&g_caches, file, cache_tailq);
2612 	/* If not freed, put it in the end of the queue */
2613 	if (file->tree->present_mask != 0) {
2614 		TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq);
2615 	}
2616 	file->last = NULL;
2617 	pthread_spin_unlock(&g_caches_lock);
2618 	pthread_spin_unlock(&file->lock);
2619 }
2620 
/*
 * Register the "blobfs" and "blobfs_rw" log components with their flags
 * SPDK_LOG_BLOBFS and SPDK_LOG_BLOBFS_RW.
 * NOTE(review): presumably these back the BLOBFS_TRACE / BLOBFS_TRACE_RW
 * macros used throughout this file - confirm against the macro definitions.
 */
SPDK_LOG_REGISTER_COMPONENT("blobfs", SPDK_LOG_BLOBFS)
SPDK_LOG_REGISTER_COMPONENT("blobfs_rw", SPDK_LOG_BLOBFS_RW)
2623