xref: /spdk/lib/blobfs/blobfs.c (revision 87d5d832e7e05816baea7884cb0cf132ba9c24b9)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 
36 #include "spdk/blobfs.h"
37 #include "spdk/conf.h"
38 #include "tree.h"
39 
40 #include "spdk/queue.h"
41 #include "spdk/thread.h"
42 #include "spdk/assert.h"
43 #include "spdk/env.h"
44 #include "spdk/util.h"
45 #include "spdk_internal/log.h"
46 #include "spdk/trace.h"
47 
48 #define BLOBFS_TRACE(file, str, args...) \
49 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s " str, file->name, ##args)
50 
51 #define BLOBFS_TRACE_RW(file, str, args...) \
52 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS_RW, "file=%s " str, file->name, ##args)
53 
54 #define BLOBFS_DEFAULT_CACHE_SIZE (4ULL * 1024 * 1024 * 1024)
55 #define SPDK_BLOBFS_DEFAULT_OPTS_CLUSTER_SZ (1024 * 1024)
56 
57 #define SPDK_BLOBFS_SIGNATURE	"BLOBFS"
58 
59 static uint64_t g_fs_cache_size = BLOBFS_DEFAULT_CACHE_SIZE;
60 static struct spdk_mempool *g_cache_pool;
61 static TAILQ_HEAD(, spdk_file) g_caches;
62 static struct spdk_poller *g_cache_pool_mgmt_poller;
63 static struct spdk_thread *g_cache_pool_thread;
64 #define BLOBFS_CACHE_POOL_POLL_PERIOD_IN_US 1000ULL
65 static int g_fs_count = 0;
66 static pthread_mutex_t g_cache_init_lock = PTHREAD_MUTEX_INITIALIZER;
67 static pthread_spinlock_t g_caches_lock;
68 
69 #define TRACE_GROUP_BLOBFS	0x7
70 #define TRACE_BLOBFS_XATTR_START	SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x0)
71 #define TRACE_BLOBFS_XATTR_END		SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x1)
72 #define TRACE_BLOBFS_OPEN		SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x2)
73 #define TRACE_BLOBFS_CLOSE		SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x3)
74 #define TRACE_BLOBFS_DELETE_START	SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x4)
75 #define TRACE_BLOBFS_DELETE_DONE	SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x5)
76 
77 SPDK_TRACE_REGISTER_FN(blobfs_trace, "blobfs", TRACE_GROUP_BLOBFS)
78 {
79 	spdk_trace_register_description("BLOBFS_XATTR_START",
80 					TRACE_BLOBFS_XATTR_START,
81 					OWNER_NONE, OBJECT_NONE, 0,
82 					SPDK_TRACE_ARG_TYPE_STR,
83 					"file:    ");
84 	spdk_trace_register_description("BLOBFS_XATTR_END",
85 					TRACE_BLOBFS_XATTR_END,
86 					OWNER_NONE, OBJECT_NONE, 0,
87 					SPDK_TRACE_ARG_TYPE_STR,
88 					"file:    ");
89 	spdk_trace_register_description("BLOBFS_OPEN",
90 					TRACE_BLOBFS_OPEN,
91 					OWNER_NONE, OBJECT_NONE, 0,
92 					SPDK_TRACE_ARG_TYPE_STR,
93 					"file:    ");
94 	spdk_trace_register_description("BLOBFS_CLOSE",
95 					TRACE_BLOBFS_CLOSE,
96 					OWNER_NONE, OBJECT_NONE, 0,
97 					SPDK_TRACE_ARG_TYPE_STR,
98 					"file:    ");
99 	spdk_trace_register_description("BLOBFS_DELETE_START",
100 					TRACE_BLOBFS_DELETE_START,
101 					OWNER_NONE, OBJECT_NONE, 0,
102 					SPDK_TRACE_ARG_TYPE_STR,
103 					"file:    ");
104 	spdk_trace_register_description("BLOBFS_DELETE_DONE",
105 					TRACE_BLOBFS_DELETE_DONE,
106 					OWNER_NONE, OBJECT_NONE, 0,
107 					SPDK_TRACE_ARG_TYPE_STR,
108 					"file:    ");
109 }
110 
111 void
112 cache_buffer_free(struct cache_buffer *cache_buffer)
113 {
114 	spdk_mempool_put(g_cache_pool, cache_buffer->buf);
115 	free(cache_buffer);
116 }
117 
118 #define CACHE_READAHEAD_THRESHOLD	(128 * 1024)
119 
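/*
 * In-memory state for an open blobfs file: the backing blob and its id, the
 * logical/flushed/xattr lengths and append position, sequential access
 * tracking (seq_byte_count/next_seq_offset) used for readahead decisions,
 * the cache buffer tree, and the open/sync request queues protected by 'lock'.
 */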
120 struct spdk_file {
121 	struct spdk_filesystem	*fs;
122 	struct spdk_blob	*blob;
123 	char			*name;
124 	uint64_t		trace_arg_name;
125 	uint64_t		length;
126 	bool                    is_deleted;
127 	bool			open_for_writing;
128 	uint64_t		length_flushed;
129 	uint64_t		length_xattr;
130 	uint64_t		append_pos;
131 	uint64_t		seq_byte_count;
132 	uint64_t		next_seq_offset;
133 	uint32_t		priority;
134 	TAILQ_ENTRY(spdk_file)	tailq;
135 	spdk_blob_id		blobid;
136 	uint32_t		ref_count;
137 	pthread_spinlock_t	lock;
138 	struct cache_buffer	*last;
139 	struct cache_tree	*tree;
140 	TAILQ_HEAD(open_requests_head, spdk_fs_request) open_requests;
141 	TAILQ_HEAD(sync_requests_head, spdk_fs_request) sync_requests;
142 	TAILQ_ENTRY(spdk_file)	cache_tailq;
143 };
144 
145 struct spdk_deleted_file {
146 	spdk_blob_id	id;
147 	TAILQ_ENTRY(spdk_deleted_file)	tailq;
148 };
149 
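/*
 * A blobfs instance on top of one blobstore. Three io_devices are registered
 * per filesystem in fs_alloc(): md_target for metadata requests, sync_target
 * for sync/flush traffic, and io_target for the per-thread data channels
 * returned by spdk_fs_alloc_io_channel().
 */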
150 struct spdk_filesystem {
151 	struct spdk_blob_store	*bs;
152 	TAILQ_HEAD(, spdk_file)	files;
153 	struct spdk_bs_opts	bs_opts;
154 	struct spdk_bs_dev	*bdev;
155 	fs_send_request_fn	send_request;
156 
157 	struct {
158 		uint32_t		max_ops;
159 		struct spdk_io_channel	*sync_io_channel;
160 		struct spdk_fs_channel	*sync_fs_channel;
161 	} sync_target;
162 
163 	struct {
164 		uint32_t		max_ops;
165 		struct spdk_io_channel	*md_io_channel;
166 		struct spdk_fs_channel	*md_fs_channel;
167 	} md_target;
168 
169 	struct {
170 		uint32_t		max_ops;
171 	} io_target;
172 };
173 
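/*
 * Callback context attached to every blobfs request: the user completion
 * callback, an optional semaphore used by the blocking wrappers, and a
 * per-operation union (fs_load, truncate, rw, rename, flush, readahead,
 * sync, resize, open, create, delete, stat).
 */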
174 struct spdk_fs_cb_args {
175 	union {
176 		spdk_fs_op_with_handle_complete		fs_op_with_handle;
177 		spdk_fs_op_complete			fs_op;
178 		spdk_file_op_with_handle_complete	file_op_with_handle;
179 		spdk_file_op_complete			file_op;
180 		spdk_file_stat_op_complete		stat_op;
181 	} fn;
182 	void *arg;
183 	sem_t *sem;
184 	struct spdk_filesystem *fs;
185 	struct spdk_file *file;
186 	int rc;
187 	struct iovec *iovs;
188 	uint32_t iovcnt;
189 	struct iovec iov;
190 	union {
191 		struct {
192 			TAILQ_HEAD(, spdk_deleted_file)	deleted_files;
193 		} fs_load;
194 		struct {
195 			uint64_t	length;
196 		} truncate;
197 		struct {
198 			struct spdk_io_channel	*channel;
199 			void		*pin_buf;
200 			int		is_read;
201 			off_t		offset;
202 			size_t		length;
203 			uint64_t	start_lba;
204 			uint64_t	num_lba;
205 			uint32_t	blocklen;
206 		} rw;
207 		struct {
208 			const char	*old_name;
209 			const char	*new_name;
210 		} rename;
211 		struct {
212 			struct cache_buffer	*cache_buffer;
213 			uint64_t		length;
214 		} flush;
215 		struct {
216 			struct cache_buffer	*cache_buffer;
217 			uint64_t		length;
218 			uint64_t		offset;
219 		} readahead;
220 		struct {
221 			/* offset of the file when the sync request was made */
222 			uint64_t			offset;
223 			TAILQ_ENTRY(spdk_fs_request)	tailq;
224 			bool				xattr_in_progress;
225 			/* length written to the xattr for this file - this should
226 			 * always be the same as the offset if only one thread is
227 			 * writing to the file, but could differ if multiple threads
228 			 * are appending
229 			 */
230 			uint64_t			length;
231 		} sync;
232 		struct {
233 			uint32_t			num_clusters;
234 		} resize;
235 		struct {
236 			const char	*name;
237 			uint32_t	flags;
238 			TAILQ_ENTRY(spdk_fs_request)	tailq;
239 		} open;
240 		struct {
241 			const char		*name;
242 			struct spdk_blob	*blob;
243 		} create;
244 		struct {
245 			const char	*name;
246 		} delete;
247 		struct {
248 			const char	*name;
249 		} stat;
250 	} op;
251 };
252 
253 static void file_free(struct spdk_file *file);
254 static void fs_io_device_unregister(struct spdk_filesystem *fs);
255 static void fs_free_io_channels(struct spdk_filesystem *fs);
256 
257 void
258 spdk_fs_opts_init(struct spdk_blobfs_opts *opts)
259 {
260 	opts->cluster_sz = SPDK_BLOBFS_DEFAULT_OPTS_CLUSTER_SZ;
261 }
262 
263 static int _blobfs_cache_pool_reclaim(void *arg);
264 
265 static bool
266 blobfs_cache_pool_need_reclaim(void)
267 {
268 	size_t count;
269 
270 	count = spdk_mempool_count(g_cache_pool);
271 	/* We define an aggressive policy here because requests from db_bench arrive in batches, so start reclaiming
272 	 *  when the number of available cache buffers falls to 1/5 of the total or less.
273 	 */
274 	if (count > (size_t)g_fs_cache_size / CACHE_BUFFER_SIZE / 5) {
275 		return false;
276 	}
277 
278 	return true;
279 }
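/*
 * Worked example, assuming the default 4 GiB cache and the default 256 KiB
 * cache buffers (CACHE_BUFFER_SHIFT_DEFAULT): the pool holds 16384 buffers,
 * so reclaim is triggered once 16384 / 5 = 3276 or fewer buffers remain
 * available in g_cache_pool.
 */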
280 
281 static void
282 __start_cache_pool_mgmt(void *ctx)
283 {
284 	assert(g_cache_pool == NULL);
285 
286 	g_cache_pool = spdk_mempool_create("spdk_fs_cache",
287 					   g_fs_cache_size / CACHE_BUFFER_SIZE,
288 					   CACHE_BUFFER_SIZE,
289 					   SPDK_MEMPOOL_DEFAULT_CACHE_SIZE,
290 					   SPDK_ENV_SOCKET_ID_ANY);
291 	if (!g_cache_pool) {
292 		SPDK_ERRLOG("Failed to create mempool; you may need to "
293 			    "increase available memory and try again\n");
294 		assert(false);
295 	}
296 	TAILQ_INIT(&g_caches);
297 	pthread_spin_init(&g_caches_lock, 0);
298 
299 	assert(g_cache_pool_mgmt_poller == NULL);
300 	g_cache_pool_mgmt_poller = SPDK_POLLER_REGISTER(_blobfs_cache_pool_reclaim, NULL,
301 				   BLOBFS_CACHE_POOL_POLL_PERIOD_IN_US);
302 }
303 
304 static void
305 __stop_cache_pool_mgmt(void *ctx)
306 {
307 	spdk_poller_unregister(&g_cache_pool_mgmt_poller);
308 
309 	assert(g_cache_pool != NULL);
310 	assert(spdk_mempool_count(g_cache_pool) == g_fs_cache_size / CACHE_BUFFER_SIZE);
311 	spdk_mempool_free(g_cache_pool);
312 	g_cache_pool = NULL;
313 
314 	spdk_thread_exit(g_cache_pool_thread);
315 }
316 
317 static void
318 initialize_global_cache(void)
319 {
320 	pthread_mutex_lock(&g_cache_init_lock);
321 	if (g_fs_count == 0) {
322 		g_cache_pool_thread = spdk_thread_create("cache_pool_mgmt", NULL);
323 		assert(g_cache_pool_thread != NULL);
324 		spdk_thread_send_msg(g_cache_pool_thread, __start_cache_pool_mgmt, NULL);
325 	}
326 	g_fs_count++;
327 	pthread_mutex_unlock(&g_cache_init_lock);
328 }
329 
330 static void
331 free_global_cache(void)
332 {
333 	pthread_mutex_lock(&g_cache_init_lock);
334 	g_fs_count--;
335 	if (g_fs_count == 0) {
336 		spdk_thread_send_msg(g_cache_pool_thread, __stop_cache_pool_mgmt, NULL);
337 	}
338 	pthread_mutex_unlock(&g_cache_init_lock);
339 }
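/*
 * The cache pool is reference counted via g_fs_count: the first filesystem to
 * come up creates the cache_pool_mgmt thread and the mempool, and the last
 * one to unload tears them down again.
 */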
340 
341 static uint64_t
342 __file_get_blob_size(struct spdk_file *file)
343 {
344 	uint64_t cluster_sz;
345 
346 	cluster_sz = file->fs->bs_opts.cluster_sz;
347 	return cluster_sz * spdk_blob_get_num_clusters(file->blob);
348 }
349 
350 struct spdk_fs_request {
351 	struct spdk_fs_cb_args		args;
352 	TAILQ_ENTRY(spdk_fs_request)	link;
353 	struct spdk_fs_channel		*channel;
354 };
355 
356 struct spdk_fs_channel {
357 	struct spdk_fs_request		*req_mem;
358 	TAILQ_HEAD(, spdk_fs_request)	reqs;
359 	sem_t				sem;
360 	struct spdk_filesystem		*fs;
361 	struct spdk_io_channel		*bs_channel;
362 	fs_send_request_fn		send_request;
363 	bool				sync;
364 	uint32_t			outstanding_reqs;
365 	pthread_spinlock_t		lock;
366 };
367 
368 /* For now, this is effectively an alias. But eventually we'll shift
369  * some data members over. */
370 struct spdk_fs_thread_ctx {
371 	struct spdk_fs_channel	ch;
372 };
373 
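/*
 * Requests are taken from the channel's preallocated req_mem array rather
 * than the heap. Only channels used by the blocking API (channel->sync) take
 * the spinlock; asynchronous channels are assumed to be accessed only from
 * their owning thread.
 */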
374 static struct spdk_fs_request *
375 alloc_fs_request_with_iov(struct spdk_fs_channel *channel, uint32_t iovcnt)
376 {
377 	struct spdk_fs_request *req;
378 	struct iovec *iovs = NULL;
379 
380 	if (iovcnt > 1) {
381 		iovs = calloc(iovcnt, sizeof(struct iovec));
382 		if (!iovs) {
383 			return NULL;
384 		}
385 	}
386 
387 	if (channel->sync) {
388 		pthread_spin_lock(&channel->lock);
389 	}
390 
391 	req = TAILQ_FIRST(&channel->reqs);
392 	if (req) {
393 		channel->outstanding_reqs++;
394 		TAILQ_REMOVE(&channel->reqs, req, link);
395 	}
396 
397 	if (channel->sync) {
398 		pthread_spin_unlock(&channel->lock);
399 	}
400 
401 	if (req == NULL) {
402 		SPDK_ERRLOG("Cannot allocate req on spdk_fs_channel=%p\n", channel);
403 		free(iovs);
404 		return NULL;
405 	}
406 	memset(req, 0, sizeof(*req));
407 	req->channel = channel;
408 	if (iovcnt > 1) {
409 		req->args.iovs = iovs;
410 	} else {
411 		req->args.iovs = &req->args.iov;
412 	}
413 	req->args.iovcnt = iovcnt;
414 
415 	return req;
416 }
417 
418 static struct spdk_fs_request *
419 alloc_fs_request(struct spdk_fs_channel *channel)
420 {
421 	return alloc_fs_request_with_iov(channel, 0);
422 }
423 
424 static void
425 free_fs_request(struct spdk_fs_request *req)
426 {
427 	struct spdk_fs_channel *channel = req->channel;
428 
429 	if (req->args.iovcnt > 1) {
430 		free(req->args.iovs);
431 	}
432 
433 	if (channel->sync) {
434 		pthread_spin_lock(&channel->lock);
435 	}
436 
437 	TAILQ_INSERT_HEAD(&req->channel->reqs, req, link);
438 	channel->outstanding_reqs--;
439 
440 	if (channel->sync) {
441 		pthread_spin_unlock(&channel->lock);
442 	}
443 }
444 
445 static int
446 fs_channel_create(struct spdk_filesystem *fs, struct spdk_fs_channel *channel,
447 		  uint32_t max_ops)
448 {
449 	uint32_t i;
450 
451 	channel->req_mem = calloc(max_ops, sizeof(struct spdk_fs_request));
452 	if (!channel->req_mem) {
453 		return -1;
454 	}
455 
456 	channel->outstanding_reqs = 0;
457 	TAILQ_INIT(&channel->reqs);
458 	sem_init(&channel->sem, 0, 0);
459 
460 	for (i = 0; i < max_ops; i++) {
461 		TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link);
462 	}
463 
464 	channel->fs = fs;
465 
466 	return 0;
467 }
468 
469 static int
470 fs_md_channel_create(void *io_device, void *ctx_buf)
471 {
472 	struct spdk_filesystem		*fs;
473 	struct spdk_fs_channel		*channel = ctx_buf;
474 
475 	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, md_target);
476 
477 	return fs_channel_create(fs, channel, fs->md_target.max_ops);
478 }
479 
480 static int
481 fs_sync_channel_create(void *io_device, void *ctx_buf)
482 {
483 	struct spdk_filesystem		*fs;
484 	struct spdk_fs_channel		*channel = ctx_buf;
485 
486 	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, sync_target);
487 
488 	return fs_channel_create(fs, channel, fs->sync_target.max_ops);
489 }
490 
491 static int
492 fs_io_channel_create(void *io_device, void *ctx_buf)
493 {
494 	struct spdk_filesystem		*fs;
495 	struct spdk_fs_channel		*channel = ctx_buf;
496 
497 	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, io_target);
498 
499 	return fs_channel_create(fs, channel, fs->io_target.max_ops);
500 }
501 
502 static void
503 fs_channel_destroy(void *io_device, void *ctx_buf)
504 {
505 	struct spdk_fs_channel *channel = ctx_buf;
506 
507 	if (channel->outstanding_reqs > 0) {
508 		SPDK_ERRLOG("channel freed with %" PRIu32 " outstanding requests!\n",
509 			    channel->outstanding_reqs);
510 	}
511 
512 	free(channel->req_mem);
513 	if (channel->bs_channel != NULL) {
514 		spdk_bs_free_io_channel(channel->bs_channel);
515 	}
516 }
517 
518 static void
519 __send_request_direct(fs_request_fn fn, void *arg)
520 {
521 	fn(arg);
522 }
523 
524 static void
525 common_fs_bs_init(struct spdk_filesystem *fs, struct spdk_blob_store *bs)
526 {
527 	fs->bs = bs;
528 	fs->bs_opts.cluster_sz = spdk_bs_get_cluster_size(bs);
529 	fs->md_target.md_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
530 	fs->md_target.md_fs_channel->send_request = __send_request_direct;
531 	fs->sync_target.sync_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
532 	fs->sync_target.sync_fs_channel->send_request = __send_request_direct;
533 
534 	initialize_global_cache();
535 }
536 
537 static void
538 init_cb(void *ctx, struct spdk_blob_store *bs, int bserrno)
539 {
540 	struct spdk_fs_request *req = ctx;
541 	struct spdk_fs_cb_args *args = &req->args;
542 	struct spdk_filesystem *fs = args->fs;
543 
544 	if (bserrno == 0) {
545 		common_fs_bs_init(fs, bs);
546 	} else {
547 		free(fs);
548 		fs = NULL;
549 	}
550 
551 	args->fn.fs_op_with_handle(args->arg, fs, bserrno);
552 	free_fs_request(req);
553 }
554 
555 static void
556 fs_conf_parse(void)
557 {
558 	struct spdk_conf_section *sp;
559 
560 	sp = spdk_conf_find_section(NULL, "Blobfs");
561 	if (sp == NULL) {
562 		g_fs_cache_buffer_shift = CACHE_BUFFER_SHIFT_DEFAULT;
563 		return;
564 	}
565 
566 	g_fs_cache_buffer_shift = spdk_conf_section_get_intval(sp, "CacheBufferShift");
567 	if (g_fs_cache_buffer_shift <= 0) {
568 		g_fs_cache_buffer_shift = CACHE_BUFFER_SHIFT_DEFAULT;
569 	}
570 }
571 
572 static struct spdk_filesystem *
573 fs_alloc(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn)
574 {
575 	struct spdk_filesystem *fs;
576 
577 	fs = calloc(1, sizeof(*fs));
578 	if (fs == NULL) {
579 		return NULL;
580 	}
581 
582 	fs->bdev = dev;
583 	fs->send_request = send_request_fn;
584 	TAILQ_INIT(&fs->files);
585 
586 	fs->md_target.max_ops = 512;
587 	spdk_io_device_register(&fs->md_target, fs_md_channel_create, fs_channel_destroy,
588 				sizeof(struct spdk_fs_channel), "blobfs_md");
589 	fs->md_target.md_io_channel = spdk_get_io_channel(&fs->md_target);
590 	fs->md_target.md_fs_channel = spdk_io_channel_get_ctx(fs->md_target.md_io_channel);
591 
592 	fs->sync_target.max_ops = 512;
593 	spdk_io_device_register(&fs->sync_target, fs_sync_channel_create, fs_channel_destroy,
594 				sizeof(struct spdk_fs_channel), "blobfs_sync");
595 	fs->sync_target.sync_io_channel = spdk_get_io_channel(&fs->sync_target);
596 	fs->sync_target.sync_fs_channel = spdk_io_channel_get_ctx(fs->sync_target.sync_io_channel);
597 
598 	fs->io_target.max_ops = 512;
599 	spdk_io_device_register(&fs->io_target, fs_io_channel_create, fs_channel_destroy,
600 				sizeof(struct spdk_fs_channel), "blobfs_io");
601 
602 	return fs;
603 }
604 
605 static void
606 __wake_caller(void *arg, int fserrno)
607 {
608 	struct spdk_fs_cb_args *args = arg;
609 
610 	args->rc = fserrno;
611 	sem_post(args->sem);
612 }
613 
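/*
 * Illustrative usage sketch (not part of this file); bs_dev, send_request_fn,
 * init_done_cb and init_ctx are caller-supplied placeholders:
 *
 *	struct spdk_blobfs_opts blobfs_opts;
 *
 *	spdk_fs_opts_init(&blobfs_opts);
 *	spdk_fs_init(bs_dev, &blobfs_opts, send_request_fn, init_done_cb, init_ctx);
 *
 * send_request_fn must execute the given fs_request_fn on the thread that
 * owns the filesystem's metadata channel.
 */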
614 void
615 spdk_fs_init(struct spdk_bs_dev *dev, struct spdk_blobfs_opts *opt,
616 	     fs_send_request_fn send_request_fn,
617 	     spdk_fs_op_with_handle_complete cb_fn, void *cb_arg)
618 {
619 	struct spdk_filesystem *fs;
620 	struct spdk_fs_request *req;
621 	struct spdk_fs_cb_args *args;
622 	struct spdk_bs_opts opts = {};
623 
624 	fs = fs_alloc(dev, send_request_fn);
625 	if (fs == NULL) {
626 		cb_fn(cb_arg, NULL, -ENOMEM);
627 		return;
628 	}
629 
630 	fs_conf_parse();
631 
632 	req = alloc_fs_request(fs->md_target.md_fs_channel);
633 	if (req == NULL) {
634 		fs_free_io_channels(fs);
635 		fs_io_device_unregister(fs);
636 		cb_fn(cb_arg, NULL, -ENOMEM);
637 		return;
638 	}
639 
640 	args = &req->args;
641 	args->fn.fs_op_with_handle = cb_fn;
642 	args->arg = cb_arg;
643 	args->fs = fs;
644 
645 	spdk_bs_opts_init(&opts);
646 	snprintf(opts.bstype.bstype, sizeof(opts.bstype.bstype), SPDK_BLOBFS_SIGNATURE);
647 	if (opt) {
648 		opts.cluster_sz = opt->cluster_sz;
649 	}
650 	spdk_bs_init(dev, &opts, init_cb, req);
651 }
652 
653 static struct spdk_file *
654 file_alloc(struct spdk_filesystem *fs)
655 {
656 	struct spdk_file *file;
657 
658 	file = calloc(1, sizeof(*file));
659 	if (file == NULL) {
660 		return NULL;
661 	}
662 
663 	file->tree = calloc(1, sizeof(*file->tree));
664 	if (file->tree == NULL) {
665 		free(file);
666 		return NULL;
667 	}
668 
669 	file->fs = fs;
670 	TAILQ_INIT(&file->open_requests);
671 	TAILQ_INIT(&file->sync_requests);
672 	pthread_spin_init(&file->lock, 0);
673 	TAILQ_INSERT_TAIL(&fs->files, file, tailq);
674 	file->priority = SPDK_FILE_PRIORITY_LOW;
675 	return file;
676 }
677 
678 static void fs_load_done(void *ctx, int bserrno);
679 
680 static int
681 _handle_deleted_files(struct spdk_fs_request *req)
682 {
683 	struct spdk_fs_cb_args *args = &req->args;
684 	struct spdk_filesystem *fs = args->fs;
685 
686 	if (!TAILQ_EMPTY(&args->op.fs_load.deleted_files)) {
687 		struct spdk_deleted_file *deleted_file;
688 
689 		deleted_file = TAILQ_FIRST(&args->op.fs_load.deleted_files);
690 		TAILQ_REMOVE(&args->op.fs_load.deleted_files, deleted_file, tailq);
691 		spdk_bs_delete_blob(fs->bs, deleted_file->id, fs_load_done, req);
692 		free(deleted_file);
693 		return 0;
694 	}
695 
696 	return 1;
697 }
698 
699 static void
700 fs_load_done(void *ctx, int bserrno)
701 {
702 	struct spdk_fs_request *req = ctx;
703 	struct spdk_fs_cb_args *args = &req->args;
704 	struct spdk_filesystem *fs = args->fs;
705 
706 	/* The filesystem has been loaded.  Now check if there are any files that
707 	 *  were marked for deletion before last unload.  Do not complete the
708 	 *  fs_load callback until all of them have been deleted on disk.
709 	 */
710 	if (_handle_deleted_files(req) == 0) {
711 		/* We found a file that's been marked for deletion but not actually
712 		 *  deleted yet.  This function will get called again once the delete
713 		 *  operation is completed.
714 		 */
715 		return;
716 	}
717 
718 	args->fn.fs_op_with_handle(args->arg, fs, 0);
719 	free_fs_request(req);
720 
721 }
722 
723 static void
724 _file_build_trace_arg_name(struct spdk_file *f)
725 {
726 	f->trace_arg_name = 0;
727 	memcpy(&f->trace_arg_name, f->name,
728 	       spdk_min(sizeof(f->trace_arg_name), strlen(f->name)));
729 }
730 
731 static void
732 iter_cb(void *ctx, struct spdk_blob *blob, int rc)
733 {
734 	struct spdk_fs_request *req = ctx;
735 	struct spdk_fs_cb_args *args = &req->args;
736 	struct spdk_filesystem *fs = args->fs;
737 	uint64_t *length;
738 	const char *name;
739 	uint32_t *is_deleted;
740 	size_t value_len;
741 
742 	if (rc < 0) {
743 		args->fn.fs_op_with_handle(args->arg, fs, rc);
744 		free_fs_request(req);
745 		return;
746 	}
747 
748 	rc = spdk_blob_get_xattr_value(blob, "name", (const void **)&name, &value_len);
749 	if (rc < 0) {
750 		args->fn.fs_op_with_handle(args->arg, fs, rc);
751 		free_fs_request(req);
752 		return;
753 	}
754 
755 	rc = spdk_blob_get_xattr_value(blob, "length", (const void **)&length, &value_len);
756 	if (rc < 0) {
757 		args->fn.fs_op_with_handle(args->arg, fs, rc);
758 		free_fs_request(req);
759 		return;
760 	}
761 
762 	assert(value_len == 8);
763 
764 	/* This file was previously marked deleted but never removed (e.g. the app crashed before it was closed), so delete it now */
765 	rc = spdk_blob_get_xattr_value(blob, "is_deleted", (const void **)&is_deleted, &value_len);
766 	if (rc < 0) {
767 		struct spdk_file *f;
768 
769 		f = file_alloc(fs);
770 		if (f == NULL) {
771 			SPDK_ERRLOG("Cannot allocate file to handle deleted file on disk\n");
772 			args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM);
773 			free_fs_request(req);
774 			return;
775 		}
776 
777 		f->name = strdup(name);
778 		_file_build_trace_arg_name(f);
779 		f->blobid = spdk_blob_get_id(blob);
780 		f->length = *length;
781 		f->length_flushed = *length;
782 		f->length_xattr = *length;
783 		f->append_pos = *length;
784 		SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "added file %s length=%ju\n", f->name, f->length);
785 	} else {
786 		struct spdk_deleted_file *deleted_file;
787 
788 		deleted_file = calloc(1, sizeof(*deleted_file));
789 		if (deleted_file == NULL) {
790 			args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM);
791 			free_fs_request(req);
792 			return;
793 		}
794 		deleted_file->id = spdk_blob_get_id(blob);
795 		TAILQ_INSERT_TAIL(&args->op.fs_load.deleted_files, deleted_file, tailq);
796 	}
797 }
798 
799 static void
800 load_cb(void *ctx, struct spdk_blob_store *bs, int bserrno)
801 {
802 	struct spdk_fs_request *req = ctx;
803 	struct spdk_fs_cb_args *args = &req->args;
804 	struct spdk_filesystem *fs = args->fs;
805 	struct spdk_bs_type bstype;
806 	static const struct spdk_bs_type blobfs_type = {SPDK_BLOBFS_SIGNATURE};
807 	static const struct spdk_bs_type zeros;
808 
809 	if (bserrno != 0) {
810 		args->fn.fs_op_with_handle(args->arg, NULL, bserrno);
811 		free_fs_request(req);
812 		fs_free_io_channels(fs);
813 		fs_io_device_unregister(fs);
814 		return;
815 	}
816 
817 	bstype = spdk_bs_get_bstype(bs);
818 
819 	if (!memcmp(&bstype, &zeros, sizeof(bstype))) {
820 		SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "assigning bstype\n");
821 		spdk_bs_set_bstype(bs, blobfs_type);
822 	} else if (memcmp(&bstype, &blobfs_type, sizeof(bstype))) {
823 		SPDK_ERRLOG("not blobfs\n");
824 		SPDK_LOGDUMP(SPDK_LOG_BLOBFS, "bstype", &bstype, sizeof(bstype));
825 		args->fn.fs_op_with_handle(args->arg, NULL, -EINVAL);
826 		free_fs_request(req);
827 		fs_free_io_channels(fs);
828 		fs_io_device_unregister(fs);
829 		return;
830 	}
831 
832 	common_fs_bs_init(fs, bs);
833 	fs_load_done(req, 0);
834 }
835 
836 static void
837 fs_io_device_unregister(struct spdk_filesystem *fs)
838 {
839 	assert(fs != NULL);
840 	spdk_io_device_unregister(&fs->md_target, NULL);
841 	spdk_io_device_unregister(&fs->sync_target, NULL);
842 	spdk_io_device_unregister(&fs->io_target, NULL);
843 	free(fs);
844 }
845 
846 static void
847 fs_free_io_channels(struct spdk_filesystem *fs)
848 {
849 	assert(fs != NULL);
850 	spdk_fs_free_io_channel(fs->md_target.md_io_channel);
851 	spdk_fs_free_io_channel(fs->sync_target.sync_io_channel);
852 }
853 
854 void
855 spdk_fs_load(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn,
856 	     spdk_fs_op_with_handle_complete cb_fn, void *cb_arg)
857 {
858 	struct spdk_filesystem *fs;
859 	struct spdk_fs_cb_args *args;
860 	struct spdk_fs_request *req;
861 	struct spdk_bs_opts	bs_opts;
862 
863 	fs = fs_alloc(dev, send_request_fn);
864 	if (fs == NULL) {
865 		cb_fn(cb_arg, NULL, -ENOMEM);
866 		return;
867 	}
868 
869 	fs_conf_parse();
870 
871 	req = alloc_fs_request(fs->md_target.md_fs_channel);
872 	if (req == NULL) {
873 		fs_free_io_channels(fs);
874 		fs_io_device_unregister(fs);
875 		cb_fn(cb_arg, NULL, -ENOMEM);
876 		return;
877 	}
878 
879 	args = &req->args;
880 	args->fn.fs_op_with_handle = cb_fn;
881 	args->arg = cb_arg;
882 	args->fs = fs;
883 	TAILQ_INIT(&args->op.fs_load.deleted_files);
884 	spdk_bs_opts_init(&bs_opts);
885 	bs_opts.iter_cb_fn = iter_cb;
886 	bs_opts.iter_cb_arg = req;
887 	spdk_bs_load(dev, &bs_opts, load_cb, req);
888 }
889 
890 static void
891 unload_cb(void *ctx, int bserrno)
892 {
893 	struct spdk_fs_request *req = ctx;
894 	struct spdk_fs_cb_args *args = &req->args;
895 	struct spdk_filesystem *fs = args->fs;
896 	struct spdk_file *file, *tmp;
897 
898 	TAILQ_FOREACH_SAFE(file, &fs->files, tailq, tmp) {
899 		TAILQ_REMOVE(&fs->files, file, tailq);
900 		file_free(file);
901 	}
902 
903 	free_global_cache();
904 
905 	args->fn.fs_op(args->arg, bserrno);
906 	free(req);
907 
908 	fs_io_device_unregister(fs);
909 }
910 
911 void
912 spdk_fs_unload(struct spdk_filesystem *fs, spdk_fs_op_complete cb_fn, void *cb_arg)
913 {
914 	struct spdk_fs_request *req;
915 	struct spdk_fs_cb_args *args;
916 
917 	/*
918 	 * We must free the md_channel before unloading the blobstore, so just
919 	 *  allocate this request from the general heap.
920 	 */
921 	req = calloc(1, sizeof(*req));
922 	if (req == NULL) {
923 		cb_fn(cb_arg, -ENOMEM);
924 		return;
925 	}
926 
927 	args = &req->args;
928 	args->fn.fs_op = cb_fn;
929 	args->arg = cb_arg;
930 	args->fs = fs;
931 
932 	fs_free_io_channels(fs);
933 	spdk_bs_unload(fs->bs, unload_cb, req);
934 }
935 
936 static struct spdk_file *
937 fs_find_file(struct spdk_filesystem *fs, const char *name)
938 {
939 	struct spdk_file *file;
940 
941 	TAILQ_FOREACH(file, &fs->files, tailq) {
942 		if (!strncmp(name, file->name, SPDK_FILE_NAME_MAX)) {
943 			return file;
944 		}
945 	}
946 
947 	return NULL;
948 }
949 
950 void
951 spdk_fs_file_stat_async(struct spdk_filesystem *fs, const char *name,
952 			spdk_file_stat_op_complete cb_fn, void *cb_arg)
953 {
954 	struct spdk_file_stat stat;
955 	struct spdk_file *f = NULL;
956 
957 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
958 		cb_fn(cb_arg, NULL, -ENAMETOOLONG);
959 		return;
960 	}
961 
962 	f = fs_find_file(fs, name);
963 	if (f != NULL) {
964 		stat.blobid = f->blobid;
965 		stat.size = f->append_pos >= f->length ? f->append_pos : f->length;
966 		cb_fn(cb_arg, &stat, 0);
967 		return;
968 	}
969 
970 	cb_fn(cb_arg, NULL, -ENOENT);
971 }
972 
973 static void
974 __copy_stat(void *arg, struct spdk_file_stat *stat, int fserrno)
975 {
976 	struct spdk_fs_request *req = arg;
977 	struct spdk_fs_cb_args *args = &req->args;
978 
979 	args->rc = fserrno;
980 	if (fserrno == 0) {
981 		memcpy(args->arg, stat, sizeof(*stat));
982 	}
983 	sem_post(args->sem);
984 }
985 
986 static void
987 __file_stat(void *arg)
988 {
989 	struct spdk_fs_request *req = arg;
990 	struct spdk_fs_cb_args *args = &req->args;
991 
992 	spdk_fs_file_stat_async(args->fs, args->op.stat.name,
993 				args->fn.stat_op, req);
994 }
995 
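/*
 * Blocking wrapper pattern used by spdk_fs_file_stat() and the other
 * synchronous APIs below: allocate a request from the caller's
 * spdk_fs_thread_ctx channel, forward it with send_request() to the thread
 * owning the filesystem, then block on the channel semaphore until the
 * completion (__copy_stat()/__wake_caller()) posts it.
 */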
996 int
997 spdk_fs_file_stat(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx,
998 		  const char *name, struct spdk_file_stat *stat)
999 {
1000 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
1001 	struct spdk_fs_request *req;
1002 	int rc;
1003 
1004 	req = alloc_fs_request(channel);
1005 	if (req == NULL) {
1006 		SPDK_ERRLOG("Cannot allocate stat req on file=%s\n", name);
1007 		return -ENOMEM;
1008 	}
1009 
1010 	req->args.fs = fs;
1011 	req->args.op.stat.name = name;
1012 	req->args.fn.stat_op = __copy_stat;
1013 	req->args.arg = stat;
1014 	req->args.sem = &channel->sem;
1015 	channel->send_request(__file_stat, req);
1016 	sem_wait(&channel->sem);
1017 
1018 	rc = req->args.rc;
1019 	free_fs_request(req);
1020 
1021 	return rc;
1022 }
1023 
1024 static void
1025 fs_create_blob_close_cb(void *ctx, int bserrno)
1026 {
1027 	int rc;
1028 	struct spdk_fs_request *req = ctx;
1029 	struct spdk_fs_cb_args *args = &req->args;
1030 
1031 	rc = args->rc ? args->rc : bserrno;
1032 	args->fn.file_op(args->arg, rc);
1033 	free_fs_request(req);
1034 }
1035 
1036 static void
1037 fs_create_blob_resize_cb(void *ctx, int bserrno)
1038 {
1039 	struct spdk_fs_request *req = ctx;
1040 	struct spdk_fs_cb_args *args = &req->args;
1041 	struct spdk_file *f = args->file;
1042 	struct spdk_blob *blob = args->op.create.blob;
1043 	uint64_t length = 0;
1044 
1045 	args->rc = bserrno;
1046 	if (bserrno) {
1047 		spdk_blob_close(blob, fs_create_blob_close_cb, args);
1048 		return;
1049 	}
1050 
1051 	spdk_blob_set_xattr(blob, "name", f->name, strlen(f->name) + 1);
1052 	spdk_blob_set_xattr(blob, "length", &length, sizeof(length));
1053 
1054 	spdk_blob_close(blob, fs_create_blob_close_cb, args);
1055 }
1056 
1057 static void
1058 fs_create_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno)
1059 {
1060 	struct spdk_fs_request *req = ctx;
1061 	struct spdk_fs_cb_args *args = &req->args;
1062 
1063 	if (bserrno) {
1064 		args->fn.file_op(args->arg, bserrno);
1065 		free_fs_request(req);
1066 		return;
1067 	}
1068 
1069 	args->op.create.blob = blob;
1070 	spdk_blob_resize(blob, 1, fs_create_blob_resize_cb, req);
1071 }
1072 
1073 static void
1074 fs_create_blob_create_cb(void *ctx, spdk_blob_id blobid, int bserrno)
1075 {
1076 	struct spdk_fs_request *req = ctx;
1077 	struct spdk_fs_cb_args *args = &req->args;
1078 	struct spdk_file *f = args->file;
1079 
1080 	if (bserrno) {
1081 		args->fn.file_op(args->arg, bserrno);
1082 		free_fs_request(req);
1083 		return;
1084 	}
1085 
1086 	f->blobid = blobid;
1087 	spdk_bs_open_blob(f->fs->bs, blobid, fs_create_blob_open_cb, req);
1088 }
1089 
1090 void
1091 spdk_fs_create_file_async(struct spdk_filesystem *fs, const char *name,
1092 			  spdk_file_op_complete cb_fn, void *cb_arg)
1093 {
1094 	struct spdk_file *file;
1095 	struct spdk_fs_request *req;
1096 	struct spdk_fs_cb_args *args;
1097 
1098 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
1099 		cb_fn(cb_arg, -ENAMETOOLONG);
1100 		return;
1101 	}
1102 
1103 	file = fs_find_file(fs, name);
1104 	if (file != NULL) {
1105 		cb_fn(cb_arg, -EEXIST);
1106 		return;
1107 	}
1108 
1109 	file = file_alloc(fs);
1110 	if (file == NULL) {
1111 		SPDK_ERRLOG("Cannot allocate new file for creation\n");
1112 		cb_fn(cb_arg, -ENOMEM);
1113 		return;
1114 	}
1115 
1116 	req = alloc_fs_request(fs->md_target.md_fs_channel);
1117 	if (req == NULL) {
1118 		SPDK_ERRLOG("Cannot allocate create async req for file=%s\n", name);
1119 		cb_fn(cb_arg, -ENOMEM);
1120 		return;
1121 	}
1122 
1123 	args = &req->args;
1124 	args->file = file;
1125 	args->fn.file_op = cb_fn;
1126 	args->arg = cb_arg;
1127 
1128 	file->name = strdup(name);
1129 	_file_build_trace_arg_name(file);
1130 	spdk_bs_create_blob(fs->bs, fs_create_blob_create_cb, args);
1131 }
1132 
1133 static void
1134 __fs_create_file_done(void *arg, int fserrno)
1135 {
1136 	struct spdk_fs_request *req = arg;
1137 	struct spdk_fs_cb_args *args = &req->args;
1138 
1139 	__wake_caller(args, fserrno);
1140 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.create.name);
1141 }
1142 
1143 static void
1144 __fs_create_file(void *arg)
1145 {
1146 	struct spdk_fs_request *req = arg;
1147 	struct spdk_fs_cb_args *args = &req->args;
1148 
1149 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.create.name);
1150 	spdk_fs_create_file_async(args->fs, args->op.create.name, __fs_create_file_done, req);
1151 }
1152 
1153 int
1154 spdk_fs_create_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx, const char *name)
1155 {
1156 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
1157 	struct spdk_fs_request *req;
1158 	struct spdk_fs_cb_args *args;
1159 	int rc;
1160 
1161 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name);
1162 
1163 	req = alloc_fs_request(channel);
1164 	if (req == NULL) {
1165 		SPDK_ERRLOG("Cannot allocate req to create file=%s\n", name);
1166 		return -ENOMEM;
1167 	}
1168 
1169 	args = &req->args;
1170 	args->fs = fs;
1171 	args->op.create.name = name;
1172 	args->sem = &channel->sem;
1173 	fs->send_request(__fs_create_file, req);
1174 	sem_wait(&channel->sem);
1175 	rc = args->rc;
1176 	free_fs_request(req);
1177 
1178 	return rc;
1179 }
1180 
1181 static void
1182 fs_open_blob_done(void *ctx, struct spdk_blob *blob, int bserrno)
1183 {
1184 	struct spdk_fs_request *req = ctx;
1185 	struct spdk_fs_cb_args *args = &req->args;
1186 	struct spdk_file *f = args->file;
1187 
1188 	f->blob = blob;
1189 	while (!TAILQ_EMPTY(&f->open_requests)) {
1190 		req = TAILQ_FIRST(&f->open_requests);
1191 		args = &req->args;
1192 		TAILQ_REMOVE(&f->open_requests, req, args.op.open.tailq);
1193 		spdk_trace_record(TRACE_BLOBFS_OPEN, 0, 0, 0, f->trace_arg_name);
1194 		args->fn.file_op_with_handle(args->arg, f, bserrno);
1195 		free_fs_request(req);
1196 	}
1197 }
1198 
1199 static void
1200 fs_open_blob_create_cb(void *ctx, int bserrno)
1201 {
1202 	struct spdk_fs_request *req = ctx;
1203 	struct spdk_fs_cb_args *args = &req->args;
1204 	struct spdk_file *file = args->file;
1205 	struct spdk_filesystem *fs = args->fs;
1206 
1207 	if (file == NULL) {
1208 		/*
1209 		 * This is from an open with CREATE flag - the file
1210 		 *  is now created so look it up in the file list for this
1211 		 *  filesystem.
1212 		 */
1213 		file = fs_find_file(fs, args->op.open.name);
1214 		assert(file != NULL);
1215 		args->file = file;
1216 	}
1217 
1218 	file->ref_count++;
1219 	TAILQ_INSERT_TAIL(&file->open_requests, req, args.op.open.tailq);
1220 	if (file->ref_count == 1) {
1221 		assert(file->blob == NULL);
1222 		spdk_bs_open_blob(fs->bs, file->blobid, fs_open_blob_done, req);
1223 	} else if (file->blob != NULL) {
1224 		fs_open_blob_done(req, file->blob, 0);
1225 	} else {
1226 		/*
1227 		 * The blob open for this file is in progress due to a previous
1228 		 *  open request.  When that open completes, it will invoke the
1229 		 *  open callback for this request.
1230 		 */
1231 	}
1232 }
1233 
1234 void
1235 spdk_fs_open_file_async(struct spdk_filesystem *fs, const char *name, uint32_t flags,
1236 			spdk_file_op_with_handle_complete cb_fn, void *cb_arg)
1237 {
1238 	struct spdk_file *f = NULL;
1239 	struct spdk_fs_request *req;
1240 	struct spdk_fs_cb_args *args;
1241 
1242 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
1243 		cb_fn(cb_arg, NULL, -ENAMETOOLONG);
1244 		return;
1245 	}
1246 
1247 	f = fs_find_file(fs, name);
1248 	if (f == NULL && !(flags & SPDK_BLOBFS_OPEN_CREATE)) {
1249 		cb_fn(cb_arg, NULL, -ENOENT);
1250 		return;
1251 	}
1252 
1253 	if (f != NULL && f->is_deleted == true) {
1254 		cb_fn(cb_arg, NULL, -ENOENT);
1255 		return;
1256 	}
1257 
1258 	req = alloc_fs_request(fs->md_target.md_fs_channel);
1259 	if (req == NULL) {
1260 		SPDK_ERRLOG("Cannot allocate async open req for file=%s\n", name);
1261 		cb_fn(cb_arg, NULL, -ENOMEM);
1262 		return;
1263 	}
1264 
1265 	args = &req->args;
1266 	args->fn.file_op_with_handle = cb_fn;
1267 	args->arg = cb_arg;
1268 	args->file = f;
1269 	args->fs = fs;
1270 	args->op.open.name = name;
1271 
1272 	if (f == NULL) {
1273 		spdk_fs_create_file_async(fs, name, fs_open_blob_create_cb, req);
1274 	} else {
1275 		fs_open_blob_create_cb(req, 0);
1276 	}
1277 }
1278 
1279 static void
1280 __fs_open_file_done(void *arg, struct spdk_file *file, int bserrno)
1281 {
1282 	struct spdk_fs_request *req = arg;
1283 	struct spdk_fs_cb_args *args = &req->args;
1284 
1285 	args->file = file;
1286 	__wake_caller(args, bserrno);
1287 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.open.name);
1288 }
1289 
1290 static void
1291 __fs_open_file(void *arg)
1292 {
1293 	struct spdk_fs_request *req = arg;
1294 	struct spdk_fs_cb_args *args = &req->args;
1295 
1296 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.open.name);
1297 	spdk_fs_open_file_async(args->fs, args->op.open.name, args->op.open.flags,
1298 				__fs_open_file_done, req);
1299 }
1300 
1301 int
1302 spdk_fs_open_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx,
1303 		  const char *name, uint32_t flags, struct spdk_file **file)
1304 {
1305 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
1306 	struct spdk_fs_request *req;
1307 	struct spdk_fs_cb_args *args;
1308 	int rc;
1309 
1310 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name);
1311 
1312 	req = alloc_fs_request(channel);
1313 	if (req == NULL) {
1314 		SPDK_ERRLOG("Cannot allocate req for opening file=%s\n", name);
1315 		return -ENOMEM;
1316 	}
1317 
1318 	args = &req->args;
1319 	args->fs = fs;
1320 	args->op.open.name = name;
1321 	args->op.open.flags = flags;
1322 	args->sem = &channel->sem;
1323 	fs->send_request(__fs_open_file, req);
1324 	sem_wait(&channel->sem);
1325 	rc = args->rc;
1326 	if (rc == 0) {
1327 		*file = args->file;
1328 	} else {
1329 		*file = NULL;
1330 	}
1331 	free_fs_request(req);
1332 
1333 	return rc;
1334 }
1335 
1336 static void
1337 fs_rename_blob_close_cb(void *ctx, int bserrno)
1338 {
1339 	struct spdk_fs_request *req = ctx;
1340 	struct spdk_fs_cb_args *args = &req->args;
1341 
1342 	args->fn.fs_op(args->arg, bserrno);
1343 	free_fs_request(req);
1344 }
1345 
1346 static void
1347 fs_rename_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno)
1348 {
1349 	struct spdk_fs_request *req = ctx;
1350 	struct spdk_fs_cb_args *args = &req->args;
1351 	const char *new_name = args->op.rename.new_name;
1352 
1353 	spdk_blob_set_xattr(blob, "name", new_name, strlen(new_name) + 1);
1354 	spdk_blob_close(blob, fs_rename_blob_close_cb, req);
1355 }
1356 
1357 static void
1358 _fs_md_rename_file(struct spdk_fs_request *req)
1359 {
1360 	struct spdk_fs_cb_args *args = &req->args;
1361 	struct spdk_file *f;
1362 
1363 	f = fs_find_file(args->fs, args->op.rename.old_name);
1364 	if (f == NULL) {
1365 		args->fn.fs_op(args->arg, -ENOENT);
1366 		free_fs_request(req);
1367 		return;
1368 	}
1369 
1370 	free(f->name);
1371 	f->name = strdup(args->op.rename.new_name);
1372 	_file_build_trace_arg_name(f);
1373 	args->file = f;
1374 	spdk_bs_open_blob(args->fs->bs, f->blobid, fs_rename_blob_open_cb, req);
1375 }
1376 
1377 static void
1378 fs_rename_delete_done(void *arg, int fserrno)
1379 {
1380 	_fs_md_rename_file(arg);
1381 }
1382 
1383 void
1384 spdk_fs_rename_file_async(struct spdk_filesystem *fs,
1385 			  const char *old_name, const char *new_name,
1386 			  spdk_file_op_complete cb_fn, void *cb_arg)
1387 {
1388 	struct spdk_file *f;
1389 	struct spdk_fs_request *req;
1390 	struct spdk_fs_cb_args *args;
1391 
1392 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "old=%s new=%s\n", old_name, new_name);
1393 	if (strnlen(new_name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
1394 		cb_fn(cb_arg, -ENAMETOOLONG);
1395 		return;
1396 	}
1397 
1398 	req = alloc_fs_request(fs->md_target.md_fs_channel);
1399 	if (req == NULL) {
1400 		SPDK_ERRLOG("Cannot allocate rename async req for renaming file from %s to %s\n", old_name,
1401 			    new_name);
1402 		cb_fn(cb_arg, -ENOMEM);
1403 		return;
1404 	}
1405 
1406 	args = &req->args;
1407 	args->fn.fs_op = cb_fn;
1408 	args->fs = fs;
1409 	args->arg = cb_arg;
1410 	args->op.rename.old_name = old_name;
1411 	args->op.rename.new_name = new_name;
1412 
1413 	f = fs_find_file(fs, new_name);
1414 	if (f == NULL) {
1415 		_fs_md_rename_file(req);
1416 		return;
1417 	}
1418 
1419 	/*
1420 	 * The rename overwrites an existing file.  So delete the existing file, then
1421 	 *  do the actual rename.
1422 	 */
1423 	spdk_fs_delete_file_async(fs, new_name, fs_rename_delete_done, req);
1424 }
1425 
1426 static void
1427 __fs_rename_file_done(void *arg, int fserrno)
1428 {
1429 	struct spdk_fs_request *req = arg;
1430 	struct spdk_fs_cb_args *args = &req->args;
1431 
1432 	__wake_caller(args, fserrno);
1433 }
1434 
1435 static void
1436 __fs_rename_file(void *arg)
1437 {
1438 	struct spdk_fs_request *req = arg;
1439 	struct spdk_fs_cb_args *args = &req->args;
1440 
1441 	spdk_fs_rename_file_async(args->fs, args->op.rename.old_name, args->op.rename.new_name,
1442 				  __fs_rename_file_done, req);
1443 }
1444 
1445 int
1446 spdk_fs_rename_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx,
1447 		    const char *old_name, const char *new_name)
1448 {
1449 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
1450 	struct spdk_fs_request *req;
1451 	struct spdk_fs_cb_args *args;
1452 	int rc;
1453 
1454 	req = alloc_fs_request(channel);
1455 	if (req == NULL) {
1456 		SPDK_ERRLOG("Cannot allocate rename req for file=%s\n", old_name);
1457 		return -ENOMEM;
1458 	}
1459 
1460 	args = &req->args;
1461 
1462 	args->fs = fs;
1463 	args->op.rename.old_name = old_name;
1464 	args->op.rename.new_name = new_name;
1465 	args->sem = &channel->sem;
1466 	fs->send_request(__fs_rename_file, req);
1467 	sem_wait(&channel->sem);
1468 	rc = args->rc;
1469 	free_fs_request(req);
1470 	return rc;
1471 }
1472 
1473 static void
1474 blob_delete_cb(void *ctx, int bserrno)
1475 {
1476 	struct spdk_fs_request *req = ctx;
1477 	struct spdk_fs_cb_args *args = &req->args;
1478 
1479 	args->fn.file_op(args->arg, bserrno);
1480 	free_fs_request(req);
1481 }
1482 
1483 void
1484 spdk_fs_delete_file_async(struct spdk_filesystem *fs, const char *name,
1485 			  spdk_file_op_complete cb_fn, void *cb_arg)
1486 {
1487 	struct spdk_file *f;
1488 	spdk_blob_id blobid;
1489 	struct spdk_fs_request *req;
1490 	struct spdk_fs_cb_args *args;
1491 
1492 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name);
1493 
1494 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
1495 		cb_fn(cb_arg, -ENAMETOOLONG);
1496 		return;
1497 	}
1498 
1499 	f = fs_find_file(fs, name);
1500 	if (f == NULL) {
1501 		SPDK_ERRLOG("Cannot find file=%s to delete\n", name);
1502 		cb_fn(cb_arg, -ENOENT);
1503 		return;
1504 	}
1505 
1506 	req = alloc_fs_request(fs->md_target.md_fs_channel);
1507 	if (req == NULL) {
1508 		SPDK_ERRLOG("Cannot allocate req to delete file=%s\n", name);
1509 		cb_fn(cb_arg, -ENOMEM);
1510 		return;
1511 	}
1512 
1513 	args = &req->args;
1514 	args->fn.file_op = cb_fn;
1515 	args->arg = cb_arg;
1516 
1517 	if (f->ref_count > 0) {
1518 		/* If the ref > 0, we mark the file as deleted and delete it when we close it. */
1519 		f->is_deleted = true;
1520 		spdk_blob_set_xattr(f->blob, "is_deleted", &f->is_deleted, sizeof(bool));
1521 		spdk_blob_sync_md(f->blob, blob_delete_cb, req);
1522 		return;
1523 	}
1524 
1525 	blobid = f->blobid;
1526 	TAILQ_REMOVE(&fs->files, f, tailq);
1527 
1528 	file_free(f);
1529 
1530 	spdk_bs_delete_blob(fs->bs, blobid, blob_delete_cb, req);
1531 }
1532 
1533 static uint64_t
1534 fs_name_to_uint64(const char *name)
1535 {
1536 	uint64_t result = 0;
1537 	memcpy(&result, name, spdk_min(sizeof(result), strlen(name)));
1538 	return result;
1539 }
1540 
1541 static void
1542 __fs_delete_file_done(void *arg, int fserrno)
1543 {
1544 	struct spdk_fs_request *req = arg;
1545 	struct spdk_fs_cb_args *args = &req->args;
1546 
1547 	spdk_trace_record(TRACE_BLOBFS_DELETE_DONE, 0, 0, 0, fs_name_to_uint64(args->op.delete.name));
1548 	__wake_caller(args, fserrno);
1549 }
1550 
1551 static void
1552 __fs_delete_file(void *arg)
1553 {
1554 	struct spdk_fs_request *req = arg;
1555 	struct spdk_fs_cb_args *args = &req->args;
1556 
1557 	spdk_trace_record(TRACE_BLOBFS_DELETE_START, 0, 0, 0, fs_name_to_uint64(args->op.delete.name));
1558 	spdk_fs_delete_file_async(args->fs, args->op.delete.name, __fs_delete_file_done, req);
1559 }
1560 
1561 int
1562 spdk_fs_delete_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx,
1563 		    const char *name)
1564 {
1565 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
1566 	struct spdk_fs_request *req;
1567 	struct spdk_fs_cb_args *args;
1568 	int rc;
1569 
1570 	req = alloc_fs_request(channel);
1571 	if (req == NULL) {
1572 		SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "Cannot allocate req to delete file=%s\n", name);
1573 		return -ENOMEM;
1574 	}
1575 
1576 	args = &req->args;
1577 	args->fs = fs;
1578 	args->op.delete.name = name;
1579 	args->sem = &channel->sem;
1580 	fs->send_request(__fs_delete_file, req);
1581 	sem_wait(&channel->sem);
1582 	rc = args->rc;
1583 	free_fs_request(req);
1584 
1585 	return rc;
1586 }
1587 
1588 spdk_fs_iter
1589 spdk_fs_iter_first(struct spdk_filesystem *fs)
1590 {
1591 	struct spdk_file *f;
1592 
1593 	f = TAILQ_FIRST(&fs->files);
1594 	return f;
1595 }
1596 
1597 spdk_fs_iter
1598 spdk_fs_iter_next(spdk_fs_iter iter)
1599 {
1600 	struct spdk_file *f = iter;
1601 
1602 	if (f == NULL) {
1603 		return NULL;
1604 	}
1605 
1606 	f = TAILQ_NEXT(f, tailq);
1607 	return f;
1608 }
1609 
1610 const char *
1611 spdk_file_get_name(struct spdk_file *file)
1612 {
1613 	return file->name;
1614 }
1615 
1616 uint64_t
1617 spdk_file_get_length(struct spdk_file *file)
1618 {
1619 	uint64_t length;
1620 
1621 	assert(file != NULL);
1622 
1623 	length = file->append_pos >= file->length ? file->append_pos : file->length;
1624 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s length=0x%jx\n", file->name, length);
1625 	return length;
1626 }
1627 
1628 static void
1629 fs_truncate_complete_cb(void *ctx, int bserrno)
1630 {
1631 	struct spdk_fs_request *req = ctx;
1632 	struct spdk_fs_cb_args *args = &req->args;
1633 
1634 	args->fn.file_op(args->arg, bserrno);
1635 	free_fs_request(req);
1636 }
1637 
1638 static void
1639 fs_truncate_resize_cb(void *ctx, int bserrno)
1640 {
1641 	struct spdk_fs_request *req = ctx;
1642 	struct spdk_fs_cb_args *args = &req->args;
1643 	struct spdk_file *file = args->file;
1644 	uint64_t *length = &args->op.truncate.length;
1645 
1646 	if (bserrno) {
1647 		args->fn.file_op(args->arg, bserrno);
1648 		free_fs_request(req);
1649 		return;
1650 	}
1651 
1652 	spdk_blob_set_xattr(file->blob, "length", length, sizeof(*length));
1653 
1654 	file->length = *length;
1655 	if (file->append_pos > file->length) {
1656 		file->append_pos = file->length;
1657 	}
1658 
1659 	spdk_blob_sync_md(file->blob, fs_truncate_complete_cb, req);
1660 }
1661 
1662 static uint64_t
1663 __bytes_to_clusters(uint64_t length, uint64_t cluster_sz)
1664 {
1665 	return (length + cluster_sz - 1) / cluster_sz;
1666 }
1667 
1668 void
1669 spdk_file_truncate_async(struct spdk_file *file, uint64_t length,
1670 			 spdk_file_op_complete cb_fn, void *cb_arg)
1671 {
1672 	struct spdk_filesystem *fs;
1673 	size_t num_clusters;
1674 	struct spdk_fs_request *req;
1675 	struct spdk_fs_cb_args *args;
1676 
1677 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s old=0x%jx new=0x%jx\n", file->name, file->length, length);
1678 	if (length == file->length) {
1679 		cb_fn(cb_arg, 0);
1680 		return;
1681 	}
1682 
1683 	req = alloc_fs_request(file->fs->md_target.md_fs_channel);
1684 	if (req == NULL) {
1685 		cb_fn(cb_arg, -ENOMEM);
1686 		return;
1687 	}
1688 
1689 	args = &req->args;
1690 	args->fn.file_op = cb_fn;
1691 	args->arg = cb_arg;
1692 	args->file = file;
1693 	args->op.truncate.length = length;
1694 	fs = file->fs;
1695 
1696 	num_clusters = __bytes_to_clusters(length, fs->bs_opts.cluster_sz);
1697 
1698 	spdk_blob_resize(file->blob, num_clusters, fs_truncate_resize_cb, req);
1699 }
1700 
1701 static void
1702 __truncate(void *arg)
1703 {
1704 	struct spdk_fs_request *req = arg;
1705 	struct spdk_fs_cb_args *args = &req->args;
1706 
1707 	spdk_file_truncate_async(args->file, args->op.truncate.length,
1708 				 args->fn.file_op, args);
1709 }
1710 
1711 int
1712 spdk_file_truncate(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx,
1713 		   uint64_t length)
1714 {
1715 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
1716 	struct spdk_fs_request *req;
1717 	struct spdk_fs_cb_args *args;
1718 	int rc;
1719 
1720 	req = alloc_fs_request(channel);
1721 	if (req == NULL) {
1722 		return -ENOMEM;
1723 	}
1724 
1725 	args = &req->args;
1726 
1727 	args->file = file;
1728 	args->op.truncate.length = length;
1729 	args->fn.file_op = __wake_caller;
1730 	args->sem = &channel->sem;
1731 
1732 	channel->send_request(__truncate, req);
1733 	sem_wait(&channel->sem);
1734 	rc = args->rc;
1735 	free_fs_request(req);
1736 
1737 	return rc;
1738 }
1739 
1740 static void
1741 __rw_done(void *ctx, int bserrno)
1742 {
1743 	struct spdk_fs_request *req = ctx;
1744 	struct spdk_fs_cb_args *args = &req->args;
1745 
1746 	spdk_free(args->op.rw.pin_buf);
1747 	args->fn.file_op(args->arg, bserrno);
1748 	free_fs_request(req);
1749 }
1750 
1751 static void
1752 _copy_iovs_to_buf(void *buf, size_t buf_len, struct iovec *iovs, int iovcnt)
1753 {
1754 	int i;
1755 	size_t len;
1756 
1757 	for (i = 0; i < iovcnt; i++) {
1758 		len = spdk_min(iovs[i].iov_len, buf_len);
1759 		memcpy(buf, iovs[i].iov_base, len);
1760 		buf += len;
1761 		assert(buf_len >= len);
1762 		buf_len -= len;
1763 	}
1764 }
1765 
1766 static void
1767 _copy_buf_to_iovs(struct iovec *iovs, int iovcnt, void *buf, size_t buf_len)
1768 {
1769 	int i;
1770 	size_t len;
1771 
1772 	for (i = 0; i < iovcnt; i++) {
1773 		len = spdk_min(iovs[i].iov_len, buf_len);
1774 		memcpy(iovs[i].iov_base, buf, len);
1775 		buf += len;
1776 		assert(buf_len >= len);
1777 		buf_len -= len;
1778 	}
1779 }
1780 
1781 static void
1782 __read_done(void *ctx, int bserrno)
1783 {
1784 	struct spdk_fs_request *req = ctx;
1785 	struct spdk_fs_cb_args *args = &req->args;
1786 	void *buf;
1787 
1788 	assert(req != NULL);
1789 	buf = (void *)((uintptr_t)args->op.rw.pin_buf + (args->op.rw.offset & (args->op.rw.blocklen - 1)));
1790 	if (args->op.rw.is_read) {
1791 		_copy_buf_to_iovs(args->iovs, args->iovcnt, buf, args->op.rw.length);
1792 		__rw_done(req, 0);
1793 	} else {
1794 		_copy_iovs_to_buf(buf, args->op.rw.length, args->iovs, args->iovcnt);
1795 		spdk_blob_io_write(args->file->blob, args->op.rw.channel,
1796 				   args->op.rw.pin_buf,
1797 				   args->op.rw.start_lba, args->op.rw.num_lba,
1798 				   __rw_done, req);
1799 	}
1800 }
1801 
1802 static void
1803 __do_blob_read(void *ctx, int fserrno)
1804 {
1805 	struct spdk_fs_request *req = ctx;
1806 	struct spdk_fs_cb_args *args = &req->args;
1807 
1808 	if (fserrno) {
1809 		__rw_done(req, fserrno);
1810 		return;
1811 	}
1812 	spdk_blob_io_read(args->file->blob, args->op.rw.channel,
1813 			  args->op.rw.pin_buf,
1814 			  args->op.rw.start_lba, args->op.rw.num_lba,
1815 			  __read_done, req);
1816 }
1817 
1818 static void
1819 __get_page_parameters(struct spdk_file *file, uint64_t offset, uint64_t length,
1820 		      uint64_t *start_lba, uint32_t *lba_size, uint64_t *num_lba)
1821 {
1822 	uint64_t end_lba;
1823 
1824 	*lba_size = spdk_bs_get_io_unit_size(file->fs->bs);
1825 	*start_lba = offset / *lba_size;
1826 	end_lba = (offset + length - 1) / *lba_size;
1827 	*num_lba = (end_lba - *start_lba + 1);
1828 }
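/*
 * Worked example, assuming a 4 KiB blobstore io unit: for offset=5000 and
 * length=10000, *start_lba = 5000 / 4096 = 1, the last touched LBA is
 * (5000 + 10000 - 1) / 4096 = 3, so *num_lba = 3 - 1 + 1 = 3.
 */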
1829 
1830 static bool
1831 __is_lba_aligned(struct spdk_file *file, uint64_t offset, uint64_t length)
1832 {
1833 	uint32_t lba_size = spdk_bs_get_io_unit_size(file->fs->bs);
1834 
1835 	if ((offset % lba_size == 0) && (length % lba_size == 0)) {
1836 		return true;
1837 	}
1838 
1839 	return false;
1840 }
1841 
1842 static void
1843 _fs_request_setup_iovs(struct spdk_fs_request *req, struct iovec *iovs, uint32_t iovcnt)
1844 {
1845 	uint32_t i;
1846 
1847 	for (i = 0; i < iovcnt; i++) {
1848 		req->args.iovs[i].iov_base = iovs[i].iov_base;
1849 		req->args.iovs[i].iov_len = iovs[i].iov_len;
1850 	}
1851 }
1852 
1853 static void
1854 __readvwritev(struct spdk_file *file, struct spdk_io_channel *_channel,
1855 	      struct iovec *iovs, uint32_t iovcnt, uint64_t offset, uint64_t length,
1856 	      spdk_file_op_complete cb_fn, void *cb_arg, int is_read)
1857 {
1858 	struct spdk_fs_request *req;
1859 	struct spdk_fs_cb_args *args;
1860 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
1861 	uint64_t start_lba, num_lba, pin_buf_length;
1862 	uint32_t lba_size;
1863 
1864 	if (is_read && offset + length > file->length) {
1865 		cb_fn(cb_arg, -EINVAL);
1866 		return;
1867 	}
1868 
1869 	req = alloc_fs_request_with_iov(channel, iovcnt);
1870 	if (req == NULL) {
1871 		cb_fn(cb_arg, -ENOMEM);
1872 		return;
1873 	}
1874 
1875 	__get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba);
1876 
1877 	args = &req->args;
1878 	args->fn.file_op = cb_fn;
1879 	args->arg = cb_arg;
1880 	args->file = file;
1881 	args->op.rw.channel = channel->bs_channel;
1882 	_fs_request_setup_iovs(req, iovs, iovcnt);
1883 	args->op.rw.is_read = is_read;
1884 	args->op.rw.offset = offset;
1885 	args->op.rw.blocklen = lba_size;
1886 
1887 	pin_buf_length = num_lba * lba_size;
1888 	args->op.rw.length = pin_buf_length;
1889 	args->op.rw.pin_buf = spdk_malloc(pin_buf_length, lba_size, NULL,
1890 					  SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
1891 	if (args->op.rw.pin_buf == NULL) {
1892 		SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "Failed to allocate buf for: file=%s offset=%jx length=%jx\n",
1893 			      file->name, offset, length);
1894 		free_fs_request(req);
1895 		cb_fn(cb_arg, -ENOMEM);
1896 		return;
1897 	}
1898 
1899 	args->op.rw.start_lba = start_lba;
1900 	args->op.rw.num_lba = num_lba;
1901 
1902 	if (!is_read && file->length < offset + length) {
1903 		spdk_file_truncate_async(file, offset + length, __do_blob_read, req);
1904 	} else if (!is_read && __is_lba_aligned(file, offset, length)) {
1905 		_copy_iovs_to_buf(args->op.rw.pin_buf, args->op.rw.length, args->iovs, args->iovcnt);
1906 		spdk_blob_io_write(args->file->blob, args->op.rw.channel,
1907 				   args->op.rw.pin_buf,
1908 				   args->op.rw.start_lba, args->op.rw.num_lba,
1909 				   __rw_done, req);
1910 	} else {
1911 		__do_blob_read(req, 0);
1912 	}
1913 }
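/*
 * Summary of the path above: an LBA-aligned write within the current file
 * length goes straight to spdk_blob_io_write(). Every other request (all
 * reads included) goes through __do_blob_read(), which reads the covering
 * LBAs into pin_buf; __read_done() then copies data out to the caller's iovs
 * for reads, or merges the iovs into pin_buf and writes it back for writes
 * (read-modify-write). A write past the current length first extends the
 * blob with spdk_file_truncate_async() before taking that path.
 */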
1914 
1915 static void
1916 __readwrite(struct spdk_file *file, struct spdk_io_channel *channel,
1917 	    void *payload, uint64_t offset, uint64_t length,
1918 	    spdk_file_op_complete cb_fn, void *cb_arg, int is_read)
1919 {
1920 	struct iovec iov;
1921 
1922 	iov.iov_base = payload;
1923 	iov.iov_len = (size_t)length;
1924 
1925 	__readvwritev(file, channel, &iov, 1, offset, length, cb_fn, cb_arg, is_read);
1926 }
1927 
1928 void
1929 spdk_file_write_async(struct spdk_file *file, struct spdk_io_channel *channel,
1930 		      void *payload, uint64_t offset, uint64_t length,
1931 		      spdk_file_op_complete cb_fn, void *cb_arg)
1932 {
1933 	__readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 0);
1934 }
1935 
1936 void
1937 spdk_file_writev_async(struct spdk_file *file, struct spdk_io_channel *channel,
1938 		       struct iovec *iovs, uint32_t iovcnt, uint64_t offset, uint64_t length,
1939 		       spdk_file_op_complete cb_fn, void *cb_arg)
1940 {
1941 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s offset=%jx length=%jx\n",
1942 		      file->name, offset, length);
1943 
1944 	__readvwritev(file, channel, iovs, iovcnt, offset, length, cb_fn, cb_arg, 0);
1945 }
1946 
1947 void
1948 spdk_file_read_async(struct spdk_file *file, struct spdk_io_channel *channel,
1949 		     void *payload, uint64_t offset, uint64_t length,
1950 		     spdk_file_op_complete cb_fn, void *cb_arg)
1951 {
1952 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s offset=%jx length=%jx\n",
1953 		      file->name, offset, length);
1954 	__readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 1);
1955 }
1956 
1957 void
1958 spdk_file_readv_async(struct spdk_file *file, struct spdk_io_channel *channel,
1959 		      struct iovec *iovs, uint32_t iovcnt, uint64_t offset, uint64_t length,
1960 		      spdk_file_op_complete cb_fn, void *cb_arg)
1961 {
1962 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s offset=%jx length=%jx\n",
1963 		      file->name, offset, length);
1964 
1965 	__readvwritev(file, channel, iovs, iovcnt, offset, length, cb_fn, cb_arg, 1);
1966 }
1967 
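/*
 * Allocate an I/O channel for use with the *_async() calls above.  The
 * channel must be allocated, used and freed on the same SPDK thread.
 *
 * Illustrative usage (error handling omitted; buf, buf_len and write_done
 * are placeholders):
 *
 *	struct spdk_io_channel *ch = spdk_fs_alloc_io_channel(fs);
 *
 *	spdk_file_write_async(file, ch, buf, 0, buf_len, write_done, NULL);
 *	...
 *	spdk_fs_free_io_channel(ch);
 */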
1968 struct spdk_io_channel *
1969 spdk_fs_alloc_io_channel(struct spdk_filesystem *fs)
1970 {
1971 	struct spdk_io_channel *io_channel;
1972 	struct spdk_fs_channel *fs_channel;
1973 
1974 	io_channel = spdk_get_io_channel(&fs->io_target);
1975 	fs_channel = spdk_io_channel_get_ctx(io_channel);
1976 	fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
1977 	fs_channel->send_request = __send_request_direct;
1978 
1979 	return io_channel;
1980 }
1981 
1982 void
1983 spdk_fs_free_io_channel(struct spdk_io_channel *channel)
1984 {
1985 	spdk_put_io_channel(channel);
1986 }
1987 
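/*
 * Allocate a per-thread context for the synchronous API
 * (spdk_file_read/write/sync/close).  The embedded channel forwards its
 * requests through fs->send_request and waits on a semaphore, so it can be
 * used from threads that are not running an SPDK reactor.
 */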
1988 struct spdk_fs_thread_ctx *
1989 spdk_fs_alloc_thread_ctx(struct spdk_filesystem *fs)
1990 {
1991 	struct spdk_fs_thread_ctx *ctx;
1992 
1993 	ctx = calloc(1, sizeof(*ctx));
1994 	if (!ctx) {
1995 		return NULL;
1996 	}
1997 
1998 	if (fs_channel_create(fs, &ctx->ch, 512) != 0) {
		free(ctx);
		return NULL;
	}
1999 
2000 	ctx->ch.send_request = fs->send_request;
2001 	ctx->ch.sync = 1;
2002 	pthread_spin_init(&ctx->ch.lock, 0);
2003 
2004 	return ctx;
2005 }
2006 
2008 void
2009 spdk_fs_free_thread_ctx(struct spdk_fs_thread_ctx *ctx)
2010 {
2011 	assert(ctx->ch.sync == 1);
2012 
2013 	while (true) {
2014 		pthread_spin_lock(&ctx->ch.lock);
2015 		if (ctx->ch.outstanding_reqs == 0) {
2016 			pthread_spin_unlock(&ctx->ch.lock);
2017 			break;
2018 		}
2019 		pthread_spin_unlock(&ctx->ch.lock);
2020 		usleep(1000);
2021 	}
2022 
2023 	fs_channel_destroy(NULL, &ctx->ch);
2024 	free(ctx);
2025 }
2026 
2027 int
2028 spdk_fs_set_cache_size(uint64_t size_in_mb)
2029 {
2030 	/* Setting g_fs_cache_size is only permitted while the cache pool is
2031 	 * unallocated, i.e. before it is first used or after it has been freed.
2032 	 */
2033 	if (g_cache_pool != NULL) {
2034 		return -EPERM;
2035 	}
2036 
2037 	g_fs_cache_size = size_in_mb * 1024 * 1024;
2038 
2039 	return 0;
2040 }
2041 
2042 uint64_t
2043 spdk_fs_get_cache_size(void)
2044 {
2045 	return g_fs_cache_size / (1024 * 1024);
2046 }
2047 
2048 static void __file_flush(void *ctx);
2049 
2050 /* Try to free some of this file's cache buffers.  This function must
2051  * be called while holding g_caches_lock.
2052  */
2053 static int
2054 reclaim_cache_buffers(struct spdk_file *file)
2055 {
2056 	int rc;
2057 
2058 	BLOBFS_TRACE(file, "free=%s\n", file->name);
2059 
2060 	/* This function is safe to call from any thread.  The file lock may
2061 	 * currently be held by another thread, so only try to acquire it here
2062 	 * and give up if it is contended.
2063 	 */
2064 	rc = pthread_spin_trylock(&file->lock);
2065 	if (rc != 0) {
2066 		return -1;
2067 	}
2068 
2069 	if (file->tree->present_mask == 0) {
2070 		pthread_spin_unlock(&file->lock);
2071 		return -1;
2072 	}
2073 	tree_free_buffers(file->tree);
2074 
2075 	TAILQ_REMOVE(&g_caches, file, cache_tailq);
2076 	/* If not all buffers could be freed, move the file to the end of the queue */
2077 	if (file->tree->present_mask != 0) {
2078 		TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq);
2079 	} else {
2080 		file->last = NULL;
2081 	}
2082 	pthread_spin_unlock(&file->lock);
2083 
2084 	return 0;
2085 }
2086 
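/*
 * Cache pool reclaim poller.  When the pool runs low, try to free cache
 * buffers in three passes of decreasing selectivity: first a low-priority
 * file that is not open for writing, then any file not open for writing,
 * and finally any file.  Each pass reclaims from the first file it can
 * lock, and the poller returns early once enough buffers have been freed.
 */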
2087 static int
2088 _blobfs_cache_pool_reclaim(void *arg)
2089 {
2090 	struct spdk_file *file, *tmp;
2091 	int rc;
2092 
2093 	if (!blobfs_cache_pool_need_reclaim()) {
2094 		return 0;
2095 	}
2096 
2097 	pthread_spin_lock(&g_caches_lock);
2098 	TAILQ_FOREACH_SAFE(file, &g_caches, cache_tailq, tmp) {
2099 		if (!file->open_for_writing &&
2100 		    file->priority == SPDK_FILE_PRIORITY_LOW) {
2101 			rc = reclaim_cache_buffers(file);
2102 			if (rc < 0) {
2103 				continue;
2104 			}
2105 			if (!blobfs_cache_pool_need_reclaim()) {
2106 				pthread_spin_unlock(&g_caches_lock);
2107 				return 1;
2108 			}
2109 			break;
2110 		}
2111 	}
2112 
2113 	TAILQ_FOREACH_SAFE(file, &g_caches, cache_tailq, tmp) {
2114 		if (!file->open_for_writing) {
2115 			rc = reclaim_cache_buffers(file);
2116 			if (rc < 0) {
2117 				continue;
2118 			}
2119 			if (!blobfs_cache_pool_need_reclaim()) {
2120 				pthread_spin_unlock(&g_caches_lock);
2121 				return 1;
2122 			}
2123 			break;
2124 		}
2125 	}
2126 
2127 	TAILQ_FOREACH_SAFE(file, &g_caches, cache_tailq, tmp) {
2128 		rc = reclaim_cache_buffers(file);
2129 		if (rc < 0) {
2130 			continue;
2131 		}
2132 		break;
2133 	}
2134 	pthread_spin_unlock(&g_caches_lock);
2135 
2136 	return 1;
2137 }
2138 
2139 static void
2140 _add_file_to_cache_pool(void *ctx)
2141 {
2142 	struct spdk_file *file = ctx;
2143 
2144 	pthread_spin_lock(&g_caches_lock);
2145 	TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq);
2146 	pthread_spin_unlock(&g_caches_lock);
2147 }
2148 
2149 static void
2150 _remove_file_from_cache_pool(void *ctx)
2151 {
2152 	struct spdk_file *file = ctx;
2153 
2154 	pthread_spin_lock(&g_caches_lock);
2155 	TAILQ_REMOVE(&g_caches, file, cache_tailq);
2156 	pthread_spin_unlock(&g_caches_lock);
2157 }
2158 
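/*
 * Allocate a cache buffer for the given file offset.  The buffer memory
 * comes from the global cache mempool; if the pool is exhausted, retry for
 * up to ~100 poll periods while the reclaim poller frees buffers.  The
 * first buffer inserted into a file's tree also registers the file with
 * the cache pool thread.
 */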
2159 static struct cache_buffer *
2160 cache_insert_buffer(struct spdk_file *file, uint64_t offset)
2161 {
2162 	struct cache_buffer *buf;
2163 	int count = 0;
2164 	bool need_update = false;
2165 
2166 	buf = calloc(1, sizeof(*buf));
2167 	if (buf == NULL) {
2168 		SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "calloc failed\n");
2169 		return NULL;
2170 	}
2171 
2172 	do {
2173 		buf->buf = spdk_mempool_get(g_cache_pool);
2174 		if (buf->buf) {
2175 			break;
2176 		}
2177 		if (count++ == 100) {
2178 			SPDK_ERRLOG("Could not allocate cache buffer for file=%p at offset=%jx\n",
2179 				    file, offset);
2180 			free(buf);
2181 			return NULL;
2182 		}
2183 		usleep(BLOBFS_CACHE_POOL_POLL_PERIOD_IN_US);
2184 	} while (true);
2185 
2186 	buf->buf_size = CACHE_BUFFER_SIZE;
2187 	buf->offset = offset;
2188 
2189 	if (file->tree->present_mask == 0) {
2190 		need_update = true;
2191 	}
2192 	file->tree = tree_insert_buffer(file->tree, buf);
2193 
2194 	if (need_update) {
2195 		spdk_thread_send_msg(g_cache_pool_thread, _add_file_to_cache_pool, file);
2196 	}
2197 
2198 	return buf;
2199 }
2200 
2201 static struct cache_buffer *
2202 cache_append_buffer(struct spdk_file *file)
2203 {
2204 	struct cache_buffer *last;
2205 
2206 	assert(file->last == NULL || file->last->bytes_filled == file->last->buf_size);
2207 	assert((file->append_pos % CACHE_BUFFER_SIZE) == 0);
2208 
2209 	last = cache_insert_buffer(file, file->append_pos);
2210 	if (last == NULL) {
2211 		SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "cache_insert_buffer failed\n");
2212 		return NULL;
2213 	}
2214 
2215 	file->last = last;
2216 
2217 	return last;
2218 }
2219 
2220 static void __check_sync_reqs(struct spdk_file *file);
2221 
2222 static void
2223 __file_cache_finish_sync(void *ctx, int bserrno)
2224 {
2225 	struct spdk_file *file;
2226 	struct spdk_fs_request *sync_req = ctx;
2227 	struct spdk_fs_cb_args *sync_args;
2228 
2229 	sync_args = &sync_req->args;
2230 	file = sync_args->file;
2231 	pthread_spin_lock(&file->lock);
2232 	file->length_xattr = sync_args->op.sync.length;
2233 	assert(sync_args->op.sync.offset <= file->length_flushed);
2234 	spdk_trace_record(TRACE_BLOBFS_XATTR_END, 0, sync_args->op.sync.offset,
2235 			  0, file->trace_arg_name);
2236 	BLOBFS_TRACE(file, "sync done offset=%jx\n", sync_args->op.sync.offset);
2237 	TAILQ_REMOVE(&file->sync_requests, sync_req, args.op.sync.tailq);
2238 	pthread_spin_unlock(&file->lock);
2239 
2240 	sync_args->fn.file_op(sync_args->arg, bserrno);
2241 
2242 	free_fs_request(sync_req);
2243 	__check_sync_reqs(file);
2244 }
2245 
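/*
 * Walk the file's outstanding sync requests and, if one is now fully
 * covered by length_flushed, persist the flushed length in the blob's
 * "length" xattr and sync the metadata.  __file_cache_finish_sync()
 * completes that request and re-checks for more.
 */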
2246 static void
2247 __check_sync_reqs(struct spdk_file *file)
2248 {
2249 	struct spdk_fs_request *sync_req;
2250 
2251 	pthread_spin_lock(&file->lock);
2252 
2253 	TAILQ_FOREACH(sync_req, &file->sync_requests, args.op.sync.tailq) {
2254 		if (sync_req->args.op.sync.offset <= file->length_flushed) {
2255 			break;
2256 		}
2257 	}
2258 
2259 	if (sync_req != NULL && !sync_req->args.op.sync.xattr_in_progress) {
2260 		BLOBFS_TRACE(file, "set xattr length 0x%jx\n", file->length_flushed);
2261 		sync_req->args.op.sync.xattr_in_progress = true;
2262 		sync_req->args.op.sync.length = file->length_flushed;
2263 		spdk_blob_set_xattr(file->blob, "length", &file->length_flushed,
2264 				    sizeof(file->length_flushed));
2265 
2266 		pthread_spin_unlock(&file->lock);
2267 		spdk_trace_record(TRACE_BLOBFS_XATTR_START, 0, file->length_flushed,
2268 				  0, file->trace_arg_name);
2269 		spdk_blob_sync_md(file->blob, __file_cache_finish_sync, sync_req);
2270 	} else {
2271 		pthread_spin_unlock(&file->lock);
2272 	}
2273 }
2274 
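/*
 * Completion for one cache buffer flush: account the flushed bytes,
 * advance length_flushed, let any sync requests that are now satisfied
 * proceed, and then look for the next buffer to flush.
 */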
2275 static void
2276 __file_flush_done(void *ctx, int bserrno)
2277 {
2278 	struct spdk_fs_request *req = ctx;
2279 	struct spdk_fs_cb_args *args = &req->args;
2280 	struct spdk_file *file = args->file;
2281 	struct cache_buffer *next = args->op.flush.cache_buffer;
2282 
2283 	BLOBFS_TRACE(file, "length=%jx\n", args->op.flush.length);
2284 
2285 	pthread_spin_lock(&file->lock);
2286 	next->in_progress = false;
2287 	next->bytes_flushed += args->op.flush.length;
2288 	file->length_flushed += args->op.flush.length;
2289 	if (file->length_flushed > file->length) {
2290 		file->length = file->length_flushed;
2291 	}
2292 	if (next->bytes_flushed == next->buf_size) {
2293 		BLOBFS_TRACE(file, "write buffer fully flushed 0x%jx\n", file->length_flushed);
2294 		next = tree_find_buffer(file->tree, file->length_flushed);
2295 	}
2296 
2297 	/*
2298 	 * Assert that there is no cached data that extends past the end of the underlying
2299 	 *  blob.
2300 	 */
2301 	assert(next == NULL || next->offset < __file_get_blob_size(file) ||
2302 	       next->bytes_filled == 0);
2303 
2304 	pthread_spin_unlock(&file->lock);
2305 
2306 	__check_sync_reqs(file);
2307 
2308 	__file_flush(req);
2309 }
2310 
2311 static void
2312 __file_flush(void *ctx)
2313 {
2314 	struct spdk_fs_request *req = ctx;
2315 	struct spdk_fs_cb_args *args = &req->args;
2316 	struct spdk_file *file = args->file;
2317 	struct cache_buffer *next;
2318 	uint64_t offset, length, start_lba, num_lba;
2319 	uint32_t lba_size;
2320 
2321 	pthread_spin_lock(&file->lock);
2322 	next = tree_find_buffer(file->tree, file->length_flushed);
2323 	if (next == NULL || next->in_progress ||
2324 	    ((next->bytes_filled < next->buf_size) && TAILQ_EMPTY(&file->sync_requests))) {
2325 		/*
2326 		 * There is either no data to flush, a flush I/O is already in
2327 		 *  progress, or the next buffer is partially filled but there's no
2328 		 *  outstanding request to sync it.
2329 		 * So return immediately - if a flush I/O is in progress we will flush
2330 		 *  more data after that is completed, or a partial buffer will get flushed
2331 		 *  when it is either filled or the file is synced.
2332 		 */
2333 		free_fs_request(req);
2334 		if (next == NULL) {
2335 			/*
2336 			 * For cases where a file's cache was evicted, and then the
2337 			 *  file was later appended, we will write the data directly
2338 			 *  to disk and bypass cache.  So just update length_flushed
2339 			 *  here to reflect that all data was already written to disk.
2340 			 */
2341 			file->length_flushed = file->append_pos;
2342 		}
2343 		pthread_spin_unlock(&file->lock);
2344 		if (next == NULL) {
2345 			/*
2346 			 * There is no data to flush, but we still need to check for any
2347 			 *  outstanding sync requests to make sure metadata gets updated.
2348 			 */
2349 			__check_sync_reqs(file);
2350 		}
2351 		return;
2352 	}
2353 
2354 	offset = next->offset + next->bytes_flushed;
2355 	length = next->bytes_filled - next->bytes_flushed;
2356 	if (length == 0) {
2357 		free_fs_request(req);
2358 		pthread_spin_unlock(&file->lock);
2359 		/*
2360 		 * There is no data to flush, but we still need to check for any
2361 		 *  outstanding sync requests to make sure metadata gets updated.
2362 		 */
2363 		__check_sync_reqs(file);
2364 		return;
2365 	}
2366 	args->op.flush.length = length;
2367 	args->op.flush.cache_buffer = next;
2368 
2369 	__get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba);
2370 
2371 	next->in_progress = true;
2372 	BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n",
2373 		     offset, length, start_lba, num_lba);
2374 	pthread_spin_unlock(&file->lock);
2375 	spdk_blob_io_write(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel,
2376 			   next->buf + (start_lba * lba_size) - next->offset,
2377 			   start_lba, num_lba, __file_flush_done, req);
2378 }
2379 
2380 static void
2381 __file_extend_done(void *arg, int bserrno)
2382 {
2383 	struct spdk_fs_cb_args *args = arg;
2384 
2385 	__wake_caller(args, bserrno);
2386 }
2387 
2388 static void
2389 __file_extend_resize_cb(void *_args, int bserrno)
2390 {
2391 	struct spdk_fs_cb_args *args = _args;
2392 	struct spdk_file *file = args->file;
2393 
2394 	if (bserrno) {
2395 		__wake_caller(args, bserrno);
2396 		return;
2397 	}
2398 
2399 	spdk_blob_sync_md(file->blob, __file_extend_done, args);
2400 }
2401 
2402 static void
2403 __file_extend_blob(void *_args)
2404 {
2405 	struct spdk_fs_cb_args *args = _args;
2406 	struct spdk_file *file = args->file;
2407 
2408 	spdk_blob_resize(file->blob, args->op.resize.num_clusters, __file_extend_resize_cb, args);
2409 }
2410 
2411 static void
2412 __rw_from_file_done(void *ctx, int bserrno)
2413 {
2414 	struct spdk_fs_request *req = ctx;
2415 
2416 	__wake_caller(&req->args, bserrno);
2417 	free_fs_request(req);
2418 }
2419 
2420 static void
2421 __rw_from_file(void *ctx)
2422 {
2423 	struct spdk_fs_request *req = ctx;
2424 	struct spdk_fs_cb_args *args = &req->args;
2425 	struct spdk_file *file = args->file;
2426 
2427 	if (args->op.rw.is_read) {
2428 		spdk_file_read_async(file, file->fs->sync_target.sync_io_channel, args->iovs[0].iov_base,
2429 				     args->op.rw.offset, (uint64_t)args->iovs[0].iov_len,
2430 				     __rw_from_file_done, req);
2431 	} else {
2432 		spdk_file_write_async(file, file->fs->sync_target.sync_io_channel, args->iovs[0].iov_base,
2433 				      args->op.rw.offset, (uint64_t)args->iovs[0].iov_len,
2434 				      __rw_from_file_done, req);
2435 	}
2436 }
2437 
2438 static int
2439 __send_rw_from_file(struct spdk_file *file, void *payload,
2440 		    uint64_t offset, uint64_t length, bool is_read,
2441 		    struct spdk_fs_channel *channel)
2442 {
2443 	struct spdk_fs_request *req;
2444 	struct spdk_fs_cb_args *args;
2445 
2446 	req = alloc_fs_request_with_iov(channel, 1);
2447 	if (req == NULL) {
2448 		sem_post(&channel->sem);
2449 		return -ENOMEM;
2450 	}
2451 
2452 	args = &req->args;
2453 	args->file = file;
2454 	args->sem = &channel->sem;
2455 	args->iovs[0].iov_base = payload;
2456 	args->iovs[0].iov_len = (size_t)length;
2457 	args->op.rw.offset = offset;
2458 	args->op.rw.is_read = is_read;
2459 	file->fs->send_request(__rw_from_file, req);
2460 	return 0;
2461 }
2462 
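/*
 * Synchronous, append-only write: offset must equal the file's current
 * append position.  Data is normally copied into cache buffers and flushed
 * to the blob in the background; if the file has no active cache buffer,
 * the data is written through the async path and the call blocks until
 * that write completes.  The blob is extended first if the write would
 * exceed its current size.
 */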
2463 int
2464 spdk_file_write(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx,
2465 		void *payload, uint64_t offset, uint64_t length)
2466 {
2467 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
2468 	struct spdk_fs_request *flush_req;
2469 	uint64_t rem_length, copy, blob_size, cluster_sz;
2470 	uint32_t cache_buffers_filled = 0;
2471 	uint8_t *cur_payload;
2472 	struct cache_buffer *last;
2473 
2474 	BLOBFS_TRACE_RW(file, "offset=%jx length=%jx\n", offset, length);
2475 
2476 	if (length == 0) {
2477 		return 0;
2478 	}
2479 
2480 	if (offset != file->append_pos) {
2481 		BLOBFS_TRACE(file, " error offset=%jx append_pos=%jx\n", offset, file->append_pos);
2482 		return -EINVAL;
2483 	}
2484 
2485 	pthread_spin_lock(&file->lock);
2486 	file->open_for_writing = true;
2487 
2488 	if ((file->last == NULL) && (file->append_pos % CACHE_BUFFER_SIZE == 0)) {
2489 		cache_append_buffer(file);
2490 	}
2491 
2492 	if (file->last == NULL) {
2493 		int rc;
2494 
2495 		file->append_pos += length;
2496 		pthread_spin_unlock(&file->lock);
2497 		rc = __send_rw_from_file(file, payload, offset, length, false, channel);
2498 		sem_wait(&channel->sem);
2499 		return rc;
2500 	}
2501 
2502 	blob_size = __file_get_blob_size(file);
2503 
2504 	if ((offset + length) > blob_size) {
2505 		struct spdk_fs_cb_args extend_args = {};
2506 
2507 		cluster_sz = file->fs->bs_opts.cluster_sz;
2508 		extend_args.sem = &channel->sem;
2509 		extend_args.op.resize.num_clusters = __bytes_to_clusters((offset + length), cluster_sz);
2510 		extend_args.file = file;
2511 		BLOBFS_TRACE(file, "start resize to %ju clusters\n", extend_args.op.resize.num_clusters);
2512 		pthread_spin_unlock(&file->lock);
2513 		file->fs->send_request(__file_extend_blob, &extend_args);
2514 		sem_wait(&channel->sem);
2515 		if (extend_args.rc) {
2516 			return extend_args.rc;
2517 		}
2518 	}
2519 
2520 	flush_req = alloc_fs_request(channel);
2521 	if (flush_req == NULL) {
2522 		pthread_spin_unlock(&file->lock);
2523 		return -ENOMEM;
2524 	}
2525 
2526 	last = file->last;
2527 	rem_length = length;
2528 	cur_payload = payload;
2529 	while (rem_length > 0) {
2530 		copy = last->buf_size - last->bytes_filled;
2531 		if (copy > rem_length) {
2532 			copy = rem_length;
2533 		}
2534 		BLOBFS_TRACE_RW(file, "  fill offset=%jx length=%jx\n", file->append_pos, copy);
2535 		memcpy(&last->buf[last->bytes_filled], cur_payload, copy);
2536 		file->append_pos += copy;
2537 		if (file->length < file->append_pos) {
2538 			file->length = file->append_pos;
2539 		}
2540 		cur_payload += copy;
2541 		last->bytes_filled += copy;
2542 		rem_length -= copy;
2543 		if (last->bytes_filled == last->buf_size) {
2544 			cache_buffers_filled++;
2545 			last = cache_append_buffer(file);
2546 			if (last == NULL) {
2547 				BLOBFS_TRACE(file, "nomem\n");
2548 				free_fs_request(flush_req);
2549 				pthread_spin_unlock(&file->lock);
2550 				return -ENOMEM;
2551 			}
2552 		}
2553 	}
2554 
2555 	pthread_spin_unlock(&file->lock);
2556 
2557 	if (cache_buffers_filled == 0) {
2558 		free_fs_request(flush_req);
2559 		return 0;
2560 	}
2561 
2562 	flush_req->args.file = file;
2563 	file->fs->send_request(__file_flush, flush_req);
2564 	return 0;
2565 }
2566 
2567 static void
2568 __readahead_done(void *ctx, int bserrno)
2569 {
2570 	struct spdk_fs_request *req = ctx;
2571 	struct spdk_fs_cb_args *args = &req->args;
2572 	struct cache_buffer *cache_buffer = args->op.readahead.cache_buffer;
2573 	struct spdk_file *file = args->file;
2574 
2575 	BLOBFS_TRACE(file, "offset=%jx\n", cache_buffer->offset);
2576 
2577 	pthread_spin_lock(&file->lock);
2578 	cache_buffer->bytes_filled = args->op.readahead.length;
2579 	cache_buffer->bytes_flushed = args->op.readahead.length;
2580 	cache_buffer->in_progress = false;
2581 	pthread_spin_unlock(&file->lock);
2582 
2583 	free_fs_request(req);
2584 }
2585 
2586 static void
2587 __readahead(void *ctx)
2588 {
2589 	struct spdk_fs_request *req = ctx;
2590 	struct spdk_fs_cb_args *args = &req->args;
2591 	struct spdk_file *file = args->file;
2592 	uint64_t offset, length, start_lba, num_lba;
2593 	uint32_t lba_size;
2594 
2595 	offset = args->op.readahead.offset;
2596 	length = args->op.readahead.length;
2597 	assert(length > 0);
2598 
2599 	__get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba);
2600 
2601 	BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n",
2602 		     offset, length, start_lba, num_lba);
2603 	spdk_blob_io_read(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel,
2604 			  args->op.readahead.cache_buffer->buf,
2605 			  start_lba, num_lba, __readahead_done, req);
2606 }
2607 
2608 static uint64_t
2609 __next_cache_buffer_offset(uint64_t offset)
2610 {
2611 	return (offset + CACHE_BUFFER_SIZE) & ~(CACHE_TREE_LEVEL_MASK(0));
2612 }
2613 
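/*
 * Kick off readahead for the cache buffer that follows the given offset,
 * unless that range is already cached or lies past the end of the file.
 */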
2614 static void
2615 check_readahead(struct spdk_file *file, uint64_t offset,
2616 		struct spdk_fs_channel *channel)
2617 {
2618 	struct spdk_fs_request *req;
2619 	struct spdk_fs_cb_args *args;
2620 
2621 	offset = __next_cache_buffer_offset(offset);
2622 	if (tree_find_buffer(file->tree, offset) != NULL || file->length <= offset) {
2623 		return;
2624 	}
2625 
2626 	req = alloc_fs_request(channel);
2627 	if (req == NULL) {
2628 		return;
2629 	}
2630 	args = &req->args;
2631 
2632 	BLOBFS_TRACE(file, "offset=%jx\n", offset);
2633 
2634 	args->file = file;
2635 	args->op.readahead.offset = offset;
2636 	args->op.readahead.cache_buffer = cache_insert_buffer(file, offset);
2637 	if (!args->op.readahead.cache_buffer) {
2638 		BLOBFS_TRACE(file, "Cannot allocate buf for offset=%jx\n", offset);
2639 		free_fs_request(req);
2640 		return;
2641 	}
2642 
2643 	args->op.readahead.cache_buffer->in_progress = true;
2644 	if (file->length < (offset + CACHE_BUFFER_SIZE)) {
2645 		args->op.readahead.length = file->length & (CACHE_BUFFER_SIZE - 1);
2646 	} else {
2647 		args->op.readahead.length = CACHE_BUFFER_SIZE;
2648 	}
2649 	file->fs->send_request(__readahead, req);
2650 }
2651 
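/*
 * Synchronous read: satisfied from cache buffers where possible, falling
 * back to blocking reads through the async path for uncached ranges.
 * Sequential access beyond CACHE_READAHEAD_THRESHOLD triggers readahead of
 * the next cache buffers.
 */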
2652 int64_t
2653 spdk_file_read(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx,
2654 	       void *payload, uint64_t offset, uint64_t length)
2655 {
2656 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
2657 	uint64_t final_offset, final_length;
2658 	uint32_t sub_reads = 0;
2659 	struct cache_buffer *buf;
2660 	uint64_t read_len;
2661 	int rc = 0;
2662 
2663 	pthread_spin_lock(&file->lock);
2664 
2665 	BLOBFS_TRACE_RW(file, "offset=%ju length=%ju\n", offset, length);
2666 
2667 	file->open_for_writing = false;
2668 
2669 	if (length == 0 || offset >= file->append_pos) {
2670 		pthread_spin_unlock(&file->lock);
2671 		return 0;
2672 	}
2673 
2674 	if (offset + length > file->append_pos) {
2675 		length = file->append_pos - offset;
2676 	}
2677 
2678 	if (offset != file->next_seq_offset) {
2679 		file->seq_byte_count = 0;
2680 	}
2681 	file->seq_byte_count += length;
2682 	file->next_seq_offset = offset + length;
2683 	if (file->seq_byte_count >= CACHE_READAHEAD_THRESHOLD) {
2684 		check_readahead(file, offset, channel);
2685 		check_readahead(file, offset + CACHE_BUFFER_SIZE, channel);
2686 	}
2687 
2688 	final_length = 0;
2689 	final_offset = offset + length;
2690 	while (offset < final_offset) {
2691 		length = NEXT_CACHE_BUFFER_OFFSET(offset) - offset;
2692 		if (length > (final_offset - offset)) {
2693 			length = final_offset - offset;
2694 		}
2695 
2696 		buf = tree_find_filled_buffer(file->tree, offset);
2697 		if (buf == NULL) {
2698 			pthread_spin_unlock(&file->lock);
2699 			rc = __send_rw_from_file(file, payload, offset, length, true, channel);
2700 			pthread_spin_lock(&file->lock);
2701 			if (rc == 0) {
2702 				sub_reads++;
2703 			}
2704 		} else {
2705 			read_len = length;
2706 			if ((offset + length) > (buf->offset + buf->bytes_filled)) {
2707 				read_len = buf->offset + buf->bytes_filled - offset;
2708 			}
2709 			BLOBFS_TRACE(file, "read %p offset=%ju length=%ju\n", payload, offset, read_len);
2710 			memcpy(payload, &buf->buf[offset - buf->offset], read_len);
2711 			if ((offset + read_len) % CACHE_BUFFER_SIZE == 0) {
2712 				tree_remove_buffer(file->tree, buf);
2713 				if (file->tree->present_mask == 0) {
2714 					spdk_thread_send_msg(g_cache_pool_thread, _remove_file_from_cache_pool, file);
2715 				}
2716 			}
2717 		}
2718 
2719 		if (rc == 0) {
2720 			final_length += length;
2721 		} else {
2722 			break;
2723 		}
2724 		payload += length;
2725 		offset += length;
2726 	}
2727 	pthread_spin_unlock(&file->lock);
2728 	while (sub_reads > 0) {
2729 		sem_wait(&channel->sem);
2730 		sub_reads--;
2731 	}
2732 	if (rc == 0) {
2733 		return final_length;
2734 	} else {
2735 		return rc;
2736 	}
2737 }
2738 
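/*
 * Queue a sync request recording the current append position and kick a
 * flush.  The caller's callback runs once the flushed length covering that
 * position has been persisted in the blob's "length" xattr (see
 * __check_sync_reqs()).  Completes immediately if everything up to
 * append_pos has already been synced.
 */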
2739 static void
2740 _file_sync(struct spdk_file *file, struct spdk_fs_channel *channel,
2741 	   spdk_file_op_complete cb_fn, void *cb_arg)
2742 {
2743 	struct spdk_fs_request *sync_req;
2744 	struct spdk_fs_request *flush_req;
2745 	struct spdk_fs_cb_args *sync_args;
2746 	struct spdk_fs_cb_args *flush_args;
2747 
2748 	BLOBFS_TRACE(file, "offset=%jx\n", file->append_pos);
2749 
2750 	pthread_spin_lock(&file->lock);
2751 	if (file->append_pos <= file->length_xattr) {
2752 		BLOBFS_TRACE(file, "done - file already synced\n");
2753 		pthread_spin_unlock(&file->lock);
2754 		cb_fn(cb_arg, 0);
2755 		return;
2756 	}
2757 
2758 	sync_req = alloc_fs_request(channel);
2759 	if (!sync_req) {
2760 		SPDK_ERRLOG("Cannot allocate sync req for file=%s\n", file->name);
2761 		pthread_spin_unlock(&file->lock);
2762 		cb_fn(cb_arg, -ENOMEM);
2763 		return;
2764 	}
2765 	sync_args = &sync_req->args;
2766 
2767 	flush_req = alloc_fs_request(channel);
2768 	if (!flush_req) {
2769 		SPDK_ERRLOG("Cannot allocate flush req for file=%s\n", file->name);
2770 		free_fs_request(sync_req);
2771 		pthread_spin_unlock(&file->lock);
2772 		cb_fn(cb_arg, -ENOMEM);
2773 		return;
2774 	}
2775 	flush_args = &flush_req->args;
2776 
2777 	sync_args->file = file;
2778 	sync_args->fn.file_op = cb_fn;
2779 	sync_args->arg = cb_arg;
2780 	sync_args->op.sync.offset = file->append_pos;
2781 	sync_args->op.sync.xattr_in_progress = false;
2782 	TAILQ_INSERT_TAIL(&file->sync_requests, sync_req, args.op.sync.tailq);
2783 	pthread_spin_unlock(&file->lock);
2784 
2785 	flush_args->file = file;
2786 	channel->send_request(__file_flush, flush_req);
2787 }
2788 
2789 int
2790 spdk_file_sync(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx)
2791 {
2792 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
2793 	struct spdk_fs_cb_args args = {};
2794 
2795 	args.sem = &channel->sem;
2796 	_file_sync(file, channel, __wake_caller, &args);
2797 	sem_wait(&channel->sem);
2798 
2799 	return args.rc;
2800 }
2801 
2802 void
2803 spdk_file_sync_async(struct spdk_file *file, struct spdk_io_channel *_channel,
2804 		     spdk_file_op_complete cb_fn, void *cb_arg)
2805 {
2806 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
2807 
2808 	_file_sync(file, channel, cb_fn, cb_arg);
2809 }
2810 
2811 void
2812 spdk_file_set_priority(struct spdk_file *file, uint32_t priority)
2813 {
2814 	BLOBFS_TRACE(file, "priority=%u\n", priority);
2815 	file->priority = priority;
2817 }
2818 
2819 /*
2820  * Close routines
2821  */
2822 
2823 static void
2824 __file_close_async_done(void *ctx, int bserrno)
2825 {
2826 	struct spdk_fs_request *req = ctx;
2827 	struct spdk_fs_cb_args *args = &req->args;
2828 	struct spdk_file *file = args->file;
2829 
2830 	spdk_trace_record(TRACE_BLOBFS_CLOSE, 0, 0, 0, file->trace_arg_name);
2831 
2832 	if (file->is_deleted) {
2833 		spdk_fs_delete_file_async(file->fs, file->name, blob_delete_cb, ctx);
2834 		return;
2835 	}
2836 
2837 	args->fn.file_op(args->arg, bserrno);
2838 	free_fs_request(req);
2839 }
2840 
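/*
 * Drop one reference on the file.  Only the last close actually closes the
 * underlying blob; a file that was deleted while still open is removed
 * once that final close completes (see __file_close_async_done()).
 */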
2841 static void
2842 __file_close_async(struct spdk_file *file, struct spdk_fs_request *req)
2843 {
2844 	struct spdk_blob *blob;
2845 
2846 	pthread_spin_lock(&file->lock);
2847 	if (file->ref_count == 0) {
2848 		pthread_spin_unlock(&file->lock);
2849 		__file_close_async_done(req, -EBADF);
2850 		return;
2851 	}
2852 
2853 	file->ref_count--;
2854 	if (file->ref_count > 0) {
2855 		pthread_spin_unlock(&file->lock);
2856 		req->args.fn.file_op(req->args.arg, 0);
2857 		free_fs_request(req);
2858 		return;
2859 	}
2860 
2861 	pthread_spin_unlock(&file->lock);
2862 
2863 	blob = file->blob;
2864 	file->blob = NULL;
2865 	spdk_blob_close(blob, __file_close_async_done, req);
2866 }
2867 
2868 static void
2869 __file_close_async__sync_done(void *arg, int fserrno)
2870 {
2871 	struct spdk_fs_request *req = arg;
2872 	struct spdk_fs_cb_args *args = &req->args;
2873 
2874 	__file_close_async(args->file, req);
2875 }
2876 
2877 void
2878 spdk_file_close_async(struct spdk_file *file, spdk_file_op_complete cb_fn, void *cb_arg)
2879 {
2880 	struct spdk_fs_request *req;
2881 	struct spdk_fs_cb_args *args;
2882 
2883 	req = alloc_fs_request(file->fs->md_target.md_fs_channel);
2884 	if (req == NULL) {
2885 		SPDK_ERRLOG("Cannot allocate close async req for file=%s\n", file->name);
2886 		cb_fn(cb_arg, -ENOMEM);
2887 		return;
2888 	}
2889 
2890 	args = &req->args;
2891 	args->file = file;
2892 	args->fn.file_op = cb_fn;
2893 	args->arg = cb_arg;
2894 
2895 	spdk_file_sync_async(file, file->fs->md_target.md_io_channel, __file_close_async__sync_done, req);
2896 }
2897 
2898 static void
2899 __file_close(void *arg)
2900 {
2901 	struct spdk_fs_request *req = arg;
2902 	struct spdk_fs_cb_args *args = &req->args;
2903 	struct spdk_file *file = args->file;
2904 
2905 	__file_close_async(file, req);
2906 }
2907 
2908 int
2909 spdk_file_close(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx)
2910 {
2911 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
2912 	struct spdk_fs_request *req;
2913 	struct spdk_fs_cb_args *args;
2914 
2915 	req = alloc_fs_request(channel);
2916 	if (req == NULL) {
2917 		SPDK_ERRLOG("Cannot allocate close req for file=%s\n", file->name);
2918 		return -ENOMEM;
2919 	}
2920 
2921 	args = &req->args;
2922 
2923 	spdk_file_sync(file, ctx);
2924 	BLOBFS_TRACE(file, "name=%s\n", file->name);
2925 	args->file = file;
2926 	args->sem = &channel->sem;
2927 	args->fn.file_op = __wake_caller;
2928 	args->arg = args;
2929 	channel->send_request(__file_close, req);
2930 	sem_wait(&channel->sem);
2931 
2932 	return args->rc;
2933 }
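/*
 * Illustrative synchronous usage from a non-reactor thread (error handling
 * omitted; fs and file are assumed to be open already, and buf/buf_len are
 * placeholders).  Writes must be appends, so a freshly created file starts
 * writing at offset 0.
 *
 *	struct spdk_fs_thread_ctx *ctx = spdk_fs_alloc_thread_ctx(fs);
 *
 *	spdk_file_write(file, ctx, buf, 0, buf_len);
 *	spdk_file_sync(file, ctx);
 *	spdk_file_close(file, ctx);
 *	spdk_fs_free_thread_ctx(ctx);
 */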
2934 
2935 int
2936 spdk_file_get_id(struct spdk_file *file, void *id, size_t size)
2937 {
2938 	if (size < sizeof(spdk_blob_id)) {
2939 		return -EINVAL;
2940 	}
2941 
2942 	memcpy(id, &file->blobid, sizeof(spdk_blob_id));
2943 
2944 	return sizeof(spdk_blob_id);
2945 }
2946 
2947 static void
2948 _file_free(void *ctx)
2949 {
2950 	struct spdk_file *file = ctx;
2951 
2952 	pthread_spin_lock(&g_caches_lock);
2953 	TAILQ_REMOVE(&g_caches, file, cache_tailq);
2954 	pthread_spin_unlock(&g_caches_lock);
2955 
2956 	free(file->name);
2957 	free(file->tree);
2958 	free(file);
2959 }
2960 
2961 static void
2962 file_free(struct spdk_file *file)
2963 {
2964 	BLOBFS_TRACE(file, "free=%s\n", file->name);
2965 	pthread_spin_lock(&file->lock);
2966 	if (file->tree->present_mask == 0) {
2967 		pthread_spin_unlock(&file->lock);
2968 		free(file->name);
2969 		free(file->tree);
2970 		free(file);
2971 		return;
2972 	}
2973 
2974 	tree_free_buffers(file->tree);
2975 	assert(file->tree->present_mask == 0);
2976 	spdk_thread_send_msg(g_cache_pool_thread, _file_free, file);
2977 	pthread_spin_unlock(&file->lock);
2978 }
2979 
2980 SPDK_LOG_REGISTER_COMPONENT("blobfs", SPDK_LOG_BLOBFS)
2981 SPDK_LOG_REGISTER_COMPONENT("blobfs_rw", SPDK_LOG_BLOBFS_RW)
2982