xref: /spdk/lib/blobfs/blobfs.c (revision 2f5c602574a98ede645991abe279a96e19c50196)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 
36 #include "spdk/blobfs.h"
37 #include "tree.h"
38 
39 #include "spdk/queue.h"
40 #include "spdk/thread.h"
41 #include "spdk/assert.h"
42 #include "spdk/env.h"
43 #include "spdk/util.h"
44 #include "spdk/log.h"
45 #include "spdk/trace.h"
46 
/* Debug-log helpers that prefix every message with the file name. */
#define BLOBFS_TRACE(file, str, args...) \
	SPDK_DEBUGLOG(blobfs, "file=%s " str, file->name, ##args)

#define BLOBFS_TRACE_RW(file, str, args...) \
	SPDK_DEBUGLOG(blobfs_rw, "file=%s " str, file->name, ##args)

/* Default global cache size (4 GiB) and default blobstore cluster size
 * (1 MiB) used when the caller does not override them.
 */
#define BLOBFS_DEFAULT_CACHE_SIZE (4ULL * 1024 * 1024 * 1024)
#define SPDK_BLOBFS_DEFAULT_OPTS_CLUSTER_SZ (1024 * 1024)

/* bstype signature stamped on blobstores owned by blobfs. */
#define SPDK_BLOBFS_SIGNATURE	"BLOBFS"

/* Global cache state shared by every mounted filesystem: one mempool of
 * CACHE_BUFFER_SIZE buffers, managed by a dedicated thread and poller.
 */
static uint64_t g_fs_cache_size = BLOBFS_DEFAULT_CACHE_SIZE;
static struct spdk_mempool *g_cache_pool;
static TAILQ_HEAD(, spdk_file) g_caches = TAILQ_HEAD_INITIALIZER(g_caches);
static struct spdk_poller *g_cache_pool_mgmt_poller;
static struct spdk_thread *g_cache_pool_thread;
#define BLOBFS_CACHE_POOL_POLL_PERIOD_IN_US 1000ULL
/* Number of currently mounted filesystems; guarded by g_cache_init_lock
 * and used to create/destroy the shared cache on first mount/last unmount.
 */
static int g_fs_count = 0;
static pthread_mutex_t g_cache_init_lock = PTHREAD_MUTEX_INITIALIZER;

/* Tracepoint IDs for blobfs (trace group 0x7). */
#define TRACE_GROUP_BLOBFS	0x7
#define TRACE_BLOBFS_XATTR_START	SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x0)
#define TRACE_BLOBFS_XATTR_END		SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x1)
#define TRACE_BLOBFS_OPEN		SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x2)
#define TRACE_BLOBFS_CLOSE		SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x3)
#define TRACE_BLOBFS_DELETE_START	SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x4)
#define TRACE_BLOBFS_DELETE_DONE	SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x5)
74 
/* Register blobfs tracepoint descriptions with the trace framework.
 * Each tracepoint carries a single argument: the file name, truncated
 * to 40 bytes.
 */
SPDK_TRACE_REGISTER_FN(blobfs_trace, "blobfs", TRACE_GROUP_BLOBFS)
{
	struct spdk_trace_tpoint_opts opts[] = {
		{
			"BLOBFS_XATTR_START", TRACE_BLOBFS_XATTR_START,
			OWNER_NONE, OBJECT_NONE, 0,
			{{ "file", SPDK_TRACE_ARG_TYPE_STR, 40 }},
		},
		{
			"BLOBFS_XATTR_END", TRACE_BLOBFS_XATTR_END,
			OWNER_NONE, OBJECT_NONE, 0,
			{{ "file", SPDK_TRACE_ARG_TYPE_STR, 40 }},
		},
		{
			"BLOBFS_OPEN", TRACE_BLOBFS_OPEN,
			OWNER_NONE, OBJECT_NONE, 0,
			{{ "file", SPDK_TRACE_ARG_TYPE_STR, 40 }},
		},
		{
			"BLOBFS_CLOSE", TRACE_BLOBFS_CLOSE,
			OWNER_NONE, OBJECT_NONE, 0,
			{{ "file", SPDK_TRACE_ARG_TYPE_STR, 40 }},
		},
		{
			"BLOBFS_DELETE_START", TRACE_BLOBFS_DELETE_START,
			OWNER_NONE, OBJECT_NONE, 0,
			{{ "file", SPDK_TRACE_ARG_TYPE_STR, 40 }},
		},
		{
			"BLOBFS_DELETE_DONE", TRACE_BLOBFS_DELETE_DONE,
			OWNER_NONE, OBJECT_NONE, 0,
			{{ "file", SPDK_TRACE_ARG_TYPE_STR, 40 }},
		}
	};

	spdk_trace_register_description_ext(opts, SPDK_COUNTOF(opts));
}
112 
/* Release one cache buffer: return its data to the global mempool and
 * free the descriptor itself.
 */
void
cache_buffer_free(struct cache_buffer *cache_buffer)
{
	spdk_mempool_put(g_cache_pool, cache_buffer->buf);
	free(cache_buffer);
}
119 
120 #define CACHE_READAHEAD_THRESHOLD	(128 * 1024)
121 
/* In-memory state of one blobfs file; each file is backed by one blob. */
struct spdk_file {
	struct spdk_filesystem	*fs;
	struct spdk_blob	*blob;
	char			*name;		/* Heap-allocated (strdup). */
	uint64_t		length;		/* Logical file size, including unflushed data. */
	bool                    is_deleted;	/* Delete requested (see "is_deleted" xattr handling). */
	bool			open_for_writing;
	uint64_t		length_flushed;	/* Bytes written through to the blob. */
	uint64_t		length_xattr;	/* Length recorded in the "length" xattr. */
	uint64_t		append_pos;	/* Next append offset. */
	uint64_t		seq_byte_count;	/* Sequential-access byte counter (readahead heuristic). */
	uint64_t		next_seq_offset;
	uint32_t		priority;	/* Cache priority; defaults to SPDK_FILE_PRIORITY_LOW. */
	TAILQ_ENTRY(spdk_file)	tailq;		/* Link on spdk_filesystem.files. */
	spdk_blob_id		blobid;
	uint32_t		ref_count;	/* Open reference count. */
	pthread_spinlock_t	lock;
	struct cache_buffer	*last;
	struct cache_tree	*tree;		/* Cached data buffers for this file. */
	TAILQ_HEAD(open_requests_head, spdk_fs_request) open_requests;
	TAILQ_HEAD(sync_requests_head, spdk_fs_request) sync_requests;
	TAILQ_ENTRY(spdk_file)	cache_tailq;	/* Link on the global g_caches list. */
};
145 
/* Blob ID queued during load for deferred deletion (files that were
 * marked deleted but not fully removed before the last shutdown).
 */
struct spdk_deleted_file {
	spdk_blob_id	id;
	TAILQ_ENTRY(spdk_deleted_file)	tailq;
};

/* A mounted blobfs instance layered on top of one blobstore. */
struct spdk_filesystem {
	struct spdk_blob_store	*bs;
	TAILQ_HEAD(, spdk_file)	files;		/* All known files. */
	struct spdk_bs_opts	bs_opts;
	struct spdk_bs_dev	*bdev;
	fs_send_request_fn	send_request;	/* Routes work onto the fs thread. */

	/* Channel used by the synchronous (blocking) API. */
	struct {
		uint32_t		max_ops;
		struct spdk_io_channel	*sync_io_channel;
		struct spdk_fs_channel	*sync_fs_channel;
	} sync_target;

	/* Channel used for metadata operations. */
	struct {
		uint32_t		max_ops;
		struct spdk_io_channel	*md_io_channel;
		struct spdk_fs_channel	*md_fs_channel;
	} md_target;

	/* io_device from which per-thread I/O channels are created. */
	struct {
		uint32_t		max_ops;
	} io_target;
};
174 
/*
 * Per-request completion context.  Embedded as the FIRST member of
 * struct spdk_fs_request, so a pointer to the args is numerically equal
 * to a pointer to the enclosing request.  The op union holds the
 * operation-specific state for the request.
 */
struct spdk_fs_cb_args {
	/* Completion callback; the valid member depends on the operation. */
	union {
		spdk_fs_op_with_handle_complete		fs_op_with_handle;
		spdk_fs_op_complete			fs_op;
		spdk_file_op_with_handle_complete	file_op_with_handle;
		spdk_file_op_complete			file_op;
		spdk_file_stat_op_complete		stat_op;
	} fn;
	void *arg;			/* Opaque argument passed to fn. */
	sem_t *sem;			/* Posted to wake a blocked synchronous caller. */
	struct spdk_filesystem *fs;
	struct spdk_file *file;
	int rc;				/* Result handed back to the sync caller. */
	int *rwerrno;			/* Records the first read/write error seen. */
	struct iovec *iovs;		/* Points at iov below unless iovcnt > 1 (then heap-allocated). */
	uint32_t iovcnt;
	struct iovec iov;
	/* Operation-specific state; exactly one member is live per request. */
	union {
		struct {
			TAILQ_HEAD(, spdk_deleted_file)	deleted_files;
		} fs_load;
		struct {
			uint64_t	length;
		} truncate;
		struct {
			struct spdk_io_channel	*channel;
			void		*pin_buf;
			int		is_read;
			off_t		offset;
			size_t		length;
			uint64_t	start_lba;
			uint64_t	num_lba;
			uint32_t	blocklen;
		} rw;
		struct {
			const char	*old_name;
			const char	*new_name;
		} rename;
		struct {
			struct cache_buffer	*cache_buffer;
			uint64_t		length;
		} flush;
		struct {
			struct cache_buffer	*cache_buffer;
			uint64_t		length;
			uint64_t		offset;
		} readahead;
		struct {
			/* offset of the file when the sync request was made */
			uint64_t			offset;
			TAILQ_ENTRY(spdk_fs_request)	tailq;
			bool				xattr_in_progress;
			/* length written to the xattr for this file - this should
			 * always be the same as the offset if only one thread is
			 * writing to the file, but could differ if multiple threads
			 * are appending
			 */
			uint64_t			length;
		} sync;
		struct {
			uint32_t			num_clusters;
		} resize;
		struct {
			const char	*name;
			uint32_t	flags;
			TAILQ_ENTRY(spdk_fs_request)	tailq;
		} open;
		struct {
			const char		*name;
			struct spdk_blob	*blob;
		} create;
		struct {
			const char	*name;
		} delete;
		struct {
			const char	*name;
		} stat;
	} op;
};
254 
static void file_free(struct spdk_file *file);
static void fs_io_device_unregister(struct spdk_filesystem *fs);
static void fs_free_io_channels(struct spdk_filesystem *fs);

/* Initialize blobfs options to their defaults (1 MiB cluster size). */
void
spdk_fs_opts_init(struct spdk_blobfs_opts *opts)
{
	opts->cluster_sz = SPDK_BLOBFS_DEFAULT_OPTS_CLUSTER_SZ;
}
264 
265 static int _blobfs_cache_pool_reclaim(void *arg);
266 
267 static bool
268 blobfs_cache_pool_need_reclaim(void)
269 {
270 	size_t count;
271 
272 	count = spdk_mempool_count(g_cache_pool);
273 	/* We define a aggressive policy here as the requirements from db_bench are batched, so start the poller
274 	 *  when the number of available cache buffer is less than 1/5 of total buffers.
275 	 */
276 	if (count > (size_t)g_fs_cache_size / CACHE_BUFFER_SIZE / 5) {
277 		return false;
278 	}
279 
280 	return true;
281 }
282 
/* Runs on the dedicated cache-pool thread: create the global buffer
 * mempool and register the periodic reclaim poller.
 */
static void
__start_cache_pool_mgmt(void *ctx)
{
	assert(g_cache_pool == NULL);

	g_cache_pool = spdk_mempool_create("spdk_fs_cache",
					   g_fs_cache_size / CACHE_BUFFER_SIZE,
					   CACHE_BUFFER_SIZE,
					   SPDK_MEMPOOL_DEFAULT_CACHE_SIZE,
					   SPDK_ENV_SOCKET_ID_ANY);
	if (!g_cache_pool) {
		/* Treated as fatal: blobfs cannot operate without its cache. */
		SPDK_ERRLOG("Create mempool failed, you may "
			    "increase the memory and try again\n");
		assert(false);
	}

	assert(g_cache_pool_mgmt_poller == NULL);
	g_cache_pool_mgmt_poller = SPDK_POLLER_REGISTER(_blobfs_cache_pool_reclaim, NULL,
				   BLOBFS_CACHE_POOL_POLL_PERIOD_IN_US);
}
303 
/* Runs on the cache-pool thread during final teardown: stop the reclaim
 * poller, free the mempool (all buffers must have been returned), and
 * exit the management thread.
 */
static void
__stop_cache_pool_mgmt(void *ctx)
{
	spdk_poller_unregister(&g_cache_pool_mgmt_poller);

	assert(g_cache_pool != NULL);
	/* Every buffer must be back in the pool before teardown. */
	assert(spdk_mempool_count(g_cache_pool) == g_fs_cache_size / CACHE_BUFFER_SIZE);
	spdk_mempool_free(g_cache_pool);
	g_cache_pool = NULL;

	spdk_thread_exit(g_cache_pool_thread);
}
316 
/* Take a reference on the shared cache.  The first mounted filesystem
 * spawns the cache-pool management thread; later mounts just bump the
 * refcount.  Serialized by g_cache_init_lock.
 */
static void
initialize_global_cache(void)
{
	pthread_mutex_lock(&g_cache_init_lock);
	if (g_fs_count == 0) {
		g_cache_pool_thread = spdk_thread_create("cache_pool_mgmt", NULL);
		assert(g_cache_pool_thread != NULL);
		spdk_thread_send_msg(g_cache_pool_thread, __start_cache_pool_mgmt, NULL);
	}
	g_fs_count++;
	pthread_mutex_unlock(&g_cache_init_lock);
}
329 
/* Drop a reference on the shared cache; the last unmount asks the
 * management thread to tear the cache down.
 */
static void
free_global_cache(void)
{
	pthread_mutex_lock(&g_cache_init_lock);
	g_fs_count--;
	if (g_fs_count == 0) {
		spdk_thread_send_msg(g_cache_pool_thread, __stop_cache_pool_mgmt, NULL);
	}
	pthread_mutex_unlock(&g_cache_init_lock);
}
340 
341 static uint64_t
342 __file_get_blob_size(struct spdk_file *file)
343 {
344 	uint64_t cluster_sz;
345 
346 	cluster_sz = file->fs->bs_opts.cluster_sz;
347 	return cluster_sz * spdk_blob_get_num_clusters(file->blob);
348 }
349 
/* One queued blobfs operation.  args must remain the first member:
 * some completion paths pass &req->args where a request pointer is
 * expected and rely on the two addresses being identical.
 */
struct spdk_fs_request {
	struct spdk_fs_cb_args		args;
	TAILQ_ENTRY(spdk_fs_request)	link;	/* Link on the channel free list. */
	struct spdk_fs_channel		*channel;
};

/* Per-channel request pool.  "sync" channels may be shared across
 * threads, so their free list is protected by the spinlock.
 */
struct spdk_fs_channel {
	struct spdk_fs_request		*req_mem;	/* Backing array of max_ops requests. */
	TAILQ_HEAD(, spdk_fs_request)	reqs;		/* Free list. */
	sem_t				sem;
	struct spdk_filesystem		*fs;
	struct spdk_io_channel		*bs_channel;
	fs_send_request_fn		send_request;
	bool				sync;		/* True: guard the free list with lock. */
	uint32_t			outstanding_reqs;
	pthread_spinlock_t		lock;
};

/* For now, this is effectively an alias. But eventually we'll shift
 * some data members over. */
struct spdk_fs_thread_ctx {
	struct spdk_fs_channel	ch;
};
373 
374 static struct spdk_fs_request *
375 alloc_fs_request_with_iov(struct spdk_fs_channel *channel, uint32_t iovcnt)
376 {
377 	struct spdk_fs_request *req;
378 	struct iovec *iovs = NULL;
379 
380 	if (iovcnt > 1) {
381 		iovs = calloc(iovcnt, sizeof(struct iovec));
382 		if (!iovs) {
383 			return NULL;
384 		}
385 	}
386 
387 	if (channel->sync) {
388 		pthread_spin_lock(&channel->lock);
389 	}
390 
391 	req = TAILQ_FIRST(&channel->reqs);
392 	if (req) {
393 		channel->outstanding_reqs++;
394 		TAILQ_REMOVE(&channel->reqs, req, link);
395 	}
396 
397 	if (channel->sync) {
398 		pthread_spin_unlock(&channel->lock);
399 	}
400 
401 	if (req == NULL) {
402 		SPDK_ERRLOG("Cannot allocate req on spdk_fs_channel =%p\n", channel);
403 		free(iovs);
404 		return NULL;
405 	}
406 	memset(req, 0, sizeof(*req));
407 	req->channel = channel;
408 	if (iovcnt > 1) {
409 		req->args.iovs = iovs;
410 	} else {
411 		req->args.iovs = &req->args.iov;
412 	}
413 	req->args.iovcnt = iovcnt;
414 
415 	return req;
416 }
417 
/* Allocate a request that carries no separate iovec array. */
static struct spdk_fs_request *
alloc_fs_request(struct spdk_fs_channel *channel)
{
	return alloc_fs_request_with_iov(channel, 0);
}
423 
424 static void
425 free_fs_request(struct spdk_fs_request *req)
426 {
427 	struct spdk_fs_channel *channel = req->channel;
428 
429 	if (req->args.iovcnt > 1) {
430 		free(req->args.iovs);
431 	}
432 
433 	if (channel->sync) {
434 		pthread_spin_lock(&channel->lock);
435 	}
436 
437 	TAILQ_INSERT_HEAD(&req->channel->reqs, req, link);
438 	channel->outstanding_reqs--;
439 
440 	if (channel->sync) {
441 		pthread_spin_unlock(&channel->lock);
442 	}
443 }
444 
445 static int
446 fs_channel_create(struct spdk_filesystem *fs, struct spdk_fs_channel *channel,
447 		  uint32_t max_ops)
448 {
449 	uint32_t i;
450 
451 	channel->req_mem = calloc(max_ops, sizeof(struct spdk_fs_request));
452 	if (!channel->req_mem) {
453 		return -1;
454 	}
455 
456 	channel->outstanding_reqs = 0;
457 	TAILQ_INIT(&channel->reqs);
458 	sem_init(&channel->sem, 0, 0);
459 
460 	for (i = 0; i < max_ops; i++) {
461 		TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link);
462 	}
463 
464 	channel->fs = fs;
465 
466 	return 0;
467 }
468 
/* io_device channel-create callback for the metadata target. */
static int
fs_md_channel_create(void *io_device, void *ctx_buf)
{
	struct spdk_filesystem		*fs;
	struct spdk_fs_channel		*channel = ctx_buf;

	/* io_device is the md_target member embedded in the filesystem. */
	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, md_target);

	return fs_channel_create(fs, channel, fs->md_target.max_ops);
}

/* io_device channel-create callback for the sync (blocking API) target. */
static int
fs_sync_channel_create(void *io_device, void *ctx_buf)
{
	struct spdk_filesystem		*fs;
	struct spdk_fs_channel		*channel = ctx_buf;

	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, sync_target);

	return fs_channel_create(fs, channel, fs->sync_target.max_ops);
}

/* io_device channel-create callback for per-thread I/O channels. */
static int
fs_io_channel_create(void *io_device, void *ctx_buf)
{
	struct spdk_filesystem		*fs;
	struct spdk_fs_channel		*channel = ctx_buf;

	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, io_target);

	return fs_channel_create(fs, channel, fs->io_target.max_ops);
}
501 
502 static void
503 fs_channel_destroy(void *io_device, void *ctx_buf)
504 {
505 	struct spdk_fs_channel *channel = ctx_buf;
506 
507 	if (channel->outstanding_reqs > 0) {
508 		SPDK_ERRLOG("channel freed with %" PRIu32 " outstanding requests!\n",
509 			    channel->outstanding_reqs);
510 	}
511 
512 	free(channel->req_mem);
513 	if (channel->bs_channel != NULL) {
514 		spdk_bs_free_io_channel(channel->bs_channel);
515 	}
516 }
517 
/* send_request implementation that runs the function inline on the
 * calling thread (used for the md and sync channels).
 */
static void
__send_request_direct(fs_request_fn fn, void *arg)
{
	fn(arg);
}
523 
/* Wire a freshly initialized/loaded blobstore into the filesystem:
 * record the cluster size, attach bs io channels to the md and sync
 * fs channels, and take a reference on the global cache.
 */
static void
common_fs_bs_init(struct spdk_filesystem *fs, struct spdk_blob_store *bs)
{
	fs->bs = bs;
	fs->bs_opts.cluster_sz = spdk_bs_get_cluster_size(bs);
	fs->md_target.md_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
	fs->md_target.md_fs_channel->send_request = __send_request_direct;
	fs->sync_target.sync_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
	fs->sync_target.sync_fs_channel->send_request = __send_request_direct;

	initialize_global_cache();
}
536 
537 static void
538 init_cb(void *ctx, struct spdk_blob_store *bs, int bserrno)
539 {
540 	struct spdk_fs_request *req = ctx;
541 	struct spdk_fs_cb_args *args = &req->args;
542 	struct spdk_filesystem *fs = args->fs;
543 
544 	if (bserrno == 0) {
545 		common_fs_bs_init(fs, bs);
546 	} else {
547 		free(fs);
548 		fs = NULL;
549 	}
550 
551 	args->fn.fs_op_with_handle(args->arg, fs, bserrno);
552 	free_fs_request(req);
553 }
554 
/* Allocate a filesystem object and register its three io_devices
 * (metadata, sync, per-thread I/O), acquiring channels for the first
 * two.  Returns NULL on allocation failure.
 */
static struct spdk_filesystem *
fs_alloc(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn)
{
	struct spdk_filesystem *fs;

	fs = calloc(1, sizeof(*fs));
	if (fs == NULL) {
		return NULL;
	}

	fs->bdev = dev;
	fs->send_request = send_request_fn;
	TAILQ_INIT(&fs->files);

	fs->md_target.max_ops = 512;
	spdk_io_device_register(&fs->md_target, fs_md_channel_create, fs_channel_destroy,
				sizeof(struct spdk_fs_channel), "blobfs_md");
	fs->md_target.md_io_channel = spdk_get_io_channel(&fs->md_target);
	fs->md_target.md_fs_channel = spdk_io_channel_get_ctx(fs->md_target.md_io_channel);

	fs->sync_target.max_ops = 512;
	spdk_io_device_register(&fs->sync_target, fs_sync_channel_create, fs_channel_destroy,
				sizeof(struct spdk_fs_channel), "blobfs_sync");
	fs->sync_target.sync_io_channel = spdk_get_io_channel(&fs->sync_target);
	fs->sync_target.sync_fs_channel = spdk_io_channel_get_ctx(fs->sync_target.sync_io_channel);

	/* io_target channels are created lazily, per thread, by callers. */
	fs->io_target.max_ops = 512;
	spdk_io_device_register(&fs->io_target, fs_io_channel_create, fs_channel_destroy,
				sizeof(struct spdk_fs_channel), "blobfs_io");

	return fs;
}
587 
588 static void
589 __wake_caller(void *arg, int fserrno)
590 {
591 	struct spdk_fs_cb_args *args = arg;
592 
593 	if ((args->rwerrno != NULL) && (*(args->rwerrno) == 0) && fserrno) {
594 		*(args->rwerrno) = fserrno;
595 	}
596 	args->rc = fserrno;
597 	sem_post(args->sem);
598 }
599 
/* Create a new blobfs on dev.  opt may be NULL to accept defaults.
 * cb_fn receives the new filesystem handle (or NULL with a negative
 * errno on failure).
 */
void
spdk_fs_init(struct spdk_bs_dev *dev, struct spdk_blobfs_opts *opt,
	     fs_send_request_fn send_request_fn,
	     spdk_fs_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_filesystem *fs;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;
	struct spdk_bs_opts opts = {};

	fs = fs_alloc(dev, send_request_fn);
	if (fs == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	req = alloc_fs_request(fs->md_target.md_fs_channel);
	if (req == NULL) {
		/* Undo everything fs_alloc() set up. */
		fs_free_io_channels(fs);
		fs_io_device_unregister(fs);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	args = &req->args;
	args->fn.fs_op_with_handle = cb_fn;
	args->arg = cb_arg;
	args->fs = fs;

	spdk_bs_opts_init(&opts, sizeof(opts));
	/* Stamp the blobstore so later loads can verify it belongs to blobfs. */
	snprintf(opts.bstype.bstype, sizeof(opts.bstype.bstype), SPDK_BLOBFS_SIGNATURE);
	if (opt) {
		opts.cluster_sz = opt->cluster_sz;
	}
	spdk_bs_init(dev, &opts, init_cb, req);
}
636 
637 static struct spdk_file *
638 file_alloc(struct spdk_filesystem *fs)
639 {
640 	struct spdk_file *file;
641 
642 	file = calloc(1, sizeof(*file));
643 	if (file == NULL) {
644 		return NULL;
645 	}
646 
647 	file->tree = calloc(1, sizeof(*file->tree));
648 	if (file->tree == NULL) {
649 		free(file);
650 		return NULL;
651 	}
652 
653 	if (pthread_spin_init(&file->lock, 0)) {
654 		free(file->tree);
655 		free(file);
656 		return NULL;
657 	}
658 
659 	file->fs = fs;
660 	TAILQ_INIT(&file->open_requests);
661 	TAILQ_INIT(&file->sync_requests);
662 	TAILQ_INSERT_TAIL(&fs->files, file, tailq);
663 	file->priority = SPDK_FILE_PRIORITY_LOW;
664 	return file;
665 }
666 
667 static void fs_load_done(void *ctx, int bserrno);
668 
669 static int
670 _handle_deleted_files(struct spdk_fs_request *req)
671 {
672 	struct spdk_fs_cb_args *args = &req->args;
673 	struct spdk_filesystem *fs = args->fs;
674 
675 	if (!TAILQ_EMPTY(&args->op.fs_load.deleted_files)) {
676 		struct spdk_deleted_file *deleted_file;
677 
678 		deleted_file = TAILQ_FIRST(&args->op.fs_load.deleted_files);
679 		TAILQ_REMOVE(&args->op.fs_load.deleted_files, deleted_file, tailq);
680 		spdk_bs_delete_blob(fs->bs, deleted_file->id, fs_load_done, req);
681 		free(deleted_file);
682 		return 0;
683 	}
684 
685 	return 1;
686 }
687 
/* Invoked after the blobstore load completes and again after each
 * deferred blob deletion finishes.  Completes the user's load callback
 * only once every pending deletion has been processed.
 */
static void
fs_load_done(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_filesystem *fs = args->fs;

	/* The filesystem has been loaded.  Now check if there are any files that
	 *  were marked for deletion before last unload.  Do not complete the
	 *  fs_load callback until all of them have been deleted on disk.
	 */
	if (_handle_deleted_files(req) == 0) {
		/* We found a file that's been marked for deleting but not actually
		 *  deleted yet.  This function will get called again once the delete
		 *  operation is completed.
		 */
		return;
	}

	args->fn.fs_op_with_handle(args->arg, fs, 0);
	free_fs_request(req);

}
711 
712 static void
713 iter_cb(void *ctx, struct spdk_blob *blob, int rc)
714 {
715 	struct spdk_fs_request *req = ctx;
716 	struct spdk_fs_cb_args *args = &req->args;
717 	struct spdk_filesystem *fs = args->fs;
718 	uint64_t *length;
719 	const char *name;
720 	uint32_t *is_deleted;
721 	size_t value_len;
722 
723 	if (rc < 0) {
724 		args->fn.fs_op_with_handle(args->arg, fs, rc);
725 		free_fs_request(req);
726 		return;
727 	}
728 
729 	rc = spdk_blob_get_xattr_value(blob, "name", (const void **)&name, &value_len);
730 	if (rc < 0) {
731 		args->fn.fs_op_with_handle(args->arg, fs, rc);
732 		free_fs_request(req);
733 		return;
734 	}
735 
736 	rc = spdk_blob_get_xattr_value(blob, "length", (const void **)&length, &value_len);
737 	if (rc < 0) {
738 		args->fn.fs_op_with_handle(args->arg, fs, rc);
739 		free_fs_request(req);
740 		return;
741 	}
742 
743 	assert(value_len == 8);
744 
745 	/* This file could be deleted last time without close it, then app crashed, so we delete it now */
746 	rc = spdk_blob_get_xattr_value(blob, "is_deleted", (const void **)&is_deleted, &value_len);
747 	if (rc < 0) {
748 		struct spdk_file *f;
749 
750 		f = file_alloc(fs);
751 		if (f == NULL) {
752 			SPDK_ERRLOG("Cannot allocate file to handle deleted file on disk\n");
753 			args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM);
754 			free_fs_request(req);
755 			return;
756 		}
757 
758 		f->name = strdup(name);
759 		f->blobid = spdk_blob_get_id(blob);
760 		f->length = *length;
761 		f->length_flushed = *length;
762 		f->length_xattr = *length;
763 		f->append_pos = *length;
764 		SPDK_DEBUGLOG(blobfs, "added file %s length=%ju\n", f->name, f->length);
765 	} else {
766 		struct spdk_deleted_file *deleted_file;
767 
768 		deleted_file = calloc(1, sizeof(*deleted_file));
769 		if (deleted_file == NULL) {
770 			args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM);
771 			free_fs_request(req);
772 			return;
773 		}
774 		deleted_file->id = spdk_blob_get_id(blob);
775 		TAILQ_INSERT_TAIL(&args->op.fs_load.deleted_files, deleted_file, tailq);
776 	}
777 }
778 
/* Completion callback for spdk_bs_load().  Verifies the blobstore's
 * bstype signature (adopting an unstamped blobstore, rejecting one
 * that belongs to a different consumer) before finishing the load.
 */
static void
load_cb(void *ctx, struct spdk_blob_store *bs, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_filesystem *fs = args->fs;
	struct spdk_bs_type bstype;
	static const struct spdk_bs_type blobfs_type = {SPDK_BLOBFS_SIGNATURE};
	static const struct spdk_bs_type zeros;

	if (bserrno != 0) {
		args->fn.fs_op_with_handle(args->arg, NULL, bserrno);
		free_fs_request(req);
		fs_free_io_channels(fs);
		fs_io_device_unregister(fs);
		return;
	}

	bstype = spdk_bs_get_bstype(bs);

	if (!memcmp(&bstype, &zeros, sizeof(bstype))) {
		/* Unstamped blobstore — claim it for blobfs. */
		SPDK_DEBUGLOG(blobfs, "assigning bstype\n");
		spdk_bs_set_bstype(bs, blobfs_type);
	} else if (memcmp(&bstype, &blobfs_type, sizeof(bstype))) {
		/* Stamped by someone else — refuse to mount it. */
		SPDK_ERRLOG("not blobfs\n");
		SPDK_LOGDUMP(blobfs, "bstype", &bstype, sizeof(bstype));
		args->fn.fs_op_with_handle(args->arg, NULL, -EINVAL);
		free_fs_request(req);
		fs_free_io_channels(fs);
		fs_io_device_unregister(fs);
		return;
	}

	common_fs_bs_init(fs, bs);
	fs_load_done(req, 0);
}
815 
/* Unregister all three io_devices and free the filesystem object.
 * Channels must already have been released (fs_free_io_channels).
 */
static void
fs_io_device_unregister(struct spdk_filesystem *fs)
{
	assert(fs != NULL);
	spdk_io_device_unregister(&fs->md_target, NULL);
	spdk_io_device_unregister(&fs->sync_target, NULL);
	spdk_io_device_unregister(&fs->io_target, NULL);
	free(fs);
}

/* Release the md and sync io channels acquired in fs_alloc(). */
static void
fs_free_io_channels(struct spdk_filesystem *fs)
{
	assert(fs != NULL);
	spdk_fs_free_io_channel(fs->md_target.md_io_channel);
	spdk_fs_free_io_channel(fs->sync_target.sync_io_channel);
}
833 
/* Load an existing blobfs from dev.  iter_cb rebuilds the file list
 * while the blobstore iterates its blobs; load_cb then validates the
 * bstype and completes via cb_fn.
 */
void
spdk_fs_load(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn,
	     spdk_fs_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_filesystem *fs;
	struct spdk_fs_cb_args *args;
	struct spdk_fs_request *req;
	struct spdk_bs_opts	bs_opts;

	fs = fs_alloc(dev, send_request_fn);
	if (fs == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	req = alloc_fs_request(fs->md_target.md_fs_channel);
	if (req == NULL) {
		/* Undo everything fs_alloc() set up. */
		fs_free_io_channels(fs);
		fs_io_device_unregister(fs);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	args = &req->args;
	args->fn.fs_op_with_handle = cb_fn;
	args->arg = cb_arg;
	args->fs = fs;
	TAILQ_INIT(&args->op.fs_load.deleted_files);
	spdk_bs_opts_init(&bs_opts, sizeof(bs_opts));
	bs_opts.iter_cb_fn = iter_cb;
	bs_opts.iter_cb_arg = req;
	spdk_bs_load(dev, &bs_opts, load_cb, req);
}
867 
/* Completion callback for spdk_bs_unload(): free every in-memory file,
 * drop the global cache reference, notify the caller, and finally
 * unregister the io_devices (which frees the filesystem).
 */
static void
unload_cb(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_filesystem *fs = args->fs;
	struct spdk_file *file, *tmp;

	TAILQ_FOREACH_SAFE(file, &fs->files, tailq, tmp) {
		TAILQ_REMOVE(&fs->files, file, tailq);
		file_free(file);
	}

	free_global_cache();

	args->fn.fs_op(args->arg, bserrno);
	/* req was heap-allocated in spdk_fs_unload(), not channel-pooled. */
	free(req);

	fs_io_device_unregister(fs);
}
888 
/* Unmount the filesystem: release its io channels, then unload the
 * blobstore; unload_cb finishes the teardown.
 */
void
spdk_fs_unload(struct spdk_filesystem *fs, spdk_fs_op_complete cb_fn, void *cb_arg)
{
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	/*
	 * We must free the md_channel before unloading the blobstore, so just
	 *  allocate this request from the general heap.
	 */
	req = calloc(1, sizeof(*req));
	if (req == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	args = &req->args;
	args->fn.fs_op = cb_fn;
	args->arg = cb_arg;
	args->fs = fs;

	fs_free_io_channels(fs);
	spdk_bs_unload(fs->bs, unload_cb, req);
}
913 
914 static struct spdk_file *
915 fs_find_file(struct spdk_filesystem *fs, const char *name)
916 {
917 	struct spdk_file *file;
918 
919 	TAILQ_FOREACH(file, &fs->files, tailq) {
920 		if (!strncmp(name, file->name, SPDK_FILE_NAME_MAX)) {
921 			return file;
922 		}
923 	}
924 
925 	return NULL;
926 }
927 
928 void
929 spdk_fs_file_stat_async(struct spdk_filesystem *fs, const char *name,
930 			spdk_file_stat_op_complete cb_fn, void *cb_arg)
931 {
932 	struct spdk_file_stat stat;
933 	struct spdk_file *f = NULL;
934 
935 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
936 		cb_fn(cb_arg, NULL, -ENAMETOOLONG);
937 		return;
938 	}
939 
940 	f = fs_find_file(fs, name);
941 	if (f != NULL) {
942 		stat.blobid = f->blobid;
943 		stat.size = f->append_pos >= f->length ? f->append_pos : f->length;
944 		cb_fn(cb_arg, &stat, 0);
945 		return;
946 	}
947 
948 	cb_fn(cb_arg, NULL, -ENOENT);
949 }
950 
951 static void
952 __copy_stat(void *arg, struct spdk_file_stat *stat, int fserrno)
953 {
954 	struct spdk_fs_request *req = arg;
955 	struct spdk_fs_cb_args *args = &req->args;
956 
957 	args->rc = fserrno;
958 	if (fserrno == 0) {
959 		memcpy(args->arg, stat, sizeof(*stat));
960 	}
961 	sem_post(args->sem);
962 }
963 
/* Runs on the fs thread: forwards the sync stat request to the async API. */
static void
__file_stat(void *arg)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	spdk_fs_file_stat_async(args->fs, args->op.stat.name,
				args->fn.stat_op, req);
}
973 
/* Synchronous stat: dispatches to the fs thread and blocks on the
 * channel semaphore until __copy_stat() fills *stat.  Returns 0 on
 * success or a negative errno.
 */
int
spdk_fs_file_stat(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx,
		  const char *name, struct spdk_file_stat *stat)
{
	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
	struct spdk_fs_request *req;
	int rc;

	req = alloc_fs_request(channel);
	if (req == NULL) {
		SPDK_ERRLOG("Cannot allocate stat req on file=%s\n", name);
		return -ENOMEM;
	}

	req->args.fs = fs;
	req->args.op.stat.name = name;
	req->args.fn.stat_op = __copy_stat;
	req->args.arg = stat;
	req->args.sem = &channel->sem;
	channel->send_request(__file_stat, req);
	sem_wait(&channel->sem);

	rc = req->args.rc;
	free_fs_request(req);

	return rc;
}
1001 
1002 static void
1003 fs_create_blob_close_cb(void *ctx, int bserrno)
1004 {
1005 	int rc;
1006 	struct spdk_fs_request *req = ctx;
1007 	struct spdk_fs_cb_args *args = &req->args;
1008 
1009 	rc = args->rc ? args->rc : bserrno;
1010 	args->fn.file_op(args->arg, rc);
1011 	free_fs_request(req);
1012 }
1013 
1014 static void
1015 fs_create_blob_resize_cb(void *ctx, int bserrno)
1016 {
1017 	struct spdk_fs_request *req = ctx;
1018 	struct spdk_fs_cb_args *args = &req->args;
1019 	struct spdk_file *f = args->file;
1020 	struct spdk_blob *blob = args->op.create.blob;
1021 	uint64_t length = 0;
1022 
1023 	args->rc = bserrno;
1024 	if (bserrno) {
1025 		spdk_blob_close(blob, fs_create_blob_close_cb, args);
1026 		return;
1027 	}
1028 
1029 	spdk_blob_set_xattr(blob, "name", f->name, strlen(f->name) + 1);
1030 	spdk_blob_set_xattr(blob, "length", &length, sizeof(length));
1031 
1032 	spdk_blob_close(blob, fs_create_blob_close_cb, args);
1033 }
1034 
/* Blob opened for a newly created file: reserve its first cluster so
 * the xattrs can be written by the resize callback.
 */
static void
fs_create_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;

	if (bserrno) {
		args->fn.file_op(args->arg, bserrno);
		free_fs_request(req);
		return;
	}

	args->op.create.blob = blob;
	spdk_blob_resize(blob, 1, fs_create_blob_resize_cb, req);
}
1050 
1051 static void
1052 fs_create_blob_create_cb(void *ctx, spdk_blob_id blobid, int bserrno)
1053 {
1054 	struct spdk_fs_request *req = ctx;
1055 	struct spdk_fs_cb_args *args = &req->args;
1056 	struct spdk_file *f = args->file;
1057 
1058 	if (bserrno) {
1059 		args->fn.file_op(args->arg, bserrno);
1060 		free_fs_request(req);
1061 		return;
1062 	}
1063 
1064 	f->blobid = blobid;
1065 	spdk_bs_open_blob(f->fs->bs, blobid, fs_create_blob_open_cb, req);
1066 }
1067 
1068 void
1069 spdk_fs_create_file_async(struct spdk_filesystem *fs, const char *name,
1070 			  spdk_file_op_complete cb_fn, void *cb_arg)
1071 {
1072 	struct spdk_file *file;
1073 	struct spdk_fs_request *req;
1074 	struct spdk_fs_cb_args *args;
1075 
1076 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
1077 		cb_fn(cb_arg, -ENAMETOOLONG);
1078 		return;
1079 	}
1080 
1081 	file = fs_find_file(fs, name);
1082 	if (file != NULL) {
1083 		cb_fn(cb_arg, -EEXIST);
1084 		return;
1085 	}
1086 
1087 	file = file_alloc(fs);
1088 	if (file == NULL) {
1089 		SPDK_ERRLOG("Cannot allocate new file for creation\n");
1090 		cb_fn(cb_arg, -ENOMEM);
1091 		return;
1092 	}
1093 
1094 	req = alloc_fs_request(fs->md_target.md_fs_channel);
1095 	if (req == NULL) {
1096 		SPDK_ERRLOG("Cannot allocate create async req for file=%s\n", name);
1097 		TAILQ_REMOVE(&fs->files, file, tailq);
1098 		file_free(file);
1099 		cb_fn(cb_arg, -ENOMEM);
1100 		return;
1101 	}
1102 
1103 	args = &req->args;
1104 	args->file = file;
1105 	args->fn.file_op = cb_fn;
1106 	args->arg = cb_arg;
1107 
1108 	file->name = strdup(name);
1109 	if (!file->name) {
1110 		SPDK_ERRLOG("Cannot allocate file->name for file=%s\n", name);
1111 		free_fs_request(req);
1112 		TAILQ_REMOVE(&fs->files, file, tailq);
1113 		file_free(file);
1114 		cb_fn(cb_arg, -ENOMEM);
1115 		return;
1116 	}
1117 	spdk_bs_create_blob(fs->bs, fs_create_blob_create_cb, args);
1118 }
1119 
1120 static void
1121 __fs_create_file_done(void *arg, int fserrno)
1122 {
1123 	struct spdk_fs_request *req = arg;
1124 	struct spdk_fs_cb_args *args = &req->args;
1125 
1126 	__wake_caller(args, fserrno);
1127 	SPDK_DEBUGLOG(blobfs, "file=%s\n", args->op.create.name);
1128 }
1129 
1130 static void
1131 __fs_create_file(void *arg)
1132 {
1133 	struct spdk_fs_request *req = arg;
1134 	struct spdk_fs_cb_args *args = &req->args;
1135 
1136 	SPDK_DEBUGLOG(blobfs, "file=%s\n", args->op.create.name);
1137 	spdk_fs_create_file_async(args->fs, args->op.create.name, __fs_create_file_done, req);
1138 }
1139 
1140 int
1141 spdk_fs_create_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx, const char *name)
1142 {
1143 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
1144 	struct spdk_fs_request *req;
1145 	struct spdk_fs_cb_args *args;
1146 	int rc;
1147 
1148 	SPDK_DEBUGLOG(blobfs, "file=%s\n", name);
1149 
1150 	req = alloc_fs_request(channel);
1151 	if (req == NULL) {
1152 		SPDK_ERRLOG("Cannot allocate req to create file=%s\n", name);
1153 		return -ENOMEM;
1154 	}
1155 
1156 	args = &req->args;
1157 	args->fs = fs;
1158 	args->op.create.name = name;
1159 	args->sem = &channel->sem;
1160 	fs->send_request(__fs_create_file, req);
1161 	sem_wait(&channel->sem);
1162 	rc = args->rc;
1163 	free_fs_request(req);
1164 
1165 	return rc;
1166 }
1167 
/*
 * Completion for the blob open started in fs_open_blob_create_cb().
 * Records the blob on the file and completes every open request that
 * queued on the file while the open was in flight.
 */
static void
fs_open_blob_done(void *ctx, struct spdk_blob *blob, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *f = args->file;

	f->blob = blob;
	/* Drain the waiters; 'req' and 'args' are re-pointed at each queued
	 * request in turn (the original req is one of them). */
	while (!TAILQ_EMPTY(&f->open_requests)) {
		req = TAILQ_FIRST(&f->open_requests);
		args = &req->args;
		TAILQ_REMOVE(&f->open_requests, req, args.op.open.tailq);
		spdk_trace_record(TRACE_BLOBFS_OPEN, 0, 0, 0, f->name);
		/* Every waiter observes the same bserrno from this single open. */
		args->fn.file_op_with_handle(args->arg, f, bserrno);
		free_fs_request(req);
	}
}
1185 
/*
 * Second stage of an open: the file object exists (either found or just
 * created); take a reference and either open the blob, complete
 * immediately, or queue behind an in-flight open.
 * NOTE(review): bserrno from the create step is not checked here —
 * confirm create failures cannot reach this callback.
 */
static void
fs_open_blob_create_cb(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *file = args->file;
	struct spdk_filesystem *fs = args->fs;

	if (file == NULL) {
		/*
		 * This is from an open with CREATE flag - the file
		 *  is now created so look it up in the file list for this
		 *  filesystem.
		 */
		file = fs_find_file(fs, args->op.open.name);
		assert(file != NULL);
		args->file = file;
	}

	/* Queue this request on the file; fs_open_blob_done() completes it. */
	file->ref_count++;
	TAILQ_INSERT_TAIL(&file->open_requests, req, args.op.open.tailq);
	if (file->ref_count == 1) {
		/* First opener: start the blob open. */
		assert(file->blob == NULL);
		spdk_bs_open_blob(fs->bs, file->blobid, fs_open_blob_done, req);
	} else if (file->blob != NULL) {
		/* Blob already open: complete this request right away. */
		fs_open_blob_done(req, file->blob, 0);
	} else {
		/*
		 * The blob open for this file is in progress due to a previous
		 *  open request.  When that open completes, it will invoke the
		 *  open callback for this request.
		 */
	}
}
1220 
1221 void
1222 spdk_fs_open_file_async(struct spdk_filesystem *fs, const char *name, uint32_t flags,
1223 			spdk_file_op_with_handle_complete cb_fn, void *cb_arg)
1224 {
1225 	struct spdk_file *f = NULL;
1226 	struct spdk_fs_request *req;
1227 	struct spdk_fs_cb_args *args;
1228 
1229 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
1230 		cb_fn(cb_arg, NULL, -ENAMETOOLONG);
1231 		return;
1232 	}
1233 
1234 	f = fs_find_file(fs, name);
1235 	if (f == NULL && !(flags & SPDK_BLOBFS_OPEN_CREATE)) {
1236 		cb_fn(cb_arg, NULL, -ENOENT);
1237 		return;
1238 	}
1239 
1240 	if (f != NULL && f->is_deleted == true) {
1241 		cb_fn(cb_arg, NULL, -ENOENT);
1242 		return;
1243 	}
1244 
1245 	req = alloc_fs_request(fs->md_target.md_fs_channel);
1246 	if (req == NULL) {
1247 		SPDK_ERRLOG("Cannot allocate async open req for file=%s\n", name);
1248 		cb_fn(cb_arg, NULL, -ENOMEM);
1249 		return;
1250 	}
1251 
1252 	args = &req->args;
1253 	args->fn.file_op_with_handle = cb_fn;
1254 	args->arg = cb_arg;
1255 	args->file = f;
1256 	args->fs = fs;
1257 	args->op.open.name = name;
1258 
1259 	if (f == NULL) {
1260 		spdk_fs_create_file_async(fs, name, fs_open_blob_create_cb, req);
1261 	} else {
1262 		fs_open_blob_create_cb(req, 0);
1263 	}
1264 }
1265 
1266 static void
1267 __fs_open_file_done(void *arg, struct spdk_file *file, int bserrno)
1268 {
1269 	struct spdk_fs_request *req = arg;
1270 	struct spdk_fs_cb_args *args = &req->args;
1271 
1272 	args->file = file;
1273 	__wake_caller(args, bserrno);
1274 	SPDK_DEBUGLOG(blobfs, "file=%s\n", args->op.open.name);
1275 }
1276 
1277 static void
1278 __fs_open_file(void *arg)
1279 {
1280 	struct spdk_fs_request *req = arg;
1281 	struct spdk_fs_cb_args *args = &req->args;
1282 
1283 	SPDK_DEBUGLOG(blobfs, "file=%s\n", args->op.open.name);
1284 	spdk_fs_open_file_async(args->fs, args->op.open.name, args->op.open.flags,
1285 				__fs_open_file_done, req);
1286 }
1287 
1288 int
1289 spdk_fs_open_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx,
1290 		  const char *name, uint32_t flags, struct spdk_file **file)
1291 {
1292 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
1293 	struct spdk_fs_request *req;
1294 	struct spdk_fs_cb_args *args;
1295 	int rc;
1296 
1297 	SPDK_DEBUGLOG(blobfs, "file=%s\n", name);
1298 
1299 	req = alloc_fs_request(channel);
1300 	if (req == NULL) {
1301 		SPDK_ERRLOG("Cannot allocate req for opening file=%s\n", name);
1302 		return -ENOMEM;
1303 	}
1304 
1305 	args = &req->args;
1306 	args->fs = fs;
1307 	args->op.open.name = name;
1308 	args->op.open.flags = flags;
1309 	args->sem = &channel->sem;
1310 	fs->send_request(__fs_open_file, req);
1311 	sem_wait(&channel->sem);
1312 	rc = args->rc;
1313 	if (rc == 0) {
1314 		*file = args->file;
1315 	} else {
1316 		*file = NULL;
1317 	}
1318 	free_fs_request(req);
1319 
1320 	return rc;
1321 }
1322 
1323 static void
1324 fs_rename_blob_close_cb(void *ctx, int bserrno)
1325 {
1326 	struct spdk_fs_request *req = ctx;
1327 	struct spdk_fs_cb_args *args = &req->args;
1328 
1329 	args->fn.fs_op(args->arg, bserrno);
1330 	free_fs_request(req);
1331 }
1332 
1333 static void
1334 fs_rename_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno)
1335 {
1336 	struct spdk_fs_request *req = ctx;
1337 	struct spdk_fs_cb_args *args = &req->args;
1338 	const char *new_name = args->op.rename.new_name;
1339 
1340 	spdk_blob_set_xattr(blob, "name", new_name, strlen(new_name) + 1);
1341 	spdk_blob_close(blob, fs_rename_blob_close_cb, req);
1342 }
1343 
1344 static void
1345 _fs_md_rename_file(struct spdk_fs_request *req)
1346 {
1347 	struct spdk_fs_cb_args *args = &req->args;
1348 	struct spdk_file *f;
1349 
1350 	f = fs_find_file(args->fs, args->op.rename.old_name);
1351 	if (f == NULL) {
1352 		args->fn.fs_op(args->arg, -ENOENT);
1353 		free_fs_request(req);
1354 		return;
1355 	}
1356 
1357 	free(f->name);
1358 	f->name = strdup(args->op.rename.new_name);
1359 	args->file = f;
1360 	spdk_bs_open_blob(args->fs->bs, f->blobid, fs_rename_blob_open_cb, req);
1361 }
1362 
/*
 * Called after the file occupying the rename target name was deleted;
 * proceed with the actual rename.
 * NOTE(review): fserrno is ignored — a failed delete still proceeds with
 * the rename; confirm this is intentional.
 */
static void
fs_rename_delete_done(void *arg, int fserrno)
{
	_fs_md_rename_file(arg);
}
1368 
1369 void
1370 spdk_fs_rename_file_async(struct spdk_filesystem *fs,
1371 			  const char *old_name, const char *new_name,
1372 			  spdk_file_op_complete cb_fn, void *cb_arg)
1373 {
1374 	struct spdk_file *f;
1375 	struct spdk_fs_request *req;
1376 	struct spdk_fs_cb_args *args;
1377 
1378 	SPDK_DEBUGLOG(blobfs, "old=%s new=%s\n", old_name, new_name);
1379 	if (strnlen(new_name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
1380 		cb_fn(cb_arg, -ENAMETOOLONG);
1381 		return;
1382 	}
1383 
1384 	req = alloc_fs_request(fs->md_target.md_fs_channel);
1385 	if (req == NULL) {
1386 		SPDK_ERRLOG("Cannot allocate rename async req for renaming file from %s to %s\n", old_name,
1387 			    new_name);
1388 		cb_fn(cb_arg, -ENOMEM);
1389 		return;
1390 	}
1391 
1392 	args = &req->args;
1393 	args->fn.fs_op = cb_fn;
1394 	args->fs = fs;
1395 	args->arg = cb_arg;
1396 	args->op.rename.old_name = old_name;
1397 	args->op.rename.new_name = new_name;
1398 
1399 	f = fs_find_file(fs, new_name);
1400 	if (f == NULL) {
1401 		_fs_md_rename_file(req);
1402 		return;
1403 	}
1404 
1405 	/*
1406 	 * The rename overwrites an existing file.  So delete the existing file, then
1407 	 *  do the actual rename.
1408 	 */
1409 	spdk_fs_delete_file_async(fs, new_name, fs_rename_delete_done, req);
1410 }
1411 
1412 static void
1413 __fs_rename_file_done(void *arg, int fserrno)
1414 {
1415 	struct spdk_fs_request *req = arg;
1416 	struct spdk_fs_cb_args *args = &req->args;
1417 
1418 	__wake_caller(args, fserrno);
1419 }
1420 
1421 static void
1422 __fs_rename_file(void *arg)
1423 {
1424 	struct spdk_fs_request *req = arg;
1425 	struct spdk_fs_cb_args *args = &req->args;
1426 
1427 	spdk_fs_rename_file_async(args->fs, args->op.rename.old_name, args->op.rename.new_name,
1428 				  __fs_rename_file_done, req);
1429 }
1430 
1431 int
1432 spdk_fs_rename_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx,
1433 		    const char *old_name, const char *new_name)
1434 {
1435 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
1436 	struct spdk_fs_request *req;
1437 	struct spdk_fs_cb_args *args;
1438 	int rc;
1439 
1440 	req = alloc_fs_request(channel);
1441 	if (req == NULL) {
1442 		SPDK_ERRLOG("Cannot allocate rename req for file=%s\n", old_name);
1443 		return -ENOMEM;
1444 	}
1445 
1446 	args = &req->args;
1447 
1448 	args->fs = fs;
1449 	args->op.rename.old_name = old_name;
1450 	args->op.rename.new_name = new_name;
1451 	args->sem = &channel->sem;
1452 	fs->send_request(__fs_rename_file, req);
1453 	sem_wait(&channel->sem);
1454 	rc = args->rc;
1455 	free_fs_request(req);
1456 	return rc;
1457 }
1458 
1459 static void
1460 blob_delete_cb(void *ctx, int bserrno)
1461 {
1462 	struct spdk_fs_request *req = ctx;
1463 	struct spdk_fs_cb_args *args = &req->args;
1464 
1465 	args->fn.file_op(args->arg, bserrno);
1466 	free_fs_request(req);
1467 }
1468 
1469 void
1470 spdk_fs_delete_file_async(struct spdk_filesystem *fs, const char *name,
1471 			  spdk_file_op_complete cb_fn, void *cb_arg)
1472 {
1473 	struct spdk_file *f;
1474 	spdk_blob_id blobid;
1475 	struct spdk_fs_request *req;
1476 	struct spdk_fs_cb_args *args;
1477 
1478 	SPDK_DEBUGLOG(blobfs, "file=%s\n", name);
1479 
1480 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
1481 		cb_fn(cb_arg, -ENAMETOOLONG);
1482 		return;
1483 	}
1484 
1485 	f = fs_find_file(fs, name);
1486 	if (f == NULL) {
1487 		SPDK_ERRLOG("Cannot find the file=%s to deleted\n", name);
1488 		cb_fn(cb_arg, -ENOENT);
1489 		return;
1490 	}
1491 
1492 	req = alloc_fs_request(fs->md_target.md_fs_channel);
1493 	if (req == NULL) {
1494 		SPDK_ERRLOG("Cannot allocate the req for the file=%s to deleted\n", name);
1495 		cb_fn(cb_arg, -ENOMEM);
1496 		return;
1497 	}
1498 
1499 	args = &req->args;
1500 	args->fn.file_op = cb_fn;
1501 	args->arg = cb_arg;
1502 
1503 	if (f->ref_count > 0) {
1504 		/* If the ref > 0, we mark the file as deleted and delete it when we close it. */
1505 		f->is_deleted = true;
1506 		spdk_blob_set_xattr(f->blob, "is_deleted", &f->is_deleted, sizeof(bool));
1507 		spdk_blob_sync_md(f->blob, blob_delete_cb, req);
1508 		return;
1509 	}
1510 
1511 	blobid = f->blobid;
1512 	TAILQ_REMOVE(&fs->files, f, tailq);
1513 
1514 	file_free(f);
1515 
1516 	spdk_bs_delete_blob(fs->bs, blobid, blob_delete_cb, req);
1517 }
1518 
1519 static void
1520 __fs_delete_file_done(void *arg, int fserrno)
1521 {
1522 	struct spdk_fs_request *req = arg;
1523 	struct spdk_fs_cb_args *args = &req->args;
1524 
1525 	spdk_trace_record(TRACE_BLOBFS_DELETE_DONE, 0, 0, 0, args->op.delete.name);
1526 	__wake_caller(args, fserrno);
1527 }
1528 
1529 static void
1530 __fs_delete_file(void *arg)
1531 {
1532 	struct spdk_fs_request *req = arg;
1533 	struct spdk_fs_cb_args *args = &req->args;
1534 
1535 	spdk_trace_record(TRACE_BLOBFS_DELETE_START, 0, 0, 0, args->op.delete.name);
1536 	spdk_fs_delete_file_async(args->fs, args->op.delete.name, __fs_delete_file_done, req);
1537 }
1538 
1539 int
1540 spdk_fs_delete_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx,
1541 		    const char *name)
1542 {
1543 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
1544 	struct spdk_fs_request *req;
1545 	struct spdk_fs_cb_args *args;
1546 	int rc;
1547 
1548 	req = alloc_fs_request(channel);
1549 	if (req == NULL) {
1550 		SPDK_DEBUGLOG(blobfs, "Cannot allocate req to delete file=%s\n", name);
1551 		return -ENOMEM;
1552 	}
1553 
1554 	args = &req->args;
1555 	args->fs = fs;
1556 	args->op.delete.name = name;
1557 	args->sem = &channel->sem;
1558 	fs->send_request(__fs_delete_file, req);
1559 	sem_wait(&channel->sem);
1560 	rc = args->rc;
1561 	free_fs_request(req);
1562 
1563 	return rc;
1564 }
1565 
1566 spdk_fs_iter
1567 spdk_fs_iter_first(struct spdk_filesystem *fs)
1568 {
1569 	struct spdk_file *f;
1570 
1571 	f = TAILQ_FIRST(&fs->files);
1572 	return f;
1573 }
1574 
1575 spdk_fs_iter
1576 spdk_fs_iter_next(spdk_fs_iter iter)
1577 {
1578 	struct spdk_file *f = iter;
1579 
1580 	if (f == NULL) {
1581 		return NULL;
1582 	}
1583 
1584 	f = TAILQ_NEXT(f, tailq);
1585 	return f;
1586 }
1587 
/* Return the file's name.  The string is owned by the file object; the
 * caller must not free it. */
const char *
spdk_file_get_name(struct spdk_file *file)
{
	return file->name;
}
1593 
1594 uint64_t
1595 spdk_file_get_length(struct spdk_file *file)
1596 {
1597 	uint64_t length;
1598 
1599 	assert(file != NULL);
1600 
1601 	length = file->append_pos >= file->length ? file->append_pos : file->length;
1602 	SPDK_DEBUGLOG(blobfs, "file=%s length=0x%jx\n", file->name, length);
1603 	return length;
1604 }
1605 
1606 static void
1607 fs_truncate_complete_cb(void *ctx, int bserrno)
1608 {
1609 	struct spdk_fs_request *req = ctx;
1610 	struct spdk_fs_cb_args *args = &req->args;
1611 
1612 	args->fn.file_op(args->arg, bserrno);
1613 	free_fs_request(req);
1614 }
1615 
1616 static void
1617 fs_truncate_resize_cb(void *ctx, int bserrno)
1618 {
1619 	struct spdk_fs_request *req = ctx;
1620 	struct spdk_fs_cb_args *args = &req->args;
1621 	struct spdk_file *file = args->file;
1622 	uint64_t *length = &args->op.truncate.length;
1623 
1624 	if (bserrno) {
1625 		args->fn.file_op(args->arg, bserrno);
1626 		free_fs_request(req);
1627 		return;
1628 	}
1629 
1630 	spdk_blob_set_xattr(file->blob, "length", length, sizeof(*length));
1631 
1632 	file->length = *length;
1633 	if (file->append_pos > file->length) {
1634 		file->append_pos = file->length;
1635 	}
1636 
1637 	spdk_blob_sync_md(file->blob, fs_truncate_complete_cb, req);
1638 }
1639 
/* Number of clusters needed to hold 'length' bytes (round up). */
static uint64_t
__bytes_to_clusters(uint64_t length, uint64_t cluster_sz)
{
	/* Round up with div/mod instead of (length + cluster_sz - 1), whose
	 * addition can wrap around for lengths close to UINT64_MAX. */
	return length / cluster_sz + (length % cluster_sz != 0);
}
1645 
1646 void
1647 spdk_file_truncate_async(struct spdk_file *file, uint64_t length,
1648 			 spdk_file_op_complete cb_fn, void *cb_arg)
1649 {
1650 	struct spdk_filesystem *fs;
1651 	size_t num_clusters;
1652 	struct spdk_fs_request *req;
1653 	struct spdk_fs_cb_args *args;
1654 
1655 	SPDK_DEBUGLOG(blobfs, "file=%s old=0x%jx new=0x%jx\n", file->name, file->length, length);
1656 	if (length == file->length) {
1657 		cb_fn(cb_arg, 0);
1658 		return;
1659 	}
1660 
1661 	req = alloc_fs_request(file->fs->md_target.md_fs_channel);
1662 	if (req == NULL) {
1663 		cb_fn(cb_arg, -ENOMEM);
1664 		return;
1665 	}
1666 
1667 	args = &req->args;
1668 	args->fn.file_op = cb_fn;
1669 	args->arg = cb_arg;
1670 	args->file = file;
1671 	args->op.truncate.length = length;
1672 	fs = file->fs;
1673 
1674 	num_clusters = __bytes_to_clusters(length, fs->bs_opts.cluster_sz);
1675 
1676 	spdk_blob_resize(file->blob, num_clusters, fs_truncate_resize_cb, req);
1677 }
1678 
/*
 * Runs on the fs thread on behalf of spdk_file_truncate().  args->fn.file_op
 * is __wake_caller, which expects 'args' itself as its argument — which is
 * why 'args' (not 'req') is passed as the completion context here.
 */
static void
__truncate(void *arg)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	spdk_file_truncate_async(args->file, args->op.truncate.length,
				 args->fn.file_op, args);
}
1688 
1689 int
1690 spdk_file_truncate(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx,
1691 		   uint64_t length)
1692 {
1693 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
1694 	struct spdk_fs_request *req;
1695 	struct spdk_fs_cb_args *args;
1696 	int rc;
1697 
1698 	req = alloc_fs_request(channel);
1699 	if (req == NULL) {
1700 		return -ENOMEM;
1701 	}
1702 
1703 	args = &req->args;
1704 
1705 	args->file = file;
1706 	args->op.truncate.length = length;
1707 	args->fn.file_op = __wake_caller;
1708 	args->sem = &channel->sem;
1709 
1710 	channel->send_request(__truncate, req);
1711 	sem_wait(&channel->sem);
1712 	rc = args->rc;
1713 	free_fs_request(req);
1714 
1715 	return rc;
1716 }
1717 
1718 static void
1719 __rw_done(void *ctx, int bserrno)
1720 {
1721 	struct spdk_fs_request *req = ctx;
1722 	struct spdk_fs_cb_args *args = &req->args;
1723 
1724 	spdk_free(args->op.rw.pin_buf);
1725 	args->fn.file_op(args->arg, bserrno);
1726 	free_fs_request(req);
1727 }
1728 
/* Gather the iovecs into the flat buffer; each copy is clamped to the
 * space remaining in buf. */
static void
_copy_iovs_to_buf(void *buf, size_t buf_len, struct iovec *iovs, int iovcnt)
{
	size_t chunk;
	int idx;

	for (idx = 0; idx < iovcnt; idx++) {
		chunk = spdk_min(iovs[idx].iov_len, buf_len);
		memcpy(buf, iovs[idx].iov_base, chunk);
		buf += chunk;
		assert(buf_len >= chunk);
		buf_len -= chunk;
	}
}
1743 
/* Scatter the flat buffer out into the iovecs; each copy is clamped to the
 * data remaining in buf. */
static void
_copy_buf_to_iovs(struct iovec *iovs, int iovcnt, void *buf, size_t buf_len)
{
	size_t chunk;
	int idx;

	for (idx = 0; idx < iovcnt; idx++) {
		chunk = spdk_min(iovs[idx].iov_len, buf_len);
		memcpy(iovs[idx].iov_base, buf, chunk);
		buf += chunk;
		assert(buf_len >= chunk);
		buf_len -= chunk;
	}
}
1758 
1759 static void
1760 __read_done(void *ctx, int bserrno)
1761 {
1762 	struct spdk_fs_request *req = ctx;
1763 	struct spdk_fs_cb_args *args = &req->args;
1764 	void *buf;
1765 
1766 	assert(req != NULL);
1767 	buf = (void *)((uintptr_t)args->op.rw.pin_buf + (args->op.rw.offset & (args->op.rw.blocklen - 1)));
1768 	if (args->op.rw.is_read) {
1769 		_copy_buf_to_iovs(args->iovs, args->iovcnt, buf, args->op.rw.length);
1770 		__rw_done(req, 0);
1771 	} else {
1772 		_copy_iovs_to_buf(buf, args->op.rw.length, args->iovs, args->iovcnt);
1773 		spdk_blob_io_write(args->file->blob, args->op.rw.channel,
1774 				   args->op.rw.pin_buf,
1775 				   args->op.rw.start_lba, args->op.rw.num_lba,
1776 				   __rw_done, req);
1777 	}
1778 }
1779 
1780 static void
1781 __do_blob_read(void *ctx, int fserrno)
1782 {
1783 	struct spdk_fs_request *req = ctx;
1784 	struct spdk_fs_cb_args *args = &req->args;
1785 
1786 	if (fserrno) {
1787 		__rw_done(req, fserrno);
1788 		return;
1789 	}
1790 	spdk_blob_io_read(args->file->blob, args->op.rw.channel,
1791 			  args->op.rw.pin_buf,
1792 			  args->op.rw.start_lba, args->op.rw.num_lba,
1793 			  __read_done, req);
1794 }
1795 
1796 static void
1797 __get_page_parameters(struct spdk_file *file, uint64_t offset, uint64_t length,
1798 		      uint64_t *start_lba, uint32_t *lba_size, uint64_t *num_lba)
1799 {
1800 	uint64_t end_lba;
1801 
1802 	*lba_size = spdk_bs_get_io_unit_size(file->fs->bs);
1803 	*start_lba = offset / *lba_size;
1804 	end_lba = (offset + length - 1) / *lba_size;
1805 	*num_lba = (end_lba - *start_lba + 1);
1806 }
1807 
1808 static bool
1809 __is_lba_aligned(struct spdk_file *file, uint64_t offset, uint64_t length)
1810 {
1811 	uint32_t lba_size = spdk_bs_get_io_unit_size(file->fs->bs);
1812 
1813 	if ((offset % lba_size == 0) && (length % lba_size == 0)) {
1814 		return true;
1815 	}
1816 
1817 	return false;
1818 }
1819 
1820 static void
1821 _fs_request_setup_iovs(struct spdk_fs_request *req, struct iovec *iovs, uint32_t iovcnt)
1822 {
1823 	uint32_t i;
1824 
1825 	for (i = 0; i < iovcnt; i++) {
1826 		req->args.iovs[i].iov_base = iovs[i].iov_base;
1827 		req->args.iovs[i].iov_len = iovs[i].iov_len;
1828 	}
1829 }
1830 
/*
 * Common implementation for all read/write entry points.  All I/O goes
 * through an LBA-aligned DMA bounce buffer:
 *   - reads: read the covering LBAs, then copy out to the user's iovecs;
 *   - fully aligned writes: copy in and write directly;
 *   - unaligned writes: read-modify-write via __do_blob_read/__read_done;
 *   - writes past EOF: grow the file first via spdk_file_truncate_async.
 */
static void
__readvwritev(struct spdk_file *file, struct spdk_io_channel *_channel,
	      struct iovec *iovs, uint32_t iovcnt, uint64_t offset, uint64_t length,
	      spdk_file_op_complete cb_fn, void *cb_arg, int is_read)
{
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;
	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
	uint64_t start_lba, num_lba, pin_buf_length;
	uint32_t lba_size;

	/* Reads beyond the current file length are invalid. */
	if (is_read && offset + length > file->length) {
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	req = alloc_fs_request_with_iov(channel, iovcnt);
	if (req == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	__get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba);

	args = &req->args;
	args->fn.file_op = cb_fn;
	args->arg = cb_arg;
	args->file = file;
	args->op.rw.channel = channel->bs_channel;
	_fs_request_setup_iovs(req, iovs, iovcnt);
	args->op.rw.is_read = is_read;
	args->op.rw.offset = offset;
	args->op.rw.blocklen = lba_size;

	/* DMA-capable bounce buffer covering the full LBA range. */
	pin_buf_length = num_lba * lba_size;
	args->op.rw.length = pin_buf_length;
	args->op.rw.pin_buf = spdk_malloc(pin_buf_length, lba_size, NULL,
					  SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
	if (args->op.rw.pin_buf == NULL) {
		SPDK_DEBUGLOG(blobfs, "Failed to allocate buf for: file=%s offset=%jx length=%jx\n",
			      file->name, offset, length);
		free_fs_request(req);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	args->op.rw.start_lba = start_lba;
	args->op.rw.num_lba = num_lba;

	if (!is_read && file->length < offset + length) {
		/* Write extends the file: grow it first, then fall into the
		 * read-modify-write path via __do_blob_read. */
		spdk_file_truncate_async(file, offset + length, __do_blob_read, req);
	} else if (!is_read && __is_lba_aligned(file, offset, length)) {
		/* Fully aligned write: no read-modify-write needed. */
		_copy_iovs_to_buf(args->op.rw.pin_buf, args->op.rw.length, args->iovs, args->iovcnt);
		spdk_blob_io_write(args->file->blob, args->op.rw.channel,
				   args->op.rw.pin_buf,
				   args->op.rw.start_lba, args->op.rw.num_lba,
				   __rw_done, req);
	} else {
		/* Reads and unaligned writes begin by reading the covering range. */
		__do_blob_read(req, 0);
	}
}
1892 
1893 static void
1894 __readwrite(struct spdk_file *file, struct spdk_io_channel *channel,
1895 	    void *payload, uint64_t offset, uint64_t length,
1896 	    spdk_file_op_complete cb_fn, void *cb_arg, int is_read)
1897 {
1898 	struct iovec iov;
1899 
1900 	iov.iov_base = payload;
1901 	iov.iov_len = (size_t)length;
1902 
1903 	__readvwritev(file, channel, &iov, 1, offset, length, cb_fn, cb_arg, is_read);
1904 }
1905 
1906 void
1907 spdk_file_write_async(struct spdk_file *file, struct spdk_io_channel *channel,
1908 		      void *payload, uint64_t offset, uint64_t length,
1909 		      spdk_file_op_complete cb_fn, void *cb_arg)
1910 {
1911 	__readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 0);
1912 }
1913 
/* Asynchronously write a vectored buffer list to 'file'; completes via cb_fn. */
void
spdk_file_writev_async(struct spdk_file *file, struct spdk_io_channel *channel,
		       struct iovec *iovs, uint32_t iovcnt, uint64_t offset, uint64_t length,
		       spdk_file_op_complete cb_fn, void *cb_arg)
{
	SPDK_DEBUGLOG(blobfs, "file=%s offset=%jx length=%jx\n",
		      file->name, offset, length);

	__readvwritev(file, channel, iovs, iovcnt, offset, length, cb_fn, cb_arg, 0);
}
1924 
/* Asynchronously read from 'file' into a flat buffer; completes via cb_fn.
 * Reads past the file length fail with -EINVAL (see __readvwritev). */
void
spdk_file_read_async(struct spdk_file *file, struct spdk_io_channel *channel,
		     void *payload, uint64_t offset, uint64_t length,
		     spdk_file_op_complete cb_fn, void *cb_arg)
{
	SPDK_DEBUGLOG(blobfs, "file=%s offset=%jx length=%jx\n",
		      file->name, offset, length);
	__readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 1);
}
1934 
/* Asynchronously read from 'file' into a vectored buffer list; completes via
 * cb_fn.  Reads past the file length fail with -EINVAL (see __readvwritev). */
void
spdk_file_readv_async(struct spdk_file *file, struct spdk_io_channel *channel,
		      struct iovec *iovs, uint32_t iovcnt, uint64_t offset, uint64_t length,
		      spdk_file_op_complete cb_fn, void *cb_arg)
{
	SPDK_DEBUGLOG(blobfs, "file=%s offset=%jx length=%jx\n",
		      file->name, offset, length);

	__readvwritev(file, channel, iovs, iovcnt, offset, length, cb_fn, cb_arg, 1);
}
1945 
1946 struct spdk_io_channel *
1947 spdk_fs_alloc_io_channel(struct spdk_filesystem *fs)
1948 {
1949 	struct spdk_io_channel *io_channel;
1950 	struct spdk_fs_channel *fs_channel;
1951 
1952 	io_channel = spdk_get_io_channel(&fs->io_target);
1953 	fs_channel = spdk_io_channel_get_ctx(io_channel);
1954 	fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
1955 	fs_channel->send_request = __send_request_direct;
1956 
1957 	return io_channel;
1958 }
1959 
/* Release a channel obtained from spdk_fs_alloc_io_channel(). */
void
spdk_fs_free_io_channel(struct spdk_io_channel *channel)
{
	spdk_put_io_channel(channel);
}
1965 
/*
 * Allocate a thread context for the synchronous blobfs API: an embedded
 * fs channel whose requests are dispatched via fs->send_request and waited
 * on with a semaphore (ch.sync = 1).  Returns NULL on allocation failure.
 */
struct spdk_fs_thread_ctx *
spdk_fs_alloc_thread_ctx(struct spdk_filesystem *fs)
{
	struct spdk_fs_thread_ctx *ctx;

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		return NULL;
	}

	if (pthread_spin_init(&ctx->ch.lock, 0)) {
		free(ctx);
		return NULL;
	}

	/* NOTE(review): the result of fs_channel_create() is not checked;
	 * verify it cannot fail here, or handle its failure.  The 512 is
	 * presumably the channel's request-pool size — confirm against
	 * fs_channel_create(). */
	fs_channel_create(fs, &ctx->ch, 512);

	ctx->ch.send_request = fs->send_request;
	/* sync=1 marks this channel as semaphore-based (synchronous API). */
	ctx->ch.sync = 1;

	return ctx;
}
1988 
1989 
1990 void
1991 spdk_fs_free_thread_ctx(struct spdk_fs_thread_ctx *ctx)
1992 {
1993 	assert(ctx->ch.sync == 1);
1994 
1995 	while (true) {
1996 		pthread_spin_lock(&ctx->ch.lock);
1997 		if (ctx->ch.outstanding_reqs == 0) {
1998 			pthread_spin_unlock(&ctx->ch.lock);
1999 			break;
2000 		}
2001 		pthread_spin_unlock(&ctx->ch.lock);
2002 		usleep(1000);
2003 	}
2004 
2005 	fs_channel_destroy(NULL, &ctx->ch);
2006 	free(ctx);
2007 }
2008 
2009 int
2010 spdk_fs_set_cache_size(uint64_t size_in_mb)
2011 {
2012 	/* setting g_fs_cache_size is only permitted if cache pool
2013 	 * is already freed or hasn't been initialized
2014 	 */
2015 	if (g_cache_pool != NULL) {
2016 		return -EPERM;
2017 	}
2018 
2019 	g_fs_cache_size = size_in_mb * 1024 * 1024;
2020 
2021 	return 0;
2022 }
2023 
2024 uint64_t
2025 spdk_fs_get_cache_size(void)
2026 {
2027 	return g_fs_cache_size / (1024 * 1024);
2028 }
2029 
2030 static void __file_flush(void *ctx);
2031 
/* Try to free some cache buffers from this file.
 * Returns 0 if buffers were freed, -1 if the file was locked or had
 * nothing cached.
 */
static int
reclaim_cache_buffers(struct spdk_file *file)
{
	int rc;

	BLOBFS_TRACE(file, "free=%s\n", file->name);

	/* The function is safe to be called with any threads, while the file
	 * lock maybe locked by other thread for now, so try to get the file
	 * lock here.
	 */
	rc = pthread_spin_trylock(&file->lock);
	if (rc != 0) {
		return -1;
	}

	/* Nothing cached for this file. */
	if (file->tree->present_mask == 0) {
		pthread_spin_unlock(&file->lock);
		return -1;
	}
	tree_free_buffers(file->tree);

	TAILQ_REMOVE(&g_caches, file, cache_tailq);
	/* If not freed, put it in the end of the queue */
	if (file->tree->present_mask != 0) {
		TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq);
	} else {
		/* Fully reclaimed: the file no longer has a last buffer. */
		file->last = NULL;
	}
	pthread_spin_unlock(&file->lock);

	return 0;
}
2067 
/*
 * Cache-pool reclaim poller.  When the pool is under pressure, free cached
 * buffers in priority order: first low-priority files not open for writing,
 * then any file not open for writing, and finally any file at all.
 * reclaim_cache_buffers() may remove or requeue entries, so each pass uses
 * the _SAFE iterator.
 */
static int
_blobfs_cache_pool_reclaim(void *arg)
{
	struct spdk_file *file, *tmp;
	int rc;

	if (!blobfs_cache_pool_need_reclaim()) {
		return SPDK_POLLER_IDLE;
	}

	/* Pass 1: low-priority files that are not being written. */
	TAILQ_FOREACH_SAFE(file, &g_caches, cache_tailq, tmp) {
		if (!file->open_for_writing &&
		    file->priority == SPDK_FILE_PRIORITY_LOW) {
			rc = reclaim_cache_buffers(file);
			if (rc < 0) {
				continue;
			}
			if (!blobfs_cache_pool_need_reclaim()) {
				return SPDK_POLLER_BUSY;
			}
			break;
		}
	}

	/* Pass 2: any file that is not being written. */
	TAILQ_FOREACH_SAFE(file, &g_caches, cache_tailq, tmp) {
		if (!file->open_for_writing) {
			rc = reclaim_cache_buffers(file);
			if (rc < 0) {
				continue;
			}
			if (!blobfs_cache_pool_need_reclaim()) {
				return SPDK_POLLER_BUSY;
			}
			break;
		}
	}

	/* Pass 3: any file, stopping at the first successful reclaim. */
	TAILQ_FOREACH_SAFE(file, &g_caches, cache_tailq, tmp) {
		rc = reclaim_cache_buffers(file);
		if (rc < 0) {
			continue;
		}
		break;
	}

	return SPDK_POLLER_BUSY;
}
2115 
/* Runs on the cache-pool thread: make the file visible to the reclaim
 * poller by adding it to the global cache list. */
static void
_add_file_to_cache_pool(void *ctx)
{
	struct spdk_file *file = ctx;

	TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq);
}
2123 
/* Runs on the cache-pool thread: hide the file from the reclaim poller by
 * removing it from the global cache list. */
static void
_remove_file_from_cache_pool(void *ctx)
{
	struct spdk_file *file = ctx;

	TAILQ_REMOVE(&g_caches, file, cache_tailq);
}
2131 
2132 static struct cache_buffer *
2133 cache_insert_buffer(struct spdk_file *file, uint64_t offset)
2134 {
2135 	struct cache_buffer *buf;
2136 	int count = 0;
2137 	bool need_update = false;
2138 
2139 	buf = calloc(1, sizeof(*buf));
2140 	if (buf == NULL) {
2141 		SPDK_DEBUGLOG(blobfs, "calloc failed\n");
2142 		return NULL;
2143 	}
2144 
2145 	do {
2146 		buf->buf = spdk_mempool_get(g_cache_pool);
2147 		if (buf->buf) {
2148 			break;
2149 		}
2150 		if (count++ == 100) {
2151 			SPDK_ERRLOG("Could not allocate cache buffer for file=%p on offset=%jx\n",
2152 				    file, offset);
2153 			free(buf);
2154 			return NULL;
2155 		}
2156 		usleep(BLOBFS_CACHE_POOL_POLL_PERIOD_IN_US);
2157 	} while (true);
2158 
2159 	buf->buf_size = CACHE_BUFFER_SIZE;
2160 	buf->offset = offset;
2161 
2162 	if (file->tree->present_mask == 0) {
2163 		need_update = true;
2164 	}
2165 	file->tree = tree_insert_buffer(file->tree, buf);
2166 
2167 	if (need_update) {
2168 		spdk_thread_send_msg(g_cache_pool_thread, _add_file_to_cache_pool, file);
2169 	}
2170 
2171 	return buf;
2172 }
2173 
2174 static struct cache_buffer *
2175 cache_append_buffer(struct spdk_file *file)
2176 {
2177 	struct cache_buffer *last;
2178 
2179 	assert(file->last == NULL || file->last->bytes_filled == file->last->buf_size);
2180 	assert((file->append_pos % CACHE_BUFFER_SIZE) == 0);
2181 
2182 	last = cache_insert_buffer(file, file->append_pos);
2183 	if (last == NULL) {
2184 		SPDK_DEBUGLOG(blobfs, "cache_insert_buffer failed\n");
2185 		return NULL;
2186 	}
2187 
2188 	file->last = last;
2189 
2190 	return last;
2191 }
2192 
2193 static void __check_sync_reqs(struct spdk_file *file);
2194 
/*
 * Completion of the metadata sync started in __check_sync_reqs(): record
 * the persisted length, dequeue and complete this sync request, then check
 * whether another queued sync can now be started.
 */
static void
__file_cache_finish_sync(void *ctx, int bserrno)
{
	struct spdk_file *file;
	struct spdk_fs_request *sync_req = ctx;
	struct spdk_fs_cb_args *sync_args;

	sync_args = &sync_req->args;
	file = sync_args->file;
	pthread_spin_lock(&file->lock);
	/* The length written in the xattr by __check_sync_reqs() is now
	 * durable; remember it. */
	file->length_xattr = sync_args->op.sync.length;
	assert(sync_args->op.sync.offset <= file->length_flushed);
	spdk_trace_record(TRACE_BLOBFS_XATTR_END, 0, sync_args->op.sync.offset,
			  0, file->name);
	BLOBFS_TRACE(file, "sync done offset=%jx\n", sync_args->op.sync.offset);
	TAILQ_REMOVE(&file->sync_requests, sync_req, args.op.sync.tailq);
	pthread_spin_unlock(&file->lock);

	/* Complete the caller outside the lock. */
	sync_args->fn.file_op(sync_args->arg, bserrno);

	free_fs_request(sync_req);
	__check_sync_reqs(file);
}
2218 
/*
 * Check whether the oldest outstanding sync request is now fully flushed
 * and, if so, start persisting the flushed length in the blob's "length"
 * xattr.  Called with the file lock NOT held.
 */
static void
__check_sync_reqs(struct spdk_file *file)
{
	struct spdk_fs_request *sync_req;

	pthread_spin_lock(&file->lock);

	/* Find the first sync request whose target offset has already been
	 * flushed; sync_req is NULL if no queued request qualifies. */
	TAILQ_FOREACH(sync_req, &file->sync_requests, args.op.sync.tailq) {
		if (sync_req->args.op.sync.offset <= file->length_flushed) {
			break;
		}
	}

	if (sync_req != NULL && !sync_req->args.op.sync.xattr_in_progress) {
		BLOBFS_TRACE(file, "set xattr length 0x%jx\n", file->length_flushed);
		/* Mark it in progress so a concurrent call does not start a
		 * second metadata sync for the same request. */
		sync_req->args.op.sync.xattr_in_progress = true;
		sync_req->args.op.sync.length = file->length_flushed;
		spdk_blob_set_xattr(file->blob, "length", &file->length_flushed,
				    sizeof(file->length_flushed));

		pthread_spin_unlock(&file->lock);
		spdk_trace_record(TRACE_BLOBFS_XATTR_START, 0, file->length_flushed,
				  0, file->name);
		spdk_blob_sync_md(file->blob, __file_cache_finish_sync, sync_req);
	} else {
		pthread_spin_unlock(&file->lock);
	}
}
2247 
/*
 * Completion callback for the spdk_blob_io_write() issued by
 * __file_flush().  Accounts the flushed bytes, services any sync
 * requests that are now satisfied and re-arms the flush for the next
 * cache buffer.
 *
 * NOTE(review): bserrno is not inspected here - a failed write is still
 * accounted as flushed; confirm this is intended.
 */
static void
__file_flush_done(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *file = args->file;
	struct cache_buffer *next = args->op.flush.cache_buffer;

	BLOBFS_TRACE(file, "length=%jx\n", args->op.flush.length);

	pthread_spin_lock(&file->lock);
	next->in_progress = false;
	next->bytes_flushed += args->op.flush.length;
	file->length_flushed += args->op.flush.length;
	if (file->length_flushed > file->length) {
		file->length = file->length_flushed;
	}
	if (next->bytes_flushed == next->buf_size) {
		/* This buffer is done - look up the buffer that now holds the
		 * flush position (may be NULL). */
		BLOBFS_TRACE(file, "write buffer fully flushed 0x%jx\n", file->length_flushed);
		next = tree_find_buffer(file->tree, file->length_flushed);
	}

	/*
	 * Assert that there is no cached data that extends past the end of the underlying
	 *  blob.
	 */
	assert(next == NULL || next->offset < __file_get_blob_size(file) ||
	       next->bytes_filled == 0);

	pthread_spin_unlock(&file->lock);

	__check_sync_reqs(file);

	/* Reuse the request for the next flush; __file_flush() frees it when
	 * there is nothing left to do. */
	__file_flush(req);
}
2283 
/*
 * Flush the next cache buffer at file->length_flushed to the blob.  Runs
 * on the fs request thread and re-arms itself from __file_flush_done()
 * until no more flushable data remains, at which point req is freed.
 */
static void
__file_flush(void *ctx)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *file = args->file;
	struct cache_buffer *next;
	uint64_t offset, length, start_lba, num_lba;
	uint32_t lba_size;

	pthread_spin_lock(&file->lock);
	next = tree_find_buffer(file->tree, file->length_flushed);
	if (next == NULL || next->in_progress ||
	    ((next->bytes_filled < next->buf_size) && TAILQ_EMPTY(&file->sync_requests))) {
		/*
		 * There is either no data to flush, a flush I/O is already in
		 *  progress, or the next buffer is partially filled but there's no
		 *  outstanding request to sync it.
		 * So return immediately - if a flush I/O is in progress we will flush
		 *  more data after that is completed, or a partial buffer will get flushed
		 *  when it is either filled or the file is synced.
		 */
		free_fs_request(req);
		if (next == NULL) {
			/*
			 * For cases where a file's cache was evicted, and then the
			 *  file was later appended, we will write the data directly
			 *  to disk and bypass cache.  So just update length_flushed
			 *  here to reflect that all data was already written to disk.
			 */
			file->length_flushed = file->append_pos;
		}
		pthread_spin_unlock(&file->lock);
		if (next == NULL) {
			/*
			 * There is no data to flush, but we still need to check for any
			 *  outstanding sync requests to make sure metadata gets updated.
			 */
			__check_sync_reqs(file);
		}
		return;
	}

	/* Flush only the not-yet-flushed tail of this buffer. */
	offset = next->offset + next->bytes_flushed;
	length = next->bytes_filled - next->bytes_flushed;
	if (length == 0) {
		free_fs_request(req);
		pthread_spin_unlock(&file->lock);
		/*
		 * There is no data to flush, but we still need to check for any
		 *  outstanding sync requests to make sure metadata gets updated.
		 */
		__check_sync_reqs(file);
		return;
	}
	args->op.flush.length = length;
	args->op.flush.cache_buffer = next;

	/* Translate the byte range into LBAs for the blob I/O. */
	__get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba);

	next->in_progress = true;
	BLOBFS_TRACE(file, "offset=0x%jx length=0x%jx page start=0x%jx num=0x%jx\n",
		     offset, length, start_lba, num_lba);
	pthread_spin_unlock(&file->lock);
	spdk_blob_io_write(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel,
			   next->buf + (start_lba * lba_size) - next->offset,
			   start_lba, num_lba, __file_flush_done, req);
}
2352 
/*
 * Final step of the blob-extend sequence: the metadata sync finished,
 * wake the thread that requested the resize.
 */
static void
__file_extend_done(void *arg, int bserrno)
{
	struct spdk_fs_cb_args *args = arg;

	__wake_caller(args, bserrno);
}
2360 
2361 static void
2362 __file_extend_resize_cb(void *_args, int bserrno)
2363 {
2364 	struct spdk_fs_cb_args *args = _args;
2365 	struct spdk_file *file = args->file;
2366 
2367 	if (bserrno) {
2368 		__wake_caller(args, bserrno);
2369 		return;
2370 	}
2371 
2372 	spdk_blob_sync_md(file->blob, __file_extend_done, args);
2373 }
2374 
/*
 * Executed on the fs request thread: resize the file's blob to the
 * requested number of clusters.  Completion flows through
 * __file_extend_resize_cb() and __file_extend_done(), which wake the
 * waiting caller.
 */
static void
__file_extend_blob(void *_args)
{
	struct spdk_fs_cb_args *args = _args;
	struct spdk_file *file = args->file;

	spdk_blob_resize(file->blob, args->op.resize.num_clusters, __file_extend_resize_cb, args);
}
2383 
/*
 * Completion callback for __rw_from_file(): wake the blocked caller,
 * then release the request.  __wake_caller must run before the free
 * since it reads req->args.
 */
static void
__rw_from_file_done(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;

	__wake_caller(&req->args, bserrno);
	free_fs_request(req);
}
2392 
2393 static void
2394 __rw_from_file(void *ctx)
2395 {
2396 	struct spdk_fs_request *req = ctx;
2397 	struct spdk_fs_cb_args *args = &req->args;
2398 	struct spdk_file *file = args->file;
2399 
2400 	if (args->op.rw.is_read) {
2401 		spdk_file_read_async(file, file->fs->sync_target.sync_io_channel, args->iovs[0].iov_base,
2402 				     args->op.rw.offset, (uint64_t)args->iovs[0].iov_len,
2403 				     __rw_from_file_done, req);
2404 	} else {
2405 		spdk_file_write_async(file, file->fs->sync_target.sync_io_channel, args->iovs[0].iov_base,
2406 				      args->op.rw.offset, (uint64_t)args->iovs[0].iov_len,
2407 				      __rw_from_file_done, req);
2408 	}
2409 }
2410 
/*
 * Context shared between a synchronous read/write caller and the async
 * sub-requests it spawns via __send_rw_from_file().
 */
struct rw_from_file_arg {
	struct spdk_fs_channel *channel;	/* channel whose semaphore the caller waits on */
	int rwerrno;				/* error reported by an async sub-request, 0 if none */
};
2415 
2416 static int
2417 __send_rw_from_file(struct spdk_file *file, void *payload,
2418 		    uint64_t offset, uint64_t length, bool is_read,
2419 		    struct rw_from_file_arg *arg)
2420 {
2421 	struct spdk_fs_request *req;
2422 	struct spdk_fs_cb_args *args;
2423 
2424 	req = alloc_fs_request_with_iov(arg->channel, 1);
2425 	if (req == NULL) {
2426 		sem_post(&arg->channel->sem);
2427 		return -ENOMEM;
2428 	}
2429 
2430 	args = &req->args;
2431 	args->file = file;
2432 	args->sem = &arg->channel->sem;
2433 	args->iovs[0].iov_base = payload;
2434 	args->iovs[0].iov_len = (size_t)length;
2435 	args->op.rw.offset = offset;
2436 	args->op.rw.is_read = is_read;
2437 	args->rwerrno = &arg->rwerrno;
2438 	file->fs->send_request(__rw_from_file, req);
2439 	return 0;
2440 }
2441 
2442 int
2443 spdk_file_write(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx,
2444 		void *payload, uint64_t offset, uint64_t length)
2445 {
2446 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
2447 	struct spdk_fs_request *flush_req;
2448 	uint64_t rem_length, copy, blob_size, cluster_sz;
2449 	uint32_t cache_buffers_filled = 0;
2450 	uint8_t *cur_payload;
2451 	struct cache_buffer *last;
2452 
2453 	BLOBFS_TRACE_RW(file, "offset=%jx length=%jx\n", offset, length);
2454 
2455 	if (length == 0) {
2456 		return 0;
2457 	}
2458 
2459 	if (offset != file->append_pos) {
2460 		BLOBFS_TRACE(file, " error offset=%jx append_pos=%jx\n", offset, file->append_pos);
2461 		return -EINVAL;
2462 	}
2463 
2464 	pthread_spin_lock(&file->lock);
2465 	file->open_for_writing = true;
2466 
2467 	if ((file->last == NULL) && (file->append_pos % CACHE_BUFFER_SIZE == 0)) {
2468 		cache_append_buffer(file);
2469 	}
2470 
2471 	if (file->last == NULL) {
2472 		struct rw_from_file_arg arg = {};
2473 		int rc;
2474 
2475 		arg.channel = channel;
2476 		arg.rwerrno = 0;
2477 		file->append_pos += length;
2478 		pthread_spin_unlock(&file->lock);
2479 		rc = __send_rw_from_file(file, payload, offset, length, false, &arg);
2480 		if (rc != 0) {
2481 			return rc;
2482 		}
2483 		sem_wait(&channel->sem);
2484 		return arg.rwerrno;
2485 	}
2486 
2487 	blob_size = __file_get_blob_size(file);
2488 
2489 	if ((offset + length) > blob_size) {
2490 		struct spdk_fs_cb_args extend_args = {};
2491 
2492 		cluster_sz = file->fs->bs_opts.cluster_sz;
2493 		extend_args.sem = &channel->sem;
2494 		extend_args.op.resize.num_clusters = __bytes_to_clusters((offset + length), cluster_sz);
2495 		extend_args.file = file;
2496 		BLOBFS_TRACE(file, "start resize to %u clusters\n", extend_args.op.resize.num_clusters);
2497 		pthread_spin_unlock(&file->lock);
2498 		file->fs->send_request(__file_extend_blob, &extend_args);
2499 		sem_wait(&channel->sem);
2500 		if (extend_args.rc) {
2501 			return extend_args.rc;
2502 		}
2503 	}
2504 
2505 	flush_req = alloc_fs_request(channel);
2506 	if (flush_req == NULL) {
2507 		pthread_spin_unlock(&file->lock);
2508 		return -ENOMEM;
2509 	}
2510 
2511 	last = file->last;
2512 	rem_length = length;
2513 	cur_payload = payload;
2514 	while (rem_length > 0) {
2515 		copy = last->buf_size - last->bytes_filled;
2516 		if (copy > rem_length) {
2517 			copy = rem_length;
2518 		}
2519 		BLOBFS_TRACE_RW(file, "  fill offset=%jx length=%jx\n", file->append_pos, copy);
2520 		memcpy(&last->buf[last->bytes_filled], cur_payload, copy);
2521 		file->append_pos += copy;
2522 		if (file->length < file->append_pos) {
2523 			file->length = file->append_pos;
2524 		}
2525 		cur_payload += copy;
2526 		last->bytes_filled += copy;
2527 		rem_length -= copy;
2528 		if (last->bytes_filled == last->buf_size) {
2529 			cache_buffers_filled++;
2530 			last = cache_append_buffer(file);
2531 			if (last == NULL) {
2532 				BLOBFS_TRACE(file, "nomem\n");
2533 				free_fs_request(flush_req);
2534 				pthread_spin_unlock(&file->lock);
2535 				return -ENOMEM;
2536 			}
2537 		}
2538 	}
2539 
2540 	pthread_spin_unlock(&file->lock);
2541 
2542 	if (cache_buffers_filled == 0) {
2543 		free_fs_request(flush_req);
2544 		return 0;
2545 	}
2546 
2547 	flush_req->args.file = file;
2548 	file->fs->send_request(__file_flush, flush_req);
2549 	return 0;
2550 }
2551 
/*
 * Completion callback for the readahead blob read: mark the cache buffer
 * as populated and no longer in progress.
 */
static void
__readahead_done(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct cache_buffer *cache_buffer = args->op.readahead.cache_buffer;
	struct spdk_file *file = args->file;

	BLOBFS_TRACE(file, "offset=%jx\n", cache_buffer->offset);

	pthread_spin_lock(&file->lock);
	/* NOTE(review): bserrno is not checked - a failed read still marks
	 * the buffer filled/flushed; confirm this is intended. */
	cache_buffer->bytes_filled = args->op.readahead.length;
	cache_buffer->bytes_flushed = args->op.readahead.length;
	cache_buffer->in_progress = false;
	pthread_spin_unlock(&file->lock);

	free_fs_request(req);
}
2570 
/*
 * Executed on the fs request thread: read one cache buffer's worth of
 * data from the blob into the readahead buffer prepared by
 * check_readahead().
 */
static void
__readahead(void *ctx)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *file = args->file;
	uint64_t offset, length, start_lba, num_lba;
	uint32_t lba_size;

	offset = args->op.readahead.offset;
	length = args->op.readahead.length;
	assert(length > 0);

	/* Translate the byte range into LBAs for the blob I/O. */
	__get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba);

	BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n",
		     offset, length, start_lba, num_lba);
	spdk_blob_io_read(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel,
			  args->op.readahead.cache_buffer->buf,
			  start_lba, num_lba, __readahead_done, req);
}
2592 
2593 static uint64_t
2594 __next_cache_buffer_offset(uint64_t offset)
2595 {
2596 	return (offset + CACHE_BUFFER_SIZE) & ~(CACHE_TREE_LEVEL_MASK(0));
2597 }
2598 
/*
 * If the cache buffer after `offset` is not yet resident and lies within
 * the file, allocate it and start an async readahead into it.  All
 * failure paths return silently - readahead is purely an optimization.
 */
static void
check_readahead(struct spdk_file *file, uint64_t offset,
		struct spdk_fs_channel *channel)
{
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	offset = __next_cache_buffer_offset(offset);
	if (tree_find_buffer(file->tree, offset) != NULL || file->length <= offset) {
		return;
	}

	req = alloc_fs_request(channel);
	if (req == NULL) {
		return;
	}
	args = &req->args;

	BLOBFS_TRACE(file, "offset=%jx\n", offset);

	args->file = file;
	args->op.readahead.offset = offset;
	args->op.readahead.cache_buffer = cache_insert_buffer(file, offset);
	if (!args->op.readahead.cache_buffer) {
		BLOBFS_TRACE(file, "Cannot allocate buf for offset=%jx\n", offset);
		free_fs_request(req);
		return;
	}

	args->op.readahead.cache_buffer->in_progress = true;
	/* The last buffer of the file may only be partially valid. */
	if (file->length < (offset + CACHE_BUFFER_SIZE)) {
		args->op.readahead.length = file->length & (CACHE_BUFFER_SIZE - 1);
	} else {
		args->op.readahead.length = CACHE_BUFFER_SIZE;
	}
	file->fs->send_request(__readahead, req);
}
2636 
/*
 * Synchronous read.  Cached data is copied directly; uncached ranges are
 * serviced by async sub-reads that the caller waits on.  Sequential
 * access triggers readahead of the next two cache buffers.
 *
 * Returns the number of bytes read (possibly clamped at end of file), or
 * a negative errno on failure.
 */
int64_t
spdk_file_read(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx,
	       void *payload, uint64_t offset, uint64_t length)
{
	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
	uint64_t final_offset, final_length;
	uint32_t sub_reads = 0;
	struct cache_buffer *buf;
	uint64_t read_len;
	struct rw_from_file_arg arg = {};

	pthread_spin_lock(&file->lock);

	BLOBFS_TRACE_RW(file, "offset=%ju length=%ju\n", offset, length);

	file->open_for_writing = false;

	if (length == 0 || offset >= file->append_pos) {
		pthread_spin_unlock(&file->lock);
		return 0;
	}

	/* Clamp the read to the current end of file. */
	if (offset + length > file->append_pos) {
		length = file->append_pos - offset;
	}

	/* Track sequential access; enough consecutive bytes trigger
	 * readahead of the next two cache buffers. */
	if (offset != file->next_seq_offset) {
		file->seq_byte_count = 0;
	}
	file->seq_byte_count += length;
	file->next_seq_offset = offset + length;
	if (file->seq_byte_count >= CACHE_READAHEAD_THRESHOLD) {
		check_readahead(file, offset, channel);
		check_readahead(file, offset + CACHE_BUFFER_SIZE, channel);
	}

	arg.channel = channel;
	arg.rwerrno = 0;
	final_length = 0;
	final_offset = offset + length;
	/* Process at most one cache buffer's worth per iteration. */
	while (offset < final_offset) {
		int ret = 0;
		length = NEXT_CACHE_BUFFER_OFFSET(offset) - offset;
		if (length > (final_offset - offset)) {
			length = final_offset - offset;
		}

		buf = tree_find_filled_buffer(file->tree, offset);
		if (buf == NULL) {
			/* Not cached - issue an async read from the blob.  The
			 * lock is dropped around the submission since the
			 * request path may block. */
			pthread_spin_unlock(&file->lock);
			ret = __send_rw_from_file(file, payload, offset, length, true, &arg);
			pthread_spin_lock(&file->lock);
			if (ret == 0) {
				sub_reads++;
			}
		} else {
			read_len = length;
			if ((offset + length) > (buf->offset + buf->bytes_filled)) {
				read_len = buf->offset + buf->bytes_filled - offset;
			}
			BLOBFS_TRACE(file, "read %p offset=%ju length=%ju\n", payload, offset, read_len);
			memcpy(payload, &buf->buf[offset - buf->offset], read_len);
			/* Drop a cache buffer once it has been read through to
			 * its end; remove the file from the cache pool when no
			 * buffers remain. */
			if ((offset + read_len) % CACHE_BUFFER_SIZE == 0) {
				tree_remove_buffer(file->tree, buf);
				if (file->tree->present_mask == 0) {
					spdk_thread_send_msg(g_cache_pool_thread, _remove_file_from_cache_pool, file);
				}
			}
		}

		if (ret == 0) {
			final_length += length;
		} else {
			arg.rwerrno = ret;
			break;
		}
		payload += length;
		offset += length;
	}
	pthread_spin_unlock(&file->lock);
	/* Wait for every async sub-read issued above to complete. */
	while (sub_reads > 0) {
		sem_wait(&channel->sem);
		sub_reads--;
	}
	if (arg.rwerrno == 0) {
		return final_length;
	} else {
		return arg.rwerrno;
	}
}
2727 
/*
 * Common implementation for spdk_file_sync()/spdk_file_sync_async().
 * Queues a sync request recording the current append position and kicks
 * off a flush; cb_fn fires once the flushed length has been persisted in
 * the blob's "length" xattr (see __check_sync_reqs()).
 */
static void
_file_sync(struct spdk_file *file, struct spdk_fs_channel *channel,
	   spdk_file_op_complete cb_fn, void *cb_arg)
{
	struct spdk_fs_request *sync_req;
	struct spdk_fs_request *flush_req;
	struct spdk_fs_cb_args *sync_args;
	struct spdk_fs_cb_args *flush_args;

	BLOBFS_TRACE(file, "offset=%jx\n", file->append_pos);

	pthread_spin_lock(&file->lock);
	/* Nothing to do if everything up to append_pos is already recorded
	 * in the length xattr. */
	if (file->append_pos <= file->length_xattr) {
		BLOBFS_TRACE(file, "done - file already synced\n");
		pthread_spin_unlock(&file->lock);
		cb_fn(cb_arg, 0);
		return;
	}

	sync_req = alloc_fs_request(channel);
	if (!sync_req) {
		SPDK_ERRLOG("Cannot allocate sync req for file=%s\n", file->name);
		pthread_spin_unlock(&file->lock);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}
	sync_args = &sync_req->args;

	flush_req = alloc_fs_request(channel);
	if (!flush_req) {
		SPDK_ERRLOG("Cannot allocate flush req for file=%s\n", file->name);
		free_fs_request(sync_req);
		pthread_spin_unlock(&file->lock);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}
	flush_args = &flush_req->args;

	sync_args->file = file;
	sync_args->fn.file_op = cb_fn;
	sync_args->arg = cb_arg;
	sync_args->op.sync.offset = file->append_pos;
	sync_args->op.sync.xattr_in_progress = false;
	TAILQ_INSERT_TAIL(&file->sync_requests, sync_req, args.op.sync.tailq);
	pthread_spin_unlock(&file->lock);

	flush_args->file = file;
	/* The flush path completes the queued sync request via
	 * __check_sync_reqs() once enough data has been flushed. */
	channel->send_request(__file_flush, flush_req);
}
2777 
2778 int
2779 spdk_file_sync(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx)
2780 {
2781 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
2782 	struct spdk_fs_cb_args args = {};
2783 
2784 	args.sem = &channel->sem;
2785 	_file_sync(file, channel, __wake_caller, &args);
2786 	sem_wait(&channel->sem);
2787 
2788 	return args.rc;
2789 }
2790 
/*
 * Async variant of spdk_file_sync(): flush cached data and persist the
 * file length, invoking cb_fn(cb_arg, rc) when complete.
 */
void
spdk_file_sync_async(struct spdk_file *file, struct spdk_io_channel *_channel,
		     spdk_file_op_complete cb_fn, void *cb_arg)
{
	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);

	_file_sync(file, channel, cb_fn, cb_arg);
}
2799 
2800 void
2801 spdk_file_set_priority(struct spdk_file *file, uint32_t priority)
2802 {
2803 	BLOBFS_TRACE(file, "priority=%u\n", priority);
2804 	file->priority = priority;
2805 
2806 }
2807 
2808 /*
2809  * Close routines
2810  */
2811 
/*
 * Completion of spdk_blob_close() (or an early-failure path) during an
 * async close.  If the file was marked deleted while still open, its
 * blob is deleted now and blob_delete_cb completes the request instead.
 */
static void
__file_close_async_done(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *file = args->file;

	spdk_trace_record(TRACE_BLOBFS_CLOSE, 0, 0, 0, file->name);

	if (file->is_deleted) {
		spdk_fs_delete_file_async(file->fs, file->name, blob_delete_cb, ctx);
		return;
	}

	args->fn.file_op(args->arg, bserrno);
	free_fs_request(req);
}
2829 
/*
 * Drop a reference on the file.  The blob is closed only when the last
 * reference goes away; otherwise the request completes immediately with
 * success.  A close with no outstanding reference fails with -EBADF.
 */
static void
__file_close_async(struct spdk_file *file, struct spdk_fs_request *req)
{
	struct spdk_blob *blob;

	pthread_spin_lock(&file->lock);
	if (file->ref_count == 0) {
		/* Close without a matching open. */
		pthread_spin_unlock(&file->lock);
		__file_close_async_done(req, -EBADF);
		return;
	}

	file->ref_count--;
	if (file->ref_count > 0) {
		/* Other openers remain - complete without closing the blob. */
		pthread_spin_unlock(&file->lock);
		req->args.fn.file_op(req->args.arg, 0);
		free_fs_request(req);
		return;
	}

	pthread_spin_unlock(&file->lock);

	/* Last reference - detach and close the underlying blob. */
	blob = file->blob;
	file->blob = NULL;
	spdk_blob_close(blob, __file_close_async_done, req);
}
2856 
/*
 * Sync-before-close step finished; proceed with the actual close.  The
 * sync result (fserrno) is not inspected - the close proceeds regardless.
 */
static void
__file_close_async__sync_done(void *arg, int fserrno)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	__file_close_async(args->file, req);
}
2865 
2866 void
2867 spdk_file_close_async(struct spdk_file *file, spdk_file_op_complete cb_fn, void *cb_arg)
2868 {
2869 	struct spdk_fs_request *req;
2870 	struct spdk_fs_cb_args *args;
2871 
2872 	req = alloc_fs_request(file->fs->md_target.md_fs_channel);
2873 	if (req == NULL) {
2874 		SPDK_ERRLOG("Cannot allocate close async req for file=%s\n", file->name);
2875 		cb_fn(cb_arg, -ENOMEM);
2876 		return;
2877 	}
2878 
2879 	args = &req->args;
2880 	args->file = file;
2881 	args->fn.file_op = cb_fn;
2882 	args->arg = cb_arg;
2883 
2884 	spdk_file_sync_async(file, file->fs->md_target.md_io_channel, __file_close_async__sync_done, req);
2885 }
2886 
/*
 * Executed on the fs channel thread for spdk_file_close(): run the async
 * close using the caller's prepared request.
 */
static void
__file_close(void *arg)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *file = args->file;

	__file_close_async(file, req);
}
2896 
/*
 * Synchronous close: sync the file first, then drop a reference on the
 * fs channel thread and wait for the close to finish.
 *
 * Returns 0 on success, negative errno on failure.
 */
int
spdk_file_close(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx)
{
	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	req = alloc_fs_request(channel);
	if (req == NULL) {
		SPDK_ERRLOG("Cannot allocate close req for file=%s\n", file->name);
		return -ENOMEM;
	}

	args = &req->args;

	/* Flush and persist the length before tearing the file down. */
	spdk_file_sync(file, ctx);
	BLOBFS_TRACE(file, "name=%s\n", file->name);
	args->file = file;
	args->sem = &channel->sem;
	args->fn.file_op = __wake_caller;
	args->arg = args;
	channel->send_request(__file_close, req);
	sem_wait(&channel->sem);

	/* NOTE(review): args points into req, which the close path frees
	 * right after waking us - confirm the request cannot be recycled
	 * before args->rc is read here. */
	return args->rc;
}
2923 
2924 int
2925 spdk_file_get_id(struct spdk_file *file, void *id, size_t size)
2926 {
2927 	if (size < sizeof(spdk_blob_id)) {
2928 		return -EINVAL;
2929 	}
2930 
2931 	memcpy(id, &file->blobid, sizeof(spdk_blob_id));
2932 
2933 	return sizeof(spdk_blob_id);
2934 }
2935 
/*
 * Runs on the cache pool thread: unlink the file from the global cache
 * list and release its memory.  Scheduled by file_free() when the file
 * still owned cache buffers.
 */
static void
_file_free(void *ctx)
{
	struct spdk_file *file = ctx;

	TAILQ_REMOVE(&g_caches, file, cache_tailq);

	free(file->name);
	free(file->tree);
	free(file);
}
2947 
/*
 * Release a file object.  If no cache buffers are present, the memory is
 * freed inline; otherwise the buffers are freed here and the final free
 * is deferred to the cache pool thread (via _file_free) so that the file
 * can also be removed from the g_caches list there.
 */
static void
file_free(struct spdk_file *file)
{
	BLOBFS_TRACE(file, "free=%s\n", file->name);
	pthread_spin_lock(&file->lock);
	if (file->tree->present_mask == 0) {
		/* No cache buffers - the file never entered the cache pool
		 * list, so it can be freed directly. */
		pthread_spin_unlock(&file->lock);
		free(file->name);
		free(file->tree);
		free(file);
		return;
	}

	tree_free_buffers(file->tree);
	assert(file->tree->present_mask == 0);
	spdk_thread_send_msg(g_cache_pool_thread, _file_free, file);
	pthread_spin_unlock(&file->lock);
}
2966 
/* Register the "blobfs" and "blobfs_rw" log flags used by the
 * SPDK_DEBUGLOG/BLOBFS_TRACE macros in this file. */
SPDK_LOG_REGISTER_COMPONENT(blobfs)
SPDK_LOG_REGISTER_COMPONENT(blobfs_rw)
2969