/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "spdk/stdinc.h"

#include "spdk/blob.h"
#include "spdk/crc32.h"
#include "spdk/env.h"
#include "spdk/queue.h"
#include "spdk/io_channel.h"
#include "spdk/bit_array.h"
#include "spdk/likely.h"

#include "spdk_internal/log.h"

#include "blobstore.h"

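/*
 * Seed (and final XOR) value for the CRC32C checksum stored in each
 * on-disk metadata page; see _spdk_blob_md_page_calc_crc().
 */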
#define BLOB_CRC32C_INITIAL    0xffffffffUL

static int spdk_bs_register_md_thread(struct spdk_blob_store *bs);
static int spdk_bs_unregister_md_thread(struct spdk_blob_store *bs);
static void _spdk_blob_close_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno);
void _spdk_blob_insert_cluster_on_md_thread(struct spdk_blob *blob, uint32_t cluster_num,
		uint64_t cluster, spdk_blob_op_complete cb_fn, void *cb_arg);

static int _spdk_blob_set_xattr(struct spdk_blob *blob, const char *name, const void *value,
				uint16_t value_len, bool internal);
static int _spdk_blob_get_xattr_value(struct spdk_blob *blob, const char *name,
				      const void **value, size_t *value_len, bool internal);
static int _spdk_blob_remove_xattr(struct spdk_blob *blob, const char *name, bool internal);

static void
_spdk_blob_verify_md_op(struct spdk_blob *blob)
{
	assert(blob != NULL);
	assert(spdk_get_thread() == blob->bs->md_thread);
	assert(blob->state != SPDK_BLOB_STATE_LOADING);
}

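/* Integer division rounding up, e.g. divide_round_up(10, 4) == 3. */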
static inline size_t
divide_round_up(size_t num, size_t divisor)
{
	return (num + divisor - 1) / divisor;
}

static void
_spdk_bs_claim_cluster(struct spdk_blob_store *bs, uint32_t cluster_num)
{
	assert(cluster_num < spdk_bit_array_capacity(bs->used_clusters));
	assert(spdk_bit_array_get(bs->used_clusters, cluster_num) == false);
	assert(bs->num_free_clusters > 0);

	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Claiming cluster %u\n", cluster_num);

	spdk_bit_array_set(bs->used_clusters, cluster_num);
	bs->num_free_clusters--;
}

static int
_spdk_blob_insert_cluster(struct spdk_blob *blob, uint32_t cluster_num, uint64_t cluster)
{
	uint64_t *cluster_lba = &blob->active.clusters[cluster_num];

	_spdk_blob_verify_md_op(blob);

	if (*cluster_lba != 0) {
		return -EEXIST;
	}

	*cluster_lba = _spdk_bs_cluster_to_lba(blob->bs, cluster);
	return 0;
}

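/*
 * Find and claim the first free cluster at or after *lowest_free_cluster.
 * The used_clusters bit array is shared by all blobs, so the search and
 * the claim are done under used_clusters_mutex. On success,
 * *lowest_free_cluster holds the claimed cluster and can be reused as a
 * search hint by callers allocating several clusters in a row.
 */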
static int
_spdk_bs_allocate_cluster(struct spdk_blob *blob, uint32_t cluster_num,
			  uint64_t *lowest_free_cluster, bool update_map)
{
	pthread_mutex_lock(&blob->bs->used_clusters_mutex);
	*lowest_free_cluster = spdk_bit_array_find_first_clear(blob->bs->used_clusters,
			       *lowest_free_cluster);
	if (*lowest_free_cluster >= blob->bs->total_clusters) {
		/* No more free clusters. Cannot satisfy the request */
		pthread_mutex_unlock(&blob->bs->used_clusters_mutex);
		return -ENOSPC;
	}

	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Claiming cluster %lu for blob %lu\n", *lowest_free_cluster, blob->id);
	_spdk_bs_claim_cluster(blob->bs, *lowest_free_cluster);
	pthread_mutex_unlock(&blob->bs->used_clusters_mutex);

	if (update_map) {
		_spdk_blob_insert_cluster(blob, cluster_num, *lowest_free_cluster);
	}

	return 0;
}

static void
_spdk_bs_release_cluster(struct spdk_blob_store *bs, uint32_t cluster_num)
{
	assert(cluster_num < spdk_bit_array_capacity(bs->used_clusters));
	assert(spdk_bit_array_get(bs->used_clusters, cluster_num) == true);
	assert(bs->num_free_clusters < bs->total_clusters);

	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Releasing cluster %u\n", cluster_num);

	pthread_mutex_lock(&bs->used_clusters_mutex);
	spdk_bit_array_clear(bs->used_clusters, cluster_num);
	bs->num_free_clusters++;
	pthread_mutex_unlock(&bs->used_clusters_mutex);
}

static void
_spdk_blob_xattrs_init(struct spdk_blob_xattr_opts *xattrs)
{
	xattrs->count = 0;
	xattrs->names = NULL;
	xattrs->ctx = NULL;
	xattrs->get_value = NULL;
}

void
spdk_blob_opts_init(struct spdk_blob_opts *opts)
{
	opts->num_clusters = 0;
	opts->thin_provision = false;
	_spdk_blob_xattrs_init(&opts->xattrs);
}

static struct spdk_blob *
_spdk_blob_alloc(struct spdk_blob_store *bs, spdk_blob_id id)
{
	struct spdk_blob *blob;

	blob = calloc(1, sizeof(*blob));
	if (!blob) {
		return NULL;
	}

	blob->id = id;
	blob->bs = bs;

	blob->state = SPDK_BLOB_STATE_DIRTY;
	blob->active.num_pages = 1;
	blob->active.pages = calloc(1, sizeof(*blob->active.pages));
	if (!blob->active.pages) {
		free(blob);
		return NULL;
	}

	blob->active.pages[0] = _spdk_bs_blobid_to_page(id);

	TAILQ_INIT(&blob->xattrs);
	TAILQ_INIT(&blob->xattrs_internal);

	return blob;
}

static void
_spdk_xattrs_free(struct spdk_xattr_tailq *xattrs)
{
	struct spdk_xattr	*xattr, *xattr_tmp;

	TAILQ_FOREACH_SAFE(xattr, xattrs, link, xattr_tmp) {
		TAILQ_REMOVE(xattrs, xattr, link);
		free(xattr->name);
		free(xattr->value);
		free(xattr);
	}
}

static void
_spdk_blob_free(struct spdk_blob *blob)
{
	assert(blob != NULL);

	free(blob->active.clusters);
	free(blob->clean.clusters);
	free(blob->active.pages);
	free(blob->clean.pages);

	_spdk_xattrs_free(&blob->xattrs);
	_spdk_xattrs_free(&blob->xattrs_internal);

	if (blob->back_bs_dev) {
		blob->back_bs_dev->destroy(blob->back_bs_dev);
	}

	free(blob);
}

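/*
 * Swap the active (in-memory) cluster and page arrays into the clean
 * (on-disk) state, leaving the active state as a freshly allocated copy.
 * This runs after a blob's metadata has been loaded or persisted, so the
 * clean state always mirrors what is on disk.
 */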
static int
_spdk_blob_mark_clean(struct spdk_blob *blob)
{
	uint64_t *clusters = NULL;
	uint32_t *pages = NULL;

	assert(blob != NULL);

	if (blob->active.num_clusters) {
		assert(blob->active.clusters);
		clusters = calloc(blob->active.num_clusters, sizeof(*blob->active.clusters));
		if (!clusters) {
			return -1;
		}
		memcpy(clusters, blob->active.clusters, blob->active.num_clusters * sizeof(*clusters));
	}

	if (blob->active.num_pages) {
		assert(blob->active.pages);
		pages = calloc(blob->active.num_pages, sizeof(*blob->active.pages));
		if (!pages) {
			free(clusters);
			return -1;
		}
		memcpy(pages, blob->active.pages, blob->active.num_pages * sizeof(*pages));
	}

	free(blob->clean.clusters);
	free(blob->clean.pages);

	blob->clean.num_clusters = blob->active.num_clusters;
	blob->clean.clusters = blob->active.clusters;
	blob->clean.num_pages = blob->active.num_pages;
	blob->clean.pages = blob->active.pages;

	blob->active.clusters = clusters;
	blob->active.pages = pages;

	/* If the metadata was dirtied again while the metadata was being written to disk,
	 *  we do not want to revert the DIRTY state back to CLEAN here.
	 */
	if (blob->state == SPDK_BLOB_STATE_LOADING) {
		blob->state = SPDK_BLOB_STATE_CLEAN;
	}

	return 0;
}

static int
_spdk_blob_deserialize_xattr(struct spdk_blob *blob,
			     struct spdk_blob_md_descriptor_xattr *desc_xattr, bool internal)
{
	struct spdk_xattr                       *xattr;

	if (desc_xattr->length != sizeof(desc_xattr->name_length) +
	    sizeof(desc_xattr->value_length) +
	    desc_xattr->name_length + desc_xattr->value_length) {
		return -EINVAL;
	}

	xattr = calloc(1, sizeof(*xattr));
	if (xattr == NULL) {
		return -ENOMEM;
	}

	xattr->name = malloc(desc_xattr->name_length + 1);
	if (xattr->name == NULL) {
		free(xattr);
		return -ENOMEM;
	}
	strncpy(xattr->name, desc_xattr->name, desc_xattr->name_length);
	xattr->name[desc_xattr->name_length] = '\0';

	xattr->value = malloc(desc_xattr->value_length);
	if (xattr->value == NULL) {
		free(xattr->name);
		free(xattr);
		return -ENOMEM;
	}
	xattr->value_len = desc_xattr->value_length;
	memcpy(xattr->value,
	       (void *)((uintptr_t)desc_xattr->name + desc_xattr->name_length),
	       desc_xattr->value_length);

	TAILQ_INSERT_TAIL(internal ? &blob->xattrs_internal : &blob->xattrs, xattr, link);

	return 0;
}

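/*
 * Walk the type/length-encoded descriptors within one metadata page. A
 * zero-length padding descriptor terminates the page; unrecognized
 * descriptor types are skipped rather than treated as errors (see the
 * comment at the end of the loop body).
 */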
static int
_spdk_blob_parse_page(const struct spdk_blob_md_page *page, struct spdk_blob *blob)
{
	struct spdk_blob_md_descriptor *desc;
	size_t	cur_desc = 0;
	void *tmp;

	desc = (struct spdk_blob_md_descriptor *)page->descriptors;
	while (cur_desc < sizeof(page->descriptors)) {
		if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_PADDING) {
			if (desc->length == 0) {
				/* If padding and length are 0, this terminates the page */
				break;
			}
		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_FLAGS) {
			struct spdk_blob_md_descriptor_flags	*desc_flags;

			desc_flags = (struct spdk_blob_md_descriptor_flags *)desc;

			if (desc_flags->length != sizeof(*desc_flags) - sizeof(*desc)) {
				return -EINVAL;
			}

			if ((desc_flags->invalid_flags | SPDK_BLOB_INVALID_FLAGS_MASK) !=
			    SPDK_BLOB_INVALID_FLAGS_MASK) {
				return -EINVAL;
			}

			if ((desc_flags->data_ro_flags | SPDK_BLOB_DATA_RO_FLAGS_MASK) !=
			    SPDK_BLOB_DATA_RO_FLAGS_MASK) {
				blob->data_ro = true;
				blob->md_ro = true;
			}

			if ((desc_flags->md_ro_flags | SPDK_BLOB_MD_RO_FLAGS_MASK) !=
			    SPDK_BLOB_MD_RO_FLAGS_MASK) {
				blob->md_ro = true;
			}

			if ((desc_flags->data_ro_flags & SPDK_BLOB_READ_ONLY)) {
				blob->data_ro = true;
				blob->md_ro = true;
			}

			blob->invalid_flags = desc_flags->invalid_flags;
			blob->data_ro_flags = desc_flags->data_ro_flags;
			blob->md_ro_flags = desc_flags->md_ro_flags;

		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_EXTENT) {
			struct spdk_blob_md_descriptor_extent	*desc_extent;
			unsigned int				i, j;
			unsigned int				cluster_count = blob->active.num_clusters;

			desc_extent = (struct spdk_blob_md_descriptor_extent *)desc;

			if (desc_extent->length == 0 ||
			    (desc_extent->length % sizeof(desc_extent->extents[0]) != 0)) {
				return -EINVAL;
			}

			for (i = 0; i < desc_extent->length / sizeof(desc_extent->extents[0]); i++) {
				for (j = 0; j < desc_extent->extents[i].length; j++) {
					if (!spdk_bit_array_get(blob->bs->used_clusters,
								desc_extent->extents[i].cluster_idx + j)) {
						return -EINVAL;
					}
					cluster_count++;
				}
			}

			if (cluster_count == 0) {
				return -EINVAL;
			}
			tmp = realloc(blob->active.clusters, cluster_count * sizeof(uint64_t));
			if (tmp == NULL) {
				return -ENOMEM;
			}
			blob->active.clusters = tmp;
			blob->active.cluster_array_size = cluster_count;

			for (i = 0; i < desc_extent->length / sizeof(desc_extent->extents[0]); i++) {
				for (j = 0; j < desc_extent->extents[i].length; j++) {
					if (desc_extent->extents[i].cluster_idx != 0) {
						blob->active.clusters[blob->active.num_clusters++] = _spdk_bs_cluster_to_lba(blob->bs,
								desc_extent->extents[i].cluster_idx + j);
					} else if (spdk_blob_is_thin_provisioned(blob)) {
						blob->active.clusters[blob->active.num_clusters++] = 0;
					} else {
						return -EINVAL;
					}
				}
			}

		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR) {
			int rc;

			rc = _spdk_blob_deserialize_xattr(blob,
							  (struct spdk_blob_md_descriptor_xattr *) desc, false);
			if (rc != 0) {
				return rc;
			}
		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR_INTERNAL) {
			int rc;

			rc = _spdk_blob_deserialize_xattr(blob,
							  (struct spdk_blob_md_descriptor_xattr *) desc, true);
			if (rc != 0) {
				return rc;
			}
		} else {
			/* Unrecognized descriptor type.  Do not fail - just continue to the
			 *  next descriptor.  If this descriptor is associated with some feature
			 *  defined in a newer version of blobstore, that version of blobstore
			 *  should create and set an associated feature flag to specify if this
			 *  blob can be loaded or not.
			 */
		}

		/* Advance to the next descriptor */
		cur_desc += sizeof(*desc) + desc->length;
		if (cur_desc + sizeof(*desc) > sizeof(page->descriptors)) {
			break;
		}
		desc = (struct spdk_blob_md_descriptor *)((uintptr_t)page->descriptors + cur_desc);
	}

	return 0;
}

static int
_spdk_blob_parse(const struct spdk_blob_md_page *pages, uint32_t page_count,
		 struct spdk_blob *blob)
{
	const struct spdk_blob_md_page *page;
	uint32_t i;
	int rc;

	assert(page_count > 0);
	assert(pages[0].sequence_num == 0);
	assert(blob != NULL);
	assert(blob->state == SPDK_BLOB_STATE_LOADING);
	assert(blob->active.clusters == NULL);

	/* The provided blobid doesn't match what's in the metadata. This can
	 * happen, for example, if a bogus blobid is passed in through open.
	 */
	if (blob->id != pages[0].id) {
		SPDK_ERRLOG("Blobid (%lu) doesn't match what's in metadata (%lu)\n",
			    blob->id, pages[0].id);
		return -ENOENT;
	}

	for (i = 0; i < page_count; i++) {
		page = &pages[i];

		assert(page->id == blob->id);
		assert(page->sequence_num == i);

		rc = _spdk_blob_parse_page(page, blob);
		if (rc != 0) {
			return rc;
		}
	}

	return 0;
}

static int
_spdk_blob_serialize_add_page(const struct spdk_blob *blob,
			      struct spdk_blob_md_page **pages,
			      uint32_t *page_count,
			      struct spdk_blob_md_page **last_page)
{
	struct spdk_blob_md_page *page;

	assert(pages != NULL);
	assert(page_count != NULL);

	if (*page_count == 0) {
		assert(*pages == NULL);
		*page_count = 1;
		*pages = spdk_dma_malloc(SPDK_BS_PAGE_SIZE,
					 SPDK_BS_PAGE_SIZE,
					 NULL);
	} else {
		assert(*pages != NULL);
		(*page_count)++;
		*pages = spdk_dma_realloc(*pages,
					  SPDK_BS_PAGE_SIZE * (*page_count),
					  SPDK_BS_PAGE_SIZE,
					  NULL);
	}

	if (*pages == NULL) {
		*page_count = 0;
		*last_page = NULL;
		return -ENOMEM;
	}

	page = &(*pages)[*page_count - 1];
	memset(page, 0, sizeof(*page));
	page->id = blob->id;
	page->sequence_num = *page_count - 1;
	page->next = SPDK_INVALID_MD_PAGE;
	*last_page = page;

	return 0;
}

/* Transform the in-memory representation 'xattr' into an on-disk xattr descriptor.
 * Update required_sz on both success and failure.
 */
static int
_spdk_blob_serialize_xattr(const struct spdk_xattr *xattr,
			   uint8_t *buf, size_t buf_sz,
			   size_t *required_sz, bool internal)
{
	struct spdk_blob_md_descriptor_xattr	*desc;

	*required_sz = sizeof(struct spdk_blob_md_descriptor_xattr) +
		       strlen(xattr->name) +
		       xattr->value_len;

	if (buf_sz < *required_sz) {
		return -1;
	}

	desc = (struct spdk_blob_md_descriptor_xattr *)buf;

	desc->type = internal ? SPDK_MD_DESCRIPTOR_TYPE_XATTR_INTERNAL : SPDK_MD_DESCRIPTOR_TYPE_XATTR;
	desc->length = sizeof(desc->name_length) +
		       sizeof(desc->value_length) +
		       strlen(xattr->name) +
		       xattr->value_len;
	desc->name_length = strlen(xattr->name);
	desc->value_length = xattr->value_len;

	memcpy(desc->name, xattr->name, desc->name_length);
	memcpy((void *)((uintptr_t)desc->name + desc->name_length),
	       xattr->value,
	       desc->value_length);

	return 0;
}

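/*
 * Run-length encode the blob's cluster list, beginning at start_cluster,
 * into extent descriptors of the form {cluster_idx, length}. For example,
 * clusters at indices 5, 6, 7 and 10 serialize as the two extents
 * {5, 3} and {10, 1}. If the descriptor buffer fills up, *next_cluster
 * tells the caller where to resume on a fresh page.
 */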
static void
_spdk_blob_serialize_extent(const struct spdk_blob *blob,
			    uint64_t start_cluster, uint64_t *next_cluster,
			    uint8_t *buf, size_t buf_sz)
{
	struct spdk_blob_md_descriptor_extent *desc;
	size_t cur_sz;
	uint64_t i, extent_idx;
	uint32_t lba, lba_per_cluster, lba_count;

	/* The buffer must have room for at least one extent */
	cur_sz = sizeof(struct spdk_blob_md_descriptor) + sizeof(desc->extents[0]);
	if (buf_sz < cur_sz) {
		*next_cluster = start_cluster;
		return;
	}

	desc = (struct spdk_blob_md_descriptor_extent *)buf;
	desc->type = SPDK_MD_DESCRIPTOR_TYPE_EXTENT;

	lba_per_cluster = _spdk_bs_cluster_to_lba(blob->bs, 1);

	lba = blob->active.clusters[start_cluster];
	lba_count = lba_per_cluster;
	extent_idx = 0;
	for (i = start_cluster + 1; i < blob->active.num_clusters; i++) {
		if ((lba + lba_count) == blob->active.clusters[i]) {
			lba_count += lba_per_cluster;
			continue;
		}
		desc->extents[extent_idx].cluster_idx = lba / lba_per_cluster;
		desc->extents[extent_idx].length = lba_count / lba_per_cluster;
		extent_idx++;

		cur_sz += sizeof(desc->extents[extent_idx]);

		if (buf_sz < cur_sz) {
			/* If we ran out of buffer space, return */
			desc->length = sizeof(desc->extents[0]) * extent_idx;
			*next_cluster = i;
			return;
		}

		lba = blob->active.clusters[i];
		lba_count = lba_per_cluster;
	}

	desc->extents[extent_idx].cluster_idx = lba / lba_per_cluster;
	desc->extents[extent_idx].length = lba_count / lba_per_cluster;
	extent_idx++;

	desc->length = sizeof(desc->extents[0]) * extent_idx;
	*next_cluster = blob->active.num_clusters;

	return;
}

static void
_spdk_blob_serialize_flags(const struct spdk_blob *blob,
			   uint8_t *buf, size_t *buf_sz)
{
	struct spdk_blob_md_descriptor_flags *desc;

	/*
	 * Flags get serialized first, so we should always have room for the flags
	 *  descriptor.
	 */
	assert(*buf_sz >= sizeof(*desc));

	desc = (struct spdk_blob_md_descriptor_flags *)buf;
	desc->type = SPDK_MD_DESCRIPTOR_TYPE_FLAGS;
	desc->length = sizeof(*desc) - sizeof(struct spdk_blob_md_descriptor);
	desc->invalid_flags = blob->invalid_flags;
	desc->data_ro_flags = blob->data_ro_flags;
	desc->md_ro_flags = blob->md_ro_flags;

	*buf_sz -= sizeof(*desc);
}

static int
_spdk_blob_serialize_xattrs(const struct spdk_blob *blob,
			    const struct spdk_xattr_tailq *xattrs, bool internal,
			    struct spdk_blob_md_page **pages,
			    struct spdk_blob_md_page *cur_page,
			    uint32_t *page_count, uint8_t **buf,
			    size_t *remaining_sz)
{
	const struct spdk_xattr	*xattr;
	int	rc;

	TAILQ_FOREACH(xattr, xattrs, link) {
		size_t required_sz = 0;

		rc = _spdk_blob_serialize_xattr(xattr,
						*buf, *remaining_sz,
						&required_sz, internal);
		if (rc < 0) {
			/* Need to add a new page to the chain */
			rc = _spdk_blob_serialize_add_page(blob, pages, page_count,
							   &cur_page);
			if (rc < 0) {
				spdk_dma_free(*pages);
				*pages = NULL;
				*page_count = 0;
				return rc;
			}

			*buf = (uint8_t *)cur_page->descriptors;
			*remaining_sz = sizeof(cur_page->descriptors);

			/* Try again */
			required_sz = 0;
			rc = _spdk_blob_serialize_xattr(xattr,
							*buf, *remaining_sz,
							&required_sz, internal);

			if (rc < 0) {
				spdk_dma_free(*pages);
				*pages = NULL;
				*page_count = 0;
				return -1;
			}
		}

		*remaining_sz -= required_sz;
		*buf += required_sz;
	}

	return 0;
}

static int
_spdk_blob_serialize(const struct spdk_blob *blob, struct spdk_blob_md_page **pages,
		     uint32_t *page_count)
{
	struct spdk_blob_md_page		*cur_page;
	int					rc;
	uint8_t					*buf;
	size_t					remaining_sz;
	uint64_t				last_cluster;

	assert(pages != NULL);
	assert(page_count != NULL);
	assert(blob != NULL);
	assert(blob->state == SPDK_BLOB_STATE_DIRTY);

	*pages = NULL;
	*page_count = 0;

	/* A blob always has at least 1 page, even if it has no descriptors */
	rc = _spdk_blob_serialize_add_page(blob, pages, page_count, &cur_page);
	if (rc < 0) {
		return rc;
	}

	buf = (uint8_t *)cur_page->descriptors;
	remaining_sz = sizeof(cur_page->descriptors);

	/* Serialize flags */
	_spdk_blob_serialize_flags(blob, buf, &remaining_sz);
	buf += sizeof(struct spdk_blob_md_descriptor_flags);

	/* Serialize xattrs */
	rc = _spdk_blob_serialize_xattrs(blob, &blob->xattrs, false,
					 pages, cur_page, page_count, &buf, &remaining_sz);
	if (rc < 0) {
		return rc;
	}

	/* Serialize internal xattrs */
	rc = _spdk_blob_serialize_xattrs(blob, &blob->xattrs_internal, true,
					 pages, cur_page, page_count, &buf, &remaining_sz);
	if (rc < 0) {
		return rc;
	}

	/* Serialize extents */
	last_cluster = 0;
	while (last_cluster < blob->active.num_clusters) {
		_spdk_blob_serialize_extent(blob, last_cluster, &last_cluster,
					    buf, remaining_sz);

		if (last_cluster == blob->active.num_clusters) {
			break;
		}

		rc = _spdk_blob_serialize_add_page(blob, pages, page_count,
						   &cur_page);
		if (rc < 0) {
			return rc;
		}

		buf = (uint8_t *)cur_page->descriptors;
		remaining_sz = sizeof(cur_page->descriptors);
	}

	return 0;
}

struct spdk_blob_load_ctx {
	struct spdk_blob		*blob;

	struct spdk_blob_md_page	*pages;
	uint32_t			num_pages;
	spdk_bs_sequence_t	        *seq;

	spdk_bs_sequence_cpl		cb_fn;
	void				*cb_arg;
};

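/*
 * The 32-bit crc field occupies the last 4 bytes of a metadata page, so
 * the checksum covers everything up to (but not including) that field.
 */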
static uint32_t
_spdk_blob_md_page_calc_crc(void *page)
{
	uint32_t		crc;

	crc = BLOB_CRC32C_INITIAL;
	crc = spdk_crc32c_update(page, SPDK_BS_PAGE_SIZE - 4, crc);
	crc ^= BLOB_CRC32C_INITIAL;

	return crc;
}

static void
_spdk_blob_load_final(void *cb_arg, int bserrno)
{
	struct spdk_blob_load_ctx	*ctx = cb_arg;
	struct spdk_blob		*blob = ctx->blob;

	_spdk_blob_mark_clean(blob);

	ctx->cb_fn(ctx->seq, ctx->cb_arg, bserrno);

	/* Free the memory */
	spdk_dma_free(ctx->pages);
	free(ctx);
}

static void
_spdk_blob_load_snapshot_cpl(void *cb_arg, struct spdk_blob *snapshot, int bserrno)
{
	struct spdk_blob_load_ctx	*ctx = cb_arg;
	struct spdk_blob		*blob = ctx->blob;

	if (bserrno != 0) {
		goto error;
	}

	blob->back_bs_dev = spdk_bs_create_blob_bs_dev(snapshot);

	if (blob->back_bs_dev == NULL) {
		bserrno = -ENOMEM;
		goto error;
	}

	_spdk_blob_load_final(ctx, bserrno);
	return;

error:
	SPDK_ERRLOG("Failed to load snapshot\n");
	_spdk_blob_free(blob);
	ctx->cb_fn(ctx->seq, NULL, bserrno);
	spdk_dma_free(ctx->pages);
	free(ctx);
}

static void
_spdk_blob_load_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_load_ctx	*ctx = cb_arg;
	struct spdk_blob		*blob = ctx->blob;
	struct spdk_blob_md_page	*page;
	const void			*value;
	size_t				len;
	int				rc;
	uint32_t			crc;

	page = &ctx->pages[ctx->num_pages - 1];
	crc = _spdk_blob_md_page_calc_crc(page);
	if (crc != page->crc) {
		SPDK_ERRLOG("Metadata page %d crc mismatch\n", ctx->num_pages);
		_spdk_blob_free(blob);
		ctx->cb_fn(seq, NULL, -EINVAL);
		spdk_dma_free(ctx->pages);
		free(ctx);
		return;
	}

	if (page->next != SPDK_INVALID_MD_PAGE) {
		uint32_t next_page = page->next;
		uint64_t next_lba = _spdk_bs_page_to_lba(blob->bs, blob->bs->md_start + next_page);

		/* The next metadata page must fall within the metadata region */
		assert(next_lba < _spdk_bs_page_to_lba(blob->bs, blob->bs->md_start + blob->bs->md_len));

		/* Read the next page */
		ctx->num_pages++;
		ctx->pages = spdk_dma_realloc(ctx->pages, (sizeof(*page) * ctx->num_pages),
					      sizeof(*page), NULL);
		if (ctx->pages == NULL) {
			ctx->cb_fn(seq, ctx->cb_arg, -ENOMEM);
			free(ctx);
			return;
		}

		spdk_bs_sequence_read_dev(seq, &ctx->pages[ctx->num_pages - 1],
					  next_lba,
					  _spdk_bs_byte_to_lba(blob->bs, sizeof(*page)),
					  _spdk_blob_load_cpl, ctx);
		return;
	}

	/* Parse the pages */
	rc = _spdk_blob_parse(ctx->pages, ctx->num_pages, blob);
	if (rc) {
		_spdk_blob_free(blob);
		ctx->cb_fn(seq, NULL, rc);
		spdk_dma_free(ctx->pages);
		free(ctx);
		return;
	}
	ctx->seq = seq;

	if (spdk_blob_is_thin_provisioned(blob)) {
		rc = _spdk_blob_get_xattr_value(blob, BLOB_SNAPSHOT, &value, &len, true);
		if (rc == 0) {
			if (len != sizeof(spdk_blob_id)) {
				_spdk_blob_free(blob);
				ctx->cb_fn(seq, NULL, -EINVAL);
				spdk_dma_free(ctx->pages);
				free(ctx);
				return;
			}
			/* open snapshot blob and continue in the callback function */
			spdk_bs_open_blob(blob->bs, *(spdk_blob_id *)value,
					  _spdk_blob_load_snapshot_cpl, ctx);
			return;
		} else {
			/* add zeroes_dev for thin provisioned blob */
			blob->back_bs_dev = spdk_bs_create_zeroes_dev();
		}
	} else {
		/* standard blob */
		blob->back_bs_dev = NULL;
	}
	_spdk_blob_load_final(ctx, bserrno);
}

/* Load a blob from disk given a blobid */
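/* The metadata pages form a singly-linked chain via page->next;
 * _spdk_blob_load_cpl() CRC-checks each page and keeps reading until it
 * reaches SPDK_INVALID_MD_PAGE.
 */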
static void
_spdk_blob_load(spdk_bs_sequence_t *seq, struct spdk_blob *blob,
		spdk_bs_sequence_cpl cb_fn, void *cb_arg)
{
	struct spdk_blob_load_ctx *ctx;
	struct spdk_blob_store *bs;
	uint32_t page_num;
	uint64_t lba;

	_spdk_blob_verify_md_op(blob);

	bs = blob->bs;

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		cb_fn(seq, cb_arg, -ENOMEM);
		return;
	}

	ctx->blob = blob;
	ctx->pages = spdk_dma_realloc(ctx->pages, SPDK_BS_PAGE_SIZE,
				      SPDK_BS_PAGE_SIZE, NULL);
	if (!ctx->pages) {
		free(ctx);
		cb_fn(seq, cb_arg, -ENOMEM);
		return;
	}
	ctx->num_pages = 1;
	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;

	page_num = _spdk_bs_blobid_to_page(blob->id);
	lba = _spdk_bs_page_to_lba(blob->bs, bs->md_start + page_num);

	blob->state = SPDK_BLOB_STATE_LOADING;

	spdk_bs_sequence_read_dev(seq, &ctx->pages[0], lba,
				  _spdk_bs_byte_to_lba(bs, SPDK_BS_PAGE_SIZE),
				  _spdk_blob_load_cpl, ctx);
}

struct spdk_blob_persist_ctx {
	struct spdk_blob		*blob;

	struct spdk_blob_md_page	*pages;

	uint64_t			idx;

	spdk_bs_sequence_t		*seq;
	spdk_bs_sequence_cpl		cb_fn;
	void				*cb_arg;
};

static void
_spdk_blob_persist_complete(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx	*ctx = cb_arg;
	struct spdk_blob		*blob = ctx->blob;

	if (bserrno == 0) {
		_spdk_blob_mark_clean(blob);
	}

	/* Call user callback */
	ctx->cb_fn(seq, ctx->cb_arg, bserrno);

	/* Free the memory */
	spdk_dma_free(ctx->pages);
	free(ctx);
}

static void
_spdk_blob_persist_unmap_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx	*ctx = cb_arg;
	struct spdk_blob		*blob = ctx->blob;
	struct spdk_blob_store		*bs = blob->bs;
	void				*tmp;
	size_t				i;

	/* Release all clusters that were truncated */
	for (i = blob->active.num_clusters; i < blob->active.cluster_array_size; i++) {
		uint32_t cluster_num = _spdk_bs_lba_to_cluster(bs, blob->active.clusters[i]);

		/* Nothing to release if it was not allocated */
		if (blob->active.clusters[i] != 0) {
			_spdk_bs_release_cluster(bs, cluster_num);
		}
	}

	if (blob->active.num_clusters == 0) {
		free(blob->active.clusters);
		blob->active.clusters = NULL;
		blob->active.cluster_array_size = 0;
	} else {
		tmp = realloc(blob->active.clusters, sizeof(uint64_t) * blob->active.num_clusters);
		assert(tmp != NULL);
		blob->active.clusters = tmp;
		blob->active.cluster_array_size = blob->active.num_clusters;
	}

	_spdk_blob_persist_complete(seq, ctx, bserrno);
}

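/*
 * Issue unmaps for the clusters dropped by a shrinking resize. Runs of
 * clusters that are contiguous on disk are coalesced into a single unmap;
 * unallocated (thin-provisioned) clusters, recorded as LBA 0, simply end
 * the current run.
 */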
static void
_spdk_blob_persist_unmap_clusters(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx	*ctx = cb_arg;
	struct spdk_blob		*blob = ctx->blob;
	struct spdk_blob_store		*bs = blob->bs;
	spdk_bs_batch_t			*batch;
	size_t				i;
	uint64_t			lba;
	uint32_t			lba_count;

	/* Clusters don't move around in blobs. The list shrinks or grows
	 * at the end, but no changes ever occur in the middle of the list.
	 */

	batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_unmap_clusters_cpl, ctx);

	/* Unmap all clusters that were truncated */
	lba = 0;
	lba_count = 0;
	for (i = blob->active.num_clusters; i < blob->active.cluster_array_size; i++) {
		uint64_t next_lba = blob->active.clusters[i];
		uint32_t next_lba_count = _spdk_bs_cluster_to_lba(bs, 1);

		if (next_lba > 0 && (lba + lba_count) == next_lba) {
			/* This cluster is contiguous with the previous one. */
			lba_count += next_lba_count;
			continue;
		}

		/* This cluster is not contiguous with the previous one. */

		/* If a run of LBAs previously existed, send it
		 * as an unmap.
		 */
		if (lba_count > 0) {
			spdk_bs_batch_unmap_dev(batch, lba, lba_count);
		}

		/* Start building the next batch */
		lba = next_lba;
		if (next_lba > 0) {
			lba_count = next_lba_count;
		} else {
			lba_count = 0;
		}
	}

	/* If we ended with a contiguous set of LBAs, send the unmap now */
	if (lba_count > 0) {
		spdk_bs_batch_unmap_dev(batch, lba, lba_count);
	}

	spdk_bs_batch_close(batch);
}

static void
_spdk_blob_persist_zero_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx	*ctx = cb_arg;
	struct spdk_blob		*blob = ctx->blob;
	struct spdk_blob_store		*bs = blob->bs;
	size_t				i;

	/* This loop starts at 1 because the first page is special and handled
	 * below. The pages (except the first) are never written in place, so
	 * any pages from the clean list are released back to the used_md_pages
	 * array here (they were already zeroed on disk by the previous step).
	 */
	for (i = 1; i < blob->clean.num_pages; i++) {
		spdk_bit_array_clear(bs->used_md_pages, blob->clean.pages[i]);
	}

	if (blob->active.num_pages == 0) {
		uint32_t page_num;

		page_num = _spdk_bs_blobid_to_page(blob->id);
		spdk_bit_array_clear(bs->used_md_pages, page_num);
	}

	/* Move on to unmapping clusters */
	_spdk_blob_persist_unmap_clusters(seq, ctx, 0);
}

static void
_spdk_blob_persist_zero_pages(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx	*ctx = cb_arg;
	struct spdk_blob		*blob = ctx->blob;
	struct spdk_blob_store		*bs = blob->bs;
	uint64_t			lba;
	uint32_t			lba_count;
	spdk_bs_batch_t			*batch;
	size_t				i;

	batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_zero_pages_cpl, ctx);

	lba_count = _spdk_bs_byte_to_lba(bs, SPDK_BS_PAGE_SIZE);

	/* This loop starts at 1 because the first page is special and handled
	 * below. The pages (except the first) are never written in place,
	 * so any pages in the clean list must be zeroed.
	 */
	for (i = 1; i < blob->clean.num_pages; i++) {
		lba = _spdk_bs_page_to_lba(bs, bs->md_start + blob->clean.pages[i]);

		spdk_bs_batch_write_zeroes_dev(batch, lba, lba_count);
	}

	/* The first page will only be zeroed if this is a delete. */
	if (blob->active.num_pages == 0) {
		uint32_t page_num;

		/* The first page in the metadata goes where the blobid indicates */
		page_num = _spdk_bs_blobid_to_page(blob->id);
		lba = _spdk_bs_page_to_lba(bs, bs->md_start + page_num);

		spdk_bs_batch_write_zeroes_dev(batch, lba, lba_count);
	}

	spdk_bs_batch_close(batch);
}

static void
_spdk_blob_persist_write_page_root(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx	*ctx = cb_arg;
	struct spdk_blob		*blob = ctx->blob;
	struct spdk_blob_store		*bs = blob->bs;
	uint64_t			lba;
	uint32_t			lba_count;
	struct spdk_blob_md_page	*page;

	if (blob->active.num_pages == 0) {
		/* Move on to the next step */
		_spdk_blob_persist_zero_pages(seq, ctx, 0);
		return;
	}

	lba_count = _spdk_bs_byte_to_lba(bs, sizeof(*page));

	page = &ctx->pages[0];
	/* The first page in the metadata goes where the blobid indicates */
	lba = _spdk_bs_page_to_lba(bs, bs->md_start + _spdk_bs_blobid_to_page(blob->id));

	spdk_bs_sequence_write_dev(seq, page, lba, lba_count,
				   _spdk_blob_persist_zero_pages, ctx);
}

static void
_spdk_blob_persist_write_page_chain(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx	*ctx = cb_arg;
	struct spdk_blob		*blob = ctx->blob;
	struct spdk_blob_store		*bs = blob->bs;
	uint64_t			lba;
	uint32_t			lba_count;
	struct spdk_blob_md_page	*page;
	spdk_bs_batch_t			*batch;
	size_t				i;

	/* Clusters don't move around in blobs. The list shrinks or grows
	 * at the end, but no changes ever occur in the middle of the list.
	 */

	lba_count = _spdk_bs_byte_to_lba(bs, sizeof(*page));

	batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_write_page_root, ctx);

	/* This starts at 1. The root page is not written until
	 * all of the others are finished
	 */
	for (i = 1; i < blob->active.num_pages; i++) {
		page = &ctx->pages[i];
		assert(page->sequence_num == i);

		lba = _spdk_bs_page_to_lba(bs, bs->md_start + blob->active.pages[i]);

		spdk_bs_batch_write_dev(batch, page, lba, lba_count);
	}

	spdk_bs_batch_close(batch);
}

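/*
 * Resize the blob to sz clusters. Thick-provisioned blobs allocate all
 * new clusters immediately (in two passes, so the operation either fully
 * succeeds or changes nothing); thin-provisioned blobs only grow the
 * zero-filled cluster array and allocate on first write.
 */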
static int
_spdk_blob_resize(struct spdk_blob *blob, uint64_t sz)
{
	uint64_t	i;
	uint64_t	*tmp;
	uint64_t	lfc; /* lowest free cluster */
	uint64_t	num_clusters;
	struct spdk_blob_store *bs;

	bs = blob->bs;

	_spdk_blob_verify_md_op(blob);

	if (blob->active.num_clusters == sz) {
		return 0;
	}

	if (blob->active.num_clusters < blob->active.cluster_array_size) {
		/* If this blob was resized to be larger, then smaller, then
		 * larger without syncing, then the cluster array already
		 * contains spare assigned clusters we can use.
		 */
		num_clusters = spdk_min(blob->active.cluster_array_size,
					sz);
	} else {
		num_clusters = blob->active.num_clusters;
	}

	/* Do two passes - one to verify that we can obtain enough clusters
	 * and another to actually claim them.
	 */

	if (spdk_blob_is_thin_provisioned(blob) == false) {
		lfc = 0;
		for (i = num_clusters; i < sz; i++) {
			lfc = spdk_bit_array_find_first_clear(bs->used_clusters, lfc);
			if (lfc >= bs->total_clusters) {
				/* No more free clusters. Cannot satisfy the request */
				return -ENOSPC;
			}
			lfc++;
		}
	}

	if (sz > num_clusters) {
		/* Expand the cluster array if necessary.
		 * We only shrink the array when persisting.
		 */
		tmp = realloc(blob->active.clusters, sizeof(uint64_t) * sz);
		if (sz > 0 && tmp == NULL) {
			return -ENOMEM;
		}
		memset(tmp + blob->active.cluster_array_size, 0,
		       sizeof(uint64_t) * (sz - blob->active.cluster_array_size));
		blob->active.clusters = tmp;
		blob->active.cluster_array_size = sz;
	}

	blob->state = SPDK_BLOB_STATE_DIRTY;

	if (spdk_blob_is_thin_provisioned(blob) == false) {
		lfc = 0;
		for (i = num_clusters; i < sz; i++) {
			_spdk_bs_allocate_cluster(blob, i, &lfc, true);
			lfc++;
		}
	}

	blob->active.num_clusters = sz;

	return 0;
}

static void
_spdk_blob_persist_start(struct spdk_blob_persist_ctx *ctx)
{
	spdk_bs_sequence_t *seq = ctx->seq;
	struct spdk_blob *blob = ctx->blob;
	struct spdk_blob_store *bs = blob->bs;
	uint64_t i;
	uint32_t page_num;
	int rc;

	if (blob->active.num_pages == 0) {
		/* This is the signal that the blob should be deleted.
		 * Immediately jump to the clean up routine. */
		assert(blob->clean.num_pages > 0);
		ctx->idx = blob->clean.num_pages - 1;
		blob->state = SPDK_BLOB_STATE_CLEAN;
		_spdk_blob_persist_zero_pages(seq, ctx, 0);
		return;
	}

	/* Generate the new metadata */
	rc = _spdk_blob_serialize(blob, &ctx->pages, &blob->active.num_pages);
	if (rc < 0) {
		_spdk_blob_persist_complete(seq, ctx, rc);
		return;
	}

	assert(blob->active.num_pages >= 1);

	/* Resize the cache of page indices */
	blob->active.pages = realloc(blob->active.pages,
				     blob->active.num_pages * sizeof(*blob->active.pages));
	if (!blob->active.pages) {
		_spdk_blob_persist_complete(seq, ctx, -ENOMEM);
		return;
	}

	/* Assign this metadata to pages. This requires two passes -
	 * one to verify that there are enough pages and a second
	 * to actually claim them. */
	page_num = 0;
	/* Note that this loop starts at one. The first page location is fixed by the blobid. */
	for (i = 1; i < blob->active.num_pages; i++) {
		page_num = spdk_bit_array_find_first_clear(bs->used_md_pages, page_num);
		if (page_num >= spdk_bit_array_capacity(bs->used_md_pages)) {
			_spdk_blob_persist_complete(seq, ctx, -ENOMEM);
			return;
		}
		page_num++;
	}

	page_num = 0;
	blob->active.pages[0] = _spdk_bs_blobid_to_page(blob->id);
	for (i = 1; i < blob->active.num_pages; i++) {
		page_num = spdk_bit_array_find_first_clear(bs->used_md_pages, page_num);
		ctx->pages[i - 1].next = page_num;
		/* Now that previous metadata page is complete, calculate the crc for it. */
		ctx->pages[i - 1].crc = _spdk_blob_md_page_calc_crc(&ctx->pages[i - 1]);
		blob->active.pages[i] = page_num;
		spdk_bit_array_set(bs->used_md_pages, page_num);
		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Claiming page %u for blob %lu\n", page_num, blob->id);
		page_num++;
	}
	ctx->pages[i - 1].crc = _spdk_blob_md_page_calc_crc(&ctx->pages[i - 1]);
	/* Start writing the metadata from last page to first */
	ctx->idx = blob->active.num_pages - 1;
	blob->state = SPDK_BLOB_STATE_CLEAN;
	_spdk_blob_persist_write_page_chain(seq, ctx, 0);
}

/* Write a blob to disk */
static void
_spdk_blob_persist(spdk_bs_sequence_t *seq, struct spdk_blob *blob,
		   spdk_bs_sequence_cpl cb_fn, void *cb_arg)
{
	struct spdk_blob_persist_ctx *ctx;

	_spdk_blob_verify_md_op(blob);

	if (blob->state == SPDK_BLOB_STATE_CLEAN) {
		cb_fn(seq, cb_arg, 0);
		return;
	}

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		cb_fn(seq, cb_arg, -ENOMEM);
		return;
	}
	ctx->blob = blob;
	ctx->seq = seq;
	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;

	_spdk_blob_persist_start(ctx);
}

struct spdk_blob_copy_cluster_ctx {
	struct spdk_blob *blob;
	uint8_t *buf;
	uint64_t page;
	uint64_t new_cluster;
	spdk_bs_sequence_t *seq;
};

static void
_spdk_blob_allocate_and_copy_cluster_cpl(void *cb_arg, int bserrno)
{
	struct spdk_blob_copy_cluster_ctx *ctx = cb_arg;
	struct spdk_bs_request_set *set = (struct spdk_bs_request_set *)ctx->seq;
	TAILQ_HEAD(, spdk_bs_request_set) requests;
	spdk_bs_user_op_t *op;

	TAILQ_INIT(&requests);
	TAILQ_SWAP(&set->channel->need_cluster_alloc, &requests, spdk_bs_request_set, link);

	while (!TAILQ_EMPTY(&requests)) {
		op = TAILQ_FIRST(&requests);
		TAILQ_REMOVE(&requests, op, link);
		if (bserrno == 0) {
			spdk_bs_user_op_execute(op);
		} else {
			spdk_bs_user_op_abort(op);
		}
	}

	spdk_dma_free(ctx->buf);
	free(ctx);
}

static void
_spdk_blob_insert_cluster_cpl(void *cb_arg, int bserrno)
{
	struct spdk_blob_copy_cluster_ctx *ctx = cb_arg;

	if (bserrno) {
		uint32_t cluster_number;

		if (bserrno == -EEXIST) {
			/* The metadata insert failed because another thread
			 * allocated the cluster first. Free our cluster
			 * but continue without error. */
			bserrno = 0;
		}

		cluster_number = _spdk_bs_page_to_cluster(ctx->blob->bs, ctx->page);
		_spdk_bs_release_cluster(ctx->blob->bs, cluster_number);
	}

	spdk_bs_sequence_finish(ctx->seq, bserrno);
}

static void
_spdk_blob_write_copy_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_copy_cluster_ctx *ctx = cb_arg;
	uint32_t cluster_number;

	if (bserrno) {
		/* The write failed, so jump to the final completion handler */
		spdk_bs_sequence_finish(seq, bserrno);
		return;
	}

	cluster_number = _spdk_bs_page_to_cluster(ctx->blob->bs, ctx->page);

	_spdk_blob_insert_cluster_on_md_thread(ctx->blob, cluster_number, ctx->new_cluster,
					       _spdk_blob_insert_cluster_cpl, ctx);
}

static void
_spdk_blob_write_copy(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_copy_cluster_ctx *ctx = cb_arg;

	if (bserrno != 0) {
		/* The read failed, so jump to the final completion handler */
		spdk_bs_sequence_finish(seq, bserrno);
		return;
	}

	/* Write whole cluster */
	spdk_bs_sequence_write_dev(seq, ctx->buf,
				   _spdk_bs_cluster_to_lba(ctx->blob->bs, ctx->new_cluster),
				   _spdk_bs_cluster_to_lba(ctx->blob->bs, 1),
				   _spdk_blob_write_copy_cpl, ctx);
}

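/*
 * Copy-on-write path for writes to an unallocated cluster: claim a new
 * cluster, read the old data from the backing device, write it to the
 * new cluster, then insert the cluster into the blob's metadata on the
 * metadata thread. I/O that arrives on the same channel in the meantime
 * is parked on need_cluster_alloc and replayed (or aborted) by
 * _spdk_blob_allocate_and_copy_cluster_cpl() when the allocation finishes.
 */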
static void
_spdk_bs_allocate_and_copy_cluster(struct spdk_blob *blob,
				   struct spdk_io_channel *_ch,
				   uint64_t offset, spdk_bs_user_op_t *op)
{
	struct spdk_bs_cpl cpl;
	struct spdk_bs_channel *ch;
	struct spdk_blob_copy_cluster_ctx *ctx;
	uint32_t cluster_start_page;
	uint32_t cluster_number;
	int rc;

	ch = spdk_io_channel_get_ctx(_ch);

	if (!TAILQ_EMPTY(&ch->need_cluster_alloc)) {
		/* There are already operations pending. Queue this user op
		 * and return because it will be re-executed when the outstanding
		 * cluster allocation completes. */
		TAILQ_INSERT_TAIL(&ch->need_cluster_alloc, op, link);
		return;
	}

	/* Round the page offset down to the first page in the cluster */
	cluster_start_page = _spdk_bs_page_to_cluster_start(blob, offset);

	/* Calculate which index in the metadata cluster array the corresponding
	 * cluster is supposed to be at. */
	cluster_number = _spdk_bs_page_to_cluster(blob->bs, cluster_start_page);

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		spdk_bs_user_op_abort(op);
		return;
	}

	assert(blob->bs->cluster_sz % blob->back_bs_dev->blocklen == 0);

	ctx->blob = blob;
	ctx->page = cluster_start_page;

	ctx->buf = spdk_dma_malloc(blob->bs->cluster_sz, blob->back_bs_dev->blocklen, NULL);
	if (!ctx->buf) {
		SPDK_ERRLOG("DMA allocation for cluster of size = %" PRIu32 " failed.\n",
			    blob->bs->cluster_sz);
		free(ctx);
		spdk_bs_user_op_abort(op);
		return;
	}

	rc = _spdk_bs_allocate_cluster(blob, cluster_number, &ctx->new_cluster, false);
	if (rc != 0) {
		spdk_dma_free(ctx->buf);
		free(ctx);
		spdk_bs_user_op_abort(op);
		return;
	}

	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
	cpl.u.blob_basic.cb_fn = _spdk_blob_allocate_and_copy_cluster_cpl;
	cpl.u.blob_basic.cb_arg = ctx;

	ctx->seq = spdk_bs_sequence_start(_ch, &cpl);
	if (!ctx->seq) {
		_spdk_bs_release_cluster(blob->bs, ctx->new_cluster);
		spdk_dma_free(ctx->buf);
		free(ctx);
		spdk_bs_user_op_abort(op);
		return;
	}

	/* Queue the user op to block other incoming operations */
	TAILQ_INSERT_TAIL(&ch->need_cluster_alloc, op, link);

	/* Read cluster from backing device */
	spdk_bs_sequence_read_bs_dev(ctx->seq, blob->back_bs_dev, ctx->buf,
				     _spdk_bs_dev_page_to_lba(blob->back_bs_dev, cluster_start_page),
				     _spdk_bs_dev_byte_to_lba(blob->back_bs_dev, blob->bs->cluster_sz),
				     _spdk_blob_write_copy, ctx);
}

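/*
 * Translate a blob-relative page offset and length into a device LBA and
 * LBA count. Unallocated pages of a thin-provisioned blob resolve to the
 * backing device rather than the blobstore's own device.
 */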
static void
_spdk_blob_calculate_lba_and_lba_count(struct spdk_blob *blob, uint64_t page, uint64_t length,
				       uint64_t *lba,	uint32_t *lba_count)
{
	*lba_count = _spdk_bs_page_to_lba(blob->bs, length);

	if (!_spdk_bs_page_is_allocated(blob, page)) {
		assert(blob->back_bs_dev != NULL);
		*lba = _spdk_bs_dev_page_to_lba(blob->back_bs_dev, page);
		*lba_count = _spdk_bs_blob_lba_to_back_dev_lba(blob, *lba_count);
	} else {
		*lba = _spdk_bs_blob_page_to_lba(blob, page);
	}
}

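/*
 * Context for splitting a request that crosses cluster boundaries into a
 * sequence of smaller operations, each confined to a single cluster.
 */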
struct op_split_ctx {
	struct spdk_blob *blob;
	struct spdk_io_channel *channel;
	uint64_t page_offset;
	uint64_t pages_remaining;
	void *curr_payload;
	enum spdk_blob_op_type op_type;
	spdk_bs_sequence_t *seq;
};

static void
_spdk_blob_request_submit_op_split_next(void *cb_arg, int bserrno)
{
	struct op_split_ctx	*ctx = cb_arg;
	struct spdk_blob	*blob = ctx->blob;
	struct spdk_io_channel	*ch = ctx->channel;
	enum spdk_blob_op_type	op_type = ctx->op_type;
	uint8_t			*buf = ctx->curr_payload;
	uint64_t		offset = ctx->page_offset;
	uint64_t		length = ctx->pages_remaining;
	uint64_t		op_length;

	if (bserrno != 0 || ctx->pages_remaining == 0) {
		spdk_bs_sequence_finish(ctx->seq, bserrno);
		free(ctx);
		return;
	}

	op_length = spdk_min(length, _spdk_bs_num_pages_to_cluster_boundary(blob, offset));

	/* Update length and payload for next operation */
	ctx->pages_remaining -= op_length;
	ctx->page_offset += op_length;
	if (op_type == SPDK_BLOB_WRITE || op_type == SPDK_BLOB_READ) {
		/* op_length is in pages, but curr_payload is a byte pointer */
		ctx->curr_payload += op_length * SPDK_BS_PAGE_SIZE;
	}

	switch (op_type) {
	case SPDK_BLOB_READ:
		spdk_blob_io_read(blob, ch, buf, offset, op_length,
				  _spdk_blob_request_submit_op_split_next, ctx);
		break;
	case SPDK_BLOB_WRITE:
		spdk_blob_io_write(blob, ch, buf, offset, op_length,
				   _spdk_blob_request_submit_op_split_next, ctx);
		break;
	case SPDK_BLOB_UNMAP:
		spdk_blob_io_unmap(blob, ch, offset, op_length,
				   _spdk_blob_request_submit_op_split_next, ctx);
		break;
	case SPDK_BLOB_WRITE_ZEROES:
		spdk_blob_io_write_zeroes(blob, ch, offset, op_length,
					  _spdk_blob_request_submit_op_split_next, ctx);
		break;
	case SPDK_BLOB_READV:
	case SPDK_BLOB_WRITEV:
		SPDK_ERRLOG("readv/writev not valid for %s\n", __func__);
		spdk_bs_sequence_finish(ctx->seq, -EINVAL);
		free(ctx);
		break;
	}
}

static void
_spdk_blob_request_submit_op_split(struct spdk_io_channel *ch, struct spdk_blob *blob,
				   void *payload, uint64_t offset, uint64_t length,
				   spdk_blob_op_complete cb_fn, void *cb_arg, enum spdk_blob_op_type op_type)
{
	struct op_split_ctx *ctx;
	spdk_bs_sequence_t *seq;
	struct spdk_bs_cpl cpl;

	assert(blob != NULL);

	ctx = calloc(1, sizeof(struct op_split_ctx));
	if (ctx == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
	cpl.u.blob_basic.cb_fn = cb_fn;
	cpl.u.blob_basic.cb_arg = cb_arg;

	seq = spdk_bs_sequence_start(ch, &cpl);
	if (!seq) {
		free(ctx);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	ctx->blob = blob;
	ctx->channel = ch;
	ctx->curr_payload = payload;
	ctx->page_offset = offset;
	ctx->pages_remaining = length;
	ctx->op_type = op_type;
	ctx->seq = seq;

	_spdk_blob_request_submit_op_split_next(ctx, 0);
}

static void
_spdk_blob_request_submit_op_single(struct spdk_io_channel *_ch, struct spdk_blob *blob,
				    void *payload, uint64_t offset, uint64_t length,
				    spdk_blob_op_complete cb_fn, void *cb_arg, enum spdk_blob_op_type op_type)
{
	struct spdk_bs_cpl cpl;
	uint64_t lba;
	uint32_t lba_count;

	assert(blob != NULL);

	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
	cpl.u.blob_basic.cb_fn = cb_fn;
	cpl.u.blob_basic.cb_arg = cb_arg;

	_spdk_blob_calculate_lba_and_lba_count(blob, offset, length, &lba, &lba_count);

	switch (op_type) {
	case SPDK_BLOB_READ: {
		spdk_bs_batch_t *batch;

		batch = spdk_bs_batch_open(_ch, &cpl);
		if (!batch) {
			cb_fn(cb_arg, -ENOMEM);
			return;
		}

		if (_spdk_bs_page_is_allocated(blob, offset)) {
			/* Read from the blob */
			spdk_bs_batch_read_dev(batch, payload, lba, lba_count);
		} else {
			/* Read from the backing block device */
			spdk_bs_batch_read_bs_dev(batch, blob->back_bs_dev, payload, lba, lba_count);
		}

		spdk_bs_batch_close(batch);
		break;
	}
	case SPDK_BLOB_WRITE:
	case SPDK_BLOB_WRITE_ZEROES: {
		if (_spdk_bs_page_is_allocated(blob, offset)) {
			/* Write to the blob */
			spdk_bs_batch_t *batch;

			batch = spdk_bs_batch_open(_ch, &cpl);
			if (!batch) {
				cb_fn(cb_arg, -ENOMEM);
				return;
			}

			if (op_type == SPDK_BLOB_WRITE) {
				spdk_bs_batch_write_dev(batch, payload, lba, lba_count);
			} else {
				spdk_bs_batch_write_zeroes_dev(batch, lba, lba_count);
			}

			spdk_bs_batch_close(batch);
		} else {
			/* Queue this operation and allocate the cluster */
			spdk_bs_user_op_t *op;

			op = spdk_bs_user_op_alloc(_ch, &cpl, op_type, blob, payload, 0, offset, length);
			if (!op) {
				cb_fn(cb_arg, -ENOMEM);
				return;
			}

			_spdk_bs_allocate_and_copy_cluster(blob, _ch, offset, op);
		}
		break;
	}
	case SPDK_BLOB_UNMAP: {
		spdk_bs_batch_t *batch;

		batch = spdk_bs_batch_open(_ch, &cpl);
		if (!batch) {
			cb_fn(cb_arg, -ENOMEM);
			return;
		}

		if (_spdk_bs_page_is_allocated(blob, offset)) {
			spdk_bs_batch_unmap_dev(batch, lba, lba_count);
		}

		spdk_bs_batch_close(batch);
		break;
	}
	case SPDK_BLOB_READV:
	case SPDK_BLOB_WRITEV:
		SPDK_ERRLOG("readv/writev not valid\n");
		cb_fn(cb_arg, -EINVAL);
		break;
	}
}

static void
_spdk_blob_request_submit_op(struct spdk_blob *blob, struct spdk_io_channel *_channel,
			     void *payload, uint64_t offset, uint64_t length,
			     spdk_blob_op_complete cb_fn, void *cb_arg, enum spdk_blob_op_type op_type)
{
	assert(blob != NULL);

	if (blob->data_ro && op_type != SPDK_BLOB_READ) {
		cb_fn(cb_arg, -EPERM);
		return;
	}

	if (offset + length > blob->active.num_clusters * blob->bs->pages_per_cluster) {
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	if (length <= _spdk_bs_num_pages_to_cluster_boundary(blob, offset)) {
		_spdk_blob_request_submit_op_single(_channel, blob, payload, offset, length,
						    cb_fn, cb_arg, op_type);
	} else {
		_spdk_blob_request_submit_op_split(_channel, blob, payload, offset, length,
						   cb_fn, cb_arg, op_type);
	}
}

struct rw_iov_ctx {
	struct spdk_blob *blob;
	struct spdk_io_channel *channel;
	spdk_blob_op_complete cb_fn;
	void *cb_arg;
	bool read;
	int iovcnt;
	struct iovec *orig_iov;
	uint64_t page_offset;
	uint64_t pages_remaining;
	uint64_t pages_done;
	struct iovec iov[0];
};

static void
_spdk_rw_iov_done(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	assert(cb_arg == NULL);
	spdk_bs_sequence_finish(seq, bserrno);
}

static void
_spdk_rw_iov_split_next(void *cb_arg, int bserrno)
{
	struct rw_iov_ctx *ctx = cb_arg;
	struct spdk_blob *blob = ctx->blob;
	struct iovec *iov, *orig_iov;
	int iovcnt;
	size_t orig_iovoff;
	uint64_t page_count, pages_to_boundary, page_offset;
	uint64_t byte_count;

	if (bserrno != 0 || ctx->pages_remaining == 0) {
		ctx->cb_fn(ctx->cb_arg, bserrno);
		free(ctx);
		return;
	}

	page_offset = ctx->page_offset;
	pages_to_boundary = _spdk_bs_num_pages_to_cluster_boundary(blob, page_offset);
	page_count = spdk_min(ctx->pages_remaining, pages_to_boundary);

	/*
	 * Get the index and offset into the original iov array for our current position
	 *  in the I/O sequence. byte_count tracks how many bytes remain until orig_iov
	 *  and orig_iovoff point to the current position in the I/O sequence.
	 */
1822 	byte_count = ctx->pages_done * sizeof(struct spdk_blob_md_page);
1823 	orig_iov = &ctx->orig_iov[0];
1824 	orig_iovoff = 0;
1825 	while (byte_count > 0) {
1826 		if (byte_count >= orig_iov->iov_len) {
1827 			byte_count -= orig_iov->iov_len;
1828 			orig_iov++;
1829 		} else {
1830 			orig_iovoff = byte_count;
1831 			byte_count = 0;
1832 		}
1833 	}
1834 
1835 	/*
1836 	 * Build an iov array for the next I/O in the sequence.  byte_count will keep track of how many
1837 	 *  bytes of this next I/O remain to be accounted for in the new iov array.
1838 	 */
1839 	byte_count = page_count * sizeof(struct spdk_blob_md_page);
1840 	iov = &ctx->iov[0];
1841 	iovcnt = 0;
1842 	while (byte_count > 0) {
1843 		iov->iov_len = spdk_min(byte_count, orig_iov->iov_len - orig_iovoff);
1844 		iov->iov_base = orig_iov->iov_base + orig_iovoff;
1845 		byte_count -= iov->iov_len;
1846 		orig_iovoff = 0;
1847 		orig_iov++;
1848 		iov++;
1849 		iovcnt++;
1850 	}
1851 
1852 	ctx->page_offset += page_count;
1853 	ctx->pages_done += page_count;
1854 	ctx->pages_remaining -= page_count;
1855 	iov = &ctx->iov[0];
1856 
1857 	if (ctx->read) {
1858 		spdk_blob_io_readv(ctx->blob, ctx->channel, iov, iovcnt, page_offset,
1859 				   page_count, _spdk_rw_iov_split_next, ctx);
1860 	} else {
1861 		spdk_blob_io_writev(ctx->blob, ctx->channel, iov, iovcnt, page_offset,
1862 				    page_count, _spdk_rw_iov_split_next, ctx);
1863 	}
1864 }
1865 
1866 static void
1867 _spdk_blob_request_submit_rw_iov(struct spdk_blob *blob, struct spdk_io_channel *_channel,
1868 				 struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
1869 				 spdk_blob_op_complete cb_fn, void *cb_arg, bool read)
1870 {
1871 	struct spdk_bs_cpl	cpl;
1872 
1873 	assert(blob != NULL);
1874 
1875 	if (!read && blob->data_ro) {
1876 		cb_fn(cb_arg, -EPERM);
1877 		return;
1878 	}
1879 
1880 	if (length == 0) {
1881 		cb_fn(cb_arg, 0);
1882 		return;
1883 	}
1884 
1885 	if (offset + length > blob->active.num_clusters * blob->bs->pages_per_cluster) {
1886 		cb_fn(cb_arg, -EINVAL);
1887 		return;
1888 	}
1889 
1890 	/*
1891 	 * For now, we implement readv/writev using a sequence (instead of a batch) to account for having
1892 	 *  to split a request that spans a cluster boundary.  For I/O that do not span a cluster boundary,
1893 	 *  there will be no noticeable difference compared to using a batch.  For I/O that do span a cluster
1894 	 *  boundary, the target LBAs (after blob offset to LBA translation) may not be contiguous, so we need
1895 	 *  to allocate a separate iov array and split the I/O such that none of the resulting
1896 	 *  smaller I/O cross a cluster boundary.  These smaller I/O will be issued in sequence (not in parallel)
1897 	 *  but since this case happens very infrequently, any performance impact will be negligible.
1898 	 *
1899 	 * This could be optimized in the future to allocate a big enough iov array to account for all of the iovs
1900 	 *  for all of the smaller I/Os, pre-build all of the iov arrays for the smaller I/Os, then issue them
1901 	 *  in a batch.  That would also require creating an intermediate spdk_bs_cpl that would get called
1902 	 *  when the batch was completed, to allow for freeing the memory for the iov arrays.
1903 	 */
1904 	if (spdk_likely(length <= _spdk_bs_num_pages_to_cluster_boundary(blob, offset))) {
1905 		uint32_t lba_count;
1906 		uint64_t lba;
1907 
1908 		_spdk_blob_calculate_lba_and_lba_count(blob, offset, length, &lba, &lba_count);
1909 
1910 		cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
1911 		cpl.u.blob_basic.cb_fn = cb_fn;
1912 		cpl.u.blob_basic.cb_arg = cb_arg;
1913 
1914 		if (read) {
1915 			spdk_bs_sequence_t *seq;
1916 
1917 			seq = spdk_bs_sequence_start(_channel, &cpl);
1918 			if (!seq) {
1919 				cb_fn(cb_arg, -ENOMEM);
1920 				return;
1921 			}
1922 
1923 			if (_spdk_bs_page_is_allocated(blob, offset)) {
1924 				spdk_bs_sequence_readv_dev(seq, iov, iovcnt, lba, lba_count, _spdk_rw_iov_done, NULL);
1925 			} else {
1926 				spdk_bs_sequence_readv_bs_dev(seq, blob->back_bs_dev, iov, iovcnt, lba, lba_count,
1927 							      _spdk_rw_iov_done, NULL);
1928 			}
1929 		} else {
1930 			if (_spdk_bs_page_is_allocated(blob, offset)) {
1931 				spdk_bs_sequence_t *seq;
1932 
1933 				seq = spdk_bs_sequence_start(_channel, &cpl);
1934 				if (!seq) {
1935 					cb_fn(cb_arg, -ENOMEM);
1936 					return;
1937 				}
1938 
1939 				spdk_bs_sequence_writev_dev(seq, iov, iovcnt, lba, lba_count, _spdk_rw_iov_done, NULL);
1940 			} else {
1941 				/* Queue this operation and allocate the cluster */
1942 				spdk_bs_user_op_t *op;
1943 
1944 				op = spdk_bs_user_op_alloc(_channel, &cpl, SPDK_BLOB_WRITEV, blob, iov, iovcnt, offset, length);
1945 				if (!op) {
1946 					cb_fn(cb_arg, -ENOMEM);
1947 					return;
1948 				}
1949 
1950 				_spdk_bs_allocate_and_copy_cluster(blob, _channel, offset, op);
1951 			}
1952 		}
1953 	} else {
1954 		struct rw_iov_ctx *ctx;
1955 
1956 		ctx = calloc(1, sizeof(struct rw_iov_ctx) + iovcnt * sizeof(struct iovec));
1957 		if (ctx == NULL) {
1958 			cb_fn(cb_arg, -ENOMEM);
1959 			return;
1960 		}
1961 
1962 		ctx->blob = blob;
1963 		ctx->channel = _channel;
1964 		ctx->cb_fn = cb_fn;
1965 		ctx->cb_arg = cb_arg;
1966 		ctx->read = read;
1967 		ctx->orig_iov = iov;
1968 		ctx->iovcnt = iovcnt;
1969 		ctx->page_offset = offset;
1970 		ctx->pages_remaining = length;
1971 		ctx->pages_done = 0;
1972 
1973 		_spdk_rw_iov_split_next(ctx, 0);
1974 	}
1975 }
1976 
1977 static struct spdk_blob *
1978 _spdk_blob_lookup(struct spdk_blob_store *bs, spdk_blob_id blobid)
1979 {
1980 	struct spdk_blob *blob;
1981 
1982 	TAILQ_FOREACH(blob, &bs->blobs, link) {
1983 		if (blob->id == blobid) {
1984 			return blob;
1985 		}
1986 	}
1987 
1988 	return NULL;
1989 }
1990 
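/*
 * I/O channel constructor registered via spdk_io_device_register().
 *  Preallocates max_channel_ops request sets, strings them onto the free
 *  list, and opens a channel on the underlying bs_dev.
 */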
1991 static int
1992 _spdk_bs_channel_create(void *io_device, void *ctx_buf)
1993 {
1994 	struct spdk_blob_store		*bs = io_device;
1995 	struct spdk_bs_channel		*channel = ctx_buf;
1996 	struct spdk_bs_dev		*dev;
1997 	uint32_t			max_ops = bs->max_channel_ops;
1998 	uint32_t			i;
1999 
2000 	dev = bs->dev;
2001 
2002 	channel->req_mem = calloc(max_ops, sizeof(struct spdk_bs_request_set));
2003 	if (!channel->req_mem) {
2004 		return -1;
2005 	}
2006 
2007 	TAILQ_INIT(&channel->reqs);
2008 
2009 	for (i = 0; i < max_ops; i++) {
2010 		TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link);
2011 	}
2012 
2013 	channel->bs = bs;
2014 	channel->dev = dev;
2015 	channel->dev_channel = dev->create_channel(dev);
2016 
2017 	if (!channel->dev_channel) {
2018 		SPDK_ERRLOG("Failed to create device channel.\n");
2019 		free(channel->req_mem);
2020 		return -1;
2021 	}
2022 
2023 	TAILQ_INIT(&channel->need_cluster_alloc);
2024 
2025 	return 0;
2026 }
2027 
2028 static void
2029 _spdk_bs_channel_destroy(void *io_device, void *ctx_buf)
2030 {
2031 	struct spdk_bs_channel *channel = ctx_buf;
2032 	spdk_bs_user_op_t *op;
2033 
2034 	while (!TAILQ_EMPTY(&channel->need_cluster_alloc)) {
2035 		op = TAILQ_FIRST(&channel->need_cluster_alloc);
2036 		TAILQ_REMOVE(&channel->need_cluster_alloc, op, link);
2037 		spdk_bs_user_op_abort(op);
2038 	}
2039 
2040 	free(channel->req_mem);
2041 	channel->dev->destroy_channel(channel->dev, channel->dev_channel);
2042 }
2043 
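/*
 * Final unregister callback for the blobstore io_device (see
 *  _spdk_bs_free).  Destroys the bs_dev, frees any blobs still on the
 *  list, releases the allocation bit arrays, and fires the deferred
 *  unload completion if one was stashed in unload_cpl.
 */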
2044 static void
2045 _spdk_bs_dev_destroy(void *io_device)
2046 {
2047 	struct spdk_blob_store *bs = io_device;
2048 	struct spdk_blob	*blob, *blob_tmp;
2049 
2050 	bs->dev->destroy(bs->dev);
2051 
2052 	TAILQ_FOREACH_SAFE(blob, &bs->blobs, link, blob_tmp) {
2053 		TAILQ_REMOVE(&bs->blobs, blob, link);
2054 		_spdk_blob_free(blob);
2055 	}
2056 
2057 	pthread_mutex_destroy(&bs->used_clusters_mutex);
2058 
2059 	spdk_bit_array_free(&bs->used_blobids);
2060 	spdk_bit_array_free(&bs->used_md_pages);
2061 	spdk_bit_array_free(&bs->used_clusters);
2062 	/*
2063 	 * If this function is called for any reason except a successful unload,
2064 	 * the unload_cpl type will be NONE and this will be a nop.
2065 	 */
2066 	spdk_bs_call_cpl(&bs->unload_cpl, bs->unload_err);
2067 
2068 	free(bs);
2069 }
2070 
2071 static void
2072 _spdk_bs_free(struct spdk_blob_store *bs)
2073 {
2074 	spdk_bs_unregister_md_thread(bs);
2075 	spdk_io_device_unregister(bs, _spdk_bs_dev_destroy);
2076 }
2077 
2078 void
2079 spdk_bs_opts_init(struct spdk_bs_opts *opts)
2080 {
2081 	opts->cluster_sz = SPDK_BLOB_OPTS_CLUSTER_SZ;
2082 	opts->num_md_pages = SPDK_BLOB_OPTS_NUM_MD_PAGES;
2083 	opts->max_md_ops = SPDK_BLOB_OPTS_MAX_MD_OPS;
2084 	opts->max_channel_ops = SPDK_BLOB_OPTS_DEFAULT_CHANNEL_OPS;
2085 	memset(&opts->bstype, 0, sizeof(opts->bstype));
2086 	opts->iter_cb_fn = NULL;
2087 	opts->iter_cb_arg = NULL;
2088 }
2089 
2090 static int
2091 _spdk_bs_opts_verify(struct spdk_bs_opts *opts)
2092 {
2093 	if (opts->cluster_sz == 0 || opts->num_md_pages == 0 || opts->max_md_ops == 0 ||
2094 	    opts->max_channel_ops == 0) {
2095 		SPDK_ERRLOG("Blobstore options cannot be set to 0\n");
2096 		return -1;
2097 	}
2098 
2099 	return 0;
2100 }
2101 
2102 static struct spdk_blob_store *
2103 _spdk_bs_alloc(struct spdk_bs_dev *dev, struct spdk_bs_opts *opts)
2104 {
2105 	struct spdk_blob_store	*bs;
2106 	uint64_t dev_size;
2107 	int rc;
2108 
2109 	dev_size = dev->blocklen * dev->blockcnt;
2110 	if (dev_size < opts->cluster_sz) {
2111 		/* Device size cannot be smaller than cluster size of blobstore */
2112 		SPDK_ERRLOG("Device size %" PRIu64 " is smaller than cluster size %" PRIu32 "\n",
2113 			    dev_size, opts->cluster_sz);
2114 		return NULL;
2115 	}
2116 	if (opts->cluster_sz < SPDK_BS_PAGE_SIZE) {
2117 		/* Cluster size cannot be smaller than page size */
2118 		SPDK_ERRLOG("Cluster size %" PRIu32 " is smaller than page size %d\n",
2119 			    opts->cluster_sz, SPDK_BS_PAGE_SIZE);
2120 		return NULL;
2121 	}
2122 	bs = calloc(1, sizeof(struct spdk_blob_store));
2123 	if (!bs) {
2124 		return NULL;
2125 	}
2126 
2127 	TAILQ_INIT(&bs->blobs);
2128 	bs->dev = dev;
2129 	bs->md_thread = spdk_get_thread();
2130 	assert(bs->md_thread != NULL);
2131 
2132 	/*
2133 	 * Do not use _spdk_bs_lba_to_cluster() here since blockcnt may not be an
2134 	 *  even multiple of the cluster size.
2135 	 */
2136 	bs->cluster_sz = opts->cluster_sz;
2137 	bs->total_clusters = dev->blockcnt / (bs->cluster_sz / dev->blocklen);
2138 	bs->pages_per_cluster = bs->cluster_sz / SPDK_BS_PAGE_SIZE;
2139 	bs->num_free_clusters = bs->total_clusters;
2140 	bs->used_clusters = spdk_bit_array_create(bs->total_clusters);
2141 	if (bs->used_clusters == NULL) {
2142 		free(bs);
2143 		return NULL;
2144 	}
2145 
2146 	bs->max_channel_ops = opts->max_channel_ops;
2147 	bs->super_blob = SPDK_BLOBID_INVALID;
2148 	memcpy(&bs->bstype, &opts->bstype, sizeof(opts->bstype));
2149 
2150 	/* The metadata is assumed to be at least 1 page */
2151 	bs->used_md_pages = spdk_bit_array_create(1);
2152 	bs->used_blobids = spdk_bit_array_create(0);
2153 
2154 	pthread_mutex_init(&bs->used_clusters_mutex, NULL);
2155 
2156 	spdk_io_device_register(bs, _spdk_bs_channel_create, _spdk_bs_channel_destroy,
2157 				sizeof(struct spdk_bs_channel));
2158 	rc = spdk_bs_register_md_thread(bs);
2159 	if (rc == -1) {
2160 		spdk_io_device_unregister(bs, NULL);
2161 		pthread_mutex_destroy(&bs->used_clusters_mutex);
2162 		spdk_bit_array_free(&bs->used_blobids);
2163 		spdk_bit_array_free(&bs->used_md_pages);
2164 		spdk_bit_array_free(&bs->used_clusters);
2165 		free(bs);
2166 		return NULL;
2167 	}
2168 
2169 	return bs;
2170 }
2171 
2172 /* START spdk_bs_load, spdk_bs_load_ctx will be used for both load and unload. */
2173 
2174 struct spdk_bs_load_ctx {
2175 	struct spdk_blob_store		*bs;
2176 	struct spdk_bs_super_block	*super;
2177 
2178 	struct spdk_bs_md_mask		*mask;
2179 	bool				in_page_chain;
2180 	uint32_t			page_index;
2181 	uint32_t			cur_page;
2182 	struct spdk_blob_md_page	*page;
2183 	bool				is_load;
2184 
2185 	spdk_bs_sequence_t			*seq;
2186 	spdk_blob_op_with_handle_complete	iter_cb_fn;
2187 	void					*iter_cb_arg;
2188 };
2189 
2190 static void
2191 _spdk_bs_load_ctx_fail(spdk_bs_sequence_t *seq, struct spdk_bs_load_ctx *ctx, int bserrno)
2192 {
2193 	assert(bserrno != 0);
2194 
2195 	spdk_dma_free(ctx->super);
2196 	spdk_bs_sequence_finish(seq, bserrno);
2197 	/*
2198 	 * Only free the blobstore when a load fails.  If an unload fails (for some reason)
2199 	 *  we want to keep the blobstore in case the caller wants to try again.
2200 	 */
2201 	if (ctx->is_load) {
2202 		_spdk_bs_free(ctx->bs);
2203 	}
2204 	free(ctx);
2205 }
2206 
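/*
 * Serialize an in-memory bit array into an on-disk mask.  Bit i of the
 *  array lands in byte i / 8, LSB first, matching the unpack loops in the
 *  _spdk_bs_load_used_*_cpl callbacks below.
 */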
2207 static void
2208 _spdk_bs_set_mask(struct spdk_bit_array *array, struct spdk_bs_md_mask *mask)
2209 {
2210 	uint32_t i = 0;
2211 
2212 	while (true) {
2213 		i = spdk_bit_array_find_first_set(array, i);
2214 		if (i >= mask->length) {
2215 			break;
2216 		}
2217 		mask->mask[i / 8] |= 1U << (i % 8);
2218 		i++;
2219 	}
2220 }
2221 
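/*
 * Refresh the mutable super block fields (super blob id and bstype),
 *  recompute the page CRC, and rewrite page 0 of the device.
 */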
2222 static void
2223 _spdk_bs_write_super(spdk_bs_sequence_t *seq, struct spdk_blob_store *bs,
2224 		     struct spdk_bs_super_block *super, spdk_bs_sequence_cpl cb_fn, void *cb_arg)
2225 {
2226 	/* Update the values in the super block */
2227 	super->super_blob = bs->super_blob;
2228 	memcpy(&super->bstype, &bs->bstype, sizeof(bs->bstype));
2229 	super->crc = _spdk_blob_md_page_calc_crc(super);
2230 	spdk_bs_sequence_write_dev(seq, super, _spdk_bs_page_to_lba(bs, 0),
2231 				   _spdk_bs_byte_to_lba(bs, sizeof(*super)),
2232 				   cb_fn, cb_arg);
2233 }
2234 
2235 static void
2236 _spdk_bs_write_used_clusters(spdk_bs_sequence_t *seq, void *arg, spdk_bs_sequence_cpl cb_fn)
2237 {
2238 	struct spdk_bs_load_ctx	*ctx = arg;
2239 	uint64_t	mask_size, lba, lba_count;
2240 
2241 	/* Write out the used clusters mask */
2242 	mask_size = ctx->super->used_cluster_mask_len * SPDK_BS_PAGE_SIZE;
2243 	ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL);
2244 	if (!ctx->mask) {
2245 		_spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
2246 		return;
2247 	}
2248 
2249 	ctx->mask->type = SPDK_MD_MASK_TYPE_USED_CLUSTERS;
2250 	ctx->mask->length = ctx->bs->total_clusters;
2251 	assert(ctx->mask->length == spdk_bit_array_capacity(ctx->bs->used_clusters));
2252 
2253 	_spdk_bs_set_mask(ctx->bs->used_clusters, ctx->mask);
2254 	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_start);
2255 	lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_len);
2256 	spdk_bs_sequence_write_dev(seq, ctx->mask, lba, lba_count, cb_fn, arg);
2257 }
2258 
2259 static void
2260 _spdk_bs_write_used_md(spdk_bs_sequence_t *seq, void *arg, spdk_bs_sequence_cpl cb_fn)
2261 {
2262 	struct spdk_bs_load_ctx	*ctx = arg;
2263 	uint64_t	mask_size, lba, lba_count;
2264 
2265 	mask_size = ctx->super->used_page_mask_len * SPDK_BS_PAGE_SIZE;
2266 	ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL);
2267 	if (!ctx->mask) {
2268 		_spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
2269 		return;
2270 	}
2271 
2272 	ctx->mask->type = SPDK_MD_MASK_TYPE_USED_PAGES;
2273 	ctx->mask->length = ctx->super->md_len;
2274 	assert(ctx->mask->length == spdk_bit_array_capacity(ctx->bs->used_md_pages));
2275 
2276 	_spdk_bs_set_mask(ctx->bs->used_md_pages, ctx->mask);
2277 	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_start);
2278 	lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_len);
2279 	spdk_bs_sequence_write_dev(seq, ctx->mask, lba, lba_count, cb_fn, arg);
2280 }
2281 
2282 static void
2283 _spdk_bs_write_used_blobids(spdk_bs_sequence_t *seq, void *arg, spdk_bs_sequence_cpl cb_fn)
2284 {
2285 	struct spdk_bs_load_ctx	*ctx = arg;
2286 	uint64_t	mask_size, lba, lba_count;
2287 
2288 	if (ctx->super->used_blobid_mask_len == 0) {
2289 		/*
2290 		 * This is a pre-v3 on-disk format where the blobid mask does not get
2291 		 *  written to disk.
2292 		 */
2293 		cb_fn(seq, arg, 0);
2294 		return;
2295 	}
2296 
2297 	mask_size = ctx->super->used_blobid_mask_len * SPDK_BS_PAGE_SIZE;
2298 	ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL);
2299 	if (!ctx->mask) {
2300 		_spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
2301 		return;
2302 	}
2303 
2304 	ctx->mask->type = SPDK_MD_MASK_TYPE_USED_BLOBIDS;
2305 	ctx->mask->length = ctx->super->md_len;
2306 	assert(ctx->mask->length == spdk_bit_array_capacity(ctx->bs->used_blobids));
2307 
2308 	_spdk_bs_set_mask(ctx->bs->used_blobids, ctx->mask);
2309 	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_blobid_mask_start);
2310 	lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_blobid_mask_len);
2311 	spdk_bs_sequence_write_dev(seq, ctx->mask, lba, lba_count, cb_fn, arg);
2312 }
2313 
2314 static void _spdk_bs_load_complete(spdk_bs_sequence_t *seq, struct spdk_bs_load_ctx *ctx,
2315 				   int bserrno);
2316 
2317 static void
2318 _spdk_bs_load_iter(void *arg, struct spdk_blob *blob, int bserrno)
2319 {
2320 	struct spdk_bs_load_ctx *ctx = arg;
2321 
2322 	if (bserrno == 0) {
2323 		ctx->iter_cb_fn(ctx->iter_cb_arg, blob, 0);
2324 		spdk_bs_iter_next(ctx->bs, blob, _spdk_bs_load_iter, ctx);
2325 		return;
2326 	}
2327 
2328 	if (bserrno == -ENOENT) {
2329 		bserrno = 0;
2330 	} else {
2331 		/*
2332 		 * This case needs to be looked at further.  The same problem
2333 		 *  exists with applications that rely on explicit blob
2334 		 *  iteration.  We should just skip the blob that failed
2335 		 *  to load and continue on to the next one.
2336 		 */
2337 		SPDK_ERRLOG("Error in iterating blobs\n");
2338 	}
2339 
2340 	ctx->iter_cb_fn = NULL;
2341 	_spdk_bs_load_complete(ctx->seq, ctx, bserrno);
2342 }
2343 
2344 static void
2345 _spdk_bs_load_complete(spdk_bs_sequence_t *seq, struct spdk_bs_load_ctx *ctx, int bserrno)
2346 {
2347 	if (ctx->iter_cb_fn) {
2348 		ctx->seq = seq;
2349 		spdk_bs_iter_first(ctx->bs, _spdk_bs_load_iter, ctx);
2350 		return;
2351 	}
2352 
2353 	spdk_dma_free(ctx->super);
2354 	spdk_dma_free(ctx->mask);
2355 	free(ctx);
2356 	spdk_bs_sequence_finish(seq, bserrno);
2357 }
2358 
2359 static void
2360 _spdk_bs_load_used_blobids_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2361 {
2362 	struct spdk_bs_load_ctx *ctx = cb_arg;
2363 	uint32_t i, j;
2364 	int rc;
2365 
2366 	/* The type must be correct */
2367 	assert(ctx->mask->type == SPDK_MD_MASK_TYPE_USED_BLOBIDS);
2368 
2369 	/* The length of the mask (in bits) must not be greater than
2370 	 * the length of the buffer (converted to bits) */
2371 	assert(ctx->mask->length <= (ctx->super->used_blobid_mask_len * SPDK_BS_PAGE_SIZE * 8));
2372 
2373 	/* The length of the mask must be exactly equal to the size
2374 	 * (in pages) of the metadata region */
2375 	assert(ctx->mask->length == ctx->super->md_len);
2376 
2377 	rc = spdk_bit_array_resize(&ctx->bs->used_blobids, ctx->mask->length);
2378 	if (rc < 0) {
2379 		spdk_dma_free(ctx->mask);
2380 		_spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
2381 		return;
2382 	}
2383 
2384 	for (i = 0; i < ctx->mask->length / 8; i++) {
2385 		uint8_t segment = ctx->mask->mask[i];
2386 		for (j = 0; segment && (j < 8); j++) {
2387 			if (segment & 1U) {
2388 				spdk_bit_array_set(ctx->bs->used_blobids, (i * 8) + j);
2389 			}
2390 			segment >>= 1U;
2391 		}
2392 	}
2393 
2394 	_spdk_bs_load_complete(seq, ctx, bserrno);
2395 }
2396 
2397 static void
2398 _spdk_bs_load_used_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2399 {
2400 	struct spdk_bs_load_ctx *ctx = cb_arg;
2401 	uint64_t		lba, lba_count, mask_size;
2402 	uint32_t		i, j;
2403 	int			rc;
2404 
2405 	/* The type must be correct */
2406 	assert(ctx->mask->type == SPDK_MD_MASK_TYPE_USED_CLUSTERS);
2407 	/* The length of the mask (in bits) must not be greater than the length of the buffer (converted to bits) */
2408 	assert(ctx->mask->length <= (ctx->super->used_cluster_mask_len * sizeof(
2409 					     struct spdk_blob_md_page) * 8));
2410 	/* The length of the mask must be exactly equal to the total number of clusters */
2411 	assert(ctx->mask->length == ctx->bs->total_clusters);
2412 
2413 	rc = spdk_bit_array_resize(&ctx->bs->used_clusters, ctx->bs->total_clusters);
2414 	if (rc < 0) {
2415 		spdk_dma_free(ctx->mask);
2416 		_spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
2417 		return;
2418 	}
2419 
2420 	ctx->bs->num_free_clusters = ctx->bs->total_clusters;
2421 	for (i = 0; i < ctx->mask->length / 8; i++) {
2422 		uint8_t segment = ctx->mask->mask[i];
2423 		for (j = 0; segment && (j < 8); j++) {
2424 			if (segment & 1U) {
2425 				spdk_bit_array_set(ctx->bs->used_clusters, (i * 8) + j);
2426 				assert(ctx->bs->num_free_clusters > 0);
2427 				ctx->bs->num_free_clusters--;
2428 			}
2429 			segment >>= 1U;
2430 		}
2431 	}
2432 
2433 	spdk_dma_free(ctx->mask);
2434 
2435 	/* Read the used blobids mask */
2436 	mask_size = ctx->super->used_blobid_mask_len * SPDK_BS_PAGE_SIZE;
2437 	ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL);
2438 	if (!ctx->mask) {
2439 		_spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
2440 		return;
2441 	}
2442 	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_blobid_mask_start);
2443 	lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_blobid_mask_len);
2444 	spdk_bs_sequence_read_dev(seq, ctx->mask, lba, lba_count,
2445 				  _spdk_bs_load_used_blobids_cpl, ctx);
2446 }
2447 
2448 static void
2449 _spdk_bs_load_used_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2450 {
2451 	struct spdk_bs_load_ctx *ctx = cb_arg;
2452 	uint64_t		lba, lba_count, mask_size;
2453 	uint32_t		i, j;
2454 	int			rc;
2455 
2456 	/* The type must be correct */
2457 	assert(ctx->mask->type == SPDK_MD_MASK_TYPE_USED_PAGES);
2458 	/* The length of the mask (in bits) must not be greater than the length of the buffer (converted to bits) */
2459 	assert(ctx->mask->length <= (ctx->super->used_page_mask_len * SPDK_BS_PAGE_SIZE *
2460 				     8));
2461 	/* The length of the mask must be exactly equal to the size (in pages) of the metadata region */
2462 	assert(ctx->mask->length == ctx->super->md_len);
2463 
2464 	rc = spdk_bit_array_resize(&ctx->bs->used_md_pages, ctx->mask->length);
2465 	if (rc < 0) {
2466 		spdk_dma_free(ctx->mask);
2467 		_spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
2468 		return;
2469 	}
2470 
2471 	for (i = 0; i < ctx->mask->length / 8; i++) {
2472 		uint8_t segment = ctx->mask->mask[i];
2473 		for (j = 0; segment && (j < 8); j++) {
2474 			if (segment & 1U) {
2475 				spdk_bit_array_set(ctx->bs->used_md_pages, (i * 8) + j);
2476 			}
2477 			segment >>= 1U;
2478 		}
2479 	}
2480 	spdk_dma_free(ctx->mask);
2481 
2482 	/* Read the used clusters mask */
2483 	mask_size = ctx->super->used_cluster_mask_len * SPDK_BS_PAGE_SIZE;
2484 	ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL);
2485 	if (!ctx->mask) {
2486 		_spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
2487 		return;
2488 	}
2489 	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_start);
2490 	lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_len);
2491 	spdk_bs_sequence_read_dev(seq, ctx->mask, lba, lba_count,
2492 				  _spdk_bs_load_used_clusters_cpl, ctx);
2493 }
2494 
2495 static void
2496 _spdk_bs_load_write_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2497 {
2498 	struct spdk_bs_load_ctx	*ctx = cb_arg;
2499 	uint64_t lba, lba_count, mask_size;
2500 
2501 	/* Read the used pages mask */
2502 	mask_size = ctx->super->used_page_mask_len * SPDK_BS_PAGE_SIZE;
2503 	ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL);
2504 	if (!ctx->mask) {
2505 		_spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
2506 		return;
2507 	}
2508 
2509 	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_start);
2510 	lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_len);
2511 	spdk_bs_sequence_read_dev(seq, ctx->mask, lba, lba_count,
2512 				  _spdk_bs_load_used_pages_cpl, ctx);
2513 }
2514 
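/*
 * Walk the descriptors in a single metadata page during recovery.  Extent
 *  descriptors have their clusters re-claimed in bs->used_clusters; xattr
 *  and flags descriptors carry no allocation state and are skipped.
 *  Returns -1 on a malformed page.
 */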
2515 static int
2516 _spdk_bs_load_replay_md_parse_page(const struct spdk_blob_md_page *page, struct spdk_blob_store *bs)
2517 {
2518 	struct spdk_blob_md_descriptor *desc;
2519 	size_t	cur_desc = 0;
2520 
2521 	desc = (struct spdk_blob_md_descriptor *)page->descriptors;
2522 	while (cur_desc < sizeof(page->descriptors)) {
2523 		if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_PADDING) {
2524 			if (desc->length == 0) {
2525 				/* If padding and length are 0, this terminates the page */
2526 				break;
2527 			}
2528 		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_EXTENT) {
2529 			struct spdk_blob_md_descriptor_extent	*desc_extent;
2530 			unsigned int				i, j;
2531 			unsigned int				cluster_count = 0;
2532 
2533 			desc_extent = (struct spdk_blob_md_descriptor_extent *)desc;
2534 
2535 			for (i = 0; i < desc_extent->length / sizeof(desc_extent->extents[0]); i++) {
2536 				for (j = 0; j < desc_extent->extents[i].length; j++) {
2537 					spdk_bit_array_set(bs->used_clusters, desc_extent->extents[i].cluster_idx + j);
2538 					if (bs->num_free_clusters == 0) {
2539 						return -1;
2540 					}
2541 					bs->num_free_clusters--;
2542 					cluster_count++;
2543 				}
2544 			}
2545 			if (cluster_count == 0) {
2546 				return -1;
2547 			}
2548 		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR) {
2549 			/* Skip this item */
2550 		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR_INTERNAL) {
2551 			/* Skip this item */
2552 		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_FLAGS) {
2553 			/* Skip this item */
2554 		} else {
2555 			/* Error */
2556 			return -1;
2557 		}
2558 		/* Advance to the next descriptor */
2559 		cur_desc += sizeof(*desc) + desc->length;
2560 		if (cur_desc + sizeof(*desc) > sizeof(page->descriptors)) {
2561 			break;
2562 		}
2563 		desc = (struct spdk_blob_md_descriptor *)((uintptr_t)page->descriptors + cur_desc);
2564 	}
2565 	return 0;
2566 }
2567 
2568 static bool _spdk_bs_load_cur_md_page_valid(struct spdk_bs_load_ctx *ctx)
2569 {
2570 	uint32_t crc;
2571 
2572 	crc = _spdk_blob_md_page_calc_crc(ctx->page);
2573 	if (crc != ctx->page->crc) {
2574 		return false;
2575 	}
2576 
2577 	if (_spdk_bs_page_to_blobid(ctx->cur_page) != ctx->page->id) {
2578 		return false;
2579 	}
2580 	return true;
2581 }
2582 
2583 static void
2584 _spdk_bs_load_replay_cur_md_page(spdk_bs_sequence_t *seq, void *cb_arg);
2585 
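/*
 * After a metadata replay the rebuilt masks are persisted in order: used
 *  pages, then used blobids, then used clusters.  Each completion below
 *  kicks off the next write, and the last one finishes the load.
 */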
2586 static void
2587 _spdk_bs_load_write_used_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2588 {
2589 	struct spdk_bs_load_ctx	*ctx = cb_arg;
2590 
2591 	_spdk_bs_load_complete(seq, ctx, bserrno);
2592 }
2593 
2594 static void
2595 _spdk_bs_load_write_used_blobids_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2596 {
2597 	struct spdk_bs_load_ctx	*ctx = cb_arg;
2598 
2599 	spdk_dma_free(ctx->mask);
2600 	ctx->mask = NULL;
2601 
2602 	_spdk_bs_write_used_clusters(seq, cb_arg, _spdk_bs_load_write_used_clusters_cpl);
2603 }
2604 
2605 static void
2606 _spdk_bs_load_write_used_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2607 {
2608 	struct spdk_bs_load_ctx	*ctx = cb_arg;
2609 
2610 	spdk_dma_free(ctx->mask);
2611 	ctx->mask = NULL;
2612 
2613 	_spdk_bs_write_used_blobids(seq, cb_arg, _spdk_bs_load_write_used_blobids_cpl);
2614 }
2615 
2616 static void
2617 _spdk_bs_load_write_used_md(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2618 {
2619 	_spdk_bs_write_used_md(seq, cb_arg, _spdk_bs_load_write_used_pages_cpl);
2620 }
2621 
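/*
 * Completion for each metadata page read during replay.  A valid page with
 *  sequence_num == 0 starts a blob, so its page index is also recorded in
 *  used_blobids; chained pages (page->next) are followed before the scan
 *  resumes at the next page index not already marked in used_md_pages.
 *  Once the scan passes md_len, the clusters holding the metadata region
 *  are claimed and the rebuilt masks are written out.
 */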
2622 static void
2623 _spdk_bs_load_replay_md_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2624 {
2625 	struct spdk_bs_load_ctx *ctx = cb_arg;
2626 	uint64_t num_md_clusters;
2627 	uint64_t i;
2628 	uint32_t page_num;
2629 
2630 	if (bserrno != 0) {
2631 		_spdk_bs_load_ctx_fail(seq, ctx, bserrno);
2632 		return;
2633 	}
2634 
2635 	page_num = ctx->cur_page;
2636 	if (_spdk_bs_load_cur_md_page_valid(ctx) == true) {
2637 		if (ctx->page->sequence_num == 0 || ctx->in_page_chain == true) {
2638 			spdk_bit_array_set(ctx->bs->used_md_pages, page_num);
2639 			if (ctx->page->sequence_num == 0) {
2640 				spdk_bit_array_set(ctx->bs->used_blobids, page_num);
2641 			}
2642 			if (_spdk_bs_load_replay_md_parse_page(ctx->page, ctx->bs)) {
2643 				_spdk_bs_load_ctx_fail(seq, ctx, -EILSEQ);
2644 				return;
2645 			}
2646 			if (ctx->page->next != SPDK_INVALID_MD_PAGE) {
2647 				ctx->in_page_chain = true;
2648 				ctx->cur_page = ctx->page->next;
2649 				_spdk_bs_load_replay_cur_md_page(seq, cb_arg);
2650 				return;
2651 			}
2652 		}
2653 	}
2654 
2655 	ctx->in_page_chain = false;
2656 
2657 	do {
2658 		ctx->page_index++;
2659 	} while (spdk_bit_array_get(ctx->bs->used_md_pages, ctx->page_index) == true);
2660 
2661 	if (ctx->page_index < ctx->super->md_len) {
2662 		ctx->cur_page = ctx->page_index;
2663 		_spdk_bs_load_replay_cur_md_page(seq, cb_arg);
2664 	} else {
2665 		/* Claim all of the clusters used by the metadata */
2666 		num_md_clusters = divide_round_up(ctx->super->md_len, ctx->bs->pages_per_cluster);
2667 		for (i = 0; i < num_md_clusters; i++) {
2668 			_spdk_bs_claim_cluster(ctx->bs, i);
2669 		}
2670 		spdk_dma_free(ctx->page);
2671 		_spdk_bs_load_write_used_md(seq, ctx, bserrno);
2672 	}
2673 }
2674 
2675 static void
2676 _spdk_bs_load_replay_cur_md_page(spdk_bs_sequence_t *seq, void *cb_arg)
2677 {
2678 	struct spdk_bs_load_ctx *ctx = cb_arg;
2679 	uint64_t lba;
2680 
2681 	assert(ctx->cur_page < ctx->super->md_len);
2682 	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->md_start + ctx->cur_page);
2683 	spdk_bs_sequence_read_dev(seq, ctx->page, lba,
2684 				  _spdk_bs_byte_to_lba(ctx->bs, SPDK_BS_PAGE_SIZE),
2685 				  _spdk_bs_load_replay_md_cpl, ctx);
2686 }
2687 
2688 static void
2689 _spdk_bs_load_replay_md(spdk_bs_sequence_t *seq, void *cb_arg)
2690 {
2691 	struct spdk_bs_load_ctx *ctx = cb_arg;
2692 
2693 	ctx->page_index = 0;
2694 	ctx->cur_page = 0;
2695 	ctx->page = spdk_dma_zmalloc(SPDK_BS_PAGE_SIZE,
2696 				     SPDK_BS_PAGE_SIZE,
2697 				     NULL);
2698 	if (!ctx->page) {
2699 		_spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
2700 		return;
2701 	}
2702 	_spdk_bs_load_replay_cur_md_page(seq, cb_arg);
2703 }
2704 
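/*
 * Recovery entry point, taken when the super block was not written clean.
 *  Resize the in-memory masks to the geometry recorded in the super block,
 *  then rebuild them by replaying every metadata page.
 */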
2705 static void
2706 _spdk_bs_recover(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2707 {
2708 	struct spdk_bs_load_ctx *ctx = cb_arg;
2709 	int		rc;
2710 
2711 	if (bserrno != 0) {
2712 		_spdk_bs_load_ctx_fail(seq, ctx, -EIO);
2713 		return;
2714 	}
2715 
2716 	rc = spdk_bit_array_resize(&ctx->bs->used_md_pages, ctx->super->md_len);
2717 	if (rc < 0) {
2718 		_spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
2719 		return;
2720 	}
2721 
2722 	rc = spdk_bit_array_resize(&ctx->bs->used_blobids, ctx->super->md_len);
2723 	if (rc < 0) {
2724 		_spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
2725 		return;
2726 	}
2727 
2728 	rc = spdk_bit_array_resize(&ctx->bs->used_clusters, ctx->bs->total_clusters);
2729 	if (rc < 0) {
2730 		_spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
2731 		return;
2732 	}
2733 
2734 	ctx->bs->num_free_clusters = ctx->bs->total_clusters;
2735 	_spdk_bs_load_replay_md(seq, cb_arg);
2736 }
2737 
2738 static void
2739 _spdk_bs_load_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2740 {
2741 	struct spdk_bs_load_ctx *ctx = cb_arg;
2742 	uint32_t	crc;
2743 	static const char zeros[SPDK_BLOBSTORE_TYPE_LENGTH];
2744 
2745 	if (ctx->super->version > SPDK_BS_VERSION ||
2746 	    ctx->super->version < SPDK_BS_INITIAL_VERSION) {
2747 		_spdk_bs_load_ctx_fail(seq, ctx, -EILSEQ);
2748 		return;
2749 	}
2750 
2751 	if (memcmp(ctx->super->signature, SPDK_BS_SUPER_BLOCK_SIG,
2752 		   sizeof(ctx->super->signature)) != 0) {
2753 		_spdk_bs_load_ctx_fail(seq, ctx, -EILSEQ);
2754 		return;
2755 	}
2756 
2757 	crc = _spdk_blob_md_page_calc_crc(ctx->super);
2758 	if (crc != ctx->super->crc) {
2759 		_spdk_bs_load_ctx_fail(seq, ctx, -EILSEQ);
2760 		return;
2761 	}
2762 
2763 	if (memcmp(&ctx->bs->bstype, &ctx->super->bstype, SPDK_BLOBSTORE_TYPE_LENGTH) == 0) {
2764 		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Bstype matched - loading blobstore\n");
2765 	} else if (memcmp(&ctx->bs->bstype, zeros, SPDK_BLOBSTORE_TYPE_LENGTH) == 0) {
2766 		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Bstype wildcard used - loading blobstore regardless of bstype\n");
2767 	} else {
2768 		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Unexpected bstype\n");
2769 		SPDK_TRACEDUMP(SPDK_LOG_BLOB, "Expected:", ctx->bs->bstype.bstype, SPDK_BLOBSTORE_TYPE_LENGTH);
2770 		SPDK_TRACEDUMP(SPDK_LOG_BLOB, "Found:", ctx->super->bstype.bstype, SPDK_BLOBSTORE_TYPE_LENGTH);
2771 		_spdk_bs_load_ctx_fail(seq, ctx, -ENXIO);
2772 		return;
2773 	}
2774 
2775 	/* Parse the super block */
2776 	ctx->bs->cluster_sz = ctx->super->cluster_size;
2777 	ctx->bs->total_clusters = ctx->bs->dev->blockcnt / (ctx->bs->cluster_sz / ctx->bs->dev->blocklen);
2778 	ctx->bs->pages_per_cluster = ctx->bs->cluster_sz / SPDK_BS_PAGE_SIZE;
2779 	ctx->bs->md_start = ctx->super->md_start;
2780 	ctx->bs->md_len = ctx->super->md_len;
2781 	ctx->bs->total_data_clusters = ctx->bs->total_clusters - divide_round_up(
2782 					       ctx->bs->md_start + ctx->bs->md_len, ctx->bs->pages_per_cluster);
2783 	ctx->bs->super_blob = ctx->super->super_blob;
2784 	memcpy(&ctx->bs->bstype, &ctx->super->bstype, sizeof(ctx->super->bstype));
2785 
2786 	if (ctx->super->clean == 0) {
2787 		_spdk_bs_recover(seq, ctx, 0);
2788 	} else if (ctx->super->used_blobid_mask_len == 0) {
2789 		/*
2790 		 * Metadata is clean, but this is an old metadata format without
2791 		 *  a blobid mask.  Clear the clean bit and then build the masks
2792 		 *  using _spdk_bs_recover.
2793 		 */
2794 		ctx->super->clean = 0;
2795 		_spdk_bs_write_super(seq, ctx->bs, ctx->super, _spdk_bs_recover, ctx);
2796 	} else {
2797 		ctx->super->clean = 0;
2798 		_spdk_bs_write_super(seq, ctx->bs, ctx->super, _spdk_bs_load_write_super_cpl, ctx);
2799 	}
2800 }
2801 
2802 void
2803 spdk_bs_load(struct spdk_bs_dev *dev, struct spdk_bs_opts *o,
2804 	     spdk_bs_op_with_handle_complete cb_fn, void *cb_arg)
2805 {
2806 	struct spdk_blob_store	*bs;
2807 	struct spdk_bs_cpl	cpl;
2808 	spdk_bs_sequence_t	*seq;
2809 	struct spdk_bs_load_ctx *ctx;
2810 	struct spdk_bs_opts	opts = {};
2811 
2812 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Loading blobstore from dev %p\n", dev);
2813 
2814 	if ((SPDK_BS_PAGE_SIZE % dev->blocklen) != 0) {
2815 		SPDK_ERRLOG("unsupported dev block length of %d\n", dev->blocklen);
2816 		dev->destroy(dev);
2817 		cb_fn(cb_arg, NULL, -EINVAL);
2818 		return;
2819 	}
2820 
2821 	if (o) {
2822 		opts = *o;
2823 	} else {
2824 		spdk_bs_opts_init(&opts);
2825 	}
2826 
2827 	if (opts.max_md_ops == 0 || opts.max_channel_ops == 0) {
2828 		dev->destroy(dev);
2829 		cb_fn(cb_arg, NULL, -EINVAL);
2830 		return;
2831 	}
2832 
2833 	bs = _spdk_bs_alloc(dev, &opts);
2834 	if (!bs) {
2835 		dev->destroy(dev);
2836 		cb_fn(cb_arg, NULL, -ENOMEM);
2837 		return;
2838 	}
2839 
2840 	ctx = calloc(1, sizeof(*ctx));
2841 	if (!ctx) {
2842 		_spdk_bs_free(bs);
2843 		cb_fn(cb_arg, NULL, -ENOMEM);
2844 		return;
2845 	}
2846 
2847 	ctx->bs = bs;
2848 	ctx->is_load = true;
2849 	ctx->iter_cb_fn = opts.iter_cb_fn;
2850 	ctx->iter_cb_arg = opts.iter_cb_arg;
2851 
2852 	/* Allocate memory for the super block */
2853 	ctx->super = spdk_dma_zmalloc(sizeof(*ctx->super), 0x1000, NULL);
2854 	if (!ctx->super) {
2855 		free(ctx);
2856 		_spdk_bs_free(bs);
		cb_fn(cb_arg, NULL, -ENOMEM);
2857 		return;
2858 	}
2859 
2860 	cpl.type = SPDK_BS_CPL_TYPE_BS_HANDLE;
2861 	cpl.u.bs_handle.cb_fn = cb_fn;
2862 	cpl.u.bs_handle.cb_arg = cb_arg;
2863 	cpl.u.bs_handle.bs = bs;
2864 
2865 	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
2866 	if (!seq) {
2867 		spdk_dma_free(ctx->super);
2868 		free(ctx);
2869 		_spdk_bs_free(bs);
2870 		cb_fn(cb_arg, NULL, -ENOMEM);
2871 		return;
2872 	}
2873 
2874 	/* Read the super block */
2875 	spdk_bs_sequence_read_dev(seq, ctx->super, _spdk_bs_page_to_lba(bs, 0),
2876 				  _spdk_bs_byte_to_lba(bs, sizeof(*ctx->super)),
2877 				  _spdk_bs_load_super_cpl, ctx);
2878 }
2879 
2880 /* END spdk_bs_load */
2881 
2882 /* START spdk_bs_init */
2883 
2884 struct spdk_bs_init_ctx {
2885 	struct spdk_blob_store		*bs;
2886 	struct spdk_bs_super_block	*super;
2887 };
2888 
2889 static void
2890 _spdk_bs_init_persist_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2891 {
2892 	struct spdk_bs_init_ctx *ctx = cb_arg;
2893 
2894 	spdk_dma_free(ctx->super);
2895 	free(ctx);
2896 
2897 	spdk_bs_sequence_finish(seq, bserrno);
2898 }
2899 
2900 static void
2901 _spdk_bs_init_trim_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2902 {
2903 	struct spdk_bs_init_ctx *ctx = cb_arg;
2904 
2905 	/* Write super block */
2906 	spdk_bs_sequence_write_dev(seq, ctx->super, _spdk_bs_page_to_lba(ctx->bs, 0),
2907 				   _spdk_bs_byte_to_lba(ctx->bs, sizeof(*ctx->super)),
2908 				   _spdk_bs_init_persist_super_cpl, ctx);
2909 }
2910 
2911 void
2912 spdk_bs_init(struct spdk_bs_dev *dev, struct spdk_bs_opts *o,
2913 	     spdk_bs_op_with_handle_complete cb_fn, void *cb_arg)
2914 {
2915 	struct spdk_bs_init_ctx *ctx;
2916 	struct spdk_blob_store	*bs;
2917 	struct spdk_bs_cpl	cpl;
2918 	spdk_bs_sequence_t	*seq;
2919 	spdk_bs_batch_t		*batch;
2920 	uint64_t		num_md_lba;
2921 	uint64_t		num_md_pages;
2922 	uint64_t		num_md_clusters;
2923 	uint32_t		i;
2924 	struct spdk_bs_opts	opts = {};
2925 	int			rc;
2926 
2927 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Initializing blobstore on dev %p\n", dev);
2928 
2929 	if ((SPDK_BS_PAGE_SIZE % dev->blocklen) != 0) {
2930 		SPDK_ERRLOG("unsupported dev block length of %d\n",
2931 			    dev->blocklen);
2932 		dev->destroy(dev);
2933 		cb_fn(cb_arg, NULL, -EINVAL);
2934 		return;
2935 	}
2936 
2937 	if (o) {
2938 		opts = *o;
2939 	} else {
2940 		spdk_bs_opts_init(&opts);
2941 	}
2942 
2943 	if (_spdk_bs_opts_verify(&opts) != 0) {
2944 		dev->destroy(dev);
2945 		cb_fn(cb_arg, NULL, -EINVAL);
2946 		return;
2947 	}
2948 
2949 	bs = _spdk_bs_alloc(dev, &opts);
2950 	if (!bs) {
2951 		dev->destroy(dev);
2952 		cb_fn(cb_arg, NULL, -ENOMEM);
2953 		return;
2954 	}
2955 
2956 	if (opts.num_md_pages == SPDK_BLOB_OPTS_NUM_MD_PAGES) {
2957 		/* By default, allocate 1 page per cluster.
2958 		 * Technically, this over-allocates metadata
2959 		 * because more metadata will reduce the number
2960 		 * of usable clusters. This can be addressed with
2961 		 * more complex math in the future.
2962 		 */
2963 		bs->md_len = bs->total_clusters;
2964 	} else {
2965 		bs->md_len = opts.num_md_pages;
2966 	}
2967 
2968 	rc = spdk_bit_array_resize(&bs->used_md_pages, bs->md_len);
2969 	if (rc < 0) {
2970 		_spdk_bs_free(bs);
2971 		cb_fn(cb_arg, NULL, -ENOMEM);
2972 		return;
2973 	}
2974 
2975 	rc = spdk_bit_array_resize(&bs->used_blobids, bs->md_len);
2976 	if (rc < 0) {
2977 		_spdk_bs_free(bs);
2978 		cb_fn(cb_arg, NULL, -ENOMEM);
2979 		return;
2980 	}
2981 
2982 	ctx = calloc(1, sizeof(*ctx));
2983 	if (!ctx) {
2984 		_spdk_bs_free(bs);
2985 		cb_fn(cb_arg, NULL, -ENOMEM);
2986 		return;
2987 	}
2988 
2989 	ctx->bs = bs;
2990 
2991 	/* Allocate memory for the super block */
2992 	ctx->super = spdk_dma_zmalloc(sizeof(*ctx->super), 0x1000, NULL);
2993 	if (!ctx->super) {
2994 		free(ctx);
2995 		_spdk_bs_free(bs);
		cb_fn(cb_arg, NULL, -ENOMEM);
2996 		return;
2997 	}
2998 	memcpy(ctx->super->signature, SPDK_BS_SUPER_BLOCK_SIG,
2999 	       sizeof(ctx->super->signature));
3000 	ctx->super->version = SPDK_BS_VERSION;
3001 	ctx->super->length = sizeof(*ctx->super);
3002 	ctx->super->super_blob = bs->super_blob;
3003 	ctx->super->clean = 0;
3004 	ctx->super->cluster_size = bs->cluster_sz;
3005 	memcpy(&ctx->super->bstype, &bs->bstype, sizeof(bs->bstype));
3006 
3007 	/* Calculate how many pages the metadata consumes at the front
3008 	 * of the disk.
3009 	 */
3010 
3011 	/* The super block uses 1 page */
3012 	num_md_pages = 1;
3013 
3014 	/* The used_md_pages mask requires 1 bit per metadata page, rounded
3015 	 * up to the nearest page, plus a header.
3016 	 */
3017 	ctx->super->used_page_mask_start = num_md_pages;
3018 	ctx->super->used_page_mask_len = divide_round_up(sizeof(struct spdk_bs_md_mask) +
3019 					 divide_round_up(bs->md_len, 8),
3020 					 SPDK_BS_PAGE_SIZE);
3021 	num_md_pages += ctx->super->used_page_mask_len;
3022 
3023 	/* The used_clusters mask requires 1 bit per cluster, rounded
3024 	 * up to the nearest page, plus a header.
3025 	 */
3026 	ctx->super->used_cluster_mask_start = num_md_pages;
3027 	ctx->super->used_cluster_mask_len = divide_round_up(sizeof(struct spdk_bs_md_mask) +
3028 					    divide_round_up(bs->total_clusters, 8),
3029 					    SPDK_BS_PAGE_SIZE);
3030 	num_md_pages += ctx->super->used_cluster_mask_len;
3031 
3032 	/* The used_blobids mask requires 1 bit per metadata page, rounded
3033 	 * up to the nearest page, plus a header.
3034 	 */
3035 	ctx->super->used_blobid_mask_start = num_md_pages;
3036 	ctx->super->used_blobid_mask_len = divide_round_up(sizeof(struct spdk_bs_md_mask) +
3037 					   divide_round_up(bs->md_len, 8),
3038 					   SPDK_BS_PAGE_SIZE);
3039 	num_md_pages += ctx->super->used_blobid_mask_len;
3040 
3041 	/* The metadata region size was chosen above */
3042 	ctx->super->md_start = bs->md_start = num_md_pages;
3043 	ctx->super->md_len = bs->md_len;
3044 	num_md_pages += bs->md_len;
3045 
3046 	num_md_lba = _spdk_bs_page_to_lba(bs, num_md_pages);
3047 
3048 	ctx->super->crc = _spdk_blob_md_page_calc_crc(ctx->super);
3049 
3050 	num_md_clusters = divide_round_up(num_md_pages, bs->pages_per_cluster);
3051 	if (num_md_clusters > bs->total_clusters) {
3052 		SPDK_ERRLOG("Blobstore metadata cannot use more clusters than are available, "
3053 			    "please decrease number of pages reserved for metadata "
3054 			    "or increase cluster size.\n");
3055 		spdk_dma_free(ctx->super);
3056 		free(ctx);
3057 		_spdk_bs_free(bs);
3058 		cb_fn(cb_arg, NULL, -ENOMEM);
3059 		return;
3060 	}
3061 	/* Claim all of the clusters used by the metadata */
3062 	for (i = 0; i < num_md_clusters; i++) {
3063 		_spdk_bs_claim_cluster(bs, i);
3064 	}
3065 
3066 	bs->total_data_clusters = bs->num_free_clusters;
3067 
3068 	cpl.type = SPDK_BS_CPL_TYPE_BS_HANDLE;
3069 	cpl.u.bs_handle.cb_fn = cb_fn;
3070 	cpl.u.bs_handle.cb_arg = cb_arg;
3071 	cpl.u.bs_handle.bs = bs;
3072 
3073 	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
3074 	if (!seq) {
3075 		spdk_dma_free(ctx->super);
3076 		free(ctx);
3077 		_spdk_bs_free(bs);
3078 		cb_fn(cb_arg, NULL, -ENOMEM);
3079 		return;
3080 	}
3081 
3082 	batch = spdk_bs_sequence_to_batch(seq, _spdk_bs_init_trim_cpl, ctx);
3083 
3084 	/* Clear metadata space */
3085 	spdk_bs_batch_write_zeroes_dev(batch, 0, num_md_lba);
3086 	/* Trim data clusters */
3087 	spdk_bs_batch_unmap_dev(batch, num_md_lba, ctx->bs->dev->blockcnt - num_md_lba);
3088 
3089 	spdk_bs_batch_close(batch);
3090 }
3091 
3092 /* END spdk_bs_init */
3093 
3094 /* START spdk_bs_destroy */
3095 
3096 static void
3097 _spdk_bs_destroy_trim_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
3098 {
3099 	struct spdk_bs_init_ctx *ctx = cb_arg;
3100 	struct spdk_blob_store *bs = ctx->bs;
3101 
3102 	/*
3103 	 * We need to defer calling spdk_bs_call_cpl() until after
3104 	 * dev destruction, so tuck these away for later use.
3105 	 */
3106 	bs->unload_err = bserrno;
3107 	memcpy(&bs->unload_cpl, &seq->cpl, sizeof(struct spdk_bs_cpl));
3108 	seq->cpl.type = SPDK_BS_CPL_TYPE_NONE;
3109 
3110 	spdk_bs_sequence_finish(seq, bserrno);
3111 
3112 	_spdk_bs_free(bs);
3113 	free(ctx);
3114 }
3115 
3116 void
3117 spdk_bs_destroy(struct spdk_blob_store *bs, spdk_bs_op_complete cb_fn,
3118 		void *cb_arg)
3119 {
3120 	struct spdk_bs_cpl	cpl;
3121 	spdk_bs_sequence_t	*seq;
3122 	struct spdk_bs_init_ctx *ctx;
3123 
3124 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Destroying blobstore\n");
3125 
3126 	if (!TAILQ_EMPTY(&bs->blobs)) {
3127 		SPDK_ERRLOG("Blobstore still has open blobs\n");
3128 		cb_fn(cb_arg, -EBUSY);
3129 		return;
3130 	}
3131 
3132 	cpl.type = SPDK_BS_CPL_TYPE_BS_BASIC;
3133 	cpl.u.bs_basic.cb_fn = cb_fn;
3134 	cpl.u.bs_basic.cb_arg = cb_arg;
3135 
3136 	ctx = calloc(1, sizeof(*ctx));
3137 	if (!ctx) {
3138 		cb_fn(cb_arg, -ENOMEM);
3139 		return;
3140 	}
3141 
3142 	ctx->bs = bs;
3143 
3144 	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
3145 	if (!seq) {
3146 		free(ctx);
3147 		cb_fn(cb_arg, -ENOMEM);
3148 		return;
3149 	}
3150 
3151 	/* Write zeroes to the super block */
3152 	spdk_bs_sequence_write_zeroes_dev(seq,
3153 					  _spdk_bs_page_to_lba(bs, 0),
3154 					  _spdk_bs_byte_to_lba(bs, sizeof(struct spdk_bs_super_block)),
3155 					  _spdk_bs_destroy_trim_cpl, ctx);
3156 }
3157 
3158 /* END spdk_bs_destroy */
3159 
3160 /* START spdk_bs_unload */
3161 
3162 static void
3163 _spdk_bs_unload_write_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
3164 {
3165 	struct spdk_bs_load_ctx	*ctx = cb_arg;
3166 
3167 	spdk_dma_free(ctx->super);
3168 
3169 	/*
3170 	 * We need to defer calling spdk_bs_call_cpl() until after
3171 	 * dev destruction, so tuck these away for later use.
3172 	 */
3173 	ctx->bs->unload_err = bserrno;
3174 	memcpy(&ctx->bs->unload_cpl, &seq->cpl, sizeof(struct spdk_bs_cpl));
3175 	seq->cpl.type = SPDK_BS_CPL_TYPE_NONE;
3176 
3177 	spdk_bs_sequence_finish(seq, bserrno);
3178 
3179 	_spdk_bs_free(ctx->bs);
3180 	free(ctx);
3181 }
3182 
3183 static void
3184 _spdk_bs_unload_write_used_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
3185 {
3186 	struct spdk_bs_load_ctx	*ctx = cb_arg;
3187 
3188 	spdk_dma_free(ctx->mask);
3189 	ctx->super->clean = 1;
3190 
3191 	_spdk_bs_write_super(seq, ctx->bs, ctx->super, _spdk_bs_unload_write_super_cpl, ctx);
3192 }
3193 
3194 static void
3195 _spdk_bs_unload_write_used_blobids_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
3196 {
3197 	struct spdk_bs_load_ctx	*ctx = cb_arg;
3198 
3199 	spdk_dma_free(ctx->mask);
3200 	ctx->mask = NULL;
3201 
3202 	_spdk_bs_write_used_clusters(seq, cb_arg, _spdk_bs_unload_write_used_clusters_cpl);
3203 }
3204 
3205 static void
3206 _spdk_bs_unload_write_used_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
3207 {
3208 	struct spdk_bs_load_ctx	*ctx = cb_arg;
3209 
3210 	spdk_dma_free(ctx->mask);
3211 	ctx->mask = NULL;
3212 
3213 	_spdk_bs_write_used_blobids(seq, cb_arg, _spdk_bs_unload_write_used_blobids_cpl);
3214 }
3215 
3216 static void
3217 _spdk_bs_unload_read_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
3218 {
3219 	_spdk_bs_write_used_md(seq, cb_arg, _spdk_bs_unload_write_used_pages_cpl);
3220 }
3221 
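/*
 * Unload persists in-memory state before tearing the blobstore down:
 *  re-read the super block, write out the used pages/blobids/clusters
 *  masks, set the clean flag, and write the super block back.  The
 *  caller's completion is deferred until after dev destruction (see
 *  _spdk_bs_unload_write_super_cpl).
 */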
3222 void
3223 spdk_bs_unload(struct spdk_blob_store *bs, spdk_bs_op_complete cb_fn, void *cb_arg)
3224 {
3225 	struct spdk_bs_cpl	cpl;
3226 	spdk_bs_sequence_t	*seq;
3227 	struct spdk_bs_load_ctx *ctx;
3228 
3229 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Syncing blobstore\n");
3230 
3231 	if (!TAILQ_EMPTY(&bs->blobs)) {
3232 		SPDK_ERRLOG("Blobstore still has open blobs\n");
3233 		cb_fn(cb_arg, -EBUSY);
3234 		return;
3235 	}
3236 
3237 	ctx = calloc(1, sizeof(*ctx));
3238 	if (!ctx) {
3239 		cb_fn(cb_arg, -ENOMEM);
3240 		return;
3241 	}
3242 
3243 	ctx->bs = bs;
3244 	ctx->is_load = false;
3245 
3246 	ctx->super = spdk_dma_zmalloc(sizeof(*ctx->super), 0x1000, NULL);
3247 	if (!ctx->super) {
3248 		free(ctx);
3249 		cb_fn(cb_arg, -ENOMEM);
3250 		return;
3251 	}
3252 
3253 	cpl.type = SPDK_BS_CPL_TYPE_BS_BASIC;
3254 	cpl.u.bs_basic.cb_fn = cb_fn;
3255 	cpl.u.bs_basic.cb_arg = cb_arg;
3256 
3257 	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
3258 	if (!seq) {
3259 		spdk_dma_free(ctx->super);
3260 		free(ctx);
3261 		cb_fn(cb_arg, -ENOMEM);
3262 		return;
3263 	}
3264 
3265 	/* Read super block */
3266 	spdk_bs_sequence_read_dev(seq, ctx->super, _spdk_bs_page_to_lba(bs, 0),
3267 				  _spdk_bs_byte_to_lba(bs, sizeof(*ctx->super)),
3268 				  _spdk_bs_unload_read_super_cpl, ctx);
3269 }
3270 
3271 /* END spdk_bs_unload */
3272 
3273 /* START spdk_bs_set_super */
3274 
3275 struct spdk_bs_set_super_ctx {
3276 	struct spdk_blob_store		*bs;
3277 	struct spdk_bs_super_block	*super;
3278 };
3279 
3280 static void
3281 _spdk_bs_set_super_write_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
3282 {
3283 	struct spdk_bs_set_super_ctx	*ctx = cb_arg;
3284 
3285 	if (bserrno != 0) {
3286 		SPDK_ERRLOG("Unable to write to super block of blobstore\n");
3287 	}
3288 
3289 	spdk_dma_free(ctx->super);
3290 
3291 	spdk_bs_sequence_finish(seq, bserrno);
3292 
3293 	free(ctx);
3294 }
3295 
3296 static void
3297 _spdk_bs_set_super_read_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
3298 {
3299 	struct spdk_bs_set_super_ctx	*ctx = cb_arg;
3300 
3301 	if (bserrno != 0) {
3302 		SPDK_ERRLOG("Unable to read super block of blobstore\n");
3303 		spdk_dma_free(ctx->super);
3304 		spdk_bs_sequence_finish(seq, bserrno);
3305 		free(ctx);
3306 		return;
3307 	}
3308 
3309 	_spdk_bs_write_super(seq, ctx->bs, ctx->super, _spdk_bs_set_super_write_cpl, ctx);
3310 }
3311 
3312 void
3313 spdk_bs_set_super(struct spdk_blob_store *bs, spdk_blob_id blobid,
3314 		  spdk_bs_op_complete cb_fn, void *cb_arg)
3315 {
3316 	struct spdk_bs_cpl		cpl;
3317 	spdk_bs_sequence_t		*seq;
3318 	struct spdk_bs_set_super_ctx	*ctx;
3319 
3320 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Setting super blob id on blobstore\n");
3321 
3322 	ctx = calloc(1, sizeof(*ctx));
3323 	if (!ctx) {
3324 		cb_fn(cb_arg, -ENOMEM);
3325 		return;
3326 	}
3327 
3328 	ctx->bs = bs;
3329 
3330 	ctx->super = spdk_dma_zmalloc(sizeof(*ctx->super), 0x1000, NULL);
3331 	if (!ctx->super) {
3332 		free(ctx);
3333 		cb_fn(cb_arg, -ENOMEM);
3334 		return;
3335 	}
3336 
3337 	cpl.type = SPDK_BS_CPL_TYPE_BS_BASIC;
3338 	cpl.u.bs_basic.cb_fn = cb_fn;
3339 	cpl.u.bs_basic.cb_arg = cb_arg;
3340 
3341 	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
3342 	if (!seq) {
3343 		spdk_dma_free(ctx->super);
3344 		free(ctx);
3345 		cb_fn(cb_arg, -ENOMEM);
3346 		return;
3347 	}
3348 
3349 	bs->super_blob = blobid;
3350 
3351 	/* Read super block */
3352 	spdk_bs_sequence_read_dev(seq, ctx->super, _spdk_bs_page_to_lba(bs, 0),
3353 				  _spdk_bs_byte_to_lba(bs, sizeof(*ctx->super)),
3354 				  _spdk_bs_set_super_read_cpl, ctx);
3355 }
3356 
3357 /* END spdk_bs_set_super */
3358 
3359 void
3360 spdk_bs_get_super(struct spdk_blob_store *bs,
3361 		  spdk_blob_op_with_id_complete cb_fn, void *cb_arg)
3362 {
3363 	if (bs->super_blob == SPDK_BLOBID_INVALID) {
3364 		cb_fn(cb_arg, SPDK_BLOBID_INVALID, -ENOENT);
3365 	} else {
3366 		cb_fn(cb_arg, bs->super_blob, 0);
3367 	}
3368 }
3369 
3370 uint64_t
3371 spdk_bs_get_cluster_size(struct spdk_blob_store *bs)
3372 {
3373 	return bs->cluster_sz;
3374 }
3375 
3376 uint64_t
3377 spdk_bs_get_page_size(struct spdk_blob_store *bs)
3378 {
3379 	return SPDK_BS_PAGE_SIZE;
3380 }
3381 
3382 uint64_t
3383 spdk_bs_free_cluster_count(struct spdk_blob_store *bs)
3384 {
3385 	return bs->num_free_clusters;
3386 }
3387 
3388 uint64_t
3389 spdk_bs_total_data_cluster_count(struct spdk_blob_store *bs)
3390 {
3391 	return bs->total_data_clusters;
3392 }
3393 
3394 static int
3395 spdk_bs_register_md_thread(struct spdk_blob_store *bs)
3396 {
3397 	bs->md_channel = spdk_get_io_channel(bs);
3398 	if (!bs->md_channel) {
3399 		SPDK_ERRLOG("Failed to get IO channel.\n");
3400 		return -1;
3401 	}
3402 
3403 	return 0;
3404 }
3405 
3406 static int
3407 spdk_bs_unregister_md_thread(struct spdk_blob_store *bs)
3408 {
3409 	spdk_put_io_channel(bs->md_channel);
3410 
3411 	return 0;
3412 }
3413 
3414 spdk_blob_id spdk_blob_get_id(struct spdk_blob *blob)
3415 {
3416 	assert(blob != NULL);
3417 
3418 	return blob->id;
3419 }
3420 
3421 uint64_t spdk_blob_get_num_pages(struct spdk_blob *blob)
3422 {
3423 	assert(blob != NULL);
3424 
3425 	return _spdk_bs_cluster_to_page(blob->bs, blob->active.num_clusters);
3426 }
3427 
3428 uint64_t spdk_blob_get_num_clusters(struct spdk_blob *blob)
3429 {
3430 	assert(blob != NULL);
3431 
3432 	return blob->active.num_clusters;
3433 }
3434 
3435 /* START spdk_bs_create_blob */
3436 
3437 static void
3438 _spdk_bs_create_blob_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
3439 {
3440 	struct spdk_blob *blob = cb_arg;
3441 
3442 	_spdk_blob_free(blob);
3443 
3444 	spdk_bs_sequence_finish(seq, bserrno);
3445 }
3446 
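/*
 * Copy each xattr named in the opts into the blob using the caller's
 *  get_value callback.  A NULL value or zero length is treated as invalid.
 */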
3447 static int
3448 _spdk_blob_set_xattrs(struct spdk_blob *blob, const struct spdk_blob_xattr_opts *xattrs,
3449 		      bool internal)
3450 {
3451 	uint64_t i;
3452 	size_t value_len = 0;
3453 	int rc;
3454 	const void *value = NULL;
3455 	if (xattrs->count > 0 && xattrs->get_value == NULL) {
3456 		return -EINVAL;
3457 	}
3458 	for (i = 0; i < xattrs->count; i++) {
3459 		xattrs->get_value(xattrs->ctx, xattrs->names[i], &value, &value_len);
3460 		if (value == NULL || value_len == 0) {
3461 			return -EINVAL;
3462 		}
3463 		rc = _spdk_blob_set_xattr(blob, xattrs->names[i], value, value_len, internal);
3464 		if (rc < 0) {
3465 			return rc;
3466 		}
3467 	}
3468 	return 0;
3469 }
3470 
3471 static void
3472 _spdk_blob_set_thin_provision(struct spdk_blob *blob)
3473 {
3474 	_spdk_blob_verify_md_op(blob);
3475 	blob->invalid_flags |= SPDK_BLOB_THIN_PROV;
3476 	blob->state = SPDK_BLOB_STATE_DIRTY;
3477 }
3478 
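/*
 * Common creation path for both public and internal (xattr-tagged) blobs.
 *  The first clear bit in used_md_pages selects the blob's metadata page,
 *  and that page index encodes the blobid.
 */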
3479 static void
3480 _spdk_bs_create_blob(struct spdk_blob_store *bs,
3481 		     const struct spdk_blob_opts *opts,
3482 		     const struct spdk_blob_xattr_opts *internal_xattrs,
3483 		     spdk_blob_op_with_id_complete cb_fn, void *cb_arg)
3484 {
3485 	struct spdk_blob	*blob;
3486 	uint32_t		page_idx;
3487 	struct spdk_bs_cpl	cpl;
3488 	struct spdk_blob_opts	opts_default;
3489 	struct spdk_blob_xattr_opts internal_xattrs_default;
3490 	spdk_bs_sequence_t	*seq;
3491 	spdk_blob_id		id;
3492 	int rc;
3493 
3494 	assert(spdk_get_thread() == bs->md_thread);
3495 
3496 	page_idx = spdk_bit_array_find_first_clear(bs->used_md_pages, 0);
3497 	if (page_idx >= spdk_bit_array_capacity(bs->used_md_pages)) {
3498 		cb_fn(cb_arg, 0, -ENOMEM);
3499 		return;
3500 	}
3501 	spdk_bit_array_set(bs->used_blobids, page_idx);
3502 	spdk_bit_array_set(bs->used_md_pages, page_idx);
3503 
3504 	id = _spdk_bs_page_to_blobid(page_idx);
3505 
3506 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Creating blob with id %lu at page %u\n", id, page_idx);
3507 
3508 	blob = _spdk_blob_alloc(bs, id);
3509 	if (!blob) {
3510 		cb_fn(cb_arg, 0, -ENOMEM);
3511 		return;
3512 	}
3513 
3514 	if (!opts) {
3515 		spdk_blob_opts_init(&opts_default);
3516 		opts = &opts_default;
3517 	}
3518 	if (!internal_xattrs) {
3519 		_spdk_blob_xattrs_init(&internal_xattrs_default);
3520 		internal_xattrs = &internal_xattrs_default;
3521 	}
3522 
3523 	rc = _spdk_blob_set_xattrs(blob, &opts->xattrs, false);
3524 	if (rc < 0) {
3525 		_spdk_blob_free(blob);
3526 		cb_fn(cb_arg, 0, rc);
3527 		return;
3528 	}
3529 
3530 	rc = _spdk_blob_set_xattrs(blob, internal_xattrs, true);
3531 	if (rc < 0) {
3532 		_spdk_blob_free(blob);
3533 		cb_fn(cb_arg, 0, rc);
3534 		return;
3535 	}
3536 
3537 	if (opts->thin_provision) {
3538 		_spdk_blob_set_thin_provision(blob);
3539 	}
3540 
3541 	rc = _spdk_blob_resize(blob, opts->num_clusters);
3542 	if (rc < 0) {
3543 		_spdk_blob_free(blob);
3544 		cb_fn(cb_arg, 0, rc);
3545 		return;
3546 	}
3547 	cpl.type = SPDK_BS_CPL_TYPE_BLOBID;
3548 	cpl.u.blobid.cb_fn = cb_fn;
3549 	cpl.u.blobid.cb_arg = cb_arg;
3550 	cpl.u.blobid.blobid = blob->id;
3551 
3552 	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
3553 	if (!seq) {
3554 		_spdk_blob_free(blob);
3555 		cb_fn(cb_arg, 0, -ENOMEM);
3556 		return;
3557 	}
3558 
3559 	_spdk_blob_persist(seq, blob, _spdk_bs_create_blob_cpl, blob);
3560 }
3561 
3562 void spdk_bs_create_blob(struct spdk_blob_store *bs,
3563 			 spdk_blob_op_with_id_complete cb_fn, void *cb_arg)
3564 {
3565 	_spdk_bs_create_blob(bs, NULL, NULL, cb_fn, cb_arg);
3566 }
3567 
3568 void spdk_bs_create_blob_ext(struct spdk_blob_store *bs, const struct spdk_blob_opts *opts,
3569 			     spdk_blob_op_with_id_complete cb_fn, void *cb_arg)
3570 {
3571 	_spdk_bs_create_blob(bs, opts, NULL, cb_fn, cb_arg);
3572 }
3573 
3574 /* END spdk_bs_create_blob */
3575 
3576 /* START spdk_blob_resize */
3577 void
3578 spdk_blob_resize(struct spdk_blob *blob, uint64_t sz, spdk_blob_op_complete cb_fn, void *cb_arg)
3579 {
3580 	int			rc;
3581 
3582 	_spdk_blob_verify_md_op(blob);
3583 
3584 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Resizing blob %lu to %lu clusters\n", blob->id, sz);
3585 
3586 	if (blob->md_ro) {
3587 		cb_fn(cb_arg, -EPERM);
3588 		return;
3589 	}
3590 
3591 	if (sz == blob->active.num_clusters) {
3592 		cb_fn(cb_arg, 0);
3593 		return;
3594 	}
3595 
3596 	rc = _spdk_blob_resize(blob, sz);
3597 	cb_fn(cb_arg, rc);
3598 }
3599 
3600 /* END spdk_blob_resize */
3601 
3602 
3603 /* START spdk_bs_delete_blob */
3604 
3605 static void
3606 _spdk_bs_delete_close_cpl(void *cb_arg, int bserrno)
3607 {
3608 	spdk_bs_sequence_t *seq = cb_arg;
3609 
3610 	spdk_bs_sequence_finish(seq, bserrno);
3611 }
3612 
3613 static void
3614 _spdk_bs_delete_persist_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
3615 {
3616 	struct spdk_blob *blob = cb_arg;
3617 
3618 	if (bserrno != 0) {
3619 		/*
3620 		 * We already removed this blob from the blobstore tailq, so
3621 		 *  we need to free it here since this is the last reference
3622 		 *  to it.
3623 		 */
3624 		_spdk_blob_free(blob);
3625 		_spdk_bs_delete_close_cpl(seq, bserrno);
3626 		return;
3627 	}
3628 
3629 	/*
3630 	 * This will immediately decrement the ref_count and call
3631 	 *  the completion routine since the metadata state is clean.
3632 	 *  By calling spdk_blob_close, we reduce the number of call
3633 	 *  points into code that touches the blob->open_ref count
3634 	 *  and the blobstore's blob list.
3635 	 */
3636 	spdk_blob_close(blob, _spdk_bs_delete_close_cpl, seq);
3637 }
3638 
3639 static void
3640 _spdk_bs_delete_open_cpl(void *cb_arg, struct spdk_blob *blob, int bserrno)
3641 {
3642 	spdk_bs_sequence_t *seq = cb_arg;
3643 	uint32_t page_num;
3644 
3645 	if (bserrno != 0) {
3646 		spdk_bs_sequence_finish(seq, bserrno);
3647 		return;
3648 	}
3649 
3650 	_spdk_blob_verify_md_op(blob);
3651 
3652 	if (blob->open_ref > 1) {
3653 		/*
3654 		 * Someone has this blob open (besides this delete context).
3655 		 *  Decrement the ref count directly and return -EBUSY.
3656 		 */
3657 		blob->open_ref--;
3658 		spdk_bs_sequence_finish(seq, -EBUSY);
3659 		return;
3660 	}
3661 
3662 	/*
3663 	 * Remove the blob from the blob_store list now, to ensure it does not
3664 	 *  get returned after this point by _spdk_blob_lookup().
3665 	 */
3666 	TAILQ_REMOVE(&blob->bs->blobs, blob, link);
3667 	page_num = _spdk_bs_blobid_to_page(blob->id);
3668 	spdk_bit_array_clear(blob->bs->used_blobids, page_num);
3669 	blob->state = SPDK_BLOB_STATE_DIRTY;
3670 	blob->active.num_pages = 0;
3671 	_spdk_blob_resize(blob, 0);
3672 
3673 	_spdk_blob_persist(seq, blob, _spdk_bs_delete_persist_cpl, blob);
3674 }
3675 
3676 void
3677 spdk_bs_delete_blob(struct spdk_blob_store *bs, spdk_blob_id blobid,
3678 		    spdk_blob_op_complete cb_fn, void *cb_arg)
3679 {
3680 	struct spdk_bs_cpl	cpl;
3681 	spdk_bs_sequence_t	*seq;
3682 
3683 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Deleting blob %lu\n", blobid);
3684 
3685 	assert(spdk_get_thread() == bs->md_thread);
3686 
3687 	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
3688 	cpl.u.blob_basic.cb_fn = cb_fn;
3689 	cpl.u.blob_basic.cb_arg = cb_arg;
3690 
3691 	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
3692 	if (!seq) {
3693 		cb_fn(cb_arg, -ENOMEM);
3694 		return;
3695 	}
3696 
3697 	spdk_bs_open_blob(bs, blobid, _spdk_bs_delete_open_cpl, seq);
3698 }
3699 
3700 /* END spdk_bs_delete_blob */
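
/*
 * Usage sketch (illustrative): deleting a blob by id from the metadata thread.
 *  The operation fails with -EBUSY if another context still holds the blob
 *  open.
 *
 *	static void
 *	delete_done(void *cb_arg, int bserrno)
 *	{
 *		assert(bserrno == 0);
 *	}
 *
 *	spdk_bs_delete_blob(bs, blobid, delete_done, cb_arg);
 */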
3701 
3702 /* START spdk_bs_open_blob */
3703 
3704 static void
3705 _spdk_bs_open_blob_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
3706 {
3707 	struct spdk_blob *blob = cb_arg;
3708 
	/* If the blob had a CRC error during load, it is passed here as NULL. */
3710 	if (blob == NULL) {
3711 		seq->cpl.u.blob_handle.blob = NULL;
3712 		spdk_bs_sequence_finish(seq, bserrno);
3713 		return;
3714 	}
3715 
3716 	blob->open_ref++;
3717 
3718 	TAILQ_INSERT_HEAD(&blob->bs->blobs, blob, link);
3719 
3720 	spdk_bs_sequence_finish(seq, bserrno);
3721 }
3722 
void
spdk_bs_open_blob(struct spdk_blob_store *bs, spdk_blob_id blobid,
		  spdk_blob_op_with_handle_complete cb_fn, void *cb_arg)
3725 {
3726 	struct spdk_blob		*blob;
3727 	struct spdk_bs_cpl		cpl;
3728 	spdk_bs_sequence_t		*seq;
3729 	uint32_t			page_num;
3730 
3731 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Opening blob %lu\n", blobid);
3732 	assert(spdk_get_thread() == bs->md_thread);
3733 
3734 	page_num = _spdk_bs_blobid_to_page(blobid);
3735 	if (spdk_bit_array_get(bs->used_blobids, page_num) == false) {
3736 		/* Invalid blobid */
3737 		cb_fn(cb_arg, NULL, -ENOENT);
3738 		return;
3739 	}
3740 
3741 	blob = _spdk_blob_lookup(bs, blobid);
3742 	if (blob) {
3743 		blob->open_ref++;
3744 		cb_fn(cb_arg, blob, 0);
3745 		return;
3746 	}
3747 
3748 	blob = _spdk_blob_alloc(bs, blobid);
3749 	if (!blob) {
3750 		cb_fn(cb_arg, NULL, -ENOMEM);
3751 		return;
3752 	}
3753 
3754 	cpl.type = SPDK_BS_CPL_TYPE_BLOB_HANDLE;
3755 	cpl.u.blob_handle.cb_fn = cb_fn;
3756 	cpl.u.blob_handle.cb_arg = cb_arg;
3757 	cpl.u.blob_handle.blob = blob;
3758 
3759 	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
3760 	if (!seq) {
3761 		_spdk_blob_free(blob);
3762 		cb_fn(cb_arg, NULL, -ENOMEM);
3763 		return;
3764 	}
3765 
3766 	_spdk_blob_load(seq, blob, _spdk_bs_open_blob_cpl, blob);
3767 }
3768 /* END spdk_bs_open_blob */
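
/*
 * Usage sketch (illustrative): opening a blob and inspecting it in the
 *  completion callback. spdk_blob_get_num_clusters() is part of the public
 *  blob API.
 *
 *	static void
 *	open_done(void *cb_arg, struct spdk_blob *blob, int bserrno)
 *	{
 *		if (bserrno == 0) {
 *			SPDK_NOTICELOG("blob has %lu clusters\n",
 *				       spdk_blob_get_num_clusters(blob));
 *		}
 *	}
 *
 *	spdk_bs_open_blob(bs, blobid, open_done, cb_arg);
 */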
3769 
3770 /* START spdk_blob_set_read_only */
int
spdk_blob_set_read_only(struct spdk_blob *blob)
3772 {
3773 	_spdk_blob_verify_md_op(blob);
3774 
3775 	blob->data_ro_flags |= SPDK_BLOB_READ_ONLY;
3776 
3777 	blob->state = SPDK_BLOB_STATE_DIRTY;
3778 	return 0;
3779 }
3780 /* END spdk_blob_set_read_only */
3781 
3782 /* START spdk_blob_sync_md */
3783 
3784 static void
3785 _spdk_blob_sync_md_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
3786 {
3787 	struct spdk_blob *blob = cb_arg;
3788 
3789 	if (bserrno == 0 && (blob->data_ro_flags & SPDK_BLOB_READ_ONLY)) {
3790 		blob->data_ro = true;
3791 		blob->md_ro = true;
3792 	}
3793 
3794 	spdk_bs_sequence_finish(seq, bserrno);
3795 }
3796 
3797 static void
3798 _spdk_blob_sync_md(struct spdk_blob *blob, spdk_blob_op_complete cb_fn, void *cb_arg)
3799 {
3800 	struct spdk_bs_cpl	cpl;
3801 	spdk_bs_sequence_t	*seq;
3802 
3803 	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
3804 	cpl.u.blob_basic.cb_fn = cb_fn;
3805 	cpl.u.blob_basic.cb_arg = cb_arg;
3806 
3807 	seq = spdk_bs_sequence_start(blob->bs->md_channel, &cpl);
3808 	if (!seq) {
3809 		cb_fn(cb_arg, -ENOMEM);
3810 		return;
3811 	}
3812 
3813 	_spdk_blob_persist(seq, blob, _spdk_blob_sync_md_cpl, blob);
3814 }
3815 
3816 void
3817 spdk_blob_sync_md(struct spdk_blob *blob, spdk_blob_op_complete cb_fn, void *cb_arg)
3818 {
3819 	_spdk_blob_verify_md_op(blob);
3820 
3821 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Syncing blob %lu\n", blob->id);
3822 
3823 	if (blob->md_ro) {
3824 		assert(blob->state == SPDK_BLOB_STATE_CLEAN);
3825 		cb_fn(cb_arg, 0);
3826 		return;
3827 	}
3828 
3829 	_spdk_blob_sync_md(blob, cb_fn, cb_arg);
3830 }
3831 
3832 /* END spdk_blob_sync_md */
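
/*
 * Usage sketch (illustrative): marking a blob read-only and persisting the
 *  flag. The data_ro/md_ro bits only take effect once the sync completes (see
 *  _spdk_blob_sync_md_cpl above).
 *
 *	static void
 *	sync_done(void *cb_arg, int bserrno)
 *	{
 *		assert(bserrno == 0);
 *	}
 *
 *	spdk_blob_set_read_only(blob);
 *	spdk_blob_sync_md(blob, sync_done, cb_arg);
 */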
3833 
3834 struct spdk_blob_insert_cluster_ctx {
3835 	struct spdk_thread	*thread;
3836 	struct spdk_blob	*blob;
3837 	uint32_t		cluster_num;	/* cluster index in blob */
	uint64_t		cluster;	/* cluster on disk; uint64_t to match the insert API below */
3839 	int			rc;
3840 	spdk_blob_op_complete	cb_fn;
3841 	void			*cb_arg;
3842 };
3843 
3844 static void
3845 _spdk_blob_insert_cluster_msg_cpl(void *arg)
3846 {
3847 	struct spdk_blob_insert_cluster_ctx *ctx = arg;
3848 
3849 	ctx->cb_fn(ctx->cb_arg, ctx->rc);
3850 	free(ctx);
3851 }
3852 
3853 static void
3854 _spdk_blob_insert_cluster_msg_cb(void *arg, int bserrno)
3855 {
3856 	struct spdk_blob_insert_cluster_ctx *ctx = arg;
3857 
3858 	ctx->rc = bserrno;
3859 	spdk_thread_send_msg(ctx->thread, _spdk_blob_insert_cluster_msg_cpl, ctx);
3860 }
3861 
3862 static void
3863 _spdk_blob_insert_cluster_msg(void *arg)
3864 {
3865 	struct spdk_blob_insert_cluster_ctx *ctx = arg;
3866 
3867 	ctx->rc = _spdk_blob_insert_cluster(ctx->blob, ctx->cluster_num, ctx->cluster);
3868 	if (ctx->rc != 0) {
3869 		spdk_thread_send_msg(ctx->thread, _spdk_blob_insert_cluster_msg_cpl, ctx);
3870 		return;
3871 	}
3872 
3873 	ctx->blob->state = SPDK_BLOB_STATE_DIRTY;
3874 	_spdk_blob_sync_md(ctx->blob, _spdk_blob_insert_cluster_msg_cb, ctx);
3875 }
3876 
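/*
 * Cluster insertion must be performed on the blobstore's metadata thread, so
 *  this helper forwards the request to bs->md_thread and replies to the
 *  calling thread once the metadata sync completes.
 */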
3877 void
3878 _spdk_blob_insert_cluster_on_md_thread(struct spdk_blob *blob, uint32_t cluster_num,
3879 				       uint64_t cluster, spdk_blob_op_complete cb_fn, void *cb_arg)
3880 {
3881 	struct spdk_blob_insert_cluster_ctx *ctx;
3882 
3883 	ctx = calloc(1, sizeof(*ctx));
3884 	if (ctx == NULL) {
3885 		cb_fn(cb_arg, -ENOMEM);
3886 		return;
3887 	}
3888 
3889 	ctx->thread = spdk_get_thread();
3890 	ctx->blob = blob;
3891 	ctx->cluster_num = cluster_num;
3892 	ctx->cluster = cluster;
3893 	ctx->cb_fn = cb_fn;
3894 	ctx->cb_arg = cb_arg;
3895 
3896 	spdk_thread_send_msg(blob->bs->md_thread, _spdk_blob_insert_cluster_msg, ctx);
3897 }
3898 
3899 /* START spdk_blob_close */
3900 
3901 static void
3902 _spdk_blob_close_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
3903 {
3904 	struct spdk_blob *blob = cb_arg;
3905 
3906 	if (bserrno == 0) {
3907 		blob->open_ref--;
3908 		if (blob->open_ref == 0) {
3909 			/*
3910 			 * Blobs with active.num_pages == 0 are deleted blobs.
			 *  These blobs are removed from the blob_store list
3912 			 *  when the deletion process starts - so don't try to
3913 			 *  remove them again.
3914 			 */
3915 			if (blob->active.num_pages > 0) {
3916 				TAILQ_REMOVE(&blob->bs->blobs, blob, link);
3917 			}
3918 			_spdk_blob_free(blob);
3919 		}
3920 	}
3921 
3922 	spdk_bs_sequence_finish(seq, bserrno);
3923 }
3924 
void
spdk_blob_close(struct spdk_blob *blob, spdk_blob_op_complete cb_fn, void *cb_arg)
3926 {
3927 	struct spdk_bs_cpl	cpl;
3928 	spdk_bs_sequence_t	*seq;
3929 
3930 	_spdk_blob_verify_md_op(blob);
3931 
3932 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Closing blob %lu\n", blob->id);
3933 
3934 	if (blob->open_ref == 0) {
3935 		cb_fn(cb_arg, -EBADF);
3936 		return;
3937 	}
3938 
3939 	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
3940 	cpl.u.blob_basic.cb_fn = cb_fn;
3941 	cpl.u.blob_basic.cb_arg = cb_arg;
3942 
3943 	seq = spdk_bs_sequence_start(blob->bs->md_channel, &cpl);
3944 	if (!seq) {
3945 		cb_fn(cb_arg, -ENOMEM);
3946 		return;
3947 	}
3948 
3949 	/* Sync metadata */
3950 	_spdk_blob_persist(seq, blob, _spdk_blob_close_cpl, blob);
3951 }
3952 
3953 /* END spdk_blob_close */
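
/*
 * Usage sketch (illustrative): dropping the reference taken by
 *  spdk_bs_open_blob(). The blob's memory is released once open_ref reaches
 *  zero and its metadata has been persisted.
 *
 *	spdk_blob_close(blob, close_done, cb_arg);
 */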
3954 
struct spdk_io_channel *
spdk_bs_alloc_io_channel(struct spdk_blob_store *bs)
{
	return spdk_get_io_channel(bs);
}

void
spdk_bs_free_io_channel(struct spdk_io_channel *channel)
{
	spdk_put_io_channel(channel);
}
3964 
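/*
 * Usage sketch (illustrative): performing blob I/O through a per-thread
 *  channel. The payload is assumed to be a DMA-safe buffer (e.g. from
 *  spdk_dma_malloc()); write_done is a placeholder callback.
 *
 *	struct spdk_io_channel *channel = spdk_bs_alloc_io_channel(bs);
 *
 *	spdk_blob_io_write(blob, channel, payload, offset, length,
 *			   write_done, cb_arg);
 *	...
 *	spdk_bs_free_io_channel(channel);
 */
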
void
spdk_blob_io_unmap(struct spdk_blob *blob, struct spdk_io_channel *channel,
		   uint64_t offset, uint64_t length, spdk_blob_op_complete cb_fn, void *cb_arg)
{
	_spdk_blob_request_submit_op(blob, channel, NULL, offset, length, cb_fn, cb_arg,
				     SPDK_BLOB_UNMAP);
}

void
spdk_blob_io_write_zeroes(struct spdk_blob *blob, struct spdk_io_channel *channel,
			  uint64_t offset, uint64_t length, spdk_blob_op_complete cb_fn, void *cb_arg)
{
	_spdk_blob_request_submit_op(blob, channel, NULL, offset, length, cb_fn, cb_arg,
				     SPDK_BLOB_WRITE_ZEROES);
}

void
spdk_blob_io_write(struct spdk_blob *blob, struct spdk_io_channel *channel,
		   void *payload, uint64_t offset, uint64_t length,
		   spdk_blob_op_complete cb_fn, void *cb_arg)
{
	_spdk_blob_request_submit_op(blob, channel, payload, offset, length, cb_fn, cb_arg,
				     SPDK_BLOB_WRITE);
}

void
spdk_blob_io_read(struct spdk_blob *blob, struct spdk_io_channel *channel,
		  void *payload, uint64_t offset, uint64_t length,
		  spdk_blob_op_complete cb_fn, void *cb_arg)
{
	_spdk_blob_request_submit_op(blob, channel, payload, offset, length, cb_fn, cb_arg,
				     SPDK_BLOB_READ);
}

void
spdk_blob_io_writev(struct spdk_blob *blob, struct spdk_io_channel *channel,
		    struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
		    spdk_blob_op_complete cb_fn, void *cb_arg)
{
	_spdk_blob_request_submit_rw_iov(blob, channel, iov, iovcnt, offset, length, cb_fn, cb_arg, false);
}

void
spdk_blob_io_readv(struct spdk_blob *blob, struct spdk_io_channel *channel,
		   struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
		   spdk_blob_op_complete cb_fn, void *cb_arg)
{
	_spdk_blob_request_submit_rw_iov(blob, channel, iov, iovcnt, offset, length, cb_fn, cb_arg, true);
}
4008 
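/*
 * The spdk_bs_io_*_blob functions below are older names that simply forward to
 *  the spdk_blob_io_* calls above.
 */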
void
spdk_bs_io_unmap_blob(struct spdk_blob *blob, struct spdk_io_channel *channel,
		      uint64_t offset, uint64_t length, spdk_blob_op_complete cb_fn, void *cb_arg)
{
	spdk_blob_io_unmap(blob, channel, offset, length, cb_fn, cb_arg);
}

void
spdk_bs_io_write_zeroes_blob(struct spdk_blob *blob, struct spdk_io_channel *channel,
			     uint64_t offset, uint64_t length, spdk_blob_op_complete cb_fn, void *cb_arg)
{
	spdk_blob_io_write_zeroes(blob, channel, offset, length, cb_fn, cb_arg);
}

void
spdk_bs_io_write_blob(struct spdk_blob *blob, struct spdk_io_channel *channel,
		      void *payload, uint64_t offset, uint64_t length,
		      spdk_blob_op_complete cb_fn, void *cb_arg)
{
	spdk_blob_io_write(blob, channel, payload, offset, length, cb_fn, cb_arg);
}

void
spdk_bs_io_read_blob(struct spdk_blob *blob, struct spdk_io_channel *channel,
		     void *payload, uint64_t offset, uint64_t length,
		     spdk_blob_op_complete cb_fn, void *cb_arg)
{
	spdk_blob_io_read(blob, channel, payload, offset, length, cb_fn, cb_arg);
}

void
spdk_bs_io_writev_blob(struct spdk_blob *blob, struct spdk_io_channel *channel,
		       struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
		       spdk_blob_op_complete cb_fn, void *cb_arg)
{
	spdk_blob_io_writev(blob, channel, iov, iovcnt, offset, length, cb_fn, cb_arg);
}

void
spdk_bs_io_readv_blob(struct spdk_blob *blob, struct spdk_io_channel *channel,
		      struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
		      spdk_blob_op_complete cb_fn, void *cb_arg)
{
	spdk_blob_io_readv(blob, channel, iov, iovcnt, offset, length, cb_fn, cb_arg);
}
4048 
4049 struct spdk_bs_iter_ctx {
4050 	int64_t page_num;
4051 	struct spdk_blob_store *bs;
4052 
4053 	spdk_blob_op_with_handle_complete cb_fn;
4054 	void *cb_arg;
4055 };
4056 
4057 static void
4058 _spdk_bs_iter_cpl(void *cb_arg, struct spdk_blob *_blob, int bserrno)
4059 {
4060 	struct spdk_bs_iter_ctx *ctx = cb_arg;
4061 	struct spdk_blob_store *bs = ctx->bs;
4062 	spdk_blob_id id;
4063 
4064 	if (bserrno == 0) {
4065 		ctx->cb_fn(ctx->cb_arg, _blob, bserrno);
4066 		free(ctx);
4067 		return;
4068 	}
4069 
4070 	ctx->page_num++;
4071 	ctx->page_num = spdk_bit_array_find_first_set(bs->used_blobids, ctx->page_num);
4072 	if (ctx->page_num >= spdk_bit_array_capacity(bs->used_blobids)) {
4073 		ctx->cb_fn(ctx->cb_arg, NULL, -ENOENT);
4074 		free(ctx);
4075 		return;
4076 	}
4077 
4078 	id = _spdk_bs_page_to_blobid(ctx->page_num);
4079 
4080 	spdk_bs_open_blob(bs, id, _spdk_bs_iter_cpl, ctx);
4081 }
4082 
4083 void
4084 spdk_bs_iter_first(struct spdk_blob_store *bs,
4085 		   spdk_blob_op_with_handle_complete cb_fn, void *cb_arg)
4086 {
4087 	struct spdk_bs_iter_ctx *ctx;
4088 
4089 	ctx = calloc(1, sizeof(*ctx));
4090 	if (!ctx) {
4091 		cb_fn(cb_arg, NULL, -ENOMEM);
4092 		return;
4093 	}
4094 
4095 	ctx->page_num = -1;
4096 	ctx->bs = bs;
4097 	ctx->cb_fn = cb_fn;
4098 	ctx->cb_arg = cb_arg;
4099 
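	/* Pass a non-zero bserrno so the completion routine searches for the
	 *  first set bit in used_blobids instead of returning the NULL blob.
	 */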
4100 	_spdk_bs_iter_cpl(ctx, NULL, -1);
4101 }
4102 
4103 static void
4104 _spdk_bs_iter_close_cpl(void *cb_arg, int bserrno)
4105 {
4106 	struct spdk_bs_iter_ctx *ctx = cb_arg;
4107 
4108 	_spdk_bs_iter_cpl(ctx, NULL, -1);
4109 }
4110 
4111 void
4112 spdk_bs_iter_next(struct spdk_blob_store *bs, struct spdk_blob *blob,
4113 		  spdk_blob_op_with_handle_complete cb_fn, void *cb_arg)
4114 {
4115 	struct spdk_bs_iter_ctx *ctx;
4116 
4117 	assert(blob != NULL);
4118 
4119 	ctx = calloc(1, sizeof(*ctx));
4120 	if (!ctx) {
4121 		cb_fn(cb_arg, NULL, -ENOMEM);
4122 		return;
4123 	}
4124 
4125 	ctx->page_num = _spdk_bs_blobid_to_page(blob->id);
4126 	ctx->bs = bs;
4127 	ctx->cb_fn = cb_fn;
4128 	ctx->cb_arg = cb_arg;
4129 
4130 	/* Close the existing blob */
4131 	spdk_blob_close(blob, _spdk_bs_iter_close_cpl, ctx);
4132 }
4133 
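/*
 * Usage sketch (illustrative): visiting every blob in the store. Each step
 *  opens the next blob; spdk_bs_iter_next() closes the current blob before
 *  advancing, and the walk finishes with bserrno == -ENOENT. The my_ctx
 *  structure is a placeholder for caller state holding the bs pointer.
 *
 *	static void
 *	iter_cb(void *cb_arg, struct spdk_blob *blob, int bserrno)
 *	{
 *		struct my_ctx *ctx = cb_arg;
 *
 *		if (bserrno != 0) {
 *			return;
 *		}
 *		spdk_bs_iter_next(ctx->bs, blob, iter_cb, ctx);
 *	}
 *
 *	spdk_bs_iter_first(bs, iter_cb, ctx);
 */
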
4134 static int
4135 _spdk_blob_set_xattr(struct spdk_blob *blob, const char *name, const void *value,
4136 		     uint16_t value_len, bool internal)
4137 {
4138 	struct spdk_xattr_tailq *xattrs;
4139 	struct spdk_xattr	*xattr;
4140 
4141 	_spdk_blob_verify_md_op(blob);
4142 
4143 	if (blob->md_ro) {
4144 		return -EPERM;
4145 	}
4146 
4147 	if (internal) {
4148 		xattrs = &blob->xattrs_internal;
4149 		blob->invalid_flags |= SPDK_BLOB_INTERNAL_XATTR;
4150 	} else {
4151 		xattrs = &blob->xattrs;
4152 	}
4153 
	TAILQ_FOREACH(xattr, xattrs, link) {
		if (!strcmp(name, xattr->name)) {
			void *tmp;

			/* Allocate the new value before freeing the old one,
			 *  so the existing xattr is left intact if the
			 *  allocation fails.
			 */
			tmp = malloc(value_len);
			if (!tmp) {
				return -ENOMEM;
			}
			memcpy(tmp, value, value_len);
			free(xattr->value);
			xattr->value = tmp;
			xattr->value_len = value_len;

			blob->state = SPDK_BLOB_STATE_DIRTY;

			return 0;
		}
	}

	xattr = calloc(1, sizeof(*xattr));
	if (!xattr) {
		return -ENOMEM;
	}
	xattr->name = strdup(name);
	if (!xattr->name) {
		free(xattr);
		return -ENOMEM;
	}
	xattr->value_len = value_len;
	xattr->value = malloc(value_len);
	if (!xattr->value) {
		free(xattr->name);
		free(xattr);
		return -ENOMEM;
	}
	memcpy(xattr->value, value, value_len);
	TAILQ_INSERT_TAIL(xattrs, xattr, link);
4176 
4177 	blob->state = SPDK_BLOB_STATE_DIRTY;
4178 
4179 	return 0;
4180 }
4181 
4182 int
4183 spdk_blob_set_xattr(struct spdk_blob *blob, const char *name, const void *value,
4184 		    uint16_t value_len)
4185 {
4186 	return _spdk_blob_set_xattr(blob, name, value, value_len, false);
4187 }
4188 
4189 static int
4190 _spdk_blob_remove_xattr(struct spdk_blob *blob, const char *name, bool internal)
4191 {
4192 	struct spdk_xattr_tailq *xattrs;
4193 	struct spdk_xattr	*xattr;
4194 
4195 	_spdk_blob_verify_md_op(blob);
4196 
4197 	if (blob->md_ro) {
4198 		return -EPERM;
4199 	}
4200 	xattrs = internal ? &blob->xattrs_internal : &blob->xattrs;
4201 
4202 	TAILQ_FOREACH(xattr, xattrs, link) {
4203 		if (!strcmp(name, xattr->name)) {
4204 			TAILQ_REMOVE(xattrs, xattr, link);
4205 			free(xattr->value);
4206 			free(xattr->name);
4207 			free(xattr);
4208 
4209 			if (internal && TAILQ_EMPTY(&blob->xattrs_internal)) {
4210 				blob->invalid_flags &= ~SPDK_BLOB_INTERNAL_XATTR;
4211 			}
4212 			blob->state = SPDK_BLOB_STATE_DIRTY;
4213 
4214 			return 0;
4215 		}
4216 	}
4217 
4218 	return -ENOENT;
4219 }
4220 
4221 int
4222 spdk_blob_remove_xattr(struct spdk_blob *blob, const char *name)
4223 {
4224 	return _spdk_blob_remove_xattr(blob, name, false);
4225 }
4226 
4227 static int
4228 _spdk_blob_get_xattr_value(struct spdk_blob *blob, const char *name,
4229 			   const void **value, size_t *value_len, bool internal)
4230 {
4231 	struct spdk_xattr	*xattr;
4232 	struct spdk_xattr_tailq *xattrs;
4233 
4234 	xattrs = internal ? &blob->xattrs_internal : &blob->xattrs;
4235 
4236 	TAILQ_FOREACH(xattr, xattrs, link) {
4237 		if (!strcmp(name, xattr->name)) {
4238 			*value = xattr->value;
4239 			*value_len = xattr->value_len;
4240 			return 0;
4241 		}
4242 	}
4243 	return -ENOENT;
4244 }
4245 
4246 int
4247 spdk_blob_get_xattr_value(struct spdk_blob *blob, const char *name,
4248 			  const void **value, size_t *value_len)
4249 {
4250 	_spdk_blob_verify_md_op(blob);
4251 
4252 	return _spdk_blob_get_xattr_value(blob, name, value, value_len, false);
4253 }
4254 
4255 struct spdk_xattr_names {
4256 	uint32_t	count;
4257 	const char	*names[0];
4258 };
4259 
4260 static int
4261 _spdk_blob_get_xattr_names(struct spdk_xattr_tailq *xattrs, struct spdk_xattr_names **names)
4262 {
4263 	struct spdk_xattr	*xattr;
4264 	int			count = 0;
4265 
4266 	TAILQ_FOREACH(xattr, xattrs, link) {
4267 		count++;
4268 	}
4269 
4270 	*names = calloc(1, sizeof(struct spdk_xattr_names) + count * sizeof(char *));
4271 	if (*names == NULL) {
4272 		return -ENOMEM;
4273 	}
4274 
4275 	TAILQ_FOREACH(xattr, xattrs, link) {
4276 		(*names)->names[(*names)->count++] = xattr->name;
4277 	}
4278 
4279 	return 0;
4280 }
4281 
4282 int
4283 spdk_blob_get_xattr_names(struct spdk_blob *blob, struct spdk_xattr_names **names)
4284 {
4285 	_spdk_blob_verify_md_op(blob);
4286 
4287 	return _spdk_blob_get_xattr_names(&blob->xattrs, names);
4288 }
4289 
4290 uint32_t
4291 spdk_xattr_names_get_count(struct spdk_xattr_names *names)
4292 {
4293 	assert(names != NULL);
4294 
4295 	return names->count;
4296 }
4297 
4298 const char *
4299 spdk_xattr_names_get_name(struct spdk_xattr_names *names, uint32_t index)
4300 {
4301 	if (index >= names->count) {
4302 		return NULL;
4303 	}
4304 
4305 	return names->names[index];
4306 }
4307 
4308 void
4309 spdk_xattr_names_free(struct spdk_xattr_names *names)
4310 {
4311 	free(names);
4312 }
4313 
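/*
 * Usage sketch (illustrative): setting an xattr and enumerating the names on a
 *  blob from the metadata thread.
 *
 *	struct spdk_xattr_names *names;
 *	uint32_t i;
 *
 *	spdk_blob_set_xattr(blob, "name", "hello", sizeof("hello"));
 *	if (spdk_blob_get_xattr_names(blob, &names) == 0) {
 *		for (i = 0; i < spdk_xattr_names_get_count(names); i++) {
 *			SPDK_NOTICELOG("xattr: %s\n",
 *				       spdk_xattr_names_get_name(names, i));
 *		}
 *		spdk_xattr_names_free(names);
 *	}
 */
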
4314 struct spdk_bs_type
4315 spdk_bs_get_bstype(struct spdk_blob_store *bs)
4316 {
4317 	return bs->bstype;
4318 }
4319 
4320 void
4321 spdk_bs_set_bstype(struct spdk_blob_store *bs, struct spdk_bs_type bstype)
4322 {
4323 	memcpy(&bs->bstype, &bstype, sizeof(bstype));
4324 }
4325 
4326 SPDK_LOG_REGISTER_COMPONENT("blob", SPDK_LOG_BLOB)
4327