xref: /spdk/lib/blob/blobstore.c (revision 1edd9bf3e467eb7a9591aee2216eccdfb8cb4dfa)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include <stdbool.h>
35 #include <assert.h>
36 #include <errno.h>
37 #include <limits.h>
38 #include <stdlib.h>
39 #include <string.h>
40 
41 #include "spdk/blob.h"
42 #include "spdk/env.h"
43 #include "spdk/queue.h"
44 #include "spdk/io_channel.h"
45 #include "spdk/bit_array.h"
46 
47 #include "spdk_internal/log.h"
48 
49 #include "blobstore.h"
50 #include "request.h"
51 
52 static inline size_t
53 divide_round_up(size_t num, size_t divisor)
54 {
55 	return (num + divisor - 1) / divisor;
56 }
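/* For example, divide_round_up(10, 4) == 3: ten units need three
 * four-unit blocks, with the last block only partially filled.
 */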
57 
58 static void
59 _spdk_bs_claim_cluster(struct spdk_blob_store *bs, uint32_t cluster_num)
60 {
61 	assert(cluster_num < spdk_bit_array_capacity(bs->used_clusters));
62 	assert(spdk_bit_array_get(bs->used_clusters, cluster_num) == false);
63 	assert(bs->num_free_clusters > 0);
64 
65 	SPDK_TRACELOG(SPDK_TRACE_BLOB, "Claiming cluster %u\n", cluster_num);
66 
67 	spdk_bit_array_set(bs->used_clusters, cluster_num);
68 	bs->num_free_clusters--;
69 }
70 
71 static void
72 _spdk_bs_release_cluster(struct spdk_blob_store *bs, uint32_t cluster_num)
73 {
74 	assert(cluster_num < spdk_bit_array_capacity(bs->used_clusters));
75 	assert(spdk_bit_array_get(bs->used_clusters, cluster_num) == true);
76 	assert(bs->num_free_clusters < bs->total_clusters);
77 
78 	SPDK_TRACELOG(SPDK_TRACE_BLOB, "Releasing cluster %u\n", cluster_num);
79 
80 	spdk_bit_array_clear(bs->used_clusters, cluster_num);
81 	bs->num_free_clusters++;
82 }
83 
84 static struct spdk_blob *
85 _spdk_blob_alloc(struct spdk_blob_store *bs, spdk_blob_id id)
86 {
87 	struct spdk_blob *blob;
88 
89 	blob = calloc(1, sizeof(*blob));
90 	if (!blob) {
91 		return NULL;
92 	}
93 
94 	blob->id = id;
95 	blob->bs = bs;
96 
97 	blob->state = SPDK_BLOB_STATE_DIRTY;
98 	blob->active.num_pages = 1;
99 	blob->active.pages = calloc(1, sizeof(*blob->active.pages));
100 	if (!blob->active.pages) {
101 		free(blob);
102 		return NULL;
103 	}
104 
105 	blob->active.pages[0] = _spdk_bs_blobid_to_page(id);
106 
107 	TAILQ_INIT(&blob->xattrs);
108 
109 	return blob;
110 }
111 
112 static void
113 _spdk_blob_free(struct spdk_blob *blob)
114 {
115 	struct spdk_xattr 	*xattr, *xattr_tmp;
116 
117 	assert(blob != NULL);
118 	assert(blob->state == SPDK_BLOB_STATE_CLEAN);
119 
120 	free(blob->active.clusters);
121 	free(blob->clean.clusters);
122 	free(blob->active.pages);
123 	free(blob->clean.pages);
124 
125 	TAILQ_FOREACH_SAFE(xattr, &blob->xattrs, link, xattr_tmp) {
126 		TAILQ_REMOVE(&blob->xattrs, xattr, link);
127 		free(xattr->name);
128 		free(xattr->value);
129 		free(xattr);
130 	}
131 
132 	free(blob);
133 }
134 
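/* Snapshot the 'active' cluster and page arrays into 'clean' and install
 * freshly allocated copies as 'active', so later modifications cannot
 * disturb the last-synced state. Returns -1 if the copies cannot be
 * allocated.
 */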
135 static int
136 _spdk_blob_mark_clean(struct spdk_blob *blob)
137 {
138 	uint64_t *clusters = NULL;
139 	uint32_t *pages = NULL;
140 
141 	assert(blob != NULL);
142 	assert(blob->state == SPDK_BLOB_STATE_LOADING ||
143 	       blob->state == SPDK_BLOB_STATE_SYNCING);
144 
145 	if (blob->active.num_clusters) {
146 		assert(blob->active.clusters);
147 		clusters = calloc(blob->active.num_clusters, sizeof(*blob->active.clusters));
148 		if (!clusters) {
149 			return -1;
150 		}
151 		memcpy(clusters, blob->active.clusters, blob->active.num_clusters * sizeof(*clusters));
152 	}
153 
154 	if (blob->active.num_pages) {
155 		assert(blob->active.pages);
156 		pages = calloc(blob->active.num_pages, sizeof(*blob->active.pages));
157 		if (!pages) {
158 			free(clusters);
159 			return -1;
160 		}
161 		memcpy(pages, blob->active.pages, blob->active.num_pages * sizeof(*pages));
162 	}
163 
164 	free(blob->clean.clusters);
165 	free(blob->clean.pages);
166 
167 	blob->clean.num_clusters = blob->active.num_clusters;
168 	blob->clean.clusters = blob->active.clusters;
169 	blob->clean.num_pages = blob->active.num_pages;
170 	blob->clean.pages = blob->active.pages;
171 
172 	blob->active.clusters = clusters;
173 	blob->active.pages = pages;
174 
175 	blob->state = SPDK_BLOB_STATE_CLEAN;
176 
177 	return 0;
178 }
179 
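/* Apply one on-disk metadata page to the in-memory blob. Descriptors are
 * walked in order: a zero-length padding descriptor terminates the page,
 * extent descriptors are expanded cluster-by-cluster into LBAs appended to
 * active.clusters, and xattr descriptors are appended to the xattr list.
 */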
180 static void
181 _spdk_blob_parse_page(const struct spdk_blob_md_page *page, struct spdk_blob *blob)
182 {
183 	struct spdk_blob_md_descriptor *desc;
184 	size_t	cur_desc = 0;
185 	void *tmp;
186 
187 	desc = (struct spdk_blob_md_descriptor *)page->descriptors;
188 	while (cur_desc < sizeof(page->descriptors)) {
189 		if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_PADDING) {
190 			if (desc->length == 0) {
191 				/* If padding and length are 0, this terminates the page */
192 				break;
193 			}
194 		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_EXTENT) {
195 			struct spdk_blob_md_descriptor_extent	*desc_extent;
196 			unsigned int				i, j;
197 			unsigned int				cluster_count = blob->active.num_clusters;
198 
199 			desc_extent = (struct spdk_blob_md_descriptor_extent *)desc;
200 
201 			assert(desc_extent->length > 0);
202 			assert(desc_extent->length % sizeof(desc_extent->extents[0]) == 0);
203 
204 			for (i = 0; i < desc_extent->length / sizeof(desc_extent->extents[0]); i++) {
205 				for (j = 0; j < desc_extent->extents[i].length; j++) {
206 					assert(spdk_bit_array_get(blob->bs->used_clusters, desc_extent->extents[i].cluster_idx + j));
207 					cluster_count++;
208 				}
209 			}
210 
211 			assert(cluster_count > 0);
212 			tmp = realloc(blob->active.clusters, cluster_count * sizeof(uint64_t));
213 			assert(tmp != NULL);
214 			blob->active.clusters = tmp;
215 			blob->active.cluster_array_size = cluster_count;
216 
217 			for (i = 0; i < desc_extent->length / sizeof(desc_extent->extents[0]); i++) {
218 				for (j = 0; j < desc_extent->extents[i].length; j++) {
219 					blob->active.clusters[blob->active.num_clusters++] = _spdk_bs_cluster_to_lba(blob->bs,
220 							desc_extent->extents[i].cluster_idx + j);
221 				}
222 			}
223 
224 		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR) {
225 			struct spdk_blob_md_descriptor_xattr	*desc_xattr;
226 			struct spdk_xattr 			*xattr;
227 
228 			desc_xattr = (struct spdk_blob_md_descriptor_xattr *)desc;
229 
230 			xattr = calloc(1, sizeof(*xattr));
231 			assert(xattr != NULL);
232 
233 			xattr->name = malloc(desc_xattr->name_length + 1);
			assert(xattr->name != NULL);
234 			strncpy(xattr->name, desc_xattr->name, desc_xattr->name_length);
235 			xattr->name[desc_xattr->name_length] = '\0';
236 
237 			xattr->value = malloc(desc_xattr->value_length);
238 			assert(xattr->value != NULL);
239 			xattr->value_len = desc_xattr->value_length;
240 			memcpy(xattr->value,
241 			       (void *)((uintptr_t)desc_xattr->name + desc_xattr->name_length),
242 			       desc_xattr->value_length);
243 
244 			TAILQ_INSERT_TAIL(&blob->xattrs, xattr, link);
245 		} else {
246 			/* Error */
247 			break;
248 		}
249 
250 		/* Advance to the next descriptor */
251 		desc = (struct spdk_blob_md_descriptor *)((uintptr_t)desc + sizeof(*desc) + desc->length);
252 		cur_desc += sizeof(*desc) + desc->length;
253 	}
254 }
255 
256 static int
257 _spdk_blob_parse(const struct spdk_blob_md_page *pages, uint32_t page_count,
258 		 struct spdk_blob *blob)
259 {
260 	const struct spdk_blob_md_page *page;
261 	uint32_t i;
262 
263 	assert(page_count > 0);
264 	assert(pages[0].sequence_num == 0);
265 	assert(blob != NULL);
266 	assert(blob->state == SPDK_BLOB_STATE_LOADING);
267 	assert(blob->active.clusters == NULL);
268 	assert(blob->id == pages[0].id);
270 
271 	for (i = 0; i < page_count; i++) {
272 		page = &pages[i];
273 
274 		assert(page->id == blob->id);
275 		assert(page->sequence_num == i);
276 
277 		_spdk_blob_parse_page(page, blob);
278 	}
279 
280 	return 0;
281 }
282 
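/* Append one zeroed metadata page to the serialized page array and return
 * a pointer to it. The page's 'next' link is fixed up later, once actual
 * page numbers have been claimed during persist.
 */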
283 static int
284 _spdk_blob_serialize_add_page(const struct spdk_blob *blob,
285 			      struct spdk_blob_md_page **pages,
286 			      uint32_t *page_count,
287 			      struct spdk_blob_md_page **last_page)
288 {
289 	struct spdk_blob_md_page *page, *tmp_pages;
290 
291 	assert(pages != NULL);
292 	assert(page_count != NULL);
293 
294 	if (*page_count == 0) {
295 		assert(*pages == NULL);
296 		*page_count = 1;
297 		*pages = spdk_zmalloc(sizeof(struct spdk_blob_md_page),
298 				      sizeof(struct spdk_blob_md_page),
299 				      NULL);
300 	} else {
301 		assert(*pages != NULL);
302 		(*page_count)++;
303 		tmp_pages = spdk_realloc(*pages,
304 				      sizeof(struct spdk_blob_md_page) * (*page_count),
305 				      sizeof(struct spdk_blob_md_page),
306 				      NULL);
		if (tmp_pages == NULL) {
			/* spdk_realloc does not free the original buffer on failure */
			spdk_free(*pages);
		}
		*pages = tmp_pages;
307 	}
308 
309 	if (*pages == NULL) {
310 		*page_count = 0;
311 		*last_page = NULL;
312 		return -ENOMEM;
313 	}
314 
315 	page = &(*pages)[*page_count - 1];
316 	page->id = blob->id;
317 	page->sequence_num = *page_count - 1;
318 	page->next = SPDK_INVALID_MD_PAGE;
319 	*last_page = page;
320 
321 	return 0;
322 }
323 
324 /* Transform the in-memory representation 'xattr' into an on-disk xattr descriptor.
325  * Update required_sz on both success and failure.
326  */
328 static int
329 _spdk_blob_serialize_xattr(const struct spdk_xattr *xattr,
330 			   uint8_t *buf, size_t buf_sz,
331 			   size_t *required_sz)
332 {
333 	struct spdk_blob_md_descriptor_xattr	*desc;
334 
335 	*required_sz = sizeof(struct spdk_blob_md_descriptor_xattr) +
336 		       strlen(xattr->name) +
337 		       xattr->value_len;
338 
339 	if (buf_sz < *required_sz) {
340 		return -1;
341 	}
342 
343 	desc = (struct spdk_blob_md_descriptor_xattr *)buf;
344 
345 	desc->type = SPDK_MD_DESCRIPTOR_TYPE_XATTR;
346 	desc->length = sizeof(desc->name_length) +
347 		       sizeof(desc->value_length) +
348 		       strlen(xattr->name) +
349 		       xattr->value_len;
350 	desc->name_length = strlen(xattr->name);
351 	desc->value_length = xattr->value_len;
352 
353 	memcpy(desc->name, xattr->name, desc->name_length);
354 	memcpy((void *)((uintptr_t)desc->name + desc->name_length),
355 	       xattr->value,
356 	       desc->value_length);
357 
358 	return 0;
359 }
360 
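/* Run-length encode the cluster list, starting at start_cluster, into
 * extent descriptors. Contiguous clusters collapse into a single extent:
 * with 64 LBAs per cluster, clusters at LBAs 64, 128, and 192 become the
 * one extent { cluster_idx = 1, length = 3 }. On return, *next_cluster is
 * the first cluster that did not fit in the buffer (or num_clusters if
 * everything fit).
 */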
361 static void
362 _spdk_blob_serialize_extent(const struct spdk_blob *blob,
363 			    uint64_t start_cluster, uint64_t *next_cluster,
364 			    uint8_t *buf, size_t buf_sz)
365 {
366 	struct spdk_blob_md_descriptor_extent *desc;
367 	size_t cur_sz;
368 	uint64_t i, extent_idx;
369 	uint64_t lba;
	uint32_t lba_per_cluster, lba_count;
370 
371 	/* The buffer must have room for at least one extent */
372 	cur_sz = sizeof(struct spdk_blob_md_descriptor) + sizeof(desc->extents[0]);
373 	if (buf_sz < cur_sz) {
374 		*next_cluster = start_cluster;
375 		return;
376 	}
377 
378 	desc = (struct spdk_blob_md_descriptor_extent *)buf;
379 	desc->type = SPDK_MD_DESCRIPTOR_TYPE_EXTENT;
380 
381 	lba_per_cluster = _spdk_bs_cluster_to_lba(blob->bs, 1);
382 
383 	lba = blob->active.clusters[start_cluster];
384 	lba_count = lba_per_cluster;
385 	extent_idx = 0;
386 	for (i = start_cluster + 1; i < blob->active.num_clusters; i++) {
387 		if ((lba + lba_count) == blob->active.clusters[i]) {
388 			lba_count += lba_per_cluster;
389 			continue;
390 		}
391 		desc->extents[extent_idx].cluster_idx = lba / lba_per_cluster;
392 		desc->extents[extent_idx].length = lba_count / lba_per_cluster;
393 		extent_idx++;
394 
395 		cur_sz += sizeof(desc->extents[extent_idx]);
396 
397 		if (buf_sz < cur_sz) {
398 			/* If we ran out of buffer space, return */
399 			desc->length = sizeof(desc->extents[0]) * extent_idx;
400 			*next_cluster = i;
401 			return;
402 		}
403 
404 		lba = blob->active.clusters[i];
405 		lba_count = lba_per_cluster;
406 	}
407 
408 	desc->extents[extent_idx].cluster_idx = lba / lba_per_cluster;
409 	desc->extents[extent_idx].length = lba_count / lba_per_cluster;
410 	extent_idx++;
411 
412 	desc->length = sizeof(desc->extents[0]) * extent_idx;
413 	*next_cluster = blob->active.num_clusters;
414 
415 	return;
416 }
417 
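/* Serialize the blob's metadata (xattrs first, then extents) into a chain
 * of pages, adding a new page whenever the current one runs out of
 * descriptor space. A single xattr must fit within one page's descriptor
 * area or serialization fails.
 */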
418 static int
419 _spdk_blob_serialize(const struct spdk_blob *blob, struct spdk_blob_md_page **pages,
420 		     uint32_t *page_count)
421 {
422 	struct spdk_blob_md_page		*cur_page;
423 	const struct spdk_xattr			*xattr;
424 	int 					rc;
425 	uint8_t					*buf;
426 	size_t					remaining_sz;
427 
428 	assert(pages != NULL);
429 	assert(page_count != NULL);
430 	assert(blob != NULL);
431 	assert(blob->state == SPDK_BLOB_STATE_SYNCING);
432 
433 	*pages = NULL;
434 	*page_count = 0;
435 
436 	/* A blob always has at least 1 page, even if it has no descriptors */
437 	rc = _spdk_blob_serialize_add_page(blob, pages, page_count, &cur_page);
438 	if (rc < 0) {
439 		return rc;
440 	}
441 
442 	buf = (uint8_t *)cur_page->descriptors;
443 	remaining_sz = sizeof(cur_page->descriptors);
444 
445 	/* Serialize xattrs */
446 	TAILQ_FOREACH(xattr, &blob->xattrs, link) {
447 		size_t required_sz = 0;
448 		rc = _spdk_blob_serialize_xattr(xattr,
449 						buf, remaining_sz,
450 						&required_sz);
451 		if (rc < 0) {
452 			/* Need to add a new page to the chain */
453 			rc = _spdk_blob_serialize_add_page(blob, pages, page_count,
454 							   &cur_page);
455 			if (rc < 0) {
456 				spdk_free(*pages);
457 				*pages = NULL;
458 				*page_count = 0;
459 				return rc;
460 			}
461 
462 			buf = (uint8_t *)cur_page->descriptors;
463 			remaining_sz = sizeof(cur_page->descriptors);
464 
465 			/* Try again */
466 			required_sz = 0;
467 			rc = _spdk_blob_serialize_xattr(xattr,
468 							buf, remaining_sz,
469 							&required_sz);
470 
471 			if (rc < 0) {
472 				spdk_free(*pages);
473 				*pages = NULL;
474 				*page_count = 0;
475 				return -1;
476 			}
477 		}
478 
479 		remaining_sz -= required_sz;
480 		buf += required_sz;
481 	}
482 
483 	/* Serialize extents */
484 	uint64_t last_cluster = 0;
485 	while (last_cluster < blob->active.num_clusters) {
486 		_spdk_blob_serialize_extent(blob, last_cluster, &last_cluster,
487 					    buf, remaining_sz);
488 
489 		if (last_cluster == blob->active.num_clusters) {
490 			break;
491 		}
492 
493 		rc = _spdk_blob_serialize_add_page(blob, pages, page_count,
494 						   &cur_page);
495 		if (rc < 0) {
496 			return rc;
497 		}
498 
499 		buf = (uint8_t *)cur_page->descriptors;
500 		remaining_sz = sizeof(cur_page->descriptors);
501 	}
502 
503 	return 0;
504 }
505 
506 struct spdk_blob_load_ctx {
507 	struct spdk_blob 		*blob;
508 
509 	struct spdk_blob_md_page	*pages;
510 	uint32_t			num_pages;
511 
512 	spdk_bs_sequence_cpl		cb_fn;
513 	void				*cb_arg;
514 };
515 
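/* Completion for each metadata page read during blob load. Pages are read
 * one at a time, following each page's 'next' pointer, until a page whose
 * next is SPDK_INVALID_MD_PAGE ends the chain; the whole chain is then
 * parsed into the in-memory blob.
 */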
516 static void
517 _spdk_blob_load_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
518 {
519 	struct spdk_blob_load_ctx 	*ctx = cb_arg;
520 	struct spdk_blob 		*blob = ctx->blob;
521 	struct spdk_blob_md_page	*page, *tmp_pages;
522 	int				rc;
523 
524 	page = &ctx->pages[ctx->num_pages - 1];
525 
526 	if (page->next != SPDK_INVALID_MD_PAGE) {
527 		uint32_t next_page = page->next;
528 		uint64_t next_lba = _spdk_bs_page_to_lba(blob->bs, blob->bs->md_start + next_page);
529 
531 		assert(next_lba < (blob->bs->md_start + blob->bs->md_len));
532 
533 		/* Read the next page */
534 		ctx->num_pages++;
535 		tmp_pages = spdk_realloc(ctx->pages, (sizeof(*page) * ctx->num_pages),
536 					  sizeof(*page), NULL);
537 		if (tmp_pages == NULL) {
			/* spdk_realloc does not free the original buffer on failure */
			spdk_free(ctx->pages);
538 			ctx->cb_fn(seq, ctx->cb_arg, -ENOMEM);
539 			free(ctx);
540 			return;
541 		}
		ctx->pages = tmp_pages;
542 
543 		spdk_bs_sequence_read(seq, &ctx->pages[ctx->num_pages - 1],
544 				      next_lba,
545 				      _spdk_bs_byte_to_lba(blob->bs, sizeof(*page)),
546 				      _spdk_blob_load_cpl, ctx);
547 		return;
548 	}
549 
550 	/* Parse the pages */
551 	/* Parse the pages */
552 	rc = _spdk_blob_parse(ctx->pages, ctx->num_pages, blob);
	if (rc == 0) {
553 		rc = _spdk_blob_mark_clean(blob);
	}
554 
555 	ctx->cb_fn(seq, ctx->cb_arg, rc);
556 
557 	/* Free the memory */
558 	spdk_free(ctx->pages);
559 	free(ctx);
560 }
561 
562 /* Load a blob from disk given a blobid */
563 static void
564 _spdk_blob_load(spdk_bs_sequence_t *seq, struct spdk_blob *blob,
565 		spdk_bs_sequence_cpl cb_fn, void *cb_arg)
566 {
567 	struct spdk_blob_load_ctx *ctx;
568 	struct spdk_blob_store *bs;
569 	uint32_t page_num;
570 	uint64_t lba;
571 
572 	assert(blob != NULL);
573 	assert(blob->state == SPDK_BLOB_STATE_CLEAN ||
574 	       blob->state == SPDK_BLOB_STATE_DIRTY);
575 
576 	bs = blob->bs;
577 
578 	ctx = calloc(1, sizeof(*ctx));
579 	if (!ctx) {
580 		cb_fn(seq, cb_arg, -ENOMEM);
581 		return;
582 	}
583 
584 	ctx->blob = blob;
585 	ctx->pages = spdk_realloc(ctx->pages, sizeof(struct spdk_blob_md_page),
586 				  sizeof(struct spdk_blob_md_page), NULL);
587 	if (!ctx->pages) {
588 		free(ctx);
589 		cb_fn(seq, cb_arg, -ENOMEM);
590 		return;
591 	}
592 	ctx->num_pages = 1;
593 	ctx->cb_fn = cb_fn;
594 	ctx->cb_arg = cb_arg;
595 
596 	page_num = _spdk_bs_blobid_to_page(blob->id);
597 	lba = _spdk_bs_page_to_lba(blob->bs, bs->md_start + page_num);
598 
599 	blob->state = SPDK_BLOB_STATE_LOADING;
600 
601 	spdk_bs_sequence_read(seq, &ctx->pages[0], lba,
602 			      _spdk_bs_byte_to_lba(bs, sizeof(struct spdk_blob_md_page)),
603 			      _spdk_blob_load_cpl, ctx);
604 }
605 
606 struct spdk_blob_persist_ctx {
607 	struct spdk_blob 		*blob;
608 
609 	struct spdk_blob_md_page	*pages;
610 
611 	uint64_t			idx;
612 
613 	spdk_bs_sequence_cpl		cb_fn;
614 	void				*cb_arg;
615 };
616 
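/* Persisting a blob runs as a chain of completions, defined below in
 * reverse order of execution:
 *   write_page_chain -> write_page_root -> unmap_pages ->
 *   unmap_clusters -> complete
 * The root page is written only after every other page completes, so it
 * never points at pages that have not yet reached the disk.
 */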
617 static void
618 _spdk_blob_persist_complete(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
619 {
620 	struct spdk_blob_persist_ctx 	*ctx = cb_arg;
621 	struct spdk_blob 		*blob = ctx->blob;
622 
623 	if (bserrno == 0) {
624 		_spdk_blob_mark_clean(blob);
625 	}
626 
627 	/* Call user callback */
628 	ctx->cb_fn(seq, ctx->cb_arg, bserrno);
629 
630 	/* Free the memory */
631 	spdk_free(ctx->pages);
632 	free(ctx);
633 }
634 
635 static void
636 _spdk_blob_persist_unmap_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
637 {
638 	struct spdk_blob_persist_ctx 	*ctx = cb_arg;
639 	struct spdk_blob 		*blob = ctx->blob;
640 	struct spdk_blob_store		*bs = blob->bs;
641 	void				*tmp;
642 	size_t				i;
643 
644 	/* Release all clusters that were truncated */
645 	for (i = blob->active.num_clusters; i < blob->active.cluster_array_size; i++) {
646 		uint32_t cluster_num = _spdk_bs_lba_to_cluster(bs, blob->active.clusters[i]);
647 
648 		_spdk_bs_release_cluster(bs, cluster_num);
649 	}
650 
651 	if (blob->active.num_clusters == 0) {
652 		free(blob->active.clusters);
653 		blob->active.clusters = NULL;
654 		blob->active.cluster_array_size = 0;
655 	} else {
656 		tmp = realloc(blob->active.clusters, sizeof(uint64_t) * blob->active.num_clusters);
657 		assert(tmp != NULL);
658 		blob->active.clusters = tmp;
659 		blob->active.cluster_array_size = blob->active.num_clusters;
660 	}
661 
662 	_spdk_blob_persist_complete(seq, ctx, bserrno);
663 }
664 
665 static void
666 _spdk_blob_persist_unmap_clusters(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
667 {
668 	struct spdk_blob_persist_ctx 	*ctx = cb_arg;
669 	struct spdk_blob 		*blob = ctx->blob;
670 	struct spdk_blob_store		*bs = blob->bs;
671 	spdk_bs_batch_t			*batch;
672 	size_t				i;
673 
674 	/* Clusters don't move around in blobs. The list shrinks or grows
675 	 * at the end, but no changes ever occur in the middle of the list.
676 	 */
677 
678 	batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_unmap_clusters_cpl, ctx);
679 
680 	/* Unmap all clusters that were truncated */
681 	for (i = blob->active.num_clusters; i < blob->active.cluster_array_size; i++) {
682 		uint64_t lba = blob->active.clusters[i];
683 		uint32_t lba_count = _spdk_bs_cluster_to_lba(bs, 1);
684 
685 		spdk_bs_batch_unmap(batch, lba, lba_count);
686 	}
687 
688 	spdk_bs_batch_close(batch);
689 }
690 
691 static void
692 _spdk_blob_persist_unmap_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
693 {
694 	struct spdk_blob_persist_ctx	*ctx = cb_arg;
695 	struct spdk_blob 		*blob = ctx->blob;
696 	struct spdk_blob_store		*bs = blob->bs;
697 	size_t				i;
698 
699 	/* This loop starts at 1 because the first page is special and handled
700 	 * below. The pages (except the first) are never written in place, so
701 	 * any pages in the clean list can be released from the used_md_pages
702 	 * mask here; the on-disk unmaps were issued in the previous step.
	 */
703 	for (i = 1; i < blob->clean.num_pages; i++) {
704 		spdk_bit_array_clear(bs->used_md_pages, blob->clean.pages[i]);
705 	}
706 
707 	if (blob->active.num_pages == 0) {
708 		uint32_t page_num;
709 
710 		page_num = _spdk_bs_blobid_to_page(blob->id);
711 		spdk_bit_array_clear(bs->used_md_pages, page_num);
712 	}
713 
714 	/* Move on to unmapping clusters */
715 	_spdk_blob_persist_unmap_clusters(seq, ctx, 0);
716 }
717 
718 static void
719 _spdk_blob_persist_unmap_pages(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
720 {
721 	struct spdk_blob_persist_ctx 	*ctx = cb_arg;
722 	struct spdk_blob 		*blob = ctx->blob;
723 	struct spdk_blob_store		*bs = blob->bs;
724 	uint64_t			lba;
725 	uint32_t			lba_count;
726 	spdk_bs_batch_t			*batch;
727 	size_t				i;
728 
729 	batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_unmap_pages_cpl, ctx);
730 
731 	lba_count = _spdk_bs_byte_to_lba(bs, sizeof(struct spdk_blob_md_page));
732 
733 	/* This loop starts at 1 because the first page is special and handled
734 	 * below. The pages (except the first) are never written in place,
735 	 * so any pages in the clean list must be unmapped.
736 	 */
737 	for (i = 1; i < blob->clean.num_pages; i++) {
738 		lba = _spdk_bs_page_to_lba(bs, bs->md_start + blob->clean.pages[i]);
739 
740 		spdk_bs_batch_unmap(batch, lba, lba_count);
741 	}
742 
743 	/* The first page will only be unmapped if this is a delete. */
744 	if (blob->active.num_pages == 0) {
745 		uint32_t page_num;
746 
747 		/* The first page in the metadata goes where the blobid indicates */
748 		page_num = _spdk_bs_blobid_to_page(blob->id);
749 		lba = _spdk_bs_page_to_lba(bs, bs->md_start + page_num);
750 
751 		spdk_bs_batch_unmap(batch, lba, lba_count);
752 	}
753 
754 	spdk_bs_batch_close(batch);
755 }
756 
757 static void
758 _spdk_blob_persist_write_page_root(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
759 {
760 	struct spdk_blob_persist_ctx	*ctx = cb_arg;
761 	struct spdk_blob		*blob = ctx->blob;
762 	struct spdk_blob_store		*bs = blob->bs;
763 	uint64_t			lba;
764 	uint32_t			lba_count;
765 	struct spdk_blob_md_page	*page;
766 
767 	if (blob->active.num_pages == 0) {
768 		/* Move on to the next step */
769 		_spdk_blob_persist_unmap_pages(seq, ctx, 0);
770 		return;
771 	}
772 
773 	lba_count = _spdk_bs_byte_to_lba(bs, sizeof(*page));
774 
775 	page = &ctx->pages[0];
776 	/* The first page in the metadata goes where the blobid indicates */
777 	lba = _spdk_bs_page_to_lba(bs, bs->md_start + _spdk_bs_blobid_to_page(blob->id));
778 
779 	spdk_bs_sequence_write(seq, page, lba, lba_count,
780 			       _spdk_blob_persist_unmap_pages, ctx);
781 }
782 
783 static void
784 _spdk_blob_persist_write_page_chain(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
785 {
786 	struct spdk_blob_persist_ctx 	*ctx = cb_arg;
787 	struct spdk_blob 		*blob = ctx->blob;
788 	struct spdk_blob_store		*bs = blob->bs;
789 	uint64_t 			lba;
790 	uint32_t			lba_count;
791 	struct spdk_blob_md_page	*page;
792 	spdk_bs_batch_t			*batch;
793 	size_t				i;
794 
795 	/* Clusters don't move around in blobs. The list shrinks or grows
796 	 * at the end, but no changes ever occur in the middle of the list.
797 	 */
798 
799 	lba_count = _spdk_bs_byte_to_lba(bs, sizeof(*page));
800 
801 	batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_write_page_root, ctx);
802 
803 	/* This starts at 1. The root page is not written until
804 	 * all of the others are finished
805 	 */
806 	for (i = 1; i < blob->active.num_pages; i++) {
807 		page = &ctx->pages[i];
808 		assert(page->sequence_num == i);
809 
810 		lba = _spdk_bs_page_to_lba(bs, bs->md_start + blob->active.pages[i]);
811 
812 		spdk_bs_batch_write(batch, page, lba, lba_count);
813 	}
814 
815 	spdk_bs_batch_close(batch);
816 }
817 
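/* Resize the in-memory cluster list to sz clusters, claiming new clusters
 * from the blobstore's free pool as needed. Shrinking only lowers the
 * count; the spare clusters remain in the array until the next persist
 * releases them back to the pool.
 */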
818 static int
819 _spdk_resize_blob(struct spdk_blob *blob, uint64_t sz)
820 {
821 	uint64_t	i;
822 	uint64_t	*tmp;
823 	uint64_t	lfc; /* lowest free cluster */
824 	struct spdk_blob_store *bs;
825 
826 	bs = blob->bs;
827 
828 	assert(blob->state != SPDK_BLOB_STATE_LOADING &&
829 	       blob->state != SPDK_BLOB_STATE_SYNCING);
830 
831 	if (blob->active.num_clusters == sz) {
832 		return 0;
833 	}
834 
835 	if (blob->active.num_clusters < blob->active.cluster_array_size) {
836 		/* If this blob was resized to be larger, then smaller, then
837 		 * larger without syncing, then the cluster array already
838 		 * contains spare assigned clusters we can use.
839 		 */
840 		blob->active.num_clusters = spdk_min(blob->active.cluster_array_size,
841 						     sz);
842 	}
843 
844 	blob->state = SPDK_BLOB_STATE_DIRTY;
845 
846 	/* Do two passes - one to verify that we can obtain enough clusters
847 	 * and another to actually claim them.
848 	 */
849 
850 	lfc = 0;
851 	for (i = blob->active.num_clusters; i < sz; i++) {
852 		lfc = spdk_bit_array_find_first_clear(bs->used_clusters, lfc);
853 		if (lfc >= bs->total_clusters) {
854 			/* No more free clusters. Cannot satisfy the request */
855 			assert(false);
856 			return -1;
857 		}
858 		lfc++;
859 	}
860 
861 	if (sz > blob->active.num_clusters) {
862 		/* Expand the cluster array if necessary.
863 		 * We only shrink the array when persisting.
864 		 */
865 		tmp = realloc(blob->active.clusters, sizeof(uint64_t) * sz);
866 		if (sz > 0 && tmp == NULL) {
867 			assert(false);
868 			return -1;
869 		}
870 		blob->active.clusters = tmp;
871 		blob->active.cluster_array_size = sz;
872 	}
873 
874 	lfc = 0;
875 	for (i = blob->active.num_clusters; i < sz; i++) {
876 		lfc = spdk_bit_array_find_first_clear(bs->used_clusters, lfc);
877 		SPDK_TRACELOG(SPDK_TRACE_BLOB, "Claiming cluster %lu for blob %lu\n", lfc, blob->id);
878 		_spdk_bs_claim_cluster(bs, lfc);
879 		blob->active.clusters[i] = _spdk_bs_cluster_to_lba(bs, lfc);
880 		lfc++;
881 	}
882 
883 	blob->active.num_clusters = sz;
884 
885 	return 0;
886 }
887 
888 /* Write a blob to disk */
889 static void
890 _spdk_blob_persist(spdk_bs_sequence_t *seq, struct spdk_blob *blob,
891 		   spdk_bs_sequence_cpl cb_fn, void *cb_arg)
892 {
893 	struct spdk_blob_persist_ctx *ctx;
894 	int rc;
895 	uint64_t i;
896 	uint32_t page_num;
	uint32_t *tmp_pages;
897 	struct spdk_blob_store *bs;
898 
899 	assert(blob != NULL);
900 	assert(blob->state == SPDK_BLOB_STATE_CLEAN ||
901 	       blob->state == SPDK_BLOB_STATE_DIRTY);
902 
903 	if (blob->state == SPDK_BLOB_STATE_CLEAN) {
904 		cb_fn(seq, cb_arg, 0);
905 		return;
906 	}
907 
908 	bs = blob->bs;
909 
910 	ctx = calloc(1, sizeof(*ctx));
911 	if (!ctx) {
912 		cb_fn(seq, cb_arg, -ENOMEM);
913 		return;
914 	}
915 	ctx->blob = blob;
916 	ctx->cb_fn = cb_fn;
917 	ctx->cb_arg = cb_arg;
918 
919 	blob->state = SPDK_BLOB_STATE_SYNCING;
920 
921 	if (blob->active.num_pages == 0) {
922 		/* This is the signal that the blob should be deleted.
923 		 * Immediately jump to the clean up routine. */
924 		assert(blob->clean.num_pages > 0);
925 		ctx->idx = blob->clean.num_pages - 1;
926 		_spdk_blob_persist_unmap_pages(seq, ctx, 0);
927 		return;
928 
929 	}
930 
931 	/* Generate the new metadata */
932 	rc = _spdk_blob_serialize(blob, &ctx->pages, &blob->active.num_pages);
933 	if (rc < 0) {
934 		free(ctx);
935 		cb_fn(seq, cb_arg, rc);
936 		return;
937 	}
938 
939 	assert(blob->active.num_pages >= 1);
940 
941 	/* Resize the cache of page indices */
942 	tmp_pages = realloc(blob->active.pages,
943 				     blob->active.num_pages * sizeof(*blob->active.pages));
944 	if (!tmp_pages) {
		/* The old array is still intact; just fail the persist */
		spdk_free(ctx->pages);
945 		free(ctx);
		blob->state = SPDK_BLOB_STATE_DIRTY;
946 		cb_fn(seq, cb_arg, -ENOMEM);
947 		return;
948 	}
	blob->active.pages = tmp_pages;
949 
950 	/* Assign this metadata to pages. This requires two passes -
951 	 * one to verify that there are enough pages and a second
952 	 * to actually claim them. */
953 	page_num = 0;
954 	/* Note that this loop starts at one. The first page location is fixed by the blobid. */
955 	for (i = 1; i < blob->active.num_pages; i++) {
956 		page_num = spdk_bit_array_find_first_clear(bs->used_md_pages, page_num);
957 		if (page_num >= spdk_bit_array_capacity(bs->used_md_pages)) {
958 			spdk_free(ctx->pages);
959 			free(ctx);
960 			blob->state = SPDK_BLOB_STATE_DIRTY;
961 			cb_fn(seq, cb_arg, -ENOMEM);
962 			return;
963 		}
964 		page_num++;
965 	}
966 
967 	page_num = 0;
968 	blob->active.pages[0] = _spdk_bs_blobid_to_page(blob->id);
969 	for (i = 1; i < blob->active.num_pages; i++) {
970 		page_num = spdk_bit_array_find_first_clear(bs->used_md_pages, page_num);
971 		ctx->pages[i - 1].next = page_num;
972 		blob->active.pages[i] = page_num;
973 		spdk_bit_array_set(bs->used_md_pages, page_num);
974 		SPDK_TRACELOG(SPDK_TRACE_BLOB, "Claiming page %u for blob %lu\n", page_num, blob->id);
975 		page_num++;
976 	}
977 
978 	/* Start writing the metadata from last page to first */
979 	ctx->idx = blob->active.num_pages - 1;
980 	_spdk_blob_persist_write_page_chain(seq, ctx, 0);
981 }
982 
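/* Issue a read or write as a batch of per-cluster operations. offset and
 * length are in pages; each sub-operation stops at the next cluster
 * boundary, since logically consecutive clusters need not be physically
 * contiguous on the device.
 */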
983 static void
984 _spdk_blob_request_submit_rw(struct spdk_blob *blob, struct spdk_io_channel *_channel,
985 			     void *payload, uint64_t offset, uint64_t length,
986 			     spdk_blob_op_complete cb_fn, void *cb_arg, bool read)
987 {
988 	spdk_bs_batch_t			*batch;
989 	struct spdk_bs_cpl		cpl;
990 	uint64_t			lba;
991 	uint32_t			lba_count;
992 	uint8_t				*buf;
993 	uint64_t			page;
994 
995 	assert(blob != NULL);
996 
997 	if (offset + length > blob->active.num_clusters * blob->bs->pages_per_cluster) {
998 		cb_fn(cb_arg, -EINVAL);
999 		return;
1000 	}
1001 
1002 	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
1003 	cpl.u.blob_basic.cb_fn = cb_fn;
1004 	cpl.u.blob_basic.cb_arg = cb_arg;
1005 
1006 	batch = spdk_bs_batch_open(_channel, &cpl);
1007 	if (!batch) {
1008 		cb_fn(cb_arg, -ENOMEM);
1009 		return;
1010 	}
1011 
1012 	length = _spdk_bs_page_to_lba(blob->bs, length);
1013 	page = offset;
1014 	buf = payload;
1015 	while (length > 0) {
1016 		lba = _spdk_bs_blob_page_to_lba(blob, page);
1017 		lba_count = spdk_min(length,
1018 				     _spdk_bs_page_to_lba(blob->bs,
1019 						     _spdk_bs_num_pages_to_cluster_boundary(blob, page)));
1020 
1021 		if (read) {
1022 			spdk_bs_batch_read(batch, buf, lba, lba_count);
1023 		} else {
1024 			spdk_bs_batch_write(batch, buf, lba, lba_count);
1025 		}
1026 
1027 		length -= lba_count;
1028 		buf += _spdk_bs_lba_to_byte(blob->bs, lba_count);
1029 		page += _spdk_bs_lba_to_page(blob->bs, lba_count);
1030 	}
1031 
1032 	spdk_bs_batch_close(batch);
1033 }
1034 
1035 static struct spdk_blob *
1036 _spdk_blob_lookup(struct spdk_blob_store *bs, spdk_blob_id blobid)
1037 {
1038 	struct spdk_blob *blob;
1039 
1040 	TAILQ_FOREACH(blob, &bs->blobs, link) {
1041 		if (blob->id == blobid) {
1042 			return blob;
1043 		}
1044 	}
1045 
1046 	return NULL;
1047 }
1048 
1049 static int
1050 _spdk_bs_channel_create(void *io_device, uint32_t priority, void *ctx_buf, void *unique_ctx)
1051 {
1052 	struct spdk_blob_store		*bs = io_device;
1053 	struct spdk_bs_dev		*dev = bs->dev;
1054 	struct spdk_bs_channel	*channel = ctx_buf;
1055 	uint32_t			max_ops = *(uint32_t *)unique_ctx;
1056 	uint32_t			i;
1057 
1058 	channel->req_mem = calloc(max_ops, sizeof(struct spdk_bs_request_set));
1059 	if (!channel->req_mem) {
		/* ctx_buf is allocated and freed by the io_channel framework;
		 * do not free it here.
		 */
1061 		return -1;
1062 	}
1063 
1064 	TAILQ_INIT(&channel->reqs);
1065 
1066 	for (i = 0; i < max_ops; i++) {
1067 		TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link);
1068 	}
1069 
1070 	channel->bs = bs;
1071 	channel->dev = dev;
1072 	channel->dev_channel = dev->create_channel(dev);
1073 
1074 	return 0;
1075 }
1076 
1077 static void
1078 _spdk_bs_channel_destroy(void *io_device, void *ctx_buf)
1079 {
1080 	struct spdk_bs_channel *channel = ctx_buf;
1081 
1082 	free(channel->req_mem);
1083 	channel->dev->destroy_channel(channel->dev, channel->dev_channel);
1084 }
1085 
1086 static void
1087 _spdk_bs_free(struct spdk_blob_store *bs)
1088 {
1089 	struct spdk_blob	*blob, *blob_tmp;
1090 
1091 	spdk_bs_unregister_md_thread(bs);
1092 	spdk_io_device_unregister(bs);
1093 
1094 	TAILQ_FOREACH_SAFE(blob, &bs->blobs, link, blob_tmp) {
1095 		TAILQ_REMOVE(&bs->blobs, blob, link);
1096 		_spdk_blob_free(blob);
1097 	}
1098 
1099 	spdk_bit_array_free(&bs->used_md_pages);
1100 	spdk_bit_array_free(&bs->used_clusters);
1101 
1102 	bs->dev->destroy(bs->dev);
1103 	free(bs);
1104 }
1105 
1106 void
1107 spdk_bs_opts_init(struct spdk_bs_opts *opts)
1108 {
1109 	opts->cluster_sz = SPDK_BLOB_OPTS_CLUSTER_SZ;
1110 	opts->num_md_pages = SPDK_BLOB_OPTS_NUM_MD_PAGES;
1111 	opts->max_md_ops = SPDK_BLOB_OPTS_MAX_MD_OPS;
1112 }
1113 
1114 static struct spdk_blob_store *
1115 _spdk_bs_alloc(struct spdk_bs_dev *dev, struct spdk_bs_opts *opts)
1116 {
1117 	struct spdk_blob_store	*bs;
1118 
1119 	bs = calloc(1, sizeof(struct spdk_blob_store));
1120 	if (!bs) {
1121 		return NULL;
1122 	}
1123 
1124 	TAILQ_INIT(&bs->blobs);
1125 	bs->dev = dev;
1126 
1127 	/*
1128 	 * Do not use _spdk_bs_lba_to_cluster() here since blockcnt may not be an
1129 	 *  even multiple of the cluster size.
1130 	 */
1131 	bs->cluster_sz = opts->cluster_sz;
1132 	bs->total_clusters = dev->blockcnt / (bs->cluster_sz / dev->blocklen);
1133 	bs->pages_per_cluster = bs->cluster_sz / sizeof(struct spdk_blob_md_page);
1134 	bs->num_free_clusters = bs->total_clusters;
1135 	bs->used_clusters = spdk_bit_array_create(bs->total_clusters);
1136 	if (bs->used_clusters == NULL) {
1137 		_spdk_bs_free(bs);
1138 		return NULL;
1139 	}
1140 
1141 	bs->max_md_ops = opts->max_md_ops;
1142 	bs->super_blob = SPDK_BLOBID_INVALID;
1143 
1144 	/* The metadata is assumed to be at least 1 page */
1145 	bs->used_md_pages = spdk_bit_array_create(1);
	if (bs->used_md_pages == NULL) {
		_spdk_bs_free(bs);
		return NULL;
	}
1146 
1147 	spdk_io_device_register(bs, _spdk_bs_channel_create, _spdk_bs_channel_destroy,
1148 				sizeof(struct spdk_bs_channel));
1149 	spdk_bs_register_md_thread(bs);
1150 
1151 	return bs;
1152 }
1153 
1154 /* START spdk_bs_load */
1155 
1156 struct spdk_bs_load_ctx {
1157 	struct spdk_blob_store		*bs;
1158 	struct spdk_bs_super_block	*super;
1159 
1160 	struct spdk_bs_md_mask		*mask;
1161 };
1162 
1163 static void
1164 _spdk_bs_load_used_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1165 {
1166 	struct spdk_bs_load_ctx *ctx = cb_arg;
1167 	uint32_t		i, j;
1168 	int			rc;
1169 
1170 	/* The type must be correct */
1171 	assert(ctx->mask->type == SPDK_MD_MASK_TYPE_USED_CLUSTERS);
1172 	/* The length of the mask (in bits) must not be greater than the length of the buffer (converted to bits) */
1173 	assert(ctx->mask->length <= (ctx->super->used_cluster_mask_len * sizeof(
1174 					     struct spdk_blob_md_page) * 8));
1175 	/* The length of the mask must be exactly equal to the total number of clusters */
1176 	assert(ctx->mask->length == ctx->bs->total_clusters);
1177 
1178 	rc = spdk_bit_array_resize(&ctx->bs->used_clusters, ctx->bs->total_clusters);
1179 	if (rc < 0) {
1180 		spdk_free(ctx->super);
1181 		spdk_free(ctx->mask);
1182 		_spdk_bs_free(ctx->bs);
1183 		free(ctx);
1184 		spdk_bs_sequence_finish(seq, -ENOMEM);
1185 		return;
1186 	}
1187 
1188 	ctx->bs->num_free_clusters = ctx->bs->total_clusters;
1189 	for (i = 0; i < ctx->mask->length / 8; i++) {
1190 		uint8_t segment = ctx->mask->mask[i];
1191 		for (j = 0; segment && (j < 8); j++) {
1192 			if (segment & 1U) {
1193 				spdk_bit_array_set(ctx->bs->used_clusters, (i * 8) + j);
1194 				assert(ctx->bs->num_free_clusters > 0);
1195 				ctx->bs->num_free_clusters--;
1196 			}
1197 			segment >>= 1U;
1198 		}
1199 	}
1200 
1201 	spdk_free(ctx->super);
1202 	spdk_free(ctx->mask);
1203 	free(ctx);
1204 
1205 	spdk_bs_sequence_finish(seq, bserrno);
1206 }
1207 
1208 static void
1209 _spdk_bs_load_used_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1210 {
1211 	struct spdk_bs_load_ctx *ctx = cb_arg;
1212 	uint64_t		lba, lba_count;
1213 	uint32_t		i, j;
1214 	int			rc;
1215 
1216 	/* The type must be correct */
1217 	assert(ctx->mask->type == SPDK_MD_MASK_TYPE_USED_PAGES);
1218 	/* The length of the mask (in bits) must not be greater than the length of the buffer (converted to bits) */
1219 	assert(ctx->mask->length <= (ctx->super->used_page_mask_len * sizeof(struct spdk_blob_md_page) *
1220 				     8));
1221 	/* The length of the mask must be exactly equal to the size (in pages) of the metadata region */
1222 	assert(ctx->mask->length == ctx->super->md_len);
1223 
1224 	rc = spdk_bit_array_resize(&ctx->bs->used_md_pages, ctx->mask->length);
1225 	if (rc < 0) {
1226 		spdk_free(ctx->super);
1227 		spdk_free(ctx->mask);
1228 		_spdk_bs_free(ctx->bs);
1229 		free(ctx);
1230 		spdk_bs_sequence_finish(seq, -ENOMEM);
1231 		return;
1232 	}
1233 
1234 	for (i = 0; i < ctx->mask->length / 8; i++) {
1235 		uint8_t segment = ctx->mask->mask[i];
1236 		for (j = 0; segment && (j < 8); j++) {
1237 			if (segment & 1U) {
1238 				spdk_bit_array_set(ctx->bs->used_md_pages, (i * 8) + j);
1239 			}
1240 			segment >>= 1U;
1241 		}
1242 	}
1243 	spdk_free(ctx->mask);
1244 
1245 	/* Read the used clusters mask */
1246 	ctx->mask = spdk_zmalloc(ctx->super->used_cluster_mask_len * sizeof(struct spdk_blob_md_page),
1247 				 0x1000, NULL);
1248 	if (!ctx->mask) {
1249 		spdk_free(ctx->super);
1250 		_spdk_bs_free(ctx->bs);
1251 		free(ctx);
1252 		spdk_bs_sequence_finish(seq, -ENOMEM);
1253 		return;
1254 	}
1255 	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_start);
1256 	lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_len);
1257 	spdk_bs_sequence_read(seq, ctx->mask, lba, lba_count,
1258 			      _spdk_bs_load_used_clusters_cpl, ctx);
1259 }
1260 
1261 static void
1262 _spdk_bs_load_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1263 {
1264 	struct spdk_bs_load_ctx *ctx = cb_arg;
1265 	uint64_t		lba, lba_count;
1266 
1267 	if (ctx->super->version != SPDK_BS_VERSION) {
1268 		spdk_free(ctx->super);
1269 		_spdk_bs_free(ctx->bs);
1270 		free(ctx);
1271 		spdk_bs_sequence_finish(seq, -EILSEQ);
1272 		return;
1273 	}
1274 
1275 	if (memcmp(ctx->super->signature, SPDK_BS_SUPER_BLOCK_SIG,
1276 		   sizeof(ctx->super->signature)) != 0) {
1277 		spdk_free(ctx->super);
1278 		_spdk_bs_free(ctx->bs);
1279 		free(ctx);
1280 		spdk_bs_sequence_finish(seq, -EILSEQ);
1281 		return;
1282 	}
1283 
1284 	if (ctx->super->clean != 1) {
1285 		/* TODO: ONLY CLEAN SHUTDOWN IS CURRENTLY SUPPORTED.
1286 		 * All of the necessary data to recover is available
1287 		 * on disk - the code just has not been written yet.
1288 		 */
1289 		assert(false);
1290 		spdk_free(ctx->super);
1291 		_spdk_bs_free(ctx->bs);
1292 		free(ctx);
1293 		spdk_bs_sequence_finish(seq, -EILSEQ);
1294 		return;
1295 	}
1296 	ctx->super->clean = 0;
1297 
1298 	/* Parse the super block */
1299 	ctx->bs->cluster_sz = ctx->super->cluster_size;
1300 	ctx->bs->total_clusters = ctx->bs->dev->blockcnt / (ctx->bs->cluster_sz / ctx->bs->dev->blocklen);
1301 	ctx->bs->pages_per_cluster = ctx->bs->cluster_sz / sizeof(struct spdk_blob_md_page);
1302 	ctx->bs->md_start = ctx->super->md_start;
1303 	ctx->bs->md_len = ctx->super->md_len;
1304 
1305 	/* Read the used pages mask */
1306 	ctx->mask = spdk_zmalloc(ctx->super->used_page_mask_len * sizeof(struct spdk_blob_md_page), 0x1000,
1307 				 NULL);
1308 	if (!ctx->mask) {
1309 		spdk_free(ctx->super);
1310 		_spdk_bs_free(ctx->bs);
1311 		free(ctx);
1312 		spdk_bs_sequence_finish(seq, -ENOMEM);
1313 		return;
1314 	}
1315 	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_start);
1316 	lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_len);
1317 	spdk_bs_sequence_read(seq, ctx->mask, lba, lba_count,
1318 			      _spdk_bs_load_used_pages_cpl, ctx);
1319 }
1320 
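/* Load an existing blobstore: read the super block, validate its
 * signature, version, and clean-shutdown flag, then read the used-page
 * and used-cluster masks to rebuild the in-memory allocation state.
 */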
1321 void
1322 spdk_bs_load(struct spdk_bs_dev *dev,
1323 	     spdk_bs_op_with_handle_complete cb_fn, void *cb_arg)
1324 {
1325 	struct spdk_blob_store	*bs;
1326 	struct spdk_bs_cpl	cpl;
1327 	spdk_bs_sequence_t	*seq;
1328 	struct spdk_bs_load_ctx *ctx;
1329 	struct spdk_bs_opts	opts = {};
1330 
1331 	SPDK_TRACELOG(SPDK_TRACE_BLOB, "Loading blobstore from dev %p\n", dev);
1332 
1333 	spdk_bs_opts_init(&opts);
1334 
1335 	bs = _spdk_bs_alloc(dev, &opts);
1336 	if (!bs) {
1337 		cb_fn(cb_arg, NULL, -ENOMEM);
1338 		return;
1339 	}
1340 
1341 	ctx = calloc(1, sizeof(*ctx));
1342 	if (!ctx) {
1343 		_spdk_bs_free(bs);
1344 		cb_fn(cb_arg, NULL, -ENOMEM);
1345 		return;
1346 	}
1347 
1348 	ctx->bs = bs;
1349 
1350 	/* Allocate memory for the super block */
1351 	ctx->super = spdk_zmalloc(sizeof(*ctx->super), 0x1000, NULL);
1352 	if (!ctx->super) {
1353 		free(ctx);
1354 		_spdk_bs_free(bs);
		cb_fn(cb_arg, NULL, -ENOMEM);
1355 		return;
1356 	}
1357 
1358 	cpl.type = SPDK_BS_CPL_TYPE_BS_HANDLE;
1359 	cpl.u.bs_handle.cb_fn = cb_fn;
1360 	cpl.u.bs_handle.cb_arg = cb_arg;
1361 	cpl.u.bs_handle.bs = bs;
1362 
1363 	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
1364 	if (!seq) {
1365 		spdk_free(ctx->super);
1366 		free(ctx);
1367 		_spdk_bs_free(bs);
1368 		cb_fn(cb_arg, NULL, -ENOMEM);
1369 		return;
1370 	}
1371 
1372 	/* Read the super block */
1373 	spdk_bs_sequence_read(seq, ctx->super, _spdk_bs_page_to_lba(bs, 0),
1374 			      _spdk_bs_byte_to_lba(bs, sizeof(*ctx->super)),
1375 			      _spdk_bs_load_super_cpl, ctx);
1376 }
1377 
1378 /* END spdk_bs_load */
1379 
1380 /* START spdk_bs_init */
1381 
1382 struct spdk_bs_init_ctx {
1383 	struct spdk_blob_store		*bs;
1384 	struct spdk_bs_super_block	*super;
1385 };
1386 
1387 static void
1388 _spdk_bs_init_persist_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1389 {
1390 	struct spdk_bs_init_ctx *ctx = cb_arg;
1391 
1392 	spdk_free(ctx->super);
1393 	free(ctx);
1394 
1395 	spdk_bs_sequence_finish(seq, bserrno);
1396 }
1397 
1398 static void
1399 _spdk_bs_init_trim_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1400 {
1401 	struct spdk_bs_init_ctx *ctx = cb_arg;
1402 
1403 	/* Write super block */
1404 	spdk_bs_sequence_write(seq, ctx->super, _spdk_bs_page_to_lba(ctx->bs, 0),
1405 			       _spdk_bs_byte_to_lba(ctx->bs, sizeof(*ctx->super)),
1406 			       _spdk_bs_init_persist_super_cpl, ctx);
1407 }
1408 
1409 void
1410 spdk_bs_init(struct spdk_bs_dev *dev, struct spdk_bs_opts *o,
1411 	     spdk_bs_op_with_handle_complete cb_fn, void *cb_arg)
1412 {
1413 	struct spdk_bs_init_ctx *ctx;
1414 	struct spdk_blob_store	*bs;
1415 	struct spdk_bs_cpl	cpl;
1416 	spdk_bs_sequence_t	*seq;
1417 	uint64_t		num_md_pages;
1418 	uint32_t		i;
1419 	struct spdk_bs_opts	opts = {};
1420 	int			rc;
1421 
1422 	SPDK_TRACELOG(SPDK_TRACE_BLOB, "Initializing blobstore on dev %p\n", dev);
1423 
1424 	if (o) {
1425 		opts = *o;
1426 	} else {
1427 		spdk_bs_opts_init(&opts);
1428 	}
1429 
1430 	bs = _spdk_bs_alloc(dev, &opts);
1431 	if (!bs) {
1432 		cb_fn(cb_arg, NULL, -ENOMEM);
1433 		return;
1434 	}
1435 
1436 	if (opts.num_md_pages == UINT32_MAX) {
1437 		/* By default, allocate 1 page per cluster.
1438 		 * Technically, this over-allocates metadata
1439 		 * because more metadata will reduce the number
1440 		 * of usable clusters. This can be addressed with
1441 		 * more complex math in the future.
1442 		 */
1443 		bs->md_len = bs->total_clusters;
1444 	} else {
1445 		bs->md_len = opts.num_md_pages;
1446 	}
1447 
1448 	rc = spdk_bit_array_resize(&bs->used_md_pages, bs->md_len);
1449 	if (rc < 0) {
1450 		_spdk_bs_free(bs);
1451 		cb_fn(cb_arg, NULL, -ENOMEM);
1452 		return;
1453 	}
1454 
1455 	ctx = calloc(1, sizeof(*ctx));
1456 	if (!ctx) {
1457 		_spdk_bs_free(bs);
1458 		cb_fn(cb_arg, NULL, -ENOMEM);
1459 		return;
1460 	}
1461 
1462 	ctx->bs = bs;
1463 
1464 	/* Allocate memory for the super block */
1465 	ctx->super = spdk_zmalloc(sizeof(*ctx->super), 0x1000, NULL);
1466 	if (!ctx->super) {
1467 		free(ctx);
1468 		_spdk_bs_free(bs);
		cb_fn(cb_arg, NULL, -ENOMEM);
1469 		return;
1470 	}
1471 	memcpy(ctx->super->signature, SPDK_BS_SUPER_BLOCK_SIG,
1472 	       sizeof(ctx->super->signature));
1473 	ctx->super->version = SPDK_BS_VERSION;
1474 	ctx->super->length = sizeof(*ctx->super);
1475 	ctx->super->super_blob = bs->super_blob;
1476 	ctx->super->clean = 0;
1477 	ctx->super->cluster_size = bs->cluster_sz;
1478 
1479 	/* Calculate how many pages the metadata consumes at the front
1480 	 * of the disk.
1481 	 */
1482 
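	/*
	 * The resulting page layout, starting from page 0 of the device:
	 *
	 *   [ super block | used_page_mask | used_cluster_mask | metadata pages | data clusters ]
	 */
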
1483 	/* The super block uses 1 page */
1484 	num_md_pages = 1;
1485 
1486 	/* The used_md_pages mask requires 1 bit per metadata page, rounded
1487 	 * up to the nearest page, plus a header.
1488 	 */
1489 	ctx->super->used_page_mask_start = num_md_pages;
1490 	ctx->super->used_page_mask_len = divide_round_up(sizeof(struct spdk_bs_md_mask) +
1491 					 divide_round_up(bs->md_len, 8),
1492 					 sizeof(struct spdk_blob_md_page));
1493 	num_md_pages += ctx->super->used_page_mask_len;
1494 
1495 	/* The used_clusters mask requires 1 bit per cluster, rounded
1496 	 * up to the nearest page, plus a header.
1497 	 */
1498 	ctx->super->used_cluster_mask_start = num_md_pages;
1499 	ctx->super->used_cluster_mask_len = divide_round_up(sizeof(struct spdk_bs_md_mask) +
1500 					    divide_round_up(bs->total_clusters, 8),
1501 					    sizeof(struct spdk_blob_md_page));
1502 	num_md_pages += ctx->super->used_cluster_mask_len;
1503 
1504 	/* The metadata region size was chosen above */
1505 	ctx->super->md_start = bs->md_start = num_md_pages;
1506 	ctx->super->md_len = bs->md_len;
1507 	num_md_pages += bs->md_len;
1508 
1509 	/* Claim all of the clusters used by the metadata */
1510 	for (i = 0; i < divide_round_up(num_md_pages, bs->pages_per_cluster); i++) {
1511 		_spdk_bs_claim_cluster(bs, i);
1512 	}
1513 
1514 	cpl.type = SPDK_BS_CPL_TYPE_BS_HANDLE;
1515 	cpl.u.bs_handle.cb_fn = cb_fn;
1516 	cpl.u.bs_handle.cb_arg = cb_arg;
1517 	cpl.u.bs_handle.bs = bs;
1518 
1519 	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
1520 	if (!seq) {
1521 		spdk_free(ctx->super);
1522 		free(ctx);
1523 		_spdk_bs_free(bs);
1524 		cb_fn(cb_arg, NULL, -ENOMEM);
1525 		return;
1526 	}
1527 
1528 	/* TRIM the entire device */
1529 	spdk_bs_sequence_unmap(seq, 0, bs->dev->blockcnt, _spdk_bs_init_trim_cpl, ctx);
1530 }
1531 
1532 /* END spdk_bs_init */
1533 
1534 /* START spdk_bs_unload */
1535 
1536 struct spdk_bs_unload_ctx {
1537 	struct spdk_blob_store		*bs;
1538 	struct spdk_bs_super_block	*super;
1539 
1540 	struct spdk_bs_md_mask		*mask;
1541 };
1542 
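/* Unload mirrors the load sequence: re-read the super block, write out the
 * used-page and used-cluster masks, then write the super block back with
 * clean = 1 so the next load sees a clean shutdown.
 */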
1543 static void
1544 _spdk_bs_unload_write_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1545 {
1546 	struct spdk_bs_unload_ctx	*ctx = cb_arg;
1547 
1548 	spdk_free(ctx->super);
1549 
1550 	spdk_bs_sequence_finish(seq, bserrno);
1551 
1552 	_spdk_bs_free(ctx->bs);
1553 	free(ctx);
1554 }
1555 
1556 static void
1557 _spdk_bs_unload_write_used_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1558 {
1559 	struct spdk_bs_unload_ctx	*ctx = cb_arg;
1560 
1561 	spdk_free(ctx->mask);
1562 
1563 	/* Update the values in the super block */
1564 	ctx->super->super_blob = ctx->bs->super_blob;
1565 	ctx->super->clean = 1;
1566 
1567 	spdk_bs_sequence_write(seq, ctx->super, _spdk_bs_page_to_lba(ctx->bs, 0),
1568 			       _spdk_bs_byte_to_lba(ctx->bs, sizeof(*ctx->super)),
1569 			       _spdk_bs_unload_write_super_cpl, ctx);
1570 }
1571 
1572 static void
1573 _spdk_bs_unload_write_used_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1574 {
1575 	struct spdk_bs_unload_ctx	*ctx = cb_arg;
1576 	uint32_t			i;
1577 	uint64_t			lba, lba_count;
1578 
1579 	spdk_free(ctx->mask);
1580 
1581 	/* Write out the used clusters mask */
1582 	ctx->mask = spdk_zmalloc(ctx->super->used_cluster_mask_len * sizeof(struct spdk_blob_md_page),
1583 				 0x1000, NULL);
1584 	if (!ctx->mask) {
1585 		spdk_free(ctx->super);
1586 		free(ctx);
1587 		spdk_bs_sequence_finish(seq, -ENOMEM);
1588 		return;
1589 	}
1590 
1591 	ctx->mask->type = SPDK_MD_MASK_TYPE_USED_CLUSTERS;
1592 	ctx->mask->length = ctx->bs->total_clusters;
1593 	assert(ctx->mask->length == spdk_bit_array_capacity(ctx->bs->used_clusters));
1594 
1595 	i = 0;
1596 	while (true) {
1597 		i = spdk_bit_array_find_first_set(ctx->bs->used_clusters, i);
1598 		if (i >= ctx->mask->length) {
1599 			break;
1600 		}
1601 		ctx->mask->mask[i / 8] |= 1U << (i % 8);
1602 		i++;
1603 	}
1604 
1605 	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_start);
1606 	lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_len);
1607 	spdk_bs_sequence_write(seq, ctx->mask, lba, lba_count,
1608 			       _spdk_bs_unload_write_used_clusters_cpl, ctx);
1609 }
1610 
1611 static void
1612 _spdk_bs_unload_read_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1613 {
1614 	struct spdk_bs_unload_ctx	*ctx = cb_arg;
1615 	uint32_t			i;
1616 	uint64_t			lba, lba_count;
1617 
1618 	/* Write out the used page mask */
1619 	ctx->mask = spdk_zmalloc(ctx->super->used_page_mask_len * sizeof(struct spdk_blob_md_page),
1620 				 0x1000, NULL);
1621 	if (!ctx->mask) {
1622 		spdk_free(ctx->super);
1623 		free(ctx);
1624 		spdk_bs_sequence_finish(seq, -ENOMEM);
1625 		return;
1626 	}
1627 
1628 	ctx->mask->type = SPDK_MD_MASK_TYPE_USED_PAGES;
1629 	ctx->mask->length = ctx->super->md_len;
1630 	assert(ctx->mask->length == spdk_bit_array_capacity(ctx->bs->used_md_pages));
1631 
1632 	i = 0;
1633 	while (true) {
1634 		i = spdk_bit_array_find_first_set(ctx->bs->used_md_pages, i);
1635 		if (i >= ctx->mask->length) {
1636 			break;
1637 		}
1638 		ctx->mask->mask[i / 8] |= 1U << (i % 8);
1639 		i++;
1640 	}
1641 
1642 	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_start);
1643 	lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_len);
1644 	spdk_bs_sequence_write(seq, ctx->mask, lba, lba_count,
1645 			       _spdk_bs_unload_write_used_pages_cpl, ctx);
1646 }
1647 
1648 void
1649 spdk_bs_unload(struct spdk_blob_store *bs, spdk_bs_op_complete cb_fn, void *cb_arg)
1650 {
1651 	struct spdk_bs_cpl	cpl;
1652 	spdk_bs_sequence_t	*seq;
1653 	struct spdk_bs_unload_ctx *ctx;
1654 
1655 	SPDK_TRACELOG(SPDK_TRACE_BLOB, "Syncing blobstore\n");
1656 
1657 	ctx = calloc(1, sizeof(*ctx));
1658 	if (!ctx) {
1659 		cb_fn(cb_arg, -ENOMEM);
1660 		return;
1661 	}
1662 
1663 	ctx->bs = bs;
1664 
1665 	ctx->super = spdk_zmalloc(sizeof(*ctx->super), 0x1000, NULL);
1666 	if (!ctx->super) {
1667 		free(ctx);
1668 		cb_fn(cb_arg, -ENOMEM);
1669 		return;
1670 	}
1671 
1672 	cpl.type = SPDK_BS_CPL_TYPE_BS_BASIC;
1673 	cpl.u.bs_basic.cb_fn = cb_fn;
1674 	cpl.u.bs_basic.cb_arg = cb_arg;
1675 
1676 	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
1677 	if (!seq) {
1678 		spdk_free(ctx->super);
1679 		free(ctx);
1680 		cb_fn(cb_arg, -ENOMEM);
1681 		return;
1682 	}
1683 
1684 	assert(TAILQ_EMPTY(&bs->blobs));
1685 
1686 	/* Read super block */
1687 	spdk_bs_sequence_read(seq, ctx->super, _spdk_bs_page_to_lba(bs, 0),
1688 			      _spdk_bs_byte_to_lba(bs, sizeof(*ctx->super)),
1689 			      _spdk_bs_unload_read_super_cpl, ctx);
1690 }
1691 
1692 /* END spdk_bs_unload */
1693 
1694 void
1695 spdk_bs_set_super(struct spdk_blob_store *bs, spdk_blob_id blobid,
1696 		  spdk_bs_op_complete cb_fn, void *cb_arg)
1697 {
1698 	bs->super_blob = blobid;
1699 	cb_fn(cb_arg, 0);
1700 }
1701 
1702 void
1703 spdk_bs_get_super(struct spdk_blob_store *bs,
1704 		  spdk_blob_op_with_id_complete cb_fn, void *cb_arg)
1705 {
1706 	if (bs->super_blob == SPDK_BLOBID_INVALID) {
1707 		cb_fn(cb_arg, SPDK_BLOBID_INVALID, -ENOENT);
1708 	} else {
1709 		cb_fn(cb_arg, bs->super_blob, 0);
1710 	}
1711 }
1712 
1713 uint64_t
1714 spdk_bs_get_cluster_size(struct spdk_blob_store *bs)
1715 {
1716 	return bs->cluster_sz;
1717 }
1718 
1719 uint64_t
1720 spdk_bs_get_page_size(struct spdk_blob_store *bs)
1721 {
1722 	return sizeof(struct spdk_blob_md_page);
1723 }
1724 
1725 uint64_t
1726 spdk_bs_free_cluster_count(struct spdk_blob_store *bs)
1727 {
1728 	return bs->num_free_clusters;
1729 }
1730 
1731 int spdk_bs_register_md_thread(struct spdk_blob_store *bs)
1732 {
1733 	bs->md_channel = spdk_get_io_channel(bs, SPDK_IO_PRIORITY_DEFAULT, true,
1734 					     (void *)&bs->max_md_ops);
1735 
1736 	return 0;
1737 }
1738 
1739 int spdk_bs_unregister_md_thread(struct spdk_blob_store *bs)
1740 {
1741 	spdk_put_io_channel(bs->md_channel);
1742 
1743 	return 0;
1744 }
1745 
1746 spdk_blob_id spdk_blob_get_id(struct spdk_blob *blob)
1747 {
1748 	assert(blob != NULL);
1749 
1750 	return blob->id;
1751 }
1752 
1753 uint64_t spdk_blob_get_num_pages(struct spdk_blob *blob)
1754 {
1755 	assert(blob != NULL);
1756 
1757 	return _spdk_bs_cluster_to_page(blob->bs, blob->active.num_clusters);
1758 }
1759 
1760 uint64_t spdk_blob_get_num_clusters(struct spdk_blob *blob)
1761 {
1762 	assert(blob != NULL);
1763 
1764 	return blob->active.num_clusters;
1765 }
1766 
1767 /* START spdk_bs_md_create_blob */
1768 
1769 static void
1770 _spdk_bs_md_create_blob_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1771 {
1772 	struct spdk_blob *blob = cb_arg;
1773 
1774 	_spdk_blob_free(blob);
1775 
1776 	spdk_bs_sequence_finish(seq, bserrno);
1777 }
1778 
1779 void spdk_bs_md_create_blob(struct spdk_blob_store *bs,
1780 			    spdk_blob_op_with_id_complete cb_fn, void *cb_arg)
1781 {
1782 	struct spdk_blob	*blob;
1783 	uint32_t		page_idx;
1784 	struct spdk_bs_cpl 	cpl;
1785 	spdk_bs_sequence_t	*seq;
1786 	spdk_blob_id		id;
1787 
1788 	page_idx = spdk_bit_array_find_first_clear(bs->used_md_pages, 0);
1789 	if (page_idx >= spdk_bit_array_capacity(bs->used_md_pages)) {
1790 		cb_fn(cb_arg, 0, -ENOMEM);
1791 		return;
1792 	}
1793 	spdk_bit_array_set(bs->used_md_pages, page_idx);
1794 
1795 	/* The blob id is a 64 bit number. The lower 32 bits are the page_idx. The upper
1796 	 * 32 bits are not currently used. Stick a 1 there just to catch bugs where the
1797 	 * code assumes blob id == page_idx.
1798 	 */
1799 	id = (1ULL << 32) | page_idx;
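	/* For example, page_idx 5 yields blob id 0x100000005. */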
1800 
1801 	SPDK_TRACELOG(SPDK_TRACE_BLOB, "Creating blob with id %lu at page %u\n", id, page_idx);
1802 
1803 	blob = _spdk_blob_alloc(bs, id);
1804 	if (!blob) {
		spdk_bit_array_clear(bs->used_md_pages, page_idx);
1805 		cb_fn(cb_arg, 0, -ENOMEM);
1806 		return;
1807 	}
1808 
1809 	cpl.type = SPDK_BS_CPL_TYPE_BLOBID;
1810 	cpl.u.blobid.cb_fn = cb_fn;
1811 	cpl.u.blobid.cb_arg = cb_arg;
1812 	cpl.u.blobid.blobid = blob->id;
1813 
1814 	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
1815 	if (!seq) {
		spdk_bit_array_clear(bs->used_md_pages, page_idx);
		free(blob->active.pages);
1816 		free(blob);
1817 		cb_fn(cb_arg, 0, -ENOMEM);
1818 		return;
1819 	}
1820 
1821 	_spdk_blob_persist(seq, blob, _spdk_bs_md_create_blob_cpl, blob);
1822 }
1823 
1824 /* END spdk_bs_md_create_blob */
1825 
1826 /* START spdk_bs_md_resize_blob */
1827 int
1828 spdk_bs_md_resize_blob(struct spdk_blob *blob, uint64_t sz)
1829 {
1830 	int			rc;
1831 
1832 	assert(blob != NULL);
1833 
1834 	SPDK_TRACELOG(SPDK_TRACE_BLOB, "Resizing blob %lu to %lu clusters\n", blob->id, sz);
1835 
1836 	if (sz == blob->active.num_clusters) {
1837 		return 0;
1838 	}
1839 
1840 	rc = _spdk_resize_blob(blob, sz);
1841 	if (rc < 0) {
1842 		return rc;
1843 	}
1844 
1845 	return 0;
1846 }
1847 
1848 /* END spdk_bs_md_resize_blob */
1849 
1850 
1851 /* START spdk_bs_md_delete_blob */
1852 
1853 static void
1854 _spdk_bs_md_delete_blob_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1855 {
1856 	struct spdk_blob *blob = cb_arg;
1857 
1858 	_spdk_blob_free(blob);
1859 
1860 	spdk_bs_sequence_finish(seq, bserrno);
1861 }
1862 
static void
_spdk_bs_md_delete_open_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob *blob = cb_arg;

	if (bserrno != 0) {
		/* The metadata load failed; report the error instead of persisting. */
		_spdk_blob_free(blob);
		spdk_bs_sequence_finish(seq, bserrno);
		return;
	}

	blob->state = SPDK_BLOB_STATE_DIRTY;
	blob->active.num_pages = 0;
	_spdk_resize_blob(blob, 0);

	_spdk_blob_persist(seq, blob, _spdk_bs_md_delete_blob_cpl, blob);
}
1874 
1875 void
1876 spdk_bs_md_delete_blob(struct spdk_blob_store *bs, spdk_blob_id blobid,
1877 		       spdk_blob_op_complete cb_fn, void *cb_arg)
1878 {
	struct spdk_blob	*blob;
	struct spdk_bs_cpl	cpl;
	spdk_bs_sequence_t	*seq;
	uint32_t		page_num;

	SPDK_TRACELOG(SPDK_TRACE_BLOB, "Deleting blob %lu\n", blobid);

	blob = _spdk_blob_lookup(bs, blobid);
	if (blob) {
		/* Blobs on the open list are open by definition; deleting an open blob is not allowed. */
		assert(blob->open_ref > 0);
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	page_num = _spdk_bs_blobid_to_page(blobid);
	if (spdk_bit_array_get(bs->used_md_pages, page_num) == false) {
		/* Invalid blobid */
		cb_fn(cb_arg, -ENOENT);
		return;
	}

	blob = _spdk_blob_alloc(bs, blobid);
	if (!blob) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}
1897 
1898 	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
1899 	cpl.u.blob_basic.cb_fn = cb_fn;
1900 	cpl.u.blob_basic.cb_arg = cb_arg;
1901 
	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
	if (!seq) {
		_spdk_blob_free(blob);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}
1907 
1908 	_spdk_blob_load(seq, blob, _spdk_bs_md_delete_open_cpl, blob);
1909 }
1910 
1911 /* END spdk_bs_md_delete_blob */
1912 
1913 /* START spdk_bs_md_open_blob */
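/*
 * Opens are reference counted: opening an already-open blob just takes another
 * reference on the existing in-memory blob. Otherwise the metadata is loaded
 * from disk and the blob is added to the blobstore's open list.
 */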
1914 
static void
_spdk_bs_md_open_blob_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob *blob = cb_arg;

	if (bserrno != 0) {
		/*
		 * The metadata load failed. Free the blob rather than publishing
		 * it on the open list; callers must not use the handle on error.
		 */
		_spdk_blob_free(blob);
		spdk_bs_sequence_finish(seq, bserrno);
		return;
	}

	blob->open_ref++;

	TAILQ_INSERT_HEAD(&blob->bs->blobs, blob, link);

	spdk_bs_sequence_finish(seq, bserrno);
}
1926 
1927 void spdk_bs_md_open_blob(struct spdk_blob_store *bs, spdk_blob_id blobid,
1928 			  spdk_blob_op_with_handle_complete cb_fn, void *cb_arg)
1929 {
1930 	struct spdk_blob		*blob;
1931 	struct spdk_bs_cpl		cpl;
1932 	spdk_bs_sequence_t		*seq;
1933 	uint32_t			page_num;
1934 
1935 	SPDK_TRACELOG(SPDK_TRACE_BLOB, "Opening blob %lu\n", blobid);
1936 
1937 	blob = _spdk_blob_lookup(bs, blobid);
1938 	if (blob) {
1939 		blob->open_ref++;
1940 		cb_fn(cb_arg, blob, 0);
1941 		return;
1942 	}
1943 
1944 	page_num = _spdk_bs_blobid_to_page(blobid);
1945 	if (spdk_bit_array_get(bs->used_md_pages, page_num) == false) {
1946 		/* Invalid blobid */
1947 		cb_fn(cb_arg, NULL, -ENOENT);
1948 		return;
1949 	}
1950 
1951 	blob = _spdk_blob_alloc(bs, blobid);
1952 	if (!blob) {
1953 		cb_fn(cb_arg, NULL, -ENOMEM);
1954 		return;
1955 	}
1956 
1957 	cpl.type = SPDK_BS_CPL_TYPE_BLOB_HANDLE;
1958 	cpl.u.blob_handle.cb_fn = cb_fn;
1959 	cpl.u.blob_handle.cb_arg = cb_arg;
1960 	cpl.u.blob_handle.blob = blob;
1961 
	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
	if (!seq) {
		_spdk_blob_free(blob);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}
1967 
1968 	_spdk_blob_load(seq, blob, _spdk_bs_md_open_blob_cpl, blob);
1969 }

/* END spdk_bs_md_open_blob */

1971 /* START spdk_bs_md_sync_blob */
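/*
 * Sync persists any dirty in-memory state. A clean blob completes immediately
 * without touching the metadata channel.
 */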
1972 static void
1973 _spdk_blob_sync_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1974 {
1975 	spdk_bs_sequence_finish(seq, bserrno);
1976 }
1977 
1978 void spdk_bs_md_sync_blob(struct spdk_blob *blob,
1979 			  spdk_blob_op_complete cb_fn, void *cb_arg)
1980 {
1981 	struct spdk_bs_cpl	cpl;
1982 	spdk_bs_sequence_t	*seq;
1983 
1984 	assert(blob != NULL);
1985 
1986 	SPDK_TRACELOG(SPDK_TRACE_BLOB, "Syncing blob %lu\n", blob->id);
1987 
1988 	assert(blob->state != SPDK_BLOB_STATE_LOADING &&
1989 	       blob->state != SPDK_BLOB_STATE_SYNCING);
1990 
1991 	if (blob->state == SPDK_BLOB_STATE_CLEAN) {
1992 		cb_fn(cb_arg, 0);
1993 		return;
1994 	}
1995 
1996 	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
1997 	cpl.u.blob_basic.cb_fn = cb_fn;
1998 	cpl.u.blob_basic.cb_arg = cb_arg;
1999 
2000 	seq = spdk_bs_sequence_start(blob->bs->md_channel, &cpl);
2001 	if (!seq) {
2002 		cb_fn(cb_arg, -ENOMEM);
2003 		return;
2004 	}
2005 
2006 	_spdk_blob_persist(seq, blob, _spdk_blob_sync_cpl, blob);
2007 }
2008 
2009 /* END spdk_bs_md_sync_blob */
2010 
2011 /* START spdk_bs_md_close_blob */
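/*
 * Close drops one reference. Dirty metadata is persisted as part of the close;
 * when the last reference is dropped, the in-memory blob is removed from the
 * open list and freed, and the caller's pointer is set to NULL.
 */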
2012 
2013 static void
2014 _spdk_blob_close_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2015 {
2016 	struct spdk_blob **blob = cb_arg;
2017 
2018 	if ((*blob)->open_ref == 0) {
2019 		TAILQ_REMOVE(&(*blob)->bs->blobs, (*blob), link);
2020 		_spdk_blob_free((*blob));
2021 	}
2022 
2023 	*blob = NULL;
2024 
2025 	spdk_bs_sequence_finish(seq, bserrno);
2026 }
2027 
2028 void spdk_bs_md_close_blob(struct spdk_blob **b,
2029 			   spdk_blob_op_complete cb_fn, void *cb_arg)
2030 {
2031 	struct spdk_bs_cpl	cpl;
2032 	struct spdk_blob	*blob;
2033 	spdk_bs_sequence_t	*seq;
2034 
2035 	assert(b != NULL);
2036 	blob = *b;
2037 	assert(blob != NULL);
2038 
2039 	SPDK_TRACELOG(SPDK_TRACE_BLOB, "Closing blob %lu\n", blob->id);
2040 
2041 	assert(blob->state != SPDK_BLOB_STATE_LOADING &&
2042 	       blob->state != SPDK_BLOB_STATE_SYNCING);
2043 
2044 	if (blob->open_ref == 0) {
2045 		cb_fn(cb_arg, -EBADF);
2046 		return;
2047 	}
2048 
	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
	cpl.u.blob_basic.cb_fn = cb_fn;
	cpl.u.blob_basic.cb_arg = cb_arg;

	seq = spdk_bs_sequence_start(blob->bs->md_channel, &cpl);
	if (!seq) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	/* Drop the reference only once the close is guaranteed to proceed. */
	blob->open_ref--;
2060 
2061 	if (blob->state == SPDK_BLOB_STATE_CLEAN) {
2062 		_spdk_blob_close_cpl(seq, b, 0);
2063 		return;
2064 	}
2065 
2066 	/* Sync metadata */
2067 	_spdk_blob_persist(seq, blob, _spdk_blob_close_cpl, b);
2068 }
2069 
2070 /* END spdk_bs_md_close_blob */
2071 
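/*
 * Note: &max_ops points at a stack variable, so this relies on
 * spdk_get_io_channel() consuming the context synchronously during channel
 * creation (an assumption about the io_channel implementation).
 */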
2072 struct spdk_io_channel *spdk_bs_alloc_io_channel(struct spdk_blob_store *bs,
2073 		uint32_t priority, uint32_t max_ops)
2074 {
2075 	return spdk_get_io_channel(bs, priority, true, (void *)&max_ops);
2076 }
2077 
2078 void spdk_bs_free_io_channel(struct spdk_io_channel *channel)
2079 {
2080 	spdk_put_io_channel(channel);
2081 }
2082 
2083 void spdk_bs_io_flush_channel(struct spdk_io_channel *channel,
2084 			      spdk_blob_op_complete cb_fn, void *cb_arg)
2085 {
2086 	/* Flush is synchronous right now */
2087 	cb_fn(cb_arg, 0);
2088 }
2089 
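/* Reads and writes share one submission path; the trailing flag selects reads (true) versus writes (false). */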
2090 void spdk_bs_io_write_blob(struct spdk_blob *blob, struct spdk_io_channel *channel,
2091 			   void *payload, uint64_t offset, uint64_t length,
2092 			   spdk_blob_op_complete cb_fn, void *cb_arg)
2093 {
2094 	_spdk_blob_request_submit_rw(blob, channel, payload, offset, length, cb_fn, cb_arg, false);
2095 }
2096 
2097 void spdk_bs_io_read_blob(struct spdk_blob *blob, struct spdk_io_channel *channel,
2098 			  void *payload, uint64_t offset, uint64_t length,
2099 			  spdk_blob_op_complete cb_fn, void *cb_arg)
2100 {
2101 	_spdk_blob_request_submit_rw(blob, channel, payload, offset, length, cb_fn, cb_arg, true);
2102 }
2103 
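/*
 * Blob iteration walks the used metadata page bitmask. A page_num of -1 marks
 * a fresh iterator; each step opens the blob at the next set bit and hands it
 * to the caller's callback, or reports -ENOENT when the bitmask is exhausted.
 */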
2104 struct spdk_bs_iter_ctx {
2105 	int64_t page_num;
2106 	struct spdk_blob_store *bs;
2107 
2108 	spdk_blob_op_with_handle_complete cb_fn;
2109 	void *cb_arg;
2110 };
2111 
2112 static void
2113 _spdk_bs_iter_cpl(void *cb_arg, struct spdk_blob *blob, int bserrno)
2114 {
2115 	struct spdk_bs_iter_ctx *ctx = cb_arg;
2116 	struct spdk_blob_store *bs = ctx->bs;
2117 	spdk_blob_id id;
2118 
2119 	if (bserrno == 0) {
2120 		ctx->cb_fn(ctx->cb_arg, blob, bserrno);
2121 		free(ctx);
2122 		return;
2123 	}
2124 
2125 	ctx->page_num++;
2126 	ctx->page_num = spdk_bit_array_find_first_set(bs->used_md_pages, ctx->page_num);
2127 	if (ctx->page_num >= spdk_bit_array_capacity(bs->used_md_pages)) {
2128 		ctx->cb_fn(ctx->cb_arg, NULL, -ENOENT);
2129 		free(ctx);
2130 		return;
2131 	}
2132 
2133 	id = (1ULL << 32) | ctx->page_num;
2134 
2135 	blob = _spdk_blob_lookup(bs, id);
2136 	if (blob) {
2137 		blob->open_ref++;
2138 		ctx->cb_fn(ctx->cb_arg, blob, 0);
2139 		free(ctx);
2140 		return;
2141 	}
2142 
2143 	spdk_bs_md_open_blob(bs, id, _spdk_bs_iter_cpl, ctx);
2144 }
2145 
2146 void
2147 spdk_bs_md_iter_first(struct spdk_blob_store *bs,
2148 		      spdk_blob_op_with_handle_complete cb_fn, void *cb_arg)
2149 {
2150 	struct spdk_bs_iter_ctx *ctx;
2151 
2152 	ctx = calloc(1, sizeof(*ctx));
2153 	if (!ctx) {
2154 		cb_fn(cb_arg, NULL, -ENOMEM);
2155 		return;
2156 	}
2157 
2158 	ctx->page_num = -1;
2159 	ctx->bs = bs;
2160 	ctx->cb_fn = cb_fn;
2161 	ctx->cb_arg = cb_arg;
2162 
2163 	_spdk_bs_iter_cpl(ctx, NULL, -1);
2164 }
2165 
2166 static void
2167 _spdk_bs_iter_close_cpl(void *cb_arg, int bserrno)
2168 {
2169 	struct spdk_bs_iter_ctx *ctx = cb_arg;
2170 
2171 	_spdk_bs_iter_cpl(ctx, NULL, -1);
2172 }
2173 
2174 void
2175 spdk_bs_md_iter_next(struct spdk_blob_store *bs, struct spdk_blob **b,
2176 		     spdk_blob_op_with_handle_complete cb_fn, void *cb_arg)
2177 {
2178 	struct spdk_bs_iter_ctx *ctx;
2179 	struct spdk_blob	*blob;
2180 
2181 	assert(b != NULL);
2182 	blob = *b;
2183 	assert(blob != NULL);
2184 
2185 	ctx = calloc(1, sizeof(*ctx));
2186 	if (!ctx) {
2187 		cb_fn(cb_arg, NULL, -ENOMEM);
2188 		return;
2189 	}
2190 
2191 	ctx->page_num = _spdk_bs_blobid_to_page(blob->id);
2192 	ctx->bs = bs;
2193 	ctx->cb_fn = cb_fn;
2194 	ctx->cb_arg = cb_arg;
2195 
2196 	/* Close the existing blob */
2197 	spdk_bs_md_close_blob(b, _spdk_bs_iter_close_cpl, ctx);
2198 }
2199 
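/*
 * Xattrs live on an in-memory list attached to the blob. Any modification
 * marks the blob dirty; the change reaches disk on the next sync.
 */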
2200 int
2201 spdk_blob_md_set_xattr(struct spdk_blob *blob, const char *name, const void *value,
2202 		       uint16_t value_len)
2203 {
2204 	struct spdk_xattr 	*xattr;
2205 
2206 	assert(blob != NULL);
2207 
2208 	assert(blob->state != SPDK_BLOB_STATE_LOADING &&
2209 	       blob->state != SPDK_BLOB_STATE_SYNCING);
2210 
2211 	TAILQ_FOREACH(xattr, &blob->xattrs, link) {
2212 		if (!strcmp(name, xattr->name)) {
2213 			free(xattr->value);
2214 			xattr->value_len = value_len;
2215 			xattr->value = malloc(value_len);
2216 			memcpy(xattr->value, value, value_len);
2217 
2218 			blob->state = SPDK_BLOB_STATE_DIRTY;
2219 
2220 			return 0;
2221 		}
2222 	}
2223 
	/*
	 * This is probably all going to be rewritten, so do not bother checking
	 * for failed allocations for now.
	 */
2228 	xattr = calloc(1, sizeof(*xattr));
2229 	xattr->name = strdup(name);
2230 	xattr->value_len = value_len;
2231 	xattr->value = malloc(value_len);
2232 	memcpy(xattr->value, value, value_len);
2233 	TAILQ_INSERT_TAIL(&blob->xattrs, xattr, link);
2234 
2235 	blob->state = SPDK_BLOB_STATE_DIRTY;
2236 
2237 	return 0;
2238 }
2239 
2240 int
2241 spdk_blob_md_remove_xattr(struct spdk_blob *blob, const char *name)
2242 {
2243 	struct spdk_xattr	*xattr;
2244 
2245 	assert(blob != NULL);
2246 
2247 	assert(blob->state != SPDK_BLOB_STATE_LOADING &&
2248 	       blob->state != SPDK_BLOB_STATE_SYNCING);
2249 
2250 	TAILQ_FOREACH(xattr, &blob->xattrs, link) {
2251 		if (!strcmp(name, xattr->name)) {
2252 			TAILQ_REMOVE(&blob->xattrs, xattr, link);
2253 			free(xattr->value);
2254 			free(xattr->name);
2255 			free(xattr);
2256 
2257 			blob->state = SPDK_BLOB_STATE_DIRTY;
2258 
2259 			return 0;
2260 		}
2261 	}
2262 
2263 	return -ENOENT;
2264 }
2265 
2266 int
2267 spdk_bs_md_get_xattr_value(struct spdk_blob *blob, const char *name,
2268 			   const void **value, size_t *value_len)
2269 {
2270 	struct spdk_xattr	*xattr;
2271 
2272 	TAILQ_FOREACH(xattr, &blob->xattrs, link) {
2273 		if (!strcmp(name, xattr->name)) {
2274 			*value = xattr->value;
2275 			*value_len = xattr->value_len;
2276 			return 0;
2277 		}
2278 	}
2279 
2280 	return -ENOENT;
2281 }
2282 
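/*
 * The name table borrows the xattr name pointers rather than copying them, so
 * entries stay valid only as long as the corresponding xattrs are not removed.
 */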
struct spdk_xattr_names {
	uint32_t	count;
	const char	*names[];
};
2287 
2288 int
2289 spdk_bs_md_get_xattr_names(struct spdk_blob *blob,
2290 			   struct spdk_xattr_names **names)
2291 {
2292 	struct spdk_xattr	*xattr;
2293 	int			count = 0;
2294 
2295 	TAILQ_FOREACH(xattr, &blob->xattrs, link) {
2296 		count++;
2297 	}
2298 
2299 	*names = calloc(1, sizeof(struct spdk_xattr_names) + count * sizeof(char *));
2300 	if (*names == NULL) {
2301 		return -ENOMEM;
2302 	}
2303 
2304 	TAILQ_FOREACH(xattr, &blob->xattrs, link) {
2305 		(*names)->names[(*names)->count++] = xattr->name;
2306 	}
2307 
2308 	return 0;
2309 }
2310 
2311 uint32_t
2312 spdk_xattr_names_get_count(struct spdk_xattr_names *names)
2313 {
2314 	assert(names != NULL);
2315 
2316 	return names->count;
2317 }
2318 
2319 const char *
2320 spdk_xattr_names_get_name(struct spdk_xattr_names *names, uint32_t index)
{
	assert(names != NULL);

	if (index >= names->count) {
2323 		return NULL;
2324 	}
2325 
2326 	return names->names[index];
2327 }
2328 
2329 void
2330 spdk_xattr_names_free(struct spdk_xattr_names *names)
2331 {
2332 	free(names);
2333 }
2334 
2335 SPDK_LOG_REGISTER_TRACE_FLAG("blob", SPDK_TRACE_BLOB);
2336