xref: /spdk/lib/blobfs/blobfs.c (revision ce1501218bda70b4fdb3132506ec5c0759cb312d)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 
36 #include "spdk/blobfs.h"
37 #include "spdk/conf.h"
38 #include "tree.h"
39 
40 #include "spdk/queue.h"
41 #include "spdk/thread.h"
42 #include "spdk/assert.h"
43 #include "spdk/env.h"
44 #include "spdk/util.h"
45 #include "spdk_internal/log.h"
46 #include "spdk/trace.h"
47 
48 #define BLOBFS_TRACE(file, str, args...) \
49 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s " str, file->name, ##args)
50 
51 #define BLOBFS_TRACE_RW(file, str, args...) \
52 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS_RW, "file=%s " str, file->name, ##args)
53 
54 #define BLOBFS_DEFAULT_CACHE_SIZE (4ULL * 1024 * 1024 * 1024)
55 #define SPDK_BLOBFS_DEFAULT_OPTS_CLUSTER_SZ (1024 * 1024)
56 
57 #define SPDK_BLOBFS_SIGNATURE	"BLOBFS"
58 
59 static uint64_t g_fs_cache_size = BLOBFS_DEFAULT_CACHE_SIZE;
60 static struct spdk_mempool *g_cache_pool;
61 static TAILQ_HEAD(, spdk_file) g_caches;
62 static struct spdk_poller *g_cache_pool_mgmt_poller;
63 static struct spdk_thread *g_cache_pool_thread;
64 #define BLOBFS_CACHE_POOL_POLL_PERIOD_IN_US 1000ULL
65 static int g_fs_count = 0;
66 static pthread_mutex_t g_cache_init_lock = PTHREAD_MUTEX_INITIALIZER;
67 
68 #define TRACE_GROUP_BLOBFS	0x7
69 #define TRACE_BLOBFS_XATTR_START	SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x0)
70 #define TRACE_BLOBFS_XATTR_END		SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x1)
71 #define TRACE_BLOBFS_OPEN		SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x2)
72 #define TRACE_BLOBFS_CLOSE		SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x3)
73 #define TRACE_BLOBFS_DELETE_START	SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x4)
74 #define TRACE_BLOBFS_DELETE_DONE	SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x5)
75 
76 SPDK_TRACE_REGISTER_FN(blobfs_trace, "blobfs", TRACE_GROUP_BLOBFS)
77 {
78 	spdk_trace_register_description("BLOBFS_XATTR_START",
79 					TRACE_BLOBFS_XATTR_START,
80 					OWNER_NONE, OBJECT_NONE, 0,
81 					SPDK_TRACE_ARG_TYPE_STR,
82 					"file:    ");
83 	spdk_trace_register_description("BLOBFS_XATTR_END",
84 					TRACE_BLOBFS_XATTR_END,
85 					OWNER_NONE, OBJECT_NONE, 0,
86 					SPDK_TRACE_ARG_TYPE_STR,
87 					"file:    ");
88 	spdk_trace_register_description("BLOBFS_OPEN",
89 					TRACE_BLOBFS_OPEN,
90 					OWNER_NONE, OBJECT_NONE, 0,
91 					SPDK_TRACE_ARG_TYPE_STR,
92 					"file:    ");
93 	spdk_trace_register_description("BLOBFS_CLOSE",
94 					TRACE_BLOBFS_CLOSE,
95 					OWNER_NONE, OBJECT_NONE, 0,
96 					SPDK_TRACE_ARG_TYPE_STR,
97 					"file:    ");
98 	spdk_trace_register_description("BLOBFS_DELETE_START",
99 					TRACE_BLOBFS_DELETE_START,
100 					OWNER_NONE, OBJECT_NONE, 0,
101 					SPDK_TRACE_ARG_TYPE_STR,
102 					"file:    ");
103 	spdk_trace_register_description("BLOBFS_DELETE_DONE",
104 					TRACE_BLOBFS_DELETE_DONE,
105 					OWNER_NONE, OBJECT_NONE, 0,
106 					SPDK_TRACE_ARG_TYPE_STR,
107 					"file:    ");
108 }
109 
/*
 * Release one cache buffer: return its data area to the global cache
 * mempool, then free the tracking structure.  The buf pointer must be
 * read before the struct itself is freed, so the order here matters.
 */
void
cache_buffer_free(struct cache_buffer *cache_buffer)
{
	spdk_mempool_put(g_cache_pool, cache_buffer->buf);
	free(cache_buffer);
}
116 
117 #define CACHE_READAHEAD_THRESHOLD	(128 * 1024)
118 
/* In-memory state for one blobfs file, backed by a single blob. */
struct spdk_file {
	struct spdk_filesystem	*fs;
	struct spdk_blob	*blob;
	char			*name;
	uint64_t		trace_arg_name;	/* first 8 bytes of name, packed for tracepoints */
	uint64_t		length;		/* current logical length in bytes */
	bool                    is_deleted;
	bool			open_for_writing;
	uint64_t		length_flushed;
	uint64_t		length_xattr;	/* length recorded in the "length" xattr */
	uint64_t		append_pos;	/* next append offset */
	uint64_t		seq_byte_count;	/* sequential-access tracking (readahead heuristic) */
	uint64_t		next_seq_offset;
	uint32_t		priority;
	TAILQ_ENTRY(spdk_file)	tailq;		/* link on spdk_filesystem->files */
	spdk_blob_id		blobid;
	uint32_t		ref_count;
	pthread_spinlock_t	lock;
	struct cache_buffer	*last;
	struct cache_tree	*tree;		/* cache buffers indexed by file offset */
	TAILQ_HEAD(open_requests_head, spdk_fs_request) open_requests;
	TAILQ_HEAD(sync_requests_head, spdk_fs_request) sync_requests;
	TAILQ_ENTRY(spdk_file)	cache_tailq;	/* presumably links on g_caches — confirm at insert site */
};
143 
/* A blob marked deleted before the last unload; queued during load so it
 * can be removed from disk before the load callback completes. */
struct spdk_deleted_file {
	spdk_blob_id	id;
	TAILQ_ENTRY(spdk_deleted_file)	tailq;
};
148 
/* Top-level blobfs instance: one blobstore plus the channels it uses for
 * metadata and synchronous operations.  Each *_target below is registered
 * as a separate io_device in fs_alloc(). */
struct spdk_filesystem {
	struct spdk_blob_store	*bs;
	TAILQ_HEAD(, spdk_file)	files;		/* all known files */
	struct spdk_bs_opts	bs_opts;
	struct spdk_bs_dev	*bdev;
	fs_send_request_fn	send_request;	/* dispatches work onto the fs's thread */

	struct {
		uint32_t		max_ops;
		struct spdk_io_channel	*sync_io_channel;
		struct spdk_fs_channel	*sync_fs_channel;
	} sync_target;

	/* Channel used for metadata operations (init/load/create/stat). */
	struct {
		uint32_t		max_ops;
		struct spdk_io_channel	*md_io_channel;
		struct spdk_fs_channel	*md_fs_channel;
	} md_target;

	struct {
		uint32_t		max_ops;
	} io_target;
};
172 
/*
 * Callback state embedded in every spdk_fs_request.  "fn"/"arg" hold the
 * user's completion callback; "op" is per-operation scratch state, of which
 * only the member matching the in-flight operation is valid.
 */
struct spdk_fs_cb_args {
	union {
		spdk_fs_op_with_handle_complete		fs_op_with_handle;
		spdk_fs_op_complete			fs_op;
		spdk_file_op_with_handle_complete	file_op_with_handle;
		spdk_file_op_complete			file_op;
		spdk_file_stat_op_complete		stat_op;
	} fn;
	void *arg;
	sem_t *sem;		/* posted to wake blocking (sync) API wrappers */
	struct spdk_filesystem *fs;
	struct spdk_file *file;
	int rc;
	struct iovec *iovs;	/* points at heap array, or at "iov" below when iovcnt <= 1 */
	uint32_t iovcnt;
	struct iovec iov;
	union {
		struct {
			TAILQ_HEAD(, spdk_deleted_file)	deleted_files;
		} fs_load;
		struct {
			uint64_t	length;
		} truncate;
		struct {
			struct spdk_io_channel	*channel;
			void		*pin_buf;
			int		is_read;
			off_t		offset;
			size_t		length;
			uint64_t	start_lba;
			uint64_t	num_lba;
			uint32_t	blocklen;
		} rw;
		struct {
			const char	*old_name;
			const char	*new_name;
		} rename;
		struct {
			struct cache_buffer	*cache_buffer;
			uint64_t		length;
		} flush;
		struct {
			struct cache_buffer	*cache_buffer;
			uint64_t		length;
			uint64_t		offset;
		} readahead;
		struct {
			/* offset of the file when the sync request was made */
			uint64_t			offset;
			TAILQ_ENTRY(spdk_fs_request)	tailq;
			bool				xattr_in_progress;
			/* length written to the xattr for this file - this should
			 * always be the same as the offset if only one thread is
			 * writing to the file, but could differ if multiple threads
			 * are appending
			 */
			uint64_t			length;
		} sync;
		struct {
			uint32_t			num_clusters;
		} resize;
		struct {
			const char	*name;
			uint32_t	flags;
			TAILQ_ENTRY(spdk_fs_request)	tailq;
		} open;
		struct {
			const char		*name;
			struct spdk_blob	*blob;
		} create;
		struct {
			const char	*name;
		} delete;
		struct {
			const char	*name;
		} stat;
	} op;
};
251 
252 static void file_free(struct spdk_file *file);
253 static void fs_io_device_unregister(struct spdk_filesystem *fs);
254 static void fs_free_io_channels(struct spdk_filesystem *fs);
255 
/*
 * Fill *opts with default blobfs options (1 MiB cluster size).  Call this
 * before overriding individual fields.
 */
void
spdk_fs_opts_init(struct spdk_blobfs_opts *opts)
{
	opts->cluster_sz = SPDK_BLOBFS_DEFAULT_OPTS_CLUSTER_SZ;
}
261 
262 static int _blobfs_cache_pool_reclaim(void *arg);
263 
264 static bool
265 blobfs_cache_pool_need_reclaim(void)
266 {
267 	size_t count;
268 
269 	count = spdk_mempool_count(g_cache_pool);
270 	/* We define a aggressive policy here as the requirements from db_bench are batched, so start the poller
271 	 *  when the number of available cache buffer is less than 1/5 of total buffers.
272 	 */
273 	if (count > (size_t)g_fs_cache_size / CACHE_BUFFER_SIZE / 5) {
274 		return false;
275 	}
276 
277 	return true;
278 }
279 
/*
 * Runs on the dedicated cache-pool thread: create the global cache-buffer
 * mempool and start the periodic reclaim poller.
 */
static void
__start_cache_pool_mgmt(void *ctx)
{
	assert(g_cache_pool == NULL);

	g_cache_pool = spdk_mempool_create("spdk_fs_cache",
					   g_fs_cache_size / CACHE_BUFFER_SIZE,
					   CACHE_BUFFER_SIZE,
					   SPDK_MEMPOOL_DEFAULT_CACHE_SIZE,
					   SPDK_ENV_SOCKET_ID_ANY);
	if (!g_cache_pool) {
		SPDK_ERRLOG("Create mempool failed, you may "
			    "increase the memory and try again\n");
		/* NOTE(review): under NDEBUG this assert compiles out and
		 * execution continues with g_cache_pool == NULL — confirm
		 * whether an explicit abort is intended here. */
		assert(false);
	}
	TAILQ_INIT(&g_caches);

	assert(g_cache_pool_mgmt_poller == NULL);
	g_cache_pool_mgmt_poller = SPDK_POLLER_REGISTER(_blobfs_cache_pool_reclaim, NULL,
				   BLOBFS_CACHE_POOL_POLL_PERIOD_IN_US);
}
301 
/*
 * Runs on the cache-pool thread when the last filesystem goes away: stop
 * the reclaim poller, destroy the buffer pool, and exit the thread.
 */
static void
__stop_cache_pool_mgmt(void *ctx)
{
	spdk_poller_unregister(&g_cache_pool_mgmt_poller);

	assert(g_cache_pool != NULL);
	/* Every buffer must have been returned before the pool is destroyed. */
	assert(spdk_mempool_count(g_cache_pool) == g_fs_cache_size / CACHE_BUFFER_SIZE);
	spdk_mempool_free(g_cache_pool);
	g_cache_pool = NULL;

	spdk_thread_exit(g_cache_pool_thread);
}
314 
/*
 * Take a reference on the global cache.  The first filesystem creates the
 * dedicated cache-pool thread and schedules pool setup on it; subsequent
 * filesystems only bump the refcount.
 */
static void
initialize_global_cache(void)
{
	pthread_mutex_lock(&g_cache_init_lock);
	if (g_fs_count == 0) {
		g_cache_pool_thread = spdk_thread_create("cache_pool_mgmt", NULL);
		assert(g_cache_pool_thread != NULL);
		spdk_thread_send_msg(g_cache_pool_thread, __start_cache_pool_mgmt, NULL);
	}
	g_fs_count++;
	pthread_mutex_unlock(&g_cache_init_lock);
}
327 
/*
 * Drop a reference on the global cache; the last filesystem schedules
 * teardown of the pool and its management thread.
 */
static void
free_global_cache(void)
{
	pthread_mutex_lock(&g_cache_init_lock);
	g_fs_count--;
	if (g_fs_count == 0) {
		spdk_thread_send_msg(g_cache_pool_thread, __stop_cache_pool_mgmt, NULL);
	}
	pthread_mutex_unlock(&g_cache_init_lock);
}
338 
339 static uint64_t
340 __file_get_blob_size(struct spdk_file *file)
341 {
342 	uint64_t cluster_sz;
343 
344 	cluster_sz = file->fs->bs_opts.cluster_sz;
345 	return cluster_sz * spdk_blob_get_num_clusters(file->blob);
346 }
347 
/* One queued blobfs operation.  Note args is the first member, so a
 * pointer to the request and a pointer to its args coincide. */
struct spdk_fs_request {
	struct spdk_fs_cb_args		args;
	TAILQ_ENTRY(spdk_fs_request)	link;	/* on the owning channel's free list */
	struct spdk_fs_channel		*channel;
};
353 
/* Per-io-channel state: a pre-allocated request pool plus dispatch
 * plumbing.  Channels with sync set may be touched from multiple threads,
 * so their free list is guarded by the spinlock. */
struct spdk_fs_channel {
	struct spdk_fs_request		*req_mem;	/* backing array of max_ops requests */
	TAILQ_HEAD(, spdk_fs_request)	reqs;		/* free list */
	sem_t				sem;		/* wakes blocking sync wrappers */
	struct spdk_filesystem		*fs;
	struct spdk_io_channel		*bs_channel;	/* blobstore channel; may be NULL */
	fs_send_request_fn		send_request;
	bool				sync;		/* free list needs the lock when set */
	uint32_t			outstanding_reqs;
	pthread_spinlock_t		lock;
};
365 
/* For now, this is effectively an alias for spdk_fs_channel.  Eventually
 * some data members will shift over from the channel. */
struct spdk_fs_thread_ctx {
	struct spdk_fs_channel	ch;
};
371 
372 static struct spdk_fs_request *
373 alloc_fs_request_with_iov(struct spdk_fs_channel *channel, uint32_t iovcnt)
374 {
375 	struct spdk_fs_request *req;
376 	struct iovec *iovs = NULL;
377 
378 	if (iovcnt > 1) {
379 		iovs = calloc(iovcnt, sizeof(struct iovec));
380 		if (!iovs) {
381 			return NULL;
382 		}
383 	}
384 
385 	if (channel->sync) {
386 		pthread_spin_lock(&channel->lock);
387 	}
388 
389 	req = TAILQ_FIRST(&channel->reqs);
390 	if (req) {
391 		channel->outstanding_reqs++;
392 		TAILQ_REMOVE(&channel->reqs, req, link);
393 	}
394 
395 	if (channel->sync) {
396 		pthread_spin_unlock(&channel->lock);
397 	}
398 
399 	if (req == NULL) {
400 		SPDK_ERRLOG("Cannot allocate req on spdk_fs_channel =%p\n", channel);
401 		free(iovs);
402 		return NULL;
403 	}
404 	memset(req, 0, sizeof(*req));
405 	req->channel = channel;
406 	if (iovcnt > 1) {
407 		req->args.iovs = iovs;
408 	} else {
409 		req->args.iovs = &req->args.iov;
410 	}
411 	req->args.iovcnt = iovcnt;
412 
413 	return req;
414 }
415 
/* Allocate a request with no separate iovec array; the embedded iov is used. */
static struct spdk_fs_request *
alloc_fs_request(struct spdk_fs_channel *channel)
{
	return alloc_fs_request_with_iov(channel, 0);
}
421 
422 static void
423 free_fs_request(struct spdk_fs_request *req)
424 {
425 	struct spdk_fs_channel *channel = req->channel;
426 
427 	if (req->args.iovcnt > 1) {
428 		free(req->args.iovs);
429 	}
430 
431 	if (channel->sync) {
432 		pthread_spin_lock(&channel->lock);
433 	}
434 
435 	TAILQ_INSERT_HEAD(&req->channel->reqs, req, link);
436 	channel->outstanding_reqs--;
437 
438 	if (channel->sync) {
439 		pthread_spin_unlock(&channel->lock);
440 	}
441 }
442 
443 static int
444 fs_channel_create(struct spdk_filesystem *fs, struct spdk_fs_channel *channel,
445 		  uint32_t max_ops)
446 {
447 	uint32_t i;
448 
449 	channel->req_mem = calloc(max_ops, sizeof(struct spdk_fs_request));
450 	if (!channel->req_mem) {
451 		return -1;
452 	}
453 
454 	channel->outstanding_reqs = 0;
455 	TAILQ_INIT(&channel->reqs);
456 	sem_init(&channel->sem, 0, 0);
457 
458 	for (i = 0; i < max_ops; i++) {
459 		TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link);
460 	}
461 
462 	channel->fs = fs;
463 
464 	return 0;
465 }
466 
467 static int
468 fs_md_channel_create(void *io_device, void *ctx_buf)
469 {
470 	struct spdk_filesystem		*fs;
471 	struct spdk_fs_channel		*channel = ctx_buf;
472 
473 	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, md_target);
474 
475 	return fs_channel_create(fs, channel, fs->md_target.max_ops);
476 }
477 
478 static int
479 fs_sync_channel_create(void *io_device, void *ctx_buf)
480 {
481 	struct spdk_filesystem		*fs;
482 	struct spdk_fs_channel		*channel = ctx_buf;
483 
484 	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, sync_target);
485 
486 	return fs_channel_create(fs, channel, fs->sync_target.max_ops);
487 }
488 
489 static int
490 fs_io_channel_create(void *io_device, void *ctx_buf)
491 {
492 	struct spdk_filesystem		*fs;
493 	struct spdk_fs_channel		*channel = ctx_buf;
494 
495 	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, io_target);
496 
497 	return fs_channel_create(fs, channel, fs->io_target.max_ops);
498 }
499 
500 static void
501 fs_channel_destroy(void *io_device, void *ctx_buf)
502 {
503 	struct spdk_fs_channel *channel = ctx_buf;
504 
505 	if (channel->outstanding_reqs > 0) {
506 		SPDK_ERRLOG("channel freed with %" PRIu32 " outstanding requests!\n",
507 			    channel->outstanding_reqs);
508 	}
509 
510 	free(channel->req_mem);
511 	if (channel->bs_channel != NULL) {
512 		spdk_bs_free_io_channel(channel->bs_channel);
513 	}
514 }
515 
/* send_request implementation that invokes the function immediately on the
 * current thread (used for the fs-owned md and sync channels). */
static void
__send_request_direct(fs_request_fn fn, void *arg)
{
	fn(arg);
}
521 
/*
 * Wire a freshly initialized or loaded blobstore into the filesystem:
 * record the cluster size, attach blobstore io channels to the md and sync
 * fs channels (both dispatch directly), and take a reference on the global
 * cache.
 */
static void
common_fs_bs_init(struct spdk_filesystem *fs, struct spdk_blob_store *bs)
{
	fs->bs = bs;
	fs->bs_opts.cluster_sz = spdk_bs_get_cluster_size(bs);
	fs->md_target.md_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
	fs->md_target.md_fs_channel->send_request = __send_request_direct;
	fs->sync_target.sync_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
	fs->sync_target.sync_fs_channel->send_request = __send_request_direct;

	initialize_global_cache();
}
534 
/* Completion for spdk_bs_init(): finish wiring the blobstore into the
 * filesystem and hand it (or NULL on error) to the user callback. */
static void
init_cb(void *ctx, struct spdk_blob_store *bs, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_filesystem *fs = args->fs;

	if (bserrno == 0) {
		common_fs_bs_init(fs, bs);
	} else {
		/* NOTE(review): unlike load_cb(), this error path frees fs
		 * without fs_free_io_channels()/fs_io_device_unregister();
		 * the io_devices registered in fs_alloc() appear to be leaked
		 * and still reference the freed fs — verify. */
		free(fs);
		fs = NULL;
	}

	args->fn.fs_op_with_handle(args->arg, fs, bserrno);
	free_fs_request(req);
}
552 
553 static void
554 fs_conf_parse(void)
555 {
556 	struct spdk_conf_section *sp;
557 	int cache_buffer_shift;
558 
559 	sp = spdk_conf_find_section(NULL, "Blobfs");
560 	if (sp == NULL) {
561 		g_fs_cache_buffer_shift = CACHE_BUFFER_SHIFT_DEFAULT;
562 		return;
563 	}
564 
565 	cache_buffer_shift = spdk_conf_section_get_intval(sp, "CacheBufferShift");
566 	if (cache_buffer_shift <= 0) {
567 		g_fs_cache_buffer_shift = CACHE_BUFFER_SHIFT_DEFAULT;
568 	} else {
569 		g_fs_cache_buffer_shift = cache_buffer_shift;
570 	}
571 }
572 
/*
 * Allocate a filesystem object and register its three io_devices (md, sync,
 * io).  The md and sync channels are acquired immediately on the calling
 * thread; io-target channels are created later, on demand.  Returns NULL on
 * allocation failure.
 */
static struct spdk_filesystem *
fs_alloc(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn)
{
	struct spdk_filesystem *fs;

	fs = calloc(1, sizeof(*fs));
	if (fs == NULL) {
		return NULL;
	}

	fs->bdev = dev;
	fs->send_request = send_request_fn;
	TAILQ_INIT(&fs->files);

	fs->md_target.max_ops = 512;
	spdk_io_device_register(&fs->md_target, fs_md_channel_create, fs_channel_destroy,
				sizeof(struct spdk_fs_channel), "blobfs_md");
	fs->md_target.md_io_channel = spdk_get_io_channel(&fs->md_target);
	fs->md_target.md_fs_channel = spdk_io_channel_get_ctx(fs->md_target.md_io_channel);

	fs->sync_target.max_ops = 512;
	spdk_io_device_register(&fs->sync_target, fs_sync_channel_create, fs_channel_destroy,
				sizeof(struct spdk_fs_channel), "blobfs_sync");
	fs->sync_target.sync_io_channel = spdk_get_io_channel(&fs->sync_target);
	fs->sync_target.sync_fs_channel = spdk_io_channel_get_ctx(fs->sync_target.sync_io_channel);

	fs->io_target.max_ops = 512;
	spdk_io_device_register(&fs->io_target, fs_io_channel_create, fs_channel_destroy,
				sizeof(struct spdk_fs_channel), "blobfs_io");

	return fs;
}
605 
/* Record the operation result and wake the thread blocked on args->sem in
 * a synchronous API wrapper.  args may be freed by the woken thread
 * immediately after sem_post() returns. */
static void
__wake_caller(void *arg, int fserrno)
{
	struct spdk_fs_cb_args *args = arg;

	args->rc = fserrno;
	sem_post(args->sem);
}
614 
/*
 * Create a brand-new blobfs on dev.  opt may be NULL for defaults.  The
 * result is delivered via cb_fn(cb_arg, fs, bserrno); fs is NULL on error.
 */
void
spdk_fs_init(struct spdk_bs_dev *dev, struct spdk_blobfs_opts *opt,
	     fs_send_request_fn send_request_fn,
	     spdk_fs_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_filesystem *fs;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;
	struct spdk_bs_opts opts = {};

	fs = fs_alloc(dev, send_request_fn);
	if (fs == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	fs_conf_parse();

	req = alloc_fs_request(fs->md_target.md_fs_channel);
	if (req == NULL) {
		/* Undo fs_alloc(): drop channels first, then unregister (frees fs). */
		fs_free_io_channels(fs);
		fs_io_device_unregister(fs);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	args = &req->args;
	args->fn.fs_op_with_handle = cb_fn;
	args->arg = cb_arg;
	args->fs = fs;

	/* Stamp the blobstore with the blobfs signature so loads can verify it. */
	spdk_bs_opts_init(&opts);
	snprintf(opts.bstype.bstype, sizeof(opts.bstype.bstype), SPDK_BLOBFS_SIGNATURE);
	if (opt) {
		opts.cluster_sz = opt->cluster_sz;
	}
	spdk_bs_init(dev, &opts, init_cb, req);
}
653 
654 static struct spdk_file *
655 file_alloc(struct spdk_filesystem *fs)
656 {
657 	struct spdk_file *file;
658 
659 	file = calloc(1, sizeof(*file));
660 	if (file == NULL) {
661 		return NULL;
662 	}
663 
664 	file->tree = calloc(1, sizeof(*file->tree));
665 	if (file->tree == NULL) {
666 		free(file);
667 		return NULL;
668 	}
669 
670 	file->fs = fs;
671 	TAILQ_INIT(&file->open_requests);
672 	TAILQ_INIT(&file->sync_requests);
673 	pthread_spin_init(&file->lock, 0);
674 	TAILQ_INSERT_TAIL(&fs->files, file, tailq);
675 	file->priority = SPDK_FILE_PRIORITY_LOW;
676 	return file;
677 }
678 
679 static void fs_load_done(void *ctx, int bserrno);
680 
681 static int
682 _handle_deleted_files(struct spdk_fs_request *req)
683 {
684 	struct spdk_fs_cb_args *args = &req->args;
685 	struct spdk_filesystem *fs = args->fs;
686 
687 	if (!TAILQ_EMPTY(&args->op.fs_load.deleted_files)) {
688 		struct spdk_deleted_file *deleted_file;
689 
690 		deleted_file = TAILQ_FIRST(&args->op.fs_load.deleted_files);
691 		TAILQ_REMOVE(&args->op.fs_load.deleted_files, deleted_file, tailq);
692 		spdk_bs_delete_blob(fs->bs, deleted_file->id, fs_load_done, req);
693 		free(deleted_file);
694 		return 0;
695 	}
696 
697 	return 1;
698 }
699 
/*
 * Continuation used while loading: drains the deleted-files list one blob
 * at a time (it is the completion callback for each spdk_bs_delete_blob),
 * then completes the user's load callback.
 */
static void
fs_load_done(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_filesystem *fs = args->fs;

	/* The filesystem has been loaded.  Now check if there are any files that
	 *  were marked for deletion before last unload.  Do not complete the
	 *  fs_load callback until all of them have been deleted on disk.
	 */
	if (_handle_deleted_files(req) == 0) {
		/* We found a file that's been marked for deleting but not actually
		 *  deleted yet.  This function will get called again once the delete
		 *  operation is completed.
		 */
		return;
	}

	args->fn.fs_op_with_handle(args->arg, fs, 0);
	free_fs_request(req);

}
723 
724 static void
725 _file_build_trace_arg_name(struct spdk_file *f)
726 {
727 	f->trace_arg_name = 0;
728 	memcpy(&f->trace_arg_name, f->name,
729 	       spdk_min(sizeof(f->trace_arg_name), strlen(f->name)));
730 }
731 
732 static void
733 iter_cb(void *ctx, struct spdk_blob *blob, int rc)
734 {
735 	struct spdk_fs_request *req = ctx;
736 	struct spdk_fs_cb_args *args = &req->args;
737 	struct spdk_filesystem *fs = args->fs;
738 	uint64_t *length;
739 	const char *name;
740 	uint32_t *is_deleted;
741 	size_t value_len;
742 
743 	if (rc < 0) {
744 		args->fn.fs_op_with_handle(args->arg, fs, rc);
745 		free_fs_request(req);
746 		return;
747 	}
748 
749 	rc = spdk_blob_get_xattr_value(blob, "name", (const void **)&name, &value_len);
750 	if (rc < 0) {
751 		args->fn.fs_op_with_handle(args->arg, fs, rc);
752 		free_fs_request(req);
753 		return;
754 	}
755 
756 	rc = spdk_blob_get_xattr_value(blob, "length", (const void **)&length, &value_len);
757 	if (rc < 0) {
758 		args->fn.fs_op_with_handle(args->arg, fs, rc);
759 		free_fs_request(req);
760 		return;
761 	}
762 
763 	assert(value_len == 8);
764 
765 	/* This file could be deleted last time without close it, then app crashed, so we delete it now */
766 	rc = spdk_blob_get_xattr_value(blob, "is_deleted", (const void **)&is_deleted, &value_len);
767 	if (rc < 0) {
768 		struct spdk_file *f;
769 
770 		f = file_alloc(fs);
771 		if (f == NULL) {
772 			SPDK_ERRLOG("Cannot allocate file to handle deleted file on disk\n");
773 			args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM);
774 			free_fs_request(req);
775 			return;
776 		}
777 
778 		f->name = strdup(name);
779 		_file_build_trace_arg_name(f);
780 		f->blobid = spdk_blob_get_id(blob);
781 		f->length = *length;
782 		f->length_flushed = *length;
783 		f->length_xattr = *length;
784 		f->append_pos = *length;
785 		SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "added file %s length=%ju\n", f->name, f->length);
786 	} else {
787 		struct spdk_deleted_file *deleted_file;
788 
789 		deleted_file = calloc(1, sizeof(*deleted_file));
790 		if (deleted_file == NULL) {
791 			args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM);
792 			free_fs_request(req);
793 			return;
794 		}
795 		deleted_file->id = spdk_blob_get_id(blob);
796 		TAILQ_INSERT_TAIL(&args->op.fs_load.deleted_files, deleted_file, tailq);
797 	}
798 }
799 
/*
 * Completion for spdk_bs_load().  Verifies the blobstore belongs to blobfs
 * by checking the bstype signature (claiming it if still unset), then
 * finishes initialization and processes any files pending deletion.
 */
static void
load_cb(void *ctx, struct spdk_blob_store *bs, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_filesystem *fs = args->fs;
	struct spdk_bs_type bstype;
	static const struct spdk_bs_type blobfs_type = {SPDK_BLOBFS_SIGNATURE};
	static const struct spdk_bs_type zeros;

	if (bserrno != 0) {
		args->fn.fs_op_with_handle(args->arg, NULL, bserrno);
		free_fs_request(req);
		fs_free_io_channels(fs);
		fs_io_device_unregister(fs);
		return;
	}

	bstype = spdk_bs_get_bstype(bs);

	if (!memcmp(&bstype, &zeros, sizeof(bstype))) {
		/* A zeroed bstype means an unclaimed blobstore: stamp it as blobfs. */
		SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "assigning bstype\n");
		spdk_bs_set_bstype(bs, blobfs_type);
	} else if (memcmp(&bstype, &blobfs_type, sizeof(bstype))) {
		SPDK_ERRLOG("not blobfs\n");
		SPDK_LOGDUMP(SPDK_LOG_BLOBFS, "bstype", &bstype, sizeof(bstype));
		args->fn.fs_op_with_handle(args->arg, NULL, -EINVAL);
		free_fs_request(req);
		fs_free_io_channels(fs);
		fs_io_device_unregister(fs);
		return;
	}

	common_fs_bs_init(fs, bs);
	fs_load_done(req, 0);
}
836 
/* Unregister all three io_devices and free the filesystem object itself. */
static void
fs_io_device_unregister(struct spdk_filesystem *fs)
{
	assert(fs != NULL);
	spdk_io_device_unregister(&fs->md_target, NULL);
	spdk_io_device_unregister(&fs->sync_target, NULL);
	spdk_io_device_unregister(&fs->io_target, NULL);
	free(fs);
}
846 
/* Release the md and sync io channels acquired in fs_alloc(). */
static void
fs_free_io_channels(struct spdk_filesystem *fs)
{
	assert(fs != NULL);
	spdk_fs_free_io_channel(fs->md_target.md_io_channel);
	spdk_fs_free_io_channel(fs->sync_target.sync_io_channel);
}
854 
/*
 * Load an existing blobfs from dev.  Each on-disk blob is visited through
 * iter_cb to rebuild the file list; the result is delivered via
 * cb_fn(cb_arg, fs, bserrno), with fs NULL on error.
 */
void
spdk_fs_load(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn,
	     spdk_fs_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_filesystem *fs;
	struct spdk_fs_cb_args *args;
	struct spdk_fs_request *req;
	struct spdk_bs_opts	bs_opts;

	fs = fs_alloc(dev, send_request_fn);
	if (fs == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	fs_conf_parse();

	req = alloc_fs_request(fs->md_target.md_fs_channel);
	if (req == NULL) {
		/* Undo fs_alloc(): drop channels first, then unregister (frees fs). */
		fs_free_io_channels(fs);
		fs_io_device_unregister(fs);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	args = &req->args;
	args->fn.fs_op_with_handle = cb_fn;
	args->arg = cb_arg;
	args->fs = fs;
	TAILQ_INIT(&args->op.fs_load.deleted_files);
	spdk_bs_opts_init(&bs_opts);
	bs_opts.iter_cb_fn = iter_cb;
	bs_opts.iter_cb_arg = req;
	spdk_bs_load(dev, &bs_opts, load_cb, req);
}
890 
/*
 * Completion for spdk_bs_unload(): free every remaining file object, drop
 * the global cache reference, complete the user callback, then unregister
 * the io devices (which frees fs).  The request came from the general heap
 * (see spdk_fs_unload), so plain free() is used rather than
 * free_fs_request().
 */
static void
unload_cb(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_filesystem *fs = args->fs;
	struct spdk_file *file, *tmp;

	TAILQ_FOREACH_SAFE(file, &fs->files, tailq, tmp) {
		TAILQ_REMOVE(&fs->files, file, tailq);
		file_free(file);
	}

	free_global_cache();

	args->fn.fs_op(args->arg, bserrno);
	free(req);

	fs_io_device_unregister(fs);
}
911 
/*
 * Tear down a filesystem: release its io channels and unload the
 * blobstore.  cb_fn(cb_arg, fserrno) fires once the unload completes.
 */
void
spdk_fs_unload(struct spdk_filesystem *fs, spdk_fs_op_complete cb_fn, void *cb_arg)
{
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	/*
	 * We must free the md_channel before unloading the blobstore, so just
	 *  allocate this request from the general heap.
	 */
	req = calloc(1, sizeof(*req));
	if (req == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	args = &req->args;
	args->fn.fs_op = cb_fn;
	args->arg = cb_arg;
	args->fs = fs;

	fs_free_io_channels(fs);
	spdk_bs_unload(fs->bs, unload_cb, req);
}
936 
937 static struct spdk_file *
938 fs_find_file(struct spdk_filesystem *fs, const char *name)
939 {
940 	struct spdk_file *file;
941 
942 	TAILQ_FOREACH(file, &fs->files, tailq) {
943 		if (!strncmp(name, file->name, SPDK_FILE_NAME_MAX)) {
944 			return file;
945 		}
946 	}
947 
948 	return NULL;
949 }
950 
951 void
952 spdk_fs_file_stat_async(struct spdk_filesystem *fs, const char *name,
953 			spdk_file_stat_op_complete cb_fn, void *cb_arg)
954 {
955 	struct spdk_file_stat stat;
956 	struct spdk_file *f = NULL;
957 
958 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
959 		cb_fn(cb_arg, NULL, -ENAMETOOLONG);
960 		return;
961 	}
962 
963 	f = fs_find_file(fs, name);
964 	if (f != NULL) {
965 		stat.blobid = f->blobid;
966 		stat.size = f->append_pos >= f->length ? f->append_pos : f->length;
967 		cb_fn(cb_arg, &stat, 0);
968 		return;
969 	}
970 
971 	cb_fn(cb_arg, NULL, -ENOENT);
972 }
973 
974 static void
975 __copy_stat(void *arg, struct spdk_file_stat *stat, int fserrno)
976 {
977 	struct spdk_fs_request *req = arg;
978 	struct spdk_fs_cb_args *args = &req->args;
979 
980 	args->rc = fserrno;
981 	if (fserrno == 0) {
982 		memcpy(args->arg, stat, sizeof(*stat));
983 	}
984 	sem_post(args->sem);
985 }
986 
/* Reactor-side trampoline for spdk_fs_file_stat(): run the async stat with
 * the request itself as the completion argument. */
static void
__file_stat(void *arg)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	spdk_fs_file_stat_async(args->fs, args->op.stat.name,
				args->fn.stat_op, req);
}
996 
/*
 * Synchronous stat: dispatch the async stat via the channel's send_request
 * and block on the channel semaphore until __copy_stat posts it.  Fills
 * *stat on success.  Returns 0, or a negative errno on failure.
 */
int
spdk_fs_file_stat(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx,
		  const char *name, struct spdk_file_stat *stat)
{
	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
	struct spdk_fs_request *req;
	int rc;

	req = alloc_fs_request(channel);
	if (req == NULL) {
		SPDK_ERRLOG("Cannot allocate stat req on file=%s\n", name);
		return -ENOMEM;
	}

	req->args.fs = fs;
	req->args.op.stat.name = name;
	req->args.fn.stat_op = __copy_stat;
	req->args.arg = stat;	/* __copy_stat writes the result here */
	req->args.sem = &channel->sem;
	channel->send_request(__file_stat, req);
	sem_wait(&channel->sem);

	rc = req->args.rc;
	free_fs_request(req);

	return rc;
}
1024 
1025 static void
1026 fs_create_blob_close_cb(void *ctx, int bserrno)
1027 {
1028 	int rc;
1029 	struct spdk_fs_request *req = ctx;
1030 	struct spdk_fs_cb_args *args = &req->args;
1031 
1032 	rc = args->rc ? args->rc : bserrno;
1033 	args->fn.file_op(args->arg, rc);
1034 	free_fs_request(req);
1035 }
1036 
1037 static void
1038 fs_create_blob_resize_cb(void *ctx, int bserrno)
1039 {
1040 	struct spdk_fs_request *req = ctx;
1041 	struct spdk_fs_cb_args *args = &req->args;
1042 	struct spdk_file *f = args->file;
1043 	struct spdk_blob *blob = args->op.create.blob;
1044 	uint64_t length = 0;
1045 
1046 	args->rc = bserrno;
1047 	if (bserrno) {
1048 		spdk_blob_close(blob, fs_create_blob_close_cb, args);
1049 		return;
1050 	}
1051 
1052 	spdk_blob_set_xattr(blob, "name", f->name, strlen(f->name) + 1);
1053 	spdk_blob_set_xattr(blob, "length", &length, sizeof(length));
1054 
1055 	spdk_blob_close(blob, fs_create_blob_close_cb, args);
1056 }
1057 
1058 static void
1059 fs_create_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno)
1060 {
1061 	struct spdk_fs_request *req = ctx;
1062 	struct spdk_fs_cb_args *args = &req->args;
1063 
1064 	if (bserrno) {
1065 		args->fn.file_op(args->arg, bserrno);
1066 		free_fs_request(req);
1067 		return;
1068 	}
1069 
1070 	args->op.create.blob = blob;
1071 	spdk_blob_resize(blob, 1, fs_create_blob_resize_cb, req);
1072 }
1073 
1074 static void
1075 fs_create_blob_create_cb(void *ctx, spdk_blob_id blobid, int bserrno)
1076 {
1077 	struct spdk_fs_request *req = ctx;
1078 	struct spdk_fs_cb_args *args = &req->args;
1079 	struct spdk_file *f = args->file;
1080 
1081 	if (bserrno) {
1082 		args->fn.file_op(args->arg, bserrno);
1083 		free_fs_request(req);
1084 		return;
1085 	}
1086 
1087 	f->blobid = blobid;
1088 	spdk_bs_open_blob(f->fs->bs, blobid, fs_create_blob_open_cb, req);
1089 }
1090 
1091 void
1092 spdk_fs_create_file_async(struct spdk_filesystem *fs, const char *name,
1093 			  spdk_file_op_complete cb_fn, void *cb_arg)
1094 {
1095 	struct spdk_file *file;
1096 	struct spdk_fs_request *req;
1097 	struct spdk_fs_cb_args *args;
1098 
1099 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
1100 		cb_fn(cb_arg, -ENAMETOOLONG);
1101 		return;
1102 	}
1103 
1104 	file = fs_find_file(fs, name);
1105 	if (file != NULL) {
1106 		cb_fn(cb_arg, -EEXIST);
1107 		return;
1108 	}
1109 
1110 	file = file_alloc(fs);
1111 	if (file == NULL) {
1112 		SPDK_ERRLOG("Cannot allocate new file for creation\n");
1113 		cb_fn(cb_arg, -ENOMEM);
1114 		return;
1115 	}
1116 
1117 	req = alloc_fs_request(fs->md_target.md_fs_channel);
1118 	if (req == NULL) {
1119 		SPDK_ERRLOG("Cannot allocate create async req for file=%s\n", name);
1120 		cb_fn(cb_arg, -ENOMEM);
1121 		return;
1122 	}
1123 
1124 	args = &req->args;
1125 	args->file = file;
1126 	args->fn.file_op = cb_fn;
1127 	args->arg = cb_arg;
1128 
1129 	file->name = strdup(name);
1130 	_file_build_trace_arg_name(file);
1131 	spdk_bs_create_blob(fs->bs, fs_create_blob_create_cb, args);
1132 }
1133 
1134 static void
1135 __fs_create_file_done(void *arg, int fserrno)
1136 {
1137 	struct spdk_fs_request *req = arg;
1138 	struct spdk_fs_cb_args *args = &req->args;
1139 
1140 	__wake_caller(args, fserrno);
1141 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.create.name);
1142 }
1143 
1144 static void
1145 __fs_create_file(void *arg)
1146 {
1147 	struct spdk_fs_request *req = arg;
1148 	struct spdk_fs_cb_args *args = &req->args;
1149 
1150 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.create.name);
1151 	spdk_fs_create_file_async(args->fs, args->op.create.name, __fs_create_file_done, req);
1152 }
1153 
1154 int
1155 spdk_fs_create_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx, const char *name)
1156 {
1157 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
1158 	struct spdk_fs_request *req;
1159 	struct spdk_fs_cb_args *args;
1160 	int rc;
1161 
1162 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name);
1163 
1164 	req = alloc_fs_request(channel);
1165 	if (req == NULL) {
1166 		SPDK_ERRLOG("Cannot allocate req to create file=%s\n", name);
1167 		return -ENOMEM;
1168 	}
1169 
1170 	args = &req->args;
1171 	args->fs = fs;
1172 	args->op.create.name = name;
1173 	args->sem = &channel->sem;
1174 	fs->send_request(__fs_create_file, req);
1175 	sem_wait(&channel->sem);
1176 	rc = args->rc;
1177 	free_fs_request(req);
1178 
1179 	return rc;
1180 }
1181 
/*
 * Completion for the spdk_bs_open_blob() issued by the first opener of a
 * file.  Attaches the blob to the file, then completes every open request
 * queued on the file: opens that arrived while the blob open was in flight
 * were parked on f->open_requests by fs_open_blob_create_cb().
 */
static void
fs_open_blob_done(void *ctx, struct spdk_blob *blob, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *f = args->file;

	f->blob = blob;
	/* req/args are deliberately rebound inside the loop - the request
	 * that triggered this callback is itself on the open_requests list. */
	while (!TAILQ_EMPTY(&f->open_requests)) {
		req = TAILQ_FIRST(&f->open_requests);
		args = &req->args;
		TAILQ_REMOVE(&f->open_requests, req, args.op.open.tailq);
		spdk_trace_record(TRACE_BLOBFS_OPEN, 0, 0, 0, f->trace_arg_name);
		args->fn.file_op_with_handle(args->arg, f, bserrno);
		free_fs_request(req);
	}
}
1199 
/*
 * Common open path, reached either directly (the file already exists) or as
 * the completion of spdk_fs_create_file_async() when the open used the
 * CREATE flag.  Queues the request on the file and starts the blob open if
 * this is the first reference.
 *
 * NOTE(review): bserrno is ignored; if the preceding create failed, the
 * fs_find_file() lookup below would fail the assert - confirm the create
 * path cannot reach here with an error.
 */
static void
fs_open_blob_create_cb(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *file = args->file;
	struct spdk_filesystem *fs = args->fs;

	if (file == NULL) {
		/*
		 * This is from an open with CREATE flag - the file
		 *  is now created so look it up in the file list for this
		 *  filesystem.
		 */
		file = fs_find_file(fs, args->op.open.name);
		assert(file != NULL);
		args->file = file;
	}

	file->ref_count++;
	/* Every open request waits on this list; fs_open_blob_done() drains
	 * it once the blob handle is available. */
	TAILQ_INSERT_TAIL(&file->open_requests, req, args.op.open.tailq);
	if (file->ref_count == 1) {
		assert(file->blob == NULL);
		spdk_bs_open_blob(fs->bs, file->blobid, fs_open_blob_done, req);
	} else if (file->blob != NULL) {
		fs_open_blob_done(req, file->blob, 0);
	} else {
		/*
		 * The blob open for this file is in progress due to a previous
		 *  open request.  When that open completes, it will invoke the
		 *  open callback for this request.
		 */
	}
}
1234 
/*
 * Asynchronously open a file, creating it first when
 * SPDK_BLOBFS_OPEN_CREATE is set in flags.  cb_fn receives the opened
 * spdk_file handle, or NULL plus a negative errno on failure.
 */
void
spdk_fs_open_file_async(struct spdk_filesystem *fs, const char *name, uint32_t flags,
			spdk_file_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_file *f = NULL;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
		cb_fn(cb_arg, NULL, -ENAMETOOLONG);
		return;
	}

	f = fs_find_file(fs, name);
	if (f == NULL && !(flags & SPDK_BLOBFS_OPEN_CREATE)) {
		cb_fn(cb_arg, NULL, -ENOENT);
		return;
	}

	/* A file marked deleted still has open references; it cannot be
	 * re-opened until the last reference drops and the blob is removed. */
	if (f != NULL && f->is_deleted == true) {
		cb_fn(cb_arg, NULL, -ENOENT);
		return;
	}

	req = alloc_fs_request(fs->md_target.md_fs_channel);
	if (req == NULL) {
		SPDK_ERRLOG("Cannot allocate async open req for file=%s\n", name);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	args = &req->args;
	args->fn.file_op_with_handle = cb_fn;
	args->arg = cb_arg;
	args->file = f;
	args->fs = fs;
	args->op.open.name = name;

	/* Missing file with CREATE flag: create it first - the create
	 * completion funnels into the same open path. */
	if (f == NULL) {
		spdk_fs_create_file_async(fs, name, fs_open_blob_create_cb, req);
	} else {
		fs_open_blob_create_cb(req, 0);
	}
}
1279 
1280 static void
1281 __fs_open_file_done(void *arg, struct spdk_file *file, int bserrno)
1282 {
1283 	struct spdk_fs_request *req = arg;
1284 	struct spdk_fs_cb_args *args = &req->args;
1285 
1286 	args->file = file;
1287 	__wake_caller(args, bserrno);
1288 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.open.name);
1289 }
1290 
1291 static void
1292 __fs_open_file(void *arg)
1293 {
1294 	struct spdk_fs_request *req = arg;
1295 	struct spdk_fs_cb_args *args = &req->args;
1296 
1297 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.open.name);
1298 	spdk_fs_open_file_async(args->fs, args->op.open.name, args->op.open.flags,
1299 				__fs_open_file_done, req);
1300 }
1301 
1302 int
1303 spdk_fs_open_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx,
1304 		  const char *name, uint32_t flags, struct spdk_file **file)
1305 {
1306 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
1307 	struct spdk_fs_request *req;
1308 	struct spdk_fs_cb_args *args;
1309 	int rc;
1310 
1311 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name);
1312 
1313 	req = alloc_fs_request(channel);
1314 	if (req == NULL) {
1315 		SPDK_ERRLOG("Cannot allocate req for opening file=%s\n", name);
1316 		return -ENOMEM;
1317 	}
1318 
1319 	args = &req->args;
1320 	args->fs = fs;
1321 	args->op.open.name = name;
1322 	args->op.open.flags = flags;
1323 	args->sem = &channel->sem;
1324 	fs->send_request(__fs_open_file, req);
1325 	sem_wait(&channel->sem);
1326 	rc = args->rc;
1327 	if (rc == 0) {
1328 		*file = args->file;
1329 	} else {
1330 		*file = NULL;
1331 	}
1332 	free_fs_request(req);
1333 
1334 	return rc;
1335 }
1336 
1337 static void
1338 fs_rename_blob_close_cb(void *ctx, int bserrno)
1339 {
1340 	struct spdk_fs_request *req = ctx;
1341 	struct spdk_fs_cb_args *args = &req->args;
1342 
1343 	args->fn.fs_op(args->arg, bserrno);
1344 	free_fs_request(req);
1345 }
1346 
1347 static void
1348 fs_rename_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno)
1349 {
1350 	struct spdk_fs_request *req = ctx;
1351 	struct spdk_fs_cb_args *args = &req->args;
1352 	const char *new_name = args->op.rename.new_name;
1353 
1354 	spdk_blob_set_xattr(blob, "name", new_name, strlen(new_name) + 1);
1355 	spdk_blob_close(blob, fs_rename_blob_close_cb, req);
1356 }
1357 
1358 static void
1359 _fs_md_rename_file(struct spdk_fs_request *req)
1360 {
1361 	struct spdk_fs_cb_args *args = &req->args;
1362 	struct spdk_file *f;
1363 
1364 	f = fs_find_file(args->fs, args->op.rename.old_name);
1365 	if (f == NULL) {
1366 		args->fn.fs_op(args->arg, -ENOENT);
1367 		free_fs_request(req);
1368 		return;
1369 	}
1370 
1371 	free(f->name);
1372 	f->name = strdup(args->op.rename.new_name);
1373 	_file_build_trace_arg_name(f);
1374 	args->file = f;
1375 	spdk_bs_open_blob(args->fs->bs, f->blobid, fs_rename_blob_open_cb, req);
1376 }
1377 
/* The pre-existing target file has been deleted; now do the rename itself.
 * NOTE(review): fserrno from the delete is ignored - confirm this
 * best-effort overwrite behavior is intended. */
static void
fs_rename_delete_done(void *arg, int fserrno)
{
	struct spdk_fs_request *req = arg;

	_fs_md_rename_file(req);
}
1383 
/*
 * Asynchronously rename old_name to new_name.  If a file named new_name
 * already exists it is deleted first (overwrite semantics); the in-memory
 * name and the blob's "name" xattr are then updated.
 */
void
spdk_fs_rename_file_async(struct spdk_filesystem *fs,
			  const char *old_name, const char *new_name,
			  spdk_file_op_complete cb_fn, void *cb_arg)
{
	struct spdk_file *f;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "old=%s new=%s\n", old_name, new_name);
	/* Only the new name can introduce an over-long name. */
	if (strnlen(new_name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
		cb_fn(cb_arg, -ENAMETOOLONG);
		return;
	}

	req = alloc_fs_request(fs->md_target.md_fs_channel);
	if (req == NULL) {
		SPDK_ERRLOG("Cannot allocate rename async req for renaming file from %s to %s\n", old_name,
			    new_name);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	args = &req->args;
	args->fn.fs_op = cb_fn;
	args->fs = fs;
	args->arg = cb_arg;
	args->op.rename.old_name = old_name;
	args->op.rename.new_name = new_name;

	f = fs_find_file(fs, new_name);
	if (f == NULL) {
		_fs_md_rename_file(req);
		return;
	}

	/*
	 * The rename overwrites an existing file.  So delete the existing file, then
	 *  do the actual rename.
	 */
	spdk_fs_delete_file_async(fs, new_name, fs_rename_delete_done, req);
}
1426 
1427 static void
1428 __fs_rename_file_done(void *arg, int fserrno)
1429 {
1430 	struct spdk_fs_request *req = arg;
1431 	struct spdk_fs_cb_args *args = &req->args;
1432 
1433 	__wake_caller(args, fserrno);
1434 }
1435 
1436 static void
1437 __fs_rename_file(void *arg)
1438 {
1439 	struct spdk_fs_request *req = arg;
1440 	struct spdk_fs_cb_args *args = &req->args;
1441 
1442 	spdk_fs_rename_file_async(args->fs, args->op.rename.old_name, args->op.rename.new_name,
1443 				  __fs_rename_file_done, req);
1444 }
1445 
1446 int
1447 spdk_fs_rename_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx,
1448 		    const char *old_name, const char *new_name)
1449 {
1450 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
1451 	struct spdk_fs_request *req;
1452 	struct spdk_fs_cb_args *args;
1453 	int rc;
1454 
1455 	req = alloc_fs_request(channel);
1456 	if (req == NULL) {
1457 		SPDK_ERRLOG("Cannot allocate rename req for file=%s\n", old_name);
1458 		return -ENOMEM;
1459 	}
1460 
1461 	args = &req->args;
1462 
1463 	args->fs = fs;
1464 	args->op.rename.old_name = old_name;
1465 	args->op.rename.new_name = new_name;
1466 	args->sem = &channel->sem;
1467 	fs->send_request(__fs_rename_file, req);
1468 	sem_wait(&channel->sem);
1469 	rc = args->rc;
1470 	free_fs_request(req);
1471 	return rc;
1472 }
1473 
1474 static void
1475 blob_delete_cb(void *ctx, int bserrno)
1476 {
1477 	struct spdk_fs_request *req = ctx;
1478 	struct spdk_fs_cb_args *args = &req->args;
1479 
1480 	args->fn.file_op(args->arg, bserrno);
1481 	free_fs_request(req);
1482 }
1483 
1484 void
1485 spdk_fs_delete_file_async(struct spdk_filesystem *fs, const char *name,
1486 			  spdk_file_op_complete cb_fn, void *cb_arg)
1487 {
1488 	struct spdk_file *f;
1489 	spdk_blob_id blobid;
1490 	struct spdk_fs_request *req;
1491 	struct spdk_fs_cb_args *args;
1492 
1493 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name);
1494 
1495 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
1496 		cb_fn(cb_arg, -ENAMETOOLONG);
1497 		return;
1498 	}
1499 
1500 	f = fs_find_file(fs, name);
1501 	if (f == NULL) {
1502 		SPDK_ERRLOG("Cannot find the file=%s to deleted\n", name);
1503 		cb_fn(cb_arg, -ENOENT);
1504 		return;
1505 	}
1506 
1507 	req = alloc_fs_request(fs->md_target.md_fs_channel);
1508 	if (req == NULL) {
1509 		SPDK_ERRLOG("Cannot allocate the req for the file=%s to deleted\n", name);
1510 		cb_fn(cb_arg, -ENOMEM);
1511 		return;
1512 	}
1513 
1514 	args = &req->args;
1515 	args->fn.file_op = cb_fn;
1516 	args->arg = cb_arg;
1517 
1518 	if (f->ref_count > 0) {
1519 		/* If the ref > 0, we mark the file as deleted and delete it when we close it. */
1520 		f->is_deleted = true;
1521 		spdk_blob_set_xattr(f->blob, "is_deleted", &f->is_deleted, sizeof(bool));
1522 		spdk_blob_sync_md(f->blob, blob_delete_cb, req);
1523 		return;
1524 	}
1525 
1526 	blobid = f->blobid;
1527 	TAILQ_REMOVE(&fs->files, f, tailq);
1528 
1529 	file_free(f);
1530 
1531 	spdk_bs_delete_blob(fs->bs, blobid, blob_delete_cb, req);
1532 }
1533 
/* Pack up to the first 8 bytes of the name into an integer (native byte
 * order) for use as a trace argument. */
static uint64_t
fs_name_to_uint64(const char *name)
{
	uint64_t result = 0;
	size_t len = strlen(name);

	if (len > sizeof(result)) {
		len = sizeof(result);
	}
	memcpy(&result, name, len);

	return result;
}
1541 
1542 static void
1543 __fs_delete_file_done(void *arg, int fserrno)
1544 {
1545 	struct spdk_fs_request *req = arg;
1546 	struct spdk_fs_cb_args *args = &req->args;
1547 
1548 	spdk_trace_record(TRACE_BLOBFS_DELETE_DONE, 0, 0, 0, fs_name_to_uint64(args->op.delete.name));
1549 	__wake_caller(args, fserrno);
1550 }
1551 
1552 static void
1553 __fs_delete_file(void *arg)
1554 {
1555 	struct spdk_fs_request *req = arg;
1556 	struct spdk_fs_cb_args *args = &req->args;
1557 
1558 	spdk_trace_record(TRACE_BLOBFS_DELETE_START, 0, 0, 0, fs_name_to_uint64(args->op.delete.name));
1559 	spdk_fs_delete_file_async(args->fs, args->op.delete.name, __fs_delete_file_done, req);
1560 }
1561 
1562 int
1563 spdk_fs_delete_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx,
1564 		    const char *name)
1565 {
1566 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
1567 	struct spdk_fs_request *req;
1568 	struct spdk_fs_cb_args *args;
1569 	int rc;
1570 
1571 	req = alloc_fs_request(channel);
1572 	if (req == NULL) {
1573 		SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "Cannot allocate req to delete file=%s\n", name);
1574 		return -ENOMEM;
1575 	}
1576 
1577 	args = &req->args;
1578 	args->fs = fs;
1579 	args->op.delete.name = name;
1580 	args->sem = &channel->sem;
1581 	fs->send_request(__fs_delete_file, req);
1582 	sem_wait(&channel->sem);
1583 	rc = args->rc;
1584 	free_fs_request(req);
1585 
1586 	return rc;
1587 }
1588 
1589 spdk_fs_iter
1590 spdk_fs_iter_first(struct spdk_filesystem *fs)
1591 {
1592 	struct spdk_file *f;
1593 
1594 	f = TAILQ_FIRST(&fs->files);
1595 	return f;
1596 }
1597 
1598 spdk_fs_iter
1599 spdk_fs_iter_next(spdk_fs_iter iter)
1600 {
1601 	struct spdk_file *f = iter;
1602 
1603 	if (f == NULL) {
1604 		return NULL;
1605 	}
1606 
1607 	f = TAILQ_NEXT(f, tailq);
1608 	return f;
1609 }
1610 
/* Return the file's name.  The string is owned by the file object and is
 * valid for the file's lifetime; callers must not free it. */
const char *
spdk_file_get_name(struct spdk_file *file)
{
	return file->name;
}
1616 
1617 uint64_t
1618 spdk_file_get_length(struct spdk_file *file)
1619 {
1620 	uint64_t length;
1621 
1622 	assert(file != NULL);
1623 
1624 	length = file->append_pos >= file->length ? file->append_pos : file->length;
1625 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s length=0x%jx\n", file->name, length);
1626 	return length;
1627 }
1628 
1629 static void
1630 fs_truncate_complete_cb(void *ctx, int bserrno)
1631 {
1632 	struct spdk_fs_request *req = ctx;
1633 	struct spdk_fs_cb_args *args = &req->args;
1634 
1635 	args->fn.file_op(args->arg, bserrno);
1636 	free_fs_request(req);
1637 }
1638 
1639 static void
1640 fs_truncate_resize_cb(void *ctx, int bserrno)
1641 {
1642 	struct spdk_fs_request *req = ctx;
1643 	struct spdk_fs_cb_args *args = &req->args;
1644 	struct spdk_file *file = args->file;
1645 	uint64_t *length = &args->op.truncate.length;
1646 
1647 	if (bserrno) {
1648 		args->fn.file_op(args->arg, bserrno);
1649 		free_fs_request(req);
1650 		return;
1651 	}
1652 
1653 	spdk_blob_set_xattr(file->blob, "length", length, sizeof(*length));
1654 
1655 	file->length = *length;
1656 	if (file->append_pos > file->length) {
1657 		file->append_pos = file->length;
1658 	}
1659 
1660 	spdk_blob_sync_md(file->blob, fs_truncate_complete_cb, req);
1661 }
1662 
/* Number of whole clusters needed to cover `length` bytes (ceiling
 * division).  Written as div+mod so that lengths near UINT64_MAX cannot
 * overflow the way (length + cluster_sz - 1) would. */
static uint64_t
__bytes_to_clusters(uint64_t length, uint64_t cluster_sz)
{
	return length / cluster_sz + (length % cluster_sz != 0);
}
1668 
/*
 * Asynchronously truncate (grow or shrink) a file to `length` bytes:
 * resize the blob to the covering cluster count, persist the new length
 * xattr, and sync metadata.  Completes immediately with 0 if the length
 * is unchanged.
 */
void
spdk_file_truncate_async(struct spdk_file *file, uint64_t length,
			 spdk_file_op_complete cb_fn, void *cb_arg)
{
	struct spdk_filesystem *fs;
	size_t num_clusters;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s old=0x%jx new=0x%jx\n", file->name, file->length, length);
	if (length == file->length) {
		cb_fn(cb_arg, 0);
		return;
	}

	req = alloc_fs_request(file->fs->md_target.md_fs_channel);
	if (req == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	args = &req->args;
	args->fn.file_op = cb_fn;
	args->arg = cb_arg;
	args->file = file;
	args->op.truncate.length = length;
	fs = file->fs;

	num_clusters = __bytes_to_clusters(length, fs->bs_opts.cluster_sz);

	spdk_blob_resize(file->blob, num_clusters, fs_truncate_resize_cb, req);
}
1701 
/*
 * Runs on the metadata thread for spdk_file_truncate().  The completion is
 * args->fn.file_op (__wake_caller) invoked with args itself, so this outer
 * request is intentionally not freed here - the synchronous caller frees
 * it after being woken.
 */
static void
__truncate(void *arg)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	spdk_file_truncate_async(args->file, args->op.truncate.length,
				 args->fn.file_op, args);
}
1711 
1712 int
1713 spdk_file_truncate(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx,
1714 		   uint64_t length)
1715 {
1716 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
1717 	struct spdk_fs_request *req;
1718 	struct spdk_fs_cb_args *args;
1719 	int rc;
1720 
1721 	req = alloc_fs_request(channel);
1722 	if (req == NULL) {
1723 		return -ENOMEM;
1724 	}
1725 
1726 	args = &req->args;
1727 
1728 	args->file = file;
1729 	args->op.truncate.length = length;
1730 	args->fn.file_op = __wake_caller;
1731 	args->sem = &channel->sem;
1732 
1733 	channel->send_request(__truncate, req);
1734 	sem_wait(&channel->sem);
1735 	rc = args->rc;
1736 	free_fs_request(req);
1737 
1738 	return rc;
1739 }
1740 
1741 static void
1742 __rw_done(void *ctx, int bserrno)
1743 {
1744 	struct spdk_fs_request *req = ctx;
1745 	struct spdk_fs_cb_args *args = &req->args;
1746 
1747 	spdk_free(args->op.rw.pin_buf);
1748 	args->fn.file_op(args->arg, bserrno);
1749 	free_fs_request(req);
1750 }
1751 
/* Gather the iovecs into the contiguous buffer, writing at most buf_len
 * bytes in total.  Uses a byte pointer for the cursor: arithmetic on
 * void * is a compiler extension, not standard C. */
static void
_copy_iovs_to_buf(void *buf, size_t buf_len, struct iovec *iovs, int iovcnt)
{
	uint8_t *dst = buf;
	size_t len;
	int i;

	for (i = 0; i < iovcnt; i++) {
		len = (iovs[i].iov_len < buf_len) ? iovs[i].iov_len : buf_len;
		memcpy(dst, iovs[i].iov_base, len);
		dst += len;
		buf_len -= len;
	}
}
1766 
/* Scatter the contiguous buffer into the iovecs, reading at most buf_len
 * bytes in total.  Uses a byte pointer for the cursor: arithmetic on
 * void * is a compiler extension, not standard C. */
static void
_copy_buf_to_iovs(struct iovec *iovs, int iovcnt, void *buf, size_t buf_len)
{
	uint8_t *src = buf;
	size_t len;
	int i;

	for (i = 0; i < iovcnt; i++) {
		len = (iovs[i].iov_len < buf_len) ? iovs[i].iov_len : buf_len;
		memcpy(iovs[i].iov_base, src, len);
		src += len;
		buf_len -= len;
	}
}
1781 
1782 static void
1783 __read_done(void *ctx, int bserrno)
1784 {
1785 	struct spdk_fs_request *req = ctx;
1786 	struct spdk_fs_cb_args *args = &req->args;
1787 	void *buf;
1788 
1789 	assert(req != NULL);
1790 	buf = (void *)((uintptr_t)args->op.rw.pin_buf + (args->op.rw.offset & (args->op.rw.blocklen - 1)));
1791 	if (args->op.rw.is_read) {
1792 		_copy_buf_to_iovs(args->iovs, args->iovcnt, buf, args->op.rw.length);
1793 		__rw_done(req, 0);
1794 	} else {
1795 		_copy_iovs_to_buf(buf, args->op.rw.length, args->iovs, args->iovcnt);
1796 		spdk_blob_io_write(args->file->blob, args->op.rw.channel,
1797 				   args->op.rw.pin_buf,
1798 				   args->op.rw.start_lba, args->op.rw.num_lba,
1799 				   __rw_done, req);
1800 	}
1801 }
1802 
1803 static void
1804 __do_blob_read(void *ctx, int fserrno)
1805 {
1806 	struct spdk_fs_request *req = ctx;
1807 	struct spdk_fs_cb_args *args = &req->args;
1808 
1809 	if (fserrno) {
1810 		__rw_done(req, fserrno);
1811 		return;
1812 	}
1813 	spdk_blob_io_read(args->file->blob, args->op.rw.channel,
1814 			  args->op.rw.pin_buf,
1815 			  args->op.rw.start_lba, args->op.rw.num_lba,
1816 			  __read_done, req);
1817 }
1818 
1819 static void
1820 __get_page_parameters(struct spdk_file *file, uint64_t offset, uint64_t length,
1821 		      uint64_t *start_lba, uint32_t *lba_size, uint64_t *num_lba)
1822 {
1823 	uint64_t end_lba;
1824 
1825 	*lba_size = spdk_bs_get_io_unit_size(file->fs->bs);
1826 	*start_lba = offset / *lba_size;
1827 	end_lba = (offset + length - 1) / *lba_size;
1828 	*num_lba = (end_lba - *start_lba + 1);
1829 }
1830 
1831 static bool
1832 __is_lba_aligned(struct spdk_file *file, uint64_t offset, uint64_t length)
1833 {
1834 	uint32_t lba_size = spdk_bs_get_io_unit_size(file->fs->bs);
1835 
1836 	if ((offset % lba_size == 0) && (length % lba_size == 0)) {
1837 		return true;
1838 	}
1839 
1840 	return false;
1841 }
1842 
1843 static void
1844 _fs_request_setup_iovs(struct spdk_fs_request *req, struct iovec *iovs, uint32_t iovcnt)
1845 {
1846 	uint32_t i;
1847 
1848 	for (i = 0; i < iovcnt; i++) {
1849 		req->args.iovs[i].iov_base = iovs[i].iov_base;
1850 		req->args.iovs[i].iov_len = iovs[i].iov_len;
1851 	}
1852 }
1853 
/*
 * Common implementation for all async read/write entry points.  The
 * caller's byte range is widened to io-unit boundaries and staged through
 * a DMA-able bounce buffer (pin_buf):
 *   - reads: blob read into pin_buf, then scatter into the caller's iovecs;
 *   - aligned writes: gather iovecs into pin_buf and write directly;
 *   - unaligned writes: read-modify-write via __do_blob_read()/__read_done();
 *   - writes past EOF: grow the file first, then fall into the RMW path.
 */
static void
__readvwritev(struct spdk_file *file, struct spdk_io_channel *_channel,
	      struct iovec *iovs, uint32_t iovcnt, uint64_t offset, uint64_t length,
	      spdk_file_op_complete cb_fn, void *cb_arg, int is_read)
{
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;
	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
	uint64_t start_lba, num_lba, pin_buf_length;
	uint32_t lba_size;

	/* Reads must stay within the current file length. */
	if (is_read && offset + length > file->length) {
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	req = alloc_fs_request_with_iov(channel, iovcnt);
	if (req == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	__get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba);

	args = &req->args;
	args->fn.file_op = cb_fn;
	args->arg = cb_arg;
	args->file = file;
	args->op.rw.channel = channel->bs_channel;
	_fs_request_setup_iovs(req, iovs, iovcnt);
	args->op.rw.is_read = is_read;
	args->op.rw.offset = offset;
	args->op.rw.blocklen = lba_size;

	/* The bounce buffer covers the full aligned LBA range. */
	pin_buf_length = num_lba * lba_size;
	args->op.rw.length = pin_buf_length;
	args->op.rw.pin_buf = spdk_malloc(pin_buf_length, lba_size, NULL,
					  SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
	if (args->op.rw.pin_buf == NULL) {
		SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "Failed to allocate buf for: file=%s offset=%jx length=%jx\n",
			      file->name, offset, length);
		free_fs_request(req);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	args->op.rw.start_lba = start_lba;
	args->op.rw.num_lba = num_lba;

	if (!is_read && file->length < offset + length) {
		/* Writing past EOF: extend the file, then do the RMW read. */
		spdk_file_truncate_async(file, offset + length, __do_blob_read, req);
	} else if (!is_read && __is_lba_aligned(file, offset, length)) {
		/* Fully aligned write: no read-modify-write needed. */
		_copy_iovs_to_buf(args->op.rw.pin_buf, args->op.rw.length, args->iovs, args->iovcnt);
		spdk_blob_io_write(args->file->blob, args->op.rw.channel,
				   args->op.rw.pin_buf,
				   args->op.rw.start_lba, args->op.rw.num_lba,
				   __rw_done, req);
	} else {
		__do_blob_read(req, 0);
	}
}
1915 
1916 static void
1917 __readwrite(struct spdk_file *file, struct spdk_io_channel *channel,
1918 	    void *payload, uint64_t offset, uint64_t length,
1919 	    spdk_file_op_complete cb_fn, void *cb_arg, int is_read)
1920 {
1921 	struct iovec iov;
1922 
1923 	iov.iov_base = payload;
1924 	iov.iov_len = (size_t)length;
1925 
1926 	__readvwritev(file, channel, &iov, 1, offset, length, cb_fn, cb_arg, is_read);
1927 }
1928 
/* Asynchronously write `length` bytes from payload at `offset`; cb_fn is
 * invoked with 0 or a negative errno when the write completes. */
void
spdk_file_write_async(struct spdk_file *file, struct spdk_io_channel *channel,
		      void *payload, uint64_t offset, uint64_t length,
		      spdk_file_op_complete cb_fn, void *cb_arg)
{
	__readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 0);
}
1936 
/* Vectored variant of spdk_file_write_async(): write the iovecs' contents
 * at `offset`; cb_fn is invoked with 0 or a negative errno on completion. */
void
spdk_file_writev_async(struct spdk_file *file, struct spdk_io_channel *channel,
		       struct iovec *iovs, uint32_t iovcnt, uint64_t offset, uint64_t length,
		       spdk_file_op_complete cb_fn, void *cb_arg)
{
	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s offset=%jx length=%jx\n",
		      file->name, offset, length);

	__readvwritev(file, channel, iovs, iovcnt, offset, length, cb_fn, cb_arg, 0);
}
1947 
/* Asynchronously read `length` bytes at `offset` into payload; the range
 * must lie within the file (fails with -EINVAL otherwise). */
void
spdk_file_read_async(struct spdk_file *file, struct spdk_io_channel *channel,
		     void *payload, uint64_t offset, uint64_t length,
		     spdk_file_op_complete cb_fn, void *cb_arg)
{
	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s offset=%jx length=%jx\n",
		      file->name, offset, length);
	__readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 1);
}
1957 
/* Vectored variant of spdk_file_read_async(): read `length` bytes at
 * `offset` into the iovecs; the range must lie within the file. */
void
spdk_file_readv_async(struct spdk_file *file, struct spdk_io_channel *channel,
		      struct iovec *iovs, uint32_t iovcnt, uint64_t offset, uint64_t length,
		      spdk_file_op_complete cb_fn, void *cb_arg)
{
	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s offset=%jx length=%jx\n",
		      file->name, offset, length);

	__readvwritev(file, channel, iovs, iovcnt, offset, length, cb_fn, cb_arg, 1);
}
1968 
/*
 * Allocate an I/O channel for async file I/O on the calling SPDK thread.
 * Requests submitted on this channel run directly on the caller's thread
 * (__send_request_direct) rather than being forwarded.
 */
struct spdk_io_channel *
spdk_fs_alloc_io_channel(struct spdk_filesystem *fs)
{
	struct spdk_io_channel *io_channel;
	struct spdk_fs_channel *fs_channel;

	io_channel = spdk_get_io_channel(&fs->io_target);
	fs_channel = spdk_io_channel_get_ctx(io_channel);
	/* NOTE(review): neither spdk_get_io_channel() nor
	 * spdk_bs_alloc_io_channel() results are NULL-checked here -
	 * confirm callers can tolerate that. */
	fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
	fs_channel->send_request = __send_request_direct;

	return io_channel;
}
1982 
/* Release an I/O channel obtained from spdk_fs_alloc_io_channel(). */
void
spdk_fs_free_io_channel(struct spdk_io_channel *channel)
{
	spdk_put_io_channel(channel);
}
1988 
1989 struct spdk_fs_thread_ctx *
1990 spdk_fs_alloc_thread_ctx(struct spdk_filesystem *fs)
1991 {
1992 	struct spdk_fs_thread_ctx *ctx;
1993 
1994 	ctx = calloc(1, sizeof(*ctx));
1995 	if (!ctx) {
1996 		return NULL;
1997 	}
1998 
1999 	fs_channel_create(fs, &ctx->ch, 512);
2000 
2001 	ctx->ch.send_request = fs->send_request;
2002 	ctx->ch.sync = 1;
2003 	pthread_spin_init(&ctx->ch.lock, 0);
2004 
2005 	return ctx;
2006 }
2007 
2008 
2009 void
2010 spdk_fs_free_thread_ctx(struct spdk_fs_thread_ctx *ctx)
2011 {
2012 	assert(ctx->ch.sync == 1);
2013 
2014 	while (true) {
2015 		pthread_spin_lock(&ctx->ch.lock);
2016 		if (ctx->ch.outstanding_reqs == 0) {
2017 			pthread_spin_unlock(&ctx->ch.lock);
2018 			break;
2019 		}
2020 		pthread_spin_unlock(&ctx->ch.lock);
2021 		usleep(1000);
2022 	}
2023 
2024 	fs_channel_destroy(NULL, &ctx->ch);
2025 	free(ctx);
2026 }
2027 
2028 int
2029 spdk_fs_set_cache_size(uint64_t size_in_mb)
2030 {
2031 	/* setting g_fs_cache_size is only permitted if cache pool
2032 	 * is already freed or hasn't been initialized
2033 	 */
2034 	if (g_cache_pool != NULL) {
2035 		return -EPERM;
2036 	}
2037 
2038 	g_fs_cache_size = size_in_mb * 1024 * 1024;
2039 
2040 	return 0;
2041 }
2042 
2043 uint64_t
2044 spdk_fs_get_cache_size(void)
2045 {
2046 	return g_fs_cache_size / (1024 * 1024);
2047 }
2048 
2049 static void __file_flush(void *ctx);
2050 
/* Try to free some cache buffers from this file.
 *
 * Returns 0 if buffers were freed, -1 if the file lock was contended or
 * the file holds no cached buffers.
 */
static int
reclaim_cache_buffers(struct spdk_file *file)
{
	int rc;

	BLOBFS_TRACE(file, "free=%s\n", file->name);

	/* The function is safe to be called with any threads, while the file
	 * lock maybe locked by other thread for now, so try to get the file
	 * lock here.
	 */
	rc = pthread_spin_trylock(&file->lock);
	if (rc != 0) {
		return -1;
	}

	/* Nothing cached for this file. */
	if (file->tree->present_mask == 0) {
		pthread_spin_unlock(&file->lock);
		return -1;
	}
	tree_free_buffers(file->tree);

	TAILQ_REMOVE(&g_caches, file, cache_tailq);
	/* If not freed, put it in the end of the queue */
	if (file->tree->present_mask != 0) {
		TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq);
	} else {
		file->last = NULL;
	}
	pthread_spin_unlock(&file->lock);

	return 0;
}
2086 
2087 static int
2088 _blobfs_cache_pool_reclaim(void *arg)
2089 {
2090 	struct spdk_file *file, *tmp;
2091 	int rc;
2092 
2093 	if (!blobfs_cache_pool_need_reclaim()) {
2094 		return SPDK_POLLER_IDLE;
2095 	}
2096 
2097 	TAILQ_FOREACH_SAFE(file, &g_caches, cache_tailq, tmp) {
2098 		if (!file->open_for_writing &&
2099 		    file->priority == SPDK_FILE_PRIORITY_LOW) {
2100 			rc = reclaim_cache_buffers(file);
2101 			if (rc < 0) {
2102 				continue;
2103 			}
2104 			if (!blobfs_cache_pool_need_reclaim()) {
2105 				return SPDK_POLLER_BUSY;
2106 			}
2107 			break;
2108 		}
2109 	}
2110 
2111 	TAILQ_FOREACH_SAFE(file, &g_caches, cache_tailq, tmp) {
2112 		if (!file->open_for_writing) {
2113 			rc = reclaim_cache_buffers(file);
2114 			if (rc < 0) {
2115 				continue;
2116 			}
2117 			if (!blobfs_cache_pool_need_reclaim()) {
2118 				return SPDK_POLLER_BUSY;
2119 			}
2120 			break;
2121 		}
2122 	}
2123 
2124 	TAILQ_FOREACH_SAFE(file, &g_caches, cache_tailq, tmp) {
2125 		rc = reclaim_cache_buffers(file);
2126 		if (rc < 0) {
2127 			continue;
2128 		}
2129 		break;
2130 	}
2131 
2132 	return SPDK_POLLER_BUSY;
2133 }
2134 
/* Runs on the cache pool thread: register a file with the reclaim queue
 * (sent via spdk_thread_send_msg when its first buffer is inserted). */
static void
_add_file_to_cache_pool(void *ctx)
{
	struct spdk_file *file = ctx;

	TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq);
}
2142 
/* Runs on the cache pool thread: unregister a file from the reclaim queue. */
static void
_remove_file_from_cache_pool(void *ctx)
{
	struct spdk_file *file = ctx;

	TAILQ_REMOVE(&g_caches, file, cache_tailq);
}
2150 
2151 static struct cache_buffer *
2152 cache_insert_buffer(struct spdk_file *file, uint64_t offset)
2153 {
2154 	struct cache_buffer *buf;
2155 	int count = 0;
2156 	bool need_update = false;
2157 
2158 	buf = calloc(1, sizeof(*buf));
2159 	if (buf == NULL) {
2160 		SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "calloc failed\n");
2161 		return NULL;
2162 	}
2163 
2164 	do {
2165 		buf->buf = spdk_mempool_get(g_cache_pool);
2166 		if (buf->buf) {
2167 			break;
2168 		}
2169 		if (count++ == 100) {
2170 			SPDK_ERRLOG("Could not allocate cache buffer for file=%p on offset=%jx\n",
2171 				    file, offset);
2172 			free(buf);
2173 			return NULL;
2174 		}
2175 		usleep(BLOBFS_CACHE_POOL_POLL_PERIOD_IN_US);
2176 	} while (true);
2177 
2178 	buf->buf_size = CACHE_BUFFER_SIZE;
2179 	buf->offset = offset;
2180 
2181 	if (file->tree->present_mask == 0) {
2182 		need_update = true;
2183 	}
2184 	file->tree = tree_insert_buffer(file->tree, buf);
2185 
2186 	if (need_update) {
2187 		spdk_thread_send_msg(g_cache_pool_thread, _add_file_to_cache_pool, file);
2188 	}
2189 
2190 	return buf;
2191 }
2192 
2193 static struct cache_buffer *
2194 cache_append_buffer(struct spdk_file *file)
2195 {
2196 	struct cache_buffer *last;
2197 
2198 	assert(file->last == NULL || file->last->bytes_filled == file->last->buf_size);
2199 	assert((file->append_pos % CACHE_BUFFER_SIZE) == 0);
2200 
2201 	last = cache_insert_buffer(file, file->append_pos);
2202 	if (last == NULL) {
2203 		SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "cache_insert_buffer failed\n");
2204 		return NULL;
2205 	}
2206 
2207 	file->last = last;
2208 
2209 	return last;
2210 }
2211 
2212 static void __check_sync_reqs(struct spdk_file *file);
2213 
/*
 * Completion callback for the spdk_blob_sync_md() issued by
 * __check_sync_reqs().  Records the length that was persisted in the
 * "length" xattr, retires the sync request, invokes its completion
 * callback, and then checks whether another queued sync request can now
 * be serviced.
 *
 * NOTE(review): length_xattr is updated and the request retired even when
 * bserrno != 0 (the error is only forwarded to the callback) - confirm
 * this is intended.
 */
static void
__file_cache_finish_sync(void *ctx, int bserrno)
{
	struct spdk_file *file;
	struct spdk_fs_request *sync_req = ctx;
	struct spdk_fs_cb_args *sync_args;

	sync_args = &sync_req->args;
	file = sync_args->file;
	pthread_spin_lock(&file->lock);
	file->length_xattr = sync_args->op.sync.length;
	assert(sync_args->op.sync.offset <= file->length_flushed);
	spdk_trace_record(TRACE_BLOBFS_XATTR_END, 0, sync_args->op.sync.offset,
			  0, file->trace_arg_name);
	BLOBFS_TRACE(file, "sync done offset=%jx\n", sync_args->op.sync.offset);
	TAILQ_REMOVE(&file->sync_requests, sync_req, args.op.sync.tailq);
	pthread_spin_unlock(&file->lock);

	/* Invoke the user's completion callback outside file->lock. */
	sync_args->fn.file_op(sync_args->arg, bserrno);

	free_fs_request(sync_req);
	/* See if another queued sync request is now serviceable. */
	__check_sync_reqs(file);
}
2237 
/*
 * Find the first queued sync request whose target offset has already been
 * flushed to the blob and, if no xattr update is currently in flight for
 * it, persist file->length_flushed in the blob's "length" xattr and sync
 * the metadata.  __file_cache_finish_sync() retires the request and calls
 * back into here for the next one.
 */
static void
__check_sync_reqs(struct spdk_file *file)
{
	struct spdk_fs_request *sync_req;

	pthread_spin_lock(&file->lock);

	/* If no queued request is covered by length_flushed yet, the loop
	 * terminates with sync_req == NULL.
	 */
	TAILQ_FOREACH(sync_req, &file->sync_requests, args.op.sync.tailq) {
		if (sync_req->args.op.sync.offset <= file->length_flushed) {
			break;
		}
	}

	if (sync_req != NULL && !sync_req->args.op.sync.xattr_in_progress) {
		BLOBFS_TRACE(file, "set xattr length 0x%jx\n", file->length_flushed);
		/* Mark this request busy so a concurrent call does not issue a
		 * second metadata sync for it.
		 */
		sync_req->args.op.sync.xattr_in_progress = true;
		sync_req->args.op.sync.length = file->length_flushed;
		spdk_blob_set_xattr(file->blob, "length", &file->length_flushed,
				    sizeof(file->length_flushed));

		/* Drop the lock before issuing the async metadata sync;
		 * completion runs in __file_cache_finish_sync().
		 */
		pthread_spin_unlock(&file->lock);
		spdk_trace_record(TRACE_BLOBFS_XATTR_START, 0, file->length_flushed,
				  0, file->trace_arg_name);
		spdk_blob_sync_md(file->blob, __file_cache_finish_sync, sync_req);
	} else {
		pthread_spin_unlock(&file->lock);
	}
}
2266 
/*
 * Completion callback for the blob write issued by __file_flush().
 * Advances flush accounting on both the cache buffer and the file, then
 * re-enters __file_flush() to continue draining (the request is freed
 * there once no further flush is started).
 *
 * NOTE(review): bserrno is not checked - flush accounting advances even
 * if the write failed.  Confirm this is intended.
 */
static void
__file_flush_done(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *file = args->file;
	struct cache_buffer *next = args->op.flush.cache_buffer;

	BLOBFS_TRACE(file, "length=%jx\n", args->op.flush.length);

	pthread_spin_lock(&file->lock);
	next->in_progress = false;
	next->bytes_flushed += args->op.flush.length;
	file->length_flushed += args->op.flush.length;
	if (file->length_flushed > file->length) {
		file->length = file->length_flushed;
	}
	if (next->bytes_flushed == next->buf_size) {
		/* This buffer is fully flushed; re-point `next` at the buffer
		 * covering the new flush position (may be NULL) for the
		 * assertion below.
		 */
		BLOBFS_TRACE(file, "write buffer fully flushed 0x%jx\n", file->length_flushed);
		next = tree_find_buffer(file->tree, file->length_flushed);
	}

	/*
	 * Assert that there is no cached data that extends past the end of the underlying
	 *  blob.
	 */
	assert(next == NULL || next->offset < __file_get_blob_size(file) ||
	       next->bytes_filled == 0);

	pthread_spin_unlock(&file->lock);

	/* Newly flushed data may satisfy pending sync requests. */
	__check_sync_reqs(file);

	/* Continue flushing; __file_flush() takes ownership of req. */
	__file_flush(req);
}
2302 
/*
 * Flush the next unflushed cache buffer of the file to the blob, if any.
 * Takes ownership of req: it is either freed here (when there is nothing
 * to flush) or handed to __file_flush_done(), which calls back into this
 * function.
 */
static void
__file_flush(void *ctx)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *file = args->file;
	struct cache_buffer *next;
	uint64_t offset, length, start_lba, num_lba;
	uint32_t lba_size;

	pthread_spin_lock(&file->lock);
	next = tree_find_buffer(file->tree, file->length_flushed);
	if (next == NULL || next->in_progress ||
	    ((next->bytes_filled < next->buf_size) && TAILQ_EMPTY(&file->sync_requests))) {
		/*
		 * There is either no data to flush, a flush I/O is already in
		 *  progress, or the next buffer is partially filled but there's no
		 *  outstanding request to sync it.
		 * So return immediately - if a flush I/O is in progress we will flush
		 *  more data after that is completed, or a partial buffer will get flushed
		 *  when it is either filled or the file is synced.
		 */
		free_fs_request(req);
		if (next == NULL) {
			/*
			 * For cases where a file's cache was evicted, and then the
			 *  file was later appended, we will write the data directly
			 *  to disk and bypass cache.  So just update length_flushed
			 *  here to reflect that all data was already written to disk.
			 */
			file->length_flushed = file->append_pos;
		}
		pthread_spin_unlock(&file->lock);
		if (next == NULL) {
			/*
			 * There is no data to flush, but we still need to check for any
			 *  outstanding sync requests to make sure metadata gets updated.
			 */
			__check_sync_reqs(file);
		}
		return;
	}

	/* Flush only the portion of the buffer that has been filled but not
	 * yet written out.
	 */
	offset = next->offset + next->bytes_flushed;
	length = next->bytes_filled - next->bytes_flushed;
	if (length == 0) {
		free_fs_request(req);
		pthread_spin_unlock(&file->lock);
		/*
		 * There is no data to flush, but we still need to check for any
		 *  outstanding sync requests to make sure metadata gets updated.
		 */
		__check_sync_reqs(file);
		return;
	}
	args->op.flush.length = length;
	args->op.flush.cache_buffer = next;

	__get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba);

	next->in_progress = true;
	BLOBFS_TRACE(file, "offset=0x%jx length=0x%jx page start=0x%jx num=0x%jx\n",
		     offset, length, start_lba, num_lba);
	pthread_spin_unlock(&file->lock);
	/* (start_lba * lba_size) - next->offset converts the LBA-aligned
	 * start position back into an offset within this buffer.
	 */
	spdk_blob_io_write(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel,
			   next->buf + (start_lba * lba_size) - next->offset,
			   start_lba, num_lba, __file_flush_done, req);
}
2371 
/* Final step of a blob extend: the metadata sync has completed, so wake
 * the caller blocked on the semaphore with its status.
 */
static void
__file_extend_done(void *arg, int bserrno)
{
	__wake_caller(arg, bserrno);
}
2379 
2380 static void
2381 __file_extend_resize_cb(void *_args, int bserrno)
2382 {
2383 	struct spdk_fs_cb_args *args = _args;
2384 	struct spdk_file *file = args->file;
2385 
2386 	if (bserrno) {
2387 		__wake_caller(args, bserrno);
2388 		return;
2389 	}
2390 
2391 	spdk_blob_sync_md(file->blob, __file_extend_done, args);
2392 }
2393 
2394 static void
2395 __file_extend_blob(void *_args)
2396 {
2397 	struct spdk_fs_cb_args *args = _args;
2398 	struct spdk_file *file = args->file;
2399 
2400 	spdk_blob_resize(file->blob, args->op.resize.num_clusters, __file_extend_resize_cb, args);
2401 }
2402 
2403 static void
2404 __rw_from_file_done(void *ctx, int bserrno)
2405 {
2406 	struct spdk_fs_request *req = ctx;
2407 
2408 	__wake_caller(&req->args, bserrno);
2409 	free_fs_request(req);
2410 }
2411 
2412 static void
2413 __rw_from_file(void *ctx)
2414 {
2415 	struct spdk_fs_request *req = ctx;
2416 	struct spdk_fs_cb_args *args = &req->args;
2417 	struct spdk_file *file = args->file;
2418 
2419 	if (args->op.rw.is_read) {
2420 		spdk_file_read_async(file, file->fs->sync_target.sync_io_channel, args->iovs[0].iov_base,
2421 				     args->op.rw.offset, (uint64_t)args->iovs[0].iov_len,
2422 				     __rw_from_file_done, req);
2423 	} else {
2424 		spdk_file_write_async(file, file->fs->sync_target.sync_io_channel, args->iovs[0].iov_base,
2425 				      args->op.rw.offset, (uint64_t)args->iovs[0].iov_len,
2426 				      __rw_from_file_done, req);
2427 	}
2428 }
2429 
/*
 * Dispatch a synchronous read or write of [offset, offset + length) to
 * the fs thread via __rw_from_file().
 *
 * Semaphore contract: channel->sem is posted exactly once per call -
 * either by __rw_from_file_done() on completion, or directly below on
 * allocation failure - so every call must be matched by exactly one
 * sem_wait() on the calling thread, even when this function returns an
 * error.
 *
 * Returns 0 on successful submission, -ENOMEM otherwise.
 */
static int
__send_rw_from_file(struct spdk_file *file, void *payload,
		    uint64_t offset, uint64_t length, bool is_read,
		    struct spdk_fs_channel *channel)
{
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	req = alloc_fs_request_with_iov(channel, 1);
	if (req == NULL) {
		/* Post so a caller already committed to sem_wait() does not
		 * block forever.
		 */
		sem_post(&channel->sem);
		return -ENOMEM;
	}

	args = &req->args;
	args->file = file;
	args->sem = &channel->sem;
	args->iovs[0].iov_base = payload;
	args->iovs[0].iov_len = (size_t)length;
	args->op.rw.offset = offset;
	args->op.rw.is_read = is_read;
	file->fs->send_request(__rw_from_file, req);
	return 0;
}
2454 
2455 int
2456 spdk_file_write(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx,
2457 		void *payload, uint64_t offset, uint64_t length)
2458 {
2459 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
2460 	struct spdk_fs_request *flush_req;
2461 	uint64_t rem_length, copy, blob_size, cluster_sz;
2462 	uint32_t cache_buffers_filled = 0;
2463 	uint8_t *cur_payload;
2464 	struct cache_buffer *last;
2465 
2466 	BLOBFS_TRACE_RW(file, "offset=%jx length=%jx\n", offset, length);
2467 
2468 	if (length == 0) {
2469 		return 0;
2470 	}
2471 
2472 	if (offset != file->append_pos) {
2473 		BLOBFS_TRACE(file, " error offset=%jx append_pos=%jx\n", offset, file->append_pos);
2474 		return -EINVAL;
2475 	}
2476 
2477 	pthread_spin_lock(&file->lock);
2478 	file->open_for_writing = true;
2479 
2480 	if ((file->last == NULL) && (file->append_pos % CACHE_BUFFER_SIZE == 0)) {
2481 		cache_append_buffer(file);
2482 	}
2483 
2484 	if (file->last == NULL) {
2485 		int rc;
2486 
2487 		file->append_pos += length;
2488 		pthread_spin_unlock(&file->lock);
2489 		rc = __send_rw_from_file(file, payload, offset, length, false, channel);
2490 		sem_wait(&channel->sem);
2491 		return rc;
2492 	}
2493 
2494 	blob_size = __file_get_blob_size(file);
2495 
2496 	if ((offset + length) > blob_size) {
2497 		struct spdk_fs_cb_args extend_args = {};
2498 
2499 		cluster_sz = file->fs->bs_opts.cluster_sz;
2500 		extend_args.sem = &channel->sem;
2501 		extend_args.op.resize.num_clusters = __bytes_to_clusters((offset + length), cluster_sz);
2502 		extend_args.file = file;
2503 		BLOBFS_TRACE(file, "start resize to %u clusters\n", extend_args.op.resize.num_clusters);
2504 		pthread_spin_unlock(&file->lock);
2505 		file->fs->send_request(__file_extend_blob, &extend_args);
2506 		sem_wait(&channel->sem);
2507 		if (extend_args.rc) {
2508 			return extend_args.rc;
2509 		}
2510 	}
2511 
2512 	flush_req = alloc_fs_request(channel);
2513 	if (flush_req == NULL) {
2514 		pthread_spin_unlock(&file->lock);
2515 		return -ENOMEM;
2516 	}
2517 
2518 	last = file->last;
2519 	rem_length = length;
2520 	cur_payload = payload;
2521 	while (rem_length > 0) {
2522 		copy = last->buf_size - last->bytes_filled;
2523 		if (copy > rem_length) {
2524 			copy = rem_length;
2525 		}
2526 		BLOBFS_TRACE_RW(file, "  fill offset=%jx length=%jx\n", file->append_pos, copy);
2527 		memcpy(&last->buf[last->bytes_filled], cur_payload, copy);
2528 		file->append_pos += copy;
2529 		if (file->length < file->append_pos) {
2530 			file->length = file->append_pos;
2531 		}
2532 		cur_payload += copy;
2533 		last->bytes_filled += copy;
2534 		rem_length -= copy;
2535 		if (last->bytes_filled == last->buf_size) {
2536 			cache_buffers_filled++;
2537 			last = cache_append_buffer(file);
2538 			if (last == NULL) {
2539 				BLOBFS_TRACE(file, "nomem\n");
2540 				free_fs_request(flush_req);
2541 				pthread_spin_unlock(&file->lock);
2542 				return -ENOMEM;
2543 			}
2544 		}
2545 	}
2546 
2547 	pthread_spin_unlock(&file->lock);
2548 
2549 	if (cache_buffers_filled == 0) {
2550 		free_fs_request(flush_req);
2551 		return 0;
2552 	}
2553 
2554 	flush_req->args.file = file;
2555 	file->fs->send_request(__file_flush, flush_req);
2556 	return 0;
2557 }
2558 
/*
 * Completion callback for the readahead blob read issued by __readahead().
 * Marks the cache buffer as populated; bytes_flushed is set equal to
 * bytes_filled because the data was read from disk and therefore already
 * matches the blob contents.
 *
 * NOTE(review): bserrno is ignored - on a failed read the buffer is still
 * marked as filled.  Confirm whether this is intended.
 */
static void
__readahead_done(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct cache_buffer *cache_buffer = args->op.readahead.cache_buffer;
	struct spdk_file *file = args->file;

	BLOBFS_TRACE(file, "offset=%jx\n", cache_buffer->offset);

	pthread_spin_lock(&file->lock);
	cache_buffer->bytes_filled = args->op.readahead.length;
	cache_buffer->bytes_flushed = args->op.readahead.length;
	cache_buffer->in_progress = false;
	pthread_spin_unlock(&file->lock);

	free_fs_request(req);
}
2577 
2578 static void
2579 __readahead(void *ctx)
2580 {
2581 	struct spdk_fs_request *req = ctx;
2582 	struct spdk_fs_cb_args *args = &req->args;
2583 	struct spdk_file *file = args->file;
2584 	uint64_t offset, length, start_lba, num_lba;
2585 	uint32_t lba_size;
2586 
2587 	offset = args->op.readahead.offset;
2588 	length = args->op.readahead.length;
2589 	assert(length > 0);
2590 
2591 	__get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba);
2592 
2593 	BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n",
2594 		     offset, length, start_lba, num_lba);
2595 	spdk_blob_io_read(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel,
2596 			  args->op.readahead.cache_buffer->buf,
2597 			  start_lba, num_lba, __readahead_done, req);
2598 }
2599 
2600 static uint64_t
2601 __next_cache_buffer_offset(uint64_t offset)
2602 {
2603 	return (offset + CACHE_BUFFER_SIZE) & ~(CACHE_TREE_LEVEL_MASK(0));
2604 }
2605 
2606 static void
2607 check_readahead(struct spdk_file *file, uint64_t offset,
2608 		struct spdk_fs_channel *channel)
2609 {
2610 	struct spdk_fs_request *req;
2611 	struct spdk_fs_cb_args *args;
2612 
2613 	offset = __next_cache_buffer_offset(offset);
2614 	if (tree_find_buffer(file->tree, offset) != NULL || file->length <= offset) {
2615 		return;
2616 	}
2617 
2618 	req = alloc_fs_request(channel);
2619 	if (req == NULL) {
2620 		return;
2621 	}
2622 	args = &req->args;
2623 
2624 	BLOBFS_TRACE(file, "offset=%jx\n", offset);
2625 
2626 	args->file = file;
2627 	args->op.readahead.offset = offset;
2628 	args->op.readahead.cache_buffer = cache_insert_buffer(file, offset);
2629 	if (!args->op.readahead.cache_buffer) {
2630 		BLOBFS_TRACE(file, "Cannot allocate buf for offset=%jx\n", offset);
2631 		free_fs_request(req);
2632 		return;
2633 	}
2634 
2635 	args->op.readahead.cache_buffer->in_progress = true;
2636 	if (file->length < (offset + CACHE_BUFFER_SIZE)) {
2637 		args->op.readahead.length = file->length & (CACHE_BUFFER_SIZE - 1);
2638 	} else {
2639 		args->op.readahead.length = CACHE_BUFFER_SIZE;
2640 	}
2641 	file->fs->send_request(__readahead, req);
2642 }
2643 
2644 int64_t
2645 spdk_file_read(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx,
2646 	       void *payload, uint64_t offset, uint64_t length)
2647 {
2648 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
2649 	uint64_t final_offset, final_length;
2650 	uint32_t sub_reads = 0;
2651 	struct cache_buffer *buf;
2652 	uint64_t read_len;
2653 	int rc = 0;
2654 
2655 	pthread_spin_lock(&file->lock);
2656 
2657 	BLOBFS_TRACE_RW(file, "offset=%ju length=%ju\n", offset, length);
2658 
2659 	file->open_for_writing = false;
2660 
2661 	if (length == 0 || offset >= file->append_pos) {
2662 		pthread_spin_unlock(&file->lock);
2663 		return 0;
2664 	}
2665 
2666 	if (offset + length > file->append_pos) {
2667 		length = file->append_pos - offset;
2668 	}
2669 
2670 	if (offset != file->next_seq_offset) {
2671 		file->seq_byte_count = 0;
2672 	}
2673 	file->seq_byte_count += length;
2674 	file->next_seq_offset = offset + length;
2675 	if (file->seq_byte_count >= CACHE_READAHEAD_THRESHOLD) {
2676 		check_readahead(file, offset, channel);
2677 		check_readahead(file, offset + CACHE_BUFFER_SIZE, channel);
2678 	}
2679 
2680 	final_length = 0;
2681 	final_offset = offset + length;
2682 	while (offset < final_offset) {
2683 		length = NEXT_CACHE_BUFFER_OFFSET(offset) - offset;
2684 		if (length > (final_offset - offset)) {
2685 			length = final_offset - offset;
2686 		}
2687 
2688 		buf = tree_find_filled_buffer(file->tree, offset);
2689 		if (buf == NULL) {
2690 			pthread_spin_unlock(&file->lock);
2691 			rc = __send_rw_from_file(file, payload, offset, length, true, channel);
2692 			pthread_spin_lock(&file->lock);
2693 			if (rc == 0) {
2694 				sub_reads++;
2695 			}
2696 		} else {
2697 			read_len = length;
2698 			if ((offset + length) > (buf->offset + buf->bytes_filled)) {
2699 				read_len = buf->offset + buf->bytes_filled - offset;
2700 			}
2701 			BLOBFS_TRACE(file, "read %p offset=%ju length=%ju\n", payload, offset, read_len);
2702 			memcpy(payload, &buf->buf[offset - buf->offset], read_len);
2703 			if ((offset + read_len) % CACHE_BUFFER_SIZE == 0) {
2704 				tree_remove_buffer(file->tree, buf);
2705 				if (file->tree->present_mask == 0) {
2706 					spdk_thread_send_msg(g_cache_pool_thread, _remove_file_from_cache_pool, file);
2707 				}
2708 			}
2709 		}
2710 
2711 		if (rc == 0) {
2712 			final_length += length;
2713 		} else {
2714 			break;
2715 		}
2716 		payload += length;
2717 		offset += length;
2718 	}
2719 	pthread_spin_unlock(&file->lock);
2720 	while (sub_reads > 0) {
2721 		sem_wait(&channel->sem);
2722 		sub_reads--;
2723 	}
2724 	if (rc == 0) {
2725 		return final_length;
2726 	} else {
2727 		return rc;
2728 	}
2729 }
2730 
/*
 * Common implementation behind spdk_file_sync() and spdk_file_sync_async().
 * Queues a sync request targeting the current append position and kicks a
 * flush; once all data up to that offset has been flushed, the "length"
 * xattr is persisted and cb_fn is invoked (see __check_sync_reqs() /
 * __file_cache_finish_sync()).
 */
static void
_file_sync(struct spdk_file *file, struct spdk_fs_channel *channel,
	   spdk_file_op_complete cb_fn, void *cb_arg)
{
	struct spdk_fs_request *sync_req;
	struct spdk_fs_request *flush_req;
	struct spdk_fs_cb_args *sync_args;
	struct spdk_fs_cb_args *flush_args;

	BLOBFS_TRACE(file, "offset=%jx\n", file->append_pos);

	pthread_spin_lock(&file->lock);
	/* Fast path: everything up to append_pos is already recorded in the
	 * "length" xattr, so there is nothing to sync.
	 */
	if (file->append_pos <= file->length_xattr) {
		BLOBFS_TRACE(file, "done - file already synced\n");
		pthread_spin_unlock(&file->lock);
		cb_fn(cb_arg, 0);
		return;
	}

	sync_req = alloc_fs_request(channel);
	if (!sync_req) {
		SPDK_ERRLOG("Cannot allocate sync req for file=%s\n", file->name);
		pthread_spin_unlock(&file->lock);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}
	sync_args = &sync_req->args;

	flush_req = alloc_fs_request(channel);
	if (!flush_req) {
		SPDK_ERRLOG("Cannot allocate flush req for file=%s\n", file->name);
		free_fs_request(sync_req);
		pthread_spin_unlock(&file->lock);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}
	flush_args = &flush_req->args;

	/* Queue the sync request; it completes via __file_cache_finish_sync()
	 * once length_flushed reaches the recorded offset.
	 */
	sync_args->file = file;
	sync_args->fn.file_op = cb_fn;
	sync_args->arg = cb_arg;
	sync_args->op.sync.offset = file->append_pos;
	sync_args->op.sync.xattr_in_progress = false;
	TAILQ_INSERT_TAIL(&file->sync_requests, sync_req, args.op.sync.tailq);
	pthread_spin_unlock(&file->lock);

	/* Kick a flush on the fs thread to drive the sync request forward. */
	flush_args->file = file;
	channel->send_request(__file_flush, flush_req);
}
2780 
2781 int
2782 spdk_file_sync(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx)
2783 {
2784 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
2785 	struct spdk_fs_cb_args args = {};
2786 
2787 	args.sem = &channel->sem;
2788 	_file_sync(file, channel, __wake_caller, &args);
2789 	sem_wait(&channel->sem);
2790 
2791 	return args.rc;
2792 }
2793 
2794 void
2795 spdk_file_sync_async(struct spdk_file *file, struct spdk_io_channel *_channel,
2796 		     spdk_file_op_complete cb_fn, void *cb_arg)
2797 {
2798 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
2799 
2800 	_file_sync(file, channel, cb_fn, cb_arg);
2801 }
2802 
2803 void
2804 spdk_file_set_priority(struct spdk_file *file, uint32_t priority)
2805 {
2806 	BLOBFS_TRACE(file, "priority=%u\n", priority);
2807 	file->priority = priority;
2808 
2809 }
2810 
2811 /*
2812  * Close routines
2813  */
2814 
/*
 * Final stage of an async close (completion of spdk_blob_close(), or the
 * direct error path from __file_close_async()).  If the file was marked
 * deleted while still open, perform the deferred delete now; in that case
 * the request is handed to blob_delete_cb (defined elsewhere in this
 * file), which presumably completes and frees it - confirm ownership
 * there.
 */
static void
__file_close_async_done(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *file = args->file;

	spdk_trace_record(TRACE_BLOBFS_CLOSE, 0, 0, 0, file->trace_arg_name);

	if (file->is_deleted) {
		spdk_fs_delete_file_async(file->fs, file->name, blob_delete_cb, ctx);
		return;
	}

	args->fn.file_op(args->arg, bserrno);
	free_fs_request(req);
}
2832 
/*
 * Drop one reference on the file.  Closing a file that is not open fails
 * with -EBADF.  Only the final reference actually closes the underlying
 * blob; intermediate releases complete immediately.
 */
static void
__file_close_async(struct spdk_file *file, struct spdk_fs_request *req)
{
	struct spdk_blob *blob;

	pthread_spin_lock(&file->lock);
	if (file->ref_count == 0) {
		pthread_spin_unlock(&file->lock);
		__file_close_async_done(req, -EBADF);
		return;
	}

	file->ref_count--;
	if (file->ref_count > 0) {
		/* Other openers remain - nothing more to do. */
		pthread_spin_unlock(&file->lock);
		req->args.fn.file_op(req->args.arg, 0);
		free_fs_request(req);
		return;
	}

	pthread_spin_unlock(&file->lock);

	/* Last reference: detach the blob from the file before closing it. */
	blob = file->blob;
	file->blob = NULL;
	spdk_blob_close(blob, __file_close_async_done, req);
}
2859 
2860 static void
2861 __file_close_async__sync_done(void *arg, int fserrno)
2862 {
2863 	struct spdk_fs_request *req = arg;
2864 	struct spdk_fs_cb_args *args = &req->args;
2865 
2866 	__file_close_async(args->file, req);
2867 }
2868 
2869 void
2870 spdk_file_close_async(struct spdk_file *file, spdk_file_op_complete cb_fn, void *cb_arg)
2871 {
2872 	struct spdk_fs_request *req;
2873 	struct spdk_fs_cb_args *args;
2874 
2875 	req = alloc_fs_request(file->fs->md_target.md_fs_channel);
2876 	if (req == NULL) {
2877 		SPDK_ERRLOG("Cannot allocate close async req for file=%s\n", file->name);
2878 		cb_fn(cb_arg, -ENOMEM);
2879 		return;
2880 	}
2881 
2882 	args = &req->args;
2883 	args->file = file;
2884 	args->fn.file_op = cb_fn;
2885 	args->arg = cb_arg;
2886 
2887 	spdk_file_sync_async(file, file->fs->md_target.md_io_channel, __file_close_async__sync_done, req);
2888 }
2889 
2890 static void
2891 __file_close(void *arg)
2892 {
2893 	struct spdk_fs_request *req = arg;
2894 	struct spdk_fs_cb_args *args = &req->args;
2895 	struct spdk_file *file = args->file;
2896 
2897 	__file_close_async(file, req);
2898 }
2899 
/*
 * Synchronously close a file: sync outstanding data first, then release
 * the reference on the fs thread and block until the close completes.
 *
 * Returns 0 on success or a negative errno.
 */
int
spdk_file_close(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx)
{
	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	req = alloc_fs_request(channel);
	if (req == NULL) {
		SPDK_ERRLOG("Cannot allocate close req for file=%s\n", file->name);
		return -ENOMEM;
	}

	args = &req->args;

	/* NOTE(review): the return value of spdk_file_sync() is ignored - a
	 * failed sync still proceeds to close.  Confirm this is intended.
	 */
	spdk_file_sync(file, ctx);
	BLOBFS_TRACE(file, "name=%s\n", file->name);
	args->file = file;
	args->sem = &channel->sem;
	args->fn.file_op = __wake_caller;
	args->arg = args;
	channel->send_request(__file_close, req);
	sem_wait(&channel->sem);

	return args->rc;
}
2926 
2927 int
2928 spdk_file_get_id(struct spdk_file *file, void *id, size_t size)
2929 {
2930 	if (size < sizeof(spdk_blob_id)) {
2931 		return -EINVAL;
2932 	}
2933 
2934 	memcpy(id, &file->blobid, sizeof(spdk_blob_id));
2935 
2936 	return sizeof(spdk_blob_id);
2937 }
2938 
2939 static void
2940 _file_free(void *ctx)
2941 {
2942 	struct spdk_file *file = ctx;
2943 
2944 	TAILQ_REMOVE(&g_caches, file, cache_tailq);
2945 
2946 	free(file->name);
2947 	free(file->tree);
2948 	free(file);
2949 }
2950 
/*
 * Release a file's cached buffers and free the file object.  If any
 * buffers are present, the final free is deferred to the cache-pool
 * thread via _file_free() so the file can also be unlinked from g_caches
 * on that thread.
 */
static void
file_free(struct spdk_file *file)
{
	BLOBFS_TRACE(file, "free=%s\n", file->name);
	pthread_spin_lock(&file->lock);
	if (file->tree->present_mask == 0) {
		/* No cached buffers remain, so the cache-pool thread holds no
		 * reference to this file - it can be freed directly.
		 */
		pthread_spin_unlock(&file->lock);
		free(file->name);
		free(file->tree);
		free(file);
		return;
	}

	tree_free_buffers(file->tree);
	assert(file->tree->present_mask == 0);
	/* _file_free() removes the file from g_caches and frees it. */
	spdk_thread_send_msg(g_cache_pool_thread, _file_free, file);
	pthread_spin_unlock(&file->lock);
}
2969 
/* Register the blobfs log components with SPDK's logging framework. */
SPDK_LOG_REGISTER_COMPONENT("blobfs", SPDK_LOG_BLOBFS)
SPDK_LOG_REGISTER_COMPONENT("blobfs_rw", SPDK_LOG_BLOBFS_RW)
2972