/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "spdk/stdinc.h"

#include "spdk/blob.h"
#include "spdk/env.h"
#include "spdk/queue.h"
#include "spdk/io_channel.h"
#include "spdk/bit_array.h"

#include "spdk_internal/log.h"

#include "blobstore.h"
#include "request.h"

static inline size_t
divide_round_up(size_t num, size_t divisor)
{
	return (num + divisor - 1) / divisor;
}
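
/*
 * Worked example (illustrative): divide_round_up(10, 8) == 2, i.e. ten
 * bits of bitmask state still occupy two whole bytes. The same helper
 * sizes the on-disk bitmask and metadata page counts in spdk_bs_init()
 * below.
 */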

static void
_spdk_bs_claim_cluster(struct spdk_blob_store *bs, uint32_t cluster_num)
{
	assert(cluster_num < spdk_bit_array_capacity(bs->used_clusters));
	assert(spdk_bit_array_get(bs->used_clusters, cluster_num) == false);
	assert(bs->num_free_clusters > 0);

	SPDK_TRACELOG(SPDK_TRACE_BLOB, "Claiming cluster %u\n", cluster_num);

	spdk_bit_array_set(bs->used_clusters, cluster_num);
	bs->num_free_clusters--;
}

static void
_spdk_bs_release_cluster(struct spdk_blob_store *bs, uint32_t cluster_num)
{
	assert(cluster_num < spdk_bit_array_capacity(bs->used_clusters));
	assert(spdk_bit_array_get(bs->used_clusters, cluster_num) == true);
	assert(bs->num_free_clusters < bs->total_clusters);

	SPDK_TRACELOG(SPDK_TRACE_BLOB, "Releasing cluster %u\n", cluster_num);

	spdk_bit_array_clear(bs->used_clusters, cluster_num);
	bs->num_free_clusters++;
}

static struct spdk_blob *
_spdk_blob_alloc(struct spdk_blob_store *bs, spdk_blob_id id)
{
	struct spdk_blob *blob;

	blob = calloc(1, sizeof(*blob));
	if (!blob) {
		return NULL;
	}

	blob->id = id;
	blob->bs = bs;

	blob->state = SPDK_BLOB_STATE_DIRTY;
	blob->active.num_pages = 1;
	blob->active.pages = calloc(1, sizeof(*blob->active.pages));
	if (!blob->active.pages) {
		free(blob);
		return NULL;
	}

	blob->active.pages[0] = _spdk_bs_blobid_to_page(id);

	TAILQ_INIT(&blob->xattrs);

	return blob;
}

static void
_spdk_blob_free(struct spdk_blob *blob)
{
	struct spdk_xattr	*xattr, *xattr_tmp;

	assert(blob != NULL);

	free(blob->active.clusters);
	free(blob->clean.clusters);
	free(blob->active.pages);
	free(blob->clean.pages);

	TAILQ_FOREACH_SAFE(xattr, &blob->xattrs, link, xattr_tmp) {
		TAILQ_REMOVE(&blob->xattrs, xattr, link);
		free(xattr->name);
		free(xattr->value);
		free(xattr);
	}

	free(blob);
}

static int
_spdk_blob_mark_clean(struct spdk_blob *blob)
{
	uint64_t *clusters = NULL;
	uint32_t *pages = NULL;

	assert(blob != NULL);
	assert(blob->state == SPDK_BLOB_STATE_LOADING ||
	       blob->state == SPDK_BLOB_STATE_SYNCING);

	if (blob->active.num_clusters) {
		assert(blob->active.clusters);
		clusters = calloc(blob->active.num_clusters, sizeof(*blob->active.clusters));
		if (!clusters) {
			return -1;
		}
		memcpy(clusters, blob->active.clusters, blob->active.num_clusters * sizeof(*clusters));
	}

	if (blob->active.num_pages) {
		assert(blob->active.pages);
		pages = calloc(blob->active.num_pages, sizeof(*blob->active.pages));
		if (!pages) {
			free(clusters);
			return -1;
		}
		memcpy(pages, blob->active.pages, blob->active.num_pages * sizeof(*pages));
	}

	free(blob->clean.clusters);
	free(blob->clean.pages);

	blob->clean.num_clusters = blob->active.num_clusters;
	blob->clean.clusters = blob->active.clusters;
	blob->clean.num_pages = blob->active.num_pages;
	blob->clean.pages = blob->active.pages;

	blob->active.clusters = clusters;
	blob->active.pages = pages;

	blob->state = SPDK_BLOB_STATE_CLEAN;

	return 0;
}

static void
_spdk_blob_parse_page(const struct spdk_blob_md_page *page, struct spdk_blob *blob)
{
	struct spdk_blob_md_descriptor *desc;
	size_t	cur_desc = 0;
	void *tmp;

	desc = (struct spdk_blob_md_descriptor *)page->descriptors;
	while (cur_desc < sizeof(page->descriptors)) {
		if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_PADDING) {
			if (desc->length == 0) {
				/* If padding and length are 0, this terminates the page */
				break;
			}
		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_EXTENT) {
			struct spdk_blob_md_descriptor_extent	*desc_extent;
			unsigned int				i, j;
			unsigned int				cluster_count = blob->active.num_clusters;

			desc_extent = (struct spdk_blob_md_descriptor_extent *)desc;

			assert(desc_extent->length > 0);
			assert(desc_extent->length % sizeof(desc_extent->extents[0]) == 0);

			for (i = 0; i < desc_extent->length / sizeof(desc_extent->extents[0]); i++) {
				for (j = 0; j < desc_extent->extents[i].length; j++) {
					assert(spdk_bit_array_get(blob->bs->used_clusters, desc_extent->extents[i].cluster_idx + j));
					cluster_count++;
				}
			}

			assert(cluster_count > 0);
			tmp = realloc(blob->active.clusters, cluster_count * sizeof(uint64_t));
			assert(tmp != NULL);
			blob->active.clusters = tmp;
			blob->active.cluster_array_size = cluster_count;

			for (i = 0; i < desc_extent->length / sizeof(desc_extent->extents[0]); i++) {
				for (j = 0; j < desc_extent->extents[i].length; j++) {
					blob->active.clusters[blob->active.num_clusters++] = _spdk_bs_cluster_to_lba(blob->bs,
							desc_extent->extents[i].cluster_idx + j);
				}
			}

		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR) {
			struct spdk_blob_md_descriptor_xattr	*desc_xattr;
			struct spdk_xattr			*xattr;

			desc_xattr = (struct spdk_blob_md_descriptor_xattr *)desc;

			assert(desc_xattr->length == sizeof(desc_xattr->name_length) +
			       sizeof(desc_xattr->value_length) +
			       desc_xattr->name_length + desc_xattr->value_length);

			xattr = calloc(1, sizeof(*xattr));
			assert(xattr != NULL);

			xattr->name = malloc(desc_xattr->name_length + 1);
			assert(xattr->name);
			strncpy(xattr->name, desc_xattr->name, desc_xattr->name_length);
			xattr->name[desc_xattr->name_length] = '\0';

			xattr->value = malloc(desc_xattr->value_length);
			assert(xattr->value != NULL);
			xattr->value_len = desc_xattr->value_length;
			memcpy(xattr->value,
			       (void *)((uintptr_t)desc_xattr->name + desc_xattr->name_length),
			       desc_xattr->value_length);

			TAILQ_INSERT_TAIL(&blob->xattrs, xattr, link);
		} else {
			/* Error */
			break;
		}

		/* Advance to the next descriptor */
		cur_desc += sizeof(*desc) + desc->length;
		if (cur_desc + sizeof(*desc) > sizeof(page->descriptors)) {
			break;
		}
		desc = (struct spdk_blob_md_descriptor *)((uintptr_t)page->descriptors + cur_desc);
	}
}
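
/*
 * Worked example (illustrative) of a descriptor region as parsed above:
 *
 *	offset 0: { type = XATTR,  length = hdr + name_len + value_len, ... }
 *	next:     { type = EXTENT, length = 2 * sizeof(extent),
 *	            extents = [ { cluster_idx = 10, length = 3 },
 *	                        { cluster_idx = 20, length = 1 } ] }
 *	next:     { type = PADDING (0), length = 0 }  -- terminates the walk
 *
 * The extent descriptor above contributes clusters 10, 11, 12 and 20 to
 * blob->active.clusters[], each stored as its starting LBA.
 */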

static int
_spdk_blob_parse(const struct spdk_blob_md_page *pages, uint32_t page_count,
		 struct spdk_blob *blob)
{
	const struct spdk_blob_md_page *page;
	uint32_t i;

	assert(page_count > 0);
	assert(pages[0].sequence_num == 0);
	assert(blob != NULL);
	assert(blob->state == SPDK_BLOB_STATE_LOADING);
	assert(blob->active.clusters == NULL);
	assert(blob->id == pages[0].id);

	for (i = 0; i < page_count; i++) {
		page = &pages[i];

		assert(page->id == blob->id);
		assert(page->sequence_num == i);

		_spdk_blob_parse_page(page, blob);
	}

	return 0;
}

static int
_spdk_blob_serialize_add_page(const struct spdk_blob *blob,
			      struct spdk_blob_md_page **pages,
			      uint32_t *page_count,
			      struct spdk_blob_md_page **last_page)
{
	struct spdk_blob_md_page *page;

	assert(pages != NULL);
	assert(page_count != NULL);

	if (*page_count == 0) {
		assert(*pages == NULL);
		*page_count = 1;
		*pages = spdk_dma_malloc(sizeof(struct spdk_blob_md_page),
					 sizeof(struct spdk_blob_md_page),
					 NULL);
	} else {
		assert(*pages != NULL);
		(*page_count)++;
		*pages = spdk_dma_realloc(*pages,
					  sizeof(struct spdk_blob_md_page) * (*page_count),
					  sizeof(struct spdk_blob_md_page),
					  NULL);
	}

	if (*pages == NULL) {
		*page_count = 0;
		*last_page = NULL;
		return -ENOMEM;
	}

	page = &(*pages)[*page_count - 1];
	memset(page, 0, sizeof(*page));
	page->id = blob->id;
	page->sequence_num = *page_count - 1;
	page->next = SPDK_INVALID_MD_PAGE;
	*last_page = page;

	return 0;
}

/* Transform the in-memory representation 'xattr' into an on-disk xattr descriptor.
 * Update required_sz on both success and failure.
 */
static int
_spdk_blob_serialize_xattr(const struct spdk_xattr *xattr,
			   uint8_t *buf, size_t buf_sz,
			   size_t *required_sz)
{
	struct spdk_blob_md_descriptor_xattr	*desc;

	*required_sz = sizeof(struct spdk_blob_md_descriptor_xattr) +
		       strlen(xattr->name) +
		       xattr->value_len;

	if (buf_sz < *required_sz) {
		return -1;
	}

	desc = (struct spdk_blob_md_descriptor_xattr *)buf;

	desc->type = SPDK_MD_DESCRIPTOR_TYPE_XATTR;
	desc->length = sizeof(desc->name_length) +
		       sizeof(desc->value_length) +
		       strlen(xattr->name) +
		       xattr->value_len;
	desc->name_length = strlen(xattr->name);
	desc->value_length = xattr->value_len;

	memcpy(desc->name, xattr->name, desc->name_length);
	memcpy((void *)((uintptr_t)desc->name + desc->name_length),
	       xattr->value,
	       desc->value_length);

	return 0;
}
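
/*
 * Worked example (illustrative): serializing the xattr "name" -> "Fedora"
 * with the routine above produces, after the type/length header:
 *
 *	name_length  = 4
 *	value_length = 6
 *	name[]       = 'n' 'a' 'm' 'e'           (not NUL-terminated)
 *	value[]      = 'F' 'e' 'd' 'o' 'r' 'a'
 *
 * so desc->length = sizeof(name_length) + sizeof(value_length) + 4 + 6.
 * The caller retries on a fresh page whenever buf_sz < *required_sz.
 */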

static void
_spdk_blob_serialize_extent(const struct spdk_blob *blob,
			    uint64_t start_cluster, uint64_t *next_cluster,
			    uint8_t *buf, size_t buf_sz)
{
	struct spdk_blob_md_descriptor_extent *desc;
	size_t cur_sz;
	uint64_t i, extent_idx;
	uint64_t lba, lba_per_cluster, lba_count;

	/* The buffer must have room for at least one extent */
	cur_sz = sizeof(struct spdk_blob_md_descriptor) + sizeof(desc->extents[0]);
	if (buf_sz < cur_sz) {
		*next_cluster = start_cluster;
		return;
	}

	desc = (struct spdk_blob_md_descriptor_extent *)buf;
	desc->type = SPDK_MD_DESCRIPTOR_TYPE_EXTENT;

	lba_per_cluster = _spdk_bs_cluster_to_lba(blob->bs, 1);

	lba = blob->active.clusters[start_cluster];
	lba_count = lba_per_cluster;
	extent_idx = 0;
	for (i = start_cluster + 1; i < blob->active.num_clusters; i++) {
		if ((lba + lba_count) == blob->active.clusters[i]) {
			lba_count += lba_per_cluster;
			continue;
		}
		desc->extents[extent_idx].cluster_idx = lba / lba_per_cluster;
		desc->extents[extent_idx].length = lba_count / lba_per_cluster;
		extent_idx++;

		cur_sz += sizeof(desc->extents[extent_idx]);

		if (buf_sz < cur_sz) {
			/* If we ran out of buffer space, return */
			desc->length = sizeof(desc->extents[0]) * extent_idx;
			*next_cluster = i;
			return;
		}

		lba = blob->active.clusters[i];
		lba_count = lba_per_cluster;
	}

	desc->extents[extent_idx].cluster_idx = lba / lba_per_cluster;
	desc->extents[extent_idx].length = lba_count / lba_per_cluster;
	extent_idx++;

	desc->length = sizeof(desc->extents[0]) * extent_idx;
	*next_cluster = blob->active.num_clusters;

	return;
}
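
/*
 * Worked example (illustrative): assume 1 MiB clusters on 4 KiB blocks,
 * so lba_per_cluster == 256. If blob->active.clusters[] holds the LBAs
 * { 2560, 2816, 3072, 5120 }, the first three form one contiguous run
 * (each is the previous plus 256), so the loop above coalesces them into
 * a single extent { cluster_idx = 10, length = 3 } and then emits
 * { cluster_idx = 20, length = 1 } for the outlier.
 */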

static int
_spdk_blob_serialize(const struct spdk_blob *blob, struct spdk_blob_md_page **pages,
		     uint32_t *page_count)
{
	struct spdk_blob_md_page		*cur_page;
	const struct spdk_xattr			*xattr;
	int					rc;
	uint8_t					*buf;
	size_t					remaining_sz;
	uint64_t				last_cluster;

	assert(pages != NULL);
	assert(page_count != NULL);
	assert(blob != NULL);
	assert(blob->state == SPDK_BLOB_STATE_SYNCING);

	*pages = NULL;
	*page_count = 0;

	/* A blob always has at least 1 page, even if it has no descriptors */
	rc = _spdk_blob_serialize_add_page(blob, pages, page_count, &cur_page);
	if (rc < 0) {
		return rc;
	}

	buf = (uint8_t *)cur_page->descriptors;
	remaining_sz = sizeof(cur_page->descriptors);

	/* Serialize xattrs */
	TAILQ_FOREACH(xattr, &blob->xattrs, link) {
		size_t required_sz = 0;
		rc = _spdk_blob_serialize_xattr(xattr,
						buf, remaining_sz,
						&required_sz);
		if (rc < 0) {
			/* Need to add a new page to the chain */
			rc = _spdk_blob_serialize_add_page(blob, pages, page_count,
							   &cur_page);
			if (rc < 0) {
				spdk_dma_free(*pages);
				*pages = NULL;
				*page_count = 0;
				return rc;
			}

			buf = (uint8_t *)cur_page->descriptors;
			remaining_sz = sizeof(cur_page->descriptors);

			/* Try again */
			required_sz = 0;
			rc = _spdk_blob_serialize_xattr(xattr,
							buf, remaining_sz,
							&required_sz);

			if (rc < 0) {
				/* The xattr does not fit even in an empty page */
				spdk_dma_free(*pages);
				*pages = NULL;
				*page_count = 0;
				return -1;
			}
		}

		remaining_sz -= required_sz;
		buf += required_sz;
	}

	/* Serialize extents */
	last_cluster = 0;
	while (last_cluster < blob->active.num_clusters) {
		_spdk_blob_serialize_extent(blob, last_cluster, &last_cluster,
					    buf, remaining_sz);

		if (last_cluster == blob->active.num_clusters) {
			break;
		}

		rc = _spdk_blob_serialize_add_page(blob, pages, page_count,
						   &cur_page);
		if (rc < 0) {
			/* Match the xattr error path so pages already built are not leaked */
			spdk_dma_free(*pages);
			*pages = NULL;
			*page_count = 0;
			return rc;
		}

		buf = (uint8_t *)cur_page->descriptors;
		remaining_sz = sizeof(cur_page->descriptors);
	}

	return 0;
}

struct spdk_blob_load_ctx {
	struct spdk_blob		*blob;

	struct spdk_blob_md_page	*pages;
	uint32_t			num_pages;

	spdk_bs_sequence_cpl		cb_fn;
	void				*cb_arg;
};

static void
_spdk_blob_load_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_load_ctx	*ctx = cb_arg;
	struct spdk_blob		*blob = ctx->blob;
	struct spdk_blob_md_page	*page;
	int				rc;

	page = &ctx->pages[ctx->num_pages - 1];

	if (page->next != SPDK_INVALID_MD_PAGE) {
		uint32_t next_page = page->next;
		uint64_t next_lba = _spdk_bs_page_to_lba(blob->bs, blob->bs->md_start + next_page);

		assert(next_lba < (blob->bs->md_start + blob->bs->md_len));

		/* Read the next page */
		ctx->num_pages++;
		ctx->pages = spdk_dma_realloc(ctx->pages, (sizeof(*page) * ctx->num_pages),
					      sizeof(*page), NULL);
		if (ctx->pages == NULL) {
			ctx->cb_fn(seq, ctx->cb_arg, -ENOMEM);
			free(ctx);
			return;
		}

		spdk_bs_sequence_read(seq, &ctx->pages[ctx->num_pages - 1],
				      next_lba,
				      _spdk_bs_byte_to_lba(blob->bs, sizeof(*page)),
				      _spdk_blob_load_cpl, ctx);
		return;
	}

	/* Parse the pages */
	rc = _spdk_blob_parse(ctx->pages, ctx->num_pages, blob);

	_spdk_blob_mark_clean(blob);

	ctx->cb_fn(seq, ctx->cb_arg, rc);

	/* Free the memory */
	spdk_dma_free(ctx->pages);
	free(ctx);
}

/* Load a blob from disk given a blobid */
static void
_spdk_blob_load(spdk_bs_sequence_t *seq, struct spdk_blob *blob,
		spdk_bs_sequence_cpl cb_fn, void *cb_arg)
{
	struct spdk_blob_load_ctx *ctx;
	struct spdk_blob_store *bs;
	uint32_t page_num;
	uint64_t lba;

	assert(blob != NULL);
	assert(blob->state == SPDK_BLOB_STATE_CLEAN ||
	       blob->state == SPDK_BLOB_STATE_DIRTY);

	bs = blob->bs;

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		cb_fn(seq, cb_arg, -ENOMEM);
		return;
	}

	ctx->blob = blob;
	ctx->pages = spdk_dma_realloc(ctx->pages, sizeof(struct spdk_blob_md_page),
				      sizeof(struct spdk_blob_md_page), NULL);
	if (!ctx->pages) {
		free(ctx);
		cb_fn(seq, cb_arg, -ENOMEM);
		return;
	}
	ctx->num_pages = 1;
	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;

	page_num = _spdk_bs_blobid_to_page(blob->id);
	lba = _spdk_bs_page_to_lba(blob->bs, bs->md_start + page_num);

	blob->state = SPDK_BLOB_STATE_LOADING;

	spdk_bs_sequence_read(seq, &ctx->pages[0], lba,
			      _spdk_bs_byte_to_lba(bs, sizeof(struct spdk_blob_md_page)),
			      _spdk_blob_load_cpl, ctx);
}

struct spdk_blob_persist_ctx {
	struct spdk_blob		*blob;

	struct spdk_blob_md_page	*pages;

	uint64_t			idx;

	spdk_bs_sequence_cpl		cb_fn;
	void				*cb_arg;
};

static void
_spdk_blob_persist_complete(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx	*ctx = cb_arg;
	struct spdk_blob		*blob = ctx->blob;

	if (bserrno == 0) {
		_spdk_blob_mark_clean(blob);
	}

	/* Call user callback */
	ctx->cb_fn(seq, ctx->cb_arg, bserrno);

	/* Free the memory */
	spdk_dma_free(ctx->pages);
	free(ctx);
}

static void
_spdk_blob_persist_unmap_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx	*ctx = cb_arg;
	struct spdk_blob		*blob = ctx->blob;
	struct spdk_blob_store		*bs = blob->bs;
	void				*tmp;
	size_t				i;

	/* Release all clusters that were truncated */
	for (i = blob->active.num_clusters; i < blob->active.cluster_array_size; i++) {
		uint32_t cluster_num = _spdk_bs_lba_to_cluster(bs, blob->active.clusters[i]);

		_spdk_bs_release_cluster(bs, cluster_num);
	}

	if (blob->active.num_clusters == 0) {
		free(blob->active.clusters);
		blob->active.clusters = NULL;
		blob->active.cluster_array_size = 0;
	} else {
		tmp = realloc(blob->active.clusters, sizeof(uint64_t) * blob->active.num_clusters);
		assert(tmp != NULL);
		blob->active.clusters = tmp;
		blob->active.cluster_array_size = blob->active.num_clusters;
	}

	_spdk_blob_persist_complete(seq, ctx, bserrno);
}

static void
_spdk_blob_persist_unmap_clusters(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx	*ctx = cb_arg;
	struct spdk_blob		*blob = ctx->blob;
	struct spdk_blob_store		*bs = blob->bs;
	spdk_bs_batch_t			*batch;
	size_t				i;
	uint64_t			lba;
	uint32_t			lba_count;

	/* Clusters don't move around in blobs. The list shrinks or grows
	 * at the end, but no changes ever occur in the middle of the list.
	 */

	batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_unmap_clusters_cpl, ctx);

	/* Unmap all clusters that were truncated */
	lba = 0;
	lba_count = 0;
	for (i = blob->active.num_clusters; i < blob->active.cluster_array_size; i++) {
		uint64_t next_lba = blob->active.clusters[i];
		uint32_t next_lba_count = _spdk_bs_cluster_to_lba(bs, 1);

		if ((lba + lba_count) == next_lba) {
			/* This cluster is contiguous with the previous one. */
			lba_count += next_lba_count;
			continue;
		}

		/* This cluster is not contiguous with the previous one. */

		/* If a run of LBAs previously existed, send it as an unmap. */
		if (lba_count > 0) {
			spdk_bs_batch_unmap(batch, lba, lba_count);
		}

		/* Start building the next batch */
		lba = next_lba;
		lba_count = next_lba_count;
	}

	/* If we ended with a contiguous set of LBAs, send the unmap now */
	if (lba_count > 0) {
		spdk_bs_batch_unmap(batch, lba, lba_count);
	}

	spdk_bs_batch_close(batch);
}
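
/*
 * Worked example (illustrative): truncating a blob whose released
 * clusters sit at LBAs { 2560, 2816, 5120 }, with 256 LBAs per cluster,
 * produces exactly two unmaps from the loop above:
 *
 *	spdk_bs_batch_unmap(batch, 2560, 512);  -- clusters 10 and 11
 *	spdk_bs_batch_unmap(batch, 5120, 256);  -- cluster 20
 *
 * The batch completes into _spdk_blob_persist_unmap_clusters_cpl(),
 * which returns the truncated clusters to the blobstore's free pool.
 */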

static void
_spdk_blob_persist_unmap_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx	*ctx = cb_arg;
	struct spdk_blob		*blob = ctx->blob;
	struct spdk_blob_store		*bs = blob->bs;
	size_t				i;

	/* This loop starts at 1 because the first page is special and handled
	 * below. The pages (except the first) are never written in place,
	 * so any pages in the clean list must be unmapped.
	 */
	for (i = 1; i < blob->clean.num_pages; i++) {
		spdk_bit_array_clear(bs->used_md_pages, blob->clean.pages[i]);
	}

	if (blob->active.num_pages == 0) {
		uint32_t page_num;

		page_num = _spdk_bs_blobid_to_page(blob->id);
		spdk_bit_array_clear(bs->used_md_pages, page_num);
	}

	/* Move on to unmapping clusters */
	_spdk_blob_persist_unmap_clusters(seq, ctx, 0);
}

static void
_spdk_blob_persist_unmap_pages(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx	*ctx = cb_arg;
	struct spdk_blob		*blob = ctx->blob;
	struct spdk_blob_store		*bs = blob->bs;
	uint64_t			lba;
	uint32_t			lba_count;
	spdk_bs_batch_t			*batch;
	size_t				i;

	batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_unmap_pages_cpl, ctx);

	lba_count = _spdk_bs_byte_to_lba(bs, sizeof(struct spdk_blob_md_page));

	/* This loop starts at 1 because the first page is special and handled
	 * below. The pages (except the first) are never written in place,
	 * so any pages in the clean list must be unmapped.
	 */
	for (i = 1; i < blob->clean.num_pages; i++) {
		lba = _spdk_bs_page_to_lba(bs, bs->md_start + blob->clean.pages[i]);

		spdk_bs_batch_unmap(batch, lba, lba_count);
	}

	/* The first page will only be unmapped if this is a delete. */
	if (blob->active.num_pages == 0) {
		uint32_t page_num;

		/* The first page in the metadata goes where the blobid indicates */
		page_num = _spdk_bs_blobid_to_page(blob->id);
		lba = _spdk_bs_page_to_lba(bs, bs->md_start + page_num);

		spdk_bs_batch_unmap(batch, lba, lba_count);
	}

	spdk_bs_batch_close(batch);
}

static void
_spdk_blob_persist_write_page_root(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx	*ctx = cb_arg;
	struct spdk_blob		*blob = ctx->blob;
	struct spdk_blob_store		*bs = blob->bs;
	uint64_t			lba;
	uint32_t			lba_count;
	struct spdk_blob_md_page	*page;

	if (blob->active.num_pages == 0) {
		/* Move on to the next step */
		_spdk_blob_persist_unmap_pages(seq, ctx, 0);
		return;
	}

	lba_count = _spdk_bs_byte_to_lba(bs, sizeof(*page));

	page = &ctx->pages[0];
	/* The first page in the metadata goes where the blobid indicates */
	lba = _spdk_bs_page_to_lba(bs, bs->md_start + _spdk_bs_blobid_to_page(blob->id));

	spdk_bs_sequence_write(seq, page, lba, lba_count,
			       _spdk_blob_persist_unmap_pages, ctx);
}

static void
_spdk_blob_persist_write_page_chain(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx	*ctx = cb_arg;
	struct spdk_blob		*blob = ctx->blob;
	struct spdk_blob_store		*bs = blob->bs;
	uint64_t			lba;
	uint32_t			lba_count;
	struct spdk_blob_md_page	*page;
	spdk_bs_batch_t			*batch;
	size_t				i;

	lba_count = _spdk_bs_byte_to_lba(bs, sizeof(*page));

	batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_write_page_root, ctx);

	/* This loop starts at 1. The root page is not written until
	 * all of the others are finished
	 */
	for (i = 1; i < blob->active.num_pages; i++) {
		page = &ctx->pages[i];
		assert(page->sequence_num == i);

		lba = _spdk_bs_page_to_lba(bs, bs->md_start + blob->active.pages[i]);

		spdk_bs_batch_write(batch, page, lba, lba_count);
	}

	spdk_bs_batch_close(batch);
}

static int
_spdk_resize_blob(struct spdk_blob *blob, uint64_t sz)
{
	uint64_t	i;
	uint64_t	*tmp;
	uint64_t	lfc; /* lowest free cluster */
	struct spdk_blob_store *bs;

	bs = blob->bs;

	assert(blob->state != SPDK_BLOB_STATE_LOADING &&
	       blob->state != SPDK_BLOB_STATE_SYNCING);

	if (blob->active.num_clusters == sz) {
		return 0;
	}

	if (blob->active.num_clusters < blob->active.cluster_array_size) {
		/* If this blob was resized to be larger, then smaller, then
		 * larger without syncing, then the cluster array already
		 * contains spare assigned clusters we can use.
		 */
		blob->active.num_clusters = spdk_min(blob->active.cluster_array_size,
						     sz);
	}

	blob->state = SPDK_BLOB_STATE_DIRTY;

	/* Do two passes - one to verify that we can obtain enough clusters
	 * and another to actually claim them.
	 */

	lfc = 0;
	for (i = blob->active.num_clusters; i < sz; i++) {
		lfc = spdk_bit_array_find_first_clear(bs->used_clusters, lfc);
		if (lfc >= bs->total_clusters) {
			/* No more free clusters. Cannot satisfy the request */
			assert(false);
			return -1;
		}
		lfc++;
	}

	if (sz > blob->active.num_clusters) {
		/* Expand the cluster array if necessary.
		 * We only shrink the array when persisting.
		 */
		tmp = realloc(blob->active.clusters, sizeof(uint64_t) * sz);
		if (sz > 0 && tmp == NULL) {
			assert(false);
			return -1;
		}
		blob->active.clusters = tmp;
		blob->active.cluster_array_size = sz;
	}

	lfc = 0;
	for (i = blob->active.num_clusters; i < sz; i++) {
		lfc = spdk_bit_array_find_first_clear(bs->used_clusters, lfc);
		SPDK_TRACELOG(SPDK_TRACE_BLOB, "Claiming cluster %lu for blob %lu\n", lfc, blob->id);
		_spdk_bs_claim_cluster(bs, lfc);
		blob->active.clusters[i] = _spdk_bs_cluster_to_lba(bs, lfc);
		lfc++;
	}

	blob->active.num_clusters = sz;

	return 0;
}

/* Write a blob to disk */
static void
_spdk_blob_persist(spdk_bs_sequence_t *seq, struct spdk_blob *blob,
		   spdk_bs_sequence_cpl cb_fn, void *cb_arg)
{
	struct spdk_blob_persist_ctx *ctx;
	int rc;
	uint64_t i;
	uint32_t page_num;
	struct spdk_blob_store *bs;

	assert(blob != NULL);
	assert(blob->state == SPDK_BLOB_STATE_CLEAN ||
	       blob->state == SPDK_BLOB_STATE_DIRTY);

	if (blob->state == SPDK_BLOB_STATE_CLEAN) {
		cb_fn(seq, cb_arg, 0);
		return;
	}

	bs = blob->bs;

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		cb_fn(seq, cb_arg, -ENOMEM);
		return;
	}
	ctx->blob = blob;
	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;

	blob->state = SPDK_BLOB_STATE_SYNCING;

	if (blob->active.num_pages == 0) {
		/* This is the signal that the blob should be deleted.
		 * Immediately jump to the clean up routine. */
		assert(blob->clean.num_pages > 0);
		ctx->idx = blob->clean.num_pages - 1;
		_spdk_blob_persist_unmap_pages(seq, ctx, 0);
		return;
	}

	/* Generate the new metadata */
	rc = _spdk_blob_serialize(blob, &ctx->pages, &blob->active.num_pages);
	if (rc < 0) {
		free(ctx);
		cb_fn(seq, cb_arg, rc);
		return;
	}

	assert(blob->active.num_pages >= 1);

	/* Resize the cache of page indices */
	blob->active.pages = realloc(blob->active.pages,
				     blob->active.num_pages * sizeof(*blob->active.pages));
	if (!blob->active.pages) {
		free(ctx);
		cb_fn(seq, cb_arg, -ENOMEM);
		return;
	}

	/* Assign this metadata to pages. This requires two passes -
	 * one to verify that there are enough pages and a second
	 * to actually claim them. */
	page_num = 0;
	/* Note that this loop starts at one. The first page location is fixed by the blobid. */
	for (i = 1; i < blob->active.num_pages; i++) {
		page_num = spdk_bit_array_find_first_clear(bs->used_md_pages, page_num);
		if (page_num >= spdk_bit_array_capacity(bs->used_md_pages)) {
			spdk_dma_free(ctx->pages);
			free(ctx);
			blob->state = SPDK_BLOB_STATE_DIRTY;
			cb_fn(seq, cb_arg, -ENOMEM);
			return;
		}
		page_num++;
	}

	page_num = 0;
	blob->active.pages[0] = _spdk_bs_blobid_to_page(blob->id);
	for (i = 1; i < blob->active.num_pages; i++) {
		page_num = spdk_bit_array_find_first_clear(bs->used_md_pages, page_num);
		ctx->pages[i - 1].next = page_num;
		blob->active.pages[i] = page_num;
		spdk_bit_array_set(bs->used_md_pages, page_num);
		SPDK_TRACELOG(SPDK_TRACE_BLOB, "Claiming page %u for blob %lu\n", page_num, blob->id);
		page_num++;
	}

	/* Write out the metadata. The root page is written last, after
	 * every other page in the chain has completed. */
	ctx->idx = blob->active.num_pages - 1;
	_spdk_blob_persist_write_page_chain(seq, ctx, 0);
}

static void
_spdk_blob_request_submit_rw(struct spdk_blob *blob, struct spdk_io_channel *_channel,
			     void *payload, uint64_t offset, uint64_t length,
			     spdk_blob_op_complete cb_fn, void *cb_arg, bool read)
{
	spdk_bs_batch_t			*batch;
	struct spdk_bs_cpl		cpl;
	uint64_t			lba;
	uint32_t			lba_count;
	uint8_t				*buf;
	uint64_t			page;

	assert(blob != NULL);

	if (offset + length > blob->active.num_clusters * blob->bs->pages_per_cluster) {
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
	cpl.u.blob_basic.cb_fn = cb_fn;
	cpl.u.blob_basic.cb_arg = cb_arg;

	batch = spdk_bs_batch_open(_channel, &cpl);
	if (!batch) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	length = _spdk_bs_page_to_lba(blob->bs, length);
	page = offset;
	buf = payload;
	while (length > 0) {
		lba = _spdk_bs_blob_page_to_lba(blob, page);
		lba_count = spdk_min(length,
				     _spdk_bs_page_to_lba(blob->bs,
						     _spdk_bs_num_pages_to_cluster_boundary(blob, page)));

		if (read) {
			spdk_bs_batch_read(batch, buf, lba, lba_count);
		} else {
			spdk_bs_batch_write(batch, buf, lba, lba_count);
		}

		length -= lba_count;
		buf += _spdk_bs_lba_to_byte(blob->bs, lba_count);
		page += _spdk_bs_lba_to_page(blob->bs, lba_count);
	}

	spdk_bs_batch_close(batch);
}
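
/*
 * Usage sketch (hypothetical caller, for illustration only): reading two
 * pages starting at page 5 of a blob through the path above. read_done_cb
 * and cb_arg are placeholders for the caller's completion callback and
 * context:
 *
 *	uint8_t *buf = spdk_dma_malloc(2 * spdk_bs_get_page_size(blob->bs),
 *				       0x1000, NULL);
 *
 *	_spdk_blob_request_submit_rw(blob, channel, buf, 5, 2,
 *				     read_done_cb, cb_arg, true);
 *
 * Offsets and lengths are in pages; a request past the end of the
 * allocated clusters fails with -EINVAL before any I/O is issued.
 */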

static struct spdk_blob *
_spdk_blob_lookup(struct spdk_blob_store *bs, spdk_blob_id blobid)
{
	struct spdk_blob *blob;

	TAILQ_FOREACH(blob, &bs->blobs, link) {
		if (blob->id == blobid) {
			return blob;
		}
	}

	return NULL;
}

static int
_spdk_bs_channel_create(struct spdk_blob_store *bs, struct spdk_bs_channel *channel,
			uint32_t max_ops)
{
	struct spdk_bs_dev		*dev;
	uint32_t			i;

	dev = bs->dev;

	channel->req_mem = calloc(max_ops, sizeof(struct spdk_bs_request_set));
	if (!channel->req_mem) {
		return -1;
	}

	TAILQ_INIT(&channel->reqs);

	for (i = 0; i < max_ops; i++) {
		TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link);
	}

	channel->bs = bs;
	channel->dev = dev;
	channel->dev_channel = dev->create_channel(dev);

	return 0;
}

static int
_spdk_bs_md_channel_create(void *io_device, void *ctx_buf)
{
	struct spdk_blob_store		*bs;
	struct spdk_bs_channel		*channel = ctx_buf;

	bs = SPDK_CONTAINEROF(io_device, struct spdk_blob_store, md_target);

	return _spdk_bs_channel_create(bs, channel, bs->md_target.max_md_ops);
}

static int
_spdk_bs_io_channel_create(void *io_device, void *ctx_buf)
{
	struct spdk_blob_store		*bs;
	struct spdk_bs_channel		*channel = ctx_buf;

	bs = SPDK_CONTAINEROF(io_device, struct spdk_blob_store, io_target);

	return _spdk_bs_channel_create(bs, channel, bs->io_target.max_channel_ops);
}

static void
_spdk_bs_channel_destroy(void *io_device, void *ctx_buf)
{
	struct spdk_bs_channel *channel = ctx_buf;

	free(channel->req_mem);
	channel->dev->destroy_channel(channel->dev, channel->dev_channel);
}

static void
_spdk_bs_free(struct spdk_blob_store *bs)
{
	struct spdk_blob	*blob, *blob_tmp;

	spdk_bs_unregister_md_thread(bs);
	spdk_io_device_unregister(&bs->io_target, NULL);
	spdk_io_device_unregister(&bs->md_target, NULL);

	TAILQ_FOREACH_SAFE(blob, &bs->blobs, link, blob_tmp) {
		TAILQ_REMOVE(&bs->blobs, blob, link);
		_spdk_blob_free(blob);
	}

	spdk_bit_array_free(&bs->used_md_pages);
	spdk_bit_array_free(&bs->used_clusters);

	bs->dev->destroy(bs->dev);
	free(bs);
}

void
spdk_bs_opts_init(struct spdk_bs_opts *opts)
{
	opts->cluster_sz = SPDK_BLOB_OPTS_CLUSTER_SZ;
	opts->num_md_pages = SPDK_BLOB_OPTS_NUM_MD_PAGES;
	opts->max_md_ops = SPDK_BLOB_OPTS_MAX_MD_OPS;
	opts->max_channel_ops = SPDK_BLOB_OPTS_MAX_CHANNEL_OPS;
}
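
/*
 * Usage sketch (hypothetical, for illustration): callers override the
 * defaults after initializing, then hand the options to spdk_bs_init()
 * below. init_complete_cb and init_arg are placeholders for the caller's
 * completion callback and context:
 *
 *	struct spdk_bs_opts opts;
 *
 *	spdk_bs_opts_init(&opts);
 *	opts.cluster_sz = 4 * 1024 * 1024;  -- hypothetical override
 *	spdk_bs_init(dev, &opts, init_complete_cb, init_arg);
 */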

static struct spdk_blob_store *
_spdk_bs_alloc(struct spdk_bs_dev *dev, struct spdk_bs_opts *opts)
{
	struct spdk_blob_store	*bs;

	bs = calloc(1, sizeof(struct spdk_blob_store));
	if (!bs) {
		return NULL;
	}

	TAILQ_INIT(&bs->blobs);
	bs->dev = dev;

	/*
	 * Do not use _spdk_bs_lba_to_cluster() here since blockcnt may not be an
	 *  even multiple of the cluster size.
	 */
	bs->cluster_sz = opts->cluster_sz;
	bs->total_clusters = dev->blockcnt / (bs->cluster_sz / dev->blocklen);
	bs->pages_per_cluster = bs->cluster_sz / sizeof(struct spdk_blob_md_page);
	bs->num_free_clusters = bs->total_clusters;
	bs->used_clusters = spdk_bit_array_create(bs->total_clusters);
	if (bs->used_clusters == NULL) {
		_spdk_bs_free(bs);
		return NULL;
	}

	bs->md_target.max_md_ops = opts->max_md_ops;
	bs->io_target.max_channel_ops = opts->max_channel_ops;
	bs->super_blob = SPDK_BLOBID_INVALID;

	/* The metadata is assumed to be at least 1 page */
	bs->used_md_pages = spdk_bit_array_create(1);

	spdk_io_device_register(&bs->md_target, _spdk_bs_md_channel_create, _spdk_bs_channel_destroy,
				sizeof(struct spdk_bs_channel));
	spdk_bs_register_md_thread(bs);

	spdk_io_device_register(&bs->io_target, _spdk_bs_io_channel_create, _spdk_bs_channel_destroy,
				sizeof(struct spdk_bs_channel));

	return bs;
}
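
/*
 * Worked example (illustrative) of the geometry computed above: a dev
 * with blocklen = 512 and blockcnt = 20971520 (10 GiB), with
 * cluster_sz = 1 MiB, yields
 *
 *	total_clusters    = 20971520 / (1048576 / 512) = 10240
 *	pages_per_cluster = 1048576 / 4096             = 256
 *
 * assuming sizeof(struct spdk_blob_md_page) == 4096.
 */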

/* START spdk_bs_load */

struct spdk_bs_load_ctx {
	struct spdk_blob_store		*bs;
	struct spdk_bs_super_block	*super;

	struct spdk_bs_md_mask		*mask;
};

static void
_spdk_bs_load_used_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_bs_load_ctx *ctx = cb_arg;
	uint32_t		i, j;
	int			rc;

	/* The type must be correct */
	assert(ctx->mask->type == SPDK_MD_MASK_TYPE_USED_CLUSTERS);
	/* The length of the mask (in bits) must not be greater than the length of the buffer (converted to bits) */
	assert(ctx->mask->length <= (ctx->super->used_cluster_mask_len * sizeof(
					     struct spdk_blob_md_page) * 8));
	/* The length of the mask must be exactly equal to the total number of clusters */
	assert(ctx->mask->length == ctx->bs->total_clusters);

	rc = spdk_bit_array_resize(&ctx->bs->used_clusters, ctx->bs->total_clusters);
	if (rc < 0) {
		spdk_dma_free(ctx->super);
		spdk_dma_free(ctx->mask);
		_spdk_bs_free(ctx->bs);
		free(ctx);
		spdk_bs_sequence_finish(seq, -ENOMEM);
		return;
	}

	ctx->bs->num_free_clusters = ctx->bs->total_clusters;
	for (i = 0; i < ctx->mask->length / 8; i++) {
		uint8_t segment = ctx->mask->mask[i];
		for (j = 0; segment && (j < 8); j++) {
			if (segment & 1U) {
				spdk_bit_array_set(ctx->bs->used_clusters, (i * 8) + j);
				assert(ctx->bs->num_free_clusters > 0);
				ctx->bs->num_free_clusters--;
			}
			segment >>= 1U;
		}
	}

	spdk_dma_free(ctx->super);
	spdk_dma_free(ctx->mask);
	free(ctx);

	spdk_bs_sequence_finish(seq, bserrno);
}
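
/*
 * Worked example (illustrative): a mask byte of 0x05 (binary 00000101)
 * at index i marks clusters (i * 8) + 0 and (i * 8) + 2 as used; the
 * inner loop above sets those two bits in bs->used_clusters and
 * decrements num_free_clusters twice.
 */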

static void
_spdk_bs_load_used_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_bs_load_ctx *ctx = cb_arg;
	uint64_t		lba, lba_count, mask_size;
	uint32_t		i, j;
	int			rc;

	/* The type must be correct */
	assert(ctx->mask->type == SPDK_MD_MASK_TYPE_USED_PAGES);
	/* The length of the mask (in bits) must not be greater than the length of the buffer (converted to bits) */
	assert(ctx->mask->length <= (ctx->super->used_page_mask_len * sizeof(struct spdk_blob_md_page) *
				     8));
	/* The length of the mask must be exactly equal to the size (in pages) of the metadata region */
	assert(ctx->mask->length == ctx->super->md_len);

	rc = spdk_bit_array_resize(&ctx->bs->used_md_pages, ctx->mask->length);
	if (rc < 0) {
		spdk_dma_free(ctx->super);
		spdk_dma_free(ctx->mask);
		_spdk_bs_free(ctx->bs);
		free(ctx);
		spdk_bs_sequence_finish(seq, -ENOMEM);
		return;
	}

	for (i = 0; i < ctx->mask->length / 8; i++) {
		uint8_t segment = ctx->mask->mask[i];
		for (j = 0; segment && (j < 8); j++) {
			if (segment & 1U) {
				spdk_bit_array_set(ctx->bs->used_md_pages, (i * 8) + j);
			}
			segment >>= 1U;
		}
	}
	spdk_dma_free(ctx->mask);

	/* Read the used clusters mask */
	mask_size = ctx->super->used_cluster_mask_len * sizeof(struct spdk_blob_md_page);
	ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL);
	if (!ctx->mask) {
		spdk_dma_free(ctx->super);
		_spdk_bs_free(ctx->bs);
		free(ctx);
		spdk_bs_sequence_finish(seq, -ENOMEM);
		return;
	}
	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_start);
	lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_len);
	spdk_bs_sequence_read(seq, ctx->mask, lba, lba_count,
			      _spdk_bs_load_used_clusters_cpl, ctx);
}

static void
_spdk_bs_load_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_bs_load_ctx *ctx = cb_arg;
	uint64_t		lba, lba_count, mask_size;

	if (ctx->super->version != SPDK_BS_VERSION) {
		spdk_dma_free(ctx->super);
		_spdk_bs_free(ctx->bs);
		free(ctx);
		spdk_bs_sequence_finish(seq, -EILSEQ);
		return;
	}

	if (memcmp(ctx->super->signature, SPDK_BS_SUPER_BLOCK_SIG,
		   sizeof(ctx->super->signature)) != 0) {
		spdk_dma_free(ctx->super);
		_spdk_bs_free(ctx->bs);
		free(ctx);
		spdk_bs_sequence_finish(seq, -EILSEQ);
		return;
	}

	if (ctx->super->clean != 1) {
		/* TODO: ONLY CLEAN SHUTDOWN IS CURRENTLY SUPPORTED.
		 * All of the necessary data to recover is available
		 * on disk - the code just has not been written yet.
		 */
		assert(false);
		spdk_dma_free(ctx->super);
		_spdk_bs_free(ctx->bs);
		free(ctx);
		spdk_bs_sequence_finish(seq, -EILSEQ);
		return;
	}
	ctx->super->clean = 0;

	/* Parse the super block */
	ctx->bs->cluster_sz = ctx->super->cluster_size;
	ctx->bs->total_clusters = ctx->bs->dev->blockcnt / (ctx->bs->cluster_sz / ctx->bs->dev->blocklen);
	ctx->bs->pages_per_cluster = ctx->bs->cluster_sz / sizeof(struct spdk_blob_md_page);
	ctx->bs->md_start = ctx->super->md_start;
	ctx->bs->md_len = ctx->super->md_len;
	ctx->bs->super_blob = ctx->super->super_blob;

	/* Read the used pages mask */
	mask_size = ctx->super->used_page_mask_len * sizeof(struct spdk_blob_md_page);
	ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL);
	if (!ctx->mask) {
		spdk_dma_free(ctx->super);
		_spdk_bs_free(ctx->bs);
		free(ctx);
		spdk_bs_sequence_finish(seq, -ENOMEM);
		return;
	}
	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_start);
	lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_len);
	spdk_bs_sequence_read(seq, ctx->mask, lba, lba_count,
			      _spdk_bs_load_used_pages_cpl, ctx);
}

void
spdk_bs_load(struct spdk_bs_dev *dev,
	     spdk_bs_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_blob_store	*bs;
	struct spdk_bs_cpl	cpl;
	spdk_bs_sequence_t	*seq;
	struct spdk_bs_load_ctx *ctx;
	struct spdk_bs_opts	opts = {};

	SPDK_TRACELOG(SPDK_TRACE_BLOB, "Loading blobstore from dev %p\n", dev);

	spdk_bs_opts_init(&opts);

	bs = _spdk_bs_alloc(dev, &opts);
	if (!bs) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		_spdk_bs_free(bs);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	ctx->bs = bs;

	/* Allocate memory for the super block */
	ctx->super = spdk_dma_zmalloc(sizeof(*ctx->super), 0x1000, NULL);
	if (!ctx->super) {
		free(ctx);
		_spdk_bs_free(bs);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	cpl.type = SPDK_BS_CPL_TYPE_BS_HANDLE;
	cpl.u.bs_handle.cb_fn = cb_fn;
	cpl.u.bs_handle.cb_arg = cb_arg;
	cpl.u.bs_handle.bs = bs;

	seq = spdk_bs_sequence_start(bs->md_target.md_channel, &cpl);
	if (!seq) {
		spdk_dma_free(ctx->super);
		free(ctx);
		_spdk_bs_free(bs);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	/* Read the super block */
	spdk_bs_sequence_read(seq, ctx->super, _spdk_bs_page_to_lba(bs, 0),
			      _spdk_bs_byte_to_lba(bs, sizeof(*ctx->super)),
			      _spdk_bs_load_super_cpl, ctx);
}

/* END spdk_bs_load */

/* START spdk_bs_init */

struct spdk_bs_init_ctx {
	struct spdk_blob_store		*bs;
	struct spdk_bs_super_block	*super;
};

static void
_spdk_bs_init_persist_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_bs_init_ctx *ctx = cb_arg;

	spdk_dma_free(ctx->super);
	free(ctx);

	spdk_bs_sequence_finish(seq, bserrno);
}

static void
_spdk_bs_init_trim_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_bs_init_ctx *ctx = cb_arg;

	/* Write super block */
	spdk_bs_sequence_write(seq, ctx->super, _spdk_bs_page_to_lba(ctx->bs, 0),
			       _spdk_bs_byte_to_lba(ctx->bs, sizeof(*ctx->super)),
			       _spdk_bs_init_persist_super_cpl, ctx);
}

void
spdk_bs_init(struct spdk_bs_dev *dev, struct spdk_bs_opts *o,
	     spdk_bs_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_bs_init_ctx *ctx;
	struct spdk_blob_store	*bs;
	struct spdk_bs_cpl	cpl;
	spdk_bs_sequence_t	*seq;
	uint64_t		num_md_pages;
	uint32_t		i;
	struct spdk_bs_opts	opts = {};
	int			rc;

	SPDK_TRACELOG(SPDK_TRACE_BLOB, "Initializing blobstore on dev %p\n", dev);

	if (o) {
		opts = *o;
	} else {
		spdk_bs_opts_init(&opts);
	}

	bs = _spdk_bs_alloc(dev, &opts);
	if (!bs) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	if (opts.num_md_pages == UINT32_MAX) {
		/* By default, allocate 1 page per cluster.
		 * Technically, this over-allocates metadata
		 * because more metadata will reduce the number
		 * of usable clusters. This can be addressed with
		 * more complex math in the future.
		 */
		bs->md_len = bs->total_clusters;
	} else {
		bs->md_len = opts.num_md_pages;
	}

	rc = spdk_bit_array_resize(&bs->used_md_pages, bs->md_len);
	if (rc < 0) {
		_spdk_bs_free(bs);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		_spdk_bs_free(bs);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	ctx->bs = bs;

	/* Allocate memory for the super block */
	ctx->super = spdk_dma_zmalloc(sizeof(*ctx->super), 0x1000, NULL);
	if (!ctx->super) {
		free(ctx);
		_spdk_bs_free(bs);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}
	memcpy(ctx->super->signature, SPDK_BS_SUPER_BLOCK_SIG,
	       sizeof(ctx->super->signature));
	ctx->super->version = SPDK_BS_VERSION;
	ctx->super->length = sizeof(*ctx->super);
	ctx->super->super_blob = bs->super_blob;
	ctx->super->clean = 0;
	ctx->super->cluster_size = bs->cluster_sz;

	/* Calculate how many pages the metadata consumes at the front
	 * of the disk.
	 */

	/* The super block uses 1 page */
	num_md_pages = 1;

	/* The used_md_pages mask requires 1 bit per metadata page, rounded
	 * up to the nearest page, plus a header.
	 */
	ctx->super->used_page_mask_start = num_md_pages;
	ctx->super->used_page_mask_len = divide_round_up(sizeof(struct spdk_bs_md_mask) +
					 divide_round_up(bs->md_len, 8),
					 sizeof(struct spdk_blob_md_page));
	num_md_pages += ctx->super->used_page_mask_len;

	/* The used_clusters mask requires 1 bit per cluster, rounded
	 * up to the nearest page, plus a header.
	 */
	ctx->super->used_cluster_mask_start = num_md_pages;
	ctx->super->used_cluster_mask_len = divide_round_up(sizeof(struct spdk_bs_md_mask) +
					    divide_round_up(bs->total_clusters, 8),
					    sizeof(struct spdk_blob_md_page));
	num_md_pages += ctx->super->used_cluster_mask_len;

	/* The metadata region size was chosen above */
	ctx->super->md_start = bs->md_start = num_md_pages;
	ctx->super->md_len = bs->md_len;
	num_md_pages += bs->md_len;

	/* Claim all of the clusters used by the metadata */
	for (i = 0; i < divide_round_up(num_md_pages, bs->pages_per_cluster); i++) {
		_spdk_bs_claim_cluster(bs, i);
	}

	cpl.type = SPDK_BS_CPL_TYPE_BS_HANDLE;
	cpl.u.bs_handle.cb_fn = cb_fn;
	cpl.u.bs_handle.cb_arg = cb_arg;
	cpl.u.bs_handle.bs = bs;

	seq = spdk_bs_sequence_start(bs->md_target.md_channel, &cpl);
	if (!seq) {
		spdk_dma_free(ctx->super);
		free(ctx);
		_spdk_bs_free(bs);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	/* TRIM the entire device */
	spdk_bs_sequence_unmap(seq, 0, bs->dev->blockcnt, _spdk_bs_init_trim_cpl, ctx);
}
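
/*
 * Worked example (illustrative) of the layout math above, for a 10 GiB
 * dev with 1 MiB clusters (total_clusters = 10240, md_len = 10240,
 * pages_per_cluster = 256), assuming a 4 KiB metadata page and a small
 * (few-byte) spdk_bs_md_mask header:
 *
 *	page 0:        super block
 *	page 1:        used_page_mask    (10240 bits -> 1280 bytes -> 1 page)
 *	page 2:        used_cluster_mask (10240 bits -> 1280 bytes -> 1 page)
 *	pages 3-10242: metadata region   (md_start = 3, md_len = 10240)
 *
 * num_md_pages = 10243, so divide_round_up(10243, 256) = 41 clusters are
 * claimed for metadata before any blobs are created.
 */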

/* END spdk_bs_init */

/* START spdk_bs_unload */

struct spdk_bs_unload_ctx {
	struct spdk_blob_store		*bs;
	struct spdk_bs_super_block	*super;

	struct spdk_bs_md_mask		*mask;
};

static void
_spdk_bs_unload_write_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_bs_unload_ctx	*ctx = cb_arg;

	spdk_dma_free(ctx->super);

	spdk_bs_sequence_finish(seq, bserrno);

	_spdk_bs_free(ctx->bs);
	free(ctx);
}

static void
_spdk_bs_unload_write_used_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_bs_unload_ctx	*ctx = cb_arg;

	spdk_dma_free(ctx->mask);

	/* Update the values in the super block */
	ctx->super->super_blob = ctx->bs->super_blob;
	ctx->super->clean = 1;

	spdk_bs_sequence_write(seq, ctx->super, _spdk_bs_page_to_lba(ctx->bs, 0),
			       _spdk_bs_byte_to_lba(ctx->bs, sizeof(*ctx->super)),
			       _spdk_bs_unload_write_super_cpl, ctx);
}

static void
_spdk_bs_unload_write_used_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_bs_unload_ctx	*ctx = cb_arg;
	uint32_t			i;
	uint64_t			lba, lba_count, mask_size;

	spdk_dma_free(ctx->mask);

	/* Write out the used clusters mask */
	mask_size = ctx->super->used_cluster_mask_len * sizeof(struct spdk_blob_md_page);
	ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL);
	if (!ctx->mask) {
		spdk_dma_free(ctx->super);
		free(ctx);
		spdk_bs_sequence_finish(seq, -ENOMEM);
		return;
	}

	ctx->mask->type = SPDK_MD_MASK_TYPE_USED_CLUSTERS;
	ctx->mask->length = ctx->bs->total_clusters;
	assert(ctx->mask->length == spdk_bit_array_capacity(ctx->bs->used_clusters));

	i = 0;
	while (true) {
		i = spdk_bit_array_find_first_set(ctx->bs->used_clusters, i);
		if (i > ctx->mask->length) {
			break;
		}
		ctx->mask->mask[i / 8] |= 1U << (i % 8);
		i++;
	}

	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_start);
	lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_len);
	spdk_bs_sequence_write(seq, ctx->mask, lba, lba_count,
			       _spdk_bs_unload_write_used_clusters_cpl, ctx);
}

static void
_spdk_bs_unload_read_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_bs_unload_ctx	*ctx = cb_arg;
	uint32_t			i;
	uint64_t			lba, lba_count, mask_size;

	/* Write out the used page mask */
	mask_size = ctx->super->used_page_mask_len * sizeof(struct spdk_blob_md_page);
	ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL);
	if (!ctx->mask) {
		spdk_dma_free(ctx->super);
		free(ctx);
		spdk_bs_sequence_finish(seq, -ENOMEM);
		return;
	}

	ctx->mask->type = SPDK_MD_MASK_TYPE_USED_PAGES;
	ctx->mask->length = ctx->super->md_len;
	assert(ctx->mask->length == spdk_bit_array_capacity(ctx->bs->used_md_pages));

	i = 0;
	while (true) {
		i = spdk_bit_array_find_first_set(ctx->bs->used_md_pages, i);
		if (i > ctx->mask->length) {
			break;
		}
		ctx->mask->mask[i / 8] |= 1U << (i % 8);
		i++;
	}

	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_start);
	lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_len);
	spdk_bs_sequence_write(seq, ctx->mask, lba, lba_count,
			       _spdk_bs_unload_write_used_pages_cpl, ctx);
}

void
spdk_bs_unload(struct spdk_blob_store *bs, spdk_bs_op_complete cb_fn, void *cb_arg)
{
	struct spdk_bs_cpl	cpl;
	spdk_bs_sequence_t	*seq;
	struct spdk_bs_unload_ctx *ctx;

	SPDK_TRACELOG(SPDK_TRACE_BLOB, "Syncing blobstore\n");

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	ctx->bs = bs;

	ctx->super = spdk_dma_zmalloc(sizeof(*ctx->super), 0x1000, NULL);
	if (!ctx->super) {
		free(ctx);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	cpl.type = SPDK_BS_CPL_TYPE_BS_BASIC;
	cpl.u.bs_basic.cb_fn = cb_fn;
	cpl.u.bs_basic.cb_arg = cb_arg;

	seq = spdk_bs_sequence_start(bs->md_target.md_channel, &cpl);
	if (!seq) {
		spdk_dma_free(ctx->super);
		free(ctx);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	assert(TAILQ_EMPTY(&bs->blobs));

	/* Read super block */
	spdk_bs_sequence_read(seq, ctx->super, _spdk_bs_page_to_lba(bs, 0),
			      _spdk_bs_byte_to_lba(bs, sizeof(*ctx->super)),
			      _spdk_bs_unload_read_super_cpl, ctx);
}

/* END spdk_bs_unload */

void
spdk_bs_set_super(struct spdk_blob_store *bs, spdk_blob_id blobid,
		  spdk_bs_op_complete cb_fn, void *cb_arg)
{
	bs->super_blob = blobid;
	cb_fn(cb_arg, 0);
}

void
spdk_bs_get_super(struct spdk_blob_store *bs,
		  spdk_blob_op_with_id_complete cb_fn, void *cb_arg)
{
	if (bs->super_blob == SPDK_BLOBID_INVALID) {
		cb_fn(cb_arg, SPDK_BLOBID_INVALID, -ENOENT);
	} else {
		cb_fn(cb_arg, bs->super_blob, 0);
	}
}

uint64_t
spdk_bs_get_cluster_size(struct spdk_blob_store *bs)
{
	return bs->cluster_sz;
}

uint64_t
spdk_bs_get_page_size(struct spdk_blob_store *bs)
{
	return sizeof(struct spdk_blob_md_page);
}

uint64_t
spdk_bs_free_cluster_count(struct spdk_blob_store *bs)
{
	return bs->num_free_clusters;
}

int spdk_bs_register_md_thread(struct spdk_blob_store *bs)
{
	bs->md_target.md_channel = spdk_get_io_channel(&bs->md_target);

	return 0;
}

int spdk_bs_unregister_md_thread(struct spdk_blob_store *bs)
{
	spdk_put_io_channel(bs->md_target.md_channel);

	return 0;
}

spdk_blob_id spdk_blob_get_id(struct spdk_blob *blob)
{
	assert(blob != NULL);

	return blob->id;
}

uint64_t spdk_blob_get_num_pages(struct spdk_blob *blob)
{
	assert(blob != NULL);

	return _spdk_bs_cluster_to_page(blob->bs, blob->active.num_clusters);
}

uint64_t spdk_blob_get_num_clusters(struct spdk_blob *blob)
{
	assert(blob != NULL);

	return blob->active.num_clusters;
}
1824 
1825 /* START spdk_bs_md_create_blob */
1826 
1827 static void
1828 _spdk_bs_md_create_blob_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1829 {
1830 	struct spdk_blob *blob = cb_arg;
1831 
1832 	_spdk_blob_free(blob);
1833 
1834 	spdk_bs_sequence_finish(seq, bserrno);
1835 }
1836 
1837 void spdk_bs_md_create_blob(struct spdk_blob_store *bs,
1838 			    spdk_blob_op_with_id_complete cb_fn, void *cb_arg)
1839 {
1840 	struct spdk_blob	*blob;
1841 	uint32_t		page_idx;
1842 	struct spdk_bs_cpl 	cpl;
1843 	spdk_bs_sequence_t	*seq;
1844 	spdk_blob_id		id;
1845 
1846 	page_idx = spdk_bit_array_find_first_clear(bs->used_md_pages, 0);
1847 	if (page_idx >= spdk_bit_array_capacity(bs->used_md_pages)) {
1848 		cb_fn(cb_arg, 0, -ENOMEM);
1849 		return;
1850 	}
1851 	spdk_bit_array_set(bs->used_md_pages, page_idx);
1852 
1853 	/* The blob id is a 64 bit number. The lower 32 bits are the page_idx. The upper
1854 	 * 32 bits are not currently used. Stick a 1 there just to catch bugs where the
1855 	 * code assumes blob id == page_idx.
1856 	 */
1857 	id = (1ULL << 32) | page_idx;
1858 
1859 	SPDK_TRACELOG(SPDK_TRACE_BLOB, "Creating blob with id %lu at page %u\n", id, page_idx);
1860 
1861 	blob = _spdk_blob_alloc(bs, id);
1862 	if (!blob) {
1863 		cb_fn(cb_arg, 0, -ENOMEM);
1864 		return;
1865 	}
1866 
1867 	cpl.type = SPDK_BS_CPL_TYPE_BLOBID;
1868 	cpl.u.blobid.cb_fn = cb_fn;
1869 	cpl.u.blobid.cb_arg = cb_arg;
1870 	cpl.u.blobid.blobid = blob->id;
1871 
1872 	seq = spdk_bs_sequence_start(bs->md_target.md_channel, &cpl);
1873 	if (!seq) {
1874 		_spdk_blob_free(blob);
1875 		cb_fn(cb_arg, 0, -ENOMEM);
1876 		return;
1877 	}
1878 
1879 	_spdk_blob_persist(seq, blob, _spdk_bs_md_create_blob_cpl, blob);
1880 }
1881 
1882 /* END spdk_bs_md_create_blob */
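
/*
 * Usage sketch (illustrative): creating a blob. The new blob is persisted
 * with metadata only (zero clusters); open it afterwards to resize it and
 * write data. Callback names are hypothetical.
 *
 *	static void
 *	create_done(void *cb_arg, spdk_blob_id blobid, int bserrno)
 *	{
 *		struct spdk_blob_store *bs = cb_arg;
 *
 *		if (bserrno != 0) {
 *			SPDK_ERRLOG("blob creation failed: %d\n", bserrno);
 *			return;
 *		}
 *		spdk_bs_md_open_blob(bs, blobid, open_done, NULL);
 *	}
 *
 *	spdk_bs_md_create_blob(bs, create_done, bs);
 */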
1883 
1884 /* START spdk_bs_md_resize_blob */
1885 int
1886 spdk_bs_md_resize_blob(struct spdk_blob *blob, uint64_t sz)
1887 {
1888 	int			rc;
1889 
1890 	assert(blob != NULL);
1891 
1892 	SPDK_TRACELOG(SPDK_TRACE_BLOB, "Resizing blob %lu to %lu clusters\n", blob->id, sz);
1893 
1894 	if (sz == blob->active.num_clusters) {
1895 		return 0;
1896 	}
1897 
1898 	rc = _spdk_resize_blob(blob, sz);
1899 	if (rc < 0) {
1900 		return rc;
1901 	}
1902 
1903 	return 0;
1904 }
1905 
1906 /* END spdk_bs_md_resize_blob */
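
/*
 * Usage sketch (illustrative): spdk_bs_md_resize_blob() changes only the
 * in-memory cluster map; the new size does not reach the disk until the
 * metadata is written out with spdk_bs_md_sync_blob(). sync_done is a
 * hypothetical callback.
 *
 *	if (spdk_bs_md_resize_blob(blob, 10) == 0) {	// 10 clusters
 *		spdk_bs_md_sync_blob(blob, sync_done, NULL);
 *	}
 */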
1907 
1908 
1909 /* START spdk_bs_md_delete_blob */
1910 
1911 static void
1912 _spdk_bs_md_delete_blob_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1913 {
1914 	struct spdk_blob *blob = cb_arg;
1915 
1916 	_spdk_blob_free(blob);
1917 
1918 	spdk_bs_sequence_finish(seq, bserrno);
1919 }
1920 
1921 static void
1922 _spdk_bs_md_delete_open_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1923 {
1924 	struct spdk_blob *blob = cb_arg;
1925 
1926 	blob->state = SPDK_BLOB_STATE_DIRTY;
1927 	blob->active.num_pages = 0;
1928 	_spdk_resize_blob(blob, 0);	/* Shrink to zero clusters before persisting the deletion */
1929 
1930 	_spdk_blob_persist(seq, blob, _spdk_bs_md_delete_blob_cpl, blob);
1931 }
1932 
1933 void
1934 spdk_bs_md_delete_blob(struct spdk_blob_store *bs, spdk_blob_id blobid,
1935 		       spdk_blob_op_complete cb_fn, void *cb_arg)
1936 {
1937 	struct spdk_blob	*blob;
1938 	struct spdk_bs_cpl	cpl;
1939 	spdk_bs_sequence_t 	*seq;
1940 
1941 	SPDK_TRACELOG(SPDK_TRACE_BLOB, "Deleting blob %lu\n", blobid);
1942 
1943 	blob = _spdk_blob_lookup(bs, blobid);
1944 	if (blob) {	/* The blob is in the open list, so it is still open - refuse to delete it */
1945 		assert(blob->open_ref > 0);
1946 		cb_fn(cb_arg, -EINVAL);
1947 		return;
1948 	}
1949 
1950 	blob = _spdk_blob_alloc(bs, blobid);
1951 	if (!blob) {
1952 		cb_fn(cb_arg, -ENOMEM);
1953 		return;
1954 	}
1955 
1956 	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
1957 	cpl.u.blob_basic.cb_fn = cb_fn;
1958 	cpl.u.blob_basic.cb_arg = cb_arg;
1959 
1960 	seq = spdk_bs_sequence_start(bs->md_target.md_channel, &cpl);
1961 	if (!seq) {
1962 		_spdk_blob_free(blob);
1963 		cb_fn(cb_arg, -ENOMEM);
1964 		return;
1965 	}
1966 
1967 	_spdk_blob_load(seq, blob, _spdk_bs_md_delete_open_cpl, blob);
1968 }
1969 
1970 /* END spdk_bs_md_delete_blob */
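
/*
 * Usage sketch (illustrative): deleting a blob by id. The blob must not
 * be open anywhere, or the -EINVAL path above is taken. delete_done is
 * hypothetical.
 *
 *	static void
 *	delete_done(void *cb_arg, int bserrno)
 *	{
 *		if (bserrno != 0) {
 *			SPDK_ERRLOG("blob deletion failed: %d\n", bserrno);
 *		}
 *	}
 *
 *	spdk_bs_md_delete_blob(bs, blobid, delete_done, NULL);
 */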
1971 
1972 /* START spdk_bs_md_open_blob */
1973 
1974 static void
1975 _spdk_bs_md_open_blob_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1976 {
1977 	struct spdk_blob *blob = cb_arg;
1978 
1979 	blob->open_ref++;
1980 
1981 	TAILQ_INSERT_HEAD(&blob->bs->blobs, blob, link);
1982 
1983 	spdk_bs_sequence_finish(seq, bserrno);
1984 }
1985 
1986 void spdk_bs_md_open_blob(struct spdk_blob_store *bs, spdk_blob_id blobid,
1987 			  spdk_blob_op_with_handle_complete cb_fn, void *cb_arg)
1988 {
1989 	struct spdk_blob		*blob;
1990 	struct spdk_bs_cpl		cpl;
1991 	spdk_bs_sequence_t		*seq;
1992 	uint32_t			page_num;
1993 
1994 	SPDK_TRACELOG(SPDK_TRACE_BLOB, "Opening blob %lu\n", blobid);
1995 
1996 	blob = _spdk_blob_lookup(bs, blobid);
1997 	if (blob) {
1998 		blob->open_ref++;
1999 		cb_fn(cb_arg, blob, 0);
2000 		return;
2001 	}
2002 
2003 	page_num = _spdk_bs_blobid_to_page(blobid);
2004 	if (spdk_bit_array_get(bs->used_md_pages, page_num) == false) {
2005 		/* Invalid blobid */
2006 		cb_fn(cb_arg, NULL, -ENOENT);
2007 		return;
2008 	}
2009 
2010 	blob = _spdk_blob_alloc(bs, blobid);
2011 	if (!blob) {
2012 		cb_fn(cb_arg, NULL, -ENOMEM);
2013 		return;
2014 	}
2015 
2016 	cpl.type = SPDK_BS_CPL_TYPE_BLOB_HANDLE;
2017 	cpl.u.blob_handle.cb_fn = cb_fn;
2018 	cpl.u.blob_handle.cb_arg = cb_arg;
2019 	cpl.u.blob_handle.blob = blob;
2020 
2021 	seq = spdk_bs_sequence_start(bs->md_target.md_channel, &cpl);
2022 	if (!seq) {
2023 		_spdk_blob_free(blob);
2024 		cb_fn(cb_arg, NULL, -ENOMEM);
2025 		return;
2026 	}
2027 
2028 	_spdk_blob_load(seq, blob, _spdk_bs_md_open_blob_cpl, blob);
2029 }
2030 
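/* END spdk_bs_md_open_blob */

/*
 * Usage sketch (illustrative): opening a blob by id. Opening a blob that
 * is already open just bumps its reference count and returns the same
 * handle. Always check bserrno before touching the handle. open_done is
 * hypothetical.
 *
 *	static void
 *	open_done(void *cb_arg, struct spdk_blob *blob, int bserrno)
 *	{
 *		if (bserrno != 0) {
 *			SPDK_ERRLOG("blob open failed: %d\n", bserrno);
 *			return;
 *		}
 *		// blob remains valid until the matching spdk_bs_md_close_blob().
 *	}
 *
 *	spdk_bs_md_open_blob(bs, blobid, open_done, NULL);
 */
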
2031 /* START spdk_bs_md_sync_blob */
2032 static void
2033 _spdk_blob_sync_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2034 {
2035 	spdk_bs_sequence_finish(seq, bserrno);
2036 }
2037 
2038 void spdk_bs_md_sync_blob(struct spdk_blob *blob,
2039 			  spdk_blob_op_complete cb_fn, void *cb_arg)
2040 {
2041 	struct spdk_bs_cpl	cpl;
2042 	spdk_bs_sequence_t	*seq;
2043 
2044 	assert(blob != NULL);
2045 
2046 	SPDK_TRACELOG(SPDK_TRACE_BLOB, "Syncing blob %lu\n", blob->id);
2047 
2048 	assert(blob->state != SPDK_BLOB_STATE_LOADING &&
2049 	       blob->state != SPDK_BLOB_STATE_SYNCING);
2050 
2051 	if (blob->state == SPDK_BLOB_STATE_CLEAN) {
2052 		cb_fn(cb_arg, 0);
2053 		return;
2054 	}
2055 
2056 	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
2057 	cpl.u.blob_basic.cb_fn = cb_fn;
2058 	cpl.u.blob_basic.cb_arg = cb_arg;
2059 
2060 	seq = spdk_bs_sequence_start(blob->bs->md_target.md_channel, &cpl);
2061 	if (!seq) {
2062 		cb_fn(cb_arg, -ENOMEM);
2063 		return;
2064 	}
2065 
2066 	_spdk_blob_persist(seq, blob, _spdk_blob_sync_cpl, blob);
2067 }
2068 
2069 /* END spdk_bs_md_sync_blob */
2070 
2071 /* START spdk_bs_md_close_blob */
2072 
2073 static void
2074 _spdk_blob_close_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2075 {
2076 	struct spdk_blob **blob = cb_arg;
2077 
2078 	if ((*blob)->open_ref == 0) {
2079 		TAILQ_REMOVE(&(*blob)->bs->blobs, (*blob), link);
2080 		_spdk_blob_free((*blob));
2081 	}
2082 
2083 	*blob = NULL;
2084 
2085 	spdk_bs_sequence_finish(seq, bserrno);
2086 }
2087 
2088 void spdk_bs_md_close_blob(struct spdk_blob **b,
2089 			   spdk_blob_op_complete cb_fn, void *cb_arg)
2090 {
2091 	struct spdk_bs_cpl	cpl;
2092 	struct spdk_blob	*blob;
2093 	spdk_bs_sequence_t	*seq;
2094 
2095 	assert(b != NULL);
2096 	blob = *b;
2097 	assert(blob != NULL);
2098 
2099 	SPDK_TRACELOG(SPDK_TRACE_BLOB, "Closing blob %lu\n", blob->id);
2100 
2101 	assert(blob->state != SPDK_BLOB_STATE_LOADING &&
2102 	       blob->state != SPDK_BLOB_STATE_SYNCING);
2103 
2104 	if (blob->open_ref == 0) {
2105 		cb_fn(cb_arg, -EBADF);
2106 		return;
2107 	}
2108 
2109 	blob->open_ref--;
2110 
2111 	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
2112 	cpl.u.blob_basic.cb_fn = cb_fn;
2113 	cpl.u.blob_basic.cb_arg = cb_arg;
2114 
2115 	seq = spdk_bs_sequence_start(blob->bs->md_target.md_channel, &cpl);
2116 	if (!seq) {
2117 		cb_fn(cb_arg, -ENOMEM);
2118 		return;
2119 	}
2120 
2121 	if (blob->state == SPDK_BLOB_STATE_CLEAN) {
2122 		_spdk_blob_close_cpl(seq, b, 0);
2123 		return;
2124 	}
2125 
2126 	/* Sync metadata */
2127 	_spdk_blob_persist(seq, blob, _spdk_blob_close_cpl, b);
2128 }
2129 
2130 /* END spdk_bs_md_close_blob */
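
/*
 * Usage sketch (illustrative): closing a blob. Note the double pointer -
 * the blob is freed once the last reference drops, and *b is set to NULL
 * before the completion runs, so the handle must not be reused.
 *
 *	spdk_bs_md_close_blob(&blob, close_done, NULL);	// close_done is hypothetical
 */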
2131 
2132 struct spdk_io_channel *spdk_bs_alloc_io_channel(struct spdk_blob_store *bs)
2133 {
2134 	return spdk_get_io_channel(&bs->io_target);
2135 }
2136 
2137 void spdk_bs_free_io_channel(struct spdk_io_channel *channel)
2138 {
2139 	spdk_put_io_channel(channel);
2140 }
2141 
2142 void spdk_bs_io_flush_channel(struct spdk_io_channel *channel,
2143 			      spdk_blob_op_complete cb_fn, void *cb_arg)
2144 {
2145 	/* Flush is synchronous right now */
2146 	cb_fn(cb_arg, 0);
2147 }
2148 
2149 void spdk_bs_io_write_blob(struct spdk_blob *blob, struct spdk_io_channel *channel,
2150 			   void *payload, uint64_t offset, uint64_t length,
2151 			   spdk_blob_op_complete cb_fn, void *cb_arg)
2152 {
2153 	_spdk_blob_request_submit_rw(blob, channel, payload, offset, length, cb_fn, cb_arg, false);
2154 }
2155 
2156 void spdk_bs_io_read_blob(struct spdk_blob *blob, struct spdk_io_channel *channel,
2157 			  void *payload, uint64_t offset, uint64_t length,
2158 			  spdk_blob_op_complete cb_fn, void *cb_arg)
2159 {
2160 	_spdk_blob_request_submit_rw(blob, channel, payload, offset, length, cb_fn, cb_arg, true);
2161 }
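
/*
 * Usage sketch (illustrative): blob data I/O. Each I/O thread allocates
 * its own channel, and offset/length are expressed in pages rather than
 * bytes. Buffer, blob, and callback names are hypothetical.
 *
 *	struct spdk_io_channel *ch = spdk_bs_alloc_io_channel(bs);
 *	void *buf = spdk_dma_zmalloc(spdk_bs_get_page_size(bs), 0x1000, NULL);
 *
 *	// Write one page at page offset 0; in real code the read would be
 *	// issued from write_done once the write completes.
 *	spdk_bs_io_write_blob(blob, ch, buf, 0, 1, write_done, NULL);
 *	spdk_bs_io_read_blob(blob, ch, buf, 0, 1, read_done, NULL);
 *
 *	spdk_dma_free(buf);
 *	spdk_bs_free_io_channel(ch);
 */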
2162 
2163 struct spdk_bs_iter_ctx {
2164 	int64_t page_num;
2165 	struct spdk_blob_store *bs;
2166 
2167 	spdk_blob_op_with_handle_complete cb_fn;
2168 	void *cb_arg;
2169 };
2170 
2171 static void
2172 _spdk_bs_iter_cpl(void *cb_arg, struct spdk_blob *blob, int bserrno)
2173 {
2174 	struct spdk_bs_iter_ctx *ctx = cb_arg;
2175 	struct spdk_blob_store *bs = ctx->bs;
2176 	spdk_blob_id id;
2177 
2178 	if (bserrno == 0) {
2179 		ctx->cb_fn(ctx->cb_arg, blob, bserrno);
2180 		free(ctx);
2181 		return;
2182 	}
2183 
2184 	ctx->page_num++;
2185 	ctx->page_num = spdk_bit_array_find_first_set(bs->used_md_pages, ctx->page_num);
2186 	if (ctx->page_num >= spdk_bit_array_capacity(bs->used_md_pages)) {
2187 		ctx->cb_fn(ctx->cb_arg, NULL, -ENOENT);
2188 		free(ctx);
2189 		return;
2190 	}
2191 
2192 	id = (1ULL << 32) | ctx->page_num;	/* same blob id encoding used at creation */
2193 
2194 	blob = _spdk_blob_lookup(bs, id);
2195 	if (blob) {
2196 		blob->open_ref++;
2197 		ctx->cb_fn(ctx->cb_arg, blob, 0);
2198 		free(ctx);
2199 		return;
2200 	}
2201 
2202 	spdk_bs_md_open_blob(bs, id, _spdk_bs_iter_cpl, ctx);
2203 }
2204 
2205 void
2206 spdk_bs_md_iter_first(struct spdk_blob_store *bs,
2207 		      spdk_blob_op_with_handle_complete cb_fn, void *cb_arg)
2208 {
2209 	struct spdk_bs_iter_ctx *ctx;
2210 
2211 	ctx = calloc(1, sizeof(*ctx));
2212 	if (!ctx) {
2213 		cb_fn(cb_arg, NULL, -ENOMEM);
2214 		return;
2215 	}
2216 
2217 	ctx->page_num = -1;
2218 	ctx->bs = bs;
2219 	ctx->cb_fn = cb_fn;
2220 	ctx->cb_arg = cb_arg;
2221 
2222 	_spdk_bs_iter_cpl(ctx, NULL, -1);
2223 }
2224 
2225 static void
2226 _spdk_bs_iter_close_cpl(void *cb_arg, int bserrno)
2227 {
2228 	struct spdk_bs_iter_ctx *ctx = cb_arg;
2229 
2230 	_spdk_bs_iter_cpl(ctx, NULL, -1);
2231 }
2232 
2233 void
2234 spdk_bs_md_iter_next(struct spdk_blob_store *bs, struct spdk_blob **b,
2235 		     spdk_blob_op_with_handle_complete cb_fn, void *cb_arg)
2236 {
2237 	struct spdk_bs_iter_ctx *ctx;
2238 	struct spdk_blob	*blob;
2239 
2240 	assert(b != NULL);
2241 	blob = *b;
2242 	assert(blob != NULL);
2243 
2244 	ctx = calloc(1, sizeof(*ctx));
2245 	if (!ctx) {
2246 		cb_fn(cb_arg, NULL, -ENOMEM);
2247 		return;
2248 	}
2249 
2250 	ctx->page_num = _spdk_bs_blobid_to_page(blob->id);
2251 	ctx->bs = bs;
2252 	ctx->cb_fn = cb_fn;
2253 	ctx->cb_arg = cb_arg;
2254 
2255 	/* Close the existing blob */
2256 	spdk_bs_md_close_blob(b, _spdk_bs_iter_close_cpl, ctx);
2257 }
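
/*
 * Usage sketch (illustrative): visiting every blob in the store. The
 * iterator hands each open blob to the callback in turn and finishes
 * with -ENOENT; spdk_bs_md_iter_next() closes the current blob before
 * opening the next one. iter_cb is hypothetical.
 *
 *	static void
 *	iter_cb(void *cb_arg, struct spdk_blob *blob, int bserrno)
 *	{
 *		struct spdk_blob_store *bs = cb_arg;
 *
 *		if (bserrno != 0) {
 *			return;	// -ENOENT means iteration is complete
 *		}
 *		SPDK_TRACELOG(SPDK_TRACE_BLOB, "visiting blob %lu\n",
 *			      spdk_blob_get_id(blob));
 *		spdk_bs_md_iter_next(bs, &blob, iter_cb, bs);
 *	}
 *
 *	spdk_bs_md_iter_first(bs, iter_cb, bs);
 */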
2258 
2259 int
2260 spdk_blob_md_set_xattr(struct spdk_blob *blob, const char *name, const void *value,
2261 		       uint16_t value_len)
2262 {
2263 	struct spdk_xattr 	*xattr;
2264 
2265 	assert(blob != NULL);
2266 
2267 	assert(blob->state != SPDK_BLOB_STATE_LOADING &&
2268 	       blob->state != SPDK_BLOB_STATE_SYNCING);
2269 
2270 	TAILQ_FOREACH(xattr, &blob->xattrs, link) {
2271 		if (!strcmp(name, xattr->name)) {
2272 			free(xattr->value);
2273 			xattr->value_len = value_len;
2274 			xattr->value = malloc(value_len);
2275 			memcpy(xattr->value, value, value_len);
2276 
2277 			blob->state = SPDK_BLOB_STATE_DIRTY;
2278 
2279 			return 0;
2280 		}
2281 	}
2282 
2283 	xattr = calloc(1, sizeof(*xattr));
2284 	if (!xattr) {
2285 		return -ENOMEM;
2286 	}
2287 	xattr->name = strdup(name);
2288 	xattr->value_len = value_len;
2289 	xattr->value = malloc(value_len);
2290 	memcpy(xattr->value, value, value_len);
2291 	TAILQ_INSERT_TAIL(&blob->xattrs, xattr, link);
2292 
2293 	blob->state = SPDK_BLOB_STATE_DIRTY;
2294 
2295 	return 0;
2296 }
2297 
2298 int
2299 spdk_blob_md_remove_xattr(struct spdk_blob *blob, const char *name)
2300 {
2301 	struct spdk_xattr	*xattr;
2302 
2303 	assert(blob != NULL);
2304 
2305 	assert(blob->state != SPDK_BLOB_STATE_LOADING &&
2306 	       blob->state != SPDK_BLOB_STATE_SYNCING);
2307 
2308 	TAILQ_FOREACH(xattr, &blob->xattrs, link) {
2309 		if (!strcmp(name, xattr->name)) {
2310 			TAILQ_REMOVE(&blob->xattrs, xattr, link);
2311 			free(xattr->value);
2312 			free(xattr->name);
2313 			free(xattr);
2314 
2315 			blob->state = SPDK_BLOB_STATE_DIRTY;
2316 
2317 			return 0;
2318 		}
2319 	}
2320 
2321 	return -ENOENT;
2322 }
2323 
2324 int
2325 spdk_bs_md_get_xattr_value(struct spdk_blob *blob, const char *name,
2326 			   const void **value, size_t *value_len)
2327 {
2328 	struct spdk_xattr	*xattr;
2329 
2330 	TAILQ_FOREACH(xattr, &blob->xattrs, link) {
2331 		if (!strcmp(name, xattr->name)) {
2332 			*value = xattr->value;
2333 			*value_len = xattr->value_len;
2334 			return 0;
2335 		}
2336 	}
2337 
2338 	return -ENOENT;
2339 }
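
/*
 * Usage sketch (illustrative): attaching a small key/value pair to a blob.
 * Setting or removing an xattr only marks the blob dirty; persist it with
 * spdk_bs_md_sync_blob(). The pointer returned by
 * spdk_bs_md_get_xattr_value() aliases the blob's in-memory copy, so
 * duplicate it if it must outlive the blob. sync_done is hypothetical.
 *
 *	const void *val;
 *	size_t val_len;
 *
 *	spdk_blob_md_set_xattr(blob, "name", "hello", 6);	// include the NUL
 *	spdk_bs_md_sync_blob(blob, sync_done, NULL);
 *
 *	if (spdk_bs_md_get_xattr_value(blob, "name", &val, &val_len) == 0) {
 *		// val points at "hello", val_len == 6
 *	}
 */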
2340 
2341 struct spdk_xattr_names {
2342 	uint32_t	count;
2343 	const char	*names[];
2344 };
2345 
2346 int
2347 spdk_bs_md_get_xattr_names(struct spdk_blob *blob,
2348 			   struct spdk_xattr_names **names)
2349 {
2350 	struct spdk_xattr	*xattr;
2351 	int			count = 0;
2352 
2353 	TAILQ_FOREACH(xattr, &blob->xattrs, link) {
2354 		count++;
2355 	}
2356 
2357 	*names = calloc(1, sizeof(struct spdk_xattr_names) + count * sizeof(char *));
2358 	if (*names == NULL) {
2359 		return -ENOMEM;
2360 	}
2361 
2362 	TAILQ_FOREACH(xattr, &blob->xattrs, link) {
2363 		(*names)->names[(*names)->count++] = xattr->name;
2364 	}
2365 
2366 	return 0;
2367 }
2368 
2369 uint32_t
2370 spdk_xattr_names_get_count(struct spdk_xattr_names *names)
2371 {
2372 	assert(names != NULL);
2373 
2374 	return names->count;
2375 }
2376 
2377 const char *
2378 spdk_xattr_names_get_name(struct spdk_xattr_names *names, uint32_t index)
2379 {
2380 	if (index >= names->count) {
2381 		return NULL;
2382 	}
2383 
2384 	return names->names[index];
2385 }
2386 
2387 void
2388 spdk_xattr_names_free(struct spdk_xattr_names *names)
2389 {
2390 	free(names);
2391 }
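
/*
 * Usage sketch (illustrative): enumerating xattr names. The strings in
 * the returned container alias the blob's internal names, so only the
 * container itself is freed.
 *
 *	struct spdk_xattr_names *names;
 *	uint32_t i;
 *
 *	if (spdk_bs_md_get_xattr_names(blob, &names) == 0) {
 *		for (i = 0; i < spdk_xattr_names_get_count(names); i++) {
 *			printf("xattr: %s\n", spdk_xattr_names_get_name(names, i));
 *		}
 *		spdk_xattr_names_free(names);
 *	}
 */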
2392 
2393 SPDK_LOG_REGISTER_TRACE_FLAG("blob", SPDK_TRACE_BLOB);
2394