xref: /spdk/lib/blob/blobstore.c (revision 3219bc9a80bb834322bdeb419603aa28ad3927d6)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 
36 #include "spdk/blob.h"
37 #include "spdk/crc32.h"
38 #include "spdk/env.h"
39 #include "spdk/queue.h"
40 #include "spdk/thread.h"
41 #include "spdk/bit_array.h"
42 #include "spdk/likely.h"
43 #include "spdk/util.h"
44 #include "spdk/string.h"
45 
46 #include "spdk_internal/assert.h"
47 #include "spdk_internal/log.h"
48 
49 #include "blobstore.h"
50 
51 #define BLOB_CRC32C_INITIAL    0xffffffffUL
52 
53 static int spdk_bs_register_md_thread(struct spdk_blob_store *bs);
54 static int spdk_bs_unregister_md_thread(struct spdk_blob_store *bs);
55 static void _spdk_blob_close_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno);
56 static void _spdk_blob_insert_cluster_on_md_thread(struct spdk_blob *blob, uint32_t cluster_num,
57 		uint64_t cluster, spdk_blob_op_complete cb_fn, void *cb_arg);
58 
59 static int _spdk_blob_set_xattr(struct spdk_blob *blob, const char *name, const void *value,
60 				uint16_t value_len, bool internal);
61 static int _spdk_blob_get_xattr_value(struct spdk_blob *blob, const char *name,
62 				      const void **value, size_t *value_len, bool internal);
63 static int _spdk_blob_remove_xattr(struct spdk_blob *blob, const char *name, bool internal);
64 
static void
_spdk_blob_verify_md_op(struct spdk_blob *blob)
{
	/* Sanity checks common to every metadata operation: the blob must
	 * exist, the call must run on the blobstore's metadata thread, and
	 * the blob must not be in the middle of being loaded.
	 */
	assert(blob != NULL);
	assert(spdk_get_thread() == blob->bs->md_thread);
	assert(blob->state != SPDK_BLOB_STATE_LOADING);
}
72 
73 static struct spdk_blob_list *
74 _spdk_bs_get_snapshot_entry(struct spdk_blob_store *bs, spdk_blob_id blobid)
75 {
76 	struct spdk_blob_list *snapshot_entry = NULL;
77 
78 	TAILQ_FOREACH(snapshot_entry, &bs->snapshots, link) {
79 		if (snapshot_entry->id == blobid) {
80 			break;
81 		}
82 	}
83 
84 	return snapshot_entry;
85 }
86 
/* Mark one cluster as used and decrement the free count.
 * NOTE(review): unlike _spdk_bs_release_cluster() this does not take
 * bs->used_clusters_mutex itself; the caller in this file
 * (_spdk_bs_allocate_cluster) holds it across the call.
 */
static void
_spdk_bs_claim_cluster(struct spdk_blob_store *bs, uint32_t cluster_num)
{
	assert(cluster_num < spdk_bit_array_capacity(bs->used_clusters));
	assert(spdk_bit_array_get(bs->used_clusters, cluster_num) == false);
	assert(bs->num_free_clusters > 0);

	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Claiming cluster %u\n", cluster_num);

	spdk_bit_array_set(bs->used_clusters, cluster_num);
	bs->num_free_clusters--;
}
99 
100 static int
101 _spdk_blob_insert_cluster(struct spdk_blob *blob, uint32_t cluster_num, uint64_t cluster)
102 {
103 	uint64_t *cluster_lba = &blob->active.clusters[cluster_num];
104 
105 	_spdk_blob_verify_md_op(blob);
106 
107 	if (*cluster_lba != 0) {
108 		return -EEXIST;
109 	}
110 
111 	*cluster_lba = _spdk_bs_cluster_to_lba(blob->bs, cluster);
112 	return 0;
113 }
114 
/* Find and claim the first free cluster at or after *lowest_free_cluster.
 * On success *lowest_free_cluster is updated to the claimed cluster and,
 * if update_map is set, the cluster is recorded in the blob's cluster map.
 * Returns -ENOSPC when no free cluster remains.
 */
static int
_spdk_bs_allocate_cluster(struct spdk_blob *blob, uint32_t cluster_num,
			  uint64_t *lowest_free_cluster, bool update_map)
{
	/* The search and the claim happen under the same lock so two threads
	 * cannot claim the same cluster.
	 */
	pthread_mutex_lock(&blob->bs->used_clusters_mutex);
	*lowest_free_cluster = spdk_bit_array_find_first_clear(blob->bs->used_clusters,
			       *lowest_free_cluster);
	if (*lowest_free_cluster == UINT32_MAX) {
		/* No more free clusters. Cannot satisfy the request */
		pthread_mutex_unlock(&blob->bs->used_clusters_mutex);
		return -ENOSPC;
	}

	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Claiming cluster %lu for blob %lu\n", *lowest_free_cluster, blob->id);
	_spdk_bs_claim_cluster(blob->bs, *lowest_free_cluster);
	pthread_mutex_unlock(&blob->bs->used_clusters_mutex);

	if (update_map) {
		/* NOTE(review): the -EEXIST return of _spdk_blob_insert_cluster is
		 * ignored here; presumably callers guarantee the slot is empty —
		 * confirm against call sites.
		 */
		_spdk_blob_insert_cluster(blob, cluster_num, *lowest_free_cluster);
	}

	return 0;
}
138 
/* Return one cluster to the free pool.
 * Takes bs->used_clusters_mutex itself (unlike _spdk_bs_claim_cluster,
 * whose caller holds the lock).
 */
static void
_spdk_bs_release_cluster(struct spdk_blob_store *bs, uint32_t cluster_num)
{
	assert(cluster_num < spdk_bit_array_capacity(bs->used_clusters));
	assert(spdk_bit_array_get(bs->used_clusters, cluster_num) == true);
	assert(bs->num_free_clusters < bs->total_clusters);

	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Releasing cluster %u\n", cluster_num);

	pthread_mutex_lock(&bs->used_clusters_mutex);
	spdk_bit_array_clear(bs->used_clusters, cluster_num);
	bs->num_free_clusters++;
	pthread_mutex_unlock(&bs->used_clusters_mutex);
}
153 
154 static void
155 _spdk_blob_xattrs_init(struct spdk_blob_xattr_opts *xattrs)
156 {
157 	xattrs->count = 0;
158 	xattrs->names = NULL;
159 	xattrs->ctx = NULL;
160 	xattrs->get_value = NULL;
161 }
162 
163 void
164 spdk_blob_opts_init(struct spdk_blob_opts *opts)
165 {
166 	opts->num_clusters = 0;
167 	opts->thin_provision = false;
168 	_spdk_blob_xattrs_init(&opts->xattrs);
169 }
170 
/* Initialize blob-open options to their defaults (default clear method). */
void
spdk_blob_open_opts_init(struct spdk_blob_open_opts *opts)
{
	opts->clear_method = BLOB_CLEAR_WITH_DEFAULT;
}
176 
177 static struct spdk_blob *
178 _spdk_blob_alloc(struct spdk_blob_store *bs, spdk_blob_id id)
179 {
180 	struct spdk_blob *blob;
181 
182 	blob = calloc(1, sizeof(*blob));
183 	if (!blob) {
184 		return NULL;
185 	}
186 
187 	blob->id = id;
188 	blob->bs = bs;
189 
190 	blob->parent_id = SPDK_BLOBID_INVALID;
191 
192 	blob->state = SPDK_BLOB_STATE_DIRTY;
193 	blob->active.num_pages = 1;
194 	blob->active.pages = calloc(1, sizeof(*blob->active.pages));
195 	if (!blob->active.pages) {
196 		free(blob);
197 		return NULL;
198 	}
199 
200 	blob->active.pages[0] = _spdk_bs_blobid_to_page(id);
201 
202 	TAILQ_INIT(&blob->xattrs);
203 	TAILQ_INIT(&blob->xattrs_internal);
204 
205 	return blob;
206 }
207 
208 static void
209 _spdk_xattrs_free(struct spdk_xattr_tailq *xattrs)
210 {
211 	struct spdk_xattr	*xattr, *xattr_tmp;
212 
213 	TAILQ_FOREACH_SAFE(xattr, xattrs, link, xattr_tmp) {
214 		TAILQ_REMOVE(xattrs, xattr, link);
215 		free(xattr->name);
216 		free(xattr->value);
217 		free(xattr);
218 	}
219 }
220 
221 static void
222 _spdk_blob_free(struct spdk_blob *blob)
223 {
224 	assert(blob != NULL);
225 
226 	free(blob->active.clusters);
227 	free(blob->clean.clusters);
228 	free(blob->active.pages);
229 	free(blob->clean.pages);
230 
231 	_spdk_xattrs_free(&blob->xattrs);
232 	_spdk_xattrs_free(&blob->xattrs_internal);
233 
234 	if (blob->back_bs_dev) {
235 		blob->back_bs_dev->destroy(blob->back_bs_dev);
236 	}
237 
238 	free(blob);
239 }
240 
/* Context carried across all channels while freezing or unfreezing
 * a blob's I/O via spdk_for_each_channel().
 */
struct freeze_io_ctx {
	struct spdk_bs_cpl cpl;	/* completion invoked once every channel was visited */
	struct spdk_blob *blob;	/* blob whose I/O is being frozen or thawed */
};
245 
/* Per-channel step of the freeze pass. No work is done on the channel;
 * presumably visiting each channel on its own thread is sufficient to
 * synchronize with in-flight submissions — confirm against the
 * spdk_for_each_channel contract.
 */
static void
_spdk_blob_io_sync(struct spdk_io_channel_iter *i)
{
	spdk_for_each_channel_continue(i, 0);
}
251 
252 static void
253 _spdk_blob_execute_queued_io(struct spdk_io_channel_iter *i)
254 {
255 	struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i);
256 	struct spdk_bs_channel *ch = spdk_io_channel_get_ctx(_ch);
257 	struct freeze_io_ctx *ctx = spdk_io_channel_iter_get_ctx(i);
258 	struct spdk_bs_request_set	*set;
259 	struct spdk_bs_user_op_args	*args;
260 	spdk_bs_user_op_t *op, *tmp;
261 
262 	TAILQ_FOREACH_SAFE(op, &ch->queued_io, link, tmp) {
263 		set = (struct spdk_bs_request_set *)op;
264 		args = &set->u.user_op;
265 
266 		if (args->blob == ctx->blob) {
267 			TAILQ_REMOVE(&ch->queued_io, op, link);
268 			spdk_bs_user_op_execute(op);
269 		}
270 	}
271 
272 	spdk_for_each_channel_continue(i, 0);
273 }
274 
/* Final completion of a freeze/unfreeze pass: invoke the stored user
 * callback (always with status 0 — the per-channel steps cannot fail)
 * and release the context.
 */
static void
_spdk_blob_io_cpl(struct spdk_io_channel_iter *i, int status)
{
	struct freeze_io_ctx *ctx = spdk_io_channel_iter_get_ctx(i);

	ctx->cpl.u.blob_basic.cb_fn(ctx->cpl.u.blob_basic.cb_arg, 0);

	free(ctx);
}
284 
285 static void
286 _spdk_blob_freeze_io(struct spdk_blob *blob, spdk_blob_op_complete cb_fn, void *cb_arg)
287 {
288 	struct freeze_io_ctx *ctx;
289 
290 	ctx = calloc(1, sizeof(*ctx));
291 	if (!ctx) {
292 		cb_fn(cb_arg, -ENOMEM);
293 		return;
294 	}
295 
296 	ctx->cpl.type = SPDK_BS_CPL_TYPE_BS_BASIC;
297 	ctx->cpl.u.blob_basic.cb_fn = cb_fn;
298 	ctx->cpl.u.blob_basic.cb_arg = cb_arg;
299 	ctx->blob = blob;
300 
301 	/* Freeze I/O on blob */
302 	blob->frozen_refcnt++;
303 
304 	if (blob->frozen_refcnt == 1) {
305 		spdk_for_each_channel(blob->bs, _spdk_blob_io_sync, ctx, _spdk_blob_io_cpl);
306 	} else {
307 		cb_fn(cb_arg, 0);
308 		free(ctx);
309 	}
310 }
311 
312 static void
313 _spdk_blob_unfreeze_io(struct spdk_blob *blob, spdk_blob_op_complete cb_fn, void *cb_arg)
314 {
315 	struct freeze_io_ctx *ctx;
316 
317 	ctx = calloc(1, sizeof(*ctx));
318 	if (!ctx) {
319 		cb_fn(cb_arg, -ENOMEM);
320 		return;
321 	}
322 
323 	ctx->cpl.type = SPDK_BS_CPL_TYPE_BS_BASIC;
324 	ctx->cpl.u.blob_basic.cb_fn = cb_fn;
325 	ctx->cpl.u.blob_basic.cb_arg = cb_arg;
326 	ctx->blob = blob;
327 
328 	assert(blob->frozen_refcnt > 0);
329 
330 	blob->frozen_refcnt--;
331 
332 	if (blob->frozen_refcnt == 0) {
333 		spdk_for_each_channel(blob->bs, _spdk_blob_execute_queued_io, ctx, _spdk_blob_io_cpl);
334 	} else {
335 		cb_fn(cb_arg, 0);
336 		free(ctx);
337 	}
338 }
339 
340 static int
341 _spdk_blob_mark_clean(struct spdk_blob *blob)
342 {
343 	uint64_t *clusters = NULL;
344 	uint32_t *pages = NULL;
345 
346 	assert(blob != NULL);
347 
348 	if (blob->active.num_clusters) {
349 		assert(blob->active.clusters);
350 		clusters = calloc(blob->active.num_clusters, sizeof(*blob->active.clusters));
351 		if (!clusters) {
352 			return -ENOMEM;
353 		}
354 		memcpy(clusters, blob->active.clusters, blob->active.num_clusters * sizeof(*blob->active.clusters));
355 	}
356 
357 	if (blob->active.num_pages) {
358 		assert(blob->active.pages);
359 		pages = calloc(blob->active.num_pages, sizeof(*blob->active.pages));
360 		if (!pages) {
361 			free(clusters);
362 			return -ENOMEM;
363 		}
364 		memcpy(pages, blob->active.pages, blob->active.num_pages * sizeof(*blob->active.pages));
365 	}
366 
367 	free(blob->clean.clusters);
368 	free(blob->clean.pages);
369 
370 	blob->clean.num_clusters = blob->active.num_clusters;
371 	blob->clean.clusters = blob->active.clusters;
372 	blob->clean.num_pages = blob->active.num_pages;
373 	blob->clean.pages = blob->active.pages;
374 
375 	blob->active.clusters = clusters;
376 	blob->active.pages = pages;
377 
378 	/* If the metadata was dirtied again while the metadata was being written to disk,
379 	 *  we do not want to revert the DIRTY state back to CLEAN here.
380 	 */
381 	if (blob->state == SPDK_BLOB_STATE_LOADING) {
382 		blob->state = SPDK_BLOB_STATE_CLEAN;
383 	}
384 
385 	return 0;
386 }
387 
388 static int
389 _spdk_blob_deserialize_xattr(struct spdk_blob *blob,
390 			     struct spdk_blob_md_descriptor_xattr *desc_xattr, bool internal)
391 {
392 	struct spdk_xattr                       *xattr;
393 
394 	if (desc_xattr->length != sizeof(desc_xattr->name_length) +
395 	    sizeof(desc_xattr->value_length) +
396 	    desc_xattr->name_length + desc_xattr->value_length) {
397 		return -EINVAL;
398 	}
399 
400 	xattr = calloc(1, sizeof(*xattr));
401 	if (xattr == NULL) {
402 		return -ENOMEM;
403 	}
404 
405 	xattr->name = malloc(desc_xattr->name_length + 1);
406 	if (xattr->name == NULL) {
407 		free(xattr);
408 		return -ENOMEM;
409 	}
410 	memcpy(xattr->name, desc_xattr->name, desc_xattr->name_length);
411 	xattr->name[desc_xattr->name_length] = '\0';
412 
413 	xattr->value = malloc(desc_xattr->value_length);
414 	if (xattr->value == NULL) {
415 		free(xattr->name);
416 		free(xattr);
417 		return -ENOMEM;
418 	}
419 	xattr->value_len = desc_xattr->value_length;
420 	memcpy(xattr->value,
421 	       (void *)((uintptr_t)desc_xattr->name + desc_xattr->name_length),
422 	       desc_xattr->value_length);
423 
424 	TAILQ_INSERT_TAIL(internal ? &blob->xattrs_internal : &blob->xattrs, xattr, link);
425 
426 	return 0;
427 }
428 
429 
/* Apply every descriptor in one metadata page to the in-memory blob.
 * Descriptors are variable-length records packed back-to-back inside
 * page->descriptors; a zero-length PADDING descriptor terminates the page.
 * Returns 0 on success, -EINVAL on malformed/inconsistent metadata,
 * -ENOMEM on allocation failure.
 */
static int
_spdk_blob_parse_page(const struct spdk_blob_md_page *page, struct spdk_blob *blob)
{
	struct spdk_blob_md_descriptor *desc;
	size_t	cur_desc = 0;
	void *tmp;

	desc = (struct spdk_blob_md_descriptor *)page->descriptors;
	while (cur_desc < sizeof(page->descriptors)) {
		if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_PADDING) {
			if (desc->length == 0) {
				/* If padding and length are 0, this terminates the page */
				break;
			}
		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_FLAGS) {
			struct spdk_blob_md_descriptor_flags	*desc_flags;

			desc_flags = (struct spdk_blob_md_descriptor_flags *)desc;

			if (desc_flags->length != sizeof(*desc_flags) - sizeof(*desc)) {
				return -EINVAL;
			}

			/* Unknown "invalid" flag bits: this blob cannot be loaded. */
			if ((desc_flags->invalid_flags | SPDK_BLOB_INVALID_FLAGS_MASK) !=
			    SPDK_BLOB_INVALID_FLAGS_MASK) {
				return -EINVAL;
			}

			/* Unknown data-ro flag bits: load the blob read-only. */
			if ((desc_flags->data_ro_flags | SPDK_BLOB_DATA_RO_FLAGS_MASK) !=
			    SPDK_BLOB_DATA_RO_FLAGS_MASK) {
				blob->data_ro = true;
				blob->md_ro = true;
			}

			/* Unknown md-ro flag bits: metadata becomes read-only. */
			if ((desc_flags->md_ro_flags | SPDK_BLOB_MD_RO_FLAGS_MASK) !=
			    SPDK_BLOB_MD_RO_FLAGS_MASK) {
				blob->md_ro = true;
			}

			if ((desc_flags->data_ro_flags & SPDK_BLOB_READ_ONLY)) {
				blob->data_ro = true;
				blob->md_ro = true;
			}

			blob->invalid_flags = desc_flags->invalid_flags;
			blob->data_ro_flags = desc_flags->data_ro_flags;
			blob->md_ro_flags = desc_flags->md_ro_flags;

		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_EXTENT_RLE) {
			struct spdk_blob_md_descriptor_extent_rle	*desc_extent_rle;
			unsigned int				i, j;
			unsigned int				cluster_count = blob->active.num_clusters;

			desc_extent_rle = (struct spdk_blob_md_descriptor_extent_rle *)desc;

			if (desc_extent_rle->length == 0 ||
			    (desc_extent_rle->length % sizeof(desc_extent_rle->extents[0]) != 0)) {
				return -EINVAL;
			}

			/* First pass: verify every referenced cluster is marked used in
			 * the blobstore bitmap and count the clusters contributed.
			 * cluster_idx == 0 denotes an unallocated (thin) run.
			 */
			for (i = 0; i < desc_extent_rle->length / sizeof(desc_extent_rle->extents[0]); i++) {
				for (j = 0; j < desc_extent_rle->extents[i].length; j++) {
					if (desc_extent_rle->extents[i].cluster_idx != 0) {
						if (!spdk_bit_array_get(blob->bs->used_clusters,
									desc_extent_rle->extents[i].cluster_idx + j)) {
							return -EINVAL;
						}
					}
					cluster_count++;
				}
			}

			if (cluster_count == 0) {
				return -EINVAL;
			}
			tmp = realloc(blob->active.clusters, cluster_count * sizeof(*blob->active.clusters));
			if (tmp == NULL) {
				return -ENOMEM;
			}
			blob->active.clusters = tmp;
			blob->active.cluster_array_size = cluster_count;

			/* Second pass: expand each run into one LBA per cluster in the
			 * active cluster map (0 for unallocated thin clusters).
			 */
			for (i = 0; i < desc_extent_rle->length / sizeof(desc_extent_rle->extents[0]); i++) {
				for (j = 0; j < desc_extent_rle->extents[i].length; j++) {
					if (desc_extent_rle->extents[i].cluster_idx != 0) {
						blob->active.clusters[blob->active.num_clusters++] = _spdk_bs_cluster_to_lba(blob->bs,
								desc_extent_rle->extents[i].cluster_idx + j);
					} else if (spdk_blob_is_thin_provisioned(blob)) {
						blob->active.clusters[blob->active.num_clusters++] = 0;
					} else {
						/* An unallocated cluster in a thick blob is invalid. */
						return -EINVAL;
					}
				}
			}

		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR) {
			int rc;

			rc = _spdk_blob_deserialize_xattr(blob,
							  (struct spdk_blob_md_descriptor_xattr *) desc, false);
			if (rc != 0) {
				return rc;
			}
		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR_INTERNAL) {
			int rc;

			rc = _spdk_blob_deserialize_xattr(blob,
							  (struct spdk_blob_md_descriptor_xattr *) desc, true);
			if (rc != 0) {
				return rc;
			}
		} else {
			/* Unrecognized descriptor type.  Do not fail - just continue to the
			 *  next descriptor.  If this descriptor is associated with some feature
			 *  defined in a newer version of blobstore, that version of blobstore
			 *  should create and set an associated feature flag to specify if this
			 *  blob can be loaded or not.
			 */
		}

		/* Advance to the next descriptor */
		cur_desc += sizeof(*desc) + desc->length;
		if (cur_desc + sizeof(*desc) > sizeof(page->descriptors)) {
			break;
		}
		desc = (struct spdk_blob_md_descriptor *)((uintptr_t)page->descriptors + cur_desc);
	}

	return 0;
}
560 
561 static int
562 _spdk_blob_parse(const struct spdk_blob_md_page *pages, uint32_t page_count,
563 		 struct spdk_blob *blob)
564 {
565 	const struct spdk_blob_md_page *page;
566 	uint32_t i;
567 	int rc;
568 
569 	assert(page_count > 0);
570 	assert(pages[0].sequence_num == 0);
571 	assert(blob != NULL);
572 	assert(blob->state == SPDK_BLOB_STATE_LOADING);
573 	assert(blob->active.clusters == NULL);
574 
575 	/* The blobid provided doesn't match what's in the MD, this can
576 	 * happen for example if a bogus blobid is passed in through open.
577 	 */
578 	if (blob->id != pages[0].id) {
579 		SPDK_ERRLOG("Blobid (%lu) doesn't match what's in metadata (%lu)\n",
580 			    blob->id, pages[0].id);
581 		return -ENOENT;
582 	}
583 
584 	for (i = 0; i < page_count; i++) {
585 		page = &pages[i];
586 
587 		assert(page->id == blob->id);
588 		assert(page->sequence_num == i);
589 
590 		rc = _spdk_blob_parse_page(page, blob);
591 		if (rc != 0) {
592 			return rc;
593 		}
594 	}
595 
596 	return 0;
597 }
598 
599 static int
600 _spdk_blob_serialize_add_page(const struct spdk_blob *blob,
601 			      struct spdk_blob_md_page **pages,
602 			      uint32_t *page_count,
603 			      struct spdk_blob_md_page **last_page)
604 {
605 	struct spdk_blob_md_page *page;
606 
607 	assert(pages != NULL);
608 	assert(page_count != NULL);
609 
610 	if (*page_count == 0) {
611 		assert(*pages == NULL);
612 		*page_count = 1;
613 		*pages = spdk_malloc(SPDK_BS_PAGE_SIZE, SPDK_BS_PAGE_SIZE,
614 				     NULL, SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
615 	} else {
616 		assert(*pages != NULL);
617 		(*page_count)++;
618 		*pages = spdk_realloc(*pages,
619 				      SPDK_BS_PAGE_SIZE * (*page_count),
620 				      SPDK_BS_PAGE_SIZE);
621 	}
622 
623 	if (*pages == NULL) {
624 		*page_count = 0;
625 		*last_page = NULL;
626 		return -ENOMEM;
627 	}
628 
629 	page = &(*pages)[*page_count - 1];
630 	memset(page, 0, sizeof(*page));
631 	page->id = blob->id;
632 	page->sequence_num = *page_count - 1;
633 	page->next = SPDK_INVALID_MD_PAGE;
634 	*last_page = page;
635 
636 	return 0;
637 }
638 
639 /* Transform the in-memory representation 'xattr' into an on-disk xattr descriptor.
640  * Update required_sz on both success and failure.
641  *
642  */
643 static int
644 _spdk_blob_serialize_xattr(const struct spdk_xattr *xattr,
645 			   uint8_t *buf, size_t buf_sz,
646 			   size_t *required_sz, bool internal)
647 {
648 	struct spdk_blob_md_descriptor_xattr	*desc;
649 
650 	*required_sz = sizeof(struct spdk_blob_md_descriptor_xattr) +
651 		       strlen(xattr->name) +
652 		       xattr->value_len;
653 
654 	if (buf_sz < *required_sz) {
655 		return -1;
656 	}
657 
658 	desc = (struct spdk_blob_md_descriptor_xattr *)buf;
659 
660 	desc->type = internal ? SPDK_MD_DESCRIPTOR_TYPE_XATTR_INTERNAL : SPDK_MD_DESCRIPTOR_TYPE_XATTR;
661 	desc->length = sizeof(desc->name_length) +
662 		       sizeof(desc->value_length) +
663 		       strlen(xattr->name) +
664 		       xattr->value_len;
665 	desc->name_length = strlen(xattr->name);
666 	desc->value_length = xattr->value_len;
667 
668 	memcpy(desc->name, xattr->name, desc->name_length);
669 	memcpy((void *)((uintptr_t)desc->name + desc->name_length),
670 	       xattr->value,
671 	       desc->value_length);
672 
673 	return 0;
674 }
675 
/* Serialize the blob's cluster map, starting at start_cluster, into one
 * run-length-encoded extent descriptor at *buf. Sequential allocated
 * clusters and runs of unallocated (LBA 0) clusters are each collapsed
 * into a single extent. On return *next_cluster is the first cluster NOT
 * serialized (== num_clusters when everything fit); *buf/*buf_sz are
 * advanced past the descriptor. If the buffer can't hold even one extent,
 * *next_cluster == start_cluster and nothing is written.
 */
static void
_spdk_blob_serialize_extent_rle(const struct spdk_blob *blob,
				uint64_t start_cluster, uint64_t *next_cluster,
				uint8_t **buf, size_t *buf_sz)
{
	struct spdk_blob_md_descriptor_extent_rle *desc_extent_rle;
	size_t cur_sz;
	uint64_t i, extent_idx;
	uint64_t lba, lba_per_cluster, lba_count;

	/* The buffer must have room for at least one extent */
	cur_sz = sizeof(struct spdk_blob_md_descriptor) + sizeof(desc_extent_rle->extents[0]);
	if (*buf_sz < cur_sz) {
		*next_cluster = start_cluster;
		return;
	}

	desc_extent_rle = (struct spdk_blob_md_descriptor_extent_rle *)*buf;
	desc_extent_rle->type = SPDK_MD_DESCRIPTOR_TYPE_EXTENT_RLE;

	lba_per_cluster = _spdk_bs_cluster_to_lba(blob->bs, 1);

	/* lba/lba_count track the run currently being accumulated. */
	lba = blob->active.clusters[start_cluster];
	lba_count = lba_per_cluster;
	extent_idx = 0;
	for (i = start_cluster + 1; i < blob->active.num_clusters; i++) {
		if ((lba + lba_count) == blob->active.clusters[i] && lba != 0) {
			/* Run-length encode sequential non-zero LBA */
			lba_count += lba_per_cluster;
			continue;
		} else if (lba == 0 && blob->active.clusters[i] == 0) {
			/* Run-length encode unallocated clusters */
			lba_count += lba_per_cluster;
			continue;
		}
		/* The run ended - emit it as one extent. */
		desc_extent_rle->extents[extent_idx].cluster_idx = lba / lba_per_cluster;
		desc_extent_rle->extents[extent_idx].length = lba_count / lba_per_cluster;
		extent_idx++;

		/* Reserve space for the next extent before writing it (all
		 * extent elements have the same size, so indexing past the
		 * last written entry here is only used for sizeof).
		 */
		cur_sz += sizeof(desc_extent_rle->extents[extent_idx]);

		if (*buf_sz < cur_sz) {
			/* If we ran out of buffer space, return */
			*next_cluster = i;
			goto finish;
		}

		lba = blob->active.clusters[i];
		lba_count = lba_per_cluster;
	}

	/* Emit the final (still-open) run. */
	desc_extent_rle->extents[extent_idx].cluster_idx = lba / lba_per_cluster;
	desc_extent_rle->extents[extent_idx].length = lba_count / lba_per_cluster;
	extent_idx++;

	*next_cluster = blob->active.num_clusters;

finish:
	desc_extent_rle->length = sizeof(desc_extent_rle->extents[0]) * extent_idx;
	*buf_sz -= sizeof(struct spdk_blob_md_descriptor) + desc_extent_rle->length;
	*buf += sizeof(struct spdk_blob_md_descriptor) + desc_extent_rle->length;

	return;
}
740 
/* Serialize the blob's entire cluster map as RLE extent descriptors,
 * allocating additional metadata pages whenever the current page fills up.
 * last_cluster is both the resume point and the progress indicator
 * reported back by _spdk_blob_serialize_extent_rle.
 */
static int
_spdk_blob_serialize_extents_rle(const struct spdk_blob *blob,
				 struct spdk_blob_md_page **pages,
				 struct spdk_blob_md_page *cur_page,
				 uint32_t *page_count, uint8_t **buf,
				 size_t *remaining_sz)
{
	uint64_t				last_cluster;
	int					rc;

	last_cluster = 0;
	while (last_cluster < blob->active.num_clusters) {
		_spdk_blob_serialize_extent_rle(blob, last_cluster, &last_cluster, buf, remaining_sz);

		if (last_cluster == blob->active.num_clusters) {
			/* Everything fit - done. */
			break;
		}

		/* Ran out of room in the current page; start a fresh one and
		 * continue from where the extent writer stopped.
		 */
		rc = _spdk_blob_serialize_add_page(blob, pages, page_count, &cur_page);
		if (rc < 0) {
			return rc;
		}

		*buf = (uint8_t *)cur_page->descriptors;
		*remaining_sz = sizeof(cur_page->descriptors);
	}

	return 0;
}
770 
771 static void
772 _spdk_blob_serialize_flags(const struct spdk_blob *blob,
773 			   uint8_t *buf, size_t *buf_sz)
774 {
775 	struct spdk_blob_md_descriptor_flags *desc;
776 
777 	/*
778 	 * Flags get serialized first, so we should always have room for the flags
779 	 *  descriptor.
780 	 */
781 	assert(*buf_sz >= sizeof(*desc));
782 
783 	desc = (struct spdk_blob_md_descriptor_flags *)buf;
784 	desc->type = SPDK_MD_DESCRIPTOR_TYPE_FLAGS;
785 	desc->length = sizeof(*desc) - sizeof(struct spdk_blob_md_descriptor);
786 	desc->invalid_flags = blob->invalid_flags;
787 	desc->data_ro_flags = blob->data_ro_flags;
788 	desc->md_ro_flags = blob->md_ro_flags;
789 
790 	*buf_sz -= sizeof(*desc);
791 }
792 
793 static int
794 _spdk_blob_serialize_xattrs(const struct spdk_blob *blob,
795 			    const struct spdk_xattr_tailq *xattrs, bool internal,
796 			    struct spdk_blob_md_page **pages,
797 			    struct spdk_blob_md_page *cur_page,
798 			    uint32_t *page_count, uint8_t **buf,
799 			    size_t *remaining_sz)
800 {
801 	const struct spdk_xattr	*xattr;
802 	int	rc;
803 
804 	TAILQ_FOREACH(xattr, xattrs, link) {
805 		size_t required_sz = 0;
806 
807 		rc = _spdk_blob_serialize_xattr(xattr,
808 						*buf, *remaining_sz,
809 						&required_sz, internal);
810 		if (rc < 0) {
811 			/* Need to add a new page to the chain */
812 			rc = _spdk_blob_serialize_add_page(blob, pages, page_count,
813 							   &cur_page);
814 			if (rc < 0) {
815 				spdk_free(*pages);
816 				*pages = NULL;
817 				*page_count = 0;
818 				return rc;
819 			}
820 
821 			*buf = (uint8_t *)cur_page->descriptors;
822 			*remaining_sz = sizeof(cur_page->descriptors);
823 
824 			/* Try again */
825 			required_sz = 0;
826 			rc = _spdk_blob_serialize_xattr(xattr,
827 							*buf, *remaining_sz,
828 							&required_sz, internal);
829 
830 			if (rc < 0) {
831 				spdk_free(*pages);
832 				*pages = NULL;
833 				*page_count = 0;
834 				return rc;
835 			}
836 		}
837 
838 		*remaining_sz -= required_sz;
839 		*buf += required_sz;
840 	}
841 
842 	return 0;
843 }
844 
/* Serialize a DIRTY blob's full metadata (flags, external and internal
 * xattrs, then the cluster map) into a freshly allocated chain of
 * metadata pages returned via *pages/*page_count.
 * On xattr failure the helpers have already freed and cleared *pages;
 * NOTE(review): on extent-serialization failure *pages may still hold the
 * chain for the caller to free - confirm against call sites.
 */
static int
_spdk_blob_serialize(const struct spdk_blob *blob, struct spdk_blob_md_page **pages,
		     uint32_t *page_count)
{
	struct spdk_blob_md_page		*cur_page;
	int					rc;
	uint8_t					*buf;
	size_t					remaining_sz;

	assert(pages != NULL);
	assert(page_count != NULL);
	assert(blob != NULL);
	assert(blob->state == SPDK_BLOB_STATE_DIRTY);

	*pages = NULL;
	*page_count = 0;

	/* A blob always has at least 1 page, even if it has no descriptors */
	rc = _spdk_blob_serialize_add_page(blob, pages, page_count, &cur_page);
	if (rc < 0) {
		return rc;
	}

	buf = (uint8_t *)cur_page->descriptors;
	remaining_sz = sizeof(cur_page->descriptors);

	/* Serialize flags */
	_spdk_blob_serialize_flags(blob, buf, &remaining_sz);
	buf += sizeof(struct spdk_blob_md_descriptor_flags);

	/* Serialize xattrs */
	rc = _spdk_blob_serialize_xattrs(blob, &blob->xattrs, false,
					 pages, cur_page, page_count, &buf, &remaining_sz);
	if (rc < 0) {
		return rc;
	}

	/* Serialize internal xattrs */
	rc = _spdk_blob_serialize_xattrs(blob, &blob->xattrs_internal, true,
					 pages, cur_page, page_count, &buf, &remaining_sz);
	if (rc < 0) {
		return rc;
	}

	/* Serialize extents */
	rc = _spdk_blob_serialize_extents_rle(blob, pages, cur_page, page_count, &buf, &remaining_sz);

	return rc;
}
894 
/* State carried across the asynchronous steps of loading one blob's
 * metadata chain from disk.
 */
struct spdk_blob_load_ctx {
	struct spdk_blob		*blob;		/* blob being loaded */

	struct spdk_blob_md_page	*pages;		/* pages read so far (DMA buffer) */
	uint32_t			num_pages;	/* number of pages in 'pages' */
	spdk_bs_sequence_t	        *seq;		/* sequence driving the reads */

	spdk_bs_sequence_cpl		cb_fn;		/* user completion */
	void				*cb_arg;	/* user completion argument */
};
905 
906 static uint32_t
907 _spdk_blob_md_page_calc_crc(void *page)
908 {
909 	uint32_t		crc;
910 
911 	crc = BLOB_CRC32C_INITIAL;
912 	crc = spdk_crc32c_update(page, SPDK_BS_PAGE_SIZE - 4, crc);
913 	crc ^= BLOB_CRC32C_INITIAL;
914 
915 	return crc;
916 
917 }
918 
919 static void
920 _spdk_blob_load_final(void *cb_arg, int bserrno)
921 {
922 	struct spdk_blob_load_ctx	*ctx = cb_arg;
923 	struct spdk_blob		*blob = ctx->blob;
924 
925 	if (bserrno == 0) {
926 		_spdk_blob_mark_clean(blob);
927 	}
928 
929 	ctx->cb_fn(ctx->seq, ctx->cb_arg, bserrno);
930 
931 	/* Free the memory */
932 	spdk_free(ctx->pages);
933 	free(ctx);
934 }
935 
936 static void
937 _spdk_blob_load_snapshot_cpl(void *cb_arg, struct spdk_blob *snapshot, int bserrno)
938 {
939 	struct spdk_blob_load_ctx	*ctx = cb_arg;
940 	struct spdk_blob		*blob = ctx->blob;
941 
942 	if (bserrno == 0) {
943 		blob->back_bs_dev = spdk_bs_create_blob_bs_dev(snapshot);
944 		if (blob->back_bs_dev == NULL) {
945 			bserrno = -ENOMEM;
946 		}
947 	}
948 	if (bserrno != 0) {
949 		SPDK_ERRLOG("Snapshot fail\n");
950 	}
951 
952 	_spdk_blob_load_final(ctx, bserrno);
953 }
954 
955 static void
956 _spdk_blob_load_backing_dev(void *cb_arg)
957 {
958 	struct spdk_blob_load_ctx	*ctx = cb_arg;
959 	struct spdk_blob		*blob = ctx->blob;
960 	const void			*value;
961 	size_t				len;
962 	int				rc;
963 
964 	if (spdk_blob_is_thin_provisioned(blob)) {
965 		rc = _spdk_blob_get_xattr_value(blob, BLOB_SNAPSHOT, &value, &len, true);
966 		if (rc == 0) {
967 			if (len != sizeof(spdk_blob_id)) {
968 				_spdk_blob_load_final(ctx, -EINVAL);
969 				return;
970 			}
971 			/* open snapshot blob and continue in the callback function */
972 			blob->parent_id = *(spdk_blob_id *)value;
973 			spdk_bs_open_blob(blob->bs, blob->parent_id,
974 					  _spdk_blob_load_snapshot_cpl, ctx);
975 			return;
976 		} else {
977 			/* add zeroes_dev for thin provisioned blob */
978 			blob->back_bs_dev = spdk_bs_create_zeroes_dev();
979 		}
980 	} else {
981 		/* standard blob */
982 		blob->back_bs_dev = NULL;
983 	}
984 	_spdk_blob_load_final(ctx, 0);
985 }
986 
987 static void
988 _spdk_blob_load_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
989 {
990 	struct spdk_blob_load_ctx	*ctx = cb_arg;
991 	struct spdk_blob		*blob = ctx->blob;
992 	struct spdk_blob_md_page	*page;
993 	int				rc;
994 	uint32_t			crc;
995 
996 	if (bserrno) {
997 		SPDK_ERRLOG("Metadata page read failed: %d\n", bserrno);
998 		_spdk_blob_load_final(ctx, bserrno);
999 		return;
1000 	}
1001 
1002 	page = &ctx->pages[ctx->num_pages - 1];
1003 	crc = _spdk_blob_md_page_calc_crc(page);
1004 	if (crc != page->crc) {
1005 		SPDK_ERRLOG("Metadata page %d crc mismatch\n", ctx->num_pages);
1006 		_spdk_blob_load_final(ctx, -EINVAL);
1007 		return;
1008 	}
1009 
1010 	if (page->next != SPDK_INVALID_MD_PAGE) {
1011 		uint32_t next_page = page->next;
1012 		uint64_t next_lba = _spdk_bs_md_page_to_lba(blob->bs, next_page);
1013 
1014 		/* Read the next page */
1015 		ctx->num_pages++;
1016 		ctx->pages = spdk_realloc(ctx->pages, (sizeof(*page) * ctx->num_pages),
1017 					  sizeof(*page));
1018 		if (ctx->pages == NULL) {
1019 			_spdk_blob_load_final(ctx, -ENOMEM);
1020 			return;
1021 		}
1022 
1023 		spdk_bs_sequence_read_dev(seq, &ctx->pages[ctx->num_pages - 1],
1024 					  next_lba,
1025 					  _spdk_bs_byte_to_lba(blob->bs, sizeof(*page)),
1026 					  _spdk_blob_load_cpl, ctx);
1027 		return;
1028 	}
1029 
1030 	/* Parse the pages */
1031 	rc = _spdk_blob_parse(ctx->pages, ctx->num_pages, blob);
1032 	if (rc) {
1033 		_spdk_blob_load_final(ctx, rc);
1034 		return;
1035 	}
1036 
1037 	_spdk_blob_load_backing_dev(ctx);
1038 }
1039 
1040 /* Load a blob from disk given a blobid */
1041 static void
1042 _spdk_blob_load(spdk_bs_sequence_t *seq, struct spdk_blob *blob,
1043 		spdk_bs_sequence_cpl cb_fn, void *cb_arg)
1044 {
1045 	struct spdk_blob_load_ctx *ctx;
1046 	struct spdk_blob_store *bs;
1047 	uint32_t page_num;
1048 	uint64_t lba;
1049 
1050 	_spdk_blob_verify_md_op(blob);
1051 
1052 	bs = blob->bs;
1053 
1054 	ctx = calloc(1, sizeof(*ctx));
1055 	if (!ctx) {
1056 		cb_fn(seq, cb_arg, -ENOMEM);
1057 		return;
1058 	}
1059 
1060 	ctx->blob = blob;
1061 	ctx->pages = spdk_realloc(ctx->pages, SPDK_BS_PAGE_SIZE, SPDK_BS_PAGE_SIZE);
1062 	if (!ctx->pages) {
1063 		free(ctx);
1064 		cb_fn(seq, cb_arg, -ENOMEM);
1065 		return;
1066 	}
1067 	ctx->num_pages = 1;
1068 	ctx->cb_fn = cb_fn;
1069 	ctx->cb_arg = cb_arg;
1070 	ctx->seq = seq;
1071 
1072 	page_num = _spdk_bs_blobid_to_page(blob->id);
1073 	lba = _spdk_bs_md_page_to_lba(blob->bs, page_num);
1074 
1075 	blob->state = SPDK_BLOB_STATE_LOADING;
1076 
1077 	spdk_bs_sequence_read_dev(seq, &ctx->pages[0], lba,
1078 				  _spdk_bs_byte_to_lba(bs, SPDK_BS_PAGE_SIZE),
1079 				  _spdk_blob_load_cpl, ctx);
1080 }
1081 
/* Context carried through the persist (metadata write) state machine. */
struct spdk_blob_persist_ctx {
	struct spdk_blob		*blob;

	/* Super block staging buffer; only allocated when the blobstore was
	 * still marked clean and must first be flagged dirty on disk. */
	struct spdk_bs_super_block	*super;

	/* Serialized metadata pages to be written out; freed on completion. */
	struct spdk_blob_md_page	*pages;

	spdk_bs_sequence_t		*seq;
	/* User completion callback and its argument. */
	spdk_bs_sequence_cpl		cb_fn;
	void				*cb_arg;
};
1093 
1094 static void
1095 spdk_bs_batch_clear_dev(struct spdk_blob_persist_ctx *ctx, spdk_bs_batch_t *batch, uint64_t lba,
1096 			uint32_t lba_count)
1097 {
1098 	switch (ctx->blob->clear_method) {
1099 	case BLOB_CLEAR_WITH_DEFAULT:
1100 	case BLOB_CLEAR_WITH_UNMAP:
1101 		spdk_bs_batch_unmap_dev(batch, lba, lba_count);
1102 		break;
1103 	case BLOB_CLEAR_WITH_WRITE_ZEROES:
1104 		spdk_bs_batch_write_zeroes_dev(batch, lba, lba_count);
1105 		break;
1106 	case BLOB_CLEAR_WITH_NONE:
1107 	default:
1108 		break;
1109 	}
1110 }
1111 
1112 static void
1113 _spdk_blob_persist_complete(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1114 {
1115 	struct spdk_blob_persist_ctx	*ctx = cb_arg;
1116 	struct spdk_blob		*blob = ctx->blob;
1117 
1118 	if (bserrno == 0) {
1119 		_spdk_blob_mark_clean(blob);
1120 	}
1121 
1122 	/* Call user callback */
1123 	ctx->cb_fn(seq, ctx->cb_arg, bserrno);
1124 
1125 	/* Free the memory */
1126 	spdk_free(ctx->pages);
1127 	free(ctx);
1128 }
1129 
/* Completion for the cluster-clearing batch: return the truncated clusters
 * to the blobstore's free pool, shrink the blob's cluster array to match the
 * new size, and finish the persist. */
static void
_spdk_blob_persist_clear_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx	*ctx = cb_arg;
	struct spdk_blob		*blob = ctx->blob;
	struct spdk_blob_store		*bs = blob->bs;
	size_t				i;

	/* Release all clusters that were truncated */
	for (i = blob->active.num_clusters; i < blob->active.cluster_array_size; i++) {
		uint32_t cluster_num = _spdk_bs_lba_to_cluster(bs, blob->active.clusters[i]);

		/* Nothing to release if it was not allocated */
		if (blob->active.clusters[i] != 0) {
			_spdk_bs_release_cluster(bs, cluster_num);
		}
	}

	if (blob->active.num_clusters == 0) {
		free(blob->active.clusters);
		blob->active.clusters = NULL;
		blob->active.cluster_array_size = 0;
	} else if (blob->active.num_clusters != blob->active.cluster_array_size) {
#ifndef __clang_analyzer__
		void *tmp;

		/* scan-build really can't figure reallocs, workaround it */
		tmp = realloc(blob->active.clusters, sizeof(*blob->active.clusters) * blob->active.num_clusters);
		assert(tmp != NULL);
		blob->active.clusters = tmp;
#endif
		/* The array only ever shrinks here, so the realloc above is not
		 * expected to fail (hence the assert rather than error handling). */
		blob->active.cluster_array_size = blob->active.num_clusters;
	}

	_spdk_blob_persist_complete(seq, ctx, bserrno);
}
1166 
/* Issue a batch of device operations that clear every cluster truncated off
 * the end of the blob.  Runs of physically contiguous clusters are coalesced
 * into a single device operation. */
static void
_spdk_blob_persist_clear_clusters(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx	*ctx = cb_arg;
	struct spdk_blob		*blob = ctx->blob;
	struct spdk_blob_store		*bs = blob->bs;
	spdk_bs_batch_t			*batch;
	size_t				i;
	uint64_t			lba;
	uint32_t			lba_count;

	/* Clusters don't move around in blobs. The list shrinks or grows
	 * at the end, but no changes ever occur in the middle of the list.
	 */

	batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_clear_clusters_cpl, ctx);

	/* Clear all clusters that were truncated.  lba/lba_count accumulate the
	 * current contiguous run; a run is flushed whenever the next cluster is
	 * not adjacent (or is unallocated, which clusters[i] == 0 denotes). */
	lba = 0;
	lba_count = 0;
	for (i = blob->active.num_clusters; i < blob->active.cluster_array_size; i++) {
		uint64_t next_lba = blob->active.clusters[i];
		uint32_t next_lba_count = _spdk_bs_cluster_to_lba(bs, 1);

		if (next_lba > 0 && (lba + lba_count) == next_lba) {
			/* This cluster is contiguous with the previous one. */
			lba_count += next_lba_count;
			continue;
		}

		/* This cluster is not contiguous with the previous one. */

		/* If a run of LBAs previously existing, clear them now */
		if (lba_count > 0) {
			spdk_bs_batch_clear_dev(ctx, batch, lba, lba_count);
		}

		/* Start building the next batch */
		lba = next_lba;
		if (next_lba > 0) {
			lba_count = next_lba_count;
		} else {
			lba_count = 0;
		}
	}

	/* If we ended with a contiguous set of LBAs, clear them now */
	if (lba_count > 0) {
		spdk_bs_batch_clear_dev(ctx, batch, lba, lba_count);
	}

	/* Closing the batch triggers the completion once all clears finish. */
	spdk_bs_batch_close(batch);
}
1220 
/* Completion for the page-zeroing batch: release the in-memory used-page
 * bits for metadata pages this blob no longer references. */
static void
_spdk_blob_persist_zero_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx	*ctx = cb_arg;
	struct spdk_blob		*blob = ctx->blob;
	struct spdk_blob_store		*bs = blob->bs;
	size_t				i;

	/* This loop starts at 1 because the first page is special and handled
	 * below. The pages (except the first) are never written in place, so
	 * every page in the clean list can have its used-page bit released
	 * now that its on-disk content has been zeroed.
	 */
	for (i = 1; i < blob->clean.num_pages; i++) {
		spdk_bit_array_clear(bs->used_md_pages, blob->clean.pages[i]);
	}

	/* active.num_pages == 0 means the blob is being deleted, so the fixed
	 * first page (derived from the blobid) is released as well. */
	if (blob->active.num_pages == 0) {
		uint32_t page_num;

		page_num = _spdk_bs_blobid_to_page(blob->id);
		spdk_bit_array_clear(bs->used_md_pages, page_num);
	}

	/* Move on to clearing clusters */
	_spdk_blob_persist_clear_clusters(seq, ctx, 0);
}
1247 
/* Zero, on disk, every metadata page this blob owned other than the root
 * page; the root page is zeroed too when the blob is being deleted. */
static void
_spdk_blob_persist_zero_pages(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx	*ctx = cb_arg;
	struct spdk_blob		*blob = ctx->blob;
	struct spdk_blob_store		*bs = blob->bs;
	uint64_t			lba;
	uint32_t			lba_count;
	spdk_bs_batch_t			*batch;
	size_t				i;

	batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_zero_pages_cpl, ctx);

	/* Each metadata page is one blobstore page long. */
	lba_count = _spdk_bs_byte_to_lba(bs, SPDK_BS_PAGE_SIZE);

	/* This loop starts at 1 because the first page is special and handled
	 * below. The pages (except the first) are never written in place,
	 * so any pages in the clean list must be zeroed.
	 */
	for (i = 1; i < blob->clean.num_pages; i++) {
		lba = _spdk_bs_md_page_to_lba(bs, blob->clean.pages[i]);

		spdk_bs_batch_write_zeroes_dev(batch, lba, lba_count);
	}

	/* The first page will only be zeroed if this is a delete. */
	if (blob->active.num_pages == 0) {
		uint32_t page_num;

		/* The first page in the metadata goes where the blobid indicates */
		page_num = _spdk_bs_blobid_to_page(blob->id);
		lba = _spdk_bs_md_page_to_lba(bs, page_num);

		spdk_bs_batch_write_zeroes_dev(batch, lba, lba_count);
	}

	spdk_bs_batch_close(batch);
}
1286 
1287 static void
1288 _spdk_blob_persist_write_page_root(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1289 {
1290 	struct spdk_blob_persist_ctx	*ctx = cb_arg;
1291 	struct spdk_blob		*blob = ctx->blob;
1292 	struct spdk_blob_store		*bs = blob->bs;
1293 	uint64_t			lba;
1294 	uint32_t			lba_count;
1295 	struct spdk_blob_md_page	*page;
1296 
1297 	if (blob->active.num_pages == 0) {
1298 		/* Move on to the next step */
1299 		_spdk_blob_persist_zero_pages(seq, ctx, 0);
1300 		return;
1301 	}
1302 
1303 	lba_count = _spdk_bs_byte_to_lba(bs, sizeof(*page));
1304 
1305 	page = &ctx->pages[0];
1306 	/* The first page in the metadata goes where the blobid indicates */
1307 	lba = _spdk_bs_md_page_to_lba(bs, _spdk_bs_blobid_to_page(blob->id));
1308 
1309 	spdk_bs_sequence_write_dev(seq, page, lba, lba_count,
1310 				   _spdk_blob_persist_zero_pages, ctx);
1311 }
1312 
/* Write every serialized metadata page except the root (index 0) as one
 * batch.  The root page is written afterwards by
 * _spdk_blob_persist_write_page_root, the batch's completion callback. */
static void
_spdk_blob_persist_write_page_chain(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx	*ctx = cb_arg;
	struct spdk_blob		*blob = ctx->blob;
	struct spdk_blob_store		*bs = blob->bs;
	uint64_t			lba;
	uint32_t			lba_count;
	struct spdk_blob_md_page	*page;
	spdk_bs_batch_t			*batch;
	size_t				i;

	/* Each metadata page is written as one unit. */
	lba_count = _spdk_bs_byte_to_lba(bs, sizeof(*page));

	batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_write_page_root, ctx);

	/* This starts at 1. The root page is not written until
	 * all of the others are finished
	 */
	for (i = 1; i < blob->active.num_pages; i++) {
		page = &ctx->pages[i];
		assert(page->sequence_num == i);

		/* active.pages[i] was claimed by _spdk_blob_persist_start. */
		lba = _spdk_bs_md_page_to_lba(bs, blob->active.pages[i]);

		spdk_bs_batch_write_dev(batch, page, lba, lba_count);
	}

	spdk_bs_batch_close(batch);
}
1347 
/* Resize the blob's in-memory view to sz clusters.  For non-thin-provisioned
 * blobs the new clusters are claimed here; the change only reaches disk on a
 * later persist.  Returns 0, -ENOSPC (not enough free clusters), or -ENOMEM
 * (cluster array could not be grown). */
static int
_spdk_blob_resize(struct spdk_blob *blob, uint64_t sz)
{
	uint64_t	i;
	uint64_t	*tmp;
	uint64_t	lfc; /* lowest free cluster */
	uint64_t	num_clusters;
	struct spdk_blob_store *bs;

	bs = blob->bs;

	_spdk_blob_verify_md_op(blob);

	if (blob->active.num_clusters == sz) {
		return 0;
	}

	if (blob->active.num_clusters < blob->active.cluster_array_size) {
		/* If this blob was resized to be larger, then smaller, then
		 * larger without syncing, then the cluster array already
		 * contains spare assigned clusters we can use.
		 */
		num_clusters = spdk_min(blob->active.cluster_array_size,
					sz);
	} else {
		num_clusters = blob->active.num_clusters;
	}

	/* Do two passes - one to verify that we can obtain enough clusters
	 * and another to actually claim them.
	 */

	if (spdk_blob_is_thin_provisioned(blob) == false) {
		lfc = 0;
		for (i = num_clusters; i < sz; i++) {
			lfc = spdk_bit_array_find_first_clear(bs->used_clusters, lfc);
			if (lfc == UINT32_MAX) {
				/* No more free clusters. Cannot satisfy the request */
				return -ENOSPC;
			}
			lfc++;
		}
	}

	if (sz > num_clusters) {
		/* Expand the cluster array if necessary.
		 * We only shrink the array when persisting.
		 */
		tmp = realloc(blob->active.clusters, sizeof(*blob->active.clusters) * sz);
		if (sz > 0 && tmp == NULL) {
			return -ENOMEM;
		}
		/* Zero the new tail so unallocated slots read as 0. */
		memset(tmp + blob->active.cluster_array_size, 0,
		       sizeof(*blob->active.clusters) * (sz - blob->active.cluster_array_size));
		blob->active.clusters = tmp;
		blob->active.cluster_array_size = sz;
	}

	blob->state = SPDK_BLOB_STATE_DIRTY;

	if (spdk_blob_is_thin_provisioned(blob) == false) {
		/* Second pass: claim the clusters verified above. */
		lfc = 0;
		for (i = num_clusters; i < sz; i++) {
			_spdk_bs_allocate_cluster(blob, i, &lfc, true);
			lfc++;
		}
	}

	blob->active.num_clusters = sz;

	return 0;
}
1420 
/* Entry point of the persist state machine once the super block is known to
 * be dirty: serialize the blob's metadata, claim metadata pages for it, and
 * start the chain of writes (pages, then root, then zero/clear old state). */
static void
_spdk_blob_persist_start(struct spdk_blob_persist_ctx *ctx)
{
	spdk_bs_sequence_t *seq = ctx->seq;
	struct spdk_blob *blob = ctx->blob;
	struct spdk_blob_store *bs = blob->bs;
	uint64_t i;
	uint32_t page_num;
	void *tmp;
	int rc;

	if (blob->active.num_pages == 0) {
		/* This is the signal that the blob should be deleted.
		 * Immediately jump to the clean up routine. */
		assert(blob->clean.num_pages > 0);
		blob->state = SPDK_BLOB_STATE_CLEAN;
		_spdk_blob_persist_zero_pages(seq, ctx, 0);
		return;

	}

	/* Generate the new metadata */
	rc = _spdk_blob_serialize(blob, &ctx->pages, &blob->active.num_pages);
	if (rc < 0) {
		_spdk_blob_persist_complete(seq, ctx, rc);
		return;
	}

	assert(blob->active.num_pages >= 1);

	/* Resize the cache of page indices */
	tmp = realloc(blob->active.pages, blob->active.num_pages * sizeof(*blob->active.pages));
	if (!tmp) {
		_spdk_blob_persist_complete(seq, ctx, -ENOMEM);
		return;
	}
	blob->active.pages = tmp;

	/* Assign this metadata to pages. This requires two passes -
	 * one to verify that there are enough pages and a second
	 * to actually claim them. */
	page_num = 0;
	/* Note that this loop starts at one. The first page location is fixed by the blobid. */
	for (i = 1; i < blob->active.num_pages; i++) {
		page_num = spdk_bit_array_find_first_clear(bs->used_md_pages, page_num);
		if (page_num == UINT32_MAX) {
			_spdk_blob_persist_complete(seq, ctx, -ENOMEM);
			return;
		}
		page_num++;
	}

	/* Second pass: claim each page, link the previous page to it via its
	 * 'next' field, and compute the previous page's crc once complete. */
	page_num = 0;
	blob->active.pages[0] = _spdk_bs_blobid_to_page(blob->id);
	for (i = 1; i < blob->active.num_pages; i++) {
		page_num = spdk_bit_array_find_first_clear(bs->used_md_pages, page_num);
		ctx->pages[i - 1].next = page_num;
		/* Now that previous metadata page is complete, calculate the crc for it. */
		ctx->pages[i - 1].crc = _spdk_blob_md_page_calc_crc(&ctx->pages[i - 1]);
		blob->active.pages[i] = page_num;
		spdk_bit_array_set(bs->used_md_pages, page_num);
		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Claiming page %u for blob %lu\n", page_num, blob->id);
		page_num++;
	}
	/* Finalize the crc of the last page in the chain. */
	ctx->pages[i - 1].crc = _spdk_blob_md_page_calc_crc(&ctx->pages[i - 1]);
	/* Start writing the metadata from last page to first */
	blob->state = SPDK_BLOB_STATE_CLEAN;
	_spdk_blob_persist_write_page_chain(seq, ctx, 0);
}
1490 
/* Super block write completed: the blobstore is now marked dirty on disk,
 * so metadata updates may proceed. */
static void
_spdk_blob_persist_dirty_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx *ctx = cb_arg;

	/* Mirror the on-disk dirty flag in memory so subsequent persists skip
	 * the super block update.
	 * NOTE(review): bserrno is ignored here, so a failed super block write
	 * still proceeds to persist - confirm this is intentional. */
	ctx->blob->bs->clean = 0;

	spdk_free(ctx->super);

	_spdk_blob_persist_start(ctx);
}
1502 
/* Forward declaration - defined later in this file. */
static void
_spdk_bs_write_super(spdk_bs_sequence_t *seq, struct spdk_blob_store *bs,
		     struct spdk_bs_super_block *super, spdk_bs_sequence_cpl cb_fn, void *cb_arg);
1506 
1507 
1508 static void
1509 _spdk_blob_persist_dirty(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1510 {
1511 	struct spdk_blob_persist_ctx *ctx = cb_arg;
1512 
1513 	ctx->super->clean = 0;
1514 	if (ctx->super->size == 0) {
1515 		ctx->super->size = ctx->blob->bs->dev->blockcnt * ctx->blob->bs->dev->blocklen;
1516 	}
1517 
1518 	_spdk_bs_write_super(seq, ctx->blob->bs, ctx->super, _spdk_blob_persist_dirty_cpl, ctx);
1519 }
1520 
1521 
1522 /* Write a blob to disk */
1523 static void
1524 _spdk_blob_persist(spdk_bs_sequence_t *seq, struct spdk_blob *blob,
1525 		   spdk_bs_sequence_cpl cb_fn, void *cb_arg)
1526 {
1527 	struct spdk_blob_persist_ctx *ctx;
1528 
1529 	_spdk_blob_verify_md_op(blob);
1530 
1531 	if (blob->state == SPDK_BLOB_STATE_CLEAN) {
1532 		cb_fn(seq, cb_arg, 0);
1533 		return;
1534 	}
1535 
1536 	ctx = calloc(1, sizeof(*ctx));
1537 	if (!ctx) {
1538 		cb_fn(seq, cb_arg, -ENOMEM);
1539 		return;
1540 	}
1541 	ctx->blob = blob;
1542 	ctx->seq = seq;
1543 	ctx->cb_fn = cb_fn;
1544 	ctx->cb_arg = cb_arg;
1545 
1546 	if (blob->bs->clean) {
1547 		ctx->super = spdk_zmalloc(sizeof(*ctx->super), 0x1000, NULL,
1548 					  SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
1549 		if (!ctx->super) {
1550 			cb_fn(seq, cb_arg, -ENOMEM);
1551 			free(ctx);
1552 			return;
1553 		}
1554 
1555 		spdk_bs_sequence_read_dev(seq, ctx->super, _spdk_bs_page_to_lba(blob->bs, 0),
1556 					  _spdk_bs_byte_to_lba(blob->bs, sizeof(*ctx->super)),
1557 					  _spdk_blob_persist_dirty, ctx);
1558 	} else {
1559 		_spdk_blob_persist_start(ctx);
1560 	}
1561 }
1562 
/* Context for allocate-and-copy of one cluster (thin-provisioned blobs). */
struct spdk_blob_copy_cluster_ctx {
	struct spdk_blob *blob;
	/* Staging buffer for one whole cluster read from the backing device;
	 * left NULL when the blob has no parent to copy from. */
	uint8_t *buf;
	/* First page of the cluster being populated (blob-relative). */
	uint64_t page;
	/* Index of the newly allocated cluster on the blobstore device. */
	uint64_t new_cluster;
	spdk_bs_sequence_t *seq;
};
1570 
/* Final completion of a cluster allocate-and-copy: re-drive every user op
 * queued on this channel while the allocation was in flight (or abort them
 * all on error), then release the copy context. */
static void
_spdk_blob_allocate_and_copy_cluster_cpl(void *cb_arg, int bserrno)
{
	struct spdk_blob_copy_cluster_ctx *ctx = cb_arg;
	struct spdk_bs_request_set *set = (struct spdk_bs_request_set *)ctx->seq;
	TAILQ_HEAD(, spdk_bs_request_set) requests;
	spdk_bs_user_op_t *op;

	/* Detach the entire pending list onto a local head before executing
	 * anything - presumably so ops that re-queue themselves on the channel
	 * during execution are not processed twice here (verify). */
	TAILQ_INIT(&requests);
	TAILQ_SWAP(&set->channel->need_cluster_alloc, &requests, spdk_bs_request_set, link);

	while (!TAILQ_EMPTY(&requests)) {
		op = TAILQ_FIRST(&requests);
		TAILQ_REMOVE(&requests, op, link);
		if (bserrno == 0) {
			spdk_bs_user_op_execute(op);
		} else {
			spdk_bs_user_op_abort(op);
		}
	}

	spdk_free(ctx->buf);
	free(ctx);
}
1595 
1596 static void
1597 _spdk_blob_insert_cluster_cpl(void *cb_arg, int bserrno)
1598 {
1599 	struct spdk_blob_copy_cluster_ctx *ctx = cb_arg;
1600 
1601 	if (bserrno) {
1602 		if (bserrno == -EEXIST) {
1603 			/* The metadata insert failed because another thread
1604 			 * allocated the cluster first. Free our cluster
1605 			 * but continue without error. */
1606 			bserrno = 0;
1607 		}
1608 		_spdk_bs_release_cluster(ctx->blob->bs, ctx->new_cluster);
1609 	}
1610 
1611 	spdk_bs_sequence_finish(ctx->seq, bserrno);
1612 }
1613 
1614 static void
1615 _spdk_blob_write_copy_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1616 {
1617 	struct spdk_blob_copy_cluster_ctx *ctx = cb_arg;
1618 	uint32_t cluster_number;
1619 
1620 	if (bserrno) {
1621 		/* The write failed, so jump to the final completion handler */
1622 		spdk_bs_sequence_finish(seq, bserrno);
1623 		return;
1624 	}
1625 
1626 	cluster_number = _spdk_bs_page_to_cluster(ctx->blob->bs, ctx->page);
1627 
1628 	_spdk_blob_insert_cluster_on_md_thread(ctx->blob, cluster_number, ctx->new_cluster,
1629 					       _spdk_blob_insert_cluster_cpl, ctx);
1630 }
1631 
1632 static void
1633 _spdk_blob_write_copy(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1634 {
1635 	struct spdk_blob_copy_cluster_ctx *ctx = cb_arg;
1636 
1637 	if (bserrno != 0) {
1638 		/* The read failed, so jump to the final completion handler */
1639 		spdk_bs_sequence_finish(seq, bserrno);
1640 		return;
1641 	}
1642 
1643 	/* Write whole cluster */
1644 	spdk_bs_sequence_write_dev(seq, ctx->buf,
1645 				   _spdk_bs_cluster_to_lba(ctx->blob->bs, ctx->new_cluster),
1646 				   _spdk_bs_cluster_to_lba(ctx->blob->bs, 1),
1647 				   _spdk_blob_write_copy_cpl, ctx);
1648 }
1649 
/* Allocate a cluster for a thin-provisioned blob at the position covering
 * io_unit, copy the parent's data into it (if the blob has a parent), and
 * then execute the queued user op.  Concurrent ops on the same channel are
 * parked on need_cluster_alloc until the allocation completes. */
static void
_spdk_bs_allocate_and_copy_cluster(struct spdk_blob *blob,
				   struct spdk_io_channel *_ch,
				   uint64_t io_unit, spdk_bs_user_op_t *op)
{
	struct spdk_bs_cpl cpl;
	struct spdk_bs_channel *ch;
	struct spdk_blob_copy_cluster_ctx *ctx;
	uint32_t cluster_start_page;
	uint32_t cluster_number;
	int rc;

	ch = spdk_io_channel_get_ctx(_ch);

	if (!TAILQ_EMPTY(&ch->need_cluster_alloc)) {
		/* There are already operations pending. Queue this user op
		 * and return because it will be re-executed when the outstanding
		 * cluster allocation completes. */
		TAILQ_INSERT_TAIL(&ch->need_cluster_alloc, op, link);
		return;
	}

	/* Round the io_unit offset down to the first page in the cluster */
	cluster_start_page = _spdk_bs_io_unit_to_cluster_start(blob, io_unit);

	/* Calculate which index in the metadata cluster array the corresponding
	 * cluster is supposed to be at. */
	cluster_number = _spdk_bs_io_unit_to_cluster_number(blob, io_unit);

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		spdk_bs_user_op_abort(op);
		return;
	}

	assert(blob->bs->cluster_sz % blob->back_bs_dev->blocklen == 0);

	ctx->blob = blob;
	ctx->page = cluster_start_page;

	/* A staging buffer is only needed when there is a parent to copy from. */
	if (blob->parent_id != SPDK_BLOBID_INVALID) {
		ctx->buf = spdk_malloc(blob->bs->cluster_sz, blob->back_bs_dev->blocklen,
				       NULL, SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
		if (!ctx->buf) {
			SPDK_ERRLOG("DMA allocation for cluster of size = %" PRIu32 " failed.\n",
				    blob->bs->cluster_sz);
			free(ctx);
			spdk_bs_user_op_abort(op);
			return;
		}
	}

	rc = _spdk_bs_allocate_cluster(blob, cluster_number, &ctx->new_cluster, false);
	if (rc != 0) {
		spdk_free(ctx->buf);
		free(ctx);
		spdk_bs_user_op_abort(op);
		return;
	}

	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
	cpl.u.blob_basic.cb_fn = _spdk_blob_allocate_and_copy_cluster_cpl;
	cpl.u.blob_basic.cb_arg = ctx;

	ctx->seq = spdk_bs_sequence_start(_ch, &cpl);
	if (!ctx->seq) {
		/* Unwind the allocation made above before aborting. */
		_spdk_bs_release_cluster(blob->bs, ctx->new_cluster);
		spdk_free(ctx->buf);
		free(ctx);
		spdk_bs_user_op_abort(op);
		return;
	}

	/* Queue the user op to block other incoming operations */
	TAILQ_INSERT_TAIL(&ch->need_cluster_alloc, op, link);

	if (blob->parent_id != SPDK_BLOBID_INVALID) {
		/* Read cluster from backing device */
		spdk_bs_sequence_read_bs_dev(ctx->seq, blob->back_bs_dev, ctx->buf,
					     _spdk_bs_dev_page_to_lba(blob->back_bs_dev, cluster_start_page),
					     _spdk_bs_dev_byte_to_lba(blob->back_bs_dev, blob->bs->cluster_sz),
					     _spdk_blob_write_copy, ctx);
	} else {
		/* No parent: nothing to copy, just record the new cluster. */
		_spdk_blob_insert_cluster_on_md_thread(ctx->blob, cluster_number, ctx->new_cluster,
						       _spdk_blob_insert_cluster_cpl, ctx);
	}
}
1737 
1738 static void
1739 _spdk_blob_calculate_lba_and_lba_count(struct spdk_blob *blob, uint64_t io_unit, uint64_t length,
1740 				       uint64_t *lba,	uint32_t *lba_count)
1741 {
1742 	*lba_count = length;
1743 
1744 	if (!_spdk_bs_io_unit_is_allocated(blob, io_unit)) {
1745 		assert(blob->back_bs_dev != NULL);
1746 		*lba = _spdk_bs_io_unit_to_back_dev_lba(blob, io_unit);
1747 		*lba_count = _spdk_bs_io_unit_to_back_dev_lba(blob, *lba_count);
1748 	} else {
1749 		*lba = _spdk_bs_blob_io_unit_to_lba(blob, io_unit);
1750 	}
1751 }
1752 
/* State for an I/O request that spans cluster boundaries and is issued as a
 * series of per-cluster sub-operations. */
struct op_split_ctx {
	struct spdk_blob *blob;
	struct spdk_io_channel *channel;
	/* Position and remaining size of the request, in io units. */
	uint64_t io_unit_offset;
	uint64_t io_units_remaining;
	/* Advancing pointer into the caller's payload buffer. */
	void *curr_payload;
	enum spdk_blob_op_type op_type;
	spdk_bs_sequence_t *seq;
};
1762 
/* Issue the next per-cluster piece of a split request.  Each piece covers at
 * most the distance to the next cluster boundary.  Called initially with
 * bserrno == 0 and then as the completion of each sub-operation. */
static void
_spdk_blob_request_submit_op_split_next(void *cb_arg, int bserrno)
{
	struct op_split_ctx	*ctx = cb_arg;
	struct spdk_blob	*blob = ctx->blob;
	struct spdk_io_channel	*ch = ctx->channel;
	enum spdk_blob_op_type	op_type = ctx->op_type;
	uint8_t			*buf = ctx->curr_payload;
	uint64_t		offset = ctx->io_unit_offset;
	uint64_t		length = ctx->io_units_remaining;
	uint64_t		op_length;

	/* Either a sub-operation failed, or the whole request is done. */
	if (bserrno != 0 || ctx->io_units_remaining == 0) {
		spdk_bs_sequence_finish(ctx->seq, bserrno);
		free(ctx);
		return;
	}

	/* Clamp this piece to the current cluster boundary. */
	op_length = spdk_min(length, _spdk_bs_num_io_units_to_cluster_boundary(blob,
			     offset));

	/* Update length and payload for next operation */
	ctx->io_units_remaining -= op_length;
	ctx->io_unit_offset += op_length;
	if (op_type == SPDK_BLOB_WRITE || op_type == SPDK_BLOB_READ) {
		/* Only data-carrying ops advance through the payload buffer. */
		ctx->curr_payload += op_length * blob->bs->io_unit_size;
	}

	switch (op_type) {
	case SPDK_BLOB_READ:
		spdk_blob_io_read(blob, ch, buf, offset, op_length,
				  _spdk_blob_request_submit_op_split_next, ctx);
		break;
	case SPDK_BLOB_WRITE:
		spdk_blob_io_write(blob, ch, buf, offset, op_length,
				   _spdk_blob_request_submit_op_split_next, ctx);
		break;
	case SPDK_BLOB_UNMAP:
		spdk_blob_io_unmap(blob, ch, offset, op_length,
				   _spdk_blob_request_submit_op_split_next, ctx);
		break;
	case SPDK_BLOB_WRITE_ZEROES:
		spdk_blob_io_write_zeroes(blob, ch, offset, op_length,
					  _spdk_blob_request_submit_op_split_next, ctx);
		break;
	case SPDK_BLOB_READV:
	case SPDK_BLOB_WRITEV:
		/* Vectored I/O is split elsewhere and must not reach here. */
		SPDK_ERRLOG("readv/write not valid\n");
		spdk_bs_sequence_finish(ctx->seq, -EINVAL);
		free(ctx);
		break;
	}
}
1816 
1817 static void
1818 _spdk_blob_request_submit_op_split(struct spdk_io_channel *ch, struct spdk_blob *blob,
1819 				   void *payload, uint64_t offset, uint64_t length,
1820 				   spdk_blob_op_complete cb_fn, void *cb_arg, enum spdk_blob_op_type op_type)
1821 {
1822 	struct op_split_ctx *ctx;
1823 	spdk_bs_sequence_t *seq;
1824 	struct spdk_bs_cpl cpl;
1825 
1826 	assert(blob != NULL);
1827 
1828 	ctx = calloc(1, sizeof(struct op_split_ctx));
1829 	if (ctx == NULL) {
1830 		cb_fn(cb_arg, -ENOMEM);
1831 		return;
1832 	}
1833 
1834 	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
1835 	cpl.u.blob_basic.cb_fn = cb_fn;
1836 	cpl.u.blob_basic.cb_arg = cb_arg;
1837 
1838 	seq = spdk_bs_sequence_start(ch, &cpl);
1839 	if (!seq) {
1840 		free(ctx);
1841 		cb_fn(cb_arg, -ENOMEM);
1842 		return;
1843 	}
1844 
1845 	ctx->blob = blob;
1846 	ctx->channel = ch;
1847 	ctx->curr_payload = payload;
1848 	ctx->io_unit_offset = offset;
1849 	ctx->io_units_remaining = length;
1850 	ctx->op_type = op_type;
1851 	ctx->seq = seq;
1852 
1853 	_spdk_blob_request_submit_op_split_next(ctx, 0);
1854 }
1855 
/* Submit an I/O request that fits entirely within one cluster.  Handles the
 * frozen-blob case (queue for later), reads from either the blob or its
 * backing device, writes (possibly triggering cluster allocate-and-copy for
 * thin-provisioned blobs), and unmap. */
static void
_spdk_blob_request_submit_op_single(struct spdk_io_channel *_ch, struct spdk_blob *blob,
				    void *payload, uint64_t offset, uint64_t length,
				    spdk_blob_op_complete cb_fn, void *cb_arg, enum spdk_blob_op_type op_type)
{
	struct spdk_bs_cpl cpl;
	uint64_t lba;
	uint32_t lba_count;

	assert(blob != NULL);

	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
	cpl.u.blob_basic.cb_fn = cb_fn;
	cpl.u.blob_basic.cb_arg = cb_arg;

	_spdk_blob_calculate_lba_and_lba_count(blob, offset, length, &lba, &lba_count);

	if (blob->frozen_refcnt) {
		/* This blob I/O is frozen */
		spdk_bs_user_op_t *op;
		struct spdk_bs_channel *bs_channel = spdk_io_channel_get_ctx(_ch);

		op = spdk_bs_user_op_alloc(_ch, &cpl, op_type, blob, payload, 0, offset, length);
		if (!op) {
			cb_fn(cb_arg, -ENOMEM);
			return;
		}

		/* Parked until the blob is unfrozen. */
		TAILQ_INSERT_TAIL(&bs_channel->queued_io, op, link);

		return;
	}

	switch (op_type) {
	case SPDK_BLOB_READ: {
		spdk_bs_batch_t *batch;

		batch = spdk_bs_batch_open(_ch, &cpl);
		if (!batch) {
			cb_fn(cb_arg, -ENOMEM);
			return;
		}

		if (_spdk_bs_io_unit_is_allocated(blob, offset)) {
			/* Read from the blob */
			spdk_bs_batch_read_dev(batch, payload, lba, lba_count);
		} else {
			/* Read from the backing block device */
			spdk_bs_batch_read_bs_dev(batch, blob->back_bs_dev, payload, lba, lba_count);
		}

		spdk_bs_batch_close(batch);
		break;
	}
	case SPDK_BLOB_WRITE:
	case SPDK_BLOB_WRITE_ZEROES: {
		if (_spdk_bs_io_unit_is_allocated(blob, offset)) {
			/* Write to the blob */
			spdk_bs_batch_t *batch;

			if (lba_count == 0) {
				cb_fn(cb_arg, 0);
				return;
			}

			batch = spdk_bs_batch_open(_ch, &cpl);
			if (!batch) {
				cb_fn(cb_arg, -ENOMEM);
				return;
			}

			if (op_type == SPDK_BLOB_WRITE) {
				spdk_bs_batch_write_dev(batch, payload, lba, lba_count);
			} else {
				spdk_bs_batch_write_zeroes_dev(batch, lba, lba_count);
			}

			spdk_bs_batch_close(batch);
		} else {
			/* Queue this operation and allocate the cluster */
			spdk_bs_user_op_t *op;

			op = spdk_bs_user_op_alloc(_ch, &cpl, op_type, blob, payload, 0, offset, length);
			if (!op) {
				cb_fn(cb_arg, -ENOMEM);
				return;
			}

			_spdk_bs_allocate_and_copy_cluster(blob, _ch, offset, op);
		}
		break;
	}
	case SPDK_BLOB_UNMAP: {
		spdk_bs_batch_t *batch;

		batch = spdk_bs_batch_open(_ch, &cpl);
		if (!batch) {
			cb_fn(cb_arg, -ENOMEM);
			return;
		}

		/* Unmapping an unallocated region is a no-op (empty batch). */
		if (_spdk_bs_io_unit_is_allocated(blob, offset)) {
			spdk_bs_batch_unmap_dev(batch, lba, lba_count);
		}

		spdk_bs_batch_close(batch);
		break;
	}
	case SPDK_BLOB_READV:
	case SPDK_BLOB_WRITEV:
		/* Vectored I/O takes the rw_iov path and must not reach here. */
		SPDK_ERRLOG("readv/write not valid\n");
		cb_fn(cb_arg, -EINVAL);
		break;
	}
}
1971 
/* Common entry point for non-vectored blob I/O: validate the request, then
 * dispatch to the single-cluster or split (multi-cluster) submit path. */
static void
_spdk_blob_request_submit_op(struct spdk_blob *blob, struct spdk_io_channel *_channel,
			     void *payload, uint64_t offset, uint64_t length,
			     spdk_blob_op_complete cb_fn, void *cb_arg, enum spdk_blob_op_type op_type)
{
	assert(blob != NULL);

	/* Anything other than a read is rejected on a read-only blob. */
	if (blob->data_ro && op_type != SPDK_BLOB_READ) {
		cb_fn(cb_arg, -EPERM);
		return;
	}

	/* Reject I/O past the end of the blob.
	 * NOTE(review): offset/length are in io units while
	 * _spdk_bs_cluster_to_lba() yields LBAs - confirm these units agree
	 * whenever io_unit_size differs from the device block size. */
	if (offset + length > _spdk_bs_cluster_to_lba(blob->bs, blob->active.num_clusters)) {
		cb_fn(cb_arg, -EINVAL);
		return;
	}
	if (length <= _spdk_bs_num_io_units_to_cluster_boundary(blob, offset)) {
		/* Fits within one cluster - submit directly. */
		_spdk_blob_request_submit_op_single(_channel, blob, payload, offset, length,
						    cb_fn, cb_arg, op_type);
	} else {
		/* Spans a cluster boundary - split into per-cluster pieces. */
		_spdk_blob_request_submit_op_split(_channel, blob, payload, offset, length,
						   cb_fn, cb_arg, op_type);
	}
}
1996 
/* State for a vectored (readv/writev) request split across clusters. */
struct rw_iov_ctx {
	struct spdk_blob *blob;
	struct spdk_io_channel *channel;
	spdk_blob_op_complete cb_fn;
	void *cb_arg;
	/* true for readv, false for writev. */
	bool read;
	/* Number of entries in the caller's iov array. */
	int iovcnt;
	struct iovec *orig_iov;
	/* Progress through the request, all in io units. */
	uint64_t io_unit_offset;
	uint64_t io_units_remaining;
	uint64_t io_units_done;
	/* Scratch iov array (flexible member, sized at allocation) used to
	 * build the sub-iov for each per-cluster piece. */
	struct iovec iov[0];
};
2010 
/* Completion for a non-split vectored I/O: just finish the sequence. */
static void
_spdk_rw_iov_done(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	assert(cb_arg == NULL);
	spdk_bs_sequence_finish(seq, bserrno);
}
2017 
/* Issue the next per-cluster piece of a split vectored request.  Builds a
 * sub-iov covering this piece from the caller's original iov array, then
 * submits it; re-invoked as each piece's completion callback. */
static void
_spdk_rw_iov_split_next(void *cb_arg, int bserrno)
{
	struct rw_iov_ctx *ctx = cb_arg;
	struct spdk_blob *blob = ctx->blob;
	struct iovec *iov, *orig_iov;
	int iovcnt;
	size_t orig_iovoff;
	uint64_t io_units_count, io_units_to_boundary, io_unit_offset;
	uint64_t byte_count;

	/* Either a piece failed, or the whole request is done. */
	if (bserrno != 0 || ctx->io_units_remaining == 0) {
		ctx->cb_fn(ctx->cb_arg, bserrno);
		free(ctx);
		return;
	}

	/* This piece covers at most up to the next cluster boundary. */
	io_unit_offset = ctx->io_unit_offset;
	io_units_to_boundary = _spdk_bs_num_io_units_to_cluster_boundary(blob, io_unit_offset);
	io_units_count = spdk_min(ctx->io_units_remaining, io_units_to_boundary);
	/*
	 * Get index and offset into the original iov array for our current position in the I/O sequence.
	 *  byte_count will keep track of how many bytes remaining until orig_iov and orig_iovoff will
	 *  point to the current position in the I/O sequence.
	 */
	byte_count = ctx->io_units_done * blob->bs->io_unit_size;
	orig_iov = &ctx->orig_iov[0];
	orig_iovoff = 0;
	while (byte_count > 0) {
		if (byte_count >= orig_iov->iov_len) {
			byte_count -= orig_iov->iov_len;
			orig_iov++;
		} else {
			orig_iovoff = byte_count;
			byte_count = 0;
		}
	}

	/*
	 * Build an iov array for the next I/O in the sequence.  byte_count will keep track of how many
	 *  bytes of this next I/O remain to be accounted for in the new iov array.
	 */
	byte_count = io_units_count * blob->bs->io_unit_size;
	iov = &ctx->iov[0];
	iovcnt = 0;
	while (byte_count > 0) {
		assert(iovcnt < ctx->iovcnt);
		iov->iov_len = spdk_min(byte_count, orig_iov->iov_len - orig_iovoff);
		iov->iov_base = orig_iov->iov_base + orig_iovoff;
		byte_count -= iov->iov_len;
		orig_iovoff = 0;
		orig_iov++;
		iov++;
		iovcnt++;
	}

	/* Advance the request's progress counters before submitting. */
	ctx->io_unit_offset += io_units_count;
	ctx->io_units_remaining -= io_units_count;
	ctx->io_units_done += io_units_count;
	iov = &ctx->iov[0];

	if (ctx->read) {
		spdk_blob_io_readv(ctx->blob, ctx->channel, iov, iovcnt, io_unit_offset,
				   io_units_count, _spdk_rw_iov_split_next, ctx);
	} else {
		spdk_blob_io_writev(ctx->blob, ctx->channel, iov, iovcnt, io_unit_offset,
				    io_units_count, _spdk_rw_iov_split_next, ctx);
	}
}
2087 
/*
 * Submit a vectored read or write to a blob.
 *
 * offset and length are in io units.  Fails with -EPERM for writes to a
 * data-read-only blob and -EINVAL when the range exceeds the blob's
 * allocated cluster count.  I/O that fits within one cluster is issued
 * directly (or queued if the blob is frozen, or routed through cluster
 * allocation for unallocated thin-provisioned writes); I/O crossing a
 * cluster boundary is split via rw_iov_ctx / _spdk_rw_iov_split_next.
 */
static void
_spdk_blob_request_submit_rw_iov(struct spdk_blob *blob, struct spdk_io_channel *_channel,
				 struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
				 spdk_blob_op_complete cb_fn, void *cb_arg, bool read)
{
	struct spdk_bs_cpl	cpl;

	assert(blob != NULL);

	if (!read && blob->data_ro) {
		cb_fn(cb_arg, -EPERM);
		return;
	}

	if (length == 0) {
		cb_fn(cb_arg, 0);
		return;
	}

	if (offset + length > _spdk_bs_cluster_to_lba(blob->bs, blob->active.num_clusters)) {
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	/*
	 * For now, we implement readv/writev using a sequence (instead of a batch) to account for having
	 *  to split a request that spans a cluster boundary.  For I/O that do not span a cluster boundary,
	 *  there will be no noticeable difference compared to using a batch.  For I/O that do span a cluster
	 *  boundary, the target LBAs (after blob offset to LBA translation) may not be contiguous, so we need
	 *  to allocate a separate iov array and split the I/O such that none of the resulting
	 *  smaller I/O cross a cluster boundary.  These smaller I/O will be issued in sequence (not in parallel)
	 *  but since this case happens very infrequently, any performance impact will be negligible.
	 *
	 * This could be optimized in the future to allocate a big enough iov array to account for all of the iovs
	 *  for all of the smaller I/Os, pre-build all of the iov arrays for the smaller I/Os, then issue them
	 *  in a batch.  That would also require creating an intermediate spdk_bs_cpl that would get called
	 *  when the batch was completed, to allow for freeing the memory for the iov arrays.
	 */
	if (spdk_likely(length <= _spdk_bs_num_io_units_to_cluster_boundary(blob, offset))) {
		uint32_t lba_count;
		uint64_t lba;

		cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
		cpl.u.blob_basic.cb_fn = cb_fn;
		cpl.u.blob_basic.cb_arg = cb_arg;

		if (blob->frozen_refcnt) {
			/* This blob I/O is frozen */
			enum spdk_blob_op_type op_type;
			spdk_bs_user_op_t *op;
			struct spdk_bs_channel *bs_channel = spdk_io_channel_get_ctx(_channel);

			op_type = read ? SPDK_BLOB_READV : SPDK_BLOB_WRITEV;
			op = spdk_bs_user_op_alloc(_channel, &cpl, op_type, blob, iov, iovcnt, offset, length);
			if (!op) {
				cb_fn(cb_arg, -ENOMEM);
				return;
			}

			/* Parked until the blob is unfrozen; aborted if the channel is destroyed first. */
			TAILQ_INSERT_TAIL(&bs_channel->queued_io, op, link);

			return;
		}

		_spdk_blob_calculate_lba_and_lba_count(blob, offset, length, &lba, &lba_count);

		if (read) {
			spdk_bs_sequence_t *seq;

			seq = spdk_bs_sequence_start(_channel, &cpl);
			if (!seq) {
				cb_fn(cb_arg, -ENOMEM);
				return;
			}

			if (_spdk_bs_io_unit_is_allocated(blob, offset)) {
				spdk_bs_sequence_readv_dev(seq, iov, iovcnt, lba, lba_count, _spdk_rw_iov_done, NULL);
			} else {
				/* Unallocated range: read from the backing device (e.g. snapshot/zeroes). */
				spdk_bs_sequence_readv_bs_dev(seq, blob->back_bs_dev, iov, iovcnt, lba, lba_count,
							      _spdk_rw_iov_done, NULL);
			}
		} else {
			if (_spdk_bs_io_unit_is_allocated(blob, offset)) {
				spdk_bs_sequence_t *seq;

				seq = spdk_bs_sequence_start(_channel, &cpl);
				if (!seq) {
					cb_fn(cb_arg, -ENOMEM);
					return;
				}

				spdk_bs_sequence_writev_dev(seq, iov, iovcnt, lba, lba_count, _spdk_rw_iov_done, NULL);
			} else {
				/* Queue this operation and allocate the cluster */
				spdk_bs_user_op_t *op;

				op = spdk_bs_user_op_alloc(_channel, &cpl, SPDK_BLOB_WRITEV, blob, iov, iovcnt, offset,
							   length);
				if (!op) {
					cb_fn(cb_arg, -ENOMEM);
					return;
				}

				_spdk_bs_allocate_and_copy_cluster(blob, _channel, offset, op);
			}
		}
	} else {
		struct rw_iov_ctx *ctx;

		/* Cluster-spanning request: allocate a split context with room for iovcnt scratch iovecs. */
		ctx = calloc(1, sizeof(struct rw_iov_ctx) + iovcnt * sizeof(struct iovec));
		if (ctx == NULL) {
			cb_fn(cb_arg, -ENOMEM);
			return;
		}

		ctx->blob = blob;
		ctx->channel = _channel;
		ctx->cb_fn = cb_fn;
		ctx->cb_arg = cb_arg;
		ctx->read = read;
		ctx->orig_iov = iov;
		ctx->iovcnt = iovcnt;
		ctx->io_unit_offset = offset;
		ctx->io_units_remaining = length;
		ctx->io_units_done = 0;

		_spdk_rw_iov_split_next(ctx, 0);
	}
}
2217 
2218 static struct spdk_blob *
2219 _spdk_blob_lookup(struct spdk_blob_store *bs, spdk_blob_id blobid)
2220 {
2221 	struct spdk_blob *blob;
2222 
2223 	TAILQ_FOREACH(blob, &bs->blobs, link) {
2224 		if (blob->id == blobid) {
2225 			return blob;
2226 		}
2227 	}
2228 
2229 	return NULL;
2230 }
2231 
2232 static void
2233 _spdk_blob_get_snapshot_and_clone_entries(struct spdk_blob *blob,
2234 		struct spdk_blob_list **snapshot_entry, struct spdk_blob_list **clone_entry)
2235 {
2236 	assert(blob != NULL);
2237 	*snapshot_entry = NULL;
2238 	*clone_entry = NULL;
2239 
2240 	if (blob->parent_id == SPDK_BLOBID_INVALID) {
2241 		return;
2242 	}
2243 
2244 	TAILQ_FOREACH(*snapshot_entry, &blob->bs->snapshots, link) {
2245 		if ((*snapshot_entry)->id == blob->parent_id) {
2246 			break;
2247 		}
2248 	}
2249 
2250 	if (*snapshot_entry != NULL) {
2251 		TAILQ_FOREACH(*clone_entry, &(*snapshot_entry)->clones, link) {
2252 			if ((*clone_entry)->id == blob->id) {
2253 				break;
2254 			}
2255 		}
2256 
2257 		assert(clone_entry != NULL);
2258 	}
2259 }
2260 
2261 static int
2262 _spdk_bs_channel_create(void *io_device, void *ctx_buf)
2263 {
2264 	struct spdk_blob_store		*bs = io_device;
2265 	struct spdk_bs_channel		*channel = ctx_buf;
2266 	struct spdk_bs_dev		*dev;
2267 	uint32_t			max_ops = bs->max_channel_ops;
2268 	uint32_t			i;
2269 
2270 	dev = bs->dev;
2271 
2272 	channel->req_mem = calloc(max_ops, sizeof(struct spdk_bs_request_set));
2273 	if (!channel->req_mem) {
2274 		return -1;
2275 	}
2276 
2277 	TAILQ_INIT(&channel->reqs);
2278 
2279 	for (i = 0; i < max_ops; i++) {
2280 		TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link);
2281 	}
2282 
2283 	channel->bs = bs;
2284 	channel->dev = dev;
2285 	channel->dev_channel = dev->create_channel(dev);
2286 
2287 	if (!channel->dev_channel) {
2288 		SPDK_ERRLOG("Failed to create device channel.\n");
2289 		free(channel->req_mem);
2290 		return -1;
2291 	}
2292 
2293 	TAILQ_INIT(&channel->need_cluster_alloc);
2294 	TAILQ_INIT(&channel->queued_io);
2295 
2296 	return 0;
2297 }
2298 
2299 static void
2300 _spdk_bs_channel_destroy(void *io_device, void *ctx_buf)
2301 {
2302 	struct spdk_bs_channel *channel = ctx_buf;
2303 	spdk_bs_user_op_t *op;
2304 
2305 	while (!TAILQ_EMPTY(&channel->need_cluster_alloc)) {
2306 		op = TAILQ_FIRST(&channel->need_cluster_alloc);
2307 		TAILQ_REMOVE(&channel->need_cluster_alloc, op, link);
2308 		spdk_bs_user_op_abort(op);
2309 	}
2310 
2311 	while (!TAILQ_EMPTY(&channel->queued_io)) {
2312 		op = TAILQ_FIRST(&channel->queued_io);
2313 		TAILQ_REMOVE(&channel->queued_io, op, link);
2314 		spdk_bs_user_op_abort(op);
2315 	}
2316 
2317 	free(channel->req_mem);
2318 	channel->dev->destroy_channel(channel->dev, channel->dev_channel);
2319 }
2320 
2321 static void
2322 _spdk_bs_dev_destroy(void *io_device)
2323 {
2324 	struct spdk_blob_store *bs = io_device;
2325 	struct spdk_blob	*blob, *blob_tmp;
2326 
2327 	bs->dev->destroy(bs->dev);
2328 
2329 	TAILQ_FOREACH_SAFE(blob, &bs->blobs, link, blob_tmp) {
2330 		TAILQ_REMOVE(&bs->blobs, blob, link);
2331 		_spdk_blob_free(blob);
2332 	}
2333 
2334 	pthread_mutex_destroy(&bs->used_clusters_mutex);
2335 
2336 	spdk_bit_array_free(&bs->used_blobids);
2337 	spdk_bit_array_free(&bs->used_md_pages);
2338 	spdk_bit_array_free(&bs->used_clusters);
2339 	/*
2340 	 * If this function is called for any reason except a successful unload,
2341 	 * the unload_cpl type will be NONE and this will be a nop.
2342 	 */
2343 	spdk_bs_call_cpl(&bs->unload_cpl, bs->unload_err);
2344 
2345 	free(bs);
2346 }
2347 
2348 static int
2349 _spdk_bs_blob_list_add(struct spdk_blob *blob)
2350 {
2351 	spdk_blob_id snapshot_id;
2352 	struct spdk_blob_list *snapshot_entry = NULL;
2353 	struct spdk_blob_list *clone_entry = NULL;
2354 
2355 	assert(blob != NULL);
2356 
2357 	snapshot_id = blob->parent_id;
2358 	if (snapshot_id == SPDK_BLOBID_INVALID) {
2359 		return 0;
2360 	}
2361 
2362 	snapshot_entry = _spdk_bs_get_snapshot_entry(blob->bs, snapshot_id);
2363 	if (snapshot_entry == NULL) {
2364 		/* Snapshot not found */
2365 		snapshot_entry = calloc(1, sizeof(struct spdk_blob_list));
2366 		if (snapshot_entry == NULL) {
2367 			return -ENOMEM;
2368 		}
2369 		snapshot_entry->id = snapshot_id;
2370 		TAILQ_INIT(&snapshot_entry->clones);
2371 		TAILQ_INSERT_TAIL(&blob->bs->snapshots, snapshot_entry, link);
2372 	} else {
2373 		TAILQ_FOREACH(clone_entry, &snapshot_entry->clones, link) {
2374 			if (clone_entry->id == blob->id) {
2375 				break;
2376 			}
2377 		}
2378 	}
2379 
2380 	if (clone_entry == NULL) {
2381 		/* Clone not found */
2382 		clone_entry = calloc(1, sizeof(struct spdk_blob_list));
2383 		if (clone_entry == NULL) {
2384 			return -ENOMEM;
2385 		}
2386 		clone_entry->id = blob->id;
2387 		TAILQ_INIT(&clone_entry->clones);
2388 		TAILQ_INSERT_TAIL(&snapshot_entry->clones, clone_entry, link);
2389 		snapshot_entry->clone_count++;
2390 	}
2391 
2392 	return 0;
2393 }
2394 
2395 static void
2396 _spdk_bs_blob_list_remove(struct spdk_blob *blob)
2397 {
2398 	struct spdk_blob_list *snapshot_entry = NULL;
2399 	struct spdk_blob_list *clone_entry = NULL;
2400 
2401 	_spdk_blob_get_snapshot_and_clone_entries(blob, &snapshot_entry, &clone_entry);
2402 
2403 	if (snapshot_entry == NULL) {
2404 		return;
2405 	}
2406 
2407 	blob->parent_id = SPDK_BLOBID_INVALID;
2408 	TAILQ_REMOVE(&snapshot_entry->clones, clone_entry, link);
2409 	free(clone_entry);
2410 
2411 	snapshot_entry->clone_count--;
2412 }
2413 
2414 static int
2415 _spdk_bs_blob_list_free(struct spdk_blob_store *bs)
2416 {
2417 	struct spdk_blob_list *snapshot_entry;
2418 	struct spdk_blob_list *snapshot_entry_tmp;
2419 	struct spdk_blob_list *clone_entry;
2420 	struct spdk_blob_list *clone_entry_tmp;
2421 
2422 	TAILQ_FOREACH_SAFE(snapshot_entry, &bs->snapshots, link, snapshot_entry_tmp) {
2423 		TAILQ_FOREACH_SAFE(clone_entry, &snapshot_entry->clones, link, clone_entry_tmp) {
2424 			TAILQ_REMOVE(&snapshot_entry->clones, clone_entry, link);
2425 			free(clone_entry);
2426 		}
2427 		TAILQ_REMOVE(&bs->snapshots, snapshot_entry, link);
2428 		free(snapshot_entry);
2429 	}
2430 
2431 	return 0;
2432 }
2433 
/*
 * Begin tearing down a blobstore: release the snapshot/clone lists and the
 * metadata thread, then unregister the io_device.  The remaining cleanup
 * (and the free of bs itself) happens in _spdk_bs_dev_destroy once all
 * channels are gone.
 */
static void
_spdk_bs_free(struct spdk_blob_store *bs)
{
	_spdk_bs_blob_list_free(bs);

	spdk_bs_unregister_md_thread(bs);
	spdk_io_device_unregister(bs, _spdk_bs_dev_destroy);
}
2442 
2443 void
2444 spdk_bs_opts_init(struct spdk_bs_opts *opts)
2445 {
2446 	opts->cluster_sz = SPDK_BLOB_OPTS_CLUSTER_SZ;
2447 	opts->num_md_pages = SPDK_BLOB_OPTS_NUM_MD_PAGES;
2448 	opts->max_md_ops = SPDK_BLOB_OPTS_MAX_MD_OPS;
2449 	opts->max_channel_ops = SPDK_BLOB_OPTS_DEFAULT_CHANNEL_OPS;
2450 	opts->clear_method = BS_CLEAR_WITH_UNMAP;
2451 	memset(&opts->bstype, 0, sizeof(opts->bstype));
2452 	opts->iter_cb_fn = NULL;
2453 	opts->iter_cb_arg = NULL;
2454 }
2455 
2456 static int
2457 _spdk_bs_opts_verify(struct spdk_bs_opts *opts)
2458 {
2459 	if (opts->cluster_sz == 0 || opts->num_md_pages == 0 || opts->max_md_ops == 0 ||
2460 	    opts->max_channel_ops == 0) {
2461 		SPDK_ERRLOG("Blobstore options cannot be set to 0\n");
2462 		return -1;
2463 	}
2464 
2465 	return 0;
2466 }
2467 
/*
 * Allocate and initialize an in-memory spdk_blob_store over dev using opts.
 * On success stores the new blobstore in *_bs and returns 0; on failure
 * returns a negative errno and leaves *_bs untouched.  Ownership of dev is
 * not taken on failure - the caller remains responsible for it.
 */
static int
_spdk_bs_alloc(struct spdk_bs_dev *dev, struct spdk_bs_opts *opts, struct spdk_blob_store **_bs)
{
	struct spdk_blob_store	*bs;
	uint64_t dev_size;
	int rc;

	/* NOTE(review): assumes blocklen * blockcnt does not overflow uint64_t
	 * for any supported device - confirm against spdk_bs_dev field types. */
	dev_size = dev->blocklen * dev->blockcnt;
	if (dev_size < opts->cluster_sz) {
		/* Device size cannot be smaller than cluster size of blobstore */
		SPDK_INFOLOG(SPDK_LOG_BLOB, "Device size %" PRIu64 " is smaller than cluster size %" PRIu32 "\n",
			     dev_size, opts->cluster_sz);
		return -ENOSPC;
	}
	if (opts->cluster_sz < SPDK_BS_PAGE_SIZE) {
		/* Cluster size cannot be smaller than page size */
		SPDK_ERRLOG("Cluster size %" PRIu32 " is smaller than page size %d\n",
			    opts->cluster_sz, SPDK_BS_PAGE_SIZE);
		return -EINVAL;
	}
	bs = calloc(1, sizeof(struct spdk_blob_store));
	if (!bs) {
		return -ENOMEM;
	}

	TAILQ_INIT(&bs->blobs);
	TAILQ_INIT(&bs->snapshots);
	bs->dev = dev;
	/* All metadata operations must run on the thread that created the bs. */
	bs->md_thread = spdk_get_thread();
	assert(bs->md_thread != NULL);

	/*
	 * Do not use _spdk_bs_lba_to_cluster() here since blockcnt may not be an
	 *  even multiple of the cluster size.
	 */
	bs->cluster_sz = opts->cluster_sz;
	bs->total_clusters = dev->blockcnt / (bs->cluster_sz / dev->blocklen);
	bs->pages_per_cluster = bs->cluster_sz / SPDK_BS_PAGE_SIZE;
	bs->num_free_clusters = bs->total_clusters;
	bs->used_clusters = spdk_bit_array_create(bs->total_clusters);
	bs->io_unit_size = dev->blocklen;
	if (bs->used_clusters == NULL) {
		free(bs);
		return -ENOMEM;
	}

	bs->max_channel_ops = opts->max_channel_ops;
	bs->super_blob = SPDK_BLOBID_INVALID;
	memcpy(&bs->bstype, &opts->bstype, sizeof(opts->bstype));

	/* The metadata is assumed to be at least 1 page */
	/* NOTE(review): these two creates are not NULL-checked - presumably
	 * they are resized (and re-checked) during load/init; confirm. */
	bs->used_md_pages = spdk_bit_array_create(1);
	bs->used_blobids = spdk_bit_array_create(0);

	pthread_mutex_init(&bs->used_clusters_mutex, NULL);

	spdk_io_device_register(bs, _spdk_bs_channel_create, _spdk_bs_channel_destroy,
				sizeof(struct spdk_bs_channel), "blobstore");
	rc = spdk_bs_register_md_thread(bs);
	if (rc == -1) {
		/* Unwind everything allocated above, in reverse order. */
		spdk_io_device_unregister(bs, NULL);
		pthread_mutex_destroy(&bs->used_clusters_mutex);
		spdk_bit_array_free(&bs->used_blobids);
		spdk_bit_array_free(&bs->used_md_pages);
		spdk_bit_array_free(&bs->used_clusters);
		free(bs);
		/* FIXME: this is a lie but don't know how to get a proper error code here */
		return -ENOMEM;
	}

	*_bs = bs;
	return 0;
}
2541 
2542 /* START spdk_bs_load, spdk_bs_load_ctx will used for both load and unload. */
2543 
/* Shared context for the blobstore load and unload state machines. */
struct spdk_bs_load_ctx {
	struct spdk_blob_store		*bs;	/* blobstore being loaded/unloaded */
	struct spdk_bs_super_block	*super;	/* DMA buffer holding the super block */

	struct spdk_bs_md_mask		*mask;	/* DMA buffer for the current used_* mask */
	bool				in_page_chain;	/* replay: currently following a next-page chain */
	uint32_t			page_index;	/* replay: index of the chain's first page */
	uint32_t			cur_page;	/* replay: page currently being read */
	struct spdk_blob_md_page	*page;	/* DMA buffer for one metadata page */

	spdk_bs_sequence_t			*seq;		/* sequence driving the load */
	spdk_blob_op_with_handle_complete	iter_cb_fn;	/* optional per-blob callback from opts */
	void					*iter_cb_arg;	/* argument for iter_cb_fn */
	struct spdk_blob			*blob;		/* blob under examination (corruption recovery) */
	spdk_blob_id				blobid;		/* id of blob being deleted during recovery */
};
2560 
/*
 * Abort a blobstore load: finish the sequence with bserrno, release the
 * partially constructed blobstore, and free the load context.
 */
static void
_spdk_bs_load_ctx_fail(spdk_bs_sequence_t *seq, struct spdk_bs_load_ctx *ctx, int bserrno)
{
	assert(bserrno != 0);

	spdk_free(ctx->super);
	spdk_bs_sequence_finish(seq, bserrno);
	_spdk_bs_free(ctx->bs);
	free(ctx);
}
2571 
2572 static void
2573 _spdk_bs_set_mask(struct spdk_bit_array *array, struct spdk_bs_md_mask *mask)
2574 {
2575 	uint32_t i = 0;
2576 
2577 	while (true) {
2578 		i = spdk_bit_array_find_first_set(array, i);
2579 		if (i >= mask->length) {
2580 			break;
2581 		}
2582 		mask->mask[i / 8] |= 1U << (i % 8);
2583 		i++;
2584 	}
2585 }
2586 
2587 static int
2588 _spdk_bs_load_mask(struct spdk_bit_array **array_ptr, struct spdk_bs_md_mask *mask)
2589 {
2590 	struct spdk_bit_array *array;
2591 	uint32_t i;
2592 
2593 	if (spdk_bit_array_resize(array_ptr, mask->length) < 0) {
2594 		return -ENOMEM;
2595 	}
2596 
2597 	array = *array_ptr;
2598 	for (i = 0; i < mask->length; i++) {
2599 		if (mask->mask[i / 8] & (1U << (i % 8))) {
2600 			spdk_bit_array_set(array, i);
2601 		}
2602 	}
2603 
2604 	return 0;
2605 }
2606 
/*
 * Refresh the mutable super block fields (super blob id, bstype), recompute
 * its CRC, and write it to page 0 of the device as part of seq.
 */
static void
_spdk_bs_write_super(spdk_bs_sequence_t *seq, struct spdk_blob_store *bs,
		     struct spdk_bs_super_block *super, spdk_bs_sequence_cpl cb_fn, void *cb_arg)
{
	/* Update the values in the super block */
	super->super_blob = bs->super_blob;
	memcpy(&super->bstype, &bs->bstype, sizeof(bs->bstype));
	/* CRC must be computed last, over the fully updated super block. */
	super->crc = _spdk_blob_md_page_calc_crc(super);
	spdk_bs_sequence_write_dev(seq, super, _spdk_bs_page_to_lba(bs, 0),
				   _spdk_bs_byte_to_lba(bs, sizeof(*super)),
				   cb_fn, cb_arg);
}
2619 
/*
 * Serialize the in-memory used-clusters bit array into a freshly allocated
 * DMA mask buffer (ctx->mask) and write it to its reserved region on disk.
 * On allocation failure the whole load/unload is failed via
 * _spdk_bs_load_ctx_fail.
 */
static void
_spdk_bs_write_used_clusters(spdk_bs_sequence_t *seq, void *arg, spdk_bs_sequence_cpl cb_fn)
{
	struct spdk_bs_load_ctx	*ctx = arg;
	uint64_t	mask_size, lba, lba_count;

	/* Write out the used clusters mask */
	mask_size = ctx->super->used_cluster_mask_len * SPDK_BS_PAGE_SIZE;
	ctx->mask = spdk_zmalloc(mask_size, 0x1000, NULL,
				 SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
	if (!ctx->mask) {
		_spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
		return;
	}

	ctx->mask->type = SPDK_MD_MASK_TYPE_USED_CLUSTERS;
	ctx->mask->length = ctx->bs->total_clusters;
	assert(ctx->mask->length == spdk_bit_array_capacity(ctx->bs->used_clusters));

	_spdk_bs_set_mask(ctx->bs->used_clusters, ctx->mask);
	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_start);
	lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_len);
	spdk_bs_sequence_write_dev(seq, ctx->mask, lba, lba_count, cb_fn, arg);
}
2644 
/*
 * Serialize the in-memory used-metadata-pages bit array into ctx->mask and
 * write it to its reserved region on disk.  Unlike the other mask writers,
 * this also checks seq->bserrno to catch a failure of the previous step.
 */
static void
_spdk_bs_write_used_md(spdk_bs_sequence_t *seq, void *arg, spdk_bs_sequence_cpl cb_fn)
{
	struct spdk_bs_load_ctx	*ctx = arg;
	uint64_t	mask_size, lba, lba_count;

	if (seq->bserrno) {
		_spdk_bs_load_ctx_fail(seq, ctx, seq->bserrno);
		return;
	}

	mask_size = ctx->super->used_page_mask_len * SPDK_BS_PAGE_SIZE;
	ctx->mask = spdk_zmalloc(mask_size, 0x1000, NULL,
				 SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
	if (!ctx->mask) {
		_spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
		return;
	}

	ctx->mask->type = SPDK_MD_MASK_TYPE_USED_PAGES;
	ctx->mask->length = ctx->super->md_len;
	assert(ctx->mask->length == spdk_bit_array_capacity(ctx->bs->used_md_pages));

	_spdk_bs_set_mask(ctx->bs->used_md_pages, ctx->mask);
	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_start);
	lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_len);
	spdk_bs_sequence_write_dev(seq, ctx->mask, lba, lba_count, cb_fn, arg);
}
2673 
/*
 * Serialize the in-memory used-blobids bit array into ctx->mask and write it
 * to its reserved region on disk.  Skipped (callback invoked directly) for
 * pre-v3 on-disk formats that have no blobid mask region.
 */
static void
_spdk_bs_write_used_blobids(spdk_bs_sequence_t *seq, void *arg, spdk_bs_sequence_cpl cb_fn)
{
	struct spdk_bs_load_ctx	*ctx = arg;
	uint64_t	mask_size, lba, lba_count;

	if (ctx->super->used_blobid_mask_len == 0) {
		/*
		 * This is a pre-v3 on-disk format where the blobid mask does not get
		 *  written to disk.
		 */
		cb_fn(seq, arg, 0);
		return;
	}

	mask_size = ctx->super->used_blobid_mask_len * SPDK_BS_PAGE_SIZE;
	ctx->mask = spdk_zmalloc(mask_size, 0x1000, NULL, SPDK_ENV_SOCKET_ID_ANY,
				 SPDK_MALLOC_DMA);
	if (!ctx->mask) {
		_spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
		return;
	}

	ctx->mask->type = SPDK_MD_MASK_TYPE_USED_BLOBIDS;
	ctx->mask->length = ctx->super->md_len;
	assert(ctx->mask->length == spdk_bit_array_capacity(ctx->bs->used_blobids));

	_spdk_bs_set_mask(ctx->bs->used_blobids, ctx->mask);
	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_blobid_mask_start);
	lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_blobid_mask_len);
	spdk_bs_sequence_write_dev(seq, ctx->mask, lba, lba_count, cb_fn, arg);
}
2706 
/*
 * Mark blob as thin-provisioned and dirty so the flag is persisted on the
 * next metadata sync.  Must be called on the metadata thread.
 */
static void
_spdk_blob_set_thin_provision(struct spdk_blob *blob)
{
	_spdk_blob_verify_md_op(blob);
	blob->invalid_flags |= SPDK_BLOB_THIN_PROV;
	blob->state = SPDK_BLOB_STATE_DIRTY;
}
2714 
2715 static void _spdk_bs_load_iter(void *arg, struct spdk_blob *blob, int bserrno);
2716 
/*
 * Completion of deleting a corrupted snapshot during load.  Resumes the
 * load-time blob iteration at the blob after the deleted one.
 * NOTE(review): bserrno from the delete is not examined - iteration
 * continues regardless; confirm this is intentional best-effort cleanup.
 */
static void
_spdk_bs_delete_corrupted_blob_cpl(void *cb_arg, int bserrno)
{
	struct spdk_bs_load_ctx *ctx = cb_arg;
	spdk_blob_id id;
	int64_t page_num;

	/* Iterate to next blob (we can't use spdk_bs_iter_next function as our
	 * last blob has been removed */
	page_num = _spdk_bs_blobid_to_page(ctx->blobid);
	page_num++;
	page_num = spdk_bit_array_find_first_set(ctx->bs->used_blobids, page_num);
	if (page_num >= spdk_bit_array_capacity(ctx->bs->used_blobids)) {
		/* No more blobs - report end-of-iteration to the load iterator. */
		_spdk_bs_load_iter(ctx, NULL, -ENOENT);
		return;
	}

	id = _spdk_bs_page_to_blobid(page_num);

	spdk_bs_open_blob(ctx->bs, id, _spdk_bs_load_iter, ctx);
}
2738 
/*
 * Close callback for a corrupted snapshot that is about to be deleted.
 * On close failure the snapshot is left in place and iteration continues;
 * otherwise the delete is issued.
 */
static void
_spdk_bs_delete_corrupted_close_cb(void *cb_arg, int bserrno)
{
	struct spdk_bs_load_ctx *ctx = cb_arg;

	if (bserrno != 0) {
		SPDK_ERRLOG("Failed to close corrupted blob\n");
		spdk_bs_iter_next(ctx->bs, ctx->blob, _spdk_bs_load_iter, ctx);
		return;
	}

	spdk_bs_delete_blob(ctx->bs, ctx->blobid, _spdk_bs_delete_corrupted_blob_cpl, ctx);
}
2752 
/*
 * Prepare a corrupted snapshot (found during load) for deletion after its
 * clone has been closed: drop the snapshot's cluster references so they stay
 * owned by the clone, mark it thin-provisioned, then close it; the close
 * callback performs the actual delete.
 */
static void
_spdk_bs_delete_corrupted_blob(void *cb_arg, int bserrno)
{
	struct spdk_bs_load_ctx *ctx = cb_arg;
	uint64_t i;

	if (bserrno != 0) {
		SPDK_ERRLOG("Failed to close clone of a corrupted blob\n");
		spdk_bs_iter_next(ctx->bs, ctx->blob, _spdk_bs_load_iter, ctx);
		return;
	}

	/* Snapshot and clone have the same copy of cluster map at this point.
	 * Let's clear cluster map for snapshot now so that it won't be cleared
	 * for clone later when we remove snapshot. Also set thin provision to
	 * pass data corruption check */
	for (i = 0; i < ctx->blob->active.num_clusters; i++) {
		ctx->blob->active.clusters[i] = 0;
	}

	/* Allow metadata modification on the (possibly read-only) snapshot. */
	ctx->blob->md_ro = false;

	_spdk_blob_set_thin_provision(ctx->blob);

	/* Remember the id - the delete happens after the blob is closed. */
	ctx->blobid = ctx->blob->id;

	spdk_blob_close(ctx->blob, _spdk_bs_delete_corrupted_close_cb, ctx);
}
2781 
/*
 * Repair a snapshot whose creation/deletion was interrupted but whose clone
 * still references it: strip the in-progress/pending-removal markers, make it
 * read-only again, then process it like any other blob and keep iterating.
 */
static void
_spdk_bs_update_corrupted_blob(void *cb_arg, int bserrno)
{
	struct spdk_bs_load_ctx *ctx = cb_arg;

	if (bserrno != 0) {
		SPDK_ERRLOG("Failed to close clone of a corrupted blob\n");
		spdk_bs_iter_next(ctx->bs, ctx->blob, _spdk_bs_load_iter, ctx);
		return;
	}

	ctx->blob->md_ro = false;
	/* Remove the recovery markers that flagged this snapshot as corrupted. */
	_spdk_blob_remove_xattr(ctx->blob, SNAPSHOT_PENDING_REMOVAL, true);
	_spdk_blob_remove_xattr(ctx->blob, SNAPSHOT_IN_PROGRESS, true);
	spdk_blob_set_read_only(ctx->blob);

	if (ctx->iter_cb_fn) {
		ctx->iter_cb_fn(ctx->iter_cb_arg, ctx->blob, 0);
	}
	_spdk_bs_blob_list_add(ctx->blob);

	spdk_bs_iter_next(ctx->bs, ctx->blob, _spdk_bs_load_iter, ctx);
}
2805 
/*
 * Open-callback for the clone of a snapshot (ctx->blob) that was found in a
 * corrupted state.  Whether the clone still points at the snapshot decides
 * if the snapshot is repaired or deleted.
 */
static void
_spdk_bs_examine_clone(void *cb_arg, struct spdk_blob *blob, int bserrno)
{
	struct spdk_bs_load_ctx *ctx = cb_arg;

	if (bserrno != 0) {
		SPDK_ERRLOG("Failed to open clone of a corrupted blob\n");
		spdk_bs_iter_next(ctx->bs, ctx->blob, _spdk_bs_load_iter, ctx);
		return;
	}

	if (blob->parent_id == ctx->blob->id) {
		/* Power failure occurred before updating clone (snapshot delete case)
		 * or after updating clone (creating snapshot case) - keep snapshot */
		spdk_blob_close(blob, _spdk_bs_update_corrupted_blob, ctx);
	} else {
		/* Power failure occurred after updating clone (snapshot delete case)
		 * or before updating clone (creating snapshot case) - remove snapshot */
		spdk_blob_close(blob, _spdk_bs_delete_corrupted_blob, ctx);
	}
}
2827 
/*
 * Per-blob step of the load-time iteration.  Healthy blobs are handed to the
 * user's iter callback and registered in the snapshot/clone lists; blobs
 * carrying a SNAPSHOT_PENDING_REMOVAL or SNAPSHOT_IN_PROGRESS marker enter
 * the corruption-recovery path; -ENOENT ends the iteration and finishes the
 * load sequence.
 */
static void
_spdk_bs_load_iter(void *arg, struct spdk_blob *blob, int bserrno)
{
	struct spdk_bs_load_ctx *ctx = arg;
	const void *value;
	size_t len;
	int rc = 0;

	if (bserrno == 0) {
		/* Examine blob if it is corrupted after power failure. Fix
		 * the ones that can be fixed and remove any other corrupted
		 * ones. If it is not corrupted just process it */
		rc = _spdk_blob_get_xattr_value(blob, SNAPSHOT_PENDING_REMOVAL, &value, &len, true);
		if (rc != 0) {
			rc = _spdk_blob_get_xattr_value(blob, SNAPSHOT_IN_PROGRESS, &value, &len, true);
			if (rc != 0) {
				/* Not corrupted - process it and continue with iterating through blobs */
				if (ctx->iter_cb_fn) {
					ctx->iter_cb_fn(ctx->iter_cb_arg, blob, 0);
				}
				_spdk_bs_blob_list_add(blob);
				spdk_bs_iter_next(ctx->bs, blob, _spdk_bs_load_iter, ctx);
				return;
			}

		}

		/* Either marker's xattr value is the clone's blob id. */
		assert(len == sizeof(spdk_blob_id));

		ctx->blob = blob;

		/* Open clone to check if we are able to fix this blob or should we remove it */
		spdk_bs_open_blob(ctx->bs, *(spdk_blob_id *)value, _spdk_bs_examine_clone, ctx);
		return;
	} else if (bserrno == -ENOENT) {
		/* Normal end of iteration - not an error. */
		bserrno = 0;
	} else {
		/*
		 * This case needs to be looked at further.  Same problem
		 *  exists with applications that rely on explicit blob
		 *  iteration.  We should just skip the blob that failed
		 *  to load and continue on to the next one.
		 */
		SPDK_ERRLOG("Error in iterating blobs\n");
	}

	ctx->iter_cb_fn = NULL;

	/* Iteration finished (or failed) - tear down the load context. */
	spdk_free(ctx->super);
	spdk_free(ctx->mask);
	spdk_bs_sequence_finish(ctx->seq, bserrno);
	free(ctx);
}
2881 
/*
 * All masks are loaded - start iterating over the blobs to finish the load.
 * The sequence is stashed in ctx so _spdk_bs_load_iter can finish it later;
 * bserrno is unused here (callers reach this only on success).
 */
static void
_spdk_bs_load_complete(spdk_bs_sequence_t *seq, struct spdk_bs_load_ctx *ctx, int bserrno)
{
	ctx->seq = seq;
	spdk_bs_iter_first(ctx->bs, _spdk_bs_load_iter, ctx);
}
2888 
/*
 * Read-completion for the on-disk used-blobids mask: validate it, populate
 * the in-memory bit array, then proceed to the final load step.
 */
static void
_spdk_bs_load_used_blobids_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_bs_load_ctx *ctx = cb_arg;
	int rc;

	/* The type must be correct */
	assert(ctx->mask->type == SPDK_MD_MASK_TYPE_USED_BLOBIDS);

	/* The length of the mask (in bits) must not be greater than
	 * the length of the buffer (converted to bits) */
	assert(ctx->mask->length <= (ctx->super->used_blobid_mask_len * SPDK_BS_PAGE_SIZE * 8));

	/* The length of the mask must be exactly equal to the size
	 * (in pages) of the metadata region */
	assert(ctx->mask->length == ctx->super->md_len);

	rc = _spdk_bs_load_mask(&ctx->bs->used_blobids, ctx->mask);
	if (rc < 0) {
		spdk_free(ctx->mask);
		_spdk_bs_load_ctx_fail(seq, ctx, rc);
		return;
	}

	_spdk_bs_load_complete(seq, ctx, bserrno);
}
2915 
/*
 * Read-completion for the on-disk used-clusters mask: validate it, populate
 * the in-memory bit array, recompute the free-cluster count, then issue the
 * read of the used-blobids mask.
 */
static void
_spdk_bs_load_used_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_bs_load_ctx *ctx = cb_arg;
	uint64_t		lba, lba_count, mask_size;
	int			rc;

	/* The type must be correct */
	assert(ctx->mask->type == SPDK_MD_MASK_TYPE_USED_CLUSTERS);
	/* The length of the mask (in bits) must not be greater than the length of the buffer (converted to bits) */
	/* NOTE(review): this uses sizeof(struct spdk_blob_md_page) where the
	 * sibling asserts use SPDK_BS_PAGE_SIZE - presumably equal; confirm. */
	assert(ctx->mask->length <= (ctx->super->used_cluster_mask_len * sizeof(
					     struct spdk_blob_md_page) * 8));
	/* The length of the mask must be exactly equal to the total number of clusters */
	assert(ctx->mask->length == ctx->bs->total_clusters);

	rc = _spdk_bs_load_mask(&ctx->bs->used_clusters, ctx->mask);
	if (rc < 0) {
		spdk_free(ctx->mask);
		_spdk_bs_load_ctx_fail(seq, ctx, rc);
		return;
	}

	/* Free clusters are exactly the clear bits of the loaded mask. */
	ctx->bs->num_free_clusters = spdk_bit_array_count_clear(ctx->bs->used_clusters);
	assert(ctx->bs->num_free_clusters <= ctx->bs->total_clusters);

	spdk_free(ctx->mask);

	/* Read the used blobids mask */
	mask_size = ctx->super->used_blobid_mask_len * SPDK_BS_PAGE_SIZE;
	ctx->mask = spdk_zmalloc(mask_size, 0x1000, NULL, SPDK_ENV_SOCKET_ID_ANY,
				 SPDK_MALLOC_DMA);
	if (!ctx->mask) {
		_spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
		return;
	}
	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_blobid_mask_start);
	lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_blobid_mask_len);
	spdk_bs_sequence_read_dev(seq, ctx->mask, lba, lba_count,
				  _spdk_bs_load_used_blobids_cpl, ctx);
}
2956 
/*
 * Read-completion for the on-disk used-metadata-pages mask: validate it,
 * populate the in-memory bit array, then issue the read of the
 * used-clusters mask.
 */
static void
_spdk_bs_load_used_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_bs_load_ctx *ctx = cb_arg;
	uint64_t		lba, lba_count, mask_size;
	int			rc;

	/* The type must be correct */
	assert(ctx->mask->type == SPDK_MD_MASK_TYPE_USED_PAGES);
	/* The length of the mask (in bits) must not be greater than the length of the buffer (converted to bits) */
	assert(ctx->mask->length <= (ctx->super->used_page_mask_len * SPDK_BS_PAGE_SIZE *
				     8));
	/* The length of the mask must be exactly equal to the size (in pages) of the metadata region */
	assert(ctx->mask->length == ctx->super->md_len);

	rc = _spdk_bs_load_mask(&ctx->bs->used_md_pages, ctx->mask);
	if (rc < 0) {
		spdk_free(ctx->mask);
		_spdk_bs_load_ctx_fail(seq, ctx, rc);
		return;
	}

	spdk_free(ctx->mask);

	/* Read the used clusters mask */
	mask_size = ctx->super->used_cluster_mask_len * SPDK_BS_PAGE_SIZE;
	ctx->mask = spdk_zmalloc(mask_size, 0x1000, NULL, SPDK_ENV_SOCKET_ID_ANY,
				 SPDK_MALLOC_DMA);
	if (!ctx->mask) {
		_spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
		return;
	}
	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_start);
	lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_len);
	spdk_bs_sequence_read_dev(seq, ctx->mask, lba, lba_count,
				  _spdk_bs_load_used_clusters_cpl, ctx);
}
2994 
/*
 * First step of the clean-shutdown load path: read the used-metadata-pages
 * mask from its reserved on-disk region.  The completion chain continues
 * with the cluster and blobid masks.
 */
static void
_spdk_bs_load_read_used_pages(spdk_bs_sequence_t *seq, void *cb_arg)
{
	struct spdk_bs_load_ctx	*ctx = cb_arg;
	uint64_t lba, lba_count, mask_size;

	/* Read the used pages mask */
	mask_size = ctx->super->used_page_mask_len * SPDK_BS_PAGE_SIZE;
	ctx->mask = spdk_zmalloc(mask_size, 0x1000, NULL,
				 SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
	if (!ctx->mask) {
		_spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
		return;
	}

	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_start);
	lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_len);
	spdk_bs_sequence_read_dev(seq, ctx->mask, lba, lba_count,
				  _spdk_bs_load_used_pages_cpl, ctx);
}
3015 
3016 static int
3017 _spdk_bs_load_replay_md_parse_page(const struct spdk_blob_md_page *page, struct spdk_blob_store *bs)
3018 {
3019 	struct spdk_blob_md_descriptor *desc;
3020 	size_t	cur_desc = 0;
3021 
3022 	desc = (struct spdk_blob_md_descriptor *)page->descriptors;
3023 	while (cur_desc < sizeof(page->descriptors)) {
3024 		if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_PADDING) {
3025 			if (desc->length == 0) {
3026 				/* If padding and length are 0, this terminates the page */
3027 				break;
3028 			}
3029 		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_EXTENT_RLE) {
3030 			struct spdk_blob_md_descriptor_extent_rle	*desc_extent_rle;
3031 			unsigned int				i, j;
3032 			unsigned int				cluster_count = 0;
3033 			uint32_t				cluster_idx;
3034 
3035 			desc_extent_rle = (struct spdk_blob_md_descriptor_extent_rle *)desc;
3036 
3037 			for (i = 0; i < desc_extent_rle->length / sizeof(desc_extent_rle->extents[0]); i++) {
3038 				for (j = 0; j < desc_extent_rle->extents[i].length; j++) {
3039 					cluster_idx = desc_extent_rle->extents[i].cluster_idx;
3040 					/*
3041 					 * cluster_idx = 0 means an unallocated cluster - don't mark that
3042 					 * in the used cluster map.
3043 					 */
3044 					if (cluster_idx != 0) {
3045 						spdk_bit_array_set(bs->used_clusters, cluster_idx + j);
3046 						if (bs->num_free_clusters == 0) {
3047 							return -ENOSPC;
3048 						}
3049 						bs->num_free_clusters--;
3050 					}
3051 					cluster_count++;
3052 				}
3053 			}
3054 			if (cluster_count == 0) {
3055 				return -EINVAL;
3056 			}
3057 		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR) {
3058 			/* Skip this item */
3059 		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR_INTERNAL) {
3060 			/* Skip this item */
3061 		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_FLAGS) {
3062 			/* Skip this item */
3063 		} else {
3064 			/* Error */
3065 			return -EINVAL;
3066 		}
3067 		/* Advance to the next descriptor */
3068 		cur_desc += sizeof(*desc) + desc->length;
3069 		if (cur_desc + sizeof(*desc) > sizeof(page->descriptors)) {
3070 			break;
3071 		}
3072 		desc = (struct spdk_blob_md_descriptor *)((uintptr_t)page->descriptors + cur_desc);
3073 	}
3074 	return 0;
3075 }
3076 
3077 static bool _spdk_bs_load_cur_md_page_valid(struct spdk_bs_load_ctx *ctx)
3078 {
3079 	uint32_t crc;
3080 
3081 	crc = _spdk_blob_md_page_calc_crc(ctx->page);
3082 	if (crc != ctx->page->crc) {
3083 		return false;
3084 	}
3085 
3086 	if (ctx->page->sequence_num == 0 &&
3087 	    _spdk_bs_page_to_blobid(ctx->cur_page) != ctx->page->id) {
3088 		return false;
3089 	}
3090 	return true;
3091 }
3092 
3093 static void
3094 _spdk_bs_load_replay_cur_md_page(spdk_bs_sequence_t *seq, void *cb_arg);
3095 
3096 static void
3097 _spdk_bs_load_write_used_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
3098 {
3099 	struct spdk_bs_load_ctx	*ctx = cb_arg;
3100 
3101 	_spdk_bs_load_complete(seq, ctx, bserrno);
3102 }
3103 
3104 static void
3105 _spdk_bs_load_write_used_blobids_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
3106 {
3107 	struct spdk_bs_load_ctx	*ctx = cb_arg;
3108 
3109 	spdk_free(ctx->mask);
3110 	ctx->mask = NULL;
3111 
3112 	_spdk_bs_write_used_clusters(seq, cb_arg, _spdk_bs_load_write_used_clusters_cpl);
3113 }
3114 
3115 static void
3116 _spdk_bs_load_write_used_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
3117 {
3118 	struct spdk_bs_load_ctx	*ctx = cb_arg;
3119 
3120 	spdk_free(ctx->mask);
3121 	ctx->mask = NULL;
3122 
3123 	_spdk_bs_write_used_blobids(seq, cb_arg, _spdk_bs_load_write_used_blobids_cpl);
3124 }
3125 
static void
_spdk_bs_load_write_used_md(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	/* Persist the rebuilt used metadata pages mask; the completion chain
	 * continues with the blobid and cluster masks (pages -> blobids ->
	 * clusters) before finishing the load. */
	_spdk_bs_write_used_md(seq, cb_arg, _spdk_bs_load_write_used_pages_cpl);
}
3131 
static void
_spdk_bs_load_replay_md_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_bs_load_ctx *ctx = cb_arg;
	uint64_t num_md_clusters;
	uint64_t i;
	uint32_t page_num;

	/* Completion for reading one metadata page during recovery replay.
	 * Valid pages are folded into used_md_pages/used_blobids and parsed
	 * into used_clusters; page chains are followed before moving on to
	 * the next unvisited page. When every page has been processed, the
	 * metadata clusters are claimed and the rebuilt masks are written
	 * back via _spdk_bs_load_write_used_md(). */
	if (bserrno != 0) {
		_spdk_bs_load_ctx_fail(seq, ctx, bserrno);
		return;
	}

	page_num = ctx->cur_page;
	if (_spdk_bs_load_cur_md_page_valid(ctx) == true) {
		/* Only the head of a chain (sequence_num == 0) or a page we
		 * reached by following a chain counts toward the replay. */
		if (ctx->page->sequence_num == 0 || ctx->in_page_chain == true) {
			spdk_bit_array_set(ctx->bs->used_md_pages, page_num);
			if (ctx->page->sequence_num == 0) {
				/* The head page's index encodes the blob ID
				 * (see _spdk_bs_page_to_blobid in the validity check). */
				spdk_bit_array_set(ctx->bs->used_blobids, page_num);
			}
			if (_spdk_bs_load_replay_md_parse_page(ctx->page, ctx->bs)) {
				_spdk_bs_load_ctx_fail(seq, ctx, -EILSEQ);
				return;
			}
			if (ctx->page->next != SPDK_INVALID_MD_PAGE) {
				/* Follow the chain before resuming the linear scan. */
				ctx->in_page_chain = true;
				ctx->cur_page = ctx->page->next;
				_spdk_bs_load_replay_cur_md_page(seq, cb_arg);
				return;
			}
		}
	}

	ctx->in_page_chain = false;

	/* Skip pages already visited as part of an earlier chain. */
	do {
		ctx->page_index++;
	} while (spdk_bit_array_get(ctx->bs->used_md_pages, ctx->page_index) == true);

	if (ctx->page_index < ctx->super->md_len) {
		ctx->cur_page = ctx->page_index;
		_spdk_bs_load_replay_cur_md_page(seq, cb_arg);
	} else {
		/* Claim all of the clusters used by the metadata */
		num_md_clusters = spdk_divide_round_up(ctx->super->md_len, ctx->bs->pages_per_cluster);
		for (i = 0; i < num_md_clusters; i++) {
			_spdk_bs_claim_cluster(ctx->bs, i);
		}
		spdk_free(ctx->page);
		_spdk_bs_load_write_used_md(seq, ctx, bserrno);
	}
}
3184 
3185 static void
3186 _spdk_bs_load_replay_cur_md_page(spdk_bs_sequence_t *seq, void *cb_arg)
3187 {
3188 	struct spdk_bs_load_ctx *ctx = cb_arg;
3189 	uint64_t lba;
3190 
3191 	assert(ctx->cur_page < ctx->super->md_len);
3192 	lba = _spdk_bs_md_page_to_lba(ctx->bs, ctx->cur_page);
3193 	spdk_bs_sequence_read_dev(seq, ctx->page, lba,
3194 				  _spdk_bs_byte_to_lba(ctx->bs, SPDK_BS_PAGE_SIZE),
3195 				  _spdk_bs_load_replay_md_cpl, ctx);
3196 }
3197 
3198 static void
3199 _spdk_bs_load_replay_md(spdk_bs_sequence_t *seq, void *cb_arg)
3200 {
3201 	struct spdk_bs_load_ctx *ctx = cb_arg;
3202 
3203 	ctx->page_index = 0;
3204 	ctx->cur_page = 0;
3205 	ctx->page = spdk_zmalloc(SPDK_BS_PAGE_SIZE, SPDK_BS_PAGE_SIZE,
3206 				 NULL, SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
3207 	if (!ctx->page) {
3208 		_spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
3209 		return;
3210 	}
3211 	_spdk_bs_load_replay_cur_md_page(seq, cb_arg);
3212 }
3213 
3214 static void
3215 _spdk_bs_recover(spdk_bs_sequence_t *seq, void *cb_arg)
3216 {
3217 	struct spdk_bs_load_ctx *ctx = cb_arg;
3218 	int		rc;
3219 
3220 	rc = spdk_bit_array_resize(&ctx->bs->used_md_pages, ctx->super->md_len);
3221 	if (rc < 0) {
3222 		_spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
3223 		return;
3224 	}
3225 
3226 	rc = spdk_bit_array_resize(&ctx->bs->used_blobids, ctx->super->md_len);
3227 	if (rc < 0) {
3228 		_spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
3229 		return;
3230 	}
3231 
3232 	rc = spdk_bit_array_resize(&ctx->bs->used_clusters, ctx->bs->total_clusters);
3233 	if (rc < 0) {
3234 		_spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
3235 		return;
3236 	}
3237 
3238 	ctx->bs->num_free_clusters = ctx->bs->total_clusters;
3239 	_spdk_bs_load_replay_md(seq, cb_arg);
3240 }
3241 
3242 static void
3243 _spdk_bs_load_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
3244 {
3245 	struct spdk_bs_load_ctx *ctx = cb_arg;
3246 	uint32_t	crc;
3247 	int		rc;
3248 	static const char zeros[SPDK_BLOBSTORE_TYPE_LENGTH];
3249 
3250 	if (ctx->super->version > SPDK_BS_VERSION ||
3251 	    ctx->super->version < SPDK_BS_INITIAL_VERSION) {
3252 		_spdk_bs_load_ctx_fail(seq, ctx, -EILSEQ);
3253 		return;
3254 	}
3255 
3256 	if (memcmp(ctx->super->signature, SPDK_BS_SUPER_BLOCK_SIG,
3257 		   sizeof(ctx->super->signature)) != 0) {
3258 		_spdk_bs_load_ctx_fail(seq, ctx, -EILSEQ);
3259 		return;
3260 	}
3261 
3262 	crc = _spdk_blob_md_page_calc_crc(ctx->super);
3263 	if (crc != ctx->super->crc) {
3264 		_spdk_bs_load_ctx_fail(seq, ctx, -EILSEQ);
3265 		return;
3266 	}
3267 
3268 	if (memcmp(&ctx->bs->bstype, &ctx->super->bstype, SPDK_BLOBSTORE_TYPE_LENGTH) == 0) {
3269 		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Bstype matched - loading blobstore\n");
3270 	} else if (memcmp(&ctx->bs->bstype, zeros, SPDK_BLOBSTORE_TYPE_LENGTH) == 0) {
3271 		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Bstype wildcard used - loading blobstore regardless bstype\n");
3272 	} else {
3273 		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Unexpected bstype\n");
3274 		SPDK_LOGDUMP(SPDK_LOG_BLOB, "Expected:", ctx->bs->bstype.bstype, SPDK_BLOBSTORE_TYPE_LENGTH);
3275 		SPDK_LOGDUMP(SPDK_LOG_BLOB, "Found:", ctx->super->bstype.bstype, SPDK_BLOBSTORE_TYPE_LENGTH);
3276 		_spdk_bs_load_ctx_fail(seq, ctx, -ENXIO);
3277 		return;
3278 	}
3279 
3280 	if (ctx->super->size > ctx->bs->dev->blockcnt * ctx->bs->dev->blocklen) {
3281 		SPDK_NOTICELOG("Size mismatch, dev size: %lu, blobstore size: %lu\n",
3282 			       ctx->bs->dev->blockcnt * ctx->bs->dev->blocklen, ctx->super->size);
3283 		_spdk_bs_load_ctx_fail(seq, ctx, -EILSEQ);
3284 		return;
3285 	}
3286 
3287 	if (ctx->super->size == 0) {
3288 		ctx->super->size = ctx->bs->dev->blockcnt * ctx->bs->dev->blocklen;
3289 	}
3290 
3291 	if (ctx->super->io_unit_size == 0) {
3292 		ctx->super->io_unit_size = SPDK_BS_PAGE_SIZE;
3293 	}
3294 
3295 	/* Parse the super block */
3296 	ctx->bs->clean = 1;
3297 	ctx->bs->cluster_sz = ctx->super->cluster_size;
3298 	ctx->bs->total_clusters = ctx->super->size / ctx->super->cluster_size;
3299 	ctx->bs->pages_per_cluster = ctx->bs->cluster_sz / SPDK_BS_PAGE_SIZE;
3300 	ctx->bs->io_unit_size = ctx->super->io_unit_size;
3301 	rc = spdk_bit_array_resize(&ctx->bs->used_clusters, ctx->bs->total_clusters);
3302 	if (rc < 0) {
3303 		_spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
3304 		return;
3305 	}
3306 	ctx->bs->md_start = ctx->super->md_start;
3307 	ctx->bs->md_len = ctx->super->md_len;
3308 	ctx->bs->total_data_clusters = ctx->bs->total_clusters - spdk_divide_round_up(
3309 					       ctx->bs->md_start + ctx->bs->md_len, ctx->bs->pages_per_cluster);
3310 	ctx->bs->super_blob = ctx->super->super_blob;
3311 	memcpy(&ctx->bs->bstype, &ctx->super->bstype, sizeof(ctx->super->bstype));
3312 
3313 	if (ctx->super->used_blobid_mask_len == 0 || ctx->super->clean == 0) {
3314 		_spdk_bs_recover(seq, ctx);
3315 	} else {
3316 		_spdk_bs_load_read_used_pages(seq, ctx);
3317 	}
3318 }
3319 
3320 void
3321 spdk_bs_load(struct spdk_bs_dev *dev, struct spdk_bs_opts *o,
3322 	     spdk_bs_op_with_handle_complete cb_fn, void *cb_arg)
3323 {
3324 	struct spdk_blob_store	*bs;
3325 	struct spdk_bs_cpl	cpl;
3326 	spdk_bs_sequence_t	*seq;
3327 	struct spdk_bs_load_ctx *ctx;
3328 	struct spdk_bs_opts	opts = {};
3329 	int err;
3330 
3331 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Loading blobstore from dev %p\n", dev);
3332 
3333 	if ((SPDK_BS_PAGE_SIZE % dev->blocklen) != 0) {
3334 		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "unsupported dev block length of %d\n", dev->blocklen);
3335 		dev->destroy(dev);
3336 		cb_fn(cb_arg, NULL, -EINVAL);
3337 		return;
3338 	}
3339 
3340 	if (o) {
3341 		opts = *o;
3342 	} else {
3343 		spdk_bs_opts_init(&opts);
3344 	}
3345 
3346 	if (opts.max_md_ops == 0 || opts.max_channel_ops == 0) {
3347 		dev->destroy(dev);
3348 		cb_fn(cb_arg, NULL, -EINVAL);
3349 		return;
3350 	}
3351 
3352 	err = _spdk_bs_alloc(dev, &opts, &bs);
3353 	if (err) {
3354 		dev->destroy(dev);
3355 		cb_fn(cb_arg, NULL, err);
3356 		return;
3357 	}
3358 
3359 	ctx = calloc(1, sizeof(*ctx));
3360 	if (!ctx) {
3361 		_spdk_bs_free(bs);
3362 		cb_fn(cb_arg, NULL, -ENOMEM);
3363 		return;
3364 	}
3365 
3366 	ctx->bs = bs;
3367 	ctx->iter_cb_fn = opts.iter_cb_fn;
3368 	ctx->iter_cb_arg = opts.iter_cb_arg;
3369 
3370 	/* Allocate memory for the super block */
3371 	ctx->super = spdk_zmalloc(sizeof(*ctx->super), 0x1000, NULL,
3372 				  SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
3373 	if (!ctx->super) {
3374 		free(ctx);
3375 		_spdk_bs_free(bs);
3376 		cb_fn(cb_arg, NULL, -ENOMEM);
3377 		return;
3378 	}
3379 
3380 	cpl.type = SPDK_BS_CPL_TYPE_BS_HANDLE;
3381 	cpl.u.bs_handle.cb_fn = cb_fn;
3382 	cpl.u.bs_handle.cb_arg = cb_arg;
3383 	cpl.u.bs_handle.bs = bs;
3384 
3385 	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
3386 	if (!seq) {
3387 		spdk_free(ctx->super);
3388 		free(ctx);
3389 		_spdk_bs_free(bs);
3390 		cb_fn(cb_arg, NULL, -ENOMEM);
3391 		return;
3392 	}
3393 
3394 	/* Read the super block */
3395 	spdk_bs_sequence_read_dev(seq, ctx->super, _spdk_bs_page_to_lba(bs, 0),
3396 				  _spdk_bs_byte_to_lba(bs, sizeof(*ctx->super)),
3397 				  _spdk_bs_load_super_cpl, ctx);
3398 }
3399 
3400 /* END spdk_bs_load */
3401 
3402 /* START spdk_bs_dump */
3403 
/* State carried through the asynchronous blobstore dump sequence. */
struct spdk_bs_dump_ctx {
	struct spdk_blob_store		*bs;		/* blobstore being dumped */
	struct spdk_bs_super_block	*super;		/* DMA buffer holding the super block */
	uint32_t			cur_page;	/* index of the metadata page being read */
	struct spdk_blob_md_page	*page;		/* DMA buffer reused for each page read */
	spdk_bs_sequence_t		*seq;		/* I/O sequence driving the dump */
	FILE				*fp;		/* destination stream for the dump output */
	spdk_bs_dump_print_xattr	print_xattr_fn;	/* caller-supplied xattr value printer */
	char				xattr_name[4096]; /* scratch buffer for NUL-terminated xattr names */
};
3414 
3415 static void
3416 _spdk_bs_dump_finish(spdk_bs_sequence_t *seq, struct spdk_bs_dump_ctx *ctx, int bserrno)
3417 {
3418 	spdk_free(ctx->super);
3419 
3420 	/*
3421 	 * We need to defer calling spdk_bs_call_cpl() until after
3422 	 * dev destruction, so tuck these away for later use.
3423 	 */
3424 	ctx->bs->unload_err = bserrno;
3425 	memcpy(&ctx->bs->unload_cpl, &seq->cpl, sizeof(struct spdk_bs_cpl));
3426 	seq->cpl.type = SPDK_BS_CPL_TYPE_NONE;
3427 
3428 	spdk_bs_sequence_finish(seq, 0);
3429 	_spdk_bs_free(ctx->bs);
3430 	free(ctx);
3431 }
3432 
3433 static void _spdk_bs_dump_read_md_page(spdk_bs_sequence_t *seq, void *cb_arg);
3434 
3435 static void
3436 _spdk_bs_dump_print_md_page(struct spdk_bs_dump_ctx *ctx)
3437 {
3438 	uint32_t page_idx = ctx->cur_page;
3439 	struct spdk_blob_md_page *page = ctx->page;
3440 	struct spdk_blob_md_descriptor *desc;
3441 	size_t cur_desc = 0;
3442 	uint32_t crc;
3443 
3444 	fprintf(ctx->fp, "=========\n");
3445 	fprintf(ctx->fp, "Metadata Page Index: %" PRIu32 " (0x%" PRIx32 ")\n", page_idx, page_idx);
3446 	fprintf(ctx->fp, "Blob ID: 0x%" PRIx64 "\n", page->id);
3447 
3448 	crc = _spdk_blob_md_page_calc_crc(page);
3449 	fprintf(ctx->fp, "CRC: 0x%" PRIx32 " (%s)\n", page->crc, crc == page->crc ? "OK" : "Mismatch");
3450 
3451 	desc = (struct spdk_blob_md_descriptor *)page->descriptors;
3452 	while (cur_desc < sizeof(page->descriptors)) {
3453 		if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_PADDING) {
3454 			if (desc->length == 0) {
3455 				/* If padding and length are 0, this terminates the page */
3456 				break;
3457 			}
3458 		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_EXTENT_RLE) {
3459 			struct spdk_blob_md_descriptor_extent_rle	*desc_extent_rle;
3460 			unsigned int				i;
3461 
3462 			desc_extent_rle = (struct spdk_blob_md_descriptor_extent_rle *)desc;
3463 
3464 			for (i = 0; i < desc_extent_rle->length / sizeof(desc_extent_rle->extents[0]); i++) {
3465 				if (desc_extent_rle->extents[i].cluster_idx != 0) {
3466 					fprintf(ctx->fp, "Allocated Extent - Start: %" PRIu32,
3467 						desc_extent_rle->extents[i].cluster_idx);
3468 				} else {
3469 					fprintf(ctx->fp, "Unallocated Extent - ");
3470 				}
3471 				fprintf(ctx->fp, " Length: %" PRIu32, desc_extent_rle->extents[i].length);
3472 				fprintf(ctx->fp, "\n");
3473 			}
3474 		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR) {
3475 			struct spdk_blob_md_descriptor_xattr *desc_xattr;
3476 			uint32_t i;
3477 
3478 			desc_xattr = (struct spdk_blob_md_descriptor_xattr *)desc;
3479 
3480 			if (desc_xattr->length !=
3481 			    sizeof(desc_xattr->name_length) + sizeof(desc_xattr->value_length) +
3482 			    desc_xattr->name_length + desc_xattr->value_length) {
3483 			}
3484 
3485 			memcpy(ctx->xattr_name, desc_xattr->name, desc_xattr->name_length);
3486 			ctx->xattr_name[desc_xattr->name_length] = '\0';
3487 			fprintf(ctx->fp, "XATTR: name = \"%s\"\n", ctx->xattr_name);
3488 			fprintf(ctx->fp, "       value = \"");
3489 			ctx->print_xattr_fn(ctx->fp, ctx->super->bstype.bstype, ctx->xattr_name,
3490 					    (void *)((uintptr_t)desc_xattr->name + desc_xattr->name_length),
3491 					    desc_xattr->value_length);
3492 			fprintf(ctx->fp, "\"\n");
3493 			for (i = 0; i < desc_xattr->value_length; i++) {
3494 				if (i % 16 == 0) {
3495 					fprintf(ctx->fp, "               ");
3496 				}
3497 				fprintf(ctx->fp, "%02" PRIx8 " ", *((uint8_t *)desc_xattr->name + desc_xattr->name_length + i));
3498 				if ((i + 1) % 16 == 0) {
3499 					fprintf(ctx->fp, "\n");
3500 				}
3501 			}
3502 			if (i % 16 != 0) {
3503 				fprintf(ctx->fp, "\n");
3504 			}
3505 		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR_INTERNAL) {
3506 			/* TODO */
3507 		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_FLAGS) {
3508 			/* TODO */
3509 		} else {
3510 			/* Error */
3511 		}
3512 		/* Advance to the next descriptor */
3513 		cur_desc += sizeof(*desc) + desc->length;
3514 		if (cur_desc + sizeof(*desc) > sizeof(page->descriptors)) {
3515 			break;
3516 		}
3517 		desc = (struct spdk_blob_md_descriptor *)((uintptr_t)page->descriptors + cur_desc);
3518 	}
3519 }
3520 
3521 static void
3522 _spdk_bs_dump_read_md_page_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
3523 {
3524 	struct spdk_bs_dump_ctx *ctx = cb_arg;
3525 
3526 	if (bserrno != 0) {
3527 		_spdk_bs_dump_finish(seq, ctx, bserrno);
3528 		return;
3529 	}
3530 
3531 	if (ctx->page->id != 0) {
3532 		_spdk_bs_dump_print_md_page(ctx);
3533 	}
3534 
3535 	ctx->cur_page++;
3536 
3537 	if (ctx->cur_page < ctx->super->md_len) {
3538 		_spdk_bs_dump_read_md_page(seq, cb_arg);
3539 	} else {
3540 		spdk_free(ctx->page);
3541 		_spdk_bs_dump_finish(seq, ctx, 0);
3542 	}
3543 }
3544 
3545 static void
3546 _spdk_bs_dump_read_md_page(spdk_bs_sequence_t *seq, void *cb_arg)
3547 {
3548 	struct spdk_bs_dump_ctx *ctx = cb_arg;
3549 	uint64_t lba;
3550 
3551 	assert(ctx->cur_page < ctx->super->md_len);
3552 	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->md_start + ctx->cur_page);
3553 	spdk_bs_sequence_read_dev(seq, ctx->page, lba,
3554 				  _spdk_bs_byte_to_lba(ctx->bs, SPDK_BS_PAGE_SIZE),
3555 				  _spdk_bs_dump_read_md_page_cpl, ctx);
3556 }
3557 
static void
_spdk_bs_dump_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_bs_dump_ctx *ctx = cb_arg;

	/* Print a human-readable summary of the super block, then begin
	 * dumping the individual metadata pages. */
	fprintf(ctx->fp, "Signature: \"%.8s\" ", ctx->super->signature);
	if (memcmp(ctx->super->signature, SPDK_BS_SUPER_BLOCK_SIG,
		   sizeof(ctx->super->signature)) != 0) {
		/* NOTE(review): bserrno here is the read completion status,
		 * likely 0 even though the signature check failed - confirm
		 * whether a dedicated error code should be reported. */
		fprintf(ctx->fp, "(Mismatch)\n");
		_spdk_bs_dump_finish(seq, ctx, bserrno);
		return;
	} else {
		fprintf(ctx->fp, "(OK)\n");
	}
	fprintf(ctx->fp, "Version: %" PRIu32 "\n", ctx->super->version);
	fprintf(ctx->fp, "CRC: 0x%x (%s)\n", ctx->super->crc,
		(ctx->super->crc == _spdk_blob_md_page_calc_crc(ctx->super)) ? "OK" : "Mismatch");
	fprintf(ctx->fp, "Blobstore Type: %.*s\n", SPDK_BLOBSTORE_TYPE_LENGTH, ctx->super->bstype.bstype);
	fprintf(ctx->fp, "Cluster Size: %" PRIu32 "\n", ctx->super->cluster_size);
	fprintf(ctx->fp, "Super Blob ID: ");
	if (ctx->super->super_blob == SPDK_BLOBID_INVALID) {
		fprintf(ctx->fp, "(None)\n");
	} else {
		fprintf(ctx->fp, "%" PRIu64 "\n", ctx->super->super_blob);
	}
	fprintf(ctx->fp, "Clean: %" PRIu32 "\n", ctx->super->clean);
	fprintf(ctx->fp, "Used Metadata Page Mask Start: %" PRIu32 "\n", ctx->super->used_page_mask_start);
	fprintf(ctx->fp, "Used Metadata Page Mask Length: %" PRIu32 "\n", ctx->super->used_page_mask_len);
	fprintf(ctx->fp, "Used Cluster Mask Start: %" PRIu32 "\n", ctx->super->used_cluster_mask_start);
	fprintf(ctx->fp, "Used Cluster Mask Length: %" PRIu32 "\n", ctx->super->used_cluster_mask_len);
	fprintf(ctx->fp, "Used Blob ID Mask Start: %" PRIu32 "\n", ctx->super->used_blobid_mask_start);
	fprintf(ctx->fp, "Used Blob ID Mask Length: %" PRIu32 "\n", ctx->super->used_blobid_mask_len);
	fprintf(ctx->fp, "Metadata Start: %" PRIu32 "\n", ctx->super->md_start);
	fprintf(ctx->fp, "Metadata Length: %" PRIu32 "\n", ctx->super->md_len);

	ctx->cur_page = 0;
	/* Page-sized, page-aligned DMA buffer reused for every page read. */
	ctx->page = spdk_zmalloc(SPDK_BS_PAGE_SIZE, SPDK_BS_PAGE_SIZE,
				 NULL, SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
	if (!ctx->page) {
		_spdk_bs_dump_finish(seq, ctx, -ENOMEM);
		return;
	}
	_spdk_bs_dump_read_md_page(seq, cb_arg);
}
3602 
3603 void
3604 spdk_bs_dump(struct spdk_bs_dev *dev, FILE *fp, spdk_bs_dump_print_xattr print_xattr_fn,
3605 	     spdk_bs_op_complete cb_fn, void *cb_arg)
3606 {
3607 	struct spdk_blob_store	*bs;
3608 	struct spdk_bs_cpl	cpl;
3609 	spdk_bs_sequence_t	*seq;
3610 	struct spdk_bs_dump_ctx *ctx;
3611 	struct spdk_bs_opts	opts = {};
3612 	int err;
3613 
3614 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Dumping blobstore from dev %p\n", dev);
3615 
3616 	spdk_bs_opts_init(&opts);
3617 
3618 	err = _spdk_bs_alloc(dev, &opts, &bs);
3619 	if (err) {
3620 		dev->destroy(dev);
3621 		cb_fn(cb_arg, err);
3622 		return;
3623 	}
3624 
3625 	ctx = calloc(1, sizeof(*ctx));
3626 	if (!ctx) {
3627 		_spdk_bs_free(bs);
3628 		cb_fn(cb_arg, -ENOMEM);
3629 		return;
3630 	}
3631 
3632 	ctx->bs = bs;
3633 	ctx->fp = fp;
3634 	ctx->print_xattr_fn = print_xattr_fn;
3635 
3636 	/* Allocate memory for the super block */
3637 	ctx->super = spdk_zmalloc(sizeof(*ctx->super), 0x1000, NULL,
3638 				  SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
3639 	if (!ctx->super) {
3640 		free(ctx);
3641 		_spdk_bs_free(bs);
3642 		cb_fn(cb_arg, -ENOMEM);
3643 		return;
3644 	}
3645 
3646 	cpl.type = SPDK_BS_CPL_TYPE_BS_BASIC;
3647 	cpl.u.bs_basic.cb_fn = cb_fn;
3648 	cpl.u.bs_basic.cb_arg = cb_arg;
3649 
3650 	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
3651 	if (!seq) {
3652 		spdk_free(ctx->super);
3653 		free(ctx);
3654 		_spdk_bs_free(bs);
3655 		cb_fn(cb_arg, -ENOMEM);
3656 		return;
3657 	}
3658 
3659 	/* Read the super block */
3660 	spdk_bs_sequence_read_dev(seq, ctx->super, _spdk_bs_page_to_lba(bs, 0),
3661 				  _spdk_bs_byte_to_lba(bs, sizeof(*ctx->super)),
3662 				  _spdk_bs_dump_super_cpl, ctx);
3663 }
3664 
3665 /* END spdk_bs_dump */
3666 
3667 /* START spdk_bs_init */
3668 
/* State carried through the asynchronous init (and destroy) sequences. */
struct spdk_bs_init_ctx {
	struct spdk_blob_store		*bs;	/* blobstore being initialized */
	struct spdk_bs_super_block	*super;	/* DMA buffer for the super block to persist */
};
3673 
3674 static void
3675 _spdk_bs_init_persist_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
3676 {
3677 	struct spdk_bs_init_ctx *ctx = cb_arg;
3678 
3679 	spdk_free(ctx->super);
3680 	free(ctx);
3681 
3682 	spdk_bs_sequence_finish(seq, bserrno);
3683 }
3684 
3685 static void
3686 _spdk_bs_init_trim_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
3687 {
3688 	struct spdk_bs_init_ctx *ctx = cb_arg;
3689 
3690 	/* Write super block */
3691 	spdk_bs_sequence_write_dev(seq, ctx->super, _spdk_bs_page_to_lba(ctx->bs, 0),
3692 				   _spdk_bs_byte_to_lba(ctx->bs, sizeof(*ctx->super)),
3693 				   _spdk_bs_init_persist_super_cpl, ctx);
3694 }
3695 
/* Create a fresh blobstore on dev: lay out the metadata region at the front
 * of the device, clear the data region per opts.clear_method, and persist
 * the super block. On success, cb_fn receives the new blobstore handle. */
void
spdk_bs_init(struct spdk_bs_dev *dev, struct spdk_bs_opts *o,
	     spdk_bs_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_bs_init_ctx *ctx;
	struct spdk_blob_store	*bs;
	struct spdk_bs_cpl	cpl;
	spdk_bs_sequence_t	*seq;
	spdk_bs_batch_t		*batch;
	uint64_t		num_md_lba;
	uint64_t		num_md_pages;
	uint64_t		num_md_clusters;
	uint32_t		i;
	struct spdk_bs_opts	opts = {};
	int			rc;

	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Initializing blobstore on dev %p\n", dev);

	/* The metadata page size must be a multiple of the dev block size. */
	if ((SPDK_BS_PAGE_SIZE % dev->blocklen) != 0) {
		SPDK_ERRLOG("unsupported dev block length of %d\n",
			    dev->blocklen);
		dev->destroy(dev);
		cb_fn(cb_arg, NULL, -EINVAL);
		return;
	}

	if (o) {
		opts = *o;
	} else {
		spdk_bs_opts_init(&opts);
	}

	if (_spdk_bs_opts_verify(&opts) != 0) {
		dev->destroy(dev);
		cb_fn(cb_arg, NULL, -EINVAL);
		return;
	}

	rc = _spdk_bs_alloc(dev, &opts, &bs);
	if (rc) {
		dev->destroy(dev);
		cb_fn(cb_arg, NULL, rc);
		return;
	}

	if (opts.num_md_pages == SPDK_BLOB_OPTS_NUM_MD_PAGES) {
		/* By default, allocate 1 page per cluster.
		 * Technically, this over-allocates metadata
		 * because more metadata will reduce the number
		 * of usable clusters. This can be addressed with
		 * more complex math in the future.
		 */
		bs->md_len = bs->total_clusters;
	} else {
		bs->md_len = opts.num_md_pages;
	}
	rc = spdk_bit_array_resize(&bs->used_md_pages, bs->md_len);
	if (rc < 0) {
		_spdk_bs_free(bs);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	rc = spdk_bit_array_resize(&bs->used_blobids, bs->md_len);
	if (rc < 0) {
		_spdk_bs_free(bs);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		_spdk_bs_free(bs);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	ctx->bs = bs;

	/* Allocate memory for the super block */
	ctx->super = spdk_zmalloc(sizeof(*ctx->super), 0x1000, NULL,
				  SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
	if (!ctx->super) {
		free(ctx);
		_spdk_bs_free(bs);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}
	memcpy(ctx->super->signature, SPDK_BS_SUPER_BLOCK_SIG,
	       sizeof(ctx->super->signature));
	ctx->super->version = SPDK_BS_VERSION;
	ctx->super->length = sizeof(*ctx->super);
	ctx->super->super_blob = bs->super_blob;
	ctx->super->clean = 0;
	ctx->super->cluster_size = bs->cluster_sz;
	ctx->super->io_unit_size = bs->io_unit_size;
	memcpy(&ctx->super->bstype, &bs->bstype, sizeof(bs->bstype));

	/* Calculate how many pages the metadata consumes at the front
	 * of the disk.
	 */

	/* The super block uses 1 page */
	num_md_pages = 1;

	/* The used_md_pages mask requires 1 bit per metadata page, rounded
	 * up to the nearest page, plus a header.
	 */
	ctx->super->used_page_mask_start = num_md_pages;
	ctx->super->used_page_mask_len = spdk_divide_round_up(sizeof(struct spdk_bs_md_mask) +
					 spdk_divide_round_up(bs->md_len, 8),
					 SPDK_BS_PAGE_SIZE);
	num_md_pages += ctx->super->used_page_mask_len;

	/* The used_clusters mask requires 1 bit per cluster, rounded
	 * up to the nearest page, plus a header.
	 */
	ctx->super->used_cluster_mask_start = num_md_pages;
	ctx->super->used_cluster_mask_len = spdk_divide_round_up(sizeof(struct spdk_bs_md_mask) +
					    spdk_divide_round_up(bs->total_clusters, 8),
					    SPDK_BS_PAGE_SIZE);
	num_md_pages += ctx->super->used_cluster_mask_len;

	/* The used_blobids mask requires 1 bit per metadata page, rounded
	 * up to the nearest page, plus a header.
	 */
	ctx->super->used_blobid_mask_start = num_md_pages;
	ctx->super->used_blobid_mask_len = spdk_divide_round_up(sizeof(struct spdk_bs_md_mask) +
					   spdk_divide_round_up(bs->md_len, 8),
					   SPDK_BS_PAGE_SIZE);
	num_md_pages += ctx->super->used_blobid_mask_len;

	/* The metadata region size was chosen above */
	ctx->super->md_start = bs->md_start = num_md_pages;
	ctx->super->md_len = bs->md_len;
	num_md_pages += bs->md_len;

	num_md_lba = _spdk_bs_page_to_lba(bs, num_md_pages);

	ctx->super->size = dev->blockcnt * dev->blocklen;

	/* CRC must be computed last, after every other super block field is set. */
	ctx->super->crc = _spdk_blob_md_page_calc_crc(ctx->super);

	num_md_clusters = spdk_divide_round_up(num_md_pages, bs->pages_per_cluster);
	if (num_md_clusters > bs->total_clusters) {
		SPDK_ERRLOG("Blobstore metadata cannot use more clusters than is available, "
			    "please decrease number of pages reserved for metadata "
			    "or increase cluster size.\n");
		spdk_free(ctx->super);
		free(ctx);
		_spdk_bs_free(bs);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}
	/* Claim all of the clusters used by the metadata */
	for (i = 0; i < num_md_clusters; i++) {
		_spdk_bs_claim_cluster(bs, i);
	}

	bs->total_data_clusters = bs->num_free_clusters;

	cpl.type = SPDK_BS_CPL_TYPE_BS_HANDLE;
	cpl.u.bs_handle.cb_fn = cb_fn;
	cpl.u.bs_handle.cb_arg = cb_arg;
	cpl.u.bs_handle.bs = bs;

	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
	if (!seq) {
		spdk_free(ctx->super);
		free(ctx);
		_spdk_bs_free(bs);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	batch = spdk_bs_sequence_to_batch(seq, _spdk_bs_init_trim_cpl, ctx);

	/* Clear metadata space */
	spdk_bs_batch_write_zeroes_dev(batch, 0, num_md_lba);

	/* The data region is cleared according to the requested method;
	 * the super block itself is written in _spdk_bs_init_trim_cpl(). */
	switch (opts.clear_method) {
	case BS_CLEAR_WITH_UNMAP:
		/* Trim data clusters */
		spdk_bs_batch_unmap_dev(batch, num_md_lba, ctx->bs->dev->blockcnt - num_md_lba);
		break;
	case BS_CLEAR_WITH_WRITE_ZEROES:
		/* Write_zeroes to data clusters */
		spdk_bs_batch_write_zeroes_dev(batch, num_md_lba, ctx->bs->dev->blockcnt - num_md_lba);
		break;
	case BS_CLEAR_WITH_NONE:
	default:
		break;
	}

	spdk_bs_batch_close(batch);
}
3892 
3893 /* END spdk_bs_init */
3894 
3895 /* START spdk_bs_destroy */
3896 
3897 static void
3898 _spdk_bs_destroy_trim_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
3899 {
3900 	struct spdk_bs_init_ctx *ctx = cb_arg;
3901 	struct spdk_blob_store *bs = ctx->bs;
3902 
3903 	/*
3904 	 * We need to defer calling spdk_bs_call_cpl() until after
3905 	 * dev destruction, so tuck these away for later use.
3906 	 */
3907 	bs->unload_err = bserrno;
3908 	memcpy(&bs->unload_cpl, &seq->cpl, sizeof(struct spdk_bs_cpl));
3909 	seq->cpl.type = SPDK_BS_CPL_TYPE_NONE;
3910 
3911 	spdk_bs_sequence_finish(seq, bserrno);
3912 
3913 	_spdk_bs_free(bs);
3914 	free(ctx);
3915 }
3916 
3917 void
3918 spdk_bs_destroy(struct spdk_blob_store *bs, spdk_bs_op_complete cb_fn,
3919 		void *cb_arg)
3920 {
3921 	struct spdk_bs_cpl	cpl;
3922 	spdk_bs_sequence_t	*seq;
3923 	struct spdk_bs_init_ctx *ctx;
3924 
3925 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Destroying blobstore\n");
3926 
3927 	if (!TAILQ_EMPTY(&bs->blobs)) {
3928 		SPDK_ERRLOG("Blobstore still has open blobs\n");
3929 		cb_fn(cb_arg, -EBUSY);
3930 		return;
3931 	}
3932 
3933 	cpl.type = SPDK_BS_CPL_TYPE_BS_BASIC;
3934 	cpl.u.bs_basic.cb_fn = cb_fn;
3935 	cpl.u.bs_basic.cb_arg = cb_arg;
3936 
3937 	ctx = calloc(1, sizeof(*ctx));
3938 	if (!ctx) {
3939 		cb_fn(cb_arg, -ENOMEM);
3940 		return;
3941 	}
3942 
3943 	ctx->bs = bs;
3944 
3945 	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
3946 	if (!seq) {
3947 		free(ctx);
3948 		cb_fn(cb_arg, -ENOMEM);
3949 		return;
3950 	}
3951 
3952 	/* Write zeroes to the super block */
3953 	spdk_bs_sequence_write_zeroes_dev(seq,
3954 					  _spdk_bs_page_to_lba(bs, 0),
3955 					  _spdk_bs_byte_to_lba(bs, sizeof(struct spdk_bs_super_block)),
3956 					  _spdk_bs_destroy_trim_cpl, ctx);
3957 }
3958 
3959 /* END spdk_bs_destroy */
3960 
3961 /* START spdk_bs_unload */
3962 
/*
 * Final unload step: the clean super block has been written.  Save the user
 * completion on the blobstore and free everything.
 */
static void
_spdk_bs_unload_write_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_bs_load_ctx	*ctx = cb_arg;

	spdk_free(ctx->super);

	/*
	 * We need to defer calling spdk_bs_call_cpl() until after
	 * dev destruction, so tuck these away for later use.
	 */
	ctx->bs->unload_err = bserrno;
	memcpy(&ctx->bs->unload_cpl, &seq->cpl, sizeof(struct spdk_bs_cpl));
	/* Prevent spdk_bs_sequence_finish() from invoking the user callback
	 * now; the saved unload_cpl fires after the dev is destroyed. */
	seq->cpl.type = SPDK_BS_CPL_TYPE_NONE;

	spdk_bs_sequence_finish(seq, bserrno);

	_spdk_bs_free(ctx->bs);
	free(ctx);
}
3983 
3984 static void
3985 _spdk_bs_unload_write_used_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
3986 {
3987 	struct spdk_bs_load_ctx	*ctx = cb_arg;
3988 
3989 	spdk_free(ctx->mask);
3990 	ctx->super->clean = 1;
3991 
3992 	_spdk_bs_write_super(seq, ctx->bs, ctx->super, _spdk_bs_unload_write_super_cpl, ctx);
3993 }
3994 
3995 static void
3996 _spdk_bs_unload_write_used_blobids_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
3997 {
3998 	struct spdk_bs_load_ctx	*ctx = cb_arg;
3999 
4000 	spdk_free(ctx->mask);
4001 	ctx->mask = NULL;
4002 
4003 	_spdk_bs_write_used_clusters(seq, cb_arg, _spdk_bs_unload_write_used_clusters_cpl);
4004 }
4005 
4006 static void
4007 _spdk_bs_unload_write_used_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
4008 {
4009 	struct spdk_bs_load_ctx	*ctx = cb_arg;
4010 
4011 	spdk_free(ctx->mask);
4012 	ctx->mask = NULL;
4013 
4014 	_spdk_bs_write_used_blobids(seq, cb_arg, _spdk_bs_unload_write_used_blobids_cpl);
4015 }
4016 
/*
 * Unload step: the super block has been re-read into ctx->super; start
 * writing out the metadata masks.
 *
 * NOTE(review): bserrno is ignored here, so a failed super-block read still
 * proceeds to write metadata based on the (possibly unread) buffer — confirm
 * whether an error path should be added.
 */
static void
_spdk_bs_unload_read_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	_spdk_bs_write_used_md(seq, cb_arg, _spdk_bs_unload_write_used_pages_cpl);
}
4022 
4023 void
4024 spdk_bs_unload(struct spdk_blob_store *bs, spdk_bs_op_complete cb_fn, void *cb_arg)
4025 {
4026 	struct spdk_bs_cpl	cpl;
4027 	spdk_bs_sequence_t	*seq;
4028 	struct spdk_bs_load_ctx *ctx;
4029 
4030 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Syncing blobstore\n");
4031 
4032 	if (!TAILQ_EMPTY(&bs->blobs)) {
4033 		SPDK_ERRLOG("Blobstore still has open blobs\n");
4034 		cb_fn(cb_arg, -EBUSY);
4035 		return;
4036 	}
4037 
4038 	ctx = calloc(1, sizeof(*ctx));
4039 	if (!ctx) {
4040 		cb_fn(cb_arg, -ENOMEM);
4041 		return;
4042 	}
4043 
4044 	ctx->bs = bs;
4045 
4046 	ctx->super = spdk_zmalloc(sizeof(*ctx->super), 0x1000, NULL,
4047 				  SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
4048 	if (!ctx->super) {
4049 		free(ctx);
4050 		cb_fn(cb_arg, -ENOMEM);
4051 		return;
4052 	}
4053 
4054 	cpl.type = SPDK_BS_CPL_TYPE_BS_BASIC;
4055 	cpl.u.bs_basic.cb_fn = cb_fn;
4056 	cpl.u.bs_basic.cb_arg = cb_arg;
4057 
4058 	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
4059 	if (!seq) {
4060 		spdk_free(ctx->super);
4061 		free(ctx);
4062 		cb_fn(cb_arg, -ENOMEM);
4063 		return;
4064 	}
4065 
4066 	/* Read super block */
4067 	spdk_bs_sequence_read_dev(seq, ctx->super, _spdk_bs_page_to_lba(bs, 0),
4068 				  _spdk_bs_byte_to_lba(bs, sizeof(*ctx->super)),
4069 				  _spdk_bs_unload_read_super_cpl, ctx);
4070 }
4071 
4072 /* END spdk_bs_unload */
4073 
4074 /* START spdk_bs_set_super */
4075 
/* Context for the read-modify-write of the super block performed by
 * spdk_bs_set_super(). */
struct spdk_bs_set_super_ctx {
	struct spdk_blob_store		*bs;
	/* DMA-able buffer holding the on-disk super block */
	struct spdk_bs_super_block	*super;
};
4080 
4081 static void
4082 _spdk_bs_set_super_write_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
4083 {
4084 	struct spdk_bs_set_super_ctx	*ctx = cb_arg;
4085 
4086 	if (bserrno != 0) {
4087 		SPDK_ERRLOG("Unable to write to super block of blobstore\n");
4088 	}
4089 
4090 	spdk_free(ctx->super);
4091 
4092 	spdk_bs_sequence_finish(seq, bserrno);
4093 
4094 	free(ctx);
4095 }
4096 
4097 static void
4098 _spdk_bs_set_super_read_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
4099 {
4100 	struct spdk_bs_set_super_ctx	*ctx = cb_arg;
4101 
4102 	if (bserrno != 0) {
4103 		SPDK_ERRLOG("Unable to read super block of blobstore\n");
4104 		spdk_free(ctx->super);
4105 		spdk_bs_sequence_finish(seq, bserrno);
4106 		free(ctx);
4107 		return;
4108 	}
4109 
4110 	_spdk_bs_write_super(seq, ctx->bs, ctx->super, _spdk_bs_set_super_write_cpl, ctx);
4111 }
4112 
4113 void
4114 spdk_bs_set_super(struct spdk_blob_store *bs, spdk_blob_id blobid,
4115 		  spdk_bs_op_complete cb_fn, void *cb_arg)
4116 {
4117 	struct spdk_bs_cpl		cpl;
4118 	spdk_bs_sequence_t		*seq;
4119 	struct spdk_bs_set_super_ctx	*ctx;
4120 
4121 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Setting super blob id on blobstore\n");
4122 
4123 	ctx = calloc(1, sizeof(*ctx));
4124 	if (!ctx) {
4125 		cb_fn(cb_arg, -ENOMEM);
4126 		return;
4127 	}
4128 
4129 	ctx->bs = bs;
4130 
4131 	ctx->super = spdk_zmalloc(sizeof(*ctx->super), 0x1000, NULL,
4132 				  SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
4133 	if (!ctx->super) {
4134 		free(ctx);
4135 		cb_fn(cb_arg, -ENOMEM);
4136 		return;
4137 	}
4138 
4139 	cpl.type = SPDK_BS_CPL_TYPE_BS_BASIC;
4140 	cpl.u.bs_basic.cb_fn = cb_fn;
4141 	cpl.u.bs_basic.cb_arg = cb_arg;
4142 
4143 	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
4144 	if (!seq) {
4145 		spdk_free(ctx->super);
4146 		free(ctx);
4147 		cb_fn(cb_arg, -ENOMEM);
4148 		return;
4149 	}
4150 
4151 	bs->super_blob = blobid;
4152 
4153 	/* Read super block */
4154 	spdk_bs_sequence_read_dev(seq, ctx->super, _spdk_bs_page_to_lba(bs, 0),
4155 				  _spdk_bs_byte_to_lba(bs, sizeof(*ctx->super)),
4156 				  _spdk_bs_set_super_read_cpl, ctx);
4157 }
4158 
4159 /* END spdk_bs_set_super */
4160 
4161 void
4162 spdk_bs_get_super(struct spdk_blob_store *bs,
4163 		  spdk_blob_op_with_id_complete cb_fn, void *cb_arg)
4164 {
4165 	if (bs->super_blob == SPDK_BLOBID_INVALID) {
4166 		cb_fn(cb_arg, SPDK_BLOBID_INVALID, -ENOENT);
4167 	} else {
4168 		cb_fn(cb_arg, bs->super_blob, 0);
4169 	}
4170 }
4171 
/* Return the blobstore's cluster size in bytes. */
uint64_t
spdk_bs_get_cluster_size(struct spdk_blob_store *bs)
{
	return bs->cluster_sz;
}
4177 
/* Return the (compile-time constant) blobstore page size in bytes. */
uint64_t
spdk_bs_get_page_size(struct spdk_blob_store *bs)
{
	return SPDK_BS_PAGE_SIZE;
}
4183 
/* Return the blobstore's I/O unit size in bytes. */
uint64_t
spdk_bs_get_io_unit_size(struct spdk_blob_store *bs)
{
	return bs->io_unit_size;
}
4189 
/* Return the number of clusters currently unallocated in the blobstore. */
uint64_t
spdk_bs_free_cluster_count(struct spdk_blob_store *bs)
{
	return bs->num_free_clusters;
}
4195 
/* Return the total number of data clusters (excluding metadata) in the
 * blobstore. */
uint64_t
spdk_bs_total_data_cluster_count(struct spdk_blob_store *bs)
{
	return bs->total_data_clusters;
}
4201 
4202 static int
4203 spdk_bs_register_md_thread(struct spdk_blob_store *bs)
4204 {
4205 	bs->md_channel = spdk_get_io_channel(bs);
4206 	if (!bs->md_channel) {
4207 		SPDK_ERRLOG("Failed to get IO channel.\n");
4208 		return -1;
4209 	}
4210 
4211 	return 0;
4212 }
4213 
/* Release the cached metadata I/O channel.  Always returns 0. */
static int
spdk_bs_unregister_md_thread(struct spdk_blob_store *bs)
{
	spdk_put_io_channel(bs->md_channel);

	return 0;
}
4221 
/* Return the id of an open blob. */
spdk_blob_id spdk_blob_get_id(struct spdk_blob *blob)
{
	assert(blob != NULL);

	return blob->id;
}
4228 
/* Return the blob's size in pages, derived from its active cluster count. */
uint64_t spdk_blob_get_num_pages(struct spdk_blob *blob)
{
	assert(blob != NULL);

	return _spdk_bs_cluster_to_page(blob->bs, blob->active.num_clusters);
}
4235 
/* Return the blob's size in I/O units (pages scaled by units-per-page). */
uint64_t spdk_blob_get_num_io_units(struct spdk_blob *blob)
{
	assert(blob != NULL);

	return spdk_blob_get_num_pages(blob) * _spdk_bs_io_unit_per_page(blob->bs);
}
4242 
/* Return the blob's size in clusters (the active, i.e. current, state). */
uint64_t spdk_blob_get_num_clusters(struct spdk_blob *blob)
{
	assert(blob != NULL);

	return blob->active.num_clusters;
}
4249 
4250 /* START spdk_bs_create_blob */
4251 
/*
 * Persist completion for blob creation: the in-memory struct is released
 * (the caller reopens by id) and the sequence finishes, delivering the
 * blobid completion set up in _spdk_bs_create_blob().
 */
static void
_spdk_bs_create_blob_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob *blob = cb_arg;

	_spdk_blob_free(blob);

	spdk_bs_sequence_finish(seq, bserrno);
}
4261 
4262 static int
4263 _spdk_blob_set_xattrs(struct spdk_blob *blob, const struct spdk_blob_xattr_opts *xattrs,
4264 		      bool internal)
4265 {
4266 	uint64_t i;
4267 	size_t value_len = 0;
4268 	int rc;
4269 	const void *value = NULL;
4270 	if (xattrs->count > 0 && xattrs->get_value == NULL) {
4271 		return -EINVAL;
4272 	}
4273 	for (i = 0; i < xattrs->count; i++) {
4274 		xattrs->get_value(xattrs->ctx, xattrs->names[i], &value, &value_len);
4275 		if (value == NULL || value_len == 0) {
4276 			return -EINVAL;
4277 		}
4278 		rc = _spdk_blob_set_xattr(blob, xattrs->names[i], value, value_len, internal);
4279 		if (rc < 0) {
4280 			return rc;
4281 		}
4282 	}
4283 	return 0;
4284 }
4285 
4286 static void
4287 _spdk_bs_create_blob(struct spdk_blob_store *bs,
4288 		     const struct spdk_blob_opts *opts,
4289 		     const struct spdk_blob_xattr_opts *internal_xattrs,
4290 		     spdk_blob_op_with_id_complete cb_fn, void *cb_arg)
4291 {
4292 	struct spdk_blob	*blob;
4293 	uint32_t		page_idx;
4294 	struct spdk_bs_cpl	cpl;
4295 	struct spdk_blob_opts	opts_default;
4296 	struct spdk_blob_xattr_opts internal_xattrs_default;
4297 	spdk_bs_sequence_t	*seq;
4298 	spdk_blob_id		id;
4299 	int rc;
4300 
4301 	assert(spdk_get_thread() == bs->md_thread);
4302 
4303 	page_idx = spdk_bit_array_find_first_clear(bs->used_md_pages, 0);
4304 	if (page_idx == UINT32_MAX) {
4305 		cb_fn(cb_arg, 0, -ENOMEM);
4306 		return;
4307 	}
4308 	spdk_bit_array_set(bs->used_blobids, page_idx);
4309 	spdk_bit_array_set(bs->used_md_pages, page_idx);
4310 
4311 	id = _spdk_bs_page_to_blobid(page_idx);
4312 
4313 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Creating blob with id %lu at page %u\n", id, page_idx);
4314 
4315 	blob = _spdk_blob_alloc(bs, id);
4316 	if (!blob) {
4317 		cb_fn(cb_arg, 0, -ENOMEM);
4318 		return;
4319 	}
4320 
4321 	if (!opts) {
4322 		spdk_blob_opts_init(&opts_default);
4323 		opts = &opts_default;
4324 	}
4325 	if (!internal_xattrs) {
4326 		_spdk_blob_xattrs_init(&internal_xattrs_default);
4327 		internal_xattrs = &internal_xattrs_default;
4328 	}
4329 
4330 	rc = _spdk_blob_set_xattrs(blob, &opts->xattrs, false);
4331 	if (rc < 0) {
4332 		_spdk_blob_free(blob);
4333 		cb_fn(cb_arg, 0, rc);
4334 		return;
4335 	}
4336 
4337 	rc = _spdk_blob_set_xattrs(blob, internal_xattrs, true);
4338 	if (rc < 0) {
4339 		_spdk_blob_free(blob);
4340 		cb_fn(cb_arg, 0, rc);
4341 		return;
4342 	}
4343 
4344 	if (opts->thin_provision) {
4345 		_spdk_blob_set_thin_provision(blob);
4346 	}
4347 
4348 	rc = _spdk_blob_resize(blob, opts->num_clusters);
4349 	if (rc < 0) {
4350 		_spdk_blob_free(blob);
4351 		cb_fn(cb_arg, 0, rc);
4352 		return;
4353 	}
4354 	cpl.type = SPDK_BS_CPL_TYPE_BLOBID;
4355 	cpl.u.blobid.cb_fn = cb_fn;
4356 	cpl.u.blobid.cb_arg = cb_arg;
4357 	cpl.u.blobid.blobid = blob->id;
4358 
4359 	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
4360 	if (!seq) {
4361 		_spdk_blob_free(blob);
4362 		cb_fn(cb_arg, 0, -ENOMEM);
4363 		return;
4364 	}
4365 
4366 	_spdk_blob_persist(seq, blob, _spdk_bs_create_blob_cpl, blob);
4367 }
4368 
/* Public entry: create a blob with default options and no internal xattrs. */
void spdk_bs_create_blob(struct spdk_blob_store *bs,
			 spdk_blob_op_with_id_complete cb_fn, void *cb_arg)
{
	_spdk_bs_create_blob(bs, NULL, NULL, cb_fn, cb_arg);
}
4374 
/* Public entry: create a blob with caller-supplied options. */
void spdk_bs_create_blob_ext(struct spdk_blob_store *bs, const struct spdk_blob_opts *opts,
			     spdk_blob_op_with_id_complete cb_fn, void *cb_arg)
{
	_spdk_bs_create_blob(bs, opts, NULL, cb_fn, cb_arg);
}
4380 
4381 /* END spdk_bs_create_blob */
4382 
4383 /* START blob_cleanup */
4384 
/* Shared context for snapshot, clone, and inflate/decouple operations. */
struct spdk_clone_snapshot_ctx {
	/* User completion, delivered by _spdk_bs_clone_snapshot_cleanup_finish() */
	struct spdk_bs_cpl      cpl;
	/* First error recorded during the operation (0 = no error so far) */
	int bserrno;
	/* True once I/O on the original blob has been frozen */
	bool frozen;

	/* Channel used for the touch writes issued during inflate */
	struct spdk_io_channel *channel;

	/* Current cluster for inflate operation */
	uint64_t cluster;

	/* For inflation force allocation of all unallocated clusters and remove
	 * thin-provisioning. Otherwise only decouple parent and keep clone thin. */
	bool allocate_all;

	/* The blob the operation was invoked on */
	struct {
		spdk_blob_id id;
		struct spdk_blob *blob;
	} original;
	/* The newly created snapshot or clone blob */
	struct {
		spdk_blob_id id;
		struct spdk_blob *blob;
	} new;

	/* xattrs specified for snapshot/clones only. They have no impact on
	 * the original blobs xattrs. */
	const struct spdk_blob_xattr_opts *xattrs;
};
4412 
/*
 * Final step of every snapshot/clone/inflate operation: deliver the user
 * completion saved in ctx->cpl and free the context.  A cleanup-stage error
 * only overrides ctx->bserrno if no earlier error was recorded.
 */
static void
_spdk_bs_clone_snapshot_cleanup_finish(void *cb_arg, int bserrno)
{
	struct spdk_clone_snapshot_ctx *ctx = cb_arg;
	struct spdk_bs_cpl *cpl = &ctx->cpl;

	if (bserrno != 0) {
		if (ctx->bserrno != 0) {
			SPDK_ERRLOG("Cleanup error %d\n", bserrno);
		} else {
			ctx->bserrno = bserrno;
		}
	}

	/* Dispatch on the completion type set by the operation's entry point. */
	switch (cpl->type) {
	case SPDK_BS_CPL_TYPE_BLOBID:
		cpl->u.blobid.cb_fn(cpl->u.blobid.cb_arg, cpl->u.blobid.blobid, ctx->bserrno);
		break;
	case SPDK_BS_CPL_TYPE_BLOB_BASIC:
		cpl->u.blob_basic.cb_fn(cpl->u.blob_basic.cb_arg, ctx->bserrno);
		break;
	default:
		SPDK_UNREACHABLE();
		break;
	}

	free(ctx);
}
4441 
/*
 * I/O on the original blob has been unfrozen (or was never frozen); release
 * the operation lock and close the original blob, then finish.
 */
static void
_spdk_bs_snapshot_unfreeze_cpl(void *cb_arg, int bserrno)
{
	struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg;
	struct spdk_blob *origblob = ctx->original.blob;

	if (bserrno != 0) {
		if (ctx->bserrno != 0) {
			SPDK_ERRLOG("Unfreeze error %d\n", bserrno);
		} else {
			ctx->bserrno = bserrno;
		}
	}

	ctx->original.id = origblob->id;
	/* Allow other locked operations (snapshot/clone/inflate) on this blob. */
	origblob->locked_operation_in_progress = false;

	spdk_blob_close(origblob, _spdk_bs_clone_snapshot_cleanup_finish, ctx);
}
4461 
/*
 * Unwind path once the original blob is open: unfreeze its I/O if we froze
 * it, then fall through to the unfreeze completion which closes the blob and
 * finishes the operation.  Records the first error seen.
 */
static void
_spdk_bs_clone_snapshot_origblob_cleanup(void *cb_arg, int bserrno)
{
	struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg;
	struct spdk_blob *origblob = ctx->original.blob;

	if (bserrno != 0) {
		if (ctx->bserrno != 0) {
			SPDK_ERRLOG("Cleanup error %d\n", bserrno);
		} else {
			ctx->bserrno = bserrno;
		}
	}

	if (ctx->frozen) {
		/* Unfreeze any outstanding I/O */
		_spdk_blob_unfreeze_io(origblob, _spdk_bs_snapshot_unfreeze_cpl, ctx);
	} else {
		_spdk_bs_snapshot_unfreeze_cpl(ctx, 0);
	}

}
4484 
/*
 * Unwind path once the new (snapshot/clone) blob is open: close it, then
 * continue cleanup through the original blob.  Records the first error seen.
 */
static void
_spdk_bs_clone_snapshot_newblob_cleanup(void *cb_arg, int bserrno)
{
	struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg;
	struct spdk_blob *newblob = ctx->new.blob;

	if (bserrno != 0) {
		if (ctx->bserrno != 0) {
			SPDK_ERRLOG("Cleanup error %d\n", bserrno);
		} else {
			ctx->bserrno = bserrno;
		}
	}

	ctx->new.id = newblob->id;
	spdk_blob_close(newblob, _spdk_bs_clone_snapshot_origblob_cleanup, ctx);
}
4502 
4503 /* END blob_cleanup */
4504 
4505 /* START spdk_bs_create_snapshot */
4506 
4507 static void
4508 _spdk_bs_snapshot_swap_cluster_maps(struct spdk_blob *blob1, struct spdk_blob *blob2)
4509 {
4510 	uint64_t *cluster_temp;
4511 
4512 	cluster_temp = blob1->active.clusters;
4513 	blob1->active.clusters = blob2->active.clusters;
4514 	blob2->active.clusters = cluster_temp;
4515 }
4516 
/*
 * The clone (original blob) metadata has been synced.  Strip the
 * SNAPSHOT_IN_PROGRESS marker from the snapshot, mark it read-only, and
 * sync the snapshot metadata as the final step.
 */
static void
_spdk_bs_snapshot_origblob_sync_cpl(void *cb_arg, int bserrno)
{
	struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg;
	struct spdk_blob *origblob = ctx->original.blob;
	struct spdk_blob *newblob = ctx->new.blob;

	if (bserrno != 0) {
		/* The sync failed, so the swapped cluster maps were never
		 * persisted; restore them before unwinding. */
		_spdk_bs_snapshot_swap_cluster_maps(newblob, origblob);
		_spdk_bs_clone_snapshot_origblob_cleanup(ctx, bserrno);
		return;
	}

	/* Remove metadata descriptor SNAPSHOT_IN_PROGRESS */
	bserrno = _spdk_blob_remove_xattr(newblob, SNAPSHOT_IN_PROGRESS, true);
	if (bserrno != 0) {
		/* No swap-back here: the swapped maps are already on disk. */
		_spdk_bs_clone_snapshot_origblob_cleanup(ctx, bserrno);
		return;
	}

	_spdk_bs_blob_list_add(ctx->original.blob);

	spdk_blob_set_read_only(newblob);

	/* sync snapshot metadata */
	spdk_blob_sync_md(newblob, _spdk_bs_clone_snapshot_origblob_cleanup, cb_arg);
}
4544 
/*
 * The snapshot (new blob) metadata has been synced with the cluster maps
 * swapped in.  Re-parent the original blob onto the snapshot, make it a
 * thin-provisioned clone, and sync its metadata.
 */
static void
_spdk_bs_snapshot_newblob_sync_cpl(void *cb_arg, int bserrno)
{
	struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg;
	struct spdk_blob *origblob = ctx->original.blob;
	struct spdk_blob *newblob = ctx->new.blob;

	if (bserrno != 0) {
		/* return cluster map back to original */
		_spdk_bs_snapshot_swap_cluster_maps(newblob, origblob);
		_spdk_bs_clone_snapshot_newblob_cleanup(ctx, bserrno);
		return;
	}

	/* Set internal xattr for snapshot id */
	bserrno = _spdk_blob_set_xattr(origblob, BLOB_SNAPSHOT, &newblob->id, sizeof(spdk_blob_id), true);
	if (bserrno != 0) {
		/* return cluster map back to original */
		_spdk_bs_snapshot_swap_cluster_maps(newblob, origblob);
		_spdk_bs_clone_snapshot_newblob_cleanup(ctx, bserrno);
		return;
	}

	/* The original blob's dependency edges change: it now descends from
	 * the new snapshot. */
	_spdk_bs_blob_list_remove(origblob);
	origblob->parent_id = newblob->id;

	/* Create new back_bs_dev for snapshot */
	origblob->back_bs_dev = spdk_bs_create_blob_bs_dev(newblob);
	if (origblob->back_bs_dev == NULL) {
		/* return cluster map back to original */
		_spdk_bs_snapshot_swap_cluster_maps(newblob, origblob);
		_spdk_bs_clone_snapshot_newblob_cleanup(ctx, -EINVAL);
		return;
	}

	/* set clone blob as thin provisioned */
	_spdk_blob_set_thin_provision(origblob);

	_spdk_bs_blob_list_add(newblob);

	/* sync clone metadata */
	spdk_blob_sync_md(origblob, _spdk_bs_snapshot_origblob_sync_cpl, ctx);
}
4588 
/*
 * I/O on the original blob is now frozen.  Hand the original's backing
 * device, flags, parentage, and cluster map over to the snapshot, then sync
 * the snapshot metadata.
 */
static void
_spdk_bs_snapshot_freeze_cpl(void *cb_arg, int rc)
{
	struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg;
	struct spdk_blob *origblob = ctx->original.blob;
	struct spdk_blob *newblob = ctx->new.blob;
	int bserrno;

	if (rc != 0) {
		_spdk_bs_clone_snapshot_newblob_cleanup(ctx, rc);
		return;
	}

	/* Remember that cleanup must unfreeze from here on. */
	ctx->frozen = true;

	/* set new back_bs_dev for snapshot */
	newblob->back_bs_dev = origblob->back_bs_dev;
	/* Set invalid flags from origblob */
	newblob->invalid_flags = origblob->invalid_flags;

	/* inherit parent from original blob if set */
	newblob->parent_id = origblob->parent_id;
	if (origblob->parent_id != SPDK_BLOBID_INVALID) {
		/* Set internal xattr for snapshot id */
		bserrno = _spdk_blob_set_xattr(newblob, BLOB_SNAPSHOT,
					       &origblob->parent_id, sizeof(spdk_blob_id), true);
		if (bserrno != 0) {
			_spdk_bs_clone_snapshot_newblob_cleanup(ctx, bserrno);
			return;
		}
	}

	/* swap cluster maps */
	_spdk_bs_snapshot_swap_cluster_maps(newblob, origblob);

	/* sync snapshot metadata */
	spdk_blob_sync_md(newblob, _spdk_bs_snapshot_newblob_sync_cpl, ctx);
}
4627 
/*
 * The freshly created snapshot blob has been opened.  Sanity-check that it
 * is thin-provisioned with no allocated clusters, then freeze I/O on the
 * original blob so the cluster maps can be swapped safely.
 */
static void
_spdk_bs_snapshot_newblob_open_cpl(void *cb_arg, struct spdk_blob *_blob, int bserrno)
{
	struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg;
	struct spdk_blob *origblob = ctx->original.blob;
	struct spdk_blob *newblob = _blob;

	if (bserrno != 0) {
		_spdk_bs_clone_snapshot_origblob_cleanup(ctx, bserrno);
		return;
	}

	ctx->new.blob = newblob;
	/* A just-created snapshot must be thin with an all-zero cluster map. */
	assert(spdk_blob_is_thin_provisioned(newblob));
	assert(spdk_mem_all_zero(newblob->active.clusters,
				 newblob->active.num_clusters * sizeof(*newblob->active.clusters)));

	_spdk_blob_freeze_io(origblob, _spdk_bs_snapshot_freeze_cpl, ctx);
}
4647 
/*
 * The snapshot blob has been created.  Record its id (also in the user
 * completion) and open it to continue the snapshot sequence.
 */
static void
_spdk_bs_snapshot_newblob_create_cpl(void *cb_arg, spdk_blob_id blobid, int bserrno)
{
	struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg;
	struct spdk_blob *origblob = ctx->original.blob;

	if (bserrno != 0) {
		_spdk_bs_clone_snapshot_origblob_cleanup(ctx, bserrno);
		return;
	}

	ctx->new.id = blobid;
	ctx->cpl.u.blobid.blobid = blobid;

	spdk_bs_open_blob(origblob->bs, ctx->new.id, _spdk_bs_snapshot_newblob_open_cpl, ctx);
}
4664 
4665 
4666 static void
4667 _spdk_bs_xattr_snapshot(void *arg, const char *name,
4668 			const void **value, size_t *value_len)
4669 {
4670 	assert(strncmp(name, SNAPSHOT_IN_PROGRESS, sizeof(SNAPSHOT_IN_PROGRESS)) == 0);
4671 
4672 	struct spdk_blob *blob = (struct spdk_blob *)arg;
4673 	*value = &blob->id;
4674 	*value_len = sizeof(blob->id);
4675 }
4676 
/*
 * The blob to snapshot has been opened.  Validate it is writable and not
 * already under a locked operation, then create the snapshot blob: same
 * logical size, thin-provisioned, tagged SNAPSHOT_IN_PROGRESS.
 */
static void
_spdk_bs_snapshot_origblob_open_cpl(void *cb_arg, struct spdk_blob *_blob, int bserrno)
{
	struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg;
	struct spdk_blob_opts opts;
	struct spdk_blob_xattr_opts internal_xattrs;
	char *xattrs_names[] = { SNAPSHOT_IN_PROGRESS };

	if (bserrno != 0) {
		_spdk_bs_clone_snapshot_cleanup_finish(ctx, bserrno);
		return;
	}

	ctx->original.blob = _blob;

	if (_blob->data_ro || _blob->md_ro) {
		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Cannot create snapshot from read only blob with id %lu\n",
			      _blob->id);
		ctx->bserrno = -EINVAL;
		spdk_blob_close(_blob, _spdk_bs_clone_snapshot_cleanup_finish, ctx);
		return;
	}

	if (_blob->locked_operation_in_progress) {
		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Cannot create snapshot - another operation in progress\n");
		ctx->bserrno = -EBUSY;
		spdk_blob_close(_blob, _spdk_bs_clone_snapshot_cleanup_finish, ctx);
		return;
	}

	/* Serialize against other snapshot/clone/inflate operations; cleared
	 * in _spdk_bs_snapshot_unfreeze_cpl() on every exit path. */
	_blob->locked_operation_in_progress = true;

	spdk_blob_opts_init(&opts);
	_spdk_blob_xattrs_init(&internal_xattrs);

	/* Change the size of new blob to the same as in original blob,
	 * but do not allocate clusters */
	opts.thin_provision = true;
	opts.num_clusters = spdk_blob_get_num_clusters(_blob);

	/* If there are any xattrs specified for snapshot, set them now */
	if (ctx->xattrs) {
		memcpy(&opts.xattrs, ctx->xattrs, sizeof(*ctx->xattrs));
	}
	/* Set internal xattr SNAPSHOT_IN_PROGRESS */
	internal_xattrs.count = 1;
	internal_xattrs.ctx = _blob;
	internal_xattrs.names = xattrs_names;
	internal_xattrs.get_value = _spdk_bs_xattr_snapshot;

	_spdk_bs_create_blob(_blob->bs, &opts, &internal_xattrs,
			     _spdk_bs_snapshot_newblob_create_cpl, ctx);
}
4730 
4731 void spdk_bs_create_snapshot(struct spdk_blob_store *bs, spdk_blob_id blobid,
4732 			     const struct spdk_blob_xattr_opts *snapshot_xattrs,
4733 			     spdk_blob_op_with_id_complete cb_fn, void *cb_arg)
4734 {
4735 	struct spdk_clone_snapshot_ctx *ctx = calloc(1, sizeof(*ctx));
4736 
4737 	if (!ctx) {
4738 		cb_fn(cb_arg, SPDK_BLOBID_INVALID, -ENOMEM);
4739 		return;
4740 	}
4741 	ctx->cpl.type = SPDK_BS_CPL_TYPE_BLOBID;
4742 	ctx->cpl.u.blobid.cb_fn = cb_fn;
4743 	ctx->cpl.u.blobid.cb_arg = cb_arg;
4744 	ctx->cpl.u.blobid.blobid = SPDK_BLOBID_INVALID;
4745 	ctx->bserrno = 0;
4746 	ctx->frozen = false;
4747 	ctx->original.id = blobid;
4748 	ctx->xattrs = snapshot_xattrs;
4749 
4750 	spdk_bs_open_blob(bs, ctx->original.id, _spdk_bs_snapshot_origblob_open_cpl, ctx);
4751 }
4752 /* END spdk_bs_create_snapshot */
4753 
4754 /* START spdk_bs_create_clone */
4755 
4756 static void
4757 _spdk_bs_xattr_clone(void *arg, const char *name,
4758 		     const void **value, size_t *value_len)
4759 {
4760 	assert(strncmp(name, BLOB_SNAPSHOT, sizeof(BLOB_SNAPSHOT)) == 0);
4761 
4762 	struct spdk_blob *blob = (struct spdk_blob *)arg;
4763 	*value = &blob->id;
4764 	*value_len = sizeof(blob->id);
4765 }
4766 
4767 static void
4768 _spdk_bs_clone_newblob_open_cpl(void *cb_arg, struct spdk_blob *_blob, int bserrno)
4769 {
4770 	struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg;
4771 	struct spdk_blob *clone = _blob;
4772 
4773 	ctx->new.blob = clone;
4774 	_spdk_bs_blob_list_add(clone);
4775 
4776 	spdk_blob_close(clone, _spdk_bs_clone_snapshot_origblob_cleanup, ctx);
4777 }
4778 
4779 static void
4780 _spdk_bs_clone_newblob_create_cpl(void *cb_arg, spdk_blob_id blobid, int bserrno)
4781 {
4782 	struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg;
4783 
4784 	ctx->cpl.u.blobid.blobid = blobid;
4785 	spdk_bs_open_blob(ctx->original.blob->bs, blobid, _spdk_bs_clone_newblob_open_cpl, ctx);
4786 }
4787 
/*
 * The snapshot to clone has been opened.  A clone may only be taken from a
 * fully read-only blob; validate that, take the operation lock, and create
 * the clone: same logical size, thin-provisioned, BLOB_SNAPSHOT xattr
 * pointing at the snapshot.
 */
static void
_spdk_bs_clone_origblob_open_cpl(void *cb_arg, struct spdk_blob *_blob, int bserrno)
{
	struct spdk_clone_snapshot_ctx	*ctx = (struct spdk_clone_snapshot_ctx *)cb_arg;
	struct spdk_blob_opts		opts;
	struct spdk_blob_xattr_opts internal_xattrs;
	char *xattr_names[] = { BLOB_SNAPSHOT };

	if (bserrno != 0) {
		_spdk_bs_clone_snapshot_cleanup_finish(ctx, bserrno);
		return;
	}

	ctx->original.blob = _blob;

	/* Clones must be taken from blobs that are read-only in both data
	 * and metadata (i.e. snapshots). */
	if (!_blob->data_ro || !_blob->md_ro) {
		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Clone not from read-only blob\n");
		ctx->bserrno = -EINVAL;
		spdk_blob_close(_blob, _spdk_bs_clone_snapshot_cleanup_finish, ctx);
		return;
	}

	if (_blob->locked_operation_in_progress) {
		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Cannot create clone - another operation in progress\n");
		ctx->bserrno = -EBUSY;
		spdk_blob_close(_blob, _spdk_bs_clone_snapshot_cleanup_finish, ctx);
		return;
	}

	/* Serialize against other locked operations; cleared on unwind. */
	_blob->locked_operation_in_progress = true;

	spdk_blob_opts_init(&opts);
	_spdk_blob_xattrs_init(&internal_xattrs);

	/* Same logical size as the snapshot, but no clusters allocated. */
	opts.thin_provision = true;
	opts.num_clusters = spdk_blob_get_num_clusters(_blob);
	if (ctx->xattrs) {
		memcpy(&opts.xattrs, ctx->xattrs, sizeof(*ctx->xattrs));
	}

	/* Set internal xattr BLOB_SNAPSHOT */
	internal_xattrs.count = 1;
	internal_xattrs.ctx = _blob;
	internal_xattrs.names = xattr_names;
	internal_xattrs.get_value = _spdk_bs_xattr_clone;

	_spdk_bs_create_blob(_blob->bs, &opts, &internal_xattrs,
			     _spdk_bs_clone_newblob_create_cpl, ctx);
}
4837 
4838 void spdk_bs_create_clone(struct spdk_blob_store *bs, spdk_blob_id blobid,
4839 			  const struct spdk_blob_xattr_opts *clone_xattrs,
4840 			  spdk_blob_op_with_id_complete cb_fn, void *cb_arg)
4841 {
4842 	struct spdk_clone_snapshot_ctx	*ctx = calloc(1, sizeof(*ctx));
4843 
4844 	if (!ctx) {
4845 		cb_fn(cb_arg, SPDK_BLOBID_INVALID, -ENOMEM);
4846 		return;
4847 	}
4848 
4849 	ctx->cpl.type = SPDK_BS_CPL_TYPE_BLOBID;
4850 	ctx->cpl.u.blobid.cb_fn = cb_fn;
4851 	ctx->cpl.u.blobid.cb_arg = cb_arg;
4852 	ctx->cpl.u.blobid.blobid = SPDK_BLOBID_INVALID;
4853 	ctx->bserrno = 0;
4854 	ctx->xattrs = clone_xattrs;
4855 	ctx->original.id = blobid;
4856 
4857 	spdk_bs_open_blob(bs, ctx->original.id, _spdk_bs_clone_origblob_open_cpl, ctx);
4858 }
4859 
4860 /* END spdk_bs_create_clone */
4861 
4862 /* START spdk_bs_inflate_blob */
4863 
/*
 * Decouple-parent path: the grandparent snapshot has been opened as the new
 * parent.  Re-point the inflated blob's parentage and backing device at it,
 * then sync metadata and unwind.
 *
 * NOTE(review): the return values of _spdk_blob_set_xattr() and
 * spdk_bs_create_blob_bs_dev() are not checked here — confirm whether
 * failures should abort the re-parenting.
 */
static void
_spdk_bs_inflate_blob_set_parent_cpl(void *cb_arg, struct spdk_blob *_parent, int bserrno)
{
	struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg;
	struct spdk_blob *_blob = ctx->original.blob;

	if (bserrno != 0) {
		_spdk_bs_clone_snapshot_origblob_cleanup(ctx, bserrno);
		return;
	}

	assert(_parent != NULL);

	/* Drop the old dependency edge and record the new parent id. */
	_spdk_bs_blob_list_remove(_blob);
	_blob->parent_id = _parent->id;
	_spdk_blob_set_xattr(_blob, BLOB_SNAPSHOT, &_blob->parent_id,
			     sizeof(spdk_blob_id), true);

	/* Swap the backing device over to the new parent. */
	_blob->back_bs_dev->destroy(_blob->back_bs_dev);
	_blob->back_bs_dev = spdk_bs_create_blob_bs_dev(_parent);
	_spdk_bs_blob_list_add(_blob);

	spdk_blob_sync_md(_blob, _spdk_bs_clone_snapshot_origblob_cleanup, ctx);
}
4888 
/*
 * All required clusters have been touched (allocated).  Finish the inflate:
 * either remove thin-provisioning entirely (allocate_all), or detach from
 * the immediate parent while keeping the blob thin (decouple).
 */
static void
_spdk_bs_inflate_blob_done(void *cb_arg, int bserrno)
{
	struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg;
	struct spdk_blob *_blob = ctx->original.blob;
	struct spdk_blob *_parent;

	if (bserrno != 0) {
		_spdk_bs_clone_snapshot_origblob_cleanup(ctx, bserrno);
		return;
	}

	if (ctx->allocate_all) {
		/* remove thin provisioning */
		_spdk_bs_blob_list_remove(_blob);
		_spdk_blob_remove_xattr(_blob, BLOB_SNAPSHOT, true);
		_blob->invalid_flags = _blob->invalid_flags & ~SPDK_BLOB_THIN_PROV;
		_blob->back_bs_dev->destroy(_blob->back_bs_dev);
		_blob->back_bs_dev = NULL;
		_blob->parent_id = SPDK_BLOBID_INVALID;
	} else {
		_parent = ((struct spdk_blob_bs_dev *)(_blob->back_bs_dev))->blob;
		if (_parent->parent_id != SPDK_BLOBID_INVALID) {
			/* We must change the parent of the inflated blob */
			spdk_bs_open_blob(_blob->bs, _parent->parent_id,
					  _spdk_bs_inflate_blob_set_parent_cpl, ctx);
			return;
		}

		/* The parent had no parent of its own; the decoupled blob now
		 * reads unallocated clusters as zeroes. */
		_spdk_bs_blob_list_remove(_blob);
		_spdk_blob_remove_xattr(_blob, BLOB_SNAPSHOT, true);
		_blob->parent_id = SPDK_BLOBID_INVALID;
		_blob->back_bs_dev->destroy(_blob->back_bs_dev);
		_blob->back_bs_dev = spdk_bs_create_zeroes_dev();
	}

	/* Force the new parentage/flags to be written out. */
	_blob->state = SPDK_BLOB_STATE_DIRTY;
	spdk_blob_sync_md(_blob, _spdk_bs_clone_snapshot_origblob_cleanup, ctx);
}
4928 
4929 /* Check if cluster needs allocation */
4930 static inline bool
4931 _spdk_bs_cluster_needs_allocation(struct spdk_blob *blob, uint64_t cluster, bool allocate_all)
4932 {
4933 	struct spdk_blob_bs_dev *b;
4934 
4935 	assert(blob != NULL);
4936 
4937 	if (blob->active.clusters[cluster] != 0) {
4938 		/* Cluster is already allocated */
4939 		return false;
4940 	}
4941 
4942 	if (blob->parent_id == SPDK_BLOBID_INVALID) {
4943 		/* Blob have no parent blob */
4944 		return allocate_all;
4945 	}
4946 
4947 	b = (struct spdk_blob_bs_dev *)blob->back_bs_dev;
4948 	return (allocate_all || b->blob->active.clusters[cluster] != 0);
4949 }
4950 
/*
 * Inflate work loop: skip clusters that need no allocation, then issue a
 * zero-length write to the next cluster that does (which forces its
 * allocation and copy-on-write).  Re-entered as its own completion until all
 * clusters are processed, then finishes via _spdk_bs_inflate_blob_done().
 */
static void
_spdk_bs_inflate_blob_touch_next(void *cb_arg, int bserrno)
{
	struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg;
	struct spdk_blob *_blob = ctx->original.blob;
	uint64_t offset;

	if (bserrno != 0) {
		_spdk_bs_clone_snapshot_origblob_cleanup(ctx, bserrno);
		return;
	}

	/* Advance past clusters that are fine as-is. */
	for (; ctx->cluster < _blob->active.num_clusters; ctx->cluster++) {
		if (_spdk_bs_cluster_needs_allocation(_blob, ctx->cluster, ctx->allocate_all)) {
			break;
		}
	}

	if (ctx->cluster < _blob->active.num_clusters) {
		offset = _spdk_bs_cluster_to_lba(_blob->bs, ctx->cluster);

		/* We may safely increment a cluster before write */
		ctx->cluster++;

		/* Use zero length write to touch a cluster */
		spdk_blob_io_write(_blob, ctx->channel, NULL, offset, 0,
				   _spdk_bs_inflate_blob_touch_next, ctx);
	} else {
		_spdk_bs_inflate_blob_done(cb_arg, bserrno);
	}
}
4982 
/*
 * The blob to inflate/decouple has been opened.  Validate the request, make
 * a dry-run pass over the free-cluster bitmap to confirm enough space
 * exists, then start touching clusters one at a time.
 */
static void
_spdk_bs_inflate_blob_open_cpl(void *cb_arg, struct spdk_blob *_blob, int bserrno)
{
	struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg;
	uint64_t lfc; /* lowest free cluster */
	uint64_t i;

	if (bserrno != 0) {
		_spdk_bs_clone_snapshot_cleanup_finish(ctx, bserrno);
		return;
	}

	ctx->original.blob = _blob;

	if (_blob->locked_operation_in_progress) {
		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Cannot inflate blob - another operation in progress\n");
		ctx->bserrno = -EBUSY;
		spdk_blob_close(_blob, _spdk_bs_clone_snapshot_cleanup_finish, ctx);
		return;
	}

	/* Serialize against other locked operations; cleared on unwind. */
	_blob->locked_operation_in_progress = true;

	if (!ctx->allocate_all && _blob->parent_id == SPDK_BLOBID_INVALID) {
		/* This blob have no parent, so we cannot decouple it. */
		SPDK_ERRLOG("Cannot decouple parent of blob with no parent.\n");
		_spdk_bs_clone_snapshot_origblob_cleanup(ctx, -EINVAL);
		return;
	}

	if (spdk_blob_is_thin_provisioned(_blob) == false) {
		/* This is not thin provisioned blob. No need to inflate. */
		_spdk_bs_clone_snapshot_origblob_cleanup(ctx, 0);
		return;
	}

	/* Do two passes - one to verify that we can obtain enough clusters
	 * and another to actually claim them.
	 *
	 * NOTE(review): lfc is uint64_t but is compared against UINT32_MAX;
	 * confirm spdk_bit_array_find_first_clear()'s return width so the
	 * "no free cluster" sentinel check cannot be missed.
	 */
	lfc = 0;
	for (i = 0; i < _blob->active.num_clusters; i++) {
		if (_spdk_bs_cluster_needs_allocation(_blob, i, ctx->allocate_all)) {
			lfc = spdk_bit_array_find_first_clear(_blob->bs->used_clusters, lfc);
			if (lfc == UINT32_MAX) {
				/* No more free clusters. Cannot satisfy the request */
				_spdk_bs_clone_snapshot_origblob_cleanup(ctx, -ENOSPC);
				return;
			}
			lfc++;
		}
	}

	ctx->cluster = 0;
	_spdk_bs_inflate_blob_touch_next(ctx, 0);
}
5038 
5039 static void
5040 _spdk_bs_inflate_blob(struct spdk_blob_store *bs, struct spdk_io_channel *channel,
5041 		      spdk_blob_id blobid, bool allocate_all, spdk_blob_op_complete cb_fn, void *cb_arg)
5042 {
5043 	struct spdk_clone_snapshot_ctx *ctx = calloc(1, sizeof(*ctx));
5044 
5045 	if (!ctx) {
5046 		cb_fn(cb_arg, -ENOMEM);
5047 		return;
5048 	}
5049 	ctx->cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
5050 	ctx->cpl.u.bs_basic.cb_fn = cb_fn;
5051 	ctx->cpl.u.bs_basic.cb_arg = cb_arg;
5052 	ctx->bserrno = 0;
5053 	ctx->original.id = blobid;
5054 	ctx->channel = channel;
5055 	ctx->allocate_all = allocate_all;
5056 
5057 	spdk_bs_open_blob(bs, ctx->original.id, _spdk_bs_inflate_blob_open_cpl, ctx);
5058 }
5059 
/* Inflate a blob: allocate every unallocated cluster (allocate_all == true),
 * making the blob fully provisioned. */
void
spdk_bs_inflate_blob(struct spdk_blob_store *bs, struct spdk_io_channel *channel,
		     spdk_blob_id blobid, spdk_blob_op_complete cb_fn, void *cb_arg)
{
	_spdk_bs_inflate_blob(bs, channel, blobid, true, cb_fn, cb_arg);
}
5066 
/* Decouple a clone from its immediate parent: allocate only the clusters
 * that are currently backed by the parent (allocate_all == false). */
void
spdk_bs_blob_decouple_parent(struct spdk_blob_store *bs, struct spdk_io_channel *channel,
			     spdk_blob_id blobid, spdk_blob_op_complete cb_fn, void *cb_arg)
{
	_spdk_bs_inflate_blob(bs, channel, blobid, false, cb_fn, cb_arg);
}
5073 /* END spdk_bs_inflate_blob */
5074 
5075 /* START spdk_blob_resize */
/* Context carried across the freeze -> resize -> unfreeze sequence
 * driven by spdk_blob_resize(). */
struct spdk_bs_resize_ctx {
	spdk_blob_op_complete cb_fn;	/* caller's completion */
	void *cb_arg;			/* caller's completion argument */
	struct spdk_blob *blob;		/* blob being resized */
	uint64_t sz;			/* requested size, in clusters */
	int rc;				/* result of _spdk_blob_resize() */
};
5083 
5084 static void
5085 _spdk_bs_resize_unfreeze_cpl(void *cb_arg, int rc)
5086 {
5087 	struct spdk_bs_resize_ctx *ctx = (struct spdk_bs_resize_ctx *)cb_arg;
5088 
5089 	if (rc != 0) {
5090 		SPDK_ERRLOG("Unfreeze failed, rc=%d\n", rc);
5091 	}
5092 
5093 	if (ctx->rc != 0) {
5094 		SPDK_ERRLOG("Unfreeze failed, ctx->rc=%d\n", ctx->rc);
5095 		rc = ctx->rc;
5096 	}
5097 
5098 	ctx->blob->locked_operation_in_progress = false;
5099 
5100 	ctx->cb_fn(ctx->cb_arg, rc);
5101 	free(ctx);
5102 }
5103 
5104 static void
5105 _spdk_bs_resize_freeze_cpl(void *cb_arg, int rc)
5106 {
5107 	struct spdk_bs_resize_ctx *ctx = (struct spdk_bs_resize_ctx *)cb_arg;
5108 
5109 	if (rc != 0) {
5110 		ctx->blob->locked_operation_in_progress = false;
5111 		ctx->cb_fn(ctx->cb_arg, rc);
5112 		free(ctx);
5113 		return;
5114 	}
5115 
5116 	ctx->rc = _spdk_blob_resize(ctx->blob, ctx->sz);
5117 
5118 	_spdk_blob_unfreeze_io(ctx->blob, _spdk_bs_resize_unfreeze_cpl, ctx);
5119 }
5120 
5121 void
5122 spdk_blob_resize(struct spdk_blob *blob, uint64_t sz, spdk_blob_op_complete cb_fn, void *cb_arg)
5123 {
5124 	struct spdk_bs_resize_ctx *ctx;
5125 
5126 	_spdk_blob_verify_md_op(blob);
5127 
5128 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Resizing blob %lu to %lu clusters\n", blob->id, sz);
5129 
5130 	if (blob->md_ro) {
5131 		cb_fn(cb_arg, -EPERM);
5132 		return;
5133 	}
5134 
5135 	if (sz == blob->active.num_clusters) {
5136 		cb_fn(cb_arg, 0);
5137 		return;
5138 	}
5139 
5140 	if (blob->locked_operation_in_progress) {
5141 		cb_fn(cb_arg, -EBUSY);
5142 		return;
5143 	}
5144 
5145 	ctx = calloc(1, sizeof(*ctx));
5146 	if (!ctx) {
5147 		cb_fn(cb_arg, -ENOMEM);
5148 		return;
5149 	}
5150 
5151 	blob->locked_operation_in_progress = true;
5152 	ctx->cb_fn = cb_fn;
5153 	ctx->cb_arg = cb_arg;
5154 	ctx->blob = blob;
5155 	ctx->sz = sz;
5156 	_spdk_blob_freeze_io(blob, _spdk_bs_resize_freeze_cpl, ctx);
5157 }
5158 
5159 /* END spdk_blob_resize */
5160 
5161 
5162 /* START spdk_bs_delete_blob */
5163 
/* Final step of blob deletion: finish the sequence that drove it,
 * propagating the close result. */
static void
_spdk_bs_delete_close_cpl(void *cb_arg, int bserrno)
{
	spdk_bs_sequence_t *seq = cb_arg;

	spdk_bs_sequence_finish(seq, bserrno);
}
5171 
5172 static void
5173 _spdk_bs_delete_persist_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
5174 {
5175 	struct spdk_blob *blob = cb_arg;
5176 
5177 	if (bserrno != 0) {
5178 		/*
5179 		 * We already removed this blob from the blobstore tailq, so
5180 		 *  we need to free it here since this is the last reference
5181 		 *  to it.
5182 		 */
5183 		_spdk_blob_free(blob);
5184 		_spdk_bs_delete_close_cpl(seq, bserrno);
5185 		return;
5186 	}
5187 
5188 	/*
5189 	 * This will immediately decrement the ref_count and call
5190 	 *  the completion routine since the metadata state is clean.
5191 	 *  By calling spdk_blob_close, we reduce the number of call
5192 	 *  points into code that touches the blob->open_ref count
5193 	 *  and the blobstore's blob list.
5194 	 */
5195 	spdk_blob_close(blob, _spdk_bs_delete_close_cpl, seq);
5196 }
5197 
/* State for deleting a snapshot that has a single clone: the clone must
 * be re-parented before the snapshot itself can be removed. */
struct delete_snapshot_ctx {
	struct spdk_blob_list *parent_snapshot_entry;	/* snapshot's own parent, if any */
	struct spdk_blob *snapshot;			/* snapshot being deleted */
	bool snapshot_md_ro;				/* saved md_ro flag to restore */
	struct spdk_blob *clone;			/* the snapshot's only clone */
	bool clone_md_ro;				/* saved md_ro flag to restore */
	spdk_blob_op_with_handle_complete cb_fn;	/* final completion */
	void *cb_arg;
	int bserrno;					/* first error encountered */
};
5208 
5209 static void
5210 _spdk_delete_blob_cleanup_finish(void *cb_arg, int bserrno)
5211 {
5212 	struct delete_snapshot_ctx *ctx = cb_arg;
5213 
5214 	if (bserrno != 0) {
5215 		SPDK_ERRLOG("Snapshot cleanup error %d\n", bserrno);
5216 	}
5217 
5218 	assert(ctx != NULL);
5219 
5220 	if (bserrno != 0 && ctx->bserrno == 0) {
5221 		ctx->bserrno = bserrno;
5222 	}
5223 
5224 	ctx->cb_fn(ctx->cb_arg, ctx->snapshot, ctx->bserrno);
5225 	free(ctx);
5226 }
5227 
/* Cleanup path: restore the snapshot's in-memory state after a failed (or
 * abandoned) deletion attempt and close it. */
static void
_spdk_delete_snapshot_cleanup_snapshot(void *cb_arg, int bserrno)
{
	struct delete_snapshot_ctx *ctx = cb_arg;

	if (bserrno != 0) {
		ctx->bserrno = bserrno;
		SPDK_ERRLOG("Clone cleanup error %d\n", bserrno);
	}

	/* open_ref == 1 means that only deletion context has opened this snapshot
	 * open_ref == 2 means that clone has opened this snapshot as well,
	 * so we have to add it back to the blobs list */
	if (ctx->snapshot->open_ref == 2) {
		TAILQ_INSERT_HEAD(&ctx->snapshot->bs->blobs, ctx->snapshot, link);
	}

	/* Release the operation lock and restore the original md_ro flag. */
	ctx->snapshot->locked_operation_in_progress = false;
	ctx->snapshot->md_ro = ctx->snapshot_md_ro;

	spdk_blob_close(ctx->snapshot, _spdk_delete_blob_cleanup_finish, ctx);
}
5250 
/* Cleanup path: unlock the clone, restore its md_ro flag, close it and
 * continue with snapshot cleanup. bserrno is ignored here; the first
 * error is already recorded in ctx->bserrno by the caller. */
static void
_spdk_delete_snapshot_cleanup_clone(void *cb_arg, int bserrno)
{
	struct delete_snapshot_ctx *ctx = cb_arg;

	ctx->clone->locked_operation_in_progress = false;
	ctx->clone->md_ro = ctx->clone_md_ro;

	spdk_blob_close(ctx->clone, _spdk_delete_snapshot_cleanup_snapshot, ctx);
}
5261 
5262 static void
5263 _spdk_delete_snapshot_unfreeze_cpl(void *cb_arg, int bserrno)
5264 {
5265 	struct delete_snapshot_ctx *ctx = cb_arg;
5266 
5267 	if (bserrno) {
5268 		ctx->bserrno = bserrno;
5269 		_spdk_delete_snapshot_cleanup_clone(ctx, 0);
5270 		return;
5271 	}
5272 
5273 	ctx->clone->locked_operation_in_progress = false;
5274 	spdk_blob_close(ctx->clone, _spdk_delete_blob_cleanup_finish, ctx);
5275 }
5276 
/* Snapshot metadata is synced - finish the deletion by fixing up the
 * in-memory snapshot/clone bookkeeping, then unfreeze the clone's I/O. */
static void
_spdk_delete_snapshot_sync_snapshot_cpl(void *cb_arg, int bserrno)
{
	struct delete_snapshot_ctx *ctx = cb_arg;
	struct spdk_blob_list *parent_snapshot_entry = NULL;
	struct spdk_blob_list *snapshot_entry = NULL;
	struct spdk_blob_list *clone_entry = NULL;
	struct spdk_blob_list *snapshot_clone_entry = NULL;

	if (bserrno) {
		SPDK_ERRLOG("Failed to sync MD on blob\n");
		ctx->bserrno = bserrno;
		_spdk_delete_snapshot_cleanup_clone(ctx, 0);
		return;
	}

	/* Get snapshot entry for the snapshot we want to remove */
	snapshot_entry = _spdk_bs_get_snapshot_entry(ctx->snapshot->bs, ctx->snapshot->id);

	assert(snapshot_entry != NULL);

	/* Remove clone entry in this snapshot (at this point there can be only one clone) */
	clone_entry = TAILQ_FIRST(&snapshot_entry->clones);
	assert(clone_entry != NULL);
	TAILQ_REMOVE(&snapshot_entry->clones, clone_entry, link);
	snapshot_entry->clone_count--;
	assert(TAILQ_EMPTY(&snapshot_entry->clones));

	if (ctx->snapshot->parent_id != SPDK_BLOBID_INVALID) {
		/* This snapshot is at the same time a clone of another snapshot - we need to
		 * update parent snapshot (remove current clone, add new one inherited from
		 * the snapshot that is being removed) */

		/* Get snapshot entry for parent snapshot and clone entry within that snapshot for
		 * snapshot that we are removing */
		_spdk_blob_get_snapshot_and_clone_entries(ctx->snapshot, &parent_snapshot_entry,
				&snapshot_clone_entry);

		/* Switch clone entry in parent snapshot */
		TAILQ_INSERT_TAIL(&parent_snapshot_entry->clones, clone_entry, link);
		TAILQ_REMOVE(&parent_snapshot_entry->clones, snapshot_clone_entry, link);
		free(snapshot_clone_entry);
	} else {
		/* No parent snapshot - just remove clone entry */
		free(clone_entry);
	}

	/* Restore md_ro flags */
	ctx->clone->md_ro = ctx->clone_md_ro;
	ctx->snapshot->md_ro = ctx->snapshot_md_ro;

	_spdk_blob_unfreeze_io(ctx->clone, _spdk_delete_snapshot_unfreeze_cpl, ctx);
}
5330 
/* Clone metadata now points past the snapshot. Drop the snapshot's claim
 * on the clusters the clone inherited, then sync the snapshot's metadata.
 * On clone-sync failure, roll the snapshot back by removing its
 * pending-removal xattr. */
static void
_spdk_delete_snapshot_sync_clone_cpl(void *cb_arg, int bserrno)
{
	struct delete_snapshot_ctx *ctx = cb_arg;
	uint64_t i;

	ctx->snapshot->md_ro = false;

	if (bserrno) {
		SPDK_ERRLOG("Failed to sync MD on clone\n");
		ctx->bserrno = bserrno;

		/* Restore snapshot to previous state */
		bserrno = _spdk_blob_remove_xattr(ctx->snapshot, SNAPSHOT_PENDING_REMOVAL, true);
		if (bserrno != 0) {
			_spdk_delete_snapshot_cleanup_clone(ctx, bserrno);
			return;
		}

		spdk_blob_sync_md(ctx->snapshot, _spdk_delete_snapshot_cleanup_clone, ctx);
		return;
	}

	/* Clear cluster map entries for snapshot */
	for (i = 0; i < ctx->snapshot->active.num_clusters && i < ctx->clone->active.num_clusters; i++) {
		if (ctx->clone->active.clusters[i] == ctx->snapshot->active.clusters[i]) {
			/* The cluster is now referenced by the clone, so the
			 * snapshot must not release it when deleted. */
			ctx->snapshot->active.clusters[i] = 0;
		}
	}

	ctx->snapshot->state = SPDK_BLOB_STATE_DIRTY;

	if (ctx->parent_snapshot_entry != NULL) {
		/* The clone took over back_bs_dev - detach it from the snapshot
		 * so it is not destroyed twice. */
		ctx->snapshot->back_bs_dev = NULL;
	}

	spdk_blob_sync_md(ctx->snapshot, _spdk_delete_snapshot_sync_snapshot_cpl, ctx);
}
5369 
/* The snapshot now carries SNAPSHOT_PENDING_REMOVAL on disk. Re-parent
 * the clone: inherit the snapshot's cluster map entries and backing
 * device, then sync the clone's metadata. */
static void
_spdk_delete_snapshot_sync_snapshot_xattr_cpl(void *cb_arg, int bserrno)
{
	struct delete_snapshot_ctx *ctx = cb_arg;
	uint64_t i;

	/* Temporarily override md_ro flag for clone for MD modification */
	ctx->clone_md_ro = ctx->clone->md_ro;
	ctx->clone->md_ro = false;

	if (bserrno) {
		SPDK_ERRLOG("Failed to sync MD with xattr on blob\n");
		ctx->bserrno = bserrno;
		_spdk_delete_snapshot_cleanup_clone(ctx, 0);
		return;
	}

	/* Copy snapshot map to clone map (only unallocated clusters in clone) */
	for (i = 0; i < ctx->snapshot->active.num_clusters && i < ctx->clone->active.num_clusters; i++) {
		if (ctx->clone->active.clusters[i] == 0) {
			ctx->clone->active.clusters[i] = ctx->snapshot->active.clusters[i];
		}
	}

	/* Delete old backing bs_dev from clone (related to snapshot that will be removed) */
	ctx->clone->back_bs_dev->destroy(ctx->clone->back_bs_dev);

	/* Set/remove snapshot xattr and switch parent ID and backing bs_dev on clone... */
	if (ctx->parent_snapshot_entry != NULL) {
		/* ...to parent snapshot */
		ctx->clone->parent_id = ctx->parent_snapshot_entry->id;
		ctx->clone->back_bs_dev = ctx->snapshot->back_bs_dev;
		_spdk_blob_set_xattr(ctx->clone, BLOB_SNAPSHOT, &ctx->parent_snapshot_entry->id,
				     sizeof(spdk_blob_id),
				     true);
	} else {
		/* ...to blobid invalid and zeroes dev */
		ctx->clone->parent_id = SPDK_BLOBID_INVALID;
		ctx->clone->back_bs_dev = spdk_bs_create_zeroes_dev();
		_spdk_blob_remove_xattr(ctx->clone, BLOB_SNAPSHOT, true);
	}

	spdk_blob_sync_md(ctx->clone, _spdk_delete_snapshot_sync_clone_cpl, ctx);
}
5414 
/* I/O on the clone is now frozen: mark the snapshot as pending removal
 * (recording the clone id for power-failure recovery) and sync its MD. */
static void
_spdk_delete_snapshot_freeze_io_cb(void *cb_arg, int bserrno)
{
	struct delete_snapshot_ctx *ctx = cb_arg;

	if (bserrno) {
		SPDK_ERRLOG("Failed to freeze I/O on clone\n");
		ctx->bserrno = bserrno;
		_spdk_delete_snapshot_cleanup_clone(ctx, 0);
		return;
	}

	/* Temporarily override md_ro flag for snapshot for MD modification */
	ctx->snapshot_md_ro = ctx->snapshot->md_ro;
	ctx->snapshot->md_ro = false;

	/* Mark blob as pending for removal for power failure safety, use clone id for recovery */
	ctx->bserrno = _spdk_blob_set_xattr(ctx->snapshot, SNAPSHOT_PENDING_REMOVAL, &ctx->clone->id,
					    sizeof(spdk_blob_id), true);
	if (ctx->bserrno != 0) {
		_spdk_delete_snapshot_cleanup_clone(ctx, 0);
		return;
	}

	spdk_blob_sync_md(ctx->snapshot, _spdk_delete_snapshot_sync_snapshot_xattr_cpl, ctx);
}
5441 
/* Open completion for the snapshot's clone: lock the clone and freeze its
 * I/O before re-parenting it as part of snapshot deletion. */
static void
_spdk_delete_snapshot_open_clone_cb(void *cb_arg, struct spdk_blob *clone, int bserrno)
{
	struct delete_snapshot_ctx *ctx = cb_arg;

	if (bserrno) {
		SPDK_ERRLOG("Failed to open clone\n");
		ctx->bserrno = bserrno;
		_spdk_delete_snapshot_cleanup_snapshot(ctx, 0);
		return;
	}

	ctx->clone = clone;

	/* Refuse to race with another locked operation on the clone. */
	if (clone->locked_operation_in_progress) {
		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Cannot remove blob - another operation in progress on its clone\n");
		ctx->bserrno = -EBUSY;
		spdk_blob_close(ctx->clone, _spdk_delete_snapshot_cleanup_snapshot, ctx);
		return;
	}

	clone->locked_operation_in_progress = true;

	_spdk_blob_freeze_io(clone, _spdk_delete_snapshot_freeze_io_cb, ctx);
}
5467 
/* Begin the clone update required before deleting a snapshot: look up the
 * snapshot's single clone (and the snapshot's own parent entries), then
 * open the clone asynchronously. */
static void
_spdk_update_clone_on_snapshot_deletion(struct spdk_blob *snapshot, struct delete_snapshot_ctx *ctx)
{
	struct spdk_blob_list *snapshot_entry = NULL;
	struct spdk_blob_list *clone_entry = NULL;
	struct spdk_blob_list *snapshot_clone_entry = NULL;

	/* Get snapshot entry for the snapshot we want to remove */
	snapshot_entry = _spdk_bs_get_snapshot_entry(snapshot->bs, snapshot->id);

	assert(snapshot_entry != NULL);

	/* Get clone of the snapshot (at this point there can be only one clone) */
	clone_entry = TAILQ_FIRST(&snapshot_entry->clones);
	assert(snapshot_entry->clone_count == 1);
	assert(clone_entry != NULL);

	/* Get snapshot entry for parent snapshot and clone entry within that snapshot for
	 * snapshot that we are removing */
	_spdk_blob_get_snapshot_and_clone_entries(snapshot, &ctx->parent_snapshot_entry,
			&snapshot_clone_entry);

	spdk_bs_open_blob(snapshot->bs, clone_entry->id, _spdk_delete_snapshot_open_clone_cb, ctx);
}
5492 
/* Final removal of a blob whose clone (if any) was already updated:
 * release its id, truncate to zero clusters and persist the empty MD. */
static void
_spdk_bs_delete_blob_finish(void *cb_arg, struct spdk_blob *blob, int bserrno)
{
	spdk_bs_sequence_t *seq = cb_arg;
	struct spdk_blob_list *snapshot_entry = NULL;
	uint32_t page_num;

	if (bserrno) {
		SPDK_ERRLOG("Failed to remove blob\n");
		spdk_bs_sequence_finish(seq, bserrno);
		return;
	}

	/* Remove snapshot from the list */
	snapshot_entry = _spdk_bs_get_snapshot_entry(blob->bs, blob->id);
	if (snapshot_entry != NULL) {
		TAILQ_REMOVE(&blob->bs->snapshots, snapshot_entry, link);
		free(snapshot_entry);
	}

	/* Free the blob id and mark the blob dirty so the now-empty
	 * metadata is persisted as deleted. */
	page_num = _spdk_bs_blobid_to_page(blob->id);
	spdk_bit_array_clear(blob->bs->used_blobids, page_num);
	blob->state = SPDK_BLOB_STATE_DIRTY;
	blob->active.num_pages = 0;
	_spdk_blob_resize(blob, 0);

	_spdk_blob_persist(seq, blob, _spdk_bs_delete_persist_cpl, blob);
}
5521 
/* Decide whether 'blob' may be deleted.
 *
 * Returns 0 when deletion may proceed, -EBUSY otherwise. On success,
 * *update_clone is set to true when the blob is a snapshot with exactly
 * one clone that must be re-parented before removal.
 */
static int
_spdk_bs_is_blob_deletable(struct spdk_blob *blob, bool *update_clone)
{
	struct spdk_blob_list *snapshot_entry = NULL;
	struct spdk_blob_list *clone_entry = NULL;
	struct spdk_blob *clone = NULL;
	bool has_one_clone = false;

	/* Check if this is a snapshot with clones */
	snapshot_entry = _spdk_bs_get_snapshot_entry(blob->bs, blob->id);
	if (snapshot_entry != NULL) {
		if (snapshot_entry->clone_count > 1) {
			SPDK_ERRLOG("Cannot remove snapshot with more than one clone\n");
			return -EBUSY;
		} else if (snapshot_entry->clone_count == 1) {
			has_one_clone = true;
		}
	}

	/* Check if someone has this blob open (besides this delete context):
	 * - open_ref = 1 - only this context opened blob, so it is ok to remove it
	 * - open_ref <= 2 && has_one_clone = true - clone is holding snapshot
	 *	and that is ok, because we will update it accordingly */
	if (blob->open_ref <= 2 && has_one_clone) {
		clone_entry = TAILQ_FIRST(&snapshot_entry->clones);
		assert(clone_entry != NULL);
		clone = _spdk_blob_lookup(blob->bs, clone_entry->id);

		if (blob->open_ref == 2 && clone == NULL) {
			/* Clone is closed and someone else opened this blob */
			SPDK_ERRLOG("Cannot remove snapshot because it is open\n");
			return -EBUSY;
		}

		*update_clone = true;
		return 0;
	}

	if (blob->open_ref > 1) {
		SPDK_ERRLOG("Cannot remove snapshot because it is open\n");
		return -EBUSY;
	}

	/* Not reachable with a clone: the <= 2 branch above handled it. */
	assert(has_one_clone == false);
	*update_clone = false;
	return 0;
}
5569 
/* Close completion used when allocating the deletion context failed;
 * always reports -ENOMEM regardless of the close result. */
static void
_spdk_bs_delete_enomem_close_cpl(void *cb_arg, int bserrno)
{
	spdk_bs_sequence_t *seq = cb_arg;

	spdk_bs_sequence_finish(seq, -ENOMEM);
}
5577 
/* Open completion for spdk_bs_delete_blob(): validate that the blob can
 * be deleted, lock it, remove it from the in-memory list, and either
 * re-parent its clone first or delete it straight away. */
static void
_spdk_bs_delete_open_cpl(void *cb_arg, struct spdk_blob *blob, int bserrno)
{
	spdk_bs_sequence_t *seq = cb_arg;
	struct delete_snapshot_ctx *ctx;
	bool update_clone = false;

	if (bserrno != 0) {
		spdk_bs_sequence_finish(seq, bserrno);
		return;
	}

	_spdk_blob_verify_md_op(blob);

	ctx = calloc(1, sizeof(*ctx));
	if (ctx == NULL) {
		spdk_blob_close(blob, _spdk_bs_delete_enomem_close_cpl, seq);
		return;
	}

	ctx->snapshot = blob;
	ctx->cb_fn = _spdk_bs_delete_blob_finish;
	ctx->cb_arg = seq;

	/* Check if blob can be removed and if it is a snapshot with clone on top of it */
	ctx->bserrno = _spdk_bs_is_blob_deletable(blob, &update_clone);
	if (ctx->bserrno) {
		spdk_blob_close(blob, _spdk_delete_blob_cleanup_finish, ctx);
		return;
	}

	/* Only one locked operation may run on a blob at a time. */
	if (blob->locked_operation_in_progress) {
		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Cannot remove blob - another operation in progress\n");
		ctx->bserrno = -EBUSY;
		spdk_blob_close(blob, _spdk_delete_blob_cleanup_finish, ctx);
		return;
	}

	blob->locked_operation_in_progress = true;

	/*
	 * Remove the blob from the blob_store list now, to ensure it does not
	 *  get returned after this point by _spdk_blob_lookup().
	 */
	TAILQ_REMOVE(&blob->bs->blobs, blob, link);

	if (update_clone) {
		/* This blob is a snapshot with active clone - update clone first */
		_spdk_update_clone_on_snapshot_deletion(blob, ctx);
	} else {
		/* This blob does not have any clones - just remove it */
		_spdk_bs_blob_list_remove(blob);
		_spdk_bs_delete_blob_finish(seq, blob, 0);
		free(ctx);
	}
}
5634 
5635 void
5636 spdk_bs_delete_blob(struct spdk_blob_store *bs, spdk_blob_id blobid,
5637 		    spdk_blob_op_complete cb_fn, void *cb_arg)
5638 {
5639 	struct spdk_bs_cpl	cpl;
5640 	spdk_bs_sequence_t	*seq;
5641 
5642 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Deleting blob %lu\n", blobid);
5643 
5644 	assert(spdk_get_thread() == bs->md_thread);
5645 
5646 	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
5647 	cpl.u.blob_basic.cb_fn = cb_fn;
5648 	cpl.u.blob_basic.cb_arg = cb_arg;
5649 
5650 	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
5651 	if (!seq) {
5652 		cb_fn(cb_arg, -ENOMEM);
5653 		return;
5654 	}
5655 
5656 	spdk_bs_open_blob(bs, blobid, _spdk_bs_delete_open_cpl, seq);
5657 }
5658 
5659 /* END spdk_bs_delete_blob */
5660 
5661 /* START spdk_bs_open_blob */
5662 
5663 static void
5664 _spdk_bs_open_blob_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
5665 {
5666 	struct spdk_blob *blob = cb_arg;
5667 
5668 	if (bserrno != 0) {
5669 		_spdk_blob_free(blob);
5670 		seq->cpl.u.blob_handle.blob = NULL;
5671 		spdk_bs_sequence_finish(seq, bserrno);
5672 		return;
5673 	}
5674 
5675 	blob->open_ref++;
5676 
5677 	TAILQ_INSERT_HEAD(&blob->bs->blobs, blob, link);
5678 
5679 	spdk_bs_sequence_finish(seq, bserrno);
5680 }
5681 
5682 static void _spdk_bs_open_blob(struct spdk_blob_store *bs, spdk_blob_id blobid,
5683 			       struct spdk_blob_open_opts *opts, spdk_blob_op_with_handle_complete cb_fn, void *cb_arg)
5684 {
5685 	struct spdk_blob		*blob;
5686 	struct spdk_bs_cpl		cpl;
5687 	struct spdk_blob_open_opts	opts_default;
5688 	spdk_bs_sequence_t		*seq;
5689 	uint32_t			page_num;
5690 
5691 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Opening blob %lu\n", blobid);
5692 	assert(spdk_get_thread() == bs->md_thread);
5693 
5694 	page_num = _spdk_bs_blobid_to_page(blobid);
5695 	if (spdk_bit_array_get(bs->used_blobids, page_num) == false) {
5696 		/* Invalid blobid */
5697 		cb_fn(cb_arg, NULL, -ENOENT);
5698 		return;
5699 	}
5700 
5701 	blob = _spdk_blob_lookup(bs, blobid);
5702 	if (blob) {
5703 		blob->open_ref++;
5704 		cb_fn(cb_arg, blob, 0);
5705 		return;
5706 	}
5707 
5708 	blob = _spdk_blob_alloc(bs, blobid);
5709 	if (!blob) {
5710 		cb_fn(cb_arg, NULL, -ENOMEM);
5711 		return;
5712 	}
5713 
5714 	if (!opts) {
5715 		spdk_blob_open_opts_init(&opts_default);
5716 		opts = &opts_default;
5717 	}
5718 
5719 	blob->clear_method = opts->clear_method;
5720 
5721 	cpl.type = SPDK_BS_CPL_TYPE_BLOB_HANDLE;
5722 	cpl.u.blob_handle.cb_fn = cb_fn;
5723 	cpl.u.blob_handle.cb_arg = cb_arg;
5724 	cpl.u.blob_handle.blob = blob;
5725 
5726 	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
5727 	if (!seq) {
5728 		_spdk_blob_free(blob);
5729 		cb_fn(cb_arg, NULL, -ENOMEM);
5730 		return;
5731 	}
5732 
5733 	_spdk_blob_load(seq, blob, _spdk_bs_open_blob_cpl, blob);
5734 }
5735 
/* Open a blob with default options. */
void spdk_bs_open_blob(struct spdk_blob_store *bs, spdk_blob_id blobid,
		       spdk_blob_op_with_handle_complete cb_fn, void *cb_arg)
{
	_spdk_bs_open_blob(bs, blobid, NULL, cb_fn, cb_arg);
}
5741 
/* Open a blob with caller-supplied options (NULL opts selects defaults). */
void spdk_bs_open_blob_ext(struct spdk_blob_store *bs, spdk_blob_id blobid,
			   struct spdk_blob_open_opts *opts, spdk_blob_op_with_handle_complete cb_fn, void *cb_arg)
{
	_spdk_bs_open_blob(bs, blobid, opts, cb_fn, cb_arg);
}
5747 
5748 /* END spdk_bs_open_blob */
5749 
5750 /* START spdk_blob_set_read_only */
/* Mark a blob read-only. The flag takes effect (data_ro/md_ro set) once
 * the metadata is next synced - see _spdk_blob_sync_md_cpl(). */
int spdk_blob_set_read_only(struct spdk_blob *blob)
{
	_spdk_blob_verify_md_op(blob);

	blob->data_ro_flags |= SPDK_BLOB_READ_ONLY;

	/* Flag change must be persisted on the next sync. */
	blob->state = SPDK_BLOB_STATE_DIRTY;
	return 0;
}
5760 /* END spdk_blob_set_read_only */
5761 
5762 /* START spdk_blob_sync_md */
5763 
5764 static void
5765 _spdk_blob_sync_md_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
5766 {
5767 	struct spdk_blob *blob = cb_arg;
5768 
5769 	if (bserrno == 0 && (blob->data_ro_flags & SPDK_BLOB_READ_ONLY)) {
5770 		blob->data_ro = true;
5771 		blob->md_ro = true;
5772 	}
5773 
5774 	spdk_bs_sequence_finish(seq, bserrno);
5775 }
5776 
5777 static void
5778 _spdk_blob_sync_md(struct spdk_blob *blob, spdk_blob_op_complete cb_fn, void *cb_arg)
5779 {
5780 	struct spdk_bs_cpl	cpl;
5781 	spdk_bs_sequence_t	*seq;
5782 
5783 	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
5784 	cpl.u.blob_basic.cb_fn = cb_fn;
5785 	cpl.u.blob_basic.cb_arg = cb_arg;
5786 
5787 	seq = spdk_bs_sequence_start(blob->bs->md_channel, &cpl);
5788 	if (!seq) {
5789 		cb_fn(cb_arg, -ENOMEM);
5790 		return;
5791 	}
5792 
5793 	_spdk_blob_persist(seq, blob, _spdk_blob_sync_md_cpl, blob);
5794 }
5795 
5796 void
5797 spdk_blob_sync_md(struct spdk_blob *blob, spdk_blob_op_complete cb_fn, void *cb_arg)
5798 {
5799 	_spdk_blob_verify_md_op(blob);
5800 
5801 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Syncing blob %lu\n", blob->id);
5802 
5803 	if (blob->md_ro) {
5804 		assert(blob->state == SPDK_BLOB_STATE_CLEAN);
5805 		cb_fn(cb_arg, 0);
5806 		return;
5807 	}
5808 
5809 	_spdk_blob_sync_md(blob, cb_fn, cb_arg);
5810 }
5811 
5812 /* END spdk_blob_sync_md */
5813 
/* Context for forwarding a cluster-map insertion to the metadata thread
 * and returning the result to the originating thread. */
struct spdk_blob_insert_cluster_ctx {
	struct spdk_thread	*thread;	/* thread that issued the request */
	struct spdk_blob	*blob;
	uint32_t		cluster_num;	/* cluster index in blob */
	uint32_t		cluster;	/* cluster on disk */
	int			rc;		/* operation result */
	spdk_blob_op_complete	cb_fn;
	void			*cb_arg;
};
5823 
/* Runs on the originating thread: deliver the final result and free ctx. */
static void
_spdk_blob_insert_cluster_msg_cpl(void *arg)
{
	struct spdk_blob_insert_cluster_ctx *ctx = arg;

	ctx->cb_fn(ctx->cb_arg, ctx->rc);
	free(ctx);
}
5832 
/* MD-sync completion (metadata thread): record the result and bounce the
 * completion back to the thread that requested the insertion. */
static void
_spdk_blob_insert_cluster_msg_cb(void *arg, int bserrno)
{
	struct spdk_blob_insert_cluster_ctx *ctx = arg;

	ctx->rc = bserrno;
	spdk_thread_send_msg(ctx->thread, _spdk_blob_insert_cluster_msg_cpl, ctx);
}
5841 
5842 static void
5843 _spdk_blob_insert_cluster_msg(void *arg)
5844 {
5845 	struct spdk_blob_insert_cluster_ctx *ctx = arg;
5846 
5847 	ctx->rc = _spdk_blob_insert_cluster(ctx->blob, ctx->cluster_num, ctx->cluster);
5848 	if (ctx->rc != 0) {
5849 		spdk_thread_send_msg(ctx->thread, _spdk_blob_insert_cluster_msg_cpl, ctx);
5850 		return;
5851 	}
5852 
5853 	ctx->blob->state = SPDK_BLOB_STATE_DIRTY;
5854 	_spdk_blob_sync_md(ctx->blob, _spdk_blob_insert_cluster_msg_cb, ctx);
5855 }
5856 
5857 static void
5858 _spdk_blob_insert_cluster_on_md_thread(struct spdk_blob *blob, uint32_t cluster_num,
5859 				       uint64_t cluster, spdk_blob_op_complete cb_fn, void *cb_arg)
5860 {
5861 	struct spdk_blob_insert_cluster_ctx *ctx;
5862 
5863 	ctx = calloc(1, sizeof(*ctx));
5864 	if (ctx == NULL) {
5865 		cb_fn(cb_arg, -ENOMEM);
5866 		return;
5867 	}
5868 
5869 	ctx->thread = spdk_get_thread();
5870 	ctx->blob = blob;
5871 	ctx->cluster_num = cluster_num;
5872 	ctx->cluster = cluster;
5873 	ctx->cb_fn = cb_fn;
5874 	ctx->cb_arg = cb_arg;
5875 
5876 	spdk_thread_send_msg(blob->bs->md_thread, _spdk_blob_insert_cluster_msg, ctx);
5877 }
5878 
5879 /* START spdk_blob_close */
5880 
/* Persist completion for spdk_blob_close(): on success drop the reference
 * and free the in-memory blob when it was the last one. */
static void
_spdk_blob_close_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob *blob = cb_arg;

	if (bserrno == 0) {
		blob->open_ref--;
		if (blob->open_ref == 0) {
			/*
			 * Blobs with active.num_pages == 0 are deleted blobs.
			 *  these blobs are removed from the blob_store list
			 *  when the deletion process starts - so don't try to
			 *  remove them again.
			 */
			if (blob->active.num_pages > 0) {
				TAILQ_REMOVE(&blob->bs->blobs, blob, link);
			}
			_spdk_blob_free(blob);
		}
	}

	spdk_bs_sequence_finish(seq, bserrno);
}
5904 
5905 void spdk_blob_close(struct spdk_blob *blob, spdk_blob_op_complete cb_fn, void *cb_arg)
5906 {
5907 	struct spdk_bs_cpl	cpl;
5908 	spdk_bs_sequence_t	*seq;
5909 
5910 	_spdk_blob_verify_md_op(blob);
5911 
5912 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Closing blob %lu\n", blob->id);
5913 
5914 	if (blob->open_ref == 0) {
5915 		cb_fn(cb_arg, -EBADF);
5916 		return;
5917 	}
5918 
5919 	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
5920 	cpl.u.blob_basic.cb_fn = cb_fn;
5921 	cpl.u.blob_basic.cb_arg = cb_arg;
5922 
5923 	seq = spdk_bs_sequence_start(blob->bs->md_channel, &cpl);
5924 	if (!seq) {
5925 		cb_fn(cb_arg, -ENOMEM);
5926 		return;
5927 	}
5928 
5929 	/* Sync metadata */
5930 	_spdk_blob_persist(seq, blob, _spdk_blob_close_cpl, blob);
5931 }
5932 
5933 /* END spdk_blob_close */
5934 
/* Get a per-thread I/O channel for this blobstore. */
struct spdk_io_channel *spdk_bs_alloc_io_channel(struct spdk_blob_store *bs)
{
	return spdk_get_io_channel(bs);
}
5939 
/* Release an I/O channel obtained from spdk_bs_alloc_io_channel(). */
void spdk_bs_free_io_channel(struct spdk_io_channel *channel)
{
	spdk_put_io_channel(channel);
}
5944 
/* Submit an unmap for the given range of the blob (no payload buffer). */
void spdk_blob_io_unmap(struct spdk_blob *blob, struct spdk_io_channel *channel,
			uint64_t offset, uint64_t length, spdk_blob_op_complete cb_fn, void *cb_arg)
{
	_spdk_blob_request_submit_op(blob, channel, NULL, offset, length, cb_fn, cb_arg,
				     SPDK_BLOB_UNMAP);
}
5951 
/* Submit a write-zeroes for the given range of the blob. */
void spdk_blob_io_write_zeroes(struct spdk_blob *blob, struct spdk_io_channel *channel,
			       uint64_t offset, uint64_t length, spdk_blob_op_complete cb_fn, void *cb_arg)
{
	_spdk_blob_request_submit_op(blob, channel, NULL, offset, length, cb_fn, cb_arg,
				     SPDK_BLOB_WRITE_ZEROES);
}
5958 
/* Submit a write of 'payload' to the given range of the blob. */
void spdk_blob_io_write(struct spdk_blob *blob, struct spdk_io_channel *channel,
			void *payload, uint64_t offset, uint64_t length,
			spdk_blob_op_complete cb_fn, void *cb_arg)
{
	_spdk_blob_request_submit_op(blob, channel, payload, offset, length, cb_fn, cb_arg,
				     SPDK_BLOB_WRITE);
}
5966 
/* Submit a read of the given range of the blob into 'payload'. */
void spdk_blob_io_read(struct spdk_blob *blob, struct spdk_io_channel *channel,
		       void *payload, uint64_t offset, uint64_t length,
		       spdk_blob_op_complete cb_fn, void *cb_arg)
{
	_spdk_blob_request_submit_op(blob, channel, payload, offset, length, cb_fn, cb_arg,
				     SPDK_BLOB_READ);
}
5974 
/* Vectored write to the blob (read_op == false selects the write path). */
void spdk_blob_io_writev(struct spdk_blob *blob, struct spdk_io_channel *channel,
			 struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
			 spdk_blob_op_complete cb_fn, void *cb_arg)
{
	_spdk_blob_request_submit_rw_iov(blob, channel, iov, iovcnt, offset, length, cb_fn, cb_arg, false);
}
5981 
/* Vectored read from the blob (read_op == true selects the read path). */
void spdk_blob_io_readv(struct spdk_blob *blob, struct spdk_io_channel *channel,
			struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
			spdk_blob_op_complete cb_fn, void *cb_arg)
{
	_spdk_blob_request_submit_rw_iov(blob, channel, iov, iovcnt, offset, length, cb_fn, cb_arg, true);
}
5988 
/* Context for iterating over all blobs in a blobstore (spdk_bs_iter_*). */
struct spdk_bs_iter_ctx {
	int64_t page_num;	/* current metadata page; -1 before the first */
	struct spdk_blob_store *bs;

	spdk_blob_op_with_handle_complete cb_fn;	/* invoked once per blob */
	void *cb_arg;
};
5996 
5997 static void
5998 _spdk_bs_iter_cpl(void *cb_arg, struct spdk_blob *_blob, int bserrno)
5999 {
6000 	struct spdk_bs_iter_ctx *ctx = cb_arg;
6001 	struct spdk_blob_store *bs = ctx->bs;
6002 	spdk_blob_id id;
6003 
6004 	if (bserrno == 0) {
6005 		ctx->cb_fn(ctx->cb_arg, _blob, bserrno);
6006 		free(ctx);
6007 		return;
6008 	}
6009 
6010 	ctx->page_num++;
6011 	ctx->page_num = spdk_bit_array_find_first_set(bs->used_blobids, ctx->page_num);
6012 	if (ctx->page_num >= spdk_bit_array_capacity(bs->used_blobids)) {
6013 		ctx->cb_fn(ctx->cb_arg, NULL, -ENOENT);
6014 		free(ctx);
6015 		return;
6016 	}
6017 
6018 	id = _spdk_bs_page_to_blobid(ctx->page_num);
6019 
6020 	spdk_bs_open_blob(bs, id, _spdk_bs_iter_cpl, ctx);
6021 }
6022 
6023 void
6024 spdk_bs_iter_first(struct spdk_blob_store *bs,
6025 		   spdk_blob_op_with_handle_complete cb_fn, void *cb_arg)
6026 {
6027 	struct spdk_bs_iter_ctx *ctx;
6028 
6029 	ctx = calloc(1, sizeof(*ctx));
6030 	if (!ctx) {
6031 		cb_fn(cb_arg, NULL, -ENOMEM);
6032 		return;
6033 	}
6034 
6035 	ctx->page_num = -1;
6036 	ctx->bs = bs;
6037 	ctx->cb_fn = cb_fn;
6038 	ctx->cb_arg = cb_arg;
6039 
6040 	_spdk_bs_iter_cpl(ctx, NULL, -1);
6041 }
6042 
6043 static void
6044 _spdk_bs_iter_close_cpl(void *cb_arg, int bserrno)
6045 {
6046 	struct spdk_bs_iter_ctx *ctx = cb_arg;
6047 
6048 	_spdk_bs_iter_cpl(ctx, NULL, -1);
6049 }
6050 
6051 void
6052 spdk_bs_iter_next(struct spdk_blob_store *bs, struct spdk_blob *blob,
6053 		  spdk_blob_op_with_handle_complete cb_fn, void *cb_arg)
6054 {
6055 	struct spdk_bs_iter_ctx *ctx;
6056 
6057 	assert(blob != NULL);
6058 
6059 	ctx = calloc(1, sizeof(*ctx));
6060 	if (!ctx) {
6061 		cb_fn(cb_arg, NULL, -ENOMEM);
6062 		return;
6063 	}
6064 
6065 	ctx->page_num = _spdk_bs_blobid_to_page(blob->id);
6066 	ctx->bs = bs;
6067 	ctx->cb_fn = cb_fn;
6068 	ctx->cb_arg = cb_arg;
6069 
6070 	/* Close the existing blob */
6071 	spdk_blob_close(blob, _spdk_bs_iter_close_cpl, ctx);
6072 }
6073 
6074 static int
6075 _spdk_blob_set_xattr(struct spdk_blob *blob, const char *name, const void *value,
6076 		     uint16_t value_len, bool internal)
6077 {
6078 	struct spdk_xattr_tailq *xattrs;
6079 	struct spdk_xattr	*xattr;
6080 	size_t			desc_size;
6081 
6082 	_spdk_blob_verify_md_op(blob);
6083 
6084 	if (blob->md_ro) {
6085 		return -EPERM;
6086 	}
6087 
6088 	desc_size = sizeof(struct spdk_blob_md_descriptor_xattr) + strlen(name) + value_len;
6089 	if (desc_size > SPDK_BS_MAX_DESC_SIZE) {
6090 		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Xattr '%s' of size %ld does not fix into single page %ld\n", name,
6091 			      desc_size, SPDK_BS_MAX_DESC_SIZE);
6092 		return -ENOMEM;
6093 	}
6094 
6095 	if (internal) {
6096 		xattrs = &blob->xattrs_internal;
6097 		blob->invalid_flags |= SPDK_BLOB_INTERNAL_XATTR;
6098 	} else {
6099 		xattrs = &blob->xattrs;
6100 	}
6101 
6102 	TAILQ_FOREACH(xattr, xattrs, link) {
6103 		if (!strcmp(name, xattr->name)) {
6104 			free(xattr->value);
6105 			xattr->value_len = value_len;
6106 			xattr->value = malloc(value_len);
6107 			memcpy(xattr->value, value, value_len);
6108 
6109 			blob->state = SPDK_BLOB_STATE_DIRTY;
6110 
6111 			return 0;
6112 		}
6113 	}
6114 
6115 	xattr = calloc(1, sizeof(*xattr));
6116 	if (!xattr) {
6117 		return -ENOMEM;
6118 	}
6119 	xattr->name = strdup(name);
6120 	xattr->value_len = value_len;
6121 	xattr->value = malloc(value_len);
6122 	memcpy(xattr->value, value, value_len);
6123 	TAILQ_INSERT_TAIL(xattrs, xattr, link);
6124 
6125 	blob->state = SPDK_BLOB_STATE_DIRTY;
6126 
6127 	return 0;
6128 }
6129 
/*
 * Public wrapper: set a user-visible (non-internal) xattr on a blob.
 * Returns 0 on success or a negative errno (see _spdk_blob_set_xattr).
 */
int
spdk_blob_set_xattr(struct spdk_blob *blob, const char *name, const void *value,
		    uint16_t value_len)
{
	return _spdk_blob_set_xattr(blob, name, value, value_len, false);
}
6136 
6137 static int
6138 _spdk_blob_remove_xattr(struct spdk_blob *blob, const char *name, bool internal)
6139 {
6140 	struct spdk_xattr_tailq *xattrs;
6141 	struct spdk_xattr	*xattr;
6142 
6143 	_spdk_blob_verify_md_op(blob);
6144 
6145 	if (blob->md_ro) {
6146 		return -EPERM;
6147 	}
6148 	xattrs = internal ? &blob->xattrs_internal : &blob->xattrs;
6149 
6150 	TAILQ_FOREACH(xattr, xattrs, link) {
6151 		if (!strcmp(name, xattr->name)) {
6152 			TAILQ_REMOVE(xattrs, xattr, link);
6153 			free(xattr->value);
6154 			free(xattr->name);
6155 			free(xattr);
6156 
6157 			if (internal && TAILQ_EMPTY(&blob->xattrs_internal)) {
6158 				blob->invalid_flags &= ~SPDK_BLOB_INTERNAL_XATTR;
6159 			}
6160 			blob->state = SPDK_BLOB_STATE_DIRTY;
6161 
6162 			return 0;
6163 		}
6164 	}
6165 
6166 	return -ENOENT;
6167 }
6168 
/*
 * Public wrapper: remove a user-visible (non-internal) xattr.
 * Returns 0, -EPERM, or -ENOENT (see _spdk_blob_remove_xattr).
 */
int
spdk_blob_remove_xattr(struct spdk_blob *blob, const char *name)
{
	return _spdk_blob_remove_xattr(blob, name, false);
}
6174 
6175 static int
6176 _spdk_blob_get_xattr_value(struct spdk_blob *blob, const char *name,
6177 			   const void **value, size_t *value_len, bool internal)
6178 {
6179 	struct spdk_xattr	*xattr;
6180 	struct spdk_xattr_tailq *xattrs;
6181 
6182 	xattrs = internal ? &blob->xattrs_internal : &blob->xattrs;
6183 
6184 	TAILQ_FOREACH(xattr, xattrs, link) {
6185 		if (!strcmp(name, xattr->name)) {
6186 			*value = xattr->value;
6187 			*value_len = xattr->value_len;
6188 			return 0;
6189 		}
6190 	}
6191 	return -ENOENT;
6192 }
6193 
/*
 * Public wrapper: look up a user-visible xattr by name.  On success
 * *value points at blob-owned memory (valid until the xattr is changed
 * or removed) and *value_len holds its length.  Returns -ENOENT if the
 * name is not present.
 */
int
spdk_blob_get_xattr_value(struct spdk_blob *blob, const char *name,
			  const void **value, size_t *value_len)
{
	_spdk_blob_verify_md_op(blob);

	return _spdk_blob_get_xattr_value(blob, name, value, value_len, false);
}
6202 
/*
 * Heap-allocated list of xattr name pointers handed to callers of
 * spdk_blob_get_xattr_names().  The pointers reference strings owned by
 * the blob's xattr entries (they are borrowed, not copied), so the list
 * must not outlive those xattrs.  Allocated as one block:
 * sizeof(struct spdk_xattr_names) + count * sizeof(char *).
 */
struct spdk_xattr_names {
	uint32_t	count;		/* Number of valid entries in names[]. */
	const char	*names[];	/* C99 flexible array member ([0] was a GNU extension). */
};
6207 
6208 static int
6209 _spdk_blob_get_xattr_names(struct spdk_xattr_tailq *xattrs, struct spdk_xattr_names **names)
6210 {
6211 	struct spdk_xattr	*xattr;
6212 	int			count = 0;
6213 
6214 	TAILQ_FOREACH(xattr, xattrs, link) {
6215 		count++;
6216 	}
6217 
6218 	*names = calloc(1, sizeof(struct spdk_xattr_names) + count * sizeof(char *));
6219 	if (*names == NULL) {
6220 		return -ENOMEM;
6221 	}
6222 
6223 	TAILQ_FOREACH(xattr, xattrs, link) {
6224 		(*names)->names[(*names)->count++] = xattr->name;
6225 	}
6226 
6227 	return 0;
6228 }
6229 
/*
 * Build the list of user-visible xattr names for 'blob'.  On success the
 * caller owns *names and must release it with spdk_xattr_names_free().
 * Returns -ENOMEM on allocation failure.
 */
int
spdk_blob_get_xattr_names(struct spdk_blob *blob, struct spdk_xattr_names **names)
{
	_spdk_blob_verify_md_op(blob);

	return _spdk_blob_get_xattr_names(&blob->xattrs, names);
}
6237 
/* Number of entries in a list returned by spdk_blob_get_xattr_names(). */
uint32_t
spdk_xattr_names_get_count(struct spdk_xattr_names *names)
{
	assert(names != NULL);

	return names->count;
}
6245 
6246 const char *
6247 spdk_xattr_names_get_name(struct spdk_xattr_names *names, uint32_t index)
6248 {
6249 	if (index >= names->count) {
6250 		return NULL;
6251 	}
6252 
6253 	return names->names[index];
6254 }
6255 
/*
 * Release a list returned by spdk_blob_get_xattr_names().
 * Passing NULL is a no-op (free(NULL) is defined).
 */
void
spdk_xattr_names_free(struct spdk_xattr_names *names)
{
	free(names);
}
6261 
/* Return the blobstore's type tag by value. */
struct spdk_bs_type
spdk_bs_get_bstype(struct spdk_blob_store *bs)
{
	return bs->bstype;
}
6267 
6268 void
6269 spdk_bs_set_bstype(struct spdk_blob_store *bs, struct spdk_bs_type bstype)
6270 {
6271 	memcpy(&bs->bstype, &bstype, sizeof(bstype));
6272 }
6273 
/* A blob is read-only if either its data or its metadata is read-only. */
bool
spdk_blob_is_read_only(struct spdk_blob *blob)
{
	assert(blob != NULL);
	return (blob->data_ro || blob->md_ro);
}
6280 
6281 bool
6282 spdk_blob_is_snapshot(struct spdk_blob *blob)
6283 {
6284 	struct spdk_blob_list *snapshot_entry;
6285 
6286 	assert(blob != NULL);
6287 
6288 	snapshot_entry = _spdk_bs_get_snapshot_entry(blob->bs, blob->id);
6289 	if (snapshot_entry == NULL) {
6290 		return false;
6291 	}
6292 
6293 	return true;
6294 }
6295 
6296 bool
6297 spdk_blob_is_clone(struct spdk_blob *blob)
6298 {
6299 	assert(blob != NULL);
6300 
6301 	if (blob->parent_id != SPDK_BLOBID_INVALID) {
6302 		assert(spdk_blob_is_thin_provisioned(blob));
6303 		return true;
6304 	}
6305 
6306 	return false;
6307 }
6308 
/* Thin provisioning is recorded as the SPDK_BLOB_THIN_PROV bit in invalid_flags. */
bool
spdk_blob_is_thin_provisioned(struct spdk_blob *blob)
{
	assert(blob != NULL);
	return !!(blob->invalid_flags & SPDK_BLOB_THIN_PROV);
}
6315 
6316 spdk_blob_id
6317 spdk_blob_get_parent_snapshot(struct spdk_blob_store *bs, spdk_blob_id blob_id)
6318 {
6319 	struct spdk_blob_list *snapshot_entry = NULL;
6320 	struct spdk_blob_list *clone_entry = NULL;
6321 
6322 	TAILQ_FOREACH(snapshot_entry, &bs->snapshots, link) {
6323 		TAILQ_FOREACH(clone_entry, &snapshot_entry->clones, link) {
6324 			if (clone_entry->id == blob_id) {
6325 				return snapshot_entry->id;
6326 			}
6327 		}
6328 	}
6329 
6330 	return SPDK_BLOBID_INVALID;
6331 }
6332 
6333 int
6334 spdk_blob_get_clones(struct spdk_blob_store *bs, spdk_blob_id blobid, spdk_blob_id *ids,
6335 		     size_t *count)
6336 {
6337 	struct spdk_blob_list *snapshot_entry, *clone_entry;
6338 	size_t n;
6339 
6340 	snapshot_entry = _spdk_bs_get_snapshot_entry(bs, blobid);
6341 	if (snapshot_entry == NULL) {
6342 		*count = 0;
6343 		return 0;
6344 	}
6345 
6346 	if (ids == NULL || *count < snapshot_entry->clone_count) {
6347 		*count = snapshot_entry->clone_count;
6348 		return -ENOMEM;
6349 	}
6350 	*count = snapshot_entry->clone_count;
6351 
6352 	n = 0;
6353 	TAILQ_FOREACH(clone_entry, &snapshot_entry->clones, link) {
6354 		ids[n++] = clone_entry->id;
6355 	}
6356 
6357 	return 0;
6358 }
6359 
/* Register the "blob" log component so SPDK_DEBUGLOG(SPDK_LOG_BLOB, ...) works. */
SPDK_LOG_REGISTER_COMPONENT("blob", SPDK_LOG_BLOB)
6361