xref: /spdk/lib/blob/blobstore.c (revision 57986fb884d424655cc53bc8e4d24495655b78fb)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include <stdbool.h>
35 #include <assert.h>
36 #include <errno.h>
37 #include <limits.h>
38 #include <stdlib.h>
39 #include <string.h>
40 
41 #include "spdk/blob.h"
42 #include "spdk/env.h"
43 #include "spdk/queue.h"
44 #include "spdk/io_channel.h"
45 #include "spdk/bit_array.h"
46 
47 #include "spdk_internal/log.h"
48 
49 #include "blobstore.h"
50 #include "request.h"
51 
52 static inline size_t
53 divide_round_up(size_t num, size_t divisor)
54 {
55 	return (num + divisor - 1) / divisor;
56 }
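
/*
 * Worked example: divide_round_up(10, 4) == 3 and divide_round_up(8, 4) == 2.
 * This is used below to size bitmasks and page counts that must cover a
 * partial final unit.
 */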
57 
58 static void
59 _spdk_bs_claim_cluster(struct spdk_blob_store *bs, uint32_t cluster_num)
60 {
61 	assert(cluster_num < spdk_bit_array_capacity(bs->used_clusters));
62 	assert(spdk_bit_array_get(bs->used_clusters, cluster_num) == false);
63 	assert(bs->num_free_clusters > 0);
64 
65 	SPDK_TRACELOG(SPDK_TRACE_BLOB, "Claiming cluster %u\n", cluster_num);
66 
67 	spdk_bit_array_set(bs->used_clusters, cluster_num);
68 	bs->num_free_clusters--;
69 }
70 
71 static void
72 _spdk_bs_release_cluster(struct spdk_blob_store *bs, uint32_t cluster_num)
73 {
74 	assert(cluster_num < spdk_bit_array_capacity(bs->used_clusters));
75 	assert(spdk_bit_array_get(bs->used_clusters, cluster_num) == true);
76 	assert(bs->num_free_clusters < bs->total_clusters);
77 
78 	SPDK_TRACELOG(SPDK_TRACE_BLOB, "Releasing cluster %u\n", cluster_num);
79 
80 	spdk_bit_array_clear(bs->used_clusters, cluster_num);
81 	bs->num_free_clusters++;
82 }
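
/*
 * _spdk_bs_claim_cluster()/_spdk_bs_release_cluster() keep
 * bs->num_free_clusters strictly in sync with the used_clusters bit array;
 * the asserts make double-claim and double-release bugs fail fast in debug
 * builds.
 */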
83 
84 static struct spdk_blob *
85 _spdk_blob_alloc(struct spdk_blob_store *bs, spdk_blob_id id)
86 {
87 	struct spdk_blob *blob;
88 
89 	blob = calloc(1, sizeof(*blob));
90 	if (!blob) {
91 		return NULL;
92 	}
93 
94 	blob->id = id;
95 	blob->bs = bs;
96 
97 	blob->state = SPDK_BLOB_STATE_DIRTY;
98 	blob->active.num_pages = 1;
99 	blob->active.pages = calloc(1, sizeof(*blob->active.pages));
100 	if (!blob->active.pages) {
101 		free(blob);
102 		return NULL;
103 	}
104 
105 	blob->active.pages[0] = _spdk_bs_blobid_to_page(id);
106 
107 	TAILQ_INIT(&blob->xattrs);
108 
109 	return blob;
110 }
111 
112 static void
113 _spdk_blob_free(struct spdk_blob *blob)
114 {
115 	struct spdk_xattr 	*xattr, *xattr_tmp;
116 
117 	assert(blob != NULL);
118 	assert(blob->state == SPDK_BLOB_STATE_CLEAN);
119 
120 	free(blob->active.clusters);
121 	free(blob->clean.clusters);
122 	free(blob->active.pages);
123 	free(blob->clean.pages);
124 
125 	TAILQ_FOREACH_SAFE(xattr, &blob->xattrs, link, xattr_tmp) {
126 		TAILQ_REMOVE(&blob->xattrs, xattr, link);
127 		free(xattr->name);
128 		free(xattr->value);
129 		free(xattr);
130 	}
131 
132 	free(blob);
133 }
134 
135 static int
136 _spdk_blob_mark_clean(struct spdk_blob *blob)
137 {
138 	uint64_t *clusters = NULL;
139 	uint32_t *pages = NULL;
140 
141 	assert(blob != NULL);
142 	assert(blob->state == SPDK_BLOB_STATE_LOADING ||
143 	       blob->state == SPDK_BLOB_STATE_SYNCING);
144 
145 	if (blob->active.num_clusters) {
146 		assert(blob->active.clusters);
147 		clusters = calloc(blob->active.num_clusters, sizeof(*blob->active.clusters));
148 		if (!clusters) {
149 			return -1;
150 		}
151 		memcpy(clusters, blob->active.clusters, blob->active.num_clusters * sizeof(*clusters));
152 	}
153 
154 	if (blob->active.num_pages) {
155 		assert(blob->active.pages);
156 		pages = calloc(blob->active.num_pages, sizeof(*blob->active.pages));
157 		if (!pages) {
158 			free(clusters);
159 			return -1;
160 		}
161 		memcpy(pages, blob->active.pages, blob->active.num_pages * sizeof(*pages));
162 	}
163 
164 	free(blob->clean.clusters);
165 	free(blob->clean.pages);
166 
167 	blob->clean.num_clusters = blob->active.num_clusters;
168 	blob->clean.clusters = blob->active.clusters;
169 	blob->clean.num_pages = blob->active.num_pages;
170 	blob->clean.pages = blob->active.pages;
171 
172 	blob->active.clusters = clusters;
173 	blob->active.pages = pages;
174 
175 	blob->state = SPDK_BLOB_STATE_CLEAN;
176 
177 	return 0;
178 }
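
/*
 * A blob carries two copies of its mutable metadata: 'active' (the in-memory
 * working copy) and 'clean' (the last state known to be on disk). Marking a
 * blob clean moves the active arrays into 'clean' and gives 'active' fresh
 * copies, so later modifications never disturb the record of what is
 * actually persisted.
 */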
179 
180 static void
181 _spdk_blob_parse_page(const struct spdk_blob_md_page *page, struct spdk_blob *blob)
182 {
183 	struct spdk_blob_md_descriptor *desc;
184 	size_t	cur_desc = 0;
185 	void *tmp;
186 
187 	desc = (struct spdk_blob_md_descriptor *)page->descriptors;
188 	while (cur_desc < sizeof(page->descriptors)) {
189 		if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_PADDING) {
190 			if (desc->length == 0) {
191 				/* If padding and length are 0, this terminates the page */
192 				break;
193 			}
194 		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_EXTENT) {
195 			struct spdk_blob_md_descriptor_extent	*desc_extent;
196 			unsigned int				i, j;
197 			unsigned int				cluster_count = blob->active.num_clusters;
198 
199 			desc_extent = (struct spdk_blob_md_descriptor_extent *)desc;
200 
201 			assert(desc_extent->length > 0);
202 			assert(desc_extent->length % sizeof(desc_extent->extents[0]) == 0);
203 
204 			for (i = 0; i < desc_extent->length / sizeof(desc_extent->extents[0]); i++) {
205 				for (j = 0; j < desc_extent->extents[i].length; j++) {
206 					assert(spdk_bit_array_get(blob->bs->used_clusters, desc_extent->extents[i].cluster_idx + j));
207 					cluster_count++;
208 				}
209 			}
210 
211 			assert(cluster_count > 0);
212 			tmp = realloc(blob->active.clusters, cluster_count * sizeof(uint64_t));
213 			assert(tmp != NULL);
214 			blob->active.clusters = tmp;
215 			blob->active.cluster_array_size = cluster_count;
216 
217 			for (i = 0; i < desc_extent->length / sizeof(desc_extent->extents[0]); i++) {
218 				for (j = 0; j < desc_extent->extents[i].length; j++) {
219 					blob->active.clusters[blob->active.num_clusters++] = _spdk_bs_cluster_to_lba(blob->bs,
220 							desc_extent->extents[i].cluster_idx + j);
221 				}
222 			}
223 
224 		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR) {
225 			struct spdk_blob_md_descriptor_xattr	*desc_xattr;
226 			struct spdk_xattr 			*xattr;
227 
228 			desc_xattr = (struct spdk_blob_md_descriptor_xattr *)desc;
229 
230 			xattr = calloc(1, sizeof(*xattr));
231 			assert(xattr != NULL);
232 
233 			xattr->name = malloc(desc_xattr->name_length + 1);
234 			assert(xattr->name);
235 			strncpy(xattr->name, desc_xattr->name, desc_xattr->name_length);
236 			xattr->name[desc_xattr->name_length] = '\0';
237 
238 			xattr->value = malloc(desc_xattr->value_length);
239 			assert(xattr->value != NULL);
240 			xattr->value_len = desc_xattr->value_length;
241 			memcpy(xattr->value,
242 			       (void *)((uintptr_t)desc_xattr->name + desc_xattr->name_length),
243 			       desc_xattr->value_length);
244 
245 			TAILQ_INSERT_TAIL(&blob->xattrs, xattr, link);
246 		} else {
247 			/* Error */
248 			break;
249 		}
250 
251 		/* Advance to the next descriptor */
252 		desc = (struct spdk_blob_md_descriptor *)((uintptr_t)desc + sizeof(*desc) + desc->length);
253 		cur_desc += sizeof(*desc) + desc->length;
254 	}
255 }
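
/*
 * Decoding example: an extent descriptor covering clusters 10, 11, 12 and 20
 * is stored as two extents, { cluster_idx = 10, length = 3 } and
 * { cluster_idx = 20, length = 1 }; parsing expands them back into one LBA
 * entry per cluster in blob->active.clusters.
 */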
256 
257 static int
258 _spdk_blob_parse(const struct spdk_blob_md_page *pages, uint32_t page_count,
259 		 struct spdk_blob *blob)
260 {
261 	const struct spdk_blob_md_page *page;
262 	uint32_t i;
263 
264 	assert(page_count > 0);
265 	assert(pages[0].sequence_num == 0);
266 	assert(blob != NULL);
267 	assert(blob->state == SPDK_BLOB_STATE_LOADING);
268 	assert(blob->active.clusters == NULL);
269 	assert(blob->id == pages[0].id);
271 
272 	for (i = 0; i < page_count; i++) {
273 		page = &pages[i];
274 
275 		assert(page->id == blob->id);
276 		assert(page->sequence_num == i);
277 
278 		_spdk_blob_parse_page(page, blob);
279 	}
280 
281 	return 0;
282 }
283 
284 static int
285 _spdk_blob_serialize_add_page(const struct spdk_blob *blob,
286 			      struct spdk_blob_md_page **pages,
287 			      uint32_t *page_count,
288 			      struct spdk_blob_md_page **last_page)
289 {
290 	struct spdk_blob_md_page *page;
291 
292 	assert(pages != NULL);
293 	assert(page_count != NULL);
294 
295 	if (*page_count == 0) {
296 		assert(*pages == NULL);
297 		*page_count = 1;
298 		*pages = spdk_zmalloc(sizeof(struct spdk_blob_md_page),
299 				      sizeof(struct spdk_blob_md_page),
300 				      NULL);
301 	} else {
302 		assert(*pages != NULL);
303 		(*page_count)++;
304 		*pages = spdk_realloc(*pages,
305 				      sizeof(struct spdk_blob_md_page) * (*page_count),
306 				      sizeof(struct spdk_blob_md_page),
307 				      NULL);
308 	}
309 
310 	if (*pages == NULL) {
311 		*page_count = 0;
312 		*last_page = NULL;
313 		return -ENOMEM;
314 	}
315 
316 	page = &(*pages)[*page_count - 1];
317 	page->id = blob->id;
318 	page->sequence_num = *page_count - 1;
319 	page->next = SPDK_INVALID_MD_PAGE;
320 	*last_page = page;
321 
322 	return 0;
323 }
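
/*
 * Metadata pages are chained through page->next (a metadata page index),
 * terminated by SPDK_INVALID_MD_PAGE. The next pointers are left invalid
 * here and filled in by _spdk_blob_persist() once page indices are actually
 * claimed.
 */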
324 
325 /* Transform the in-memory representation 'xattr' into an on-disk xattr descriptor.
326  * Update required_sz on both success and failure.
327  */
329 static int
330 _spdk_blob_serialize_xattr(const struct spdk_xattr *xattr,
331 			   uint8_t *buf, size_t buf_sz,
332 			   size_t *required_sz)
333 {
334 	struct spdk_blob_md_descriptor_xattr	*desc;
335 
336 	*required_sz = sizeof(struct spdk_blob_md_descriptor_xattr) +
337 		       strlen(xattr->name) +
338 		       xattr->value_len;
339 
340 	if (buf_sz < *required_sz) {
341 		return -1;
342 	}
343 
344 	desc = (struct spdk_blob_md_descriptor_xattr *)buf;
345 
346 	desc->type = SPDK_MD_DESCRIPTOR_TYPE_XATTR;
347 	desc->length = sizeof(desc->name_length) +
348 		       sizeof(desc->value_length) +
349 		       strlen(xattr->name) +
350 		       xattr->value_len;
351 	desc->name_length = strlen(xattr->name);
352 	desc->value_length = xattr->value_len;
353 
354 	memcpy(desc->name, xattr->name, desc->name_length);
355 	memcpy((void *)((uintptr_t)desc->name + desc->name_length),
356 	       xattr->value,
357 	       desc->value_length);
358 
359 	return 0;
360 }
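
/*
 * On-disk xattr descriptor layout, as written above:
 *
 *   | type | length | name_length | value_length | name bytes | value bytes |
 *
 * For example, name "foo" with a 4-byte value is stored with name_length = 3
 * and value_length = 4, followed by the 7 payload bytes.
 */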
361 
362 static void
363 _spdk_blob_serialize_extent(const struct spdk_blob *blob,
364 			    uint64_t start_cluster, uint64_t *next_cluster,
365 			    uint8_t *buf, size_t buf_sz)
366 {
367 	struct spdk_blob_md_descriptor_extent *desc;
368 	size_t cur_sz;
369 	uint64_t i, extent_idx;
370 	uint64_t lba, lba_per_cluster, lba_count;	/* 64-bit: LBA math can exceed UINT32_MAX on large devices */
371 
372 	/* The buffer must have room for at least one extent */
373 	cur_sz = sizeof(struct spdk_blob_md_descriptor) + sizeof(desc->extents[0]);
374 	if (buf_sz < cur_sz) {
375 		*next_cluster = start_cluster;
376 		return;
377 	}
378 
379 	desc = (struct spdk_blob_md_descriptor_extent *)buf;
380 	desc->type = SPDK_MD_DESCRIPTOR_TYPE_EXTENT;
381 
382 	lba_per_cluster = _spdk_bs_cluster_to_lba(blob->bs, 1);
383 
384 	lba = blob->active.clusters[start_cluster];
385 	lba_count = lba_per_cluster;
386 	extent_idx = 0;
387 	for (i = start_cluster + 1; i < blob->active.num_clusters; i++) {
388 		if ((lba + lba_count) == blob->active.clusters[i]) {
389 			lba_count += lba_per_cluster;
390 			continue;
391 		}
392 		desc->extents[extent_idx].cluster_idx = lba / lba_per_cluster;
393 		desc->extents[extent_idx].length = lba_count / lba_per_cluster;
394 		extent_idx++;
395 
396 		cur_sz += sizeof(desc->extents[extent_idx]);
397 
398 		if (buf_sz < cur_sz) {
399 			/* If we ran out of buffer space, return */
400 			desc->length = sizeof(desc->extents[0]) * extent_idx;
401 			*next_cluster = i;
402 			return;
403 		}
404 
405 		lba = blob->active.clusters[i];
406 		lba_count = lba_per_cluster;
407 	}
408 
409 	desc->extents[extent_idx].cluster_idx = lba / lba_per_cluster;
410 	desc->extents[extent_idx].length = lba_count / lba_per_cluster;
411 	extent_idx++;
412 
413 	desc->length = sizeof(desc->extents[0]) * extent_idx;
414 	*next_cluster = blob->active.num_clusters;
415 
416 	return;
417 }
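
/*
 * Extents are a simple run-length encoding: consecutive clusters whose LBAs
 * are contiguous on the device collapse into one (cluster_idx, length) pair.
 * A fully fragmented blob therefore needs one extent per cluster, which is
 * why serialization may spill across several metadata pages.
 */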
418 
419 static int
420 _spdk_blob_serialize(const struct spdk_blob *blob, struct spdk_blob_md_page **pages,
421 		     uint32_t *page_count)
422 {
423 	struct spdk_blob_md_page		*cur_page;
424 	const struct spdk_xattr			*xattr;
425 	int 					rc;
426 	uint8_t					*buf;
427 	size_t					remaining_sz;
428 
429 	assert(pages != NULL);
430 	assert(page_count != NULL);
431 	assert(blob != NULL);
432 	assert(blob->state == SPDK_BLOB_STATE_SYNCING);
433 
434 	*pages = NULL;
435 	*page_count = 0;
436 
437 	/* A blob always has at least 1 page, even if it has no descriptors */
438 	rc = _spdk_blob_serialize_add_page(blob, pages, page_count, &cur_page);
439 	if (rc < 0) {
440 		return rc;
441 	}
442 
443 	buf = (uint8_t *)cur_page->descriptors;
444 	remaining_sz = sizeof(cur_page->descriptors);
445 
446 	/* Serialize xattrs */
447 	TAILQ_FOREACH(xattr, &blob->xattrs, link) {
448 		size_t required_sz = 0;
449 		rc = _spdk_blob_serialize_xattr(xattr,
450 						buf, remaining_sz,
451 						&required_sz);
452 		if (rc < 0) {
453 			/* Need to add a new page to the chain */
454 			rc = _spdk_blob_serialize_add_page(blob, pages, page_count,
455 							   &cur_page);
456 			if (rc < 0) {
457 				spdk_free(*pages);
458 				*pages = NULL;
459 				*page_count = 0;
460 				return rc;
461 			}
462 
463 			buf = (uint8_t *)cur_page->descriptors;
464 			remaining_sz = sizeof(cur_page->descriptors);
465 
466 			/* Try again */
467 			required_sz = 0;
468 			rc = _spdk_blob_serialize_xattr(xattr,
469 							buf, remaining_sz,
470 							&required_sz);
471 
472 			if (rc < 0) {
473 				spdk_free(*pages);
474 				*pages = NULL;
475 				*page_count = 0;
476 				return -1;
477 			}
478 		}
479 
480 		remaining_sz -= required_sz;
481 		buf += required_sz;
482 	}
483 
484 	/* Serialize extents */
485 	uint64_t last_cluster = 0;
486 	while (last_cluster < blob->active.num_clusters) {
487 		_spdk_blob_serialize_extent(blob, last_cluster, &last_cluster,
488 					    buf, remaining_sz);
489 
490 		if (last_cluster == blob->active.num_clusters) {
491 			break;
492 		}
493 
494 		rc = _spdk_blob_serialize_add_page(blob, pages, page_count,
495 						   &cur_page);
496 		if (rc < 0) {
497 			return rc;
498 		}
499 
500 		buf = (uint8_t *)cur_page->descriptors;
501 		remaining_sz = sizeof(cur_page->descriptors);
502 	}
503 
504 	return 0;
505 }
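
/*
 * Serialization order within the page chain: all xattr descriptors first,
 * then extent descriptors, appending a new page to the chain whenever the
 * current page's descriptor area fills up.
 */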
506 
507 struct spdk_blob_load_ctx {
508 	struct spdk_blob 		*blob;
509 
510 	struct spdk_blob_md_page	*pages;
511 	uint32_t			num_pages;
512 
513 	spdk_bs_sequence_cpl		cb_fn;
514 	void				*cb_arg;
515 };
516 
517 static void
518 _spdk_blob_load_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
519 {
520 	struct spdk_blob_load_ctx 	*ctx = cb_arg;
521 	struct spdk_blob 		*blob = ctx->blob;
522 	struct spdk_blob_md_page	*page;
523 	int				rc;
524 
525 	page = &ctx->pages[ctx->num_pages - 1];
526 
527 	if (page->next != SPDK_INVALID_MD_PAGE) {
528 		uint32_t next_page = page->next;
529 		uint64_t next_lba = _spdk_bs_page_to_lba(blob->bs, blob->bs->md_start + next_page);
530 
532 		assert(next_lba < (blob->bs->md_start + blob->bs->md_len));
533 
534 		/* Read the next page */
535 		ctx->num_pages++;
536 		ctx->pages = spdk_realloc(ctx->pages, (sizeof(*page) * ctx->num_pages),
537 					  sizeof(*page), NULL);
538 		if (ctx->pages == NULL) {
539 			ctx->cb_fn(seq, ctx->cb_arg, -ENOMEM);
540 			free(ctx);
541 			return;
542 		}
543 
544 		spdk_bs_sequence_read(seq, &ctx->pages[ctx->num_pages - 1],
545 				      next_lba,
546 				      _spdk_bs_byte_to_lba(blob->bs, sizeof(*page)),
547 				      _spdk_blob_load_cpl, ctx);
548 		return;
549 	}
550 
551 	/* Parse the pages */
552 	rc = _spdk_blob_parse(ctx->pages, ctx->num_pages, blob);
553 
554 	_spdk_blob_mark_clean(blob);
555 
556 	ctx->cb_fn(seq, ctx->cb_arg, rc);
557 
558 	/* Free the memory */
559 	spdk_free(ctx->pages);
560 	free(ctx);
561 }
562 
563 /* Load a blob from disk given a blobid */
564 static void
565 _spdk_blob_load(spdk_bs_sequence_t *seq, struct spdk_blob *blob,
566 		spdk_bs_sequence_cpl cb_fn, void *cb_arg)
567 {
568 	struct spdk_blob_load_ctx *ctx;
569 	struct spdk_blob_store *bs;
570 	uint32_t page_num;
571 	uint64_t lba;
572 
573 	assert(blob != NULL);
574 	assert(blob->state == SPDK_BLOB_STATE_CLEAN ||
575 	       blob->state == SPDK_BLOB_STATE_DIRTY);
576 
577 	bs = blob->bs;
578 
579 	ctx = calloc(1, sizeof(*ctx));
580 	if (!ctx) {
581 		cb_fn(seq, cb_arg, -ENOMEM);
582 		return;
583 	}
584 
585 	ctx->blob = blob;
586 	ctx->pages = spdk_realloc(ctx->pages, sizeof(struct spdk_blob_md_page),
587 				  sizeof(struct spdk_blob_md_page), NULL);
588 	if (!ctx->pages) {
589 		free(ctx);
590 		cb_fn(seq, cb_arg, -ENOMEM);
591 		return;
592 	}
593 	ctx->num_pages = 1;
594 	ctx->cb_fn = cb_fn;
595 	ctx->cb_arg = cb_arg;
596 
597 	page_num = _spdk_bs_blobid_to_page(blob->id);
598 	lba = _spdk_bs_page_to_lba(blob->bs, bs->md_start + page_num);
599 
600 	blob->state = SPDK_BLOB_STATE_LOADING;
601 
602 	spdk_bs_sequence_read(seq, &ctx->pages[0], lba,
603 			      _spdk_bs_byte_to_lba(bs, sizeof(struct spdk_blob_md_page)),
604 			      _spdk_blob_load_cpl, ctx);
605 }
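
/*
 * Loading is a chain of single-page reads: the root page lives at the page
 * index derived from the blobid, and _spdk_blob_load_cpl() issues one read
 * per page->next link until it sees SPDK_INVALID_MD_PAGE, then parses the
 * whole chain at once.
 */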
606 
607 struct spdk_blob_persist_ctx {
608 	struct spdk_blob 		*blob;
609 
610 	struct spdk_blob_md_page	*pages;
611 
612 	uint64_t			idx;
613 
614 	spdk_bs_sequence_cpl		cb_fn;
615 	void				*cb_arg;
616 };
617 
618 static void
619 _spdk_blob_persist_complete(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
620 {
621 	struct spdk_blob_persist_ctx 	*ctx = cb_arg;
622 	struct spdk_blob 		*blob = ctx->blob;
623 
624 	if (bserrno == 0) {
625 		_spdk_blob_mark_clean(blob);
626 	}
627 
628 	/* Call user callback */
629 	ctx->cb_fn(seq, ctx->cb_arg, bserrno);
630 
631 	/* Free the memory */
632 	spdk_free(ctx->pages);
633 	free(ctx);
634 }
635 
636 static void
637 _spdk_blob_persist_unmap_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
638 {
639 	struct spdk_blob_persist_ctx 	*ctx = cb_arg;
640 	struct spdk_blob 		*blob = ctx->blob;
641 	struct spdk_blob_store		*bs = blob->bs;
642 	void				*tmp;
643 	size_t				i;
644 
645 	/* Release all clusters that were truncated */
646 	for (i = blob->active.num_clusters; i < blob->active.cluster_array_size; i++) {
647 		uint32_t cluster_num = _spdk_bs_lba_to_cluster(bs, blob->active.clusters[i]);
648 
649 		_spdk_bs_release_cluster(bs, cluster_num);
650 	}
651 
652 	if (blob->active.num_clusters == 0) {
653 		free(blob->active.clusters);
654 		blob->active.clusters = NULL;
655 		blob->active.cluster_array_size = 0;
656 	} else {
657 		tmp = realloc(blob->active.clusters, sizeof(uint64_t) * blob->active.num_clusters);
658 		assert(tmp != NULL);
659 		blob->active.clusters = tmp;
660 		blob->active.cluster_array_size = blob->active.num_clusters;
661 	}
662 
663 	_spdk_blob_persist_complete(seq, ctx, bserrno);
664 }
665 
666 static void
667 _spdk_blob_persist_unmap_clusters(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
668 {
669 	struct spdk_blob_persist_ctx 	*ctx = cb_arg;
670 	struct spdk_blob 		*blob = ctx->blob;
671 	struct spdk_blob_store		*bs = blob->bs;
672 	spdk_bs_batch_t			*batch;
673 	size_t				i;
674 
675 	/* Clusters don't move around in blobs. The list shrinks or grows
676 	 * at the end, but no changes ever occur in the middle of the list.
677 	 */
678 
679 	batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_unmap_clusters_cpl, ctx);
680 
681 	/* Unmap all clusters that were truncated */
682 	for (i = blob->active.num_clusters; i < blob->active.cluster_array_size; i++) {
683 		uint64_t lba = blob->active.clusters[i];
684 		uint32_t lba_count = _spdk_bs_cluster_to_lba(bs, 1);
685 
686 		spdk_bs_batch_unmap(batch, lba, lba_count);
687 	}
688 
689 	spdk_bs_batch_close(batch);
690 }
691 
692 static void
693 _spdk_blob_persist_unmap_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
694 {
695 	struct spdk_blob_persist_ctx	*ctx = cb_arg;
696 	struct spdk_blob 		*blob = ctx->blob;
697 	struct spdk_blob_store		*bs = blob->bs;
698 	size_t				i;
699 
700 	/* This loop starts at 1 because the first page is special and handled
701 	 * below. The pages (except the first) are never written in place,
702 	 * so any pages in the clean list must be unmapped.
703 	 */
704 	for (i = 1; i < blob->clean.num_pages; i++) {
705 		spdk_bit_array_clear(bs->used_md_pages, blob->clean.pages[i]);
706 	}
707 
708 	if (blob->active.num_pages == 0) {
709 		uint32_t page_num;
710 
711 		page_num = _spdk_bs_blobid_to_page(blob->id);
712 		spdk_bit_array_clear(bs->used_md_pages, page_num);
713 	}
714 
715 	/* Move on to unmapping clusters */
716 	_spdk_blob_persist_unmap_clusters(seq, ctx, 0);
717 }
718 
719 static void
720 _spdk_blob_persist_unmap_pages(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
721 {
722 	struct spdk_blob_persist_ctx 	*ctx = cb_arg;
723 	struct spdk_blob 		*blob = ctx->blob;
724 	struct spdk_blob_store		*bs = blob->bs;
725 	uint64_t			lba;
726 	uint32_t			lba_count;
727 	spdk_bs_batch_t			*batch;
728 	size_t				i;
729 
730 	batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_unmap_pages_cpl, ctx);
731 
732 	lba_count = _spdk_bs_byte_to_lba(bs, sizeof(struct spdk_blob_md_page));
733 
734 	/* This loop starts at 1 because the first page is special and handled
735 	 * below. The pages (except the first) are never written in place,
736 	 * so any pages in the clean list must be unmapped.
737 	 */
738 	for (i = 1; i < blob->clean.num_pages; i++) {
739 		lba = _spdk_bs_page_to_lba(bs, bs->md_start + blob->clean.pages[i]);
740 
741 		spdk_bs_batch_unmap(batch, lba, lba_count);
742 	}
743 
744 	/* The first page will only be unmapped if this is a delete. */
745 	if (blob->active.num_pages == 0) {
746 		uint32_t page_num;
747 
748 		/* The first page in the metadata goes where the blobid indicates */
749 		page_num = _spdk_bs_blobid_to_page(blob->id);
750 		lba = _spdk_bs_page_to_lba(bs, bs->md_start + page_num);
751 
752 		spdk_bs_batch_unmap(batch, lba, lba_count);
753 	}
754 
755 	spdk_bs_batch_close(batch);
756 }
757 
758 static void
759 _spdk_blob_persist_write_page_root(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
760 {
761 	struct spdk_blob_persist_ctx	*ctx = cb_arg;
762 	struct spdk_blob		*blob = ctx->blob;
763 	struct spdk_blob_store		*bs = blob->bs;
764 	uint64_t			lba;
765 	uint32_t			lba_count;
766 	struct spdk_blob_md_page	*page;
767 
768 	if (blob->active.num_pages == 0) {
769 		/* Move on to the next step */
770 		_spdk_blob_persist_unmap_pages(seq, ctx, 0);
771 		return;
772 	}
773 
774 	lba_count = _spdk_bs_byte_to_lba(bs, sizeof(*page));
775 
776 	page = &ctx->pages[0];
777 	/* The first page in the metadata goes where the blobid indicates */
778 	lba = _spdk_bs_page_to_lba(bs, bs->md_start + _spdk_bs_blobid_to_page(blob->id));
779 
780 	spdk_bs_sequence_write(seq, page, lba, lba_count,
781 			       _spdk_blob_persist_unmap_pages, ctx);
782 }
783 
784 static void
785 _spdk_blob_persist_write_page_chain(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
786 {
787 	struct spdk_blob_persist_ctx 	*ctx = cb_arg;
788 	struct spdk_blob 		*blob = ctx->blob;
789 	struct spdk_blob_store		*bs = blob->bs;
790 	uint64_t 			lba;
791 	uint32_t			lba_count;
792 	struct spdk_blob_md_page	*page;
793 	spdk_bs_batch_t			*batch;
794 	size_t				i;
795 
796 	/* Non-root pages are always written to freshly claimed page locations,
797 	 *  never updated in place, so they can all be written in one batch.
798 	 */
799 
800 	lba_count = _spdk_bs_byte_to_lba(bs, sizeof(*page));
801 
802 	batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_write_page_root, ctx);
803 
804 	/* This starts at 1. The root page is not written until
805 	 * all of the others are finished
806 	 */
807 	for (i = 1; i < blob->active.num_pages; i++) {
808 		page = &ctx->pages[i];
809 		assert(page->sequence_num == i);
810 
811 		lba = _spdk_bs_page_to_lba(bs, bs->md_start + blob->active.pages[i]);
812 
813 		spdk_bs_batch_write(batch, page, lba, lba_count);
814 	}
815 
816 	spdk_bs_batch_close(batch);
817 }
818 
819 static int
820 _spdk_resize_blob(struct spdk_blob *blob, uint64_t sz)
821 {
822 	uint64_t	i;
823 	uint64_t	*tmp;
824 	uint64_t	lfc; /* lowest free cluster */
825 	struct spdk_blob_store *bs;
826 
827 	bs = blob->bs;
828 
829 	assert(blob->state != SPDK_BLOB_STATE_LOADING &&
830 	       blob->state != SPDK_BLOB_STATE_SYNCING);
831 
832 	if (blob->active.num_clusters == sz) {
833 		return 0;
834 	}
835 
836 	if (blob->active.num_clusters < blob->active.cluster_array_size) {
837 		/* If this blob was resized to be larger, then smaller, then
838 		 * larger without syncing, then the cluster array already
839 		 * contains spare assigned clusters we can use.
840 		 */
841 		blob->active.num_clusters = spdk_min(blob->active.cluster_array_size,
842 						     sz);
843 	}
844 
845 	blob->state = SPDK_BLOB_STATE_DIRTY;
846 
847 	/* Do two passes - one to verify that we can obtain enough clusters
848 	 * and another to actually claim them.
849 	 */
850 
851 	lfc = 0;
852 	for (i = blob->active.num_clusters; i < sz; i++) {
853 		lfc = spdk_bit_array_find_first_clear(bs->used_clusters, lfc);
854 		if (lfc >= bs->total_clusters) {
855 			/* No more free clusters. Cannot satisfy the request */
856 			/* Running out of free clusters is a normal runtime condition, not a bug */
857 			return -ENOSPC;
858 		}
859 		lfc++;
860 	}
861 
862 	if (sz > blob->active.num_clusters) {
863 		/* Expand the cluster array if necessary.
864 		 * We only shrink the array when persisting.
865 		 */
866 		tmp = realloc(blob->active.clusters, sizeof(uint64_t) * sz);
867 		if (sz > 0 && tmp == NULL) {
868 			/* Allocation failure is a runtime condition, not a bug */
869 			return -ENOMEM;
870 		}
871 		blob->active.clusters = tmp;
872 		blob->active.cluster_array_size = sz;
873 	}
874 
875 	lfc = 0;
876 	for (i = blob->active.num_clusters; i < sz; i++) {
877 		lfc = spdk_bit_array_find_first_clear(bs->used_clusters, lfc);
878 		SPDK_TRACELOG(SPDK_TRACE_BLOB, "Claiming cluster %lu for blob %lu\n", lfc, blob->id);
879 		_spdk_bs_claim_cluster(bs, lfc);
880 		blob->active.clusters[i] = _spdk_bs_cluster_to_lba(bs, lfc);
881 		lfc++;
882 	}
883 
884 	blob->active.num_clusters = sz;
885 
886 	return 0;
887 }
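
/*
 * Illustrative sketch of a grow operation (assumes an already-opened blob;
 * error handling elided):
 *
 *	if (spdk_bs_md_resize_blob(blob, 8) == 0) {
 *		// 8 clusters are now claimed in memory; nothing is written
 *		// to disk until the blob is persisted.
 *	}
 *
 * Note the two-pass structure above: clusters are claimed only after the
 * first pass proves enough free clusters exist, so a failed resize leaves
 * the allocator untouched.
 */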
888 
889 /* Write a blob to disk */
890 static void
891 _spdk_blob_persist(spdk_bs_sequence_t *seq, struct spdk_blob *blob,
892 		   spdk_bs_sequence_cpl cb_fn, void *cb_arg)
893 {
894 	struct spdk_blob_persist_ctx *ctx;
895 	int rc;
896 	uint64_t i;
897 	uint32_t page_num;
898 	struct spdk_blob_store *bs;
899 
900 	assert(blob != NULL);
901 	assert(blob->state == SPDK_BLOB_STATE_CLEAN ||
902 	       blob->state == SPDK_BLOB_STATE_DIRTY);
903 
904 	if (blob->state == SPDK_BLOB_STATE_CLEAN) {
905 		cb_fn(seq, cb_arg, 0);
906 		return;
907 	}
908 
909 	bs = blob->bs;
910 
911 	ctx = calloc(1, sizeof(*ctx));
912 	if (!ctx) {
913 		cb_fn(seq, cb_arg, -ENOMEM);
914 		return;
915 	}
916 	ctx->blob = blob;
917 	ctx->cb_fn = cb_fn;
918 	ctx->cb_arg = cb_arg;
919 
920 	blob->state = SPDK_BLOB_STATE_SYNCING;
921 
922 	if (blob->active.num_pages == 0) {
923 		/* This is the signal that the blob should be deleted.
924 		 * Immediately jump to the clean up routine. */
925 		assert(blob->clean.num_pages > 0);
926 		ctx->idx = blob->clean.num_pages - 1;
927 		_spdk_blob_persist_unmap_pages(seq, ctx, 0);
928 		return;
930 	}
931 
932 	/* Generate the new metadata */
933 	rc = _spdk_blob_serialize(blob, &ctx->pages, &blob->active.num_pages);
934 	if (rc < 0) {
935 		free(ctx);
936 		cb_fn(seq, cb_arg, rc);
937 		return;
938 	}
939 
940 	assert(blob->active.num_pages >= 1);
941 
942 	/* Resize the cache of page indices */
943 	blob->active.pages = realloc(blob->active.pages,
944 				     blob->active.num_pages * sizeof(*blob->active.pages));
945 	if (!blob->active.pages) {
946 		free(ctx);
947 		cb_fn(seq, cb_arg, -ENOMEM);
948 		return;
949 	}
950 
951 	/* Assign this metadata to pages. This requires two passes -
952 	 * one to verify that there are enough pages and a second
953 	 * to actually claim them. */
954 	page_num = 0;
955 	/* Note that this loop starts at one. The first page location is fixed by the blobid. */
956 	for (i = 1; i < blob->active.num_pages; i++) {
957 		page_num = spdk_bit_array_find_first_clear(bs->used_md_pages, page_num);
958 		if (page_num >= spdk_bit_array_capacity(bs->used_md_pages)) {
959 			spdk_free(ctx->pages);
960 			free(ctx);
961 			blob->state = SPDK_BLOB_STATE_DIRTY;
962 			cb_fn(seq, cb_arg, -ENOMEM);
963 			return;
964 		}
965 		page_num++;
966 	}
967 
968 	page_num = 0;
969 	blob->active.pages[0] = _spdk_bs_blobid_to_page(blob->id);
970 	for (i = 1; i < blob->active.num_pages; i++) {
971 		page_num = spdk_bit_array_find_first_clear(bs->used_md_pages, page_num);
972 		ctx->pages[i - 1].next = page_num;
973 		blob->active.pages[i] = page_num;
974 		spdk_bit_array_set(bs->used_md_pages, page_num);
975 		SPDK_TRACELOG(SPDK_TRACE_BLOB, "Claiming page %u for blob %lu\n", page_num, blob->id);
976 		page_num++;
977 	}
978 
979 	/* Start writing the metadata from last page to first */
980 	ctx->idx = blob->active.num_pages - 1;
981 	_spdk_blob_persist_write_page_chain(seq, ctx, 0);
982 }
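
/*
 * Persist ordering matters for crash consistency: the non-root pages of the
 * new chain are written first, the root page (whose location is fixed by the
 * blobid) is written last, and only then are the old pages and truncated
 * clusters unmapped. Until the root write lands, a reader still sees the
 * previous consistent metadata chain.
 */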
983 
984 static void
985 _spdk_blob_request_submit_rw(struct spdk_blob *blob, struct spdk_io_channel *_channel,
986 			     void *payload, uint64_t offset, uint64_t length,
987 			     spdk_blob_op_complete cb_fn, void *cb_arg, bool read)
988 {
989 	spdk_bs_batch_t			*batch;
990 	struct spdk_bs_cpl		cpl;
991 	uint64_t			lba;
992 	uint32_t			lba_count;
993 	uint8_t				*buf;
994 	uint64_t			page;
995 
996 	assert(blob != NULL);
997 
998 	if (offset + length > blob->active.num_clusters * blob->bs->pages_per_cluster) {
999 		cb_fn(cb_arg, -EINVAL);
1000 		return;
1001 	}
1002 
1003 	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
1004 	cpl.u.blob_basic.cb_fn = cb_fn;
1005 	cpl.u.blob_basic.cb_arg = cb_arg;
1006 
1007 	batch = spdk_bs_batch_open(_channel, &cpl);
1008 	if (!batch) {
1009 		cb_fn(cb_arg, -ENOMEM);
1010 		return;
1011 	}
1012 
1013 	length = _spdk_bs_page_to_lba(blob->bs, length);
1014 	page = offset;
1015 	buf = payload;
1016 	while (length > 0) {
1017 		lba = _spdk_bs_blob_page_to_lba(blob, page);
1018 		lba_count = spdk_min(length,
1019 				     _spdk_bs_page_to_lba(blob->bs,
1020 						     _spdk_bs_num_pages_to_cluster_boundary(blob, page)));
1021 
1022 		if (read) {
1023 			spdk_bs_batch_read(batch, buf, lba, lba_count);
1024 		} else {
1025 			spdk_bs_batch_write(batch, buf, lba, lba_count);
1026 		}
1027 
1028 		length -= lba_count;
1029 		buf += _spdk_bs_lba_to_byte(blob->bs, lba_count);
1030 		page += _spdk_bs_lba_to_page(blob->bs, lba_count);
1031 	}
1032 
1033 	spdk_bs_batch_close(batch);
1034 }
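
/*
 * I/O splitting example: clusters are virtually contiguous within a blob but
 * may be physically scattered, so a 3-page request that hits a cluster
 * boundary after 1 page is issued as two batched operations - one for that
 * page and one for the remaining two pages in the next cluster.
 */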
1035 
1036 static struct spdk_blob *
1037 _spdk_blob_lookup(struct spdk_blob_store *bs, spdk_blob_id blobid)
1038 {
1039 	struct spdk_blob *blob;
1040 
1041 	TAILQ_FOREACH(blob, &bs->blobs, link) {
1042 		if (blob->id == blobid) {
1043 			return blob;
1044 		}
1045 	}
1046 
1047 	return NULL;
1048 }
1049 
1050 static int
1051 _spdk_bs_channel_create(void *io_device, uint32_t priority, void *ctx_buf, void *unique_ctx)
1052 {
1053 	struct spdk_blob_store		*bs = io_device;
1054 	struct spdk_bs_dev		*dev = bs->dev;
1055 	struct spdk_bs_channel	*channel = ctx_buf;
1056 	uint32_t			max_ops = *(uint32_t *)unique_ctx;
1057 	uint32_t			i;
1058 
1059 	channel->req_mem = calloc(max_ops, sizeof(struct spdk_bs_request_set));
1060 	if (!channel->req_mem) {
1061 		/* Note: ctx_buf is allocated and freed by the io_channel framework - do not free it here */
1062 		return -1;
1063 	}
1064 
1065 	TAILQ_INIT(&channel->reqs);
1066 
1067 	for (i = 0; i < max_ops; i++) {
1068 		TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link);
1069 	}
1070 
1071 	channel->bs = bs;
1072 	channel->dev = dev;
1073 	channel->dev_channel = dev->create_channel(dev);
1074 
1075 	return 0;
1076 }
1077 
1078 static void
1079 _spdk_bs_channel_destroy(void *io_device, void *ctx_buf)
1080 {
1081 	struct spdk_bs_channel *channel = ctx_buf;
1082 
1083 	free(channel->req_mem);
1084 	channel->dev->destroy_channel(channel->dev, channel->dev_channel);
1085 }
1086 
1087 static void
1088 _spdk_bs_free(struct spdk_blob_store *bs)
1089 {
1090 	struct spdk_blob	*blob, *blob_tmp;
1091 
1092 	spdk_bs_unregister_md_thread(bs);
1093 	spdk_io_device_unregister(bs);
1094 
1095 	TAILQ_FOREACH_SAFE(blob, &bs->blobs, link, blob_tmp) {
1096 		TAILQ_REMOVE(&bs->blobs, blob, link);
1097 		_spdk_blob_free(blob);
1098 	}
1099 
1100 	spdk_bit_array_free(&bs->used_md_pages);
1101 	spdk_bit_array_free(&bs->used_clusters);
1102 
1103 	bs->dev->destroy(bs->dev);
1104 	free(bs);
1105 }
1106 
1107 void
1108 spdk_bs_opts_init(struct spdk_bs_opts *opts)
1109 {
1110 	opts->cluster_sz = SPDK_BLOB_OPTS_CLUSTER_SZ;
1111 	opts->num_md_pages = SPDK_BLOB_OPTS_NUM_MD_PAGES;
1112 	opts->max_md_ops = SPDK_BLOB_OPTS_MAX_MD_OPS;
1113 }
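
/*
 * Minimal initialization sketch (illustrative only - 'dev', 'init_done' and
 * 'init_ctx' are hypothetical caller-provided names):
 *
 *	struct spdk_bs_opts opts;
 *
 *	spdk_bs_opts_init(&opts);
 *	opts.cluster_sz = 2 * 1024 * 1024;	// override the default
 *	spdk_bs_init(dev, &opts, init_done, init_ctx);
 */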
1114 
1115 static struct spdk_blob_store *
1116 _spdk_bs_alloc(struct spdk_bs_dev *dev, struct spdk_bs_opts *opts)
1117 {
1118 	struct spdk_blob_store	*bs;
1119 
1120 	bs = calloc(1, sizeof(struct spdk_blob_store));
1121 	if (!bs) {
1122 		return NULL;
1123 	}
1124 
1125 	TAILQ_INIT(&bs->blobs);
1126 	bs->dev = dev;
1127 
1128 	/*
1129 	 * Do not use _spdk_bs_lba_to_cluster() here since blockcnt may not be an
1130 	 *  even multiple of the cluster size.
1131 	 */
1132 	bs->cluster_sz = opts->cluster_sz;
1133 	bs->total_clusters = dev->blockcnt / (bs->cluster_sz / dev->blocklen);
1134 	bs->pages_per_cluster = bs->cluster_sz / sizeof(struct spdk_blob_md_page);
1135 	bs->num_free_clusters = bs->total_clusters;
1136 	bs->used_clusters = spdk_bit_array_create(bs->total_clusters);
1137 	if (bs->used_clusters == NULL) {
1138 		_spdk_bs_free(bs);
1139 		return NULL;
1140 	}
1141 
1142 	bs->max_md_ops = opts->max_md_ops;
1143 	bs->super_blob = SPDK_BLOBID_INVALID;
1144 
1145 	/* The metadata is assumed to be at least 1 page */
1146 	bs->used_md_pages = spdk_bit_array_create(1);
	if (bs->used_md_pages == NULL) {
		_spdk_bs_free(bs);
		return NULL;
	}
1147 
1148 	spdk_io_device_register(bs, _spdk_bs_channel_create, _spdk_bs_channel_destroy,
1149 				sizeof(struct spdk_bs_channel));
1150 	spdk_bs_register_md_thread(bs);
1151 
1152 	return bs;
1153 }
1154 
1155 /* START spdk_bs_load */
1156 
1157 struct spdk_bs_load_ctx {
1158 	struct spdk_blob_store		*bs;
1159 	struct spdk_bs_super_block	*super;
1160 
1161 	struct spdk_bs_md_mask		*mask;
1162 };
1163 
1164 static void
1165 _spdk_bs_load_used_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1166 {
1167 	struct spdk_bs_load_ctx *ctx = cb_arg;
1168 	uint32_t		i, j;
1169 	int			rc;
1170 
1171 	/* The type must be correct */
1172 	assert(ctx->mask->type == SPDK_MD_MASK_TYPE_USED_CLUSTERS);
1173 	/* The length of the mask (in bits) must not be greater than the length of the buffer (converted to bits) */
1174 	assert(ctx->mask->length <= (ctx->super->used_cluster_mask_len * sizeof(
1175 					     struct spdk_blob_md_page) * 8));
1176 	/* The length of the mask must be exactly equal to the total number of clusters */
1177 	assert(ctx->mask->length == ctx->bs->total_clusters);
1178 
1179 	rc = spdk_bit_array_resize(&ctx->bs->used_clusters, ctx->bs->total_clusters);
1180 	if (rc < 0) {
1181 		spdk_free(ctx->super);
1182 		spdk_free(ctx->mask);
1183 		_spdk_bs_free(ctx->bs);
1184 		free(ctx);
1185 		spdk_bs_sequence_finish(seq, -ENOMEM);
1186 		return;
1187 	}
1188 
1189 	ctx->bs->num_free_clusters = ctx->bs->total_clusters;
1190 	for (i = 0; i < divide_round_up(ctx->mask->length, 8); i++) {	/* round up: length may not be a multiple of 8 */
1191 		uint8_t segment = ctx->mask->mask[i];
1192 		for (j = 0; segment && (j < 8); j++) {
1193 			if (segment & 1U) {
1194 				spdk_bit_array_set(ctx->bs->used_clusters, (i * 8) + j);
1195 				assert(ctx->bs->num_free_clusters > 0);
1196 				ctx->bs->num_free_clusters--;
1197 			}
1198 			segment >>= 1U;
1199 		}
1200 	}
1201 
1202 	spdk_free(ctx->super);
1203 	spdk_free(ctx->mask);
1204 	free(ctx);
1205 
1206 	spdk_bs_sequence_finish(seq, bserrno);
1207 }
1208 
1209 static void
1210 _spdk_bs_load_used_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1211 {
1212 	struct spdk_bs_load_ctx *ctx = cb_arg;
1213 	uint64_t		lba, lba_count;
1214 	uint32_t		i, j;
1215 	int			rc;
1216 
1217 	/* The type must be correct */
1218 	assert(ctx->mask->type == SPDK_MD_MASK_TYPE_USED_PAGES);
1219 	/* The length of the mask (in bits) must not be greater than the length of the buffer (converted to bits) */
1220 	assert(ctx->mask->length <= (ctx->super->used_page_mask_len * sizeof(struct spdk_blob_md_page) *
1221 				     8));
1222 	/* The length of the mask must be exactly equal to the size (in pages) of the metadata region */
1223 	assert(ctx->mask->length == ctx->super->md_len);
1224 
1225 	rc = spdk_bit_array_resize(&ctx->bs->used_md_pages, ctx->mask->length);
1226 	if (rc < 0) {
1227 		spdk_free(ctx->super);
1228 		spdk_free(ctx->mask);
1229 		_spdk_bs_free(ctx->bs);
1230 		free(ctx);
1231 		spdk_bs_sequence_finish(seq, -ENOMEM);
1232 		return;
1233 	}
1234 
1235 	for (i = 0; i < divide_round_up(ctx->mask->length, 8); i++) {	/* round up: length may not be a multiple of 8 */
1236 		uint8_t segment = ctx->mask->mask[i];
1237 		for (j = 0; segment && (j < 8); j++) {
1238 			if (segment & 1U) {
1239 				spdk_bit_array_set(ctx->bs->used_md_pages, (i * 8) + j);
1240 			}
1241 			segment >>= 1U;
1242 		}
1243 	}
1244 	spdk_free(ctx->mask);
1245 
1246 	/* Read the used clusters mask */
1247 	ctx->mask = spdk_zmalloc(ctx->super->used_cluster_mask_len * sizeof(struct spdk_blob_md_page),
1248 				 0x1000, NULL);
1249 	if (!ctx->mask) {
1250 		spdk_free(ctx->super);
1251 		_spdk_bs_free(ctx->bs);
1252 		free(ctx);
1253 		spdk_bs_sequence_finish(seq, -ENOMEM);
1254 		return;
1255 	}
1256 	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_start);
1257 	lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_len);
1258 	spdk_bs_sequence_read(seq, ctx->mask, lba, lba_count,
1259 			      _spdk_bs_load_used_clusters_cpl, ctx);
1260 }
1261 
1262 static void
1263 _spdk_bs_load_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1264 {
1265 	struct spdk_bs_load_ctx *ctx = cb_arg;
1266 	uint64_t		lba, lba_count;
1267 
1268 	if (ctx->super->version != SPDK_BS_VERSION) {
1269 		spdk_free(ctx->super);
1270 		_spdk_bs_free(ctx->bs);
1271 		free(ctx);
1272 		spdk_bs_sequence_finish(seq, -EILSEQ);
1273 		return;
1274 	}
1275 
1276 	if (memcmp(ctx->super->signature, SPDK_BS_SUPER_BLOCK_SIG,
1277 		   sizeof(ctx->super->signature)) != 0) {
1278 		spdk_free(ctx->super);
1279 		_spdk_bs_free(ctx->bs);
1280 		free(ctx);
1281 		spdk_bs_sequence_finish(seq, -EILSEQ);
1282 		return;
1283 	}
1284 
1285 	if (ctx->super->clean != 1) {
1286 		/* TODO: ONLY CLEAN SHUTDOWN IS CURRENTLY SUPPORTED.
1287 		 * All of the necessary data to recover is available
1288 		 * on disk - the code just has not been written yet.
1289 		 */
1290 		assert(false);
1291 		spdk_free(ctx->super);
1292 		_spdk_bs_free(ctx->bs);
1293 		free(ctx);
1294 		spdk_bs_sequence_finish(seq, -EILSEQ);
1295 		return;
1296 	}
1297 	ctx->super->clean = 0;
1298 
1299 	/* Parse the super block */
1300 	ctx->bs->cluster_sz = ctx->super->cluster_size;
1301 	ctx->bs->total_clusters = ctx->bs->dev->blockcnt / (ctx->bs->cluster_sz / ctx->bs->dev->blocklen);
1302 	ctx->bs->pages_per_cluster = ctx->bs->cluster_sz / sizeof(struct spdk_blob_md_page);
1303 	ctx->bs->md_start = ctx->super->md_start;
1304 	ctx->bs->md_len = ctx->super->md_len;
1305 
1306 	/* Read the used pages mask */
1307 	ctx->mask = spdk_zmalloc(ctx->super->used_page_mask_len * sizeof(struct spdk_blob_md_page), 0x1000,
1308 				 NULL);
1309 	if (!ctx->mask) {
1310 		spdk_free(ctx->super);
1311 		_spdk_bs_free(ctx->bs);
1312 		free(ctx);
1313 		spdk_bs_sequence_finish(seq, -ENOMEM);
1314 		return;
1315 	}
1316 	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_start);
1317 	lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_len);
1318 	spdk_bs_sequence_read(seq, ctx->mask, lba, lba_count,
1319 			      _spdk_bs_load_used_pages_cpl, ctx);
1320 }
1321 
1322 void
1323 spdk_bs_load(struct spdk_bs_dev *dev,
1324 	     spdk_bs_op_with_handle_complete cb_fn, void *cb_arg)
1325 {
1326 	struct spdk_blob_store	*bs;
1327 	struct spdk_bs_cpl	cpl;
1328 	spdk_bs_sequence_t	*seq;
1329 	struct spdk_bs_load_ctx *ctx;
1330 	struct spdk_bs_opts	opts = {};
1331 
1332 	SPDK_TRACELOG(SPDK_TRACE_BLOB, "Loading blobstore from dev %p\n", dev);
1333 
1334 	spdk_bs_opts_init(&opts);
1335 
1336 	bs = _spdk_bs_alloc(dev, &opts);
1337 	if (!bs) {
1338 		cb_fn(cb_arg, NULL, -ENOMEM);
1339 		return;
1340 	}
1341 
1342 	ctx = calloc(1, sizeof(*ctx));
1343 	if (!ctx) {
1344 		_spdk_bs_free(bs);
1345 		cb_fn(cb_arg, NULL, -ENOMEM);
1346 		return;
1347 	}
1348 
1349 	ctx->bs = bs;
1350 
1351 	/* Allocate memory for the super block */
1352 	ctx->super = spdk_zmalloc(sizeof(*ctx->super), 0x1000, NULL);
1353 	if (!ctx->super) {
1354 		free(ctx);
1355 		_spdk_bs_free(bs);
1356 		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
1357 	}
1358 
1359 	cpl.type = SPDK_BS_CPL_TYPE_BS_HANDLE;
1360 	cpl.u.bs_handle.cb_fn = cb_fn;
1361 	cpl.u.bs_handle.cb_arg = cb_arg;
1362 	cpl.u.bs_handle.bs = bs;
1363 
1364 	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
1365 	if (!seq) {
1366 		spdk_free(ctx->super);
1367 		free(ctx);
1368 		_spdk_bs_free(bs);
1369 		cb_fn(cb_arg, NULL, -ENOMEM);
1370 		return;
1371 	}
1372 
1373 	/* Read the super block */
1374 	spdk_bs_sequence_read(seq, ctx->super, _spdk_bs_page_to_lba(bs, 0),
1375 			      _spdk_bs_byte_to_lba(bs, sizeof(*ctx->super)),
1376 			      _spdk_bs_load_super_cpl, ctx);
1377 }
1378 
1379 /* END spdk_bs_load */
1380 
1381 /* START spdk_bs_init */
1382 
1383 struct spdk_bs_init_ctx {
1384 	struct spdk_blob_store		*bs;
1385 	struct spdk_bs_super_block	*super;
1386 };
1387 
1388 static void
1389 _spdk_bs_init_persist_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1390 {
1391 	struct spdk_bs_init_ctx *ctx = cb_arg;
1392 
1393 	spdk_free(ctx->super);
1394 	free(ctx);
1395 
1396 	spdk_bs_sequence_finish(seq, bserrno);
1397 }
1398 
1399 static void
1400 _spdk_bs_init_trim_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1401 {
1402 	struct spdk_bs_init_ctx *ctx = cb_arg;
1403 
1404 	/* Write super block */
1405 	spdk_bs_sequence_write(seq, ctx->super, _spdk_bs_page_to_lba(ctx->bs, 0),
1406 			       _spdk_bs_byte_to_lba(ctx->bs, sizeof(*ctx->super)),
1407 			       _spdk_bs_init_persist_super_cpl, ctx);
1408 }
1409 
1410 void
1411 spdk_bs_init(struct spdk_bs_dev *dev, struct spdk_bs_opts *o,
1412 	     spdk_bs_op_with_handle_complete cb_fn, void *cb_arg)
1413 {
1414 	struct spdk_bs_init_ctx *ctx;
1415 	struct spdk_blob_store	*bs;
1416 	struct spdk_bs_cpl	cpl;
1417 	spdk_bs_sequence_t	*seq;
1418 	uint64_t		num_md_pages;
1419 	uint32_t		i;
1420 	struct spdk_bs_opts	opts = {};
1421 	int			rc;
1422 
1423 	SPDK_TRACELOG(SPDK_TRACE_BLOB, "Initializing blobstore on dev %p\n", dev);
1424 
1425 	if (o) {
1426 		opts = *o;
1427 	} else {
1428 		spdk_bs_opts_init(&opts);
1429 	}
1430 
1431 	bs = _spdk_bs_alloc(dev, &opts);
1432 	if (!bs) {
1433 		cb_fn(cb_arg, NULL, -ENOMEM);
1434 		return;
1435 	}
1436 
1437 	if (opts.num_md_pages == UINT32_MAX) {
1438 		/* By default, allocate 1 page per cluster.
1439 		 * Technically, this over-allocates metadata
1440 		 * because more metadata will reduce the number
1441 		 * of usable clusters. This can be addressed with
1442 		 * more complex math in the future.
1443 		 */
1444 		bs->md_len = bs->total_clusters;
1445 	} else {
1446 		bs->md_len = opts.num_md_pages;
1447 	}
1448 
1449 	rc = spdk_bit_array_resize(&bs->used_md_pages, bs->md_len);
1450 	if (rc < 0) {
1451 		_spdk_bs_free(bs);
1452 		cb_fn(cb_arg, NULL, -ENOMEM);
1453 		return;
1454 	}
1455 
1456 	ctx = calloc(1, sizeof(*ctx));
1457 	if (!ctx) {
1458 		_spdk_bs_free(bs);
1459 		cb_fn(cb_arg, NULL, -ENOMEM);
1460 		return;
1461 	}
1462 
1463 	ctx->bs = bs;
1464 
1465 	/* Allocate memory for the super block */
1466 	ctx->super = spdk_zmalloc(sizeof(*ctx->super), 0x1000, NULL);
1467 	if (!ctx->super) {
1468 		free(ctx);
1469 		_spdk_bs_free(bs);
1470 		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
1471 	}
1472 	memcpy(ctx->super->signature, SPDK_BS_SUPER_BLOCK_SIG,
1473 	       sizeof(ctx->super->signature));
1474 	ctx->super->version = SPDK_BS_VERSION;
1475 	ctx->super->length = sizeof(*ctx->super);
1476 	ctx->super->super_blob = bs->super_blob;
1477 	ctx->super->clean = 0;
1478 	ctx->super->cluster_size = bs->cluster_sz;
1479 
1480 	/* Calculate how many pages the metadata consumes at the front
1481 	 * of the disk.
1482 	 */
1483 
1484 	/* The super block uses 1 page */
1485 	num_md_pages = 1;
1486 
1487 	/* The used_md_pages mask requires 1 bit per metadata page, rounded
1488 	 * up to the nearest page, plus a header.
1489 	 */
1490 	ctx->super->used_page_mask_start = num_md_pages;
1491 	ctx->super->used_page_mask_len = divide_round_up(sizeof(struct spdk_bs_md_mask) +
1492 					 divide_round_up(bs->md_len, 8),
1493 					 sizeof(struct spdk_blob_md_page));
1494 	num_md_pages += ctx->super->used_page_mask_len;
1495 
1496 	/* The used_clusters mask requires 1 bit per cluster, rounded
1497 	 * up to the nearest page, plus a header.
1498 	 */
1499 	ctx->super->used_cluster_mask_start = num_md_pages;
1500 	ctx->super->used_cluster_mask_len = divide_round_up(sizeof(struct spdk_bs_md_mask) +
1501 					    divide_round_up(bs->total_clusters, 8),
1502 					    sizeof(struct spdk_blob_md_page));
1503 	num_md_pages += ctx->super->used_cluster_mask_len;
1504 
1505 	/* The metadata region size was chosen above */
1506 	ctx->super->md_start = bs->md_start = num_md_pages;
1507 	ctx->super->md_len = bs->md_len;
1508 	num_md_pages += bs->md_len;
1509 
1510 	/* Claim all of the clusters used by the metadata */
1511 	for (i = 0; i < divide_round_up(num_md_pages, bs->pages_per_cluster); i++) {
1512 		_spdk_bs_claim_cluster(bs, i);
1513 	}
1514 
1515 	cpl.type = SPDK_BS_CPL_TYPE_BS_HANDLE;
1516 	cpl.u.bs_handle.cb_fn = cb_fn;
1517 	cpl.u.bs_handle.cb_arg = cb_arg;
1518 	cpl.u.bs_handle.bs = bs;
1519 
1520 	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
1521 	if (!seq) {
1522 		spdk_free(ctx->super);
1523 		free(ctx);
1524 		_spdk_bs_free(bs);
1525 		cb_fn(cb_arg, NULL, -ENOMEM);
1526 		return;
1527 	}
1528 
1529 	/* TRIM the entire device */
1530 	spdk_bs_sequence_unmap(seq, 0, bs->dev->blockcnt, _spdk_bs_init_trim_cpl, ctx);
1531 }
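
/*
 * Resulting on-disk layout, in pages from the start of the device:
 *
 *   page 0:     super block
 *   next:       used_page_mask     (used_page_mask_len pages)
 *   next:       used_cluster_mask  (used_cluster_mask_len pages)
 *   next:       metadata region    (md_len pages)
 *   remainder:  data clusters
 *
 * Every cluster overlapping this front region is claimed before the first
 * blob can be created.
 */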
1532 
1533 /* END spdk_bs_init */
1534 
1535 /* START spdk_bs_unload */
1536 
1537 struct spdk_bs_unload_ctx {
1538 	struct spdk_blob_store		*bs;
1539 	struct spdk_bs_super_block	*super;
1540 
1541 	struct spdk_bs_md_mask		*mask;
1542 };
1543 
1544 static void
1545 _spdk_bs_unload_write_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1546 {
1547 	struct spdk_bs_unload_ctx	*ctx = cb_arg;
1548 
1549 	spdk_free(ctx->super);
1550 
1551 	spdk_bs_sequence_finish(seq, bserrno);
1552 
1553 	_spdk_bs_free(ctx->bs);
1554 	free(ctx);
1555 }
1556 
1557 static void
1558 _spdk_bs_unload_write_used_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1559 {
1560 	struct spdk_bs_unload_ctx	*ctx = cb_arg;
1561 
1562 	spdk_free(ctx->mask);
1563 
1564 	/* Update the values in the super block */
1565 	ctx->super->super_blob = ctx->bs->super_blob;
1566 	ctx->super->clean = 1;
1567 
1568 	spdk_bs_sequence_write(seq, ctx->super, _spdk_bs_page_to_lba(ctx->bs, 0),
1569 			       _spdk_bs_byte_to_lba(ctx->bs, sizeof(*ctx->super)),
1570 			       _spdk_bs_unload_write_super_cpl, ctx);
1571 }
1572 
1573 static void
1574 _spdk_bs_unload_write_used_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1575 {
1576 	struct spdk_bs_unload_ctx	*ctx = cb_arg;
1577 	uint32_t			i;
1578 	uint64_t			lba, lba_count;
1579 
1580 	spdk_free(ctx->mask);
1581 
1582 	/* Write out the used clusters mask */
1583 	ctx->mask = spdk_zmalloc(ctx->super->used_cluster_mask_len * sizeof(struct spdk_blob_md_page),
1584 				 0x1000, NULL);
1585 	if (!ctx->mask) {
1586 		spdk_free(ctx->super);
1587 		free(ctx);
1588 		spdk_bs_sequence_finish(seq, -ENOMEM);
1589 		return;
1590 	}
1591 
1592 	ctx->mask->type = SPDK_MD_MASK_TYPE_USED_CLUSTERS;
1593 	ctx->mask->length = ctx->bs->total_clusters;
1594 	assert(ctx->mask->length == spdk_bit_array_capacity(ctx->bs->used_clusters));
1595 
1596 	i = 0;
1597 	while (true) {
1598 		i = spdk_bit_array_find_first_set(ctx->bs->used_clusters, i);
1599 		if (i >= ctx->mask->length) {
1600 			break;
1601 		}
1602 		ctx->mask->mask[i / 8] |= 1U << (i % 8);
1603 		i++;
1604 	}
1605 
1606 	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_start);
1607 	lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_len);
1608 	spdk_bs_sequence_write(seq, ctx->mask, lba, lba_count,
1609 			       _spdk_bs_unload_write_used_clusters_cpl, ctx);
1610 }
1611 
1612 static void
1613 _spdk_bs_unload_read_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1614 {
1615 	struct spdk_bs_unload_ctx	*ctx = cb_arg;
1616 	uint32_t			i;
1617 	uint64_t			lba, lba_count;
1618 
1619 	/* Write out the used page mask */
1620 	ctx->mask = spdk_zmalloc(ctx->super->used_page_mask_len * sizeof(struct spdk_blob_md_page),
1621 				 0x1000, NULL);
1622 	if (!ctx->mask) {
1623 		spdk_free(ctx->super);
1624 		free(ctx);
1625 		spdk_bs_sequence_finish(seq, -ENOMEM);
1626 		return;
1627 	}
1628 
1629 	ctx->mask->type = SPDK_MD_MASK_TYPE_USED_PAGES;
1630 	ctx->mask->length = ctx->super->md_len;
1631 	assert(ctx->mask->length == spdk_bit_array_capacity(ctx->bs->used_md_pages));
1632 
1633 	i = 0;
1634 	while (true) {
1635 		i = spdk_bit_array_find_first_set(ctx->bs->used_md_pages, i);
1636 		if (i >= ctx->mask->length) {
1637 			break;
1638 		}
1639 		ctx->mask->mask[i / 8] |= 1U << (i % 8);
1640 		i++;
1641 	}
1642 
1643 	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_start);
1644 	lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_len);
1645 	spdk_bs_sequence_write(seq, ctx->mask, lba, lba_count,
1646 			       _spdk_bs_unload_write_used_pages_cpl, ctx);
1647 }
1648 
1649 void
1650 spdk_bs_unload(struct spdk_blob_store *bs, spdk_bs_op_complete cb_fn, void *cb_arg)
1651 {
1652 	struct spdk_bs_cpl	cpl;
1653 	spdk_bs_sequence_t	*seq;
1654 	struct spdk_bs_unload_ctx *ctx;
1655 
1656 	SPDK_TRACELOG(SPDK_TRACE_BLOB, "Syncing blobstore\n");
1657 
1658 	ctx = calloc(1, sizeof(*ctx));
1659 	if (!ctx) {
1660 		cb_fn(cb_arg, -ENOMEM);
1661 		return;
1662 	}
1663 
1664 	ctx->bs = bs;
1665 
1666 	ctx->super = spdk_zmalloc(sizeof(*ctx->super), 0x1000, NULL);
1667 	if (!ctx->super) {
1668 		free(ctx);
1669 		cb_fn(cb_arg, -ENOMEM);
1670 		return;
1671 	}
1672 
1673 	cpl.type = SPDK_BS_CPL_TYPE_BS_BASIC;
1674 	cpl.u.bs_basic.cb_fn = cb_fn;
1675 	cpl.u.bs_basic.cb_arg = cb_arg;
1676 
1677 	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
1678 	if (!seq) {
1679 		spdk_free(ctx->super);
1680 		free(ctx);
1681 		cb_fn(cb_arg, -ENOMEM);
1682 		return;
1683 	}
1684 
1685 	assert(TAILQ_EMPTY(&bs->blobs));
1686 
1687 	/* Read super block */
1688 	spdk_bs_sequence_read(seq, ctx->super, _spdk_bs_page_to_lba(bs, 0),
1689 			      _spdk_bs_byte_to_lba(bs, sizeof(*ctx->super)),
1690 			      _spdk_bs_unload_read_super_cpl, ctx);
1691 }
1692 
1693 /* END spdk_bs_unload */
1694 
1695 void
1696 spdk_bs_set_super(struct spdk_blob_store *bs, spdk_blob_id blobid,
1697 		  spdk_bs_op_complete cb_fn, void *cb_arg)
1698 {
1699 	bs->super_blob = blobid;
1700 	cb_fn(cb_arg, 0);
1701 }
1702 
1703 void
1704 spdk_bs_get_super(struct spdk_blob_store *bs,
1705 		  spdk_blob_op_with_id_complete cb_fn, void *cb_arg)
1706 {
1707 	if (bs->super_blob == SPDK_BLOBID_INVALID) {
1708 		cb_fn(cb_arg, SPDK_BLOBID_INVALID, -ENOENT);
1709 	} else {
1710 		cb_fn(cb_arg, bs->super_blob, 0);
1711 	}
1712 }
1713 
1714 uint64_t
1715 spdk_bs_get_cluster_size(struct spdk_blob_store *bs)
1716 {
1717 	return bs->cluster_sz;
1718 }
1719 
1720 uint64_t
1721 spdk_bs_get_page_size(struct spdk_blob_store *bs)
1722 {
1723 	return sizeof(struct spdk_blob_md_page);
1724 }
1725 
1726 uint64_t
1727 spdk_bs_free_cluster_count(struct spdk_blob_store *bs)
1728 {
1729 	return bs->num_free_clusters;
1730 }
1731 
1732 int spdk_bs_register_md_thread(struct spdk_blob_store *bs)
1733 {
1734 	bs->md_channel = spdk_get_io_channel(bs, SPDK_IO_PRIORITY_DEFAULT, true,
1735 					     (void *)&bs->max_md_ops);
1736 
1737 	return 0;
1738 }
1739 
1740 int spdk_bs_unregister_md_thread(struct spdk_blob_store *bs)
1741 {
1742 	spdk_put_io_channel(bs->md_channel);
1743 
1744 	return 0;
1745 }
1746 
1747 spdk_blob_id spdk_blob_get_id(struct spdk_blob *blob)
1748 {
1749 	assert(blob != NULL);
1750 
1751 	return blob->id;
1752 }
1753 
1754 uint64_t spdk_blob_get_num_pages(struct spdk_blob *blob)
1755 {
1756 	assert(blob != NULL);
1757 
1758 	return _spdk_bs_cluster_to_page(blob->bs, blob->active.num_clusters);
1759 }
1760 
1761 uint64_t spdk_blob_get_num_clusters(struct spdk_blob *blob)
1762 {
1763 	assert(blob != NULL);
1764 
1765 	return blob->active.num_clusters;
1766 }
1767 
1768 /* START spdk_bs_md_create_blob */
1769 
1770 static void
1771 _spdk_bs_md_create_blob_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1772 {
1773 	struct spdk_blob *blob = cb_arg;
1774 
1775 	_spdk_blob_free(blob);
1776 
1777 	spdk_bs_sequence_finish(seq, bserrno);
1778 }
1779 
1780 void spdk_bs_md_create_blob(struct spdk_blob_store *bs,
1781 			    spdk_blob_op_with_id_complete cb_fn, void *cb_arg)
1782 {
1783 	struct spdk_blob	*blob;
1784 	uint32_t		page_idx;
1785 	struct spdk_bs_cpl 	cpl;
1786 	spdk_bs_sequence_t	*seq;
1787 	spdk_blob_id		id;
1788 
1789 	page_idx = spdk_bit_array_find_first_clear(bs->used_md_pages, 0);
1790 	if (page_idx >= spdk_bit_array_capacity(bs->used_md_pages)) {
1791 		cb_fn(cb_arg, 0, -ENOMEM);
1792 		return;
1793 	}
1794 	spdk_bit_array_set(bs->used_md_pages, page_idx);
1795 
1796 	/* The blob id is a 64 bit number. The lower 32 bits are the page_idx. The upper
1797 	 * 32 bits are not currently used. Stick a 1 there just to catch bugs where the
1798 	 * code assumes blob id == page_idx.
1799 	 */
1800 	id = (1ULL << 32) | page_idx;
1801 
1802 	SPDK_TRACELOG(SPDK_TRACE_BLOB, "Creating blob with id %lu at page %u\n", id, page_idx);
1803 
1804 	blob = _spdk_blob_alloc(bs, id);
1805 	if (!blob) {
1806 		cb_fn(cb_arg, 0, -ENOMEM);
1807 		return;
1808 	}
1809 
1810 	cpl.type = SPDK_BS_CPL_TYPE_BLOBID;
1811 	cpl.u.blobid.cb_fn = cb_fn;
1812 	cpl.u.blobid.cb_arg = cb_arg;
1813 	cpl.u.blobid.blobid = blob->id;
1814 
1815 	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
1816 	if (!seq) {
1817 		/* Release the claimed metadata page; the blob is still
		 *  SPDK_BLOB_STATE_DIRTY, so free its members directly rather
		 *  than through _spdk_blob_free(), which asserts a clean state.
		 */
		spdk_bit_array_clear(bs->used_md_pages, page_idx);
		free(blob->active.pages);
		free(blob);
1818 		cb_fn(cb_arg, 0, -ENOMEM);
1819 		return;
1820 	}
1821 
1822 	_spdk_blob_persist(seq, blob, _spdk_bs_md_create_blob_cpl, blob);
1823 }
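
/*
 * Blob id encoding example: page_idx 5 yields id 0x100000005. The low 32
 * bits always recover the metadata page via _spdk_bs_blobid_to_page(), while
 * the artificial high bit guards against code that conflates ids and page
 * indices.
 */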
1824 
1825 /* END spdk_bs_md_create_blob */
1826 
1827 /* START spdk_bs_md_resize_blob */
1828 int
1829 spdk_bs_md_resize_blob(struct spdk_blob *blob, uint64_t sz)
1830 {
1831 	int			rc;
1832 
1833 	assert(blob != NULL);
1834 
1835 	SPDK_TRACELOG(SPDK_TRACE_BLOB, "Resizing blob %lu to %lu clusters\n", blob->id, sz);
1836 
1837 	if (sz == blob->active.num_clusters) {
1838 		return 0;
1839 	}
1840 
1841 	rc = _spdk_resize_blob(blob, sz);
1842 	if (rc < 0) {
1843 		return rc;
1844 	}
1845 
1846 	return 0;
1847 }
1848 
1849 /* END spdk_bs_md_resize_blob */
1850 
1851 
1852 /* START spdk_bs_md_delete_blob */
1853 
1854 static void
1855 _spdk_bs_md_delete_blob_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1856 {
1857 	struct spdk_blob *blob = cb_arg;
1858 
1859 	_spdk_blob_free(blob);
1860 
1861 	spdk_bs_sequence_finish(seq, bserrno);
1862 }
1863 
1864 static void
1865 _spdk_bs_md_delete_open_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1866 {
1867 	struct spdk_blob *blob = cb_arg;
1868 
1869 	blob->state = SPDK_BLOB_STATE_DIRTY;
1870 	blob->active.num_pages = 0;
1871 	_spdk_resize_blob(blob, 0);
1872 
1873 	_spdk_blob_persist(seq, blob, _spdk_bs_md_delete_blob_cpl, blob);
1874 }
1875 
1876 void
1877 spdk_bs_md_delete_blob(struct spdk_blob_store *bs, spdk_blob_id blobid,
1878 		       spdk_blob_op_complete cb_fn, void *cb_arg)
1879 {
1880 	struct spdk_blob	*blob;
1881 	struct spdk_bs_cpl	cpl;
1882 	spdk_bs_sequence_t 	*seq;
1883 
1884 	SPDK_TRACELOG(SPDK_TRACE_BLOB, "Deleting blob %lu\n", blobid);
1885 
	blob = _spdk_blob_lookup(bs, blobid);
	if (blob) {
		/* A blob in the open list always holds at least one reference;
		 * deleting a blob while it is open is not allowed.
		 */
		assert(blob->open_ref > 0);
		cb_fn(cb_arg, -EINVAL);
		return;
	}
1892 
1893 	blob = _spdk_blob_alloc(bs, blobid);
1894 	if (!blob) {
1895 		cb_fn(cb_arg, -ENOMEM);
1896 		return;
1897 	}
1898 
1899 	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
1900 	cpl.u.blob_basic.cb_fn = cb_fn;
1901 	cpl.u.blob_basic.cb_arg = cb_arg;
1902 
1903 	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
1904 	if (!seq) {
1905 		cb_fn(cb_arg, -ENOMEM);
1906 		_spdk_blob_free(blob);
1907 		return;
1908 	}
1909 
1910 	_spdk_blob_load(seq, blob, _spdk_bs_md_delete_open_cpl, blob);
1911 }
1912 
1913 /* END spdk_bs_md_delete_blob */
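
/*
 * Usage sketch, hypothetical helpers: deletion takes a blob id rather than
 * an open handle, and fails with -EINVAL if the blob is currently open, so
 * close the blob first.
 */
#if 0
static void
example_delete_done(void *cb_arg, int bserrno)
{
	/* On success the blob's metadata pages and clusters are released. */
}

static void
example_delete(struct spdk_blob_store *bs, spdk_blob_id blobid)
{
	spdk_bs_md_delete_blob(bs, blobid, example_delete_done, NULL);
}
#endif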
1914 
1915 /* START spdk_bs_md_open_blob */
1916 
1917 static void
1918 _spdk_bs_md_open_blob_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1919 {
1920 	struct spdk_blob *blob = cb_arg;
1921 
1922 	blob->open_ref++;
1923 
1924 	TAILQ_INSERT_HEAD(&blob->bs->blobs, blob, link);
1925 
1926 	spdk_bs_sequence_finish(seq, bserrno);
1927 }
1928 
1929 void spdk_bs_md_open_blob(struct spdk_blob_store *bs, spdk_blob_id blobid,
1930 			  spdk_blob_op_with_handle_complete cb_fn, void *cb_arg)
1931 {
1932 	struct spdk_blob		*blob;
1933 	struct spdk_bs_cpl		cpl;
1934 	spdk_bs_sequence_t		*seq;
1935 	uint32_t			page_num;
1936 
1937 	SPDK_TRACELOG(SPDK_TRACE_BLOB, "Opening blob %lu\n", blobid);
1938 
1939 	blob = _spdk_blob_lookup(bs, blobid);
1940 	if (blob) {
1941 		blob->open_ref++;
1942 		cb_fn(cb_arg, blob, 0);
1943 		return;
1944 	}
1945 
1946 	page_num = _spdk_bs_blobid_to_page(blobid);
1947 	if (spdk_bit_array_get(bs->used_md_pages, page_num) == false) {
1948 		/* Invalid blobid */
1949 		cb_fn(cb_arg, NULL, -ENOENT);
1950 		return;
1951 	}
1952 
1953 	blob = _spdk_blob_alloc(bs, blobid);
1954 	if (!blob) {
1955 		cb_fn(cb_arg, NULL, -ENOMEM);
1956 		return;
1957 	}
1958 
1959 	cpl.type = SPDK_BS_CPL_TYPE_BLOB_HANDLE;
1960 	cpl.u.blob_handle.cb_fn = cb_fn;
1961 	cpl.u.blob_handle.cb_arg = cb_arg;
1962 	cpl.u.blob_handle.blob = blob;
1963 
	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
	if (!seq) {
		/* Free the blob allocated above so this error path does not leak it. */
		_spdk_blob_free(blob);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}
1969 
1970 	_spdk_blob_load(seq, blob, _spdk_bs_md_open_blob_cpl, blob);
1971 }

/* END spdk_bs_md_open_blob */
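
/*
 * Usage sketch, hypothetical helpers: opening an already-open blob just
 * bumps its reference count and completes immediately; otherwise the blob's
 * metadata is read from disk before the handle is returned.
 */
#if 0
static void
example_open_done(void *cb_arg, struct spdk_blob *blob, int bserrno)
{
	if (bserrno != 0) {
		/* -ENOENT: the id does not map to a used metadata page. */
		return;
	}
	/* blob stays valid until a matching spdk_bs_md_close_blob(). */
}

static void
example_open(struct spdk_blob_store *bs, spdk_blob_id blobid)
{
	spdk_bs_md_open_blob(bs, blobid, example_open_done, NULL);
}
#endif
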
1973 /* START spdk_bs_md_sync_blob */
1974 static void
1975 _spdk_blob_sync_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1976 {
1977 	spdk_bs_sequence_finish(seq, bserrno);
1978 }
1979 
1980 void spdk_bs_md_sync_blob(struct spdk_blob *blob,
1981 			  spdk_blob_op_complete cb_fn, void *cb_arg)
1982 {
1983 	struct spdk_bs_cpl	cpl;
1984 	spdk_bs_sequence_t	*seq;
1985 
1986 	assert(blob != NULL);
1987 
1988 	SPDK_TRACELOG(SPDK_TRACE_BLOB, "Syncing blob %lu\n", blob->id);
1989 
1990 	assert(blob->state != SPDK_BLOB_STATE_LOADING &&
1991 	       blob->state != SPDK_BLOB_STATE_SYNCING);
1992 
1993 	if (blob->state == SPDK_BLOB_STATE_CLEAN) {
1994 		cb_fn(cb_arg, 0);
1995 		return;
1996 	}
1997 
1998 	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
1999 	cpl.u.blob_basic.cb_fn = cb_fn;
2000 	cpl.u.blob_basic.cb_arg = cb_arg;
2001 
2002 	seq = spdk_bs_sequence_start(blob->bs->md_channel, &cpl);
2003 	if (!seq) {
2004 		cb_fn(cb_arg, -ENOMEM);
2005 		return;
2006 	}
2007 
2008 	_spdk_blob_persist(seq, blob, _spdk_blob_sync_cpl, blob);
2009 }
2010 
2011 /* END spdk_bs_md_sync_blob */
2012 
2013 /* START spdk_bs_md_close_blob */
2014 
2015 static void
2016 _spdk_blob_close_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2017 {
2018 	struct spdk_blob **blob = cb_arg;
2019 
2020 	if ((*blob)->open_ref == 0) {
2021 		TAILQ_REMOVE(&(*blob)->bs->blobs, (*blob), link);
2022 		_spdk_blob_free((*blob));
2023 	}
2024 
2025 	*blob = NULL;
2026 
2027 	spdk_bs_sequence_finish(seq, bserrno);
2028 }
2029 
2030 void spdk_bs_md_close_blob(struct spdk_blob **b,
2031 			   spdk_blob_op_complete cb_fn, void *cb_arg)
2032 {
2033 	struct spdk_bs_cpl	cpl;
2034 	struct spdk_blob	*blob;
2035 	spdk_bs_sequence_t	*seq;
2036 
2037 	assert(b != NULL);
2038 	blob = *b;
2039 	assert(blob != NULL);
2040 
2041 	SPDK_TRACELOG(SPDK_TRACE_BLOB, "Closing blob %lu\n", blob->id);
2042 
2043 	assert(blob->state != SPDK_BLOB_STATE_LOADING &&
2044 	       blob->state != SPDK_BLOB_STATE_SYNCING);
2045 
2046 	if (blob->open_ref == 0) {
2047 		cb_fn(cb_arg, -EBADF);
2048 		return;
2049 	}
2050 
2051 	blob->open_ref--;
2052 
2053 	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
2054 	cpl.u.blob_basic.cb_fn = cb_fn;
2055 	cpl.u.blob_basic.cb_arg = cb_arg;
2056 
2057 	seq = spdk_bs_sequence_start(blob->bs->md_channel, &cpl);
2058 	if (!seq) {
2059 		cb_fn(cb_arg, -ENOMEM);
2060 		return;
2061 	}
2062 
2063 	if (blob->state == SPDK_BLOB_STATE_CLEAN) {
2064 		_spdk_blob_close_cpl(seq, b, 0);
2065 		return;
2066 	}
2067 
2068 	/* Sync metadata */
2069 	_spdk_blob_persist(seq, blob, _spdk_blob_close_cpl, b);
2070 }
2071 
2072 /* END spdk_bs_md_close_blob */
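
/*
 * Usage sketch, hypothetical helpers: close takes a pointer to the handle
 * because the completion path writes NULL through it, so the pointer must
 * stay valid until the callback runs. Dirty metadata is persisted before
 * the last reference is freed.
 */
#if 0
static struct spdk_blob *g_blob;

static void
example_close_done(void *cb_arg, int bserrno)
{
	/* g_blob has been set to NULL by the completion path. */
}

static void
example_close(void)
{
	spdk_bs_md_close_blob(&g_blob, example_close_done, NULL);
}
#endif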
2073 
2074 struct spdk_io_channel *spdk_bs_alloc_io_channel(struct spdk_blob_store *bs,
2075 		uint32_t priority, uint32_t max_ops)
2076 {
2077 	return spdk_get_io_channel(bs, priority, true, (void *)&max_ops);
2078 }
2079 
2080 void spdk_bs_free_io_channel(struct spdk_io_channel *channel)
2081 {
2082 	spdk_put_io_channel(channel);
2083 }
2084 
2085 void spdk_bs_io_flush_channel(struct spdk_io_channel *channel,
2086 			      spdk_blob_op_complete cb_fn, void *cb_arg)
2087 {
2088 	/* Flush is synchronous right now */
2089 	cb_fn(cb_arg, 0);
2090 }
2091 
2092 void spdk_bs_io_write_blob(struct spdk_blob *blob, struct spdk_io_channel *channel,
2093 			   void *payload, uint64_t offset, uint64_t length,
2094 			   spdk_blob_op_complete cb_fn, void *cb_arg)
2095 {
2096 	_spdk_blob_request_submit_rw(blob, channel, payload, offset, length, cb_fn, cb_arg, false);
2097 }
2098 
2099 void spdk_bs_io_read_blob(struct spdk_blob *blob, struct spdk_io_channel *channel,
2100 			  void *payload, uint64_t offset, uint64_t length,
2101 			  spdk_blob_op_complete cb_fn, void *cb_arg)
2102 {
2103 	_spdk_blob_request_submit_rw(blob, channel, payload, offset, length, cb_fn, cb_arg, true);
2104 }
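
/*
 * Usage sketch, hypothetical helpers: data-path I/O uses a per-thread
 * channel from spdk_bs_alloc_io_channel() rather than the metadata channel.
 * The offset and length are assumed here to be page-granular, with the
 * payload buffer sized to match.
 */
#if 0
static void
example_io_done(void *cb_arg, int bserrno)
{
	/* I/O completed; the payload buffer may be reused. */
}

static void
example_write_page(struct spdk_blob_store *bs, struct spdk_blob *blob,
		   void *payload)
{
	struct spdk_io_channel *channel;

	channel = spdk_bs_alloc_io_channel(bs, SPDK_IO_PRIORITY_DEFAULT, 512);
	if (!channel) {
		return;
	}

	/* Write one page at offset 0; reads are symmetric. */
	spdk_bs_io_write_blob(blob, channel, payload, 0, 1,
			      example_io_done, NULL);

	/* In real code, free the channel only after all I/O completes. */
	spdk_bs_free_io_channel(channel);
}
#endif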
2105 
2106 struct spdk_bs_iter_ctx {
2107 	int64_t page_num;
2108 	struct spdk_blob_store *bs;
2109 
2110 	spdk_blob_op_with_handle_complete cb_fn;
2111 	void *cb_arg;
2112 };
2113 
2114 static void
2115 _spdk_bs_iter_cpl(void *cb_arg, struct spdk_blob *blob, int bserrno)
2116 {
2117 	struct spdk_bs_iter_ctx *ctx = cb_arg;
2118 	struct spdk_blob_store *bs = ctx->bs;
2119 	spdk_blob_id id;
2120 
2121 	if (bserrno == 0) {
2122 		ctx->cb_fn(ctx->cb_arg, blob, bserrno);
2123 		free(ctx);
2124 		return;
2125 	}
2126 
2127 	ctx->page_num++;
2128 	ctx->page_num = spdk_bit_array_find_first_set(bs->used_md_pages, ctx->page_num);
2129 	if (ctx->page_num >= spdk_bit_array_capacity(bs->used_md_pages)) {
2130 		ctx->cb_fn(ctx->cb_arg, NULL, -ENOENT);
2131 		free(ctx);
2132 		return;
2133 	}
2134 
	/* Rebuild the blob id from the page index, using the same encoding
	 * as spdk_bs_md_create_blob().
	 */
	id = (1ULL << 32) | ctx->page_num;
2136 
2137 	blob = _spdk_blob_lookup(bs, id);
2138 	if (blob) {
2139 		blob->open_ref++;
2140 		ctx->cb_fn(ctx->cb_arg, blob, 0);
2141 		free(ctx);
2142 		return;
2143 	}
2144 
2145 	spdk_bs_md_open_blob(bs, id, _spdk_bs_iter_cpl, ctx);
2146 }
2147 
2148 void
2149 spdk_bs_md_iter_first(struct spdk_blob_store *bs,
2150 		      spdk_blob_op_with_handle_complete cb_fn, void *cb_arg)
2151 {
2152 	struct spdk_bs_iter_ctx *ctx;
2153 
2154 	ctx = calloc(1, sizeof(*ctx));
2155 	if (!ctx) {
2156 		cb_fn(cb_arg, NULL, -ENOMEM);
2157 		return;
2158 	}
2159 
	ctx->page_num = -1;
	ctx->bs = bs;
	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;

	/* Kick off the walk by invoking the completion with a nonzero bserrno,
	 * which makes it advance to the first used metadata page.
	 */
	_spdk_bs_iter_cpl(ctx, NULL, -1);
2166 }
2167 
2168 static void
2169 _spdk_bs_iter_close_cpl(void *cb_arg, int bserrno)
2170 {
2171 	struct spdk_bs_iter_ctx *ctx = cb_arg;
2172 
2173 	_spdk_bs_iter_cpl(ctx, NULL, -1);
2174 }
2175 
2176 void
2177 spdk_bs_md_iter_next(struct spdk_blob_store *bs, struct spdk_blob **b,
2178 		     spdk_blob_op_with_handle_complete cb_fn, void *cb_arg)
2179 {
2180 	struct spdk_bs_iter_ctx *ctx;
2181 	struct spdk_blob	*blob;
2182 
2183 	assert(b != NULL);
2184 	blob = *b;
2185 	assert(blob != NULL);
2186 
2187 	ctx = calloc(1, sizeof(*ctx));
2188 	if (!ctx) {
2189 		cb_fn(cb_arg, NULL, -ENOMEM);
2190 		return;
2191 	}
2192 
2193 	ctx->page_num = _spdk_bs_blobid_to_page(blob->id);
2194 	ctx->bs = bs;
2195 	ctx->cb_fn = cb_fn;
2196 	ctx->cb_arg = cb_arg;
2197 
2198 	/* Close the existing blob */
2199 	spdk_bs_md_close_blob(b, _spdk_bs_iter_close_cpl, ctx);
2200 }
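
/*
 * Usage sketch, hypothetical helpers: the iterator walks the used metadata
 * pages in ascending order, opening one blob at a time. Each visited blob
 * is handed back to spdk_bs_md_iter_next(), which closes it before opening
 * the next; the walk ends with -ENOENT.
 */
#if 0
static struct spdk_blob *g_iter_blob;

static void
example_iter_cb(void *cb_arg, struct spdk_blob *blob, int bserrno)
{
	struct spdk_blob_store *bs = cb_arg;

	if (bserrno != 0) {
		/* -ENOENT marks the end of the walk. */
		return;
	}

	/* ... inspect blob ... */

	/* The handle pointer must outlive the (asynchronous) close that
	 * iter_next performs, so keep it out of this stack frame.
	 */
	g_iter_blob = blob;
	spdk_bs_md_iter_next(bs, &g_iter_blob, example_iter_cb, bs);
}

static void
example_iterate(struct spdk_blob_store *bs)
{
	spdk_bs_md_iter_first(bs, example_iter_cb, bs);
}
#endif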
2201 
2202 int
2203 spdk_blob_md_set_xattr(struct spdk_blob *blob, const char *name, const void *value,
2204 		       uint16_t value_len)
2205 {
2206 	struct spdk_xattr 	*xattr;
2207 
2208 	assert(blob != NULL);
2209 
2210 	assert(blob->state != SPDK_BLOB_STATE_LOADING &&
2211 	       blob->state != SPDK_BLOB_STATE_SYNCING);
2212 
	TAILQ_FOREACH(xattr, &blob->xattrs, link) {
		if (!strcmp(name, xattr->name)) {
			void *tmp;

			/* Allocate the replacement value before freeing the old
			 * one, so the existing value survives if the allocation
			 * fails.
			 */
			tmp = malloc(value_len);
			if (!tmp) {
				return -ENOMEM;
			}
			memcpy(tmp, value, value_len);

			free(xattr->value);
			xattr->value = tmp;
			xattr->value_len = value_len;

			blob->state = SPDK_BLOB_STATE_DIRTY;

			return 0;
		}
	}

	xattr = calloc(1, sizeof(*xattr));
	if (!xattr) {
		return -ENOMEM;
	}
	xattr->name = strdup(name);
	if (!xattr->name) {
		free(xattr);
		return -ENOMEM;
	}
	xattr->value_len = value_len;
	xattr->value = malloc(value_len);
	if (!xattr->value) {
		free(xattr->name);
		free(xattr);
		return -ENOMEM;
	}
	memcpy(xattr->value, value, value_len);
	TAILQ_INSERT_TAIL(&blob->xattrs, xattr, link);

	blob->state = SPDK_BLOB_STATE_DIRTY;

	return 0;
2239 }
2240 
2241 int
2242 spdk_blob_md_remove_xattr(struct spdk_blob *blob, const char *name)
2243 {
2244 	struct spdk_xattr	*xattr;
2245 
2246 	assert(blob != NULL);
2247 
2248 	assert(blob->state != SPDK_BLOB_STATE_LOADING &&
2249 	       blob->state != SPDK_BLOB_STATE_SYNCING);
2250 
2251 	TAILQ_FOREACH(xattr, &blob->xattrs, link) {
2252 		if (!strcmp(name, xattr->name)) {
2253 			TAILQ_REMOVE(&blob->xattrs, xattr, link);
2254 			free(xattr->value);
2255 			free(xattr->name);
2256 			free(xattr);
2257 
2258 			blob->state = SPDK_BLOB_STATE_DIRTY;
2259 
2260 			return 0;
2261 		}
2262 	}
2263 
2264 	return -ENOENT;
2265 }
2266 
2267 int
2268 spdk_bs_md_get_xattr_value(struct spdk_blob *blob, const char *name,
2269 			   const void **value, size_t *value_len)
2270 {
2271 	struct spdk_xattr	*xattr;
2272 
2273 	TAILQ_FOREACH(xattr, &blob->xattrs, link) {
2274 		if (!strcmp(name, xattr->name)) {
2275 			*value = xattr->value;
2276 			*value_len = xattr->value_len;
2277 			return 0;
2278 		}
2279 	}
2280 
2281 	return -ENOENT;
2282 }
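
/*
 * Usage sketch, hypothetical helper: xattrs are manipulated entirely in
 * memory and marked dirty; they become durable on the next metadata sync.
 * The pointer returned by the getter aliases the blob's internal copy, so
 * it is only valid until that xattr is changed or removed.
 */
#if 0
static void
example_xattrs(struct spdk_blob *blob)
{
	const void *value;
	size_t value_len;

	spdk_blob_md_set_xattr(blob, "name", "hello", 6);

	if (spdk_bs_md_get_xattr_value(blob, "name", &value, &value_len) == 0) {
		/* value points at the stored 6-byte copy of "hello". */
	}

	spdk_blob_md_remove_xattr(blob, "name");

	/* Persist the changes with spdk_bs_md_sync_blob() when done. */
}
#endif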
2283 
2284 struct spdk_xattr_names {
2285 	uint32_t	count;
2286 	const char	*names[0];
2287 };
2288 
2289 int
2290 spdk_bs_md_get_xattr_names(struct spdk_blob *blob,
2291 			   struct spdk_xattr_names **names)
2292 {
2293 	struct spdk_xattr	*xattr;
2294 	int			count = 0;
2295 
2296 	TAILQ_FOREACH(xattr, &blob->xattrs, link) {
2297 		count++;
2298 	}
2299 
2300 	*names = calloc(1, sizeof(struct spdk_xattr_names) + count * sizeof(char *));
2301 	if (*names == NULL) {
2302 		return -ENOMEM;
2303 	}
2304 
2305 	TAILQ_FOREACH(xattr, &blob->xattrs, link) {
2306 		(*names)->names[(*names)->count++] = xattr->name;
2307 	}
2308 
2309 	return 0;
2310 }
2311 
2312 uint32_t
2313 spdk_xattr_names_get_count(struct spdk_xattr_names *names)
2314 {
2315 	assert(names != NULL);
2316 
2317 	return names->count;
2318 }
2319 
2320 const char *
2321 spdk_xattr_names_get_name(struct spdk_xattr_names *names, uint32_t index)
2322 {
2323 	if (index >= names->count) {
2324 		return NULL;
2325 	}
2326 
2327 	return names->names[index];
2328 }
2329 
2330 void
2331 spdk_xattr_names_free(struct spdk_xattr_names *names)
2332 {
2333 	free(names);
2334 }
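
/*
 * Usage sketch, hypothetical helper: the names container holds pointers
 * into the blob's live xattr list, not copies, so free it (and stop using
 * the names) before modifying the blob's xattrs.
 */
#if 0
static void
example_list_xattr_names(struct spdk_blob *blob)
{
	struct spdk_xattr_names *names;
	uint32_t i;

	if (spdk_bs_md_get_xattr_names(blob, &names) != 0) {
		return;
	}

	for (i = 0; i < spdk_xattr_names_get_count(names); i++) {
		SPDK_TRACELOG(SPDK_TRACE_BLOB, "xattr: %s\n",
			      spdk_xattr_names_get_name(names, i));
	}

	spdk_xattr_names_free(names);
}
#endif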
2335 
2336 SPDK_LOG_REGISTER_TRACE_FLAG("blob", SPDK_TRACE_BLOB);
2337