/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "spdk/stdinc.h"

#include "spdk/blob.h"
#include "spdk/crc32.h"
#include "spdk/env.h"
#include "spdk/queue.h"
#include "spdk/io_channel.h"
#include "spdk/bit_array.h"
#include "spdk/likely.h"

#include "spdk_internal/log.h"

#include "blobstore.h"

#define BLOB_CRC32C_INITIAL    0xffffffffUL

static int spdk_bs_register_md_thread(struct spdk_blob_store *bs);
static int spdk_bs_unregister_md_thread(struct spdk_blob_store *bs);
static void _spdk_blob_close_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno);
void _spdk_blob_insert_cluster_on_md_thread(struct spdk_blob *blob, uint32_t cluster_num,
		uint64_t cluster, spdk_blob_op_complete cb_fn, void *cb_arg);

static int _spdk_blob_set_xattr(struct spdk_blob *blob, const char *name, const void *value,
				uint16_t value_len, bool internal);
static int _spdk_blob_get_xattr_value(struct spdk_blob *blob, const char *name,
				      const void **value, size_t *value_len, bool internal);
static int _spdk_blob_remove_xattr(struct spdk_blob *blob, const char *name, bool internal);

static void
_spdk_blob_verify_md_op(struct spdk_blob *blob)
{
	assert(blob != NULL);
	assert(spdk_get_thread() == blob->bs->md_thread);
	assert(blob->state != SPDK_BLOB_STATE_LOADING);
}

static inline size_t
divide_round_up(size_t num, size_t divisor)
{
	return (num + divisor - 1) / divisor;
}

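/* Mark a cluster as allocated in the used_clusters map and update the free
 * count. The bitmap is not locked here; within this file the call is
 * serialized via used_clusters_mutex (see _spdk_bs_allocate_cluster).
 */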
static void
_spdk_bs_claim_cluster(struct spdk_blob_store *bs, uint32_t cluster_num)
{
	assert(cluster_num < spdk_bit_array_capacity(bs->used_clusters));
	assert(spdk_bit_array_get(bs->used_clusters, cluster_num) == false);
	assert(bs->num_free_clusters > 0);

	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Claiming cluster %u\n", cluster_num);

	spdk_bit_array_set(bs->used_clusters, cluster_num);
	bs->num_free_clusters--;
}

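/* Record the LBA of a newly claimed cluster in the blob's cluster map.
 * Returns -EEXIST if the slot is already mapped, which can happen when
 * another I/O allocated the same logical cluster first.
 */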
static int
_spdk_blob_insert_cluster(struct spdk_blob *blob, uint32_t cluster_num, uint64_t cluster)
{
	uint64_t *cluster_lba = &blob->active.clusters[cluster_num];

	_spdk_blob_verify_md_op(blob);

	if (*cluster_lba != 0) {
		return -EEXIST;
	}

	*cluster_lba = _spdk_bs_cluster_to_lba(blob->bs, cluster);
	return 0;
}

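/* Find and claim the lowest free cluster at or after *lowest_free_cluster,
 * under used_clusters_mutex. On success the claimed cluster index is
 * returned through *lowest_free_cluster and, if update_map is set, it is
 * also inserted into the blob's cluster map.
 */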
static int
_spdk_bs_allocate_cluster(struct spdk_blob *blob, uint32_t cluster_num,
			  uint64_t *lowest_free_cluster, bool update_map)
{
	pthread_mutex_lock(&blob->bs->used_clusters_mutex);
	*lowest_free_cluster = spdk_bit_array_find_first_clear(blob->bs->used_clusters,
			       *lowest_free_cluster);
	if (*lowest_free_cluster >= blob->bs->total_clusters) {
		/* No more free clusters. Cannot satisfy the request */
		pthread_mutex_unlock(&blob->bs->used_clusters_mutex);
		return -ENOSPC;
	}

	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Claiming cluster %lu for blob %lu\n", *lowest_free_cluster, blob->id);
	_spdk_bs_claim_cluster(blob->bs, *lowest_free_cluster);
	pthread_mutex_unlock(&blob->bs->used_clusters_mutex);

	if (update_map) {
		_spdk_blob_insert_cluster(blob, cluster_num, *lowest_free_cluster);
	}

	return 0;
}

static void
_spdk_bs_release_cluster(struct spdk_blob_store *bs, uint32_t cluster_num)
{
	assert(cluster_num < spdk_bit_array_capacity(bs->used_clusters));
	assert(spdk_bit_array_get(bs->used_clusters, cluster_num) == true);
	assert(bs->num_free_clusters < bs->total_clusters);

	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Releasing cluster %u\n", cluster_num);

	pthread_mutex_lock(&bs->used_clusters_mutex);
	spdk_bit_array_clear(bs->used_clusters, cluster_num);
	bs->num_free_clusters++;
	pthread_mutex_unlock(&bs->used_clusters_mutex);
}

static void
_spdk_blob_xattrs_init(struct spdk_blob_xattr_opts *xattrs)
{
	xattrs->count = 0;
	xattrs->names = NULL;
	xattrs->ctx = NULL;
	xattrs->get_value = NULL;
}

void
spdk_blob_opts_init(struct spdk_blob_opts *opts)
{
	opts->num_clusters = 0;
	opts->thin_provision = false;
	_spdk_blob_xattrs_init(&opts->xattrs);
}

static struct spdk_blob *
_spdk_blob_alloc(struct spdk_blob_store *bs, spdk_blob_id id)
{
	struct spdk_blob *blob;

	blob = calloc(1, sizeof(*blob));
	if (!blob) {
		return NULL;
	}

	blob->id = id;
	blob->bs = bs;

	blob->state = SPDK_BLOB_STATE_DIRTY;
	blob->active.num_pages = 1;
	blob->active.pages = calloc(1, sizeof(*blob->active.pages));
	if (!blob->active.pages) {
		free(blob);
		return NULL;
	}

	blob->active.pages[0] = _spdk_bs_blobid_to_page(id);

	TAILQ_INIT(&blob->xattrs);
	TAILQ_INIT(&blob->xattrs_internal);

	return blob;
}

static void
_spdk_xattrs_free(struct spdk_xattr_tailq *xattrs)
{
	struct spdk_xattr	*xattr, *xattr_tmp;

	TAILQ_FOREACH_SAFE(xattr, xattrs, link, xattr_tmp) {
		TAILQ_REMOVE(xattrs, xattr, link);
		free(xattr->name);
		free(xattr->value);
		free(xattr);
	}
}

static void
_spdk_blob_free(struct spdk_blob *blob)
{
	assert(blob != NULL);

	free(blob->active.clusters);
	free(blob->clean.clusters);
	free(blob->active.pages);
	free(blob->clean.pages);

	_spdk_xattrs_free(&blob->xattrs);
	_spdk_xattrs_free(&blob->xattrs_internal);

	if (blob->back_bs_dev) {
		blob->back_bs_dev->destroy(blob->back_bs_dev);
	}

	free(blob);
}

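/* Promote the active (in-memory) cluster and page arrays to the clean
 * (on-disk) state after metadata has been written, replacing the active
 * arrays with fresh copies. Returns -1 if the copies cannot be allocated.
 */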
static int
_spdk_blob_mark_clean(struct spdk_blob *blob)
{
	uint64_t *clusters = NULL;
	uint32_t *pages = NULL;

	assert(blob != NULL);

	if (blob->active.num_clusters) {
		assert(blob->active.clusters);
		clusters = calloc(blob->active.num_clusters, sizeof(*blob->active.clusters));
		if (!clusters) {
			return -1;
		}
		memcpy(clusters, blob->active.clusters, blob->active.num_clusters * sizeof(*clusters));
	}

	if (blob->active.num_pages) {
		assert(blob->active.pages);
		pages = calloc(blob->active.num_pages, sizeof(*blob->active.pages));
		if (!pages) {
			free(clusters);
			return -1;
		}
		memcpy(pages, blob->active.pages, blob->active.num_pages * sizeof(*pages));
	}

	free(blob->clean.clusters);
	free(blob->clean.pages);

	blob->clean.num_clusters = blob->active.num_clusters;
	blob->clean.clusters = blob->active.clusters;
	blob->clean.num_pages = blob->active.num_pages;
	blob->clean.pages = blob->active.pages;

	blob->active.clusters = clusters;
	blob->active.pages = pages;

	/* If the metadata was dirtied again while the metadata was being written to disk,
	 *  we do not want to revert the DIRTY state back to CLEAN here.
	 */
	if (blob->state == SPDK_BLOB_STATE_LOADING) {
		blob->state = SPDK_BLOB_STATE_CLEAN;
	}

	return 0;
}

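/* Rebuild an in-memory xattr from its on-disk descriptor and append it to
 * the blob's regular or internal xattr list. The descriptor's length field
 * is validated against the embedded name and value lengths first.
 */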
static int
_spdk_blob_deserialize_xattr(struct spdk_blob *blob,
			     struct spdk_blob_md_descriptor_xattr *desc_xattr, bool internal)
{
	struct spdk_xattr                       *xattr;

	if (desc_xattr->length != sizeof(desc_xattr->name_length) +
	    sizeof(desc_xattr->value_length) +
	    desc_xattr->name_length + desc_xattr->value_length) {
		return -EINVAL;
	}

	xattr = calloc(1, sizeof(*xattr));
	if (xattr == NULL) {
		return -ENOMEM;
	}

	xattr->name = malloc(desc_xattr->name_length + 1);
	if (xattr->name == NULL) {
		free(xattr);
		return -ENOMEM;
	}
	strncpy(xattr->name, desc_xattr->name, desc_xattr->name_length);
	xattr->name[desc_xattr->name_length] = '\0';

	xattr->value = malloc(desc_xattr->value_length);
	if (xattr->value == NULL) {
		free(xattr->name);
		free(xattr);
		return -ENOMEM;
	}
	xattr->value_len = desc_xattr->value_length;
	memcpy(xattr->value,
	       (void *)((uintptr_t)desc_xattr->name + desc_xattr->name_length),
	       desc_xattr->value_length);

	TAILQ_INSERT_TAIL(internal ? &blob->xattrs_internal : &blob->xattrs, xattr, link);

	return 0;
}

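/* Walk the descriptor area of a single metadata page and apply each
 * descriptor (flags, extents, xattrs) to the in-memory blob. A zero-length
 * padding descriptor terminates the page; unknown descriptor types are
 * skipped so that newer on-disk formats remain loadable.
 */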
static int
_spdk_blob_parse_page(const struct spdk_blob_md_page *page, struct spdk_blob *blob)
{
	struct spdk_blob_md_descriptor *desc;
	size_t	cur_desc = 0;
	void *tmp;

	desc = (struct spdk_blob_md_descriptor *)page->descriptors;
	while (cur_desc < sizeof(page->descriptors)) {
		if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_PADDING) {
			if (desc->length == 0) {
				/* If padding and length are 0, this terminates the page */
				break;
			}
		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_FLAGS) {
			struct spdk_blob_md_descriptor_flags	*desc_flags;

			desc_flags = (struct spdk_blob_md_descriptor_flags *)desc;

			if (desc_flags->length != sizeof(*desc_flags) - sizeof(*desc)) {
				return -EINVAL;
			}

			if ((desc_flags->invalid_flags | SPDK_BLOB_INVALID_FLAGS_MASK) !=
			    SPDK_BLOB_INVALID_FLAGS_MASK) {
				return -EINVAL;
			}

			if ((desc_flags->data_ro_flags | SPDK_BLOB_DATA_RO_FLAGS_MASK) !=
			    SPDK_BLOB_DATA_RO_FLAGS_MASK) {
				blob->data_ro = true;
				blob->md_ro = true;
			}

			if ((desc_flags->md_ro_flags | SPDK_BLOB_MD_RO_FLAGS_MASK) !=
			    SPDK_BLOB_MD_RO_FLAGS_MASK) {
				blob->md_ro = true;
			}

			if ((desc_flags->data_ro_flags & SPDK_BLOB_READ_ONLY)) {
				blob->data_ro = true;
				blob->md_ro = true;
			}

			blob->invalid_flags = desc_flags->invalid_flags;
			blob->data_ro_flags = desc_flags->data_ro_flags;
			blob->md_ro_flags = desc_flags->md_ro_flags;

		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_EXTENT) {
			struct spdk_blob_md_descriptor_extent	*desc_extent;
			unsigned int				i, j;
			unsigned int				cluster_count = blob->active.num_clusters;

			desc_extent = (struct spdk_blob_md_descriptor_extent *)desc;

			if (desc_extent->length == 0 ||
			    (desc_extent->length % sizeof(desc_extent->extents[0]) != 0)) {
				return -EINVAL;
			}

			for (i = 0; i < desc_extent->length / sizeof(desc_extent->extents[0]); i++) {
				for (j = 0; j < desc_extent->extents[i].length; j++) {
					if (!spdk_bit_array_get(blob->bs->used_clusters,
								desc_extent->extents[i].cluster_idx + j)) {
						return -EINVAL;
					}
					cluster_count++;
				}
			}

			if (cluster_count == 0) {
				return -EINVAL;
			}
			tmp = realloc(blob->active.clusters, cluster_count * sizeof(uint64_t));
			if (tmp == NULL) {
				return -ENOMEM;
			}
			blob->active.clusters = tmp;
			blob->active.cluster_array_size = cluster_count;

			for (i = 0; i < desc_extent->length / sizeof(desc_extent->extents[0]); i++) {
				for (j = 0; j < desc_extent->extents[i].length; j++) {
					if (desc_extent->extents[i].cluster_idx != 0) {
						blob->active.clusters[blob->active.num_clusters++] = _spdk_bs_cluster_to_lba(blob->bs,
								desc_extent->extents[i].cluster_idx + j);
					} else if (spdk_blob_is_thin_provisioned(blob)) {
						blob->active.clusters[blob->active.num_clusters++] = 0;
					} else {
						return -EINVAL;
					}
				}
			}

		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR) {
			int rc;

			rc = _spdk_blob_deserialize_xattr(blob,
							  (struct spdk_blob_md_descriptor_xattr *) desc, false);
			if (rc != 0) {
				return rc;
			}
		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR_INTERNAL) {
			int rc;

			rc = _spdk_blob_deserialize_xattr(blob,
							  (struct spdk_blob_md_descriptor_xattr *) desc, true);
			if (rc != 0) {
				return rc;
			}
		} else {
			/* Unrecognized descriptor type.  Do not fail - just continue to the
			 *  next descriptor.  If this descriptor is associated with some feature
			 *  defined in a newer version of blobstore, that version of blobstore
			 *  should create and set an associated feature flag to specify if this
			 *  blob can be loaded or not.
			 */
		}

		/* Advance to the next descriptor */
		cur_desc += sizeof(*desc) + desc->length;
		if (cur_desc + sizeof(*desc) > sizeof(page->descriptors)) {
			break;
		}
		desc = (struct spdk_blob_md_descriptor *)((uintptr_t)page->descriptors + cur_desc);
	}

	return 0;
}

static int
_spdk_blob_parse(const struct spdk_blob_md_page *pages, uint32_t page_count,
		 struct spdk_blob *blob)
{
	const struct spdk_blob_md_page *page;
	uint32_t i;
	int rc;

	assert(page_count > 0);
	assert(pages[0].sequence_num == 0);
	assert(blob != NULL);
	assert(blob->state == SPDK_BLOB_STATE_LOADING);
	assert(blob->active.clusters == NULL);

	/* The blobid provided doesn't match what's in the MD, this can
	 * happen for example if a bogus blobid is passed in through open.
	 */
	if (blob->id != pages[0].id) {
		SPDK_ERRLOG("Blobid (%lu) doesn't match what's in metadata (%lu)\n",
			    blob->id, pages[0].id);
		return -ENOENT;
	}

	for (i = 0; i < page_count; i++) {
		page = &pages[i];

		assert(page->id == blob->id);
		assert(page->sequence_num == i);

		rc = _spdk_blob_parse_page(page, blob);
		if (rc != 0) {
			return rc;
		}
	}

	return 0;
}

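/* Grow the serialized page array by one DMA-able metadata page and
 * initialize its header. On allocation failure the array is dropped and
 * *page_count is reset so callers can simply propagate -ENOMEM.
 */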
static int
_spdk_blob_serialize_add_page(const struct spdk_blob *blob,
			      struct spdk_blob_md_page **pages,
			      uint32_t *page_count,
			      struct spdk_blob_md_page **last_page)
{
	struct spdk_blob_md_page *page;

	assert(pages != NULL);
	assert(page_count != NULL);

	if (*page_count == 0) {
		assert(*pages == NULL);
		*page_count = 1;
		*pages = spdk_dma_malloc(SPDK_BS_PAGE_SIZE,
					 SPDK_BS_PAGE_SIZE,
					 NULL);
	} else {
		struct spdk_blob_md_page *tmp_pages;

		assert(*pages != NULL);
		(*page_count)++;
		tmp_pages = spdk_dma_realloc(*pages,
					     SPDK_BS_PAGE_SIZE * (*page_count),
					     SPDK_BS_PAGE_SIZE,
					     NULL);
		if (tmp_pages == NULL) {
			/* A failed realloc leaves the old buffer intact, so free it
			 * here rather than leak it. */
			spdk_dma_free(*pages);
		}
		*pages = tmp_pages;
	}

	if (*pages == NULL) {
		*page_count = 0;
		*last_page = NULL;
		return -ENOMEM;
	}

	page = &(*pages)[*page_count - 1];
	memset(page, 0, sizeof(*page));
	page->id = blob->id;
	page->sequence_num = *page_count - 1;
	page->next = SPDK_INVALID_MD_PAGE;
	*last_page = page;

	return 0;
}

/* Transform the in-memory representation 'xattr' into an on-disk xattr descriptor.
 * Update required_sz on both success and failure.
 */
static int
_spdk_blob_serialize_xattr(const struct spdk_xattr *xattr,
			   uint8_t *buf, size_t buf_sz,
			   size_t *required_sz, bool internal)
{
	struct spdk_blob_md_descriptor_xattr	*desc;

	*required_sz = sizeof(struct spdk_blob_md_descriptor_xattr) +
		       strlen(xattr->name) +
		       xattr->value_len;

	if (buf_sz < *required_sz) {
		return -1;
	}

	desc = (struct spdk_blob_md_descriptor_xattr *)buf;

	desc->type = internal ? SPDK_MD_DESCRIPTOR_TYPE_XATTR_INTERNAL : SPDK_MD_DESCRIPTOR_TYPE_XATTR;
	desc->length = sizeof(desc->name_length) +
		       sizeof(desc->value_length) +
		       strlen(xattr->name) +
		       xattr->value_len;
	desc->name_length = strlen(xattr->name);
	desc->value_length = xattr->value_len;

	memcpy(desc->name, xattr->name, desc->name_length);
	memcpy((void *)((uintptr_t)desc->name + desc->name_length),
	       xattr->value,
	       desc->value_length);

	return 0;
}

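/* Serialize the blob's cluster map starting at start_cluster, run-length
 * encoding physically contiguous clusters into extent entries. Writes as
 * many extents as fit in buf and reports the resume point through
 * *next_cluster, so the caller can continue on a fresh page.
 */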
static void
_spdk_blob_serialize_extent(const struct spdk_blob *blob,
			    uint64_t start_cluster, uint64_t *next_cluster,
			    uint8_t *buf, size_t buf_sz)
{
	struct spdk_blob_md_descriptor_extent *desc;
	size_t cur_sz;
	uint64_t i, extent_idx;
	/* LBAs are 64-bit; a 32-bit type here would truncate cluster
	 * addresses on large backing devices. */
	uint64_t lba, lba_per_cluster, lba_count;

	/* The buffer must have room for at least one extent */
	cur_sz = sizeof(struct spdk_blob_md_descriptor) + sizeof(desc->extents[0]);
	if (buf_sz < cur_sz) {
		*next_cluster = start_cluster;
		return;
	}

	desc = (struct spdk_blob_md_descriptor_extent *)buf;
	desc->type = SPDK_MD_DESCRIPTOR_TYPE_EXTENT;

	lba_per_cluster = _spdk_bs_cluster_to_lba(blob->bs, 1);

	lba = blob->active.clusters[start_cluster];
	lba_count = lba_per_cluster;
	extent_idx = 0;
	for (i = start_cluster + 1; i < blob->active.num_clusters; i++) {
		if ((lba + lba_count) == blob->active.clusters[i]) {
			lba_count += lba_per_cluster;
			continue;
		}
		desc->extents[extent_idx].cluster_idx = lba / lba_per_cluster;
		desc->extents[extent_idx].length = lba_count / lba_per_cluster;
		extent_idx++;

		cur_sz += sizeof(desc->extents[extent_idx]);

		if (buf_sz < cur_sz) {
			/* If we ran out of buffer space, return */
			desc->length = sizeof(desc->extents[0]) * extent_idx;
			*next_cluster = i;
			return;
		}

		lba = blob->active.clusters[i];
		lba_count = lba_per_cluster;
	}

	desc->extents[extent_idx].cluster_idx = lba / lba_per_cluster;
	desc->extents[extent_idx].length = lba_count / lba_per_cluster;
	extent_idx++;

	desc->length = sizeof(desc->extents[0]) * extent_idx;
	*next_cluster = blob->active.num_clusters;

	return;
}

static void
_spdk_blob_serialize_flags(const struct spdk_blob *blob,
			   uint8_t *buf, size_t *buf_sz)
{
	struct spdk_blob_md_descriptor_flags *desc;

	/*
	 * Flags get serialized first, so we should always have room for the flags
	 *  descriptor.
	 */
	assert(*buf_sz >= sizeof(*desc));

	desc = (struct spdk_blob_md_descriptor_flags *)buf;
	desc->type = SPDK_MD_DESCRIPTOR_TYPE_FLAGS;
	desc->length = sizeof(*desc) - sizeof(struct spdk_blob_md_descriptor);
	desc->invalid_flags = blob->invalid_flags;
	desc->data_ro_flags = blob->data_ro_flags;
	desc->md_ro_flags = blob->md_ro_flags;

	*buf_sz -= sizeof(*desc);
}

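/* Serialize one xattr list (regular or internal). When an xattr does not
 * fit in the remaining space of the current page, a new page is appended
 * to the chain and the xattr is retried there; an xattr that cannot fit in
 * an empty page is an error.
 */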
static int
_spdk_blob_serialize_xattrs(const struct spdk_blob *blob,
			    const struct spdk_xattr_tailq *xattrs, bool internal,
			    struct spdk_blob_md_page **pages,
			    struct spdk_blob_md_page *cur_page,
			    uint32_t *page_count, uint8_t **buf,
			    size_t *remaining_sz)
{
	const struct spdk_xattr	*xattr;
	int	rc;

	TAILQ_FOREACH(xattr, xattrs, link) {
		size_t required_sz = 0;

		rc = _spdk_blob_serialize_xattr(xattr,
						*buf, *remaining_sz,
						&required_sz, internal);
		if (rc < 0) {
			/* Need to add a new page to the chain */
			rc = _spdk_blob_serialize_add_page(blob, pages, page_count,
							   &cur_page);
			if (rc < 0) {
				spdk_dma_free(*pages);
				*pages = NULL;
				*page_count = 0;
				return rc;
			}

			*buf = (uint8_t *)cur_page->descriptors;
			*remaining_sz = sizeof(cur_page->descriptors);

			/* Try again */
			required_sz = 0;
			rc = _spdk_blob_serialize_xattr(xattr,
							*buf, *remaining_sz,
							&required_sz, internal);

			if (rc < 0) {
				spdk_dma_free(*pages);
				*pages = NULL;
				*page_count = 0;
				return -1;
			}
		}

		*remaining_sz -= required_sz;
		*buf += required_sz;
	}

	return 0;
}

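/* Serialize a dirty blob's metadata into a chain of DMA-able pages, in a
 * fixed order: flags first, then xattrs, internal xattrs, and finally the
 * extent descriptors. The caller owns the returned page array.
 */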
static int
_spdk_blob_serialize(const struct spdk_blob *blob, struct spdk_blob_md_page **pages,
		     uint32_t *page_count)
{
	struct spdk_blob_md_page		*cur_page;
	int					rc;
	uint8_t					*buf;
	size_t					remaining_sz;
	uint64_t				last_cluster;

	assert(pages != NULL);
	assert(page_count != NULL);
	assert(blob != NULL);
	assert(blob->state == SPDK_BLOB_STATE_DIRTY);

	*pages = NULL;
	*page_count = 0;

	/* A blob always has at least 1 page, even if it has no descriptors */
	rc = _spdk_blob_serialize_add_page(blob, pages, page_count, &cur_page);
	if (rc < 0) {
		return rc;
	}

	buf = (uint8_t *)cur_page->descriptors;
	remaining_sz = sizeof(cur_page->descriptors);

	/* Serialize flags */
	_spdk_blob_serialize_flags(blob, buf, &remaining_sz);
	buf += sizeof(struct spdk_blob_md_descriptor_flags);

	/* Serialize xattrs */
	rc = _spdk_blob_serialize_xattrs(blob, &blob->xattrs, false,
					 pages, cur_page, page_count, &buf, &remaining_sz);
	if (rc < 0) {
		return rc;
	}

	/* Serialize internal xattrs */
	rc = _spdk_blob_serialize_xattrs(blob, &blob->xattrs_internal, true,
					 pages, cur_page, page_count, &buf, &remaining_sz);
	if (rc < 0) {
		return rc;
	}

	/* Serialize extents */
	last_cluster = 0;
	while (last_cluster < blob->active.num_clusters) {
		_spdk_blob_serialize_extent(blob, last_cluster, &last_cluster,
					    buf, remaining_sz);

		if (last_cluster == blob->active.num_clusters) {
			break;
		}

		rc = _spdk_blob_serialize_add_page(blob, pages, page_count,
						   &cur_page);
		if (rc < 0) {
			return rc;
		}

		buf = (uint8_t *)cur_page->descriptors;
		remaining_sz = sizeof(cur_page->descriptors);
	}

	return 0;
}

struct spdk_blob_load_ctx {
	struct spdk_blob		*blob;

	struct spdk_blob_md_page	*pages;
	uint32_t			num_pages;
	spdk_bs_sequence_t		*seq;

	spdk_bs_sequence_cpl		cb_fn;
	void				*cb_arg;
};

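/* Compute the CRC32C of a metadata page. The crc field occupies the last
 * four bytes of the page and is excluded from the calculation.
 */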
static uint32_t
_spdk_blob_md_page_calc_crc(void *page)
{
	uint32_t		crc;

	crc = BLOB_CRC32C_INITIAL;
	crc = spdk_crc32c_update(page, SPDK_BS_PAGE_SIZE - 4, crc);
	crc ^= BLOB_CRC32C_INITIAL;

	return crc;
}

static void
_spdk_blob_load_final(void *cb_arg, int bserrno)
{
	struct spdk_blob_load_ctx	*ctx = cb_arg;
	struct spdk_blob		*blob = ctx->blob;

	_spdk_blob_mark_clean(blob);

	ctx->cb_fn(ctx->seq, ctx->cb_arg, bserrno);

	/* Free the memory */
	spdk_dma_free(ctx->pages);
	free(ctx);
}

static void
_spdk_blob_load_snapshot_cpl(void *cb_arg, struct spdk_blob *snapshot, int bserrno)
{
	struct spdk_blob_load_ctx	*ctx = cb_arg;
	struct spdk_blob		*blob = ctx->blob;

	if (bserrno != 0) {
		goto error;
	}

	blob->back_bs_dev = spdk_bs_create_blob_bs_dev(snapshot);

	if (blob->back_bs_dev == NULL) {
		bserrno = -ENOMEM;
		goto error;
	}

	_spdk_blob_load_final(ctx, bserrno);
	return;

error:
	SPDK_ERRLOG("Failed to load snapshot\n");
	_spdk_blob_free(blob);
	ctx->cb_fn(ctx->seq, NULL, bserrno);
	spdk_dma_free(ctx->pages);
	free(ctx);
}

static void
_spdk_blob_load_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_load_ctx	*ctx = cb_arg;
	struct spdk_blob		*blob = ctx->blob;
	struct spdk_blob_md_page	*page;
	const void			*value;
	size_t				len;
	int				rc;
	uint32_t			crc;

	page = &ctx->pages[ctx->num_pages - 1];
	crc = _spdk_blob_md_page_calc_crc(page);
	if (crc != page->crc) {
		SPDK_ERRLOG("Metadata page %u crc mismatch\n", ctx->num_pages);
		_spdk_blob_free(blob);
		ctx->cb_fn(seq, NULL, -EINVAL);
		spdk_dma_free(ctx->pages);
		free(ctx);
		return;
	}

	if (page->next != SPDK_INVALID_MD_PAGE) {
		uint32_t next_page = page->next;
		uint64_t next_lba = _spdk_bs_page_to_lba(blob->bs, blob->bs->md_start + next_page);
		struct spdk_blob_md_page *tmp_pages;

		assert(next_lba < (blob->bs->md_start + blob->bs->md_len));

		/* Read the next page */
		ctx->num_pages++;
		tmp_pages = spdk_dma_realloc(ctx->pages, (sizeof(*page) * ctx->num_pages),
					     sizeof(*page), NULL);
		if (tmp_pages == NULL) {
			/* A failed realloc leaves the old buffer intact, so free it
			 * here rather than leak it. */
			spdk_dma_free(ctx->pages);
			ctx->cb_fn(seq, ctx->cb_arg, -ENOMEM);
			free(ctx);
			return;
		}
		ctx->pages = tmp_pages;

		spdk_bs_sequence_read_dev(seq, &ctx->pages[ctx->num_pages - 1],
					  next_lba,
					  _spdk_bs_byte_to_lba(blob->bs, sizeof(*page)),
					  _spdk_blob_load_cpl, ctx);
		return;
	}

	/* Parse the pages */
	rc = _spdk_blob_parse(ctx->pages, ctx->num_pages, blob);
	if (rc) {
		_spdk_blob_free(blob);
		ctx->cb_fn(seq, NULL, rc);
		spdk_dma_free(ctx->pages);
		free(ctx);
		return;
	}
	ctx->seq = seq;

	if (spdk_blob_is_thin_provisioned(blob)) {
		rc = _spdk_blob_get_xattr_value(blob, BLOB_SNAPSHOT, &value, &len, true);
		if (rc == 0) {
			if (len != sizeof(spdk_blob_id)) {
				_spdk_blob_free(blob);
				ctx->cb_fn(seq, NULL, -EINVAL);
				spdk_dma_free(ctx->pages);
				free(ctx);
				return;
			}
			/* open snapshot blob and continue in the callback function */
			spdk_bs_open_blob(blob->bs, *(spdk_blob_id *)value,
					  _spdk_blob_load_snapshot_cpl, ctx);
			return;
		} else {
			/* add zeroes_dev for thin provisioned blob */
			blob->back_bs_dev = spdk_bs_create_zeroes_dev();
		}
	} else {
		/* standard blob */
		blob->back_bs_dev = NULL;
	}
	_spdk_blob_load_final(ctx, bserrno);
}

/* Load a blob from disk given a blobid */
static void
_spdk_blob_load(spdk_bs_sequence_t *seq, struct spdk_blob *blob,
		spdk_bs_sequence_cpl cb_fn, void *cb_arg)
{
	struct spdk_blob_load_ctx *ctx;
	struct spdk_blob_store *bs;
	uint32_t page_num;
	uint64_t lba;

	_spdk_blob_verify_md_op(blob);

	bs = blob->bs;

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		cb_fn(seq, cb_arg, -ENOMEM);
		return;
	}

	ctx->blob = blob;
	ctx->pages = spdk_dma_malloc(SPDK_BS_PAGE_SIZE,
				     SPDK_BS_PAGE_SIZE, NULL);
	if (!ctx->pages) {
		free(ctx);
		cb_fn(seq, cb_arg, -ENOMEM);
		return;
	}
	ctx->num_pages = 1;
	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;

	page_num = _spdk_bs_blobid_to_page(blob->id);
	lba = _spdk_bs_page_to_lba(blob->bs, bs->md_start + page_num);

	blob->state = SPDK_BLOB_STATE_LOADING;

	spdk_bs_sequence_read_dev(seq, &ctx->pages[0], lba,
				  _spdk_bs_byte_to_lba(bs, SPDK_BS_PAGE_SIZE),
				  _spdk_blob_load_cpl, ctx);
}

struct spdk_blob_persist_ctx {
	struct spdk_blob		*blob;

	struct spdk_blob_md_page	*pages;

	uint64_t			idx;

	spdk_bs_sequence_t		*seq;
	spdk_bs_sequence_cpl		cb_fn;
	void				*cb_arg;
};

static void
_spdk_blob_persist_complete(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx	*ctx = cb_arg;
	struct spdk_blob		*blob = ctx->blob;

	if (bserrno == 0) {
		_spdk_blob_mark_clean(blob);
	}

	/* Call user callback */
	ctx->cb_fn(seq, ctx->cb_arg, bserrno);

	/* Free the memory */
	spdk_dma_free(ctx->pages);
	free(ctx);
}

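/* Final step of persist: release any clusters that were truncated from the
 * end of the blob and shrink the in-memory cluster array to match.
 */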
980 _spdk_blob_persist_unmap_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
981 {
982 	struct spdk_blob_persist_ctx	*ctx = cb_arg;
983 	struct spdk_blob		*blob = ctx->blob;
984 	struct spdk_blob_store		*bs = blob->bs;
985 	void				*tmp;
986 	size_t				i;
987 
988 	/* Release all clusters that were truncated */
989 	for (i = blob->active.num_clusters; i < blob->active.cluster_array_size; i++) {
990 		uint32_t cluster_num = _spdk_bs_lba_to_cluster(bs, blob->active.clusters[i]);
991 
992 		/* Nothing to release if it was not allocated */
993 		if (blob->active.clusters[i] != 0) {
994 			_spdk_bs_release_cluster(bs, cluster_num);
995 		}
996 	}
997 
998 	if (blob->active.num_clusters == 0) {
999 		free(blob->active.clusters);
1000 		blob->active.clusters = NULL;
1001 		blob->active.cluster_array_size = 0;
1002 	} else {
1003 		tmp = realloc(blob->active.clusters, sizeof(uint64_t) * blob->active.num_clusters);
1004 		assert(tmp != NULL);
1005 		blob->active.clusters = tmp;
1006 		blob->active.cluster_array_size = blob->active.num_clusters;
1007 	}
1008 
1009 	_spdk_blob_persist_complete(seq, ctx, bserrno);
1010 }
1011 
static void
_spdk_blob_persist_unmap_clusters(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx	*ctx = cb_arg;
	struct spdk_blob		*blob = ctx->blob;
	struct spdk_blob_store		*bs = blob->bs;
	spdk_bs_batch_t			*batch;
	size_t				i;
	uint64_t			lba;
	uint32_t			lba_count;

	/* Clusters don't move around in blobs. The list shrinks or grows
	 * at the end, but no changes ever occur in the middle of the list.
	 */

	batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_unmap_clusters_cpl, ctx);

	/* Unmap all clusters that were truncated */
	lba = 0;
	lba_count = 0;
	for (i = blob->active.num_clusters; i < blob->active.cluster_array_size; i++) {
		uint64_t next_lba = blob->active.clusters[i];
		uint32_t next_lba_count = _spdk_bs_cluster_to_lba(bs, 1);

		if (next_lba > 0 && (lba + lba_count) == next_lba) {
			/* This cluster is contiguous with the previous one. */
			lba_count += next_lba_count;
			continue;
		}

		/* This cluster is not contiguous with the previous one. */

		/* If a run of LBAs previously existed, send it as an unmap. */
		if (lba_count > 0) {
			spdk_bs_batch_unmap_dev(batch, lba, lba_count);
		}

		/* Start building the next batch */
		lba = next_lba;
		if (next_lba > 0) {
			lba_count = next_lba_count;
		} else {
			lba_count = 0;
		}
	}

	/* If we ended with a contiguous set of LBAs, send the unmap now */
	if (lba_count > 0) {
		spdk_bs_batch_unmap_dev(batch, lba, lba_count);
	}

	spdk_bs_batch_close(batch);
}

static void
_spdk_blob_persist_zero_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx	*ctx = cb_arg;
	struct spdk_blob		*blob = ctx->blob;
	struct spdk_blob_store		*bs = blob->bs;
	size_t				i;

	/* This loop starts at 1 because the first page is special and handled
	 * below. The pages (except the first) are never written in place, so
	 * any pages in the clean list must be released back to the
	 * used_md_pages map.
	 */
	for (i = 1; i < blob->clean.num_pages; i++) {
		spdk_bit_array_clear(bs->used_md_pages, blob->clean.pages[i]);
	}

	if (blob->active.num_pages == 0) {
		uint32_t page_num;

		page_num = _spdk_bs_blobid_to_page(blob->id);
		spdk_bit_array_clear(bs->used_md_pages, page_num);
	}

	/* Move on to unmapping clusters */
	_spdk_blob_persist_unmap_clusters(seq, ctx, 0);
}

static void
_spdk_blob_persist_zero_pages(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx	*ctx = cb_arg;
	struct spdk_blob		*blob = ctx->blob;
	struct spdk_blob_store		*bs = blob->bs;
	uint64_t			lba;
	uint32_t			lba_count;
	spdk_bs_batch_t			*batch;
	size_t				i;

	batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_zero_pages_cpl, ctx);

	lba_count = _spdk_bs_byte_to_lba(bs, SPDK_BS_PAGE_SIZE);

	/* This loop starts at 1 because the first page is special and handled
	 * below. The pages (except the first) are never written in place,
	 * so any pages in the clean list must be zeroed.
	 */
	for (i = 1; i < blob->clean.num_pages; i++) {
		lba = _spdk_bs_page_to_lba(bs, bs->md_start + blob->clean.pages[i]);

		spdk_bs_batch_write_zeroes_dev(batch, lba, lba_count);
	}

	/* The first page will only be zeroed if this is a delete. */
	if (blob->active.num_pages == 0) {
		uint32_t page_num;

		/* The first page in the metadata goes where the blobid indicates */
		page_num = _spdk_bs_blobid_to_page(blob->id);
		lba = _spdk_bs_page_to_lba(bs, bs->md_start + page_num);

		spdk_bs_batch_write_zeroes_dev(batch, lba, lba_count);
	}

	spdk_bs_batch_close(batch);
}

static void
_spdk_blob_persist_write_page_root(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx	*ctx = cb_arg;
	struct spdk_blob		*blob = ctx->blob;
	struct spdk_blob_store		*bs = blob->bs;
	uint64_t			lba;
	uint32_t			lba_count;
	struct spdk_blob_md_page	*page;

	if (blob->active.num_pages == 0) {
		/* Move on to the next step */
		_spdk_blob_persist_zero_pages(seq, ctx, 0);
		return;
	}

	lba_count = _spdk_bs_byte_to_lba(bs, sizeof(*page));

	page = &ctx->pages[0];
	/* The first page in the metadata goes where the blobid indicates */
	lba = _spdk_bs_page_to_lba(bs, bs->md_start + _spdk_bs_blobid_to_page(blob->id));

	spdk_bs_sequence_write_dev(seq, page, lba, lba_count,
				   _spdk_blob_persist_zero_pages, ctx);
}

static void
_spdk_blob_persist_write_page_chain(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx	*ctx = cb_arg;
	struct spdk_blob		*blob = ctx->blob;
	struct spdk_blob_store		*bs = blob->bs;
	uint64_t			lba;
	uint32_t			lba_count;
	struct spdk_blob_md_page	*page;
	spdk_bs_batch_t			*batch;
	size_t				i;

	/* Clusters don't move around in blobs. The list shrinks or grows
	 * at the end, but no changes ever occur in the middle of the list.
	 */

	lba_count = _spdk_bs_byte_to_lba(bs, sizeof(*page));

	batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_write_page_root, ctx);

	/* This starts at 1. The root page is not written until
	 * all of the others are finished
	 */
	for (i = 1; i < blob->active.num_pages; i++) {
		page = &ctx->pages[i];
		assert(page->sequence_num == i);

		lba = _spdk_bs_page_to_lba(bs, bs->md_start + blob->active.pages[i]);

		spdk_bs_batch_write_dev(batch, page, lba, lba_count);
	}

	spdk_bs_batch_close(batch);
}

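/* Resize a blob to sz clusters. For thick-provisioned blobs, growth is
 * done in two passes - first verify that enough free clusters exist, then
 * claim them. Shrinking only lowers the logical cluster count; truncated
 * clusters are unmapped and released when the blob is next persisted.
 */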
static int
_spdk_resize_blob(struct spdk_blob *blob, uint64_t sz)
{
	uint64_t	i;
	uint64_t	*tmp;
	uint64_t	lfc; /* lowest free cluster */
	uint64_t	num_clusters;
	struct spdk_blob_store *bs;

	bs = blob->bs;

	_spdk_blob_verify_md_op(blob);

	if (blob->active.num_clusters == sz) {
		return 0;
	}

	if (blob->active.num_clusters < blob->active.cluster_array_size) {
		/* If this blob was resized to be larger, then smaller, then
		 * larger without syncing, then the cluster array already
		 * contains spare assigned clusters we can use.
		 */
		num_clusters = spdk_min(blob->active.cluster_array_size,
					sz);
	} else {
		num_clusters = blob->active.num_clusters;
	}

	/* Do two passes - one to verify that we can obtain enough clusters
	 * and another to actually claim them.
	 */

	if (spdk_blob_is_thin_provisioned(blob) == false) {
		lfc = 0;
		for (i = num_clusters; i < sz; i++) {
			lfc = spdk_bit_array_find_first_clear(bs->used_clusters, lfc);
			if (lfc >= bs->total_clusters) {
				/* No more free clusters. Cannot satisfy the request */
				return -ENOSPC;
			}
			lfc++;
		}
	}

	if (sz > num_clusters) {
		/* Expand the cluster array if necessary.
		 * We only shrink the array when persisting.
		 */
		tmp = realloc(blob->active.clusters, sizeof(uint64_t) * sz);
		if (sz > 0 && tmp == NULL) {
			return -ENOMEM;
		}
		memset(tmp + blob->active.cluster_array_size, 0,
		       sizeof(uint64_t) * (sz - blob->active.cluster_array_size));
		blob->active.clusters = tmp;
		blob->active.cluster_array_size = sz;
	}

	blob->state = SPDK_BLOB_STATE_DIRTY;

	if (spdk_blob_is_thin_provisioned(blob) == false) {
		lfc = 0;
		for (i = num_clusters; i < sz; i++) {
			_spdk_bs_allocate_cluster(blob, i, &lfc, true);
			lfc++;
		}
	}

	blob->active.num_clusters = sz;

	return 0;
}

static void
_spdk_blob_persist_start(struct spdk_blob_persist_ctx *ctx)
{
	spdk_bs_sequence_t *seq = ctx->seq;
	struct spdk_blob *blob = ctx->blob;
	struct spdk_blob_store *bs = blob->bs;
	uint64_t i;
	uint32_t page_num;
	int rc;

	if (blob->active.num_pages == 0) {
		/* This is the signal that the blob should be deleted.
		 * Immediately jump to the clean up routine. */
		assert(blob->clean.num_pages > 0);
		ctx->idx = blob->clean.num_pages - 1;
		blob->state = SPDK_BLOB_STATE_CLEAN;
		_spdk_blob_persist_zero_pages(seq, ctx, 0);
		return;
	}

	/* Generate the new metadata */
	rc = _spdk_blob_serialize(blob, &ctx->pages, &blob->active.num_pages);
	if (rc < 0) {
		_spdk_blob_persist_complete(seq, ctx, rc);
		return;
	}

	assert(blob->active.num_pages >= 1);

	/* Resize the cache of page indices */
	blob->active.pages = realloc(blob->active.pages,
				     blob->active.num_pages * sizeof(*blob->active.pages));
	if (!blob->active.pages) {
		_spdk_blob_persist_complete(seq, ctx, -ENOMEM);
		return;
	}

	/* Assign this metadata to pages. This requires two passes -
	 * one to verify that there are enough pages and a second
	 * to actually claim them. */
	page_num = 0;
	/* Note that this loop starts at one. The first page location is fixed by the blobid. */
	for (i = 1; i < blob->active.num_pages; i++) {
		page_num = spdk_bit_array_find_first_clear(bs->used_md_pages, page_num);
		if (page_num >= spdk_bit_array_capacity(bs->used_md_pages)) {
			_spdk_blob_persist_complete(seq, ctx, -ENOMEM);
			return;
		}
		page_num++;
	}

	page_num = 0;
	blob->active.pages[0] = _spdk_bs_blobid_to_page(blob->id);
	for (i = 1; i < blob->active.num_pages; i++) {
		page_num = spdk_bit_array_find_first_clear(bs->used_md_pages, page_num);
		ctx->pages[i - 1].next = page_num;
		/* Now that previous metadata page is complete, calculate the crc for it. */
		ctx->pages[i - 1].crc = _spdk_blob_md_page_calc_crc(&ctx->pages[i - 1]);
		blob->active.pages[i] = page_num;
		spdk_bit_array_set(bs->used_md_pages, page_num);
		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Claiming page %u for blob %lu\n", page_num, blob->id);
		page_num++;
	}
	ctx->pages[i - 1].crc = _spdk_blob_md_page_calc_crc(&ctx->pages[i - 1]);
	/* Start writing the metadata from last page to first */
	ctx->idx = blob->active.num_pages - 1;
	blob->state = SPDK_BLOB_STATE_CLEAN;
	_spdk_blob_persist_write_page_chain(seq, ctx, 0);
}

/* Write a blob to disk */
static void
_spdk_blob_persist(spdk_bs_sequence_t *seq, struct spdk_blob *blob,
		   spdk_bs_sequence_cpl cb_fn, void *cb_arg)
{
	struct spdk_blob_persist_ctx *ctx;

	_spdk_blob_verify_md_op(blob);

	if (blob->state == SPDK_BLOB_STATE_CLEAN) {
		cb_fn(seq, cb_arg, 0);
		return;
	}

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		cb_fn(seq, cb_arg, -ENOMEM);
		return;
	}
	ctx->blob = blob;
	ctx->seq = seq;
	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;

	_spdk_blob_persist_start(ctx);
}

struct spdk_blob_copy_cluster_ctx {
	struct spdk_blob *blob;
	uint8_t *buf;
	uint64_t page;
	uint64_t new_cluster;
	spdk_bs_sequence_t *seq;
};

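/* Completion for the copy-on-write path: re-drive every user op that was
 * queued on this channel while the cluster allocation was in flight. Ops
 * are executed on success and aborted on failure.
 */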
static void
_spdk_blob_allocate_and_copy_cluster_cpl(void *cb_arg, int bserrno)
{
	struct spdk_blob_copy_cluster_ctx *ctx = cb_arg;
	struct spdk_bs_request_set *set = (struct spdk_bs_request_set *)ctx->seq;
	TAILQ_HEAD(, spdk_bs_request_set) requests;
	spdk_bs_user_op_t *op;

	TAILQ_INIT(&requests);
	TAILQ_SWAP(&set->channel->need_cluster_alloc, &requests, spdk_bs_request_set, link);

	while (!TAILQ_EMPTY(&requests)) {
		op = TAILQ_FIRST(&requests);
		TAILQ_REMOVE(&requests, op, link);
		if (bserrno == 0) {
			spdk_bs_user_op_execute(op);
		} else {
			spdk_bs_user_op_abort(op);
		}
	}

	spdk_dma_free(ctx->buf);
	free(ctx);
}

static void
_spdk_blob_insert_cluster_cpl(void *cb_arg, int bserrno)
{
	struct spdk_blob_copy_cluster_ctx *ctx = cb_arg;

	if (bserrno) {
		uint32_t cluster_number;

		if (bserrno == -EEXIST) {
			/* The metadata insert failed because another thread
			 * allocated the cluster first. Free our cluster
			 * but continue without error. */
			bserrno = 0;
		}

		cluster_number = _spdk_bs_page_to_cluster(ctx->blob->bs, ctx->page);
		_spdk_bs_release_cluster(ctx->blob->bs, cluster_number);
	}

	spdk_bs_sequence_finish(ctx->seq, bserrno);
}

static void
_spdk_blob_write_copy_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_copy_cluster_ctx *ctx = cb_arg;
	uint32_t cluster_number;

	if (bserrno) {
		/* The write failed, so jump to the final completion handler */
		spdk_bs_sequence_finish(seq, bserrno);
		return;
	}

	cluster_number = _spdk_bs_page_to_cluster(ctx->blob->bs, ctx->page);

	_spdk_blob_insert_cluster_on_md_thread(ctx->blob, cluster_number, ctx->new_cluster,
					       _spdk_blob_insert_cluster_cpl, ctx);
}

static void
_spdk_blob_write_copy(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_copy_cluster_ctx *ctx = cb_arg;

	if (bserrno != 0) {
		/* The read failed, so jump to the final completion handler */
		spdk_bs_sequence_finish(seq, bserrno);
		return;
	}

	/* Write whole cluster */
	spdk_bs_sequence_write_dev(seq, ctx->buf,
				   _spdk_bs_cluster_to_lba(ctx->blob->bs, ctx->new_cluster),
				   _spdk_bs_cluster_to_lba(ctx->blob->bs, 1),
				   _spdk_blob_write_copy_cpl, ctx);
}

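/* Copy-on-write for a thin-provisioned blob: allocate a new cluster, read
 * the old contents from the backing device into a bounce buffer, write the
 * whole cluster out, and finally insert the cluster into the blob's map on
 * the metadata thread. User ops that arrive meanwhile are queued behind
 * this allocation on the channel.
 */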
static void
_spdk_bs_allocate_and_copy_cluster(struct spdk_blob *blob,
				   struct spdk_io_channel *_ch,
				   uint64_t offset, spdk_bs_user_op_t *op)
{
	struct spdk_bs_cpl cpl;
	struct spdk_bs_channel *ch;
	struct spdk_blob_copy_cluster_ctx *ctx;
	uint32_t cluster_start_page;
	uint32_t cluster_number;
	int rc;

	ch = spdk_io_channel_get_ctx(_ch);

	if (!TAILQ_EMPTY(&ch->need_cluster_alloc)) {
		/* There are already operations pending. Queue this user op
		 * and return because it will be re-executed when the outstanding
		 * cluster allocation completes. */
		TAILQ_INSERT_TAIL(&ch->need_cluster_alloc, op, link);
		return;
	}

	/* Round the page offset down to the first page in the cluster */
	cluster_start_page = _spdk_bs_page_to_cluster_start(blob, offset);

	/* Calculate which index in the metadata cluster array the corresponding
	 * cluster is supposed to be at. */
	cluster_number = _spdk_bs_page_to_cluster(blob->bs, cluster_start_page);

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		spdk_bs_user_op_abort(op);
		return;
	}

	assert(blob->bs->cluster_sz % blob->back_bs_dev->blocklen == 0);

	ctx->blob = blob;
	ctx->page = cluster_start_page;

	ctx->buf = spdk_dma_malloc(blob->bs->cluster_sz, blob->back_bs_dev->blocklen, NULL);
	if (!ctx->buf) {
		SPDK_ERRLOG("DMA allocation for cluster of size = %" PRIu32 " failed.\n",
			    blob->bs->cluster_sz);
		free(ctx);
		spdk_bs_user_op_abort(op);
		return;
	}

	rc = _spdk_bs_allocate_cluster(blob, cluster_number, &ctx->new_cluster, false);
	if (rc != 0) {
		spdk_dma_free(ctx->buf);
		free(ctx);
		spdk_bs_user_op_abort(op);
		return;
	}

	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
	cpl.u.blob_basic.cb_fn = _spdk_blob_allocate_and_copy_cluster_cpl;
	cpl.u.blob_basic.cb_arg = ctx;

	ctx->seq = spdk_bs_sequence_start(_ch, &cpl);
	if (!ctx->seq) {
		_spdk_bs_release_cluster(blob->bs, ctx->new_cluster);
		spdk_dma_free(ctx->buf);
		free(ctx);
		spdk_bs_user_op_abort(op);
		return;
	}

	/* Queue the user op to block other incoming operations */
	TAILQ_INSERT_TAIL(&ch->need_cluster_alloc, op, link);

	/* Read cluster from backing device */
	spdk_bs_sequence_read_bs_dev(ctx->seq, blob->back_bs_dev, ctx->buf,
				     _spdk_bs_dev_page_to_lba(blob->back_bs_dev, cluster_start_page),
				     _spdk_bs_dev_byte_to_lba(blob->back_bs_dev, blob->bs->cluster_sz),
				     _spdk_blob_write_copy, ctx);
}

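/* Translate a blob page offset and length into a device LBA and LBA count.
 * Unallocated pages map onto the backing device's LBA space instead, so the
 * units of *lba_count depend on which device the page resolves to.
 */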
static void
_spdk_blob_calculate_lba_and_lba_count(struct spdk_blob *blob, uint64_t page, uint64_t length,
				       uint64_t *lba, uint32_t *lba_count)
{
	*lba_count = _spdk_bs_page_to_lba(blob->bs, length);

	if (!_spdk_bs_page_is_allocated(blob, page)) {
		assert(blob->back_bs_dev != NULL);
		*lba = _spdk_bs_dev_page_to_lba(blob->back_bs_dev, page);
		*lba_count = _spdk_bs_blob_lba_to_back_dev_lba(blob, *lba_count);
	} else {
		*lba = _spdk_bs_blob_page_to_lba(blob, page);
	}
}

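/* Split an I/O that spans cluster boundaries into per-cluster operations
 * and submit them as one batch, since the target LBAs of neighboring
 * clusters need not be contiguous.
 */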
static void
_spdk_blob_request_submit_op_split(struct spdk_io_channel *ch, struct spdk_blob *blob,
				   void *payload, uint64_t offset, uint64_t length,
				   spdk_blob_op_complete cb_fn, void *cb_arg, enum spdk_blob_op_type op_type)
{
	spdk_bs_batch_t		*batch;
	struct spdk_bs_cpl	cpl;
	uint64_t		op_length;
	uint8_t			*buf;

	assert(blob != NULL);

	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
	cpl.u.blob_basic.cb_fn = cb_fn;
	cpl.u.blob_basic.cb_arg = cb_arg;

	batch = spdk_bs_batch_open(ch, &cpl);
	if (!batch) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	buf = payload;
	while (length > 0) {
		op_length = spdk_min(length, _spdk_bs_num_pages_to_cluster_boundary(blob, offset));

		switch (op_type) {
		case SPDK_BLOB_READ:
			spdk_bs_batch_read_blob(batch, blob, buf, offset, op_length);
			break;
		case SPDK_BLOB_WRITE:
			spdk_bs_batch_write_blob(batch, blob, buf, offset, op_length);
			break;
		case SPDK_BLOB_UNMAP:
			spdk_bs_batch_unmap_blob(batch, blob, offset, op_length);
			break;
		case SPDK_BLOB_WRITE_ZEROES:
			spdk_bs_batch_write_zeroes_blob(batch, blob, offset, op_length);
			break;
		case SPDK_BLOB_READV:
		case SPDK_BLOB_WRITEV:
			SPDK_ERRLOG("readv/writev not valid for %s\n", __func__);
			break;
		}

		length -= op_length;
		offset += op_length;
		if (op_type == SPDK_BLOB_WRITE || op_type == SPDK_BLOB_READ) {
			buf += op_length * SPDK_BS_PAGE_SIZE;
		}
	}

	spdk_bs_batch_close(batch);
}

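/* Submit a single operation that is known not to cross a cluster boundary.
 * Reads from an unallocated cluster are serviced by the backing device;
 * writes to an unallocated cluster are queued and trigger the copy-on-write
 * allocation path instead of being issued directly.
 */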
static void
_spdk_blob_request_submit_op_single(struct spdk_io_channel *_ch, struct spdk_blob *blob,
				    void *payload, uint64_t offset, uint64_t length,
				    spdk_blob_op_complete cb_fn, void *cb_arg, enum spdk_blob_op_type op_type)
{
	struct spdk_bs_cpl cpl;
	uint64_t lba;
	uint32_t lba_count;

	assert(blob != NULL);

	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
	cpl.u.blob_basic.cb_fn = cb_fn;
	cpl.u.blob_basic.cb_arg = cb_arg;

	_spdk_blob_calculate_lba_and_lba_count(blob, offset, length, &lba, &lba_count);

	switch (op_type) {
	case SPDK_BLOB_READ: {
		spdk_bs_batch_t *batch;

		batch = spdk_bs_batch_open(_ch, &cpl);
		if (!batch) {
			cb_fn(cb_arg, -ENOMEM);
			return;
		}

		if (_spdk_bs_page_is_allocated(blob, offset)) {
			/* Read from the blob */
			spdk_bs_batch_read_dev(batch, payload, lba, lba_count);
		} else {
			/* Read from the backing block device */
			spdk_bs_batch_read_bs_dev(batch, blob->back_bs_dev, payload, lba, lba_count);
		}

		spdk_bs_batch_close(batch);
		break;
	}
	case SPDK_BLOB_WRITE:
	case SPDK_BLOB_WRITE_ZEROES: {
		if (_spdk_bs_page_is_allocated(blob, offset)) {
			/* Write to the blob */
			spdk_bs_batch_t *batch;

			batch = spdk_bs_batch_open(_ch, &cpl);
			if (!batch) {
				cb_fn(cb_arg, -ENOMEM);
				return;
			}

			if (op_type == SPDK_BLOB_WRITE) {
				spdk_bs_batch_write_dev(batch, payload, lba, lba_count);
			} else {
				spdk_bs_batch_write_zeroes_dev(batch, lba, lba_count);
			}

			spdk_bs_batch_close(batch);
		} else {
			/* Queue this operation and allocate the cluster */
			spdk_bs_user_op_t *op;

			op = spdk_bs_user_op_alloc(_ch, &cpl, op_type, blob, payload, 0, offset, length);
			if (!op) {
				cb_fn(cb_arg, -ENOMEM);
				return;
			}

			_spdk_bs_allocate_and_copy_cluster(blob, _ch, offset, op);
		}
		break;
	}
	case SPDK_BLOB_UNMAP: {
		spdk_bs_batch_t *batch;

		batch = spdk_bs_batch_open(_ch, &cpl);
		if (!batch) {
			cb_fn(cb_arg, -ENOMEM);
			return;
		}

		if (_spdk_bs_page_is_allocated(blob, offset)) {
			spdk_bs_batch_unmap_dev(batch, lba, lba_count);
		}

		spdk_bs_batch_close(batch);
		break;
	}
	case SPDK_BLOB_READV:
	case SPDK_BLOB_WRITEV:
		SPDK_ERRLOG("readv/writev not valid\n");
		cb_fn(cb_arg, -EINVAL);
		break;
	}
}

static void
_spdk_blob_request_submit_op(struct spdk_blob *blob, struct spdk_io_channel *_channel,
			     void *payload, uint64_t offset, uint64_t length,
			     spdk_blob_op_complete cb_fn, void *cb_arg, enum spdk_blob_op_type op_type)
{
	assert(blob != NULL);

	if (blob->data_ro && op_type != SPDK_BLOB_READ) {
		cb_fn(cb_arg, -EPERM);
		return;
	}

	if (offset + length > blob->active.num_clusters * blob->bs->pages_per_cluster) {
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	if (length <= _spdk_bs_num_pages_to_cluster_boundary(blob, offset)) {
		_spdk_blob_request_submit_op_single(_channel, blob, payload, offset, length,
						    cb_fn, cb_arg, op_type);
	} else {
		_spdk_blob_request_submit_op_split(_channel, blob, payload, offset, length,
						   cb_fn, cb_arg, op_type);
	}
}

struct rw_iov_ctx {
	struct spdk_blob *blob;
	struct spdk_io_channel *channel;
	spdk_blob_op_complete cb_fn;
	void *cb_arg;
	bool read;
	int iovcnt;
	struct iovec *orig_iov;
	uint64_t page_offset;
	uint64_t pages_remaining;
	uint64_t pages_done;
	struct iovec iov[0];
};

static void
_spdk_rw_iov_done(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	assert(cb_arg == NULL);
	spdk_bs_sequence_finish(seq, bserrno);
}

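/* Issue the next cluster-bounded piece of a split readv/writev. Rebuilds a
 * sub-iov array pointing into the original iovecs for the current position,
 * then re-enters itself as the completion callback until no pages remain.
 */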
static void
_spdk_rw_iov_split_next(void *cb_arg, int bserrno)
{
	struct rw_iov_ctx *ctx = cb_arg;
	struct spdk_blob *blob = ctx->blob;
	struct iovec *iov, *orig_iov;
	int iovcnt;
	size_t orig_iovoff;
	uint64_t page_count, pages_to_boundary, page_offset;
	uint64_t byte_count;

	if (bserrno != 0 || ctx->pages_remaining == 0) {
		ctx->cb_fn(ctx->cb_arg, bserrno);
		free(ctx);
		return;
	}

	page_offset = ctx->page_offset;
	pages_to_boundary = _spdk_bs_num_pages_to_cluster_boundary(blob, page_offset);
	page_count = spdk_min(ctx->pages_remaining, pages_to_boundary);

	/*
	 * Get index and offset into the original iov array for our current position in the I/O sequence.
	 *  byte_count counts down the bytes that must be skipped until orig_iov and orig_iovoff
	 *  point to the current position in the I/O sequence.
	 */
	byte_count = ctx->pages_done * sizeof(struct spdk_blob_md_page);
	orig_iov = &ctx->orig_iov[0];
	orig_iovoff = 0;
	while (byte_count > 0) {
		if (byte_count >= orig_iov->iov_len) {
			byte_count -= orig_iov->iov_len;
			orig_iov++;
		} else {
			orig_iovoff = byte_count;
			byte_count = 0;
		}
	}

	/*
	 * Build an iov array for the next I/O in the sequence.  byte_count will keep track of how many
	 *  bytes of this next I/O remain to be accounted for in the new iov array.
	 */
	byte_count = page_count * sizeof(struct spdk_blob_md_page);
	iov = &ctx->iov[0];
	iovcnt = 0;
	while (byte_count > 0) {
		iov->iov_len = spdk_min(byte_count, orig_iov->iov_len - orig_iovoff);
		iov->iov_base = orig_iov->iov_base + orig_iovoff;
		byte_count -= iov->iov_len;
		orig_iovoff = 0;
		orig_iov++;
		iov++;
		iovcnt++;
	}

	ctx->page_offset += page_count;
	ctx->pages_done += page_count;
	ctx->pages_remaining -= page_count;
	iov = &ctx->iov[0];

	if (ctx->read) {
		spdk_blob_io_readv(ctx->blob, ctx->channel, iov, iovcnt, page_offset,
				   page_count, _spdk_rw_iov_split_next, ctx);
	} else {
		spdk_blob_io_writev(ctx->blob, ctx->channel, iov, iovcnt, page_offset,
				    page_count, _spdk_rw_iov_split_next, ctx);
	}
}

static void
_spdk_blob_request_submit_rw_iov(struct spdk_blob *blob, struct spdk_io_channel *_channel,
				 struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
				 spdk_blob_op_complete cb_fn, void *cb_arg, bool read)
{
	struct spdk_bs_cpl	cpl;

	assert(blob != NULL);

	if (!read && blob->data_ro) {
		cb_fn(cb_arg, -EPERM);
		return;
	}

	if (length == 0) {
		cb_fn(cb_arg, 0);
		return;
	}

	if (offset + length > blob->active.num_clusters * blob->bs->pages_per_cluster) {
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	/*
	 * For now, we implement readv/writev using a sequence (instead of a batch) to account for having
	 *  to split a request that spans a cluster boundary.  For I/O that do not span a cluster boundary,
	 *  there will be no noticeable difference compared to using a batch.  For I/O that do span a cluster
	 *  boundary, the target LBAs (after blob offset to LBA translation) may not be contiguous, so we need
	 *  to allocate a separate iov array and split the I/O such that none of the resulting
	 *  smaller I/O cross a cluster boundary.  These smaller I/O will be issued in sequence (not in parallel)
	 *  but since this case happens very infrequently, any performance impact will be negligible.
	 *
	 * This could be optimized in the future to allocate a big enough iov array to account for all of the iovs
	 *  for all of the smaller I/Os, pre-build all of the iov arrays for the smaller I/Os, then issue them
	 *  in a batch.  That would also require creating an intermediate spdk_bs_cpl that would get called
	 *  when the batch was completed, to allow for freeing the memory for the iov arrays.
	 */
	if (spdk_likely(length <= _spdk_bs_num_pages_to_cluster_boundary(blob, offset))) {
		uint32_t lba_count;
		uint64_t lba;

		_spdk_blob_calculate_lba_and_lba_count(blob, offset, length, &lba, &lba_count);

		cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
		cpl.u.blob_basic.cb_fn = cb_fn;
		cpl.u.blob_basic.cb_arg = cb_arg;

		if (read) {
			spdk_bs_sequence_t *seq;

			seq = spdk_bs_sequence_start(_channel, &cpl);
			if (!seq) {
				cb_fn(cb_arg, -ENOMEM);
				return;
			}

			if (_spdk_bs_page_is_allocated(blob, offset)) {
				spdk_bs_sequence_readv_dev(seq, iov, iovcnt, lba, lba_count, _spdk_rw_iov_done, NULL);
			} else {
				spdk_bs_sequence_readv_bs_dev(seq, blob->back_bs_dev, iov, iovcnt, lba, lba_count,
							      _spdk_rw_iov_done, NULL);
			}
		} else {
			if (_spdk_bs_page_is_allocated(blob, offset)) {
				spdk_bs_sequence_t *seq;

				seq = spdk_bs_sequence_start(_channel, &cpl);
				if (!seq) {
					cb_fn(cb_arg, -ENOMEM);
					return;
				}

				spdk_bs_sequence_writev_dev(seq, iov, iovcnt, lba, lba_count, _spdk_rw_iov_done, NULL);
			} else {
				/* Queue this operation and allocate the cluster */
				spdk_bs_user_op_t *op;

				op = spdk_bs_user_op_alloc(_channel, &cpl, SPDK_BLOB_WRITEV, blob, iov, iovcnt, offset, length);
				if (!op) {
					cb_fn(cb_arg, -ENOMEM);
					return;
				}

				_spdk_bs_allocate_and_copy_cluster(blob, _channel, offset, op);
			}
		}
	} else {
		struct rw_iov_ctx *ctx;

		ctx = calloc(1, sizeof(struct rw_iov_ctx) + iovcnt * sizeof(struct iovec));
		if (ctx == NULL) {
			cb_fn(cb_arg, -ENOMEM);
			return;
1913 		}
1914 
1915 		ctx->blob = blob;
1916 		ctx->channel = _channel;
1917 		ctx->cb_fn = cb_fn;
1918 		ctx->cb_arg = cb_arg;
1919 		ctx->read = read;
1920 		ctx->orig_iov = iov;
1921 		ctx->iovcnt = iovcnt;
1922 		ctx->page_offset = offset;
1923 		ctx->pages_remaining = length;
1924 		ctx->pages_done = 0;
1925 
1926 		_spdk_rw_iov_split_next(ctx, 0);
1927 	}
1928 }
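
/*
 * Caller-side sketch (hypothetical names; assumes an open blob and a
 *  channel from spdk_bs_alloc_io_channel()): a two-element readv that
 *  stays inside one cluster takes the single-request fast path above.
 *  Note that offset and length are expressed in pages, not bytes.
 */
#if 0
static void
read_done(void *cb_arg, int bserrno)
{
	/* bserrno == 0 on success */
}

static void
read_example(struct spdk_blob *blob, struct spdk_io_channel *channel,
	     void *buf_a, void *buf_b)
{
	struct iovec iov[2];

	iov[0].iov_base = buf_a;
	iov[0].iov_len = 2 * SPDK_BS_PAGE_SIZE;
	iov[1].iov_base = buf_b;
	iov[1].iov_len = 2 * SPDK_BS_PAGE_SIZE;

	/* Read 4 pages starting at page 0 of the blob */
	spdk_blob_io_readv(blob, channel, iov, 2, 0, 4, read_done, NULL);
}
#endif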
1929 
1930 static struct spdk_blob *
1931 _spdk_blob_lookup(struct spdk_blob_store *bs, spdk_blob_id blobid)
1932 {
1933 	struct spdk_blob *blob;
1934 
1935 	TAILQ_FOREACH(blob, &bs->blobs, link) {
1936 		if (blob->id == blobid) {
1937 			return blob;
1938 		}
1939 	}
1940 
1941 	return NULL;
1942 }
1943 
1944 static int
1945 _spdk_bs_channel_create(void *io_device, void *ctx_buf)
1946 {
1947 	struct spdk_blob_store		*bs = io_device;
1948 	struct spdk_bs_channel		*channel = ctx_buf;
1949 	struct spdk_bs_dev		*dev;
1950 	uint32_t			max_ops = bs->max_channel_ops;
1951 	uint32_t			i;
1952 
1953 	dev = bs->dev;
1954 
1955 	channel->req_mem = calloc(max_ops, sizeof(struct spdk_bs_request_set));
1956 	if (!channel->req_mem) {
1957 		return -1;
1958 	}
1959 
1960 	TAILQ_INIT(&channel->reqs);
1961 
1962 	for (i = 0; i < max_ops; i++) {
1963 		TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link);
1964 	}
1965 
1966 	channel->bs = bs;
1967 	channel->dev = dev;
1968 	channel->dev_channel = dev->create_channel(dev);
1969 
1970 	if (!channel->dev_channel) {
1971 		SPDK_ERRLOG("Failed to create device channel.\n");
1972 		free(channel->req_mem);
1973 		return -1;
1974 	}
1975 
1976 	TAILQ_INIT(&channel->need_cluster_alloc);
1977 
1978 	return 0;
1979 }
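
/*
 * Channels are created on demand by the io_device framework when a caller
 *  asks for one; a minimal caller-side sketch (hypothetical function name)
 *  pairing allocation with release on the same thread:
 */
#if 0
static void
channel_example(struct spdk_blob_store *bs)
{
	struct spdk_io_channel *channel;

	channel = spdk_bs_alloc_io_channel(bs);
	if (channel == NULL) {
		return;
	}

	/* ... issue spdk_blob_io_read()/spdk_blob_io_write() calls here ... */

	spdk_bs_free_io_channel(channel);
}
#endif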
1980 
1981 static void
1982 _spdk_bs_channel_destroy(void *io_device, void *ctx_buf)
1983 {
1984 	struct spdk_bs_channel *channel = ctx_buf;
1985 	spdk_bs_user_op_t *op;
1986 
1987 	while (!TAILQ_EMPTY(&channel->need_cluster_alloc)) {
1988 		op = TAILQ_FIRST(&channel->need_cluster_alloc);
1989 		TAILQ_REMOVE(&channel->need_cluster_alloc, op, link);
1990 		spdk_bs_user_op_abort(op);
1991 	}
1992 
1993 	free(channel->req_mem);
1994 	channel->dev->destroy_channel(channel->dev, channel->dev_channel);
1995 }
1996 
1997 static void
1998 _spdk_bs_dev_destroy(void *io_device)
1999 {
2000 	struct spdk_blob_store *bs = io_device;
2001 	struct spdk_blob	*blob, *blob_tmp;
2002 
2003 	bs->dev->destroy(bs->dev);
2004 
2005 	TAILQ_FOREACH_SAFE(blob, &bs->blobs, link, blob_tmp) {
2006 		TAILQ_REMOVE(&bs->blobs, blob, link);
2007 		_spdk_blob_free(blob);
2008 	}
2009 
2010 	pthread_mutex_destroy(&bs->used_clusters_mutex);
2011 
2012 	spdk_bit_array_free(&bs->used_blobids);
2013 	spdk_bit_array_free(&bs->used_md_pages);
2014 	spdk_bit_array_free(&bs->used_clusters);
2015 	/*
2016 	 * If this function is called for any reason except a successful unload,
2017 	 * the unload_cpl type will be NONE and this will be a nop.
2018 	 */
2019 	spdk_bs_call_cpl(&bs->unload_cpl, bs->unload_err);
2020 
2021 	free(bs);
2022 }
2023 
2024 static void
2025 _spdk_bs_free(struct spdk_blob_store *bs)
2026 {
2027 	spdk_bs_unregister_md_thread(bs);
2028 	spdk_io_device_unregister(bs, _spdk_bs_dev_destroy);
2029 }
2030 
2031 void
2032 spdk_bs_opts_init(struct spdk_bs_opts *opts)
2033 {
2034 	opts->cluster_sz = SPDK_BLOB_OPTS_CLUSTER_SZ;
2035 	opts->num_md_pages = SPDK_BLOB_OPTS_NUM_MD_PAGES;
2036 	opts->max_md_ops = SPDK_BLOB_OPTS_MAX_MD_OPS;
2037 	opts->max_channel_ops = SPDK_BLOB_OPTS_DEFAULT_CHANNEL_OPS;
2038 	memset(&opts->bstype, 0, sizeof(opts->bstype));
2039 	opts->iter_cb_fn = NULL;
2040 	opts->iter_cb_arg = NULL;
2041 }
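
/*
 * Typical usage sketch: initialize the defaults, then override selected
 *  fields before handing the options to spdk_bs_init() or spdk_bs_load().
 *  The bstype string below is hypothetical.
 */
#if 0
static void
opts_example(void)
{
	struct spdk_bs_opts opts;

	spdk_bs_opts_init(&opts);
	opts.cluster_sz = 4 * 1024 * 1024;	/* 4 MiB clusters */
	snprintf(opts.bstype.bstype, SPDK_BLOBSTORE_TYPE_LENGTH, "EXAMPLE");
	/* opts is now ready for spdk_bs_init(dev, &opts, cb_fn, cb_arg) */
}
#endif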
2042 
2043 static int
2044 _spdk_bs_opts_verify(struct spdk_bs_opts *opts)
2045 {
2046 	if (opts->cluster_sz == 0 || opts->num_md_pages == 0 || opts->max_md_ops == 0 ||
2047 	    opts->max_channel_ops == 0) {
2048 		SPDK_ERRLOG("Blobstore options cannot be set to 0\n");
2049 		return -1;
2050 	}
2051 
2052 	return 0;
2053 }
2054 
2055 static struct spdk_blob_store *
2056 _spdk_bs_alloc(struct spdk_bs_dev *dev, struct spdk_bs_opts *opts)
2057 {
2058 	struct spdk_blob_store	*bs;
2059 	uint64_t dev_size;
2060 	int rc;
2061 
2062 	dev_size = dev->blocklen * dev->blockcnt;
2063 	if (dev_size < opts->cluster_sz) {
2064 		/* Device size cannot be smaller than the cluster size of the blobstore */
2065 		SPDK_ERRLOG("Device size %" PRIu64 " is smaller than cluster size %" PRIu32 "\n",
2066 			    dev_size, opts->cluster_sz);
2067 		return NULL;
2068 	}
2069 	if (opts->cluster_sz < SPDK_BS_PAGE_SIZE) {
2070 		/* Cluster size cannot be smaller than page size */
2071 		SPDK_ERRLOG("Cluster size %" PRIu32 " is smaller than page size %d\n",
2072 			    opts->cluster_sz, SPDK_BS_PAGE_SIZE);
2073 		return NULL;
2074 	}
2075 	bs = calloc(1, sizeof(struct spdk_blob_store));
2076 	if (!bs) {
2077 		return NULL;
2078 	}
2079 
2080 	TAILQ_INIT(&bs->blobs);
2081 	bs->dev = dev;
2082 	bs->md_thread = spdk_get_thread();
2083 	assert(bs->md_thread != NULL);
2084 
2085 	/*
2086 	 * Do not use _spdk_bs_lba_to_cluster() here since blockcnt may not be an
2087 	 *  even multiple of the cluster size.
2088 	 */
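	/*
	 * For example (illustrative numbers): with blocklen == 512,
	 *  blockcnt == 2000100 and 1 MiB clusters, this yields
	 *  2000100 / (1048576 / 512) == 976 whole clusters; the 1252
	 *  trailing blocks are simply left unused.
	 */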
2089 	bs->cluster_sz = opts->cluster_sz;
2090 	bs->total_clusters = dev->blockcnt / (bs->cluster_sz / dev->blocklen);
2091 	bs->pages_per_cluster = bs->cluster_sz / SPDK_BS_PAGE_SIZE;
2092 	bs->num_free_clusters = bs->total_clusters;
2093 	bs->used_clusters = spdk_bit_array_create(bs->total_clusters);
2094 	if (bs->used_clusters == NULL) {
2095 		free(bs);
2096 		return NULL;
2097 	}
2098 
2099 	bs->max_channel_ops = opts->max_channel_ops;
2100 	bs->super_blob = SPDK_BLOBID_INVALID;
2101 	memcpy(&bs->bstype, &opts->bstype, sizeof(opts->bstype));
2102 
2103 	/* The metadata is assumed to be at least 1 page */
2104 	bs->used_md_pages = spdk_bit_array_create(1);
2105 	bs->used_blobids = spdk_bit_array_create(0);
2106 
2107 	pthread_mutex_init(&bs->used_clusters_mutex, NULL);
2108 
2109 	spdk_io_device_register(bs, _spdk_bs_channel_create, _spdk_bs_channel_destroy,
2110 				sizeof(struct spdk_bs_channel));
2111 	rc = spdk_bs_register_md_thread(bs);
2112 	if (rc == -1) {
2113 		spdk_io_device_unregister(bs, NULL);
2114 		pthread_mutex_destroy(&bs->used_clusters_mutex);
2115 		spdk_bit_array_free(&bs->used_blobids);
2116 		spdk_bit_array_free(&bs->used_md_pages);
2117 		spdk_bit_array_free(&bs->used_clusters);
2118 		free(bs);
2119 		return NULL;
2120 	}
2121 
2122 	return bs;
2123 }
2124 
2125 /* START spdk_bs_load.  spdk_bs_load_ctx is used for both load and unload. */
2126 
2127 struct spdk_bs_load_ctx {
2128 	struct spdk_blob_store		*bs;
2129 	struct spdk_bs_super_block	*super;
2130 
2131 	struct spdk_bs_md_mask		*mask;
2132 	bool				in_page_chain;
2133 	uint32_t			page_index;
2134 	uint32_t			cur_page;
2135 	struct spdk_blob_md_page	*page;
2136 	bool				is_load;
2137 
2138 	spdk_bs_sequence_t			*seq;
2139 	spdk_blob_op_with_handle_complete	iter_cb_fn;
2140 	void					*iter_cb_arg;
2141 };
2142 
2143 static void
2144 _spdk_bs_load_ctx_fail(spdk_bs_sequence_t *seq, struct spdk_bs_load_ctx *ctx, int bserrno)
2145 {
2146 	assert(bserrno != 0);
2147 
2148 	spdk_dma_free(ctx->super);
2149 	spdk_bs_sequence_finish(seq, bserrno);
2150 	/*
2151 	 * Only free the blobstore when a load fails.  If an unload fails (for some reason)
2152 	 *  we want to keep the blobstore in case the caller wants to try again.
2153 	 */
2154 	if (ctx->is_load) {
2155 		_spdk_bs_free(ctx->bs);
2156 	}
2157 	free(ctx);
2158 }
2159 
2160 static void
2161 _spdk_bs_set_mask(struct spdk_bit_array *array, struct spdk_bs_md_mask *mask)
2162 {
2163 	uint32_t i = 0;
2164 
2165 	while (true) {
2166 		i = spdk_bit_array_find_first_set(array, i);
2167 		if (i >= mask->length) {
2168 			break;
2169 		}
2170 		mask->mask[i / 8] |= 1U << (i % 8);
2171 		i++;
2172 	}
2173 }
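
/*
 * Illustrative layout: with bits 0, 3 and 9 set in the array, the loop
 *  above yields mask->mask[0] == 0x09 (bits 0 and 3) and
 *  mask->mask[1] == 0x02 (bit 9 lands in byte 1, bit 1) - i.e. bit i of
 *  the on-disk mask lives at byte i / 8, bit i % 8.
 */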
2174 
2175 static void
2176 _spdk_bs_write_super(spdk_bs_sequence_t *seq, struct spdk_blob_store *bs,
2177 		     struct spdk_bs_super_block *super, spdk_bs_sequence_cpl cb_fn, void *cb_arg)
2178 {
2179 	/* Update the values in the super block */
2180 	super->super_blob = bs->super_blob;
2181 	memcpy(&super->bstype, &bs->bstype, sizeof(bs->bstype));
2182 	super->crc = _spdk_blob_md_page_calc_crc(super);
2183 	spdk_bs_sequence_write_dev(seq, super, _spdk_bs_page_to_lba(bs, 0),
2184 				   _spdk_bs_byte_to_lba(bs, sizeof(*super)),
2185 				   cb_fn, cb_arg);
2186 }
2187 
2188 static void
2189 _spdk_bs_write_used_clusters(spdk_bs_sequence_t *seq, void *arg, spdk_bs_sequence_cpl cb_fn)
2190 {
2191 	struct spdk_bs_load_ctx	*ctx = arg;
2192 	uint64_t	mask_size, lba, lba_count;
2193 
2194 	/* Write out the used clusters mask */
2195 	mask_size = ctx->super->used_cluster_mask_len * SPDK_BS_PAGE_SIZE;
2196 	ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL);
2197 	if (!ctx->mask) {
2198 		_spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
2199 		return;
2200 	}
2201 
2202 	ctx->mask->type = SPDK_MD_MASK_TYPE_USED_CLUSTERS;
2203 	ctx->mask->length = ctx->bs->total_clusters;
2204 	assert(ctx->mask->length == spdk_bit_array_capacity(ctx->bs->used_clusters));
2205 
2206 	_spdk_bs_set_mask(ctx->bs->used_clusters, ctx->mask);
2207 	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_start);
2208 	lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_len);
2209 	spdk_bs_sequence_write_dev(seq, ctx->mask, lba, lba_count, cb_fn, arg);
2210 }
2211 
2212 static void
2213 _spdk_bs_write_used_md(spdk_bs_sequence_t *seq, void *arg, spdk_bs_sequence_cpl cb_fn)
2214 {
2215 	struct spdk_bs_load_ctx	*ctx = arg;
2216 	uint64_t	mask_size, lba, lba_count;
2217 
2218 	mask_size = ctx->super->used_page_mask_len * SPDK_BS_PAGE_SIZE;
2219 	ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL);
2220 	if (!ctx->mask) {
2221 		_spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
2222 		return;
2223 	}
2224 
2225 	ctx->mask->type = SPDK_MD_MASK_TYPE_USED_PAGES;
2226 	ctx->mask->length = ctx->super->md_len;
2227 	assert(ctx->mask->length == spdk_bit_array_capacity(ctx->bs->used_md_pages));
2228 
2229 	_spdk_bs_set_mask(ctx->bs->used_md_pages, ctx->mask);
2230 	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_start);
2231 	lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_len);
2232 	spdk_bs_sequence_write_dev(seq, ctx->mask, lba, lba_count, cb_fn, arg);
2233 }
2234 
2235 static void
2236 _spdk_bs_write_used_blobids(spdk_bs_sequence_t *seq, void *arg, spdk_bs_sequence_cpl cb_fn)
2237 {
2238 	struct spdk_bs_load_ctx	*ctx = arg;
2239 	uint64_t	mask_size, lba, lba_count;
2240 
2241 	if (ctx->super->used_blobid_mask_len == 0) {
2242 		/*
2243 		 * This is a pre-v3 on-disk format where the blobid mask does not get
2244 		 *  written to disk.
2245 		 */
2246 		cb_fn(seq, arg, 0);
2247 		return;
2248 	}
2249 
2250 	mask_size = ctx->super->used_blobid_mask_len * SPDK_BS_PAGE_SIZE;
2251 	ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL);
2252 	if (!ctx->mask) {
2253 		_spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
2254 		return;
2255 	}
2256 
2257 	ctx->mask->type = SPDK_MD_MASK_TYPE_USED_BLOBIDS;
2258 	ctx->mask->length = ctx->super->md_len;
2259 	assert(ctx->mask->length == spdk_bit_array_capacity(ctx->bs->used_blobids));
2260 
2261 	_spdk_bs_set_mask(ctx->bs->used_blobids, ctx->mask);
2262 	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_blobid_mask_start);
2263 	lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_blobid_mask_len);
2264 	spdk_bs_sequence_write_dev(seq, ctx->mask, lba, lba_count, cb_fn, arg);
2265 }
2266 
2267 static void _spdk_bs_load_complete(spdk_bs_sequence_t *seq, struct spdk_bs_load_ctx *ctx,
2268 				   int bserrno);
2269 
2270 static void
2271 _spdk_bs_load_iter(void *arg, struct spdk_blob *blob, int bserrno)
2272 {
2273 	struct spdk_bs_load_ctx *ctx = arg;
2274 
2275 	if (bserrno == 0) {
2276 		ctx->iter_cb_fn(ctx->iter_cb_arg, blob, 0);
2277 		spdk_bs_iter_next(ctx->bs, blob, _spdk_bs_load_iter, ctx);
2278 		return;
2279 	}
2280 
2281 	if (bserrno == -ENOENT) {
2282 		bserrno = 0;
2283 	} else {
2284 		/*
2285 		 * This case needs to be looked at further.  Same problem
2286 		 *  exists with applications that rely on explicit blob
2287 		 *  iteration.  We should just skip the blob that failed
2288 		 *  to load and continue on to the next one.
2289 		 */
2290 		SPDK_ERRLOG("Error iterating blobs\n");
2291 	}
2292 
2293 	ctx->iter_cb_fn = NULL;
2294 	_spdk_bs_load_complete(ctx->seq, ctx, bserrno);
2295 }
2296 
2297 static void
2298 _spdk_bs_load_complete(spdk_bs_sequence_t *seq, struct spdk_bs_load_ctx *ctx, int bserrno)
2299 {
2300 	if (ctx->iter_cb_fn) {
2301 		ctx->seq = seq;
2302 		spdk_bs_iter_first(ctx->bs, _spdk_bs_load_iter, ctx);
2303 		return;
2304 	}
2305 
2306 	spdk_dma_free(ctx->super);
2307 	spdk_dma_free(ctx->mask);
2308 	free(ctx);
2309 	spdk_bs_sequence_finish(seq, bserrno);
2310 }
2311 
2312 static void
2313 _spdk_bs_load_used_blobids_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2314 {
2315 	struct spdk_bs_load_ctx *ctx = cb_arg;
2316 	uint32_t i, j;
2317 	int rc;
2318 
2319 	/* The type must be correct */
2320 	assert(ctx->mask->type == SPDK_MD_MASK_TYPE_USED_BLOBIDS);
2321 
2322 	/* The length of the mask (in bits) must not be greater than
2323 	 * the length of the buffer (converted to bits) */
2324 	assert(ctx->mask->length <= (ctx->super->used_blobid_mask_len * SPDK_BS_PAGE_SIZE * 8));
2325 
2326 	/* The length of the mask must be exactly equal to the size
2327 	 * (in pages) of the metadata region */
2328 	assert(ctx->mask->length == ctx->super->md_len);
2329 
2330 	rc = spdk_bit_array_resize(&ctx->bs->used_blobids, ctx->mask->length);
2331 	if (rc < 0) {
2332 		spdk_dma_free(ctx->mask);
2333 		_spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
2334 		return;
2335 	}
2336 
2337 	for (i = 0; i < ctx->mask->length / 8; i++) {
2338 		uint8_t segment = ctx->mask->mask[i];
2339 		for (j = 0; segment; j++) {
2340 		for (j = 0; segment && (j < 8); j++) {
2341 				spdk_bit_array_set(ctx->bs->used_blobids, (i * 8) + j);
2342 			}
2343 			segment >>= 1U;
2344 		}
2345 	}
2346 
2347 	_spdk_bs_load_complete(seq, ctx, bserrno);
2348 }
2349 
2350 static void
2351 _spdk_bs_load_used_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2352 {
2353 	struct spdk_bs_load_ctx *ctx = cb_arg;
2354 	uint64_t		lba, lba_count, mask_size;
2355 	uint32_t		i, j;
2356 	int			rc;
2357 
2358 	/* The type must be correct */
2359 	assert(ctx->mask->type == SPDK_MD_MASK_TYPE_USED_CLUSTERS);
2360 	/* The length of the mask (in bits) must not be greater than the length of the buffer (converted to bits) */
2361 	assert(ctx->mask->length <= (ctx->super->used_cluster_mask_len * sizeof(
2362 					     struct spdk_blob_md_page) * 8));
2363 	/* The length of the mask must be exactly equal to the total number of clusters */
2364 	assert(ctx->mask->length == ctx->bs->total_clusters);
2365 
2366 	rc = spdk_bit_array_resize(&ctx->bs->used_clusters, ctx->bs->total_clusters);
2367 	if (rc < 0) {
2368 		spdk_dma_free(ctx->mask);
2369 		_spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
2370 		return;
2371 	}
2372 
2373 	ctx->bs->num_free_clusters = ctx->bs->total_clusters;
2374 	for (i = 0; i < ctx->mask->length / 8; i++) {
2375 		uint8_t segment = ctx->mask->mask[i];
2376 		for (j = 0; segment && (j < 8); j++) {
2377 			if (segment & 1U) {
2378 				spdk_bit_array_set(ctx->bs->used_clusters, (i * 8) + j);
2379 				assert(ctx->bs->num_free_clusters > 0);
2380 				ctx->bs->num_free_clusters--;
2381 			}
2382 			segment >>= 1U;
2383 		}
2384 	}
2385 
2386 	spdk_dma_free(ctx->mask);
2387 
2388 	/* Read the used blobids mask */
2389 	mask_size = ctx->super->used_blobid_mask_len * SPDK_BS_PAGE_SIZE;
2390 	ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL);
2391 	if (!ctx->mask) {
2392 		_spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
2393 		return;
2394 	}
2395 	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_blobid_mask_start);
2396 	lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_blobid_mask_len);
2397 	spdk_bs_sequence_read_dev(seq, ctx->mask, lba, lba_count,
2398 				  _spdk_bs_load_used_blobids_cpl, ctx);
2399 }
2400 
2401 static void
2402 _spdk_bs_load_used_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2403 {
2404 	struct spdk_bs_load_ctx *ctx = cb_arg;
2405 	uint64_t		lba, lba_count, mask_size;
2406 	uint32_t		i, j;
2407 	int			rc;
2408 
2409 	/* The type must be correct */
2410 	assert(ctx->mask->type == SPDK_MD_MASK_TYPE_USED_PAGES);
2411 	/* The length of the mask (in bits) must not be greater than the length of the buffer (converted to bits) */
2412 	assert(ctx->mask->length <= (ctx->super->used_page_mask_len * SPDK_BS_PAGE_SIZE *
2413 				     8));
2414 	/* The length of the mask must be exactly equal to the size (in pages) of the metadata region */
2415 	assert(ctx->mask->length == ctx->super->md_len);
2416 
2417 	rc = spdk_bit_array_resize(&ctx->bs->used_md_pages, ctx->mask->length);
2418 	if (rc < 0) {
2419 		spdk_dma_free(ctx->mask);
2420 		_spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
2421 		return;
2422 	}
2423 
2424 	for (i = 0; i < ctx->mask->length / 8; i++) {
2425 		uint8_t segment = ctx->mask->mask[i];
2426 		for (j = 0; segment && (j < 8); j++) {
2427 			if (segment & 1U) {
2428 				spdk_bit_array_set(ctx->bs->used_md_pages, (i * 8) + j);
2429 			}
2430 			segment >>= 1U;
2431 		}
2432 	}
2433 	spdk_dma_free(ctx->mask);
2434 
2435 	/* Read the used clusters mask */
2436 	mask_size = ctx->super->used_cluster_mask_len * SPDK_BS_PAGE_SIZE;
2437 	ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL);
2438 	if (!ctx->mask) {
2439 		_spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
2440 		return;
2441 	}
2442 	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_start);
2443 	lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_len);
2444 	spdk_bs_sequence_read_dev(seq, ctx->mask, lba, lba_count,
2445 				  _spdk_bs_load_used_clusters_cpl, ctx);
2446 }
2447 
2448 static void
2449 _spdk_bs_load_write_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2450 {
2451 	struct spdk_bs_load_ctx	*ctx = cb_arg;
2452 	uint64_t lba, lba_count, mask_size;
2453 
2454 	/* Read the used pages mask */
2455 	mask_size = ctx->super->used_page_mask_len * SPDK_BS_PAGE_SIZE;
2456 	ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL);
2457 	if (!ctx->mask) {
2458 		_spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
2459 		return;
2460 	}
2461 
2462 	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_start);
2463 	lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_len);
2464 	spdk_bs_sequence_read_dev(seq, ctx->mask, lba, lba_count,
2465 				  _spdk_bs_load_used_pages_cpl, ctx);
2466 }
2467 
2468 static int
2469 _spdk_bs_load_replay_md_parse_page(const struct spdk_blob_md_page *page, struct spdk_blob_store *bs)
2470 {
2471 	struct spdk_blob_md_descriptor *desc;
2472 	size_t	cur_desc = 0;
2473 
2474 	desc = (struct spdk_blob_md_descriptor *)page->descriptors;
2475 	while (cur_desc < sizeof(page->descriptors)) {
2476 		if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_PADDING) {
2477 			if (desc->length == 0) {
2478 				/* If padding and length are 0, this terminates the page */
2479 				break;
2480 			}
2481 		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_EXTENT) {
2482 			struct spdk_blob_md_descriptor_extent	*desc_extent;
2483 			unsigned int				i, j;
2484 			unsigned int				cluster_count = 0;
2485 
2486 			desc_extent = (struct spdk_blob_md_descriptor_extent *)desc;
2487 
2488 			for (i = 0; i < desc_extent->length / sizeof(desc_extent->extents[0]); i++) {
2489 				for (j = 0; j < desc_extent->extents[i].length; j++) {
2490 					spdk_bit_array_set(bs->used_clusters, desc_extent->extents[i].cluster_idx + j);
2491 					if (bs->num_free_clusters == 0) {
2492 						return -1;
2493 					}
2494 					bs->num_free_clusters--;
2495 					cluster_count++;
2496 				}
2497 			}
2498 			if (cluster_count == 0) {
2499 				return -1;
2500 			}
2501 		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR) {
2502 			/* Skip this item */
2503 		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR_INTERNAL) {
2504 			/* Skip this item */
2505 		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_FLAGS) {
2506 			/* Skip this item */
2507 		} else {
2508 			/* Error */
2509 			return -1;
2510 		}
2511 		/* Advance to the next descriptor */
2512 		cur_desc += sizeof(*desc) + desc->length;
2513 		if (cur_desc + sizeof(*desc) > sizeof(page->descriptors)) {
2514 			break;
2515 		}
2516 		desc = (struct spdk_blob_md_descriptor *)((uintptr_t)page->descriptors + cur_desc);
2517 	}
2518 	return 0;
2519 }
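
/*
 * The walk above relies on every descriptor beginning with the common
 *  header defined in blobstore.h; a metadata page is a fixed page header
 *  followed by a packed run of variable-length descriptors, terminated by
 *  a zero-length padding descriptor or by running out of room:
 *
 *	struct spdk_blob_md_descriptor {
 *		uint8_t		type;
 *		uint32_t	length;
 *	};
 *
 * so "sizeof(*desc) + desc->length" advances exactly one descriptor.
 */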
2520 
2521 static bool _spdk_bs_load_cur_md_page_valid(struct spdk_bs_load_ctx *ctx)
2522 {
2523 	uint32_t crc;
2524 
2525 	crc = _spdk_blob_md_page_calc_crc(ctx->page);
2526 	if (crc != ctx->page->crc) {
2527 		return false;
2528 	}
2529 
2530 	if (_spdk_bs_page_to_blobid(ctx->cur_page) != ctx->page->id) {
2531 		return false;
2532 	}
2533 	return true;
2534 }
2535 
2536 static void
2537 _spdk_bs_load_replay_cur_md_page(spdk_bs_sequence_t *seq, void *cb_arg);
2538 
2539 static void
2540 _spdk_bs_load_write_used_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2541 {
2542 	struct spdk_bs_load_ctx	*ctx = cb_arg;
2543 
2544 	_spdk_bs_load_complete(seq, ctx, bserrno);
2545 }
2546 
2547 static void
2548 _spdk_bs_load_write_used_blobids_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2549 {
2550 	struct spdk_bs_load_ctx	*ctx = cb_arg;
2551 
2552 	spdk_dma_free(ctx->mask);
2553 	ctx->mask = NULL;
2554 
2555 	_spdk_bs_write_used_clusters(seq, cb_arg, _spdk_bs_load_write_used_clusters_cpl);
2556 }
2557 
2558 static void
2559 _spdk_bs_load_write_used_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2560 {
2561 	struct spdk_bs_load_ctx	*ctx = cb_arg;
2562 
2563 	spdk_dma_free(ctx->mask);
2564 	ctx->mask = NULL;
2565 
2566 	_spdk_bs_write_used_blobids(seq, cb_arg, _spdk_bs_load_write_used_blobids_cpl);
2567 }
2568 
2569 static void
2570 _spdk_bs_load_write_used_md(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2571 {
2572 	_spdk_bs_write_used_md(seq, cb_arg, _spdk_bs_load_write_used_pages_cpl);
2573 }
2574 
2575 static void
2576 _spdk_bs_load_replay_md_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2577 {
2578 	struct spdk_bs_load_ctx *ctx = cb_arg;
2579 	uint64_t num_md_clusters;
2580 	uint64_t i;
2581 	uint32_t page_num;
2582 
2583 	if (bserrno != 0) {
2584 		_spdk_bs_load_ctx_fail(seq, ctx, bserrno);
2585 		return;
2586 	}
2587 
2588 	page_num = ctx->cur_page;
2589 	if (_spdk_bs_load_cur_md_page_valid(ctx) == true) {
2590 		if (ctx->page->sequence_num == 0 || ctx->in_page_chain == true) {
2591 			spdk_bit_array_set(ctx->bs->used_md_pages, page_num);
2592 			if (ctx->page->sequence_num == 0) {
2593 				spdk_bit_array_set(ctx->bs->used_blobids, page_num);
2594 			}
2595 			if (_spdk_bs_load_replay_md_parse_page(ctx->page, ctx->bs)) {
2596 				_spdk_bs_load_ctx_fail(seq, ctx, -EILSEQ);
2597 				return;
2598 			}
2599 			if (ctx->page->next != SPDK_INVALID_MD_PAGE) {
2600 				ctx->in_page_chain = true;
2601 				ctx->cur_page = ctx->page->next;
2602 				_spdk_bs_load_replay_cur_md_page(seq, cb_arg);
2603 				return;
2604 			}
2605 		}
2606 	}
2607 
2608 	ctx->in_page_chain = false;
2609 
2610 	do {
2611 		ctx->page_index++;
2612 	} while (spdk_bit_array_get(ctx->bs->used_md_pages, ctx->page_index) == true);
2613 
2614 	if (ctx->page_index < ctx->super->md_len) {
2615 		ctx->cur_page = ctx->page_index;
2616 		_spdk_bs_load_replay_cur_md_page(seq, cb_arg);
2617 	} else {
2618 		/* Claim all of the clusters used by the metadata */
2619 		num_md_clusters = divide_round_up(ctx->super->md_len, ctx->bs->pages_per_cluster);
2620 		for (i = 0; i < num_md_clusters; i++) {
2621 			_spdk_bs_claim_cluster(ctx->bs, i);
2622 		}
2623 		spdk_dma_free(ctx->page);
2624 		_spdk_bs_load_write_used_md(seq, ctx, bserrno);
2625 	}
2626 }
2627 
2628 static void
2629 _spdk_bs_load_replay_cur_md_page(spdk_bs_sequence_t *seq, void *cb_arg)
2630 {
2631 	struct spdk_bs_load_ctx *ctx = cb_arg;
2632 	uint64_t lba;
2633 
2634 	assert(ctx->cur_page < ctx->super->md_len);
2635 	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->md_start + ctx->cur_page);
2636 	spdk_bs_sequence_read_dev(seq, ctx->page, lba,
2637 				  _spdk_bs_byte_to_lba(ctx->bs, SPDK_BS_PAGE_SIZE),
2638 				  _spdk_bs_load_replay_md_cpl, ctx);
2639 }
2640 
2641 static void
2642 _spdk_bs_load_replay_md(spdk_bs_sequence_t *seq, void *cb_arg)
2643 {
2644 	struct spdk_bs_load_ctx *ctx = cb_arg;
2645 
2646 	ctx->page_index = 0;
2647 	ctx->cur_page = 0;
2648 	ctx->page = spdk_dma_zmalloc(SPDK_BS_PAGE_SIZE,
2649 				     SPDK_BS_PAGE_SIZE,
2650 				     NULL);
2651 	if (!ctx->page) {
2652 		_spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
2653 		return;
2654 	}
2655 	_spdk_bs_load_replay_cur_md_page(seq, cb_arg);
2656 }
2657 
2658 static void
2659 _spdk_bs_recover(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2660 {
2661 	struct spdk_bs_load_ctx *ctx = cb_arg;
2662 	int		rc;
2663 
2664 	if (bserrno != 0) {
2665 		_spdk_bs_load_ctx_fail(seq, ctx, -EIO);
2666 		return;
2667 	}
2668 
2669 	rc = spdk_bit_array_resize(&ctx->bs->used_md_pages, ctx->super->md_len);
2670 	if (rc < 0) {
2671 		_spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
2672 		return;
2673 	}
2674 
2675 	rc = spdk_bit_array_resize(&ctx->bs->used_blobids, ctx->super->md_len);
2676 	if (rc < 0) {
2677 		_spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
2678 		return;
2679 	}
2680 
2681 	rc = spdk_bit_array_resize(&ctx->bs->used_clusters, ctx->bs->total_clusters);
2682 	if (rc < 0) {
2683 		_spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
2684 		return;
2685 	}
2686 
2687 	ctx->bs->num_free_clusters = ctx->bs->total_clusters;
2688 	_spdk_bs_load_replay_md(seq, cb_arg);
2689 }
2690 
2691 static void
2692 _spdk_bs_load_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2693 {
2694 	struct spdk_bs_load_ctx *ctx = cb_arg;
2695 	uint32_t	crc;
2696 	static const char zeros[SPDK_BLOBSTORE_TYPE_LENGTH];
2697 
2698 	if (ctx->super->version > SPDK_BS_VERSION ||
2699 	    ctx->super->version < SPDK_BS_INITIAL_VERSION) {
2700 		_spdk_bs_load_ctx_fail(seq, ctx, -EILSEQ);
2701 		return;
2702 	}
2703 
2704 	if (memcmp(ctx->super->signature, SPDK_BS_SUPER_BLOCK_SIG,
2705 		   sizeof(ctx->super->signature)) != 0) {
2706 		_spdk_bs_load_ctx_fail(seq, ctx, -EILSEQ);
2707 		return;
2708 	}
2709 
2710 	crc = _spdk_blob_md_page_calc_crc(ctx->super);
2711 	if (crc != ctx->super->crc) {
2712 		_spdk_bs_load_ctx_fail(seq, ctx, -EILSEQ);
2713 		return;
2714 	}
2715 
2716 	if (memcmp(&ctx->bs->bstype, &ctx->super->bstype, SPDK_BLOBSTORE_TYPE_LENGTH) == 0) {
2717 		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Bstype matched - loading blobstore\n");
2718 	} else if (memcmp(&ctx->bs->bstype, zeros, SPDK_BLOBSTORE_TYPE_LENGTH) == 0) {
2719 		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Bstype wildcard used - loading blobstore regardless of bstype\n");
2720 	} else {
2721 		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Unexpected bstype\n");
2722 		SPDK_TRACEDUMP(SPDK_LOG_BLOB, "Expected:", ctx->bs->bstype.bstype, SPDK_BLOBSTORE_TYPE_LENGTH);
2723 		SPDK_TRACEDUMP(SPDK_LOG_BLOB, "Found:", ctx->super->bstype.bstype, SPDK_BLOBSTORE_TYPE_LENGTH);
2724 		_spdk_bs_load_ctx_fail(seq, ctx, -ENXIO);
2725 		return;
2726 	}
2727 
2728 	/* Parse the super block */
2729 	ctx->bs->cluster_sz = ctx->super->cluster_size;
2730 	ctx->bs->total_clusters = ctx->bs->dev->blockcnt / (ctx->bs->cluster_sz / ctx->bs->dev->blocklen);
2731 	ctx->bs->pages_per_cluster = ctx->bs->cluster_sz / SPDK_BS_PAGE_SIZE;
2732 	ctx->bs->md_start = ctx->super->md_start;
2733 	ctx->bs->md_len = ctx->super->md_len;
2734 	ctx->bs->total_data_clusters = ctx->bs->total_clusters - divide_round_up(
2735 					       ctx->bs->md_start + ctx->bs->md_len, ctx->bs->pages_per_cluster);
2736 	ctx->bs->super_blob = ctx->super->super_blob;
2737 	memcpy(&ctx->bs->bstype, &ctx->super->bstype, sizeof(ctx->super->bstype));
2738 
2739 	if (ctx->super->clean == 0) {
2740 		_spdk_bs_recover(seq, ctx, 0);
2741 	} else if (ctx->super->used_blobid_mask_len == 0) {
2742 		/*
2743 		 * Metadata is clean, but this is an old metadata format without
2744 		 *  a blobid mask.  Clear the clean bit and then build the masks
2745 		 *  using _spdk_bs_recover.
2746 		 */
2747 		ctx->super->clean = 0;
2748 		_spdk_bs_write_super(seq, ctx->bs, ctx->super, _spdk_bs_recover, ctx);
2749 	} else {
2750 		ctx->super->clean = 0;
2751 		_spdk_bs_write_super(seq, ctx->bs, ctx->super, _spdk_bs_load_write_super_cpl, ctx);
2752 	}
2753 }
2754 
2755 void
2756 spdk_bs_load(struct spdk_bs_dev *dev, struct spdk_bs_opts *o,
2757 	     spdk_bs_op_with_handle_complete cb_fn, void *cb_arg)
2758 {
2759 	struct spdk_blob_store	*bs;
2760 	struct spdk_bs_cpl	cpl;
2761 	spdk_bs_sequence_t	*seq;
2762 	struct spdk_bs_load_ctx *ctx;
2763 	struct spdk_bs_opts	opts = {};
2764 
2765 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Loading blobstore from dev %p\n", dev);
2766 
2767 	if (o) {
2768 		opts = *o;
2769 	} else {
2770 		spdk_bs_opts_init(&opts);
2771 	}
2772 
2773 	if (opts.max_md_ops == 0 || opts.max_channel_ops == 0) {
2774 		cb_fn(cb_arg, NULL, -EINVAL);
2775 		return;
2776 	}
2777 
2778 	bs = _spdk_bs_alloc(dev, &opts);
2779 	if (!bs) {
2780 		cb_fn(cb_arg, NULL, -ENOMEM);
2781 		return;
2782 	}
2783 
2784 	ctx = calloc(1, sizeof(*ctx));
2785 	if (!ctx) {
2786 		_spdk_bs_free(bs);
2787 		cb_fn(cb_arg, NULL, -ENOMEM);
2788 		return;
2789 	}
2790 
2791 	ctx->bs = bs;
2792 	ctx->is_load = true;
2793 	ctx->iter_cb_fn = opts.iter_cb_fn;
2794 	ctx->iter_cb_arg = opts.iter_cb_arg;
2795 
2796 	/* Allocate memory for the super block */
2797 	ctx->super = spdk_dma_zmalloc(sizeof(*ctx->super), 0x1000, NULL);
2798 	if (!ctx->super) {
2799 		free(ctx);
2800 		_spdk_bs_free(bs);
		cb_fn(cb_arg, NULL, -ENOMEM);
2801 		return;
2802 	}
2803 
2804 	cpl.type = SPDK_BS_CPL_TYPE_BS_HANDLE;
2805 	cpl.u.bs_handle.cb_fn = cb_fn;
2806 	cpl.u.bs_handle.cb_arg = cb_arg;
2807 	cpl.u.bs_handle.bs = bs;
2808 
2809 	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
2810 	if (!seq) {
2811 		spdk_dma_free(ctx->super);
2812 		free(ctx);
2813 		_spdk_bs_free(bs);
2814 		cb_fn(cb_arg, NULL, -ENOMEM);
2815 		return;
2816 	}
2817 
2818 	/* Read the super block */
2819 	spdk_bs_sequence_read_dev(seq, ctx->super, _spdk_bs_page_to_lba(bs, 0),
2820 				  _spdk_bs_byte_to_lba(bs, sizeof(*ctx->super)),
2821 				  _spdk_bs_load_super_cpl, ctx);
2822 }
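
/*
 * Caller-side sketch (hypothetical callback names): load an existing
 *  blobstore and receive the handle in the completion callback.
 *  opts.iter_cb_fn may also be set to visit every blob during the load.
 */
#if 0
static void
load_done(void *cb_arg, struct spdk_blob_store *bs, int bserrno)
{
	if (bserrno != 0) {
		/* handle the error; bs is not valid here */
		return;
	}
	/* bs is ready for blob operations on this (md) thread */
}

static void
load_example(struct spdk_bs_dev *dev)
{
	struct spdk_bs_opts opts;

	spdk_bs_opts_init(&opts);
	spdk_bs_load(dev, &opts, load_done, NULL);
}
#endif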
2823 
2824 /* END spdk_bs_load */
2825 
2826 /* START spdk_bs_init */
2827 
2828 struct spdk_bs_init_ctx {
2829 	struct spdk_blob_store		*bs;
2830 	struct spdk_bs_super_block	*super;
2831 };
2832 
2833 static void
2834 _spdk_bs_init_persist_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2835 {
2836 	struct spdk_bs_init_ctx *ctx = cb_arg;
2837 
2838 	spdk_dma_free(ctx->super);
2839 	free(ctx);
2840 
2841 	spdk_bs_sequence_finish(seq, bserrno);
2842 }
2843 
2844 static void
2845 _spdk_bs_init_trim_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2846 {
2847 	struct spdk_bs_init_ctx *ctx = cb_arg;
2848 
2849 	/* Write super block */
2850 	spdk_bs_sequence_write_dev(seq, ctx->super, _spdk_bs_page_to_lba(ctx->bs, 0),
2851 				   _spdk_bs_byte_to_lba(ctx->bs, sizeof(*ctx->super)),
2852 				   _spdk_bs_init_persist_super_cpl, ctx);
2853 }
2854 
2855 void
2856 spdk_bs_init(struct spdk_bs_dev *dev, struct spdk_bs_opts *o,
2857 	     spdk_bs_op_with_handle_complete cb_fn, void *cb_arg)
2858 {
2859 	struct spdk_bs_init_ctx *ctx;
2860 	struct spdk_blob_store	*bs;
2861 	struct spdk_bs_cpl	cpl;
2862 	spdk_bs_sequence_t	*seq;
2863 	spdk_bs_batch_t		*batch;
2864 	uint64_t		num_md_lba;
2865 	uint64_t		num_md_pages;
2866 	uint64_t		num_md_clusters;
2867 	uint32_t		i;
2868 	struct spdk_bs_opts	opts = {};
2869 	int			rc;
2870 
2871 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Initializing blobstore on dev %p\n", dev);
2872 
2873 	if ((SPDK_BS_PAGE_SIZE % dev->blocklen) != 0) {
2874 		SPDK_ERRLOG("unsupported dev block length of %d\n",
2875 			    dev->blocklen);
2876 		dev->destroy(dev);
2877 		cb_fn(cb_arg, NULL, -EINVAL);
2878 		return;
2879 	}
2880 
2881 	if (o) {
2882 		opts = *o;
2883 	} else {
2884 		spdk_bs_opts_init(&opts);
2885 	}
2886 
2887 	if (_spdk_bs_opts_verify(&opts) != 0) {
2888 		dev->destroy(dev);
2889 		cb_fn(cb_arg, NULL, -EINVAL);
2890 		return;
2891 	}
2892 
2893 	bs = _spdk_bs_alloc(dev, &opts);
2894 	if (!bs) {
2895 		dev->destroy(dev);
2896 		cb_fn(cb_arg, NULL, -ENOMEM);
2897 		return;
2898 	}
2899 
2900 	if (opts.num_md_pages == SPDK_BLOB_OPTS_NUM_MD_PAGES) {
2901 		/* By default, allocate 1 page per cluster.
2902 		 * Technically, this over-allocates metadata
2903 		 * because more metadata will reduce the number
2904 		 * of usable clusters. This can be addressed with
2905 		 * more complex math in the future.
2906 		 */
2907 		bs->md_len = bs->total_clusters;
2908 	} else {
2909 		bs->md_len = opts.num_md_pages;
2910 	}
2911 
2912 	rc = spdk_bit_array_resize(&bs->used_md_pages, bs->md_len);
2913 	if (rc < 0) {
2914 		_spdk_bs_free(bs);
2915 		cb_fn(cb_arg, NULL, -ENOMEM);
2916 		return;
2917 	}
2918 
2919 	rc = spdk_bit_array_resize(&bs->used_blobids, bs->md_len);
2920 	if (rc < 0) {
2921 		_spdk_bs_free(bs);
2922 		cb_fn(cb_arg, NULL, -ENOMEM);
2923 		return;
2924 	}
2925 
2926 	ctx = calloc(1, sizeof(*ctx));
2927 	if (!ctx) {
2928 		_spdk_bs_free(bs);
2929 		cb_fn(cb_arg, NULL, -ENOMEM);
2930 		return;
2931 	}
2932 
2933 	ctx->bs = bs;
2934 
2935 	/* Allocate memory for the super block */
2936 	ctx->super = spdk_dma_zmalloc(sizeof(*ctx->super), 0x1000, NULL);
2937 	if (!ctx->super) {
2938 		free(ctx);
2939 		_spdk_bs_free(bs);
		cb_fn(cb_arg, NULL, -ENOMEM);
2940 		return;
2941 	}
2942 	memcpy(ctx->super->signature, SPDK_BS_SUPER_BLOCK_SIG,
2943 	       sizeof(ctx->super->signature));
2944 	ctx->super->version = SPDK_BS_VERSION;
2945 	ctx->super->length = sizeof(*ctx->super);
2946 	ctx->super->super_blob = bs->super_blob;
2947 	ctx->super->clean = 0;
2948 	ctx->super->cluster_size = bs->cluster_sz;
2949 	memcpy(&ctx->super->bstype, &bs->bstype, sizeof(bs->bstype));
2950 
2951 	/* Calculate how many pages the metadata consumes at the front
2952 	 * of the disk.
2953 	 */
2954 
2955 	/* The super block uses 1 page */
2956 	num_md_pages = 1;
2957 
2958 	/* The used_md_pages mask requires 1 bit per metadata page, rounded
2959 	 * up to the nearest page, plus a header.
2960 	 */
2961 	ctx->super->used_page_mask_start = num_md_pages;
2962 	ctx->super->used_page_mask_len = divide_round_up(sizeof(struct spdk_bs_md_mask) +
2963 					 divide_round_up(bs->md_len, 8),
2964 					 SPDK_BS_PAGE_SIZE);
2965 	num_md_pages += ctx->super->used_page_mask_len;
2966 
2967 	/* The used_clusters mask requires 1 bit per cluster, rounded
2968 	 * up to the nearest page, plus a header.
2969 	 */
2970 	ctx->super->used_cluster_mask_start = num_md_pages;
2971 	ctx->super->used_cluster_mask_len = divide_round_up(sizeof(struct spdk_bs_md_mask) +
2972 					    divide_round_up(bs->total_clusters, 8),
2973 					    SPDK_BS_PAGE_SIZE);
2974 	num_md_pages += ctx->super->used_cluster_mask_len;
2975 
2976 	/* The used_blobids mask requires 1 bit per metadata page, rounded
2977 	 * up to the nearest page, plus a header.
2978 	 */
2979 	ctx->super->used_blobid_mask_start = num_md_pages;
2980 	ctx->super->used_blobid_mask_len = divide_round_up(sizeof(struct spdk_bs_md_mask) +
2981 					   divide_round_up(bs->md_len, 8),
2982 					   SPDK_BS_PAGE_SIZE);
2983 	num_md_pages += ctx->super->used_blobid_mask_len;
2984 
2985 	/* The metadata region size was chosen above */
2986 	ctx->super->md_start = bs->md_start = num_md_pages;
2987 	ctx->super->md_len = bs->md_len;
2988 	num_md_pages += bs->md_len;
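
	/*
	 * Worked example (illustrative numbers): a 1 GiB device with 1 MiB
	 *  clusters gives total_clusters == md_len == 1024.  Each mask then
	 *  needs just one 4 KiB page (header + 1024 / 8 == 128 mask bytes),
	 *  so the layout is: page 0 super block, page 1 used_page mask,
	 *  page 2 used_cluster mask, page 3 used_blobid mask, and
	 *  pages 4..1027 the metadata region itself.
	 */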
2989 
2990 	num_md_lba = _spdk_bs_page_to_lba(bs, num_md_pages);
2991 
2992 	ctx->super->crc = _spdk_blob_md_page_calc_crc(ctx->super);
2993 
2994 	num_md_clusters = divide_round_up(num_md_pages, bs->pages_per_cluster);
2995 	if (num_md_clusters > bs->total_clusters) {
2996 		SPDK_ERRLOG("Blobstore metadata cannot use more clusters than are available; "
2997 			    "decrease the number of pages reserved for metadata "
2998 			    "or increase the cluster size.\n");
2999 		spdk_dma_free(ctx->super);
3000 		free(ctx);
3001 		_spdk_bs_free(bs);
3002 		cb_fn(cb_arg, NULL, -ENOMEM);
3003 		return;
3004 	}
3005 	/* Claim all of the clusters used by the metadata */
3006 	for (i = 0; i < num_md_clusters; i++) {
3007 		_spdk_bs_claim_cluster(bs, i);
3008 	}
3009 
3010 	bs->total_data_clusters = bs->num_free_clusters;
3011 
3012 	cpl.type = SPDK_BS_CPL_TYPE_BS_HANDLE;
3013 	cpl.u.bs_handle.cb_fn = cb_fn;
3014 	cpl.u.bs_handle.cb_arg = cb_arg;
3015 	cpl.u.bs_handle.bs = bs;
3016 
3017 	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
3018 	if (!seq) {
3019 		spdk_dma_free(ctx->super);
3020 		free(ctx);
3021 		_spdk_bs_free(bs);
3022 		cb_fn(cb_arg, NULL, -ENOMEM);
3023 		return;
3024 	}
3025 
3026 	batch = spdk_bs_sequence_to_batch(seq, _spdk_bs_init_trim_cpl, ctx);
3027 
3028 	/* Clear metadata space */
3029 	spdk_bs_batch_write_zeroes_dev(batch, 0, num_md_lba);
3030 	/* Trim data clusters */
3031 	spdk_bs_batch_unmap_dev(batch, num_md_lba, ctx->bs->dev->blockcnt - num_md_lba);
3032 
3033 	spdk_bs_batch_close(batch);
3034 }
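
/*
 * Caller-side sketch (hypothetical names): format a fresh blobstore on a
 *  bs_dev.  Passing NULL options selects the defaults, and the failure
 *  paths above destroy the dev before completing.
 */
#if 0
static void
init_done(void *cb_arg, struct spdk_blob_store *bs, int bserrno)
{
	if (bserrno == 0) {
		/* blobstore formatted and ready for use */
	}
}

static void
init_example(struct spdk_bs_dev *dev)
{
	spdk_bs_init(dev, NULL, init_done, NULL);
}
#endif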
3035 
3036 /* END spdk_bs_init */
3037 
3038 /* START spdk_bs_destroy */
3039 
3040 static void
3041 _spdk_bs_destroy_trim_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
3042 {
3043 	struct spdk_bs_init_ctx *ctx = cb_arg;
3044 	struct spdk_blob_store *bs = ctx->bs;
3045 
3046 	/*
3047 	 * We need to defer calling spdk_bs_call_cpl() until after
3048 	 * dev destruction, so tuck these away for later use.
3049 	 */
3050 	bs->unload_err = bserrno;
3051 	memcpy(&bs->unload_cpl, &seq->cpl, sizeof(struct spdk_bs_cpl));
3052 	seq->cpl.type = SPDK_BS_CPL_TYPE_NONE;
3053 
3054 	spdk_bs_sequence_finish(seq, bserrno);
3055 
3056 	_spdk_bs_free(bs);
3057 	free(ctx);
3058 }
3059 
3060 void
3061 spdk_bs_destroy(struct spdk_blob_store *bs, spdk_bs_op_complete cb_fn,
3062 		void *cb_arg)
3063 {
3064 	struct spdk_bs_cpl	cpl;
3065 	spdk_bs_sequence_t	*seq;
3066 	struct spdk_bs_init_ctx *ctx;
3067 
3068 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Destroying blobstore\n");
3069 
3070 	if (!TAILQ_EMPTY(&bs->blobs)) {
3071 		SPDK_ERRLOG("Blobstore still has open blobs\n");
3072 		cb_fn(cb_arg, -EBUSY);
3073 		return;
3074 	}
3075 
3076 	cpl.type = SPDK_BS_CPL_TYPE_BS_BASIC;
3077 	cpl.u.bs_basic.cb_fn = cb_fn;
3078 	cpl.u.bs_basic.cb_arg = cb_arg;
3079 
3080 	ctx = calloc(1, sizeof(*ctx));
3081 	if (!ctx) {
3082 		cb_fn(cb_arg, -ENOMEM);
3083 		return;
3084 	}
3085 
3086 	ctx->bs = bs;
3087 
3088 	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
3089 	if (!seq) {
3090 		free(ctx);
3091 		cb_fn(cb_arg, -ENOMEM);
3092 		return;
3093 	}
3094 
3095 	/* Write zeroes to the super block */
3096 	spdk_bs_sequence_write_zeroes_dev(seq,
3097 					  _spdk_bs_page_to_lba(bs, 0),
3098 					  _spdk_bs_byte_to_lba(bs, sizeof(struct spdk_bs_super_block)),
3099 					  _spdk_bs_destroy_trim_cpl, ctx);
3100 }
3101 
3102 /* END spdk_bs_destroy */
3103 
3104 /* START spdk_bs_unload */
3105 
3106 static void
3107 _spdk_bs_unload_write_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
3108 {
3109 	struct spdk_bs_load_ctx	*ctx = cb_arg;
3110 
3111 	spdk_dma_free(ctx->super);
3112 
3113 	/*
3114 	 * We need to defer calling spdk_bs_call_cpl() until after
3115 	 * dev destruction, so tuck these away for later use.
3116 	 */
3117 	ctx->bs->unload_err = bserrno;
3118 	memcpy(&ctx->bs->unload_cpl, &seq->cpl, sizeof(struct spdk_bs_cpl));
3119 	seq->cpl.type = SPDK_BS_CPL_TYPE_NONE;
3120 
3121 	spdk_bs_sequence_finish(seq, bserrno);
3122 
3123 	_spdk_bs_free(ctx->bs);
3124 	free(ctx);
3125 }
3126 
3127 static void
3128 _spdk_bs_unload_write_used_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
3129 {
3130 	struct spdk_bs_load_ctx	*ctx = cb_arg;
3131 
3132 	spdk_dma_free(ctx->mask);
3133 	ctx->super->clean = 1;
3134 
3135 	_spdk_bs_write_super(seq, ctx->bs, ctx->super, _spdk_bs_unload_write_super_cpl, ctx);
3136 }
3137 
3138 static void
3139 _spdk_bs_unload_write_used_blobids_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
3140 {
3141 	struct spdk_bs_load_ctx	*ctx = cb_arg;
3142 
3143 	spdk_dma_free(ctx->mask);
3144 	ctx->mask = NULL;
3145 
3146 	_spdk_bs_write_used_clusters(seq, cb_arg, _spdk_bs_unload_write_used_clusters_cpl);
3147 }
3148 
3149 static void
3150 _spdk_bs_unload_write_used_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
3151 {
3152 	struct spdk_bs_load_ctx	*ctx = cb_arg;
3153 
3154 	spdk_dma_free(ctx->mask);
3155 	ctx->mask = NULL;
3156 
3157 	_spdk_bs_write_used_blobids(seq, cb_arg, _spdk_bs_unload_write_used_blobids_cpl);
3158 }
3159 
3160 static void
3161 _spdk_bs_unload_read_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
3162 {
3163 	_spdk_bs_write_used_md(seq, cb_arg, _spdk_bs_unload_write_used_pages_cpl);
3164 }
3165 
3166 void
3167 spdk_bs_unload(struct spdk_blob_store *bs, spdk_bs_op_complete cb_fn, void *cb_arg)
3168 {
3169 	struct spdk_bs_cpl	cpl;
3170 	spdk_bs_sequence_t	*seq;
3171 	struct spdk_bs_load_ctx *ctx;
3172 
3173 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Syncing blobstore\n");
3174 
3175 	if (!TAILQ_EMPTY(&bs->blobs)) {
3176 		SPDK_ERRLOG("Blobstore still has open blobs\n");
3177 		cb_fn(cb_arg, -EBUSY);
3178 		return;
3179 	}
3180 
3181 	ctx = calloc(1, sizeof(*ctx));
3182 	if (!ctx) {
3183 		cb_fn(cb_arg, -ENOMEM);
3184 		return;
3185 	}
3186 
3187 	ctx->bs = bs;
3188 	ctx->is_load = false;
3189 
3190 	ctx->super = spdk_dma_zmalloc(sizeof(*ctx->super), 0x1000, NULL);
3191 	if (!ctx->super) {
3192 		free(ctx);
3193 		cb_fn(cb_arg, -ENOMEM);
3194 		return;
3195 	}
3196 
3197 	cpl.type = SPDK_BS_CPL_TYPE_BS_BASIC;
3198 	cpl.u.bs_basic.cb_fn = cb_fn;
3199 	cpl.u.bs_basic.cb_arg = cb_arg;
3200 
3201 	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
3202 	if (!seq) {
3203 		spdk_dma_free(ctx->super);
3204 		free(ctx);
3205 		cb_fn(cb_arg, -ENOMEM);
3206 		return;
3207 	}
3208 
3209 	/* Read super block */
3210 	spdk_bs_sequence_read_dev(seq, ctx->super, _spdk_bs_page_to_lba(bs, 0),
3211 				  _spdk_bs_byte_to_lba(bs, sizeof(*ctx->super)),
3212 				  _spdk_bs_unload_read_super_cpl, ctx);
3213 }
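
/*
 * Caller-side sketch (hypothetical names): every blob must be closed
 *  before unloading; the completion callback sees -EBUSY otherwise.
 */
#if 0
static void
unload_done(void *cb_arg, int bserrno)
{
	/* on success the super block is marked clean again */
}

static void
unload_example(struct spdk_blob_store *bs)
{
	spdk_bs_unload(bs, unload_done, NULL);
}
#endif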
3214 
3215 /* END spdk_bs_unload */
3216 
3217 /* START spdk_bs_set_super */
3218 
3219 struct spdk_bs_set_super_ctx {
3220 	struct spdk_blob_store		*bs;
3221 	struct spdk_bs_super_block	*super;
3222 };
3223 
3224 static void
3225 _spdk_bs_set_super_write_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
3226 {
3227 	struct spdk_bs_set_super_ctx	*ctx = cb_arg;
3228 
3229 	if (bserrno != 0) {
3230 		SPDK_ERRLOG("Unable to write to super block of blobstore\n");
3231 	}
3232 
3233 	spdk_dma_free(ctx->super);
3234 
3235 	spdk_bs_sequence_finish(seq, bserrno);
3236 
3237 	free(ctx);
3238 }
3239 
3240 static void
3241 _spdk_bs_set_super_read_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
3242 {
3243 	struct spdk_bs_set_super_ctx	*ctx = cb_arg;
3244 
3245 	if (bserrno != 0) {
3246 		SPDK_ERRLOG("Unable to read super block of blobstore\n");
3247 		spdk_dma_free(ctx->super);
3248 		spdk_bs_sequence_finish(seq, bserrno);
3249 		free(ctx);
3250 		return;
3251 	}
3252 
3253 	_spdk_bs_write_super(seq, ctx->bs, ctx->super, _spdk_bs_set_super_write_cpl, ctx);
3254 }
3255 
3256 void
3257 spdk_bs_set_super(struct spdk_blob_store *bs, spdk_blob_id blobid,
3258 		  spdk_bs_op_complete cb_fn, void *cb_arg)
3259 {
3260 	struct spdk_bs_cpl		cpl;
3261 	spdk_bs_sequence_t		*seq;
3262 	struct spdk_bs_set_super_ctx	*ctx;
3263 
3264 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Setting super blob id on blobstore\n");
3265 
3266 	ctx = calloc(1, sizeof(*ctx));
3267 	if (!ctx) {
3268 		cb_fn(cb_arg, -ENOMEM);
3269 		return;
3270 	}
3271 
3272 	ctx->bs = bs;
3273 
3274 	ctx->super = spdk_dma_zmalloc(sizeof(*ctx->super), 0x1000, NULL);
3275 	if (!ctx->super) {
3276 		free(ctx);
3277 		cb_fn(cb_arg, -ENOMEM);
3278 		return;
3279 	}
3280 
3281 	cpl.type = SPDK_BS_CPL_TYPE_BS_BASIC;
3282 	cpl.u.bs_basic.cb_fn = cb_fn;
3283 	cpl.u.bs_basic.cb_arg = cb_arg;
3284 
3285 	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
3286 	if (!seq) {
3287 		spdk_dma_free(ctx->super);
3288 		free(ctx);
3289 		cb_fn(cb_arg, -ENOMEM);
3290 		return;
3291 	}
3292 
3293 	bs->super_blob = blobid;
3294 
3295 	/* Read super block */
3296 	spdk_bs_sequence_read_dev(seq, ctx->super, _spdk_bs_page_to_lba(bs, 0),
3297 				  _spdk_bs_byte_to_lba(bs, sizeof(*ctx->super)),
3298 				  _spdk_bs_set_super_read_cpl, ctx);
3299 }
3300 
3301 /* END spdk_bs_set_super */
3302 
3303 void
3304 spdk_bs_get_super(struct spdk_blob_store *bs,
3305 		  spdk_blob_op_with_id_complete cb_fn, void *cb_arg)
3306 {
3307 	if (bs->super_blob == SPDK_BLOBID_INVALID) {
3308 		cb_fn(cb_arg, SPDK_BLOBID_INVALID, -ENOENT);
3309 	} else {
3310 		cb_fn(cb_arg, bs->super_blob, 0);
3311 	}
3312 }
3313 
3314 uint64_t
3315 spdk_bs_get_cluster_size(struct spdk_blob_store *bs)
3316 {
3317 	return bs->cluster_sz;
3318 }
3319 
3320 uint64_t
3321 spdk_bs_get_page_size(struct spdk_blob_store *bs)
3322 {
3323 	return SPDK_BS_PAGE_SIZE;
3324 }
3325 
3326 uint64_t
3327 spdk_bs_free_cluster_count(struct spdk_blob_store *bs)
3328 {
3329 	return bs->num_free_clusters;
3330 }
3331 
3332 uint64_t
3333 spdk_bs_total_data_cluster_count(struct spdk_blob_store *bs)
3334 {
3335 	return bs->total_data_clusters;
3336 }
3337 
3338 static int
3339 spdk_bs_register_md_thread(struct spdk_blob_store *bs)
3340 {
3341 	bs->md_channel = spdk_get_io_channel(bs);
3342 	if (!bs->md_channel) {
3343 		SPDK_ERRLOG("Failed to get IO channel.\n");
3344 		return -1;
3345 	}
3346 
3347 	return 0;
3348 }
3349 
3350 static int
3351 spdk_bs_unregister_md_thread(struct spdk_blob_store *bs)
3352 {
3353 	spdk_put_io_channel(bs->md_channel);
3354 
3355 	return 0;
3356 }
3357 
3358 spdk_blob_id spdk_blob_get_id(struct spdk_blob *blob)
3359 {
3360 	assert(blob != NULL);
3361 
3362 	return blob->id;
3363 }
3364 
3365 uint64_t spdk_blob_get_num_pages(struct spdk_blob *blob)
3366 {
3367 	assert(blob != NULL);
3368 
3369 	return _spdk_bs_cluster_to_page(blob->bs, blob->active.num_clusters);
3370 }
3371 
3372 uint64_t spdk_blob_get_num_clusters(struct spdk_blob *blob)
3373 {
3374 	assert(blob != NULL);
3375 
3376 	return blob->active.num_clusters;
3377 }
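
/*
 * These accessors compose naturally; e.g. a blob's current capacity in
 *  bytes (illustrative helper, not part of the API):
 */
#if 0
static uint64_t
blob_size_bytes(struct spdk_blob_store *bs, struct spdk_blob *blob)
{
	return spdk_blob_get_num_clusters(blob) * spdk_bs_get_cluster_size(bs);
}
#endif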
3378 
3379 /* START spdk_bs_create_blob */
3380 
3381 static void
3382 _spdk_bs_create_blob_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
3383 {
3384 	struct spdk_blob *blob = cb_arg;
3385 
3386 	_spdk_blob_free(blob);
3387 
3388 	spdk_bs_sequence_finish(seq, bserrno);
3389 }
3390 
3391 static int
3392 _spdk_blob_set_xattrs(struct spdk_blob *blob, const struct spdk_blob_xattr_opts *xattrs,
3393 		      bool internal)
3394 {
3395 	uint64_t i;
3396 	size_t value_len = 0;
3397 	int rc;
3398 	const void *value = NULL;
3399 	if (xattrs->count > 0 && xattrs->get_value == NULL) {
3400 		return -EINVAL;
3401 	}
3402 	for (i = 0; i < xattrs->count; i++) {
3403 		xattrs->get_value(xattrs->ctx, xattrs->names[i], &value, &value_len);
3404 		if (value == NULL || value_len == 0) {
3405 			return -EINVAL;
3406 		}
3407 		rc = _spdk_blob_set_xattr(blob, xattrs->names[i], value, value_len, internal);
3408 		if (rc < 0) {
3409 			return rc;
3410 		}
3411 	}
3412 	return 0;
3413 }
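
/*
 * Sketch of a get_value callback satisfying the contract checked above
 *  (names are hypothetical): it must produce a non-NULL value with a
 *  non-zero length for every name listed in xattrs->names.
 */
#if 0
static void
example_get_xattr_value(void *xattr_ctx, const char *name,
			const void **value, size_t *value_len)
{
	static const char creator[] = "example";

	if (strcmp(name, "creator") == 0) {
		*value = creator;
		*value_len = sizeof(creator);
	}
}
#endif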
3414 
3415 static void
3416 _spdk_blob_set_thin_provision(struct spdk_blob *blob)
3417 {
3418 	_spdk_blob_verify_md_op(blob);
3419 	blob->invalid_flags |= SPDK_BLOB_THIN_PROV;
3420 	blob->state = SPDK_BLOB_STATE_DIRTY;
3421 }
3422 
3423 static void
3424 _spdk_bs_create_blob(struct spdk_blob_store *bs,
3425 		     const struct spdk_blob_opts *opts,
3426 		     const struct spdk_blob_xattr_opts *internal_xattrs,
3427 		     spdk_blob_op_with_id_complete cb_fn, void *cb_arg)
3428 {
3429 	struct spdk_blob	*blob;
3430 	uint32_t		page_idx;
3431 	struct spdk_bs_cpl	cpl;
3432 	struct spdk_blob_opts	opts_default;
3433 	struct spdk_blob_xattr_opts internal_xattrs_default;
3434 	spdk_bs_sequence_t	*seq;
3435 	spdk_blob_id		id;
3436 	int rc;
3437 
3438 	assert(spdk_get_thread() == bs->md_thread);
3439 
3440 	page_idx = spdk_bit_array_find_first_clear(bs->used_md_pages, 0);
3441 	if (page_idx >= spdk_bit_array_capacity(bs->used_md_pages)) {
3442 		cb_fn(cb_arg, 0, -ENOMEM);
3443 		return;
3444 	}
3445 	spdk_bit_array_set(bs->used_blobids, page_idx);
3446 	spdk_bit_array_set(bs->used_md_pages, page_idx);
3447 
3448 	id = _spdk_bs_page_to_blobid(page_idx);
3449 
3450 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Creating blob with id %lu at page %u\n", id, page_idx);
3451 
3452 	blob = _spdk_blob_alloc(bs, id);
3453 	if (!blob) {
3454 		cb_fn(cb_arg, 0, -ENOMEM);
3455 		return;
3456 	}
3457 
3458 	if (!opts) {
3459 		spdk_blob_opts_init(&opts_default);
3460 		opts = &opts_default;
3461 	}
3462 	if (!internal_xattrs) {
3463 		_spdk_blob_xattrs_init(&internal_xattrs_default);
3464 		internal_xattrs = &internal_xattrs_default;
3465 	}
3466 
3467 	rc = _spdk_blob_set_xattrs(blob, &opts->xattrs, false);
3468 	if (rc < 0) {
3469 		_spdk_blob_free(blob);
3470 		cb_fn(cb_arg, 0, rc);
3471 		return;
3472 	}
3473 
3474 	rc = _spdk_blob_set_xattrs(blob, internal_xattrs, true);
3475 	if (rc < 0) {
3476 		_spdk_blob_free(blob);
3477 		cb_fn(cb_arg, 0, rc);
3478 		return;
3479 	}
3480 
3481 	if (opts->thin_provision) {
3482 		_spdk_blob_set_thin_provision(blob);
3483 	}
3484 
3485 	rc = spdk_blob_resize(blob, opts->num_clusters);
3486 	if (rc < 0) {
3487 		_spdk_blob_free(blob);
3488 		cb_fn(cb_arg, 0, rc);
3489 		return;
3490 	}
3491 	cpl.type = SPDK_BS_CPL_TYPE_BLOBID;
3492 	cpl.u.blobid.cb_fn = cb_fn;
3493 	cpl.u.blobid.cb_arg = cb_arg;
3494 	cpl.u.blobid.blobid = blob->id;
3495 
3496 	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
3497 	if (!seq) {
3498 		_spdk_blob_free(blob);
3499 		cb_fn(cb_arg, 0, -ENOMEM);
3500 		return;
3501 	}
3502 
3503 	_spdk_blob_persist(seq, blob, _spdk_bs_create_blob_cpl, blob);
3504 }
3505 
3506 void spdk_bs_create_blob(struct spdk_blob_store *bs,
3507 			 spdk_blob_op_with_id_complete cb_fn, void *cb_arg)
3508 {
3509 	_spdk_bs_create_blob(bs, NULL, NULL, cb_fn, cb_arg);
3510 }
3511 
3512 void spdk_bs_create_blob_ext(struct spdk_blob_store *bs, const struct spdk_blob_opts *opts,
3513 			     spdk_blob_op_with_id_complete cb_fn, void *cb_arg)
3514 {
3515 	_spdk_bs_create_blob(bs, opts, NULL, cb_fn, cb_arg);
3516 }
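
/*
 * Caller-side sketch (hypothetical names): create a thin-provisioned blob
 *  of 10 clusters and receive its id in the completion callback.
 */
#if 0
static void
create_done(void *cb_arg, spdk_blob_id blobid, int bserrno)
{
	if (bserrno == 0) {
		/* e.g. follow up with spdk_bs_open_blob(bs, blobid, ...) */
	}
}

static void
create_example(struct spdk_blob_store *bs)
{
	struct spdk_blob_opts opts;

	spdk_blob_opts_init(&opts);
	opts.num_clusters = 10;
	opts.thin_provision = true;

	spdk_bs_create_blob_ext(bs, &opts, create_done, NULL);
}
#endif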
3517 
3518 /* END spdk_bs_create_blob */
3519 
3520 /* START spdk_blob_resize */
3521 int
3522 spdk_blob_resize(struct spdk_blob *blob, uint64_t sz)
3523 {
3524 	int			rc;
3525 
3526 	_spdk_blob_verify_md_op(blob);
3527 
3528 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Resizing blob %lu to %lu clusters\n", blob->id, sz);
3529 
3530 	if (blob->md_ro) {
3531 		return -EPERM;
3532 	}
3533 
3534 	if (sz == blob->active.num_clusters) {
3535 		return 0;
3536 	}
3537 
3538 	rc = _spdk_resize_blob(blob, sz);
3539 	if (rc < 0) {
3540 		return rc;
3541 	}
3542 
3543 	return 0;
3544 }
3545 
3546 /* END spdk_blob_resize */
3547 
3548 
3549 /* START spdk_bs_delete_blob */
3550 
3551 static void
3552 _spdk_bs_delete_close_cpl(void *cb_arg, int bserrno)
3553 {
3554 	spdk_bs_sequence_t *seq = cb_arg;
3555 
3556 	spdk_bs_sequence_finish(seq, bserrno);
3557 }
3558 
3559 static void
3560 _spdk_bs_delete_persist_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
3561 {
3562 	struct spdk_blob *blob = cb_arg;
3563 
3564 	if (bserrno != 0) {
3565 		/*
3566 		 * We already removed this blob from the blobstore tailq, so
3567 		 *  we need to free it here since this is the last reference
3568 		 *  to it.
3569 		 */
3570 		_spdk_blob_free(blob);
3571 		_spdk_bs_delete_close_cpl(seq, bserrno);
3572 		return;
3573 	}
3574 
3575 	/*
3576 	 * This will immediately decrement the ref_count and call
3577 	 *  the completion routine since the metadata state is clean.
3578 	 *  By calling spdk_blob_close, we reduce the number of call
3579 	 *  points into code that touches the blob->open_ref count
3580 	 *  and the blobstore's blob list.
3581 	 */
3582 	spdk_blob_close(blob, _spdk_bs_delete_close_cpl, seq);
3583 }
3584 
3585 static void
3586 _spdk_bs_delete_open_cpl(void *cb_arg, struct spdk_blob *blob, int bserrno)
3587 {
3588 	spdk_bs_sequence_t *seq = cb_arg;
3589 	uint32_t page_num;
3590 
3591 	if (bserrno != 0) {
3592 		spdk_bs_sequence_finish(seq, bserrno);
3593 		return;
3594 	}
3595 
3596 	_spdk_blob_verify_md_op(blob);
3597 
3598 	if (blob->open_ref > 1) {
3599 		/*
3600 		 * Someone has this blob open (besides this delete context).
3601 		 *  Decrement the ref count directly and return -EBUSY.
3602 		 */
3603 		blob->open_ref--;
3604 		spdk_bs_sequence_finish(seq, -EBUSY);
3605 		return;
3606 	}
3607 
3608 	/*
3609 	 * Remove the blob from the blob_store list now, to ensure it does not
3610 	 *  get returned after this point by _spdk_blob_lookup().
3611 	 */
3612 	TAILQ_REMOVE(&blob->bs->blobs, blob, link);
3613 	page_num = _spdk_bs_blobid_to_page(blob->id);
3614 	spdk_bit_array_clear(blob->bs->used_blobids, page_num);
3615 	blob->state = SPDK_BLOB_STATE_DIRTY;
3616 	blob->active.num_pages = 0;
3617 	_spdk_resize_blob(blob, 0);
3618 
3619 	_spdk_blob_persist(seq, blob, _spdk_bs_delete_persist_cpl, blob);
3620 }
3621 
3622 void
3623 spdk_bs_delete_blob(struct spdk_blob_store *bs, spdk_blob_id blobid,
3624 		    spdk_blob_op_complete cb_fn, void *cb_arg)
3625 {
3626 	struct spdk_bs_cpl	cpl;
3627 	spdk_bs_sequence_t	*seq;
3628 
3629 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Deleting blob %lu\n", blobid);
3630 
3631 	assert(spdk_get_thread() == bs->md_thread);
3632 
3633 	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
3634 	cpl.u.blob_basic.cb_fn = cb_fn;
3635 	cpl.u.blob_basic.cb_arg = cb_arg;
3636 
3637 	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
3638 	if (!seq) {
3639 		cb_fn(cb_arg, -ENOMEM);
3640 		return;
3641 	}
3642 
3643 	spdk_bs_open_blob(bs, blobid, _spdk_bs_delete_open_cpl, seq);
3644 }
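
/*
 * Usage sketch (delete_cb is a placeholder): deletion is asynchronous
 *  and completes with -EBUSY if another context still holds the blob
 *  open.
 *
 *	static void
 *	delete_cb(void *cb_arg, int bserrno)
 *	{
 *		if (bserrno == -EBUSY) {
 *			SPDK_ERRLOG("blob is still open elsewhere\n");
 *		}
 *	}
 *
 *	spdk_bs_delete_blob(bs, blobid, delete_cb, NULL);
 */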
3645 
3646 /* END spdk_bs_delete_blob */
3647 
3648 /* START spdk_bs_open_blob */
3649 
3650 static void
3651 _spdk_bs_open_blob_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
3652 {
3653 	struct spdk_blob *blob = cb_arg;
3654 
3655 	/* If the blob failed to load (e.g. due to a CRC error), just return NULL. */
3656 	if (blob == NULL) {
3657 		seq->cpl.u.blob_handle.blob = NULL;
3658 		spdk_bs_sequence_finish(seq, bserrno);
3659 		return;
3660 	}
3661 
3662 	blob->open_ref++;
3663 
3664 	TAILQ_INSERT_HEAD(&blob->bs->blobs, blob, link);
3665 
3666 	spdk_bs_sequence_finish(seq, bserrno);
3667 }
3668 
3669 void spdk_bs_open_blob(struct spdk_blob_store *bs, spdk_blob_id blobid,
3670 		       spdk_blob_op_with_handle_complete cb_fn, void *cb_arg)
3671 {
3672 	struct spdk_blob		*blob;
3673 	struct spdk_bs_cpl		cpl;
3674 	spdk_bs_sequence_t		*seq;
3675 	uint32_t			page_num;
3676 
3677 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Opening blob %lu\n", blobid);
3678 	assert(spdk_get_thread() == bs->md_thread);
3679 
3680 	page_num = _spdk_bs_blobid_to_page(blobid);
3681 	if (spdk_bit_array_get(bs->used_blobids, page_num) == false) {
3682 		/* Invalid blobid */
3683 		cb_fn(cb_arg, NULL, -ENOENT);
3684 		return;
3685 	}
3686 
3687 	blob = _spdk_blob_lookup(bs, blobid);
3688 	if (blob) {
3689 		blob->open_ref++;
3690 		cb_fn(cb_arg, blob, 0);
3691 		return;
3692 	}
3693 
3694 	blob = _spdk_blob_alloc(bs, blobid);
3695 	if (!blob) {
3696 		cb_fn(cb_arg, NULL, -ENOMEM);
3697 		return;
3698 	}
3699 
3700 	cpl.type = SPDK_BS_CPL_TYPE_BLOB_HANDLE;
3701 	cpl.u.blob_handle.cb_fn = cb_fn;
3702 	cpl.u.blob_handle.cb_arg = cb_arg;
3703 	cpl.u.blob_handle.blob = blob;
3704 
3705 	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
3706 	if (!seq) {
3707 		_spdk_blob_free(blob);
3708 		cb_fn(cb_arg, NULL, -ENOMEM);
3709 		return;
3710 	}
3711 
3712 	_spdk_blob_load(seq, blob, _spdk_bs_open_blob_cpl, blob);
3713 }
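
/*
 * Usage sketch (open_cb is a placeholder): opening an already-open blob
 *  returns the same handle with an incremented open_ref, so every
 *  successful open must eventually be balanced by spdk_blob_close().
 *
 *	static void
 *	open_cb(void *cb_arg, struct spdk_blob *blob, int bserrno)
 *	{
 *		if (bserrno == 0) {
 *			... use blob, then close it when done ...
 *		}
 *	}
 *
 *	spdk_bs_open_blob(bs, blobid, open_cb, NULL);
 */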
3714 /* END spdk_bs_open_blob */
3715 
3716 /* START spdk_blob_set_read_only */
3717 int spdk_blob_set_read_only(struct spdk_blob *blob)
3718 {
3719 	_spdk_blob_verify_md_op(blob);
3720 
3721 	blob->data_ro_flags |= SPDK_BLOB_READ_ONLY;
3722 
3723 	blob->state = SPDK_BLOB_STATE_DIRTY;
3724 	return 0;
3725 }
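
/*
 * Usage sketch: this call only marks the metadata dirty; data_ro/md_ro
 *  take effect once the metadata is persisted (see _spdk_blob_sync_md_cpl()
 *  below), so follow it with a metadata sync.
 *
 *	spdk_blob_set_read_only(blob);
 *	spdk_blob_sync_md(blob, sync_cb, NULL);
 */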
3726 /* END spdk_blob_set_read_only */
3727 
3728 /* START spdk_blob_sync_md */
3729 
3730 static void
3731 _spdk_blob_sync_md_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
3732 {
3733 	struct spdk_blob *blob = cb_arg;
3734 
3735 	if (bserrno == 0 && (blob->data_ro_flags & SPDK_BLOB_READ_ONLY)) {
3736 		blob->data_ro = true;
3737 		blob->md_ro = true;
3738 	}
3739 
3740 	spdk_bs_sequence_finish(seq, bserrno);
3741 }
3742 
3743 static void
3744 _spdk_blob_sync_md(struct spdk_blob *blob, spdk_blob_op_complete cb_fn, void *cb_arg)
3745 {
3746 	struct spdk_bs_cpl	cpl;
3747 	spdk_bs_sequence_t	*seq;
3748 
3749 	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
3750 	cpl.u.blob_basic.cb_fn = cb_fn;
3751 	cpl.u.blob_basic.cb_arg = cb_arg;
3752 
3753 	seq = spdk_bs_sequence_start(blob->bs->md_channel, &cpl);
3754 	if (!seq) {
3755 		cb_fn(cb_arg, -ENOMEM);
3756 		return;
3757 	}
3758 
3759 	_spdk_blob_persist(seq, blob, _spdk_blob_sync_md_cpl, blob);
3760 }
3761 
3762 void
3763 spdk_blob_sync_md(struct spdk_blob *blob, spdk_blob_op_complete cb_fn, void *cb_arg)
3764 {
3765 	_spdk_blob_verify_md_op(blob);
3766 
3767 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Syncing blob %lu\n", blob->id);
3768 
3769 	if (blob->md_ro) {
3770 		assert(blob->state == SPDK_BLOB_STATE_CLEAN);
3771 		cb_fn(cb_arg, 0);
3772 		return;
3773 	}
3774 
3775 	_spdk_blob_sync_md(blob, cb_fn, cb_arg);
3776 }
3777 
3778 /* END spdk_blob_sync_md */
3779 
3780 struct spdk_blob_insert_cluster_ctx {
3781 	struct spdk_thread	*thread;
3782 	struct spdk_blob	*blob;
3783 	uint32_t		cluster_num;	/* cluster index in blob */
3784 	uint64_t		cluster;	/* cluster on disk */
3785 	int			rc;
3786 	spdk_blob_op_complete	cb_fn;
3787 	void			*cb_arg;
3788 };
3789 
3790 static void
3791 _spdk_blob_insert_cluster_msg_cpl(void *arg)
3792 {
3793 	struct spdk_blob_insert_cluster_ctx *ctx = arg;
3794 
3795 	ctx->cb_fn(ctx->cb_arg, ctx->rc);
3796 	free(ctx);
3797 }
3798 
3799 static void
3800 _spdk_blob_insert_cluster_msg_cb(void *arg, int bserrno)
3801 {
3802 	struct spdk_blob_insert_cluster_ctx *ctx = arg;
3803 
3804 	ctx->rc = bserrno;
3805 	spdk_thread_send_msg(ctx->thread, _spdk_blob_insert_cluster_msg_cpl, ctx);
3806 }
3807 
3808 static void
3809 _spdk_blob_insert_cluster_msg(void *arg)
3810 {
3811 	struct spdk_blob_insert_cluster_ctx *ctx = arg;
3812 
3813 	ctx->rc = _spdk_blob_insert_cluster(ctx->blob, ctx->cluster_num, ctx->cluster);
3814 	if (ctx->rc != 0) {
3815 		spdk_thread_send_msg(ctx->thread, _spdk_blob_insert_cluster_msg_cpl, ctx);
3816 		return;
3817 	}
3818 
3819 	ctx->blob->state = SPDK_BLOB_STATE_DIRTY;
3820 	_spdk_blob_sync_md(ctx->blob, _spdk_blob_insert_cluster_msg_cb, ctx);
3821 }
3822 
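/*
 * May be called from any thread: the cluster insertion and metadata sync
 *  run on the blobstore's metadata thread, and the completion is sent
 *  back to the calling thread.
 */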
3823 void
3824 _spdk_blob_insert_cluster_on_md_thread(struct spdk_blob *blob, uint32_t cluster_num,
3825 				       uint64_t cluster, spdk_blob_op_complete cb_fn, void *cb_arg)
3826 {
3827 	struct spdk_blob_insert_cluster_ctx *ctx;
3828 
3829 	ctx = calloc(1, sizeof(*ctx));
3830 	if (ctx == NULL) {
3831 		cb_fn(cb_arg, -ENOMEM);
3832 		return;
3833 	}
3834 
3835 	ctx->thread = spdk_get_thread();
3836 	ctx->blob = blob;
3837 	ctx->cluster_num = cluster_num;
3838 	ctx->cluster = cluster;
3839 	ctx->cb_fn = cb_fn;
3840 	ctx->cb_arg = cb_arg;
3841 
3842 	spdk_thread_send_msg(blob->bs->md_thread, _spdk_blob_insert_cluster_msg, ctx);
3843 }
3844 
3845 /* START spdk_blob_close */
3846 
3847 static void
3848 _spdk_blob_close_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
3849 {
3850 	struct spdk_blob *blob = cb_arg;
3851 
3852 	if (bserrno == 0) {
3853 		blob->open_ref--;
3854 		if (blob->open_ref == 0) {
3855 			/*
3856 			 * Blobs with active.num_pages == 0 are deleted blobs.
3857 			 *  These blobs are removed from the blob_store list
3858 			 *  when the deletion process starts - so don't try to
3859 			 *  remove them again.
3860 			 */
3861 			if (blob->active.num_pages > 0) {
3862 				TAILQ_REMOVE(&blob->bs->blobs, blob, link);
3863 			}
3864 			_spdk_blob_free(blob);
3865 		}
3866 	}
3867 
3868 	spdk_bs_sequence_finish(seq, bserrno);
3869 }
3870 
3871 void spdk_blob_close(struct spdk_blob *blob, spdk_blob_op_complete cb_fn, void *cb_arg)
3872 {
3873 	struct spdk_bs_cpl	cpl;
3874 	spdk_bs_sequence_t	*seq;
3875 
3876 	_spdk_blob_verify_md_op(blob);
3877 
3878 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Closing blob %lu\n", blob->id);
3879 
3880 	if (blob->open_ref == 0) {
3881 		cb_fn(cb_arg, -EBADF);
3882 		return;
3883 	}
3884 
3885 	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
3886 	cpl.u.blob_basic.cb_fn = cb_fn;
3887 	cpl.u.blob_basic.cb_arg = cb_arg;
3888 
3889 	seq = spdk_bs_sequence_start(blob->bs->md_channel, &cpl);
3890 	if (!seq) {
3891 		cb_fn(cb_arg, -ENOMEM);
3892 		return;
3893 	}
3894 
3895 	/* Sync metadata */
3896 	_spdk_blob_persist(seq, blob, _spdk_blob_close_cpl, blob);
3897 }
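
/*
 * Usage sketch (close_cb is a placeholder): close persists any dirty
 *  metadata before dropping the reference, and the last close frees the
 *  in-memory blob.
 *
 *	spdk_blob_close(blob, close_cb, NULL);
 */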
3898 
3899 /* END spdk_blob_close */
3900 
3901 struct spdk_io_channel *spdk_bs_alloc_io_channel(struct spdk_blob_store *bs)
3902 {
3903 	return spdk_get_io_channel(bs);
3904 }
3905 
3906 void spdk_bs_free_io_channel(struct spdk_io_channel *channel)
3907 {
3908 	spdk_put_io_channel(channel);
3909 }
3910 
3911 void spdk_blob_io_unmap(struct spdk_blob *blob, struct spdk_io_channel *channel,
3912 			uint64_t offset, uint64_t length, spdk_blob_op_complete cb_fn, void *cb_arg)
3913 {
3914 	_spdk_blob_request_submit_op(blob, channel, NULL, offset, length, cb_fn, cb_arg,
3915 				     SPDK_BLOB_UNMAP);
3916 }
3917 
3918 void spdk_blob_io_write_zeroes(struct spdk_blob *blob, struct spdk_io_channel *channel,
3919 			       uint64_t offset, uint64_t length, spdk_blob_op_complete cb_fn, void *cb_arg)
3920 {
3921 	_spdk_blob_request_submit_op(blob, channel, NULL, offset, length, cb_fn, cb_arg,
3922 				     SPDK_BLOB_WRITE_ZEROES);
3923 }
3924 
3925 void spdk_blob_io_write(struct spdk_blob *blob, struct spdk_io_channel *channel,
3926 			void *payload, uint64_t offset, uint64_t length,
3927 			spdk_blob_op_complete cb_fn, void *cb_arg)
3928 {
3929 	_spdk_blob_request_submit_op(blob, channel, payload, offset, length, cb_fn, cb_arg,
3930 				     SPDK_BLOB_WRITE);
3931 }
3932 
3933 void spdk_blob_io_read(struct spdk_blob *blob, struct spdk_io_channel *channel,
3934 		       void *payload, uint64_t offset, uint64_t length,
3935 		       spdk_blob_op_complete cb_fn, void *cb_arg)
3936 {
3937 	_spdk_blob_request_submit_op(blob, channel, payload, offset, length, cb_fn, cb_arg,
3938 				     SPDK_BLOB_READ);
3939 }
3940 
3941 void spdk_blob_io_writev(struct spdk_blob *blob, struct spdk_io_channel *channel,
3942 			 struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
3943 			 spdk_blob_op_complete cb_fn, void *cb_arg)
3944 {
3945 	_spdk_blob_request_submit_rw_iov(blob, channel, iov, iovcnt, offset, length, cb_fn, cb_arg, false);
3946 }
3947 
3948 void spdk_blob_io_readv(struct spdk_blob *blob, struct spdk_io_channel *channel,
3949 			struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
3950 			spdk_blob_op_complete cb_fn, void *cb_arg)
3951 {
3952 	_spdk_blob_request_submit_rw_iov(blob, channel, iov, iovcnt, offset, length, cb_fn, cb_arg, true);
3953 }
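
/*
 * Usage sketch (write_cb is a placeholder): offsets and lengths for blob
 *  I/O are expressed in pages, and buffers must be DMA-safe memory such
 *  as that returned by spdk_dma_zmalloc().
 *
 *	struct spdk_io_channel *ch = spdk_bs_alloc_io_channel(bs);
 *	uint8_t *buf = spdk_dma_zmalloc(spdk_bs_get_page_size(bs), 0x1000, NULL);
 *
 *	spdk_blob_io_write(blob, ch, buf, 0, 1, write_cb, NULL);
 */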
3954 
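/*
 * The spdk_bs_io_*_blob() functions below are thin compatibility wrappers
 *  that forward to the spdk_blob_io_*() API above.
 */
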
3955 void spdk_bs_io_unmap_blob(struct spdk_blob *blob, struct spdk_io_channel *channel,
3956 			   uint64_t offset, uint64_t length, spdk_blob_op_complete cb_fn, void *cb_arg)
3957 {
3958 	spdk_blob_io_unmap(blob, channel, offset, length, cb_fn, cb_arg);
3959 }
3960 
3961 void spdk_bs_io_write_zeroes_blob(struct spdk_blob *blob, struct spdk_io_channel *channel,
3962 				  uint64_t offset, uint64_t length, spdk_blob_op_complete cb_fn, void *cb_arg)
3963 {
3964 	spdk_blob_io_write_zeroes(blob, channel, offset, length, cb_fn, cb_arg);
3965 }
3966 
3967 void spdk_bs_io_write_blob(struct spdk_blob *blob, struct spdk_io_channel *channel,
3968 			   void *payload, uint64_t offset, uint64_t length,
3969 			   spdk_blob_op_complete cb_fn, void *cb_arg)
3970 {
3971 	spdk_blob_io_write(blob, channel, payload, offset, length, cb_fn, cb_arg);
3972 }
3973 
3974 void spdk_bs_io_read_blob(struct spdk_blob *blob, struct spdk_io_channel *channel,
3975 			  void *payload, uint64_t offset, uint64_t length,
3976 			  spdk_blob_op_complete cb_fn, void *cb_arg)
3977 {
3978 	spdk_blob_io_read(blob, channel, payload, offset, length, cb_fn, cb_arg);
3979 }
3980 
3981 void spdk_bs_io_writev_blob(struct spdk_blob *blob, struct spdk_io_channel *channel,
3982 			    struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
3983 			    spdk_blob_op_complete cb_fn, void *cb_arg)
3984 {
3985 	spdk_blob_io_writev(blob, channel, iov, iovcnt, offset, length, cb_fn, cb_arg);
3986 }
3987 
3988 void spdk_bs_io_readv_blob(struct spdk_blob *blob, struct spdk_io_channel *channel,
3989 			   struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
3990 			   spdk_blob_op_complete cb_fn, void *cb_arg)
3991 {
3992 	spdk_blob_io_readv(blob, channel, iov, iovcnt, offset, length, cb_fn, cb_arg);
3993 }
3994 
3995 struct spdk_bs_iter_ctx {
3996 	int64_t page_num;
3997 	struct spdk_blob_store *bs;
3998 
3999 	spdk_blob_op_with_handle_complete cb_fn;
4000 	void *cb_arg;
4001 };
4002 
4003 static void
4004 _spdk_bs_iter_cpl(void *cb_arg, struct spdk_blob *_blob, int bserrno)
4005 {
4006 	struct spdk_bs_iter_ctx *ctx = cb_arg;
4007 	struct spdk_blob_store *bs = ctx->bs;
4008 	spdk_blob_id id;
4009 
4010 	if (bserrno == 0) {
4011 		ctx->cb_fn(ctx->cb_arg, _blob, bserrno);
4012 		free(ctx);
4013 		return;
4014 	}
4015 
4016 	ctx->page_num++;
4017 	ctx->page_num = spdk_bit_array_find_first_set(bs->used_blobids, ctx->page_num);
4018 	if (ctx->page_num >= spdk_bit_array_capacity(bs->used_blobids)) {
4019 		ctx->cb_fn(ctx->cb_arg, NULL, -ENOENT);
4020 		free(ctx);
4021 		return;
4022 	}
4023 
4024 	id = _spdk_bs_page_to_blobid(ctx->page_num);
4025 
4026 	spdk_bs_open_blob(bs, id, _spdk_bs_iter_cpl, ctx);
4027 }
4028 
4029 void
4030 spdk_bs_iter_first(struct spdk_blob_store *bs,
4031 		   spdk_blob_op_with_handle_complete cb_fn, void *cb_arg)
4032 {
4033 	struct spdk_bs_iter_ctx *ctx;
4034 
4035 	ctx = calloc(1, sizeof(*ctx));
4036 	if (!ctx) {
4037 		cb_fn(cb_arg, NULL, -ENOMEM);
4038 		return;
4039 	}
4040 
4041 	ctx->page_num = -1;
4042 	ctx->bs = bs;
4043 	ctx->cb_fn = cb_fn;
4044 	ctx->cb_arg = cb_arg;
4045 
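	/*
	 * Kick off the iteration: a nonzero bserrno makes _spdk_bs_iter_cpl()
	 *  search for the first used blobid instead of completing.
	 */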
4046 	_spdk_bs_iter_cpl(ctx, NULL, -1);
4047 }
4048 
4049 static void
4050 _spdk_bs_iter_close_cpl(void *cb_arg, int bserrno)
4051 {
4052 	struct spdk_bs_iter_ctx *ctx = cb_arg;
4053 
4054 	_spdk_bs_iter_cpl(ctx, NULL, -1);
4055 }
4056 
4057 void
4058 spdk_bs_iter_next(struct spdk_blob_store *bs, struct spdk_blob *blob,
4059 		  spdk_blob_op_with_handle_complete cb_fn, void *cb_arg)
4060 {
4061 	struct spdk_bs_iter_ctx *ctx;
4062 
4063 	assert(blob != NULL);
4064 
4065 	ctx = calloc(1, sizeof(*ctx));
4066 	if (!ctx) {
4067 		cb_fn(cb_arg, NULL, -ENOMEM);
4068 		return;
4069 	}
4070 
4071 	ctx->page_num = _spdk_bs_blobid_to_page(blob->id);
4072 	ctx->bs = bs;
4073 	ctx->cb_fn = cb_fn;
4074 	ctx->cb_arg = cb_arg;
4075 
4076 	/* Close the existing blob */
4077 	spdk_blob_close(blob, _spdk_bs_iter_close_cpl, ctx);
4078 }
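
/*
 * Usage sketch (iter_cb is a placeholder): spdk_bs_iter_next() closes the
 *  blob that is passed in, so the callback must not keep using it; the
 *  iteration ends when the callback receives -ENOENT.
 *
 *	static void
 *	iter_cb(void *cb_arg, struct spdk_blob *blob, int bserrno)
 *	{
 *		if (bserrno == -ENOENT) {
 *			return;
 *		}
 *		... inspect blob ...
 *		spdk_bs_iter_next(bs, blob, iter_cb, cb_arg);
 *	}
 *
 *	spdk_bs_iter_first(bs, iter_cb, NULL);
 */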
4079 
static int
_spdk_blob_set_xattr(struct spdk_blob *blob, const char *name, const void *value,
		     uint16_t value_len, bool internal)
{
	struct spdk_xattr_tailq *xattrs;
	struct spdk_xattr	*xattr;
	void			*tmp;

	_spdk_blob_verify_md_op(blob);

	if (blob->md_ro) {
		return -EPERM;
	}

	if (internal) {
		xattrs = &blob->xattrs_internal;
		blob->invalid_flags |= SPDK_BLOB_INTERNAL_XATTR;
	} else {
		xattrs = &blob->xattrs;
	}

	TAILQ_FOREACH(xattr, xattrs, link) {
		if (!strcmp(name, xattr->name)) {
			/*
			 * Allocate the new value before freeing the old one,
			 *  so the existing xattr is left intact if the
			 *  allocation fails.
			 */
			tmp = malloc(value_len);
			if (!tmp) {
				return -ENOMEM;
			}
			memcpy(tmp, value, value_len);
			free(xattr->value);
			xattr->value_len = value_len;
			xattr->value = tmp;

			blob->state = SPDK_BLOB_STATE_DIRTY;

			return 0;
		}
	}

	xattr = calloc(1, sizeof(*xattr));
	if (!xattr) {
		return -ENOMEM;
	}
	xattr->name = strdup(name);
	if (!xattr->name) {
		free(xattr);
		return -ENOMEM;
	}
	xattr->value_len = value_len;
	xattr->value = malloc(value_len);
	if (!xattr->value) {
		free(xattr->name);
		free(xattr);
		return -ENOMEM;
	}
	memcpy(xattr->value, value, value_len);
	TAILQ_INSERT_TAIL(xattrs, xattr, link);

	blob->state = SPDK_BLOB_STATE_DIRTY;

	return 0;
}
4127 
4128 int
4129 spdk_blob_set_xattr(struct spdk_blob *blob, const char *name, const void *value,
4130 		    uint16_t value_len)
4131 {
4132 	return _spdk_blob_set_xattr(blob, name, value, value_len, false);
4133 }
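
/*
 * Usage sketch (the name and value are arbitrary): xattr changes are
 *  metadata-thread operations and are persisted by the next metadata
 *  sync or close.
 *
 *	const void *val;
 *	size_t len;
 *
 *	spdk_blob_set_xattr(blob, "name", "example", sizeof("example"));
 *	spdk_blob_get_xattr_value(blob, "name", &val, &len);
 */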
4134 
4135 static int
4136 _spdk_blob_remove_xattr(struct spdk_blob *blob, const char *name, bool internal)
4137 {
4138 	struct spdk_xattr_tailq *xattrs;
4139 	struct spdk_xattr	*xattr;
4140 
4141 	_spdk_blob_verify_md_op(blob);
4142 
4143 	if (blob->md_ro) {
4144 		return -EPERM;
4145 	}
4146 	xattrs = internal ? &blob->xattrs_internal : &blob->xattrs;
4147 
4148 	TAILQ_FOREACH(xattr, xattrs, link) {
4149 		if (!strcmp(name, xattr->name)) {
4150 			TAILQ_REMOVE(xattrs, xattr, link);
4151 			free(xattr->value);
4152 			free(xattr->name);
4153 			free(xattr);
4154 
4155 			if (internal && TAILQ_EMPTY(&blob->xattrs_internal)) {
4156 				blob->invalid_flags &= ~SPDK_BLOB_INTERNAL_XATTR;
4157 			}
4158 			blob->state = SPDK_BLOB_STATE_DIRTY;
4159 
4160 			return 0;
4161 		}
4162 	}
4163 
4164 	return -ENOENT;
4165 }
4166 
4167 int
4168 spdk_blob_remove_xattr(struct spdk_blob *blob, const char *name)
4169 {
4170 	return _spdk_blob_remove_xattr(blob, name, false);
4171 }
4172 
4173 static int
4174 _spdk_blob_get_xattr_value(struct spdk_blob *blob, const char *name,
4175 			   const void **value, size_t *value_len, bool internal)
4176 {
4177 	struct spdk_xattr	*xattr;
4178 	struct spdk_xattr_tailq *xattrs;
4179 
4180 	xattrs = internal ? &blob->xattrs_internal : &blob->xattrs;
4181 
4182 	TAILQ_FOREACH(xattr, xattrs, link) {
4183 		if (!strcmp(name, xattr->name)) {
4184 			*value = xattr->value;
4185 			*value_len = xattr->value_len;
4186 			return 0;
4187 		}
4188 	}
4189 	return -ENOENT;
4190 }
4191 
4192 int
4193 spdk_blob_get_xattr_value(struct spdk_blob *blob, const char *name,
4194 			  const void **value, size_t *value_len)
4195 {
4196 	_spdk_blob_verify_md_op(blob);
4197 
4198 	return _spdk_blob_get_xattr_value(blob, name, value, value_len, false);
4199 }
4200 
4201 struct spdk_xattr_names {
4202 	uint32_t	count;
4203 	const char	*names[0];
4204 };
4205 
4206 static int
4207 _spdk_blob_get_xattr_names(struct spdk_xattr_tailq *xattrs, struct spdk_xattr_names **names)
4208 {
4209 	struct spdk_xattr	*xattr;
4210 	int			count = 0;
4211 
4212 	TAILQ_FOREACH(xattr, xattrs, link) {
4213 		count++;
4214 	}
4215 
4216 	*names = calloc(1, sizeof(struct spdk_xattr_names) + count * sizeof(char *));
4217 	if (*names == NULL) {
4218 		return -ENOMEM;
4219 	}
4220 
4221 	TAILQ_FOREACH(xattr, xattrs, link) {
4222 		(*names)->names[(*names)->count++] = xattr->name;
4223 	}
4224 
4225 	return 0;
4226 }
4227 
4228 int
4229 spdk_blob_get_xattr_names(struct spdk_blob *blob, struct spdk_xattr_names **names)
4230 {
4231 	_spdk_blob_verify_md_op(blob);
4232 
4233 	return _spdk_blob_get_xattr_names(&blob->xattrs, names);
4234 }
4235 
4236 uint32_t
4237 spdk_xattr_names_get_count(struct spdk_xattr_names *names)
4238 {
4239 	assert(names != NULL);
4240 
4241 	return names->count;
4242 }
4243 
4244 const char *
4245 spdk_xattr_names_get_name(struct spdk_xattr_names *names, uint32_t index)
4246 {
4247 	if (index >= names->count) {
4248 		return NULL;
4249 	}
4250 
4251 	return names->names[index];
4252 }
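
/*
 * Usage sketch: the returned structure borrows its name strings from the
 *  blob's xattr list, so free it before modifying the xattrs or closing
 *  the blob.
 *
 *	struct spdk_xattr_names *names;
 *	uint32_t i;
 *
 *	if (spdk_blob_get_xattr_names(blob, &names) == 0) {
 *		for (i = 0; i < spdk_xattr_names_get_count(names); i++) {
 *			printf("%s\n", spdk_xattr_names_get_name(names, i));
 *		}
 *		spdk_xattr_names_free(names);
 *	}
 */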
4253 
4254 void
4255 spdk_xattr_names_free(struct spdk_xattr_names *names)
4256 {
4257 	free(names);
4258 }
4259 
4260 struct spdk_bs_type
4261 spdk_bs_get_bstype(struct spdk_blob_store *bs)
4262 {
4263 	return bs->bstype;
4264 }
4265 
4266 void
4267 spdk_bs_set_bstype(struct spdk_blob_store *bs, struct spdk_bs_type bstype)
4268 {
4269 	memcpy(&bs->bstype, &bstype, sizeof(bstype));
4270 }
4271 
4272 SPDK_LOG_REGISTER_COMPONENT("blob", SPDK_LOG_BLOB)
4273