xref: /spdk/lib/blob/blobstore.c (revision 08e6f94226aed509e984e296e88769fd8c10cd24)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include <stdbool.h>
35 #include <assert.h>
36 #include <errno.h>
37 #include <limits.h>
38 #include <stdlib.h>
39 #include <string.h>
40 
41 #include "spdk/blob.h"
42 #include "spdk/env.h"
43 #include "spdk/queue.h"
44 #include "spdk/io_channel.h"
45 #include "spdk/bit_array.h"
46 
47 #include "spdk_internal/log.h"
48 
49 #include "blobstore.h"
50 #include "request.h"
51 
52 static inline size_t
53 divide_round_up(size_t num, size_t divisor)
54 {
55 	return (num + divisor - 1) / divisor;
56 }
57 
58 static void
59 _spdk_bs_claim_cluster(struct spdk_blob_store *bs, uint32_t cluster_num)
60 {
61 	assert(cluster_num < spdk_bit_array_capacity(bs->used_clusters));
62 	assert(spdk_bit_array_get(bs->used_clusters, cluster_num) == false);
63 	assert(bs->num_free_clusters > 0);
64 
65 	SPDK_TRACELOG(SPDK_TRACE_BLOB, "Claiming cluster %u\n", cluster_num);
66 
67 	spdk_bit_array_set(bs->used_clusters, cluster_num);
68 	bs->num_free_clusters--;
69 }
70 
71 static void
72 _spdk_bs_release_cluster(struct spdk_blob_store *bs, uint32_t cluster_num)
73 {
74 	assert(cluster_num < spdk_bit_array_capacity(bs->used_clusters));
75 	assert(spdk_bit_array_get(bs->used_clusters, cluster_num) == true);
76 	assert(bs->num_free_clusters < bs->total_clusters);
77 
78 	SPDK_TRACELOG(SPDK_TRACE_BLOB, "Releasing cluster %u\n", cluster_num);
79 
80 	spdk_bit_array_clear(bs->used_clusters, cluster_num);
81 	bs->num_free_clusters++;
82 }
83 
84 static struct spdk_blob *
85 _spdk_blob_alloc(struct spdk_blob_store *bs, spdk_blob_id id)
86 {
87 	struct spdk_blob *blob;
88 
89 	blob = calloc(1, sizeof(*blob));
90 	if (!blob) {
91 		return NULL;
92 	}
93 
94 	blob->id = id;
95 	blob->bs = bs;
96 
97 	blob->state = SPDK_BLOB_STATE_DIRTY;
98 	blob->active.num_pages = 1;
99 	blob->active.pages = calloc(1, sizeof(*blob->active.pages));
100 	if (!blob->active.pages) {
101 		free(blob);
102 		return NULL;
103 	}
104 
105 	blob->active.pages[0] = _spdk_bs_blobid_to_page(id);
106 
107 	TAILQ_INIT(&blob->xattrs);
108 
109 	return blob;
110 }
111 
112 static void
113 _spdk_blob_free(struct spdk_blob *blob)
114 {
115 	struct spdk_xattr 	*xattr, *xattr_tmp;
116 
117 	assert(blob != NULL);
118 
119 	free(blob->active.clusters);
120 	free(blob->clean.clusters);
121 	free(blob->active.pages);
122 	free(blob->clean.pages);
123 
124 	TAILQ_FOREACH_SAFE(xattr, &blob->xattrs, link, xattr_tmp) {
125 		TAILQ_REMOVE(&blob->xattrs, xattr, link);
126 		free(xattr->name);
127 		free(xattr->value);
128 		free(xattr);
129 	}
130 
131 	free(blob);
132 }
133 
134 static int
135 _spdk_blob_mark_clean(struct spdk_blob *blob)
136 {
137 	uint64_t *clusters = NULL;
138 	uint32_t *pages = NULL;
139 
140 	assert(blob != NULL);
141 	assert(blob->state == SPDK_BLOB_STATE_LOADING ||
142 	       blob->state == SPDK_BLOB_STATE_SYNCING);
143 
144 	if (blob->active.num_clusters) {
145 		assert(blob->active.clusters);
146 		clusters = calloc(blob->active.num_clusters, sizeof(*blob->active.clusters));
147 		if (!clusters) {
148 			return -1;
149 		}
150 		memcpy(clusters, blob->active.clusters, blob->active.num_clusters * sizeof(*clusters));
151 	}
152 
153 	if (blob->active.num_pages) {
154 		assert(blob->active.pages);
155 		pages = calloc(blob->active.num_pages, sizeof(*blob->active.pages));
156 		if (!pages) {
157 			free(clusters);
158 			return -1;
159 		}
160 		memcpy(pages, blob->active.pages, blob->active.num_pages * sizeof(*pages));
161 	}
162 
163 	free(blob->clean.clusters);
164 	free(blob->clean.pages);
165 
166 	blob->clean.num_clusters = blob->active.num_clusters;
167 	blob->clean.clusters = blob->active.clusters;
168 	blob->clean.num_pages = blob->active.num_pages;
169 	blob->clean.pages = blob->active.pages;
170 
171 	blob->active.clusters = clusters;
172 	blob->active.pages = pages;
173 
174 	blob->state = SPDK_BLOB_STATE_CLEAN;
175 
176 	return 0;
177 }
178 
179 static void
180 _spdk_blob_parse_page(const struct spdk_blob_md_page *page, struct spdk_blob *blob)
181 {
182 	struct spdk_blob_md_descriptor *desc;
183 	size_t	cur_desc = 0;
184 	void *tmp;
185 
186 	desc = (struct spdk_blob_md_descriptor *)page->descriptors;
187 	while (cur_desc < sizeof(page->descriptors)) {
188 		if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_PADDING) {
189 			if (desc->length == 0) {
190 				/* If padding and length are 0, this terminates the page */
191 				break;
192 			}
193 		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_EXTENT) {
194 			struct spdk_blob_md_descriptor_extent	*desc_extent;
195 			unsigned int				i, j;
196 			unsigned int				cluster_count = blob->active.num_clusters;
197 
198 			desc_extent = (struct spdk_blob_md_descriptor_extent *)desc;
199 
200 			assert(desc_extent->length > 0);
201 			assert(desc_extent->length % sizeof(desc_extent->extents[0]) == 0);
202 
203 			for (i = 0; i < desc_extent->length / sizeof(desc_extent->extents[0]); i++) {
204 				for (j = 0; j < desc_extent->extents[i].length; j++) {
205 					assert(spdk_bit_array_get(blob->bs->used_clusters, desc_extent->extents[i].cluster_idx + j));
206 					cluster_count++;
207 				}
208 			}
209 
210 			assert(cluster_count > 0);
211 			tmp = realloc(blob->active.clusters, cluster_count * sizeof(uint64_t));
212 			assert(tmp != NULL);
213 			blob->active.clusters = tmp;
214 			blob->active.cluster_array_size = cluster_count;
215 
216 			for (i = 0; i < desc_extent->length / sizeof(desc_extent->extents[0]); i++) {
217 				for (j = 0; j < desc_extent->extents[i].length; j++) {
218 					blob->active.clusters[blob->active.num_clusters++] = _spdk_bs_cluster_to_lba(blob->bs,
219 							desc_extent->extents[i].cluster_idx + j);
220 				}
221 			}
222 
223 		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR) {
224 			struct spdk_blob_md_descriptor_xattr	*desc_xattr;
225 			struct spdk_xattr 			*xattr;
226 
227 			desc_xattr = (struct spdk_blob_md_descriptor_xattr *)desc;
228 
229 			xattr = calloc(1, sizeof(*xattr));
230 			assert(xattr != NULL);
231 
232 			xattr->name = malloc(desc_xattr->name_length + 1);
233 			assert(xattr->name);
234 			strncpy(xattr->name, desc_xattr->name, desc_xattr->name_length);
235 			xattr->name[desc_xattr->name_length] = '\0';
236 
237 			xattr->value = malloc(desc_xattr->value_length);
238 			assert(xattr->value != NULL);
239 			xattr->value_len = desc_xattr->value_length;
240 			memcpy(xattr->value,
241 			       (void *)((uintptr_t)desc_xattr->name + desc_xattr->name_length),
242 			       desc_xattr->value_length);
243 
244 			TAILQ_INSERT_TAIL(&blob->xattrs, xattr, link);
245 		} else {
246 			/* Error */
247 			break;
248 		}
249 
250 		/* Advance to the next descriptor */
251 		cur_desc += sizeof(*desc) + desc->length;
252 		if (cur_desc + sizeof(*desc) > sizeof(page->descriptors)) {
253 			break;
254 		}
255 		desc = (struct spdk_blob_md_descriptor *)((uintptr_t)page->descriptors + cur_desc);
256 	}
257 }
258 
259 static int
260 _spdk_blob_parse(const struct spdk_blob_md_page *pages, uint32_t page_count,
261 		 struct spdk_blob *blob)
262 {
263 	const struct spdk_blob_md_page *page;
264 	uint32_t i;
265 
266 	assert(page_count > 0);
267 	assert(pages[0].sequence_num == 0);
268 	assert(blob != NULL);
269 	assert(blob->state == SPDK_BLOB_STATE_LOADING);
270 	assert(blob->active.clusters == NULL);
271 	assert(blob->id == pages[0].id);
272 	assert(blob->state == SPDK_BLOB_STATE_LOADING);
273 
274 	for (i = 0; i < page_count; i++) {
275 		page = &pages[i];
276 
277 		assert(page->id == blob->id);
278 		assert(page->sequence_num == i);
279 
280 		_spdk_blob_parse_page(page, blob);
281 	}
282 
283 	return 0;
284 }
285 
286 static int
287 _spdk_blob_serialize_add_page(const struct spdk_blob *blob,
288 			      struct spdk_blob_md_page **pages,
289 			      uint32_t *page_count,
290 			      struct spdk_blob_md_page **last_page)
291 {
292 	struct spdk_blob_md_page *page;
293 
294 	assert(pages != NULL);
295 	assert(page_count != NULL);
296 
297 	if (*page_count == 0) {
298 		assert(*pages == NULL);
299 		*page_count = 1;
300 		*pages = spdk_malloc(sizeof(struct spdk_blob_md_page),
301 				     sizeof(struct spdk_blob_md_page),
302 				     NULL);
303 	} else {
304 		assert(*pages != NULL);
305 		(*page_count)++;
306 		*pages = spdk_realloc(*pages,
307 				      sizeof(struct spdk_blob_md_page) * (*page_count),
308 				      sizeof(struct spdk_blob_md_page),
309 				      NULL);
310 	}
311 
312 	if (*pages == NULL) {
313 		*page_count = 0;
314 		*last_page = NULL;
315 		return -ENOMEM;
316 	}
317 
318 	page = &(*pages)[*page_count - 1];
319 	memset(page, 0, sizeof(*page));
320 	page->id = blob->id;
321 	page->sequence_num = *page_count - 1;
322 	page->next = SPDK_INVALID_MD_PAGE;
323 	*last_page = page;
324 
325 	return 0;
326 }
327 
328 /* Transform the in-memory representation 'xattr' into an on-disk xattr descriptor.
329  * Update required_sz on both success and failure.
330  *
331  */
332 static int
333 _spdk_blob_serialize_xattr(const struct spdk_xattr *xattr,
334 			   uint8_t *buf, size_t buf_sz,
335 			   size_t *required_sz)
336 {
337 	struct spdk_blob_md_descriptor_xattr	*desc;
338 
339 	*required_sz = sizeof(struct spdk_blob_md_descriptor_xattr) +
340 		       strlen(xattr->name) +
341 		       xattr->value_len;
342 
343 	if (buf_sz < *required_sz) {
344 		return -1;
345 	}
346 
347 	desc = (struct spdk_blob_md_descriptor_xattr *)buf;
348 
349 	desc->type = SPDK_MD_DESCRIPTOR_TYPE_XATTR;
350 	desc->length = sizeof(desc->name_length) +
351 		       sizeof(desc->value_length) +
352 		       strlen(xattr->name) +
353 		       xattr->value_len;
354 	desc->name_length = strlen(xattr->name);
355 	desc->value_length = xattr->value_len;
356 
357 	memcpy(desc->name, xattr->name, desc->name_length);
358 	memcpy((void *)((uintptr_t)desc->name + desc->name_length),
359 	       xattr->value,
360 	       desc->value_length);
361 
362 	return 0;
363 }
364 
365 static void
366 _spdk_blob_serialize_extent(const struct spdk_blob *blob,
367 			    uint64_t start_cluster, uint64_t *next_cluster,
368 			    uint8_t *buf, size_t buf_sz)
369 {
370 	struct spdk_blob_md_descriptor_extent *desc;
371 	size_t cur_sz;
372 	uint64_t i, extent_idx;
373 	uint32_t lba, lba_per_cluster, lba_count;
374 
375 	/* The buffer must have room for at least one extent */
376 	cur_sz = sizeof(struct spdk_blob_md_descriptor) + sizeof(desc->extents[0]);
377 	if (buf_sz < cur_sz) {
378 		*next_cluster = start_cluster;
379 		return;
380 	}
381 
382 	desc = (struct spdk_blob_md_descriptor_extent *)buf;
383 	desc->type = SPDK_MD_DESCRIPTOR_TYPE_EXTENT;
384 
385 	lba_per_cluster = _spdk_bs_cluster_to_lba(blob->bs, 1);
386 
387 	lba = blob->active.clusters[start_cluster];
388 	lba_count = lba_per_cluster;
389 	extent_idx = 0;
390 	for (i = start_cluster + 1; i < blob->active.num_clusters; i++) {
391 		if ((lba + lba_count) == blob->active.clusters[i]) {
392 			lba_count += lba_per_cluster;
393 			continue;
394 		}
395 		desc->extents[extent_idx].cluster_idx = lba / lba_per_cluster;
396 		desc->extents[extent_idx].length = lba_count / lba_per_cluster;
397 		extent_idx++;
398 
399 		cur_sz += sizeof(desc->extents[extent_idx]);
400 
401 		if (buf_sz < cur_sz) {
402 			/* If we ran out of buffer space, return */
403 			desc->length = sizeof(desc->extents[0]) * extent_idx;
404 			*next_cluster = i;
405 			return;
406 		}
407 
408 		lba = blob->active.clusters[i];
409 		lba_count = lba_per_cluster;
410 	}
411 
412 	desc->extents[extent_idx].cluster_idx = lba / lba_per_cluster;
413 	desc->extents[extent_idx].length = lba_count / lba_per_cluster;
414 	extent_idx++;
415 
416 	desc->length = sizeof(desc->extents[0]) * extent_idx;
417 	*next_cluster = blob->active.num_clusters;
418 
419 	return;
420 }
421 
422 static int
423 _spdk_blob_serialize(const struct spdk_blob *blob, struct spdk_blob_md_page **pages,
424 		     uint32_t *page_count)
425 {
426 	struct spdk_blob_md_page		*cur_page;
427 	const struct spdk_xattr			*xattr;
428 	int 					rc;
429 	uint8_t					*buf;
430 	size_t					remaining_sz;
431 
432 	assert(pages != NULL);
433 	assert(page_count != NULL);
434 	assert(blob != NULL);
435 	assert(blob->state == SPDK_BLOB_STATE_SYNCING);
436 
437 	*pages = NULL;
438 	*page_count = 0;
439 
440 	/* A blob always has at least 1 page, even if it has no descriptors */
441 	rc = _spdk_blob_serialize_add_page(blob, pages, page_count, &cur_page);
442 	if (rc < 0) {
443 		return rc;
444 	}
445 
446 	buf = (uint8_t *)cur_page->descriptors;
447 	remaining_sz = sizeof(cur_page->descriptors);
448 
449 	/* Serialize xattrs */
450 	TAILQ_FOREACH(xattr, &blob->xattrs, link) {
451 		size_t required_sz = 0;
452 		rc = _spdk_blob_serialize_xattr(xattr,
453 						buf, remaining_sz,
454 						&required_sz);
455 		if (rc < 0) {
456 			/* Need to add a new page to the chain */
457 			rc = _spdk_blob_serialize_add_page(blob, pages, page_count,
458 							   &cur_page);
459 			if (rc < 0) {
460 				spdk_free(*pages);
461 				*pages = NULL;
462 				*page_count = 0;
463 				return rc;
464 			}
465 
466 			buf = (uint8_t *)cur_page->descriptors;
467 			remaining_sz = sizeof(cur_page->descriptors);
468 
469 			/* Try again */
470 			required_sz = 0;
471 			rc = _spdk_blob_serialize_xattr(xattr,
472 							buf, remaining_sz,
473 							&required_sz);
474 
475 			if (rc < 0) {
476 				spdk_free(*pages);
477 				*pages = NULL;
478 				*page_count = 0;
479 				return -1;
480 			}
481 		}
482 
483 		remaining_sz -= required_sz;
484 		buf += required_sz;
485 	}
486 
487 	/* Serialize extents */
488 	uint64_t last_cluster = 0;
489 	while (last_cluster < blob->active.num_clusters) {
490 		_spdk_blob_serialize_extent(blob, last_cluster, &last_cluster,
491 					    buf, remaining_sz);
492 
493 		if (last_cluster == blob->active.num_clusters) {
494 			break;
495 		}
496 
497 		rc = _spdk_blob_serialize_add_page(blob, pages, page_count,
498 						   &cur_page);
499 		if (rc < 0) {
500 			return rc;
501 		}
502 
503 		buf = (uint8_t *)cur_page->descriptors;
504 		remaining_sz = sizeof(cur_page->descriptors);
505 	}
506 
507 	return 0;
508 }
509 
510 struct spdk_blob_load_ctx {
511 	struct spdk_blob 		*blob;
512 
513 	struct spdk_blob_md_page	*pages;
514 	uint32_t			num_pages;
515 
516 	spdk_bs_sequence_cpl		cb_fn;
517 	void				*cb_arg;
518 };
519 
520 static void
521 _spdk_blob_load_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
522 {
523 	struct spdk_blob_load_ctx 	*ctx = cb_arg;
524 	struct spdk_blob 		*blob = ctx->blob;
525 	struct spdk_blob_md_page	*page;
526 	int				rc;
527 
528 	page = &ctx->pages[ctx->num_pages - 1];
529 
530 	if (page->next != SPDK_INVALID_MD_PAGE) {
531 		uint32_t next_page = page->next;
532 		uint64_t next_lba = _spdk_bs_page_to_lba(blob->bs, blob->bs->md_start + next_page);
533 
534 
535 		assert(next_lba < (blob->bs->md_start + blob->bs->md_len));
536 
537 		/* Read the next page */
538 		ctx->num_pages++;
539 		ctx->pages = spdk_realloc(ctx->pages, (sizeof(*page) * ctx->num_pages),
540 					  sizeof(*page), NULL);
541 		if (ctx->pages == NULL) {
542 			ctx->cb_fn(seq, ctx->cb_arg, -ENOMEM);
543 			free(ctx);
544 			return;
545 		}
546 
547 		spdk_bs_sequence_read(seq, &ctx->pages[ctx->num_pages - 1],
548 				      next_lba,
549 				      _spdk_bs_byte_to_lba(blob->bs, sizeof(*page)),
550 				      _spdk_blob_load_cpl, ctx);
551 		return;
552 	}
553 
554 	/* Parse the pages */
555 	rc = _spdk_blob_parse(ctx->pages, ctx->num_pages, blob);
556 
557 	_spdk_blob_mark_clean(blob);
558 
559 	ctx->cb_fn(seq, ctx->cb_arg, rc);
560 
561 	/* Free the memory */
562 	spdk_free(ctx->pages);
563 	free(ctx);
564 }
565 
566 /* Load a blob from disk given a blobid */
567 static void
568 _spdk_blob_load(spdk_bs_sequence_t *seq, struct spdk_blob *blob,
569 		spdk_bs_sequence_cpl cb_fn, void *cb_arg)
570 {
571 	struct spdk_blob_load_ctx *ctx;
572 	struct spdk_blob_store *bs;
573 	uint32_t page_num;
574 	uint64_t lba;
575 
576 	assert(blob != NULL);
577 	assert(blob->state == SPDK_BLOB_STATE_CLEAN ||
578 	       blob->state == SPDK_BLOB_STATE_DIRTY);
579 
580 	bs = blob->bs;
581 
582 	ctx = calloc(1, sizeof(*ctx));
583 	if (!ctx) {
584 		cb_fn(seq, cb_arg, -ENOMEM);
585 		return;
586 	}
587 
588 	ctx->blob = blob;
589 	ctx->pages = spdk_realloc(ctx->pages, sizeof(struct spdk_blob_md_page),
590 				  sizeof(struct spdk_blob_md_page), NULL);
591 	if (!ctx->pages) {
592 		free(ctx);
593 		cb_fn(seq, cb_arg, -ENOMEM);
594 		return;
595 	}
596 	ctx->num_pages = 1;
597 	ctx->cb_fn = cb_fn;
598 	ctx->cb_arg = cb_arg;
599 
600 	page_num = _spdk_bs_blobid_to_page(blob->id);
601 	lba = _spdk_bs_page_to_lba(blob->bs, bs->md_start + page_num);
602 
603 	blob->state = SPDK_BLOB_STATE_LOADING;
604 
605 	spdk_bs_sequence_read(seq, &ctx->pages[0], lba,
606 			      _spdk_bs_byte_to_lba(bs, sizeof(struct spdk_blob_md_page)),
607 			      _spdk_blob_load_cpl, ctx);
608 }
609 
610 struct spdk_blob_persist_ctx {
611 	struct spdk_blob 		*blob;
612 
613 	struct spdk_blob_md_page	*pages;
614 
615 	uint64_t			idx;
616 
617 	spdk_bs_sequence_cpl		cb_fn;
618 	void				*cb_arg;
619 };
620 
621 static void
622 _spdk_blob_persist_complete(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
623 {
624 	struct spdk_blob_persist_ctx 	*ctx = cb_arg;
625 	struct spdk_blob 		*blob = ctx->blob;
626 
627 	if (bserrno == 0) {
628 		_spdk_blob_mark_clean(blob);
629 	}
630 
631 	/* Call user callback */
632 	ctx->cb_fn(seq, ctx->cb_arg, bserrno);
633 
634 	/* Free the memory */
635 	spdk_free(ctx->pages);
636 	free(ctx);
637 }
638 
639 static void
640 _spdk_blob_persist_unmap_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
641 {
642 	struct spdk_blob_persist_ctx 	*ctx = cb_arg;
643 	struct spdk_blob 		*blob = ctx->blob;
644 	struct spdk_blob_store		*bs = blob->bs;
645 	void				*tmp;
646 	size_t				i;
647 
648 	/* Release all clusters that were truncated */
649 	for (i = blob->active.num_clusters; i < blob->active.cluster_array_size; i++) {
650 		uint32_t cluster_num = _spdk_bs_lba_to_cluster(bs, blob->active.clusters[i]);
651 
652 		_spdk_bs_release_cluster(bs, cluster_num);
653 	}
654 
655 	if (blob->active.num_clusters == 0) {
656 		free(blob->active.clusters);
657 		blob->active.clusters = NULL;
658 		blob->active.cluster_array_size = 0;
659 	} else {
660 		tmp = realloc(blob->active.clusters, sizeof(uint64_t) * blob->active.num_clusters);
661 		assert(tmp != NULL);
662 		blob->active.clusters = tmp;
663 		blob->active.cluster_array_size = blob->active.num_clusters;
664 	}
665 
666 	_spdk_blob_persist_complete(seq, ctx, bserrno);
667 }
668 
669 static void
670 _spdk_blob_persist_unmap_clusters(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
671 {
672 	struct spdk_blob_persist_ctx 	*ctx = cb_arg;
673 	struct spdk_blob 		*blob = ctx->blob;
674 	struct spdk_blob_store		*bs = blob->bs;
675 	spdk_bs_batch_t			*batch;
676 	size_t				i;
677 
678 	/* Clusters don't move around in blobs. The list shrinks or grows
679 	 * at the end, but no changes ever occur in the middle of the list.
680 	 */
681 
682 	batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_unmap_clusters_cpl, ctx);
683 
684 	/* Unmap all clusters that were truncated */
685 	for (i = blob->active.num_clusters; i < blob->active.cluster_array_size; i++) {
686 		uint64_t lba = blob->active.clusters[i];
687 		uint32_t lba_count = _spdk_bs_cluster_to_lba(bs, 1);
688 
689 		spdk_bs_batch_unmap(batch, lba, lba_count);
690 	}
691 
692 	spdk_bs_batch_close(batch);
693 }
694 
695 static void
696 _spdk_blob_persist_unmap_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
697 {
698 	struct spdk_blob_persist_ctx	*ctx = cb_arg;
699 	struct spdk_blob 		*blob = ctx->blob;
700 	struct spdk_blob_store		*bs = blob->bs;
701 	size_t				i;
702 
703 	/* This loop starts at 1 because the first page is special and handled
704 	 * below. The pages (except the first) are never written in place,
705 	 * so any pages in the clean list must be unmapped.
706 	 */
707 	for (i = 1; i < blob->clean.num_pages; i++) {
708 		spdk_bit_array_clear(bs->used_md_pages, blob->clean.pages[i]);
709 	}
710 
711 	if (blob->active.num_pages == 0) {
712 		uint32_t page_num;
713 
714 		page_num = _spdk_bs_blobid_to_page(blob->id);
715 		spdk_bit_array_clear(bs->used_md_pages, page_num);
716 	}
717 
718 	/* Move on to unmapping clusters */
719 	_spdk_blob_persist_unmap_clusters(seq, ctx, 0);
720 }
721 
722 static void
723 _spdk_blob_persist_unmap_pages(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
724 {
725 	struct spdk_blob_persist_ctx 	*ctx = cb_arg;
726 	struct spdk_blob 		*blob = ctx->blob;
727 	struct spdk_blob_store		*bs = blob->bs;
728 	uint64_t			lba;
729 	uint32_t			lba_count;
730 	spdk_bs_batch_t			*batch;
731 	size_t				i;
732 
733 	batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_unmap_pages_cpl, ctx);
734 
735 	lba_count = _spdk_bs_byte_to_lba(bs, sizeof(struct spdk_blob_md_page));
736 
737 	/* This loop starts at 1 because the first page is special and handled
738 	 * below. The pages (except the first) are never written in place,
739 	 * so any pages in the clean list must be unmapped.
740 	 */
741 	for (i = 1; i < blob->clean.num_pages; i++) {
742 		lba = _spdk_bs_page_to_lba(bs, bs->md_start + blob->clean.pages[i]);
743 
744 		spdk_bs_batch_unmap(batch, lba, lba_count);
745 	}
746 
747 	/* The first page will only be unmapped if this is a delete. */
748 	if (blob->active.num_pages == 0) {
749 		uint32_t page_num;
750 
751 		/* The first page in the metadata goes where the blobid indicates */
752 		page_num = _spdk_bs_blobid_to_page(blob->id);
753 		lba = _spdk_bs_page_to_lba(bs, bs->md_start + page_num);
754 
755 		spdk_bs_batch_unmap(batch, lba, lba_count);
756 	}
757 
758 	spdk_bs_batch_close(batch);
759 }
760 
761 static void
762 _spdk_blob_persist_write_page_root(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
763 {
764 	struct spdk_blob_persist_ctx	*ctx = cb_arg;
765 	struct spdk_blob		*blob = ctx->blob;
766 	struct spdk_blob_store		*bs = blob->bs;
767 	uint64_t			lba;
768 	uint32_t			lba_count;
769 	struct spdk_blob_md_page	*page;
770 
771 	if (blob->active.num_pages == 0) {
772 		/* Move on to the next step */
773 		_spdk_blob_persist_unmap_pages(seq, ctx, 0);
774 		return;
775 	}
776 
777 	lba_count = _spdk_bs_byte_to_lba(bs, sizeof(*page));
778 
779 	page = &ctx->pages[0];
780 	/* The first page in the metadata goes where the blobid indicates */
781 	lba = _spdk_bs_page_to_lba(bs, bs->md_start + _spdk_bs_blobid_to_page(blob->id));
782 
783 	spdk_bs_sequence_write(seq, page, lba, lba_count,
784 			       _spdk_blob_persist_unmap_pages, ctx);
785 }
786 
787 static void
788 _spdk_blob_persist_write_page_chain(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
789 {
790 	struct spdk_blob_persist_ctx 	*ctx = cb_arg;
791 	struct spdk_blob 		*blob = ctx->blob;
792 	struct spdk_blob_store		*bs = blob->bs;
793 	uint64_t 			lba;
794 	uint32_t			lba_count;
795 	struct spdk_blob_md_page	*page;
796 	spdk_bs_batch_t			*batch;
797 	size_t				i;
798 
799 	/* Clusters don't move around in blobs. The list shrinks or grows
800 	 * at the end, but no changes ever occur in the middle of the list.
801 	 */
802 
803 	lba_count = _spdk_bs_byte_to_lba(bs, sizeof(*page));
804 
805 	batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_write_page_root, ctx);
806 
807 	/* This starts at 1. The root page is not written until
808 	 * all of the others are finished
809 	 */
810 	for (i = 1; i < blob->active.num_pages; i++) {
811 		page = &ctx->pages[i];
812 		assert(page->sequence_num == i);
813 
814 		lba = _spdk_bs_page_to_lba(bs, bs->md_start + blob->active.pages[i]);
815 
816 		spdk_bs_batch_write(batch, page, lba, lba_count);
817 	}
818 
819 	spdk_bs_batch_close(batch);
820 }
821 
822 static int
823 _spdk_resize_blob(struct spdk_blob *blob, uint64_t sz)
824 {
825 	uint64_t	i;
826 	uint64_t	*tmp;
827 	uint64_t	lfc; /* lowest free cluster */
828 	struct spdk_blob_store *bs;
829 
830 	bs = blob->bs;
831 
832 	assert(blob->state != SPDK_BLOB_STATE_LOADING &&
833 	       blob->state != SPDK_BLOB_STATE_SYNCING);
834 
835 	if (blob->active.num_clusters == sz) {
836 		return 0;
837 	}
838 
839 	if (blob->active.num_clusters < blob->active.cluster_array_size) {
840 		/* If this blob was resized to be larger, then smaller, then
841 		 * larger without syncing, then the cluster array already
842 		 * contains spare assigned clusters we can use.
843 		 */
844 		blob->active.num_clusters = spdk_min(blob->active.cluster_array_size,
845 						     sz);
846 	}
847 
848 	blob->state = SPDK_BLOB_STATE_DIRTY;
849 
850 	/* Do two passes - one to verify that we can obtain enough clusters
851 	 * and another to actually claim them.
852 	 */
853 
854 	lfc = 0;
855 	for (i = blob->active.num_clusters; i < sz; i++) {
856 		lfc = spdk_bit_array_find_first_clear(bs->used_clusters, lfc);
857 		if (lfc >= bs->total_clusters) {
858 			/* No more free clusters. Cannot satisfy the request */
859 			assert(false);
860 			return -1;
861 		}
862 		lfc++;
863 	}
864 
865 	if (sz > blob->active.num_clusters) {
866 		/* Expand the cluster array if necessary.
867 		 * We only shrink the array when persisting.
868 		 */
869 		tmp = realloc(blob->active.clusters, sizeof(uint64_t) * sz);
870 		if (sz > 0 && tmp == NULL) {
871 			assert(false);
872 			return -1;
873 		}
874 		blob->active.clusters = tmp;
875 		blob->active.cluster_array_size = sz;
876 	}
877 
878 	lfc = 0;
879 	for (i = blob->active.num_clusters; i < sz; i++) {
880 		lfc = spdk_bit_array_find_first_clear(bs->used_clusters, lfc);
881 		SPDK_TRACELOG(SPDK_TRACE_BLOB, "Claiming cluster %lu for blob %lu\n", lfc, blob->id);
882 		_spdk_bs_claim_cluster(bs, lfc);
883 		blob->active.clusters[i] = _spdk_bs_cluster_to_lba(bs, lfc);
884 		lfc++;
885 	}
886 
887 	blob->active.num_clusters = sz;
888 
889 	return 0;
890 }
891 
892 /* Write a blob to disk */
893 static void
894 _spdk_blob_persist(spdk_bs_sequence_t *seq, struct spdk_blob *blob,
895 		   spdk_bs_sequence_cpl cb_fn, void *cb_arg)
896 {
897 	struct spdk_blob_persist_ctx *ctx;
898 	int rc;
899 	uint64_t i;
900 	uint32_t page_num;
901 	struct spdk_blob_store *bs;
902 
903 	assert(blob != NULL);
904 	assert(blob->state == SPDK_BLOB_STATE_CLEAN ||
905 	       blob->state == SPDK_BLOB_STATE_DIRTY);
906 
907 	if (blob->state == SPDK_BLOB_STATE_CLEAN) {
908 		cb_fn(seq, cb_arg, 0);
909 		return;
910 	}
911 
912 	bs = blob->bs;
913 
914 	ctx = calloc(1, sizeof(*ctx));
915 	if (!ctx) {
916 		cb_fn(seq, cb_arg, -ENOMEM);
917 		return;
918 	}
919 	ctx->blob = blob;
920 	ctx->cb_fn = cb_fn;
921 	ctx->cb_arg = cb_arg;
922 
923 	blob->state = SPDK_BLOB_STATE_SYNCING;
924 
925 	if (blob->active.num_pages == 0) {
926 		/* This is the signal that the blob should be deleted.
927 		 * Immediately jump to the clean up routine. */
928 		assert(blob->clean.num_pages > 0);
929 		ctx->idx = blob->clean.num_pages - 1;
930 		_spdk_blob_persist_unmap_pages(seq, ctx, 0);
931 		return;
932 
933 	}
934 
935 	/* Generate the new metadata */
936 	rc = _spdk_blob_serialize(blob, &ctx->pages, &blob->active.num_pages);
937 	if (rc < 0) {
938 		free(ctx);
939 		cb_fn(seq, cb_arg, rc);
940 		return;
941 	}
942 
943 	assert(blob->active.num_pages >= 1);
944 
945 	/* Resize the cache of page indices */
946 	blob->active.pages = realloc(blob->active.pages,
947 				     blob->active.num_pages * sizeof(*blob->active.pages));
948 	if (!blob->active.pages) {
949 		free(ctx);
950 		cb_fn(seq, cb_arg, -ENOMEM);
951 		return;
952 	}
953 
954 	/* Assign this metadata to pages. This requires two passes -
955 	 * one to verify that there are enough pages and a second
956 	 * to actually claim them. */
957 	page_num = 0;
958 	/* Note that this loop starts at one. The first page location is fixed by the blobid. */
959 	for (i = 1; i < blob->active.num_pages; i++) {
960 		page_num = spdk_bit_array_find_first_clear(bs->used_md_pages, page_num);
961 		if (page_num >= spdk_bit_array_capacity(bs->used_md_pages)) {
962 			spdk_free(ctx->pages);
963 			free(ctx);
964 			blob->state = SPDK_BLOB_STATE_DIRTY;
965 			cb_fn(seq, cb_arg, -ENOMEM);
966 			return;
967 		}
968 		page_num++;
969 	}
970 
971 	page_num = 0;
972 	blob->active.pages[0] = _spdk_bs_blobid_to_page(blob->id);
973 	for (i = 1; i < blob->active.num_pages; i++) {
974 		page_num = spdk_bit_array_find_first_clear(bs->used_md_pages, page_num);
975 		ctx->pages[i - 1].next = page_num;
976 		blob->active.pages[i] = page_num;
977 		spdk_bit_array_set(bs->used_md_pages, page_num);
978 		SPDK_TRACELOG(SPDK_TRACE_BLOB, "Claiming page %u for blob %lu\n", page_num, blob->id);
979 		page_num++;
980 	}
981 
982 	/* Start writing the metadata from last page to first */
983 	ctx->idx = blob->active.num_pages - 1;
984 	_spdk_blob_persist_write_page_chain(seq, ctx, 0);
985 }
986 
987 static void
988 _spdk_blob_request_submit_rw(struct spdk_blob *blob, struct spdk_io_channel *_channel,
989 			     void *payload, uint64_t offset, uint64_t length,
990 			     spdk_blob_op_complete cb_fn, void *cb_arg, bool read)
991 {
992 	spdk_bs_batch_t			*batch;
993 	struct spdk_bs_cpl		cpl;
994 	uint64_t			lba;
995 	uint32_t			lba_count;
996 	uint8_t				*buf;
997 	uint64_t			page;
998 
999 	assert(blob != NULL);
1000 
1001 	if (offset + length > blob->active.num_clusters * blob->bs->pages_per_cluster) {
1002 		cb_fn(cb_arg, -EINVAL);
1003 		return;
1004 	}
1005 
1006 	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
1007 	cpl.u.blob_basic.cb_fn = cb_fn;
1008 	cpl.u.blob_basic.cb_arg = cb_arg;
1009 
1010 	batch = spdk_bs_batch_open(_channel, &cpl);
1011 	if (!batch) {
1012 		cb_fn(cb_arg, -ENOMEM);
1013 		return;
1014 	}
1015 
1016 	length = _spdk_bs_page_to_lba(blob->bs, length);
1017 	page = offset;
1018 	buf = payload;
1019 	while (length > 0) {
1020 		lba = _spdk_bs_blob_page_to_lba(blob, page);
1021 		lba_count = spdk_min(length,
1022 				     _spdk_bs_page_to_lba(blob->bs,
1023 						     _spdk_bs_num_pages_to_cluster_boundary(blob, page)));
1024 
1025 		if (read) {
1026 			spdk_bs_batch_read(batch, buf, lba, lba_count);
1027 		} else {
1028 			spdk_bs_batch_write(batch, buf, lba, lba_count);
1029 		}
1030 
1031 		length -= lba_count;
1032 		buf += _spdk_bs_lba_to_byte(blob->bs, lba_count);
1033 		page += _spdk_bs_lba_to_page(blob->bs, lba_count);
1034 	}
1035 
1036 	spdk_bs_batch_close(batch);
1037 }
1038 
1039 static struct spdk_blob *
1040 _spdk_blob_lookup(struct spdk_blob_store *bs, spdk_blob_id blobid)
1041 {
1042 	struct spdk_blob *blob;
1043 
1044 	TAILQ_FOREACH(blob, &bs->blobs, link) {
1045 		if (blob->id == blobid) {
1046 			return blob;
1047 		}
1048 	}
1049 
1050 	return NULL;
1051 }
1052 
1053 static int
1054 _spdk_bs_channel_create(void *io_device, uint32_t priority, void *ctx_buf, void *unique_ctx)
1055 {
1056 	struct spdk_blob_store		*bs = io_device;
1057 	struct spdk_bs_dev		*dev = bs->dev;
1058 	struct spdk_bs_channel	*channel = ctx_buf;
1059 	uint32_t			max_ops = *(uint32_t *)unique_ctx;
1060 	uint32_t			i;
1061 
1062 	channel->req_mem = calloc(max_ops, sizeof(struct spdk_bs_request_set));
1063 	if (!channel->req_mem) {
1064 		free(channel);
1065 		return -1;
1066 	}
1067 
1068 	TAILQ_INIT(&channel->reqs);
1069 
1070 	for (i = 0; i < max_ops; i++) {
1071 		TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link);
1072 	}
1073 
1074 	channel->bs = bs;
1075 	channel->dev = dev;
1076 	channel->dev_channel = dev->create_channel(dev);
1077 
1078 	return 0;
1079 }
1080 
1081 static void
1082 _spdk_bs_channel_destroy(void *io_device, void *ctx_buf)
1083 {
1084 	struct spdk_bs_channel *channel = ctx_buf;
1085 
1086 	free(channel->req_mem);
1087 	channel->dev->destroy_channel(channel->dev, channel->dev_channel);
1088 }
1089 
1090 static void
1091 _spdk_bs_free(struct spdk_blob_store *bs)
1092 {
1093 	struct spdk_blob	*blob, *blob_tmp;
1094 
1095 	spdk_bs_unregister_md_thread(bs);
1096 	spdk_io_device_unregister(bs);
1097 
1098 	TAILQ_FOREACH_SAFE(blob, &bs->blobs, link, blob_tmp) {
1099 		TAILQ_REMOVE(&bs->blobs, blob, link);
1100 		_spdk_blob_free(blob);
1101 	}
1102 
1103 	spdk_bit_array_free(&bs->used_md_pages);
1104 	spdk_bit_array_free(&bs->used_clusters);
1105 
1106 	bs->dev->destroy(bs->dev);
1107 	free(bs);
1108 }
1109 
1110 void
1111 spdk_bs_opts_init(struct spdk_bs_opts *opts)
1112 {
1113 	opts->cluster_sz = SPDK_BLOB_OPTS_CLUSTER_SZ;
1114 	opts->num_md_pages = SPDK_BLOB_OPTS_NUM_MD_PAGES;
1115 	opts->max_md_ops = SPDK_BLOB_OPTS_MAX_MD_OPS;
1116 }
1117 
1118 static struct spdk_blob_store *
1119 _spdk_bs_alloc(struct spdk_bs_dev *dev, struct spdk_bs_opts *opts)
1120 {
1121 	struct spdk_blob_store	*bs;
1122 
1123 	bs = calloc(1, sizeof(struct spdk_blob_store));
1124 	if (!bs) {
1125 		return NULL;
1126 	}
1127 
1128 	TAILQ_INIT(&bs->blobs);
1129 	bs->dev = dev;
1130 
1131 	/*
1132 	 * Do not use _spdk_bs_lba_to_cluster() here since blockcnt may not be an
1133 	 *  even multiple of the cluster size.
1134 	 */
1135 	bs->cluster_sz = opts->cluster_sz;
1136 	bs->total_clusters = dev->blockcnt / (bs->cluster_sz / dev->blocklen);
1137 	bs->pages_per_cluster = bs->cluster_sz / sizeof(struct spdk_blob_md_page);
1138 	bs->num_free_clusters = bs->total_clusters;
1139 	bs->used_clusters = spdk_bit_array_create(bs->total_clusters);
1140 	if (bs->used_clusters == NULL) {
1141 		_spdk_bs_free(bs);
1142 		return NULL;
1143 	}
1144 
1145 	bs->max_md_ops = opts->max_md_ops;
1146 	bs->super_blob = SPDK_BLOBID_INVALID;
1147 
1148 	/* The metadata is assumed to be at least 1 page */
1149 	bs->used_md_pages = spdk_bit_array_create(1);
1150 
1151 	spdk_io_device_register(bs, _spdk_bs_channel_create, _spdk_bs_channel_destroy,
1152 				sizeof(struct spdk_bs_channel));
1153 	spdk_bs_register_md_thread(bs);
1154 
1155 	return bs;
1156 }
1157 
1158 /* START spdk_bs_load */
1159 
1160 struct spdk_bs_load_ctx {
1161 	struct spdk_blob_store		*bs;
1162 	struct spdk_bs_super_block	*super;
1163 
1164 	struct spdk_bs_md_mask		*mask;
1165 };
1166 
1167 static void
1168 _spdk_bs_load_used_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1169 {
1170 	struct spdk_bs_load_ctx *ctx = cb_arg;
1171 	uint32_t		i, j;
1172 	int			rc;
1173 
1174 	/* The type must be correct */
1175 	assert(ctx->mask->type == SPDK_MD_MASK_TYPE_USED_CLUSTERS);
1176 	/* The length of the mask (in bits) must not be greater than the length of the buffer (converted to bits) */
1177 	assert(ctx->mask->length <= (ctx->super->used_cluster_mask_len * sizeof(
1178 					     struct spdk_blob_md_page) * 8));
1179 	/* The length of the mask must be exactly equal to the total number of clusters */
1180 	assert(ctx->mask->length == ctx->bs->total_clusters);
1181 
1182 	rc = spdk_bit_array_resize(&ctx->bs->used_clusters, ctx->bs->total_clusters);
1183 	if (rc < 0) {
1184 		spdk_free(ctx->super);
1185 		spdk_free(ctx->mask);
1186 		_spdk_bs_free(ctx->bs);
1187 		free(ctx);
1188 		spdk_bs_sequence_finish(seq, -ENOMEM);
1189 		return;
1190 	}
1191 
1192 	ctx->bs->num_free_clusters = ctx->bs->total_clusters;
1193 	for (i = 0; i < ctx->mask->length / 8; i++) {
1194 		uint8_t segment = ctx->mask->mask[i];
1195 		for (j = 0; segment && (j < 8); j++) {
1196 			if (segment & 1U) {
1197 				spdk_bit_array_set(ctx->bs->used_clusters, (i * 8) + j);
1198 				assert(ctx->bs->num_free_clusters > 0);
1199 				ctx->bs->num_free_clusters--;
1200 			}
1201 			segment >>= 1U;
1202 		}
1203 	}
1204 
1205 	spdk_free(ctx->super);
1206 	spdk_free(ctx->mask);
1207 	free(ctx);
1208 
1209 	spdk_bs_sequence_finish(seq, bserrno);
1210 }
1211 
1212 static void
1213 _spdk_bs_load_used_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1214 {
1215 	struct spdk_bs_load_ctx *ctx = cb_arg;
1216 	uint64_t		lba, lba_count;
1217 	uint32_t		i, j;
1218 	int			rc;
1219 
1220 	/* The type must be correct */
1221 	assert(ctx->mask->type == SPDK_MD_MASK_TYPE_USED_PAGES);
1222 	/* The length of the mask (in bits) must not be greater than the length of the buffer (converted to bits) */
1223 	assert(ctx->mask->length <= (ctx->super->used_page_mask_len * sizeof(struct spdk_blob_md_page) *
1224 				     8));
1225 	/* The length of the mask must be exactly equal to the size (in pages) of the metadata region */
1226 	assert(ctx->mask->length == ctx->super->md_len);
1227 
1228 	rc = spdk_bit_array_resize(&ctx->bs->used_md_pages, ctx->mask->length);
1229 	if (rc < 0) {
1230 		spdk_free(ctx->super);
1231 		spdk_free(ctx->mask);
1232 		_spdk_bs_free(ctx->bs);
1233 		free(ctx);
1234 		spdk_bs_sequence_finish(seq, -ENOMEM);
1235 		return;
1236 	}
1237 
1238 	for (i = 0; i < ctx->mask->length / 8; i++) {
1239 		uint8_t segment = ctx->mask->mask[i];
1240 		for (j = 0; segment && (j < 8); j++) {
1241 			if (segment & 1U) {
1242 				spdk_bit_array_set(ctx->bs->used_md_pages, (i * 8) + j);
1243 			}
1244 			segment >>= 1U;
1245 		}
1246 	}
1247 	spdk_free(ctx->mask);
1248 
1249 	/* Read the used clusters mask */
1250 	ctx->mask = spdk_zmalloc(ctx->super->used_cluster_mask_len * sizeof(struct spdk_blob_md_page),
1251 				 0x1000, NULL);
1252 	if (!ctx->mask) {
1253 		spdk_free(ctx->super);
1254 		_spdk_bs_free(ctx->bs);
1255 		free(ctx);
1256 		spdk_bs_sequence_finish(seq, -ENOMEM);
1257 		return;
1258 	}
1259 	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_start);
1260 	lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_len);
1261 	spdk_bs_sequence_read(seq, ctx->mask, lba, lba_count,
1262 			      _spdk_bs_load_used_clusters_cpl, ctx);
1263 }
1264 
1265 static void
1266 _spdk_bs_load_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1267 {
1268 	struct spdk_bs_load_ctx *ctx = cb_arg;
1269 	uint64_t		lba, lba_count;
1270 
1271 	if (ctx->super->version != SPDK_BS_VERSION) {
1272 		spdk_free(ctx->super);
1273 		_spdk_bs_free(ctx->bs);
1274 		free(ctx);
1275 		spdk_bs_sequence_finish(seq, -EILSEQ);
1276 		return;
1277 	}
1278 
1279 	if (memcmp(ctx->super->signature, SPDK_BS_SUPER_BLOCK_SIG,
1280 		   sizeof(ctx->super->signature)) != 0) {
1281 		spdk_free(ctx->super);
1282 		_spdk_bs_free(ctx->bs);
1283 		free(ctx);
1284 		spdk_bs_sequence_finish(seq, -EILSEQ);
1285 		return;
1286 	}
1287 
1288 	if (ctx->super->clean != 1) {
1289 		/* TODO: ONLY CLEAN SHUTDOWN IS CURRENTLY SUPPORTED.
1290 		 * All of the necessary data to recover is available
1291 		 * on disk - the code just has not been written yet.
1292 		 */
1293 		assert(false);
1294 		spdk_free(ctx->super);
1295 		_spdk_bs_free(ctx->bs);
1296 		free(ctx);
1297 		spdk_bs_sequence_finish(seq, -EILSEQ);
1298 		return;
1299 	}
1300 	ctx->super->clean = 0;
1301 
1302 	/* Parse the super block */
1303 	ctx->bs->cluster_sz = ctx->super->cluster_size;
1304 	ctx->bs->total_clusters = ctx->bs->dev->blockcnt / (ctx->bs->cluster_sz / ctx->bs->dev->blocklen);
1305 	ctx->bs->pages_per_cluster = ctx->bs->cluster_sz / sizeof(struct spdk_blob_md_page);
1306 	ctx->bs->md_start = ctx->super->md_start;
1307 	ctx->bs->md_len = ctx->super->md_len;
1308 
1309 	/* Read the used pages mask */
1310 	ctx->mask = spdk_zmalloc(ctx->super->used_page_mask_len * sizeof(struct spdk_blob_md_page), 0x1000,
1311 				 NULL);
1312 	if (!ctx->mask) {
1313 		spdk_free(ctx->super);
1314 		_spdk_bs_free(ctx->bs);
1315 		free(ctx);
1316 		spdk_bs_sequence_finish(seq, -ENOMEM);
1317 		return;
1318 	}
1319 	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_start);
1320 	lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_len);
1321 	spdk_bs_sequence_read(seq, ctx->mask, lba, lba_count,
1322 			      _spdk_bs_load_used_pages_cpl, ctx);
1323 }
1324 
1325 void
1326 spdk_bs_load(struct spdk_bs_dev *dev,
1327 	     spdk_bs_op_with_handle_complete cb_fn, void *cb_arg)
1328 {
1329 	struct spdk_blob_store	*bs;
1330 	struct spdk_bs_cpl	cpl;
1331 	spdk_bs_sequence_t	*seq;
1332 	struct spdk_bs_load_ctx *ctx;
1333 	struct spdk_bs_opts	opts = {};
1334 
1335 	SPDK_TRACELOG(SPDK_TRACE_BLOB, "Loading blobstore from dev %p\n", dev);
1336 
1337 	spdk_bs_opts_init(&opts);
1338 
1339 	bs = _spdk_bs_alloc(dev, &opts);
1340 	if (!bs) {
1341 		cb_fn(cb_arg, NULL, -ENOMEM);
1342 		return;
1343 	}
1344 
1345 	ctx = calloc(1, sizeof(*ctx));
1346 	if (!ctx) {
1347 		_spdk_bs_free(bs);
1348 		cb_fn(cb_arg, NULL, -ENOMEM);
1349 		return;
1350 	}
1351 
1352 	ctx->bs = bs;
1353 
1354 	/* Allocate memory for the super block */
1355 	ctx->super = spdk_zmalloc(sizeof(*ctx->super), 0x1000, NULL);
1356 	if (!ctx->super) {
1357 		free(ctx);
1358 		_spdk_bs_free(bs);
1359 		return;
1360 	}
1361 
1362 	cpl.type = SPDK_BS_CPL_TYPE_BS_HANDLE;
1363 	cpl.u.bs_handle.cb_fn = cb_fn;
1364 	cpl.u.bs_handle.cb_arg = cb_arg;
1365 	cpl.u.bs_handle.bs = bs;
1366 
1367 	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
1368 	if (!seq) {
1369 		spdk_free(ctx->super);
1370 		free(ctx);
1371 		_spdk_bs_free(bs);
1372 		cb_fn(cb_arg, NULL, -ENOMEM);
1373 		return;
1374 	}
1375 
1376 	/* Read the super block */
1377 	spdk_bs_sequence_read(seq, ctx->super, _spdk_bs_page_to_lba(bs, 0),
1378 			      _spdk_bs_byte_to_lba(bs, sizeof(*ctx->super)),
1379 			      _spdk_bs_load_super_cpl, ctx);
1380 }
1381 
1382 /* END spdk_bs_load */
1383 
1384 /* START spdk_bs_init */
1385 
1386 struct spdk_bs_init_ctx {
1387 	struct spdk_blob_store		*bs;
1388 	struct spdk_bs_super_block	*super;
1389 };
1390 
1391 static void
1392 _spdk_bs_init_persist_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1393 {
1394 	struct spdk_bs_init_ctx *ctx = cb_arg;
1395 
1396 	spdk_free(ctx->super);
1397 	free(ctx);
1398 
1399 	spdk_bs_sequence_finish(seq, bserrno);
1400 }
1401 
1402 static void
1403 _spdk_bs_init_trim_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1404 {
1405 	struct spdk_bs_init_ctx *ctx = cb_arg;
1406 
1407 	/* Write super block */
1408 	spdk_bs_sequence_write(seq, ctx->super, _spdk_bs_page_to_lba(ctx->bs, 0),
1409 			       _spdk_bs_byte_to_lba(ctx->bs, sizeof(*ctx->super)),
1410 			       _spdk_bs_init_persist_super_cpl, ctx);
1411 }
1412 
1413 void
1414 spdk_bs_init(struct spdk_bs_dev *dev, struct spdk_bs_opts *o,
1415 	     spdk_bs_op_with_handle_complete cb_fn, void *cb_arg)
1416 {
1417 	struct spdk_bs_init_ctx *ctx;
1418 	struct spdk_blob_store	*bs;
1419 	struct spdk_bs_cpl	cpl;
1420 	spdk_bs_sequence_t	*seq;
1421 	uint64_t		num_md_pages;
1422 	uint32_t		i;
1423 	struct spdk_bs_opts	opts = {};
1424 	int			rc;
1425 
1426 	SPDK_TRACELOG(SPDK_TRACE_BLOB, "Initializing blobstore on dev %p\n", dev);
1427 
1428 	if (o) {
1429 		opts = *o;
1430 	} else {
1431 		spdk_bs_opts_init(&opts);
1432 	}
1433 
1434 	bs = _spdk_bs_alloc(dev, &opts);
1435 	if (!bs) {
1436 		cb_fn(cb_arg, NULL, -ENOMEM);
1437 		return;
1438 	}
1439 
1440 	if (opts.num_md_pages == UINT32_MAX) {
1441 		/* By default, allocate 1 page per cluster.
1442 		 * Technically, this over-allocates metadata
1443 		 * because more metadata will reduce the number
1444 		 * of usable clusters. This can be addressed with
1445 		 * more complex math in the future.
1446 		 */
1447 		bs->md_len = bs->total_clusters;
1448 	} else {
1449 		bs->md_len = opts.num_md_pages;
1450 	}
1451 
1452 	rc = spdk_bit_array_resize(&bs->used_md_pages, bs->md_len);
1453 	if (rc < 0) {
1454 		_spdk_bs_free(bs);
1455 		cb_fn(cb_arg, NULL, -ENOMEM);
1456 		return;
1457 	}
1458 
1459 	ctx = calloc(1, sizeof(*ctx));
1460 	if (!ctx) {
1461 		_spdk_bs_free(bs);
1462 		cb_fn(cb_arg, NULL, -ENOMEM);
1463 		return;
1464 	}
1465 
1466 	ctx->bs = bs;
1467 
1468 	/* Allocate memory for the super block */
1469 	ctx->super = spdk_zmalloc(sizeof(*ctx->super), 0x1000, NULL);
1470 	if (!ctx->super) {
1471 		free(ctx);
1472 		_spdk_bs_free(bs);
1473 		return;
1474 	}
1475 	memcpy(ctx->super->signature, SPDK_BS_SUPER_BLOCK_SIG,
1476 	       sizeof(ctx->super->signature));
1477 	ctx->super->version = SPDK_BS_VERSION;
1478 	ctx->super->length = sizeof(*ctx->super);
1479 	ctx->super->super_blob = bs->super_blob;
1480 	ctx->super->clean = 0;
1481 	ctx->super->cluster_size = bs->cluster_sz;
1482 
1483 	/* Calculate how many pages the metadata consumes at the front
1484 	 * of the disk.
1485 	 */
1486 
1487 	/* The super block uses 1 page */
1488 	num_md_pages = 1;
1489 
1490 	/* The used_md_pages mask requires 1 bit per metadata page, rounded
1491 	 * up to the nearest page, plus a header.
1492 	 */
1493 	ctx->super->used_page_mask_start = num_md_pages;
1494 	ctx->super->used_page_mask_len = divide_round_up(sizeof(struct spdk_bs_md_mask) +
1495 					 divide_round_up(bs->md_len, 8),
1496 					 sizeof(struct spdk_blob_md_page));
1497 	num_md_pages += ctx->super->used_page_mask_len;
1498 
1499 	/* The used_clusters mask requires 1 bit per cluster, rounded
1500 	 * up to the nearest page, plus a header.
1501 	 */
1502 	ctx->super->used_cluster_mask_start = num_md_pages;
1503 	ctx->super->used_cluster_mask_len = divide_round_up(sizeof(struct spdk_bs_md_mask) +
1504 					    divide_round_up(bs->total_clusters, 8),
1505 					    sizeof(struct spdk_blob_md_page));
1506 	num_md_pages += ctx->super->used_cluster_mask_len;
1507 
1508 	/* The metadata region size was chosen above */
1509 	ctx->super->md_start = bs->md_start = num_md_pages;
1510 	ctx->super->md_len = bs->md_len;
1511 	num_md_pages += bs->md_len;
1512 
1513 	/* Claim all of the clusters used by the metadata */
1514 	for (i = 0; i < divide_round_up(num_md_pages, bs->pages_per_cluster); i++) {
1515 		_spdk_bs_claim_cluster(bs, i);
1516 	}
1517 
1518 	cpl.type = SPDK_BS_CPL_TYPE_BS_HANDLE;
1519 	cpl.u.bs_handle.cb_fn = cb_fn;
1520 	cpl.u.bs_handle.cb_arg = cb_arg;
1521 	cpl.u.bs_handle.bs = bs;
1522 
1523 	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
1524 	if (!seq) {
1525 		spdk_free(ctx->super);
1526 		free(ctx);
1527 		_spdk_bs_free(bs);
1528 		cb_fn(cb_arg, NULL, -ENOMEM);
1529 		return;
1530 	}
1531 
1532 	/* TRIM the entire device */
1533 	spdk_bs_sequence_unmap(seq, 0, bs->dev->blockcnt, _spdk_bs_init_trim_cpl, ctx);
1534 }
1535 
1536 /* END spdk_bs_init */
1537 
1538 /* START spdk_bs_unload */
1539 
1540 struct spdk_bs_unload_ctx {
1541 	struct spdk_blob_store		*bs;
1542 	struct spdk_bs_super_block	*super;
1543 
1544 	struct spdk_bs_md_mask		*mask;
1545 };
1546 
1547 static void
1548 _spdk_bs_unload_write_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1549 {
1550 	struct spdk_bs_unload_ctx	*ctx = cb_arg;
1551 
1552 	spdk_free(ctx->super);
1553 
1554 	spdk_bs_sequence_finish(seq, bserrno);
1555 
1556 	_spdk_bs_free(ctx->bs);
1557 	free(ctx);
1558 }
1559 
1560 static void
1561 _spdk_bs_unload_write_used_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1562 {
1563 	struct spdk_bs_unload_ctx	*ctx = cb_arg;
1564 
1565 	spdk_free(ctx->mask);
1566 
1567 	/* Update the values in the super block */
1568 	ctx->super->super_blob = ctx->bs->super_blob;
1569 	ctx->super->clean = 1;
1570 
1571 	spdk_bs_sequence_write(seq, ctx->super, _spdk_bs_page_to_lba(ctx->bs, 0),
1572 			       _spdk_bs_byte_to_lba(ctx->bs, sizeof(*ctx->super)),
1573 			       _spdk_bs_unload_write_super_cpl, ctx);
1574 }
1575 
1576 static void
1577 _spdk_bs_unload_write_used_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1578 {
1579 	struct spdk_bs_unload_ctx	*ctx = cb_arg;
1580 	uint32_t			i;
1581 	uint64_t			lba, lba_count;
1582 
1583 	spdk_free(ctx->mask);
1584 
1585 	/* Write out the used clusters mask */
1586 	ctx->mask = spdk_zmalloc(ctx->super->used_cluster_mask_len * sizeof(struct spdk_blob_md_page),
1587 				 0x1000, NULL);
1588 	if (!ctx->mask) {
1589 		spdk_free(ctx->super);
1590 		free(ctx);
1591 		spdk_bs_sequence_finish(seq, -ENOMEM);
1592 		return;
1593 	}
1594 
1595 	ctx->mask->type = SPDK_MD_MASK_TYPE_USED_CLUSTERS;
1596 	ctx->mask->length = ctx->bs->total_clusters;
1597 	assert(ctx->mask->length == spdk_bit_array_capacity(ctx->bs->used_clusters));
1598 
1599 	i = 0;
1600 	while (true) {
1601 		i = spdk_bit_array_find_first_set(ctx->bs->used_clusters, i);
1602 		if (i > ctx->mask->length) {
1603 			break;
1604 		}
1605 		ctx->mask->mask[i / 8] |= 1U << (i % 8);
1606 		i++;
1607 	}
1608 
1609 	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_start);
1610 	lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_len);
1611 	spdk_bs_sequence_write(seq, ctx->mask, lba, lba_count,
1612 			       _spdk_bs_unload_write_used_clusters_cpl, ctx);
1613 }
1614 
1615 static void
1616 _spdk_bs_unload_read_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1617 {
1618 	struct spdk_bs_unload_ctx	*ctx = cb_arg;
1619 	uint32_t			i;
1620 	uint64_t			lba, lba_count;
1621 
1622 	/* Write out the used page mask */
1623 	ctx->mask = spdk_zmalloc(ctx->super->used_page_mask_len * sizeof(struct spdk_blob_md_page),
1624 				 0x1000, NULL);
1625 	if (!ctx->mask) {
1626 		spdk_free(ctx->super);
1627 		free(ctx);
1628 		spdk_bs_sequence_finish(seq, -ENOMEM);
1629 		return;
1630 	}
1631 
1632 	ctx->mask->type = SPDK_MD_MASK_TYPE_USED_PAGES;
1633 	ctx->mask->length = ctx->super->md_len;
1634 	assert(ctx->mask->length == spdk_bit_array_capacity(ctx->bs->used_md_pages));
1635 
1636 	i = 0;
1637 	while (true) {
1638 		i = spdk_bit_array_find_first_set(ctx->bs->used_md_pages, i);
1639 		if (i > ctx->mask->length) {
1640 			break;
1641 		}
1642 		ctx->mask->mask[i / 8] |= 1U << (i % 8);
1643 		i++;
1644 	}
1645 
1646 	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_start);
1647 	lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_len);
1648 	spdk_bs_sequence_write(seq, ctx->mask, lba, lba_count,
1649 			       _spdk_bs_unload_write_used_pages_cpl, ctx);
1650 }
1651 
1652 void
1653 spdk_bs_unload(struct spdk_blob_store *bs, spdk_bs_op_complete cb_fn, void *cb_arg)
1654 {
1655 	struct spdk_bs_cpl	cpl;
1656 	spdk_bs_sequence_t	*seq;
1657 	struct spdk_bs_unload_ctx *ctx;
1658 
1659 	SPDK_TRACELOG(SPDK_TRACE_BLOB, "Syncing blobstore\n");
1660 
1661 	ctx = calloc(1, sizeof(*ctx));
1662 	if (!ctx) {
1663 		cb_fn(cb_arg, -ENOMEM);
1664 		return;
1665 	}
1666 
1667 	ctx->bs = bs;
1668 
1669 	ctx->super = spdk_zmalloc(sizeof(*ctx->super), 0x1000, NULL);
1670 	if (!ctx->super) {
1671 		free(ctx);
1672 		cb_fn(cb_arg, -ENOMEM);
1673 		return;
1674 	}
1675 
1676 	cpl.type = SPDK_BS_CPL_TYPE_BS_BASIC;
1677 	cpl.u.bs_basic.cb_fn = cb_fn;
1678 	cpl.u.bs_basic.cb_arg = cb_arg;
1679 
1680 	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
1681 	if (!seq) {
1682 		spdk_free(ctx->super);
1683 		free(ctx);
1684 		cb_fn(cb_arg, -ENOMEM);
1685 		return;
1686 	}
1687 
1688 	assert(TAILQ_EMPTY(&bs->blobs));
1689 
1690 	/* Read super block */
1691 	spdk_bs_sequence_read(seq, ctx->super, _spdk_bs_page_to_lba(bs, 0),
1692 			      _spdk_bs_byte_to_lba(bs, sizeof(*ctx->super)),
1693 			      _spdk_bs_unload_read_super_cpl, ctx);
1694 }
1695 
1696 /* END spdk_bs_unload */
1697 
1698 void
1699 spdk_bs_set_super(struct spdk_blob_store *bs, spdk_blob_id blobid,
1700 		  spdk_bs_op_complete cb_fn, void *cb_arg)
1701 {
1702 	bs->super_blob = blobid;
1703 	cb_fn(cb_arg, 0);
1704 }
1705 
1706 void
1707 spdk_bs_get_super(struct spdk_blob_store *bs,
1708 		  spdk_blob_op_with_id_complete cb_fn, void *cb_arg)
1709 {
1710 	if (bs->super_blob == SPDK_BLOBID_INVALID) {
1711 		cb_fn(cb_arg, SPDK_BLOBID_INVALID, -ENOENT);
1712 	} else {
1713 		cb_fn(cb_arg, bs->super_blob, 0);
1714 	}
1715 }
1716 
1717 uint64_t
1718 spdk_bs_get_cluster_size(struct spdk_blob_store *bs)
1719 {
1720 	return bs->cluster_sz;
1721 }
1722 
1723 uint64_t
1724 spdk_bs_get_page_size(struct spdk_blob_store *bs)
1725 {
1726 	return sizeof(struct spdk_blob_md_page);
1727 }
1728 
1729 uint64_t
1730 spdk_bs_free_cluster_count(struct spdk_blob_store *bs)
1731 {
1732 	return bs->num_free_clusters;
1733 }
1734 
1735 int spdk_bs_register_md_thread(struct spdk_blob_store *bs)
1736 {
1737 	bs->md_channel = spdk_get_io_channel(bs, SPDK_IO_PRIORITY_DEFAULT, true,
1738 					     (void *)&bs->max_md_ops);
1739 
1740 	return 0;
1741 }
1742 
1743 int spdk_bs_unregister_md_thread(struct spdk_blob_store *bs)
1744 {
1745 	spdk_put_io_channel(bs->md_channel);
1746 
1747 	return 0;
1748 }
1749 
1750 spdk_blob_id spdk_blob_get_id(struct spdk_blob *blob)
1751 {
1752 	assert(blob != NULL);
1753 
1754 	return blob->id;
1755 }
1756 
1757 uint64_t spdk_blob_get_num_pages(struct spdk_blob *blob)
1758 {
1759 	assert(blob != NULL);
1760 
1761 	return _spdk_bs_cluster_to_page(blob->bs, blob->active.num_clusters);
1762 }
1763 
1764 uint64_t spdk_blob_get_num_clusters(struct spdk_blob *blob)
1765 {
1766 	assert(blob != NULL);
1767 
1768 	return blob->active.num_clusters;
1769 }
1770 
1771 /* START spdk_bs_md_create_blob */
1772 
1773 static void
1774 _spdk_bs_md_create_blob_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1775 {
1776 	struct spdk_blob *blob = cb_arg;
1777 
1778 	_spdk_blob_free(blob);
1779 
1780 	spdk_bs_sequence_finish(seq, bserrno);
1781 }
1782 
1783 void spdk_bs_md_create_blob(struct spdk_blob_store *bs,
1784 			    spdk_blob_op_with_id_complete cb_fn, void *cb_arg)
1785 {
1786 	struct spdk_blob	*blob;
1787 	uint32_t		page_idx;
1788 	struct spdk_bs_cpl 	cpl;
1789 	spdk_bs_sequence_t	*seq;
1790 	spdk_blob_id		id;
1791 
1792 	page_idx = spdk_bit_array_find_first_clear(bs->used_md_pages, 0);
1793 	if (page_idx >= spdk_bit_array_capacity(bs->used_md_pages)) {
1794 		cb_fn(cb_arg, 0, -ENOMEM);
1795 		return;
1796 	}
1797 	spdk_bit_array_set(bs->used_md_pages, page_idx);
1798 
1799 	/* The blob id is a 64 bit number. The lower 32 bits are the page_idx. The upper
1800 	 * 32 bits are not currently used. Stick a 1 there just to catch bugs where the
1801 	 * code assumes blob id == page_idx.
1802 	 */
1803 	id = (1ULL << 32) | page_idx;
1804 
1805 	SPDK_TRACELOG(SPDK_TRACE_BLOB, "Creating blob with id %lu at page %u\n", id, page_idx);
1806 
1807 	blob = _spdk_blob_alloc(bs, id);
1808 	if (!blob) {
1809 		cb_fn(cb_arg, 0, -ENOMEM);
1810 		return;
1811 	}
1812 
1813 	cpl.type = SPDK_BS_CPL_TYPE_BLOBID;
1814 	cpl.u.blobid.cb_fn = cb_fn;
1815 	cpl.u.blobid.cb_arg = cb_arg;
1816 	cpl.u.blobid.blobid = blob->id;
1817 
1818 	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
1819 	if (!seq) {
1820 		_spdk_blob_free(blob);
1821 		cb_fn(cb_arg, 0, -ENOMEM);
1822 		return;
1823 	}
1824 
1825 	_spdk_blob_persist(seq, blob, _spdk_bs_md_create_blob_cpl, blob);
1826 }
1827 
1828 /* END spdk_bs_md_create_blob */
1829 
1830 /* START spdk_bs_md_resize_blob */
1831 int
1832 spdk_bs_md_resize_blob(struct spdk_blob *blob, uint64_t sz)
1833 {
1834 	int			rc;
1835 
1836 	assert(blob != NULL);
1837 
1838 	SPDK_TRACELOG(SPDK_TRACE_BLOB, "Resizing blob %lu to %lu clusters\n", blob->id, sz);
1839 
1840 	if (sz == blob->active.num_clusters) {
1841 		return 0;
1842 	}
1843 
1844 	rc = _spdk_resize_blob(blob, sz);
1845 	if (rc < 0) {
1846 		return rc;
1847 	}
1848 
1849 	return 0;
1850 }
1851 
1852 /* END spdk_bs_md_resize_blob */
1853 
1854 
1855 /* START spdk_bs_md_delete_blob */
1856 
1857 static void
1858 _spdk_bs_md_delete_blob_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1859 {
1860 	struct spdk_blob *blob = cb_arg;
1861 
1862 	_spdk_blob_free(blob);
1863 
1864 	spdk_bs_sequence_finish(seq, bserrno);
1865 }
1866 
1867 static void
1868 _spdk_bs_md_delete_open_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1869 {
1870 	struct spdk_blob *blob = cb_arg;
1871 
1872 	blob->state = SPDK_BLOB_STATE_DIRTY;
1873 	blob->active.num_pages = 0;
1874 	_spdk_resize_blob(blob, 0);
1875 
1876 	_spdk_blob_persist(seq, blob, _spdk_bs_md_delete_blob_cpl, blob);
1877 }
1878 
1879 void
1880 spdk_bs_md_delete_blob(struct spdk_blob_store *bs, spdk_blob_id blobid,
1881 		       spdk_blob_op_complete cb_fn, void *cb_arg)
1882 {
1883 	struct spdk_blob	*blob;
1884 	struct spdk_bs_cpl	cpl;
1885 	spdk_bs_sequence_t 	*seq;
1886 
1887 	SPDK_TRACELOG(SPDK_TRACE_BLOB, "Deleting blob %lu\n", blobid);
1888 
1889 	blob = _spdk_blob_lookup(bs, blobid);
1890 	if (blob) {
1891 		assert(blob->open_ref > 0);
1892 		cb_fn(cb_arg, -EINVAL);
1893 		return;
1894 	}
1895 
1896 	blob = _spdk_blob_alloc(bs, blobid);
1897 	if (!blob) {
1898 		cb_fn(cb_arg, -ENOMEM);
1899 		return;
1900 	}
1901 
1902 	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
1903 	cpl.u.blob_basic.cb_fn = cb_fn;
1904 	cpl.u.blob_basic.cb_arg = cb_arg;
1905 
1906 	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
1907 	if (!seq) {
1908 		_spdk_blob_free(blob);
1909 		cb_fn(cb_arg, -ENOMEM);
1910 		return;
1911 	}
1912 
1913 	_spdk_blob_load(seq, blob, _spdk_bs_md_delete_open_cpl, blob);
1914 }
1915 
1916 /* END spdk_bs_md_delete_blob */
1917 
1918 /* START spdk_bs_md_open_blob */
1919 
1920 static void
1921 _spdk_bs_md_open_blob_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1922 {
1923 	struct spdk_blob *blob = cb_arg;
1924 
1925 	blob->open_ref++;
1926 
1927 	TAILQ_INSERT_HEAD(&blob->bs->blobs, blob, link);
1928 
1929 	spdk_bs_sequence_finish(seq, bserrno);
1930 }
1931 
1932 void spdk_bs_md_open_blob(struct spdk_blob_store *bs, spdk_blob_id blobid,
1933 			  spdk_blob_op_with_handle_complete cb_fn, void *cb_arg)
1934 {
1935 	struct spdk_blob		*blob;
1936 	struct spdk_bs_cpl		cpl;
1937 	spdk_bs_sequence_t		*seq;
1938 	uint32_t			page_num;
1939 
1940 	SPDK_TRACELOG(SPDK_TRACE_BLOB, "Opening blob %lu\n", blobid);
1941 
1942 	blob = _spdk_blob_lookup(bs, blobid);
1943 	if (blob) {
1944 		blob->open_ref++;
1945 		cb_fn(cb_arg, blob, 0);
1946 		return;
1947 	}
1948 
1949 	page_num = _spdk_bs_blobid_to_page(blobid);
1950 	if (spdk_bit_array_get(bs->used_md_pages, page_num) == false) {
1951 		/* Invalid blobid */
1952 		cb_fn(cb_arg, NULL, -ENOENT);
1953 		return;
1954 	}
1955 
1956 	blob = _spdk_blob_alloc(bs, blobid);
1957 	if (!blob) {
1958 		cb_fn(cb_arg, NULL, -ENOMEM);
1959 		return;
1960 	}
1961 
1962 	cpl.type = SPDK_BS_CPL_TYPE_BLOB_HANDLE;
1963 	cpl.u.blob_handle.cb_fn = cb_fn;
1964 	cpl.u.blob_handle.cb_arg = cb_arg;
1965 	cpl.u.blob_handle.blob = blob;
1966 
1967 	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
1968 	if (!seq) {
1969 		_spdk_blob_free(blob);
1970 		cb_fn(cb_arg, NULL, -ENOMEM);
1971 		return;
1972 	}
1973 
1974 	_spdk_blob_load(seq, blob, _spdk_bs_md_open_blob_cpl, blob);
1975 }
1976 
1977 /* START spdk_bs_md_sync_blob */
1978 static void
1979 _spdk_blob_sync_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1980 {
1981 	spdk_bs_sequence_finish(seq, bserrno);
1982 }
1983 
1984 void spdk_bs_md_sync_blob(struct spdk_blob *blob,
1985 			  spdk_blob_op_complete cb_fn, void *cb_arg)
1986 {
1987 	struct spdk_bs_cpl	cpl;
1988 	spdk_bs_sequence_t	*seq;
1989 
1990 	assert(blob != NULL);
1991 
1992 	SPDK_TRACELOG(SPDK_TRACE_BLOB, "Syncing blob %lu\n", blob->id);
1993 
1994 	assert(blob->state != SPDK_BLOB_STATE_LOADING &&
1995 	       blob->state != SPDK_BLOB_STATE_SYNCING);
1996 
1997 	if (blob->state == SPDK_BLOB_STATE_CLEAN) {
1998 		cb_fn(cb_arg, 0);
1999 		return;
2000 	}
2001 
2002 	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
2003 	cpl.u.blob_basic.cb_fn = cb_fn;
2004 	cpl.u.blob_basic.cb_arg = cb_arg;
2005 
2006 	seq = spdk_bs_sequence_start(blob->bs->md_channel, &cpl);
2007 	if (!seq) {
2008 		cb_fn(cb_arg, -ENOMEM);
2009 		return;
2010 	}
2011 
2012 	_spdk_blob_persist(seq, blob, _spdk_blob_sync_cpl, blob);
2013 }
2014 
2015 /* END spdk_bs_md_sync_blob */
2016 
2017 /* START spdk_bs_md_close_blob */
2018 
2019 static void
2020 _spdk_blob_close_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2021 {
2022 	struct spdk_blob **blob = cb_arg;
2023 
2024 	if ((*blob)->open_ref == 0) {
2025 		TAILQ_REMOVE(&(*blob)->bs->blobs, (*blob), link);
2026 		_spdk_blob_free((*blob));
2027 	}
2028 
2029 	*blob = NULL;
2030 
2031 	spdk_bs_sequence_finish(seq, bserrno);
2032 }
2033 
2034 void spdk_bs_md_close_blob(struct spdk_blob **b,
2035 			   spdk_blob_op_complete cb_fn, void *cb_arg)
2036 {
2037 	struct spdk_bs_cpl	cpl;
2038 	struct spdk_blob	*blob;
2039 	spdk_bs_sequence_t	*seq;
2040 
2041 	assert(b != NULL);
2042 	blob = *b;
2043 	assert(blob != NULL);
2044 
2045 	SPDK_TRACELOG(SPDK_TRACE_BLOB, "Closing blob %lu\n", blob->id);
2046 
2047 	assert(blob->state != SPDK_BLOB_STATE_LOADING &&
2048 	       blob->state != SPDK_BLOB_STATE_SYNCING);
2049 
2050 	if (blob->open_ref == 0) {
2051 		cb_fn(cb_arg, -EBADF);
2052 		return;
2053 	}
2054 
2055 	blob->open_ref--;
2056 
2057 	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
2058 	cpl.u.blob_basic.cb_fn = cb_fn;
2059 	cpl.u.blob_basic.cb_arg = cb_arg;
2060 
2061 	seq = spdk_bs_sequence_start(blob->bs->md_channel, &cpl);
2062 	if (!seq) {
2063 		cb_fn(cb_arg, -ENOMEM);
2064 		return;
2065 	}
2066 
2067 	if (blob->state == SPDK_BLOB_STATE_CLEAN) {
2068 		_spdk_blob_close_cpl(seq, b, 0);
2069 		return;
2070 	}
2071 
2072 	/* Sync metadata */
2073 	_spdk_blob_persist(seq, blob, _spdk_blob_close_cpl, b);
2074 }
2075 
2076 /* END spdk_bs_md_close_blob */
2077 
2078 struct spdk_io_channel *spdk_bs_alloc_io_channel(struct spdk_blob_store *bs,
2079 		uint32_t priority, uint32_t max_ops)
2080 {
2081 	return spdk_get_io_channel(bs, priority, true, (void *)&max_ops);
2082 }
2083 
2084 void spdk_bs_free_io_channel(struct spdk_io_channel *channel)
2085 {
2086 	spdk_put_io_channel(channel);
2087 }
2088 
2089 void spdk_bs_io_flush_channel(struct spdk_io_channel *channel,
2090 			      spdk_blob_op_complete cb_fn, void *cb_arg)
2091 {
2092 	/* Flush is synchronous right now */
2093 	cb_fn(cb_arg, 0);
2094 }
2095 
2096 void spdk_bs_io_write_blob(struct spdk_blob *blob, struct spdk_io_channel *channel,
2097 			   void *payload, uint64_t offset, uint64_t length,
2098 			   spdk_blob_op_complete cb_fn, void *cb_arg)
2099 {
2100 	_spdk_blob_request_submit_rw(blob, channel, payload, offset, length, cb_fn, cb_arg, false);
2101 }
2102 
2103 void spdk_bs_io_read_blob(struct spdk_blob *blob, struct spdk_io_channel *channel,
2104 			  void *payload, uint64_t offset, uint64_t length,
2105 			  spdk_blob_op_complete cb_fn, void *cb_arg)
2106 {
2107 	_spdk_blob_request_submit_rw(blob, channel, payload, offset, length, cb_fn, cb_arg, true);
2108 }
2109 
2110 struct spdk_bs_iter_ctx {
2111 	int64_t page_num;
2112 	struct spdk_blob_store *bs;
2113 
2114 	spdk_blob_op_with_handle_complete cb_fn;
2115 	void *cb_arg;
2116 };
2117 
2118 static void
2119 _spdk_bs_iter_cpl(void *cb_arg, struct spdk_blob *blob, int bserrno)
2120 {
2121 	struct spdk_bs_iter_ctx *ctx = cb_arg;
2122 	struct spdk_blob_store *bs = ctx->bs;
2123 	spdk_blob_id id;
2124 
2125 	if (bserrno == 0) {
2126 		ctx->cb_fn(ctx->cb_arg, blob, bserrno);
2127 		free(ctx);
2128 		return;
2129 	}
2130 
2131 	ctx->page_num++;
2132 	ctx->page_num = spdk_bit_array_find_first_set(bs->used_md_pages, ctx->page_num);
2133 	if (ctx->page_num >= spdk_bit_array_capacity(bs->used_md_pages)) {
2134 		ctx->cb_fn(ctx->cb_arg, NULL, -ENOENT);
2135 		free(ctx);
2136 		return;
2137 	}
2138 
2139 	id = (1ULL << 32) | ctx->page_num;
2140 
2141 	blob = _spdk_blob_lookup(bs, id);
2142 	if (blob) {
2143 		blob->open_ref++;
2144 		ctx->cb_fn(ctx->cb_arg, blob, 0);
2145 		free(ctx);
2146 		return;
2147 	}
2148 
2149 	spdk_bs_md_open_blob(bs, id, _spdk_bs_iter_cpl, ctx);
2150 }
2151 
2152 void
2153 spdk_bs_md_iter_first(struct spdk_blob_store *bs,
2154 		      spdk_blob_op_with_handle_complete cb_fn, void *cb_arg)
2155 {
2156 	struct spdk_bs_iter_ctx *ctx;
2157 
2158 	ctx = calloc(1, sizeof(*ctx));
2159 	if (!ctx) {
2160 		cb_fn(cb_arg, NULL, -ENOMEM);
2161 		return;
2162 	}
2163 
2164 	ctx->page_num = -1;
2165 	ctx->bs = bs;
2166 	ctx->cb_fn = cb_fn;
2167 	ctx->cb_arg = cb_arg;
2168 
2169 	_spdk_bs_iter_cpl(ctx, NULL, -1);
2170 }
2171 
2172 static void
2173 _spdk_bs_iter_close_cpl(void *cb_arg, int bserrno)
2174 {
2175 	struct spdk_bs_iter_ctx *ctx = cb_arg;
2176 
2177 	_spdk_bs_iter_cpl(ctx, NULL, -1);
2178 }
2179 
2180 void
2181 spdk_bs_md_iter_next(struct spdk_blob_store *bs, struct spdk_blob **b,
2182 		     spdk_blob_op_with_handle_complete cb_fn, void *cb_arg)
2183 {
2184 	struct spdk_bs_iter_ctx *ctx;
2185 	struct spdk_blob	*blob;
2186 
2187 	assert(b != NULL);
2188 	blob = *b;
2189 	assert(blob != NULL);
2190 
2191 	ctx = calloc(1, sizeof(*ctx));
2192 	if (!ctx) {
2193 		cb_fn(cb_arg, NULL, -ENOMEM);
2194 		return;
2195 	}
2196 
2197 	ctx->page_num = _spdk_bs_blobid_to_page(blob->id);
2198 	ctx->bs = bs;
2199 	ctx->cb_fn = cb_fn;
2200 	ctx->cb_arg = cb_arg;
2201 
2202 	/* Close the existing blob */
2203 	spdk_bs_md_close_blob(b, _spdk_bs_iter_close_cpl, ctx);
2204 }
2205 
2206 int
2207 spdk_blob_md_set_xattr(struct spdk_blob *blob, const char *name, const void *value,
2208 		       uint16_t value_len)
2209 {
2210 	struct spdk_xattr 	*xattr;
2211 
2212 	assert(blob != NULL);
2213 
2214 	assert(blob->state != SPDK_BLOB_STATE_LOADING &&
2215 	       blob->state != SPDK_BLOB_STATE_SYNCING);
2216 
2217 	TAILQ_FOREACH(xattr, &blob->xattrs, link) {
2218 		if (!strcmp(name, xattr->name)) {
2219 			free(xattr->value);
2220 			xattr->value_len = value_len;
2221 			xattr->value = malloc(value_len);
2222 			memcpy(xattr->value, value, value_len);
2223 
2224 			blob->state = SPDK_BLOB_STATE_DIRTY;
2225 
2226 			return 0;
2227 		}
2228 	}
2229 
2230 	xattr = calloc(1, sizeof(*xattr));
2231 	if (!xattr) {
2232 		return -1;
2233 	}
2234 	xattr->name = strdup(name);
2235 	xattr->value_len = value_len;
2236 	xattr->value = malloc(value_len);
2237 	memcpy(xattr->value, value, value_len);
2238 	TAILQ_INSERT_TAIL(&blob->xattrs, xattr, link);
2239 
2240 	blob->state = SPDK_BLOB_STATE_DIRTY;
2241 
2242 	return 0;
2243 }
2244 
2245 int
2246 spdk_blob_md_remove_xattr(struct spdk_blob *blob, const char *name)
2247 {
2248 	struct spdk_xattr	*xattr;
2249 
2250 	assert(blob != NULL);
2251 
2252 	assert(blob->state != SPDK_BLOB_STATE_LOADING &&
2253 	       blob->state != SPDK_BLOB_STATE_SYNCING);
2254 
2255 	TAILQ_FOREACH(xattr, &blob->xattrs, link) {
2256 		if (!strcmp(name, xattr->name)) {
2257 			TAILQ_REMOVE(&blob->xattrs, xattr, link);
2258 			free(xattr->value);
2259 			free(xattr->name);
2260 			free(xattr);
2261 
2262 			blob->state = SPDK_BLOB_STATE_DIRTY;
2263 
2264 			return 0;
2265 		}
2266 	}
2267 
2268 	return -ENOENT;
2269 }
2270 
2271 int
2272 spdk_bs_md_get_xattr_value(struct spdk_blob *blob, const char *name,
2273 			   const void **value, size_t *value_len)
2274 {
2275 	struct spdk_xattr	*xattr;
2276 
2277 	TAILQ_FOREACH(xattr, &blob->xattrs, link) {
2278 		if (!strcmp(name, xattr->name)) {
2279 			*value = xattr->value;
2280 			*value_len = xattr->value_len;
2281 			return 0;
2282 		}
2283 	}
2284 
2285 	return -ENOENT;
2286 }
2287 
2288 struct spdk_xattr_names {
2289 	uint32_t	count;
2290 	const char	*names[0];
2291 };
2292 
2293 int
2294 spdk_bs_md_get_xattr_names(struct spdk_blob *blob,
2295 			   struct spdk_xattr_names **names)
2296 {
2297 	struct spdk_xattr	*xattr;
2298 	int			count = 0;
2299 
2300 	TAILQ_FOREACH(xattr, &blob->xattrs, link) {
2301 		count++;
2302 	}
2303 
2304 	*names = calloc(1, sizeof(struct spdk_xattr_names) + count * sizeof(char *));
2305 	if (*names == NULL) {
2306 		return -ENOMEM;
2307 	}
2308 
2309 	TAILQ_FOREACH(xattr, &blob->xattrs, link) {
2310 		(*names)->names[(*names)->count++] = xattr->name;
2311 	}
2312 
2313 	return 0;
2314 }
2315 
2316 uint32_t
2317 spdk_xattr_names_get_count(struct spdk_xattr_names *names)
2318 {
2319 	assert(names != NULL);
2320 
2321 	return names->count;
2322 }
2323 
2324 const char *
2325 spdk_xattr_names_get_name(struct spdk_xattr_names *names, uint32_t index)
2326 {
2327 	if (index >= names->count) {
2328 		return NULL;
2329 	}
2330 
2331 	return names->names[index];
2332 }
2333 
2334 void
2335 spdk_xattr_names_free(struct spdk_xattr_names *names)
2336 {
2337 	free(names);
2338 }
2339 
2340 SPDK_LOG_REGISTER_TRACE_FLAG("blob", SPDK_TRACE_BLOB);
2341