xref: /spdk/lib/blobfs/blobfs.c (revision 8afdeef3becfe9409cc9e7372bd0bc10e8b7d46d)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2017 Intel Corporation.
3  *   All rights reserved.
4  */
5 
6 #include "spdk/stdinc.h"
7 
8 #include "spdk/blobfs.h"
9 #include "cache_tree.h"
10 
11 #include "spdk/queue.h"
12 #include "spdk/thread.h"
13 #include "spdk/assert.h"
14 #include "spdk/env.h"
15 #include "spdk/util.h"
16 #include "spdk/log.h"
17 #include "spdk/trace.h"
18 
19 #include "spdk_internal/trace_defs.h"
20 
21 #define BLOBFS_TRACE(file, str, args...) \
22 	SPDK_DEBUGLOG(blobfs, "file=%s " str, file->name, ##args)
23 
24 #define BLOBFS_TRACE_RW(file, str, args...) \
25 	SPDK_DEBUGLOG(blobfs_rw, "file=%s " str, file->name, ##args)
26 
27 #define BLOBFS_DEFAULT_CACHE_SIZE (4ULL * 1024 * 1024 * 1024)
28 #define SPDK_BLOBFS_DEFAULT_OPTS_CLUSTER_SZ (1024 * 1024)
29 
30 #define SPDK_BLOBFS_SIGNATURE	"BLOBFS"
31 
32 static uint64_t g_fs_cache_size = BLOBFS_DEFAULT_CACHE_SIZE;
33 static struct spdk_mempool *g_cache_pool;
34 static TAILQ_HEAD(, spdk_file) g_caches = TAILQ_HEAD_INITIALIZER(g_caches);
35 static struct spdk_poller *g_cache_pool_mgmt_poller;
36 static struct spdk_thread *g_cache_pool_thread;
37 #define BLOBFS_CACHE_POOL_POLL_PERIOD_IN_US 1000ULL
38 static int g_fs_count = 0;
39 static pthread_mutex_t g_cache_init_lock = PTHREAD_MUTEX_INITIALIZER;
40 
/* Register blobfs tracepoints with the SPDK trace framework.  Each tracepoint
 * carries the file name (truncated to 40 bytes) as its only argument.
 */
SPDK_TRACE_REGISTER_FN(blobfs_trace, "blobfs", TRACE_GROUP_BLOBFS)
{
	struct spdk_trace_tpoint_opts opts[] = {
		{
			"BLOBFS_XATTR_START", TRACE_BLOBFS_XATTR_START,
			OWNER_TYPE_NONE, OBJECT_NONE, 0,
			{{ "file", SPDK_TRACE_ARG_TYPE_STR, 40 }},
		},
		{
			"BLOBFS_XATTR_END", TRACE_BLOBFS_XATTR_END,
			OWNER_TYPE_NONE, OBJECT_NONE, 0,
			{{ "file", SPDK_TRACE_ARG_TYPE_STR, 40 }},
		},
		{
			"BLOBFS_OPEN", TRACE_BLOBFS_OPEN,
			OWNER_TYPE_NONE, OBJECT_NONE, 0,
			{{ "file", SPDK_TRACE_ARG_TYPE_STR, 40 }},
		},
		{
			"BLOBFS_CLOSE", TRACE_BLOBFS_CLOSE,
			OWNER_TYPE_NONE, OBJECT_NONE, 0,
			{{ "file", SPDK_TRACE_ARG_TYPE_STR, 40 }},
		},
		{
			"BLOBFS_DELETE_START", TRACE_BLOBFS_DELETE_START,
			OWNER_TYPE_NONE, OBJECT_NONE, 0,
			{{ "file", SPDK_TRACE_ARG_TYPE_STR, 40 }},
		},
		{
			"BLOBFS_DELETE_DONE", TRACE_BLOBFS_DELETE_DONE,
			OWNER_TYPE_NONE, OBJECT_NONE, 0,
			{{ "file", SPDK_TRACE_ARG_TYPE_STR, 40 }},
		}
	};

	spdk_trace_register_description_ext(opts, SPDK_COUNTOF(opts));
}
78 
79 void
80 cache_buffer_free(struct cache_buffer *cache_buffer)
81 {
82 	spdk_mempool_put(g_cache_pool, cache_buffer->buf);
83 	free(cache_buffer);
84 }
85 
86 #define CACHE_READAHEAD_THRESHOLD	(128 * 1024)
87 
/*
 * In-memory state for one blobfs file.  Each file is backed by exactly one
 * blob in the underlying blobstore.
 */
struct spdk_file {
	struct spdk_filesystem	*fs;
	struct spdk_blob	*blob;		/* open blob handle; NULL while no opens are outstanding */
	char			*name;		/* heap-allocated file name */
	uint64_t		length;		/* current logical file size in bytes */
	bool                    is_deleted;
	bool			open_for_writing;
	uint64_t		length_flushed;	/* bytes persisted to the blobstore so far */
	uint64_t		length_xattr;	/* length value last written to the "length" xattr */
	uint64_t		append_pos;	/* offset at which the next append lands */
	uint64_t		seq_byte_count;	/* NOTE(review): appears to feed the readahead heuristic — confirm */
	uint64_t		next_seq_offset;
	uint32_t		priority;	/* SPDK_FILE_PRIORITY_* cache-retention hint */
	TAILQ_ENTRY(spdk_file)	tailq;		/* entry on spdk_filesystem.files */
	spdk_blob_id		blobid;
	uint32_t		ref_count;	/* number of outstanding opens */
	pthread_spinlock_t	lock;
	struct cache_buffer	*last;		/* most recently touched cache buffer */
	struct cache_tree	*tree;		/* tree of cached data buffers */
	TAILQ_HEAD(open_requests_head, spdk_fs_request) open_requests;	/* opens queued while the blob is opening */
	TAILQ_HEAD(sync_requests_head, spdk_fs_request) sync_requests;
	TAILQ_ENTRY(spdk_file)	cache_tailq;	/* entry on the global g_caches list */
};
111 
/* Blob id queued for deletion during filesystem load (see iter_cb /
 * _handle_deleted_files): the file was marked deleted but the app stopped
 * before the blob was actually removed.
 */
struct spdk_deleted_file {
	spdk_blob_id	id;
	TAILQ_ENTRY(spdk_deleted_file)	tailq;	/* entry on op.fs_load.deleted_files */
};
116 
/*
 * A mounted blobfs instance.  Wraps one blobstore and the three io_devices
 * (metadata, sync, io) registered for it in fs_alloc().
 */
struct spdk_filesystem {
	struct spdk_blob_store	*bs;
	TAILQ_HEAD(, spdk_file)	files;		/* all files known to this filesystem */
	struct spdk_bs_opts	bs_opts;
	struct spdk_bs_dev	*bdev;
	fs_send_request_fn	send_request;	/* delivers requests to the fs thread */

	/* io_device for synchronous (blocking) API calls */
	struct {
		uint32_t		max_ops;
		struct spdk_io_channel	*sync_io_channel;
		struct spdk_fs_channel	*sync_fs_channel;
	} sync_target;

	/* io_device for metadata operations (create/open/delete/load) */
	struct {
		uint32_t		max_ops;
		struct spdk_io_channel	*md_io_channel;
		struct spdk_fs_channel	*md_fs_channel;
	} md_target;

	/* io_device for data-path I/O channels */
	struct {
		uint32_t		max_ops;
	} io_target;
};
140 
/*
 * Per-request context embedded in every spdk_fs_request.  The `fn` union
 * holds the user completion callback; the `op` union holds state for the
 * specific operation in flight.
 */
struct spdk_fs_cb_args {
	union {
		spdk_fs_op_with_handle_complete		fs_op_with_handle;
		spdk_fs_op_complete			fs_op;
		spdk_file_op_with_handle_complete	file_op_with_handle;
		spdk_file_op_complete			file_op;
		spdk_file_stat_op_complete		stat_op;
	} fn;
	void *arg;		/* user callback argument */
	sem_t *sem;		/* posted by __wake_caller for the blocking API wrappers */
	struct spdk_filesystem *fs;
	struct spdk_file *file;
	int rc;			/* result handed back to the blocking caller */
	int *rwerrno;		/* sticky first-error slot shared across async writes */
	struct iovec *iovs;	/* points at `iov` below, or a separate allocation when iovcnt > 1 */
	uint32_t iovcnt;
	struct iovec iov;	/* embedded single-element iovec (common case) */
	union {
		struct {
			TAILQ_HEAD(, spdk_deleted_file)	deleted_files;
		} fs_load;
		struct {
			uint64_t	length;
		} truncate;
		struct {
			struct spdk_io_channel	*channel;
			void		*pin_buf;
			int		is_read;
			off_t		offset;
			size_t		length;
			uint64_t	start_lba;
			uint64_t	num_lba;
			uint32_t	blocklen;
		} rw;
		struct {
			const char	*old_name;
			const char	*new_name;
		} rename;
		struct {
			struct cache_buffer	*cache_buffer;
			uint64_t		length;
		} flush;
		struct {
			struct cache_buffer	*cache_buffer;
			uint64_t		length;
			uint64_t		offset;
		} readahead;
		struct {
			/* offset of the file when the sync request was made */
			uint64_t			offset;
			TAILQ_ENTRY(spdk_fs_request)	tailq;
			bool				xattr_in_progress;
			/* length written to the xattr for this file - this should
			 * always be the same as the offset if only one thread is
			 * writing to the file, but could differ if multiple threads
			 * are appending
			 */
			uint64_t			length;
		} sync;
		struct {
			uint32_t			num_clusters;
		} resize;
		struct {
			const char	*name;
			uint32_t	flags;
			TAILQ_ENTRY(spdk_fs_request)	tailq;
		} open;
		struct {
			const char		*name;
			struct spdk_blob	*blob;
		} create;
		struct {
			const char	*name;
		} delete;
		struct {
			const char	*name;
		} stat;
	} op;
};
220 
221 static void file_free(struct spdk_file *file);
222 static void fs_io_device_unregister(struct spdk_filesystem *fs);
223 static void fs_free_io_channels(struct spdk_filesystem *fs);
224 
/* Initialize blobfs options to their defaults (1 MiB cluster size). */
void
spdk_fs_opts_init(struct spdk_blobfs_opts *opts)
{
	opts->cluster_sz = SPDK_BLOBFS_DEFAULT_OPTS_CLUSTER_SZ;
}
230 
231 static int _blobfs_cache_pool_reclaim(void *arg);
232 
233 static bool
234 blobfs_cache_pool_need_reclaim(void)
235 {
236 	size_t count;
237 
238 	count = spdk_mempool_count(g_cache_pool);
239 	/* We define a aggressive policy here as the requirements from db_bench are batched, so start the poller
240 	 *  when the number of available cache buffer is less than 1/5 of total buffers.
241 	 */
242 	if (count > (size_t)g_fs_cache_size / CACHE_BUFFER_SIZE / 5) {
243 		return false;
244 	}
245 
246 	return true;
247 }
248 
/* Runs on g_cache_pool_thread: start the periodic reclaim poller.  Sent as a
 * thread message from initialize_global_cache().
 */
static void
__start_cache_pool_mgmt(void *ctx)
{
	assert(g_cache_pool_mgmt_poller == NULL);
	g_cache_pool_mgmt_poller = SPDK_POLLER_REGISTER(_blobfs_cache_pool_reclaim, NULL,
				   BLOBFS_CACHE_POOL_POLL_PERIOD_IN_US);
}
256 
/* Runs on g_cache_pool_thread: tear down the reclaim poller, the buffer pool
 * and the management thread itself.  Sent from free_global_cache() when the
 * last filesystem goes away.
 */
static void
__stop_cache_pool_mgmt(void *ctx)
{
	spdk_poller_unregister(&g_cache_pool_mgmt_poller);

	assert(g_cache_pool != NULL);
	/* Every buffer must have been returned before the pool is destroyed. */
	assert(spdk_mempool_count(g_cache_pool) == g_fs_cache_size / CACHE_BUFFER_SIZE);
	spdk_mempool_free(g_cache_pool);
	g_cache_pool = NULL;

	spdk_thread_exit(g_cache_pool_thread);
}
269 
270 static void
271 allocate_cache_pool(void)
272 {
273 	assert(g_cache_pool == NULL);
274 	g_cache_pool = spdk_mempool_create("spdk_fs_cache",
275 					   g_fs_cache_size / CACHE_BUFFER_SIZE,
276 					   CACHE_BUFFER_SIZE,
277 					   SPDK_MEMPOOL_DEFAULT_CACHE_SIZE,
278 					   SPDK_ENV_SOCKET_ID_ANY);
279 	if (!g_cache_pool) {
280 		if (spdk_mempool_lookup("spdk_fs_cache") != NULL) {
281 			SPDK_ERRLOG("Unable to allocate mempool: already exists\n");
282 			SPDK_ERRLOG("Probably running in multiprocess environment, which is "
283 				    "unsupported by the blobfs library\n");
284 		} else {
285 			SPDK_ERRLOG("Create mempool failed, you may "
286 				    "increase the memory and try again\n");
287 		}
288 		assert(false);
289 	}
290 }
291 
292 static void
293 initialize_global_cache(void)
294 {
295 	pthread_mutex_lock(&g_cache_init_lock);
296 	if (g_fs_count == 0) {
297 		allocate_cache_pool();
298 		g_cache_pool_thread = spdk_thread_create("cache_pool_mgmt", NULL);
299 		assert(g_cache_pool_thread != NULL);
300 		spdk_thread_send_msg(g_cache_pool_thread, __start_cache_pool_mgmt, NULL);
301 	}
302 	g_fs_count++;
303 	pthread_mutex_unlock(&g_cache_init_lock);
304 }
305 
306 static void
307 free_global_cache(void)
308 {
309 	pthread_mutex_lock(&g_cache_init_lock);
310 	g_fs_count--;
311 	if (g_fs_count == 0) {
312 		spdk_thread_send_msg(g_cache_pool_thread, __stop_cache_pool_mgmt, NULL);
313 	}
314 	pthread_mutex_unlock(&g_cache_init_lock);
315 }
316 
317 static uint64_t
318 __file_get_blob_size(struct spdk_file *file)
319 {
320 	uint64_t cluster_sz;
321 
322 	cluster_sz = file->fs->bs_opts.cluster_sz;
323 	return cluster_sz * spdk_blob_get_num_clusters(file->blob);
324 }
325 
/* One preallocated request slot, owned by the channel it came from.
 * NOTE: `args` must remain the first member — several callbacks in this file
 * rely on &req->args and req being interchangeable pointers.
 */
struct spdk_fs_request {
	struct spdk_fs_cb_args		args;
	TAILQ_ENTRY(spdk_fs_request)	link;	/* entry on the channel's free list */
	struct spdk_fs_channel		*channel;
};
331 
332 struct spdk_fs_channel {
333 	struct spdk_fs_request		*req_mem;
334 	TAILQ_HEAD(, spdk_fs_request)	reqs;
335 	sem_t				sem;
336 	struct spdk_filesystem		*fs;
337 	struct spdk_io_channel		*bs_channel;
338 	fs_send_request_fn		send_request;
339 	bool				sync;
340 	uint32_t			outstanding_reqs;
341 	pthread_spinlock_t		lock;
342 };
343 
/* For now, this is effectively an alias. But eventually we'll shift
 * some data members over. */
struct spdk_fs_thread_ctx {
	struct spdk_fs_channel	ch;	/* sync-API callers cast spdk_fs_thread_ctx * to spdk_fs_channel * */
};
349 
350 static struct spdk_fs_request *
351 alloc_fs_request_with_iov(struct spdk_fs_channel *channel, uint32_t iovcnt)
352 {
353 	struct spdk_fs_request *req;
354 	struct iovec *iovs = NULL;
355 
356 	if (iovcnt > 1) {
357 		iovs = calloc(iovcnt, sizeof(struct iovec));
358 		if (!iovs) {
359 			return NULL;
360 		}
361 	}
362 
363 	if (channel->sync) {
364 		pthread_spin_lock(&channel->lock);
365 	}
366 
367 	req = TAILQ_FIRST(&channel->reqs);
368 	if (req) {
369 		channel->outstanding_reqs++;
370 		TAILQ_REMOVE(&channel->reqs, req, link);
371 	}
372 
373 	if (channel->sync) {
374 		pthread_spin_unlock(&channel->lock);
375 	}
376 
377 	if (req == NULL) {
378 		SPDK_ERRLOG("Cannot allocate req on spdk_fs_channel =%p\n", channel);
379 		free(iovs);
380 		return NULL;
381 	}
382 	memset(req, 0, sizeof(*req));
383 	req->channel = channel;
384 	if (iovcnt > 1) {
385 		req->args.iovs = iovs;
386 	} else {
387 		req->args.iovs = &req->args.iov;
388 	}
389 	req->args.iovcnt = iovcnt;
390 
391 	return req;
392 }
393 
/* Allocate a request that only needs the embedded single iovec. */
static struct spdk_fs_request *
alloc_fs_request(struct spdk_fs_channel *channel)
{
	return alloc_fs_request_with_iov(channel, 0);
}
399 
400 static void
401 free_fs_request(struct spdk_fs_request *req)
402 {
403 	struct spdk_fs_channel *channel = req->channel;
404 
405 	if (req->args.iovcnt > 1) {
406 		free(req->args.iovs);
407 	}
408 
409 	if (channel->sync) {
410 		pthread_spin_lock(&channel->lock);
411 	}
412 
413 	TAILQ_INSERT_HEAD(&req->channel->reqs, req, link);
414 	channel->outstanding_reqs--;
415 
416 	if (channel->sync) {
417 		pthread_spin_unlock(&channel->lock);
418 	}
419 }
420 
421 static int
422 fs_channel_create(struct spdk_filesystem *fs, struct spdk_fs_channel *channel,
423 		  uint32_t max_ops)
424 {
425 	uint32_t i;
426 
427 	channel->req_mem = calloc(max_ops, sizeof(struct spdk_fs_request));
428 	if (!channel->req_mem) {
429 		return -1;
430 	}
431 
432 	channel->outstanding_reqs = 0;
433 	TAILQ_INIT(&channel->reqs);
434 	sem_init(&channel->sem, 0, 0);
435 
436 	for (i = 0; i < max_ops; i++) {
437 		TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link);
438 	}
439 
440 	channel->fs = fs;
441 
442 	return 0;
443 }
444 
445 static int
446 fs_md_channel_create(void *io_device, void *ctx_buf)
447 {
448 	struct spdk_filesystem		*fs;
449 	struct spdk_fs_channel		*channel = ctx_buf;
450 
451 	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, md_target);
452 
453 	return fs_channel_create(fs, channel, fs->md_target.max_ops);
454 }
455 
456 static int
457 fs_sync_channel_create(void *io_device, void *ctx_buf)
458 {
459 	struct spdk_filesystem		*fs;
460 	struct spdk_fs_channel		*channel = ctx_buf;
461 
462 	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, sync_target);
463 
464 	return fs_channel_create(fs, channel, fs->sync_target.max_ops);
465 }
466 
467 static int
468 fs_io_channel_create(void *io_device, void *ctx_buf)
469 {
470 	struct spdk_filesystem		*fs;
471 	struct spdk_fs_channel		*channel = ctx_buf;
472 
473 	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, io_target);
474 
475 	return fs_channel_create(fs, channel, fs->io_target.max_ops);
476 }
477 
478 static void
479 fs_channel_destroy(void *io_device, void *ctx_buf)
480 {
481 	struct spdk_fs_channel *channel = ctx_buf;
482 
483 	if (channel->outstanding_reqs > 0) {
484 		SPDK_ERRLOG("channel freed with %" PRIu32 " outstanding requests!\n",
485 			    channel->outstanding_reqs);
486 	}
487 
488 	free(channel->req_mem);
489 	if (channel->bs_channel != NULL) {
490 		spdk_bs_free_io_channel(channel->bs_channel);
491 	}
492 }
493 
/* send_request implementation for channels already on the fs thread:
 * invoke the function immediately instead of queueing a message.
 */
static void
__send_request_direct(fs_request_fn fn, void *arg)
{
	fn(arg);
}
499 
500 static void
501 common_fs_bs_init(struct spdk_filesystem *fs, struct spdk_blob_store *bs)
502 {
503 	fs->bs = bs;
504 	fs->bs_opts.cluster_sz = spdk_bs_get_cluster_size(bs);
505 	fs->md_target.md_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
506 	fs->md_target.md_fs_channel->send_request = __send_request_direct;
507 	fs->sync_target.sync_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
508 	fs->sync_target.sync_fs_channel->send_request = __send_request_direct;
509 
510 	initialize_global_cache();
511 }
512 
513 static void
514 init_cb(void *ctx, struct spdk_blob_store *bs, int bserrno)
515 {
516 	struct spdk_fs_request *req = ctx;
517 	struct spdk_fs_cb_args *args = &req->args;
518 	struct spdk_filesystem *fs = args->fs;
519 
520 	if (bserrno == 0) {
521 		common_fs_bs_init(fs, bs);
522 	} else {
523 		free(fs);
524 		fs = NULL;
525 	}
526 
527 	args->fn.fs_op_with_handle(args->arg, fs, bserrno);
528 	free_fs_request(req);
529 }
530 
/* Allocate a filesystem object and register its three io_devices (md, sync,
 * io).  The md and sync channels are acquired immediately on the calling
 * thread; io channels are created per-thread later.  Returns NULL on
 * allocation failure.
 */
static struct spdk_filesystem *
fs_alloc(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn)
{
	struct spdk_filesystem *fs;

	fs = calloc(1, sizeof(*fs));
	if (fs == NULL) {
		return NULL;
	}

	fs->bdev = dev;
	fs->send_request = send_request_fn;
	TAILQ_INIT(&fs->files);

	fs->md_target.max_ops = 512;
	spdk_io_device_register(&fs->md_target, fs_md_channel_create, fs_channel_destroy,
				sizeof(struct spdk_fs_channel), "blobfs_md");
	fs->md_target.md_io_channel = spdk_get_io_channel(&fs->md_target);
	fs->md_target.md_fs_channel = spdk_io_channel_get_ctx(fs->md_target.md_io_channel);

	fs->sync_target.max_ops = 512;
	spdk_io_device_register(&fs->sync_target, fs_sync_channel_create, fs_channel_destroy,
				sizeof(struct spdk_fs_channel), "blobfs_sync");
	fs->sync_target.sync_io_channel = spdk_get_io_channel(&fs->sync_target);
	fs->sync_target.sync_fs_channel = spdk_io_channel_get_ctx(fs->sync_target.sync_io_channel);

	fs->io_target.max_ops = 512;
	spdk_io_device_register(&fs->io_target, fs_io_channel_create, fs_channel_destroy,
				sizeof(struct spdk_fs_channel), "blobfs_io");

	return fs;
}
563 
564 static void
565 __wake_caller(void *arg, int fserrno)
566 {
567 	struct spdk_fs_cb_args *args = arg;
568 
569 	if ((args->rwerrno != NULL) && (*(args->rwerrno) == 0) && fserrno) {
570 		*(args->rwerrno) = fserrno;
571 	}
572 	args->rc = fserrno;
573 	sem_post(args->sem);
574 }
575 
576 void
577 spdk_fs_init(struct spdk_bs_dev *dev, struct spdk_blobfs_opts *opt,
578 	     fs_send_request_fn send_request_fn,
579 	     spdk_fs_op_with_handle_complete cb_fn, void *cb_arg)
580 {
581 	struct spdk_filesystem *fs;
582 	struct spdk_fs_request *req;
583 	struct spdk_fs_cb_args *args;
584 	struct spdk_bs_opts opts = {};
585 
586 	fs = fs_alloc(dev, send_request_fn);
587 	if (fs == NULL) {
588 		cb_fn(cb_arg, NULL, -ENOMEM);
589 		return;
590 	}
591 
592 	req = alloc_fs_request(fs->md_target.md_fs_channel);
593 	if (req == NULL) {
594 		fs_free_io_channels(fs);
595 		fs_io_device_unregister(fs);
596 		cb_fn(cb_arg, NULL, -ENOMEM);
597 		return;
598 	}
599 
600 	args = &req->args;
601 	args->fn.fs_op_with_handle = cb_fn;
602 	args->arg = cb_arg;
603 	args->fs = fs;
604 
605 	spdk_bs_opts_init(&opts, sizeof(opts));
606 	snprintf(opts.bstype.bstype, sizeof(opts.bstype.bstype), SPDK_BLOBFS_SIGNATURE);
607 	if (opt) {
608 		opts.cluster_sz = opt->cluster_sz;
609 	}
610 	spdk_bs_init(dev, &opts, init_cb, req);
611 }
612 
613 static struct spdk_file *
614 file_alloc(struct spdk_filesystem *fs)
615 {
616 	struct spdk_file *file;
617 
618 	file = calloc(1, sizeof(*file));
619 	if (file == NULL) {
620 		return NULL;
621 	}
622 
623 	file->tree = calloc(1, sizeof(*file->tree));
624 	if (file->tree == NULL) {
625 		free(file);
626 		return NULL;
627 	}
628 
629 	if (pthread_spin_init(&file->lock, 0)) {
630 		free(file->tree);
631 		free(file);
632 		return NULL;
633 	}
634 
635 	file->fs = fs;
636 	TAILQ_INIT(&file->open_requests);
637 	TAILQ_INIT(&file->sync_requests);
638 	TAILQ_INSERT_TAIL(&fs->files, file, tailq);
639 	file->priority = SPDK_FILE_PRIORITY_LOW;
640 	return file;
641 }
642 
643 static void fs_load_done(void *ctx, int bserrno);
644 
645 static int
646 _handle_deleted_files(struct spdk_fs_request *req)
647 {
648 	struct spdk_fs_cb_args *args = &req->args;
649 	struct spdk_filesystem *fs = args->fs;
650 
651 	if (!TAILQ_EMPTY(&args->op.fs_load.deleted_files)) {
652 		struct spdk_deleted_file *deleted_file;
653 
654 		deleted_file = TAILQ_FIRST(&args->op.fs_load.deleted_files);
655 		TAILQ_REMOVE(&args->op.fs_load.deleted_files, deleted_file, tailq);
656 		spdk_bs_delete_blob(fs->bs, deleted_file->id, fs_load_done, req);
657 		free(deleted_file);
658 		return 0;
659 	}
660 
661 	return 1;
662 }
663 
/* Completion step for filesystem load; also re-entered as the callback for
 * each queued blob deletion (see _handle_deleted_files).
 */
static void
fs_load_done(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_filesystem *fs = args->fs;

	/* The filesystem has been loaded.  Now check if there are any files that
	 *  were marked for deletion before last unload.  Do not complete the
	 *  fs_load callback until all of them have been deleted on disk.
	 */
	if (_handle_deleted_files(req) == 0) {
		/* We found a file that's been marked for deleting but not actually
		 *  deleted yet.  This function will get called again once the delete
		 *  operation is completed.
		 */
		return;
	}

	args->fn.fs_op_with_handle(args->arg, fs, 0);
	free_fs_request(req);

}
687 
688 static void
689 iter_cb(void *ctx, struct spdk_blob *blob, int rc)
690 {
691 	struct spdk_fs_request *req = ctx;
692 	struct spdk_fs_cb_args *args = &req->args;
693 	struct spdk_filesystem *fs = args->fs;
694 	uint64_t *length;
695 	const char *name;
696 	uint32_t *is_deleted;
697 	size_t value_len;
698 
699 	if (rc < 0) {
700 		args->fn.fs_op_with_handle(args->arg, fs, rc);
701 		free_fs_request(req);
702 		return;
703 	}
704 
705 	rc = spdk_blob_get_xattr_value(blob, "name", (const void **)&name, &value_len);
706 	if (rc < 0) {
707 		args->fn.fs_op_with_handle(args->arg, fs, rc);
708 		free_fs_request(req);
709 		return;
710 	}
711 
712 	rc = spdk_blob_get_xattr_value(blob, "length", (const void **)&length, &value_len);
713 	if (rc < 0) {
714 		args->fn.fs_op_with_handle(args->arg, fs, rc);
715 		free_fs_request(req);
716 		return;
717 	}
718 
719 	assert(value_len == 8);
720 
721 	/* This file could be deleted last time without close it, then app crashed, so we delete it now */
722 	rc = spdk_blob_get_xattr_value(blob, "is_deleted", (const void **)&is_deleted, &value_len);
723 	if (rc < 0) {
724 		struct spdk_file *f;
725 
726 		f = file_alloc(fs);
727 		if (f == NULL) {
728 			SPDK_ERRLOG("Cannot allocate file to handle deleted file on disk\n");
729 			args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM);
730 			free_fs_request(req);
731 			return;
732 		}
733 
734 		f->name = strdup(name);
735 		if (!f->name) {
736 			SPDK_ERRLOG("Cannot allocate memory for file name\n");
737 			args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM);
738 			free_fs_request(req);
739 			file_free(f);
740 			return;
741 		}
742 
743 		f->blobid = spdk_blob_get_id(blob);
744 		f->length = *length;
745 		f->length_flushed = *length;
746 		f->length_xattr = *length;
747 		f->append_pos = *length;
748 		SPDK_DEBUGLOG(blobfs, "added file %s length=%ju\n", f->name, f->length);
749 	} else {
750 		struct spdk_deleted_file *deleted_file;
751 
752 		deleted_file = calloc(1, sizeof(*deleted_file));
753 		if (deleted_file == NULL) {
754 			args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM);
755 			free_fs_request(req);
756 			return;
757 		}
758 		deleted_file->id = spdk_blob_get_id(blob);
759 		TAILQ_INSERT_TAIL(&args->op.fs_load.deleted_files, deleted_file, tailq);
760 	}
761 }
762 
763 static void
764 load_cb(void *ctx, struct spdk_blob_store *bs, int bserrno)
765 {
766 	struct spdk_fs_request *req = ctx;
767 	struct spdk_fs_cb_args *args = &req->args;
768 	struct spdk_filesystem *fs = args->fs;
769 	struct spdk_bs_type bstype;
770 	static const struct spdk_bs_type blobfs_type = {SPDK_BLOBFS_SIGNATURE};
771 	static const struct spdk_bs_type zeros;
772 
773 	if (bserrno != 0) {
774 		args->fn.fs_op_with_handle(args->arg, NULL, bserrno);
775 		free_fs_request(req);
776 		fs_free_io_channels(fs);
777 		fs_io_device_unregister(fs);
778 		return;
779 	}
780 
781 	bstype = spdk_bs_get_bstype(bs);
782 
783 	if (!memcmp(&bstype, &zeros, sizeof(bstype))) {
784 		SPDK_DEBUGLOG(blobfs, "assigning bstype\n");
785 		spdk_bs_set_bstype(bs, blobfs_type);
786 	} else if (memcmp(&bstype, &blobfs_type, sizeof(bstype))) {
787 		SPDK_ERRLOG("not blobfs\n");
788 		SPDK_LOGDUMP(blobfs, "bstype", &bstype, sizeof(bstype));
789 		args->fn.fs_op_with_handle(args->arg, NULL, -EINVAL);
790 		free_fs_request(req);
791 		fs_free_io_channels(fs);
792 		fs_io_device_unregister(fs);
793 		return;
794 	}
795 
796 	common_fs_bs_init(fs, bs);
797 	fs_load_done(req, 0);
798 }
799 
/* Unregister all three io_devices and free the filesystem object itself.
 * Callers must have released the io channels first (fs_free_io_channels).
 */
static void
fs_io_device_unregister(struct spdk_filesystem *fs)
{
	assert(fs != NULL);
	spdk_io_device_unregister(&fs->md_target, NULL);
	spdk_io_device_unregister(&fs->sync_target, NULL);
	spdk_io_device_unregister(&fs->io_target, NULL);
	free(fs);
}
809 
/* Release the md and sync io channels acquired in fs_alloc(). */
static void
fs_free_io_channels(struct spdk_filesystem *fs)
{
	assert(fs != NULL);
	spdk_fs_free_io_channel(fs->md_target.md_io_channel);
	spdk_fs_free_io_channel(fs->sync_target.sync_io_channel);
}
817 
818 void
819 spdk_fs_load(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn,
820 	     spdk_fs_op_with_handle_complete cb_fn, void *cb_arg)
821 {
822 	struct spdk_filesystem *fs;
823 	struct spdk_fs_cb_args *args;
824 	struct spdk_fs_request *req;
825 	struct spdk_bs_opts	bs_opts;
826 
827 	fs = fs_alloc(dev, send_request_fn);
828 	if (fs == NULL) {
829 		cb_fn(cb_arg, NULL, -ENOMEM);
830 		return;
831 	}
832 
833 	req = alloc_fs_request(fs->md_target.md_fs_channel);
834 	if (req == NULL) {
835 		fs_free_io_channels(fs);
836 		fs_io_device_unregister(fs);
837 		cb_fn(cb_arg, NULL, -ENOMEM);
838 		return;
839 	}
840 
841 	args = &req->args;
842 	args->fn.fs_op_with_handle = cb_fn;
843 	args->arg = cb_arg;
844 	args->fs = fs;
845 	TAILQ_INIT(&args->op.fs_load.deleted_files);
846 	spdk_bs_opts_init(&bs_opts, sizeof(bs_opts));
847 	bs_opts.iter_cb_fn = iter_cb;
848 	bs_opts.iter_cb_arg = req;
849 	spdk_bs_load(dev, &bs_opts, load_cb, req);
850 }
851 
852 static void
853 unload_cb(void *ctx, int bserrno)
854 {
855 	struct spdk_fs_request *req = ctx;
856 	struct spdk_fs_cb_args *args = &req->args;
857 	struct spdk_filesystem *fs = args->fs;
858 	struct spdk_file *file, *tmp;
859 
860 	TAILQ_FOREACH_SAFE(file, &fs->files, tailq, tmp) {
861 		TAILQ_REMOVE(&fs->files, file, tailq);
862 		file_free(file);
863 	}
864 
865 	free_global_cache();
866 
867 	args->fn.fs_op(args->arg, bserrno);
868 	free(req);
869 
870 	fs_io_device_unregister(fs);
871 }
872 
873 void
874 spdk_fs_unload(struct spdk_filesystem *fs, spdk_fs_op_complete cb_fn, void *cb_arg)
875 {
876 	struct spdk_fs_request *req;
877 	struct spdk_fs_cb_args *args;
878 
879 	/*
880 	 * We must free the md_channel before unloading the blobstore, so just
881 	 *  allocate this request from the general heap.
882 	 */
883 	req = calloc(1, sizeof(*req));
884 	if (req == NULL) {
885 		cb_fn(cb_arg, -ENOMEM);
886 		return;
887 	}
888 
889 	args = &req->args;
890 	args->fn.fs_op = cb_fn;
891 	args->arg = cb_arg;
892 	args->fs = fs;
893 
894 	fs_free_io_channels(fs);
895 	spdk_bs_unload(fs->bs, unload_cb, req);
896 }
897 
898 static struct spdk_file *
899 fs_find_file(struct spdk_filesystem *fs, const char *name)
900 {
901 	struct spdk_file *file;
902 
903 	TAILQ_FOREACH(file, &fs->files, tailq) {
904 		if (!strncmp(name, file->name, SPDK_FILE_NAME_MAX)) {
905 			return file;
906 		}
907 	}
908 
909 	return NULL;
910 }
911 
912 void
913 spdk_fs_file_stat_async(struct spdk_filesystem *fs, const char *name,
914 			spdk_file_stat_op_complete cb_fn, void *cb_arg)
915 {
916 	struct spdk_file_stat stat;
917 	struct spdk_file *f = NULL;
918 
919 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
920 		cb_fn(cb_arg, NULL, -ENAMETOOLONG);
921 		return;
922 	}
923 
924 	f = fs_find_file(fs, name);
925 	if (f != NULL) {
926 		stat.blobid = f->blobid;
927 		stat.size = f->append_pos >= f->length ? f->append_pos : f->length;
928 		cb_fn(cb_arg, &stat, 0);
929 		return;
930 	}
931 
932 	cb_fn(cb_arg, NULL, -ENOENT);
933 }
934 
935 static void
936 __copy_stat(void *arg, struct spdk_file_stat *stat, int fserrno)
937 {
938 	struct spdk_fs_request *req = arg;
939 	struct spdk_fs_cb_args *args = &req->args;
940 
941 	args->rc = fserrno;
942 	if (fserrno == 0) {
943 		memcpy(args->arg, stat, sizeof(*stat));
944 	}
945 	sem_post(args->sem);
946 }
947 
/* Trampoline executed on the fs thread: run the async stat, completing via
 * args->fn.stat_op (__copy_stat) with the request as its context.
 */
static void
__file_stat(void *arg)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	spdk_fs_file_stat_async(args->fs, args->op.stat.name,
				args->fn.stat_op, req);
}
957 
958 int
959 spdk_fs_file_stat(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx,
960 		  const char *name, struct spdk_file_stat *stat)
961 {
962 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
963 	struct spdk_fs_request *req;
964 	int rc;
965 
966 	req = alloc_fs_request(channel);
967 	if (req == NULL) {
968 		SPDK_ERRLOG("Cannot allocate stat req on file=%s\n", name);
969 		return -ENOMEM;
970 	}
971 
972 	req->args.fs = fs;
973 	req->args.op.stat.name = name;
974 	req->args.fn.stat_op = __copy_stat;
975 	req->args.arg = stat;
976 	req->args.sem = &channel->sem;
977 	channel->send_request(__file_stat, req);
978 	sem_wait(&channel->sem);
979 
980 	rc = req->args.rc;
981 	free_fs_request(req);
982 
983 	return rc;
984 }
985 
986 static void
987 fs_create_blob_close_cb(void *ctx, int bserrno)
988 {
989 	int rc;
990 	struct spdk_fs_request *req = ctx;
991 	struct spdk_fs_cb_args *args = &req->args;
992 
993 	rc = args->rc ? args->rc : bserrno;
994 	args->fn.file_op(args->arg, rc);
995 	free_fs_request(req);
996 }
997 
998 static void
999 fs_create_blob_resize_cb(void *ctx, int bserrno)
1000 {
1001 	struct spdk_fs_request *req = ctx;
1002 	struct spdk_fs_cb_args *args = &req->args;
1003 	struct spdk_file *f = args->file;
1004 	struct spdk_blob *blob = args->op.create.blob;
1005 	uint64_t length = 0;
1006 
1007 	args->rc = bserrno;
1008 	if (bserrno) {
1009 		spdk_blob_close(blob, fs_create_blob_close_cb, args);
1010 		return;
1011 	}
1012 
1013 	spdk_blob_set_xattr(blob, "name", f->name, strlen(f->name) + 1);
1014 	spdk_blob_set_xattr(blob, "length", &length, sizeof(length));
1015 
1016 	spdk_blob_close(blob, fs_create_blob_close_cb, args);
1017 }
1018 
1019 static void
1020 fs_create_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno)
1021 {
1022 	struct spdk_fs_request *req = ctx;
1023 	struct spdk_fs_cb_args *args = &req->args;
1024 
1025 	if (bserrno) {
1026 		args->fn.file_op(args->arg, bserrno);
1027 		free_fs_request(req);
1028 		return;
1029 	}
1030 
1031 	args->op.create.blob = blob;
1032 	spdk_blob_resize(blob, 1, fs_create_blob_resize_cb, req);
1033 }
1034 
1035 static void
1036 fs_create_blob_create_cb(void *ctx, spdk_blob_id blobid, int bserrno)
1037 {
1038 	struct spdk_fs_request *req = ctx;
1039 	struct spdk_fs_cb_args *args = &req->args;
1040 	struct spdk_file *f = args->file;
1041 
1042 	if (bserrno) {
1043 		args->fn.file_op(args->arg, bserrno);
1044 		free_fs_request(req);
1045 		return;
1046 	}
1047 
1048 	f->blobid = blobid;
1049 	spdk_bs_open_blob(f->fs->bs, blobid, fs_create_blob_open_cb, req);
1050 }
1051 
1052 void
1053 spdk_fs_create_file_async(struct spdk_filesystem *fs, const char *name,
1054 			  spdk_file_op_complete cb_fn, void *cb_arg)
1055 {
1056 	struct spdk_file *file;
1057 	struct spdk_fs_request *req;
1058 	struct spdk_fs_cb_args *args;
1059 
1060 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
1061 		cb_fn(cb_arg, -ENAMETOOLONG);
1062 		return;
1063 	}
1064 
1065 	file = fs_find_file(fs, name);
1066 	if (file != NULL) {
1067 		cb_fn(cb_arg, -EEXIST);
1068 		return;
1069 	}
1070 
1071 	file = file_alloc(fs);
1072 	if (file == NULL) {
1073 		SPDK_ERRLOG("Cannot allocate new file for creation\n");
1074 		cb_fn(cb_arg, -ENOMEM);
1075 		return;
1076 	}
1077 
1078 	req = alloc_fs_request(fs->md_target.md_fs_channel);
1079 	if (req == NULL) {
1080 		SPDK_ERRLOG("Cannot allocate create async req for file=%s\n", name);
1081 		TAILQ_REMOVE(&fs->files, file, tailq);
1082 		file_free(file);
1083 		cb_fn(cb_arg, -ENOMEM);
1084 		return;
1085 	}
1086 
1087 	args = &req->args;
1088 	args->file = file;
1089 	args->fn.file_op = cb_fn;
1090 	args->arg = cb_arg;
1091 
1092 	file->name = strdup(name);
1093 	if (!file->name) {
1094 		SPDK_ERRLOG("Cannot allocate file->name for file=%s\n", name);
1095 		free_fs_request(req);
1096 		TAILQ_REMOVE(&fs->files, file, tailq);
1097 		file_free(file);
1098 		cb_fn(cb_arg, -ENOMEM);
1099 		return;
1100 	}
1101 	spdk_bs_create_blob(fs->bs, fs_create_blob_create_cb, args);
1102 }
1103 
/* Completion for the async create issued by __fs_create_file: store the
 * result and wake the blocked spdk_fs_create_file() caller.
 */
static void
__fs_create_file_done(void *arg, int fserrno)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	SPDK_DEBUGLOG(blobfs, "file=%s\n", args->op.create.name);
	__wake_caller(args, fserrno);
}
1113 
/* Trampoline executed on the fs thread: start the async create on behalf of
 * a blocked spdk_fs_create_file() caller.
 */
static void
__fs_create_file(void *arg)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	SPDK_DEBUGLOG(blobfs, "file=%s\n", args->op.create.name);
	spdk_fs_create_file_async(args->fs, args->op.create.name, __fs_create_file_done, req);
}
1123 
1124 int
1125 spdk_fs_create_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx, const char *name)
1126 {
1127 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
1128 	struct spdk_fs_request *req;
1129 	struct spdk_fs_cb_args *args;
1130 	int rc;
1131 
1132 	SPDK_DEBUGLOG(blobfs, "file=%s\n", name);
1133 
1134 	req = alloc_fs_request(channel);
1135 	if (req == NULL) {
1136 		SPDK_ERRLOG("Cannot allocate req to create file=%s\n", name);
1137 		return -ENOMEM;
1138 	}
1139 
1140 	args = &req->args;
1141 	args->fs = fs;
1142 	args->op.create.name = name;
1143 	args->sem = &channel->sem;
1144 	fs->send_request(__fs_create_file, req);
1145 	sem_wait(&channel->sem);
1146 	rc = args->rc;
1147 	free_fs_request(req);
1148 
1149 	return rc;
1150 }
1151 
1152 static void
1153 fs_open_blob_done(void *ctx, struct spdk_blob *blob, int bserrno)
1154 {
1155 	struct spdk_fs_request *req = ctx;
1156 	struct spdk_fs_cb_args *args = &req->args;
1157 	struct spdk_file *f = args->file;
1158 
1159 	f->blob = blob;
1160 	while (!TAILQ_EMPTY(&f->open_requests)) {
1161 		req = TAILQ_FIRST(&f->open_requests);
1162 		args = &req->args;
1163 		TAILQ_REMOVE(&f->open_requests, req, args.op.open.tailq);
1164 		spdk_trace_record(TRACE_BLOBFS_OPEN, 0, 0, 0, f->name);
1165 		args->fn.file_op_with_handle(args->arg, f, bserrno);
1166 		free_fs_request(req);
1167 	}
1168 }
1169 
/*
 * Continuation of an async open, entered either directly (file already
 * exists) or from the completion of spdk_fs_create_file_async() (open
 * with the CREATE flag).  Bumps the file's ref count and either opens
 * the blob, completes immediately, or waits on an in-flight blob open.
 *
 * NOTE(review): bserrno from the create step is not checked here; a
 * failed create would fall through to fs_find_file() - confirm that the
 * create path cannot invoke this callback with an error.
 */
static void
fs_open_blob_create_cb(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *file = args->file;
	struct spdk_filesystem *fs = args->fs;

	if (file == NULL) {
		/*
		 * This is from an open with CREATE flag - the file
		 *  is now created so look it up in the file list for this
		 *  filesystem.
		 */
		file = fs_find_file(fs, args->op.open.name);
		assert(file != NULL);
		args->file = file;
	}

	file->ref_count++;
	/* Queue this request; fs_open_blob_done() drains the queue once the
	 * blob handle is available.
	 */
	TAILQ_INSERT_TAIL(&file->open_requests, req, args.op.open.tailq);
	if (file->ref_count == 1) {
		/* First opener: kick off the actual blob open. */
		assert(file->blob == NULL);
		spdk_bs_open_blob(fs->bs, file->blobid, fs_open_blob_done, req);
	} else if (file->blob != NULL) {
		/* Blob already open: complete synchronously. */
		fs_open_blob_done(req, file->blob, 0);
	} else {
		/*
		 * The blob open for this file is in progress due to a previous
		 *  open request.  When that open completes, it will invoke the
		 *  open callback for this request.
		 */
	}
}
1204 
1205 void
1206 spdk_fs_open_file_async(struct spdk_filesystem *fs, const char *name, uint32_t flags,
1207 			spdk_file_op_with_handle_complete cb_fn, void *cb_arg)
1208 {
1209 	struct spdk_file *f = NULL;
1210 	struct spdk_fs_request *req;
1211 	struct spdk_fs_cb_args *args;
1212 
1213 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
1214 		cb_fn(cb_arg, NULL, -ENAMETOOLONG);
1215 		return;
1216 	}
1217 
1218 	f = fs_find_file(fs, name);
1219 	if (f == NULL && !(flags & SPDK_BLOBFS_OPEN_CREATE)) {
1220 		cb_fn(cb_arg, NULL, -ENOENT);
1221 		return;
1222 	}
1223 
1224 	if (f != NULL && f->is_deleted == true) {
1225 		cb_fn(cb_arg, NULL, -ENOENT);
1226 		return;
1227 	}
1228 
1229 	req = alloc_fs_request(fs->md_target.md_fs_channel);
1230 	if (req == NULL) {
1231 		SPDK_ERRLOG("Cannot allocate async open req for file=%s\n", name);
1232 		cb_fn(cb_arg, NULL, -ENOMEM);
1233 		return;
1234 	}
1235 
1236 	args = &req->args;
1237 	args->fn.file_op_with_handle = cb_fn;
1238 	args->arg = cb_arg;
1239 	args->file = f;
1240 	args->fs = fs;
1241 	args->op.open.name = name;
1242 
1243 	if (f == NULL) {
1244 		spdk_fs_create_file_async(fs, name, fs_open_blob_create_cb, req);
1245 	} else {
1246 		fs_open_blob_create_cb(req, 0);
1247 	}
1248 }
1249 
1250 static void
1251 __fs_open_file_done(void *arg, struct spdk_file *file, int bserrno)
1252 {
1253 	struct spdk_fs_request *req = arg;
1254 	struct spdk_fs_cb_args *args = &req->args;
1255 
1256 	args->file = file;
1257 	SPDK_DEBUGLOG(blobfs, "file=%s\n", args->op.open.name);
1258 	__wake_caller(args, bserrno);
1259 }
1260 
1261 static void
1262 __fs_open_file(void *arg)
1263 {
1264 	struct spdk_fs_request *req = arg;
1265 	struct spdk_fs_cb_args *args = &req->args;
1266 
1267 	SPDK_DEBUGLOG(blobfs, "file=%s\n", args->op.open.name);
1268 	spdk_fs_open_file_async(args->fs, args->op.open.name, args->op.open.flags,
1269 				__fs_open_file_done, req);
1270 }
1271 
1272 int
1273 spdk_fs_open_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx,
1274 		  const char *name, uint32_t flags, struct spdk_file **file)
1275 {
1276 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
1277 	struct spdk_fs_request *req;
1278 	struct spdk_fs_cb_args *args;
1279 	int rc;
1280 
1281 	SPDK_DEBUGLOG(blobfs, "file=%s\n", name);
1282 
1283 	req = alloc_fs_request(channel);
1284 	if (req == NULL) {
1285 		SPDK_ERRLOG("Cannot allocate req for opening file=%s\n", name);
1286 		return -ENOMEM;
1287 	}
1288 
1289 	args = &req->args;
1290 	args->fs = fs;
1291 	args->op.open.name = name;
1292 	args->op.open.flags = flags;
1293 	args->sem = &channel->sem;
1294 	fs->send_request(__fs_open_file, req);
1295 	sem_wait(&channel->sem);
1296 	rc = args->rc;
1297 	if (rc == 0) {
1298 		*file = args->file;
1299 	} else {
1300 		*file = NULL;
1301 	}
1302 	free_fs_request(req);
1303 
1304 	return rc;
1305 }
1306 
1307 static void
1308 fs_rename_blob_close_cb(void *ctx, int bserrno)
1309 {
1310 	struct spdk_fs_request *req = ctx;
1311 	struct spdk_fs_cb_args *args = &req->args;
1312 
1313 	args->fn.fs_op(args->arg, bserrno);
1314 	free_fs_request(req);
1315 }
1316 
1317 static void
1318 fs_rename_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno)
1319 {
1320 	struct spdk_fs_request *req = ctx;
1321 	struct spdk_fs_cb_args *args = &req->args;
1322 	const char *new_name = args->op.rename.new_name;
1323 
1324 	spdk_blob_set_xattr(blob, "name", new_name, strlen(new_name) + 1);
1325 	spdk_blob_close(blob, fs_rename_blob_close_cb, req);
1326 }
1327 
1328 static void
1329 _fs_md_rename_file(struct spdk_fs_request *req)
1330 {
1331 	struct spdk_fs_cb_args *args = &req->args;
1332 	struct spdk_file *f;
1333 
1334 	f = fs_find_file(args->fs, args->op.rename.old_name);
1335 	if (f == NULL) {
1336 		args->fn.fs_op(args->arg, -ENOENT);
1337 		free_fs_request(req);
1338 		return;
1339 	}
1340 
1341 	free(f->name);
1342 	f->name = strdup(args->op.rename.new_name);
1343 	if (!f->name) {
1344 		SPDK_ERRLOG("Cannot allocate memory for file name\n");
1345 		args->fn.fs_op(args->arg, -ENOMEM);
1346 		free_fs_request(req);
1347 		return;
1348 	}
1349 
1350 	args->file = f;
1351 	spdk_bs_open_blob(args->fs->bs, f->blobid, fs_rename_blob_open_cb, req);
1352 }
1353 
/* Completion of deleting the pre-existing destination file during a rename.
 * NOTE(review): fserrno is ignored - the rename proceeds even if the delete
 * reported an error; confirm this is intentional.
 */
static void
fs_rename_delete_done(void *arg, int fserrno)
{
	_fs_md_rename_file(arg);
}
1359 
1360 void
1361 spdk_fs_rename_file_async(struct spdk_filesystem *fs,
1362 			  const char *old_name, const char *new_name,
1363 			  spdk_file_op_complete cb_fn, void *cb_arg)
1364 {
1365 	struct spdk_file *f;
1366 	struct spdk_fs_request *req;
1367 	struct spdk_fs_cb_args *args;
1368 
1369 	SPDK_DEBUGLOG(blobfs, "old=%s new=%s\n", old_name, new_name);
1370 	if (strnlen(new_name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
1371 		cb_fn(cb_arg, -ENAMETOOLONG);
1372 		return;
1373 	}
1374 
1375 	req = alloc_fs_request(fs->md_target.md_fs_channel);
1376 	if (req == NULL) {
1377 		SPDK_ERRLOG("Cannot allocate rename async req for renaming file from %s to %s\n", old_name,
1378 			    new_name);
1379 		cb_fn(cb_arg, -ENOMEM);
1380 		return;
1381 	}
1382 
1383 	args = &req->args;
1384 	args->fn.fs_op = cb_fn;
1385 	args->fs = fs;
1386 	args->arg = cb_arg;
1387 	args->op.rename.old_name = old_name;
1388 	args->op.rename.new_name = new_name;
1389 
1390 	f = fs_find_file(fs, new_name);
1391 	if (f == NULL) {
1392 		_fs_md_rename_file(req);
1393 		return;
1394 	}
1395 
1396 	/*
1397 	 * The rename overwrites an existing file.  So delete the existing file, then
1398 	 *  do the actual rename.
1399 	 */
1400 	spdk_fs_delete_file_async(fs, new_name, fs_rename_delete_done, req);
1401 }
1402 
1403 static void
1404 __fs_rename_file_done(void *arg, int fserrno)
1405 {
1406 	struct spdk_fs_request *req = arg;
1407 	struct spdk_fs_cb_args *args = &req->args;
1408 
1409 	__wake_caller(args, fserrno);
1410 }
1411 
1412 static void
1413 __fs_rename_file(void *arg)
1414 {
1415 	struct spdk_fs_request *req = arg;
1416 	struct spdk_fs_cb_args *args = &req->args;
1417 
1418 	spdk_fs_rename_file_async(args->fs, args->op.rename.old_name, args->op.rename.new_name,
1419 				  __fs_rename_file_done, req);
1420 }
1421 
1422 int
1423 spdk_fs_rename_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx,
1424 		    const char *old_name, const char *new_name)
1425 {
1426 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
1427 	struct spdk_fs_request *req;
1428 	struct spdk_fs_cb_args *args;
1429 	int rc;
1430 
1431 	req = alloc_fs_request(channel);
1432 	if (req == NULL) {
1433 		SPDK_ERRLOG("Cannot allocate rename req for file=%s\n", old_name);
1434 		return -ENOMEM;
1435 	}
1436 
1437 	args = &req->args;
1438 
1439 	args->fs = fs;
1440 	args->op.rename.old_name = old_name;
1441 	args->op.rename.new_name = new_name;
1442 	args->sem = &channel->sem;
1443 	fs->send_request(__fs_rename_file, req);
1444 	sem_wait(&channel->sem);
1445 	rc = args->rc;
1446 	free_fs_request(req);
1447 	return rc;
1448 }
1449 
1450 static void
1451 blob_delete_cb(void *ctx, int bserrno)
1452 {
1453 	struct spdk_fs_request *req = ctx;
1454 	struct spdk_fs_cb_args *args = &req->args;
1455 
1456 	args->fn.file_op(args->arg, bserrno);
1457 	free_fs_request(req);
1458 }
1459 
1460 void
1461 spdk_fs_delete_file_async(struct spdk_filesystem *fs, const char *name,
1462 			  spdk_file_op_complete cb_fn, void *cb_arg)
1463 {
1464 	struct spdk_file *f;
1465 	spdk_blob_id blobid;
1466 	struct spdk_fs_request *req;
1467 	struct spdk_fs_cb_args *args;
1468 
1469 	SPDK_DEBUGLOG(blobfs, "file=%s\n", name);
1470 
1471 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
1472 		cb_fn(cb_arg, -ENAMETOOLONG);
1473 		return;
1474 	}
1475 
1476 	f = fs_find_file(fs, name);
1477 	if (f == NULL) {
1478 		SPDK_ERRLOG("Cannot find the file=%s to deleted\n", name);
1479 		cb_fn(cb_arg, -ENOENT);
1480 		return;
1481 	}
1482 
1483 	req = alloc_fs_request(fs->md_target.md_fs_channel);
1484 	if (req == NULL) {
1485 		SPDK_ERRLOG("Cannot allocate the req for the file=%s to deleted\n", name);
1486 		cb_fn(cb_arg, -ENOMEM);
1487 		return;
1488 	}
1489 
1490 	args = &req->args;
1491 	args->fn.file_op = cb_fn;
1492 	args->arg = cb_arg;
1493 
1494 	if (f->ref_count > 0) {
1495 		/* If the ref > 0, we mark the file as deleted and delete it when we close it. */
1496 		f->is_deleted = true;
1497 		spdk_blob_set_xattr(f->blob, "is_deleted", &f->is_deleted, sizeof(bool));
1498 		spdk_blob_sync_md(f->blob, blob_delete_cb, req);
1499 		return;
1500 	}
1501 
1502 	blobid = f->blobid;
1503 	TAILQ_REMOVE(&fs->files, f, tailq);
1504 
1505 	file_free(f);
1506 
1507 	spdk_bs_delete_blob(fs->bs, blobid, blob_delete_cb, req);
1508 }
1509 
1510 static void
1511 __fs_delete_file_done(void *arg, int fserrno)
1512 {
1513 	struct spdk_fs_request *req = arg;
1514 	struct spdk_fs_cb_args *args = &req->args;
1515 
1516 	spdk_trace_record(TRACE_BLOBFS_DELETE_DONE, 0, 0, 0, args->op.delete.name);
1517 	__wake_caller(args, fserrno);
1518 }
1519 
1520 static void
1521 __fs_delete_file(void *arg)
1522 {
1523 	struct spdk_fs_request *req = arg;
1524 	struct spdk_fs_cb_args *args = &req->args;
1525 
1526 	spdk_trace_record(TRACE_BLOBFS_DELETE_START, 0, 0, 0, args->op.delete.name);
1527 	spdk_fs_delete_file_async(args->fs, args->op.delete.name, __fs_delete_file_done, req);
1528 }
1529 
1530 int
1531 spdk_fs_delete_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx,
1532 		    const char *name)
1533 {
1534 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
1535 	struct spdk_fs_request *req;
1536 	struct spdk_fs_cb_args *args;
1537 	int rc;
1538 
1539 	req = alloc_fs_request(channel);
1540 	if (req == NULL) {
1541 		SPDK_DEBUGLOG(blobfs, "Cannot allocate req to delete file=%s\n", name);
1542 		return -ENOMEM;
1543 	}
1544 
1545 	args = &req->args;
1546 	args->fs = fs;
1547 	args->op.delete.name = name;
1548 	args->sem = &channel->sem;
1549 	fs->send_request(__fs_delete_file, req);
1550 	sem_wait(&channel->sem);
1551 	rc = args->rc;
1552 	free_fs_request(req);
1553 
1554 	return rc;
1555 }
1556 
1557 spdk_fs_iter
1558 spdk_fs_iter_first(struct spdk_filesystem *fs)
1559 {
1560 	struct spdk_file *f;
1561 
1562 	f = TAILQ_FIRST(&fs->files);
1563 	return f;
1564 }
1565 
1566 spdk_fs_iter
1567 spdk_fs_iter_next(spdk_fs_iter iter)
1568 {
1569 	struct spdk_file *f = iter;
1570 
1571 	if (f == NULL) {
1572 		return NULL;
1573 	}
1574 
1575 	f = TAILQ_NEXT(f, tailq);
1576 	return f;
1577 }
1578 
/* Return the file's name.  The string is owned by the file object. */
const char *
spdk_file_get_name(struct spdk_file *file)
{
	return file->name;
}
1584 
1585 uint64_t
1586 spdk_file_get_length(struct spdk_file *file)
1587 {
1588 	uint64_t length;
1589 
1590 	assert(file != NULL);
1591 
1592 	length = file->append_pos >= file->length ? file->append_pos : file->length;
1593 	SPDK_DEBUGLOG(blobfs, "file=%s length=0x%jx\n", file->name, length);
1594 	return length;
1595 }
1596 
1597 static void
1598 fs_truncate_complete_cb(void *ctx, int bserrno)
1599 {
1600 	struct spdk_fs_request *req = ctx;
1601 	struct spdk_fs_cb_args *args = &req->args;
1602 
1603 	args->fn.file_op(args->arg, bserrno);
1604 	free_fs_request(req);
1605 }
1606 
/*
 * Blob resize completed.  Persist the new logical length in the "length"
 * xattr, update in-memory state, and sync blob metadata
 * (fs_truncate_complete_cb finishes the request).
 */
static void
fs_truncate_resize_cb(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *file = args->file;
	uint64_t *length = &args->op.truncate.length;

	if (bserrno) {
		/* Resize failed: report the error without touching file state. */
		args->fn.file_op(args->arg, bserrno);
		free_fs_request(req);
		return;
	}

	spdk_blob_set_xattr(file->blob, "length", length, sizeof(*length));

	file->length = *length;
	if (file->append_pos > file->length) {
		/* Truncating below the append position pulls it back to the new EOF. */
		file->append_pos = file->length;
	}

	spdk_blob_sync_md(file->blob, fs_truncate_complete_cb, req);
}
1630 
/*
 * Number of clusters needed to hold `length` bytes (round up).
 *
 * Fix: written as div+mod instead of (length + cluster_sz - 1) so the
 * addition cannot wrap around for lengths near UINT64_MAX.
 */
static uint64_t
__bytes_to_clusters(uint64_t length, uint64_t cluster_sz)
{
	return (length / cluster_sz) + (length % cluster_sz != 0);
}
1636 
1637 void
1638 spdk_file_truncate_async(struct spdk_file *file, uint64_t length,
1639 			 spdk_file_op_complete cb_fn, void *cb_arg)
1640 {
1641 	struct spdk_filesystem *fs;
1642 	size_t num_clusters;
1643 	struct spdk_fs_request *req;
1644 	struct spdk_fs_cb_args *args;
1645 
1646 	SPDK_DEBUGLOG(blobfs, "file=%s old=0x%jx new=0x%jx\n", file->name, file->length, length);
1647 	if (length == file->length) {
1648 		cb_fn(cb_arg, 0);
1649 		return;
1650 	}
1651 
1652 	req = alloc_fs_request(file->fs->md_target.md_fs_channel);
1653 	if (req == NULL) {
1654 		cb_fn(cb_arg, -ENOMEM);
1655 		return;
1656 	}
1657 
1658 	args = &req->args;
1659 	args->fn.file_op = cb_fn;
1660 	args->arg = cb_arg;
1661 	args->file = file;
1662 	args->op.truncate.length = length;
1663 	fs = file->fs;
1664 
1665 	num_clusters = __bytes_to_clusters(length, fs->bs_opts.cluster_sz);
1666 
1667 	spdk_blob_resize(file->blob, num_clusters, fs_truncate_resize_cb, req);
1668 }
1669 
/* Metadata-thread trampoline for spdk_file_truncate().  The completion
 * argument is args (not req) because fn.file_op here is __wake_caller,
 * which expects the spdk_fs_cb_args pointer.
 */
static void
__truncate(void *arg)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	spdk_file_truncate_async(args->file, args->op.truncate.length,
				 args->fn.file_op, args);
}
1679 
1680 int
1681 spdk_file_truncate(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx,
1682 		   uint64_t length)
1683 {
1684 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
1685 	struct spdk_fs_request *req;
1686 	struct spdk_fs_cb_args *args;
1687 	int rc;
1688 
1689 	req = alloc_fs_request(channel);
1690 	if (req == NULL) {
1691 		return -ENOMEM;
1692 	}
1693 
1694 	args = &req->args;
1695 
1696 	args->file = file;
1697 	args->op.truncate.length = length;
1698 	args->fn.file_op = __wake_caller;
1699 	args->sem = &channel->sem;
1700 
1701 	channel->send_request(__truncate, req);
1702 	sem_wait(&channel->sem);
1703 	rc = args->rc;
1704 	free_fs_request(req);
1705 
1706 	return rc;
1707 }
1708 
1709 static void
1710 __rw_done(void *ctx, int bserrno)
1711 {
1712 	struct spdk_fs_request *req = ctx;
1713 	struct spdk_fs_cb_args *args = &req->args;
1714 
1715 	spdk_free(args->op.rw.pin_buf);
1716 	args->fn.file_op(args->arg, bserrno);
1717 	free_fs_request(req);
1718 }
1719 
1720 static void
1721 __read_done(void *ctx, int bserrno)
1722 {
1723 	struct spdk_fs_request *req = ctx;
1724 	struct spdk_fs_cb_args *args = &req->args;
1725 	void *buf;
1726 
1727 	if (bserrno) {
1728 		__rw_done(req, bserrno);
1729 		return;
1730 	}
1731 
1732 	assert(req != NULL);
1733 	buf = (void *)((uintptr_t)args->op.rw.pin_buf + (args->op.rw.offset & (args->op.rw.blocklen - 1)));
1734 	if (args->op.rw.is_read) {
1735 		spdk_copy_buf_to_iovs(args->iovs, args->iovcnt, buf, args->op.rw.length);
1736 		__rw_done(req, 0);
1737 	} else {
1738 		spdk_copy_iovs_to_buf(buf, args->op.rw.length, args->iovs, args->iovcnt);
1739 		spdk_blob_io_write(args->file->blob, args->op.rw.channel,
1740 				   args->op.rw.pin_buf,
1741 				   args->op.rw.start_lba, args->op.rw.num_lba,
1742 				   __rw_done, req);
1743 	}
1744 }
1745 
1746 static void
1747 __do_blob_read(void *ctx, int fserrno)
1748 {
1749 	struct spdk_fs_request *req = ctx;
1750 	struct spdk_fs_cb_args *args = &req->args;
1751 
1752 	if (fserrno) {
1753 		__rw_done(req, fserrno);
1754 		return;
1755 	}
1756 	spdk_blob_io_read(args->file->blob, args->op.rw.channel,
1757 			  args->op.rw.pin_buf,
1758 			  args->op.rw.start_lba, args->op.rw.num_lba,
1759 			  __read_done, req);
1760 }
1761 
1762 static void
1763 __get_page_parameters(struct spdk_file *file, uint64_t offset, uint64_t length,
1764 		      uint64_t *start_lba, uint32_t *lba_size, uint64_t *num_lba)
1765 {
1766 	uint64_t end_lba;
1767 
1768 	*lba_size = spdk_bs_get_io_unit_size(file->fs->bs);
1769 	*start_lba = offset / *lba_size;
1770 	end_lba = (offset + length - 1) / *lba_size;
1771 	*num_lba = (end_lba - *start_lba + 1);
1772 }
1773 
1774 static bool
1775 __is_lba_aligned(struct spdk_file *file, uint64_t offset, uint64_t length)
1776 {
1777 	uint32_t lba_size = spdk_bs_get_io_unit_size(file->fs->bs);
1778 
1779 	if ((offset % lba_size == 0) && (length % lba_size == 0)) {
1780 		return true;
1781 	}
1782 
1783 	return false;
1784 }
1785 
1786 static void
1787 _fs_request_setup_iovs(struct spdk_fs_request *req, struct iovec *iovs, uint32_t iovcnt)
1788 {
1789 	uint32_t i;
1790 
1791 	for (i = 0; i < iovcnt; i++) {
1792 		req->args.iovs[i].iov_base = iovs[i].iov_base;
1793 		req->args.iovs[i].iov_len = iovs[i].iov_len;
1794 	}
1795 }
1796 
/*
 * Core vectored read/write path (bypassing the write cache).
 *
 * Allocates a DMA-able bounce buffer covering the full LBA range of the
 * request, then dispatches one of three flows:
 *  - write past EOF: truncate (grow) first, then read-modify-write;
 *  - LBA-aligned write: copy caller data in and write directly;
 *  - everything else: read the covered range, then complete the read or
 *    merge-and-write (see __do_blob_read/__read_done).
 */
static void
__readvwritev(struct spdk_file *file, struct spdk_io_channel *_channel,
	      struct iovec *iovs, uint32_t iovcnt, uint64_t offset, uint64_t length,
	      spdk_file_op_complete cb_fn, void *cb_arg, int is_read)
{
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;
	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
	uint64_t start_lba, num_lba, pin_buf_length;
	uint32_t lba_size;

	/* Reads beyond the current file length are rejected, not short-read. */
	if (is_read && offset + length > file->length) {
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	req = alloc_fs_request_with_iov(channel, iovcnt);
	if (req == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	__get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba);

	args = &req->args;
	args->fn.file_op = cb_fn;
	args->arg = cb_arg;
	args->file = file;
	args->op.rw.channel = channel->bs_channel;
	_fs_request_setup_iovs(req, iovs, iovcnt);
	args->op.rw.is_read = is_read;
	args->op.rw.offset = offset;
	args->op.rw.blocklen = lba_size;

	/* Bounce buffer spans whole I/O units so blob I/O stays aligned. */
	pin_buf_length = num_lba * lba_size;
	args->op.rw.length = pin_buf_length;
	args->op.rw.pin_buf = spdk_malloc(pin_buf_length, lba_size, NULL,
					  SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
	if (args->op.rw.pin_buf == NULL) {
		SPDK_DEBUGLOG(blobfs, "Failed to allocate buf for: file=%s offset=%jx length=%jx\n",
			      file->name, offset, length);
		free_fs_request(req);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	args->op.rw.start_lba = start_lba;
	args->op.rw.num_lba = num_lba;

	if (!is_read && file->length < offset + length) {
		/* Growing write: extend the blob first, then read-modify-write. */
		spdk_file_truncate_async(file, offset + length, __do_blob_read, req);
	} else if (!is_read && __is_lba_aligned(file, offset, length)) {
		/* Fully aligned write: no read needed, write straight through. */
		spdk_copy_iovs_to_buf(args->op.rw.pin_buf, args->op.rw.length, args->iovs, args->iovcnt);
		spdk_blob_io_write(args->file->blob, args->op.rw.channel,
				   args->op.rw.pin_buf,
				   args->op.rw.start_lba, args->op.rw.num_lba,
				   __rw_done, req);
	} else {
		/* Read, or unaligned write (read-modify-write). */
		__do_blob_read(req, 0);
	}
}
1858 
1859 static void
1860 __readwrite(struct spdk_file *file, struct spdk_io_channel *channel,
1861 	    void *payload, uint64_t offset, uint64_t length,
1862 	    spdk_file_op_complete cb_fn, void *cb_arg, int is_read)
1863 {
1864 	struct iovec iov;
1865 
1866 	iov.iov_base = payload;
1867 	iov.iov_len = (size_t)length;
1868 
1869 	__readvwritev(file, channel, &iov, 1, offset, length, cb_fn, cb_arg, is_read);
1870 }
1871 
1872 void
1873 spdk_file_write_async(struct spdk_file *file, struct spdk_io_channel *channel,
1874 		      void *payload, uint64_t offset, uint64_t length,
1875 		      spdk_file_op_complete cb_fn, void *cb_arg)
1876 {
1877 	__readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 0);
1878 }
1879 
/* Asynchronously write a vectored buffer to the file (cache-bypassing path). */
void
spdk_file_writev_async(struct spdk_file *file, struct spdk_io_channel *channel,
		       struct iovec *iovs, uint32_t iovcnt, uint64_t offset, uint64_t length,
		       spdk_file_op_complete cb_fn, void *cb_arg)
{
	SPDK_DEBUGLOG(blobfs, "file=%s offset=%jx length=%jx\n",
		      file->name, offset, length);

	__readvwritev(file, channel, iovs, iovcnt, offset, length, cb_fn, cb_arg, 0);
}
1890 
/* Asynchronously read into a flat buffer (cache-bypassing path).
 * Fails with -EINVAL if the range extends past the file length.
 */
void
spdk_file_read_async(struct spdk_file *file, struct spdk_io_channel *channel,
		     void *payload, uint64_t offset, uint64_t length,
		     spdk_file_op_complete cb_fn, void *cb_arg)
{
	SPDK_DEBUGLOG(blobfs, "file=%s offset=%jx length=%jx\n",
		      file->name, offset, length);

	__readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 1);
}
1901 
/* Asynchronously read into a vectored buffer (cache-bypassing path).
 * Fails with -EINVAL if the range extends past the file length.
 */
void
spdk_file_readv_async(struct spdk_file *file, struct spdk_io_channel *channel,
		      struct iovec *iovs, uint32_t iovcnt, uint64_t offset, uint64_t length,
		      spdk_file_op_complete cb_fn, void *cb_arg)
{
	SPDK_DEBUGLOG(blobfs, "file=%s offset=%jx length=%jx\n",
		      file->name, offset, length);

	__readvwritev(file, channel, iovs, iovcnt, offset, length, cb_fn, cb_arg, 1);
}
1912 
/*
 * Allocate an I/O channel for the async file API on the calling thread.
 * Requests on this channel are dispatched via __send_request_direct.
 *
 * NOTE(review): the return of spdk_bs_alloc_io_channel() is not checked
 * here; confirm that a NULL bs_channel is tolerated downstream.
 */
struct spdk_io_channel *
spdk_fs_alloc_io_channel(struct spdk_filesystem *fs)
{
	struct spdk_io_channel *io_channel;
	struct spdk_fs_channel *fs_channel;

	io_channel = spdk_get_io_channel(&fs->io_target);
	fs_channel = spdk_io_channel_get_ctx(io_channel);
	fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
	fs_channel->send_request = __send_request_direct;

	return io_channel;
}
1926 
/* Release an I/O channel obtained from spdk_fs_alloc_io_channel(). */
void
spdk_fs_free_io_channel(struct spdk_io_channel *channel)
{
	spdk_put_io_channel(channel);
}
1932 
/*
 * Allocate a per-thread context for the synchronous (blocking) file API.
 * The embedded channel is created with 512 request slots and forwards
 * operations through fs->send_request.  Returns NULL on allocation failure.
 *
 * NOTE(review): the result of fs_channel_create() is not checked here -
 * confirm it cannot fail, or add handling.
 */
struct spdk_fs_thread_ctx *
spdk_fs_alloc_thread_ctx(struct spdk_filesystem *fs)
{
	struct spdk_fs_thread_ctx *ctx;

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		return NULL;
	}

	if (pthread_spin_init(&ctx->ch.lock, 0)) {
		free(ctx);
		return NULL;
	}

	fs_channel_create(fs, &ctx->ch, 512);

	ctx->ch.send_request = fs->send_request;
	/* Mark the channel synchronous so requests wake callers via semaphore. */
	ctx->ch.sync = 1;

	return ctx;
}
1955 
1956 
/*
 * Tear down a context created by spdk_fs_alloc_thread_ctx().
 * Busy-waits (in 1 ms sleeps) until every request issued through the
 * context has completed, then destroys the embedded channel.
 */
void
spdk_fs_free_thread_ctx(struct spdk_fs_thread_ctx *ctx)
{
	/* Only synchronous contexts may be freed this way. */
	assert(ctx->ch.sync == 1);

	while (true) {
		pthread_spin_lock(&ctx->ch.lock);
		if (ctx->ch.outstanding_reqs == 0) {
			pthread_spin_unlock(&ctx->ch.lock);
			break;
		}
		pthread_spin_unlock(&ctx->ch.lock);
		usleep(1000);
	}

	fs_channel_destroy(NULL, &ctx->ch);
	free(ctx);
}
1975 
1976 int
1977 spdk_fs_set_cache_size(uint64_t size_in_mb)
1978 {
1979 	/* setting g_fs_cache_size is only permitted if cache pool
1980 	 * is already freed or hasn't been initialized
1981 	 */
1982 	if (g_cache_pool != NULL) {
1983 		return -EPERM;
1984 	}
1985 
1986 	g_fs_cache_size = size_in_mb * 1024 * 1024;
1987 
1988 	return 0;
1989 }
1990 
1991 uint64_t
1992 spdk_fs_get_cache_size(void)
1993 {
1994 	return g_fs_cache_size / (1024 * 1024);
1995 }
1996 
1997 static void __file_flush(void *ctx);
1998 
/* Try to free some cache buffers from this file.
 * Returns 0 on success, -1 if the file was busy or had nothing to free.
 */
static int
reclaim_cache_buffers(struct spdk_file *file)
{
	int rc;

	BLOBFS_TRACE(file, "free=%s\n", file->name);

	/* The function is safe to be called with any threads, while the file
	 * lock maybe locked by other thread for now, so try to get the file
	 * lock here.
	 */
	rc = pthread_spin_trylock(&file->lock);
	if (rc != 0) {
		return -1;
	}

	/* Nothing cached for this file. */
	if (file->tree->present_mask == 0) {
		pthread_spin_unlock(&file->lock);
		return -1;
	}
	tree_free_buffers(file->tree);

	TAILQ_REMOVE(&g_caches, file, cache_tailq);
	/* If not freed, put it in the end of the queue */
	if (file->tree->present_mask != 0) {
		TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq);
	}

	/* tree_free_buffers() may have freed the buffer pointed to by file->last.
	 * So check if current append_pos is still in the cache, and if not, clear
	 * file->last.
	 */
	if (tree_find_buffer(file->tree, file->append_pos) == NULL) {
		file->last = NULL;
	}

	pthread_spin_unlock(&file->lock);

	return 0;
}
2041 
/*
 * Cache pool management poller.  When the pool is under pressure, reclaim
 * buffers in three passes of increasing aggressiveness:
 *  1. low-priority files not open for writing,
 *  2. any file not open for writing,
 *  3. any file at all.
 * Each pass stops as soon as pressure is relieved or one file is reclaimed.
 */
static int
_blobfs_cache_pool_reclaim(void *arg)
{
	struct spdk_file *file, *tmp;
	int rc;

	if (!blobfs_cache_pool_need_reclaim()) {
		return SPDK_POLLER_IDLE;
	}

	TAILQ_FOREACH_SAFE(file, &g_caches, cache_tailq, tmp) {
		if (!file->open_for_writing &&
		    file->priority == SPDK_FILE_PRIORITY_LOW) {
			rc = reclaim_cache_buffers(file);
			if (rc < 0) {
				/* File busy or nothing to free; try the next one. */
				continue;
			}
			if (!blobfs_cache_pool_need_reclaim()) {
				return SPDK_POLLER_BUSY;
			}
			break;
		}
	}

	TAILQ_FOREACH_SAFE(file, &g_caches, cache_tailq, tmp) {
		if (!file->open_for_writing) {
			rc = reclaim_cache_buffers(file);
			if (rc < 0) {
				continue;
			}
			if (!blobfs_cache_pool_need_reclaim()) {
				return SPDK_POLLER_BUSY;
			}
			break;
		}
	}

	TAILQ_FOREACH_SAFE(file, &g_caches, cache_tailq, tmp) {
		rc = reclaim_cache_buffers(file);
		if (rc < 0) {
			continue;
		}
		break;
	}

	return SPDK_POLLER_BUSY;
}
2089 
/* Runs on the cache pool thread: make the file visible to the reclaimer. */
static void
_add_file_to_cache_pool(void *ctx)
{
	struct spdk_file *file = ctx;

	TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq);
}
2097 
/* Runs on the cache pool thread: hide the file from the reclaimer. */
static void
_remove_file_from_cache_pool(void *ctx)
{
	struct spdk_file *file = ctx;

	TAILQ_REMOVE(&g_caches, file, cache_tailq);
}
2105 
/*
 * Allocate a cache buffer backed by the global mempool and insert it into
 * the file's buffer tree at `offset`.  If the pool is exhausted, retries
 * up to 100 times (sleeping one reclaim-poll period each time) to give the
 * reclaimer a chance to free buffers.  Returns NULL on failure.
 */
static struct cache_buffer *
cache_insert_buffer(struct spdk_file *file, uint64_t offset)
{
	struct cache_buffer *buf;
	int count = 0;
	bool need_update = false;

	buf = calloc(1, sizeof(*buf));
	if (buf == NULL) {
		SPDK_DEBUGLOG(blobfs, "calloc failed\n");
		return NULL;
	}

	do {
		buf->buf = spdk_mempool_get(g_cache_pool);
		if (buf->buf) {
			break;
		}
		if (count++ == 100) {
			SPDK_ERRLOG("Could not allocate cache buffer for file=%p on offset=%jx\n",
				    file, offset);
			free(buf);
			return NULL;
		}
		/* Wait one poll period for the reclaimer to free buffers. */
		usleep(BLOBFS_CACHE_POOL_POLL_PERIOD_IN_US);
	} while (true);

	buf->buf_size = CACHE_BUFFER_SIZE;
	buf->offset = offset;

	/* First buffer for this file: it must be registered with the
	 * reclaimer (on the cache pool thread) after insertion.
	 */
	if (file->tree->present_mask == 0) {
		need_update = true;
	}
	file->tree = tree_insert_buffer(file->tree, buf);

	if (need_update) {
		spdk_thread_send_msg(g_cache_pool_thread, _add_file_to_cache_pool, file);
	}

	return buf;
}
2147 
2148 static struct cache_buffer *
2149 cache_append_buffer(struct spdk_file *file)
2150 {
2151 	struct cache_buffer *last;
2152 
2153 	assert(file->last == NULL || file->last->bytes_filled == file->last->buf_size);
2154 	assert((file->append_pos % CACHE_BUFFER_SIZE) == 0);
2155 
2156 	last = cache_insert_buffer(file, file->append_pos);
2157 	if (last == NULL) {
2158 		SPDK_DEBUGLOG(blobfs, "cache_insert_buffer failed\n");
2159 		return NULL;
2160 	}
2161 
2162 	file->last = last;
2163 
2164 	return last;
2165 }
2166 
2167 static void __check_sync_reqs(struct spdk_file *file);
2168 
/*
 * Metadata sync for a sync request finished.  Record the persisted length,
 * complete the request, and re-scan for further satisfiable sync requests.
 */
static void
__file_cache_finish_sync(void *ctx, int bserrno)
{
	struct spdk_file *file;
	struct spdk_fs_request *sync_req = ctx;
	struct spdk_fs_cb_args *sync_args;

	sync_args = &sync_req->args;
	file = sync_args->file;
	pthread_spin_lock(&file->lock);
	/* Remember the length value now durable in the "length" xattr. */
	file->length_xattr = sync_args->op.sync.length;
	assert(sync_args->op.sync.offset <= file->length_flushed);
	spdk_trace_record(TRACE_BLOBFS_XATTR_END, 0, sync_args->op.sync.offset,
			  0, file->name);
	BLOBFS_TRACE(file, "sync done offset=%jx\n", sync_args->op.sync.offset);
	TAILQ_REMOVE(&file->sync_requests, sync_req, args.op.sync.tailq);
	pthread_spin_unlock(&file->lock);

	/* Complete outside the lock; the callback may re-enter blobfs. */
	sync_args->fn.file_op(sync_args->arg, bserrno);

	free_fs_request(sync_req);
	__check_sync_reqs(file);
}
2192 
/*
 * Look for the first pending sync request whose offset is already covered
 * by flushed data and, if none is being processed, persist the flushed
 * length in the "length" xattr and sync blob metadata.
 */
static void
__check_sync_reqs(struct spdk_file *file)
{
	struct spdk_fs_request *sync_req;

	pthread_spin_lock(&file->lock);

	/* Note: relies on TAILQ_FOREACH leaving sync_req == NULL when no
	 * request satisfies the condition.
	 */
	TAILQ_FOREACH(sync_req, &file->sync_requests, args.op.sync.tailq) {
		if (sync_req->args.op.sync.offset <= file->length_flushed) {
			break;
		}
	}

	if (sync_req != NULL && !sync_req->args.op.sync.xattr_in_progress) {
		BLOBFS_TRACE(file, "set xattr length 0x%jx\n", file->length_flushed);
		/* Mark in progress so a concurrent call doesn't double-sync. */
		sync_req->args.op.sync.xattr_in_progress = true;
		sync_req->args.op.sync.length = file->length_flushed;
		spdk_blob_set_xattr(file->blob, "length", &file->length_flushed,
				    sizeof(file->length_flushed));

		pthread_spin_unlock(&file->lock);
		spdk_trace_record(TRACE_BLOBFS_XATTR_START, 0, file->length_flushed,
				  0, file->name);
		spdk_blob_sync_md(file->blob, __file_cache_finish_sync, sync_req);
	} else {
		pthread_spin_unlock(&file->lock);
	}
}
2221 
/*
 * A cache-buffer flush I/O completed.  Update flush accounting, advance to
 * the next buffer when this one is fully flushed, satisfy any waiting sync
 * requests, and continue flushing via __file_flush().
 */
static void
__file_flush_done(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *file = args->file;
	struct cache_buffer *next = args->op.flush.cache_buffer;

	BLOBFS_TRACE(file, "length=%jx\n", args->op.flush.length);

	pthread_spin_lock(&file->lock);
	next->in_progress = false;
	next->bytes_flushed += args->op.flush.length;
	file->length_flushed += args->op.flush.length;
	if (file->length_flushed > file->length) {
		/* Flushing appended data can push the logical length forward. */
		file->length = file->length_flushed;
	}
	if (next->bytes_flushed == next->buf_size) {
		BLOBFS_TRACE(file, "write buffer fully flushed 0x%jx\n", file->length_flushed);
		next = tree_find_buffer(file->tree, file->length_flushed);
	}

	/*
	 * Assert that there is no cached data that extends past the end of the underlying
	 *  blob.
	 */
	assert(next == NULL || next->offset < __file_get_blob_size(file) ||
	       next->bytes_filled == 0);

	pthread_spin_unlock(&file->lock);

	__check_sync_reqs(file);

	__file_flush(req);
}
2257 
/*
 * Flush the cache buffer at file->length_flushed to the blob.
 *
 * Owns @ctx (an fs request): it is freed here when nothing needs flushing,
 * otherwise it becomes the completion context for __file_flush_done().
 * Runs on the filesystem's request thread.
 */
static void
__file_flush(void *ctx)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *file = args->file;
	struct cache_buffer *next;
	uint64_t offset, length, start_lba, num_lba;
	uint32_t lba_size;

	pthread_spin_lock(&file->lock);
	next = tree_find_buffer(file->tree, file->length_flushed);
	if (next == NULL || next->in_progress ||
	    ((next->bytes_filled < next->buf_size) && TAILQ_EMPTY(&file->sync_requests))) {
		/*
		 * There is either no data to flush, a flush I/O is already in
		 *  progress, or the next buffer is partially filled but there's no
		 *  outstanding request to sync it.
		 * So return immediately - if a flush I/O is in progress we will flush
		 *  more data after that is completed, or a partial buffer will get flushed
		 *  when it is either filled or the file is synced.
		 */
		free_fs_request(req);
		if (next == NULL) {
			/*
			 * For cases where a file's cache was evicted, and then the
			 *  file was later appended, we will write the data directly
			 *  to disk and bypass cache.  So just update length_flushed
			 *  here to reflect that all data was already written to disk.
			 */
			file->length_flushed = file->append_pos;
		}
		pthread_spin_unlock(&file->lock);
		if (next == NULL) {
			/*
			 * There is no data to flush, but we still need to check for any
			 *  outstanding sync requests to make sure metadata gets updated.
			 */
			__check_sync_reqs(file);
		}
		return;
	}

	offset = next->offset + next->bytes_flushed;
	length = next->bytes_filled - next->bytes_flushed;
	if (length == 0) {
		free_fs_request(req);
		pthread_spin_unlock(&file->lock);
		/*
		 * There is no data to flush, but we still need to check for any
		 *  outstanding sync requests to make sure metadata gets updated.
		 */
		__check_sync_reqs(file);
		return;
	}
	args->op.flush.length = length;
	args->op.flush.cache_buffer = next;

	/* Translate the unflushed byte range into whole LBAs. */
	__get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba);

	next->in_progress = true;
	BLOBFS_TRACE(file, "offset=0x%jx length=0x%jx page start=0x%jx num=0x%jx\n",
		     offset, length, start_lba, num_lba);
	pthread_spin_unlock(&file->lock);
	/* (start_lba * lba_size) - next->offset converts the LBA-aligned start
	 * back into an offset within this cache buffer.
	 */
	spdk_blob_io_write(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel,
			   next->buf + (start_lba * lba_size) - next->offset,
			   start_lba, num_lba, __file_flush_done, req);
}
2326 
/* Completion for the metadata sync issued after a blob resize; wakes the
 * caller blocked in spdk_file_write() with the sync status.
 */
static void
__file_extend_done(void *arg, int bserrno)
{
	struct spdk_fs_cb_args *cb_args = arg;

	__wake_caller(cb_args, bserrno);
}
2334 
2335 static void
2336 __file_extend_resize_cb(void *_args, int bserrno)
2337 {
2338 	struct spdk_fs_cb_args *args = _args;
2339 	struct spdk_file *file = args->file;
2340 
2341 	if (bserrno) {
2342 		__wake_caller(args, bserrno);
2343 		return;
2344 	}
2345 
2346 	spdk_blob_sync_md(file->blob, __file_extend_done, args);
2347 }
2348 
2349 static void
2350 __file_extend_blob(void *_args)
2351 {
2352 	struct spdk_fs_cb_args *args = _args;
2353 	struct spdk_file *file = args->file;
2354 
2355 	spdk_blob_resize(file->blob, args->op.resize.num_clusters, __file_extend_resize_cb, args);
2356 }
2357 
2358 static void
2359 __rw_from_file_done(void *ctx, int bserrno)
2360 {
2361 	struct spdk_fs_request *req = ctx;
2362 
2363 	__wake_caller(&req->args, bserrno);
2364 	free_fs_request(req);
2365 }
2366 
2367 static void
2368 __rw_from_file(void *ctx)
2369 {
2370 	struct spdk_fs_request *req = ctx;
2371 	struct spdk_fs_cb_args *args = &req->args;
2372 	struct spdk_file *file = args->file;
2373 
2374 	if (args->op.rw.is_read) {
2375 		spdk_file_read_async(file, file->fs->sync_target.sync_io_channel, args->iovs[0].iov_base,
2376 				     args->op.rw.offset, (uint64_t)args->iovs[0].iov_len,
2377 				     __rw_from_file_done, req);
2378 	} else {
2379 		spdk_file_write_async(file, file->fs->sync_target.sync_io_channel, args->iovs[0].iov_base,
2380 				      args->op.rw.offset, (uint64_t)args->iovs[0].iov_len,
2381 				      __rw_from_file_done, req);
2382 	}
2383 }
2384 
/* Context shared between a blocking read/write caller and the async
 * completion path: the channel whose semaphore the caller waits on, plus
 * the final I/O status reported back through args->rwerrno.
 */
struct rw_from_file_arg {
	struct spdk_fs_channel *channel;	/* channel owning the wakeup semaphore */
	int rwerrno;				/* result of the async read/write */
};
2389 
2390 static int
2391 __send_rw_from_file(struct spdk_file *file, void *payload,
2392 		    uint64_t offset, uint64_t length, bool is_read,
2393 		    struct rw_from_file_arg *arg)
2394 {
2395 	struct spdk_fs_request *req;
2396 	struct spdk_fs_cb_args *args;
2397 
2398 	req = alloc_fs_request_with_iov(arg->channel, 1);
2399 	if (req == NULL) {
2400 		sem_post(&arg->channel->sem);
2401 		return -ENOMEM;
2402 	}
2403 
2404 	args = &req->args;
2405 	args->file = file;
2406 	args->sem = &arg->channel->sem;
2407 	args->iovs[0].iov_base = payload;
2408 	args->iovs[0].iov_len = (size_t)length;
2409 	args->op.rw.offset = offset;
2410 	args->op.rw.is_read = is_read;
2411 	args->rwerrno = &arg->rwerrno;
2412 	file->fs->send_request(__rw_from_file, req);
2413 	return 0;
2414 }
2415 
2416 int
2417 spdk_file_write(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx,
2418 		void *payload, uint64_t offset, uint64_t length)
2419 {
2420 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
2421 	struct spdk_fs_request *flush_req;
2422 	uint64_t rem_length, copy, blob_size, cluster_sz;
2423 	uint32_t cache_buffers_filled = 0;
2424 	uint8_t *cur_payload;
2425 	struct cache_buffer *last;
2426 
2427 	BLOBFS_TRACE_RW(file, "offset=%jx length=%jx\n", offset, length);
2428 
2429 	if (length == 0) {
2430 		return 0;
2431 	}
2432 
2433 	if (offset != file->append_pos) {
2434 		BLOBFS_TRACE(file, " error offset=%jx append_pos=%jx\n", offset, file->append_pos);
2435 		return -EINVAL;
2436 	}
2437 
2438 	pthread_spin_lock(&file->lock);
2439 	file->open_for_writing = true;
2440 
2441 	do {
2442 		if ((file->last == NULL) && (file->append_pos % CACHE_BUFFER_SIZE == 0)) {
2443 			cache_append_buffer(file);
2444 		}
2445 
2446 		if (file->last == NULL) {
2447 			struct rw_from_file_arg arg = {};
2448 			int rc;
2449 
2450 			arg.channel = channel;
2451 			arg.rwerrno = 0;
2452 			file->append_pos += length;
2453 			pthread_spin_unlock(&file->lock);
2454 			rc = __send_rw_from_file(file, payload, offset, length, false, &arg);
2455 			if (rc != 0) {
2456 				return rc;
2457 			}
2458 			sem_wait(&channel->sem);
2459 			return arg.rwerrno;
2460 		}
2461 
2462 		blob_size = __file_get_blob_size(file);
2463 
2464 		if ((offset + length) > blob_size) {
2465 			struct spdk_fs_cb_args extend_args = {};
2466 
2467 			cluster_sz = file->fs->bs_opts.cluster_sz;
2468 			extend_args.sem = &channel->sem;
2469 			extend_args.op.resize.num_clusters = __bytes_to_clusters((offset + length), cluster_sz);
2470 			extend_args.file = file;
2471 			BLOBFS_TRACE(file, "start resize to %u clusters\n", extend_args.op.resize.num_clusters);
2472 			pthread_spin_unlock(&file->lock);
2473 			file->fs->send_request(__file_extend_blob, &extend_args);
2474 			sem_wait(&channel->sem);
2475 			if (extend_args.rc) {
2476 				return extend_args.rc;
2477 			}
2478 			pthread_spin_lock(&file->lock);
2479 		}
2480 	} while (file->last == NULL);
2481 
2482 	flush_req = alloc_fs_request(channel);
2483 	if (flush_req == NULL) {
2484 		pthread_spin_unlock(&file->lock);
2485 		return -ENOMEM;
2486 	}
2487 
2488 	last = file->last;
2489 	rem_length = length;
2490 	cur_payload = payload;
2491 	while (rem_length > 0) {
2492 		copy = last->buf_size - last->bytes_filled;
2493 		if (copy > rem_length) {
2494 			copy = rem_length;
2495 		}
2496 		BLOBFS_TRACE_RW(file, "  fill offset=%jx length=%jx\n", file->append_pos, copy);
2497 		memcpy(&last->buf[last->bytes_filled], cur_payload, copy);
2498 		file->append_pos += copy;
2499 		if (file->length < file->append_pos) {
2500 			file->length = file->append_pos;
2501 		}
2502 		cur_payload += copy;
2503 		last->bytes_filled += copy;
2504 		rem_length -= copy;
2505 		if (last->bytes_filled == last->buf_size) {
2506 			cache_buffers_filled++;
2507 			last = cache_append_buffer(file);
2508 			if (last == NULL) {
2509 				BLOBFS_TRACE(file, "nomem\n");
2510 				free_fs_request(flush_req);
2511 				pthread_spin_unlock(&file->lock);
2512 				return -ENOMEM;
2513 			}
2514 		}
2515 	}
2516 
2517 	pthread_spin_unlock(&file->lock);
2518 
2519 	if (cache_buffers_filled == 0) {
2520 		free_fs_request(flush_req);
2521 		return 0;
2522 	}
2523 
2524 	flush_req->args.file = file;
2525 	file->fs->send_request(__file_flush, flush_req);
2526 	return 0;
2527 }
2528 
2529 static void
2530 __readahead_done(void *ctx, int bserrno)
2531 {
2532 	struct spdk_fs_request *req = ctx;
2533 	struct spdk_fs_cb_args *args = &req->args;
2534 	struct cache_buffer *cache_buffer = args->op.readahead.cache_buffer;
2535 	struct spdk_file *file = args->file;
2536 
2537 	BLOBFS_TRACE(file, "offset=%jx\n", cache_buffer->offset);
2538 
2539 	pthread_spin_lock(&file->lock);
2540 	cache_buffer->bytes_filled = args->op.readahead.length;
2541 	cache_buffer->bytes_flushed = args->op.readahead.length;
2542 	cache_buffer->in_progress = false;
2543 	pthread_spin_unlock(&file->lock);
2544 
2545 	free_fs_request(req);
2546 }
2547 
2548 static void
2549 __readahead(void *ctx)
2550 {
2551 	struct spdk_fs_request *req = ctx;
2552 	struct spdk_fs_cb_args *args = &req->args;
2553 	struct spdk_file *file = args->file;
2554 	uint64_t offset, length, start_lba, num_lba;
2555 	uint32_t lba_size;
2556 
2557 	offset = args->op.readahead.offset;
2558 	length = args->op.readahead.length;
2559 	assert(length > 0);
2560 
2561 	__get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba);
2562 
2563 	BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n",
2564 		     offset, length, start_lba, num_lba);
2565 	spdk_blob_io_read(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel,
2566 			  args->op.readahead.cache_buffer->buf,
2567 			  start_lba, num_lba, __readahead_done, req);
2568 }
2569 
2570 static uint64_t
2571 __next_cache_buffer_offset(uint64_t offset)
2572 {
2573 	return (offset + CACHE_BUFFER_SIZE) & ~(CACHE_TREE_LEVEL_MASK(0));
2574 }
2575 
2576 static void
2577 check_readahead(struct spdk_file *file, uint64_t offset,
2578 		struct spdk_fs_channel *channel)
2579 {
2580 	struct spdk_fs_request *req;
2581 	struct spdk_fs_cb_args *args;
2582 
2583 	offset = __next_cache_buffer_offset(offset);
2584 	if (tree_find_buffer(file->tree, offset) != NULL || file->length <= offset) {
2585 		return;
2586 	}
2587 
2588 	req = alloc_fs_request(channel);
2589 	if (req == NULL) {
2590 		return;
2591 	}
2592 	args = &req->args;
2593 
2594 	BLOBFS_TRACE(file, "offset=%jx\n", offset);
2595 
2596 	args->file = file;
2597 	args->op.readahead.offset = offset;
2598 	args->op.readahead.cache_buffer = cache_insert_buffer(file, offset);
2599 	if (!args->op.readahead.cache_buffer) {
2600 		BLOBFS_TRACE(file, "Cannot allocate buf for offset=%jx\n", offset);
2601 		free_fs_request(req);
2602 		return;
2603 	}
2604 
2605 	args->op.readahead.cache_buffer->in_progress = true;
2606 	if (file->length < (offset + CACHE_BUFFER_SIZE)) {
2607 		args->op.readahead.length = file->length & (CACHE_BUFFER_SIZE - 1);
2608 	} else {
2609 		args->op.readahead.length = CACHE_BUFFER_SIZE;
2610 	}
2611 	file->fs->send_request(__readahead, req);
2612 }
2613 
/**
 * Synchronously read up to @length bytes at @offset from @file into @payload.
 *
 * The range is clamped to the file's append position.  Cache hits are copied
 * directly; misses are issued as blocking reads via __send_rw_from_file()
 * and awaited on the channel semaphore.  Sequential access (tracked through
 * next_seq_offset/seq_byte_count) triggers readahead of the next buffers.
 *
 * Returns the number of bytes read, 0 for an empty or out-of-range request,
 * or a negative errno on failure.
 */
int64_t
spdk_file_read(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx,
	       void *payload, uint64_t offset, uint64_t length)
{
	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
	uint64_t final_offset, final_length;
	uint32_t sub_reads = 0;
	struct cache_buffer *buf;
	uint64_t read_len;
	struct rw_from_file_arg arg = {};

	pthread_spin_lock(&file->lock);

	BLOBFS_TRACE_RW(file, "offset=%ju length=%ju\n", offset, length);

	file->open_for_writing = false;

	if (length == 0 || offset >= file->append_pos) {
		pthread_spin_unlock(&file->lock);
		return 0;
	}

	/* Clamp the read to the bytes that have actually been written. */
	if (offset + length > file->append_pos) {
		length = file->append_pos - offset;
	}

	/* Track sequential access; a non-contiguous read resets the counter. */
	if (offset != file->next_seq_offset) {
		file->seq_byte_count = 0;
	}
	file->seq_byte_count += length;
	file->next_seq_offset = offset + length;
	if (file->seq_byte_count >= CACHE_READAHEAD_THRESHOLD) {
		check_readahead(file, offset, channel);
		check_readahead(file, offset + CACHE_BUFFER_SIZE, channel);
	}

	arg.channel = channel;
	arg.rwerrno = 0;
	final_length = 0;
	final_offset = offset + length;
	/* Walk the range one cache-buffer-aligned chunk at a time. */
	while (offset < final_offset) {
		int ret = 0;
		length = NEXT_CACHE_BUFFER_OFFSET(offset) - offset;
		if (length > (final_offset - offset)) {
			length = final_offset - offset;
		}

		buf = tree_find_filled_buffer(file->tree, offset);
		if (buf == NULL) {
			/* Cache miss: issue a blocking read (awaited below). */
			pthread_spin_unlock(&file->lock);
			ret = __send_rw_from_file(file, payload, offset, length, true, &arg);
			pthread_spin_lock(&file->lock);
			if (ret == 0) {
				sub_reads++;
			}
		} else {
			/* Cache hit: copy what the buffer holds for this chunk. */
			read_len = length;
			if ((offset + length) > (buf->offset + buf->bytes_filled)) {
				read_len = buf->offset + buf->bytes_filled - offset;
			}
			BLOBFS_TRACE(file, "read %p offset=%ju length=%ju\n", payload, offset, read_len);
			memcpy(payload, &buf->buf[offset - buf->offset], read_len);
			/* A buffer consumed to its end is dropped; when the tree
			 * empties, detach the file from the cache pool.
			 */
			if ((offset + read_len) % CACHE_BUFFER_SIZE == 0) {
				tree_remove_buffer(file->tree, buf);
				if (file->tree->present_mask == 0) {
					spdk_thread_send_msg(g_cache_pool_thread, _remove_file_from_cache_pool, file);
				}
			}
		}

		if (ret == 0) {
			final_length += length;
		} else {
			arg.rwerrno = ret;
			break;
		}
		payload += length;
		offset += length;
	}
	pthread_spin_unlock(&file->lock);
	/* Wait for every outstanding cache-miss read to complete. */
	while (sub_reads > 0) {
		sem_wait(&channel->sem);
		sub_reads--;
	}
	if (arg.rwerrno == 0) {
		return final_length;
	} else {
		return arg.rwerrno;
	}
}
2704 
/*
 * Common sync path for spdk_file_sync()/spdk_file_sync_async().
 *
 * Queues a sync request recording the current append_pos and kicks a flush
 * on the channel's request thread; @cb_fn fires once data up to that offset
 * is flushed and the "length" xattr is persisted (see __check_sync_reqs()).
 * Completes immediately when append_pos is already covered by length_xattr.
 */
static void
_file_sync(struct spdk_file *file, struct spdk_fs_channel *channel,
	   spdk_file_op_complete cb_fn, void *cb_arg)
{
	struct spdk_fs_request *sync_req;
	struct spdk_fs_request *flush_req;
	struct spdk_fs_cb_args *sync_args;
	struct spdk_fs_cb_args *flush_args;

	BLOBFS_TRACE(file, "offset=%jx\n", file->append_pos);

	pthread_spin_lock(&file->lock);
	if (file->append_pos <= file->length_xattr) {
		/* Everything is already persisted; complete without queueing. */
		BLOBFS_TRACE(file, "done - file already synced\n");
		pthread_spin_unlock(&file->lock);
		cb_fn(cb_arg, 0);
		return;
	}

	sync_req = alloc_fs_request(channel);
	if (!sync_req) {
		SPDK_ERRLOG("Cannot allocate sync req for file=%s\n", file->name);
		pthread_spin_unlock(&file->lock);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}
	sync_args = &sync_req->args;

	flush_req = alloc_fs_request(channel);
	if (!flush_req) {
		SPDK_ERRLOG("Cannot allocate flush req for file=%s\n", file->name);
		free_fs_request(sync_req);
		pthread_spin_unlock(&file->lock);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}
	flush_args = &flush_req->args;

	/* The sync request stays queued until flushing reaches its offset. */
	sync_args->file = file;
	sync_args->fn.file_op = cb_fn;
	sync_args->arg = cb_arg;
	sync_args->op.sync.offset = file->append_pos;
	sync_args->op.sync.xattr_in_progress = false;
	TAILQ_INSERT_TAIL(&file->sync_requests, sync_req, args.op.sync.tailq);
	pthread_spin_unlock(&file->lock);

	flush_args->file = file;
	channel->send_request(__file_flush, flush_req);
}
2754 
2755 int
2756 spdk_file_sync(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx)
2757 {
2758 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
2759 	struct spdk_fs_cb_args args = {};
2760 
2761 	args.sem = &channel->sem;
2762 	_file_sync(file, channel, __wake_caller, &args);
2763 	sem_wait(&channel->sem);
2764 
2765 	return args.rc;
2766 }
2767 
2768 void
2769 spdk_file_sync_async(struct spdk_file *file, struct spdk_io_channel *_channel,
2770 		     spdk_file_op_complete cb_fn, void *cb_arg)
2771 {
2772 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
2773 
2774 	_file_sync(file, channel, cb_fn, cb_arg);
2775 }
2776 
2777 void
2778 spdk_file_set_priority(struct spdk_file *file, uint32_t priority)
2779 {
2780 	BLOBFS_TRACE(file, "priority=%u\n", priority);
2781 	file->priority = priority;
2782 
2783 }
2784 
2785 /*
2786  * Close routines
2787  */
2788 
2789 static void
2790 __file_close_async_done(void *ctx, int bserrno)
2791 {
2792 	struct spdk_fs_request *req = ctx;
2793 	struct spdk_fs_cb_args *args = &req->args;
2794 	struct spdk_file *file = args->file;
2795 
2796 	spdk_trace_record(TRACE_BLOBFS_CLOSE, 0, 0, 0, file->name);
2797 
2798 	if (file->is_deleted) {
2799 		spdk_fs_delete_file_async(file->fs, file->name, blob_delete_cb, ctx);
2800 		return;
2801 	}
2802 
2803 	args->fn.file_op(args->arg, bserrno);
2804 	free_fs_request(req);
2805 }
2806 
/*
 * Drop one reference to @file.  Intermediate releases just complete the
 * request; the final release detaches and closes the underlying blob, with
 * completion (or the deferred delete of an unlinked file) handled in
 * __file_close_async_done().  ref_count == 0 fails the request with -EBADF.
 */
static void
__file_close_async(struct spdk_file *file, struct spdk_fs_request *req)
{
	struct spdk_blob *blob;

	pthread_spin_lock(&file->lock);
	if (file->ref_count == 0) {
		pthread_spin_unlock(&file->lock);
		__file_close_async_done(req, -EBADF);
		return;
	}

	file->ref_count--;
	if (file->ref_count > 0) {
		/* Other openers remain; nothing to tear down yet. */
		pthread_spin_unlock(&file->lock);
		req->args.fn.file_op(req->args.arg, 0);
		free_fs_request(req);
		return;
	}

	pthread_spin_unlock(&file->lock);

	/* Last reference: detach the blob from the file and close it. */
	blob = file->blob;
	file->blob = NULL;
	spdk_blob_close(blob, __file_close_async_done, req);
}
2833 
2834 static void
2835 __file_close_async__sync_done(void *arg, int fserrno)
2836 {
2837 	struct spdk_fs_request *req = arg;
2838 	struct spdk_fs_cb_args *args = &req->args;
2839 
2840 	__file_close_async(args->file, req);
2841 }
2842 
2843 void
2844 spdk_file_close_async(struct spdk_file *file, spdk_file_op_complete cb_fn, void *cb_arg)
2845 {
2846 	struct spdk_fs_request *req;
2847 	struct spdk_fs_cb_args *args;
2848 
2849 	req = alloc_fs_request(file->fs->md_target.md_fs_channel);
2850 	if (req == NULL) {
2851 		SPDK_ERRLOG("Cannot allocate close async req for file=%s\n", file->name);
2852 		cb_fn(cb_arg, -ENOMEM);
2853 		return;
2854 	}
2855 
2856 	args = &req->args;
2857 	args->file = file;
2858 	args->fn.file_op = cb_fn;
2859 	args->arg = cb_arg;
2860 
2861 	spdk_file_sync_async(file, file->fs->md_target.md_io_channel, __file_close_async__sync_done, req);
2862 }
2863 
2864 static void
2865 __file_close(void *arg)
2866 {
2867 	struct spdk_fs_request *req = arg;
2868 	struct spdk_fs_cb_args *args = &req->args;
2869 	struct spdk_file *file = args->file;
2870 
2871 	__file_close_async(file, req);
2872 }
2873 
/*
 * Synchronously close @file: sync it first, then send the close to the
 * channel's request thread and block on the semaphore until __wake_caller()
 * stores the result in args->rc.
 */
int
spdk_file_close(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx)
{
	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	req = alloc_fs_request(channel);
	if (req == NULL) {
		SPDK_ERRLOG("Cannot allocate close req for file=%s\n", file->name);
		return -ENOMEM;
	}

	args = &req->args;

	/* NOTE(review): the return value of spdk_file_sync() is ignored, so a
	 * failed sync still proceeds with the close - confirm this is intended.
	 */
	spdk_file_sync(file, ctx);
	BLOBFS_TRACE(file, "name=%s\n", file->name);
	args->file = file;
	args->sem = &channel->sem;
	args->fn.file_op = __wake_caller;
	args->arg = args;
	channel->send_request(__file_close, req);
	sem_wait(&channel->sem);

	return args->rc;
}
2900 
2901 int
2902 spdk_file_get_id(struct spdk_file *file, void *id, size_t size)
2903 {
2904 	if (size < sizeof(spdk_blob_id)) {
2905 		return -EINVAL;
2906 	}
2907 
2908 	memcpy(id, &file->blobid, sizeof(spdk_blob_id));
2909 
2910 	return sizeof(spdk_blob_id);
2911 }
2912 
/* Final free of a file, executed on the cache pool thread: unlink it from
 * the global cache list, then release its name, buffer tree, and itself.
 */
static void
_file_free(void *ctx)
{
	struct spdk_file *file = ctx;

	TAILQ_REMOVE(&g_caches, file, cache_tailq);

	free(file->name);
	free(file->tree);
	free(file);
}
2924 
/*
 * Release @file.  When it holds no cache buffers it is freed inline;
 * otherwise the buffers are freed here and the final free is deferred to
 * the cache pool thread (_file_free), which also removes the file from the
 * global g_caches list.
 */
static void
file_free(struct spdk_file *file)
{
	BLOBFS_TRACE(file, "free=%s\n", file->name);
	pthread_spin_lock(&file->lock);
	if (file->tree->present_mask == 0) {
		/* No cached buffers - free inline. */
		pthread_spin_unlock(&file->lock);
		free(file->name);
		free(file->tree);
		free(file);
		return;
	}

	tree_free_buffers(file->tree);
	assert(file->tree->present_mask == 0);
	spdk_thread_send_msg(g_cache_pool_thread, _file_free, file);
	pthread_spin_unlock(&file->lock);
}
2943 
/* Register the debug-log flags referenced by BLOBFS_TRACE/BLOBFS_TRACE_RW. */
SPDK_LOG_REGISTER_COMPONENT(blobfs)
SPDK_LOG_REGISTER_COMPONENT(blobfs_rw)
2946