xref: /spdk/lib/blobfs/blobfs.c (revision 0ed85362c8132a2d1927757fbcade66b6660d26a)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 
36 #include "spdk/blobfs.h"
37 #include "spdk/conf.h"
38 #include "tree.h"
39 
40 #include "spdk/queue.h"
41 #include "spdk/thread.h"
42 #include "spdk/assert.h"
43 #include "spdk/env.h"
44 #include "spdk/util.h"
45 #include "spdk_internal/log.h"
46 #include "spdk/trace.h"
47 
48 #define BLOBFS_TRACE(file, str, args...) \
49 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s " str, file->name, ##args)
50 
51 #define BLOBFS_TRACE_RW(file, str, args...) \
52 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS_RW, "file=%s " str, file->name, ##args)
53 
54 #define BLOBFS_DEFAULT_CACHE_SIZE (4ULL * 1024 * 1024 * 1024)
55 #define SPDK_BLOBFS_DEFAULT_OPTS_CLUSTER_SZ (1024 * 1024)
56 
57 #define SPDK_BLOBFS_SIGNATURE	"BLOBFS"
58 
59 static uint64_t g_fs_cache_size = BLOBFS_DEFAULT_CACHE_SIZE;
60 static struct spdk_mempool *g_cache_pool;
61 static TAILQ_HEAD(, spdk_file) g_caches;
62 static struct spdk_poller *g_cache_pool_mgmt_poller;
63 static struct spdk_thread *g_cache_pool_thread;
64 #define BLOBFS_CACHE_POOL_POLL_PERIOD_IN_US 1000ULL
65 static int g_fs_count = 0;
66 static pthread_mutex_t g_cache_init_lock = PTHREAD_MUTEX_INITIALIZER;
67 
68 #define TRACE_GROUP_BLOBFS	0x7
69 #define TRACE_BLOBFS_XATTR_START	SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x0)
70 #define TRACE_BLOBFS_XATTR_END		SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x1)
71 #define TRACE_BLOBFS_OPEN		SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x2)
72 #define TRACE_BLOBFS_CLOSE		SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x3)
73 #define TRACE_BLOBFS_DELETE_START	SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x4)
74 #define TRACE_BLOBFS_DELETE_DONE	SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x5)
75 
76 SPDK_TRACE_REGISTER_FN(blobfs_trace, "blobfs", TRACE_GROUP_BLOBFS)
77 {
78 	spdk_trace_register_description("BLOBFS_XATTR_START",
79 					TRACE_BLOBFS_XATTR_START,
80 					OWNER_NONE, OBJECT_NONE, 0,
81 					SPDK_TRACE_ARG_TYPE_STR,
82 					"file:    ");
83 	spdk_trace_register_description("BLOBFS_XATTR_END",
84 					TRACE_BLOBFS_XATTR_END,
85 					OWNER_NONE, OBJECT_NONE, 0,
86 					SPDK_TRACE_ARG_TYPE_STR,
87 					"file:    ");
88 	spdk_trace_register_description("BLOBFS_OPEN",
89 					TRACE_BLOBFS_OPEN,
90 					OWNER_NONE, OBJECT_NONE, 0,
91 					SPDK_TRACE_ARG_TYPE_STR,
92 					"file:    ");
93 	spdk_trace_register_description("BLOBFS_CLOSE",
94 					TRACE_BLOBFS_CLOSE,
95 					OWNER_NONE, OBJECT_NONE, 0,
96 					SPDK_TRACE_ARG_TYPE_STR,
97 					"file:    ");
98 	spdk_trace_register_description("BLOBFS_DELETE_START",
99 					TRACE_BLOBFS_DELETE_START,
100 					OWNER_NONE, OBJECT_NONE, 0,
101 					SPDK_TRACE_ARG_TYPE_STR,
102 					"file:    ");
103 	spdk_trace_register_description("BLOBFS_DELETE_DONE",
104 					TRACE_BLOBFS_DELETE_DONE,
105 					OWNER_NONE, OBJECT_NONE, 0,
106 					SPDK_TRACE_ARG_TYPE_STR,
107 					"file:    ");
108 }
109 
110 void
111 cache_buffer_free(struct cache_buffer *cache_buffer)
112 {
113 	spdk_mempool_put(g_cache_pool, cache_buffer->buf);
114 	free(cache_buffer);
115 }
116 
117 #define CACHE_READAHEAD_THRESHOLD	(128 * 1024)
118 
119 struct spdk_file {
120 	struct spdk_filesystem	*fs;
121 	struct spdk_blob	*blob;
122 	char			*name;
123 	uint64_t		trace_arg_name;
124 	uint64_t		length;
125 	bool                    is_deleted;
126 	bool			open_for_writing;
127 	uint64_t		length_flushed;
128 	uint64_t		length_xattr;
129 	uint64_t		append_pos;
130 	uint64_t		seq_byte_count;
131 	uint64_t		next_seq_offset;
132 	uint32_t		priority;
133 	TAILQ_ENTRY(spdk_file)	tailq;
134 	spdk_blob_id		blobid;
135 	uint32_t		ref_count;
136 	pthread_spinlock_t	lock;
137 	struct cache_buffer	*last;
138 	struct cache_tree	*tree;
139 	TAILQ_HEAD(open_requests_head, spdk_fs_request) open_requests;
140 	TAILQ_HEAD(sync_requests_head, spdk_fs_request) sync_requests;
141 	TAILQ_ENTRY(spdk_file)	cache_tailq;
142 };
143 
144 struct spdk_deleted_file {
145 	spdk_blob_id	id;
146 	TAILQ_ENTRY(spdk_deleted_file)	tailq;
147 };
148 
149 struct spdk_filesystem {
150 	struct spdk_blob_store	*bs;
151 	TAILQ_HEAD(, spdk_file)	files;
152 	struct spdk_bs_opts	bs_opts;
153 	struct spdk_bs_dev	*bdev;
154 	fs_send_request_fn	send_request;
155 
156 	struct {
157 		uint32_t		max_ops;
158 		struct spdk_io_channel	*sync_io_channel;
159 		struct spdk_fs_channel	*sync_fs_channel;
160 	} sync_target;
161 
162 	struct {
163 		uint32_t		max_ops;
164 		struct spdk_io_channel	*md_io_channel;
165 		struct spdk_fs_channel	*md_fs_channel;
166 	} md_target;
167 
168 	struct {
169 		uint32_t		max_ops;
170 	} io_target;
171 };
172 
173 struct spdk_fs_cb_args {
174 	union {
175 		spdk_fs_op_with_handle_complete		fs_op_with_handle;
176 		spdk_fs_op_complete			fs_op;
177 		spdk_file_op_with_handle_complete	file_op_with_handle;
178 		spdk_file_op_complete			file_op;
179 		spdk_file_stat_op_complete		stat_op;
180 	} fn;
181 	void *arg;
182 	sem_t *sem;
183 	struct spdk_filesystem *fs;
184 	struct spdk_file *file;
185 	int rc;
186 	struct iovec *iovs;
187 	uint32_t iovcnt;
188 	struct iovec iov;
189 	union {
190 		struct {
191 			TAILQ_HEAD(, spdk_deleted_file)	deleted_files;
192 		} fs_load;
193 		struct {
194 			uint64_t	length;
195 		} truncate;
196 		struct {
197 			struct spdk_io_channel	*channel;
198 			void		*pin_buf;
199 			int		is_read;
200 			off_t		offset;
201 			size_t		length;
202 			uint64_t	start_lba;
203 			uint64_t	num_lba;
204 			uint32_t	blocklen;
205 		} rw;
206 		struct {
207 			const char	*old_name;
208 			const char	*new_name;
209 		} rename;
210 		struct {
211 			struct cache_buffer	*cache_buffer;
212 			uint64_t		length;
213 		} flush;
214 		struct {
215 			struct cache_buffer	*cache_buffer;
216 			uint64_t		length;
217 			uint64_t		offset;
218 		} readahead;
219 		struct {
220 			/* offset of the file when the sync request was made */
221 			uint64_t			offset;
222 			TAILQ_ENTRY(spdk_fs_request)	tailq;
223 			bool				xattr_in_progress;
224 			/* length written to the xattr for this file - this should
225 			 * always be the same as the offset if only one thread is
226 			 * writing to the file, but could differ if multiple threads
227 			 * are appending
228 			 */
229 			uint64_t			length;
230 		} sync;
231 		struct {
232 			uint32_t			num_clusters;
233 		} resize;
234 		struct {
235 			const char	*name;
236 			uint32_t	flags;
237 			TAILQ_ENTRY(spdk_fs_request)	tailq;
238 		} open;
239 		struct {
240 			const char		*name;
241 			struct spdk_blob	*blob;
242 		} create;
243 		struct {
244 			const char	*name;
245 		} delete;
246 		struct {
247 			const char	*name;
248 		} stat;
249 	} op;
250 };
251 
252 static void file_free(struct spdk_file *file);
253 static void fs_io_device_unregister(struct spdk_filesystem *fs);
254 static void fs_free_io_channels(struct spdk_filesystem *fs);
255 
256 void
257 spdk_fs_opts_init(struct spdk_blobfs_opts *opts)
258 {
259 	opts->cluster_sz = SPDK_BLOBFS_DEFAULT_OPTS_CLUSTER_SZ;
260 }
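/*
 * Illustrative usage sketch (not part of this file): callers typically
 * initialize the opts structure first so unset fields keep their defaults,
 * then override only what they need before passing it to spdk_fs_init().
 *
 *	struct spdk_blobfs_opts opts;
 *
 *	spdk_fs_opts_init(&opts);
 *	opts.cluster_sz = 4 * 1024 * 1024;	// override the 1 MiB default
 */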
261 
262 static int _blobfs_cache_pool_reclaim(void *arg);
263 
264 static bool
265 blobfs_cache_pool_need_reclaim(void)
266 {
267 	size_t count;
268 
269 	count = spdk_mempool_count(g_cache_pool);
270 	/* We use an aggressive policy here because workloads like db_bench issue requests in batches, so
271 	 *  start reclaiming once the number of available cache buffers drops below 1/5 of the total.
272 	 */
273 	if (count > (size_t)g_fs_cache_size / CACHE_BUFFER_SIZE / 5) {
274 		return false;
275 	}
276 
277 	return true;
278 }
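/*
 * Illustrative arithmetic (assuming the 4 GiB default above and a 256 KiB
 * CACHE_BUFFER_SIZE, which is defined elsewhere): the pool then holds
 * 4 GiB / 256 KiB = 16384 buffers, so reclaim kicks in once no more than
 * 16384 / 5 = 3276 buffers remain available in the mempool.
 */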
279 
280 static void
281 __start_cache_pool_mgmt(void *ctx)
282 {
283 	assert(g_cache_pool == NULL);
284 
285 	g_cache_pool = spdk_mempool_create("spdk_fs_cache",
286 					   g_fs_cache_size / CACHE_BUFFER_SIZE,
287 					   CACHE_BUFFER_SIZE,
288 					   SPDK_MEMPOOL_DEFAULT_CACHE_SIZE,
289 					   SPDK_ENV_SOCKET_ID_ANY);
290 	if (!g_cache_pool) {
291 		SPDK_ERRLOG("Failed to create mempool; you may need to "
292 			    "increase available memory and try again\n");
293 		assert(false);
294 	}
295 	TAILQ_INIT(&g_caches);
296 
297 	assert(g_cache_pool_mgmt_poller == NULL);
298 	g_cache_pool_mgmt_poller = SPDK_POLLER_REGISTER(_blobfs_cache_pool_reclaim, NULL,
299 				   BLOBFS_CACHE_POOL_POLL_PERIOD_IN_US);
300 }
301 
302 static void
303 __stop_cache_pool_mgmt(void *ctx)
304 {
305 	spdk_poller_unregister(&g_cache_pool_mgmt_poller);
306 
307 	assert(g_cache_pool != NULL);
308 	assert(spdk_mempool_count(g_cache_pool) == g_fs_cache_size / CACHE_BUFFER_SIZE);
309 	spdk_mempool_free(g_cache_pool);
310 	g_cache_pool = NULL;
311 
312 	spdk_thread_exit(g_cache_pool_thread);
313 }
314 
315 static void
316 initialize_global_cache(void)
317 {
318 	pthread_mutex_lock(&g_cache_init_lock);
319 	if (g_fs_count == 0) {
320 		g_cache_pool_thread = spdk_thread_create("cache_pool_mgmt", NULL);
321 		assert(g_cache_pool_thread != NULL);
322 		spdk_thread_send_msg(g_cache_pool_thread, __start_cache_pool_mgmt, NULL);
323 	}
324 	g_fs_count++;
325 	pthread_mutex_unlock(&g_cache_init_lock);
326 }
327 
328 static void
329 free_global_cache(void)
330 {
331 	pthread_mutex_lock(&g_cache_init_lock);
332 	g_fs_count--;
333 	if (g_fs_count == 0) {
334 		spdk_thread_send_msg(g_cache_pool_thread, __stop_cache_pool_mgmt, NULL);
335 	}
336 	pthread_mutex_unlock(&g_cache_init_lock);
337 }
338 
339 static uint64_t
340 __file_get_blob_size(struct spdk_file *file)
341 {
342 	uint64_t cluster_sz;
343 
344 	cluster_sz = file->fs->bs_opts.cluster_sz;
345 	return cluster_sz * spdk_blob_get_num_clusters(file->blob);
346 }
347 
348 struct spdk_fs_request {
349 	struct spdk_fs_cb_args		args;
350 	TAILQ_ENTRY(spdk_fs_request)	link;
351 	struct spdk_fs_channel		*channel;
352 };
353 
354 struct spdk_fs_channel {
355 	struct spdk_fs_request		*req_mem;
356 	TAILQ_HEAD(, spdk_fs_request)	reqs;
357 	sem_t				sem;
358 	struct spdk_filesystem		*fs;
359 	struct spdk_io_channel		*bs_channel;
360 	fs_send_request_fn		send_request;
361 	bool				sync;
362 	uint32_t			outstanding_reqs;
363 	pthread_spinlock_t		lock;
364 };
365 
366 /* For now, this is effectively an alias. But eventually we'll shift
367  * some data members over. */
368 struct spdk_fs_thread_ctx {
369 	struct spdk_fs_channel	ch;
370 };
371 
372 static struct spdk_fs_request *
373 alloc_fs_request_with_iov(struct spdk_fs_channel *channel, uint32_t iovcnt)
374 {
375 	struct spdk_fs_request *req;
376 	struct iovec *iovs = NULL;
377 
378 	if (iovcnt > 1) {
379 		iovs = calloc(iovcnt, sizeof(struct iovec));
380 		if (!iovs) {
381 			return NULL;
382 		}
383 	}
384 
385 	if (channel->sync) {
386 		pthread_spin_lock(&channel->lock);
387 	}
388 
389 	req = TAILQ_FIRST(&channel->reqs);
390 	if (req) {
391 		channel->outstanding_reqs++;
392 		TAILQ_REMOVE(&channel->reqs, req, link);
393 	}
394 
395 	if (channel->sync) {
396 		pthread_spin_unlock(&channel->lock);
397 	}
398 
399 	if (req == NULL) {
400 		SPDK_ERRLOG("Cannot allocate req on spdk_fs_channel=%p\n", channel);
401 		free(iovs);
402 		return NULL;
403 	}
404 	memset(req, 0, sizeof(*req));
405 	req->channel = channel;
406 	if (iovcnt > 1) {
407 		req->args.iovs = iovs;
408 	} else {
409 		req->args.iovs = &req->args.iov;
410 	}
411 	req->args.iovcnt = iovcnt;
412 
413 	return req;
414 }
415 
416 static struct spdk_fs_request *
417 alloc_fs_request(struct spdk_fs_channel *channel)
418 {
419 	return alloc_fs_request_with_iov(channel, 0);
420 }
421 
422 static void
423 free_fs_request(struct spdk_fs_request *req)
424 {
425 	struct spdk_fs_channel *channel = req->channel;
426 
427 	if (req->args.iovcnt > 1) {
428 		free(req->args.iovs);
429 	}
430 
431 	if (channel->sync) {
432 		pthread_spin_lock(&channel->lock);
433 	}
434 
435 	TAILQ_INSERT_HEAD(&req->channel->reqs, req, link);
436 	channel->outstanding_reqs--;
437 
438 	if (channel->sync) {
439 		pthread_spin_unlock(&channel->lock);
440 	}
441 }
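/*
 * Note on the request pool: fs_channel_create() below preallocates max_ops
 * requests per channel, so alloc_fs_request() never blocks; it simply returns
 * NULL when the channel's free list is exhausted and callers report -ENOMEM.
 * Sketch of the alloc/free pairing (everything besides these two helpers is
 * illustrative):
 *
 *	req = alloc_fs_request(channel);
 *	if (req == NULL) {
 *		return -ENOMEM;
 *	}
 *	... fill in req->args and dispatch ...
 *	free_fs_request(req);	// done from the completion path
 */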
442 
443 static int
444 fs_channel_create(struct spdk_filesystem *fs, struct spdk_fs_channel *channel,
445 		  uint32_t max_ops)
446 {
447 	uint32_t i;
448 
449 	channel->req_mem = calloc(max_ops, sizeof(struct spdk_fs_request));
450 	if (!channel->req_mem) {
451 		return -1;
452 	}
453 
454 	channel->outstanding_reqs = 0;
455 	TAILQ_INIT(&channel->reqs);
456 	sem_init(&channel->sem, 0, 0);
457 
458 	for (i = 0; i < max_ops; i++) {
459 		TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link);
460 	}
461 
462 	channel->fs = fs;
463 
464 	return 0;
465 }
466 
467 static int
468 fs_md_channel_create(void *io_device, void *ctx_buf)
469 {
470 	struct spdk_filesystem		*fs;
471 	struct spdk_fs_channel		*channel = ctx_buf;
472 
473 	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, md_target);
474 
475 	return fs_channel_create(fs, channel, fs->md_target.max_ops);
476 }
477 
478 static int
479 fs_sync_channel_create(void *io_device, void *ctx_buf)
480 {
481 	struct spdk_filesystem		*fs;
482 	struct spdk_fs_channel		*channel = ctx_buf;
483 
484 	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, sync_target);
485 
486 	return fs_channel_create(fs, channel, fs->sync_target.max_ops);
487 }
488 
489 static int
490 fs_io_channel_create(void *io_device, void *ctx_buf)
491 {
492 	struct spdk_filesystem		*fs;
493 	struct spdk_fs_channel		*channel = ctx_buf;
494 
495 	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, io_target);
496 
497 	return fs_channel_create(fs, channel, fs->io_target.max_ops);
498 }
499 
500 static void
501 fs_channel_destroy(void *io_device, void *ctx_buf)
502 {
503 	struct spdk_fs_channel *channel = ctx_buf;
504 
505 	if (channel->outstanding_reqs > 0) {
506 		SPDK_ERRLOG("channel freed with %" PRIu32 " outstanding requests!\n",
507 			    channel->outstanding_reqs);
508 	}
509 
510 	free(channel->req_mem);
511 	if (channel->bs_channel != NULL) {
512 		spdk_bs_free_io_channel(channel->bs_channel);
513 	}
514 }
515 
516 static void
517 __send_request_direct(fs_request_fn fn, void *arg)
518 {
519 	fn(arg);
520 }
521 
522 static void
523 common_fs_bs_init(struct spdk_filesystem *fs, struct spdk_blob_store *bs)
524 {
525 	fs->bs = bs;
526 	fs->bs_opts.cluster_sz = spdk_bs_get_cluster_size(bs);
527 	fs->md_target.md_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
528 	fs->md_target.md_fs_channel->send_request = __send_request_direct;
529 	fs->sync_target.sync_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
530 	fs->sync_target.sync_fs_channel->send_request = __send_request_direct;
531 
532 	initialize_global_cache();
533 }
534 
535 static void
536 init_cb(void *ctx, struct spdk_blob_store *bs, int bserrno)
537 {
538 	struct spdk_fs_request *req = ctx;
539 	struct spdk_fs_cb_args *args = &req->args;
540 	struct spdk_filesystem *fs = args->fs;
541 
542 	if (bserrno == 0) {
543 		common_fs_bs_init(fs, bs);
544 	} else {
545 		free(fs);
546 		fs = NULL;
547 	}
548 
549 	args->fn.fs_op_with_handle(args->arg, fs, bserrno);
550 	free_fs_request(req);
551 }
552 
553 static void
554 fs_conf_parse(void)
555 {
556 	struct spdk_conf_section *sp;
557 	int cache_buffer_shift;
558 
559 	sp = spdk_conf_find_section(NULL, "Blobfs");
560 	if (sp == NULL) {
561 		g_fs_cache_buffer_shift = CACHE_BUFFER_SHIFT_DEFAULT;
562 		return;
563 	}
564 
565 	cache_buffer_shift = spdk_conf_section_get_intval(sp, "CacheBufferShift");
566 	if (cache_buffer_shift <= 0) {
567 		g_fs_cache_buffer_shift = CACHE_BUFFER_SHIFT_DEFAULT;
568 	} else {
569 		g_fs_cache_buffer_shift = cache_buffer_shift;
570 	}
571 }
572 
573 static struct spdk_filesystem *
574 fs_alloc(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn)
575 {
576 	struct spdk_filesystem *fs;
577 
578 	fs = calloc(1, sizeof(*fs));
579 	if (fs == NULL) {
580 		return NULL;
581 	}
582 
583 	fs->bdev = dev;
584 	fs->send_request = send_request_fn;
585 	TAILQ_INIT(&fs->files);
586 
587 	fs->md_target.max_ops = 512;
588 	spdk_io_device_register(&fs->md_target, fs_md_channel_create, fs_channel_destroy,
589 				sizeof(struct spdk_fs_channel), "blobfs_md");
590 	fs->md_target.md_io_channel = spdk_get_io_channel(&fs->md_target);
591 	fs->md_target.md_fs_channel = spdk_io_channel_get_ctx(fs->md_target.md_io_channel);
592 
593 	fs->sync_target.max_ops = 512;
594 	spdk_io_device_register(&fs->sync_target, fs_sync_channel_create, fs_channel_destroy,
595 				sizeof(struct spdk_fs_channel), "blobfs_sync");
596 	fs->sync_target.sync_io_channel = spdk_get_io_channel(&fs->sync_target);
597 	fs->sync_target.sync_fs_channel = spdk_io_channel_get_ctx(fs->sync_target.sync_io_channel);
598 
599 	fs->io_target.max_ops = 512;
600 	spdk_io_device_register(&fs->io_target, fs_io_channel_create, fs_channel_destroy,
601 				sizeof(struct spdk_fs_channel), "blobfs_io");
602 
603 	return fs;
604 }
605 
606 static void
607 __wake_caller(void *arg, int fserrno)
608 {
609 	struct spdk_fs_cb_args *args = arg;
610 
611 	args->rc = fserrno;
612 	sem_post(args->sem);
613 }
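/*
 * The synchronous wrappers in this file (spdk_fs_create_file, spdk_fs_open_file,
 * spdk_file_truncate, ...) all follow the same pattern: stash the channel's
 * semaphore in args->sem, forward the work to the dispatch thread, and block
 * until the async completion calls __wake_caller(). Sketch (the function name
 * __do_work is purely illustrative):
 *
 *	args->sem = &channel->sem;
 *	fs->send_request(__do_work, req);
 *	sem_wait(&channel->sem);
 *	rc = args->rc;
 */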
614 
615 void
616 spdk_fs_init(struct spdk_bs_dev *dev, struct spdk_blobfs_opts *opt,
617 	     fs_send_request_fn send_request_fn,
618 	     spdk_fs_op_with_handle_complete cb_fn, void *cb_arg)
619 {
620 	struct spdk_filesystem *fs;
621 	struct spdk_fs_request *req;
622 	struct spdk_fs_cb_args *args;
623 	struct spdk_bs_opts opts = {};
624 
625 	fs = fs_alloc(dev, send_request_fn);
626 	if (fs == NULL) {
627 		cb_fn(cb_arg, NULL, -ENOMEM);
628 		return;
629 	}
630 
631 	fs_conf_parse();
632 
633 	req = alloc_fs_request(fs->md_target.md_fs_channel);
634 	if (req == NULL) {
635 		fs_free_io_channels(fs);
636 		fs_io_device_unregister(fs);
637 		cb_fn(cb_arg, NULL, -ENOMEM);
638 		return;
639 	}
640 
641 	args = &req->args;
642 	args->fn.fs_op_with_handle = cb_fn;
643 	args->arg = cb_arg;
644 	args->fs = fs;
645 
646 	spdk_bs_opts_init(&opts);
647 	snprintf(opts.bstype.bstype, sizeof(opts.bstype.bstype), SPDK_BLOBFS_SIGNATURE);
648 	if (opt) {
649 		opts.cluster_sz = opt->cluster_sz;
650 	}
651 	spdk_bs_init(dev, &opts, init_cb, req);
652 }
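/*
 * Usage sketch (illustrative; init_done and g_fs are hypothetical application
 * names): initialization completes asynchronously, so the filesystem handle is
 * only valid once the callback fires with fserrno == 0.
 *
 *	static void
 *	init_done(void *ctx, struct spdk_filesystem *fs, int fserrno)
 *	{
 *		if (fserrno == 0) {
 *			g_fs = fs;
 *		}
 *	}
 *
 *	spdk_fs_init(bs_dev, NULL, send_request_fn, init_done, NULL);
 */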
653 
654 static struct spdk_file *
655 file_alloc(struct spdk_filesystem *fs)
656 {
657 	struct spdk_file *file;
658 
659 	file = calloc(1, sizeof(*file));
660 	if (file == NULL) {
661 		return NULL;
662 	}
663 
664 	file->tree = calloc(1, sizeof(*file->tree));
665 	if (file->tree == NULL) {
666 		free(file);
667 		return NULL;
668 	}
669 
670 	if (pthread_spin_init(&file->lock, 0)) {
671 		free(file->tree);
672 		free(file);
673 		return NULL;
674 	}
675 
676 	file->fs = fs;
677 	TAILQ_INIT(&file->open_requests);
678 	TAILQ_INIT(&file->sync_requests);
679 	TAILQ_INSERT_TAIL(&fs->files, file, tailq);
680 	file->priority = SPDK_FILE_PRIORITY_LOW;
681 	return file;
682 }
683 
684 static void fs_load_done(void *ctx, int bserrno);
685 
686 static int
687 _handle_deleted_files(struct spdk_fs_request *req)
688 {
689 	struct spdk_fs_cb_args *args = &req->args;
690 	struct spdk_filesystem *fs = args->fs;
691 
692 	if (!TAILQ_EMPTY(&args->op.fs_load.deleted_files)) {
693 		struct spdk_deleted_file *deleted_file;
694 
695 		deleted_file = TAILQ_FIRST(&args->op.fs_load.deleted_files);
696 		TAILQ_REMOVE(&args->op.fs_load.deleted_files, deleted_file, tailq);
697 		spdk_bs_delete_blob(fs->bs, deleted_file->id, fs_load_done, req);
698 		free(deleted_file);
699 		return 0;
700 	}
701 
702 	return 1;
703 }
704 
705 static void
706 fs_load_done(void *ctx, int bserrno)
707 {
708 	struct spdk_fs_request *req = ctx;
709 	struct spdk_fs_cb_args *args = &req->args;
710 	struct spdk_filesystem *fs = args->fs;
711 
712 	/* The filesystem has been loaded.  Now check if there are any files that
713 	 *  were marked for deletion before last unload.  Do not complete the
714 	 *  fs_load callback until all of them have been deleted on disk.
715 	 */
716 	if (_handle_deleted_files(req) == 0) {
717 		/* We found a file that's been marked for deletion but not actually
718 		 *  deleted yet.  This function will get called again once the delete
719 		 *  operation is completed.
720 		 */
721 		return;
722 	}
723 
724 	args->fn.fs_op_with_handle(args->arg, fs, 0);
725 	free_fs_request(req);
726 
727 }
728 
729 static void
730 _file_build_trace_arg_name(struct spdk_file *f)
731 {
732 	f->trace_arg_name = 0;
733 	memcpy(&f->trace_arg_name, f->name,
734 	       spdk_min(sizeof(f->trace_arg_name), strlen(f->name)));
735 }
736 
737 static void
738 iter_cb(void *ctx, struct spdk_blob *blob, int rc)
739 {
740 	struct spdk_fs_request *req = ctx;
741 	struct spdk_fs_cb_args *args = &req->args;
742 	struct spdk_filesystem *fs = args->fs;
743 	uint64_t *length;
744 	const char *name;
745 	uint32_t *is_deleted;
746 	size_t value_len;
747 
748 	if (rc < 0) {
749 		args->fn.fs_op_with_handle(args->arg, fs, rc);
750 		free_fs_request(req);
751 		return;
752 	}
753 
754 	rc = spdk_blob_get_xattr_value(blob, "name", (const void **)&name, &value_len);
755 	if (rc < 0) {
756 		args->fn.fs_op_with_handle(args->arg, fs, rc);
757 		free_fs_request(req);
758 		return;
759 	}
760 
761 	rc = spdk_blob_get_xattr_value(blob, "length", (const void **)&length, &value_len);
762 	if (rc < 0) {
763 		args->fn.fs_op_with_handle(args->arg, fs, rc);
764 		free_fs_request(req);
765 		return;
766 	}
767 
768 	assert(value_len == 8);
769 
770 	/* The file may have been marked for deletion while still open and the app crashed before the delete completed, so delete it now */
771 	rc = spdk_blob_get_xattr_value(blob, "is_deleted", (const void **)&is_deleted, &value_len);
772 	if (rc < 0) {
773 		struct spdk_file *f;
774 
775 		f = file_alloc(fs);
776 		if (f == NULL) {
777 			SPDK_ERRLOG("Cannot allocate memory for file found on disk\n");
778 			args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM);
779 			free_fs_request(req);
780 			return;
781 		}
782 
783 		f->name = strdup(name);
784 		_file_build_trace_arg_name(f);
785 		f->blobid = spdk_blob_get_id(blob);
786 		f->length = *length;
787 		f->length_flushed = *length;
788 		f->length_xattr = *length;
789 		f->append_pos = *length;
790 		SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "added file %s length=%ju\n", f->name, f->length);
791 	} else {
792 		struct spdk_deleted_file *deleted_file;
793 
794 		deleted_file = calloc(1, sizeof(*deleted_file));
795 		if (deleted_file == NULL) {
796 			args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM);
797 			free_fs_request(req);
798 			return;
799 		}
800 		deleted_file->id = spdk_blob_get_id(blob);
801 		TAILQ_INSERT_TAIL(&args->op.fs_load.deleted_files, deleted_file, tailq);
802 	}
803 }
804 
805 static void
806 load_cb(void *ctx, struct spdk_blob_store *bs, int bserrno)
807 {
808 	struct spdk_fs_request *req = ctx;
809 	struct spdk_fs_cb_args *args = &req->args;
810 	struct spdk_filesystem *fs = args->fs;
811 	struct spdk_bs_type bstype;
812 	static const struct spdk_bs_type blobfs_type = {SPDK_BLOBFS_SIGNATURE};
813 	static const struct spdk_bs_type zeros;
814 
815 	if (bserrno != 0) {
816 		args->fn.fs_op_with_handle(args->arg, NULL, bserrno);
817 		free_fs_request(req);
818 		fs_free_io_channels(fs);
819 		fs_io_device_unregister(fs);
820 		return;
821 	}
822 
823 	bstype = spdk_bs_get_bstype(bs);
824 
825 	if (!memcmp(&bstype, &zeros, sizeof(bstype))) {
826 		SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "assigning bstype\n");
827 		spdk_bs_set_bstype(bs, blobfs_type);
828 	} else if (memcmp(&bstype, &blobfs_type, sizeof(bstype))) {
829 		SPDK_ERRLOG("not blobfs\n");
830 		SPDK_LOGDUMP(SPDK_LOG_BLOBFS, "bstype", &bstype, sizeof(bstype));
831 		args->fn.fs_op_with_handle(args->arg, NULL, -EINVAL);
832 		free_fs_request(req);
833 		fs_free_io_channels(fs);
834 		fs_io_device_unregister(fs);
835 		return;
836 	}
837 
838 	common_fs_bs_init(fs, bs);
839 	fs_load_done(req, 0);
840 }
841 
842 static void
843 fs_io_device_unregister(struct spdk_filesystem *fs)
844 {
845 	assert(fs != NULL);
846 	spdk_io_device_unregister(&fs->md_target, NULL);
847 	spdk_io_device_unregister(&fs->sync_target, NULL);
848 	spdk_io_device_unregister(&fs->io_target, NULL);
849 	free(fs);
850 }
851 
852 static void
853 fs_free_io_channels(struct spdk_filesystem *fs)
854 {
855 	assert(fs != NULL);
856 	spdk_fs_free_io_channel(fs->md_target.md_io_channel);
857 	spdk_fs_free_io_channel(fs->sync_target.sync_io_channel);
858 }
859 
860 void
861 spdk_fs_load(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn,
862 	     spdk_fs_op_with_handle_complete cb_fn, void *cb_arg)
863 {
864 	struct spdk_filesystem *fs;
865 	struct spdk_fs_cb_args *args;
866 	struct spdk_fs_request *req;
867 	struct spdk_bs_opts	bs_opts;
868 
869 	fs = fs_alloc(dev, send_request_fn);
870 	if (fs == NULL) {
871 		cb_fn(cb_arg, NULL, -ENOMEM);
872 		return;
873 	}
874 
875 	fs_conf_parse();
876 
877 	req = alloc_fs_request(fs->md_target.md_fs_channel);
878 	if (req == NULL) {
879 		fs_free_io_channels(fs);
880 		fs_io_device_unregister(fs);
881 		cb_fn(cb_arg, NULL, -ENOMEM);
882 		return;
883 	}
884 
885 	args = &req->args;
886 	args->fn.fs_op_with_handle = cb_fn;
887 	args->arg = cb_arg;
888 	args->fs = fs;
889 	TAILQ_INIT(&args->op.fs_load.deleted_files);
890 	spdk_bs_opts_init(&bs_opts);
891 	bs_opts.iter_cb_fn = iter_cb;
892 	bs_opts.iter_cb_arg = req;
893 	spdk_bs_load(dev, &bs_opts, load_cb, req);
894 }
895 
896 static void
897 unload_cb(void *ctx, int bserrno)
898 {
899 	struct spdk_fs_request *req = ctx;
900 	struct spdk_fs_cb_args *args = &req->args;
901 	struct spdk_filesystem *fs = args->fs;
902 	struct spdk_file *file, *tmp;
903 
904 	TAILQ_FOREACH_SAFE(file, &fs->files, tailq, tmp) {
905 		TAILQ_REMOVE(&fs->files, file, tailq);
906 		file_free(file);
907 	}
908 
909 	free_global_cache();
910 
911 	args->fn.fs_op(args->arg, bserrno);
912 	free(req);
913 
914 	fs_io_device_unregister(fs);
915 }
916 
917 void
918 spdk_fs_unload(struct spdk_filesystem *fs, spdk_fs_op_complete cb_fn, void *cb_arg)
919 {
920 	struct spdk_fs_request *req;
921 	struct spdk_fs_cb_args *args;
922 
923 	/*
924 	 * We must free the md_channel before unloading the blobstore, so just
925 	 *  allocate this request from the general heap.
926 	 */
927 	req = calloc(1, sizeof(*req));
928 	if (req == NULL) {
929 		cb_fn(cb_arg, -ENOMEM);
930 		return;
931 	}
932 
933 	args = &req->args;
934 	args->fn.fs_op = cb_fn;
935 	args->arg = cb_arg;
936 	args->fs = fs;
937 
938 	fs_free_io_channels(fs);
939 	spdk_bs_unload(fs->bs, unload_cb, req);
940 }
941 
942 static struct spdk_file *
943 fs_find_file(struct spdk_filesystem *fs, const char *name)
944 {
945 	struct spdk_file *file;
946 
947 	TAILQ_FOREACH(file, &fs->files, tailq) {
948 		if (!strncmp(name, file->name, SPDK_FILE_NAME_MAX)) {
949 			return file;
950 		}
951 	}
952 
953 	return NULL;
954 }
955 
956 void
957 spdk_fs_file_stat_async(struct spdk_filesystem *fs, const char *name,
958 			spdk_file_stat_op_complete cb_fn, void *cb_arg)
959 {
960 	struct spdk_file_stat stat;
961 	struct spdk_file *f = NULL;
962 
963 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
964 		cb_fn(cb_arg, NULL, -ENAMETOOLONG);
965 		return;
966 	}
967 
968 	f = fs_find_file(fs, name);
969 	if (f != NULL) {
970 		stat.blobid = f->blobid;
971 		stat.size = f->append_pos >= f->length ? f->append_pos : f->length;
972 		cb_fn(cb_arg, &stat, 0);
973 		return;
974 	}
975 
976 	cb_fn(cb_arg, NULL, -ENOENT);
977 }
978 
979 static void
980 __copy_stat(void *arg, struct spdk_file_stat *stat, int fserrno)
981 {
982 	struct spdk_fs_request *req = arg;
983 	struct spdk_fs_cb_args *args = &req->args;
984 
985 	args->rc = fserrno;
986 	if (fserrno == 0) {
987 		memcpy(args->arg, stat, sizeof(*stat));
988 	}
989 	sem_post(args->sem);
990 }
991 
992 static void
993 __file_stat(void *arg)
994 {
995 	struct spdk_fs_request *req = arg;
996 	struct spdk_fs_cb_args *args = &req->args;
997 
998 	spdk_fs_file_stat_async(args->fs, args->op.stat.name,
999 				args->fn.stat_op, req);
1000 }
1001 
1002 int
1003 spdk_fs_file_stat(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx,
1004 		  const char *name, struct spdk_file_stat *stat)
1005 {
1006 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
1007 	struct spdk_fs_request *req;
1008 	int rc;
1009 
1010 	req = alloc_fs_request(channel);
1011 	if (req == NULL) {
1012 		SPDK_ERRLOG("Cannot allocate stat req on file=%s\n", name);
1013 		return -ENOMEM;
1014 	}
1015 
1016 	req->args.fs = fs;
1017 	req->args.op.stat.name = name;
1018 	req->args.fn.stat_op = __copy_stat;
1019 	req->args.arg = stat;
1020 	req->args.sem = &channel->sem;
1021 	channel->send_request(__file_stat, req);
1022 	sem_wait(&channel->sem);
1023 
1024 	rc = req->args.rc;
1025 	free_fs_request(req);
1026 
1027 	return rc;
1028 }
1029 
1030 static void
1031 fs_create_blob_close_cb(void *ctx, int bserrno)
1032 {
1033 	int rc;
1034 	struct spdk_fs_request *req = ctx;
1035 	struct spdk_fs_cb_args *args = &req->args;
1036 
1037 	rc = args->rc ? args->rc : bserrno;
1038 	args->fn.file_op(args->arg, rc);
1039 	free_fs_request(req);
1040 }
1041 
1042 static void
1043 fs_create_blob_resize_cb(void *ctx, int bserrno)
1044 {
1045 	struct spdk_fs_request *req = ctx;
1046 	struct spdk_fs_cb_args *args = &req->args;
1047 	struct spdk_file *f = args->file;
1048 	struct spdk_blob *blob = args->op.create.blob;
1049 	uint64_t length = 0;
1050 
1051 	args->rc = bserrno;
1052 	if (bserrno) {
1053 		spdk_blob_close(blob, fs_create_blob_close_cb, args);
1054 		return;
1055 	}
1056 
1057 	spdk_blob_set_xattr(blob, "name", f->name, strlen(f->name) + 1);
1058 	spdk_blob_set_xattr(blob, "length", &length, sizeof(length));
1059 
1060 	spdk_blob_close(blob, fs_create_blob_close_cb, args);
1061 }
1062 
1063 static void
1064 fs_create_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno)
1065 {
1066 	struct spdk_fs_request *req = ctx;
1067 	struct spdk_fs_cb_args *args = &req->args;
1068 
1069 	if (bserrno) {
1070 		args->fn.file_op(args->arg, bserrno);
1071 		free_fs_request(req);
1072 		return;
1073 	}
1074 
1075 	args->op.create.blob = blob;
1076 	spdk_blob_resize(blob, 1, fs_create_blob_resize_cb, req);
1077 }
1078 
1079 static void
1080 fs_create_blob_create_cb(void *ctx, spdk_blob_id blobid, int bserrno)
1081 {
1082 	struct spdk_fs_request *req = ctx;
1083 	struct spdk_fs_cb_args *args = &req->args;
1084 	struct spdk_file *f = args->file;
1085 
1086 	if (bserrno) {
1087 		args->fn.file_op(args->arg, bserrno);
1088 		free_fs_request(req);
1089 		return;
1090 	}
1091 
1092 	f->blobid = blobid;
1093 	spdk_bs_open_blob(f->fs->bs, blobid, fs_create_blob_open_cb, req);
1094 }
1095 
1096 void
1097 spdk_fs_create_file_async(struct spdk_filesystem *fs, const char *name,
1098 			  spdk_file_op_complete cb_fn, void *cb_arg)
1099 {
1100 	struct spdk_file *file;
1101 	struct spdk_fs_request *req;
1102 	struct spdk_fs_cb_args *args;
1103 
1104 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
1105 		cb_fn(cb_arg, -ENAMETOOLONG);
1106 		return;
1107 	}
1108 
1109 	file = fs_find_file(fs, name);
1110 	if (file != NULL) {
1111 		cb_fn(cb_arg, -EEXIST);
1112 		return;
1113 	}
1114 
1115 	file = file_alloc(fs);
1116 	if (file == NULL) {
1117 		SPDK_ERRLOG("Cannot allocate new file for creation\n");
1118 		cb_fn(cb_arg, -ENOMEM);
1119 		return;
1120 	}
1121 
1122 	req = alloc_fs_request(fs->md_target.md_fs_channel);
1123 	if (req == NULL) {
1124 		SPDK_ERRLOG("Cannot allocate create async req for file=%s\n", name);
1125 		cb_fn(cb_arg, -ENOMEM);
1126 		return;
1127 	}
1128 
1129 	args = &req->args;
1130 	args->file = file;
1131 	args->fn.file_op = cb_fn;
1132 	args->arg = cb_arg;
1133 
1134 	file->name = strdup(name);
1135 	_file_build_trace_arg_name(file);
1136 	spdk_bs_create_blob(fs->bs, fs_create_blob_create_cb, args);
1137 }
1138 
1139 static void
1140 __fs_create_file_done(void *arg, int fserrno)
1141 {
1142 	struct spdk_fs_request *req = arg;
1143 	struct spdk_fs_cb_args *args = &req->args;
1144 
1145 	__wake_caller(args, fserrno);
1146 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.create.name);
1147 }
1148 
1149 static void
1150 __fs_create_file(void *arg)
1151 {
1152 	struct spdk_fs_request *req = arg;
1153 	struct spdk_fs_cb_args *args = &req->args;
1154 
1155 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.create.name);
1156 	spdk_fs_create_file_async(args->fs, args->op.create.name, __fs_create_file_done, req);
1157 }
1158 
1159 int
1160 spdk_fs_create_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx, const char *name)
1161 {
1162 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
1163 	struct spdk_fs_request *req;
1164 	struct spdk_fs_cb_args *args;
1165 	int rc;
1166 
1167 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name);
1168 
1169 	req = alloc_fs_request(channel);
1170 	if (req == NULL) {
1171 		SPDK_ERRLOG("Cannot allocate req to create file=%s\n", name);
1172 		return -ENOMEM;
1173 	}
1174 
1175 	args = &req->args;
1176 	args->fs = fs;
1177 	args->op.create.name = name;
1178 	args->sem = &channel->sem;
1179 	fs->send_request(__fs_create_file, req);
1180 	sem_wait(&channel->sem);
1181 	rc = args->rc;
1182 	free_fs_request(req);
1183 
1184 	return rc;
1185 }
1186 
1187 static void
1188 fs_open_blob_done(void *ctx, struct spdk_blob *blob, int bserrno)
1189 {
1190 	struct spdk_fs_request *req = ctx;
1191 	struct spdk_fs_cb_args *args = &req->args;
1192 	struct spdk_file *f = args->file;
1193 
1194 	f->blob = blob;
1195 	while (!TAILQ_EMPTY(&f->open_requests)) {
1196 		req = TAILQ_FIRST(&f->open_requests);
1197 		args = &req->args;
1198 		TAILQ_REMOVE(&f->open_requests, req, args.op.open.tailq);
1199 		spdk_trace_record(TRACE_BLOBFS_OPEN, 0, 0, 0, f->trace_arg_name);
1200 		args->fn.file_op_with_handle(args->arg, f, bserrno);
1201 		free_fs_request(req);
1202 	}
1203 }
1204 
1205 static void
1206 fs_open_blob_create_cb(void *ctx, int bserrno)
1207 {
1208 	struct spdk_fs_request *req = ctx;
1209 	struct spdk_fs_cb_args *args = &req->args;
1210 	struct spdk_file *file = args->file;
1211 	struct spdk_filesystem *fs = args->fs;
1212 
1213 	if (file == NULL) {
1214 		/*
1215 		 * This is from an open with CREATE flag - the file
1216 		 *  is now created so look it up in the file list for this
1217 		 *  filesystem.
1218 		 */
1219 		file = fs_find_file(fs, args->op.open.name);
1220 		assert(file != NULL);
1221 		args->file = file;
1222 	}
1223 
1224 	file->ref_count++;
1225 	TAILQ_INSERT_TAIL(&file->open_requests, req, args.op.open.tailq);
1226 	if (file->ref_count == 1) {
1227 		assert(file->blob == NULL);
1228 		spdk_bs_open_blob(fs->bs, file->blobid, fs_open_blob_done, req);
1229 	} else if (file->blob != NULL) {
1230 		fs_open_blob_done(req, file->blob, 0);
1231 	} else {
1232 		/*
1233 		 * The blob open for this file is in progress due to a previous
1234 		 *  open request.  When that open completes, it will invoke the
1235 		 *  open callback for this request.
1236 		 */
1237 	}
1238 }
1239 
1240 void
1241 spdk_fs_open_file_async(struct spdk_filesystem *fs, const char *name, uint32_t flags,
1242 			spdk_file_op_with_handle_complete cb_fn, void *cb_arg)
1243 {
1244 	struct spdk_file *f = NULL;
1245 	struct spdk_fs_request *req;
1246 	struct spdk_fs_cb_args *args;
1247 
1248 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
1249 		cb_fn(cb_arg, NULL, -ENAMETOOLONG);
1250 		return;
1251 	}
1252 
1253 	f = fs_find_file(fs, name);
1254 	if (f == NULL && !(flags & SPDK_BLOBFS_OPEN_CREATE)) {
1255 		cb_fn(cb_arg, NULL, -ENOENT);
1256 		return;
1257 	}
1258 
1259 	if (f != NULL && f->is_deleted == true) {
1260 		cb_fn(cb_arg, NULL, -ENOENT);
1261 		return;
1262 	}
1263 
1264 	req = alloc_fs_request(fs->md_target.md_fs_channel);
1265 	if (req == NULL) {
1266 		SPDK_ERRLOG("Cannot allocate async open req for file=%s\n", name);
1267 		cb_fn(cb_arg, NULL, -ENOMEM);
1268 		return;
1269 	}
1270 
1271 	args = &req->args;
1272 	args->fn.file_op_with_handle = cb_fn;
1273 	args->arg = cb_arg;
1274 	args->file = f;
1275 	args->fs = fs;
1276 	args->op.open.name = name;
1277 
1278 	if (f == NULL) {
1279 		spdk_fs_create_file_async(fs, name, fs_open_blob_create_cb, req);
1280 	} else {
1281 		fs_open_blob_create_cb(req, 0);
1282 	}
1283 }
1284 
1285 static void
1286 __fs_open_file_done(void *arg, struct spdk_file *file, int bserrno)
1287 {
1288 	struct spdk_fs_request *req = arg;
1289 	struct spdk_fs_cb_args *args = &req->args;
1290 
1291 	args->file = file;
1292 	__wake_caller(args, bserrno);
1293 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.open.name);
1294 }
1295 
1296 static void
1297 __fs_open_file(void *arg)
1298 {
1299 	struct spdk_fs_request *req = arg;
1300 	struct spdk_fs_cb_args *args = &req->args;
1301 
1302 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.open.name);
1303 	spdk_fs_open_file_async(args->fs, args->op.open.name, args->op.open.flags,
1304 				__fs_open_file_done, req);
1305 }
1306 
1307 int
1308 spdk_fs_open_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx,
1309 		  const char *name, uint32_t flags, struct spdk_file **file)
1310 {
1311 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
1312 	struct spdk_fs_request *req;
1313 	struct spdk_fs_cb_args *args;
1314 	int rc;
1315 
1316 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name);
1317 
1318 	req = alloc_fs_request(channel);
1319 	if (req == NULL) {
1320 		SPDK_ERRLOG("Cannot allocate req for opening file=%s\n", name);
1321 		return -ENOMEM;
1322 	}
1323 
1324 	args = &req->args;
1325 	args->fs = fs;
1326 	args->op.open.name = name;
1327 	args->op.open.flags = flags;
1328 	args->sem = &channel->sem;
1329 	fs->send_request(__fs_open_file, req);
1330 	sem_wait(&channel->sem);
1331 	rc = args->rc;
1332 	if (rc == 0) {
1333 		*file = args->file;
1334 	} else {
1335 		*file = NULL;
1336 	}
1337 	free_fs_request(req);
1338 
1339 	return rc;
1340 }
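/*
 * Synchronous usage sketch (assuming the caller already obtained a thread
 * context via spdk_fs_alloc_thread_ctx(); the file name is illustrative):
 *
 *	struct spdk_file *file;
 *	int rc;
 *
 *	rc = spdk_fs_open_file(fs, ctx, "myfile", SPDK_BLOBFS_OPEN_CREATE, &file);
 *	if (rc != 0) {
 *		return rc;
 *	}
 */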
1341 
1342 static void
1343 fs_rename_blob_close_cb(void *ctx, int bserrno)
1344 {
1345 	struct spdk_fs_request *req = ctx;
1346 	struct spdk_fs_cb_args *args = &req->args;
1347 
1348 	args->fn.fs_op(args->arg, bserrno);
1349 	free_fs_request(req);
1350 }
1351 
1352 static void
1353 fs_rename_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno)
1354 {
1355 	struct spdk_fs_request *req = ctx;
1356 	struct spdk_fs_cb_args *args = &req->args;
1357 	const char *new_name = args->op.rename.new_name;
1358 
1359 	spdk_blob_set_xattr(blob, "name", new_name, strlen(new_name) + 1);
1360 	spdk_blob_close(blob, fs_rename_blob_close_cb, req);
1361 }
1362 
1363 static void
1364 _fs_md_rename_file(struct spdk_fs_request *req)
1365 {
1366 	struct spdk_fs_cb_args *args = &req->args;
1367 	struct spdk_file *f;
1368 
1369 	f = fs_find_file(args->fs, args->op.rename.old_name);
1370 	if (f == NULL) {
1371 		args->fn.fs_op(args->arg, -ENOENT);
1372 		free_fs_request(req);
1373 		return;
1374 	}
1375 
1376 	free(f->name);
1377 	f->name = strdup(args->op.rename.new_name);
1378 	_file_build_trace_arg_name(f);
1379 	args->file = f;
1380 	spdk_bs_open_blob(args->fs->bs, f->blobid, fs_rename_blob_open_cb, req);
1381 }
1382 
1383 static void
1384 fs_rename_delete_done(void *arg, int fserrno)
1385 {
1386 	_fs_md_rename_file(arg);
1387 }
1388 
1389 void
1390 spdk_fs_rename_file_async(struct spdk_filesystem *fs,
1391 			  const char *old_name, const char *new_name,
1392 			  spdk_file_op_complete cb_fn, void *cb_arg)
1393 {
1394 	struct spdk_file *f;
1395 	struct spdk_fs_request *req;
1396 	struct spdk_fs_cb_args *args;
1397 
1398 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "old=%s new=%s\n", old_name, new_name);
1399 	if (strnlen(new_name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
1400 		cb_fn(cb_arg, -ENAMETOOLONG);
1401 		return;
1402 	}
1403 
1404 	req = alloc_fs_request(fs->md_target.md_fs_channel);
1405 	if (req == NULL) {
1406 		SPDK_ERRLOG("Cannot allocate rename async req for renaming file from %s to %s\n", old_name,
1407 			    new_name);
1408 		cb_fn(cb_arg, -ENOMEM);
1409 		return;
1410 	}
1411 
1412 	args = &req->args;
1413 	args->fn.fs_op = cb_fn;
1414 	args->fs = fs;
1415 	args->arg = cb_arg;
1416 	args->op.rename.old_name = old_name;
1417 	args->op.rename.new_name = new_name;
1418 
1419 	f = fs_find_file(fs, new_name);
1420 	if (f == NULL) {
1421 		_fs_md_rename_file(req);
1422 		return;
1423 	}
1424 
1425 	/*
1426 	 * The rename overwrites an existing file.  So delete the existing file, then
1427 	 *  do the actual rename.
1428 	 */
1429 	spdk_fs_delete_file_async(fs, new_name, fs_rename_delete_done, req);
1430 }
1431 
1432 static void
1433 __fs_rename_file_done(void *arg, int fserrno)
1434 {
1435 	struct spdk_fs_request *req = arg;
1436 	struct spdk_fs_cb_args *args = &req->args;
1437 
1438 	__wake_caller(args, fserrno);
1439 }
1440 
1441 static void
1442 __fs_rename_file(void *arg)
1443 {
1444 	struct spdk_fs_request *req = arg;
1445 	struct spdk_fs_cb_args *args = &req->args;
1446 
1447 	spdk_fs_rename_file_async(args->fs, args->op.rename.old_name, args->op.rename.new_name,
1448 				  __fs_rename_file_done, req);
1449 }
1450 
1451 int
1452 spdk_fs_rename_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx,
1453 		    const char *old_name, const char *new_name)
1454 {
1455 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
1456 	struct spdk_fs_request *req;
1457 	struct spdk_fs_cb_args *args;
1458 	int rc;
1459 
1460 	req = alloc_fs_request(channel);
1461 	if (req == NULL) {
1462 		SPDK_ERRLOG("Cannot allocate rename req for file=%s\n", old_name);
1463 		return -ENOMEM;
1464 	}
1465 
1466 	args = &req->args;
1467 
1468 	args->fs = fs;
1469 	args->op.rename.old_name = old_name;
1470 	args->op.rename.new_name = new_name;
1471 	args->sem = &channel->sem;
1472 	fs->send_request(__fs_rename_file, req);
1473 	sem_wait(&channel->sem);
1474 	rc = args->rc;
1475 	free_fs_request(req);
1476 	return rc;
1477 }
1478 
1479 static void
1480 blob_delete_cb(void *ctx, int bserrno)
1481 {
1482 	struct spdk_fs_request *req = ctx;
1483 	struct spdk_fs_cb_args *args = &req->args;
1484 
1485 	args->fn.file_op(args->arg, bserrno);
1486 	free_fs_request(req);
1487 }
1488 
1489 void
1490 spdk_fs_delete_file_async(struct spdk_filesystem *fs, const char *name,
1491 			  spdk_file_op_complete cb_fn, void *cb_arg)
1492 {
1493 	struct spdk_file *f;
1494 	spdk_blob_id blobid;
1495 	struct spdk_fs_request *req;
1496 	struct spdk_fs_cb_args *args;
1497 
1498 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name);
1499 
1500 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
1501 		cb_fn(cb_arg, -ENAMETOOLONG);
1502 		return;
1503 	}
1504 
1505 	f = fs_find_file(fs, name);
1506 	if (f == NULL) {
1507 		SPDK_ERRLOG("Cannot find file=%s to delete\n", name);
1508 		cb_fn(cb_arg, -ENOENT);
1509 		return;
1510 	}
1511 
1512 	req = alloc_fs_request(fs->md_target.md_fs_channel);
1513 	if (req == NULL) {
1514 		SPDK_ERRLOG("Cannot allocate req to delete file=%s\n", name);
1515 		cb_fn(cb_arg, -ENOMEM);
1516 		return;
1517 	}
1518 
1519 	args = &req->args;
1520 	args->fn.file_op = cb_fn;
1521 	args->arg = cb_arg;
1522 
1523 	if (f->ref_count > 0) {
1524 	/* If the ref count is > 0, mark the file as deleted; the actual delete happens when the file is closed. */
1525 		f->is_deleted = true;
1526 		spdk_blob_set_xattr(f->blob, "is_deleted", &f->is_deleted, sizeof(bool));
1527 		spdk_blob_sync_md(f->blob, blob_delete_cb, req);
1528 		return;
1529 	}
1530 
1531 	blobid = f->blobid;
1532 	TAILQ_REMOVE(&fs->files, f, tailq);
1533 
1534 	file_free(f);
1535 
1536 	spdk_bs_delete_blob(fs->bs, blobid, blob_delete_cb, req);
1537 }
1538 
1539 static uint64_t
1540 fs_name_to_uint64(const char *name)
1541 {
1542 	uint64_t result = 0;
1543 	memcpy(&result, name, spdk_min(sizeof(result), strlen(name)));
1544 	return result;
1545 }
1546 
1547 static void
1548 __fs_delete_file_done(void *arg, int fserrno)
1549 {
1550 	struct spdk_fs_request *req = arg;
1551 	struct spdk_fs_cb_args *args = &req->args;
1552 
1553 	spdk_trace_record(TRACE_BLOBFS_DELETE_DONE, 0, 0, 0, fs_name_to_uint64(args->op.delete.name));
1554 	__wake_caller(args, fserrno);
1555 }
1556 
1557 static void
1558 __fs_delete_file(void *arg)
1559 {
1560 	struct spdk_fs_request *req = arg;
1561 	struct spdk_fs_cb_args *args = &req->args;
1562 
1563 	spdk_trace_record(TRACE_BLOBFS_DELETE_START, 0, 0, 0, fs_name_to_uint64(args->op.delete.name));
1564 	spdk_fs_delete_file_async(args->fs, args->op.delete.name, __fs_delete_file_done, req);
1565 }
1566 
1567 int
1568 spdk_fs_delete_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx,
1569 		    const char *name)
1570 {
1571 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
1572 	struct spdk_fs_request *req;
1573 	struct spdk_fs_cb_args *args;
1574 	int rc;
1575 
1576 	req = alloc_fs_request(channel);
1577 	if (req == NULL) {
1578 		SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "Cannot allocate req to delete file=%s\n", name);
1579 		return -ENOMEM;
1580 	}
1581 
1582 	args = &req->args;
1583 	args->fs = fs;
1584 	args->op.delete.name = name;
1585 	args->sem = &channel->sem;
1586 	fs->send_request(__fs_delete_file, req);
1587 	sem_wait(&channel->sem);
1588 	rc = args->rc;
1589 	free_fs_request(req);
1590 
1591 	return rc;
1592 }
1593 
1594 spdk_fs_iter
1595 spdk_fs_iter_first(struct spdk_filesystem *fs)
1596 {
1597 	struct spdk_file *f;
1598 
1599 	f = TAILQ_FIRST(&fs->files);
1600 	return f;
1601 }
1602 
1603 spdk_fs_iter
1604 spdk_fs_iter_next(spdk_fs_iter iter)
1605 {
1606 	struct spdk_file *f = iter;
1607 
1608 	if (f == NULL) {
1609 		return NULL;
1610 	}
1611 
1612 	f = TAILQ_NEXT(f, tailq);
1613 	return f;
1614 }
1615 
1616 const char *
1617 spdk_file_get_name(struct spdk_file *file)
1618 {
1619 	return file->name;
1620 }
1621 
1622 uint64_t
1623 spdk_file_get_length(struct spdk_file *file)
1624 {
1625 	uint64_t length;
1626 
1627 	assert(file != NULL);
1628 
1629 	length = file->append_pos >= file->length ? file->append_pos : file->length;
1630 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s length=0x%jx\n", file->name, length);
1631 	return length;
1632 }
1633 
1634 static void
1635 fs_truncate_complete_cb(void *ctx, int bserrno)
1636 {
1637 	struct spdk_fs_request *req = ctx;
1638 	struct spdk_fs_cb_args *args = &req->args;
1639 
1640 	args->fn.file_op(args->arg, bserrno);
1641 	free_fs_request(req);
1642 }
1643 
1644 static void
1645 fs_truncate_resize_cb(void *ctx, int bserrno)
1646 {
1647 	struct spdk_fs_request *req = ctx;
1648 	struct spdk_fs_cb_args *args = &req->args;
1649 	struct spdk_file *file = args->file;
1650 	uint64_t *length = &args->op.truncate.length;
1651 
1652 	if (bserrno) {
1653 		args->fn.file_op(args->arg, bserrno);
1654 		free_fs_request(req);
1655 		return;
1656 	}
1657 
1658 	spdk_blob_set_xattr(file->blob, "length", length, sizeof(*length));
1659 
1660 	file->length = *length;
1661 	if (file->append_pos > file->length) {
1662 		file->append_pos = file->length;
1663 	}
1664 
1665 	spdk_blob_sync_md(file->blob, fs_truncate_complete_cb, req);
1666 }
1667 
1668 static uint64_t
1669 __bytes_to_clusters(uint64_t length, uint64_t cluster_sz)
1670 {
1671 	return (length + cluster_sz - 1) / cluster_sz;
1672 }
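/*
 * Example: with the default 1 MiB cluster size, a 2.5 MiB (2621440 byte)
 * truncate length rounds up to (2621440 + 1048576 - 1) / 1048576 = 3 clusters.
 */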
1673 
1674 void
1675 spdk_file_truncate_async(struct spdk_file *file, uint64_t length,
1676 			 spdk_file_op_complete cb_fn, void *cb_arg)
1677 {
1678 	struct spdk_filesystem *fs;
1679 	size_t num_clusters;
1680 	struct spdk_fs_request *req;
1681 	struct spdk_fs_cb_args *args;
1682 
1683 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s old=0x%jx new=0x%jx\n", file->name, file->length, length);
1684 	if (length == file->length) {
1685 		cb_fn(cb_arg, 0);
1686 		return;
1687 	}
1688 
1689 	req = alloc_fs_request(file->fs->md_target.md_fs_channel);
1690 	if (req == NULL) {
1691 		cb_fn(cb_arg, -ENOMEM);
1692 		return;
1693 	}
1694 
1695 	args = &req->args;
1696 	args->fn.file_op = cb_fn;
1697 	args->arg = cb_arg;
1698 	args->file = file;
1699 	args->op.truncate.length = length;
1700 	fs = file->fs;
1701 
1702 	num_clusters = __bytes_to_clusters(length, fs->bs_opts.cluster_sz);
1703 
1704 	spdk_blob_resize(file->blob, num_clusters, fs_truncate_resize_cb, req);
1705 }
1706 
1707 static void
1708 __truncate(void *arg)
1709 {
1710 	struct spdk_fs_request *req = arg;
1711 	struct spdk_fs_cb_args *args = &req->args;
1712 
1713 	spdk_file_truncate_async(args->file, args->op.truncate.length,
1714 				 args->fn.file_op, args);
1715 }
1716 
1717 int
1718 spdk_file_truncate(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx,
1719 		   uint64_t length)
1720 {
1721 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
1722 	struct spdk_fs_request *req;
1723 	struct spdk_fs_cb_args *args;
1724 	int rc;
1725 
1726 	req = alloc_fs_request(channel);
1727 	if (req == NULL) {
1728 		return -ENOMEM;
1729 	}
1730 
1731 	args = &req->args;
1732 
1733 	args->file = file;
1734 	args->op.truncate.length = length;
1735 	args->fn.file_op = __wake_caller;
1736 	args->sem = &channel->sem;
1737 
1738 	channel->send_request(__truncate, req);
1739 	sem_wait(&channel->sem);
1740 	rc = args->rc;
1741 	free_fs_request(req);
1742 
1743 	return rc;
1744 }
1745 
1746 static void
1747 __rw_done(void *ctx, int bserrno)
1748 {
1749 	struct spdk_fs_request *req = ctx;
1750 	struct spdk_fs_cb_args *args = &req->args;
1751 
1752 	spdk_free(args->op.rw.pin_buf);
1753 	args->fn.file_op(args->arg, bserrno);
1754 	free_fs_request(req);
1755 }
1756 
1757 static void
1758 _copy_iovs_to_buf(void *buf, size_t buf_len, struct iovec *iovs, int iovcnt)
1759 {
1760 	int i;
1761 	size_t len;
1762 
1763 	for (i = 0; i < iovcnt; i++) {
1764 		len = spdk_min(iovs[i].iov_len, buf_len);
1765 		memcpy(buf, iovs[i].iov_base, len);
1766 		buf += len;
1767 		assert(buf_len >= len);
1768 		buf_len -= len;
1769 	}
1770 }
1771 
1772 static void
1773 _copy_buf_to_iovs(struct iovec *iovs, int iovcnt, void *buf, size_t buf_len)
1774 {
1775 	int i;
1776 	size_t len;
1777 
1778 	for (i = 0; i < iovcnt; i++) {
1779 		len = spdk_min(iovs[i].iov_len, buf_len);
1780 		memcpy(iovs[i].iov_base, buf, len);
1781 		buf += len;
1782 		assert(buf_len >= len);
1783 		buf_len -= len;
1784 	}
1785 }
1786 
1787 static void
1788 __read_done(void *ctx, int bserrno)
1789 {
1790 	struct spdk_fs_request *req = ctx;
1791 	struct spdk_fs_cb_args *args = &req->args;
1792 	void *buf;
1793 
1794 	assert(req != NULL);
1795 	buf = (void *)((uintptr_t)args->op.rw.pin_buf + (args->op.rw.offset & (args->op.rw.blocklen - 1)));
1796 	if (args->op.rw.is_read) {
1797 		_copy_buf_to_iovs(args->iovs, args->iovcnt, buf, args->op.rw.length);
1798 		__rw_done(req, 0);
1799 	} else {
1800 		_copy_iovs_to_buf(buf, args->op.rw.length, args->iovs, args->iovcnt);
1801 		spdk_blob_io_write(args->file->blob, args->op.rw.channel,
1802 				   args->op.rw.pin_buf,
1803 				   args->op.rw.start_lba, args->op.rw.num_lba,
1804 				   __rw_done, req);
1805 	}
1806 }
1807 
1808 static void
1809 __do_blob_read(void *ctx, int fserrno)
1810 {
1811 	struct spdk_fs_request *req = ctx;
1812 	struct spdk_fs_cb_args *args = &req->args;
1813 
1814 	if (fserrno) {
1815 		__rw_done(req, fserrno);
1816 		return;
1817 	}
1818 	spdk_blob_io_read(args->file->blob, args->op.rw.channel,
1819 			  args->op.rw.pin_buf,
1820 			  args->op.rw.start_lba, args->op.rw.num_lba,
1821 			  __read_done, req);
1822 }
1823 
1824 static void
1825 __get_page_parameters(struct spdk_file *file, uint64_t offset, uint64_t length,
1826 		      uint64_t *start_lba, uint32_t *lba_size, uint64_t *num_lba)
1827 {
1828 	uint64_t end_lba;
1829 
1830 	*lba_size = spdk_bs_get_io_unit_size(file->fs->bs);
1831 	*start_lba = offset / *lba_size;
1832 	end_lba = (offset + length - 1) / *lba_size;
1833 	*num_lba = (end_lba - *start_lba + 1);
1834 }
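/*
 * Worked example (assuming a 4096-byte io unit size): offset=6144, length=10000
 * yields start_lba = 6144 / 4096 = 1, end_lba = (6144 + 10000 - 1) / 4096 = 3,
 * so num_lba = 3 - 1 + 1 = 3.
 */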
1835 
1836 static bool
1837 __is_lba_aligned(struct spdk_file *file, uint64_t offset, uint64_t length)
1838 {
1839 	uint32_t lba_size = spdk_bs_get_io_unit_size(file->fs->bs);
1840 
1841 	if ((offset % lba_size == 0) && (length % lba_size == 0)) {
1842 		return true;
1843 	}
1844 
1845 	return false;
1846 }
1847 
1848 static void
1849 _fs_request_setup_iovs(struct spdk_fs_request *req, struct iovec *iovs, uint32_t iovcnt)
1850 {
1851 	uint32_t i;
1852 
1853 	for (i = 0; i < iovcnt; i++) {
1854 		req->args.iovs[i].iov_base = iovs[i].iov_base;
1855 		req->args.iovs[i].iov_len = iovs[i].iov_len;
1856 	}
1857 }
1858 
1859 static void
1860 __readvwritev(struct spdk_file *file, struct spdk_io_channel *_channel,
1861 	      struct iovec *iovs, uint32_t iovcnt, uint64_t offset, uint64_t length,
1862 	      spdk_file_op_complete cb_fn, void *cb_arg, int is_read)
1863 {
1864 	struct spdk_fs_request *req;
1865 	struct spdk_fs_cb_args *args;
1866 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
1867 	uint64_t start_lba, num_lba, pin_buf_length;
1868 	uint32_t lba_size;
1869 
1870 	if (is_read && offset + length > file->length) {
1871 		cb_fn(cb_arg, -EINVAL);
1872 		return;
1873 	}
1874 
1875 	req = alloc_fs_request_with_iov(channel, iovcnt);
1876 	if (req == NULL) {
1877 		cb_fn(cb_arg, -ENOMEM);
1878 		return;
1879 	}
1880 
1881 	__get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba);
1882 
1883 	args = &req->args;
1884 	args->fn.file_op = cb_fn;
1885 	args->arg = cb_arg;
1886 	args->file = file;
1887 	args->op.rw.channel = channel->bs_channel;
1888 	_fs_request_setup_iovs(req, iovs, iovcnt);
1889 	args->op.rw.is_read = is_read;
1890 	args->op.rw.offset = offset;
1891 	args->op.rw.blocklen = lba_size;
1892 
1893 	pin_buf_length = num_lba * lba_size;
1894 	args->op.rw.length = pin_buf_length;
1895 	args->op.rw.pin_buf = spdk_malloc(pin_buf_length, lba_size, NULL,
1896 					  SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
1897 	if (args->op.rw.pin_buf == NULL) {
1898 		SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "Failed to allocate buf for: file=%s offset=%jx length=%jx\n",
1899 			      file->name, offset, length);
1900 		free_fs_request(req);
1901 		cb_fn(cb_arg, -ENOMEM);
1902 		return;
1903 	}
1904 
1905 	args->op.rw.start_lba = start_lba;
1906 	args->op.rw.num_lba = num_lba;
1907 
1908 	if (!is_read && file->length < offset + length) {
1909 		spdk_file_truncate_async(file, offset + length, __do_blob_read, req);
1910 	} else if (!is_read && __is_lba_aligned(file, offset, length)) {
1911 		_copy_iovs_to_buf(args->op.rw.pin_buf, args->op.rw.length, args->iovs, args->iovcnt);
1912 		spdk_blob_io_write(args->file->blob, args->op.rw.channel,
1913 				   args->op.rw.pin_buf,
1914 				   args->op.rw.start_lba, args->op.rw.num_lba,
1915 				   __rw_done, req);
1916 	} else {
1917 		__do_blob_read(req, 0);
1918 	}
1919 }
1920 
1921 static void
1922 __readwrite(struct spdk_file *file, struct spdk_io_channel *channel,
1923 	    void *payload, uint64_t offset, uint64_t length,
1924 	    spdk_file_op_complete cb_fn, void *cb_arg, int is_read)
1925 {
1926 	struct iovec iov;
1927 
1928 	iov.iov_base = payload;
1929 	iov.iov_len = (size_t)length;
1930 
1931 	__readvwritev(file, channel, &iov, 1, offset, length, cb_fn, cb_arg, is_read);
1932 }
1933 
1934 void
1935 spdk_file_write_async(struct spdk_file *file, struct spdk_io_channel *channel,
1936 		      void *payload, uint64_t offset, uint64_t length,
1937 		      spdk_file_op_complete cb_fn, void *cb_arg)
1938 {
1939 	__readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 0);
1940 }
1941 
1942 void
1943 spdk_file_writev_async(struct spdk_file *file, struct spdk_io_channel *channel,
1944 		       struct iovec *iovs, uint32_t iovcnt, uint64_t offset, uint64_t length,
1945 		       spdk_file_op_complete cb_fn, void *cb_arg)
1946 {
1947 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s offset=%jx length=%jx\n",
1948 		      file->name, offset, length);
1949 
1950 	__readvwritev(file, channel, iovs, iovcnt, offset, length, cb_fn, cb_arg, 0);
1951 }
1952 
1953 void
1954 spdk_file_read_async(struct spdk_file *file, struct spdk_io_channel *channel,
1955 		     void *payload, uint64_t offset, uint64_t length,
1956 		     spdk_file_op_complete cb_fn, void *cb_arg)
1957 {
1958 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s offset=%jx length=%jx\n",
1959 		      file->name, offset, length);
1960 	__readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 1);
1961 }
1962 
1963 void
1964 spdk_file_readv_async(struct spdk_file *file, struct spdk_io_channel *channel,
1965 		      struct iovec *iovs, uint32_t iovcnt, uint64_t offset, uint64_t length,
1966 		      spdk_file_op_complete cb_fn, void *cb_arg)
1967 {
1968 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s offset=%jx length=%jx\n",
1969 		      file->name, offset, length);
1970 
1971 	__readvwritev(file, channel, iovs, iovcnt, offset, length, cb_fn, cb_arg, 1);
1972 }
1973 
1974 struct spdk_io_channel *
1975 spdk_fs_alloc_io_channel(struct spdk_filesystem *fs)
1976 {
1977 	struct spdk_io_channel *io_channel;
1978 	struct spdk_fs_channel *fs_channel;
1979 
1980 	io_channel = spdk_get_io_channel(&fs->io_target);
1981 	fs_channel = spdk_io_channel_get_ctx(io_channel);
1982 	fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
1983 	fs_channel->send_request = __send_request_direct;
1984 
1985 	return io_channel;
1986 }
1987 
1988 void
1989 spdk_fs_free_io_channel(struct spdk_io_channel *channel)
1990 {
1991 	spdk_put_io_channel(channel);
1992 }
1993 
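/*
 * Allocate a per-thread context for the synchronous (blocking) API.  Unlike
 * spdk_fs_alloc_io_channel(), this does not create an SPDK io_channel: it
 * embeds a private spdk_fs_channel with its own request pool and semaphore,
 * marks it as sync, and routes work to the filesystem's dispatch thread
 * through fs->send_request.
 */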
1994 struct spdk_fs_thread_ctx *
1995 spdk_fs_alloc_thread_ctx(struct spdk_filesystem *fs)
1996 {
1997 	struct spdk_fs_thread_ctx *ctx;
1998 
1999 	ctx = calloc(1, sizeof(*ctx));
2000 	if (!ctx) {
2001 		return NULL;
2002 	}
2003 
2004 	if (pthread_spin_init(&ctx->ch.lock, 0)) {
2005 		free(ctx);
2006 		return NULL;
2007 	}
2008 
2009 	/* fs_channel_create() returns non-zero if it cannot allocate the request pool. */
	if (fs_channel_create(fs, &ctx->ch, 512) != 0) {
		SPDK_ERRLOG("Cannot create fs_channel.\n");
		pthread_spin_destroy(&ctx->ch.lock);
		free(ctx);
		return NULL;
	}

2010 
2011 	ctx->ch.send_request = fs->send_request;
2012 	ctx->ch.sync = 1;
2013 
2014 	return ctx;
2015 }
2016 
2017 
2018 void
2019 spdk_fs_free_thread_ctx(struct spdk_fs_thread_ctx *ctx)
2020 {
2021 	assert(ctx->ch.sync == 1);
2022 
2023 	while (true) {
2024 		pthread_spin_lock(&ctx->ch.lock);
2025 		if (ctx->ch.outstanding_reqs == 0) {
2026 			pthread_spin_unlock(&ctx->ch.lock);
2027 			break;
2028 		}
2029 		pthread_spin_unlock(&ctx->ch.lock);
2030 		usleep(1000);
2031 	}
2032 
2033 	fs_channel_destroy(NULL, &ctx->ch);
2034 	free(ctx);
2035 }
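/*
 * Sketch of how the synchronous API is typically driven from a caller-owned
 * (non-SPDK) thread.  spdk_fs_open_file() and SPDK_BLOBFS_OPEN_CREATE are
 * assumed to be the declarations from include/spdk/blobfs.h; error handling
 * is omitted for brevity:
 *
 *	struct spdk_fs_thread_ctx *ctx = spdk_fs_alloc_thread_ctx(fs);
 *	struct spdk_file *file;
 *
 *	if (ctx && spdk_fs_open_file(fs, ctx, "log", SPDK_BLOBFS_OPEN_CREATE, &file) == 0) {
 *		spdk_file_write(file, ctx, buf, 0, buf_len);
 *		spdk_file_sync(file, ctx);
 *		spdk_file_close(file, ctx);
 *	}
 *	spdk_fs_free_thread_ctx(ctx);
 */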
2036 
2037 int
2038 spdk_fs_set_cache_size(uint64_t size_in_mb)
2039 {
2040 	/* Changing g_fs_cache_size is only permitted while the cache pool is not
2041 	 * allocated, i.e. before it has been initialized or after it has been freed.
2042 	 */
2043 	if (g_cache_pool != NULL) {
2044 		return -EPERM;
2045 	}
2046 
2047 	g_fs_cache_size = size_in_mb * 1024 * 1024;
2048 
2049 	return 0;
2050 }
2051 
2052 uint64_t
2053 spdk_fs_get_cache_size(void)
2054 {
2055 	return g_fs_cache_size / (1024 * 1024);
2056 }
2057 
2058 static void __file_flush(void *ctx);
2059 
2060 /* Try to free some cache buffers from this file. */
2062 static int
2063 reclaim_cache_buffers(struct spdk_file *file)
2064 {
2065 	int rc;
2066 
2067 	BLOBFS_TRACE(file, "free=%s\n", file->name);
2068 
2069 	/* This function may be called from any thread, and another thread may
2070 	 * currently hold the file lock, so only try-lock it here and give up
2071 	 * if the lock is contended.
2072 	 */
2073 	rc = pthread_spin_trylock(&file->lock);
2074 	if (rc != 0) {
2075 		return -1;
2076 	}
2077 
2078 	if (file->tree->present_mask == 0) {
2079 		pthread_spin_unlock(&file->lock);
2080 		return -1;
2081 	}
2082 	tree_free_buffers(file->tree);
2083 
2084 	TAILQ_REMOVE(&g_caches, file, cache_tailq);
2085 	/* If some buffers could not be freed, move the file to the end of the queue */
2086 	if (file->tree->present_mask != 0) {
2087 		TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq);
2088 	} else {
2089 		file->last = NULL;
2090 	}
2091 	pthread_spin_unlock(&file->lock);
2092 
2093 	return 0;
2094 }
2095 
2096 static int
2097 _blobfs_cache_pool_reclaim(void *arg)
2098 {
2099 	struct spdk_file *file, *tmp;
2100 	int rc;
2101 
2102 	if (!blobfs_cache_pool_need_reclaim()) {
2103 		return SPDK_POLLER_IDLE;
2104 	}
2105 
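	/* Reclaim in order of increasing impact: first low-priority files that are
	 * not being written, then any file not being written, and finally any file
	 * at all.  Each pass frees buffers from at most one file before re-checking
	 * whether the pool still needs reclaiming.
	 */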
2106 	TAILQ_FOREACH_SAFE(file, &g_caches, cache_tailq, tmp) {
2107 		if (!file->open_for_writing &&
2108 		    file->priority == SPDK_FILE_PRIORITY_LOW) {
2109 			rc = reclaim_cache_buffers(file);
2110 			if (rc < 0) {
2111 				continue;
2112 			}
2113 			if (!blobfs_cache_pool_need_reclaim()) {
2114 				return SPDK_POLLER_BUSY;
2115 			}
2116 			break;
2117 		}
2118 	}
2119 
2120 	TAILQ_FOREACH_SAFE(file, &g_caches, cache_tailq, tmp) {
2121 		if (!file->open_for_writing) {
2122 			rc = reclaim_cache_buffers(file);
2123 			if (rc < 0) {
2124 				continue;
2125 			}
2126 			if (!blobfs_cache_pool_need_reclaim()) {
2127 				return SPDK_POLLER_BUSY;
2128 			}
2129 			break;
2130 		}
2131 	}
2132 
2133 	TAILQ_FOREACH_SAFE(file, &g_caches, cache_tailq, tmp) {
2134 		rc = reclaim_cache_buffers(file);
2135 		if (rc < 0) {
2136 			continue;
2137 		}
2138 		break;
2139 	}
2140 
2141 	return SPDK_POLLER_BUSY;
2142 }
2143 
2144 static void
2145 _add_file_to_cache_pool(void *ctx)
2146 {
2147 	struct spdk_file *file = ctx;
2148 
2149 	TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq);
2150 }
2151 
2152 static void
2153 _remove_file_from_cache_pool(void *ctx)
2154 {
2155 	struct spdk_file *file = ctx;
2156 
2157 	TAILQ_REMOVE(&g_caches, file, cache_tailq);
2158 }
2159 
2160 static struct cache_buffer *
2161 cache_insert_buffer(struct spdk_file *file, uint64_t offset)
2162 {
2163 	struct cache_buffer *buf;
2164 	int count = 0;
2165 	bool need_update = false;
2166 
2167 	buf = calloc(1, sizeof(*buf));
2168 	if (buf == NULL) {
2169 		SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "calloc failed\n");
2170 		return NULL;
2171 	}
2172 
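	/* Cache buffers come from the global g_cache_pool mempool.  If the pool is
	 * momentarily exhausted, buffers are released asynchronously by the cache
	 * pool management poller, so retry for up to 100 poll periods before
	 * giving up.
	 */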
2173 	do {
2174 		buf->buf = spdk_mempool_get(g_cache_pool);
2175 		if (buf->buf) {
2176 			break;
2177 		}
2178 		if (count++ == 100) {
2179 			SPDK_ERRLOG("Could not allocate cache buffer for file=%p at offset=%jx\n",
2180 				    file, offset);
2181 			free(buf);
2182 			return NULL;
2183 		}
2184 		usleep(BLOBFS_CACHE_POOL_POLL_PERIOD_IN_US);
2185 	} while (true);
2186 
2187 	buf->buf_size = CACHE_BUFFER_SIZE;
2188 	buf->offset = offset;
2189 
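	/* A file is kept on the g_caches list only while it owns at least one cache
	 * buffer, so if its buffer tree was empty, register it (on the cache pool
	 * thread) now that a buffer is being inserted.
	 */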
2190 	if (file->tree->present_mask == 0) {
2191 		need_update = true;
2192 	}
2193 	file->tree = tree_insert_buffer(file->tree, buf);
2194 
2195 	if (need_update) {
2196 		spdk_thread_send_msg(g_cache_pool_thread, _add_file_to_cache_pool, file);
2197 	}
2198 
2199 	return buf;
2200 }
2201 
2202 static struct cache_buffer *
2203 cache_append_buffer(struct spdk_file *file)
2204 {
2205 	struct cache_buffer *last;
2206 
2207 	assert(file->last == NULL || file->last->bytes_filled == file->last->buf_size);
2208 	assert((file->append_pos % CACHE_BUFFER_SIZE) == 0);
2209 
2210 	last = cache_insert_buffer(file, file->append_pos);
2211 	if (last == NULL) {
2212 		SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "cache_insert_buffer failed\n");
2213 		return NULL;
2214 	}
2215 
2216 	file->last = last;
2217 
2218 	return last;
2219 }
2220 
2221 static void __check_sync_reqs(struct spdk_file *file);
2222 
2223 static void
2224 __file_cache_finish_sync(void *ctx, int bserrno)
2225 {
2226 	struct spdk_file *file;
2227 	struct spdk_fs_request *sync_req = ctx;
2228 	struct spdk_fs_cb_args *sync_args;
2229 
2230 	sync_args = &sync_req->args;
2231 	file = sync_args->file;
2232 	pthread_spin_lock(&file->lock);
2233 	file->length_xattr = sync_args->op.sync.length;
2234 	assert(sync_args->op.sync.offset <= file->length_flushed);
2235 	spdk_trace_record(TRACE_BLOBFS_XATTR_END, 0, sync_args->op.sync.offset,
2236 			  0, file->trace_arg_name);
2237 	BLOBFS_TRACE(file, "sync done offset=%jx\n", sync_args->op.sync.offset);
2238 	TAILQ_REMOVE(&file->sync_requests, sync_req, args.op.sync.tailq);
2239 	pthread_spin_unlock(&file->lock);
2240 
2241 	sync_args->fn.file_op(sync_args->arg, bserrno);
2242 
2243 	free_fs_request(sync_req);
2244 	__check_sync_reqs(file);
2245 }
2246 
2247 static void
2248 __check_sync_reqs(struct spdk_file *file)
2249 {
2250 	struct spdk_fs_request *sync_req;
2251 
2252 	pthread_spin_lock(&file->lock);
2253 
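	/* Find the oldest pending sync request whose offset has been covered by
	 * flushing.  If one exists and no xattr update is in flight for it yet,
	 * persist the flushed length in the blob's "length" xattr and sync the
	 * blob metadata; __file_cache_finish_sync() then completes the request
	 * and re-checks for more.
	 */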
2254 	TAILQ_FOREACH(sync_req, &file->sync_requests, args.op.sync.tailq) {
2255 		if (sync_req->args.op.sync.offset <= file->length_flushed) {
2256 			break;
2257 		}
2258 	}
2259 
2260 	if (sync_req != NULL && !sync_req->args.op.sync.xattr_in_progress) {
2261 		BLOBFS_TRACE(file, "set xattr length 0x%jx\n", file->length_flushed);
2262 		sync_req->args.op.sync.xattr_in_progress = true;
2263 		sync_req->args.op.sync.length = file->length_flushed;
2264 		spdk_blob_set_xattr(file->blob, "length", &file->length_flushed,
2265 				    sizeof(file->length_flushed));
2266 
2267 		pthread_spin_unlock(&file->lock);
2268 		spdk_trace_record(TRACE_BLOBFS_XATTR_START, 0, file->length_flushed,
2269 				  0, file->trace_arg_name);
2270 		spdk_blob_sync_md(file->blob, __file_cache_finish_sync, sync_req);
2271 	} else {
2272 		pthread_spin_unlock(&file->lock);
2273 	}
2274 }
2275 
2276 static void
2277 __file_flush_done(void *ctx, int bserrno)
2278 {
2279 	struct spdk_fs_request *req = ctx;
2280 	struct spdk_fs_cb_args *args = &req->args;
2281 	struct spdk_file *file = args->file;
2282 	struct cache_buffer *next = args->op.flush.cache_buffer;
2283 
2284 	BLOBFS_TRACE(file, "length=%jx\n", args->op.flush.length);
2285 
2286 	pthread_spin_lock(&file->lock);
2287 	next->in_progress = false;
2288 	next->bytes_flushed += args->op.flush.length;
2289 	file->length_flushed += args->op.flush.length;
2290 	if (file->length_flushed > file->length) {
2291 		file->length = file->length_flushed;
2292 	}
2293 	if (next->bytes_flushed == next->buf_size) {
2294 		BLOBFS_TRACE(file, "write buffer fully flushed 0x%jx\n", file->length_flushed);
2295 		next = tree_find_buffer(file->tree, file->length_flushed);
2296 	}
2297 
2298 	/*
2299 	 * Assert that there is no cached data that extends past the end of the underlying
2300 	 *  blob.
2301 	 */
2302 	assert(next == NULL || next->offset < __file_get_blob_size(file) ||
2303 	       next->bytes_filled == 0);
2304 
2305 	pthread_spin_unlock(&file->lock);
2306 
2307 	__check_sync_reqs(file);
2308 
2309 	__file_flush(req);
2310 }
2311 
2312 static void
2313 __file_flush(void *ctx)
2314 {
2315 	struct spdk_fs_request *req = ctx;
2316 	struct spdk_fs_cb_args *args = &req->args;
2317 	struct spdk_file *file = args->file;
2318 	struct cache_buffer *next;
2319 	uint64_t offset, length, start_lba, num_lba;
2320 	uint32_t lba_size;
2321 
2322 	pthread_spin_lock(&file->lock);
2323 	next = tree_find_buffer(file->tree, file->length_flushed);
2324 	if (next == NULL || next->in_progress ||
2325 	    ((next->bytes_filled < next->buf_size) && TAILQ_EMPTY(&file->sync_requests))) {
2326 		/*
2327 		 * There is either no data to flush, a flush I/O is already in
2328 		 *  progress, or the next buffer is partially filled but there's no
2329 		 *  outstanding request to sync it.
2330 		 * So return immediately - if a flush I/O is in progress we will flush
2331 		 *  more data after that is completed, or a partial buffer will get flushed
2332 		 *  when it is either filled or the file is synced.
2333 		 */
2334 		free_fs_request(req);
2335 		if (next == NULL) {
2336 			/*
2337 			 * For cases where a file's cache was evicted, and then the
2338 			 *  file was later appended, we will write the data directly
2339 			 *  to disk and bypass cache.  So just update length_flushed
2340 			 *  here to reflect that all data was already written to disk.
2341 			 */
2342 			file->length_flushed = file->append_pos;
2343 		}
2344 		pthread_spin_unlock(&file->lock);
2345 		if (next == NULL) {
2346 			/*
2347 			 * There is no data to flush, but we still need to check for any
2348 			 *  outstanding sync requests to make sure metadata gets updated.
2349 			 */
2350 			__check_sync_reqs(file);
2351 		}
2352 		return;
2353 	}
2354 
2355 	offset = next->offset + next->bytes_flushed;
2356 	length = next->bytes_filled - next->bytes_flushed;
2357 	if (length == 0) {
2358 		free_fs_request(req);
2359 		pthread_spin_unlock(&file->lock);
2360 		/*
2361 		 * There is no data to flush, but we still need to check for any
2362 		 *  outstanding sync requests to make sure metadata gets updated.
2363 		 */
2364 		__check_sync_reqs(file);
2365 		return;
2366 	}
2367 	args->op.flush.length = length;
2368 	args->op.flush.cache_buffer = next;
2369 
2370 	__get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba);
2371 
2372 	next->in_progress = true;
2373 	BLOBFS_TRACE(file, "offset=0x%jx length=0x%jx page start=0x%jx num=0x%jx\n",
2374 		     offset, length, start_lba, num_lba);
2375 	pthread_spin_unlock(&file->lock);
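	/* start_lba * lba_size is the flush offset rounded down to an LBA boundary
	 * (file data starts at blob offset 0), so subtracting the cache buffer's
	 * file offset yields the source position within next->buf.
	 */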
2376 	spdk_blob_io_write(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel,
2377 			   next->buf + (start_lba * lba_size) - next->offset,
2378 			   start_lba, num_lba, __file_flush_done, req);
2379 }
2380 
2381 static void
2382 __file_extend_done(void *arg, int bserrno)
2383 {
2384 	struct spdk_fs_cb_args *args = arg;
2385 
2386 	__wake_caller(args, bserrno);
2387 }
2388 
2389 static void
2390 __file_extend_resize_cb(void *_args, int bserrno)
2391 {
2392 	struct spdk_fs_cb_args *args = _args;
2393 	struct spdk_file *file = args->file;
2394 
2395 	if (bserrno) {
2396 		__wake_caller(args, bserrno);
2397 		return;
2398 	}
2399 
2400 	spdk_blob_sync_md(file->blob, __file_extend_done, args);
2401 }
2402 
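/*
 * __file_extend_blob() runs on the filesystem's dispatch thread (sent via
 * fs->send_request): it grows the blob to the requested number of clusters,
 * persists the new size with spdk_blob_sync_md(), and finally wakes the
 * blocked caller through __wake_caller().
 */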
2403 static void
2404 __file_extend_blob(void *_args)
2405 {
2406 	struct spdk_fs_cb_args *args = _args;
2407 	struct spdk_file *file = args->file;
2408 
2409 	spdk_blob_resize(file->blob, args->op.resize.num_clusters, __file_extend_resize_cb, args);
2410 }
2411 
2412 static void
2413 __rw_from_file_done(void *ctx, int bserrno)
2414 {
2415 	struct spdk_fs_request *req = ctx;
2416 
2417 	__wake_caller(&req->args, bserrno);
2418 	free_fs_request(req);
2419 }
2420 
2421 static void
2422 __rw_from_file(void *ctx)
2423 {
2424 	struct spdk_fs_request *req = ctx;
2425 	struct spdk_fs_cb_args *args = &req->args;
2426 	struct spdk_file *file = args->file;
2427 
2428 	if (args->op.rw.is_read) {
2429 		spdk_file_read_async(file, file->fs->sync_target.sync_io_channel, args->iovs[0].iov_base,
2430 				     args->op.rw.offset, (uint64_t)args->iovs[0].iov_len,
2431 				     __rw_from_file_done, req);
2432 	} else {
2433 		spdk_file_write_async(file, file->fs->sync_target.sync_io_channel, args->iovs[0].iov_base,
2434 				      args->op.rw.offset, (uint64_t)args->iovs[0].iov_len,
2435 				      __rw_from_file_done, req);
2436 	}
2437 }
2438 
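/*
 * Helper for the synchronous read/write paths: package a single-iovec request
 * and hand it to the filesystem's dispatch thread; the caller then blocks on
 * channel->sem, which is posted when the operation completes.  On allocation
 * failure the semaphore is posted here as well, so a caller that waits
 * unconditionally does not hang.
 */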
2439 static int
2440 __send_rw_from_file(struct spdk_file *file, void *payload,
2441 		    uint64_t offset, uint64_t length, bool is_read,
2442 		    struct spdk_fs_channel *channel)
2443 {
2444 	struct spdk_fs_request *req;
2445 	struct spdk_fs_cb_args *args;
2446 
2447 	req = alloc_fs_request_with_iov(channel, 1);
2448 	if (req == NULL) {
2449 		sem_post(&channel->sem);
2450 		return -ENOMEM;
2451 	}
2452 
2453 	args = &req->args;
2454 	args->file = file;
2455 	args->sem = &channel->sem;
2456 	args->iovs[0].iov_base = payload;
2457 	args->iovs[0].iov_len = (size_t)length;
2458 	args->op.rw.offset = offset;
2459 	args->op.rw.is_read = is_read;
2460 	file->fs->send_request(__rw_from_file, req);
2461 	return 0;
2462 }
2463 
2464 int
2465 spdk_file_write(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx,
2466 		void *payload, uint64_t offset, uint64_t length)
2467 {
2468 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
2469 	struct spdk_fs_request *flush_req;
2470 	uint64_t rem_length, copy, blob_size, cluster_sz;
2471 	uint32_t cache_buffers_filled = 0;
2472 	uint8_t *cur_payload;
2473 	struct cache_buffer *last;
2474 
2475 	BLOBFS_TRACE_RW(file, "offset=%jx length=%jx\n", offset, length);
2476 
2477 	if (length == 0) {
2478 		return 0;
2479 	}
2480 
2481 	if (offset != file->append_pos) {
2482 		BLOBFS_TRACE(file, " error offset=%jx append_pos=%jx\n", offset, file->append_pos);
2483 		return -EINVAL;
2484 	}
2485 
2486 	pthread_spin_lock(&file->lock);
2487 	file->open_for_writing = true;
2488 
2489 	if ((file->last == NULL) && (file->append_pos % CACHE_BUFFER_SIZE == 0)) {
2490 		cache_append_buffer(file);
2491 	}
2492 
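	/* If there is still no cache buffer (allocation failed, or append_pos is not
	 * buffer-aligned because this file's cache was reclaimed), bypass the write
	 * cache and perform a synchronous write through the dispatch thread.
	 */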
2493 	if (file->last == NULL) {
2494 		int rc;
2495 
2496 		file->append_pos += length;
2497 		pthread_spin_unlock(&file->lock);
2498 		rc = __send_rw_from_file(file, payload, offset, length, false, channel);
2499 		sem_wait(&channel->sem);
2500 		return rc;
2501 	}
2502 
2503 	blob_size = __file_get_blob_size(file);
2504 
2505 	if ((offset + length) > blob_size) {
2506 		struct spdk_fs_cb_args extend_args = {};
2507 
2508 		cluster_sz = file->fs->bs_opts.cluster_sz;
2509 		extend_args.sem = &channel->sem;
2510 		extend_args.op.resize.num_clusters = __bytes_to_clusters((offset + length), cluster_sz);
2511 		extend_args.file = file;
2512 		BLOBFS_TRACE(file, "start resize to %ju clusters\n", extend_args.op.resize.num_clusters);
2513 		pthread_spin_unlock(&file->lock);
2514 		file->fs->send_request(__file_extend_blob, &extend_args);
2515 		sem_wait(&channel->sem);
2516 		if (extend_args.rc) {
2517 			return extend_args.rc;
2518 		}
2519 	}
2520 
2521 	flush_req = alloc_fs_request(channel);
2522 	if (flush_req == NULL) {
2523 		pthread_spin_unlock(&file->lock);
2524 		return -ENOMEM;
2525 	}
2526 
2527 	last = file->last;
2528 	rem_length = length;
2529 	cur_payload = payload;
2530 	while (rem_length > 0) {
2531 		copy = last->buf_size - last->bytes_filled;
2532 		if (copy > rem_length) {
2533 			copy = rem_length;
2534 		}
2535 		BLOBFS_TRACE_RW(file, "  fill offset=%jx length=%jx\n", file->append_pos, copy);
2536 		memcpy(&last->buf[last->bytes_filled], cur_payload, copy);
2537 		file->append_pos += copy;
2538 		if (file->length < file->append_pos) {
2539 			file->length = file->append_pos;
2540 		}
2541 		cur_payload += copy;
2542 		last->bytes_filled += copy;
2543 		rem_length -= copy;
2544 		if (last->bytes_filled == last->buf_size) {
2545 			cache_buffers_filled++;
2546 			last = cache_append_buffer(file);
2547 			if (last == NULL) {
2548 				BLOBFS_TRACE(file, "nomem\n");
2549 				free_fs_request(flush_req);
2550 				pthread_spin_unlock(&file->lock);
2551 				return -ENOMEM;
2552 			}
2553 		}
2554 	}
2555 
2556 	pthread_spin_unlock(&file->lock);
2557 
2558 	if (cache_buffers_filled == 0) {
2559 		free_fs_request(flush_req);
2560 		return 0;
2561 	}
2562 
2563 	flush_req->args.file = file;
2564 	file->fs->send_request(__file_flush, flush_req);
2565 	return 0;
2566 }
2567 
2568 static void
2569 __readahead_done(void *ctx, int bserrno)
2570 {
2571 	struct spdk_fs_request *req = ctx;
2572 	struct spdk_fs_cb_args *args = &req->args;
2573 	struct cache_buffer *cache_buffer = args->op.readahead.cache_buffer;
2574 	struct spdk_file *file = args->file;
2575 
2576 	BLOBFS_TRACE(file, "offset=%jx\n", cache_buffer->offset);
2577 
2578 	pthread_spin_lock(&file->lock);
2579 	cache_buffer->bytes_filled = args->op.readahead.length;
2580 	cache_buffer->bytes_flushed = args->op.readahead.length;
2581 	cache_buffer->in_progress = false;
2582 	pthread_spin_unlock(&file->lock);
2583 
2584 	free_fs_request(req);
2585 }
2586 
2587 static void
2588 __readahead(void *ctx)
2589 {
2590 	struct spdk_fs_request *req = ctx;
2591 	struct spdk_fs_cb_args *args = &req->args;
2592 	struct spdk_file *file = args->file;
2593 	uint64_t offset, length, start_lba, num_lba;
2594 	uint32_t lba_size;
2595 
2596 	offset = args->op.readahead.offset;
2597 	length = args->op.readahead.length;
2598 	assert(length > 0);
2599 
2600 	__get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba);
2601 
2602 	BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n",
2603 		     offset, length, start_lba, num_lba);
2604 	spdk_blob_io_read(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel,
2605 			  args->op.readahead.cache_buffer->buf,
2606 			  start_lba, num_lba, __readahead_done, req);
2607 }
2608 
2609 static uint64_t
2610 __next_cache_buffer_offset(uint64_t offset)
2611 {
2612 	return (offset + CACHE_BUFFER_SIZE) & ~(CACHE_TREE_LEVEL_MASK(0));
2613 }
2614 
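/*
 * Start readahead of the cache buffer following `offset`, unless that range is
 * already cached or lies beyond the end of the file.  Called from
 * spdk_file_read() once CACHE_READAHEAD_THRESHOLD sequential bytes have been
 * read.
 */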
2615 static void
2616 check_readahead(struct spdk_file *file, uint64_t offset,
2617 		struct spdk_fs_channel *channel)
2618 {
2619 	struct spdk_fs_request *req;
2620 	struct spdk_fs_cb_args *args;
2621 
2622 	offset = __next_cache_buffer_offset(offset);
2623 	if (tree_find_buffer(file->tree, offset) != NULL || file->length <= offset) {
2624 		return;
2625 	}
2626 
2627 	req = alloc_fs_request(channel);
2628 	if (req == NULL) {
2629 		return;
2630 	}
2631 	args = &req->args;
2632 
2633 	BLOBFS_TRACE(file, "offset=%jx\n", offset);
2634 
2635 	args->file = file;
2636 	args->op.readahead.offset = offset;
2637 	args->op.readahead.cache_buffer = cache_insert_buffer(file, offset);
2638 	if (!args->op.readahead.cache_buffer) {
2639 		BLOBFS_TRACE(file, "Cannot allocate buf for offset=%jx\n", offset);
2640 		free_fs_request(req);
2641 		return;
2642 	}
2643 
2644 	args->op.readahead.cache_buffer->in_progress = true;
2645 	if (file->length < (offset + CACHE_BUFFER_SIZE)) {
2646 		args->op.readahead.length = file->length & (CACHE_BUFFER_SIZE - 1);
2647 	} else {
2648 		args->op.readahead.length = CACHE_BUFFER_SIZE;
2649 	}
2650 	file->fs->send_request(__readahead, req);
2651 }
2652 
2653 int64_t
2654 spdk_file_read(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx,
2655 	       void *payload, uint64_t offset, uint64_t length)
2656 {
2657 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
2658 	uint64_t final_offset, final_length;
2659 	uint32_t sub_reads = 0;
2660 	struct cache_buffer *buf;
2661 	uint64_t read_len;
2662 	int rc = 0;
2663 
2664 	pthread_spin_lock(&file->lock);
2665 
2666 	BLOBFS_TRACE_RW(file, "offset=%ju length=%ju\n", offset, length);
2667 
2668 	file->open_for_writing = false;
2669 
2670 	if (length == 0 || offset >= file->append_pos) {
2671 		pthread_spin_unlock(&file->lock);
2672 		return 0;
2673 	}
2674 
2675 	if (offset + length > file->append_pos) {
2676 		length = file->append_pos - offset;
2677 	}
2678 
2679 	if (offset != file->next_seq_offset) {
2680 		file->seq_byte_count = 0;
2681 	}
2682 	file->seq_byte_count += length;
2683 	file->next_seq_offset = offset + length;
2684 	if (file->seq_byte_count >= CACHE_READAHEAD_THRESHOLD) {
2685 		check_readahead(file, offset, channel);
2686 		check_readahead(file, offset + CACHE_BUFFER_SIZE, channel);
2687 	}
2688 
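	/* Split the read at cache buffer boundaries.  Each chunk is either copied
	 * out of a filled cache buffer or, on a cache miss, issued to the dispatch
	 * thread as a synchronous sub-read that is waited for below.
	 */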
2689 	final_length = 0;
2690 	final_offset = offset + length;
2691 	while (offset < final_offset) {
2692 		length = NEXT_CACHE_BUFFER_OFFSET(offset) - offset;
2693 		if (length > (final_offset - offset)) {
2694 			length = final_offset - offset;
2695 		}
2696 
2697 		buf = tree_find_filled_buffer(file->tree, offset);
2698 		if (buf == NULL) {
2699 			pthread_spin_unlock(&file->lock);
2700 			rc = __send_rw_from_file(file, payload, offset, length, true, channel);
2701 			pthread_spin_lock(&file->lock);
2702 			/* __send_rw_from_file() posts channel->sem even when it fails to
			 *  allocate a request, so count this sub-read unconditionally to
			 *  keep the semaphore posts and waits below balanced.
			 */
			sub_reads++;
2705 		} else {
2706 			read_len = length;
2707 			if ((offset + length) > (buf->offset + buf->bytes_filled)) {
2708 				read_len = buf->offset + buf->bytes_filled - offset;
2709 			}
2710 			BLOBFS_TRACE(file, "read %p offset=%ju length=%ju\n", payload, offset, read_len);
2711 			memcpy(payload, &buf->buf[offset - buf->offset], read_len);
2712 			if ((offset + read_len) % CACHE_BUFFER_SIZE == 0) {
2713 				tree_remove_buffer(file->tree, buf);
2714 				if (file->tree->present_mask == 0) {
2715 					spdk_thread_send_msg(g_cache_pool_thread, _remove_file_from_cache_pool, file);
2716 				}
2717 			}
2718 		}
2719 
2720 		if (rc == 0) {
2721 			final_length += length;
2722 		} else {
2723 			break;
2724 		}
2725 		payload += length;
2726 		offset += length;
2727 	}
2728 	pthread_spin_unlock(&file->lock);
2729 	while (sub_reads > 0) {
2730 		sem_wait(&channel->sem);
2731 		sub_reads--;
2732 	}
2733 	if (rc == 0) {
2734 		return final_length;
2735 	} else {
2736 		return rc;
2737 	}
2738 }
2739 
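/*
 * Sync means: everything appended so far (append_pos) must be flushed to the
 * blob and recorded in its "length" xattr.  If the xattr already covers
 * append_pos there is nothing to do; otherwise queue a sync request and kick a
 * flush; __check_sync_reqs() updates the xattr once flushing catches up.
 */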
2740 static void
2741 _file_sync(struct spdk_file *file, struct spdk_fs_channel *channel,
2742 	   spdk_file_op_complete cb_fn, void *cb_arg)
2743 {
2744 	struct spdk_fs_request *sync_req;
2745 	struct spdk_fs_request *flush_req;
2746 	struct spdk_fs_cb_args *sync_args;
2747 	struct spdk_fs_cb_args *flush_args;
2748 
2749 	BLOBFS_TRACE(file, "offset=%jx\n", file->append_pos);
2750 
2751 	pthread_spin_lock(&file->lock);
2752 	if (file->append_pos <= file->length_xattr) {
2753 		BLOBFS_TRACE(file, "done - file already synced\n");
2754 		pthread_spin_unlock(&file->lock);
2755 		cb_fn(cb_arg, 0);
2756 		return;
2757 	}
2758 
2759 	sync_req = alloc_fs_request(channel);
2760 	if (!sync_req) {
2761 		SPDK_ERRLOG("Cannot allocate sync req for file=%s\n", file->name);
2762 		pthread_spin_unlock(&file->lock);
2763 		cb_fn(cb_arg, -ENOMEM);
2764 		return;
2765 	}
2766 	sync_args = &sync_req->args;
2767 
2768 	flush_req = alloc_fs_request(channel);
2769 	if (!flush_req) {
2770 		SPDK_ERRLOG("Cannot allocate flush req for file=%s\n", file->name);
2771 		free_fs_request(sync_req);
2772 		pthread_spin_unlock(&file->lock);
2773 		cb_fn(cb_arg, -ENOMEM);
2774 		return;
2775 	}
2776 	flush_args = &flush_req->args;
2777 
2778 	sync_args->file = file;
2779 	sync_args->fn.file_op = cb_fn;
2780 	sync_args->arg = cb_arg;
2781 	sync_args->op.sync.offset = file->append_pos;
2782 	sync_args->op.sync.xattr_in_progress = false;
2783 	TAILQ_INSERT_TAIL(&file->sync_requests, sync_req, args.op.sync.tailq);
2784 	pthread_spin_unlock(&file->lock);
2785 
2786 	flush_args->file = file;
2787 	channel->send_request(__file_flush, flush_req);
2788 }
2789 
2790 int
2791 spdk_file_sync(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx)
2792 {
2793 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
2794 	struct spdk_fs_cb_args args = {};
2795 
2796 	args.sem = &channel->sem;
2797 	_file_sync(file, channel, __wake_caller, &args);
2798 	sem_wait(&channel->sem);
2799 
2800 	return args.rc;
2801 }
2802 
2803 void
2804 spdk_file_sync_async(struct spdk_file *file, struct spdk_io_channel *_channel,
2805 		     spdk_file_op_complete cb_fn, void *cb_arg)
2806 {
2807 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
2808 
2809 	_file_sync(file, channel, cb_fn, cb_arg);
2810 }
2811 
2812 void
2813 spdk_file_set_priority(struct spdk_file *file, uint32_t priority)
2814 {
2815 	BLOBFS_TRACE(file, "priority=%u\n", priority);
2816 	file->priority = priority;
2818 }
2819 
2820 /*
2821  * Close routines
2822  */
2823 
2824 static void
2825 __file_close_async_done(void *ctx, int bserrno)
2826 {
2827 	struct spdk_fs_request *req = ctx;
2828 	struct spdk_fs_cb_args *args = &req->args;
2829 	struct spdk_file *file = args->file;
2830 
2831 	spdk_trace_record(TRACE_BLOBFS_CLOSE, 0, 0, 0, file->trace_arg_name);
2832 
2833 	if (file->is_deleted) {
2834 		spdk_fs_delete_file_async(file->fs, file->name, blob_delete_cb, ctx);
2835 		return;
2836 	}
2837 
2838 	args->fn.file_op(args->arg, bserrno);
2839 	free_fs_request(req);
2840 }
2841 
2842 static void
2843 __file_close_async(struct spdk_file *file, struct spdk_fs_request *req)
2844 {
2845 	struct spdk_blob *blob;
2846 
2847 	pthread_spin_lock(&file->lock);
2848 	if (file->ref_count == 0) {
2849 		pthread_spin_unlock(&file->lock);
2850 		__file_close_async_done(req, -EBADF);
2851 		return;
2852 	}
2853 
2854 	file->ref_count--;
2855 	if (file->ref_count > 0) {
2856 		pthread_spin_unlock(&file->lock);
2857 		req->args.fn.file_op(req->args.arg, 0);
2858 		free_fs_request(req);
2859 		return;
2860 	}
2861 
2862 	pthread_spin_unlock(&file->lock);
2863 
2864 	blob = file->blob;
2865 	file->blob = NULL;
2866 	spdk_blob_close(blob, __file_close_async_done, req);
2867 }
2868 
2869 static void
2870 __file_close_async__sync_done(void *arg, int fserrno)
2871 {
2872 	struct spdk_fs_request *req = arg;
2873 	struct spdk_fs_cb_args *args = &req->args;
2874 
2875 	__file_close_async(args->file, req);
2876 }
2877 
2878 void
2879 spdk_file_close_async(struct spdk_file *file, spdk_file_op_complete cb_fn, void *cb_arg)
2880 {
2881 	struct spdk_fs_request *req;
2882 	struct spdk_fs_cb_args *args;
2883 
2884 	req = alloc_fs_request(file->fs->md_target.md_fs_channel);
2885 	if (req == NULL) {
2886 		SPDK_ERRLOG("Cannot allocate close async req for file=%s\n", file->name);
2887 		cb_fn(cb_arg, -ENOMEM);
2888 		return;
2889 	}
2890 
2891 	args = &req->args;
2892 	args->file = file;
2893 	args->fn.file_op = cb_fn;
2894 	args->arg = cb_arg;
2895 
2896 	spdk_file_sync_async(file, file->fs->md_target.md_io_channel, __file_close_async__sync_done, req);
2897 }
2898 
2899 static void
2900 __file_close(void *arg)
2901 {
2902 	struct spdk_fs_request *req = arg;
2903 	struct spdk_fs_cb_args *args = &req->args;
2904 	struct spdk_file *file = args->file;
2905 
2906 	__file_close_async(file, req);
2907 }
2908 
2909 int
2910 spdk_file_close(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx)
2911 {
2912 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
2913 	struct spdk_fs_request *req;
2914 	struct spdk_fs_cb_args *args;
2915 
2916 	req = alloc_fs_request(channel);
2917 	if (req == NULL) {
2918 		SPDK_ERRLOG("Cannot allocate close req for file=%s\n", file->name);
2919 		return -ENOMEM;
2920 	}
2921 
2922 	args = &req->args;
2923 
2924 	spdk_file_sync(file, ctx);
2925 	BLOBFS_TRACE(file, "name=%s\n", file->name);
2926 	args->file = file;
2927 	args->sem = &channel->sem;
2928 	args->fn.file_op = __wake_caller;
2929 	args->arg = args;
2930 	channel->send_request(__file_close, req);
2931 	sem_wait(&channel->sem);
2932 
2933 	return args->rc;
2934 }
2935 
2936 int
2937 spdk_file_get_id(struct spdk_file *file, void *id, size_t size)
2938 {
2939 	if (size < sizeof(spdk_blob_id)) {
2940 		return -EINVAL;
2941 	}
2942 
2943 	memcpy(id, &file->blobid, sizeof(spdk_blob_id));
2944 
2945 	return sizeof(spdk_blob_id);
2946 }
2947 
2948 static void
2949 _file_free(void *ctx)
2950 {
2951 	struct spdk_file *file = ctx;
2952 
2953 	TAILQ_REMOVE(&g_caches, file, cache_tailq);
2954 
2955 	free(file->name);
2956 	free(file->tree);
2957 	free(file);
2958 }
2959 
2960 static void
2961 file_free(struct spdk_file *file)
2962 {
2963 	BLOBFS_TRACE(file, "free=%s\n", file->name);
2964 	pthread_spin_lock(&file->lock);
2965 	if (file->tree->present_mask == 0) {
2966 		pthread_spin_unlock(&file->lock);
2967 		free(file->name);
2968 		free(file->tree);
2969 		free(file);
2970 		return;
2971 	}
2972 
2973 	tree_free_buffers(file->tree);
2974 	assert(file->tree->present_mask == 0);
2975 	spdk_thread_send_msg(g_cache_pool_thread, _file_free, file);
2976 	pthread_spin_unlock(&file->lock);
2977 }
2978 
2979 SPDK_LOG_REGISTER_COMPONENT("blobfs", SPDK_LOG_BLOBFS)
2980 SPDK_LOG_REGISTER_COMPONENT("blobfs_rw", SPDK_LOG_BLOBFS_RW)
2981