xref: /spdk/lib/blob/blobstore.c (revision 95b478cc707eeb308e0fd44180b4c24e75e6a76b)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 
36 #include "spdk/blob.h"
37 #include "spdk/crc32.h"
38 #include "spdk/env.h"
39 #include "spdk/queue.h"
40 #include "spdk/thread.h"
41 #include "spdk/bit_array.h"
42 #include "spdk/likely.h"
43 #include "spdk/util.h"
44 #include "spdk/string.h"
45 
46 #include "spdk_internal/assert.h"
47 #include "spdk_internal/log.h"
48 
49 #include "blobstore.h"
50 
51 #define BLOB_CRC32C_INITIAL    0xffffffffUL
52 
53 static int spdk_bs_register_md_thread(struct spdk_blob_store *bs);
54 static int spdk_bs_unregister_md_thread(struct spdk_blob_store *bs);
55 static void _spdk_blob_close_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno);
56 static void _spdk_blob_insert_cluster_on_md_thread(struct spdk_blob *blob, uint32_t cluster_num,
57 		uint64_t cluster, uint32_t extent, spdk_blob_op_complete cb_fn, void *cb_arg);
58 
59 static int _spdk_blob_set_xattr(struct spdk_blob *blob, const char *name, const void *value,
60 				uint16_t value_len, bool internal);
61 static int _spdk_blob_get_xattr_value(struct spdk_blob *blob, const char *name,
62 				      const void **value, size_t *value_len, bool internal);
63 static int _spdk_blob_remove_xattr(struct spdk_blob *blob, const char *name, bool internal);
64 
65 static void _spdk_blob_insert_extent(struct spdk_blob *blob, uint32_t extent, uint64_t cluster_num,
66 				     spdk_blob_op_complete cb_fn, void *cb_arg);
67 
68 static void
69 _spdk_blob_verify_md_op(struct spdk_blob *blob)
70 {
71 	assert(blob != NULL);
72 	assert(spdk_get_thread() == blob->bs->md_thread);
73 	assert(blob->state != SPDK_BLOB_STATE_LOADING);
74 }
75 
76 static struct spdk_blob_list *
77 _spdk_bs_get_snapshot_entry(struct spdk_blob_store *bs, spdk_blob_id blobid)
78 {
79 	struct spdk_blob_list *snapshot_entry = NULL;
80 
81 	TAILQ_FOREACH(snapshot_entry, &bs->snapshots, link) {
82 		if (snapshot_entry->id == blobid) {
83 			break;
84 		}
85 	}
86 
87 	return snapshot_entry;
88 }
89 
90 static void
91 _spdk_bs_claim_md_page(struct spdk_blob_store *bs, uint32_t page)
92 {
93 	assert(page < spdk_bit_array_capacity(bs->used_md_pages));
94 	assert(spdk_bit_array_get(bs->used_md_pages, page) == false);
95 
96 	spdk_bit_array_set(bs->used_md_pages, page);
97 }
98 
99 static void
100 _spdk_bs_release_md_page(struct spdk_blob_store *bs, uint32_t page)
101 {
102 	assert(page < spdk_bit_array_capacity(bs->used_md_pages));
103 	assert(spdk_bit_array_get(bs->used_md_pages, page) == true);
104 
105 	spdk_bit_array_clear(bs->used_md_pages, page);
106 }
107 
108 static void
109 _spdk_bs_claim_cluster(struct spdk_blob_store *bs, uint32_t cluster_num)
110 {
111 	assert(cluster_num < spdk_bit_array_capacity(bs->used_clusters));
112 	assert(spdk_bit_array_get(bs->used_clusters, cluster_num) == false);
113 	assert(bs->num_free_clusters > 0);
114 
115 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Claiming cluster %u\n", cluster_num);
116 
117 	spdk_bit_array_set(bs->used_clusters, cluster_num);
118 	bs->num_free_clusters--;
119 }
120 
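/* Note (derived from the function below): the per-blob cluster map stores
 * device LBAs, not cluster indices. LBA 0 belongs to blobstore metadata, so
 * 0 is safe as the "unallocated" sentinel for thin-provisioned clusters,
 * which is why a nonzero entry means the slot is already backed.
 */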
121 static int
122 _spdk_blob_insert_cluster(struct spdk_blob *blob, uint32_t cluster_num, uint64_t cluster)
123 {
124 	uint64_t *cluster_lba = &blob->active.clusters[cluster_num];
125 
126 	_spdk_blob_verify_md_op(blob);
127 
128 	if (*cluster_lba != 0) {
129 		return -EEXIST;
130 	}
131 
132 	*cluster_lba = _spdk_bs_cluster_to_lba(blob->bs, cluster);
133 	return 0;
134 }
135 
136 static int
137 _spdk_bs_allocate_cluster(struct spdk_blob *blob, uint32_t cluster_num,
138 			  uint64_t *lowest_free_cluster, uint32_t *lowest_free_md_page, bool update_map)
139 {
140 	uint32_t *extent_page = _spdk_bs_cluster_to_extent_page(blob, cluster_num);
141 
142 	pthread_mutex_lock(&blob->bs->used_clusters_mutex);
143 	*lowest_free_cluster = spdk_bit_array_find_first_clear(blob->bs->used_clusters,
144 			       *lowest_free_cluster);
145 	if (*lowest_free_cluster == UINT32_MAX) {
146 		/* No more free clusters. Cannot satisfy the request */
147 		pthread_mutex_unlock(&blob->bs->used_clusters_mutex);
148 		return -ENOSPC;
149 	}
150 
151 	if (extent_page != NULL && *extent_page == 0) {
152 		/* No extent_page is allocated for the cluster */
153 		*lowest_free_md_page = spdk_bit_array_find_first_clear(blob->bs->used_md_pages,
154 				       *lowest_free_md_page);
155 		if (*lowest_free_md_page == UINT32_MAX) {
156 			/* No more free md pages. Cannot satisfy the request */
157 			pthread_mutex_unlock(&blob->bs->used_clusters_mutex);
158 			return -ENOSPC;
159 		}
160 		_spdk_bs_claim_md_page(blob->bs, *lowest_free_md_page);
161 	}
162 
163 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Claiming cluster %lu for blob %lu\n", *lowest_free_cluster, blob->id);
164 	_spdk_bs_claim_cluster(blob->bs, *lowest_free_cluster);
165 
166 	pthread_mutex_unlock(&blob->bs->used_clusters_mutex);
167 
168 	if (update_map) {
169 		_spdk_blob_insert_cluster(blob, cluster_num, *lowest_free_cluster);
170 		if (extent_page != NULL && *extent_page == 0) {
171 			*extent_page = *lowest_free_md_page;
172 		}
173 	}
174 
175 	return 0;
176 }
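/* The lowest_free_cluster / lowest_free_md_page arguments double as search
 * cursors: a caller allocating many clusters in a loop (see
 * _spdk_blob_resize()) feeds each result back in, so every bit-array scan
 * resumes where the previous one stopped instead of restarting from 0.
 */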
177 
178 static void
179 _spdk_bs_release_cluster(struct spdk_blob_store *bs, uint32_t cluster_num)
180 {
181 	assert(cluster_num < spdk_bit_array_capacity(bs->used_clusters));
182 	assert(spdk_bit_array_get(bs->used_clusters, cluster_num) == true);
183 	assert(bs->num_free_clusters < bs->total_clusters);
184 
185 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Releasing cluster %u\n", cluster_num);
186 
187 	pthread_mutex_lock(&bs->used_clusters_mutex);
188 	spdk_bit_array_clear(bs->used_clusters, cluster_num);
189 	bs->num_free_clusters++;
190 	pthread_mutex_unlock(&bs->used_clusters_mutex);
191 }
192 
193 static void
194 _spdk_blob_xattrs_init(struct spdk_blob_xattr_opts *xattrs)
195 {
196 	xattrs->count = 0;
197 	xattrs->names = NULL;
198 	xattrs->ctx = NULL;
199 	xattrs->get_value = NULL;
200 }
201 
202 void
203 spdk_blob_opts_init(struct spdk_blob_opts *opts)
204 {
205 	opts->num_clusters = 0;
206 	opts->thin_provision = false;
207 	opts->clear_method = BLOB_CLEAR_WITH_DEFAULT;
208 	_spdk_blob_xattrs_init(&opts->xattrs);
209 	opts->use_extent_table = false;
210 }
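/* Illustrative sketch (not part of the original file): the intended calling
 * convention is to initialize the opts first and then override individual
 * fields, so fields added in later versions keep sane defaults. The helper
 * name is hypothetical; spdk_bs_create_blob_ext() is the public creation API.
 */
#if 0
static void
example_create_thin_blob(struct spdk_blob_store *bs,
			 spdk_blob_op_with_id_complete cb_fn, void *cb_arg)
{
	struct spdk_blob_opts opts;

	spdk_blob_opts_init(&opts);	/* defaults first */
	opts.thin_provision = true;	/* then override what differs */
	opts.num_clusters = 10;

	spdk_bs_create_blob_ext(bs, &opts, cb_fn, cb_arg);
}
#endif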
211 
212 void
213 spdk_blob_open_opts_init(struct spdk_blob_open_opts *opts)
214 {
215 	opts->clear_method = BLOB_CLEAR_WITH_DEFAULT;
216 }
217 
218 static struct spdk_blob *
219 _spdk_blob_alloc(struct spdk_blob_store *bs, spdk_blob_id id)
220 {
221 	struct spdk_blob *blob;
222 
223 	blob = calloc(1, sizeof(*blob));
224 	if (!blob) {
225 		return NULL;
226 	}
227 
228 	blob->id = id;
229 	blob->bs = bs;
230 
231 	blob->parent_id = SPDK_BLOBID_INVALID;
232 
233 	blob->state = SPDK_BLOB_STATE_DIRTY;
234 	blob->extent_rle_found = false;
235 	blob->extent_table_found = false;
236 	blob->active.num_pages = 1;
237 	blob->active.pages = calloc(1, sizeof(*blob->active.pages));
238 	if (!blob->active.pages) {
239 		free(blob);
240 		return NULL;
241 	}
242 
243 	blob->active.pages[0] = _spdk_bs_blobid_to_page(id);
244 
245 	TAILQ_INIT(&blob->xattrs);
246 	TAILQ_INIT(&blob->xattrs_internal);
247 
248 	return blob;
249 }
250 
251 static void
252 _spdk_xattrs_free(struct spdk_xattr_tailq *xattrs)
253 {
254 	struct spdk_xattr	*xattr, *xattr_tmp;
255 
256 	TAILQ_FOREACH_SAFE(xattr, xattrs, link, xattr_tmp) {
257 		TAILQ_REMOVE(xattrs, xattr, link);
258 		free(xattr->name);
259 		free(xattr->value);
260 		free(xattr);
261 	}
262 }
263 
264 static void
265 _spdk_blob_free(struct spdk_blob *blob)
266 {
267 	assert(blob != NULL);
268 
269 	free(blob->active.extent_pages);
270 	free(blob->clean.extent_pages);
271 	free(blob->active.clusters);
272 	free(blob->clean.clusters);
273 	free(blob->active.pages);
274 	free(blob->clean.pages);
275 
276 	_spdk_xattrs_free(&blob->xattrs);
277 	_spdk_xattrs_free(&blob->xattrs_internal);
278 
279 	if (blob->back_bs_dev) {
280 		blob->back_bs_dev->destroy(blob->back_bs_dev);
281 	}
282 
283 	free(blob);
284 }
285 
286 struct freeze_io_ctx {
287 	struct spdk_bs_cpl cpl;
288 	struct spdk_blob *blob;
289 };
290 
291 static void
292 _spdk_blob_io_sync(struct spdk_io_channel_iter *i)
293 {
294 	spdk_for_each_channel_continue(i, 0);
295 }
296 
297 static void
298 _spdk_blob_execute_queued_io(struct spdk_io_channel_iter *i)
299 {
300 	struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i);
301 	struct spdk_bs_channel *ch = spdk_io_channel_get_ctx(_ch);
302 	struct freeze_io_ctx *ctx = spdk_io_channel_iter_get_ctx(i);
303 	struct spdk_bs_request_set	*set;
304 	struct spdk_bs_user_op_args	*args;
305 	spdk_bs_user_op_t *op, *tmp;
306 
307 	TAILQ_FOREACH_SAFE(op, &ch->queued_io, link, tmp) {
308 		set = (struct spdk_bs_request_set *)op;
309 		args = &set->u.user_op;
310 
311 		if (args->blob == ctx->blob) {
312 			TAILQ_REMOVE(&ch->queued_io, op, link);
313 			spdk_bs_user_op_execute(op);
314 		}
315 	}
316 
317 	spdk_for_each_channel_continue(i, 0);
318 }
319 
320 static void
321 _spdk_blob_io_cpl(struct spdk_io_channel_iter *i, int status)
322 {
323 	struct freeze_io_ctx *ctx = spdk_io_channel_iter_get_ctx(i);
324 
325 	ctx->cpl.u.blob_basic.cb_fn(ctx->cpl.u.blob_basic.cb_arg, 0);
326 
327 	free(ctx);
328 }
329 
330 static void
331 _spdk_blob_freeze_io(struct spdk_blob *blob, spdk_blob_op_complete cb_fn, void *cb_arg)
332 {
333 	struct freeze_io_ctx *ctx;
334 
335 	ctx = calloc(1, sizeof(*ctx));
336 	if (!ctx) {
337 		cb_fn(cb_arg, -ENOMEM);
338 		return;
339 	}
340 
341 	ctx->cpl.type = SPDK_BS_CPL_TYPE_BS_BASIC;
342 	ctx->cpl.u.blob_basic.cb_fn = cb_fn;
343 	ctx->cpl.u.blob_basic.cb_arg = cb_arg;
344 	ctx->blob = blob;
345 
346 	/* Freeze I/O on blob */
347 	blob->frozen_refcnt++;
348 
349 	if (blob->frozen_refcnt == 1) {
350 		spdk_for_each_channel(blob->bs, _spdk_blob_io_sync, ctx, _spdk_blob_io_cpl);
351 	} else {
352 		cb_fn(cb_arg, 0);
353 		free(ctx);
354 	}
355 }
356 
357 static void
358 _spdk_blob_unfreeze_io(struct spdk_blob *blob, spdk_blob_op_complete cb_fn, void *cb_arg)
359 {
360 	struct freeze_io_ctx *ctx;
361 
362 	ctx = calloc(1, sizeof(*ctx));
363 	if (!ctx) {
364 		cb_fn(cb_arg, -ENOMEM);
365 		return;
366 	}
367 
368 	ctx->cpl.type = SPDK_BS_CPL_TYPE_BS_BASIC;
369 	ctx->cpl.u.blob_basic.cb_fn = cb_fn;
370 	ctx->cpl.u.blob_basic.cb_arg = cb_arg;
371 	ctx->blob = blob;
372 
373 	assert(blob->frozen_refcnt > 0);
374 
375 	blob->frozen_refcnt--;
376 
377 	if (blob->frozen_refcnt == 0) {
378 		spdk_for_each_channel(blob->bs, _spdk_blob_execute_queued_io, ctx, _spdk_blob_io_cpl);
379 	} else {
380 		cb_fn(cb_arg, 0);
381 		free(ctx);
382 	}
383 }
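/* Sketch of the freeze nesting implied above (hypothetical call sequence,
 * not original code): only the 0 -> 1 transition fans out across all
 * channels, and only the 1 -> 0 transition replays queued I/O; inner
 * freeze/unfreeze pairs complete immediately.
 */
#if 0
	_spdk_blob_freeze_io(blob, cb_a, arg_a);   /* refcnt 0 -> 1: for-each-channel sync */
	_spdk_blob_freeze_io(blob, cb_b, arg_b);   /* refcnt 1 -> 2: completes immediately */
	_spdk_blob_unfreeze_io(blob, cb_c, arg_c); /* refcnt 2 -> 1: completes immediately */
	_spdk_blob_unfreeze_io(blob, cb_d, arg_d); /* refcnt 1 -> 0: executes queued I/O */
#endif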
384 
385 static int
386 _spdk_blob_mark_clean(struct spdk_blob *blob)
387 {
388 	uint32_t *extent_pages = NULL;
389 	uint64_t *clusters = NULL;
390 	uint32_t *pages = NULL;
391 
392 	assert(blob != NULL);
393 
394 	if (blob->active.num_extent_pages) {
395 		assert(blob->active.extent_pages);
396 		extent_pages = calloc(blob->active.num_extent_pages, sizeof(*blob->active.extent_pages));
397 		if (!extent_pages) {
398 			return -ENOMEM;
399 		}
400 		memcpy(extent_pages, blob->active.extent_pages,
401 		       blob->active.num_extent_pages * sizeof(*extent_pages));
402 	}
403 
404 	if (blob->active.num_clusters) {
405 		assert(blob->active.clusters);
406 		clusters = calloc(blob->active.num_clusters, sizeof(*blob->active.clusters));
407 		if (!clusters) {
408 			free(extent_pages);
409 			return -ENOMEM;
410 		}
411 		memcpy(clusters, blob->active.clusters, blob->active.num_clusters * sizeof(*blob->active.clusters));
412 	}
413 
414 	if (blob->active.num_pages) {
415 		assert(blob->active.pages);
416 		pages = calloc(blob->active.num_pages, sizeof(*blob->active.pages));
417 		if (!pages) {
418 			free(extent_pages);
419 			free(clusters);
420 			return -ENOMEM;
421 		}
422 		memcpy(pages, blob->active.pages, blob->active.num_pages * sizeof(*blob->active.pages));
423 	}
424 
425 	free(blob->clean.extent_pages);
426 	free(blob->clean.clusters);
427 	free(blob->clean.pages);
428 
429 	blob->clean.num_extent_pages = blob->active.num_extent_pages;
430 	blob->clean.extent_pages = blob->active.extent_pages;
431 	blob->clean.num_clusters = blob->active.num_clusters;
432 	blob->clean.clusters = blob->active.clusters;
433 	blob->clean.num_pages = blob->active.num_pages;
434 	blob->clean.pages = blob->active.pages;
435 
436 	blob->active.extent_pages = extent_pages;
437 	blob->active.clusters = clusters;
438 	blob->active.pages = pages;
439 
440 	/* If the metadata was dirtied again while the metadata was being written to disk,
441 	 *  we do not want to revert the DIRTY state back to CLEAN here.
442 	 */
443 	if (blob->state == SPDK_BLOB_STATE_LOADING) {
444 		blob->state = SPDK_BLOB_STATE_CLEAN;
445 	}
446 
447 	return 0;
448 }
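/* In other words: after a successful load or persist, the arrays describing
 * the on-disk state are parked in blob->clean, while blob->active continues
 * as a separate, identical copy that later in-memory mutations may grow,
 * shrink, or abandon without losing track of what is actually on disk.
 */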
449 
450 static int
451 _spdk_blob_deserialize_xattr(struct spdk_blob *blob,
452 			     struct spdk_blob_md_descriptor_xattr *desc_xattr, bool internal)
453 {
454 	struct spdk_xattr                       *xattr;
455 
456 	if (desc_xattr->length != sizeof(desc_xattr->name_length) +
457 	    sizeof(desc_xattr->value_length) +
458 	    desc_xattr->name_length + desc_xattr->value_length) {
459 		return -EINVAL;
460 	}
461 
462 	xattr = calloc(1, sizeof(*xattr));
463 	if (xattr == NULL) {
464 		return -ENOMEM;
465 	}
466 
467 	xattr->name = malloc(desc_xattr->name_length + 1);
468 	if (xattr->name == NULL) {
469 		free(xattr);
470 		return -ENOMEM;
471 	}
472 	memcpy(xattr->name, desc_xattr->name, desc_xattr->name_length);
473 	xattr->name[desc_xattr->name_length] = '\0';
474 
475 	xattr->value = malloc(desc_xattr->value_length);
476 	if (xattr->value == NULL) {
477 		free(xattr->name);
478 		free(xattr);
479 		return -ENOMEM;
480 	}
481 	xattr->value_len = desc_xattr->value_length;
482 	memcpy(xattr->value,
483 	       (void *)((uintptr_t)desc_xattr->name + desc_xattr->name_length),
484 	       desc_xattr->value_length);
485 
486 	TAILQ_INSERT_TAIL(internal ? &blob->xattrs_internal : &blob->xattrs, xattr, link);
487 
488 	return 0;
489 }
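/* On-disk layout consumed above, reconstructed from the length check (field
 * widths are those of struct spdk_blob_md_descriptor_xattr):
 *
 *   +------+--------+-------------+--------------+---------+----------+
 *   | type | length | name_length | value_length | name... | value... |
 *   +------+--------+-------------+--------------+---------+----------+
 *                   <-------------------- length -------------------->
 *
 * The name is not NUL-terminated on disk; the terminator is added here.
 */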
490 
492 static int
493 _spdk_blob_parse_page(const struct spdk_blob_md_page *page, struct spdk_blob *blob)
494 {
495 	struct spdk_blob_md_descriptor *desc;
496 	size_t	cur_desc = 0;
497 	void *tmp;
498 
499 	desc = (struct spdk_blob_md_descriptor *)page->descriptors;
500 	while (cur_desc < sizeof(page->descriptors)) {
501 		if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_PADDING) {
502 			if (desc->length == 0) {
503 				/* If padding and length are 0, this terminates the page */
504 				break;
505 			}
506 		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_FLAGS) {
507 			struct spdk_blob_md_descriptor_flags	*desc_flags;
508 
509 			desc_flags = (struct spdk_blob_md_descriptor_flags *)desc;
510 
511 			if (desc_flags->length != sizeof(*desc_flags) - sizeof(*desc)) {
512 				return -EINVAL;
513 			}
514 
515 			if ((desc_flags->invalid_flags | SPDK_BLOB_INVALID_FLAGS_MASK) !=
516 			    SPDK_BLOB_INVALID_FLAGS_MASK) {
517 				return -EINVAL;
518 			}
519 
520 			if ((desc_flags->data_ro_flags | SPDK_BLOB_DATA_RO_FLAGS_MASK) !=
521 			    SPDK_BLOB_DATA_RO_FLAGS_MASK) {
522 				blob->data_ro = true;
523 				blob->md_ro = true;
524 			}
525 
526 			if ((desc_flags->md_ro_flags | SPDK_BLOB_MD_RO_FLAGS_MASK) !=
527 			    SPDK_BLOB_MD_RO_FLAGS_MASK) {
528 				blob->md_ro = true;
529 			}
530 
531 			if ((desc_flags->data_ro_flags & SPDK_BLOB_READ_ONLY)) {
532 				blob->data_ro = true;
533 				blob->md_ro = true;
534 			}
535 
536 			blob->invalid_flags = desc_flags->invalid_flags;
537 			blob->data_ro_flags = desc_flags->data_ro_flags;
538 			blob->md_ro_flags = desc_flags->md_ro_flags;
539 
540 		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_EXTENT_RLE) {
541 			struct spdk_blob_md_descriptor_extent_rle	*desc_extent_rle;
542 			unsigned int				i, j;
543 			unsigned int				cluster_count = blob->active.num_clusters;
544 
545 			if (blob->extent_table_found) {
546 				/* An Extent Table descriptor is already present in the md;
547 				 * the two descriptor types must never appear at the same time. */
548 				return -EINVAL;
549 			}
550 			blob->extent_rle_found = true;
551 
552 			desc_extent_rle = (struct spdk_blob_md_descriptor_extent_rle *)desc;
553 
554 			if (desc_extent_rle->length == 0 ||
555 			    (desc_extent_rle->length % sizeof(desc_extent_rle->extents[0]) != 0)) {
556 				return -EINVAL;
557 			}
558 
559 			for (i = 0; i < desc_extent_rle->length / sizeof(desc_extent_rle->extents[0]); i++) {
560 				for (j = 0; j < desc_extent_rle->extents[i].length; j++) {
561 					if (desc_extent_rle->extents[i].cluster_idx != 0) {
562 						if (!spdk_bit_array_get(blob->bs->used_clusters,
563 									desc_extent_rle->extents[i].cluster_idx + j)) {
564 							return -EINVAL;
565 						}
566 					}
567 					cluster_count++;
568 				}
569 			}
570 
571 			if (cluster_count == 0) {
572 				return -EINVAL;
573 			}
574 			tmp = realloc(blob->active.clusters, cluster_count * sizeof(*blob->active.clusters));
575 			if (tmp == NULL) {
576 				return -ENOMEM;
577 			}
578 			blob->active.clusters = tmp;
579 			blob->active.cluster_array_size = cluster_count;
580 
581 			for (i = 0; i < desc_extent_rle->length / sizeof(desc_extent_rle->extents[0]); i++) {
582 				for (j = 0; j < desc_extent_rle->extents[i].length; j++) {
583 					if (desc_extent_rle->extents[i].cluster_idx != 0) {
584 						blob->active.clusters[blob->active.num_clusters++] = _spdk_bs_cluster_to_lba(blob->bs,
585 								desc_extent_rle->extents[i].cluster_idx + j);
586 					} else if (spdk_blob_is_thin_provisioned(blob)) {
587 						blob->active.clusters[blob->active.num_clusters++] = 0;
588 					} else {
589 						return -EINVAL;
590 					}
591 				}
592 			}
593 		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_EXTENT_TABLE) {
594 			struct spdk_blob_md_descriptor_extent_table *desc_extent_table;
595 			uint32_t num_extent_pages = blob->active.num_extent_pages;
596 			uint32_t i, j;
597 			size_t extent_pages_length;
598 
599 			desc_extent_table = (struct spdk_blob_md_descriptor_extent_table *)desc;
600 			extent_pages_length = desc_extent_table->length - sizeof(desc_extent_table->num_clusters);
601 
602 			if (blob->extent_rle_found) {
603 				/* An Extent RLE descriptor is already present in the md;
604 				 * the two descriptor types must never appear at the same time. */
605 				return -EINVAL;
606 			}
607 			blob->extent_table_found = true;
608 
609 			if (desc_extent_table->length == 0 ||
610 			    (extent_pages_length % sizeof(desc_extent_table->extent_page[0]) != 0)) {
611 				return -EINVAL;
612 			}
613 
614 			for (i = 0; i < extent_pages_length / sizeof(desc_extent_table->extent_page[0]); i++) {
615 				num_extent_pages += desc_extent_table->extent_page[i].num_pages;
616 			}
617 
618 			tmp = realloc(blob->active.extent_pages, num_extent_pages * sizeof(uint32_t));
619 			if (tmp == NULL) {
620 				return -ENOMEM;
621 			}
622 			blob->active.extent_pages = tmp;
623 			blob->active.extent_pages_array_size = num_extent_pages;
624 
625 			blob->num_clusters_in_et = desc_extent_table->num_clusters;
626 
627 			/* Extent table entries contain md page numbers for extent pages.
628 			 * Zeroes represent unallocated extent pages, which are run-length-encoded.
629 			 */
630 			for (i = 0; i < extent_pages_length / sizeof(desc_extent_table->extent_page[0]); i++) {
631 				if (desc_extent_table->extent_page[i].page_idx != 0) {
632 					assert(desc_extent_table->extent_page[i].num_pages == 1);
633 					blob->active.extent_pages[blob->active.num_extent_pages++] =
634 						desc_extent_table->extent_page[i].page_idx;
635 				} else if (spdk_blob_is_thin_provisioned(blob)) {
636 					for (j = 0; j < desc_extent_table->extent_page[i].num_pages; j++) {
637 						blob->active.extent_pages[blob->active.num_extent_pages++] = 0;
638 					}
639 				} else {
640 					return -EINVAL;
641 				}
642 			}
643 		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_EXTENT_PAGE) {
644 			struct spdk_blob_md_descriptor_extent_page	*desc_extent;
645 			unsigned int					i;
646 			unsigned int					cluster_count = blob->active.num_clusters;
647 
648 			if (blob->extent_rle_found) {
649 				/* An Extent RLE descriptor is already present in the md;
650 				 * the two descriptor types must never appear at the same time. */
651 				return -EINVAL;
652 			}
653 
654 			desc_extent = (struct spdk_blob_md_descriptor_extent_page *)desc;
655 
656 			if (desc_extent->length == 0 ||
657 			    (desc_extent->length % sizeof(desc_extent->cluster_idx[0]) != 0)) {
658 				return -EINVAL;
659 			}
660 
661 			for (i = 0; i < desc_extent->length / sizeof(desc_extent->cluster_idx[0]); i++) {
662 				if (desc_extent->cluster_idx[i] != 0) {
663 					if (!spdk_bit_array_get(blob->bs->used_clusters, desc_extent->cluster_idx[i])) {
664 						return -EINVAL;
665 					}
666 				}
667 				cluster_count++;
668 			}
669 
670 			if (cluster_count == 0) {
671 				return -EINVAL;
672 			}
673 			tmp = realloc(blob->active.clusters, cluster_count * sizeof(*blob->active.clusters));
674 			if (tmp == NULL) {
675 				return -ENOMEM;
676 			}
677 			blob->active.clusters = tmp;
678 			blob->active.cluster_array_size = cluster_count;
679 
680 			for (i = 0; i < desc_extent->length / sizeof(desc_extent->cluster_idx[0]); i++) {
681 				if (desc_extent->cluster_idx[i] != 0) {
682 					blob->active.clusters[blob->active.num_clusters++] = _spdk_bs_cluster_to_lba(blob->bs,
683 							desc_extent->cluster_idx[i]);
684 				} else if (spdk_blob_is_thin_provisioned(blob)) {
685 					blob->active.clusters[blob->active.num_clusters++] = 0;
686 				} else {
687 					return -EINVAL;
688 				}
689 			}
690 		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR) {
691 			int rc;
692 
693 			rc = _spdk_blob_deserialize_xattr(blob,
694 							  (struct spdk_blob_md_descriptor_xattr *) desc, false);
695 			if (rc != 0) {
696 				return rc;
697 			}
698 		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR_INTERNAL) {
699 			int rc;
700 
701 			rc = _spdk_blob_deserialize_xattr(blob,
702 							  (struct spdk_blob_md_descriptor_xattr *) desc, true);
703 			if (rc != 0) {
704 				return rc;
705 			}
706 		} else {
707 			/* Unrecognized descriptor type.  Do not fail - just continue to the
708 			 *  next descriptor.  If this descriptor is associated with some feature
709 			 *  defined in a newer version of blobstore, that version of blobstore
710 			 *  should create and set an associated feature flag to specify if this
711 			 *  blob can be loaded or not.
712 			 */
713 		}
714 
715 		/* Advance to the next descriptor */
716 		cur_desc += sizeof(*desc) + desc->length;
717 		if (cur_desc + sizeof(*desc) > sizeof(page->descriptors)) {
718 			break;
719 		}
720 		desc = (struct spdk_blob_md_descriptor *)((uintptr_t)page->descriptors + cur_desc);
721 	}
722 
723 	return 0;
724 }
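/* Summary of the framing used above: every descriptor is a (type, length)
 * header followed by length bytes of payload. A PADDING descriptor with
 * length 0 terminates the page early; otherwise parsing stops once the next
 * header would no longer fit inside page->descriptors.
 */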
725 
726 static int
727 _spdk_blob_parse(const struct spdk_blob_md_page *pages, uint32_t page_count,
728 		 struct spdk_blob *blob)
729 {
730 	const struct spdk_blob_md_page *page;
731 	uint32_t i;
732 	int rc;
733 
734 	assert(page_count > 0);
735 	assert(pages[0].sequence_num == 0);
736 	assert(blob != NULL);
737 	assert(blob->state == SPDK_BLOB_STATE_LOADING);
738 	assert(blob->active.clusters == NULL);
739 
740 	/* The blobid provided may not match what's in the MD; this can
741 	 * happen, for example, if a bogus blobid is passed in through open.
742 	 */
743 	if (blob->id != pages[0].id) {
744 		SPDK_ERRLOG("Blobid (%lu) doesn't match what's in metadata (%lu)\n",
745 			    blob->id, pages[0].id);
746 		return -ENOENT;
747 	}
748 
749 	for (i = 0; i < page_count; i++) {
750 		page = &pages[i];
751 
752 		assert(page->id == blob->id);
753 		assert(page->sequence_num == i);
754 
755 		rc = _spdk_blob_parse_page(page, blob);
756 		if (rc != 0) {
757 			return rc;
758 		}
759 	}
760 
761 	return 0;
762 }
763 
764 static int
765 _spdk_blob_serialize_add_page(const struct spdk_blob *blob,
766 			      struct spdk_blob_md_page **pages,
767 			      uint32_t *page_count,
768 			      struct spdk_blob_md_page **last_page)
769 {
770 	struct spdk_blob_md_page *page;
771 
772 	assert(pages != NULL);
773 	assert(page_count != NULL);
774 
775 	if (*page_count == 0) {
776 		assert(*pages == NULL);
777 		*page_count = 1;
778 		*pages = spdk_malloc(SPDK_BS_PAGE_SIZE, SPDK_BS_PAGE_SIZE,
779 				     NULL, SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
780 	} else {
781 		assert(*pages != NULL);
782 		(*page_count)++;
783 		*pages = spdk_realloc(*pages,
784 				      SPDK_BS_PAGE_SIZE * (*page_count),
785 				      SPDK_BS_PAGE_SIZE);
786 	}
787 
788 	if (*pages == NULL) {
789 		*page_count = 0;
790 		*last_page = NULL;
791 		return -ENOMEM;
792 	}
793 
794 	page = &(*pages)[*page_count - 1];
795 	memset(page, 0, sizeof(*page));
796 	page->id = blob->id;
797 	page->sequence_num = *page_count - 1;
798 	page->next = SPDK_INVALID_MD_PAGE;
799 	*last_page = page;
800 
801 	return 0;
802 }
803 
804 /* Transform the in-memory representation 'xattr' into an on-disk xattr descriptor.
805  * Update required_sz on both success and failure.
806  */
808 static int
809 _spdk_blob_serialize_xattr(const struct spdk_xattr *xattr,
810 			   uint8_t *buf, size_t buf_sz,
811 			   size_t *required_sz, bool internal)
812 {
813 	struct spdk_blob_md_descriptor_xattr	*desc;
814 
815 	*required_sz = sizeof(struct spdk_blob_md_descriptor_xattr) +
816 		       strlen(xattr->name) +
817 		       xattr->value_len;
818 
819 	if (buf_sz < *required_sz) {
820 		return -1;
821 	}
822 
823 	desc = (struct spdk_blob_md_descriptor_xattr *)buf;
824 
825 	desc->type = internal ? SPDK_MD_DESCRIPTOR_TYPE_XATTR_INTERNAL : SPDK_MD_DESCRIPTOR_TYPE_XATTR;
826 	desc->length = sizeof(desc->name_length) +
827 		       sizeof(desc->value_length) +
828 		       strlen(xattr->name) +
829 		       xattr->value_len;
830 	desc->name_length = strlen(xattr->name);
831 	desc->value_length = xattr->value_len;
832 
833 	memcpy(desc->name, xattr->name, desc->name_length);
834 	memcpy((void *)((uintptr_t)desc->name + desc->name_length),
835 	       xattr->value,
836 	       desc->value_length);
837 
838 	return 0;
839 }
840 
841 static void
842 _spdk_blob_serialize_extent_table_entry(const struct spdk_blob *blob,
843 					uint64_t start_ep, uint64_t *next_ep,
844 					uint8_t **buf, size_t *remaining_sz)
845 {
846 	struct spdk_blob_md_descriptor_extent_table *desc;
847 	size_t cur_sz;
848 	uint64_t i, et_idx;
849 	uint32_t extent_page, ep_len;
850 
851 	/* The buffer must have room for at least one extent page */
852 	cur_sz = sizeof(struct spdk_blob_md_descriptor) + sizeof(desc->num_clusters) +
853 		 sizeof(desc->extent_page[0]);
854 	if (*remaining_sz < cur_sz) {
855 		*next_ep = start_ep;
856 		return;
857 	}
858 
859 	desc = (struct spdk_blob_md_descriptor_extent_table *)*buf;
860 	desc->type = SPDK_MD_DESCRIPTOR_TYPE_EXTENT_TABLE;
861 
862 	desc->num_clusters = blob->active.num_clusters;
863 
864 	extent_page = blob->active.extent_pages[start_ep];
865 	ep_len = 1;
866 	et_idx = 0;
867 	for (i = start_ep + 1; i < blob->active.num_extent_pages; i++) {
868 		/* Extent table entries contain md page offsets for extent pages.
869 		 * Zeroes represent unallocated extent pages, which are run-length-encoded.
870 		 */
871 		if (extent_page == 0 && blob->active.extent_pages[i] == 0) {
872 			ep_len++;
873 			continue;
874 		}
875 		desc->extent_page[et_idx].page_idx = extent_page;
876 		desc->extent_page[et_idx].num_pages = ep_len;
877 		et_idx++;
878 
879 		cur_sz += sizeof(desc->extent_page[et_idx]);
880 
881 		if (*remaining_sz < cur_sz) {
882 			/* If we ran out of buffer space, return */
883 			*next_ep = i;
884 			break;
885 		}
886 		extent_page = blob->active.extent_pages[i];
887 		ep_len = 1;
888 	}
889 
890 	if (*remaining_sz >= cur_sz) {
891 		desc->extent_page[et_idx].page_idx = extent_page;
892 		desc->extent_page[et_idx].num_pages = ep_len;
893 		et_idx++;
894 
895 		*next_ep = blob->active.num_extent_pages;
896 	}
897 
898 	desc->length = sizeof(desc->num_clusters) + sizeof(desc->extent_page[0]) * et_idx;
899 	*remaining_sz -= sizeof(struct spdk_blob_md_descriptor) + desc->length;
900 	*buf += sizeof(struct spdk_blob_md_descriptor) + desc->length;
901 }
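/* Worked example (hypothetical page indices): extent_pages = { 7, 0, 0, 0, 9 }
 * serializes as { page_idx 7, num_pages 1 }, { page_idx 0, num_pages 3 },
 * { page_idx 9, num_pages 1 } - only runs of zeroes (unallocated extent
 * pages) are length-encoded; an allocated page always gets num_pages == 1.
 */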
902 
903 static int
904 _spdk_blob_serialize_extent_table(const struct spdk_blob *blob,
905 				  struct spdk_blob_md_page **pages,
906 				  struct spdk_blob_md_page *cur_page,
907 				  uint32_t *page_count, uint8_t **buf,
908 				  size_t *remaining_sz)
909 {
910 	uint64_t				last_extent_page;
911 	int					rc;
912 
913 	last_extent_page = 0;
914 	while (last_extent_page < blob->active.num_extent_pages) {
915 		_spdk_blob_serialize_extent_table_entry(blob, last_extent_page, &last_extent_page, buf,
916 							remaining_sz);
917 
918 		if (last_extent_page == blob->active.num_extent_pages) {
919 			break;
920 		}
921 
922 		rc = _spdk_blob_serialize_add_page(blob, pages, page_count, &cur_page);
923 		if (rc < 0) {
924 			return rc;
925 		}
926 
927 		*buf = (uint8_t *)cur_page->descriptors;
928 		*remaining_sz = sizeof(cur_page->descriptors);
929 	}
930 
931 	return 0;
932 }
933 
934 static void
935 _spdk_blob_serialize_extent_rle(const struct spdk_blob *blob,
936 				uint64_t start_cluster, uint64_t *next_cluster,
937 				uint8_t **buf, size_t *buf_sz)
938 {
939 	struct spdk_blob_md_descriptor_extent_rle *desc_extent_rle;
940 	size_t cur_sz;
941 	uint64_t i, extent_idx;
942 	uint64_t lba, lba_per_cluster, lba_count;
943 
944 	/* The buffer must have room for at least one extent */
945 	cur_sz = sizeof(struct spdk_blob_md_descriptor) + sizeof(desc_extent_rle->extents[0]);
946 	if (*buf_sz < cur_sz) {
947 		*next_cluster = start_cluster;
948 		return;
949 	}
950 
951 	desc_extent_rle = (struct spdk_blob_md_descriptor_extent_rle *)*buf;
952 	desc_extent_rle->type = SPDK_MD_DESCRIPTOR_TYPE_EXTENT_RLE;
953 
954 	lba_per_cluster = _spdk_bs_cluster_to_lba(blob->bs, 1);
955 
956 	lba = blob->active.clusters[start_cluster];
957 	lba_count = lba_per_cluster;
958 	extent_idx = 0;
959 	for (i = start_cluster + 1; i < blob->active.num_clusters; i++) {
960 		if ((lba + lba_count) == blob->active.clusters[i] && lba != 0) {
961 			/* Run-length encode sequential non-zero LBA */
962 			lba_count += lba_per_cluster;
963 			continue;
964 		} else if (lba == 0 && blob->active.clusters[i] == 0) {
965 			/* Run-length encode unallocated clusters */
966 			lba_count += lba_per_cluster;
967 			continue;
968 		}
969 		desc_extent_rle->extents[extent_idx].cluster_idx = lba / lba_per_cluster;
970 		desc_extent_rle->extents[extent_idx].length = lba_count / lba_per_cluster;
971 		extent_idx++;
972 
973 		cur_sz += sizeof(desc_extent_rle->extents[extent_idx]);
974 
975 		if (*buf_sz < cur_sz) {
976 			/* If we ran out of buffer space, return */
977 			*next_cluster = i;
978 			break;
979 		}
980 
981 		lba = blob->active.clusters[i];
982 		lba_count = lba_per_cluster;
983 	}
984 
985 	if (*buf_sz >= cur_sz) {
986 		desc_extent_rle->extents[extent_idx].cluster_idx = lba / lba_per_cluster;
987 		desc_extent_rle->extents[extent_idx].length = lba_count / lba_per_cluster;
988 		extent_idx++;
989 
990 		*next_cluster = blob->active.num_clusters;
991 	}
992 
993 	desc_extent_rle->length = sizeof(desc_extent_rle->extents[0]) * extent_idx;
994 	*buf_sz -= sizeof(struct spdk_blob_md_descriptor) + desc_extent_rle->length;
995 	*buf += sizeof(struct spdk_blob_md_descriptor) + desc_extent_rle->length;
996 }
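/* Self-contained sketch (illustrative only; all names are hypothetical) of
 * the run-length rule applied above, expressed over cluster indices instead
 * of LBAs: a run extends when the next cluster is either the contiguous
 * allocated neighbor or another unallocated (zero) cluster. For instance,
 * indices { 1, 2, 3, 0, 0, 5 } encode as {1,3}, {0,2}, {5,1}.
 */
#if 0
struct example_extent {
	uint64_t cluster_idx;
	uint64_t length;
};

static size_t
example_rle_encode(const uint64_t *cluster_idx, size_t n, struct example_extent *out)
{
	size_t i, n_ext = 0;

	for (i = 0; i < n; i++) {
		if (n_ext > 0 &&
		    ((out[n_ext - 1].cluster_idx != 0 &&
		      cluster_idx[i] == out[n_ext - 1].cluster_idx + out[n_ext - 1].length) ||
		     (out[n_ext - 1].cluster_idx == 0 && cluster_idx[i] == 0))) {
			out[n_ext - 1].length++;	/* extend the current run */
			continue;
		}
		out[n_ext].cluster_idx = cluster_idx[i];	/* start a new run */
		out[n_ext].length = 1;
		n_ext++;
	}
	return n_ext;
}
#endif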
997 
998 static int
999 _spdk_blob_serialize_extents_rle(const struct spdk_blob *blob,
1000 				 struct spdk_blob_md_page **pages,
1001 				 struct spdk_blob_md_page *cur_page,
1002 				 uint32_t *page_count, uint8_t **buf,
1003 				 size_t *remaining_sz)
1004 {
1005 	uint64_t				last_cluster;
1006 	int					rc;
1007 
1008 	last_cluster = 0;
1009 	while (last_cluster < blob->active.num_clusters) {
1010 		_spdk_blob_serialize_extent_rle(blob, last_cluster, &last_cluster, buf, remaining_sz);
1011 
1012 		if (last_cluster == blob->active.num_clusters) {
1013 			break;
1014 		}
1015 
1016 		rc = _spdk_blob_serialize_add_page(blob, pages, page_count, &cur_page);
1017 		if (rc < 0) {
1018 			return rc;
1019 		}
1020 
1021 		*buf = (uint8_t *)cur_page->descriptors;
1022 		*remaining_sz = sizeof(cur_page->descriptors);
1023 	}
1024 
1025 	return 0;
1026 }
1027 
1028 static void
1029 _spdk_blob_serialize_extent_page(const struct spdk_blob *blob,
1030 				 uint64_t cluster, struct spdk_blob_md_page *page)
1031 {
1032 	struct spdk_blob_md_descriptor_extent_page *desc_extent;
1033 	uint64_t i, extent_idx;
1034 	uint64_t lba, lba_per_cluster;
1035 	uint64_t start_cluster = (cluster / SPDK_EXTENTS_PER_EP) * SPDK_EXTENTS_PER_EP;
1036 	uint64_t end_cluster = spdk_min(start_cluster + SPDK_EXTENTS_PER_EP, blob->active.num_clusters);
1037 
1038 	desc_extent = (struct spdk_blob_md_descriptor_extent_page *) page->descriptors;
1039 	desc_extent->type = SPDK_MD_DESCRIPTOR_TYPE_EXTENT_PAGE;
1040 
1041 	lba_per_cluster = _spdk_bs_cluster_to_lba(blob->bs, 1);
1042 
1043 	extent_idx = 0;
1044 	for (i = start_cluster; i < end_cluster; i++) {
1045 		lba = blob->active.clusters[i];
1046 		desc_extent->cluster_idx[extent_idx++] = lba / lba_per_cluster;
1047 	}
1048 
1049 	desc_extent->length = sizeof(desc_extent->cluster_idx[0]) * extent_idx;
1050 }
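/* Worked example (SPDK_EXTENTS_PER_EP value of 512 assumed purely for
 * illustration): each extent page describes a fixed, aligned window of the
 * cluster map, so cluster 1000 lives in the page covering clusters
 * [512, 1024): start_cluster = (1000 / 512) * 512 = 512. end_cluster is
 * capped at blob->active.num_clusters for the last, partially filled page.
 */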
1051 
1052 static void
1053 _spdk_blob_serialize_flags(const struct spdk_blob *blob,
1054 			   uint8_t *buf, size_t *buf_sz)
1055 {
1056 	struct spdk_blob_md_descriptor_flags *desc;
1057 
1058 	/*
1059 	 * Flags get serialized first, so we should always have room for the flags
1060 	 *  descriptor.
1061 	 */
1062 	assert(*buf_sz >= sizeof(*desc));
1063 
1064 	desc = (struct spdk_blob_md_descriptor_flags *)buf;
1065 	desc->type = SPDK_MD_DESCRIPTOR_TYPE_FLAGS;
1066 	desc->length = sizeof(*desc) - sizeof(struct spdk_blob_md_descriptor);
1067 	desc->invalid_flags = blob->invalid_flags;
1068 	desc->data_ro_flags = blob->data_ro_flags;
1069 	desc->md_ro_flags = blob->md_ro_flags;
1070 
1071 	*buf_sz -= sizeof(*desc);
1072 }
1073 
1074 static int
1075 _spdk_blob_serialize_xattrs(const struct spdk_blob *blob,
1076 			    const struct spdk_xattr_tailq *xattrs, bool internal,
1077 			    struct spdk_blob_md_page **pages,
1078 			    struct spdk_blob_md_page *cur_page,
1079 			    uint32_t *page_count, uint8_t **buf,
1080 			    size_t *remaining_sz)
1081 {
1082 	const struct spdk_xattr	*xattr;
1083 	int	rc;
1084 
1085 	TAILQ_FOREACH(xattr, xattrs, link) {
1086 		size_t required_sz = 0;
1087 
1088 		rc = _spdk_blob_serialize_xattr(xattr,
1089 						*buf, *remaining_sz,
1090 						&required_sz, internal);
1091 		if (rc < 0) {
1092 			/* Need to add a new page to the chain */
1093 			rc = _spdk_blob_serialize_add_page(blob, pages, page_count,
1094 							   &cur_page);
1095 			if (rc < 0) {
1096 				spdk_free(*pages);
1097 				*pages = NULL;
1098 				*page_count = 0;
1099 				return rc;
1100 			}
1101 
1102 			*buf = (uint8_t *)cur_page->descriptors;
1103 			*remaining_sz = sizeof(cur_page->descriptors);
1104 
1105 			/* Try again */
1106 			required_sz = 0;
1107 			rc = _spdk_blob_serialize_xattr(xattr,
1108 							*buf, *remaining_sz,
1109 							&required_sz, internal);
1110 
1111 			if (rc < 0) {
1112 				spdk_free(*pages);
1113 				*pages = NULL;
1114 				*page_count = 0;
1115 				return rc;
1116 			}
1117 		}
1118 
1119 		*remaining_sz -= required_sz;
1120 		*buf += required_sz;
1121 	}
1122 
1123 	return 0;
1124 }
1125 
1126 static int
1127 _spdk_blob_serialize(const struct spdk_blob *blob, struct spdk_blob_md_page **pages,
1128 		     uint32_t *page_count)
1129 {
1130 	struct spdk_blob_md_page		*cur_page;
1131 	int					rc;
1132 	uint8_t					*buf;
1133 	size_t					remaining_sz;
1134 
1135 	assert(pages != NULL);
1136 	assert(page_count != NULL);
1137 	assert(blob != NULL);
1138 	assert(blob->state == SPDK_BLOB_STATE_DIRTY);
1139 
1140 	*pages = NULL;
1141 	*page_count = 0;
1142 
1143 	/* A blob always has at least 1 page, even if it has no descriptors */
1144 	rc = _spdk_blob_serialize_add_page(blob, pages, page_count, &cur_page);
1145 	if (rc < 0) {
1146 		return rc;
1147 	}
1148 
1149 	buf = (uint8_t *)cur_page->descriptors;
1150 	remaining_sz = sizeof(cur_page->descriptors);
1151 
1152 	/* Serialize flags */
1153 	_spdk_blob_serialize_flags(blob, buf, &remaining_sz);
1154 	buf += sizeof(struct spdk_blob_md_descriptor_flags);
1155 
1156 	/* Serialize xattrs */
1157 	rc = _spdk_blob_serialize_xattrs(blob, &blob->xattrs, false,
1158 					 pages, cur_page, page_count, &buf, &remaining_sz);
1159 	if (rc < 0) {
1160 		return rc;
1161 	}
1162 
1163 	/* Serialize internal xattrs */
1164 	rc = _spdk_blob_serialize_xattrs(blob, &blob->xattrs_internal, true,
1165 					 pages, cur_page, page_count, &buf, &remaining_sz);
1166 	if (rc < 0) {
1167 		return rc;
1168 	}
1169 
1170 	if (blob->use_extent_table) {
1171 		/* Serialize extent table */
1172 		rc = _spdk_blob_serialize_extent_table(blob, pages, cur_page, page_count, &buf, &remaining_sz);
1173 	} else {
1174 		/* Serialize extents */
1175 		rc = _spdk_blob_serialize_extents_rle(blob, pages, cur_page, page_count, &buf, &remaining_sz);
1176 	}
1177 
1178 	return rc;
1179 }
1180 
1181 struct spdk_blob_load_ctx {
1182 	struct spdk_blob		*blob;
1183 
1184 	struct spdk_blob_md_page	*pages;
1185 	uint32_t			num_pages;
1186 	spdk_bs_sequence_t	        *seq;
1187 
1188 	spdk_bs_sequence_cpl		cb_fn;
1189 	void				*cb_arg;
1190 };
1191 
1192 static uint32_t
1193 _spdk_blob_md_page_calc_crc(void *page)
1194 {
1195 	uint32_t		crc;
1196 
1197 	crc = BLOB_CRC32C_INITIAL;
1198 	crc = spdk_crc32c_update(page, SPDK_BS_PAGE_SIZE - 4, crc);
1199 	crc ^= BLOB_CRC32C_INITIAL;
1200 
1201 	return crc;
1203 }
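/* Illustrative counterpart (hypothetical helper, not original code): the crc
 * field occupies the last 4 bytes of the metadata page and is excluded from
 * the checksummed range above, so validation is a recompute-and-compare.
 */
#if 0
static bool
example_md_page_valid(struct spdk_blob_md_page *page)
{
	return page->crc == _spdk_blob_md_page_calc_crc(page);
}
#endif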
1204 
1205 static void
1206 _spdk_blob_load_final(void *cb_arg, int bserrno)
1207 {
1208 	struct spdk_blob_load_ctx	*ctx = cb_arg;
1209 	struct spdk_blob		*blob = ctx->blob;
1210 
1211 	if (bserrno == 0) {
1212 		_spdk_blob_mark_clean(blob);
1213 	}
1214 
1215 	ctx->cb_fn(ctx->seq, ctx->cb_arg, bserrno);
1216 
1217 	/* Free the memory */
1218 	spdk_free(ctx->pages);
1219 	free(ctx);
1220 }
1221 
1222 static void
1223 _spdk_blob_load_snapshot_cpl(void *cb_arg, struct spdk_blob *snapshot, int bserrno)
1224 {
1225 	struct spdk_blob_load_ctx	*ctx = cb_arg;
1226 	struct spdk_blob		*blob = ctx->blob;
1227 
1228 	if (bserrno == 0) {
1229 		blob->back_bs_dev = spdk_bs_create_blob_bs_dev(snapshot);
1230 		if (blob->back_bs_dev == NULL) {
1231 			bserrno = -ENOMEM;
1232 		}
1233 	}
1234 	if (bserrno != 0) {
1235 		SPDK_ERRLOG("Snapshot open failed\n");
1236 	}
1237 
1238 	_spdk_blob_load_final(ctx, bserrno);
1239 }
1240 
1241 static void _spdk_blob_update_clear_method(struct spdk_blob *blob);
1242 
1243 static void
1244 _spdk_blob_load_backing_dev(void *cb_arg)
1245 {
1246 	struct spdk_blob_load_ctx	*ctx = cb_arg;
1247 	struct spdk_blob		*blob = ctx->blob;
1248 	const void			*value;
1249 	size_t				len;
1250 	int				rc;
1251 
1252 	if (spdk_blob_is_thin_provisioned(blob)) {
1253 		rc = _spdk_blob_get_xattr_value(blob, BLOB_SNAPSHOT, &value, &len, true);
1254 		if (rc == 0) {
1255 			if (len != sizeof(spdk_blob_id)) {
1256 				_spdk_blob_load_final(ctx, -EINVAL);
1257 				return;
1258 			}
1259 			/* open snapshot blob and continue in the callback function */
1260 			blob->parent_id = *(spdk_blob_id *)value;
1261 			spdk_bs_open_blob(blob->bs, blob->parent_id,
1262 					  _spdk_blob_load_snapshot_cpl, ctx);
1263 			return;
1264 		} else {
1265 			/* add zeroes_dev for thin provisioned blob */
1266 			blob->back_bs_dev = spdk_bs_create_zeroes_dev();
1267 		}
1268 	} else {
1269 		/* standard blob */
1270 		blob->back_bs_dev = NULL;
1271 	}
1272 	_spdk_blob_load_final(ctx, 0);
1273 }
1274 
1275 static void
1276 _spdk_blob_load_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1277 {
1278 	struct spdk_blob_load_ctx	*ctx = cb_arg;
1279 	struct spdk_blob		*blob = ctx->blob;
1280 	struct spdk_blob_md_page	*page;
1281 	int				rc;
1282 	uint32_t			crc;
1283 
1284 	if (bserrno) {
1285 		SPDK_ERRLOG("Metadata page read failed: %d\n", bserrno);
1286 		_spdk_blob_load_final(ctx, bserrno);
1287 		return;
1288 	}
1289 
1290 	page = &ctx->pages[ctx->num_pages - 1];
1291 	crc = _spdk_blob_md_page_calc_crc(page);
1292 	if (crc != page->crc) {
1293 		SPDK_ERRLOG("Metadata page %d crc mismatch\n", ctx->num_pages);
1294 		_spdk_blob_load_final(ctx, -EINVAL);
1295 		return;
1296 	}
1297 
1298 	if (page->next != SPDK_INVALID_MD_PAGE) {
1299 		uint32_t next_page = page->next;
1300 		uint64_t next_lba = _spdk_bs_md_page_to_lba(blob->bs, next_page);
1301 
1302 		/* Read the next page */
1303 		ctx->num_pages++;
1304 		ctx->pages = spdk_realloc(ctx->pages, (sizeof(*page) * ctx->num_pages),
1305 					  sizeof(*page));
1306 		if (ctx->pages == NULL) {
1307 			_spdk_blob_load_final(ctx, -ENOMEM);
1308 			return;
1309 		}
1310 
1311 		spdk_bs_sequence_read_dev(seq, &ctx->pages[ctx->num_pages - 1],
1312 					  next_lba,
1313 					  _spdk_bs_byte_to_lba(blob->bs, sizeof(*page)),
1314 					  _spdk_blob_load_cpl, ctx);
1315 		return;
1316 	}
1317 
1318 	/* Parse the pages */
1319 	rc = _spdk_blob_parse(ctx->pages, ctx->num_pages, blob);
1320 	if (rc) {
1321 		_spdk_blob_load_final(ctx, rc);
1322 		return;
1323 	}
1324 
1325 	if (blob->extent_table_found == true) {
1326 		/* If EXTENT_TABLE was found, that means support for it should be enabled. */
1327 		assert(blob->extent_rle_found == false);
1328 		blob->use_extent_table = true;
1329 	} else {
1330 		/* If EXTENT_RLE was found, or no extent_* descriptor at all, disable
1331 		 * support for the extent table. Having no extent_* descriptors means the
1332 		 * blob has a length of 0 and no extent_rle descriptors were persisted for it.
1333 		 * EXTENT_TABLE, if used, is always present in the metadata regardless of length. */
1334 		blob->use_extent_table = false;
1335 	}
1336 
1337 	ctx->seq = seq;
1338 
1339 	/* Check the clear_method stored in metadata vs what may have been passed
1340 	 * via spdk_bs_open_blob_ext() and update accordingly.
1341 	 */
1342 	_spdk_blob_update_clear_method(blob);
1343 
1344 	_spdk_blob_load_backing_dev(ctx);
1345 }
1346 
1347 /* Load a blob from disk given a blobid */
1348 static void
1349 _spdk_blob_load(spdk_bs_sequence_t *seq, struct spdk_blob *blob,
1350 		spdk_bs_sequence_cpl cb_fn, void *cb_arg)
1351 {
1352 	struct spdk_blob_load_ctx *ctx;
1353 	struct spdk_blob_store *bs;
1354 	uint32_t page_num;
1355 	uint64_t lba;
1356 
1357 	_spdk_blob_verify_md_op(blob);
1358 
1359 	bs = blob->bs;
1360 
1361 	ctx = calloc(1, sizeof(*ctx));
1362 	if (!ctx) {
1363 		cb_fn(seq, cb_arg, -ENOMEM);
1364 		return;
1365 	}
1366 
1367 	ctx->blob = blob;
1368 	ctx->pages = spdk_realloc(ctx->pages, SPDK_BS_PAGE_SIZE, SPDK_BS_PAGE_SIZE);
1369 	if (!ctx->pages) {
1370 		free(ctx);
1371 		cb_fn(seq, cb_arg, -ENOMEM);
1372 		return;
1373 	}
1374 	ctx->num_pages = 1;
1375 	ctx->cb_fn = cb_fn;
1376 	ctx->cb_arg = cb_arg;
1377 	ctx->seq = seq;
1378 
1379 	page_num = _spdk_bs_blobid_to_page(blob->id);
1380 	lba = _spdk_bs_md_page_to_lba(blob->bs, page_num);
1381 
1382 	blob->state = SPDK_BLOB_STATE_LOADING;
1383 
1384 	spdk_bs_sequence_read_dev(seq, &ctx->pages[0], lba,
1385 				  _spdk_bs_byte_to_lba(bs, SPDK_BS_PAGE_SIZE),
1386 				  _spdk_blob_load_cpl, ctx);
1387 }
1388 
1389 struct spdk_blob_persist_ctx {
1390 	struct spdk_blob		*blob;
1391 
1392 	struct spdk_bs_super_block	*super;
1393 
1394 	struct spdk_blob_md_page	*pages;
1395 
1396 	spdk_bs_sequence_t		*seq;
1397 	spdk_bs_sequence_cpl		cb_fn;
1398 	void				*cb_arg;
1399 };
1400 
1401 static void
1402 spdk_bs_batch_clear_dev(struct spdk_blob_persist_ctx *ctx, spdk_bs_batch_t *batch, uint64_t lba,
1403 			uint32_t lba_count)
1404 {
1405 	switch (ctx->blob->clear_method) {
1406 	case BLOB_CLEAR_WITH_DEFAULT:
1407 	case BLOB_CLEAR_WITH_UNMAP:
1408 		spdk_bs_batch_unmap_dev(batch, lba, lba_count);
1409 		break;
1410 	case BLOB_CLEAR_WITH_WRITE_ZEROES:
1411 		spdk_bs_batch_write_zeroes_dev(batch, lba, lba_count);
1412 		break;
1413 	case BLOB_CLEAR_WITH_NONE:
1414 	default:
1415 		break;
1416 	}
1417 }
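/* Illustrative snippet (hypothetical, not original code): the method
 * dispatched above is chosen per blob at creation time via the opts.
 */
#if 0
	struct spdk_blob_opts opts;

	spdk_blob_opts_init(&opts);
	/* Skip clearing released clusters entirely: fastest, but their old
	 * contents remain on the device. */
	opts.clear_method = BLOB_CLEAR_WITH_NONE;
#endif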
1418 
1419 static void
1420 _spdk_blob_persist_complete(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1421 {
1422 	struct spdk_blob_persist_ctx	*ctx = cb_arg;
1423 	struct spdk_blob		*blob = ctx->blob;
1424 
1425 	if (bserrno == 0) {
1426 		_spdk_blob_mark_clean(blob);
1427 	}
1428 
1429 	/* Call user callback */
1430 	ctx->cb_fn(seq, ctx->cb_arg, bserrno);
1431 
1432 	/* Free the memory */
1433 	spdk_free(ctx->pages);
1434 	free(ctx);
1435 }
1436 
1437 static void
1438 _spdk_blob_persist_clear_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1439 {
1440 	struct spdk_blob_persist_ctx	*ctx = cb_arg;
1441 	struct spdk_blob		*blob = ctx->blob;
1442 	struct spdk_blob_store		*bs = blob->bs;
1443 	size_t				i;
1444 
1445 	/* Release all clusters that were truncated */
1446 	for (i = blob->active.num_clusters; i < blob->active.cluster_array_size; i++) {
1447 		uint32_t cluster_num = _spdk_bs_lba_to_cluster(bs, blob->active.clusters[i]);
1448 
1449 		/* Nothing to release if it was not allocated */
1450 		if (blob->active.clusters[i] != 0) {
1451 			_spdk_bs_release_cluster(bs, cluster_num);
1452 		}
1453 	}
1454 
1455 	if (blob->active.num_clusters == 0) {
1456 		free(blob->active.clusters);
1457 		blob->active.clusters = NULL;
1458 		blob->active.cluster_array_size = 0;
1459 	} else if (blob->active.num_clusters != blob->active.cluster_array_size) {
1460 #ifndef __clang_analyzer__
1461 		void *tmp;
1462 
1463 		/* scan-build can't really reason about reallocs; work around it */
1464 		tmp = realloc(blob->active.clusters, sizeof(*blob->active.clusters) * blob->active.num_clusters);
1465 		assert(tmp != NULL);
1466 		blob->active.clusters = tmp;
1467 
1468 		tmp = realloc(blob->active.extent_pages, sizeof(uint32_t) * blob->active.num_extent_pages);
1469 		assert(tmp != NULL);
1470 		blob->active.extent_pages = tmp;
1471 #endif
1472 		blob->active.extent_pages_array_size = blob->active.num_extent_pages;
1473 		blob->active.cluster_array_size = blob->active.num_clusters;
1474 	}
1475 
1476 	/* TODO: Add a path to clear extent pages during persist. */
1477 	_spdk_blob_persist_complete(seq, ctx, bserrno);
1478 }
1479 
1480 static void
1481 _spdk_blob_persist_clear_clusters(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1482 {
1483 	struct spdk_blob_persist_ctx	*ctx = cb_arg;
1484 	struct spdk_blob		*blob = ctx->blob;
1485 	struct spdk_blob_store		*bs = blob->bs;
1486 	spdk_bs_batch_t			*batch;
1487 	size_t				i;
1488 	uint64_t			lba;
1489 	uint32_t			lba_count;
1490 
1491 	/* Clusters don't move around in blobs. The list shrinks or grows
1492 	 * at the end, but no changes ever occur in the middle of the list.
1493 	 */
1494 
1495 	batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_clear_clusters_cpl, ctx);
1496 
1497 	/* Clear all clusters that were truncated */
1498 	lba = 0;
1499 	lba_count = 0;
1500 	for (i = blob->active.num_clusters; i < blob->active.cluster_array_size; i++) {
1501 		uint64_t next_lba = blob->active.clusters[i];
1502 		uint32_t next_lba_count = _spdk_bs_cluster_to_lba(bs, 1);
1503 
1504 		if (next_lba > 0 && (lba + lba_count) == next_lba) {
1505 			/* This cluster is contiguous with the previous one. */
1506 			lba_count += next_lba_count;
1507 			continue;
1508 		}
1509 
1510 		/* This cluster is not contiguous with the previous one. */
1511 
1512 		/* If a run of LBAs previously existed, clear it now */
1513 		if (lba_count > 0) {
1514 			spdk_bs_batch_clear_dev(ctx, batch, lba, lba_count);
1515 		}
1516 
1517 		/* Start building the next batch */
1518 		lba = next_lba;
1519 		if (next_lba > 0) {
1520 			lba_count = next_lba_count;
1521 		} else {
1522 			lba_count = 0;
1523 		}
1524 	}
1525 
1526 	/* If we ended with a contiguous set of LBAs, clear them now */
1527 	if (lba_count > 0) {
1528 		spdk_bs_batch_clear_dev(ctx, batch, lba, lba_count);
1529 	}
1530 
1531 	spdk_bs_batch_close(batch);
1532 }
1533 
1534 static void
1535 _spdk_blob_persist_zero_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1536 {
1537 	struct spdk_blob_persist_ctx	*ctx = cb_arg;
1538 	struct spdk_blob		*blob = ctx->blob;
1539 	struct spdk_blob_store		*bs = blob->bs;
1540 	size_t				i;
1541 
1542 	/* This loop starts at 1 because the first page is special and handled
1543 	 * below. The pages (except the first) are never written in place,
1544 	 * so any pages in the clean list were zeroed above and can now be released.
1545 	 */
1546 	for (i = 1; i < blob->clean.num_pages; i++) {
1547 		_spdk_bs_release_md_page(bs, blob->clean.pages[i]);
1548 	}
1549 
1550 	if (blob->active.num_pages == 0) {
1551 		uint32_t page_num;
1552 
1553 		page_num = _spdk_bs_blobid_to_page(blob->id);
1554 		_spdk_bs_release_md_page(bs, page_num);
1555 	}
1556 
1557 	/* Move on to clearing clusters */
1558 	_spdk_blob_persist_clear_clusters(seq, ctx, 0);
1559 }
1560 
1561 static void
1562 _spdk_blob_persist_zero_pages(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1563 {
1564 	struct spdk_blob_persist_ctx	*ctx = cb_arg;
1565 	struct spdk_blob		*blob = ctx->blob;
1566 	struct spdk_blob_store		*bs = blob->bs;
1567 	uint64_t			lba;
1568 	uint32_t			lba_count;
1569 	spdk_bs_batch_t			*batch;
1570 	size_t				i;
1571 
1572 	batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_zero_pages_cpl, ctx);
1573 
1574 	lba_count = _spdk_bs_byte_to_lba(bs, SPDK_BS_PAGE_SIZE);
1575 
1576 	/* This loop starts at 1 because the first page is special and handled
1577 	 * below. The pages (except the first) are never written in place,
1578 	 * so any pages in the clean list must be zeroed.
1579 	 */
1580 	for (i = 1; i < blob->clean.num_pages; i++) {
1581 		lba = _spdk_bs_md_page_to_lba(bs, blob->clean.pages[i]);
1582 
1583 		spdk_bs_batch_write_zeroes_dev(batch, lba, lba_count);
1584 	}
1585 
1586 	/* The first page will only be zeroed if this is a delete. */
1587 	if (blob->active.num_pages == 0) {
1588 		uint32_t page_num;
1589 
1590 		/* The first page in the metadata goes where the blobid indicates */
1591 		page_num = _spdk_bs_blobid_to_page(blob->id);
1592 		lba = _spdk_bs_md_page_to_lba(bs, page_num);
1593 
1594 		spdk_bs_batch_write_zeroes_dev(batch, lba, lba_count);
1595 	}
1596 
1597 	spdk_bs_batch_close(batch);
1598 }
1599 
1600 static void
1601 _spdk_blob_persist_write_page_root(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1602 {
1603 	struct spdk_blob_persist_ctx	*ctx = cb_arg;
1604 	struct spdk_blob		*blob = ctx->blob;
1605 	struct spdk_blob_store		*bs = blob->bs;
1606 	uint64_t			lba;
1607 	uint32_t			lba_count;
1608 	struct spdk_blob_md_page	*page;
1609 
1610 	if (blob->active.num_pages == 0) {
1611 		/* Move on to the next step */
1612 		_spdk_blob_persist_zero_pages(seq, ctx, 0);
1613 		return;
1614 	}
1615 
1616 	lba_count = _spdk_bs_byte_to_lba(bs, sizeof(*page));
1617 
1618 	page = &ctx->pages[0];
1619 	/* The first page in the metadata goes where the blobid indicates */
1620 	lba = _spdk_bs_md_page_to_lba(bs, _spdk_bs_blobid_to_page(blob->id));
1621 
1622 	spdk_bs_sequence_write_dev(seq, page, lba, lba_count,
1623 				   _spdk_blob_persist_zero_pages, ctx);
1624 }
1625 
1626 static void
1627 _spdk_blob_persist_write_page_chain(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1628 {
1629 	struct spdk_blob_persist_ctx	*ctx = cb_arg;
1630 	struct spdk_blob		*blob = ctx->blob;
1631 	struct spdk_blob_store		*bs = blob->bs;
1632 	uint64_t			lba;
1633 	uint32_t			lba_count;
1634 	struct spdk_blob_md_page	*page;
1635 	spdk_bs_batch_t			*batch;
1636 	size_t				i;
1637 
1641 
1642 	lba_count = _spdk_bs_byte_to_lba(bs, sizeof(*page));
1643 
1644 	batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_write_page_root, ctx);
1645 
1646 	/* This starts at 1. The root page is not written until
1647 	 * all of the others are finished.
1648 	 */
1649 	for (i = 1; i < blob->active.num_pages; i++) {
1650 		page = &ctx->pages[i];
1651 		assert(page->sequence_num == i);
1652 
1653 		lba = _spdk_bs_md_page_to_lba(bs, blob->active.pages[i]);
1654 
1655 		spdk_bs_batch_write_dev(batch, page, lba, lba_count);
1656 	}
1657 
1658 	spdk_bs_batch_close(batch);
1659 }
1660 
1661 static int
1662 _spdk_blob_resize(struct spdk_blob *blob, uint64_t sz)
1663 {
1664 	uint64_t	i;
1665 	uint64_t	*tmp;
1666 	uint64_t	lfc; /* lowest free cluster */
1667 	uint32_t	lfmd; /*  lowest free md page */
1668 	uint64_t	num_clusters;
1669 	uint32_t	*ep_tmp;
1670 	uint64_t	new_num_ep = 0, current_num_ep = 0;
1671 	struct spdk_blob_store *bs;
1672 
1673 	bs = blob->bs;
1674 
1675 	_spdk_blob_verify_md_op(blob);
1676 
1677 	if (blob->active.num_clusters == sz) {
1678 		return 0;
1679 	}
1680 
1681 	if (blob->active.num_clusters < blob->active.cluster_array_size) {
1682 		/* If this blob was resized to be larger, then smaller, then
1683 		 * larger without syncing, then the cluster array already
1684 		 * contains spare assigned clusters we can use.
1685 		 */
1686 		num_clusters = spdk_min(blob->active.cluster_array_size,
1687 					sz);
1688 	} else {
1689 		num_clusters = blob->active.num_clusters;
1690 	}
1691 
1692 	if (blob->use_extent_table) {
1693 		/* Round up, since every cluster beyond the current Extent Table
1694 		 * size requires a new extent page. */
1695 		new_num_ep = spdk_divide_round_up(sz, SPDK_EXTENTS_PER_EP);
1696 		current_num_ep = spdk_divide_round_up(num_clusters, SPDK_EXTENTS_PER_EP);
1697 	}
1698 
1699 	/* Do two passes - one to verify that we can obtain enough clusters
1700 	 * and md pages, another to actually claim them.
1701 	 */
1702 
1703 	if (spdk_blob_is_thin_provisioned(blob) == false) {
1704 		lfc = 0;
1705 		for (i = num_clusters; i < sz; i++) {
1706 			lfc = spdk_bit_array_find_first_clear(bs->used_clusters, lfc);
1707 			if (lfc == UINT32_MAX) {
1708 				/* No more free clusters. Cannot satisfy the request */
1709 				return -ENOSPC;
1710 			}
1711 			lfc++;
1712 		}
1713 		lfmd = 0;
1714 		for (i = current_num_ep; i < new_num_ep; i++) {
1715 			lfmd = spdk_bit_array_find_first_clear(blob->bs->used_md_pages, lfmd);
1716 			if (lfmd == UINT32_MAX) {
1717 				/* No more free md pages. Cannot satisfy the request */
1718 				return -ENOSPC;
1719 			}
			/* Advance past the page just found so each extent page
			 * verified here counts a distinct free md page. */
			lfmd++;
1720 		}
1721 	}
1722 
1723 	if (sz > num_clusters) {
1724 		/* Expand the cluster array if necessary.
1725 		 * We only shrink the array when persisting.
1726 		 */
1727 		tmp = realloc(blob->active.clusters, sizeof(*blob->active.clusters) * sz);
1728 		if (sz > 0 && tmp == NULL) {
1729 			return -ENOMEM;
1730 		}
1731 		memset(tmp + blob->active.cluster_array_size, 0,
1732 		       sizeof(*blob->active.clusters) * (sz - blob->active.cluster_array_size));
1733 		blob->active.clusters = tmp;
1734 		blob->active.cluster_array_size = sz;
1735 
1736 		/* Expand the extents table, only if enough clusters were added */
1737 		if (new_num_ep > current_num_ep && blob->use_extent_table) {
1738 			ep_tmp = realloc(blob->active.extent_pages, sizeof(*blob->active.extent_pages) * new_num_ep);
1739 			if (new_num_ep > 0 && ep_tmp == NULL) {
1740 				return -ENOMEM;
1741 			}
1742 			memset(ep_tmp + blob->active.extent_pages_array_size, 0,
1743 			       sizeof(*blob->active.extent_pages) * (new_num_ep - blob->active.extent_pages_array_size));
1744 			blob->active.extent_pages = ep_tmp;
1745 			blob->active.extent_pages_array_size = new_num_ep;
1746 		}
1747 	}
1748 
1749 	blob->state = SPDK_BLOB_STATE_DIRTY;
1750 
1751 	if (spdk_blob_is_thin_provisioned(blob) == false) {
1752 		lfc = 0;
1753 		lfmd = 0;
1754 		for (i = num_clusters; i < sz; i++) {
1755 			_spdk_bs_allocate_cluster(blob, i, &lfc, &lfmd, true);
1756 			lfc++;
1757 			lfmd++;
1758 		}
1759 	}
1760 
1761 	blob->active.num_clusters = sz;
1762 	blob->active.num_extent_pages = new_num_ep;
1763 
1764 	return 0;
1765 }
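
/*
 * Usage sketch (illustrative, not compiled in): _spdk_blob_resize() is
 * reached through the public spdk_blob_resize() API on the metadata thread.
 * The callback name below is hypothetical; the size is in clusters and only
 * becomes durable after a subsequent spdk_blob_sync_md().
 *
 *	static void
 *	resize_done(void *cb_arg, int bserrno)
 *	{
 *		if (bserrno != 0) {
 *			SPDK_ERRLOG("resize failed with %d\n", bserrno);
 *		}
 *	}
 *
 *	spdk_blob_resize(blob, 64, resize_done, NULL);
 */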
1766 
1767 static void
1768 _spdk_blob_persist_generate_new_md(struct spdk_blob_persist_ctx *ctx)
1769 {
1770 	spdk_bs_sequence_t *seq = ctx->seq;
1771 	struct spdk_blob *blob = ctx->blob;
1772 	struct spdk_blob_store *bs = blob->bs;
1773 	uint64_t i;
1774 	uint32_t page_num;
1775 	void *tmp;
1776 	int rc;
1777 
1778 	/* Generate the new metadata */
1779 	rc = _spdk_blob_serialize(blob, &ctx->pages, &blob->active.num_pages);
1780 	if (rc < 0) {
1781 		_spdk_blob_persist_complete(seq, ctx, rc);
1782 		return;
1783 	}
1784 
1785 	assert(blob->active.num_pages >= 1);
1786 
1787 	/* Resize the cache of page indices */
1788 	tmp = realloc(blob->active.pages, blob->active.num_pages * sizeof(*blob->active.pages));
1789 	if (!tmp) {
1790 		_spdk_blob_persist_complete(seq, ctx, -ENOMEM);
1791 		return;
1792 	}
1793 	blob->active.pages = tmp;
1794 
1795 	/* Assign this metadata to pages. This requires two passes -
1796 	 * one to verify that there are enough pages and a second
1797 	 * to actually claim them. */
1798 	page_num = 0;
1799 	/* Note that this loop starts at one. The first page location is fixed by the blobid. */
1800 	for (i = 1; i < blob->active.num_pages; i++) {
1801 		page_num = spdk_bit_array_find_first_clear(bs->used_md_pages, page_num);
1802 		if (page_num == UINT32_MAX) {
1803 			_spdk_blob_persist_complete(seq, ctx, -ENOMEM);
1804 			return;
1805 		}
1806 		page_num++;
1807 	}
1808 
1809 	page_num = 0;
1810 	blob->active.pages[0] = _spdk_bs_blobid_to_page(blob->id);
1811 	for (i = 1; i < blob->active.num_pages; i++) {
1812 		page_num = spdk_bit_array_find_first_clear(bs->used_md_pages, page_num);
1813 		ctx->pages[i - 1].next = page_num;
1814 		/* Now that previous metadata page is complete, calculate the crc for it. */
1815 		ctx->pages[i - 1].crc = _spdk_blob_md_page_calc_crc(&ctx->pages[i - 1]);
1816 		blob->active.pages[i] = page_num;
1817 		_spdk_bs_claim_md_page(bs, page_num);
1818 		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Claiming page %u for blob %lu\n", page_num, blob->id);
1819 		page_num++;
1820 	}
1821 	ctx->pages[i - 1].crc = _spdk_blob_md_page_calc_crc(&ctx->pages[i - 1]);
1822 	/* Write out the metadata pages; the root (blobid) page is written last */
1823 	blob->state = SPDK_BLOB_STATE_CLEAN;
1824 	_spdk_blob_persist_write_page_chain(seq, ctx, 0);
1825 }
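
/*
 * Illustrative layout (assuming a blob that serializes to three md pages):
 * active.pages[] ends up as { blobid_page, A, B }, where A and B are the
 * freshly claimed pages, ctx->pages[0].next == A and ctx->pages[1].next == B.
 * _spdk_blob_persist_write_page_chain() then writes the non-root pages first
 * and the root (blobid) page last, so a crash cannot leave a valid root page
 * pointing at unwritten children.
 */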
1826 
1827 static void
1828 _spdk_blob_persist_start(struct spdk_blob_persist_ctx *ctx)
1829 {
1830 	spdk_bs_sequence_t *seq = ctx->seq;
1831 	struct spdk_blob *blob = ctx->blob;
1832 
1833 	if (blob->active.num_pages == 0) {
1834 		/* This is the signal that the blob should be deleted.
1835 		 * Immediately jump to the clean up routine. */
1836 		assert(blob->clean.num_pages > 0);
1837 		blob->state = SPDK_BLOB_STATE_CLEAN;
1838 		_spdk_blob_persist_zero_pages(seq, ctx, 0);
1839 		return;
1841 	}
1842 
1843 	_spdk_blob_persist_generate_new_md(ctx);
1844 }
1845 
1846 static void
1847 _spdk_blob_persist_dirty_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1848 {
1849 	struct spdk_blob_persist_ctx *ctx = cb_arg;
1850 
1851 	ctx->blob->bs->clean = 0;
1852 
1853 	spdk_free(ctx->super);
1854 
1855 	_spdk_blob_persist_start(ctx);
1856 }
1857 
1858 static void
1859 _spdk_bs_write_super(spdk_bs_sequence_t *seq, struct spdk_blob_store *bs,
1860 		     struct spdk_bs_super_block *super, spdk_bs_sequence_cpl cb_fn, void *cb_arg);
1861
1863 static void
1864 _spdk_blob_persist_dirty(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1865 {
1866 	struct spdk_blob_persist_ctx *ctx = cb_arg;
1867 
1868 	ctx->super->clean = 0;
1869 	if (ctx->super->size == 0) {
1870 		ctx->super->size = ctx->blob->bs->dev->blockcnt * ctx->blob->bs->dev->blocklen;
1871 	}
1872 
1873 	_spdk_bs_write_super(seq, ctx->blob->bs, ctx->super, _spdk_blob_persist_dirty_cpl, ctx);
1874 }
1875
1877 /* Write a blob to disk */
1878 static void
1879 _spdk_blob_persist(spdk_bs_sequence_t *seq, struct spdk_blob *blob,
1880 		   spdk_bs_sequence_cpl cb_fn, void *cb_arg)
1881 {
1882 	struct spdk_blob_persist_ctx *ctx;
1883 
1884 	_spdk_blob_verify_md_op(blob);
1885 
1886 	if (blob->state == SPDK_BLOB_STATE_CLEAN) {
1887 		cb_fn(seq, cb_arg, 0);
1888 		return;
1889 	}
1890 
1891 	ctx = calloc(1, sizeof(*ctx));
1892 	if (!ctx) {
1893 		cb_fn(seq, cb_arg, -ENOMEM);
1894 		return;
1895 	}
1896 	ctx->blob = blob;
1897 	ctx->seq = seq;
1898 	ctx->cb_fn = cb_fn;
1899 	ctx->cb_arg = cb_arg;
1900 
1901 	if (blob->bs->clean) {
1902 		ctx->super = spdk_zmalloc(sizeof(*ctx->super), 0x1000, NULL,
1903 					  SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
1904 		if (!ctx->super) {
1905 			cb_fn(seq, cb_arg, -ENOMEM);
1906 			free(ctx);
1907 			return;
1908 		}
1909 
1910 		spdk_bs_sequence_read_dev(seq, ctx->super, _spdk_bs_page_to_lba(blob->bs, 0),
1911 					  _spdk_bs_byte_to_lba(blob->bs, sizeof(*ctx->super)),
1912 					  _spdk_blob_persist_dirty, ctx);
1913 	} else {
1914 		_spdk_blob_persist_start(ctx);
1915 	}
1916 }
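
/*
 * Usage sketch (illustrative): _spdk_blob_persist() is the engine behind
 * spdk_blob_sync_md(). A typical metadata update from the md thread looks
 * roughly like this (sync_done is a hypothetical callback):
 *
 *	spdk_blob_set_xattr(blob, "name", "lvol1", sizeof("lvol1"));
 *	spdk_blob_sync_md(blob, sync_done, NULL);
 *
 * If the blob is already SPDK_BLOB_STATE_CLEAN, the sync completes
 * immediately without touching the device.
 */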
1917 
1918 struct spdk_blob_copy_cluster_ctx {
1919 	struct spdk_blob *blob;
1920 	uint8_t *buf;
1921 	uint64_t page;
1922 	uint64_t new_cluster;
1923 	uint32_t new_extent_page;
1924 	spdk_bs_sequence_t *seq;
1925 };
1926 
1927 static void
1928 _spdk_blob_allocate_and_copy_cluster_cpl(void *cb_arg, int bserrno)
1929 {
1930 	struct spdk_blob_copy_cluster_ctx *ctx = cb_arg;
1931 	struct spdk_bs_request_set *set = (struct spdk_bs_request_set *)ctx->seq;
1932 	TAILQ_HEAD(, spdk_bs_request_set) requests;
1933 	spdk_bs_user_op_t *op;
1934 
1935 	TAILQ_INIT(&requests);
1936 	TAILQ_SWAP(&set->channel->need_cluster_alloc, &requests, spdk_bs_request_set, link);
1937 
1938 	while (!TAILQ_EMPTY(&requests)) {
1939 		op = TAILQ_FIRST(&requests);
1940 		TAILQ_REMOVE(&requests, op, link);
1941 		if (bserrno == 0) {
1942 			spdk_bs_user_op_execute(op);
1943 		} else {
1944 			spdk_bs_user_op_abort(op);
1945 		}
1946 	}
1947 
1948 	spdk_free(ctx->buf);
1949 	free(ctx);
1950 }
1951 
1952 static void
1953 _spdk_blob_insert_cluster_cpl(void *cb_arg, int bserrno)
1954 {
1955 	struct spdk_blob_copy_cluster_ctx *ctx = cb_arg;
1956 
1957 	if (bserrno) {
1958 		if (bserrno == -EEXIST) {
1959 			/* The metadata insert failed because another thread
1960 			 * allocated the cluster first. Free our cluster
1961 			 * but continue without error. */
1962 			bserrno = 0;
1963 		}
1964 		_spdk_bs_release_cluster(ctx->blob->bs, ctx->new_cluster);
1965 		if (ctx->new_extent_page != 0) {
1966 			_spdk_bs_release_md_page(ctx->blob->bs, ctx->new_extent_page);
1967 		}
1968 	}
1969 
1970 	spdk_bs_sequence_finish(ctx->seq, bserrno);
1971 }
1972 
1973 static void
1974 _spdk_blob_write_copy_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1975 {
1976 	struct spdk_blob_copy_cluster_ctx *ctx = cb_arg;
1977 	uint32_t cluster_number;
1978 
1979 	if (bserrno) {
1980 		/* The write failed, so jump to the final completion handler */
1981 		spdk_bs_sequence_finish(seq, bserrno);
1982 		return;
1983 	}
1984 
1985 	cluster_number = _spdk_bs_page_to_cluster(ctx->blob->bs, ctx->page);
1986 
1987 	_spdk_blob_insert_cluster_on_md_thread(ctx->blob, cluster_number, ctx->new_cluster,
1988 					       ctx->new_extent_page, _spdk_blob_insert_cluster_cpl, ctx);
1989 }
1990 
1991 static void
1992 _spdk_blob_write_copy(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1993 {
1994 	struct spdk_blob_copy_cluster_ctx *ctx = cb_arg;
1995 
1996 	if (bserrno != 0) {
1997 		/* The read failed, so jump to the final completion handler */
1998 		spdk_bs_sequence_finish(seq, bserrno);
1999 		return;
2000 	}
2001 
2002 	/* Write whole cluster */
2003 	spdk_bs_sequence_write_dev(seq, ctx->buf,
2004 				   _spdk_bs_cluster_to_lba(ctx->blob->bs, ctx->new_cluster),
2005 				   _spdk_bs_cluster_to_lba(ctx->blob->bs, 1),
2006 				   _spdk_blob_write_copy_cpl, ctx);
2007 }
2008 
2009 static void
2010 _spdk_bs_allocate_and_copy_cluster(struct spdk_blob *blob,
2011 				   struct spdk_io_channel *_ch,
2012 				   uint64_t io_unit, spdk_bs_user_op_t *op)
2013 {
2014 	struct spdk_bs_cpl cpl;
2015 	struct spdk_bs_channel *ch;
2016 	struct spdk_blob_copy_cluster_ctx *ctx;
2017 	uint32_t cluster_start_page;
2018 	uint32_t cluster_number;
2019 	int rc;
2020 
2021 	ch = spdk_io_channel_get_ctx(_ch);
2022 
2023 	if (!TAILQ_EMPTY(&ch->need_cluster_alloc)) {
2024 		/* There are already operations pending. Queue this user op
2025 		 * and return because it will be re-executed when the outstanding
2026 		 * cluster allocation completes. */
2027 		TAILQ_INSERT_TAIL(&ch->need_cluster_alloc, op, link);
2028 		return;
2029 	}
2030 
2031 	/* Round the io_unit offset down to the first page in the cluster */
2032 	cluster_start_page = _spdk_bs_io_unit_to_cluster_start(blob, io_unit);
2033 
2034 	/* Calculate which index in the metadata cluster array the corresponding
2035 	 * cluster is supposed to be at. */
2036 	cluster_number = _spdk_bs_io_unit_to_cluster_number(blob, io_unit);
2037 
2038 	ctx = calloc(1, sizeof(*ctx));
2039 	if (!ctx) {
2040 		spdk_bs_user_op_abort(op);
2041 		return;
2042 	}
2043 
2044 	assert(blob->bs->cluster_sz % blob->back_bs_dev->blocklen == 0);
2045 
2046 	ctx->blob = blob;
2047 	ctx->page = cluster_start_page;
2048 
2049 	if (blob->parent_id != SPDK_BLOBID_INVALID) {
2050 		ctx->buf = spdk_malloc(blob->bs->cluster_sz, blob->back_bs_dev->blocklen,
2051 				       NULL, SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
2052 		if (!ctx->buf) {
2053 			SPDK_ERRLOG("DMA allocation for cluster of size = %" PRIu32 " failed.\n",
2054 				    blob->bs->cluster_sz);
2055 			free(ctx);
2056 			spdk_bs_user_op_abort(op);
2057 			return;
2058 		}
2059 	}
2060 
2061 	rc = _spdk_bs_allocate_cluster(blob, cluster_number, &ctx->new_cluster, &ctx->new_extent_page,
2062 				       false);
2063 	if (rc != 0) {
2064 		spdk_free(ctx->buf);
2065 		free(ctx);
2066 		spdk_bs_user_op_abort(op);
2067 		return;
2068 	}
2069 
2070 	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
2071 	cpl.u.blob_basic.cb_fn = _spdk_blob_allocate_and_copy_cluster_cpl;
2072 	cpl.u.blob_basic.cb_arg = ctx;
2073 
2074 	ctx->seq = spdk_bs_sequence_start(_ch, &cpl);
2075 	if (!ctx->seq) {
2076 		_spdk_bs_release_cluster(blob->bs, ctx->new_cluster);
		if (ctx->new_extent_page != 0) {
			/* Release the md page claimed for the new extent page as
			 * well, mirroring _spdk_blob_insert_cluster_cpl()'s error path. */
			_spdk_bs_release_md_page(blob->bs, ctx->new_extent_page);
		}
2077 		spdk_free(ctx->buf);
2078 		free(ctx);
2079 		spdk_bs_user_op_abort(op);
2080 		return;
2081 	}
2082 
2083 	/* Queue the user op to block other incoming operations */
2084 	TAILQ_INSERT_TAIL(&ch->need_cluster_alloc, op, link);
2085 
2086 	if (blob->parent_id != SPDK_BLOBID_INVALID) {
2087 		/* Read cluster from backing device */
2088 		spdk_bs_sequence_read_bs_dev(ctx->seq, blob->back_bs_dev, ctx->buf,
2089 					     _spdk_bs_dev_page_to_lba(blob->back_bs_dev, cluster_start_page),
2090 					     _spdk_bs_dev_byte_to_lba(blob->back_bs_dev, blob->bs->cluster_sz),
2091 					     _spdk_blob_write_copy, ctx);
2092 	} else {
2093 		_spdk_blob_insert_cluster_on_md_thread(ctx->blob, cluster_number, ctx->new_cluster,
2094 						       ctx->new_extent_page, _spdk_blob_insert_cluster_cpl, ctx);
2095 	}
2096 }
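
/*
 * Flow summary (sketch): when the blob is a clone (parent_id is valid), the
 * sequence is read the old cluster from back_bs_dev -> _spdk_blob_write_copy()
 * writes it into the new cluster -> _spdk_blob_insert_cluster_on_md_thread()
 * publishes it in the cluster map. For a thin blob with no parent, the
 * read/copy is skipped and the cluster is inserted directly; either way the
 * user ops parked on need_cluster_alloc are re-executed (or aborted) from
 * _spdk_blob_allocate_and_copy_cluster_cpl().
 */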
2097 
2098 static void
2099 _spdk_blob_calculate_lba_and_lba_count(struct spdk_blob *blob, uint64_t io_unit, uint64_t length,
2100 				       uint64_t *lba,	uint32_t *lba_count)
2101 {
2102 	*lba_count = length;
2103 
2104 	if (!_spdk_bs_io_unit_is_allocated(blob, io_unit)) {
2105 		assert(blob->back_bs_dev != NULL);
2106 		*lba = _spdk_bs_io_unit_to_back_dev_lba(blob, io_unit);
2107 		*lba_count = _spdk_bs_io_unit_to_back_dev_lba(blob, *lba_count);
2108 	} else {
2109 		*lba = _spdk_bs_blob_io_unit_to_lba(blob, io_unit);
2110 	}
2111 }
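
/*
 * Worked example (assuming io_unit_size == 512 and a 512-byte backing dev):
 * for an allocated io_unit, *lba is the io_unit's LBA on the blobstore
 * device and *lba_count == length. For an unallocated io_unit, both the
 * offset and the count are translated into the backing device's block size,
 * since back_bs_dev may use different-sized blocks than the blobstore device.
 */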
2112 
2113 struct op_split_ctx {
2114 	struct spdk_blob *blob;
2115 	struct spdk_io_channel *channel;
2116 	uint64_t io_unit_offset;
2117 	uint64_t io_units_remaining;
2118 	void *curr_payload;
2119 	enum spdk_blob_op_type op_type;
2120 	spdk_bs_sequence_t *seq;
2121 };
2122 
2123 static void
2124 _spdk_blob_request_submit_op_split_next(void *cb_arg, int bserrno)
2125 {
2126 	struct op_split_ctx	*ctx = cb_arg;
2127 	struct spdk_blob	*blob = ctx->blob;
2128 	struct spdk_io_channel	*ch = ctx->channel;
2129 	enum spdk_blob_op_type	op_type = ctx->op_type;
2130 	uint8_t			*buf = ctx->curr_payload;
2131 	uint64_t		offset = ctx->io_unit_offset;
2132 	uint64_t		length = ctx->io_units_remaining;
2133 	uint64_t		op_length;
2134 
2135 	if (bserrno != 0 || ctx->io_units_remaining == 0) {
2136 		spdk_bs_sequence_finish(ctx->seq, bserrno);
2137 		free(ctx);
2138 		return;
2139 	}
2140 
2141 	op_length = spdk_min(length, _spdk_bs_num_io_units_to_cluster_boundary(blob,
2142 			     offset));
2143 
2144 	/* Update length and payload for next operation */
2145 	ctx->io_units_remaining -= op_length;
2146 	ctx->io_unit_offset += op_length;
2147 	if (op_type == SPDK_BLOB_WRITE || op_type == SPDK_BLOB_READ) {
2148 		ctx->curr_payload += op_length * blob->bs->io_unit_size;
2149 	}
2150 
2151 	switch (op_type) {
2152 	case SPDK_BLOB_READ:
2153 		spdk_blob_io_read(blob, ch, buf, offset, op_length,
2154 				  _spdk_blob_request_submit_op_split_next, ctx);
2155 		break;
2156 	case SPDK_BLOB_WRITE:
2157 		spdk_blob_io_write(blob, ch, buf, offset, op_length,
2158 				   _spdk_blob_request_submit_op_split_next, ctx);
2159 		break;
2160 	case SPDK_BLOB_UNMAP:
2161 		spdk_blob_io_unmap(blob, ch, offset, op_length,
2162 				   _spdk_blob_request_submit_op_split_next, ctx);
2163 		break;
2164 	case SPDK_BLOB_WRITE_ZEROES:
2165 		spdk_blob_io_write_zeroes(blob, ch, offset, op_length,
2166 					  _spdk_blob_request_submit_op_split_next, ctx);
2167 		break;
2168 	case SPDK_BLOB_READV:
2169 	case SPDK_BLOB_WRITEV:
2170 		SPDK_ERRLOG("readv/write not valid\n");
2171 		spdk_bs_sequence_finish(ctx->seq, -EINVAL);
2172 		free(ctx);
2173 		break;
2174 	}
2175 }
2176 
2177 static void
2178 _spdk_blob_request_submit_op_split(struct spdk_io_channel *ch, struct spdk_blob *blob,
2179 				   void *payload, uint64_t offset, uint64_t length,
2180 				   spdk_blob_op_complete cb_fn, void *cb_arg, enum spdk_blob_op_type op_type)
2181 {
2182 	struct op_split_ctx *ctx;
2183 	spdk_bs_sequence_t *seq;
2184 	struct spdk_bs_cpl cpl;
2185 
2186 	assert(blob != NULL);
2187 
2188 	ctx = calloc(1, sizeof(struct op_split_ctx));
2189 	if (ctx == NULL) {
2190 		cb_fn(cb_arg, -ENOMEM);
2191 		return;
2192 	}
2193 
2194 	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
2195 	cpl.u.blob_basic.cb_fn = cb_fn;
2196 	cpl.u.blob_basic.cb_arg = cb_arg;
2197 
2198 	seq = spdk_bs_sequence_start(ch, &cpl);
2199 	if (!seq) {
2200 		free(ctx);
2201 		cb_fn(cb_arg, -ENOMEM);
2202 		return;
2203 	}
2204 
2205 	ctx->blob = blob;
2206 	ctx->channel = ch;
2207 	ctx->curr_payload = payload;
2208 	ctx->io_unit_offset = offset;
2209 	ctx->io_units_remaining = length;
2210 	ctx->op_type = op_type;
2211 	ctx->seq = seq;
2212 
2213 	_spdk_blob_request_submit_op_split_next(ctx, 0);
2214 }
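
/*
 * Worked example: with a 1 MiB cluster and 512-byte io_units (2048 io_units
 * per cluster), a write at io_unit offset 2000 with length 100 crosses a
 * cluster boundary and is split into two chained sub-ops: 48 io_units at
 * offset 2000, then 52 io_units at offset 2048. Each sub-op re-enters
 * _spdk_blob_request_submit_op_split_next() on completion.
 */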
2215 
2216 static void
2217 _spdk_blob_request_submit_op_single(struct spdk_io_channel *_ch, struct spdk_blob *blob,
2218 				    void *payload, uint64_t offset, uint64_t length,
2219 				    spdk_blob_op_complete cb_fn, void *cb_arg, enum spdk_blob_op_type op_type)
2220 {
2221 	struct spdk_bs_cpl cpl;
2222 	uint64_t lba;
2223 	uint32_t lba_count;
2224 
2225 	assert(blob != NULL);
2226 
2227 	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
2228 	cpl.u.blob_basic.cb_fn = cb_fn;
2229 	cpl.u.blob_basic.cb_arg = cb_arg;
2230 
2231 	_spdk_blob_calculate_lba_and_lba_count(blob, offset, length, &lba, &lba_count);
2232 
2233 	if (blob->frozen_refcnt) {
2234 		/* This blob I/O is frozen */
2235 		spdk_bs_user_op_t *op;
2236 		struct spdk_bs_channel *bs_channel = spdk_io_channel_get_ctx(_ch);
2237 
2238 		op = spdk_bs_user_op_alloc(_ch, &cpl, op_type, blob, payload, 0, offset, length);
2239 		if (!op) {
2240 			cb_fn(cb_arg, -ENOMEM);
2241 			return;
2242 		}
2243 
2244 		TAILQ_INSERT_TAIL(&bs_channel->queued_io, op, link);
2245 
2246 		return;
2247 	}
2248 
2249 	switch (op_type) {
2250 	case SPDK_BLOB_READ: {
2251 		spdk_bs_batch_t *batch;
2252 
2253 		batch = spdk_bs_batch_open(_ch, &cpl);
2254 		if (!batch) {
2255 			cb_fn(cb_arg, -ENOMEM);
2256 			return;
2257 		}
2258 
2259 		if (_spdk_bs_io_unit_is_allocated(blob, offset)) {
2260 			/* Read from the blob */
2261 			spdk_bs_batch_read_dev(batch, payload, lba, lba_count);
2262 		} else {
2263 			/* Read from the backing block device */
2264 			spdk_bs_batch_read_bs_dev(batch, blob->back_bs_dev, payload, lba, lba_count);
2265 		}
2266 
2267 		spdk_bs_batch_close(batch);
2268 		break;
2269 	}
2270 	case SPDK_BLOB_WRITE:
2271 	case SPDK_BLOB_WRITE_ZEROES: {
2272 		if (_spdk_bs_io_unit_is_allocated(blob, offset)) {
2273 			/* Write to the blob */
2274 			spdk_bs_batch_t *batch;
2275 
2276 			if (lba_count == 0) {
2277 				cb_fn(cb_arg, 0);
2278 				return;
2279 			}
2280 
2281 			batch = spdk_bs_batch_open(_ch, &cpl);
2282 			if (!batch) {
2283 				cb_fn(cb_arg, -ENOMEM);
2284 				return;
2285 			}
2286 
2287 			if (op_type == SPDK_BLOB_WRITE) {
2288 				spdk_bs_batch_write_dev(batch, payload, lba, lba_count);
2289 			} else {
2290 				spdk_bs_batch_write_zeroes_dev(batch, lba, lba_count);
2291 			}
2292 
2293 			spdk_bs_batch_close(batch);
2294 		} else {
2295 			/* Queue this operation and allocate the cluster */
2296 			spdk_bs_user_op_t *op;
2297 
2298 			op = spdk_bs_user_op_alloc(_ch, &cpl, op_type, blob, payload, 0, offset, length);
2299 			if (!op) {
2300 				cb_fn(cb_arg, -ENOMEM);
2301 				return;
2302 			}
2303 
2304 			_spdk_bs_allocate_and_copy_cluster(blob, _ch, offset, op);
2305 		}
2306 		break;
2307 	}
2308 	case SPDK_BLOB_UNMAP: {
2309 		spdk_bs_batch_t *batch;
2310 
2311 		batch = spdk_bs_batch_open(_ch, &cpl);
2312 		if (!batch) {
2313 			cb_fn(cb_arg, -ENOMEM);
2314 			return;
2315 		}
2316 
2317 		if (_spdk_bs_io_unit_is_allocated(blob, offset)) {
2318 			spdk_bs_batch_unmap_dev(batch, lba, lba_count);
2319 		}
2320 
2321 		spdk_bs_batch_close(batch);
2322 		break;
2323 	}
2324 	case SPDK_BLOB_READV:
2325 	case SPDK_BLOB_WRITEV:
2326 		SPDK_ERRLOG("readv/write not valid\n");
2327 		cb_fn(cb_arg, -EINVAL);
2328 		break;
2329 	}
2330 }
2331 
2332 static void
2333 _spdk_blob_request_submit_op(struct spdk_blob *blob, struct spdk_io_channel *_channel,
2334 			     void *payload, uint64_t offset, uint64_t length,
2335 			     spdk_blob_op_complete cb_fn, void *cb_arg, enum spdk_blob_op_type op_type)
2336 {
2337 	assert(blob != NULL);
2338 
2339 	if (blob->data_ro && op_type != SPDK_BLOB_READ) {
2340 		cb_fn(cb_arg, -EPERM);
2341 		return;
2342 	}
2343 
2344 	if (offset + length > _spdk_bs_cluster_to_lba(blob->bs, blob->active.num_clusters)) {
2345 		cb_fn(cb_arg, -EINVAL);
2346 		return;
2347 	}
2348 	if (length <= _spdk_bs_num_io_units_to_cluster_boundary(blob, offset)) {
2349 		_spdk_blob_request_submit_op_single(_channel, blob, payload, offset, length,
2350 						    cb_fn, cb_arg, op_type);
2351 	} else {
2352 		_spdk_blob_request_submit_op_split(_channel, blob, payload, offset, length,
2353 						   cb_fn, cb_arg, op_type);
2354 	}
2355 }
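
/*
 * Usage sketch (illustrative): this dispatcher backs the public
 * spdk_blob_io_read()/write()/unmap()/write_zeroes() calls. For example,
 * writing 8 io_units at offset 0 (buffer and callback are hypothetical):
 *
 *	uint8_t *buf = spdk_malloc(8 * blob->bs->io_unit_size, 0x1000, NULL,
 *				   SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
 *	spdk_blob_io_write(blob, channel, buf, 0, 8, write_done, NULL);
 */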
2356 
2357 struct rw_iov_ctx {
2358 	struct spdk_blob *blob;
2359 	struct spdk_io_channel *channel;
2360 	spdk_blob_op_complete cb_fn;
2361 	void *cb_arg;
2362 	bool read;
2363 	int iovcnt;
2364 	struct iovec *orig_iov;
2365 	uint64_t io_unit_offset;
2366 	uint64_t io_units_remaining;
2367 	uint64_t io_units_done;
2368 	struct iovec iov[0];
2369 };
2370 
2371 static void
2372 _spdk_rw_iov_done(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2373 {
2374 	assert(cb_arg == NULL);
2375 	spdk_bs_sequence_finish(seq, bserrno);
2376 }
2377 
2378 static void
2379 _spdk_rw_iov_split_next(void *cb_arg, int bserrno)
2380 {
2381 	struct rw_iov_ctx *ctx = cb_arg;
2382 	struct spdk_blob *blob = ctx->blob;
2383 	struct iovec *iov, *orig_iov;
2384 	int iovcnt;
2385 	size_t orig_iovoff;
2386 	uint64_t io_units_count, io_units_to_boundary, io_unit_offset;
2387 	uint64_t byte_count;
2388 
2389 	if (bserrno != 0 || ctx->io_units_remaining == 0) {
2390 		ctx->cb_fn(ctx->cb_arg, bserrno);
2391 		free(ctx);
2392 		return;
2393 	}
2394 
2395 	io_unit_offset = ctx->io_unit_offset;
2396 	io_units_to_boundary = _spdk_bs_num_io_units_to_cluster_boundary(blob, io_unit_offset);
2397 	io_units_count = spdk_min(ctx->io_units_remaining, io_units_to_boundary);
2398 	/*
2399 	 * Get index and offset into the original iov array for our current position in the I/O sequence.
2400 	 *  byte_count tracks how many bytes remain before orig_iov and orig_iovoff
2401 	 *  point to the current position in the I/O sequence.
2402 	 */
2403 	byte_count = ctx->io_units_done * blob->bs->io_unit_size;
2404 	orig_iov = &ctx->orig_iov[0];
2405 	orig_iovoff = 0;
2406 	while (byte_count > 0) {
2407 		if (byte_count >= orig_iov->iov_len) {
2408 			byte_count -= orig_iov->iov_len;
2409 			orig_iov++;
2410 		} else {
2411 			orig_iovoff = byte_count;
2412 			byte_count = 0;
2413 		}
2414 	}
2415 
2416 	/*
2417 	 * Build an iov array for the next I/O in the sequence.  byte_count will keep track of how many
2418 	 *  bytes of this next I/O remain to be accounted for in the new iov array.
2419 	 */
2420 	byte_count = io_units_count * blob->bs->io_unit_size;
2421 	iov = &ctx->iov[0];
2422 	iovcnt = 0;
2423 	while (byte_count > 0) {
2424 		assert(iovcnt < ctx->iovcnt);
2425 		iov->iov_len = spdk_min(byte_count, orig_iov->iov_len - orig_iovoff);
2426 		iov->iov_base = orig_iov->iov_base + orig_iovoff;
2427 		byte_count -= iov->iov_len;
2428 		orig_iovoff = 0;
2429 		orig_iov++;
2430 		iov++;
2431 		iovcnt++;
2432 	}
2433 
2434 	ctx->io_unit_offset += io_units_count;
2435 	ctx->io_units_remaining -= io_units_count;
2436 	ctx->io_units_done += io_units_count;
2437 	iov = &ctx->iov[0];
2438 
2439 	if (ctx->read) {
2440 		spdk_blob_io_readv(ctx->blob, ctx->channel, iov, iovcnt, io_unit_offset,
2441 				   io_units_count, _spdk_rw_iov_split_next, ctx);
2442 	} else {
2443 		spdk_blob_io_writev(ctx->blob, ctx->channel, iov, iovcnt, io_unit_offset,
2444 				    io_units_count, _spdk_rw_iov_split_next, ctx);
2445 	}
2446 }
2447 
2448 static void
2449 _spdk_blob_request_submit_rw_iov(struct spdk_blob *blob, struct spdk_io_channel *_channel,
2450 				 struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
2451 				 spdk_blob_op_complete cb_fn, void *cb_arg, bool read)
2452 {
2453 	struct spdk_bs_cpl	cpl;
2454 
2455 	assert(blob != NULL);
2456 
2457 	if (!read && blob->data_ro) {
2458 		cb_fn(cb_arg, -EPERM);
2459 		return;
2460 	}
2461 
2462 	if (length == 0) {
2463 		cb_fn(cb_arg, 0);
2464 		return;
2465 	}
2466 
2467 	if (offset + length > _spdk_bs_cluster_to_lba(blob->bs, blob->active.num_clusters)) {
2468 		cb_fn(cb_arg, -EINVAL);
2469 		return;
2470 	}
2471 
2472 	/*
2473 	 * For now, we implement readv/writev using a sequence (instead of a batch) to account for having
2474 	 *  to split a request that spans a cluster boundary.  For I/O that do not span a cluster boundary,
2475 	 *  there will be no noticeable difference compared to using a batch.  For I/O that do span a cluster
2476 	 *  boundary, the target LBAs (after blob offset to LBA translation) may not be contiguous, so we need
2477 	 *  to allocate a separate iov array and split the I/O such that none of the resulting
2478 	 *  smaller I/O cross a cluster boundary.  These smaller I/O will be issued in sequence (not in parallel)
2479 	 *  but since this case happens very infrequently, any performance impact will be negligible.
2480 	 *
2481 	 * This could be optimized in the future to allocate a big enough iov array to account for all of the iovs
2482 	 *  for all of the smaller I/Os, pre-build all of the iov arrays for the smaller I/Os, then issue them
2483 	 *  in a batch.  That would also require creating an intermediate spdk_bs_cpl that would get called
2484 	 *  when the batch was completed, to allow for freeing the memory for the iov arrays.
2485 	 */
2486 	if (spdk_likely(length <= _spdk_bs_num_io_units_to_cluster_boundary(blob, offset))) {
2487 		uint32_t lba_count;
2488 		uint64_t lba;
2489 
2490 		cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
2491 		cpl.u.blob_basic.cb_fn = cb_fn;
2492 		cpl.u.blob_basic.cb_arg = cb_arg;
2493 
2494 		if (blob->frozen_refcnt) {
2495 			/* This blob I/O is frozen */
2496 			enum spdk_blob_op_type op_type;
2497 			spdk_bs_user_op_t *op;
2498 			struct spdk_bs_channel *bs_channel = spdk_io_channel_get_ctx(_channel);
2499 
2500 			op_type = read ? SPDK_BLOB_READV : SPDK_BLOB_WRITEV;
2501 			op = spdk_bs_user_op_alloc(_channel, &cpl, op_type, blob, iov, iovcnt, offset, length);
2502 			if (!op) {
2503 				cb_fn(cb_arg, -ENOMEM);
2504 				return;
2505 			}
2506 
2507 			TAILQ_INSERT_TAIL(&bs_channel->queued_io, op, link);
2508 
2509 			return;
2510 		}
2511 
2512 		_spdk_blob_calculate_lba_and_lba_count(blob, offset, length, &lba, &lba_count);
2513 
2514 		if (read) {
2515 			spdk_bs_sequence_t *seq;
2516 
2517 			seq = spdk_bs_sequence_start(_channel, &cpl);
2518 			if (!seq) {
2519 				cb_fn(cb_arg, -ENOMEM);
2520 				return;
2521 			}
2522 
2523 			if (_spdk_bs_io_unit_is_allocated(blob, offset)) {
2524 				spdk_bs_sequence_readv_dev(seq, iov, iovcnt, lba, lba_count, _spdk_rw_iov_done, NULL);
2525 			} else {
2526 				spdk_bs_sequence_readv_bs_dev(seq, blob->back_bs_dev, iov, iovcnt, lba, lba_count,
2527 							      _spdk_rw_iov_done, NULL);
2528 			}
2529 		} else {
2530 			if (_spdk_bs_io_unit_is_allocated(blob, offset)) {
2531 				spdk_bs_sequence_t *seq;
2532 
2533 				seq = spdk_bs_sequence_start(_channel, &cpl);
2534 				if (!seq) {
2535 					cb_fn(cb_arg, -ENOMEM);
2536 					return;
2537 				}
2538 
2539 				spdk_bs_sequence_writev_dev(seq, iov, iovcnt, lba, lba_count, _spdk_rw_iov_done, NULL);
2540 			} else {
2541 				/* Queue this operation and allocate the cluster */
2542 				spdk_bs_user_op_t *op;
2543 
2544 				op = spdk_bs_user_op_alloc(_channel, &cpl, SPDK_BLOB_WRITEV, blob, iov, iovcnt, offset,
2545 							   length);
2546 				if (!op) {
2547 					cb_fn(cb_arg, -ENOMEM);
2548 					return;
2549 				}
2550 
2551 				_spdk_bs_allocate_and_copy_cluster(blob, _channel, offset, op);
2552 			}
2553 		}
2554 	} else {
2555 		struct rw_iov_ctx *ctx;
2556 
2557 		ctx = calloc(1, sizeof(struct rw_iov_ctx) + iovcnt * sizeof(struct iovec));
2558 		if (ctx == NULL) {
2559 			cb_fn(cb_arg, -ENOMEM);
2560 			return;
2561 		}
2562 
2563 		ctx->blob = blob;
2564 		ctx->channel = _channel;
2565 		ctx->cb_fn = cb_fn;
2566 		ctx->cb_arg = cb_arg;
2567 		ctx->read = read;
2568 		ctx->orig_iov = iov;
2569 		ctx->iovcnt = iovcnt;
2570 		ctx->io_unit_offset = offset;
2571 		ctx->io_units_remaining = length;
2572 		ctx->io_units_done = 0;
2573 
2574 		_spdk_rw_iov_split_next(ctx, 0);
2575 	}
2576 }
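
/*
 * Usage sketch (illustrative): the public entry points for this path are
 * spdk_blob_io_readv()/spdk_blob_io_writev(). Reading 8 io_units from
 * offset 0 into a two-element scatter list (buffers and callback are
 * hypothetical):
 *
 *	struct iovec iov[2] = {
 *		{ .iov_base = buf0, .iov_len = 4 * blob->bs->io_unit_size },
 *		{ .iov_base = buf1, .iov_len = 4 * blob->bs->io_unit_size },
 *	};
 *
 *	spdk_blob_io_readv(blob, channel, iov, 2, 0, 8, read_done, NULL);
 */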
2577 
2578 static struct spdk_blob *
2579 _spdk_blob_lookup(struct spdk_blob_store *bs, spdk_blob_id blobid)
2580 {
2581 	struct spdk_blob *blob;
2582 
2583 	TAILQ_FOREACH(blob, &bs->blobs, link) {
2584 		if (blob->id == blobid) {
2585 			return blob;
2586 		}
2587 	}
2588 
2589 	return NULL;
2590 }
2591 
2592 static void
2593 _spdk_blob_get_snapshot_and_clone_entries(struct spdk_blob *blob,
2594 		struct spdk_blob_list **snapshot_entry, struct spdk_blob_list **clone_entry)
2595 {
2596 	assert(blob != NULL);
2597 	*snapshot_entry = NULL;
2598 	*clone_entry = NULL;
2599 
2600 	if (blob->parent_id == SPDK_BLOBID_INVALID) {
2601 		return;
2602 	}
2603 
2604 	TAILQ_FOREACH(*snapshot_entry, &blob->bs->snapshots, link) {
2605 		if ((*snapshot_entry)->id == blob->parent_id) {
2606 			break;
2607 		}
2608 	}
2609 
2610 	if (*snapshot_entry != NULL) {
2611 		TAILQ_FOREACH(*clone_entry, &(*snapshot_entry)->clones, link) {
2612 			if ((*clone_entry)->id == blob->id) {
2613 				break;
2614 			}
2615 		}
2616 
2617 		assert(*clone_entry != NULL);
2618 	}
2619 }
2620 
2621 static int
2622 _spdk_bs_channel_create(void *io_device, void *ctx_buf)
2623 {
2624 	struct spdk_blob_store		*bs = io_device;
2625 	struct spdk_bs_channel		*channel = ctx_buf;
2626 	struct spdk_bs_dev		*dev;
2627 	uint32_t			max_ops = bs->max_channel_ops;
2628 	uint32_t			i;
2629 
2630 	dev = bs->dev;
2631 
2632 	channel->req_mem = calloc(max_ops, sizeof(struct spdk_bs_request_set));
2633 	if (!channel->req_mem) {
2634 		return -1;
2635 	}
2636 
2637 	TAILQ_INIT(&channel->reqs);
2638 
2639 	for (i = 0; i < max_ops; i++) {
2640 		TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link);
2641 	}
2642 
2643 	channel->bs = bs;
2644 	channel->dev = dev;
2645 	channel->dev_channel = dev->create_channel(dev);
2646 
2647 	if (!channel->dev_channel) {
2648 		SPDK_ERRLOG("Failed to create device channel.\n");
2649 		free(channel->req_mem);
2650 		return -1;
2651 	}
2652 
2653 	TAILQ_INIT(&channel->need_cluster_alloc);
2654 	TAILQ_INIT(&channel->queued_io);
2655 
2656 	return 0;
2657 }
2658 
2659 static void
2660 _spdk_bs_channel_destroy(void *io_device, void *ctx_buf)
2661 {
2662 	struct spdk_bs_channel *channel = ctx_buf;
2663 	spdk_bs_user_op_t *op;
2664 
2665 	while (!TAILQ_EMPTY(&channel->need_cluster_alloc)) {
2666 		op = TAILQ_FIRST(&channel->need_cluster_alloc);
2667 		TAILQ_REMOVE(&channel->need_cluster_alloc, op, link);
2668 		spdk_bs_user_op_abort(op);
2669 	}
2670 
2671 	while (!TAILQ_EMPTY(&channel->queued_io)) {
2672 		op = TAILQ_FIRST(&channel->queued_io);
2673 		TAILQ_REMOVE(&channel->queued_io, op, link);
2674 		spdk_bs_user_op_abort(op);
2675 	}
2676 
2677 	free(channel->req_mem);
2678 	channel->dev->destroy_channel(channel->dev, channel->dev_channel);
2679 }
2680 
2681 static void
2682 _spdk_bs_dev_destroy(void *io_device)
2683 {
2684 	struct spdk_blob_store *bs = io_device;
2685 	struct spdk_blob	*blob, *blob_tmp;
2686 
2687 	bs->dev->destroy(bs->dev);
2688 
2689 	TAILQ_FOREACH_SAFE(blob, &bs->blobs, link, blob_tmp) {
2690 		TAILQ_REMOVE(&bs->blobs, blob, link);
2691 		_spdk_blob_free(blob);
2692 	}
2693 
2694 	pthread_mutex_destroy(&bs->used_clusters_mutex);
2695 
2696 	spdk_bit_array_free(&bs->used_blobids);
2697 	spdk_bit_array_free(&bs->used_md_pages);
2698 	spdk_bit_array_free(&bs->used_clusters);
2699 	/*
2700 	 * If this function is called for any reason except a successful unload,
2701 	 * the unload_cpl type will be NONE and this will be a nop.
2702 	 */
2703 	spdk_bs_call_cpl(&bs->unload_cpl, bs->unload_err);
2704 
2705 	free(bs);
2706 }
2707 
2708 static int
2709 _spdk_bs_blob_list_add(struct spdk_blob *blob)
2710 {
2711 	spdk_blob_id snapshot_id;
2712 	struct spdk_blob_list *snapshot_entry = NULL;
2713 	struct spdk_blob_list *clone_entry = NULL;
2714 
2715 	assert(blob != NULL);
2716 
2717 	snapshot_id = blob->parent_id;
2718 	if (snapshot_id == SPDK_BLOBID_INVALID) {
2719 		return 0;
2720 	}
2721 
2722 	snapshot_entry = _spdk_bs_get_snapshot_entry(blob->bs, snapshot_id);
2723 	if (snapshot_entry == NULL) {
2724 		/* Snapshot not found */
2725 		snapshot_entry = calloc(1, sizeof(struct spdk_blob_list));
2726 		if (snapshot_entry == NULL) {
2727 			return -ENOMEM;
2728 		}
2729 		snapshot_entry->id = snapshot_id;
2730 		TAILQ_INIT(&snapshot_entry->clones);
2731 		TAILQ_INSERT_TAIL(&blob->bs->snapshots, snapshot_entry, link);
2732 	} else {
2733 		TAILQ_FOREACH(clone_entry, &snapshot_entry->clones, link) {
2734 			if (clone_entry->id == blob->id) {
2735 				break;
2736 			}
2737 		}
2738 	}
2739 
2740 	if (clone_entry == NULL) {
2741 		/* Clone not found */
2742 		clone_entry = calloc(1, sizeof(struct spdk_blob_list));
2743 		if (clone_entry == NULL) {
2744 			return -ENOMEM;
2745 		}
2746 		clone_entry->id = blob->id;
2747 		TAILQ_INIT(&clone_entry->clones);
2748 		TAILQ_INSERT_TAIL(&snapshot_entry->clones, clone_entry, link);
2749 		snapshot_entry->clone_count++;
2750 	}
2751 
2752 	return 0;
2753 }
2754 
2755 static void
2756 _spdk_bs_blob_list_remove(struct spdk_blob *blob)
2757 {
2758 	struct spdk_blob_list *snapshot_entry = NULL;
2759 	struct spdk_blob_list *clone_entry = NULL;
2760 
2761 	_spdk_blob_get_snapshot_and_clone_entries(blob, &snapshot_entry, &clone_entry);
2762 
2763 	if (snapshot_entry == NULL) {
2764 		return;
2765 	}
2766 
2767 	blob->parent_id = SPDK_BLOBID_INVALID;
2768 	TAILQ_REMOVE(&snapshot_entry->clones, clone_entry, link);
2769 	free(clone_entry);
2770 
2771 	snapshot_entry->clone_count--;
2772 }
2773 
2774 static int
2775 _spdk_bs_blob_list_free(struct spdk_blob_store *bs)
2776 {
2777 	struct spdk_blob_list *snapshot_entry;
2778 	struct spdk_blob_list *snapshot_entry_tmp;
2779 	struct spdk_blob_list *clone_entry;
2780 	struct spdk_blob_list *clone_entry_tmp;
2781 
2782 	TAILQ_FOREACH_SAFE(snapshot_entry, &bs->snapshots, link, snapshot_entry_tmp) {
2783 		TAILQ_FOREACH_SAFE(clone_entry, &snapshot_entry->clones, link, clone_entry_tmp) {
2784 			TAILQ_REMOVE(&snapshot_entry->clones, clone_entry, link);
2785 			free(clone_entry);
2786 		}
2787 		TAILQ_REMOVE(&bs->snapshots, snapshot_entry, link);
2788 		free(snapshot_entry);
2789 	}
2790 
2791 	return 0;
2792 }
2793 
2794 static void
2795 _spdk_bs_free(struct spdk_blob_store *bs)
2796 {
2797 	_spdk_bs_blob_list_free(bs);
2798 
2799 	spdk_bs_unregister_md_thread(bs);
2800 	spdk_io_device_unregister(bs, _spdk_bs_dev_destroy);
2801 }
2802 
2803 void
2804 spdk_bs_opts_init(struct spdk_bs_opts *opts)
2805 {
2806 	opts->cluster_sz = SPDK_BLOB_OPTS_CLUSTER_SZ;
2807 	opts->num_md_pages = SPDK_BLOB_OPTS_NUM_MD_PAGES;
2808 	opts->max_md_ops = SPDK_BLOB_OPTS_MAX_MD_OPS;
2809 	opts->max_channel_ops = SPDK_BLOB_OPTS_DEFAULT_CHANNEL_OPS;
2810 	opts->clear_method = BS_CLEAR_WITH_UNMAP;
2811 	memset(&opts->bstype, 0, sizeof(opts->bstype));
2812 	opts->iter_cb_fn = NULL;
2813 	opts->iter_cb_arg = NULL;
2814 }
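
/*
 * Usage sketch (illustrative): callers initialize the defaults above and
 * then override selected fields before creating a blobstore (init_done is
 * a hypothetical callback):
 *
 *	struct spdk_bs_opts opts;
 *
 *	spdk_bs_opts_init(&opts);
 *	opts.cluster_sz = 4 * 1024 * 1024;
 *	snprintf(opts.bstype.bstype, SPDK_BLOBSTORE_TYPE_LENGTH, "example");
 *	spdk_bs_init(dev, &opts, init_done, NULL);
 */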
2815 
2816 static int
2817 _spdk_bs_opts_verify(struct spdk_bs_opts *opts)
2818 {
2819 	if (opts->cluster_sz == 0 || opts->num_md_pages == 0 || opts->max_md_ops == 0 ||
2820 	    opts->max_channel_ops == 0) {
2821 		SPDK_ERRLOG("Blobstore options cannot be set to 0\n");
2822 		return -1;
2823 	}
2824 
2825 	return 0;
2826 }
2827 
2828 static int
2829 _spdk_bs_alloc(struct spdk_bs_dev *dev, struct spdk_bs_opts *opts, struct spdk_blob_store **_bs)
2830 {
2831 	struct spdk_blob_store	*bs;
2832 	uint64_t dev_size;
2833 	int rc;
2834 
2835 	dev_size = dev->blocklen * dev->blockcnt;
2836 	if (dev_size < opts->cluster_sz) {
2837 		/* Device size cannot be smaller than cluster size of blobstore */
2838 		SPDK_INFOLOG(SPDK_LOG_BLOB, "Device size %" PRIu64 " is smaller than cluster size %" PRIu32 "\n",
2839 			     dev_size, opts->cluster_sz);
2840 		return -ENOSPC;
2841 	}
2842 	if (opts->cluster_sz < SPDK_BS_PAGE_SIZE) {
2843 		/* Cluster size cannot be smaller than page size */
2844 		SPDK_ERRLOG("Cluster size %" PRIu32 " is smaller than page size %d\n",
2845 			    opts->cluster_sz, SPDK_BS_PAGE_SIZE);
2846 		return -EINVAL;
2847 	}
2848 	bs = calloc(1, sizeof(struct spdk_blob_store));
2849 	if (!bs) {
2850 		return -ENOMEM;
2851 	}
2852 
2853 	TAILQ_INIT(&bs->blobs);
2854 	TAILQ_INIT(&bs->snapshots);
2855 	bs->dev = dev;
2856 	bs->md_thread = spdk_get_thread();
2857 	assert(bs->md_thread != NULL);
2858 
2859 	/*
2860 	 * Do not use _spdk_bs_lba_to_cluster() here, since the device size
2861 	 *  (blockcnt) may not be an even multiple of the cluster size.
2862 	 */
2863 	bs->cluster_sz = opts->cluster_sz;
2864 	bs->total_clusters = dev->blockcnt / (bs->cluster_sz / dev->blocklen);
2865 	bs->pages_per_cluster = bs->cluster_sz / SPDK_BS_PAGE_SIZE;
2866 	bs->num_free_clusters = bs->total_clusters;
2867 	bs->used_clusters = spdk_bit_array_create(bs->total_clusters);
2868 	bs->io_unit_size = dev->blocklen;
2869 	if (bs->used_clusters == NULL) {
2870 		free(bs);
2871 		return -ENOMEM;
2872 	}
2873 
2874 	bs->max_channel_ops = opts->max_channel_ops;
2875 	bs->super_blob = SPDK_BLOBID_INVALID;
2876 	memcpy(&bs->bstype, &opts->bstype, sizeof(opts->bstype));
2877 
2878 	/* The metadata is assumed to be at least 1 page */
2879 	bs->used_md_pages = spdk_bit_array_create(1);
2880 	bs->used_blobids = spdk_bit_array_create(0);
2881 
2882 	pthread_mutex_init(&bs->used_clusters_mutex, NULL);
2883 
2884 	spdk_io_device_register(bs, _spdk_bs_channel_create, _spdk_bs_channel_destroy,
2885 				sizeof(struct spdk_bs_channel), "blobstore");
2886 	rc = spdk_bs_register_md_thread(bs);
2887 	if (rc == -1) {
2888 		spdk_io_device_unregister(bs, NULL);
2889 		pthread_mutex_destroy(&bs->used_clusters_mutex);
2890 		spdk_bit_array_free(&bs->used_blobids);
2891 		spdk_bit_array_free(&bs->used_md_pages);
2892 		spdk_bit_array_free(&bs->used_clusters);
2893 		free(bs);
2894 		/* FIXME: this is a lie but don't know how to get a proper error code here */
2895 		return -ENOMEM;
2896 	}
2897 
2898 	*_bs = bs;
2899 	return 0;
2900 }
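
/*
 * Worked example for the sizing math above: a dev with blocklen == 512 and
 * blockcnt == 2097152 (1 GiB) using cluster_sz == 1 MiB yields
 * total_clusters == 2097152 / (1048576 / 512) == 1024 and
 * pages_per_cluster == 1048576 / 4096 == 256 (SPDK_BS_PAGE_SIZE is 4 KiB).
 */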
2901 
2902 /* START spdk_bs_load, spdk_bs_load_ctx is used for both load and unload. */
2903 
2904 struct spdk_bs_load_ctx {
2905 	struct spdk_blob_store		*bs;
2906 	struct spdk_bs_super_block	*super;
2907 
2908 	struct spdk_bs_md_mask		*mask;
2909 	bool				in_page_chain;
2910 	uint32_t			page_index;
2911 	uint32_t			cur_page;
2912 	struct spdk_blob_md_page	*page;
2913 
2914 	spdk_bs_sequence_t			*seq;
2915 	spdk_blob_op_with_handle_complete	iter_cb_fn;
2916 	void					*iter_cb_arg;
2917 	struct spdk_blob			*blob;
2918 	spdk_blob_id				blobid;
2919 };
2920 
2921 static void
2922 _spdk_bs_load_ctx_fail(struct spdk_bs_load_ctx *ctx, int bserrno)
2923 {
2924 	assert(bserrno != 0);
2925 
2926 	spdk_free(ctx->super);
2927 	spdk_bs_sequence_finish(ctx->seq, bserrno);
2928 	_spdk_bs_free(ctx->bs);
2929 	free(ctx);
2930 }
2931 
2932 static void
2933 _spdk_bs_set_mask(struct spdk_bit_array *array, struct spdk_bs_md_mask *mask)
2934 {
2935 	uint32_t i = 0;
2936 
2937 	while (true) {
2938 		i = spdk_bit_array_find_first_set(array, i);
2939 		if (i >= mask->length) {
2940 			break;
2941 		}
2942 		mask->mask[i / 8] |= 1U << (i % 8);
2943 		i++;
2944 	}
2945 }
2946 
2947 static int
2948 _spdk_bs_load_mask(struct spdk_bit_array **array_ptr, struct spdk_bs_md_mask *mask)
2949 {
2950 	struct spdk_bit_array *array;
2951 	uint32_t i;
2952 
2953 	if (spdk_bit_array_resize(array_ptr, mask->length) < 0) {
2954 		return -ENOMEM;
2955 	}
2956 
2957 	array = *array_ptr;
2958 	for (i = 0; i < mask->length; i++) {
2959 		if (mask->mask[i / 8] & (1U << (i % 8))) {
2960 			spdk_bit_array_set(array, i);
2961 		}
2962 	}
2963 
2964 	return 0;
2965 }
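
/*
 * Worked example for the two mask helpers above: with bits {0, 3, 9} set
 * and mask->length >= 10, _spdk_bs_set_mask() produces mask->mask[0] == 0x09
 * (bits 0 and 3) and mask->mask[1] == 0x02 (bit 9). _spdk_bs_load_mask() is
 * the exact inverse: it resizes the bit array to mask->length and re-sets
 * the same bits.
 */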
2966 
2967 static void
2968 _spdk_bs_write_super(spdk_bs_sequence_t *seq, struct spdk_blob_store *bs,
2969 		     struct spdk_bs_super_block *super, spdk_bs_sequence_cpl cb_fn, void *cb_arg)
2970 {
2971 	/* Update the values in the super block */
2972 	super->super_blob = bs->super_blob;
2973 	memcpy(&super->bstype, &bs->bstype, sizeof(bs->bstype));
2974 	super->crc = _spdk_blob_md_page_calc_crc(super);
2975 	spdk_bs_sequence_write_dev(seq, super, _spdk_bs_page_to_lba(bs, 0),
2976 				   _spdk_bs_byte_to_lba(bs, sizeof(*super)),
2977 				   cb_fn, cb_arg);
2978 }
2979 
2980 static void
2981 _spdk_bs_write_used_clusters(spdk_bs_sequence_t *seq, void *arg, spdk_bs_sequence_cpl cb_fn)
2982 {
2983 	struct spdk_bs_load_ctx	*ctx = arg;
2984 	uint64_t	mask_size, lba, lba_count;
2985 
2986 	/* Write out the used clusters mask */
2987 	mask_size = ctx->super->used_cluster_mask_len * SPDK_BS_PAGE_SIZE;
2988 	ctx->mask = spdk_zmalloc(mask_size, 0x1000, NULL,
2989 				 SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
2990 	if (!ctx->mask) {
2991 		_spdk_bs_load_ctx_fail(ctx, -ENOMEM);
2992 		return;
2993 	}
2994 
2995 	ctx->mask->type = SPDK_MD_MASK_TYPE_USED_CLUSTERS;
2996 	ctx->mask->length = ctx->bs->total_clusters;
2997 	assert(ctx->mask->length == spdk_bit_array_capacity(ctx->bs->used_clusters));
2998 
2999 	_spdk_bs_set_mask(ctx->bs->used_clusters, ctx->mask);
3000 	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_start);
3001 	lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_len);
3002 	spdk_bs_sequence_write_dev(seq, ctx->mask, lba, lba_count, cb_fn, arg);
3003 }
3004 
3005 static void
3006 _spdk_bs_write_used_md(spdk_bs_sequence_t *seq, void *arg, spdk_bs_sequence_cpl cb_fn)
3007 {
3008 	struct spdk_bs_load_ctx	*ctx = arg;
3009 	uint64_t	mask_size, lba, lba_count;
3010 
3011 	mask_size = ctx->super->used_page_mask_len * SPDK_BS_PAGE_SIZE;
3012 	ctx->mask = spdk_zmalloc(mask_size, 0x1000, NULL,
3013 				 SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
3014 	if (!ctx->mask) {
3015 		_spdk_bs_load_ctx_fail(ctx, -ENOMEM);
3016 		return;
3017 	}
3018 
3019 	ctx->mask->type = SPDK_MD_MASK_TYPE_USED_PAGES;
3020 	ctx->mask->length = ctx->super->md_len;
3021 	assert(ctx->mask->length == spdk_bit_array_capacity(ctx->bs->used_md_pages));
3022 
3023 	_spdk_bs_set_mask(ctx->bs->used_md_pages, ctx->mask);
3024 	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_start);
3025 	lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_len);
3026 	spdk_bs_sequence_write_dev(seq, ctx->mask, lba, lba_count, cb_fn, arg);
3027 }
3028 
3029 static void
3030 _spdk_bs_write_used_blobids(spdk_bs_sequence_t *seq, void *arg, spdk_bs_sequence_cpl cb_fn)
3031 {
3032 	struct spdk_bs_load_ctx	*ctx = arg;
3033 	uint64_t	mask_size, lba, lba_count;
3034 
3035 	if (ctx->super->used_blobid_mask_len == 0) {
3036 		/*
3037 		 * This is a pre-v3 on-disk format where the blobid mask does not get
3038 		 *  written to disk.
3039 		 */
3040 		cb_fn(seq, arg, 0);
3041 		return;
3042 	}
3043 
3044 	mask_size = ctx->super->used_blobid_mask_len * SPDK_BS_PAGE_SIZE;
3045 	ctx->mask = spdk_zmalloc(mask_size, 0x1000, NULL, SPDK_ENV_SOCKET_ID_ANY,
3046 				 SPDK_MALLOC_DMA);
3047 	if (!ctx->mask) {
3048 		_spdk_bs_load_ctx_fail(ctx, -ENOMEM);
3049 		return;
3050 	}
3051 
3052 	ctx->mask->type = SPDK_MD_MASK_TYPE_USED_BLOBIDS;
3053 	ctx->mask->length = ctx->super->md_len;
3054 	assert(ctx->mask->length == spdk_bit_array_capacity(ctx->bs->used_blobids));
3055 
3056 	_spdk_bs_set_mask(ctx->bs->used_blobids, ctx->mask);
3057 	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_blobid_mask_start);
3058 	lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_blobid_mask_len);
3059 	spdk_bs_sequence_write_dev(seq, ctx->mask, lba, lba_count, cb_fn, arg);
3060 }
3061 
3062 static void
3063 _spdk_blob_set_thin_provision(struct spdk_blob *blob)
3064 {
3065 	_spdk_blob_verify_md_op(blob);
3066 	blob->invalid_flags |= SPDK_BLOB_THIN_PROV;
3067 	blob->state = SPDK_BLOB_STATE_DIRTY;
3068 }
3069 
3070 static void
3071 _spdk_blob_set_clear_method(struct spdk_blob *blob, enum blob_clear_method clear_method)
3072 {
3073 	_spdk_blob_verify_md_op(blob);
3074 	blob->clear_method = clear_method;
3075 	blob->md_ro_flags |= (clear_method << SPDK_BLOB_CLEAR_METHOD_SHIFT);
3076 	blob->state = SPDK_BLOB_STATE_DIRTY;
3077 }
3078 
3079 static void _spdk_bs_load_iter(void *arg, struct spdk_blob *blob, int bserrno);
3080 
3081 static void
3082 _spdk_bs_delete_corrupted_blob_cpl(void *cb_arg, int bserrno)
3083 {
3084 	struct spdk_bs_load_ctx *ctx = cb_arg;
3085 	spdk_blob_id id;
3086 	int64_t page_num;
3087 
3088 	/* Iterate to the next blob (we can't use the spdk_bs_iter_next function
3089 	 * as our last blob has been removed). */
3090 	page_num = _spdk_bs_blobid_to_page(ctx->blobid);
3091 	page_num++;
3092 	page_num = spdk_bit_array_find_first_set(ctx->bs->used_blobids, page_num);
3093 	if (page_num >= spdk_bit_array_capacity(ctx->bs->used_blobids)) {
3094 		_spdk_bs_load_iter(ctx, NULL, -ENOENT);
3095 		return;
3096 	}
3097 
3098 	id = _spdk_bs_page_to_blobid(page_num);
3099 
3100 	spdk_bs_open_blob(ctx->bs, id, _spdk_bs_load_iter, ctx);
3101 }
3102 
3103 static void
3104 _spdk_bs_delete_corrupted_close_cb(void *cb_arg, int bserrno)
3105 {
3106 	struct spdk_bs_load_ctx *ctx = cb_arg;
3107 
3108 	if (bserrno != 0) {
3109 		SPDK_ERRLOG("Failed to close corrupted blob\n");
3110 		spdk_bs_iter_next(ctx->bs, ctx->blob, _spdk_bs_load_iter, ctx);
3111 		return;
3112 	}
3113 
3114 	spdk_bs_delete_blob(ctx->bs, ctx->blobid, _spdk_bs_delete_corrupted_blob_cpl, ctx);
3115 }
3116 
3117 static void
3118 _spdk_bs_delete_corrupted_blob(void *cb_arg, int bserrno)
3119 {
3120 	struct spdk_bs_load_ctx *ctx = cb_arg;
3121 	uint64_t i;
3122 
3123 	if (bserrno != 0) {
3124 		SPDK_ERRLOG("Failed to close clone of a corrupted blob\n");
3125 		spdk_bs_iter_next(ctx->bs, ctx->blob, _spdk_bs_load_iter, ctx);
3126 		return;
3127 	}
3128 
3129 	/* Snapshot and clone have the same copy of the cluster map at this point.
3130 	 * Clear the cluster map for the snapshot now, so that it won't be cleared
3131 	 * for the clone later when we remove the snapshot. Also set thin
3132 	 * provisioning to pass the data corruption check. */
3133 	for (i = 0; i < ctx->blob->active.num_clusters; i++) {
3134 		ctx->blob->active.clusters[i] = 0;
3135 	}
3136 
3137 	ctx->blob->md_ro = false;
3138 
3139 	_spdk_blob_set_thin_provision(ctx->blob);
3140 
3141 	ctx->blobid = ctx->blob->id;
3142 
3143 	spdk_blob_close(ctx->blob, _spdk_bs_delete_corrupted_close_cb, ctx);
3144 }
3145 
3146 static void
3147 _spdk_bs_update_corrupted_blob(void *cb_arg, int bserrno)
3148 {
3149 	struct spdk_bs_load_ctx *ctx = cb_arg;
3150 
3151 	if (bserrno != 0) {
3152 		SPDK_ERRLOG("Failed to close clone of a corrupted blob\n");
3153 		spdk_bs_iter_next(ctx->bs, ctx->blob, _spdk_bs_load_iter, ctx);
3154 		return;
3155 	}
3156 
3157 	ctx->blob->md_ro = false;
3158 	_spdk_blob_remove_xattr(ctx->blob, SNAPSHOT_PENDING_REMOVAL, true);
3159 	_spdk_blob_remove_xattr(ctx->blob, SNAPSHOT_IN_PROGRESS, true);
3160 	spdk_blob_set_read_only(ctx->blob);
3161 
3162 	if (ctx->iter_cb_fn) {
3163 		ctx->iter_cb_fn(ctx->iter_cb_arg, ctx->blob, 0);
3164 	}
3165 	_spdk_bs_blob_list_add(ctx->blob);
3166 
3167 	spdk_bs_iter_next(ctx->bs, ctx->blob, _spdk_bs_load_iter, ctx);
3168 }
3169 
3170 static void
3171 _spdk_bs_examine_clone(void *cb_arg, struct spdk_blob *blob, int bserrno)
3172 {
3173 	struct spdk_bs_load_ctx *ctx = cb_arg;
3174 
3175 	if (bserrno != 0) {
3176 		SPDK_ERRLOG("Failed to open clone of a corrupted blob\n");
3177 		spdk_bs_iter_next(ctx->bs, ctx->blob, _spdk_bs_load_iter, ctx);
3178 		return;
3179 	}
3180 
3181 	if (blob->parent_id == ctx->blob->id) {
3182 		/* Power failure occurred before updating clone (snapshot delete case)
3183 		 * or after updating clone (creating snapshot case) - keep snapshot */
3184 		spdk_blob_close(blob, _spdk_bs_update_corrupted_blob, ctx);
3185 	} else {
3186 		/* Power failure occurred after updating clone (snapshot delete case)
3187 		 * or before updating clone (creating snapshot case) - remove snapshot */
3188 		spdk_blob_close(blob, _spdk_bs_delete_corrupted_blob, ctx);
3189 	}
3190 }
3191 
3192 static void
3193 _spdk_bs_load_iter(void *arg, struct spdk_blob *blob, int bserrno)
3194 {
3195 	struct spdk_bs_load_ctx *ctx = arg;
3196 	const void *value;
3197 	size_t len;
3198 	int rc = 0;
3199 
3200 	if (bserrno == 0) {
3201 		/* Examine the blob to see if it was corrupted by a power failure.
3202 		 * Fix the ones that can be fixed and remove any others. If the
3203 		 * blob is not corrupted, just process it. */
3204 		rc = _spdk_blob_get_xattr_value(blob, SNAPSHOT_PENDING_REMOVAL, &value, &len, true);
3205 		if (rc != 0) {
3206 			rc = _spdk_blob_get_xattr_value(blob, SNAPSHOT_IN_PROGRESS, &value, &len, true);
3207 			if (rc != 0) {
3208 				/* Not corrupted - process it and continue with iterating through blobs */
3209 				if (ctx->iter_cb_fn) {
3210 					ctx->iter_cb_fn(ctx->iter_cb_arg, blob, 0);
3211 				}
3212 				_spdk_bs_blob_list_add(blob);
3213 				spdk_bs_iter_next(ctx->bs, blob, _spdk_bs_load_iter, ctx);
3214 				return;
3215 			}
3216 
3217 		}
3218 
3219 		assert(len == sizeof(spdk_blob_id));
3220 
3221 		ctx->blob = blob;
3222 
3223 		/* Open clone to check if we are able to fix this blob or should we remove it */
3224 		spdk_bs_open_blob(ctx->bs, *(spdk_blob_id *)value, _spdk_bs_examine_clone, ctx);
3225 		return;
3226 	} else if (bserrno == -ENOENT) {
3227 		bserrno = 0;
3228 	} else {
3229 		/*
3230 		 * This case needs to be looked at further.  Same problem
3231 		 *  exists with applications that rely on explicit blob
3232 		 *  iteration.  We should just skip the blob that failed
3233 		 *  to load and continue on to the next one.
3234 		 */
3235 		SPDK_ERRLOG("Error in iterating blobs\n");
3236 	}
3237 
3238 	ctx->iter_cb_fn = NULL;
3239 
3240 	spdk_free(ctx->super);
3241 	spdk_free(ctx->mask);
3242 	spdk_bs_sequence_finish(ctx->seq, bserrno);
3243 	free(ctx);
3244 }
3245 
3246 static void
3247 _spdk_bs_load_complete(struct spdk_bs_load_ctx *ctx)
3248 {
3249 	spdk_bs_iter_first(ctx->bs, _spdk_bs_load_iter, ctx);
3250 }
3251 
3252 static void
3253 _spdk_bs_load_used_blobids_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
3254 {
3255 	struct spdk_bs_load_ctx *ctx = cb_arg;
3256 	int rc;
3257 
3258 	/* The type must be correct */
3259 	assert(ctx->mask->type == SPDK_MD_MASK_TYPE_USED_BLOBIDS);
3260 
3261 	/* The length of the mask (in bits) must not be greater than
3262 	 * the length of the buffer (converted to bits) */
3263 	assert(ctx->mask->length <= (ctx->super->used_blobid_mask_len * SPDK_BS_PAGE_SIZE * 8));
3264 
3265 	/* The length of the mask must be exactly equal to the size
3266 	 * (in pages) of the metadata region */
3267 	assert(ctx->mask->length == ctx->super->md_len);
3268 
3269 	rc = _spdk_bs_load_mask(&ctx->bs->used_blobids, ctx->mask);
3270 	if (rc < 0) {
3271 		spdk_free(ctx->mask);
3272 		_spdk_bs_load_ctx_fail(ctx, rc);
3273 		return;
3274 	}
3275 
3276 	_spdk_bs_load_complete(ctx);
3277 }
3278 
3279 static void
3280 _spdk_bs_load_used_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
3281 {
3282 	struct spdk_bs_load_ctx *ctx = cb_arg;
3283 	uint64_t		lba, lba_count, mask_size;
3284 	int			rc;
3285 
3286 	if (bserrno != 0) {
3287 		_spdk_bs_load_ctx_fail(ctx, bserrno);
3288 		return;
3289 	}
3290 
3291 	/* The type must be correct */
3292 	assert(ctx->mask->type == SPDK_MD_MASK_TYPE_USED_CLUSTERS);
3293 	/* The length of the mask (in bits) must not be greater than the length of the buffer (converted to bits) */
3294 	assert(ctx->mask->length <= (ctx->super->used_cluster_mask_len * sizeof(
3295 					     struct spdk_blob_md_page) * 8));
3296 	/* The length of the mask must be exactly equal to the total number of clusters */
3297 	assert(ctx->mask->length == ctx->bs->total_clusters);
3298 
3299 	rc = _spdk_bs_load_mask(&ctx->bs->used_clusters, ctx->mask);
3300 	if (rc < 0) {
3301 		spdk_free(ctx->mask);
3302 		_spdk_bs_load_ctx_fail(ctx, rc);
3303 		return;
3304 	}
3305 
3306 	ctx->bs->num_free_clusters = spdk_bit_array_count_clear(ctx->bs->used_clusters);
3307 	assert(ctx->bs->num_free_clusters <= ctx->bs->total_clusters);
3308 
3309 	spdk_free(ctx->mask);
3310 
3311 	/* Read the used blobids mask */
3312 	mask_size = ctx->super->used_blobid_mask_len * SPDK_BS_PAGE_SIZE;
3313 	ctx->mask = spdk_zmalloc(mask_size, 0x1000, NULL, SPDK_ENV_SOCKET_ID_ANY,
3314 				 SPDK_MALLOC_DMA);
3315 	if (!ctx->mask) {
3316 		_spdk_bs_load_ctx_fail(ctx, -ENOMEM);
3317 		return;
3318 	}
3319 	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_blobid_mask_start);
3320 	lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_blobid_mask_len);
3321 	spdk_bs_sequence_read_dev(seq, ctx->mask, lba, lba_count,
3322 				  _spdk_bs_load_used_blobids_cpl, ctx);
3323 }
3324 
3325 static void
3326 _spdk_bs_load_used_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
3327 {
3328 	struct spdk_bs_load_ctx *ctx = cb_arg;
3329 	uint64_t		lba, lba_count, mask_size;
3330 	int			rc;
3331 
3332 	if (bserrno != 0) {
3333 		_spdk_bs_load_ctx_fail(ctx, bserrno);
3334 		return;
3335 	}
3336 
3337 	/* The type must be correct */
3338 	assert(ctx->mask->type == SPDK_MD_MASK_TYPE_USED_PAGES);
3339 	/* The length of the mask (in bits) must not be greater than the length of the buffer (converted to bits) */
3340 	assert(ctx->mask->length <= (ctx->super->used_page_mask_len * SPDK_BS_PAGE_SIZE *
3341 				     8));
3342 	/* The length of the mask must be exactly equal to the size (in pages) of the metadata region */
3343 	assert(ctx->mask->length == ctx->super->md_len);
3344 
3345 	rc = _spdk_bs_load_mask(&ctx->bs->used_md_pages, ctx->mask);
3346 	if (rc < 0) {
3347 		spdk_free(ctx->mask);
3348 		_spdk_bs_load_ctx_fail(ctx, rc);
3349 		return;
3350 	}
3351 
3352 	spdk_free(ctx->mask);
3353 
3354 	/* Read the used clusters mask */
3355 	mask_size = ctx->super->used_cluster_mask_len * SPDK_BS_PAGE_SIZE;
3356 	ctx->mask = spdk_zmalloc(mask_size, 0x1000, NULL, SPDK_ENV_SOCKET_ID_ANY,
3357 				 SPDK_MALLOC_DMA);
3358 	if (!ctx->mask) {
3359 		_spdk_bs_load_ctx_fail(ctx, -ENOMEM);
3360 		return;
3361 	}
3362 	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_start);
3363 	lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_len);
3364 	spdk_bs_sequence_read_dev(seq, ctx->mask, lba, lba_count,
3365 				  _spdk_bs_load_used_clusters_cpl, ctx);
3366 }
3367 
3368 static void
3369 _spdk_bs_load_read_used_pages(struct spdk_bs_load_ctx *ctx)
3370 {
3371 	uint64_t lba, lba_count, mask_size;
3372 
3373 	/* Read the used pages mask */
3374 	mask_size = ctx->super->used_page_mask_len * SPDK_BS_PAGE_SIZE;
3375 	ctx->mask = spdk_zmalloc(mask_size, 0x1000, NULL,
3376 				 SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
3377 	if (!ctx->mask) {
3378 		_spdk_bs_load_ctx_fail(ctx, -ENOMEM);
3379 		return;
3380 	}
3381 
3382 	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_start);
3383 	lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_len);
3384 	spdk_bs_sequence_read_dev(ctx->seq, ctx->mask, lba, lba_count,
3385 				  _spdk_bs_load_used_pages_cpl, ctx);
3386 }
3387 
3388 static int
3389 _spdk_bs_load_replay_md_parse_page(const struct spdk_blob_md_page *page, struct spdk_blob_store *bs)
3390 {
3391 	struct spdk_blob_md_descriptor *desc;
3392 	size_t	cur_desc = 0;
3393 
3394 	desc = (struct spdk_blob_md_descriptor *)page->descriptors;
3395 	while (cur_desc < sizeof(page->descriptors)) {
3396 		if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_PADDING) {
3397 			if (desc->length == 0) {
3398 				/* If padding and length are 0, this terminates the page */
3399 				break;
3400 			}
3401 		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_EXTENT_RLE) {
3402 			struct spdk_blob_md_descriptor_extent_rle	*desc_extent_rle;
3403 			unsigned int				i, j;
3404 			unsigned int				cluster_count = 0;
3405 			uint32_t				cluster_idx;
3406 
3407 			desc_extent_rle = (struct spdk_blob_md_descriptor_extent_rle *)desc;
3408 
3409 			for (i = 0; i < desc_extent_rle->length / sizeof(desc_extent_rle->extents[0]); i++) {
3410 				for (j = 0; j < desc_extent_rle->extents[i].length; j++) {
3411 					cluster_idx = desc_extent_rle->extents[i].cluster_idx;
3412 					/*
3413 					 * cluster_idx = 0 means an unallocated cluster - don't mark that
3414 					 * in the used cluster map.
3415 					 */
3416 					if (cluster_idx != 0) {
3417 						spdk_bit_array_set(bs->used_clusters, cluster_idx + j);
3418 						if (bs->num_free_clusters == 0) {
3419 							return -ENOSPC;
3420 						}
3421 						bs->num_free_clusters--;
3422 					}
3423 					cluster_count++;
3424 				}
3425 			}
3426 			if (cluster_count == 0) {
3427 				return -EINVAL;
3428 			}
3429 		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_EXTENT_PAGE) {
3430 			/* Skip this item */
3431 		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR) {
3432 			/* Skip this item */
3433 		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR_INTERNAL) {
3434 			/* Skip this item */
3435 		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_FLAGS) {
3436 			/* Skip this item */
3437 		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_EXTENT_TABLE) {
3438 			/* TODO: Read the extent pages when replaying the md, but
3439 			 * only after the particular blob's md chain has been read. */
3440 		} else {
3441 			/* Error */
3442 			return -EINVAL;
3443 		}
3444 		/* Advance to the next descriptor */
3445 		cur_desc += sizeof(*desc) + desc->length;
3446 		if (cur_desc + sizeof(*desc) > sizeof(page->descriptors)) {
3447 			break;
3448 		}
3449 		desc = (struct spdk_blob_md_descriptor *)((uintptr_t)page->descriptors + cur_desc);
3450 	}
3451 	return 0;
3452 }
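
/*
 * Worked example for the EXTENT_RLE case above: a descriptor holding the
 * runs { cluster_idx = 5, length = 3 } and { cluster_idx = 0, length = 2 }
 * marks clusters 5, 6 and 7 as used (decrementing num_free_clusters three
 * times), while the second run describes two unallocated thin-provisioned
 * clusters and only advances cluster_count.
 */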
3453 
3454 static bool
_spdk_bs_load_cur_md_page_valid(struct spdk_bs_load_ctx *ctx)
3455 {
3456 	uint32_t crc;
3457 
3458 	crc = _spdk_blob_md_page_calc_crc(ctx->page);
3459 	if (crc != ctx->page->crc) {
3460 		return false;
3461 	}
3462 
3463 	/* First page of a sequence should match the blobid. */
3464 	if (ctx->page->sequence_num == 0 &&
3465 	    _spdk_bs_page_to_blobid(ctx->cur_page) != ctx->page->id) {
3466 		return false;
3467 	}
3468 	return true;
3469 }
3470 
3471 static void
3472 _spdk_bs_load_replay_cur_md_page(struct spdk_bs_load_ctx *ctx);
3473 
3474 static void
3475 _spdk_bs_load_write_used_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
3476 {
3477 	struct spdk_bs_load_ctx	*ctx = cb_arg;
3478 
3479 	if (bserrno != 0) {
3480 		_spdk_bs_load_ctx_fail(ctx, bserrno);
3481 		return;
3482 	}
3483 
3484 	_spdk_bs_load_complete(ctx);
3485 }
3486 
3487 static void
3488 _spdk_bs_load_write_used_blobids_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
3489 {
3490 	struct spdk_bs_load_ctx	*ctx = cb_arg;
3491 
3492 	spdk_free(ctx->mask);
3493 	ctx->mask = NULL;
3494 
3495 	if (bserrno != 0) {
3496 		_spdk_bs_load_ctx_fail(ctx, bserrno);
3497 		return;
3498 	}
3499 
3500 	_spdk_bs_write_used_clusters(seq, ctx, _spdk_bs_load_write_used_clusters_cpl);
3501 }
3502 
3503 static void
3504 _spdk_bs_load_write_used_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
3505 {
3506 	struct spdk_bs_load_ctx	*ctx = cb_arg;
3507 
3508 	spdk_free(ctx->mask);
3509 	ctx->mask = NULL;
3510 
3511 	if (bserrno != 0) {
3512 		_spdk_bs_load_ctx_fail(ctx, bserrno);
3513 		return;
3514 	}
3515 
3516 	_spdk_bs_write_used_blobids(seq, ctx, _spdk_bs_load_write_used_blobids_cpl);
3517 }
3518 
3519 static void
3520 _spdk_bs_load_write_used_md(struct spdk_bs_load_ctx *ctx)
3521 {
3522 	_spdk_bs_write_used_md(ctx->seq, ctx, _spdk_bs_load_write_used_pages_cpl);
3523 }
3524 
3525 static void
3526 _spdk_bs_load_replay_md_chain_cpl(struct spdk_bs_load_ctx *ctx)
3527 {
3528 	uint64_t num_md_clusters;
3529 	uint64_t i;
3530 
3531 	ctx->in_page_chain = false;
3532 
3533 	do {
3534 		ctx->page_index++;
3535 	} while (spdk_bit_array_get(ctx->bs->used_md_pages, ctx->page_index) == true);
3536 
3537 	if (ctx->page_index < ctx->super->md_len) {
3538 		ctx->cur_page = ctx->page_index;
3539 		_spdk_bs_load_replay_cur_md_page(ctx);
3540 	} else {
3541 		/* Claim all of the clusters used by the metadata */
3542 		num_md_clusters = spdk_divide_round_up(ctx->super->md_len, ctx->bs->pages_per_cluster);
3543 		for (i = 0; i < num_md_clusters; i++) {
3544 			_spdk_bs_claim_cluster(ctx->bs, i);
3545 		}
3546 		spdk_free(ctx->page);
3547 		_spdk_bs_load_write_used_md(ctx);
3548 	}
3549 }
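
/*
 * Arithmetic sketch with invented numbers for the cluster claim above: with
 * a 1 MiB cluster (pages_per_cluster = 1 MiB / 4 KiB = 256) and
 * md_len = 1024 metadata pages, spdk_divide_round_up(1024, 256) = 4, so
 * clusters 0-3 are claimed for metadata before any data cluster is handed
 * out.
 */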
3550 
3551 static void
3552 _spdk_bs_load_replay_md_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
3553 {
3554 	struct spdk_bs_load_ctx *ctx = cb_arg;
3555 	uint32_t page_num;
3556 
3557 	if (bserrno != 0) {
3558 		_spdk_bs_load_ctx_fail(ctx, bserrno);
3559 		return;
3560 	}
3561 
3562 	page_num = ctx->cur_page;
3563 	if (_spdk_bs_load_cur_md_page_valid(ctx) == true) {
3564 		if (ctx->page->sequence_num == 0 || ctx->in_page_chain == true) {
3565 			_spdk_bs_claim_md_page(ctx->bs, page_num);
3566 			if (ctx->page->sequence_num == 0) {
3567 				spdk_bit_array_set(ctx->bs->used_blobids, page_num);
3568 			}
3569 			if (_spdk_bs_load_replay_md_parse_page(ctx->page, ctx->bs)) {
3570 				_spdk_bs_load_ctx_fail(ctx, -EILSEQ);
3571 				return;
3572 			}
3573 			if (ctx->page->next != SPDK_INVALID_MD_PAGE) {
3574 				ctx->in_page_chain = true;
3575 				ctx->cur_page = ctx->page->next;
3576 				_spdk_bs_load_replay_cur_md_page(ctx);
3577 				return;
3578 			}
3579 		}
3580 	}
3581 	_spdk_bs_load_replay_md_chain_cpl(ctx);
3582 }
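
/*
 * Note on the replay walk above: pages are visited in ascending index
 * order, but a page whose sequence_num is 0 starts a blob md chain that is
 * followed immediately via page->next (in_page_chain == true). Pages
 * reached through a chain are claimed in used_md_pages, so the outer walk
 * in _spdk_bs_load_replay_md_chain_cpl() skips them later.
 */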
3583 
3584 static void
3585 _spdk_bs_load_replay_cur_md_page(struct spdk_bs_load_ctx *ctx)
3586 {
3587 	uint64_t lba;
3588 
3589 	assert(ctx->cur_page < ctx->super->md_len);
3590 	lba = _spdk_bs_md_page_to_lba(ctx->bs, ctx->cur_page);
3591 	spdk_bs_sequence_read_dev(ctx->seq, ctx->page, lba,
3592 				  _spdk_bs_byte_to_lba(ctx->bs, SPDK_BS_PAGE_SIZE),
3593 				  _spdk_bs_load_replay_md_cpl, ctx);
3594 }
3595 
3596 static void
3597 _spdk_bs_load_replay_md(struct spdk_bs_load_ctx *ctx)
3598 {
3599 	ctx->page_index = 0;
3600 	ctx->cur_page = 0;
3601 	ctx->page = spdk_zmalloc(SPDK_BS_PAGE_SIZE, SPDK_BS_PAGE_SIZE,
3602 				 NULL, SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
3603 	if (!ctx->page) {
3604 		_spdk_bs_load_ctx_fail(ctx, -ENOMEM);
3605 		return;
3606 	}
3607 	_spdk_bs_load_replay_cur_md_page(ctx);
3608 }
3609 
3610 static void
3611 _spdk_bs_recover(struct spdk_bs_load_ctx *ctx)
3612 {
3613 	int		rc;
3614 
3615 	rc = spdk_bit_array_resize(&ctx->bs->used_md_pages, ctx->super->md_len);
3616 	if (rc < 0) {
3617 		_spdk_bs_load_ctx_fail(ctx, -ENOMEM);
3618 		return;
3619 	}
3620 
3621 	rc = spdk_bit_array_resize(&ctx->bs->used_blobids, ctx->super->md_len);
3622 	if (rc < 0) {
3623 		_spdk_bs_load_ctx_fail(ctx, -ENOMEM);
3624 		return;
3625 	}
3626 
3627 	rc = spdk_bit_array_resize(&ctx->bs->used_clusters, ctx->bs->total_clusters);
3628 	if (rc < 0) {
3629 		_spdk_bs_load_ctx_fail(ctx, -ENOMEM);
3630 		return;
3631 	}
3632 
3633 	ctx->bs->num_free_clusters = ctx->bs->total_clusters;
3634 	_spdk_bs_load_replay_md(ctx);
3635 }
3636 
3637 static void
3638 _spdk_bs_load_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
3639 {
3640 	struct spdk_bs_load_ctx *ctx = cb_arg;
3641 	uint32_t	crc;
3642 	int		rc;
3643 	static const char zeros[SPDK_BLOBSTORE_TYPE_LENGTH];
3644 
3645 	if (ctx->super->version > SPDK_BS_VERSION ||
3646 	    ctx->super->version < SPDK_BS_INITIAL_VERSION) {
3647 		_spdk_bs_load_ctx_fail(ctx, -EILSEQ);
3648 		return;
3649 	}
3650 
3651 	if (memcmp(ctx->super->signature, SPDK_BS_SUPER_BLOCK_SIG,
3652 		   sizeof(ctx->super->signature)) != 0) {
3653 		_spdk_bs_load_ctx_fail(ctx, -EILSEQ);
3654 		return;
3655 	}
3656 
3657 	crc = _spdk_blob_md_page_calc_crc(ctx->super);
3658 	if (crc != ctx->super->crc) {
3659 		_spdk_bs_load_ctx_fail(ctx, -EILSEQ);
3660 		return;
3661 	}
3662 
3663 	if (memcmp(&ctx->bs->bstype, &ctx->super->bstype, SPDK_BLOBSTORE_TYPE_LENGTH) == 0) {
3664 		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Bstype matched - loading blobstore\n");
3665 	} else if (memcmp(&ctx->bs->bstype, zeros, SPDK_BLOBSTORE_TYPE_LENGTH) == 0) {
3666 		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Bstype wildcard used - loading blobstore regardless of bstype\n");
3667 	} else {
3668 		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Unexpected bstype\n");
3669 		SPDK_LOGDUMP(SPDK_LOG_BLOB, "Expected:", ctx->bs->bstype.bstype, SPDK_BLOBSTORE_TYPE_LENGTH);
3670 		SPDK_LOGDUMP(SPDK_LOG_BLOB, "Found:", ctx->super->bstype.bstype, SPDK_BLOBSTORE_TYPE_LENGTH);
3671 		_spdk_bs_load_ctx_fail(ctx, -ENXIO);
3672 		return;
3673 	}
3674 
3675 	if (ctx->super->size > ctx->bs->dev->blockcnt * ctx->bs->dev->blocklen) {
3676 		SPDK_NOTICELOG("Size mismatch, dev size: %lu, blobstore size: %lu\n",
3677 			       ctx->bs->dev->blockcnt * ctx->bs->dev->blocklen, ctx->super->size);
3678 		_spdk_bs_load_ctx_fail(ctx, -EILSEQ);
3679 		return;
3680 	}
3681 
3682 	if (ctx->super->size == 0) {
3683 		ctx->super->size = ctx->bs->dev->blockcnt * ctx->bs->dev->blocklen;
3684 	}
3685 
3686 	if (ctx->super->io_unit_size == 0) {
3687 		ctx->super->io_unit_size = SPDK_BS_PAGE_SIZE;
3688 	}
3689 
3690 	/* Parse the super block */
3691 	ctx->bs->clean = 1;
3692 	ctx->bs->cluster_sz = ctx->super->cluster_size;
3693 	ctx->bs->total_clusters = ctx->super->size / ctx->super->cluster_size;
3694 	ctx->bs->pages_per_cluster = ctx->bs->cluster_sz / SPDK_BS_PAGE_SIZE;
3695 	ctx->bs->io_unit_size = ctx->super->io_unit_size;
3696 	rc = spdk_bit_array_resize(&ctx->bs->used_clusters, ctx->bs->total_clusters);
3697 	if (rc < 0) {
3698 		_spdk_bs_load_ctx_fail(ctx, -ENOMEM);
3699 		return;
3700 	}
3701 	ctx->bs->md_start = ctx->super->md_start;
3702 	ctx->bs->md_len = ctx->super->md_len;
3703 	ctx->bs->total_data_clusters = ctx->bs->total_clusters - spdk_divide_round_up(
3704 					       ctx->bs->md_start + ctx->bs->md_len, ctx->bs->pages_per_cluster);
3705 	ctx->bs->super_blob = ctx->super->super_blob;
3706 	memcpy(&ctx->bs->bstype, &ctx->super->bstype, sizeof(ctx->super->bstype));
3707 
3708 	if (ctx->super->used_blobid_mask_len == 0 || ctx->super->clean == 0) {
3709 		_spdk_bs_recover(ctx);
3710 	} else {
3711 		_spdk_bs_load_read_used_pages(ctx);
3712 	}
3713 }
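
/*
 * Worked example for the geometry derived above, with invented numbers: a
 * 1 GiB device with the default 1 MiB cluster gives total_clusters = 1024
 * and pages_per_cluster = 256. With md_start = 4 and md_len = 1024, the
 * metadata spans spdk_divide_round_up(4 + 1024, 256) = 5 clusters, so
 * total_data_clusters = 1024 - 5 = 1019.
 */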
3714 
3715 void
3716 spdk_bs_load(struct spdk_bs_dev *dev, struct spdk_bs_opts *o,
3717 	     spdk_bs_op_with_handle_complete cb_fn, void *cb_arg)
3718 {
3719 	struct spdk_blob_store	*bs;
3720 	struct spdk_bs_cpl	cpl;
3721 	struct spdk_bs_load_ctx *ctx;
3722 	struct spdk_bs_opts	opts = {};
3723 	int err;
3724 
3725 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Loading blobstore from dev %p\n", dev);
3726 
3727 	if ((SPDK_BS_PAGE_SIZE % dev->blocklen) != 0) {
3728 		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "unsupported dev block length of %u\n", dev->blocklen);
3729 		dev->destroy(dev);
3730 		cb_fn(cb_arg, NULL, -EINVAL);
3731 		return;
3732 	}
3733 
3734 	if (o) {
3735 		opts = *o;
3736 	} else {
3737 		spdk_bs_opts_init(&opts);
3738 	}
3739 
3740 	if (opts.max_md_ops == 0 || opts.max_channel_ops == 0) {
3741 		dev->destroy(dev);
3742 		cb_fn(cb_arg, NULL, -EINVAL);
3743 		return;
3744 	}
3745 
3746 	err = _spdk_bs_alloc(dev, &opts, &bs);
3747 	if (err) {
3748 		dev->destroy(dev);
3749 		cb_fn(cb_arg, NULL, err);
3750 		return;
3751 	}
3752 
3753 	ctx = calloc(1, sizeof(*ctx));
3754 	if (!ctx) {
3755 		_spdk_bs_free(bs);
3756 		cb_fn(cb_arg, NULL, -ENOMEM);
3757 		return;
3758 	}
3759 
3760 	ctx->bs = bs;
3761 	ctx->iter_cb_fn = opts.iter_cb_fn;
3762 	ctx->iter_cb_arg = opts.iter_cb_arg;
3763 
3764 	/* Allocate memory for the super block */
3765 	ctx->super = spdk_zmalloc(sizeof(*ctx->super), 0x1000, NULL,
3766 				  SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
3767 	if (!ctx->super) {
3768 		free(ctx);
3769 		_spdk_bs_free(bs);
3770 		cb_fn(cb_arg, NULL, -ENOMEM);
3771 		return;
3772 	}
3773 
3774 	cpl.type = SPDK_BS_CPL_TYPE_BS_HANDLE;
3775 	cpl.u.bs_handle.cb_fn = cb_fn;
3776 	cpl.u.bs_handle.cb_arg = cb_arg;
3777 	cpl.u.bs_handle.bs = bs;
3778 
3779 	ctx->seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
3780 	if (!ctx->seq) {
3781 		spdk_free(ctx->super);
3782 		free(ctx);
3783 		_spdk_bs_free(bs);
3784 		cb_fn(cb_arg, NULL, -ENOMEM);
3785 		return;
3786 	}
3787 
3788 	/* Read the super block */
3789 	spdk_bs_sequence_read_dev(ctx->seq, ctx->super, _spdk_bs_page_to_lba(bs, 0),
3790 				  _spdk_bs_byte_to_lba(bs, sizeof(*ctx->super)),
3791 				  _spdk_bs_load_super_cpl, ctx);
3792 }
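
/*
 * Usage sketch for spdk_bs_load() (a hypothetical caller, not part of this
 * file): the blobstore handle arrives in the completion callback, after
 * which the getters further below may be used.
 */
static inline void
_example_bs_load_done(void *cb_arg, struct spdk_blob_store *bs, int bserrno)
{
	if (bserrno == 0) {
		SPDK_NOTICELOG("loaded blobstore with %" PRIu64 " free clusters\n",
			       spdk_bs_free_cluster_count(bs));
	}
}

static inline void
_example_bs_load(struct spdk_bs_dev *dev)
{
	struct spdk_bs_opts opts;

	spdk_bs_opts_init(&opts);
	spdk_bs_load(dev, &opts, _example_bs_load_done, NULL);
}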
3793 
3794 /* END spdk_bs_load */
3795 
3796 /* START spdk_bs_dump */
3797 
3798 struct spdk_bs_dump_ctx {
3799 	struct spdk_blob_store		*bs;
3800 	struct spdk_bs_super_block	*super;
3801 	uint32_t			cur_page;
3802 	struct spdk_blob_md_page	*page;
3803 	spdk_bs_sequence_t		*seq;
3804 	FILE				*fp;
3805 	spdk_bs_dump_print_xattr	print_xattr_fn;
3806 	char				xattr_name[4096];
3807 };
3808 
3809 static void
3810 _spdk_bs_dump_finish(spdk_bs_sequence_t *seq, struct spdk_bs_dump_ctx *ctx, int bserrno)
3811 {
3812 	spdk_free(ctx->super);
3813 
3814 	/*
3815 	 * We need to defer calling spdk_bs_call_cpl() until after
3816 	 * dev destruction, so tuck these away for later use.
3817 	 */
3818 	ctx->bs->unload_err = bserrno;
3819 	memcpy(&ctx->bs->unload_cpl, &seq->cpl, sizeof(struct spdk_bs_cpl));
3820 	seq->cpl.type = SPDK_BS_CPL_TYPE_NONE;
3821 
3822 	spdk_bs_sequence_finish(seq, 0);
3823 	_spdk_bs_free(ctx->bs);
3824 	free(ctx);
3825 }
3826 
3827 static void _spdk_bs_dump_read_md_page(spdk_bs_sequence_t *seq, void *cb_arg);
3828 
3829 static void
3830 _spdk_bs_dump_print_md_page(struct spdk_bs_dump_ctx *ctx)
3831 {
3832 	uint32_t page_idx = ctx->cur_page;
3833 	struct spdk_blob_md_page *page = ctx->page;
3834 	struct spdk_blob_md_descriptor *desc;
3835 	size_t cur_desc = 0;
3836 	uint32_t crc;
3837 
3838 	fprintf(ctx->fp, "=========\n");
3839 	fprintf(ctx->fp, "Metadata Page Index: %" PRIu32 " (0x%" PRIx32 ")\n", page_idx, page_idx);
3840 	fprintf(ctx->fp, "Blob ID: 0x%" PRIx64 "\n", page->id);
3841 
3842 	crc = _spdk_blob_md_page_calc_crc(page);
3843 	fprintf(ctx->fp, "CRC: 0x%" PRIx32 " (%s)\n", page->crc, crc == page->crc ? "OK" : "Mismatch");
3844 
3845 	desc = (struct spdk_blob_md_descriptor *)page->descriptors;
3846 	while (cur_desc < sizeof(page->descriptors)) {
3847 		if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_PADDING) {
3848 			if (desc->length == 0) {
3849 				/* If padding and length are 0, this terminates the page */
3850 				break;
3851 			}
3852 		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_EXTENT_RLE) {
3853 			struct spdk_blob_md_descriptor_extent_rle	*desc_extent_rle;
3854 			unsigned int				i;
3855 
3856 			desc_extent_rle = (struct spdk_blob_md_descriptor_extent_rle *)desc;
3857 
3858 			for (i = 0; i < desc_extent_rle->length / sizeof(desc_extent_rle->extents[0]); i++) {
3859 				if (desc_extent_rle->extents[i].cluster_idx != 0) {
3860 					fprintf(ctx->fp, "Allocated Extent - Start: %" PRIu32,
3861 						desc_extent_rle->extents[i].cluster_idx);
3862 				} else {
3863 					fprintf(ctx->fp, "Unallocated Extent - ");
3864 				}
3865 				fprintf(ctx->fp, " Length: %" PRIu32, desc_extent_rle->extents[i].length);
3866 				fprintf(ctx->fp, "\n");
3867 			}
3868 		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_EXTENT_PAGE) {
3869 			struct spdk_blob_md_descriptor_extent_page	*desc_extent;
3870 			unsigned int					i;
3871 
3872 			desc_extent = (struct spdk_blob_md_descriptor_extent_page *)desc;
3873 
3874 			for (i = 0; i < desc_extent->length / sizeof(desc_extent->cluster_idx[0]); i++) {
3875 				if (desc_extent->cluster_idx[i] != 0) {
3876 					fprintf(ctx->fp, "Allocated Extent - Start: %" PRIu32,
3877 						desc_extent->cluster_idx[i]);
3878 				} else {
3879 					fprintf(ctx->fp, "Unallocated Extent");
3880 				}
3881 				fprintf(ctx->fp, "\n");
3882 			}
3883 		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR) {
3884 			struct spdk_blob_md_descriptor_xattr *desc_xattr;
3885 			uint32_t i;
3886 
3887 			desc_xattr = (struct spdk_blob_md_descriptor_xattr *)desc;
3888 
3889 			if (desc_xattr->length !=
3890 			    sizeof(desc_xattr->name_length) + sizeof(desc_xattr->value_length) +
3891 			    desc_xattr->name_length + desc_xattr->value_length) {
				fprintf(ctx->fp, "Invalid XATTR descriptor length\n");
3892 			}
3893 
3894 			memcpy(ctx->xattr_name, desc_xattr->name, desc_xattr->name_length);
3895 			ctx->xattr_name[desc_xattr->name_length] = '\0';
3896 			fprintf(ctx->fp, "XATTR: name = \"%s\"\n", ctx->xattr_name);
3897 			fprintf(ctx->fp, "       value = \"");
3898 			ctx->print_xattr_fn(ctx->fp, ctx->super->bstype.bstype, ctx->xattr_name,
3899 					    (void *)((uintptr_t)desc_xattr->name + desc_xattr->name_length),
3900 					    desc_xattr->value_length);
3901 			fprintf(ctx->fp, "\"\n");
3902 			for (i = 0; i < desc_xattr->value_length; i++) {
3903 				if (i % 16 == 0) {
3904 					fprintf(ctx->fp, "               ");
3905 				}
3906 				fprintf(ctx->fp, "%02" PRIx8 " ", *((uint8_t *)desc_xattr->name + desc_xattr->name_length + i));
3907 				if ((i + 1) % 16 == 0) {
3908 					fprintf(ctx->fp, "\n");
3909 				}
3910 			}
3911 			if (i % 16 != 0) {
3912 				fprintf(ctx->fp, "\n");
3913 			}
3914 		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR_INTERNAL) {
3915 			/* TODO */
3916 		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_FLAGS) {
3917 			/* TODO */
3918 		} else {
3919 			fprintf(ctx->fp, "Unknown descriptor type (%" PRIu8 ")\n", desc->type);
3920 		}
3921 		/* Advance to the next descriptor */
3922 		cur_desc += sizeof(*desc) + desc->length;
3923 		if (cur_desc + sizeof(*desc) > sizeof(page->descriptors)) {
3924 			break;
3925 		}
3926 		desc = (struct spdk_blob_md_descriptor *)((uintptr_t)page->descriptors + cur_desc);
3927 	}
3928 }
3929 
3930 static void
3931 _spdk_bs_dump_read_md_page_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
3932 {
3933 	struct spdk_bs_dump_ctx *ctx = cb_arg;
3934 
3935 	if (bserrno != 0) {
3936 		_spdk_bs_dump_finish(seq, ctx, bserrno);
3937 		return;
3938 	}
3939 
3940 	if (ctx->page->id != 0) {
3941 		_spdk_bs_dump_print_md_page(ctx);
3942 	}
3943 
3944 	ctx->cur_page++;
3945 
3946 	if (ctx->cur_page < ctx->super->md_len) {
3947 		_spdk_bs_dump_read_md_page(seq, ctx);
3948 	} else {
3949 		spdk_free(ctx->page);
3950 		_spdk_bs_dump_finish(seq, ctx, 0);
3951 	}
3952 }
3953 
3954 static void
3955 _spdk_bs_dump_read_md_page(spdk_bs_sequence_t *seq, void *cb_arg)
3956 {
3957 	struct spdk_bs_dump_ctx *ctx = cb_arg;
3958 	uint64_t lba;
3959 
3960 	assert(ctx->cur_page < ctx->super->md_len);
3961 	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->md_start + ctx->cur_page);
3962 	spdk_bs_sequence_read_dev(seq, ctx->page, lba,
3963 				  _spdk_bs_byte_to_lba(ctx->bs, SPDK_BS_PAGE_SIZE),
3964 				  _spdk_bs_dump_read_md_page_cpl, ctx);
3965 }
3966 
3967 static void
3968 _spdk_bs_dump_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
3969 {
3970 	struct spdk_bs_dump_ctx *ctx = cb_arg;
3971 
3972 	fprintf(ctx->fp, "Signature: \"%.8s\" ", ctx->super->signature);
3973 	if (memcmp(ctx->super->signature, SPDK_BS_SUPER_BLOCK_SIG,
3974 		   sizeof(ctx->super->signature)) != 0) {
3975 		fprintf(ctx->fp, "(Mismatch)\n");
3976 		_spdk_bs_dump_finish(seq, ctx, bserrno);
3977 		return;
3978 	} else {
3979 		fprintf(ctx->fp, "(OK)\n");
3980 	}
3981 	fprintf(ctx->fp, "Version: %" PRIu32 "\n", ctx->super->version);
3982 	fprintf(ctx->fp, "CRC: 0x%x (%s)\n", ctx->super->crc,
3983 		(ctx->super->crc == _spdk_blob_md_page_calc_crc(ctx->super)) ? "OK" : "Mismatch");
3984 	fprintf(ctx->fp, "Blobstore Type: %.*s\n", SPDK_BLOBSTORE_TYPE_LENGTH, ctx->super->bstype.bstype);
3985 	fprintf(ctx->fp, "Cluster Size: %" PRIu32 "\n", ctx->super->cluster_size);
3986 	fprintf(ctx->fp, "Super Blob ID: ");
3987 	if (ctx->super->super_blob == SPDK_BLOBID_INVALID) {
3988 		fprintf(ctx->fp, "(None)\n");
3989 	} else {
3990 		fprintf(ctx->fp, "%" PRIu64 "\n", ctx->super->super_blob);
3991 	}
3992 	fprintf(ctx->fp, "Clean: %" PRIu32 "\n", ctx->super->clean);
3993 	fprintf(ctx->fp, "Used Metadata Page Mask Start: %" PRIu32 "\n", ctx->super->used_page_mask_start);
3994 	fprintf(ctx->fp, "Used Metadata Page Mask Length: %" PRIu32 "\n", ctx->super->used_page_mask_len);
3995 	fprintf(ctx->fp, "Used Cluster Mask Start: %" PRIu32 "\n", ctx->super->used_cluster_mask_start);
3996 	fprintf(ctx->fp, "Used Cluster Mask Length: %" PRIu32 "\n", ctx->super->used_cluster_mask_len);
3997 	fprintf(ctx->fp, "Used Blob ID Mask Start: %" PRIu32 "\n", ctx->super->used_blobid_mask_start);
3998 	fprintf(ctx->fp, "Used Blob ID Mask Length: %" PRIu32 "\n", ctx->super->used_blobid_mask_len);
3999 	fprintf(ctx->fp, "Metadata Start: %" PRIu32 "\n", ctx->super->md_start);
4000 	fprintf(ctx->fp, "Metadata Length: %" PRIu32 "\n", ctx->super->md_len);
4001 
4002 	ctx->cur_page = 0;
4003 	ctx->page = spdk_zmalloc(SPDK_BS_PAGE_SIZE, SPDK_BS_PAGE_SIZE,
4004 				 NULL, SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
4005 	if (!ctx->page) {
4006 		_spdk_bs_dump_finish(seq, ctx, -ENOMEM);
4007 		return;
4008 	}
4009 	_spdk_bs_dump_read_md_page(seq, ctx);
4010 }
4011 
4012 void
4013 spdk_bs_dump(struct spdk_bs_dev *dev, FILE *fp, spdk_bs_dump_print_xattr print_xattr_fn,
4014 	     spdk_bs_op_complete cb_fn, void *cb_arg)
4015 {
4016 	struct spdk_blob_store	*bs;
4017 	struct spdk_bs_cpl	cpl;
4018 	spdk_bs_sequence_t	*seq;
4019 	struct spdk_bs_dump_ctx *ctx;
4020 	struct spdk_bs_opts	opts = {};
4021 	int err;
4022 
4023 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Dumping blobstore from dev %p\n", dev);
4024 
4025 	spdk_bs_opts_init(&opts);
4026 
4027 	err = _spdk_bs_alloc(dev, &opts, &bs);
4028 	if (err) {
4029 		dev->destroy(dev);
4030 		cb_fn(cb_arg, err);
4031 		return;
4032 	}
4033 
4034 	ctx = calloc(1, sizeof(*ctx));
4035 	if (!ctx) {
4036 		_spdk_bs_free(bs);
4037 		cb_fn(cb_arg, -ENOMEM);
4038 		return;
4039 	}
4040 
4041 	ctx->bs = bs;
4042 	ctx->fp = fp;
4043 	ctx->print_xattr_fn = print_xattr_fn;
4044 
4045 	/* Allocate memory for the super block */
4046 	ctx->super = spdk_zmalloc(sizeof(*ctx->super), 0x1000, NULL,
4047 				  SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
4048 	if (!ctx->super) {
4049 		free(ctx);
4050 		_spdk_bs_free(bs);
4051 		cb_fn(cb_arg, -ENOMEM);
4052 		return;
4053 	}
4054 
4055 	cpl.type = SPDK_BS_CPL_TYPE_BS_BASIC;
4056 	cpl.u.bs_basic.cb_fn = cb_fn;
4057 	cpl.u.bs_basic.cb_arg = cb_arg;
4058 
4059 	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
4060 	if (!seq) {
4061 		spdk_free(ctx->super);
4062 		free(ctx);
4063 		_spdk_bs_free(bs);
4064 		cb_fn(cb_arg, -ENOMEM);
4065 		return;
4066 	}
4067 
4068 	/* Read the super block */
4069 	spdk_bs_sequence_read_dev(seq, ctx->super, _spdk_bs_page_to_lba(bs, 0),
4070 				  _spdk_bs_byte_to_lba(bs, sizeof(*ctx->super)),
4071 				  _spdk_bs_dump_super_cpl, ctx);
4072 }
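
/*
 * Note: spdk_bs_dump() is read-only with respect to the device. It walks
 * every page in the metadata region and prints only pages whose blob id is
 * non-zero, so a freshly initialized blobstore dumps just the super block
 * fields above.
 */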
4073 
4074 /* END spdk_bs_dump */
4075 
4076 /* START spdk_bs_init */
4077 
4078 struct spdk_bs_init_ctx {
4079 	struct spdk_blob_store		*bs;
4080 	struct spdk_bs_super_block	*super;
4081 };
4082 
4083 static void
4084 _spdk_bs_init_persist_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
4085 {
4086 	struct spdk_bs_init_ctx *ctx = cb_arg;
4087 
4088 	spdk_free(ctx->super);
4089 	free(ctx);
4090 
4091 	spdk_bs_sequence_finish(seq, bserrno);
4092 }
4093 
4094 static void
4095 _spdk_bs_init_trim_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
4096 {
4097 	struct spdk_bs_init_ctx *ctx = cb_arg;
4098 
4099 	/* Write super block */
4100 	spdk_bs_sequence_write_dev(seq, ctx->super, _spdk_bs_page_to_lba(ctx->bs, 0),
4101 				   _spdk_bs_byte_to_lba(ctx->bs, sizeof(*ctx->super)),
4102 				   _spdk_bs_init_persist_super_cpl, ctx);
4103 }
4104 
4105 void
4106 spdk_bs_init(struct spdk_bs_dev *dev, struct spdk_bs_opts *o,
4107 	     spdk_bs_op_with_handle_complete cb_fn, void *cb_arg)
4108 {
4109 	struct spdk_bs_init_ctx *ctx;
4110 	struct spdk_blob_store	*bs;
4111 	struct spdk_bs_cpl	cpl;
4112 	spdk_bs_sequence_t	*seq;
4113 	spdk_bs_batch_t		*batch;
4114 	uint64_t		num_md_lba;
4115 	uint64_t		num_md_pages;
4116 	uint64_t		num_md_clusters;
4117 	uint32_t		i;
4118 	struct spdk_bs_opts	opts = {};
4119 	int			rc;
4120 
4121 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Initializing blobstore on dev %p\n", dev);
4122 
4123 	if ((SPDK_BS_PAGE_SIZE % dev->blocklen) != 0) {
4124 		SPDK_ERRLOG("unsupported dev block length of %u\n",
4125 			    dev->blocklen);
4126 		dev->destroy(dev);
4127 		cb_fn(cb_arg, NULL, -EINVAL);
4128 		return;
4129 	}
4130 
4131 	if (o) {
4132 		opts = *o;
4133 	} else {
4134 		spdk_bs_opts_init(&opts);
4135 	}
4136 
4137 	if (_spdk_bs_opts_verify(&opts) != 0) {
4138 		dev->destroy(dev);
4139 		cb_fn(cb_arg, NULL, -EINVAL);
4140 		return;
4141 	}
4142 
4143 	rc = _spdk_bs_alloc(dev, &opts, &bs);
4144 	if (rc) {
4145 		dev->destroy(dev);
4146 		cb_fn(cb_arg, NULL, rc);
4147 		return;
4148 	}
4149 
4150 	if (opts.num_md_pages == SPDK_BLOB_OPTS_NUM_MD_PAGES) {
4151 		/* By default, allocate 1 page per cluster.
4152 		 * Technically, this over-allocates metadata
4153 		 * because more metadata will reduce the number
4154 		 * of usable clusters. This can be addressed with
4155 		 * more complex math in the future.
4156 		 */
4157 		bs->md_len = bs->total_clusters;
4158 	} else {
4159 		bs->md_len = opts.num_md_pages;
4160 	}
4161 	rc = spdk_bit_array_resize(&bs->used_md_pages, bs->md_len);
4162 	if (rc < 0) {
4163 		_spdk_bs_free(bs);
4164 		cb_fn(cb_arg, NULL, -ENOMEM);
4165 		return;
4166 	}
4167 
4168 	rc = spdk_bit_array_resize(&bs->used_blobids, bs->md_len);
4169 	if (rc < 0) {
4170 		_spdk_bs_free(bs);
4171 		cb_fn(cb_arg, NULL, -ENOMEM);
4172 		return;
4173 	}
4174 
4175 	ctx = calloc(1, sizeof(*ctx));
4176 	if (!ctx) {
4177 		_spdk_bs_free(bs);
4178 		cb_fn(cb_arg, NULL, -ENOMEM);
4179 		return;
4180 	}
4181 
4182 	ctx->bs = bs;
4183 
4184 	/* Allocate memory for the super block */
4185 	ctx->super = spdk_zmalloc(sizeof(*ctx->super), 0x1000, NULL,
4186 				  SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
4187 	if (!ctx->super) {
4188 		free(ctx);
4189 		_spdk_bs_free(bs);
4190 		cb_fn(cb_arg, NULL, -ENOMEM);
4191 		return;
4192 	}
4193 	memcpy(ctx->super->signature, SPDK_BS_SUPER_BLOCK_SIG,
4194 	       sizeof(ctx->super->signature));
4195 	ctx->super->version = SPDK_BS_VERSION;
4196 	ctx->super->length = sizeof(*ctx->super);
4197 	ctx->super->super_blob = bs->super_blob;
4198 	ctx->super->clean = 0;
4199 	ctx->super->cluster_size = bs->cluster_sz;
4200 	ctx->super->io_unit_size = bs->io_unit_size;
4201 	memcpy(&ctx->super->bstype, &bs->bstype, sizeof(bs->bstype));
4202 
4203 	/* Calculate how many pages the metadata consumes at the front
4204 	 * of the disk.
4205 	 */
4206 
4207 	/* The super block uses 1 page */
4208 	num_md_pages = 1;
4209 
4210 	/* The used_md_pages mask requires 1 bit per metadata page, rounded
4211 	 * up to the nearest page, plus a header.
4212 	 */
4213 	ctx->super->used_page_mask_start = num_md_pages;
4214 	ctx->super->used_page_mask_len = spdk_divide_round_up(sizeof(struct spdk_bs_md_mask) +
4215 					 spdk_divide_round_up(bs->md_len, 8),
4216 					 SPDK_BS_PAGE_SIZE);
4217 	num_md_pages += ctx->super->used_page_mask_len;
4218 
4219 	/* The used_clusters mask requires 1 bit per cluster, rounded
4220 	 * up to the nearest page, plus a header.
4221 	 */
4222 	ctx->super->used_cluster_mask_start = num_md_pages;
4223 	ctx->super->used_cluster_mask_len = spdk_divide_round_up(sizeof(struct spdk_bs_md_mask) +
4224 					    spdk_divide_round_up(bs->total_clusters, 8),
4225 					    SPDK_BS_PAGE_SIZE);
4226 	num_md_pages += ctx->super->used_cluster_mask_len;
4227 
4228 	/* The used_blobids mask requires 1 bit per metadata page, rounded
4229 	 * up to the nearest page, plus a header.
4230 	 */
4231 	ctx->super->used_blobid_mask_start = num_md_pages;
4232 	ctx->super->used_blobid_mask_len = spdk_divide_round_up(sizeof(struct spdk_bs_md_mask) +
4233 					   spdk_divide_round_up(bs->md_len, 8),
4234 					   SPDK_BS_PAGE_SIZE);
4235 	num_md_pages += ctx->super->used_blobid_mask_len;
4236 
4237 	/* The metadata region size was chosen above */
4238 	ctx->super->md_start = bs->md_start = num_md_pages;
4239 	ctx->super->md_len = bs->md_len;
4240 	num_md_pages += bs->md_len;
4241 
4242 	num_md_lba = _spdk_bs_page_to_lba(bs, num_md_pages);
4243 
4244 	ctx->super->size = dev->blockcnt * dev->blocklen;
4245 
4246 	ctx->super->crc = _spdk_blob_md_page_calc_crc(ctx->super);
4247 
4248 	num_md_clusters = spdk_divide_round_up(num_md_pages, bs->pages_per_cluster);
4249 	if (num_md_clusters > bs->total_clusters) {
4250 		SPDK_ERRLOG("Blobstore metadata cannot use more clusters than are available, "
4251 			    "please decrease the number of pages reserved for metadata "
4252 			    "or increase the cluster size.\n");
4253 		spdk_free(ctx->super);
4254 		free(ctx);
4255 		_spdk_bs_free(bs);
4256 		cb_fn(cb_arg, NULL, -ENOMEM);
4257 		return;
4258 	}
4259 	/* Claim all of the clusters used by the metadata */
4260 	for (i = 0; i < num_md_clusters; i++) {
4261 		_spdk_bs_claim_cluster(bs, i);
4262 	}
4263 
4264 	bs->total_data_clusters = bs->num_free_clusters;
4265 
4266 	cpl.type = SPDK_BS_CPL_TYPE_BS_HANDLE;
4267 	cpl.u.bs_handle.cb_fn = cb_fn;
4268 	cpl.u.bs_handle.cb_arg = cb_arg;
4269 	cpl.u.bs_handle.bs = bs;
4270 
4271 	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
4272 	if (!seq) {
4273 		spdk_free(ctx->super);
4274 		free(ctx);
4275 		_spdk_bs_free(bs);
4276 		cb_fn(cb_arg, NULL, -ENOMEM);
4277 		return;
4278 	}
4279 
4280 	batch = spdk_bs_sequence_to_batch(seq, _spdk_bs_init_trim_cpl, ctx);
4281 
4282 	/* Clear metadata space */
4283 	spdk_bs_batch_write_zeroes_dev(batch, 0, num_md_lba);
4284 
4285 	switch (opts.clear_method) {
4286 	case BS_CLEAR_WITH_UNMAP:
4287 		/* Trim data clusters */
4288 		spdk_bs_batch_unmap_dev(batch, num_md_lba, ctx->bs->dev->blockcnt - num_md_lba);
4289 		break;
4290 	case BS_CLEAR_WITH_WRITE_ZEROES:
4291 		/* Write_zeroes to data clusters */
4292 		spdk_bs_batch_write_zeroes_dev(batch, num_md_lba, ctx->bs->dev->blockcnt - num_md_lba);
4293 		break;
4294 	case BS_CLEAR_WITH_NONE:
4295 	default:
4296 		break;
4297 	}
4298 
4299 	spdk_bs_batch_close(batch);
4300 }
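
/*
 * Layout sketch with invented numbers for the computation above: a 1 GiB
 * device, 1 MiB clusters and md_len = 1024 give
 *
 *   page 0          super block
 *   page 1          used_page mask    (header + 1024 bits fits in one page)
 *   page 2          used_cluster mask (header + 1024 bits -> one page)
 *   page 3          used_blobid mask  (header + 1024 bits -> one page)
 *   pages 4..1027   metadata region   (md_start = 4)
 *
 * so num_md_pages = 1028 and spdk_divide_round_up(1028, 256) = 5 clusters
 * are claimed for metadata.
 */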
4301 
4302 /* END spdk_bs_init */
4303 
4304 /* START spdk_bs_destroy */
4305 
4306 static void
4307 _spdk_bs_destroy_trim_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
4308 {
4309 	struct spdk_bs_init_ctx *ctx = cb_arg;
4310 	struct spdk_blob_store *bs = ctx->bs;
4311 
4312 	/*
4313 	 * We need to defer calling spdk_bs_call_cpl() until after
4314 	 * dev destruction, so tuck these away for later use.
4315 	 */
4316 	bs->unload_err = bserrno;
4317 	memcpy(&bs->unload_cpl, &seq->cpl, sizeof(struct spdk_bs_cpl));
4318 	seq->cpl.type = SPDK_BS_CPL_TYPE_NONE;
4319 
4320 	spdk_bs_sequence_finish(seq, bserrno);
4321 
4322 	_spdk_bs_free(bs);
4323 	free(ctx);
4324 }
4325 
4326 void
4327 spdk_bs_destroy(struct spdk_blob_store *bs, spdk_bs_op_complete cb_fn,
4328 		void *cb_arg)
4329 {
4330 	struct spdk_bs_cpl	cpl;
4331 	spdk_bs_sequence_t	*seq;
4332 	struct spdk_bs_init_ctx *ctx;
4333 
4334 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Destroying blobstore\n");
4335 
4336 	if (!TAILQ_EMPTY(&bs->blobs)) {
4337 		SPDK_ERRLOG("Blobstore still has open blobs\n");
4338 		cb_fn(cb_arg, -EBUSY);
4339 		return;
4340 	}
4341 
4342 	cpl.type = SPDK_BS_CPL_TYPE_BS_BASIC;
4343 	cpl.u.bs_basic.cb_fn = cb_fn;
4344 	cpl.u.bs_basic.cb_arg = cb_arg;
4345 
4346 	ctx = calloc(1, sizeof(*ctx));
4347 	if (!ctx) {
4348 		cb_fn(cb_arg, -ENOMEM);
4349 		return;
4350 	}
4351 
4352 	ctx->bs = bs;
4353 
4354 	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
4355 	if (!seq) {
4356 		free(ctx);
4357 		cb_fn(cb_arg, -ENOMEM);
4358 		return;
4359 	}
4360 
4361 	/* Write zeroes to the super block */
4362 	spdk_bs_sequence_write_zeroes_dev(seq,
4363 					  _spdk_bs_page_to_lba(bs, 0),
4364 					  _spdk_bs_byte_to_lba(bs, sizeof(struct spdk_bs_super_block)),
4365 					  _spdk_bs_destroy_trim_cpl, ctx);
4366 }
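
/*
 * Note: spdk_bs_destroy() only zeroes the super block. The metadata region
 * and data clusters are left in place, but without a valid super block the
 * blobstore can no longer be loaded.
 */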
4367 
4368 /* END spdk_bs_destroy */
4369 
4370 /* START spdk_bs_unload */
4371 
4372 static void
4373 _spdk_bs_unload_finish(struct spdk_bs_load_ctx *ctx, int bserrno)
4374 {
4375 	spdk_bs_sequence_t *seq = ctx->seq;
4376 
4377 	spdk_free(ctx->super);
4378 
4379 	/*
4380 	 * We need to defer calling spdk_bs_call_cpl() until after
4381 	 * dev destruction, so tuck these away for later use.
4382 	 */
4383 	ctx->bs->unload_err = bserrno;
4384 	memcpy(&ctx->bs->unload_cpl, &seq->cpl, sizeof(struct spdk_bs_cpl));
4385 	seq->cpl.type = SPDK_BS_CPL_TYPE_NONE;
4386 
4387 	spdk_bs_sequence_finish(seq, bserrno);
4388 
4389 	_spdk_bs_free(ctx->bs);
4390 	free(ctx);
4391 }
4392 
4393 static void
4394 _spdk_bs_unload_write_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
4395 {
4396 	struct spdk_bs_load_ctx	*ctx = cb_arg;
4397 
4398 	_spdk_bs_unload_finish(ctx, bserrno);
4399 }
4400 
4401 static void
4402 _spdk_bs_unload_write_used_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
4403 {
4404 	struct spdk_bs_load_ctx	*ctx = cb_arg;
4405 
4406 	spdk_free(ctx->mask);
4407 
4408 	if (bserrno != 0) {
4409 		_spdk_bs_unload_finish(ctx, bserrno);
4410 		return;
4411 	}
4412 
4413 	ctx->super->clean = 1;
4414 
4415 	_spdk_bs_write_super(seq, ctx->bs, ctx->super, _spdk_bs_unload_write_super_cpl, ctx);
4416 }
4417 
4418 static void
4419 _spdk_bs_unload_write_used_blobids_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
4420 {
4421 	struct spdk_bs_load_ctx	*ctx = cb_arg;
4422 
4423 	spdk_free(ctx->mask);
4424 	ctx->mask = NULL;
4425 
4426 	if (bserrno != 0) {
4427 		_spdk_bs_unload_finish(ctx, bserrno);
4428 		return;
4429 	}
4430 
4431 	_spdk_bs_write_used_clusters(seq, ctx, _spdk_bs_unload_write_used_clusters_cpl);
4432 }
4433 
4434 static void
4435 _spdk_bs_unload_write_used_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
4436 {
4437 	struct spdk_bs_load_ctx	*ctx = cb_arg;
4438 
4439 	spdk_free(ctx->mask);
4440 	ctx->mask = NULL;
4441 
4442 	if (bserrno != 0) {
4443 		_spdk_bs_unload_finish(ctx, bserrno);
4444 		return;
4445 	}
4446 
4447 	_spdk_bs_write_used_blobids(seq, ctx, _spdk_bs_unload_write_used_blobids_cpl);
4448 }
4449 
4450 static void
4451 _spdk_bs_unload_read_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
4452 {
4453 	struct spdk_bs_load_ctx	*ctx = cb_arg;
4454 
4455 	if (bserrno != 0) {
4456 		_spdk_bs_unload_finish(ctx, bserrno);
4457 		return;
4458 	}
4459 
4460 	_spdk_bs_write_used_md(seq, cb_arg, _spdk_bs_unload_write_used_pages_cpl);
4461 }
4462 
4463 void
4464 spdk_bs_unload(struct spdk_blob_store *bs, spdk_bs_op_complete cb_fn, void *cb_arg)
4465 {
4466 	struct spdk_bs_cpl	cpl;
4467 	struct spdk_bs_load_ctx *ctx;
4468 
4469 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Syncing blobstore\n");
4470 
4471 	if (!TAILQ_EMPTY(&bs->blobs)) {
4472 		SPDK_ERRLOG("Blobstore still has open blobs\n");
4473 		cb_fn(cb_arg, -EBUSY);
4474 		return;
4475 	}
4476 
4477 	ctx = calloc(1, sizeof(*ctx));
4478 	if (!ctx) {
4479 		cb_fn(cb_arg, -ENOMEM);
4480 		return;
4481 	}
4482 
4483 	ctx->bs = bs;
4484 
4485 	ctx->super = spdk_zmalloc(sizeof(*ctx->super), 0x1000, NULL,
4486 				  SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
4487 	if (!ctx->super) {
4488 		free(ctx);
4489 		cb_fn(cb_arg, -ENOMEM);
4490 		return;
4491 	}
4492 
4493 	cpl.type = SPDK_BS_CPL_TYPE_BS_BASIC;
4494 	cpl.u.bs_basic.cb_fn = cb_fn;
4495 	cpl.u.bs_basic.cb_arg = cb_arg;
4496 
4497 	ctx->seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
4498 	if (!ctx->seq) {
4499 		spdk_free(ctx->super);
4500 		free(ctx);
4501 		cb_fn(cb_arg, -ENOMEM);
4502 		return;
4503 	}
4504 
4505 	/* Read super block */
4506 	spdk_bs_sequence_read_dev(ctx->seq, ctx->super, _spdk_bs_page_to_lba(bs, 0),
4507 				  _spdk_bs_byte_to_lba(bs, sizeof(*ctx->super)),
4508 				  _spdk_bs_unload_read_super_cpl, ctx);
4509 }
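
/*
 * Note on the unload sequence above: the super block is re-read, then the
 * used_md_pages, used_blobids and used_clusters masks are written out in
 * turn, and finally the super block is persisted with clean = 1 so the
 * next spdk_bs_load() can skip the md replay in _spdk_bs_recover().
 */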
4510 
4511 /* END spdk_bs_unload */
4512 
4513 /* START spdk_bs_set_super */
4514 
4515 struct spdk_bs_set_super_ctx {
4516 	struct spdk_blob_store		*bs;
4517 	struct spdk_bs_super_block	*super;
4518 };
4519 
4520 static void
4521 _spdk_bs_set_super_write_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
4522 {
4523 	struct spdk_bs_set_super_ctx	*ctx = cb_arg;
4524 
4525 	if (bserrno != 0) {
4526 		SPDK_ERRLOG("Unable to write to super block of blobstore\n");
4527 	}
4528 
4529 	spdk_free(ctx->super);
4530 
4531 	spdk_bs_sequence_finish(seq, bserrno);
4532 
4533 	free(ctx);
4534 }
4535 
4536 static void
4537 _spdk_bs_set_super_read_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
4538 {
4539 	struct spdk_bs_set_super_ctx	*ctx = cb_arg;
4540 
4541 	if (bserrno != 0) {
4542 		SPDK_ERRLOG("Unable to read super block of blobstore\n");
4543 		spdk_free(ctx->super);
4544 		spdk_bs_sequence_finish(seq, bserrno);
4545 		free(ctx);
4546 		return;
4547 	}
4548 
4549 	_spdk_bs_write_super(seq, ctx->bs, ctx->super, _spdk_bs_set_super_write_cpl, ctx);
4550 }
4551 
4552 void
4553 spdk_bs_set_super(struct spdk_blob_store *bs, spdk_blob_id blobid,
4554 		  spdk_bs_op_complete cb_fn, void *cb_arg)
4555 {
4556 	struct spdk_bs_cpl		cpl;
4557 	spdk_bs_sequence_t		*seq;
4558 	struct spdk_bs_set_super_ctx	*ctx;
4559 
4560 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Setting super blob id on blobstore\n");
4561 
4562 	ctx = calloc(1, sizeof(*ctx));
4563 	if (!ctx) {
4564 		cb_fn(cb_arg, -ENOMEM);
4565 		return;
4566 	}
4567 
4568 	ctx->bs = bs;
4569 
4570 	ctx->super = spdk_zmalloc(sizeof(*ctx->super), 0x1000, NULL,
4571 				  SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
4572 	if (!ctx->super) {
4573 		free(ctx);
4574 		cb_fn(cb_arg, -ENOMEM);
4575 		return;
4576 	}
4577 
4578 	cpl.type = SPDK_BS_CPL_TYPE_BS_BASIC;
4579 	cpl.u.bs_basic.cb_fn = cb_fn;
4580 	cpl.u.bs_basic.cb_arg = cb_arg;
4581 
4582 	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
4583 	if (!seq) {
4584 		spdk_free(ctx->super);
4585 		free(ctx);
4586 		cb_fn(cb_arg, -ENOMEM);
4587 		return;
4588 	}
4589 
4590 	bs->super_blob = blobid;
4591 
4592 	/* Read super block */
4593 	spdk_bs_sequence_read_dev(seq, ctx->super, _spdk_bs_page_to_lba(bs, 0),
4594 				  _spdk_bs_byte_to_lba(bs, sizeof(*ctx->super)),
4595 				  _spdk_bs_set_super_read_cpl, ctx);
4596 }
4597 
4598 /* END spdk_bs_set_super */
4599 
4600 void
4601 spdk_bs_get_super(struct spdk_blob_store *bs,
4602 		  spdk_blob_op_with_id_complete cb_fn, void *cb_arg)
4603 {
4604 	if (bs->super_blob == SPDK_BLOBID_INVALID) {
4605 		cb_fn(cb_arg, SPDK_BLOBID_INVALID, -ENOENT);
4606 	} else {
4607 		cb_fn(cb_arg, bs->super_blob, 0);
4608 	}
4609 }
4610 
4611 uint64_t
4612 spdk_bs_get_cluster_size(struct spdk_blob_store *bs)
4613 {
4614 	return bs->cluster_sz;
4615 }
4616 
4617 uint64_t
4618 spdk_bs_get_page_size(struct spdk_blob_store *bs)
4619 {
4620 	return SPDK_BS_PAGE_SIZE;
4621 }
4622 
4623 uint64_t
4624 spdk_bs_get_io_unit_size(struct spdk_blob_store *bs)
4625 {
4626 	return bs->io_unit_size;
4627 }
4628 
4629 uint64_t
4630 spdk_bs_free_cluster_count(struct spdk_blob_store *bs)
4631 {
4632 	return bs->num_free_clusters;
4633 }
4634 
4635 uint64_t
4636 spdk_bs_total_data_cluster_count(struct spdk_blob_store *bs)
4637 {
4638 	return bs->total_data_clusters;
4639 }
4640 
4641 static int
4642 spdk_bs_register_md_thread(struct spdk_blob_store *bs)
4643 {
4644 	bs->md_channel = spdk_get_io_channel(bs);
4645 	if (!bs->md_channel) {
4646 		SPDK_ERRLOG("Failed to get IO channel.\n");
4647 		return -1;
4648 	}
4649 
4650 	return 0;
4651 }
4652 
4653 static int
4654 spdk_bs_unregister_md_thread(struct spdk_blob_store *bs)
4655 {
4656 	spdk_put_io_channel(bs->md_channel);
4657 
4658 	return 0;
4659 }
4660 
4661 spdk_blob_id spdk_blob_get_id(struct spdk_blob *blob)
4662 {
4663 	assert(blob != NULL);
4664 
4665 	return blob->id;
4666 }
4667 
4668 uint64_t spdk_blob_get_num_pages(struct spdk_blob *blob)
4669 {
4670 	assert(blob != NULL);
4671 
4672 	return _spdk_bs_cluster_to_page(blob->bs, blob->active.num_clusters);
4673 }
4674 
4675 uint64_t spdk_blob_get_num_io_units(struct spdk_blob *blob)
4676 {
4677 	assert(blob != NULL);
4678 
4679 	return spdk_blob_get_num_pages(blob) * _spdk_bs_io_unit_per_page(blob->bs);
4680 }
4681 
4682 uint64_t spdk_blob_get_num_clusters(struct spdk_blob *blob)
4683 {
4684 	assert(blob != NULL);
4685 
4686 	return blob->active.num_clusters;
4687 }
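
/*
 * Arithmetic sketch for the getters above, with invented numbers: with a
 * 1 MiB cluster, 4 KiB pages and a 512-byte io unit, a 3-cluster blob
 * reports 3 * 256 = 768 pages and 768 * 8 = 6144 io units.
 */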
4688 
4689 /* START spdk_bs_create_blob */
4690 
4691 static void
4692 _spdk_bs_create_blob_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
4693 {
4694 	struct spdk_blob *blob = cb_arg;
4695 
4696 	_spdk_blob_free(blob);
4697 
4698 	spdk_bs_sequence_finish(seq, bserrno);
4699 }
4700 
4701 static int
4702 _spdk_blob_set_xattrs(struct spdk_blob *blob, const struct spdk_blob_xattr_opts *xattrs,
4703 		      bool internal)
4704 {
4705 	uint64_t i;
4706 	size_t value_len = 0;
4707 	int rc;
4708 	const void *value = NULL;
4709 	if (xattrs->count > 0 && xattrs->get_value == NULL) {
4710 		return -EINVAL;
4711 	}
4712 	for (i = 0; i < xattrs->count; i++) {
4713 		xattrs->get_value(xattrs->ctx, xattrs->names[i], &value, &value_len);
4714 		if (value == NULL || value_len == 0) {
4715 			return -EINVAL;
4716 		}
4717 		rc = _spdk_blob_set_xattr(blob, xattrs->names[i], value, value_len, internal);
4718 		if (rc < 0) {
4719 			return rc;
4720 		}
4721 	}
4722 	return 0;
4723 }
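
/*
 * Sketch of a get_value callback compatible with the loop above; the name
 * and payload are hypothetical. The callback must return a non-NULL value
 * with a non-zero length, or _spdk_blob_set_xattrs() fails with -EINVAL.
 */
static inline void
_example_xattr_get_value(void *ctx, const char *name,
			 const void **value, size_t *value_len)
{
	static const char payload[] = "example";

	*value = payload;
	*value_len = sizeof(payload);
}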
4724 
4725 static void
4726 _spdk_bs_create_blob(struct spdk_blob_store *bs,
4727 		     const struct spdk_blob_opts *opts,
4728 		     const struct spdk_blob_xattr_opts *internal_xattrs,
4729 		     spdk_blob_op_with_id_complete cb_fn, void *cb_arg)
4730 {
4731 	struct spdk_blob	*blob;
4732 	uint32_t		page_idx;
4733 	struct spdk_bs_cpl	cpl;
4734 	struct spdk_blob_opts	opts_default;
4735 	struct spdk_blob_xattr_opts internal_xattrs_default;
4736 	spdk_bs_sequence_t	*seq;
4737 	spdk_blob_id		id;
4738 	int rc;
4739 
4740 	assert(spdk_get_thread() == bs->md_thread);
4741 
4742 	page_idx = spdk_bit_array_find_first_clear(bs->used_md_pages, 0);
4743 	if (page_idx == UINT32_MAX) {
4744 		cb_fn(cb_arg, 0, -ENOMEM);
4745 		return;
4746 	}
4747 	spdk_bit_array_set(bs->used_blobids, page_idx);
4748 	_spdk_bs_claim_md_page(bs, page_idx);
4749 
4750 	id = _spdk_bs_page_to_blobid(page_idx);
4751 
4752 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Creating blob with id %lu at page %u\n", id, page_idx);
4753 
4754 	blob = _spdk_blob_alloc(bs, id);
4755 	if (!blob) {
4756 		cb_fn(cb_arg, 0, -ENOMEM);
4757 		return;
4758 	}
4759 
4760 	if (!opts) {
4761 		spdk_blob_opts_init(&opts_default);
4762 		opts = &opts_default;
4763 	}
4764 
4765 	blob->use_extent_table = opts->use_extent_table;
4766 
4767 	if (!internal_xattrs) {
4768 		_spdk_blob_xattrs_init(&internal_xattrs_default);
4769 		internal_xattrs = &internal_xattrs_default;
4770 	}
4771 
4772 	rc = _spdk_blob_set_xattrs(blob, &opts->xattrs, false);
4773 	if (rc < 0) {
4774 		_spdk_blob_free(blob);
4775 		cb_fn(cb_arg, 0, rc);
4776 		return;
4777 	}
4778 
4779 	rc = _spdk_blob_set_xattrs(blob, internal_xattrs, true);
4780 	if (rc < 0) {
4781 		_spdk_blob_free(blob);
4782 		cb_fn(cb_arg, 0, rc);
4783 		return;
4784 	}
4785 
4786 	if (opts->thin_provision) {
4787 		_spdk_blob_set_thin_provision(blob);
4788 	}
4789 
4790 	_spdk_blob_set_clear_method(blob, opts->clear_method);
4791 
4792 	rc = _spdk_blob_resize(blob, opts->num_clusters);
4793 	if (rc < 0) {
4794 		_spdk_blob_free(blob);
4795 		cb_fn(cb_arg, 0, rc);
4796 		return;
4797 	}
4798 	cpl.type = SPDK_BS_CPL_TYPE_BLOBID;
4799 	cpl.u.blobid.cb_fn = cb_fn;
4800 	cpl.u.blobid.cb_arg = cb_arg;
4801 	cpl.u.blobid.blobid = blob->id;
4802 
4803 	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
4804 	if (!seq) {
4805 		_spdk_blob_free(blob);
4806 		cb_fn(cb_arg, 0, -ENOMEM);
4807 		return;
4808 	}
4809 
4810 	_spdk_blob_persist(seq, blob, _spdk_bs_create_blob_cpl, blob);
4811 }
4812 
4813 void spdk_bs_create_blob(struct spdk_blob_store *bs,
4814 			 spdk_blob_op_with_id_complete cb_fn, void *cb_arg)
4815 {
4816 	_spdk_bs_create_blob(bs, NULL, NULL, cb_fn, cb_arg);
4817 }
4818 
4819 void spdk_bs_create_blob_ext(struct spdk_blob_store *bs, const struct spdk_blob_opts *opts,
4820 			     spdk_blob_op_with_id_complete cb_fn, void *cb_arg)
4821 {
4822 	_spdk_bs_create_blob(bs, opts, NULL, cb_fn, cb_arg);
4823 }
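
/*
 * Usage sketch (hypothetical caller): creating a thin-provisioned blob of
 * ten clusters through the public API above. Clusters are allocated lazily
 * on first write because of thin_provision.
 */
static inline void
_example_create_thin_blob(struct spdk_blob_store *bs,
			  spdk_blob_op_with_id_complete cb_fn, void *cb_arg)
{
	struct spdk_blob_opts opts;

	spdk_blob_opts_init(&opts);
	opts.thin_provision = true;
	opts.num_clusters = 10;
	spdk_bs_create_blob_ext(bs, &opts, cb_fn, cb_arg);
}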
4824 
4825 /* END spdk_bs_create_blob */
4826 
4827 /* START blob_cleanup */
4828 
4829 struct spdk_clone_snapshot_ctx {
4830 	struct spdk_bs_cpl      cpl;
4831 	int bserrno;
4832 	bool frozen;
4833 
4834 	struct spdk_io_channel *channel;
4835 
4836 	/* Current cluster for inflate operation */
4837 	uint64_t cluster;
4838 
4839 	/* For inflation, force allocation of all unallocated clusters and remove
4840 	 * thin-provisioning. Otherwise, only decouple the parent and keep the clone thin. */
4841 	bool allocate_all;
4842 
4843 	struct {
4844 		spdk_blob_id id;
4845 		struct spdk_blob *blob;
4846 	} original;
4847 	struct {
4848 		spdk_blob_id id;
4849 		struct spdk_blob *blob;
4850 	} new;
4851 
4852 	/* xattrs specified for snapshot/clones only. They have no impact on
4853 	 * the original blob's xattrs. */
4854 	const struct spdk_blob_xattr_opts *xattrs;
4855 };
4856 
4857 static void
4858 _spdk_bs_clone_snapshot_cleanup_finish(void *cb_arg, int bserrno)
4859 {
4860 	struct spdk_clone_snapshot_ctx *ctx = cb_arg;
4861 	struct spdk_bs_cpl *cpl = &ctx->cpl;
4862 
4863 	if (bserrno != 0) {
4864 		if (ctx->bserrno != 0) {
4865 			SPDK_ERRLOG("Cleanup error %d\n", bserrno);
4866 		} else {
4867 			ctx->bserrno = bserrno;
4868 		}
4869 	}
4870 
4871 	switch (cpl->type) {
4872 	case SPDK_BS_CPL_TYPE_BLOBID:
4873 		cpl->u.blobid.cb_fn(cpl->u.blobid.cb_arg, cpl->u.blobid.blobid, ctx->bserrno);
4874 		break;
4875 	case SPDK_BS_CPL_TYPE_BLOB_BASIC:
4876 		cpl->u.blob_basic.cb_fn(cpl->u.blob_basic.cb_arg, ctx->bserrno);
4877 		break;
4878 	default:
4879 		SPDK_UNREACHABLE();
4880 		break;
4881 	}
4882 
4883 	free(ctx);
4884 }
4885 
4886 static void
4887 _spdk_bs_snapshot_unfreeze_cpl(void *cb_arg, int bserrno)
4888 {
4889 	struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg;
4890 	struct spdk_blob *origblob = ctx->original.blob;
4891 
4892 	if (bserrno != 0) {
4893 		if (ctx->bserrno != 0) {
4894 			SPDK_ERRLOG("Unfreeze error %d\n", bserrno);
4895 		} else {
4896 			ctx->bserrno = bserrno;
4897 		}
4898 	}
4899 
4900 	ctx->original.id = origblob->id;
4901 	origblob->locked_operation_in_progress = false;
4902 
4903 	spdk_blob_close(origblob, _spdk_bs_clone_snapshot_cleanup_finish, ctx);
4904 }
4905 
4906 static void
4907 _spdk_bs_clone_snapshot_origblob_cleanup(void *cb_arg, int bserrno)
4908 {
4909 	struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg;
4910 	struct spdk_blob *origblob = ctx->original.blob;
4911 
4912 	if (bserrno != 0) {
4913 		if (ctx->bserrno != 0) {
4914 			SPDK_ERRLOG("Cleanup error %d\n", bserrno);
4915 		} else {
4916 			ctx->bserrno = bserrno;
4917 		}
4918 	}
4919 
4920 	if (ctx->frozen) {
4921 		/* Unfreeze any outstanding I/O */
4922 		_spdk_blob_unfreeze_io(origblob, _spdk_bs_snapshot_unfreeze_cpl, ctx);
4923 	} else {
4924 		_spdk_bs_snapshot_unfreeze_cpl(ctx, 0);
4925 	}
4927 }
4928 
4929 static void
4930 _spdk_bs_clone_snapshot_newblob_cleanup(void *cb_arg, int bserrno)
4931 {
4932 	struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg;
4933 	struct spdk_blob *newblob = ctx->new.blob;
4934 
4935 	if (bserrno != 0) {
4936 		if (ctx->bserrno != 0) {
4937 			SPDK_ERRLOG("Cleanup error %d\n", bserrno);
4938 		} else {
4939 			ctx->bserrno = bserrno;
4940 		}
4941 	}
4942 
4943 	ctx->new.id = newblob->id;
4944 	spdk_blob_close(newblob, _spdk_bs_clone_snapshot_origblob_cleanup, ctx);
4945 }
4946 
4947 /* END blob_cleanup */
4948 
4949 /* START spdk_bs_create_snapshot */
4950 
4951 static void
4952 _spdk_bs_snapshot_swap_cluster_maps(struct spdk_blob *blob1, struct spdk_blob *blob2)
4953 {
4954 	uint64_t *cluster_temp;
4955 	uint32_t *extent_page_temp;
4956 
4957 	cluster_temp = blob1->active.clusters;
4958 	blob1->active.clusters = blob2->active.clusters;
4959 	blob2->active.clusters = cluster_temp;
4960 
4961 	extent_page_temp = blob1->active.extent_pages;
4962 	blob1->active.extent_pages = blob2->active.extent_pages;
4963 	blob2->active.extent_pages = extent_page_temp;
4964 }
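
/*
 * Note on the swap above: during snapshot creation the new blob takes over
 * the original blob's cluster map, and the original ends up with an empty,
 * thin-provisioned map backed by the snapshot. On any later failure the
 * swap is simply repeated to put the maps back.
 */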
4965 
4966 static void
4967 _spdk_bs_snapshot_origblob_sync_cpl(void *cb_arg, int bserrno)
4968 {
4969 	struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg;
4970 	struct spdk_blob *origblob = ctx->original.blob;
4971 	struct spdk_blob *newblob = ctx->new.blob;
4972 
4973 	if (bserrno != 0) {
4974 		_spdk_bs_snapshot_swap_cluster_maps(newblob, origblob);
4975 		_spdk_bs_clone_snapshot_origblob_cleanup(ctx, bserrno);
4976 		return;
4977 	}
4978 
4979 	/* Remove metadata descriptor SNAPSHOT_IN_PROGRESS */
4980 	bserrno = _spdk_blob_remove_xattr(newblob, SNAPSHOT_IN_PROGRESS, true);
4981 	if (bserrno != 0) {
4982 		_spdk_bs_clone_snapshot_origblob_cleanup(ctx, bserrno);
4983 		return;
4984 	}
4985 
4986 	_spdk_bs_blob_list_add(ctx->original.blob);
4987 
4988 	spdk_blob_set_read_only(newblob);
4989 
4990 	/* sync snapshot metadata */
4991 	spdk_blob_sync_md(newblob, _spdk_bs_clone_snapshot_origblob_cleanup, ctx);
4992 }
4993 
4994 static void
4995 _spdk_bs_snapshot_newblob_sync_cpl(void *cb_arg, int bserrno)
4996 {
4997 	struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg;
4998 	struct spdk_blob *origblob = ctx->original.blob;
4999 	struct spdk_blob *newblob = ctx->new.blob;
5000 
5001 	if (bserrno != 0) {
5002 		/* return cluster map back to original */
5003 		_spdk_bs_snapshot_swap_cluster_maps(newblob, origblob);
5004 		_spdk_bs_clone_snapshot_newblob_cleanup(ctx, bserrno);
5005 		return;
5006 	}
5007 
5008 	/* Set internal xattr for snapshot id */
5009 	bserrno = _spdk_blob_set_xattr(origblob, BLOB_SNAPSHOT, &newblob->id, sizeof(spdk_blob_id), true);
5010 	if (bserrno != 0) {
5011 		/* return cluster map back to original */
5012 		_spdk_bs_snapshot_swap_cluster_maps(newblob, origblob);
5013 		_spdk_bs_clone_snapshot_newblob_cleanup(ctx, bserrno);
5014 		return;
5015 	}
5016 
5017 	_spdk_bs_blob_list_remove(origblob);
5018 	origblob->parent_id = newblob->id;
5019 
5020 	/* Create new back_bs_dev for snapshot */
5021 	origblob->back_bs_dev = spdk_bs_create_blob_bs_dev(newblob);
5022 	if (origblob->back_bs_dev == NULL) {
5023 		/* return cluster map back to original */
5024 		_spdk_bs_snapshot_swap_cluster_maps(newblob, origblob);
5025 		_spdk_bs_clone_snapshot_newblob_cleanup(ctx, -EINVAL);
5026 		return;
5027 	}
5028 
5029 	/* set clone blob as thin provisioned */
5030 	_spdk_blob_set_thin_provision(origblob);
5031 
5032 	_spdk_bs_blob_list_add(newblob);
5033 
5034 	/* sync clone metadata */
5035 	spdk_blob_sync_md(origblob, _spdk_bs_snapshot_origblob_sync_cpl, ctx);
5036 }
5037 
5038 static void
5039 _spdk_bs_snapshot_freeze_cpl(void *cb_arg, int rc)
5040 {
5041 	struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg;
5042 	struct spdk_blob *origblob = ctx->original.blob;
5043 	struct spdk_blob *newblob = ctx->new.blob;
5044 	int bserrno;
5045 
5046 	if (rc != 0) {
5047 		_spdk_bs_clone_snapshot_newblob_cleanup(ctx, rc);
5048 		return;
5049 	}
5050 
5051 	ctx->frozen = true;
5052 
5053 	/* set new back_bs_dev for snapshot */
5054 	newblob->back_bs_dev = origblob->back_bs_dev;
5055 	/* Set invalid flags from origblob */
5056 	newblob->invalid_flags = origblob->invalid_flags;
5057 
5058 	/* inherit parent from original blob if set */
5059 	newblob->parent_id = origblob->parent_id;
5060 	if (origblob->parent_id != SPDK_BLOBID_INVALID) {
5061 		/* Set internal xattr for snapshot id */
5062 		bserrno = _spdk_blob_set_xattr(newblob, BLOB_SNAPSHOT,
5063 					       &origblob->parent_id, sizeof(spdk_blob_id), true);
5064 		if (bserrno != 0) {
5065 			_spdk_bs_clone_snapshot_newblob_cleanup(ctx, bserrno);
5066 			return;
5067 		}
5068 	}
5069 
5070 	/* swap cluster maps */
5071 	_spdk_bs_snapshot_swap_cluster_maps(newblob, origblob);
5072 
5073 	/* Set the clear method on the new blob to match the original. */
5074 	_spdk_blob_set_clear_method(newblob, origblob->clear_method);
5075 
5076 	/* sync snapshot metadata */
5077 	spdk_blob_sync_md(newblob, _spdk_bs_snapshot_newblob_sync_cpl, ctx);
5078 }
5079 
5080 static void
5081 _spdk_bs_snapshot_newblob_open_cpl(void *cb_arg, struct spdk_blob *_blob, int bserrno)
5082 {
5083 	struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg;
5084 	struct spdk_blob *origblob = ctx->original.blob;
5085 	struct spdk_blob *newblob = _blob;
5086 
5087 	if (bserrno != 0) {
5088 		_spdk_bs_clone_snapshot_origblob_cleanup(ctx, bserrno);
5089 		return;
5090 	}
5091 
5092 	ctx->new.blob = newblob;
5093 	assert(spdk_blob_is_thin_provisioned(newblob));
5094 	assert(spdk_mem_all_zero(newblob->active.clusters,
5095 				 newblob->active.num_clusters * sizeof(*newblob->active.clusters)));
5096 	assert(spdk_mem_all_zero(newblob->active.extent_pages,
5097 				 newblob->active.num_extent_pages * sizeof(*newblob->active.extent_pages)));
5098 
5099 	_spdk_blob_freeze_io(origblob, _spdk_bs_snapshot_freeze_cpl, ctx);
5100 }
5101 
5102 static void
5103 _spdk_bs_snapshot_newblob_create_cpl(void *cb_arg, spdk_blob_id blobid, int bserrno)
5104 {
5105 	struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg;
5106 	struct spdk_blob *origblob = ctx->original.blob;
5107 
5108 	if (bserrno != 0) {
5109 		_spdk_bs_clone_snapshot_origblob_cleanup(ctx, bserrno);
5110 		return;
5111 	}
5112 
5113 	ctx->new.id = blobid;
5114 	ctx->cpl.u.blobid.blobid = blobid;
5115 
5116 	spdk_bs_open_blob(origblob->bs, ctx->new.id, _spdk_bs_snapshot_newblob_open_cpl, ctx);
5117 }
5118 
5120 static void
5121 _spdk_bs_xattr_snapshot(void *arg, const char *name,
5122 			const void **value, size_t *value_len)
5123 {
5124 	assert(strncmp(name, SNAPSHOT_IN_PROGRESS, sizeof(SNAPSHOT_IN_PROGRESS)) == 0);
5125 
5126 	struct spdk_blob *blob = (struct spdk_blob *)arg;
5127 	*value = &blob->id;
5128 	*value_len = sizeof(blob->id);
5129 }
5130 
5131 static void
5132 _spdk_bs_snapshot_origblob_open_cpl(void *cb_arg, struct spdk_blob *_blob, int bserrno)
5133 {
5134 	struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg;
5135 	struct spdk_blob_opts opts;
5136 	struct spdk_blob_xattr_opts internal_xattrs;
5137 	char *xattrs_names[] = { SNAPSHOT_IN_PROGRESS };
5138 
5139 	if (bserrno != 0) {
5140 		_spdk_bs_clone_snapshot_cleanup_finish(ctx, bserrno);
5141 		return;
5142 	}
5143 
5144 	ctx->original.blob = _blob;
5145 
5146 	if (_blob->data_ro || _blob->md_ro) {
5147 		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Cannot create snapshot from read only blob with id %lu\n",
5148 			      _blob->id);
5149 		ctx->bserrno = -EINVAL;
5150 		spdk_blob_close(_blob, _spdk_bs_clone_snapshot_cleanup_finish, ctx);
5151 		return;
5152 	}
5153 
5154 	if (_blob->locked_operation_in_progress) {
5155 		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Cannot create snapshot - another operation in progress\n");
5156 		ctx->bserrno = -EBUSY;
5157 		spdk_blob_close(_blob, _spdk_bs_clone_snapshot_cleanup_finish, ctx);
5158 		return;
5159 	}
5160 
5161 	_blob->locked_operation_in_progress = true;
5162 
5163 	spdk_blob_opts_init(&opts);
5164 	_spdk_blob_xattrs_init(&internal_xattrs);
5165 
5166 	/* Make the new blob the same size as the original blob,
5167 	 * but do not allocate any clusters */
5168 	opts.thin_provision = true;
5169 	opts.num_clusters = spdk_blob_get_num_clusters(_blob);
5170 	opts.use_extent_table = _blob->use_extent_table;
5171 
5172 	/* If there are any xattrs specified for snapshot, set them now */
5173 	if (ctx->xattrs) {
5174 		memcpy(&opts.xattrs, ctx->xattrs, sizeof(*ctx->xattrs));
5175 	}
5176 	/* Set internal xattr SNAPSHOT_IN_PROGRESS */
5177 	internal_xattrs.count = 1;
5178 	internal_xattrs.ctx = _blob;
5179 	internal_xattrs.names = xattrs_names;
5180 	internal_xattrs.get_value = _spdk_bs_xattr_snapshot;
5181 
5182 	_spdk_bs_create_blob(_blob->bs, &opts, &internal_xattrs,
5183 			     _spdk_bs_snapshot_newblob_create_cpl, ctx);
5184 }
5185 
5186 void spdk_bs_create_snapshot(struct spdk_blob_store *bs, spdk_blob_id blobid,
5187 			     const struct spdk_blob_xattr_opts *snapshot_xattrs,
5188 			     spdk_blob_op_with_id_complete cb_fn, void *cb_arg)
5189 {
5190 	struct spdk_clone_snapshot_ctx *ctx = calloc(1, sizeof(*ctx));
5191 
5192 	if (!ctx) {
5193 		cb_fn(cb_arg, SPDK_BLOBID_INVALID, -ENOMEM);
5194 		return;
5195 	}
5196 	ctx->cpl.type = SPDK_BS_CPL_TYPE_BLOBID;
5197 	ctx->cpl.u.blobid.cb_fn = cb_fn;
5198 	ctx->cpl.u.blobid.cb_arg = cb_arg;
5199 	ctx->cpl.u.blobid.blobid = SPDK_BLOBID_INVALID;
5200 	ctx->bserrno = 0;
5201 	ctx->frozen = false;
5202 	ctx->original.id = blobid;
5203 	ctx->xattrs = snapshot_xattrs;
5204 
5205 	spdk_bs_open_blob(bs, ctx->original.id, _spdk_bs_snapshot_origblob_open_cpl, ctx);
5206 }
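
/*
 * Summary of the snapshot flow implemented above: open the original blob,
 * create the new blob thin-provisioned with SNAPSHOT_IN_PROGRESS set,
 * freeze I/O on the original, swap the cluster maps and sync the snapshot
 * md, then repoint the original at the snapshot (parent_id, BLOB_SNAPSHOT
 * xattr, back_bs_dev) and sync it, and finally mark the snapshot read-only
 * and unfreeze I/O.
 */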
5207 /* END spdk_bs_create_snapshot */
5208 
5209 /* START spdk_bs_create_clone */
5210 
5211 static void
5212 _spdk_bs_xattr_clone(void *arg, const char *name,
5213 		     const void **value, size_t *value_len)
5214 {
5215 	assert(strncmp(name, BLOB_SNAPSHOT, sizeof(BLOB_SNAPSHOT)) == 0);
5216 
5217 	struct spdk_blob *blob = (struct spdk_blob *)arg;
5218 	*value = &blob->id;
5219 	*value_len = sizeof(blob->id);
5220 }
5221 
5222 static void
5223 _spdk_bs_clone_newblob_open_cpl(void *cb_arg, struct spdk_blob *_blob, int bserrno)
5224 {
5225 	struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg;
5226 	struct spdk_blob *clone = _blob;
5227 
5228 	ctx->new.blob = clone;
5229 	_spdk_bs_blob_list_add(clone);
5230 
5231 	spdk_blob_close(clone, _spdk_bs_clone_snapshot_origblob_cleanup, ctx);
5232 }
5233 
5234 static void
5235 _spdk_bs_clone_newblob_create_cpl(void *cb_arg, spdk_blob_id blobid, int bserrno)
5236 {
5237 	struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg;
5238 
5239 	ctx->cpl.u.blobid.blobid = blobid;
5240 	spdk_bs_open_blob(ctx->original.blob->bs, blobid, _spdk_bs_clone_newblob_open_cpl, ctx);
5241 }
5242 
5243 static void
5244 _spdk_bs_clone_origblob_open_cpl(void *cb_arg, struct spdk_blob *_blob, int bserrno)
5245 {
5246 	struct spdk_clone_snapshot_ctx	*ctx = (struct spdk_clone_snapshot_ctx *)cb_arg;
5247 	struct spdk_blob_opts		opts;
5248 	struct spdk_blob_xattr_opts internal_xattrs;
5249 	char *xattr_names[] = { BLOB_SNAPSHOT };
5250 
5251 	if (bserrno != 0) {
5252 		_spdk_bs_clone_snapshot_cleanup_finish(ctx, bserrno);
5253 		return;
5254 	}
5255 
5256 	ctx->original.blob = _blob;
5257 
5258 	if (!_blob->data_ro || !_blob->md_ro) {
5259 		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Cannot create clone from a blob that is not read-only\n");
5260 		ctx->bserrno = -EINVAL;
5261 		spdk_blob_close(_blob, _spdk_bs_clone_snapshot_cleanup_finish, ctx);
5262 		return;
5263 	}
5264 
5265 	if (_blob->locked_operation_in_progress) {
5266 		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Cannot create clone - another operation in progress\n");
5267 		ctx->bserrno = -EBUSY;
5268 		spdk_blob_close(_blob, _spdk_bs_clone_snapshot_cleanup_finish, ctx);
5269 		return;
5270 	}
5271 
5272 	_blob->locked_operation_in_progress = true;
5273 
5274 	spdk_blob_opts_init(&opts);
5275 	_spdk_blob_xattrs_init(&internal_xattrs);
5276 
5277 	opts.thin_provision = true;
5278 	opts.num_clusters = spdk_blob_get_num_clusters(_blob);
5279 	opts.use_extent_table = _blob->use_extent_table;
5280 	if (ctx->xattrs) {
5281 		memcpy(&opts.xattrs, ctx->xattrs, sizeof(*ctx->xattrs));
5282 	}
5283 
5284 	/* Set internal xattr BLOB_SNAPSHOT */
5285 	internal_xattrs.count = 1;
5286 	internal_xattrs.ctx = _blob;
5287 	internal_xattrs.names = xattr_names;
5288 	internal_xattrs.get_value = _spdk_bs_xattr_clone;
5289 
5290 	_spdk_bs_create_blob(_blob->bs, &opts, &internal_xattrs,
5291 			     _spdk_bs_clone_newblob_create_cpl, ctx);
5292 }
5293 
5294 void spdk_bs_create_clone(struct spdk_blob_store *bs, spdk_blob_id blobid,
5295 			  const struct spdk_blob_xattr_opts *clone_xattrs,
5296 			  spdk_blob_op_with_id_complete cb_fn, void *cb_arg)
5297 {
5298 	struct spdk_clone_snapshot_ctx	*ctx = calloc(1, sizeof(*ctx));
5299 
5300 	if (!ctx) {
5301 		cb_fn(cb_arg, SPDK_BLOBID_INVALID, -ENOMEM);
5302 		return;
5303 	}
5304 
5305 	ctx->cpl.type = SPDK_BS_CPL_TYPE_BLOBID;
5306 	ctx->cpl.u.blobid.cb_fn = cb_fn;
5307 	ctx->cpl.u.blobid.cb_arg = cb_arg;
5308 	ctx->cpl.u.blobid.blobid = SPDK_BLOBID_INVALID;
5309 	ctx->bserrno = 0;
5310 	ctx->xattrs = clone_xattrs;
5311 	ctx->original.id = blobid;
5312 
5313 	spdk_bs_open_blob(bs, ctx->original.id, _spdk_bs_clone_origblob_open_cpl, ctx);
5314 }
5315 
5316 /* END spdk_bs_create_clone */
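
/*
 * Usage sketch (illustrative only): create a writable, thin-provisioned
 * clone of a read-only snapshot. "app_clone_done" is a hypothetical
 * application callback.
 *
 *	static void
 *	app_clone_done(void *cb_arg, spdk_blob_id clone_id, int bserrno)
 *	{
 *		if (bserrno != 0) {
 *			SPDK_ERRLOG("Clone creation failed: %d\n", bserrno);
 *			return;
 *		}
 *		// clone_id can now be opened and written; unwritten clusters
 *		// are read through from the snapshot.
 *	}
 *
 *	// snapshot_id must refer to a blob that is both data_ro and md_ro
 *	spdk_bs_create_clone(bs, snapshot_id, NULL, app_clone_done, NULL);
 */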

/* START spdk_bs_inflate_blob */

static void
_spdk_bs_inflate_blob_set_parent_cpl(void *cb_arg, struct spdk_blob *_parent, int bserrno)
{
	struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg;
	struct spdk_blob *_blob = ctx->original.blob;

	if (bserrno != 0) {
		_spdk_bs_clone_snapshot_origblob_cleanup(ctx, bserrno);
		return;
	}

	assert(_parent != NULL);

	_spdk_bs_blob_list_remove(_blob);
	_blob->parent_id = _parent->id;
	_spdk_blob_set_xattr(_blob, BLOB_SNAPSHOT, &_blob->parent_id,
			     sizeof(spdk_blob_id), true);

	_blob->back_bs_dev->destroy(_blob->back_bs_dev);
	_blob->back_bs_dev = spdk_bs_create_blob_bs_dev(_parent);
	_spdk_bs_blob_list_add(_blob);

	spdk_blob_sync_md(_blob, _spdk_bs_clone_snapshot_origblob_cleanup, ctx);
}

static void
_spdk_bs_inflate_blob_done(void *cb_arg, int bserrno)
{
	struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg;
	struct spdk_blob *_blob = ctx->original.blob;
	struct spdk_blob *_parent;

	if (bserrno != 0) {
		_spdk_bs_clone_snapshot_origblob_cleanup(ctx, bserrno);
		return;
	}

	if (ctx->allocate_all) {
		/* Remove thin provisioning */
		_spdk_bs_blob_list_remove(_blob);
		_spdk_blob_remove_xattr(_blob, BLOB_SNAPSHOT, true);
		_blob->invalid_flags = _blob->invalid_flags & ~SPDK_BLOB_THIN_PROV;
		_blob->back_bs_dev->destroy(_blob->back_bs_dev);
		_blob->back_bs_dev = NULL;
		_blob->parent_id = SPDK_BLOBID_INVALID;
	} else {
		_parent = ((struct spdk_blob_bs_dev *)(_blob->back_bs_dev))->blob;
		if (_parent->parent_id != SPDK_BLOBID_INVALID) {
			/* We must change the parent of the inflated blob */
			spdk_bs_open_blob(_blob->bs, _parent->parent_id,
					  _spdk_bs_inflate_blob_set_parent_cpl, ctx);
			return;
		}

		_spdk_bs_blob_list_remove(_blob);
		_spdk_blob_remove_xattr(_blob, BLOB_SNAPSHOT, true);
		_blob->parent_id = SPDK_BLOBID_INVALID;
		_blob->back_bs_dev->destroy(_blob->back_bs_dev);
		_blob->back_bs_dev = spdk_bs_create_zeroes_dev();
	}

	_blob->state = SPDK_BLOB_STATE_DIRTY;
	spdk_blob_sync_md(_blob, _spdk_bs_clone_snapshot_origblob_cleanup, ctx);
}

/* Check if a cluster needs allocation */
static inline bool
_spdk_bs_cluster_needs_allocation(struct spdk_blob *blob, uint64_t cluster, bool allocate_all)
{
	struct spdk_blob_bs_dev *b;

	assert(blob != NULL);

	if (blob->active.clusters[cluster] != 0) {
		/* Cluster is already allocated */
		return false;
	}

	if (blob->parent_id == SPDK_BLOBID_INVALID) {
		/* Blob has no parent blob */
		return allocate_all;
	}

	b = (struct spdk_blob_bs_dev *)blob->back_bs_dev;
	return (allocate_all || b->blob->active.clusters[cluster] != 0);
}

static void
_spdk_bs_inflate_blob_touch_next(void *cb_arg, int bserrno)
{
	struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg;
	struct spdk_blob *_blob = ctx->original.blob;
	uint64_t offset;

	if (bserrno != 0) {
		_spdk_bs_clone_snapshot_origblob_cleanup(ctx, bserrno);
		return;
	}

	for (; ctx->cluster < _blob->active.num_clusters; ctx->cluster++) {
		if (_spdk_bs_cluster_needs_allocation(_blob, ctx->cluster, ctx->allocate_all)) {
			break;
		}
	}

	if (ctx->cluster < _blob->active.num_clusters) {
		offset = _spdk_bs_cluster_to_lba(_blob->bs, ctx->cluster);

		/* We may safely increment the cluster counter before the write completes */
		ctx->cluster++;

		/* Use a zero-length write to touch (and thereby allocate) the cluster */
		spdk_blob_io_write(_blob, ctx->channel, NULL, offset, 0,
				   _spdk_bs_inflate_blob_touch_next, ctx);
	} else {
		_spdk_bs_inflate_blob_done(cb_arg, bserrno);
	}
}

static void
_spdk_bs_inflate_blob_open_cpl(void *cb_arg, struct spdk_blob *_blob, int bserrno)
{
	struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg;
	uint64_t lfc; /* lowest free cluster */
	uint64_t i;

	if (bserrno != 0) {
		_spdk_bs_clone_snapshot_cleanup_finish(ctx, bserrno);
		return;
	}

	ctx->original.blob = _blob;

	if (_blob->locked_operation_in_progress) {
		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Cannot inflate blob - another operation in progress\n");
		ctx->bserrno = -EBUSY;
		spdk_blob_close(_blob, _spdk_bs_clone_snapshot_cleanup_finish, ctx);
		return;
	}

	_blob->locked_operation_in_progress = true;

	if (!ctx->allocate_all && _blob->parent_id == SPDK_BLOBID_INVALID) {
		/* This blob has no parent, so we cannot decouple it. */
		SPDK_ERRLOG("Cannot decouple parent of blob with no parent.\n");
		_spdk_bs_clone_snapshot_origblob_cleanup(ctx, -EINVAL);
		return;
	}

	if (spdk_blob_is_thin_provisioned(_blob) == false) {
		/* This is not a thin-provisioned blob. No need to inflate. */
		_spdk_bs_clone_snapshot_origblob_cleanup(ctx, 0);
		return;
	}

	/* Do two passes - one to verify that we can obtain enough clusters
	 * and another to actually claim them.
	 */
	lfc = 0;
	for (i = 0; i < _blob->active.num_clusters; i++) {
		if (_spdk_bs_cluster_needs_allocation(_blob, i, ctx->allocate_all)) {
			lfc = spdk_bit_array_find_first_clear(_blob->bs->used_clusters, lfc);
			if (lfc == UINT32_MAX) {
				/* No more free clusters. Cannot satisfy the request. */
				_spdk_bs_clone_snapshot_origblob_cleanup(ctx, -ENOSPC);
				return;
			}
			lfc++;
		}
	}

	ctx->cluster = 0;
	_spdk_bs_inflate_blob_touch_next(ctx, 0);
}

static void
_spdk_bs_inflate_blob(struct spdk_blob_store *bs, struct spdk_io_channel *channel,
		      spdk_blob_id blobid, bool allocate_all, spdk_blob_op_complete cb_fn, void *cb_arg)
{
	struct spdk_clone_snapshot_ctx *ctx = calloc(1, sizeof(*ctx));

	if (!ctx) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}
	ctx->cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
	ctx->cpl.u.bs_basic.cb_fn = cb_fn;
	ctx->cpl.u.bs_basic.cb_arg = cb_arg;
	ctx->bserrno = 0;
	ctx->original.id = blobid;
	ctx->channel = channel;
	ctx->allocate_all = allocate_all;

	spdk_bs_open_blob(bs, ctx->original.id, _spdk_bs_inflate_blob_open_cpl, ctx);
}

void
spdk_bs_inflate_blob(struct spdk_blob_store *bs, struct spdk_io_channel *channel,
		     spdk_blob_id blobid, spdk_blob_op_complete cb_fn, void *cb_arg)
{
	_spdk_bs_inflate_blob(bs, channel, blobid, true, cb_fn, cb_arg);
}

void
spdk_bs_blob_decouple_parent(struct spdk_blob_store *bs, struct spdk_io_channel *channel,
			     spdk_blob_id blobid, spdk_blob_op_complete cb_fn, void *cb_arg)
{
	_spdk_bs_inflate_blob(bs, channel, blobid, false, cb_fn, cb_arg);
}
/* END spdk_bs_inflate_blob */
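
/*
 * Usage sketch (illustrative only): the difference between the two public
 * entry points above. Both walk the blob and "touch" clusters with
 * zero-length writes; they differ in which clusters they allocate.
 * "app_done" and "io_ch" are hypothetical application names.
 *
 *	// Allocate every cluster and drop thin provisioning entirely:
 *	spdk_bs_inflate_blob(bs, io_ch, blobid, app_done, NULL);
 *
 *	// Allocate only clusters backed by the immediate parent, then
 *	// re-parent the blob to its grandparent (or to a zeroes device):
 *	spdk_bs_blob_decouple_parent(bs, io_ch, blobid, app_done, NULL);
 */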

/* START spdk_blob_resize */
struct spdk_bs_resize_ctx {
	spdk_blob_op_complete cb_fn;
	void *cb_arg;
	struct spdk_blob *blob;
	uint64_t sz;
	int rc;
};

static void
_spdk_bs_resize_unfreeze_cpl(void *cb_arg, int rc)
{
	struct spdk_bs_resize_ctx *ctx = (struct spdk_bs_resize_ctx *)cb_arg;

	if (rc != 0) {
		SPDK_ERRLOG("Unfreeze failed, rc=%d\n", rc);
	}

	if (ctx->rc != 0) {
		SPDK_ERRLOG("Resize failed, ctx->rc=%d\n", ctx->rc);
		rc = ctx->rc;
	}

	ctx->blob->locked_operation_in_progress = false;

	ctx->cb_fn(ctx->cb_arg, rc);
	free(ctx);
}

static void
_spdk_bs_resize_freeze_cpl(void *cb_arg, int rc)
{
	struct spdk_bs_resize_ctx *ctx = (struct spdk_bs_resize_ctx *)cb_arg;

	if (rc != 0) {
		ctx->blob->locked_operation_in_progress = false;
		ctx->cb_fn(ctx->cb_arg, rc);
		free(ctx);
		return;
	}

	ctx->rc = _spdk_blob_resize(ctx->blob, ctx->sz);

	_spdk_blob_unfreeze_io(ctx->blob, _spdk_bs_resize_unfreeze_cpl, ctx);
}

void
spdk_blob_resize(struct spdk_blob *blob, uint64_t sz, spdk_blob_op_complete cb_fn, void *cb_arg)
{
	struct spdk_bs_resize_ctx *ctx;

	_spdk_blob_verify_md_op(blob);

	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Resizing blob %lu to %lu clusters\n", blob->id, sz);

	if (blob->md_ro) {
		cb_fn(cb_arg, -EPERM);
		return;
	}

	if (sz == blob->active.num_clusters) {
		cb_fn(cb_arg, 0);
		return;
	}

	if (blob->locked_operation_in_progress) {
		cb_fn(cb_arg, -EBUSY);
		return;
	}

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	blob->locked_operation_in_progress = true;
	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;
	ctx->blob = blob;
	ctx->sz = sz;
	_spdk_blob_freeze_io(blob, _spdk_bs_resize_freeze_cpl, ctx);
}

/* END spdk_blob_resize */
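
/*
 * Usage sketch (illustrative only): grow a blob to 64 clusters and persist
 * the new size. spdk_blob_resize() only updates in-memory state, so a
 * spdk_blob_sync_md() is still required afterwards. "app_resize_done" and
 * "app_sync_done" are hypothetical callbacks.
 *
 *	static void
 *	app_resize_done(void *cb_arg, int bserrno)
 *	{
 *		struct spdk_blob *blob = cb_arg;
 *
 *		if (bserrno != 0) {
 *			SPDK_ERRLOG("Resize failed: %d\n", bserrno);
 *			return;
 *		}
 *		spdk_blob_sync_md(blob, app_sync_done, blob);
 *	}
 *
 *	spdk_blob_resize(blob, 64, app_resize_done, blob);
 */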


/* START spdk_bs_delete_blob */

static void
_spdk_bs_delete_close_cpl(void *cb_arg, int bserrno)
{
	spdk_bs_sequence_t *seq = cb_arg;

	spdk_bs_sequence_finish(seq, bserrno);
}

static void
_spdk_bs_delete_persist_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob *blob = cb_arg;

	if (bserrno != 0) {
		/*
		 * We already removed this blob from the blobstore tailq, so
		 *  we need to free it here since this is the last reference
		 *  to it.
		 */
		_spdk_blob_free(blob);
		_spdk_bs_delete_close_cpl(seq, bserrno);
		return;
	}

	/*
	 * This will immediately decrement the ref_count and call
	 *  the completion routine since the metadata state is clean.
	 *  By calling spdk_blob_close, we reduce the number of call
	 *  points into code that touches the blob->open_ref count
	 *  and the blobstore's blob list.
	 */
	spdk_blob_close(blob, _spdk_bs_delete_close_cpl, seq);
}

struct delete_snapshot_ctx {
	struct spdk_blob_list *parent_snapshot_entry;
	struct spdk_blob *snapshot;
	bool snapshot_md_ro;
	struct spdk_blob *clone;
	bool clone_md_ro;
	spdk_blob_op_with_handle_complete cb_fn;
	void *cb_arg;
	int bserrno;
};

static void
_spdk_delete_blob_cleanup_finish(void *cb_arg, int bserrno)
{
	struct delete_snapshot_ctx *ctx = cb_arg;

	if (bserrno != 0) {
		SPDK_ERRLOG("Snapshot cleanup error %d\n", bserrno);
	}

	assert(ctx != NULL);

	if (bserrno != 0 && ctx->bserrno == 0) {
		ctx->bserrno = bserrno;
	}

	ctx->cb_fn(ctx->cb_arg, ctx->snapshot, ctx->bserrno);
	free(ctx);
}

static void
_spdk_delete_snapshot_cleanup_snapshot(void *cb_arg, int bserrno)
{
	struct delete_snapshot_ctx *ctx = cb_arg;

	if (bserrno != 0) {
		ctx->bserrno = bserrno;
		SPDK_ERRLOG("Clone cleanup error %d\n", bserrno);
	}

	/* open_ref == 1 means that only the deletion context has opened this snapshot.
	 * open_ref == 2 means that the clone has opened this snapshot as well,
	 * so we have to add it back to the blobs list. */
	if (ctx->snapshot->open_ref == 2) {
		TAILQ_INSERT_HEAD(&ctx->snapshot->bs->blobs, ctx->snapshot, link);
	}

	ctx->snapshot->locked_operation_in_progress = false;
	ctx->snapshot->md_ro = ctx->snapshot_md_ro;

	spdk_blob_close(ctx->snapshot, _spdk_delete_blob_cleanup_finish, ctx);
}

static void
_spdk_delete_snapshot_cleanup_clone(void *cb_arg, int bserrno)
{
	struct delete_snapshot_ctx *ctx = cb_arg;

	ctx->clone->locked_operation_in_progress = false;
	ctx->clone->md_ro = ctx->clone_md_ro;

	spdk_blob_close(ctx->clone, _spdk_delete_snapshot_cleanup_snapshot, ctx);
}

static void
_spdk_delete_snapshot_unfreeze_cpl(void *cb_arg, int bserrno)
{
	struct delete_snapshot_ctx *ctx = cb_arg;

	if (bserrno) {
		ctx->bserrno = bserrno;
		_spdk_delete_snapshot_cleanup_clone(ctx, 0);
		return;
	}

	ctx->clone->locked_operation_in_progress = false;
	spdk_blob_close(ctx->clone, _spdk_delete_blob_cleanup_finish, ctx);
}

static void
_spdk_delete_snapshot_sync_snapshot_cpl(void *cb_arg, int bserrno)
{
	struct delete_snapshot_ctx *ctx = cb_arg;
	struct spdk_blob_list *parent_snapshot_entry = NULL;
	struct spdk_blob_list *snapshot_entry = NULL;
	struct spdk_blob_list *clone_entry = NULL;
	struct spdk_blob_list *snapshot_clone_entry = NULL;

	if (bserrno) {
		SPDK_ERRLOG("Failed to sync MD on blob\n");
		ctx->bserrno = bserrno;
		_spdk_delete_snapshot_cleanup_clone(ctx, 0);
		return;
	}

	/* Get snapshot entry for the snapshot we want to remove */
	snapshot_entry = _spdk_bs_get_snapshot_entry(ctx->snapshot->bs, ctx->snapshot->id);

	assert(snapshot_entry != NULL);

	/* Remove clone entry in this snapshot (at this point there can be only one clone) */
	clone_entry = TAILQ_FIRST(&snapshot_entry->clones);
	assert(clone_entry != NULL);
	TAILQ_REMOVE(&snapshot_entry->clones, clone_entry, link);
	snapshot_entry->clone_count--;
	assert(TAILQ_EMPTY(&snapshot_entry->clones));

	if (ctx->snapshot->parent_id != SPDK_BLOBID_INVALID) {
		/* This snapshot is at the same time a clone of another snapshot - we need to
		 * update the parent snapshot (remove the current clone entry and add a new one
		 * inherited from the snapshot that is being removed) */

		/* Get snapshot entry for parent snapshot and clone entry within that snapshot for
		 * the snapshot that we are removing */
		_spdk_blob_get_snapshot_and_clone_entries(ctx->snapshot, &parent_snapshot_entry,
				&snapshot_clone_entry);

		/* Switch clone entry in parent snapshot */
		TAILQ_INSERT_TAIL(&parent_snapshot_entry->clones, clone_entry, link);
		TAILQ_REMOVE(&parent_snapshot_entry->clones, snapshot_clone_entry, link);
		free(snapshot_clone_entry);
	} else {
		/* No parent snapshot - just remove clone entry */
		free(clone_entry);
	}

	/* Restore md_ro flags */
	ctx->clone->md_ro = ctx->clone_md_ro;
	ctx->snapshot->md_ro = ctx->snapshot_md_ro;

	_spdk_blob_unfreeze_io(ctx->clone, _spdk_delete_snapshot_unfreeze_cpl, ctx);
}

static void
_spdk_delete_snapshot_sync_clone_cpl(void *cb_arg, int bserrno)
{
	struct delete_snapshot_ctx *ctx = cb_arg;
	uint64_t i;

	ctx->snapshot->md_ro = false;

	if (bserrno) {
		SPDK_ERRLOG("Failed to sync MD on clone\n");
		ctx->bserrno = bserrno;

		/* Restore snapshot to previous state */
		bserrno = _spdk_blob_remove_xattr(ctx->snapshot, SNAPSHOT_PENDING_REMOVAL, true);
		if (bserrno != 0) {
			_spdk_delete_snapshot_cleanup_clone(ctx, bserrno);
			return;
		}

		spdk_blob_sync_md(ctx->snapshot, _spdk_delete_snapshot_cleanup_clone, ctx);
		return;
	}

	/* Clear cluster map entries for snapshot */
	for (i = 0; i < ctx->snapshot->active.num_clusters && i < ctx->clone->active.num_clusters; i++) {
		if (ctx->clone->active.clusters[i] == ctx->snapshot->active.clusters[i]) {
			ctx->snapshot->active.clusters[i] = 0;
		}
	}

	ctx->snapshot->state = SPDK_BLOB_STATE_DIRTY;

	if (ctx->parent_snapshot_entry != NULL) {
		ctx->snapshot->back_bs_dev = NULL;
	}

	spdk_blob_sync_md(ctx->snapshot, _spdk_delete_snapshot_sync_snapshot_cpl, ctx);
}

static void
_spdk_delete_snapshot_sync_snapshot_xattr_cpl(void *cb_arg, int bserrno)
{
	struct delete_snapshot_ctx *ctx = cb_arg;
	uint64_t i;

	/* Temporarily override md_ro flag for clone for MD modification */
	ctx->clone_md_ro = ctx->clone->md_ro;
	ctx->clone->md_ro = false;

	if (bserrno) {
		SPDK_ERRLOG("Failed to sync MD with xattr on blob\n");
		ctx->bserrno = bserrno;
		_spdk_delete_snapshot_cleanup_clone(ctx, 0);
		return;
	}

	/* Copy snapshot map to clone map (only unallocated clusters in clone) */
	for (i = 0; i < ctx->snapshot->active.num_clusters && i < ctx->clone->active.num_clusters; i++) {
		if (ctx->clone->active.clusters[i] == 0) {
			ctx->clone->active.clusters[i] = ctx->snapshot->active.clusters[i];
		}
	}

	/* Delete old backing bs_dev from clone (related to snapshot that will be removed) */
	ctx->clone->back_bs_dev->destroy(ctx->clone->back_bs_dev);

	/* Set/remove snapshot xattr and switch parent ID and backing bs_dev on clone... */
	if (ctx->parent_snapshot_entry != NULL) {
		/* ...to parent snapshot */
		ctx->clone->parent_id = ctx->parent_snapshot_entry->id;
		ctx->clone->back_bs_dev = ctx->snapshot->back_bs_dev;
		_spdk_blob_set_xattr(ctx->clone, BLOB_SNAPSHOT, &ctx->parent_snapshot_entry->id,
				     sizeof(spdk_blob_id),
				     true);
	} else {
		/* ...to blobid invalid and zeroes dev */
		ctx->clone->parent_id = SPDK_BLOBID_INVALID;
		ctx->clone->back_bs_dev = spdk_bs_create_zeroes_dev();
		_spdk_blob_remove_xattr(ctx->clone, BLOB_SNAPSHOT, true);
	}

	spdk_blob_sync_md(ctx->clone, _spdk_delete_snapshot_sync_clone_cpl, ctx);
}

static void
_spdk_delete_snapshot_freeze_io_cb(void *cb_arg, int bserrno)
{
	struct delete_snapshot_ctx *ctx = cb_arg;

	if (bserrno) {
		SPDK_ERRLOG("Failed to freeze I/O on clone\n");
		ctx->bserrno = bserrno;
		_spdk_delete_snapshot_cleanup_clone(ctx, 0);
		return;
	}

	/* Temporarily override md_ro flag for snapshot for MD modification */
	ctx->snapshot_md_ro = ctx->snapshot->md_ro;
	ctx->snapshot->md_ro = false;

	/* Mark the snapshot as pending removal for power-failure safety; store the
	 * clone id so the operation can be resumed during recovery */
	ctx->bserrno = _spdk_blob_set_xattr(ctx->snapshot, SNAPSHOT_PENDING_REMOVAL, &ctx->clone->id,
					    sizeof(spdk_blob_id), true);
	if (ctx->bserrno != 0) {
		_spdk_delete_snapshot_cleanup_clone(ctx, 0);
		return;
	}

	spdk_blob_sync_md(ctx->snapshot, _spdk_delete_snapshot_sync_snapshot_xattr_cpl, ctx);
}

static void
_spdk_delete_snapshot_open_clone_cb(void *cb_arg, struct spdk_blob *clone, int bserrno)
{
	struct delete_snapshot_ctx *ctx = cb_arg;

	if (bserrno) {
		SPDK_ERRLOG("Failed to open clone\n");
		ctx->bserrno = bserrno;
		_spdk_delete_snapshot_cleanup_snapshot(ctx, 0);
		return;
	}

	ctx->clone = clone;

	if (clone->locked_operation_in_progress) {
		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Cannot remove blob - another operation in progress on its clone\n");
		ctx->bserrno = -EBUSY;
		spdk_blob_close(ctx->clone, _spdk_delete_snapshot_cleanup_snapshot, ctx);
		return;
	}

	clone->locked_operation_in_progress = true;

	_spdk_blob_freeze_io(clone, _spdk_delete_snapshot_freeze_io_cb, ctx);
}

static void
_spdk_update_clone_on_snapshot_deletion(struct spdk_blob *snapshot, struct delete_snapshot_ctx *ctx)
{
	struct spdk_blob_list *snapshot_entry = NULL;
	struct spdk_blob_list *clone_entry = NULL;
	struct spdk_blob_list *snapshot_clone_entry = NULL;

	/* Get snapshot entry for the snapshot we want to remove */
	snapshot_entry = _spdk_bs_get_snapshot_entry(snapshot->bs, snapshot->id);

	assert(snapshot_entry != NULL);

	/* Get clone of the snapshot (at this point there can be only one clone) */
	clone_entry = TAILQ_FIRST(&snapshot_entry->clones);
	assert(snapshot_entry->clone_count == 1);
	assert(clone_entry != NULL);

	/* Get snapshot entry for parent snapshot and clone entry within that snapshot for
	 * the snapshot that we are removing */
	_spdk_blob_get_snapshot_and_clone_entries(snapshot, &ctx->parent_snapshot_entry,
			&snapshot_clone_entry);

	spdk_bs_open_blob(snapshot->bs, clone_entry->id, _spdk_delete_snapshot_open_clone_cb, ctx);
}

static void
_spdk_bs_delete_blob_finish(void *cb_arg, struct spdk_blob *blob, int bserrno)
{
	spdk_bs_sequence_t *seq = cb_arg;
	struct spdk_blob_list *snapshot_entry = NULL;
	uint32_t page_num;

	if (bserrno) {
		SPDK_ERRLOG("Failed to remove blob\n");
		spdk_bs_sequence_finish(seq, bserrno);
		return;
	}

	/* Remove snapshot from the list */
	snapshot_entry = _spdk_bs_get_snapshot_entry(blob->bs, blob->id);
	if (snapshot_entry != NULL) {
		TAILQ_REMOVE(&blob->bs->snapshots, snapshot_entry, link);
		free(snapshot_entry);
	}

	page_num = _spdk_bs_blobid_to_page(blob->id);
	spdk_bit_array_clear(blob->bs->used_blobids, page_num);
	blob->state = SPDK_BLOB_STATE_DIRTY;
	blob->active.num_pages = 0;
	_spdk_blob_resize(blob, 0);

	_spdk_blob_persist(seq, blob, _spdk_bs_delete_persist_cpl, blob);
}

static int
_spdk_bs_is_blob_deletable(struct spdk_blob *blob, bool *update_clone)
{
	struct spdk_blob_list *snapshot_entry = NULL;
	struct spdk_blob_list *clone_entry = NULL;
	struct spdk_blob *clone = NULL;
	bool has_one_clone = false;

	/* Check if this is a snapshot with clones */
	snapshot_entry = _spdk_bs_get_snapshot_entry(blob->bs, blob->id);
	if (snapshot_entry != NULL) {
		if (snapshot_entry->clone_count > 1) {
			SPDK_ERRLOG("Cannot remove snapshot with more than one clone\n");
			return -EBUSY;
		} else if (snapshot_entry->clone_count == 1) {
			has_one_clone = true;
		}
	}

	/* Check if someone has this blob open (besides this delete context):
	 * - open_ref == 1 - only this context opened blob, so it is ok to remove it
	 * - open_ref <= 2 && has_one_clone == true - clone is holding snapshot
	 *	and that is ok, because we will update it accordingly */
	if (blob->open_ref <= 2 && has_one_clone) {
		clone_entry = TAILQ_FIRST(&snapshot_entry->clones);
		assert(clone_entry != NULL);
		clone = _spdk_blob_lookup(blob->bs, clone_entry->id);

		if (blob->open_ref == 2 && clone == NULL) {
			/* Clone is closed and someone else opened this blob */
			SPDK_ERRLOG("Cannot remove snapshot because it is open\n");
			return -EBUSY;
		}

		*update_clone = true;
		return 0;
	}

	if (blob->open_ref > 1) {
		SPDK_ERRLOG("Cannot remove snapshot because it is open\n");
		return -EBUSY;
	}

	assert(has_one_clone == false);
	*update_clone = false;
	return 0;
}

static void
_spdk_bs_delete_enomem_close_cpl(void *cb_arg, int bserrno)
{
	spdk_bs_sequence_t *seq = cb_arg;

	spdk_bs_sequence_finish(seq, -ENOMEM);
}

static void
_spdk_bs_delete_open_cpl(void *cb_arg, struct spdk_blob *blob, int bserrno)
{
	spdk_bs_sequence_t *seq = cb_arg;
	struct delete_snapshot_ctx *ctx;
	bool update_clone = false;

	if (bserrno != 0) {
		spdk_bs_sequence_finish(seq, bserrno);
		return;
	}

	_spdk_blob_verify_md_op(blob);

	ctx = calloc(1, sizeof(*ctx));
	if (ctx == NULL) {
		spdk_blob_close(blob, _spdk_bs_delete_enomem_close_cpl, seq);
		return;
	}

	ctx->snapshot = blob;
	ctx->cb_fn = _spdk_bs_delete_blob_finish;
	ctx->cb_arg = seq;

	/* Check if blob can be removed and if it is a snapshot with clone on top of it */
	ctx->bserrno = _spdk_bs_is_blob_deletable(blob, &update_clone);
	if (ctx->bserrno) {
		spdk_blob_close(blob, _spdk_delete_blob_cleanup_finish, ctx);
		return;
	}

	if (blob->locked_operation_in_progress) {
		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Cannot remove blob - another operation in progress\n");
		ctx->bserrno = -EBUSY;
		spdk_blob_close(blob, _spdk_delete_blob_cleanup_finish, ctx);
		return;
	}

	blob->locked_operation_in_progress = true;

	/*
	 * Remove the blob from the blob_store list now, to ensure it does not
	 *  get returned after this point by _spdk_blob_lookup().
	 */
	TAILQ_REMOVE(&blob->bs->blobs, blob, link);

	if (update_clone) {
		/* This blob is a snapshot with active clone - update clone first */
		_spdk_update_clone_on_snapshot_deletion(blob, ctx);
	} else {
		/* This blob does not have any clones - just remove it */
		_spdk_bs_blob_list_remove(blob);
		_spdk_bs_delete_blob_finish(seq, blob, 0);
		free(ctx);
	}
}

void
spdk_bs_delete_blob(struct spdk_blob_store *bs, spdk_blob_id blobid,
		    spdk_blob_op_complete cb_fn, void *cb_arg)
{
	struct spdk_bs_cpl	cpl;
	spdk_bs_sequence_t	*seq;

	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Deleting blob %lu\n", blobid);

	assert(spdk_get_thread() == bs->md_thread);

	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
	cpl.u.blob_basic.cb_fn = cb_fn;
	cpl.u.blob_basic.cb_arg = cb_arg;

	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
	if (!seq) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	spdk_bs_open_blob(bs, blobid, _spdk_bs_delete_open_cpl, seq);
}

/* END spdk_bs_delete_blob */
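
/*
 * Usage sketch (illustrative only): delete a blob by id from the md thread.
 * The blob must not be open elsewhere (see _spdk_bs_is_blob_deletable above
 * for the snapshot/clone exceptions). "app_delete_done" is a hypothetical
 * callback.
 *
 *	static void
 *	app_delete_done(void *cb_arg, int bserrno)
 *	{
 *		if (bserrno == -EBUSY) {
 *			// blob is open, or is a snapshot with multiple clones
 *		}
 *	}
 *
 *	spdk_bs_delete_blob(bs, blobid, app_delete_done, NULL);
 */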

/* START spdk_bs_open_blob */

static void
_spdk_bs_open_blob_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob *blob = cb_arg;

	if (bserrno != 0) {
		_spdk_blob_free(blob);
		seq->cpl.u.blob_handle.blob = NULL;
		spdk_bs_sequence_finish(seq, bserrno);
		return;
	}

	blob->open_ref++;

	TAILQ_INSERT_HEAD(&blob->bs->blobs, blob, link);

	spdk_bs_sequence_finish(seq, bserrno);
}

static void _spdk_bs_open_blob(struct spdk_blob_store *bs, spdk_blob_id blobid,
			       struct spdk_blob_open_opts *opts, spdk_blob_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_blob		*blob;
	struct spdk_bs_cpl		cpl;
	struct spdk_blob_open_opts	opts_default;
	spdk_bs_sequence_t		*seq;
	uint32_t			page_num;

	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Opening blob %lu\n", blobid);
	assert(spdk_get_thread() == bs->md_thread);

	page_num = _spdk_bs_blobid_to_page(blobid);
	if (spdk_bit_array_get(bs->used_blobids, page_num) == false) {
		/* Invalid blobid */
		cb_fn(cb_arg, NULL, -ENOENT);
		return;
	}

	blob = _spdk_blob_lookup(bs, blobid);
	if (blob) {
		blob->open_ref++;
		cb_fn(cb_arg, blob, 0);
		return;
	}

	blob = _spdk_blob_alloc(bs, blobid);
	if (!blob) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	if (!opts) {
		spdk_blob_open_opts_init(&opts_default);
		opts = &opts_default;
	}

	blob->clear_method = opts->clear_method;

	cpl.type = SPDK_BS_CPL_TYPE_BLOB_HANDLE;
	cpl.u.blob_handle.cb_fn = cb_fn;
	cpl.u.blob_handle.cb_arg = cb_arg;
	cpl.u.blob_handle.blob = blob;

	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
	if (!seq) {
		_spdk_blob_free(blob);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	_spdk_blob_load(seq, blob, _spdk_bs_open_blob_cpl, blob);
}

void spdk_bs_open_blob(struct spdk_blob_store *bs, spdk_blob_id blobid,
		       spdk_blob_op_with_handle_complete cb_fn, void *cb_arg)
{
	_spdk_bs_open_blob(bs, blobid, NULL, cb_fn, cb_arg);
}

void spdk_bs_open_blob_ext(struct spdk_blob_store *bs, spdk_blob_id blobid,
			   struct spdk_blob_open_opts *opts, spdk_blob_op_with_handle_complete cb_fn, void *cb_arg)
{
	_spdk_bs_open_blob(bs, blobid, opts, cb_fn, cb_arg);
}

/* END spdk_bs_open_blob */
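
/*
 * Usage sketch (illustrative only): open a blob with non-default open
 * options, overriding the clear method stored in metadata. "app_open_done"
 * is a hypothetical callback that receives the blob handle.
 *
 *	struct spdk_blob_open_opts opts;
 *
 *	spdk_blob_open_opts_init(&opts);
 *	opts.clear_method = BLOB_CLEAR_WITH_UNMAP;
 *	spdk_bs_open_blob_ext(bs, blobid, &opts, app_open_done, NULL);
 */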

/* START spdk_blob_set_read_only */
int spdk_blob_set_read_only(struct spdk_blob *blob)
{
	_spdk_blob_verify_md_op(blob);

	blob->data_ro_flags |= SPDK_BLOB_READ_ONLY;

	blob->state = SPDK_BLOB_STATE_DIRTY;
	return 0;
}
/* END spdk_blob_set_read_only */

/* START spdk_blob_sync_md */

static void
_spdk_blob_sync_md_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob *blob = cb_arg;

	if (bserrno == 0 && (blob->data_ro_flags & SPDK_BLOB_READ_ONLY)) {
		blob->data_ro = true;
		blob->md_ro = true;
	}

	spdk_bs_sequence_finish(seq, bserrno);
}

static void
_spdk_blob_sync_md(struct spdk_blob *blob, spdk_blob_op_complete cb_fn, void *cb_arg)
{
	struct spdk_bs_cpl	cpl;
	spdk_bs_sequence_t	*seq;

	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
	cpl.u.blob_basic.cb_fn = cb_fn;
	cpl.u.blob_basic.cb_arg = cb_arg;

	seq = spdk_bs_sequence_start(blob->bs->md_channel, &cpl);
	if (!seq) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	_spdk_blob_persist(seq, blob, _spdk_blob_sync_md_cpl, blob);
}

void
spdk_blob_sync_md(struct spdk_blob *blob, spdk_blob_op_complete cb_fn, void *cb_arg)
{
	_spdk_blob_verify_md_op(blob);

	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Syncing blob %lu\n", blob->id);

	if (blob->md_ro) {
		assert(blob->state == SPDK_BLOB_STATE_CLEAN);
		cb_fn(cb_arg, 0);
		return;
	}

	_spdk_blob_sync_md(blob, cb_fn, cb_arg);
}

/* END spdk_blob_sync_md */
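
/*
 * Usage sketch (illustrative only): make a blob read-only. Note that
 * spdk_blob_set_read_only() only marks the metadata dirty; the data_ro and
 * md_ro flags take effect once the following sync completes (see
 * _spdk_blob_sync_md_cpl above). "app_ro_done" is a hypothetical callback.
 *
 *	spdk_blob_set_read_only(blob);
 *	spdk_blob_sync_md(blob, app_ro_done, NULL);
 */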

struct spdk_blob_insert_cluster_ctx {
	struct spdk_thread	*thread;
	struct spdk_blob	*blob;
	uint32_t		cluster_num;	/* cluster index in blob */
	uint32_t		cluster;	/* cluster on disk */
	uint32_t		extent_page;	/* extent page on disk */
	int			rc;
	spdk_blob_op_complete	cb_fn;
	void			*cb_arg;
};

static void
_spdk_blob_insert_cluster_msg_cpl(void *arg)
{
	struct spdk_blob_insert_cluster_ctx *ctx = arg;

	ctx->cb_fn(ctx->cb_arg, ctx->rc);
	free(ctx);
}

static void
_spdk_blob_insert_cluster_msg_cb(void *arg, int bserrno)
{
	struct spdk_blob_insert_cluster_ctx *ctx = arg;

	ctx->rc = bserrno;
	spdk_thread_send_msg(ctx->thread, _spdk_blob_insert_cluster_msg_cpl, ctx);
}

static void
_spdk_blob_persist_extent_page_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_md_page        *page = cb_arg;

	spdk_bs_sequence_finish(seq, bserrno);
	spdk_free(page);
}

static void
_spdk_blob_insert_extent(struct spdk_blob *blob, uint32_t extent, uint64_t cluster_num,
			 spdk_blob_op_complete cb_fn, void *cb_arg)
{
	spdk_bs_sequence_t		*seq;
	struct spdk_bs_cpl		cpl;
	struct spdk_blob_md_page	*page = NULL;
	uint32_t			page_count = 0;
	int				rc;

	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
	cpl.u.blob_basic.cb_fn = cb_fn;
	cpl.u.blob_basic.cb_arg = cb_arg;

	seq = spdk_bs_sequence_start(blob->bs->md_channel, &cpl);
	if (!seq) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}
	rc = _spdk_blob_serialize_add_page(blob, &page, &page_count, &page);
	if (rc < 0) {
		spdk_bs_sequence_finish(seq, rc);
		return;
	}

	_spdk_blob_serialize_extent_page(blob, cluster_num, page);

	page->crc = _spdk_blob_md_page_calc_crc(page);

	assert(spdk_bit_array_get(blob->bs->used_md_pages, extent) == true);

	spdk_bs_sequence_write_dev(seq, page, _spdk_bs_md_page_to_lba(blob->bs, extent),
				   _spdk_bs_byte_to_lba(blob->bs, SPDK_BS_PAGE_SIZE),
				   _spdk_blob_persist_extent_page_cpl, page);
}

static void
_spdk_blob_insert_cluster_msg(void *arg)
{
	struct spdk_blob_insert_cluster_ctx *ctx = arg;
	uint32_t *extent_page = _spdk_bs_cluster_to_extent_page(ctx->blob, ctx->cluster_num);

	ctx->rc = _spdk_blob_insert_cluster(ctx->blob, ctx->cluster_num, ctx->cluster);
	if (ctx->rc != 0) {
		spdk_thread_send_msg(ctx->thread, _spdk_blob_insert_cluster_msg_cpl, ctx);
		return;
	}

	if (extent_page == NULL) {
		/* Extent pages are not used; proceed with a sync of md that will contain the extents RLE */
		ctx->blob->state = SPDK_BLOB_STATE_DIRTY;
		_spdk_blob_sync_md(ctx->blob, _spdk_blob_insert_cluster_msg_cb, ctx);
	} else if (*extent_page == 0) {
		/* Extent page requires allocation.
		 * It was already claimed in the used_md_pages map and placed in ctx.
		 * Blob persist will take care of writing out the new extent page on disk. */
		assert(ctx->extent_page != 0);
		assert(spdk_bit_array_get(ctx->blob->bs->used_md_pages, ctx->extent_page) == true);
		*extent_page = ctx->extent_page;
		ctx->blob->state = SPDK_BLOB_STATE_DIRTY;
		_spdk_blob_sync_md(ctx->blob, _spdk_blob_insert_cluster_msg_cb, ctx);
	} else {
		assert(ctx->extent_page == 0);
		/* The extent page is already allocated and tracked in the blob's
		 * extent page table; every cluster allocation then requires just an
		 * update of that single extent page (note *extent_page, not
		 * ctx->extent_page, holds the on-disk page to rewrite). */
		_spdk_blob_insert_extent(ctx->blob, *extent_page, ctx->cluster_num,
					 _spdk_blob_insert_cluster_msg_cb, ctx);
	}
}

static void
_spdk_blob_insert_cluster_on_md_thread(struct spdk_blob *blob, uint32_t cluster_num,
				       uint64_t cluster, uint32_t extent_page, spdk_blob_op_complete cb_fn, void *cb_arg)
{
	struct spdk_blob_insert_cluster_ctx *ctx;

	ctx = calloc(1, sizeof(*ctx));
	if (ctx == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	ctx->thread = spdk_get_thread();
	ctx->blob = blob;
	ctx->cluster_num = cluster_num;
	ctx->cluster = cluster;
	ctx->extent_page = extent_page;
	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;

	spdk_thread_send_msg(blob->bs->md_thread, _spdk_blob_insert_cluster_msg, ctx);
}

/* START spdk_blob_close */

static void
_spdk_blob_close_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob *blob = cb_arg;

	if (bserrno == 0) {
		blob->open_ref--;
		if (blob->open_ref == 0) {
			/*
			 * Blobs with active.num_pages == 0 are deleted blobs.
			 *  These blobs are removed from the blob_store list
			 *  when the deletion process starts - so don't try to
			 *  remove them again.
			 */
			if (blob->active.num_pages > 0) {
				TAILQ_REMOVE(&blob->bs->blobs, blob, link);
			}
			_spdk_blob_free(blob);
		}
	}

	spdk_bs_sequence_finish(seq, bserrno);
}

void spdk_blob_close(struct spdk_blob *blob, spdk_blob_op_complete cb_fn, void *cb_arg)
{
	struct spdk_bs_cpl	cpl;
	spdk_bs_sequence_t	*seq;

	_spdk_blob_verify_md_op(blob);

	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Closing blob %lu\n", blob->id);

	if (blob->open_ref == 0) {
		cb_fn(cb_arg, -EBADF);
		return;
	}

	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
	cpl.u.blob_basic.cb_fn = cb_fn;
	cpl.u.blob_basic.cb_arg = cb_arg;

	seq = spdk_bs_sequence_start(blob->bs->md_channel, &cpl);
	if (!seq) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	/* Sync metadata */
	_spdk_blob_persist(seq, blob, _spdk_blob_close_cpl, blob);
}

/* END spdk_blob_close */

struct spdk_io_channel *spdk_bs_alloc_io_channel(struct spdk_blob_store *bs)
{
	return spdk_get_io_channel(bs);
}

void spdk_bs_free_io_channel(struct spdk_io_channel *channel)
{
	spdk_put_io_channel(channel);
}

void spdk_blob_io_unmap(struct spdk_blob *blob, struct spdk_io_channel *channel,
			uint64_t offset, uint64_t length, spdk_blob_op_complete cb_fn, void *cb_arg)
{
	_spdk_blob_request_submit_op(blob, channel, NULL, offset, length, cb_fn, cb_arg,
				     SPDK_BLOB_UNMAP);
}

void spdk_blob_io_write_zeroes(struct spdk_blob *blob, struct spdk_io_channel *channel,
			       uint64_t offset, uint64_t length, spdk_blob_op_complete cb_fn, void *cb_arg)
{
	_spdk_blob_request_submit_op(blob, channel, NULL, offset, length, cb_fn, cb_arg,
				     SPDK_BLOB_WRITE_ZEROES);
}

void spdk_blob_io_write(struct spdk_blob *blob, struct spdk_io_channel *channel,
			void *payload, uint64_t offset, uint64_t length,
			spdk_blob_op_complete cb_fn, void *cb_arg)
{
	_spdk_blob_request_submit_op(blob, channel, payload, offset, length, cb_fn, cb_arg,
				     SPDK_BLOB_WRITE);
}

void spdk_blob_io_read(struct spdk_blob *blob, struct spdk_io_channel *channel,
		       void *payload, uint64_t offset, uint64_t length,
		       spdk_blob_op_complete cb_fn, void *cb_arg)
{
	_spdk_blob_request_submit_op(blob, channel, payload, offset, length, cb_fn, cb_arg,
				     SPDK_BLOB_READ);
}

void spdk_blob_io_writev(struct spdk_blob *blob, struct spdk_io_channel *channel,
			 struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
			 spdk_blob_op_complete cb_fn, void *cb_arg)
{
	_spdk_blob_request_submit_rw_iov(blob, channel, iov, iovcnt, offset, length, cb_fn, cb_arg, false);
}

void spdk_blob_io_readv(struct spdk_blob *blob, struct spdk_io_channel *channel,
			struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
			spdk_blob_op_complete cb_fn, void *cb_arg)
{
	_spdk_blob_request_submit_rw_iov(blob, channel, iov, iovcnt, offset, length, cb_fn, cb_arg, true);
}
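
/*
 * Usage sketch (illustrative only): write one io_unit-aligned buffer to the
 * start of a blob. Offsets and lengths in the I/O calls above are expressed
 * in io_units, not bytes. "buf" and "app_write_done" are hypothetical; the
 * buffer should come from spdk_malloc() so it is DMA-safe.
 *
 *	struct spdk_io_channel *ch = spdk_bs_alloc_io_channel(bs);
 *	uint64_t io_unit_size = spdk_bs_get_io_unit_size(bs);
 *	uint8_t *buf = spdk_malloc(io_unit_size, io_unit_size, NULL,
 *				   SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
 *
 *	memset(buf, 0x5a, io_unit_size);
 *	spdk_blob_io_write(blob, ch, buf, 0, 1, app_write_done, buf);
 */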

struct spdk_bs_iter_ctx {
	int64_t page_num;
	struct spdk_blob_store *bs;

	spdk_blob_op_with_handle_complete cb_fn;
	void *cb_arg;
};

static void
_spdk_bs_iter_cpl(void *cb_arg, struct spdk_blob *_blob, int bserrno)
{
	struct spdk_bs_iter_ctx *ctx = cb_arg;
	struct spdk_blob_store *bs = ctx->bs;
	spdk_blob_id id;

	if (bserrno == 0) {
		ctx->cb_fn(ctx->cb_arg, _blob, bserrno);
		free(ctx);
		return;
	}

	ctx->page_num++;
	ctx->page_num = spdk_bit_array_find_first_set(bs->used_blobids, ctx->page_num);
	if (ctx->page_num >= spdk_bit_array_capacity(bs->used_blobids)) {
		ctx->cb_fn(ctx->cb_arg, NULL, -ENOENT);
		free(ctx);
		return;
	}

	id = _spdk_bs_page_to_blobid(ctx->page_num);

	spdk_bs_open_blob(bs, id, _spdk_bs_iter_cpl, ctx);
}

void
spdk_bs_iter_first(struct spdk_blob_store *bs,
		   spdk_blob_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_bs_iter_ctx *ctx;

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	ctx->page_num = -1;
	ctx->bs = bs;
	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;

	_spdk_bs_iter_cpl(ctx, NULL, -1);
}

static void
_spdk_bs_iter_close_cpl(void *cb_arg, int bserrno)
{
	struct spdk_bs_iter_ctx *ctx = cb_arg;

	_spdk_bs_iter_cpl(ctx, NULL, -1);
}

void
spdk_bs_iter_next(struct spdk_blob_store *bs, struct spdk_blob *blob,
		  spdk_blob_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_bs_iter_ctx *ctx;

	assert(blob != NULL);

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	ctx->page_num = _spdk_bs_blobid_to_page(blob->id);
	ctx->bs = bs;
	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;

	/* Close the existing blob */
	spdk_blob_close(blob, _spdk_bs_iter_close_cpl, ctx);
}
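
/*
 * Usage sketch (illustrative only): enumerate every blob in the blobstore.
 * The iterator opens each blob in turn and closes it when spdk_bs_iter_next()
 * is called; iteration ends with -ENOENT. "app_iter_cb" is hypothetical.
 *
 *	static void
 *	app_iter_cb(void *cb_arg, struct spdk_blob *blob, int bserrno)
 *	{
 *		struct spdk_blob_store *bs = cb_arg;
 *
 *		if (bserrno == -ENOENT) {
 *			return;	// no more blobs
 *		}
 *		SPDK_NOTICELOG("blob %lu\n", spdk_blob_get_id(blob));
 *		spdk_bs_iter_next(bs, blob, app_iter_cb, bs);
 *	}
 *
 *	spdk_bs_iter_first(bs, app_iter_cb, bs);
 */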

static int
_spdk_blob_set_xattr(struct spdk_blob *blob, const char *name, const void *value,
		     uint16_t value_len, bool internal)
{
	struct spdk_xattr_tailq *xattrs;
	struct spdk_xattr	*xattr;
	size_t			desc_size;
	void			*tmp;

	_spdk_blob_verify_md_op(blob);

	if (blob->md_ro) {
		return -EPERM;
	}

	desc_size = sizeof(struct spdk_blob_md_descriptor_xattr) + strlen(name) + value_len;
	if (desc_size > SPDK_BS_MAX_DESC_SIZE) {
		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Xattr '%s' of size %zu does not fit into single page %zu\n", name,
			      desc_size, SPDK_BS_MAX_DESC_SIZE);
		return -ENOMEM;
	}

	if (internal) {
		xattrs = &blob->xattrs_internal;
		blob->invalid_flags |= SPDK_BLOB_INTERNAL_XATTR;
	} else {
		xattrs = &blob->xattrs;
	}

	TAILQ_FOREACH(xattr, xattrs, link) {
		if (!strcmp(name, xattr->name)) {
			/* Allocate the new value first so the old one is kept on failure */
			tmp = malloc(value_len);
			if (!tmp) {
				return -ENOMEM;
			}

			free(xattr->value);
			xattr->value_len = value_len;
			xattr->value = tmp;
			memcpy(xattr->value, value, value_len);

			blob->state = SPDK_BLOB_STATE_DIRTY;

			return 0;
		}
	}

	xattr = calloc(1, sizeof(*xattr));
	if (!xattr) {
		return -ENOMEM;
	}

	xattr->name = strdup(name);
	if (!xattr->name) {
		free(xattr);
		return -ENOMEM;
	}

	xattr->value_len = value_len;
	xattr->value = malloc(value_len);
	if (!xattr->value) {
		free(xattr->name);
		free(xattr);
		return -ENOMEM;
	}
	memcpy(xattr->value, value, value_len);
	TAILQ_INSERT_TAIL(xattrs, xattr, link);

	blob->state = SPDK_BLOB_STATE_DIRTY;

	return 0;
}

int
spdk_blob_set_xattr(struct spdk_blob *blob, const char *name, const void *value,
		    uint16_t value_len)
{
	return _spdk_blob_set_xattr(blob, name, value, value_len, false);
}

static int
_spdk_blob_remove_xattr(struct spdk_blob *blob, const char *name, bool internal)
{
	struct spdk_xattr_tailq *xattrs;
	struct spdk_xattr	*xattr;

	_spdk_blob_verify_md_op(blob);

	if (blob->md_ro) {
		return -EPERM;
	}
	xattrs = internal ? &blob->xattrs_internal : &blob->xattrs;

	TAILQ_FOREACH(xattr, xattrs, link) {
		if (!strcmp(name, xattr->name)) {
			TAILQ_REMOVE(xattrs, xattr, link);
			free(xattr->value);
			free(xattr->name);
			free(xattr);

			if (internal && TAILQ_EMPTY(&blob->xattrs_internal)) {
				blob->invalid_flags &= ~SPDK_BLOB_INTERNAL_XATTR;
			}
			blob->state = SPDK_BLOB_STATE_DIRTY;

			return 0;
		}
	}

	return -ENOENT;
}

int
spdk_blob_remove_xattr(struct spdk_blob *blob, const char *name)
{
	return _spdk_blob_remove_xattr(blob, name, false);
}

static int
_spdk_blob_get_xattr_value(struct spdk_blob *blob, const char *name,
			   const void **value, size_t *value_len, bool internal)
{
	struct spdk_xattr	*xattr;
	struct spdk_xattr_tailq *xattrs;

	xattrs = internal ? &blob->xattrs_internal : &blob->xattrs;

	TAILQ_FOREACH(xattr, xattrs, link) {
		if (!strcmp(name, xattr->name)) {
			*value = xattr->value;
			*value_len = xattr->value_len;
			return 0;
		}
	}
	return -ENOENT;
}

int
spdk_blob_get_xattr_value(struct spdk_blob *blob, const char *name,
			  const void **value, size_t *value_len)
{
	_spdk_blob_verify_md_op(blob);

	return _spdk_blob_get_xattr_value(blob, name, value, value_len, false);
}

struct spdk_xattr_names {
	uint32_t	count;
	const char	*names[0];
};

static int
_spdk_blob_get_xattr_names(struct spdk_xattr_tailq *xattrs, struct spdk_xattr_names **names)
{
	struct spdk_xattr	*xattr;
	int			count = 0;

	TAILQ_FOREACH(xattr, xattrs, link) {
		count++;
	}

	*names = calloc(1, sizeof(struct spdk_xattr_names) + count * sizeof(char *));
	if (*names == NULL) {
		return -ENOMEM;
	}

	TAILQ_FOREACH(xattr, xattrs, link) {
		(*names)->names[(*names)->count++] = xattr->name;
	}

	return 0;
}

int
spdk_blob_get_xattr_names(struct spdk_blob *blob, struct spdk_xattr_names **names)
{
	_spdk_blob_verify_md_op(blob);

	return _spdk_blob_get_xattr_names(&blob->xattrs, names);
}

uint32_t
spdk_xattr_names_get_count(struct spdk_xattr_names *names)
{
	assert(names != NULL);

	return names->count;
}

const char *
spdk_xattr_names_get_name(struct spdk_xattr_names *names, uint32_t index)
{
	if (index >= names->count) {
		return NULL;
	}

	return names->names[index];
}

void
spdk_xattr_names_free(struct spdk_xattr_names *names)
{
	free(names);
}
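
/*
 * Usage sketch (illustrative only): list a blob's xattr names and look up
 * each value. The returned name/value pointers reference the blob's internal
 * xattr list, so they are only valid while the blob stays open and the xattr
 * is not modified.
 *
 *	struct spdk_xattr_names *names;
 *	const void *value;
 *	size_t value_len;
 *	uint32_t i;
 *
 *	if (spdk_blob_get_xattr_names(blob, &names) == 0) {
 *		for (i = 0; i < spdk_xattr_names_get_count(names); i++) {
 *			const char *name = spdk_xattr_names_get_name(names, i);
 *
 *			if (spdk_blob_get_xattr_value(blob, name, &value, &value_len) == 0) {
 *				SPDK_NOTICELOG("xattr %s: %zu bytes\n", name, value_len);
 *			}
 *		}
 *		spdk_xattr_names_free(names);
 *	}
 */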

struct spdk_bs_type
spdk_bs_get_bstype(struct spdk_blob_store *bs)
{
	return bs->bstype;
}

void
spdk_bs_set_bstype(struct spdk_blob_store *bs, struct spdk_bs_type bstype)
{
	memcpy(&bs->bstype, &bstype, sizeof(bstype));
}

bool
spdk_blob_is_read_only(struct spdk_blob *blob)
{
	assert(blob != NULL);
	return (blob->data_ro || blob->md_ro);
}

bool
spdk_blob_is_snapshot(struct spdk_blob *blob)
{
	struct spdk_blob_list *snapshot_entry;

	assert(blob != NULL);

	snapshot_entry = _spdk_bs_get_snapshot_entry(blob->bs, blob->id);
	if (snapshot_entry == NULL) {
		return false;
	}

	return true;
}

bool
spdk_blob_is_clone(struct spdk_blob *blob)
{
	assert(blob != NULL);

	if (blob->parent_id != SPDK_BLOBID_INVALID) {
		assert(spdk_blob_is_thin_provisioned(blob));
		return true;
	}

	return false;
}

bool
spdk_blob_is_thin_provisioned(struct spdk_blob *blob)
{
	assert(blob != NULL);
	return !!(blob->invalid_flags & SPDK_BLOB_THIN_PROV);
}

static void
_spdk_blob_update_clear_method(struct spdk_blob *blob)
{
	enum blob_clear_method stored_cm;

	assert(blob != NULL);

	/* If BLOB_CLEAR_WITH_DEFAULT was passed in, use the setting stored
	 * in metadata previously.  If something other than the default was
	 * specified, ignore the stored value and use what was passed in.
	 */
	stored_cm = ((blob->md_ro_flags & SPDK_BLOB_CLEAR_METHOD) >> SPDK_BLOB_CLEAR_METHOD_SHIFT);

	if (blob->clear_method == BLOB_CLEAR_WITH_DEFAULT) {
		blob->clear_method = stored_cm;
	} else if (blob->clear_method != stored_cm) {
		SPDK_WARNLOG("Using passed in clear method 0x%x instead of stored value of 0x%x\n",
			     blob->clear_method, stored_cm);
	}
}

spdk_blob_id
spdk_blob_get_parent_snapshot(struct spdk_blob_store *bs, spdk_blob_id blob_id)
{
	struct spdk_blob_list *snapshot_entry = NULL;
	struct spdk_blob_list *clone_entry = NULL;

	TAILQ_FOREACH(snapshot_entry, &bs->snapshots, link) {
		TAILQ_FOREACH(clone_entry, &snapshot_entry->clones, link) {
			if (clone_entry->id == blob_id) {
				return snapshot_entry->id;
			}
		}
	}

	return SPDK_BLOBID_INVALID;
}

int
spdk_blob_get_clones(struct spdk_blob_store *bs, spdk_blob_id blobid, spdk_blob_id *ids,
		     size_t *count)
{
	struct spdk_blob_list *snapshot_entry, *clone_entry;
	size_t n;

	snapshot_entry = _spdk_bs_get_snapshot_entry(bs, blobid);
	if (snapshot_entry == NULL) {
		*count = 0;
		return 0;
	}

	if (ids == NULL || *count < snapshot_entry->clone_count) {
		*count = snapshot_entry->clone_count;
		return -ENOMEM;
	}
	*count = snapshot_entry->clone_count;

	n = 0;
	TAILQ_FOREACH(clone_entry, &snapshot_entry->clones, link) {
		ids[n++] = clone_entry->id;
	}

	return 0;
}
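
/*
 * Usage sketch (illustrative only): the two-call pattern for
 * spdk_blob_get_clones(). A first call with ids == NULL (or a too-small
 * count) returns -ENOMEM and reports the required count, which the caller
 * then uses to size the id array. A snapshot without clones returns 0 with
 * count set to 0.
 *
 *	spdk_blob_id *ids = NULL;
 *	size_t count = 0;
 *
 *	if (spdk_blob_get_clones(bs, snapshot_id, NULL, &count) == -ENOMEM) {
 *		ids = calloc(count, sizeof(*ids));
 *		if (ids != NULL && spdk_blob_get_clones(bs, snapshot_id, ids, &count) == 0) {
 *			// ids[0..count-1] hold the clone blob ids
 *		}
 *		free(ids);
 *	}
 */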

SPDK_LOG_REGISTER_COMPONENT("blob", SPDK_LOG_BLOB)