xref: /spdk/lib/blob/blobstore.c (revision 728d001395b3c20108183963f498a1b51b877ab5)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 
36 #include "spdk/blob.h"
37 #include "spdk/env.h"
38 #include "spdk/queue.h"
39 #include "spdk/io_channel.h"
40 #include "spdk/bit_array.h"
41 
42 #include "spdk_internal/log.h"
43 
44 #include "blobstore.h"
45 #include "request.h"
46 
47 static inline size_t
48 divide_round_up(size_t num, size_t divisor)
49 {
50 	return (num + divisor - 1) / divisor;
51 }
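
/*
 * Example of the rounding behavior, as used by the metadata sizing math
 * below: with num = 10 bits of mask data and divisor = 8 bits per byte,
 *
 *	divide_round_up(10, 8) == (10 + 8 - 1) / 8 == 17 / 8 == 2 bytes
 *
 * so any partial unit always rounds up to a whole unit.
 */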
52 
53 static void
54 _spdk_bs_claim_cluster(struct spdk_blob_store *bs, uint32_t cluster_num)
55 {
56 	assert(cluster_num < spdk_bit_array_capacity(bs->used_clusters));
57 	assert(spdk_bit_array_get(bs->used_clusters, cluster_num) == false);
58 	assert(bs->num_free_clusters > 0);
59 
60 	SPDK_TRACELOG(SPDK_TRACE_BLOB, "Claiming cluster %u\n", cluster_num);
61 
62 	spdk_bit_array_set(bs->used_clusters, cluster_num);
63 	bs->num_free_clusters--;
64 }
65 
66 static void
67 _spdk_bs_release_cluster(struct spdk_blob_store *bs, uint32_t cluster_num)
68 {
69 	assert(cluster_num < spdk_bit_array_capacity(bs->used_clusters));
70 	assert(spdk_bit_array_get(bs->used_clusters, cluster_num) == true);
71 	assert(bs->num_free_clusters < bs->total_clusters);
72 
73 	SPDK_TRACELOG(SPDK_TRACE_BLOB, "Releasing cluster %u\n", cluster_num);
74 
75 	spdk_bit_array_clear(bs->used_clusters, cluster_num);
76 	bs->num_free_clusters++;
77 }
78 
79 static struct spdk_blob *
80 _spdk_blob_alloc(struct spdk_blob_store *bs, spdk_blob_id id)
81 {
82 	struct spdk_blob *blob;
83 
84 	blob = calloc(1, sizeof(*blob));
85 	if (!blob) {
86 		return NULL;
87 	}
88 
89 	blob->id = id;
90 	blob->bs = bs;
91 
92 	blob->state = SPDK_BLOB_STATE_DIRTY;
93 	blob->active.num_pages = 1;
94 	blob->active.pages = calloc(1, sizeof(*blob->active.pages));
95 	if (!blob->active.pages) {
96 		free(blob);
97 		return NULL;
98 	}
99 
100 	blob->active.pages[0] = _spdk_bs_blobid_to_page(id);
101 
102 	TAILQ_INIT(&blob->xattrs);
103 
104 	return blob;
105 }
106 
107 static void
108 _spdk_blob_free(struct spdk_blob *blob)
109 {
110 	struct spdk_xattr 	*xattr, *xattr_tmp;
111 
112 	assert(blob != NULL);
113 
114 	free(blob->active.clusters);
115 	free(blob->clean.clusters);
116 	free(blob->active.pages);
117 	free(blob->clean.pages);
118 
119 	TAILQ_FOREACH_SAFE(xattr, &blob->xattrs, link, xattr_tmp) {
120 		TAILQ_REMOVE(&blob->xattrs, xattr, link);
121 		free(xattr->name);
122 		free(xattr->value);
123 		free(xattr);
124 	}
125 
126 	free(blob);
127 }
128 
129 static int
130 _spdk_blob_mark_clean(struct spdk_blob *blob)
131 {
132 	uint64_t *clusters = NULL;
133 	uint32_t *pages = NULL;
134 
135 	assert(blob != NULL);
136 	assert(blob->state == SPDK_BLOB_STATE_LOADING ||
137 	       blob->state == SPDK_BLOB_STATE_SYNCING);
138 
139 	if (blob->active.num_clusters) {
140 		assert(blob->active.clusters);
141 		clusters = calloc(blob->active.num_clusters, sizeof(*blob->active.clusters));
142 		if (!clusters) {
143 			return -1;
144 		}
145 		memcpy(clusters, blob->active.clusters, blob->active.num_clusters * sizeof(*clusters));
146 	}
147 
148 	if (blob->active.num_pages) {
149 		assert(blob->active.pages);
150 		pages = calloc(blob->active.num_pages, sizeof(*blob->active.pages));
151 		if (!pages) {
152 			free(clusters);
153 			return -1;
154 		}
155 		memcpy(pages, blob->active.pages, blob->active.num_pages * sizeof(*pages));
156 	}
157 
158 	free(blob->clean.clusters);
159 	free(blob->clean.pages);
160 
161 	blob->clean.num_clusters = blob->active.num_clusters;
162 	blob->clean.clusters = blob->active.clusters;
163 	blob->clean.num_pages = blob->active.num_pages;
164 	blob->clean.pages = blob->active.pages;
165 
166 	blob->active.clusters = clusters;
167 	blob->active.pages = pages;
168 
169 	blob->state = SPDK_BLOB_STATE_CLEAN;
170 
171 	return 0;
172 }
173 
174 static void
175 _spdk_blob_parse_page(const struct spdk_blob_md_page *page, struct spdk_blob *blob)
176 {
177 	struct spdk_blob_md_descriptor *desc;
178 	size_t	cur_desc = 0;
179 	void *tmp;
180 
181 	desc = (struct spdk_blob_md_descriptor *)page->descriptors;
182 	while (cur_desc < sizeof(page->descriptors)) {
183 		if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_PADDING) {
184 			if (desc->length == 0) {
185 				/* If padding and length are 0, this terminates the page */
186 				break;
187 			}
188 		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_EXTENT) {
189 			struct spdk_blob_md_descriptor_extent	*desc_extent;
190 			unsigned int				i, j;
191 			unsigned int				cluster_count = blob->active.num_clusters;
192 
193 			desc_extent = (struct spdk_blob_md_descriptor_extent *)desc;
194 
195 			assert(desc_extent->length > 0);
196 			assert(desc_extent->length % sizeof(desc_extent->extents[0]) == 0);
197 
198 			for (i = 0; i < desc_extent->length / sizeof(desc_extent->extents[0]); i++) {
199 				for (j = 0; j < desc_extent->extents[i].length; j++) {
200 					assert(spdk_bit_array_get(blob->bs->used_clusters, desc_extent->extents[i].cluster_idx + j));
201 					cluster_count++;
202 				}
203 			}
204 
205 			assert(cluster_count > 0);
206 			tmp = realloc(blob->active.clusters, cluster_count * sizeof(uint64_t));
207 			assert(tmp != NULL);
208 			blob->active.clusters = tmp;
209 			blob->active.cluster_array_size = cluster_count;
210 
211 			for (i = 0; i < desc_extent->length / sizeof(desc_extent->extents[0]); i++) {
212 				for (j = 0; j < desc_extent->extents[i].length; j++) {
213 					blob->active.clusters[blob->active.num_clusters++] = _spdk_bs_cluster_to_lba(blob->bs,
214 							desc_extent->extents[i].cluster_idx + j);
215 				}
216 			}
217 
218 		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR) {
219 			struct spdk_blob_md_descriptor_xattr	*desc_xattr;
220 			struct spdk_xattr 			*xattr;
221 
222 			desc_xattr = (struct spdk_blob_md_descriptor_xattr *)desc;
223 
224 			assert(desc_xattr->length == sizeof(desc_xattr->name_length) +
225 			       sizeof(desc_xattr->value_length) +
226 			       desc_xattr->name_length + desc_xattr->value_length);
227 
228 			xattr = calloc(1, sizeof(*xattr));
229 			assert(xattr != NULL);
230 
231 			xattr->name = malloc(desc_xattr->name_length + 1);
232 			assert(xattr->name);
233 			strncpy(xattr->name, desc_xattr->name, desc_xattr->name_length);
234 			xattr->name[desc_xattr->name_length] = '\0';
235 
236 			xattr->value = malloc(desc_xattr->value_length);
237 			assert(xattr->value != NULL);
238 			xattr->value_len = desc_xattr->value_length;
239 			memcpy(xattr->value,
240 			       (void *)((uintptr_t)desc_xattr->name + desc_xattr->name_length),
241 			       desc_xattr->value_length);
242 
243 			TAILQ_INSERT_TAIL(&blob->xattrs, xattr, link);
244 		} else {
245 			/* Error */
246 			break;
247 		}
248 
249 		/* Advance to the next descriptor */
250 		cur_desc += sizeof(*desc) + desc->length;
251 		if (cur_desc + sizeof(*desc) > sizeof(page->descriptors)) {
252 			break;
253 		}
254 		desc = (struct spdk_blob_md_descriptor *)((uintptr_t)page->descriptors + cur_desc);
255 	}
256 }
257 
258 static int
259 _spdk_blob_parse(const struct spdk_blob_md_page *pages, uint32_t page_count,
260 		 struct spdk_blob *blob)
261 {
262 	const struct spdk_blob_md_page *page;
263 	uint32_t i;
264 
265 	assert(page_count > 0);
266 	assert(pages[0].sequence_num == 0);
267 	assert(blob != NULL);
268 	assert(blob->state == SPDK_BLOB_STATE_LOADING);
269 	assert(blob->active.clusters == NULL);
270 	assert(blob->id == pages[0].id);
272 
273 	for (i = 0; i < page_count; i++) {
274 		page = &pages[i];
275 
276 		assert(page->id == blob->id);
277 		assert(page->sequence_num == i);
278 
279 		_spdk_blob_parse_page(page, blob);
280 	}
281 
282 	return 0;
283 }
284 
285 static int
286 _spdk_blob_serialize_add_page(const struct spdk_blob *blob,
287 			      struct spdk_blob_md_page **pages,
288 			      uint32_t *page_count,
289 			      struct spdk_blob_md_page **last_page)
290 {
291 	struct spdk_blob_md_page *page;
292 
293 	assert(pages != NULL);
294 	assert(page_count != NULL);
295 
296 	if (*page_count == 0) {
297 		assert(*pages == NULL);
298 		*page_count = 1;
299 		*pages = spdk_dma_malloc(sizeof(struct spdk_blob_md_page),
300 					 sizeof(struct spdk_blob_md_page),
301 					 NULL);
302 	} else {
303 		assert(*pages != NULL);
304 		(*page_count)++;
305 		*pages = spdk_dma_realloc(*pages,
306 					  sizeof(struct spdk_blob_md_page) * (*page_count),
307 					  sizeof(struct spdk_blob_md_page),
308 					  NULL);
309 	}
310 
311 	if (*pages == NULL) {
312 		*page_count = 0;
313 		*last_page = NULL;
314 		return -ENOMEM;
315 	}
316 
317 	page = &(*pages)[*page_count - 1];
318 	memset(page, 0, sizeof(*page));
319 	page->id = blob->id;
320 	page->sequence_num = *page_count - 1;
321 	page->next = SPDK_INVALID_MD_PAGE;
322 	*last_page = page;
323 
324 	return 0;
325 }
326 
327 /* Transform the in-memory representation 'xattr' into an on-disk xattr descriptor.
328  * Update required_sz on both success and failure.
329  */
331 static int
332 _spdk_blob_serialize_xattr(const struct spdk_xattr *xattr,
333 			   uint8_t *buf, size_t buf_sz,
334 			   size_t *required_sz)
335 {
336 	struct spdk_blob_md_descriptor_xattr	*desc;
337 
338 	*required_sz = sizeof(struct spdk_blob_md_descriptor_xattr) +
339 		       strlen(xattr->name) +
340 		       xattr->value_len;
341 
342 	if (buf_sz < *required_sz) {
343 		return -1;
344 	}
345 
346 	desc = (struct spdk_blob_md_descriptor_xattr *)buf;
347 
348 	desc->type = SPDK_MD_DESCRIPTOR_TYPE_XATTR;
349 	desc->length = sizeof(desc->name_length) +
350 		       sizeof(desc->value_length) +
351 		       strlen(xattr->name) +
352 		       xattr->value_len;
353 	desc->name_length = strlen(xattr->name);
354 	desc->value_length = xattr->value_len;
355 
356 	memcpy(desc->name, xattr->name, desc->name_length);
357 	memcpy((void *)((uintptr_t)desc->name + desc->name_length),
358 	       xattr->value,
359 	       desc->value_length);
360 
361 	return 0;
362 }
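
/*
 * Illustrative sketch of the on-disk layout produced above for an xattr
 * with the 4-byte name "name" and a 16-byte value (the sizes are
 * hypothetical; the field meanings are taken from the code above):
 *
 *	desc->type         = SPDK_MD_DESCRIPTOR_TYPE_XATTR
 *	desc->length       = sizeof(name_length) + sizeof(value_length) + 4 + 16
 *	desc->name_length  = 4
 *	desc->value_length = 16
 *	desc->name[0..3]   = "name" (no NUL terminator on disk)
 *	...immediately followed by the 16 value bytes
 */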
363 
364 static void
365 _spdk_blob_serialize_extent(const struct spdk_blob *blob,
366 			    uint64_t start_cluster, uint64_t *next_cluster,
367 			    uint8_t *buf, size_t buf_sz)
368 {
369 	struct spdk_blob_md_descriptor_extent *desc;
370 	size_t cur_sz;
371 	uint64_t i, extent_idx;
372 	uint64_t lba, lba_per_cluster, lba_count;
373 
374 	/* The buffer must have room for at least one extent */
375 	cur_sz = sizeof(struct spdk_blob_md_descriptor) + sizeof(desc->extents[0]);
376 	if (buf_sz < cur_sz) {
377 		*next_cluster = start_cluster;
378 		return;
379 	}
380 
381 	desc = (struct spdk_blob_md_descriptor_extent *)buf;
382 	desc->type = SPDK_MD_DESCRIPTOR_TYPE_EXTENT;
383 
384 	lba_per_cluster = _spdk_bs_cluster_to_lba(blob->bs, 1);
385 
386 	lba = blob->active.clusters[start_cluster];
387 	lba_count = lba_per_cluster;
388 	extent_idx = 0;
389 	for (i = start_cluster + 1; i < blob->active.num_clusters; i++) {
390 		if ((lba + lba_count) == blob->active.clusters[i]) {
391 			lba_count += lba_per_cluster;
392 			continue;
393 		}
394 		desc->extents[extent_idx].cluster_idx = lba / lba_per_cluster;
395 		desc->extents[extent_idx].length = lba_count / lba_per_cluster;
396 		extent_idx++;
397 
398 		cur_sz += sizeof(desc->extents[extent_idx]);
399 
400 		if (buf_sz < cur_sz) {
401 			/* If we ran out of buffer space, return */
402 			desc->length = sizeof(desc->extents[0]) * extent_idx;
403 			*next_cluster = i;
404 			return;
405 		}
406 
407 		lba = blob->active.clusters[i];
408 		lba_count = lba_per_cluster;
409 	}
410 
411 	desc->extents[extent_idx].cluster_idx = lba / lba_per_cluster;
412 	desc->extents[extent_idx].length = lba_count / lba_per_cluster;
413 	extent_idx++;
414 
415 	desc->length = sizeof(desc->extents[0]) * extent_idx;
416 	*next_cluster = blob->active.num_clusters;
417 
418 	return;
419 }
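
/*
 * Example of the run-length encoding above, assuming a 1MB cluster on a
 * 512-byte-block device (lba_per_cluster = 2048). A blob whose cluster
 * array holds LBAs { 2048, 4096, 6144, 12288 } starts with three
 * contiguous clusters followed by one discontiguous cluster, so it
 * serializes as two extents:
 *
 *	extents[0] = { .cluster_idx = 1, .length = 3 }
 *	extents[1] = { .cluster_idx = 6, .length = 1 }
 */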
420 
421 static int
422 _spdk_blob_serialize(const struct spdk_blob *blob, struct spdk_blob_md_page **pages,
423 		     uint32_t *page_count)
424 {
425 	struct spdk_blob_md_page		*cur_page;
426 	const struct spdk_xattr			*xattr;
427 	int 					rc;
428 	uint8_t					*buf;
429 	size_t					remaining_sz;
430 
431 	assert(pages != NULL);
432 	assert(page_count != NULL);
433 	assert(blob != NULL);
434 	assert(blob->state == SPDK_BLOB_STATE_SYNCING);
435 
436 	*pages = NULL;
437 	*page_count = 0;
438 
439 	/* A blob always has at least 1 page, even if it has no descriptors */
440 	rc = _spdk_blob_serialize_add_page(blob, pages, page_count, &cur_page);
441 	if (rc < 0) {
442 		return rc;
443 	}
444 
445 	buf = (uint8_t *)cur_page->descriptors;
446 	remaining_sz = sizeof(cur_page->descriptors);
447 
448 	/* Serialize xattrs */
449 	TAILQ_FOREACH(xattr, &blob->xattrs, link) {
450 		size_t required_sz = 0;
451 		rc = _spdk_blob_serialize_xattr(xattr,
452 						buf, remaining_sz,
453 						&required_sz);
454 		if (rc < 0) {
455 			/* Need to add a new page to the chain */
456 			rc = _spdk_blob_serialize_add_page(blob, pages, page_count,
457 							   &cur_page);
458 			if (rc < 0) {
459 				spdk_dma_free(*pages);
460 				*pages = NULL;
461 				*page_count = 0;
462 				return rc;
463 			}
464 
465 			buf = (uint8_t *)cur_page->descriptors;
466 			remaining_sz = sizeof(cur_page->descriptors);
467 
468 			/* Try again */
469 			required_sz = 0;
470 			rc = _spdk_blob_serialize_xattr(xattr,
471 							buf, remaining_sz,
472 							&required_sz);
473 
474 			if (rc < 0) {
475 				spdk_dma_free(*pages);
476 				*pages = NULL;
477 				*page_count = 0;
478 				return -1;
479 			}
480 		}
481 
482 		remaining_sz -= required_sz;
483 		buf += required_sz;
484 	}
485 
486 	/* Serialize extents */
487 	uint64_t last_cluster = 0;
488 	while (last_cluster < blob->active.num_clusters) {
489 		_spdk_blob_serialize_extent(blob, last_cluster, &last_cluster,
490 					    buf, remaining_sz);
491 
492 		if (last_cluster == blob->active.num_clusters) {
493 			break;
494 		}
495 
496 		rc = _spdk_blob_serialize_add_page(blob, pages, page_count,
497 						   &cur_page);
498 		if (rc < 0) {
			spdk_dma_free(*pages);
			*pages = NULL;
			*page_count = 0;
499 			return rc;
500 		}
501 
502 		buf = (uint8_t *)cur_page->descriptors;
503 		remaining_sz = sizeof(cur_page->descriptors);
504 	}
505 
506 	return 0;
507 }
508 
509 struct spdk_blob_load_ctx {
510 	struct spdk_blob 		*blob;
511 
512 	struct spdk_blob_md_page	*pages;
513 	uint32_t			num_pages;
514 
515 	spdk_bs_sequence_cpl		cb_fn;
516 	void				*cb_arg;
517 };
518 
519 static void
520 _spdk_blob_load_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
521 {
522 	struct spdk_blob_load_ctx 	*ctx = cb_arg;
523 	struct spdk_blob 		*blob = ctx->blob;
524 	struct spdk_blob_md_page	*page;
525 	int				rc;
526 
527 	page = &ctx->pages[ctx->num_pages - 1];
528 
529 	if (page->next != SPDK_INVALID_MD_PAGE) {
530 		uint32_t next_page = page->next;
531 		uint64_t next_lba = _spdk_bs_page_to_lba(blob->bs, blob->bs->md_start + next_page);
532 
533 
534 		assert(next_lba < _spdk_bs_page_to_lba(blob->bs, blob->bs->md_start + blob->bs->md_len));
535 
536 		/* Read the next page */
537 		ctx->num_pages++;
538 		ctx->pages = spdk_dma_realloc(ctx->pages, (sizeof(*page) * ctx->num_pages),
539 					      sizeof(*page), NULL);
540 		if (ctx->pages == NULL) {
541 			ctx->cb_fn(seq, ctx->cb_arg, -ENOMEM);
542 			free(ctx);
543 			return;
544 		}
545 
546 		spdk_bs_sequence_read(seq, &ctx->pages[ctx->num_pages - 1],
547 				      next_lba,
548 				      _spdk_bs_byte_to_lba(blob->bs, sizeof(*page)),
549 				      _spdk_blob_load_cpl, ctx);
550 		return;
551 	}
552 
553 	/* Parse the pages */
554 	rc = _spdk_blob_parse(ctx->pages, ctx->num_pages, blob);
555 
556 	_spdk_blob_mark_clean(blob);
557 
558 	ctx->cb_fn(seq, ctx->cb_arg, rc);
559 
560 	/* Free the memory */
561 	spdk_dma_free(ctx->pages);
562 	free(ctx);
563 }
564 
565 /* Load a blob from disk given a blobid */
566 static void
567 _spdk_blob_load(spdk_bs_sequence_t *seq, struct spdk_blob *blob,
568 		spdk_bs_sequence_cpl cb_fn, void *cb_arg)
569 {
570 	struct spdk_blob_load_ctx *ctx;
571 	struct spdk_blob_store *bs;
572 	uint32_t page_num;
573 	uint64_t lba;
574 
575 	assert(blob != NULL);
576 	assert(blob->state == SPDK_BLOB_STATE_CLEAN ||
577 	       blob->state == SPDK_BLOB_STATE_DIRTY);
578 
579 	bs = blob->bs;
580 
581 	ctx = calloc(1, sizeof(*ctx));
582 	if (!ctx) {
583 		cb_fn(seq, cb_arg, -ENOMEM);
584 		return;
585 	}
586 
587 	ctx->blob = blob;
588 	ctx->pages = spdk_dma_realloc(ctx->pages, sizeof(struct spdk_blob_md_page),
589 				      sizeof(struct spdk_blob_md_page), NULL);
590 	if (!ctx->pages) {
591 		free(ctx);
592 		cb_fn(seq, cb_arg, -ENOMEM);
593 		return;
594 	}
595 	ctx->num_pages = 1;
596 	ctx->cb_fn = cb_fn;
597 	ctx->cb_arg = cb_arg;
598 
599 	page_num = _spdk_bs_blobid_to_page(blob->id);
600 	lba = _spdk_bs_page_to_lba(blob->bs, bs->md_start + page_num);
601 
602 	blob->state = SPDK_BLOB_STATE_LOADING;
603 
604 	spdk_bs_sequence_read(seq, &ctx->pages[0], lba,
605 			      _spdk_bs_byte_to_lba(bs, sizeof(struct spdk_blob_md_page)),
606 			      _spdk_blob_load_cpl, ctx);
607 }
608 
609 struct spdk_blob_persist_ctx {
610 	struct spdk_blob 		*blob;
611 
612 	struct spdk_blob_md_page	*pages;
613 
614 	uint64_t			idx;
615 
616 	spdk_bs_sequence_cpl		cb_fn;
617 	void				*cb_arg;
618 };
619 
620 static void
621 _spdk_blob_persist_complete(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
622 {
623 	struct spdk_blob_persist_ctx 	*ctx = cb_arg;
624 	struct spdk_blob 		*blob = ctx->blob;
625 
626 	if (bserrno == 0) {
627 		_spdk_blob_mark_clean(blob);
628 	}
629 
630 	/* Call user callback */
631 	ctx->cb_fn(seq, ctx->cb_arg, bserrno);
632 
633 	/* Free the memory */
634 	spdk_dma_free(ctx->pages);
635 	free(ctx);
636 }
637 
638 static void
639 _spdk_blob_persist_unmap_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
640 {
641 	struct spdk_blob_persist_ctx 	*ctx = cb_arg;
642 	struct spdk_blob 		*blob = ctx->blob;
643 	struct spdk_blob_store		*bs = blob->bs;
644 	void				*tmp;
645 	size_t				i;
646 
647 	/* Release all clusters that were truncated */
648 	for (i = blob->active.num_clusters; i < blob->active.cluster_array_size; i++) {
649 		uint32_t cluster_num = _spdk_bs_lba_to_cluster(bs, blob->active.clusters[i]);
650 
651 		_spdk_bs_release_cluster(bs, cluster_num);
652 	}
653 
654 	if (blob->active.num_clusters == 0) {
655 		free(blob->active.clusters);
656 		blob->active.clusters = NULL;
657 		blob->active.cluster_array_size = 0;
658 	} else {
659 		tmp = realloc(blob->active.clusters, sizeof(uint64_t) * blob->active.num_clusters);
660 		assert(tmp != NULL);
661 		blob->active.clusters = tmp;
662 		blob->active.cluster_array_size = blob->active.num_clusters;
663 	}
664 
665 	_spdk_blob_persist_complete(seq, ctx, bserrno);
666 }
667 
668 static void
669 _spdk_blob_persist_unmap_clusters(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
670 {
671 	struct spdk_blob_persist_ctx 	*ctx = cb_arg;
672 	struct spdk_blob 		*blob = ctx->blob;
673 	struct spdk_blob_store		*bs = blob->bs;
674 	spdk_bs_batch_t			*batch;
675 	size_t				i;
676 	uint64_t			lba;
677 	uint32_t			lba_count;
678 
679 	/* Clusters don't move around in blobs. The list shrinks or grows
680 	 * at the end, but no changes ever occur in the middle of the list.
681 	 */
682 
683 	batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_unmap_clusters_cpl, ctx);
684 
685 	/* Unmap all clusters that were truncated */
686 	lba = 0;
687 	lba_count = 0;
688 	for (i = blob->active.num_clusters; i < blob->active.cluster_array_size; i++) {
689 		uint64_t next_lba = blob->active.clusters[i];
690 		uint32_t next_lba_count = _spdk_bs_cluster_to_lba(bs, 1);
691 
692 		if ((lba + lba_count) == next_lba) {
693 			/* This cluster is contiguous with the previous one. */
694 			lba_count += next_lba_count;
695 			continue;
696 		}
697 
698 		/* This cluster is not contiguous with the previous one. */
699 
700 		/* If a run of LBAs has been accumulated, send it
701 		 * as an unmap.
702 		 */
703 		if (lba_count > 0) {
704 			spdk_bs_batch_unmap(batch, lba, lba_count);
705 		}
706 
707 		/* Start building the next batch */
708 		lba = next_lba;
709 		lba_count = next_lba_count;
710 	}
711 
712 	/* If we ended with a contiguous set of LBAs, send the unmap now */
713 	if (lba_count > 0) {
714 		spdk_bs_batch_unmap(batch, lba, lba_count);
715 	}
716 
717 	spdk_bs_batch_close(batch);
718 }
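
/*
 * Example of the unmap coalescing above, assuming 2048 LBAs per cluster:
 * if the truncated clusters live at LBAs { 2048, 4096, 12288 }, the loop
 * issues two unmaps, one covering LBAs 2048-6143 and one covering LBAs
 * 12288-14335.
 */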
719 
720 static void
721 _spdk_blob_persist_unmap_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
722 {
723 	struct spdk_blob_persist_ctx	*ctx = cb_arg;
724 	struct spdk_blob 		*blob = ctx->blob;
725 	struct spdk_blob_store		*bs = blob->bs;
726 	size_t				i;
727 
728 	/* This loop starts at 1 because the first page is special and handled
729 	 * below. The pages (except the first) are never written in place,
730 	 * so any pages in the clean list must be unmapped.
731 	 */
732 	for (i = 1; i < blob->clean.num_pages; i++) {
733 		spdk_bit_array_clear(bs->used_md_pages, blob->clean.pages[i]);
734 	}
735 
736 	if (blob->active.num_pages == 0) {
737 		uint32_t page_num;
738 
739 		page_num = _spdk_bs_blobid_to_page(blob->id);
740 		spdk_bit_array_clear(bs->used_md_pages, page_num);
741 	}
742 
743 	/* Move on to unmapping clusters */
744 	_spdk_blob_persist_unmap_clusters(seq, ctx, 0);
745 }
746 
747 static void
748 _spdk_blob_persist_unmap_pages(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
749 {
750 	struct spdk_blob_persist_ctx 	*ctx = cb_arg;
751 	struct spdk_blob 		*blob = ctx->blob;
752 	struct spdk_blob_store		*bs = blob->bs;
753 	uint64_t			lba;
754 	uint32_t			lba_count;
755 	spdk_bs_batch_t			*batch;
756 	size_t				i;
757 
758 	batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_unmap_pages_cpl, ctx);
759 
760 	lba_count = _spdk_bs_byte_to_lba(bs, sizeof(struct spdk_blob_md_page));
761 
762 	/* This loop starts at 1 because the first page is special and handled
763 	 * below. The pages (except the first) are never written in place,
764 	 * so any pages in the clean list must be unmapped.
765 	 */
766 	for (i = 1; i < blob->clean.num_pages; i++) {
767 		lba = _spdk_bs_page_to_lba(bs, bs->md_start + blob->clean.pages[i]);
768 
769 		spdk_bs_batch_unmap(batch, lba, lba_count);
770 	}
771 
772 	/* The first page will only be unmapped if this is a delete. */
773 	if (blob->active.num_pages == 0) {
774 		uint32_t page_num;
775 
776 		/* The first page in the metadata goes where the blobid indicates */
777 		page_num = _spdk_bs_blobid_to_page(blob->id);
778 		lba = _spdk_bs_page_to_lba(bs, bs->md_start + page_num);
779 
780 		spdk_bs_batch_unmap(batch, lba, lba_count);
781 	}
782 
783 	spdk_bs_batch_close(batch);
784 }
785 
786 static void
787 _spdk_blob_persist_write_page_root(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
788 {
789 	struct spdk_blob_persist_ctx	*ctx = cb_arg;
790 	struct spdk_blob		*blob = ctx->blob;
791 	struct spdk_blob_store		*bs = blob->bs;
792 	uint64_t			lba;
793 	uint32_t			lba_count;
794 	struct spdk_blob_md_page	*page;
795 
796 	if (blob->active.num_pages == 0) {
797 		/* Move on to the next step */
798 		_spdk_blob_persist_unmap_pages(seq, ctx, 0);
799 		return;
800 	}
801 
802 	lba_count = _spdk_bs_byte_to_lba(bs, sizeof(*page));
803 
804 	page = &ctx->pages[0];
805 	/* The first page in the metadata goes where the blobid indicates */
806 	lba = _spdk_bs_page_to_lba(bs, bs->md_start + _spdk_bs_blobid_to_page(blob->id));
807 
808 	spdk_bs_sequence_write(seq, page, lba, lba_count,
809 			       _spdk_blob_persist_unmap_pages, ctx);
810 }
811 
812 static void
813 _spdk_blob_persist_write_page_chain(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
814 {
815 	struct spdk_blob_persist_ctx 	*ctx = cb_arg;
816 	struct spdk_blob 		*blob = ctx->blob;
817 	struct spdk_blob_store		*bs = blob->bs;
818 	uint64_t 			lba;
819 	uint32_t			lba_count;
820 	struct spdk_blob_md_page	*page;
821 	spdk_bs_batch_t			*batch;
822 	size_t				i;
823 
824 	/* Metadata pages (other than the root) are never written in place,
825 	 * so all non-root pages can be written out in a single batch.
826 	 */
827 
828 	lba_count = _spdk_bs_byte_to_lba(bs, sizeof(*page));
829 
830 	batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_write_page_root, ctx);
831 
832 	/* This starts at 1. The root page is not written until
833 	 * all of the others are finished
834 	 */
835 	for (i = 1; i < blob->active.num_pages; i++) {
836 		page = &ctx->pages[i];
837 		assert(page->sequence_num == i);
838 
839 		lba = _spdk_bs_page_to_lba(bs, bs->md_start + blob->active.pages[i]);
840 
841 		spdk_bs_batch_write(batch, page, lba, lba_count);
842 	}
843 
844 	spdk_bs_batch_close(batch);
845 }
846 
847 static int
848 _spdk_resize_blob(struct spdk_blob *blob, uint64_t sz)
849 {
850 	uint64_t	i;
851 	uint64_t	*tmp;
852 	uint64_t	lfc; /* lowest free cluster */
853 	struct spdk_blob_store *bs;
854 
855 	bs = blob->bs;
856 
857 	assert(blob->state != SPDK_BLOB_STATE_LOADING &&
858 	       blob->state != SPDK_BLOB_STATE_SYNCING);
859 
860 	if (blob->active.num_clusters == sz) {
861 		return 0;
862 	}
863 
864 	if (blob->active.num_clusters < blob->active.cluster_array_size) {
865 		/* If this blob was resized to be larger, then smaller, then
866 		 * larger without syncing, then the cluster array already
867 		 * contains spare assigned clusters we can use.
868 		 */
869 		blob->active.num_clusters = spdk_min(blob->active.cluster_array_size,
870 						     sz);
871 	}
872 
873 	blob->state = SPDK_BLOB_STATE_DIRTY;
874 
875 	/* Do two passes - one to verify that we can obtain enough clusters
876 	 * and another to actually claim them.
877 	 */
878 
879 	lfc = 0;
880 	for (i = blob->active.num_clusters; i < sz; i++) {
881 		lfc = spdk_bit_array_find_first_clear(bs->used_clusters, lfc);
882 		if (lfc >= bs->total_clusters) {
883 			/* No more free clusters. Cannot satisfy the request */
884 			assert(false);
885 			return -1;
886 		}
887 		lfc++;
888 	}
889 
890 	if (sz > blob->active.num_clusters) {
891 		/* Expand the cluster array if necessary.
892 		 * We only shrink the array when persisting.
893 		 */
894 		tmp = realloc(blob->active.clusters, sizeof(uint64_t) * sz);
895 		if (sz > 0 && tmp == NULL) {
896 			assert(false);
897 			return -1;
898 		}
899 		blob->active.clusters = tmp;
900 		blob->active.cluster_array_size = sz;
901 	}
902 
903 	lfc = 0;
904 	for (i = blob->active.num_clusters; i < sz; i++) {
905 		lfc = spdk_bit_array_find_first_clear(bs->used_clusters, lfc);
906 		SPDK_TRACELOG(SPDK_TRACE_BLOB, "Claiming cluster %lu for blob %lu\n", lfc, blob->id);
907 		_spdk_bs_claim_cluster(bs, lfc);
908 		blob->active.clusters[i] = _spdk_bs_cluster_to_lba(bs, lfc);
909 		lfc++;
910 	}
911 
912 	blob->active.num_clusters = sz;
913 
914 	return 0;
915 }
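
/*
 * For example, growing a blob from 2 to 5 clusters first verifies (pass
 * one) that three free clusters exist, and only then (pass two) claims
 * them and records their starting LBAs in blob->active.clusters[2..4];
 * a failed resize therefore leaves the cluster bitmap untouched.
 */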
916 
917 /* Write a blob to disk */
918 static void
919 _spdk_blob_persist(spdk_bs_sequence_t *seq, struct spdk_blob *blob,
920 		   spdk_bs_sequence_cpl cb_fn, void *cb_arg)
921 {
922 	struct spdk_blob_persist_ctx *ctx;
923 	int rc;
924 	uint64_t i;
925 	uint32_t page_num;
	uint32_t *tmp_pages;
926 	struct spdk_blob_store *bs;
927 
928 	assert(blob != NULL);
929 	assert(blob->state == SPDK_BLOB_STATE_CLEAN ||
930 	       blob->state == SPDK_BLOB_STATE_DIRTY);
931 
932 	if (blob->state == SPDK_BLOB_STATE_CLEAN) {
933 		cb_fn(seq, cb_arg, 0);
934 		return;
935 	}
936 
937 	bs = blob->bs;
938 
939 	ctx = calloc(1, sizeof(*ctx));
940 	if (!ctx) {
941 		cb_fn(seq, cb_arg, -ENOMEM);
942 		return;
943 	}
944 	ctx->blob = blob;
945 	ctx->cb_fn = cb_fn;
946 	ctx->cb_arg = cb_arg;
947 
948 	blob->state = SPDK_BLOB_STATE_SYNCING;
949 
950 	if (blob->active.num_pages == 0) {
951 		/* This is the signal that the blob should be deleted.
952 		 * Immediately jump to the clean up routine. */
953 		assert(blob->clean.num_pages > 0);
954 		ctx->idx = blob->clean.num_pages - 1;
955 		_spdk_blob_persist_unmap_pages(seq, ctx, 0);
956 		return;
957 
958 	}
959 
960 	/* Generate the new metadata */
961 	rc = _spdk_blob_serialize(blob, &ctx->pages, &blob->active.num_pages);
962 	if (rc < 0) {
963 		free(ctx);
964 		cb_fn(seq, cb_arg, rc);
965 		return;
966 	}
967 
968 	assert(blob->active.num_pages >= 1);
969 
970 	/* Resize the cache of page indices. Use a temporary pointer so the
	 * existing array is not leaked if realloc fails. */
971 	tmp_pages = realloc(blob->active.pages,
972 				     blob->active.num_pages * sizeof(*blob->active.pages));
973 	if (!tmp_pages) {
		spdk_dma_free(ctx->pages);
974 		free(ctx);
		blob->state = SPDK_BLOB_STATE_DIRTY;
975 		cb_fn(seq, cb_arg, -ENOMEM);
976 		return;
977 	}
	blob->active.pages = tmp_pages;
978 
979 	/* Assign this metadata to pages. This requires two passes -
980 	 * one to verify that there are enough pages and a second
981 	 * to actually claim them. */
982 	page_num = 0;
983 	/* Note that this loop starts at one. The first page location is fixed by the blobid. */
984 	for (i = 1; i < blob->active.num_pages; i++) {
985 		page_num = spdk_bit_array_find_first_clear(bs->used_md_pages, page_num);
986 		if (page_num >= spdk_bit_array_capacity(bs->used_md_pages)) {
987 			spdk_dma_free(ctx->pages);
988 			free(ctx);
989 			blob->state = SPDK_BLOB_STATE_DIRTY;
990 			cb_fn(seq, cb_arg, -ENOMEM);
991 			return;
992 		}
993 		page_num++;
994 	}
995 
996 	page_num = 0;
997 	blob->active.pages[0] = _spdk_bs_blobid_to_page(blob->id);
998 	for (i = 1; i < blob->active.num_pages; i++) {
999 		page_num = spdk_bit_array_find_first_clear(bs->used_md_pages, page_num);
1000 		ctx->pages[i - 1].next = page_num;
1001 		blob->active.pages[i] = page_num;
1002 		spdk_bit_array_set(bs->used_md_pages, page_num);
1003 		SPDK_TRACELOG(SPDK_TRACE_BLOB, "Claiming page %u for blob %lu\n", page_num, blob->id);
1004 		page_num++;
1005 	}
1006 
1007 	/* Write out the metadata; the root page is written only after all other pages complete */
1008 	ctx->idx = blob->active.num_pages - 1;
1009 	_spdk_blob_persist_write_page_chain(seq, ctx, 0);
1010 }
1011 
1012 static void
1013 _spdk_blob_request_submit_rw(struct spdk_blob *blob, struct spdk_io_channel *_channel,
1014 			     void *payload, uint64_t offset, uint64_t length,
1015 			     spdk_blob_op_complete cb_fn, void *cb_arg, bool read)
1016 {
1017 	spdk_bs_batch_t			*batch;
1018 	struct spdk_bs_cpl		cpl;
1019 	uint64_t			lba;
1020 	uint32_t			lba_count;
1021 	uint8_t				*buf;
1022 	uint64_t			page;
1023 
1024 	assert(blob != NULL);
1025 
1026 	if (offset + length > blob->active.num_clusters * blob->bs->pages_per_cluster) {
1027 		cb_fn(cb_arg, -EINVAL);
1028 		return;
1029 	}
1030 
1031 	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
1032 	cpl.u.blob_basic.cb_fn = cb_fn;
1033 	cpl.u.blob_basic.cb_arg = cb_arg;
1034 
1035 	batch = spdk_bs_batch_open(_channel, &cpl);
1036 	if (!batch) {
1037 		cb_fn(cb_arg, -ENOMEM);
1038 		return;
1039 	}
1040 
1041 	length = _spdk_bs_page_to_lba(blob->bs, length);
1042 	page = offset;
1043 	buf = payload;
1044 	while (length > 0) {
1045 		lba = _spdk_bs_blob_page_to_lba(blob, page);
1046 		lba_count = spdk_min(length,
1047 				     _spdk_bs_page_to_lba(blob->bs,
1048 						     _spdk_bs_num_pages_to_cluster_boundary(blob, page)));
1049 
1050 		if (read) {
1051 			spdk_bs_batch_read(batch, buf, lba, lba_count);
1052 		} else {
1053 			spdk_bs_batch_write(batch, buf, lba, lba_count);
1054 		}
1055 
1056 		length -= lba_count;
1057 		buf += _spdk_bs_lba_to_byte(blob->bs, lba_count);
1058 		page += _spdk_bs_lba_to_page(blob->bs, lba_count);
1059 	}
1060 
1061 	spdk_bs_batch_close(batch);
1062 }
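
/*
 * Sketch of how the loop above splits an I/O (the numbers are
 * illustrative, assuming 4KB pages and 1MB clusters, i.e. 256 pages per
 * cluster): a 16-page read at page offset 250 is clipped to the 6 pages
 * left before the cluster boundary, then a second operation covers the
 * remaining 10 pages starting at the next cluster's (possibly
 * discontiguous) LBA.
 */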
1063 
1064 static struct spdk_blob *
1065 _spdk_blob_lookup(struct spdk_blob_store *bs, spdk_blob_id blobid)
1066 {
1067 	struct spdk_blob *blob;
1068 
1069 	TAILQ_FOREACH(blob, &bs->blobs, link) {
1070 		if (blob->id == blobid) {
1071 			return blob;
1072 		}
1073 	}
1074 
1075 	return NULL;
1076 }
1077 
1078 static int
1079 _spdk_bs_channel_create(struct spdk_blob_store *bs, struct spdk_bs_channel *channel,
1080 			uint32_t max_ops)
1081 {
1082 	struct spdk_bs_dev		*dev;
1083 	uint32_t			i;
1084 
1085 	dev = bs->dev;
1086 
1087 	channel->req_mem = calloc(max_ops, sizeof(struct spdk_bs_request_set));
1088 	if (!channel->req_mem) {
1089 		return -1;
1090 	}
1091 
1092 	TAILQ_INIT(&channel->reqs);
1093 
1094 	for (i = 0; i < max_ops; i++) {
1095 		TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link);
1096 	}
1097 
1098 	channel->bs = bs;
1099 	channel->dev = dev;
1100 	channel->dev_channel = dev->create_channel(dev);
1101 
1102 	return 0;
1103 }
1104 
1105 static int
1106 _spdk_bs_md_channel_create(void *io_device, void *ctx_buf)
1107 {
1108 	struct spdk_blob_store		*bs;
1109 	struct spdk_bs_channel		*channel = ctx_buf;
1110 
1111 	bs = SPDK_CONTAINEROF(io_device, struct spdk_blob_store, md_target);
1112 
1113 	return _spdk_bs_channel_create(bs, channel, bs->md_target.max_md_ops);
1114 }
1115 
1116 static int
1117 _spdk_bs_io_channel_create(void *io_device, void *ctx_buf)
1118 {
1119 	struct spdk_blob_store		*bs;
1120 	struct spdk_bs_channel		*channel = ctx_buf;
1121 
1122 	bs = SPDK_CONTAINEROF(io_device, struct spdk_blob_store, io_target);
1123 
1124 	return _spdk_bs_channel_create(bs, channel, bs->io_target.max_channel_ops);
1125 }
1126 
1127 
1128 static void
1129 _spdk_bs_channel_destroy(void *io_device, void *ctx_buf)
1130 {
1131 	struct spdk_bs_channel *channel = ctx_buf;
1132 
1133 	free(channel->req_mem);
1134 	channel->dev->destroy_channel(channel->dev, channel->dev_channel);
1135 }
1136 
1137 static void
1138 _spdk_bs_free(struct spdk_blob_store *bs)
1139 {
1140 	struct spdk_blob	*blob, *blob_tmp;
1141 
1142 	spdk_bs_unregister_md_thread(bs);
1143 	spdk_io_device_unregister(&bs->io_target);
1144 	spdk_io_device_unregister(&bs->md_target);
1145 
1146 	TAILQ_FOREACH_SAFE(blob, &bs->blobs, link, blob_tmp) {
1147 		TAILQ_REMOVE(&bs->blobs, blob, link);
1148 		_spdk_blob_free(blob);
1149 	}
1150 
1151 	spdk_bit_array_free(&bs->used_md_pages);
1152 	spdk_bit_array_free(&bs->used_clusters);
1153 
1154 	bs->dev->destroy(bs->dev);
1155 	free(bs);
1156 }
1157 
1158 void
1159 spdk_bs_opts_init(struct spdk_bs_opts *opts)
1160 {
1161 	opts->cluster_sz = SPDK_BLOB_OPTS_CLUSTER_SZ;
1162 	opts->num_md_pages = SPDK_BLOB_OPTS_NUM_MD_PAGES;
1163 	opts->max_md_ops = SPDK_BLOB_OPTS_MAX_MD_OPS;
1164 	opts->max_channel_ops = SPDK_BLOB_OPTS_MAX_CHANNEL_OPS;
1165 }
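
/*
 * Typical usage, as a hypothetical caller might write it (dev,
 * init_done_cb and init_ctx are assumed to be supplied by the caller):
 *
 *	struct spdk_bs_opts opts;
 *
 *	spdk_bs_opts_init(&opts);
 *	opts.cluster_sz = 4 * 1024 * 1024;	(use 4MB clusters)
 *	spdk_bs_init(dev, &opts, init_done_cb, init_ctx);
 */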
1166 
1167 static struct spdk_blob_store *
1168 _spdk_bs_alloc(struct spdk_bs_dev *dev, struct spdk_bs_opts *opts)
1169 {
1170 	struct spdk_blob_store	*bs;
1171 
1172 	bs = calloc(1, sizeof(struct spdk_blob_store));
1173 	if (!bs) {
1174 		return NULL;
1175 	}
1176 
1177 	TAILQ_INIT(&bs->blobs);
1178 	bs->dev = dev;
1179 
1180 	/*
1181 	 * Do not use _spdk_bs_lba_to_cluster() here since blockcnt may not be an
1182 	 *  even multiple of the cluster size.
1183 	 */
1184 	bs->cluster_sz = opts->cluster_sz;
1185 	bs->total_clusters = dev->blockcnt / (bs->cluster_sz / dev->blocklen);
1186 	bs->pages_per_cluster = bs->cluster_sz / sizeof(struct spdk_blob_md_page);
1187 	bs->num_free_clusters = bs->total_clusters;
1188 	bs->used_clusters = spdk_bit_array_create(bs->total_clusters);
1189 	if (bs->used_clusters == NULL) {
1190 		_spdk_bs_free(bs);
1191 		return NULL;
1192 	}
1193 
1194 	bs->md_target.max_md_ops = opts->max_md_ops;
1195 	bs->io_target.max_channel_ops = opts->max_channel_ops;
1196 	bs->super_blob = SPDK_BLOBID_INVALID;
1197 
1198 	/* The metadata is assumed to be at least 1 page */
1199 	bs->used_md_pages = spdk_bit_array_create(1);
	if (bs->used_md_pages == NULL) {
		_spdk_bs_free(bs);
		return NULL;
	}
1200 
1201 	spdk_io_device_register(&bs->md_target, _spdk_bs_md_channel_create, _spdk_bs_channel_destroy,
1202 				sizeof(struct spdk_bs_channel));
1203 	spdk_bs_register_md_thread(bs);
1204 
1205 	spdk_io_device_register(&bs->io_target, _spdk_bs_io_channel_create, _spdk_bs_channel_destroy,
1206 				sizeof(struct spdk_bs_channel));
1207 
1208 	return bs;
1209 }
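
/*
 * Worked example of the geometry math above: for a device with 2,097,152
 * blocks of 512 bytes (1GB) and a 1MB cluster size, cluster_sz / blocklen
 * = 2048 blocks per cluster, so total_clusters = 2,097,152 / 2048 = 1024;
 * assuming 4096-byte metadata pages, pages_per_cluster = 1MB / 4KB = 256.
 */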
1210 
1211 /* START spdk_bs_load */
1212 
1213 struct spdk_bs_load_ctx {
1214 	struct spdk_blob_store		*bs;
1215 	struct spdk_bs_super_block	*super;
1216 
1217 	struct spdk_bs_md_mask		*mask;
1218 };
1219 
1220 static void
1221 _spdk_bs_load_used_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1222 {
1223 	struct spdk_bs_load_ctx *ctx = cb_arg;
1224 	uint32_t		i, j;
1225 	int			rc;
1226 
1227 	/* The type must be correct */
1228 	assert(ctx->mask->type == SPDK_MD_MASK_TYPE_USED_CLUSTERS);
1229 	/* The length of the mask (in bits) must not be greater than the length of the buffer (converted to bits) */
1230 	assert(ctx->mask->length <= (ctx->super->used_cluster_mask_len * sizeof(
1231 					     struct spdk_blob_md_page) * 8));
1232 	/* The length of the mask must be exactly equal to the total number of clusters */
1233 	assert(ctx->mask->length == ctx->bs->total_clusters);
1234 
1235 	rc = spdk_bit_array_resize(&ctx->bs->used_clusters, ctx->bs->total_clusters);
1236 	if (rc < 0) {
1237 		spdk_dma_free(ctx->super);
1238 		spdk_dma_free(ctx->mask);
1239 		_spdk_bs_free(ctx->bs);
1240 		free(ctx);
1241 		spdk_bs_sequence_finish(seq, -ENOMEM);
1242 		return;
1243 	}
1244 
1245 	ctx->bs->num_free_clusters = ctx->bs->total_clusters;
1246 	for (i = 0; i < divide_round_up(ctx->mask->length, 8); i++) {
1247 		uint8_t segment = ctx->mask->mask[i];
1248 		for (j = 0; segment && (j < 8); j++) {
1249 			if (segment & 1U) {
1250 				spdk_bit_array_set(ctx->bs->used_clusters, (i * 8) + j);
1251 				assert(ctx->bs->num_free_clusters > 0);
1252 				ctx->bs->num_free_clusters--;
1253 			}
1254 			segment >>= 1U;
1255 		}
1256 	}
1257 
1258 	spdk_dma_free(ctx->super);
1259 	spdk_dma_free(ctx->mask);
1260 	free(ctx);
1261 
1262 	spdk_bs_sequence_finish(seq, bserrno);
1263 }
1264 
1265 static void
1266 _spdk_bs_load_used_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1267 {
1268 	struct spdk_bs_load_ctx *ctx = cb_arg;
1269 	uint64_t		lba, lba_count, mask_size;
1270 	uint32_t		i, j;
1271 	int			rc;
1272 
1273 	/* The type must be correct */
1274 	assert(ctx->mask->type == SPDK_MD_MASK_TYPE_USED_PAGES);
1275 	/* The length of the mask (in bits) must not be greater than the length of the buffer (converted to bits) */
1276 	assert(ctx->mask->length <= (ctx->super->used_page_mask_len * sizeof(struct spdk_blob_md_page) *
1277 				     8));
1278 	/* The length of the mask must be exactly equal to the size (in pages) of the metadata region */
1279 	assert(ctx->mask->length == ctx->super->md_len);
1280 
1281 	rc = spdk_bit_array_resize(&ctx->bs->used_md_pages, ctx->mask->length);
1282 	if (rc < 0) {
1283 		spdk_dma_free(ctx->super);
1284 		spdk_dma_free(ctx->mask);
1285 		_spdk_bs_free(ctx->bs);
1286 		free(ctx);
1287 		spdk_bs_sequence_finish(seq, -ENOMEM);
1288 		return;
1289 	}
1290 
1291 	for (i = 0; i < divide_round_up(ctx->mask->length, 8); i++) {
1292 		uint8_t segment = ctx->mask->mask[i];
1293 		for (j = 0; segment && (j < 8); j++) {
1294 			if (segment & 1U) {
1295 				spdk_bit_array_set(ctx->bs->used_md_pages, (i * 8) + j);
1296 			}
1297 			segment >>= 1U;
1298 		}
1299 	}
1300 	spdk_dma_free(ctx->mask);
1301 
1302 	/* Read the used clusters mask */
1303 	mask_size = ctx->super->used_cluster_mask_len * sizeof(struct spdk_blob_md_page);
1304 	ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL);
1305 	if (!ctx->mask) {
1306 		spdk_dma_free(ctx->super);
1307 		_spdk_bs_free(ctx->bs);
1308 		free(ctx);
1309 		spdk_bs_sequence_finish(seq, -ENOMEM);
1310 		return;
1311 	}
1312 	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_start);
1313 	lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_len);
1314 	spdk_bs_sequence_read(seq, ctx->mask, lba, lba_count,
1315 			      _spdk_bs_load_used_clusters_cpl, ctx);
1316 }
1317 
1318 static void
1319 _spdk_bs_load_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1320 {
1321 	struct spdk_bs_load_ctx *ctx = cb_arg;
1322 	uint64_t		lba, lba_count, mask_size;
1323 
1324 	if (ctx->super->version != SPDK_BS_VERSION) {
1325 		spdk_dma_free(ctx->super);
1326 		_spdk_bs_free(ctx->bs);
1327 		free(ctx);
1328 		spdk_bs_sequence_finish(seq, -EILSEQ);
1329 		return;
1330 	}
1331 
1332 	if (memcmp(ctx->super->signature, SPDK_BS_SUPER_BLOCK_SIG,
1333 		   sizeof(ctx->super->signature)) != 0) {
1334 		spdk_dma_free(ctx->super);
1335 		_spdk_bs_free(ctx->bs);
1336 		free(ctx);
1337 		spdk_bs_sequence_finish(seq, -EILSEQ);
1338 		return;
1339 	}
1340 
1341 	if (ctx->super->clean != 1) {
1342 		/* TODO: ONLY CLEAN SHUTDOWN IS CURRENTLY SUPPORTED.
1343 		 * All of the necessary data to recover is available
1344 		 * on disk - the code just has not been written yet.
1345 		 */
1346 		assert(false);
1347 		spdk_dma_free(ctx->super);
1348 		_spdk_bs_free(ctx->bs);
1349 		free(ctx);
1350 		spdk_bs_sequence_finish(seq, -EILSEQ);
1351 		return;
1352 	}
1353 	ctx->super->clean = 0;
1354 
1355 	/* Parse the super block */
1356 	ctx->bs->cluster_sz = ctx->super->cluster_size;
1357 	ctx->bs->total_clusters = ctx->bs->dev->blockcnt / (ctx->bs->cluster_sz / ctx->bs->dev->blocklen);
1358 	ctx->bs->pages_per_cluster = ctx->bs->cluster_sz / sizeof(struct spdk_blob_md_page);
1359 	ctx->bs->md_start = ctx->super->md_start;
1360 	ctx->bs->md_len = ctx->super->md_len;
1361 
1362 	/* Read the used pages mask */
1363 	mask_size = ctx->super->used_page_mask_len * sizeof(struct spdk_blob_md_page);
1364 	ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL);
1365 	if (!ctx->mask) {
1366 		spdk_dma_free(ctx->super);
1367 		_spdk_bs_free(ctx->bs);
1368 		free(ctx);
1369 		spdk_bs_sequence_finish(seq, -ENOMEM);
1370 		return;
1371 	}
1372 	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_start);
1373 	lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_len);
1374 	spdk_bs_sequence_read(seq, ctx->mask, lba, lba_count,
1375 			      _spdk_bs_load_used_pages_cpl, ctx);
1376 }
1377 
1378 void
1379 spdk_bs_load(struct spdk_bs_dev *dev,
1380 	     spdk_bs_op_with_handle_complete cb_fn, void *cb_arg)
1381 {
1382 	struct spdk_blob_store	*bs;
1383 	struct spdk_bs_cpl	cpl;
1384 	spdk_bs_sequence_t	*seq;
1385 	struct spdk_bs_load_ctx *ctx;
1386 	struct spdk_bs_opts	opts = {};
1387 
1388 	SPDK_TRACELOG(SPDK_TRACE_BLOB, "Loading blobstore from dev %p\n", dev);
1389 
1390 	spdk_bs_opts_init(&opts);
1391 
1392 	bs = _spdk_bs_alloc(dev, &opts);
1393 	if (!bs) {
1394 		cb_fn(cb_arg, NULL, -ENOMEM);
1395 		return;
1396 	}
1397 
1398 	ctx = calloc(1, sizeof(*ctx));
1399 	if (!ctx) {
1400 		_spdk_bs_free(bs);
1401 		cb_fn(cb_arg, NULL, -ENOMEM);
1402 		return;
1403 	}
1404 
1405 	ctx->bs = bs;
1406 
1407 	/* Allocate memory for the super block */
1408 	ctx->super = spdk_dma_zmalloc(sizeof(*ctx->super), 0x1000, NULL);
1409 	if (!ctx->super) {
1410 		free(ctx);
1411 		_spdk_bs_free(bs);
		cb_fn(cb_arg, NULL, -ENOMEM);
1412 		return;
1413 	}
1414 
1415 	cpl.type = SPDK_BS_CPL_TYPE_BS_HANDLE;
1416 	cpl.u.bs_handle.cb_fn = cb_fn;
1417 	cpl.u.bs_handle.cb_arg = cb_arg;
1418 	cpl.u.bs_handle.bs = bs;
1419 
1420 	seq = spdk_bs_sequence_start(bs->md_target.md_channel, &cpl);
1421 	if (!seq) {
1422 		spdk_dma_free(ctx->super);
1423 		free(ctx);
1424 		_spdk_bs_free(bs);
1425 		cb_fn(cb_arg, NULL, -ENOMEM);
1426 		return;
1427 	}
1428 
1429 	/* Read the super block */
1430 	spdk_bs_sequence_read(seq, ctx->super, _spdk_bs_page_to_lba(bs, 0),
1431 			      _spdk_bs_byte_to_lba(bs, sizeof(*ctx->super)),
1432 			      _spdk_bs_load_super_cpl, ctx);
1433 }
1434 
1435 /* END spdk_bs_load */
1436 
1437 /* START spdk_bs_init */
1438 
1439 struct spdk_bs_init_ctx {
1440 	struct spdk_blob_store		*bs;
1441 	struct spdk_bs_super_block	*super;
1442 };
1443 
1444 static void
1445 _spdk_bs_init_persist_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1446 {
1447 	struct spdk_bs_init_ctx *ctx = cb_arg;
1448 
1449 	spdk_dma_free(ctx->super);
1450 	free(ctx);
1451 
1452 	spdk_bs_sequence_finish(seq, bserrno);
1453 }
1454 
1455 static void
1456 _spdk_bs_init_trim_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1457 {
1458 	struct spdk_bs_init_ctx *ctx = cb_arg;
1459 
1460 	/* Write super block */
1461 	spdk_bs_sequence_write(seq, ctx->super, _spdk_bs_page_to_lba(ctx->bs, 0),
1462 			       _spdk_bs_byte_to_lba(ctx->bs, sizeof(*ctx->super)),
1463 			       _spdk_bs_init_persist_super_cpl, ctx);
1464 }
1465 
1466 void
1467 spdk_bs_init(struct spdk_bs_dev *dev, struct spdk_bs_opts *o,
1468 	     spdk_bs_op_with_handle_complete cb_fn, void *cb_arg)
1469 {
1470 	struct spdk_bs_init_ctx *ctx;
1471 	struct spdk_blob_store	*bs;
1472 	struct spdk_bs_cpl	cpl;
1473 	spdk_bs_sequence_t	*seq;
1474 	uint64_t		num_md_pages;
1475 	uint32_t		i;
1476 	struct spdk_bs_opts	opts = {};
1477 	int			rc;
1478 
1479 	SPDK_TRACELOG(SPDK_TRACE_BLOB, "Initializing blobstore on dev %p\n", dev);
1480 
1481 	if (o) {
1482 		opts = *o;
1483 	} else {
1484 		spdk_bs_opts_init(&opts);
1485 	}
1486 
1487 	bs = _spdk_bs_alloc(dev, &opts);
1488 	if (!bs) {
1489 		cb_fn(cb_arg, NULL, -ENOMEM);
1490 		return;
1491 	}
1492 
1493 	if (opts.num_md_pages == UINT32_MAX) {
1494 		/* By default, allocate 1 page per cluster.
1495 		 * Technically, this over-allocates metadata
1496 		 * because more metadata will reduce the number
1497 		 * of usable clusters. This can be addressed with
1498 		 * more complex math in the future.
1499 		 */
1500 		bs->md_len = bs->total_clusters;
1501 	} else {
1502 		bs->md_len = opts.num_md_pages;
1503 	}
1504 
1505 	rc = spdk_bit_array_resize(&bs->used_md_pages, bs->md_len);
1506 	if (rc < 0) {
1507 		_spdk_bs_free(bs);
1508 		cb_fn(cb_arg, NULL, -ENOMEM);
1509 		return;
1510 	}
1511 
1512 	ctx = calloc(1, sizeof(*ctx));
1513 	if (!ctx) {
1514 		_spdk_bs_free(bs);
1515 		cb_fn(cb_arg, NULL, -ENOMEM);
1516 		return;
1517 	}
1518 
1519 	ctx->bs = bs;
1520 
1521 	/* Allocate memory for the super block */
1522 	ctx->super = spdk_dma_zmalloc(sizeof(*ctx->super), 0x1000, NULL);
1523 	if (!ctx->super) {
1524 		free(ctx);
1525 		_spdk_bs_free(bs);
		cb_fn(cb_arg, NULL, -ENOMEM);
1526 		return;
1527 	}
1528 	memcpy(ctx->super->signature, SPDK_BS_SUPER_BLOCK_SIG,
1529 	       sizeof(ctx->super->signature));
1530 	ctx->super->version = SPDK_BS_VERSION;
1531 	ctx->super->length = sizeof(*ctx->super);
1532 	ctx->super->super_blob = bs->super_blob;
1533 	ctx->super->clean = 0;
1534 	ctx->super->cluster_size = bs->cluster_sz;
1535 
1536 	/* Calculate how many pages the metadata consumes at the front
1537 	 * of the disk.
1538 	 */
1539 
1540 	/* The super block uses 1 page */
1541 	num_md_pages = 1;
1542 
1543 	/* The used_md_pages mask requires 1 bit per metadata page; the bit
1544 	 * array plus its header is rounded up to a whole number of pages.
1545 	 */
1546 	ctx->super->used_page_mask_start = num_md_pages;
1547 	ctx->super->used_page_mask_len = divide_round_up(sizeof(struct spdk_bs_md_mask) +
1548 					 divide_round_up(bs->md_len, 8),
1549 					 sizeof(struct spdk_blob_md_page));
1550 	num_md_pages += ctx->super->used_page_mask_len;
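
	/* Worked example (assuming 4096-byte metadata pages): with
	 * bs->md_len = 1024 pages, the mask needs divide_round_up(1024, 8) =
	 * 128 bytes of bits plus the spdk_bs_md_mask header, which still fits
	 * in one page, so used_page_mask_len = 1.
	 */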
1551 
1552 	/* The used_clusters mask requires 1 bit per cluster; the bit array
1553 	 * plus its header is rounded up to a whole number of pages.
1554 	 */
1555 	ctx->super->used_cluster_mask_start = num_md_pages;
1556 	ctx->super->used_cluster_mask_len = divide_round_up(sizeof(struct spdk_bs_md_mask) +
1557 					    divide_round_up(bs->total_clusters, 8),
1558 					    sizeof(struct spdk_blob_md_page));
1559 	num_md_pages += ctx->super->used_cluster_mask_len;
1560 
1561 	/* The metadata region size was chosen above */
1562 	ctx->super->md_start = bs->md_start = num_md_pages;
1563 	ctx->super->md_len = bs->md_len;
1564 	num_md_pages += bs->md_len;
1565 
1566 	/* Claim all of the clusters used by the metadata */
1567 	for (i = 0; i < divide_round_up(num_md_pages, bs->pages_per_cluster); i++) {
1568 		_spdk_bs_claim_cluster(bs, i);
1569 	}
1570 
1571 	cpl.type = SPDK_BS_CPL_TYPE_BS_HANDLE;
1572 	cpl.u.bs_handle.cb_fn = cb_fn;
1573 	cpl.u.bs_handle.cb_arg = cb_arg;
1574 	cpl.u.bs_handle.bs = bs;
1575 
1576 	seq = spdk_bs_sequence_start(bs->md_target.md_channel, &cpl);
1577 	if (!seq) {
1578 		spdk_dma_free(ctx->super);
1579 		free(ctx);
1580 		_spdk_bs_free(bs);
1581 		cb_fn(cb_arg, NULL, -ENOMEM);
1582 		return;
1583 	}
1584 
1585 	/* TRIM the entire device */
1586 	spdk_bs_sequence_unmap(seq, 0, bs->dev->blockcnt, _spdk_bs_init_trim_cpl, ctx);
1587 }
1588 
1589 /* END spdk_bs_init */
1590 
1591 /* START spdk_bs_unload */
1592 
1593 struct spdk_bs_unload_ctx {
1594 	struct spdk_blob_store		*bs;
1595 	struct spdk_bs_super_block	*super;
1596 
1597 	struct spdk_bs_md_mask		*mask;
1598 };
1599 
1600 static void
1601 _spdk_bs_unload_write_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1602 {
1603 	struct spdk_bs_unload_ctx	*ctx = cb_arg;
1604 
1605 	spdk_dma_free(ctx->super);
1606 
1607 	spdk_bs_sequence_finish(seq, bserrno);
1608 
1609 	_spdk_bs_free(ctx->bs);
1610 	free(ctx);
1611 }
1612 
1613 static void
1614 _spdk_bs_unload_write_used_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1615 {
1616 	struct spdk_bs_unload_ctx	*ctx = cb_arg;
1617 
1618 	spdk_dma_free(ctx->mask);
1619 
1620 	/* Update the values in the super block */
1621 	ctx->super->super_blob = ctx->bs->super_blob;
1622 	ctx->super->clean = 1;
1623 
1624 	spdk_bs_sequence_write(seq, ctx->super, _spdk_bs_page_to_lba(ctx->bs, 0),
1625 			       _spdk_bs_byte_to_lba(ctx->bs, sizeof(*ctx->super)),
1626 			       _spdk_bs_unload_write_super_cpl, ctx);
1627 }
1628 
1629 static void
1630 _spdk_bs_unload_write_used_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1631 {
1632 	struct spdk_bs_unload_ctx	*ctx = cb_arg;
1633 	uint32_t			i;
1634 	uint64_t			lba, lba_count, mask_size;
1635 
1636 	spdk_dma_free(ctx->mask);
1637 
1638 	/* Write out the used clusters mask */
1639 	mask_size = ctx->super->used_cluster_mask_len * sizeof(struct spdk_blob_md_page);
1640 	ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL);
1641 	if (!ctx->mask) {
1642 		spdk_dma_free(ctx->super);
1643 		free(ctx);
1644 		spdk_bs_sequence_finish(seq, -ENOMEM);
1645 		return;
1646 	}
1647 
1648 	ctx->mask->type = SPDK_MD_MASK_TYPE_USED_CLUSTERS;
1649 	ctx->mask->length = ctx->bs->total_clusters;
1650 	assert(ctx->mask->length == spdk_bit_array_capacity(ctx->bs->used_clusters));
1651 
1652 	i = 0;
1653 	while (true) {
1654 		i = spdk_bit_array_find_first_set(ctx->bs->used_clusters, i);
1655 		if (i > ctx->mask->length) {
1656 			break;
1657 		}
1658 		ctx->mask->mask[i / 8] |= 1U << (i % 8);
1659 		i++;
1660 	}
1661 
1662 	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_start);
1663 	lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_len);
1664 	spdk_bs_sequence_write(seq, ctx->mask, lba, lba_count,
1665 			       _spdk_bs_unload_write_used_clusters_cpl, ctx);
1666 }
1667 
1668 static void
1669 _spdk_bs_unload_read_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1670 {
1671 	struct spdk_bs_unload_ctx	*ctx = cb_arg;
1672 	uint32_t			i;
1673 	uint64_t			lba, lba_count, mask_size;
1674 
1675 	/* Write out the used page mask */
1676 	mask_size = ctx->super->used_page_mask_len * sizeof(struct spdk_blob_md_page);
1677 	ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL);
1678 	if (!ctx->mask) {
1679 		spdk_dma_free(ctx->super);
1680 		free(ctx);
1681 		spdk_bs_sequence_finish(seq, -ENOMEM);
1682 		return;
1683 	}
1684 
1685 	ctx->mask->type = SPDK_MD_MASK_TYPE_USED_PAGES;
1686 	ctx->mask->length = ctx->super->md_len;
1687 	assert(ctx->mask->length == spdk_bit_array_capacity(ctx->bs->used_md_pages));
1688 
1689 	i = 0;
1690 	while (true) {
1691 		i = spdk_bit_array_find_first_set(ctx->bs->used_md_pages, i);
1692 		if (i > ctx->mask->length) {
1693 			break;
1694 		}
1695 		ctx->mask->mask[i / 8] |= 1U << (i % 8);
1696 		i++;
1697 	}
1698 
1699 	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_start);
1700 	lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_len);
1701 	spdk_bs_sequence_write(seq, ctx->mask, lba, lba_count,
1702 			       _spdk_bs_unload_write_used_pages_cpl, ctx);
1703 }
1704 
1705 void
1706 spdk_bs_unload(struct spdk_blob_store *bs, spdk_bs_op_complete cb_fn, void *cb_arg)
1707 {
1708 	struct spdk_bs_cpl	cpl;
1709 	spdk_bs_sequence_t	*seq;
1710 	struct spdk_bs_unload_ctx *ctx;
1711 
1712 	SPDK_TRACELOG(SPDK_TRACE_BLOB, "Syncing blobstore\n");
1713 
1714 	ctx = calloc(1, sizeof(*ctx));
1715 	if (!ctx) {
1716 		cb_fn(cb_arg, -ENOMEM);
1717 		return;
1718 	}
1719 
1720 	ctx->bs = bs;
1721 
1722 	ctx->super = spdk_dma_zmalloc(sizeof(*ctx->super), 0x1000, NULL);
1723 	if (!ctx->super) {
1724 		free(ctx);
1725 		cb_fn(cb_arg, -ENOMEM);
1726 		return;
1727 	}
1728 
1729 	cpl.type = SPDK_BS_CPL_TYPE_BS_BASIC;
1730 	cpl.u.bs_basic.cb_fn = cb_fn;
1731 	cpl.u.bs_basic.cb_arg = cb_arg;
1732 
1733 	seq = spdk_bs_sequence_start(bs->md_target.md_channel, &cpl);
1734 	if (!seq) {
1735 		spdk_dma_free(ctx->super);
1736 		free(ctx);
1737 		cb_fn(cb_arg, -ENOMEM);
1738 		return;
1739 	}
1740 
1741 	assert(TAILQ_EMPTY(&bs->blobs));
1742 
1743 	/* Read super block */
1744 	spdk_bs_sequence_read(seq, ctx->super, _spdk_bs_page_to_lba(bs, 0),
1745 			      _spdk_bs_byte_to_lba(bs, sizeof(*ctx->super)),
1746 			      _spdk_bs_unload_read_super_cpl, ctx);
1747 }
1748 
1749 /* END spdk_bs_unload */
1750 
1751 void
1752 spdk_bs_set_super(struct spdk_blob_store *bs, spdk_blob_id blobid,
1753 		  spdk_bs_op_complete cb_fn, void *cb_arg)
1754 {
1755 	bs->super_blob = blobid;
1756 	cb_fn(cb_arg, 0);
1757 }
1758 
1759 void
1760 spdk_bs_get_super(struct spdk_blob_store *bs,
1761 		  spdk_blob_op_with_id_complete cb_fn, void *cb_arg)
1762 {
1763 	if (bs->super_blob == SPDK_BLOBID_INVALID) {
1764 		cb_fn(cb_arg, SPDK_BLOBID_INVALID, -ENOENT);
1765 	} else {
1766 		cb_fn(cb_arg, bs->super_blob, 0);
1767 	}
1768 }
1769 
1770 uint64_t
1771 spdk_bs_get_cluster_size(struct spdk_blob_store *bs)
1772 {
1773 	return bs->cluster_sz;
1774 }
1775 
1776 uint64_t
1777 spdk_bs_get_page_size(struct spdk_blob_store *bs)
1778 {
1779 	return sizeof(struct spdk_blob_md_page);
1780 }
1781 
1782 uint64_t
1783 spdk_bs_free_cluster_count(struct spdk_blob_store *bs)
1784 {
1785 	return bs->num_free_clusters;
1786 }
1787 
1788 int spdk_bs_register_md_thread(struct spdk_blob_store *bs)
1789 {
1790 	bs->md_target.md_channel = spdk_get_io_channel(&bs->md_target);
1791 
1792 	return 0;
1793 }
1794 
1795 int spdk_bs_unregister_md_thread(struct spdk_blob_store *bs)
1796 {
1797 	spdk_put_io_channel(bs->md_target.md_channel);
1798 
1799 	return 0;
1800 }
1801 
1802 spdk_blob_id spdk_blob_get_id(struct spdk_blob *blob)
1803 {
1804 	assert(blob != NULL);
1805 
1806 	return blob->id;
1807 }
1808 
1809 uint64_t spdk_blob_get_num_pages(struct spdk_blob *blob)
1810 {
1811 	assert(blob != NULL);
1812 
1813 	return _spdk_bs_cluster_to_page(blob->bs, blob->active.num_clusters);
1814 }
1815 
1816 uint64_t spdk_blob_get_num_clusters(struct spdk_blob *blob)
1817 {
1818 	assert(blob != NULL);
1819 
1820 	return blob->active.num_clusters;
1821 }
1822 
1823 /* START spdk_bs_md_create_blob */
1824 
1825 static void
1826 _spdk_bs_md_create_blob_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1827 {
1828 	struct spdk_blob *blob = cb_arg;
1829 
1830 	_spdk_blob_free(blob);
1831 
1832 	spdk_bs_sequence_finish(seq, bserrno);
1833 }
1834 
1835 void spdk_bs_md_create_blob(struct spdk_blob_store *bs,
1836 			    spdk_blob_op_with_id_complete cb_fn, void *cb_arg)
1837 {
1838 	struct spdk_blob	*blob;
1839 	uint32_t		page_idx;
1840 	struct spdk_bs_cpl 	cpl;
1841 	spdk_bs_sequence_t	*seq;
1842 	spdk_blob_id		id;
1843 
1844 	page_idx = spdk_bit_array_find_first_clear(bs->used_md_pages, 0);
1845 	if (page_idx >= spdk_bit_array_capacity(bs->used_md_pages)) {
1846 		cb_fn(cb_arg, 0, -ENOMEM);
1847 		return;
1848 	}
1849 	spdk_bit_array_set(bs->used_md_pages, page_idx);
1850 
1851 	/* The blob id is a 64 bit number. The lower 32 bits are the page_idx. The upper
1852 	 * 32 bits are not currently used. Stick a 1 there just to catch bugs where the
1853 	 * code assumes blob id == page_idx.
1854 	 */
1855 	id = (1ULL << 32) | page_idx;
1856 
1857 	SPDK_TRACELOG(SPDK_TRACE_BLOB, "Creating blob with id %lu at page %u\n", id, page_idx);
1858 
	blob = _spdk_blob_alloc(bs, id);
	if (!blob) {
		/* Release the metadata page claimed above so it is not leaked. */
		spdk_bit_array_clear(bs->used_md_pages, page_idx);
		cb_fn(cb_arg, 0, -ENOMEM);
		return;
	}
1864 
1865 	cpl.type = SPDK_BS_CPL_TYPE_BLOBID;
1866 	cpl.u.blobid.cb_fn = cb_fn;
1867 	cpl.u.blobid.cb_arg = cb_arg;
1868 	cpl.u.blobid.blobid = blob->id;
1869 
	seq = spdk_bs_sequence_start(bs->md_target.md_channel, &cpl);
	if (!seq) {
		_spdk_blob_free(blob);
		spdk_bit_array_clear(bs->used_md_pages, page_idx);
		cb_fn(cb_arg, 0, -ENOMEM);
		return;
	}
1876 
1877 	_spdk_blob_persist(seq, blob, _spdk_bs_md_create_blob_cpl, blob);
1878 }
1879 
1880 /* END spdk_bs_md_create_blob */
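
/*
 * Illustrative sketch (hypothetical names): creation completes
 * asynchronously and hands back the new blob id, whose low 32 bits encode
 * the metadata page index.
 *
 *	static void
 *	create_complete(void *cb_arg, spdk_blob_id blobid, int bserrno)
 *	{
 *		if (bserrno == 0) {
 *			spdk_bs_md_open_blob(g_bs, blobid, open_complete, NULL);
 *		}
 *	}
 *
 *	spdk_bs_md_create_blob(g_bs, create_complete, NULL);
 */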
1881 
1882 /* START spdk_bs_md_resize_blob */
1883 int
1884 spdk_bs_md_resize_blob(struct spdk_blob *blob, uint64_t sz)
1885 {
1886 	int			rc;
1887 
1888 	assert(blob != NULL);
1889 
1890 	SPDK_TRACELOG(SPDK_TRACE_BLOB, "Resizing blob %lu to %lu clusters\n", blob->id, sz);
1891 
1892 	if (sz == blob->active.num_clusters) {
1893 		return 0;
1894 	}
1895 
1896 	rc = _spdk_resize_blob(blob, sz);
1897 	if (rc < 0) {
1898 		return rc;
1899 	}
1900 
1901 	return 0;
1902 }
1903 
1904 /* END spdk_bs_md_resize_blob */
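
/*
 * Illustrative sketch: resizing only updates in-memory state (claiming or
 * releasing clusters); a subsequent metadata sync makes the new size
 * durable. sync_complete is a hypothetical callback.
 *
 *	if (spdk_bs_md_resize_blob(blob, 10) == 0) {
 *		spdk_bs_md_sync_blob(blob, sync_complete, NULL);
 *	}
 */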
1905 
1906 
1907 /* START spdk_bs_md_delete_blob */
1908 
1909 static void
1910 _spdk_bs_md_delete_blob_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1911 {
1912 	struct spdk_blob *blob = cb_arg;
1913 
1914 	_spdk_blob_free(blob);
1915 
1916 	spdk_bs_sequence_finish(seq, bserrno);
1917 }
1918 
1919 static void
_spdk_bs_md_delete_open_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob *blob = cb_arg;

	if (bserrno != 0) {
		/* The metadata load failed, so there is nothing safe to
		 * persist; finish the sequence with the load error. */
		_spdk_bs_md_delete_blob_cpl(seq, blob, bserrno);
		return;
	}

	blob->state = SPDK_BLOB_STATE_DIRTY;
	blob->active.num_pages = 0;
	_spdk_resize_blob(blob, 0);

	_spdk_blob_persist(seq, blob, _spdk_bs_md_delete_blob_cpl, blob);
}
1930 
1931 void
1932 spdk_bs_md_delete_blob(struct spdk_blob_store *bs, spdk_blob_id blobid,
1933 		       spdk_blob_op_complete cb_fn, void *cb_arg)
1934 {
1935 	struct spdk_blob	*blob;
1936 	struct spdk_bs_cpl	cpl;
1937 	spdk_bs_sequence_t 	*seq;
1938 
1939 	SPDK_TRACELOG(SPDK_TRACE_BLOB, "Deleting blob %lu\n", blobid);
1940 
	/* A blob on the open list still has active references and must be
	 * closed before it can be deleted. */
	blob = _spdk_blob_lookup(bs, blobid);
	if (blob) {
		assert(blob->open_ref > 0);
		cb_fn(cb_arg, -EINVAL);
		return;
	}
1947 
1948 	blob = _spdk_blob_alloc(bs, blobid);
1949 	if (!blob) {
1950 		cb_fn(cb_arg, -ENOMEM);
1951 		return;
1952 	}
1953 
1954 	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
1955 	cpl.u.blob_basic.cb_fn = cb_fn;
1956 	cpl.u.blob_basic.cb_arg = cb_arg;
1957 
1958 	seq = spdk_bs_sequence_start(bs->md_target.md_channel, &cpl);
1959 	if (!seq) {
1960 		_spdk_blob_free(blob);
1961 		cb_fn(cb_arg, -ENOMEM);
1962 		return;
1963 	}
1964 
1965 	_spdk_blob_load(seq, blob, _spdk_bs_md_delete_open_cpl, blob);
1966 }
1967 
1968 /* END spdk_bs_md_delete_blob */
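
/*
 * Illustrative sketch (hypothetical names): a blob must not be open when
 * it is deleted; deleting an open blob fails with -EINVAL.
 *
 *	static void
 *	delete_complete(void *cb_arg, int bserrno)
 *	{
 *		assert(bserrno == 0);
 *	}
 *
 *	spdk_bs_md_delete_blob(g_bs, g_blobid, delete_complete, NULL);
 */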
1969 
1970 /* START spdk_bs_md_open_blob */
1971 
1972 static void
_spdk_bs_md_open_blob_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob *blob = cb_arg;

	if (bserrno != 0) {
		/* The metadata load failed; do not put the half-constructed
		 * blob on the open list. Callers must ignore the handle when
		 * bserrno is nonzero. */
		_spdk_blob_free(blob);
		spdk_bs_sequence_finish(seq, bserrno);
		return;
	}

	blob->open_ref++;

	TAILQ_INSERT_HEAD(&blob->bs->blobs, blob, link);

	spdk_bs_sequence_finish(seq, bserrno);
}
1983 
void
spdk_bs_md_open_blob(struct spdk_blob_store *bs, spdk_blob_id blobid,
		     spdk_blob_op_with_handle_complete cb_fn, void *cb_arg)
1986 {
1987 	struct spdk_blob		*blob;
1988 	struct spdk_bs_cpl		cpl;
1989 	spdk_bs_sequence_t		*seq;
1990 	uint32_t			page_num;
1991 
1992 	SPDK_TRACELOG(SPDK_TRACE_BLOB, "Opening blob %lu\n", blobid);
1993 
1994 	blob = _spdk_blob_lookup(bs, blobid);
1995 	if (blob) {
1996 		blob->open_ref++;
1997 		cb_fn(cb_arg, blob, 0);
1998 		return;
1999 	}
2000 
2001 	page_num = _spdk_bs_blobid_to_page(blobid);
2002 	if (spdk_bit_array_get(bs->used_md_pages, page_num) == false) {
2003 		/* Invalid blobid */
2004 		cb_fn(cb_arg, NULL, -ENOENT);
2005 		return;
2006 	}
2007 
2008 	blob = _spdk_blob_alloc(bs, blobid);
2009 	if (!blob) {
2010 		cb_fn(cb_arg, NULL, -ENOMEM);
2011 		return;
2012 	}
2013 
2014 	cpl.type = SPDK_BS_CPL_TYPE_BLOB_HANDLE;
2015 	cpl.u.blob_handle.cb_fn = cb_fn;
2016 	cpl.u.blob_handle.cb_arg = cb_arg;
2017 	cpl.u.blob_handle.blob = blob;
2018 
2019 	seq = spdk_bs_sequence_start(bs->md_target.md_channel, &cpl);
2020 	if (!seq) {
2021 		_spdk_blob_free(blob);
2022 		cb_fn(cb_arg, NULL, -ENOMEM);
2023 		return;
2024 	}
2025 
2026 	_spdk_blob_load(seq, blob, _spdk_bs_md_open_blob_cpl, blob);
2027 }
2028 
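/*
 * Illustrative sketch (hypothetical names): opening an already-open blob
 * just bumps its reference count and returns the same handle, so open is
 * cheap for blobs on the open list.
 *
 *	static void
 *	open_complete(void *cb_arg, struct spdk_blob *blob, int bserrno)
 *	{
 *		if (bserrno == 0) {
 *			g_blob = blob;
 *		}
 *	}
 *
 *	spdk_bs_md_open_blob(g_bs, g_blobid, open_complete, NULL);
 */
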
/* END spdk_bs_md_open_blob */

/* START spdk_bs_md_sync_blob */
2030 static void
2031 _spdk_blob_sync_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2032 {
2033 	spdk_bs_sequence_finish(seq, bserrno);
2034 }
2035 
void
spdk_bs_md_sync_blob(struct spdk_blob *blob,
		     spdk_blob_op_complete cb_fn, void *cb_arg)
2038 {
2039 	struct spdk_bs_cpl	cpl;
2040 	spdk_bs_sequence_t	*seq;
2041 
2042 	assert(blob != NULL);
2043 
2044 	SPDK_TRACELOG(SPDK_TRACE_BLOB, "Syncing blob %lu\n", blob->id);
2045 
2046 	assert(blob->state != SPDK_BLOB_STATE_LOADING &&
2047 	       blob->state != SPDK_BLOB_STATE_SYNCING);
2048 
2049 	if (blob->state == SPDK_BLOB_STATE_CLEAN) {
2050 		cb_fn(cb_arg, 0);
2051 		return;
2052 	}
2053 
2054 	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
2055 	cpl.u.blob_basic.cb_fn = cb_fn;
2056 	cpl.u.blob_basic.cb_arg = cb_arg;
2057 
2058 	seq = spdk_bs_sequence_start(blob->bs->md_target.md_channel, &cpl);
2059 	if (!seq) {
2060 		cb_fn(cb_arg, -ENOMEM);
2061 		return;
2062 	}
2063 
2064 	_spdk_blob_persist(seq, blob, _spdk_blob_sync_cpl, blob);
2065 }
2066 
2067 /* END spdk_bs_md_sync_blob */
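
/*
 * Illustrative sketch: syncing a clean blob completes inline; only dirty
 * metadata (for example after an xattr update) triggers a persist. The
 * names below are hypothetical.
 *
 *	spdk_blob_md_set_xattr(g_blob, "name", "value", 6);
 *	spdk_bs_md_sync_blob(g_blob, sync_complete, NULL);
 */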
2068 
2069 /* START spdk_bs_md_close_blob */
2070 
2071 static void
2072 _spdk_blob_close_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2073 {
2074 	struct spdk_blob **blob = cb_arg;
2075 
2076 	if ((*blob)->open_ref == 0) {
2077 		TAILQ_REMOVE(&(*blob)->bs->blobs, (*blob), link);
2078 		_spdk_blob_free((*blob));
2079 	}
2080 
2081 	*blob = NULL;
2082 
2083 	spdk_bs_sequence_finish(seq, bserrno);
2084 }
2085 
void
spdk_bs_md_close_blob(struct spdk_blob **b,
		      spdk_blob_op_complete cb_fn, void *cb_arg)
2088 {
2089 	struct spdk_bs_cpl	cpl;
2090 	struct spdk_blob	*blob;
2091 	spdk_bs_sequence_t	*seq;
2092 
2093 	assert(b != NULL);
2094 	blob = *b;
2095 	assert(blob != NULL);
2096 
2097 	SPDK_TRACELOG(SPDK_TRACE_BLOB, "Closing blob %lu\n", blob->id);
2098 
2099 	assert(blob->state != SPDK_BLOB_STATE_LOADING &&
2100 	       blob->state != SPDK_BLOB_STATE_SYNCING);
2101 
2102 	if (blob->open_ref == 0) {
2103 		cb_fn(cb_arg, -EBADF);
2104 		return;
2105 	}
2106 
2107 	blob->open_ref--;
2108 
2109 	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
2110 	cpl.u.blob_basic.cb_fn = cb_fn;
2111 	cpl.u.blob_basic.cb_arg = cb_arg;
2112 
2113 	seq = spdk_bs_sequence_start(blob->bs->md_target.md_channel, &cpl);
2114 	if (!seq) {
2115 		cb_fn(cb_arg, -ENOMEM);
2116 		return;
2117 	}
2118 
2119 	if (blob->state == SPDK_BLOB_STATE_CLEAN) {
2120 		_spdk_blob_close_cpl(seq, b, 0);
2121 		return;
2122 	}
2123 
2124 	/* Sync metadata */
2125 	_spdk_blob_persist(seq, blob, _spdk_blob_close_cpl, b);
2126 }
2127 
2128 /* END spdk_bs_md_close_blob */
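
/*
 * Illustrative sketch (g_blob hypothetical): close drops one reference,
 * persists any dirty metadata, and frees the in-memory blob once the last
 * reference is gone; the caller's pointer is cleared either way.
 *
 *	spdk_bs_md_close_blob(&g_blob, close_complete, NULL);
 */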
2129 
struct spdk_io_channel *
spdk_bs_alloc_io_channel(struct spdk_blob_store *bs)
2131 {
2132 	return spdk_get_io_channel(&bs->io_target);
2133 }
2134 
void
spdk_bs_free_io_channel(struct spdk_io_channel *channel)
2136 {
2137 	spdk_put_io_channel(channel);
2138 }
2139 
void
spdk_bs_io_flush_channel(struct spdk_io_channel *channel,
			 spdk_blob_op_complete cb_fn, void *cb_arg)
2142 {
2143 	/* Flush is synchronous right now */
2144 	cb_fn(cb_arg, 0);
2145 }
2146 
void
spdk_bs_io_write_blob(struct spdk_blob *blob, struct spdk_io_channel *channel,
		      void *payload, uint64_t offset, uint64_t length,
		      spdk_blob_op_complete cb_fn, void *cb_arg)
2150 {
2151 	_spdk_blob_request_submit_rw(blob, channel, payload, offset, length, cb_fn, cb_arg, false);
2152 }
2153 
void
spdk_bs_io_read_blob(struct spdk_blob *blob, struct spdk_io_channel *channel,
		     void *payload, uint64_t offset, uint64_t length,
		     spdk_blob_op_complete cb_fn, void *cb_arg)
2157 {
2158 	_spdk_blob_request_submit_rw(blob, channel, payload, offset, length, cb_fn, cb_arg, true);
2159 }
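
/*
 * Illustrative sketch (hypothetical names): blob I/O offsets and lengths
 * are expressed in pages, channels come from spdk_bs_alloc_io_channel(),
 * and payloads must be DMA-capable memory.
 *
 *	struct spdk_io_channel *ch = spdk_bs_alloc_io_channel(g_bs);
 *	uint8_t *buf = spdk_dma_malloc(spdk_bs_get_page_size(g_bs), 0x1000, NULL);
 *
 *	spdk_bs_io_write_blob(g_blob, ch, buf, 0, 1, write_complete, NULL);
 */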
2160 
2161 struct spdk_bs_iter_ctx {
2162 	int64_t page_num;
2163 	struct spdk_blob_store *bs;
2164 
2165 	spdk_blob_op_with_handle_complete cb_fn;
2166 	void *cb_arg;
2167 };
2168 
2169 static void
2170 _spdk_bs_iter_cpl(void *cb_arg, struct spdk_blob *blob, int bserrno)
2171 {
2172 	struct spdk_bs_iter_ctx *ctx = cb_arg;
2173 	struct spdk_blob_store *bs = ctx->bs;
2174 	spdk_blob_id id;
2175 
2176 	if (bserrno == 0) {
2177 		ctx->cb_fn(ctx->cb_arg, blob, bserrno);
2178 		free(ctx);
2179 		return;
2180 	}
2181 
2182 	ctx->page_num++;
2183 	ctx->page_num = spdk_bit_array_find_first_set(bs->used_md_pages, ctx->page_num);
2184 	if (ctx->page_num >= spdk_bit_array_capacity(bs->used_md_pages)) {
2185 		ctx->cb_fn(ctx->cb_arg, NULL, -ENOENT);
2186 		free(ctx);
2187 		return;
2188 	}
2189 
2190 	id = (1ULL << 32) | ctx->page_num;
2191 
2192 	blob = _spdk_blob_lookup(bs, id);
2193 	if (blob) {
2194 		blob->open_ref++;
2195 		ctx->cb_fn(ctx->cb_arg, blob, 0);
2196 		free(ctx);
2197 		return;
2198 	}
2199 
2200 	spdk_bs_md_open_blob(bs, id, _spdk_bs_iter_cpl, ctx);
2201 }
2202 
2203 void
2204 spdk_bs_md_iter_first(struct spdk_blob_store *bs,
2205 		      spdk_blob_op_with_handle_complete cb_fn, void *cb_arg)
2206 {
2207 	struct spdk_bs_iter_ctx *ctx;
2208 
2209 	ctx = calloc(1, sizeof(*ctx));
2210 	if (!ctx) {
2211 		cb_fn(cb_arg, NULL, -ENOMEM);
2212 		return;
2213 	}
2214 
2215 	ctx->page_num = -1;
2216 	ctx->bs = bs;
2217 	ctx->cb_fn = cb_fn;
2218 	ctx->cb_arg = cb_arg;
2219 
	/* Kick off the walk. The nonzero bserrno makes the completion advance
	 * to the first allocated metadata page rather than returning a blob. */
	_spdk_bs_iter_cpl(ctx, NULL, -1);
2221 }
2222 
2223 static void
2224 _spdk_bs_iter_close_cpl(void *cb_arg, int bserrno)
2225 {
2226 	struct spdk_bs_iter_ctx *ctx = cb_arg;
2227 
2228 	_spdk_bs_iter_cpl(ctx, NULL, -1);
2229 }
2230 
2231 void
2232 spdk_bs_md_iter_next(struct spdk_blob_store *bs, struct spdk_blob **b,
2233 		     spdk_blob_op_with_handle_complete cb_fn, void *cb_arg)
2234 {
2235 	struct spdk_bs_iter_ctx *ctx;
2236 	struct spdk_blob	*blob;
2237 
2238 	assert(b != NULL);
2239 	blob = *b;
2240 	assert(blob != NULL);
2241 
2242 	ctx = calloc(1, sizeof(*ctx));
2243 	if (!ctx) {
2244 		cb_fn(cb_arg, NULL, -ENOMEM);
2245 		return;
2246 	}
2247 
2248 	ctx->page_num = _spdk_bs_blobid_to_page(blob->id);
2249 	ctx->bs = bs;
2250 	ctx->cb_fn = cb_fn;
2251 	ctx->cb_arg = cb_arg;
2252 
2253 	/* Close the existing blob */
2254 	spdk_bs_md_close_blob(b, _spdk_bs_iter_close_cpl, ctx);
2255 }
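
/*
 * Illustrative iteration pattern (hypothetical globals): iter_next closes
 * the current blob before opening the following one, so the handle passed
 * by address must remain valid until the callback fires.
 *
 *	static struct spdk_blob *g_iter_blob;
 *
 *	static void
 *	iter_complete(void *cb_arg, struct spdk_blob *blob, int bserrno)
 *	{
 *		if (bserrno != 0) {
 *			return;	// -ENOENT: no more blobs
 *		}
 *		g_iter_blob = blob;
 *		spdk_bs_md_iter_next(g_bs, &g_iter_blob, iter_complete, NULL);
 *	}
 *
 *	spdk_bs_md_iter_first(g_bs, iter_complete, NULL);
 */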
2256 
2257 int
2258 spdk_blob_md_set_xattr(struct spdk_blob *blob, const char *name, const void *value,
2259 		       uint16_t value_len)
2260 {
	struct spdk_xattr	*xattr;
	void			*tmp;

	assert(blob != NULL);

	assert(blob->state != SPDK_BLOB_STATE_LOADING &&
	       blob->state != SPDK_BLOB_STATE_SYNCING);

	TAILQ_FOREACH(xattr, &blob->xattrs, link) {
		if (!strcmp(name, xattr->name)) {
			tmp = malloc(value_len);
			if (!tmp) {
				/* Keep the existing value if the new one
				 * cannot be allocated. */
				return -ENOMEM;
			}
			memcpy(tmp, value, value_len);
			free(xattr->value);
			xattr->value = tmp;
			xattr->value_len = value_len;

			blob->state = SPDK_BLOB_STATE_DIRTY;

			return 0;
		}
	}

	xattr = calloc(1, sizeof(*xattr));
	if (!xattr) {
		return -ENOMEM;
	}
	xattr->name = strdup(name);
	if (!xattr->name) {
		free(xattr);
		return -ENOMEM;
	}
	xattr->value_len = value_len;
	xattr->value = malloc(value_len);
	if (!xattr->value) {
		free(xattr->name);
		free(xattr);
		return -ENOMEM;
	}
	memcpy(xattr->value, value, value_len);
	TAILQ_INSERT_TAIL(&blob->xattrs, xattr, link);

	blob->state = SPDK_BLOB_STATE_DIRTY;

	return 0;
2294 }
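
/*
 * Illustrative sketch (hypothetical blob and key): xattrs live in blob
 * metadata and become durable on the next sync of the blob.
 *
 *	const void *val;
 *	size_t len;
 *
 *	spdk_blob_md_set_xattr(g_blob, "name", "hello", 6);
 *	spdk_bs_md_get_xattr_value(g_blob, "name", &val, &len);
 *	spdk_blob_md_remove_xattr(g_blob, "name");
 */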
2295 
2296 int
2297 spdk_blob_md_remove_xattr(struct spdk_blob *blob, const char *name)
2298 {
2299 	struct spdk_xattr	*xattr;
2300 
2301 	assert(blob != NULL);
2302 
2303 	assert(blob->state != SPDK_BLOB_STATE_LOADING &&
2304 	       blob->state != SPDK_BLOB_STATE_SYNCING);
2305 
2306 	TAILQ_FOREACH(xattr, &blob->xattrs, link) {
2307 		if (!strcmp(name, xattr->name)) {
2308 			TAILQ_REMOVE(&blob->xattrs, xattr, link);
2309 			free(xattr->value);
2310 			free(xattr->name);
2311 			free(xattr);
2312 
2313 			blob->state = SPDK_BLOB_STATE_DIRTY;
2314 
2315 			return 0;
2316 		}
2317 	}
2318 
2319 	return -ENOENT;
2320 }
2321 
2322 int
2323 spdk_bs_md_get_xattr_value(struct spdk_blob *blob, const char *name,
2324 			   const void **value, size_t *value_len)
2325 {
2326 	struct spdk_xattr	*xattr;
2327 
2328 	TAILQ_FOREACH(xattr, &blob->xattrs, link) {
2329 		if (!strcmp(name, xattr->name)) {
2330 			*value = xattr->value;
2331 			*value_len = xattr->value_len;
2332 			return 0;
2333 		}
2334 	}
2335 
2336 	return -ENOENT;
2337 }
2338 
2339 struct spdk_xattr_names {
2340 	uint32_t	count;
	const char	*names[];
2342 };
2343 
2344 int
2345 spdk_bs_md_get_xattr_names(struct spdk_blob *blob,
2346 			   struct spdk_xattr_names **names)
2347 {
2348 	struct spdk_xattr	*xattr;
2349 	int			count = 0;
2350 
2351 	TAILQ_FOREACH(xattr, &blob->xattrs, link) {
2352 		count++;
2353 	}
2354 
2355 	*names = calloc(1, sizeof(struct spdk_xattr_names) + count * sizeof(char *));
2356 	if (*names == NULL) {
2357 		return -ENOMEM;
2358 	}
2359 
2360 	TAILQ_FOREACH(xattr, &blob->xattrs, link) {
2361 		(*names)->names[(*names)->count++] = xattr->name;
2362 	}
2363 
2364 	return 0;
2365 }
2366 
2367 uint32_t
2368 spdk_xattr_names_get_count(struct spdk_xattr_names *names)
2369 {
2370 	assert(names != NULL);
2371 
2372 	return names->count;
2373 }
2374 
2375 const char *
2376 spdk_xattr_names_get_name(struct spdk_xattr_names *names, uint32_t index)
2377 {
2378 	if (index >= names->count) {
2379 		return NULL;
2380 	}
2381 
2382 	return names->names[index];
2383 }
2384 
2385 void
2386 spdk_xattr_names_free(struct spdk_xattr_names *names)
2387 {
2388 	free(names);
2389 }
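
/*
 * Illustrative enumeration sketch (g_blob hypothetical): the returned name
 * table borrows pointers to the blob's own strings, so only the table
 * itself is freed.
 *
 *	struct spdk_xattr_names *names;
 *	uint32_t i;
 *
 *	if (spdk_bs_md_get_xattr_names(g_blob, &names) == 0) {
 *		for (i = 0; i < spdk_xattr_names_get_count(names); i++) {
 *			printf("%s\n", spdk_xattr_names_get_name(names, i));
 *		}
 *		spdk_xattr_names_free(names);
 *	}
 */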
2390 
2391 SPDK_LOG_REGISTER_TRACE_FLAG("blob", SPDK_TRACE_BLOB);
2392