xref: /spdk/lib/blobfs/blobfs.c (revision 95399c118e682ce9a1411955349e6068ed54a166)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 
36 #include "spdk/blobfs.h"
37 #include "blobfs_internal.h"
38 
39 #include "spdk/queue.h"
40 #include "spdk/io_channel.h"
41 #include "spdk/assert.h"
42 #include "spdk/env.h"
43 #include "spdk/util.h"
44 #include "spdk_internal/log.h"
45 
/* Trace helper: logs under SPDK_TRACE_BLOBFS and prefixes the message with "file=<file->name> ". */
#define BLOBFS_TRACE(file, str, args...) \
	SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s " str, file->name, ##args)

/* Same as BLOBFS_TRACE, but logs under the SPDK_TRACE_BLOBFS_RW flag. */
#define BLOBFS_TRACE_RW(file, str, args...) \
	SPDK_TRACELOG(SPDK_TRACE_BLOBFS_RW, "file=%s " str, file->name, ##args)

/* Default total cache size in bytes (4 GiB). */
#define BLOBFS_CACHE_SIZE (4ULL * 1024 * 1024 * 1024)

/* Total cache size in bytes; divided by CACHE_BUFFER_SIZE to size g_cache_pool. */
static uint64_t g_fs_cache_size = BLOBFS_CACHE_SIZE;
/* Mempool of cache buffers; created by __initialize_cache(), released by __free_cache(). */
static struct spdk_mempool *g_cache_pool;
/* List of files, initialized with the cache.  NOTE(review): presumably linked via spdk_file.cache_tailq - confirm. */
static TAILQ_HEAD(, spdk_file) g_caches;
/* Number of filesystems currently sharing the cache; the cache exists while this is non-zero. */
static int g_fs_count = 0;
/* Serializes updates of g_fs_count and the cache setup/teardown that depends on it. */
static pthread_mutex_t g_cache_init_lock = PTHREAD_MUTEX_INITIALIZER;
/* Initialized together with g_caches.  NOTE(review): presumably guards g_caches - its users are not visible here. */
static pthread_spinlock_t g_caches_lock;
60 
/*
 * Completion callback that wakes a waiter.
 *
 * arg is the sem_t to post; the bserrno status is not examined.
 */
static void
__sem_post(void *arg, int bserrno)
{
	sem_post((sem_t *)arg);
}
68 
69 void
70 spdk_cache_buffer_free(struct cache_buffer *cache_buffer)
71 {
72 	spdk_mempool_put(g_cache_pool, cache_buffer->buf);
73 	free(cache_buffer);
74 }
75 
/* 128 KiB.  NOTE(review): presumably a byte threshold for triggering readahead - its use is not visible in this part of the file. */
#define CACHE_READAHEAD_THRESHOLD	(128 * 1024)
77 
/*
 * In-memory state for one blobfs file.  Instances are created by file_alloc()
 *  and linked into their filesystem's file list.
 */
struct spdk_file {
	/* Owning filesystem; set by file_alloc(). */
	struct spdk_filesystem	*fs;
	/* Blob handle; filled in by the blob-open completion callbacks. */
	struct spdk_blob	*blob;
	/* Heap-allocated (strdup) file name. */
	char			*name;
	/* File length; initialized from the blob's "length" xattr when a filesystem is loaded. */
	uint64_t		length;
	bool			open_for_writing;
	/* Initialized to the same value as length at load time. */
	uint64_t		length_flushed;
	/* Initialized to the same value as length at load time. */
	uint64_t		append_pos;
	/* NOTE(review): seq_byte_count/next_seq_offset look like sequential-access tracking - their use is not visible here. */
	uint64_t		seq_byte_count;
	uint64_t		next_seq_offset;
	/* Starts as SPDK_FILE_PRIORITY_LOW. */
	uint32_t		priority;
	/* Linkage in spdk_filesystem.files. */
	TAILQ_ENTRY(spdk_file)	tailq;
	/* Id of the blob backing this file. */
	spdk_blob_id		blobid;
	/* Incremented for every open; deletion is refused while this is non-zero. */
	uint32_t		ref_count;
	/* Initialized in file_alloc(); what it protects is not visible in this part of the file. */
	pthread_spinlock_t	lock;
	struct cache_buffer	*last;
	/* Allocated in file_alloc() and freed when the file is deleted. */
	struct cache_tree	*tree;
	/* Open requests waiting for the blob open to complete; drained by fs_open_blob_done(). */
	TAILQ_HEAD(open_requests_head, spdk_fs_request) open_requests;
	TAILQ_HEAD(sync_requests_head, spdk_fs_request) sync_requests;
	/* NOTE(review): presumably linkage in the global g_caches list - confirm. */
	TAILQ_ENTRY(spdk_file)	cache_tailq;
};
99 
/*
 * One blobfs instance layered on a blobstore.  Allocated by fs_alloc() and
 *  freed once the blobstore unload completes.
 */
struct spdk_filesystem {
	/* Underlying blobstore; set by common_fs_bs_init(). */
	struct spdk_blob_store	*bs;
	/* All files known to this filesystem, linked through spdk_file.tailq. */
	TAILQ_HEAD(, spdk_file)	files;
	/* Only cluster_sz is filled in by the code visible here (from the blobstore's cluster size). */
	struct spdk_bs_opts	bs_opts;
	/* Device handed to spdk_fs_init()/spdk_fs_load(). */
	struct spdk_bs_dev	*bdev;
	/* Caller-supplied dispatcher the synchronous API wrappers use to hand requests off. */
	fs_send_request_fn	send_request;

	/*
	 * The address of each *_target struct below is registered as an
	 *  io_device; max_ops is the number of requests preallocated for each
	 *  channel created on that device.
	 */
	struct {
		uint32_t		max_ops;
		struct spdk_io_channel	*sync_io_channel;
		/* Channel context of sync_io_channel. */
		struct spdk_fs_channel	*sync_fs_channel;
	} sync_target;

	struct {
		uint32_t		max_ops;
		struct spdk_io_channel	*md_io_channel;
		/* Channel context of md_io_channel; the async metadata operations allocate their requests here. */
		struct spdk_fs_channel	*md_fs_channel;
	} md_target;

	struct {
		uint32_t		max_ops;
	} io_target;
};
123 
/*
 * Per-request state: the completion callback to invoke, its argument, and
 *  operation-specific parameters.  Embedded as the first member of
 *  struct spdk_fs_request.
 */
struct spdk_fs_cb_args {
	/* Completion callback; which member is valid depends on the operation. */
	union {
		spdk_fs_op_with_handle_complete		fs_op_with_handle;
		spdk_fs_op_complete			fs_op;
		spdk_file_op_with_handle_complete	file_op_with_handle;
		spdk_file_op_complete			file_op;
		spdk_file_stat_op_complete		stat_op;
	} fn;
	/* Argument passed back to the completion callback. */
	void *arg;
	/* Semaphore posted by the synchronous wrappers' completion callbacks to wake the waiting caller. */
	sem_t *sem;
	struct spdk_filesystem *fs;
	struct spdk_file *file;
	/* Result code handed back to the synchronous caller. */
	int rc;
	/* Set to true by alloc_fs_request(). */
	bool from_request;
	/* Operation-specific parameters; only the member matching the current operation is meaningful. */
	union {
		struct {
			uint64_t	length;
		} truncate;
		struct {
			struct spdk_io_channel	*channel;
			void		*user_buf;
			void		*pin_buf;
			int		is_read;
			off_t		offset;
			size_t		length;
			uint64_t	start_page;
			uint64_t	num_pages;
			uint32_t	blocklen;
		} rw;
		struct {
			/* Both names are caller-owned pointers; they are not copied into the request. */
			const char	*old_name;
			const char	*new_name;
		} rename;
		struct {
			struct cache_buffer	*cache_buffer;
			uint64_t		length;
		} flush;
		struct {
			struct cache_buffer	*cache_buffer;
			uint64_t		length;
			uint64_t		offset;
		} readahead;
		struct {
			uint64_t			offset;
			TAILQ_ENTRY(spdk_fs_request)	tailq;
			bool				xattr_in_progress;
		} sync;
		struct {
			uint32_t			num_clusters;
		} resize;
		struct {
			/* Caller-owned name pointer. */
			const char	*name;
			uint32_t	flags;
			/* Linkage in spdk_file.open_requests while the blob open is pending. */
			TAILQ_ENTRY(spdk_fs_request)	tailq;
		} open;
		struct {
			const char	*name;
		} create;
		struct {
			const char	*name;
		} delete;
		struct {
			const char	*name;
		} stat;
	} op;
};
190 
/* Forward declaration: defined later in this file, but already needed by the file deletion path. */
static void cache_free_buffers(struct spdk_file *file);
192 
193 static void
194 __initialize_cache(void)
195 {
196 	assert(g_cache_pool == NULL);
197 
198 	g_cache_pool = spdk_mempool_create("spdk_fs_cache",
199 					   g_fs_cache_size / CACHE_BUFFER_SIZE,
200 					   CACHE_BUFFER_SIZE, -1, SPDK_ENV_SOCKET_ID_ANY);
201 	TAILQ_INIT(&g_caches);
202 	pthread_spin_init(&g_caches_lock, 0);
203 }
204 
205 static void
206 __free_cache(void)
207 {
208 	assert(g_cache_pool != NULL);
209 
210 	spdk_mempool_free(g_cache_pool);
211 	g_cache_pool = NULL;
212 }
213 
214 static uint64_t
215 __file_get_blob_size(struct spdk_file *file)
216 {
217 	uint64_t cluster_sz;
218 
219 	cluster_sz = file->fs->bs_opts.cluster_sz;
220 	return cluster_sz * spdk_blob_get_num_clusters(file->blob);
221 }
222 
/*
 * A preallocated request.  Requests live in a channel's req_mem array and
 *  cycle through that channel's free list via alloc_fs_request() and
 *  free_fs_request().
 */
struct spdk_fs_request {
	/* Kept as the first member, so a request and its args share the same address. */
	struct spdk_fs_cb_args		args;
	/* Linkage in the owning channel's free list. */
	TAILQ_ENTRY(spdk_fs_request)	link;
	/* Channel the request was taken from; set by alloc_fs_request(). */
	struct spdk_fs_channel		*channel;
};
228 
/* Per-io_channel context for the filesystem's md, sync and io targets. */
struct spdk_fs_channel {
	/* Backing array of max_ops requests; allocated at channel creation and freed at destruction. */
	struct spdk_fs_request		*req_mem;
	/* Free list of requests. */
	TAILQ_HEAD(, spdk_fs_request)	reqs;
	/* Semaphore the synchronous API wrappers block on until their request completes. */
	sem_t				sem;
	struct spdk_filesystem		*fs;
	/* Blobstore channel; released at channel destruction when non-NULL. */
	struct spdk_io_channel		*bs_channel;
	fs_send_request_fn		send_request;
	/* When true, request alloc/free take 'lock' around free-list manipulation. */
	bool				sync;
	pthread_spinlock_t		lock;
};
239 
240 static struct spdk_fs_request *
241 alloc_fs_request(struct spdk_fs_channel *channel)
242 {
243 	struct spdk_fs_request *req;
244 
245 	if (channel->sync) {
246 		pthread_spin_lock(&channel->lock);
247 	}
248 
249 	req = TAILQ_FIRST(&channel->reqs);
250 	if (req) {
251 		TAILQ_REMOVE(&channel->reqs, req, link);
252 	}
253 
254 	if (channel->sync) {
255 		pthread_spin_unlock(&channel->lock);
256 	}
257 
258 	if (req == NULL) {
259 		return NULL;
260 	}
261 	memset(req, 0, sizeof(*req));
262 	req->channel = channel;
263 	req->args.from_request = true;
264 
265 	return req;
266 }
267 
268 static void
269 free_fs_request(struct spdk_fs_request *req)
270 {
271 	struct spdk_fs_channel *channel = req->channel;
272 
273 	if (channel->sync) {
274 		pthread_spin_lock(&channel->lock);
275 	}
276 
277 	TAILQ_INSERT_HEAD(&req->channel->reqs, req, link);
278 
279 	if (channel->sync) {
280 		pthread_spin_unlock(&channel->lock);
281 	}
282 }
283 
284 static int
285 _spdk_fs_channel_create(struct spdk_filesystem *fs, struct spdk_fs_channel *channel,
286 			uint32_t max_ops)
287 {
288 	uint32_t i;
289 
290 	channel->req_mem = calloc(max_ops, sizeof(struct spdk_fs_request));
291 	if (!channel->req_mem) {
292 		return -1;
293 	}
294 
295 	TAILQ_INIT(&channel->reqs);
296 	sem_init(&channel->sem, 0, 0);
297 
298 	for (i = 0; i < max_ops; i++) {
299 		TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link);
300 	}
301 
302 	channel->fs = fs;
303 
304 	return 0;
305 }
306 
307 static int
308 _spdk_fs_md_channel_create(void *io_device, void *ctx_buf)
309 {
310 	struct spdk_filesystem		*fs;
311 	struct spdk_fs_channel		*channel = ctx_buf;
312 
313 	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, md_target);
314 
315 	return _spdk_fs_channel_create(fs, channel, fs->md_target.max_ops);
316 }
317 
318 static int
319 _spdk_fs_sync_channel_create(void *io_device, void *ctx_buf)
320 {
321 	struct spdk_filesystem		*fs;
322 	struct spdk_fs_channel		*channel = ctx_buf;
323 
324 	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, sync_target);
325 
326 	return _spdk_fs_channel_create(fs, channel, fs->sync_target.max_ops);
327 }
328 
329 static int
330 _spdk_fs_io_channel_create(void *io_device, void *ctx_buf)
331 {
332 	struct spdk_filesystem		*fs;
333 	struct spdk_fs_channel		*channel = ctx_buf;
334 
335 	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, io_target);
336 
337 	return _spdk_fs_channel_create(fs, channel, fs->io_target.max_ops);
338 }
339 
340 static void
341 _spdk_fs_channel_destroy(void *io_device, void *ctx_buf)
342 {
343 	struct spdk_fs_channel *channel = ctx_buf;
344 
345 	free(channel->req_mem);
346 	if (channel->bs_channel != NULL) {
347 		spdk_bs_free_io_channel(channel->bs_channel);
348 	}
349 }
350 
351 static void
352 __send_request_direct(fs_request_fn fn, void *arg)
353 {
354 	fn(arg);
355 }
356 
357 static void
358 common_fs_bs_init(struct spdk_filesystem *fs, struct spdk_blob_store *bs)
359 {
360 	fs->bs = bs;
361 	fs->bs_opts.cluster_sz = spdk_bs_get_cluster_size(bs);
362 	fs->md_target.md_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
363 	fs->md_target.md_fs_channel->send_request = __send_request_direct;
364 	fs->sync_target.sync_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
365 	fs->sync_target.sync_fs_channel->send_request = __send_request_direct;
366 
367 	pthread_mutex_lock(&g_cache_init_lock);
368 	if (g_fs_count == 0) {
369 		__initialize_cache();
370 	}
371 	g_fs_count++;
372 	pthread_mutex_unlock(&g_cache_init_lock);
373 }
374 
375 static void
376 init_cb(void *ctx, struct spdk_blob_store *bs, int bserrno)
377 {
378 	struct spdk_fs_request *req = ctx;
379 	struct spdk_fs_cb_args *args = &req->args;
380 	struct spdk_filesystem *fs = args->fs;
381 
382 	if (bserrno == 0) {
383 		common_fs_bs_init(fs, bs);
384 	} else {
385 		free(fs);
386 		fs = NULL;
387 	}
388 
389 	args->fn.fs_op_with_handle(args->arg, fs, bserrno);
390 	free_fs_request(req);
391 }
392 
393 static struct spdk_filesystem *
394 fs_alloc(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn)
395 {
396 	struct spdk_filesystem *fs;
397 
398 	fs = calloc(1, sizeof(*fs));
399 	if (fs == NULL) {
400 		return NULL;
401 	}
402 
403 	fs->bdev = dev;
404 	fs->send_request = send_request_fn;
405 	TAILQ_INIT(&fs->files);
406 
407 	fs->md_target.max_ops = 512;
408 	spdk_io_device_register(&fs->md_target, _spdk_fs_md_channel_create, _spdk_fs_channel_destroy,
409 				sizeof(struct spdk_fs_channel));
410 	fs->md_target.md_io_channel = spdk_get_io_channel(&fs->md_target);
411 	fs->md_target.md_fs_channel = spdk_io_channel_get_ctx(fs->md_target.md_io_channel);
412 
413 	fs->sync_target.max_ops = 512;
414 	spdk_io_device_register(&fs->sync_target, _spdk_fs_sync_channel_create, _spdk_fs_channel_destroy,
415 				sizeof(struct spdk_fs_channel));
416 	fs->sync_target.sync_io_channel = spdk_get_io_channel(&fs->sync_target);
417 	fs->sync_target.sync_fs_channel = spdk_io_channel_get_ctx(fs->sync_target.sync_io_channel);
418 
419 	fs->io_target.max_ops = 512;
420 	spdk_io_device_register(&fs->io_target, _spdk_fs_io_channel_create, _spdk_fs_channel_destroy,
421 				sizeof(struct spdk_fs_channel));
422 
423 	return fs;
424 }
425 
426 void
427 spdk_fs_init(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn,
428 	     spdk_fs_op_with_handle_complete cb_fn, void *cb_arg)
429 {
430 	struct spdk_filesystem *fs;
431 	struct spdk_fs_request *req;
432 	struct spdk_fs_cb_args *args;
433 
434 	fs = fs_alloc(dev, send_request_fn);
435 	if (fs == NULL) {
436 		cb_fn(cb_arg, NULL, -ENOMEM);
437 		return;
438 	}
439 
440 	req = alloc_fs_request(fs->md_target.md_fs_channel);
441 	if (req == NULL) {
442 		spdk_put_io_channel(fs->md_target.md_io_channel);
443 		spdk_io_device_unregister(&fs->md_target, NULL);
444 		spdk_put_io_channel(fs->sync_target.sync_io_channel);
445 		spdk_io_device_unregister(&fs->sync_target, NULL);
446 		spdk_io_device_unregister(&fs->io_target, NULL);
447 		free(fs);
448 		cb_fn(cb_arg, NULL, -ENOMEM);
449 		return;
450 	}
451 
452 	args = &req->args;
453 	args->fn.fs_op_with_handle = cb_fn;
454 	args->arg = cb_arg;
455 	args->fs = fs;
456 
457 	spdk_bs_init(dev, NULL, init_cb, req);
458 }
459 
460 static struct spdk_file *
461 file_alloc(struct spdk_filesystem *fs)
462 {
463 	struct spdk_file *file;
464 
465 	file = calloc(1, sizeof(*file));
466 	if (file == NULL) {
467 		return NULL;
468 	}
469 
470 	file->tree = calloc(1, sizeof(*file->tree));
471 	if (file->tree == NULL) {
472 		free(file);
473 		return NULL;
474 	}
475 
476 	file->fs = fs;
477 	TAILQ_INIT(&file->open_requests);
478 	TAILQ_INIT(&file->sync_requests);
479 	pthread_spin_init(&file->lock, 0);
480 	TAILQ_INSERT_TAIL(&fs->files, file, tailq);
481 	file->priority = SPDK_FILE_PRIORITY_LOW;
482 	return file;
483 }
484 
485 static void
486 iter_cb(void *ctx, struct spdk_blob *blob, int rc)
487 {
488 	struct spdk_fs_request *req = ctx;
489 	struct spdk_fs_cb_args *args = &req->args;
490 	struct spdk_filesystem *fs = args->fs;
491 	struct spdk_file *f;
492 	uint64_t *length;
493 	const char *name;
494 	size_t value_len;
495 
496 	if (rc == -ENOENT) {
497 		/* Finished iterating */
498 		args->fn.fs_op_with_handle(args->arg, fs, 0);
499 		free_fs_request(req);
500 		return;
501 	} else if (rc < 0) {
502 		args->fn.fs_op_with_handle(args->arg, fs, rc);
503 		free_fs_request(req);
504 		return;
505 	}
506 
507 	rc = spdk_bs_md_get_xattr_value(blob, "name", (const void **)&name, &value_len);
508 	if (rc < 0) {
509 		args->fn.fs_op_with_handle(args->arg, fs, rc);
510 		free_fs_request(req);
511 		return;
512 	}
513 
514 	rc = spdk_bs_md_get_xattr_value(blob, "length", (const void **)&length, &value_len);
515 	if (rc < 0) {
516 		args->fn.fs_op_with_handle(args->arg, fs, rc);
517 		free_fs_request(req);
518 		return;
519 	}
520 	assert(value_len == 8);
521 
522 	f = file_alloc(fs);
523 	if (f == NULL) {
524 		args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM);
525 		free_fs_request(req);
526 		return;
527 	}
528 
529 	f->name = strdup(name);
530 	f->blobid = spdk_blob_get_id(blob);
531 	f->length = *length;
532 	f->length_flushed = *length;
533 	f->append_pos = *length;
534 	SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "added file %s length=%ju\n", f->name, f->length);
535 
536 	spdk_bs_md_iter_next(fs->bs, &blob, iter_cb, req);
537 }
538 
539 static void
540 load_cb(void *ctx, struct spdk_blob_store *bs, int bserrno)
541 {
542 	struct spdk_fs_request *req = ctx;
543 	struct spdk_fs_cb_args *args = &req->args;
544 	struct spdk_filesystem *fs = args->fs;
545 
546 	if (bserrno != 0) {
547 		args->fn.fs_op_with_handle(args->arg, NULL, bserrno);
548 		free_fs_request(req);
549 		free(fs);
550 		return;
551 	}
552 
553 	common_fs_bs_init(fs, bs);
554 	spdk_bs_md_iter_first(fs->bs, iter_cb, req);
555 }
556 
557 void
558 spdk_fs_load(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn,
559 	     spdk_fs_op_with_handle_complete cb_fn, void *cb_arg)
560 {
561 	struct spdk_filesystem *fs;
562 	struct spdk_fs_cb_args *args;
563 	struct spdk_fs_request *req;
564 
565 	fs = fs_alloc(dev, send_request_fn);
566 	if (fs == NULL) {
567 		cb_fn(cb_arg, NULL, -ENOMEM);
568 		return;
569 	}
570 
571 	req = alloc_fs_request(fs->md_target.md_fs_channel);
572 	if (req == NULL) {
573 		spdk_put_io_channel(fs->md_target.md_io_channel);
574 		spdk_io_device_unregister(&fs->md_target, NULL);
575 		spdk_put_io_channel(fs->sync_target.sync_io_channel);
576 		spdk_io_device_unregister(&fs->sync_target, NULL);
577 		spdk_io_device_unregister(&fs->io_target, NULL);
578 		free(fs);
579 		cb_fn(cb_arg, NULL, -ENOMEM);
580 		return;
581 	}
582 
583 	args = &req->args;
584 	args->fn.fs_op_with_handle = cb_fn;
585 	args->arg = cb_arg;
586 	args->fs = fs;
587 
588 	spdk_bs_load(dev, load_cb, req);
589 }
590 
591 static void
592 unload_cb(void *ctx, int bserrno)
593 {
594 	struct spdk_fs_request *req = ctx;
595 	struct spdk_fs_cb_args *args = &req->args;
596 	struct spdk_filesystem *fs = args->fs;
597 
598 	pthread_mutex_lock(&g_cache_init_lock);
599 	g_fs_count--;
600 	if (g_fs_count == 0) {
601 		__free_cache();
602 	}
603 	pthread_mutex_unlock(&g_cache_init_lock);
604 
605 	args->fn.fs_op(args->arg, bserrno);
606 	free(req);
607 
608 	spdk_io_device_unregister(&fs->io_target, NULL);
609 	spdk_io_device_unregister(&fs->sync_target, NULL);
610 	spdk_io_device_unregister(&fs->md_target, NULL);
611 
612 	free(fs);
613 }
614 
615 void
616 spdk_fs_unload(struct spdk_filesystem *fs, spdk_fs_op_complete cb_fn, void *cb_arg)
617 {
618 	struct spdk_fs_request *req;
619 	struct spdk_fs_cb_args *args;
620 
621 	/*
622 	 * We must free the md_channel before unloading the blobstore, so just
623 	 *  allocate this request from the general heap.
624 	 */
625 	req = calloc(1, sizeof(*req));
626 	if (req == NULL) {
627 		cb_fn(cb_arg, -ENOMEM);
628 		return;
629 	}
630 
631 	args = &req->args;
632 	args->fn.fs_op = cb_fn;
633 	args->arg = cb_arg;
634 	args->fs = fs;
635 
636 	spdk_fs_free_io_channel(fs->md_target.md_io_channel);
637 	spdk_fs_free_io_channel(fs->sync_target.sync_io_channel);
638 	spdk_bs_unload(fs->bs, unload_cb, req);
639 }
640 
641 static struct spdk_file *
642 fs_find_file(struct spdk_filesystem *fs, const char *name)
643 {
644 	struct spdk_file *file;
645 
646 	TAILQ_FOREACH(file, &fs->files, tailq) {
647 		if (!strncmp(name, file->name, SPDK_FILE_NAME_MAX)) {
648 			return file;
649 		}
650 	}
651 
652 	return NULL;
653 }
654 
655 void
656 spdk_fs_file_stat_async(struct spdk_filesystem *fs, const char *name,
657 			spdk_file_stat_op_complete cb_fn, void *cb_arg)
658 {
659 	struct spdk_file_stat stat;
660 	struct spdk_file *f = NULL;
661 
662 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
663 		cb_fn(cb_arg, NULL, -ENAMETOOLONG);
664 		return;
665 	}
666 
667 	f = fs_find_file(fs, name);
668 	if (f != NULL) {
669 		stat.blobid = f->blobid;
670 		stat.size = f->length;
671 		cb_fn(cb_arg, &stat, 0);
672 		return;
673 	}
674 
675 	cb_fn(cb_arg, NULL, -ENOENT);
676 }
677 
678 static void
679 __copy_stat(void *arg, struct spdk_file_stat *stat, int fserrno)
680 {
681 	struct spdk_fs_request *req = arg;
682 	struct spdk_fs_cb_args *args = &req->args;
683 
684 	args->rc = fserrno;
685 	if (fserrno == 0) {
686 		memcpy(args->arg, stat, sizeof(*stat));
687 	}
688 	sem_post(args->sem);
689 }
690 
691 static void
692 __file_stat(void *arg)
693 {
694 	struct spdk_fs_request *req = arg;
695 	struct spdk_fs_cb_args *args = &req->args;
696 
697 	spdk_fs_file_stat_async(args->fs, args->op.stat.name,
698 				args->fn.stat_op, req);
699 }
700 
701 int
702 spdk_fs_file_stat(struct spdk_filesystem *fs, struct spdk_io_channel *_channel,
703 		  const char *name, struct spdk_file_stat *stat)
704 {
705 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
706 	struct spdk_fs_request *req;
707 	int rc;
708 
709 	req = alloc_fs_request(channel);
710 	assert(req != NULL);
711 
712 	req->args.fs = fs;
713 	req->args.op.stat.name = name;
714 	req->args.fn.stat_op = __copy_stat;
715 	req->args.arg = stat;
716 	req->args.sem = &channel->sem;
717 	channel->send_request(__file_stat, req);
718 	sem_wait(&channel->sem);
719 
720 	rc = req->args.rc;
721 	free_fs_request(req);
722 
723 	return rc;
724 }
725 
726 static void
727 fs_create_blob_close_cb(void *ctx, int bserrno)
728 {
729 	struct spdk_fs_request *req = ctx;
730 	struct spdk_fs_cb_args *args = &req->args;
731 
732 	args->fn.file_op(args->arg, bserrno);
733 	free_fs_request(req);
734 }
735 
736 static void
737 fs_create_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno)
738 {
739 	struct spdk_fs_request *req = ctx;
740 	struct spdk_fs_cb_args *args = &req->args;
741 	struct spdk_file *f = args->file;
742 	uint64_t length = 0;
743 
744 	f->blob = blob;
745 	spdk_bs_md_resize_blob(blob, 1);
746 	spdk_blob_md_set_xattr(blob, "name", f->name, strlen(f->name) + 1);
747 	spdk_blob_md_set_xattr(blob, "length", &length, sizeof(length));
748 
749 	spdk_bs_md_close_blob(&f->blob, fs_create_blob_close_cb, args);
750 }
751 
752 static void
753 fs_create_blob_create_cb(void *ctx, spdk_blob_id blobid, int bserrno)
754 {
755 	struct spdk_fs_request *req = ctx;
756 	struct spdk_fs_cb_args *args = &req->args;
757 	struct spdk_file *f = args->file;
758 
759 	f->blobid = blobid;
760 	spdk_bs_md_open_blob(f->fs->bs, blobid, fs_create_blob_open_cb, req);
761 }
762 
763 void
764 spdk_fs_create_file_async(struct spdk_filesystem *fs, const char *name,
765 			  spdk_file_op_complete cb_fn, void *cb_arg)
766 {
767 	struct spdk_file *file;
768 	struct spdk_fs_request *req;
769 	struct spdk_fs_cb_args *args;
770 
771 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
772 		cb_fn(cb_arg, -ENAMETOOLONG);
773 		return;
774 	}
775 
776 	file = fs_find_file(fs, name);
777 	if (file != NULL) {
778 		cb_fn(cb_arg, -EEXIST);
779 		return;
780 	}
781 
782 	file = file_alloc(fs);
783 	if (file == NULL) {
784 		cb_fn(cb_arg, -ENOMEM);
785 		return;
786 	}
787 
788 	req = alloc_fs_request(fs->md_target.md_fs_channel);
789 	if (req == NULL) {
790 		cb_fn(cb_arg, -ENOMEM);
791 		return;
792 	}
793 
794 	args = &req->args;
795 	args->file = file;
796 	args->fn.file_op = cb_fn;
797 	args->arg = cb_arg;
798 
799 	file->name = strdup(name);
800 	spdk_bs_md_create_blob(fs->bs, fs_create_blob_create_cb, args);
801 }
802 
803 static void
804 __fs_create_file_done(void *arg, int fserrno)
805 {
806 	struct spdk_fs_request *req = arg;
807 	struct spdk_fs_cb_args *args = &req->args;
808 
809 	args->rc = fserrno;
810 	sem_post(args->sem);
811 	SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s\n", args->op.create.name);
812 }
813 
814 static void
815 __fs_create_file(void *arg)
816 {
817 	struct spdk_fs_request *req = arg;
818 	struct spdk_fs_cb_args *args = &req->args;
819 
820 	SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s\n", args->op.create.name);
821 	spdk_fs_create_file_async(args->fs, args->op.create.name, __fs_create_file_done, req);
822 }
823 
824 int
825 spdk_fs_create_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, const char *name)
826 {
827 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
828 	struct spdk_fs_request *req;
829 	struct spdk_fs_cb_args *args;
830 	int rc;
831 
832 	SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s\n", name);
833 
834 	req = alloc_fs_request(channel);
835 	assert(req != NULL);
836 
837 	args = &req->args;
838 	args->fs = fs;
839 	args->op.create.name = name;
840 	args->sem = &channel->sem;
841 	fs->send_request(__fs_create_file, req);
842 	sem_wait(&channel->sem);
843 	rc = args->rc;
844 	free_fs_request(req);
845 
846 	return rc;
847 }
848 
849 static void
850 fs_open_blob_done(void *ctx, struct spdk_blob *blob, int bserrno)
851 {
852 	struct spdk_fs_request *req = ctx;
853 	struct spdk_fs_cb_args *args = &req->args;
854 	struct spdk_file *f = args->file;
855 
856 	f->blob = blob;
857 	while (!TAILQ_EMPTY(&f->open_requests)) {
858 		req = TAILQ_FIRST(&f->open_requests);
859 		args = &req->args;
860 		TAILQ_REMOVE(&f->open_requests, req, args.op.open.tailq);
861 		args->fn.file_op_with_handle(args->arg, f, bserrno);
862 		free_fs_request(req);
863 	}
864 }
865 
866 static void
867 fs_open_blob_create_cb(void *ctx, int bserrno)
868 {
869 	struct spdk_fs_request *req = ctx;
870 	struct spdk_fs_cb_args *args = &req->args;
871 	struct spdk_file *file = args->file;
872 	struct spdk_filesystem *fs = args->fs;
873 
874 	if (file == NULL) {
875 		/*
876 		 * This is from an open with CREATE flag - the file
877 		 *  is now created so look it up in the file list for this
878 		 *  filesystem.
879 		 */
880 		file = fs_find_file(fs, args->op.open.name);
881 		assert(file != NULL);
882 		args->file = file;
883 	}
884 
885 	file->ref_count++;
886 	TAILQ_INSERT_TAIL(&file->open_requests, req, args.op.open.tailq);
887 	if (file->ref_count == 1) {
888 		assert(file->blob == NULL);
889 		spdk_bs_md_open_blob(fs->bs, file->blobid, fs_open_blob_done, req);
890 	} else if (file->blob != NULL) {
891 		fs_open_blob_done(req, file->blob, 0);
892 	} else {
893 		/*
894 		 * The blob open for this file is in progress due to a previous
895 		 *  open request.  When that open completes, it will invoke the
896 		 *  open callback for this request.
897 		 */
898 	}
899 }
900 
901 void
902 spdk_fs_open_file_async(struct spdk_filesystem *fs, const char *name, uint32_t flags,
903 			spdk_file_op_with_handle_complete cb_fn, void *cb_arg)
904 {
905 	struct spdk_file *f = NULL;
906 	struct spdk_fs_request *req;
907 	struct spdk_fs_cb_args *args;
908 
909 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
910 		cb_fn(cb_arg, NULL, -ENAMETOOLONG);
911 		return;
912 	}
913 
914 	f = fs_find_file(fs, name);
915 	if (f == NULL && !(flags & SPDK_BLOBFS_OPEN_CREATE)) {
916 		cb_fn(cb_arg, NULL, -ENOENT);
917 		return;
918 	}
919 
920 	req = alloc_fs_request(fs->md_target.md_fs_channel);
921 	if (req == NULL) {
922 		cb_fn(cb_arg, NULL, -ENOMEM);
923 		return;
924 	}
925 
926 	args = &req->args;
927 	args->fn.file_op_with_handle = cb_fn;
928 	args->arg = cb_arg;
929 	args->file = f;
930 	args->fs = fs;
931 	args->op.open.name = name;
932 
933 	if (f == NULL) {
934 		spdk_fs_create_file_async(fs, name, fs_open_blob_create_cb, req);
935 	} else {
936 		fs_open_blob_create_cb(req, 0);
937 	}
938 }
939 
940 static void
941 __fs_open_file_done(void *arg, struct spdk_file *file, int bserrno)
942 {
943 	struct spdk_fs_request *req = arg;
944 	struct spdk_fs_cb_args *args = &req->args;
945 
946 	args->file = file;
947 	args->rc = bserrno;
948 	sem_post(args->sem);
949 	SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s\n", args->op.open.name);
950 }
951 
952 static void
953 __fs_open_file(void *arg)
954 {
955 	struct spdk_fs_request *req = arg;
956 	struct spdk_fs_cb_args *args = &req->args;
957 
958 	SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s\n", args->op.open.name);
959 	spdk_fs_open_file_async(args->fs, args->op.open.name, args->op.open.flags,
960 				__fs_open_file_done, req);
961 }
962 
963 int
964 spdk_fs_open_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel,
965 		  const char *name, uint32_t flags, struct spdk_file **file)
966 {
967 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
968 	struct spdk_fs_request *req;
969 	struct spdk_fs_cb_args *args;
970 	int rc;
971 
972 	SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s\n", name);
973 
974 	req = alloc_fs_request(channel);
975 	assert(req != NULL);
976 
977 	args = &req->args;
978 	args->fs = fs;
979 	args->op.open.name = name;
980 	args->op.open.flags = flags;
981 	args->sem = &channel->sem;
982 	fs->send_request(__fs_open_file, req);
983 	sem_wait(&channel->sem);
984 	rc = args->rc;
985 	if (rc == 0) {
986 		*file = args->file;
987 	} else {
988 		*file = NULL;
989 	}
990 	free_fs_request(req);
991 
992 	return rc;
993 }
994 
995 static void
996 fs_rename_blob_close_cb(void *ctx, int bserrno)
997 {
998 	struct spdk_fs_request *req = ctx;
999 	struct spdk_fs_cb_args *args = &req->args;
1000 
1001 	args->fn.fs_op(args->arg, bserrno);
1002 	free_fs_request(req);
1003 }
1004 
1005 static void
1006 fs_rename_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno)
1007 {
1008 	struct spdk_fs_request *req = ctx;
1009 	struct spdk_fs_cb_args *args = &req->args;
1010 	struct spdk_file *f = args->file;
1011 	const char *new_name = args->op.rename.new_name;
1012 
1013 	f->blob = blob;
1014 	spdk_blob_md_set_xattr(blob, "name", new_name, strlen(new_name) + 1);
1015 	spdk_bs_md_close_blob(&f->blob, fs_rename_blob_close_cb, req);
1016 }
1017 
1018 static void
1019 __spdk_fs_md_rename_file(struct spdk_fs_request *req)
1020 {
1021 	struct spdk_fs_cb_args *args = &req->args;
1022 	struct spdk_file *f;
1023 
1024 	f = fs_find_file(args->fs, args->op.rename.old_name);
1025 	if (f == NULL) {
1026 		args->fn.fs_op(args->arg, -ENOENT);
1027 		free_fs_request(req);
1028 		return;
1029 	}
1030 
1031 	free(f->name);
1032 	f->name = strdup(args->op.rename.new_name);
1033 	args->file = f;
1034 	spdk_bs_md_open_blob(args->fs->bs, f->blobid, fs_rename_blob_open_cb, req);
1035 }
1036 
1037 static void
1038 fs_rename_delete_done(void *arg, int fserrno)
1039 {
1040 	__spdk_fs_md_rename_file(arg);
1041 }
1042 
1043 void
1044 spdk_fs_rename_file_async(struct spdk_filesystem *fs,
1045 			  const char *old_name, const char *new_name,
1046 			  spdk_file_op_complete cb_fn, void *cb_arg)
1047 {
1048 	struct spdk_file *f;
1049 	struct spdk_fs_request *req;
1050 	struct spdk_fs_cb_args *args;
1051 
1052 	SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "old=%s new=%s\n", old_name, new_name);
1053 	if (strnlen(new_name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
1054 		cb_fn(cb_arg, -ENAMETOOLONG);
1055 		return;
1056 	}
1057 
1058 	req = alloc_fs_request(fs->md_target.md_fs_channel);
1059 	if (req == NULL) {
1060 		cb_fn(cb_arg, -ENOMEM);
1061 		return;
1062 	}
1063 
1064 	args = &req->args;
1065 	args->fn.fs_op = cb_fn;
1066 	args->fs = fs;
1067 	args->arg = cb_arg;
1068 	args->op.rename.old_name = old_name;
1069 	args->op.rename.new_name = new_name;
1070 
1071 	f = fs_find_file(fs, new_name);
1072 	if (f == NULL) {
1073 		__spdk_fs_md_rename_file(req);
1074 		return;
1075 	}
1076 
1077 	/*
1078 	 * The rename overwrites an existing file.  So delete the existing file, then
1079 	 *  do the actual rename.
1080 	 */
1081 	spdk_fs_delete_file_async(fs, new_name, fs_rename_delete_done, req);
1082 }
1083 
1084 static void
1085 __fs_rename_file_done(void *arg, int fserrno)
1086 {
1087 	struct spdk_fs_request *req = arg;
1088 	struct spdk_fs_cb_args *args = &req->args;
1089 
1090 	args->rc = fserrno;
1091 	sem_post(args->sem);
1092 }
1093 
1094 static void
1095 __fs_rename_file(void *arg)
1096 {
1097 	struct spdk_fs_request *req = arg;
1098 	struct spdk_fs_cb_args *args = &req->args;
1099 
1100 	spdk_fs_rename_file_async(args->fs, args->op.rename.old_name, args->op.rename.new_name,
1101 				  __fs_rename_file_done, req);
1102 }
1103 
1104 int
1105 spdk_fs_rename_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel,
1106 		    const char *old_name, const char *new_name)
1107 {
1108 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
1109 	struct spdk_fs_request *req;
1110 	struct spdk_fs_cb_args *args;
1111 	int rc;
1112 
1113 	req = alloc_fs_request(channel);
1114 	assert(req != NULL);
1115 
1116 	args = &req->args;
1117 
1118 	args->fs = fs;
1119 	args->op.rename.old_name = old_name;
1120 	args->op.rename.new_name = new_name;
1121 	args->sem = &channel->sem;
1122 	fs->send_request(__fs_rename_file, req);
1123 	sem_wait(&channel->sem);
1124 	rc = args->rc;
1125 	free_fs_request(req);
1126 	return rc;
1127 }
1128 
1129 static void
1130 blob_delete_cb(void *ctx, int bserrno)
1131 {
1132 	struct spdk_fs_request *req = ctx;
1133 	struct spdk_fs_cb_args *args = &req->args;
1134 
1135 	args->fn.file_op(args->arg, bserrno);
1136 	free_fs_request(req);
1137 }
1138 
1139 void
1140 spdk_fs_delete_file_async(struct spdk_filesystem *fs, const char *name,
1141 			  spdk_file_op_complete cb_fn, void *cb_arg)
1142 {
1143 	struct spdk_file *f;
1144 	spdk_blob_id blobid;
1145 	struct spdk_fs_request *req;
1146 	struct spdk_fs_cb_args *args;
1147 
1148 	SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s\n", name);
1149 
1150 	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
1151 		cb_fn(cb_arg, -ENAMETOOLONG);
1152 		return;
1153 	}
1154 
1155 	f = fs_find_file(fs, name);
1156 	if (f == NULL) {
1157 		cb_fn(cb_arg, -ENOENT);
1158 		return;
1159 	}
1160 
1161 	if (f->ref_count > 0) {
1162 		/* For now, do not allow deleting files with open references. */
1163 		cb_fn(cb_arg, -EBUSY);
1164 		return;
1165 	}
1166 
1167 	req = alloc_fs_request(fs->md_target.md_fs_channel);
1168 	if (req == NULL) {
1169 		cb_fn(cb_arg, -ENOMEM);
1170 		return;
1171 	}
1172 
1173 	TAILQ_REMOVE(&fs->files, f, tailq);
1174 
1175 	cache_free_buffers(f);
1176 
1177 	blobid = f->blobid;
1178 
1179 	free(f->name);
1180 	free(f->tree);
1181 	free(f);
1182 
1183 	args = &req->args;
1184 	args->fn.file_op = cb_fn;
1185 	args->arg = cb_arg;
1186 	spdk_bs_md_delete_blob(fs->bs, blobid, blob_delete_cb, req);
1187 }
1188 
1189 static void
1190 __fs_delete_file_done(void *arg, int fserrno)
1191 {
1192 	struct spdk_fs_request *req = arg;
1193 	struct spdk_fs_cb_args *args = &req->args;
1194 
1195 	args->rc = fserrno;
1196 	sem_post(args->sem);
1197 }
1198 
1199 static void
1200 __fs_delete_file(void *arg)
1201 {
1202 	struct spdk_fs_request *req = arg;
1203 	struct spdk_fs_cb_args *args = &req->args;
1204 
1205 	spdk_fs_delete_file_async(args->fs, args->op.delete.name, __fs_delete_file_done, req);
1206 }
1207 
1208 int
1209 spdk_fs_delete_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel,
1210 		    const char *name)
1211 {
1212 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
1213 	struct spdk_fs_request *req;
1214 	struct spdk_fs_cb_args *args;
1215 	int rc;
1216 
1217 	req = alloc_fs_request(channel);
1218 	assert(req != NULL);
1219 
1220 	args = &req->args;
1221 	args->fs = fs;
1222 	args->op.delete.name = name;
1223 	args->sem = &channel->sem;
1224 	fs->send_request(__fs_delete_file, req);
1225 	sem_wait(&channel->sem);
1226 	rc = args->rc;
1227 	free_fs_request(req);
1228 
1229 	return rc;
1230 }
1231 
1232 spdk_fs_iter
1233 spdk_fs_iter_first(struct spdk_filesystem *fs)
1234 {
1235 	struct spdk_file *f;
1236 
1237 	f = TAILQ_FIRST(&fs->files);
1238 	return f;
1239 }
1240 
1241 spdk_fs_iter
1242 spdk_fs_iter_next(spdk_fs_iter iter)
1243 {
1244 	struct spdk_file *f = iter;
1245 
1246 	if (f == NULL) {
1247 		return NULL;
1248 	}
1249 
1250 	f = TAILQ_NEXT(f, tailq);
1251 	return f;
1252 }
1253 
1254 const char *
1255 spdk_file_get_name(struct spdk_file *file)
1256 {
1257 	return file->name;
1258 }
1259 
1260 uint64_t
1261 spdk_file_get_length(struct spdk_file *file)
1262 {
1263 	assert(file != NULL);
1264 	SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s length=0x%jx\n", file->name, file->length);
1265 	return file->length;
1266 }
1267 
1268 static void
1269 fs_truncate_complete_cb(void *ctx, int bserrno)
1270 {
1271 	struct spdk_fs_request *req = ctx;
1272 	struct spdk_fs_cb_args *args = &req->args;
1273 
1274 	args->fn.file_op(args->arg, bserrno);
1275 	free_fs_request(req);
1276 }
1277 
/*
 * Number of clusters of size cluster_sz needed to hold `length` bytes,
 *  i.e. length / cluster_sz rounded up.
 *
 * Computed as quotient plus a remainder check rather than
 *  (length + cluster_sz - 1) / cluster_sz, which wraps around for lengths
 *  close to UINT64_MAX.  cluster_sz must be non-zero.
 */
static uint64_t
__bytes_to_clusters(uint64_t length, uint64_t cluster_sz)
{
	uint64_t whole_clusters = length / cluster_sz;

	if (length % cluster_sz != 0) {
		whole_clusters++;
	}

	return whole_clusters;
}
1283 
1284 void
1285 spdk_file_truncate_async(struct spdk_file *file, uint64_t length,
1286 			 spdk_file_op_complete cb_fn, void *cb_arg)
1287 {
1288 	struct spdk_filesystem *fs;
1289 	size_t num_clusters;
1290 	struct spdk_fs_request *req;
1291 	struct spdk_fs_cb_args *args;
1292 
1293 	SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s old=0x%jx new=0x%jx\n", file->name, file->length, length);
1294 	if (length == file->length) {
1295 		cb_fn(cb_arg, 0);
1296 		return;
1297 	}
1298 
1299 	req = alloc_fs_request(file->fs->md_target.md_fs_channel);
1300 	if (req == NULL) {
1301 		cb_fn(cb_arg, -ENOMEM);
1302 		return;
1303 	}
1304 
1305 	args = &req->args;
1306 	args->fn.file_op = cb_fn;
1307 	args->arg = cb_arg;
1308 	args->file = file;
1309 	fs = file->fs;
1310 
1311 	num_clusters = __bytes_to_clusters(length, fs->bs_opts.cluster_sz);
1312 
1313 	spdk_bs_md_resize_blob(file->blob, num_clusters);
1314 	spdk_blob_md_set_xattr(file->blob, "length", &length, sizeof(length));
1315 
1316 	file->length = length;
1317 	if (file->append_pos > file->length) {
1318 		file->append_pos = file->length;
1319 	}
1320 
1321 	spdk_bs_md_sync_blob(file->blob, fs_truncate_complete_cb, args);
1322 }
1323 
1324 static void
1325 __truncate(void *arg)
1326 {
1327 	struct spdk_fs_request *req = arg;
1328 	struct spdk_fs_cb_args *args = &req->args;
1329 
1330 	spdk_file_truncate_async(args->file, args->op.truncate.length,
1331 				 args->fn.file_op, args->arg);
1332 }
1333 
1334 void
1335 spdk_file_truncate(struct spdk_file *file, struct spdk_io_channel *_channel,
1336 		   uint64_t length)
1337 {
1338 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
1339 	struct spdk_fs_request *req;
1340 	struct spdk_fs_cb_args *args;
1341 
1342 	req = alloc_fs_request(channel);
1343 	assert(req != NULL);
1344 
1345 	args = &req->args;
1346 
1347 	args->file = file;
1348 	args->op.truncate.length = length;
1349 	args->fn.file_op = __sem_post;
1350 	args->arg = &channel->sem;
1351 
1352 	channel->send_request(__truncate, req);
1353 	sem_wait(&channel->sem);
1354 	free_fs_request(req);
1355 }
1356 
1357 static void
1358 __rw_done(void *ctx, int bserrno)
1359 {
1360 	struct spdk_fs_request *req = ctx;
1361 	struct spdk_fs_cb_args *args = &req->args;
1362 
1363 	spdk_dma_free(args->op.rw.pin_buf);
1364 	args->fn.file_op(args->arg, bserrno);
1365 	free_fs_request(req);
1366 }
1367 
1368 static void
1369 __read_done(void *ctx, int bserrno)
1370 {
1371 	struct spdk_fs_request *req = ctx;
1372 	struct spdk_fs_cb_args *args = &req->args;
1373 
1374 	if (args->op.rw.is_read) {
1375 		memcpy(args->op.rw.user_buf,
1376 		       args->op.rw.pin_buf + (args->op.rw.offset & 0xFFF),
1377 		       args->op.rw.length);
1378 		__rw_done(req, 0);
1379 	} else {
1380 		memcpy(args->op.rw.pin_buf + (args->op.rw.offset & 0xFFF),
1381 		       args->op.rw.user_buf,
1382 		       args->op.rw.length);
1383 		spdk_bs_io_write_blob(args->file->blob, args->op.rw.channel,
1384 				      args->op.rw.pin_buf,
1385 				      args->op.rw.start_page, args->op.rw.num_pages,
1386 				      __rw_done, req);
1387 	}
1388 }
1389 
1390 static void
1391 __do_blob_read(void *ctx, int fserrno)
1392 {
1393 	struct spdk_fs_request *req = ctx;
1394 	struct spdk_fs_cb_args *args = &req->args;
1395 
1396 	spdk_bs_io_read_blob(args->file->blob, args->op.rw.channel,
1397 			     args->op.rw.pin_buf,
1398 			     args->op.rw.start_page, args->op.rw.num_pages,
1399 			     __read_done, req);
1400 }
1401 
1402 static void
1403 __get_page_parameters(struct spdk_file *file, uint64_t offset, uint64_t length,
1404 		      uint64_t *start_page, uint32_t *page_size, uint64_t *num_pages)
1405 {
1406 	uint64_t end_page;
1407 
1408 	*page_size = spdk_bs_get_page_size(file->fs->bs);
1409 	*start_page = offset / *page_size;
1410 	end_page = (offset + length - 1) / *page_size;
1411 	*num_pages = (end_page - *start_page + 1);
1412 }
1413 
1414 static void
1415 __readwrite(struct spdk_file *file, struct spdk_io_channel *_channel,
1416 	    void *payload, uint64_t offset, uint64_t length,
1417 	    spdk_file_op_complete cb_fn, void *cb_arg, int is_read)
1418 {
1419 	struct spdk_fs_request *req;
1420 	struct spdk_fs_cb_args *args;
1421 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
1422 	uint64_t start_page, num_pages, pin_buf_length;
1423 	uint32_t page_size;
1424 
1425 	if (is_read && offset + length > file->length) {
1426 		cb_fn(cb_arg, -EINVAL);
1427 		return;
1428 	}
1429 
1430 	req = alloc_fs_request(channel);
1431 	if (req == NULL) {
1432 		cb_fn(cb_arg, -ENOMEM);
1433 		return;
1434 	}
1435 
1436 	args = &req->args;
1437 	args->fn.file_op = cb_fn;
1438 	args->arg = cb_arg;
1439 	args->file = file;
1440 	args->op.rw.channel = channel->bs_channel;
1441 	args->op.rw.user_buf = payload;
1442 	args->op.rw.is_read = is_read;
1443 	args->op.rw.offset = offset;
1444 	args->op.rw.length = length;
1445 
1446 	__get_page_parameters(file, offset, length, &start_page, &page_size, &num_pages);
1447 	pin_buf_length = num_pages * page_size;
1448 	args->op.rw.pin_buf = spdk_dma_malloc(pin_buf_length, 4096, NULL);
1449 
1450 	args->op.rw.start_page = start_page;
1451 	args->op.rw.num_pages = num_pages;
1452 
1453 	if (!is_read && file->length < offset + length) {
1454 		spdk_file_truncate_async(file, offset + length, __do_blob_read, req);
1455 	} else {
1456 		__do_blob_read(req, 0);
1457 	}
1458 }
1459 
1460 void
1461 spdk_file_write_async(struct spdk_file *file, struct spdk_io_channel *channel,
1462 		      void *payload, uint64_t offset, uint64_t length,
1463 		      spdk_file_op_complete cb_fn, void *cb_arg)
1464 {
1465 	__readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 0);
1466 }
1467 
1468 void
1469 spdk_file_read_async(struct spdk_file *file, struct spdk_io_channel *channel,
1470 		     void *payload, uint64_t offset, uint64_t length,
1471 		     spdk_file_op_complete cb_fn, void *cb_arg)
1472 {
1473 	SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s offset=%jx length=%jx\n",
1474 		      file->name, offset, length);
1475 	__readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 1);
1476 }
1477 
1478 struct spdk_io_channel *
1479 spdk_fs_alloc_io_channel(struct spdk_filesystem *fs)
1480 {
1481 	struct spdk_io_channel *io_channel;
1482 	struct spdk_fs_channel *fs_channel;
1483 
1484 	io_channel = spdk_get_io_channel(&fs->io_target);
1485 	fs_channel = spdk_io_channel_get_ctx(io_channel);
1486 	fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
1487 	fs_channel->send_request = __send_request_direct;
1488 
1489 	return io_channel;
1490 }
1491 
1492 struct spdk_io_channel *
1493 spdk_fs_alloc_io_channel_sync(struct spdk_filesystem *fs)
1494 {
1495 	struct spdk_io_channel *io_channel;
1496 	struct spdk_fs_channel *fs_channel;
1497 
1498 	io_channel = spdk_get_io_channel(&fs->io_target);
1499 	fs_channel = spdk_io_channel_get_ctx(io_channel);
1500 	fs_channel->send_request = fs->send_request;
1501 	fs_channel->sync = 1;
1502 	pthread_spin_init(&fs_channel->lock, 0);
1503 
1504 	return io_channel;
1505 }
1506 
/* Release a channel obtained from one of the spdk_fs_alloc_io_channel*() calls. */
void
spdk_fs_free_io_channel(struct spdk_io_channel *channel)
{
	struct spdk_io_channel *released = channel;

	spdk_put_io_channel(released);
}
1512 
1513 void
1514 spdk_fs_set_cache_size(uint64_t size_in_mb)
1515 {
1516 	g_fs_cache_size = size_in_mb * 1024 * 1024;
1517 }
1518 
1519 uint64_t
1520 spdk_fs_get_cache_size(void)
1521 {
1522 	return g_fs_cache_size / (1024 * 1024);
1523 }
1524 
1525 static void __file_flush(void *_args);
1526 
1527 static void *
1528 alloc_cache_memory_buffer(struct spdk_file *context)
1529 {
1530 	struct spdk_file *file;
1531 	void *buf;
1532 
1533 	buf = spdk_mempool_get(g_cache_pool);
1534 	if (buf != NULL) {
1535 		return buf;
1536 	}
1537 
1538 	pthread_spin_lock(&g_caches_lock);
1539 	TAILQ_FOREACH(file, &g_caches, cache_tailq) {
1540 		if (!file->open_for_writing &&
1541 		    file->priority == SPDK_FILE_PRIORITY_LOW &&
1542 		    file != context) {
1543 			break;
1544 		}
1545 	}
1546 	pthread_spin_unlock(&g_caches_lock);
1547 	if (file != NULL) {
1548 		cache_free_buffers(file);
1549 		buf = spdk_mempool_get(g_cache_pool);
1550 		if (buf != NULL) {
1551 			return buf;
1552 		}
1553 	}
1554 
1555 	pthread_spin_lock(&g_caches_lock);
1556 	TAILQ_FOREACH(file, &g_caches, cache_tailq) {
1557 		if (!file->open_for_writing && file != context) {
1558 			break;
1559 		}
1560 	}
1561 	pthread_spin_unlock(&g_caches_lock);
1562 	if (file != NULL) {
1563 		cache_free_buffers(file);
1564 		buf = spdk_mempool_get(g_cache_pool);
1565 		if (buf != NULL) {
1566 			return buf;
1567 		}
1568 	}
1569 
1570 	pthread_spin_lock(&g_caches_lock);
1571 	TAILQ_FOREACH(file, &g_caches, cache_tailq) {
1572 		if (file != context) {
1573 			break;
1574 		}
1575 	}
1576 	pthread_spin_unlock(&g_caches_lock);
1577 	if (file != NULL) {
1578 		cache_free_buffers(file);
1579 		buf = spdk_mempool_get(g_cache_pool);
1580 		if (buf != NULL) {
1581 			return buf;
1582 		}
1583 	}
1584 
1585 	return NULL;
1586 }
1587 
1588 static struct cache_buffer *
1589 cache_insert_buffer(struct spdk_file *file, uint64_t offset)
1590 {
1591 	struct cache_buffer *buf;
1592 	int count = 0;
1593 
1594 	buf = calloc(1, sizeof(*buf));
1595 	if (buf == NULL) {
1596 		SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "calloc failed\n");
1597 		return NULL;
1598 	}
1599 
1600 	buf->buf = alloc_cache_memory_buffer(file);
1601 	while (buf->buf == NULL) {
1602 		/*
1603 		 * TODO: alloc_cache_memory_buffer() should eventually free
1604 		 *  some buffers.  Need a more sophisticated check here, instead
1605 		 *  of just bailing if 100 tries does not result in getting a
1606 		 *  free buffer.  This will involve using the sync channel's
1607 		 *  semaphore to block until a buffer becomes available.
1608 		 */
1609 		if (count++ == 100) {
1610 			SPDK_ERRLOG("could not allocate cache buffer\n");
1611 			assert(false);
1612 			free(buf);
1613 			return NULL;
1614 		}
1615 		buf->buf = alloc_cache_memory_buffer(file);
1616 	}
1617 
1618 	buf->buf_size = CACHE_BUFFER_SIZE;
1619 	buf->offset = offset;
1620 
1621 	pthread_spin_lock(&g_caches_lock);
1622 	if (file->tree->present_mask == 0) {
1623 		TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq);
1624 	}
1625 	file->tree = spdk_tree_insert_buffer(file->tree, buf);
1626 	pthread_spin_unlock(&g_caches_lock);
1627 
1628 	return buf;
1629 }
1630 
1631 static struct cache_buffer *
1632 cache_append_buffer(struct spdk_file *file)
1633 {
1634 	struct cache_buffer *last;
1635 
1636 	assert(file->last == NULL || file->last->bytes_filled == file->last->buf_size);
1637 	assert((file->append_pos % CACHE_BUFFER_SIZE) == 0);
1638 
1639 	last = cache_insert_buffer(file, file->append_pos);
1640 	if (last == NULL) {
1641 		SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "cache_insert_buffer failed\n");
1642 		return NULL;
1643 	}
1644 
1645 	if (file->last != NULL) {
1646 		file->last->next = last;
1647 	}
1648 	file->last = last;
1649 
1650 	return last;
1651 }
1652 
1653 static void
1654 __wake_caller(struct spdk_fs_cb_args *args)
1655 {
1656 	sem_post(args->sem);
1657 }
1658 
1659 static void __check_sync_reqs(struct spdk_file *file);
1660 
1661 static void
1662 __file_cache_finish_sync(struct spdk_file *file)
1663 {
1664 	struct spdk_fs_request *sync_req;
1665 	struct spdk_fs_cb_args *sync_args;
1666 
1667 	pthread_spin_lock(&file->lock);
1668 	sync_req = TAILQ_FIRST(&file->sync_requests);
1669 	sync_args = &sync_req->args;
1670 	assert(sync_args->op.sync.offset <= file->length_flushed);
1671 	BLOBFS_TRACE(file, "sync done offset=%jx\n", sync_args->op.sync.offset);
1672 	TAILQ_REMOVE(&file->sync_requests, sync_req, args.op.sync.tailq);
1673 	pthread_spin_unlock(&file->lock);
1674 
1675 	sync_args->fn.file_op(sync_args->arg, 0);
1676 	__check_sync_reqs(file);
1677 
1678 	pthread_spin_lock(&file->lock);
1679 	free_fs_request(sync_req);
1680 	pthread_spin_unlock(&file->lock);
1681 }
1682 
/*
 * Blobstore completion adapter: the context is the file; the status code is
 *  not examined.
 */
static void
__file_cache_finish_sync_bs_cb(void *ctx, int bserrno)
{
	struct spdk_file *synced_file = ctx;

	__file_cache_finish_sync(synced_file);
}
1690 
1691 static void
1692 __free_args(struct spdk_fs_cb_args *args)
1693 {
1694 	struct spdk_fs_request *req;
1695 
1696 	if (!args->from_request) {
1697 		free(args);
1698 	} else {
1699 		/* Depends on args being at the start of the spdk_fs_request structure. */
1700 		req = (struct spdk_fs_request *)args;
1701 		free_fs_request(req);
1702 	}
1703 }
1704 
1705 static void
1706 __check_sync_reqs(struct spdk_file *file)
1707 {
1708 	struct spdk_fs_request *sync_req;
1709 
1710 	pthread_spin_lock(&file->lock);
1711 
1712 	TAILQ_FOREACH(sync_req, &file->sync_requests, args.op.sync.tailq) {
1713 		if (sync_req->args.op.sync.offset <= file->length_flushed) {
1714 			break;
1715 		}
1716 	}
1717 
1718 	if (sync_req != NULL && !sync_req->args.op.sync.xattr_in_progress) {
1719 		BLOBFS_TRACE(file, "set xattr length 0x%jx\n", file->length_flushed);
1720 		sync_req->args.op.sync.xattr_in_progress = true;
1721 		spdk_blob_md_set_xattr(file->blob, "length", &file->length_flushed,
1722 				       sizeof(file->length_flushed));
1723 
1724 		pthread_spin_unlock(&file->lock);
1725 		spdk_bs_md_sync_blob(file->blob, __file_cache_finish_sync_bs_cb, file);
1726 	} else {
1727 		pthread_spin_unlock(&file->lock);
1728 	}
1729 }
1730 
1731 static void
1732 __file_flush_done(void *arg, int bserrno)
1733 {
1734 	struct spdk_fs_cb_args *args = arg;
1735 	struct spdk_file *file = args->file;
1736 	struct cache_buffer *next = args->op.flush.cache_buffer;
1737 
1738 	BLOBFS_TRACE(file, "length=%jx\n", args->op.flush.length);
1739 
1740 	pthread_spin_lock(&file->lock);
1741 	next->in_progress = false;
1742 	next->bytes_flushed += args->op.flush.length;
1743 	file->length_flushed += args->op.flush.length;
1744 	if (file->length_flushed > file->length) {
1745 		file->length = file->length_flushed;
1746 	}
1747 	if (next->bytes_flushed == next->buf_size) {
1748 		BLOBFS_TRACE(file, "write buffer fully flushed 0x%jx\n", file->length_flushed);
1749 		next = spdk_tree_find_buffer(file->tree, file->length_flushed);
1750 	}
1751 
1752 	/*
1753 	 * Assert that there is no cached data that extends past the end of the underlying
1754 	 *  blob.
1755 	 */
1756 	assert(next == NULL || next->offset < __file_get_blob_size(file) ||
1757 	       next->bytes_filled == 0);
1758 
1759 	pthread_spin_unlock(&file->lock);
1760 
1761 	__check_sync_reqs(file);
1762 
1763 	__file_flush(args);
1764 }
1765 
1766 static void
1767 __file_flush(void *_args)
1768 {
1769 	struct spdk_fs_cb_args *args = _args;
1770 	struct spdk_file *file = args->file;
1771 	struct cache_buffer *next;
1772 	uint64_t offset, length, start_page, num_pages;
1773 	uint32_t page_size;
1774 
1775 	pthread_spin_lock(&file->lock);
1776 	next = spdk_tree_find_buffer(file->tree, file->length_flushed);
1777 	if (next == NULL || next->in_progress) {
1778 		/*
1779 		 * There is either no data to flush, or a flush I/O is already in
1780 		 *  progress.  So return immediately - if a flush I/O is in
1781 		 *  progress we will flush more data after that is completed.
1782 		 */
1783 		__free_args(args);
1784 		pthread_spin_unlock(&file->lock);
1785 		return;
1786 	}
1787 
1788 	offset = next->offset + next->bytes_flushed;
1789 	length = next->bytes_filled - next->bytes_flushed;
1790 	if (length == 0) {
1791 		__free_args(args);
1792 		pthread_spin_unlock(&file->lock);
1793 		return;
1794 	}
1795 	args->op.flush.length = length;
1796 	args->op.flush.cache_buffer = next;
1797 
1798 	__get_page_parameters(file, offset, length, &start_page, &page_size, &num_pages);
1799 
1800 	next->in_progress = true;
1801 	BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n",
1802 		     offset, length, start_page, num_pages);
1803 	pthread_spin_unlock(&file->lock);
1804 	spdk_bs_io_write_blob(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel,
1805 			      next->buf + (start_page * page_size) - next->offset,
1806 			      start_page, num_pages,
1807 			      __file_flush_done, args);
1808 }
1809 
/*
 * Completion of the metadata sync started by __file_extend_blob(): wakes the
 *  thread waiting on the args' semaphore.  The status code is not examined.
 */
static void
__file_extend_done(void *arg, int bserrno)
{
	struct spdk_fs_cb_args *extend_args = arg;

	__wake_caller(extend_args);
}
1817 
1818 static void
1819 __file_extend_blob(void *_args)
1820 {
1821 	struct spdk_fs_cb_args *args = _args;
1822 	struct spdk_file *file = args->file;
1823 
1824 	spdk_bs_md_resize_blob(file->blob, args->op.resize.num_clusters);
1825 
1826 	spdk_bs_md_sync_blob(file->blob, __file_extend_done, args);
1827 }
1828 
/*
 * Completion of the async read/write started by __rw_from_file(): wakes the
 *  waiting thread and then releases the args.  The status code is not
 *  examined.
 */
static void
__rw_from_file_done(void *arg, int bserrno)
{
	struct spdk_fs_cb_args *rw_args = arg;

	__wake_caller(rw_args);
	__free_args(rw_args);
}
1837 
1838 static void
1839 __rw_from_file(void *_args)
1840 {
1841 	struct spdk_fs_cb_args *args = _args;
1842 	struct spdk_file *file = args->file;
1843 
1844 	if (args->op.rw.is_read) {
1845 		spdk_file_read_async(file, file->fs->sync_target.sync_io_channel, args->op.rw.user_buf,
1846 				     args->op.rw.offset, args->op.rw.length,
1847 				     __rw_from_file_done, args);
1848 	} else {
1849 		spdk_file_write_async(file, file->fs->sync_target.sync_io_channel, args->op.rw.user_buf,
1850 				      args->op.rw.offset, args->op.rw.length,
1851 				      __rw_from_file_done, args);
1852 	}
1853 }
1854 
1855 static int
1856 __send_rw_from_file(struct spdk_file *file, sem_t *sem, void *payload,
1857 		    uint64_t offset, uint64_t length, bool is_read)
1858 {
1859 	struct spdk_fs_cb_args *args;
1860 
1861 	args = calloc(1, sizeof(*args));
1862 	if (args == NULL) {
1863 		sem_post(sem);
1864 		return -ENOMEM;
1865 	}
1866 
1867 	args->file = file;
1868 	args->sem = sem;
1869 	args->op.rw.user_buf = payload;
1870 	args->op.rw.offset = offset;
1871 	args->op.rw.length = length;
1872 	args->op.rw.is_read = is_read;
1873 	file->fs->send_request(__rw_from_file, args);
1874 	return 0;
1875 }
1876 
1877 int
1878 spdk_file_write(struct spdk_file *file, struct spdk_io_channel *_channel,
1879 		void *payload, uint64_t offset, uint64_t length)
1880 {
1881 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
1882 	struct spdk_fs_cb_args *args;
1883 	uint64_t rem_length, copy, blob_size, cluster_sz;
1884 	uint32_t cache_buffers_filled = 0;
1885 	uint8_t *cur_payload;
1886 	struct cache_buffer *last;
1887 
1888 	BLOBFS_TRACE_RW(file, "offset=%jx length=%jx\n", offset, length);
1889 
1890 	if (length == 0) {
1891 		return 0;
1892 	}
1893 
1894 	if (offset != file->append_pos) {
1895 		BLOBFS_TRACE(file, " error offset=%jx append_pos=%jx\n", offset, file->append_pos);
1896 		return -EINVAL;
1897 	}
1898 
1899 	pthread_spin_lock(&file->lock);
1900 	file->open_for_writing = true;
1901 
1902 	if (file->last == NULL) {
1903 		if (file->append_pos % CACHE_BUFFER_SIZE == 0) {
1904 			cache_append_buffer(file);
1905 		} else {
1906 			int rc;
1907 
1908 			file->append_pos += length;
1909 			pthread_spin_unlock(&file->lock);
1910 			rc = __send_rw_from_file(file, &channel->sem, payload,
1911 						 offset, length, false);
1912 			sem_wait(&channel->sem);
1913 			return rc;
1914 		}
1915 	}
1916 
1917 	blob_size = __file_get_blob_size(file);
1918 
1919 	if ((offset + length) > blob_size) {
1920 		struct spdk_fs_cb_args extend_args = {};
1921 
1922 		cluster_sz = file->fs->bs_opts.cluster_sz;
1923 		extend_args.sem = &channel->sem;
1924 		extend_args.op.resize.num_clusters = __bytes_to_clusters((offset + length), cluster_sz);
1925 		extend_args.file = file;
1926 		BLOBFS_TRACE(file, "start resize to %u clusters\n", extend_args.op.resize.num_clusters);
1927 		pthread_spin_unlock(&file->lock);
1928 		file->fs->send_request(__file_extend_blob, &extend_args);
1929 		sem_wait(&channel->sem);
1930 	}
1931 
1932 	last = file->last;
1933 	rem_length = length;
1934 	cur_payload = payload;
1935 	while (rem_length > 0) {
1936 		copy = last->buf_size - last->bytes_filled;
1937 		if (copy > rem_length) {
1938 			copy = rem_length;
1939 		}
1940 		BLOBFS_TRACE_RW(file, "  fill offset=%jx length=%jx\n", file->append_pos, copy);
1941 		memcpy(&last->buf[last->bytes_filled], cur_payload, copy);
1942 		file->append_pos += copy;
1943 		if (file->length < file->append_pos) {
1944 			file->length = file->append_pos;
1945 		}
1946 		cur_payload += copy;
1947 		last->bytes_filled += copy;
1948 		rem_length -= copy;
1949 		if (last->bytes_filled == last->buf_size) {
1950 			cache_buffers_filled++;
1951 			last = cache_append_buffer(file);
1952 			if (last == NULL) {
1953 				BLOBFS_TRACE(file, "nomem\n");
1954 				pthread_spin_unlock(&file->lock);
1955 				return -ENOMEM;
1956 			}
1957 		}
1958 	}
1959 
1960 	if (cache_buffers_filled == 0) {
1961 		pthread_spin_unlock(&file->lock);
1962 		return 0;
1963 	}
1964 
1965 	args = calloc(1, sizeof(*args));
1966 	if (args == NULL) {
1967 		pthread_spin_unlock(&file->lock);
1968 		return -ENOMEM;
1969 	}
1970 
1971 	args->file = file;
1972 	file->fs->send_request(__file_flush, args);
1973 	pthread_spin_unlock(&file->lock);
1974 	return 0;
1975 }
1976 
1977 static void
1978 __readahead_done(void *arg, int bserrno)
1979 {
1980 	struct spdk_fs_cb_args *args = arg;
1981 	struct cache_buffer *cache_buffer = args->op.readahead.cache_buffer;
1982 	struct spdk_file *file = args->file;
1983 
1984 	BLOBFS_TRACE(file, "offset=%jx\n", cache_buffer->offset);
1985 
1986 	pthread_spin_lock(&file->lock);
1987 	cache_buffer->bytes_filled = args->op.readahead.length;
1988 	cache_buffer->bytes_flushed = args->op.readahead.length;
1989 	cache_buffer->in_progress = false;
1990 	pthread_spin_unlock(&file->lock);
1991 
1992 	__free_args(args);
1993 }
1994 
1995 static void
1996 __readahead(void *_args)
1997 {
1998 	struct spdk_fs_cb_args *args = _args;
1999 	struct spdk_file *file = args->file;
2000 	uint64_t offset, length, start_page, num_pages;
2001 	uint32_t page_size;
2002 
2003 	offset = args->op.readahead.offset;
2004 	length = args->op.readahead.length;
2005 	assert(length > 0);
2006 
2007 	__get_page_parameters(file, offset, length, &start_page, &page_size, &num_pages);
2008 
2009 	BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n",
2010 		     offset, length, start_page, num_pages);
2011 	spdk_bs_io_read_blob(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel,
2012 			     args->op.readahead.cache_buffer->buf,
2013 			     start_page, num_pages,
2014 			     __readahead_done, args);
2015 }
2016 
2017 static uint64_t
2018 __next_cache_buffer_offset(uint64_t offset)
2019 {
2020 	return (offset + CACHE_BUFFER_SIZE) & ~(CACHE_TREE_LEVEL_MASK(0));
2021 }
2022 
2023 static void
2024 check_readahead(struct spdk_file *file, uint64_t offset)
2025 {
2026 	struct spdk_fs_cb_args *args;
2027 
2028 	offset = __next_cache_buffer_offset(offset);
2029 	if (spdk_tree_find_buffer(file->tree, offset) != NULL || file->length <= offset) {
2030 		return;
2031 	}
2032 
2033 	args = calloc(1, sizeof(*args));
2034 	if (args == NULL) {
2035 		return;
2036 	}
2037 
2038 	BLOBFS_TRACE(file, "offset=%jx\n", offset);
2039 
2040 	args->file = file;
2041 	args->op.readahead.offset = offset;
2042 	args->op.readahead.cache_buffer = cache_insert_buffer(file, offset);
2043 	args->op.readahead.cache_buffer->in_progress = true;
2044 	if (file->length < (offset + CACHE_BUFFER_SIZE)) {
2045 		args->op.readahead.length = file->length & (CACHE_BUFFER_SIZE - 1);
2046 	} else {
2047 		args->op.readahead.length = CACHE_BUFFER_SIZE;
2048 	}
2049 	file->fs->send_request(__readahead, args);
2050 }
2051 
2052 static int
2053 __file_read(struct spdk_file *file, void *payload, uint64_t offset, uint64_t length, sem_t *sem)
2054 {
2055 	struct cache_buffer *buf;
2056 	int rc;
2057 
2058 	buf = spdk_tree_find_filled_buffer(file->tree, offset);
2059 	if (buf == NULL) {
2060 		pthread_spin_unlock(&file->lock);
2061 		rc = __send_rw_from_file(file, sem, payload, offset, length, true);
2062 		pthread_spin_lock(&file->lock);
2063 		return rc;
2064 	}
2065 
2066 	if ((offset + length) > (buf->offset + buf->bytes_filled)) {
2067 		length = buf->offset + buf->bytes_filled - offset;
2068 	}
2069 	BLOBFS_TRACE(file, "read %p offset=%ju length=%ju\n", payload, offset, length);
2070 	memcpy(payload, &buf->buf[offset - buf->offset], length);
2071 	if ((offset + length) % CACHE_BUFFER_SIZE == 0) {
2072 		pthread_spin_lock(&g_caches_lock);
2073 		spdk_tree_remove_buffer(file->tree, buf);
2074 		if (file->tree->present_mask == 0) {
2075 			TAILQ_REMOVE(&g_caches, file, cache_tailq);
2076 		}
2077 		pthread_spin_unlock(&g_caches_lock);
2078 	}
2079 
2080 	sem_post(sem);
2081 	return 0;
2082 }
2083 
2084 int64_t
2085 spdk_file_read(struct spdk_file *file, struct spdk_io_channel *_channel,
2086 	       void *payload, uint64_t offset, uint64_t length)
2087 {
2088 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
2089 	uint64_t final_offset, final_length;
2090 	uint32_t sub_reads = 0;
2091 	int rc = 0;
2092 
2093 	pthread_spin_lock(&file->lock);
2094 
2095 	BLOBFS_TRACE_RW(file, "offset=%ju length=%ju\n", offset, length);
2096 
2097 	file->open_for_writing = false;
2098 
2099 	if (length == 0 || offset >= file->append_pos) {
2100 		pthread_spin_unlock(&file->lock);
2101 		return 0;
2102 	}
2103 
2104 	if (offset + length > file->append_pos) {
2105 		length = file->append_pos - offset;
2106 	}
2107 
2108 	if (offset != file->next_seq_offset) {
2109 		file->seq_byte_count = 0;
2110 	}
2111 	file->seq_byte_count += length;
2112 	file->next_seq_offset = offset + length;
2113 	if (file->seq_byte_count >= CACHE_READAHEAD_THRESHOLD) {
2114 		check_readahead(file, offset);
2115 		check_readahead(file, offset + CACHE_BUFFER_SIZE);
2116 	}
2117 
2118 	final_length = 0;
2119 	final_offset = offset + length;
2120 	while (offset < final_offset) {
2121 		length = NEXT_CACHE_BUFFER_OFFSET(offset) - offset;
2122 		if (length > (final_offset - offset)) {
2123 			length = final_offset - offset;
2124 		}
2125 		rc = __file_read(file, payload, offset, length, &channel->sem);
2126 		if (rc == 0) {
2127 			final_length += length;
2128 		} else {
2129 			break;
2130 		}
2131 		payload += length;
2132 		offset += length;
2133 		sub_reads++;
2134 	}
2135 	pthread_spin_unlock(&file->lock);
2136 	while (sub_reads-- > 0) {
2137 		sem_wait(&channel->sem);
2138 	}
2139 	if (rc == 0) {
2140 		return final_length;
2141 	} else {
2142 		return rc;
2143 	}
2144 }
2145 
2146 static void
2147 _file_sync(struct spdk_file *file, struct spdk_fs_channel *channel,
2148 	   spdk_file_op_complete cb_fn, void *cb_arg)
2149 {
2150 	struct spdk_fs_request *sync_req;
2151 	struct spdk_fs_request *flush_req;
2152 	struct spdk_fs_cb_args *sync_args;
2153 	struct spdk_fs_cb_args *flush_args;
2154 
2155 	BLOBFS_TRACE(file, "offset=%jx\n", file->append_pos);
2156 
2157 	pthread_spin_lock(&file->lock);
2158 	if (file->append_pos <= file->length_flushed || file->last == NULL) {
2159 		BLOBFS_TRACE(file, "done - no data to flush\n");
2160 		pthread_spin_unlock(&file->lock);
2161 		cb_fn(cb_arg, 0);
2162 		return;
2163 	}
2164 
2165 	sync_req = alloc_fs_request(channel);
2166 	assert(sync_req != NULL);
2167 	sync_args = &sync_req->args;
2168 
2169 	flush_req = alloc_fs_request(channel);
2170 	assert(flush_req != NULL);
2171 	flush_args = &flush_req->args;
2172 
2173 	sync_args->file = file;
2174 	sync_args->fn.file_op = cb_fn;
2175 	sync_args->arg = cb_arg;
2176 	sync_args->op.sync.offset = file->append_pos;
2177 	sync_args->op.sync.xattr_in_progress = false;
2178 	TAILQ_INSERT_TAIL(&file->sync_requests, sync_req, args.op.sync.tailq);
2179 	pthread_spin_unlock(&file->lock);
2180 
2181 	flush_args->file = file;
2182 	channel->send_request(__file_flush, flush_args);
2183 }
2184 
2185 int
2186 spdk_file_sync(struct spdk_file *file, struct spdk_io_channel *_channel)
2187 {
2188 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
2189 
2190 	_file_sync(file, channel, __sem_post, &channel->sem);
2191 	sem_wait(&channel->sem);
2192 
2193 	return 0;
2194 }
2195 
2196 void
2197 spdk_file_sync_async(struct spdk_file *file, struct spdk_io_channel *_channel,
2198 		     spdk_file_op_complete cb_fn, void *cb_arg)
2199 {
2200 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
2201 
2202 	_file_sync(file, channel, cb_fn, cb_arg);
2203 }
2204 
2205 void
2206 spdk_file_set_priority(struct spdk_file *file, uint32_t priority)
2207 {
2208 	BLOBFS_TRACE(file, "priority=%u\n", priority);
2209 	file->priority = priority;
2210 
2211 }
2212 
2213 /*
2214  * Close routines
2215  */
2216 
2217 static void
2218 __file_close_async_done(void *ctx, int bserrno)
2219 {
2220 	struct spdk_fs_request *req = ctx;
2221 	struct spdk_fs_cb_args *args = &req->args;
2222 
2223 	args->fn.file_op(args->arg, bserrno);
2224 	free_fs_request(req);
2225 }
2226 
2227 static void
2228 __file_close_async(struct spdk_file *file, struct spdk_fs_request *req)
2229 {
2230 	pthread_spin_lock(&file->lock);
2231 	if (file->ref_count == 0) {
2232 		pthread_spin_unlock(&file->lock);
2233 		__file_close_async_done(req, -EBADF);
2234 		return;
2235 	}
2236 
2237 	file->ref_count--;
2238 	if (file->ref_count > 0) {
2239 		pthread_spin_unlock(&file->lock);
2240 		__file_close_async_done(req, 0);
2241 		return;
2242 	}
2243 
2244 	pthread_spin_unlock(&file->lock);
2245 
2246 	spdk_bs_md_close_blob(&file->blob, __file_close_async_done, req);
2247 }
2248 
2249 static void
2250 __file_close_async__sync_done(void *arg, int fserrno)
2251 {
2252 	struct spdk_fs_request *req = arg;
2253 	struct spdk_fs_cb_args *args = &req->args;
2254 
2255 	__file_close_async(args->file, req);
2256 }
2257 
2258 void
2259 spdk_file_close_async(struct spdk_file *file, spdk_file_op_complete cb_fn, void *cb_arg)
2260 {
2261 	struct spdk_fs_request *req;
2262 	struct spdk_fs_cb_args *args;
2263 
2264 	req = alloc_fs_request(file->fs->md_target.md_fs_channel);
2265 	if (req == NULL) {
2266 		cb_fn(cb_arg, -ENOMEM);
2267 		return;
2268 	}
2269 
2270 	args = &req->args;
2271 	args->file = file;
2272 	args->fn.file_op = cb_fn;
2273 	args->arg = cb_arg;
2274 
2275 	spdk_file_sync_async(file, file->fs->md_target.md_io_channel, __file_close_async__sync_done, req);
2276 }
2277 
2278 static void
2279 __file_close_done(void *arg, int fserrno)
2280 {
2281 	struct spdk_fs_cb_args *args = arg;
2282 
2283 	args->rc = fserrno;
2284 	sem_post(args->sem);
2285 }
2286 
2287 static void
2288 __file_close(void *arg)
2289 {
2290 	struct spdk_fs_request *req = arg;
2291 	struct spdk_fs_cb_args *args = &req->args;
2292 	struct spdk_file *file = args->file;
2293 
2294 	__file_close_async(file, req);
2295 }
2296 
2297 int
2298 spdk_file_close(struct spdk_file *file, struct spdk_io_channel *_channel)
2299 {
2300 	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
2301 	struct spdk_fs_request *req;
2302 	struct spdk_fs_cb_args *args;
2303 
2304 	req = alloc_fs_request(channel);
2305 	assert(req != NULL);
2306 
2307 	args = &req->args;
2308 
2309 	spdk_file_sync(file, _channel);
2310 	BLOBFS_TRACE(file, "name=%s\n", file->name);
2311 	args->file = file;
2312 	args->sem = &channel->sem;
2313 	args->fn.file_op = __file_close_done;
2314 	args->arg = req;
2315 	channel->send_request(__file_close, req);
2316 	sem_wait(&channel->sem);
2317 
2318 	return args->rc;
2319 }
2320 
2321 static void
2322 cache_free_buffers(struct spdk_file *file)
2323 {
2324 	BLOBFS_TRACE(file, "free=%s\n", file->name);
2325 	pthread_spin_lock(&file->lock);
2326 	pthread_spin_lock(&g_caches_lock);
2327 	if (file->tree->present_mask == 0) {
2328 		pthread_spin_unlock(&g_caches_lock);
2329 		pthread_spin_unlock(&file->lock);
2330 		return;
2331 	}
2332 	spdk_tree_free_buffers(file->tree);
2333 
2334 	TAILQ_REMOVE(&g_caches, file, cache_tailq);
2335 	/* If not freed, put it in the end of the queue */
2336 	if (file->tree->present_mask != 0) {
2337 		TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq);
2338 	}
2339 	file->last = NULL;
2340 	pthread_spin_unlock(&g_caches_lock);
2341 	pthread_spin_unlock(&file->lock);
2342 }
2343 
/* Register the "blobfs" and "blobfs_rw" log trace flags used by this file. */
SPDK_LOG_REGISTER_TRACE_FLAG("blobfs", SPDK_TRACE_BLOBFS);
SPDK_LOG_REGISTER_TRACE_FLAG("blobfs_rw", SPDK_TRACE_BLOBFS_RW);
2346