xref: /spdk/lib/blob/blobstore.c (revision 98d28d604d074d9003a5480597c9000779ede7d9)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 
36 #include "spdk/blob.h"
37 #include "spdk/crc32.h"
38 #include "spdk/env.h"
39 #include "spdk/queue.h"
40 #include "spdk/io_channel.h"
41 #include "spdk/bit_array.h"
42 #include "spdk/likely.h"
43 
44 #include "spdk_internal/log.h"
45 
46 #include "blobstore.h"
47 
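/* CRC32-C of a metadata page is both seeded and finalized with this value. */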
48 #define BLOB_CRC32C_INITIAL    0xffffffffUL
49 
50 static int spdk_bs_register_md_thread(struct spdk_blob_store *bs);
51 static int spdk_bs_unregister_md_thread(struct spdk_blob_store *bs);
52 static void _spdk_blob_close_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno);
53 void _spdk_blob_insert_cluster_on_md_thread(struct spdk_blob_data *blob, uint32_t cluster_num,
54 		uint64_t cluster, spdk_blob_op_complete cb_fn, void *cb_arg);
55 
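/* Integer division that rounds up; e.g. divide_round_up(10, 4) == 3. */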
56 static inline size_t
57 divide_round_up(size_t num, size_t divisor)
58 {
59 	return (num + divisor - 1) / divisor;
60 }
61 
62 static void
63 _spdk_bs_claim_cluster(struct spdk_blob_store *bs, uint32_t cluster_num)
64 {
65 	assert(cluster_num < spdk_bit_array_capacity(bs->used_clusters));
66 	assert(spdk_bit_array_get(bs->used_clusters, cluster_num) == false);
67 	assert(bs->num_free_clusters > 0);
68 
69 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Claiming cluster %u\n", cluster_num);
70 
71 	spdk_bit_array_set(bs->used_clusters, cluster_num);
72 	bs->num_free_clusters--;
73 }
74 
75 static int
76 _spdk_blob_insert_cluster(struct spdk_blob_data *blob, uint32_t cluster_num, uint64_t cluster)
77 {
78 	uint64_t *cluster_lba = &blob->active.clusters[cluster_num];
79 
80 	assert(spdk_get_thread() == blob->bs->md_thread);
81 
82 	if (*cluster_lba != 0) {
83 		return -EEXIST;
84 	}
85 
86 	*cluster_lba = _spdk_bs_cluster_to_lba(blob->bs, cluster);
87 	return 0;
88 }
89 
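/*
 * Find the first free cluster at or after *lowest_free_cluster, claim it, and
 *  record it at cluster_num in the blob's cluster array.  On success,
 *  *lowest_free_cluster is updated to the claimed cluster so that callers
 *  allocating in a loop can use it as a search hint.
 */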
90 static int
91 _spdk_bs_allocate_cluster(struct spdk_blob_data *blob, uint32_t cluster_num,
92 			  uint64_t *lowest_free_cluster)
93 {
94 	*lowest_free_cluster = spdk_bit_array_find_first_clear(blob->bs->used_clusters,
95 			       *lowest_free_cluster);
96 	if (*lowest_free_cluster >= blob->bs->total_clusters) {
97 		/* No more free clusters. Cannot satisfy the request */
98 		return -ENOSPC;
99 	}
100 
101 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Claiming cluster %lu for blob %lu\n", *lowest_free_cluster, blob->id);
102 	_spdk_bs_claim_cluster(blob->bs, *lowest_free_cluster);
103 	_spdk_blob_insert_cluster(blob, cluster_num, *lowest_free_cluster);
104 
105 	return 0;
106 }
107 
108 static void
109 _spdk_bs_release_cluster(struct spdk_blob_store *bs, uint32_t cluster_num)
110 {
111 	assert(cluster_num < spdk_bit_array_capacity(bs->used_clusters));
112 	assert(spdk_bit_array_get(bs->used_clusters, cluster_num) == true);
113 	assert(bs->num_free_clusters < bs->total_clusters);
114 
115 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Releasing cluster %u\n", cluster_num);
116 
117 	spdk_bit_array_clear(bs->used_clusters, cluster_num);
118 	bs->num_free_clusters++;
119 }
120 
121 void
122 spdk_blob_opts_init(struct spdk_blob_opts *opts)
123 {
124 	opts->num_clusters = 0;
125 	opts->thin_provision = false;
126 	opts->xattr_count = 0;
127 	opts->xattr_names = NULL;
128 	opts->xattr_ctx = NULL;
129 	opts->get_xattr_value = NULL;
130 }
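
/*
 * Example usage (illustrative sketch; blob_create_cb and cb_arg are
 *  hypothetical caller-provided names):
 *
 *	struct spdk_blob_opts opts;
 *
 *	spdk_blob_opts_init(&opts);
 *	opts.num_clusters = 16;
 *	opts.thin_provision = true;
 *	spdk_bs_create_blob_ext(bs, &opts, blob_create_cb, cb_arg);
 */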
131 
132 static struct spdk_blob_data *
133 _spdk_blob_alloc(struct spdk_blob_store *bs, spdk_blob_id id)
134 {
135 	struct spdk_blob_data *blob;
136 
137 	blob = calloc(1, sizeof(*blob));
138 	if (!blob) {
139 		return NULL;
140 	}
141 
142 	blob->id = id;
143 	blob->bs = bs;
144 
145 	blob->state = SPDK_BLOB_STATE_DIRTY;
146 	blob->active.num_pages = 1;
147 	blob->active.pages = calloc(1, sizeof(*blob->active.pages));
148 	if (!blob->active.pages) {
149 		free(blob);
150 		return NULL;
151 	}
152 
153 	blob->active.pages[0] = _spdk_bs_blobid_to_page(id);
154 
155 	TAILQ_INIT(&blob->xattrs);
156 
157 	return blob;
158 }
159 
160 static void
161 _spdk_blob_free(struct spdk_blob_data *blob)
162 {
163 	struct spdk_xattr 	*xattr, *xattr_tmp;
164 
165 	assert(blob != NULL);
166 
167 	free(blob->active.clusters);
168 	free(blob->clean.clusters);
169 	free(blob->active.pages);
170 	free(blob->clean.pages);
171 
172 	TAILQ_FOREACH_SAFE(xattr, &blob->xattrs, link, xattr_tmp) {
173 		TAILQ_REMOVE(&blob->xattrs, xattr, link);
174 		free(xattr->name);
175 		free(xattr->value);
176 		free(xattr);
177 	}
178 
179 	free(blob);
180 }
181 
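/*
 * Swap the blob's metadata into the 'clean' (on-disk) state.  The clean state
 *  takes ownership of the current active arrays, and the active state receives
 *  freshly allocated copies, so both views are identical afterward.
 */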
182 static int
183 _spdk_blob_mark_clean(struct spdk_blob_data *blob)
184 {
185 	uint64_t *clusters = NULL;
186 	uint32_t *pages = NULL;
187 
188 	assert(blob != NULL);
189 	assert(blob->state == SPDK_BLOB_STATE_LOADING ||
190 	       blob->state == SPDK_BLOB_STATE_SYNCING);
191 
192 	if (blob->active.num_clusters) {
193 		assert(blob->active.clusters);
194 		clusters = calloc(blob->active.num_clusters, sizeof(*blob->active.clusters));
195 		if (!clusters) {
196 			return -1;
197 		}
198 		memcpy(clusters, blob->active.clusters, blob->active.num_clusters * sizeof(*clusters));
199 	}
200 
201 	if (blob->active.num_pages) {
202 		assert(blob->active.pages);
203 		pages = calloc(blob->active.num_pages, sizeof(*blob->active.pages));
204 		if (!pages) {
205 			free(clusters);
206 			return -1;
207 		}
208 		memcpy(pages, blob->active.pages, blob->active.num_pages * sizeof(*pages));
209 	}
210 
211 	free(blob->clean.clusters);
212 	free(blob->clean.pages);
213 
214 	blob->clean.num_clusters = blob->active.num_clusters;
215 	blob->clean.clusters = blob->active.clusters;
216 	blob->clean.num_pages = blob->active.num_pages;
217 	blob->clean.pages = blob->active.pages;
218 
219 	blob->active.clusters = clusters;
220 	blob->active.pages = pages;
221 
222 	blob->state = SPDK_BLOB_STATE_CLEAN;
223 
224 	return 0;
225 }
226 
227 static int
228 _spdk_blob_parse_page(const struct spdk_blob_md_page *page, struct spdk_blob_data *blob)
229 {
230 	struct spdk_blob_md_descriptor *desc;
231 	size_t	cur_desc = 0;
232 	void *tmp;
233 
234 	desc = (struct spdk_blob_md_descriptor *)page->descriptors;
235 	while (cur_desc < sizeof(page->descriptors)) {
236 		if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_PADDING) {
237 			if (desc->length == 0) {
238 				/* If padding and length are 0, this terminates the page */
239 				break;
240 			}
241 		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_FLAGS) {
242 			struct spdk_blob_md_descriptor_flags	*desc_flags;
243 
244 			desc_flags = (struct spdk_blob_md_descriptor_flags *)desc;
245 
246 			if (desc_flags->length != sizeof(*desc_flags) - sizeof(*desc)) {
247 				return -EINVAL;
248 			}
249 
250 			if ((desc_flags->invalid_flags | SPDK_BLOB_INVALID_FLAGS_MASK) !=
251 			    SPDK_BLOB_INVALID_FLAGS_MASK) {
252 				return -EINVAL;
253 			}
254 
255 			if ((desc_flags->data_ro_flags | SPDK_BLOB_DATA_RO_FLAGS_MASK) !=
256 			    SPDK_BLOB_DATA_RO_FLAGS_MASK) {
257 				blob->data_ro = true;
258 				blob->md_ro = true;
259 			}
260 
261 			if ((desc_flags->md_ro_flags | SPDK_BLOB_MD_RO_FLAGS_MASK) !=
262 			    SPDK_BLOB_MD_RO_FLAGS_MASK) {
263 				blob->md_ro = true;
264 			}
265 
266 			if ((desc_flags->data_ro_flags & SPDK_BLOB_READ_ONLY)) {
267 				blob->data_ro = true;
268 				blob->md_ro = true;
269 			}
270 
271 			blob->invalid_flags = desc_flags->invalid_flags;
272 			blob->data_ro_flags = desc_flags->data_ro_flags;
273 			blob->md_ro_flags = desc_flags->md_ro_flags;
274 
275 		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_EXTENT) {
276 			struct spdk_blob_md_descriptor_extent	*desc_extent;
277 			unsigned int				i, j;
278 			unsigned int				cluster_count = blob->active.num_clusters;
279 
280 			desc_extent = (struct spdk_blob_md_descriptor_extent *)desc;
281 
282 			if (desc_extent->length == 0 ||
283 			    (desc_extent->length % sizeof(desc_extent->extents[0]) != 0)) {
284 				return -EINVAL;
285 			}
286 
			for (i = 0; i < desc_extent->length / sizeof(desc_extent->extents[0]); i++) {
				for (j = 0; j < desc_extent->extents[i].length; j++) {
					/* A cluster_idx of 0 marks an unallocated cluster in a
					 *  thin provisioned blob, so only validate allocated
					 *  clusters against the used_clusters bitmap.
					 */
					if (desc_extent->extents[i].cluster_idx != 0) {
						if (!spdk_bit_array_get(blob->bs->used_clusters,
									desc_extent->extents[i].cluster_idx + j)) {
							return -EINVAL;
						}
					}
					cluster_count++;
				}
			}
296 
297 			if (cluster_count == 0) {
298 				return -EINVAL;
299 			}
300 			tmp = realloc(blob->active.clusters, cluster_count * sizeof(uint64_t));
301 			if (tmp == NULL) {
302 				return -ENOMEM;
303 			}
304 			blob->active.clusters = tmp;
305 			blob->active.cluster_array_size = cluster_count;
306 
307 			for (i = 0; i < desc_extent->length / sizeof(desc_extent->extents[0]); i++) {
308 				for (j = 0; j < desc_extent->extents[i].length; j++) {
309 					if (desc_extent->extents[i].cluster_idx != 0) {
310 						blob->active.clusters[blob->active.num_clusters++] = _spdk_bs_cluster_to_lba(blob->bs,
311 								desc_extent->extents[i].cluster_idx + j);
312 					} else if (spdk_blob_is_thin_provisioned(blob)) {
313 						blob->active.clusters[blob->active.num_clusters++] = 0;
314 					} else {
315 						return -EINVAL;
316 					}
317 				}
318 			}
319 
320 		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR) {
321 			struct spdk_blob_md_descriptor_xattr	*desc_xattr;
322 			struct spdk_xattr 			*xattr;
323 
324 			desc_xattr = (struct spdk_blob_md_descriptor_xattr *)desc;
325 
326 			if (desc_xattr->length != sizeof(desc_xattr->name_length) +
327 			    sizeof(desc_xattr->value_length) +
328 			    desc_xattr->name_length + desc_xattr->value_length) {
329 				return -EINVAL;
330 			}
331 
332 			xattr = calloc(1, sizeof(*xattr));
333 			if (xattr == NULL) {
334 				return -ENOMEM;
335 			}
336 
337 			xattr->name = malloc(desc_xattr->name_length + 1);
338 			if (xattr->name == NULL) {
339 				free(xattr);
340 				return -ENOMEM;
341 			}
342 			strncpy(xattr->name, desc_xattr->name, desc_xattr->name_length);
343 			xattr->name[desc_xattr->name_length] = '\0';
344 
345 			xattr->value = malloc(desc_xattr->value_length);
346 			if (xattr->value == NULL) {
347 				free(xattr->name);
348 				free(xattr);
349 				return -ENOMEM;
350 			}
351 			xattr->value_len = desc_xattr->value_length;
352 			memcpy(xattr->value,
353 			       (void *)((uintptr_t)desc_xattr->name + desc_xattr->name_length),
354 			       desc_xattr->value_length);
355 
356 			TAILQ_INSERT_TAIL(&blob->xattrs, xattr, link);
357 		} else {
358 			/* Unrecognized descriptor type.  Do not fail - just continue to the
359 			 *  next descriptor.  If this descriptor is associated with some feature
360 			 *  defined in a newer version of blobstore, that version of blobstore
361 			 *  should create and set an associated feature flag to specify if this
362 			 *  blob can be loaded or not.
363 			 */
364 		}
365 
366 		/* Advance to the next descriptor */
367 		cur_desc += sizeof(*desc) + desc->length;
368 		if (cur_desc + sizeof(*desc) > sizeof(page->descriptors)) {
369 			break;
370 		}
371 		desc = (struct spdk_blob_md_descriptor *)((uintptr_t)page->descriptors + cur_desc);
372 	}
373 
374 	return 0;
375 }
376 
377 static int
378 _spdk_blob_parse(const struct spdk_blob_md_page *pages, uint32_t page_count,
379 		 struct spdk_blob_data *blob)
380 {
381 	const struct spdk_blob_md_page *page;
382 	uint32_t i;
383 	int rc;
384 
385 	assert(page_count > 0);
386 	assert(pages[0].sequence_num == 0);
387 	assert(blob != NULL);
388 	assert(blob->state == SPDK_BLOB_STATE_LOADING);
389 	assert(blob->active.clusters == NULL);
391 
	/* The blobid provided doesn't match what's in the MD; this can happen,
	 * for example, if a bogus blobid is passed in through open.
	 */
395 	if (blob->id != pages[0].id) {
396 		SPDK_ERRLOG("Blobid (%lu) doesn't match what's in metadata (%lu)\n",
397 			    blob->id, pages[0].id);
398 		return -ENOENT;
399 	}
400 
401 	for (i = 0; i < page_count; i++) {
402 		page = &pages[i];
403 
404 		assert(page->id == blob->id);
405 		assert(page->sequence_num == i);
406 
407 		rc = _spdk_blob_parse_page(page, blob);
408 		if (rc != 0) {
409 			return rc;
410 		}
411 	}
412 
413 	return 0;
414 }
415 
416 static int
417 _spdk_blob_serialize_add_page(const struct spdk_blob_data *blob,
418 			      struct spdk_blob_md_page **pages,
419 			      uint32_t *page_count,
420 			      struct spdk_blob_md_page **last_page)
421 {
422 	struct spdk_blob_md_page *page;
423 
424 	assert(pages != NULL);
425 	assert(page_count != NULL);
426 
427 	if (*page_count == 0) {
428 		assert(*pages == NULL);
429 		*page_count = 1;
430 		*pages = spdk_dma_malloc(SPDK_BS_PAGE_SIZE,
431 					 SPDK_BS_PAGE_SIZE,
432 					 NULL);
433 	} else {
434 		assert(*pages != NULL);
435 		(*page_count)++;
436 		*pages = spdk_dma_realloc(*pages,
437 					  SPDK_BS_PAGE_SIZE * (*page_count),
438 					  SPDK_BS_PAGE_SIZE,
439 					  NULL);
440 	}
441 
442 	if (*pages == NULL) {
443 		*page_count = 0;
444 		*last_page = NULL;
445 		return -ENOMEM;
446 	}
447 
448 	page = &(*pages)[*page_count - 1];
449 	memset(page, 0, sizeof(*page));
450 	page->id = blob->id;
451 	page->sequence_num = *page_count - 1;
452 	page->next = SPDK_INVALID_MD_PAGE;
453 	*last_page = page;
454 
455 	return 0;
456 }
457 
/* Transform the in-memory representation 'xattr' into an on-disk xattr
 * descriptor.  Update required_sz on both success and failure.
 */
462 static int
463 _spdk_blob_serialize_xattr(const struct spdk_xattr *xattr,
464 			   uint8_t *buf, size_t buf_sz,
465 			   size_t *required_sz)
466 {
467 	struct spdk_blob_md_descriptor_xattr	*desc;
468 
469 	*required_sz = sizeof(struct spdk_blob_md_descriptor_xattr) +
470 		       strlen(xattr->name) +
471 		       xattr->value_len;
472 
473 	if (buf_sz < *required_sz) {
474 		return -1;
475 	}
476 
477 	desc = (struct spdk_blob_md_descriptor_xattr *)buf;
478 
479 	desc->type = SPDK_MD_DESCRIPTOR_TYPE_XATTR;
480 	desc->length = sizeof(desc->name_length) +
481 		       sizeof(desc->value_length) +
482 		       strlen(xattr->name) +
483 		       xattr->value_len;
484 	desc->name_length = strlen(xattr->name);
485 	desc->value_length = xattr->value_len;
486 
487 	memcpy(desc->name, xattr->name, desc->name_length);
488 	memcpy((void *)((uintptr_t)desc->name + desc->name_length),
489 	       xattr->value,
490 	       desc->value_length);
491 
492 	return 0;
493 }
494 
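/*
 * Serialize the blob's cluster list, starting at start_cluster, as run-length
 *  encoded extents.  For example (illustrative, assuming 2048 LBAs per
 *  cluster), clusters at LBAs 2048, 4096 and 6144 collapse into the single
 *  extent { cluster_idx = 1, length = 3 }.  On return, *next_cluster is the
 *  first cluster that did not fit in buf, or the total cluster count if
 *  everything fit.
 */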
495 static void
496 _spdk_blob_serialize_extent(const struct spdk_blob_data *blob,
497 			    uint64_t start_cluster, uint64_t *next_cluster,
498 			    uint8_t *buf, size_t buf_sz)
499 {
500 	struct spdk_blob_md_descriptor_extent *desc;
501 	size_t cur_sz;
502 	uint64_t i, extent_idx;
	/* Clusters are stored as 64-bit starting LBAs; use 64-bit math to
	 *  avoid truncation on large devices.
	 */
	uint64_t lba, lba_per_cluster, lba_count;
504 
505 	/* The buffer must have room for at least one extent */
506 	cur_sz = sizeof(struct spdk_blob_md_descriptor) + sizeof(desc->extents[0]);
507 	if (buf_sz < cur_sz) {
508 		*next_cluster = start_cluster;
509 		return;
510 	}
511 
512 	desc = (struct spdk_blob_md_descriptor_extent *)buf;
513 	desc->type = SPDK_MD_DESCRIPTOR_TYPE_EXTENT;
514 
515 	lba_per_cluster = _spdk_bs_cluster_to_lba(blob->bs, 1);
516 
517 	lba = blob->active.clusters[start_cluster];
518 	lba_count = lba_per_cluster;
519 	extent_idx = 0;
520 	for (i = start_cluster + 1; i < blob->active.num_clusters; i++) {
521 		if ((lba + lba_count) == blob->active.clusters[i]) {
522 			lba_count += lba_per_cluster;
523 			continue;
524 		}
525 		desc->extents[extent_idx].cluster_idx = lba / lba_per_cluster;
526 		desc->extents[extent_idx].length = lba_count / lba_per_cluster;
527 		extent_idx++;
528 
529 		cur_sz += sizeof(desc->extents[extent_idx]);
530 
531 		if (buf_sz < cur_sz) {
532 			/* If we ran out of buffer space, return */
533 			desc->length = sizeof(desc->extents[0]) * extent_idx;
534 			*next_cluster = i;
535 			return;
536 		}
537 
538 		lba = blob->active.clusters[i];
539 		lba_count = lba_per_cluster;
540 	}
541 
542 	desc->extents[extent_idx].cluster_idx = lba / lba_per_cluster;
543 	desc->extents[extent_idx].length = lba_count / lba_per_cluster;
544 	extent_idx++;
545 
546 	desc->length = sizeof(desc->extents[0]) * extent_idx;
547 	*next_cluster = blob->active.num_clusters;
548 
549 	return;
550 }
551 
552 static void
553 _spdk_blob_serialize_flags(const struct spdk_blob_data *blob,
554 			   uint8_t *buf, size_t *buf_sz)
555 {
556 	struct spdk_blob_md_descriptor_flags *desc;
557 
558 	/*
559 	 * Flags get serialized first, so we should always have room for the flags
560 	 *  descriptor.
561 	 */
562 	assert(*buf_sz >= sizeof(*desc));
563 
564 	desc = (struct spdk_blob_md_descriptor_flags *)buf;
565 	desc->type = SPDK_MD_DESCRIPTOR_TYPE_FLAGS;
566 	desc->length = sizeof(*desc) - sizeof(struct spdk_blob_md_descriptor);
567 	desc->invalid_flags = blob->invalid_flags;
568 	desc->data_ro_flags = blob->data_ro_flags;
569 	desc->md_ro_flags = blob->md_ro_flags;
570 
571 	*buf_sz -= sizeof(*desc);
572 }
573 
574 static int
575 _spdk_blob_serialize(const struct spdk_blob_data *blob, struct spdk_blob_md_page **pages,
576 		     uint32_t *page_count)
577 {
578 	struct spdk_blob_md_page		*cur_page;
579 	const struct spdk_xattr			*xattr;
580 	int 					rc;
581 	uint8_t					*buf;
582 	size_t					remaining_sz;
583 	uint64_t				last_cluster;
584 
585 	assert(pages != NULL);
586 	assert(page_count != NULL);
587 	assert(blob != NULL);
588 	assert(blob->state == SPDK_BLOB_STATE_SYNCING);
589 
590 	*pages = NULL;
591 	*page_count = 0;
592 
593 	/* A blob always has at least 1 page, even if it has no descriptors */
594 	rc = _spdk_blob_serialize_add_page(blob, pages, page_count, &cur_page);
595 	if (rc < 0) {
596 		return rc;
597 	}
598 
599 	buf = (uint8_t *)cur_page->descriptors;
600 	remaining_sz = sizeof(cur_page->descriptors);
601 
602 	/* Serialize flags */
603 	_spdk_blob_serialize_flags(blob, buf, &remaining_sz);
604 	buf += sizeof(struct spdk_blob_md_descriptor_flags);
605 
606 	/* Serialize xattrs */
607 	TAILQ_FOREACH(xattr, &blob->xattrs, link) {
608 		size_t required_sz = 0;
609 		rc = _spdk_blob_serialize_xattr(xattr,
610 						buf, remaining_sz,
611 						&required_sz);
612 		if (rc < 0) {
613 			/* Need to add a new page to the chain */
614 			rc = _spdk_blob_serialize_add_page(blob, pages, page_count,
615 							   &cur_page);
616 			if (rc < 0) {
617 				spdk_dma_free(*pages);
618 				*pages = NULL;
619 				*page_count = 0;
620 				return rc;
621 			}
622 
623 			buf = (uint8_t *)cur_page->descriptors;
624 			remaining_sz = sizeof(cur_page->descriptors);
625 
626 			/* Try again */
627 			required_sz = 0;
628 			rc = _spdk_blob_serialize_xattr(xattr,
629 							buf, remaining_sz,
630 							&required_sz);
631 
632 			if (rc < 0) {
633 				spdk_dma_free(*pages);
634 				*pages = NULL;
635 				*page_count = 0;
636 				return -1;
637 			}
638 		}
639 
640 		remaining_sz -= required_sz;
641 		buf += required_sz;
642 	}
643 
644 	/* Serialize extents */
645 	last_cluster = 0;
646 	while (last_cluster < blob->active.num_clusters) {
647 		_spdk_blob_serialize_extent(blob, last_cluster, &last_cluster,
648 					    buf, remaining_sz);
649 
650 		if (last_cluster == blob->active.num_clusters) {
651 			break;
652 		}
653 
654 		rc = _spdk_blob_serialize_add_page(blob, pages, page_count,
655 						   &cur_page);
656 		if (rc < 0) {
657 			return rc;
658 		}
659 
660 		buf = (uint8_t *)cur_page->descriptors;
661 		remaining_sz = sizeof(cur_page->descriptors);
662 	}
663 
664 	return 0;
665 }
666 
667 struct spdk_blob_load_ctx {
668 	struct spdk_blob_data 		*blob;
669 
670 	struct spdk_blob_md_page	*pages;
671 	uint32_t			num_pages;
672 
673 	spdk_bs_sequence_cpl		cb_fn;
674 	void				*cb_arg;
675 };
676 
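/*
 * The crc is stored in the last 4 bytes of each metadata page, so it is
 *  computed over everything except those final 4 bytes.
 */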
677 static uint32_t
678 _spdk_blob_md_page_calc_crc(void *page)
679 {
680 	uint32_t		crc;
681 
682 	crc = BLOB_CRC32C_INITIAL;
683 	crc = spdk_crc32c_update(page, SPDK_BS_PAGE_SIZE - 4, crc);
684 	crc ^= BLOB_CRC32C_INITIAL;
685 
686 	return crc;
}
689 
690 static void
691 _spdk_blob_load_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
692 {
693 	struct spdk_blob_load_ctx 	*ctx = cb_arg;
694 	struct spdk_blob_data 		*blob = ctx->blob;
695 	struct spdk_blob_md_page	*page;
696 	int				rc;
697 	uint32_t			crc;
698 
699 	page = &ctx->pages[ctx->num_pages - 1];
700 	crc = _spdk_blob_md_page_calc_crc(page);
701 	if (crc != page->crc) {
702 		SPDK_ERRLOG("Metadata page %d crc mismatch\n", ctx->num_pages);
703 		_spdk_blob_free(blob);
704 		ctx->cb_fn(seq, NULL, -EINVAL);
705 		spdk_dma_free(ctx->pages);
706 		free(ctx);
707 		return;
708 	}
709 
710 	if (page->next != SPDK_INVALID_MD_PAGE) {
711 		uint32_t next_page = page->next;
712 		uint64_t next_lba = _spdk_bs_page_to_lba(blob->bs, blob->bs->md_start + next_page);

		/* The next metadata page must lie within the metadata region. */
		assert(next_lba < _spdk_bs_page_to_lba(blob->bs, blob->bs->md_start + blob->bs->md_len));
716 
717 		/* Read the next page */
718 		ctx->num_pages++;
719 		ctx->pages = spdk_dma_realloc(ctx->pages, (sizeof(*page) * ctx->num_pages),
720 					      sizeof(*page), NULL);
721 		if (ctx->pages == NULL) {
722 			ctx->cb_fn(seq, ctx->cb_arg, -ENOMEM);
723 			free(ctx);
724 			return;
725 		}
726 
727 		spdk_bs_sequence_read_dev(seq, &ctx->pages[ctx->num_pages - 1],
728 					  next_lba,
729 					  _spdk_bs_byte_to_lba(blob->bs, sizeof(*page)),
730 					  _spdk_blob_load_cpl, ctx);
731 		return;
732 	}
733 
734 	/* Parse the pages */
735 	rc = _spdk_blob_parse(ctx->pages, ctx->num_pages, blob);
736 	if (rc) {
737 		_spdk_blob_free(blob);
738 		ctx->cb_fn(seq, NULL, rc);
739 		spdk_dma_free(ctx->pages);
740 		free(ctx);
741 		return;
742 	}
743 
744 	_spdk_blob_mark_clean(blob);
745 
746 	ctx->cb_fn(seq, ctx->cb_arg, rc);
747 
748 	/* Free the memory */
749 	spdk_dma_free(ctx->pages);
750 	free(ctx);
751 }
752 
753 /* Load a blob from disk given a blobid */
754 static void
755 _spdk_blob_load(spdk_bs_sequence_t *seq, struct spdk_blob_data *blob,
756 		spdk_bs_sequence_cpl cb_fn, void *cb_arg)
757 {
758 	struct spdk_blob_load_ctx *ctx;
759 	struct spdk_blob_store *bs;
760 	uint32_t page_num;
761 	uint64_t lba;
762 
763 	assert(blob != NULL);
764 	assert(blob->state == SPDK_BLOB_STATE_CLEAN ||
765 	       blob->state == SPDK_BLOB_STATE_DIRTY);
766 
767 	bs = blob->bs;
768 
769 	ctx = calloc(1, sizeof(*ctx));
770 	if (!ctx) {
771 		cb_fn(seq, cb_arg, -ENOMEM);
772 		return;
773 	}
774 
775 	ctx->blob = blob;
	ctx->pages = spdk_dma_malloc(SPDK_BS_PAGE_SIZE,
				     SPDK_BS_PAGE_SIZE, NULL);
778 	if (!ctx->pages) {
779 		free(ctx);
780 		cb_fn(seq, cb_arg, -ENOMEM);
781 		return;
782 	}
783 	ctx->num_pages = 1;
784 	ctx->cb_fn = cb_fn;
785 	ctx->cb_arg = cb_arg;
786 
787 	page_num = _spdk_bs_blobid_to_page(blob->id);
788 	lba = _spdk_bs_page_to_lba(blob->bs, bs->md_start + page_num);
789 
790 	blob->state = SPDK_BLOB_STATE_LOADING;
791 
792 	spdk_bs_sequence_read_dev(seq, &ctx->pages[0], lba,
793 				  _spdk_bs_byte_to_lba(bs, SPDK_BS_PAGE_SIZE),
794 				  _spdk_blob_load_cpl, ctx);
795 }
796 
797 struct spdk_blob_persist_ctx {
798 	struct spdk_blob_data 		*blob;
799 
800 	struct spdk_blob_md_page	*pages;
801 
802 	uint64_t			idx;
803 
804 	spdk_bs_sequence_cpl		cb_fn;
805 	void				*cb_arg;
806 };
807 
808 static void
809 _spdk_blob_persist_complete(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
810 {
811 	struct spdk_blob_persist_ctx 	*ctx = cb_arg;
812 	struct spdk_blob_data 		*blob = ctx->blob;
813 
814 	if (bserrno == 0) {
815 		_spdk_blob_mark_clean(blob);
816 	}
817 
818 	/* Call user callback */
819 	ctx->cb_fn(seq, ctx->cb_arg, bserrno);
820 
821 	/* Free the memory */
822 	spdk_dma_free(ctx->pages);
823 	free(ctx);
824 }
825 
826 static void
827 _spdk_blob_persist_unmap_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
828 {
829 	struct spdk_blob_persist_ctx 	*ctx = cb_arg;
830 	struct spdk_blob_data 		*blob = ctx->blob;
831 	struct spdk_blob_store		*bs = blob->bs;
832 	void				*tmp;
833 	size_t				i;
834 
835 	/* Release all clusters that were truncated */
836 	for (i = blob->active.num_clusters; i < blob->active.cluster_array_size; i++) {
837 		uint32_t cluster_num = _spdk_bs_lba_to_cluster(bs, blob->active.clusters[i]);
838 
839 		/* Nothing to release if it was not allocated */
840 		if (blob->active.clusters[i] != 0) {
841 			_spdk_bs_release_cluster(bs, cluster_num);
842 		}
843 	}
844 
845 	if (blob->active.num_clusters == 0) {
846 		free(blob->active.clusters);
847 		blob->active.clusters = NULL;
848 		blob->active.cluster_array_size = 0;
849 	} else {
850 		tmp = realloc(blob->active.clusters, sizeof(uint64_t) * blob->active.num_clusters);
851 		assert(tmp != NULL);
852 		blob->active.clusters = tmp;
853 		blob->active.cluster_array_size = blob->active.num_clusters;
854 	}
855 
856 	_spdk_blob_persist_complete(seq, ctx, bserrno);
857 }
858 
859 static void
860 _spdk_blob_persist_unmap_clusters(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
861 {
862 	struct spdk_blob_persist_ctx 	*ctx = cb_arg;
863 	struct spdk_blob_data 		*blob = ctx->blob;
864 	struct spdk_blob_store		*bs = blob->bs;
865 	spdk_bs_batch_t			*batch;
866 	size_t				i;
867 	uint64_t			lba;
868 	uint32_t			lba_count;
869 
870 	/* Clusters don't move around in blobs. The list shrinks or grows
871 	 * at the end, but no changes ever occur in the middle of the list.
872 	 */
873 
874 	batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_unmap_clusters_cpl, ctx);
875 
876 	/* Unmap all clusters that were truncated */
877 	lba = 0;
878 	lba_count = 0;
879 	for (i = blob->active.num_clusters; i < blob->active.cluster_array_size; i++) {
880 		uint64_t next_lba = blob->active.clusters[i];
881 		uint32_t next_lba_count = _spdk_bs_cluster_to_lba(bs, 1);
882 
883 		if (next_lba > 0 && (lba + lba_count) == next_lba) {
884 			/* This cluster is contiguous with the previous one. */
885 			lba_count += next_lba_count;
886 			continue;
887 		}
888 
889 		/* This cluster is not contiguous with the previous one. */
890 
		/* If a run of LBAs previously existed, send it
		 * as a single unmap.
		 */
894 		if (lba_count > 0) {
895 			spdk_bs_batch_unmap_dev(batch, lba, lba_count);
896 		}
897 
898 		/* Start building the next batch */
899 		lba = next_lba;
900 		if (next_lba > 0) {
901 			lba_count = next_lba_count;
902 		} else {
903 			lba_count = 0;
904 		}
905 	}
906 
907 	/* If we ended with a contiguous set of LBAs, send the unmap now */
908 	if (lba_count > 0) {
909 		spdk_bs_batch_unmap_dev(batch, lba, lba_count);
910 	}
911 
912 	spdk_bs_batch_close(batch);
913 }
914 
915 static void
916 _spdk_blob_persist_zero_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
917 {
918 	struct spdk_blob_persist_ctx	*ctx = cb_arg;
919 	struct spdk_blob_data 		*blob = ctx->blob;
920 	struct spdk_blob_store		*bs = blob->bs;
921 	size_t				i;
922 
923 	/* This loop starts at 1 because the first page is special and handled
924 	 * below. The pages (except the first) are never written in place,
925 	 * so any pages in the clean list must be zeroed.
926 	 */
927 	for (i = 1; i < blob->clean.num_pages; i++) {
928 		spdk_bit_array_clear(bs->used_md_pages, blob->clean.pages[i]);
929 	}
930 
931 	if (blob->active.num_pages == 0) {
932 		uint32_t page_num;
933 
934 		page_num = _spdk_bs_blobid_to_page(blob->id);
935 		spdk_bit_array_clear(bs->used_md_pages, page_num);
936 	}
937 
938 	/* Move on to unmapping clusters */
939 	_spdk_blob_persist_unmap_clusters(seq, ctx, 0);
940 }
941 
942 static void
943 _spdk_blob_persist_zero_pages(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
944 {
945 	struct spdk_blob_persist_ctx 	*ctx = cb_arg;
946 	struct spdk_blob_data 		*blob = ctx->blob;
947 	struct spdk_blob_store		*bs = blob->bs;
948 	uint64_t			lba;
949 	uint32_t			lba_count;
950 	spdk_bs_batch_t			*batch;
951 	size_t				i;
952 
953 	batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_zero_pages_cpl, ctx);
954 
955 	lba_count = _spdk_bs_byte_to_lba(bs, SPDK_BS_PAGE_SIZE);
956 
957 	/* This loop starts at 1 because the first page is special and handled
958 	 * below. The pages (except the first) are never written in place,
959 	 * so any pages in the clean list must be zeroed.
960 	 */
961 	for (i = 1; i < blob->clean.num_pages; i++) {
962 		lba = _spdk_bs_page_to_lba(bs, bs->md_start + blob->clean.pages[i]);
963 
964 		spdk_bs_batch_write_zeroes_dev(batch, lba, lba_count);
965 	}
966 
967 	/* The first page will only be zeroed if this is a delete. */
968 	if (blob->active.num_pages == 0) {
969 		uint32_t page_num;
970 
971 		/* The first page in the metadata goes where the blobid indicates */
972 		page_num = _spdk_bs_blobid_to_page(blob->id);
973 		lba = _spdk_bs_page_to_lba(bs, bs->md_start + page_num);
974 
975 		spdk_bs_batch_write_zeroes_dev(batch, lba, lba_count);
976 	}
977 
978 	spdk_bs_batch_close(batch);
979 }
980 
981 static void
982 _spdk_blob_persist_write_page_root(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
983 {
984 	struct spdk_blob_persist_ctx	*ctx = cb_arg;
985 	struct spdk_blob_data		*blob = ctx->blob;
986 	struct spdk_blob_store		*bs = blob->bs;
987 	uint64_t			lba;
988 	uint32_t			lba_count;
989 	struct spdk_blob_md_page	*page;
990 
991 	if (blob->active.num_pages == 0) {
992 		/* Move on to the next step */
993 		_spdk_blob_persist_zero_pages(seq, ctx, 0);
994 		return;
995 	}
996 
997 	lba_count = _spdk_bs_byte_to_lba(bs, sizeof(*page));
998 
999 	page = &ctx->pages[0];
1000 	/* The first page in the metadata goes where the blobid indicates */
1001 	lba = _spdk_bs_page_to_lba(bs, bs->md_start + _spdk_bs_blobid_to_page(blob->id));
1002 
1003 	spdk_bs_sequence_write_dev(seq, page, lba, lba_count,
1004 				   _spdk_blob_persist_zero_pages, ctx);
1005 }
1006 
1007 static void
1008 _spdk_blob_persist_write_page_chain(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1009 {
1010 	struct spdk_blob_persist_ctx 	*ctx = cb_arg;
1011 	struct spdk_blob_data 		*blob = ctx->blob;
1012 	struct spdk_blob_store		*bs = blob->bs;
1013 	uint64_t 			lba;
1014 	uint32_t			lba_count;
1015 	struct spdk_blob_md_page	*page;
1016 	spdk_bs_batch_t			*batch;
1017 	size_t				i;
1018 
1019 	/* Clusters don't move around in blobs. The list shrinks or grows
1020 	 * at the end, but no changes ever occur in the middle of the list.
1021 	 */
1022 
1023 	lba_count = _spdk_bs_byte_to_lba(bs, sizeof(*page));
1024 
1025 	batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_write_page_root, ctx);
1026 
	/* This loop starts at 1. The root page is not written until
	 *  all of the other pages have been written.
	 */
1030 	for (i = 1; i < blob->active.num_pages; i++) {
1031 		page = &ctx->pages[i];
1032 		assert(page->sequence_num == i);
1033 
1034 		lba = _spdk_bs_page_to_lba(bs, bs->md_start + blob->active.pages[i]);
1035 
1036 		spdk_bs_batch_write_dev(batch, page, lba, lba_count);
1037 	}
1038 
1039 	spdk_bs_batch_close(batch);
1040 }
1041 
1042 static int
1043 _spdk_resize_blob(struct spdk_blob_data *blob, uint64_t sz)
1044 {
1045 	uint64_t	i;
1046 	uint64_t	*tmp;
1047 	uint64_t	lfc; /* lowest free cluster */
1048 	uint64_t	num_clusters;
1049 	struct spdk_blob_store *bs;
1050 
1051 	bs = blob->bs;
1052 
1053 	assert(blob->state != SPDK_BLOB_STATE_LOADING &&
1054 	       blob->state != SPDK_BLOB_STATE_SYNCING);
1055 
1056 	if (blob->active.num_clusters == sz) {
1057 		return 0;
1058 	}
1059 
1060 	if (blob->active.num_clusters < blob->active.cluster_array_size) {
1061 		/* If this blob was resized to be larger, then smaller, then
1062 		 * larger without syncing, then the cluster array already
1063 		 * contains spare assigned clusters we can use.
1064 		 */
1065 		num_clusters = spdk_min(blob->active.cluster_array_size,
1066 					sz);
1067 	} else {
1068 		num_clusters = blob->active.num_clusters;
1069 	}
1070 
1071 	/* Do two passes - one to verify that we can obtain enough clusters
1072 	 * and another to actually claim them.
1073 	 */
1074 
1075 	if (spdk_blob_is_thin_provisioned(blob) == false) {
1076 		lfc = 0;
1077 		for (i = num_clusters; i < sz; i++) {
1078 			lfc = spdk_bit_array_find_first_clear(bs->used_clusters, lfc);
1079 			if (lfc >= bs->total_clusters) {
1080 				/* No more free clusters. Cannot satisfy the request */
1081 				return -ENOSPC;
1082 			}
1083 			lfc++;
1084 		}
1085 	}
1086 
1087 	if (sz > num_clusters) {
1088 		/* Expand the cluster array if necessary.
1089 		 * We only shrink the array when persisting.
1090 		 */
1091 		tmp = realloc(blob->active.clusters, sizeof(uint64_t) * sz);
1092 		if (sz > 0 && tmp == NULL) {
1093 			return -ENOMEM;
1094 		}
1095 		memset(tmp + blob->active.cluster_array_size, 0,
1096 		       sizeof(uint64_t) * (sz - blob->active.cluster_array_size));
1097 		blob->active.clusters = tmp;
1098 		blob->active.cluster_array_size = sz;
1099 	}
1100 
1101 	blob->state = SPDK_BLOB_STATE_DIRTY;
1102 
1103 	if (spdk_blob_is_thin_provisioned(blob) == false) {
1104 		lfc = 0;
1105 		for (i = num_clusters; i < sz; i++) {
1106 			_spdk_bs_allocate_cluster(blob, i, &lfc);
1107 			lfc++;
1108 		}
1109 	}
1110 
1111 	blob->active.num_clusters = sz;
1112 
1113 	return 0;
1114 }
1115 
/* Write a blob to disk.  The sequence is: serialize the metadata, claim md
 *  pages for it, write the page chain from last page to first, write the root
 *  page, then zero any freed md pages and unmap any freed clusters.
 */
1117 static void
1118 _spdk_blob_persist(spdk_bs_sequence_t *seq, struct spdk_blob_data *blob,
1119 		   spdk_bs_sequence_cpl cb_fn, void *cb_arg)
1120 {
1121 	struct spdk_blob_persist_ctx *ctx;
1122 	int rc;
1123 	uint64_t i;
	uint32_t page_num;
	uint32_t *tmp_pages;
1125 	struct spdk_blob_store *bs;
1126 
1127 	assert(blob != NULL);
1128 	assert(blob->state == SPDK_BLOB_STATE_CLEAN ||
1129 	       blob->state == SPDK_BLOB_STATE_DIRTY);
1130 
1131 	if (blob->state == SPDK_BLOB_STATE_CLEAN) {
1132 		cb_fn(seq, cb_arg, 0);
1133 		return;
1134 	}
1135 
1136 	bs = blob->bs;
1137 
1138 	ctx = calloc(1, sizeof(*ctx));
1139 	if (!ctx) {
1140 		cb_fn(seq, cb_arg, -ENOMEM);
1141 		return;
1142 	}
1143 	ctx->blob = blob;
1144 	ctx->cb_fn = cb_fn;
1145 	ctx->cb_arg = cb_arg;
1146 
1147 	blob->state = SPDK_BLOB_STATE_SYNCING;
1148 
1149 	if (blob->active.num_pages == 0) {
1150 		/* This is the signal that the blob should be deleted.
1151 		 * Immediately jump to the clean up routine. */
1152 		assert(blob->clean.num_pages > 0);
1153 		ctx->idx = blob->clean.num_pages - 1;
1154 		_spdk_blob_persist_zero_pages(seq, ctx, 0);
1155 		return;
	}
1158 
1159 	/* Generate the new metadata */
1160 	rc = _spdk_blob_serialize(blob, &ctx->pages, &blob->active.num_pages);
1161 	if (rc < 0) {
1162 		free(ctx);
1163 		cb_fn(seq, cb_arg, rc);
1164 		return;
1165 	}
1166 
1167 	assert(blob->active.num_pages >= 1);
1168 
	/* Resize the cache of page indices.  Use a temporary pointer so the
	 *  original array is not leaked if realloc() fails.
	 */
	tmp_pages = realloc(blob->active.pages,
			    blob->active.num_pages * sizeof(*blob->active.pages));
	if (!tmp_pages) {
		spdk_dma_free(ctx->pages);
		free(ctx);
		blob->state = SPDK_BLOB_STATE_DIRTY;
		cb_fn(seq, cb_arg, -ENOMEM);
		return;
	}
	blob->active.pages = tmp_pages;
1177 
1178 	/* Assign this metadata to pages. This requires two passes -
1179 	 * one to verify that there are enough pages and a second
1180 	 * to actually claim them. */
1181 	page_num = 0;
1182 	/* Note that this loop starts at one. The first page location is fixed by the blobid. */
1183 	for (i = 1; i < blob->active.num_pages; i++) {
1184 		page_num = spdk_bit_array_find_first_clear(bs->used_md_pages, page_num);
1185 		if (page_num >= spdk_bit_array_capacity(bs->used_md_pages)) {
1186 			spdk_dma_free(ctx->pages);
1187 			free(ctx);
1188 			blob->state = SPDK_BLOB_STATE_DIRTY;
1189 			cb_fn(seq, cb_arg, -ENOMEM);
1190 			return;
1191 		}
1192 		page_num++;
1193 	}
1194 
1195 	page_num = 0;
1196 	blob->active.pages[0] = _spdk_bs_blobid_to_page(blob->id);
1197 	for (i = 1; i < blob->active.num_pages; i++) {
1198 		page_num = spdk_bit_array_find_first_clear(bs->used_md_pages, page_num);
1199 		ctx->pages[i - 1].next = page_num;
1200 		/* Now that previous metadata page is complete, calculate the crc for it. */
1201 		ctx->pages[i - 1].crc = _spdk_blob_md_page_calc_crc(&ctx->pages[i - 1]);
1202 		blob->active.pages[i] = page_num;
1203 		spdk_bit_array_set(bs->used_md_pages, page_num);
1204 		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Claiming page %u for blob %lu\n", page_num, blob->id);
1205 		page_num++;
1206 	}
1207 	ctx->pages[i - 1].crc = _spdk_blob_md_page_calc_crc(&ctx->pages[i - 1]);
1208 	/* Start writing the metadata from last page to first */
1209 	ctx->idx = blob->active.num_pages - 1;
1210 	_spdk_blob_persist_write_page_chain(seq, ctx, 0);
1211 }
1212 
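/*
 * Split an I/O that spans a cluster boundary into a batch of smaller I/O,
 *  none of which crosses a boundary.  offset and length are in pages.  For
 *  example (illustrative), with 256 pages per cluster, a 300-page write at
 *  page offset 100 is issued as sub-writes of 156 pages and then 144 pages.
 */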
1213 static void
1214 _spdk_blob_request_submit_op_split(struct spdk_io_channel *ch, struct spdk_blob *_blob,
1215 				   void *payload, uint64_t offset, uint64_t length,
1216 				   spdk_blob_op_complete cb_fn, void *cb_arg, enum spdk_blob_op_type op_type)
1217 {
1218 	spdk_bs_batch_t		*batch;
1219 	struct spdk_bs_cpl	cpl;
1220 	struct spdk_blob_data	*blob = __blob_to_data(_blob);
1221 	uint64_t		op_length;
1222 	uint8_t			*buf;
1223 
1224 	assert(blob != NULL);
1225 
1226 	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
1227 	cpl.u.blob_basic.cb_fn = cb_fn;
1228 	cpl.u.blob_basic.cb_arg = cb_arg;
1229 
1230 	batch = spdk_bs_batch_open(ch, &cpl);
1231 	if (!batch) {
1232 		cb_fn(cb_arg, -ENOMEM);
1233 		return;
1234 	}
1235 
1236 	buf = payload;
1237 	while (length > 0) {
1238 		op_length = spdk_min(length, _spdk_bs_num_pages_to_cluster_boundary(blob, offset));
1239 
1240 		switch (op_type) {
1241 		case SPDK_BLOB_READ:
1242 			spdk_bs_batch_read_blob(batch, _blob, buf, offset, op_length);
1243 			break;
1244 		case SPDK_BLOB_WRITE:
1245 			spdk_bs_batch_write_blob(batch, _blob, buf, offset, op_length);
1246 			break;
1247 		case SPDK_BLOB_UNMAP:
1248 			spdk_bs_batch_unmap_blob(batch, _blob, offset, op_length);
1249 			break;
1250 		case SPDK_BLOB_WRITE_ZEROES:
1251 			spdk_bs_batch_write_zeroes_blob(batch, _blob, offset, op_length);
1252 			break;
1253 		case SPDK_BLOB_READV:
1254 		case SPDK_BLOB_WRITEV:
1255 			SPDK_ERRLOG("readv/write not valid for %s\n", __func__);
1256 			break;
1257 		}
1258 
1259 		length -= op_length;
1260 		offset += op_length;
1261 		if (op_type == SPDK_BLOB_WRITE || op_type == SPDK_BLOB_READ) {
1262 			buf += op_length * SPDK_BS_PAGE_SIZE;
1263 		}
1264 	}
1265 
1266 	spdk_bs_batch_close(batch);
1267 }
1268 
1269 static void
1270 _spdk_blob_request_submit_op_single(struct spdk_io_channel *_ch, struct spdk_blob *_blob,
1271 				    void *payload, uint64_t offset, uint64_t length,
1272 				    spdk_blob_op_complete cb_fn, void *cb_arg, enum spdk_blob_op_type op_type)
1273 {
1274 	struct spdk_blob_data *blob = __blob_to_data(_blob);
1275 	spdk_bs_batch_t *batch;
1276 	struct spdk_bs_cpl cpl;
1277 	uint64_t lba;
1278 	uint32_t lba_count;
1279 
1280 	assert(blob != NULL);
1281 
1282 	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
1283 	cpl.u.blob_basic.cb_fn = cb_fn;
1284 	cpl.u.blob_basic.cb_arg = cb_arg;
1285 
1286 	batch = spdk_bs_batch_open(_ch, &cpl);
1287 	if (!batch) {
1288 		cb_fn(cb_arg, -ENOMEM);
1289 		return;
1290 	}
1291 
1292 	lba = _spdk_bs_blob_page_to_lba(blob, offset);
1293 	lba_count = _spdk_bs_page_to_lba(blob->bs, length);
1294 
1295 	switch (op_type) {
1296 	case SPDK_BLOB_READ:
1297 		spdk_bs_batch_read_dev(batch, payload, lba, lba_count);
1298 		break;
1299 	case SPDK_BLOB_WRITE:
1300 		spdk_bs_batch_write_dev(batch, payload, lba, lba_count);
1301 		break;
1302 	case SPDK_BLOB_UNMAP:
1303 		spdk_bs_batch_unmap_dev(batch, lba, lba_count);
1304 		break;
1305 	case SPDK_BLOB_WRITE_ZEROES:
1306 		spdk_bs_batch_write_zeroes_dev(batch, lba, lba_count);
1307 		break;
1308 	case SPDK_BLOB_READV:
1309 	case SPDK_BLOB_WRITEV:
1310 		SPDK_ERRLOG("readv/write not valid for %s\n", __func__);
1311 		break;
1312 	}
1313 
1314 	spdk_bs_batch_close(batch);
1315 }
1316 
1317 static void
1318 _spdk_blob_request_submit_op(struct spdk_blob *_blob, struct spdk_io_channel *_channel,
1319 			     void *payload, uint64_t offset, uint64_t length,
1320 			     spdk_blob_op_complete cb_fn, void *cb_arg, enum spdk_blob_op_type op_type)
1321 {
1322 	struct spdk_blob_data		*blob = __blob_to_data(_blob);
1323 
1324 	assert(blob != NULL);
1325 
1326 	if (blob->data_ro && op_type != SPDK_BLOB_READ) {
1327 		cb_fn(cb_arg, -EPERM);
1328 		return;
1329 	}
1330 
1331 	if (offset + length > blob->active.num_clusters * blob->bs->pages_per_cluster) {
1332 		cb_fn(cb_arg, -EINVAL);
1333 		return;
1334 	}
1335 
1336 	if (length <= _spdk_bs_num_pages_to_cluster_boundary(blob, offset)) {
1337 		_spdk_blob_request_submit_op_single(_channel, _blob, payload, offset, length,
1338 						    cb_fn, cb_arg, op_type);
1339 	} else {
1340 		_spdk_blob_request_submit_op_split(_channel, _blob, payload, offset, length,
1341 						   cb_fn, cb_arg, op_type);
1342 	}
1343 }
1344 
1345 struct rw_iov_ctx {
1346 	struct spdk_blob *blob;
1347 	struct spdk_io_channel *channel;
1348 	spdk_blob_op_complete cb_fn;
1349 	void *cb_arg;
1350 	bool read;
1351 	int iovcnt;
1352 	struct iovec *orig_iov;
1353 	uint64_t page_offset;
1354 	uint64_t pages_remaining;
1355 	uint64_t pages_done;
1356 	struct iovec iov[0];
1357 };
1358 
1359 static void
1360 _spdk_rw_iov_done(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1361 {
1362 	assert(cb_arg == NULL);
1363 	spdk_bs_sequence_finish(seq, bserrno);
1364 }
1365 
1366 static void
1367 _spdk_rw_iov_split_next(void *cb_arg, int bserrno)
1368 {
1369 	struct rw_iov_ctx *ctx = cb_arg;
1370 	struct spdk_blob_data *blob = __blob_to_data(ctx->blob);
1371 	struct iovec *iov, *orig_iov;
1372 	int iovcnt;
1373 	size_t orig_iovoff;
1374 	uint64_t page_count, pages_to_boundary, page_offset;
1375 	uint64_t byte_count;
1376 
1377 	if (bserrno != 0 || ctx->pages_remaining == 0) {
1378 		ctx->cb_fn(ctx->cb_arg, bserrno);
1379 		free(ctx);
1380 		return;
1381 	}
1382 
1383 	page_offset = ctx->page_offset;
1384 	pages_to_boundary = _spdk_bs_num_pages_to_cluster_boundary(blob, page_offset);
1385 	page_count = spdk_min(ctx->pages_remaining, pages_to_boundary);
1386 
	/*
	 * Get the index and offset into the original iov array for our current
	 *  position in the I/O sequence.  byte_count counts down the bytes
	 *  remaining until orig_iov and orig_iovoff point to the current
	 *  position in the I/O sequence.
	 */
	byte_count = ctx->pages_done * SPDK_BS_PAGE_SIZE;
1393 	orig_iov = &ctx->orig_iov[0];
1394 	orig_iovoff = 0;
1395 	while (byte_count > 0) {
1396 		if (byte_count >= orig_iov->iov_len) {
1397 			byte_count -= orig_iov->iov_len;
1398 			orig_iov++;
1399 		} else {
1400 			orig_iovoff = byte_count;
1401 			byte_count = 0;
1402 		}
1403 	}
1404 
1405 	/*
1406 	 * Build an iov array for the next I/O in the sequence.  byte_count will keep track of how many
1407 	 *  bytes of this next I/O remain to be accounted for in the new iov array.
1408 	 */
	byte_count = page_count * SPDK_BS_PAGE_SIZE;
1410 	iov = &ctx->iov[0];
1411 	iovcnt = 0;
1412 	while (byte_count > 0) {
1413 		iov->iov_len = spdk_min(byte_count, orig_iov->iov_len - orig_iovoff);
1414 		iov->iov_base = orig_iov->iov_base + orig_iovoff;
1415 		byte_count -= iov->iov_len;
1416 		orig_iovoff = 0;
1417 		orig_iov++;
1418 		iov++;
1419 		iovcnt++;
1420 	}
1421 
1422 	ctx->page_offset += page_count;
1423 	ctx->pages_done += page_count;
1424 	ctx->pages_remaining -= page_count;
1425 	iov = &ctx->iov[0];
1426 
1427 	if (ctx->read) {
1428 		spdk_bs_io_readv_blob(ctx->blob, ctx->channel, iov, iovcnt, page_offset,
1429 				      page_count, _spdk_rw_iov_split_next, ctx);
1430 	} else {
1431 		spdk_bs_io_writev_blob(ctx->blob, ctx->channel, iov, iovcnt, page_offset,
1432 				       page_count, _spdk_rw_iov_split_next, ctx);
1433 	}
1434 }
1435 
1436 static void
1437 _spdk_blob_request_submit_rw_iov(struct spdk_blob *_blob, struct spdk_io_channel *_channel,
1438 				 struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
1439 				 spdk_blob_op_complete cb_fn, void *cb_arg, bool read)
1440 {
1441 	struct spdk_blob_data		*blob = __blob_to_data(_blob);
1442 	spdk_bs_sequence_t		*seq;
1443 	struct spdk_bs_cpl		cpl;
1444 
1445 	assert(blob != NULL);
1446 
1447 	if (!read && blob->data_ro) {
1448 		cb_fn(cb_arg, -EPERM);
1449 		return;
1450 	}
1451 
1452 	if (length == 0) {
1453 		cb_fn(cb_arg, 0);
1454 		return;
1455 	}
1456 
1457 	if (offset + length > blob->active.num_clusters * blob->bs->pages_per_cluster) {
1458 		cb_fn(cb_arg, -EINVAL);
1459 		return;
1460 	}
1461 
1462 	/*
1463 	 * For now, we implement readv/writev using a sequence (instead of a batch) to account for having
1464 	 *  to split a request that spans a cluster boundary.  For I/O that do not span a cluster boundary,
1465 	 *  there will be no noticeable difference compared to using a batch.  For I/O that do span a cluster
1466 	 *  boundary, the target LBAs (after blob offset to LBA translation) may not be contiguous, so we need
1467 	 *  to allocate a separate iov array and split the I/O such that none of the resulting
1468 	 *  smaller I/O cross a cluster boundary.  These smaller I/O will be issued in sequence (not in parallel)
1469 	 *  but since this case happens very infrequently, any performance impact will be negligible.
1470 	 *
1471 	 * This could be optimized in the future to allocate a big enough iov array to account for all of the iovs
1472 	 *  for all of the smaller I/Os, pre-build all of the iov arrays for the smaller I/Os, then issue them
1473 	 *  in a batch.  That would also require creating an intermediate spdk_bs_cpl that would get called
1474 	 *  when the batch was completed, to allow for freeing the memory for the iov arrays.
1475 	 */
1476 	if (spdk_likely(length <= _spdk_bs_num_pages_to_cluster_boundary(blob, offset))) {
1477 		uint64_t lba = _spdk_bs_blob_page_to_lba(blob, offset);
1478 		uint32_t lba_count = _spdk_bs_page_to_lba(blob->bs, length);
1479 
1480 		cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
1481 		cpl.u.blob_basic.cb_fn = cb_fn;
1482 		cpl.u.blob_basic.cb_arg = cb_arg;
1483 
1484 		seq = spdk_bs_sequence_start(_channel, &cpl);
1485 		if (!seq) {
1486 			cb_fn(cb_arg, -ENOMEM);
1487 			return;
1488 		}
1489 
1490 		if (read) {
1491 			spdk_bs_sequence_readv_dev(seq, iov, iovcnt, lba, lba_count, _spdk_rw_iov_done, NULL);
1492 		} else {
1493 			spdk_bs_sequence_writev_dev(seq, iov, iovcnt, lba, lba_count, _spdk_rw_iov_done, NULL);
1494 		}
1495 	} else {
1496 		struct rw_iov_ctx *ctx;
1497 
1498 		ctx = calloc(1, sizeof(struct rw_iov_ctx) + iovcnt * sizeof(struct iovec));
1499 		if (ctx == NULL) {
1500 			cb_fn(cb_arg, -ENOMEM);
1501 			return;
1502 		}
1503 
1504 		ctx->blob = _blob;
1505 		ctx->channel = _channel;
1506 		ctx->cb_fn = cb_fn;
1507 		ctx->cb_arg = cb_arg;
1508 		ctx->read = read;
1509 		ctx->orig_iov = iov;
1510 		ctx->iovcnt = iovcnt;
1511 		ctx->page_offset = offset;
1512 		ctx->pages_remaining = length;
1513 		ctx->pages_done = 0;
1514 
1515 		_spdk_rw_iov_split_next(ctx, 0);
1516 	}
1517 }
1518 
1519 static struct spdk_blob_data *
1520 _spdk_blob_lookup(struct spdk_blob_store *bs, spdk_blob_id blobid)
1521 {
1522 	struct spdk_blob_data *blob;
1523 
1524 	TAILQ_FOREACH(blob, &bs->blobs, link) {
1525 		if (blob->id == blobid) {
1526 			return blob;
1527 		}
1528 	}
1529 
1530 	return NULL;
1531 }
1532 
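/*
 * I/O channel constructor: preallocate max_channel_ops request sets up front
 *  so that request submission on this channel never allocates memory.
 */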
1533 static int
1534 _spdk_bs_channel_create(void *io_device, void *ctx_buf)
1535 {
1536 	struct spdk_blob_store		*bs = io_device;
1537 	struct spdk_bs_channel		*channel = ctx_buf;
1538 	struct spdk_bs_dev		*dev;
1539 	uint32_t			max_ops = bs->max_channel_ops;
1540 	uint32_t			i;
1541 
1542 	dev = bs->dev;
1543 
1544 	channel->req_mem = calloc(max_ops, sizeof(struct spdk_bs_request_set));
1545 	if (!channel->req_mem) {
1546 		return -1;
1547 	}
1548 
1549 	TAILQ_INIT(&channel->reqs);
1550 
1551 	for (i = 0; i < max_ops; i++) {
1552 		TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link);
1553 	}
1554 
1555 	channel->bs = bs;
1556 	channel->dev = dev;
1557 	channel->dev_channel = dev->create_channel(dev);
1558 
1559 	if (!channel->dev_channel) {
1560 		SPDK_ERRLOG("Failed to create device channel.\n");
1561 		free(channel->req_mem);
1562 		return -1;
1563 	}
1564 
1565 	return 0;
1566 }
1567 
1568 static void
1569 _spdk_bs_channel_destroy(void *io_device, void *ctx_buf)
1570 {
1571 	struct spdk_bs_channel *channel = ctx_buf;
1572 
1573 	free(channel->req_mem);
1574 	channel->dev->destroy_channel(channel->dev, channel->dev_channel);
1575 }
1576 
1577 static void
1578 _spdk_bs_dev_destroy(void *io_device)
1579 {
1580 	struct spdk_blob_store *bs = io_device;
1581 	struct spdk_blob_data	*blob, *blob_tmp;
1582 
1583 	bs->dev->destroy(bs->dev);
1584 
1585 	TAILQ_FOREACH_SAFE(blob, &bs->blobs, link, blob_tmp) {
1586 		TAILQ_REMOVE(&bs->blobs, blob, link);
1587 		_spdk_blob_free(blob);
1588 	}
1589 
1590 	spdk_bit_array_free(&bs->used_blobids);
1591 	spdk_bit_array_free(&bs->used_md_pages);
1592 	spdk_bit_array_free(&bs->used_clusters);
1593 	/*
1594 	 * If this function is called for any reason except a successful unload,
1595 	 * the unload_cpl type will be NONE and this will be a nop.
1596 	 */
1597 	spdk_bs_call_cpl(&bs->unload_cpl, bs->unload_err);
1598 
1599 	free(bs);
1600 }
1601 
1602 static void
1603 _spdk_bs_free(struct spdk_blob_store *bs)
1604 {
1605 	spdk_bs_unregister_md_thread(bs);
1606 	spdk_io_device_unregister(bs, _spdk_bs_dev_destroy);
1607 }
1608 
1609 void
1610 spdk_bs_opts_init(struct spdk_bs_opts *opts)
1611 {
1612 	opts->cluster_sz = SPDK_BLOB_OPTS_CLUSTER_SZ;
1613 	opts->num_md_pages = SPDK_BLOB_OPTS_NUM_MD_PAGES;
1614 	opts->max_md_ops = SPDK_BLOB_OPTS_MAX_MD_OPS;
1615 	opts->max_channel_ops = SPDK_BLOB_OPTS_DEFAULT_CHANNEL_OPS;
1616 	memset(&opts->bstype, 0, sizeof(opts->bstype));
1617 }
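
/*
 * Example usage (illustrative sketch; init_complete_cb and cb_arg are
 *  hypothetical caller-provided names):
 *
 *	struct spdk_bs_opts opts;
 *
 *	spdk_bs_opts_init(&opts);
 *	opts.cluster_sz = 4 * 1024 * 1024;
 *	spdk_bs_init(dev, &opts, init_complete_cb, cb_arg);
 */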
1618 
1619 static int
1620 _spdk_bs_opts_verify(struct spdk_bs_opts *opts)
1621 {
1622 	if (opts->cluster_sz == 0 || opts->num_md_pages == 0 || opts->max_md_ops == 0 ||
1623 	    opts->max_channel_ops == 0) {
1624 		SPDK_ERRLOG("Blobstore options cannot be set to 0\n");
1625 		return -1;
1626 	}
1627 
1628 	return 0;
1629 }
1630 
1631 static struct spdk_blob_store *
1632 _spdk_bs_alloc(struct spdk_bs_dev *dev, struct spdk_bs_opts *opts)
1633 {
1634 	struct spdk_blob_store	*bs;
1635 	uint64_t dev_size;
1636 	int rc;
1637 
1638 	dev_size = dev->blocklen * dev->blockcnt;
1639 	if (dev_size < opts->cluster_sz) {
1640 		/* Device size cannot be smaller than cluster size of blobstore */
1641 		SPDK_ERRLOG("Device size %" PRIu64 " is smaller than cluster size %d\n", dev_size,
1642 			    opts->cluster_sz);
1643 		return NULL;
1644 	}
1645 	if (opts->cluster_sz < SPDK_BS_PAGE_SIZE) {
1646 		/* Cluster size cannot be smaller than page size */
1647 		SPDK_ERRLOG("Cluster size %d is smaller than page size %d\n",
1648 			    opts->cluster_sz, SPDK_BS_PAGE_SIZE);
1649 		return NULL;
1650 	}
1651 	bs = calloc(1, sizeof(struct spdk_blob_store));
1652 	if (!bs) {
1653 		return NULL;
1654 	}
1655 
1656 	TAILQ_INIT(&bs->blobs);
1657 	bs->dev = dev;
1658 	bs->md_thread = spdk_get_thread();
1659 	assert(bs->md_thread != NULL);
1660 
1661 	/*
1662 	 * Do not use _spdk_bs_lba_to_cluster() here since blockcnt may not be an
1663 	 *  even multiple of the cluster size.
1664 	 */
1665 	bs->cluster_sz = opts->cluster_sz;
1666 	bs->total_clusters = dev->blockcnt / (bs->cluster_sz / dev->blocklen);
1667 	bs->pages_per_cluster = bs->cluster_sz / SPDK_BS_PAGE_SIZE;
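	/* For example (illustrative): a 10 GiB device with 512-byte blocks and
	 *  the default 1 MiB cluster size yields 10240 clusters of 256 pages
	 *  each.
	 */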
1668 	bs->num_free_clusters = bs->total_clusters;
1669 	bs->used_clusters = spdk_bit_array_create(bs->total_clusters);
1670 	if (bs->used_clusters == NULL) {
1671 		free(bs);
1672 		return NULL;
1673 	}
1674 
1675 	bs->max_channel_ops = opts->max_channel_ops;
1676 	bs->super_blob = SPDK_BLOBID_INVALID;
1677 	memcpy(&bs->bstype, &opts->bstype, sizeof(opts->bstype));
1678 
1679 	/* The metadata is assumed to be at least 1 page */
1680 	bs->used_md_pages = spdk_bit_array_create(1);
1681 	bs->used_blobids = spdk_bit_array_create(0);
1682 
1683 	spdk_io_device_register(bs, _spdk_bs_channel_create, _spdk_bs_channel_destroy,
1684 				sizeof(struct spdk_bs_channel));
1685 	rc = spdk_bs_register_md_thread(bs);
1686 	if (rc == -1) {
1687 		spdk_io_device_unregister(bs, NULL);
1688 		spdk_bit_array_free(&bs->used_blobids);
1689 		spdk_bit_array_free(&bs->used_md_pages);
1690 		spdk_bit_array_free(&bs->used_clusters);
1691 		free(bs);
1692 		return NULL;
1693 	}
1694 
1695 	return bs;
1696 }
1697 
/* START spdk_bs_load.  spdk_bs_load_ctx will be used for both load and unload. */
1699 
1700 struct spdk_bs_load_ctx {
1701 	struct spdk_blob_store		*bs;
1702 	struct spdk_bs_super_block	*super;
1703 
1704 	struct spdk_bs_md_mask		*mask;
1705 	bool				in_page_chain;
1706 	uint32_t			page_index;
1707 	uint32_t			cur_page;
1708 	struct spdk_blob_md_page	*page;
1709 	bool				is_load;
1710 };
1711 
1712 static void
1713 _spdk_bs_load_ctx_fail(spdk_bs_sequence_t *seq, struct spdk_bs_load_ctx *ctx, int bserrno)
1714 {
1715 	assert(bserrno != 0);
1716 
1717 	spdk_dma_free(ctx->super);
1718 	/*
1719 	 * Only free the blobstore when a load fails.  If an unload fails (for some reason)
1720 	 *  we want to keep the blobstore in case the caller wants to try again.
1721 	 */
1722 	if (ctx->is_load) {
1723 		_spdk_bs_free(ctx->bs);
1724 	}
1725 	free(ctx);
1726 	spdk_bs_sequence_finish(seq, bserrno);
1727 }
1728 
1729 static void
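/*
 * Copy the bits set in 'array' into the on-disk mask buffer, least
 *  significant bit first within each byte: bit i of the array lands in
 *  mask->mask[i / 8] at bit position (i % 8).
 */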
1730 _spdk_bs_set_mask(struct spdk_bit_array *array, struct spdk_bs_md_mask *mask)
1731 {
1732 	uint32_t i = 0;
1733 
1734 	while (true) {
1735 		i = spdk_bit_array_find_first_set(array, i);
1736 		if (i >= mask->length) {
1737 			break;
1738 		}
1739 		mask->mask[i / 8] |= 1U << (i % 8);
1740 		i++;
1741 	}
1742 }
1743 
1744 static void
1745 _spdk_bs_write_super(spdk_bs_sequence_t *seq, struct spdk_blob_store *bs,
1746 		     struct spdk_bs_super_block *super, spdk_bs_sequence_cpl cb_fn, void *cb_arg)
1747 {
1748 	/* Update the values in the super block */
1749 	super->super_blob = bs->super_blob;
1750 	memcpy(&super->bstype, &bs->bstype, sizeof(bs->bstype));
1751 	super->crc = _spdk_blob_md_page_calc_crc(super);
1752 	spdk_bs_sequence_write_dev(seq, super, _spdk_bs_page_to_lba(bs, 0),
1753 				   _spdk_bs_byte_to_lba(bs, sizeof(*super)),
1754 				   cb_fn, cb_arg);
1755 }
1756 
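/*
 * The used_pages, used_clusters and used_blobids bitmaps are persisted as
 *  spdk_bs_md_mask structures in the page ranges recorded by the super block.
 */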
1757 static void
1758 _spdk_bs_write_used_clusters(spdk_bs_sequence_t *seq, void *arg, spdk_bs_sequence_cpl cb_fn)
1759 {
1760 	struct spdk_bs_load_ctx	*ctx = arg;
1761 	uint64_t	mask_size, lba, lba_count;
1762 
1763 	/* Write out the used clusters mask */
1764 	mask_size = ctx->super->used_cluster_mask_len * SPDK_BS_PAGE_SIZE;
1765 	ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL);
1766 	if (!ctx->mask) {
1767 		_spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
1768 		return;
1769 	}
1770 
1771 	ctx->mask->type = SPDK_MD_MASK_TYPE_USED_CLUSTERS;
1772 	ctx->mask->length = ctx->bs->total_clusters;
1773 	assert(ctx->mask->length == spdk_bit_array_capacity(ctx->bs->used_clusters));
1774 
1775 	_spdk_bs_set_mask(ctx->bs->used_clusters, ctx->mask);
1776 	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_start);
1777 	lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_len);
1778 	spdk_bs_sequence_write_dev(seq, ctx->mask, lba, lba_count, cb_fn, arg);
1779 }
1780 
1781 static void
1782 _spdk_bs_write_used_md(spdk_bs_sequence_t *seq, void *arg, spdk_bs_sequence_cpl cb_fn)
1783 {
1784 	struct spdk_bs_load_ctx	*ctx = arg;
1785 	uint64_t	mask_size, lba, lba_count;
1786 
1787 	mask_size = ctx->super->used_page_mask_len * SPDK_BS_PAGE_SIZE;
1788 	ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL);
1789 	if (!ctx->mask) {
1790 		_spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
1791 		return;
1792 	}
1793 
1794 	ctx->mask->type = SPDK_MD_MASK_TYPE_USED_PAGES;
1795 	ctx->mask->length = ctx->super->md_len;
1796 	assert(ctx->mask->length == spdk_bit_array_capacity(ctx->bs->used_md_pages));
1797 
1798 	_spdk_bs_set_mask(ctx->bs->used_md_pages, ctx->mask);
1799 	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_start);
1800 	lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_len);
1801 	spdk_bs_sequence_write_dev(seq, ctx->mask, lba, lba_count, cb_fn, arg);
1802 }
1803 
1804 static void
1805 _spdk_bs_write_used_blobids(spdk_bs_sequence_t *seq, void *arg, spdk_bs_sequence_cpl cb_fn)
1806 {
1807 	struct spdk_bs_load_ctx	*ctx = arg;
1808 	uint64_t	mask_size, lba, lba_count;
1809 
1810 	if (ctx->super->used_blobid_mask_len == 0) {
1811 		/*
1812 		 * This is a pre-v3 on-disk format where the blobid mask does not get
1813 		 *  written to disk.
1814 		 */
1815 		cb_fn(seq, arg, 0);
1816 		return;
1817 	}
1818 
1819 	mask_size = ctx->super->used_blobid_mask_len * SPDK_BS_PAGE_SIZE;
1820 	ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL);
1821 	if (!ctx->mask) {
1822 		_spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
1823 		return;
1824 	}
1825 
1826 	ctx->mask->type = SPDK_MD_MASK_TYPE_USED_BLOBIDS;
1827 	ctx->mask->length = ctx->super->md_len;
1828 	assert(ctx->mask->length == spdk_bit_array_capacity(ctx->bs->used_blobids));
1829 
1830 	_spdk_bs_set_mask(ctx->bs->used_blobids, ctx->mask);
1831 	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_blobid_mask_start);
1832 	lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_blobid_mask_len);
1833 	spdk_bs_sequence_write_dev(seq, ctx->mask, lba, lba_count, cb_fn, arg);
1834 }
1835 
1836 static void
1837 _spdk_bs_load_used_blobids_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1838 {
1839 	struct spdk_bs_load_ctx *ctx = cb_arg;
1840 	uint32_t i, j;
1841 	int rc;
1842 
1843 	/* The type must be correct */
1844 	assert(ctx->mask->type == SPDK_MD_MASK_TYPE_USED_BLOBIDS);
1845 
1846 	/* The length of the mask (in bits) must not be greater than
1847 	 * the length of the buffer (converted to bits) */
1848 	assert(ctx->mask->length <= (ctx->super->used_blobid_mask_len * SPDK_BS_PAGE_SIZE * 8));
1849 
1850 	/* The length of the mask must be exactly equal to the size
1851 	 * (in pages) of the metadata region */
1852 	assert(ctx->mask->length == ctx->super->md_len);
1853 
1854 	rc = spdk_bit_array_resize(&ctx->bs->used_blobids, ctx->mask->length);
1855 	if (rc < 0) {
1856 		spdk_dma_free(ctx->mask);
1857 		_spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
1858 		return;
1859 	}
1860 
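	/* Expand the on-disk byte mask back into the in-memory bit array;
	 * this is the inverse of _spdk_bs_set_mask(). */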
1861 	for (i = 0; i < ctx->mask->length / 8; i++) {
1862 		uint8_t segment = ctx->mask->mask[i];
		for (j = 0; segment && (j < 8); j++) {
1864 			if (segment & 1U) {
1865 				spdk_bit_array_set(ctx->bs->used_blobids, (i * 8) + j);
1866 			}
1867 			segment >>= 1U;
1868 		}
1869 	}
1870 
1871 	spdk_dma_free(ctx->super);
1872 	spdk_dma_free(ctx->mask);
1873 	free(ctx);
1874 
1875 	spdk_bs_sequence_finish(seq, bserrno);
1876 }
1877 
1878 static void
1879 _spdk_bs_load_used_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1880 {
1881 	struct spdk_bs_load_ctx *ctx = cb_arg;
1882 	uint64_t		lba, lba_count, mask_size;
1883 	uint32_t		i, j;
1884 	int			rc;
1885 
1886 	/* The type must be correct */
1887 	assert(ctx->mask->type == SPDK_MD_MASK_TYPE_USED_CLUSTERS);
1888 	/* The length of the mask (in bits) must not be greater than the length of the buffer (converted to bits) */
	assert(ctx->mask->length <= (ctx->super->used_cluster_mask_len * SPDK_BS_PAGE_SIZE * 8));
1891 	/* The length of the mask must be exactly equal to the total number of clusters */
1892 	assert(ctx->mask->length == ctx->bs->total_clusters);
1893 
1894 	rc = spdk_bit_array_resize(&ctx->bs->used_clusters, ctx->bs->total_clusters);
1895 	if (rc < 0) {
1896 		spdk_dma_free(ctx->mask);
1897 		_spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
1898 		return;
1899 	}
1900 
1901 	ctx->bs->num_free_clusters = ctx->bs->total_clusters;
1902 	for (i = 0; i < ctx->mask->length / 8; i++) {
1903 		uint8_t segment = ctx->mask->mask[i];
1904 		for (j = 0; segment && (j < 8); j++) {
1905 			if (segment & 1U) {
1906 				spdk_bit_array_set(ctx->bs->used_clusters, (i * 8) + j);
1907 				assert(ctx->bs->num_free_clusters > 0);
1908 				ctx->bs->num_free_clusters--;
1909 			}
1910 			segment >>= 1U;
1911 		}
1912 	}
1913 
1914 	spdk_dma_free(ctx->mask);
1915 
1916 	/* Read the used blobids mask */
1917 	mask_size = ctx->super->used_blobid_mask_len * SPDK_BS_PAGE_SIZE;
1918 	ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL);
1919 	if (!ctx->mask) {
1920 		_spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
1921 		return;
1922 	}
1923 	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_blobid_mask_start);
1924 	lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_blobid_mask_len);
1925 	spdk_bs_sequence_read_dev(seq, ctx->mask, lba, lba_count,
1926 				  _spdk_bs_load_used_blobids_cpl, ctx);
1927 }
1928 
1929 static void
1930 _spdk_bs_load_used_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1931 {
1932 	struct spdk_bs_load_ctx *ctx = cb_arg;
1933 	uint64_t		lba, lba_count, mask_size;
1934 	uint32_t		i, j;
1935 	int			rc;
1936 
1937 	/* The type must be correct */
1938 	assert(ctx->mask->type == SPDK_MD_MASK_TYPE_USED_PAGES);
1939 	/* The length of the mask (in bits) must not be greater than the length of the buffer (converted to bits) */
1940 	assert(ctx->mask->length <= (ctx->super->used_page_mask_len * SPDK_BS_PAGE_SIZE *
1941 				     8));
1942 	/* The length of the mask must be exactly equal to the size (in pages) of the metadata region */
1943 	assert(ctx->mask->length == ctx->super->md_len);
1944 
1945 	rc = spdk_bit_array_resize(&ctx->bs->used_md_pages, ctx->mask->length);
1946 	if (rc < 0) {
1947 		spdk_dma_free(ctx->mask);
1948 		_spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
1949 		return;
1950 	}
1951 
1952 	for (i = 0; i < ctx->mask->length / 8; i++) {
1953 		uint8_t segment = ctx->mask->mask[i];
1954 		for (j = 0; segment && (j < 8); j++) {
1955 			if (segment & 1U) {
1956 				spdk_bit_array_set(ctx->bs->used_md_pages, (i * 8) + j);
1957 			}
1958 			segment >>= 1U;
1959 		}
1960 	}
1961 	spdk_dma_free(ctx->mask);
1962 
1963 	/* Read the used clusters mask */
1964 	mask_size = ctx->super->used_cluster_mask_len * SPDK_BS_PAGE_SIZE;
1965 	ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL);
1966 	if (!ctx->mask) {
1967 		_spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
1968 		return;
1969 	}
1970 	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_start);
1971 	lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_len);
1972 	spdk_bs_sequence_read_dev(seq, ctx->mask, lba, lba_count,
1973 				  _spdk_bs_load_used_clusters_cpl, ctx);
1974 }
1975 
1976 static void
1977 _spdk_bs_load_write_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1978 {
1979 	struct spdk_bs_load_ctx	*ctx = cb_arg;
1980 	uint64_t lba, lba_count, mask_size;
1981 
1982 	/* Read the used pages mask */
1983 	mask_size = ctx->super->used_page_mask_len * SPDK_BS_PAGE_SIZE;
1984 	ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL);
1985 	if (!ctx->mask) {
1986 		_spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
1987 		return;
1988 	}
1989 
1990 	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_start);
1991 	lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_len);
1992 	spdk_bs_sequence_read_dev(seq, ctx->mask, lba, lba_count,
1993 				  _spdk_bs_load_used_pages_cpl, ctx);
1994 }
1995 
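/*
 * Replay path: when the blobstore was not shut down cleanly, the on-disk
 * masks cannot be trusted, so every metadata page is read back and its
 * descriptors are parsed to rebuild the used_clusters bitmap.  Each page
 * holds a packed sequence of {type, length, payload} descriptors; a
 * zero-length padding descriptor terminates the page.
 */
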
1996 static int
1997 _spdk_bs_load_replay_md_parse_page(const struct spdk_blob_md_page *page, struct spdk_blob_store *bs)
1998 {
1999 	struct spdk_blob_md_descriptor *desc;
2000 	size_t	cur_desc = 0;
2001 
2002 	desc = (struct spdk_blob_md_descriptor *)page->descriptors;
2003 	while (cur_desc < sizeof(page->descriptors)) {
2004 		if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_PADDING) {
2005 			if (desc->length == 0) {
				/* A padding descriptor with length 0 terminates the page */
2007 				break;
2008 			}
2009 		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_EXTENT) {
2010 			struct spdk_blob_md_descriptor_extent	*desc_extent;
2011 			unsigned int				i, j;
2012 			unsigned int				cluster_count = 0;
2013 
2014 			desc_extent = (struct spdk_blob_md_descriptor_extent *)desc;
2015 
2016 			for (i = 0; i < desc_extent->length / sizeof(desc_extent->extents[0]); i++) {
2017 				for (j = 0; j < desc_extent->extents[i].length; j++) {
2018 					spdk_bit_array_set(bs->used_clusters, desc_extent->extents[i].cluster_idx + j);
2019 					if (bs->num_free_clusters == 0) {
2020 						return -1;
2021 					}
2022 					bs->num_free_clusters--;
2023 					cluster_count++;
2024 				}
2025 			}
2026 			if (cluster_count == 0) {
2027 				return -1;
2028 			}
2029 		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR) {
2030 			/* Skip this item */
2031 		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_FLAGS) {
2032 			/* Skip this item */
2033 		} else {
2034 			/* Error */
2035 			return -1;
2036 		}
2037 		/* Advance to the next descriptor */
2038 		cur_desc += sizeof(*desc) + desc->length;
2039 		if (cur_desc + sizeof(*desc) > sizeof(page->descriptors)) {
2040 			break;
2041 		}
2042 		desc = (struct spdk_blob_md_descriptor *)((uintptr_t)page->descriptors + cur_desc);
2043 	}
2044 	return 0;
2045 }
2046 
static bool
_spdk_bs_load_cur_md_page_valid(struct spdk_bs_load_ctx *ctx)
{
2049 	uint32_t crc;
2050 
2051 	crc = _spdk_blob_md_page_calc_crc(ctx->page);
2052 	if (crc != ctx->page->crc) {
2053 		return false;
2054 	}
2055 
2056 	if (_spdk_bs_page_to_blobid(ctx->cur_page) != ctx->page->id) {
2057 		return false;
2058 	}
2059 	return true;
2060 }
2061 
2062 static void
2063 _spdk_bs_load_replay_cur_md_page(spdk_bs_sequence_t *seq, void *cb_arg);
2064 
2065 static void
2066 _spdk_bs_load_write_used_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2067 {
2068 	struct spdk_bs_load_ctx	*ctx = cb_arg;
2069 
2070 	spdk_dma_free(ctx->mask);
2071 	spdk_dma_free(ctx->super);
2072 	spdk_bs_sequence_finish(seq, bserrno);
2073 	free(ctx);
2074 }
2075 
2076 static void
2077 _spdk_bs_load_write_used_blobids_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2078 {
2079 	struct spdk_bs_load_ctx	*ctx = cb_arg;
2080 
2081 	spdk_dma_free(ctx->mask);
2082 	ctx->mask = NULL;
2083 
2084 	_spdk_bs_write_used_clusters(seq, cb_arg, _spdk_bs_load_write_used_clusters_cpl);
2085 }
2086 
2087 static void
2088 _spdk_bs_load_write_used_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2089 {
2090 	struct spdk_bs_load_ctx	*ctx = cb_arg;
2091 
2092 	spdk_dma_free(ctx->mask);
2093 	ctx->mask = NULL;
2094 
2095 	_spdk_bs_write_used_blobids(seq, cb_arg, _spdk_bs_load_write_used_blobids_cpl);
2096 }
2097 
2098 static void
2099 _spdk_bs_load_write_used_md(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2100 {
2101 	_spdk_bs_write_used_md(seq, cb_arg, _spdk_bs_load_write_used_pages_cpl);
2102 }
2103 
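/*
 * Replay walks the metadata region page by page.  A page whose
 * sequence_num is 0 starts a blob's metadata; any continuation pages are
 * followed via page->next before moving on to the next unvisited page
 * index.  Once every page has been visited, the clusters backing the
 * metadata region itself are claimed and the rebuilt masks are written
 * back to disk.
 */
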
2104 static void
2105 _spdk_bs_load_replay_md_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2106 {
2107 	struct spdk_bs_load_ctx *ctx = cb_arg;
2108 	uint64_t num_md_clusters;
2109 	uint64_t i;
2110 	uint32_t page_num;
2111 
2112 	if (bserrno != 0) {
2113 		_spdk_bs_load_ctx_fail(seq, ctx, bserrno);
2114 		return;
2115 	}
2116 
2117 	page_num = ctx->cur_page;
2118 	if (_spdk_bs_load_cur_md_page_valid(ctx) == true) {
2119 		if (ctx->page->sequence_num == 0 || ctx->in_page_chain == true) {
2120 			spdk_bit_array_set(ctx->bs->used_md_pages, page_num);
2121 			if (ctx->page->sequence_num == 0) {
2122 				spdk_bit_array_set(ctx->bs->used_blobids, page_num);
2123 			}
2124 			if (_spdk_bs_load_replay_md_parse_page(ctx->page, ctx->bs)) {
2125 				_spdk_bs_load_ctx_fail(seq, ctx, -EILSEQ);
2126 				return;
2127 			}
2128 			if (ctx->page->next != SPDK_INVALID_MD_PAGE) {
2129 				ctx->in_page_chain = true;
2130 				ctx->cur_page = ctx->page->next;
2131 				_spdk_bs_load_replay_cur_md_page(seq, cb_arg);
2132 				return;
2133 			}
2134 		}
2135 	}
2136 
2137 	ctx->in_page_chain = false;
2138 
2139 	do {
2140 		ctx->page_index++;
2141 	} while (spdk_bit_array_get(ctx->bs->used_md_pages, ctx->page_index) == true);
2142 
2143 	if (ctx->page_index < ctx->super->md_len) {
2144 		ctx->cur_page = ctx->page_index;
2145 		_spdk_bs_load_replay_cur_md_page(seq, cb_arg);
2146 	} else {
2147 		/* Claim all of the clusters used by the metadata */
		num_md_clusters = divide_round_up(ctx->super->md_start + ctx->super->md_len,
						  ctx->bs->pages_per_cluster);
2149 		for (i = 0; i < num_md_clusters; i++) {
2150 			_spdk_bs_claim_cluster(ctx->bs, i);
2151 		}
2152 		spdk_dma_free(ctx->page);
2153 		_spdk_bs_load_write_used_md(seq, ctx, bserrno);
2154 	}
2155 }
2156 
2157 static void
2158 _spdk_bs_load_replay_cur_md_page(spdk_bs_sequence_t *seq, void *cb_arg)
2159 {
2160 	struct spdk_bs_load_ctx *ctx = cb_arg;
2161 	uint64_t lba;
2162 
2163 	assert(ctx->cur_page < ctx->super->md_len);
2164 	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->md_start + ctx->cur_page);
2165 	spdk_bs_sequence_read_dev(seq, ctx->page, lba,
2166 				  _spdk_bs_byte_to_lba(ctx->bs, SPDK_BS_PAGE_SIZE),
2167 				  _spdk_bs_load_replay_md_cpl, ctx);
2168 }
2169 
2170 static void
2171 _spdk_bs_load_replay_md(spdk_bs_sequence_t *seq, void *cb_arg)
2172 {
2173 	struct spdk_bs_load_ctx *ctx = cb_arg;
2174 
2175 	ctx->page_index = 0;
2176 	ctx->cur_page = 0;
2177 	ctx->page = spdk_dma_zmalloc(SPDK_BS_PAGE_SIZE,
2178 				     SPDK_BS_PAGE_SIZE,
2179 				     NULL);
2180 	if (!ctx->page) {
2181 		_spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
2182 		return;
2183 	}
2184 	_spdk_bs_load_replay_cur_md_page(seq, cb_arg);
2185 }
2186 
2187 static void
2188 _spdk_bs_recover(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2189 {
2190 	struct spdk_bs_load_ctx *ctx = cb_arg;
2191 	int 		rc;
2192 
2193 	if (bserrno != 0) {
2194 		_spdk_bs_load_ctx_fail(seq, ctx, -EIO);
2195 		return;
2196 	}
2197 
2198 	rc = spdk_bit_array_resize(&ctx->bs->used_md_pages, ctx->super->md_len);
2199 	if (rc < 0) {
2200 		_spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
2201 		return;
2202 	}
2203 
2204 	rc = spdk_bit_array_resize(&ctx->bs->used_blobids, ctx->super->md_len);
2205 	if (rc < 0) {
2206 		_spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
2207 		return;
2208 	}
2209 
2210 	rc = spdk_bit_array_resize(&ctx->bs->used_clusters, ctx->bs->total_clusters);
2211 	if (rc < 0) {
2212 		_spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
2213 		return;
2214 	}
2215 
2216 	ctx->bs->num_free_clusters = ctx->bs->total_clusters;
2217 	_spdk_bs_load_replay_md(seq, cb_arg);
2218 }
2219 
2220 static void
2221 _spdk_bs_load_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2222 {
2223 	struct spdk_bs_load_ctx *ctx = cb_arg;
2224 	uint32_t	crc;
2225 	static const char zeros[SPDK_BLOBSTORE_TYPE_LENGTH];
2226 
2227 	if (ctx->super->version > SPDK_BS_VERSION ||
2228 	    ctx->super->version < SPDK_BS_INITIAL_VERSION) {
2229 		_spdk_bs_load_ctx_fail(seq, ctx, -EILSEQ);
2230 		return;
2231 	}
2232 
2233 	if (memcmp(ctx->super->signature, SPDK_BS_SUPER_BLOCK_SIG,
2234 		   sizeof(ctx->super->signature)) != 0) {
2235 		_spdk_bs_load_ctx_fail(seq, ctx, -EILSEQ);
2236 		return;
2237 	}
2238 
2239 	crc = _spdk_blob_md_page_calc_crc(ctx->super);
2240 	if (crc != ctx->super->crc) {
2241 		_spdk_bs_load_ctx_fail(seq, ctx, -EILSEQ);
2242 		return;
2243 	}
2244 
2245 	if (memcmp(&ctx->bs->bstype, &ctx->super->bstype, SPDK_BLOBSTORE_TYPE_LENGTH) == 0) {
2246 		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Bstype matched - loading blobstore\n");
2247 	} else if (memcmp(&ctx->bs->bstype, zeros, SPDK_BLOBSTORE_TYPE_LENGTH) == 0) {
		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Bstype wildcard used - loading blobstore regardless of bstype\n");
2249 	} else {
2250 		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Unexpected bstype\n");
2251 		SPDK_TRACEDUMP(SPDK_LOG_BLOB, "Expected:", ctx->bs->bstype.bstype, SPDK_BLOBSTORE_TYPE_LENGTH);
2252 		SPDK_TRACEDUMP(SPDK_LOG_BLOB, "Found:", ctx->super->bstype.bstype, SPDK_BLOBSTORE_TYPE_LENGTH);
2253 		_spdk_bs_load_ctx_fail(seq, ctx, -ENXIO);
2254 		return;
2255 	}
2256 
2257 	/* Parse the super block */
2258 	ctx->bs->cluster_sz = ctx->super->cluster_size;
2259 	ctx->bs->total_clusters = ctx->bs->dev->blockcnt / (ctx->bs->cluster_sz / ctx->bs->dev->blocklen);
2260 	ctx->bs->pages_per_cluster = ctx->bs->cluster_sz / SPDK_BS_PAGE_SIZE;
2261 	ctx->bs->md_start = ctx->super->md_start;
2262 	ctx->bs->md_len = ctx->super->md_len;
2263 	ctx->bs->total_data_clusters = ctx->bs->total_clusters - divide_round_up(
2264 					       ctx->bs->md_start + ctx->bs->md_len, ctx->bs->pages_per_cluster);
2265 	ctx->bs->super_blob = ctx->super->super_blob;
2266 	memcpy(&ctx->bs->bstype, &ctx->super->bstype, sizeof(ctx->super->bstype));
2267 
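	/*
	 * Three cases follow: a dirty shutdown forces a full metadata replay;
	 * a clean shutdown with a pre-v3 super block (no blobid mask on disk)
	 * also replays in order to build the blobid mask; otherwise the masks
	 * are simply read back from disk.  In every case the on-disk clean
	 * flag is left at 0 while the blobstore is loaded.
	 */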
2268 	if (ctx->super->clean == 0) {
2269 		_spdk_bs_recover(seq, ctx, 0);
2270 	} else if (ctx->super->used_blobid_mask_len == 0) {
2271 		/*
2272 		 * Metadata is clean, but this is an old metadata format without
2273 		 *  a blobid mask.  Clear the clean bit and then build the masks
2274 		 *  using _spdk_bs_recover.
2275 		 */
2276 		ctx->super->clean = 0;
2277 		_spdk_bs_write_super(seq, ctx->bs, ctx->super, _spdk_bs_recover, ctx);
2278 	} else {
2279 		ctx->super->clean = 0;
2280 		_spdk_bs_write_super(seq, ctx->bs, ctx->super, _spdk_bs_load_write_super_cpl, ctx);
2281 	}
2282 }
2283 
2284 void
2285 spdk_bs_load(struct spdk_bs_dev *dev, struct spdk_bs_opts *o,
2286 	     spdk_bs_op_with_handle_complete cb_fn, void *cb_arg)
2287 {
2288 	struct spdk_blob_store	*bs;
2289 	struct spdk_bs_cpl	cpl;
2290 	spdk_bs_sequence_t	*seq;
2291 	struct spdk_bs_load_ctx *ctx;
2292 	struct spdk_bs_opts	opts = {};
2293 
2294 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Loading blobstore from dev %p\n", dev);
2295 
2296 	if (o) {
2297 		opts = *o;
2298 	} else {
2299 		spdk_bs_opts_init(&opts);
2300 	}
2301 
2302 	if (opts.max_md_ops == 0 || opts.max_channel_ops == 0) {
2303 		cb_fn(cb_arg, NULL, -EINVAL);
2304 		return;
2305 	}
2306 
2307 	bs = _spdk_bs_alloc(dev, &opts);
2308 	if (!bs) {
2309 		cb_fn(cb_arg, NULL, -ENOMEM);
2310 		return;
2311 	}
2312 
2313 	ctx = calloc(1, sizeof(*ctx));
2314 	if (!ctx) {
2315 		_spdk_bs_free(bs);
2316 		cb_fn(cb_arg, NULL, -ENOMEM);
2317 		return;
2318 	}
2319 
2320 	ctx->bs = bs;
2321 	ctx->is_load = true;
2322 
2323 	/* Allocate memory for the super block */
2324 	ctx->super = spdk_dma_zmalloc(sizeof(*ctx->super), 0x1000, NULL);
	if (!ctx->super) {
		free(ctx);
		_spdk_bs_free(bs);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}
2330 
2331 	cpl.type = SPDK_BS_CPL_TYPE_BS_HANDLE;
2332 	cpl.u.bs_handle.cb_fn = cb_fn;
2333 	cpl.u.bs_handle.cb_arg = cb_arg;
2334 	cpl.u.bs_handle.bs = bs;
2335 
2336 	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
2337 	if (!seq) {
2338 		spdk_dma_free(ctx->super);
2339 		free(ctx);
2340 		_spdk_bs_free(bs);
2341 		cb_fn(cb_arg, NULL, -ENOMEM);
2342 		return;
2343 	}
2344 
2345 	/* Read the super block */
2346 	spdk_bs_sequence_read_dev(seq, ctx->super, _spdk_bs_page_to_lba(bs, 0),
2347 				  _spdk_bs_byte_to_lba(bs, sizeof(*ctx->super)),
2348 				  _spdk_bs_load_super_cpl, ctx);
2349 }
2350 
2351 /* END spdk_bs_load */
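
/*
 * Example usage (a minimal sketch; load_done and the spdk_bs_dev are
 * assumed to be provided by the caller):
 *
 *	static void
 *	load_done(void *cb_arg, struct spdk_blob_store *bs, int bserrno)
 *	{
 *		if (bserrno == 0) {
 *			// bs is ready for spdk_bs_open_blob(), etc.
 *		}
 *	}
 *
 *	struct spdk_bs_opts opts;
 *	spdk_bs_opts_init(&opts);
 *	spdk_bs_load(dev, &opts, load_done, NULL);
 */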
2352 
2353 /* START spdk_bs_init */
2354 
2355 struct spdk_bs_init_ctx {
2356 	struct spdk_blob_store		*bs;
2357 	struct spdk_bs_super_block	*super;
2358 };
2359 
2360 static void
2361 _spdk_bs_init_persist_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2362 {
2363 	struct spdk_bs_init_ctx *ctx = cb_arg;
2364 
2365 	spdk_dma_free(ctx->super);
2366 	free(ctx);
2367 
2368 	spdk_bs_sequence_finish(seq, bserrno);
2369 }
2370 
2371 static void
2372 _spdk_bs_init_trim_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2373 {
2374 	struct spdk_bs_init_ctx *ctx = cb_arg;
2375 
2376 	/* Write super block */
2377 	spdk_bs_sequence_write_dev(seq, ctx->super, _spdk_bs_page_to_lba(ctx->bs, 0),
2378 				   _spdk_bs_byte_to_lba(ctx->bs, sizeof(*ctx->super)),
2379 				   _spdk_bs_init_persist_super_cpl, ctx);
2380 }
2381 
2382 void
2383 spdk_bs_init(struct spdk_bs_dev *dev, struct spdk_bs_opts *o,
2384 	     spdk_bs_op_with_handle_complete cb_fn, void *cb_arg)
2385 {
2386 	struct spdk_bs_init_ctx *ctx;
2387 	struct spdk_blob_store	*bs;
2388 	struct spdk_bs_cpl	cpl;
2389 	spdk_bs_sequence_t	*seq;
2390 	spdk_bs_batch_t		*batch;
2391 	uint64_t		num_md_lba;
2392 	uint64_t		num_md_pages;
2393 	uint64_t		num_md_clusters;
2394 	uint32_t		i;
2395 	struct spdk_bs_opts	opts = {};
2396 	int			rc;
2397 
2398 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Initializing blobstore on dev %p\n", dev);
2399 
2400 	if ((SPDK_BS_PAGE_SIZE % dev->blocklen) != 0) {
		SPDK_ERRLOG("unsupported dev block length of %u\n",
			    dev->blocklen);
2403 		dev->destroy(dev);
2404 		cb_fn(cb_arg, NULL, -EINVAL);
2405 		return;
2406 	}
2407 
2408 	if (o) {
2409 		opts = *o;
2410 	} else {
2411 		spdk_bs_opts_init(&opts);
2412 	}
2413 
2414 	if (_spdk_bs_opts_verify(&opts) != 0) {
2415 		dev->destroy(dev);
2416 		cb_fn(cb_arg, NULL, -EINVAL);
2417 		return;
2418 	}
2419 
2420 	bs = _spdk_bs_alloc(dev, &opts);
2421 	if (!bs) {
2422 		dev->destroy(dev);
2423 		cb_fn(cb_arg, NULL, -ENOMEM);
2424 		return;
2425 	}
2426 
2427 	if (opts.num_md_pages == SPDK_BLOB_OPTS_NUM_MD_PAGES) {
2428 		/* By default, allocate 1 page per cluster.
2429 		 * Technically, this over-allocates metadata
2430 		 * because more metadata will reduce the number
2431 		 * of usable clusters. This can be addressed with
2432 		 * more complex math in the future.
2433 		 */
2434 		bs->md_len = bs->total_clusters;
2435 	} else {
2436 		bs->md_len = opts.num_md_pages;
2437 	}
2438 
2439 	rc = spdk_bit_array_resize(&bs->used_md_pages, bs->md_len);
2440 	if (rc < 0) {
2441 		_spdk_bs_free(bs);
2442 		cb_fn(cb_arg, NULL, -ENOMEM);
2443 		return;
2444 	}
2445 
2446 	rc = spdk_bit_array_resize(&bs->used_blobids, bs->md_len);
2447 	if (rc < 0) {
2448 		_spdk_bs_free(bs);
2449 		cb_fn(cb_arg, NULL, -ENOMEM);
2450 		return;
2451 	}
2452 
2453 	ctx = calloc(1, sizeof(*ctx));
2454 	if (!ctx) {
2455 		_spdk_bs_free(bs);
2456 		cb_fn(cb_arg, NULL, -ENOMEM);
2457 		return;
2458 	}
2459 
2460 	ctx->bs = bs;
2461 
2462 	/* Allocate memory for the super block */
2463 	ctx->super = spdk_dma_zmalloc(sizeof(*ctx->super), 0x1000, NULL);
	if (!ctx->super) {
		free(ctx);
		_spdk_bs_free(bs);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}
2469 	memcpy(ctx->super->signature, SPDK_BS_SUPER_BLOCK_SIG,
2470 	       sizeof(ctx->super->signature));
2471 	ctx->super->version = SPDK_BS_VERSION;
2472 	ctx->super->length = sizeof(*ctx->super);
2473 	ctx->super->super_blob = bs->super_blob;
2474 	ctx->super->clean = 0;
2475 	ctx->super->cluster_size = bs->cluster_sz;
2476 	memcpy(&ctx->super->bstype, &bs->bstype, sizeof(bs->bstype));
2477 
2478 	/* Calculate how many pages the metadata consumes at the front
2479 	 * of the disk.
2480 	 */
2481 
2482 	/* The super block uses 1 page */
2483 	num_md_pages = 1;
2484 
2485 	/* The used_md_pages mask requires 1 bit per metadata page, rounded
2486 	 * up to the nearest page, plus a header.
2487 	 */
2488 	ctx->super->used_page_mask_start = num_md_pages;
2489 	ctx->super->used_page_mask_len = divide_round_up(sizeof(struct spdk_bs_md_mask) +
2490 					 divide_round_up(bs->md_len, 8),
2491 					 SPDK_BS_PAGE_SIZE);
2492 	num_md_pages += ctx->super->used_page_mask_len;
2493 
2494 	/* The used_clusters mask requires 1 bit per cluster, rounded
2495 	 * up to the nearest page, plus a header.
2496 	 */
2497 	ctx->super->used_cluster_mask_start = num_md_pages;
2498 	ctx->super->used_cluster_mask_len = divide_round_up(sizeof(struct spdk_bs_md_mask) +
2499 					    divide_round_up(bs->total_clusters, 8),
2500 					    SPDK_BS_PAGE_SIZE);
2501 	num_md_pages += ctx->super->used_cluster_mask_len;
2502 
2503 	/* The used_blobids mask requires 1 bit per metadata page, rounded
2504 	 * up to the nearest page, plus a header.
2505 	 */
2506 	ctx->super->used_blobid_mask_start = num_md_pages;
2507 	ctx->super->used_blobid_mask_len = divide_round_up(sizeof(struct spdk_bs_md_mask) +
2508 					   divide_round_up(bs->md_len, 8),
2509 					   SPDK_BS_PAGE_SIZE);
2510 	num_md_pages += ctx->super->used_blobid_mask_len;
2511 
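	/*
	 * Worked example (illustrative numbers only): with a 1 GiB device,
	 * 1 MiB clusters and the default md_len of one page per cluster,
	 * total_clusters = md_len = 1024, so each mask needs
	 * divide_round_up(sizeof(struct spdk_bs_md_mask) + 128, 4096) = 1
	 * page.  Pages 1-3 then hold the three masks and the metadata
	 * region starts at page 4.
	 */
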
2512 	/* The metadata region size was chosen above */
2513 	ctx->super->md_start = bs->md_start = num_md_pages;
2514 	ctx->super->md_len = bs->md_len;
2515 	num_md_pages += bs->md_len;
2516 
2517 	num_md_lba = _spdk_bs_page_to_lba(bs, num_md_pages);
2518 
2519 	ctx->super->crc = _spdk_blob_md_page_calc_crc(ctx->super);
2520 
2521 	num_md_clusters = divide_round_up(num_md_pages, bs->pages_per_cluster);
2522 	if (num_md_clusters > bs->total_clusters) {
2523 		SPDK_ERRLOG("Blobstore metadata cannot use more clusters than is available, "
2524 			    "please decrease number of pages reserved for metadata "
2525 			    "or increase cluster size.\n");
2526 		spdk_dma_free(ctx->super);
2527 		free(ctx);
2528 		_spdk_bs_free(bs);
2529 		cb_fn(cb_arg, NULL, -ENOMEM);
2530 		return;
2531 	}
2532 	/* Claim all of the clusters used by the metadata */
2533 	for (i = 0; i < num_md_clusters; i++) {
2534 		_spdk_bs_claim_cluster(bs, i);
2535 	}
2536 
2537 	bs->total_data_clusters = bs->num_free_clusters;
2538 
2539 	cpl.type = SPDK_BS_CPL_TYPE_BS_HANDLE;
2540 	cpl.u.bs_handle.cb_fn = cb_fn;
2541 	cpl.u.bs_handle.cb_arg = cb_arg;
2542 	cpl.u.bs_handle.bs = bs;
2543 
2544 	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
2545 	if (!seq) {
2546 		spdk_dma_free(ctx->super);
2547 		free(ctx);
2548 		_spdk_bs_free(bs);
2549 		cb_fn(cb_arg, NULL, -ENOMEM);
2550 		return;
2551 	}
2552 
2553 	batch = spdk_bs_sequence_to_batch(seq, _spdk_bs_init_trim_cpl, ctx);
2554 
2555 	/* Clear metadata space */
2556 	spdk_bs_batch_write_zeroes_dev(batch, 0, num_md_lba);
2557 	/* Trim data clusters */
2558 	spdk_bs_batch_unmap_dev(batch, num_md_lba, ctx->bs->dev->blockcnt - num_md_lba);
2559 
2560 	spdk_bs_batch_close(batch);
2561 }
2562 
2563 /* END spdk_bs_init */
2564 
2565 /* START spdk_bs_destroy */
2566 
2567 static void
2568 _spdk_bs_destroy_trim_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2569 {
2570 	struct spdk_bs_init_ctx *ctx = cb_arg;
2571 	struct spdk_blob_store *bs = ctx->bs;
2572 
2573 	/*
2574 	 * We need to defer calling spdk_bs_call_cpl() until after
2575 	 * dev destruction, so tuck these away for later use.
2576 	 */
2577 	bs->unload_err = bserrno;
2578 	memcpy(&bs->unload_cpl, &seq->cpl, sizeof(struct spdk_bs_cpl));
2579 	seq->cpl.type = SPDK_BS_CPL_TYPE_NONE;
2580 
2581 	spdk_bs_sequence_finish(seq, bserrno);
2582 
2583 	_spdk_bs_free(bs);
2584 	free(ctx);
2585 }
2586 
2587 void
2588 spdk_bs_destroy(struct spdk_blob_store *bs, spdk_bs_op_complete cb_fn,
2589 		void *cb_arg)
2590 {
2591 	struct spdk_bs_cpl	cpl;
2592 	spdk_bs_sequence_t	*seq;
2593 	struct spdk_bs_init_ctx *ctx;
2594 
2595 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Destroying blobstore\n");
2596 
2597 	if (!TAILQ_EMPTY(&bs->blobs)) {
2598 		SPDK_ERRLOG("Blobstore still has open blobs\n");
2599 		cb_fn(cb_arg, -EBUSY);
2600 		return;
2601 	}
2602 
2603 	cpl.type = SPDK_BS_CPL_TYPE_BS_BASIC;
2604 	cpl.u.bs_basic.cb_fn = cb_fn;
2605 	cpl.u.bs_basic.cb_arg = cb_arg;
2606 
2607 	ctx = calloc(1, sizeof(*ctx));
2608 	if (!ctx) {
2609 		cb_fn(cb_arg, -ENOMEM);
2610 		return;
2611 	}
2612 
2613 	ctx->bs = bs;
2614 
2615 	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
2616 	if (!seq) {
2617 		free(ctx);
2618 		cb_fn(cb_arg, -ENOMEM);
2619 		return;
2620 	}
2621 
2622 	/* Write zeroes to the super block */
2623 	spdk_bs_sequence_write_zeroes_dev(seq,
2624 					  _spdk_bs_page_to_lba(bs, 0),
2625 					  _spdk_bs_byte_to_lba(bs, sizeof(struct spdk_bs_super_block)),
2626 					  _spdk_bs_destroy_trim_cpl, ctx);
2627 }
2628 
2629 /* END spdk_bs_destroy */
2630 
2631 /* START spdk_bs_unload */
2632 
2633 static void
2634 _spdk_bs_unload_write_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2635 {
2636 	struct spdk_bs_load_ctx	*ctx = cb_arg;
2637 
2638 	spdk_dma_free(ctx->super);
2639 
2640 	/*
2641 	 * We need to defer calling spdk_bs_call_cpl() until after
2642 	 * dev destuction, so tuck these away for later use.
2643 	 */
2644 	ctx->bs->unload_err = bserrno;
2645 	memcpy(&ctx->bs->unload_cpl, &seq->cpl, sizeof(struct spdk_bs_cpl));
2646 	seq->cpl.type = SPDK_BS_CPL_TYPE_NONE;
2647 
2648 	spdk_bs_sequence_finish(seq, bserrno);
2649 
2650 	_spdk_bs_free(ctx->bs);
2651 	free(ctx);
2652 }
2653 
2654 static void
2655 _spdk_bs_unload_write_used_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2656 {
2657 	struct spdk_bs_load_ctx	*ctx = cb_arg;
2658 
2659 	spdk_dma_free(ctx->mask);
2660 	ctx->super->clean = 1;
2661 
2662 	_spdk_bs_write_super(seq, ctx->bs, ctx->super, _spdk_bs_unload_write_super_cpl, ctx);
2663 }
2664 
2665 static void
2666 _spdk_bs_unload_write_used_blobids_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2667 {
2668 	struct spdk_bs_load_ctx	*ctx = cb_arg;
2669 
2670 	spdk_dma_free(ctx->mask);
2671 	ctx->mask = NULL;
2672 
2673 	_spdk_bs_write_used_clusters(seq, cb_arg, _spdk_bs_unload_write_used_clusters_cpl);
2674 }
2675 
2676 static void
2677 _spdk_bs_unload_write_used_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2678 {
2679 	struct spdk_bs_load_ctx	*ctx = cb_arg;
2680 
2681 	spdk_dma_free(ctx->mask);
2682 	ctx->mask = NULL;
2683 
2684 	_spdk_bs_write_used_blobids(seq, cb_arg, _spdk_bs_unload_write_used_blobids_cpl);
2685 }
2686 
2687 static void
2688 _spdk_bs_unload_read_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2689 {
2690 	_spdk_bs_write_used_md(seq, cb_arg, _spdk_bs_unload_write_used_pages_cpl);
2691 }
2692 
2693 void
2694 spdk_bs_unload(struct spdk_blob_store *bs, spdk_bs_op_complete cb_fn, void *cb_arg)
2695 {
2696 	struct spdk_bs_cpl	cpl;
2697 	spdk_bs_sequence_t	*seq;
2698 	struct spdk_bs_load_ctx *ctx;
2699 
2700 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Syncing blobstore\n");
2701 
2702 	if (!TAILQ_EMPTY(&bs->blobs)) {
2703 		SPDK_ERRLOG("Blobstore still has open blobs\n");
2704 		cb_fn(cb_arg, -EBUSY);
2705 		return;
2706 	}
2707 
2708 	ctx = calloc(1, sizeof(*ctx));
2709 	if (!ctx) {
2710 		cb_fn(cb_arg, -ENOMEM);
2711 		return;
2712 	}
2713 
2714 	ctx->bs = bs;
2715 	ctx->is_load = false;
2716 
2717 	ctx->super = spdk_dma_zmalloc(sizeof(*ctx->super), 0x1000, NULL);
2718 	if (!ctx->super) {
2719 		free(ctx);
2720 		cb_fn(cb_arg, -ENOMEM);
2721 		return;
2722 	}
2723 
2724 	cpl.type = SPDK_BS_CPL_TYPE_BS_BASIC;
2725 	cpl.u.bs_basic.cb_fn = cb_fn;
2726 	cpl.u.bs_basic.cb_arg = cb_arg;
2727 
2728 	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
2729 	if (!seq) {
2730 		spdk_dma_free(ctx->super);
2731 		free(ctx);
2732 		cb_fn(cb_arg, -ENOMEM);
2733 		return;
2734 	}
2735 
2736 	/* Read super block */
2737 	spdk_bs_sequence_read_dev(seq, ctx->super, _spdk_bs_page_to_lba(bs, 0),
2738 				  _spdk_bs_byte_to_lba(bs, sizeof(*ctx->super)),
2739 				  _spdk_bs_unload_read_super_cpl, ctx);
2740 }
2741 
2742 /* END spdk_bs_unload */
2743 
2744 void
2745 spdk_bs_set_super(struct spdk_blob_store *bs, spdk_blob_id blobid,
2746 		  spdk_bs_op_complete cb_fn, void *cb_arg)
2747 {
2748 	bs->super_blob = blobid;
2749 	cb_fn(cb_arg, 0);
2750 }
2751 
2752 void
2753 spdk_bs_get_super(struct spdk_blob_store *bs,
2754 		  spdk_blob_op_with_id_complete cb_fn, void *cb_arg)
2755 {
2756 	if (bs->super_blob == SPDK_BLOBID_INVALID) {
2757 		cb_fn(cb_arg, SPDK_BLOBID_INVALID, -ENOENT);
2758 	} else {
2759 		cb_fn(cb_arg, bs->super_blob, 0);
2760 	}
2761 }
2762 
2763 uint64_t
2764 spdk_bs_get_cluster_size(struct spdk_blob_store *bs)
2765 {
2766 	return bs->cluster_sz;
2767 }
2768 
2769 uint64_t
2770 spdk_bs_get_page_size(struct spdk_blob_store *bs)
2771 {
2772 	return SPDK_BS_PAGE_SIZE;
2773 }
2774 
2775 uint64_t
2776 spdk_bs_free_cluster_count(struct spdk_blob_store *bs)
2777 {
2778 	return bs->num_free_clusters;
2779 }
2780 
2781 uint64_t
2782 spdk_bs_total_data_cluster_count(struct spdk_blob_store *bs)
2783 {
2784 	return bs->total_data_clusters;
2785 }
2786 
2787 static int
2788 spdk_bs_register_md_thread(struct spdk_blob_store *bs)
2789 {
2790 	bs->md_channel = spdk_get_io_channel(bs);
2791 	if (!bs->md_channel) {
2792 		SPDK_ERRLOG("Failed to get IO channel.\n");
2793 		return -1;
2794 	}
2795 
2796 	return 0;
2797 }
2798 
2799 static int
2800 spdk_bs_unregister_md_thread(struct spdk_blob_store *bs)
2801 {
2802 	spdk_put_io_channel(bs->md_channel);
2803 
2804 	return 0;
2805 }
2806 
2807 spdk_blob_id spdk_blob_get_id(struct spdk_blob *_blob)
2808 {
2809 	struct spdk_blob_data *blob = __blob_to_data(_blob);
2810 
2811 	assert(blob != NULL);
2812 
2813 	return blob->id;
2814 }
2815 
2816 uint64_t spdk_blob_get_num_pages(struct spdk_blob *_blob)
2817 {
2818 	struct spdk_blob_data *blob = __blob_to_data(_blob);
2819 
2820 	assert(blob != NULL);
2821 
2822 	return _spdk_bs_cluster_to_page(blob->bs, blob->active.num_clusters);
2823 }
2824 
2825 uint64_t spdk_blob_get_num_clusters(struct spdk_blob *_blob)
2826 {
2827 	struct spdk_blob_data *blob = __blob_to_data(_blob);
2828 
2829 	assert(blob != NULL);
2830 
2831 	return blob->active.num_clusters;
2832 }
2833 
2834 /* START spdk_bs_create_blob */
2835 
2836 static void
2837 _spdk_bs_create_blob_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2838 {
2839 	struct spdk_blob_data *blob = cb_arg;
2840 
2841 	_spdk_blob_free(blob);
2842 
2843 	spdk_bs_sequence_finish(seq, bserrno);
2844 }
2845 
2846 static int
2847 _spdk_blob_set_xattrs(struct spdk_blob	*blob, const struct spdk_blob_opts *opts)
2848 {
2849 	uint64_t i;
2850 	size_t value_len = 0;
2851 	int rc;
2852 	const void *value = NULL;
2853 	if (opts->xattr_count > 0 && opts->get_xattr_value == NULL) {
2854 		return -EINVAL;
2855 	}
2856 	for (i = 0; i < opts->xattr_count; i++) {
2857 		opts->get_xattr_value(opts->xattr_ctx, opts->xattr_names[i], &value, &value_len);
2858 		if (value == NULL || value_len == 0) {
2859 			return -EINVAL;
2860 		}
2861 		rc = spdk_blob_set_xattr(blob, opts->xattr_names[i], value, value_len);
2862 		if (rc < 0) {
2863 			return rc;
2864 		}
2865 	}
2866 	return 0;
2867 }
2868 
2869 static void
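/*
 * Example get_xattr_value callback for spdk_blob_opts (a sketch; treating
 * xattr_ctx as a NUL-terminated string is an assumption made purely for
 * illustration):
 *
 *	static void
 *	get_xattr_value(void *xattr_ctx, const char *name,
 *			const void **value, size_t *value_len)
 *	{
 *		*value = xattr_ctx;
 *		*value_len = strlen(xattr_ctx) + 1;
 *	}
 */
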
2870 _spdk_blob_set_thin_provision(struct spdk_blob_data *blob)
2871 {
2872 	blob->invalid_flags |= SPDK_BLOB_THIN_PROV;
2873 	blob->state = SPDK_BLOB_STATE_DIRTY;
2874 }
2875 
2876 void spdk_bs_create_blob_ext(struct spdk_blob_store *bs, const struct spdk_blob_opts *opts,
2877 			     spdk_blob_op_with_id_complete cb_fn, void *cb_arg)
2878 {
2879 	struct spdk_blob_data	*blob;
2880 	uint32_t		page_idx;
2881 	struct spdk_bs_cpl 	cpl;
2882 	struct spdk_blob_opts	opts_default;
2883 	spdk_bs_sequence_t	*seq;
2884 	spdk_blob_id		id;
2885 	int rc;
2886 
2887 	page_idx = spdk_bit_array_find_first_clear(bs->used_md_pages, 0);
2888 	if (page_idx >= spdk_bit_array_capacity(bs->used_md_pages)) {
2889 		cb_fn(cb_arg, 0, -ENOMEM);
2890 		return;
2891 	}
2892 	spdk_bit_array_set(bs->used_blobids, page_idx);
2893 	spdk_bit_array_set(bs->used_md_pages, page_idx);
2894 
2895 	id = _spdk_bs_page_to_blobid(page_idx);
2896 
2897 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Creating blob with id %lu at page %u\n", id, page_idx);
2898 
2899 	blob = _spdk_blob_alloc(bs, id);
2900 	if (!blob) {
2901 		cb_fn(cb_arg, 0, -ENOMEM);
2902 		return;
2903 	}
2904 
2905 	if (!opts) {
2906 		spdk_blob_opts_init(&opts_default);
2907 		opts = &opts_default;
2908 	}
2909 	rc = _spdk_blob_set_xattrs(__data_to_blob(blob), opts);
2910 	if (rc < 0) {
2911 		_spdk_blob_free(blob);
2912 		cb_fn(cb_arg, 0, rc);
2913 		return;
2914 	}
2915 	if (opts->thin_provision) {
2916 		_spdk_blob_set_thin_provision(blob);
2917 	}
2918 
2919 	rc = spdk_blob_resize(__data_to_blob(blob), opts->num_clusters);
2920 	if (rc < 0) {
2921 		_spdk_blob_free(blob);
2922 		cb_fn(cb_arg, 0, rc);
2923 		return;
2924 	}
2925 	cpl.type = SPDK_BS_CPL_TYPE_BLOBID;
2926 	cpl.u.blobid.cb_fn = cb_fn;
2927 	cpl.u.blobid.cb_arg = cb_arg;
2928 	cpl.u.blobid.blobid = blob->id;
2929 
2930 	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
2931 	if (!seq) {
2932 		_spdk_blob_free(blob);
2933 		cb_fn(cb_arg, 0, -ENOMEM);
2934 		return;
2935 	}
2936 
2937 	_spdk_blob_persist(seq, blob, _spdk_bs_create_blob_cpl, blob);
2938 }
2939 
2940 void spdk_bs_create_blob(struct spdk_blob_store *bs,
2941 			 spdk_blob_op_with_id_complete cb_fn, void *cb_arg)
2942 {
2943 	spdk_bs_create_blob_ext(bs, NULL, cb_fn, cb_arg);
2944 }
2945 
2946 /* END spdk_bs_create_blob */
2947 
2948 /* START spdk_blob_resize */
2949 int
2950 spdk_blob_resize(struct spdk_blob *_blob, uint64_t sz)
2951 {
2952 	struct spdk_blob_data	*blob = __blob_to_data(_blob);
2953 	int			rc;
2954 
2955 	assert(blob != NULL);
2956 	assert(spdk_get_thread() == blob->bs->md_thread);
2957 
2958 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Resizing blob %lu to %lu clusters\n", blob->id, sz);
2959 
2960 	if (blob->md_ro) {
2961 		return -EPERM;
2962 	}
2963 
2964 	if (sz == blob->active.num_clusters) {
2965 		return 0;
2966 	}
2967 
2968 	rc = _spdk_resize_blob(blob, sz);
2969 	if (rc < 0) {
2970 		return rc;
2971 	}
2972 
2973 	return 0;
2974 }
2975 
2976 /* END spdk_blob_resize */
2977 
2978 
2979 /* START spdk_bs_delete_blob */
2980 
2981 static void
2982 _spdk_bs_delete_close_cpl(void *cb_arg, int bserrno)
2983 {
2984 	spdk_bs_sequence_t *seq = cb_arg;
2985 
2986 	spdk_bs_sequence_finish(seq, bserrno);
2987 }
2988 
2989 static void
2990 _spdk_bs_delete_persist_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2991 {
2992 	struct spdk_blob *_blob = cb_arg;
2993 	struct spdk_blob_data *blob = __blob_to_data(_blob);
2994 
2995 	if (bserrno != 0) {
2996 		/*
2997 		 * We already removed this blob from the blobstore tailq, so
2998 		 *  we need to free it here since this is the last reference
2999 		 *  to it.
3000 		 */
3001 		_spdk_blob_free(blob);
3002 		_spdk_bs_delete_close_cpl(seq, bserrno);
3003 		return;
3004 	}
3005 
3006 	/*
3007 	 * This will immediately decrement the ref_count and call
3008 	 *  the completion routine since the metadata state is clean.
3009 	 *  By calling spdk_blob_close, we reduce the number of call
3010 	 *  points into code that touches the blob->open_ref count
3011 	 *  and the blobstore's blob list.
3012 	 */
3013 	spdk_blob_close(_blob, _spdk_bs_delete_close_cpl, seq);
3014 }
3015 
3016 static void
3017 _spdk_bs_delete_open_cpl(void *cb_arg, struct spdk_blob *_blob, int bserrno)
3018 {
3019 	spdk_bs_sequence_t *seq = cb_arg;
3020 	struct spdk_blob_data *blob = __blob_to_data(_blob);
3021 	uint32_t page_num;
3022 
3023 	if (bserrno != 0) {
3024 		spdk_bs_sequence_finish(seq, bserrno);
3025 		return;
3026 	}
3027 
3028 	if (blob->open_ref > 1) {
3029 		/*
3030 		 * Someone has this blob open (besides this delete context).
3031 		 *  Decrement the ref count directly and return -EBUSY.
3032 		 */
3033 		blob->open_ref--;
3034 		spdk_bs_sequence_finish(seq, -EBUSY);
3035 		return;
3036 	}
3037 
3038 	/*
3039 	 * Remove the blob from the blob_store list now, to ensure it does not
3040 	 *  get returned after this point by _spdk_blob_lookup().
3041 	 */
3042 	TAILQ_REMOVE(&blob->bs->blobs, blob, link);
3043 	page_num = _spdk_bs_blobid_to_page(blob->id);
3044 	spdk_bit_array_clear(blob->bs->used_blobids, page_num);
3045 	blob->state = SPDK_BLOB_STATE_DIRTY;
3046 	blob->active.num_pages = 0;
3047 	_spdk_resize_blob(blob, 0);
3048 
3049 	_spdk_blob_persist(seq, blob, _spdk_bs_delete_persist_cpl, _blob);
3050 }
3051 
3052 void
3053 spdk_bs_delete_blob(struct spdk_blob_store *bs, spdk_blob_id blobid,
3054 		    spdk_blob_op_complete cb_fn, void *cb_arg)
3055 {
3056 	struct spdk_bs_cpl	cpl;
3057 	spdk_bs_sequence_t 	*seq;
3058 
3059 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Deleting blob %lu\n", blobid);
3060 
3061 	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
3062 	cpl.u.blob_basic.cb_fn = cb_fn;
3063 	cpl.u.blob_basic.cb_arg = cb_arg;
3064 
3065 	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
3066 	if (!seq) {
3067 		cb_fn(cb_arg, -ENOMEM);
3068 		return;
3069 	}
3070 
3071 	spdk_bs_open_blob(bs, blobid, _spdk_bs_delete_open_cpl, seq);
3072 }
3073 
3074 /* END spdk_bs_delete_blob */
3075 
3076 /* START spdk_bs_open_blob */
3077 
3078 static void
3079 _spdk_bs_open_blob_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
3080 {
3081 	struct spdk_blob_data *blob = cb_arg;
3082 
	/* If the blob has a CRC error, we just return NULL. */
3084 	if (blob == NULL) {
3085 		seq->cpl.u.blob_handle.blob = NULL;
3086 		spdk_bs_sequence_finish(seq, bserrno);
3087 		return;
3088 	}
3089 
3090 	blob->open_ref++;
3091 
3092 	TAILQ_INSERT_HEAD(&blob->bs->blobs, blob, link);
3093 
3094 	spdk_bs_sequence_finish(seq, bserrno);
3095 }
3096 
3097 void spdk_bs_open_blob(struct spdk_blob_store *bs, spdk_blob_id blobid,
3098 		       spdk_blob_op_with_handle_complete cb_fn, void *cb_arg)
3099 {
3100 	struct spdk_blob_data		*blob;
3101 	struct spdk_bs_cpl		cpl;
3102 	spdk_bs_sequence_t		*seq;
3103 	uint32_t			page_num;
3104 
3105 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Opening blob %lu\n", blobid);
3106 
3107 	page_num = _spdk_bs_blobid_to_page(blobid);
3108 	if (spdk_bit_array_get(bs->used_blobids, page_num) == false) {
3109 		/* Invalid blobid */
3110 		cb_fn(cb_arg, NULL, -ENOENT);
3111 		return;
3112 	}
3113 
3114 	blob = _spdk_blob_lookup(bs, blobid);
3115 	if (blob) {
3116 		blob->open_ref++;
3117 		cb_fn(cb_arg, __data_to_blob(blob), 0);
3118 		return;
3119 	}
3120 
3121 	blob = _spdk_blob_alloc(bs, blobid);
3122 	if (!blob) {
3123 		cb_fn(cb_arg, NULL, -ENOMEM);
3124 		return;
3125 	}
3126 
3127 	cpl.type = SPDK_BS_CPL_TYPE_BLOB_HANDLE;
3128 	cpl.u.blob_handle.cb_fn = cb_fn;
3129 	cpl.u.blob_handle.cb_arg = cb_arg;
3130 	cpl.u.blob_handle.blob = __data_to_blob(blob);
3131 
3132 	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
3133 	if (!seq) {
3134 		_spdk_blob_free(blob);
3135 		cb_fn(cb_arg, NULL, -ENOMEM);
3136 		return;
3137 	}
3138 
3139 	_spdk_blob_load(seq, blob, _spdk_bs_open_blob_cpl, blob);
3140 }
3141 /* END spdk_bs_open_blob */
3142 
3143 /* START spdk_blob_set_read_only */
3144 void spdk_blob_set_read_only(struct spdk_blob *b)
3145 {
3146 	struct spdk_blob_data *blob = __blob_to_data(b);
3147 
3148 	assert(spdk_get_thread() == blob->bs->md_thread);
3149 
3150 	blob->data_ro_flags |= SPDK_BLOB_READ_ONLY;
3151 
3152 	blob->state = SPDK_BLOB_STATE_DIRTY;
3153 }
3154 /* END spdk_blob_set_read_only */
3155 
3156 /* START spdk_blob_sync_md */
3157 
3158 static void
3159 _spdk_blob_sync_md_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
3160 {
3161 	struct spdk_blob_data *blob = __blob_to_data(cb_arg);
3162 
3163 	if (bserrno == 0 && (blob->data_ro_flags & SPDK_BLOB_READ_ONLY)) {
3164 		blob->data_ro = true;
3165 		blob->md_ro = true;
3166 	}
3167 
3168 	spdk_bs_sequence_finish(seq, bserrno);
3169 }
3170 
3171 static void
3172 _spdk_blob_sync_md(struct spdk_blob_data *blob, spdk_blob_op_complete cb_fn, void *cb_arg)
3173 {
3174 	struct spdk_bs_cpl	cpl;
3175 	spdk_bs_sequence_t	*seq;
3176 
3177 	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
3178 	cpl.u.blob_basic.cb_fn = cb_fn;
3179 	cpl.u.blob_basic.cb_arg = cb_arg;
3180 
3181 	seq = spdk_bs_sequence_start(blob->bs->md_channel, &cpl);
3182 	if (!seq) {
3183 		cb_fn(cb_arg, -ENOMEM);
3184 		return;
3185 	}
3186 
3187 	_spdk_blob_persist(seq, blob, _spdk_blob_sync_md_cpl, blob);
3188 }
3189 
3190 void
3191 spdk_blob_sync_md(struct spdk_blob *_blob, spdk_blob_op_complete cb_fn, void *cb_arg)
3192 {
3193 	struct spdk_blob_data	*blob = __blob_to_data(_blob);
3194 
3195 	assert(blob != NULL);
3196 	assert(spdk_get_thread() == blob->bs->md_thread);
3197 
3198 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Syncing blob %lu\n", blob->id);
3199 
3200 	assert(blob->state != SPDK_BLOB_STATE_LOADING &&
3201 	       blob->state != SPDK_BLOB_STATE_SYNCING);
3202 
3203 	if (blob->md_ro) {
3204 		assert(blob->state == SPDK_BLOB_STATE_CLEAN);
3205 		cb_fn(cb_arg, 0);
3206 		return;
3207 	}
3208 
3209 	if (blob->state == SPDK_BLOB_STATE_CLEAN) {
3210 		cb_fn(cb_arg, 0);
3211 		return;
3212 	}
3213 
3214 	_spdk_blob_sync_md(blob, cb_fn, cb_arg);
3215 }
3216 
3217 /* END spdk_blob_sync_md */
3218 
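/*
 * Cluster insertion must happen on the metadata thread, but thin-provisioned
 * writes allocate clusters from arbitrary I/O channels.  The context below
 * carries the request to the md thread, where the cluster is inserted and the
 * blob metadata synced, and then carries the result back to the originating
 * thread via spdk_thread_send_msg().
 */
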
3219 struct spdk_blob_insert_cluster_ctx {
3220 	struct spdk_thread	*thread;
3221 	struct spdk_blob_data	*blob;
3222 	uint32_t		cluster_num;	/* cluster index in blob */
3223 	uint32_t		cluster;	/* cluster on disk */
3224 	int			rc;
3225 	spdk_blob_op_complete	cb_fn;
3226 	void			*cb_arg;
3227 };
3228 
3229 static void
3230 _spdk_blob_insert_cluster_msg_cpl(void *arg)
3231 {
3232 	struct spdk_blob_insert_cluster_ctx *ctx = arg;
3233 
3234 	ctx->cb_fn(ctx->cb_arg, ctx->rc);
3235 	free(ctx);
3236 }
3237 
3238 static void
3239 _spdk_blob_insert_cluster_msg_cb(void *arg, int bserrno)
3240 {
3241 	struct spdk_blob_insert_cluster_ctx *ctx = arg;
3242 
3243 	ctx->rc = bserrno;
3244 	spdk_thread_send_msg(ctx->thread, _spdk_blob_insert_cluster_msg_cpl, ctx);
3245 }
3246 
3247 static void
3248 _spdk_blob_insert_cluster_msg(void *arg)
3249 {
3250 	struct spdk_blob_insert_cluster_ctx *ctx = arg;
3251 
3252 	ctx->rc = _spdk_blob_insert_cluster(ctx->blob, ctx->cluster_num, ctx->cluster);
3253 	if (ctx->rc != 0) {
3254 		spdk_thread_send_msg(ctx->thread, _spdk_blob_insert_cluster_msg_cpl, ctx);
3255 		return;
3256 	}
3257 
3258 	ctx->blob->state = SPDK_BLOB_STATE_DIRTY;
3259 	_spdk_blob_sync_md(ctx->blob, _spdk_blob_insert_cluster_msg_cb, ctx);
3260 }
3261 
3262 void
3263 _spdk_blob_insert_cluster_on_md_thread(struct spdk_blob_data *blob, uint32_t cluster_num,
3264 				       uint64_t cluster, spdk_blob_op_complete cb_fn, void *cb_arg)
3265 {
3266 	struct spdk_blob_insert_cluster_ctx *ctx;
3267 
3268 	ctx = calloc(1, sizeof(*ctx));
3269 	if (ctx == NULL) {
3270 		cb_fn(cb_arg, -ENOMEM);
3271 		return;
3272 	}
3273 
3274 	ctx->thread = spdk_get_thread();
3275 	ctx->blob = blob;
3276 	ctx->cluster_num = cluster_num;
3277 	ctx->cluster = cluster;
3278 	ctx->cb_fn = cb_fn;
3279 	ctx->cb_arg = cb_arg;
3280 
3281 	spdk_thread_send_msg(blob->bs->md_thread, _spdk_blob_insert_cluster_msg, ctx);
3282 }
3283 
3284 /* START spdk_blob_close */
3285 
3286 static void
3287 _spdk_blob_close_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
3288 {
3289 	struct spdk_blob_data *blob = cb_arg;
3290 
3291 	if (bserrno == 0) {
3292 		blob->open_ref--;
3293 		if (blob->open_ref == 0) {
3294 			/*
3295 			 * Blobs with active.num_pages == 0 are deleted blobs.
			 *  These blobs are removed from the blob_store list
3297 			 *  when the deletion process starts - so don't try to
3298 			 *  remove them again.
3299 			 */
3300 			if (blob->active.num_pages > 0) {
3301 				TAILQ_REMOVE(&blob->bs->blobs, blob, link);
3302 			}
3303 			_spdk_blob_free(blob);
3304 		}
3305 	}
3306 
3307 	spdk_bs_sequence_finish(seq, bserrno);
3308 }
3309 
3310 void spdk_blob_close(struct spdk_blob *b, spdk_blob_op_complete cb_fn, void *cb_arg)
3311 {
3312 	struct spdk_bs_cpl	cpl;
3313 	struct spdk_blob_data	*blob;
3314 	spdk_bs_sequence_t	*seq;
3315 
3316 	assert(b != NULL);
3317 	blob = __blob_to_data(b);
3318 	assert(blob != NULL);
3319 	assert(spdk_get_thread() == blob->bs->md_thread);
3320 
3321 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Closing blob %lu\n", blob->id);
3322 
3323 	assert(blob->state != SPDK_BLOB_STATE_LOADING &&
3324 	       blob->state != SPDK_BLOB_STATE_SYNCING);
3325 
3326 	if (blob->open_ref == 0) {
3327 		cb_fn(cb_arg, -EBADF);
3328 		return;
3329 	}
3330 
3331 	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
3332 	cpl.u.blob_basic.cb_fn = cb_fn;
3333 	cpl.u.blob_basic.cb_arg = cb_arg;
3334 
3335 	seq = spdk_bs_sequence_start(blob->bs->md_channel, &cpl);
3336 	if (!seq) {
3337 		cb_fn(cb_arg, -ENOMEM);
3338 		return;
3339 	}
3340 
3341 	if (blob->state == SPDK_BLOB_STATE_CLEAN) {
3342 		_spdk_blob_close_cpl(seq, blob, 0);
3343 		return;
3344 	}
3345 
3346 	/* Sync metadata */
3347 	_spdk_blob_persist(seq, blob, _spdk_blob_close_cpl, blob);
3348 }
3349 
3350 /* END spdk_blob_close */
3351 
3352 struct spdk_io_channel *spdk_bs_alloc_io_channel(struct spdk_blob_store *bs)
3353 {
3354 	return spdk_get_io_channel(bs);
3355 }
3356 
3357 void spdk_bs_free_io_channel(struct spdk_io_channel *channel)
3358 {
3359 	spdk_put_io_channel(channel);
3360 }
3361 
3362 void spdk_bs_io_unmap_blob(struct spdk_blob *blob, struct spdk_io_channel *channel,
3363 			   uint64_t offset, uint64_t length, spdk_blob_op_complete cb_fn, void *cb_arg)
3364 {
3365 	_spdk_blob_request_submit_op(blob, channel, NULL, offset, length, cb_fn, cb_arg,
3366 				     SPDK_BLOB_UNMAP);
3367 }
3368 
3369 void spdk_bs_io_write_zeroes_blob(struct spdk_blob *blob, struct spdk_io_channel *channel,
3370 				  uint64_t offset, uint64_t length, spdk_blob_op_complete cb_fn, void *cb_arg)
3371 {
3372 	_spdk_blob_request_submit_op(blob, channel, NULL, offset, length, cb_fn, cb_arg,
3373 				     SPDK_BLOB_WRITE_ZEROES);
3374 }
3375 
3376 void spdk_bs_io_write_blob(struct spdk_blob *blob, struct spdk_io_channel *channel,
3377 			   void *payload, uint64_t offset, uint64_t length,
3378 			   spdk_blob_op_complete cb_fn, void *cb_arg)
3379 {
3380 	_spdk_blob_request_submit_op(blob, channel, payload, offset, length, cb_fn, cb_arg,
3381 				     SPDK_BLOB_WRITE);
3382 }
3383 
3384 void spdk_bs_io_read_blob(struct spdk_blob *blob, struct spdk_io_channel *channel,
3385 			  void *payload, uint64_t offset, uint64_t length,
3386 			  spdk_blob_op_complete cb_fn, void *cb_arg)
3387 {
3388 	_spdk_blob_request_submit_op(blob, channel, payload, offset, length, cb_fn, cb_arg,
3389 				     SPDK_BLOB_READ);
3390 }
3391 
3392 void spdk_bs_io_writev_blob(struct spdk_blob *blob, struct spdk_io_channel *channel,
3393 			    struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
3394 			    spdk_blob_op_complete cb_fn, void *cb_arg)
3395 {
3396 	_spdk_blob_request_submit_rw_iov(blob, channel, iov, iovcnt, offset, length, cb_fn, cb_arg, false);
3397 }
3398 
3399 void spdk_bs_io_readv_blob(struct spdk_blob *blob, struct spdk_io_channel *channel,
3400 			   struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
3401 			   spdk_blob_op_complete cb_fn, void *cb_arg)
3402 {
3403 	_spdk_blob_request_submit_rw_iov(blob, channel, iov, iovcnt, offset, length, cb_fn, cb_arg, true);
3404 }
3405 
3406 struct spdk_bs_iter_ctx {
3407 	int64_t page_num;
3408 	struct spdk_blob_store *bs;
3409 
3410 	spdk_blob_op_with_handle_complete cb_fn;
3411 	void *cb_arg;
3412 };
3413 
3414 static void
3415 _spdk_bs_iter_cpl(void *cb_arg, struct spdk_blob *_blob, int bserrno)
3416 {
3417 	struct spdk_bs_iter_ctx *ctx = cb_arg;
3418 	struct spdk_blob_store *bs = ctx->bs;
3419 	spdk_blob_id id;
3420 
3421 	if (bserrno == 0) {
3422 		ctx->cb_fn(ctx->cb_arg, _blob, bserrno);
3423 		free(ctx);
3424 		return;
3425 	}
3426 
3427 	ctx->page_num++;
3428 	ctx->page_num = spdk_bit_array_find_first_set(bs->used_blobids, ctx->page_num);
3429 	if (ctx->page_num >= spdk_bit_array_capacity(bs->used_blobids)) {
3430 		ctx->cb_fn(ctx->cb_arg, NULL, -ENOENT);
3431 		free(ctx);
3432 		return;
3433 	}
3434 
3435 	id = _spdk_bs_page_to_blobid(ctx->page_num);
3436 
3437 	spdk_bs_open_blob(bs, id, _spdk_bs_iter_cpl, ctx);
3438 }
3439 
3440 void
3441 spdk_bs_iter_first(struct spdk_blob_store *bs,
3442 		   spdk_blob_op_with_handle_complete cb_fn, void *cb_arg)
3443 {
3444 	struct spdk_bs_iter_ctx *ctx;
3445 
3446 	ctx = calloc(1, sizeof(*ctx));
3447 	if (!ctx) {
3448 		cb_fn(cb_arg, NULL, -ENOMEM);
3449 		return;
3450 	}
3451 
3452 	ctx->page_num = -1;
3453 	ctx->bs = bs;
3454 	ctx->cb_fn = cb_fn;
3455 	ctx->cb_arg = cb_arg;
3456 
3457 	_spdk_bs_iter_cpl(ctx, NULL, -1);
3458 }
3459 
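/*
 * Example iteration pattern (a sketch): the callback receives each blob in
 * turn and asks for the next one until -ENOENT signals the end.  Note that
 * spdk_bs_iter_next() closes the blob it is handed.
 *
 *	static void
 *	iter_cb(void *cb_arg, struct spdk_blob *blob, int bserrno)
 *	{
 *		struct spdk_blob_store *bs = cb_arg;
 *
 *		if (bserrno != 0) {
 *			return;	// -ENOENT: no more blobs
 *		}
 *		// ... inspect blob ...
 *		spdk_bs_iter_next(bs, blob, iter_cb, bs);
 *	}
 *
 *	spdk_bs_iter_first(bs, iter_cb, bs);
 */
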
3460 static void
3461 _spdk_bs_iter_close_cpl(void *cb_arg, int bserrno)
3462 {
3463 	struct spdk_bs_iter_ctx *ctx = cb_arg;
3464 
3465 	_spdk_bs_iter_cpl(ctx, NULL, -1);
3466 }
3467 
3468 void
3469 spdk_bs_iter_next(struct spdk_blob_store *bs, struct spdk_blob *b,
3470 		  spdk_blob_op_with_handle_complete cb_fn, void *cb_arg)
3471 {
3472 	struct spdk_bs_iter_ctx *ctx;
3473 	struct spdk_blob_data	*blob;
3474 
3475 	assert(b != NULL);
3476 	blob = __blob_to_data(b);
3477 	assert(blob != NULL);
3478 
3479 	ctx = calloc(1, sizeof(*ctx));
3480 	if (!ctx) {
3481 		cb_fn(cb_arg, NULL, -ENOMEM);
3482 		return;
3483 	}
3484 
3485 	ctx->page_num = _spdk_bs_blobid_to_page(blob->id);
3486 	ctx->bs = bs;
3487 	ctx->cb_fn = cb_fn;
3488 	ctx->cb_arg = cb_arg;
3489 
3490 	/* Close the existing blob */
3491 	spdk_blob_close(b, _spdk_bs_iter_close_cpl, ctx);
3492 }
3493 
3494 int
3495 spdk_blob_set_xattr(struct spdk_blob *_blob, const char *name, const void *value,
3496 		    uint16_t value_len)
3497 {
3498 	struct spdk_blob_data	*blob = __blob_to_data(_blob);
3499 	struct spdk_xattr 	*xattr;
3500 
3501 	assert(blob != NULL);
3502 
3503 	assert(blob->state != SPDK_BLOB_STATE_LOADING &&
3504 	       blob->state != SPDK_BLOB_STATE_SYNCING);
3505 
3506 	if (blob->md_ro) {
3507 		return -EPERM;
3508 	}
3509 
3510 	TAILQ_FOREACH(xattr, &blob->xattrs, link) {
3511 		if (!strcmp(name, xattr->name)) {
			void *new_value = malloc(value_len);
			if (!new_value) {
				return -ENOMEM;
			}
			free(xattr->value);
			xattr->value = new_value;
			xattr->value_len = value_len;
			memcpy(xattr->value, value, value_len);
3516 
3517 			blob->state = SPDK_BLOB_STATE_DIRTY;
3518 
3519 			return 0;
3520 		}
3521 	}
3522 
	xattr = calloc(1, sizeof(*xattr));
	if (!xattr) {
		return -ENOMEM;
	}
	xattr->name = strdup(name);
	xattr->value = malloc(value_len);
	if (!xattr->name || !xattr->value) {
		free(xattr->name);
		free(xattr->value);
		free(xattr);
		return -ENOMEM;
	}
	xattr->value_len = value_len;
	memcpy(xattr->value, value, value_len);
3531 	TAILQ_INSERT_TAIL(&blob->xattrs, xattr, link);
3532 
3533 	blob->state = SPDK_BLOB_STATE_DIRTY;
3534 
3535 	return 0;
3536 }
3537 
3538 int
3539 spdk_blob_remove_xattr(struct spdk_blob *_blob, const char *name)
3540 {
3541 	struct spdk_blob_data	*blob = __blob_to_data(_blob);
3542 	struct spdk_xattr	*xattr;
3543 
3544 	assert(blob != NULL);
3545 
3546 	assert(blob->state != SPDK_BLOB_STATE_LOADING &&
3547 	       blob->state != SPDK_BLOB_STATE_SYNCING);
3548 
3549 	if (blob->md_ro) {
3550 		return -EPERM;
3551 	}
3552 
3553 	TAILQ_FOREACH(xattr, &blob->xattrs, link) {
3554 		if (!strcmp(name, xattr->name)) {
3555 			TAILQ_REMOVE(&blob->xattrs, xattr, link);
3556 			free(xattr->value);
3557 			free(xattr->name);
3558 			free(xattr);
3559 
3560 			blob->state = SPDK_BLOB_STATE_DIRTY;
3561 
3562 			return 0;
3563 		}
3564 	}
3565 
3566 	return -ENOENT;
3567 }
3568 
3569 int
3570 spdk_blob_get_xattr_value(struct spdk_blob *_blob, const char *name,
3571 			  const void **value, size_t *value_len)
3572 {
3573 	struct spdk_blob_data	*blob = __blob_to_data(_blob);
3574 	struct spdk_xattr	*xattr;
3575 
3576 	TAILQ_FOREACH(xattr, &blob->xattrs, link) {
3577 		if (!strcmp(name, xattr->name)) {
3578 			*value = xattr->value;
3579 			*value_len = xattr->value_len;
3580 			return 0;
3581 		}
3582 	}
3583 
3584 	return -ENOENT;
3585 }
3586 
3587 struct spdk_xattr_names {
3588 	uint32_t	count;
3589 	const char	*names[0];
3590 };
3591 
3592 int
3593 spdk_blob_get_xattr_names(struct spdk_blob *_blob, struct spdk_xattr_names **names)
3594 {
3595 	struct spdk_blob_data	*blob = __blob_to_data(_blob);
3596 	struct spdk_xattr	*xattr;
3597 	int			count = 0;
3598 
3599 	TAILQ_FOREACH(xattr, &blob->xattrs, link) {
3600 		count++;
3601 	}
3602 
3603 	*names = calloc(1, sizeof(struct spdk_xattr_names) + count * sizeof(char *));
3604 	if (*names == NULL) {
3605 		return -ENOMEM;
3606 	}
3607 
3608 	TAILQ_FOREACH(xattr, &blob->xattrs, link) {
3609 		(*names)->names[(*names)->count++] = xattr->name;
3610 	}
3611 
3612 	return 0;
3613 }
3614 
3615 uint32_t
3616 spdk_xattr_names_get_count(struct spdk_xattr_names *names)
3617 {
3618 	assert(names != NULL);
3619 
3620 	return names->count;
3621 }
3622 
3623 const char *
3624 spdk_xattr_names_get_name(struct spdk_xattr_names *names, uint32_t index)
3625 {
3626 	if (index >= names->count) {
3627 		return NULL;
3628 	}
3629 
3630 	return names->names[index];
3631 }
3632 
3633 void
3634 spdk_xattr_names_free(struct spdk_xattr_names *names)
3635 {
3636 	free(names);
3637 }
3638 
3639 struct spdk_bs_type
3640 spdk_bs_get_bstype(struct spdk_blob_store *bs)
3641 {
3642 	return bs->bstype;
3643 }
3644 
3645 void
3646 spdk_bs_set_bstype(struct spdk_blob_store *bs, struct spdk_bs_type bstype)
3647 {
3648 	memcpy(&bs->bstype, &bstype, sizeof(bstype));
3649 }
3650 
3651 SPDK_LOG_REGISTER_COMPONENT("blob", SPDK_LOG_BLOB)
3652