xref: /spdk/lib/blobfs/blobfs.c (revision 161a3002750e4acd9e9da110b1dc70c0730e37e8)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 
36 #include "spdk/blobfs.h"
37 #include "spdk/conf.h"
38 #include "blobfs_internal.h"
39 
40 #include "spdk/queue.h"
41 #include "spdk/io_channel.h"
42 #include "spdk/assert.h"
43 #include "spdk/env.h"
44 #include "spdk/util.h"
45 #include "spdk_internal/log.h"
46 
/* Debug-log helper: prefixes the message with the file's name (SPDK_TRACE_BLOBFS flag). */
#define BLOBFS_TRACE(file, str, args...) \
	SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "file=%s " str, file->name, ##args)

/* Same as BLOBFS_TRACE, but logged under the separate SPDK_TRACE_BLOBFS_RW flag. */
#define BLOBFS_TRACE_RW(file, str, args...) \
	SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS_RW, "file=%s " str, file->name, ##args)

/* Default total cache size in bytes (4 GiB). */
#define BLOBFS_CACHE_SIZE (4ULL * 1024 * 1024 * 1024)

/* Total cache size; divided by CACHE_BUFFER_SIZE to get the mempool element count. */
static uint64_t g_fs_cache_size = BLOBFS_CACHE_SIZE;
/* Shared pool of cache buffers; created by __initialize_cache(), NULL while no filesystem is active. */
static struct spdk_mempool *g_cache_pool;
/* List of spdk_file entries; initialized in __initialize_cache(). */
static TAILQ_HEAD(, spdk_file) g_caches;
/* Number of initialized filesystems; modified while holding g_cache_init_lock. */
static int g_fs_count = 0;
/* Serializes creation/destruction of the shared cache as filesystems come and go. */
static pthread_mutex_t g_cache_init_lock = PTHREAD_MUTEX_INITIALIZER;
/* Spinlock initialized together with g_caches (presumably protects that list - confirm). */
static pthread_spinlock_t g_caches_lock;
61 
/*
 * Completion callback that simply wakes a waiter.
 *
 * arg     - pointer to the sem_t to post.
 * bserrno - completion status; ignored.
 */
static void
__sem_post(void *arg, int bserrno)
{
	sem_post((sem_t *)arg);
}
69 
70 void
71 spdk_cache_buffer_free(struct cache_buffer *cache_buffer)
72 {
73 	spdk_mempool_put(g_cache_pool, cache_buffer->buf);
74 	free(cache_buffer);
75 }
76 
/* 128 KiB. NOTE(review): presumably the sequential-read byte count that triggers readahead - confirm at the use site. */
#define CACHE_READAHEAD_THRESHOLD	(128 * 1024)
78 
/* In-memory state of one blobfs file. */
struct spdk_file {
	/* Filesystem this file belongs to (set in file_alloc()). */
	struct spdk_filesystem	*fs;
	/* Blob handle; filled in by the blob-open completion callbacks. */
	struct spdk_blob	*blob;
	/* Heap-allocated (strdup) file name. */
	char			*name;
	/* Loaded from the blob's "length" xattr when the filesystem is loaded. */
	uint64_t		length;
	/* Opening a file with this flag set fails with -ENOENT. */
	bool                    is_deleted;
	bool			open_for_writing;
	uint64_t		length_flushed;
	uint64_t		append_pos;
	uint64_t		seq_byte_count;
	uint64_t		next_seq_offset;
	/* Initialized to SPDK_FILE_PRIORITY_LOW. */
	uint32_t		priority;
	/* Linkage in spdk_filesystem.files. */
	TAILQ_ENTRY(spdk_file)	tailq;
	spdk_blob_id		blobid;
	/* Incremented for every open request; the first reference triggers the blob open. */
	uint32_t		ref_count;
	pthread_spinlock_t	lock;
	struct cache_buffer	*last;
	struct cache_tree	*tree;
	/* Open requests waiting for the blob open to complete (linked via args.op.open.tailq). */
	TAILQ_HEAD(open_requests_head, spdk_fs_request) open_requests;
	TAILQ_HEAD(sync_requests_head, spdk_fs_request) sync_requests;
	/* NOTE(review): presumably linkage in the global g_caches list - confirm. */
	TAILQ_ENTRY(spdk_file)	cache_tailq;
};
101 
/*
 * Records a blob that carried the "is_deleted" xattr when the filesystem was
 * loaded; such blobs are deleted once blob iteration has finished.
 */
struct spdk_deleted_file {
	/* Id of the blob to delete. */
	spdk_blob_id	id;
	/* Linkage in the fs_load.deleted_files list of the load request. */
	TAILQ_ENTRY(spdk_deleted_file)	tailq;
};
106 
/*
 * One blobfs instance layered on a blobstore.
 *
 * The addresses of sync_target, md_target and io_target are registered as
 * io devices, and the channel-create callbacks recover the filesystem from
 * them with SPDK_CONTAINEROF().
 */
struct spdk_filesystem {
	/* Underlying blobstore; set once init/load succeeded. */
	struct spdk_blob_store	*bs;
	/* All known files, linked through spdk_file.tailq. */
	TAILQ_HEAD(, spdk_file)	files;
	/* Only cluster_sz is filled in by the code visible here. */
	struct spdk_bs_opts	bs_opts;
	struct spdk_bs_dev	*bdev;
	/* Dispatcher used by the synchronous API to run a function for the caller. */
	fs_send_request_fn	send_request;

	struct {
		/* Number of requests preallocated per channel. */
		uint32_t		max_ops;
		struct spdk_io_channel	*sync_io_channel;
		struct spdk_fs_channel	*sync_fs_channel;
	} sync_target;

	struct {
		/* Number of requests preallocated per channel. */
		uint32_t		max_ops;
		struct spdk_io_channel	*md_io_channel;
		/* Channel from which the metadata operations allocate their requests. */
		struct spdk_fs_channel	*md_fs_channel;
	} md_target;

	struct {
		/* Number of requests preallocated per channel. */
		uint32_t		max_ops;
	} io_target;
};
130 
/*
 * Per-request context carried through the asynchronous callback chains.
 * It is the first member of struct spdk_fs_request.
 */
struct spdk_fs_cb_args {
	/* Completion callback; the member to use depends on the operation. */
	union {
		spdk_fs_op_with_handle_complete		fs_op_with_handle;
		spdk_fs_op_complete			fs_op;
		spdk_file_op_with_handle_complete	file_op_with_handle;
		spdk_file_op_complete			file_op;
		spdk_file_stat_op_complete		stat_op;
	} fn;
	/* Argument handed to the completion callback. */
	void *arg;
	/* Semaphore posted to wake a caller blocked in the synchronous API. */
	sem_t *sem;
	struct spdk_filesystem *fs;
	struct spdk_file *file;
	/* Result code stored for the synchronous caller. */
	int rc;
	/* Set to true by alloc_fs_request(). */
	bool from_request;
	/* Operation-specific state; only the member matching the operation is meaningful. */
	union {
		struct {
			/* Blobs found with the "is_deleted" xattr during load. */
			TAILQ_HEAD(, spdk_deleted_file)	deleted_files;
		} fs_load;
		struct {
			uint64_t	length;
		} truncate;
		struct {
			struct spdk_io_channel	*channel;
			void		*user_buf;
			void		*pin_buf;
			int		is_read;
			off_t		offset;
			size_t		length;
			uint64_t	start_page;
			uint64_t	num_pages;
			uint32_t	blocklen;
		} rw;
		struct {
			/* Both names are borrowed from the caller, not copied. */
			const char	*old_name;
			const char	*new_name;
		} rename;
		struct {
			struct cache_buffer	*cache_buffer;
			uint64_t		length;
		} flush;
		struct {
			struct cache_buffer	*cache_buffer;
			uint64_t		length;
			uint64_t		offset;
		} readahead;
		struct {
			uint64_t			offset;
			TAILQ_ENTRY(spdk_fs_request)	tailq;
			bool				xattr_in_progress;
		} sync;
		struct {
			uint32_t			num_clusters;
		} resize;
		struct {
			/* Borrowed from the caller, not copied. */
			const char	*name;
			uint32_t	flags;
			/* Linkage in spdk_file.open_requests. */
			TAILQ_ENTRY(spdk_fs_request)	tailq;
		} open;
		struct {
			const char	*name;
		} create;
		struct {
			const char	*name;
		} delete;
		struct {
			const char	*name;
		} stat;
	} op;
};
200 
201 static void cache_free_buffers(struct spdk_file *file);
202 
203 static void
204 __initialize_cache(void)
205 {
206 	assert(g_cache_pool == NULL);
207 
208 	g_cache_pool = spdk_mempool_create("spdk_fs_cache",
209 					   g_fs_cache_size / CACHE_BUFFER_SIZE,
210 					   CACHE_BUFFER_SIZE,
211 					   SPDK_MEMPOOL_DEFAULT_CACHE_SIZE,
212 					   SPDK_ENV_SOCKET_ID_ANY);
213 	TAILQ_INIT(&g_caches);
214 	pthread_spin_init(&g_caches_lock, 0);
215 }
216 
217 static void
218 __free_cache(void)
219 {
220 	assert(g_cache_pool != NULL);
221 
222 	spdk_mempool_free(g_cache_pool);
223 	g_cache_pool = NULL;
224 }
225 
226 static uint64_t
227 __file_get_blob_size(struct spdk_file *file)
228 {
229 	uint64_t cluster_sz;
230 
231 	cluster_sz = file->fs->bs_opts.cluster_sz;
232 	return cluster_sz * spdk_blob_get_num_clusters(file->blob);
233 }
234 
/* A preallocated request object, handed out by alloc_fs_request(). */
struct spdk_fs_request {
	/* Callback context; kept as the first member. */
	struct spdk_fs_cb_args		args;
	/* Linkage in the owning channel's free list (spdk_fs_channel.reqs). */
	TAILQ_ENTRY(spdk_fs_request)	link;
	/* Channel the request was allocated from and is returned to. */
	struct spdk_fs_channel		*channel;
};
240 
/* Per-channel context: a fixed pool of requests plus the resources used to run them. */
struct spdk_fs_channel {
	/* Backing array for all requests of this channel (calloc'ed at channel creation). */
	struct spdk_fs_request		*req_mem;
	/* Free list of requests. */
	TAILQ_HEAD(, spdk_fs_request)	reqs;
	/* Semaphore the synchronous API blocks on until its request completes. */
	sem_t				sem;
	struct spdk_filesystem		*fs;
	/* Blobstore I/O channel; released on channel destruction when non-NULL. */
	struct spdk_io_channel		*bs_channel;
	fs_send_request_fn		send_request;
	/* When true, the free list is accessed under 'lock'. */
	bool				sync;
	pthread_spinlock_t		lock;
};
251 
252 static struct spdk_fs_request *
253 alloc_fs_request(struct spdk_fs_channel *channel)
254 {
255 	struct spdk_fs_request *req;
256 
257 	if (channel->sync) {
258 		pthread_spin_lock(&channel->lock);
259 	}
260 
261 	req = TAILQ_FIRST(&channel->reqs);
262 	if (req) {
263 		TAILQ_REMOVE(&channel->reqs, req, link);
264 	}
265 
266 	if (channel->sync) {
267 		pthread_spin_unlock(&channel->lock);
268 	}
269 
270 	if (req == NULL) {
271 		return NULL;
272 	}
273 	memset(req, 0, sizeof(*req));
274 	req->channel = channel;
275 	req->args.from_request = true;
276 
277 	return req;
278 }
279 
280 static void
281 free_fs_request(struct spdk_fs_request *req)
282 {
283 	struct spdk_fs_channel *channel = req->channel;
284 
285 	if (channel->sync) {
286 		pthread_spin_lock(&channel->lock);
287 	}
288 
289 	TAILQ_INSERT_HEAD(&req->channel->reqs, req, link);
290 
291 	if (channel->sync) {
292 		pthread_spin_unlock(&channel->lock);
293 	}
294 }
295 
296 static int
297 _spdk_fs_channel_create(struct spdk_filesystem *fs, struct spdk_fs_channel *channel,
298 			uint32_t max_ops)
299 {
300 	uint32_t i;
301 
302 	channel->req_mem = calloc(max_ops, sizeof(struct spdk_fs_request));
303 	if (!channel->req_mem) {
304 		return -1;
305 	}
306 
307 	TAILQ_INIT(&channel->reqs);
308 	sem_init(&channel->sem, 0, 0);
309 
310 	for (i = 0; i < max_ops; i++) {
311 		TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link);
312 	}
313 
314 	channel->fs = fs;
315 
316 	return 0;
317 }
318 
319 static int
320 _spdk_fs_md_channel_create(void *io_device, void *ctx_buf)
321 {
322 	struct spdk_filesystem		*fs;
323 	struct spdk_fs_channel		*channel = ctx_buf;
324 
325 	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, md_target);
326 
327 	return _spdk_fs_channel_create(fs, channel, fs->md_target.max_ops);
328 }
329 
330 static int
331 _spdk_fs_sync_channel_create(void *io_device, void *ctx_buf)
332 {
333 	struct spdk_filesystem		*fs;
334 	struct spdk_fs_channel		*channel = ctx_buf;
335 
336 	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, sync_target);
337 
338 	return _spdk_fs_channel_create(fs, channel, fs->sync_target.max_ops);
339 }
340 
341 static int
342 _spdk_fs_io_channel_create(void *io_device, void *ctx_buf)
343 {
344 	struct spdk_filesystem		*fs;
345 	struct spdk_fs_channel		*channel = ctx_buf;
346 
347 	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, io_target);
348 
349 	return _spdk_fs_channel_create(fs, channel, fs->io_target.max_ops);
350 }
351 
352 static void
353 _spdk_fs_channel_destroy(void *io_device, void *ctx_buf)
354 {
355 	struct spdk_fs_channel *channel = ctx_buf;
356 
357 	free(channel->req_mem);
358 	if (channel->bs_channel != NULL) {
359 		spdk_bs_free_io_channel(channel->bs_channel);
360 	}
361 }
362 
363 static void
364 __send_request_direct(fs_request_fn fn, void *arg)
365 {
366 	fn(arg);
367 }
368 
369 static void
370 common_fs_bs_init(struct spdk_filesystem *fs, struct spdk_blob_store *bs)
371 {
372 	fs->bs = bs;
373 	fs->bs_opts.cluster_sz = spdk_bs_get_cluster_size(bs);
374 	fs->md_target.md_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
375 	fs->md_target.md_fs_channel->send_request = __send_request_direct;
376 	fs->sync_target.sync_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
377 	fs->sync_target.sync_fs_channel->send_request = __send_request_direct;
378 
379 	pthread_mutex_lock(&g_cache_init_lock);
380 	if (g_fs_count == 0) {
381 		__initialize_cache();
382 	}
383 	g_fs_count++;
384 	pthread_mutex_unlock(&g_cache_init_lock);
385 }
386 
387 static void
388 init_cb(void *ctx, struct spdk_blob_store *bs, int bserrno)
389 {
390 	struct spdk_fs_request *req = ctx;
391 	struct spdk_fs_cb_args *args = &req->args;
392 	struct spdk_filesystem *fs = args->fs;
393 
394 	if (bserrno == 0) {
395 		common_fs_bs_init(fs, bs);
396 	} else {
397 		free(fs);
398 		fs = NULL;
399 	}
400 
401 	args->fn.fs_op_with_handle(args->arg, fs, bserrno);
402 	free_fs_request(req);
403 }
404 
405 static void
406 fs_conf_parse(void)
407 {
408 	struct spdk_conf_section *sp;
409 
410 	sp = spdk_conf_find_section(NULL, "Blobfs");
411 	if (sp == NULL) {
412 		g_fs_cache_buffer_shift = CACHE_BUFFER_SHIFT_DEFAULT;
413 		return;
414 	}
415 
416 	g_fs_cache_buffer_shift = spdk_conf_section_get_intval(sp, "CacheBufferShift");
417 	if (g_fs_cache_buffer_shift <= 0) {
418 		g_fs_cache_buffer_shift = CACHE_BUFFER_SHIFT_DEFAULT;
419 	}
420 }
421 
422 static struct spdk_filesystem *
423 fs_alloc(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn)
424 {
425 	struct spdk_filesystem *fs;
426 
427 	fs = calloc(1, sizeof(*fs));
428 	if (fs == NULL) {
429 		return NULL;
430 	}
431 
432 	fs->bdev = dev;
433 	fs->send_request = send_request_fn;
434 	TAILQ_INIT(&fs->files);
435 
436 	fs->md_target.max_ops = 512;
437 	spdk_io_device_register(&fs->md_target, _spdk_fs_md_channel_create, _spdk_fs_channel_destroy,
438 				sizeof(struct spdk_fs_channel));
439 	fs->md_target.md_io_channel = spdk_get_io_channel(&fs->md_target);
440 	fs->md_target.md_fs_channel = spdk_io_channel_get_ctx(fs->md_target.md_io_channel);
441 
442 	fs->sync_target.max_ops = 512;
443 	spdk_io_device_register(&fs->sync_target, _spdk_fs_sync_channel_create, _spdk_fs_channel_destroy,
444 				sizeof(struct spdk_fs_channel));
445 	fs->sync_target.sync_io_channel = spdk_get_io_channel(&fs->sync_target);
446 	fs->sync_target.sync_fs_channel = spdk_io_channel_get_ctx(fs->sync_target.sync_io_channel);
447 
448 	fs->io_target.max_ops = 512;
449 	spdk_io_device_register(&fs->io_target, _spdk_fs_io_channel_create, _spdk_fs_channel_destroy,
450 				sizeof(struct spdk_fs_channel));
451 
452 	return fs;
453 }
454 
455 void
456 spdk_fs_init(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn,
457 	     spdk_fs_op_with_handle_complete cb_fn, void *cb_arg)
458 {
459 	struct spdk_filesystem *fs;
460 	struct spdk_fs_request *req;
461 	struct spdk_fs_cb_args *args;
462 	struct spdk_bs_opts opts = {};
463 
464 	fs = fs_alloc(dev, send_request_fn);
465 	if (fs == NULL) {
466 		cb_fn(cb_arg, NULL, -ENOMEM);
467 		return;
468 	}
469 
470 	fs_conf_parse();
471 
472 	req = alloc_fs_request(fs->md_target.md_fs_channel);
473 	if (req == NULL) {
474 		spdk_put_io_channel(fs->md_target.md_io_channel);
475 		spdk_io_device_unregister(&fs->md_target, NULL);
476 		spdk_put_io_channel(fs->sync_target.sync_io_channel);
477 		spdk_io_device_unregister(&fs->sync_target, NULL);
478 		spdk_io_device_unregister(&fs->io_target, NULL);
479 		free(fs);
480 		cb_fn(cb_arg, NULL, -ENOMEM);
481 		return;
482 	}
483 
484 	args = &req->args;
485 	args->fn.fs_op_with_handle = cb_fn;
486 	args->arg = cb_arg;
487 	args->fs = fs;
488 
489 	spdk_bs_opts_init(&opts);
490 	strncpy(opts.bstype.bstype, "BLOBFS", SPDK_BLOBSTORE_TYPE_LENGTH);
491 
492 	spdk_bs_init(dev, &opts, init_cb, req);
493 }
494 
495 static struct spdk_file *
496 file_alloc(struct spdk_filesystem *fs)
497 {
498 	struct spdk_file *file;
499 
500 	file = calloc(1, sizeof(*file));
501 	if (file == NULL) {
502 		return NULL;
503 	}
504 
505 	file->tree = calloc(1, sizeof(*file->tree));
506 	if (file->tree == NULL) {
507 		free(file);
508 		return NULL;
509 	}
510 
511 	file->fs = fs;
512 	TAILQ_INIT(&file->open_requests);
513 	TAILQ_INIT(&file->sync_requests);
514 	pthread_spin_init(&file->lock, 0);
515 	TAILQ_INSERT_TAIL(&fs->files, file, tailq);
516 	file->priority = SPDK_FILE_PRIORITY_LOW;
517 	return file;
518 }
519 
520 static void iter_delete_cb(void *ctx, int bserrno);
521 
522 static int
523 _handle_deleted_files(struct spdk_fs_request *req)
524 {
525 	struct spdk_fs_cb_args *args = &req->args;
526 	struct spdk_filesystem *fs = args->fs;
527 
528 	if (!TAILQ_EMPTY(&args->op.fs_load.deleted_files)) {
529 		struct spdk_deleted_file *deleted_file;
530 
531 		deleted_file = TAILQ_FIRST(&args->op.fs_load.deleted_files);
532 		TAILQ_REMOVE(&args->op.fs_load.deleted_files, deleted_file, tailq);
533 		spdk_bs_md_delete_blob(fs->bs, deleted_file->id, iter_delete_cb, req);
534 		free(deleted_file);
535 		return 0;
536 	}
537 
538 	return 1;
539 }
540 
541 static void
542 iter_delete_cb(void *ctx, int bserrno)
543 {
544 	struct spdk_fs_request *req = ctx;
545 	struct spdk_fs_cb_args *args = &req->args;
546 	struct spdk_filesystem *fs = args->fs;
547 
548 	if (_handle_deleted_files(req) == 0)
549 		return;
550 
551 	args->fn.fs_op_with_handle(args->arg, fs, 0);
552 	free_fs_request(req);
553 
554 }
555 
556 static void
557 iter_cb(void *ctx, struct spdk_blob *blob, int rc)
558 {
559 	struct spdk_fs_request *req = ctx;
560 	struct spdk_fs_cb_args *args = &req->args;
561 	struct spdk_filesystem *fs = args->fs;
562 	uint64_t *length;
563 	const char *name;
564 	uint32_t *is_deleted;
565 	size_t value_len;
566 
567 	if (rc == -ENOENT) {
568 		/* Finished iterating */
569 		if (_handle_deleted_files(req) == 0)
570 			return;
571 		args->fn.fs_op_with_handle(args->arg, fs, 0);
572 		free_fs_request(req);
573 		return;
574 	} else if (rc < 0) {
575 		args->fn.fs_op_with_handle(args->arg, fs, rc);
576 		free_fs_request(req);
577 		return;
578 	}
579 
580 	rc = spdk_bs_md_get_xattr_value(blob, "name", (const void **)&name, &value_len);
581 	if (rc < 0) {
582 		args->fn.fs_op_with_handle(args->arg, fs, rc);
583 		free_fs_request(req);
584 		return;
585 	}
586 
587 	rc = spdk_bs_md_get_xattr_value(blob, "length", (const void **)&length, &value_len);
588 	if (rc < 0) {
589 		args->fn.fs_op_with_handle(args->arg, fs, rc);
590 		free_fs_request(req);
591 		return;
592 	}
593 
594 	assert(value_len == 8);
595 
596 	/* This file could be deleted last time without close it, then app crashed, so we delete it now */
597 	rc = spdk_bs_md_get_xattr_value(blob, "is_deleted", (const void **)&is_deleted, &value_len);
598 	if (rc < 0) {
599 		struct spdk_file *f;
600 
601 		f = file_alloc(fs);
602 		if (f == NULL) {
603 			args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM);
604 			free_fs_request(req);
605 			return;
606 		}
607 
608 		f->name = strdup(name);
609 		f->blobid = spdk_blob_get_id(blob);
610 		f->length = *length;
611 		f->length_flushed = *length;
612 		f->append_pos = *length;
613 		SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "added file %s length=%ju\n", f->name, f->length);
614 	} else {
615 		struct spdk_deleted_file *deleted_file;
616 
617 		deleted_file = calloc(1, sizeof(*deleted_file));
618 		if (deleted_file == NULL) {
619 			args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM);
620 			free_fs_request(req);
621 			return;
622 		}
623 		deleted_file->id = spdk_blob_get_id(blob);
624 		TAILQ_INSERT_TAIL(&args->op.fs_load.deleted_files, deleted_file, tailq);
625 	}
626 
627 	spdk_bs_md_iter_next(fs->bs, &blob, iter_cb, req);
628 }
629 
630 static void
631 load_cb(void *ctx, struct spdk_blob_store *bs, int bserrno)
632 {
633 	struct spdk_fs_request *req = ctx;
634 	struct spdk_fs_cb_args *args = &req->args;
635 	struct spdk_filesystem *fs = args->fs;
636 	struct spdk_bs_type bstype;
637 	static const struct spdk_bs_type blobfs_type = {"BLOBFS"};
638 	static const struct spdk_bs_type zeros;
639 
640 	if (bserrno != 0) {
641 		args->fn.fs_op_with_handle(args->arg, NULL, bserrno);
642 		free_fs_request(req);
643 		free(fs);
644 		return;
645 	}
646 
647 	bstype = spdk_bs_get_bstype(bs);
648 
649 	if (!memcmp(&bstype, &zeros, sizeof(bstype))) {
650 		SPDK_DEBUGLOG(SPDK_TRACE_BLOB, "assigning bstype\n");
651 		spdk_bs_set_bstype(bs, blobfs_type);
652 	} else if (memcmp(&bstype, &blobfs_type, sizeof(bstype))) {
653 		SPDK_DEBUGLOG(SPDK_TRACE_BLOB, "not blobfs\n");
654 		SPDK_TRACEDUMP(SPDK_TRACE_BLOB, "bstype", &bstype, sizeof(bstype));
655 		args->fn.fs_op_with_handle(args->arg, NULL, bserrno);
656 		free_fs_request(req);
657 		free(fs);
658 		return;
659 	}
660 
661 	common_fs_bs_init(fs, bs);
662 	spdk_bs_md_iter_first(fs->bs, iter_cb, req);
663 }
664 
665 void
666 spdk_fs_load(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn,
667 	     spdk_fs_op_with_handle_complete cb_fn, void *cb_arg)
668 {
669 	struct spdk_filesystem *fs;
670 	struct spdk_fs_cb_args *args;
671 	struct spdk_fs_request *req;
672 	struct spdk_bs_opts opts = {};
673 
674 	fs = fs_alloc(dev, send_request_fn);
675 	if (fs == NULL) {
676 		cb_fn(cb_arg, NULL, -ENOMEM);
677 		return;
678 	}
679 
680 	fs_conf_parse();
681 
682 	req = alloc_fs_request(fs->md_target.md_fs_channel);
683 	if (req == NULL) {
684 		spdk_put_io_channel(fs->md_target.md_io_channel);
685 		spdk_io_device_unregister(&fs->md_target, NULL);
686 		spdk_put_io_channel(fs->sync_target.sync_io_channel);
687 		spdk_io_device_unregister(&fs->sync_target, NULL);
688 		spdk_io_device_unregister(&fs->io_target, NULL);
689 		free(fs);
690 		cb_fn(cb_arg, NULL, -ENOMEM);
691 		return;
692 	}
693 
694 	args = &req->args;
695 	args->fn.fs_op_with_handle = cb_fn;
696 	args->arg = cb_arg;
697 	args->fs = fs;
698 	TAILQ_INIT(&args->op.fs_load.deleted_files);
699 
700 	spdk_bs_opts_init(&opts);
701 
702 	spdk_bs_load(dev, &opts, load_cb, req);
703 }
704 
705 static void
706 unload_cb(void *ctx, int bserrno)
707 {
708 	struct spdk_fs_request *req = ctx;
709 	struct spdk_fs_cb_args *args = &req->args;
710 	struct spdk_filesystem *fs = args->fs;
711 
712 	pthread_mutex_lock(&g_cache_init_lock);
713 	g_fs_count--;
714 	if (g_fs_count == 0) {
715 		__free_cache();
716 	}
717 	pthread_mutex_unlock(&g_cache_init_lock);
718 
719 	args->fn.fs_op(args->arg, bserrno);
720 	free(req);
721 
722 	spdk_io_device_unregister(&fs->io_target, NULL);
723 	spdk_io_device_unregister(&fs->sync_target, NULL);
724 	spdk_io_device_unregister(&fs->md_target, NULL);
725 
726 	free(fs);
727 }
728 
729 void
730 spdk_fs_unload(struct spdk_filesystem *fs, spdk_fs_op_complete cb_fn, void *cb_arg)
731 {
732 	struct spdk_fs_request *req;
733 	struct spdk_fs_cb_args *args;
734 
735 	/*
736 	 * We must free the md_channel before unloading the blobstore, so just
737 	 *  allocate this request from the general heap.
738 	 */
739 	req = calloc(1, sizeof(*req));
740 	if (req == NULL) {
741 		cb_fn(cb_arg, -ENOMEM);
742 		return;
743 	}
744 
745 	args = &req->args;
746 	args->fn.fs_op = cb_fn;
747 	args->arg = cb_arg;
748 	args->fs = fs;
749 
750 	spdk_fs_free_io_channel(fs->md_target.md_io_channel);
751 	spdk_fs_free_io_channel(fs->sync_target.sync_io_channel);
752 	spdk_bs_unload(fs->bs, unload_cb, req);
753 }
754 
755 static struct spdk_file *
756 fs_find_file(struct spdk_filesystem *fs, const char *name)
757 {
758 	struct spdk_file *file;
759 
760 	TAILQ_FOREACH(file, &fs->files, tailq) {
761 		if (!strncmp(name, file->name, SPDK_FILE_NAME_MAX)) {
762 			return file;
763 		}
764 	}
765 
766 	return NULL;
767 }
768 
769 void
770 spdk_fs_file_stat_async(struct spdk_filesystem *fs, const char *name,
771 			spdk_file_stat_op_complete cb_fn, void *cb_arg)
772 {
773 	struct spdk_file_stat stat;
774 	struct spdk_file *f = NULL;
775 
776 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
777 		cb_fn(cb_arg, NULL, -ENAMETOOLONG);
778 		return;
779 	}
780 
781 	f = fs_find_file(fs, name);
782 	if (f != NULL) {
783 		stat.blobid = f->blobid;
784 		stat.size = f->length;
785 		cb_fn(cb_arg, &stat, 0);
786 		return;
787 	}
788 
789 	cb_fn(cb_arg, NULL, -ENOENT);
790 }
791 
792 static void
793 __copy_stat(void *arg, struct spdk_file_stat *stat, int fserrno)
794 {
795 	struct spdk_fs_request *req = arg;
796 	struct spdk_fs_cb_args *args = &req->args;
797 
798 	args->rc = fserrno;
799 	if (fserrno == 0) {
800 		memcpy(args->arg, stat, sizeof(*stat));
801 	}
802 	sem_post(args->sem);
803 }
804 
805 static void
806 __file_stat(void *arg)
807 {
808 	struct spdk_fs_request *req = arg;
809 	struct spdk_fs_cb_args *args = &req->args;
810 
811 	spdk_fs_file_stat_async(args->fs, args->op.stat.name,
812 				args->fn.stat_op, req);
813 }
814 
815 int
816 spdk_fs_file_stat(struct spdk_filesystem *fs, struct spdk_io_channel *_channel,
817 		  const char *name, struct spdk_file_stat *stat)
818 {
819 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
820 	struct spdk_fs_request *req;
821 	int rc;
822 
823 	req = alloc_fs_request(channel);
824 	assert(req != NULL);
825 
826 	req->args.fs = fs;
827 	req->args.op.stat.name = name;
828 	req->args.fn.stat_op = __copy_stat;
829 	req->args.arg = stat;
830 	req->args.sem = &channel->sem;
831 	channel->send_request(__file_stat, req);
832 	sem_wait(&channel->sem);
833 
834 	rc = req->args.rc;
835 	free_fs_request(req);
836 
837 	return rc;
838 }
839 
840 static void
841 fs_create_blob_close_cb(void *ctx, int bserrno)
842 {
843 	struct spdk_fs_request *req = ctx;
844 	struct spdk_fs_cb_args *args = &req->args;
845 
846 	args->fn.file_op(args->arg, bserrno);
847 	free_fs_request(req);
848 }
849 
850 static void
851 fs_create_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno)
852 {
853 	struct spdk_fs_request *req = ctx;
854 	struct spdk_fs_cb_args *args = &req->args;
855 	struct spdk_file *f = args->file;
856 	uint64_t length = 0;
857 
858 	f->blob = blob;
859 	spdk_bs_md_resize_blob(blob, 1);
860 	spdk_blob_md_set_xattr(blob, "name", f->name, strlen(f->name) + 1);
861 	spdk_blob_md_set_xattr(blob, "length", &length, sizeof(length));
862 
863 	spdk_bs_md_close_blob(&f->blob, fs_create_blob_close_cb, args);
864 }
865 
866 static void
867 fs_create_blob_create_cb(void *ctx, spdk_blob_id blobid, int bserrno)
868 {
869 	struct spdk_fs_request *req = ctx;
870 	struct spdk_fs_cb_args *args = &req->args;
871 	struct spdk_file *f = args->file;
872 
873 	f->blobid = blobid;
874 	spdk_bs_md_open_blob(f->fs->bs, blobid, fs_create_blob_open_cb, req);
875 }
876 
877 void
878 spdk_fs_create_file_async(struct spdk_filesystem *fs, const char *name,
879 			  spdk_file_op_complete cb_fn, void *cb_arg)
880 {
881 	struct spdk_file *file;
882 	struct spdk_fs_request *req;
883 	struct spdk_fs_cb_args *args;
884 
885 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
886 		cb_fn(cb_arg, -ENAMETOOLONG);
887 		return;
888 	}
889 
890 	file = fs_find_file(fs, name);
891 	if (file != NULL) {
892 		cb_fn(cb_arg, -EEXIST);
893 		return;
894 	}
895 
896 	file = file_alloc(fs);
897 	if (file == NULL) {
898 		cb_fn(cb_arg, -ENOMEM);
899 		return;
900 	}
901 
902 	req = alloc_fs_request(fs->md_target.md_fs_channel);
903 	if (req == NULL) {
904 		cb_fn(cb_arg, -ENOMEM);
905 		return;
906 	}
907 
908 	args = &req->args;
909 	args->file = file;
910 	args->fn.file_op = cb_fn;
911 	args->arg = cb_arg;
912 
913 	file->name = strdup(name);
914 	spdk_bs_md_create_blob(fs->bs, fs_create_blob_create_cb, args);
915 }
916 
917 static void
918 __fs_create_file_done(void *arg, int fserrno)
919 {
920 	struct spdk_fs_request *req = arg;
921 	struct spdk_fs_cb_args *args = &req->args;
922 
923 	args->rc = fserrno;
924 	sem_post(args->sem);
925 	SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "file=%s\n", args->op.create.name);
926 }
927 
928 static void
929 __fs_create_file(void *arg)
930 {
931 	struct spdk_fs_request *req = arg;
932 	struct spdk_fs_cb_args *args = &req->args;
933 
934 	SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "file=%s\n", args->op.create.name);
935 	spdk_fs_create_file_async(args->fs, args->op.create.name, __fs_create_file_done, req);
936 }
937 
938 int
939 spdk_fs_create_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, const char *name)
940 {
941 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
942 	struct spdk_fs_request *req;
943 	struct spdk_fs_cb_args *args;
944 	int rc;
945 
946 	SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "file=%s\n", name);
947 
948 	req = alloc_fs_request(channel);
949 	assert(req != NULL);
950 
951 	args = &req->args;
952 	args->fs = fs;
953 	args->op.create.name = name;
954 	args->sem = &channel->sem;
955 	fs->send_request(__fs_create_file, req);
956 	sem_wait(&channel->sem);
957 	rc = args->rc;
958 	free_fs_request(req);
959 
960 	return rc;
961 }
962 
963 static void
964 fs_open_blob_done(void *ctx, struct spdk_blob *blob, int bserrno)
965 {
966 	struct spdk_fs_request *req = ctx;
967 	struct spdk_fs_cb_args *args = &req->args;
968 	struct spdk_file *f = args->file;
969 
970 	f->blob = blob;
971 	while (!TAILQ_EMPTY(&f->open_requests)) {
972 		req = TAILQ_FIRST(&f->open_requests);
973 		args = &req->args;
974 		TAILQ_REMOVE(&f->open_requests, req, args.op.open.tailq);
975 		args->fn.file_op_with_handle(args->arg, f, bserrno);
976 		free_fs_request(req);
977 	}
978 }
979 
980 static void
981 fs_open_blob_create_cb(void *ctx, int bserrno)
982 {
983 	struct spdk_fs_request *req = ctx;
984 	struct spdk_fs_cb_args *args = &req->args;
985 	struct spdk_file *file = args->file;
986 	struct spdk_filesystem *fs = args->fs;
987 
988 	if (file == NULL) {
989 		/*
990 		 * This is from an open with CREATE flag - the file
991 		 *  is now created so look it up in the file list for this
992 		 *  filesystem.
993 		 */
994 		file = fs_find_file(fs, args->op.open.name);
995 		assert(file != NULL);
996 		args->file = file;
997 	}
998 
999 	file->ref_count++;
1000 	TAILQ_INSERT_TAIL(&file->open_requests, req, args.op.open.tailq);
1001 	if (file->ref_count == 1) {
1002 		assert(file->blob == NULL);
1003 		spdk_bs_md_open_blob(fs->bs, file->blobid, fs_open_blob_done, req);
1004 	} else if (file->blob != NULL) {
1005 		fs_open_blob_done(req, file->blob, 0);
1006 	} else {
1007 		/*
1008 		 * The blob open for this file is in progress due to a previous
1009 		 *  open request.  When that open completes, it will invoke the
1010 		 *  open callback for this request.
1011 		 */
1012 	}
1013 }
1014 
1015 void
1016 spdk_fs_open_file_async(struct spdk_filesystem *fs, const char *name, uint32_t flags,
1017 			spdk_file_op_with_handle_complete cb_fn, void *cb_arg)
1018 {
1019 	struct spdk_file *f = NULL;
1020 	struct spdk_fs_request *req;
1021 	struct spdk_fs_cb_args *args;
1022 
1023 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
1024 		cb_fn(cb_arg, NULL, -ENAMETOOLONG);
1025 		return;
1026 	}
1027 
1028 	f = fs_find_file(fs, name);
1029 	if (f == NULL && !(flags & SPDK_BLOBFS_OPEN_CREATE)) {
1030 		cb_fn(cb_arg, NULL, -ENOENT);
1031 		return;
1032 	}
1033 
1034 	if (f != NULL && f->is_deleted == true) {
1035 		cb_fn(cb_arg, NULL, -ENOENT);
1036 		return;
1037 	}
1038 
1039 	req = alloc_fs_request(fs->md_target.md_fs_channel);
1040 	if (req == NULL) {
1041 		cb_fn(cb_arg, NULL, -ENOMEM);
1042 		return;
1043 	}
1044 
1045 	args = &req->args;
1046 	args->fn.file_op_with_handle = cb_fn;
1047 	args->arg = cb_arg;
1048 	args->file = f;
1049 	args->fs = fs;
1050 	args->op.open.name = name;
1051 
1052 	if (f == NULL) {
1053 		spdk_fs_create_file_async(fs, name, fs_open_blob_create_cb, req);
1054 	} else {
1055 		fs_open_blob_create_cb(req, 0);
1056 	}
1057 }
1058 
1059 static void
1060 __fs_open_file_done(void *arg, struct spdk_file *file, int bserrno)
1061 {
1062 	struct spdk_fs_request *req = arg;
1063 	struct spdk_fs_cb_args *args = &req->args;
1064 
1065 	args->file = file;
1066 	args->rc = bserrno;
1067 	sem_post(args->sem);
1068 	SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "file=%s\n", args->op.open.name);
1069 }
1070 
1071 static void
1072 __fs_open_file(void *arg)
1073 {
1074 	struct spdk_fs_request *req = arg;
1075 	struct spdk_fs_cb_args *args = &req->args;
1076 
1077 	SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "file=%s\n", args->op.open.name);
1078 	spdk_fs_open_file_async(args->fs, args->op.open.name, args->op.open.flags,
1079 				__fs_open_file_done, req);
1080 }
1081 
1082 int
1083 spdk_fs_open_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel,
1084 		  const char *name, uint32_t flags, struct spdk_file **file)
1085 {
1086 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
1087 	struct spdk_fs_request *req;
1088 	struct spdk_fs_cb_args *args;
1089 	int rc;
1090 
1091 	SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "file=%s\n", name);
1092 
1093 	req = alloc_fs_request(channel);
1094 	assert(req != NULL);
1095 
1096 	args = &req->args;
1097 	args->fs = fs;
1098 	args->op.open.name = name;
1099 	args->op.open.flags = flags;
1100 	args->sem = &channel->sem;
1101 	fs->send_request(__fs_open_file, req);
1102 	sem_wait(&channel->sem);
1103 	rc = args->rc;
1104 	if (rc == 0) {
1105 		*file = args->file;
1106 	} else {
1107 		*file = NULL;
1108 	}
1109 	free_fs_request(req);
1110 
1111 	return rc;
1112 }
1113 
1114 static void
1115 fs_rename_blob_close_cb(void *ctx, int bserrno)
1116 {
1117 	struct spdk_fs_request *req = ctx;
1118 	struct spdk_fs_cb_args *args = &req->args;
1119 
1120 	args->fn.fs_op(args->arg, bserrno);
1121 	free_fs_request(req);
1122 }
1123 
1124 static void
1125 fs_rename_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno)
1126 {
1127 	struct spdk_fs_request *req = ctx;
1128 	struct spdk_fs_cb_args *args = &req->args;
1129 	struct spdk_file *f = args->file;
1130 	const char *new_name = args->op.rename.new_name;
1131 
1132 	f->blob = blob;
1133 	spdk_blob_md_set_xattr(blob, "name", new_name, strlen(new_name) + 1);
1134 	spdk_bs_md_close_blob(&f->blob, fs_rename_blob_close_cb, req);
1135 }
1136 
1137 static void
1138 __spdk_fs_md_rename_file(struct spdk_fs_request *req)
1139 {
1140 	struct spdk_fs_cb_args *args = &req->args;
1141 	struct spdk_file *f;
1142 
1143 	f = fs_find_file(args->fs, args->op.rename.old_name);
1144 	if (f == NULL) {
1145 		args->fn.fs_op(args->arg, -ENOENT);
1146 		free_fs_request(req);
1147 		return;
1148 	}
1149 
1150 	free(f->name);
1151 	f->name = strdup(args->op.rename.new_name);
1152 	args->file = f;
1153 	spdk_bs_md_open_blob(args->fs->bs, f->blobid, fs_rename_blob_open_cb, req);
1154 }
1155 
1156 static void
1157 fs_rename_delete_done(void *arg, int fserrno)
1158 {
1159 	__spdk_fs_md_rename_file(arg);
1160 }
1161 
1162 void
1163 spdk_fs_rename_file_async(struct spdk_filesystem *fs,
1164 			  const char *old_name, const char *new_name,
1165 			  spdk_file_op_complete cb_fn, void *cb_arg)
1166 {
1167 	struct spdk_file *f;
1168 	struct spdk_fs_request *req;
1169 	struct spdk_fs_cb_args *args;
1170 
1171 	SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "old=%s new=%s\n", old_name, new_name);
1172 	if (strnlen(new_name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
1173 		cb_fn(cb_arg, -ENAMETOOLONG);
1174 		return;
1175 	}
1176 
1177 	req = alloc_fs_request(fs->md_target.md_fs_channel);
1178 	if (req == NULL) {
1179 		cb_fn(cb_arg, -ENOMEM);
1180 		return;
1181 	}
1182 
1183 	args = &req->args;
1184 	args->fn.fs_op = cb_fn;
1185 	args->fs = fs;
1186 	args->arg = cb_arg;
1187 	args->op.rename.old_name = old_name;
1188 	args->op.rename.new_name = new_name;
1189 
1190 	f = fs_find_file(fs, new_name);
1191 	if (f == NULL) {
1192 		__spdk_fs_md_rename_file(req);
1193 		return;
1194 	}
1195 
1196 	/*
1197 	 * The rename overwrites an existing file.  So delete the existing file, then
1198 	 *  do the actual rename.
1199 	 */
1200 	spdk_fs_delete_file_async(fs, new_name, fs_rename_delete_done, req);
1201 }
1202 
1203 static void
1204 __fs_rename_file_done(void *arg, int fserrno)
1205 {
1206 	struct spdk_fs_request *req = arg;
1207 	struct spdk_fs_cb_args *args = &req->args;
1208 
1209 	args->rc = fserrno;
1210 	sem_post(args->sem);
1211 }
1212 
1213 static void
1214 __fs_rename_file(void *arg)
1215 {
1216 	struct spdk_fs_request *req = arg;
1217 	struct spdk_fs_cb_args *args = &req->args;
1218 
1219 	spdk_fs_rename_file_async(args->fs, args->op.rename.old_name, args->op.rename.new_name,
1220 				  __fs_rename_file_done, req);
1221 }
1222 
1223 int
1224 spdk_fs_rename_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel,
1225 		    const char *old_name, const char *new_name)
1226 {
1227 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
1228 	struct spdk_fs_request *req;
1229 	struct spdk_fs_cb_args *args;
1230 	int rc;
1231 
1232 	req = alloc_fs_request(channel);
1233 	assert(req != NULL);
1234 
1235 	args = &req->args;
1236 
1237 	args->fs = fs;
1238 	args->op.rename.old_name = old_name;
1239 	args->op.rename.new_name = new_name;
1240 	args->sem = &channel->sem;
1241 	fs->send_request(__fs_rename_file, req);
1242 	sem_wait(&channel->sem);
1243 	rc = args->rc;
1244 	free_fs_request(req);
1245 	return rc;
1246 }
1247 
1248 static void
1249 blob_delete_cb(void *ctx, int bserrno)
1250 {
1251 	struct spdk_fs_request *req = ctx;
1252 	struct spdk_fs_cb_args *args = &req->args;
1253 
1254 	args->fn.file_op(args->arg, bserrno);
1255 	free_fs_request(req);
1256 }
1257 
1258 void
1259 spdk_fs_delete_file_async(struct spdk_filesystem *fs, const char *name,
1260 			  spdk_file_op_complete cb_fn, void *cb_arg)
1261 {
1262 	struct spdk_file *f;
1263 	spdk_blob_id blobid;
1264 	struct spdk_fs_request *req;
1265 	struct spdk_fs_cb_args *args;
1266 
1267 	SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "file=%s\n", name);
1268 
1269 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
1270 		cb_fn(cb_arg, -ENAMETOOLONG);
1271 		return;
1272 	}
1273 
1274 	f = fs_find_file(fs, name);
1275 	if (f == NULL) {
1276 		cb_fn(cb_arg, -ENOENT);
1277 		return;
1278 	}
1279 
1280 	req = alloc_fs_request(fs->md_target.md_fs_channel);
1281 	if (req == NULL) {
1282 		cb_fn(cb_arg, -ENOMEM);
1283 		return;
1284 	}
1285 
1286 	args = &req->args;
1287 	args->fn.file_op = cb_fn;
1288 	args->arg = cb_arg;
1289 
1290 	if (f->ref_count > 0) {
1291 		/* If the ref > 0, we mark the file as deleted and delete it when we close it. */
1292 		f->is_deleted = true;
1293 		spdk_blob_md_set_xattr(f->blob, "is_deleted", &f->is_deleted, sizeof(bool));
1294 		spdk_bs_md_sync_blob(f->blob, blob_delete_cb, args);
1295 		return;
1296 	}
1297 
1298 	TAILQ_REMOVE(&fs->files, f, tailq);
1299 
1300 	cache_free_buffers(f);
1301 
1302 	blobid = f->blobid;
1303 
1304 	free(f->name);
1305 	free(f->tree);
1306 	free(f);
1307 
1308 	spdk_bs_md_delete_blob(fs->bs, blobid, blob_delete_cb, req);
1309 }
1310 
1311 static void
1312 __fs_delete_file_done(void *arg, int fserrno)
1313 {
1314 	struct spdk_fs_request *req = arg;
1315 	struct spdk_fs_cb_args *args = &req->args;
1316 
1317 	args->rc = fserrno;
1318 	sem_post(args->sem);
1319 }
1320 
1321 static void
1322 __fs_delete_file(void *arg)
1323 {
1324 	struct spdk_fs_request *req = arg;
1325 	struct spdk_fs_cb_args *args = &req->args;
1326 
1327 	spdk_fs_delete_file_async(args->fs, args->op.delete.name, __fs_delete_file_done, req);
1328 }
1329 
1330 int
1331 spdk_fs_delete_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel,
1332 		    const char *name)
1333 {
1334 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
1335 	struct spdk_fs_request *req;
1336 	struct spdk_fs_cb_args *args;
1337 	int rc;
1338 
1339 	req = alloc_fs_request(channel);
1340 	assert(req != NULL);
1341 
1342 	args = &req->args;
1343 	args->fs = fs;
1344 	args->op.delete.name = name;
1345 	args->sem = &channel->sem;
1346 	fs->send_request(__fs_delete_file, req);
1347 	sem_wait(&channel->sem);
1348 	rc = args->rc;
1349 	free_fs_request(req);
1350 
1351 	return rc;
1352 }
1353 
1354 spdk_fs_iter
1355 spdk_fs_iter_first(struct spdk_filesystem *fs)
1356 {
1357 	struct spdk_file *f;
1358 
1359 	f = TAILQ_FIRST(&fs->files);
1360 	return f;
1361 }
1362 
1363 spdk_fs_iter
1364 spdk_fs_iter_next(spdk_fs_iter iter)
1365 {
1366 	struct spdk_file *f = iter;
1367 
1368 	if (f == NULL) {
1369 		return NULL;
1370 	}
1371 
1372 	f = TAILQ_NEXT(f, tailq);
1373 	return f;
1374 }
1375 
1376 const char *
1377 spdk_file_get_name(struct spdk_file *file)
1378 {
1379 	return file->name;
1380 }
1381 
1382 uint64_t
1383 spdk_file_get_length(struct spdk_file *file)
1384 {
1385 	assert(file != NULL);
1386 	SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "file=%s length=0x%jx\n", file->name, file->length);
1387 	return file->length;
1388 }
1389 
1390 static void
1391 fs_truncate_complete_cb(void *ctx, int bserrno)
1392 {
1393 	struct spdk_fs_request *req = ctx;
1394 	struct spdk_fs_cb_args *args = &req->args;
1395 
1396 	args->fn.file_op(args->arg, bserrno);
1397 	free_fs_request(req);
1398 }
1399 
/*
 * Number of clusters of cluster_sz bytes needed to hold length bytes,
 *  rounding up.  cluster_sz must be non-zero.
 *
 * Written as quotient plus remainder check so that lengths close to
 *  UINT64_MAX do not wrap, unlike (length + cluster_sz - 1) / cluster_sz.
 */
static uint64_t
__bytes_to_clusters(uint64_t length, uint64_t cluster_sz)
{
	uint64_t clusters = length / cluster_sz;

	if (length % cluster_sz != 0) {
		clusters++;
	}

	return clusters;
}
1405 
1406 void
1407 spdk_file_truncate_async(struct spdk_file *file, uint64_t length,
1408 			 spdk_file_op_complete cb_fn, void *cb_arg)
1409 {
1410 	struct spdk_filesystem *fs;
1411 	size_t num_clusters;
1412 	struct spdk_fs_request *req;
1413 	struct spdk_fs_cb_args *args;
1414 
1415 	SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "file=%s old=0x%jx new=0x%jx\n", file->name, file->length, length);
1416 	if (length == file->length) {
1417 		cb_fn(cb_arg, 0);
1418 		return;
1419 	}
1420 
1421 	req = alloc_fs_request(file->fs->md_target.md_fs_channel);
1422 	if (req == NULL) {
1423 		cb_fn(cb_arg, -ENOMEM);
1424 		return;
1425 	}
1426 
1427 	args = &req->args;
1428 	args->fn.file_op = cb_fn;
1429 	args->arg = cb_arg;
1430 	args->file = file;
1431 	fs = file->fs;
1432 
1433 	num_clusters = __bytes_to_clusters(length, fs->bs_opts.cluster_sz);
1434 
1435 	spdk_bs_md_resize_blob(file->blob, num_clusters);
1436 	spdk_blob_md_set_xattr(file->blob, "length", &length, sizeof(length));
1437 
1438 	file->length = length;
1439 	if (file->append_pos > file->length) {
1440 		file->append_pos = file->length;
1441 	}
1442 
1443 	spdk_bs_md_sync_blob(file->blob, fs_truncate_complete_cb, args);
1444 }
1445 
1446 static void
1447 __truncate(void *arg)
1448 {
1449 	struct spdk_fs_request *req = arg;
1450 	struct spdk_fs_cb_args *args = &req->args;
1451 
1452 	spdk_file_truncate_async(args->file, args->op.truncate.length,
1453 				 args->fn.file_op, args->arg);
1454 }
1455 
1456 void
1457 spdk_file_truncate(struct spdk_file *file, struct spdk_io_channel *_channel,
1458 		   uint64_t length)
1459 {
1460 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
1461 	struct spdk_fs_request *req;
1462 	struct spdk_fs_cb_args *args;
1463 
1464 	req = alloc_fs_request(channel);
1465 	assert(req != NULL);
1466 
1467 	args = &req->args;
1468 
1469 	args->file = file;
1470 	args->op.truncate.length = length;
1471 	args->fn.file_op = __sem_post;
1472 	args->arg = &channel->sem;
1473 
1474 	channel->send_request(__truncate, req);
1475 	sem_wait(&channel->sem);
1476 	free_fs_request(req);
1477 }
1478 
1479 static void
1480 __rw_done(void *ctx, int bserrno)
1481 {
1482 	struct spdk_fs_request *req = ctx;
1483 	struct spdk_fs_cb_args *args = &req->args;
1484 
1485 	spdk_dma_free(args->op.rw.pin_buf);
1486 	args->fn.file_op(args->arg, bserrno);
1487 	free_fs_request(req);
1488 }
1489 
1490 static void
1491 __read_done(void *ctx, int bserrno)
1492 {
1493 	struct spdk_fs_request *req = ctx;
1494 	struct spdk_fs_cb_args *args = &req->args;
1495 
1496 	if (args->op.rw.is_read) {
1497 		memcpy(args->op.rw.user_buf,
1498 		       args->op.rw.pin_buf + (args->op.rw.offset & 0xFFF),
1499 		       args->op.rw.length);
1500 		__rw_done(req, 0);
1501 	} else {
1502 		memcpy(args->op.rw.pin_buf + (args->op.rw.offset & 0xFFF),
1503 		       args->op.rw.user_buf,
1504 		       args->op.rw.length);
1505 		spdk_bs_io_write_blob(args->file->blob, args->op.rw.channel,
1506 				      args->op.rw.pin_buf,
1507 				      args->op.rw.start_page, args->op.rw.num_pages,
1508 				      __rw_done, req);
1509 	}
1510 }
1511 
1512 static void
1513 __do_blob_read(void *ctx, int fserrno)
1514 {
1515 	struct spdk_fs_request *req = ctx;
1516 	struct spdk_fs_cb_args *args = &req->args;
1517 
1518 	spdk_bs_io_read_blob(args->file->blob, args->op.rw.channel,
1519 			     args->op.rw.pin_buf,
1520 			     args->op.rw.start_page, args->op.rw.num_pages,
1521 			     __read_done, req);
1522 }
1523 
1524 static void
1525 __get_page_parameters(struct spdk_file *file, uint64_t offset, uint64_t length,
1526 		      uint64_t *start_page, uint32_t *page_size, uint64_t *num_pages)
1527 {
1528 	uint64_t end_page;
1529 
1530 	*page_size = spdk_bs_get_page_size(file->fs->bs);
1531 	*start_page = offset / *page_size;
1532 	end_page = (offset + length - 1) / *page_size;
1533 	*num_pages = (end_page - *start_page + 1);
1534 }
1535 
1536 static void
1537 __readwrite(struct spdk_file *file, struct spdk_io_channel *_channel,
1538 	    void *payload, uint64_t offset, uint64_t length,
1539 	    spdk_file_op_complete cb_fn, void *cb_arg, int is_read)
1540 {
1541 	struct spdk_fs_request *req;
1542 	struct spdk_fs_cb_args *args;
1543 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
1544 	uint64_t start_page, num_pages, pin_buf_length;
1545 	uint32_t page_size;
1546 
1547 	if (is_read && offset + length > file->length) {
1548 		cb_fn(cb_arg, -EINVAL);
1549 		return;
1550 	}
1551 
1552 	req = alloc_fs_request(channel);
1553 	if (req == NULL) {
1554 		cb_fn(cb_arg, -ENOMEM);
1555 		return;
1556 	}
1557 
1558 	args = &req->args;
1559 	args->fn.file_op = cb_fn;
1560 	args->arg = cb_arg;
1561 	args->file = file;
1562 	args->op.rw.channel = channel->bs_channel;
1563 	args->op.rw.user_buf = payload;
1564 	args->op.rw.is_read = is_read;
1565 	args->op.rw.offset = offset;
1566 	args->op.rw.length = length;
1567 
1568 	__get_page_parameters(file, offset, length, &start_page, &page_size, &num_pages);
1569 	pin_buf_length = num_pages * page_size;
1570 	args->op.rw.pin_buf = spdk_dma_malloc(pin_buf_length, 4096, NULL);
1571 
1572 	args->op.rw.start_page = start_page;
1573 	args->op.rw.num_pages = num_pages;
1574 
1575 	if (!is_read && file->length < offset + length) {
1576 		spdk_file_truncate_async(file, offset + length, __do_blob_read, req);
1577 	} else {
1578 		__do_blob_read(req, 0);
1579 	}
1580 }
1581 
1582 void
1583 spdk_file_write_async(struct spdk_file *file, struct spdk_io_channel *channel,
1584 		      void *payload, uint64_t offset, uint64_t length,
1585 		      spdk_file_op_complete cb_fn, void *cb_arg)
1586 {
1587 	__readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 0);
1588 }
1589 
1590 void
1591 spdk_file_read_async(struct spdk_file *file, struct spdk_io_channel *channel,
1592 		     void *payload, uint64_t offset, uint64_t length,
1593 		     spdk_file_op_complete cb_fn, void *cb_arg)
1594 {
1595 	SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "file=%s offset=%jx length=%jx\n",
1596 		      file->name, offset, length);
1597 	__readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 1);
1598 }
1599 
1600 struct spdk_io_channel *
1601 spdk_fs_alloc_io_channel(struct spdk_filesystem *fs)
1602 {
1603 	struct spdk_io_channel *io_channel;
1604 	struct spdk_fs_channel *fs_channel;
1605 
1606 	io_channel = spdk_get_io_channel(&fs->io_target);
1607 	fs_channel = spdk_io_channel_get_ctx(io_channel);
1608 	fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
1609 	fs_channel->send_request = __send_request_direct;
1610 
1611 	return io_channel;
1612 }
1613 
1614 struct spdk_io_channel *
1615 spdk_fs_alloc_io_channel_sync(struct spdk_filesystem *fs)
1616 {
1617 	struct spdk_io_channel *io_channel;
1618 	struct spdk_fs_channel *fs_channel;
1619 
1620 	io_channel = spdk_get_io_channel(&fs->io_target);
1621 	fs_channel = spdk_io_channel_get_ctx(io_channel);
1622 	fs_channel->send_request = fs->send_request;
1623 	fs_channel->sync = 1;
1624 	pthread_spin_init(&fs_channel->lock, 0);
1625 
1626 	return io_channel;
1627 }
1628 
/*
 * Release a channel obtained from one of the spdk_fs_alloc_io_channel*()
 *  functions by handing it back with spdk_put_io_channel().
 */
void
spdk_fs_free_io_channel(struct spdk_io_channel *channel)
{
	spdk_put_io_channel(channel);
}
1634 
1635 void
1636 spdk_fs_set_cache_size(uint64_t size_in_mb)
1637 {
1638 	g_fs_cache_size = size_in_mb * 1024 * 1024;
1639 }
1640 
1641 uint64_t
1642 spdk_fs_get_cache_size(void)
1643 {
1644 	return g_fs_cache_size / (1024 * 1024);
1645 }
1646 
1647 static void __file_flush(void *_args);
1648 
1649 static void *
1650 alloc_cache_memory_buffer(struct spdk_file *context)
1651 {
1652 	struct spdk_file *file;
1653 	void *buf;
1654 
1655 	buf = spdk_mempool_get(g_cache_pool);
1656 	if (buf != NULL) {
1657 		return buf;
1658 	}
1659 
1660 	pthread_spin_lock(&g_caches_lock);
1661 	TAILQ_FOREACH(file, &g_caches, cache_tailq) {
1662 		if (!file->open_for_writing &&
1663 		    file->priority == SPDK_FILE_PRIORITY_LOW &&
1664 		    file != context) {
1665 			break;
1666 		}
1667 	}
1668 	pthread_spin_unlock(&g_caches_lock);
1669 	if (file != NULL) {
1670 		cache_free_buffers(file);
1671 		buf = spdk_mempool_get(g_cache_pool);
1672 		if (buf != NULL) {
1673 			return buf;
1674 		}
1675 	}
1676 
1677 	pthread_spin_lock(&g_caches_lock);
1678 	TAILQ_FOREACH(file, &g_caches, cache_tailq) {
1679 		if (!file->open_for_writing && file != context) {
1680 			break;
1681 		}
1682 	}
1683 	pthread_spin_unlock(&g_caches_lock);
1684 	if (file != NULL) {
1685 		cache_free_buffers(file);
1686 		buf = spdk_mempool_get(g_cache_pool);
1687 		if (buf != NULL) {
1688 			return buf;
1689 		}
1690 	}
1691 
1692 	pthread_spin_lock(&g_caches_lock);
1693 	TAILQ_FOREACH(file, &g_caches, cache_tailq) {
1694 		if (file != context) {
1695 			break;
1696 		}
1697 	}
1698 	pthread_spin_unlock(&g_caches_lock);
1699 	if (file != NULL) {
1700 		cache_free_buffers(file);
1701 		buf = spdk_mempool_get(g_cache_pool);
1702 		if (buf != NULL) {
1703 			return buf;
1704 		}
1705 	}
1706 
1707 	return NULL;
1708 }
1709 
1710 static struct cache_buffer *
1711 cache_insert_buffer(struct spdk_file *file, uint64_t offset)
1712 {
1713 	struct cache_buffer *buf;
1714 	int count = 0;
1715 
1716 	buf = calloc(1, sizeof(*buf));
1717 	if (buf == NULL) {
1718 		SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "calloc failed\n");
1719 		return NULL;
1720 	}
1721 
1722 	buf->buf = alloc_cache_memory_buffer(file);
1723 	while (buf->buf == NULL) {
1724 		/*
1725 		 * TODO: alloc_cache_memory_buffer() should eventually free
1726 		 *  some buffers.  Need a more sophisticated check here, instead
1727 		 *  of just bailing if 100 tries does not result in getting a
1728 		 *  free buffer.  This will involve using the sync channel's
1729 		 *  semaphore to block until a buffer becomes available.
1730 		 */
1731 		if (count++ == 100) {
1732 			SPDK_ERRLOG("could not allocate cache buffer\n");
1733 			assert(false);
1734 			free(buf);
1735 			return NULL;
1736 		}
1737 		buf->buf = alloc_cache_memory_buffer(file);
1738 	}
1739 
1740 	buf->buf_size = CACHE_BUFFER_SIZE;
1741 	buf->offset = offset;
1742 
1743 	pthread_spin_lock(&g_caches_lock);
1744 	if (file->tree->present_mask == 0) {
1745 		TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq);
1746 	}
1747 	file->tree = spdk_tree_insert_buffer(file->tree, buf);
1748 	pthread_spin_unlock(&g_caches_lock);
1749 
1750 	return buf;
1751 }
1752 
1753 static struct cache_buffer *
1754 cache_append_buffer(struct spdk_file *file)
1755 {
1756 	struct cache_buffer *last;
1757 
1758 	assert(file->last == NULL || file->last->bytes_filled == file->last->buf_size);
1759 	assert((file->append_pos % CACHE_BUFFER_SIZE) == 0);
1760 
1761 	last = cache_insert_buffer(file, file->append_pos);
1762 	if (last == NULL) {
1763 		SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "cache_insert_buffer failed\n");
1764 		return NULL;
1765 	}
1766 
1767 	file->last = last;
1768 
1769 	return last;
1770 }
1771 
1772 static void
1773 __wake_caller(struct spdk_fs_cb_args *args)
1774 {
1775 	sem_post(args->sem);
1776 }
1777 
1778 static void __check_sync_reqs(struct spdk_file *file);
1779 
1780 static void
1781 __file_cache_finish_sync(struct spdk_file *file)
1782 {
1783 	struct spdk_fs_request *sync_req;
1784 	struct spdk_fs_cb_args *sync_args;
1785 
1786 	pthread_spin_lock(&file->lock);
1787 	sync_req = TAILQ_FIRST(&file->sync_requests);
1788 	sync_args = &sync_req->args;
1789 	assert(sync_args->op.sync.offset <= file->length_flushed);
1790 	BLOBFS_TRACE(file, "sync done offset=%jx\n", sync_args->op.sync.offset);
1791 	TAILQ_REMOVE(&file->sync_requests, sync_req, args.op.sync.tailq);
1792 	pthread_spin_unlock(&file->lock);
1793 
1794 	sync_args->fn.file_op(sync_args->arg, 0);
1795 	__check_sync_reqs(file);
1796 
1797 	pthread_spin_lock(&file->lock);
1798 	free_fs_request(sync_req);
1799 	pthread_spin_unlock(&file->lock);
1800 }
1801 
/*
 * Blobstore completion adapter: ctx is the file whose metadata sync just
 *  finished.  The status is not examined.
 */
static void
__file_cache_finish_sync_bs_cb(void *ctx, int bserrno)
{
	__file_cache_finish_sync((struct spdk_file *)ctx);
}
1809 
1810 static void
1811 __free_args(struct spdk_fs_cb_args *args)
1812 {
1813 	struct spdk_fs_request *req;
1814 
1815 	if (!args->from_request) {
1816 		free(args);
1817 	} else {
1818 		/* Depends on args being at the start of the spdk_fs_request structure. */
1819 		req = (struct spdk_fs_request *)args;
1820 		free_fs_request(req);
1821 	}
1822 }
1823 
1824 static void
1825 __check_sync_reqs(struct spdk_file *file)
1826 {
1827 	struct spdk_fs_request *sync_req;
1828 
1829 	pthread_spin_lock(&file->lock);
1830 
1831 	TAILQ_FOREACH(sync_req, &file->sync_requests, args.op.sync.tailq) {
1832 		if (sync_req->args.op.sync.offset <= file->length_flushed) {
1833 			break;
1834 		}
1835 	}
1836 
1837 	if (sync_req != NULL && !sync_req->args.op.sync.xattr_in_progress) {
1838 		BLOBFS_TRACE(file, "set xattr length 0x%jx\n", file->length_flushed);
1839 		sync_req->args.op.sync.xattr_in_progress = true;
1840 		spdk_blob_md_set_xattr(file->blob, "length", &file->length_flushed,
1841 				       sizeof(file->length_flushed));
1842 
1843 		pthread_spin_unlock(&file->lock);
1844 		spdk_bs_md_sync_blob(file->blob, __file_cache_finish_sync_bs_cb, file);
1845 	} else {
1846 		pthread_spin_unlock(&file->lock);
1847 	}
1848 }
1849 
1850 static void
1851 __file_flush_done(void *arg, int bserrno)
1852 {
1853 	struct spdk_fs_cb_args *args = arg;
1854 	struct spdk_file *file = args->file;
1855 	struct cache_buffer *next = args->op.flush.cache_buffer;
1856 
1857 	BLOBFS_TRACE(file, "length=%jx\n", args->op.flush.length);
1858 
1859 	pthread_spin_lock(&file->lock);
1860 	next->in_progress = false;
1861 	next->bytes_flushed += args->op.flush.length;
1862 	file->length_flushed += args->op.flush.length;
1863 	if (file->length_flushed > file->length) {
1864 		file->length = file->length_flushed;
1865 	}
1866 	if (next->bytes_flushed == next->buf_size) {
1867 		BLOBFS_TRACE(file, "write buffer fully flushed 0x%jx\n", file->length_flushed);
1868 		next = spdk_tree_find_buffer(file->tree, file->length_flushed);
1869 	}
1870 
1871 	/*
1872 	 * Assert that there is no cached data that extends past the end of the underlying
1873 	 *  blob.
1874 	 */
1875 	assert(next == NULL || next->offset < __file_get_blob_size(file) ||
1876 	       next->bytes_filled == 0);
1877 
1878 	pthread_spin_unlock(&file->lock);
1879 
1880 	__check_sync_reqs(file);
1881 
1882 	__file_flush(args);
1883 }
1884 
1885 static void
1886 __file_flush(void *_args)
1887 {
1888 	struct spdk_fs_cb_args *args = _args;
1889 	struct spdk_file *file = args->file;
1890 	struct cache_buffer *next;
1891 	uint64_t offset, length, start_page, num_pages;
1892 	uint32_t page_size;
1893 
1894 	pthread_spin_lock(&file->lock);
1895 	next = spdk_tree_find_buffer(file->tree, file->length_flushed);
1896 	if (next == NULL || next->in_progress) {
1897 		/*
1898 		 * There is either no data to flush, or a flush I/O is already in
1899 		 *  progress.  So return immediately - if a flush I/O is in
1900 		 *  progress we will flush more data after that is completed.
1901 		 */
1902 		__free_args(args);
1903 		pthread_spin_unlock(&file->lock);
1904 		return;
1905 	}
1906 
1907 	offset = next->offset + next->bytes_flushed;
1908 	length = next->bytes_filled - next->bytes_flushed;
1909 	if (length == 0) {
1910 		__free_args(args);
1911 		pthread_spin_unlock(&file->lock);
1912 		return;
1913 	}
1914 	args->op.flush.length = length;
1915 	args->op.flush.cache_buffer = next;
1916 
1917 	__get_page_parameters(file, offset, length, &start_page, &page_size, &num_pages);
1918 
1919 	next->in_progress = true;
1920 	BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n",
1921 		     offset, length, start_page, num_pages);
1922 	pthread_spin_unlock(&file->lock);
1923 	spdk_bs_io_write_blob(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel,
1924 			      next->buf + (start_page * page_size) - next->offset,
1925 			      start_page, num_pages,
1926 			      __file_flush_done, args);
1927 }
1928 
/*
 * Completion of the metadata sync started by __file_extend_blob(): wakes
 *  the waiting caller.  The status is not examined.
 */
static void
__file_extend_done(void *arg, int bserrno)
{
	__wake_caller((struct spdk_fs_cb_args *)arg);
}
1936 
1937 static void
1938 __file_extend_blob(void *_args)
1939 {
1940 	struct spdk_fs_cb_args *args = _args;
1941 	struct spdk_file *file = args->file;
1942 
1943 	spdk_bs_md_resize_blob(file->blob, args->op.resize.num_clusters);
1944 
1945 	spdk_bs_md_sync_blob(file->blob, __file_extend_done, args);
1946 }
1947 
/*
 * Completion of the async read/write issued by __rw_from_file(): wakes the
 *  waiting caller, then releases the args.  The status is not examined.
 */
static void
__rw_from_file_done(void *arg, int bserrno)
{
	struct spdk_fs_cb_args *rw_args = arg;

	__wake_caller(rw_args);
	__free_args(rw_args);
}
1956 
1957 static void
1958 __rw_from_file(void *_args)
1959 {
1960 	struct spdk_fs_cb_args *args = _args;
1961 	struct spdk_file *file = args->file;
1962 
1963 	if (args->op.rw.is_read) {
1964 		spdk_file_read_async(file, file->fs->sync_target.sync_io_channel, args->op.rw.user_buf,
1965 				     args->op.rw.offset, args->op.rw.length,
1966 				     __rw_from_file_done, args);
1967 	} else {
1968 		spdk_file_write_async(file, file->fs->sync_target.sync_io_channel, args->op.rw.user_buf,
1969 				      args->op.rw.offset, args->op.rw.length,
1970 				      __rw_from_file_done, args);
1971 	}
1972 }
1973 
1974 static int
1975 __send_rw_from_file(struct spdk_file *file, sem_t *sem, void *payload,
1976 		    uint64_t offset, uint64_t length, bool is_read)
1977 {
1978 	struct spdk_fs_cb_args *args;
1979 
1980 	args = calloc(1, sizeof(*args));
1981 	if (args == NULL) {
1982 		sem_post(sem);
1983 		return -ENOMEM;
1984 	}
1985 
1986 	args->file = file;
1987 	args->sem = sem;
1988 	args->op.rw.user_buf = payload;
1989 	args->op.rw.offset = offset;
1990 	args->op.rw.length = length;
1991 	args->op.rw.is_read = is_read;
1992 	file->fs->send_request(__rw_from_file, args);
1993 	return 0;
1994 }
1995 
1996 int
1997 spdk_file_write(struct spdk_file *file, struct spdk_io_channel *_channel,
1998 		void *payload, uint64_t offset, uint64_t length)
1999 {
2000 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
2001 	struct spdk_fs_cb_args *args;
2002 	uint64_t rem_length, copy, blob_size, cluster_sz;
2003 	uint32_t cache_buffers_filled = 0;
2004 	uint8_t *cur_payload;
2005 	struct cache_buffer *last;
2006 
2007 	BLOBFS_TRACE_RW(file, "offset=%jx length=%jx\n", offset, length);
2008 
2009 	if (length == 0) {
2010 		return 0;
2011 	}
2012 
2013 	if (offset != file->append_pos) {
2014 		BLOBFS_TRACE(file, " error offset=%jx append_pos=%jx\n", offset, file->append_pos);
2015 		return -EINVAL;
2016 	}
2017 
2018 	pthread_spin_lock(&file->lock);
2019 	file->open_for_writing = true;
2020 
2021 	if (file->last == NULL) {
2022 		if (file->append_pos % CACHE_BUFFER_SIZE == 0) {
2023 			cache_append_buffer(file);
2024 		} else {
2025 			int rc;
2026 
2027 			file->append_pos += length;
2028 			pthread_spin_unlock(&file->lock);
2029 			rc = __send_rw_from_file(file, &channel->sem, payload,
2030 						 offset, length, false);
2031 			sem_wait(&channel->sem);
2032 			return rc;
2033 		}
2034 	}
2035 
2036 	blob_size = __file_get_blob_size(file);
2037 
2038 	if ((offset + length) > blob_size) {
2039 		struct spdk_fs_cb_args extend_args = {};
2040 
2041 		cluster_sz = file->fs->bs_opts.cluster_sz;
2042 		extend_args.sem = &channel->sem;
2043 		extend_args.op.resize.num_clusters = __bytes_to_clusters((offset + length), cluster_sz);
2044 		extend_args.file = file;
2045 		BLOBFS_TRACE(file, "start resize to %u clusters\n", extend_args.op.resize.num_clusters);
2046 		pthread_spin_unlock(&file->lock);
2047 		file->fs->send_request(__file_extend_blob, &extend_args);
2048 		sem_wait(&channel->sem);
2049 	}
2050 
2051 	last = file->last;
2052 	rem_length = length;
2053 	cur_payload = payload;
2054 	while (rem_length > 0) {
2055 		copy = last->buf_size - last->bytes_filled;
2056 		if (copy > rem_length) {
2057 			copy = rem_length;
2058 		}
2059 		BLOBFS_TRACE_RW(file, "  fill offset=%jx length=%jx\n", file->append_pos, copy);
2060 		memcpy(&last->buf[last->bytes_filled], cur_payload, copy);
2061 		file->append_pos += copy;
2062 		if (file->length < file->append_pos) {
2063 			file->length = file->append_pos;
2064 		}
2065 		cur_payload += copy;
2066 		last->bytes_filled += copy;
2067 		rem_length -= copy;
2068 		if (last->bytes_filled == last->buf_size) {
2069 			cache_buffers_filled++;
2070 			last = cache_append_buffer(file);
2071 			if (last == NULL) {
2072 				BLOBFS_TRACE(file, "nomem\n");
2073 				pthread_spin_unlock(&file->lock);
2074 				return -ENOMEM;
2075 			}
2076 		}
2077 	}
2078 
2079 	if (cache_buffers_filled == 0) {
2080 		pthread_spin_unlock(&file->lock);
2081 		return 0;
2082 	}
2083 
2084 	args = calloc(1, sizeof(*args));
2085 	if (args == NULL) {
2086 		pthread_spin_unlock(&file->lock);
2087 		return -ENOMEM;
2088 	}
2089 
2090 	args->file = file;
2091 	file->fs->send_request(__file_flush, args);
2092 	pthread_spin_unlock(&file->lock);
2093 	return 0;
2094 }
2095 
2096 static void
2097 __readahead_done(void *arg, int bserrno)
2098 {
2099 	struct spdk_fs_cb_args *args = arg;
2100 	struct cache_buffer *cache_buffer = args->op.readahead.cache_buffer;
2101 	struct spdk_file *file = args->file;
2102 
2103 	BLOBFS_TRACE(file, "offset=%jx\n", cache_buffer->offset);
2104 
2105 	pthread_spin_lock(&file->lock);
2106 	cache_buffer->bytes_filled = args->op.readahead.length;
2107 	cache_buffer->bytes_flushed = args->op.readahead.length;
2108 	cache_buffer->in_progress = false;
2109 	pthread_spin_unlock(&file->lock);
2110 
2111 	__free_args(args);
2112 }
2113 
2114 static void
2115 __readahead(void *_args)
2116 {
2117 	struct spdk_fs_cb_args *args = _args;
2118 	struct spdk_file *file = args->file;
2119 	uint64_t offset, length, start_page, num_pages;
2120 	uint32_t page_size;
2121 
2122 	offset = args->op.readahead.offset;
2123 	length = args->op.readahead.length;
2124 	assert(length > 0);
2125 
2126 	__get_page_parameters(file, offset, length, &start_page, &page_size, &num_pages);
2127 
2128 	BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n",
2129 		     offset, length, start_page, num_pages);
2130 	spdk_bs_io_read_blob(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel,
2131 			     args->op.readahead.cache_buffer->buf,
2132 			     start_page, num_pages,
2133 			     __readahead_done, args);
2134 }
2135 
2136 static uint64_t
2137 __next_cache_buffer_offset(uint64_t offset)
2138 {
2139 	return (offset + CACHE_BUFFER_SIZE) & ~(CACHE_TREE_LEVEL_MASK(0));
2140 }
2141 
2142 static void
2143 check_readahead(struct spdk_file *file, uint64_t offset)
2144 {
2145 	struct spdk_fs_cb_args *args;
2146 
2147 	offset = __next_cache_buffer_offset(offset);
2148 	if (spdk_tree_find_buffer(file->tree, offset) != NULL || file->length <= offset) {
2149 		return;
2150 	}
2151 
2152 	args = calloc(1, sizeof(*args));
2153 	if (args == NULL) {
2154 		return;
2155 	}
2156 
2157 	BLOBFS_TRACE(file, "offset=%jx\n", offset);
2158 
2159 	args->file = file;
2160 	args->op.readahead.offset = offset;
2161 	args->op.readahead.cache_buffer = cache_insert_buffer(file, offset);
2162 	args->op.readahead.cache_buffer->in_progress = true;
2163 	if (file->length < (offset + CACHE_BUFFER_SIZE)) {
2164 		args->op.readahead.length = file->length & (CACHE_BUFFER_SIZE - 1);
2165 	} else {
2166 		args->op.readahead.length = CACHE_BUFFER_SIZE;
2167 	}
2168 	file->fs->send_request(__readahead, args);
2169 }
2170 
2171 static int
2172 __file_read(struct spdk_file *file, void *payload, uint64_t offset, uint64_t length, sem_t *sem)
2173 {
2174 	struct cache_buffer *buf;
2175 	int rc;
2176 
2177 	buf = spdk_tree_find_filled_buffer(file->tree, offset);
2178 	if (buf == NULL) {
2179 		pthread_spin_unlock(&file->lock);
2180 		rc = __send_rw_from_file(file, sem, payload, offset, length, true);
2181 		pthread_spin_lock(&file->lock);
2182 		return rc;
2183 	}
2184 
2185 	if ((offset + length) > (buf->offset + buf->bytes_filled)) {
2186 		length = buf->offset + buf->bytes_filled - offset;
2187 	}
2188 	BLOBFS_TRACE(file, "read %p offset=%ju length=%ju\n", payload, offset, length);
2189 	memcpy(payload, &buf->buf[offset - buf->offset], length);
2190 	if ((offset + length) % CACHE_BUFFER_SIZE == 0) {
2191 		pthread_spin_lock(&g_caches_lock);
2192 		spdk_tree_remove_buffer(file->tree, buf);
2193 		if (file->tree->present_mask == 0) {
2194 			TAILQ_REMOVE(&g_caches, file, cache_tailq);
2195 		}
2196 		pthread_spin_unlock(&g_caches_lock);
2197 	}
2198 
2199 	sem_post(sem);
2200 	return 0;
2201 }
2202 
2203 int64_t
2204 spdk_file_read(struct spdk_file *file, struct spdk_io_channel *_channel,
2205 	       void *payload, uint64_t offset, uint64_t length)
2206 {
2207 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
2208 	uint64_t final_offset, final_length;
2209 	uint32_t sub_reads = 0;
2210 	int rc = 0;
2211 
2212 	pthread_spin_lock(&file->lock);
2213 
2214 	BLOBFS_TRACE_RW(file, "offset=%ju length=%ju\n", offset, length);
2215 
2216 	file->open_for_writing = false;
2217 
2218 	if (length == 0 || offset >= file->append_pos) {
2219 		pthread_spin_unlock(&file->lock);
2220 		return 0;
2221 	}
2222 
2223 	if (offset + length > file->append_pos) {
2224 		length = file->append_pos - offset;
2225 	}
2226 
2227 	if (offset != file->next_seq_offset) {
2228 		file->seq_byte_count = 0;
2229 	}
2230 	file->seq_byte_count += length;
2231 	file->next_seq_offset = offset + length;
2232 	if (file->seq_byte_count >= CACHE_READAHEAD_THRESHOLD) {
2233 		check_readahead(file, offset);
2234 		check_readahead(file, offset + CACHE_BUFFER_SIZE);
2235 	}
2236 
2237 	final_length = 0;
2238 	final_offset = offset + length;
2239 	while (offset < final_offset) {
2240 		length = NEXT_CACHE_BUFFER_OFFSET(offset) - offset;
2241 		if (length > (final_offset - offset)) {
2242 			length = final_offset - offset;
2243 		}
2244 		rc = __file_read(file, payload, offset, length, &channel->sem);
2245 		if (rc == 0) {
2246 			final_length += length;
2247 		} else {
2248 			break;
2249 		}
2250 		payload += length;
2251 		offset += length;
2252 		sub_reads++;
2253 	}
2254 	pthread_spin_unlock(&file->lock);
2255 	while (sub_reads-- > 0) {
2256 		sem_wait(&channel->sem);
2257 	}
2258 	if (rc == 0) {
2259 		return final_length;
2260 	} else {
2261 		return rc;
2262 	}
2263 }
2264 
2265 static void
2266 _file_sync(struct spdk_file *file, struct spdk_fs_channel *channel,
2267 	   spdk_file_op_complete cb_fn, void *cb_arg)
2268 {
2269 	struct spdk_fs_request *sync_req;
2270 	struct spdk_fs_request *flush_req;
2271 	struct spdk_fs_cb_args *sync_args;
2272 	struct spdk_fs_cb_args *flush_args;
2273 
2274 	BLOBFS_TRACE(file, "offset=%jx\n", file->append_pos);
2275 
2276 	pthread_spin_lock(&file->lock);
2277 	if (file->append_pos <= file->length_flushed || file->last == NULL) {
2278 		BLOBFS_TRACE(file, "done - no data to flush\n");
2279 		pthread_spin_unlock(&file->lock);
2280 		cb_fn(cb_arg, 0);
2281 		return;
2282 	}
2283 
2284 	sync_req = alloc_fs_request(channel);
2285 	assert(sync_req != NULL);
2286 	sync_args = &sync_req->args;
2287 
2288 	flush_req = alloc_fs_request(channel);
2289 	assert(flush_req != NULL);
2290 	flush_args = &flush_req->args;
2291 
2292 	sync_args->file = file;
2293 	sync_args->fn.file_op = cb_fn;
2294 	sync_args->arg = cb_arg;
2295 	sync_args->op.sync.offset = file->append_pos;
2296 	sync_args->op.sync.xattr_in_progress = false;
2297 	TAILQ_INSERT_TAIL(&file->sync_requests, sync_req, args.op.sync.tailq);
2298 	pthread_spin_unlock(&file->lock);
2299 
2300 	flush_args->file = file;
2301 	channel->send_request(__file_flush, flush_args);
2302 }
2303 
2304 int
2305 spdk_file_sync(struct spdk_file *file, struct spdk_io_channel *_channel)
2306 {
2307 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
2308 
2309 	_file_sync(file, channel, __sem_post, &channel->sem);
2310 	sem_wait(&channel->sem);
2311 
2312 	return 0;
2313 }
2314 
2315 void
2316 spdk_file_sync_async(struct spdk_file *file, struct spdk_io_channel *_channel,
2317 		     spdk_file_op_complete cb_fn, void *cb_arg)
2318 {
2319 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
2320 
2321 	_file_sync(file, channel, cb_fn, cb_arg);
2322 }
2323 
2324 void
2325 spdk_file_set_priority(struct spdk_file *file, uint32_t priority)
2326 {
2327 	BLOBFS_TRACE(file, "priority=%u\n", priority);
2328 	file->priority = priority;
2329 
2330 }
2331 
2332 /*
2333  * Close routines
2334  */
2335 
2336 static void
2337 __file_close_async_done(void *ctx, int bserrno)
2338 {
2339 	struct spdk_fs_request *req = ctx;
2340 	struct spdk_fs_cb_args *args = &req->args;
2341 	struct spdk_file *file = args->file;
2342 
2343 	if (file->is_deleted) {
2344 		spdk_fs_delete_file_async(file->fs, file->name, blob_delete_cb, ctx);
2345 		return;
2346 	}
2347 	args->fn.file_op(args->arg, bserrno);
2348 	free_fs_request(req);
2349 }
2350 
2351 static void
2352 __file_close_async(struct spdk_file *file, struct spdk_fs_request *req)
2353 {
2354 	pthread_spin_lock(&file->lock);
2355 	if (file->ref_count == 0) {
2356 		pthread_spin_unlock(&file->lock);
2357 		__file_close_async_done(req, -EBADF);
2358 		return;
2359 	}
2360 
2361 	file->ref_count--;
2362 	if (file->ref_count > 0) {
2363 		pthread_spin_unlock(&file->lock);
2364 		__file_close_async_done(req, 0);
2365 		return;
2366 	}
2367 
2368 	pthread_spin_unlock(&file->lock);
2369 
2370 	spdk_bs_md_close_blob(&file->blob, __file_close_async_done, req);
2371 }
2372 
2373 static void
2374 __file_close_async__sync_done(void *arg, int fserrno)
2375 {
2376 	struct spdk_fs_request *req = arg;
2377 	struct spdk_fs_cb_args *args = &req->args;
2378 
2379 	__file_close_async(args->file, req);
2380 }
2381 
2382 void
2383 spdk_file_close_async(struct spdk_file *file, spdk_file_op_complete cb_fn, void *cb_arg)
2384 {
2385 	struct spdk_fs_request *req;
2386 	struct spdk_fs_cb_args *args;
2387 
2388 	req = alloc_fs_request(file->fs->md_target.md_fs_channel);
2389 	if (req == NULL) {
2390 		cb_fn(cb_arg, -ENOMEM);
2391 		return;
2392 	}
2393 
2394 	args = &req->args;
2395 	args->file = file;
2396 	args->fn.file_op = cb_fn;
2397 	args->arg = cb_arg;
2398 
2399 	spdk_file_sync_async(file, file->fs->md_target.md_io_channel, __file_close_async__sync_done, req);
2400 }
2401 
2402 static void
2403 __file_close_done(void *arg, int fserrno)
2404 {
2405 	struct spdk_fs_cb_args *args = arg;
2406 
2407 	args->rc = fserrno;
2408 	sem_post(args->sem);
2409 }
2410 
2411 static void
2412 __file_close(void *arg)
2413 {
2414 	struct spdk_fs_request *req = arg;
2415 	struct spdk_fs_cb_args *args = &req->args;
2416 	struct spdk_file *file = args->file;
2417 
2418 	__file_close_async(file, req);
2419 }
2420 
2421 int
2422 spdk_file_close(struct spdk_file *file, struct spdk_io_channel *_channel)
2423 {
2424 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
2425 	struct spdk_fs_request *req;
2426 	struct spdk_fs_cb_args *args;
2427 
2428 	req = alloc_fs_request(channel);
2429 	assert(req != NULL);
2430 
2431 	args = &req->args;
2432 
2433 	spdk_file_sync(file, _channel);
2434 	BLOBFS_TRACE(file, "name=%s\n", file->name);
2435 	args->file = file;
2436 	args->sem = &channel->sem;
2437 	args->fn.file_op = __file_close_done;
2438 	args->arg = req;
2439 	channel->send_request(__file_close, req);
2440 	sem_wait(&channel->sem);
2441 
2442 	return args->rc;
2443 }
2444 
2445 static void
2446 cache_free_buffers(struct spdk_file *file)
2447 {
2448 	BLOBFS_TRACE(file, "free=%s\n", file->name);
2449 	pthread_spin_lock(&file->lock);
2450 	pthread_spin_lock(&g_caches_lock);
2451 	if (file->tree->present_mask == 0) {
2452 		pthread_spin_unlock(&g_caches_lock);
2453 		pthread_spin_unlock(&file->lock);
2454 		return;
2455 	}
2456 	spdk_tree_free_buffers(file->tree);
2457 
2458 	TAILQ_REMOVE(&g_caches, file, cache_tailq);
2459 	/* If not freed, put it in the end of the queue */
2460 	if (file->tree->present_mask != 0) {
2461 		TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq);
2462 	}
2463 	file->last = NULL;
2464 	pthread_spin_unlock(&g_caches_lock);
2465 	pthread_spin_unlock(&file->lock);
2466 }
2467 
/*
 * Register the "blobfs" and "blobfs_rw" trace flags.
 * NOTE(review): presumably these are the flags that gate the BLOBFS_TRACE and
 * BLOBFS_TRACE_RW output used throughout this file - confirm against the
 * macro definitions.
 */
SPDK_LOG_REGISTER_TRACE_FLAG("blobfs", SPDK_TRACE_BLOBFS);
SPDK_LOG_REGISTER_TRACE_FLAG("blobfs_rw", SPDK_TRACE_BLOBFS_RW);
2470