xref: /spdk/lib/blobfs/blobfs.c (revision 0eae01067000f31cb6c9dbdf792411c1957754f3)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2017 Intel Corporation.
3  *   All rights reserved.
4  */
5 
6 #include "spdk/stdinc.h"
7 
8 #include "spdk/blobfs.h"
9 #include "cache_tree.h"
10 
11 #include "spdk/queue.h"
12 #include "spdk/thread.h"
13 #include "spdk/assert.h"
14 #include "spdk/env.h"
15 #include "spdk/util.h"
16 #include "spdk/log.h"
17 #include "spdk/trace.h"
18 
19 #include "spdk_internal/trace_defs.h"
20 
21 #define BLOBFS_TRACE(file, str, args...) \
22 	SPDK_DEBUGLOG(blobfs, "file=%s " str, file->name, ##args)
23 
24 #define BLOBFS_TRACE_RW(file, str, args...) \
25 	SPDK_DEBUGLOG(blobfs_rw, "file=%s " str, file->name, ##args)
26 
27 #define BLOBFS_DEFAULT_CACHE_SIZE (4ULL * 1024 * 1024 * 1024)
28 #define SPDK_BLOBFS_DEFAULT_OPTS_CLUSTER_SZ (1024 * 1024)
29 
30 #define SPDK_BLOBFS_SIGNATURE	"BLOBFS"
31 
32 static uint64_t g_fs_cache_size = BLOBFS_DEFAULT_CACHE_SIZE;
33 static struct spdk_mempool *g_cache_pool;
34 static TAILQ_HEAD(, spdk_file) g_caches = TAILQ_HEAD_INITIALIZER(g_caches);
35 static struct spdk_poller *g_cache_pool_mgmt_poller;
36 static struct spdk_thread *g_cache_pool_thread;
37 #define BLOBFS_CACHE_POOL_POLL_PERIOD_IN_US 1000ULL
38 static int g_fs_count = 0;
39 static pthread_mutex_t g_cache_init_lock = PTHREAD_MUTEX_INITIALIZER;
40 
/*
 * Register the blobfs tracepoint descriptions with the SPDK trace library.
 * Every tracepoint carries the file name (truncated to 40 bytes) so trace
 * records can be correlated back to individual files.
 */
static void
blobfs_trace(void)
{
	struct spdk_trace_tpoint_opts opts[] = {
		{
			"BLOBFS_XATTR_START", TRACE_BLOBFS_XATTR_START,
			OWNER_TYPE_NONE, OBJECT_NONE, 0,
			{{ "file", SPDK_TRACE_ARG_TYPE_STR, 40 }},
		},
		{
			"BLOBFS_XATTR_END", TRACE_BLOBFS_XATTR_END,
			OWNER_TYPE_NONE, OBJECT_NONE, 0,
			{{ "file", SPDK_TRACE_ARG_TYPE_STR, 40 }},
		},
		{
			"BLOBFS_OPEN", TRACE_BLOBFS_OPEN,
			OWNER_TYPE_NONE, OBJECT_NONE, 0,
			{{ "file", SPDK_TRACE_ARG_TYPE_STR, 40 }},
		},
		{
			"BLOBFS_CLOSE", TRACE_BLOBFS_CLOSE,
			OWNER_TYPE_NONE, OBJECT_NONE, 0,
			{{ "file", SPDK_TRACE_ARG_TYPE_STR, 40 }},
		},
		{
			"BLOBFS_DELETE_START", TRACE_BLOBFS_DELETE_START,
			OWNER_TYPE_NONE, OBJECT_NONE, 0,
			{{ "file", SPDK_TRACE_ARG_TYPE_STR, 40 }},
		},
		{
			"BLOBFS_DELETE_DONE", TRACE_BLOBFS_DELETE_DONE,
			OWNER_TYPE_NONE, OBJECT_NONE, 0,
			{{ "file", SPDK_TRACE_ARG_TYPE_STR, 40 }},
		}
	};

	spdk_trace_register_description_ext(opts, SPDK_COUNTOF(opts));
}
SPDK_TRACE_REGISTER_FN(blobfs_trace, "blobfs", TRACE_GROUP_BLOBFS)
80 
/*
 * Release a cache buffer: return its data area to the global cache mempool
 * and free the tracking structure itself.
 */
void
cache_buffer_free(struct cache_buffer *cache_buffer)
{
	spdk_mempool_put(g_cache_pool, cache_buffer->buf);
	free(cache_buffer);
}
87 
88 #define CACHE_READAHEAD_THRESHOLD	(128 * 1024)
89 
/* In-memory state for a single blobfs file, backed by exactly one blob. */
struct spdk_file {
	struct spdk_filesystem	*fs;
	struct spdk_blob	*blob;
	char			*name;
	uint64_t		length;		/* current logical file size */
	bool                    is_deleted;	/* marked deleted while still referenced */
	bool			open_for_writing;
	uint64_t		length_flushed;	/* bytes persisted to the blob */
	uint64_t		length_xattr;	/* length recorded in the "length" xattr */
	uint64_t		append_pos;	/* next append offset */
	uint64_t		seq_byte_count;	/* sequential-access byte counter — presumably drives readahead; confirm against r/w path */
	uint64_t		next_seq_offset;
	uint32_t		priority;	/* cache priority (SPDK_FILE_PRIORITY_*) */
	TAILQ_ENTRY(spdk_file)	tailq;		/* entry on fs->files */
	spdk_blob_id		blobid;
	uint32_t		ref_count;	/* number of open handles */
	pthread_spinlock_t	lock;
	struct cache_buffer	*last;		/* most recently touched cache buffer */
	struct cache_tree	*tree;		/* cache buffers indexed by file offset */
	TAILQ_HEAD(open_requests_head, spdk_fs_request) open_requests;
	TAILQ_HEAD(sync_requests_head, spdk_fs_request) sync_requests;
	TAILQ_ENTRY(spdk_file)	cache_tailq;	/* entry on the global g_caches list */
};
113 
/*
 * Blob that was marked for deletion before the last unload and must be
 * deleted during load (see iter_cb()/fs_load_done()).
 */
struct spdk_deleted_file {
	spdk_blob_id	id;
	TAILQ_ENTRY(spdk_deleted_file)	tailq;
};
118 
/*
 * Top-level blobfs instance: one blobstore plus the three io_device
 * targets (metadata, synchronous API, generic io) it exposes.
 */
struct spdk_filesystem {
	struct spdk_blob_store	*bs;
	TAILQ_HEAD(, spdk_file)	files;		/* all files known to this fs */
	struct spdk_bs_opts	bs_opts;
	struct spdk_bs_dev	*bdev;
	fs_send_request_fn	send_request;	/* app-provided cross-thread dispatch */

	/* Channel used by the synchronous (blocking) API wrappers. */
	struct {
		uint32_t		max_ops;
		struct spdk_io_channel	*sync_io_channel;
		struct spdk_fs_channel	*sync_fs_channel;
	} sync_target;

	/* Channel used for metadata operations (create/delete/open/...). */
	struct {
		uint32_t		max_ops;
		struct spdk_io_channel	*md_io_channel;
		struct spdk_fs_channel	*md_fs_channel;
	} md_target;

	/* Per-thread io channels are created on demand from this target. */
	struct {
		uint32_t		max_ops;
	} io_target;
};
142 
/*
 * Per-request context shared by all blobfs async operations.  A request
 * carries its completion callback, synchronization state for the blocking
 * wrappers, and an op-specific payload in the union below.
 */
struct spdk_fs_cb_args {
	/* Completion callback; which member is valid depends on the op. */
	union {
		spdk_fs_op_with_handle_complete		fs_op_with_handle;
		spdk_fs_op_complete			fs_op;
		spdk_file_op_with_handle_complete	file_op_with_handle;
		spdk_file_op_complete			file_op;
		spdk_file_stat_op_complete		stat_op;
	} fn;
	void *arg;		/* opaque argument passed to the callback */
	sem_t *sem;		/* posted (e.g. by __wake_caller) to unblock sync wrappers */
	struct spdk_filesystem *fs;
	struct spdk_file *file;
	int rc;			/* result captured for the synchronous wrappers */
	int *rwerrno;		/* sticky first read/write error, when tracked */
	struct iovec *iovs;	/* points at 'iov' below when iovcnt <= 1 */
	uint32_t iovcnt;
	struct iovec iov;
	/* Op-specific state; exactly one member is in use per request. */
	union {
		struct {
			TAILQ_HEAD(, spdk_deleted_file)	deleted_files;
		} fs_load;
		struct {
			uint64_t	length;
		} truncate;
		struct {
			struct spdk_io_channel	*channel;
			void		*pin_buf;
			int		is_read;
			off_t		offset;
			size_t		length;
			uint64_t	start_lba;
			uint64_t	num_lba;
			uint32_t	blocklen;
		} rw;
		struct {
			const char	*old_name;
			const char	*new_name;
		} rename;
		struct {
			struct cache_buffer	*cache_buffer;
			uint64_t		length;
		} flush;
		struct {
			struct cache_buffer	*cache_buffer;
			uint64_t		length;
			uint64_t		offset;
		} readahead;
		struct {
			/* offset of the file when the sync request was made */
			uint64_t			offset;
			TAILQ_ENTRY(spdk_fs_request)	tailq;
			bool				xattr_in_progress;
			/* length written to the xattr for this file - this should
			 * always be the same as the offset if only one thread is
			 * writing to the file, but could differ if multiple threads
			 * are appending
			 */
			uint64_t			length;
		} sync;
		struct {
			uint32_t			num_clusters;
		} resize;
		struct {
			const char	*name;
			uint32_t	flags;
			TAILQ_ENTRY(spdk_fs_request)	tailq;
		} open;
		struct {
			const char		*name;
			struct spdk_blob	*blob;
		} create;
		struct {
			const char	*name;
		} delete;
		struct {
			const char	*name;
		} stat;
	} op;
};
222 
223 static void file_free(struct spdk_file *file);
224 static void fs_io_device_unregister(struct spdk_filesystem *fs);
225 static void fs_free_io_channels(struct spdk_filesystem *fs);
226 
/*
 * Initialize blobfs options to their defaults (1 MiB cluster size).
 * Callers may override fields afterwards before passing to spdk_fs_init().
 */
void
spdk_fs_opts_init(struct spdk_blobfs_opts *opts)
{
	opts->cluster_sz = SPDK_BLOBFS_DEFAULT_OPTS_CLUSTER_SZ;
}
232 
233 static int _blobfs_cache_pool_reclaim(void *arg);
234 
235 static bool
236 blobfs_cache_pool_need_reclaim(void)
237 {
238 	size_t count;
239 
240 	count = spdk_mempool_count(g_cache_pool);
241 	/* We define a aggressive policy here as the requirements from db_bench are batched, so start the poller
242 	 *  when the number of available cache buffer is less than 1/5 of total buffers.
243 	 */
244 	if (count > (size_t)g_fs_cache_size / CACHE_BUFFER_SIZE / 5) {
245 		return false;
246 	}
247 
248 	return true;
249 }
250 
/*
 * Runs on the dedicated cache-pool thread: start the periodic poller that
 * reclaims cache buffers when the pool runs low.
 */
static void
__start_cache_pool_mgmt(void *ctx)
{
	assert(g_cache_pool_mgmt_poller == NULL);
	g_cache_pool_mgmt_poller = SPDK_POLLER_REGISTER(_blobfs_cache_pool_reclaim, NULL,
				   BLOBFS_CACHE_POOL_POLL_PERIOD_IN_US);
}
258 
/*
 * Runs on the cache-pool thread once the last filesystem is unloaded:
 * stop the reclaim poller, verify every buffer has been returned, free
 * the pool, and exit the management thread.
 */
static void
__stop_cache_pool_mgmt(void *ctx)
{
	spdk_poller_unregister(&g_cache_pool_mgmt_poller);

	assert(g_cache_pool != NULL);
	/* All cache buffers must be back in the pool before teardown. */
	assert(spdk_mempool_count(g_cache_pool) == g_fs_cache_size / CACHE_BUFFER_SIZE);
	spdk_mempool_free(g_cache_pool);
	g_cache_pool = NULL;

	spdk_thread_exit(g_cache_pool_thread);
}
271 
/*
 * Create the single global cache-buffer mempool, sized by g_fs_cache_size.
 * Failure is fatal (assert): blobfs cannot operate without the cache, and
 * a lookup hit means another process already owns the pool name, which
 * blobfs does not support.
 */
static void
allocate_cache_pool(void)
{
	assert(g_cache_pool == NULL);
	g_cache_pool = spdk_mempool_create("spdk_fs_cache",
					   g_fs_cache_size / CACHE_BUFFER_SIZE,
					   CACHE_BUFFER_SIZE,
					   SPDK_MEMPOOL_DEFAULT_CACHE_SIZE,
					   SPDK_ENV_NUMA_ID_ANY);
	if (!g_cache_pool) {
		if (spdk_mempool_lookup("spdk_fs_cache") != NULL) {
			SPDK_ERRLOG("Unable to allocate mempool: already exists\n");
			SPDK_ERRLOG("Probably running in multiprocess environment, which is "
				    "unsupported by the blobfs library\n");
		} else {
			SPDK_ERRLOG("Create mempool failed, you may "
				    "increase the memory and try again\n");
		}
		assert(false);
	}
}
293 
/*
 * Reference-counted setup of the global cache: the first loaded filesystem
 * allocates the pool and spawns the management thread; later filesystems
 * only bump the count.  Guarded by g_cache_init_lock since filesystems may
 * be initialized from different threads.
 */
static void
initialize_global_cache(void)
{
	pthread_mutex_lock(&g_cache_init_lock);
	if (g_fs_count == 0) {
		allocate_cache_pool();
		g_cache_pool_thread = spdk_thread_create("cache_pool_mgmt", NULL);
		assert(g_cache_pool_thread != NULL);
		spdk_thread_send_msg(g_cache_pool_thread, __start_cache_pool_mgmt, NULL);
	}
	g_fs_count++;
	pthread_mutex_unlock(&g_cache_init_lock);
}
307 
/*
 * Drop one reference on the global cache; the last filesystem to unload
 * asks the management thread to tear the pool down (asynchronously, on
 * that thread).
 */
static void
free_global_cache(void)
{
	pthread_mutex_lock(&g_cache_init_lock);
	g_fs_count--;
	if (g_fs_count == 0) {
		spdk_thread_send_msg(g_cache_pool_thread, __stop_cache_pool_mgmt, NULL);
	}
	pthread_mutex_unlock(&g_cache_init_lock);
}
318 
319 static uint64_t
320 __file_get_blob_size(struct spdk_file *file)
321 {
322 	uint64_t cluster_sz;
323 
324 	cluster_sz = file->fs->bs_opts.cluster_sz;
325 	return cluster_sz * spdk_blob_get_num_clusters(file->blob);
326 }
327 
/* A pooled request object; lives on its channel's free list when idle. */
struct spdk_fs_request {
	struct spdk_fs_cb_args		args;	/* must stay first: some callbacks cast between request and args */
	TAILQ_ENTRY(spdk_fs_request)	link;	/* entry on channel->reqs free list */
	struct spdk_fs_channel		*channel;
};
333 
/* Per-channel state: a preallocated request pool plus blobstore channel. */
struct spdk_fs_channel {
	struct spdk_fs_request		*req_mem;	/* backing array of max_ops requests */
	TAILQ_HEAD(, spdk_fs_request)	reqs;		/* free list carved from req_mem */
	sem_t				sem;		/* wakes blocking (sync) callers */
	struct spdk_filesystem		*fs;
	struct spdk_io_channel		*bs_channel;
	fs_send_request_fn		send_request;
	bool				sync;		/* true: free list may be shared across threads, guard with 'lock' */
	uint32_t			outstanding_reqs;
	pthread_spinlock_t		lock;
};
345 
/* For now, this is effectively an alias. But eventually we'll shift
 * some data members over. */
struct spdk_fs_thread_ctx {
	struct spdk_fs_channel	ch;
};
351 
/*
 * Pop a request from the channel's free list and reset it.
 *
 * When iovcnt > 1 a separate iovec array is heap-allocated; otherwise the
 * request's embedded iov is used so single-vector I/O incurs no allocation.
 * On a sync channel the free list may be touched from multiple threads, so
 * it is guarded by the channel spinlock.  Returns NULL if the iovec
 * allocation fails or the pool is exhausted.
 */
static struct spdk_fs_request *
alloc_fs_request_with_iov(struct spdk_fs_channel *channel, uint32_t iovcnt)
{
	struct spdk_fs_request *req;
	struct iovec *iovs = NULL;

	if (iovcnt > 1) {
		iovs = calloc(iovcnt, sizeof(struct iovec));
		if (!iovs) {
			return NULL;
		}
	}

	if (channel->sync) {
		pthread_spin_lock(&channel->lock);
	}

	req = TAILQ_FIRST(&channel->reqs);
	if (req) {
		channel->outstanding_reqs++;
		TAILQ_REMOVE(&channel->reqs, req, link);
	}

	if (channel->sync) {
		pthread_spin_unlock(&channel->lock);
	}

	if (req == NULL) {
		SPDK_ERRLOG("Cannot allocate req on spdk_fs_channel =%p\n", channel);
		free(iovs);
		return NULL;
	}
	memset(req, 0, sizeof(*req));
	req->channel = channel;
	if (iovcnt > 1) {
		req->args.iovs = iovs;
	} else {
		req->args.iovs = &req->args.iov;
	}
	req->args.iovcnt = iovcnt;

	return req;
}
395 
/* Allocate a request with no separate iovec array (uses the embedded iov). */
static struct spdk_fs_request *
alloc_fs_request(struct spdk_fs_channel *channel)
{
	return alloc_fs_request_with_iov(channel, 0);
}
401 
402 static void
403 free_fs_request(struct spdk_fs_request *req)
404 {
405 	struct spdk_fs_channel *channel = req->channel;
406 
407 	if (req->args.iovcnt > 1) {
408 		free(req->args.iovs);
409 	}
410 
411 	if (channel->sync) {
412 		pthread_spin_lock(&channel->lock);
413 	}
414 
415 	TAILQ_INSERT_HEAD(&req->channel->reqs, req, link);
416 	channel->outstanding_reqs--;
417 
418 	if (channel->sync) {
419 		pthread_spin_unlock(&channel->lock);
420 	}
421 }
422 
423 static int
424 fs_channel_create(struct spdk_filesystem *fs, struct spdk_fs_channel *channel,
425 		  uint32_t max_ops)
426 {
427 	uint32_t i;
428 
429 	channel->req_mem = calloc(max_ops, sizeof(struct spdk_fs_request));
430 	if (!channel->req_mem) {
431 		return -1;
432 	}
433 
434 	channel->outstanding_reqs = 0;
435 	TAILQ_INIT(&channel->reqs);
436 	sem_init(&channel->sem, 0, 0);
437 
438 	for (i = 0; i < max_ops; i++) {
439 		TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link);
440 	}
441 
442 	channel->fs = fs;
443 
444 	return 0;
445 }
446 
447 static int
448 fs_md_channel_create(void *io_device, void *ctx_buf)
449 {
450 	struct spdk_filesystem		*fs;
451 	struct spdk_fs_channel		*channel = ctx_buf;
452 
453 	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, md_target);
454 
455 	return fs_channel_create(fs, channel, fs->md_target.max_ops);
456 }
457 
458 static int
459 fs_sync_channel_create(void *io_device, void *ctx_buf)
460 {
461 	struct spdk_filesystem		*fs;
462 	struct spdk_fs_channel		*channel = ctx_buf;
463 
464 	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, sync_target);
465 
466 	return fs_channel_create(fs, channel, fs->sync_target.max_ops);
467 }
468 
469 static int
470 fs_io_channel_create(void *io_device, void *ctx_buf)
471 {
472 	struct spdk_filesystem		*fs;
473 	struct spdk_fs_channel		*channel = ctx_buf;
474 
475 	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, io_target);
476 
477 	return fs_channel_create(fs, channel, fs->io_target.max_ops);
478 }
479 
480 static void
481 fs_channel_destroy(void *io_device, void *ctx_buf)
482 {
483 	struct spdk_fs_channel *channel = ctx_buf;
484 
485 	if (channel->outstanding_reqs > 0) {
486 		SPDK_ERRLOG("channel freed with %" PRIu32 " outstanding requests!\n",
487 			    channel->outstanding_reqs);
488 	}
489 
490 	free(channel->req_mem);
491 	if (channel->bs_channel != NULL) {
492 		spdk_bs_free_io_channel(channel->bs_channel);
493 	}
494 }
495 
/* send_request implementation that invokes the function inline (same thread). */
static void
__send_request_direct(fs_request_fn fn, void *arg)
{
	fn(arg);
}
501 
/*
 * Shared post-init/post-load wiring: attach the blobstore to the fs,
 * allocate blobstore io channels for the md and sync targets (both use
 * direct dispatch since they run on the fs owner's thread), and take a
 * reference on the global cache.
 */
static void
common_fs_bs_init(struct spdk_filesystem *fs, struct spdk_blob_store *bs)
{
	fs->bs = bs;
	fs->bs_opts.cluster_sz = spdk_bs_get_cluster_size(bs);
	fs->md_target.md_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
	fs->md_target.md_fs_channel->send_request = __send_request_direct;
	fs->sync_target.sync_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
	fs->sync_target.sync_fs_channel->send_request = __send_request_direct;

	initialize_global_cache();
}
514 
/*
 * Completion callback for spdk_bs_init() (via spdk_fs_init()): on success
 * wire the new blobstore into the filesystem, then report to the caller.
 *
 * NOTE(review): the failure path frees 'fs' directly without releasing its
 * io channels or unregistering its io devices (unlike the alloc-failure
 * path in spdk_fs_init()) — looks like a leak; confirm intended teardown.
 */
static void
init_cb(void *ctx, struct spdk_blob_store *bs, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_filesystem *fs = args->fs;

	if (bserrno == 0) {
		common_fs_bs_init(fs, bs);
	} else {
		free(fs);
		fs = NULL;
	}

	args->fn.fs_op_with_handle(args->arg, fs, bserrno);
	free_fs_request(req);
}
532 
/*
 * Allocate a filesystem object and register its three io_devices (md,
 * sync, io).  Channels for the md and sync targets are created eagerly on
 * the calling thread; io-target channels are created on demand per thread.
 * Returns NULL on allocation failure.
 */
static struct spdk_filesystem *
fs_alloc(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn)
{
	struct spdk_filesystem *fs;

	fs = calloc(1, sizeof(*fs));
	if (fs == NULL) {
		return NULL;
	}

	fs->bdev = dev;
	fs->send_request = send_request_fn;
	TAILQ_INIT(&fs->files);

	fs->md_target.max_ops = 512;
	spdk_io_device_register(&fs->md_target, fs_md_channel_create, fs_channel_destroy,
				sizeof(struct spdk_fs_channel), "blobfs_md");
	fs->md_target.md_io_channel = spdk_get_io_channel(&fs->md_target);
	fs->md_target.md_fs_channel = spdk_io_channel_get_ctx(fs->md_target.md_io_channel);

	fs->sync_target.max_ops = 512;
	spdk_io_device_register(&fs->sync_target, fs_sync_channel_create, fs_channel_destroy,
				sizeof(struct spdk_fs_channel), "blobfs_sync");
	fs->sync_target.sync_io_channel = spdk_get_io_channel(&fs->sync_target);
	fs->sync_target.sync_fs_channel = spdk_io_channel_get_ctx(fs->sync_target.sync_io_channel);

	fs->io_target.max_ops = 512;
	spdk_io_device_register(&fs->io_target, fs_io_channel_create, fs_channel_destroy,
				sizeof(struct spdk_fs_channel), "blobfs_io");

	return fs;
}
565 
566 static void
567 __wake_caller(void *arg, int fserrno)
568 {
569 	struct spdk_fs_cb_args *args = arg;
570 
571 	if ((args->rwerrno != NULL) && (*(args->rwerrno) == 0) && fserrno) {
572 		*(args->rwerrno) = fserrno;
573 	}
574 	args->rc = fserrno;
575 	sem_post(args->sem);
576 }
577 
/*
 * Create a brand new blobfs on 'dev'.
 *
 * Allocates the filesystem, stamps the blobstore with the blobfs signature
 * bstype, applies the optional cluster size from 'opt', and initializes the
 * blobstore asynchronously; init_cb() delivers the result to cb_fn.
 * On setup failure, cb_fn is invoked with -ENOMEM and a NULL fs.
 */
void
spdk_fs_init(struct spdk_bs_dev *dev, struct spdk_blobfs_opts *opt,
	     fs_send_request_fn send_request_fn,
	     spdk_fs_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_filesystem *fs;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;
	struct spdk_bs_opts opts = {};

	fs = fs_alloc(dev, send_request_fn);
	if (fs == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	req = alloc_fs_request(fs->md_target.md_fs_channel);
	if (req == NULL) {
		fs_free_io_channels(fs);
		fs_io_device_unregister(fs);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	args = &req->args;
	args->fn.fs_op_with_handle = cb_fn;
	args->arg = cb_arg;
	args->fs = fs;

	spdk_bs_opts_init(&opts, sizeof(opts));
	snprintf(opts.bstype.bstype, sizeof(opts.bstype.bstype), SPDK_BLOBFS_SIGNATURE);
	if (opt) {
		opts.cluster_sz = opt->cluster_sz;
	}
	spdk_bs_init(dev, &opts, init_cb, req);
}
614 
615 static struct spdk_file *
616 file_alloc(struct spdk_filesystem *fs)
617 {
618 	struct spdk_file *file;
619 
620 	file = calloc(1, sizeof(*file));
621 	if (file == NULL) {
622 		return NULL;
623 	}
624 
625 	file->tree = calloc(1, sizeof(*file->tree));
626 	if (file->tree == NULL) {
627 		free(file);
628 		return NULL;
629 	}
630 
631 	if (pthread_spin_init(&file->lock, 0)) {
632 		free(file->tree);
633 		free(file);
634 		return NULL;
635 	}
636 
637 	file->fs = fs;
638 	TAILQ_INIT(&file->open_requests);
639 	TAILQ_INIT(&file->sync_requests);
640 	TAILQ_INSERT_TAIL(&fs->files, file, tailq);
641 	file->priority = SPDK_FILE_PRIORITY_LOW;
642 	return file;
643 }
644 
645 static void fs_load_done(void *ctx, int bserrno);
646 
647 static int
648 _handle_deleted_files(struct spdk_fs_request *req)
649 {
650 	struct spdk_fs_cb_args *args = &req->args;
651 	struct spdk_filesystem *fs = args->fs;
652 
653 	if (!TAILQ_EMPTY(&args->op.fs_load.deleted_files)) {
654 		struct spdk_deleted_file *deleted_file;
655 
656 		deleted_file = TAILQ_FIRST(&args->op.fs_load.deleted_files);
657 		TAILQ_REMOVE(&args->op.fs_load.deleted_files, deleted_file, tailq);
658 		spdk_bs_delete_blob(fs->bs, deleted_file->id, fs_load_done, req);
659 		free(deleted_file);
660 		return 0;
661 	}
662 
663 	return 1;
664 }
665 
/*
 * Invoked after the blobstore load completes and again after each pending
 * blob deletion finishes.  Only once the deleted-files list is drained is
 * the user's fs_op_with_handle callback fired.
 */
static void
fs_load_done(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_filesystem *fs = args->fs;

	/* The filesystem has been loaded.  Now check if there are any files that
	 *  were marked for deletion before last unload.  Do not complete the
	 *  fs_load callback until all of them have been deleted on disk.
	 */
	if (_handle_deleted_files(req) == 0) {
		/* We found a file that's been marked for deleting but not actually
		 *  deleted yet.  This function will get called again once the delete
		 *  operation is completed.
		 */
		return;
	}

	args->fn.fs_op_with_handle(args->arg, fs, 0);
	free_fs_request(req);

}
689 
/*
 * Blob iterator callback used during spdk_fs_load().  For every blob on
 * the blobstore, reads its blobfs xattrs and either (a) materializes an
 * in-memory spdk_file when the blob is a live file, or (b) queues the blob
 * for deletion when it carries the "is_deleted" xattr.  Any error aborts
 * the load and completes the user callback with that error.
 */
static void
iter_cb(void *ctx, struct spdk_blob *blob, int rc)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_filesystem *fs = args->fs;
	uint64_t *length;
	const char *name;
	uint32_t *is_deleted;
	size_t value_len;

	if (rc < 0) {
		args->fn.fs_op_with_handle(args->arg, fs, rc);
		free_fs_request(req);
		return;
	}

	/* Every blobfs blob must carry "name" and "length" xattrs. */
	rc = spdk_blob_get_xattr_value(blob, "name", (const void **)&name, &value_len);
	if (rc < 0) {
		args->fn.fs_op_with_handle(args->arg, fs, rc);
		free_fs_request(req);
		return;
	}

	rc = spdk_blob_get_xattr_value(blob, "length", (const void **)&length, &value_len);
	if (rc < 0) {
		args->fn.fs_op_with_handle(args->arg, fs, rc);
		free_fs_request(req);
		return;
	}

	/* "length" is stored as a raw uint64_t. */
	assert(value_len == 8);

	/* This file could be deleted last time without close it, then app crashed, so we delete it now */
	rc = spdk_blob_get_xattr_value(blob, "is_deleted", (const void **)&is_deleted, &value_len);
	if (rc < 0) {
		/* No "is_deleted" xattr: this is a live file - rebuild its state. */
		struct spdk_file *f;

		f = file_alloc(fs);
		if (f == NULL) {
			SPDK_ERRLOG("Cannot allocate file to handle deleted file on disk\n");
			args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM);
			free_fs_request(req);
			return;
		}

		f->name = strdup(name);
		if (!f->name) {
			SPDK_ERRLOG("Cannot allocate memory for file name\n");
			args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM);
			free_fs_request(req);
			file_free(f);
			return;
		}

		f->blobid = spdk_blob_get_id(blob);
		f->length = *length;
		f->length_flushed = *length;
		f->length_xattr = *length;
		f->append_pos = *length;
		SPDK_DEBUGLOG(blobfs, "added file %s length=%ju\n", f->name, f->length);
	} else {
		/* Marked deleted: queue the blob id; fs_load_done() deletes it
		 * before the load completes.  (Only the xattr's presence matters.)
		 */
		struct spdk_deleted_file *deleted_file;

		deleted_file = calloc(1, sizeof(*deleted_file));
		if (deleted_file == NULL) {
			args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM);
			free_fs_request(req);
			return;
		}
		deleted_file->id = spdk_blob_get_id(blob);
		TAILQ_INSERT_TAIL(&args->op.fs_load.deleted_files, deleted_file, tailq);
	}
}
764 
/*
 * Completion callback for spdk_bs_load().  Validates that the blobstore's
 * bstype is either the blobfs signature or all-zero (unstamped, e.g. an
 * older blobstore - stamp it now); anything else is not a blobfs and the
 * load fails with -EINVAL.  On success, finishes wiring via
 * common_fs_bs_init() and proceeds to deleted-file cleanup.
 */
static void
load_cb(void *ctx, struct spdk_blob_store *bs, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_filesystem *fs = args->fs;
	struct spdk_bs_type bstype;
	static const struct spdk_bs_type blobfs_type = {SPDK_BLOBFS_SIGNATURE};
	static const struct spdk_bs_type zeros;

	if (bserrno != 0) {
		args->fn.fs_op_with_handle(args->arg, NULL, bserrno);
		free_fs_request(req);
		fs_free_io_channels(fs);
		fs_io_device_unregister(fs);
		return;
	}

	bstype = spdk_bs_get_bstype(bs);

	if (!memcmp(&bstype, &zeros, sizeof(bstype))) {
		SPDK_DEBUGLOG(blobfs, "assigning bstype\n");
		spdk_bs_set_bstype(bs, blobfs_type);
	} else if (memcmp(&bstype, &blobfs_type, sizeof(bstype))) {
		SPDK_ERRLOG("not blobfs\n");
		SPDK_LOGDUMP(blobfs, "bstype", &bstype, sizeof(bstype));
		args->fn.fs_op_with_handle(args->arg, NULL, -EINVAL);
		free_fs_request(req);
		fs_free_io_channels(fs);
		fs_io_device_unregister(fs);
		return;
	}

	common_fs_bs_init(fs, bs);
	fs_load_done(req, 0);
}
801 
/*
 * Unregister all three io_devices and free the filesystem object itself.
 * Channels must already have been released (see fs_free_io_channels()).
 */
static void
fs_io_device_unregister(struct spdk_filesystem *fs)
{
	assert(fs != NULL);
	spdk_io_device_unregister(&fs->md_target, NULL);
	spdk_io_device_unregister(&fs->sync_target, NULL);
	spdk_io_device_unregister(&fs->io_target, NULL);
	free(fs);
}
811 
/* Release the md and sync io channels taken eagerly in fs_alloc(). */
static void
fs_free_io_channels(struct spdk_filesystem *fs)
{
	assert(fs != NULL);
	spdk_fs_free_io_channel(fs->md_target.md_io_channel);
	spdk_fs_free_io_channel(fs->sync_target.sync_io_channel);
}
819 
/*
 * Load an existing blobfs from 'dev'.
 *
 * Registers iter_cb() so every blob is examined during the blobstore load
 * (rebuilding file state and collecting blobs pending deletion); load_cb()
 * completes the operation.  On setup failure, cb_fn receives -ENOMEM and a
 * NULL fs.
 */
void
spdk_fs_load(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn,
	     spdk_fs_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_filesystem *fs;
	struct spdk_fs_cb_args *args;
	struct spdk_fs_request *req;
	struct spdk_bs_opts	bs_opts;

	fs = fs_alloc(dev, send_request_fn);
	if (fs == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	req = alloc_fs_request(fs->md_target.md_fs_channel);
	if (req == NULL) {
		fs_free_io_channels(fs);
		fs_io_device_unregister(fs);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	args = &req->args;
	args->fn.fs_op_with_handle = cb_fn;
	args->arg = cb_arg;
	args->fs = fs;
	TAILQ_INIT(&args->op.fs_load.deleted_files);
	spdk_bs_opts_init(&bs_opts, sizeof(bs_opts));
	bs_opts.iter_cb_fn = iter_cb;
	bs_opts.iter_cb_arg = req;
	spdk_bs_load(dev, &bs_opts, load_cb, req);
}
853 
/*
 * Completion callback for spdk_bs_unload(): free all file objects, drop
 * the global cache reference, complete the user callback, and finally tear
 * down the io_devices.  The request was heap-allocated (not pooled) by
 * spdk_fs_unload(), hence plain free() rather than free_fs_request().
 */
static void
unload_cb(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_filesystem *fs = args->fs;
	struct spdk_file *file, *tmp;

	TAILQ_FOREACH_SAFE(file, &fs->files, tailq, tmp) {
		TAILQ_REMOVE(&fs->files, file, tailq);
		file_free(file);
	}

	free_global_cache();

	args->fn.fs_op(args->arg, bserrno);
	free(req);

	fs_io_device_unregister(fs);
}
874 
/*
 * Unload the filesystem: release the md/sync channels first (the
 * blobstore cannot unload while channels are held), then unload the
 * blobstore; unload_cb() finishes the teardown.
 */
void
spdk_fs_unload(struct spdk_filesystem *fs, spdk_fs_op_complete cb_fn, void *cb_arg)
{
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	/*
	 * We must free the md_channel before unloading the blobstore, so just
	 *  allocate this request from the general heap.
	 */
	req = calloc(1, sizeof(*req));
	if (req == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	args = &req->args;
	args->fn.fs_op = cb_fn;
	args->arg = cb_arg;
	args->fs = fs;

	fs_free_io_channels(fs);
	spdk_bs_unload(fs->bs, unload_cb, req);
}
899 
900 static struct spdk_file *
901 fs_find_file(struct spdk_filesystem *fs, const char *name)
902 {
903 	struct spdk_file *file;
904 
905 	TAILQ_FOREACH(file, &fs->files, tailq) {
906 		if (!strncmp(name, file->name, SPDK_FILE_NAME_MAX)) {
907 			return file;
908 		}
909 	}
910 
911 	return NULL;
912 }
913 
914 void
915 spdk_fs_file_stat_async(struct spdk_filesystem *fs, const char *name,
916 			spdk_file_stat_op_complete cb_fn, void *cb_arg)
917 {
918 	struct spdk_file_stat stat;
919 	struct spdk_file *f = NULL;
920 
921 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
922 		cb_fn(cb_arg, NULL, -ENAMETOOLONG);
923 		return;
924 	}
925 
926 	f = fs_find_file(fs, name);
927 	if (f != NULL) {
928 		stat.blobid = f->blobid;
929 		stat.size = f->append_pos >= f->length ? f->append_pos : f->length;
930 		cb_fn(cb_arg, &stat, 0);
931 		return;
932 	}
933 
934 	cb_fn(cb_arg, NULL, -ENOENT);
935 }
936 
937 static void
938 __copy_stat(void *arg, struct spdk_file_stat *stat, int fserrno)
939 {
940 	struct spdk_fs_request *req = arg;
941 	struct spdk_fs_cb_args *args = &req->args;
942 
943 	args->rc = fserrno;
944 	if (fserrno == 0) {
945 		memcpy(args->arg, stat, sizeof(*stat));
946 	}
947 	sem_post(args->sem);
948 }
949 
/* Trampoline run on the fs thread: issue the async stat for the sync API. */
static void
__file_stat(void *arg)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	spdk_fs_file_stat_async(args->fs, args->op.stat.name,
				args->fn.stat_op, req);
}
959 
/*
 * Synchronous stat: dispatches __file_stat to the fs thread via
 * send_request and blocks on the channel semaphore until __copy_stat
 * posts it.  Returns 0 on success or a negative errno.
 */
int
spdk_fs_file_stat(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx,
		  const char *name, struct spdk_file_stat *stat)
{
	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
	struct spdk_fs_request *req;
	int rc;

	req = alloc_fs_request(channel);
	if (req == NULL) {
		SPDK_ERRLOG("Cannot allocate stat req on file=%s\n", name);
		return -ENOMEM;
	}

	req->args.fs = fs;
	req->args.op.stat.name = name;
	req->args.fn.stat_op = __copy_stat;
	req->args.arg = stat;	/* __copy_stat copies the result here */
	req->args.sem = &channel->sem;
	channel->send_request(__file_stat, req);
	sem_wait(&channel->sem);

	rc = req->args.rc;
	free_fs_request(req);

	return rc;
}
987 
988 static void
989 fs_create_blob_close_cb(void *ctx, int bserrno)
990 {
991 	int rc;
992 	struct spdk_fs_request *req = ctx;
993 	struct spdk_fs_cb_args *args = &req->args;
994 
995 	rc = args->rc ? args->rc : bserrno;
996 	args->fn.file_op(args->arg, rc);
997 	free_fs_request(req);
998 }
999 
1000 static void
1001 fs_create_blob_resize_cb(void *ctx, int bserrno)
1002 {
1003 	struct spdk_fs_request *req = ctx;
1004 	struct spdk_fs_cb_args *args = &req->args;
1005 	struct spdk_file *f = args->file;
1006 	struct spdk_blob *blob = args->op.create.blob;
1007 	uint64_t length = 0;
1008 
1009 	args->rc = bserrno;
1010 	if (bserrno) {
1011 		spdk_blob_close(blob, fs_create_blob_close_cb, args);
1012 		return;
1013 	}
1014 
1015 	spdk_blob_set_xattr(blob, "name", f->name, strlen(f->name) + 1);
1016 	spdk_blob_set_xattr(blob, "length", &length, sizeof(length));
1017 
1018 	spdk_blob_close(blob, fs_create_blob_close_cb, args);
1019 }
1020 
1021 static void
1022 fs_create_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno)
1023 {
1024 	struct spdk_fs_request *req = ctx;
1025 	struct spdk_fs_cb_args *args = &req->args;
1026 
1027 	if (bserrno) {
1028 		args->fn.file_op(args->arg, bserrno);
1029 		free_fs_request(req);
1030 		return;
1031 	}
1032 
1033 	args->op.create.blob = blob;
1034 	spdk_blob_resize(blob, 1, fs_create_blob_resize_cb, req);
1035 }
1036 
1037 static void
1038 fs_create_blob_create_cb(void *ctx, spdk_blob_id blobid, int bserrno)
1039 {
1040 	struct spdk_fs_request *req = ctx;
1041 	struct spdk_fs_cb_args *args = &req->args;
1042 	struct spdk_file *f = args->file;
1043 
1044 	if (bserrno) {
1045 		args->fn.file_op(args->arg, bserrno);
1046 		free_fs_request(req);
1047 		return;
1048 	}
1049 
1050 	f->blobid = blobid;
1051 	spdk_bs_open_blob(f->fs->bs, blobid, fs_create_blob_open_cb, req);
1052 }
1053 
1054 void
1055 spdk_fs_create_file_async(struct spdk_filesystem *fs, const char *name,
1056 			  spdk_file_op_complete cb_fn, void *cb_arg)
1057 {
1058 	struct spdk_file *file;
1059 	struct spdk_fs_request *req;
1060 	struct spdk_fs_cb_args *args;
1061 
1062 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
1063 		cb_fn(cb_arg, -ENAMETOOLONG);
1064 		return;
1065 	}
1066 
1067 	file = fs_find_file(fs, name);
1068 	if (file != NULL) {
1069 		cb_fn(cb_arg, -EEXIST);
1070 		return;
1071 	}
1072 
1073 	file = file_alloc(fs);
1074 	if (file == NULL) {
1075 		SPDK_ERRLOG("Cannot allocate new file for creation\n");
1076 		cb_fn(cb_arg, -ENOMEM);
1077 		return;
1078 	}
1079 
1080 	req = alloc_fs_request(fs->md_target.md_fs_channel);
1081 	if (req == NULL) {
1082 		SPDK_ERRLOG("Cannot allocate create async req for file=%s\n", name);
1083 		TAILQ_REMOVE(&fs->files, file, tailq);
1084 		file_free(file);
1085 		cb_fn(cb_arg, -ENOMEM);
1086 		return;
1087 	}
1088 
1089 	args = &req->args;
1090 	args->file = file;
1091 	args->fn.file_op = cb_fn;
1092 	args->arg = cb_arg;
1093 
1094 	file->name = strdup(name);
1095 	if (!file->name) {
1096 		SPDK_ERRLOG("Cannot allocate file->name for file=%s\n", name);
1097 		free_fs_request(req);
1098 		TAILQ_REMOVE(&fs->files, file, tailq);
1099 		file_free(file);
1100 		cb_fn(cb_arg, -ENOMEM);
1101 		return;
1102 	}
1103 	spdk_bs_create_blob(fs->bs, fs_create_blob_create_cb, args);
1104 }
1105 
/* Async-create completion for the sync wrapper: wake the blocked caller. */
static void
__fs_create_file_done(void *arg, int fserrno)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	SPDK_DEBUGLOG(blobfs, "file=%s\n", args->op.create.name);
	__wake_caller(args, fserrno);
}
1115 
/* Trampoline run on the fs thread: issue the async create for the sync API. */
static void
__fs_create_file(void *arg)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	SPDK_DEBUGLOG(blobfs, "file=%s\n", args->op.create.name);
	spdk_fs_create_file_async(args->fs, args->op.create.name, __fs_create_file_done, req);
}
1125 
/*
 * Synchronous file creation: dispatches __fs_create_file to the fs thread
 * and blocks on the channel semaphore until __fs_create_file_done wakes
 * the caller.  Returns 0 on success or a negative errno.
 */
int
spdk_fs_create_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx, const char *name)
{
	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;
	int rc;

	SPDK_DEBUGLOG(blobfs, "file=%s\n", name);

	req = alloc_fs_request(channel);
	if (req == NULL) {
		SPDK_ERRLOG("Cannot allocate req to create file=%s\n", name);
		return -ENOMEM;
	}

	args = &req->args;
	args->fs = fs;
	args->op.create.name = name;
	args->sem = &channel->sem;
	fs->send_request(__fs_create_file, req);
	sem_wait(&channel->sem);
	rc = args->rc;
	free_fs_request(req);

	return rc;
}
1153 
1154 static void
1155 fs_open_blob_done(void *ctx, struct spdk_blob *blob, int bserrno)
1156 {
1157 	struct spdk_fs_request *req = ctx;
1158 	struct spdk_fs_cb_args *args = &req->args;
1159 	struct spdk_file *f = args->file;
1160 
1161 	f->blob = blob;
1162 	while (!TAILQ_EMPTY(&f->open_requests)) {
1163 		req = TAILQ_FIRST(&f->open_requests);
1164 		args = &req->args;
1165 		TAILQ_REMOVE(&f->open_requests, req, args.op.open.tailq);
1166 		spdk_trace_record(TRACE_BLOBFS_OPEN, 0, 0, 0, f->name);
1167 		args->fn.file_op_with_handle(args->arg, f, bserrno);
1168 		free_fs_request(req);
1169 	}
1170 }
1171 
/*
 * Step between "blob exists" and "blob open": take a reference on the file
 * and make sure its blob gets opened.  Reached either directly (file already
 * existed) or as the completion of spdk_fs_create_file_async() when the open
 * used the CREATE flag.
 *
 * NOTE(review): bserrno is not checked here; the assert below relies on the
 * preceding create having succeeded — confirm upstream guarantees this.
 */
static void
fs_open_blob_create_cb(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *file = args->file;
	struct spdk_filesystem *fs = args->fs;

	if (file == NULL) {
		/*
		 * This is from an open with CREATE flag - the file
		 *  is now created so look it up in the file list for this
		 *  filesystem.
		 */
		file = fs_find_file(fs, args->op.open.name);
		assert(file != NULL);
		args->file = file;
	}

	/* Queue this request; fs_open_blob_done() completes every queued opener. */
	file->ref_count++;
	TAILQ_INSERT_TAIL(&file->open_requests, req, args.op.open.tailq);
	if (file->ref_count == 1) {
		/* First opener: kick off the actual blob open. */
		assert(file->blob == NULL);
		spdk_bs_open_blob(fs->bs, file->blobid, fs_open_blob_done, req);
	} else if (file->blob != NULL) {
		/* Blob already open: complete immediately. */
		fs_open_blob_done(req, file->blob, 0);
	} else {
		/*
		 * The blob open for this file is in progress due to a previous
		 *  open request.  When that open completes, it will invoke the
		 *  open callback for this request.
		 */
	}
}
1206 
/*
 * Asynchronously open (and optionally create) a file by name.  The callback
 * receives the spdk_file handle on success, or NULL and a negative errno on
 * failure (-ENAMETOOLONG, -ENOENT, -ENOMEM).  Must run on the md thread.
 */
void
spdk_fs_open_file_async(struct spdk_filesystem *fs, const char *name, uint32_t flags,
			spdk_file_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_file *f = NULL;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	/* Reject names longer than SPDK_FILE_NAME_MAX. */
	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
		cb_fn(cb_arg, NULL, -ENAMETOOLONG);
		return;
	}

	f = fs_find_file(fs, name);
	if (f == NULL && !(flags & SPDK_BLOBFS_OPEN_CREATE)) {
		cb_fn(cb_arg, NULL, -ENOENT);
		return;
	}

	/* A file pending deletion (still referenced) may not be reopened. */
	if (f != NULL && f->is_deleted == true) {
		cb_fn(cb_arg, NULL, -ENOENT);
		return;
	}

	req = alloc_fs_request(fs->md_target.md_fs_channel);
	if (req == NULL) {
		SPDK_ERRLOG("Cannot allocate async open req for file=%s\n", name);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	args = &req->args;
	args->fn.file_op_with_handle = cb_fn;
	args->arg = cb_arg;
	args->file = f;
	args->fs = fs;
	args->op.open.name = name;

	if (f == NULL) {
		/* CREATE flag set and file missing: create first, then open. */
		spdk_fs_create_file_async(fs, name, fs_open_blob_create_cb, req);
	} else {
		fs_open_blob_create_cb(req, 0);
	}
}
1251 
1252 static void
1253 __fs_open_file_done(void *arg, struct spdk_file *file, int bserrno)
1254 {
1255 	struct spdk_fs_request *req = arg;
1256 	struct spdk_fs_cb_args *args = &req->args;
1257 
1258 	args->file = file;
1259 	SPDK_DEBUGLOG(blobfs, "file=%s\n", args->op.open.name);
1260 	__wake_caller(args, bserrno);
1261 }
1262 
1263 static void
1264 __fs_open_file(void *arg)
1265 {
1266 	struct spdk_fs_request *req = arg;
1267 	struct spdk_fs_cb_args *args = &req->args;
1268 
1269 	SPDK_DEBUGLOG(blobfs, "file=%s\n", args->op.open.name);
1270 	spdk_fs_open_file_async(args->fs, args->op.open.name, args->op.open.flags,
1271 				__fs_open_file_done, req);
1272 }
1273 
1274 int
1275 spdk_fs_open_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx,
1276 		  const char *name, uint32_t flags, struct spdk_file **file)
1277 {
1278 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
1279 	struct spdk_fs_request *req;
1280 	struct spdk_fs_cb_args *args;
1281 	int rc;
1282 
1283 	SPDK_DEBUGLOG(blobfs, "file=%s\n", name);
1284 
1285 	req = alloc_fs_request(channel);
1286 	if (req == NULL) {
1287 		SPDK_ERRLOG("Cannot allocate req for opening file=%s\n", name);
1288 		return -ENOMEM;
1289 	}
1290 
1291 	args = &req->args;
1292 	args->fs = fs;
1293 	args->op.open.name = name;
1294 	args->op.open.flags = flags;
1295 	args->sem = &channel->sem;
1296 	fs->send_request(__fs_open_file, req);
1297 	sem_wait(&channel->sem);
1298 	rc = args->rc;
1299 	if (rc == 0) {
1300 		*file = args->file;
1301 	} else {
1302 		*file = NULL;
1303 	}
1304 	free_fs_request(req);
1305 
1306 	return rc;
1307 }
1308 
1309 static void
1310 fs_rename_blob_close_cb(void *ctx, int bserrno)
1311 {
1312 	struct spdk_fs_request *req = ctx;
1313 	struct spdk_fs_cb_args *args = &req->args;
1314 
1315 	args->fn.fs_op(args->arg, bserrno);
1316 	free_fs_request(req);
1317 }
1318 
1319 static void
1320 fs_rename_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno)
1321 {
1322 	struct spdk_fs_request *req = ctx;
1323 	struct spdk_fs_cb_args *args = &req->args;
1324 	const char *new_name = args->op.rename.new_name;
1325 
1326 	spdk_blob_set_xattr(blob, "name", new_name, strlen(new_name) + 1);
1327 	spdk_blob_close(blob, fs_rename_blob_close_cb, req);
1328 }
1329 
1330 static void
1331 _fs_md_rename_file(struct spdk_fs_request *req)
1332 {
1333 	struct spdk_fs_cb_args *args = &req->args;
1334 	struct spdk_file *f;
1335 
1336 	f = fs_find_file(args->fs, args->op.rename.old_name);
1337 	if (f == NULL) {
1338 		args->fn.fs_op(args->arg, -ENOENT);
1339 		free_fs_request(req);
1340 		return;
1341 	}
1342 
1343 	free(f->name);
1344 	f->name = strdup(args->op.rename.new_name);
1345 	if (!f->name) {
1346 		SPDK_ERRLOG("Cannot allocate memory for file name\n");
1347 		args->fn.fs_op(args->arg, -ENOMEM);
1348 		free_fs_request(req);
1349 		return;
1350 	}
1351 
1352 	args->file = f;
1353 	spdk_bs_open_blob(args->fs->bs, f->blobid, fs_rename_blob_open_cb, req);
1354 }
1355 
/*
 * After deleting the destination file of an overwriting rename, perform the
 * actual metadata rename.
 * NOTE(review): fserrno is ignored — a failed delete still proceeds with the
 * rename; confirm this is intentional.
 */
static void
fs_rename_delete_done(void *arg, int fserrno)
{
	_fs_md_rename_file(arg);
}
1361 
/*
 * Asynchronously rename old_name to new_name.  If new_name already exists it
 * is deleted first (overwriting rename).  The callback receives 0 on success
 * or a negative errno (-ENAMETOOLONG, -ENOMEM, -ENOENT if old_name is
 * missing).  Must run on the md thread.
 */
void
spdk_fs_rename_file_async(struct spdk_filesystem *fs,
			  const char *old_name, const char *new_name,
			  spdk_file_op_complete cb_fn, void *cb_arg)
{
	struct spdk_file *f;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	SPDK_DEBUGLOG(blobfs, "old=%s new=%s\n", old_name, new_name);
	/* Only the new name needs a length check; old_name must match an
	 * existing (already validated) file to succeed. */
	if (strnlen(new_name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
		cb_fn(cb_arg, -ENAMETOOLONG);
		return;
	}

	req = alloc_fs_request(fs->md_target.md_fs_channel);
	if (req == NULL) {
		SPDK_ERRLOG("Cannot allocate rename async req for renaming file from %s to %s\n", old_name,
			    new_name);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	args = &req->args;
	args->fn.fs_op = cb_fn;
	args->fs = fs;
	args->arg = cb_arg;
	args->op.rename.old_name = old_name;
	args->op.rename.new_name = new_name;

	f = fs_find_file(fs, new_name);
	if (f == NULL) {
		_fs_md_rename_file(req);
		return;
	}

	/*
	 * The rename overwrites an existing file.  So delete the existing file, then
	 *  do the actual rename.
	 */
	spdk_fs_delete_file_async(fs, new_name, fs_rename_delete_done, req);
}
1404 
1405 static void
1406 __fs_rename_file_done(void *arg, int fserrno)
1407 {
1408 	struct spdk_fs_request *req = arg;
1409 	struct spdk_fs_cb_args *args = &req->args;
1410 
1411 	__wake_caller(args, fserrno);
1412 }
1413 
1414 static void
1415 __fs_rename_file(void *arg)
1416 {
1417 	struct spdk_fs_request *req = arg;
1418 	struct spdk_fs_cb_args *args = &req->args;
1419 
1420 	spdk_fs_rename_file_async(args->fs, args->op.rename.old_name, args->op.rename.new_name,
1421 				  __fs_rename_file_done, req);
1422 }
1423 
1424 int
1425 spdk_fs_rename_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx,
1426 		    const char *old_name, const char *new_name)
1427 {
1428 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
1429 	struct spdk_fs_request *req;
1430 	struct spdk_fs_cb_args *args;
1431 	int rc;
1432 
1433 	req = alloc_fs_request(channel);
1434 	if (req == NULL) {
1435 		SPDK_ERRLOG("Cannot allocate rename req for file=%s\n", old_name);
1436 		return -ENOMEM;
1437 	}
1438 
1439 	args = &req->args;
1440 
1441 	args->fs = fs;
1442 	args->op.rename.old_name = old_name;
1443 	args->op.rename.new_name = new_name;
1444 	args->sem = &channel->sem;
1445 	fs->send_request(__fs_rename_file, req);
1446 	sem_wait(&channel->sem);
1447 	rc = args->rc;
1448 	free_fs_request(req);
1449 	return rc;
1450 }
1451 
1452 static void
1453 blob_delete_cb(void *ctx, int bserrno)
1454 {
1455 	struct spdk_fs_request *req = ctx;
1456 	struct spdk_fs_cb_args *args = &req->args;
1457 
1458 	args->fn.file_op(args->arg, bserrno);
1459 	free_fs_request(req);
1460 }
1461 
1462 void
1463 spdk_fs_delete_file_async(struct spdk_filesystem *fs, const char *name,
1464 			  spdk_file_op_complete cb_fn, void *cb_arg)
1465 {
1466 	struct spdk_file *f;
1467 	spdk_blob_id blobid;
1468 	struct spdk_fs_request *req;
1469 	struct spdk_fs_cb_args *args;
1470 
1471 	SPDK_DEBUGLOG(blobfs, "file=%s\n", name);
1472 
1473 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
1474 		cb_fn(cb_arg, -ENAMETOOLONG);
1475 		return;
1476 	}
1477 
1478 	f = fs_find_file(fs, name);
1479 	if (f == NULL) {
1480 		SPDK_ERRLOG("Cannot find the file=%s to deleted\n", name);
1481 		cb_fn(cb_arg, -ENOENT);
1482 		return;
1483 	}
1484 
1485 	req = alloc_fs_request(fs->md_target.md_fs_channel);
1486 	if (req == NULL) {
1487 		SPDK_ERRLOG("Cannot allocate the req for the file=%s to deleted\n", name);
1488 		cb_fn(cb_arg, -ENOMEM);
1489 		return;
1490 	}
1491 
1492 	args = &req->args;
1493 	args->fn.file_op = cb_fn;
1494 	args->arg = cb_arg;
1495 
1496 	if (f->ref_count > 0) {
1497 		/* If the ref > 0, we mark the file as deleted and delete it when we close it. */
1498 		f->is_deleted = true;
1499 		spdk_blob_set_xattr(f->blob, "is_deleted", &f->is_deleted, sizeof(bool));
1500 		spdk_blob_sync_md(f->blob, blob_delete_cb, req);
1501 		return;
1502 	}
1503 
1504 	blobid = f->blobid;
1505 	TAILQ_REMOVE(&fs->files, f, tailq);
1506 
1507 	file_free(f);
1508 
1509 	spdk_bs_delete_blob(fs->bs, blobid, blob_delete_cb, req);
1510 }
1511 
1512 static void
1513 __fs_delete_file_done(void *arg, int fserrno)
1514 {
1515 	struct spdk_fs_request *req = arg;
1516 	struct spdk_fs_cb_args *args = &req->args;
1517 
1518 	spdk_trace_record(TRACE_BLOBFS_DELETE_DONE, 0, 0, 0, args->op.delete.name);
1519 	__wake_caller(args, fserrno);
1520 }
1521 
1522 static void
1523 __fs_delete_file(void *arg)
1524 {
1525 	struct spdk_fs_request *req = arg;
1526 	struct spdk_fs_cb_args *args = &req->args;
1527 
1528 	spdk_trace_record(TRACE_BLOBFS_DELETE_START, 0, 0, 0, args->op.delete.name);
1529 	spdk_fs_delete_file_async(args->fs, args->op.delete.name, __fs_delete_file_done, req);
1530 }
1531 
1532 int
1533 spdk_fs_delete_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx,
1534 		    const char *name)
1535 {
1536 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
1537 	struct spdk_fs_request *req;
1538 	struct spdk_fs_cb_args *args;
1539 	int rc;
1540 
1541 	req = alloc_fs_request(channel);
1542 	if (req == NULL) {
1543 		SPDK_DEBUGLOG(blobfs, "Cannot allocate req to delete file=%s\n", name);
1544 		return -ENOMEM;
1545 	}
1546 
1547 	args = &req->args;
1548 	args->fs = fs;
1549 	args->op.delete.name = name;
1550 	args->sem = &channel->sem;
1551 	fs->send_request(__fs_delete_file, req);
1552 	sem_wait(&channel->sem);
1553 	rc = args->rc;
1554 	free_fs_request(req);
1555 
1556 	return rc;
1557 }
1558 
1559 spdk_fs_iter
1560 spdk_fs_iter_first(struct spdk_filesystem *fs)
1561 {
1562 	struct spdk_file *f;
1563 
1564 	f = TAILQ_FIRST(&fs->files);
1565 	return f;
1566 }
1567 
1568 spdk_fs_iter
1569 spdk_fs_iter_next(spdk_fs_iter iter)
1570 {
1571 	struct spdk_file *f = iter;
1572 
1573 	if (f == NULL) {
1574 		return NULL;
1575 	}
1576 
1577 	f = TAILQ_NEXT(f, tailq);
1578 	return f;
1579 }
1580 
/* Return the file's name (owned by the file; valid while the file exists). */
const char *
spdk_file_get_name(struct spdk_file *file)
{
	return file->name;
}
1586 
1587 uint64_t
1588 spdk_file_get_length(struct spdk_file *file)
1589 {
1590 	uint64_t length;
1591 
1592 	assert(file != NULL);
1593 
1594 	length = file->append_pos >= file->length ? file->append_pos : file->length;
1595 	SPDK_DEBUGLOG(blobfs, "file=%s length=0x%jx\n", file->name, length);
1596 	return length;
1597 }
1598 
1599 static void
1600 fs_truncate_complete_cb(void *ctx, int bserrno)
1601 {
1602 	struct spdk_fs_request *req = ctx;
1603 	struct spdk_fs_cb_args *args = &req->args;
1604 
1605 	args->fn.file_op(args->arg, bserrno);
1606 	free_fs_request(req);
1607 }
1608 
/*
 * Blob resize for a truncate finished: record the new length in the "length"
 * xattr and in-memory state, clamp append_pos to the new end of file, then
 * sync the blob metadata.
 */
static void
fs_truncate_resize_cb(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *file = args->file;
	uint64_t *length = &args->op.truncate.length;

	if (bserrno) {
		/* Resize failed: complete with the blobstore error. */
		args->fn.file_op(args->arg, bserrno);
		free_fs_request(req);
		return;
	}

	spdk_blob_set_xattr(file->blob, "length", length, sizeof(*length));

	file->length = *length;
	if (file->append_pos > file->length) {
		/* Shrinking below the append cursor: pull it back to EOF. */
		file->append_pos = file->length;
	}

	spdk_blob_sync_md(file->blob, fs_truncate_complete_cb, req);
}
1632 
/*
 * Number of clusters needed to hold `length` bytes (ceiling division).
 * Computed as quotient plus remainder-carry so the result is correct even
 * for lengths near UINT64_MAX, where `length + cluster_sz - 1` would wrap.
 */
static uint64_t
__bytes_to_clusters(uint64_t length, uint64_t cluster_sz)
{
	return length / cluster_sz + (length % cluster_sz != 0);
}
1638 
/*
 * Asynchronously truncate (grow or shrink) a file to `length` bytes by
 * resizing its blob to the matching cluster count.  Completes immediately
 * with 0 if the length is unchanged.  Must run on the md thread.
 */
void
spdk_file_truncate_async(struct spdk_file *file, uint64_t length,
			 spdk_file_op_complete cb_fn, void *cb_arg)
{
	struct spdk_filesystem *fs;
	size_t num_clusters;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	SPDK_DEBUGLOG(blobfs, "file=%s old=0x%jx new=0x%jx\n", file->name, file->length, length);
	if (length == file->length) {
		/* No size change: nothing to do. */
		cb_fn(cb_arg, 0);
		return;
	}

	req = alloc_fs_request(file->fs->md_target.md_fs_channel);
	if (req == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	args = &req->args;
	args->fn.file_op = cb_fn;
	args->arg = cb_arg;
	args->file = file;
	args->op.truncate.length = length;
	fs = file->fs;

	num_clusters = __bytes_to_clusters(length, fs->bs_opts.cluster_sz);

	/* Resize first; fs_truncate_resize_cb updates length metadata after. */
	spdk_blob_resize(file->blob, num_clusters, fs_truncate_resize_cb, req);
}
1671 
/*
 * Runs on the md thread for a synchronous truncate.  Note the completion
 * wiring: fn.file_op was set to __wake_caller by spdk_file_truncate(), and
 * the callback argument is `args` itself (not `req`), which is what
 * __wake_caller expects.
 */
static void
__truncate(void *arg)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	spdk_file_truncate_async(args->file, args->op.truncate.length,
				 args->fn.file_op, args);
}
1681 
/*
 * Synchronously truncate a file to `length` bytes.  Ships the request to
 * the md thread via the channel's send_request and blocks on the channel
 * semaphore; __wake_caller (installed as fn.file_op) posts it.  Returns 0
 * on success or a negative errno.
 */
int
spdk_file_truncate(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx,
		   uint64_t length)
{
	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;
	int rc;

	req = alloc_fs_request(channel);
	if (req == NULL) {
		return -ENOMEM;
	}

	args = &req->args;

	args->file = file;
	args->op.truncate.length = length;
	args->fn.file_op = __wake_caller;
	args->sem = &channel->sem;

	channel->send_request(__truncate, req);
	sem_wait(&channel->sem);
	rc = args->rc;
	free_fs_request(req);

	return rc;
}
1710 
1711 static void
1712 __rw_done(void *ctx, int bserrno)
1713 {
1714 	struct spdk_fs_request *req = ctx;
1715 	struct spdk_fs_cb_args *args = &req->args;
1716 
1717 	spdk_free(args->op.rw.pin_buf);
1718 	args->fn.file_op(args->arg, bserrno);
1719 	free_fs_request(req);
1720 }
1721 
/*
 * Blob read into the aligned bounce buffer finished.  For a read request,
 * copy the requested byte range out to the caller's iovecs and complete.
 * For an unaligned write, this read was the "read" half of read-modify-write:
 * merge the caller's data into the bounce buffer and write it back.
 */
static void
__read_done(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	void *buf;

	if (bserrno) {
		__rw_done(req, bserrno);
		return;
	}

	assert(req != NULL);
	/* Offset of the requested data within the block-aligned bounce buffer. */
	buf = (void *)((uintptr_t)args->op.rw.pin_buf + (args->op.rw.offset & (args->op.rw.blocklen - 1)));
	if (args->op.rw.is_read) {
		spdk_copy_buf_to_iovs(args->iovs, args->iovcnt, buf, args->op.rw.length);
		__rw_done(req, 0);
	} else {
		/* Write path: merge user data, then write the whole buffer back. */
		spdk_copy_iovs_to_buf(buf, args->op.rw.length, args->iovs, args->iovcnt);
		spdk_blob_io_write(args->file->blob, args->op.rw.channel,
				   args->op.rw.pin_buf,
				   args->op.rw.start_lba, args->op.rw.num_lba,
				   __rw_done, req);
	}
}
1747 
1748 static void
1749 __do_blob_read(void *ctx, int fserrno)
1750 {
1751 	struct spdk_fs_request *req = ctx;
1752 	struct spdk_fs_cb_args *args = &req->args;
1753 
1754 	if (fserrno) {
1755 		__rw_done(req, fserrno);
1756 		return;
1757 	}
1758 	spdk_blob_io_read(args->file->blob, args->op.rw.channel,
1759 			  args->op.rw.pin_buf,
1760 			  args->op.rw.start_lba, args->op.rw.num_lba,
1761 			  __read_done, req);
1762 }
1763 
1764 static void
1765 __get_page_parameters(struct spdk_file *file, uint64_t offset, uint64_t length,
1766 		      uint64_t *start_lba, uint32_t *lba_size, uint64_t *num_lba)
1767 {
1768 	uint64_t end_lba;
1769 
1770 	*lba_size = spdk_bs_get_io_unit_size(file->fs->bs);
1771 	*start_lba = offset / *lba_size;
1772 	end_lba = (offset + length - 1) / *lba_size;
1773 	*num_lba = (end_lba - *start_lba + 1);
1774 }
1775 
1776 static bool
1777 __is_lba_aligned(struct spdk_file *file, uint64_t offset, uint64_t length)
1778 {
1779 	uint32_t lba_size = spdk_bs_get_io_unit_size(file->fs->bs);
1780 
1781 	if ((offset % lba_size == 0) && (length % lba_size == 0)) {
1782 		return true;
1783 	}
1784 
1785 	return false;
1786 }
1787 
1788 static void
1789 _fs_request_setup_iovs(struct spdk_fs_request *req, struct iovec *iovs, uint32_t iovcnt)
1790 {
1791 	uint32_t i;
1792 
1793 	for (i = 0; i < iovcnt; i++) {
1794 		req->args.iovs[i].iov_base = iovs[i].iov_base;
1795 		req->args.iovs[i].iov_len = iovs[i].iov_len;
1796 	}
1797 }
1798 
/*
 * Core vectored I/O path shared by all read/write entry points.  Allocates a
 * DMA-able bounce buffer covering the io-unit-aligned span of the request,
 * then dispatches one of three cases:
 *   - write extending the file: truncate (grow) first, then read-modify-write;
 *   - fully aligned write: copy user data in and write directly;
 *   - everything else (reads and unaligned writes): read the span first,
 *     then __read_done() copies out (read) or merges and writes back (write).
 */
static void
__readvwritev(struct spdk_file *file, struct spdk_io_channel *_channel,
	      struct iovec *iovs, uint32_t iovcnt, uint64_t offset, uint64_t length,
	      spdk_file_op_complete cb_fn, void *cb_arg, int is_read)
{
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;
	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
	uint64_t start_lba, num_lba, pin_buf_length;
	uint32_t lba_size;

	/* Reads past EOF are rejected outright. */
	if (is_read && offset + length > file->length) {
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	req = alloc_fs_request_with_iov(channel, iovcnt);
	if (req == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	__get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba);

	args = &req->args;
	args->fn.file_op = cb_fn;
	args->arg = cb_arg;
	args->file = file;
	args->op.rw.channel = channel->bs_channel;
	_fs_request_setup_iovs(req, iovs, iovcnt);
	args->op.rw.is_read = is_read;
	args->op.rw.offset = offset;
	args->op.rw.blocklen = lba_size;

	/* Bounce buffer spans whole io-units; op.rw.length is this padded size. */
	pin_buf_length = num_lba * lba_size;
	args->op.rw.length = pin_buf_length;
	args->op.rw.pin_buf = spdk_malloc(pin_buf_length, lba_size, NULL,
					  SPDK_ENV_NUMA_ID_ANY, SPDK_MALLOC_DMA);
	if (args->op.rw.pin_buf == NULL) {
		SPDK_DEBUGLOG(blobfs, "Failed to allocate buf for: file=%s offset=%jx length=%jx\n",
			      file->name, offset, length);
		free_fs_request(req);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	args->op.rw.start_lba = start_lba;
	args->op.rw.num_lba = num_lba;

	if (!is_read && file->length < offset + length) {
		/* Grow the file first; the truncate completion resumes via __do_blob_read. */
		spdk_file_truncate_async(file, offset + length, __do_blob_read, req);
	} else if (!is_read && __is_lba_aligned(file, offset, length)) {
		/* Aligned write: no read-modify-write needed. */
		spdk_copy_iovs_to_buf(args->op.rw.pin_buf, args->op.rw.length, args->iovs, args->iovcnt);
		spdk_blob_io_write(args->file->blob, args->op.rw.channel,
				   args->op.rw.pin_buf,
				   args->op.rw.start_lba, args->op.rw.num_lba,
				   __rw_done, req);
	} else {
		__do_blob_read(req, 0);
	}
}
1860 
1861 static void
1862 __readwrite(struct spdk_file *file, struct spdk_io_channel *channel,
1863 	    void *payload, uint64_t offset, uint64_t length,
1864 	    spdk_file_op_complete cb_fn, void *cb_arg, int is_read)
1865 {
1866 	struct iovec iov;
1867 
1868 	iov.iov_base = payload;
1869 	iov.iov_len = (size_t)length;
1870 
1871 	__readvwritev(file, channel, &iov, 1, offset, length, cb_fn, cb_arg, is_read);
1872 }
1873 
1874 void
1875 spdk_file_write_async(struct spdk_file *file, struct spdk_io_channel *channel,
1876 		      void *payload, uint64_t offset, uint64_t length,
1877 		      spdk_file_op_complete cb_fn, void *cb_arg)
1878 {
1879 	__readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 0);
1880 }
1881 
/*
 * Asynchronously write a vectored buffer list to the file at the given
 * offset.  Completion (and any error) is reported through cb_fn.
 */
void
spdk_file_writev_async(struct spdk_file *file, struct spdk_io_channel *channel,
		       struct iovec *iovs, uint32_t iovcnt, uint64_t offset, uint64_t length,
		       spdk_file_op_complete cb_fn, void *cb_arg)
{
	SPDK_DEBUGLOG(blobfs, "file=%s offset=%jx length=%jx\n",
		      file->name, offset, length);

	__readvwritev(file, channel, iovs, iovcnt, offset, length, cb_fn, cb_arg, 0);
}
1892 
/*
 * Asynchronously read from the file into a flat buffer.  Reads past EOF fail
 * with -EINVAL (see __readvwritev).  Completion is reported through cb_fn.
 */
void
spdk_file_read_async(struct spdk_file *file, struct spdk_io_channel *channel,
		     void *payload, uint64_t offset, uint64_t length,
		     spdk_file_op_complete cb_fn, void *cb_arg)
{
	SPDK_DEBUGLOG(blobfs, "file=%s offset=%jx length=%jx\n",
		      file->name, offset, length);

	__readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 1);
}
1903 
/*
 * Asynchronously read from the file into a vectored buffer list.  Reads past
 * EOF fail with -EINVAL (see __readvwritev).  Completion is reported through
 * cb_fn.
 */
void
spdk_file_readv_async(struct spdk_file *file, struct spdk_io_channel *channel,
		      struct iovec *iovs, uint32_t iovcnt, uint64_t offset, uint64_t length,
		      spdk_file_op_complete cb_fn, void *cb_arg)
{
	SPDK_DEBUGLOG(blobfs, "file=%s offset=%jx length=%jx\n",
		      file->name, offset, length);

	__readvwritev(file, channel, iovs, iovcnt, offset, length, cb_fn, cb_arg, 1);
}
1914 
/*
 * Allocate an I/O channel for direct (non-md-thread) I/O on this filesystem.
 * Requests on this channel are dispatched directly rather than forwarded.
 *
 * NOTE(review): neither spdk_get_io_channel() nor spdk_bs_alloc_io_channel()
 * results are NULL-checked here — confirm callers can tolerate that.
 */
struct spdk_io_channel *
spdk_fs_alloc_io_channel(struct spdk_filesystem *fs)
{
	struct spdk_io_channel *io_channel;
	struct spdk_fs_channel *fs_channel;

	io_channel = spdk_get_io_channel(&fs->io_target);
	fs_channel = spdk_io_channel_get_ctx(io_channel);
	fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
	fs_channel->send_request = __send_request_direct;

	return io_channel;
}
1928 
/* Release an I/O channel obtained from spdk_fs_alloc_io_channel(). */
void
spdk_fs_free_io_channel(struct spdk_io_channel *channel)
{
	spdk_put_io_channel(channel);
}
1934 
/*
 * Allocate a per-thread context for the synchronous blobfs API.  The embedded
 * channel forwards requests to the filesystem's md thread and is marked
 * sync so completions post the channel semaphore.
 *
 * NOTE(review): the fs_channel_create() result is not checked here —
 * confirm it cannot fail, or that failure is handled elsewhere.
 * The 512 presumably sizes the per-context request pool — verify.
 */
struct spdk_fs_thread_ctx *
spdk_fs_alloc_thread_ctx(struct spdk_filesystem *fs)
{
	struct spdk_fs_thread_ctx *ctx;

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		return NULL;
	}

	if (pthread_spin_init(&ctx->ch.lock, 0)) {
		free(ctx);
		return NULL;
	}

	fs_channel_create(fs, &ctx->ch, 512);

	ctx->ch.send_request = fs->send_request;
	ctx->ch.sync = 1;

	return ctx;
}
1957 
1958 
/*
 * Tear down a thread context created by spdk_fs_alloc_thread_ctx().
 * Polls (1 ms sleep per iteration) until all outstanding requests on the
 * embedded channel have drained before destroying it.
 */
void
spdk_fs_free_thread_ctx(struct spdk_fs_thread_ctx *ctx)
{
	assert(ctx->ch.sync == 1);

	while (true) {
		pthread_spin_lock(&ctx->ch.lock);
		if (ctx->ch.outstanding_reqs == 0) {
			pthread_spin_unlock(&ctx->ch.lock);
			break;
		}
		pthread_spin_unlock(&ctx->ch.lock);
		usleep(1000);
	}

	fs_channel_destroy(NULL, &ctx->ch);
	free(ctx);
}
1977 
1978 int
1979 spdk_fs_set_cache_size(uint64_t size_in_mb)
1980 {
1981 	/* setting g_fs_cache_size is only permitted if cache pool
1982 	 * is already freed or hasn't been initialized
1983 	 */
1984 	if (g_cache_pool != NULL) {
1985 		return -EPERM;
1986 	}
1987 
1988 	g_fs_cache_size = size_in_mb * 1024 * 1024;
1989 
1990 	return 0;
1991 }
1992 
1993 uint64_t
1994 spdk_fs_get_cache_size(void)
1995 {
1996 	return g_fs_cache_size / (1024 * 1024);
1997 }
1998 
1999 static void __file_flush(void *ctx);
2000 
2001 /* Try to free some cache buffers from this file.
2002  */
/* Try to free some cache buffers from this file.
 * Returns 0 if buffers were freed, -1 if the file was busy (lock held
 * elsewhere) or had nothing cached.
 */
static int
reclaim_cache_buffers(struct spdk_file *file)
{
	int rc;

	BLOBFS_TRACE(file, "free=%s\n", file->name);

	/* The function is safe to be called with any threads, while the file
	 * lock maybe locked by other thread for now, so try to get the file
	 * lock here.
	 */
	rc = pthread_spin_trylock(&file->lock);
	if (rc != 0) {
		return -1;
	}

	/* Nothing cached for this file: leave it alone. */
	if (file->tree->present_mask == 0) {
		pthread_spin_unlock(&file->lock);
		return -1;
	}
	tree_free_buffers(file->tree);

	TAILQ_REMOVE(&g_caches, file, cache_tailq);
	/* If not freed, put it in the end of the queue */
	if (file->tree->present_mask != 0) {
		TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq);
	}

	/* tree_free_buffers() may have freed the buffer pointed to by file->last.
	 * So check if current append_pos is still in the cache, and if not, clear
	 * file->last.
	 */
	if (tree_find_buffer(file->tree, file->append_pos) == NULL) {
		file->last = NULL;
	}

	pthread_spin_unlock(&file->lock);

	return 0;
}
2043 
/*
 * Cache-pool poller: when the pool is under pressure, evict cached buffers
 * in three passes of increasing aggressiveness — first files that are not
 * open for writing AND low priority, then any file not open for writing,
 * and finally any file at all.  Each pass stops as soon as pressure is
 * relieved or one eviction succeeds.
 */
static int
_blobfs_cache_pool_reclaim(void *arg)
{
	struct spdk_file *file, *tmp;
	int rc;

	if (!blobfs_cache_pool_need_reclaim()) {
		return SPDK_POLLER_IDLE;
	}

	/* Pass 1: low-priority files not currently being written. */
	TAILQ_FOREACH_SAFE(file, &g_caches, cache_tailq, tmp) {
		if (!file->open_for_writing &&
		    file->priority == SPDK_FILE_PRIORITY_LOW) {
			rc = reclaim_cache_buffers(file);
			if (rc < 0) {
				continue;
			}
			if (!blobfs_cache_pool_need_reclaim()) {
				return SPDK_POLLER_BUSY;
			}
			break;
		}
	}

	/* Pass 2: any file not currently being written. */
	TAILQ_FOREACH_SAFE(file, &g_caches, cache_tailq, tmp) {
		if (!file->open_for_writing) {
			rc = reclaim_cache_buffers(file);
			if (rc < 0) {
				continue;
			}
			if (!blobfs_cache_pool_need_reclaim()) {
				return SPDK_POLLER_BUSY;
			}
			break;
		}
	}

	/* Pass 3: any file, including those open for writing. */
	TAILQ_FOREACH_SAFE(file, &g_caches, cache_tailq, tmp) {
		rc = reclaim_cache_buffers(file);
		if (rc < 0) {
			continue;
		}
		break;
	}

	return SPDK_POLLER_BUSY;
}
2091 
/* Runs on the cache-pool thread: register a file on the global eviction list. */
static void
_add_file_to_cache_pool(void *ctx)
{
	struct spdk_file *file = ctx;

	TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq);
}
2099 
/* Runs on the cache-pool thread: remove a file from the global eviction list. */
static void
_remove_file_from_cache_pool(void *ctx)
{
	struct spdk_file *file = ctx;

	TAILQ_REMOVE(&g_caches, file, cache_tailq);
}
2107 
/*
 * Allocate a cache buffer for the given file offset and insert it into the
 * file's buffer tree.  If the mempool is exhausted, busy-waits (1 ms per
 * attempt, up to 100 attempts) for the reclaim poller to free buffers.
 * If this is the file's first cached buffer, the file is registered with
 * the cache-pool thread's eviction list.  Returns NULL on failure.
 */
static struct cache_buffer *
cache_insert_buffer(struct spdk_file *file, uint64_t offset)
{
	struct cache_buffer *buf;
	int count = 0;
	bool need_update = false;

	buf = calloc(1, sizeof(*buf));
	if (buf == NULL) {
		SPDK_DEBUGLOG(blobfs, "calloc failed\n");
		return NULL;
	}

	/* Retry mempool_get, giving the reclaim poller time to evict. */
	do {
		buf->buf = spdk_mempool_get(g_cache_pool);
		if (buf->buf) {
			break;
		}
		if (count++ == 100) {
			SPDK_ERRLOG("Could not allocate cache buffer for file=%p on offset=%jx\n",
				    file, offset);
			free(buf);
			return NULL;
		}
		usleep(BLOBFS_CACHE_POOL_POLL_PERIOD_IN_US);
	} while (true);

	buf->buf_size = CACHE_BUFFER_SIZE;
	buf->offset = offset;

	/* First buffer for this file: it must join the eviction list. */
	if (file->tree->present_mask == 0) {
		need_update = true;
	}
	file->tree = tree_insert_buffer(file->tree, buf);

	if (need_update) {
		spdk_thread_send_msg(g_cache_pool_thread, _add_file_to_cache_pool, file);
	}

	return buf;
}
2149 
/*
 * Allocate the next cache buffer at the file's append position and make it
 * the current buffer (file->last).  Callers must only invoke this when the
 * previous buffer is full and append_pos is buffer-aligned (asserted).
 * Returns NULL if no buffer could be allocated.
 */
static struct cache_buffer *
cache_append_buffer(struct spdk_file *file)
{
	struct cache_buffer *last;

	assert(file->last == NULL || file->last->bytes_filled == file->last->buf_size);
	assert((file->append_pos % CACHE_BUFFER_SIZE) == 0);

	last = cache_insert_buffer(file, file->append_pos);
	if (last == NULL) {
		SPDK_DEBUGLOG(blobfs, "cache_insert_buffer failed\n");
		return NULL;
	}

	file->last = last;

	return last;
}
2168 
2169 static void __check_sync_reqs(struct spdk_file *file);
2170 
/*
 * Metadata sync for a sync request finished: record the persisted length,
 * dequeue and complete the request, then check whether another queued sync
 * request can now be serviced.
 */
static void
__file_cache_finish_sync(void *ctx, int bserrno)
{
	struct spdk_file *file;
	struct spdk_fs_request *sync_req = ctx;
	struct spdk_fs_cb_args *sync_args;

	sync_args = &sync_req->args;
	file = sync_args->file;
	pthread_spin_lock(&file->lock);
	/* length_xattr mirrors the "length" xattr now persisted on the blob. */
	file->length_xattr = sync_args->op.sync.length;
	assert(sync_args->op.sync.offset <= file->length_flushed);
	spdk_trace_record(TRACE_BLOBFS_XATTR_END, 0, sync_args->op.sync.offset,
			  0, file->name);
	BLOBFS_TRACE(file, "sync done offset=%jx\n", sync_args->op.sync.offset);
	TAILQ_REMOVE(&file->sync_requests, sync_req, args.op.sync.tailq);
	pthread_spin_unlock(&file->lock);

	/* Complete outside the lock; the callback may re-enter blobfs. */
	sync_args->fn.file_op(sync_args->arg, bserrno);

	free_fs_request(sync_req);
	__check_sync_reqs(file);
}
2194 
/*
 * Look for the first queued sync request whose target offset has already
 * been flushed; if one exists and no xattr update is in flight for it,
 * persist the current flushed length to the blob's "length" xattr and sync
 * the metadata.  __file_cache_finish_sync() completes the request.
 * (If the loop finds nothing, sync_req is NULL after TAILQ_FOREACH.)
 */
static void
__check_sync_reqs(struct spdk_file *file)
{
	struct spdk_fs_request *sync_req;

	pthread_spin_lock(&file->lock);

	TAILQ_FOREACH(sync_req, &file->sync_requests, args.op.sync.tailq) {
		if (sync_req->args.op.sync.offset <= file->length_flushed) {
			break;
		}
	}

	if (sync_req != NULL && !sync_req->args.op.sync.xattr_in_progress) {
		BLOBFS_TRACE(file, "set xattr length 0x%jx\n", file->length_flushed);
		sync_req->args.op.sync.xattr_in_progress = true;
		sync_req->args.op.sync.length = file->length_flushed;
		spdk_blob_set_xattr(file->blob, "length", &file->length_flushed,
				    sizeof(file->length_flushed));

		/* Drop the lock before issuing the md sync. */
		pthread_spin_unlock(&file->lock);
		spdk_trace_record(TRACE_BLOBFS_XATTR_START, 0, file->length_flushed,
				  0, file->name);
		spdk_blob_sync_md(file->blob, __file_cache_finish_sync, sync_req);
	} else {
		pthread_spin_unlock(&file->lock);
	}
}
2223 
/*
 * One cache-buffer flush I/O completed: account the flushed bytes, advance
 * the file's flushed length, wake any sync requests that are now satisfied,
 * and re-enter __file_flush() to keep draining the cache.
 */
static void
__file_flush_done(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *file = args->file;
	struct cache_buffer *next = args->op.flush.cache_buffer;

	BLOBFS_TRACE(file, "length=%jx\n", args->op.flush.length);

	pthread_spin_lock(&file->lock);
	next->in_progress = false;
	next->bytes_flushed += args->op.flush.length;
	file->length_flushed += args->op.flush.length;
	if (file->length_flushed > file->length) {
		file->length = file->length_flushed;
	}
	if (next->bytes_flushed == next->buf_size) {
		/* This buffer is fully on disk; move on to the next one. */
		BLOBFS_TRACE(file, "write buffer fully flushed 0x%jx\n", file->length_flushed);
		next = tree_find_buffer(file->tree, file->length_flushed);
	}

	/*
	 * Assert that there is no cached data that extends past the end of the underlying
	 *  blob.
	 */
	assert(next == NULL || next->offset < __file_get_blob_size(file) ||
	       next->bytes_filled == 0);

	pthread_spin_unlock(&file->lock);

	__check_sync_reqs(file);

	/* Continue flushing; __file_flush() frees req when there is no more work. */
	__file_flush(req);
}
2259 
/*
 * Flush the next cache buffer at file->length_flushed to the blob, if any.
 * Called to start a flush chain (from spdk_file_write()/_file_sync()) and
 * from __file_flush_done() to continue it.  Takes ownership of req: it is
 * either freed here or passed along as the blob write completion context.
 */
static void
__file_flush(void *ctx)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *file = args->file;
	struct cache_buffer *next;
	uint64_t offset, length, start_lba, num_lba;
	uint32_t lba_size;

	pthread_spin_lock(&file->lock);
	next = tree_find_buffer(file->tree, file->length_flushed);
	if (next == NULL || next->in_progress ||
	    ((next->bytes_filled < next->buf_size) && TAILQ_EMPTY(&file->sync_requests))) {
		/*
		 * There is either no data to flush, a flush I/O is already in
		 *  progress, or the next buffer is partially filled but there's no
		 *  outstanding request to sync it.
		 * So return immediately - if a flush I/O is in progress we will flush
		 *  more data after that is completed, or a partial buffer will get flushed
		 *  when it is either filled or the file is synced.
		 */
		free_fs_request(req);
		if (next == NULL) {
			/*
			 * For cases where a file's cache was evicted, and then the
			 *  file was later appended, we will write the data directly
			 *  to disk and bypass cache.  So just update length_flushed
			 *  here to reflect that all data was already written to disk.
			 */
			file->length_flushed = file->append_pos;
		}
		pthread_spin_unlock(&file->lock);
		if (next == NULL) {
			/*
			 * There is no data to flush, but we still need to check for any
			 *  outstanding sync requests to make sure metadata gets updated.
			 */
			__check_sync_reqs(file);
		}
		return;
	}

	/* Flush only the portion of the buffer not yet written. */
	offset = next->offset + next->bytes_flushed;
	length = next->bytes_filled - next->bytes_flushed;
	if (length == 0) {
		free_fs_request(req);
		pthread_spin_unlock(&file->lock);
		/*
		 * There is no data to flush, but we still need to check for any
		 *  outstanding sync requests to make sure metadata gets updated.
		 */
		__check_sync_reqs(file);
		return;
	}
	args->op.flush.length = length;
	args->op.flush.cache_buffer = next;

	/* Translate the byte range into whole-LBA parameters for the blob I/O. */
	__get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba);

	next->in_progress = true;
	BLOBFS_TRACE(file, "offset=0x%jx length=0x%jx page start=0x%jx num=0x%jx\n",
		     offset, length, start_lba, num_lba);
	pthread_spin_unlock(&file->lock);
	spdk_blob_io_write(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel,
			   next->buf + (start_lba * lba_size) - next->offset,
			   start_lba, num_lba, __file_flush_done, req);
}
2328 
/*
 * Completion for the metadata sync issued after a blob resize: wake the
 * thread blocked in spdk_file_write(), propagating bserrno as the result.
 */
static void
__file_extend_done(void *arg, int bserrno)
{
	struct spdk_fs_cb_args *extend_args = arg;

	__wake_caller(extend_args, bserrno);
}
2336 
2337 static void
2338 __file_extend_resize_cb(void *_args, int bserrno)
2339 {
2340 	struct spdk_fs_cb_args *args = _args;
2341 	struct spdk_file *file = args->file;
2342 
2343 	if (bserrno) {
2344 		__wake_caller(args, bserrno);
2345 		return;
2346 	}
2347 
2348 	spdk_blob_sync_md(file->blob, __file_extend_done, args);
2349 }
2350 
2351 static void
2352 __file_extend_blob(void *_args)
2353 {
2354 	struct spdk_fs_cb_args *args = _args;
2355 	struct spdk_file *file = args->file;
2356 
2357 	spdk_blob_resize(file->blob, args->op.resize.num_clusters, __file_extend_resize_cb, args);
2358 }
2359 
2360 static void
2361 __rw_from_file_done(void *ctx, int bserrno)
2362 {
2363 	struct spdk_fs_request *req = ctx;
2364 
2365 	__wake_caller(&req->args, bserrno);
2366 	free_fs_request(req);
2367 }
2368 
2369 static void
2370 __rw_from_file(void *ctx)
2371 {
2372 	struct spdk_fs_request *req = ctx;
2373 	struct spdk_fs_cb_args *args = &req->args;
2374 	struct spdk_file *file = args->file;
2375 
2376 	if (args->op.rw.is_read) {
2377 		spdk_file_read_async(file, file->fs->sync_target.sync_io_channel, args->iovs[0].iov_base,
2378 				     args->op.rw.offset, (uint64_t)args->iovs[0].iov_len,
2379 				     __rw_from_file_done, req);
2380 	} else {
2381 		spdk_file_write_async(file, file->fs->sync_target.sync_io_channel, args->iovs[0].iov_base,
2382 				      args->op.rw.offset, (uint64_t)args->iovs[0].iov_len,
2383 				      __rw_from_file_done, req);
2384 	}
2385 }
2386 
/* Context bundled for the blocking (cache-bypass) read/write path. */
struct rw_from_file_arg {
	struct spdk_fs_channel *channel;	/* channel whose semaphore the caller waits on */
	int rwerrno;				/* result of the async I/O, 0 on success */
};
2391 
/*
 * Queue an asynchronous read or write of [offset, offset+length) to the
 * blobfs dispatch thread.  On success the caller is expected to
 * sem_wait() on arg->channel->sem; __rw_from_file_done() posts it and the
 * I/O result is stored in arg->rwerrno.
 *
 * Returns 0 if the request was sent, -ENOMEM if no request could be
 * allocated.  NOTE(review): on -ENOMEM the semaphore is posted even though
 * some callers return without waiting - verify the sem count stays balanced.
 */
static int
__send_rw_from_file(struct spdk_file *file, void *payload,
		    uint64_t offset, uint64_t length, bool is_read,
		    struct rw_from_file_arg *arg)
{
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	req = alloc_fs_request_with_iov(arg->channel, 1);
	if (req == NULL) {
		sem_post(&arg->channel->sem);
		return -ENOMEM;
	}

	args = &req->args;
	args->file = file;
	args->sem = &arg->channel->sem;
	args->iovs[0].iov_base = payload;
	args->iovs[0].iov_len = (size_t)length;
	args->op.rw.offset = offset;
	args->op.rw.is_read = is_read;
	args->rwerrno = &arg->rwerrno;
	file->fs->send_request(__rw_from_file, req);
	return 0;
}
2417 
2418 int
2419 spdk_file_write(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx,
2420 		void *payload, uint64_t offset, uint64_t length)
2421 {
2422 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
2423 	struct spdk_fs_request *flush_req;
2424 	uint64_t rem_length, copy, blob_size, cluster_sz;
2425 	uint32_t cache_buffers_filled = 0;
2426 	uint8_t *cur_payload;
2427 	struct cache_buffer *last;
2428 
2429 	BLOBFS_TRACE_RW(file, "offset=%jx length=%jx\n", offset, length);
2430 
2431 	if (length == 0) {
2432 		return 0;
2433 	}
2434 
2435 	if (offset != file->append_pos) {
2436 		BLOBFS_TRACE(file, " error offset=%jx append_pos=%jx\n", offset, file->append_pos);
2437 		return -EINVAL;
2438 	}
2439 
2440 	pthread_spin_lock(&file->lock);
2441 	file->open_for_writing = true;
2442 
2443 	do {
2444 		if ((file->last == NULL) && (file->append_pos % CACHE_BUFFER_SIZE == 0)) {
2445 			cache_append_buffer(file);
2446 		}
2447 
2448 		if (file->last == NULL) {
2449 			struct rw_from_file_arg arg = {};
2450 			int rc;
2451 
2452 			arg.channel = channel;
2453 			arg.rwerrno = 0;
2454 			file->append_pos += length;
2455 			pthread_spin_unlock(&file->lock);
2456 			rc = __send_rw_from_file(file, payload, offset, length, false, &arg);
2457 			if (rc != 0) {
2458 				return rc;
2459 			}
2460 			sem_wait(&channel->sem);
2461 			return arg.rwerrno;
2462 		}
2463 
2464 		blob_size = __file_get_blob_size(file);
2465 
2466 		if ((offset + length) > blob_size) {
2467 			struct spdk_fs_cb_args extend_args = {};
2468 
2469 			cluster_sz = file->fs->bs_opts.cluster_sz;
2470 			extend_args.sem = &channel->sem;
2471 			extend_args.op.resize.num_clusters = __bytes_to_clusters((offset + length), cluster_sz);
2472 			extend_args.file = file;
2473 			BLOBFS_TRACE(file, "start resize to %u clusters\n", extend_args.op.resize.num_clusters);
2474 			pthread_spin_unlock(&file->lock);
2475 			file->fs->send_request(__file_extend_blob, &extend_args);
2476 			sem_wait(&channel->sem);
2477 			if (extend_args.rc) {
2478 				return extend_args.rc;
2479 			}
2480 			pthread_spin_lock(&file->lock);
2481 		}
2482 	} while (file->last == NULL);
2483 
2484 	flush_req = alloc_fs_request(channel);
2485 	if (flush_req == NULL) {
2486 		pthread_spin_unlock(&file->lock);
2487 		return -ENOMEM;
2488 	}
2489 
2490 	last = file->last;
2491 	rem_length = length;
2492 	cur_payload = payload;
2493 	while (rem_length > 0) {
2494 		copy = last->buf_size - last->bytes_filled;
2495 		if (copy > rem_length) {
2496 			copy = rem_length;
2497 		}
2498 		BLOBFS_TRACE_RW(file, "  fill offset=%jx length=%jx\n", file->append_pos, copy);
2499 		memcpy(&last->buf[last->bytes_filled], cur_payload, copy);
2500 		file->append_pos += copy;
2501 		if (file->length < file->append_pos) {
2502 			file->length = file->append_pos;
2503 		}
2504 		cur_payload += copy;
2505 		last->bytes_filled += copy;
2506 		rem_length -= copy;
2507 		if (last->bytes_filled == last->buf_size) {
2508 			cache_buffers_filled++;
2509 			last = cache_append_buffer(file);
2510 			if (last == NULL) {
2511 				BLOBFS_TRACE(file, "nomem\n");
2512 				free_fs_request(flush_req);
2513 				pthread_spin_unlock(&file->lock);
2514 				return -ENOMEM;
2515 			}
2516 		}
2517 	}
2518 
2519 	pthread_spin_unlock(&file->lock);
2520 
2521 	if (cache_buffers_filled == 0) {
2522 		free_fs_request(flush_req);
2523 		return 0;
2524 	}
2525 
2526 	flush_req->args.file = file;
2527 	file->fs->send_request(__file_flush, flush_req);
2528 	return 0;
2529 }
2530 
/*
 * Blob read completion for a readahead buffer: mark the buffer as populated
 * (bytes_filled == bytes_flushed, since the data already matches the blob
 * contents) and clear its in-progress flag.
 *
 * NOTE(review): bserrno is ignored - a failed read still marks the buffer
 * as filled with readahead.length bytes.  TODO confirm error handling.
 */
static void
__readahead_done(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct cache_buffer *cache_buffer = args->op.readahead.cache_buffer;
	struct spdk_file *file = args->file;

	BLOBFS_TRACE(file, "offset=%jx\n", cache_buffer->offset);

	pthread_spin_lock(&file->lock);
	cache_buffer->bytes_filled = args->op.readahead.length;
	cache_buffer->bytes_flushed = args->op.readahead.length;
	cache_buffer->in_progress = false;
	pthread_spin_unlock(&file->lock);

	free_fs_request(req);
}
2549 
2550 static void
2551 __readahead(void *ctx)
2552 {
2553 	struct spdk_fs_request *req = ctx;
2554 	struct spdk_fs_cb_args *args = &req->args;
2555 	struct spdk_file *file = args->file;
2556 	uint64_t offset, length, start_lba, num_lba;
2557 	uint32_t lba_size;
2558 
2559 	offset = args->op.readahead.offset;
2560 	length = args->op.readahead.length;
2561 	assert(length > 0);
2562 
2563 	__get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba);
2564 
2565 	BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n",
2566 		     offset, length, start_lba, num_lba);
2567 	spdk_blob_io_read(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel,
2568 			  args->op.readahead.cache_buffer->buf,
2569 			  start_lba, num_lba, __readahead_done, req);
2570 }
2571 
2572 static uint64_t
2573 __next_cache_buffer_offset(uint64_t offset)
2574 {
2575 	return (offset + CACHE_BUFFER_SIZE) & ~(CACHE_TREE_LEVEL_MASK(0));
2576 }
2577 
2578 static void
2579 check_readahead(struct spdk_file *file, uint64_t offset,
2580 		struct spdk_fs_channel *channel)
2581 {
2582 	struct spdk_fs_request *req;
2583 	struct spdk_fs_cb_args *args;
2584 
2585 	offset = __next_cache_buffer_offset(offset);
2586 	if (tree_find_buffer(file->tree, offset) != NULL || file->length <= offset) {
2587 		return;
2588 	}
2589 
2590 	req = alloc_fs_request(channel);
2591 	if (req == NULL) {
2592 		return;
2593 	}
2594 	args = &req->args;
2595 
2596 	BLOBFS_TRACE(file, "offset=%jx\n", offset);
2597 
2598 	args->file = file;
2599 	args->op.readahead.offset = offset;
2600 	args->op.readahead.cache_buffer = cache_insert_buffer(file, offset);
2601 	if (!args->op.readahead.cache_buffer) {
2602 		BLOBFS_TRACE(file, "Cannot allocate buf for offset=%jx\n", offset);
2603 		free_fs_request(req);
2604 		return;
2605 	}
2606 
2607 	args->op.readahead.cache_buffer->in_progress = true;
2608 	if (file->length < (offset + CACHE_BUFFER_SIZE)) {
2609 		args->op.readahead.length = file->length & (CACHE_BUFFER_SIZE - 1);
2610 	} else {
2611 		args->op.readahead.length = CACHE_BUFFER_SIZE;
2612 	}
2613 	file->fs->send_request(__readahead, req);
2614 }
2615 
/*
 * Synchronous read of up to `length` bytes at `offset`.  Data found in
 * cache buffers is copied directly; cache misses are sent to the dispatch
 * thread as blocking sub-reads.  Sequential access patterns trigger
 * readahead of the next buffers.
 *
 * Returns the number of bytes read (clamped to the end of written data),
 * 0 when offset is at or past the append position, or a negative errno.
 */
int64_t
spdk_file_read(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx,
	       void *payload, uint64_t offset, uint64_t length)
{
	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
	uint64_t final_offset, final_length;
	uint32_t sub_reads = 0;
	struct cache_buffer *buf;
	uint64_t read_len;
	struct rw_from_file_arg arg = {};

	pthread_spin_lock(&file->lock);

	BLOBFS_TRACE_RW(file, "offset=%ju length=%ju\n", offset, length);

	file->open_for_writing = false;

	if (length == 0 || offset >= file->append_pos) {
		pthread_spin_unlock(&file->lock);
		return 0;
	}

	/* Clamp the read to the end of written data. */
	if (offset + length > file->append_pos) {
		length = file->append_pos - offset;
	}

	/* Track sequential access; enough consecutive bytes arm readahead. */
	if (offset != file->next_seq_offset) {
		file->seq_byte_count = 0;
	}
	file->seq_byte_count += length;
	file->next_seq_offset = offset + length;
	if (file->seq_byte_count >= CACHE_READAHEAD_THRESHOLD) {
		check_readahead(file, offset, channel);
		check_readahead(file, offset + CACHE_BUFFER_SIZE, channel);
	}

	arg.channel = channel;
	arg.rwerrno = 0;
	final_length = 0;
	final_offset = offset + length;
	/* Walk the requested range one cache buffer at a time. */
	while (offset < final_offset) {
		int ret = 0;
		length = NEXT_CACHE_BUFFER_OFFSET(offset) - offset;
		if (length > (final_offset - offset)) {
			length = final_offset - offset;
		}

		buf = tree_find_filled_buffer(file->tree, offset);
		if (buf == NULL) {
			/* Cache miss - dispatch a blocking sub-read. */
			pthread_spin_unlock(&file->lock);
			ret = __send_rw_from_file(file, payload, offset, length, true, &arg);
			pthread_spin_lock(&file->lock);
			if (ret == 0) {
				sub_reads++;
			}
		} else {
			read_len = length;
			if ((offset + length) > (buf->offset + buf->bytes_filled)) {
				read_len = buf->offset + buf->bytes_filled - offset;
			}
			BLOBFS_TRACE(file, "read %p offset=%ju length=%ju\n", payload, offset, read_len);
			memcpy(payload, &buf->buf[offset - buf->offset], read_len);
			/* Reading to the end of a buffer consumes it. */
			if ((offset + read_len) % CACHE_BUFFER_SIZE == 0) {
				tree_remove_buffer(file->tree, buf);
				if (file->tree->present_mask == 0) {
					spdk_thread_send_msg(g_cache_pool_thread, _remove_file_from_cache_pool, file);
				}
			}
		}

		if (ret == 0) {
			final_length += length;
		} else {
			arg.rwerrno = ret;
			break;
		}
		payload += length;
		offset += length;
	}
	pthread_spin_unlock(&file->lock);
	/* Wait for every sub-read that was dispatched. */
	while (sub_reads > 0) {
		sem_wait(&channel->sem);
		sub_reads--;
	}
	if (arg.rwerrno == 0) {
		return final_length;
	} else {
		return arg.rwerrno;
	}
}
2706 
/*
 * Begin syncing the file: queue a sync request recording the current append
 * position, then kick a cache flush.  cb_fn runs once all data up to that
 * position has been flushed and the "length" xattr persisted (see
 * __check_sync_reqs()/__file_cache_finish_sync()).  Completes immediately
 * with 0 when the persisted xattr already covers append_pos, or with
 * -ENOMEM when requests cannot be allocated.
 */
static void
_file_sync(struct spdk_file *file, struct spdk_fs_channel *channel,
	   spdk_file_op_complete cb_fn, void *cb_arg)
{
	struct spdk_fs_request *sync_req;
	struct spdk_fs_request *flush_req;
	struct spdk_fs_cb_args *sync_args;
	struct spdk_fs_cb_args *flush_args;

	BLOBFS_TRACE(file, "offset=%jx\n", file->append_pos);

	pthread_spin_lock(&file->lock);
	if (file->append_pos <= file->length_xattr) {
		BLOBFS_TRACE(file, "done - file already synced\n");
		pthread_spin_unlock(&file->lock);
		cb_fn(cb_arg, 0);
		return;
	}

	sync_req = alloc_fs_request(channel);
	if (!sync_req) {
		SPDK_ERRLOG("Cannot allocate sync req for file=%s\n", file->name);
		pthread_spin_unlock(&file->lock);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}
	sync_args = &sync_req->args;

	flush_req = alloc_fs_request(channel);
	if (!flush_req) {
		SPDK_ERRLOG("Cannot allocate flush req for file=%s\n", file->name);
		free_fs_request(sync_req);
		pthread_spin_unlock(&file->lock);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}
	flush_args = &flush_req->args;

	/* The sync request completes later, once length_flushed reaches the
	 * recorded offset and the metadata sync finishes. */
	sync_args->file = file;
	sync_args->fn.file_op = cb_fn;
	sync_args->arg = cb_arg;
	sync_args->op.sync.offset = file->append_pos;
	sync_args->op.sync.xattr_in_progress = false;
	TAILQ_INSERT_TAIL(&file->sync_requests, sync_req, args.op.sync.tailq);
	pthread_spin_unlock(&file->lock);

	/* Kick the flush chain that will eventually satisfy the sync request. */
	flush_args->file = file;
	channel->send_request(__file_flush, flush_req);
}
2756 
2757 int
2758 spdk_file_sync(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx)
2759 {
2760 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
2761 	struct spdk_fs_cb_args args = {};
2762 
2763 	args.sem = &channel->sem;
2764 	_file_sync(file, channel, __wake_caller, &args);
2765 	sem_wait(&channel->sem);
2766 
2767 	return args.rc;
2768 }
2769 
2770 void
2771 spdk_file_sync_async(struct spdk_file *file, struct spdk_io_channel *_channel,
2772 		     spdk_file_op_complete cb_fn, void *cb_arg)
2773 {
2774 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
2775 
2776 	_file_sync(file, channel, cb_fn, cb_arg);
2777 }
2778 
2779 void
2780 spdk_file_set_priority(struct spdk_file *file, uint32_t priority)
2781 {
2782 	BLOBFS_TRACE(file, "priority=%u\n", priority);
2783 	file->priority = priority;
2784 
2785 }
2786 
2787 /*
2788  * Close routines
2789  */
2790 
/*
 * Final step of an async close, invoked after the blob is closed (or
 * directly with an error when the close could not proceed).  If the file
 * was deleted while still open, the deferred blob delete is issued now and
 * ownership of req passes to blob_delete_cb.
 *
 * NOTE(review): on the is_deleted path bserrno is dropped and the delete
 * result is reported instead - confirm that is intended.
 */
static void
__file_close_async_done(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *file = args->file;

	spdk_trace_record(TRACE_BLOBFS_CLOSE, 0, 0, 0, file->name);

	if (file->is_deleted) {
		spdk_fs_delete_file_async(file->fs, file->name, blob_delete_cb, ctx);
		return;
	}

	args->fn.file_op(args->arg, bserrno);
	free_fs_request(req);
}
2808 
/*
 * Drop one reference to the file.  Closing an unopened file completes with
 * -EBADF; while other references remain the completion fires immediately
 * with 0; on the last reference the underlying blob is closed and
 * completion is deferred to __file_close_async_done().
 */
static void
__file_close_async(struct spdk_file *file, struct spdk_fs_request *req)
{
	struct spdk_blob *blob;

	pthread_spin_lock(&file->lock);
	if (file->ref_count == 0) {
		pthread_spin_unlock(&file->lock);
		__file_close_async_done(req, -EBADF);
		return;
	}

	file->ref_count--;
	if (file->ref_count > 0) {
		pthread_spin_unlock(&file->lock);
		req->args.fn.file_op(req->args.arg, 0);
		free_fs_request(req);
		return;
	}

	pthread_spin_unlock(&file->lock);

	/* Last reference - detach the blob before closing it so no other
	 * path can use it through the file. */
	blob = file->blob;
	file->blob = NULL;
	spdk_blob_close(blob, __file_close_async_done, req);
}
2835 
2836 static void
2837 __file_close_async__sync_done(void *arg, int fserrno)
2838 {
2839 	struct spdk_fs_request *req = arg;
2840 	struct spdk_fs_cb_args *args = &req->args;
2841 
2842 	__file_close_async(args->file, req);
2843 }
2844 
2845 void
2846 spdk_file_close_async(struct spdk_file *file, spdk_file_op_complete cb_fn, void *cb_arg)
2847 {
2848 	struct spdk_fs_request *req;
2849 	struct spdk_fs_cb_args *args;
2850 
2851 	req = alloc_fs_request(file->fs->md_target.md_fs_channel);
2852 	if (req == NULL) {
2853 		SPDK_ERRLOG("Cannot allocate close async req for file=%s\n", file->name);
2854 		cb_fn(cb_arg, -ENOMEM);
2855 		return;
2856 	}
2857 
2858 	args = &req->args;
2859 	args->file = file;
2860 	args->fn.file_op = cb_fn;
2861 	args->arg = cb_arg;
2862 
2863 	spdk_file_sync_async(file, file->fs->md_target.md_io_channel, __file_close_async__sync_done, req);
2864 }
2865 
2866 static void
2867 __file_close(void *arg)
2868 {
2869 	struct spdk_fs_request *req = arg;
2870 	struct spdk_fs_cb_args *args = &req->args;
2871 	struct spdk_file *file = args->file;
2872 
2873 	__file_close_async(file, req);
2874 }
2875 
/*
 * Synchronous close: syncs the file, then sends the close to the dispatch
 * thread and blocks on the channel semaphore until it completes.  Returns
 * the close result, or -ENOMEM if the request cannot be allocated.
 *
 * NOTE(review): the spdk_file_sync() return value is ignored - a failed
 * sync still proceeds to close.  Confirm that is intended.
 */
int
spdk_file_close(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx)
{
	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	req = alloc_fs_request(channel);
	if (req == NULL) {
		SPDK_ERRLOG("Cannot allocate close req for file=%s\n", file->name);
		return -ENOMEM;
	}

	args = &req->args;

	spdk_file_sync(file, ctx);
	BLOBFS_TRACE(file, "name=%s\n", file->name);
	args->file = file;
	args->sem = &channel->sem;
	/* __wake_caller posts args->sem and records the result in args->rc
	 * (same pattern as spdk_file_sync()). */
	args->fn.file_op = __wake_caller;
	args->arg = args;
	channel->send_request(__file_close, req);
	sem_wait(&channel->sem);

	return args->rc;
}
2902 
2903 int
2904 spdk_file_get_id(struct spdk_file *file, void *id, size_t size)
2905 {
2906 	if (size < sizeof(spdk_blob_id)) {
2907 		return -EINVAL;
2908 	}
2909 
2910 	memcpy(id, &file->blobid, sizeof(spdk_blob_id));
2911 
2912 	return sizeof(spdk_blob_id);
2913 }
2914 
/*
 * Final stage of file_free() for files that still had cache buffers: runs
 * on the cache pool thread (which manages g_caches) to unlink the file
 * before releasing its memory.
 */
static void
_file_free(void *ctx)
{
	struct spdk_file *file = ctx;

	TAILQ_REMOVE(&g_caches, file, cache_tailq);

	free(file->name);
	free(file->tree);
	free(file);
}
2926 
/*
 * Release a file object.  If no cache buffers are present the memory is
 * freed inline; otherwise the buffers are freed here and the final free
 * (which must also unlink the file from g_caches) is deferred to the cache
 * pool thread via _file_free().
 *
 * NOTE(review): the deferred path unlocks file->lock after sending the
 * message that frees `file` - verify _file_free() cannot run on the cache
 * pool thread before this thread releases the spinlock.
 */
static void
file_free(struct spdk_file *file)
{
	BLOBFS_TRACE(file, "free=%s\n", file->name);
	pthread_spin_lock(&file->lock);
	if (file->tree->present_mask == 0) {
		pthread_spin_unlock(&file->lock);
		free(file->name);
		free(file->tree);
		free(file);
		return;
	}

	tree_free_buffers(file->tree);
	assert(file->tree->present_mask == 0);
	spdk_thread_send_msg(g_cache_pool_thread, _file_free, file);
	pthread_spin_unlock(&file->lock);
}
2945 
/* Register the "blobfs" and "blobfs_rw" debug log components used by the
 * BLOBFS_TRACE/BLOBFS_TRACE_RW macros above. */
SPDK_LOG_REGISTER_COMPONENT(blobfs)
SPDK_LOG_REGISTER_COMPONENT(blobfs_rw)
2948