xref: /spdk/lib/blobfs/blobfs.c (revision d270cd36ad0d0c1bde49d8ca24acbddbfbb246a8)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 
36 #include "spdk/blobfs.h"
37 #include "spdk/conf.h"
38 #include "tree.h"
39 
40 #include "spdk/queue.h"
41 #include "spdk/thread.h"
42 #include "spdk/assert.h"
43 #include "spdk/env.h"
44 #include "spdk/util.h"
45 #include "spdk_internal/log.h"
46 #include "spdk/trace.h"
47 
/*
 * Debug-log helpers.  Both prepend "file=<name> " (taken from file->name) to
 * the caller's format string; they differ only in the log flag they use
 * (SPDK_LOG_BLOBFS vs. SPDK_LOG_BLOBFS_RW).
 */
48 #define BLOBFS_TRACE(file, str, args...) \
49 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s " str, file->name, ##args)
50 
51 #define BLOBFS_TRACE_RW(file, str, args...) \
52 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS_RW, "file=%s " str, file->name, ##args)
53 
/* Default total size of the buffer cache: 4 GiB. */
54 #define BLOBFS_DEFAULT_CACHE_SIZE (4ULL * 1024 * 1024 * 1024)
/* Cluster size (1 MiB) that spdk_fs_opts_init() stores in spdk_blobfs_opts. */
55 #define SPDK_BLOBFS_DEFAULT_OPTS_CLUSTER_SZ (1024 * 1024)
56 
/*
 * Process-wide cache state shared by every loaded filesystem.
 *  - g_fs_cache_size: total cache size; divided by CACHE_BUFFER_SIZE to get
 *    the element count of g_cache_pool.
 *  - g_cache_pool: mempool created by __initialize_cache() and released by
 *    __free_cache().
 *  - g_caches / g_caches_lock: list of files and its spinlock, both
 *    initialized in __initialize_cache().
 *  - g_fs_count: number of initialized filesystems; the cache is created when
 *    it goes 0 -> 1 and freed when it returns to 0, always under
 *    g_cache_init_lock.
 */
57 static uint64_t g_fs_cache_size = BLOBFS_DEFAULT_CACHE_SIZE;
58 static struct spdk_mempool *g_cache_pool;
59 static TAILQ_HEAD(, spdk_file) g_caches;
60 static int g_fs_count = 0;
61 static pthread_mutex_t g_cache_init_lock = PTHREAD_MUTEX_INITIALIZER;
62 static pthread_spinlock_t g_caches_lock;
63 
/*
 * Trace group and tracepoint IDs used by blobfs.  Each tracepoint is given a
 * textual description in the SPDK_TRACE_REGISTER_FN block that follows.
 */
64 #define TRACE_GROUP_BLOBFS	0x7
65 #define TRACE_BLOBFS_XATTR_START	SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x0)
66 #define TRACE_BLOBFS_XATTR_END		SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x1)
67 #define TRACE_BLOBFS_OPEN		SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x2)
68 #define TRACE_BLOBFS_CLOSE		SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x3)
69 #define TRACE_BLOBFS_DELETE_START	SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x4)
70 #define TRACE_BLOBFS_DELETE_DONE	SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x5)
71 
72 SPDK_TRACE_REGISTER_FN(blobfs_trace, "blobfs", TRACE_GROUP_BLOBFS)
73 {
74 	spdk_trace_register_description("BLOBFS_XATTR_START",
75 					TRACE_BLOBFS_XATTR_START,
76 					OWNER_NONE, OBJECT_NONE, 0,
77 					SPDK_TRACE_ARG_TYPE_STR,
78 					"file:    ");
79 	spdk_trace_register_description("BLOBFS_XATTR_END",
80 					TRACE_BLOBFS_XATTR_END,
81 					OWNER_NONE, OBJECT_NONE, 0,
82 					SPDK_TRACE_ARG_TYPE_STR,
83 					"file:    ");
84 	spdk_trace_register_description("BLOBFS_OPEN",
85 					TRACE_BLOBFS_OPEN,
86 					OWNER_NONE, OBJECT_NONE, 0,
87 					SPDK_TRACE_ARG_TYPE_STR,
88 					"file:    ");
89 	spdk_trace_register_description("BLOBFS_CLOSE",
90 					TRACE_BLOBFS_CLOSE,
91 					OWNER_NONE, OBJECT_NONE, 0,
92 					SPDK_TRACE_ARG_TYPE_STR,
93 					"file:    ");
94 	spdk_trace_register_description("BLOBFS_DELETE_START",
95 					TRACE_BLOBFS_DELETE_START,
96 					OWNER_NONE, OBJECT_NONE, 0,
97 					SPDK_TRACE_ARG_TYPE_STR,
98 					"file:    ");
99 	spdk_trace_register_description("BLOBFS_DELETE_DONE",
100 					TRACE_BLOBFS_DELETE_DONE,
101 					OWNER_NONE, OBJECT_NONE, 0,
102 					SPDK_TRACE_ARG_TYPE_STR,
103 					"file:    ");
104 }
105 
106 void
107 spdk_cache_buffer_free(struct cache_buffer *cache_buffer)
108 {
109 	spdk_mempool_put(g_cache_pool, cache_buffer->buf);
110 	free(cache_buffer);
111 }
112 
/*
 * 128 KiB.  NOTE(review): presumably the sequential-read byte count after
 * which readahead is triggered; the use site is not in this part of the file,
 * so confirm there.
 */
113 #define CACHE_READAHEAD_THRESHOLD	(128 * 1024)
114 
/*
 * In-memory state for one blobfs file.  Instances are created by file_alloc(),
 * which links them onto fs->files, and are freed in unload_cb().
 */
115 struct spdk_file {
/* Owning filesystem; set by file_alloc(). */
116 	struct spdk_filesystem	*fs;
/* Open blob handle; assigned in fs_open_blob_done(). */
117 	struct spdk_blob	*blob;
/* Heap copy of the file name (strdup). */
118 	char			*name;
/* Up to the first 8 bytes of name packed into an integer for trace records. */
119 	uint64_t		trace_arg_name;
/*
 * length, length_flushed, length_xattr and append_pos are all initialized from
 * the blob's "length" xattr when the filesystem is loaded (see iter_cb()).
 */
120 	uint64_t		length;
121 	bool                    is_deleted;
122 	bool			open_for_writing;
123 	uint64_t		length_flushed;
124 	uint64_t		length_xattr;
125 	uint64_t		append_pos;
126 	uint64_t		seq_byte_count;
127 	uint64_t		next_seq_offset;
/* Defaults to SPDK_FILE_PRIORITY_LOW in file_alloc(). */
128 	uint32_t		priority;
/* Link in spdk_filesystem.files. */
129 	TAILQ_ENTRY(spdk_file)	tailq;
/* ID of the blob backing this file. */
130 	spdk_blob_id		blobid;
131 	uint32_t		ref_count;
/* Spinlock initialized in file_alloc(). */
132 	pthread_spinlock_t	lock;
133 	struct cache_buffer	*last;
/* Cache tree root, allocated zeroed in file_alloc(). */
134 	struct cache_tree	*tree;
/* Open requests waiting for the blob open; completed in fs_open_blob_done(). */
135 	TAILQ_HEAD(open_requests_head, spdk_fs_request) open_requests;
136 	TAILQ_HEAD(sync_requests_head, spdk_fs_request) sync_requests;
/* NOTE(review): presumably the link in the global g_caches list - confirm at the use site. */
137 	TAILQ_ENTRY(spdk_file)	cache_tailq;
138 };
139 
/*
 * Record of a blob that carried an "is_deleted" xattr at load time.  iter_cb()
 * queues one of these per such blob; _handle_deleted_files() then deletes the
 * blobs one at a time and frees the records.
 */
140 struct spdk_deleted_file {
/* ID of the blob to delete. */
141 	spdk_blob_id	id;
/* Link in spdk_fs_cb_args.op.fs_load.deleted_files. */
142 	TAILQ_ENTRY(spdk_deleted_file)	tailq;
143 };
144 
/*
 * One blobfs instance layered on a blobstore.  The addresses of the three
 * *_target members are used as io_device handles (see fs_alloc()), and the
 * channel-create callbacks recover the filesystem from them with
 * SPDK_CONTAINEROF.
 */
145 struct spdk_filesystem {
/* Underlying blobstore; set in common_fs_bs_init(). */
146 	struct spdk_blob_store	*bs;
/* All known files, linked through spdk_file.tailq. */
147 	TAILQ_HEAD(, spdk_file)	files;
/* Only cluster_sz is filled in within this part of the file (common_fs_bs_init()). */
148 	struct spdk_bs_opts	bs_opts;
149 	struct spdk_bs_dev	*bdev;
/* Caller-supplied function used to dispatch requests from synchronous API calls. */
150 	fs_send_request_fn	send_request;
151 
/* max_ops in each target is the number of preallocated requests per channel. */
152 	struct {
153 		uint32_t		max_ops;
154 		struct spdk_io_channel	*sync_io_channel;
155 		struct spdk_fs_channel	*sync_fs_channel;
156 	} sync_target;
157 
158 	struct {
159 		uint32_t		max_ops;
160 		struct spdk_io_channel	*md_io_channel;
161 		struct spdk_fs_channel	*md_fs_channel;
162 	} md_target;
163 
164 	struct {
165 		uint32_t		max_ops;
166 	} io_target;
167 };
168 
/*
 * Per-request argument block embedded as the first member of
 * struct spdk_fs_request.  It carries the completion callback, the caller's
 * context, and operation-specific state in the 'op' union.
 */
169 struct spdk_fs_cb_args {
/* Completion callback; which member is valid depends on the operation. */
170 	union {
171 		spdk_fs_op_with_handle_complete		fs_op_with_handle;
172 		spdk_fs_op_complete			fs_op;
173 		spdk_file_op_with_handle_complete	file_op_with_handle;
174 		spdk_file_op_complete			file_op;
175 		spdk_file_stat_op_complete		stat_op;
176 	} fn;
/* Context pointer handed back to the completion callback. */
177 	void *arg;
/* Semaphore posted to wake a synchronous caller (see __wake_caller()). */
178 	sem_t *sem;
179 	struct spdk_filesystem *fs;
180 	struct spdk_file *file;
/* Result code handed back to a synchronous caller. */
181 	int rc;
/*
 * iovs points at the embedded 'iov' when iovcnt <= 1, otherwise at a heap
 * array owned by the request (see alloc_fs_request_with_iov()).
 */
182 	struct iovec *iovs;
183 	uint32_t iovcnt;
184 	struct iovec iov;
/* Operation-specific state; only the member for the current operation is meaningful. */
185 	union {
186 		struct {
187 			TAILQ_HEAD(, spdk_deleted_file)	deleted_files;
188 		} fs_load;
189 		struct {
190 			uint64_t	length;
191 		} truncate;
192 		struct {
193 			struct spdk_io_channel	*channel;
194 			void		*pin_buf;
195 			int		is_read;
196 			off_t		offset;
197 			size_t		length;
198 			uint64_t	start_lba;
199 			uint64_t	num_lba;
200 			uint32_t	blocklen;
201 		} rw;
202 		struct {
203 			const char	*old_name;
204 			const char	*new_name;
205 		} rename;
206 		struct {
207 			struct cache_buffer	*cache_buffer;
208 			uint64_t		length;
209 		} flush;
210 		struct {
211 			struct cache_buffer	*cache_buffer;
212 			uint64_t		length;
213 			uint64_t		offset;
214 		} readahead;
215 		struct {
216 			/* offset of the file when the sync request was made */
217 			uint64_t			offset;
218 			TAILQ_ENTRY(spdk_fs_request)	tailq;
219 			bool				xattr_in_progress;
220 			/* length written to the xattr for this file - this should
221 			 * always be the same as the offset if only one thread is
222 			 * writing to the file, but could differ if multiple threads
223 			 * are appending
224 			 */
225 			uint64_t			length;
226 		} sync;
227 		struct {
228 			uint32_t			num_clusters;
229 		} resize;
230 		struct {
231 			const char	*name;
232 			uint32_t	flags;
233 			TAILQ_ENTRY(spdk_fs_request)	tailq;
234 		} open;
235 		struct {
236 			const char		*name;
237 			struct spdk_blob	*blob;
238 		} create;
239 		struct {
240 			const char	*name;
241 		} delete;
242 		struct {
243 			const char	*name;
244 		} stat;
245 	} op;
246 };
247 
/* Forward declarations for helpers that are used before they are defined. */
248 static void cache_free_buffers(struct spdk_file *file);
249 static void spdk_fs_io_device_unregister(struct spdk_filesystem *fs);
250 static void spdk_fs_free_io_channels(struct spdk_filesystem *fs);
251 
252 void
253 spdk_fs_opts_init(struct spdk_blobfs_opts *opts)
254 {
255 	opts->cluster_sz = SPDK_BLOBFS_DEFAULT_OPTS_CLUSTER_SZ;
256 }
257 
258 static void
259 __initialize_cache(void)
260 {
261 	assert(g_cache_pool == NULL);
262 
263 	g_cache_pool = spdk_mempool_create("spdk_fs_cache",
264 					   g_fs_cache_size / CACHE_BUFFER_SIZE,
265 					   CACHE_BUFFER_SIZE,
266 					   SPDK_MEMPOOL_DEFAULT_CACHE_SIZE,
267 					   SPDK_ENV_SOCKET_ID_ANY);
268 	if (!g_cache_pool) {
269 		SPDK_ERRLOG("Create mempool failed, you may "
270 			    "increase the memory and try again\n");
271 		assert(false);
272 	}
273 	TAILQ_INIT(&g_caches);
274 	pthread_spin_init(&g_caches_lock, 0);
275 }
276 
277 static void
278 __free_cache(void)
279 {
280 	assert(g_cache_pool != NULL);
281 
282 	spdk_mempool_free(g_cache_pool);
283 	g_cache_pool = NULL;
284 }
285 
286 static uint64_t
287 __file_get_blob_size(struct spdk_file *file)
288 {
289 	uint64_t cluster_sz;
290 
291 	cluster_sz = file->fs->bs_opts.cluster_sz;
292 	return cluster_sz * spdk_blob_get_num_clusters(file->blob);
293 }
294 
/*
 * A request object.  Requests are preallocated per channel and recycled
 * through the channel's free list (see alloc_fs_request_with_iov() and
 * free_fs_request()).
 */
295 struct spdk_fs_request {
/* Callback and per-operation state.  Kept as the first member. */
296 	struct spdk_fs_cb_args		args;
/* Link in spdk_fs_channel.reqs while the request is free. */
297 	TAILQ_ENTRY(spdk_fs_request)	link;
/* Channel this request was taken from and is returned to. */
298 	struct spdk_fs_channel		*channel;
299 };
300 
/*
 * Per-channel context: a fixed pool of requests plus the state needed to
 * submit work and wait for its completion.
 */
301 struct spdk_fs_channel {
/* Backing array for every request of this channel (max_ops entries). */
302 	struct spdk_fs_request		*req_mem;
/* Free list of requests. */
303 	TAILQ_HEAD(, spdk_fs_request)	reqs;
/* Semaphore that synchronous API calls wait on; initialized to 0. */
304 	sem_t				sem;
305 	struct spdk_filesystem		*fs;
/* Blobstore io channel; released in _spdk_fs_channel_destroy() when non-NULL. */
306 	struct spdk_io_channel		*bs_channel;
307 	fs_send_request_fn		send_request;
/* When true, the request free list is protected by 'lock'. */
308 	bool				sync;
/* Requests currently handed out; reported if non-zero at channel destroy. */
309 	uint32_t			outstanding_reqs;
310 	pthread_spinlock_t		lock;
311 };
312 
/*
 * Thread context used by the synchronous API.  Callers such as
 * spdk_fs_file_stat() cast a pointer to this structure directly to
 * struct spdk_fs_channel *, which relies on 'ch' being the first member.
 */
313 /* For now, this is effectively an alias. But eventually we'll shift
314  * some data members over. */
315 struct spdk_fs_thread_ctx {
316 	struct spdk_fs_channel	ch;
317 };
318 
319 static struct spdk_fs_request *
320 alloc_fs_request_with_iov(struct spdk_fs_channel *channel, uint32_t iovcnt)
321 {
322 	struct spdk_fs_request *req;
323 	struct iovec *iovs = NULL;
324 
325 	if (iovcnt > 1) {
326 		iovs = calloc(iovcnt, sizeof(struct iovec));
327 		if (!iovs) {
328 			return NULL;
329 		}
330 	}
331 
332 	if (channel->sync) {
333 		pthread_spin_lock(&channel->lock);
334 	}
335 
336 	req = TAILQ_FIRST(&channel->reqs);
337 	if (req) {
338 		channel->outstanding_reqs++;
339 		TAILQ_REMOVE(&channel->reqs, req, link);
340 	}
341 
342 	if (channel->sync) {
343 		pthread_spin_unlock(&channel->lock);
344 	}
345 
346 	if (req == NULL) {
347 		SPDK_ERRLOG("Cannot allocate req on spdk_fs_channel =%p\n", channel);
348 		free(iovs);
349 		return NULL;
350 	}
351 	memset(req, 0, sizeof(*req));
352 	req->channel = channel;
353 	if (iovcnt > 1) {
354 		req->args.iovs = iovs;
355 	} else {
356 		req->args.iovs = &req->args.iov;
357 	}
358 	req->args.iovcnt = iovcnt;
359 
360 	return req;
361 }
362 
/*
 * Take a request that carries no iovecs from a channel.  Returns NULL when the
 * channel has no free request.
 */
static struct spdk_fs_request *
alloc_fs_request(struct spdk_fs_channel *channel)
{
	const uint32_t no_iovs = 0;

	return alloc_fs_request_with_iov(channel, no_iovs);
}
368 
369 static void
370 free_fs_request(struct spdk_fs_request *req)
371 {
372 	struct spdk_fs_channel *channel = req->channel;
373 
374 	if (req->args.iovcnt > 1) {
375 		free(req->args.iovs);
376 	}
377 
378 	if (channel->sync) {
379 		pthread_spin_lock(&channel->lock);
380 	}
381 
382 	TAILQ_INSERT_HEAD(&req->channel->reqs, req, link);
383 	channel->outstanding_reqs--;
384 
385 	if (channel->sync) {
386 		pthread_spin_unlock(&channel->lock);
387 	}
388 }
389 
390 static int
391 _spdk_fs_channel_create(struct spdk_filesystem *fs, struct spdk_fs_channel *channel,
392 			uint32_t max_ops)
393 {
394 	uint32_t i;
395 
396 	channel->req_mem = calloc(max_ops, sizeof(struct spdk_fs_request));
397 	if (!channel->req_mem) {
398 		return -1;
399 	}
400 
401 	channel->outstanding_reqs = 0;
402 	TAILQ_INIT(&channel->reqs);
403 	sem_init(&channel->sem, 0, 0);
404 
405 	for (i = 0; i < max_ops; i++) {
406 		TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link);
407 	}
408 
409 	channel->fs = fs;
410 
411 	return 0;
412 }
413 
414 static int
415 _spdk_fs_md_channel_create(void *io_device, void *ctx_buf)
416 {
417 	struct spdk_filesystem		*fs;
418 	struct spdk_fs_channel		*channel = ctx_buf;
419 
420 	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, md_target);
421 
422 	return _spdk_fs_channel_create(fs, channel, fs->md_target.max_ops);
423 }
424 
425 static int
426 _spdk_fs_sync_channel_create(void *io_device, void *ctx_buf)
427 {
428 	struct spdk_filesystem		*fs;
429 	struct spdk_fs_channel		*channel = ctx_buf;
430 
431 	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, sync_target);
432 
433 	return _spdk_fs_channel_create(fs, channel, fs->sync_target.max_ops);
434 }
435 
436 static int
437 _spdk_fs_io_channel_create(void *io_device, void *ctx_buf)
438 {
439 	struct spdk_filesystem		*fs;
440 	struct spdk_fs_channel		*channel = ctx_buf;
441 
442 	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, io_target);
443 
444 	return _spdk_fs_channel_create(fs, channel, fs->io_target.max_ops);
445 }
446 
447 static void
448 _spdk_fs_channel_destroy(void *io_device, void *ctx_buf)
449 {
450 	struct spdk_fs_channel *channel = ctx_buf;
451 
452 	if (channel->outstanding_reqs > 0) {
453 		SPDK_ERRLOG("channel freed with %" PRIu32 " outstanding requests!\n",
454 			    channel->outstanding_reqs);
455 	}
456 
457 	free(channel->req_mem);
458 	if (channel->bs_channel != NULL) {
459 		spdk_bs_free_io_channel(channel->bs_channel);
460 	}
461 }
462 
463 static void
464 __send_request_direct(fs_request_fn fn, void *arg)
465 {
466 	fn(arg);
467 }
468 
469 static void
470 common_fs_bs_init(struct spdk_filesystem *fs, struct spdk_blob_store *bs)
471 {
472 	fs->bs = bs;
473 	fs->bs_opts.cluster_sz = spdk_bs_get_cluster_size(bs);
474 	fs->md_target.md_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
475 	fs->md_target.md_fs_channel->send_request = __send_request_direct;
476 	fs->sync_target.sync_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
477 	fs->sync_target.sync_fs_channel->send_request = __send_request_direct;
478 
479 	pthread_mutex_lock(&g_cache_init_lock);
480 	if (g_fs_count == 0) {
481 		__initialize_cache();
482 	}
483 	g_fs_count++;
484 	pthread_mutex_unlock(&g_cache_init_lock);
485 }
486 
487 static void
488 init_cb(void *ctx, struct spdk_blob_store *bs, int bserrno)
489 {
490 	struct spdk_fs_request *req = ctx;
491 	struct spdk_fs_cb_args *args = &req->args;
492 	struct spdk_filesystem *fs = args->fs;
493 
494 	if (bserrno == 0) {
495 		common_fs_bs_init(fs, bs);
496 	} else {
497 		free(fs);
498 		fs = NULL;
499 	}
500 
501 	args->fn.fs_op_with_handle(args->arg, fs, bserrno);
502 	free_fs_request(req);
503 }
504 
505 static void
506 fs_conf_parse(void)
507 {
508 	struct spdk_conf_section *sp;
509 
510 	sp = spdk_conf_find_section(NULL, "Blobfs");
511 	if (sp == NULL) {
512 		g_fs_cache_buffer_shift = CACHE_BUFFER_SHIFT_DEFAULT;
513 		return;
514 	}
515 
516 	g_fs_cache_buffer_shift = spdk_conf_section_get_intval(sp, "CacheBufferShift");
517 	if (g_fs_cache_buffer_shift <= 0) {
518 		g_fs_cache_buffer_shift = CACHE_BUFFER_SHIFT_DEFAULT;
519 	}
520 }
521 
522 static struct spdk_filesystem *
523 fs_alloc(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn)
524 {
525 	struct spdk_filesystem *fs;
526 
527 	fs = calloc(1, sizeof(*fs));
528 	if (fs == NULL) {
529 		return NULL;
530 	}
531 
532 	fs->bdev = dev;
533 	fs->send_request = send_request_fn;
534 	TAILQ_INIT(&fs->files);
535 
536 	fs->md_target.max_ops = 512;
537 	spdk_io_device_register(&fs->md_target, _spdk_fs_md_channel_create, _spdk_fs_channel_destroy,
538 				sizeof(struct spdk_fs_channel), "blobfs_md");
539 	fs->md_target.md_io_channel = spdk_get_io_channel(&fs->md_target);
540 	fs->md_target.md_fs_channel = spdk_io_channel_get_ctx(fs->md_target.md_io_channel);
541 
542 	fs->sync_target.max_ops = 512;
543 	spdk_io_device_register(&fs->sync_target, _spdk_fs_sync_channel_create, _spdk_fs_channel_destroy,
544 				sizeof(struct spdk_fs_channel), "blobfs_sync");
545 	fs->sync_target.sync_io_channel = spdk_get_io_channel(&fs->sync_target);
546 	fs->sync_target.sync_fs_channel = spdk_io_channel_get_ctx(fs->sync_target.sync_io_channel);
547 
548 	fs->io_target.max_ops = 512;
549 	spdk_io_device_register(&fs->io_target, _spdk_fs_io_channel_create, _spdk_fs_channel_destroy,
550 				sizeof(struct spdk_fs_channel), "blobfs_io");
551 
552 	return fs;
553 }
554 
555 static void
556 __wake_caller(void *arg, int fserrno)
557 {
558 	struct spdk_fs_cb_args *args = arg;
559 
560 	args->rc = fserrno;
561 	sem_post(args->sem);
562 }
563 
564 void
565 spdk_fs_init(struct spdk_bs_dev *dev, struct spdk_blobfs_opts *opt,
566 	     fs_send_request_fn send_request_fn,
567 	     spdk_fs_op_with_handle_complete cb_fn, void *cb_arg)
568 {
569 	struct spdk_filesystem *fs;
570 	struct spdk_fs_request *req;
571 	struct spdk_fs_cb_args *args;
572 	struct spdk_bs_opts opts = {};
573 
574 	fs = fs_alloc(dev, send_request_fn);
575 	if (fs == NULL) {
576 		cb_fn(cb_arg, NULL, -ENOMEM);
577 		return;
578 	}
579 
580 	fs_conf_parse();
581 
582 	req = alloc_fs_request(fs->md_target.md_fs_channel);
583 	if (req == NULL) {
584 		spdk_fs_free_io_channels(fs);
585 		spdk_fs_io_device_unregister(fs);
586 		cb_fn(cb_arg, NULL, -ENOMEM);
587 		return;
588 	}
589 
590 	args = &req->args;
591 	args->fn.fs_op_with_handle = cb_fn;
592 	args->arg = cb_arg;
593 	args->fs = fs;
594 
595 	spdk_bs_opts_init(&opts);
596 	snprintf(opts.bstype.bstype, sizeof(opts.bstype.bstype), "BLOBFS");
597 	if (opt) {
598 		opts.cluster_sz = opt->cluster_sz;
599 	}
600 	spdk_bs_init(dev, &opts, init_cb, req);
601 }
602 
603 static struct spdk_file *
604 file_alloc(struct spdk_filesystem *fs)
605 {
606 	struct spdk_file *file;
607 
608 	file = calloc(1, sizeof(*file));
609 	if (file == NULL) {
610 		return NULL;
611 	}
612 
613 	file->tree = calloc(1, sizeof(*file->tree));
614 	if (file->tree == NULL) {
615 		free(file);
616 		return NULL;
617 	}
618 
619 	file->fs = fs;
620 	TAILQ_INIT(&file->open_requests);
621 	TAILQ_INIT(&file->sync_requests);
622 	pthread_spin_init(&file->lock, 0);
623 	TAILQ_INSERT_TAIL(&fs->files, file, tailq);
624 	file->priority = SPDK_FILE_PRIORITY_LOW;
625 	return file;
626 }
627 
/*
 * Forward declaration: _handle_deleted_files() passes fs_load_done as the
 * completion callback of spdk_bs_delete_blob() before fs_load_done is defined.
 */
628 static void fs_load_done(void *ctx, int bserrno);
629 
630 static int
631 _handle_deleted_files(struct spdk_fs_request *req)
632 {
633 	struct spdk_fs_cb_args *args = &req->args;
634 	struct spdk_filesystem *fs = args->fs;
635 
636 	if (!TAILQ_EMPTY(&args->op.fs_load.deleted_files)) {
637 		struct spdk_deleted_file *deleted_file;
638 
639 		deleted_file = TAILQ_FIRST(&args->op.fs_load.deleted_files);
640 		TAILQ_REMOVE(&args->op.fs_load.deleted_files, deleted_file, tailq);
641 		spdk_bs_delete_blob(fs->bs, deleted_file->id, fs_load_done, req);
642 		free(deleted_file);
643 		return 0;
644 	}
645 
646 	return 1;
647 }
648 
649 static void
650 fs_load_done(void *ctx, int bserrno)
651 {
652 	struct spdk_fs_request *req = ctx;
653 	struct spdk_fs_cb_args *args = &req->args;
654 	struct spdk_filesystem *fs = args->fs;
655 
656 	/* The filesystem has been loaded.  Now check if there are any files that
657 	 *  were marked for deletion before last unload.  Do not complete the
658 	 *  fs_load callback until all of them have been deleted on disk.
659 	 */
660 	if (_handle_deleted_files(req) == 0) {
661 		/* We found a file that's been marked for deleting but not actually
662 		 *  deleted yet.  This function will get called again once the delete
663 		 *  operation is completed.
664 		 */
665 		return;
666 	}
667 
668 	args->fn.fs_op_with_handle(args->arg, fs, 0);
669 	free_fs_request(req);
670 
671 }
672 
673 static void
674 _file_build_trace_arg_name(struct spdk_file *f)
675 {
676 	f->trace_arg_name = 0;
677 	memcpy(&f->trace_arg_name, f->name,
678 	       spdk_min(sizeof(f->trace_arg_name), strlen(f->name)));
679 }
680 
681 static void
682 iter_cb(void *ctx, struct spdk_blob *blob, int rc)
683 {
684 	struct spdk_fs_request *req = ctx;
685 	struct spdk_fs_cb_args *args = &req->args;
686 	struct spdk_filesystem *fs = args->fs;
687 	uint64_t *length;
688 	const char *name;
689 	uint32_t *is_deleted;
690 	size_t value_len;
691 
692 	if (rc < 0) {
693 		args->fn.fs_op_with_handle(args->arg, fs, rc);
694 		free_fs_request(req);
695 		return;
696 	}
697 
698 	rc = spdk_blob_get_xattr_value(blob, "name", (const void **)&name, &value_len);
699 	if (rc < 0) {
700 		args->fn.fs_op_with_handle(args->arg, fs, rc);
701 		free_fs_request(req);
702 		return;
703 	}
704 
705 	rc = spdk_blob_get_xattr_value(blob, "length", (const void **)&length, &value_len);
706 	if (rc < 0) {
707 		args->fn.fs_op_with_handle(args->arg, fs, rc);
708 		free_fs_request(req);
709 		return;
710 	}
711 
712 	assert(value_len == 8);
713 
714 	/* This file could be deleted last time without close it, then app crashed, so we delete it now */
715 	rc = spdk_blob_get_xattr_value(blob, "is_deleted", (const void **)&is_deleted, &value_len);
716 	if (rc < 0) {
717 		struct spdk_file *f;
718 
719 		f = file_alloc(fs);
720 		if (f == NULL) {
721 			SPDK_ERRLOG("Cannot allocate file to handle deleted file on disk\n");
722 			args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM);
723 			free_fs_request(req);
724 			return;
725 		}
726 
727 		f->name = strdup(name);
728 		_file_build_trace_arg_name(f);
729 		f->blobid = spdk_blob_get_id(blob);
730 		f->length = *length;
731 		f->length_flushed = *length;
732 		f->length_xattr = *length;
733 		f->append_pos = *length;
734 		SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "added file %s length=%ju\n", f->name, f->length);
735 	} else {
736 		struct spdk_deleted_file *deleted_file;
737 
738 		deleted_file = calloc(1, sizeof(*deleted_file));
739 		if (deleted_file == NULL) {
740 			args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM);
741 			free_fs_request(req);
742 			return;
743 		}
744 		deleted_file->id = spdk_blob_get_id(blob);
745 		TAILQ_INSERT_TAIL(&args->op.fs_load.deleted_files, deleted_file, tailq);
746 	}
747 }
748 
749 static void
750 load_cb(void *ctx, struct spdk_blob_store *bs, int bserrno)
751 {
752 	struct spdk_fs_request *req = ctx;
753 	struct spdk_fs_cb_args *args = &req->args;
754 	struct spdk_filesystem *fs = args->fs;
755 	struct spdk_bs_type bstype;
756 	static const struct spdk_bs_type blobfs_type = {"BLOBFS"};
757 	static const struct spdk_bs_type zeros;
758 
759 	if (bserrno != 0) {
760 		args->fn.fs_op_with_handle(args->arg, NULL, bserrno);
761 		free_fs_request(req);
762 		free(fs);
763 		return;
764 	}
765 
766 	bstype = spdk_bs_get_bstype(bs);
767 
768 	if (!memcmp(&bstype, &zeros, sizeof(bstype))) {
769 		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "assigning bstype\n");
770 		spdk_bs_set_bstype(bs, blobfs_type);
771 	} else if (memcmp(&bstype, &blobfs_type, sizeof(bstype))) {
772 		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "not blobfs\n");
773 		SPDK_LOGDUMP(SPDK_LOG_BLOB, "bstype", &bstype, sizeof(bstype));
774 		args->fn.fs_op_with_handle(args->arg, NULL, bserrno);
775 		free_fs_request(req);
776 		free(fs);
777 		return;
778 	}
779 
780 	common_fs_bs_init(fs, bs);
781 	fs_load_done(req, 0);
782 }
783 
784 static void
785 spdk_fs_io_device_unregister(struct spdk_filesystem *fs)
786 {
787 	assert(fs != NULL);
788 	spdk_io_device_unregister(&fs->md_target, NULL);
789 	spdk_io_device_unregister(&fs->sync_target, NULL);
790 	spdk_io_device_unregister(&fs->io_target, NULL);
791 	free(fs);
792 }
793 
794 static void
795 spdk_fs_free_io_channels(struct spdk_filesystem *fs)
796 {
797 	assert(fs != NULL);
798 	spdk_fs_free_io_channel(fs->md_target.md_io_channel);
799 	spdk_fs_free_io_channel(fs->sync_target.sync_io_channel);
800 }
801 
802 void
803 spdk_fs_load(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn,
804 	     spdk_fs_op_with_handle_complete cb_fn, void *cb_arg)
805 {
806 	struct spdk_filesystem *fs;
807 	struct spdk_fs_cb_args *args;
808 	struct spdk_fs_request *req;
809 	struct spdk_bs_opts	bs_opts;
810 
811 	fs = fs_alloc(dev, send_request_fn);
812 	if (fs == NULL) {
813 		cb_fn(cb_arg, NULL, -ENOMEM);
814 		return;
815 	}
816 
817 	fs_conf_parse();
818 
819 	req = alloc_fs_request(fs->md_target.md_fs_channel);
820 	if (req == NULL) {
821 		spdk_fs_free_io_channels(fs);
822 		spdk_fs_io_device_unregister(fs);
823 		cb_fn(cb_arg, NULL, -ENOMEM);
824 		return;
825 	}
826 
827 	args = &req->args;
828 	args->fn.fs_op_with_handle = cb_fn;
829 	args->arg = cb_arg;
830 	args->fs = fs;
831 	TAILQ_INIT(&args->op.fs_load.deleted_files);
832 	spdk_bs_opts_init(&bs_opts);
833 	bs_opts.iter_cb_fn = iter_cb;
834 	bs_opts.iter_cb_arg = req;
835 	spdk_bs_load(dev, &bs_opts, load_cb, req);
836 }
837 
838 static void
839 unload_cb(void *ctx, int bserrno)
840 {
841 	struct spdk_fs_request *req = ctx;
842 	struct spdk_fs_cb_args *args = &req->args;
843 	struct spdk_filesystem *fs = args->fs;
844 	struct spdk_file *file, *tmp;
845 
846 	TAILQ_FOREACH_SAFE(file, &fs->files, tailq, tmp) {
847 		TAILQ_REMOVE(&fs->files, file, tailq);
848 		cache_free_buffers(file);
849 		free(file->name);
850 		free(file->tree);
851 		free(file);
852 	}
853 
854 	pthread_mutex_lock(&g_cache_init_lock);
855 	g_fs_count--;
856 	if (g_fs_count == 0) {
857 		__free_cache();
858 	}
859 	pthread_mutex_unlock(&g_cache_init_lock);
860 
861 	args->fn.fs_op(args->arg, bserrno);
862 	free(req);
863 
864 	spdk_fs_io_device_unregister(fs);
865 }
866 
867 void
868 spdk_fs_unload(struct spdk_filesystem *fs, spdk_fs_op_complete cb_fn, void *cb_arg)
869 {
870 	struct spdk_fs_request *req;
871 	struct spdk_fs_cb_args *args;
872 
873 	/*
874 	 * We must free the md_channel before unloading the blobstore, so just
875 	 *  allocate this request from the general heap.
876 	 */
877 	req = calloc(1, sizeof(*req));
878 	if (req == NULL) {
879 		cb_fn(cb_arg, -ENOMEM);
880 		return;
881 	}
882 
883 	args = &req->args;
884 	args->fn.fs_op = cb_fn;
885 	args->arg = cb_arg;
886 	args->fs = fs;
887 
888 	spdk_fs_free_io_channels(fs);
889 	spdk_bs_unload(fs->bs, unload_cb, req);
890 }
891 
892 static struct spdk_file *
893 fs_find_file(struct spdk_filesystem *fs, const char *name)
894 {
895 	struct spdk_file *file;
896 
897 	TAILQ_FOREACH(file, &fs->files, tailq) {
898 		if (!strncmp(name, file->name, SPDK_FILE_NAME_MAX)) {
899 			return file;
900 		}
901 	}
902 
903 	return NULL;
904 }
905 
906 void
907 spdk_fs_file_stat_async(struct spdk_filesystem *fs, const char *name,
908 			spdk_file_stat_op_complete cb_fn, void *cb_arg)
909 {
910 	struct spdk_file_stat stat;
911 	struct spdk_file *f = NULL;
912 
913 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
914 		cb_fn(cb_arg, NULL, -ENAMETOOLONG);
915 		return;
916 	}
917 
918 	f = fs_find_file(fs, name);
919 	if (f != NULL) {
920 		stat.blobid = f->blobid;
921 		stat.size = f->append_pos >= f->length ? f->append_pos : f->length;
922 		cb_fn(cb_arg, &stat, 0);
923 		return;
924 	}
925 
926 	cb_fn(cb_arg, NULL, -ENOENT);
927 }
928 
929 static void
930 __copy_stat(void *arg, struct spdk_file_stat *stat, int fserrno)
931 {
932 	struct spdk_fs_request *req = arg;
933 	struct spdk_fs_cb_args *args = &req->args;
934 
935 	args->rc = fserrno;
936 	if (fserrno == 0) {
937 		memcpy(args->arg, stat, sizeof(*stat));
938 	}
939 	sem_post(args->sem);
940 }
941 
942 static void
943 __file_stat(void *arg)
944 {
945 	struct spdk_fs_request *req = arg;
946 	struct spdk_fs_cb_args *args = &req->args;
947 
948 	spdk_fs_file_stat_async(args->fs, args->op.stat.name,
949 				args->fn.stat_op, req);
950 }
951 
952 int
953 spdk_fs_file_stat(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx,
954 		  const char *name, struct spdk_file_stat *stat)
955 {
956 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
957 	struct spdk_fs_request *req;
958 	int rc;
959 
960 	req = alloc_fs_request(channel);
961 	if (req == NULL) {
962 		SPDK_ERRLOG("Cannot allocate stat req on file=%s\n", name);
963 		return -ENOMEM;
964 	}
965 
966 	req->args.fs = fs;
967 	req->args.op.stat.name = name;
968 	req->args.fn.stat_op = __copy_stat;
969 	req->args.arg = stat;
970 	req->args.sem = &channel->sem;
971 	channel->send_request(__file_stat, req);
972 	sem_wait(&channel->sem);
973 
974 	rc = req->args.rc;
975 	free_fs_request(req);
976 
977 	return rc;
978 }
979 
980 static void
981 fs_create_blob_close_cb(void *ctx, int bserrno)
982 {
983 	int rc;
984 	struct spdk_fs_request *req = ctx;
985 	struct spdk_fs_cb_args *args = &req->args;
986 
987 	rc = args->rc ? args->rc : bserrno;
988 	args->fn.file_op(args->arg, rc);
989 	free_fs_request(req);
990 }
991 
992 static void
993 fs_create_blob_resize_cb(void *ctx, int bserrno)
994 {
995 	struct spdk_fs_request *req = ctx;
996 	struct spdk_fs_cb_args *args = &req->args;
997 	struct spdk_file *f = args->file;
998 	struct spdk_blob *blob = args->op.create.blob;
999 	uint64_t length = 0;
1000 
1001 	args->rc = bserrno;
1002 	if (bserrno) {
1003 		spdk_blob_close(blob, fs_create_blob_close_cb, args);
1004 		return;
1005 	}
1006 
1007 	spdk_blob_set_xattr(blob, "name", f->name, strlen(f->name) + 1);
1008 	spdk_blob_set_xattr(blob, "length", &length, sizeof(length));
1009 
1010 	spdk_blob_close(blob, fs_create_blob_close_cb, args);
1011 }
1012 
1013 static void
1014 fs_create_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno)
1015 {
1016 	struct spdk_fs_request *req = ctx;
1017 	struct spdk_fs_cb_args *args = &req->args;
1018 
1019 	if (bserrno) {
1020 		args->fn.file_op(args->arg, bserrno);
1021 		free_fs_request(req);
1022 		return;
1023 	}
1024 
1025 	args->op.create.blob = blob;
1026 	spdk_blob_resize(blob, 1, fs_create_blob_resize_cb, req);
1027 }
1028 
1029 static void
1030 fs_create_blob_create_cb(void *ctx, spdk_blob_id blobid, int bserrno)
1031 {
1032 	struct spdk_fs_request *req = ctx;
1033 	struct spdk_fs_cb_args *args = &req->args;
1034 	struct spdk_file *f = args->file;
1035 
1036 	if (bserrno) {
1037 		args->fn.file_op(args->arg, bserrno);
1038 		free_fs_request(req);
1039 		return;
1040 	}
1041 
1042 	f->blobid = blobid;
1043 	spdk_bs_open_blob(f->fs->bs, blobid, fs_create_blob_open_cb, req);
1044 }
1045 
1046 void
1047 spdk_fs_create_file_async(struct spdk_filesystem *fs, const char *name,
1048 			  spdk_file_op_complete cb_fn, void *cb_arg)
1049 {
1050 	struct spdk_file *file;
1051 	struct spdk_fs_request *req;
1052 	struct spdk_fs_cb_args *args;
1053 
1054 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
1055 		cb_fn(cb_arg, -ENAMETOOLONG);
1056 		return;
1057 	}
1058 
1059 	file = fs_find_file(fs, name);
1060 	if (file != NULL) {
1061 		cb_fn(cb_arg, -EEXIST);
1062 		return;
1063 	}
1064 
1065 	file = file_alloc(fs);
1066 	if (file == NULL) {
1067 		SPDK_ERRLOG("Cannot allocate new file for creation\n");
1068 		cb_fn(cb_arg, -ENOMEM);
1069 		return;
1070 	}
1071 
1072 	req = alloc_fs_request(fs->md_target.md_fs_channel);
1073 	if (req == NULL) {
1074 		SPDK_ERRLOG("Cannot allocate create async req for file=%s\n", name);
1075 		cb_fn(cb_arg, -ENOMEM);
1076 		return;
1077 	}
1078 
1079 	args = &req->args;
1080 	args->file = file;
1081 	args->fn.file_op = cb_fn;
1082 	args->arg = cb_arg;
1083 
1084 	file->name = strdup(name);
1085 	_file_build_trace_arg_name(file);
1086 	spdk_bs_create_blob(fs->bs, fs_create_blob_create_cb, args);
1087 }
1088 
1089 static void
1090 __fs_create_file_done(void *arg, int fserrno)
1091 {
1092 	struct spdk_fs_request *req = arg;
1093 	struct spdk_fs_cb_args *args = &req->args;
1094 
1095 	args->rc = fserrno;
1096 	sem_post(args->sem);
1097 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.create.name);
1098 }
1099 
1100 static void
1101 __fs_create_file(void *arg)
1102 {
1103 	struct spdk_fs_request *req = arg;
1104 	struct spdk_fs_cb_args *args = &req->args;
1105 
1106 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.create.name);
1107 	spdk_fs_create_file_async(args->fs, args->op.create.name, __fs_create_file_done, req);
1108 }
1109 
1110 int
1111 spdk_fs_create_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx, const char *name)
1112 {
1113 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
1114 	struct spdk_fs_request *req;
1115 	struct spdk_fs_cb_args *args;
1116 	int rc;
1117 
1118 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name);
1119 
1120 	req = alloc_fs_request(channel);
1121 	if (req == NULL) {
1122 		SPDK_ERRLOG("Cannot allocate req to create file=%s\n", name);
1123 		return -ENOMEM;
1124 	}
1125 
1126 	args = &req->args;
1127 	args->fs = fs;
1128 	args->op.create.name = name;
1129 	args->sem = &channel->sem;
1130 	fs->send_request(__fs_create_file, req);
1131 	sem_wait(&channel->sem);
1132 	rc = args->rc;
1133 	free_fs_request(req);
1134 
1135 	return rc;
1136 }
1137 
1138 static void
1139 fs_open_blob_done(void *ctx, struct spdk_blob *blob, int bserrno)
1140 {
1141 	struct spdk_fs_request *req = ctx;
1142 	struct spdk_fs_cb_args *args = &req->args;
1143 	struct spdk_file *f = args->file;
1144 
1145 	f->blob = blob;
1146 	while (!TAILQ_EMPTY(&f->open_requests)) {
1147 		req = TAILQ_FIRST(&f->open_requests);
1148 		args = &req->args;
1149 		TAILQ_REMOVE(&f->open_requests, req, args.op.open.tailq);
1150 		spdk_trace_record(TRACE_BLOBFS_OPEN, 0, 0, 0, f->trace_arg_name);
1151 		args->fn.file_op_with_handle(args->arg, f, bserrno);
1152 		free_fs_request(req);
1153 	}
1154 }
1155 
1156 static void
1157 fs_open_blob_create_cb(void *ctx, int bserrno)
1158 {
1159 	struct spdk_fs_request *req = ctx;
1160 	struct spdk_fs_cb_args *args = &req->args;
1161 	struct spdk_file *file = args->file;
1162 	struct spdk_filesystem *fs = args->fs;
1163 
1164 	if (file == NULL) {
1165 		/*
1166 		 * This is from an open with CREATE flag - the file
1167 		 *  is now created so look it up in the file list for this
1168 		 *  filesystem.
1169 		 */
1170 		file = fs_find_file(fs, args->op.open.name);
1171 		assert(file != NULL);
1172 		args->file = file;
1173 	}
1174 
1175 	file->ref_count++;
1176 	TAILQ_INSERT_TAIL(&file->open_requests, req, args.op.open.tailq);
1177 	if (file->ref_count == 1) {
1178 		assert(file->blob == NULL);
1179 		spdk_bs_open_blob(fs->bs, file->blobid, fs_open_blob_done, req);
1180 	} else if (file->blob != NULL) {
1181 		fs_open_blob_done(req, file->blob, 0);
1182 	} else {
1183 		/*
1184 		 * The blob open for this file is in progress due to a previous
1185 		 *  open request.  When that open completes, it will invoke the
1186 		 *  open callback for this request.
1187 		 */
1188 	}
1189 }
1190 
1191 void
1192 spdk_fs_open_file_async(struct spdk_filesystem *fs, const char *name, uint32_t flags,
1193 			spdk_file_op_with_handle_complete cb_fn, void *cb_arg)
1194 {
1195 	struct spdk_file *f = NULL;
1196 	struct spdk_fs_request *req;
1197 	struct spdk_fs_cb_args *args;
1198 
1199 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
1200 		cb_fn(cb_arg, NULL, -ENAMETOOLONG);
1201 		return;
1202 	}
1203 
1204 	f = fs_find_file(fs, name);
1205 	if (f == NULL && !(flags & SPDK_BLOBFS_OPEN_CREATE)) {
1206 		cb_fn(cb_arg, NULL, -ENOENT);
1207 		return;
1208 	}
1209 
1210 	if (f != NULL && f->is_deleted == true) {
1211 		cb_fn(cb_arg, NULL, -ENOENT);
1212 		return;
1213 	}
1214 
1215 	req = alloc_fs_request(fs->md_target.md_fs_channel);
1216 	if (req == NULL) {
1217 		SPDK_ERRLOG("Cannot allocate async open req for file=%s\n", name);
1218 		cb_fn(cb_arg, NULL, -ENOMEM);
1219 		return;
1220 	}
1221 
1222 	args = &req->args;
1223 	args->fn.file_op_with_handle = cb_fn;
1224 	args->arg = cb_arg;
1225 	args->file = f;
1226 	args->fs = fs;
1227 	args->op.open.name = name;
1228 
1229 	if (f == NULL) {
1230 		spdk_fs_create_file_async(fs, name, fs_open_blob_create_cb, req);
1231 	} else {
1232 		fs_open_blob_create_cb(req, 0);
1233 	}
1234 }
1235 
1236 static void
1237 __fs_open_file_done(void *arg, struct spdk_file *file, int bserrno)
1238 {
1239 	struct spdk_fs_request *req = arg;
1240 	struct spdk_fs_cb_args *args = &req->args;
1241 
1242 	args->file = file;
1243 	__wake_caller(args, bserrno);
1244 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.open.name);
1245 }
1246 
1247 static void
1248 __fs_open_file(void *arg)
1249 {
1250 	struct spdk_fs_request *req = arg;
1251 	struct spdk_fs_cb_args *args = &req->args;
1252 
1253 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.open.name);
1254 	spdk_fs_open_file_async(args->fs, args->op.open.name, args->op.open.flags,
1255 				__fs_open_file_done, req);
1256 }
1257 
1258 int
1259 spdk_fs_open_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx,
1260 		  const char *name, uint32_t flags, struct spdk_file **file)
1261 {
1262 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
1263 	struct spdk_fs_request *req;
1264 	struct spdk_fs_cb_args *args;
1265 	int rc;
1266 
1267 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name);
1268 
1269 	req = alloc_fs_request(channel);
1270 	if (req == NULL) {
1271 		SPDK_ERRLOG("Cannot allocate req for opening file=%s\n", name);
1272 		return -ENOMEM;
1273 	}
1274 
1275 	args = &req->args;
1276 	args->fs = fs;
1277 	args->op.open.name = name;
1278 	args->op.open.flags = flags;
1279 	args->sem = &channel->sem;
1280 	fs->send_request(__fs_open_file, req);
1281 	sem_wait(&channel->sem);
1282 	rc = args->rc;
1283 	if (rc == 0) {
1284 		*file = args->file;
1285 	} else {
1286 		*file = NULL;
1287 	}
1288 	free_fs_request(req);
1289 
1290 	return rc;
1291 }
1292 
1293 static void
1294 fs_rename_blob_close_cb(void *ctx, int bserrno)
1295 {
1296 	struct spdk_fs_request *req = ctx;
1297 	struct spdk_fs_cb_args *args = &req->args;
1298 
1299 	args->fn.fs_op(args->arg, bserrno);
1300 	free_fs_request(req);
1301 }
1302 
1303 static void
1304 fs_rename_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno)
1305 {
1306 	struct spdk_fs_request *req = ctx;
1307 	struct spdk_fs_cb_args *args = &req->args;
1308 	const char *new_name = args->op.rename.new_name;
1309 
1310 	spdk_blob_set_xattr(blob, "name", new_name, strlen(new_name) + 1);
1311 	spdk_blob_close(blob, fs_rename_blob_close_cb, req);
1312 }
1313 
1314 static void
1315 __spdk_fs_md_rename_file(struct spdk_fs_request *req)
1316 {
1317 	struct spdk_fs_cb_args *args = &req->args;
1318 	struct spdk_file *f;
1319 
1320 	f = fs_find_file(args->fs, args->op.rename.old_name);
1321 	if (f == NULL) {
1322 		args->fn.fs_op(args->arg, -ENOENT);
1323 		free_fs_request(req);
1324 		return;
1325 	}
1326 
1327 	free(f->name);
1328 	f->name = strdup(args->op.rename.new_name);
1329 	_file_build_trace_arg_name(f);
1330 	args->file = f;
1331 	spdk_bs_open_blob(args->fs->bs, f->blobid, fs_rename_blob_open_cb, req);
1332 }
1333 
1334 static void
1335 fs_rename_delete_done(void *arg, int fserrno)
1336 {
1337 	__spdk_fs_md_rename_file(arg);
1338 }
1339 
1340 void
1341 spdk_fs_rename_file_async(struct spdk_filesystem *fs,
1342 			  const char *old_name, const char *new_name,
1343 			  spdk_file_op_complete cb_fn, void *cb_arg)
1344 {
1345 	struct spdk_file *f;
1346 	struct spdk_fs_request *req;
1347 	struct spdk_fs_cb_args *args;
1348 
1349 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "old=%s new=%s\n", old_name, new_name);
1350 	if (strnlen(new_name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
1351 		cb_fn(cb_arg, -ENAMETOOLONG);
1352 		return;
1353 	}
1354 
1355 	req = alloc_fs_request(fs->md_target.md_fs_channel);
1356 	if (req == NULL) {
1357 		SPDK_ERRLOG("Cannot allocate rename async req for renaming file from %s to %s\n", old_name,
1358 			    new_name);
1359 		cb_fn(cb_arg, -ENOMEM);
1360 		return;
1361 	}
1362 
1363 	args = &req->args;
1364 	args->fn.fs_op = cb_fn;
1365 	args->fs = fs;
1366 	args->arg = cb_arg;
1367 	args->op.rename.old_name = old_name;
1368 	args->op.rename.new_name = new_name;
1369 
1370 	f = fs_find_file(fs, new_name);
1371 	if (f == NULL) {
1372 		__spdk_fs_md_rename_file(req);
1373 		return;
1374 	}
1375 
1376 	/*
1377 	 * The rename overwrites an existing file.  So delete the existing file, then
1378 	 *  do the actual rename.
1379 	 */
1380 	spdk_fs_delete_file_async(fs, new_name, fs_rename_delete_done, req);
1381 }
1382 
1383 static void
1384 __fs_rename_file_done(void *arg, int fserrno)
1385 {
1386 	struct spdk_fs_request *req = arg;
1387 	struct spdk_fs_cb_args *args = &req->args;
1388 
1389 	__wake_caller(args, fserrno);
1390 }
1391 
1392 static void
1393 __fs_rename_file(void *arg)
1394 {
1395 	struct spdk_fs_request *req = arg;
1396 	struct spdk_fs_cb_args *args = &req->args;
1397 
1398 	spdk_fs_rename_file_async(args->fs, args->op.rename.old_name, args->op.rename.new_name,
1399 				  __fs_rename_file_done, req);
1400 }
1401 
1402 int
1403 spdk_fs_rename_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx,
1404 		    const char *old_name, const char *new_name)
1405 {
1406 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
1407 	struct spdk_fs_request *req;
1408 	struct spdk_fs_cb_args *args;
1409 	int rc;
1410 
1411 	req = alloc_fs_request(channel);
1412 	if (req == NULL) {
1413 		SPDK_ERRLOG("Cannot allocate rename req for file=%s\n", old_name);
1414 		return -ENOMEM;
1415 	}
1416 
1417 	args = &req->args;
1418 
1419 	args->fs = fs;
1420 	args->op.rename.old_name = old_name;
1421 	args->op.rename.new_name = new_name;
1422 	args->sem = &channel->sem;
1423 	fs->send_request(__fs_rename_file, req);
1424 	sem_wait(&channel->sem);
1425 	rc = args->rc;
1426 	free_fs_request(req);
1427 	return rc;
1428 }
1429 
1430 static void
1431 blob_delete_cb(void *ctx, int bserrno)
1432 {
1433 	struct spdk_fs_request *req = ctx;
1434 	struct spdk_fs_cb_args *args = &req->args;
1435 
1436 	args->fn.file_op(args->arg, bserrno);
1437 	free_fs_request(req);
1438 }
1439 
1440 void
1441 spdk_fs_delete_file_async(struct spdk_filesystem *fs, const char *name,
1442 			  spdk_file_op_complete cb_fn, void *cb_arg)
1443 {
1444 	struct spdk_file *f;
1445 	spdk_blob_id blobid;
1446 	struct spdk_fs_request *req;
1447 	struct spdk_fs_cb_args *args;
1448 
1449 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name);
1450 
1451 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
1452 		cb_fn(cb_arg, -ENAMETOOLONG);
1453 		return;
1454 	}
1455 
1456 	f = fs_find_file(fs, name);
1457 	if (f == NULL) {
1458 		SPDK_ERRLOG("Cannot find the file=%s to deleted\n", name);
1459 		cb_fn(cb_arg, -ENOENT);
1460 		return;
1461 	}
1462 
1463 	req = alloc_fs_request(fs->md_target.md_fs_channel);
1464 	if (req == NULL) {
1465 		SPDK_ERRLOG("Cannot allocate the req for the file=%s to deleted\n", name);
1466 		cb_fn(cb_arg, -ENOMEM);
1467 		return;
1468 	}
1469 
1470 	args = &req->args;
1471 	args->fn.file_op = cb_fn;
1472 	args->arg = cb_arg;
1473 
1474 	if (f->ref_count > 0) {
1475 		/* If the ref > 0, we mark the file as deleted and delete it when we close it. */
1476 		f->is_deleted = true;
1477 		spdk_blob_set_xattr(f->blob, "is_deleted", &f->is_deleted, sizeof(bool));
1478 		spdk_blob_sync_md(f->blob, blob_delete_cb, req);
1479 		return;
1480 	}
1481 
1482 	TAILQ_REMOVE(&fs->files, f, tailq);
1483 
1484 	cache_free_buffers(f);
1485 
1486 	blobid = f->blobid;
1487 
1488 	free(f->name);
1489 	free(f->tree);
1490 	free(f);
1491 
1492 	spdk_bs_delete_blob(fs->bs, blobid, blob_delete_cb, req);
1493 }
1494 
/*
 * Pack up to the first sizeof(uint64_t) bytes of name into a uint64_t (in
 *  memory order), leaving the remaining bytes zero.  Used to produce a trace
 *  argument from a file name.
 */
static uint64_t
fs_name_to_uint64(const char *name)
{
	uint64_t packed = 0;
	size_t nbytes = strlen(name);

	if (nbytes > sizeof(packed)) {
		nbytes = sizeof(packed);
	}
	memcpy(&packed, name, nbytes);

	return packed;
}
1502 
1503 static void
1504 __fs_delete_file_done(void *arg, int fserrno)
1505 {
1506 	struct spdk_fs_request *req = arg;
1507 	struct spdk_fs_cb_args *args = &req->args;
1508 
1509 	spdk_trace_record(TRACE_BLOBFS_DELETE_DONE, 0, 0, 0, fs_name_to_uint64(args->op.delete.name));
1510 	__wake_caller(args, fserrno);
1511 }
1512 
1513 static void
1514 __fs_delete_file(void *arg)
1515 {
1516 	struct spdk_fs_request *req = arg;
1517 	struct spdk_fs_cb_args *args = &req->args;
1518 
1519 	spdk_trace_record(TRACE_BLOBFS_DELETE_START, 0, 0, 0, fs_name_to_uint64(args->op.delete.name));
1520 	spdk_fs_delete_file_async(args->fs, args->op.delete.name, __fs_delete_file_done, req);
1521 }
1522 
1523 int
1524 spdk_fs_delete_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx,
1525 		    const char *name)
1526 {
1527 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
1528 	struct spdk_fs_request *req;
1529 	struct spdk_fs_cb_args *args;
1530 	int rc;
1531 
1532 	req = alloc_fs_request(channel);
1533 	if (req == NULL) {
1534 		SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "Cannot allocate req to delete file=%s\n", name);
1535 		return -ENOMEM;
1536 	}
1537 
1538 	args = &req->args;
1539 	args->fs = fs;
1540 	args->op.delete.name = name;
1541 	args->sem = &channel->sem;
1542 	fs->send_request(__fs_delete_file, req);
1543 	sem_wait(&channel->sem);
1544 	rc = args->rc;
1545 	free_fs_request(req);
1546 
1547 	return rc;
1548 }
1549 
1550 spdk_fs_iter
1551 spdk_fs_iter_first(struct spdk_filesystem *fs)
1552 {
1553 	struct spdk_file *f;
1554 
1555 	f = TAILQ_FIRST(&fs->files);
1556 	return f;
1557 }
1558 
1559 spdk_fs_iter
1560 spdk_fs_iter_next(spdk_fs_iter iter)
1561 {
1562 	struct spdk_file *f = iter;
1563 
1564 	if (f == NULL) {
1565 		return NULL;
1566 	}
1567 
1568 	f = TAILQ_NEXT(f, tailq);
1569 	return f;
1570 }
1571 
1572 const char *
1573 spdk_file_get_name(struct spdk_file *file)
1574 {
1575 	return file->name;
1576 }
1577 
1578 uint64_t
1579 spdk_file_get_length(struct spdk_file *file)
1580 {
1581 	uint64_t length;
1582 
1583 	assert(file != NULL);
1584 
1585 	length = file->append_pos >= file->length ? file->append_pos : file->length;
1586 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s length=0x%jx\n", file->name, length);
1587 	return length;
1588 }
1589 
1590 static void
1591 fs_truncate_complete_cb(void *ctx, int bserrno)
1592 {
1593 	struct spdk_fs_request *req = ctx;
1594 	struct spdk_fs_cb_args *args = &req->args;
1595 
1596 	args->fn.file_op(args->arg, bserrno);
1597 	free_fs_request(req);
1598 }
1599 
1600 static void
1601 fs_truncate_resize_cb(void *ctx, int bserrno)
1602 {
1603 	struct spdk_fs_request *req = ctx;
1604 	struct spdk_fs_cb_args *args = &req->args;
1605 	struct spdk_file *file = args->file;
1606 	uint64_t *length = &args->op.truncate.length;
1607 
1608 	if (bserrno) {
1609 		args->fn.file_op(args->arg, bserrno);
1610 		free_fs_request(req);
1611 		return;
1612 	}
1613 
1614 	spdk_blob_set_xattr(file->blob, "length", length, sizeof(*length));
1615 
1616 	file->length = *length;
1617 	if (file->append_pos > file->length) {
1618 		file->append_pos = file->length;
1619 	}
1620 
1621 	spdk_blob_sync_md(file->blob, fs_truncate_complete_cb, req);
1622 }
1623 
/*
 * Number of clusters of cluster_sz bytes needed to hold length bytes
 *  (i.e. length / cluster_sz rounded up).  cluster_sz must be non-zero.
 *
 * Computed as quotient plus remainder check rather than
 *  (length + cluster_sz - 1) / cluster_sz so that lengths close to
 *  UINT64_MAX do not wrap around.
 */
static uint64_t
__bytes_to_clusters(uint64_t length, uint64_t cluster_sz)
{
	uint64_t clusters = length / cluster_sz;

	if (length % cluster_sz != 0) {
		clusters++;
	}

	return clusters;
}
1629 
1630 void
1631 spdk_file_truncate_async(struct spdk_file *file, uint64_t length,
1632 			 spdk_file_op_complete cb_fn, void *cb_arg)
1633 {
1634 	struct spdk_filesystem *fs;
1635 	size_t num_clusters;
1636 	struct spdk_fs_request *req;
1637 	struct spdk_fs_cb_args *args;
1638 
1639 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s old=0x%jx new=0x%jx\n", file->name, file->length, length);
1640 	if (length == file->length) {
1641 		cb_fn(cb_arg, 0);
1642 		return;
1643 	}
1644 
1645 	req = alloc_fs_request(file->fs->md_target.md_fs_channel);
1646 	if (req == NULL) {
1647 		cb_fn(cb_arg, -ENOMEM);
1648 		return;
1649 	}
1650 
1651 	args = &req->args;
1652 	args->fn.file_op = cb_fn;
1653 	args->arg = cb_arg;
1654 	args->file = file;
1655 	args->op.truncate.length = length;
1656 	fs = file->fs;
1657 
1658 	num_clusters = __bytes_to_clusters(length, fs->bs_opts.cluster_sz);
1659 
1660 	spdk_blob_resize(file->blob, num_clusters, fs_truncate_resize_cb, req);
1661 }
1662 
1663 static void
1664 __truncate(void *arg)
1665 {
1666 	struct spdk_fs_request *req = arg;
1667 	struct spdk_fs_cb_args *args = &req->args;
1668 
1669 	spdk_file_truncate_async(args->file, args->op.truncate.length,
1670 				 args->fn.file_op, args);
1671 }
1672 
1673 int
1674 spdk_file_truncate(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx,
1675 		   uint64_t length)
1676 {
1677 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
1678 	struct spdk_fs_request *req;
1679 	struct spdk_fs_cb_args *args;
1680 	int rc;
1681 
1682 	req = alloc_fs_request(channel);
1683 	if (req == NULL) {
1684 		return -ENOMEM;
1685 	}
1686 
1687 	args = &req->args;
1688 
1689 	args->file = file;
1690 	args->op.truncate.length = length;
1691 	args->fn.file_op = __wake_caller;
1692 	args->sem = &channel->sem;
1693 
1694 	channel->send_request(__truncate, req);
1695 	sem_wait(&channel->sem);
1696 	rc = args->rc;
1697 	free_fs_request(req);
1698 
1699 	return rc;
1700 }
1701 
1702 static void
1703 __rw_done(void *ctx, int bserrno)
1704 {
1705 	struct spdk_fs_request *req = ctx;
1706 	struct spdk_fs_cb_args *args = &req->args;
1707 
1708 	spdk_free(args->op.rw.pin_buf);
1709 	args->fn.file_op(args->arg, bserrno);
1710 	free_fs_request(req);
1711 }
1712 
/*
 * Gather the contents of iovs[0..iovcnt) into buf, copying at most buf_len
 *  bytes in total.  Each iovec contributes min(iov_len, space remaining).
 *
 * A byte pointer is used as the write cursor because arithmetic on void * is
 *  a compiler extension rather than standard C.
 */
static void
_copy_iovs_to_buf(void *buf, size_t buf_len, struct iovec *iovs, int iovcnt)
{
	uint8_t *dst = buf;
	size_t len;
	int i;

	for (i = 0; i < iovcnt; i++) {
		len = iovs[i].iov_len < buf_len ? iovs[i].iov_len : buf_len;
		memcpy(dst, iovs[i].iov_base, len);
		dst += len;
		/* len never exceeds buf_len, so this cannot underflow. */
		buf_len -= len;
	}
}
1727 
/*
 * Scatter up to buf_len bytes from buf into iovs[0..iovcnt).  Each iovec
 *  receives min(iov_len, bytes remaining).
 *
 * A byte pointer is used as the read cursor because arithmetic on void * is
 *  a compiler extension rather than standard C.
 */
static void
_copy_buf_to_iovs(struct iovec *iovs, int iovcnt, void *buf, size_t buf_len)
{
	const uint8_t *src = buf;
	size_t len;
	int i;

	for (i = 0; i < iovcnt; i++) {
		len = iovs[i].iov_len < buf_len ? iovs[i].iov_len : buf_len;
		memcpy(iovs[i].iov_base, src, len);
		src += len;
		/* len never exceeds buf_len, so this cannot underflow. */
		buf_len -= len;
	}
}
1742 
1743 static void
1744 __read_done(void *ctx, int bserrno)
1745 {
1746 	struct spdk_fs_request *req = ctx;
1747 	struct spdk_fs_cb_args *args = &req->args;
1748 	void *buf;
1749 
1750 	assert(req != NULL);
1751 	buf = (void *)((uintptr_t)args->op.rw.pin_buf + (args->op.rw.offset & (args->op.rw.blocklen - 1)));
1752 	if (args->op.rw.is_read) {
1753 		_copy_buf_to_iovs(args->iovs, args->iovcnt, buf, args->op.rw.length);
1754 		__rw_done(req, 0);
1755 	} else {
1756 		_copy_iovs_to_buf(buf, args->op.rw.length, args->iovs, args->iovcnt);
1757 		spdk_blob_io_write(args->file->blob, args->op.rw.channel,
1758 				   args->op.rw.pin_buf,
1759 				   args->op.rw.start_lba, args->op.rw.num_lba,
1760 				   __rw_done, req);
1761 	}
1762 }
1763 
1764 static void
1765 __do_blob_read(void *ctx, int fserrno)
1766 {
1767 	struct spdk_fs_request *req = ctx;
1768 	struct spdk_fs_cb_args *args = &req->args;
1769 
1770 	if (fserrno) {
1771 		__rw_done(req, fserrno);
1772 		return;
1773 	}
1774 	spdk_blob_io_read(args->file->blob, args->op.rw.channel,
1775 			  args->op.rw.pin_buf,
1776 			  args->op.rw.start_lba, args->op.rw.num_lba,
1777 			  __read_done, req);
1778 }
1779 
1780 static void
1781 __get_page_parameters(struct spdk_file *file, uint64_t offset, uint64_t length,
1782 		      uint64_t *start_lba, uint32_t *lba_size, uint64_t *num_lba)
1783 {
1784 	uint64_t end_lba;
1785 
1786 	*lba_size = spdk_bs_get_io_unit_size(file->fs->bs);
1787 	*start_lba = offset / *lba_size;
1788 	end_lba = (offset + length - 1) / *lba_size;
1789 	*num_lba = (end_lba - *start_lba + 1);
1790 }
1791 
1792 static void
1793 _fs_request_setup_iovs(struct spdk_fs_request *req, struct iovec *iovs, uint32_t iovcnt)
1794 {
1795 	uint32_t i;
1796 
1797 	for (i = 0; i < iovcnt; i++) {
1798 		req->args.iovs[i].iov_base = iovs[i].iov_base;
1799 		req->args.iovs[i].iov_len = iovs[i].iov_len;
1800 	}
1801 }
1802 
1803 static void
1804 __readvwritev(struct spdk_file *file, struct spdk_io_channel *_channel,
1805 	      struct iovec *iovs, uint32_t iovcnt, uint64_t offset, uint64_t length,
1806 	      spdk_file_op_complete cb_fn, void *cb_arg, int is_read)
1807 {
1808 	struct spdk_fs_request *req;
1809 	struct spdk_fs_cb_args *args;
1810 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
1811 	uint64_t start_lba, num_lba, pin_buf_length;
1812 	uint32_t lba_size;
1813 
1814 	if (is_read && offset + length > file->length) {
1815 		cb_fn(cb_arg, -EINVAL);
1816 		return;
1817 	}
1818 
1819 	req = alloc_fs_request_with_iov(channel, iovcnt);
1820 	if (req == NULL) {
1821 		cb_fn(cb_arg, -ENOMEM);
1822 		return;
1823 	}
1824 
1825 	__get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba);
1826 
1827 	args = &req->args;
1828 	args->fn.file_op = cb_fn;
1829 	args->arg = cb_arg;
1830 	args->file = file;
1831 	args->op.rw.channel = channel->bs_channel;
1832 	_fs_request_setup_iovs(req, iovs, iovcnt);
1833 	args->op.rw.is_read = is_read;
1834 	args->op.rw.offset = offset;
1835 	args->op.rw.blocklen = lba_size;
1836 
1837 	pin_buf_length = num_lba * lba_size;
1838 	args->op.rw.length = pin_buf_length;
1839 	args->op.rw.pin_buf = spdk_malloc(pin_buf_length, lba_size, NULL,
1840 					  SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
1841 	if (args->op.rw.pin_buf == NULL) {
1842 		SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "Failed to allocate buf for: file=%s offset=%jx length=%jx\n",
1843 			      file->name, offset, length);
1844 		free_fs_request(req);
1845 		cb_fn(cb_arg, -ENOMEM);
1846 		return;
1847 	}
1848 
1849 	args->op.rw.start_lba = start_lba;
1850 	args->op.rw.num_lba = num_lba;
1851 
1852 	if (!is_read && file->length < offset + length) {
1853 		spdk_file_truncate_async(file, offset + length, __do_blob_read, req);
1854 	} else {
1855 		__do_blob_read(req, 0);
1856 	}
1857 }
1858 
1859 static void
1860 __readwrite(struct spdk_file *file, struct spdk_io_channel *channel,
1861 	    void *payload, uint64_t offset, uint64_t length,
1862 	    spdk_file_op_complete cb_fn, void *cb_arg, int is_read)
1863 {
1864 	struct iovec iov;
1865 
1866 	iov.iov_base = payload;
1867 	iov.iov_len = (size_t)length;
1868 
1869 	__readvwritev(file, channel, &iov, 1, offset, length, cb_fn, cb_arg, is_read);
1870 }
1871 
1872 void
1873 spdk_file_write_async(struct spdk_file *file, struct spdk_io_channel *channel,
1874 		      void *payload, uint64_t offset, uint64_t length,
1875 		      spdk_file_op_complete cb_fn, void *cb_arg)
1876 {
1877 	__readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 0);
1878 }
1879 
1880 void
1881 spdk_file_writev_async(struct spdk_file *file, struct spdk_io_channel *channel,
1882 		       struct iovec *iovs, uint32_t iovcnt, uint64_t offset, uint64_t length,
1883 		       spdk_file_op_complete cb_fn, void *cb_arg)
1884 {
1885 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s offset=%jx length=%jx\n",
1886 		      file->name, offset, length);
1887 
1888 	__readvwritev(file, channel, iovs, iovcnt, offset, length, cb_fn, cb_arg, 0);
1889 }
1890 
1891 void
1892 spdk_file_read_async(struct spdk_file *file, struct spdk_io_channel *channel,
1893 		     void *payload, uint64_t offset, uint64_t length,
1894 		     spdk_file_op_complete cb_fn, void *cb_arg)
1895 {
1896 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s offset=%jx length=%jx\n",
1897 		      file->name, offset, length);
1898 	__readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 1);
1899 }
1900 
1901 void
1902 spdk_file_readv_async(struct spdk_file *file, struct spdk_io_channel *channel,
1903 		      struct iovec *iovs, uint32_t iovcnt, uint64_t offset, uint64_t length,
1904 		      spdk_file_op_complete cb_fn, void *cb_arg)
1905 {
1906 	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s offset=%jx length=%jx\n",
1907 		      file->name, offset, length);
1908 
1909 	__readvwritev(file, channel, iovs, iovcnt, offset, length, cb_fn, cb_arg, 1);
1910 }
1911 
1912 struct spdk_io_channel *
1913 spdk_fs_alloc_io_channel(struct spdk_filesystem *fs)
1914 {
1915 	struct spdk_io_channel *io_channel;
1916 	struct spdk_fs_channel *fs_channel;
1917 
1918 	io_channel = spdk_get_io_channel(&fs->io_target);
1919 	fs_channel = spdk_io_channel_get_ctx(io_channel);
1920 	fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
1921 	fs_channel->send_request = __send_request_direct;
1922 
1923 	return io_channel;
1924 }
1925 
/*
 * Release an I/O channel by handing it to spdk_put_io_channel().
 */
void
spdk_fs_free_io_channel(struct spdk_io_channel *channel)
{
	spdk_put_io_channel(channel);
}
1931 
1932 struct spdk_fs_thread_ctx *
1933 spdk_fs_alloc_thread_ctx(struct spdk_filesystem *fs)
1934 {
1935 	struct spdk_fs_thread_ctx *ctx;
1936 
1937 	ctx = calloc(1, sizeof(*ctx));
1938 	if (!ctx) {
1939 		return NULL;
1940 	}
1941 
1942 	_spdk_fs_channel_create(fs, &ctx->ch, 512);
1943 
1944 	ctx->ch.send_request = fs->send_request;
1945 	ctx->ch.sync = 1;
1946 	pthread_spin_init(&ctx->ch.lock, 0);
1947 
1948 	return ctx;
1949 }
1950 
1951 
1952 void
1953 spdk_fs_free_thread_ctx(struct spdk_fs_thread_ctx *ctx)
1954 {
1955 	assert(ctx->ch.sync == 1);
1956 
1957 	while (true) {
1958 		pthread_spin_lock(&ctx->ch.lock);
1959 		if (ctx->ch.outstanding_reqs == 0) {
1960 			pthread_spin_unlock(&ctx->ch.lock);
1961 			break;
1962 		}
1963 		pthread_spin_unlock(&ctx->ch.lock);
1964 		usleep(1000);
1965 	}
1966 
1967 	_spdk_fs_channel_destroy(NULL, &ctx->ch);
1968 	free(ctx);
1969 }
1970 
1971 void
1972 spdk_fs_set_cache_size(uint64_t size_in_mb)
1973 {
1974 	g_fs_cache_size = size_in_mb * 1024 * 1024;
1975 }
1976 
1977 uint64_t
1978 spdk_fs_get_cache_size(void)
1979 {
1980 	return g_fs_cache_size / (1024 * 1024);
1981 }
1982 
1983 static void __file_flush(void *ctx);
1984 
1985 static void *
1986 alloc_cache_memory_buffer(struct spdk_file *context)
1987 {
1988 	struct spdk_file *file;
1989 	void *buf;
1990 
1991 	buf = spdk_mempool_get(g_cache_pool);
1992 	if (buf != NULL) {
1993 		return buf;
1994 	}
1995 
1996 	pthread_spin_lock(&g_caches_lock);
1997 	TAILQ_FOREACH(file, &g_caches, cache_tailq) {
1998 		if (!file->open_for_writing &&
1999 		    file->priority == SPDK_FILE_PRIORITY_LOW &&
2000 		    file != context) {
2001 			break;
2002 		}
2003 	}
2004 	pthread_spin_unlock(&g_caches_lock);
2005 	if (file != NULL) {
2006 		cache_free_buffers(file);
2007 		buf = spdk_mempool_get(g_cache_pool);
2008 		if (buf != NULL) {
2009 			return buf;
2010 		}
2011 	}
2012 
2013 	pthread_spin_lock(&g_caches_lock);
2014 	TAILQ_FOREACH(file, &g_caches, cache_tailq) {
2015 		if (!file->open_for_writing && file != context) {
2016 			break;
2017 		}
2018 	}
2019 	pthread_spin_unlock(&g_caches_lock);
2020 	if (file != NULL) {
2021 		cache_free_buffers(file);
2022 		buf = spdk_mempool_get(g_cache_pool);
2023 		if (buf != NULL) {
2024 			return buf;
2025 		}
2026 	}
2027 
2028 	pthread_spin_lock(&g_caches_lock);
2029 	TAILQ_FOREACH(file, &g_caches, cache_tailq) {
2030 		if (file != context) {
2031 			break;
2032 		}
2033 	}
2034 	pthread_spin_unlock(&g_caches_lock);
2035 	if (file != NULL) {
2036 		cache_free_buffers(file);
2037 		buf = spdk_mempool_get(g_cache_pool);
2038 		if (buf != NULL) {
2039 			return buf;
2040 		}
2041 	}
2042 
2043 	return NULL;
2044 }
2045 
2046 static struct cache_buffer *
2047 cache_insert_buffer(struct spdk_file *file, uint64_t offset)
2048 {
2049 	struct cache_buffer *buf;
2050 	int count = 0;
2051 
2052 	buf = calloc(1, sizeof(*buf));
2053 	if (buf == NULL) {
2054 		SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "calloc failed\n");
2055 		return NULL;
2056 	}
2057 
2058 	buf->buf = alloc_cache_memory_buffer(file);
2059 	while (buf->buf == NULL) {
2060 		/*
2061 		 * TODO: alloc_cache_memory_buffer() should eventually free
2062 		 *  some buffers.  Need a more sophisticated check here, instead
2063 		 *  of just bailing if 100 tries does not result in getting a
2064 		 *  free buffer.  This will involve using the sync channel's
2065 		 *  semaphore to block until a buffer becomes available.
2066 		 */
2067 		if (count++ == 100) {
2068 			SPDK_ERRLOG("Could not allocate cache buffer for file=%p on offset=%jx\n",
2069 				    file, offset);
2070 			free(buf);
2071 			return NULL;
2072 		}
2073 		buf->buf = alloc_cache_memory_buffer(file);
2074 	}
2075 
2076 	buf->buf_size = CACHE_BUFFER_SIZE;
2077 	buf->offset = offset;
2078 
2079 	pthread_spin_lock(&g_caches_lock);
2080 	if (file->tree->present_mask == 0) {
2081 		TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq);
2082 	}
2083 	file->tree = spdk_tree_insert_buffer(file->tree, buf);
2084 	pthread_spin_unlock(&g_caches_lock);
2085 
2086 	return buf;
2087 }
2088 
2089 static struct cache_buffer *
2090 cache_append_buffer(struct spdk_file *file)
2091 {
2092 	struct cache_buffer *last;
2093 
2094 	assert(file->last == NULL || file->last->bytes_filled == file->last->buf_size);
2095 	assert((file->append_pos % CACHE_BUFFER_SIZE) == 0);
2096 
2097 	last = cache_insert_buffer(file, file->append_pos);
2098 	if (last == NULL) {
2099 		SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "cache_insert_buffer failed\n");
2100 		return NULL;
2101 	}
2102 
2103 	file->last = last;
2104 
2105 	return last;
2106 }
2107 
2108 static void __check_sync_reqs(struct spdk_file *file);
2109 
/*
 * Completion of the metadata sync started by __check_sync_reqs().
 *
 * ctx is the sync request.  Under the file lock, the length that was written
 *  to the xattr is recorded in file->length_xattr and the request is removed
 *  from the file's sync_requests list.  The caller's completion is then
 *  invoked without the lock held, the request is released, and any remaining
 *  sync requests are re-examined.
 */
static void
__file_cache_finish_sync(void *ctx, int bserrno)
{
	struct spdk_file *file;
	struct spdk_fs_request *sync_req = ctx;
	struct spdk_fs_cb_args *sync_args;

	sync_args = &sync_req->args;
	file = sync_args->file;
	pthread_spin_lock(&file->lock);
	file->length_xattr = sync_args->op.sync.length;
	assert(sync_args->op.sync.offset <= file->length_flushed);
	spdk_trace_record(TRACE_BLOBFS_XATTR_END, 0, sync_args->op.sync.offset,
			  0, file->trace_arg_name);
	BLOBFS_TRACE(file, "sync done offset=%jx\n", sync_args->op.sync.offset);
	TAILQ_REMOVE(&file->sync_requests, sync_req, args.op.sync.tailq);
	pthread_spin_unlock(&file->lock);

	/* The user callback runs outside the file lock. */
	sync_args->fn.file_op(sync_args->arg, bserrno);
	/* The request is released while holding the file lock. */
	pthread_spin_lock(&file->lock);
	free_fs_request(sync_req);
	pthread_spin_unlock(&file->lock);

	/* Another queued sync request may now be ready to proceed. */
	__check_sync_reqs(file);
}
2135 
/*
 * Look for a queued sync request whose target offset has already been
 *  flushed and, if its xattr update has not been started yet, persist the
 *  flushed length: the "length" xattr is set to file->length_flushed and a
 *  metadata sync is issued, completing in __file_cache_finish_sync().
 *
 * The list walk and the xattr update happen under the file lock; the lock is
 *  dropped before the metadata sync is started.
 */
static void
__check_sync_reqs(struct spdk_file *file)
{
	struct spdk_fs_request *sync_req;

	pthread_spin_lock(&file->lock);

	/* Stop at the first request already covered by the flushed length. */
	TAILQ_FOREACH(sync_req, &file->sync_requests, args.op.sync.tailq) {
		if (sync_req->args.op.sync.offset <= file->length_flushed) {
			break;
		}
	}

	if (sync_req != NULL && !sync_req->args.op.sync.xattr_in_progress) {
		BLOBFS_TRACE(file, "set xattr length 0x%jx\n", file->length_flushed);
		/* Mark the request so the xattr update is not started twice. */
		sync_req->args.op.sync.xattr_in_progress = true;
		sync_req->args.op.sync.length = file->length_flushed;
		spdk_blob_set_xattr(file->blob, "length", &file->length_flushed,
				    sizeof(file->length_flushed));

		pthread_spin_unlock(&file->lock);
		spdk_trace_record(TRACE_BLOBFS_XATTR_START, 0, file->length_flushed,
				  0, file->trace_arg_name);
		spdk_blob_sync_md(file->blob, __file_cache_finish_sync, sync_req);
	} else {
		pthread_spin_unlock(&file->lock);
	}
}
2164 
/*
 * Completion of a cache-buffer flush.
 *
 * ctx is the flush request; args->op.flush.cache_buffer is the buffer that
 *  was being flushed and args->op.flush.length the number of bytes written.
 *  Under the file lock the buffer is marked idle and the flushed byte counts
 *  of the buffer and the file are advanced (growing file->length if the
 *  flushed length passed it).  Afterwards pending sync requests are checked
 *  and __file_flush() is called again with the same request.
 */
static void
__file_flush_done(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *file = args->file;
	struct cache_buffer *next = args->op.flush.cache_buffer;

	BLOBFS_TRACE(file, "length=%jx\n", args->op.flush.length);

	pthread_spin_lock(&file->lock);
	next->in_progress = false;
	next->bytes_flushed += args->op.flush.length;
	file->length_flushed += args->op.flush.length;
	if (file->length_flushed > file->length) {
		file->length = file->length_flushed;
	}
	if (next->bytes_flushed == next->buf_size) {
		BLOBFS_TRACE(file, "write buffer fully flushed 0x%jx\n", file->length_flushed);
		/* Buffer is done; look up the buffer at the new flushed offset. */
		next = spdk_tree_find_buffer(file->tree, file->length_flushed);
	}

	/*
	 * Assert that there is no cached data that extends past the end of the underlying
	 *  blob.
	 */
	assert(next == NULL || next->offset < __file_get_blob_size(file) ||
	       next->bytes_filled == 0);

	pthread_spin_unlock(&file->lock);

	__check_sync_reqs(file);

	/* NOTE(review): bserrno is not examined in this function. */
	__file_flush(req);
}
2200 
2201 static void
2202 __file_flush(void *ctx)
2203 {
2204 	struct spdk_fs_request *req = ctx;
2205 	struct spdk_fs_cb_args *args = &req->args;
2206 	struct spdk_file *file = args->file;
2207 	struct cache_buffer *next;
2208 	uint64_t offset, length, start_lba, num_lba;
2209 	uint32_t lba_size;
2210 
2211 	pthread_spin_lock(&file->lock);
2212 	next = spdk_tree_find_buffer(file->tree, file->length_flushed);
2213 	if (next == NULL || next->in_progress ||
2214 	    ((next->bytes_filled < next->buf_size) && TAILQ_EMPTY(&file->sync_requests))) {
2215 		/*
2216 		 * There is either no data to flush, a flush I/O is already in
2217 		 *  progress, or the next buffer is partially filled but there's no
2218 		 *  outstanding request to sync it.
2219 		 * So return immediately - if a flush I/O is in progress we will flush
2220 		 *  more data after that is completed, or a partial buffer will get flushed
2221 		 *  when it is either filled or the file is synced.
2222 		 */
2223 		free_fs_request(req);
2224 		if (next == NULL) {
2225 			/*
2226 			 * For cases where a file's cache was evicted, and then the
2227 			 *  file was later appended, we will write the data directly
2228 			 *  to disk and bypass cache.  So just update length_flushed
2229 			 *  here to reflect that all data was already written to disk.
2230 			 */
2231 			file->length_flushed = file->append_pos;
2232 		}
2233 		pthread_spin_unlock(&file->lock);
2234 		if (next == NULL) {
2235 			/*
2236 			 * There is no data to flush, but we still need to check for any
2237 			 *  outstanding sync requests to make sure metadata gets updated.
2238 			 */
2239 			__check_sync_reqs(file);
2240 		}
2241 		return;
2242 	}
2243 
2244 	offset = next->offset + next->bytes_flushed;
2245 	length = next->bytes_filled - next->bytes_flushed;
2246 	if (length == 0) {
2247 		free_fs_request(req);
2248 		pthread_spin_unlock(&file->lock);
2249 		/*
2250 		 * There is no data to flush, but we still need to check for any
2251 		 *  outstanding sync requests to make sure metadata gets updated.
2252 		 */
2253 		__check_sync_reqs(file);
2254 		return;
2255 	}
2256 	args->op.flush.length = length;
2257 	args->op.flush.cache_buffer = next;
2258 
2259 	__get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba);
2260 
2261 	next->in_progress = true;
2262 	BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n",
2263 		     offset, length, start_lba, num_lba);
2264 	pthread_spin_unlock(&file->lock);
2265 	spdk_blob_io_write(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel,
2266 			   next->buf + (start_lba * lba_size) - next->offset,
2267 			   start_lba, num_lba, __file_flush_done, req);
2268 }
2269 
/*
 * Completion callback of the metadata sync that follows a blob resize:
 *  forwards the status to __wake_caller together with the callback args.
 */
static void
__file_extend_done(void *arg, int bserrno)
{
	struct spdk_fs_cb_args *extend_args = arg;

	__wake_caller(extend_args, bserrno);
}
2277 
2278 static void
2279 __file_extend_resize_cb(void *_args, int bserrno)
2280 {
2281 	struct spdk_fs_cb_args *args = _args;
2282 	struct spdk_file *file = args->file;
2283 
2284 	if (bserrno) {
2285 		__wake_caller(args, bserrno);
2286 		return;
2287 	}
2288 
2289 	spdk_blob_sync_md(file->blob, __file_extend_done, args);
2290 }
2291 
2292 static void
2293 __file_extend_blob(void *_args)
2294 {
2295 	struct spdk_fs_cb_args *args = _args;
2296 	struct spdk_file *file = args->file;
2297 
2298 	spdk_blob_resize(file->blob, args->op.resize.num_clusters, __file_extend_resize_cb, args);
2299 }
2300 
2301 static void
2302 __rw_from_file_done(void *ctx, int bserrno)
2303 {
2304 	struct spdk_fs_request *req = ctx;
2305 
2306 	__wake_caller(&req->args, bserrno);
2307 	free_fs_request(req);
2308 }
2309 
2310 static void
2311 __rw_from_file(void *ctx)
2312 {
2313 	struct spdk_fs_request *req = ctx;
2314 	struct spdk_fs_cb_args *args = &req->args;
2315 	struct spdk_file *file = args->file;
2316 
2317 	if (args->op.rw.is_read) {
2318 		spdk_file_read_async(file, file->fs->sync_target.sync_io_channel, args->iovs[0].iov_base,
2319 				     args->op.rw.offset, (uint64_t)args->iovs[0].iov_len,
2320 				     __rw_from_file_done, req);
2321 	} else {
2322 		spdk_file_write_async(file, file->fs->sync_target.sync_io_channel, args->iovs[0].iov_base,
2323 				      args->op.rw.offset, (uint64_t)args->iovs[0].iov_len,
2324 				      __rw_from_file_done, req);
2325 	}
2326 }
2327 
2328 static int
2329 __send_rw_from_file(struct spdk_file *file, void *payload,
2330 		    uint64_t offset, uint64_t length, bool is_read,
2331 		    struct spdk_fs_channel *channel)
2332 {
2333 	struct spdk_fs_request *req;
2334 	struct spdk_fs_cb_args *args;
2335 
2336 	req = alloc_fs_request_with_iov(channel, 1);
2337 	if (req == NULL) {
2338 		sem_post(&channel->sem);
2339 		return -ENOMEM;
2340 	}
2341 
2342 	args = &req->args;
2343 	args->file = file;
2344 	args->sem = &channel->sem;
2345 	args->iovs[0].iov_base = payload;
2346 	args->iovs[0].iov_len = (size_t)length;
2347 	args->op.rw.offset = offset;
2348 	args->op.rw.is_read = is_read;
2349 	file->fs->send_request(__rw_from_file, req);
2350 	return 0;
2351 }
2352 
2353 int
2354 spdk_file_write(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx,
2355 		void *payload, uint64_t offset, uint64_t length)
2356 {
2357 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
2358 	struct spdk_fs_request *flush_req;
2359 	uint64_t rem_length, copy, blob_size, cluster_sz;
2360 	uint32_t cache_buffers_filled = 0;
2361 	uint8_t *cur_payload;
2362 	struct cache_buffer *last;
2363 
2364 	BLOBFS_TRACE_RW(file, "offset=%jx length=%jx\n", offset, length);
2365 
2366 	if (length == 0) {
2367 		return 0;
2368 	}
2369 
2370 	if (offset != file->append_pos) {
2371 		BLOBFS_TRACE(file, " error offset=%jx append_pos=%jx\n", offset, file->append_pos);
2372 		return -EINVAL;
2373 	}
2374 
2375 	pthread_spin_lock(&file->lock);
2376 	file->open_for_writing = true;
2377 
2378 	if ((file->last == NULL) && (file->append_pos % CACHE_BUFFER_SIZE == 0)) {
2379 		cache_append_buffer(file);
2380 	}
2381 
2382 	if (file->last == NULL) {
2383 		int rc;
2384 
2385 		file->append_pos += length;
2386 		pthread_spin_unlock(&file->lock);
2387 		rc = __send_rw_from_file(file, payload, offset, length, false, channel);
2388 		sem_wait(&channel->sem);
2389 		return rc;
2390 	}
2391 
2392 	blob_size = __file_get_blob_size(file);
2393 
2394 	if ((offset + length) > blob_size) {
2395 		struct spdk_fs_cb_args extend_args = {};
2396 
2397 		cluster_sz = file->fs->bs_opts.cluster_sz;
2398 		extend_args.sem = &channel->sem;
2399 		extend_args.op.resize.num_clusters = __bytes_to_clusters((offset + length), cluster_sz);
2400 		extend_args.file = file;
2401 		BLOBFS_TRACE(file, "start resize to %u clusters\n", extend_args.op.resize.num_clusters);
2402 		pthread_spin_unlock(&file->lock);
2403 		file->fs->send_request(__file_extend_blob, &extend_args);
2404 		sem_wait(&channel->sem);
2405 		if (extend_args.rc) {
2406 			return extend_args.rc;
2407 		}
2408 	}
2409 
2410 	flush_req = alloc_fs_request(channel);
2411 	if (flush_req == NULL) {
2412 		pthread_spin_unlock(&file->lock);
2413 		return -ENOMEM;
2414 	}
2415 
2416 	last = file->last;
2417 	rem_length = length;
2418 	cur_payload = payload;
2419 	while (rem_length > 0) {
2420 		copy = last->buf_size - last->bytes_filled;
2421 		if (copy > rem_length) {
2422 			copy = rem_length;
2423 		}
2424 		BLOBFS_TRACE_RW(file, "  fill offset=%jx length=%jx\n", file->append_pos, copy);
2425 		memcpy(&last->buf[last->bytes_filled], cur_payload, copy);
2426 		file->append_pos += copy;
2427 		if (file->length < file->append_pos) {
2428 			file->length = file->append_pos;
2429 		}
2430 		cur_payload += copy;
2431 		last->bytes_filled += copy;
2432 		rem_length -= copy;
2433 		if (last->bytes_filled == last->buf_size) {
2434 			cache_buffers_filled++;
2435 			last = cache_append_buffer(file);
2436 			if (last == NULL) {
2437 				BLOBFS_TRACE(file, "nomem\n");
2438 				free_fs_request(flush_req);
2439 				pthread_spin_unlock(&file->lock);
2440 				return -ENOMEM;
2441 			}
2442 		}
2443 	}
2444 
2445 	pthread_spin_unlock(&file->lock);
2446 
2447 	if (cache_buffers_filled == 0) {
2448 		free_fs_request(flush_req);
2449 		return 0;
2450 	}
2451 
2452 	flush_req->args.file = file;
2453 	file->fs->send_request(__file_flush, flush_req);
2454 	return 0;
2455 }
2456 
2457 static void
2458 __readahead_done(void *ctx, int bserrno)
2459 {
2460 	struct spdk_fs_request *req = ctx;
2461 	struct spdk_fs_cb_args *args = &req->args;
2462 	struct cache_buffer *cache_buffer = args->op.readahead.cache_buffer;
2463 	struct spdk_file *file = args->file;
2464 
2465 	BLOBFS_TRACE(file, "offset=%jx\n", cache_buffer->offset);
2466 
2467 	pthread_spin_lock(&file->lock);
2468 	cache_buffer->bytes_filled = args->op.readahead.length;
2469 	cache_buffer->bytes_flushed = args->op.readahead.length;
2470 	cache_buffer->in_progress = false;
2471 	pthread_spin_unlock(&file->lock);
2472 
2473 	free_fs_request(req);
2474 }
2475 
2476 static void
2477 __readahead(void *ctx)
2478 {
2479 	struct spdk_fs_request *req = ctx;
2480 	struct spdk_fs_cb_args *args = &req->args;
2481 	struct spdk_file *file = args->file;
2482 	uint64_t offset, length, start_lba, num_lba;
2483 	uint32_t lba_size;
2484 
2485 	offset = args->op.readahead.offset;
2486 	length = args->op.readahead.length;
2487 	assert(length > 0);
2488 
2489 	__get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba);
2490 
2491 	BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n",
2492 		     offset, length, start_lba, num_lba);
2493 	spdk_blob_io_read(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel,
2494 			  args->op.readahead.cache_buffer->buf,
2495 			  start_lba, num_lba, __readahead_done, req);
2496 }
2497 
2498 static uint64_t
2499 __next_cache_buffer_offset(uint64_t offset)
2500 {
2501 	return (offset + CACHE_BUFFER_SIZE) & ~(CACHE_TREE_LEVEL_MASK(0));
2502 }
2503 
2504 static void
2505 check_readahead(struct spdk_file *file, uint64_t offset,
2506 		struct spdk_fs_channel *channel)
2507 {
2508 	struct spdk_fs_request *req;
2509 	struct spdk_fs_cb_args *args;
2510 
2511 	offset = __next_cache_buffer_offset(offset);
2512 	if (spdk_tree_find_buffer(file->tree, offset) != NULL || file->length <= offset) {
2513 		return;
2514 	}
2515 
2516 	req = alloc_fs_request(channel);
2517 	if (req == NULL) {
2518 		return;
2519 	}
2520 	args = &req->args;
2521 
2522 	BLOBFS_TRACE(file, "offset=%jx\n", offset);
2523 
2524 	args->file = file;
2525 	args->op.readahead.offset = offset;
2526 	args->op.readahead.cache_buffer = cache_insert_buffer(file, offset);
2527 	if (!args->op.readahead.cache_buffer) {
2528 		BLOBFS_TRACE(file, "Cannot allocate buf for offset=%jx\n", offset);
2529 		free_fs_request(req);
2530 		return;
2531 	}
2532 
2533 	args->op.readahead.cache_buffer->in_progress = true;
2534 	if (file->length < (offset + CACHE_BUFFER_SIZE)) {
2535 		args->op.readahead.length = file->length & (CACHE_BUFFER_SIZE - 1);
2536 	} else {
2537 		args->op.readahead.length = CACHE_BUFFER_SIZE;
2538 	}
2539 	file->fs->send_request(__readahead, req);
2540 }
2541 
2542 static int
2543 __file_read(struct spdk_file *file, void *payload, uint64_t offset, uint64_t length,
2544 	    struct spdk_fs_channel *channel)
2545 {
2546 	struct cache_buffer *buf;
2547 	int rc;
2548 
2549 	buf = spdk_tree_find_filled_buffer(file->tree, offset);
2550 	if (buf == NULL) {
2551 		pthread_spin_unlock(&file->lock);
2552 		rc = __send_rw_from_file(file, payload, offset, length, true, channel);
2553 		pthread_spin_lock(&file->lock);
2554 		return rc;
2555 	}
2556 
2557 	if ((offset + length) > (buf->offset + buf->bytes_filled)) {
2558 		length = buf->offset + buf->bytes_filled - offset;
2559 	}
2560 	BLOBFS_TRACE(file, "read %p offset=%ju length=%ju\n", payload, offset, length);
2561 	memcpy(payload, &buf->buf[offset - buf->offset], length);
2562 	if ((offset + length) % CACHE_BUFFER_SIZE == 0) {
2563 		pthread_spin_lock(&g_caches_lock);
2564 		spdk_tree_remove_buffer(file->tree, buf);
2565 		if (file->tree->present_mask == 0) {
2566 			TAILQ_REMOVE(&g_caches, file, cache_tailq);
2567 		}
2568 		pthread_spin_unlock(&g_caches_lock);
2569 	}
2570 
2571 	sem_post(&channel->sem);
2572 	return 0;
2573 }
2574 
2575 int64_t
2576 spdk_file_read(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx,
2577 	       void *payload, uint64_t offset, uint64_t length)
2578 {
2579 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
2580 	uint64_t final_offset, final_length;
2581 	uint32_t sub_reads = 0;
2582 	int rc = 0;
2583 
2584 	pthread_spin_lock(&file->lock);
2585 
2586 	BLOBFS_TRACE_RW(file, "offset=%ju length=%ju\n", offset, length);
2587 
2588 	file->open_for_writing = false;
2589 
2590 	if (length == 0 || offset >= file->append_pos) {
2591 		pthread_spin_unlock(&file->lock);
2592 		return 0;
2593 	}
2594 
2595 	if (offset + length > file->append_pos) {
2596 		length = file->append_pos - offset;
2597 	}
2598 
2599 	if (offset != file->next_seq_offset) {
2600 		file->seq_byte_count = 0;
2601 	}
2602 	file->seq_byte_count += length;
2603 	file->next_seq_offset = offset + length;
2604 	if (file->seq_byte_count >= CACHE_READAHEAD_THRESHOLD) {
2605 		check_readahead(file, offset, channel);
2606 		check_readahead(file, offset + CACHE_BUFFER_SIZE, channel);
2607 	}
2608 
2609 	final_length = 0;
2610 	final_offset = offset + length;
2611 	while (offset < final_offset) {
2612 		length = NEXT_CACHE_BUFFER_OFFSET(offset) - offset;
2613 		if (length > (final_offset - offset)) {
2614 			length = final_offset - offset;
2615 		}
2616 
2617 		sub_reads++;
2618 		rc = __file_read(file, payload, offset, length, channel);
2619 		if (rc == 0) {
2620 			final_length += length;
2621 		} else {
2622 			break;
2623 		}
2624 		payload += length;
2625 		offset += length;
2626 	}
2627 	pthread_spin_unlock(&file->lock);
2628 	while (sub_reads-- > 0) {
2629 		sem_wait(&channel->sem);
2630 	}
2631 	if (rc == 0) {
2632 		return final_length;
2633 	} else {
2634 		return rc;
2635 	}
2636 }
2637 
2638 static void
2639 _file_sync(struct spdk_file *file, struct spdk_fs_channel *channel,
2640 	   spdk_file_op_complete cb_fn, void *cb_arg)
2641 {
2642 	struct spdk_fs_request *sync_req;
2643 	struct spdk_fs_request *flush_req;
2644 	struct spdk_fs_cb_args *sync_args;
2645 	struct spdk_fs_cb_args *flush_args;
2646 
2647 	BLOBFS_TRACE(file, "offset=%jx\n", file->append_pos);
2648 
2649 	pthread_spin_lock(&file->lock);
2650 	if (file->append_pos <= file->length_xattr) {
2651 		BLOBFS_TRACE(file, "done - file already synced\n");
2652 		pthread_spin_unlock(&file->lock);
2653 		cb_fn(cb_arg, 0);
2654 		return;
2655 	}
2656 
2657 	sync_req = alloc_fs_request(channel);
2658 	if (!sync_req) {
2659 		SPDK_ERRLOG("Cannot allocate sync req for file=%s\n", file->name);
2660 		pthread_spin_unlock(&file->lock);
2661 		cb_fn(cb_arg, -ENOMEM);
2662 		return;
2663 	}
2664 	sync_args = &sync_req->args;
2665 
2666 	flush_req = alloc_fs_request(channel);
2667 	if (!flush_req) {
2668 		SPDK_ERRLOG("Cannot allocate flush req for file=%s\n", file->name);
2669 		pthread_spin_unlock(&file->lock);
2670 		cb_fn(cb_arg, -ENOMEM);
2671 		return;
2672 	}
2673 	flush_args = &flush_req->args;
2674 
2675 	sync_args->file = file;
2676 	sync_args->fn.file_op = cb_fn;
2677 	sync_args->arg = cb_arg;
2678 	sync_args->op.sync.offset = file->append_pos;
2679 	sync_args->op.sync.xattr_in_progress = false;
2680 	TAILQ_INSERT_TAIL(&file->sync_requests, sync_req, args.op.sync.tailq);
2681 	pthread_spin_unlock(&file->lock);
2682 
2683 	flush_args->file = file;
2684 	channel->send_request(__file_flush, flush_req);
2685 }
2686 
2687 int
2688 spdk_file_sync(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx)
2689 {
2690 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
2691 	struct spdk_fs_cb_args args = {};
2692 
2693 	args.sem = &channel->sem;
2694 	_file_sync(file, channel, __wake_caller, &args);
2695 	sem_wait(&channel->sem);
2696 
2697 	return args.rc;
2698 }
2699 
2700 void
2701 spdk_file_sync_async(struct spdk_file *file, struct spdk_io_channel *_channel,
2702 		     spdk_file_op_complete cb_fn, void *cb_arg)
2703 {
2704 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
2705 
2706 	_file_sync(file, channel, cb_fn, cb_arg);
2707 }
2708 
2709 void
2710 spdk_file_set_priority(struct spdk_file *file, uint32_t priority)
2711 {
2712 	BLOBFS_TRACE(file, "priority=%u\n", priority);
2713 	file->priority = priority;
2714 
2715 }
2716 
2717 /*
2718  * Close routines
2719  */
2720 
2721 static void
2722 __file_close_async_done(void *ctx, int bserrno)
2723 {
2724 	struct spdk_fs_request *req = ctx;
2725 	struct spdk_fs_cb_args *args = &req->args;
2726 	struct spdk_file *file = args->file;
2727 
2728 	spdk_trace_record(TRACE_BLOBFS_CLOSE, 0, 0, 0, file->trace_arg_name);
2729 
2730 	if (file->is_deleted) {
2731 		spdk_fs_delete_file_async(file->fs, file->name, blob_delete_cb, ctx);
2732 		return;
2733 	}
2734 
2735 	args->fn.file_op(args->arg, bserrno);
2736 	free_fs_request(req);
2737 }
2738 
2739 static void
2740 __file_close_async(struct spdk_file *file, struct spdk_fs_request *req)
2741 {
2742 	struct spdk_blob *blob;
2743 
2744 	pthread_spin_lock(&file->lock);
2745 	if (file->ref_count == 0) {
2746 		pthread_spin_unlock(&file->lock);
2747 		__file_close_async_done(req, -EBADF);
2748 		return;
2749 	}
2750 
2751 	file->ref_count--;
2752 	if (file->ref_count > 0) {
2753 		pthread_spin_unlock(&file->lock);
2754 		req->args.fn.file_op(req->args.arg, 0);
2755 		free_fs_request(req);
2756 		return;
2757 	}
2758 
2759 	pthread_spin_unlock(&file->lock);
2760 
2761 	blob = file->blob;
2762 	file->blob = NULL;
2763 	spdk_blob_close(blob, __file_close_async_done, req);
2764 }
2765 
2766 static void
2767 __file_close_async__sync_done(void *arg, int fserrno)
2768 {
2769 	struct spdk_fs_request *req = arg;
2770 	struct spdk_fs_cb_args *args = &req->args;
2771 
2772 	__file_close_async(args->file, req);
2773 }
2774 
2775 void
2776 spdk_file_close_async(struct spdk_file *file, spdk_file_op_complete cb_fn, void *cb_arg)
2777 {
2778 	struct spdk_fs_request *req;
2779 	struct spdk_fs_cb_args *args;
2780 
2781 	req = alloc_fs_request(file->fs->md_target.md_fs_channel);
2782 	if (req == NULL) {
2783 		SPDK_ERRLOG("Cannot allocate close async req for file=%s\n", file->name);
2784 		cb_fn(cb_arg, -ENOMEM);
2785 		return;
2786 	}
2787 
2788 	args = &req->args;
2789 	args->file = file;
2790 	args->fn.file_op = cb_fn;
2791 	args->arg = cb_arg;
2792 
2793 	spdk_file_sync_async(file, file->fs->md_target.md_io_channel, __file_close_async__sync_done, req);
2794 }
2795 
2796 static void
2797 __file_close(void *arg)
2798 {
2799 	struct spdk_fs_request *req = arg;
2800 	struct spdk_fs_cb_args *args = &req->args;
2801 	struct spdk_file *file = args->file;
2802 
2803 	__file_close_async(file, req);
2804 }
2805 
2806 int
2807 spdk_file_close(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx)
2808 {
2809 	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
2810 	struct spdk_fs_request *req;
2811 	struct spdk_fs_cb_args *args;
2812 
2813 	req = alloc_fs_request(channel);
2814 	if (req == NULL) {
2815 		SPDK_ERRLOG("Cannot allocate close req for file=%s\n", file->name);
2816 		return -ENOMEM;
2817 	}
2818 
2819 	args = &req->args;
2820 
2821 	spdk_file_sync(file, ctx);
2822 	BLOBFS_TRACE(file, "name=%s\n", file->name);
2823 	args->file = file;
2824 	args->sem = &channel->sem;
2825 	args->fn.file_op = __wake_caller;
2826 	args->arg = args;
2827 	channel->send_request(__file_close, req);
2828 	sem_wait(&channel->sem);
2829 
2830 	return args->rc;
2831 }
2832 
2833 int
2834 spdk_file_get_id(struct spdk_file *file, void *id, size_t size)
2835 {
2836 	if (size < sizeof(spdk_blob_id)) {
2837 		return -EINVAL;
2838 	}
2839 
2840 	memcpy(id, &file->blobid, sizeof(spdk_blob_id));
2841 
2842 	return sizeof(spdk_blob_id);
2843 }
2844 
2845 static void
2846 cache_free_buffers(struct spdk_file *file)
2847 {
2848 	BLOBFS_TRACE(file, "free=%s\n", file->name);
2849 	pthread_spin_lock(&file->lock);
2850 	pthread_spin_lock(&g_caches_lock);
2851 	if (file->tree->present_mask == 0) {
2852 		pthread_spin_unlock(&g_caches_lock);
2853 		pthread_spin_unlock(&file->lock);
2854 		return;
2855 	}
2856 	spdk_tree_free_buffers(file->tree);
2857 
2858 	TAILQ_REMOVE(&g_caches, file, cache_tailq);
2859 	/* If not freed, put it in the end of the queue */
2860 	if (file->tree->present_mask != 0) {
2861 		TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq);
2862 	}
2863 	file->last = NULL;
2864 	pthread_spin_unlock(&g_caches_lock);
2865 	pthread_spin_unlock(&file->lock);
2866 }
2867 
/* Register the "blobfs" and "blobfs_rw" log components. */
SPDK_LOG_REGISTER_COMPONENT("blobfs", SPDK_LOG_BLOBFS)
SPDK_LOG_REGISTER_COMPONENT("blobfs_rw", SPDK_LOG_BLOBFS_RW)
2870